/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2001 by torben, Mon Jul 8 14:28:01 2013 UTC revision 2177 by torben, Mon May 19 19:51:47 2014 UTC
# Line 7  using System.Diagnostics; Line 7  using System.Diagnostics;
7    
8  using IBM.WMQ;  using IBM.WMQ;
9  using MySql.Data.MySqlClient;  using MySql.Data.MySqlClient;
10    using System.Globalization;
11    
12    using DaoCommon;
13    
14  namespace DaoMqPump2  namespace DaoMqPump2
15  {  {
16      public class Transport      public class Transport
17      {      {
18    
         enum LogfileType {  
             LogTransactions,  
             LogEvents  
         }  
   
19          public static string SQL2MQ = "sql2mq";          public static string SQL2MQ = "sql2mq";
20          public static string MQ2SQL = "mq2sql";          public static string MQ2SQL = "mq2sql";
21    
# Line 27  namespace DaoMqPump2 Line 25  namespace DaoMqPump2
25    
26          StatusData statusData = new StatusData();          StatusData statusData = new StatusData();
27    
28            public Logfile logfile { get; private set; }
29    
30          public string name { get; private set; }          public string name { get; private set; }
31          public string direction { get; private set; }          public string direction { get; private set; }
32          public string queueName { get; private set; }          public string queueName { get; private set; }
# Line 62  namespace DaoMqPump2 Line 62  namespace DaoMqPump2
62                  statusData.transportEnabled = value;                  statusData.transportEnabled = value;
63                  if (value == true)                  if (value == true)
64                  {                  {
65                      this.addLogEntry("Transport enabled");                      logfile.addSingleLogEntry("Transport enabled");
66                  }                  }
67                  else                  else
68                  {                  {
69                      this.addLogEntry("Transport disabled");                      logfile.addSingleLogEntry("Transport disabled");
70                  }                  }
71              }              }
72          }          }
73    
74    
75          private LinkedList<string> logEntries = new LinkedList<string>();  
76    
77    
78          public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)          public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
# Line 92  namespace DaoMqPump2 Line 92  namespace DaoMqPump2
92              statusData.counter = 0;              statusData.counter = 0;
93              statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";              statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";
94    
95              addLogEntry( "Starting ... " );              logfile = new Logfile(LogfileType.LogEvents,  name, controller.logDirectory);
96                logfile.addSingleLogEntry("Starting ... ");
97          }          }
98    
99          ~Transport()          ~Transport()
100          {          {
101              addLogEntry("Stopping ... ");              logfile.addSingleLogEntry("Stopping ... ");
102          }          }
103    
104    
# Line 122  namespace DaoMqPump2 Line 123  namespace DaoMqPump2
123    
124              if (statusData.lastrunOk == true)              if (statusData.lastrunOk == true)
125              {              {
126                  statusData.lastOkTime = getNowString();                  statusData.lastOkTime = Logfile.getNowString();
127    
128                  if (statusData.counter != startCounter)                  if (statusData.counter != startCounter)
129                  {                  {
130                      //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet                      //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
131                      statusData.lastTransferTime = getNowString();                      statusData.lastTransferTime = Logfile.getNowString();
132                  }                  }
133              }              }
134              else              else
135              {              {
136                  addLogEntry(statusData.lastErrorMessage);                  logfile.addSingleLogEntry(statusData.lastErrorMessage);
137                  statusData.lastErrorTime = getNowString();                  statusData.lastErrorTime = Logfile.getNowString();
138              }              }
139          }          }
140    
141          private void transportSql2Mq()          private void transportSql2Mq()
142          {          {
143              string filename = getLogFilename(LogfileType.LogTransactions);              MQQueueManager mqMgr = null;
144                MQQueue out_queue = null;
145    
146                string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
147              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
148              try              try
149              {              {
150                  //stage 1 connect to mq                  //MQ Options
151                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = MQHelper.getConnectionProperties(controller.mqHost, controller.mqChannel);
                 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);  
152                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
153    
154                  MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions);                  //MySQL Options
   
   
                 //stage 2 connect to mysql  
155                  string mysqlString = buildMysqlConnString();                  string mysqlString = buildMysqlConnString();
                 MySqlConnection sqlConnection = new MySqlConnection(mysqlString);  
                 sqlConnection.Open();  
   
156    
157                  //stage 3 move messages                  //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
158                  string readSql = "CALL " + sql2mqReadQuery + "()";                  mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
159                  MySqlCommand readCmd = new MySqlCommand(readSql, sqlConnection);                  out_queue = mqMgr.AccessQueue(queueName, openOptions);
160                  MySqlDataReader dataReader = readCmd.ExecuteReader();                  using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
161                  while (dataReader.Read())                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
162                  {                  {
163                      int id = dataReader.GetInt32(0);                      sqlReadConnection.Open();
164                      string msgString = dataReader.GetString(1);                      sqlWriteConnection.Open();
165    
166                      MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,                      //stage 3 move messages
167                      // same as MQPMO_DEFAULT                      string readSql = "CALL " + sql2mqReadQuery + "()";
168                        MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);
169                        MySqlDataReader dataReader = readCmd.ExecuteReader();
170                        while (dataReader.Read())
171                        {
172                            int id = dataReader.GetInt32(0);
173                            string msgString = dataReader.GetString(1);
174    
175                      MQMessage msg = new MQMessage();                          MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
176                      msg.Format = MQC.MQFMT_STRING;                          // same as MQPMO_DEFAULT
                     msg.CharacterSet = 1252;  
                     msg.WriteString(msgString);  
177    
178                      out_queue.Put(msg, pmo);                          MQMessage msg = new MQMessage();
179                            msg.Format = MQC.MQFMT_STRING;
180                            msg.CharacterSet = 1252;
181                            msg.WriteString(msgString);
182    
183                      //now that the message has been put on queue mark it as such                          out_queue.Put(msg, pmo);
184    
185                      string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";                          //now that the message has been put on queue mark it as such
                     MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlConnection);  
                     int numrows = updateCmd.ExecuteNonQuery();  
186    
187                      translog.WriteLine(getNowString() + " " + msgString + "\n");                          string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
188                            MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
189                            int numrows = updateCmd.ExecuteNonQuery();
190    
191                      if (numrows != 1)                          translog.WriteLine(Logfile.getNowString() + " " + msgString);
192                      {  
193                          break;                          if (numrows != 1)
194                            {
195                                break;
196                            }
197                            statusData.counter++;
198                      }                      }
                     statusData.counter++;  
                 }  
                   
199    
200                  //stage 4: everything went smooth so clean up afterwards                  }
                 dataReader.Close();  
                 out_queue.Close();  
                 mqMgr.Close();  
                 sqlConnection.Close();  
201              }              }
202              catch (Exception e)              catch (Exception e)
203              {              {
204                  statusData.lastrunOk = false;                  statusData.lastrunOk = false;
205                  statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.Message;                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
206                  Console.WriteLine(statusData.lastErrorMessage);                  Console.WriteLine(statusData.lastErrorMessage);
207                    Console.WriteLine(e.StackTrace);
208                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
209              }              }
210                finally
211                {
212                    MQHelper.closeQueueSafe(out_queue);
213                    MQHelper.closeQueueManagerSafe(mqMgr);
214    
215    
216                }
217          }          }
218    
219          private void transportMq2Sql()          private void transportMq2Sql()
220          {          {
221              string filename = getLogFilename(LogfileType.LogTransactions);              int messageCount = 0;
             using (StreamWriter translog = new StreamWriter(filename, true))  
             try  
             {  
                 //stage 1 connect to mq  
                 Hashtable connProps = getConnectionProperties();  
                 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);  
                 int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;  
222    
223                  MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions);              MQQueueManager mqMgr = null;
224                MQQueue in_queue = null;
225                string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
226                using (StreamWriter translog = new StreamWriter(filename, true))
227                    try
228                    {
229                        //MQ options
230                        Hashtable connProps = MQHelper.getConnectionProperties(controller.mqHost, controller.mqChannel);
231                        int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
232    
233                        //MySQL options
234                        string mysqlString = buildMysqlConnString();
235    
236                        //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
237                        mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
238                        in_queue = mqMgr.AccessQueue(queueName, openOptions);
239                        using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
240                        {
241    
242                            sqlConnection.Open();
243    
                 //stage 2 connect to mysql  
                 string mysqlString = buildMysqlConnString();  
                 MySqlConnection sqlConnection = new MySqlConnection(mysqlString);  
                 sqlConnection.Open();  
244    
245                            //stage 3 move messages
246                            bool isContinue = true;
247                            while (isContinue)
248                            {
249    
250                  //stage 3 move messages                              MQMessage mqMsg = new MQMessage();
251                  bool isContinue = true;                              MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
                 while (isContinue)  
                 {  
                       
                     MQMessage mqMsg = new MQMessage();  
                     MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();  
252    
253                      mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen                              mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
254    
255                      try                              try
256                      {                              {
257                          in_queue.Get(mqMsg, mqGetMsgOpts);                                  in_queue.Get(mqMsg, mqGetMsgOpts);
258                          if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)                                  if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
259                          {                                                              {
260                              string msgString = mqMsg.ReadString(mqMsg.MessageLength);                                      string msgString = mqMsg.ReadString(mqMsg.MessageLength);
261                              System.Console.WriteLine(msgString);                                      //System.Console.WriteLine(msgString);
262    
263    
264                                        // Hvis transaktionen starter med et ? er det ikke en gyldig transaktion
265                                        // validér ligeledes at headeren er gyldig
266                                        if ( Salt2Helper.validateSalt2Header(msgString) == false )
267                                        {
268                                            string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
269                                            using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
270                                            {
271                                                discardedlog.WriteLine(Logfile.getNowString() + " " + msgString);
272                                            }
273                                            mqMgr.Commit();//fjern den afviste transaktion fra køen
274                                            statusData.discardedCounter++;
275                                            continue; //gå frem til at tage næste transaktion fra køen
276                                        }
277    
278    
279                                        string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
280    
281                                        MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
282                                        int numrows = sqlcmd.ExecuteNonQuery();
283    
284                                        if (numrows == 1)
285                                        {
286                                            translog.WriteLine(Logfile.getNowString() + " " + msgString);
287                                            mqMgr.Commit();
288                                            statusData.counter++;
289    
290    
291                                            messageCount++;// increment per run message counter
292                                            if (messageCount >= 10000) // if we have moved  10000 messages in this run - let it go and give the other transports a change
293                                            {
294                                                isContinue = false;
295                                            }
296    
                             string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng  
297    
                             MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);  
                             int numrows = sqlcmd.ExecuteNonQuery();  
298    
299                              if (numrows == 1)                                      }
300                              {                                      else
301                                  translog.WriteLine(getNowString() + " " + msgString + "\n");                                      {
302                                  mqMgr.Commit();                                          mqMgr.Backout();
303                                  statusData.counter++;                                          isContinue = false;
304                                        }
305    
306                                    }
307                                    else
308                                    {
309                                        System.Console.WriteLine("Non-text message");
310                                    }
311                              }                              }
312                              else                              catch (MQException mqe)
313                              {                              {
                                 mqMgr.Backout();  
314                                  isContinue = false;                                  isContinue = false;
315                              }                              
316                                    // report reason, if any
317                                    if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
318                                    {
319                                        // special report for normal end
320                                        System.Console.WriteLine("no more messages");
321                                    }
322                                    else
323                                    {
324                                        // general report for other reasons
325                                        System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
326                                        statusData.lastrunOk = false;
327                                    }
328    
329                                }
330    
331    
332                          }                          }
333                          else  
                         {  
                             System.Console.WriteLine("Non-text message");  
                         }  
334                      }                      }
                     catch (MQException mqe)  
                     {  
                         isContinue = false;  
335    
336                          // report reason, if any                  }
337                          if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)                  catch (Exception e)
338                          {                  {
339                              // special report for normal end                      //Det vil være mest korrekt at Rollback/backout MQ Transaktionen her - for at være sikker på at Message'n fjernes fra køen
340                              System.Console.WriteLine("no more messages");                      try
341                          }                      {
342                          else                          if (mqMgr != null)
343                          {                          {
344                              // general report for other reasons                              mqMgr.Backout();
                             System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);;  
                             statusData.lastrunOk = false;  
345                          }                          }
346                        }
347                        catch (Exception e2)
348                        {
349                            logfile.addSingleLogEntry("Error backing out from MQ Transaction: " + e2.Message);
350                      }                      }
351    
352                        statusData.lastrunOk = false;
353    
354                        statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
355                        Console.WriteLine(statusData.lastErrorMessage);
356                        Console.WriteLine(e.StackTrace);
357                        EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
358                  }                  }
359                    finally
360                    {
361    
362                  //stage 4: everything went smooth so clean up afterwards                      MQHelper.closeQueueSafe(in_queue);
363                  in_queue.Close();                      MQHelper.closeQueueManagerSafe(mqMgr);
                 mqMgr.Close();  
                 sqlConnection.Close();  
   
   
             }  
             catch (Exception e)  
             {  
                 statusData.lastrunOk = false;  
364    
365                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.Message;                  }
                 Console.WriteLine(statusData.lastErrorMessage);  
                 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);  
             }  
