/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2084 - (hide annotations) (download)
Tue Nov 26 19:22:50 2013 UTC (10 years, 6 months ago) by torben
File size: 20427 byte(s)
discardedlog should also contain date and time for transaction
1 torben 1986 using System;
2     using System.Collections;
3     using System.Collections.Generic;
4     using System.IO;
5    
6     using System.Diagnostics;
7    
8     using IBM.WMQ;
9     using MySql.Data.MySqlClient;
10 torben 2057 using System.Globalization;
11 torben 1986
12     namespace DaoMqPump2
13     {
14     public class Transport
15     {
16 torben 1999
17     enum LogfileType {
18     LogTransactions,
19 torben 2058 LogEvents,
20     LogDiscarded
21 torben 1999 }
22    
23 torben 1986 public static string SQL2MQ = "sql2mq";
24     public static string MQ2SQL = "mq2sql";
25    
26 torben 2001 //private bool enabled;
27 torben 1986
28     TransportController controller;
29    
30 torben 1999 StatusData statusData = new StatusData();
31    
32 torben 1986 public string name { get; private set; }
33     public string direction { get; private set; }
34     public string queueName { get; private set; }
35     public string mq2sqlInsertQuery { get; private set; }
36     public string sql2mqReadQuery { get; private set; }
37     public string sql2mqUpdateQuery { get; private set; }
38    
39 torben 1999 //public bool lastrunOk { get; private set; }
40     //public string lastErrorMessage { get; private set; }
41 torben 1986
42 torben 1999 //public string lastOkTime { get; private set; }
43     //public string lastErrorTime { get; private set; }
44     //public string lastTransferTime { get; private set; }
45 torben 1986
46 torben 1999 //public int counter { get; private set; }
47 torben 1986
48 torben 1999 public StatusData TransportStatusData
49     {
50     get
51     {
52     return this.statusData;
53     }
54     }
55    
56    
57     public bool Enabled
58     {
59     get {
60 torben 2001 return statusData.transportEnabled;
61 torben 1999 }
62     set
63     {
64 torben 2001 statusData.transportEnabled = value;
65 torben 1999 if (value == true)
66     {
67     this.addLogEntry("Transport enabled");
68     }
69     else
70     {
71     this.addLogEntry("Transport disabled");
72     }
73     }
74     }
75    
76    
77 torben 1986 private LinkedList<string> logEntries = new LinkedList<string>();
78    
79    
80     public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
81     {
82     this.controller = controller;
83     this.name = name;
84     this.direction = direction;
85     this.queueName = queueName;
86     this.mq2sqlInsertQuery = mq2sqlInsertQuery;
87     this.sql2mqReadQuery = sql2mqReadQuery;
88     this.sql2mqUpdateQuery = sql2mqUpdateQuery;
89    
90 torben 2001 statusData.transportEnabled = enabled;
91 torben 1986
92 torben 1999
93     statusData.lastrunOk = true;
94     statusData.counter = 0;
95     statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";
96    
97 torben 1986 addLogEntry( "Starting ... " );
98     }
99    
100 torben 1999 ~Transport()
101     {
102     addLogEntry("Stopping ... ");
103     }
104    
105    
106 torben 1986 public void transportMessages()
107     {
108 torben 2001 if (statusData.transportEnabled == false)
109 torben 1986 return;
110    
111     Console.WriteLine(name + " -> transportMessages() ");
112 torben 1999 statusData.lastrunOk = true;
113 torben 1986
114 torben 1999 int startCounter = statusData.counter;
115    
116 torben 1986 if (direction == SQL2MQ)
117     {
118     transportSql2Mq();
119     }
120     else
121     {
122     transportMq2Sql();
123     }
124    
125 torben 1999 if (statusData.lastrunOk == true)
126 torben 1986 {
127 torben 1999 statusData.lastOkTime = getNowString();
128    
129     if (statusData.counter != startCounter)
130     {
131     //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
132     statusData.lastTransferTime = getNowString();
133     }
134 torben 1986 }
135     else
136     {
137 torben 1999 addLogEntry(statusData.lastErrorMessage);
138     statusData.lastErrorTime = getNowString();
139 torben 1986 }
140     }
141    
142     private void transportSql2Mq()
143     {
144 torben 2051 MQQueueManager mqMgr = null;
145     MQQueue out_queue = null;
146    
147 torben 1999 string filename = getLogFilename(LogfileType.LogTransactions);
148 torben 1986 using (StreamWriter translog = new StreamWriter(filename, true) )
149     try
150     {
151 torben 2011 //MQ Options
152 torben 1986 Hashtable connProps = getConnectionProperties();
153     int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
154    
155 torben 2011 //MySQL Options
156 torben 1986 string mysqlString = buildMysqlConnString();
157    
158 torben 2051 //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
159     mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
160     out_queue = mqMgr.AccessQueue(queueName, openOptions);
161 torben 2011 using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
162     using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
163     {
164     sqlReadConnection.Open();
165     sqlWriteConnection.Open();
166 torben 1986
167 torben 2011 //stage 3 move messages
168     string readSql = "CALL " + sql2mqReadQuery + "()";
169     MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);
170     MySqlDataReader dataReader = readCmd.ExecuteReader();
171     while (dataReader.Read())
172     {
173     int id = dataReader.GetInt32(0);
174     string msgString = dataReader.GetString(1);
175 torben 2010
176 torben 2011 MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
177     // same as MQPMO_DEFAULT
178 torben 1986
179 torben 2011 MQMessage msg = new MQMessage();
180     msg.Format = MQC.MQFMT_STRING;
181     msg.CharacterSet = 1252;
182     msg.WriteString(msgString);
183 torben 1986
184 torben 2011 out_queue.Put(msg, pmo);
185 torben 1986
186 torben 2011 //now that the message has been put on queue mark it as such
187 torben 1986
188 torben 2011 string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
189     MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
190     int numrows = updateCmd.ExecuteNonQuery();
191 torben 1986
192 torben 2056 translog.WriteLine(getNowString() + " " + msgString);
193 torben 1986
194 torben 2011 if (numrows != 1)
195     {
196     break;
197     }
198     statusData.counter++;
199     }
200 torben 1986
201     }
202     }
203     catch (Exception e)
204     {
205 torben 1999 statusData.lastrunOk = false;
206 torben 2048 statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
207 torben 1999 Console.WriteLine(statusData.lastErrorMessage);
208 torben 2050 Console.WriteLine(e.StackTrace);
209 torben 1999 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
210 torben 1986 }
211 torben 2051 finally
212     {
213    
214     if (out_queue != null && out_queue.IsOpen)
215     {
216     try
217     {
218     out_queue.Close();
219     }
220     catch (Exception e)
221     {
222     Console.WriteLine("Error cleaning up qmgr " + e.Message);
223     }
224     }
225    
226     if (mqMgr != null && mqMgr.IsOpen)
227     {
228     try
229     {
230     mqMgr.Close();
231     }
232     catch (Exception e)
233     {
234     Console.WriteLine("Error cleaning up qmgr " + e.Message);
235     }
236     }
237    
238     }
239 torben 1986 }
240    
241     private void transportMq2Sql()
242     {
243 torben 2051 MQQueueManager mqMgr = null;
244     MQQueue in_queue = null;
245 torben 1999 string filename = getLogFilename(LogfileType.LogTransactions);
246 torben 1986 using (StreamWriter translog = new StreamWriter(filename, true))
247 torben 2051 try
248     {
249     //MQ options
250     Hashtable connProps = getConnectionProperties();
251     int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
252 torben 1986
253 torben 2051 //MySQL options
254     string mysqlString = buildMysqlConnString();
255 torben 1986
256 torben 2051 //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
257     mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
258     in_queue = mqMgr.AccessQueue(queueName, openOptions);
259     using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
260     {
261 torben 1986
262 torben 2051 sqlConnection.Open();
263 torben 1986
264 torben 2011
265 torben 2051 //stage 3 move messages
266     bool isContinue = true;
267     while (isContinue)
268     {
269 torben 1986
270 torben 2051 MQMessage mqMsg = new MQMessage();
271     MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
272 torben 1986
273 torben 2051 mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
274 torben 1986
275 torben 2051 try
276 torben 1986 {
277 torben 2051 in_queue.Get(mqMsg, mqGetMsgOpts);
278     if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
279     {
280     string msgString = mqMsg.ReadString(mqMsg.MessageLength);
281 torben 2062 //System.Console.WriteLine(msgString);
282 torben 2011
283 torben 2083
284     // Hvis transaktionen starter med et ? er det ikke en gyldig transaktion
285     // validér ligeledes at headeren er gyldig
286     if ( msgString.StartsWith("?") || validateSalt2Header(msgString) == false )
287 torben 2058 {
288     string discarded_filename = getLogFilename(LogfileType.LogDiscarded);
289     using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
290     {
291 torben 2084 discardedlog.WriteLine(getNowString() + " " + msgString);
292 torben 2058 }
293     mqMgr.Commit();//fjern den afviste transaktion fra køen
294     statusData.discardedCounter++;
295     continue; //gå frem til at tage næste transaktion fra køen
296     }
297    
298    
299 torben 2051 string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
300 torben 2011
301 torben 2051 MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
302     int numrows = sqlcmd.ExecuteNonQuery();
303 torben 2011
304 torben 2051 if (numrows == 1)
305     {
306 torben 2056 translog.WriteLine(getNowString() + " " + msgString);
307 torben 2051 mqMgr.Commit();
308     statusData.counter++;
309     }
310     else
311     {
312     mqMgr.Backout();
313     isContinue = false;
314     }
315    
316 torben 2011 }
317     else
318     {
319 torben 2051 System.Console.WriteLine("Non-text message");
320 torben 2011 }
321 torben 1986 }
322 torben 2051 catch (MQException mqe)
323 torben 1986 {
324 torben 2051 isContinue = false;
325 torben 1986
326 torben 2051 // report reason, if any
327     if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
328     {
329     // special report for normal end
330     System.Console.WriteLine("no more messages");
331     }
332     else
333     {
334     // general report for other reasons
335     System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
336     statusData.lastrunOk = false;
337     }
338    
339 torben 2011 }
340    
341 torben 2051
342 torben 1986 }
343    
344 torben 2051 }
345 torben 2011
346 torben 2051 }
347     catch (Exception e)
348     {
349 torben 2055 //Det vil være mest korrekt at Rollback/backout MQ Transaktionen her - for at være sikker på at Message'n fjernes fra køen
350     try
351     {
352     if (mqMgr != null)
353     {
354     mqMgr.Backout();
355     }
356     }
357     catch (Exception e2)
358     {
359     this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);
360     }
361    
362 torben 2051 statusData.lastrunOk = false;
363    
364     statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
365     Console.WriteLine(statusData.lastErrorMessage);
366     Console.WriteLine(e.StackTrace);
367     EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
368     }
369     finally
370     {
371    
372     if (in_queue != null && in_queue.IsOpen)
373     {
374     try
375     {
376     in_queue.Close();
377     }
378     catch (Exception e)
379     {
380     Console.WriteLine("Error cleaning up qmgr " + e.Message);
381     }
382 torben 1986 }
383 torben 2051
384     if (mqMgr != null && mqMgr.IsOpen)
385     {
386     try
387     {
388     mqMgr.Close();
389     } catch (Exception e) {
390     Console.WriteLine("Error cleaning up qmgr " + e.Message);
391     }
392     }
393 torben 1986
394     }
395     }
396    
397     private string buildMysqlConnString()
398     {
399     string connectionString = "";
400    
401     connectionString += "SERVER=" + controller.mysqlHost + ";";
402     //connectionString += "DATABASE=" + controller.mysqlHost + ";";
403     connectionString += "UID=" + controller.mysqlUser + ";";
404     connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
405 torben 2051 connectionString += "Max Pool Size=20;";
406     //connectionString += "ConnectionReset=true;";//ConnectionReset kræver muligvis at Default Database er angivet - det virker ihvertfald ikke uden
407 torben 1986
408     return connectionString;
409     }
410    
411     private Hashtable getConnectionProperties()
412     {
413     Hashtable connProperties = new Hashtable();
414     connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
415     connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
416 torben 2060 connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel);
417 torben 1986 return connProperties;
418     }
419    
420 torben 1999
421    
422     private string getLogFilename(LogfileType type)
423 torben 1986 {
424    
425     DateTime now = DateTime.Now;
426     string filename = controller.logDirectory + "\\";
427 torben 1999
428 torben 2057 //Find uge nr
429     DateTimeFormatInfo dfi = DateTimeFormatInfo.CurrentInfo;
430     Calendar myCal = CultureInfo.InvariantCulture.Calendar;//System.Globalization
431     int week = myCal.GetWeekOfYear(now, dfi.CalendarWeekRule, dfi.FirstDayOfWeek);
432    
433 torben 1999 switch (type)
434     {
435     case LogfileType.LogEvents:
436     filename += "eventlog_";
437     break;
438    
439     case LogfileType.LogTransactions:
440     filename += "transactionlog_";
441     break;
442 torben 2058 case LogfileType.LogDiscarded:
443     filename += "discardedlog_";
444     break;
445 torben 1999 }
446    
447    
448 torben 2057 filename += name + "_" + now.Year.ToString("D4") + "_W" + week.ToString("D2") + ".log";
449 torben 1986
450     return filename;
451     }
452    
453     public string getNowString()
454     {
455     DateTime now = DateTime.Now;
456    
457     return now.ToString("s");
458     }
459    
460 torben 2083 private bool validateSalt2Header(string salt2String)
461     {
462     int result;
463    
464     string afsender = salt2String.Substring(0, 5);
465     string modtager = salt2String.Substring(5, 5);
466     string afsenderTegnSaet = salt2String.Substring(10, 6);
467     string standardNavn = salt2String.Substring(16, 6);
468     string standardVersion = salt2String.Substring(22, 3);
469     string afsenderSekvensnr = salt2String.Substring(25, 6);
470     string afsenderTidsstempel = salt2String.Substring(31, 14);
471     string afsenderBakkeIdent = salt2String.Substring(45, 5);
472     string modtagerBakkeIdent = salt2String.Substring(50, 5);
473     string transaktionForkortelse = salt2String.Substring(55, 4);
474     string transaktionsLaengde = salt2String.Substring(59, 5);
475     string prioritet = salt2String.Substring(64, 1);
476    
477     if (int.TryParse(standardVersion.Trim(), out result) == false) // standardVersion _skal_ være en int
478     {
479     return false;
480     }
481    
482     if (int.TryParse(afsenderSekvensnr.Trim(), out result) == false) // afsenderSekvensnr _skal_ være en int
483     {
484     return false;
485     }
486    
487     if (int.TryParse(afsenderTidsstempel.Trim(), out result) == false) // afsenderTidsstempel _skal_ være en int
488     {
489     return false;
490     }
491    
492     if (int.TryParse(transaktionsLaengde.Trim(), out result) == false) // transaktionsLaengde _skal_ være en int
493     {
494     return false;
495     }
496    
497     if ( int.TryParse(prioritet.Trim(), out result) == false ) // prioritet _skal_ være en int
498     {
499     return false;
500     }
501    
502     return true;
503     }
504    
505 torben 1986 private void addLogEntry(string msg)
506     {
507     msg = getNowString() + " " + msg;
508     lock (logEntries)
509     {
510     logEntries.AddFirst(msg);
511    
512     if (logEntries.Count > 20)
513     {
514     logEntries.RemoveLast();
515 torben 1999 }
516 torben 1986 }
517 torben 1999
518     string filename = getLogFilename(LogfileType.LogEvents);
519     using (StreamWriter eventlog = new StreamWriter(filename, true))
520     {
521     eventlog.WriteLine(msg);
522     }
523 torben 1986 }
524    
525     public string[] getLog()
526     {
527     lock(logEntries)
528     {
529     List<string> tmpEntries = new List<string>();
530     foreach (string s in logEntries)
531     {
532     tmpEntries.Add(s);
533     }
534     return tmpEntries.ToArray();
535     }
536     }
537    
538     }
539     }

  ViewVC Help
Powered by ViewVC 1.1.20