/[projects]/dao/DaoMqPump2/DaoMqPump2/Transport.cs
ViewVC logotype

Annotation of /dao/DaoMqPump2/DaoMqPump2/Transport.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1986 - (hide annotations) (download)
Wed Jul 3 07:56:52 2013 UTC (10 years, 11 months ago) by torben
File size: 11615 byte(s)
Add files
1 torben 1986 using System;
2     using System.Collections;
3     using System.Collections.Generic;
4     using System.IO;
5    
6     using System.Diagnostics;
7    
8     using IBM.WMQ;
9     using MySql.Data.MySqlClient;
10    
11     namespace DaoMqPump2
12     {
/// <summary>
/// One named transport that moves text messages between a MySQL database and
/// a WebSphere MQ queue, in the direction given at construction time.
/// Connection settings (MQ host/channel/queue manager, MySQL host/user/password,
/// log directory) are read from the owning <see cref="TransportController"/>.
/// The outcome of every run is exposed through <see cref="lastrunOk"/>,
/// <see cref="lastErrorMessage"/>, <see cref="lastOkTime"/> and
/// <see cref="lastErrorTime"/>; the most recent error lines are available
/// through <see cref="getLog"/>.
/// </summary>
public class Transport
{
    /// <summary>Direction value that selects the database-to-queue transport.</summary>
    public static string SQL2MQ = "sql2mq";

    /// <summary>Direction value naming the queue-to-database transport.</summary>
    public static string MQ2SQL = "mq2sql";

    // Upper bound on the number of lines kept in the in-memory log.
    private const int MaxLogEntries = 20;

    // Event log source used when a run fails.
    private const string EventLogSource = "DaoMqPump2";

    /// <summary>When false, <see cref="transportMessages"/> returns without doing anything.</summary>
    public bool enabled { get; set; }

    private readonly TransportController controller;

    /// <summary>Name of this transport; used in log lines and in the transaction log file name.</summary>
    public string name { get; private set; }

    /// <summary>Configured direction; compared against <see cref="SQL2MQ"/> when a run starts.</summary>
    public string direction { get; private set; }

    /// <summary>Name of the MQ queue that is written to or read from.</summary>
    public string queueName { get; private set; }

    /// <summary>Stored procedure called with the message text for every message taken off the queue.</summary>
    public string mq2sqlInsertQuery { get; private set; }

    /// <summary>Stored procedure called without arguments; column 0 of its result is read as an int id, column 1 as the message text.</summary>
    public string sql2mqReadQuery { get; private set; }

    /// <summary>Stored procedure called with a row id after that row's message has been put on the queue.</summary>
    public string sql2mqUpdateQuery { get; private set; }

    /// <summary>True when the most recent run finished without a recorded error.</summary>
    public bool lastrunOk { get; private set; }

    /// <summary>Text of the most recently recorded error, or an empty string.</summary>
    public string lastErrorMessage { get; private set; }

    /// <summary>Timestamp (see <see cref="getNowString"/>) of the last successful run, or an empty string.</summary>
    public string lastOkTime { get; private set; }

    /// <summary>Timestamp (see <see cref="getNowString"/>) of the last failed run, or an empty string.</summary>
    public string lastErrorTime { get; private set; }

    /// <summary>Number of messages transported since this object was created.</summary>
    public int counter { get; private set; }

    // Newest entry first; also used as the lock object guarding itself.
    private readonly LinkedList<string> logEntries = new LinkedList<string>();

    /// <summary>
    /// Creates a transport and writes a "Starting ... " line to the in-memory log.
    /// </summary>
    /// <param name="controller">Owner that supplies MQ, MySQL and log directory settings.</param>
    /// <param name="name">Name of the transport.</param>
    /// <param name="direction"><see cref="SQL2MQ"/> for database-to-queue; any other value runs queue-to-database.</param>
    /// <param name="queueName">MQ queue name.</param>
    /// <param name="mq2sqlInsertQuery">Insert stored procedure (queue-to-database).</param>
    /// <param name="sql2mqReadQuery">Read stored procedure (database-to-queue).</param>
    /// <param name="sql2mqUpdateQuery">Update stored procedure (database-to-queue).</param>
    /// <param name="enabled">Initial value of <see cref="enabled"/>.</param>
    public Transport(TransportController controller, string name, string direction, string queueName, string mq2sqlInsertQuery, string sql2mqReadQuery, string sql2mqUpdateQuery, bool enabled)
    {
        this.controller = controller;
        this.name = name;
        this.direction = direction;
        this.queueName = queueName;
        this.mq2sqlInsertQuery = mq2sqlInsertQuery;
        this.sql2mqReadQuery = sql2mqReadQuery;
        this.sql2mqUpdateQuery = sql2mqUpdateQuery;

        this.enabled = enabled;

        lastrunOk = true;
        counter = 0;
        lastErrorMessage = lastOkTime = lastErrorTime = "";

        addLogEntry("Starting ... ");
    }

    /// <summary>
    /// Runs the transport once in the configured direction and updates the
    /// status properties. Does nothing when the transport is disabled.
    /// </summary>
    public void transportMessages()
    {
        if (!enabled)
        {
            return;
        }

        Console.WriteLine(name + " -> transportMessages() ");
        lastrunOk = true;

        // Every direction value other than SQL2MQ is handled as queue-to-database.
        if (direction == SQL2MQ)
        {
            transportSql2Mq();
        }
        else
        {
            transportMq2Sql();
        }

        if (lastrunOk)
        {
            lastOkTime = getNowString();
        }
        else
        {
            addLogEntry(lastErrorMessage);
            lastErrorTime = getNowString();
        }
    }

    /// <summary>
    /// Database-to-queue run: opens the transaction log, the MQ queue and the
    /// MySQL connection, moves the pending rows, and releases everything again
    /// whether or not the run succeeded. Any exception is recorded as a failed run.
    /// </summary>
    private void transportSql2Mq()
    {
        try
        {
            // Opened inside the try so that an unusable log directory is reported
            // through lastrunOk/lastErrorMessage like every other failure.
            using (StreamWriter translog = new StreamWriter(getTransactionlogFilename(), true))
            {
                // stage 1: connect to mq
                MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, getConnectionProperties());
                try
                {
                    MQQueue outQueue = mqMgr.AccessQueue(queueName, MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING);
                    try
                    {
                        // stage 2: connect to mysql
                        using (MySqlConnection sqlConnection = new MySqlConnection(buildMysqlConnString()))
                        {
                            sqlConnection.Open();

                            // stage 3: move messages
                            pumpSqlToQueue(outQueue, sqlConnection, translog);
                        }
                    }
                    finally
                    {
                        // stage 4: clean up, also when stage 2 or 3 failed
                        outQueue.Close();
                    }
                }
                finally
                {
                    mqMgr.Close();
                }
            }
        }
        catch (Exception e)
        {
            recordFailure("transportSql2Mq", e);
        }
    }

    /// <summary>
    /// Reads all pending rows, then for each row puts the message on the queue,
    /// calls the update procedure with the row id and appends the message to
    /// the transaction log. Stops at the first row whose update does not
    /// report exactly one affected row.
    /// </summary>
    private void pumpSqlToQueue(MQQueue outQueue, MySqlConnection sqlConnection, StreamWriter translog)
    {
        // The rows are buffered and the reader closed before any update is issued:
        // a connection that has an open data reader cannot execute another command.
        List<KeyValuePair<int, string>> pending = new List<KeyValuePair<int, string>>();
        using (MySqlCommand readCmd = new MySqlCommand("CALL " + sql2mqReadQuery + "()", sqlConnection))
        using (MySqlDataReader dataReader = readCmd.ExecuteReader())
        {
            while (dataReader.Read())
            {
                pending.Add(new KeyValuePair<int, string>(dataReader.GetInt32(0), dataReader.GetString(1)));
            }
        }

        foreach (KeyValuePair<int, string> row in pending)
        {
            int id = row.Key;
            string msgString = row.Value;

            MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults, same as MQPMO_DEFAULT

            MQMessage msg = new MQMessage();
            msg.Format = MQC.MQFMT_STRING;
            msg.CharacterSet = 1252;
            msg.WriteString(msgString);

            outQueue.Put(msg, pmo);

            // now that the message has been put on the queue, mark the row as such
            int numrows;
            using (MySqlCommand updateCmd = new MySqlCommand("CALL " + sql2mqUpdateQuery + "(@id)", sqlConnection))
            {
                updateCmd.Parameters.AddWithValue("@id", id);
                numrows = updateCmd.ExecuteNonQuery();
            }

            translog.WriteLine(getNowString() + " " + msgString + "\n");

            if (numrows != 1)
            {
                break;
            }
            counter++;
        }
    }

    /// <summary>
    /// Queue-to-database run: opens the transaction log, the MQ queue and the
    /// MySQL connection, drains the queue into the database, and releases
    /// everything again whether or not the run succeeded. Any exception is
    /// recorded as a failed run.
    /// </summary>
    private void transportMq2Sql()
    {
        try
        {
            // Opened inside the try so that an unusable log directory is reported
            // through lastrunOk/lastErrorMessage like every other failure.
            using (StreamWriter translog = new StreamWriter(getTransactionlogFilename(), true))
            {
                // stage 1: connect to mq
                MQQueueManager mqMgr = new MQQueueManager(controller.mqQueueManager, getConnectionProperties());
                try
                {
                    MQQueue inQueue = mqMgr.AccessQueue(queueName, MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING);
                    try
                    {
                        // stage 2: connect to mysql
                        using (MySqlConnection sqlConnection = new MySqlConnection(buildMysqlConnString()))
                        {
                            sqlConnection.Open();

                            // stage 3: move messages
                            drainQueueToSql(mqMgr, inQueue, sqlConnection, translog);
                        }
                    }
                    finally
                    {
                        // stage 4: clean up, also when stage 2 or 3 failed
                        inQueue.Close();
                    }
                }
                finally
                {
                    mqMgr.Close();
                }
            }
        }
        catch (Exception e)
        {
            recordFailure("transportMq2Sql", e);
        }
    }

    /// <summary>
    /// Gets messages from the queue under syncpoint until none is available.
    /// A string message is passed to the insert procedure; the get is committed
    /// when exactly one row is reported as affected, otherwise it is backed out
    /// and the loop stops.
    /// </summary>
    private void drainQueueToSql(MQQueueManager mqMgr, MQQueue inQueue, MySqlConnection sqlConnection, StreamWriter translog)
    {
        bool isContinue = true;
        while (isContinue)
        {
            MQMessage mqMsg = new MQMessage();
            MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();

            // requires a commit before the message is removed from the queue
            mqGetMsgOpts.Options = MQC.MQGMO_SYNCPOINT;

            try
            {
                inQueue.Get(mqMsg, mqGetMsgOpts);
                if (string.Equals(mqMsg.Format, MQC.MQFMT_STRING, StringComparison.Ordinal))
                {
                    string msgString = mqMsg.ReadString(mqMsg.MessageLength);
                    System.Console.WriteLine(msgString);

                    // CALL somestoredprocedure(<message text>), with the text sent as a
                    // bound parameter instead of being spliced into the SQL string.
                    int numrows;
                    using (MySqlCommand sqlcmd = new MySqlCommand("CALL " + mq2sqlInsertQuery + "(@msg)", sqlConnection))
                    {
                        sqlcmd.Parameters.AddWithValue("@msg", msgString);
                        numrows = sqlcmd.ExecuteNonQuery();
                    }

                    if (numrows == 1)
                    {
                        translog.WriteLine(getNowString() + " " + msgString + "\n");
                        mqMgr.Commit();
                        counter++;
                    }
                    else
                    {
                        mqMgr.Backout();
                        isContinue = false;
                    }
                }
                else
                {
                    // NOTE(review): the message is neither committed nor backed out here;
                    // what happens to it depends on a later Commit/Backout or on how the
                    // queue manager treats the unit of work at close - confirm intent.
                    System.Console.WriteLine("Non-text message");
                }
            }
            catch (MQException mqe)
            {
                isContinue = false;

                // report reason, if any
                if (mqe.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                {
                    // special report for normal end
                    System.Console.WriteLine("no more messages");
                }
                else
                {
                    // general report for other reasons
                    System.Console.WriteLine("MQQueue::Get ended with " + mqe.Message);
                    lastrunOk = false;
                    // Keep the status text in step with the status flag; the caller
                    // logs lastErrorMessage whenever lastrunOk is false.
                    lastErrorMessage = name + ".transportMq2Sql error: " + mqe.Message;
                }
            }
            catch (Exception)
            {
                // The database insert or the transaction log write failed after a
                // message was taken under syncpoint: back the get out explicitly so
                // the message is not lost, then let the caller record the failure.
                backoutQuietly(mqMgr);
                throw;
            }
        }
    }

    /// <summary>
    /// Backs out the current MQ unit of work; an MQ error raised by the
    /// backout itself is only written to the console so that it does not
    /// replace the exception that is already being propagated.
    /// </summary>
    private void backoutQuietly(MQQueueManager mqMgr)
    {
        try
        {
            mqMgr.Backout();
        }
        catch (MQException backoutError)
        {
            Console.WriteLine(name + ".transportMq2Sql backout failed: " + backoutError.Message);
        }
    }

    /// <summary>
    /// Marks the current run as failed and reports the exception message to
    /// the console and to the Windows event log (as a warning).
    /// </summary>
    /// <param name="operation">Method name used as prefix in the error text.</param>
    /// <param name="e">The exception that ended the run.</param>
    private void recordFailure(string operation, Exception e)
    {
        lastrunOk = false;
        lastErrorMessage = name + "." + operation + " error: " + e.Message;
        Console.WriteLine(lastErrorMessage);
        EventLog.WriteEntry(EventLogSource, lastErrorMessage, EventLogEntryType.Warning);
    }

    /// <summary>
    /// Builds the MySQL connection string from the controller's host, user and
    /// password. No DATABASE entry is included.
    /// </summary>
    private string buildMysqlConnString()
    {
        // NOTE(review): no default schema is selected - presumably the configured
        // procedure names are schema-qualified; confirm against the configuration.
        return "SERVER=" + controller.mysqlHost + ";"
             + "UID=" + controller.mysqlUser + ";"
             + "PASSWORD=" + controller.mysqlPassword + ";";
    }

    /// <summary>
    /// Builds the MQ client connection properties (client transport, host and
    /// channel) from the controller's settings.
    /// </summary>
    private Hashtable getConnectionProperties()
    {
        Hashtable connProperties = new Hashtable();
        connProperties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
        connProperties.Add(MQC.HOST_NAME_PROPERTY, controller.mqHost);
        connProperties.Add(MQC.CHANNEL_PROPERTY, controller.mqChannel); // channel name is taken from the controller
        return connProperties;
    }

    /// <summary>
    /// Returns the transaction log path for the current month:
    /// &lt;logDirectory&gt;\transactionlog_&lt;name&gt;_&lt;yyyyMM&gt;.log
    /// </summary>
    private string getTransactionlogFilename()
    {
        DateTime now = DateTime.Now;
        string filename = controller.logDirectory + "\\";
        filename += "transactionlog_" + name + "_" + now.Year.ToString("D4") + now.Month.ToString("D2") + ".log";

        return filename;
    }

    /// <summary>
    /// Returns the current local time in the sortable "s" format
    /// (yyyy-MM-ddTHH:mm:ss).
    /// </summary>
    public string getNowString()
    {
        return DateTime.Now.ToString("s");
    }

    /// <summary>
    /// Prepends a timestamped line to the in-memory log and drops the oldest
    /// line once more than <see cref="MaxLogEntries"/> are held.
    /// </summary>
    private void addLogEntry(string msg)
    {
        msg = getNowString() + " " + msg;
        lock (logEntries)
        {
            logEntries.AddFirst(msg);

            if (logEntries.Count > MaxLogEntries)
            {
                logEntries.RemoveLast();
            }
        }
    }

    /// <summary>
    /// Returns a snapshot of the in-memory log, newest entry first.
    /// </summary>
    public string[] getLog()
    {
        lock (logEntries)
        {
            string[] snapshot = new string[logEntries.Count];
            logEntries.CopyTo(snapshot, 0);
            return snapshot;
        }
    }
}
326     }

  ViewVC Help
Powered by ViewVC 1.1.20