/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Contents of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory | Revision Log


Revision 2050 - (show annotations) (download)
Fri Aug 23 17:07:25 2013 UTC (10 years, 9 months ago) by torben
File size: 14174 byte(s)
also dump stacktrace to CLI / console
1 using System;
2 using System.Collections;
3 using System.Collections.Generic;
4 using System.IO;
5
6 using System.Diagnostics;
7
8 using IBM.WMQ;
9 using MySql.Data.MySqlClient;
10
11 namespace DaoMqPump2
12 {
13 public class Transport
14 {
15
/// <summary>
/// Kinds of log file that getLogFilename can build a name for.
/// </summary>
enum LogfileType
{
    /// <summary>Transaction log; getLogFilename uses the "transactionlog_" prefix for it.</summary>
    LogTransactions,

    /// <summary>Event log; getLogFilename uses the "eventlog_" prefix for it.</summary>
    LogEvents
}
20
/// <summary>
/// Direction value that makes transportMessages run the SQL-to-MQ transfer.
/// Declared readonly so that no other code can reassign this shared constant at runtime.
/// </summary>
public static readonly string SQL2MQ = "sql2mq";

/// <summary>
/// Direction value for the MQ-to-SQL transfer. transportMessages treats every
/// direction other than SQL2MQ as MQ-to-SQL.
/// Declared readonly so that no other code can reassign this shared constant at runtime.
/// </summary>
public static readonly string MQ2SQL = "mq2sql";
23
24 //private bool enabled;
25
26 TransportController controller;
27
28 StatusData statusData = new StatusData();
29
30 public string name { get; private set; }
31 public string direction { get; private set; }
32 public string queueName { get; private set; }
33 public string mq2sqlInsertQuery { get; private set; }
34 public string sql2mqReadQuery { get; private set; }
35 public string sql2mqUpdateQuery { get; private set; }
36
37 //public bool lastrunOk { get; private set; }
38 //public string lastErrorMessage { get; private set; }
39
40 //public string lastOkTime { get; private set; }
41 //public string lastErrorTime { get; private set; }
42 //public string lastTransferTime { get; private set; }
43
44 //public int counter { get; private set; }
45
/// <summary>
/// Gets the status record that this transport updates while it runs
/// (enabled flag, counter, last error message and timestamps).
/// </summary>
public StatusData TransportStatusData
{
    get { return statusData; }
}
53
54
/// <summary>
/// Gets or sets whether this transport is enabled. The flag is stored in the
/// status data; every assignment also writes a "Transport enabled" or
/// "Transport disabled" entry to the event log via addLogEntry.
/// </summary>
public bool Enabled
{
    get { return statusData.transportEnabled; }
    set
    {
        statusData.transportEnabled = value;

        // Log the new state, also when it is the same as the previous one.
        addLogEntry(value ? "Transport enabled" : "Transport disabled");
    }
}
73
74
75 private LinkedList<string> logEntries = new LinkedList<string>();
76
77
/// <summary>
/// Creates a transport, stores its configuration, resets its status data and
/// writes a "Starting ... " entry to the event log.
/// </summary>
/// <param name="controller">Controller supplying the MQ, MySQL and log directory settings.</param>
/// <param name="name">Name of the transport; used in log file names and error messages.</param>
/// <param name="direction">Transfer direction; compared against SQL2MQ in transportMessages.</param>
/// <param name="queueName">Name of the MQ queue to open.</param>
/// <param name="mq2sqlInsertQuery">Name used in the CALL statement that stores a received message.</param>
/// <param name="sql2mqReadQuery">Name used in the CALL statement that reads messages to send.</param>
/// <param name="sql2mqUpdateQuery">Name used in the CALL statement that marks a message as sent.</param>
/// <param name="enabled">Initial value of the enabled flag.</param>
public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
{
    // Configuration
    this.controller = controller;
    this.name = name;
    this.direction = direction;
    this.queueName = queueName;
    this.mq2sqlInsertQuery = mq2sqlInsertQuery;
    this.sql2mqReadQuery = sql2mqReadQuery;
    this.sql2mqUpdateQuery = sql2mqUpdateQuery;

    // Initial status. The flag is written directly (not through the Enabled
    // property), so no "Transport enabled/disabled" log entry is produced here.
    statusData.transportEnabled = enabled;
    statusData.lastrunOk = true;
    statusData.counter = 0;
    statusData.lastErrorTime = "";
    statusData.lastOkTime = "";
    statusData.lastErrorMessage = "";

    addLogEntry("Starting ... ");
}
97
/// <summary>
/// Finalizer: tries to write a "Stopping ... " entry to the event log.
/// </summary>
~Transport()
{
    try
    {
        addLogEntry("Stopping ... ");
    }
    catch (Exception)
    {
        // addLogEntry opens and writes a log file, which can fail (missing
        // directory, file in use, ...). An exception that escapes a finalizer
        // terminates the process, so this log line is strictly best-effort.
    }
}
102
103
/// <summary>
/// Runs one transfer pass in the configured direction and updates the status
/// data afterwards. Does nothing when the transport is disabled.
/// </summary>
public void transportMessages()
{
    if (statusData.transportEnabled == false)
    {
        return;
    }

    Console.WriteLine(name + " -> transportMessages() ");

    // Assume success; the transfer methods clear the flag when they fail.
    statusData.lastrunOk = true;
    int counterBefore = statusData.counter;

    // Any direction other than SQL2MQ is handled as MQ-to-SQL.
    if (direction == SQL2MQ)
    {
        transportSql2Mq();
    }
    else
    {
        transportMq2Sql();
    }

    bool runSucceeded = (statusData.lastrunOk == true);
    if (!runSucceeded)
    {
        addLogEntry(statusData.lastErrorMessage);
        statusData.lastErrorTime = getNowString();
        return;
    }

    statusData.lastOkTime = getNowString();

    if (statusData.counter != counterBefore)
    {
        // Messages were transported during this pass, so record the transfer time.
        statusData.lastTransferTime = getNowString();
    }
}
139
/// <summary>
/// Moves messages from MySQL to the MQ queue: calls the read procedure, puts
/// every returned row on the queue, then calls the update procedure with the
/// row id to mark it as sent. Each message is appended to the transaction log.
/// Any exception is caught, recorded in the status data and written to the
/// console and the Windows event log.
/// </summary>
private void transportSql2Mq()
{
    string filename = getLogFilename(LogfileType.LogTransactions);
    using (StreamWriter translog = new StreamWriter(filename, true))
    {
        try
        {
            // MQ options
            Hashtable connProps = getConnectionProperties();
            int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;

            // MySQL options
            string mysqlString = buildMysqlConnString();

            using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps)) // stage 1: connect to MQ
            using (MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions))
            using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) // stage 2: connect to MySQL
            using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
            {
                // Two connections: one stays busy with the open reader while
                // the other executes the per-row update.
                sqlReadConnection.Open();
                sqlWriteConnection.Open();

                // stage 3: move messages
                string readSql = "CALL " + sql2mqReadQuery + "()";

                // The command and the reader are disposed deterministically
                // instead of being left for the connection to clean up.
                using (MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection))
                using (MySqlDataReader dataReader = readCmd.ExecuteReader())
                {
                    while (dataReader.Read())
                    {
                        // Column 0 is the row id, column 1 the message text.
                        int id = dataReader.GetInt32(0);
                        string msgString = dataReader.GetString(1);

                        // Accept the defaults, same as MQPMO_DEFAULT.
                        MQPutMessageOptions pmo = new MQPutMessageOptions();

                        MQMessage msg = new MQMessage();
                        msg.Format = MQC.MQFMT_STRING;
                        msg.CharacterSet = 1252;
                        msg.WriteString(msgString);

                        out_queue.Put(msg, pmo);

                        // Now that the message has been put on the queue, mark it as such.
                        string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
                        int numrows;
                        using (MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection))
                        {
                            numrows = updateCmd.ExecuteNonQuery();
                        }

                        translog.WriteLine(getNowString() + " " + msgString + "\n");

                        if (numrows != 1)
                        {
                            // NOTE(review): the message is already on the queue at this
                            // point; the pass stops without counting it and without
                            // flagging an error - confirm that this is intended.
                            break;
                        }
                        statusData.counter++;
                    }
                }
            }
        }
        catch (Exception e)
        {
            statusData.lastrunOk = false;
            // Name this method in the message (it previously said "transportMq2Sql").
            statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.GetType().FullName + " " + e.Message;
            Console.WriteLine(statusData.lastErrorMessage);
            Console.WriteLine(e.StackTrace);
            EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
        }
    }
}
207
/// <summary>
/// Moves messages from the MQ queue to MySQL: gets messages under syncpoint,
/// passes each string message to the insert procedure and commits the get when
/// exactly one row was affected (otherwise backs out and stops). Each stored
/// message is appended to the transaction log. The pass ends when the queue
/// reports no more messages. Failures are recorded in the status data.
/// </summary>
private void transportMq2Sql()
{
    string filename = getLogFilename(LogfileType.LogTransactions);
    using (StreamWriter translog = new StreamWriter(filename, true))
    {
        try
        {
            // MQ options
            Hashtable connProps = getConnectionProperties();
            int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;

            // MySQL options
            string mysqlString = buildMysqlConnString();

            using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps)) // stage 1: connect to MQ
            using (MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions))
            using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) // stage 2: connect to MySQL
            {
                sqlConnection.Open();

                // stage 3: move messages
                bool isContinue = true;
                while (isContinue)
                {
                    MQMessage mqMsg = new MQMessage();
                    MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();

                    // Requires a commit before the message is removed from the queue.
                    mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT;

                    try
                    {
                        in_queue.Get(mqMsg, mqGetMsgOpts);
                        if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
                        {
                            string msgString = mqMsg.ReadString(mqMsg.MessageLength);
                            System.Console.WriteLine(msgString);

                            // Builds a "CALL somestoredprocedure('msgString')" SQL string.
                            string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )";

                            int numrows;
                            using (MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection))
                            {
                                numrows = sqlcmd.ExecuteNonQuery();
                            }

                            if (numrows == 1)
                            {
                                translog.WriteLine(getNowString() + " " + msgString + "\n");
                                mqMgr.Commit();
                                statusData.counter++;
                            }
                            else
                            {
                                mqMgr.Backout();
                                isContinue = false;
                            }
                        }
                        else
                        {
                            // NOTE(review): the non-text message stays in the current
                            // unit of work; it is neither committed nor backed out
                            // here - confirm the intended handling.
                            System.Console.WriteLine("Non-text message");
                        }
                    }
                    catch (MQException mqe)
                    {
                        isContinue = false;

                        // Report reason, if any.
                        if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                        {
                            // Special report for normal end.
                            System.Console.WriteLine("no more messages");
                        }
                        else
                        {
                            // General report for other reasons.
                            System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);
                            statusData.lastrunOk = false;
                            // Record the failure so that the caller does not log a
                            // stale message from an earlier run.
                            statusData.lastErrorMessage = name + ".transportMq2Sql error: " + mqe.GetType().FullName + " " + mqe.Message;
                        }
                    }
                    catch (Exception)
                    {
                        // A non-MQ failure (for example the SQL insert) happened after
                        // the message was read under syncpoint. Back out explicitly so
                        // the message goes back to the queue; otherwise the open unit
                        // of work could be committed implicitly on disconnect and the
                        // message would be lost.
                        try
                        {
                            mqMgr.Backout();
                        }
                        catch (MQException backoutError)
                        {
                            System.Console.WriteLine("MQQueueManager::Backout failed with " + backoutError.Message);
                        }

                        // Let the outer handler record and report the original error.
                        throw;
                    }
                }
            }
        }
        catch (Exception e)
        {
            statusData.lastrunOk = false;

            statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
            Console.WriteLine(statusData.lastErrorMessage);
            Console.WriteLine(e.StackTrace);
            EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
        }
    }
}
305
/// <summary>
/// Builds the MySQL connection string from the controller settings.
/// Only SERVER, UID and PASSWORD are set; no DATABASE key is included.
/// </summary>
/// <returns>Connection string of the form "SERVER=...;UID=...;PASSWORD=...;".</returns>
private string buildMysqlConnString()
{
    return "SERVER=" + controller.mysqlHost + ";"
         + "UID=" + controller.mysqlUser + ";"
         + "PASSWORD=" + controller.mysqlPassword + ";";
}
319
/// <summary>
/// Builds the property table passed to the MQQueueManager constructor:
/// client transport plus the host name and channel taken from the controller.
/// </summary>
/// <returns>A new Hashtable with the transport, host name and channel properties.</returns>
private Hashtable getConnectionProperties()
{
    return new Hashtable
    {
        { MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT },
        { MQC.HOST_NAME_PROPERTY, controller.mqHost },
        { MQC.CHANNEL_PROPERTY, controller.mqChannel }
    };
}
328
329
330
/// <summary>
/// Builds the path of this transport's log file for the current month:
/// &lt;logDirectory&gt;\&lt;prefix&gt;&lt;name&gt;_&lt;yyyyMM&gt;.log, where the prefix depends on the
/// log type.
/// </summary>
/// <param name="type">Which log the file name is for.</param>
/// <returns>The log file path.</returns>
private string getLogFilename(LogfileType type)
{
    DateTime now = DateTime.Now;

    // Pick the file name prefix; an unknown type gets no prefix.
    string prefix;
    if (type == LogfileType.LogEvents)
    {
        prefix = "eventlog_";
    }
    else if (type == LogfileType.LogTransactions)
    {
        prefix = "transactionlog_";
    }
    else
    {
        prefix = "";
    }

    // Four-digit year followed by two-digit month, so a new file starts each month.
    string monthStamp = now.Year.ToString("D4") + now.Month.ToString("D2");

    return controller.logDirectory + "\\" + prefix + name + "_" + monthStamp + ".log";
}
353
/// <summary>
/// Returns the current local time formatted with the sortable "s" format
/// (yyyy-MM-ddTHH:mm:ss).
/// </summary>
public string getNowString()
{
    return DateTime.Now.ToString("s");
}
360
/// <summary>
/// Records an event: prefixes the message with the current time, keeps it in
/// the in-memory list (newest first, at most 20 entries) and appends it to the
/// event log file.
/// </summary>
/// <param name="msg">Message text without timestamp.</param>
private void addLogEntry(string msg)
{
    const int maxEntries = 20;

    string entry = getNowString() + " " + msg;

    lock (logEntries)
    {
        logEntries.AddFirst(entry);

        // Drop the oldest entry once the list grows past the limit.
        if (logEntries.Count > maxEntries)
        {
            logEntries.RemoveLast();
        }
    }

    // The file is written outside the lock; it is opened in append mode.
    string path = getLogFilename(LogfileType.LogEvents);
    using (StreamWriter writer = new StreamWriter(path, true))
    {
        writer.WriteLine(entry);
    }
}
380
/// <summary>
/// Returns a snapshot of the in-memory event log entries, newest first.
/// </summary>
/// <returns>A new array holding a copy of the current entries.</returns>
public string[] getLog()
{
    lock (logEntries)
    {
        // Copy under the lock so the caller gets a consistent snapshot.
        string[] snapshot = new string[logEntries.Count];
        logEntries.CopyTo(snapshot, 0);
        return snapshot;
    }
}
393
394 }
395 }

  ViewVC Help
Powered by ViewVC 1.1.20