/[projects]/dao/DaoMqPump2/MQFilter/FilterController.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/MQFilter/FilterController.cs

Parent Directory | Revision Log


Revision 2183 - (hide annotations) (download)
Wed May 21 09:56:21 2014 UTC (10 years ago) by torben
File size: 11886 byte(s)
Use some smaller but more frequent runs
1 torben 2163 using System;
2 torben 2168 using System.Collections;
3 torben 2163 using System.Collections.Generic;
4 torben 2167 using System.IO;
5     using System.Text.RegularExpressions;
6 torben 2163
7 torben 2168 using IBM.WMQ;
8    
9 torben 2167 using Microsoft.Win32;
10    
11 torben 2168 using DaoCommon;
12    
13    
namespace MQFilter
{
    /// <summary>
    /// Reads messages from the DAO.INDBAKKE queue, copies every valid one to
    /// DAO.ALL and then forwards it to either DAO.SAMLET (normal operation) or
    /// DAO.STORE (held back during the configured silent period).
    /// Configuration is read from HKLM\Software\DAO\MQFilter.
    /// </summary>
    class FilterController
    {
        /// <summary>Maximum number of messages forwarded by a single run.</summary>
        public const int TRANSACTIONS_PER_RUN = 2000;

        public string mqHost { get; private set; }
        public string mqChannel { get; private set; }
        public string mqQueueManager { get; private set; }

        public string logDirectory { get; private set; }

        /// <summary>
        /// Transaction abbreviations (trimmed, upper-cased) that are held back
        /// during the silent period.
        /// </summary>
        public string[] filterTranscations { get; private set; }

        // Event log; created in the constructor after the configuration is loaded.
        Logfile logFile;

        public static readonly string queueNameIndbakke = "DAO.INDBAKKE";
        public static readonly string queueNameDimaps = "DAO.SAMLET";
        public static readonly string queueNameMysql = "DAO.ALL";
        public static readonly string queueNameStore = "DAO.STORE";

        // Time-of-day window in which filtered transactions are diverted to DAO.STORE.
        TimeSpan silentPeriodBegin;
        TimeSpan silentPeriodEnd;

        /// <summary>
        /// Loads the configuration and opens the event log.
        /// Use <see cref="getInstance"/> to obtain the shared instance.
        /// </summary>
        /// <exception cref="System.ArgumentException">A required registry value is missing or empty.</exception>
        protected FilterController()
        {
            initialize();
            logFile = new Logfile(LogfileType.LogEvents, "filter", logDirectory);
            logFile.addSingleLogEntry("Starting service");
        }

        /// <summary>
        /// Reads all settings from the registry and creates the log directory
        /// if it does not exist yet.
        /// </summary>
        /// <exception cref="System.ArgumentException">A required registry value is missing or empty.</exception>
        private void initialize()
        {
            Console.WriteLine("FilterController: Loading config");

            // RegistryKey wraps an OS handle; "using" makes sure it is released
            // on every path, including the validation failures below.
            using (RegistryKey key = Registry.LocalMachine.CreateSubKey("Software\\DAO\\MQFilter"))
            {
                // Read the global MQ parameters
                mqHost = readRequiredSetting(key, "MQHost");
                mqChannel = readRequiredSetting(key, "MQChannel");
                mqQueueManager = readRequiredSetting(key, "MQQueueManager");

                ////////////

                logDirectory = readRequiredSetting(key, "LogDirectory");
                if (!Directory.Exists(logDirectory))
                {
                    Directory.CreateDirectory(logDirectory);
                }

                ////////////

                silentPeriodBegin = TimeSpan.Parse(readRequiredSetting(key, "SilentPeriodBegin"));
                silentPeriodEnd = TimeSpan.Parse(readRequiredSetting(key, "SilentPeriodEnd"));

                ////////////

                filterTranscations = parseTransactionList(readRequiredSetting(key, "FilterTransactions"));
            }
        }

        /// <summary>
        /// Reads a mandatory string value from the given registry key.
        /// When the value is missing or empty, an empty placeholder value is
        /// written under that name before the exception is thrown.
        /// </summary>
        /// <param name="key">Open, writable registry key holding the settings.</param>
        /// <param name="name">Name of the registry value.</param>
        /// <returns>The non-empty value.</returns>
        /// <exception cref="System.ArgumentException">The value is missing or empty.</exception>
        private static string readRequiredSetting(RegistryKey key, string name)
        {
            string value = (string)key.GetValue(name);
            if (string.IsNullOrEmpty(value))
            {
                key.SetValue(name, "", RegistryValueKind.String);
                throw new System.ArgumentException(name + " cannot be null or empty");
            }

            return value;
        }

