/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1999 by torben, Mon Jul 8 14:22:33 2013 UTC revision 2050 by torben, Fri Aug 23 17:07:25 2013 UTC
# Line 21  namespace DaoMqPump2 Line 21  namespace DaoMqPump2
21          public static string SQL2MQ = "sql2mq";          public static string SQL2MQ = "sql2mq";
22          public static string MQ2SQL = "mq2sql";          public static string MQ2SQL = "mq2sql";
23    
24          private bool enabled;                  //private bool enabled;        
25    
26          TransportController controller;          TransportController controller;
27    
# Line 55  namespace DaoMqPump2 Line 55  namespace DaoMqPump2
55          public bool Enabled          public bool Enabled
56          {          {
57              get {              get {
58                  return this.enabled;                  return statusData.transportEnabled;
59              }              }
60              set              set
61              {              {
62                  this.enabled = value;                  statusData.transportEnabled = value;
63                  if (value == true)                  if (value == true)
64                  {                  {
65                      this.addLogEntry("Transport enabled");                      this.addLogEntry("Transport enabled");
# Line 85  namespace DaoMqPump2 Line 85  namespace DaoMqPump2
85              this.sql2mqReadQuery = sql2mqReadQuery;              this.sql2mqReadQuery = sql2mqReadQuery;
86              this.sql2mqUpdateQuery = sql2mqUpdateQuery;              this.sql2mqUpdateQuery = sql2mqUpdateQuery;
87    
88              this.enabled = enabled;              statusData.transportEnabled = enabled;
89    
90    
91              statusData.lastrunOk = true;              statusData.lastrunOk = true;
# Line 103  namespace DaoMqPump2 Line 103  namespace DaoMqPump2
103    
104          public void transportMessages()          public void transportMessages()
105          {          {
106              if (enabled == false)              if (statusData.transportEnabled == false)
107                  return;                  return;
108    
109              Console.WriteLine(name + " -> transportMessages() ");              Console.WriteLine(name + " -> transportMessages() ");
# Line 143  namespace DaoMqPump2 Line 143  namespace DaoMqPump2
143              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
144              try              try
145              {              {
146                  //stage 1 connect to mq                  //MQ Options
147                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = getConnectionProperties();
                 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);  
148                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
149    
150                  MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions);                  //MySQL Options
   
   
                 //stage 2 connect to mysql  
151                  string mysqlString = buildMysqlConnString();                  string mysqlString = buildMysqlConnString();
152                  MySqlConnection sqlConnection = new MySqlConnection(mysqlString);                  
                 sqlConnection.Open();  
   
153    
154                  //stage 3 move messages                  using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps))//stage 1 connect to mq
155                  string readSql = "CALL " + sql2mqReadQuery + "()";                  using (MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions))
156                  MySqlCommand readCmd = new MySqlCommand(readSql, sqlConnection);                  using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
157                  MySqlDataReader dataReader = readCmd.ExecuteReader();                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
                 while (dataReader.Read())  
158                  {                  {
159                      int id = dataReader.GetInt32(0);                      sqlReadConnection.Open();
160                      string msgString = dataReader.GetString(1);                      sqlWriteConnection.Open();
161    
162                      MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,                      //stage 3 move messages
163                      // same as MQPMO_DEFAULT                      string readSql = "CALL " + sql2mqReadQuery + "()";
164                        MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);
165                        MySqlDataReader dataReader = readCmd.ExecuteReader();
166                        while (dataReader.Read())
167                        {
168                            int id = dataReader.GetInt32(0);
169                            string msgString = dataReader.GetString(1);
170    
171                      MQMessage msg = new MQMessage();                          MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
172                      msg.Format = MQC.MQFMT_STRING;                          // same as MQPMO_DEFAULT
                     msg.CharacterSet = 1252;  
                     msg.WriteString(msgString);  
173    
174                      out_queue.Put(msg, pmo);                          MQMessage msg = new MQMessage();
175                            msg.Format = MQC.MQFMT_STRING;
176                            msg.CharacterSet = 1252;
177                            msg.WriteString(msgString);
178    
179                      //now that the message has been put on queue mark it as such                          out_queue.Put(msg, pmo);
180    
181                      string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";                          //now that the message has been put on queue mark it as such
                     MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlConnection);  
                     int numrows = updateCmd.ExecuteNonQuery();  
182    
183                      translog.WriteLine(getNowString() + " " + msgString + "\n");                          string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
184                            MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
185                            int numrows = updateCmd.ExecuteNonQuery();
186    
187                      if (numrows != 1)                          translog.WriteLine(getNowString() + " " + msgString + "\n");
188                      {  
189                          break;                          if (numrows != 1)
190                            {
191                                break;
192                            }
193                            statusData.counter++;
194                      }                      }
                     statusData.counter++;  
                 }  
                   
195    
196                  //stage 4: everything went smooth so clean up afterwards                  }
                 dataReader.Close();  
                 out_queue.Close();  
                 mqMgr.Close();  
                 sqlConnection.Close();  
197              }              }
198              catch (Exception e)              catch (Exception e)
199              {              {
200                  statusData.lastrunOk = false;                  statusData.lastrunOk = false;
201                  statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.Message;                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
202                  Console.WriteLine(statusData.lastErrorMessage);                  Console.WriteLine(statusData.lastErrorMessage);
203                    Console.WriteLine(e.StackTrace);
204                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
205              }              }
206          }          }
# Line 213  namespace DaoMqPump2 Line 211  namespace DaoMqPump2
211              using (StreamWriter translog = new StreamWriter(filename, true))              using (StreamWriter translog = new StreamWriter(filename, true))
212              try              try
213              {              {
214                  //stage 1 connect to mq                  //MQ options
215                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = getConnectionProperties();                
                 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);  
216                  int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
217    
218                  MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions);                  //MySQL options
   
   
                 //stage 2 connect to mysql  
219                  string mysqlString = buildMysqlConnString();                  string mysqlString = buildMysqlConnString();
                 MySqlConnection sqlConnection = new MySqlConnection(mysqlString);  
                 sqlConnection.Open();  
   
220    
221                  //stage 3 move messages                  using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps))//stage 1 connect to mq
222                  bool isContinue = true;                  using (MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions) )
223                  while (isContinue)                  using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
224                  {                  {
                       
                     MQMessage mqMsg = new MQMessage();  
                     MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();  
225    
226                      mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen                      sqlConnection.Open();
227    
228                      try  
229                        //stage 3 move messages
230                        bool isContinue = true;
231                        while (isContinue)
232                      {                      {
                         in_queue.Get(mqMsg, mqGetMsgOpts);  
                         if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)  
                         {                              
                             string msgString = mqMsg.ReadString(mqMsg.MessageLength);  
                             System.Console.WriteLine(msgString);  
233    
234                              string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng                          MQMessage mqMsg = new MQMessage();
235                            MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
236    
237                              MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);                          mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
                             int numrows = sqlcmd.ExecuteNonQuery();  
238    
239                              if (numrows == 1)                          try
240                            {
241                                in_queue.Get(mqMsg, mqGetMsgOpts);
242                                if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
243                              {                              {
244                                  translog.WriteLine(getNowString() + " " + msgString + "\n");                                  string msgString = mqMsg.ReadString(mqMsg.MessageLength);
245                                  mqMgr.Commit();                                  System.Console.WriteLine(msgString);
246                                  statusData.counter++;  
247                                    string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
248    
249                                    MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
250                                    int numrows = sqlcmd.ExecuteNonQuery();
251    
252                                    if (numrows == 1)
253                                    {
254                                        translog.WriteLine(getNowString() + " " + msgString + "\n");
255                                        mqMgr.Commit();
256                                        statusData.counter++;
257                                    }
258                                    else
259                                    {
260                                        mqMgr.Backout();
261                                        isContinue = false;
262                                    }
263    
264                              }                              }
265                              else                              else
266                              {                              {
267                                  mqMgr.Backout();                                  System.Console.WriteLine("Non-text message");
268                                  isContinue = false;                              }
                             }                              
   
269                          }                          }
270                          else                          catch (MQException mqe)
271                          {                          {
272                              System.Console.WriteLine("Non-text message");                              isContinue = false;
273                          }  
274                      }                              // report reason, if any
275                      catch (MQException mqe)                              if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
276                      {                              {
277                          isContinue = false;                                  // special report for normal end
278                                    System.Console.WriteLine("no more messages");
279                                }
280                                else
281                                {
282                                    // general report for other reasons
283                                    System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
284                                    statusData.lastrunOk = false;
285                                }
286    
                         // report reason, if any  
                         if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)  
                         {  
                             // special report for normal end  
                             System.Console.WriteLine("no more messages");  
                         }  
                         else  
                         {  
                             // general report for other reasons  
                             System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);;  
                             statusData.lastrunOk = false;  
287                          }                          }
288    
                     }  
