/[projects]/dao/DaoMqPump2/MQFilter/FilterController.cs
ViewVC logotype

Diff of /dao/DaoMqPump2/MQFilter/FilterController.cs

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2171 by torben, Fri May 16 21:50:17 2014 UTC revision 2183 by torben, Wed May 21 09:56:21 2014 UTC
# Line 16  namespace MQFilter Line 16  namespace MQFilter
16      class FilterController      class FilterController
17      {      {
18    
19            public const int TRANSACTIONS_PER_RUN = 2000;
20    
21          public string mqHost { get; private set; }          public string mqHost { get; private set; }
22          public string mqChannel { get; private set; }          public string mqChannel { get; private set; }
23          public string mqQueueManager { get; private set; }          public string mqQueueManager { get; private set; }
# Line 24  namespace MQFilter Line 26  namespace MQFilter
26    
27          public string[] filterTranscations { get; private set; }          public string[] filterTranscations { get; private set; }
28    
29            Logfile logFile;
30    
31    
32          public static readonly string queueNameIndbakke = "DAO.INDBAKKE";          public static readonly string queueNameIndbakke = "DAO.INDBAKKE";
33          public static readonly string queueNameDimaps = "DAO.SAMLET";          public static readonly string queueNameDimaps = "DAO.SAMLET";
34          public static readonly string queueNameMysql = "DAO.ALL";          public static readonly string queueNameMysql = "DAO.ALL";
35          public static readonly string queueNameStore = "DAO.STORE";          public static readonly string queueNameStore = "DAO.STORE";
36    
37            TimeSpan silentPeriodBegin;
38            TimeSpan silentPeriodEnd;
39    
40          protected FilterController()          protected FilterController()
41          {          {
42              initialize();              initialize();
43                logFile = new Logfile(LogfileType.LogEvents, "filter", logDirectory);
44                logFile.addSingleLogEntry("Starting service");
45          }          }
46    
47          private void initialize()          private void initialize()
# Line 45  namespace MQFilter Line 53  namespace MQFilter
53              //Læser globale MQ Parametre              //Læser globale MQ Parametre
54              mqHost = (string)key.GetValue("MQHost");              mqHost = (string)key.GetValue("MQHost");
55              if (mqHost == null || mqHost.Length == 0)              if (mqHost == null || mqHost.Length == 0)
56                {
57                    key.SetValue("MQHost", "", RegistryValueKind.String);
58                  throw new System.ArgumentException("MQHost cannot be null or empty");                  throw new System.ArgumentException("MQHost cannot be null or empty");
59                }
60    
61              mqChannel = (string)key.GetValue("MQChannel");              mqChannel = (string)key.GetValue("MQChannel");
62              if (mqChannel == null || mqChannel.Length == 0)              if (mqChannel == null || mqChannel.Length == 0)
63                {
64                    key.SetValue("MQChannel", "", RegistryValueKind.String);
65                  throw new System.ArgumentException("MQChannel cannot be null or empty");                  throw new System.ArgumentException("MQChannel cannot be null or empty");
66                }
67    
68              mqQueueManager = (string)key.GetValue("MQQueueManager");              mqQueueManager = (string)key.GetValue("MQQueueManager");
69              if (mqQueueManager == null || mqQueueManager.Length == 0)              if (mqQueueManager == null || mqQueueManager.Length == 0)
70                {
71                    key.SetValue("MQQueueManager", "", RegistryValueKind.String);
72                  throw new System.ArgumentException("MQQueueManager cannot be null or empty");                  throw new System.ArgumentException("MQQueueManager cannot be null or empty");
73                }
74    
75              ////////////              ////////////
76    
77              logDirectory = (string)key.GetValue("LogDirectory");              logDirectory = (string)key.GetValue("LogDirectory");
78              if (logDirectory == null || logDirectory.Length == 0)              if (logDirectory == null || logDirectory.Length == 0)
79                {
80                    key.SetValue("LogDirectory", "", RegistryValueKind.String);
81                  throw new System.ArgumentException("LogDirectory cannot be null or empty");                  throw new System.ArgumentException("LogDirectory cannot be null or empty");
82                }
83    
84              if (Directory.Exists(logDirectory) == false)              if (Directory.Exists(logDirectory) == false)
85              {              {
# Line 68  namespace MQFilter Line 88  namespace MQFilter
88    
89              ////////////              ////////////
90    
91                string silentBeginStr = (string)key.GetValue("SilentPeriodBegin");
92                if (silentBeginStr == null || silentBeginStr.Length == 0)
93                {
94                    key.SetValue("SilentPeriodBegin", "", RegistryValueKind.String);
95                    throw new System.ArgumentException("SilentPeriodBegin cannot be null or empty");
96                }
97                silentPeriodBegin = TimeSpan.Parse(silentBeginStr);
98    
99                string silentEndStr = (string)key.GetValue("SilentPeriodEnd");
100                if (silentEndStr == null || silentEndStr.Length == 0)
101                {
102                    key.SetValue("SilentPeriodEnd", "", RegistryValueKind.String);
103                    throw new System.ArgumentException("SilentPeriodEnd cannot be null or empty");
104                }
105                silentPeriodEnd = TimeSpan.Parse(silentEndStr);
106    
107                ////////////
108    
109              String tmpFilterTransactions = (string)key.GetValue("FilterTransactions");              String tmpFilterTransactions = (string)key.GetValue("FilterTransactions");
110              if (tmpFilterTransactions == null || tmpFilterTransactions.Length == 0)              if (tmpFilterTransactions == null || tmpFilterTransactions.Length == 0)
111                {
112                    key.SetValue("FilterTransactions", "", RegistryValueKind.String);
113                  throw new System.ArgumentException("FilterTransactions cannot be null or empty");                  throw new System.ArgumentException("FilterTransactions cannot be null or empty");
114                }
115                tmpFilterTransactions = tmpFilterTransactions.Replace(';', ',');
116              filterTranscations = Regex.Split(tmpFilterTransactions, ",");              filterTranscations = Regex.Split(tmpFilterTransactions, ",");
117    
118              for (int i = 0; i < filterTranscations.Length; i++)              for (int i = 0; i < filterTranscations.Length; i++)
119              {              {
120                  filterTranscations[i] = filterTranscations[i].Trim();                  filterTranscations[i] = filterTranscations[i].Trim().ToUpper();
121              }              }
122    
123              ////////////              ////////////
# Line 83  namespace MQFilter Line 125  namespace MQFilter
125          }          }
126    
127    
         private Hashtable getConnectionProperties()  
         {  
             Hashtable connProperties = new Hashtable();  
             connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);  
             connProperties.Add(MQC.HOST_NAME_PROPERTY, mqHost);  
             connProperties.Add(MQC.CHANNEL_PROPERTY, mqChannel);  
             return connProperties;  
         }  
128    
129    
130          public void transportAllMessages()          public void transportAllMessages()
131          {          {
132                try
133                {
134                    transportMessagesWorker();
135                }
136                catch (Exception e)
137                {
138                    logFile.addSingleLogEntry("Error during transportAllMessages: " + e.Message);
139                    Console.WriteLine(e.StackTrace);
140                }
141            }
142    
143            private void transportMessagesWorker()
144            {
145              int messageCount = 0;              int messageCount = 0;
146    
147              MQQueueManager mqMgr = null;              MQQueueManager mqMgr = null;
# Line 102  namespace MQFilter Line 149  namespace MQFilter
149              MQQueue queueMysql = null;              MQQueue queueMysql = null;
150              MQQueue queueDimaps = null;              MQQueue queueDimaps = null;
151              MQQueue queueStore = null;              MQQueue queueStore = null;
152    
153                using (Logfile translog = new Logfile(LogfileType.LogTransactions, "filter", logDirectory))
154              try              try
155              {              {
156                  //MQ options                  //MQ options
157                  Hashtable connProps = getConnectionProperties();                  Hashtable connProps = MQHelper.getConnectionProperties(mqHost, mqChannel);
158                  int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;                  int openInputOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
159                  int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;                  int openOutputOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
160    
# Line 113  namespace MQFilter Line 162  namespace MQFilter
162                  //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement                  //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
163                  mqMgr = new MQQueueManager(mqQueueManager, connProps);//stage 1 connect to mq                  mqMgr = new MQQueueManager(mqQueueManager, connProps);//stage 1 connect to mq
164    
                 queueIndbakke = mqMgr.AccessQueue(queueNameIndbakke, openInputOptions);  
165    
166                  queueMysql = mqMgr.AccessQueue(queueNameMysql, openOutputOptions);                  queueIndbakke = MQHelper.openQueueHelper(queueNameIndbakke, mqMgr, openInputOptions);
167                  queueDimaps = mqMgr.AccessQueue(queueNameDimaps, openOutputOptions);  
168                  queueStore = mqMgr.AccessQueue(queueNameStore, openOutputOptions);                  queueMysql = MQHelper.openQueueHelper(queueNameMysql, mqMgr, openOutputOptions);
169                    queueDimaps = MQHelper.openQueueHelper(queueNameDimaps, mqMgr, openOutputOptions);
170                    queueStore = MQHelper.openQueueHelper(queueNameStore, mqMgr, openOutputOptions);
171    
172    
173                  bool isContinue = true;                  bool isContinue = true;
# Line 133  namespace MQFilter Line 183  namespace MQFilter
183                          queueIndbakke.Get(mqMsg, mqGetMsgOpts);                          queueIndbakke.Get(mqMsg, mqGetMsgOpts);
184                          if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)                          if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
185                          {                          {
186                              string saltiiString = mqMsg.ReadString(mqMsg.MessageLength);                              if (mqMsg.MessageLength == 0) //Skip empty transactions (so far only seen on my test server)
187                              //System.Console.WriteLine(msgString);                                  continue;
188    
189                                //System.Console.WriteLine("LEN>" + mqMsg.MessageLength);
190                                string salt2String = mqMsg.ReadString(mqMsg.MessageLength);
191                                
192    
193    
194                              // Hvis transaktionen starter med et ? er det ikke en gyldig transaktion                              // validér at headeren er gyldig
195                              // validér ligeledes at headeren er gyldig                              if ( Salt2Helper.validateSalt2Header(salt2String) == false)
                             if (saltiiString.StartsWith("?") || Salt2Helper.validateSalt2Header(saltiiString) == false)  
196                              {                              {
197                                  string discarded_filename = Logfile.getLogFilename(LogfileType.LogTransactions, logDirectory, "filter");  
198                                  using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))  
199                                    using (Logfile discardedlog = new Logfile(LogfileType.LogDiscarded, "filter", logDirectory))
200                                  {                                  {
201                                      discardedlog.WriteLine(Logfile.getNowString() + " " + saltiiString);                                      discardedlog.writeLogEntry(salt2String);
202                                  }                                  }
203                                  continue; //gå frem til at tage næste transaktion fra køen                                  continue; //gå frem til at tage næste transaktion fra køen
204                              }                              }
205    
206                                translog.writeLogEntry(salt2String);
207    
208                              MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,                              MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
209                              // same as MQPMO_DEFAULT                              // same as MQPMO_DEFAULT
210    
211                              MQMessage msg = new MQMessage();                              MQMessage msg = new MQMessage();
212                              msg.Format = MQC.MQFMT_STRING;                              msg.Format = MQC.MQFMT_STRING;
213                              msg.CharacterSet = 1252;                              msg.CharacterSet = 1252;
214                              msg.WriteString(saltiiString);                              msg.WriteString(salt2String);
215    
216                              Salt2Header header = Salt2Helper.parseHeader(saltiiString);                              Salt2Header header = Salt2Helper.parseHeader(salt2String);
217                              queueMysql.Put(msg, pmo);                              queueMysql.Put(msg, pmo);
218                              if (saveForLater(header))                              if (saveForLater(header))
219                              {                              {
# Line 168  namespace MQFilter Line 224  namespace MQFilter
224                                  queueDimaps.Put(msg, pmo);                                  queueDimaps.Put(msg, pmo);
225                              }                              }
226    
227                                
228    
229                              messageCount++;// increment per run message counter                              messageCount++;// increment per run message counter
230                              if (messageCount >= 10000) // if we have moved  10000 messages in this run - let it go                              if (messageCount >= TRANSACTIONS_PER_RUN) // if we have moved  TRANSACTIONS_PER_RUN messages in this run - let it go
231                              {                              {
232                                  isContinue = false;                                  isContinue = false;
233                              }                              }
# Line 211  namespace MQFilter Line 267  namespace MQFilter
267              }              }
268              finally              finally
269              {              {
270                  closeQueue(queueIndbakke);                  MQHelper.closeQueueSafe(queueIndbakke);
271                  closeQueue(queueMysql);                  MQHelper.closeQueueSafe(queueMysql);
272                  closeQueue(queueDimaps);                  MQHelper.closeQueueSafe(queueDimaps);
273                  closeQueue(queueStore);                  MQHelper.closeQueueSafe(queueStore);
   
274    
275                  if (mqMgr != null && mqMgr.IsOpen)                  MQHelper.closeQueueManagerSafe(mqMgr);
                 {  
                     try  
                     {  
                         mqMgr.Close();  
                     }  
                     catch (Exception e)  
                     {  
                         Console.WriteLine("Error cleaning up qmgr " + e.Message);  
                     }  
                 }  
276              }              }
277          }          }
278    
279          private Boolean saveForLater(Salt2Header header)          private Boolean saveForLater(Salt2Header header)
280          {          {
281              DateTime now = DateTime.Now;  
282              int hour = now.Hour;              TimeSpan now = DateTime.Now.TimeOfDay;
283              if (hour >= 14 && hour < 18)              
284                if (now >= silentPeriodBegin && now < silentPeriodEnd)
285              {              {
286    
287                    if (header.afsender == "DAO") //DAO transaktioner må altid komme igennem
288                        return false;
289                                    
290                  if (contains(header.transaktionForkortelse, this.filterTranscations) ) //Så længe vi skal være net3.0 kompatible er LINQ problematisk                  if (contains(header.transaktionForkortelse, this.filterTranscations) ) //Så længe vi skal være .net3.0 kompatible er LINQ problematisk (LINQ kræver 3.5)
291                  {                  {
292                      return true;                      return true;
293                  }                  }
# Line 267  namespace MQFilter Line 316  namespace MQFilter
316              return false;                          return false;            
317          }          }
318    
         private void closeQueue(MQQueue queue)  
         {  
             if (queue != null && queue.IsOpen)  
             {  
                 try  
                 {  
                     queue.Close();  
                 }  
                 catch (Exception e)  
                 {  
                     Console.WriteLine("Error cleaning up queue " + e.Message);  
                 }  
             }  
   
         }  
319    
320    
321    

Legend:
Removed from v.2171  
changed lines
  Added in v.2183

  ViewVC Help
Powered by ViewVC 1.1.20