--- dao/DaoMqPump2/MQFilter/FilterController.cs 2014/05/16 19:03:46 2167 +++ dao/DaoMqPump2/MQFilter/FilterController.cs 2014/05/16 20:56:22 2168 @@ -1,10 +1,16 @@ using System; +using System.Collections; using System.Collections.Generic; using System.IO; using System.Text.RegularExpressions; +using IBM.WMQ; + using Microsoft.Win32; +using DaoCommon; + + namespace MQFilter { class FilterController @@ -19,6 +25,12 @@ public string[] filterTranscations { get; private set; } + public static readonly string queueNameIndbakke = "DAO.INDBAKKE"; + public static readonly string queueNameDimaps = "DAO.SAMLET"; + public static readonly string queueNameMysql = "DAO.ALL"; + public static readonly string queueNameStore = "DAO.STORE"; + + protected FilterController() { initialize(); @@ -43,34 +55,182 @@ if (mqQueueManager == null || mqQueueManager.Length == 0) throw new System.ArgumentException("MQQueueManager cannot be null or empty"); - //Læser øvrige parametre + //////////// logDirectory = (string)key.GetValue("LogDirectory"); if (logDirectory == null || logDirectory.Length == 0) throw new System.ArgumentException("LogDirectory cannot be null or empty"); + if (Directory.Exists(logDirectory) == false) + { + Directory.CreateDirectory(logDirectory); + } + + //////////// + String tmpFilterTransactions = (string)key.GetValue("FilterTransactions"); if (tmpFilterTransactions == null || tmpFilterTransactions.Length == 0) - throw new System.ArgumentException("LogDirectory cannot be null or empty"); - filterTranscations = Regex.Split( tmpFilterTransactions, "," ); + throw new System.ArgumentException("FilterTransactions cannot be null or empty"); + filterTranscations = Regex.Split(tmpFilterTransactions, ","); for (int i = 0; i < filterTranscations.Length; i++) { filterTranscations[i] = filterTranscations[i].Trim(); } - - if (Directory.Exists(logDirectory) == false) - { - Directory.CreateDirectory(logDirectory); - } + //////////// } + private Hashtable getConnectionProperties() + { + Hashtable connProperties = new Hashtable(); + connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT); + connProperties.Add(MQC.HOST_NAME_PROPERTY, mqHost); + connProperties.Add(MQC.CHANNEL_PROPERTY, mqChannel); + return connProperties; + } + public void transportAllMessages() { + int messageCount = 0; + + MQQueueManager mqMgr = null; + MQQueue queueIndbakke = null; + MQQueue queueMysql = null; + MQQueue queueDimaps = null; + MQQueue queueStore = null; + try + { + //MQ options + Hashtable connProps = getConnectionProperties(); + int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING; + int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING; + + + //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement + mqMgr = new MQQueueManager(mqQueueManager, connProps);//stage 1 connect to mq + + queueIndbakke = mqMgr.AccessQueue(queueNameIndbakke, openInputOptions); + + queueMysql = mqMgr.AccessQueue(queueNameMysql, openOutputOptions); + queueDimaps = mqMgr.AccessQueue(queueNameDimaps, openOutputOptions); + queueStore = mqMgr.AccessQueue(queueNameStore, openOutputOptions); + + + bool isContinue = true; + while (isContinue) + { + + MQMessage mqMsg = new MQMessage(); + MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions(); + + + try + { + queueIndbakke.Get(mqMsg, mqGetMsgOpts); + if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0) + { + string msgString = mqMsg.ReadString(mqMsg.MessageLength); + //System.Console.WriteLine(msgString); + + + // Hvis transaktionen starter med et ? er det ikke en gyldig transaktion + // validér ligeledes at headeren er gyldig + if (msgString.StartsWith("?") || DaoUtil.validateSalt2Header(msgString) == false) + { + string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, logDirectory, "filter"); + using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true)) + { + discardedlog.WriteLine(Logfile.getNowString() + " " + msgString); + } + continue; //gå frem til at tage næste transaktion fra køen + } + + + + + + + + + messageCount++;// increment per run message counter + if (messageCount >= 10000) // if we have moved 10000 messages in this run - let it go + { + isContinue = false; + } + + + + + } + else + { + System.Console.WriteLine("Non-text message"); + } + } + catch (MQException mqe) + { + isContinue = false; + + // report reason, if any + if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE) + { + // special report for normal end + System.Console.WriteLine("no more messages"); + } + else + { + // general report for other reasons + System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ; + + } + + } + + } + + + + } + finally + { + closeQueue(queueIndbakke); + closeQueue(queueMysql); + closeQueue(queueDimaps); + closeQueue(queueStore); + + + if (mqMgr != null && mqMgr.IsOpen) + { + try + { + mqMgr.Close(); + } + catch (Exception e) + { + Console.WriteLine("Error cleaning up qmgr " + e.Message); + } + } + } + } + + private void closeQueue(MQQueue queue) + { + if (queue != null && queue.IsOpen) + { + try + { + queue.Close(); + } + catch (Exception e) + { + Console.WriteLine("Error cleaning up queue " + e.Message); + } + } + }