/[projects]/dao/DaoMqPump2/MQFilter/FilterController.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/MQFilter/FilterController.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2163 by torben, Fri May 16 17:58:30 2014 UTC revision 2175 by torben, Mon May 19 11:59:45 2014 UTC
# Line 1  Line 1 
1  using System;  using System;
2    using System.Collections;
3  using System.Collections.Generic;  using System.Collections.Generic;
4  using System.Text;  using System.IO;
5    using System.Text.RegularExpressions;
6    
7    using IBM.WMQ;
8    
9    using Microsoft.Win32;
10    
11    using DaoCommon;
12    
13    
14  namespace MQFilter  namespace MQFilter
15  {  {
16      class FilterController      class FilterController
17      {      {
18    
19            public string mqHost { get; private set; }
20            public string mqChannel { get; private set; }
21            public string mqQueueManager { get; private set; }
22    
23            public string logDirectory { get; private set; }
24    
25            public string[] filterTranscations { get; private set; }
26    
27    
28            public static readonly string queueNameIndbakke = "DAO.INDBAKKE";
29            public static readonly string queueNameDimaps = "DAO.SAMLET";
30            public static readonly string queueNameMysql = "DAO.ALL";
31            public static readonly string queueNameStore = "DAO.STORE";
32    
33    
34            protected FilterController()
35            {
36                initialize();
37            }
38    
39            private void initialize()
40            {
41                Console.WriteLine("FilterController: Loading config");
42                RegistryKey key = Registry.LocalMachine.CreateSubKey("Software\\DAO\\MQFilter");
43    
44    
45                //Læser globale MQ Parametre
46                mqHost = (string)key.GetValue("MQHost");
47                if (mqHost == null || mqHost.Length == 0)
48                {
49                    key.SetValue("MQHost", "", RegistryValueKind.String);
50                    throw new System.ArgumentException("MQHost cannot be null or empty");
51                }
52    
53                mqChannel = (string)key.GetValue("MQChannel");
54                if (mqChannel == null || mqChannel.Length == 0)
55                {
56                    key.SetValue("MQChannel", "", RegistryValueKind.String);
57                    throw new System.ArgumentException("MQChannel cannot be null or empty");
58                }
59    
60                mqQueueManager = (string)key.GetValue("MQQueueManager");
61                if (mqQueueManager == null || mqQueueManager.Length == 0)
62                {
63                    key.SetValue("MQQueueManager", "", RegistryValueKind.String);
64                    throw new System.ArgumentException("MQQueueManager cannot be null or empty");
65                }
66    
67                ////////////
68    
69                logDirectory = (string)key.GetValue("LogDirectory");
70                if (logDirectory == null || logDirectory.Length == 0)
71                {
72                    key.SetValue("LogDirectory", "", RegistryValueKind.String);
73                    throw new System.ArgumentException("LogDirectory cannot be null or empty");
74                }
75    
76                if (Directory.Exists(logDirectory) == false)
77                {
78                    Directory.CreateDirectory(logDirectory);
79                }
80    
81                ////////////
82    
83                String tmpFilterTransactions = (string)key.GetValue("FilterTransactions");
84                if (tmpFilterTransactions == null || tmpFilterTransactions.Length == 0)
85                {
86                    key.SetValue("FilterTransactions", "", RegistryValueKind.String);
87                    throw new System.ArgumentException("FilterTransactions cannot be null or empty");
88                }
89                tmpFilterTransactions = tmpFilterTransactions.Replace(';', ',');
90                filterTranscations = Regex.Split(tmpFilterTransactions, ",");
91    
92                for (int i = 0; i < filterTranscations.Length; i++)
93                {
94                    filterTranscations[i] = filterTranscations[i].Trim().ToUpper();
95                }
96    
97                ////////////
98    
99            }
100    
101    
102    
103    
104    
105          public void transportAllMessages()          public void transportAllMessages()
106          {          {
107                int messageCount = 0;
108    
109                MQQueueManager mqMgr = null;
110                MQQueue queueIndbakke = null;
111                MQQueue queueMysql = null;
112                MQQueue queueDimaps = null;
113                MQQueue queueStore = null;
114                try
115                {
116                    //MQ options
117                    Hashtable connProps = MQHelper.getConnectionProperties(mqHost, mqChannel);
118                    int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
119                    int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
120    
121    
122                    //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
123                    mqMgr = new MQQueueManager(mqQueueManager, connProps);//stage 1 connect to mq
124    
125    
126                    queueIndbakke = MQHelper.openQueueHelper(queueNameIndbakke, mqMgr, openInputOptions);
127    
128                    queueMysql = MQHelper.openQueueHelper(queueNameMysql, mqMgr, openOutputOptions);
129                    queueDimaps = MQHelper.openQueueHelper(queueNameDimaps, mqMgr, openOutputOptions);
130                    queueStore = MQHelper.openQueueHelper(queueNameStore, mqMgr, openOutputOptions);
131    
132    
133                    bool isContinue = true;
134                    while (isContinue)
135                    {
136    
137                        MQMessage mqMsg = new MQMessage();
138                        MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
139    
140    
141                        try
142                        {
143                            queueIndbakke.Get(mqMsg, mqGetMsgOpts);
144                            if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
145                            {
146                                string salt2String = mqMsg.ReadString(mqMsg.MessageLength);
147                                //System.Console.WriteLine(msgString);
148    
149    
150                                // validér at headeren er gyldig
151                                if ( Salt2Helper.validateSalt2Header(salt2String) == false)
152                                {
153                                    string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, logDirectory, "filter");
154                                    using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
155                                    {
156                                        discardedlog.WriteLine(Logfile.getNowString() + " " + salt2String);
157                                    }
158                                    continue; //gå frem til at tage næste transaktion fra køen
159                                }
160    
161                                MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
162                                // same as MQPMO_DEFAULT
163    
164                                MQMessage msg = new MQMessage();
165                                msg.Format = MQC.MQFMT_STRING;
166                                msg.CharacterSet = 1252;
167                                msg.WriteString(salt2String);
168    
169                                Salt2Header header = Salt2Helper.parseHeader(salt2String);
170                                queueMysql.Put(msg, pmo);
171                                if (saveForLater(header))
172                                {
173                                    queueStore.Put(msg, pmo);
174                                }
175                                else
176                                {
177                                    queueDimaps.Put(msg, pmo);
178                                }
179    
180    
181    
182                                messageCount++;// increment per run message counter
183                                if (messageCount >= 10000) // if we have moved  10000 messages in this run - let it go
184                                {
185                                    isContinue = false;
186                                }
187    
188    
189    
190    
191                            }
192                            else
193                            {
194                                System.Console.WriteLine("Non-text message");
195                            }
196                        }
197                        catch (MQException mqe)
198                        {
199                            isContinue = false;
200    
201                            // report reason, if any
202                            if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
203                            {
204                                // special report for normal end
205                                System.Console.WriteLine("no more messages");
206                            }
207                            else
208                            {
209                                // general report for other reasons
210                                System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
211    
212                            }
213    
214                        }
215    
216                    }
217    
218    
219    
220                }
221                finally
222                {
223                    MQHelper.closeQueueSafe(queueIndbakke);
224                    MQHelper.closeQueueSafe(queueMysql);
225                    MQHelper.closeQueueSafe(queueDimaps);
226                    MQHelper.closeQueueSafe(queueStore);
227    
228                    MQHelper.closeQueueManagerSafe(mqMgr);
229                }
230            }
231    
232            private Boolean saveForLater(Salt2Header header)
233            {
234    
235                DateTime now = DateTime.Now;
236                int hour = now.Hour;
237                if (hour >= 14 && hour < 18)
238                {
239                    
240                    if (contains(header.transaktionForkortelse, this.filterTranscations) ) //Så længe vi skal være .net3.0 kompatible er LINQ problematisk (LINQ kræver 3.5)
241                    {
242                        return true;
243                    }
244                    else
245                    {
246                        return false;
247                    }
248                }
249                else //normal operation - send straight trough
250                {
251                    return false;
252                }
253    
254          }          }
255    
256            private bool contains(string needle, string[] haystack) // s
257            {
258                foreach(string hay in haystack)
259                {
260                    if (needle.Equals(hay))
261                    {
262                        return true;
263                    }
264                }
265    
266                return false;            
267            }
268    
269    
270    
271    
272          /* Singleton */          /* Singleton */
273          private static FilterController instance = null;          private static FilterController instance = null;
274          public static FilterController getInstance()          public static FilterController getInstance()

Legend:
Removed from v.2163  
changed lines
  Added in v.2175

  ViewVC Help
Powered by ViewVC 1.1.20