/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2166 - (hide annotations) (download)
Fri May 16 18:24:05 2014 UTC (10 years ago) by torben
File size: 21437 byte(s)
Svn:ignore
1 torben 1986 using System;
2     using System.Collections;
3     using System.Collections.Generic;
4     using System.IO;
5    
6     using System.Diagnostics;
7    
8     using IBM.WMQ;
9     using MySql.Data.MySqlClient;
10 torben 2057 using System.Globalization;
11 torben 1986
12 torben 2166 using DaoCommon;
13    
14 torben 1986 namespace DaoMqPump2
15     {
16     public class Transport
17     {
18 torben 1999
/// <summary>
/// Kinds of per-transport log files. The value selects the filename prefix
/// chosen in getLogFilename.
/// </summary>
enum LogfileType
{
    /// <summary>Messages that were transported (prefix "transactionlog_").</summary>
    LogTransactions,

    /// <summary>Event lines written by addLogEntry (prefix "eventlog_").</summary>
    LogEvents,

    /// <summary>Messages rejected before insertion (prefix "discardedlog_").</summary>
    LogDiscarded
}
24    
/// <summary>Direction value selecting the MySQL-to-MQ path in transportMessages.</summary>
public static string SQL2MQ = "sql2mq";

/// <summary>Direction value for the MQ-to-MySQL path (any value other than SQL2MQ takes this path).</summary>
public static string MQ2SQL = "mq2sql";

// Supplies the MQ and MySQL connection settings and the log directory.
private TransportController controller;

// Mutable status record exposed through TransportStatusData.
private StatusData statusData = new StatusData();

/// <summary>Transport name; used in log filenames and diagnostic output.</summary>
public string name { get; private set; }

/// <summary>Transfer direction; compared against SQL2MQ.</summary>
public string direction { get; private set; }

/// <summary>Name of the MQ queue that is opened for this transport.</summary>
public string queueName { get; private set; }

/// <summary>Stored procedure called with each message text when moving MQ to MySQL.</summary>
public string mq2sqlInsertQuery { get; private set; }

/// <summary>Stored procedure called (without arguments) to read pending rows when moving MySQL to MQ.</summary>
public string sql2mqReadQuery { get; private set; }

/// <summary>Stored procedure called with a row id after that row has been put on the queue.</summary>
public string sql2mqUpdateQuery { get; private set; }
40    
41 torben 1999 //public bool lastrunOk { get; private set; }
42     //public string lastErrorMessage { get; private set; }
43 torben 1986
44 torben 1999 //public string lastOkTime { get; private set; }
45     //public string lastErrorTime { get; private set; }
46     //public string lastTransferTime { get; private set; }
47 torben 1986
48 torben 1999 //public int counter { get; private set; }
49 torben 1986
/// <summary>
/// Gets the status record of this transport. The same instance is returned
/// on every call (no copy is made).
/// </summary>
public StatusData TransportStatusData
{
    get { return statusData; }
}
57    
58    
/// <summary>
/// Gets or sets whether this transport is enabled. Every assignment is
/// recorded in the event log, including one that does not change the value.
/// </summary>
public bool Enabled
{
    get { return statusData.transportEnabled; }
    set
    {
        statusData.transportEnabled = value;

        string logMessage = value ? "Transport enabled" : "Transport disabled";
        addLogEntry(logMessage);
    }
}
77    
78    
// Most recent event-log lines, newest first. The list instance is also the
// lock object guarding itself, so it must never be reassigned: readonly.
private readonly LinkedList<string> logEntries = new LinkedList<string>();
80    
81    
/// <summary>
/// Creates a transport, initialises its status record and writes a
/// "Starting ..." line to the event log.
/// </summary>
/// <param name="controller">Owner that supplies connection settings and the log directory.</param>
/// <param name="name">Transport name.</param>
/// <param name="direction">Transfer direction, compared against SQL2MQ.</param>
/// <param name="queueName">MQ queue to open.</param>
/// <param name="mq2sqlInsertQuery">Stored procedure used when moving MQ to MySQL.</param>
/// <param name="sql2mqReadQuery">Stored procedure that reads rows when moving MySQL to MQ.</param>
/// <param name="sql2mqUpdateQuery">Stored procedure that marks a row as sent.</param>
/// <param name="enabled">Initial enabled state.</param>
public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
{
    this.controller = controller;

    this.name = name;
    this.direction = direction;
    this.queueName = queueName;

    this.mq2sqlInsertQuery = mq2sqlInsertQuery;
    this.sql2mqReadQuery = sql2mqReadQuery;
    this.sql2mqUpdateQuery = sql2mqUpdateQuery;

    // Written straight to the status record: the Enabled setter would also
    // emit an "enabled"/"disabled" log line.
    statusData.transportEnabled = enabled;

    statusData.lastrunOk = true;
    statusData.counter = 0;
    statusData.lastErrorTime = "";
    statusData.lastOkTime = "";
    statusData.lastErrorMessage = "";

    addLogEntry("Starting ... ");
}
101    
/// <summary>
/// Writes a final "Stopping ... " line to the event log when the object is
/// finalized. The write is best-effort.
/// </summary>
~Transport()
{
    try
    {
        addLogEntry("Stopping ... ");
    }
    catch (Exception)
    {
        // addLogEntry opens and writes a file. An exception escaping a
        // finalizer terminates the process, so a failed log line is dropped.
    }
}
106    
107    
/// <summary>
/// Runs one transfer pass in the configured direction and updates the
/// status timestamps. Does nothing when the transport is disabled.
/// </summary>
public void transportMessages()
{
    if (!statusData.transportEnabled)
    {
        return;
    }

    Console.WriteLine(name + " -> transportMessages() ");

    // The transfer methods flip this to false when they hit an error.
    statusData.lastrunOk = true;
    int counterBefore = statusData.counter;

    if (direction != SQL2MQ)
    {
        transportMq2Sql();
    }
    else
    {
        transportSql2Mq();
    }

    bool runSucceeded = statusData.lastrunOk == true;
    if (!runSucceeded)
    {
        addLogEntry(statusData.lastErrorMessage);
        statusData.lastErrorTime = DaoUtil.getNowString();
        return;
    }

    statusData.lastOkTime = DaoUtil.getNowString();

    if (statusData.counter != counterBefore)
    {
        // Messages were actually moved in this pass, so record the transfer time.
        statusData.lastTransferTime = DaoUtil.getNowString();
    }
}
143    
/// <summary>
/// Moves rows from MySQL to the MQ queue. Each row returned by the read
/// procedure is put on the queue as a string message, then the update
/// procedure is called with the row id and the message is appended to the
/// transaction log. Errors are recorded in the status record, the console
/// and the Windows event log; they are not rethrown.
/// </summary>
private void transportSql2Mq()
{
    MQQueueManager mqMgr = null;
    MQQueue out_queue = null;

    string filename = getLogFilename(LogfileType.LogTransactions);
    using (StreamWriter translog = new StreamWriter(filename, true))
    {
        try
        {
            // MQ options
            Hashtable connProps = getConnectionProperties();
            int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;

            // MySQL options
            string mysqlString = buildMysqlConnString();

            // MQ objects in v6 do not implement IDisposable and therefore cannot
            // be used with a "using" statement; they are closed in finally.
            mqMgr = new MQQueueManager(controller.mqQueueManager, connProps); // stage 1: connect to MQ
            out_queue = mqMgr.AccessQueue(queueName, openOptions);

            // stage 2: connect to MySQL (separate connections for reading and updating,
            // because the reader stays open while the update procedure is called)
            using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString))
            using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
            {
                sqlReadConnection.Open();
                sqlWriteConnection.Open();

                // stage 3: move messages
                string readSql = "CALL " + sql2mqReadQuery + "()";
                using (MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection))
                using (MySqlDataReader dataReader = readCmd.ExecuteReader())
                {
                    while (dataReader.Read())
                    {
                        int id = dataReader.GetInt32(0);
                        string msgString = dataReader.GetString(1);

                        // Accept the defaults, same as MQPMO_DEFAULT.
                        MQPutMessageOptions pmo = new MQPutMessageOptions();

                        MQMessage msg = new MQMessage();
                        msg.Format = MQC.MQFMT_STRING;
                        msg.CharacterSet = 1252;
                        msg.WriteString(msgString);

                        out_queue.Put(msg, pmo);

                        // Now that the message has been put on the queue, mark the row as sent.
                        // id is an int, so concatenating it cannot inject SQL.
                        string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
                        int numrows;
                        using (MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection))
                        {
                            numrows = updateCmd.ExecuteNonQuery();
                        }

                        translog.WriteLine(DaoUtil.getNowString() + " " + msgString);

                        if (numrows != 1)
                        {
                            // The row could not be marked as sent; stop this pass.
                            break;
                        }
                        statusData.counter++;
                    }
                }
            }
        }
        catch (Exception e)
        {
            statusData.lastrunOk = false;
            // Label fixed: this is the Sql2Mq path (the original said transportMq2Sql).
            statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.GetType().FullName + " " + e.Message;
            Console.WriteLine(statusData.lastErrorMessage);
            Console.WriteLine(e.StackTrace);
            EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
        }
        finally
        {
            if (out_queue != null && out_queue.IsOpen)
            {
                try
                {
                    out_queue.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up qmgr " + e.Message);
                }
            }

            if (mqMgr != null && mqMgr.IsOpen)
            {
                try
                {
                    mqMgr.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up qmgr " + e.Message);
                }
            }
        }
    }
}
242    
/// <summary>
/// Moves messages from the MQ queue to MySQL. Each string message is read
/// under syncpoint and handed to the insert procedure; the MQ unit of work
/// is committed when exactly one row is reported affected and backed out
/// otherwise. Messages starting with "?" or failing header validation are
/// written to the discarded log and committed off the queue. A pass stops
/// after 10000 inserted messages, when the queue is empty, or on error.
/// Errors are recorded in the status record; they are not rethrown.
/// </summary>
private void transportMq2Sql()
{
    int messageCount = 0;

    MQQueueManager mqMgr = null;
    MQQueue in_queue = null;
    string filename = getLogFilename(LogfileType.LogTransactions);
    using (StreamWriter translog = new StreamWriter(filename, true))
    {
        try
        {
            // MQ options
            Hashtable connProps = getConnectionProperties();
            int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;

            // MySQL options
            string mysqlString = buildMysqlConnString();

            // MQ objects in v6 do not implement IDisposable and therefore cannot
            // be used with a "using" statement; they are closed in finally.
            mqMgr = new MQQueueManager(controller.mqQueueManager, connProps); // stage 1: connect to MQ
            in_queue = mqMgr.AccessQueue(queueName, openOptions);
            using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) // stage 2: connect to MySQL
            {
                sqlConnection.Open();

                // CALL somestoredprocedure(@msg): the message text is bound as a
                // parameter instead of being escaped and concatenated into the SQL.
                string insertSql = "CALL " + mq2sqlInsertQuery + "( @msg )";

                // stage 3: move messages
                bool isContinue = true;
                while (isContinue)
                {
                    MQMessage mqMsg = new MQMessage();
                    MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();

                    // Requires a commit before the message is removed from the queue.
                    mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT;

                    try
                    {
                        in_queue.Get(mqMsg, mqGetMsgOpts);
                        if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
                        {
                            string msgString = mqMsg.ReadString(mqMsg.MessageLength);

                            // A transaction starting with '?' is not valid;
                            // the header must also validate.
                            if (msgString.StartsWith("?") || validateSalt2Header(msgString) == false)
                            {
                                string discarded_filename = getLogFilename(LogfileType.LogDiscarded);
                                using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
                                {
                                    discardedlog.WriteLine(DaoUtil.getNowString() + " " + msgString);
                                }
                                mqMgr.Commit(); // remove the rejected transaction from the queue
                                statusData.discardedCounter++;
                                continue; // move on to the next transaction on the queue
                            }

                            int numrows;
                            using (MySqlCommand sqlcmd = new MySqlCommand(insertSql, sqlConnection))
                            {
                                sqlcmd.Parameters.AddWithValue("@msg", msgString);
                                numrows = sqlcmd.ExecuteNonQuery();
                            }

                            if (numrows == 1)
                            {
                                translog.WriteLine(DaoUtil.getNowString() + " " + msgString);
                                mqMgr.Commit();
                                statusData.counter++;

                                // Per-run cap: after 10000 messages, yield so the
                                // other transports get a chance.
                                messageCount++;
                                if (messageCount >= 10000)
                                {
                                    isContinue = false;
                                }
                            }
                            else
                            {
                                mqMgr.Backout();
                                isContinue = false;
                            }
                        }
                        else
                        {
                            // NOTE(review): the message stays in the open unit of work and is
                            // neither committed nor backed out here; a later Commit() in this
                            // pass would presumably remove it from the queue - confirm intent.
                            System.Console.WriteLine("Non-text message");
                        }
                    }
                    catch (MQException mqe)
                    {
                        isContinue = false;

                        // report reason, if any
                        if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                        {
                            // special report for normal end
                            System.Console.WriteLine("no more messages");
                        }
                        else
                        {
                            // General report for other reasons. The message is also stored so
                            // that the caller logs this failure rather than a stale one.
                            statusData.lastErrorMessage = name + ".transportMq2Sql error: MQQueue::Get ended with " + mqe.Message;
                            System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);
                            statusData.lastrunOk = false;
                        }
                    }
                }
            }
        }
        catch (Exception e)
        {
            // Back out the MQ unit of work so a message read under syncpoint is not
            // committed after a failure. NOTE(review): the original comment said this
            // makes sure the message is removed from the queue - confirm intent.
            try
            {
                if (mqMgr != null)
                {
                    mqMgr.Backout();
                }
            }
            catch (Exception e2)
            {
                this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);
            }

            statusData.lastrunOk = false;

            statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
            Console.WriteLine(statusData.lastErrorMessage);
            Console.WriteLine(e.StackTrace);
            EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
        }
        finally
        {
            if (in_queue != null && in_queue.IsOpen)
            {
                try
                {
                    in_queue.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up qmgr " + e.Message);
                }
            }

            if (mqMgr != null && mqMgr.IsOpen)
            {
                try
                {
                    mqMgr.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up qmgr " + e.Message);
                }
            }
        }
    }
}
410    
/// <summary>
/// Builds the MySQL connection string from the controller's host, user and
/// password, with a fixed maximum pool size of 20.
/// </summary>
/// <returns>The connection string; no default database is included.</returns>
private string buildMysqlConnString()
{
    // No DATABASE entry is set. ConnectionReset is left out as well: per the
    // original author's note it did not work without a default database.
    return string.Concat(
        "SERVER=" + controller.mysqlHost + ";",
        "UID=" + controller.mysqlUser + ";",
        "PASSWORD=" + controller.mysqlPassword + ";",
        "Max Pool Size=20;");
}
424    
/// <summary>
/// Builds the property table passed to the MQQueueManager constructor:
/// client transport plus the controller's MQ host and channel.
/// </summary>
/// <returns>A new Hashtable on every call.</returns>
private Hashtable getConnectionProperties()
{
    return new Hashtable
    {
        { MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT },
        { MQC.HOST_NAME_PROPERTY, controller.mqHost },
        { MQC.CHANNEL_PROPERTY, controller.mqChannel }
    };
}
433    
434 torben 1999
435    
/// <summary>
/// Builds the path of the weekly log file for this transport:
/// logDirectory\{prefix}{name}_{yyyy}_W{ww}.log.
/// </summary>
/// <param name="type">Which log is wanted; selects the filename prefix.</param>
/// <returns>The log file path for the current local date.</returns>
private string getLogFilename(LogfileType type)
{
    DateTime now = DateTime.Now;

    string prefix;
    switch (type)
    {
        case LogfileType.LogEvents:
            prefix = "eventlog_";
            break;
        case LogfileType.LogTransactions:
            prefix = "transactionlog_";
            break;
        case LogfileType.LogDiscarded:
            prefix = "discardedlog_";
            break;
        default:
            // Unknown type: no prefix is added.
            prefix = "";
            break;
    }

    // Week number: invariant-culture calendar, but the week rule and first
    // day of week come from the current culture.
    DateTimeFormatInfo weekRules = DateTimeFormatInfo.CurrentInfo;
    int weekNumber = CultureInfo.InvariantCulture.Calendar.GetWeekOfYear(now, weekRules.CalendarWeekRule, weekRules.FirstDayOfWeek);

    return controller.logDirectory + "\\" + prefix + name + "_" + now.Year.ToString("D4") + "_W" + weekNumber.ToString("D2") + ".log";
}
466    
/// <summary>
/// Checks the fixed-width header at the start of a transaction string: it
/// must be at least 66 characters long and its numeric fields must parse.
/// Every rejection is recorded in the event log.
/// </summary>
/// <param name="salt2String">Full message text; null is rejected as too short.</param>
/// <returns>true when the header is acceptable, false when the message should be discarded.</returns>
private bool validateSalt2Header(string salt2String)
{
    if (salt2String == null || salt2String.Length < 66)
    {
        addLogEntry("Transaction too short - discarding");
        return false;
    }

    // Header layout as (offset, length). Only the numeric fields are extracted.
    //   afsender (0,5)              modtager (5,5)            afsenderTegnSaet (10,6)
    //   standardNavn (16,6)         standardVersion (22,3)    afsenderSekvensnr (25,6)
    //   afsenderTidsstempel (31,14) afsenderBakkeIdent (45,5) modtagerBakkeIdent (50,5)
    //   transaktionForkortelse (55,4) transaktionsLaengde (59,5) prioritet (64,1)
    string standardVersion = salt2String.Substring(22, 3);
    string afsenderSekvensnr = salt2String.Substring(25, 6);
    string afsenderTidsstempel = salt2String.Substring(31, 14);
    string transaktionsLaengde = salt2String.Substring(59, 5);
    string prioritet = salt2String.Substring(64, 1);

    // The header is machine-generated, so parsing uses the invariant culture
    // rather than whatever culture the service happens to run under.
    CultureInfo invariant = CultureInfo.InvariantCulture;
    int parsedInt;
    long parsedLong;

    if (!int.TryParse(standardVersion.Trim(), NumberStyles.Integer, invariant, out parsedInt)) // standardVersion must be an int
    {
        addLogEntry("standardVersion not an integer, discarding");
        return false;
    }

    if (!int.TryParse(afsenderSekvensnr.Trim(), NumberStyles.Integer, invariant, out parsedInt)) // afsenderSekvensnr must be an int
    {
        addLogEntry("afsenderSekvensnr not an integer, discarding");
        return false;
    }

    if (!long.TryParse(afsenderTidsstempel.Trim(), NumberStyles.Integer, invariant, out parsedLong)) // 14 digits: needs a long
    {
        addLogEntry("afsenderTidsstempel not a long integer, discarding");
        return false;
    }

    if (!int.TryParse(transaktionsLaengde.Trim(), NumberStyles.Integer, invariant, out parsedInt)) // transaktionsLaengde must be an int
    {
        addLogEntry("transaktionsLaengde not an integer, discarding");
        return false;
    }

    if (!int.TryParse(prioritet.Trim(), NumberStyles.Integer, invariant, out parsedInt)) // prioritet must be an int
    {
        addLogEntry("prioritet not an integer, discarding");
        return false;
    }

    return true;
}
526    
/// <summary>
/// Prefixes the message with the current time, keeps it in the in-memory
/// list of the 20 newest entries, and appends it to the event log file.
/// </summary>
/// <param name="msg">Text to log, without timestamp.</param>
private void addLogEntry(string msg)
{
    const int maxEntriesKept = 20;
    string stampedEntry = DaoUtil.getNowString() + " " + msg;

    lock (logEntries)
    {
        // Newest entry goes first; the oldest falls off the end.
        logEntries.AddFirst(stampedEntry);
        if (logEntries.Count > maxEntriesKept)
        {
            logEntries.RemoveLast();
        }
    }

    // The file write happens outside the lock.
    string eventLogPath = getLogFilename(LogfileType.LogEvents);
    using (StreamWriter eventlog = new StreamWriter(eventLogPath, true))
    {
        eventlog.WriteLine(stampedEntry);
    }
}
546    
/// <summary>
/// Returns a snapshot of the in-memory log entries, newest first.
/// </summary>
/// <returns>A new array; later log activity does not change it.</returns>
public string[] getLog()
{
    lock (logEntries)
    {
        string[] snapshot = new string[logEntries.Count];
        logEntries.CopyTo(snapshot, 0);
        return snapshot;
    }
}
559    
560     }
561     }

  ViewVC Help
Powered by ViewVC 1.1.20