/[projects]/dao/DaoMqPump2/MQFilter/FilterController.cs
ViewVC logotype

Contents of /dao/DaoMqPump2/MQFilter/FilterController.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2183 - (show annotations) (download)
Wed May 21 09:56:21 2014 UTC (9 years, 11 months ago) by torben
File size: 11886 byte(s)
Use some smaller but more frequent runs
1 using System;
2 using System.Collections;
3 using System.Collections.Generic;
4 using System.IO;
5 using System.Text.RegularExpressions;
6
7 using IBM.WMQ;
8
9 using Microsoft.Win32;
10
11 using DaoCommon;
12
13
14 namespace MQFilter
15 {
16 class FilterController
17 {
18
19 public const int TRANSACTIONS_PER_RUN = 2000;
20
21 public string mqHost { get; private set; }
22 public string mqChannel { get; private set; }
23 public string mqQueueManager { get; private set; }
24
25 public string logDirectory { get; private set; }
26
27 public string[] filterTranscations { get; private set; }
28
29 Logfile logFile;
30
31
32 public static readonly string queueNameIndbakke = "DAO.INDBAKKE";
33 public static readonly string queueNameDimaps = "DAO.SAMLET";
34 public static readonly string queueNameMysql = "DAO.ALL";
35 public static readonly string queueNameStore = "DAO.STORE";
36
37 TimeSpan silentPeriodBegin;
38 TimeSpan silentPeriodEnd;
39
40 protected FilterController()
41 {
42 initialize();
43 logFile = new Logfile(LogfileType.LogEvents, "filter", logDirectory);
44 logFile.addSingleLogEntry("Starting service");
45 }
46
47 private void initialize()
48 {
49 Console.WriteLine("FilterController: Loading config");
50 RegistryKey key = Registry.LocalMachine.CreateSubKey("Software\\DAO\\MQFilter");
51
52
53 //Læser globale MQ Parametre
54 mqHost = (string)key.GetValue("MQHost");
55 if (mqHost == null || mqHost.Length == 0)
56 {
57 key.SetValue("MQHost", "", RegistryValueKind.String);
58 throw new System.ArgumentException("MQHost cannot be null or empty");
59 }
60
61 mqChannel = (string)key.GetValue("MQChannel");
62 if (mqChannel == null || mqChannel.Length == 0)
63 {
64 key.SetValue("MQChannel", "", RegistryValueKind.String);
65 throw new System.ArgumentException("MQChannel cannot be null or empty");
66 }
67
68 mqQueueManager = (string)key.GetValue("MQQueueManager");
69 if (mqQueueManager == null || mqQueueManager.Length == 0)
70 {
71 key.SetValue("MQQueueManager", "", RegistryValueKind.String);
72 throw new System.ArgumentException("MQQueueManager cannot be null or empty");
73 }
74
75 ////////////
76
77 logDirectory = (string)key.GetValue("LogDirectory");
78 if (logDirectory == null || logDirectory.Length == 0)
79 {
80 key.SetValue("LogDirectory", "", RegistryValueKind.String);
81 throw new System.ArgumentException("LogDirectory cannot be null or empty");
82 }
83
84 if (Directory.Exists(logDirectory) == false)
85 {
86 Directory.CreateDirectory(logDirectory);
87 }
88
89 ////////////
90
91 string silentBeginStr = (string)key.GetValue("SilentPeriodBegin");
92 if (silentBeginStr == null || silentBeginStr.Length == 0)
93 {
94 key.SetValue("SilentPeriodBegin", "", RegistryValueKind.String);
95 throw new System.ArgumentException("SilentPeriodBegin cannot be null or empty");
96 }
97 silentPeriodBegin = TimeSpan.Parse(silentBeginStr);
98
99 string silentEndStr = (string)key.GetValue("SilentPeriodEnd");
100 if (silentEndStr == null || silentEndStr.Length == 0)
101 {
102 key.SetValue("SilentPeriodEnd", "", RegistryValueKind.String);
103 throw new System.ArgumentException("SilentPeriodEnd cannot be null or empty");
104 }
105 silentPeriodEnd = TimeSpan.Parse(silentEndStr);
106
107 ////////////
108
109 String tmpFilterTransactions = (string)key.GetValue("FilterTransactions");
110 if (tmpFilterTransactions == null || tmpFilterTransactions.Length == 0)
111 {
112 key.SetValue("FilterTransactions", "", RegistryValueKind.String);
113 throw new System.ArgumentException("FilterTransactions cannot be null or empty");
114 }
115 tmpFilterTransactions = tmpFilterTransactions.Replace(';', ',');
116 filterTranscations = Regex.Split(tmpFilterTransactions, ",");
117
118 for (int i = 0; i < filterTranscations.Length; i++)
119 {
120 filterTranscations[i] = filterTranscations[i].Trim().ToUpper();
121 }
122
123 ////////////
124
125 }
126
127
128
129
130 public void transportAllMessages()
131 {
132 try
133 {
134 transportMessagesWorker();
135 }
136 catch (Exception e)
137 {
138 logFile.addSingleLogEntry("Error during transportAllMessages: " + e.Message);
139 Console.WriteLine(e.StackTrace);
140 }
141 }
142
143 private void transportMessagesWorker()
144 {
145 int messageCount = 0;
146
147 MQQueueManager mqMgr = null;
148 MQQueue queueIndbakke = null;
149 MQQueue queueMysql = null;
150 MQQueue queueDimaps = null;
151 MQQueue queueStore = null;
152
153 using (Logfile translog = new Logfile(LogfileType.LogTransactions, "filter", logDirectory))
154 try
155 {
156 //MQ options
157 Hashtable connProps = MQHelper.getConnectionProperties(mqHost, mqChannel);
158 int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
159 int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
160
161
162 //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
163 mqMgr = new MQQueueManager(mqQueueManager, connProps);//stage 1 connect to mq
164
165
166 queueIndbakke = MQHelper.openQueueHelper(queueNameIndbakke, mqMgr, openInputOptions);
167
168 queueMysql = MQHelper.openQueueHelper(queueNameMysql, mqMgr, openOutputOptions);
169 queueDimaps = MQHelper.openQueueHelper(queueNameDimaps, mqMgr, openOutputOptions);
170 queueStore = MQHelper.openQueueHelper(queueNameStore, mqMgr, openOutputOptions);
171
172
173 bool isContinue = true;
174 while (isContinue)
175 {
176
177 MQMessage mqMsg = new MQMessage();
178 MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
179
180
181 try
182 {
183 queueIndbakke.Get(mqMsg, mqGetMsgOpts);
184 if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
185 {
186 if (mqMsg.MessageLength == 0) //Skip empty transactions (so far only seen on my test server)
187 continue;
188
189 //System.Console.WriteLine("LEN>" + mqMsg.MessageLength);
190 string salt2String = mqMsg.ReadString(mqMsg.MessageLength);
191
192
193
194 // validér at headeren er gyldig
195 if ( Salt2Helper.validateSalt2Header(salt2String) == false)
196 {
197
198
199 using (Logfile discardedlog = new Logfile(LogfileType.LogDiscarded, "filter", logDirectory))
200 {
201 discardedlog.writeLogEntry(salt2String);
202 }
203 continue; //gå frem til at tage næste transaktion fra køen
204 }
205
206 translog.writeLogEntry(salt2String);
207
208 MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
209 // same as MQPMO_DEFAULT
210
211 MQMessage msg = new MQMessage();
212 msg.Format = MQC.MQFMT_STRING;
213 msg.CharacterSet = 1252;
214 msg.WriteString(salt2String);
215
216 Salt2Header header = Salt2Helper.parseHeader(salt2String);
217 queueMysql.Put(msg, pmo);
218 if (saveForLater(header))
219 {
220 queueStore.Put(msg, pmo);
221 }
222 else
223 {
224 queueDimaps.Put(msg, pmo);
225 }
226
227
228
229 messageCount++;// increment per run message counter
230 if (messageCount >= TRANSACTIONS_PER_RUN) // if we have moved TRANSACTIONS_PER_RUN messages in this run - let it go
231 {
232 isContinue = false;
233 }
234
235
236
237
238 }
239 else
240 {
241 System.Console.WriteLine("Non-text message");
242 }
243 }
244 catch (MQException mqe)
245 {
246 isContinue = false;
247
248 // report reason, if any
249 if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
250 {
251 // special report for normal end
252 System.Console.WriteLine("no more messages");
253 }
254 else
255 {
256 // general report for other reasons
257 System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
258
259 }
260
261 }
262
263 }
264
265
266
267 }
268 finally
269 {
270 MQHelper.closeQueueSafe(queueIndbakke);
271 MQHelper.closeQueueSafe(queueMysql);
272 MQHelper.closeQueueSafe(queueDimaps);
273 MQHelper.closeQueueSafe(queueStore);
274
275 MQHelper.closeQueueManagerSafe(mqMgr);
276 }
277 }
278
279 private Boolean saveForLater(Salt2Header header)
280 {
281
282 TimeSpan now = DateTime.Now.TimeOfDay;
283
284 if (now >= silentPeriodBegin && now < silentPeriodEnd)
285 {
286
287 if (header.afsender == "DAO") //DAO transaktioner må altid komme igennem
288 return false;
289
290 if (contains(header.transaktionForkortelse, this.filterTranscations) ) //Så længe vi skal være .net3.0 kompatible er LINQ problematisk (LINQ kræver 3.5)
291 {
292 return true;
293 }
294 else
295 {
296 return false;
297 }
298 }
299 else //normal operation - send straight trough
300 {
301 return false;
302 }
303
304 }
305
306 private bool contains(string needle, string[] haystack) // s
307 {
308 foreach(string hay in haystack)
309 {
310 if (needle.Equals(hay))
311 {
312 return true;
313 }
314 }
315
316 return false;
317 }
318
319
320
321
322 /* Singleton */
323 private static FilterController instance = null;
324 public static FilterController getInstance()
325 {
326 if (instance == null)
327 instance = new FilterController();
328
329 return instance;
330 }
331
332 }
333 }

  ViewVC Help
Powered by ViewVC 1.1.20