/[projects]/dao/DaoMqPump2/MQFilter/FilterController.cs
ViewVC logotype

Contents of /dao/DaoMqPump2/MQFilter/FilterController.cs

Parent Directory | Revision Log


Revision 2177 - (show annotations) (download)
Mon May 19 19:51:47 2014 UTC (10 years ago) by torben
File size: 10449 byte(s)
Improved Logging
1 using System;
2 using System.Collections;
3 using System.Collections.Generic;
4 using System.IO;
5 using System.Text.RegularExpressions;
6
7 using IBM.WMQ;
8
9 using Microsoft.Win32;
10
11 using DaoCommon;
12
13
namespace MQFilter
{
    /// <summary>
    /// Moves messages from the inbound MQ queue (<see cref="queueNameIndbakke"/>) to the
    /// downstream queues. Every valid message is put on <see cref="queueNameMysql"/> and then
    /// on either <see cref="queueNameStore"/> or <see cref="queueNameDimaps"/>, depending on
    /// <see cref="saveForLater"/>.
    /// Configuration is read from the registry key HKLM\Software\DAO\MQFilter.
    /// Obtain the single instance through <see cref="getInstance"/>.
    /// </summary>
    class FilterController
    {
        // --- Configuration loaded from the registry by initialize() ---

        public string mqHost { get; private set; }
        public string mqChannel { get; private set; }
        public string mqQueueManager { get; private set; }

        public string logDirectory { get; private set; }

        /// <summary>
        /// Transaction abbreviations (trimmed, upper-cased) that are diverted to the store
        /// queue during the store window. The property name keeps its historical spelling
        /// because it is part of the public interface.
        /// </summary>
        public string[] filterTranscations { get; private set; }

        // Event log used for service-level messages (start-up, errors).
        Logfile logFile;


        public static readonly string queueNameIndbakke = "DAO.INDBAKKE";
        public static readonly string queueNameDimaps = "DAO.SAMLET";
        public static readonly string queueNameMysql = "DAO.ALL";
        public static readonly string queueNameStore = "DAO.STORE";


        // Registry location (under HKEY_LOCAL_MACHINE) holding the service configuration.
        private const string registryPath = "Software\\DAO\\MQFilter";

        // Upper bound on the number of messages forwarded in a single run.
        private const int maxMessagesPerRun = 10000;

        // Character set (code page) stamped on every outgoing message.
        private const int outputCharacterSet = 1252;

        // Local-time hours [start, end) during which filtered transactions go to the store queue.
        private const int storeWindowStartHour = 14;
        private const int storeWindowEndHour = 18;


        /// <summary>
        /// Loads the configuration and opens the event log.
        /// </summary>
        /// <exception cref="System.ArgumentException">
        /// A required registry value is missing or empty (thrown by <see cref="initialize"/>).
        /// </exception>
        protected FilterController()
        {
            initialize();
            logFile = new Logfile(LogfileType.LogEvents, "filter", logDirectory);
            logFile.addSingleLogEntry("Starting service");
        }

        /// <summary>
        /// Reads all settings from the registry, creates the log directory if it does not
        /// exist and parses the list of filtered transaction abbreviations.
        /// </summary>
        /// <exception cref="System.ArgumentException">
        /// A required registry value is missing or empty.
        /// </exception>
        private void initialize()
        {
            Console.WriteLine("FilterController: Loading config");

            // The key is opened writable (CreateSubKey) because missing values are written
            // back as empty placeholders. Dispose it so the registry handle is not leaked.
            using (RegistryKey key = Registry.LocalMachine.CreateSubKey(registryPath))
            {
                // Global MQ parameters.
                mqHost = readRequiredSetting(key, "MQHost");
                mqChannel = readRequiredSetting(key, "MQChannel");
                mqQueueManager = readRequiredSetting(key, "MQQueueManager");

                // Logging.
                logDirectory = readRequiredSetting(key, "LogDirectory");
                if (Directory.Exists(logDirectory) == false)
                {
                    Directory.CreateDirectory(logDirectory);
                }

                // Transactions that are held back during the store window.
                filterTranscations = parseFilterTransactions(readRequiredSetting(key, "FilterTransactions"));
            }
        }

