/[projects]/dao/DaoMqPump2/MQFilter/FilterController.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/MQFilter/FilterController.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2163 by torben, Fri May 16 17:58:30 2014 UTC revision 2183 by torben, Wed May 21 09:56:21 2014 UTC
# Line 1  Line 1 
1  using System;  using System;
2    using System.Collections;
3  using System.Collections.Generic;  using System.Collections.Generic;
4  using System.Text;  using System.IO;
5    using System.Text.RegularExpressions;
6    
7    using IBM.WMQ;
8    
9    using Microsoft.Win32;
10    
11    using DaoCommon;
12    
13    
14  namespace MQFilter  namespace MQFilter
15  {  {
16      class FilterController      class FilterController
17      {      {
18    
19            public const int TRANSACTIONS_PER_RUN = 2000;
20    
21            public string mqHost { get; private set; }
22            public string mqChannel { get; private set; }
23            public string mqQueueManager { get; private set; }
24    
25            public string logDirectory { get; private set; }
26    
27            public string[] filterTranscations { get; private set; }
28    
29            Logfile logFile;
30    
31    
32            public static readonly string queueNameIndbakke = "DAO.INDBAKKE";
33            public static readonly string queueNameDimaps = "DAO.SAMLET";
34            public static readonly string queueNameMysql = "DAO.ALL";
35            public static readonly string queueNameStore = "DAO.STORE";
36    
37            TimeSpan silentPeriodBegin;
38            TimeSpan silentPeriodEnd;
39    
40            protected FilterController()
41            {
42                initialize();
43                logFile = new Logfile(LogfileType.LogEvents, "filter", logDirectory);
44                logFile.addSingleLogEntry("Starting service");
45            }
46    
47            private void initialize()
48            {
49                Console.WriteLine("FilterController: Loading config");
50                RegistryKey key = Registry.LocalMachine.CreateSubKey("Software\\DAO\\MQFilter");
51    
52    
53                //Læser globale MQ Parametre
54                mqHost = (string)key.GetValue("MQHost");
55                if (mqHost == null || mqHost.Length == 0)
56                {
57                    key.SetValue("MQHost", "", RegistryValueKind.String);
58                    throw new System.ArgumentException("MQHost cannot be null or empty");
59                }
60    
61                mqChannel = (string)key.GetValue("MQChannel");
62                if (mqChannel == null || mqChannel.Length == 0)
63                {
64                    key.SetValue("MQChannel", "", RegistryValueKind.String);
65                    throw new System.ArgumentException("MQChannel cannot be null or empty");
66                }
67    
68                mqQueueManager = (string)key.GetValue("MQQueueManager");
69                if (mqQueueManager == null || mqQueueManager.Length == 0)
70                {
71                    key.SetValue("MQQueueManager", "", RegistryValueKind.String);
72                    throw new System.ArgumentException("MQQueueManager cannot be null or empty");
73                }
74    
75                ////////////
76    
77                logDirectory = (string)key.GetValue("LogDirectory");
78                if (logDirectory == null || logDirectory.Length == 0)
79                {
80                    key.SetValue("LogDirectory", "", RegistryValueKind.String);
81                    throw new System.ArgumentException("LogDirectory cannot be null or empty");
82                }
83    
84                if (Directory.Exists(logDirectory) == false)
85                {
86                    Directory.CreateDirectory(logDirectory);
87                }
88    
89                ////////////
90    
91                string silentBeginStr = (string)key.GetValue("SilentPeriodBegin");
92                if (silentBeginStr == null || silentBeginStr.Length == 0)
93                {
94                    key.SetValue("SilentPeriodBegin", "", RegistryValueKind.String);
95                    throw new System.ArgumentException("SilentPeriodBegin cannot be null or empty");
96                }
97                silentPeriodBegin = TimeSpan.Parse(silentBeginStr);
98    
99                string silentEndStr = (string)key.GetValue("SilentPeriodEnd");
100                if (silentEndStr == null || silentEndStr.Length == 0)
101                {
102                    key.SetValue("SilentPeriodEnd", "", RegistryValueKind.String);
103                    throw new System.ArgumentException("SilentPeriodEnd cannot be null or empty");
104                }
105                silentPeriodEnd = TimeSpan.Parse(silentEndStr);
106    
107                ////////////
108    
109                String tmpFilterTransactions = (string)key.GetValue("FilterTransactions");
110                if (tmpFilterTransactions == null || tmpFilterTransactions.Length == 0)
111                {
112                    key.SetValue("FilterTransactions", "", RegistryValueKind.String);
113                    throw new System.ArgumentException("FilterTransactions cannot be null or empty");
114                }
115                tmpFilterTransactions = tmpFilterTransactions.Replace(';', ',');
116                filterTranscations = Regex.Split(tmpFilterTransactions, ",");
117    
118                for (int i = 0; i < filterTranscations.Length; i++)
119                {
120                    filterTranscations[i] = filterTranscations[i].Trim().ToUpper();
121                }
122    
123                ////////////
124    
125            }
126    
127    
128    
129    
130          public void transportAllMessages()          public void transportAllMessages()
131          {          {
132                try
133                {
134                    transportMessagesWorker();
135                }
136                catch (Exception e)
137                {
138                    logFile.addSingleLogEntry("Error during transportAllMessages: " + e.Message);
139                    Console.WriteLine(e.StackTrace);
140                }
141          }          }
142    
143            private void transportMessagesWorker()
144            {
145                int messageCount = 0;
146    
147                MQQueueManager mqMgr = null;
148                MQQueue queueIndbakke = null;
149                MQQueue queueMysql = null;
150                MQQueue queueDimaps = null;
151                MQQueue queueStore = null;
152    
153                using (Logfile translog = new Logfile(LogfileType.LogTransactions, "filter", logDirectory))
154                try
155                {
156                    //MQ options
157                    Hashtable connProps = MQHelper.getConnectionProperties(mqHost, mqChannel);
158                    int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
159                    int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
160    
161    
162                    //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
163                    mqMgr = new MQQueueManager(mqQueueManager, connProps);//stage 1 connect to mq
164    
165    
166                    queueIndbakke = MQHelper.openQueueHelper(queueNameIndbakke, mqMgr, openInputOptions);
167    
168                    queueMysql = MQHelper.openQueueHelper(queueNameMysql, mqMgr, openOutputOptions);
169                    queueDimaps = MQHelper.openQueueHelper(queueNameDimaps, mqMgr, openOutputOptions);
170                    queueStore = MQHelper.openQueueHelper(queueNameStore, mqMgr, openOutputOptions);
171    
172    
173                    bool isContinue = true;
174                    while (isContinue)
175                    {
176    
177                        MQMessage mqMsg = new MQMessage();
178                        MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
179    
180    
181                        try
182                        {
183                            queueIndbakke.Get(mqMsg, mqGetMsgOpts);
184                            if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
185                            {
186                                if (mqMsg.MessageLength == 0) //Skip empty transactions (so far only seen on my test server)
187                                    continue;
188    
189                                //System.Console.WriteLine("LEN>" + mqMsg.MessageLength);
190                                string salt2String = mqMsg.ReadString(mqMsg.MessageLength);
191                                
192    
193    
194                                // validér at headeren er gyldig
195                                if ( Salt2Helper.validateSalt2Header(salt2String) == false)
196                                {
197    
198    
199                                    using (Logfile discardedlog = new Logfile(LogfileType.LogDiscarded, "filter", logDirectory))
200                                    {
201                                        discardedlog.writeLogEntry(salt2String);
202                                    }
203                                    continue; //gå frem til at tage næste transaktion fra køen
204                                }
205    
206                                translog.writeLogEntry(salt2String);
207    
208                                MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
209                                // same as MQPMO_DEFAULT
210    
211                                MQMessage msg = new MQMessage();
212                                msg.Format = MQC.MQFMT_STRING;
213                                msg.CharacterSet = 1252;
214                                msg.WriteString(salt2String);
215    
216                                Salt2Header header = Salt2Helper.parseHeader(salt2String);
217                                queueMysql.Put(msg, pmo);
218                                if (saveForLater(header))
219                                {
220                                    queueStore.Put(msg, pmo);
221                                }
222                                else
223                                {
224                                    queueDimaps.Put(msg, pmo);
225                                }
226    
227                                
228    
229                                messageCount++;// increment per run message counter
230                                if (messageCount >= TRANSACTIONS_PER_RUN) // if we have moved  TRANSACTIONS_PER_RUN messages in this run - let it go
231                                {
232                                    isContinue = false;
233                                }
234    
235    
236    
237    
238                            }
239                            else
240                            {
241                                System.Console.WriteLine("Non-text message");
242                            }
243                        }
244                        catch (MQException mqe)
245                        {
246                            isContinue = false;
247    
248                            // report reason, if any
249                            if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
250                            {
251                                // special report for normal end
252                                System.Console.WriteLine("no more messages");
253                            }
254                            else
255                            {
256                                // general report for other reasons
257                                System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
258    
259                            }
260    
261                        }
262    
263                    }
264    
265    
266    
267                }
268                finally
269                {
270                    MQHelper.closeQueueSafe(queueIndbakke);
271                    MQHelper.closeQueueSafe(queueMysql);
272                    MQHelper.closeQueueSafe(queueDimaps);
273                    MQHelper.closeQueueSafe(queueStore);
274    
275                    MQHelper.closeQueueManagerSafe(mqMgr);
276                }
277            }
278    
279            private Boolean saveForLater(Salt2Header header)
280            {
281    
282                TimeSpan now = DateTime.Now.TimeOfDay;
283                
284                if (now >= silentPeriodBegin && now < silentPeriodEnd)
285                {
286    
287                    if (header.afsender == "DAO") //DAO transaktioner må altid komme igennem
288                        return false;
289                    
290                    if (contains(header.transaktionForkortelse, this.filterTranscations) ) //Så længe vi skal være .net3.0 kompatible er LINQ problematisk (LINQ kræver 3.5)
291                    {
292                        return true;
293                    }
294                    else
295                    {
296                        return false;
297                    }
298                }
299                else //normal operation - send straight trough
300                {
301                    return false;
302                }
303    
304            }
305    
306            private bool contains(string needle, string[] haystack) // s
307            {
308                foreach(string hay in haystack)
309                {
310                    if (needle.Equals(hay))
311                    {
312                        return true;
313                    }
314                }
315    
316                return false;            
317            }
318    
319    
320    
321    
322          /* Singleton */          /* Singleton */
323          private static FilterController instance = null;          private static FilterController instance = null;
324          public static FilterController getInstance()          public static FilterController getInstance()

Legend:
Removed from v.2163  
changed lines
  Added in v.2183

  ViewVC Help
Powered by ViewVC 1.1.20