/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2010 by torben, Wed Jul 10 18:28:31 2013 UTC revision 2169 by torben, Fri May 16 21:10:02 2014 UTC
# Line 7  using System.Diagnostics; Line 7  using System.Diagnostics;
7    
8  using IBM.WMQ;  using IBM.WMQ;
9  using MySql.Data.MySqlClient;  using MySql.Data.MySqlClient;
10    using System.Globalization;
11    
12    using DaoCommon;
13    
14  namespace DaoMqPump2  namespace DaoMqPump2
15  {  {
16      public class Transport      public class Transport
17      {      {
18    
         enum LogfileType {  
             LogTransactions,  
             LogEvents  
         }  
   
19          public static string SQL2MQ = "sql2mq";          public static string SQL2MQ = "sql2mq";
20          public static string MQ2SQL = "mq2sql";          public static string MQ2SQL = "mq2sql";
21    
# Line 27  namespace DaoMqPump2 Line 25  namespace DaoMqPump2
25    
26          StatusData statusData = new StatusData();          StatusData statusData = new StatusData();
27    
28            public Logfile logfile { get; private set; }
29    
30          public string name { get; private set; }          public string name { get; private set; }
31          public string direction { get; private set; }          public string direction { get; private set; }
32          public string queueName { get; private set; }          public string queueName { get; private set; }
# Line 62  namespace DaoMqPump2 Line 62  namespace DaoMqPump2
62                  statusData.transportEnabled = value;                  statusData.transportEnabled = value;
63                  if (value == true)                  if (value == true)
64                  {                  {
65                      this.addLogEntry("Transport enabled");                      logfile.addSingleLogEntry("Transport enabled");
66                  }                  }
67                  else                  else
68                  {                  {
69                      this.addLogEntry("Transport disabled");                      logfile.addSingleLogEntry("Transport disabled");
70                  }                  }
71              }              }
72          }          }
73    
74    
75          private LinkedList<string> logEntries = new LinkedList<string>();  
76    
77    
78          public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)          public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
# Line 92  namespace DaoMqPump2 Line 92  namespace DaoMqPump2
92              statusData.counter = 0;              statusData.counter = 0;
93              statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";              statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";
94    
95              addLogEntry( "Starting ... " );              logfile = new Logfile(name, controller.logDirectory);
96                logfile.addSingleLogEntry("Starting ... ");
97          }          }
98    
99          ~Transport()          ~Transport()
100          {          {
101              addLogEntry("Stopping ... ");              logfile.addSingleLogEntry("Stopping ... ");
102          }          }
103    
104    
# Line 122  namespace DaoMqPump2 Line 123  namespace DaoMqPump2
123    
124              if (statusData.lastrunOk == true)              if (statusData.lastrunOk == true)
125              {              {
126                  statusData.lastOkTime = getNowString();                  statusData.lastOkTime = Logfile.getNowString();
127    
128                  if (statusData.counter != startCounter)                  if (statusData.counter != startCounter)
129                  {                  {
130                      //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet                      //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
131                      statusData.lastTransferTime = getNowString();                      statusData.lastTransferTime = Logfile.getNowString();
132                  }                  }
133              }              }
134              else              else
135              {              {
136                  addLogEntry(statusData.lastErrorMessage);                  logfile.addSingleLogEntry(statusData.lastErrorMessage);
137                  statusData.lastErrorTime = getNowString();                  statusData.lastErrorTime = Logfile.getNowString();
138              }              }
139          }          }
140    
141          private void transportSql2Mq()          private void transportSql2Mq()
142          {          {
143              string filename = getLogFilename(LogfileType.LogTransactions);              MQQueueManager mqMgr = null;
144                MQQueue out_queue = null;
145    
146                string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
147              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
148              try              try
149              {              {
150                  //stage 1 connect to mq                  //MQ Options
151                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = getConnectionProperties();
                 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);  
152                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
153    
154                  MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions);                  //MySQL Options
   
   
                 //stage 2 connect to mysql  
155                  string mysqlString = buildMysqlConnString();                  string mysqlString = buildMysqlConnString();
                 MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString);  
                 sqlReadConnection.Open();  
156    
157                  MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString);                  //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
158                  sqlWriteConnection.Open();                  mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
159                    out_queue = mqMgr.AccessQueue(queueName, openOptions);
160                    using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
161                  //stage 3 move messages                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
                 string readSql = "CALL " + sql2mqReadQuery + "()";  
                 MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);  
                 MySqlDataReader dataReader = readCmd.ExecuteReader();  
                 while (dataReader.Read())  
