/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Contents of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2166 - (show annotations) (download)
Fri May 16 18:24:05 2014 UTC (10 years ago) by torben
File size: 21437 byte(s)
Svn:ignore
1 using System;
2 using System.Collections;
3 using System.Collections.Generic;
4 using System.IO;
5
6 using System.Diagnostics;
7
8 using IBM.WMQ;
9 using MySql.Data.MySqlClient;
10 using System.Globalization;
11
12 using DaoCommon;
13
14 namespace DaoMqPump2
15 {
16 public class Transport
17 {
18
/// <summary>
/// Kinds of log file this transport writes; getLogFilename maps each value to a file name prefix.
/// </summary>
private enum LogfileType
{
    /// <summary>Log of transported messages ("transactionlog_" prefix).</summary>
    LogTransactions,

    /// <summary>Log of status/event lines written by addLogEntry ("eventlog_" prefix).</summary>
    LogEvents,

    /// <summary>Log of messages rejected by the mq2sql path ("discardedlog_" prefix).</summary>
    LogDiscarded
}
24
/// <summary>Value of <see cref="direction"/> that selects the MySQL -> MQ path in transportMessages.</summary>
public static string SQL2MQ = "sql2mq";

/// <summary>Direction name for the MQ -> MySQL path (transportMessages uses it for any value other than SQL2MQ).</summary>
public static string MQ2SQL = "mq2sql";

// Supplies the MQ / MySQL connection settings and the log directory.
private TransportController controller;

// Mutable status (enabled flag, counters, last error/ok times) exposed via TransportStatusData.
private StatusData statusData = new StatusData();

/// <summary>Transport name; used in log file names and error messages.</summary>
public string name { get; private set; }

/// <summary>Transfer direction; compared against SQL2MQ in transportMessages.</summary>
public string direction { get; private set; }

/// <summary>Name of the MQ queue that is opened for output (sql2mq) or input (mq2sql).</summary>
public string queueName { get; private set; }

/// <summary>Stored procedure name CALLed with the message text for every message read from MQ.</summary>
public string mq2sqlInsertQuery { get; private set; }

/// <summary>Stored procedure name CALLed (without arguments) to read the rows to put on MQ.</summary>
public string sql2mqReadQuery { get; private set; }

/// <summary>Stored procedure name CALLed with the row id after the row's message has been put on MQ.</summary>
public string sql2mqUpdateQuery { get; private set; }
40
41 //public bool lastrunOk { get; private set; }
42 //public string lastErrorMessage { get; private set; }
43
44 //public string lastOkTime { get; private set; }
45 //public string lastErrorTime { get; private set; }
46 //public string lastTransferTime { get; private set; }
47
48 //public int counter { get; private set; }
49
/// <summary>
/// Gets the live status object of this transport (the same instance the transport updates, not a copy).
/// </summary>
public StatusData TransportStatusData
{
    get { return statusData; }
}
57
58
/// <summary>
/// Gets or sets whether this transport is enabled. Every set writes a log entry,
/// even when the new value equals the current one.
/// </summary>
public bool Enabled
{
    get { return statusData.transportEnabled; }
    set
    {
        statusData.transportEnabled = value;
        this.addLogEntry(value ? "Transport enabled" : "Transport disabled");
    }
}
77
78
// Most recent log lines, newest first (addLogEntry caps the list at 20 entries).
// The list is also the lock object guarding itself, so it is readonly: replacing
// the instance would silently split the lock between old and new objects.
private readonly LinkedList<string> logEntries = new LinkedList<string>();
80
81
/// <summary>
/// Creates a transport and writes a "Starting ... " log entry.
/// </summary>
/// <param name="controller">Owner that supplies MQ/MySQL connection settings and the log directory.</param>
/// <param name="name">Transport name (used in log file names and error messages).</param>
/// <param name="direction">Transfer direction; SQL2MQ selects MySQL -> MQ, anything else MQ -> MySQL.</param>
/// <param name="queueName">MQ queue to read from or write to.</param>
/// <param name="mq2sqlInsertQuery">Stored procedure called for each message read from MQ.</param>
/// <param name="sql2mqReadQuery">Stored procedure called to read the rows to send to MQ.</param>
/// <param name="sql2mqUpdateQuery">Stored procedure called with the row id after a row was sent.</param>
/// <param name="enabled">Initial enabled state (set directly, without the log entry the Enabled setter writes).</param>
public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
{
    // Configuration
    this.controller = controller;
    this.name = name;
    this.direction = direction;
    this.queueName = queueName;
    this.mq2sqlInsertQuery = mq2sqlInsertQuery;
    this.sql2mqReadQuery = sql2mqReadQuery;
    this.sql2mqUpdateQuery = sql2mqUpdateQuery;

    // Initial status: enabled as requested, no runs and no errors recorded yet
    statusData.transportEnabled = enabled;
    statusData.lastrunOk = true;
    statusData.counter = 0;
    statusData.lastErrorTime = "";
    statusData.lastOkTime = "";
    statusData.lastErrorMessage = "";

    addLogEntry("Starting ... ");
}
101
/// <summary>
/// Finalizer: makes a best-effort attempt to record a "Stopping ... " log entry.
/// </summary>
~Transport()
{
    try
    {
        addLogEntry("Stopping ... ");
    }
    catch (Exception)
    {
        // addLogEntry appends to a log file and can fail (e.g. file locked, directory gone).
        // An exception escaping a finalizer terminates the process, so a failed
        // shutdown log line is deliberately ignored here.
    }
}
106
107
/// <summary>
/// Runs one transfer pass in the configured direction and updates the status timestamps.
/// Does nothing when the transport is disabled.
/// </summary>
public void transportMessages()
{
    if (statusData.transportEnabled == false)
    {
        return;
    }

    Console.WriteLine(name + " -> transportMessages() ");
    statusData.lastrunOk = true;

    // Remember the counter so we can tell afterwards whether anything was moved
    int counterBeforeRun = statusData.counter;

    if (direction == SQL2MQ)
    {
        transportSql2Mq();
    }
    else
    {
        transportMq2Sql();
    }

    bool runSucceeded = (statusData.lastrunOk == true);
    if (!runSucceeded)
    {
        // The transfer method flagged a failure: log its message and stamp the error time
        addLogEntry(statusData.lastErrorMessage);
        statusData.lastErrorTime = DaoUtil.getNowString();
        return;
    }

    statusData.lastOkTime = DaoUtil.getNowString();

    bool movedMessages = (statusData.counter != counterBeforeRun);
    if (movedMessages)
    {
        // Messages were transported in this run - record the transfer time
        statusData.lastTransferTime = DaoUtil.getNowString();
    }
}
143
/// <summary>
/// Moves messages from MySQL to MQ: calls the read procedure, puts column 1 of every row
/// on the queue as a string message, then calls the update procedure with the row id (column 0).
/// Failures are recorded in statusData (lastrunOk / lastErrorMessage) and the Windows event log
/// instead of being thrown; only opening the transaction log file can throw to the caller.
/// </summary>
private void transportSql2Mq()
{
    MQQueueManager mqMgr = null;
    MQQueue out_queue = null;

    string filename = getLogFilename(LogfileType.LogTransactions);
    using (StreamWriter translog = new StreamWriter(filename, true))
    {
        try
        {
            // MQ options
            Hashtable connProps = getConnectionProperties();
            int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;

            // MySQL options
            string mysqlString = buildMysqlConnString();

            // MQ objects in v6 do not implement IDisposable and therefore cannot be used
            // with the "using" statement; they are closed in the finally block instead.
            mqMgr = new MQQueueManager(controller.mqQueueManager, connProps); // stage 1: connect to MQ
            out_queue = mqMgr.AccessQueue(queueName, openOptions);

            // stage 2: connect to MySQL (separate connections because the update
            // runs while the reader on the first connection is still open)
            using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString))
            using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
            {
                sqlReadConnection.Open();
                sqlWriteConnection.Open();

                // stage 3: move messages
                string readSql = "CALL " + sql2mqReadQuery + "()";
                using (MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection))
                using (MySqlDataReader dataReader = readCmd.ExecuteReader())
                {
                    while (dataReader.Read())
                    {
                        int id = dataReader.GetInt32(0);
                        string msgString = dataReader.GetString(1);

                        // accept the defaults, same as MQPMO_DEFAULT
                        MQPutMessageOptions pmo = new MQPutMessageOptions();

                        MQMessage msg = new MQMessage();
                        msg.Format = MQC.MQFMT_STRING;
                        msg.CharacterSet = 1252;
                        msg.WriteString(msgString);

                        out_queue.Put(msg, pmo);

                        // now that the message has been put on the queue, mark the row as sent
                        string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id.ToString(CultureInfo.InvariantCulture) + ")";
                        int numrows;
                        using (MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection))
                        {
                            numrows = updateCmd.ExecuteNonQuery();
                        }

                        translog.WriteLine(DaoUtil.getNowString() + " " + msgString);

                        if (numrows != 1)
                        {
                            // The update did not report exactly one row: stop this run.
                            // The message itself has already been put at this point.
                            break;
                        }
                        statusData.counter++;
                    }
                }
            }
        }
        catch (Exception e)
        {
            statusData.lastrunOk = false;
            statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.GetType().FullName + " " + e.Message;
            Console.WriteLine(statusData.lastErrorMessage);
            Console.WriteLine(e.StackTrace);
            EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
        }
        finally
        {
            // Best-effort cleanup: a failure to close must not mask the result of the run
            if (out_queue != null && out_queue.IsOpen)
            {
                try
                {
                    out_queue.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up queue " + e.Message);
                }
            }

            // NOTE(review): Close() is used here, not Disconnect() - confirm this releases the connection
            if (mqMgr != null && mqMgr.IsOpen)
            {
                try
                {
                    mqMgr.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up qmgr " + e.Message);
                }
            }
        }
    }
}
242
/// <summary>
/// Moves messages from MQ to MySQL: gets messages under syncpoint, discards invalid ones
/// (leading '?' or invalid header) to the discarded log, and passes valid ones to the insert
/// procedure, committing the MQ get only when the insert reports exactly one row.
/// A run stops when the queue is empty, on an error, or after 10000 inserted messages.
/// Failures are recorded in statusData (lastrunOk / lastErrorMessage) instead of being thrown;
/// only opening the transaction log file can throw to the caller.
/// </summary>
private void transportMq2Sql()
{
    int messageCount = 0;

    MQQueueManager mqMgr = null;
    MQQueue in_queue = null;
    string filename = getLogFilename(LogfileType.LogTransactions);
    using (StreamWriter translog = new StreamWriter(filename, true))
    {
        try
        {
            // MQ options
            Hashtable connProps = getConnectionProperties();
            int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;

            // MySQL options
            string mysqlString = buildMysqlConnString();

            // MQ objects in v6 do not implement IDisposable and therefore cannot be used
            // with the "using" statement; they are closed in the finally block instead.
            mqMgr = new MQQueueManager(controller.mqQueueManager, connProps); // stage 1: connect to MQ
            in_queue = mqMgr.AccessQueue(queueName, openOptions);

            using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) // stage 2: connect to MySQL
            {
                sqlConnection.Open();

                // stage 3: move messages
                bool isContinue = true;
                while (isContinue)
                {
                    MQMessage mqMsg = new MQMessage();
                    MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();

                    // requires a commit before the message is removed from the queue
                    mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT;

                    try
                    {
                        in_queue.Get(mqMsg, mqGetMsgOpts);

                        if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) != 0)
                        {
                            // NOTE(review): this message was read under syncpoint and is neither
                            // committed nor backed out here; presumably it goes with the next
                            // Commit/Backout on this queue manager - confirm that is intended.
                            System.Console.WriteLine("Non-text message");
                            continue;
                        }

                        string msgString = mqMsg.ReadString(mqMsg.MessageLength);

                        // A transaction starting with '?' is not a valid transaction;
                        // the header must be valid as well.
                        if (msgString.StartsWith("?") || validateSalt2Header(msgString) == false)
                        {
                            string discarded_filename = getLogFilename(LogfileType.LogDiscarded);
                            using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
                            {
                                discardedlog.WriteLine(DaoUtil.getNowString() + " " + msgString);
                            }
                            mqMgr.Commit(); // remove the rejected transaction from the queue
                            statusData.discardedCounter++;
                            continue; // move on to the next transaction on the queue
                        }

                        // CALL somestoredprocedure(@msg); the message text comes from outside,
                        // so it is bound as a parameter rather than spliced into the SQL text.
                        string sql = "CALL " + mq2sqlInsertQuery + "(@msg)";
                        int numrows;
                        using (MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection))
                        {
                            sqlcmd.Parameters.AddWithValue("@msg", msgString);
                            numrows = sqlcmd.ExecuteNonQuery();
                        }

                        if (numrows == 1)
                        {
                            translog.WriteLine(DaoUtil.getNowString() + " " + msgString);
                            mqMgr.Commit();
                            statusData.counter++;

                            messageCount++; // per-run message counter
                            if (messageCount >= 10000)
                            {
                                // 10000 messages moved in this run - stop and give the other transports a chance
                                isContinue = false;
                            }
                        }
                        else
                        {
                            // Insert did not report exactly one row: put the message back and stop this run
                            mqMgr.Backout();
                            isContinue = false;
                        }
                    }
                    catch (MQException mqe)
                    {
                        isContinue = false;

                        if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                        {
                            // normal end: the queue is empty
                            System.Console.WriteLine("no more messages");
                        }
                        else
                        {
                            // Any other MQ failure marks the run as failed. The error message must be
                            // set here too, otherwise the caller logs a stale message from an earlier run.
                            System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);
                            statusData.lastrunOk = false;
                            statusData.lastErrorMessage = name + ".transportMq2Sql error: " + mqe.GetType().FullName + " " + mqe.Message;
                        }
                    }
                }
            }
        }
        catch (Exception e)
        {
            // Back out the uncommitted MQ unit of work before reporting the failure
            try
            {
                if (mqMgr != null)
                {
                    mqMgr.Backout();
                }
            }
            catch (Exception e2)
            {
                this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);
            }

            statusData.lastrunOk = false;

            statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
            Console.WriteLine(statusData.lastErrorMessage);
            Console.WriteLine(e.StackTrace);
            EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
        }
        finally
        {
            // Best-effort cleanup: a failure to close must not mask the result of the run
            if (in_queue != null && in_queue.IsOpen)
            {
                try
                {
                    in_queue.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up queue " + e.Message);
                }
            }

            // NOTE(review): Close() is used here, not Disconnect() - confirm this releases the connection
            if (mqMgr != null && mqMgr.IsOpen)
            {
                try
                {
                    mqMgr.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up qmgr " + e.Message);
                }
            }
        }
    }
}
410
/// <summary>
/// Builds the MySQL connection string from the controller's host, user and password,
/// with the pool limited to 20 connections. No default database is selected.
/// </summary>
/// <returns>The connection string, terminated by a semicolon.</returns>
private string buildMysqlConnString()
{
    // NOTE(review): the values are inserted verbatim; a host/user/password containing
    // ';' would presumably break the string - confirm such values cannot occur.
    // ConnectionReset=true is left out: it possibly requires a default database to be
    // specified - at least it did not work without one.
    return string.Concat(
        "SERVER=", controller.mysqlHost, ";",
        "UID=", controller.mysqlUser, ";",
        "PASSWORD=", controller.mysqlPassword, ";",
        "Max Pool Size=20;");
}
424
/// <summary>
/// Builds the property table passed to the MQQueueManager constructor:
/// client transport plus the host name and channel taken from the controller.
/// </summary>
/// <returns>A new Hashtable with the three connection properties.</returns>
private Hashtable getConnectionProperties()
{
    return new Hashtable
    {
        { MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT },
        { MQC.HOST_NAME_PROPERTY, controller.mqHost },
        { MQC.CHANNEL_PROPERTY, controller.mqChannel }
    };
}
433
434
435
/// <summary>
/// Returns the path of the weekly log file of the given type for this transport:
/// &lt;logDirectory&gt;\&lt;prefix&gt;&lt;name&gt;_&lt;yyyy&gt;_W&lt;ww&gt;.log, based on the current local time.
/// </summary>
/// <param name="type">Which log file is wanted; selects the file name prefix.</param>
/// <returns>The log file path (the file itself is not created here).</returns>
private string getLogFilename(LogfileType type)
{
    DateTime timestamp = DateTime.Now;

    // Week number: invariant calendar, week rule and first day of week from the current culture
    DateTimeFormatInfo currentFormat = DateTimeFormatInfo.CurrentInfo;
    Calendar calendar = CultureInfo.InvariantCulture.Calendar;
    int weekNumber = calendar.GetWeekOfYear(timestamp, currentFormat.CalendarWeekRule, currentFormat.FirstDayOfWeek);

    // An unrecognised type gets no prefix at all
    string prefix = "";
    if (type == LogfileType.LogEvents)
    {
        prefix = "eventlog_";
    }
    else if (type == LogfileType.LogTransactions)
    {
        prefix = "transactionlog_";
    }
    else if (type == LogfileType.LogDiscarded)
    {
        prefix = "discardedlog_";
    }

    return controller.logDirectory + "\\" + prefix
        + name + "_" + timestamp.Year.ToString("D4") + "_W" + weekNumber.ToString("D2") + ".log";
}
466
/// <summary>
/// Checks that a message is at least 66 characters long and that the numeric fields of its
/// fixed-width header parse as integers. Every rejection is recorded with addLogEntry.
/// </summary>
/// <param name="salt2String">The complete message text; must not be null.</param>
/// <returns>true when the header passes all checks, false when the message should be discarded.</returns>
private bool validateSalt2Header(string salt2String)
{
    if (salt2String.Length < 66)
    {
        addLogEntry("Transaction too short - discarding");
        return false;
    }

    // Header layout (offset, length); only the numeric fields are extracted and checked:
    //   afsender                ( 0,  5)
    //   modtager                ( 5,  5)
    //   afsenderTegnSaet        (10,  6)
    //   standardNavn            (16,  6)
    //   standardVersion         (22,  3)  int
    //   afsenderSekvensnr       (25,  6)  int
    //   afsenderTidsstempel     (31, 14)  long
    //   afsenderBakkeIdent      (45,  5)
    //   modtagerBakkeIdent      (50,  5)
    //   transaktionForkortelse  (55,  4)
    //   transaktionsLaengde     (59,  5)  int
    //   prioritet               (64,  1)  int
    string standardVersion = salt2String.Substring(22, 3);
    string afsenderSekvensnr = salt2String.Substring(25, 6);
    string afsenderTidsstempel = salt2String.Substring(31, 14);
    string transaktionsLaengde = salt2String.Substring(59, 5);
    string prioritet = salt2String.Substring(64, 1);

    // The header is machine-generated, so parse with the invariant culture: the result
    // must not depend on the regional settings of the machine running the pump.
    int result;
    long result_long;

    if (int.TryParse(standardVersion.Trim(), NumberStyles.Integer, CultureInfo.InvariantCulture, out result) == false) // standardVersion _must_ be an int
    {
        addLogEntry("standardVersion not an integer, discarding");
        return false;
    }

    if (int.TryParse(afsenderSekvensnr.Trim(), NumberStyles.Integer, CultureInfo.InvariantCulture, out result) == false) // afsenderSekvensnr _must_ be an int
    {
        addLogEntry("afsenderSekvensnr not an integer, discarding");
        return false;
    }

    if (long.TryParse(afsenderTidsstempel.Trim(), NumberStyles.Integer, CultureInfo.InvariantCulture, out result_long) == false) // afsenderTidsstempel _must_ be a long
    {
        addLogEntry("afsenderTidsstempel not a long integer, discarding");
        return false;
    }

    if (int.TryParse(transaktionsLaengde.Trim(), NumberStyles.Integer, CultureInfo.InvariantCulture, out result) == false) // transaktionsLaengde _must_ be an int
    {
        addLogEntry("transaktionsLaengde not an integer, discarding");
        return false;
    }

    if (int.TryParse(prioritet.Trim(), NumberStyles.Integer, CultureInfo.InvariantCulture, out result) == false) // prioritet _must_ be an int
    {
        addLogEntry("prioritet not an integer, discarding");
        return false;
    }

    return true;
}
526
/// <summary>
/// Timestamps a message, keeps it in the in-memory list of the 20 most recent entries
/// (newest first) and appends it to the event log file.
/// </summary>
/// <param name="msg">Message text without timestamp.</param>
private void addLogEntry(string msg)
{
    string stampedLine = DaoUtil.getNowString() + " " + msg;

    lock (logEntries)
    {
        logEntries.AddFirst(stampedLine);

        // Drop the oldest entry once the list grows past 20
        if (logEntries.Count > 20)
        {
            logEntries.RemoveLast();
        }
    }

    // The file is written outside the lock, in append mode
    string eventLogPath = getLogFilename(LogfileType.LogEvents);
    using (StreamWriter writer = new StreamWriter(eventLogPath, true))
    {
        writer.WriteLine(stampedLine);
    }
}
546
/// <summary>
/// Returns a snapshot of the in-memory log entries, newest first.
/// </summary>
/// <returns>A new array; later log entries do not change it.</returns>
public string[] getLog()
{
    lock (logEntries)
    {
        // Copy under the lock so a concurrent addLogEntry cannot modify the list mid-copy
        string[] snapshot = new string[logEntries.Count];
        logEntries.CopyTo(snapshot, 0);
        return snapshot;
    }
}
559
560 }
561 }

  ViewVC Help
Powered by ViewVC 1.1.20