using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Diagnostics;
using IBM.WMQ;
using MySql.Data.MySqlClient;
using System.Globalization;

namespace DaoMqPump2
{
    /// <summary>
    /// Moves messages between a MySQL database and a WebSphere MQ queue in one
    /// direction (<see cref="SQL2MQ"/> or <see cref="MQ2SQL"/>), keeps run
    /// statistics in a <c>StatusData</c> instance and writes weekly log files
    /// below the controller's log directory.
    /// </summary>
    public class Transport
    {
        enum LogfileType { LogTransactions, LogEvents, LogDiscarded }

        public static string SQL2MQ = "sql2mq";
        public static string MQ2SQL = "mq2sql";

        // Number of recent event-log lines kept in memory for getLog().
        private const int MaxLogEntries = 20;

        // validateSalt2Header reads the characters at offsets 0..64.
        private const int Salt2HeaderLength = 65;

        // Source name used for Windows event log entries.
        private const string EventLogSource = "DaoMqPump2";

        private readonly TransportController controller;
        private readonly StatusData statusData = new StatusData();

        // Most recent log lines, newest first. Also used as the lock object.
        private readonly LinkedList<string> logEntries = new LinkedList<string>();

        public string name { get; private set; }
        public string direction { get; private set; }
        public string queueName { get; private set; }
        public string mq2sqlInsertQuery { get; private set; }
        public string sql2mqReadQuery { get; private set; }
        public string sql2mqUpdateQuery { get; private set; }

        /// <summary>Status and counters of this transport.</summary>
        public StatusData TransportStatusData
        {
            get { return this.statusData; }
        }

        /// <summary>
        /// Gets or sets whether <see cref="transportMessages"/> does any work.
        /// Every assignment is written to the event log.
        /// </summary>
        public bool Enabled
        {
            get { return statusData.transportEnabled; }
            set
            {
                statusData.transportEnabled = value;
                this.addLogEntry(value ? "Transport enabled" : "Transport disabled");
            }
        }

