/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Contents of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1986 - (show annotations) (download)
Wed Jul 3 07:56:52 2013 UTC (10 years, 10 months ago) by torben
File size: 11615 byte(s)
Add files
1 using System;
2 using System.Collections;
3 using System.Collections.Generic;
4 using System.IO;
5
6 using System.Diagnostics;
7
8 using IBM.WMQ;
9 using MySql.Data.MySqlClient;
10
11 namespace DaoMqPump2
12 {
13 public class Transport
14 {
15 public static string SQL2MQ = "sql2mq";
16 public static string MQ2SQL = "mq2sql";
17
18
19 public bool enabled { get; set; }
20
21 TransportController controller;
22
23 public string name { get; private set; }
24 public string direction { get; private set; }
25 public string queueName { get; private set; }
26 public string mq2sqlInsertQuery { get; private set; }
27 public string sql2mqReadQuery { get; private set; }
28 public string sql2mqUpdateQuery { get; private set; }
29
30 public bool lastrunOk { get; private set; }
31 public string lastErrorMessage { get; private set; }
32
33 public string lastOkTime { get; private set; }
34 public string lastErrorTime { get; private set; }
35
36 public int counter { get; private set; }
37
38 private LinkedList<string> logEntries = new LinkedList<string>();
39
40
41 public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
42 {
43 this.controller = controller;
44 this.name = name;
45 this.direction = direction;
46 this.queueName = queueName;
47 this.mq2sqlInsertQuery = mq2sqlInsertQuery;
48 this.sql2mqReadQuery = sql2mqReadQuery;
49 this.sql2mqUpdateQuery = sql2mqUpdateQuery;
50
51 this.enabled = enabled;
52
53
54 lastrunOk = true;
55 counter = 0;
56 lastErrorMessage = lastOkTime = lastErrorTime = "";
57
58 addLogEntry( "Starting ... " );
59 }
60
61 public void transportMessages()
62 {
63 if (enabled == false)
64 return;
65
66 Console.WriteLine(name + " -> transportMessages() ");
67 lastrunOk = true;
68
69 if (direction == SQL2MQ)
70 {
71 transportSql2Mq();
72 }
73 else
74 {
75 transportMq2Sql();
76 }
77
78 if (lastrunOk == true)
79 {
80 lastOkTime = getNowString();
81 }
82 else
83 {
84 addLogEntry(lastErrorMessage);
85 lastErrorTime = getNowString();
86 }
87 }
88
89 private void transportSql2Mq()
90 {
91 string filename = getTransactionlogFilename();
92 using (StreamWriter translog = new StreamWriter(filename, true) )
93 try
94 {
95 //stage 1 connect to mq
96 Hashtable connProps = getConnectionProperties();
97 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);
98 int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
99
100 MQQueue out_queue = mqMgr.AccessQueue(queueName, openOptions);
101
102
103 //stage 2 connect to mysql
104 string mysqlString = buildMysqlConnString();
105 MySqlConnection sqlConnection = new MySqlConnection(mysqlString);
106 sqlConnection.Open();
107
108
109 //stage 3 move messages
110 string readSql = "CALL " + sql2mqReadQuery + "()";
111 MySqlCommand readCmd = new MySqlCommand(readSql, sqlConnection);
112 MySqlDataReader dataReader = readCmd.ExecuteReader();
113 while (dataReader.Read())
114 {
115 int id = dataReader.GetInt32(0);
116 string msgString = dataReader.GetString(1);
117
118 MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
119 // same as MQPMO_DEFAULT
120
121 MQMessage msg = new MQMessage();
122 msg.Format = MQC.MQFMT_STRING;
123 msg.CharacterSet = 1252;
124 msg.WriteString(msgString);
125
126 out_queue.Put(msg, pmo);
127
128 //now that the message has been put on queue mark it as such
129
130 string updateSql = "CALL " + sql2mqUpdateQuery + "(" + id + ")";
131 MySqlCommand updateCmd = new MySqlCommand(updateSql, sqlConnection);
132 int numrows = updateCmd.ExecuteNonQuery();
133
134 translog.WriteLine(getNowString() + " " + msgString + "\n");
135
136 if (numrows != 1)
137 {
138 break;
139 }
140 counter++;
141 }
142
143
144 //stage 4: everything went smooth so clean up afterwards
145 dataReader.Close();
146 out_queue.Close();
147 mqMgr.Close();
148 sqlConnection.Close();
149 }
150 catch (Exception e)
151 {
152 lastrunOk = false;
153 lastErrorMessage = name + ".transportSql2Mq error: " + e.Message;
154 Console.WriteLine(lastErrorMessage);
155 EventLog.WriteEntry("DaoMqPump2", lastErrorMessage, EventLogEntryType.Warning);
156 }
157 }
158
159 private void transportMq2Sql()
160 {
161 string filename = getTransactionlogFilename();
162 using (StreamWriter translog = new StreamWriter(filename, true))
163 try
164 {
165 //stage 1 connect to mq
166 Hashtable connProps = getConnectionProperties();
167 MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, connProps);
168 int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
169
170 MQQueue in_queue = mqMgr.AccessQueue(queueName, openOptions);
171
172
173 //stage 2 connect to mysql
174 string mysqlString = buildMysqlConnString();
175 MySqlConnection sqlConnection = new MySqlConnection(mysqlString);
176 sqlConnection.Open();
177
178
179 //stage 3 move messages
180 bool isContinue = true;
181 while (isContinue)
182 {
183
184 MQMessage mqMsg = new MQMessage();
185 MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
186
187 mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT; //kræver en commit førend at den er fjernet fra køen
188
189 try
190 {
191 in_queue.Get(mqMsg, mqGetMsgOpts);
192 if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
193 {
194 string msgString = mqMsg.ReadString(mqMsg.MessageLength);
195 System.Console.WriteLine(msgString);
196
197 string sql = "CALL " + mq2sqlInsertQuery + "( '" + MySqlHelper.EscapeString(msgString) + "' )"; //opbygger en CALL somestoredprocedure('msgString'); sql streng
198
199 MySqlCommand sqlcmd = new MySqlCommand(sql, sqlConnection);
200 int numrows = sqlcmd.ExecuteNonQuery();
201
202 if (numrows == 1)
203 {
204 translog.WriteLine(getNowString() + " " + msgString + "\n");
205 mqMgr.Commit();
206 counter++;
207 }
208 else
209 {
210 mqMgr.Backout();
211 isContinue = false;
212 }
213
214 }
215 else
216 {
217 System.Console.WriteLine("Non-text message");
218 }
219 }
220 catch (MQException mqe)
221 {
222 isContinue = false;
223
224 // report reason, if any
225 if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
226 {
227 // special report for normal end
228 System.Console.WriteLine("no more messages");
229 }
230 else
231 {
232 // general report for other reasons
233 System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);;
234 lastrunOk = false;
235 }
236
237 }
238
239
240 }
241
242 //stage 4: everything went smooth so clean up afterwards
243 in_queue.Close();
244 mqMgr.Close();
245 sqlConnection.Close();
246
247
248 }
249 catch (Exception e)
250 {
251 lastrunOk = false;
252
253 lastErrorMessage = name + ".transportMq2Sql error: " + e.Message;
254 Console.WriteLine(lastErrorMessage);
255 EventLog.WriteEntry("DaoMqPump2", lastErrorMessage, EventLogEntryType.Warning);
256 }
257 }
258
259 private string buildMysqlConnString()
260 {
261 string connectionString = "";
262
263 connectionString += "SERVER=" + controller.mysqlHost + ";";
264 //connectionString += "DATABASE=" + controller.mysqlHost + ";";
265 connectionString += "UID=" + controller.mysqlUser + ";";
266 connectionString += "PASSWORD=" + controller.mysqlPassword + ";";
267
268 return connectionString;
269 }
270
271 private Hashtable getConnectionProperties()
272 {
273 Hashtable connProperties = new Hashtable();
274 connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
275 connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
276 connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel); //WARNING: Hardcoded Channel Value !!!
277 return connProperties;
278 }
279
280 private string getTransactionlogFilename()
281 {
282
283 DateTime now = DateTime.Now;
284 string filename = controller.logDirectory + "\\";
285 filename += "transactionlog_" + name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";
286
287 return filename;
288 }
289
290 public string getNowString()
291 {
292 DateTime now = DateTime.Now;
293
294 return now.ToString("s");
295 }
296
297 private void addLogEntry(string msg)
298 {
299 msg = getNowString() + " " + msg;
300 lock (logEntries)
301 {
302 logEntries.AddFirst(msg);
303
304 if (logEntries.Count > 20)
305 {
306 logEntries.RemoveLast();
307 }
308
309 }
310 }
311
312 public string[] getLog()
313 {
314 lock(logEntries)
315 {
316 List<string> tmpEntries = new List<string>();
317 foreach (string s in logEntries)
318 {
319 tmpEntries.Add(s);
320 }
321 return tmpEntries.ToArray();
322 }
323 }
324
325 }
326 }

  ViewVC Help
Powered by ViewVC 1.1.20