/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Contents of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory | Revision Log


Revision 2169 - (show annotations) (download)
Fri May 16 21:10:02 2014 UTC (10 years ago) by torben
File size: 17087 byte(s)
WIP
1 using System;
2 using System.Collections;
3 using System.Collections.Generic;
4 using System.IO;
5
6 using System.Diagnostics;
7
8 using IBM.WMQ;
9 using MySql.Data.MySqlClient;
10 using System.Globalization;
11
12 using DaoCommon;
13
14 namespace DaoMqPump2
15 {
16 public class Transport
17 {
18
/// <summary>
/// Value of <see cref="direction"/> that makes transportMessages() take the
/// MySQL-to-MQ path.
/// </summary>
public static string SQL2MQ = "sql2mq";

/// <summary>
/// Name of the opposite direction. Nothing in this class compares against it:
/// any direction other than <see cref="SQL2MQ"/> takes the MQ-to-MySQL path.
/// </summary>
public static string MQ2SQL = "mq2sql";

// Controller that supplies the log directory and the MQ / MySQL connection
// settings. Assigned once, in the constructor.
private readonly TransportController controller;

// Status of this transport (enabled flag, counters, last error/ok times).
// Exposed to callers through TransportStatusData.
private StatusData statusData = new StatusData();

/// <summary>Log file of this transport; created in the constructor.</summary>
public Logfile logfile { get; private set; }

/// <summary>Name of the transport; used in log file names and error messages.</summary>
public string name { get; private set; }

/// <summary>Transfer direction; compared against <see cref="SQL2MQ"/>.</summary>
public string direction { get; private set; }

/// <summary>Name of the MQ queue that messages are put on or fetched from.</summary>
public string queueName { get; private set; }

/// <summary>Name of the stored procedure called with each message fetched from MQ.</summary>
public string mq2sqlInsertQuery { get; private set; }

/// <summary>Name of the stored procedure called to read the rows to put on MQ.</summary>
public string sql2mqReadQuery { get; private set; }

/// <summary>Name of the stored procedure called with the row id after a message has been put on MQ.</summary>
public string sql2mqUpdateQuery { get; private set; }
36
37 //public bool lastrunOk { get; private set; }
38 //public string lastErrorMessage { get; private set; }
39
40 //public string lastOkTime { get; private set; }
41 //public string lastErrorTime { get; private set; }
42 //public string lastTransferTime { get; private set; }
43
44 //public int counter { get; private set; }
45
/// <summary>
/// Gets the status object that this transport updates while it runs.
/// </summary>
public StatusData TransportStatusData
{
    get { return statusData; }
}
53
54
/// <summary>
/// Gets or sets whether this transport is enabled. Every assignment is
/// recorded in the transport's log file, also when the value is unchanged.
/// </summary>
public bool Enabled
{
    get { return statusData.transportEnabled; }
    set
    {
        statusData.transportEnabled = value;

        string entry = value ? "Transport enabled" : "Transport disabled";
        logfile.addSingleLogEntry(entry);
    }
}
73
74
75
76
77
/// <summary>
/// Creates a transport, resets its status and opens its log file.
/// </summary>
/// <param name="controller">Controller providing the log directory and connection settings.</param>
/// <param name="name">Name of the transport; also used to name its log file.</param>
/// <param name="direction">Transfer direction, compared against <see cref="SQL2MQ"/> when transporting.</param>
/// <param name="queueName">Name of the MQ queue.</param>
/// <param name="mq2sqlInsertQuery">Stored procedure called for each message fetched from MQ.</param>
/// <param name="sql2mqReadQuery">Stored procedure called to read the rows to send.</param>
/// <param name="sql2mqUpdateQuery">Stored procedure called with the row id after a row has been sent.</param>
/// <param name="enabled">Initial value of the enabled flag.</param>
public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
{
    this.controller = controller;

    // Static configuration of the transport.
    this.name = name;
    this.direction = direction;
    this.queueName = queueName;
    this.mq2sqlInsertQuery = mq2sqlInsertQuery;
    this.sql2mqReadQuery = sql2mqReadQuery;
    this.sql2mqUpdateQuery = sql2mqUpdateQuery;

    // Written directly to the status object instead of through Enabled:
    // the Enabled setter writes to the log file, which is only created below.
    statusData.transportEnabled = enabled;

    // Start from a clean status.
    statusData.lastrunOk = true;
    statusData.counter = 0;
    statusData.lastErrorTime = "";
    statusData.lastOkTime = "";
    statusData.lastErrorMessage = "";

    logfile = new Logfile(name, controller.logDirectory);
    logfile.addSingleLogEntry("Starting ... ");
}
98
/// <summary>
/// Writes a final "Stopping ..." entry to the log file when the object is
/// finalized. Strictly best effort.
/// </summary>
~Transport()
{
    // A finalizer also runs for an instance whose constructor threw, in which
    // case logfile was never assigned. An exception that escapes a finalizer
    // takes the whole process down, so nothing may propagate from here.
    try
    {
        if (logfile != null)
        {
            logfile.addSingleLogEntry("Stopping ... ");
        }
    }
    catch (Exception)
    {
        // Deliberately ignored: there is nowhere to report a logging failure
        // from the finalizer thread.
    }
}
103
104
/// <summary>
/// Runs one transfer pass in the configured direction and updates the status
/// timestamps afterwards. Does nothing when the transport is disabled.
/// </summary>
public void transportMessages()
{
    if (!statusData.transportEnabled)
    {
        return;
    }

    Console.WriteLine(name + " -> transportMessages() ");
    statusData.lastrunOk = true;

    // Remembered so we can tell afterwards whether this pass moved anything.
    int counterBefore = statusData.counter;

    if (direction == SQL2MQ)
    {
        transportSql2Mq();
    }
    else
    {
        transportMq2Sql();
    }

    bool runSucceeded = statusData.lastrunOk == true;
    if (!runSucceeded)
    {
        // The transport method has stored the error text; log it and stamp the failure.
        logfile.addSingleLogEntry(statusData.lastErrorMessage);
        statusData.lastErrorTime = Logfile.getNowString();
        return;
    }

    statusData.lastOkTime = Logfile.getNowString();

    if (statusData.counter != counterBefore)
    {
        // Messages were transported in this pass, so record the transfer time as well.
        statusData.lastTransferTime = Logfile.getNowString();
    }
}
140
/// <summary>
/// Reads rows through the sql2mqReadQuery stored procedure, puts column 1 of
/// each row on the MQ queue as a string message and then calls the
/// sql2mqUpdateQuery stored procedure with the row id from column 0.
/// Every message put is appended to the transaction log file.
/// </summary>
/// <remarks>
/// Exceptions raised after the transaction log has been opened are not
/// rethrown: they are recorded in the status data (lastrunOk,
/// lastErrorMessage), written to the console and to the Windows event log.
/// </remarks>
private void transportSql2Mq()
{
    MQQueueManager mqMgr = null;
    MQQueue out_queue = null;

    string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
    using (StreamWriter translog = new StreamWriter(filename, true))
    {
        try
        {
            // MQ options
            Hashtable connProps = getConnectionProperties();
            int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;

            // MySQL options
            string mysqlString = buildMysqlConnString();

            // The MQ objects in v6 do not implement IDisposable and therefore
            // cannot be used with a "using" statement; they are closed in the
            // finally block instead.
            mqMgr = new MQQueueManager(controller.mqQueueManager, connProps); // stage 1: connect to MQ
            out_queue = mqMgr.AccessQueue(queueName, openOptions);

            // stage 2: connect to MySQL. A second connection is used for the
            // updates because the first one is busy with the open data reader.
            using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString))
            using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
            {
                sqlReadConnection.Open();
                sqlWriteConnection.Open();

                // stage 3: move messages
                string readSql = "CALL " + sql2mqReadQuery + "()";
                using (MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection))
                using (MySqlDataReader dataReader = readCmd.ExecuteReader())
                {
                    while (dataReader.Read())
                    {
                        int id = dataReader.GetInt32(0);
                        string msgString = dataReader.GetString(1);

                        // Accept the defaults, same as MQPMO_DEFAULT.
                        MQPutMessageOptions pmo = new MQPutMessageOptions();

                        MQMessage msg = new MQMessage();
                        msg.Format = MQC.MQFMT_STRING;
                        msg.CharacterSet = 1252;
                        msg.WriteString(msgString);

                        out_queue.Put(msg, pmo);

                        // Now that the message has been put on the queue, mark the row as sent.
                        // The id is formatted culture-invariantly so the SQL text never depends
                        // on the regional settings of the machine.
                        string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id.ToString(CultureInfo.InvariantCulture) + ")";
                        int numrows;
                        using (MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection))
                        {
                            numrows = updateCmd.ExecuteNonQuery();
                        }

                        translog.WriteLine(Logfile.getNowString() + " " + msgString);

                        if (numrows != 1)
                        {
                            // NOTE(review): the pass stops here without flagging an error, although
                            // the message has already been put and logged. Presumably the row is
                            // then still unmarked and will be sent again - confirm this is intended.
                            break;
                        }
                        statusData.counter++;
                    }
                }
            }
        }
        catch (Exception e)
        {
            statusData.lastrunOk = false;
            statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.GetType().FullName + " " + e.Message;
            Console.WriteLine(statusData.lastErrorMessage);
            Console.WriteLine(e.StackTrace);
            EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
        }
        finally
        {
            // Close the queue before the queue manager; a failure in either
            // close is only reported on the console.
            if (out_queue != null && out_queue.IsOpen)
            {
                try
                {
                    out_queue.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up qmgr " + e.Message);
                }
            }

            if (mqMgr != null && mqMgr.IsOpen)
            {
                try
                {
                    mqMgr.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up qmgr " + e.Message);
                }
            }
        }
    }
}
239
/// <summary>
/// Fetches string messages from the MQ queue under syncpoint and passes each
/// one to the mq2sqlInsertQuery stored procedure. A message is committed off
/// the queue only after the procedure reports exactly one affected row;
/// otherwise the MQ unit of work is backed out and the pass ends.
/// Messages that start with "?" or fail the SALT2 header validation are
/// logged, committed off the queue and counted as discarded.
/// At most 10000 messages are moved per call.
/// </summary>
/// <remarks>
/// Exceptions raised after the transaction log has been opened are not
/// rethrown: they are recorded in the status data (lastrunOk,
/// lastErrorMessage), written to the console and to the Windows event log.
/// </remarks>
private void transportMq2Sql()
{
    int messageCount = 0;

    MQQueueManager mqMgr = null;
    MQQueue in_queue = null;
    string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
    using (StreamWriter translog = new StreamWriter(filename, true))
    {
        try
        {
            // MQ options
            Hashtable connProps = getConnectionProperties();
            int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;

            // MySQL options
            string mysqlString = buildMysqlConnString();

            // The MQ objects in v6 do not implement IDisposable and therefore
            // cannot be used with a "using" statement; they are closed in the
            // finally block instead.
            mqMgr = new MQQueueManager(controller.mqQueueManager, connProps); // stage 1: connect to MQ
            in_queue = mqMgr.AccessQueue(queueName, openOptions);
            using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) // stage 2: connect to MySQL
            {
                sqlConnection.Open();

                // Builds a "CALL somestoredprocedure( @message )" statement. The procedure
                // name comes from configuration; the message text comes from the queue and
                // is bound as a parameter instead of being spliced into the SQL text.
                string sql = "CALL " + mq2sqlInsertQuery + "( @message )";

                // stage 3: move messages
                bool isContinue = true;
                while (isContinue)
                {
                    MQMessage mqMsg = new MQMessage();
                    MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();

                    // Requires a commit before the message is actually removed from the queue.
                    mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT;

                    try
                    {
                        in_queue.Get(mqMsg, mqGetMsgOpts);
                        if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
                        {
                            string msgString = mqMsg.ReadString(mqMsg.MessageLength);

                            // A transaction that starts with a "?" is not a valid transaction;
                            // the header must validate as well.
                            if (msgString.StartsWith("?", StringComparison.Ordinal) || Salt2Helper.validateSalt2Header(msgString) == false)
                            {
                                // The original code opened a second StreamWriter on the very same
                                // path that translog already holds open for writing, which fails
                                // with a sharing violation; the message was then backed out and
                                // retried instead of discarded. Write through the open writer.
                                // NOTE(review): presumably discarded messages were meant to go to
                                // a separate log file - confirm which LogfileType to use.
                                translog.WriteLine(Logfile.getNowString() + " " + msgString);
                                translog.Flush(); // make sure it is on disk before it leaves the queue
                                mqMgr.Commit(); // remove the rejected transaction from the queue
                                statusData.discardedCounter++;
                                continue; // move on to the next transaction on the queue
                            }

                            int numrows;
                            using (MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection))
                            {
                                sqlcmd.Parameters.AddWithValue("@message", msgString);
                                numrows = sqlcmd.ExecuteNonQuery();
                            }

                            if (numrows == 1)
                            {
                                translog.WriteLine(Logfile.getNowString() + " " + msgString);
                                mqMgr.Commit();
                                statusData.counter++;

                                messageCount++; // per-run message counter
                                if (messageCount >= 10000)
                                {
                                    // 10000 messages moved in this run: stop and give the
                                    // other transports a chance.
                                    isContinue = false;
                                }
                            }
                            else
                            {
                                mqMgr.Backout();
                                isContinue = false;
                            }
                        }
                        else
                        {
                            // NOTE(review): the message was fetched under syncpoint and is neither
                            // committed nor backed out here, so its fate depends on the next
                            // Commit/Backout - confirm the intended handling.
                            System.Console.WriteLine("Non-text message");
                        }
                    }
                    catch (MQException mqe)
                    {
                        isContinue = false;

                        // report reason, if any
                        if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                        {
                            // special report for normal end
                            System.Console.WriteLine("no more messages");
                        }
                        else
                        {
                            // general report for other reasons
                            System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);
                            statusData.lastrunOk = false;
                            // Store the error text as well; transportMessages() logs
                            // lastErrorMessage whenever lastrunOk is false and would otherwise
                            // log a stale or empty message.
                            statusData.lastErrorMessage = name + ".transportMq2Sql error: " + mqe.GetType().FullName + " " + mqe.Message;
                        }
                    }
                }
            }
        }
        catch (Exception e)
        {
            // Back out the MQ unit of work so that a message fetched under
            // syncpoint is not left pending after the failure.
            try
            {
                if (mqMgr != null)
                {
                    mqMgr.Backout();
                }
            }
            catch (Exception e2)
            {
                logfile.addSingleLogEntry("Error backing out from MQ Transaction: " + e2.Message);
            }

            statusData.lastrunOk = false;

            statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
            Console.WriteLine(statusData.lastErrorMessage);
            Console.WriteLine(e.StackTrace);
            EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
        }
        finally
        {
            // Close the queue before the queue manager; a failure in either
            // close is only reported on the console.
            if (in_queue != null && in_queue.IsOpen)
            {
                try
                {
                    in_queue.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up qmgr " + e.Message);
                }
            }

            if (mqMgr != null && mqMgr.IsOpen)
            {
                try
                {
                    mqMgr.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up qmgr " + e.Message);
                }
            }
        }
    }
}
407
/// <summary>
/// Builds the MySQL connection string from the controller's host, user and
/// password, with a maximum pool size of 20. No default database is set.
/// </summary>
/// <returns>The connection string to pass to MySqlConnection.</returns>
private string buildMysqlConnString()
{
    // A connection string builder quotes values that contain ';', '=' or
    // quotes. Plain concatenation produced a broken connection string for
    // such values, for instance a password containing a semicolon.
    System.Data.Common.DbConnectionStringBuilder builder = new System.Data.Common.DbConnectionStringBuilder();

    builder["SERVER"] = controller.mysqlHost;
    builder["UID"] = controller.mysqlUser;
    builder["PASSWORD"] = controller.mysqlPassword;
    builder["Max Pool Size"] = 20;
    // "ConnectionReset=true" is intentionally not set: it may require a
    // default database to be specified - at least it does not work without one.

    return builder.ConnectionString;
}
421
/// <summary>
/// Builds the property table passed to the MQQueueManager constructor:
/// client transport plus the host name and channel taken from the controller.
/// </summary>
/// <returns>A new Hashtable holding the three connection properties.</returns>
private Hashtable getConnectionProperties()
{
    return new Hashtable
    {
        { MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT },
        { MQC.HOST_NAME_PROPERTY, controller.mqHost },
        { MQC.CHANNEL_PROPERTY, controller.mqChannel }
    };
}
430
431
432 }
433 }

  ViewVC Help
Powered by ViewVC 1.1.20