using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Text.RegularExpressions;
using IBM.WMQ;
using Microsoft.Win32;
using DaoCommon;

namespace MQFilter
{
    /// <summary>
    /// Singleton that loads the MQFilter configuration from the registry and
    /// drains the inbox queue, logging messages that fail header validation.
    /// </summary>
    class FilterController
    {
        /// <summary>Value of the "MQHost" registry entry.</summary>
        public string mqHost { get; private set; }

        /// <summary>Value of the "MQChannel" registry entry.</summary>
        public string mqChannel { get; private set; }

        /// <summary>Value of the "MQQueueManager" registry entry.</summary>
        public string mqQueueManager { get; private set; }

        /// <summary>Value of the "LogDirectory" registry entry; created during initialization if missing.</summary>
        public string logDirectory { get; private set; }

        /// <summary>Trimmed, comma-separated entries of the "FilterTransactions" registry entry.</summary>
        public string[] filterTranscations { get; private set; }

        public static readonly string queueNameIndbakke = "DAO.INDBAKKE";
        public static readonly string queueNameDimaps = "DAO.SAMLET";
        public static readonly string queueNameMysql = "DAO.ALL";
        public static readonly string queueNameStore = "DAO.STORE";

        // Registry location (under HKEY_LOCAL_MACHINE) holding the configuration.
        private const string registrySubKey = "Software\\DAO\\MQFilter";

        // Upper bound on accepted messages per transportAllMessages() call.
        private const int maxMessagesPerRun = 10000;

        /// <summary>
        /// Creates the controller and loads its configuration.
        /// </summary>
        /// <exception cref="System.ArgumentException">A required configuration value is missing or empty.</exception>
        protected FilterController()
        {
            initialize();
        }

        /// <summary>
        /// Reads all required settings from the registry, validates them and
        /// makes sure the log directory exists.
        /// </summary>
        /// <exception cref="System.ArgumentException">A required configuration value is missing or empty.</exception>
        private void initialize()
        {
            Console.WriteLine("FilterController: Loading config");

            // CreateSubKey opens the key (creating it when absent). The handle
            // is disposed as soon as the values have been read.
            using (RegistryKey key = Registry.LocalMachine.CreateSubKey(registrySubKey))
            {
                // Read global MQ parameters.
                mqHost = readRequiredValue(key, "MQHost");
                mqChannel = readRequiredValue(key, "MQChannel");
                mqQueueManager = readRequiredValue(key, "MQQueueManager");

                logDirectory = readRequiredValue(key, "LogDirectory");
                // CreateDirectory does nothing when the directory already exists.
                Directory.CreateDirectory(logDirectory);

                string tmpFilterTransactions = readRequiredValue(key, "FilterTransactions");
                string[] transactions = tmpFilterTransactions.Split(',');
                for (int i = 0; i < transactions.Length; i++)
                {
                    transactions[i] = transactions[i].Trim();
                }
                filterTranscations = transactions;
            }
        }

        /// <summary>
        /// Reads a string value from <paramref name="key"/> and rejects null or empty results.
        /// </summary>
        /// <param name="key">Registry key to read from.</param>
        /// <param name="valueName">Name of the registry value.</param>
        /// <returns>The non-empty string stored under <paramref name="valueName"/>.</returns>
        /// <exception cref="System.ArgumentException">The value is missing or empty.</exception>
        private static string readRequiredValue(RegistryKey key, string valueName)
        {
            string value = (string)key.GetValue(valueName);
            if (string.IsNullOrEmpty(value))
            {
                throw new System.ArgumentException(valueName + " cannot be null or empty");
            }
            return value;
        }

        /// <summary>
        /// Builds the connection property table (client transport, host, channel)
        /// passed to the queue manager constructor.
        /// </summary>
        /// <returns>A new property table based on the loaded configuration.</returns>
        private Hashtable getConnectionProperties()
        {
            Hashtable connProperties = new Hashtable();
            connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
            connProperties.Add(MQC.HOST_NAME_PROPERTY, mqHost);
            connProperties.Add(MQC.CHANNEL_PROPERTY, mqChannel);
            return connProperties;
        }