366          }          }
367    
368          private string buildMysqlConnString()          private string buildMysqlConnString()
# Line 315  namespace DaoMqPump2 Line 373  namespace DaoMqPump2
373              //connectionString += "DATABASE=" + controller.mysqlHost + ";";              //connectionString += "DATABASE=" + controller.mysqlHost + ";";
374              connectionString += "UID=" + controller.mysqlUser + ";";              connectionString += "UID=" + controller.mysqlUser + ";";
375              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
376                connectionString += "Max Pool Size=20;";
377                //connectionString += "ConnectionReset=true;";//ConnectionReset kræver muligvis at Default Database er angivet - det virker ihvertfald ikke uden
378    
379              return connectionString;              return connectionString;
380          }          }
381    
         private Hashtable getConnectionProperties()  
         {  
             Hashtable connProperties = new Hashtable();  
             connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);  
             connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);  
             connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel); //WARNING: Hardcoded Channel Value !!!  
             return connProperties;  
         }  
   
   
   
         private string getLogFilename(LogfileType type)  
         {  
   
             DateTime now = DateTime.Now;  
             string filename = controller.logDirectory + "\\";  
   
             switch (type)  
             {  
                 case LogfileType.LogEvents:  
                     filename += "eventlog_";  
                     break;  
   
                 case LogfileType.LogTransactions:  
                     filename += "transactionlog_";  
                     break;  
             }  
   
   
             filename += "transactionlog_" + name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";  
   
             return filename;  
         }  
   
         public string getNowString()  
         {  
             DateTime now = DateTime.Now;  
   
             return now.ToString("s");  
         }  
   
         private void addLogEntry(string msg)  
         {  
             msg = getNowString() + " " + msg;  
             lock (logEntries)  
             {  
                 logEntries.AddFirst(msg);  
   
                 if (logEntries.Count > 20)  
                 {  
                     logEntries.RemoveLast();  
                 }                  
             }  
   
             string filename = getLogFilename(LogfileType.LogEvents);  
             using (StreamWriter eventlog = new StreamWriter(filename, true))  
             {  
                 eventlog.WriteLine(msg);  
             }  
         }  
   
         public string[] getLog()  
         {  
             lock(logEntries)  
             {  
                 List<string> tmpEntries = new List<string>();  
                 foreach (string s in logEntries)  
                 {  
                     tmpEntries.Add(s);  
                 }  
                 return tmpEntries.ToArray();  
             }  
         }  
382    
383      }      }
384  }  }

Legend:
Removed from v.2001  
changed lines
  Added in v.2177

  ViewVC Help
Powered by ViewVC 1.1.20