/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2050 by torben, Fri Aug 23 17:07:25 2013 UTC revision 2051 by torben, Fri Aug 23 20:03:47 2013 UTC
# Line 139  namespace DaoMqPump2 Line 139  namespace DaoMqPump2
139    
140          private void transportSql2Mq()          private void transportSql2Mq()
141          {          {
142                MQQueueManager mqMgr = null;
143                MQQueue out_queue = null;
144    
145              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = getLogFilename(LogfileType.LogTransactions);
146              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
147              try              try
# Line 149  namespace DaoMqPump2 Line 152  namespace DaoMqPump2
152    
153                  //MySQL Options                  //MySQL Options
154                  string mysqlString = buildMysqlConnString();                  string mysqlString = buildMysqlConnString();
                   
155    
156                  using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps))//stage 1 connect to mq                  //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
157                  using (MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions))                  mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
158                    out_queue = mqMgr.AccessQueue(queueName, openOptions);
159                  using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql                  using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
160                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
161                  {                  {
# Line 203  namespace DaoMqPump2 Line 206  namespace DaoMqPump2
206                  Console.WriteLine(e.StackTrace);                  Console.WriteLine(e.StackTrace);
207                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
208              }              }
209                finally
210                {
211    
212                    if (out_queue != null && out_queue.IsOpen)
213                    {
214                        try
215                        {
216                            out_queue.Close();
217                        }
218                        catch (Exception e)
219                        {
220                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
221                        }
222                    }
223    
224                    if (mqMgr != null && mqMgr.IsOpen)
225                    {
226                        try
227                        {
228                            mqMgr.Close();
229                        }
230                        catch (Exception e)
231                        {
232                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
233                        }
234                    }
235    
236                }
237          }          }
238    
239          private void transportMq2Sql()          private void transportMq2Sql()
240          {          {
241                MQQueueManager mqMgr = null;
242                MQQueue in_queue = null;
243              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = getLogFilename(LogfileType.LogTransactions);
244              using (StreamWriter translog = new StreamWriter(filename, true))              using (StreamWriter translog = new StreamWriter(filename, true))
245              try                  try
             {  
                 //MQ options  
                 Hashtable connProps = getConnectionProperties();                  
                 int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;  
   
                 //MySQL options  
                 string mysqlString = buildMysqlConnString();  
   
                 using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps))//stage 1 connect to mq  
                 using (MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions) )  
                 using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql  
246                  {                  {
247                        //MQ options
248                        Hashtable connProps = getConnectionProperties();
249                        int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
250    
251                        //MySQL options
252                        string mysqlString = buildMysqlConnString();
253    
254                        //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
255                        mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
256                        in_queue = mqMgr.AccessQueue(queueName, openOptions);
257                        using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
258                        {
259    
260                      sqlConnection.Open();                          sqlConnection.Open();
261    
262    
263                      //stage 3 move messages                          //stage 3 move messages
264                      bool isContinue = true;                          bool isContinue = true;
265                      while (isContinue)                          while (isContinue)
266                      {                          {
267    
268                          MQMessage mqMsg = new MQMessage();                              MQMessage mqMsg = new MQMessage();
269                          MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();                              MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
270    
271                          mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen                              mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
272    
273                          try                              try
                         {  
                             in_queue.Get(mqMsg, mqGetMsgOpts);  
                             if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)  
274                              {                              {
275                                  string msgString = mqMsg.ReadString(mqMsg.MessageLength);                                  in_queue.Get(mqMsg, mqGetMsgOpts);
276                                  System.Console.WriteLine(msgString);                                  if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
277                                    {
278                                        string msgString = mqMsg.ReadString(mqMsg.MessageLength);
279                                        System.Console.WriteLine(msgString);
280    
281                                  string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng                                      string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
282    
283                                  MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);                                      MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
284                                  int numrows = sqlcmd.ExecuteNonQuery();                                      int numrows = sqlcmd.ExecuteNonQuery();
285    
286                                        if (numrows == 1)
287                                        {
288                                            translog.WriteLine(getNowString() + " " + msgString + "\n");
289                                            mqMgr.Commit();
290                                            statusData.counter++;
291                                        }
292                                        else
293                                        {
294                                            mqMgr.Backout();
295                                            isContinue = false;
296                                        }
297    
                                 if (numrows == 1)  
                                 {  
                                     translog.WriteLine(getNowString() + " " + msgString + "\n");  
                                     mqMgr.Commit();  
                                     statusData.counter++;  
298                                  }                                  }
299                                  else                                  else
300                                  {                                  {
301                                      mqMgr.Backout();                                      System.Console.WriteLine("Non-text message");
                                     isContinue = false;  
302                                  }                                  }
   
303                              }                              }
304                              else                              catch (MQException mqe)
305                              {                              {
306                                  System.Console.WriteLine("Non-text message");                                  isContinue = false;
307                              }  
308                          }                                  // report reason, if any
309                          catch (MQException mqe)                                  if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
310                          {                                  {
311                              isContinue = false;                                      // special report for normal end
312                                        System.Console.WriteLine("no more messages");
313                                    }
314                                    else
315                                    {
316                                        // general report for other reasons
317                                        System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
318                                        statusData.lastrunOk = false;
319                                    }
320    
                             // report reason, if any  
                             if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)  
                             {  
                                 // special report for normal end  
                                 System.Console.WriteLine("no more messages");  
                             }  
                             else  
                             {  
                                 // general report for other reasons  
                                 System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;  
                                 statusData.lastrunOk = false;  
321                              }                              }
322    
                         }  
323    
324                            }
325    
326                      }                      }
327    
328                  }                  }
329                    catch (Exception e)
330                    {
331                        statusData.lastrunOk = false;
332    
333              }                      statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
334              catch (Exception e)                      Console.WriteLine(statusData.lastErrorMessage);
335              {                      Console.WriteLine(e.StackTrace);
336                  statusData.lastrunOk = false;                      EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
337                                    }
338                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;                  finally
339                  Console.WriteLine(statusData.lastErrorMessage);                  {
340                  Console.WriteLine(e.StackTrace);  
341                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);                      if (in_queue != null && in_queue.IsOpen)
342              }                      {
343                            try
344                            {
345                                in_queue.Close();
346                            }
347                            catch (Exception e)
348                            {
349                                Console.WriteLine("Error cleaning up qmgr " + e.Message);
350                            }
351                        }
352                        
353                        if (mqMgr != null && mqMgr.IsOpen)
354                        {
355                            try
356                            {
357                                mqMgr.Close();
358                            } catch (Exception e) {
359                                Console.WriteLine("Error cleaning up qmgr " + e.Message);
360                            }
361                        }
362    
363                    }
364          }          }
365    
366          private string buildMysqlConnString()          private string buildMysqlConnString()
# Line 311  namespace DaoMqPump2 Line 371  namespace DaoMqPump2
371              //connectionString += "DATABASE=" + controller.mysqlHost + ";";              //connectionString += "DATABASE=" + controller.mysqlHost + ";";
372              connectionString += "UID=" + controller.mysqlUser + ";";              connectionString += "UID=" + controller.mysqlUser + ";";
373              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
374              //connectionString += "maximumpoolsize=10;";              connectionString += "Max Pool Size=20;";
375              //connectionString += "ConnectionReset=true;";              //connectionString += "ConnectionReset=true;";//ConnectionReset kræver muligvis at Default Database er angivet - det virker ihvertfald ikke uden
376    
377              return connectionString;              return connectionString;
378          }          }

Legend:
Removed from v.2050  
changed lines
  Added in v.2051

  ViewVC Help
Powered by ViewVC 1.1.20