/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2050 - (hide annotations) (download)
Fri Aug 23 17:07:25 2013 UTC (10 years, 9 months ago) by torben
File size: 14174 byte(s)
also dump stacktrace to CLI / console
1 torben 1986 using System;
2     using System.Collections;
3     using System.Collections.Generic;
4     using System.IO;
5    
6     using System.Diagnostics;
7    
8     using IBM.WMQ;
9     using MySql.Data.MySqlClient;
10    
11     namespace DaoMqPump2
12     {
13     public class Transport
14     {
15 torben 1999
/// <summary>
/// Selects which of this transport's monthly log files a filename is built for.
/// </summary>
enum LogfileType
{
    /// <summary>File that receives one line per transported message ("transactionlog_" prefix).</summary>
    LogTransactions,

    /// <summary>File that receives the transport's event messages ("eventlog_" prefix).</summary>
    LogEvents
}
20    
/// <summary>Direction value that makes transportMessages() move rows from MySQL to the MQ queue.</summary>
public static string SQL2MQ = "sql2mq";

/// <summary>Direction value for MQ-to-MySQL transports (any value other than SQL2MQ is handled this way).</summary>
public static string MQ2SQL = "mq2sql";

// Supplies the MQ / MySQL connection settings and the log directory.
private TransportController controller;

// Status record (enabled flag, counters, last-run information) exposed through TransportStatusData.
private StatusData statusData = new StatusData();

/// <summary>Name of the transport; used in log filenames and error messages.</summary>
public string name { get; private set; }

/// <summary>Transfer direction; compared against SQL2MQ to choose the transfer routine.</summary>
public string direction { get; private set; }

/// <summary>Name of the MQ queue that is read from or written to.</summary>
public string queueName { get; private set; }

/// <summary>Stored procedure called with the message text for every message taken off the queue.</summary>
public string mq2sqlInsertQuery { get; private set; }

/// <summary>Stored procedure (no arguments) whose result set yields the rows to put on the queue.</summary>
public string sql2mqReadQuery { get; private set; }

/// <summary>Stored procedure called with a row id once that row has been put on the queue.</summary>
public string sql2mqUpdateQuery { get; private set; }
36    
37 torben 1999 //public bool lastrunOk { get; private set; }
38     //public string lastErrorMessage { get; private set; }
39 torben 1986
40 torben 1999 //public string lastOkTime { get; private set; }
41     //public string lastErrorTime { get; private set; }
42     //public string lastTransferTime { get; private set; }
43 torben 1986
44 torben 1999 //public int counter { get; private set; }
45 torben 1986
/// <summary>
/// Gets the status record that this transport updates while it runs.
/// </summary>
public StatusData TransportStatusData
{
    get { return statusData; }
}
53    
54    
/// <summary>
/// Gets or sets whether the transport is enabled. Every assignment is
/// recorded through addLogEntry, even when the value does not change.
/// </summary>
public bool Enabled
{
    get { return statusData.transportEnabled; }
    set
    {
        statusData.transportEnabled = value;
        addLogEntry(value ? "Transport enabled" : "Transport disabled");
    }
}
73    
74    
// Most-recent-first buffer of event log lines; also used as the lock object
// guarding it, so the reference must never be replaced.
private readonly LinkedList<string> logEntries = new LinkedList<string>();
76    
77    
/// <summary>
/// Creates a transport, resets its status record and logs "Starting ... ".
/// </summary>
/// <param name="controller">Owner that provides connection settings and the log directory.</param>
/// <param name="name">Transport name (used in log filenames and error messages).</param>
/// <param name="direction">Transfer direction; see SQL2MQ and MQ2SQL.</param>
/// <param name="queueName">MQ queue to read from or write to.</param>
/// <param name="mq2sqlInsertQuery">Stored procedure used for MQ-to-SQL inserts.</param>
/// <param name="sql2mqReadQuery">Stored procedure used to read rows for SQL-to-MQ.</param>
/// <param name="sql2mqUpdateQuery">Stored procedure used to mark a row as sent.</param>
/// <param name="enabled">Initial enabled state.</param>
public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
{
    // Configuration handed in by the caller.
    this.controller = controller;
    this.name = name;
    this.direction = direction;
    this.queueName = queueName;
    this.mq2sqlInsertQuery = mq2sqlInsertQuery;
    this.sql2mqReadQuery = sql2mqReadQuery;
    this.sql2mqUpdateQuery = sql2mqUpdateQuery;

    // Written straight to the status record (not through Enabled) so that
    // no "Transport enabled/disabled" line is logged during construction.
    statusData.transportEnabled = enabled;

    // Start from a clean status.
    // NOTE(review): lastTransferTime is not reset here - confirm that StatusData gives it a usable default.
    statusData.lastrunOk = true;
    statusData.counter = 0;
    statusData.lastErrorTime = "";
    statusData.lastOkTime = "";
    statusData.lastErrorMessage = "";

    addLogEntry("Starting ... ");
}
97    
/// <summary>
/// Finalizer: makes a best-effort attempt to record "Stopping ... " in the event log.
/// </summary>
~Transport()
{
    try
    {
        addLogEntry("Stopping ... ");
    }
    catch (Exception)
    {
        // addLogEntry opens and writes a file, which can fail (missing
        // directory, file in use, ...). An exception that escapes a finalizer
        // terminates the process, so a lost log line is the lesser evil here.
    }
}
102    
103    
/// <summary>
/// Runs one transfer pass in the configured direction and updates the status
/// record afterwards. Does nothing when the transport is disabled.
/// </summary>
public void transportMessages()
{
    if (!statusData.transportEnabled)
    {
        return;
    }

    Console.WriteLine(name + " -> transportMessages() ");
    statusData.lastrunOk = true;

    // Remembered so we can tell afterwards whether anything was moved.
    int counterBefore = statusData.counter;

    if (direction != SQL2MQ)
    {
        transportMq2Sql();
    }
    else
    {
        transportSql2Mq();
    }

    if (!statusData.lastrunOk)
    {
        // The transfer routine flagged a failure: log it and stamp the error time.
        addLogEntry(statusData.lastErrorMessage);
        statusData.lastErrorTime = getNowString();
        return;
    }

    statusData.lastOkTime = getNowString();

    if (statusData.counter != counterBefore)
    {
        // Messages were transported, so record the time of the transfer as well.
        statusData.lastTransferTime = getNowString();
    }
}
139    
/// <summary>
/// Moves rows from MySQL to the MQ queue. Calls the sql2mqReadQuery stored
/// procedure, puts column 1 of every returned row on the queue as a string
/// message, and then calls sql2mqUpdateQuery with the row id from column 0.
/// Each transported message is appended to the transaction log file.
/// </summary>
/// <remarks>
/// Failures inside the transfer are not rethrown: they are recorded in the
/// status record (lastrunOk / lastErrorMessage), written to the console and
/// to the Windows event log. A failure to open the transaction log file
/// happens outside the try block and does propagate to the caller.
/// </remarks>
private void transportSql2Mq()
{
    string filename = getLogFilename(LogfileType.LogTransactions);
    using (StreamWriter translog = new StreamWriter(filename, true))
    {
        try
        {
            // MQ options
            Hashtable connProps = getConnectionProperties();
            int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;

            // MySQL options
            string mysqlString = buildMysqlConnString();

            using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps)) // stage 1: connect to MQ
            using (MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions))
            using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) // stage 2: connect to MySQL
            using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
            {
                // Two connections: one keeps the reader open while the other runs the updates.
                sqlReadConnection.Open();
                sqlWriteConnection.Open();

                // stage 3: move messages
                string readSql = "CALL " + sql2mqReadQuery + "()";
                using (MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection))
                using (MySqlDataReader dataReader = readCmd.ExecuteReader())
                {
                    while (dataReader.Read())
                    {
                        int id = dataReader.GetInt32(0);
                        string msgString = dataReader.GetString(1);

                        // Accept the defaults, same as MQPMO_DEFAULT.
                        MQPutMessageOptions pmo = new MQPutMessageOptions();

                        MQMessage msg = new MQMessage();
                        msg.Format = MQC.MQFMT_STRING;
                        msg.CharacterSet = 1252;
                        msg.WriteString(msgString);

                        out_queue.Put(msg, pmo);

                        // Now that the message is on the queue, mark the row as sent.
                        string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
                        int numrows;
                        using (MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection))
                        {
                            numrows = updateCmd.ExecuteNonQuery();
                        }

                        translog.WriteLine(getNowString() + " " + msgString + "\n");

                        if (numrows != 1)
                        {
                            // NOTE(review): the message has already been put on the queue at this
                            // point, yet the pass stops without counting it or flagging an error -
                            // confirm that this silent stop is intended.
                            break;
                        }
                        statusData.counter++;
                    }
                }
            }
        }
        catch (Exception e)
        {
            statusData.lastrunOk = false;
            statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.GetType().FullName + " " + e.Message;
            Console.WriteLine(statusData.lastErrorMessage);
            Console.WriteLine(e.StackTrace);
            EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
        }
    }
}
207    
/// <summary>
/// Moves messages from the MQ queue to MySQL. Messages are read under
/// syncpoint; every string-format message is passed to the mq2sqlInsertQuery
/// stored procedure, and the read is committed only when the call reports
/// exactly one affected row. The loop ends when the queue reports that no
/// message is available, when an insert is backed out, or on an error.
/// </summary>
/// <remarks>
/// Failures inside the transfer are not rethrown: they are recorded in the
/// status record (lastrunOk / lastErrorMessage) and written to the console;
/// failures caught by the outer handler also go to the Windows event log.
/// A failure to open the transaction log file happens outside the try block
/// and does propagate to the caller.
/// </remarks>
private void transportMq2Sql()
{
    string filename = getLogFilename(LogfileType.LogTransactions);
    using (StreamWriter translog = new StreamWriter(filename, true))
    {
        try
        {
            // MQ options
            Hashtable connProps = getConnectionProperties();
            int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;

            // MySQL options
            string mysqlString = buildMysqlConnString();

            using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps)) // stage 1: connect to MQ
            using (MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions))
            using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) // stage 2: connect to MySQL
            {
                sqlConnection.Open();

                // stage 3: move messages
                bool isContinue = true;
                while (isContinue)
                {
                    MQMessage mqMsg = new MQMessage();
                    MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();

                    // Requires a commit before the message is actually removed from the queue.
                    mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT;

                    try
                    {
                        in_queue.Get(mqMsg, mqGetMsgOpts);
                        if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
                        {
                            string msgString = mqMsg.ReadString(mqMsg.MessageLength);
                            System.Console.WriteLine(msgString);

                            // Builds "CALL somestoredprocedure(@msg)". The message text comes from
                            // outside, so it is bound as a parameter instead of being spliced
                            // into the SQL text.
                            string sql = "CALL " + mq2sqlInsertQuery + "( @msg )";

                            int numrows;
                            using (MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection))
                            {
                                sqlcmd.Parameters.AddWithValue("@msg", msgString);
                                numrows = sqlcmd.ExecuteNonQuery();
                            }

                            if (numrows == 1)
                            {
                                translog.WriteLine(getNowString() + " " + msgString + "\n");
                                mqMgr.Commit();
                                statusData.counter++;
                            }
                            else
                            {
                                mqMgr.Backout();
                                isContinue = false;
                            }
                        }
                        else
                        {
                            // NOTE(review): the message was read under syncpoint but is neither
                            // committed nor backed out here, so its fate depends on the next
                            // Commit/Backout or on the disconnect - confirm the intended handling.
                            System.Console.WriteLine("Non-text message");
                        }
                    }
                    catch (MQException mqe)
                    {
                        isContinue = false;

                        // Report the reason, if any.
                        if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                        {
                            // Normal end: the queue is drained.
                            System.Console.WriteLine("no more messages");
                        }
                        else
                        {
                            // Any other MQ failure. Record the message as well, because the caller
                            // logs lastErrorMessage whenever lastrunOk is false.
                            System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);
                            statusData.lastrunOk = false;
                            statusData.lastErrorMessage = name + ".transportMq2Sql error: MQQueue::Get ended with " + mqe.Message;
                        }
                    }
                    catch (Exception)
                    {
                        // A non-MQ failure (typically the SQL insert) after a message was read
                        // under syncpoint. Put the message back explicitly: leaving the unit of
                        // work open would let the disconnect performed when mqMgr is disposed
                        // decide its fate, and a normal MQ disconnect commits pending work,
                        // which would drop the message.
                        try
                        {
                            mqMgr.Backout();
                        }
                        catch (MQException)
                        {
                            // The connection is unusable too; the original exception is the one to report.
                        }
                        throw;
                    }
                }
            }
        }
        catch (Exception e)
        {
            statusData.lastrunOk = false;
            statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.GetType().FullName + " " + e.Message;
            Console.WriteLine(statusData.lastErrorMessage);
            Console.WriteLine(e.StackTrace);
            EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
        }
    }
}
305    
/// <summary>
/// Builds the MySQL connection string from the controller's host, user and
/// password settings. No DATABASE entry is included.
/// </summary>
/// <returns>A string of the form "SERVER=...;UID=...;PASSWORD=...;".</returns>
private string buildMysqlConnString()
{
    // NOTE(review): the values are inserted verbatim, so a ';' inside any of
    // them would change how the string is parsed - confirm the configuration
    // can never contain one.
    return string.Format(
        "SERVER={0};UID={1};PASSWORD={2};",
        controller.mysqlHost,
        controller.mysqlUser,
        controller.mysqlPassword);
}
319    
/// <summary>
/// Builds the property table handed to the MQQueueManager constructor:
/// client transport plus the host name and channel taken from the controller.
/// </summary>
/// <returns>A new Hashtable holding the three connection properties.</returns>
private Hashtable getConnectionProperties()
{
    return new Hashtable
    {
        { MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT },
        { MQC.HOST_NAME_PROPERTY, controller.mqHost },
        // The channel is read from the controller's configuration.
        { MQC.CHANNEL_PROPERTY, controller.mqChannel }
    };
}
328    
329 torben 1999
330    
/// <summary>
/// Builds the path of this transport's log file for the current month:
/// [logDirectory]\[prefix][name]_[yyyyMM].log, where the prefix depends on
/// the requested log type.
/// </summary>
/// <param name="type">Which log the filename is for.</param>
/// <returns>The log file path; a new file name is produced every month.</returns>
private string getLogFilename(LogfileType type)
{
    // An unrecognised type yields a filename without a prefix.
    string prefix = "";
    if (type == LogfileType.LogEvents)
    {
        prefix = "eventlog_";
    }
    else if (type == LogfileType.LogTransactions)
    {
        prefix = "transactionlog_";
    }

    DateTime today = DateTime.Now;
    string yearMonth = today.Year.ToString("D4") + today.Month.ToString("D2");

    return controller.logDirectory + "\\" + prefix + name + "_" + yearMonth + ".log";
}
353    
/// <summary>
/// Returns the current local time in the sortable "s" format
/// (yyyy-MM-ddTHH:mm:ss).
/// </summary>
public string getNowString()
{
    return DateTime.Now.ToString("s");
}
360    
/// <summary>
/// Records an event: prefixes the message with the current time, keeps it in
/// the in-memory buffer (newest first, at most 20 entries) and appends it to
/// this month's event log file.
/// </summary>
/// <param name="msg">Event text, without timestamp.</param>
private void addLogEntry(string msg)
{
    const int maxEntries = 20;

    msg = getNowString() + " " + msg;

    lock (logEntries)
    {
        logEntries.AddFirst(msg);
        if (logEntries.Count > maxEntries)
        {
            logEntries.RemoveLast();
        }

        // The file append is done under the same lock: the writer opens the
        // file for appending, so two threads logging at the same moment would
        // otherwise race on the open and one of them would fail.
        string filename = getLogFilename(LogfileType.LogEvents);
        using (StreamWriter eventlog = new StreamWriter(filename, true))
        {
            eventlog.WriteLine(msg);
        }
    }
}
380    
/// <summary>
/// Returns a snapshot of the in-memory event log, newest entry first.
/// </summary>
/// <returns>A new array; later log entries do not affect it.</returns>
public string[] getLog()
{
    lock (logEntries)
    {
        string[] snapshot = new string[logEntries.Count];
        logEntries.CopyTo(snapshot, 0);
        return snapshot;
    }
}
393    
394     }
395     }

  ViewVC Help
Powered by ViewVC 1.1.20