/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Contents of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2055 - (show annotations) (download)
Tue Aug 27 06:28:44 2013 UTC (10 years, 8 months ago) by torben
File size: 17073 byte(s)
Backout from MQ Transaction on exception
1 using System;
2 using System.Collections;
3 using System.Collections.Generic;
4 using System.IO;
5
6 using System.Diagnostics;
7
8 using IBM.WMQ;
9 using MySql.Data.MySqlClient;
10
11 namespace DaoMqPump2
12 {
13 public class Transport
14 {
15
16 enum LogfileType {
17 LogTransactions,
18 LogEvents
19 }
20
21 public static string SQL2MQ = "sql2mq";
22 public static string MQ2SQL = "mq2sql";
23
24 //private bool enabled;
25
26 TransportController controller;
27
28 StatusData statusData = new StatusData();
29
30 public string name { get; private set; }
31 public string direction { get; private set; }
32 public string queueName { get; private set; }
33 public string mq2sqlInsertQuery { get; private set; }
34 public string sql2mqReadQuery { get; private set; }
35 public string sql2mqUpdateQuery { get; private set; }
36
37 //public bool lastrunOk { get; private set; }
38 //public string lastErrorMessage { get; private set; }
39
40 //public string lastOkTime { get; private set; }
41 //public string lastErrorTime { get; private set; }
42 //public string lastTransferTime { get; private set; }
43
44 //public int counter { get; private set; }
45
46 public StatusData TransportStatusData
47 {
48 get
49 {
50 return this.statusData;
51 }
52 }
53
54
55 public bool Enabled
56 {
57 get {
58 return statusData.transportEnabled;
59 }
60 set
61 {
62 statusData.transportEnabled = value;
63 if (value == true)
64 {
65 this.addLogEntry("Transport enabled");
66 }
67 else
68 {
69 this.addLogEntry("Transport disabled");
70 }
71 }
72 }
73
74
75 private LinkedList<string> logEntries = new LinkedList<string>();
76
77
78 public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
79 {
80 this.controller = controller;
81 this.name = name;
82 this.direction = direction;
83 this.queueName = queueName;
84 this.mq2sqlInsertQuery = mq2sqlInsertQuery;
85 this.sql2mqReadQuery = sql2mqReadQuery;
86 this.sql2mqUpdateQuery = sql2mqUpdateQuery;
87
88 statusData.transportEnabled = enabled;
89
90
91 statusData.lastrunOk = true;
92 statusData.counter = 0;
93 statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";
94
95 addLogEntry( "Starting ... " );
96 }
97
98 ~Transport()
99 {
100 addLogEntry("Stopping ... ");
101 }
102
103
104 public void transportMessages()
105 {
106 if (statusData.transportEnabled == false)
107 return;
108
109 Console.WriteLine(name + " -> transportMessages() ");
110 statusData.lastrunOk = true;
111
112 int startCounter = statusData.counter;
113
114 if (direction == SQL2MQ)
115 {
116 transportSql2Mq();
117 }
118 else
119 {
120 transportMq2Sql();
121 }
122
123 if (statusData.lastrunOk == true)
124 {
125 statusData.lastOkTime = getNowString();
126
127 if (statusData.counter != startCounter)
128 {
129 //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
130 statusData.lastTransferTime = getNowString();
131 }
132 }
133 else
134 {
135 addLogEntry(statusData.lastErrorMessage);
136 statusData.lastErrorTime = getNowString();
137 }
138 }
139
140 private void transportSql2Mq()
141 {
142 MQQueueManager mqMgr = null;
143 MQQueue out_queue = null;
144
145 string filename = getLogFilename(LogfileType.LogTransactions);
146 using (StreamWriter translog = new StreamWriter(filename, true) )
147 try
148 {
149 //MQ Options
150 Hashtable connProps = getConnectionProperties();
151 int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
152
153 //MySQL Options
154 string mysqlString = buildMysqlConnString();
155
156 //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
157 mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
158 out_queue = mqMgr.AccessQueue(queueName, openOptions);
159 using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
160 using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
161 {
162 sqlReadConnection.Open();
163 sqlWriteConnection.Open();
164
165 //stage 3 move messages
166 string readSql = "CALL " + sql2mqReadQuery + "()";
167 MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);
168 MySqlDataReader dataReader = readCmd.ExecuteReader();
169 while (dataReader.Read())
170 {
171 int id = dataReader.GetInt32(0);
172 string msgString = dataReader.GetString(1);
173
174 MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
175 // same as MQPMO_DEFAULT
176
177 MQMessage msg = new MQMessage();
178 msg.Format = MQC.MQFMT_STRING;
179 msg.CharacterSet = 1252;
180 msg.WriteString(msgString);
181
182 out_queue.Put(msg, pmo);
183
184 //now that the message has been put on queue mark it as such
185
186 string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
187 MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
188 int numrows = updateCmd.ExecuteNonQuery();
189
190 translog.WriteLine(getNowString() + " " + msgString + "\n");
191
192 if (numrows != 1)
193 {
194 break;
195 }
196 statusData.counter++;
197 }
198
199 }
200 }
201 catch (Exception e)
202 {
203 statusData.lastrunOk = false;
204 statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
205 Console.WriteLine(statusData.lastErrorMessage);
206 Console.WriteLine(e.StackTrace);
207 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
208 }
209 finally
210 {
211
212 if (out_queue != null && out_queue.IsOpen)
213 {
214 try
215 {
216 out_queue.Close();
217 }
218 catch (Exception e)
219 {
220 Console.WriteLine("Error cleaning up qmgr " + e.Message);
221 }
222 }
223
224 if (mqMgr != null && mqMgr.IsOpen)
225 {
226 try
227 {
228 mqMgr.Close();
229 }
230 catch (Exception e)
231 {
232 Console.WriteLine("Error cleaning up qmgr " + e.Message);
233 }
234 }
235
236 }
237 }
238
239 private void transportMq2Sql()
240 {
241 MQQueueManager mqMgr = null;
242 MQQueue in_queue = null;
243 string filename = getLogFilename(LogfileType.LogTransactions);
244 using (StreamWriter translog = new StreamWriter(filename, true))
245 try
246 {
247 //MQ options
248 Hashtable connProps = getConnectionProperties();
249 int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
250
251 //MySQL options
252 string mysqlString = buildMysqlConnString();
253
254 //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
255 mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
256 in_queue = mqMgr.AccessQueue(queueName, openOptions);
257 using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
258 {
259
260 sqlConnection.Open();
261
262
263 //stage 3 move messages
264 bool isContinue = true;
265 while (isContinue)
266 {
267
268 MQMessage mqMsg = new MQMessage();
269 MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
270
271 mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
272
273 try
274 {
275 in_queue.Get(mqMsg, mqGetMsgOpts);
276 if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
277 {
278 string msgString = mqMsg.ReadString(mqMsg.MessageLength);
279 System.Console.WriteLine(msgString);
280
281 string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
282
283 MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
284 int numrows = sqlcmd.ExecuteNonQuery();
285
286 if (numrows == 1)
287 {
288 translog.WriteLine(getNowString() + " " + msgString + "\n");
289 mqMgr.Commit();
290 statusData.counter++;
291 }
292 else
293 {
294 mqMgr.Backout();
295 isContinue = false;
296 }
297
298 }
299 else
300 {
301 System.Console.WriteLine("Non-text message");
302 }
303 }
304 catch (MQException mqe)
305 {
306 isContinue = false;
307
308 // report reason, if any
309 if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
310 {
311 // special report for normal end
312 System.Console.WriteLine("no more messages");
313 }
314 else
315 {
316 // general report for other reasons
317 System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
318 statusData.lastrunOk = false;
319 }
320
321 }
322
323
324 }
325
326 }
327
328 }
329 catch (Exception e)
330 {
331 //Det vil være mest korrekt at Rollback/backout MQ Transaktionen her - for at være sikker på at Message'n fjernes fra køen
332 try
333 {
334 if (mqMgr != null)
335 {
336 mqMgr.Backout();
337 }
338 }
339 catch (Exception e2)
340 {
341 this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);
342 }
343
344 statusData.lastrunOk = false;
345
346 statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
347 Console.WriteLine(statusData.lastErrorMessage);
348 Console.WriteLine(e.StackTrace);
349 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
350 }
351 finally
352 {
353
354 if (in_queue != null && in_queue.IsOpen)
355 {
356 try
357 {
358 in_queue.Close();
359 }
360 catch (Exception e)
361 {
362 Console.WriteLine("Error cleaning up qmgr " + e.Message);
363 }
364 }
365
366 if (mqMgr != null && mqMgr.IsOpen)
367 {
368 try
369 {
370 mqMgr.Close();
371 } catch (Exception e) {
372 Console.WriteLine("Error cleaning up qmgr " + e.Message);
373 }
374 }
375
376 }
377 }
378
379 private string buildMysqlConnString()
380 {
381 string connectionString = "";
382
383 connectionString += "SERVER=" + controller.mysqlHost + ";";
384 //connectionString += "DATABASE=" + controller.mysqlHost + ";";
385 connectionString += "UID=" + controller.mysqlUser + ";";
386 connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
387 connectionString += "Max Pool Size=20;";
388 //connectionString += "ConnectionReset=true;";//ConnectionReset kræver muligvis at Default Database er angivet - det virker ihvertfald ikke uden
389
390 return connectionString;
391 }
392
393 private Hashtable getConnectionProperties()
394 {
395 Hashtable connProperties = new Hashtable();
396 connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
397 connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
398 connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel); //WARNING: Hardcoded Channel Value !!!
399 return connProperties;
400 }
401
402
403
404 private string getLogFilename(LogfileType type)
405 {
406
407 DateTime now = DateTime.Now;
408 string filename = controller.logDirectory + "\\";
409
410 switch (type)
411 {
412 case LogfileType.LogEvents:
413 filename += "eventlog_";
414 break;
415
416 case LogfileType.LogTransactions:
417 filename += "transactionlog_";
418 break;
419 }
420
421
422 filename += name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";
423
424 return filename;
425 }
426
427 public string getNowString()
428 {
429 DateTime now = DateTime.Now;
430
431 return now.ToString("s");
432 }
433
434 private void addLogEntry(string msg)
435 {
436 msg = getNowString() + " " + msg;
437 lock (logEntries)
438 {
439 logEntries.AddFirst(msg);
440
441 if (logEntries.Count > 20)
442 {
443 logEntries.RemoveLast();
444 }
445 }
446
447 string filename = getLogFilename(LogfileType.LogEvents);
448 using (StreamWriter eventlog = new StreamWriter(filename, true))
449 {
450 eventlog.WriteLine(msg);
451 }
452 }
453
454 public string[] getLog()
455 {
456 lock(logEntries)
457 {
458 List<string> tmpEntries = new List<string>();
459 foreach (string s in logEntries)
460 {
461 tmpEntries.Add(s);
462 }
463 return tmpEntries.ToArray();
464 }
465 }
466
467 }
468 }

  ViewVC Help
Powered by ViewVC 1.1.20