162                  {                  {
163                      int id = dataReader.GetInt32(0);                      sqlReadConnection.Open();
164                      string msgString = dataReader.GetString(1);                      sqlWriteConnection.Open();
165    
166                      MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,                      //stage 3 move messages
167                      // same as MQPMO_DEFAULT                      string readSql = "CALL " + sql2mqReadQuery + "()";
168                        MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);
169                        MySqlDataReader dataReader = readCmd.ExecuteReader();
170                        while (dataReader.Read())
171                        {
172                            int id = dataReader.GetInt32(0);
173                            string msgString = dataReader.GetString(1);
174    
175                      MQMessage msg = new MQMessage();                          MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
176                      msg.Format = MQC.MQFMT_STRING;                          // same as MQPMO_DEFAULT
                     msg.CharacterSet = 1252;  
                     msg.WriteString(msgString);  
177    
178                      out_queue.Put(msg, pmo);                          MQMessage msg = new MQMessage();
179                            msg.Format = MQC.MQFMT_STRING;
180                            msg.CharacterSet = 1252;
181                            msg.WriteString(msgString);
182    
183                      //now that the message has been put on queue mark it as such                          out_queue.Put(msg, pmo);
184    
185                      string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";                          //now that the message has been put on queue mark it as such
                     MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);  
                     int numrows = updateCmd.ExecuteNonQuery();  
186    
187                      translog.WriteLine(getNowString() + " " + msgString + "\n");                          string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
188                            MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
189                            int numrows = updateCmd.ExecuteNonQuery();
190    
191                      if (numrows != 1)                          translog.WriteLine(Logfile.getNowString() + " " + msgString);
192                      {  
193                          break;                          if (numrows != 1)
194                            {
195                                break;
196                            }
197                            statusData.counter++;
198                      }                      }
                     statusData.counter++;  
                 }  
                   
199    
200                  //stage 4: everything went smooth so clean up afterwards                  }
                 dataReader.Close();  
                 out_queue.Close();  
                 mqMgr.Close();  
                 sqlReadConnection.Close();  
                 sqlWriteConnection.Close();  
