/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2003 by torben, Mon Jul 8 14:49:31 2013 UTC revision 2056 by torben, Tue Aug 27 06:33:28 2013 UTC
# Line 139  namespace DaoMqPump2 Line 139  namespace DaoMqPump2
139    
140          private void transportSql2Mq()          private void transportSql2Mq()
141          {          {
142                MQQueueManager mqMgr = null;
143                MQQueue out_queue = null;
144    
145              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = getLogFilename(LogfileType.LogTransactions);
146              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
147              try              try
148              {              {
149                  //stage 1 connect to mq                  //MQ Options
150                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = getConnectionProperties();
                 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);  
151                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
152    
153                  MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions);                  //MySQL Options
   
   
                 //stage 2 connect to mysql  
154                  string mysqlString = buildMysqlConnString();                  string mysqlString = buildMysqlConnString();
                 MySqlConnection sqlConnection = new MySqlConnection(mysqlString);  
                 sqlConnection.Open();  
155    
156                    //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
157                  //stage 3 move messages                  mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
158                  string readSql = "CALL " + sql2mqReadQuery + "()";                  out_queue = mqMgr.AccessQueue(queueName, openOptions);
159                  MySqlCommand readCmd = new MySqlCommand(readSql, sqlConnection);                  using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
160                  MySqlDataReader dataReader = readCmd.ExecuteReader();                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
                 while (dataReader.Read())  
161                  {                  {
162                      int id = dataReader.GetInt32(0);                      sqlReadConnection.Open();
163                      string msgString = dataReader.GetString(1);                      sqlWriteConnection.Open();
164    
165                        //stage 3 move messages
166                        string readSql = "CALL " + sql2mqReadQuery + "()";
167                        MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);
168                        MySqlDataReader dataReader = readCmd.ExecuteReader();
169                        while (dataReader.Read())
170                        {
171                            int id = dataReader.GetInt32(0);
172                            string msgString = dataReader.GetString(1);
173    
174                      MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,                          MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
175                      // same as MQPMO_DEFAULT                          // same as MQPMO_DEFAULT
176    
177                      MQMessage msg = new MQMessage();                          MQMessage msg = new MQMessage();
178                      msg.Format = MQC.MQFMT_STRING;                          msg.Format = MQC.MQFMT_STRING;
179                      msg.CharacterSet = 1252;                          msg.CharacterSet = 1252;
180                      msg.WriteString(msgString);                          msg.WriteString(msgString);
181    
182                      out_queue.Put(msg, pmo);                          out_queue.Put(msg, pmo);
183    
184                      //now that the message has been put on queue mark it as such                          //now that the message has been put on queue mark it as such
185    
186                      string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";                          string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
187                      MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlConnection);                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
188                      int numrows = updateCmd.ExecuteNonQuery();                          int numrows = updateCmd.ExecuteNonQuery();
189    
190                      translog.WriteLine(getNowString() + " " + msgString + "\n");                          translog.WriteLine(getNowString() + " " + msgString);
191    
192                      if (numrows != 1)                          if (numrows != 1)
193                      {                          {
194                          break;                              break;
195                            }
196                            statusData.counter++;
197                      }                      }
                     statusData.counter++;  
                 }  
                   
198    
199                  //stage 4: everything went smooth so clean up afterwards                  }
                 dataReader.Close();  
                 out_queue.Close();  
                 mqMgr.Close();  
                 sqlConnection.Close();  
200              }              }
201              catch (Exception e)              catch (Exception e)
202              {              {
203                  statusData.lastrunOk = false;                  statusData.lastrunOk = false;
204                  statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.Message;                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
205                  Console.WriteLine(statusData.lastErrorMessage);                  Console.WriteLine(statusData.lastErrorMessage);
206                    Console.WriteLine(e.StackTrace);
207                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
208              }              }
209                finally
210                {
211    
212                    if (out_queue != null && out_queue.IsOpen)
213                    {
214                        try
215                        {
216                            out_queue.Close();
217                        }
218                        catch (Exception e)
219                        {
220                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
221                        }
222                    }
223    
224                    if (mqMgr != null && mqMgr.IsOpen)
225                    {
226                        try
227                        {
228                            mqMgr.Close();
229                        }
230                        catch (Exception e)
231                        {
232                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
233                        }
234                    }
235    
236                }
237          }          }
238    
239          private void transportMq2Sql()          private void transportMq2Sql()
240          {          {
241                MQQueueManager mqMgr = null;
242                MQQueue in_queue = null;
243              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = getLogFilename(LogfileType.LogTransactions);
244              using (StreamWriter translog = new StreamWriter(filename, true))              using (StreamWriter translog = new StreamWriter(filename, true))
245              try                  try
             {  
                 //stage 1 connect to mq  
                 Hashtable connProps = getConnectionProperties();  
                 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);  
                 int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;  
   
                 MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions);  
   
   
                 //stage 2 connect to mysql  
                 string mysqlString = buildMysqlConnString();  
                 MySqlConnection sqlConnection = new MySqlConnection(mysqlString);  
                 sqlConnection.Open();  
   
   
                 //stage 3 move messages  
                 bool isContinue = true;  
                 while (isContinue)  
246                  {                  {
247                                            //MQ options
248                      MQMessage mqMsg = new MQMessage();                      Hashtable connProps = getConnectionProperties();
249                      MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();                      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
250    
251                        //MySQL options
252                        string mysqlString = buildMysqlConnString();
253    
254                        //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
255                        mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
256                        in_queue = mqMgr.AccessQueue(queueName, openOptions);
257                        using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
258                        {
259    
260                      mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen                          sqlConnection.Open();
261    
                     try  
                     {  
                         in_queue.Get(mqMsg, mqGetMsgOpts);  
                         if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)  
                         {                              
                             string msgString = mqMsg.ReadString(mqMsg.MessageLength);  
                             System.Console.WriteLine(msgString);  
262    
263                              string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng                          //stage 3 move messages
264                            bool isContinue = true;
265                            while (isContinue)
266                            {
267    
268                                MQMessage mqMsg = new MQMessage();
269                                MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
270    
271                              MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);                              mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
                             int numrows = sqlcmd.ExecuteNonQuery();  
272    
273                              if (numrows == 1)                              try
274                              {                              {
275                                  translog.WriteLine(getNowString() + " " + msgString + "\n");                                  in_queue.Get(mqMsg, mqGetMsgOpts);
276                                  mqMgr.Commit();                                  if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
277                                  statusData.counter++;                                  {
278                                        string msgString = mqMsg.ReadString(mqMsg.MessageLength);
279                                        System.Console.WriteLine(msgString);
280    
281                                        string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
282    
283                                        MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
284                                        int numrows = sqlcmd.ExecuteNonQuery();
285    
286                                        if (numrows == 1)
287                                        {
288                                            translog.WriteLine(getNowString() + " " + msgString);
289                                            mqMgr.Commit();
290                                            statusData.counter++;
291                                        }
292                                        else
293                                        {
294                                            mqMgr.Backout();
295                                            isContinue = false;
296                                        }
297    
298                                    }
299                                    else
300                                    {
301                                        System.Console.WriteLine("Non-text message");
302                                    }
303                              }                              }
304                              else                              catch (MQException mqe)
305                              {                              {
                                 mqMgr.Backout();  
306                                  isContinue = false;                                  isContinue = false;
307                              }                              
308                                    // report reason, if any
309                                    if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
310                                    {
311                                        // special report for normal end
312                                        System.Console.WriteLine("no more messages");
313                                    }
314                                    else
315                                    {
316                                        // general report for other reasons
317                                        System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
318                                        statusData.lastrunOk = false;
319                                    }
320    
321                                }
322    
323    
324                          }                          }
325                          else  
326                        }
327    
328                    }
329                    catch (Exception e)
330                    {
331                        //Det vil være mest korrekt at Rollback/backout MQ Transaktionen her - for at være sikker på at Message'n fjernes fra køen
332                        try
333                        {
334                            if (mqMgr != null)
335                          {                          {
336                              System.Console.WriteLine("Non-text message");                              mqMgr.Backout();
337                          }                          }
338                      }                      }
339                      catch (MQException mqe)                      catch (Exception e2)
340                      {                      {
341                          isContinue = false;                          this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);
342                        }
343    
344                        statusData.lastrunOk = false;
345    
346                        statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
347                        Console.WriteLine(statusData.lastErrorMessage);
348                        Console.WriteLine(e.StackTrace);
349                        EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
350                    }
351                    finally
352                    {
353    
354                          // report reason, if any                      if (in_queue != null && in_queue.IsOpen)
355                          if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)                      {
356                            try
357                          {                          {
358                              // special report for normal end                              in_queue.Close();
                             System.Console.WriteLine("no more messages");  
359                          }                          }
360                          else                          catch (Exception e)
361                          {                          {
362                              // general report for other reasons                              Console.WriteLine("Error cleaning up qmgr " + e.Message);
363                              System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);;                          }
364                              statusData.lastrunOk = false;                      }
365                        
366                        if (mqMgr != null && mqMgr.IsOpen)
367                        {
368                            try
369                            {
370                                mqMgr.Close();
371                            } catch (Exception e) {
372                                Console.WriteLine("Error cleaning up qmgr " + e.Message);
373                          }                          }
   
