/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2051 by torben, Fri Aug 23 20:03:47 2013 UTC revision 2172 by torben, Sat May 17 10:53:58 2014 UTC
# Line 7  using System.Diagnostics; Line 7  using System.Diagnostics;
7    
8  using IBM.WMQ;  using IBM.WMQ;
9  using MySql.Data.MySqlClient;  using MySql.Data.MySqlClient;
10    using System.Globalization;
11    
12    using DaoCommon;
13    
14  namespace DaoMqPump2  namespace DaoMqPump2
15  {  {
16      public class Transport      public class Transport
17      {      {
18    
         enum LogfileType {  
             LogTransactions,  
             LogEvents  
         }  
   
19          public static string SQL2MQ = "sql2mq";          public static string SQL2MQ = "sql2mq";
20          public static string MQ2SQL = "mq2sql";          public static string MQ2SQL = "mq2sql";
21    
# Line 27  namespace DaoMqPump2 Line 25  namespace DaoMqPump2
25    
26          StatusData statusData = new StatusData();          StatusData statusData = new StatusData();
27    
28            public Logfile logfile { get; private set; }
29    
30          public string name { get; private set; }          public string name { get; private set; }
31          public string direction { get; private set; }          public string direction { get; private set; }
32          public string queueName { get; private set; }          public string queueName { get; private set; }
# Line 62  namespace DaoMqPump2 Line 62  namespace DaoMqPump2
62                  statusData.transportEnabled = value;                  statusData.transportEnabled = value;
63                  if (value == true)                  if (value == true)
64                  {                  {
65                      this.addLogEntry("Transport enabled");                      logfile.addSingleLogEntry("Transport enabled");
66                  }                  }
67                  else                  else
68                  {                  {
69                      this.addLogEntry("Transport disabled");                      logfile.addSingleLogEntry("Transport disabled");
70                  }                  }
71              }              }
72          }          }
73    
74    
75          private LinkedList<string> logEntries = new LinkedList<string>();  
76    
77    
78          public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)          public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
# Line 92  namespace DaoMqPump2 Line 92  namespace DaoMqPump2
92              statusData.counter = 0;              statusData.counter = 0;
93              statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";              statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";
94    
95              addLogEntry( "Starting ... " );              logfile = new Logfile(name, controller.logDirectory);
96                logfile.addSingleLogEntry("Starting ... ");
97          }          }
98    
99          ~Transport()          ~Transport()
100          {          {
101              addLogEntry("Stopping ... ");              logfile.addSingleLogEntry("Stopping ... ");
102          }          }
103    
104    
# Line 122  namespace DaoMqPump2 Line 123  namespace DaoMqPump2
123    
124              if (statusData.lastrunOk == true)              if (statusData.lastrunOk == true)
125              {              {
126                  statusData.lastOkTime = getNowString();                  statusData.lastOkTime = Logfile.getNowString();
127    
128                  if (statusData.counter != startCounter)                  if (statusData.counter != startCounter)
129                  {                  {
130                      //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet                      //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
131                      statusData.lastTransferTime = getNowString();                      statusData.lastTransferTime = Logfile.getNowString();
132                  }                  }
133              }              }
134              else              else
135              {              {
136                  addLogEntry(statusData.lastErrorMessage);                  logfile.addSingleLogEntry(statusData.lastErrorMessage);
137                  statusData.lastErrorTime = getNowString();                  statusData.lastErrorTime = Logfile.getNowString();
138              }              }
139          }          }
140    
# Line 142  namespace DaoMqPump2 Line 143  namespace DaoMqPump2
143              MQQueueManager mqMgr = null;              MQQueueManager mqMgr = null;
144              MQQueue out_queue = null;              MQQueue out_queue = null;
145    
146              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
147              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
148              try              try
149              {              {
150                  //MQ Options                  //MQ Options
151                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = MQHelper.getConnectionProperties(controller.mqHost, controller.mqChannel);
152                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
153    
154                  //MySQL Options                  //MySQL Options
# Line 187  namespace DaoMqPump2 Line 188  namespace DaoMqPump2
188                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
189                          int numrows = updateCmd.ExecuteNonQuery();                          int numrows = updateCmd.ExecuteNonQuery();
190    
191                          translog.WriteLine(getNowString() + " " + msgString + "\n");                          translog.WriteLine(Logfile.getNowString() + " " + msgString);
192    
193                          if (numrows != 1)                          if (numrows != 1)
194                          {                          {
# Line 208  namespace DaoMqPump2 Line 209  namespace DaoMqPump2
209              }              }
210              finally              finally
211              {              {
212                    MQHelper.closeQueueSafe(out_queue);
213                    MQHelper.closeQueueManagerSafe(mqMgr);
214    
                 if (out_queue != null && out_queue.IsOpen)  
                 {  
                     try  
                     {  
                         out_queue.Close();  
                     }  
                     catch (Exception e)  
                     {  
                         Console.WriteLine("Error cleaning up qmgr " + e.Message);  
                     }  
                 }  
   
                 if (mqMgr != null && mqMgr.IsOpen)  
                 {  
                     try  
                     {  
                         mqMgr.Close();  
                     }  
                     catch (Exception e)  
                     {  
                         Console.WriteLine("Error cleaning up qmgr " + e.Message);  
                     }  
                 }  
215    
216              }              }
217          }          }
218    
219          private void transportMq2Sql()          private void transportMq2Sql()
220          {          {
221                int messageCount = 0;
222    
223              MQQueueManager mqMgr = null;              MQQueueManager mqMgr = null;
224              MQQueue in_queue = null;              MQQueue in_queue = null;
225              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
226              using (StreamWriter translog = new StreamWriter(filename, true))              using (StreamWriter translog = new StreamWriter(filename, true))
227                  try                  try
228                  {                  {
229                      //MQ options                      //MQ options
230                      Hashtable connProps = getConnectionProperties();                      Hashtable connProps = MQHelper.getConnectionProperties(controller.mqHost, controller.mqChannel);
231                      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;                      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
232    
233                      //MySQL options                      //MySQL options
# Line 276  namespace DaoMqPump2 Line 258  namespace DaoMqPump2
258                                  if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)                                  if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
259                                  {                                  {
260                                      string msgString = mqMsg.ReadString(mqMsg.MessageLength);                                      string msgString = mqMsg.ReadString(mqMsg.MessageLength);
261                                      System.Console.WriteLine(msgString);                                      //System.Console.WriteLine(msgString);
262    
263    
264                                        // Hvis transaktionen starter med et ? er det ikke en gyldig transaktion
265                                        // validér ligeledes at headeren er gyldig
266                                        if ( Salt2Helper.validateSalt2Header(msgString) == false )
267                                        {
268                                            string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
269                                            using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
270                                            {
271                                                discardedlog.WriteLine(Logfile.getNowString() + " " + msgString);
272                                            }
273                                            mqMgr.Commit();//fjern den afviste transaktion fra køen
274                                            statusData.discardedCounter++;
275                                            continue; //gå frem til at tage næste transaktion fra køen
276                                        }
277    
278    
279                                      string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng                                      string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
280    
# Line 285  namespace DaoMqPump2 Line 283  namespace DaoMqPump2
283    
284                                      if (numrows == 1)                                      if (numrows == 1)
285                                      {                                      {
286                                          translog.WriteLine(getNowString() + " " + msgString + "\n");                                          translog.WriteLine(Logfile.getNowString() + " " + msgString);
287                                          mqMgr.Commit();                                          mqMgr.Commit();
288                                          statusData.counter++;                                          statusData.counter++;
289    
290    
291                                            messageCount++;// increment per run message counter
292                                            if (messageCount >= 10000) // if we have moved  10000 messages in this run - let it go and give the other transports a change
293                                            {
294                                                isContinue = false;
295                                            }
296    
297    
298    
299                                      }                                      }
300                                      else                                      else
301                                      {                                      {
# Line 328  namespace DaoMqPump2 Line 336  namespace DaoMqPump2
336                  }                  }
337                  catch (Exception e)                  catch (Exception e)
338                  {                  {
339                        //Det vil være mest korrekt at Rollback/backout MQ Transaktionen her - for at være sikker på at Message'n fjernes fra køen
340                        try
341                        {
342                            if (mqMgr != null)
343                            {
344                                mqMgr.Backout();
345                            }
346                        }
347                        catch (Exception e2)
348                        {
349                            logfile.addSingleLogEntry("Error backing out from MQ Transaction: " + e2.Message);
350                        }
351    
352                      statusData.lastrunOk = false;                      statusData.lastrunOk = false;
353    
354                      statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;                      statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
# Line 338  namespace DaoMqPump2 Line 359  namespace DaoMqPump2
359                  finally                  finally
360                  {                  {
361    
362                      if (in_queue != null && in_queue.IsOpen)                      MQHelper.closeQueueSafe(in_queue);
363                      {                      MQHelper.closeQueueManagerSafe(mqMgr);
                         try  
                         {  
                             in_queue.Close();  
                         }  
                         catch (Exception e)  
                         {  
                             Console.WriteLine("Error cleaning up qmgr " + e.Message);  
                         }  
                     }  
                       
                     if (mqMgr != null && mqMgr.IsOpen)  
                     {  
                         try  
                         {  
                             mqMgr.Close();  
                         } catch (Exception e) {  
                             Console.WriteLine("Error cleaning up qmgr " + e.Message);  
                         }  
                     }  
364    
365                  }                  }
366          }          }
# Line 377  namespace DaoMqPump2 Line 379  namespace DaoMqPump2
379              return connectionString;              return connectionString;
380          }          }
381    
         private Hashtable getConnectionProperties()  
         {  
             Hashtable connProperties = new Hashtable();  
             connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);  
             connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);  
             connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel); //WARNING: Hardcoded Channel Value !!!  
             return connProperties;  
         }  
   
   
   
         private string getLogFilename(LogfileType type)  
         {  
   
             DateTime now = DateTime.Now;  
             string filename = controller.logDirectory + "\\";  
   
             switch (type)  
             {  
                 case LogfileType.LogEvents:  
                     filename += "eventlog_";  
                     break;  
   
                 case LogfileType.LogTransactions:  
                     filename += "transactionlog_";  
                     break;  
             }  
   
   
             filename += name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";  
   
             return filename;  
         }  
   
         public string getNowString()  
         {  
             DateTime now = DateTime.Now;  
   
             return now.ToString("s");  
         }  
   
         private void addLogEntry(string msg)  
         {  
             msg = getNowString() + " " + msg;  
             lock (logEntries)  
             {  
                 logEntries.AddFirst(msg);  
   
                 if (logEntries.Count > 20)  
                 {  
                     logEntries.RemoveLast();  
                 }                  
             }  
   
             string filename = getLogFilename(LogfileType.LogEvents);  
             using (StreamWriter eventlog = new StreamWriter(filename, true))  
             {  
                 eventlog.WriteLine(msg);  
             }  
         }  
   
         public string[] getLog()  
         {  
             lock(logEntries)  
             {  
                 List<string> tmpEntries = new List<string>();  
                 foreach (string s in logEntries)  
                 {  
                     tmpEntries.Add(s);  
                 }  
                 return tmpEntries.ToArray();  
             }  
         }  
382    
383      }      }
384  }  }

Legend:
Removed from v.2051  
changed lines
  Added in v.2172

  ViewVC Help
Powered by ViewVC 1.1.20