/[projects]/dao/DaoMqPump2/MQFilter/FilterController.cs
ViewVC logotype

Contents of /dao/DaoMqPump2/MQFilter/FilterController.cs

Parent Directory | Revision Log


Revision 2171 - (show annotations) (download)
Fri May 16 21:50:17 2014 UTC (10 years ago) by torben
File size: 10373 byte(s)
Theoretically this should work
1 using System;
2 using System.Collections;
3 using System.Collections.Generic;
4 using System.IO;
5 using System.Text.RegularExpressions;
6
7 using IBM.WMQ;
8
9 using Microsoft.Win32;
10
11 using DaoCommon;
12
13
14 namespace MQFilter
15 {
16 class FilterController
17 {
18
/// <summary>MQ host name; loaded from the "MQHost" registry value by initialize().</summary>
public string mqHost { get; private set; }

/// <summary>MQ channel name; loaded from the "MQChannel" registry value by initialize().</summary>
public string mqChannel { get; private set; }

/// <summary>Queue manager name; loaded from the "MQQueueManager" registry value by initialize().</summary>
public string mqQueueManager { get; private set; }

/// <summary>Directory that log files are written to; loaded from the "LogDirectory" registry value.</summary>
public string logDirectory { get; private set; }

/// <summary>
/// Trimmed entries of the comma-separated "FilterTransactions" registry value.
/// saveForLater() compares a header's transaction abbreviation against these.
/// </summary>
public string[] filterTranscations { get; private set; }


/// <summary>Name of the queue that transportAllMessages() reads from.</summary>
public static readonly string queueNameIndbakke = "DAO.INDBAKKE";

/// <summary>Name of the output queue used when a message is not held back.</summary>
public static readonly string queueNameDimaps = "DAO.SAMLET";

/// <summary>Name of the output queue that receives every forwarded message.</summary>
public static readonly string queueNameMysql = "DAO.ALL";

/// <summary>Name of the output queue used when saveForLater() returns true.</summary>
public static readonly string queueNameStore = "DAO.STORE";
32
33
/// <summary>
/// Creates the controller and immediately loads its configuration.
/// Any exception thrown by initialize() propagates to the caller.
/// </summary>
protected FilterController()
{
    this.initialize();
}
38
/// <summary>
/// Loads the configuration from HKEY_LOCAL_MACHINE\Software\DAO\MQFilter into the
/// instance properties and makes sure the log directory exists.
/// </summary>
/// <exception cref="System.ArgumentException">
/// A required registry value is missing, empty, or not a string.
/// </exception>
private void initialize()
{
    Console.WriteLine("FilterController: Loading config");

    // RegistryKey wraps a native handle, so release it as soon as the values are read.
    using (RegistryKey key = Registry.LocalMachine.CreateSubKey("Software\\DAO\\MQFilter"))
    {
        if (key == null)
        {
            throw new System.ArgumentException("Registry key Software\\DAO\\MQFilter could not be opened");
        }

        // Global MQ parameters
        mqHost = readRequiredString(key, "MQHost");
        mqChannel = readRequiredString(key, "MQChannel");
        mqQueueManager = readRequiredString(key, "MQQueueManager");

        // Log directory is created on demand
        logDirectory = readRequiredString(key, "LogDirectory");
        if (!Directory.Exists(logDirectory))
        {
            Directory.CreateDirectory(logDirectory);
        }

        // Comma-separated list of transaction abbreviations; surrounding whitespace is ignored
        string[] transactions = readRequiredString(key, "FilterTransactions").Split(',');
        for (int i = 0; i < transactions.Length; i++)
        {
            transactions[i] = transactions[i].Trim();
        }
        filterTranscations = transactions;
    }
}

/// <summary>
/// Reads a registry value that must be a non-empty string.
/// </summary>
/// <param name="key">Open registry key to read from.</param>
/// <param name="valueName">Name of the value below <paramref name="key"/>.</param>
/// <returns>The non-empty string stored under <paramref name="valueName"/>.</returns>
/// <exception cref="System.ArgumentException">
/// The value is missing, empty, or stored as a non-string registry type.
/// </exception>
private static string readRequiredString(RegistryKey key, string valueName)
{
    object raw = key.GetValue(valueName);
    string text = raw as string;

    // A value of another registry type (e.g. DWORD) would make a hard cast throw
    // InvalidCastException; report it as a configuration error instead.
    if (raw != null && text == null)
    {
        throw new System.ArgumentException(valueName + " must be a string value");
    }

    if (string.IsNullOrEmpty(text))
    {
        throw new System.ArgumentException(valueName + " cannot be null or empty");
    }

    return text;
}
84
85
/// <summary>
/// Builds the property table passed to the MQQueueManager constructor:
/// client transport plus the configured host and channel.
/// </summary>
/// <returns>A new Hashtable holding the three connection properties.</returns>
private Hashtable getConnectionProperties()
{
    return new Hashtable
    {
        { MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT },
        { MQC.HOST_NAME_PROPERTY, mqHost },
        { MQC.CHANNEL_PROPERTY, mqChannel }
    };
}
94
95
/// <summary>
/// Moves messages from the DAO.INDBAKKE queue to the output queues.
/// Every valid string message is put on DAO.ALL and then on either DAO.STORE
/// (when saveForLater() says so) or DAO.SAMLET. Messages that start with "?" or
/// fail header validation are appended to the filter log file and not forwarded.
/// The run ends when the input queue reports no more messages, when an
/// MQException occurs inside the loop, or after 10000 forwarded messages.
/// </summary>
/// <remarks>
/// MQExceptions raised while connecting or opening the queues, and non-MQ
/// exceptions raised inside the loop, propagate to the caller after cleanup.
/// </remarks>
public void transportAllMessages()
{
    int messageCount = 0;

    MQQueueManager mqMgr = null;
    MQQueue queueIndbakke = null;
    MQQueue queueMysql = null;
    MQQueue queueDimaps = null;
    MQQueue queueStore = null;
    try
    {
        // MQ options
        Hashtable connProps = getConnectionProperties();
        int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
        int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;

        // MQ objects in v6 do not implement IDisposable and therefore cannot be
        // used with a "using" statement; they are released in the finally block.
        mqMgr = new MQQueueManager(mqQueueManager, connProps); // stage 1: connect to MQ

        queueIndbakke = mqMgr.AccessQueue(queueNameIndbakke, openInputOptions);

        queueMysql = mqMgr.AccessQueue(queueNameMysql, openOutputOptions);
        queueDimaps = mqMgr.AccessQueue(queueNameDimaps, openOutputOptions);
        queueStore = mqMgr.AccessQueue(queueNameStore, openOutputOptions);

        bool isContinue = true;
        while (isContinue)
        {
            MQMessage mqMsg = new MQMessage();
            MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();

            try
            {
                // NOTE(review): no syncpoint options are set on the get or the puts, so
                // presumably a message whose Put fails below is not rolled back onto the
                // input queue - confirm whether that loss is acceptable.
                queueIndbakke.Get(mqMsg, mqGetMsgOpts);
                if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
                {
                    string saltiiString = mqMsg.ReadString(mqMsg.MessageLength);

                    // A transaction that starts with "?" is not valid;
                    // the header must validate as well.
                    if (saltiiString.StartsWith("?") || Salt2Helper.validateSalt2Header(saltiiString) == false)
                    {
                        logDiscardedTransaction(saltiiString);
                        continue; // move on to the next transaction on the queue
                    }

                    MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
                                                                         // same as MQPMO_DEFAULT

                    MQMessage msg = new MQMessage();
                    msg.Format = MQC.MQFMT_STRING;
                    msg.CharacterSet = 1252;
                    msg.WriteString(saltiiString);

                    Salt2Header header = Salt2Helper.parseHeader(saltiiString);
                    queueMysql.Put(msg, pmo);
                    if (saveForLater(header))
                    {
                        queueStore.Put(msg, pmo);
                    }
                    else
                    {
                        queueDimaps.Put(msg, pmo);
                    }

                    messageCount++; // per-run counter of forwarded messages
                    if (messageCount >= 10000) // moved 10000 messages in this run - stop here
                    {
                        isContinue = false;
                    }
                }
                else
                {
                    System.Console.WriteLine("Non-text message");
                }
            }
            catch (MQException mqe)
            {
                // Also reached when one of the Put calls above throws, not only the Get.
                isContinue = false;

                // report reason, if any
                if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                {
                    // special report for normal end
                    System.Console.WriteLine("no more messages");
                }
                else
                {
                    // general report for other reasons
                    System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);
                }
            }
        }
    }
    finally
    {
        closeQueue(queueIndbakke);
        closeQueue(queueMysql);
        closeQueue(queueDimaps);
        closeQueue(queueStore);

        disconnectQueueManager(mqMgr);
    }
}

/// <summary>
/// Appends a rejected transaction, prefixed with the current timestamp, to the
/// filter transaction log in the configured log directory.
/// </summary>
/// <param name="saltiiString">Raw message text that was rejected.</param>
private void logDiscardedTransaction(string saltiiString)
{
    string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, logDirectory, "filter");
    using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
    {
        discardedlog.WriteLine(Logfile.getNowString() + " " + saltiiString);
    }
}

