/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2010 by torben, Wed Jul 10 18:28:31 2013 UTC revision 2166 by torben, Fri May 16 18:24:05 2014 UTC
# Line 7  using System.Diagnostics; Line 7  using System.Diagnostics;
7    
8  using IBM.WMQ;  using IBM.WMQ;
9  using MySql.Data.MySqlClient;  using MySql.Data.MySqlClient;
10    using System.Globalization;
11    
12    using DaoCommon;
13    
14  namespace DaoMqPump2  namespace DaoMqPump2
15  {  {
# Line 15  namespace DaoMqPump2 Line 18  namespace DaoMqPump2
18    
19          enum LogfileType {          enum LogfileType {
20              LogTransactions,              LogTransactions,
21              LogEvents              LogEvents,
22                LogDiscarded
23          }          }
24    
25          public static string SQL2MQ = "sql2mq";          public static string SQL2MQ = "sql2mq";
# Line 122  namespace DaoMqPump2 Line 126  namespace DaoMqPump2
126    
127              if (statusData.lastrunOk == true)              if (statusData.lastrunOk == true)
128              {              {
129                  statusData.lastOkTime = getNowString();                  statusData.lastOkTime = DaoUtil.getNowString();
130    
131                  if (statusData.counter != startCounter)                  if (statusData.counter != startCounter)
132                  {                  {
133                      //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet                      //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
134                      statusData.lastTransferTime = getNowString();                      statusData.lastTransferTime = DaoUtil.getNowString();
135                  }                  }
136              }              }
137              else              else
138              {              {
139                  addLogEntry(statusData.lastErrorMessage);                  addLogEntry(statusData.lastErrorMessage);
140                  statusData.lastErrorTime = getNowString();                  statusData.lastErrorTime = DaoUtil.getNowString();
141              }              }
142          }          }
143    
144          private void transportSql2Mq()          private void transportSql2Mq()
145          {          {
146                MQQueueManager mqMgr = null;
147                MQQueue out_queue = null;
148    
149              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = getLogFilename(LogfileType.LogTransactions);
150              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
151              try              try
152              {              {
153                  //stage 1 connect to mq                  //MQ Options
154                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = getConnectionProperties();
                 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);  
155                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
156    
157                  MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions);                  //MySQL Options
   
   
                 //stage 2 connect to mysql  
158                  string mysqlString = buildMysqlConnString();                  string mysqlString = buildMysqlConnString();
                 MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString);  
                 sqlReadConnection.Open();  
159    
160                  MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString);                  //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
161                  sqlWriteConnection.Open();                  mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
162                    out_queue = mqMgr.AccessQueue(queueName, openOptions);
163                    using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
164                  //stage 3 move messages                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
                 string readSql = "CALL " + sql2mqReadQuery + "()";  
                 MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);  
                 MySqlDataReader dataReader = readCmd.ExecuteReader();  
                 while (dataReader.Read())  
165                  {                  {
166                      int id = dataReader.GetInt32(0);                      sqlReadConnection.Open();
167                      string msgString = dataReader.GetString(1);                      sqlWriteConnection.Open();
168    
169                        //stage 3 move messages
170                        string readSql = "CALL " + sql2mqReadQuery + "()";
171                        MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);
172                        MySqlDataReader dataReader = readCmd.ExecuteReader();
173                        while (dataReader.Read())
174                        {
175                            int id = dataReader.GetInt32(0);
176                            string msgString = dataReader.GetString(1);
177    
178                      MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,                          MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
179                      // same as MQPMO_DEFAULT                          // same as MQPMO_DEFAULT
180    
181                      MQMessage msg = new MQMessage();                          MQMessage msg = new MQMessage();
182                      msg.Format = MQC.MQFMT_STRING;                          msg.Format = MQC.MQFMT_STRING;
183                      msg.CharacterSet = 1252;                          msg.CharacterSet = 1252;
184                      msg.WriteString(msgString);                          msg.WriteString(msgString);
185    
186                      out_queue.Put(msg, pmo);                          out_queue.Put(msg, pmo);
187    
188                      //now that the message has been put on queue mark it as such                          //now that the message has been put on queue mark it as such
189    
190                      string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";                          string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
191                      MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
192                      int numrows = updateCmd.ExecuteNonQuery();                          int numrows = updateCmd.ExecuteNonQuery();
193    
194                      translog.WriteLine(getNowString() + " " + msgString + "\n");                          translog.WriteLine(DaoUtil.getNowString() + " " + msgString);
195    
196                      if (numrows != 1)                          if (numrows != 1)
197                      {                          {
198                          break;                              break;
199                            }
200                            statusData.counter++;
201                      }                      }
                     statusData.counter++;  
                 }  
                   