        /// <summary>
        /// Reads a mandatory string value from <paramref name="key"/>.
        /// If the value is missing or empty, an empty string value with that name is written
        /// to the key before the exception is thrown.
        /// </summary>
        /// <param name="key">Open, writable registry key.</param>
        /// <param name="valueName">Name of the registry value to read.</param>
        /// <returns>The non-empty value.</returns>
        /// <exception cref="System.ArgumentException">The value is missing or empty.</exception>
        private static string readRequiredSetting(RegistryKey key, string valueName)
        {
            string value = (string)key.GetValue(valueName);
            if (string.IsNullOrEmpty(value))
            {
                key.SetValue(valueName, "", RegistryValueKind.String);
                throw new System.ArgumentException(valueName + " cannot be null or empty");
            }

            return value;
        }

        /// <summary>
        /// Splits a ',' or ';' separated list into trimmed, upper-cased entries.
        /// Empty entries (e.g. from a trailing separator) are skipped so that they can never
        /// match a header with an empty transaction abbreviation.
        /// </summary>
        /// <param name="rawList">The raw, non-null configuration string.</param>
        /// <returns>The parsed entries; may be empty.</returns>
        private static string[] parseFilterTransactions(string rawList)
        {
            List<string> codes = new List<string>();
            foreach (string entry in rawList.Split(',', ';'))
            {
                // Invariant casing: the result is used for ordinal comparison, so it must not
                // depend on the culture of the machine.
                string code = entry.Trim().ToUpperInvariant();
                if (code.Length > 0)
                {
                    codes.Add(code);
                }
            }

            return codes.ToArray();
        }


        /// <summary>
        /// Runs one transport pass. Any exception escaping the worker is written to the
        /// event log and not rethrown.
        /// </summary>
        public void transportAllMessages()
        {
            try
            {
                transportMessagesWorker();
            }
            catch (Exception e)
            {
                logFile.addSingleLogEntry("Error during transportAllMessages: " + e.Message);
            }
        }

