/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2051 by torben, Fri Aug 23 20:03:47 2013 UTC revision 2168 by torben, Fri May 16 20:56:22 2014 UTC
# Line 7  using System.Diagnostics; Line 7  using System.Diagnostics;
7    
8  using IBM.WMQ;  using IBM.WMQ;
9  using MySql.Data.MySqlClient;  using MySql.Data.MySqlClient;
10    using System.Globalization;
11    
12    using DaoCommon;
13    
14  namespace DaoMqPump2  namespace DaoMqPump2
15  {  {
16      public class Transport      public class Transport
17      {      {
18    
         enum LogfileType {  
             LogTransactions,  
             LogEvents  
         }  
   
19          public static string SQL2MQ = "sql2mq";          public static string SQL2MQ = "sql2mq";
20          public static string MQ2SQL = "mq2sql";          public static string MQ2SQL = "mq2sql";
21    
# Line 27  namespace DaoMqPump2 Line 25  namespace DaoMqPump2
25    
26          StatusData statusData = new StatusData();          StatusData statusData = new StatusData();
27    
28            public Logfile logfile { get; private set; }
29    
30          public string name { get; private set; }          public string name { get; private set; }
31          public string direction { get; private set; }          public string direction { get; private set; }
32          public string queueName { get; private set; }          public string queueName { get; private set; }
# Line 62  namespace DaoMqPump2 Line 62  namespace DaoMqPump2
62                  statusData.transportEnabled = value;                  statusData.transportEnabled = value;
63                  if (value == true)                  if (value == true)
64                  {                  {
65                      this.addLogEntry("Transport enabled");                      logfile.addSingleLogEntry("Transport enabled");
66                  }                  }
67                  else                  else
68                  {                  {
69                      this.addLogEntry("Transport disabled");                      logfile.addSingleLogEntry("Transport disabled");
70                  }                  }
71              }              }
72          }          }
73    
74    
75          private LinkedList<string> logEntries = new LinkedList<string>();  
76    
77    
78          public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)          public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
# Line 92  namespace DaoMqPump2 Line 92  namespace DaoMqPump2
92              statusData.counter = 0;              statusData.counter = 0;
93              statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";              statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";
94    
95              addLogEntry( "Starting ... " );              logfile = new Logfile(name, controller.logDirectory);
96                logfile.addSingleLogEntry("Starting ... ");
97          }          }
98    
99          ~Transport()          ~Transport()
100          {          {
101              addLogEntry("Stopping ... ");              logfile.addSingleLogEntry("Stopping ... ");
102          }          }
103    
104    
# Line 122  namespace DaoMqPump2 Line 123  namespace DaoMqPump2
123    
124              if (statusData.lastrunOk == true)              if (statusData.lastrunOk == true)
125              {              {
126                  statusData.lastOkTime = getNowString();                  statusData.lastOkTime = Logfile.getNowString();
127    
128                  if (statusData.counter != startCounter)                  if (statusData.counter != startCounter)
129                  {                  {
130                      //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet                      //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
131                      statusData.lastTransferTime = getNowString();                      statusData.lastTransferTime = Logfile.getNowString();
132                  }                  }
133              }              }
134              else              else
135              {              {
136                  addLogEntry(statusData.lastErrorMessage);                  logfile.addSingleLogEntry(statusData.lastErrorMessage);
137                  statusData.lastErrorTime = getNowString();                  statusData.lastErrorTime = Logfile.getNowString();
138              }              }
139          }          }
140    
# Line 142  namespace DaoMqPump2 Line 143  namespace DaoMqPump2
143              MQQueueManager mqMgr = null;              MQQueueManager mqMgr = null;
144              MQQueue out_queue = null;              MQQueue out_queue = null;
145    
146              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
147              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
148              try              try
149              {              {
# Line 187  namespace DaoMqPump2 Line 188  namespace DaoMqPump2
188                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
189                          int numrows = updateCmd.ExecuteNonQuery();                          int numrows = updateCmd.ExecuteNonQuery();
190    
191                          translog.WriteLine(getNowString() + " " + msgString + "\n");                          translog.WriteLine(Logfile.getNowString() + " " + msgString);
192    
193                          if (numrows != 1)                          if (numrows != 1)
194                          {                          {
# Line 238  namespace DaoMqPump2 Line 239  namespace DaoMqPump2
239    
240          private void transportMq2Sql()          private void transportMq2Sql()
241          {          {
242                int messageCount = 0;
243    
244              MQQueueManager mqMgr = null;              MQQueueManager mqMgr = null;
245              MQQueue in_queue = null;              MQQueue in_queue = null;
246              string filename = getLogFilename(LogfileType.LogTransactions);              string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
247              using (StreamWriter translog = new StreamWriter(filename, true))              using (StreamWriter translog = new StreamWriter(filename, true))
248                  try                  try
249                  {                  {
# Line 276  namespace DaoMqPump2 Line 279  namespace DaoMqPump2
279                                  if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)                                  if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
280                                  {                                  {
281                                      string msgString = mqMsg.ReadString(mqMsg.MessageLength);                                      string msgString = mqMsg.ReadString(mqMsg.MessageLength);
282                                      System.Console.WriteLine(msgString);                                      //System.Console.WriteLine(msgString);
283    
284    
285                                        // Hvis transaktionen starter med et ? er det ikke en gyldig transaktion
286                                        // validér ligeledes at headeren er gyldig
287                                        if ( msgString.StartsWith("?") || DaoUtil.validateSalt2Header(msgString) == false )
288                                        {
289                                            string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
290                                            using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
291                                            {
292                                                discardedlog.WriteLine(Logfile.getNowString() + " " + msgString);
293                                            }
294                                            mqMgr.Commit();//fjern den afviste transaktion fra køen
295                                            statusData.discardedCounter++;
296                                            continue; //gå frem til at tage næste transaktion fra køen
297                                        }
298    
299    
300                                      string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng                                      string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
301    
# Line 285  namespace DaoMqPump2 Line 304  namespace DaoMqPump2
304    
305                                      if (numrows == 1)                                      if (numrows == 1)
306                                      {                                      {
307                                          translog.WriteLine(getNowString() + " " + msgString + "\n");                                          translog.WriteLine(Logfile.getNowString() + " " + msgString);
308                                          mqMgr.Commit();                                          mqMgr.Commit();
309                                          statusData.counter++;                                          statusData.counter++;
310    
311    
312                                            messageCount++;// increment per run message counter
313                                            if (messageCount >= 10000) // if we have moved  10000 messages in this run - let it go and give the other transports a change
314                                            {
315                                                isContinue = false;
316                                            }
317    
318    
319    
320                                      }                                      }
321                                      else                                      else
322                                      {                                      {
# Line 328  namespace DaoMqPump2 Line 357  namespace DaoMqPump2
357                  }                  }
358                  catch (Exception e)                  catch (Exception e)
359                  {                  {
360                        //Det vil være mest korrekt at Rollback/backout MQ Transaktionen her - for at være sikker på at Message'n fjernes fra køen
361                        try
362                        {
363                            if (mqMgr != null)
364                            {
365                                mqMgr.Backout();
366                            }
367                        }
368                        catch (Exception e2)
369                        {
370                            logfile.addSingleLogEntry("Error backing out from MQ Transaction: " + e2.Message);
371                        }
372    
373                      statusData.lastrunOk = false;                      statusData.lastrunOk = false;
374    
375                      statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;                      statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
# Line 382  namespace DaoMqPump2 Line 424  namespace DaoMqPump2
424              Hashtable connProperties = new Hashtable();              Hashtable connProperties = new Hashtable();
425              connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);              connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
426              connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);              connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
427              connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel); //WARNING: Hardcoded Channel Value !!!              connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel);
428              return connProperties;              return connProperties;
429          }          }
430    
431    
   
         private string getLogFilename(LogfileType type)  
         {  
   
             DateTime now = DateTime.Now;  
             string filename = controller.logDirectory + "\\";  
   
             switch (type)  
             {  
                 case LogfileType.LogEvents:  
                     filename += "eventlog_";  
                     break;  
   
                 case LogfileType.LogTransactions:  
                     filename += "transactionlog_";  
                     break;  
             }  
   
   
             filename += name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";  
   
             return filename;  
         }  
   
         public string getNowString()  
         {  
             DateTime now = DateTime.Now;  
   
             return now.ToString("s");  
         }  
   
         private void addLogEntry(string msg)  
         {  
             msg = getNowString() + " " + msg;  
             lock (logEntries)  
             {  
                 logEntries.AddFirst(msg);  
   
                 if (logEntries.Count > 20)  
                 {  
                     logEntries.RemoveLast();  
                 }                  
             }  
   
             string filename = getLogFilename(LogfileType.LogEvents);  
             using (StreamWriter eventlog = new StreamWriter(filename, true))  
             {  
                 eventlog.WriteLine(msg);  
             }  
         }  
   
         public string[] getLog()  
         {  
             lock(logEntries)  
             {  
                 List<string> tmpEntries = new List<string>();  
                 foreach (string s in logEntries)  
                 {  
                     tmpEntries.Add(s);  
                 }  
                 return tmpEntries.ToArray();  
             }  
         }  
   
432      }      }
433  }  }

Legend:
Removed from v.2051  
changed lines
  Added in v.2168

  ViewVC Help
Powered by ViewVC 1.1.20