/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2048 - (hide annotations) (download)
Fri Aug 23 07:44:00 2013 UTC (10 years, 9 months ago) by torben
File size: 14074 byte(s)
Also log exception type on error
1 torben 1986 using System;
2     using System.Collections;
3     using System.Collections.Generic;
4     using System.IO;
5    
6     using System.Diagnostics;
7    
8     using IBM.WMQ;
9     using MySql.Data.MySqlClient;
10    
namespace DaoMqPump2
{
    /// <summary>
    /// A single named transport that moves messages in one direction between
    /// a MySQL database and an IBM MQ queue (see <see cref="SQL2MQ"/> and
    /// <see cref="MQ2SQL"/>). Connection settings and the log directory are
    /// read from the owning <see cref="TransportController"/>.
    /// </summary>
    public class Transport
    {
        /// <summary>Selects which of the two per-transport log files is addressed.</summary>
        enum LogfileType
        {
            LogTransactions,
            LogEvents
        }

        /// <summary>Direction value: read rows from MySQL and put them on the MQ queue.</summary>
        public static string SQL2MQ = "sql2mq";

        /// <summary>Direction value: get messages from the MQ queue and insert them into MySQL.</summary>
        public static string MQ2SQL = "mq2sql";

        readonly TransportController controller;

        // Run state (enabled flag, counters, timestamps, last error) lives here
        // and is exposed through TransportStatusData.
        StatusData statusData = new StatusData();

        public string name { get; private set; }
        public string direction { get; private set; }
        public string queueName { get; private set; }
        public string mq2sqlInsertQuery { get; private set; }
        public string sql2mqReadQuery { get; private set; }
        public string sql2mqUpdateQuery { get; private set; }

        /// <summary>Gets the status object this transport updates on every run.</summary>
        public StatusData TransportStatusData
        {
            get
            {
                return this.statusData;
            }
        }

        /// <summary>
        /// Gets or sets whether <see cref="transportMessages"/> does any work.
        /// Every assignment is recorded in the event log, even when the value
        /// does not change.
        /// </summary>
        public bool Enabled
        {
            get
            {
                return statusData.transportEnabled;
            }
            set
            {
                statusData.transportEnabled = value;
                this.addLogEntry(value ? "Transport enabled" : "Transport disabled");
            }
        }

        // Most recent event-log lines, newest first, capped at 20 entries.
        // Also used as the lock object guarding the list and the event-log file.
        private readonly LinkedList<string> logEntries = new LinkedList<string>();

        /// <summary>
        /// Creates a transport, resets its status data and writes a
        /// "Starting ... " entry to the event log.
        /// </summary>
        /// <param name="controller">Owner supplying MQ/MySQL connection settings and the log directory.</param>
        /// <param name="name">Transport name; used in log file names and error messages.</param>
        /// <param name="direction"><see cref="SQL2MQ"/> selects SQL-to-MQ; any other value selects MQ-to-SQL.</param>
        /// <param name="queueName">Name of the MQ queue to open.</param>
        /// <param name="mq2sqlInsertQuery">Stored procedure called with each received message text.</param>
        /// <param name="sql2mqReadQuery">Stored procedure returning (id, message) rows to send.</param>
        /// <param name="sql2mqUpdateQuery">Stored procedure called with the id of each sent row.</param>
        /// <param name="enabled">Initial enabled state (set without writing a log entry).</param>
        public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
        {
            this.controller = controller;
            this.name = name;
            this.direction = direction;
            this.queueName = queueName;
            this.mq2sqlInsertQuery = mq2sqlInsertQuery;
            this.sql2mqReadQuery = sql2mqReadQuery;
            this.sql2mqUpdateQuery = sql2mqUpdateQuery;

            // Assigned directly (not through Enabled) so no enable/disable entry is logged.
            statusData.transportEnabled = enabled;

            statusData.lastrunOk = true;
            statusData.counter = 0;
            // NOTE(review): lastTransferTime is not initialised here, unlike the
            // other timestamp fields - confirm StatusData gives it a usable default.
            statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";

            addLogEntry("Starting ... ");
        }

        ~Transport()
        {
            try
            {
                addLogEntry("Stopping ... ");
            }
            catch (Exception)
            {
                // The log write does file I/O and may fail. An exception that
                // escapes a finalizer terminates the process, so a failed
                // farewell entry is deliberately dropped.
            }
        }