        /// <summary>
        /// Connects to the queue manager, then reads messages from the inbound queue and
        /// forwards them until the queue reports an MQ error (including "no message
        /// available") or <see cref="maxMessagesPerRun"/> messages have been forwarded.
        /// Messages with an invalid SALT2 header are written to the discarded log and skipped;
        /// messages that are not in string format are dropped.
        /// </summary>
        private void transportMessagesWorker()
        {
            int messageCount = 0;

            MQQueueManager mqMgr = null;
            MQQueue queueIndbakke = null;
            MQQueue queueMysql = null;
            MQQueue queueDimaps = null;
            MQQueue queueStore = null;

            using (Logfile translog = new Logfile(LogfileType.LogTransactions, "filter", logDirectory))
            {
                try
                {
                    // MQ options
                    Hashtable connProps = MQHelper.getConnectionProperties(mqHost, mqChannel);
                    int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
                    int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;

                    // MQ objects in v6 do not implement IDisposable and therefore cannot be
                    // used with a "using" statement; they are released in the finally block.
                    mqMgr = new MQQueueManager(mqQueueManager, connProps); // stage 1: connect to MQ

                    queueIndbakke = MQHelper.openQueueHelper(queueNameIndbakke, mqMgr, openInputOptions);

                    queueMysql = MQHelper.openQueueHelper(queueNameMysql, mqMgr, openOutputOptions);
                    queueDimaps = MQHelper.openQueueHelper(queueNameDimaps, mqMgr, openOutputOptions);
                    queueStore = MQHelper.openQueueHelper(queueNameStore, mqMgr, openOutputOptions);

                    bool isContinue = true;
                    while (isContinue)
                    {
                        MQMessage mqMsg = new MQMessage();
                        MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();

                        try
                        {
                            // NOTE(review): the get options are the defaults, so the message
                            // appears to be removed from the queue outside a unit of work. If a
                            // later Put fails the message then only survives in the transaction
                            // log. Consider syncpoint + commit - confirm with the MQ setup.
                            queueIndbakke.Get(mqMsg, mqGetMsgOpts);

                            if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) != 0)
                            {
                                System.Console.WriteLine("Non-text message");
                                // Also record it in the event log: console output is not
                                // visible when running as a service.
                                logFile.addSingleLogEntry("Dropped non-text message with format '" + mqMsg.Format + "'");
                                continue;
                            }

                            string salt2String = mqMsg.ReadString(mqMsg.MessageLength);

                            // Validate that the header is valid.
                            if (Salt2Helper.validateSalt2Header(salt2String) == false)
                            {
                                using (Logfile discardedlog = new Logfile(LogfileType.LogDiscarded, "filter", logDirectory))
                                {
                                    discardedlog.writeLogEntry(salt2String);
                                }
                                continue; // move on to the next transaction on the queue
                            }

                            translog.writeLogEntry(salt2String);
                            forwardMessage(salt2String, queueMysql, queueStore, queueDimaps);

                            messageCount++; // per-run message counter
                            if (messageCount >= maxMessagesPerRun)
                            {
                                // Enough messages moved in this run - let it go.
                                isContinue = false;
                            }
                        }
                        catch (MQException mqe)
                        {
                            // Any MQ error (from Get or Put) ends the run.
                            isContinue = false;

                            if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                            {
                                // Normal end: the inbound queue is empty.
                                System.Console.WriteLine("no more messages");
                            }
                            else
                            {
                                System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);
                                // Real errors must reach the event log, not only the console.
                                logFile.addSingleLogEntry("MQ error during transportMessagesWorker (reason " + mqe.Reason + "): " + mqe.Message);
                            }
                        }
                    }
                }
                finally
                {
                    MQHelper.closeQueueSafe(queueIndbakke);
                    MQHelper.closeQueueSafe(queueMysql);
                    MQHelper.closeQueueSafe(queueDimaps);
                    MQHelper.closeQueueSafe(queueStore);

                    MQHelper.closeQueueManagerSafe(mqMgr);
                }
            }
        }

        /// <summary>
        /// Wraps <paramref name="salt2String"/> in a new string-format message and puts it on
        /// the MySQL queue and then on either the store queue or the Dimaps queue.
        /// </summary>
        /// <param name="salt2String">Message text with an already validated header.</param>
        /// <param name="queueMysql">Output queue that receives every message.</param>
        /// <param name="queueStore">Output queue used when <see cref="saveForLater"/> is true.</param>
        /// <param name="queueDimaps">Output queue used otherwise.</param>
        private void forwardMessage(string salt2String, MQQueue queueMysql, MQQueue queueStore, MQQueue queueDimaps)
        {
            MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults, same as MQPMO_DEFAULT

            MQMessage msg = new MQMessage();
            msg.Format = MQC.MQFMT_STRING;
            msg.CharacterSet = outputCharacterSet;
            msg.WriteString(salt2String);

            Salt2Header header = Salt2Helper.parseHeader(salt2String);
            queueMysql.Put(msg, pmo);

            MQQueue secondTarget = saveForLater(header) ? queueStore : queueDimaps;
            secondTarget.Put(msg, pmo);
        }

        /// <summary>
        /// Decides whether a message is diverted to the store queue.
        /// </summary>
        /// <param name="header">Parsed header of the message.</param>
        /// <returns>
        /// true when the local hour is within [14, 18) and the header's transaction
        /// abbreviation is listed in <see cref="filterTranscations"/>; otherwise false.
        /// </returns>
        private Boolean saveForLater(Salt2Header header)
        {
            int hour = DateTime.Now.Hour;
            bool insideStoreWindow = hour >= storeWindowStartHour && hour < storeWindowEndHour;

            // Outside the window: normal operation - send straight through.
            return insideStoreWindow && contains(header.transaktionForkortelse, this.filterTranscations);
        }

        /// <summary>
        /// Ordinal, case-sensitive membership test.
        /// As long as the code must stay .NET 3.0 compatible LINQ is not an option
        /// (LINQ requires 3.5), hence this helper.
        /// </summary>
        /// <param name="needle">Value to look for; null never matches.</param>
        /// <param name="haystack">Values to search; null is treated as empty.</param>
        /// <returns>true if <paramref name="needle"/> equals one of the entries.</returns>
        private bool contains(string needle, string[] haystack)
        {
            if (needle == null || haystack == null)
            {
                return false;
            }

            // NOTE(review): the configured entries are upper-cased but the needle is compared
            // as-is - presumably headers always carry upper-case abbreviations; confirm.
            return Array.IndexOf(haystack, needle) >= 0;
        }


        /* Singleton */
        private static FilterController instance = null;

        /// <summary>
        /// Returns the shared instance, creating it on first use.
        /// </summary>
        /// <remarks>
        /// NOTE(review): creation is not synchronized; this assumes the first call is made
        /// from a single thread - confirm against the callers.
        /// </remarks>
        public static FilterController getInstance()
        {
            if (instance == null)
            {
                instance = new FilterController();
            }

            return instance;
        }
    }
}

  ViewVC Help
Powered by ViewVC 1.1.20