/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2051 - (hide annotations) (download)
Fri Aug 23 20:03:47 2013 UTC (10 years, 9 months ago) by torben
File size: 16521 byte(s)
Websphere v6 mq objects does NOT implement idisposable and can not be used with "using" statement.
Set Max Pool Size=20 to avoid hogging all connections
1 torben 1986 using System;
2     using System.Collections;
3     using System.Collections.Generic;
4     using System.IO;
5    
6     using System.Diagnostics;
7    
8     using IBM.WMQ;
9     using MySql.Data.MySqlClient;
10    
11     namespace DaoMqPump2
12     {
13     public class Transport
14     {
15 torben 1999
16     enum LogfileType {
17     LogTransactions,
18     LogEvents
19     }
20    
21 torben 1986 public static string SQL2MQ = "sql2mq";
22     public static string MQ2SQL = "mq2sql";
23    
24 torben 2001 //private bool enabled;
25 torben 1986
26     TransportController controller;
27    
28 torben 1999 StatusData statusData = new StatusData();
29    
30 torben 1986 public string name { get; private set; }
31     public string direction { get; private set; }
32     public string queueName { get; private set; }
33     public string mq2sqlInsertQuery { get; private set; }
34     public string sql2mqReadQuery { get; private set; }
35     public string sql2mqUpdateQuery { get; private set; }
36    
37 torben 1999 //public bool lastrunOk { get; private set; }
38     //public string lastErrorMessage { get; private set; }
39 torben 1986
40 torben 1999 //public string lastOkTime { get; private set; }
41     //public string lastErrorTime { get; private set; }
42     //public string lastTransferTime { get; private set; }
43 torben 1986
44 torben 1999 //public int counter { get; private set; }
45 torben 1986
46 torben 1999 public StatusData TransportStatusData
47     {
48     get
49     {
50     return this.statusData;
51     }
52     }
53    
54    
55     public bool Enabled
56     {
57     get {
58 torben 2001 return statusData.transportEnabled;
59 torben 1999 }
60     set
61     {
62 torben 2001 statusData.transportEnabled = value;
63 torben 1999 if (value == true)
64     {
65     this.addLogEntry("Transport enabled");
66     }
67     else
68     {
69     this.addLogEntry("Transport disabled");
70     }
71     }
72     }
73    
74    
75 torben 1986 private LinkedList<string> logEntries = new LinkedList<string>();
76    
77    
78     public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
79     {
80     this.controller = controller;
81     this.name = name;
82     this.direction = direction;
83     this.queueName = queueName;
84     this.mq2sqlInsertQuery = mq2sqlInsertQuery;
85     this.sql2mqReadQuery = sql2mqReadQuery;
86     this.sql2mqUpdateQuery = sql2mqUpdateQuery;
87    
88 torben 2001 statusData.transportEnabled = enabled;
89 torben 1986
90 torben 1999
91     statusData.lastrunOk = true;
92     statusData.counter = 0;
93     statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";
94    
95 torben 1986 addLogEntry( "Starting ... " );
96     }
97    
98 torben 1999 ~Transport()
99     {
100     addLogEntry("Stopping ... ");
101     }
102    
103    
104 torben 1986 public void transportMessages()
105     {
106 torben 2001 if (statusData.transportEnabled == false)
107 torben 1986 return;
108    
109     Console.WriteLine(name + " -> transportMessages() ");
110 torben 1999 statusData.lastrunOk = true;
111 torben 1986
112 torben 1999 int startCounter = statusData.counter;
113    
114 torben 1986 if (direction == SQL2MQ)
115     {
116     transportSql2Mq();
117     }
118     else
119     {
120     transportMq2Sql();
121     }
122    
123 torben 1999 if (statusData.lastrunOk == true)
124 torben 1986 {
125 torben 1999 statusData.lastOkTime = getNowString();
126    
127     if (statusData.counter != startCounter)
128     {
129     //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
130     statusData.lastTransferTime = getNowString();
131     }
132 torben 1986 }
133     else
134     {
135 torben 1999 addLogEntry(statusData.lastErrorMessage);
136     statusData.lastErrorTime = getNowString();
137 torben 1986 }
138     }
139    
140     private void transportSql2Mq()
141     {
142 torben 2051 MQQueueManager mqMgr = null;
143     MQQueue out_queue = null;
144    
145 torben 1999 string filename = getLogFilename(LogfileType.LogTransactions);
146 torben 1986 using (StreamWriter translog = new StreamWriter(filename, true) )
147     try
148     {
149 torben 2011 //MQ Options
150 torben 1986 Hashtable connProps = getConnectionProperties();
151     int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
152    
153 torben 2011 //MySQL Options
154 torben 1986 string mysqlString = buildMysqlConnString();
155    
156 torben 2051 //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
157     mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
158     out_queue = mqMgr.AccessQueue(queueName, openOptions);
159 torben 2011 using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
160     using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
161     {
162     sqlReadConnection.Open();
163     sqlWriteConnection.Open();
164 torben 1986
165 torben 2011 //stage 3 move messages
166     string readSql = "CALL " + sql2mqReadQuery + "()";
167     MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);
168     MySqlDataReader dataReader = readCmd.ExecuteReader();
169     while (dataReader.Read())
170     {
171     int id = dataReader.GetInt32(0);
172     string msgString = dataReader.GetString(1);
173 torben 2010
174 torben 2011 MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
175     // same as MQPMO_DEFAULT
176 torben 1986
177 torben 2011 MQMessage msg = new MQMessage();
178     msg.Format = MQC.MQFMT_STRING;
179     msg.CharacterSet = 1252;
180     msg.WriteString(msgString);
181 torben 1986
182 torben 2011 out_queue.Put(msg, pmo);
183 torben 1986
184 torben 2011 //now that the message has been put on queue mark it as such
185 torben 1986
186 torben 2011 string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
187     MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
188     int numrows = updateCmd.ExecuteNonQuery();
189 torben 1986
190 torben 2011 translog.WriteLine(getNowString() + " " + msgString + "\n");
191 torben 1986
192 torben 2011 if (numrows != 1)
193     {
194     break;
195     }
196     statusData.counter++;
197     }
198 torben 1986
199     }
200     }
201     catch (Exception e)
202     {
203 torben 1999 statusData.lastrunOk = false;
204 torben 2048 statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
205 torben 1999 Console.WriteLine(statusData.lastErrorMessage);
206 torben 2050 Console.WriteLine(e.StackTrace);
207 torben 1999 EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
208 torben 1986 }
209 torben 2051 finally
210     {
211    
212     if (out_queue != null && out_queue.IsOpen)
213     {
214     try
215     {
216     out_queue.Close();
217     }
218     catch (Exception e)
219     {
220     Console.WriteLine("Error cleaning up qmgr " + e.Message);
221     }
222     }
223    
224     if (mqMgr != null && mqMgr.IsOpen)
225     {
226     try
227     {
228     mqMgr.Close();
229     }
230     catch (Exception e)
231     {
232     Console.WriteLine("Error cleaning up qmgr " + e.Message);
233     }
234     }
235    
236     }
237 torben 1986 }
238    
239     private void transportMq2Sql()
240     {
241 torben 2051 MQQueueManager mqMgr = null;
242     MQQueue in_queue = null;
243 torben 1999 string filename = getLogFilename(LogfileType.LogTransactions);
244 torben 1986 using (StreamWriter translog = new StreamWriter(filename, true))
245 torben 2051 try
246     {
247     //MQ options
248     Hashtable connProps = getConnectionProperties();
249     int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
250 torben 1986
251 torben 2051 //MySQL options
252     string mysqlString = buildMysqlConnString();
253 torben 1986
254 torben 2051 //MQ objects i v6 implementerer ikke IDisposable og kan derfor ikke bruges med "using" statement
255     mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);//stage 1 connect to mq
256     in_queue = mqMgr.AccessQueue(queueName, openOptions);
257     using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) //stage 2 connect to mysql
258     {
259 torben 1986
260 torben 2051 sqlConnection.Open();
261 torben 1986
262 torben 2011
263 torben 2051 //stage 3 move messages
264     bool isContinue = true;
265     while (isContinue)
266     {
267 torben 1986
268 torben 2051 MQMessage mqMsg = new MQMessage();
269     MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
270 torben 1986
271 torben 2051 mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
272 torben 1986
273 torben 2051 try
274 torben 1986 {
275 torben 2051 in_queue.Get(mqMsg, mqGetMsgOpts);
276     if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
277     {
278     string msgString = mqMsg.ReadString(mqMsg.MessageLength);
279     System.Console.WriteLine(msgString);
280 torben 2011
281 torben 2051 string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
282 torben 2011
283 torben 2051 MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
284     int numrows = sqlcmd.ExecuteNonQuery();
285 torben 2011
286 torben 2051 if (numrows == 1)
287     {
288     translog.WriteLine(getNowString() + " " + msgString + "\n");
289     mqMgr.Commit();
290     statusData.counter++;
291     }
292     else
293     {
294     mqMgr.Backout();
295     isContinue = false;
296     }
297    
298 torben 2011 }
299     else
300     {
301 torben 2051 System.Console.WriteLine("Non-text message");
302 torben 2011 }
303 torben 1986 }
304 torben 2051 catch (MQException mqe)
305 torben 1986 {
306 torben 2051 isContinue = false;
307 torben 1986
308 torben 2051 // report reason, if any
309     if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
310     {
311     // special report for normal end
312     System.Console.WriteLine("no more messages");
313     }
314     else
315     {
316     // general report for other reasons
317     System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message); ;
318     statusData.lastrunOk = false;
319     }
320    
321 torben 2011 }
322    
323 torben 2051
324 torben 1986 }
325    
326 torben 2051 }
327 torben 2011
328 torben 2051 }
329     catch (Exception e)
330     {
331     statusData.lastrunOk = false;
332    
333     statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
334     Console.WriteLine(statusData.lastErrorMessage);
335     Console.WriteLine(e.StackTrace);
336     EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
337     }
338     finally
339     {
340    
341     if (in_queue != null && in_queue.IsOpen)
342     {
343     try
344     {
345     in_queue.Close();
346     }
347     catch (Exception e)
348     {
349     Console.WriteLine("Error cleaning up qmgr " + e.Message);
350     }
351 torben 1986 }
352 torben 2051
353     if (mqMgr != null && mqMgr.IsOpen)
354     {
355     try
356     {
357     mqMgr.Close();
358     } catch (Exception e) {
359     Console.WriteLine("Error cleaning up qmgr " + e.Message);
360     }
361     }
362 torben 1986
363     }
364     }
365    
366     private string buildMysqlConnString()
367     {
368     string connectionString = "";
369    
370     connectionString += "SERVER=" + controller.mysqlHost + ";";
371     //connectionString += "DATABASE=" + controller.mysqlHost + ";";
372     connectionString += "UID=" + controller.mysqlUser + ";";
373     connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
374 torben 2051 connectionString += "Max Pool Size=20;";
375     //connectionString += "ConnectionReset=true;";//ConnectionReset kræver muligvis at Default Database er angivet - det virker ihvertfald ikke uden
376 torben 1986
377     return connectionString;
378     }
379    
380     private Hashtable getConnectionProperties()
381     {
382     Hashtable connProperties = new Hashtable();
383     connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
384     connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
385     connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel); //WARNING: Hardcoded Channel Value !!!
386     return connProperties;
387     }
388    
389 torben 1999
390    
391     private string getLogFilename(LogfileType type)
392 torben 1986 {
393    
394     DateTime now = DateTime.Now;
395     string filename = controller.logDirectory + "\\";
396 torben 1999
397     switch (type)
398     {
399     case LogfileType.LogEvents:
400     filename += "eventlog_";
401     break;
402    
403     case LogfileType.LogTransactions:
404     filename += "transactionlog_";
405     break;
406     }
407    
408    
409 torben 2003 filename += name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";
410 torben 1986
411     return filename;
412     }
413    
414     public string getNowString()
415     {
416     DateTime now = DateTime.Now;
417    
418     return now.ToString("s");
419     }
420    
421     private void addLogEntry(string msg)
422     {
423     msg = getNowString() + " " + msg;
424     lock (logEntries)
425     {
426     logEntries.AddFirst(msg);
427    
428     if (logEntries.Count > 20)
429     {
430     logEntries.RemoveLast();
431 torben 1999 }
432 torben 1986 }
433 torben 1999
434     string filename = getLogFilename(LogfileType.LogEvents);
435     using (StreamWriter eventlog = new StreamWriter(filename, true))
436     {
437     eventlog.WriteLine(msg);
438     }
439 torben 1986 }
440    
441     public string[] getLog()
442     {
443     lock(logEntries)
444     {
445     List<string> tmpEntries = new List<string>();
446     foreach (string s in logEntries)
447     {
448     tmpEntries.Add(s);
449     }
450     return tmpEntries.ToArray();
451     }
452     }
453    
454     }
455     }

  ViewVC Help
Powered by ViewVC 1.1.20