/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2047 by torben, Mon Aug 19 07:21:59 2013 UTC revision 2062 by torben, Thu Aug 29 07:54:49 2013 UTC
# Line 7  using System.Diagnostics; Line 7  using System.Diagnostics;
7    
8  using IBM.WMQ;  using IBM.WMQ;
9  using MySql.Data.MySqlClient;  using MySql.Data.MySqlClient;
10    using System.Globalization;
11    
12  namespace DaoMqPump2  namespace DaoMqPump2
13  {  {
# Line 15  namespace DaoMqPump2 Line 16  namespace DaoMqPump2
16    
17          enum LogfileType {          enum LogfileType {
18              LogTransactions,              LogTransactions,
19              LogEvents              LogEvents,
20                LogDiscarded
21          }          }
22    
23          public static string SQL2MQ = "sql2mq";          public static string SQL2MQ = "sql2mq";
# Line 139  namespace DaoMqPump2 Line 141  namespace DaoMqPump2
141    
142          private void transportSql2Mq()          private void transportSql2Mq()
143          {          {
144                MQQueueManager mqMgr = null;
145                MQQueue out_queue = null;
146    
147              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = getLogFilename(LogfileType.LogTransactions);
148              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
149              try              try
# Line 149  namespace DaoMqPump2 Line 154  namespace DaoMqPump2
154    
155                  //MySQL Options                  //MySQL Options
156                  string mysqlString = buildMysqlConnString();                  string mysqlString = buildMysqlConnString();
                   
157    
158                  using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps))//stage 1 connect to mq                  //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
159                  using (MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions))                  mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
160                    out_queue = mqMgr.AccessQueue(queueName, openOptions);
161                  using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql                  using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
162                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
163                  {                  {
# Line 184  namespace DaoMqPump2 Line 189  namespace DaoMqPump2
189                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
190                          int numrows = updateCmd.ExecuteNonQuery();                          int numrows = updateCmd.ExecuteNonQuery();
191    
192                          translog.WriteLine(getNowString() + " " + msgString + "\n");                          translog.WriteLine(getNowString() + " " + msgString);
193    
194                          if (numrows != 1)                          if (numrows != 1)
195                          {                          {
# Line 198  namespace DaoMqPump2 Line 203  namespace DaoMqPump2
203              catch (Exception e)              catch (Exception e)
204              {              {
205                  statusData.lastrunOk = false;                  statusData.lastrunOk = false;
206                  statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.Message;                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
207                  Console.WriteLine(statusData.lastErrorMessage);                  Console.WriteLine(statusData.lastErrorMessage);
208                    Console.WriteLine(e.StackTrace);
209                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
210              }              }
211                finally
212                {
213    
214                    if (out_queue != null && out_queue.IsOpen)
215                    {
216                        try
217                        {
218                            out_queue.Close();
219                        }
220                        catch (Exception e)
221                        {
222                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
223                        }
224                    }
225    
226                    if (mqMgr != null && mqMgr.IsOpen)
227                    {
228                        try
229                        {
230                            mqMgr.Close();
231                        }
232                        catch (Exception e)
233                        {
234                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
235                        }
236                    }
237    
238                }
239          }          }
240    
241          private void transportMq2Sql()          private void transportMq2Sql()
242          {          {
243                MQQueueManager mqMgr = null;
244                MQQueue in_queue = null;
245              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = getLogFilename(LogfileType.LogTransactions);
246              using (StreamWriter translog = new StreamWriter(filename, true))              using (StreamWriter translog = new StreamWriter(filename, true))
247              try                  try
             {  
                 //MQ options  
                 Hashtable connProps = getConnectionProperties();                  
                 int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;  
   
                 //MySQL options  
                 string mysqlString = buildMysqlConnString();  
   
                 using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps))//stage 1 connect to mq  
                 using (MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions) )  
                 using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql  
248                  {                  {
249                        //MQ options
250                        Hashtable connProps = getConnectionProperties();
251                        int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
252    
253                        //MySQL options
254                        string mysqlString = buildMysqlConnString();
255    
256                        //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
257                        mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
258                        in_queue = mqMgr.AccessQueue(queueName, openOptions);
259                        using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
260                        {
261    
262                      sqlConnection.Open();                          sqlConnection.Open();
263    
264    
265                      //stage 3 move messages                          //stage 3 move messages
266                      bool isContinue = true;                          bool isContinue = true;
267                      while (isContinue)                          while (isContinue)
268                      {                          {
269    
270                          MQMessage mqMsg = new MQMessage();                              MQMessage mqMsg = new MQMessage();
271                          MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();                              MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
272    
273                          mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen                              mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
274    
275                          try                              try
                         {  
                             in_queue.Get(mqMsg, mqGetMsgOpts);  
                             if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)  
276                              {                              {
277                                  string msgString = mqMsg.ReadString(mqMsg.MessageLength);                                  in_queue.Get(mqMsg, mqGetMsgOpts);
278                                  System.Console.WriteLine(msgString);                                  if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
279                                    {
280                                        string msgString = mqMsg.ReadString(mqMsg.MessageLength);
281                                        //System.Console.WriteLine(msgString);
282    
283                                  string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng                                      if ( msgString.StartsWith("?") ) //Hvis transaktionen starter med et ? er det ikke en gyldig transaktion
284                                        {
285                                            string discarded_filename = getLogFilename(LogfileType.LogDiscarded);
286                                            using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
287                                            {
288                                                discardedlog.WriteLine(msgString);
289                                            }
290                                            mqMgr.Commit();//fjern den afviste transaktion fra køen
291                                            statusData.discardedCounter++;
292                                            continue; //gå frem til at tage næste transaktion fra køen
293                                        }
294    
295    
296                                        string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
297    
298                                        MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
299                                        int numrows = sqlcmd.ExecuteNonQuery();
300    
301                                        if (numrows == 1)
302                                        {
303                                            translog.WriteLine(getNowString() + " " + msgString);
304                                            mqMgr.Commit();
305                                            statusData.counter++;
306                                        }
307                                        else
308                                        {
309                                            mqMgr.Backout();
310                                            isContinue = false;
311                                        }
312    
313                                  MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);                                  }
314                                  int numrows = sqlcmd.ExecuteNonQuery();                                  else
315                                    {
316                                        System.Console.WriteLine("Non-text message");
317                                    }
318                                }
319                                catch (MQException mqe)
320                                {
321                                    isContinue = false;
322    
323                                  if (numrows == 1)                                  // report reason, if any
324                                    if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
325                                  {                                  {
326                                      translog.WriteLine(getNowString() + " " + msgString + "\n");                                      // special report for normal end
327                                      mqMgr.Commit();                                      System.Console.WriteLine("no more messages");
                                     statusData.counter++;  
328                                  }                                  }
329                                  else                                  else
330                                  {                                  {
331                                      mqMgr.Backout();                                      // general report for other reasons
332                                      isContinue = false;                                      System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
333                                        statusData.lastrunOk = false;
334                                  }                                  }
335    
336                              }                              }
                             else  
                             {  
                                 System.Console.WriteLine("Non-text message");  
                             }  
                         }  
                         catch (MQException mqe)  
                         {  
                             isContinue = false;  
337    
                             // report reason, if any  
                             if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)  
                             {  
                                 // special report for normal end  
                                 System.Console.WriteLine("no more messages");  
                             }  
                             else  
                             {  
                                 // general report for other reasons  
                                 System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;  
                                 statusData.lastrunOk = false;  
                             }  
338    
339                          }                          }
340    
341                        }
342    
343                    }
344                    catch (Exception e)
345                    {
346                        //Det vil være mest korrekt at Rollback/backout MQ Transaktionen her - for at være sikker på at Message'n fjernes fra køen
347                        try
348                        {
349                            if (mqMgr != null)
350                            {
351                                mqMgr.Backout();
352                            }
353                      }                      }
354                        catch (Exception e2)
355                        {
356                            this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);
357                        }
358    
359                        statusData.lastrunOk = false;
360    
361                        statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
362                        Console.WriteLine(statusData.lastErrorMessage);
363                        Console.WriteLine(e.StackTrace);
364                        EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
365                  }                  }
366                    finally
367                    {
368    
369              }                      if (in_queue != null && in_queue.IsOpen)
370              catch (Exception e)                      {
371              {                          try
372                  statusData.lastrunOk = false;                          {
373                                in_queue.Close();
374                            }
375                            catch (Exception e)
376                            {
377                                Console.WriteLine("Error cleaning up qmgr " + e.Message);
378                            }
379                        }
380                        
381                        if (mqMgr != null && mqMgr.IsOpen)
382                        {
383                            try
384                            {
385                                mqMgr.Close();
386                            } catch (Exception e) {
387                                Console.WriteLine("Error cleaning up qmgr " + e.Message);
388                            }
389                        }
390    
391                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.Message;                  }
                 Console.WriteLine(statusData.lastErrorMessage);  
                 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);  
             }  
