--- dao/DaoMqPump2/MQFilter/FilterController.cs 2014/05/16 19:03:46 2167 +++ dao/DaoMqPump2/MQFilter/FilterController.cs 2014/05/19 19:51:47 2177 @@ -1,10 +1,16 @@ using System; +using System.Collections; using System.Collections.Generic; using System.IO; using System.Text.RegularExpressions; +using IBM.WMQ; + using Microsoft.Win32; +using DaoCommon; + + namespace MQFilter { class FilterController @@ -18,10 +24,20 @@ public string[] filterTranscations { get; private set; } + Logfile logFile; + + + public static readonly string queueNameIndbakke = "DAO.INDBAKKE"; + public static readonly string queueNameDimaps = "DAO.SAMLET"; + public static readonly string queueNameMysql = "DAO.ALL"; + public static readonly string queueNameStore = "DAO.STORE"; + protected FilterController() { initialize(); + logFile = new Logfile(LogfileType.LogEvents, "filter", logDirectory); + logFile.addSingleLogEntry("Starting service"); } private void initialize() @@ -33,48 +49,246 @@ //Læser globale MQ Parametre mqHost = (string)key.GetValue("MQHost"); if (mqHost == null || mqHost.Length == 0) + { + key.SetValue("MQHost", "", RegistryValueKind.String); throw new System.ArgumentException("MQHost cannot be null or empty"); + } mqChannel = (string)key.GetValue("MQChannel"); if (mqChannel == null || mqChannel.Length == 0) + { + key.SetValue("MQChannel", "", RegistryValueKind.String); throw new System.ArgumentException("MQChannel cannot be null or empty"); + } mqQueueManager = (string)key.GetValue("MQQueueManager"); if (mqQueueManager == null || mqQueueManager.Length == 0) + { + key.SetValue("MQQueueManager", "", RegistryValueKind.String); throw new System.ArgumentException("MQQueueManager cannot be null or empty"); + } - //Læser øvrige parametre + //////////// logDirectory = (string)key.GetValue("LogDirectory"); if (logDirectory == null || logDirectory.Length == 0) + { + key.SetValue("LogDirectory", "", RegistryValueKind.String); throw new System.ArgumentException("LogDirectory cannot be null or empty"); + } + + if (Directory.Exists(logDirectory) == false) + { + Directory.CreateDirectory(logDirectory); + } + + //////////// String tmpFilterTransactions = (string)key.GetValue("FilterTransactions"); if (tmpFilterTransactions == null || tmpFilterTransactions.Length == 0) - throw new System.ArgumentException("LogDirectory cannot be null or empty"); - filterTranscations = Regex.Split( tmpFilterTransactions, "," ); + { + key.SetValue("FilterTransactions", "", RegistryValueKind.String); + throw new System.ArgumentException("FilterTransactions cannot be null or empty"); + } + tmpFilterTransactions = tmpFilterTransactions.Replace(';', ','); + filterTranscations = Regex.Split(tmpFilterTransactions, ","); for (int i = 0; i < filterTranscations.Length; i++) { - filterTranscations[i] = filterTranscations[i].Trim(); + filterTranscations[i] = filterTranscations[i].Trim().ToUpper(); } + //////////// - if (Directory.Exists(logDirectory) == false) + } + + + + + public void transportAllMessages() + { + try { - Directory.CreateDirectory(logDirectory); + transportMessagesWorker(); + } + catch (Exception e) + { + logFile.addSingleLogEntry("Error during transportAllMessages: " + e.Message); + } + } + + private void transportMessagesWorker() + { + int messageCount = 0; + + MQQueueManager mqMgr = null; + MQQueue queueIndbakke = null; + MQQueue queueMysql = null; + MQQueue queueDimaps = null; + MQQueue queueStore = null; + + using (Logfile translog = new Logfile(LogfileType.LogTransactions, "filter", logDirectory)) + try + { + //MQ options + Hashtable connProps = MQHelper.getConnectionProperties(mqHost, mqChannel); + int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING; + int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING; + + + //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement + mqMgr = new MQQueueManager(mqQueueManager, connProps);//stage 1 connect to mq + + + queueIndbakke = MQHelper.openQueueHelper(queueNameIndbakke, mqMgr, openInputOptions); + + queueMysql = MQHelper.openQueueHelper(queueNameMysql, mqMgr, openOutputOptions); + queueDimaps = MQHelper.openQueueHelper(queueNameDimaps, mqMgr, openOutputOptions); + queueStore = MQHelper.openQueueHelper(queueNameStore, mqMgr, openOutputOptions); + + + bool isContinue = true; + while (isContinue) + { + + MQMessage mqMsg = new MQMessage(); + MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions(); + + + try + { + queueIndbakke.Get(mqMsg, mqGetMsgOpts); + if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0) + { + string salt2String = mqMsg.ReadString(mqMsg.MessageLength); + //System.Console.WriteLine(msgString); + + + // validér at headeren er gyldig + if ( Salt2Helper.validateSalt2Header(salt2String) == false) + { + + + using (Logfile discardedlog = new Logfile(LogfileType.LogDiscarded, "filter", logDirectory)) + { + discardedlog.writeLogEntry(salt2String); + } + continue; //gå frem til at tage næste transaktion fra køen + } + + translog.writeLogEntry(salt2String); + + MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults, + // same as MQPMO_DEFAULT + + MQMessage msg = new MQMessage(); + msg.Format = MQC.MQFMT_STRING; + msg.CharacterSet = 1252; + msg.WriteString(salt2String); + + Salt2Header header = Salt2Helper.parseHeader(salt2String); + queueMysql.Put(msg, pmo); + if (saveForLater(header)) + { + queueStore.Put(msg, pmo); + } + else + { + queueDimaps.Put(msg, pmo); + } + + + + messageCount++;// increment per run message counter + if (messageCount >= 10000) // if we have moved 10000 messages in this run - let it go + { + isContinue = false; + } + + + + + } + else + { + System.Console.WriteLine("Non-text message"); + } + } + catch (MQException mqe) + { + isContinue = false; + + // report reason, if any + if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE) + { + // special report for normal end + System.Console.WriteLine("no more messages"); + } + else + { + // general report for other reasons + System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ; + + } + + } + + } + + + } + finally + { + MQHelper.closeQueueSafe(queueIndbakke); + MQHelper.closeQueueSafe(queueMysql); + MQHelper.closeQueueSafe(queueDimaps); + MQHelper.closeQueueSafe(queueStore); + MQHelper.closeQueueManagerSafe(mqMgr); + } } + private Boolean saveForLater(Salt2Header header) + { + DateTime now = DateTime.Now; + int hour = now.Hour; + if (hour >= 14 && hour < 18) + { + + if (contains(header.transaktionForkortelse, this.filterTranscations) ) //Så længe vi skal være .net3.0 kompatible er LINQ problematisk (LINQ kræver 3.5) + { + return true; + } + else + { + return false; + } + } + else //normal operation - send straight trough + { + return false; + } - public void transportAllMessages() + } + + private bool contains(string needle, string[] haystack) // s { + foreach(string hay in haystack) + { + if (needle.Equals(hay)) + { + return true; + } + } + + return false; } + /* Singleton */ private static FilterController instance = null; public static FilterController getInstance()