201              }              }
202              catch (Exception e)              catch (Exception e)
203              {              {
204                  statusData.lastrunOk = false;                  statusData.lastrunOk = false;
205                  statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.Message;                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
206                  Console.WriteLine(statusData.lastErrorMessage);                  Console.WriteLine(statusData.lastErrorMessage);
207                    Console.WriteLine(e.StackTrace);
208                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
209              }              }
210                finally
211                {
212    
213                    if (out_queue != null && out_queue.IsOpen)
214                    {
215                        try
216                        {
217                            out_queue.Close();
218                        }
219                        catch (Exception e)
220                        {
221                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
222                        }
223                    }
224    
225                    if (mqMgr != null && mqMgr.IsOpen)
226                    {
227                        try
228                        {
229                            mqMgr.Close();
230                        }
231                        catch (Exception e)
232                        {
233                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
234                        }
235                    }
236    
237                }
238          }          }
239    
240          private void transportMq2Sql()          private void transportMq2Sql()
241          {          {
242              string filename = getLogFilename(LogfileType.LogTransactions);              int messageCount = 0;
243    
244                MQQueueManager mqMgr = null;
245                MQQueue in_queue = null;
246                string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
247              using (StreamWriter translog = new StreamWriter(filename, true))              using (StreamWriter translog = new StreamWriter(filename, true))
248              try                  try
249              {                  {
250                  //stage 1 connect to mq                      //MQ options
251                  Hashtable connProps = getConnectionProperties();                      Hashtable connProps = getConnectionProperties();
252                  MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);                      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
253                  int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;  
254                        //MySQL options
255                        string mysqlString = buildMysqlConnString();
256    
257                        //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
258                        mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
259                        in_queue = mqMgr.AccessQueue(queueName, openOptions);
260                        using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
261                        {
262    
263                  MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions);                          sqlConnection.Open();
264    
265    
266                  //stage 2 connect to mysql                          //stage 3 move messages
267                  string mysqlString = buildMysqlConnString();                          bool isContinue = true;
268                  MySqlConnection sqlConnection = new MySqlConnection(mysqlString);                          while (isContinue)
269                  sqlConnection.Open();                          {
270    
271                                MQMessage mqMsg = new MQMessage();
272                                MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
273    
274                  //stage 3 move messages                              mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
                 bool isContinue = true;  
                 while (isContinue)  
                 {  
                       
                     MQMessage mqMsg = new MQMessage();  
                     MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();  
   
                     mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen  
275    
276                      try                              try
277                      {                              {
278                          in_queue.Get(mqMsg, mqGetMsgOpts);                                  in_queue.Get(mqMsg, mqGetMsgOpts);
279                          if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)                                  if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
280                          {                                                              {
281                              string msgString = mqMsg.ReadString(mqMsg.MessageLength);                                      string msgString = mqMsg.ReadString(mqMsg.MessageLength);
282                              System.Console.WriteLine(msgString);                                      //System.Console.WriteLine(msgString);
283    
284    
285                                        // Hvis transaktionen starter med et ? er det ikke en gyldig transaktion
286                                        // validér ligeledes at headeren er gyldig
287                                        if ( msgString.StartsWith("?") || Salt2Helper.validateSalt2Header(msgString) == false )
288                                        {
289                                            string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
290                                            using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
291                                            {
292                                                discardedlog.WriteLine(Logfile.getNowString() + " " + msgString);
293                                            }
294                                            mqMgr.Commit();//fjern den afviste transaktion fra køen
295                                            statusData.discardedCounter++;
296                                            continue; //gå frem til at tage næste transaktion fra køen
297                                        }
298    
299    
300                                        string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
301    
302                                        MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
303                                        int numrows = sqlcmd.ExecuteNonQuery();
304    
305                                        if (numrows == 1)
306                                        {
307                                            translog.WriteLine(Logfile.getNowString() + " " + msgString);
308                                            mqMgr.Commit();
309                                            statusData.counter++;
310    
311    
312                                            messageCount++;// increment per run message counter
313                                            if (messageCount >= 10000) // if we have moved  10000 messages in this run - let it go and give the other transports a change
314                                            {
315                                                isContinue = false;
316                                            }
317    
                             string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng  
318    
                             MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);  
                             int numrows = sqlcmd.ExecuteNonQuery();  
319    
320                              if (numrows == 1)                                      }
321                              {                                      else
322                                  translog.WriteLine(getNowString() + " " + msgString + "\n");                                      {
323                                  mqMgr.Commit();                                          mqMgr.Backout();
324                                  statusData.counter++;                                          isContinue = false;
325                                        }
326    
327                                    }
328                                    else
329                                    {
330                                        System.Console.WriteLine("Non-text message");
331                                    }
332                              }                              }
333                              else                              catch (MQException mqe)
334                              {                              {
                                 mqMgr.Backout();  
335                                  isContinue = false;                                  isContinue = false;
336                              }                              
337                                    // report reason, if any
338                                    if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
339                                    {
340                                        // special report for normal end
341                                        System.Console.WriteLine("no more messages");
342                                    }
343                                    else
344                                    {
345                                        // general report for other reasons
346                                        System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
347                                        statusData.lastrunOk = false;
348                                    }
349    
350                                }
351    
352    
353                          }                          }
354                          else  
355                        }
356    
357                    }
358                    catch (Exception e)
359                    {
360                        //Det vil være mest korrekt at Rollback/backout MQ Transaktionen her - for at være sikker på at Message'n fjernes fra køen
361                        try
362                        {
363                            if (mqMgr != null)
364                          {                          {
365                              System.Console.WriteLine("Non-text message");                              mqMgr.Backout();
366                          }                          }
367                      }                      }
368                      catch (MQException mqe)                      catch (Exception e2)
369                      {                      {
370                          isContinue = false;                          logfile.addSingleLogEntry("Error backing out from MQ Transaction: " + e2.Message);
371                        }
372    
373                        statusData.lastrunOk = false;
374    
375                        statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
376                        Console.WriteLine(statusData.lastErrorMessage);
377                        Console.WriteLine(e.StackTrace);
378                        EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
379                    }
380                    finally
381                    {
382    
383                          // report reason, if any                      if (in_queue != null && in_queue.IsOpen)
384                          if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)                      {
385                            try
386                          {                          {
387                              // special report for normal end                              in_queue.Close();
                             System.Console.WriteLine("no more messages");  
388                          }                          }
389                          else                          catch (Exception e)
390                          {                          {
391                              // general report for other reasons                              Console.WriteLine("Error cleaning up qmgr " + e.Message);
392                              System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);;                          }
393                              statusData.lastrunOk = false;                      }
394                        
395                        if (mqMgr != null && mqMgr.IsOpen)
396                        {
397                            try
398                            {
399                                mqMgr.Close();
400                            } catch (Exception e) {
401                                Console.WriteLine("Error cleaning up qmgr " + e.Message);
402                          }                          }
   
403                      }                      }
   
