/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2011 by torben, Wed Jul 10 20:20:21 2013 UTC revision 2172 by torben, Sat May 17 10:53:58 2014 UTC
# Line 7  using System.Diagnostics; Line 7  using System.Diagnostics;
7    
8  using IBM.WMQ;  using IBM.WMQ;
9  using MySql.Data.MySqlClient;  using MySql.Data.MySqlClient;
10    using System.Globalization;
11    
12    using DaoCommon;
13    
14  namespace DaoMqPump2  namespace DaoMqPump2
15  {  {
16      public class Transport      public class Transport
17      {      {
18    
         enum LogfileType {  
             LogTransactions,  
             LogEvents  
         }  
   
19          public static string SQL2MQ = "sql2mq";          public static string SQL2MQ = "sql2mq";
20          public static string MQ2SQL = "mq2sql";          public static string MQ2SQL = "mq2sql";
21    
# Line 27  namespace DaoMqPump2 Line 25  namespace DaoMqPump2
25    
26          StatusData statusData = new StatusData();          StatusData statusData = new StatusData();
27    
28            public Logfile logfile { get; private set; }
29    
30          public string name { get; private set; }          public string name { get; private set; }
31          public string direction { get; private set; }          public string direction { get; private set; }
32          public string queueName { get; private set; }          public string queueName { get; private set; }
# Line 62  namespace DaoMqPump2 Line 62  namespace DaoMqPump2
62                  statusData.transportEnabled = value;                  statusData.transportEnabled = value;
63                  if (value == true)                  if (value == true)
64                  {                  {
65                      this.addLogEntry("Transport enabled");                      logfile.addSingleLogEntry("Transport enabled");
66                  }                  }
67                  else                  else
68                  {                  {
69                      this.addLogEntry("Transport disabled");                      logfile.addSingleLogEntry("Transport disabled");
70                  }                  }
71              }              }
72          }          }
73    
74    
75          private LinkedList<string> logEntries = new LinkedList<string>();  
76    
77    
78          public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)          public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
# Line 92  namespace DaoMqPump2 Line 92  namespace DaoMqPump2
92              statusData.counter = 0;              statusData.counter = 0;
93              statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";              statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";
94    
95              addLogEntry( "Starting ... " );              logfile = new Logfile(name, controller.logDirectory);
96                logfile.addSingleLogEntry("Starting ... ");
97          }          }
98    
99          ~Transport()          ~Transport()
100          {          {
101              addLogEntry("Stopping ... ");              logfile.addSingleLogEntry("Stopping ... ");
102          }          }
103    
104    
# Line 122  namespace DaoMqPump2 Line 123  namespace DaoMqPump2
123    
124              if (statusData.lastrunOk == true)              if (statusData.lastrunOk == true)
125              {              {
126                  statusData.lastOkTime = getNowString();                  statusData.lastOkTime = Logfile.getNowString();
127    
128                  if (statusData.counter != startCounter)                  if (statusData.counter != startCounter)
129                  {                  {
130                      //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet                      //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
131                      statusData.lastTransferTime = getNowString();                      statusData.lastTransferTime = Logfile.getNowString();
132                  }                  }
133              }              }
134              else              else
135              {              {
136                  addLogEntry(statusData.lastErrorMessage);                  logfile.addSingleLogEntry(statusData.lastErrorMessage);
137                  statusData.lastErrorTime = getNowString();                  statusData.lastErrorTime = Logfile.getNowString();
138              }              }
139          }          }
140    
141          private void transportSql2Mq()          private void transportSql2Mq()
142          {          {
143              string filename = getLogFilename(LogfileType.LogTransactions);              MQQueueManager mqMgr = null;
144                MQQueue out_queue = null;
145    
146                string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
147              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
148              try              try
149              {              {
150                  //MQ Options                  //MQ Options
151                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = MQHelper.getConnectionProperties(controller.mqHost, controller.mqChannel);
152                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
153    
154                  //MySQL Options                  //MySQL Options
155                  string mysqlString = buildMysqlConnString();                  string mysqlString = buildMysqlConnString();
                   
156    
157                  using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps))//stage 1 connect to mq                  //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
158                  using (MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions))                  mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
159                    out_queue = mqMgr.AccessQueue(queueName, openOptions);
160                  using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql                  using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
161                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
162                  {                  {
# Line 184  namespace DaoMqPump2 Line 188  namespace DaoMqPump2
188                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
189                          int numrows = updateCmd.ExecuteNonQuery();                          int numrows = updateCmd.ExecuteNonQuery();
190    
191                          translog.WriteLine(getNowString() + " " + msgString + "\n");                          translog.WriteLine(Logfile.getNowString() + " " + msgString);
192    
193                          if (numrows != 1)                          if (numrows != 1)
194                          {                          {
# Line 198  namespace DaoMqPump2 Line 202  namespace DaoMqPump2
202              catch (Exception e)              catch (Exception e)
203              {              {
204                  statusData.lastrunOk = false;                  statusData.lastrunOk = false;
205                  statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.Message;                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
206                  Console.WriteLine(statusData.lastErrorMessage);                  Console.WriteLine(statusData.lastErrorMessage);
207                    Console.WriteLine(e.StackTrace);
208                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
209              }              }
210                finally
211                {
212                    MQHelper.closeQueueSafe(out_queue);
213                    MQHelper.closeQueueManagerSafe(mqMgr);
214    
215    
216                }
217          }          }
218    
219          private void transportMq2Sql()          private void transportMq2Sql()
220          {          {
221              string filename = getLogFilename(LogfileType.LogTransactions);              int messageCount = 0;
             using (StreamWriter translog = new StreamWriter(filename, true))  
             try  
             {  
                 //MQ options  
                 Hashtable connProps = getConnectionProperties();                  
                 int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;  
   
                 //MySQL options  
                 string mysqlString = buildMysqlConnString();  
222    
223                  using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps))//stage 1 connect to mq              MQQueueManager mqMgr = null;
224                  using (MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions) )              MQQueue in_queue = null;
225                  using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql              string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
226                using (StreamWriter translog = new StreamWriter(filename, true))
227                    try
228                  {                  {
229                        //MQ options
230                        Hashtable connProps = MQHelper.getConnectionProperties(controller.mqHost, controller.mqChannel);
231                        int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
232    
233                        //MySQL options
234                        string mysqlString = buildMysqlConnString();
235    
236                        //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
237                        mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
238                        in_queue = mqMgr.AccessQueue(queueName, openOptions);
239                        using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
240                        {
241    
242                      sqlConnection.Open();                          sqlConnection.Open();
243    
244    
245                      //stage 3 move messages                          //stage 3 move messages
246                      bool isContinue = true;                          bool isContinue = true;
247                      while (isContinue)                          while (isContinue)
248                      {                          {
249    
250                          MQMessage mqMsg = new MQMessage();                              MQMessage mqMsg = new MQMessage();
251                          MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();                              MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
252    
253                          mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen                              mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
254    
255                          try                              try
                         {  
                             in_queue.Get(mqMsg, mqGetMsgOpts);  
                             if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)  
256                              {                              {
257                                  string msgString = mqMsg.ReadString(mqMsg.MessageLength);                                  in_queue.Get(mqMsg, mqGetMsgOpts);
258                                  System.Console.WriteLine(msgString);                                  if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
259                                    {
260                                        string msgString = mqMsg.ReadString(mqMsg.MessageLength);
261                                        //System.Console.WriteLine(msgString);
262    
                                 string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng  
263    
264                                  MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);                                      // Hvis transaktionen starter med et ? er det ikke en gyldig transaktion
265                                  int numrows = sqlcmd.ExecuteNonQuery();                                      // validér ligeledes at headeren er gyldig
266                                        if ( Salt2Helper.validateSalt2Header(msgString) == false )
267                                        {
268                                            string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
269                                            using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
270                                            {
271                                                discardedlog.WriteLine(Logfile.getNowString() + " " + msgString);
272                                            }
273                                            mqMgr.Commit();//fjern den afviste transaktion fra køen
274                                            statusData.discardedCounter++;
275                                            continue; //gå frem til at tage næste transaktion fra køen
276                                        }
277    
278    
279                                        string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
280    
281                                        MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
282                                        int numrows = sqlcmd.ExecuteNonQuery();
283    
284                                        if (numrows == 1)
285                                        {
286                                            translog.WriteLine(Logfile.getNowString() + " " + msgString);
287                                            mqMgr.Commit();
288                                            statusData.counter++;
289    
290    
291                                            messageCount++;// increment per run message counter
292                                            if (messageCount >= 10000) // if we have moved  10000 messages in this run - let it go and give the other transports a change
293                                            {
294                                                isContinue = false;
295                                            }
296    
297    
298    
299                                        }
300                                        else
301                                        {
302                                            mqMgr.Backout();
303                                            isContinue = false;
304                                        }
305    
                                 if (numrows == 1)  
                                 {  
                                     translog.WriteLine(getNowString() + " " + msgString + "\n");  
                                     mqMgr.Commit();  
                                     statusData.counter++;  
306                                  }                                  }
307                                  else                                  else
308                                  {                                  {
309                                      mqMgr.Backout();                                      System.Console.WriteLine("Non-text message");
                                     isContinue = false;  
310                                  }                                  }
   
311                              }                              }
312                              else                              catch (MQException mqe)
313                              {                              {
314                                  System.Console.WriteLine("Non-text message");                                  isContinue = false;
315                              }  
316                          }                                  // report reason, if any
317                          catch (MQException mqe)                                  if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
318                          {                                  {
319                              isContinue = false;                                      // special report for normal end
320                                        System.Console.WriteLine("no more messages");
321                                    }
322                                    else
323                                    {
324                                        // general report for other reasons
325                                        System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
326                                        statusData.lastrunOk = false;
327                                    }
328    
                             // report reason, if any  
                             if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)  
                             {  
                                 // special report for normal end  
                                 System.Console.WriteLine("no more messages");  
                             }  
                             else  
                             {  
                                 // general report for other reasons  
                                 System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;  
                                 statusData.lastrunOk = false;  
329                              }                              }
330    
331    
332                          }                          }
333    
334                        }
335    
336                    }
337                    catch (Exception e)
338                    {
339                        //Det vil være mest korrekt at Rollback/backout MQ Transaktionen her - for at være sikker på at Message'n fjernes fra køen
340                        try
341                        {
342                            if (mqMgr != null)
343                            {
344                                mqMgr.Backout();
345                            }
346                      }                      }
347                        catch (Exception e2)
348                        {
349                            logfile.addSingleLogEntry("Error backing out from MQ Transaction: " + e2.Message);
350                        }
351    
352                        statusData.lastrunOk = false;
353    
354                        statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
355                        Console.WriteLine(statusData.lastErrorMessage);
356                        Console.WriteLine(e.StackTrace);
357                        EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
358                  }                  }
359                    finally
360                    {
361    
362              }                      MQHelper.closeQueueSafe(in_queue);
363              catch (Exception e)                      MQHelper.closeQueueManagerSafe(mqMgr);
             {  
                 statusData.lastrunOk = false;  
364    
365                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.Message;                  }
                 Console.WriteLine(statusData.lastErrorMessage);  
                 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);  
             }  
