/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2001 - (hide annotations) (download)
Mon Jul 8 14:28:01 2013 UTC (10 years, 10 months ago) by torben
File size: 13716 byte(s)
use statusData.transportEnabled instead of enabled
1 torben 1986 using System;
2     using System.Collections;
3     using System.Collections.Generic;
4     using System.IO;
5    
6     using System.Diagnostics;
7    
8     using IBM.WMQ;
9     using MySql.Data.MySqlClient;
10    
11     namespace DaoMqPump2
12     {
13     public class Transport
14     {
15 torben 1999
16     enum LogfileType {
17     LogTransactions,
18     LogEvents
19     }
20    
21 torben 1986 public static string SQL2MQ = "sql2mq";
22     public static string MQ2SQL = "mq2sql";
23    
24 torben 2001 //private bool enabled;
25 torben 1986
26     TransportController controller;
27    
28 torben 1999 StatusData statusData = new StatusData();
29    
30 torben 1986 public string name { get; private set; }
31     public string direction { get; private set; }
32     public string queueName { get; private set; }
33     public string mq2sqlInsertQuery { get; private set; }
34     public string sql2mqReadQuery { get; private set; }
35     public string sql2mqUpdateQuery { get; private set; }
36    
37 torben 1999 //public bool lastrunOk { get; private set; }
38     //public string lastErrorMessage { get; private set; }
39 torben 1986
40 torben 1999 //public string lastOkTime { get; private set; }
41     //public string lastErrorTime { get; private set; }
42     //public string lastTransferTime { get; private set; }
43 torben 1986
44 torben 1999 //public int counter { get; private set; }
45 torben 1986
46 torben 1999 public StatusData TransportStatusData
47     {
48     get
49     {
50     return this.statusData;
51     }
52     }
53    
54    
55     public bool Enabled
56     {
57     get {
58 torben 2001 return statusData.transportEnabled;
59 torben 1999 }
60     set
61     {
62 torben 2001 statusData.transportEnabled = value;
63 torben 1999 if (value == true)
64     {
65     this.addLogEntry("Transport enabled");
66     }
67     else
68     {
69     this.addLogEntry("Transport disabled");
70     }
71     }
72     }
73    
74    
75 torben 1986 private LinkedList<string> logEntries = new LinkedList<string>();
76    
77    
78     public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
79     {
80     this.controller = controller;
81     this.name = name;
82     this.direction = direction;
83     this.queueName = queueName;
84     this.mq2sqlInsertQuery = mq2sqlInsertQuery;
85     this.sql2mqReadQuery = sql2mqReadQuery;
86     this.sql2mqUpdateQuery = sql2mqUpdateQuery;
87    
88 torben 2001 statusData.transportEnabled = enabled;
89 torben 1986
90 torben 1999
91     statusData.lastrunOk = true;
92     statusData.counter = 0;
93     statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";
94    
95 torben 1986 addLogEntry( "Starting ... " );
96     }
97    
98 torben 1999 ~Transport()
99     {
100     addLogEntry("Stopping ... ");
101     }
102    
103    
104 torben 1986 public void transportMessages()
105     {
106 torben 2001 if (statusData.transportEnabled == false)
107 torben 1986 return;
108    
109     Console.WriteLine(name + " -> transportMessages() ");
110 torben 1999 statusData.lastrunOk = true;
111 torben 1986
112 torben 1999 int startCounter = statusData.counter;
113    
114 torben 1986 if (direction == SQL2MQ)
115     {
116     transportSql2Mq();
117     }
118     else
119     {
120     transportMq2Sql();
121     }
122    
123 torben 1999 if (statusData.lastrunOk == true)
124 torben 1986 {
125 torben 1999 statusData.lastOkTime = getNowString();
126    
127     if (statusData.counter != startCounter)
128     {
129     //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
130     statusData.lastTransferTime = getNowString();
131     }
132 torben 1986 }
133     else
134     {
135 torben 1999 addLogEntry(statusData.lastErrorMessage);
136     statusData.lastErrorTime = getNowString();
137 torben 1986 }
138     }
139    
140     private void transportSql2Mq()
141     {
142 torben 1999 string filename = getLogFilename(LogfileType.LogTransactions);
143 torben 1986 using (StreamWriter translog = new StreamWriter(filename, true) )
144     try
145     {
146     //stage 1 connect to mq
147     Hashtable connProps = getConnectionProperties();
148     MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);
149     int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
150    
151     MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions);
152    
153    
154     //stage 2 connect to mysql
155     string mysqlString = buildMysqlConnString();
156     MySqlConnection sqlConnection = new MySqlConnection(mysqlString);
157     sqlConnection.Open();
158    
159    
160     //stage 3 move messages
161     string readSql = "CALL " + sql2mqReadQuery + "()";
162     MySqlCommand readCmd = new MySqlCommand(readSql, sqlConnection);
163     MySqlDataReader dataReader = readCmd.ExecuteReader();
164     while (dataReader.Read())
165     {
166     int id = dataReader.GetInt32(0);
167     string msgString = dataReader.GetString(1);
168    
169     MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
170     // same as MQPMO_DEFAULT
171    
172     MQMessage msg = new MQMessage();
173     msg.Format = MQC.MQFMT_STRING;
174     msg.CharacterSet = 1252;
175     msg.WriteString(msgString);
176    
177     out_queue.Put(msg, pmo);
178    
179     //now that the message has been put on queue mark it as such
180    
181     string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
182     MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlConnection);
183     int numrows = updateCmd.ExecuteNonQuery();
184    
185     translog.WriteLine(getNowString() + " " + msgString + "\n");
186    
187     if (numrows != 1)
188     {
189     break;
190     }
191 torben 1999 statusData.counter++;
192 torben 1986 }
193    
194    
195     //stage 4: everything went smooth so clean up afterwards
196     dataReader.Close();
197     out_queue.Close();
198     mqMgr.Close();
199     sqlConnection.Close();
200     }
201     catch (Exception e)
202     {
203 torben 1999 statusData.lastrunOk = false;
204     statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.Message;
205     Console.WriteLine(statusData.lastErrorMessage);
206     EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
207 torben 1986 }
208     }
209    
210     private void transportMq2Sql()
211     {
212 torben 1999 string filename = getLogFilename(LogfileType.LogTransactions);
213 torben 1986 using (StreamWriter translog = new StreamWriter(filename, true))
214     try
215     {
216     //stage 1 connect to mq
217     Hashtable connProps = getConnectionProperties();
218     MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);
219     int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
220    
221     MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions);
222    
223    
224     //stage 2 connect to mysql
225     string mysqlString = buildMysqlConnString();
226     MySqlConnection sqlConnection = new MySqlConnection(mysqlString);
227     sqlConnection.Open();
228    
229    
230     //stage 3 move messages
231     bool isContinue = true;
232     while (isContinue)
233     {
234    
235     MQMessage mqMsg = new MQMessage();
236     MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
237    
238     mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
239    
240     try
241     {
242     in_queue.Get(mqMsg, mqGetMsgOpts);
243     if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
244     {
245     string msgString = mqMsg.ReadString(mqMsg.MessageLength);
246     System.Console.WriteLine(msgString);
247    
248     string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
249    
250     MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
251     int numrows = sqlcmd.ExecuteNonQuery();
252    
253     if (numrows == 1)
254     {
255     translog.WriteLine(getNowString() + " " + msgString + "\n");
256     mqMgr.Commit();
257 torben 1999 statusData.counter++;
258 torben 1986 }
259     else
260     {
261     mqMgr.Backout();
262     isContinue = false;
263     }
264    
265     }
266     else
267     {
268     System.Console.WriteLine("Non-text message");
269     }
270     }
271     catch (MQException mqe)
272     {
273     isContinue = false;
274    
275     // report reason, if any
276     if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
277     {
278     // special report for normal end
279     System.Console.WriteLine("no more messages");
280     }
281     else
282     {
283     // general report for other reasons
284     System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);;
285 torben 1999 statusData.lastrunOk = false;
286 torben 1986 }
287    
288     }
289    
290    
291     }
292    
293     //stage 4: everything went smooth so clean up afterwards
294     in_queue.Close();
295     mqMgr.Close();
296     sqlConnection.Close();
297    
298    
299     }
300     catch (Exception e)
301     {
302 torben 1999 statusData.lastrunOk = false;
303 torben 1986
304 torben 1999 statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.Message;
305     Console.WriteLine(statusData.lastErrorMessage);
306     EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
307 torben 1986 }
308     }
309    
310     private string buildMysqlConnString()
311     {
312     string connectionString = "";
313    
314     connectionString += "SERVER=" + controller.mysqlHost + ";";
315     //connectionString += "DATABASE=" + controller.mysqlHost + ";";
316     connectionString += "UID=" + controller.mysqlUser + ";";
317     connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
318    
319     return connectionString;
320     }
321    
322     private Hashtable getConnectionProperties()
323     {
324     Hashtable connProperties = new Hashtable();
325     connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
326     connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
327     connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel); //WARNING: Hardcoded Channel Value !!!
328     return connProperties;
329     }
330    
331 torben 1999
332    
333     private string getLogFilename(LogfileType type)
334 torben 1986 {
335    
336     DateTime now = DateTime.Now;
337     string filename = controller.logDirectory + "\\";
338 torben 1999
339     switch (type)
340     {
341     case LogfileType.LogEvents:
342     filename += "eventlog_";
343     break;
344    
345     case LogfileType.LogTransactions:
346     filename += "transactionlog_";
347     break;
348     }
349    
350    
351 torben 1986 filename += "transactionlog_" + name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";
352    
353     return filename;
354     }
355    
356     public string getNowString()
357     {
358     DateTime now = DateTime.Now;
359    
360     return now.ToString("s");
361     }
362    
363     private void addLogEntry(string msg)
364     {
365     msg = getNowString() + " " + msg;
366     lock (logEntries)
367     {
368     logEntries.AddFirst(msg);
369    
370     if (logEntries.Count > 20)
371     {
372     logEntries.RemoveLast();
373 torben 1999 }
374 torben 1986 }
375 torben 1999
376     string filename = getLogFilename(LogfileType.LogEvents);
377     using (StreamWriter eventlog = new StreamWriter(filename, true))
378     {
379     eventlog.WriteLine(msg);
380     }
381 torben 1986 }
382    
383     public string[] getLog()
384     {
385     lock(logEntries)
386     {
387     List<string> tmpEntries = new List<string>();
388     foreach (string s in logEntries)
389     {
390     tmpEntries.Add(s);
391     }
392     return tmpEntries.ToArray();
393     }
394     }
395    
396     }
397     }

  ViewVC Help
Powered by ViewVC 1.1.20