/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1999 by torben, Mon Jul 8 14:22:33 2013 UTC revision 2136 by torben, Wed Mar 26 14:00:14 2014 UTC
# Line 7  using System.Diagnostics; Line 7  using System.Diagnostics;
7    
8  using IBM.WMQ;  using IBM.WMQ;
9  using MySql.Data.MySqlClient;  using MySql.Data.MySqlClient;
10    using System.Globalization;
11    
12  namespace DaoMqPump2  namespace DaoMqPump2
13  {  {
# Line 15  namespace DaoMqPump2 Line 16  namespace DaoMqPump2
16    
17          enum LogfileType {          enum LogfileType {
18              LogTransactions,              LogTransactions,
19              LogEvents              LogEvents,
20                LogDiscarded
21          }          }
22    
23          public static string SQL2MQ = "sql2mq";          public static string SQL2MQ = "sql2mq";
24          public static string MQ2SQL = "mq2sql";          public static string MQ2SQL = "mq2sql";
25    
26          private bool enabled;                  //private bool enabled;        
27    
28          TransportController controller;          TransportController controller;
29    
# Line 55  namespace DaoMqPump2 Line 57  namespace DaoMqPump2
57          public bool Enabled          public bool Enabled
58          {          {
59              get {              get {
60                  return this.enabled;                  return statusData.transportEnabled;
61              }              }
62              set              set
63              {              {
64                  this.enabled = value;                  statusData.transportEnabled = value;
65                  if (value == true)                  if (value == true)
66                  {                  {
67                      this.addLogEntry("Transport enabled");                      this.addLogEntry("Transport enabled");
# Line 85  namespace DaoMqPump2 Line 87  namespace DaoMqPump2
87              this.sql2mqReadQuery = sql2mqReadQuery;              this.sql2mqReadQuery = sql2mqReadQuery;
88              this.sql2mqUpdateQuery = sql2mqUpdateQuery;              this.sql2mqUpdateQuery = sql2mqUpdateQuery;
89    
90              this.enabled = enabled;              statusData.transportEnabled = enabled;
91    
92    
93              statusData.lastrunOk = true;              statusData.lastrunOk = true;
# Line 103  namespace DaoMqPump2 Line 105  namespace DaoMqPump2
105    
106          public void transportMessages()          public void transportMessages()
107          {          {
108              if (enabled == false)              if (statusData.transportEnabled == false)
109                  return;                  return;
110    
111              Console.WriteLine(name + " -> transportMessages() ");              Console.WriteLine(name + " -> transportMessages() ");
# Line 139  namespace DaoMqPump2 Line 141  namespace DaoMqPump2
141    
142          private void transportSql2Mq()          private void transportSql2Mq()
143          {          {
144                MQQueueManager mqMgr = null;
145                MQQueue out_queue = null;
146    
147              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = getLogFilename(LogfileType.LogTransactions);
148              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
149              try              try
150              {              {
151                  //stage 1 connect to mq                  //MQ Options
152                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = getConnectionProperties();
                 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);  
153                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
154    
155                  MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions);                  //MySQL Options
   
   
                 //stage 2 connect to mysql  
156                  string mysqlString = buildMysqlConnString();                  string mysqlString = buildMysqlConnString();
                 MySqlConnection sqlConnection = new MySqlConnection(mysqlString);  
                 sqlConnection.Open();  
   
157    
158                  //stage 3 move messages                  //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
159                  string readSql = "CALL " + sql2mqReadQuery + "()";                  mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
160                  MySqlCommand readCmd = new MySqlCommand(readSql, sqlConnection);                  out_queue = mqMgr.AccessQueue(queueName, openOptions);
161                  MySqlDataReader dataReader = readCmd.ExecuteReader();                  using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
162                  while (dataReader.Read())                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
163                  {                  {
164                      int id = dataReader.GetInt32(0);                      sqlReadConnection.Open();
165                      string msgString = dataReader.GetString(1);                      sqlWriteConnection.Open();
166    
167                      MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,                      //stage 3 move messages
168                      // same as MQPMO_DEFAULT                      string readSql = "CALL " + sql2mqReadQuery + "()";
169                        MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);
170                        MySqlDataReader dataReader = readCmd.ExecuteReader();
171                        while (dataReader.Read())
172                        {
173                            int id = dataReader.GetInt32(0);
174                            string msgString = dataReader.GetString(1);
175    
176                      MQMessage msg = new MQMessage();                          MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
177                      msg.Format = MQC.MQFMT_STRING;                          // same as MQPMO_DEFAULT
                     msg.CharacterSet = 1252;  
                     msg.WriteString(msgString);  
178    
179                      out_queue.Put(msg, pmo);                          MQMessage msg = new MQMessage();
180                            msg.Format = MQC.MQFMT_STRING;
181                            msg.CharacterSet = 1252;
182                            msg.WriteString(msgString);
183    
184                      //now that the message has been put on queue mark it as such                          out_queue.Put(msg, pmo);
185    
186                      string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";                          //now that the message has been put on queue mark it as such
                     MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlConnection);  
                     int numrows = updateCmd.ExecuteNonQuery();  
187    
188                      translog.WriteLine(getNowString() + " " + msgString + "\n");                          string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
189                            MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
190                            int numrows = updateCmd.ExecuteNonQuery();
191    
192                      if (numrows != 1)                          translog.WriteLine(getNowString() + " " + msgString);
193                      {  
194                          break;                          if (numrows != 1)
195                            {
196                                break;
197                            }
198                            statusData.counter++;
199                      }                      }
                     statusData.counter++;  
                 }  
                   
200    
201                  //stage 4: everything went smooth so clean up afterwards                  }
                 dataReader.Close();  
                 out_queue.Close();  
                 mqMgr.Close();  
                 sqlConnection.Close();  
202              }              }
203              catch (Exception e)              catch (Exception e)
204              {              {
205                  statusData.lastrunOk = false;                  statusData.lastrunOk = false;
206                  statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.Message;                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
207                  Console.WriteLine(statusData.lastErrorMessage);                  Console.WriteLine(statusData.lastErrorMessage);
208                    Console.WriteLine(e.StackTrace);
209                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
210              }              }
211                finally
212                {
213    
214                    if (out_queue != null && out_queue.IsOpen)
215                    {
216                        try
217                        {
218                            out_queue.Close();
219                        }
220                        catch (Exception e)
221                        {
222                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
223                        }
224                    }
225    
226                    if (mqMgr != null && mqMgr.IsOpen)
227                    {
228                        try
229                        {
230                            mqMgr.Close();
231                        }
232                        catch (Exception e)
233                        {
234                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
235                        }
236                    }
237    
238                }
239          }          }
240    
241          private void transportMq2Sql()          private void transportMq2Sql()
242          {          {
243                int messageCount = 0;
244    
245                MQQueueManager mqMgr = null;
246                MQQueue in_queue = null;
247              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = getLogFilename(LogfileType.LogTransactions);
248              using (StreamWriter translog = new StreamWriter(filename, true))              using (StreamWriter translog = new StreamWriter(filename, true))
249              try                  try
250              {                  {
251                  //stage 1 connect to mq                      //MQ options
252                  Hashtable connProps = getConnectionProperties();                      Hashtable connProps = getConnectionProperties();
253                  MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);                      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
254                  int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;  
255                        //MySQL options
256                  MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions);                      string mysqlString = buildMysqlConnString();
257    
258                        //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
259                        mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
260                        in_queue = mqMgr.AccessQueue(queueName, openOptions);
261                        using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
262                        {
263    
264                            sqlConnection.Open();
265    
                 //stage 2 connect to mysql  
                 string mysqlString = buildMysqlConnString();  
                 MySqlConnection sqlConnection = new MySqlConnection(mysqlString);  
                 sqlConnection.Open();  
266    
267                            //stage 3 move messages
268                            bool isContinue = true;
269                            while (isContinue)
270                            {
271    
272                  //stage 3 move messages                              MQMessage mqMsg = new MQMessage();
273                  bool isContinue = true;                              MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
                 while (isContinue)  
                 {  
                       
                     MQMessage mqMsg = new MQMessage();  
                     MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();  
274    
275                      mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen                              mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
276    
277                      try                              try
278                      {                              {
279                          in_queue.Get(mqMsg, mqGetMsgOpts);                                  in_queue.Get(mqMsg, mqGetMsgOpts);
280                          if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)                                  if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
281                          {                                                              {
282                              string msgString = mqMsg.ReadString(mqMsg.MessageLength);                                      string msgString = mqMsg.ReadString(mqMsg.MessageLength);
283                              System.Console.WriteLine(msgString);                                      //System.Console.WriteLine(msgString);
284    
285    
286                                        // Hvis transaktionen starter med et ? er det ikke en gyldig transaktion
287                                        // validér ligeledes at headeren er gyldig
288                                        if ( msgString.StartsWith("?") || validateSalt2Header(msgString) == false )
289                                        {
290                                            string discarded_filename = getLogFilename(LogfileType.LogDiscarded);
291                                            using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
292                                            {
293                                                discardedlog.WriteLine(getNowString() + " " + msgString);
294                                            }
295                                            mqMgr.Commit();//fjern den afviste transaktion fra køen
296                                            statusData.discardedCounter++;
297                                            continue; //gå frem til at tage næste transaktion fra køen
298                                        }
299    
300    
301                                        string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
302    
303                                        MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
304                                        int numrows = sqlcmd.ExecuteNonQuery();
305    
306                                        if (numrows == 1)
307                                        {
308                                            translog.WriteLine(getNowString() + " " + msgString);
309                                            mqMgr.Commit();
310                                            statusData.counter++;
311    
312    
313                                            messageCount++;// increment per run message counter
314                                            if (messageCount >= 10000) // if we have moved  10000 messages in this run - let it go and give the other transports a change
315                                            {
316                                                isContinue = false;
317                                            }
318    
                             string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng  
319    
                             MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);  
                             int numrows = sqlcmd.ExecuteNonQuery();  
320    
321                              if (numrows == 1)                                      }
322                              {                                      else
323                                  translog.WriteLine(getNowString() + " " + msgString + "\n");                                      {
324                                  mqMgr.Commit();                                          mqMgr.Backout();
325                                  statusData.counter++;                                          isContinue = false;
326                                        }
327    
328                                    }
329                                    else
330                                    {
331                                        System.Console.WriteLine("Non-text message");
332                                    }
333                              }                              }
334                              else                              catch (MQException mqe)
335                              {                              {
                                 mqMgr.Backout();  
336                                  isContinue = false;                                  isContinue = false;
337                              }                              
338                                    // report reason, if any
339                                    if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
340                                    {
341                                        // special report for normal end
342                                        System.Console.WriteLine("no more messages");
343                                    }
344                                    else
345                                    {
346                                        // general report for other reasons
347                                        System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
348                                        statusData.lastrunOk = false;
349                                    }
350    
351                                }
352    
353    
354                          }                          }
355                          else  
356                        }
357    
358                    }
359                    catch (Exception e)
360                    {
361                        //Det vil være mest korrekt at Rollback/backout MQ Transaktionen her - for at være sikker på at Message'n fjernes fra køen
362                        try
363                        {
364                            if (mqMgr != null)
365                          {                          {
366                              System.Console.WriteLine("Non-text message");                              mqMgr.Backout();
367                          }                          }
368                      }                      }
369                      catch (MQException mqe)                      catch (Exception e2)
370                      {                      {
371                          isContinue = false;                          this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);
372                        }
373    
374                        statusData.lastrunOk = false;
375    
376                          // report reason, if any                      statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
377                          if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)                      Console.WriteLine(statusData.lastErrorMessage);
378                        Console.WriteLine(e.StackTrace);
379                        EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
380                    }
381                    finally
382                    {
383    
384                        if (in_queue != null && in_queue.IsOpen)
385                        {
386                            try
387                          {                          {
388                              // special report for normal end                              in_queue.Close();
                             System.Console.WriteLine("no more messages");  
389                          }                          }
390                          else                          catch (Exception e)
391                          {                          {
392                              // general report for other reasons                              Console.WriteLine("Error cleaning up qmgr " + e.Message);
393                              System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);;                          }
394                              statusData.lastrunOk = false;                      }
395                        
396                        if (mqMgr != null && mqMgr.IsOpen)
397                        {
398                            try
399                            {
400                                mqMgr.Close();
401                            } catch (Exception e) {
402                                Console.WriteLine("Error cleaning up qmgr " + e.Message);
403                          }                          }
   
404                      }                      }
   
405    
406                  }                  }
   
                 //stage 4: everything went smooth so clean up afterwards  
                 in_queue.Close();  
                 mqMgr.Close();  
                 sqlConnection.Close();  
   
   
             }  
             catch (Exception e)  
             {  
                 statusData.lastrunOk = false;  
   
                 statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.Message;  
                 Console.WriteLine(statusData.lastErrorMessage);  
                 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);  
             }  
407          }          }
408    
409          private string buildMysqlConnString()          private string buildMysqlConnString()
# Line 315  namespace DaoMqPump2 Line 414  namespace DaoMqPump2
414              //connectionString += "DATABASE=" + controller.mysqlHost + ";";              //connectionString += "DATABASE=" + controller.mysqlHost + ";";
415              connectionString += "UID=" + controller.mysqlUser + ";";              connectionString += "UID=" + controller.mysqlUser + ";";
416              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
417                connectionString += "Max Pool Size=20;";
418                //connectionString += "ConnectionReset=true;";//ConnectionReset kræver muligvis at Default Database er angivet - det virker ihvertfald ikke uden
419    
420              return connectionString;              return connectionString;
421          }          }
# Line 324  namespace DaoMqPump2 Line 425  namespace DaoMqPump2
425              Hashtable connProperties = new Hashtable();              Hashtable connProperties = new Hashtable();
426              connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);              connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
427              connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);              connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
428              connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel); //WARNING: Hardcoded Channel Value !!!              connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel);
429              return connProperties;              return connProperties;
430          }          }
431    
# Line 336  namespace DaoMqPump2 Line 437  namespace DaoMqPump2
437              DateTime now = DateTime.Now;              DateTime now = DateTime.Now;
438              string filename = controller.logDirectory + "\\";              string filename = controller.logDirectory + "\\";
439    
440                //Find uge nr
441                DateTimeFormatInfo dfi = DateTimeFormatInfo.CurrentInfo;
442                Calendar myCal = CultureInfo.InvariantCulture.Calendar;//System.Globalization
443                int week = myCal.GetWeekOfYear(now, dfi.CalendarWeekRule, dfi.FirstDayOfWeek);
444    
445              switch (type)              switch (type)
446              {              {
447                  case LogfileType.LogEvents:                  case LogfileType.LogEvents:
# Line 345  namespace DaoMqPump2 Line 451  namespace DaoMqPump2
451                  case LogfileType.LogTransactions:                  case LogfileType.LogTransactions:
452                      filename += "transactionlog_";                      filename += "transactionlog_";
453                      break;                      break;
454                    case LogfileType.LogDiscarded:
455                        filename += "discardedlog_";
456                        break;
457              }              }
458    
459    
460              filename += "transactionlog_" + name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";              filename += name + "_" + now.Year.ToString("D4") + "_W" + week.ToString("D2") + ".log";
461    
462              return filename;              return filename;
463          }          }
# Line 360  namespace DaoMqPump2 Line 469  namespace DaoMqPump2
469              return now.ToString("s");              return now.ToString("s");
470          }          }
471    
472            /* no used any where used added here for reference/ just in case */
473            private void sendErrorMail(string sub, string msg)
474            {
475                System.Net.Mail.MailMessage message = new System.Net.Mail.MailMessage();
476                message.To.Add("thn@daoas.dk");
477                message.Subject = "Error from DaoMqPump2: " + sub;
478                message.From = new System.Net.Mail.MailAddress("no-reply@daoas.dk");
479                message.Body = msg;
480    
481                System.Net.Mail.SmtpClient smtp = new System.Net.Mail.SmtpClient("mail.dao.int");
482                smtp.Send(message);
483            }
484    
485            private bool validateSalt2Header(string salt2String)
486            {
487                if (salt2String.Length < 66)
488                {
489                    addLogEntry("Transaction too short - discarding");
490                    return false;
491                }
492    
493    
494                int result;
495                long result_long;
496    
497                string afsender = salt2String.Substring(0, 5);
498                string modtager = salt2String.Substring(5, 5);
499                string afsenderTegnSaet = salt2String.Substring(10, 6);
500                string standardNavn = salt2String.Substring(16, 6);
501                string standardVersion = salt2String.Substring(22, 3);
502                string afsenderSekvensnr = salt2String.Substring(25, 6);
503                string afsenderTidsstempel = salt2String.Substring(31, 14);
504                string afsenderBakkeIdent = salt2String.Substring(45, 5);
505                string modtagerBakkeIdent = salt2String.Substring(50, 5);
506                string transaktionForkortelse = salt2String.Substring(55, 4);
507                string transaktionsLaengde = salt2String.Substring(59, 5);
508                string prioritet = salt2String.Substring(64, 1);
509    
510    
511                
512                if (int.TryParse(standardVersion.Trim(), out result) == false) // standardVersion _skal_ være en int
513                {
514                    addLogEntry("standardVersion not an integer, discarding");
515                    return false;
516                }
517    
518                if (int.TryParse(afsenderSekvensnr.Trim(), out result) == false) // afsenderSekvensnr _skal_ være en int
519                {
520                    addLogEntry("afsenderSekvensnr not an integer, discarding");
521                    return false;
522                }
523    
524                if (long.TryParse(afsenderTidsstempel.Trim(), out result_long) == false) // afsenderTidsstempel _skal_ være en long
525                {
526                    addLogEntry("afsenderTidsstempel not a long integer, discarding");
527                    return false;
528                }
529    
530                if (int.TryParse(transaktionsLaengde.Trim(), out result) == false) // transaktionsLaengde _skal_ være en int
531                {
532                    addLogEntry("transaktionsLaengde not an integer, discarding");
533                    return false;
534                }
535    
536                if ( int.TryParse(prioritet.Trim(), out result) == false ) // prioritet _skal_ være en int
537                {
538                    addLogEntry("prioritet not an integer, discarding");
539                    return false;
540                }
541    
542                return true;
543            }
544    
545          private void addLogEntry(string msg)          private void addLogEntry(string msg)
546          {          {
547              msg = getNowString() + " " + msg;              msg = getNowString() + " " + msg;

Legend:
Removed from v.1999  
changed lines
  Added in v.2136

  ViewVC Help
Powered by ViewVC 1.1.20