/[projects]/dao/DaoMqPump2/MQFilter/FilterController.cs
ViewVC logotype

Contents of /dao/DaoMqPump2/MQFilter/FilterController.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2174 - (show annotations) (download)
Sat May 17 11:25:26 2014 UTC (10 years ago) by torben
File size: 9311 byte(s)
Handle mq headers as upper-case trimmed string
1 using System;
2 using System.Collections;
3 using System.Collections.Generic;
4 using System.IO;
5 using System.Text.RegularExpressions;
6
7 using IBM.WMQ;
8
9 using Microsoft.Win32;
10
11 using DaoCommon;
12
13
14 namespace MQFilter
15 {
16 class FilterController
17 {
18
19 public string mqHost { get; private set; }
20 public string mqChannel { get; private set; }
21 public string mqQueueManager { get; private set; }
22
23 public string logDirectory { get; private set; }
24
25 public string[] filterTranscations { get; private set; }
26
27
28 public static readonly string queueNameIndbakke = "DAO.INDBAKKE";
29 public static readonly string queueNameDimaps = "DAO.SAMLET";
30 public static readonly string queueNameMysql = "DAO.ALL";
31 public static readonly string queueNameStore = "DAO.STORE";
32
33
34 protected FilterController()
35 {
36 initialize();
37 }
38
39 private void initialize()
40 {
41 Console.WriteLine("FilterController: Loading config");
42 RegistryKey key = Registry.LocalMachine.CreateSubKey("Software\\DAO\\MQFilter");
43
44
45 //Læser globale MQ Parametre
46 mqHost = (string)key.GetValue("MQHost");
47 if (mqHost == null || mqHost.Length == 0)
48 throw new System.ArgumentException("MQHost cannot be null or empty");
49
50 mqChannel = (string)key.GetValue("MQChannel");
51 if (mqChannel == null || mqChannel.Length == 0)
52 throw new System.ArgumentException("MQChannel cannot be null or empty");
53
54 mqQueueManager = (string)key.GetValue("MQQueueManager");
55 if (mqQueueManager == null || mqQueueManager.Length == 0)
56 throw new System.ArgumentException("MQQueueManager cannot be null or empty");
57
58 ////////////
59
60 logDirectory = (string)key.GetValue("LogDirectory");
61 if (logDirectory == null || logDirectory.Length == 0)
62 throw new System.ArgumentException("LogDirectory cannot be null or empty");
63
64 if (Directory.Exists(logDirectory) == false)
65 {
66 Directory.CreateDirectory(logDirectory);
67 }
68
69 ////////////
70
71 String tmpFilterTransactions = (string)key.GetValue("FilterTransactions");
72 if (tmpFilterTransactions == null || tmpFilterTransactions.Length == 0)
73 throw new System.ArgumentException("FilterTransactions cannot be null or empty");
74 tmpFilterTransactions = tmpFilterTransactions.Replace(';', ',');
75 filterTranscations = Regex.Split(tmpFilterTransactions, ",");
76
77 for (int i = 0; i < filterTranscations.Length; i++)
78 {
79 filterTranscations[i] = filterTranscations[i].Trim().ToUpper();
80 }
81
82 ////////////
83
84 }
85
86
87
88
89
90 public void transportAllMessages()
91 {
92 int messageCount = 0;
93
94 MQQueueManager mqMgr = null;
95 MQQueue queueIndbakke = null;
96 MQQueue queueMysql = null;
97 MQQueue queueDimaps = null;
98 MQQueue queueStore = null;
99 try
100 {
101 //MQ options
102 Hashtable connProps = MQHelper.getConnectionProperties(mqHost, mqChannel);
103 int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
104 int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
105
106
107 //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
108 mqMgr = new MQQueueManager(mqQueueManager, connProps);//stage 1 connect to mq
109
110 queueIndbakke = mqMgr.AccessQueue(queueNameIndbakke, openInputOptions);
111
112 queueMysql = mqMgr.AccessQueue(queueNameMysql, openOutputOptions);
113 queueDimaps = mqMgr.AccessQueue(queueNameDimaps, openOutputOptions);
114 queueStore = mqMgr.AccessQueue(queueNameStore, openOutputOptions);
115
116
117 bool isContinue = true;
118 while (isContinue)
119 {
120
121 MQMessage mqMsg = new MQMessage();
122 MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
123
124
125 try
126 {
127 queueIndbakke.Get(mqMsg, mqGetMsgOpts);
128 if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
129 {
130 string salt2String = mqMsg.ReadString(mqMsg.MessageLength);
131 //System.Console.WriteLine(msgString);
132
133
134 // validér at headeren er gyldig
135 if ( Salt2Helper.validateSalt2Header(salt2String) == false)
136 {
137 string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, logDirectory, "filter");
138 using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
139 {
140 discardedlog.WriteLine(Logfile.getNowString() + " " + salt2String);
141 }
142 continue; //gå frem til at tage næste transaktion fra køen
143 }
144
145 MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
146 // same as MQPMO_DEFAULT
147
148 MQMessage msg = new MQMessage();
149 msg.Format = MQC.MQFMT_STRING;
150 msg.CharacterSet = 1252;
151 msg.WriteString(salt2String);
152
153 Salt2Header header = Salt2Helper.parseHeader(salt2String);
154 queueMysql.Put(msg, pmo);
155 if (saveForLater(header))
156 {
157 queueStore.Put(msg, pmo);
158 }
159 else
160 {
161 queueDimaps.Put(msg, pmo);
162 }
163
164
165
166 messageCount++;// increment per run message counter
167 if (messageCount >= 10000) // if we have moved 10000 messages in this run - let it go
168 {
169 isContinue = false;
170 }
171
172
173
174
175 }
176 else
177 {
178 System.Console.WriteLine("Non-text message");
179 }
180 }
181 catch (MQException mqe)
182 {
183 isContinue = false;
184
185 // report reason, if any
186 if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
187 {
188 // special report for normal end
189 System.Console.WriteLine("no more messages");
190 }
191 else
192 {
193 // general report for other reasons
194 System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
195
196 }
197
198 }
199
200 }
201
202
203
204 }
205 finally
206 {
207 MQHelper.closeQueueSafe(queueIndbakke);
208 MQHelper.closeQueueSafe(queueMysql);
209 MQHelper.closeQueueSafe(queueDimaps);
210 MQHelper.closeQueueSafe(queueStore);
211
212 MQHelper.closeQueueManagerSafe(mqMgr);
213 }
214 }
215
216 private Boolean saveForLater(Salt2Header header)
217 {
218
219 DateTime now = DateTime.Now;
220 int hour = now.Hour;
221 if (hour >= 14 && hour < 18)
222 {
223
224 if (contains(header.transaktionForkortelse, this.filterTranscations) ) //Så længe vi skal være .net3.0 kompatible er LINQ problematisk (LINQ kræver 3.5)
225 {
226 return true;
227 }
228 else
229 {
230 return false;
231 }
232 }
233 else //normal operation - send straight trough
234 {
235 return false;
236 }
237
238 }
239
240 private bool contains(string needle, string[] haystack) // s
241 {
242 foreach(string hay in haystack)
243 {
244 if (needle.Equals(hay))
245 {
246 return true;
247 }
248 }
249
250 return false;
251 }
252
253
254
255
256 /* Singleton */
257 private static FilterController instance = null;
258 public static FilterController getInstance()
259 {
260 if (instance == null)
261 instance = new FilterController();
262
263 return instance;
264 }
265
266 }
267 }

  ViewVC Help
Powered by ViewVC 1.1.20