/[projects]/dao/DaoMqPump2/MQFilter/FilterController.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/MQFilter/FilterController.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2168 by torben, Fri May 16 20:56:22 2014 UTC revision 2182 by torben, Tue May 20 20:48:48 2014 UTC
# Line 24  namespace MQFilter Line 24  namespace MQFilter
24    
25          public string[] filterTranscations { get; private set; }          public string[] filterTranscations { get; private set; }
26    
27            Logfile logFile;
28    
29    
30          public static readonly string queueNameIndbakke = "DAO.INDBAKKE";          public static readonly string queueNameIndbakke = "DAO.INDBAKKE";
31          public static readonly string queueNameDimaps = "DAO.SAMLET";          public static readonly string queueNameDimaps = "DAO.SAMLET";
32          public static readonly string queueNameMysql = "DAO.ALL";          public static readonly string queueNameMysql = "DAO.ALL";
33          public static readonly string queueNameStore = "DAO.STORE";          public static readonly string queueNameStore = "DAO.STORE";
34    
35            TimeSpan silentPeriodBegin;
36            TimeSpan silentPeriodEnd;
37    
38          protected FilterController()          protected FilterController()
39          {          {
40              initialize();              initialize();
41                logFile = new Logfile(LogfileType.LogEvents, "filter", logDirectory);
42                logFile.addSingleLogEntry("Starting service");
43          }          }
44    
45          private void initialize()          private void initialize()
# Line 45  namespace MQFilter Line 51  namespace MQFilter
51              //Læser globale MQ Parametre              //Læser globale MQ Parametre
52              mqHost = (string)key.GetValue("MQHost");              mqHost = (string)key.GetValue("MQHost");
53              if (mqHost == null || mqHost.Length == 0)              if (mqHost == null || mqHost.Length == 0)
54                {
55                    key.SetValue("MQHost", "", RegistryValueKind.String);
56                  throw new System.ArgumentException("MQHost cannot be null or empty");                  throw new System.ArgumentException("MQHost cannot be null or empty");
57                }
58    
59              mqChannel = (string)key.GetValue("MQChannel");              mqChannel = (string)key.GetValue("MQChannel");
60              if (mqChannel == null || mqChannel.Length == 0)              if (mqChannel == null || mqChannel.Length == 0)
61                {
62                    key.SetValue("MQChannel", "", RegistryValueKind.String);
63                  throw new System.ArgumentException("MQChannel cannot be null or empty");                  throw new System.ArgumentException("MQChannel cannot be null or empty");
64                }
65    
66              mqQueueManager = (string)key.GetValue("MQQueueManager");              mqQueueManager = (string)key.GetValue("MQQueueManager");
67              if (mqQueueManager == null || mqQueueManager.Length == 0)              if (mqQueueManager == null || mqQueueManager.Length == 0)
68                {
69                    key.SetValue("MQQueueManager", "", RegistryValueKind.String);
70                  throw new System.ArgumentException("MQQueueManager cannot be null or empty");                  throw new System.ArgumentException("MQQueueManager cannot be null or empty");
71                }
72    
73              ////////////              ////////////
74    
75              logDirectory = (string)key.GetValue("LogDirectory");              logDirectory = (string)key.GetValue("LogDirectory");
76              if (logDirectory == null || logDirectory.Length == 0)              if (logDirectory == null || logDirectory.Length == 0)
77                {
78                    key.SetValue("LogDirectory", "", RegistryValueKind.String);
79                  throw new System.ArgumentException("LogDirectory cannot be null or empty");                  throw new System.ArgumentException("LogDirectory cannot be null or empty");
80                }
81    
82              if (Directory.Exists(logDirectory) == false)              if (Directory.Exists(logDirectory) == false)
83              {              {
# Line 68  namespace MQFilter Line 86  namespace MQFilter
86    
87              ////////////              ////////////
88    
89                string silentBeginStr = (string)key.GetValue("SilentPeriodBegin");
90                if (silentBeginStr == null || silentBeginStr.Length == 0)
91                {
92                    key.SetValue("SilentPeriodBegin", "", RegistryValueKind.String);
93                    throw new System.ArgumentException("SilentPeriodBegin cannot be null or empty");
94                }
95                silentPeriodBegin = TimeSpan.Parse(silentBeginStr);
96    
97                string silentEndStr = (string)key.GetValue("SilentPeriodEnd");
98                if (silentEndStr == null || silentEndStr.Length == 0)
99                {
100                    key.SetValue("SilentPeriodEnd", "", RegistryValueKind.String);
101                    throw new System.ArgumentException("SilentPeriodEnd cannot be null or empty");
102                }
103                silentPeriodEnd = TimeSpan.Parse(silentEndStr);
104    
105                ////////////
106    
107              String tmpFilterTransactions = (string)key.GetValue("FilterTransactions");              String tmpFilterTransactions = (string)key.GetValue("FilterTransactions");
108              if (tmpFilterTransactions == null || tmpFilterTransactions.Length == 0)              if (tmpFilterTransactions == null || tmpFilterTransactions.Length == 0)
109                {
110                    key.SetValue("FilterTransactions", "", RegistryValueKind.String);
111                  throw new System.ArgumentException("FilterTransactions cannot be null or empty");                  throw new System.ArgumentException("FilterTransactions cannot be null or empty");
112                }
113                tmpFilterTransactions = tmpFilterTransactions.Replace(';', ',');
114              filterTranscations = Regex.Split(tmpFilterTransactions, ",");              filterTranscations = Regex.Split(tmpFilterTransactions, ",");
115    
116              for (int i = 0; i < filterTranscations.Length; i++)              for (int i = 0; i < filterTranscations.Length; i++)
117              {              {
118                  filterTranscations[i] = filterTranscations[i].Trim();                  filterTranscations[i] = filterTranscations[i].Trim().ToUpper();
119              }              }
120    
121              ////////////              ////////////
# Line 83  namespace MQFilter Line 123  namespace MQFilter
123          }          }
124    
125    
         private Hashtable getConnectionProperties()  
         {  
             Hashtable connProperties = new Hashtable();  
             connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);  
             connProperties.Add(MQC.HOST_NAME_PROPERTY, mqHost);  
             connProperties.Add(MQC.CHANNEL_PROPERTY, mqChannel);  
             return connProperties;  
         }  
