/[projects]/dao/DaoMqPump2/MQFilter/FilterController.cs
ViewVC logotype

Contents of /dao/DaoMqPump2/MQFilter/FilterController.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2182 - (show annotations) (download)
Tue May 20 20:48:48 2014 UTC (9 years, 11 months ago) by torben
File size: 11799 byte(s)
DAO Transactions go straight through
1 using System;
2 using System.Collections;
3 using System.Collections.Generic;
4 using System.IO;
5 using System.Text.RegularExpressions;
6
7 using IBM.WMQ;
8
9 using Microsoft.Win32;
10
11 using DaoCommon;
12
13
14 namespace MQFilter
15 {
16 class FilterController
17 {
18
19 public string mqHost { get; private set; }
20 public string mqChannel { get; private set; }
21 public string mqQueueManager { get; private set; }
22
23 public string logDirectory { get; private set; }
24
25 public string[] filterTranscations { get; private set; }
26
27 Logfile logFile;
28
29
30 public static readonly string queueNameIndbakke = "DAO.INDBAKKE";
31 public static readonly string queueNameDimaps = "DAO.SAMLET";
32 public static readonly string queueNameMysql = "DAO.ALL";
33 public static readonly string queueNameStore = "DAO.STORE";
34
35 TimeSpan silentPeriodBegin;
36 TimeSpan silentPeriodEnd;
37
38 protected FilterController()
39 {
40 initialize();
41 logFile = new Logfile(LogfileType.LogEvents, "filter", logDirectory);
42 logFile.addSingleLogEntry("Starting service");
43 }
44
45 private void initialize()
46 {
47 Console.WriteLine("FilterController: Loading config");
48 RegistryKey key = Registry.LocalMachine.CreateSubKey("Software\\DAO\\MQFilter");
49
50
51 //Læser globale MQ Parametre
52 mqHost = (string)key.GetValue("MQHost");
53 if (mqHost == null || mqHost.Length == 0)
54 {
55 key.SetValue("MQHost", "", RegistryValueKind.String);
56 throw new System.ArgumentException("MQHost cannot be null or empty");
57 }
58
59 mqChannel = (string)key.GetValue("MQChannel");
60 if (mqChannel == null || mqChannel.Length == 0)
61 {
62 key.SetValue("MQChannel", "", RegistryValueKind.String);
63 throw new System.ArgumentException("MQChannel cannot be null or empty");
64 }
65
66 mqQueueManager = (string)key.GetValue("MQQueueManager");
67 if (mqQueueManager == null || mqQueueManager.Length == 0)
68 {
69 key.SetValue("MQQueueManager", "", RegistryValueKind.String);
70 throw new System.ArgumentException("MQQueueManager cannot be null or empty");
71 }
72
73 ////////////
74
75 logDirectory = (string)key.GetValue("LogDirectory");
76 if (logDirectory == null || logDirectory.Length == 0)
77 {
78 key.SetValue("LogDirectory", "", RegistryValueKind.String);
79 throw new System.ArgumentException("LogDirectory cannot be null or empty");
80 }
81
82 if (Directory.Exists(logDirectory) == false)
83 {
84 Directory.CreateDirectory(logDirectory);
85 }
86
87 ////////////
88
89 string silentBeginStr = (string)key.GetValue("SilentPeriodBegin");
90 if (silentBeginStr == null || silentBeginStr.Length == 0)
91 {
92 key.SetValue("SilentPeriodBegin", "", RegistryValueKind.String);
93 throw new System.ArgumentException("SilentPeriodBegin cannot be null or empty");
94 }
95 silentPeriodBegin = TimeSpan.Parse(silentBeginStr);
96
97 string silentEndStr = (string)key.GetValue("SilentPeriodEnd");
98 if (silentEndStr == null || silentEndStr.Length == 0)
99 {
100 key.SetValue("SilentPeriodEnd", "", RegistryValueKind.String);
101 throw new System.ArgumentException("SilentPeriodEnd cannot be null or empty");
102 }
103 silentPeriodEnd = TimeSpan.Parse(silentEndStr);
104
105 ////////////
106
107 String tmpFilterTransactions = (string)key.GetValue("FilterTransactions");
108 if (tmpFilterTransactions == null || tmpFilterTransactions.Length == 0)
109 {
110 key.SetValue("FilterTransactions", "", RegistryValueKind.String);
111 throw new System.ArgumentException("FilterTransactions cannot be null or empty");
112 }
113 tmpFilterTransactions = tmpFilterTransactions.Replace(';', ',');
114 filterTranscations = Regex.Split(tmpFilterTransactions, ",");
115
116 for (int i = 0; i < filterTranscations.Length; i++)
117 {
118 filterTranscations[i] = filterTranscations[i].Trim().ToUpper();
119 }
120
121 ////////////
122
123 }
124
125
126
127
128 public void transportAllMessages()
129 {
130 try
131 {
132 transportMessagesWorker();
133 }
134 catch (Exception e)
135 {
136 logFile.addSingleLogEntry("Error during transportAllMessages: " + e.Message);
137 Console.WriteLine(e.StackTrace);
138 }
139 }
140
141 private void transportMessagesWorker()
142 {
143 int messageCount = 0;
144
145 MQQueueManager mqMgr = null;
146 MQQueue queueIndbakke = null;
147 MQQueue queueMysql = null;
148 MQQueue queueDimaps = null;
149 MQQueue queueStore = null;
150
151 using (Logfile translog = new Logfile(LogfileType.LogTransactions, "filter", logDirectory))
152 try
153 {
154 //MQ options
155 Hashtable connProps = MQHelper.getConnectionProperties(mqHost, mqChannel);
156 int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
157 int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
158
159
160 //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
161 mqMgr = new MQQueueManager(mqQueueManager, connProps);//stage 1 connect to mq
162
163
164 queueIndbakke = MQHelper.openQueueHelper(queueNameIndbakke, mqMgr, openInputOptions);
165
166 queueMysql = MQHelper.openQueueHelper(queueNameMysql, mqMgr, openOutputOptions);
167 queueDimaps = MQHelper.openQueueHelper(queueNameDimaps, mqMgr, openOutputOptions);
168 queueStore = MQHelper.openQueueHelper(queueNameStore, mqMgr, openOutputOptions);
169
170
171 bool isContinue = true;
172 while (isContinue)
173 {
174
175 MQMessage mqMsg = new MQMessage();
176 MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
177
178
179 try
180 {
181 queueIndbakke.Get(mqMsg, mqGetMsgOpts);
182 if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
183 {
184 if (mqMsg.MessageLength == 0) //Skip empty transactions (so far only seen on my test server)
185 continue;
186
187 //System.Console.WriteLine("LEN>" + mqMsg.MessageLength);
188 string salt2String = mqMsg.ReadString(mqMsg.MessageLength);
189
190
191
192 // validér at headeren er gyldig
193 if ( Salt2Helper.validateSalt2Header(salt2String) == false)
194 {
195
196
197 using (Logfile discardedlog = new Logfile(LogfileType.LogDiscarded, "filter", logDirectory))
198 {
199 discardedlog.writeLogEntry(salt2String);
200 }
201 continue; //gå frem til at tage næste transaktion fra køen
202 }
203
204 translog.writeLogEntry(salt2String);
205
206 MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
207 // same as MQPMO_DEFAULT
208
209 MQMessage msg = new MQMessage();
210 msg.Format = MQC.MQFMT_STRING;
211 msg.CharacterSet = 1252;
212 msg.WriteString(salt2String);
213
214 Salt2Header header = Salt2Helper.parseHeader(salt2String);
215 queueMysql.Put(msg, pmo);
216 if (saveForLater(header))
217 {
218 queueStore.Put(msg, pmo);
219 }
220 else
221 {
222 queueDimaps.Put(msg, pmo);
223 }
224
225
226
227 messageCount++;// increment per run message counter
228 if (messageCount >= 10000) // if we have moved 10000 messages in this run - let it go
229 {
230 isContinue = false;
231 }
232
233
234
235
236 }
237 else
238 {
239 System.Console.WriteLine("Non-text message");
240 }
241 }
242 catch (MQException mqe)
243 {
244 isContinue = false;
245
246 // report reason, if any
247 if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
248 {
249 // special report for normal end
250 System.Console.WriteLine("no more messages");
251 }
252 else
253 {
254 // general report for other reasons
255 System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
256
257 }
258
259 }
260
261 }
262
263
264
265 }
266 finally
267 {
268 MQHelper.closeQueueSafe(queueIndbakke);
269 MQHelper.closeQueueSafe(queueMysql);
270 MQHelper.closeQueueSafe(queueDimaps);
271 MQHelper.closeQueueSafe(queueStore);
272
273 MQHelper.closeQueueManagerSafe(mqMgr);
274 }
275 }
276
277 private Boolean saveForLater(Salt2Header header)
278 {
279
280 TimeSpan now = DateTime.Now.TimeOfDay;
281
282 if (now >= silentPeriodBegin && now < silentPeriodEnd)
283 {
284
285 if (header.afsender == "DAO") //DAO transaktioner må altid komme igennem
286 return false;
287
288 if (contains(header.transaktionForkortelse, this.filterTranscations) ) //Så længe vi skal være .net3.0 kompatible er LINQ problematisk (LINQ kræver 3.5)
289 {
290 return true;
291 }
292 else
293 {
294 return false;
295 }
296 }
297 else //normal operation - send straight trough
298 {
299 return false;
300 }
301
302 }
303
304 private bool contains(string needle, string[] haystack) // s
305 {
306 foreach(string hay in haystack)
307 {
308 if (needle.Equals(hay))
309 {
310 return true;
311 }
312 }
313
314 return false;
315 }
316
317
318
319
320 /* Singleton */
321 private static FilterController instance = null;
322 public static FilterController getInstance()
323 {
324 if (instance == null)
325 instance = new FilterController();
326
327 return instance;
328 }
329
330 }
331 }

  ViewVC Help
Powered by ViewVC 1.1.20