/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2058 - (hide annotations) (download)
Wed Aug 28 06:45:20 2013 UTC (10 years, 9 months ago) by torben
File size: 18419 byte(s)
Add a support for discarding transactions and writing those to a seperate log + use a counter for keeping track of how many were discarded in current session/instance
1 torben 1986 using System;
2     using System.Collections;
3     using System.Collections.Generic;
4     using System.IO;
5    
6     using System.Diagnostics;
7    
8     using IBM.WMQ;
9     using MySql.Data.MySqlClient;
10 torben 2057 using System.Globalization;
11 torben 1986
12     namespace DaoMqPump2
13     {
14     public class Transport
15     {
16 torben 1999
17     enum LogfileType {
18     LogTransactions,
19 torben 2058 LogEvents,
20     LogDiscarded
21 torben 1999 }
22    
23 torben 1986 public static string SQL2MQ = "sql2mq";
24     public static string MQ2SQL = "mq2sql";
25    
26 torben 2001 //private bool enabled;
27 torben 1986
28     TransportController controller;
29    
30 torben 1999 StatusData statusData = new StatusData();
31    
32 torben 1986 public string name { get; private set; }
33     public string direction { get; private set; }
34     public string queueName { get; private set; }
35     public string mq2sqlInsertQuery { get; private set; }
36     public string sql2mqReadQuery { get; private set; }
37     public string sql2mqUpdateQuery { get; private set; }
38    
39 torben 1999 //public bool lastrunOk { get; private set; }
40     //public string lastErrorMessage { get; private set; }
41 torben 1986
42 torben 1999 //public string lastOkTime { get; private set; }
43     //public string lastErrorTime { get; private set; }
44     //public string lastTransferTime { get; private set; }
45 torben 1986
46 torben 1999 //public int counter { get; private set; }
47 torben 1986
48 torben 1999 public StatusData TransportStatusData
49     {
50     get
51     {
52     return this.statusData;
53     }
54     }
55    
56    
57     public bool Enabled
58     {
59     get {
60 torben 2001 return statusData.transportEnabled;
61 torben 1999 }
62     set
63     {
64 torben 2001 statusData.transportEnabled = value;
65 torben 1999 if (value == true)
66     {
67     this.addLogEntry("Transport enabled");
68     }
69     else
70     {
71     this.addLogEntry("Transport disabled");
72     }
73     }
74     }
75    
76    
77 torben 1986 private LinkedList<string> logEntries = new LinkedList<string>();
78    
79    
80     public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
81     {
82     this.controller = controller;
83     this.name = name;
84     this.direction = direction;
85     this.queueName = queueName;
86     this.mq2sqlInsertQuery = mq2sqlInsertQuery;
87     this.sql2mqReadQuery = sql2mqReadQuery;
88     this.sql2mqUpdateQuery = sql2mqUpdateQuery;
89    
90 torben 2001 statusData.transportEnabled = enabled;
91 torben 1986
92 torben 1999
93     statusData.lastrunOk = true;
94     statusData.counter = 0;
95     statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";
96    
97 torben 1986 addLogEntry( "Starting ... " );
98     }
99    
100 torben 1999 ~Transport()
101     {
102     addLogEntry("Stopping ... ");
103     }
104    
105    
106 torben 1986 public void transportMessages()
107     {
108 torben 2001 if (statusData.transportEnabled == false)
109 torben 1986 return;
110    
111     Console.WriteLine(name + " -> transportMessages() ");
112 torben 1999 statusData.lastrunOk = true;
113 torben 1986
114 torben 1999 int startCounter = statusData.counter;
115    
116 torben 1986 if (direction == SQL2MQ)
117     {
118     transportSql2Mq();
119     }
120     else
121     {
122     transportMq2Sql();
123     }
124    
125 torben 1999 if (statusData.lastrunOk == true)
126 torben 1986 {
127 torben 1999 statusData.lastOkTime = getNowString();
128    
129     if (statusData.counter != startCounter)
130     {
131     //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
132     statusData.lastTransferTime = getNowString();
133     }
134 torben 1986 }
135     else
136     {
137 torben 1999 addLogEntry(statusData.lastErrorMessage);
138     statusData.lastErrorTime = getNowString();
139 torben 1986 }
140     }
141    
142     private void transportSql2Mq()
143     {
144 torben 2051 MQQueueManager mqMgr = null;
145     MQQueue out_queue = null;
146    
147 torben 1999 string filename = getLogFilename(LogfileType.LogTransactions);
148 torben 1986 using (StreamWriter translog = new StreamWriter(filename, true) )
149     try
150     {
151 torben 2011 //MQ Options
152 torben 1986 Hashtable connProps = getConnectionProperties();
153     int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
154    
155 torben 2011 //MySQL Options
156 torben 1986 string mysqlString = buildMysqlConnString();
157    
158 torben 2051 //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
159     mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
160     out_queue = mqMgr.AccessQueue(queueName, openOptions);
161 torben 2011 using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
162     using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
163     {
164     sqlReadConnection.Open();
165     sqlWriteConnection.Open();
166 torben 1986
167 torben 2011 //stage 3 move messages
168     string readSql = "CALL " + sql2mqReadQuery + "()";
169     MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);
170     MySqlDataReader dataReader = readCmd.ExecuteReader();
171     while (dataReader.Read())
172     {
173     int id = dataReader.GetInt32(0);
174     string msgString = dataReader.GetString(1);
175 torben 2010
176 torben 2011 MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
177     // same as MQPMO_DEFAULT
178 torben 1986
179 torben 2011 MQMessage msg = new MQMessage();
180     msg.Format = MQC.MQFMT_STRING;
181     msg.CharacterSet = 1252;
182     msg.WriteString(msgString);
183 torben 1986
184 torben 2011 out_queue.Put(msg, pmo);
185 torben 1986
186 torben 2011 //now that the message has been put on queue mark it as such
187 torben 1986
188 torben 2011 string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
189     MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
190     int numrows = updateCmd.ExecuteNonQuery();
191 torben 1986
192 torben 2056 translog.WriteLine(getNowString() + " " + msgString);
193 torben 1986
194 torben 2011 if (numrows != 1)
195     {
196     break;
197     }
198     statusData.counter++;
199     }
200 torben 1986
201     }
202     }
203     catch (Exception e)
204     {
205 torben 1999 statusData.lastrunOk = false;
206 torben 2048 statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
207 torben 1999 Console.WriteLine(statusData.lastErrorMessage);
208 torben 2050 Console.WriteLine(e.StackTrace);
209 torben 1999 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
210 torben 1986 }
211 torben 2051 finally
212     {
213    
214     if (out_queue != null && out_queue.IsOpen)
215     {
216     try
217     {
218     out_queue.Close();
219     }
220     catch (Exception e)
221     {
222     Console.WriteLine("Error cleaning up qmgr " + e.Message);
223     }
224     }
225    
226     if (mqMgr != null && mqMgr.IsOpen)
227     {
228     try
229     {
230     mqMgr.Close();
231     }
232     catch (Exception e)
233     {
234     Console.WriteLine("Error cleaning up qmgr " + e.Message);
235     }
236     }
237    
238     }
239 torben 1986 }
240    
241     private void transportMq2Sql()
242     {
243 torben 2051 MQQueueManager mqMgr = null;
244     MQQueue in_queue = null;
245 torben 1999 string filename = getLogFilename(LogfileType.LogTransactions);
246 torben 1986 using (StreamWriter translog = new StreamWriter(filename, true))
247 torben 2051 try
248     {
249     //MQ options
250     Hashtable connProps = getConnectionProperties();
251     int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
252 torben 1986
253 torben 2051 //MySQL options
254     string mysqlString = buildMysqlConnString();
255 torben 1986
256 torben 2051 //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
257     mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
258     in_queue = mqMgr.AccessQueue(queueName, openOptions);
259     using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
260     {
261 torben 1986
262 torben 2051 sqlConnection.Open();
263 torben 1986
264 torben 2011
265 torben 2051 //stage 3 move messages
266     bool isContinue = true;
267     while (isContinue)
268     {
269 torben 1986
270 torben 2051 MQMessage mqMsg = new MQMessage();
271     MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
272 torben 1986
273 torben 2051 mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
274 torben 1986
275 torben 2051 try
276 torben 1986 {
277 torben 2051 in_queue.Get(mqMsg, mqGetMsgOpts);
278     if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
279     {
280     string msgString = mqMsg.ReadString(mqMsg.MessageLength);
281     System.Console.WriteLine(msgString);
282 torben 2011
283 torben 2058 if ( msgString.StartsWith("?") ) //Hvis transaktionen starter med et ? er det ikke en gyldig transaktion
284     {
285     string discarded_filename = getLogFilename(LogfileType.LogDiscarded);
286     using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
287     {
288     discardedlog.WriteLine(msgString);
289     }
290     mqMgr.Commit();//fjern den afviste transaktion fra køen
291     statusData.discardedCounter++;
292     continue; //gå frem til at tage næste transaktion fra køen
293     }
294    
295    
296 torben 2051 string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
297 torben 2011
298 torben 2051 MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
299     int numrows = sqlcmd.ExecuteNonQuery();
300 torben 2011
301 torben 2051 if (numrows == 1)
302     {
303 torben 2056 translog.WriteLine(getNowString() + " " + msgString);
304 torben 2051 mqMgr.Commit();
305     statusData.counter++;
306     }
307     else
308     {
309     mqMgr.Backout();
310     isContinue = false;
311     }
312    
313 torben 2011 }
314     else
315     {
316 torben 2051 System.Console.WriteLine("Non-text message");
317 torben 2011 }
318 torben 1986 }
319 torben 2051 catch (MQException mqe)
320 torben 1986 {
321 torben 2051 isContinue = false;
322 torben 1986
323 torben 2051 // report reason, if any
324     if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
325     {
326     // special report for normal end
327     System.Console.WriteLine("no more messages");
328     }
329     else
330     {
331     // general report for other reasons
332     System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
333     statusData.lastrunOk = false;
334     }
335    
336 torben 2011 }
337    
338 torben 2051
339 torben 1986 }
340    
341 torben 2051 }
342 torben 2011
343 torben 2051 }
344     catch (Exception e)
345     {
346 torben 2055 //Det vil være mest korrekt at Rollback/backout MQ Transaktionen her - for at være sikker på at Message'n fjernes fra køen
347     try
348     {
349     if (mqMgr != null)
350     {
351     mqMgr.Backout();
352     }
353     }
354     catch (Exception e2)
355     {
356     this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);
357     }
358    
359 torben 2051 statusData.lastrunOk = false;
360    
361     statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
362     Console.WriteLine(statusData.lastErrorMessage);
363     Console.WriteLine(e.StackTrace);
364     EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
365     }
366     finally
367     {
368    
369     if (in_queue != null && in_queue.IsOpen)
370     {
371     try
372     {
373     in_queue.Close();
374     }
375     catch (Exception e)
376     {
377     Console.WriteLine("Error cleaning up qmgr " + e.Message);
378     }
379 torben 1986 }
380 torben 2051
381     if (mqMgr != null && mqMgr.IsOpen)
382     {
383     try
384     {
385     mqMgr.Close();
386     } catch (Exception e) {
387     Console.WriteLine("Error cleaning up qmgr " + e.Message);
388     }
389     }
390 torben 1986
391     }
392     }
393    
394     private string buildMysqlConnString()
395     {
396     string connectionString = "";
397    
398     connectionString += "SERVER=" + controller.mysqlHost + ";";
399     //connectionString += "DATABASE=" + controller.mysqlHost + ";";
400     connectionString += "UID=" + controller.mysqlUser + ";";
401     connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
402 torben 2051 connectionString += "Max Pool Size=20;";
403     //connectionString += "ConnectionReset=true;";//ConnectionReset kræver muligvis at Default Database er angivet - det virker ihvertfald ikke uden
404 torben 1986
405     return connectionString;
406     }
407    
408     private Hashtable getConnectionProperties()
409     {
410     Hashtable connProperties = new Hashtable();
411     connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
412     connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
413     connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel); //WARNING: Hardcoded Channel Value !!!
414     return connProperties;
415     }
416    
417 torben 1999
418    
419     private string getLogFilename(LogfileType type)
420 torben 1986 {
421    
422     DateTime now = DateTime.Now;
423     string filename = controller.logDirectory + "\\";
424 torben 1999
425 torben 2057 //Find uge nr
426     DateTimeFormatInfo dfi = DateTimeFormatInfo.CurrentInfo;
427     Calendar myCal = CultureInfo.InvariantCulture.Calendar;//System.Globalization
428     int week = myCal.GetWeekOfYear(now, dfi.CalendarWeekRule, dfi.FirstDayOfWeek);
429    
430 torben 1999 switch (type)
431     {
432     case LogfileType.LogEvents:
433     filename += "eventlog_";
434     break;
435    
436     case LogfileType.LogTransactions:
437     filename += "transactionlog_";
438     break;
439 torben 2058 case LogfileType.LogDiscarded:
440     filename += "discardedlog_";
441     break;
442 torben 1999 }
443    
444    
445 torben 2057 filename += name + "_" + now.Year.ToString("D4") + "_W" + week.ToString("D2") + ".log";
446 torben 1986
447     return filename;
448     }
449    
450     public string getNowString()
451     {
452     DateTime now = DateTime.Now;
453    
454     return now.ToString("s");
455     }
456    
457     private void addLogEntry(string msg)
458     {
459     msg = getNowString() + " " + msg;
460     lock (logEntries)
461     {
462     logEntries.AddFirst(msg);
463    
464     if (logEntries.Count > 20)
465     {
466     logEntries.RemoveLast();
467 torben 1999 }
468 torben 1986 }
469 torben 1999
470     string filename = getLogFilename(LogfileType.LogEvents);
471     using (StreamWriter eventlog = new StreamWriter(filename, true))
472     {
473     eventlog.WriteLine(msg);
474     }
475 torben 1986 }
476    
477     public string[] getLog()
478     {
479     lock(logEntries)
480     {
481     List<string> tmpEntries = new List<string>();
482     foreach (string s in logEntries)
483     {
484     tmpEntries.Add(s);
485     }
486     return tmpEntries.ToArray();
487     }
488     }
489    
490     }
491     }

  ViewVC Help
Powered by ViewVC 1.1.20