/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2011 by torben, Wed Jul 10 20:20:21 2013 UTC revision 2057 by torben, Tue Aug 27 06:49:36 2013 UTC
# Line 7  using System.Diagnostics; Line 7  using System.Diagnostics;
7    
8  using IBM.WMQ;  using IBM.WMQ;
9  using MySql.Data.MySqlClient;  using MySql.Data.MySqlClient;
10    using System.Globalization;
11    
12  namespace DaoMqPump2  namespace DaoMqPump2
13  {  {
# Line 139  namespace DaoMqPump2 Line 140  namespace DaoMqPump2
140    
141          private void transportSql2Mq()          private void transportSql2Mq()
142          {          {
143                MQQueueManager mqMgr = null;
144                MQQueue out_queue = null;
145    
146              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = getLogFilename(LogfileType.LogTransactions);
147              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
148              try              try
# Line 149  namespace DaoMqPump2 Line 153  namespace DaoMqPump2
153    
154                  //MySQL Options                  //MySQL Options
155                  string mysqlString = buildMysqlConnString();                  string mysqlString = buildMysqlConnString();
                   
156    
157                  using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps))//stage 1 connect to mq                  //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
158                  using (MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions))                  mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
159                    out_queue = mqMgr.AccessQueue(queueName, openOptions);
160                  using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql                  using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
161                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
162                  {                  {
# Line 184  namespace DaoMqPump2 Line 188  namespace DaoMqPump2
188                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
189                          int numrows = updateCmd.ExecuteNonQuery();                          int numrows = updateCmd.ExecuteNonQuery();
190    
191                          translog.WriteLine(getNowString() + " " + msgString + "\n");                          translog.WriteLine(getNowString() + " " + msgString);
192    
193                          if (numrows != 1)                          if (numrows != 1)
194                          {                          {
# Line 198  namespace DaoMqPump2 Line 202  namespace DaoMqPump2
202              catch (Exception e)              catch (Exception e)
203              {              {
204                  statusData.lastrunOk = false;                  statusData.lastrunOk = false;
205                  statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.Message;                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
206                  Console.WriteLine(statusData.lastErrorMessage);                  Console.WriteLine(statusData.lastErrorMessage);
207                    Console.WriteLine(e.StackTrace);
208                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
209              }              }
210                finally
211                {
212    
213                    if (out_queue != null && out_queue.IsOpen)
214                    {
215                        try
216                        {
217                            out_queue.Close();
218                        }
219                        catch (Exception e)
220                        {
221                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
222                        }
223                    }
224    
225                    if (mqMgr != null && mqMgr.IsOpen)
226                    {
227                        try
228                        {
229                            mqMgr.Close();
230                        }
231                        catch (Exception e)
232                        {
233                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
234                        }
235                    }
236    
237                }
238          }          }
239    
240          private void transportMq2Sql()          private void transportMq2Sql()
241          {          {
242                MQQueueManager mqMgr = null;
243                MQQueue in_queue = null;
244              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = getLogFilename(LogfileType.LogTransactions);
245              using (StreamWriter translog = new StreamWriter(filename, true))              using (StreamWriter translog = new StreamWriter(filename, true))
246              try                  try
             {  
                 //MQ options  
                 Hashtable connProps = getConnectionProperties();                  
                 int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;  
   
                 //MySQL options  
                 string mysqlString = buildMysqlConnString();  
   
                 using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps))//stage 1 connect to mq  
                 using (MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions) )  
                 using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql  
247                  {                  {
248                        //MQ options
249                        Hashtable connProps = getConnectionProperties();
250                        int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
251    
252                        //MySQL options
253                        string mysqlString = buildMysqlConnString();
254    
255                        //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
256                        mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
257                        in_queue = mqMgr.AccessQueue(queueName, openOptions);
258                        using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
259                        {
260    
261                      sqlConnection.Open();                          sqlConnection.Open();
262    
263    
264                      //stage 3 move messages                          //stage 3 move messages
265                      bool isContinue = true;                          bool isContinue = true;
266                      while (isContinue)                          while (isContinue)
267                      {                          {
268    
269                          MQMessage mqMsg = new MQMessage();                              MQMessage mqMsg = new MQMessage();
270                          MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();                              MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
271    
272                          mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen                              mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
273    
274                          try                              try
                         {  
                             in_queue.Get(mqMsg, mqGetMsgOpts);  
                             if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)  
275                              {                              {
276                                  string msgString = mqMsg.ReadString(mqMsg.MessageLength);                                  in_queue.Get(mqMsg, mqGetMsgOpts);
277                                  System.Console.WriteLine(msgString);                                  if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
278                                    {
279                                        string msgString = mqMsg.ReadString(mqMsg.MessageLength);
280                                        System.Console.WriteLine(msgString);
281    
282                                  string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng                                      string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
283    
284                                  MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);                                      MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
285                                  int numrows = sqlcmd.ExecuteNonQuery();                                      int numrows = sqlcmd.ExecuteNonQuery();
286    
287                                        if (numrows == 1)
288                                        {
289                                            translog.WriteLine(getNowString() + " " + msgString);
290                                            mqMgr.Commit();
291                                            statusData.counter++;
292                                        }
293                                        else
294                                        {
295                                            mqMgr.Backout();
296                                            isContinue = false;
297                                        }
298    
                                 if (numrows == 1)  
                                 {  
                                     translog.WriteLine(getNowString() + " " + msgString + "\n");  
                                     mqMgr.Commit();  
                                     statusData.counter++;  
299                                  }                                  }
300                                  else                                  else
301                                  {                                  {
302                                      mqMgr.Backout();                                      System.Console.WriteLine("Non-text message");
                                     isContinue = false;  
303                                  }                                  }
   
304                              }                              }
305                              else                              catch (MQException mqe)
306                              {                              {
307                                  System.Console.WriteLine("Non-text message");                                  isContinue = false;
308                              }  
309                          }                                  // report reason, if any
310                          catch (MQException mqe)                                  if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
311                          {                                  {
312                              isContinue = false;                                      // special report for normal end
313                                        System.Console.WriteLine("no more messages");
314                                    }
315                                    else
316                                    {
317                                        // general report for other reasons
318                                        System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
319                                        statusData.lastrunOk = false;
320                                    }
321    
                             // report reason, if any  
                             if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)  
                             {  
                                 // special report for normal end  
                                 System.Console.WriteLine("no more messages");  
                             }  
                             else  
                             {  
                                 // general report for other reasons  
                                 System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;  
                                 statusData.lastrunOk = false;  
322                              }                              }
323    
324    
325                          }                          }
326    
327                        }
328    
329                    }
330                    catch (Exception e)
331                    {
332                        //Det vil være mest korrekt at Rollback/backout MQ Transaktionen her - for at være sikker på at Message'n fjernes fra køen
333                        try
334                        {
335                            if (mqMgr != null)
336                            {
337                                mqMgr.Backout();
338                            }
339                        }
340                        catch (Exception e2)
341                        {
342                            this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);
343                      }                      }
344    
345                        statusData.lastrunOk = false;
346    
347                        statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
348                        Console.WriteLine(statusData.lastErrorMessage);
349                        Console.WriteLine(e.StackTrace);
350                        EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
351                  }                  }
352                    finally
353                    {
354    
355              }                      if (in_queue != null && in_queue.IsOpen)
356              catch (Exception e)                      {
357              {                          try
358                  statusData.lastrunOk = false;                          {
359                                in_queue.Close();
360                            }
361                            catch (Exception e)
362                            {
363                                Console.WriteLine("Error cleaning up qmgr " + e.Message);
364                            }
365                        }
366                        
367                        if (mqMgr != null && mqMgr.IsOpen)
368                        {
369                            try
370                            {
371                                mqMgr.Close();
372                            } catch (Exception e) {
373                                Console.WriteLine("Error cleaning up qmgr " + e.Message);
374                            }
375                        }
376    
377                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.Message;                  }
                 Console.WriteLine(statusData.lastErrorMessage);  
                 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);  
             }  