404    
405                  }                  }
   
                 //stage 4: everything went smooth so clean up afterwards  
                 in_queue.Close();  
                 mqMgr.Close();  
                 sqlConnection.Close();  
   
   
             }  
             catch (Exception e)  
             {  
                 statusData.lastrunOk = false;  
   
                 statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.Message;  
                 Console.WriteLine(statusData.lastErrorMessage);  
                 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);  
             }  
406          }          }
407    
408          private string buildMysqlConnString()          private string buildMysqlConnString()
# Line 319  namespace DaoMqPump2 Line 413  namespace DaoMqPump2
413              //connectionString += "DATABASE=" + controller.mysqlHost + ";";              //connectionString += "DATABASE=" + controller.mysqlHost + ";";
414              connectionString += "UID=" + controller.mysqlUser + ";";              connectionString += "UID=" + controller.mysqlUser + ";";
415              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
416                connectionString += "Max Pool Size=20;";
417                //connectionString += "ConnectionReset=true;";//ConnectionReset kræver muligvis at Default Database er angivet - det virker ihvertfald ikke uden
418    
419              return connectionString;              return connectionString;
420          }          }
# Line 328  namespace DaoMqPump2 Line 424  namespace DaoMqPump2
424              Hashtable connProperties = new Hashtable();              Hashtable connProperties = new Hashtable();
425              connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);              connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
426              connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);              connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
427              connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel); //WARNING: Hardcoded Channel Value !!!              connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel);
428              return connProperties;              return connProperties;
429          }          }
430    
431    
   
         private string getLogFilename(LogfileType type)  
         {  
   
             DateTime now = DateTime.Now;  
             string filename = controller.logDirectory + "\\";  
   
             switch (type)  
             {  
                 case LogfileType.LogEvents:  
                     filename += "eventlog_";  
                     break;  
   
                 case LogfileType.LogTransactions:  
                     filename += "transactionlog_";  
                     break;  
             }  
   
   
             filename += name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";  
   
             return filename;  
         }  
   
         public string getNowString()  
         {  
             DateTime now = DateTime.Now;  
   
             return now.ToString("s");  
         }  
   
         private void addLogEntry(string msg)  
         {  
             msg = getNowString() + " " + msg;  
             lock (logEntries)  
             {  
                 logEntries.AddFirst(msg);  
   
                 if (logEntries.Count > 20)  
                 {  
                     logEntries.RemoveLast();  
                 }                  
             }  
   
             string filename = getLogFilename(LogfileType.LogEvents);  
             using (StreamWriter eventlog = new StreamWriter(filename, true))  
             {  
                 eventlog.WriteLine(msg);  
             }  
         }  
   
         public string[] getLog()  
         {  
             lock(logEntries)  
             {  
                 List<string> tmpEntries = new List<string>();  
                 foreach (string s in logEntries)  
                 {  
                     tmpEntries.Add(s);  
                 }  
                 return tmpEntries.ToArray();  
             }  
         }  
   
432      }      }
433  }  }

Legend:
Removed from v.2010  
changed lines
  Added in v.2169

  ViewVC Help
Powered by ViewVC 1.1.20