/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2010 - (hide annotations) (download)
Wed Jul 10 18:28:31 2013 UTC (10 years, 10 months ago) by torben
File size: 13896 byte(s)
Use 2 connections in parallel: one for reading and the other for writing
1 torben 1986 using System;
2     using System.Collections;
3     using System.Collections.Generic;
4     using System.IO;
5    
6     using System.Diagnostics;
7    
8     using IBM.WMQ;
9     using MySql.Data.MySqlClient;
10    
11     namespace DaoMqPump2
12     {
13     public class Transport
14     {
15 torben 1999
16     enum LogfileType {
17     LogTransactions,
18     LogEvents
19     }
20    
21 torben 1986 public static string SQL2MQ = "sql2mq";
22     public static string MQ2SQL = "mq2sql";
23    
24 torben 2001 //private bool enabled;
25 torben 1986
26     TransportController controller;
27    
28 torben 1999 StatusData statusData = new StatusData();
29    
30 torben 1986 public string name { get; private set; }
31     public string direction { get; private set; }
32     public string queueName { get; private set; }
33     public string mq2sqlInsertQuery { get; private set; }
34     public string sql2mqReadQuery { get; private set; }
35     public string sql2mqUpdateQuery { get; private set; }
36    
37 torben 1999 //public bool lastrunOk { get; private set; }
38     //public string lastErrorMessage { get; private set; }
39 torben 1986
40 torben 1999 //public string lastOkTime { get; private set; }
41     //public string lastErrorTime { get; private set; }
42     //public string lastTransferTime { get; private set; }
43 torben 1986
44 torben 1999 //public int counter { get; private set; }
45 torben 1986
46 torben 1999 public StatusData TransportStatusData
47     {
48     get
49     {
50     return this.statusData;
51     }
52     }
53    
54    
55     public bool Enabled
56     {
57     get {
58 torben 2001 return statusData.transportEnabled;
59 torben 1999 }
60     set
61     {
62 torben 2001 statusData.transportEnabled = value;
63 torben 1999 if (value == true)
64     {
65     this.addLogEntry("Transport enabled");
66     }
67     else
68     {
69     this.addLogEntry("Transport disabled");
70     }
71     }
72     }
73    
74    
75 torben 1986 private LinkedList<string> logEntries = new LinkedList<string>();
76    
77    
78     public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
79     {
80     this.controller = controller;
81     this.name = name;
82     this.direction = direction;
83     this.queueName = queueName;
84     this.mq2sqlInsertQuery = mq2sqlInsertQuery;
85     this.sql2mqReadQuery = sql2mqReadQuery;
86     this.sql2mqUpdateQuery = sql2mqUpdateQuery;
87    
88 torben 2001 statusData.transportEnabled = enabled;
89 torben 1986
90 torben 1999
91     statusData.lastrunOk = true;
92     statusData.counter = 0;
93     statusData.lastErrorMessage = statusData.lastOkTime = statusData.lastErrorTime = "";
94    
95 torben 1986 addLogEntry( "Starting ... " );
96     }
97    
98 torben 1999 ~Transport()
99     {
100     addLogEntry("Stopping ... ");
101     }
102    
103    
104 torben 1986 public void transportMessages()
105     {
106 torben 2001 if (statusData.transportEnabled == false)
107 torben 1986 return;
108    
109     Console.WriteLine(name + " -> transportMessages() ");
110 torben 1999 statusData.lastrunOk = true;
111 torben 1986
112 torben 1999 int startCounter = statusData.counter;
113    
114 torben 1986 if (direction == SQL2MQ)
115     {
116     transportSql2Mq();
117     }
118     else
119     {
120     transportMq2Sql();
121     }
122    
123 torben 1999 if (statusData.lastrunOk == true)
124 torben 1986 {
125 torben 1999 statusData.lastOkTime = getNowString();
126    
127     if (statusData.counter != startCounter)
128     {
129     //Vi har transporteret beskeder - så gemmer vi lige transfer tidspunktet
130     statusData.lastTransferTime = getNowString();
131     }
132 torben 1986 }
133     else
134     {
135 torben 1999 addLogEntry(statusData.lastErrorMessage);
136     statusData.lastErrorTime = getNowString();
137 torben 1986 }
138     }
139    
140     private void transportSql2Mq()
141     {
142 torben 1999 string filename = getLogFilename(LogfileType.LogTransactions);
143 torben 1986 using (StreamWriter translog = new StreamWriter(filename, true) )
144     try
145     {
146     //stage 1 connect to mq
147     Hashtable connProps = getConnectionProperties();
148     MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);
149     int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
150    
151     MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions);
152    
153    
154     //stage 2 connect to mysql
155     string mysqlString = buildMysqlConnString();
156 torben 2010 MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString);
157     sqlReadConnection.Open();
158 torben 1986
159 torben 2010 MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString);
160     sqlWriteConnection.Open();
161 torben 1986
162 torben 2010
163 torben 1986 //stage 3 move messages
164     string readSql = "CALL " + sql2mqReadQuery + "()";
165 torben 2010 MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection);
166 torben 1986 MySqlDataReader dataReader = readCmd.ExecuteReader();
167     while (dataReader.Read())
168     {
169     int id = dataReader.GetInt32(0);
170     string msgString = dataReader.GetString(1);
171    
172     MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
173     // same as MQPMO_DEFAULT
174    
175     MQMessage msg = new MQMessage();
176     msg.Format = MQC.MQFMT_STRING;
177     msg.CharacterSet = 1252;
178     msg.WriteString(msgString);
179    
180     out_queue.Put(msg, pmo);
181    
182     //now that the message has been put on queue mark it as such
183    
184     string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
185 torben 2010 MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection);
186 torben 1986 int numrows = updateCmd.ExecuteNonQuery();
187    
188     translog.WriteLine(getNowString() + " " + msgString + "\n");
189    
190     if (numrows != 1)
191     {
192     break;
193     }
194 torben 1999 statusData.counter++;
195 torben 1986 }
196    
197    
198     //stage 4: everything went smooth so clean up afterwards
199     dataReader.Close();
200     out_queue.Close();
201     mqMgr.Close();
202 torben 2010 sqlReadConnection.Close();
203     sqlWriteConnection.Close();
204 torben 1986 }
205     catch (Exception e)
206     {
207 torben 1999 statusData.lastrunOk = false;
208     statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.Message;
209     Console.WriteLine(statusData.lastErrorMessage);
210     EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
211 torben 1986 }
212     }
213    
214     private void transportMq2Sql()
215     {
216 torben 1999 string filename = getLogFilename(LogfileType.LogTransactions);
217 torben 1986 using (StreamWriter translog = new StreamWriter(filename, true))
218     try
219     {
220     //stage 1 connect to mq
221     Hashtable connProps = getConnectionProperties();
222     MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);
223     int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
224    
225     MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions);
226    
227    
228     //stage 2 connect to mysql
229     string mysqlString = buildMysqlConnString();
230     MySqlConnection sqlConnection = new MySqlConnection(mysqlString);
231     sqlConnection.Open();
232    
233    
234     //stage 3 move messages
235     bool isContinue = true;
236     while (isContinue)
237     {
238    
239     MQMessage mqMsg = new MQMessage();
240     MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
241    
242     mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
243    
244     try
245     {
246     in_queue.Get(mqMsg, mqGetMsgOpts);
247     if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
248     {
249     string msgString = mqMsg.ReadString(mqMsg.MessageLength);
250     System.Console.WriteLine(msgString);
251    
252     string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
253    
254     MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
255     int numrows = sqlcmd.ExecuteNonQuery();
256    
257     if (numrows == 1)
258     {
259     translog.WriteLine(getNowString() + " " + msgString + "\n");
260     mqMgr.Commit();
261 torben 1999 statusData.counter++;
262 torben 1986 }
263     else
264     {
265     mqMgr.Backout();
266     isContinue = false;
267     }
268    
269     }
270     else
271     {
272     System.Console.WriteLine("Non-text message");
273     }
274     }
275     catch (MQException mqe)
276     {
277     isContinue = false;
278    
279     // report reason, if any
280     if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
281     {
282     // special report for normal end
283     System.Console.WriteLine("no more messages");
284     }
285     else
286     {
287     // general report for other reasons
288     System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);;
289 torben 1999 statusData.lastrunOk = false;
290 torben 1986 }
291    
292     }
293    
294    
295     }
296    
297     //stage 4: everything went smooth so clean up afterwards
298     in_queue.Close();
299     mqMgr.Close();
300     sqlConnection.Close();
301    
302    
303     }
304     catch (Exception e)
305     {
306 torben 1999 statusData.lastrunOk = false;
307 torben 1986
308 torben 1999 statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.Message;
309     Console.WriteLine(statusData.lastErrorMessage);
310     EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
311 torben 1986 }
312     }
313    
314     private string buildMysqlConnString()
315     {
316     string connectionString = "";
317    
318     connectionString += "SERVER=" + controller.mysqlHost + ";";
319     //connectionString += "DATABASE=" + controller.mysqlHost + ";";
320     connectionString += "UID=" + controller.mysqlUser + ";";
321     connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
322    
323     return connectionString;
324     }
325    
326     private Hashtable getConnectionProperties()
327     {
328     Hashtable connProperties = new Hashtable();
329     connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
330     connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
331     connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel); //WARNING: Hardcoded Channel Value !!!
332     return connProperties;
333     }
334    
335 torben 1999
336    
337     private string getLogFilename(LogfileType type)
338 torben 1986 {
339    
340     DateTime now = DateTime.Now;
341     string filename = controller.logDirectory + "\\";
342 torben 1999
343     switch (type)
344     {
345     case LogfileType.LogEvents:
346     filename += "eventlog_";
347     break;
348    
349     case LogfileType.LogTransactions:
350     filename += "transactionlog_";
351     break;
352     }
353    
354    
355 torben 2003 filename += name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";
356 torben 1986
357     return filename;
358     }
359    
360     public string getNowString()
361     {
362     DateTime now = DateTime.Now;
363    
364     return now.ToString("s");
365     }
366    
367     private void addLogEntry(string msg)
368     {
369     msg = getNowString() + " " + msg;
370     lock (logEntries)
371     {
372     logEntries.AddFirst(msg);
373    
374     if (logEntries.Count > 20)
375     {
376     logEntries.RemoveLast();
377 torben 1999 }
378 torben 1986 }
379 torben 1999
380     string filename = getLogFilename(LogfileType.LogEvents);
381     using (StreamWriter eventlog = new StreamWriter(filename, true))
382     {
383     eventlog.WriteLine(msg);
384     }
385 torben 1986 }
386    
387     public string[] getLog()
388     {
389     lock(logEntries)
390     {
391     List<string> tmpEntries = new List<string>();
392     foreach (string s in logEntries)
393     {
394     tmpEntries.Add(s);
395     }
396     return tmpEntries.ToArray();
397     }
398     }
399    
400     }
401     }

  ViewVC Help
Powered by ViewVC 1.1.20