/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1986 by torben, Wed Jul 3 07:56:52 2013 UTC revision 2058 by torben, Wed Aug 28 06:45:20 2013 UTC
# Line 7  using System.Diagnostics; Line 7  using System.Diagnostics;
7    
8  using IBM.WMQ;  using IBM.WMQ;
9  using MySql.Data.MySqlClient;  using MySql.Data.MySqlClient;
10    using System.Globalization;
11    
12  namespace DaoMqPump2  namespace DaoMqPump2
13  {  {
14      public class Transport      public class Transport
15      {      {
16    
17            enum LogfileType {
18                LogTransactions,
19                LogEvents,
20                LogDiscarded
21            }
22    
23          public static string SQL2MQ = "sql2mq";          public static string SQL2MQ = "sql2mq";
24          public static string MQ2SQL = "mq2sql";          public static string MQ2SQL = "mq2sql";
25    
26            //private bool enabled;        
         public bool enabled { get; set; }  
27    
28          TransportController controller;          TransportController controller;
29    
30            StatusData statusData = new StatusData();
31    
32          public string name { get; private set; }          public string name { get; private set; }
33          public string direction { get; private set; }          public string direction { get; private set; }
34          public string queueName { get; private set; }          public string queueName { get; private set; }
# Line 27  namespace DaoMqPump2 Line 36  namespace DaoMqPump2
36          public string sql2mqReadQuery { get; private set; }          public string sql2mqReadQuery { get; private set; }
37          public string sql2mqUpdateQuery { get; private set; }          public string sql2mqUpdateQuery { get; private set; }
38    
39          public bool lastrunOk { get; private set; }          //public bool lastrunOk { get; private set; }
40          public string lastErrorMessage { get; private set; }          //public string lastErrorMessage { get; private set; }
41    
42            //public string lastOkTime { get; private set; }
43            //public string lastErrorTime { get; private set; }
44            //public string lastTransferTime { get; private set; }
45    
46          public string lastOkTime { get; private set; }          //public int counter { get; private set; }
47          public string lastErrorTime { get; private set; }  
48            public StatusData TransportStatusData
49            {
50                get
51                {
52                    return this.statusData;
53                }
54            }
55    
56    
57            public bool Enabled
58            {
59                get {
60                    return statusData.transportEnabled;
61                }
62                set
63                {
64                    statusData.transportEnabled = value;
65                    if (value == true)
66                    {
67                        this.addLogEntry("Transport enabled");
68                    }
69                    else
70                    {
71                        this.addLogEntry("Transport disabled");
72                    }
73                }
74            }
75    
         public int counter { get; private set; }  
76    
77          private LinkedList<string> logEntries = new LinkedList<string>();          private LinkedList<string> logEntries = new LinkedList<string>();
78    
# Line 48  namespace DaoMqPump2 Line 87  namespace DaoMqPump2
87              this.sql2mqReadQuery = sql2mqReadQuery;              this.sql2mqReadQuery = sql2mqReadQuery;
88              this.sql2mqUpdateQuery = sql2mqUpdateQuery;              this.sql2mqUpdateQuery = sql2mqUpdateQuery;
89    
90              this.enabled = enabled;              statusData.transportEnabled = enabled;
91                
92                
93              lastrunOk = true;              statusData.lastrunOk = true;
94              counter = 0;              statusData.counter = 0;
95              lastErrorMessage = lastOkTime = lastErrorTime = "";              statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";
96    
97              addLogEntry( "Starting ... " );              addLogEntry( "Starting ... " );
98          }          }
99    
100            ~Transport()
101            {
102                addLogEntry("Stopping ... ");
103            }
104    
105    
106          public void transportMessages()          public void transportMessages()
107          {          {
108              if (enabled == false)              if (statusData.transportEnabled == false)
109                  return;                  return;
110    
111              Console.WriteLine(name + " -> transportMessages() ");              Console.WriteLine(name + " -> transportMessages() ");
112              lastrunOk = true;              statusData.lastrunOk = true;
113    
114                int startCounter = statusData.counter;
115    
116              if (direction == SQL2MQ)              if (direction == SQL2MQ)
117              {              {
# Line 75  namespace DaoMqPump2 Line 122  namespace DaoMqPump2
122                  transportMq2Sql();                  transportMq2Sql();
123              }              }
124    
125              if (lastrunOk == true)              if (statusData.lastrunOk == true)
126              {              {
127                  lastOkTime = getNowString();                  statusData.lastOkTime = getNowString();
128    
129                    if (statusData.counter != startCounter)
130                    {
131                        //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
132                        statusData.lastTransferTime = getNowString();
133                    }
134              }              }
135              else              else
136              {              {
137                  addLogEntry(lastErrorMessage);                  addLogEntry(statusData.lastErrorMessage);
138                  lastErrorTime = getNowString();                  statusData.lastErrorTime = getNowString();
139              }              }
140          }          }
141    
142          private void transportSql2Mq()          private void transportSql2Mq()
143          {          {
144              string filename = getTransactionlogFilename();              MQQueueManager mqMgr = null;
145                MQQueue out_queue = null;
146    
147                string filename = getLogFilename(LogfileType.LogTransactions);
148              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
149              try              try
150              {              {
151                  //stage 1 connect to mq                  //MQ Options
152                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = getConnectionProperties();
                 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);  
153                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
154    
155                  MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions);                  //MySQL Options
   
   
                 //stage 2 connect to mysql  
156                  string mysqlString = buildMysqlConnString();                  string mysqlString = buildMysqlConnString();
                 MySqlConnection sqlConnection = new MySqlConnection(mysqlString);  
                 sqlConnection.Open();  
157    
158                    //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
159                  //stage 3 move messages                  mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
160                  string readSql = "CALL " + sql2mqReadQuery + "()";                  out_queue = mqMgr.AccessQueue(queueName, openOptions);
161                  MySqlCommand readCmd = new MySqlCommand(readSql, sqlConnection);                  using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
162                  MySqlDataReader dataReader = readCmd.ExecuteReader();                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
                 while (dataReader.Read())  
163                  {                  {
164                      int id = dataReader.GetInt32(0);                      sqlReadConnection.Open();
165                      string msgString = dataReader.GetString(1);                      sqlWriteConnection.Open();
166    
167                        //stage 3 move messages
168                        string readSql = "CALL " + sql2mqReadQuery + "()";
169                        MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);
170                        MySqlDataReader dataReader = readCmd.ExecuteReader();
171                        while (dataReader.Read())
172                        {
173                            int id = dataReader.GetInt32(0);
174                            string msgString = dataReader.GetString(1);
175    
176                      MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,                          MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
177                      // same as MQPMO_DEFAULT                          // same as MQPMO_DEFAULT
178    
179                      MQMessage msg = new MQMessage();                          MQMessage msg = new MQMessage();
180                      msg.Format = MQC.MQFMT_STRING;                          msg.Format = MQC.MQFMT_STRING;
181                      msg.CharacterSet = 1252;                          msg.CharacterSet = 1252;
182                      msg.WriteString(msgString);                          msg.WriteString(msgString);
183    
184                      out_queue.Put(msg, pmo);                          out_queue.Put(msg, pmo);
185    
186                      //now that the message has been put on queue mark it as such                          //now that the message has been put on queue mark it as such
187    
188                      string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";                          string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
189                      MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlConnection);                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
190                      int numrows = updateCmd.ExecuteNonQuery();                          int numrows = updateCmd.ExecuteNonQuery();
191    
192                      translog.WriteLine(getNowString() + " " + msgString + "\n");                          translog.WriteLine(getNowString() + " " + msgString);
193    
194                      if (numrows != 1)                          if (numrows != 1)
195                      {                          {
196                          break;                              break;
197                            }
198                            statusData.counter++;
199                      }                      }
                     counter++;  
                 }  
                   
200    
201                  //stage 4: everything went smooth so clean up afterwards                  }
                 dataReader.Close();  
                 out_queue.Close();  
                 mqMgr.Close();  
                 sqlConnection.Close();  
202              }              }
203              catch (Exception e)              catch (Exception e)
204              {              {
205                  lastrunOk = false;                  statusData.lastrunOk = false;
206                  lastErrorMessage = name + ".transportSql2Mq error: " + e.Message;                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
207                  Console.WriteLine(lastErrorMessage);                  Console.WriteLine(statusData.lastErrorMessage);
208                  EventLog.WriteEntry("DaoMqPump2", lastErrorMessage, EventLogEntryType.Warning);                  Console.WriteLine(e.StackTrace);
209                    EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
210              }              }
211          }              finally
   
         private void transportMq2Sql()  
         {  
             string filename = getTransactionlogFilename();  
             using (StreamWriter translog = new StreamWriter(filename, true))  
             try  
212              {              {
                 //stage 1 connect to mq  
                 Hashtable connProps = getConnectionProperties();  
                 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);  
                 int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;  
   
                 MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions);  
213    
214                    if (out_queue != null && out_queue.IsOpen)
215                    {
216                        try
217                        {
218                            out_queue.Close();
219                        }
220                        catch (Exception e)
221                        {
222                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
223                        }
224                    }
225    
226                  //stage 2 connect to mysql                  if (mqMgr != null && mqMgr.IsOpen)
227                  string mysqlString = buildMysqlConnString();                  {
228                  MySqlConnection sqlConnection = new MySqlConnection(mysqlString);                      try
229                  sqlConnection.Open();                      {
230                            mqMgr.Close();
231                        }
232                        catch (Exception e)
233                        {
234                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
235                        }
236                    }
237    
238                }
239            }
240    
241                  //stage 3 move messages          private void transportMq2Sql()
242                  bool isContinue = true;          {
243                  while (isContinue)              MQQueueManager mqMgr = null;
244                MQQueue in_queue = null;
245                string filename = getLogFilename(LogfileType.LogTransactions);
246                using (StreamWriter translog = new StreamWriter(filename, true))
247                    try
248                  {                  {
249                                            //MQ options
250                      MQMessage mqMsg = new MQMessage();                      Hashtable connProps = getConnectionProperties();
251                      MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();                      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
252    
253                        //MySQL options
254                        string mysqlString = buildMysqlConnString();
255    
256                        //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
257                        mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
258                        in_queue = mqMgr.AccessQueue(queueName, openOptions);
259                        using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
260                        {
261    
262                      mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen                          sqlConnection.Open();
263    
                     try  
                     {  
                         in_queue.Get(mqMsg, mqGetMsgOpts);  
                         if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)  
                         {                              
                             string msgString = mqMsg.ReadString(mqMsg.MessageLength);  
                             System.Console.WriteLine(msgString);  
264    
265                              string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng                          //stage 3 move messages
266                            bool isContinue = true;
267                            while (isContinue)
268                            {
269    
270                                MQMessage mqMsg = new MQMessage();
271                                MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
272    
273                              MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);                              mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
                             int numrows = sqlcmd.ExecuteNonQuery();  
274    
275                              if (numrows == 1)                              try
276                              {                              {
277                                  translog.WriteLine(getNowString() + " " + msgString + "\n");                                  in_queue.Get(mqMsg, mqGetMsgOpts);
278                                  mqMgr.Commit();                                  if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
279                                  counter++;                                  {
280                                        string msgString = mqMsg.ReadString(mqMsg.MessageLength);
281                                        System.Console.WriteLine(msgString);
282    
283                                        if ( msgString.StartsWith("?") ) //Hvis transaktionen starter med et ? er det ikke en gyldig transaktion
284                                        {
285                                            string discarded_filename = getLogFilename(LogfileType.LogDiscarded);
286                                            using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
287                                            {
288                                                discardedlog.WriteLine(msgString);
289                                            }
290                                            mqMgr.Commit();//fjern den afviste transaktion fra køen
291                                            statusData.discardedCounter++;
292                                            continue; //gå frem til at tage næste transaktion fra køen
293                                        }
294    
295    
296                                        string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
297    
298                                        MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
299                                        int numrows = sqlcmd.ExecuteNonQuery();
300    
301                                        if (numrows == 1)
302                                        {
303                                            translog.WriteLine(getNowString() + " " + msgString);
304                                            mqMgr.Commit();
305                                            statusData.counter++;
306                                        }
307                                        else
308                                        {
309                                            mqMgr.Backout();
310                                            isContinue = false;
311                                        }
312    
313                                    }
314                                    else
315                                    {
316                                        System.Console.WriteLine("Non-text message");
317                                    }
318                              }                              }
319                              else                              catch (MQException mqe)
320                              {                              {
                                 mqMgr.Backout();  
321                                  isContinue = false;                                  isContinue = false;
322                              }                              
323                                    // report reason, if any
324                                    if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
325                                    {
326                                        // special report for normal end
327                                        System.Console.WriteLine("no more messages");
328                                    }
329                                    else
330                                    {
331                                        // general report for other reasons
332                                        System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
333                                        statusData.lastrunOk = false;
334                                    }
335    
336                                }
337    
338    
339                          }                          }
340                          else  
341                        }
342    
343                    }
344                    catch (Exception e)
345                    {
346                        //Det vil være mest korrekt at Rollback/backout MQ Transaktionen her - for at være sikker på at Message'n fjernes fra køen
347                        try
348                        {
349                            if (mqMgr != null)
350                          {                          {
351                              System.Console.WriteLine("Non-text message");                              mqMgr.Backout();
352                          }                          }
353                      }                      }
354                      catch (MQException mqe)                      catch (Exception e2)
355                      {                      {
356                          isContinue = false;                          this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);
357                        }
358    
359                        statusData.lastrunOk = false;
360    
361                        statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
362                        Console.WriteLine(statusData.lastErrorMessage);
363                        Console.WriteLine(e.StackTrace);
364                        EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
365                    }
366                    finally
367                    {
368    
369                          // report reason, if any                      if (in_queue != null && in_queue.IsOpen)
370                          if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)                      {
371                            try
372                          {                          {
373                              // special report for normal end                              in_queue.Close();
                             System.Console.WriteLine("no more messages");  
374                          }                          }
375                          else                          catch (Exception e)
376                          {                          {
377                              // general report for other reasons                              Console.WriteLine("Error cleaning up qmgr " + e.Message);
378                              System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);;                          }
379                              lastrunOk = false;                      }
380                        
381                        if (mqMgr != null && mqMgr.IsOpen)
382                        {
383                            try
384                            {
385                                mqMgr.Close();
386                            } catch (Exception e) {
387                                Console.WriteLine("Error cleaning up qmgr " + e.Message);
388                          }                          }
   