        /// <summary>
        /// Splits a ',' or ';' separated list of transaction abbreviations into
        /// trimmed, upper-cased entries. Empty entries (e.g. from a trailing
        /// separator) are dropped because they can never name a transaction.
        /// </summary>
        /// <param name="rawList">The raw configuration value.</param>
        /// <returns>The normalized entries; may be empty.</returns>
        private static string[] parseTransactionList(string rawList)
        {
            string[] parts = rawList.Split(',', ';');
            List<string> codes = new List<string>(parts.Length);

            foreach (string part in parts)
            {
                // Invariant casing keeps the result independent of the machine's culture.
                string code = part.Trim().ToUpperInvariant();
                if (code.Length > 0)
                {
                    codes.Add(code);
                }
            }

            return codes.ToArray();
        }

        /// <summary>
        /// Runs one transport pass. Any exception is written to the event log
        /// (message) and the console (stack trace) instead of being propagated.
        /// </summary>
        public void transportAllMessages()
        {
            try
            {
                transportMessagesWorker();
            }
            catch (Exception e)
            {
                logFile.addSingleLogEntry("Error during transportAllMessages: " + e.Message);
                Console.WriteLine(e.StackTrace);
            }
        }

        /// <summary>
        /// Connects to the queue manager and moves messages from the inbox
        /// queue until the queue reports an MQ error (normally "no message
        /// available") or <see cref="TRANSACTIONS_PER_RUN"/> messages have been
        /// forwarded. Queues and the queue manager are always closed afterwards.
        /// </summary>
        private void transportMessagesWorker()
        {
            int messageCount = 0;

            MQQueueManager mqMgr = null;
            MQQueue queueIndbakke = null;
            MQQueue queueMysql = null;
            MQQueue queueDimaps = null;
            MQQueue queueStore = null;

            using (Logfile translog = new Logfile(LogfileType.LogTransactions, "filter", logDirectory))
            {
                try
                {
                    //MQ options
                    Hashtable connProps = MQHelper.getConnectionProperties(mqHost, mqChannel);
                    int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
                    int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;

                    // MQ objects in v6 do not implement IDisposable and therefore
                    // cannot be used with a "using" statement
                    mqMgr = new MQQueueManager(mqQueueManager, connProps);//stage 1 connect to mq

                    queueIndbakke = MQHelper.openQueueHelper(queueNameIndbakke, mqMgr, openInputOptions);

                    queueMysql = MQHelper.openQueueHelper(queueNameMysql, mqMgr, openOutputOptions);
                    queueDimaps = MQHelper.openQueueHelper(queueNameDimaps, mqMgr, openOutputOptions);
                    queueStore = MQHelper.openQueueHelper(queueNameStore, mqMgr, openOutputOptions);

                    bool isContinue = true;
                    while (isContinue)
                    {
                        MQMessage mqMsg = new MQMessage();
                        MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();

                        try
                        {
                            queueIndbakke.Get(mqMsg, mqGetMsgOpts);
                            if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
                            {
                                if (mqMsg.MessageLength == 0) //Skip empty transactions (so far only seen on my test server)
                                {
                                    continue;
                                }

                                string salt2String = mqMsg.ReadString(mqMsg.MessageLength);

                                // validate that the header is valid
                                if (Salt2Helper.validateSalt2Header(salt2String) == false)
                                {
                                    using (Logfile discardedlog = new Logfile(LogfileType.LogDiscarded, "filter", logDirectory))
                                    {
                                        discardedlog.writeLogEntry(salt2String);
                                    }
                                    continue; // move on to take the next transaction from the queue
                                }

                                translog.writeLogEntry(salt2String);

                                MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
                                                                                     // same as MQPMO_DEFAULT

                                MQMessage msg = new MQMessage();
                                msg.Format = MQC.MQFMT_STRING;
                                msg.CharacterSet = 1252;
                                msg.WriteString(salt2String);

                                Salt2Header header = Salt2Helper.parseHeader(salt2String);

                                // Every valid message goes to DAO.ALL, then to exactly
                                // one of DAO.STORE / DAO.SAMLET.
                                queueMysql.Put(msg, pmo);
                                if (saveForLater(header))
                                {
                                    queueStore.Put(msg, pmo);
                                }
                                else
                                {
                                    queueDimaps.Put(msg, pmo);
                                }

                                messageCount++;// increment per run message counter
                                if (messageCount >= TRANSACTIONS_PER_RUN) // if we have moved TRANSACTIONS_PER_RUN messages in this run - let it go
                                {
                                    isContinue = false;
                                }
                            }
                            else
                            {
                                // The message has already been taken off the queue and is
                                // not forwarded; record it in the event log as well as on
                                // the console so the drop is traceable.
                                System.Console.WriteLine("Non-text message");
                                logFile.addSingleLogEntry("Dropped non-text message from " + queueNameIndbakke);
                            }
                        }
                        catch (MQException mqe)
                        {
                            // Any MQ failure (from Get or from one of the Puts) ends this run.
                            isContinue = false;

                            // report reason, if any
                            if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                            {
                                // special report for normal end
                                System.Console.WriteLine("no more messages");
                            }
                            else
                            {
                                // general report for other reasons
                                System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);

                                // Also keep a persistent record; console output alone is
                                // easily lost.
                                // NOTE(review): if the failure came from a Put, the message
                                // read above has presumably already left the inbox queue
                                // (default get options) - confirm and consider syncpoint.
                                logFile.addSingleLogEntry("MQ error while transporting messages (reason " + mqe.Reason + "): " + mqe.Message);
                            }
                        }
                    }
                }
                finally
                {
                    MQHelper.closeQueueSafe(queueIndbakke);
                    MQHelper.closeQueueSafe(queueMysql);
                    MQHelper.closeQueueSafe(queueDimaps);
                    MQHelper.closeQueueSafe(queueStore);

                    MQHelper.closeQueueManagerSafe(mqMgr);
                }
            }
        }

