/[projects]/dao/DaoMqPump2/MQFilter/FilterController.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/MQFilter/FilterController.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2175 - (hide annotations) (download)
Mon May 19 11:59:45 2014 UTC (10 years ago) by torben
File size: 9903 byte(s)
Better handling of openQueue exceptions
1 torben 2163 using System;
2 torben 2168 using System.Collections;
3 torben 2163 using System.Collections.Generic;
4 torben 2167 using System.IO;
5     using System.Text.RegularExpressions;
6 torben 2163
7 torben 2168 using IBM.WMQ;
8    
9 torben 2167 using Microsoft.Win32;
10    
11 torben 2168 using DaoCommon;
12    
13    
14 torben 2163 namespace MQFilter
15     {
16     class FilterController
17     {
18    
19 torben 2167 public string mqHost { get; private set; }
20     public string mqChannel { get; private set; }
21     public string mqQueueManager { get; private set; }
22    
23     public string logDirectory { get; private set; }
24    
25     public string[] filterTranscations { get; private set; }
26    
27    
28 torben 2168 public static readonly string queueNameIndbakke = "DAO.INDBAKKE";
29     public static readonly string queueNameDimaps = "DAO.SAMLET";
30     public static readonly string queueNameMysql = "DAO.ALL";
31     public static readonly string queueNameStore = "DAO.STORE";
32    
33    
34 torben 2167 protected FilterController()
35     {
36     initialize();
37     }
38    
39     private void initialize()
40     {
41     Console.WriteLine("FilterController: Loading config");
42     RegistryKey key = Registry.LocalMachine.CreateSubKey("Software\\DAO\\MQFilter");
43    
44    
45     //Læser globale MQ Parametre
46     mqHost = (string)key.GetValue("MQHost");
47     if (mqHost == null || mqHost.Length == 0)
48 torben 2175 {
49     key.SetValue("MQHost", "", RegistryValueKind.String);
50 torben 2167 throw new System.ArgumentException("MQHost cannot be null or empty");
51 torben 2175 }
52 torben 2167
53     mqChannel = (string)key.GetValue("MQChannel");
54     if (mqChannel == null || mqChannel.Length == 0)
55 torben 2175 {
56     key.SetValue("MQChannel", "", RegistryValueKind.String);
57 torben 2167 throw new System.ArgumentException("MQChannel cannot be null or empty");
58 torben 2175 }
59 torben 2167
60     mqQueueManager = (string)key.GetValue("MQQueueManager");
61     if (mqQueueManager == null || mqQueueManager.Length == 0)
62 torben 2175 {
63     key.SetValue("MQQueueManager", "", RegistryValueKind.String);
64 torben 2167 throw new System.ArgumentException("MQQueueManager cannot be null or empty");
65 torben 2175 }
66 torben 2167
67 torben 2168 ////////////
68 torben 2167
69     logDirectory = (string)key.GetValue("LogDirectory");
70     if (logDirectory == null || logDirectory.Length == 0)
71 torben 2175 {
72     key.SetValue("LogDirectory", "", RegistryValueKind.String);
73 torben 2167 throw new System.ArgumentException("LogDirectory cannot be null or empty");
74 torben 2175 }
75 torben 2167
76 torben 2168 if (Directory.Exists(logDirectory) == false)
77     {
78     Directory.CreateDirectory(logDirectory);
79     }
80    
81     ////////////
82    
83 torben 2167 String tmpFilterTransactions = (string)key.GetValue("FilterTransactions");
84     if (tmpFilterTransactions == null || tmpFilterTransactions.Length == 0)
85 torben 2175 {
86     key.SetValue("FilterTransactions", "", RegistryValueKind.String);
87 torben 2168 throw new System.ArgumentException("FilterTransactions cannot be null or empty");
88 torben 2175 }
89 torben 2174 tmpFilterTransactions = tmpFilterTransactions.Replace(';', ',');
90 torben 2168 filterTranscations = Regex.Split(tmpFilterTransactions, ",");
91 torben 2167
92     for (int i = 0; i < filterTranscations.Length; i++)
93     {
94 torben 2174 filterTranscations[i] = filterTranscations[i].Trim().ToUpper();
95 torben 2167 }
96    
97 torben 2168 ////////////
98 torben 2167
99 torben 2168 }
100 torben 2167
101 torben 2168
102 torben 2167
103    
104 torben 2172
105 torben 2163 public void transportAllMessages()
106     {
107 torben 2168 int messageCount = 0;
108    
109     MQQueueManager mqMgr = null;
110     MQQueue queueIndbakke = null;
111     MQQueue queueMysql = null;
112     MQQueue queueDimaps = null;
113     MQQueue queueStore = null;
114     try
115     {
116     //MQ options
117 torben 2172 Hashtable connProps = MQHelper.getConnectionProperties(mqHost, mqChannel);
118 torben 2168 int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
119     int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
120    
121    
122     //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
123     mqMgr = new MQQueueManager(mqQueueManager, connProps);//stage 1 connect to mq
124    
125    
126 torben 2175 queueIndbakke = MQHelper.openQueueHelper(queueNameIndbakke, mqMgr, openInputOptions);
127 torben 2168
128 torben 2175 queueMysql = MQHelper.openQueueHelper(queueNameMysql, mqMgr, openOutputOptions);
129     queueDimaps = MQHelper.openQueueHelper(queueNameDimaps, mqMgr, openOutputOptions);
130     queueStore = MQHelper.openQueueHelper(queueNameStore, mqMgr, openOutputOptions);
131 torben 2168
132 torben 2175
133 torben 2168 bool isContinue = true;
134     while (isContinue)
135     {
136    
137     MQMessage mqMsg = new MQMessage();
138     MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
139    
140    
141     try
142     {
143     queueIndbakke.Get(mqMsg, mqGetMsgOpts);
144     if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
145     {
146 torben 2172 string salt2String = mqMsg.ReadString(mqMsg.MessageLength);
147 torben 2168 //System.Console.WriteLine(msgString);
148    
149    
150 torben 2172 // validér at headeren er gyldig
151     if ( Salt2Helper.validateSalt2Header(salt2String) == false)
152 torben 2168 {
153     string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, logDirectory, "filter");
154     using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
155     {
156 torben 2172 discardedlog.WriteLine(Logfile.getNowString() + " " + salt2String);
157 torben 2168 }
158     continue; //gå frem til at tage næste transaktion fra køen
159     }
160    
161 torben 2171 MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
162     // same as MQPMO_DEFAULT
163 torben 2168
164 torben 2171 MQMessage msg = new MQMessage();
165     msg.Format = MQC.MQFMT_STRING;
166     msg.CharacterSet = 1252;
167 torben 2172 msg.WriteString(salt2String);
168 torben 2168
169 torben 2172 Salt2Header header = Salt2Helper.parseHeader(salt2String);
170 torben 2171 queueMysql.Put(msg, pmo);
171     if (saveForLater(header))
172     {
173     queueStore.Put(msg, pmo);
174     }
175     else
176     {
177     queueDimaps.Put(msg, pmo);
178     }
179 torben 2168
180    
181    
182     messageCount++;// increment per run message counter
183     if (messageCount >= 10000) // if we have moved 10000 messages in this run - let it go
184     {
185     isContinue = false;
186     }
187    
188    
189    
190    
191     }
192     else
193     {
194     System.Console.WriteLine("Non-text message");
195     }
196     }
197     catch (MQException mqe)
198     {
199     isContinue = false;
200    
201     // report reason, if any
202     if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
203     {
204     // special report for normal end
205     System.Console.WriteLine("no more messages");
206     }
207     else
208     {
209     // general report for other reasons
210     System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
211    
212     }
213    
214     }
215    
216     }
217    
218    
219    
220     }
221     finally
222     {
223 torben 2172 MQHelper.closeQueueSafe(queueIndbakke);
224     MQHelper.closeQueueSafe(queueMysql);
225     MQHelper.closeQueueSafe(queueDimaps);
226     MQHelper.closeQueueSafe(queueStore);
227 torben 2168
228 torben 2172 MQHelper.closeQueueManagerSafe(mqMgr);
229 torben 2168 }
230 torben 2163 }
231    
232 torben 2171 private Boolean saveForLater(Salt2Header header)
233     {
234 torben 2174
235 torben 2171 DateTime now = DateTime.Now;
236     int hour = now.Hour;
237     if (hour >= 14 && hour < 18)
238     {
239    
240 torben 2173 if (contains(header.transaktionForkortelse, this.filterTranscations) ) //Så længe vi skal være .net3.0 kompatible er LINQ problematisk (LINQ kræver 3.5)
241 torben 2171 {
242     return true;
243     }
244     else
245     {
246     return false;
247     }
248     }
249     else //normal operation - send straight trough
250     {
251     return false;
252     }
253    
254     }
255    
256     private bool contains(string needle, string[] haystack) // s
257     {
258     foreach(string hay in haystack)
259     {
260     if (needle.Equals(hay))
261     {
262     return true;
263     }
264     }
265    
266     return false;
267     }
268    
269 torben 2167
270    
271 torben 2168
272 torben 2163 /* Singleton */
273     private static FilterController instance = null;
274     public static FilterController getInstance()
275     {
276     if (instance == null)
277     instance = new FilterController();
278    
279     return instance;
280     }
281    
282     }
283     }

  ViewVC Help
Powered by ViewVC 1.1.20