        /// <summary>
        /// Performs one transfer pass in the configured direction and updates
        /// the status timestamps. Returns immediately when the transport is
        /// disabled.
        /// </summary>
        public void transportMessages()
        {
            if (!statusData.transportEnabled)
            {
                return;
            }

            Console.WriteLine(name + " -> transportMessages() ");
            statusData.lastrunOk = true;

            int startCounter = statusData.counter;

            if (direction == SQL2MQ)
            {
                transportSql2Mq();
            }
            else
            {
                transportMq2Sql();
            }

            if (statusData.lastrunOk)
            {
                statusData.lastOkTime = getNowString();

                if (statusData.counter != startCounter)
                {
                    // Messages were transported, so record the transfer time.
                    statusData.lastTransferTime = getNowString();
                }
            }
            else
            {
                addLogEntry(statusData.lastErrorMessage);
                statusData.lastErrorTime = getNowString();
            }
        }

        /// <summary>
        /// Reads rows via the <see cref="sql2mqReadQuery"/> procedure, puts each
        /// message on the MQ queue and then marks the row through
        /// <see cref="sql2mqUpdateQuery"/>. Failures inside the transfer are
        /// recorded in the status data rather than thrown; a failure to open
        /// the transaction log file still propagates to the caller.
        /// </summary>
        private void transportSql2Mq()
        {
            string filename = getLogFilename(LogfileType.LogTransactions);
            using (StreamWriter translog = new StreamWriter(filename, true))
            {
                try
                {
                    // MQ options
                    Hashtable connProps = getConnectionProperties();
                    int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;

                    // MySQL options
                    string mysqlString = buildMysqlConnString();

                    using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps)) // stage 1: connect to MQ
                    using (MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions))
                    using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) // stage 2: connect to MySQL
                    using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
                    {
                        sqlReadConnection.Open();
                        sqlWriteConnection.Open();

                        // stage 3: move messages. Updates go through a second
                        // connection because the read connection is occupied by
                        // the open data reader.
                        string readSql = "CALL " + sql2mqReadQuery + "()";
                        using (MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection))
                        using (MySqlDataReader dataReader = readCmd.ExecuteReader())
                        {
                            while (dataReader.Read())
                            {
                                int id = dataReader.GetInt32(0);
                                string msgString = dataReader.GetString(1);

                                MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
                                                                                     // same as MQPMO_DEFAULT

                                MQMessage msg = new MQMessage();
                                msg.Format = MQC.MQFMT_STRING;
                                msg.CharacterSet = 1252;
                                msg.WriteString(msgString);

                                out_queue.Put(msg, pmo);

                                // Now that the message is on the queue, mark the row as sent.
                                string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
                                int numrows;
                                using (MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection))
                                {
                                    numrows = updateCmd.ExecuteNonQuery();
                                }

                                translog.WriteLine(getNowString() + " " + msgString + "\n");

                                if (numrows != 1)
                                {
                                    // NOTE(review): the message has already been put and
                                    // logged at this point; the pass stops without counting
                                    // it and without flagging an error.
                                    break;
                                }
                                statusData.counter++;
                            }
                        }
                    }
                }
                catch (Exception e)
                {
                    reportFailure("transportSql2Mq", e);
                }
            }
        }

        /// <summary>
        /// Drains the MQ queue: each string-format message is inserted through
        /// the <see cref="mq2sqlInsertQuery"/> procedure and the MQ unit of work
        /// is committed only when exactly one row is reported. Failures inside
        /// the transfer are recorded in the status data rather than thrown; a
        /// failure to open the transaction log file still propagates.
        /// </summary>
        private void transportMq2Sql()
        {
            string filename = getLogFilename(LogfileType.LogTransactions);
            using (StreamWriter translog = new StreamWriter(filename, true))
            {
                try
                {
                    // MQ options
                    Hashtable connProps = getConnectionProperties();
                    int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;

                    // MySQL options
                    string mysqlString = buildMysqlConnString();

                    using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps)) // stage 1: connect to MQ
                    using (MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions))
                    using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) // stage 2: connect to MySQL
                    {
                        sqlConnection.Open();

                        // stage 3: move messages
                        bool isContinue = true;
                        while (isContinue)
                        {
                            MQMessage mqMsg = new MQMessage();
                            MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();

                            // Requires a commit before the message is removed from the queue.
                            mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT;

                            try
                            {
                                in_queue.Get(mqMsg, mqGetMsgOpts);

                                if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) != 0)
                                {
                                    // NOTE(review): the message stays in the open unit of
                                    // work and is neither committed nor backed out here;
                                    // presumably it is removed by the next Commit - confirm
                                    // that discarding non-text messages is intended.
                                    System.Console.WriteLine("Non-text message");
                                    continue;
                                }

                                string msgString = mqMsg.ReadString(mqMsg.MessageLength);
                                System.Console.WriteLine(msgString);

                                // Builds a "CALL somestoredprocedure('msgString')" SQL string.
                                string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )";

                                int numrows;
                                try
                                {
                                    using (MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection))
                                    {
                                        numrows = sqlcmd.ExecuteNonQuery();
                                    }
                                }
                                catch (Exception)
                                {
                                    // The insert failed, so explicitly return the message to
                                    // the queue instead of leaving the unit of work open when
                                    // the connection is torn down.
                                    // NOTE(review): an implicit commit on disconnect would
                                    // otherwise lose the message - confirm against MQ docs.
                                    mqMgr.Backout();
                                    throw;
                                }

                                if (numrows == 1)
                                {
                                    translog.WriteLine(getNowString() + " " + msgString + "\n");
                                    mqMgr.Commit();
                                    statusData.counter++;
                                }
                                else
                                {
                                    mqMgr.Backout();
                                    isContinue = false;
                                }
                            }
                            catch (MQException mqe)
                            {
                                isContinue = false;

                                // report reason, if any
                                if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                                {
                                    // special report for normal end
                                    System.Console.WriteLine("no more messages");
                                }
                                else
                                {
                                    // general report for other reasons
                                    System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);
                                    statusData.lastrunOk = false;
                                    // Record the cause so the caller logs this failure
                                    // rather than a stale message from an earlier run.
                                    statusData.lastErrorMessage = name + ".transportMq2Sql error: " + mqe.GetType().FullName + " " + mqe.Message;
                                }
                            }
                        }
                    }
                }
                catch (Exception e)
                {
                    reportFailure("transportMq2Sql", e);
                }
            }
        }

        /// <summary>
        /// Marks the current run as failed, stores a message naming the failing
        /// operation and the exception type, and echoes it to the console and
        /// the Windows event log (source "DaoMqPump2").
        /// </summary>
        private void reportFailure(string operation, Exception e)
        {
            statusData.lastrunOk = false;
            statusData.lastErrorMessage = name + "." + operation + " error: " + e.GetType().FullName + " " + e.Message;
            Console.WriteLine(statusData.lastErrorMessage);
            EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
        }

        /// <summary>
        /// Builds the MySQL connection string from the controller's host, user
        /// and password. No default database is selected.
        /// </summary>
        private string buildMysqlConnString()
        {
            string connectionString = "";

            connectionString += "SERVER=" + controller.mysqlHost + ";";
            //connectionString += "DATABASE=" + controller.mysqlHost + ";";
            connectionString += "UID=" + controller.mysqlUser + ";";
            connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
            //connectionString += "maximumpoolsize=10;";
            //connectionString += "ConnectionReset=true;";

            return connectionString;
        }

        /// <summary>
        /// Builds the MQ client connection properties (client transport, host
        /// and channel) from the controller's settings.
        /// </summary>
        private Hashtable getConnectionProperties()
        {
            Hashtable connProperties = new Hashtable();
            connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
            connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
            connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel);
            return connProperties;
        }

        /// <summary>
        /// Returns the path of this transport's log file for the current month:
        /// "&lt;logDirectory&gt;\&lt;prefix&gt;&lt;name&gt;_&lt;yyyyMM&gt;.log", where the prefix
        /// is "eventlog_" or "transactionlog_" depending on <paramref name="type"/>.
        /// </summary>
        private string getLogFilename(LogfileType type)
        {
            DateTime now = DateTime.Now;
            string filename = controller.logDirectory + "\\";

            switch (type)
            {
                case LogfileType.LogEvents:
                    filename += "eventlog_";
                    break;

                case LogfileType.LogTransactions:
                    filename += "transactionlog_";
                    break;
            }

            filename += name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";

            return filename;
        }

        /// <summary>Returns the current local time in the sortable "s" format.</summary>
        public string getNowString()
        {
            DateTime now = DateTime.Now;

            return now.ToString("s");
        }

        /// <summary>
        /// Timestamps <paramref name="msg"/>, keeps it in the in-memory list of
        /// the 20 most recent entries and appends it to the event log file.
        /// </summary>
        private void addLogEntry(string msg)
        {
            msg = getNowString() + " " + msg;
            lock (logEntries)
            {
                logEntries.AddFirst(msg);

                if (logEntries.Count > 20)
                {
                    logEntries.RemoveLast();
                }

                // The file is appended under the same lock so that two callers
                // cannot try to open the log file at the same moment.
                string filename = getLogFilename(LogfileType.LogEvents);
                using (StreamWriter eventlog = new StreamWriter(filename, true))
                {
                    eventlog.WriteLine(msg);
                }
            }
        }

        /// <summary>Returns a snapshot of the recent log entries, newest first.</summary>
        public string[] getLog()
        {
            lock (logEntries)
            {
                string[] snapshot = new string[logEntries.Count];
                logEntries.CopyTo(snapshot, 0);
                return snapshot;
            }
        }
    }
}

  ViewVC Help
Powered by ViewVC 1.1.20