/[projects]/dao/DaoMqPump2/MQFilter/FilterController.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/MQFilter/FilterController.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2171 - (hide annotations) (download)
Fri May 16 21:50:17 2014 UTC (10 years ago) by torben
File size: 10373 byte(s)
Theoretically this should work
1 torben 2163 using System;
2 torben 2168 using System.Collections;
3 torben 2163 using System.Collections.Generic;
4 torben 2167 using System.IO;
5     using System.Text.RegularExpressions;
6 torben 2163
7 torben 2168 using IBM.WMQ;
8    
9 torben 2167 using Microsoft.Win32;
10    
11 torben 2168 using DaoCommon;
12    
13    
14 torben 2163 namespace MQFilter
15     {
16     class FilterController
17     {
18    
19 torben 2167 public string mqHost { get; private set; }
20     public string mqChannel { get; private set; }
21     public string mqQueueManager { get; private set; }
22    
23     public string logDirectory { get; private set; }
24    
25     public string[] filterTranscations { get; private set; }
26    
27    
28 torben 2168 public static readonly string queueNameIndbakke = "DAO.INDBAKKE";
29     public static readonly string queueNameDimaps = "DAO.SAMLET";
30     public static readonly string queueNameMysql = "DAO.ALL";
31     public static readonly string queueNameStore = "DAO.STORE";
32    
33    
34 torben 2167 protected FilterController()
35     {
36     initialize();
37     }
38    
39     private void initialize()
40     {
41     Console.WriteLine("FilterController: Loading config");
42     RegistryKey key = Registry.LocalMachine.CreateSubKey("Software\\DAO\\MQFilter");
43    
44    
45     //Læser globale MQ Parametre
46     mqHost = (string)key.GetValue("MQHost");
47     if (mqHost == null || mqHost.Length == 0)
48     throw new System.ArgumentException("MQHost cannot be null or empty");
49    
50     mqChannel = (string)key.GetValue("MQChannel");
51     if (mqChannel == null || mqChannel.Length == 0)
52     throw new System.ArgumentException("MQChannel cannot be null or empty");
53    
54     mqQueueManager = (string)key.GetValue("MQQueueManager");
55     if (mqQueueManager == null || mqQueueManager.Length == 0)
56     throw new System.ArgumentException("MQQueueManager cannot be null or empty");
57    
58 torben 2168 ////////////
59 torben 2167
60     logDirectory = (string)key.GetValue("LogDirectory");
61     if (logDirectory == null || logDirectory.Length == 0)
62     throw new System.ArgumentException("LogDirectory cannot be null or empty");
63    
64 torben 2168 if (Directory.Exists(logDirectory) == false)
65     {
66     Directory.CreateDirectory(logDirectory);
67     }
68    
69     ////////////
70    
71 torben 2167 String tmpFilterTransactions = (string)key.GetValue("FilterTransactions");
72     if (tmpFilterTransactions == null || tmpFilterTransactions.Length == 0)
73 torben 2168 throw new System.ArgumentException("FilterTransactions cannot be null or empty");
74     filterTranscations = Regex.Split(tmpFilterTransactions, ",");
75 torben 2167
76     for (int i = 0; i < filterTranscations.Length; i++)
77     {
78     filterTranscations[i] = filterTranscations[i].Trim();
79     }
80    
81 torben 2168 ////////////
82 torben 2167
83 torben 2168 }
84 torben 2167
85 torben 2168
86     private Hashtable getConnectionProperties()
87     {
88     Hashtable connProperties = new Hashtable();
89     connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
90     connProperties.Add(MQC.HOST_NAME_PROPERTY, mqHost);
91     connProperties.Add(MQC.CHANNEL_PROPERTY, mqChannel);
92     return connProperties;
93 torben 2167 }
94    
95    
96 torben 2163 public void transportAllMessages()
97     {
98 torben 2168 int messageCount = 0;
99    
100     MQQueueManager mqMgr = null;
101     MQQueue queueIndbakke = null;
102     MQQueue queueMysql = null;
103     MQQueue queueDimaps = null;
104     MQQueue queueStore = null;
105     try
106     {
107     //MQ options
108     Hashtable connProps = getConnectionProperties();
109     int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
110     int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
111    
112    
113     //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
114     mqMgr = new MQQueueManager(mqQueueManager, connProps);//stage 1 connect to mq
115    
116     queueIndbakke = mqMgr.AccessQueue(queueNameIndbakke, openInputOptions);
117    
118     queueMysql = mqMgr.AccessQueue(queueNameMysql, openOutputOptions);
119     queueDimaps = mqMgr.AccessQueue(queueNameDimaps, openOutputOptions);
120     queueStore = mqMgr.AccessQueue(queueNameStore, openOutputOptions);
121    
122    
123     bool isContinue = true;
124     while (isContinue)
125     {
126    
127     MQMessage mqMsg = new MQMessage();
128     MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
129    
130    
131     try
132     {
133     queueIndbakke.Get(mqMsg, mqGetMsgOpts);
134     if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
135     {
136 torben 2171 string saltiiString = mqMsg.ReadString(mqMsg.MessageLength);
137 torben 2168 //System.Console.WriteLine(msgString);
138    
139    
140     // Hvis transaktionen starter med et ? er det ikke en gyldig transaktion
141     // validér ligeledes at headeren er gyldig
142 torben 2171 if (saltiiString.StartsWith("?") || Salt2Helper.validateSalt2Header(saltiiString) == false)
143 torben 2168 {
144     string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, logDirectory, "filter");
145     using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
146     {
147 torben 2171 discardedlog.WriteLine(Logfile.getNowString() + " " + saltiiString);
148 torben 2168 }
149     continue; //gå frem til at tage næste transaktion fra køen
150     }
151    
152 torben 2171 MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
153     // same as MQPMO_DEFAULT
154 torben 2168
155 torben 2171 MQMessage msg = new MQMessage();
156     msg.Format = MQC.MQFMT_STRING;
157     msg.CharacterSet = 1252;
158     msg.WriteString(saltiiString);
159 torben 2168
160 torben 2171 Salt2Header header = Salt2Helper.parseHeader(saltiiString);
161     queueMysql.Put(msg, pmo);
162     if (saveForLater(header))
163     {
164     queueStore.Put(msg, pmo);
165     }
166     else
167     {
168     queueDimaps.Put(msg, pmo);
169     }
170 torben 2168
171    
172    
173     messageCount++;// increment per run message counter
174     if (messageCount >= 10000) // if we have moved 10000 messages in this run - let it go
175     {
176     isContinue = false;
177     }
178    
179    
180    
181    
182     }
183     else
184     {
185     System.Console.WriteLine("Non-text message");
186     }
187     }
188     catch (MQException mqe)
189     {
190     isContinue = false;
191    
192     // report reason, if any
193     if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
194     {
195     // special report for normal end
196     System.Console.WriteLine("no more messages");
197     }
198     else
199     {
200     // general report for other reasons
201     System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
202    
203     }
204    
205     }
206    
207     }
208    
209    
210    
211     }
212     finally
213     {
214     closeQueue(queueIndbakke);
215     closeQueue(queueMysql);
216     closeQueue(queueDimaps);
217     closeQueue(queueStore);
218    
219    
220     if (mqMgr != null && mqMgr.IsOpen)
221     {
222     try
223     {
224     mqMgr.Close();
225     }
226     catch (Exception e)
227     {
228     Console.WriteLine("Error cleaning up qmgr " + e.Message);
229     }
230     }
231     }
232 torben 2163 }
233    
234 torben 2171 private Boolean saveForLater(Salt2Header header)
235     {
236     DateTime now = DateTime.Now;
237     int hour = now.Hour;
238     if (hour >= 14 && hour < 18)
239     {
240    
241     if (contains(header.transaktionForkortelse, this.filterTranscations) ) //Så længe vi skal være net3.0 kompatible er LINQ problematisk
242     {
243     return true;
244     }
245     else
246     {
247     return false;
248     }
249     }
250     else //normal operation - send straight trough
251     {
252     return false;
253     }
254    
255     }
256    
257     private bool contains(string needle, string[] haystack) // s
258     {
259     foreach(string hay in haystack)
260     {
261     if (needle.Equals(hay))
262     {
263     return true;
264     }
265     }
266    
267     return false;
268     }
269    
270 torben 2168 private void closeQueue(MQQueue queue)
271     {
272     if (queue != null && queue.IsOpen)
273     {
274     try
275     {
276     queue.Close();
277     }
278     catch (Exception e)
279     {
280     Console.WriteLine("Error cleaning up queue " + e.Message);
281     }
282     }
283 torben 2167
284 torben 2168 }
285 torben 2167
286 torben 2168
287    
288 torben 2163 /* Singleton */
289     private static FilterController instance = null;
290     public static FilterController getInstance()
291     {
292     if (instance == null)
293     instance = new FilterController();
294    
295     return instance;
296     }
297    
298     }
299     }

  ViewVC Help
Powered by ViewVC 1.1.20