/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2136 - (hide annotations) (download)
Wed Mar 26 14:00:14 2014 UTC (10 years, 2 months ago) by torben
File size: 22103 byte(s)
in transportMq2Sql() - only move 10000 messages per run to give the other transports a chance
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;

using IBM.WMQ;
using MySql.Data.MySqlClient;

namespace DaoMqPump2
{
    /// <summary>
    /// One configured message transport between a WebSphere MQ queue and MySQL
    /// stored procedures. Depending on <see cref="direction"/> a run either reads
    /// rows through <see cref="sql2mqReadQuery"/> and puts them on the queue, or
    /// gets messages from the queue and hands them to <see cref="mq2sqlInsertQuery"/>.
    /// Every run updates <see cref="TransportStatusData"/> and appends to weekly
    /// log files in the controller's log directory.
    /// </summary>
    public class Transport
    {
        /// <summary>Selects the file name prefix used by <see cref="getLogFilename"/>.</summary>
        enum LogfileType
        {
            LogTransactions,
            LogEvents,
            LogDiscarded
        }

        // Direction identifiers compared against the "direction" property.
        // They stay plain public static fields so existing callers keep compiling.
        public static string SQL2MQ = "sql2mq";
        public static string MQ2SQL = "mq2sql";

        // Upper bound on messages moved from MQ to SQL in a single run, so that
        // one busy queue cannot starve the other transports.
        private const int MaxMq2SqlMessagesPerRun = 10000;

        // Number of event log lines kept in memory for getLog().
        private const int MaxLogEntriesInMemory = 20;

        TransportController controller;

        StatusData statusData = new StatusData();

        public string name { get; private set; }
        public string direction { get; private set; }
        public string queueName { get; private set; }
        public string mq2sqlInsertQuery { get; private set; }
        public string sql2mqReadQuery { get; private set; }
        public string sql2mqUpdateQuery { get; private set; }

        /// <summary>Live status object of this transport (counters, last error, timestamps).</summary>
        public StatusData TransportStatusData
        {
            get
            {
                return this.statusData;
            }
        }

        /// <summary>
        /// Whether <see cref="transportMessages"/> does any work. Every assignment
        /// is recorded in the event log, also when the value does not change.
        /// </summary>
        public bool Enabled
        {
            get
            {
                return statusData.transportEnabled;
            }
            set
            {
                statusData.transportEnabled = value;
                this.addLogEntry(value ? "Transport enabled" : "Transport disabled");
            }
        }

        // Newest-first list of the most recent event log lines; also used as the lock object.
        private LinkedList<string> logEntries = new LinkedList<string>();

        /// <summary>
        /// Creates the transport, resets its status data and writes a "Starting"
        /// entry to the event log.
        /// </summary>
        /// <param name="controller">Owner supplying MQ/MySQL connection settings and the log directory.</param>
        /// <param name="name">Transport name; used in log file names and error messages.</param>
        /// <param name="direction"><see cref="SQL2MQ"/> for SQL to MQ; any other value is treated as MQ to SQL.</param>
        /// <param name="queueName">Name of the MQ queue to put to or get from.</param>
        /// <param name="mq2sqlInsertQuery">Stored procedure called with each message text (MQ to SQL).</param>
        /// <param name="sql2mqReadQuery">Stored procedure returning (id, message) rows (SQL to MQ).</param>
        /// <param name="sql2mqUpdateQuery">Stored procedure called with the row id after a successful put.</param>
        /// <param name="enabled">Initial enabled state; set without writing an enabled/disabled log entry.</param>
        public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
        {
            this.controller = controller;
            this.name = name;
            this.direction = direction;
            this.queueName = queueName;
            this.mq2sqlInsertQuery = mq2sqlInsertQuery;
            this.sql2mqReadQuery = sql2mqReadQuery;
            this.sql2mqUpdateQuery = sql2mqUpdateQuery;

            statusData.transportEnabled = enabled;

            statusData.lastrunOk = true;
            statusData.counter = 0;
            statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";

            addLogEntry("Starting ... ");
        }

