/[projects]/dao/DaoMqPump2/MQFilter/FilterController.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/MQFilter/FilterController.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2182 - (hide annotations) (download)
Tue May 20 20:48:48 2014 UTC (10 years ago) by torben
File size: 11799 byte(s)
DAO Transactions go straight through
1 torben 2163 using System;
2 torben 2168 using System.Collections;
3 torben 2163 using System.Collections.Generic;
4 torben 2167 using System.IO;
5     using System.Text.RegularExpressions;
6 torben 2163
7 torben 2168 using IBM.WMQ;
8    
9 torben 2167 using Microsoft.Win32;
10    
11 torben 2168 using DaoCommon;
12    
13    
14 torben 2163 namespace MQFilter
15     {
16     class FilterController
17     {
18    
19 torben 2167 public string mqHost { get; private set; }
20     public string mqChannel { get; private set; }
21     public string mqQueueManager { get; private set; }
22    
23     public string logDirectory { get; private set; }
24    
25     public string[] filterTranscations { get; private set; }
26    
27 torben 2177 Logfile logFile;
28 torben 2167
29 torben 2177
30 torben 2168 public static readonly string queueNameIndbakke = "DAO.INDBAKKE";
31     public static readonly string queueNameDimaps = "DAO.SAMLET";
32     public static readonly string queueNameMysql = "DAO.ALL";
33     public static readonly string queueNameStore = "DAO.STORE";
34    
35 torben 2180 TimeSpan silentPeriodBegin;
36     TimeSpan silentPeriodEnd;
37 torben 2168
38 torben 2167 protected FilterController()
39     {
40     initialize();
41 torben 2177 logFile = new Logfile(LogfileType.LogEvents, "filter", logDirectory);
42     logFile.addSingleLogEntry("Starting service");
43 torben 2167 }
44    
45     private void initialize()
46     {
47     Console.WriteLine("FilterController: Loading config");
48     RegistryKey key = Registry.LocalMachine.CreateSubKey("Software\\DAO\\MQFilter");
49    
50    
51     //Læser globale MQ Parametre
52     mqHost = (string)key.GetValue("MQHost");
53     if (mqHost == null || mqHost.Length == 0)
54 torben 2175 {
55     key.SetValue("MQHost", "", RegistryValueKind.String);
56 torben 2167 throw new System.ArgumentException("MQHost cannot be null or empty");
57 torben 2175 }
58 torben 2167
59     mqChannel = (string)key.GetValue("MQChannel");
60     if (mqChannel == null || mqChannel.Length == 0)
61 torben 2175 {
62     key.SetValue("MQChannel", "", RegistryValueKind.String);
63 torben 2167 throw new System.ArgumentException("MQChannel cannot be null or empty");
64 torben 2175 }
65 torben 2167
66     mqQueueManager = (string)key.GetValue("MQQueueManager");
67     if (mqQueueManager == null || mqQueueManager.Length == 0)
68 torben 2175 {
69     key.SetValue("MQQueueManager", "", RegistryValueKind.String);
70 torben 2167 throw new System.ArgumentException("MQQueueManager cannot be null or empty");
71 torben 2175 }
72 torben 2167
73 torben 2168 ////////////
74 torben 2167
75     logDirectory = (string)key.GetValue("LogDirectory");
76     if (logDirectory == null || logDirectory.Length == 0)
77 torben 2175 {
78     key.SetValue("LogDirectory", "", RegistryValueKind.String);
79 torben 2167 throw new System.ArgumentException("LogDirectory cannot be null or empty");
80 torben 2175 }
81 torben 2167
82 torben 2168 if (Directory.Exists(logDirectory) == false)
83     {
84     Directory.CreateDirectory(logDirectory);
85     }
86    
87     ////////////
88    
89 torben 2180 string silentBeginStr = (string)key.GetValue("SilentPeriodBegin");
90     if (silentBeginStr == null || silentBeginStr.Length == 0)
91     {
92     key.SetValue("SilentPeriodBegin", "", RegistryValueKind.String);
93     throw new System.ArgumentException("SilentPeriodBegin cannot be null or empty");
94     }
95     silentPeriodBegin = TimeSpan.Parse(silentBeginStr);
96    
97     string silentEndStr = (string)key.GetValue("SilentPeriodEnd");
98     if (silentEndStr == null || silentEndStr.Length == 0)
99     {
100     key.SetValue("SilentPeriodEnd", "", RegistryValueKind.String);
101     throw new System.ArgumentException("SilentPeriodEnd cannot be null or empty");
102     }
103     silentPeriodEnd = TimeSpan.Parse(silentEndStr);
104    
105     ////////////
106    
107 torben 2167 String tmpFilterTransactions = (string)key.GetValue("FilterTransactions");
108     if (tmpFilterTransactions == null || tmpFilterTransactions.Length == 0)
109 torben 2175 {
110     key.SetValue("FilterTransactions", "", RegistryValueKind.String);
111 torben 2168 throw new System.ArgumentException("FilterTransactions cannot be null or empty");
112 torben 2175 }
113 torben 2174 tmpFilterTransactions = tmpFilterTransactions.Replace(';', ',');
114 torben 2168 filterTranscations = Regex.Split(tmpFilterTransactions, ",");
115 torben 2167
116     for (int i = 0; i < filterTranscations.Length; i++)
117     {
118 torben 2174 filterTranscations[i] = filterTranscations[i].Trim().ToUpper();
119 torben 2167 }
120    
121 torben 2168 ////////////
122 torben 2167
123 torben 2168 }
124 torben 2167
125 torben 2168
126 torben 2167
127    
128 torben 2163 public void transportAllMessages()
129     {
130 torben 2177 try
131     {
132     transportMessagesWorker();
133     }
134     catch (Exception e)
135     {
136     logFile.addSingleLogEntry("Error during transportAllMessages: " + e.Message);
137 torben 2181 Console.WriteLine(e.StackTrace);
138 torben 2177 }
139     }
140    
141     private void transportMessagesWorker()
142     {
143 torben 2168 int messageCount = 0;
144    
145     MQQueueManager mqMgr = null;
146     MQQueue queueIndbakke = null;
147     MQQueue queueMysql = null;
148     MQQueue queueDimaps = null;
149     MQQueue queueStore = null;
150 torben 2177
151     using (Logfile translog = new Logfile(LogfileType.LogTransactions, "filter", logDirectory))
152 torben 2168 try
153     {
154     //MQ options
155 torben 2172 Hashtable connProps = MQHelper.getConnectionProperties(mqHost, mqChannel);
156 torben 2168 int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
157     int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
158    
159    
160     //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
161     mqMgr = new MQQueueManager(mqQueueManager, connProps);//stage 1 connect to mq
162    
163    
164 torben 2175 queueIndbakke = MQHelper.openQueueHelper(queueNameIndbakke, mqMgr, openInputOptions);
165 torben 2168
166 torben 2175 queueMysql = MQHelper.openQueueHelper(queueNameMysql, mqMgr, openOutputOptions);
167     queueDimaps = MQHelper.openQueueHelper(queueNameDimaps, mqMgr, openOutputOptions);
168     queueStore = MQHelper.openQueueHelper(queueNameStore, mqMgr, openOutputOptions);
169 torben 2168
170 torben 2175
171 torben 2168 bool isContinue = true;
172     while (isContinue)
173     {
174    
175     MQMessage mqMsg = new MQMessage();
176     MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
177    
178    
179     try
180     {
181     queueIndbakke.Get(mqMsg, mqGetMsgOpts);
182     if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
183     {
184 torben 2181 if (mqMsg.MessageLength == 0) //Skip empty transactions (so far only seen on my test server)
185     continue;
186    
187     //System.Console.WriteLine("LEN>" + mqMsg.MessageLength);
188 torben 2172 string salt2String = mqMsg.ReadString(mqMsg.MessageLength);
189 torben 2181
190 torben 2168
191    
192 torben 2172 // validér at headeren er gyldig
193     if ( Salt2Helper.validateSalt2Header(salt2String) == false)
194 torben 2168 {
195 torben 2177
196    
197     using (Logfile discardedlog = new Logfile(LogfileType.LogDiscarded, "filter", logDirectory))
198 torben 2168 {
199 torben 2177 discardedlog.writeLogEntry(salt2String);
200 torben 2168 }
201     continue; //gå frem til at tage næste transaktion fra køen
202     }
203    
204 torben 2177 translog.writeLogEntry(salt2String);
205    
206 torben 2171 MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
207     // same as MQPMO_DEFAULT
208 torben 2168
209 torben 2171 MQMessage msg = new MQMessage();
210     msg.Format = MQC.MQFMT_STRING;
211     msg.CharacterSet = 1252;
212 torben 2172 msg.WriteString(salt2String);
213 torben 2168
214 torben 2172 Salt2Header header = Salt2Helper.parseHeader(salt2String);
215 torben 2171 queueMysql.Put(msg, pmo);
216     if (saveForLater(header))
217     {
218     queueStore.Put(msg, pmo);
219     }
220     else
221     {
222     queueDimaps.Put(msg, pmo);
223     }
224 torben 2168
225 torben 2177
226 torben 2168
227     messageCount++;// increment per run message counter
228     if (messageCount >= 10000) // if we have moved 10000 messages in this run - let it go
229     {
230     isContinue = false;
231     }
232    
233    
234    
235    
236     }
237     else
238     {
239     System.Console.WriteLine("Non-text message");
240     }
241     }
242     catch (MQException mqe)
243     {
244     isContinue = false;
245    
246     // report reason, if any
247     if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
248     {
249     // special report for normal end
250     System.Console.WriteLine("no more messages");
251     }
252     else
253     {
254     // general report for other reasons
255     System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
256    
257     }
258    
259     }
260    
261     }
262    
263    
264    
265     }
266     finally
267     {
268 torben 2172 MQHelper.closeQueueSafe(queueIndbakke);
269     MQHelper.closeQueueSafe(queueMysql);
270     MQHelper.closeQueueSafe(queueDimaps);
271     MQHelper.closeQueueSafe(queueStore);
272 torben 2168
273 torben 2172 MQHelper.closeQueueManagerSafe(mqMgr);
274 torben 2168 }
275 torben 2163 }
276    
277 torben 2171 private Boolean saveForLater(Salt2Header header)
278     {
279 torben 2174
280 torben 2180 TimeSpan now = DateTime.Now.TimeOfDay;
281    
282     if (now >= silentPeriodBegin && now < silentPeriodEnd)
283 torben 2171 {
284 torben 2182
285     if (header.afsender == "DAO") //DAO transaktioner må altid komme igennem
286     return false;
287 torben 2171
288 torben 2173 if (contains(header.transaktionForkortelse, this.filterTranscations) ) //Så længe vi skal være .net3.0 kompatible er LINQ problematisk (LINQ kræver 3.5)
289 torben 2171 {
290     return true;
291     }
292     else
293     {
294     return false;
295     }
296     }
297     else //normal operation - send straight trough
298     {
299     return false;
300     }
301    
302     }
303    
304     private bool contains(string needle, string[] haystack) // s
305     {
306     foreach(string hay in haystack)
307     {
308     if (needle.Equals(hay))
309     {
310     return true;
311     }
312     }
313    
314     return false;
315     }
316    
317 torben 2167
318    
319 torben 2168
320 torben 2163 /* Singleton */
321     private static FilterController instance = null;
322     public static FilterController getInstance()
323     {
324     if (instance == null)
325     instance = new FilterController();
326    
327     return instance;
328     }
329    
330     }
331     }

  ViewVC Help
Powered by ViewVC 1.1.20