using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Diagnostics;
using IBM.WMQ;
using MySql.Data.MySqlClient;

namespace DaoMqPump2
{
    /// <summary>
    /// A single configured route that moves messages either from MySQL to an
    /// IBM MQ queue (<see cref="SQL2MQ"/>) or from an IBM MQ queue to MySQL
    /// (any other direction value, normally <see cref="MQ2SQL"/>).
    /// Connection settings are read from the owning <c>TransportController</c>.
    /// </summary>
    public class Transport
    {
        /// <summary>Direction value selecting the MySQL-to-MQ route.</summary>
        public static string SQL2MQ = "sql2mq";

        /// <summary>Direction value naming the MQ-to-MySQL route.</summary>
        public static string MQ2SQL = "mq2sql";

        // Upper bound on the in-memory log kept by addLogEntry().
        private const int MaxLogEntries = 20;

        // Event log source used when a run fails.
        private const string EventSourceName = "DaoMqPump2";

        /// <summary>When false, <see cref="transportMessages"/> returns immediately.</summary>
        public bool enabled { get; set; }

        private readonly TransportController controller;

        /// <summary>Name of this transport; used in log messages and the transaction log file name.</summary>
        public string name { get; private set; }

        /// <summary>Direction of the route; compared against <see cref="SQL2MQ"/>.</summary>
        public string direction { get; private set; }

        /// <summary>Name of the MQ queue that is read from or written to.</summary>
        public string queueName { get; private set; }

        /// <summary>Stored procedure called with the message text for every message read from MQ.</summary>
        public string mq2sqlInsertQuery { get; private set; }

        /// <summary>Stored procedure called without arguments to list rows to send; column 0 is an int id, column 1 the message text.</summary>
        public string sql2mqReadQuery { get; private set; }

        /// <summary>Stored procedure called with a row id after that row has been put on the queue.</summary>
        public string sql2mqUpdateQuery { get; private set; }

        /// <summary>True when the most recent run finished without an error.</summary>
        public bool lastrunOk { get; private set; }

        /// <summary>Text of the most recent error, or an empty string when none has occurred.</summary>
        public string lastErrorMessage { get; private set; }

        /// <summary>Timestamp (see <see cref="getNowString"/>) of the most recent successful run, or empty.</summary>
        public string lastOkTime { get; private set; }

        /// <summary>Timestamp (see <see cref="getNowString"/>) of the most recent failed run, or empty.</summary>
        public string lastErrorTime { get; private set; }

        /// <summary>Number of messages transported successfully since this instance was created.</summary>
        public int counter { get; private set; }

        // Newest entry first; also used as the lock object guarding itself.
        private readonly LinkedList<string> logEntries = new LinkedList<string>();