378          }          }
379    
380          private string buildMysqlConnString()          private string buildMysqlConnString()
# Line 309  namespace DaoMqPump2 Line 385  namespace DaoMqPump2
385              //connectionString += "DATABASE=" + controller.mysqlHost + ";";              //connectionString += "DATABASE=" + controller.mysqlHost + ";";
386              connectionString += "UID=" + controller.mysqlUser + ";";              connectionString += "UID=" + controller.mysqlUser + ";";
387              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
388                connectionString += "Max Pool Size=20;";
389                //connectionString += "ConnectionReset=true;";//ConnectionReset kræver muligvis at Default Database er angivet - det virker ihvertfald ikke uden
390    
391              return connectionString;              return connectionString;
392          }          }
# Line 330  namespace DaoMqPump2 Line 408  namespace DaoMqPump2
408              DateTime now = DateTime.Now;              DateTime now = DateTime.Now;
409              string filename = controller.logDirectory + "\\";              string filename = controller.logDirectory + "\\";
410    
411                //Find uge nr
412                DateTimeFormatInfo dfi = DateTimeFormatInfo.CurrentInfo;
413                Calendar myCal = CultureInfo.InvariantCulture.Calendar;//System.Globalization
414                int week = myCal.GetWeekOfYear(now, dfi.CalendarWeekRule, dfi.FirstDayOfWeek);
415    
416              switch (type)              switch (type)
417              {              {
418                  case LogfileType.LogEvents:                  case LogfileType.LogEvents:
# Line 342  namespace DaoMqPump2 Line 425  namespace DaoMqPump2
425              }              }
426    
427    
428              filename += name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";              filename += name + "_" + now.Year.ToString("D4") + "_W" + week.ToString("D2") + ".log";
429    
430              return filename;              return filename;
431          }          }

Legend:
Removed from v.2011  
changed lines
  Added in v.2057

  ViewVC Help
Powered by ViewVC 1.1.20