        /// <summary>
        /// Connects to the queue manager, opens the inbox and output queues and
        /// reads messages from the inbox until it is empty, an MQ error occurs,
        /// or <c>maxMessagesPerRun</c> valid messages have been counted.
        /// Text messages that start with "?" or fail SALT2 header validation are
        /// appended to the discard log and are not counted.
        /// </summary>
        /// <remarks>
        /// NOTE(review): this method gets messages from the inbox but never puts
        /// them on any of the opened output queues, and filterTranscations is not
        /// consulted. Confirm whether forwarding is intentionally absent here.
        /// </remarks>
        public void transportAllMessages()
        {
            int messageCount = 0;
            MQQueueManager mqMgr = null;
            MQQueue queueIndbakke = null;
            MQQueue queueMysql = null;
            MQQueue queueDimaps = null;
            MQQueue queueStore = null;

            try
            {
                // MQ options
                Hashtable connProps = getConnectionProperties();
                int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
                int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;

                // MQ objects in v6 do not implement IDisposable and therefore
                // cannot be used with a "using" statement.
                mqMgr = new MQQueueManager(mqQueueManager, connProps); // stage 1: connect to MQ
                queueIndbakke = mqMgr.AccessQueue(queueNameIndbakke, openInputOptions);
                queueMysql = mqMgr.AccessQueue(queueNameMysql, openOutputOptions);
                queueDimaps = mqMgr.AccessQueue(queueNameDimaps, openOutputOptions);
                queueStore = mqMgr.AccessQueue(queueNameStore, openOutputOptions);

                bool isContinue = true;
                while (isContinue)
                {
                    MQMessage mqMsg = new MQMessage();
                    MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
                    try
                    {
                        queueIndbakke.Get(mqMsg, mqGetMsgOpts);

                        // Format is protocol data, so compare ordinally.
                        if (!string.Equals(mqMsg.Format, MQC.MQFMT_STRING, StringComparison.Ordinal))
                        {
                            System.Console.WriteLine("Non-text message");
                            continue;
                        }

                        string msgString = mqMsg.ReadString(mqMsg.MessageLength);

                        // A transaction starting with '?' is not a valid
                        // transaction; the header must also validate.
                        if (msgString.StartsWith("?", StringComparison.Ordinal)
                            || Salt2Helper.validateSalt2Header(msgString) == false)
                        {
                            logDiscardedMessage(msgString);
                            continue; // move on to the next transaction on the queue
                        }

                        messageCount++; // per-run message counter
                        if (messageCount >= maxMessagesPerRun)
                        {
                            // Enough messages handled in this run - stop here.
                            isContinue = false;
                        }
                    }
                    catch (MQException mqe)
                    {
                        isContinue = false;
                        if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                        {
                            // Normal end: the inbox is empty.
                            System.Console.WriteLine("no more messages");
                        }
                        else
                        {
                            // Any other reason is reported with its message.
                            System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);
                        }
                    }
                }
            }
            finally
            {
                closeQueue(queueIndbakke);
                closeQueue(queueMysql);
                closeQueue(queueDimaps);
                closeQueue(queueStore);

                if (mqMgr != null)
                {
                    if (mqMgr.IsOpen)
                    {
                        try
                        {
                            mqMgr.Close();
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine("Error cleaning up qmgr " + e.Message);
                        }
                    }

                    // Explicitly end the client connection if it is still up,
                    // so repeated runs do not accumulate connections.
                    try
                    {
                        if (mqMgr.IsConnected)
                        {
                            mqMgr.Disconnect();
                        }
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine("Error cleaning up qmgr " + e.Message);
                    }
                }
            }
        }

        /// <summary>
        /// Appends a timestamped copy of a rejected message to the discard log
        /// inside the configured log directory.
        /// </summary>
        /// <param name="msgString">Text of the rejected message.</param>
        private void logDiscardedMessage(string msgString)
        {
            string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, logDirectory, "filter");
            using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
            {
                discardedlog.WriteLine(Logfile.getNowString() + " " + msgString);
            }
        }

        /// <summary>
        /// Closes <paramref name="queue"/> if it is non-null and open. Errors
        /// during close are written to the console and not rethrown.
        /// </summary>
        /// <param name="queue">Queue to close; may be null.</param>
        private void closeQueue(MQQueue queue)
        {
            if (queue != null && queue.IsOpen)
            {
                try
                {
                    queue.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up queue " + e.Message);
                }
            }
        }

        /* Singleton */
        private static FilterController instance = null;

        /// <summary>
        /// Returns the shared controller, creating it (and loading the
        /// configuration) on first use. No locking is performed.
        /// </summary>
        /// <returns>The single <see cref="FilterController"/> instance.</returns>
        public static FilterController getInstance()
        {
            if (instance == null)
            {
                instance = new FilterController();
            }
            return instance;
        }
    }
}