/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2010 by torben, Wed Jul 10 18:28:31 2013 UTC revision 2011 by torben, Wed Jul 10 20:20:21 2013 UTC
# Line 143  namespace DaoMqPump2 Line 143  namespace DaoMqPump2
143              using (StreamWriter translog = new StreamWriter(filename, true) )              using (StreamWriter translog = new StreamWriter(filename, true) )
144              try              try
145              {              {
146                  //stage 1 connect to mq                  //MQ Options
147                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = getConnectionProperties();
                 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);  
148                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
149    
150                  MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions);                  //MySQL Options
   
   
                 //stage 2 connect to mysql  
151                  string mysqlString = buildMysqlConnString();                  string mysqlString = buildMysqlConnString();
152                  MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString);                  
                 sqlReadConnection.Open();  
   
                 MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString);  
                 sqlWriteConnection.Open();  
   
153    
154                  //stage 3 move messages                  using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps))//stage 1 connect to mq
155                  string readSql = "CALL " + sql2mqReadQuery + "()";                  using (MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions))
156                  MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);                  using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
157                  MySqlDataReader dataReader = readCmd.ExecuteReader();                  using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
                 while (dataReader.Read())  
158                  {                  {
159                      int id = dataReader.GetInt32(0);                      sqlReadConnection.Open();
160                      string msgString = dataReader.GetString(1);                      sqlWriteConnection.Open();
161    
162                      MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,                      //stage 3 move messages
163                      // same as MQPMO_DEFAULT                      string readSql = "CALL " + sql2mqReadQuery + "()";
164                        MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);
165                        MySqlDataReader dataReader = readCmd.ExecuteReader();
166                        while (dataReader.Read())
167                        {
168                            int id = dataReader.GetInt32(0);
169                            string msgString = dataReader.GetString(1);
170    
171                      MQMessage msg = new MQMessage();                          MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
172                      msg.Format = MQC.MQFMT_STRING;                          // same as MQPMO_DEFAULT
                     msg.CharacterSet = 1252;  
                     msg.WriteString(msgString);  
173    
174                      out_queue.Put(msg, pmo);                          MQMessage msg = new MQMessage();
175                            msg.Format = MQC.MQFMT_STRING;
176                            msg.CharacterSet = 1252;
177                            msg.WriteString(msgString);
178    
179                      //now that the message has been put on queue mark it as such                          out_queue.Put(msg, pmo);
180    
181                      string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";                          //now that the message has been put on queue mark it as such
                     MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);  
                     int numrows = updateCmd.ExecuteNonQuery();  
182    
183                      translog.WriteLine(getNowString() + " " + msgString + "\n");                          string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
184                            MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
185                            int numrows = updateCmd.ExecuteNonQuery();
186    
187                      if (numrows != 1)                          translog.WriteLine(getNowString() + " " + msgString + "\n");
188                      {  
189                          break;                          if (numrows != 1)
190                            {
191                                break;
192                            }
193                            statusData.counter++;
194                      }                      }
                     statusData.counter++;  
                 }  
                   
195    
196                  //stage 4: everything went smooth so clean up afterwards                  }
                 dataReader.Close();  
                 out_queue.Close();  
                 mqMgr.Close();  
                 sqlReadConnection.Close();  
                 sqlWriteConnection.Close();  
197              }              }
198              catch (Exception e)              catch (Exception e)
199              {              {
# Line 217  namespace DaoMqPump2 Line 210  namespace DaoMqPump2
210              using (StreamWriter translog = new StreamWriter(filename, true))              using (StreamWriter translog = new StreamWriter(filename, true))
211              try              try
212              {              {
213                  //stage 1 connect to mq                  //MQ options
214                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = getConnectionProperties();                
                 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);  
215                  int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
216    
217                  MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions);                  //MySQL options
   
   
                 //stage 2 connect to mysql  
218                  string mysqlString = buildMysqlConnString();                  string mysqlString = buildMysqlConnString();
                 MySqlConnection sqlConnection = new MySqlConnection(mysqlString);  
                 sqlConnection.Open();  
219    
220                    using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps))//stage 1 connect to mq
221                  //stage 3 move messages                  using (MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions) )
222                  bool isContinue = true;                  using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
                 while (isContinue)  
223                  {                  {
                       
                     MQMessage mqMsg = new MQMessage();  
                     MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();  
224    
225                      mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen                      sqlConnection.Open();
226    
227    
228                      try                      //stage 3 move messages
229                        bool isContinue = true;
230                        while (isContinue)
231                      {                      {
                         in_queue.Get(mqMsg, mqGetMsgOpts);  
                         if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)  
                         {                              
                             string msgString = mqMsg.ReadString(mqMsg.MessageLength);  
                             System.Console.WriteLine(msgString);  
232    
233                              string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng                          MQMessage mqMsg = new MQMessage();
234                            MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
235    
236                              MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);                          mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
                             int numrows = sqlcmd.ExecuteNonQuery();  
237    
238                              if (numrows == 1)                          try
239                            {
240                                in_queue.Get(mqMsg, mqGetMsgOpts);
241                                if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
242                              {                              {
243                                  translog.WriteLine(getNowString() + " " + msgString + "\n");                                  string msgString = mqMsg.ReadString(mqMsg.MessageLength);
244                                  mqMgr.Commit();                                  System.Console.WriteLine(msgString);
245                                  statusData.counter++;  
246                                    string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
247    
248                                    MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
249                                    int numrows = sqlcmd.ExecuteNonQuery();
250    
251                                    if (numrows == 1)
252                                    {
253                                        translog.WriteLine(getNowString() + " " + msgString + "\n");
254                                        mqMgr.Commit();
255                                        statusData.counter++;
256                                    }
257                                    else
258                                    {
259                                        mqMgr.Backout();
260                                        isContinue = false;
261                                    }
262    
263                              }                              }
264                              else                              else
265                              {                              {
266                                  mqMgr.Backout();                                  System.Console.WriteLine("Non-text message");
267                                  isContinue = false;                              }
                             }                              
   
268                          }                          }
269                          else                          catch (MQException mqe)
270                          {                          {
271                              System.Console.WriteLine("Non-text message");                              isContinue = false;
272                          }  
273                      }                              // report reason, if any
274                      catch (MQException mqe)                              if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
275                      {                              {
276                          isContinue = false;                                  // special report for normal end
277                                    System.Console.WriteLine("no more messages");
278                                }
279                                else
280                                {
281                                    // general report for other reasons
282                                    System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
283                                    statusData.lastrunOk = false;
284                                }
285    
                         // report reason, if any  
                         if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)  
                         {  
                             // special report for normal end  
                             System.Console.WriteLine("no more messages");  
                         }  
                         else  
                         {  
                             // general report for other reasons  
                             System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);;  
                             statusData.lastrunOk = false;  
286                          }                          }
287    
                     }  
288    
289                        }
290    
291                  }                  }
292    
                 //stage 4: everything went smooth so clean up afterwards  
                 in_queue.Close();  
                 mqMgr.Close();  
                 sqlConnection.Close();  
   
   
293              }              }
294              catch (Exception e)              catch (Exception e)
295              {              {

Legend:
Removed from v.2010  
changed lines
  Added in v.2011

  ViewVC Help
Powered by ViewVC 1.1.20