        /// <summary>
        /// Decides whether a message must be held back (sent to DAO.STORE).
        /// </summary>
        /// <param name="header">Parsed header of the message.</param>
        /// <returns>
        /// <c>true</c> only when the current local time is inside the silent
        /// period, the sender is not "DAO", and the transaction abbreviation is
        /// in <see cref="filterTranscations"/>; otherwise <c>false</c>.
        /// </returns>
        private Boolean saveForLater(Salt2Header header)
        {
            if (!isInSilentPeriod(DateTime.Now.TimeOfDay))
            {
                // normal operation - send straight through
                return false;
            }

            if (header.afsender == "DAO") // DAO transactions are always allowed through
            {
                return false;
            }

            // As long as we must remain .NET 3.0 compatible, LINQ is problematic (LINQ requires 3.5)
            return contains(header.transaktionForkortelse, this.filterTranscations);
        }

        /// <summary>
        /// Tests whether a time of day lies in [silentPeriodBegin, silentPeriodEnd).
        /// A period whose begin is later than its end is treated as crossing
        /// midnight (e.g. 22:00 - 06:00).
        /// </summary>
        /// <param name="now">Time of day to test.</param>
        private bool isInSilentPeriod(TimeSpan now)
        {
            if (silentPeriodBegin <= silentPeriodEnd)
            {
                return now >= silentPeriodBegin && now < silentPeriodEnd;
            }

            // Window wraps past midnight: inside if after begin OR before end.
            return now >= silentPeriodBegin || now < silentPeriodEnd;
        }

        /// <summary>
        /// Case-sensitive membership test.
        /// </summary>
        /// <param name="needle">Value to look for; <c>null</c> is never found.</param>
        /// <param name="haystack">Values to search; <c>null</c> is treated as empty.</param>
        /// <returns><c>true</c> if an element equal to <paramref name="needle"/> exists.</returns>
        private bool contains(string needle, string[] haystack)
        {
            if (needle == null || haystack == null)
            {
                return false;
            }

            foreach (string hay in haystack)
            {
                if (needle.Equals(hay))
                {
                    return true;
                }
            }

            return false;
        }

        /* Singleton */
        private static FilterController instance = null;

        /// <summary>
        /// Returns the shared instance, creating it on first use.
        /// The creation is not synchronized.
        /// </summary>
        public static FilterController getInstance()
        {
            if (instance == null)
            {
                instance = new FilterController();
            }

            return instance;
        }
    }
}

  ViewVC Help
Powered by ViewVC 1.1.20