392          }          }
393    
394          private string buildMysqlConnString()          private string buildMysqlConnString()
# Line 309  namespace DaoMqPump2 Line 399  namespace DaoMqPump2
399              //connectionString += "DATABASE=" + controller.mysqlHost + ";";              //connectionString += "DATABASE=" + controller.mysqlHost + ";";
400              connectionString += "UID=" + controller.mysqlUser + ";";              connectionString += "UID=" + controller.mysqlUser + ";";
401              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
402              connectionString += "maximumpoolsize=10;";              connectionString += "Max Pool Size=20;";
403                //connectionString += "ConnectionReset=true;";//ConnectionReset kræver muligvis at Default Database er angivet - det virker ihvertfald ikke uden
404    
405              return connectionString;              return connectionString;
406          }          }
# Line 319  namespace DaoMqPump2 Line 410  namespace DaoMqPump2
410              Hashtable connProperties = new Hashtable();              Hashtable connProperties = new Hashtable();
411              connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);              connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
412              connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);              connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
413              connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel); //WARNING: Hardcoded Channel Value !!!              connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel);
414              return connProperties;              return connProperties;
415          }          }
416    
# Line 331  namespace DaoMqPump2 Line 422  namespace DaoMqPump2
422              DateTime now = DateTime.Now;              DateTime now = DateTime.Now;
423              string filename = controller.logDirectory + "\\";              string filename = controller.logDirectory + "\\";
424    
425                //Find uge nr
426                DateTimeFormatInfo dfi = DateTimeFormatInfo.CurrentInfo;
427                Calendar myCal = CultureInfo.InvariantCulture.Calendar;//System.Globalization
428                int week = myCal.GetWeekOfYear(now, dfi.CalendarWeekRule, dfi.FirstDayOfWeek);
429    
430              switch (type)              switch (type)
431              {              {
432                  case LogfileType.LogEvents:                  case LogfileType.LogEvents:
# Line 340  namespace DaoMqPump2 Line 436  namespace DaoMqPump2
436                  case LogfileType.LogTransactions:                  case LogfileType.LogTransactions:
437                      filename += "transactionlog_";                      filename += "transactionlog_";
438                      break;                      break;
439                    case LogfileType.LogDiscarded:
440                        filename += "discardedlog_";
441                        break;
442              }              }
443    
444    
445              filename += name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";              filename += name + "_" + now.Year.ToString("D4") + "_W" + week.ToString("D2") + ".log";
446    
447              return filename;              return filename;
448          }          }

Legend:
Removed from v.2047  
changed lines
  Added in v.2062

  ViewVC Help
Powered by ViewVC 1.1.20