289    
290                        }
291    
292                  }                  }
293    
                 //stage 4: everything went smooth so clean up afterwards  
                 in_queue.Close();  
                 mqMgr.Close();  
                 sqlConnection.Close();  
   
   
294              }              }
295              catch (Exception e)              catch (Exception e)
296              {              {
297                  statusData.lastrunOk = false;                  statusData.lastrunOk = false;
298                    
299                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.Message;                  statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
300                  Console.WriteLine(statusData.lastErrorMessage);                  Console.WriteLine(statusData.lastErrorMessage);
301                    Console.WriteLine(e.StackTrace);
302                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);                  EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
303              }              }
304          }          }
# Line 315  namespace DaoMqPump2 Line 311  namespace DaoMqPump2
311              //connectionString += "DATABASE=" + controller.mysqlHost + ";";              //connectionString += "DATABASE=" + controller.mysqlHost + ";";
312              connectionString += "UID=" + controller.mysqlUser + ";";              connectionString += "UID=" + controller.mysqlUser + ";";
313              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";              connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
314                //connectionString += "maximumpoolsize=10;";
315                //connectionString += "ConnectionReset=true;";
316    
317              return connectionString;              return connectionString;
318          }          }
# Line 348  namespace DaoMqPump2 Line 346  namespace DaoMqPump2
346              }              }
347    
348    
349              filename += "transactionlog_" + name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";              filename += name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";
350    
351              return filename;              return filename;
352          }          }

Legend:
Removed from v.1999  
changed lines
  Added in v.2050

  ViewVC Help
Powered by ViewVC 1.1.20