/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1999 by torben, Mon Jul 8 14:22:33 2013 UTC revision 2057 by torben, Tue Aug 27 06:49:36 2013 UTC
# Line 7  using System.Diagnostics; Line 7  using System.Diagnostics;
7    
8  using IBM.WMQ;  using IBM.WMQ;
9  using MySql.Data.MySqlClient;  using MySql.Data.MySqlClient;
10    using System.Globalization;
11    
12  namespace DaoMqPump2  namespace DaoMqPump2
13  {  {
# Line 21  namespace DaoMqPump2 Line 22  namespace DaoMqPump2
22          public static string SQL2MQ = "sql2mq";          public static string SQL2MQ = "sql2mq";
23          public static string MQ2SQL = "mq2sql";          public static string MQ2SQL = "mq2sql";
24    
25          private bool enabled;                  //private bool enabled;        
26    
27          TransportController controller;          TransportController controller;
28    
# Line 55  namespace DaoMqPump2 Line 56  namespace DaoMqPump2
56          public bool Enabled          public bool Enabled
57          {          {
58              get {              get {
59                  return this.enabled;                  return statusData.transportEnabled;
60              }              }
61              set              set
62              {              {
63                  this.enabled = value;                  statusData.transportEnabled = value;
64                  if (value == true)                  if (value == true)
65                  {                  {
66                      this.addLogEntry("Transport enabled");                      this.addLogEntry("Transport enabled");
# Line 85  namespace DaoMqPump2 Line 86  namespace DaoMqPump2
86              this.sql2mqReadQuery = sql2mqReadQuery;              this.sql2mqReadQuery = sql2mqReadQuery;
87              this.sql2mqUpdateQuery = sql2mqUpdateQuery;              this.sql2mqUpdateQuery = sql2mqUpdateQuery;
88    
89              this.enabled = enabled;              statusData.transportEnabled = enabled;
90    
91    
92              statusData.lastrunOk = true;              statusData.lastrunOk = true;
# Line 103  namespace DaoMqPump2 Line 104  namespace DaoMqPump2
104    
105          public void transportMessages()          public void transportMessages()
106          {          {
107              if (enabled == false)              if (statusData.transportEnabled == false)
108                  return;                  return;
109    
110              Console.WriteLine(name + " -> transportMessages() ");              Console.WriteLine(name + " -> transportMessages() ");
# Line 139  namespace DaoMqPump2 Line 140  namespace DaoMqPump2
140    
141          private void transportSql2Mq()          private void transportSql2Mq()
142          {          {
143                MQQueueManager mqMgr = null;
144                MQQueue out_queue = null;
145    
146              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = getLogFilename(LogfileType.LogTransactions);
147              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
148              try              try
149              {              {
150                  //stage 1 connect to mq                  //MQ Options
151                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = getConnectionProperties();
                 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);  
152                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
153    
154                  MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions);                  //MySQL Options
   
   
                 //stage 2 connect to mysql  
155                  string mysqlString = buildMysqlConnString();                  string mysqlString = buildMysqlConnString();
                 MySqlConnection sqlConnection = new MySqlConnection(mysqlString);  
                 sqlConnection.Open();  
156    
157                    //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
158                  //stage 3 move messages                  mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
159                  string readSql = "CALL " + sql2mqReadQuery + "()";                  out_queue = mqMgr.AccessQueue(queueName, openOptions);
160                  MySqlCommand readCmd = new MySqlCommand(readSql, sqlConnection);                  using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
161                  MySqlDataReader dataReader = readCmd.ExecuteReader();                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
                 while (dataReader.Read())  
162                  {                  {
163                      int id = dataReader.GetInt32(0);                      sqlReadConnection.Open();
164                      string msgString = dataReader.GetString(1);                      sqlWriteConnection.Open();
165    
166                      MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,                      //stage 3 move messages
167                      // same as MQPMO_DEFAULT                      string readSql = "CALL " + sql2mqReadQuery + "()";
168                        MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);
169                        MySqlDataReader dataReader = readCmd.ExecuteReader();
170                        while (dataReader.Read())
171                        {
172                            int id = dataReader.GetInt32(0);
173                            string msgString = dataReader.GetString(1);
174    
175                      MQMessage msg = new MQMessage();                          MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
176                      msg.Format = MQC.MQFMT_STRING;                          // same as MQPMO_DEFAULT
                     msg.CharacterSet = 1252;  
                     msg.WriteString(msgString);  
177    
178                      out_queue.Put(msg, pmo);                          MQMessage msg = new MQMessage();
179                            msg.Format = MQC.MQFMT_STRING;
180                            msg.CharacterSet = 1252;
181                            msg.WriteString(msgString);
182    
183                      //now that the message has been put on queue mark it as such                          out_queue.Put(msg, pmo);
184    
185                      string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";                          //now that the message has been put on queue mark it as such
                     MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlConnection);  
                     int numrows = updateCmd.ExecuteNonQuery();  
186    
187                      translog.WriteLine(getNowString() + " " + msgString + "\n");                          string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
188                            MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
189                            int numrows = updateCmd.ExecuteNonQuery();
190    
191                      if (numrows != 1)                          translog.WriteLine(getNowString() + " " + msgString);
192                      {  
193                          break;                          if (numrows != 1)
194                            {
195                                break;
196                            }
197                            statusData.counter++;
198                      }                      }
                     statusData.counter++;  
                 }  
                   
199    
200                  //stage 4: everything went smooth so clean up afterwards                  }
                 dataReader.Close();  
                 out_queue.Close();  
                 mqMgr.Close();  
                 sqlConnection.Close();  
201              }              }
202              catch (Exception e)              catch (Exception e)
203              {              {
204                  statusData.lastrunOk = false;                  statusData.lastrunOk = false;
205                  statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.Message;                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
206                  Console.WriteLine(statusData.lastErrorMessage);                  Console.WriteLine(statusData.lastErrorMessage);
207                    Console.WriteLine(e.StackTrace);
208                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
209              }              }
210                finally
211                {
212    
213                    if (out_queue != null && out_queue.IsOpen)
214                    {
215                        try
216                        {
217                            out_queue.Close();
218                        }
219                        catch (Exception e)
220                        {
221                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
222                        }
223                    }
224    
225                    if (mqMgr != null && mqMgr.IsOpen)
226                    {
227                        try
228                        {
229                            mqMgr.Close();
230                        }
231                        catch (Exception e)
232                        {
233                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
234                        }
235                    }
236    
237                }
238          }          }
239    
240          private void transportMq2Sql()          private void transportMq2Sql()
241          {          {
242                MQQueueManager mqMgr = null;
243                MQQueue in_queue = null;
244              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = getLogFilename(LogfileType.LogTransactions);
245              using (StreamWriter translog = new StreamWriter(filename, true))              using (StreamWriter translog = new StreamWriter(filename, true))
246              try                  try
             {  
                 //stage 1 connect to mq  
                 Hashtable connProps = getConnectionProperties();  
                 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);  
                 int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;  
   
                 MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions);  
   
   
                 //stage 2 connect to mysql  
                 string mysqlString = buildMysqlConnString();  
                 MySqlConnection sqlConnection = new MySqlConnection(mysqlString);  
                 sqlConnection.Open();  
   
   
                 //stage 3 move messages  
                 bool isContinue = true;  
                 while (isContinue)  
247                  {                  {
248                                            //MQ options
249                      MQMessage mqMsg = new MQMessage();                      Hashtable connProps = getConnectionProperties();
250                      MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();                      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
251    
252                        //MySQL options
253                        string mysqlString = buildMysqlConnString();
254    
255                        //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
256                        mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
257                        in_queue = mqMgr.AccessQueue(queueName, openOptions);
258                        using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
259                        {
260    
261                      mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen                          sqlConnection.Open();
262    
                     try  
                     {  
                         in_queue.Get(mqMsg, mqGetMsgOpts);  
                         if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)  
                         {                              
                             string msgString = mqMsg.ReadString(mqMsg.MessageLength);  
                             System.Console.WriteLine(msgString);  
263    
264                              string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng                          //stage 3 move messages
265                            bool isContinue = true;
266                            while (isContinue)
267                            {
268    
269                              MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);                              MQMessage mqMsg = new MQMessage();
270                              int numrows = sqlcmd.ExecuteNonQuery();                              MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
271    
272                              if (numrows == 1)                              mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
273    
274                                try
275                              {                              {
276                                  translog.WriteLine(getNowString() + " " + msgString + "\n");                                  in_queue.Get(mqMsg, mqGetMsgOpts);
277                                  mqMgr.Commit();                                  if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
278                                  statusData.counter++;                                  {
279                                        string msgString = mqMsg.ReadString(mqMsg.MessageLength);
280                                        System.Console.WriteLine(msgString);
281    
282                                        string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
283    
284                                        MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
285                                        int numrows = sqlcmd.ExecuteNonQuery();
286    
287                                        if (numrows == 1)
288                                        {
289                                            translog.WriteLine(getNowString() + " " + msgString);
290                                            mqMgr.Commit();
291                                            statusData.counter++;
292                                        }
293                                        else
294                                        {
295                                            mqMgr.Backout();
296                                            isContinue = false;
297                                        }
298    
299                                    }
300                                    else
301                                    {
302                                        System.Console.WriteLine("Non-text message");
303                                    }
304                              }                              }
305                              else                              catch (MQException mqe)
306                              {                              {
                                 mqMgr.Backout();  
307                                  isContinue = false;                                  isContinue = false;
308                              }                              
309                                    // report reason, if any
310                                    if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
311                                    {
312                                        // special report for normal end
313                                        System.Console.WriteLine("no more messages");
314                                    }
315                                    else
316                                    {
317                                        // general report for other reasons
318                                        System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
319                                        statusData.lastrunOk = false;
320                                    }
321    
322                                }
323    
324    
325                          }                          }
326                          else  
327                        }
328    
329                    }
330                    catch (Exception e)
331                    {
332                        //Det vil være mest korrekt at Rollback/backout MQ Transaktionen her - for at være sikker på at Message'n fjernes fra køen
333                        try
334                        {
335                            if (mqMgr != null)
336                          {                          {
337                              System.Console.WriteLine("Non-text message");                              mqMgr.Backout();
338                          }                          }
339                      }                      }
340                      catch (MQException mqe)                      catch (Exception e2)
341                      {                      {
342                          isContinue = false;                          this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);
343                        }
344    
345                        statusData.lastrunOk = false;
346    
347                        statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
348                        Console.WriteLine(statusData.lastErrorMessage);
349                        Console.WriteLine(e.StackTrace);
350                        EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
351                    }
352                    finally
353                    {
354    
355                          // report reason, if any                      if (in_queue != null && in_queue.IsOpen)
356                          if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)                      {
357                            try
358                          {                          {
359                              // special report for normal end                              in_queue.Close();
                             System.Console.WriteLine("no more messages");  
360                          }                          }
361                          else                          catch (Exception e)
362                          {                          {
363                              // general report for other reasons                              Console.WriteLine("Error cleaning up qmgr " + e.Message);
364                              System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);;                          }
365                              statusData.lastrunOk = false;                      }
366                        
367                        if (mqMgr != null && mqMgr.IsOpen)
368                        {
369                            try
370                            {
371                                mqMgr.Close();
372                            } catch (Exception e) {
373                                Console.WriteLine("Error cleaning up qmgr " + e.Message);
374                          }                          }
   
375                      }                      }
   
376    
377                  }                  }
   
                 //stage 4: everything went smooth so clean up afterwards  
                 in_queue.Close();  
                 mqMgr.Close();  
                 sqlConnection.Close();  
   
   
             }  
             catch (Exception e)  
             {  
                 statusData.lastrunOk = false;  
   
                 statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.Message;  
                 Console.WriteLine(statusData.lastErrorMessage);  
                 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);  
             }  
378          }          }
379    
380          private string buildMysqlConnString()          private string buildMysqlConnString()
# Line 315  namespace DaoMqPump2 Line 385  namespace DaoMqPump2
385              //connectionString += "DATABASE=" + controller.mysqlHost + ";";              //connectionString += "DATABASE=" + controller.mysqlHost + ";";
386              connectionString += "UID=" + controller.mysqlUser + ";";              connectionString += "UID=" + controller.mysqlUser + ";";
387              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
388                connectionString += "Max Pool Size=20;";
389                //connectionString += "ConnectionReset=true;";//ConnectionReset kræver muligvis at Default Database er angivet - det virker ihvertfald ikke uden
390    
391              return connectionString;              return connectionString;
392          }          }
# Line 336  namespace DaoMqPump2 Line 408  namespace DaoMqPump2
408              DateTime now = DateTime.Now;              DateTime now = DateTime.Now;
409              string filename = controller.logDirectory + "\\";              string filename = controller.logDirectory + "\\";
410    
411                //Find uge nr
412                DateTimeFormatInfo dfi = DateTimeFormatInfo.CurrentInfo;
413                Calendar myCal = CultureInfo.InvariantCulture.Calendar;//System.Globalization
414                int week = myCal.GetWeekOfYear(now, dfi.CalendarWeekRule, dfi.FirstDayOfWeek);
415    
416              switch (type)              switch (type)
417              {              {
418                  case LogfileType.LogEvents:                  case LogfileType.LogEvents:
# Line 348  namespace DaoMqPump2 Line 425  namespace DaoMqPump2
425              }              }
426    
427    
428              filename += "transactionlog_" + name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";              filename += name + "_" + now.Year.ToString("D4") + "_W" + week.ToString("D2") + ".log";
429    
430              return filename;              return filename;
431          }          }

Legend:
Removed from v.1999  
changed lines
  Added in v.2057

  ViewVC Help
Powered by ViewVC 1.1.20