/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2086 by torben, Wed Nov 27 09:46:11 2013 UTC revision 2172 by torben, Sat May 17 10:53:58 2014 UTC
# Line 9  using IBM.WMQ; Line 9  using IBM.WMQ;
9  using MySql.Data.MySqlClient;  using MySql.Data.MySqlClient;
10  using System.Globalization;  using System.Globalization;
11    
12    using DaoCommon;
13    
14  namespace DaoMqPump2  namespace DaoMqPump2
15  {  {
16      public class Transport      public class Transport
17      {      {
18    
         enum LogfileType {  
             LogTransactions,  
             LogEvents,  
             LogDiscarded  
         }  
   
19          public static string SQL2MQ = "sql2mq";          public static string SQL2MQ = "sql2mq";
20          public static string MQ2SQL = "mq2sql";          public static string MQ2SQL = "mq2sql";
21    
# Line 29  namespace DaoMqPump2 Line 25  namespace DaoMqPump2
25    
26          StatusData statusData = new StatusData();          StatusData statusData = new StatusData();
27    
28            public Logfile logfile { get; private set; }
29    
30          public string name { get; private set; }          public string name { get; private set; }
31          public string direction { get; private set; }          public string direction { get; private set; }
32          public string queueName { get; private set; }          public string queueName { get; private set; }
# Line 64  namespace DaoMqPump2 Line 62  namespace DaoMqPump2
62                  statusData.transportEnabled = value;                  statusData.transportEnabled = value;
63                  if (value == true)                  if (value == true)
64                  {                  {
65                      this.addLogEntry("Transport enabled");                      logfile.addSingleLogEntry("Transport enabled");
66                  }                  }
67                  else                  else
68                  {                  {
69                      this.addLogEntry("Transport disabled");                      logfile.addSingleLogEntry("Transport disabled");
70                  }                  }
71              }              }
72          }          }
73    
74    
75          private LinkedList<string> logEntries = new LinkedList<string>();  
76    
77    
78          public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)          public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
# Line 94  namespace DaoMqPump2 Line 92  namespace DaoMqPump2
92              statusData.counter = 0;              statusData.counter = 0;
93              statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";              statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";
94    
95              addLogEntry( "Starting ... " );              logfile = new Logfile(name, controller.logDirectory);
96                logfile.addSingleLogEntry("Starting ... ");
97          }          }
98    
99          ~Transport()          ~Transport()
100          {          {
101              addLogEntry("Stopping ... ");              logfile.addSingleLogEntry("Stopping ... ");
102          }          }
103    
104    
# Line 124  namespace DaoMqPump2 Line 123  namespace DaoMqPump2
123    
124              if (statusData.lastrunOk == true)              if (statusData.lastrunOk == true)
125              {              {
126                  statusData.lastOkTime = getNowString();                  statusData.lastOkTime = Logfile.getNowString();
127    
128                  if (statusData.counter != startCounter)                  if (statusData.counter != startCounter)
129                  {                  {
130                      //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet                      //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
131                      statusData.lastTransferTime = getNowString();                      statusData.lastTransferTime = Logfile.getNowString();
132                  }                  }
133              }              }
134              else              else
135              {              {
136                  addLogEntry(statusData.lastErrorMessage);                  logfile.addSingleLogEntry(statusData.lastErrorMessage);
137                  statusData.lastErrorTime = getNowString();                  statusData.lastErrorTime = Logfile.getNowString();
138              }              }
139          }          }
140    
# Line 144  namespace DaoMqPump2 Line 143  namespace DaoMqPump2
143              MQQueueManager mqMgr = null;              MQQueueManager mqMgr = null;
144              MQQueue out_queue = null;              MQQueue out_queue = null;
145    
146              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
147              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
148              try              try
149              {              {
150                  //MQ Options                  //MQ Options
151                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = MQHelper.getConnectionProperties(controller.mqHost, controller.mqChannel);
152                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
153    
154                  //MySQL Options                  //MySQL Options
# Line 189  namespace DaoMqPump2 Line 188  namespace DaoMqPump2
188                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
189                          int numrows = updateCmd.ExecuteNonQuery();                          int numrows = updateCmd.ExecuteNonQuery();
190    
191                          translog.WriteLine(getNowString() + " " + msgString);                          translog.WriteLine(Logfile.getNowString() + " " + msgString);
192    
193                          if (numrows != 1)                          if (numrows != 1)
194                          {                          {
# Line 210  namespace DaoMqPump2 Line 209  namespace DaoMqPump2
209              }              }
210              finally              finally
211              {              {
212                    MQHelper.closeQueueSafe(out_queue);
213                    MQHelper.closeQueueManagerSafe(mqMgr);
214    
                 if (out_queue != null && out_queue.IsOpen)  
                 {  
                     try  
                     {  
                         out_queue.Close();  
                     }  
                     catch (Exception e)  
                     {  
                         Console.WriteLine("Error cleaning up qmgr " + e.Message);  
                     }  
                 }  
   
                 if (mqMgr != null && mqMgr.IsOpen)  
                 {  
                     try  
                     {  
                         mqMgr.Close();  
                     }  
                     catch (Exception e)  
                     {  
                         Console.WriteLine("Error cleaning up qmgr " + e.Message);  
                     }  
                 }  
215    
216              }              }
217          }          }
218    
219          private void transportMq2Sql()          private void transportMq2Sql()
220          {          {
221                int messageCount = 0;
222    
223              MQQueueManager mqMgr = null;              MQQueueManager mqMgr = null;
224              MQQueue in_queue = null;              MQQueue in_queue = null;
225              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
226              using (StreamWriter translog = new StreamWriter(filename, true))              using (StreamWriter translog = new StreamWriter(filename, true))
227                  try                  try
228                  {                  {
229                      //MQ options                      //MQ options
230                      Hashtable connProps = getConnectionProperties();                      Hashtable connProps = MQHelper.getConnectionProperties(controller.mqHost, controller.mqChannel);
231                      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;                      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
232    
233                      //MySQL options                      //MySQL options
# Line 283  namespace DaoMqPump2 Line 263  namespace DaoMqPump2
263    
264                                      // Hvis transaktionen starter med et ? er det ikke en gyldig transaktion                                      // Hvis transaktionen starter med et ? er det ikke en gyldig transaktion
265                                      // validér ligeledes at headeren er gyldig                                      // validér ligeledes at headeren er gyldig
266                                      if ( msgString.StartsWith("?") || validateSalt2Header(msgString) == false )                                      if ( Salt2Helper.validateSalt2Header(msgString) == false )
267                                      {                                      {
268                                          string discarded_filename = getLogFilename(LogfileType.LogDiscarded);                                          string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
269                                          using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))                                          using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
270                                          {                                          {
271                                              discardedlog.WriteLine(getNowString() + " " + msgString);                                              discardedlog.WriteLine(Logfile.getNowString() + " " + msgString);
272                                          }                                          }
273                                          mqMgr.Commit();//fjern den afviste transaktion fra køen                                          mqMgr.Commit();//fjern den afviste transaktion fra køen
274                                          statusData.discardedCounter++;                                          statusData.discardedCounter++;
# Line 303  namespace DaoMqPump2 Line 283  namespace DaoMqPump2
283    
284                                      if (numrows == 1)                                      if (numrows == 1)
285                                      {                                      {
286                                          translog.WriteLine(getNowString() + " " + msgString);                                          translog.WriteLine(Logfile.getNowString() + " " + msgString);
287                                          mqMgr.Commit();                                          mqMgr.Commit();
288                                          statusData.counter++;                                          statusData.counter++;
289    
290    
291                                            messageCount++;// increment per run message counter
292                                            if (messageCount >= 10000) // if we have moved  10000 messages in this run - let it go and give the other transports a change
293                                            {
294                                                isContinue = false;
295                                            }
296    
297    
298    
299                                      }                                      }
300                                      else                                      else
301                                      {                                      {
# Line 356  namespace DaoMqPump2 Line 346  namespace DaoMqPump2
346                      }                      }
347                      catch (Exception e2)                      catch (Exception e2)
348                      {                      {
349                          this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);                          logfile.addSingleLogEntry("Error backing out from MQ Transaction: " + e2.Message);
350                      }                      }
351    
352                      statusData.lastrunOk = false;                      statusData.lastrunOk = false;
# Line 369  namespace DaoMqPump2 Line 359  namespace DaoMqPump2
359                  finally                  finally
360                  {                  {
361    
362                      if (in_queue != null && in_queue.IsOpen)                      MQHelper.closeQueueSafe(in_queue);
363                      {                      MQHelper.closeQueueManagerSafe(mqMgr);
                         try  
                         {  
                             in_queue.Close();  
                         }  
                         catch (Exception e)  
                         {  
                             Console.WriteLine("Error cleaning up qmgr " + e.Message);  
                         }  
                     }  
                       
                     if (mqMgr != null && mqMgr.IsOpen)  
                     {  
                         try  
                         {  
                             mqMgr.Close();  
                         } catch (Exception e) {  
                             Console.WriteLine("Error cleaning up qmgr " + e.Message);  
                         }  
                     }  
364    
365                  }                  }
366          }          }
# Line 408  namespace DaoMqPump2 Line 379  namespace DaoMqPump2
379              return connectionString;              return connectionString;
380          }          }
381    
         private Hashtable getConnectionProperties()  
         {  
             Hashtable connProperties = new Hashtable();  
             connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);  
             connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);  
             connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel);  
             return connProperties;  
         }  
   
   
   
         private string getLogFilename(LogfileType type)  
         {  
   
             DateTime now = DateTime.Now;  
             string filename = controller.logDirectory + "\\";  
   
             //Find uge nr  
             DateTimeFormatInfo dfi = DateTimeFormatInfo.CurrentInfo;  
             Calendar myCal = CultureInfo.InvariantCulture.Calendar;//System.Globalization  
             int week = myCal.GetWeekOfYear(now, dfi.CalendarWeekRule, dfi.FirstDayOfWeek);  
   
             switch (type)  
             {  
                 case LogfileType.LogEvents:  
                     filename += "eventlog_";  
                     break;  
   
                 case LogfileType.LogTransactions:  
                     filename += "transactionlog_";  
                     break;  
                 case LogfileType.LogDiscarded:  
                     filename += "discardedlog_";  
                     break;  
             }  
   
   
             filename += name + "_" + now.Year.ToString("D4") + "_W" + week.ToString("D2") + ".log";  
   
             return filename;  
         }  
   
         public string getNowString()  
         {  
             DateTime now = DateTime.Now;  
   
             return now.ToString("s");  
         }  
   
         private bool validateSalt2Header(string salt2String)  
         {  
             if (salt2String.Length < 66)  
             {  
                 addLogEntry("Transaction too short - discarding");  
                 return false;  
             }  
   
   
             int result;  
             long result_long;  
   
             string afsender = salt2String.Substring(0, 5);  
             string modtager = salt2String.Substring(5, 5);  
             string afsenderTegnSaet = salt2String.Substring(10, 6);  
             string standardNavn = salt2String.Substring(16, 6);  
             string standardVersion = salt2String.Substring(22, 3);  
             string afsenderSekvensnr = salt2String.Substring(25, 6);  
             string afsenderTidsstempel = salt2String.Substring(31, 14);  
             string afsenderBakkeIdent = salt2String.Substring(45, 5);  
             string modtagerBakkeIdent = salt2String.Substring(50, 5);  
             string transaktionForkortelse = salt2String.Substring(55, 4);  
             string transaktionsLaengde = salt2String.Substring(59, 5);  
             string prioritet = salt2String.Substring(64, 1);  
   
   
               
             if (int.TryParse(standardVersion.Trim(), out result) == false) // standardVersion _skal_ være en int  
             {  
                 addLogEntry("standardVersion not an integer, discarding");  
                 return false;  
             }  
   
             if (int.TryParse(afsenderSekvensnr.Trim(), out result) == false) // afsenderSekvensnr _skal_ være en int  
             {  
                 addLogEntry("afsenderSekvensnr not an integer, discarding");  
                 return false;  
             }  
   
             if (long.TryParse(afsenderTidsstempel.Trim(), out result_long) == false) // afsenderTidsstempel _skal_ være en long  
             {  
                 addLogEntry("afsenderSekvensnr not a long integer, discarding");  
                 return false;  
             }  
   
             if (int.TryParse(transaktionsLaengde.Trim(), out result) == false) // transaktionsLaengde _skal_ være en int  
             {  
                 addLogEntry("transaktionsLaengde not an integer, discarding");  
                 return false;  
             }  
   
             if ( int.TryParse(prioritet.Trim(), out result) == false ) // prioritet _skal_ være en int  
             {  
                 addLogEntry("prioritet not an integer, discarding");  
                 return false;  
             }  
   
             return true;  
         }  
   
         private void addLogEntry(string msg)  
         {  
             msg = getNowString() + " " + msg;  
             lock (logEntries)  
             {  
                 logEntries.AddFirst(msg);  
   
                 if (logEntries.Count > 20)  
                 {  
                     logEntries.RemoveLast();  
                 }                  
             }  
   
             string filename = getLogFilename(LogfileType.LogEvents);  
             using (StreamWriter eventlog = new StreamWriter(filename, true))  
             {  
                 eventlog.WriteLine(msg);  
             }  
         }  
   
         public string[] getLog()  
         {  
             lock(logEntries)  
             {  
                 List<string> tmpEntries = new List<string>();  
                 foreach (string s in logEntries)  
                 {  
                     tmpEntries.Add(s);  
                 }  
                 return tmpEntries.ToArray();  
             }  
         }  
382    
383      }      }
384  }  }

Legend:
Removed from v.2086  
changed lines
  Added in v.2172

  ViewVC Help
Powered by ViewVC 1.1.20