        /// <summary>
        /// Creates a transport and records a "Starting" entry in its log.
        /// </summary>
        /// <param name="controller">Owner supplying MQ/MySQL connection settings and the log directory.</param>
        /// <param name="name">Name of the transport.</param>
        /// <param name="direction"><see cref="SQL2MQ"/> or <see cref="MQ2SQL"/>.</param>
        /// <param name="queueName">MQ queue to access.</param>
        /// <param name="mq2sqlInsertQuery">Stored procedure used for the MQ-to-MySQL direction.</param>
        /// <param name="sql2mqReadQuery">Stored procedure listing rows for the MySQL-to-MQ direction.</param>
        /// <param name="sql2mqUpdateQuery">Stored procedure marking a row as sent.</param>
        /// <param name="enabled">Initial value of <see cref="enabled"/>.</param>
        public Transport(TransportController controller, string name, string direction, string queueName,
                         string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
        {
            this.controller = controller;
            this.name = name;
            this.direction = direction;
            this.queueName = queueName;
            this.mq2sqlInsertQuery = mq2sqlInsertQuery;
            this.sql2mqReadQuery = sql2mqReadQuery;
            this.sql2mqUpdateQuery = sql2mqUpdateQuery;
            this.enabled = enabled;

            lastrunOk = true;
            counter = 0;
            lastErrorMessage = lastOkTime = lastErrorTime = "";

            addLogEntry("Starting ... \n");
        }

        /// <summary>
        /// Runs one transport pass in the configured direction (no-op when disabled)
        /// and updates <see cref="lastOkTime"/> or <see cref="lastErrorTime"/> plus the log.
        /// </summary>
        public void transportMessages()
        {
            if (enabled == false)
            {
                return;
            }

            Console.WriteLine(name + " -> transportMessages() ");
            lastrunOk = true;

            if (direction == SQL2MQ)
            {
                transportSql2Mq();
            }
            else
            {
                transportMq2Sql();
            }

            if (lastrunOk == true)
            {
                lastOkTime = getNowString();
            }
            else
            {
                addLogEntry(lastErrorMessage);
                lastErrorTime = getNowString();
            }
        }

        /// <summary>
        /// Reads pending rows from MySQL, puts each on the MQ queue and then calls the
        /// update procedure for that row. Stops at the first row whose update does not
        /// affect exactly one row. Any exception marks the run as failed.
        /// </summary>
        private void transportSql2Mq()
        {
            string filename = getTransactionlogFilename();

            // The transaction log is opened outside the try block on purpose: a failure
            // to open it propagates to the caller, exactly as before.
            using (StreamWriter translog = new StreamWriter(filename, true))
            {
                MQQueueManager mqMgr = null;
                MQQueue outQueue = null;
                MySqlConnection sqlConnection = null;

                try
                {
                    // stage 1: connect to MQ
                    mqMgr = new MQQueueManager(controller.mqQueueManager, getConnectionProperties());
                    int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
                    outQueue = mqMgr.AccessQueue(queueName, openOptions);

                    // stage 2: connect to MySQL
                    sqlConnection = new MySqlConnection(buildMysqlConnString());
                    sqlConnection.Open();

                    // stage 3a: read all pending rows first. Connector/NET does not allow
                    // another command on a connection while a data reader is open on it,
                    // so the reader must be closed before the per-row update is executed.
                    List<KeyValuePair<int, string>> pendingRows = new List<KeyValuePair<int, string>>();
                    using (MySqlCommand readCmd = new MySqlCommand("CALL " + sql2mqReadQuery + "()", sqlConnection))
                    using (MySqlDataReader dataReader = readCmd.ExecuteReader())
                    {
                        while (dataReader.Read())
                        {
                            pendingRows.Add(new KeyValuePair<int, string>(dataReader.GetInt32(0), dataReader.GetString(1)));
                        }
                    }

                    // stage 3b: move the messages
                    foreach (KeyValuePair<int, string> row in pendingRows)
                    {
                        int id = row.Key;
                        string msgString = row.Value;

                        // Default put options, same as MQPMO_DEFAULT.
                        MQPutMessageOptions pmo = new MQPutMessageOptions();
                        MQMessage msg = new MQMessage();
                        msg.Format = MQC.MQFMT_STRING;
                        msg.CharacterSet = 1252;
                        msg.WriteString(msgString);
                        outQueue.Put(msg, pmo);

                        // Now that the message is on the queue, mark the row as sent.
                        int numrows;
                        using (MySqlCommand updateCmd = new MySqlCommand("CALL " + sql2mqUpdateQuery + "(" + id + ")", sqlConnection))
                        {
                            numrows = updateCmd.ExecuteNonQuery();
                        }

                        translog.WriteLine(getNowString() + " " + msgString + "\n");

                        if (numrows != 1)
                        {
                            // The row could not be marked as sent; stop without counting it.
                            break;
                        }

                        counter++;
                    }
                }
                catch (Exception e)
                {
                    reportFailure("transportSql2Mq", e);
                }
                finally
                {
                    // stage 4: always release the connections, also after a failure.
                    releaseConnections("transportSql2Mq", outQueue, mqMgr, sqlConnection);
                }
            }
        }

        /// <summary>
        /// Gets messages from the MQ queue under syncpoint and passes each string message
        /// to the insert procedure. A message is committed off the queue only when the
        /// insert affected exactly one row; otherwise it is backed out and the run stops.
        /// The loop ends when the queue reports that no message is available.
        /// </summary>
        private void transportMq2Sql()
        {
            string filename = getTransactionlogFilename();

            // Opened outside the try block on purpose; see transportSql2Mq().
            using (StreamWriter translog = new StreamWriter(filename, true))
            {
                MQQueueManager mqMgr = null;
                MQQueue inQueue = null;
                MySqlConnection sqlConnection = null;

                // True while a message has been fetched under syncpoint but the unit of
                // work has been neither committed nor backed out.
                bool uncommitted = false;

                try
                {
                    // stage 1: connect to MQ
                    mqMgr = new MQQueueManager(controller.mqQueueManager, getConnectionProperties());
                    int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING;
                    inQueue = mqMgr.AccessQueue(queueName, openOptions);

                    // stage 2: connect to MySQL
                    sqlConnection = new MySqlConnection(buildMysqlConnString());
                    sqlConnection.Open();

                    // stage 3: move messages
                    bool isContinue = true;
                    while (isContinue)
                    {
                        MQMessage mqMsg = new MQMessage();
                        MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
                        // Requires a commit before the message is removed from the queue.
                        mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT;

                        try
                        {
                            inQueue.Get(mqMsg, mqGetMsgOpts);
                            uncommitted = true;

                            if (string.Equals(mqMsg.Format, MQC.MQFMT_STRING, StringComparison.Ordinal))
                            {
                                string msgString = mqMsg.ReadString(mqMsg.MessageLength);
                                System.Console.WriteLine(msgString);

                                // Executes CALL somestoredprocedure(<message>) with the message
                                // text bound as a parameter instead of spliced into the SQL.
                                int numrows;
                                using (MySqlCommand sqlcmd = new MySqlCommand("CALL " + mq2sqlInsertQuery + "( @msg )", sqlConnection))
                                {
                                    sqlcmd.Parameters.AddWithValue("@msg", msgString);
                                    numrows = sqlcmd.ExecuteNonQuery();
                                }

                                if (numrows == 1)
                                {
                                    translog.WriteLine(getNowString() + " " + msgString + "\n");
                                    mqMgr.Commit();
                                    uncommitted = false;
                                    counter++;
                                }
                                else
                                {
                                    // NOTE(review): the run stops here but is still reported
                                    // as successful (lastrunOk is not changed) - confirm intent.
                                    mqMgr.Backout();
                                    uncommitted = false;
                                    isContinue = false;
                                }
                            }
                            else
                            {
                                // NOTE(review): the message stays in the current unit of work;
                                // it is neither committed nor backed out here - confirm intent.
                                System.Console.WriteLine("Non-text message");
                            }
                        }
                        catch (MQException mqe)
                        {
                            isContinue = false;

                            if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                            {
                                // Normal end of the loop: the queue is empty.
                                System.Console.WriteLine("no more messages");
                            }
                            else
                            {
                                System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);
                                lastrunOk = false;
                                // Record the reason so the caller does not log a stale message.
                                lastErrorMessage = name + ".transportMq2Sql error: " + mqe.Message;
                            }
                        }
                    }
                }
                catch (Exception e)
                {
                    reportFailure("transportMq2Sql", e);
                }
                finally
                {
                    // After a failure, explicitly back out anything fetched under syncpoint
                    // so it is returned to the queue before the connection is closed.
                    if (uncommitted && !lastrunOk)
                    {
                        try
                        {
                            mqMgr.Backout();
                        }
                        catch (MQException backoutError)
                        {
                            Console.WriteLine(name + ".transportMq2Sql backout error: " + backoutError.Message);
                        }
                    }

                    // stage 4: always release the connections, also after a failure.
                    releaseConnections("transportMq2Sql", inQueue, mqMgr, sqlConnection);
                }
            }
        }

