/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1986 by torben, Wed Jul 3 07:56:52 2013 UTC revision 2047 by torben, Mon Aug 19 07:21:59 2013 UTC
# Line 12  namespace DaoMqPump2 Line 12  namespace DaoMqPump2
12  {  {
13      public class Transport      public class Transport
14      {      {
15    
16            enum LogfileType {
17                LogTransactions,
18                LogEvents
19            }
20    
21          public static string SQL2MQ = "sql2mq";          public static string SQL2MQ = "sql2mq";
22          public static string MQ2SQL = "mq2sql";          public static string MQ2SQL = "mq2sql";
23    
24            //private bool enabled;        
         public bool enabled { get; set; }  
25    
26          TransportController controller;          TransportController controller;
27    
28            StatusData statusData = new StatusData();
29    
30          public string name { get; private set; }          public string name { get; private set; }
31          public string direction { get; private set; }          public string direction { get; private set; }
32          public string queueName { get; private set; }          public string queueName { get; private set; }
# Line 27  namespace DaoMqPump2 Line 34  namespace DaoMqPump2
34          public string sql2mqReadQuery { get; private set; }          public string sql2mqReadQuery { get; private set; }
35          public string sql2mqUpdateQuery { get; private set; }          public string sql2mqUpdateQuery { get; private set; }
36    
37          public bool lastrunOk { get; private set; }          //public bool lastrunOk { get; private set; }
38          public string lastErrorMessage { get; private set; }          //public string lastErrorMessage { get; private set; }
39    
40            //public string lastOkTime { get; private set; }
41            //public string lastErrorTime { get; private set; }
42            //public string lastTransferTime { get; private set; }
43    
44          public string lastOkTime { get; private set; }          //public int counter { get; private set; }
45          public string lastErrorTime { get; private set; }  
46            public StatusData TransportStatusData
47            {
48                get
49                {
50                    return this.statusData;
51                }
52            }
53    
54    
55            public bool Enabled
56            {
57                get {
58                    return statusData.transportEnabled;
59                }
60                set
61                {
62                    statusData.transportEnabled = value;
63                    if (value == true)
64                    {
65                        this.addLogEntry("Transport enabled");
66                    }
67                    else
68                    {
69                        this.addLogEntry("Transport disabled");
70                    }
71                }
72            }
73    
         public int counter { get; private set; }  
74    
75          private LinkedList<string> logEntries = new LinkedList<string>();          private LinkedList<string> logEntries = new LinkedList<string>();
76    
# Line 48  namespace DaoMqPump2 Line 85  namespace DaoMqPump2
85              this.sql2mqReadQuery = sql2mqReadQuery;              this.sql2mqReadQuery = sql2mqReadQuery;
86              this.sql2mqUpdateQuery = sql2mqUpdateQuery;              this.sql2mqUpdateQuery = sql2mqUpdateQuery;
87    
88              this.enabled = enabled;              statusData.transportEnabled = enabled;
89                
90                
91              lastrunOk = true;              statusData.lastrunOk = true;
92              counter = 0;              statusData.counter = 0;
93              lastErrorMessage = lastOkTime = lastErrorTime = "";              statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";
94    
95              addLogEntry( "Starting ... " );              addLogEntry( "Starting ... " );
96          }          }
97    
98            ~Transport()
99            {
100                addLogEntry("Stopping ... ");
101            }
102    
103    
104          public void transportMessages()          public void transportMessages()
105          {          {
106              if (enabled == false)              if (statusData.transportEnabled == false)
107                  return;                  return;
108    
109              Console.WriteLine(name + " -> transportMessages() ");              Console.WriteLine(name + " -> transportMessages() ");
110              lastrunOk = true;              statusData.lastrunOk = true;
111    
112                int startCounter = statusData.counter;
113    
114              if (direction == SQL2MQ)              if (direction == SQL2MQ)
115              {              {
# Line 75  namespace DaoMqPump2 Line 120  namespace DaoMqPump2
120                  transportMq2Sql();                  transportMq2Sql();
121              }              }
122    
123              if (lastrunOk == true)              if (statusData.lastrunOk == true)
124              {              {
125                  lastOkTime = getNowString();                  statusData.lastOkTime = getNowString();
126    
127                    if (statusData.counter != startCounter)
128                    {
129                        //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
130                        statusData.lastTransferTime = getNowString();
131                    }
132              }              }
133              else              else
134              {              {
135                  addLogEntry(lastErrorMessage);                  addLogEntry(statusData.lastErrorMessage);
136                  lastErrorTime = getNowString();                  statusData.lastErrorTime = getNowString();
137              }              }
138          }          }
139    
140          private void transportSql2Mq()          private void transportSql2Mq()
141          {          {
142              string filename = getTransactionlogFilename();              string filename = getLogFilename(LogfileType.LogTransactions);
143              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
144              try              try
145              {              {
146                  //stage 1 connect to mq                  //MQ Options
147                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = getConnectionProperties();
                 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);  
148                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
149    
150                  MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions);                  //MySQL Options
   
   
                 //stage 2 connect to mysql  
151                  string mysqlString = buildMysqlConnString();                  string mysqlString = buildMysqlConnString();
152                  MySqlConnection sqlConnection = new MySqlConnection(mysqlString);                  
                 sqlConnection.Open();  
   
153    
154                  //stage 3 move messages                  using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps))//stage 1 connect to mq
155                  string readSql = "CALL " + sql2mqReadQuery + "()";                  using (MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions))
156                  MySqlCommand readCmd = new MySqlCommand(readSql, sqlConnection);                  using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
157                  MySqlDataReader dataReader = readCmd.ExecuteReader();                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
                 while (dataReader.Read())  
158                  {                  {
159                      int id = dataReader.GetInt32(0);                      sqlReadConnection.Open();
160                      string msgString = dataReader.GetString(1);                      sqlWriteConnection.Open();
161    
162                        //stage 3 move messages
163                        string readSql = "CALL " + sql2mqReadQuery + "()";
164                        MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);
165                        MySqlDataReader dataReader = readCmd.ExecuteReader();
166                        while (dataReader.Read())
167                        {
168                            int id = dataReader.GetInt32(0);
169                            string msgString = dataReader.GetString(1);
170    
171                      MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,                          MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
172                      // same as MQPMO_DEFAULT                          // same as MQPMO_DEFAULT
173    
174                      MQMessage msg = new MQMessage();                          MQMessage msg = new MQMessage();
175                      msg.Format = MQC.MQFMT_STRING;                          msg.Format = MQC.MQFMT_STRING;
176                      msg.CharacterSet = 1252;                          msg.CharacterSet = 1252;
177                      msg.WriteString(msgString);                          msg.WriteString(msgString);
178    
179                      out_queue.Put(msg, pmo);                          out_queue.Put(msg, pmo);
180    
181                      //now that the message has been put on queue mark it as such                          //now that the message has been put on queue mark it as such
182    
183                      string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";                          string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
184                      MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlConnection);                          MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
185                      int numrows = updateCmd.ExecuteNonQuery();                          int numrows = updateCmd.ExecuteNonQuery();
186    
187                      translog.WriteLine(getNowString() + " " + msgString + "\n");                          translog.WriteLine(getNowString() + " " + msgString + "\n");
188    
189                      if (numrows != 1)                          if (numrows != 1)
190                      {                          {
191                          break;                              break;
192                            }
193                            statusData.counter++;
194                      }                      }
                     counter++;  
                 }  
                   
195    
196                  //stage 4: everything went smooth so clean up afterwards                  }
                 dataReader.Close();  
                 out_queue.Close();  
                 mqMgr.Close();  
                 sqlConnection.Close();  
197              }              }
198              catch (Exception e)              catch (Exception e)
199              {              {
200                  lastrunOk = false;                  statusData.lastrunOk = false;
201                  lastErrorMessage = name + ".transportSql2Mq error: " + e.Message;                  statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.Message;
202                  Console.WriteLine(lastErrorMessage);                  Console.WriteLine(statusData.lastErrorMessage);
203                  EventLog.WriteEntry("DaoMqPump2", lastErrorMessage, EventLogEntryType.Warning);                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
204              }              }
205          }          }
206    
207          private void transportMq2Sql()          private void transportMq2Sql()
208          {          {
209              string filename = getTransactionlogFilename();              string filename = getLogFilename(LogfileType.LogTransactions);
210              using (StreamWriter translog = new StreamWriter(filename, true))              using (StreamWriter translog = new StreamWriter(filename, true))
211              try              try
212              {              {
213                  //stage 1 connect to mq                  //MQ options
214                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = getConnectionProperties();                
                 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);  
215                  int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
216    
217                  MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions);                  //MySQL options
   
   
                 //stage 2 connect to mysql  
218                  string mysqlString = buildMysqlConnString();                  string mysqlString = buildMysqlConnString();
                 MySqlConnection sqlConnection = new MySqlConnection(mysqlString);  
                 sqlConnection.Open();  
   
219    
220                  //stage 3 move messages                  using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps))//stage 1 connect to mq
221                  bool isContinue = true;                  using (MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions) )
222                  while (isContinue)                  using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
223                  {                  {
                       
                     MQMessage mqMsg = new MQMessage();  
                     MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();  
224    
225                      mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen                      sqlConnection.Open();
226    
227                      try  
228                        //stage 3 move messages
229                        bool isContinue = true;
230                        while (isContinue)
231                      {                      {
                         in_queue.Get(mqMsg, mqGetMsgOpts);  
                         if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)  
                         {                              
                             string msgString = mqMsg.ReadString(mqMsg.MessageLength);  
                             System.Console.WriteLine(msgString);  
232    
233                              string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng                          MQMessage mqMsg = new MQMessage();
234                            MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
235    
236                              MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);                          mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
                             int numrows = sqlcmd.ExecuteNonQuery();  
237    
238                              if (numrows == 1)                          try
239                            {
240                                in_queue.Get(mqMsg, mqGetMsgOpts);
241                                if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
242                              {                              {
243                                  translog.WriteLine(getNowString() + " " + msgString + "\n");                                  string msgString = mqMsg.ReadString(mqMsg.MessageLength);
244                                  mqMgr.Commit();                                  System.Console.WriteLine(msgString);
245                                  counter++;  
246                                    string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
247    
248                                    MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
249                                    int numrows = sqlcmd.ExecuteNonQuery();
250    
251                                    if (numrows == 1)
252                                    {
253                                        translog.WriteLine(getNowString() + " " + msgString + "\n");
254                                        mqMgr.Commit();
255                                        statusData.counter++;
256                                    }
257                                    else
258                                    {
259                                        mqMgr.Backout();
260                                        isContinue = false;
261                                    }
262    
263                              }                              }
264                              else                              else
265                              {                              {
266                                  mqMgr.Backout();                                  System.Console.WriteLine("Non-text message");
267                                  isContinue = false;                              }
                             }                              
   
268                          }                          }
269                          else                          catch (MQException mqe)
270                          {                          {
271                              System.Console.WriteLine("Non-text message");                              isContinue = false;
272                          }  
273                      }                              // report reason, if any
274                      catch (MQException mqe)                              if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
275                      {                              {
276                          isContinue = false;                                  // special report for normal end
277                                    System.Console.WriteLine("no more messages");
278                                }
279                                else
280                                {
281                                    // general report for other reasons
282                                    System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
283                                    statusData.lastrunOk = false;
284                                }
285    
                         // report reason, if any  
                         if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)  
                         {  
                             // special report for normal end  
                             System.Console.WriteLine("no more messages");  
                         }  
                         else  
                         {  
                             // general report for other reasons  
                             System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);;  
                             lastrunOk = false;  
286                          }                          }
287    
                     }  
