--- dao/DaoMqPump2/MQFilter/FilterController.cs 2014/05/16 19:03:46 2167 +++ dao/DaoMqPump2/MQFilter/FilterController.cs 2014/05/17 11:09:50 2173 @@ -1,10 +1,16 @@ using System; +using System.Collections; using System.Collections.Generic; using System.IO; using System.Text.RegularExpressions; +using IBM.WMQ; + using Microsoft.Win32; +using DaoCommon; + + namespace MQFilter { class FilterController @@ -19,6 +25,12 @@ public string[] filterTranscations { get; private set; } + public static readonly string queueNameIndbakke = "DAO.INDBAKKE"; + public static readonly string queueNameDimaps = "DAO.SAMLET"; + public static readonly string queueNameMysql = "DAO.ALL"; + public static readonly string queueNameStore = "DAO.STORE"; + + protected FilterController() { initialize(); @@ -43,38 +55,202 @@ if (mqQueueManager == null || mqQueueManager.Length == 0) throw new System.ArgumentException("MQQueueManager cannot be null or empty"); - //Læser øvrige parametre + //////////// logDirectory = (string)key.GetValue("LogDirectory"); if (logDirectory == null || logDirectory.Length == 0) throw new System.ArgumentException("LogDirectory cannot be null or empty"); + if (Directory.Exists(logDirectory) == false) + { + Directory.CreateDirectory(logDirectory); + } + + //////////// + String tmpFilterTransactions = (string)key.GetValue("FilterTransactions"); if (tmpFilterTransactions == null || tmpFilterTransactions.Length == 0) - throw new System.ArgumentException("LogDirectory cannot be null or empty"); - filterTranscations = Regex.Split( tmpFilterTransactions, "," ); + throw new System.ArgumentException("FilterTransactions cannot be null or empty"); + filterTranscations = Regex.Split(tmpFilterTransactions, ","); for (int i = 0; i < filterTranscations.Length; i++) { filterTranscations[i] = filterTranscations[i].Trim(); } + //////////// - if (Directory.Exists(logDirectory) == false) + } + + + + + + public void transportAllMessages() + { + int messageCount = 0; + + MQQueueManager mqMgr = null; + MQQueue queueIndbakke = null; + MQQueue queueMysql = null; + MQQueue queueDimaps = null; + MQQueue queueStore = null; + try { - Directory.CreateDirectory(logDirectory); + //MQ options + Hashtable connProps = MQHelper.getConnectionProperties(mqHost, mqChannel); + int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING; + int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING; + + + //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement + mqMgr = new MQQueueManager(mqQueueManager, connProps);//stage 1 connect to mq + + queueIndbakke = mqMgr.AccessQueue(queueNameIndbakke, openInputOptions); + + queueMysql = mqMgr.AccessQueue(queueNameMysql, openOutputOptions); + queueDimaps = mqMgr.AccessQueue(queueNameDimaps, openOutputOptions); + queueStore = mqMgr.AccessQueue(queueNameStore, openOutputOptions); + + + bool isContinue = true; + while (isContinue) + { + + MQMessage mqMsg = new MQMessage(); + MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions(); + + + try + { + queueIndbakke.Get(mqMsg, mqGetMsgOpts); + if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0) + { + string salt2String = mqMsg.ReadString(mqMsg.MessageLength); + //System.Console.WriteLine(msgString); + + + // validér at headeren er gyldig + if ( Salt2Helper.validateSalt2Header(salt2String) == false) + { + string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, logDirectory, "filter"); + using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true)) + { + discardedlog.WriteLine(Logfile.getNowString() + " " + salt2String); + } + continue; //gå frem til at tage næste transaktion fra køen + } + + MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults, + // same as MQPMO_DEFAULT + + MQMessage msg = new MQMessage(); + msg.Format = MQC.MQFMT_STRING; + msg.CharacterSet = 1252; + msg.WriteString(salt2String); + + Salt2Header header = Salt2Helper.parseHeader(salt2String); + queueMysql.Put(msg, pmo); + if (saveForLater(header)) + { + queueStore.Put(msg, pmo); + } + else + { + queueDimaps.Put(msg, pmo); + } + + + + messageCount++;// increment per run message counter + if (messageCount >= 10000) // if we have moved 10000 messages in this run - let it go + { + isContinue = false; + } + + + + + } + else + { + System.Console.WriteLine("Non-text message"); + } + } + catch (MQException mqe) + { + isContinue = false; + + // report reason, if any + if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE) + { + // special report for normal end + System.Console.WriteLine("no more messages"); + } + else + { + // general report for other reasons + System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ; + + } + + } + + } + + + } + finally + { + MQHelper.closeQueueSafe(queueIndbakke); + MQHelper.closeQueueSafe(queueMysql); + MQHelper.closeQueueSafe(queueDimaps); + MQHelper.closeQueueSafe(queueStore); + MQHelper.closeQueueManagerSafe(mqMgr); + } } + private Boolean saveForLater(Salt2Header header) + { + DateTime now = DateTime.Now; + int hour = now.Hour; + if (hour >= 14 && hour < 18) + { + + if (contains(header.transaktionForkortelse, this.filterTranscations) ) //Så længe vi skal være .net3.0 kompatible er LINQ problematisk (LINQ kræver 3.5) + { + return true; + } + else + { + return false; + } + } + else //normal operation - send straight trough + { + return false; + } + } - public void transportAllMessages() + private bool contains(string needle, string[] haystack) // s { + foreach(string hay in haystack) + { + if (needle.Equals(hay)) + { + return true; + } + } + + return false; } + /* Singleton */ private static FilterController instance = null; public static FilterController getInstance()