/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory | Revision Log


Revision 2057 - (hide annotations) (download)
Tue Aug 27 06:49:36 2013 UTC (10 years, 9 months ago) by torben
File size: 17372 byte(s)
Use a file per week instead of file per month
1 torben 1986 using System;
2     using System.Collections;
3     using System.Collections.Generic;
4     using System.IO;
5    
6     using System.Diagnostics;
7    
8     using IBM.WMQ;
9     using MySql.Data.MySqlClient;
10 torben 2057 using System.Globalization;
11 torben 1986
12     namespace DaoMqPump2
13     {
/// <summary>
/// One configured message route between an IBM MQ queue and MySQL stored procedures.
/// Depending on <see cref="direction"/> it either reads rows through a MySQL procedure and
/// puts them on the queue (<see cref="SQL2MQ"/>), or gets messages from the queue and hands
/// them to a MySQL procedure (any other direction value, normally <see cref="MQ2SQL"/>).
/// Connection settings and the log directory are read from the owning controller.
/// </summary>
public class Transport
{
    /// <summary>Selects which log file name <c>getLogFilename</c> builds.</summary>
    enum LogfileType
    {
        LogTransactions,
        LogEvents
    }

    /// <summary>Direction value for transports that move rows from MySQL to MQ.</summary>
    public static readonly string SQL2MQ = "sql2mq";

    /// <summary>Direction value for transports that move messages from MQ to MySQL.</summary>
    public static readonly string MQ2SQL = "mq2sql";

    // Number of recent entries kept in memory for getLog().
    private const int MaxLogEntries = 20;

    // Source name used for Windows event log entries.
    private const string EventLogSource = "DaoMqPump2";

    private readonly TransportController controller;

    private StatusData statusData = new StatusData();

    // Most recent entry first; also used as the lock object guarding log access.
    private readonly LinkedList<string> logEntries = new LinkedList<string>();

    public string name { get; private set; }
    public string direction { get; private set; }
    public string queueName { get; private set; }
    public string mq2sqlInsertQuery { get; private set; }
    public string sql2mqReadQuery { get; private set; }
    public string sql2mqUpdateQuery { get; private set; }

    /// <summary>Status record (enabled flag, counters, last ok/error information) of this transport.</summary>
    public StatusData TransportStatusData
    {
        get { return statusData; }
    }

    /// <summary>
    /// Gets or sets whether <see cref="transportMessages"/> does any work.
    /// Every assignment is written to the event log, even when the value does not change.
    /// </summary>
    public bool Enabled
    {
        get { return statusData.transportEnabled; }
        set
        {
            statusData.transportEnabled = value;
            addLogEntry(value ? "Transport enabled" : "Transport disabled");
        }
    }

    /// <summary>
    /// Creates a transport, resets its status and writes a "Starting" entry to the event log.
    /// </summary>
    /// <param name="controller">Owner supplying MQ/MySQL connection settings and the log directory.</param>
    /// <param name="name">Transport name; used in log file names and error messages.</param>
    /// <param name="direction"><see cref="SQL2MQ"/> or <see cref="MQ2SQL"/>.</param>
    /// <param name="queueName">Name of the MQ queue to put to or get from.</param>
    /// <param name="mq2sqlInsertQuery">Stored procedure called with each message read from MQ.</param>
    /// <param name="sql2mqReadQuery">Stored procedure returning (id, message) rows to send to MQ.</param>
    /// <param name="sql2mqUpdateQuery">Stored procedure called with the row id after a successful put.</param>
    /// <param name="enabled">Initial value of the enabled flag (set without logging).</param>
    public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
    {
        this.controller = controller;
        this.name = name;
        this.direction = direction;
        this.queueName = queueName;
        this.mq2sqlInsertQuery = mq2sqlInsertQuery;
        this.sql2mqReadQuery = sql2mqReadQuery;
        this.sql2mqUpdateQuery = sql2mqUpdateQuery;

        statusData.transportEnabled = enabled;
        statusData.lastrunOk = true;
        statusData.counter = 0;
        statusData.lastErrorMessage = "";
        statusData.lastOkTime = "";
        statusData.lastErrorTime = "";

        addLogEntry("Starting ... ");
    }

    ~Transport()
    {
        try
        {
            addLogEntry("Stopping ... ");
        }
        catch (Exception)
        {
            // The log write does file I/O; an exception escaping a finalizer would
            // terminate the process, so a failed "Stopping" entry is dropped instead.
        }
    }

    /// <summary>
    /// Runs one transfer pass in the configured direction and updates the status record:
    /// last-ok time on success (plus last-transfer time when the counter moved),
    /// or last-error time and an event log entry on failure. Does nothing when disabled.
    /// </summary>
    public void transportMessages()
    {
        if (!statusData.transportEnabled)
        {
            return;
        }

        Console.WriteLine(name + " -> transportMessages() ");
        statusData.lastrunOk = true;

        int startCounter = statusData.counter;

        if (direction == SQL2MQ)
        {
            transportSql2Mq();
        }
        else
        {
            transportMq2Sql();
        }

        if (statusData.lastrunOk)
        {
            statusData.lastOkTime = getNowString();

            if (statusData.counter != startCounter)
            {
                // Messages were transported during this pass, so record the transfer time.
                statusData.lastTransferTime = getNowString();
            }
        }
        else
        {
            addLogEntry(statusData.lastErrorMessage);
            statusData.lastErrorTime = getNowString();
        }
    }

    /// <summary>
    /// Reads (id, message) rows from the read procedure, puts each message on the MQ queue,
    /// then calls the update procedure with the row id and appends the message to the
    /// transaction log. Failures are recorded in the status record, not thrown
    /// (except a failure to open the transaction log file, which propagates).
    /// </summary>
    private void transportSql2Mq()
    {
        MQQueueManager mqMgr = null;
        MQQueue outQueue = null;

        string filename = getLogFilename(LogfileType.LogTransactions);
        using (StreamWriter translog = new StreamWriter(filename, true))
        {
            try
            {
                // MQ options
                Hashtable connProps = getConnectionProperties();
                int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;

                // MySQL options
                string mysqlString = buildMysqlConnString();

                // Stage 1: connect to MQ. The MQ objects in v6 do not implement IDisposable,
                // so they cannot be wrapped in "using"; they are closed in the finally block.
                mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);
                outQueue = mqMgr.AccessQueue(queueName, openOptions);

                // Stage 2: connect to MySQL. Two connections, because the reader keeps the
                // first one busy while the update procedure is called on the second.
                using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString))
                using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
                {
                    sqlReadConnection.Open();
                    sqlWriteConnection.Open();

                    // Stage 3: move messages.
                    string readSql = "CALL " + sql2mqReadQuery + "()";
                    string updateSql = "CALL " + sql2mqUpdateQuery + "(@id)";

                    using (MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection))
                    using (MySqlDataReader dataReader = readCmd.ExecuteReader())
                    {
                        while (dataReader.Read())
                        {
                            int id = dataReader.GetInt32(0);
                            string msgString = dataReader.GetString(1);

                            // Accept the defaults, same as MQPMO_DEFAULT.
                            MQPutMessageOptions pmo = new MQPutMessageOptions();

                            MQMessage msg = new MQMessage();
                            msg.Format = MQC.MQFMT_STRING;
                            msg.CharacterSet = 1252;
                            msg.WriteString(msgString);

                            outQueue.Put(msg, pmo);

                            // Now that the message is on the queue, mark the row as sent.
                            int numrows;
                            using (MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection))
                            {
                                updateCmd.Parameters.AddWithValue("@id", id);
                                numrows = updateCmd.ExecuteNonQuery();
                            }

                            translog.WriteLine(getNowString() + " " + msgString);

                            if (numrows != 1)
                            {
                                // NOTE(review): the pass stops here without flagging an error,
                                // although the message has already been put on the queue.
                                // Confirm whether this should be reported.
                                break;
                            }
                            statusData.counter++;
                        }
                    }
                }
            }
            catch (Exception e)
            {
                reportFailure("transportSql2Mq", e);
            }
            finally
            {
                closeMqObjects(outQueue, mqMgr);
            }
        }
    }

    /// <summary>
    /// Gets messages from the MQ queue under syncpoint and passes each string message to the
    /// insert procedure; commits the MQ transaction when exactly one row is reported, otherwise
    /// backs out and stops. Ends normally when the queue reports no more messages. Failures are
    /// recorded in the status record, not thrown (except a failure to open the transaction log
    /// file, which propagates).
    /// </summary>
    private void transportMq2Sql()
    {
        MQQueueManager mqMgr = null;
        MQQueue inQueue = null;

        string filename = getLogFilename(LogfileType.LogTransactions);
        using (StreamWriter translog = new StreamWriter(filename, true))
        {
            try
            {
                // MQ options
                Hashtable connProps = getConnectionProperties();
                int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;

                // MySQL options
                string mysqlString = buildMysqlConnString();

                // Stage 1: connect to MQ. The MQ objects in v6 do not implement IDisposable,
                // so they cannot be wrapped in "using"; they are closed in the finally block.
                mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);
                inQueue = mqMgr.AccessQueue(queueName, openOptions);

                // Stage 2: connect to MySQL.
                using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString))
                {
                    sqlConnection.Open();

                    // The message text is passed as a parameter rather than spliced into the SQL.
                    string insertSql = "CALL " + mq2sqlInsertQuery + "(@msg)";

                    // Stage 3: move messages.
                    bool isContinue = true;
                    while (isContinue)
                    {
                        MQMessage mqMsg = new MQMessage();
                        MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();

                        // Under syncpoint the message is only removed from the queue on commit.
                        mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT;

                        try
                        {
                            inQueue.Get(mqMsg, mqGetMsgOpts);

                            if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) != 0)
                            {
                                // NOTE(review): the message was read under syncpoint and is neither
                                // committed nor backed out here; confirm the intended handling.
                                System.Console.WriteLine("Non-text message");
                                continue;
                            }

                            string msgString = mqMsg.ReadString(mqMsg.MessageLength);
                            System.Console.WriteLine(msgString);

                            int numrows;
                            using (MySqlCommand sqlcmd = new MySqlCommand(insertSql, sqlConnection))
                            {
                                sqlcmd.Parameters.AddWithValue("@msg", msgString);
                                numrows = sqlcmd.ExecuteNonQuery();
                            }

                            if (numrows == 1)
                            {
                                translog.WriteLine(getNowString() + " " + msgString);
                                mqMgr.Commit();
                                statusData.counter++;
                            }
                            else
                            {
                                mqMgr.Backout();
                                isContinue = false;
                            }
                        }
                        catch (MQException mqe)
                        {
                            isContinue = false;

                            if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                            {
                                // Normal end: the queue is empty.
                                System.Console.WriteLine("no more messages");
                            }
                            else
                            {
                                System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);
                                statusData.lastrunOk = false;
                                // Record the cause so the caller does not log a stale message.
                                statusData.lastErrorMessage = name + ".transportMq2Sql error: " + mqe.GetType().FullName + " " + mqe.Message;
                            }
                        }
                    }
                }
            }
            catch (Exception e)
            {
                // Back out the MQ transaction so a message that was read but not stored
                // is not left pending in the current unit of work.
                try
                {
                    if (mqMgr != null)
                    {
                        mqMgr.Backout();
                    }
                }
                catch (Exception e2)
                {
                    addLogEntry("Error backing out from MQ Transaction: " + e2.Message);
                }

                reportFailure("transportMq2Sql", e);
            }
            finally
            {
                closeMqObjects(inQueue, mqMgr);
            }
        }
    }

    /// <summary>
    /// Marks the current run as failed and reports the exception on the console
    /// (message and stack trace) and as a warning in the Windows event log.
    /// </summary>
    /// <param name="operation">Name of the failing method, used in the error message.</param>
    /// <param name="e">The exception that ended the run.</param>
    private void reportFailure(string operation, Exception e)
    {
        statusData.lastrunOk = false;
        statusData.lastErrorMessage = name + "." + operation + " error: " + e.GetType().FullName + " " + e.Message;
        Console.WriteLine(statusData.lastErrorMessage);
        Console.WriteLine(e.StackTrace);
        EventLog.WriteEntry(EventLogSource, statusData.lastErrorMessage, EventLogEntryType.Warning);
    }

    /// <summary>
    /// Closes the queue and then the queue manager if they exist and are open.
    /// Close errors are only written to the console so that cleanup never masks the real failure.
    /// </summary>
    private static void closeMqObjects(MQQueue queue, MQQueueManager queueManager)
    {
        if (queue != null && queue.IsOpen)
        {
            try
            {
                queue.Close();
            }
            catch (Exception e)
            {
                Console.WriteLine("Error cleaning up qmgr " + e.Message);
            }
        }

        // NOTE(review): IBM samples usually end a session with Disconnect();
        // confirm that Close() releases the queue manager connection.
        if (queueManager != null && queueManager.IsOpen)
        {
            try
            {
                queueManager.Close();
            }
            catch (Exception e)
            {
                Console.WriteLine("Error cleaning up qmgr " + e.Message);
            }
        }
    }

    /// <summary>
    /// Builds the MySQL connection string from the controller's host, user and password.
    /// No default database is set.
    /// </summary>
    private string buildMysqlConnString()
    {
        // ConnectionReset=true is deliberately not set: it appears to require a
        // default database and did not work without one.
        return "SERVER=" + controller.mysqlHost + ";"
            + "UID=" + controller.mysqlUser + ";"
            + "PASSWORD=" + controller.mysqlPassword + ";"
            + "Max Pool Size=20;";
    }

    /// <summary>
    /// Builds the MQ client connection properties (client transport, host, channel)
    /// from the controller's settings.
    /// </summary>
    private Hashtable getConnectionProperties()
    {
        Hashtable connProperties = new Hashtable();
        connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
        connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
        connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel);
        return connProperties;
    }

    /// <summary>
    /// Returns the path of the current weekly log file of the given type:
    /// <c>{logDirectory}\{eventlog_|transactionlog_}{name}_{yyyy}_W{ww}.log</c>.
    /// </summary>
    private string getLogFilename(LogfileType type)
    {
        DateTime now = DateTime.Now;

        // Find the week number, using the current culture's week rule and first day of week.
        DateTimeFormatInfo dfi = DateTimeFormatInfo.CurrentInfo;
        Calendar calendar = CultureInfo.InvariantCulture.Calendar;
        int week = calendar.GetWeekOfYear(now, dfi.CalendarWeekRule, dfi.FirstDayOfWeek);

        // For the first days of January GetWeekOfYear can report the last week (52/53)
        // of the previous year. Pair that week with the previous year, otherwise those
        // entries would end up in the file used for the following December.
        int year = now.Year;
        if (now.Month == 1 && week >= 52)
        {
            year--;
        }

        string filename = controller.logDirectory + "\\";

        switch (type)
        {
            case LogfileType.LogEvents:
                filename += "eventlog_";
                break;

            case LogfileType.LogTransactions:
                filename += "transactionlog_";
                break;
        }

        filename += name + "_" + year.ToString("D4") + "_W" + week.ToString("D2") + ".log";

        return filename;
    }

    /// <summary>Returns the current local time in the sortable "s" format (yyyy-MM-ddTHH:mm:ss).</summary>
    public string getNowString()
    {
        return DateTime.Now.ToString("s");
    }

    /// <summary>
    /// Timestamps the message, stores it as the newest in-memory entry (keeping at most
    /// <c>MaxLogEntries</c>) and appends it to the weekly event log file.
    /// </summary>
    private void addLogEntry(string msg)
    {
        msg = getNowString() + " " + msg;

        // The file append happens under the same lock as the list update so that
        // concurrent callers cannot collide on the log file.
        lock (logEntries)
        {
            logEntries.AddFirst(msg);

            if (logEntries.Count > MaxLogEntries)
            {
                logEntries.RemoveLast();
            }

            string filename = getLogFilename(LogfileType.LogEvents);
            using (StreamWriter eventlog = new StreamWriter(filename, true))
            {
                eventlog.WriteLine(msg);
            }
        }
    }

    /// <summary>Returns a snapshot of the in-memory event log entries, newest first.</summary>
    public string[] getLog()
    {
        lock (logEntries)
        {
            string[] snapshot = new string[logEntries.Count];
            logEntries.CopyTo(snapshot, 0);
            return snapshot;
        }
    }
}
474     }

  ViewVC Help
Powered by ViewVC 1.1.20