        /// <summary>
        /// Creates a transport and writes a "Starting ... " entry to the event log.
        /// </summary>
        /// <param name="controller">Supplies the MQ/MySQL connection settings and the log directory.</param>
        /// <param name="name">Name of the transport; used in log file names and error messages.</param>
        /// <param name="direction"><see cref="SQL2MQ"/> selects SQL to MQ; any other value selects MQ to SQL.</param>
        /// <param name="queueName">Name of the MQ queue to open.</param>
        /// <param name="mq2sqlInsertQuery">Stored procedure called with each received message text.</param>
        /// <param name="sql2mqReadQuery">Stored procedure that returns the rows (id, message text) to send.</param>
        /// <param name="sql2mqUpdateQuery">Stored procedure called with the row id after the message was put.</param>
        /// <param name="enabled">Initial value of <see cref="Enabled"/>.</param>
        public Transport(TransportController controller, string name, string direction, string queueName,
                         string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
        {
            this.controller = controller;
            this.name = name;
            this.direction = direction;
            this.queueName = queueName;
            this.mq2sqlInsertQuery = mq2sqlInsertQuery;
            this.sql2mqReadQuery = sql2mqReadQuery;
            this.sql2mqUpdateQuery = sql2mqUpdateQuery;

            statusData.transportEnabled = enabled;
            statusData.lastrunOk = true;
            statusData.counter = 0;
            statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";

            addLogEntry("Starting ... ");
        }

        ~Transport()
        {
            // Best effort only: the log write does file I/O, and an exception
            // escaping a finalizer would terminate the process.
            try
            {
                addLogEntry("Stopping ... ");
            }
            catch (Exception)
            {
            }
        }

        /// <summary>
        /// Runs one transfer pass in the configured direction and updates the
        /// status timestamps. Does nothing while the transport is disabled.
        /// </summary>
        public void transportMessages()
        {
            if (!statusData.transportEnabled)
            {
                return;
            }

            Console.WriteLine(name + " -> transportMessages() ");
            statusData.lastrunOk = true;
            int startCounter = statusData.counter;

            if (direction == SQL2MQ)
            {
                transportSql2Mq();
            }
            else
            {
                transportMq2Sql();
            }

            if (statusData.lastrunOk)
            {
                statusData.lastOkTime = getNowString();
                if (statusData.counter != startCounter)
                {
                    // Messages were transported, so remember the transfer time.
                    statusData.lastTransferTime = getNowString();
                }
            }
            else
            {
                addLogEntry(statusData.lastErrorMessage);
                statusData.lastErrorTime = getNowString();
            }
        }

        /// <summary>
        /// Reads rows through the read procedure, puts each message text on the
        /// MQ queue and then calls the update procedure with the row id.
        /// </summary>
        private void transportSql2Mq()
        {
            MQQueueManager mqMgr = null;
            MQQueue out_queue = null;
            string filename = getLogFilename(LogfileType.LogTransactions);

            using (StreamWriter translog = new StreamWriter(filename, true))
            {
                try
                {
                    // MQ options
                    Hashtable connProps = getConnectionProperties();
                    int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;

                    // MySQL options
                    string mysqlString = buildMysqlConnString();

                    // MQ objects in v6 do not implement IDisposable and therefore
                    // cannot be used with a "using" statement.
                    mqMgr = new MQQueueManager(controller.mqQueueManager, connProps); // stage 1: connect to MQ
                    out_queue = mqMgr.AccessQueue(queueName, openOptions);

                    // stage 2: connect to MySQL (one connection for reading, one for updating)
                    using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString))
                    using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
                    {
                        sqlReadConnection.Open();
                        sqlWriteConnection.Open();

                        // stage 3: move messages
                        string readSql = "CALL " + sql2mqReadQuery + "()";
                        using (MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection))
                        using (MySqlDataReader dataReader = readCmd.ExecuteReader())
                        {
                            while (dataReader.Read())
                            {
                                int id = dataReader.GetInt32(0);
                                string msgString = dataReader.GetString(1);

                                // Accept the defaults, same as MQPMO_DEFAULT.
                                MQPutMessageOptions pmo = new MQPutMessageOptions();
                                MQMessage msg = new MQMessage();
                                msg.Format = MQC.MQFMT_STRING;
                                msg.CharacterSet = 1252;
                                msg.WriteString(msgString);
                                out_queue.Put(msg, pmo);

                                // Now that the message has been put on the queue, mark it as such.
                                string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
                                int numrows;
                                using (MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection))
                                {
                                    numrows = updateCmd.ExecuteNonQuery();
                                }

                                translog.WriteLine(getNowString() + " " + msgString);
                                if (numrows != 1)
                                {
                                    // Stop the pass when the update did not affect exactly one row.
                                    break;
                                }
                                statusData.counter++;
                            }
                        }
                    }
                }
                catch (Exception e)
                {
                    recordFailure("transportSql2Mq", e);
                }
                finally
                {
                    closeMqObjects(out_queue, mqMgr);
                }
            }
        }

        /// <summary>
        /// Gets messages from the MQ queue under syncpoint and passes each text
        /// message to the insert procedure. Messages that start with "?" or have
        /// an invalid header are written to the discarded log and committed.
        /// </summary>
        private void transportMq2Sql()
        {
            MQQueueManager mqMgr = null;
            MQQueue in_queue = null;
            string filename = getLogFilename(LogfileType.LogTransactions);

            using (StreamWriter translog = new StreamWriter(filename, true))
            {
                try
                {
                    // MQ options
                    Hashtable connProps = getConnectionProperties();
                    int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;

                    // MySQL options
                    string mysqlString = buildMysqlConnString();

                    // MQ objects in v6 do not implement IDisposable and therefore
                    // cannot be used with a "using" statement.
                    mqMgr = new MQQueueManager(controller.mqQueueManager, connProps); // stage 1: connect to MQ
                    in_queue = mqMgr.AccessQueue(queueName, openOptions);

                    using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) // stage 2: connect to MySQL
                    {
                        sqlConnection.Open();

                        // stage 3: move messages
                        bool isContinue = true;
                        while (isContinue)
                        {
                            MQMessage mqMsg = new MQMessage();
                            MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
                            // Requires a commit before the message is removed from the queue.
                            mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT;

                            try
                            {
                                in_queue.Get(mqMsg, mqGetMsgOpts);

                                if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
                                {
                                    string msgString = mqMsg.ReadString(mqMsg.MessageLength);

                                    // A transaction that starts with "?" is not valid;
                                    // the header must be valid as well.
                                    if (msgString.StartsWith("?", StringComparison.Ordinal) || !validateSalt2Header(msgString))
                                    {
                                        string discarded_filename = getLogFilename(LogfileType.LogDiscarded);
                                        using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
                                        {
                                            discardedlog.WriteLine(msgString);
                                        }
                                        mqMgr.Commit(); // remove the rejected transaction from the queue
                                        statusData.discardedCounter++;
                                        continue; // go on with the next transaction on the queue
                                    }

                                    // Builds a "CALL somestoredprocedure('msgString')" statement.
                                    string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )";
                                    int numrows;
                                    using (MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection))
                                    {
                                        numrows = sqlcmd.ExecuteNonQuery();
                                    }

                                    if (numrows == 1)
                                    {
                                        translog.WriteLine(getNowString() + " " + msgString);
                                        mqMgr.Commit();
                                        statusData.counter++;
                                    }
                                    else
                                    {
                                        mqMgr.Backout();
                                        isContinue = false;
                                    }
                                }
                                else
                                {
                                    // NOTE(review): this message was fetched under syncpoint and is
                                    // neither committed nor backed out here - confirm the intended handling.
                                    System.Console.WriteLine("Non-text message");
                                }
                            }
                            catch (MQException mqe)
                            {
                                isContinue = false;
                                if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                                {
                                    // Normal end: the queue is empty.
                                    System.Console.WriteLine("no more messages");
                                }
                                else
                                {
                                    System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);
                                    statusData.lastrunOk = false;
                                    // Record the reason so transportMessages() does not log a stale message.
                                    statusData.lastErrorMessage = name + ".transportMq2Sql error: " + mqe.GetType().FullName + " " + mqe.Message;
                                }
                            }
                        }
                    }
                }
                catch (Exception e)
                {
                    // Roll back the MQ unit of work that may still be in flight.
                    try
                    {
                        if (mqMgr != null)
                        {
                            mqMgr.Backout();
                        }
                    }
                    catch (Exception e2)
                    {
                        this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);
                    }

                    recordFailure("transportMq2Sql", e);
                }
                finally
                {
                    closeMqObjects(in_queue, mqMgr);
                }
            }
        }

        /// <summary>
        /// Marks the current run as failed and reports the exception on the
        /// console and in the Windows event log.
        /// </summary>
        private void recordFailure(string methodName, Exception e)
        {
            statusData.lastrunOk = false;
            statusData.lastErrorMessage = name + "." + methodName + " error: " + e.GetType().FullName + " " + e.Message;
            Console.WriteLine(statusData.lastErrorMessage);
            Console.WriteLine(e.StackTrace);
            EventLog.WriteEntry(EventLogSource, statusData.lastErrorMessage, EventLogEntryType.Warning);
        }

        /// <summary>
        /// Closes the queue and then the queue manager if they are open.
        /// Errors during cleanup are only written to the console.
        /// </summary>
        private static void closeMqObjects(MQQueue queue, MQQueueManager mqMgr)
        {
            if (queue != null && queue.IsOpen)
            {
                try
                {
                    queue.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up qmgr " + e.Message);
                }
            }

            if (mqMgr != null && mqMgr.IsOpen)
            {
                try
                {
                    mqMgr.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up qmgr " + e.Message);
                }
            }
        }

        /// <summary>Builds the MySQL connection string from the controller settings.</summary>
        private string buildMysqlConnString()
        {
            // No default database is set. ConnectionReset=true was left out because it
            // possibly requires a default database - it did not work without one.
            string connectionString = "";
            connectionString += "SERVER=" + controller.mysqlHost + ";";
            connectionString += "UID=" + controller.mysqlUser + ";";
            connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
            connectionString += "Max Pool Size=20;";
            return connectionString;
        }

        /// <summary>Builds the MQ client connection properties from the controller settings.</summary>
        private Hashtable getConnectionProperties()
        {
            Hashtable connProperties = new Hashtable();
            connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
            connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
            connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel);
            return connProperties;
        }

        /// <summary>
        /// Returns the log file path for the current week:
        /// <c>{logDirectory}\{prefix}{name}_{yyyy}_W{ww}.log</c>.
        /// </summary>
        private string getLogFilename(LogfileType type)
        {
            DateTime now = DateTime.Now;

            // Find the week number using the week rules of the current culture.
            DateTimeFormatInfo dfi = DateTimeFormatInfo.CurrentInfo;
            Calendar myCal = CultureInfo.InvariantCulture.Calendar;
            int week = myCal.GetWeekOfYear(now, dfi.CalendarWeekRule, dfi.FirstDayOfWeek);

            string filename = controller.logDirectory + "\\";
            switch (type)
            {
                case LogfileType.LogEvents:
                    filename += "eventlog_";
                    break;
                case LogfileType.LogTransactions:
                    filename += "transactionlog_";
                    break;
                case LogfileType.LogDiscarded:
                    filename += "discardedlog_";
                    break;
            }

            filename += name + "_" + now.Year.ToString("D4") + "_W" + week.ToString("D2") + ".log";
            return filename;
        }

        /// <summary>Returns the current local time in the sortable "s" format.</summary>
        public string getNowString()
        {
            return DateTime.Now.ToString("s");
        }

        /// <summary>
        /// Checks that the numeric fields of the fixed-width header at the start
        /// of <paramref name="salt2String"/> contain integers.
        /// </summary>
        /// <returns>
        /// <c>false</c> when the text is null, shorter than the header, or one of
        /// the numeric fields cannot be parsed; otherwise <c>true</c>.
        /// </returns>
        private bool validateSalt2Header(string salt2String)
        {
            // Header layout (offset, length):
            //   afsender                0,  5
            //   modtager                5,  5
            //   afsenderTegnSaet       10,  6
            //   standardNavn           16,  6
            //   standardVersion        22,  3   (integer)
            //   afsenderSekvensnr      25,  6   (integer)
            //   afsenderTidsstempel    31, 14   (integer)
            //   afsenderBakkeIdent     45,  5
            //   modtagerBakkeIdent     50,  5
            //   transaktionForkortelse 55,  4
            //   transaktionsLaengde    59,  5   (integer)
            //   prioritet              64,  1   (integer)

            // A message that is too short cannot carry a valid header. Reject it
            // instead of letting Substring throw, which would leave it on the queue.
            if (salt2String == null || salt2String.Length < Salt2HeaderLength)
            {
                return false;
            }

            string standardVersion = salt2String.Substring(22, 3);
            string afsenderSekvensnr = salt2String.Substring(25, 6);
            string afsenderTidsstempel = salt2String.Substring(31, 14);
            string transaktionsLaengde = salt2String.Substring(59, 5);
            string prioritet = salt2String.Substring(64, 1);

            int intValue;
            long longValue;

            if (!int.TryParse(standardVersion.Trim(), out intValue))
            {
                return false;
            }
            if (!int.TryParse(afsenderSekvensnr.Trim(), out intValue))
            {
                return false;
            }
            // 14 digits do not fit in an Int32, so the timestamp is parsed as Int64.
            if (!long.TryParse(afsenderTidsstempel.Trim(), out longValue))
            {
                return false;
            }
            if (!int.TryParse(transaktionsLaengde.Trim(), out intValue))
            {
                return false;
            }
            if (!int.TryParse(prioritet.Trim(), out intValue))
            {
                return false;
            }

            return true;
        }

        /// <summary>
        /// Prefixes <paramref name="msg"/> with the current time, keeps it in the
        /// in-memory list of recent entries and appends it to the event log file.
        /// </summary>
        private void addLogEntry(string msg)
        {
            msg = getNowString() + " " + msg;

            lock (logEntries)
            {
                logEntries.AddFirst(msg);
                if (logEntries.Count > MaxLogEntries)
                {
                    logEntries.RemoveLast();
                }
            }

            string filename = getLogFilename(LogfileType.LogEvents);
            using (StreamWriter eventlog = new StreamWriter(filename, true))
            {
                eventlog.WriteLine(msg);
            }
        }

        /// <summary>Returns a copy of the recent log entries, newest first.</summary>
        public string[] getLog()
        {
            lock (logEntries)
            {
                List<string> tmpEntries = new List<string>(logEntries);
                return tmpEntries.ToArray();
            }
        }
    }
}