374                      }                      }
   
375    
376                  }                  }
   
                 //stage 4: everything went smooth so clean up afterwards  
                 in_queue.Close();  
                 mqMgr.Close();  
                 sqlConnection.Close();  
   
   
             }  
             catch (Exception e)  
             {  
                 statusData.lastrunOk = false;  
   
                 statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.Message;  
                 Console.WriteLine(statusData.lastErrorMessage);  
                 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);  
             }  
377          }          }
378    
379          private string buildMysqlConnString()          private string buildMysqlConnString()
# Line 315  namespace DaoMqPump2 Line 384  namespace DaoMqPump2
384              //connectionString += "DATABASE=" + controller.mysqlHost + ";";              //connectionString += "DATABASE=" + controller.mysqlHost + ";";
385              connectionString += "UID=" + controller.mysqlUser + ";";              connectionString += "UID=" + controller.mysqlUser + ";";
386              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
387                connectionString += "Max Pool Size=20;";
388                //connectionString += "ConnectionReset=true;";//ConnectionReset kræver muligvis at Default Database er angivet - det virker ihvertfald ikke uden
389    
390              return connectionString;              return connectionString;
391          }          }

Legend:
Removed from v.2003  
changed lines
  Added in v.2056

  ViewVC Help
Powered by ViewVC 1.1.20