126    
127    
128          public void transportAllMessages()          public void transportAllMessages()
129          {          {
130                try
131                {
132                    transportMessagesWorker();
133                }
134                catch (Exception e)
135                {
136                    logFile.addSingleLogEntry("Error during transportAllMessages: " + e.Message);
137                    Console.WriteLine(e.StackTrace);
138                }
139            }
140    
141            private void transportMessagesWorker()
142            {
143              int messageCount = 0;              int messageCount = 0;
144    
145              MQQueueManager mqMgr = null;              MQQueueManager mqMgr = null;
# Line 102  namespace MQFilter Line 147  namespace MQFilter
147              MQQueue queueMysql = null;              MQQueue queueMysql = null;
148              MQQueue queueDimaps = null;              MQQueue queueDimaps = null;
149              MQQueue queueStore = null;              MQQueue queueStore = null;
150    
151                using (Logfile translog = new Logfile(LogfileType.LogTransactions, "filter", logDirectory))
152              try              try
153              {              {
154                  //MQ options                  //MQ options
155                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = MQHelper.getConnectionProperties(mqHost, mqChannel);
156                  int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;                  int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
157                  int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
158    
# Line 113  namespace MQFilter Line 160  namespace MQFilter
160                  //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement                  //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
161                  mqMgr = new MQQueueManager(mqQueueManager, connProps);//stage 1 connect to mq                  mqMgr = new MQQueueManager(mqQueueManager, connProps);//stage 1 connect to mq
162    
                 queueIndbakke = mqMgr.AccessQueue(queueNameIndbakke, openInputOptions);  
163    
164                  queueMysql = mqMgr.AccessQueue(queueNameMysql, openOutputOptions);                  queueIndbakke = MQHelper.openQueueHelper(queueNameIndbakke, mqMgr, openInputOptions);
165                  queueDimaps = mqMgr.AccessQueue(queueNameDimaps, openOutputOptions);  
166                  queueStore = mqMgr.AccessQueue(queueNameStore, openOutputOptions);                  queueMysql = MQHelper.openQueueHelper(queueNameMysql, mqMgr, openOutputOptions);
167                    queueDimaps = MQHelper.openQueueHelper(queueNameDimaps, mqMgr, openOutputOptions);
168                    queueStore = MQHelper.openQueueHelper(queueNameStore, mqMgr, openOutputOptions);
169    
170    
171                  bool isContinue = true;                  bool isContinue = true;
# Line 133  namespace MQFilter Line 181  namespace MQFilter
181                          queueIndbakke.Get(mqMsg, mqGetMsgOpts);                          queueIndbakke.Get(mqMsg, mqGetMsgOpts);
182                          if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)                          if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
183                          {                          {
184                              string msgString = mqMsg.ReadString(mqMsg.MessageLength);                              if (mqMsg.MessageLength == 0) //Skip empty transactions (so far only seen on my test server)
185                              //System.Console.WriteLine(msgString);                                  continue;
186    
187                                //System.Console.WriteLine("LEN>" + mqMsg.MessageLength);
188                                string salt2String = mqMsg.ReadString(mqMsg.MessageLength);
189                                
190    
191    
192                              // Hvis transaktionen starter med et ? er det ikke en gyldig transaktion                              // validér at headeren er gyldig
193                              // validér ligeledes at headeren er gyldig                              if ( Salt2Helper.validateSalt2Header(salt2String) == false)
                             if (msgString.StartsWith("?") || DaoUtil.validateSalt2Header(msgString) == false)  
194                              {                              {
195                                  string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, logDirectory, "filter");  
196                                  using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))  
197                                    using (Logfile discardedlog = new Logfile(LogfileType.LogDiscarded, "filter", logDirectory))
198                                  {                                  {
199                                      discardedlog.WriteLine(Logfile.getNowString() + " " + msgString);                                      discardedlog.writeLogEntry(salt2String);
200                                  }                                  }
201                                  continue; //gå frem til at tage næste transaktion fra køen                                  continue; //gå frem til at tage næste transaktion fra køen
202                              }                              }
203    
204                                translog.writeLogEntry(salt2String);
205    
206                                MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
207                                // same as MQPMO_DEFAULT
208    
209                                MQMessage msg = new MQMessage();
210                                msg.Format = MQC.MQFMT_STRING;
211                                msg.CharacterSet = 1252;
212                                msg.WriteString(salt2String);
213    
214                                Salt2Header header = Salt2Helper.parseHeader(salt2String);
215                                queueMysql.Put(msg, pmo);
216                                if (saveForLater(header))
217                                {
218                                    queueStore.Put(msg, pmo);
219                                }
220                                else
221                                {
222                                    queueDimaps.Put(msg, pmo);
223                                }
224    
225                                
   
   
226    
227                              messageCount++;// increment per run message counter                              messageCount++;// increment per run message counter
228                              if (messageCount >= 10000) // if we have moved  10000 messages in this run - let it go                              if (messageCount >= 10000) // if we have moved  10000 messages in this run - let it go
# Line 197  namespace MQFilter Line 265  namespace MQFilter
265              }              }
266              finally              finally
267              {              {
268                  closeQueue(queueIndbakke);                  MQHelper.closeQueueSafe(queueIndbakke);
269                  closeQueue(queueMysql);                  MQHelper.closeQueueSafe(queueMysql);
270                  closeQueue(queueDimaps);                  MQHelper.closeQueueSafe(queueDimaps);
271                  closeQueue(queueStore);                  MQHelper.closeQueueSafe(queueStore);
272    
273                    MQHelper.closeQueueManagerSafe(mqMgr);
274                }
275            }
276    
277            private Boolean saveForLater(Salt2Header header)
278            {
279    
280                TimeSpan now = DateTime.Now.TimeOfDay;
281                
282                if (now >= silentPeriodBegin && now < silentPeriodEnd)
283                {
284    
285                  if (mqMgr != null && mqMgr.IsOpen)                  if (header.afsender == "DAO") //DAO transaktioner må altid komme igennem
286                        return false;
287                    
288                    if (contains(header.transaktionForkortelse, this.filterTranscations) ) //Så længe vi skal være .net3.0 kompatible er LINQ problematisk (LINQ kræver 3.5)
289                  {                  {
290                      try                      return true;
                     {  
                         mqMgr.Close();  
                     }  
                     catch (Exception e)  
                     {  
                         Console.WriteLine("Error cleaning up qmgr " + e.Message);  
                     }  
291                  }                  }
292                    else
293                    {
294                        return false;
295                    }
296                }
297                else //normal operation - send straight trough
298                {
299                    return false;
300              }              }
301    
302          }          }
303    
304          private void closeQueue(MQQueue queue)          private bool contains(string needle, string[] haystack) // s
305          {          {
306              if (queue != null && queue.IsOpen)              foreach(string hay in haystack)
307              {              {
308                  try                  if (needle.Equals(hay))
309                  {                  {
310                      queue.Close();                      return true;
                 }  
                 catch (Exception e)  
                 {  
                     Console.WriteLine("Error cleaning up queue " + e.Message);  
311                  }                  }
312              }              }
313    
314                return false;            
315          }          }
316    
317    
318    
319    
320          /* Singleton */          /* Singleton */
321          private static FilterController instance = null;          private static FilterController instance = null;
322          public static FilterController getInstance()          public static FilterController getInstance()

Legend:
Removed from v.2168  
changed lines
  Added in v.2182

  ViewVC Help
Powered by ViewVC 1.1.20