/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Contents of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2057 - (show annotations) (download)
Tue Aug 27 06:49:36 2013 UTC (10 years, 8 months ago) by torben
File size: 17372 byte(s)
Use a file per week instead of file per month
1 using System;
2 using System.Collections;
3 using System.Collections.Generic;
4 using System.IO;
5
6 using System.Diagnostics;
7
8 using IBM.WMQ;
9 using MySql.Data.MySqlClient;
10 using System.Globalization;
11
12 namespace DaoMqPump2
13 {
14 public class Transport
15 {
16
17 enum LogfileType {
18 LogTransactions,
19 LogEvents
20 }
21
22 public static string SQL2MQ = "sql2mq";
23 public static string MQ2SQL = "mq2sql";
24
25 //private bool enabled;
26
27 TransportController controller;
28
29 StatusData statusData = new StatusData();
30
31 public string name { get; private set; }
32 public string direction { get; private set; }
33 public string queueName { get; private set; }
34 public string mq2sqlInsertQuery { get; private set; }
35 public string sql2mqReadQuery { get; private set; }
36 public string sql2mqUpdateQuery { get; private set; }
37
38 //public bool lastrunOk { get; private set; }
39 //public string lastErrorMessage { get; private set; }
40
41 //public string lastOkTime { get; private set; }
42 //public string lastErrorTime { get; private set; }
43 //public string lastTransferTime { get; private set; }
44
45 //public int counter { get; private set; }
46
47 public StatusData TransportStatusData
48 {
49 get
50 {
51 return this.statusData;
52 }
53 }
54
55
56 public bool Enabled
57 {
58 get {
59 return statusData.transportEnabled;
60 }
61 set
62 {
63 statusData.transportEnabled = value;
64 if (value == true)
65 {
66 this.addLogEntry("Transport enabled");
67 }
68 else
69 {
70 this.addLogEntry("Transport disabled");
71 }
72 }
73 }
74
75
76 private LinkedList<string> logEntries = new LinkedList<string>();
77
78
79 public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
80 {
81 this.controller = controller;
82 this.name = name;
83 this.direction = direction;
84 this.queueName = queueName;
85 this.mq2sqlInsertQuery = mq2sqlInsertQuery;
86 this.sql2mqReadQuery = sql2mqReadQuery;
87 this.sql2mqUpdateQuery = sql2mqUpdateQuery;
88
89 statusData.transportEnabled = enabled;
90
91
92 statusData.lastrunOk = true;
93 statusData.counter = 0;
94 statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";
95
96 addLogEntry( "Starting ... " );
97 }
98
99 ~Transport()
100 {
101 addLogEntry("Stopping ... ");
102 }
103
104
105 public void transportMessages()
106 {
107 if (statusData.transportEnabled == false)
108 return;
109
110 Console.WriteLine(name + " -> transportMessages() ");
111 statusData.lastrunOk = true;
112
113 int startCounter = statusData.counter;
114
115 if (direction == SQL2MQ)
116 {
117 transportSql2Mq();
118 }
119 else
120 {
121 transportMq2Sql();
122 }
123
124 if (statusData.lastrunOk == true)
125 {
126 statusData.lastOkTime = getNowString();
127
128 if (statusData.counter != startCounter)
129 {
130 //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
131 statusData.lastTransferTime = getNowString();
132 }
133 }
134 else
135 {
136 addLogEntry(statusData.lastErrorMessage);
137 statusData.lastErrorTime = getNowString();
138 }
139 }
140
141 private void transportSql2Mq()
142 {
143 MQQueueManager mqMgr = null;
144 MQQueue out_queue = null;
145
146 string filename = getLogFilename(LogfileType.LogTransactions);
147 using (StreamWriter translog = new StreamWriter(filename, true) )
148 try
149 {
150 //MQ Options
151 Hashtable connProps = getConnectionProperties();
152 int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
153
154 //MySQL Options
155 string mysqlString = buildMysqlConnString();
156
157 //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
158 mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
159 out_queue = mqMgr.AccessQueue(queueName, openOptions);
160 using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
161 using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
162 {
163 sqlReadConnection.Open();
164 sqlWriteConnection.Open();
165
166 //stage 3 move messages
167 string readSql = "CALL " + sql2mqReadQuery + "()";
168 MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);
169 MySqlDataReader dataReader = readCmd.ExecuteReader();
170 while (dataReader.Read())
171 {
172 int id = dataReader.GetInt32(0);
173 string msgString = dataReader.GetString(1);
174
175 MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
176 // same as MQPMO_DEFAULT
177
178 MQMessage msg = new MQMessage();
179 msg.Format = MQC.MQFMT_STRING;
180 msg.CharacterSet = 1252;
181 msg.WriteString(msgString);
182
183 out_queue.Put(msg, pmo);
184
185 //now that the message has been put on queue mark it as such
186
187 string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
188 MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
189 int numrows = updateCmd.ExecuteNonQuery();
190
191 translog.WriteLine(getNowString() + " " + msgString);
192
193 if (numrows != 1)
194 {
195 break;
196 }
197 statusData.counter++;
198 }
199
200 }
201 }
202 catch (Exception e)
203 {
204 statusData.lastrunOk = false;
205 statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
206 Console.WriteLine(statusData.lastErrorMessage);
207 Console.WriteLine(e.StackTrace);
208 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
209 }
210 finally
211 {
212
213 if (out_queue != null && out_queue.IsOpen)
214 {
215 try
216 {
217 out_queue.Close();
218 }
219 catch (Exception e)
220 {
221 Console.WriteLine("Error cleaning up qmgr " + e.Message);
222 }
223 }
224
225 if (mqMgr != null && mqMgr.IsOpen)
226 {
227 try
228 {
229 mqMgr.Close();
230 }
231 catch (Exception e)
232 {
233 Console.WriteLine("Error cleaning up qmgr " + e.Message);
234 }
235 }
236
237 }
238 }
239
240 private void transportMq2Sql()
241 {
242 MQQueueManager mqMgr = null;
243 MQQueue in_queue = null;
244 string filename = getLogFilename(LogfileType.LogTransactions);
245 using (StreamWriter translog = new StreamWriter(filename, true))
246 try
247 {
248 //MQ options
249 Hashtable connProps = getConnectionProperties();
250 int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
251
252 //MySQL options
253 string mysqlString = buildMysqlConnString();
254
255 //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
256 mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
257 in_queue = mqMgr.AccessQueue(queueName, openOptions);
258 using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
259 {
260
261 sqlConnection.Open();
262
263
264 //stage 3 move messages
265 bool isContinue = true;
266 while (isContinue)
267 {
268
269 MQMessage mqMsg = new MQMessage();
270 MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
271
272 mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
273
274 try
275 {
276 in_queue.Get(mqMsg, mqGetMsgOpts);
277 if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
278 {
279 string msgString = mqMsg.ReadString(mqMsg.MessageLength);
280 System.Console.WriteLine(msgString);
281
282 string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
283
284 MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
285 int numrows = sqlcmd.ExecuteNonQuery();
286
287 if (numrows == 1)
288 {
289 translog.WriteLine(getNowString() + " " + msgString);
290 mqMgr.Commit();
291 statusData.counter++;
292 }
293 else
294 {
295 mqMgr.Backout();
296 isContinue = false;
297 }
298
299 }
300 else
301 {
302 System.Console.WriteLine("Non-text message");
303 }
304 }
305 catch (MQException mqe)
306 {
307 isContinue = false;
308
309 // report reason, if any
310 if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
311 {
312 // special report for normal end
313 System.Console.WriteLine("no more messages");
314 }
315 else
316 {
317 // general report for other reasons
318 System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
319 statusData.lastrunOk = false;
320 }
321
322 }
323
324
325 }
326
327 }
328
329 }
330 catch (Exception e)
331 {
332 //Det vil være mest korrekt at Rollback/backout MQ Transaktionen her - for at være sikker på at Message'n fjernes fra køen
333 try
334 {
335 if (mqMgr != null)
336 {
337 mqMgr.Backout();
338 }
339 }
340 catch (Exception e2)
341 {
342 this.addLogEntry("Error backing out from MQ Transaction: " + e2.Message);
343 }
344
345 statusData.lastrunOk = false;
346
347 statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
348 Console.WriteLine(statusData.lastErrorMessage);
349 Console.WriteLine(e.StackTrace);
350 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
351 }
352 finally
353 {
354
355 if (in_queue != null && in_queue.IsOpen)
356 {
357 try
358 {
359 in_queue.Close();
360 }
361 catch (Exception e)
362 {
363 Console.WriteLine("Error cleaning up qmgr " + e.Message);
364 }
365 }
366
367 if (mqMgr != null && mqMgr.IsOpen)
368 {
369 try
370 {
371 mqMgr.Close();
372 } catch (Exception e) {
373 Console.WriteLine("Error cleaning up qmgr " + e.Message);
374 }
375 }
376
377 }
378 }
379
380 private string buildMysqlConnString()
381 {
382 string connectionString = "";
383
384 connectionString += "SERVER=" + controller.mysqlHost + ";";
385 //connectionString += "DATABASE=" + controller.mysqlHost + ";";
386 connectionString += "UID=" + controller.mysqlUser + ";";
387 connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
388 connectionString += "Max Pool Size=20;";
389 //connectionString += "ConnectionReset=true;";//ConnectionReset kræver muligvis at Default Database er angivet - det virker ihvertfald ikke uden
390
391 return connectionString;
392 }
393
394 private Hashtable getConnectionProperties()
395 {
396 Hashtable connProperties = new Hashtable();
397 connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
398 connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
399 connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel); //WARNING: Hardcoded Channel Value !!!
400 return connProperties;
401 }
402
403
404
405 private string getLogFilename(LogfileType type)
406 {
407
408 DateTime now = DateTime.Now;
409 string filename = controller.logDirectory + "\\";
410
411 //Find uge nr
412 DateTimeFormatInfo dfi = DateTimeFormatInfo.CurrentInfo;
413 Calendar myCal = CultureInfo.InvariantCulture.Calendar;//System.Globalization
414 int week = myCal.GetWeekOfYear(now, dfi.CalendarWeekRule, dfi.FirstDayOfWeek);
415
416 switch (type)
417 {
418 case LogfileType.LogEvents:
419 filename += "eventlog_";
420 break;
421
422 case LogfileType.LogTransactions:
423 filename += "transactionlog_";
424 break;
425 }
426
427
428 filename += name + "_" + now.Year.ToString("D4") + "_W" + week.ToString("D2") + ".log";
429
430 return filename;
431 }
432
433 public string getNowString()
434 {
435 DateTime now = DateTime.Now;
436
437 return now.ToString("s");
438 }
439
440 private void addLogEntry(string msg)
441 {
442 msg = getNowString() + " " + msg;
443 lock (logEntries)
444 {
445 logEntries.AddFirst(msg);
446
447 if (logEntries.Count > 20)
448 {
449 logEntries.RemoveLast();
450 }
451 }
452
453 string filename = getLogFilename(LogfileType.LogEvents);
454 using (StreamWriter eventlog = new StreamWriter(filename, true))
455 {
456 eventlog.WriteLine(msg);
457 }
458 }
459
460 public string[] getLog()
461 {
462 lock(logEntries)
463 {
464 List<string> tmpEntries = new List<string>();
465 foreach (string s in logEntries)
466 {
467 tmpEntries.Add(s);
468 }
469 return tmpEntries.ToArray();
470 }
471 }
472
473 }
474 }

  ViewVC Help
Powered by ViewVC 1.1.20