/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2047 - (hide annotations) (download)
Mon Aug 19 07:21:59 2013 UTC (10 years, 9 months ago) by torben
File size: 13938 byte(s)
MySQL max pool size is 10 connections
1 torben 1986 using System;
2     using System.Collections;
3     using System.Collections.Generic;
4     using System.IO;
5    
6     using System.Diagnostics;
7    
8     using IBM.WMQ;
9     using MySql.Data.MySqlClient;
10    
11     namespace DaoMqPump2
12     {
13     public class Transport
14     {
15 torben 1999
/// <summary>Kinds of log file written by a <see cref="Transport"/>.</summary>
enum LogfileType
{
    /// <summary>Per-message transaction log (file name prefix "transactionlog_").</summary>
    LogTransactions,

    /// <summary>Event log (file name prefix "eventlog_").</summary>
    LogEvents
}
20    
/// <summary>Direction value that makes <see cref="transportMessages"/> run the SQL-to-MQ transfer.</summary>
public static string SQL2MQ = "sql2mq";

/// <summary>Direction value naming the MQ-to-SQL transfer.</summary>
public static string MQ2SQL = "mq2sql";

// Source of the MQ / MySQL connection settings and of the log directory.
private TransportController controller;

// Status record updated on every run; exposed through TransportStatusData.
private StatusData statusData = new StatusData();

/// <summary>Transport name; used in log file names and in error messages.</summary>
public string name { get; private set; }

/// <summary>Transfer direction; compared against <see cref="SQL2MQ"/> to pick the transfer routine.</summary>
public string direction { get; private set; }

/// <summary>Name of the MQ queue this transport opens.</summary>
public string queueName { get; private set; }

/// <summary>Name of the procedure invoked as <c>CALL name('message')</c> for each message read from MQ.</summary>
public string mq2sqlInsertQuery { get; private set; }

/// <summary>Name of the procedure invoked as <c>CALL name()</c> to read the rows (id, message) to put on MQ.</summary>
public string sql2mqReadQuery { get; private set; }

/// <summary>Name of the procedure invoked as <c>CALL name(id)</c> after a row's message has been put on MQ.</summary>
public string sql2mqUpdateQuery { get; private set; }
36    
37 torben 1999 //public bool lastrunOk { get; private set; }
38     //public string lastErrorMessage { get; private set; }
39 torben 1986
40 torben 1999 //public string lastOkTime { get; private set; }
41     //public string lastErrorTime { get; private set; }
42     //public string lastTransferTime { get; private set; }
43 torben 1986
44 torben 1999 //public int counter { get; private set; }
45 torben 1986
/// <summary>Gets the status record that this transport updates while it runs.</summary>
public StatusData TransportStatusData
{
    get { return statusData; }
}
53    
54    
/// <summary>
/// Gets or sets whether the transport is enabled. The flag lives in the
/// status record; every assignment is also written to the event log.
/// </summary>
public bool Enabled
{
    get { return statusData.transportEnabled; }
    set
    {
        statusData.transportEnabled = value;

        string message = value ? "Transport enabled" : "Transport disabled";
        this.addLogEntry(message);
    }
}
73    
74    
// Most recent log lines, newest first. The list instance is also used as the
// lock target, so the reference must never be swapped out: readonly.
private readonly LinkedList<string> logEntries = new LinkedList<string>();
76    
77    
/// <summary>
/// Creates a transport, resets its status record and writes a "Starting ... "
/// entry to the event log.
/// </summary>
/// <param name="controller">Provider of connection settings and the log directory.</param>
/// <param name="name">Transport name (used in log file names and error messages).</param>
/// <param name="direction">Transfer direction, e.g. <see cref="SQL2MQ"/> or <see cref="MQ2SQL"/>.</param>
/// <param name="queueName">MQ queue to open.</param>
/// <param name="mq2sqlInsertQuery">Procedure called for each message read from MQ.</param>
/// <param name="sql2mqReadQuery">Procedure called to read the rows to send to MQ.</param>
/// <param name="sql2mqUpdateQuery">Procedure called with a row id once its message has been sent.</param>
/// <param name="enabled">Initial enabled state.</param>
public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
{
    // Configuration.
    this.controller = controller;
    this.name = name;
    this.direction = direction;
    this.queueName = queueName;
    this.mq2sqlInsertQuery = mq2sqlInsertQuery;
    this.sql2mqReadQuery = sql2mqReadQuery;
    this.sql2mqUpdateQuery = sql2mqUpdateQuery;

    // Status record starts out "ok", with nothing transported and no timestamps.
    // NOTE(review): lastTransferTime is not reset here - confirm its default is acceptable.
    statusData.transportEnabled = enabled;
    statusData.lastrunOk = true;
    statusData.counter = 0;
    statusData.lastErrorTime = "";
    statusData.lastOkTime = "";
    statusData.lastErrorMessage = "";

    // Must come last: the log file name depends on controller and name.
    addLogEntry("Starting ... ");
}
97    
/// <summary>
/// Writes a final "Stopping ... " entry to the event log when the object is
/// finalized. Logging here is best-effort: it performs file I/O, and an
/// exception escaping a finalizer would terminate the whole process, so any
/// failure is deliberately swallowed.
/// </summary>
~Transport()
{
    try
    {
        addLogEntry("Stopping ... ");
    }
    catch (Exception)
    {
        // Nothing sensible can be done on the finalizer thread; never let
        // a logging failure (locked file, missing directory, ...) crash the service.
    }
}
102    
103    
/// <summary>
/// Runs one transfer pass in the configured direction and updates the status
/// record. Does nothing when the transport is disabled.
/// </summary>
public void transportMessages()
{
    if (statusData.transportEnabled == false)
    {
        return;
    }

    Console.WriteLine(name + " -> transportMessages() ");

    // Assume success; the transfer routines flip this flag on failure.
    statusData.lastrunOk = true;
    int counterBefore = statusData.counter;

    if (direction == SQL2MQ)
    {
        transportSql2Mq();
    }
    else
    {
        transportMq2Sql();
    }

    bool succeeded = statusData.lastrunOk == true;
    if (!succeeded)
    {
        // Failure: record the message in the event log and stamp the error time.
        addLogEntry(statusData.lastErrorMessage);
        statusData.lastErrorTime = getNowString();
        return;
    }

    statusData.lastOkTime = getNowString();

    bool transferredSomething = statusData.counter != counterBefore;
    if (transferredSomething)
    {
        // Messages were transported in this pass, so remember when that happened.
        statusData.lastTransferTime = getNowString();
    }
}
139    
/// <summary>
/// Moves messages from MySQL to MQ: calls the read procedure, puts every
/// returned row (id, message text) on the queue, then calls the update
/// procedure with the row id to mark it as sent. Each sent message is
/// appended to the transaction log.
/// </summary>
/// <remarks>
/// Any failure - including not being able to open the transaction log - is
/// reported through the status record (lastrunOk / lastErrorMessage), the
/// console and the Windows event log; nothing is thrown to the caller.
/// </remarks>
private void transportSql2Mq()
{
    try
    {
        string filename = getLogFilename(LogfileType.LogTransactions);

        // MQ options
        Hashtable connProps = getConnectionProperties();
        int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;

        // MySQL options
        string mysqlString = buildMysqlConnString();

        // The log writer is opened inside the try block so that an I/O failure
        // is reported like every other error instead of escaping the method.
        using (StreamWriter translog = new StreamWriter(filename, true))
        using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps)) // stage 1: connect to MQ
        using (MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions))
        using (MySqlConnection sqlReadConnection = new MySqlConnection(mysqlString)) // stage 2: connect to MySQL
        using (MySqlConnection sqlWriteConnection = new MySqlConnection(mysqlString))
        {
            sqlReadConnection.Open();
            sqlWriteConnection.Open();

            // stage 3: move messages
            string readSql = "CALL " + sql2mqReadQuery + "()";
            using (MySqlCommand readCmd = new MySqlCommand(readSql, sqlReadConnection))
            using (MySqlDataReader dataReader = readCmd.ExecuteReader())
            {
                while (dataReader.Read())
                {
                    int id = dataReader.GetInt32(0);
                    string msgString = dataReader.GetString(1);

                    // Accept the defaults, same as MQPMO_DEFAULT.
                    MQPutMessageOptions pmo = new MQPutMessageOptions();

                    MQMessage msg = new MQMessage();
                    msg.Format = MQC.MQFMT_STRING;
                    msg.CharacterSet = 1252;
                    msg.WriteString(msgString);

                    out_queue.Put(msg, pmo);

                    // Now that the message is on the queue, mark the row as sent.
                    // A second connection is used because the reader keeps the first one busy.
                    string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
                    int numrows;
                    using (MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlWriteConnection))
                    {
                        numrows = updateCmd.ExecuteNonQuery();
                    }

                    translog.WriteLine(getNowString() + " " + msgString + "\n");

                    if (numrows != 1)
                    {
                        // The update did not affect exactly one row: stop this pass.
                        break;
                    }
                    statusData.counter++;
                }
            }
        }
    }
    catch (Exception e)
    {
        statusData.lastrunOk = false;
        statusData.lastErrorMessage = name + ".transportSql2Mq error: " + e.Message;
        Console.WriteLine(statusData.lastErrorMessage);
        EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
    }
}
206    
/// <summary>
/// Moves messages from MQ to MySQL: gets messages from the queue under
/// syncpoint, passes each text message to the insert procedure and commits
/// the MQ unit of work only when the insert affected exactly one row.
/// Each committed message is appended to the transaction log.
/// </summary>
/// <remarks>
/// Any failure - including not being able to open the transaction log - is
/// reported through the status record (lastrunOk / lastErrorMessage);
/// nothing is thrown to the caller. "No message available" is the normal
/// end of a pass and is not an error.
/// </remarks>
private void transportMq2Sql()
{
    try
    {
        string filename = getLogFilename(LogfileType.LogTransactions);

        // MQ options
        Hashtable connProps = getConnectionProperties();
        int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;

        // MySQL options
        string mysqlString = buildMysqlConnString();

        // The log writer is opened inside the try block so that an I/O failure
        // is reported like every other error instead of escaping the method.
        using (StreamWriter translog = new StreamWriter(filename, true))
        using (MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps)) // stage 1: connect to MQ
        using (MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions))
        using (MySqlConnection sqlConnection = new MySqlConnection(mysqlString)) // stage 2: connect to MySQL
        {
            sqlConnection.Open();

            // stage 3: move messages
            bool isContinue = true;
            while (isContinue)
            {
                MQMessage mqMsg = new MQMessage();
                MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();

                // Syncpoint: the message is only removed from the queue once we commit.
                mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT;

                try
                {
                    in_queue.Get(mqMsg, mqGetMsgOpts);
                    if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
                    {
                        string msgString = mqMsg.ReadString(mqMsg.MessageLength);
                        System.Console.WriteLine(msgString);

                        // Builds a "CALL somestoredprocedure('msgString')" SQL string.
                        string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )";

                        int numrows;
                        try
                        {
                            using (MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection))
                            {
                                numrows = sqlcmd.ExecuteNonQuery();
                            }
                        }
                        catch (Exception)
                        {
                            // The insert failed while a message is held under syncpoint.
                            // Put it back explicitly rather than leaving the unit of work
                            // open until disconnect (which may commit it - NOTE(review):
                            // confirm against the MQ client in use), then report the
                            // original error through the outer handler.
                            mqMgr.Backout();
                            throw;
                        }

                        if (numrows == 1)
                        {
                            translog.WriteLine(getNowString() + " " + msgString + "\n");
                            mqMgr.Commit();
                            statusData.counter++;
                        }
                        else
                        {
                            mqMgr.Backout();
                            isContinue = false;
                        }
                    }
                    else
                    {
                        // NOTE(review): the non-text message stays in the open unit of
                        // work and is neither committed nor backed out here - confirm
                        // this is the intended handling.
                        System.Console.WriteLine("Non-text message");
                    }
                }
                catch (MQException mqe)
                {
                    isContinue = false;

                    // report reason, if any
                    if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                    {
                        // special report for normal end
                        System.Console.WriteLine("no more messages");
                    }
                    else
                    {
                        // general report for other reasons
                        System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);
                        statusData.lastrunOk = false;
                        // Record the reason so the caller logs this failure and not a
                        // stale message left over from an earlier run.
                        statusData.lastErrorMessage = name + ".transportMq2Sql error: " + mqe.Message;
                    }
                }
            }
        }
    }
    catch (Exception e)
    {
        statusData.lastrunOk = false;

        statusData.lastErrorMessage = name + ".transportMq2Sql error: " + e.Message;
        Console.WriteLine(statusData.lastErrorMessage);
        EventLog.WriteEntry("DaoMqPump2", statusData.lastErrorMessage, EventLogEntryType.Warning);
    }
}
303    
/// <summary>
/// Builds the MySQL connection string from the controller's host, user and
/// password, with the connection pool capped at 10 connections.
/// </summary>
/// <returns>The connection string; no database is selected.</returns>
private string buildMysqlConnString()
{
    // NOTE(review): values are concatenated unescaped, so a ';' inside the
    // password or user name would corrupt the string - consider
    // MySqlConnectionStringBuilder.
    return "SERVER=" + controller.mysqlHost + ";"
         + "UID=" + controller.mysqlUser + ";"
         + "PASSWORD=" + controller.mysqlPassword + ";"
         + "maximumpoolsize=10;";
}
316    
/// <summary>
/// Builds the MQ connection properties: client transport, plus the host
/// name and channel taken from the controller.
/// </summary>
/// <returns>A new property table for the <c>MQQueueManager</c> constructor.</returns>
private Hashtable getConnectionProperties()
{
    return new Hashtable
    {
        { MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT },
        { MQC.HOST_NAME_PROPERTY, controller.mqHost },
        { MQC.CHANNEL_PROPERTY, controller.mqChannel }
    };
}
325    
326 torben 1999
327    
/// <summary>
/// Builds the path of this transport's log file for the current month:
/// <c>{logDirectory}\{prefix}{name}_{yyyyMM}.log</c>.
/// </summary>
/// <param name="type">Selects the prefix: "eventlog_" or "transactionlog_".</param>
/// <returns>The log file path; a new file name is produced each month.</returns>
private string getLogFilename(LogfileType type)
{
    string prefix = "";
    if (type == LogfileType.LogEvents)
    {
        prefix = "eventlog_";
    }
    else if (type == LogfileType.LogTransactions)
    {
        prefix = "transactionlog_";
    }

    DateTime now = DateTime.Now;
    string yearMonth = now.Year.ToString("D4") + now.Month.ToString("D2");

    return controller.logDirectory + "\\" + prefix + name + "_" + yearMonth + ".log";
}
350    
/// <summary>Returns the current local time in the sortable "s" format (yyyy-MM-ddTHH:mm:ss).</summary>
public string getNowString()
{
    return DateTime.Now.ToString("s");
}
357    
/// <summary>
/// Timestamps <paramref name="msg"/>, stores it at the head of the in-memory
/// log (only the 20 newest entries are kept) and appends it to this
/// transport's event log file.
/// </summary>
/// <param name="msg">Text to log, without timestamp.</param>
private void addLogEntry(string msg)
{
    msg = getNowString() + " " + msg;

    lock (logEntries)
    {
        logEntries.AddFirst(msg);

        if (logEntries.Count > 20)
        {
            logEntries.RemoveLast();
        }

        // The file append happens under the same lock: two threads logging on
        // the same transport would otherwise both try to open the file for
        // writing and one of them would fail.
        string filename = getLogFilename(LogfileType.LogEvents);
        using (StreamWriter eventlog = new StreamWriter(filename, true))
        {
            eventlog.WriteLine(msg);
        }
    }
}
377    
/// <summary>Returns a snapshot of the in-memory log entries, newest first.</summary>
/// <returns>A new array; later log activity does not affect it.</returns>
public string[] getLog()
{
    lock (logEntries)
    {
        string[] snapshot = new string[logEntries.Count];
        logEntries.CopyTo(snapshot, 0);
        return snapshot;
    }
}
390    
391     }
392     }

  ViewVC Help
Powered by ViewVC 1.1.20