/[projects]/dao/DaoMqPump2/MQFilter/FilterController.cs
ViewVC logotype

Contents of /dao/DaoMqPump2/MQFilter/FilterController.cs

Parent Directory | Revision Log


Revision 2175 - (show annotations) (download)
Mon May 19 11:59:45 2014 UTC (10 years ago) by torben
File size: 9903 byte(s)
Better handling of openQueue exceptions
1 using System;
2 using System.Collections;
3 using System.Collections.Generic;
4 using System.IO;
5 using System.Text.RegularExpressions;
6
7 using IBM.WMQ;
8
9 using Microsoft.Win32;
10
11 using DaoCommon;
12
13
14 namespace MQFilter
15 {
/// <summary>
/// Reads its settings from the registry key HKLM\Software\DAO\MQFilter and moves
/// messages from the inbox queue (<see cref="queueNameIndbakke"/>) to
/// <see cref="queueNameMysql"/> and then to either <see cref="queueNameStore"/> or
/// <see cref="queueNameDimaps"/>.
/// Instances are obtained through <see cref="getInstance"/>.
/// </summary>
class FilterController
{
    /// <summary>Maximum number of messages forwarded by one call to <see cref="transportAllMessages"/>.</summary>
    private const int MaxMessagesPerRun = 10000;

    public string mqHost { get; private set; }
    public string mqChannel { get; private set; }
    public string mqQueueManager { get; private set; }

    public string logDirectory { get; private set; }

    /// <summary>Trimmed, upper-cased transaction codes read from the "FilterTransactions" registry value.</summary>
    public string[] filterTranscations { get; private set; }


    public static readonly string queueNameIndbakke = "DAO.INDBAKKE";
    public static readonly string queueNameDimaps = "DAO.SAMLET";
    public static readonly string queueNameMysql = "DAO.ALL";
    public static readonly string queueNameStore = "DAO.STORE";


    /// <summary>
    /// Creates the controller and loads its configuration.
    /// </summary>
    /// <exception cref="System.ArgumentException">A required registry value is missing or empty.</exception>
    protected FilterController()
    {
        initialize();
    }

    /// <summary>
    /// Loads all settings from HKLM\Software\DAO\MQFilter (the key is created when absent)
    /// and makes sure the log directory exists.
    /// </summary>
    /// <exception cref="System.ArgumentException">A required registry value is missing or empty.</exception>
    private void initialize()
    {
        Console.WriteLine("FilterController: Loading config");

        // RegistryKey holds a native handle; release it as soon as the values are read,
        // also when a missing setting makes us throw.
        using (RegistryKey key = Registry.LocalMachine.CreateSubKey("Software\\DAO\\MQFilter"))
        {
            // Read the global MQ parameters
            mqHost = readRequiredSetting(key, "MQHost");
            mqChannel = readRequiredSetting(key, "MQChannel");
            mqQueueManager = readRequiredSetting(key, "MQQueueManager");

            logDirectory = readRequiredSetting(key, "LogDirectory");
            if (Directory.Exists(logDirectory) == false)
            {
                Directory.CreateDirectory(logDirectory);
            }

            filterTranscations = parseTransactionList(readRequiredSetting(key, "FilterTransactions"));
        }
    }

    /// <summary>
    /// Reads a string value that must be present and non-empty.
    /// </summary>
    /// <param name="key">Open registry key to read from (and write the placeholder to).</param>
    /// <param name="name">Name of the registry value.</param>
    /// <returns>The non-empty value.</returns>
    /// <exception cref="System.ArgumentException">
    /// The value is missing or empty. Before throwing, the value is (re)written as an empty
    /// string - presumably so an administrator can find the entry and fill it in.
    /// </exception>
    private static string readRequiredSetting(RegistryKey key, string name)
    {
        string value = (string)key.GetValue(name);
        if (string.IsNullOrEmpty(value))
        {
            key.SetValue(name, "", RegistryValueKind.String);
            throw new System.ArgumentException(name + " cannot be null or empty");
        }

        return value;
    }

    /// <summary>
    /// Splits a ',' or ';' separated list into trimmed, upper-cased codes.
    /// Blank entries (e.g. from "A,,B" or a trailing separator) are dropped so that an
    /// empty string never ends up as a filter entry.
    /// </summary>
    /// <param name="raw">The raw, non-null configuration string.</param>
    /// <returns>The normalised codes; may be empty when <paramref name="raw"/> holds only separators.</returns>
    private static string[] parseTransactionList(string raw)
    {
        List<string> codes = new List<string>();
        foreach (string part in raw.Split(new char[] { ',', ';' }))
        {
            // Invariant casing: the codes are identifiers, not text in the user's language.
            string code = part.Trim().ToUpperInvariant();
            if (code.Length > 0)
            {
                codes.Add(code);
            }
        }

        return codes.ToArray();
    }


    /// <summary>
    /// Connects to the queue manager, drains the inbox queue and forwards every valid text
    /// message to <see cref="queueNameMysql"/> and then to either <see cref="queueNameStore"/>
    /// (when <see cref="saveForLater"/> says so) or <see cref="queueNameDimaps"/>.
    /// Messages with an invalid SALT2 header are appended to the transaction log file and
    /// not forwarded; non-text messages are only reported on the console.
    /// The run ends when the inbox is empty, an <c>MQException</c> occurs inside the loop,
    /// or <see cref="MaxMessagesPerRun"/> messages have been forwarded.
    /// All queues and the queue manager are closed before returning.
    /// </summary>
    public void transportAllMessages()
    {
        MQQueueManager mqMgr = null;
        MQQueue queueIndbakke = null;
        MQQueue queueMysql = null;
        MQQueue queueDimaps = null;
        MQQueue queueStore = null;
        try
        {
            // MQ options
            Hashtable connProps = MQHelper.getConnectionProperties(mqHost, mqChannel);
            int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
            int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;

            // MQ objects in v6 do not implement IDisposable and therefore cannot be used
            // with a "using" statement; they are closed in the finally block instead.
            mqMgr = new MQQueueManager(mqQueueManager, connProps); // stage 1: connect to MQ

            queueIndbakke = MQHelper.openQueueHelper(queueNameIndbakke, mqMgr, openInputOptions);

            queueMysql = MQHelper.openQueueHelper(queueNameMysql, mqMgr, openOutputOptions);
            queueDimaps = MQHelper.openQueueHelper(queueNameDimaps, mqMgr, openOutputOptions);
            queueStore = MQHelper.openQueueHelper(queueNameStore, mqMgr, openOutputOptions);

            // Only successfully forwarded messages count towards the per-run limit.
            int messageCount = 0;
            while (messageCount < MaxMessagesPerRun)
            {
                MQMessage mqMsg = new MQMessage();
                MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();

                // Remembers which MQ operation is in progress so a failure is reported
                // against the right call.
                string stage = "Get";
                try
                {
                    queueIndbakke.Get(mqMsg, mqGetMsgOpts);

                    if (!string.Equals(mqMsg.Format, MQC.MQFMT_STRING))
                    {
                        System.Console.WriteLine("Non-text message");
                        continue;
                    }

                    string salt2String = mqMsg.ReadString(mqMsg.MessageLength);

                    // Validate that the header is valid
                    if (Salt2Helper.validateSalt2Header(salt2String) == false)
                    {
                        logDiscarded(salt2String);
                        continue; // move on to the next transaction on the queue
                    }

                    MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
                                                                         // same as MQPMO_DEFAULT

                    MQMessage msg = new MQMessage();
                    msg.Format = MQC.MQFMT_STRING;
                    msg.CharacterSet = 1252;
                    msg.WriteString(salt2String);

                    Salt2Header header = Salt2Helper.parseHeader(salt2String);

                    // NOTE(review): Get and Put use default options here. If those are not
                    // under syncpoint, a failing Put loses a message that has already been
                    // removed from the inbox - confirm and consider MQGMO/MQPMO_SYNCPOINT.
                    stage = "Put";
                    queueMysql.Put(msg, pmo);
                    if (saveForLater(header))
                    {
                        queueStore.Put(msg, pmo);
                    }
                    else
                    {
                        queueDimaps.Put(msg, pmo);
                    }

                    messageCount++; // increment the per-run message counter
                }
                catch (MQException mqe)
                {
                    // report reason, if any
                    if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                    {
                        // special report for normal end
                        System.Console.WriteLine("no more messages");
                    }
                    else
                    {
                        // general report for other reasons
                        System.Console.WriteLine("MQQueue::" + stage + " ended with " + mqe.Message);
                    }

                    break; // any MQ failure ends this run
                }
            }
        }
        finally
        {
            MQHelper.closeQueueSafe(queueIndbakke);
            MQHelper.closeQueueSafe(queueMysql);
            MQHelper.closeQueueSafe(queueDimaps);
            MQHelper.closeQueueSafe(queueStore);

            MQHelper.closeQueueManagerSafe(mqMgr);
        }
    }

    /// <summary>
    /// Appends a message with an invalid header, prefixed with a timestamp, to the
    /// transaction log file in <see cref="logDirectory"/>.
    /// </summary>
    /// <param name="salt2String">The raw message text that was rejected.</param>
    private void logDiscarded(string salt2String)
    {
        string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, logDirectory, "filter");
        using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
        {
            discardedlog.WriteLine(Logfile.getNowString() + " " + salt2String);
        }
    }

    /// <summary>
    /// Decides whether a message goes to the store queue instead of straight through.
    /// </summary>
    /// <param name="header">Parsed header of the message being forwarded.</param>
    /// <returns>
    /// <c>true</c> when the local time is between 14:00 (inclusive) and 18:00 (exclusive)
    /// and the header's transaction abbreviation is one of <see cref="filterTranscations"/>;
    /// otherwise <c>false</c> (normal operation - send straight through).
    /// </returns>
    private Boolean saveForLater(Salt2Header header)
    {
        int hour = DateTime.Now.Hour;
        bool insideWindow = hour >= 14 && hour < 18;

        // As long as we must stay .NET 3.0 compatible, LINQ is problematic (LINQ requires 3.5),
        // hence the hand-written contains().
        return insideWindow && contains(header.transaktionForkortelse, this.filterTranscations);
    }

    /// <summary>
    /// Exact (ordinal, case-sensitive) membership test.
    /// </summary>
    /// <param name="needle">Value to look for; <c>null</c> never matches.</param>
    /// <param name="haystack">Values to search; <c>null</c> is treated as empty.</param>
    /// <returns><c>true</c> when <paramref name="needle"/> equals one of the entries.</returns>
    private static bool contains(string needle, string[] haystack)
    {
        if (needle == null || haystack == null)
        {
            return false;
        }

        foreach (string hay in haystack)
        {
            if (string.Equals(needle, hay))
            {
                return true;
            }
        }

        return false;
    }


    /* Singleton */
    private static FilterController instance = null;

    /// <summary>
    /// Returns the shared instance, creating it (and thereby loading the configuration) on
    /// first use. No synchronisation is performed here.
    /// </summary>
    /// <exception cref="System.ArgumentException">Thrown on first use when a required registry value is missing or empty.</exception>
    public static FilterController getInstance()
    {
        if (instance == null)
        {
            instance = new FilterController();
        }

        return instance;
    }

}
283 }

  ViewVC Help
Powered by ViewVC 1.1.20