--- dao/DaoMqPump2/DaoMqPump2/Transport.cs 2013/07/10 18:28:31 2010 +++ dao/DaoMqPump2/DaoMqPump2/Transport.cs 2013/07/10 20:20:21 2011 @@ -143,64 +143,57 @@ using (StreamWriter translog = new StreamWriter(filename, true) ) try { - //stage 1 connect to mq + //MQ Options Hashtable connProps = getConnectionProperties(); - MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps); int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING; - MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions); - - - //stage 2 connect to mysql + //MySQL Options string mysqlString = buildMysqlConnString(); - MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString); - sqlReadConnection.Open(); - - MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString); - sqlWriteConnection.Open(); - + - //stage 3 move messages - string readSql = "CALL " + sql2mqReadQuery + "()"; - MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection); - MySqlDataReader dataReader = readCmd.ExecuteReader(); - while (dataReader.Read()) + using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps))//stage 1 connect to mq + using (MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions)) + using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql + using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString)) { - int id = dataReader.GetInt32(0); - string msgString = dataReader.GetString(1); + sqlReadConnection.Open(); + sqlWriteConnection.Open(); - MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults, - // same as MQPMO_DEFAULT + //stage 3 move messages + string readSql = "CALL " + sql2mqReadQuery + "()"; + MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection); + MySqlDataReader dataReader = readCmd.ExecuteReader(); + while (dataReader.Read()) + { + int id = dataReader.GetInt32(0); + string msgString = dataReader.GetString(1); - MQMessage msg = new MQMessage(); - msg.Format = MQC.MQFMT_STRING; - msg.CharacterSet = 1252; - msg.WriteString(msgString); + MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults, + // same as MQPMO_DEFAULT - out_queue.Put(msg, pmo); + MQMessage msg = new MQMessage(); + msg.Format = MQC.MQFMT_STRING; + msg.CharacterSet = 1252; + msg.WriteString(msgString); - //now that the message has been put on queue mark it as such + out_queue.Put(msg, pmo); - string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")"; - MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection); - int numrows = updateCmd.ExecuteNonQuery(); + //now that the message has been put on queue mark it as such - translog.WriteLine(getNowString() + " " + msgString + "\n"); + string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")"; + MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection); + int numrows = updateCmd.ExecuteNonQuery(); - if (numrows != 1) - { - break; + translog.WriteLine(getNowString() + " " + msgString + "\n"); + + if (numrows != 1) + { + break; + } + statusData.counter++; } - statusData.counter++; - } - - //stage 4: everything went smooth so clean up afterwards - dataReader.Close(); - out_queue.Close(); - mqMgr.Close(); - sqlReadConnection.Close(); - sqlWriteConnection.Close(); + } } catch (Exception e) { @@ -217,89 +210,86 @@ using (StreamWriter translog = new StreamWriter(filename, true)) try { - //stage 1 connect to mq - Hashtable connProps = getConnectionProperties(); - MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps); + //MQ options + Hashtable connProps = getConnectionProperties(); int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING; - MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions); - - - //stage 2 connect to mysql + //MySQL options string mysqlString = buildMysqlConnString(); - MySqlConnection sqlConnection = new MySqlConnection(mysqlString); - sqlConnection.Open(); - - //stage 3 move messages - bool isContinue = true; - while (isContinue) + using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps))//stage 1 connect to mq + using (MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions) ) + using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql { - - MQMessage mqMsg = new MQMessage(); - MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions(); - mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen + sqlConnection.Open(); + - try + //stage 3 move messages + bool isContinue = true; + while (isContinue) { - in_queue.Get(mqMsg, mqGetMsgOpts); - if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0) - { - string msgString = mqMsg.ReadString(mqMsg.MessageLength); - System.Console.WriteLine(msgString); - string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng + MQMessage mqMsg = new MQMessage(); + MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions(); - MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection); - int numrows = sqlcmd.ExecuteNonQuery(); + mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen - if (numrows == 1) + try + { + in_queue.Get(mqMsg, mqGetMsgOpts); + if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0) { - translog.WriteLine(getNowString() + " " + msgString + "\n"); - mqMgr.Commit(); - statusData.counter++; + string msgString = mqMsg.ReadString(mqMsg.MessageLength); + System.Console.WriteLine(msgString); + + string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng + + MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection); + int numrows = sqlcmd.ExecuteNonQuery(); + + if (numrows == 1) + { + translog.WriteLine(getNowString() + " " + msgString + "\n"); + mqMgr.Commit(); + statusData.counter++; + } + else + { + mqMgr.Backout(); + isContinue = false; + } + } else { - mqMgr.Backout(); - isContinue = false; - } - + System.Console.WriteLine("Non-text message"); + } } - else + catch (MQException mqe) { - System.Console.WriteLine("Non-text message"); - } - } - catch (MQException mqe) - { - isContinue = false; + isContinue = false; + + // report reason, if any + if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE) + { + // special report for normal end + System.Console.WriteLine("no more messages"); + } + else + { + // general report for other reasons + System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ; + statusData.lastrunOk = false; + } - // report reason, if any - if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE) - { - // special report for normal end - System.Console.WriteLine("no more messages"); - } - else - { - // general report for other reasons - System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);; - statusData.lastrunOk = false; } - } + } } - //stage 4: everything went smooth so clean up afterwards - in_queue.Close(); - mqMgr.Close(); - sqlConnection.Close(); - - } catch (Exception e) {