/// <summary>
/// Ends the connection to the queue manager, logging instead of throwing on failure
/// so that cleanup never hides an exception from the transfer itself.
/// </summary>
/// <param name="mqMgr">Queue manager to disconnect; may be null.</param>
private static void disconnectQueueManager(MQQueueManager mqMgr)
{
    if (mqMgr == null || !mqMgr.IsConnected)
    {
        return;
    }

    try
    {
        // Disconnect() explicitly ends the client connection rather than relying on Close().
        mqMgr.Disconnect();
    }
    catch (Exception e)
    {
        Console.WriteLine("Error cleaning up qmgr " + e.Message);
    }
}
233
/// <summary>
/// Decides whether a transaction should be held back instead of sent straight through.
/// </summary>
/// <param name="header">Parsed header of the transaction being routed.</param>
/// <returns>
/// true when the local hour is 14, 15, 16 or 17 and the header's transaction
/// abbreviation is one of the configured filter transactions; otherwise false.
/// </returns>
private Boolean saveForLater(Salt2Header header)
{
    int currentHour = DateTime.Now.Hour;
    bool insideHoldWindow = currentHour >= 14 && currentHour < 18;

    // Outside the window everything goes straight through; the header is only
    // inspected inside it. As long as we must stay .NET 3.0 compatible, LINQ is
    // problematic, hence the hand-written contains().
    return insideHoldWindow && contains(header.transaktionForkortelse, this.filterTranscations);
}
256
/// <summary>
/// Tells whether <paramref name="needle"/> occurs in <paramref name="haystack"/>,
/// using a case-sensitive ordinal comparison.
/// </summary>
/// <param name="needle">String to look for; null never matches.</param>
/// <param name="haystack">Candidates to search; null is treated as empty.</param>
/// <returns>true if an element equal to <paramref name="needle"/> exists; otherwise false.</returns>
private static bool contains(string needle, string[] haystack)
{
    // A missing needle or list means "not found" rather than a NullReferenceException.
    if (needle == null || haystack == null)
    {
        return false;
    }

    foreach (string hay in haystack)
    {
        if (string.Equals(needle, hay, StringComparison.Ordinal))
        {
            return true;
        }
    }

    return false;
}
269
/// <summary>
/// Closes a queue if it is open. A failure while closing is written to the
/// console and not rethrown.
/// </summary>
/// <param name="queue">Queue to close; null is ignored.</param>
private void closeQueue(MQQueue queue)
{
    // Nothing to do for a queue that was never opened or is already closed.
    if (queue == null || !queue.IsOpen)
    {
        return;
    }

    try
    {
        queue.Close();
    }
    catch (Exception closeError)
    {
        Console.WriteLine("Error cleaning up queue " + closeError.Message);
    }
}
285
286
287
/* Singleton */
private static FilterController instance = null;

/// <summary>
/// Returns the shared controller, constructing it on the first call.
/// If construction throws, nothing is stored and the next call tries again.
/// </summary>
/// <returns>The single FilterController instance.</returns>
public static FilterController getInstance()
{
    // NOTE(review): there is no locking here, so concurrent first calls could each
    // construct an instance - confirm callers are single-threaded.
    return instance ?? (instance = new FilterController());
}
297
298 }
299 }

  ViewVC Help
Powered by ViewVC 1.1.20