/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Contents of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2136 - (show annotations) (download)
Wed Mar 26 14:00:14 2014 UTC (10 years, 2 months ago) by torben
File size: 22103 byte(s)
in transportMq2Sql() - only move 10000 messages per run to give the other transports a chance
1 using System;
2 using System.Collections;
3 using System.Collections.Generic;
4 using System.IO;
5
6 using System.Diagnostics;
7
8 using IBM.WMQ;
9 using MySql.Data.MySqlClient;
10 using System.Globalization;
11
12 namespace DaoMqPump2
13 {
14 public class Transport
15 {
16
17 enum LogfileType {
18 LogTransactions,
19 LogEvents,
20 LogDiscarded
21 }
22
23 public static string SQL2MQ = "sql2mq";
24 public static string MQ2SQL = "mq2sql";
25
26 //private bool enabled;
27
28 TransportController controller;
29
30 StatusData statusData = new StatusData();
31
32 public string name { get; private set; }
33 public string direction { get; private set; }
34 public string queueName { get; private set; }
35 public string mq2sqlInsertQuery { get; private set; }
36 public string sql2mqReadQuery { get; private set; }
37 public string sql2mqUpdateQuery { get; private set; }
38
39 //public bool lastrunOk { get; private set; }
40 //public string lastErrorMessage { get; private set; }
41
42 //public string lastOkTime { get; private set; }
43 //public string lastErrorTime { get; private set; }
44 //public string lastTransferTime { get; private set; }
45
46 //public int counter { get; private set; }
47
48 public StatusData TransportStatusData
49 {
50 get
51 {
52 return this.statusData;
53 }
54 }
55
56
57 public bool Enabled
58 {
59 get {
60 return statusData.transportEnabled;
61 }
62 set
63 {
64 statusData.transportEnabled = value;
65 if (value == true)
66 {
67 this.addLogEntry("Transport enabled");
68 }
69 else
70 {
71 this.addLogEntry("Transport disabled");
72 }
73 }
74 }
75
76
77 private LinkedList<string> logEntries = new LinkedList<string>();
78
79
80 public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
81 {
82 this.controller = controller;
83 this.name = name;
84 this.direction = direction;
85 this.queueName = queueName;
86 this.mq2sqlInsertQuery = mq2sqlInsertQuery;
87 this.sql2mqReadQuery = sql2mqReadQuery;
88 this.sql2mqUpdateQuery = sql2mqUpdateQuery;
89
90 statusData.transportEnabled = enabled;
91
92
93 statusData.lastrunOk = true;
94 statusData.counter = 0;
95 statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";
96
97 addLogEntry( "Starting ... " );
98 }
99
100 ~Transport()
101 {
102 addLogEntry("Stopping ... ");
103 }
104
105
106 public void transportMessages()
107 {
108 if (statusData.transportEnabled == false)
109 return;
110
111 Console.WriteLine(name + " -> transportMessages() ");
112 statusData.lastrunOk = true;
113
114 int startCounter = statusData.counter;
115
116 if (direction == SQL2MQ)
117 {
118 transportSql2Mq();
119 }
120 else
121 {
122 transportMq2Sql();
123 }
124
125 if (statusData.lastrunOk == true)
126 {
127 statusData.lastOkTime = getNowString();
128
129 if (statusData.counter != startCounter)
130 {
131 //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
132 statusData.lastTransferTime = getNowString();
133 }
134 }
135 else
136 {
137 addLogEntry(statusData.lastErrorMessage);
138 statusData.lastErrorTime = getNowString();
139 }
140 }
141
142 private void transportSql2Mq()
143 {
144 MQQueueManager mqMgr = null;
145 MQQueue out_queue = null;
146
147 string filename = getLogFilename(LogfileType.LogTransactions);
148 using (StreamWriter translog = new StreamWriter(filename, true) )
149 try
150 {
151 //MQ Options
152 Hashtable connProps = getConnectionProperties();
153 int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
154
155 //MySQL Options
156 string mysqlString = buildMysqlConnString();
157
158 //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
159 mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
160 out_queue = mqMgr.AccessQueue(queueName, openOptions);
161 using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
162 using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
163 {
164 sqlReadConnection.Open();
165 sqlWriteConnection.Open();
166
167 //stage 3 move messages
168 string readSql = "CALL " + sql2mqReadQuery + "()";
169 MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);
170 MySqlDataReader dataReader = readCmd.ExecuteReader();
171 while (dataReader.Read())
172 {
173 int id = dataReader.GetInt32(0);
174 string msgString = dataReader.GetString(1);
175
176 MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
177 // same as MQPMO_DEFAULT
178
179 MQMessage msg = new MQMessage();
180 msg.Format = MQC.MQFMT_STRING;
181 msg.CharacterSet = 1252;
182 msg.WriteString(msgString);
183
184 out_queue.Put(msg, pmo);
185
186 //now that the message has been put on queue mark it as such
187
188 string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
189 MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
190 int numrows = updateCmd.ExecuteNonQuery();
191
192 translog.WriteLine(getNowString() + " " + msgString);
193
194 if (numrows != 1)
195 {
196 break;
197 }
198 statusData.counter++;
199 }
200
201 }
202 }
203 catch (Exception e)
204 {
205 statusData.lastrunOk = false;
206 statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
207 Console.WriteLine(statusData.lastErrorMessage);
208 Console.WriteLine(e.StackTrace);
209 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
210 }
211 finally
212 {
213
214 if (out_queue != null && out_queue.IsOpen)
215 {
216 try
217 {
218 out_queue.Close();
219 }
220 catch (Exception e)
221 {
222 Console.WriteLine("Error cleaning up qmgr " + e.Message);
223 }
224 }
225
226 if (mqMgr != null && mqMgr.IsOpen)
227 {
228 try
229 {
230 mqMgr.Close();
231 }
232 catch (Exception e)
233 {
234 Console.WriteLine("Error cleaning up qmgr " + e.Message);
235 }
236 }
237
238 }
239 }
240
241 private void transportMq2Sql()
242 {
243 int messageCount = 0;
244
245 MQQueueManager mqMgr = null;
246 MQQueue in_queue = null;
247 string filename = getLogFilename(LogfileType.LogTransactions);
248 using (StreamWriter translog = new StreamWriter(filename, true))
249 try
250 {
251 //MQ options
252 Hashtable connProps = getConnectionProperties();
253 int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
254
255 //MySQL options
256 string mysqlString = buildMysqlConnString();
257
258 //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
259 mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
260 in_queue = mqMgr.AccessQueue(queueName, openOptions);
261 using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
262 {
263
264 sqlConnection.Open();
265
266
267 //stage 3 move messages
268 bool isContinue = true;
269 while (isContinue)
270 {
271
272 MQMessage mqMsg = new MQMessage();
273 MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
274
275 mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
276
277 try
278 {
279 in_queue.Get(mqMsg, mqGetMsgOpts);
280 if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
281 {
282 string msgString = mqMsg.ReadString(mqMsg.MessageLength);
283 //System.Console.WriteLine(msgString);
284
285
286 // Hvis transaktionen starter med et ? er det ikke en gyldig transaktion
287 // validér ligeledes at headeren er gyldig
288 if ( msgString.StartsWith("?") || validateSalt2Header(msgString) == false )
289 {
290 string discarded_filename = getLogFilename(LogfileType.LogDiscarded);
291 using (StreamWriter discardedlog = new StreamWriter(discarded_filename, true))
292 {
293 discardedlog.WriteLine(getNowString() + " " + msgString);
294 }
295 mqMgr.Commit();//fjern den afviste transaktion fra køen
296 statusData.discardedCounter++;
297 continue; //gå frem til at tage næste transaktion fra køen
298 }
299
300
301 string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
302
303 MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
304 int numrows = sqlcmd.ExecuteNonQuery();
305
306 if (numrows == 1)
307 {
308 translog.WriteLine(getNowString() + " " + msgString);
309 mqMgr.Commit();
310 statusData.counter++;
311
312
313 messageCount++;// increment per run message counter
314 if (messageCount >= 10000) // if we have moved 10000 messages in this run - let it go and give the other transports a change
315 {
316 isContinue = false;
317 }
318
319
320
321 }
322 else
323 {
324 mqMgr.Backout();
325 isContinue = false;
326 }
327
328 }
329 else
330 {
331 System.Console.WriteLine("Non-text message");
332 }
333 }
334 catch (MQException mqe)
335 {
336 isContinue = false;
337
338 // report reason, if any
339 if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
340 {
341 // special report for normal end
342 System.Console.WriteLine("no more messages");
343 }
344 else
345 {
346 // general report for other reasons
347 System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
348 statusData.lastrunOk = false;
349 }
350
351 }
352
353
354 }
355
356 }
357
358 }
359 catch (Exception e)
360 {
361 //Det vil være mest korrekt at Rollback/backout MQ Transaktionen her - for at være sikker på at Message'n fjernes fra køen
362 try
363 {
364 if (mqMgr != null)
365 {
366 mqMgr.Backout();
367 }
368 }
369 catch (Exception e2)
370 {
371 this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);
372 }
373
374 statusData.lastrunOk = false;
375
376 statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
377 Console.WriteLine(statusData.lastErrorMessage);
378 Console.WriteLine(e.StackTrace);
379 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
380 }
381 finally
382 {
383
384 if (in_queue != null && in_queue.IsOpen)
385 {
386 try
387 {
388 in_queue.Close();
389 }
390 catch (Exception e)
391 {
392 Console.WriteLine("Error cleaning up qmgr " + e.Message);
393 }
394 }
395
396 if (mqMgr != null && mqMgr.IsOpen)
397 {
398 try
399 {
400 mqMgr.Close();
401 } catch (Exception e) {
402 Console.WriteLine("Error cleaning up qmgr " + e.Message);
403 }
404 }
405
406 }
407 }
408
409 private string buildMysqlConnString()
410 {
411 string connectionString = "";
412
413 connectionString += "SERVER=" + controller.mysqlHost + ";";
414 //connectionString += "DATABASE=" + controller.mysqlHost + ";";
415 connectionString += "UID=" + controller.mysqlUser + ";";
416 connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
417 connectionString += "Max Pool Size=20;";
418 //connectionString += "ConnectionReset=true;";//ConnectionReset kræver muligvis at Default Database er angivet - det virker ihvertfald ikke uden
419
420 return connectionString;
421 }
422
423 private Hashtable getConnectionProperties()
424 {
425 Hashtable connProperties = new Hashtable();
426 connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
427 connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
428 connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel);
429 return connProperties;
430 }
431
432
433
434 private string getLogFilename(LogfileType type)
435 {
436
437 DateTime now = DateTime.Now;
438 string filename = controller.logDirectory + "\\";
439
440 //Find uge nr
441 DateTimeFormatInfo dfi = DateTimeFormatInfo.CurrentInfo;
442 Calendar myCal = CultureInfo.InvariantCulture.Calendar;//System.Globalization
443 int week = myCal.GetWeekOfYear(now, dfi.CalendarWeekRule, dfi.FirstDayOfWeek);
444
445 switch (type)
446 {
447 case LogfileType.LogEvents:
448 filename += "eventlog_";
449 break;
450
451 case LogfileType.LogTransactions:
452 filename += "transactionlog_";
453 break;
454 case LogfileType.LogDiscarded:
455 filename += "discardedlog_";
456 break;
457 }
458
459
460 filename += name + "_" + now.Year.ToString("D4") + "_W" + week.ToString("D2") + ".log";
461
462 return filename;
463 }
464
465 public string getNowString()
466 {
467 DateTime now = DateTime.Now;
468
469 return now.ToString("s");
470 }
471
472 /* no used any where used added here for reference/ just in case */
473 private void sendErrorMail(string sub, string msg)
474 {
475 System.Net.Mail.MailMessage message = new System.Net.Mail.MailMessage();
476 message.To.Add("thn@daoas.dk");
477 message.Subject = "Error from DaoMqPump2: " + sub;
478 message.From = new System.Net.Mail.MailAddress("no-reply@daoas.dk");
479 message.Body = msg;
480
481 System.Net.Mail.SmtpClient smtp = new System.Net.Mail.SmtpClient("mail.dao.int");
482 smtp.Send(message);
483 }
484
485 private bool validateSalt2Header(string salt2String)
486 {
487 if (salt2String.Length < 66)
488 {
489 addLogEntry("Transaction too short - discarding");
490 return false;
491 }
492
493
494 int result;
495 long result_long;
496
497 string afsender = salt2String.Substring(0, 5);
498 string modtager = salt2String.Substring(5, 5);
499 string afsenderTegnSaet = salt2String.Substring(10, 6);
500 string standardNavn = salt2String.Substring(16, 6);
501 string standardVersion = salt2String.Substring(22, 3);
502 string afsenderSekvensnr = salt2String.Substring(25, 6);
503 string afsenderTidsstempel = salt2String.Substring(31, 14);
504 string afsenderBakkeIdent = salt2String.Substring(45, 5);
505 string modtagerBakkeIdent = salt2String.Substring(50, 5);
506 string transaktionForkortelse = salt2String.Substring(55, 4);
507 string transaktionsLaengde = salt2String.Substring(59, 5);
508 string prioritet = salt2String.Substring(64, 1);
509
510
511
512 if (int.TryParse(standardVersion.Trim(), out result) == false) // standardVersion _skal_ være en int
513 {
514 addLogEntry("standardVersion not an integer, discarding");
515 return false;
516 }
517
518 if (int.TryParse(afsenderSekvensnr.Trim(), out result) == false) // afsenderSekvensnr _skal_ være en int
519 {
520 addLogEntry("afsenderSekvensnr not an integer, discarding");
521 return false;
522 }
523
524 if (long.TryParse(afsenderTidsstempel.Trim(), out result_long) == false) // afsenderTidsstempel _skal_ være en long
525 {
526 addLogEntry("afsenderTidsstempel not a long integer, discarding");
527 return false;
528 }
529
530 if (int.TryParse(transaktionsLaengde.Trim(), out result) == false) // transaktionsLaengde _skal_ være en int
531 {
532 addLogEntry("transaktionsLaengde not an integer, discarding");
533 return false;
534 }
535
536 if ( int.TryParse(prioritet.Trim(), out result) == false ) // prioritet _skal_ være en int
537 {
538 addLogEntry("prioritet not an integer, discarding");
539 return false;
540 }
541
542 return true;
543 }
544
545 private void addLogEntry(string msg)
546 {
547 msg = getNowString() + " " + msg;
548 lock (logEntries)
549 {
550 logEntries.AddFirst(msg);
551
552 if (logEntries.Count > 20)
553 {
554 logEntries.RemoveLast();
555 }
556 }
557
558 string filename = getLogFilename(LogfileType.LogEvents);
559 using (StreamWriter eventlog = new StreamWriter(filename, true))
560 {
561 eventlog.WriteLine(msg);
562 }
563 }
564
565 public string[] getLog()
566 {
567 lock(logEntries)
568 {
569 List<string> tmpEntries = new List<string>();
570 foreach (string s in logEntries)
571 {
572 tmpEntries.Add(s);
573 }
574 return tmpEntries.ToArray();
575 }
576 }
577
578 }
579 }

  ViewVC Help
Powered by ViewVC 1.1.20