366          }          }
367    
368          private string buildMysqlConnString()          private string buildMysqlConnString()
# Line 309  namespace DaoMqPump2 Line 373  namespace DaoMqPump2
373              //connectionString += "DATABASE=" + controller.mysqlHost + ";";              //connectionString += "DATABASE=" + controller.mysqlHost + ";";
374              connectionString += "UID=" + controller.mysqlUser + ";";              connectionString += "UID=" + controller.mysqlUser + ";";
375              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
376                connectionString += "Max Pool Size=20;";
377                //connectionString += "ConnectionReset=true;";//ConnectionReset kræver muligvis at Default Database er angivet - det virker ihvertfald ikke uden
378    
379              return connectionString;              return connectionString;
380          }          }
381    
         private Hashtable getConnectionProperties()  
         {  
             Hashtable connProperties = new Hashtable();  
             connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);  
             connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);  
             connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel); //WARNING: Hardcoded Channel Value !!!  
             return connProperties;  
         }  
   
   
   
         private string getLogFilename(LogfileType type)  
         {  
   
             DateTime now = DateTime.Now;  
             string filename = controller.logDirectory + "\\";  
   
             switch (type)  
             {  
                 case LogfileType.LogEvents:  
                     filename += "eventlog_";  
                     break;  
   
                 case LogfileType.LogTransactions:  
                     filename += "transactionlog_";  
                     break;  
             }  
   
   
             filename += name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";  
   
             return filename;  
         }  
   
         public string getNowString()  
         {  
             DateTime now = DateTime.Now;  
   
             return now.ToString("s");  
         }  
   
         private void addLogEntry(string msg)  
         {  
             msg = getNowString() + " " + msg;  
             lock (logEntries)  
             {  
                 logEntries.AddFirst(msg);  
   
                 if (logEntries.Count > 20)  
                 {  
                     logEntries.RemoveLast();  
                 }                  
             }  
   
             string filename = getLogFilename(LogfileType.LogEvents);  
             using (StreamWriter eventlog = new StreamWriter(filename, true))  
             {  
                 eventlog.WriteLine(msg);  
             }  
         }  
   
         public string[] getLog()  
         {  
             lock(logEntries)  
             {  
                 List<string> tmpEntries = new List<string>();  
                 foreach (string s in logEntries)  
                 {  
                     tmpEntries.Add(s);  
                 }  
                 return tmpEntries.ToArray();  
             }  
         }  
382    
383      }      }
384  }  }

Legend:
Removed from v.2011  
changed lines
  Added in v.2172

  ViewVC Help
Powered by ViewVC 1.1.20