/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2166 by torben, Fri May 16 18:24:05 2014 UTC revision 2172 by torben, Sat May 17 10:53:58 2014 UTC
# Line 16  namespace DaoMqPump2 Line 16  namespace DaoMqPump2
16      public class Transport      public class Transport
17      {      {
18    
         enum LogfileType {  
             LogTransactions,  
             LogEvents,  
             LogDiscarded  
         }  
   
19          public static string SQL2MQ = "sql2mq";          public static string SQL2MQ = "sql2mq";
20          public static string MQ2SQL = "mq2sql";          public static string MQ2SQL = "mq2sql";
21    
# Line 31  namespace DaoMqPump2 Line 25  namespace DaoMqPump2
25    
26          StatusData statusData = new StatusData();          StatusData statusData = new StatusData();
27    
28            public Logfile logfile { get; private set; }
29    
30          public string name { get; private set; }          public string name { get; private set; }
31          public string direction { get; private set; }          public string direction { get; private set; }
32          public string queueName { get; private set; }          public string queueName { get; private set; }
# Line 66  namespace DaoMqPump2 Line 62  namespace DaoMqPump2
62                  statusData.transportEnabled = value;                  statusData.transportEnabled = value;
63                  if (value == true)                  if (value == true)
64                  {                  {
65                      this.addLogEntry("Transport enabled");                      logfile.addSingleLogEntry("Transport enabled");
66                  }                  }
67                  else                  else
68                  {                  {
69                      this.addLogEntry("Transport disabled");                      logfile.addSingleLogEntry("Transport disabled");
70                  }                  }
71              }              }
72          }          }
73    
74    
75          private LinkedList<string> logEntries = new LinkedList<string>();  
76    
77    
78          public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)          public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
# Line 96  namespace DaoMqPump2 Line 92  namespace DaoMqPump2
92              statusData.counter = 0;              statusData.counter = 0;
93              statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";              statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";
94    
95              addLogEntry( "Starting ... " );              logfile = new Logfile(name, controller.logDirectory);
96                logfile.addSingleLogEntry("Starting ... ");
97          }          }
98    
99          ~Transport()          ~Transport()
100          {          {
101              addLogEntry("Stopping ... ");              logfile.addSingleLogEntry("Stopping ... ");
102          }          }
103    
104    
# Line 126  namespace DaoMqPump2 Line 123  namespace DaoMqPump2
123    
124              if (statusData.lastrunOk == true)              if (statusData.lastrunOk == true)
125              {              {
126                  statusData.lastOkTime = DaoUtil.getNowString();                  statusData.lastOkTime = Logfile.getNowString();
127    
128                  if (statusData.counter != startCounter)                  if (statusData.counter != startCounter)
129                  {                  {
130                      //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet                      //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
131                      statusData.lastTransferTime = DaoUtil.getNowString();                      statusData.lastTransferTime = Logfile.getNowString();
132                  }                  }
133              }              }
134              else              else
135              {              {
136                  addLogEntry(statusData.lastErrorMessage);                  logfile.addSingleLogEntry(statusData.lastErrorMessage);
137                  statusData.lastErrorTime = DaoUtil.getNowString();                  statusData.lastErrorTime = Logfile.getNowString();
138              }              }
139          }          }
140    
# Line 146  namespace DaoMqPump2 Line 143  namespace DaoMqPump2
143              MQQueueManager mqMgr = null;              MQQueueManager mqMgr = null;
144              MQQueue out_queue = null;              MQQueue out_queue = null;
145    
146              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
147              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
148              try              try
149              {              {
150                  //MQ Options                  //MQ Options
151                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = MQHelper.getConnectionProperties(controller.mqHost, controller.mqChannel);
152                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
153    
154                  //MySQL Options                  //MySQL Options
# Line 191  namespace DaoMqPump2 Line 188  namespace DaoMqPump2
188                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
189                          int numrows = updateCmd.ExecuteNonQuery();                          int numrows = updateCmd.ExecuteNonQuery();
190    
191                          translog.WriteLine(DaoUtil.getNowString() + " " + msgString);                          translog.WriteLine(Logfile.getNowString() + " " + msgString);
192    
193                          if (numrows != 1)                          if (numrows != 1)
194                          {                          {
# Line 212  namespace DaoMqPump2 Line 209  namespace DaoMqPump2
209              }              }
210              finally              finally
211              {              {
212                    MQHelper.closeQueueSafe(out_queue);
213                    MQHelper.closeQueueManagerSafe(mqMgr);
214    
                 if (out_queue != null && out_queue.IsOpen)  
                 {  
                     try  
                     {  
                         out_queue.Close();  
                     }  
                     catch (Exception e)  
                     {  
                         Console.WriteLine("Error cleaning up qmgr " + e.Message);  
                     }  
                 }  
   
                 if (mqMgr != null && mqMgr.IsOpen)  
                 {  
                     try  
                     {  
                         mqMgr.Close();  
                     }  
                     catch (Exception e)  
                     {  
                         Console.WriteLine("Error cleaning up qmgr " + e.Message);  
                     }  
                 }  
215    
216              }              }
217          }          }
# Line 245  namespace DaoMqPump2 Line 221  namespace DaoMqPump2
221              int messageCount = 0;              int messageCount = 0;
222    
223              MQQueueManager mqMgr = null;              MQQueueManager mqMgr = null;
224              MQQueue in_queue = null;              MQQueue in_queue = null;
225              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
226              using (StreamWriter translog = new StreamWriter(filename, true))              using (StreamWriter translog = new StreamWriter(filename, true))
227                  try                  try
228                  {                  {
229                      //MQ options                      //MQ options
230                      Hashtable connProps = getConnectionProperties();                      Hashtable connProps = MQHelper.getConnectionProperties(controller.mqHost, controller.mqChannel);
231                      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;                      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
232    
233                      //MySQL options                      //MySQL options
# Line 287  namespace DaoMqPump2 Line 263  namespace DaoMqPump2
263    
264                                      // Hvis transaktionen starter med et ? er det ikke en gyldig transaktion                                      // Hvis transaktionen starter med et ? er det ikke en gyldig transaktion
265                                      // validér ligeledes at headeren er gyldig                                      // validér ligeledes at headeren er gyldig
266                                      if ( msgString.StartsWith("?") || validateSalt2Header(msgString) == false )                                      if ( Salt2Helper.validateSalt2Header(msgString) == false )
267                                      {                                      {
268                                          string discarded_filename = getLogFilename(LogfileType.LogDiscarded);                                          string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
269                                          using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))                                          using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
270                                          {                                          {
271                                              discardedlog.WriteLine( DaoUtil.getNowString() + " " + msgString );                                              discardedlog.WriteLine(Logfile.getNowString() + " " + msgString);
272                                          }                                          }
273                                          mqMgr.Commit();//fjern den afviste transaktion fra køen                                          mqMgr.Commit();//fjern den afviste transaktion fra køen
274                                          statusData.discardedCounter++;                                          statusData.discardedCounter++;
# Line 307  namespace DaoMqPump2 Line 283  namespace DaoMqPump2
283    
284                                      if (numrows == 1)                                      if (numrows == 1)
285                                      {                                      {
286                                          translog.WriteLine( DaoUtil.getNowString() + " " + msgString );                                          translog.WriteLine(Logfile.getNowString() + " " + msgString);
287                                          mqMgr.Commit();                                          mqMgr.Commit();
288                                          statusData.counter++;                                          statusData.counter++;
289    
# Line 370  namespace DaoMqPump2 Line 346  namespace DaoMqPump2
346                      }                      }
347                      catch (Exception e2)                      catch (Exception e2)
348                      {                      {
349                          this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);                          logfile.addSingleLogEntry("Error backing out from MQ Transaction: " + e2.Message);
350                      }                      }
351    
352                      statusData.lastrunOk = false;                      statusData.lastrunOk = false;
# Line 383  namespace DaoMqPump2 Line 359  namespace DaoMqPump2
359                  finally                  finally
360                  {                  {
361    
362                      if (in_queue != null && in_queue.IsOpen)                      MQHelper.closeQueueSafe(in_queue);
363                      {                      MQHelper.closeQueueManagerSafe(mqMgr);
                         try  
                         {  
                             in_queue.Close();  
                         }  
                         catch (Exception e)  
                         {  
                             Console.WriteLine("Error cleaning up qmgr " + e.Message);  
                         }  
                     }  
                       
                     if (mqMgr != null && mqMgr.IsOpen)  
                     {  
                         try  
                         {  
                             mqMgr.Close();  
                         } catch (Exception e) {  
                             Console.WriteLine("Error cleaning up qmgr " + e.Message);  
                         }  
                     }  
364    
365                  }                  }
366          }          }
# Line 422  namespace DaoMqPump2 Line 379  namespace DaoMqPump2
379              return connectionString;              return connectionString;
380          }          }
381    
         private Hashtable getConnectionProperties()  
         {  
             Hashtable connProperties = new Hashtable();  
             connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);  
             connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);  
             connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel);  
             return connProperties;  
         }  
   
   
   
         private string getLogFilename(LogfileType type)  
         {  
   
             DateTime now = DateTime.Now;  
             string filename = controller.logDirectory + "\\";  
   
             //Find uge nr  
             DateTimeFormatInfo dfi = DateTimeFormatInfo.CurrentInfo;  
             Calendar myCal = CultureInfo.InvariantCulture.Calendar;//System.Globalization  
             int week = myCal.GetWeekOfYear(now, dfi.CalendarWeekRule, dfi.FirstDayOfWeek);  
   
             switch (type)  
             {  
                 case LogfileType.LogEvents:  
                     filename += "eventlog_";  
                     break;  
   
                 case LogfileType.LogTransactions:  
                     filename += "transactionlog_";  
                     break;  
                 case LogfileType.LogDiscarded:  
                     filename += "discardedlog_";  
                     break;  
             }  
   
   
             filename += name + "_" + now.Year.ToString("D4") + "_W" + week.ToString("D2") + ".log";  
   
             return filename;  
         }  
   
         private bool validateSalt2Header(string salt2String)  
         {  
             if (salt2String.Length < 66)  
             {  
                 addLogEntry("Transaction too short - discarding");  
                 return false;  
             }  
   
   
             int result;  
             long result_long;  
   
             string afsender = salt2String.Substring(0, 5);  
             string modtager = salt2String.Substring(5, 5);  
             string afsenderTegnSaet = salt2String.Substring(10, 6);  
             string standardNavn = salt2String.Substring(16, 6);  
             string standardVersion = salt2String.Substring(22, 3);  
             string afsenderSekvensnr = salt2String.Substring(25, 6);  
             string afsenderTidsstempel = salt2String.Substring(31, 14);  
             string afsenderBakkeIdent = salt2String.Substring(45, 5);  
             string modtagerBakkeIdent = salt2String.Substring(50, 5);  
             string transaktionForkortelse = salt2String.Substring(55, 4);  
             string transaktionsLaengde = salt2String.Substring(59, 5);  
             string prioritet = salt2String.Substring(64, 1);  
   
   
               
             if (int.TryParse(standardVersion.Trim(), out result) == false) // standardVersion _skal_ være en int  
             {  
                 addLogEntry("standardVersion not an integer, discarding");  
                 return false;  
             }  
   
             if (int.TryParse(afsenderSekvensnr.Trim(), out result) == false) // afsenderSekvensnr _skal_ være en int  
             {  
                 addLogEntry("afsenderSekvensnr not an integer, discarding");  
                 return false;  
             }  
   
             if (long.TryParse(afsenderTidsstempel.Trim(), out result_long) == false) // afsenderTidsstempel _skal_ være en long  
             {  
                 addLogEntry("afsenderTidsstempel not a long integer, discarding");  
                 return false;  
             }  
   
             if (int.TryParse(transaktionsLaengde.Trim(), out result) == false) // transaktionsLaengde _skal_ være en int  
             {  
                 addLogEntry("transaktionsLaengde not an integer, discarding");  
                 return false;  
             }  
   
             if ( int.TryParse(prioritet.Trim(), out result) == false ) // prioritet _skal_ være en int  
             {  
                 addLogEntry("prioritet not an integer, discarding");  
                 return false;  
             }  
   
             return true;  
         }  
   
         private void addLogEntry(string msg)  
         {  
             msg = DaoUtil.getNowString() + " " + msg;  
             lock (logEntries)  
             {  
                 logEntries.AddFirst(msg);  
   
                 if (logEntries.Count > 20)  
                 {  
                     logEntries.RemoveLast();  
                 }                  
             }  
   
             string filename = getLogFilename(LogfileType.LogEvents);  
             using (StreamWriter eventlog = new StreamWriter(filename, true))  
             {  
                 eventlog.WriteLine(msg);  
             }  
         }  
   
         public string[] getLog()  
         {  
             lock(logEntries)  
             {  
                 List<string> tmpEntries = new List<string>();  
                 foreach (string s in logEntries)  
                 {  
                     tmpEntries.Add(s);  
                 }  
                 return tmpEntries.ToArray();  
             }  
         }  
382    
383      }      }
384  }  }

Legend:
Removed from v.2166  
changed lines
  Added in v.2172

  ViewVC Help
Powered by ViewVC 1.1.20