202    
203                  //stage 4: everything went smooth so clean up afterwards                  }
                 dataReader.Close();  
                 out_queue.Close();  
                 mqMgr.Close();  
                 sqlReadConnection.Close();  
                 sqlWriteConnection.Close();  
204              }              }
205              catch (Exception e)              catch (Exception e)
206              {              {
207                  statusData.lastrunOk = false;                  statusData.lastrunOk = false;
208                  statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.Message;                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
209                  Console.WriteLine(statusData.lastErrorMessage);                  Console.WriteLine(statusData.lastErrorMessage);
210                    Console.WriteLine(e.StackTrace);
211                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
212              }              }
213                finally
214                {
215    
216                    if (out_queue != null && out_queue.IsOpen)
217                    {
218                        try
219                        {
220                            out_queue.Close();
221                        }
222                        catch (Exception e)
223                        {
224                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
225                        }
226                    }
227    
228                    if (mqMgr != null && mqMgr.IsOpen)
229                    {
230                        try
231                        {
232                            mqMgr.Close();
233                        }
234                        catch (Exception e)
235                        {
236                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
237                        }
238                    }
239    
240                }
241          }          }
242    
243          private void transportMq2Sql()          private void transportMq2Sql()
244          {          {
245                int messageCount = 0;
246    
247                MQQueueManager mqMgr = null;
248                MQQueue in_queue = null;
249              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = getLogFilename(LogfileType.LogTransactions);
250              using (StreamWriter translog = new StreamWriter(filename, true))              using (StreamWriter translog = new StreamWriter(filename, true))
251              try                  try
252              {                  {
253                  //stage 1 connect to mq                      //MQ options
254                  Hashtable connProps = getConnectionProperties();                      Hashtable connProps = getConnectionProperties();
255                  MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);                      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
256                  int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;  
257                        //MySQL options
258                  MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions);                      string mysqlString = buildMysqlConnString();
259    
260                        //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
261                        mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
262                        in_queue = mqMgr.AccessQueue(queueName, openOptions);
263                        using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
264                        {
265    
266                            sqlConnection.Open();
267    
                 //stage 2 connect to mysql  
                 string mysqlString = buildMysqlConnString();  
                 MySqlConnection sqlConnection = new MySqlConnection(mysqlString);  
                 sqlConnection.Open();  
268    
269                            //stage 3 move messages
270                            bool isContinue = true;
271                            while (isContinue)
272                            {
273    
274                  //stage 3 move messages                              MQMessage mqMsg = new MQMessage();
275                  bool isContinue = true;                              MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
                 while (isContinue)  
                 {  
                       
                     MQMessage mqMsg = new MQMessage();  
                     MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();  
276    
277                      mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen                              mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
278    
279                      try                              try
280                      {                              {
281                          in_queue.Get(mqMsg, mqGetMsgOpts);                                  in_queue.Get(mqMsg, mqGetMsgOpts);
282                          if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)                                  if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
283                          {                                                              {
284                              string msgString = mqMsg.ReadString(mqMsg.MessageLength);                                      string msgString = mqMsg.ReadString(mqMsg.MessageLength);
285                              System.Console.WriteLine(msgString);                                      //System.Console.WriteLine(msgString);
286    
287    
288                                        // Hvis transaktionen starter med et ? er det ikke en gyldig transaktion
289                                        // validér ligeledes at headeren er gyldig
290                                        if ( msgString.StartsWith("?") || validateSalt2Header(msgString) == false )
291                                        {
292                                            string discarded_filename = getLogFilename(LogfileType.LogDiscarded);
293                                            using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
294                                            {
295                                                discardedlog.WriteLine( DaoUtil.getNowString() + " " + msgString );
296                                            }
297                                            mqMgr.Commit();//fjern den afviste transaktion fra køen
298                                            statusData.discardedCounter++;
299                                            continue; //gå frem til at tage næste transaktion fra køen
300                                        }
301    
302    
303                                        string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
304    
305                                        MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
306                                        int numrows = sqlcmd.ExecuteNonQuery();
307    
308                                        if (numrows == 1)
309                                        {
310                                            translog.WriteLine( DaoUtil.getNowString() + " " + msgString );
311                                            mqMgr.Commit();
312                                            statusData.counter++;
313    
314    
315                                            messageCount++;// increment per run message counter
316                                            if (messageCount >= 10000) // if we have moved  10000 messages in this run - let it go and give the other transports a change
317                                            {
318                                                isContinue = false;
319                                            }
320    
                             string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng  
321    
                             MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);  
                             int numrows = sqlcmd.ExecuteNonQuery();  
322    
323                              if (numrows == 1)                                      }
324                              {                                      else
325                                  translog.WriteLine(getNowString() + " " + msgString + "\n");                                      {
326                                  mqMgr.Commit();                                          mqMgr.Backout();
327                                  statusData.counter++;                                          isContinue = false;
328                                        }
329    
330                                    }
331                                    else
332                                    {
333                                        System.Console.WriteLine("Non-text message");
334                                    }
335                              }                              }
336                              else                              catch (MQException mqe)
337                              {                              {
                                 mqMgr.Backout();  
338                                  isContinue = false;                                  isContinue = false;
339                              }                              
340                                    // report reason, if any
341                                    if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
342                                    {
343                                        // special report for normal end
344                                        System.Console.WriteLine("no more messages");
345                                    }
346                                    else
347                                    {
348                                        // general report for other reasons
349                                        System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
350                                        statusData.lastrunOk = false;
351                                    }
352    
353                                }
354    
355    
356                          }                          }
357                          else  
358                        }
359    
360                    }
361                    catch (Exception e)
362                    {
363                        //Det vil være mest korrekt at Rollback/backout MQ Transaktionen her - for at være sikker på at Message'n fjernes fra køen
364                        try
365                        {
366                            if (mqMgr != null)
367                          {                          {
368                              System.Console.WriteLine("Non-text message");                              mqMgr.Backout();
369                          }                          }
370                      }                      }
371                      catch (MQException mqe)                      catch (Exception e2)
372                      {                      {
373                          isContinue = false;                          this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);
374                        }
375    
376                        statusData.lastrunOk = false;
377    
378                          // report reason, if any                      statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
379                          if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)                      Console.WriteLine(statusData.lastErrorMessage);
380                        Console.WriteLine(e.StackTrace);
381                        EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
382                    }
383                    finally
384                    {
385    
386                        if (in_queue != null && in_queue.IsOpen)
387                        {
388                            try
389                          {                          {
390                              // special report for normal end                              in_queue.Close();
                             System.Console.WriteLine("no more messages");  
391                          }                          }
392                          else                          catch (Exception e)
393                          {                          {
394                              // general report for other reasons                              Console.WriteLine("Error cleaning up qmgr " + e.Message);
395                              System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);;                          }
396                              statusData.lastrunOk = false;                      }
397                        
398                        if (mqMgr != null && mqMgr.IsOpen)
399                        {
400                            try
401                            {
402                                mqMgr.Close();
403                            } catch (Exception e) {
404                                Console.WriteLine("Error cleaning up qmgr " + e.Message);
405                          }                          }
   
406                      }                      }
   
