/[projects]/dao/DaoMqPump2/MQFilter/FilterController.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/MQFilter/FilterController.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2177 - (hide annotations) (download)
Mon May 19 19:51:47 2014 UTC (10 years ago) by torben
File size: 10449 byte(s)
Improved Logging
1 torben 2163 using System;
2 torben 2168 using System.Collections;
3 torben 2163 using System.Collections.Generic;
4 torben 2167 using System.IO;
5     using System.Text.RegularExpressions;
6 torben 2163
7 torben 2168 using IBM.WMQ;
8    
9 torben 2167 using Microsoft.Win32;
10    
11 torben 2168 using DaoCommon;
12    
13    
14 torben 2163 namespace MQFilter
15     {
16     class FilterController
17     {
18    
/// <summary>MQ host, loaded from the "MQHost" registry value.</summary>
public string mqHost { get; private set; }

/// <summary>MQ channel, loaded from the "MQChannel" registry value.</summary>
public string mqChannel { get; private set; }

/// <summary>MQ queue manager name, loaded from the "MQQueueManager" registry value.</summary>
public string mqQueueManager { get; private set; }

/// <summary>Directory handed to every Logfile this class creates ("LogDirectory" registry value).</summary>
public string logDirectory { get; private set; }

/// <summary>
/// Transaction abbreviations from the "FilterTransactions" registry value,
/// trimmed and upper-cased. Compared against the header of each message
/// when deciding whether it is held back.
/// </summary>
public string[] filterTranscations { get; private set; }

// Event log ("filter", LogfileType.LogEvents); assigned in the constructor.
private Logfile logFile;

/// <summary>Queue the messages are read from.</summary>
public static readonly string queueNameIndbakke = "DAO.INDBAKKE";

/// <summary>Output queue for messages that are not held back.</summary>
public static readonly string queueNameDimaps = "DAO.SAMLET";

/// <summary>Output queue that receives every forwarded message.</summary>
public static readonly string queueNameMysql = "DAO.ALL";

/// <summary>Output queue for messages that are held back for later.</summary>
public static readonly string queueNameStore = "DAO.STORE";
34    
35    
/// <summary>
/// Builds the controller: loads the configuration first (the log directory
/// is needed for logging), then opens the event log and records the start.
/// </summary>
protected FilterController()
{
    this.initialize();

    // logDirectory is only valid once initialize() has completed.
    this.logFile = new Logfile(LogfileType.LogEvents, "filter", this.logDirectory);
    this.logFile.addSingleLogEntry("Starting service");
}
42    
/// <summary>
/// Loads the filter configuration from the registry key
/// HKEY_LOCAL_MACHINE\Software\DAO\MQFilter into the properties of this instance.
/// </summary>
/// <remarks>
/// Every setting is mandatory. When a value is missing or empty, an empty
/// string placeholder is written under the same name before the exception is
/// thrown. The log directory is created when it does not exist yet.
/// </remarks>
/// <exception cref="System.ArgumentException">
/// A required registry value is missing or empty.
/// </exception>
private void initialize()
{
    Console.WriteLine("FilterController: Loading config");

    // RegistryKey wraps a native handle. The using statement releases it on
    // every exit path, including the configuration exceptions thrown below.
    using (RegistryKey key = Registry.LocalMachine.CreateSubKey("Software\\DAO\\MQFilter"))
    {
        // Read the global MQ parameters
        mqHost = readRequiredSetting(key, "MQHost");
        mqChannel = readRequiredSetting(key, "MQChannel");
        mqQueueManager = readRequiredSetting(key, "MQQueueManager");

        ////////////

        logDirectory = readRequiredSetting(key, "LogDirectory");
        if (!Directory.Exists(logDirectory))
        {
            Directory.CreateDirectory(logDirectory);
        }

        ////////////

        // Both ';' and ',' are accepted as separators between abbreviations.
        string rawFilterTransactions = readRequiredSetting(key, "FilterTransactions");
        string[] abbreviations = rawFilterTransactions.Split(';', ',');
        for (int i = 0; i < abbreviations.Length; i++)
        {
            // Invariant upper-casing: these are identifiers, so the result
            // must not depend on the culture the process runs under.
            abbreviations[i] = abbreviations[i].Trim().ToUpperInvariant();
        }
        filterTranscations = abbreviations;

        ////////////
    }
}

/// <summary>
/// Reads one mandatory string value from <paramref name="key"/>.
/// </summary>
/// <param name="key">Open registry key holding the configuration.</param>
/// <param name="name">Name of the registry value to read.</param>
/// <returns>The non-empty value stored under <paramref name="name"/>.</returns>
/// <exception cref="System.ArgumentException">
/// The value is missing or empty; an empty placeholder has been written first.
/// </exception>
private static string readRequiredSetting(RegistryKey key, string name)
{
    string value = (string)key.GetValue(name);
    if (string.IsNullOrEmpty(value))
    {
        // Write an empty placeholder under the missing name before failing.
        key.SetValue(name, "", RegistryValueKind.String);
        throw new System.ArgumentException(name + " cannot be null or empty");
    }

    return value;
}
104 torben 2167
105 torben 2168
106 torben 2167
107    
/// <summary>
/// Runs one transport pass. Any exception escaping the worker is caught
/// here and its message is written to the event log instead of propagating.
/// </summary>
public void transportAllMessages()
{
    try
    {
        this.transportMessagesWorker();
    }
    catch (Exception failure)
    {
        string entry = "Error during transportAllMessages: " + failure.Message;
        this.logFile.addSingleLogEntry(entry);
    }
}
119    
/// <summary>
/// Moves messages from the DAO.INDBAKKE queue to the output queues.
/// </summary>
/// <remarks>
/// For each string-format message read from the input queue:
/// a message whose SALT2 header does not validate is written to the
/// "discarded" log and not forwarded; otherwise it is written to the
/// transaction log, put on the DAO.ALL queue, and then put on either
/// DAO.STORE (when <c>saveForLater</c> returns true) or DAO.SAMLET.
/// Non-string messages are not forwarded.
/// The pass ends when an MQException occurs (including "no message
/// available") or after 10000 forwarded messages. Queues and the queue
/// manager are closed through the MQHelper "Safe" helpers in all cases.
/// </remarks>
private void transportMessagesWorker()
{
    // Upper bound on forwarded messages per pass, so one run cannot go on forever.
    const int maxMessagesPerRun = 10000;

    int messageCount = 0;

    MQQueueManager mqMgr = null;
    MQQueue queueIndbakke = null;
    MQQueue queueMysql = null;
    MQQueue queueDimaps = null;
    MQQueue queueStore = null;

    using (Logfile translog = new Logfile(LogfileType.LogTransactions, "filter", logDirectory))
    {
        try
        {
            // MQ options
            Hashtable connProps = MQHelper.getConnectionProperties(mqHost, mqChannel);
            int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
            int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;

            // MQ objects in v6 do not implement IDisposable and therefore
            // cannot be used with a "using" statement.
            mqMgr = new MQQueueManager(mqQueueManager, connProps); // stage 1: connect to MQ

            queueIndbakke = MQHelper.openQueueHelper(queueNameIndbakke, mqMgr, openInputOptions);

            queueMysql = MQHelper.openQueueHelper(queueNameMysql, mqMgr, openOutputOptions);
            queueDimaps = MQHelper.openQueueHelper(queueNameDimaps, mqMgr, openOutputOptions);
            queueStore = MQHelper.openQueueHelper(queueNameStore, mqMgr, openOutputOptions);

            bool isContinue = true;
            while (isContinue)
            {
                MQMessage mqMsg = new MQMessage();
                MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();

                try
                {
                    queueIndbakke.Get(mqMsg, mqGetMsgOpts);

                    if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) != 0)
                    {
                        System.Console.WriteLine("Non-text message");
                        // The message has already been taken off the input queue
                        // and is not forwarded, so leave a trace in the event log.
                        logFile.addSingleLogEntry("Discarded non-text message with format '" + mqMsg.Format + "'");
                        continue;
                    }

                    string salt2String = mqMsg.ReadString(mqMsg.MessageLength);

                    // Validate that the header is valid
                    if (Salt2Helper.validateSalt2Header(salt2String) == false)
                    {
                        using (Logfile discardedlog = new Logfile(LogfileType.LogDiscarded, "filter", logDirectory))
                        {
                            discardedlog.writeLogEntry(salt2String);
                        }
                        continue; // move on to the next transaction on the queue
                    }

                    translog.writeLogEntry(salt2String);

                    MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
                                                                         // same as MQPMO_DEFAULT

                    MQMessage msg = new MQMessage();
                    msg.Format = MQC.MQFMT_STRING;
                    msg.CharacterSet = 1252;
                    msg.WriteString(salt2String);

                    Salt2Header header = Salt2Helper.parseHeader(salt2String);
                    queueMysql.Put(msg, pmo);
                    if (saveForLater(header))
                    {
                        queueStore.Put(msg, pmo);
                    }
                    else
                    {
                        queueDimaps.Put(msg, pmo);
                    }

                    messageCount++; // increment per-run message counter
                    if (messageCount >= maxMessagesPerRun) // moved enough messages in this run - let it go
                    {
                        isContinue = false;
                    }
                }
                catch (MQException mqe)
                {
                    isContinue = false;

                    // report reason, if any
                    if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                    {
                        // special report for normal end
                        System.Console.WriteLine("no more messages");
                    }
                    else
                    {
                        // general report for other reasons
                        System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);

                        // This catch also receives failures from the Put calls above,
                        // so the event log entry does not claim a specific operation.
                        logFile.addSingleLogEntry("MQ error while transporting messages: " + mqe.Message + " (reason " + mqe.Reason + ")");
                    }
                }
            }
        }
        finally
        {
            MQHelper.closeQueueSafe(queueIndbakke);
            MQHelper.closeQueueSafe(queueMysql);
            MQHelper.closeQueueSafe(queueDimaps);
            MQHelper.closeQueueSafe(queueStore);

            MQHelper.closeQueueManagerSafe(mqMgr);
        }
    }
}
251    
/// <summary>
/// Decides whether a message is held back instead of being sent straight on.
/// </summary>
/// <param name="header">Parsed header of the message being routed.</param>
/// <returns>
/// true when the local hour is 14, 15, 16 or 17 and the header's transaction
/// abbreviation is one of the configured filter transactions; otherwise false.
/// </returns>
private Boolean saveForLater(Salt2Header header)
{
    int currentHour = DateTime.Now.Hour;
    bool insideHoldWindow = currentHour >= 14 && currentHour < 18;

    if (!insideHoldWindow)
    {
        // normal operation - send straight through
        return false;
    }

    // As long as we must stay .NET 3.0 compatible, LINQ is problematic
    // (LINQ requires 3.5), hence the hand-written lookup helper.
    return contains(header.transaktionForkortelse, this.filterTranscations);
}
275    
/// <summary>
/// Tells whether <paramref name="needle"/> occurs in <paramref name="haystack"/>,
/// using exact (ordinal, case-sensitive) string equality.
/// </summary>
/// <param name="needle">Value to look for; null never matches.</param>
/// <param name="haystack">Values to search; null is treated as empty.</param>
/// <returns>true when an equal entry exists, otherwise false.</returns>
private bool contains(string needle, string[] haystack)
{
    // A missing value or a missing list cannot produce a match; answer
    // false instead of failing with a NullReferenceException.
    if (needle == null || haystack == null)
    {
        return false;
    }

    return Array.IndexOf(haystack, needle) >= 0;
}
288    
289 torben 2167
290    
291 torben 2168
/* Singleton: the one shared controller, created on first request. */
private static FilterController instance;

/// <summary>
/// Returns the shared FilterController, constructing it on the first call.
/// No locking is performed here.
/// </summary>
public static FilterController getInstance()
{
    return instance ?? (instance = new FilterController());
}
301    
302     }
303     }

  ViewVC Help
Powered by ViewVC 1.1.20