288    
289                        }
290    
291                  }                  }
292    
                 //stage 4: everything went smooth so clean up afterwards  
                 in_queue.Close();  
                 mqMgr.Close();  
                 sqlConnection.Close();  
   
   
293              }              }
294              catch (Exception e)              catch (Exception e)
295              {              {
296                  lastrunOk = false;                  statusData.lastrunOk = false;
297    
298                  lastErrorMessage = name + ".transportMq2Sql error: " + e.Message;                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.Message;
299                  Console.WriteLine(lastErrorMessage);                  Console.WriteLine(statusData.lastErrorMessage);
300                  EventLog.WriteEntry("DaoMqPump2", lastErrorMessage, EventLogEntryType.Warning);                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
301              }              }
302          }          }
303    
# Line 264  namespace DaoMqPump2 Line 309  namespace DaoMqPump2
309              //connectionString += "DATABASE=" + controller.mysqlHost + ";";              //connectionString += "DATABASE=" + controller.mysqlHost + ";";
310              connectionString += "UID=" + controller.mysqlUser + ";";              connectionString += "UID=" + controller.mysqlUser + ";";
311              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
312                connectionString += "maximumpoolsize=10;";
313    
314              return connectionString;              return connectionString;
315          }          }
# Line 277  namespace DaoMqPump2 Line 323  namespace DaoMqPump2
323              return connProperties;              return connProperties;
324          }          }
325    
326          private string getTransactionlogFilename()  
327    
328            private string getLogFilename(LogfileType type)
329          {          {
330    
331              DateTime now = DateTime.Now;              DateTime now = DateTime.Now;
332              string filename = controller.logDirectory + "\\";              string filename = controller.logDirectory + "\\";
333              filename += "transactionlog_" + name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";  
334                switch (type)
335                {
336                    case LogfileType.LogEvents:
337                        filename += "eventlog_";
338                        break;
339    
340                    case LogfileType.LogTransactions:
341                        filename += "transactionlog_";
342                        break;
343                }
344    
345    
346                filename += name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";
347    
348              return filename;              return filename;
349          }          }
# Line 304  namespace DaoMqPump2 Line 365  namespace DaoMqPump2
365                  if (logEntries.Count > 20)                  if (logEntries.Count > 20)
366                  {                  {
367                      logEntries.RemoveLast();                      logEntries.RemoveLast();
368                  }                  }                
369                                }
370    
371                string filename = getLogFilename(LogfileType.LogEvents);
372                using (StreamWriter eventlog = new StreamWriter(filename, true))
373                {
374                    eventlog.WriteLine(msg);
375              }              }
376          }          }
377    

Legend:
Removed from v.1986  
changed lines
  Added in v.2047

  ViewVC Help
Powered by ViewVC 1.1.20