        /// <summary>
        /// Marks the current run as failed and reports the error to the console and
        /// the Windows event log.
        /// </summary>
        /// <param name="operation">Name of the method that failed; becomes part of the message.</param>
        /// <param name="e">The exception that ended the run.</param>
        private void reportFailure(string operation, Exception e)
        {
            lastrunOk = false;
            lastErrorMessage = name + "." + operation + " error: " + e.Message;
            Console.WriteLine(lastErrorMessage);
            EventLog.WriteEntry(EventSourceName, lastErrorMessage, EventLogEntryType.Warning);
        }

        /// <summary>
        /// Closes the queue, the queue manager and the MySQL connection, skipping any that
        /// were never opened. A close failure is reported as the run's error unless an
        /// earlier error has already been recorded.
        /// </summary>
        private void releaseConnections(string operation, MQQueue queue, MQQueueManager mqMgr, MySqlConnection sqlConnection)
        {
            try
            {
                if (queue != null)
                {
                    queue.Close();
                }
            }
            catch (Exception e)
            {
                noteCleanupFailure(operation, e);
            }

            try
            {
                if (mqMgr != null)
                {
                    mqMgr.Close();
                }
            }
            catch (Exception e)
            {
                noteCleanupFailure(operation, e);
            }

            try
            {
                if (sqlConnection != null)
                {
                    sqlConnection.Close();
                }
            }
            catch (Exception e)
            {
                noteCleanupFailure(operation, e);
            }
        }