407    
408                  }                  }
   
                 //stage 4: everything went smooth so clean up afterwards  
                 in_queue.Close();  
                 mqMgr.Close();  
                 sqlConnection.Close();  
   
   
             }  
             catch (Exception e)  
             {  
                 statusData.lastrunOk = false;  
   
                 statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.Message;  
                 Console.WriteLine(statusData.lastErrorMessage);  
                 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);  
             }  
409          }          }
410    
411          private string buildMysqlConnString()          private string buildMysqlConnString()
# Line 319  namespace DaoMqPump2 Line 416  namespace DaoMqPump2
416              //connectionString += "DATABASE=" + controller.mysqlHost + ";";              //connectionString += "DATABASE=" + controller.mysqlHost + ";";
417              connectionString += "UID=" + controller.mysqlUser + ";";              connectionString += "UID=" + controller.mysqlUser + ";";
418              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
419                connectionString += "Max Pool Size=20;";
420                //connectionString += "ConnectionReset=true;";//ConnectionReset kræver muligvis at Default Database er angivet - det virker ihvertfald ikke uden
421    
422              return connectionString;              return connectionString;
423          }          }
# Line 328  namespace DaoMqPump2 Line 427  namespace DaoMqPump2
427              Hashtable connProperties = new Hashtable();              Hashtable connProperties = new Hashtable();
428              connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);              connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
429              connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);              connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
430              connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel); //WARNING: Hardcoded Channel Value !!!              connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel);
431              return connProperties;              return connProperties;
432          }          }
433    
# Line 340  namespace DaoMqPump2 Line 439  namespace DaoMqPump2
439              DateTime now = DateTime.Now;              DateTime now = DateTime.Now;
440              string filename = controller.logDirectory + "\\";              string filename = controller.logDirectory + "\\";
441    
442                //Find uge nr
443                DateTimeFormatInfo dfi = DateTimeFormatInfo.CurrentInfo;
444                Calendar myCal = CultureInfo.InvariantCulture.Calendar;//System.Globalization
445                int week = myCal.GetWeekOfYear(now, dfi.CalendarWeekRule, dfi.FirstDayOfWeek);
446    
447              switch (type)              switch (type)
448              {              {
449                  case LogfileType.LogEvents:                  case LogfileType.LogEvents:
# Line 349  namespace DaoMqPump2 Line 453  namespace DaoMqPump2
453                  case LogfileType.LogTransactions:                  case LogfileType.LogTransactions:
454                      filename += "transactionlog_";                      filename += "transactionlog_";
455                      break;                      break;
456                    case LogfileType.LogDiscarded:
457                        filename += "discardedlog_";
458                        break;
459              }              }
460    
461    
462              filename += name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";              filename += name + "_" + now.Year.ToString("D4") + "_W" + week.ToString("D2") + ".log";
463    
464              return filename;              return filename;
465          }          }
466    
467          public string getNowString()          private bool validateSalt2Header(string salt2String)
468          {          {
469              DateTime now = DateTime.Now;              if (salt2String.Length < 66)
470                {
471                    addLogEntry("Transaction too short - discarding");
472                    return false;
473                }
474    
475    
476                int result;
477                long result_long;
478    
479                string afsender = salt2String.Substring(0, 5);
480                string modtager = salt2String.Substring(5, 5);
481                string afsenderTegnSaet = salt2String.Substring(10, 6);
482                string standardNavn = salt2String.Substring(16, 6);
483                string standardVersion = salt2String.Substring(22, 3);
484                string afsenderSekvensnr = salt2String.Substring(25, 6);
485                string afsenderTidsstempel = salt2String.Substring(31, 14);
486                string afsenderBakkeIdent = salt2String.Substring(45, 5);
487                string modtagerBakkeIdent = salt2String.Substring(50, 5);
488                string transaktionForkortelse = salt2String.Substring(55, 4);
489                string transaktionsLaengde = salt2String.Substring(59, 5);
490                string prioritet = salt2String.Substring(64, 1);
491    
492    
493                
494                if (int.TryParse(standardVersion.Trim(), out result) == false) // standardVersion _skal_ være en int
495                {
496                    addLogEntry("standardVersion not an integer, discarding");
497                    return false;
498                }
499    
500                if (int.TryParse(afsenderSekvensnr.Trim(), out result) == false) // afsenderSekvensnr _skal_ være en int
501                {
502                    addLogEntry("afsenderSekvensnr not an integer, discarding");
503                    return false;
504                }
505    
506                if (long.TryParse(afsenderTidsstempel.Trim(), out result_long) == false) // afsenderTidsstempel _skal_ være en long
507                {
508                    addLogEntry("afsenderTidsstempel not a long integer, discarding");
509                    return false;
510                }
511    
512                if (int.TryParse(transaktionsLaengde.Trim(), out result) == false) // transaktionsLaengde _skal_ være en int
513                {
514                    addLogEntry("transaktionsLaengde not an integer, discarding");
515                    return false;
516                }
517    
518                if ( int.TryParse(prioritet.Trim(), out result) == false ) // prioritet _skal_ være en int
519                {
520                    addLogEntry("prioritet not an integer, discarding");
521                    return false;
522                }
523    
524              return now.ToString("s");              return true;
525          }          }
526    
527          private void addLogEntry(string msg)          private void addLogEntry(string msg)
528          {          {
529              msg = getNowString() + " " + msg;              msg = DaoUtil.getNowString() + " " + msg;
530              lock (logEntries)              lock (logEntries)
531              {              {
532                  logEntries.AddFirst(msg);                  logEntries.AddFirst(msg);

Legend:
Removed from v.2010  
changed lines
  Added in v.2166

  ViewVC Help
Powered by ViewVC 1.1.20