/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2169 - (hide annotations) (download)
Fri May 16 21:10:02 2014 UTC (10 years ago) by torben
File size: 17087 byte(s)
WIP
1 torben 1986 using System;
2     using System.Collections;
3     using System.Collections.Generic;
4     using System.IO;
5    
6     using System.Diagnostics;
7    
8     using IBM.WMQ;
9     using MySql.Data.MySqlClient;
10 torben 2057 using System.Globalization;
11 torben 1986
12 torben 2166 using DaoCommon;
13    
14 torben 1986 namespace DaoMqPump2
15     {
16     public class Transport
17     {
18 torben 1999
/// <summary>
/// Direction name for SQL-to-MQ transports: transportMessages() runs
/// transportSql2Mq() when <see cref="direction"/> equals this value.
/// Read-only so that no other code can silently redefine it.
/// </summary>
public static readonly string SQL2MQ = "sql2mq";

/// <summary>
/// Direction name for MQ-to-SQL transports. transportMessages() runs
/// transportMq2Sql() for every direction that is not <see cref="SQL2MQ"/>.
/// </summary>
public static readonly string MQ2SQL = "mq2sql";

// Controller handed to the constructor; read for the log directory and
// for the MQ / MySQL connection settings.
private readonly TransportController controller;

// Status record exposed through TransportStatusData and Enabled.
private StatusData statusData = new StatusData();

/// <summary>Log file of this transport; created in the constructor.</summary>
public Logfile logfile { get; private set; }

/// <summary>Transport name; used for log file names and as prefix in error messages.</summary>
public string name { get; private set; }

/// <summary>Transfer direction; compared against <see cref="SQL2MQ"/>.</summary>
public string direction { get; private set; }

/// <summary>Name of the MQ queue that is written to or read from.</summary>
public string queueName { get; private set; }

/// <summary>Name of the procedure CALLed with each message text when moving MQ to SQL.</summary>
public string mq2sqlInsertQuery { get; private set; }

/// <summary>Name of the procedure CALLed to read pending rows when moving SQL to MQ.</summary>
public string sql2mqReadQuery { get; private set; }

/// <summary>Name of the procedure CALLed with a row id once that row has been put on the queue.</summary>
public string sql2mqUpdateQuery { get; private set; }
36    
37 torben 1999 //public bool lastrunOk { get; private set; }
38     //public string lastErrorMessage { get; private set; }
39 torben 1986
40 torben 1999 //public string lastOkTime { get; private set; }
41     //public string lastErrorTime { get; private set; }
42     //public string lastTransferTime { get; private set; }
43 torben 1986
44 torben 1999 //public int counter { get; private set; }
45 torben 1986
/// <summary>
/// Gets the status record that this transport updates while it runs.
/// </summary>
public StatusData TransportStatusData
{
    get { return statusData; }
}
53    
54    
/// <summary>
/// Gets or sets whether this transport is enabled. The value is stored in
/// the status record; every assignment also writes a line to the
/// transport's log file stating the new state.
/// </summary>
public bool Enabled
{
    get { return statusData.transportEnabled; }
    set
    {
        statusData.transportEnabled = value;

        string entry = value ? "Transport enabled" : "Transport disabled";
        logfile.addSingleLogEntry(entry);
    }
}
73    
74    
75 torben 1986
76    
77 torben 2168
/// <summary>
/// Creates a transport, resets its status record and opens its log file.
/// </summary>
/// <param name="controller">Controller supplying the log directory and connection settings.</param>
/// <param name="name">Transport name, also used to name the log file.</param>
/// <param name="direction">Transfer direction, e.g. <see cref="SQL2MQ"/> or <see cref="MQ2SQL"/>.</param>
/// <param name="queueName">MQ queue to write to or read from.</param>
/// <param name="mq2sqlInsertQuery">Procedure name used when moving MQ to SQL.</param>
/// <param name="sql2mqReadQuery">Procedure name used to read rows when moving SQL to MQ.</param>
/// <param name="sql2mqUpdateQuery">Procedure name used to mark a row as sent.</param>
/// <param name="enabled">Initial enabled state (stored without writing a log entry).</param>
public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
{
    // Start from a clean status record.
    statusData.transportEnabled = enabled;
    statusData.lastrunOk = true;
    statusData.counter = 0;
    statusData.lastErrorTime = "";
    statusData.lastOkTime = "";
    statusData.lastErrorMessage = "";

    // Configuration handed in by the controller.
    this.controller = controller;
    this.name = name;
    this.direction = direction;
    this.queueName = queueName;
    this.mq2sqlInsertQuery = mq2sqlInsertQuery;
    this.sql2mqReadQuery = sql2mqReadQuery;
    this.sql2mqUpdateQuery = sql2mqUpdateQuery;

    // The log file is created last and immediately records the start-up.
    logfile = new Logfile(name, controller.logDirectory);
    logfile.addSingleLogEntry("Starting ... ");
}
98    
/// <summary>
/// Finalizer: writes a best-effort "Stopping ... " entry to the log file.
/// </summary>
~Transport()
{
    // A finalizer also runs for an instance whose constructor threw, in
    // which case logfile was never assigned. An exception escaping a
    // finalizer terminates the process, so logging here must never throw.
    try
    {
        Logfile log = logfile;
        if (log != null)
        {
            log.addSingleLogEntry("Stopping ... ");
        }
    }
    catch (Exception)
    {
        // Deliberately ignored: failing to write the shutdown entry must
        // not bring the process down.
    }
}
103    
104    
/// <summary>
/// Runs one transfer pass in the configured direction and updates the
/// status timestamps afterwards. Does nothing when the transport is
/// disabled.
/// </summary>
public void transportMessages()
{
    if (statusData.transportEnabled == false)
    {
        return;
    }

    Console.WriteLine(name + " -> transportMessages() ");

    // Assume success; the direction-specific worker clears the flag on failure.
    statusData.lastrunOk = true;
    int counterBefore = statusData.counter;

    bool fromSqlToMq = (direction == SQL2MQ);
    if (fromSqlToMq)
    {
        transportSql2Mq();
    }
    else
    {
        transportMq2Sql();
    }

    bool runSucceeded = (statusData.lastrunOk == true);
    if (!runSucceeded)
    {
        logfile.addSingleLogEntry(statusData.lastErrorMessage);
        statusData.lastErrorTime = Logfile.getNowString();
        return;
    }

    statusData.lastOkTime = Logfile.getNowString();

    bool movedAnything = (statusData.counter != counterBefore);
    if (movedAnything)
    {
        // Messages were moved during this pass, so remember the transfer time.
        statusData.lastTransferTime = Logfile.getNowString();
    }
}
140    
/// <summary>
/// Moves pending rows from MySQL onto the MQ queue.
/// CALLs the procedure named by sql2mqReadQuery, puts column 1 of each
/// returned row on the queue as a string message, then CALLs the
/// procedure named by sql2mqUpdateQuery with the row id from column 0
/// and appends the message text to the transaction log. The loop stops
/// as soon as an update reports anything other than one affected row.
/// Failures inside the transfer are recorded in the status record and
/// written to the console and the Windows event log instead of being
/// thrown; a failure to open the transaction log itself is not caught here.
/// </summary>
private void transportSql2Mq()
{
    MQQueueManager mqMgr = null;
    MQQueue out_queue = null;

    string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
    using (StreamWriter translog = new StreamWriter(filename, true))
    {
        try
        {
            // MQ options
            Hashtable connProps = getConnectionProperties();
            int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;

            // MySQL options
            string mysqlString = buildMysqlConnString();

            // The MQ objects in v6 do not implement IDisposable and therefore
            // cannot be used with a "using" statement; they are closed in finally.
            mqMgr = new MQQueueManager(controller.mqQueueManager, connProps); // stage 1: connect to MQ
            out_queue = mqMgr.AccessQueue(queueName, openOptions);

            using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) // stage 2: connect to MySQL
            using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
            {
                sqlReadConnection.Open();
                sqlWriteConnection.Open();

                // stage 3: move messages
                string readSql = "CALL " + sql2mqReadQuery + "()";
                using (MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection))
                using (MySqlDataReader dataReader = readCmd.ExecuteReader())
                {
                    while (dataReader.Read())
                    {
                        int id = dataReader.GetInt32(0);
                        string msgString = dataReader.GetString(1);

                        // Accept the defaults, same as MQPMO_DEFAULT.
                        // NOTE(review): presumably this means the put is not under
                        // syncpoint, so a failing update below would leave the row
                        // unmarked and it would be sent again - confirm.
                        MQPutMessageOptions pmo = new MQPutMessageOptions();

                        MQMessage msg = new MQMessage();
                        msg.Format = MQC.MQFMT_STRING;
                        msg.CharacterSet = 1252;
                        msg.WriteString(msgString);

                        out_queue.Put(msg, pmo);

                        // Now that the message is on the queue, mark the row as sent.
                        // id is an int, so concatenating it cannot inject SQL.
                        string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
                        int numrows;
                        using (MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection))
                        {
                            numrows = updateCmd.ExecuteNonQuery();
                        }

                        translog.WriteLine(Logfile.getNowString() + " " + msgString);

                        if (numrows != 1)
                        {
                            break;
                        }
                        statusData.counter++;
                    }
                }
            }
        }
        catch (Exception e)
        {
            statusData.lastrunOk = false;
            statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.GetType().FullName + " " + e.Message;
            Console.WriteLine(statusData.lastErrorMessage);
            Console.WriteLine(e.StackTrace);
            EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
        }
        finally
        {
            // Best-effort clean-up: a failing Close() is only reported on the console.
            if (out_queue != null && out_queue.IsOpen)
            {
                try
                {
                    out_queue.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up queue " + e.Message);
                }
            }

            if (mqMgr != null && mqMgr.IsOpen)
            {
                try
                {
                    mqMgr.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up qmgr " + e.Message);
                }
            }
        }
    }
}
239    
/// <summary>
/// Moves messages from the MQ queue into MySQL.
/// Each message is read under syncpoint. A string message that starts with
/// "?" or fails Salt2Helper.validateSalt2Header is logged, committed off
/// the queue and counted as discarded. Every other string message is
/// passed to the procedure named by mq2sqlInsertQuery; when exactly one
/// row is reported the message is logged and committed, otherwise it is
/// backed out and the pass ends. A pass also ends when the queue is empty
/// or after 10000 transferred messages. Failures are recorded in the
/// status record instead of being thrown; a failure to open the
/// transaction log itself is not caught here.
/// </summary>
private void transportMq2Sql()
{
    // Upper bound per pass so that the other transports get their turn.
    const int MaxMessagesPerRun = 10000;
    int messageCount = 0;

    MQQueueManager mqMgr = null;
    MQQueue in_queue = null;
    string filename = Logfile.getLogFilename(LogfileType.LogTransactions, controller.logDirectory, name);
    using (StreamWriter translog = new StreamWriter(filename, true))
    {
        try
        {
            // MQ options
            Hashtable connProps = getConnectionProperties();
            int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;

            // MySQL options
            string mysqlString = buildMysqlConnString();

            // The MQ objects in v6 do not implement IDisposable and therefore
            // cannot be used with a "using" statement; they are closed in finally.
            mqMgr = new MQQueueManager(controller.mqQueueManager, connProps); // stage 1: connect to MQ
            in_queue = mqMgr.AccessQueue(queueName, openOptions);
            using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) // stage 2: connect to MySQL
            {
                sqlConnection.Open();

                // stage 3: move messages
                bool isContinue = true;
                while (isContinue)
                {
                    MQMessage mqMsg = new MQMessage();
                    MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();

                    // Syncpoint: the message only leaves the queue once it is committed.
                    mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT;

                    try
                    {
                        in_queue.Get(mqMsg, mqGetMsgOpts);
                        if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
                        {
                            string msgString = mqMsg.ReadString(mqMsg.MessageLength);

                            // A transaction starting with '?' is not a valid transaction;
                            // the header must validate as well.
                            if (msgString.StartsWith("?") || Salt2Helper.validateSalt2Header(msgString) == false)
                            {
                                // The discarded entry is written through the already open
                                // translog writer. Opening a second StreamWriter on the same
                                // path while translog holds it fails with a sharing violation,
                                // which backed the message out and left it on the queue.
                                // NOTE(review): this relies on getLogFilename returning the
                                // same path for the same arguments; a dedicated log type for
                                // discarded messages may have been intended - confirm.
                                translog.WriteLine(Logfile.getNowString() + " " + msgString);
                                mqMgr.Commit(); // remove the rejected transaction from the queue
                                statusData.discardedCounter++;
                                continue; // move on to the next transaction on the queue
                            }

                            // Builds "CALL someprocedure( @msg )"; the message text is bound
                            // as a parameter instead of being concatenated into the SQL text.
                            string sql = "CALL " + mq2sqlInsertQuery + "( @msg )";
                            int numrows;
                            using (MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection))
                            {
                                sqlcmd.Parameters.AddWithValue("@msg", msgString);
                                numrows = sqlcmd.ExecuteNonQuery();
                            }

                            if (numrows == 1)
                            {
                                translog.WriteLine(Logfile.getNowString() + " " + msgString);
                                mqMgr.Commit();
                                statusData.counter++;

                                messageCount++; // per-pass message counter
                                if (messageCount >= MaxMessagesPerRun)
                                {
                                    isContinue = false;
                                }
                            }
                            else
                            {
                                mqMgr.Backout();
                                isContinue = false;
                            }
                        }
                        else
                        {
                            // NOTE(review): the message was read under syncpoint but is
                            // neither committed nor backed out here, so its fate depends on
                            // the next Commit()/Backout() on this connection - confirm that
                            // this is intended.
                            System.Console.WriteLine("Non-text message");
                        }
                    }
                    catch (MQException mqe)
                    {
                        isContinue = false;

                        if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                        {
                            // Normal end: the queue is empty.
                            System.Console.WriteLine("no more messages");
                        }
                        else
                        {
                            // Any other MQ failure marks the pass as failed. The error text
                            // is stored as well, because transportMessages() logs
                            // lastErrorMessage whenever lastrunOk is false.
                            System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);
                            statusData.lastrunOk = false;
                            statusData.lastErrorMessage = name + ".transportMq2Sql error: " + mqe.GetType().FullName + " " + mqe.Message;
                        }
                    }
                }
            }
        }
        catch (Exception e)
        {
            // Back out the open MQ unit of work before reporting the failure.
            try
            {
                if (mqMgr != null)
                {
                    mqMgr.Backout();
                }
            }
            catch (Exception e2)
            {
                logfile.addSingleLogEntry("Error backing out from MQ Transaction: " + e2.Message);
            }

            statusData.lastrunOk = false;

            statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
            Console.WriteLine(statusData.lastErrorMessage);
            Console.WriteLine(e.StackTrace);
            EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
        }
        finally
        {
            // Best-effort clean-up: a failing Close() is only reported on the console.
            if (in_queue != null && in_queue.IsOpen)
            {
                try
                {
                    in_queue.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up queue " + e.Message);
                }
            }

            if (mqMgr != null && mqMgr.IsOpen)
            {
                try
                {
                    mqMgr.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error cleaning up qmgr " + e.Message);
                }
            }
        }
    }
}
407    
/// <summary>
/// Builds the MySQL connection string from the controller's host, user
/// and password, with a fixed maximum pool size of 20.
/// </summary>
/// <returns>The connection string, terminated by a semicolon.</returns>
private string buildMysqlConnString()
{
    // No DATABASE entry is added. ConnectionReset is left out as well: it
    // possibly requires a default database - at least it does not work without one.
    // NOTE(review): the values are inserted verbatim, so a host, user or
    // password containing ';' would presumably corrupt the string - confirm.
    return string.Concat(
        "SERVER=", controller.mysqlHost, ";",
        "UID=", controller.mysqlUser, ";",
        "PASSWORD=", controller.mysqlPassword, ";",
        "Max Pool Size=20;");
}
421    
/// <summary>
/// Builds the property table passed to the MQQueueManager constructor:
/// client transport plus the controller's MQ host and channel.
/// </summary>
/// <returns>A new Hashtable holding the three connection properties.</returns>
private Hashtable getConnectionProperties()
{
    return new Hashtable
    {
        { MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT },
        { MQC.HOST_NAME_PROPERTY, controller.mqHost },
        { MQC.CHANNEL_PROPERTY, controller.mqChannel }
    };
}
430    
431 torben 1999
432 torben 1986 }
433     }

  ViewVC Help
Powered by ViewVC 1.1.20