        /// <summary>Writes a best-effort "Stopping" entry when the object is finalized.</summary>
        ~Transport()
        {
            try
            {
                addLogEntry("Stopping ... ");
            }
            catch (Exception)
            {
                // addLogEntry performs file I/O. An exception escaping a finalizer
                // terminates the process, so a failed stop entry is ignored.
            }
        }

        /// <summary>
        /// Runs one transport pass in the configured direction and updates the
        /// status timestamps. Does nothing while the transport is disabled.
        /// </summary>
        public void transportMessages()
        {
            if (statusData.transportEnabled == false)
            {
                return;
            }

            Console.WriteLine(name + " -> transportMessages() ");
            statusData.lastrunOk = true;

            int startCounter = statusData.counter;

            if (direction == SQL2MQ)
            {
                transportSql2Mq();
            }
            else
            {
                transportMq2Sql();
            }

            if (statusData.lastrunOk == true)
            {
                statusData.lastOkTime = getNowString();

                if (statusData.counter != startCounter)
                {
                    // Messages were moved in this run, so remember the transfer time.
                    statusData.lastTransferTime = getNowString();
                }
            }
            else
            {
                addLogEntry(statusData.lastErrorMessage);
                statusData.lastErrorTime = getNowString();
            }
        }

        /// <summary>
        /// Reads rows through the read procedure, puts each message text on the MQ
        /// queue and then marks the row through the update procedure. Failures are
        /// recorded in the status data instead of being thrown.
        /// </summary>
        private void transportSql2Mq()
        {
            MQQueueManager mqMgr = null;
            MQQueue out_queue = null;

            string filename = getLogFilename(LogfileType.LogTransactions);
            using (StreamWriter translog = new StreamWriter(filename, true))
            {
                try
                {
                    // MQ options
                    Hashtable connProps = getConnectionProperties();
                    int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;

                    // MySQL options
                    string mysqlString = buildMysqlConnString();

                    // The MQ objects in v6 do not implement IDisposable, so they cannot be
                    // used with a "using" statement; they are closed in the finally block.
                    mqMgr = new MQQueueManager(controller.mqQueueManager, connProps); // stage 1: connect to MQ
                    out_queue = mqMgr.AccessQueue(queueName, openOptions);

                    // stage 2: connect to MySQL. Two connections are needed because the
                    // update call runs while the reader on the first one is still open.
                    using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString))
                    using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
                    {
                        sqlReadConnection.Open();
                        sqlWriteConnection.Open();

                        // stage 3: move messages
                        string readSql = "CALL " + sql2mqReadQuery + "()";
                        using (MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection))
                        using (MySqlDataReader dataReader = readCmd.ExecuteReader())
                        {
                            while (dataReader.Read())
                            {
                                int id = dataReader.GetInt32(0);
                                string msgString = dataReader.GetString(1);

                                // Accept the defaults, same as MQPMO_DEFAULT.
                                MQPutMessageOptions pmo = new MQPutMessageOptions();

                                MQMessage msg = new MQMessage();
                                msg.Format = MQC.MQFMT_STRING;
                                msg.CharacterSet = 1252;
                                msg.WriteString(msgString);

                                out_queue.Put(msg, pmo);

                                // Now that the message is on the queue, mark the row as sent.
                                string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id.ToString(CultureInfo.InvariantCulture) + ")";
                                int numrows;
                                using (MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection))
                                {
                                    numrows = updateCmd.ExecuteNonQuery();
                                }

                                translog.WriteLine(getNowString() + " " + msgString);

                                if (numrows != 1)
                                {
                                    // NOTE(review): the message has already been put and logged at
                                    // this point; the run just stops without counting it or
                                    // flagging an error. Confirm this is the intended handling.
                                    break;
                                }
                                statusData.counter++;
                            }
                        }
                    }
                }
                catch (Exception e)
                {
                    reportTransportError("transportSql2Mq", e);
                }
                finally
                {
                    closeMqObjects(out_queue, mqMgr);
                }
            }
        }

        /// <summary>
        /// Gets messages from the MQ queue under syncpoint and passes each one to
        /// the insert procedure, committing per message. Invalid and non-text
        /// messages are discarded. Stops when the queue is empty, on an error, or
        /// after <see cref="MaxMq2SqlMessagesPerRun"/> moved messages.
        /// </summary>
        private void transportMq2Sql()
        {
            int messageCount = 0;

            MQQueueManager mqMgr = null;
            MQQueue in_queue = null;
            string filename = getLogFilename(LogfileType.LogTransactions);
            using (StreamWriter translog = new StreamWriter(filename, true))
            {
                try
                {
                    // MQ options
                    Hashtable connProps = getConnectionProperties();
                    int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;

                    // MySQL options
                    string mysqlString = buildMysqlConnString();

                    // The MQ objects in v6 do not implement IDisposable, so they cannot be
                    // used with a "using" statement; they are closed in the finally block.
                    mqMgr = new MQQueueManager(controller.mqQueueManager, connProps); // stage 1: connect to MQ
                    in_queue = mqMgr.AccessQueue(queueName, openOptions);
                    using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) // stage 2: connect to MySQL
                    {
                        sqlConnection.Open();

                        // stage 3: move messages
                        bool isContinue = true;
                        while (isContinue)
                        {
                            MQMessage mqMsg = new MQMessage();
                            MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();

                            // Syncpoint: the message only leaves the queue once Commit() is called.
                            mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT;

                            try
                            {
                                in_queue.Get(mqMsg, mqGetMsgOpts);
                                if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
                                {
                                    string msgString = mqMsg.ReadString(mqMsg.MessageLength);

                                    // A transaction starting with '?' is not valid; the header
                                    // must pass validation as well.
                                    if (msgString.StartsWith("?", StringComparison.Ordinal) || validateSalt2Header(msgString) == false)
                                    {
                                        string discarded_filename = getLogFilename(LogfileType.LogDiscarded);
                                        using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
                                        {
                                            discardedlog.WriteLine(getNowString() + " " + msgString);
                                        }
                                        mqMgr.Commit(); // remove the rejected transaction from the queue
                                        statusData.discardedCounter++;
                                        continue; // go on with the next transaction on the queue
                                    }

                                    // Builds "CALL someprocedure( 'msgString' )" with the text escaped.
                                    string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )";

                                    int numrows;
                                    using (MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection))
                                    {
                                        numrows = sqlcmd.ExecuteNonQuery();
                                    }

                                    if (numrows == 1)
                                    {
                                        translog.WriteLine(getNowString() + " " + msgString);
                                        mqMgr.Commit();
                                        statusData.counter++;

                                        messageCount++; // per-run message counter
                                        if (messageCount >= MaxMq2SqlMessagesPerRun)
                                        {
                                            // Enough for this run - give the other transports a chance.
                                            isContinue = false;
                                        }
                                    }
                                    else
                                    {
                                        // NOTE(review): the message is returned to the queue and the
                                        // run stops, but no error is flagged, so the same message is
                                        // retried on the next run. Confirm this is intended.
                                        mqMgr.Backout();
                                        isContinue = false;
                                    }
                                }
                                else
                                {
                                    // Previously the message was left uncommitted inside the unit of
                                    // work and vanished silently with the next Commit(). Discard it
                                    // explicitly and leave a trace instead.
                                    System.Console.WriteLine("Non-text message");
                                    addLogEntry("Non-text message (format '" + mqMsg.Format + "') discarded");
                                    mqMgr.Commit();
                                    statusData.discardedCounter++;
                                }
                            }
                            catch (MQException mqe)
                            {
                                isContinue = false;

                                // report reason, if any
                                if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                                {
                                    // special report for normal end
                                    System.Console.WriteLine("no more messages");
                                }
                                else
                                {
                                    // general report for other reasons
                                    System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);
                                    statusData.lastrunOk = false;
                                    // Without this, transportMessages() would log a stale or empty error text.
                                    statusData.lastErrorMessage = name + ".transportMq2Sql error: " + mqe.GetType().FullName + " " + mqe.Message;
                                }
                            }
                        }
                    }
                }
                catch (Exception e)
                {
                    // Back out the MQ unit of work so that a message that was read but not
                    // stored is returned to the queue.
                    try
                    {
                        if (mqMgr != null)
                        {
                            mqMgr.Backout();
                        }
                    }
                    catch (Exception e2)
                    {
                        this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);
                    }

                    reportTransportError("transportMq2Sql", e);
                }
                finally
                {
                    closeMqObjects(in_queue, mqMgr);
                }
            }
        }

        /// <summary>
        /// Marks the current run as failed and reports the exception to the
        /// console and the Windows event log.
        /// </summary>
        /// <param name="methodName">Name of the transport method that failed; becomes part of the message.</param>
        /// <param name="e">The exception that ended the run.</param>
        private void reportTransportError(string methodName, Exception e)
        {
            statusData.lastrunOk = false;
            statusData.lastErrorMessage = name + "." + methodName + " error: " + e.GetType().FullName + " " + e.Message;
            Console.WriteLine(statusData.lastErrorMessage);
            Console.WriteLine(e.StackTrace);
            EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
        }

        /// <summary>
        /// Closes the queue and then the queue manager if they are open. Close
        /// failures are only written to the console so that cleanup always completes.
        /// </summary>
        private void closeMqObjects(MQQueue queue, MQQueueManager queueManager)
        {
            if (queue != null && queue.IsOpen)
            {
                try
                {
                    queue.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up queue " + e.Message);
                }
            }

            if (queueManager != null && queueManager.IsOpen)
            {
                try
                {
                    queueManager.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up qmgr " + e.Message);
                }
            }
        }

        /// <summary>
        /// Builds the MySQL connection string from the controller settings, with a
        /// maximum pool size of 20. No default database is set.
        /// </summary>
        /// <returns>The connection string, including the password.</returns>
        private string buildMysqlConnString()
        {
            // The builder quotes values, so a ';' or '=' in the host, user or password
            // can no longer break or extend the connection string.
            MySqlConnectionStringBuilder builder = new MySqlConnectionStringBuilder();
            builder.Server = controller.mysqlHost;
            builder.UserID = controller.mysqlUser;
            builder.Password = controller.mysqlPassword;
            builder.MaximumPoolSize = 20;

            return builder.ConnectionString;
        }

        /// <summary>Builds the MQ client connection properties (transport type, host and channel).</summary>
        private Hashtable getConnectionProperties()
        {
            Hashtable connProperties = new Hashtable();
            connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
            connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
            connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel);
            return connProperties;
        }

        /// <summary>
        /// Returns the path of this week's log file of the given type:
        /// "&lt;logDirectory&gt;\&lt;prefix&gt;&lt;name&gt;_&lt;yyyy&gt;_W&lt;ww&gt;.log".
        /// </summary>
        /// <param name="type">Which log the file is for; selects the prefix.</param>
        private string getLogFilename(LogfileType type)
        {
            DateTime now = DateTime.Now;
            string filename = controller.logDirectory + "\\";

            // Find the week number, using the current culture's week rule and first day of week.
            DateTimeFormatInfo dfi = DateTimeFormatInfo.CurrentInfo;
            Calendar myCal = CultureInfo.InvariantCulture.Calendar;
            int week = myCal.GetWeekOfYear(now, dfi.CalendarWeekRule, dfi.FirstDayOfWeek);

            switch (type)
            {
                case LogfileType.LogEvents:
                    filename += "eventlog_";
                    break;

                case LogfileType.LogTransactions:
                    filename += "transactionlog_";
                    break;

                case LogfileType.LogDiscarded:
                    filename += "discardedlog_";
                    break;
            }

            filename += name + "_" + now.Year.ToString("D4") + "_W" + week.ToString("D2") + ".log";

            return filename;
        }

        /// <summary>Returns the current local time in sortable format ("s", yyyy-MM-ddTHH:mm:ss).</summary>
        public string getNowString()
        {
            DateTime now = DateTime.Now;

            return now.ToString("s");
        }

        /// <summary>
        /// Sends an error mail through the internal SMTP host. Not used anywhere
        /// in this class; kept for reference.
        /// </summary>
        /// <param name="sub">Text appended to the fixed subject prefix.</param>
        /// <param name="msg">Mail body.</param>
        private void sendErrorMail(string sub, string msg)
        {
            using (System.Net.Mail.MailMessage message = new System.Net.Mail.MailMessage())
            {
                message.To.Add("thn@daoas.dk");
                message.Subject = "Error from DaoMqPump2: " + sub;
                message.From = new System.Net.Mail.MailAddress("no-reply@daoas.dk");
                message.Body = msg;

                System.Net.Mail.SmtpClient smtp = new System.Net.Mail.SmtpClient("mail.dao.int");
                smtp.Send(message);
            }
        }

        /// <summary>
        /// Checks that a message is long enough to hold the fixed-width header and
        /// that the numeric header fields are numeric. The reason for a rejection
        /// is written to the event log.
        /// </summary>
        /// <param name="salt2String">The complete message text.</param>
        /// <returns>True when the header passes all checks, otherwise false.</returns>
        private bool validateSalt2Header(string salt2String)
        {
            if (salt2String.Length < 66)
            {
                addLogEntry("Transaction too short - discarding");
                return false;
            }

            // Header layout (offset, length). Only the fields marked * are validated:
            //    0,  5  afsender                 (sender)
            //    5,  5  modtager                 (receiver)
            //   10,  6  afsenderTegnSaet         (sender character set)
            //   16,  6  standardNavn             (standard name)
            //   22,  3  standardVersion        * int
            //   25,  6  afsenderSekvensnr      * int  (sender sequence number)
            //   31, 14  afsenderTidsstempel    * long (sender timestamp)
            //   45,  5  afsenderBakkeIdent
            //   50,  5  modtagerBakkeIdent
            //   55,  4  transaktionForkortelse   (transaction abbreviation)
            //   59,  5  transaktionsLaengde    * int  (transaction length)
            //   64,  1  prioritet              * int  (priority)

            if (isIntegerField(salt2String, 22, 3) == false)
            {
                addLogEntry("standardVersion not an integer, discarding");
                return false;
            }

            if (isIntegerField(salt2String, 25, 6) == false)
            {
                addLogEntry("afsenderSekvensnr not an integer, discarding");
                return false;
            }

            if (isLongField(salt2String, 31, 14) == false)
            {
                addLogEntry("afsenderTidsstempel not a long integer, discarding");
                return false;
            }

            if (isIntegerField(salt2String, 59, 5) == false)
            {
                addLogEntry("transaktionsLaengde not an integer, discarding");
                return false;
            }

            if (isIntegerField(salt2String, 64, 1) == false)
            {
                addLogEntry("prioritet not an integer, discarding");
                return false;
            }

            return true;
        }

        /// <summary>True when the trimmed field parses as an int, independent of the machine's culture.</summary>
        private static bool isIntegerField(string text, int offset, int length)
        {
            int ignored;
            return int.TryParse(text.Substring(offset, length).Trim(), NumberStyles.Integer, CultureInfo.InvariantCulture, out ignored);
        }

        /// <summary>True when the trimmed field parses as a long, independent of the machine's culture.</summary>
        private static bool isLongField(string text, int offset, int length)
        {
            long ignored;
            return long.TryParse(text.Substring(offset, length).Trim(), NumberStyles.Integer, CultureInfo.InvariantCulture, out ignored);
        }

        /// <summary>
        /// Timestamps the message, keeps it in the in-memory list (newest first,
        /// capped at <see cref="MaxLogEntriesInMemory"/>) and appends it to the
        /// current event log file.
        /// </summary>
        /// <param name="msg">Text to log, without timestamp.</param>
        private void addLogEntry(string msg)
        {
            msg = getNowString() + " " + msg;
            lock (logEntries)
            {
                logEntries.AddFirst(msg);

                if (logEntries.Count > MaxLogEntriesInMemory)
                {
                    logEntries.RemoveLast();
                }
            }

            string filename = getLogFilename(LogfileType.LogEvents);
            using (StreamWriter eventlog = new StreamWriter(filename, true))
            {
                eventlog.WriteLine(msg);
            }
        }

        /// <summary>Returns a snapshot of the in-memory event log entries, newest first.</summary>
        public string[] getLog()
        {
            lock (logEntries)
            {
                return new List<string>(logEntries).ToArray();
            }
        }
    }
}

  ViewVC Help
Powered by ViewVC 1.1.20