        /// <summary>
        /// Records a failure that happened while closing a connection without
        /// overwriting an error recorded earlier in the same run.
        /// </summary>
        private void noteCleanupFailure(string operation, Exception e)
        {
            if (lastrunOk)
            {
                reportFailure(operation, e);
            }
            else
            {
                Console.WriteLine(name + "." + operation + " cleanup error: " + e.Message);
            }
        }

        /// <summary>
        /// Builds the MySQL connection string from the controller's host, user and password.
        /// No DATABASE entry is included.
        /// </summary>
        private string buildMysqlConnString()
        {
            string connectionString = "";
            connectionString += "SERVER=" + controller.mysqlHost + ";";
            connectionString += "UID=" + controller.mysqlUser + ";";
            connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
            return connectionString;
        }

        /// <summary>
        /// Builds the MQ client connection properties (transport type, host and channel)
        /// from the controller's settings.
        /// </summary>
        private Hashtable getConnectionProperties()
        {
            Hashtable connProperties = new Hashtable();
            connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
            connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
            connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel);
            return connProperties;
        }

        /// <summary>
        /// Returns the path of the transaction log for the current month:
        /// &lt;logDirectory&gt;\transactionlog_&lt;name&gt;_&lt;yyyyMM&gt;.log.
        /// </summary>
        private string getTransactionlogFilename()
        {
            DateTime now = DateTime.Now;
            string filename = controller.logDirectory + "\\";
            filename += "transactionlog_" + name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";
            return filename;
        }

        /// <summary>Returns the current local time formatted with the sortable "s" format.</summary>
        public string getNowString()
        {
            return DateTime.Now.ToString("s");
        }

        /// <summary>
        /// Prepends a timestamped entry to the in-memory log, dropping the oldest entry
        /// once more than <see cref="MaxLogEntries"/> are held.
        /// </summary>
        private void addLogEntry(string msg)
        {
            msg = getNowString() + " " + msg;
            lock (logEntries)
            {
                logEntries.AddFirst(msg);
                if (logEntries.Count > MaxLogEntries)
                {
                    logEntries.RemoveLast();
                }
            }
        }

        /// <summary>Returns a snapshot of the in-memory log, newest entry first.</summary>
        public string[] getLog()
        {
            lock (logEntries)
            {
                string[] snapshot = new string[logEntries.Count];
                logEntries.CopyTo(snapshot, 0);
                return snapshot;
            }
        }
    }
}