Querying Data Asynchronously with Message Queuing

Problem

You want to asynchronously retrieve data from a system that is not always connected.

Solution

You must:

The sample code contains three event handlers:

Send Button.Click

Checks if the query message queue exists and creates it if necessary. A MessageQueue object is created to access the queue. A message is sent to the queue containing the CustomerID which the user wants information about.

Process Query Button.Click

Checks if the query message queue exists and creates it if necessary. A MessageQueue object is created to access the queue. An attempt is made to receive a message from the queue, waiting one second before giving up. If a message is received, the CustomerID is extracted from the message and the message queue is closed. A DataSet is created and a DataAdapter is used to return the record for the requested CustomerID into a Customer DataTable in the DataSet . A result queue is created if necessary and a message labeled with the CustomerID and containing the DataSet with the asynchronous query results is sent to the queue.

Process Result Button.Click

Checks if the result message queue exists and creates it if necessary. A MessageQueue object is created to access the queue and the formatter set to deserialize the DataSet in the message bodies. An attempt is made to receive a message from the queue, waiting for one second before giving up. If a message is received, the DataSet in the body is deserialized and the contents are displayed.

The C# code is shown in Example 2-31.

Example 2-31. File: MessageQueueQueryForm.cs

// Namespaces, variables, and constants using System; using System.Configuration; using System.IO; using System.Text; using System.Messaging; using System.Data; using System.Data.SqlClient; private const String QUEUENAMEQUERY = @".Private$adodotnetcb0222query"; private const String QUEUENAMERESULT = @".Private$adodotnetcb0222result"; private System.Messaging.MessageQueue messageQueue; // . . . private void sendButton_Click(object sender, System.EventArgs e) { // Create the query queue if it does not exist. if(!MessageQueue.Exists(QUEUENAMEQUERY)) MessageQueue.Create(QUEUENAMEQUERY); // Create an object to access the query queue. MessageQueue mq = new MessageQueue(QUEUENAMEQUERY); // Send a message containing the user-enetered customer ID. String msg = "CustomerId=" + customerIdTextBox.Text; mq.Send(msg); resultTextBox.Text = "Query sent."; } private void processQueryButton_Click(object sender, System.EventArgs e) { // Create the query queue if it does not exist. if(!MessageQueue.Exists(QUEUENAMEQUERY)) MessageQueue.Create(QUEUENAMEQUERY); // Create an object to access the query queue. MessageQueue mq = new MessageQueue(QUEUENAMEQUERY); // Set the formatter for (de)serialization of message bodies. mq.Formatter = new XmlMessageFormatter(new Type[] {typeof(String)}); // Receive a message from the query queue. System.Messaging.Message msg; try { msg = mq.Receive(new TimeSpan(0, 0, 1)); resultTextBox.Text = "Query " + msg.Id + " received." + Environment.NewLine; } catch(MessageQueueException ex) { resultTextBox.Text = ex.Message; return; } // Get the customer ID from the message body. String customerId = ((String)msg.Body).Substring(11); // Close the queue. mq.Close( ); // Create a DataAdapter to retrieve data for the specified customer. String sqlText = "SELECT * FROM Customers WHERE CustomerID='" + customerId + "'"; SqlDataAdapter da = new SqlDataAdapter(sqlText, ConfigurationSettings.AppSettings["Sql_ConnectString"]); // Fill the Customer table in the DataSet with customer data. DataSet ds = new DataSet( ); da.Fill(ds, "Customers"); // Create the result queue if it does not exist. if(!MessageQueue.Exists(QUEUENAMERESULT)) MessageQueue.Create(QUEUENAMERESULT); // Create an object to access the result queue. mq = new MessageQueue(QUEUENAMERESULT); // Send a message with the customer DataSet to the queue. mq.Send(ds, customerId); resultTextBox.Text = "Response sent."; } private void processResultButton_Click(object sender, System.EventArgs e) { StringBuilder result = new StringBuilder( ); // Create the result queue if it does not exist. if(!MessageQueue.Exists(QUEUENAMERESULT)) MessageQueue.Create(QUEUENAMERESULT); // Create an object to access the result queue. MessageQueue mq = new MessageQueue(QUEUENAMERESULT); // Set the formatter for (de)serialization of message bodies. mq.Formatter = new XmlMessageFormatter(new Type[] {typeof(DataSet)}); // Receive a message from the result queue. System.Messaging.Message msg; try { msg = mq.Receive(new TimeSpan(0, 0, 1)); } catch(MessageQueueException ex) { resultTextBox.Text = ex.Message; return; } // Create the customer DataSet from the message body. DataSet ds = (DataSet)msg.Body; // Display the results of the query. result.Append("QUERY RESULTS:" + Environment.NewLine); if (ds.Tables["Customers"].Rows.Count == 0) result.Append("Customer not found for ID = '" + msg.Label + "'."); else for(int i = 0; i < ds.Tables[0].Columns.Count; i++) { result.Append(ds.Tables[0].Columns[i].ColumnName + " = " + ds.Tables[0].Rows[0][i] + Environment.NewLine); } resultTextBox.Text = result.ToString( ); }

Discussion

Message Queuing (MSMQ) provides an inter-application messaging infrastructure that allows messages to be sent between disconnected applications. MSMQ provides for message transport, queuing, transactional message support, error handling and auditing, and makes available a variety of Application Programming Interfaces to interact with MSMQ programmatically. The System.Messaging namespace contains the .NET classes that support MSMQ.

To send a message using MSMQ, perform the following actions:

Table 2-19. .NET predefined formatters

Formatter

Description

ActiveXMessageFormatter

Serializes or deserializes primitive data types and other objects using a format compatible with MSMQ ActiveX Component to allow interoperability with previous versions of MSMQ. It is fast and produces a compact serialization.

BinaryMessageFormatter

Serializes or deserializes an object or an object graph using a binary format. It is fast and produces a compact serialization.

XMLMessageFormatter

Serializes or deserializes objects and primitive data types into XML based on an XSD schema. This is the default formatter for MessageQueue components .

When the Send( ) method of the MessageQueue is called, the body of the message is serialized using the XMLMessageFormatter if the Formatter property is not specified.

To read a message and recreate the serialized body, formatter properties must be set before reading the message. The properties that must be set are specific to the formatter:

ActiveXMessageFormatter

No properties must be set.

BinaryMessageFormatter

Specify the format of the root object and the type descriptions either in the constructor or by explicitly setting the TopObjectFormat and TypeFormat properties.

XmlMessageFormatter

Specify the target types or target type names either in the constructor or by explicitly setting the TargetTypes or TargetTypeNames property.

The message can now be read by using the Receive( ) method of the MessageQueue . You can retrieve the serialized object from the Body property of the Message returned by the Receive( ) method.

For more information about Microsoft Message Queue (MSMQ), see the MSDN Library.

Searching and Analyzing Data

Категории