/[projects]/dao/DaoMqPump2/MQFilter/FilterController.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/MQFilter/FilterController.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2163 by torben, Fri May 16 17:58:30 2014 UTC revision 2181 by torben, Tue May 20 20:20:48 2014 UTC
# Line 1  Line 1 
1  using System;  using System;
2    using System.Collections;
3  using System.Collections.Generic;  using System.Collections.Generic;
4  using System.Text;  using System.IO;
5    using System.Text.RegularExpressions;
6    
7    using IBM.WMQ;
8    
9    using Microsoft.Win32;
10    
11    using DaoCommon;
12    
13    
14  namespace MQFilter  namespace MQFilter
15  {  {
16      class FilterController      class FilterController
17      {      {
18    
19            public string mqHost { get; private set; }
20            public string mqChannel { get; private set; }
21            public string mqQueueManager { get; private set; }
22    
23            public string logDirectory { get; private set; }
24    
25            public string[] filterTranscations { get; private set; }
26    
27            Logfile logFile;
28    
29    
30            public static readonly string queueNameIndbakke = "DAO.INDBAKKE";
31            public static readonly string queueNameDimaps = "DAO.SAMLET";
32            public static readonly string queueNameMysql = "DAO.ALL";
33            public static readonly string queueNameStore = "DAO.STORE";
34    
35            TimeSpan silentPeriodBegin;
36            TimeSpan silentPeriodEnd;
37    
38            protected FilterController()
39            {
40                initialize();
41                logFile = new Logfile(LogfileType.LogEvents, "filter", logDirectory);
42                logFile.addSingleLogEntry("Starting service");
43            }
44    
45            private void initialize()
46            {
47                Console.WriteLine("FilterController: Loading config");
48                RegistryKey key = Registry.LocalMachine.CreateSubKey("Software\\DAO\\MQFilter");
49    
50    
51                //Læser globale MQ Parametre
52                mqHost = (string)key.GetValue("MQHost");
53                if (mqHost == null || mqHost.Length == 0)
54                {
55                    key.SetValue("MQHost", "", RegistryValueKind.String);
56                    throw new System.ArgumentException("MQHost cannot be null or empty");
57                }
58    
59                mqChannel = (string)key.GetValue("MQChannel");
60                if (mqChannel == null || mqChannel.Length == 0)
61                {
62                    key.SetValue("MQChannel", "", RegistryValueKind.String);
63                    throw new System.ArgumentException("MQChannel cannot be null or empty");
64                }
65    
66                mqQueueManager = (string)key.GetValue("MQQueueManager");
67                if (mqQueueManager == null || mqQueueManager.Length == 0)
68                {
69                    key.SetValue("MQQueueManager", "", RegistryValueKind.String);
70                    throw new System.ArgumentException("MQQueueManager cannot be null or empty");
71                }
72    
73                ////////////
74    
75                logDirectory = (string)key.GetValue("LogDirectory");
76                if (logDirectory == null || logDirectory.Length == 0)
77                {
78                    key.SetValue("LogDirectory", "", RegistryValueKind.String);
79                    throw new System.ArgumentException("LogDirectory cannot be null or empty");
80                }
81    
82                if (Directory.Exists(logDirectory) == false)
83                {
84                    Directory.CreateDirectory(logDirectory);
85                }
86    
87                ////////////
88    
89                string silentBeginStr = (string)key.GetValue("SilentPeriodBegin");
90                if (silentBeginStr == null || silentBeginStr.Length == 0)
91                {
92                    key.SetValue("SilentPeriodBegin", "", RegistryValueKind.String);
93                    throw new System.ArgumentException("SilentPeriodBegin cannot be null or empty");
94                }
95                silentPeriodBegin = TimeSpan.Parse(silentBeginStr);
96    
97                string silentEndStr = (string)key.GetValue("SilentPeriodEnd");
98                if (silentEndStr == null || silentEndStr.Length == 0)
99                {
100                    key.SetValue("SilentPeriodEnd", "", RegistryValueKind.String);
101                    throw new System.ArgumentException("SilentPeriodEnd cannot be null or empty");
102                }
103                silentPeriodEnd = TimeSpan.Parse(silentEndStr);
104    
105                ////////////
106    
107                String tmpFilterTransactions = (string)key.GetValue("FilterTransactions");
108                if (tmpFilterTransactions == null || tmpFilterTransactions.Length == 0)
109                {
110                    key.SetValue("FilterTransactions", "", RegistryValueKind.String);
111                    throw new System.ArgumentException("FilterTransactions cannot be null or empty");
112                }
113                tmpFilterTransactions = tmpFilterTransactions.Replace(';', ',');
114                filterTranscations = Regex.Split(tmpFilterTransactions, ",");
115    
116                for (int i = 0; i < filterTranscations.Length; i++)
117                {
118                    filterTranscations[i] = filterTranscations[i].Trim().ToUpper();
119                }
120    
121                ////////////
122    
123            }
124    
125    
126    
127    
128          public void transportAllMessages()          public void transportAllMessages()
129          {          {
130                try
131                {
132                    transportMessagesWorker();
133                }
134                catch (Exception e)
135                {
136                    logFile.addSingleLogEntry("Error during transportAllMessages: " + e.Message);
137                    Console.WriteLine(e.StackTrace);
138                }
139          }          }
140    
141            private void transportMessagesWorker()
142            {
143                int messageCount = 0;
144    
145                MQQueueManager mqMgr = null;
146                MQQueue queueIndbakke = null;
147                MQQueue queueMysql = null;
148                MQQueue queueDimaps = null;
149                MQQueue queueStore = null;
150    
151                using (Logfile translog = new Logfile(LogfileType.LogTransactions, "filter", logDirectory))
152                try
153                {
154                    //MQ options
155                    Hashtable connProps = MQHelper.getConnectionProperties(mqHost, mqChannel);
156                    int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
157                    int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
158    
159    
160                    //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
161                    mqMgr = new MQQueueManager(mqQueueManager, connProps);//stage 1 connect to mq
162    
163    
164                    queueIndbakke = MQHelper.openQueueHelper(queueNameIndbakke, mqMgr, openInputOptions);
165    
166                    queueMysql = MQHelper.openQueueHelper(queueNameMysql, mqMgr, openOutputOptions);
167                    queueDimaps = MQHelper.openQueueHelper(queueNameDimaps, mqMgr, openOutputOptions);
168                    queueStore = MQHelper.openQueueHelper(queueNameStore, mqMgr, openOutputOptions);
169    
170    
171                    bool isContinue = true;
172                    while (isContinue)
173                    {
174    
175                        MQMessage mqMsg = new MQMessage();
176                        MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
177    
178    
179                        try
180                        {
181                            queueIndbakke.Get(mqMsg, mqGetMsgOpts);
182                            if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
183                            {
184                                if (mqMsg.MessageLength == 0) //Skip empty transactions (so far only seen on my test server)
185                                    continue;
186    
187                                //System.Console.WriteLine("LEN>" + mqMsg.MessageLength);
188                                string salt2String = mqMsg.ReadString(mqMsg.MessageLength);
189                                
190    
191    
192                                // validér at headeren er gyldig
193                                if ( Salt2Helper.validateSalt2Header(salt2String) == false)
194                                {
195    
196    
197                                    using (Logfile discardedlog = new Logfile(LogfileType.LogDiscarded, "filter", logDirectory))
198                                    {
199                                        discardedlog.writeLogEntry(salt2String);
200                                    }
201                                    continue; //gå frem til at tage næste transaktion fra køen
202                                }
203    
204                                translog.writeLogEntry(salt2String);
205    
206                                MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
207                                // same as MQPMO_DEFAULT
208    
209                                MQMessage msg = new MQMessage();
210                                msg.Format = MQC.MQFMT_STRING;
211                                msg.CharacterSet = 1252;
212                                msg.WriteString(salt2String);
213    
214                                Salt2Header header = Salt2Helper.parseHeader(salt2String);
215                                queueMysql.Put(msg, pmo);
216                                if (saveForLater(header))
217                                {
218                                    queueStore.Put(msg, pmo);
219                                }
220                                else
221                                {
222                                    queueDimaps.Put(msg, pmo);
223                                }
224    
225                                
226    
227                                messageCount++;// increment per run message counter
228                                if (messageCount >= 10000) // if we have moved  10000 messages in this run - let it go
229                                {
230                                    isContinue = false;
231                                }
232    
233    
234    
235    
236                            }
237                            else
238                            {
239                                System.Console.WriteLine("Non-text message");
240                            }
241                        }
242                        catch (MQException mqe)
243                        {
244                            isContinue = false;
245    
246                            // report reason, if any
247                            if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
248                            {
249                                // special report for normal end
250                                System.Console.WriteLine("no more messages");
251                            }
252                            else
253                            {
254                                // general report for other reasons
255                                System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
256    
257                            }
258    
259                        }
260    
261                    }
262    
263    
264    
265                }
266                finally
267                {
268                    MQHelper.closeQueueSafe(queueIndbakke);
269                    MQHelper.closeQueueSafe(queueMysql);
270                    MQHelper.closeQueueSafe(queueDimaps);
271                    MQHelper.closeQueueSafe(queueStore);
272    
273                    MQHelper.closeQueueManagerSafe(mqMgr);
274                }
275            }
276    
277            private Boolean saveForLater(Salt2Header header)
278            {
279    
280                TimeSpan now = DateTime.Now.TimeOfDay;
281                
282                if (now >= silentPeriodBegin && now < silentPeriodEnd)
283                {
284                    
285                    if (contains(header.transaktionForkortelse, this.filterTranscations) ) //Så længe vi skal være .net3.0 kompatible er LINQ problematisk (LINQ kræver 3.5)
286                    {
287                        return true;
288                    }
289                    else
290                    {
291                        return false;
292                    }
293                }
294                else //normal operation - send straight trough
295                {
296                    return false;
297                }
298    
299            }
300    
301            private bool contains(string needle, string[] haystack) // s
302            {
303                foreach(string hay in haystack)
304                {
305                    if (needle.Equals(hay))
306                    {
307                        return true;
308                    }
309                }
310    
311                return false;            
312            }
313    
314    
315    
316    
317          /* Singleton */          /* Singleton */
318          private static FilterController instance = null;          private static FilterController instance = null;
319          public static FilterController getInstance()          public static FilterController getInstance()

Legend:
Removed from v.2163  
changed lines
  Added in v.2181

  ViewVC Help
Powered by ViewVC 1.1.20