389                      }                      }
   
390    
391                  }                  }
   
                 //stage 4: everything went smooth so clean up afterwards  
                 in_queue.Close();  
                 mqMgr.Close();  
                 sqlConnection.Close();  
   
   
             }  
             catch (Exception e)  
             {  
                 lastrunOk = false;  
   
                 lastErrorMessage = name + ".transportMq2Sql error: " + e.Message;  
                 Console.WriteLine(lastErrorMessage);  
                 EventLog.WriteEntry("DaoMqPump2", lastErrorMessage, EventLogEntryType.Warning);  
             }  
392          }          }
393    
394          private string buildMysqlConnString()          private string buildMysqlConnString()
# Line 264  namespace DaoMqPump2 Line 399  namespace DaoMqPump2
399              //connectionString += "DATABASE=" + controller.mysqlHost + ";";              //connectionString += "DATABASE=" + controller.mysqlHost + ";";
400              connectionString += "UID=" + controller.mysqlUser + ";";              connectionString += "UID=" + controller.mysqlUser + ";";
401              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
402                connectionString += "Max Pool Size=20;";
403                //connectionString += "ConnectionReset=true;";//ConnectionReset kræver muligvis at Default Database er angivet - det virker ihvertfald ikke uden
404    
405              return connectionString;              return connectionString;
406          }          }
# Line 277  namespace DaoMqPump2 Line 414  namespace DaoMqPump2
414              return connProperties;              return connProperties;
415          }          }
416    
417          private string getTransactionlogFilename()  
418    
419            private string getLogFilename(LogfileType type)
420          {          {
421    
422              DateTime now = DateTime.Now;              DateTime now = DateTime.Now;
423              string filename = controller.logDirectory + "\\";              string filename = controller.logDirectory + "\\";
424              filename += "transactionlog_" + name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";  
425                //Find uge nr
426                DateTimeFormatInfo dfi = DateTimeFormatInfo.CurrentInfo;
427                Calendar myCal = CultureInfo.InvariantCulture.Calendar;//System.Globalization
428                int week = myCal.GetWeekOfYear(now, dfi.CalendarWeekRule, dfi.FirstDayOfWeek);
429    
430                switch (type)
431                {
432                    case LogfileType.LogEvents:
433                        filename += "eventlog_";
434                        break;
435    
436                    case LogfileType.LogTransactions:
437                        filename += "transactionlog_";
438                        break;
439                    case LogfileType.LogDiscarded:
440                        filename += "discardedlog_";
441                        break;
442                }
443    
444    
445                filename += name + "_" + now.Year.ToString("D4") + "_W" + week.ToString("D2") + ".log";
446    
447              return filename;              return filename;
448          }          }
# Line 304  namespace DaoMqPump2 Line 464  namespace DaoMqPump2
464                  if (logEntries.Count > 20)                  if (logEntries.Count > 20)
465                  {                  {
466                      logEntries.RemoveLast();                      logEntries.RemoveLast();
467                  }                  }                
468                                }
469    
470                string filename = getLogFilename(LogfileType.LogEvents);
471                using (StreamWriter eventlog = new StreamWriter(filename, true))
472                {
473                    eventlog.WriteLine(msg);
474              }              }
475          }          }
476    

Legend:
Removed from v.1986  
changed lines
  Added in v.2058

  ViewVC Help
Powered by ViewVC 1.1.20