Updating Data Asynchronously Using Message Queuing
Problem
You need to asynchronously update data in a database on a system that is not always connected.
Solution
You must use message queuing and an XML DiffGram to:
- Construct and send an MSMQ message containing a DiffGram to the server.
- Extract the DiffGram from the message and use it to update the data source at the server.
- Construct and send a message containing the latest DataSet to the client.
- Retrieve the response at the client and deserialize it into a DataSet.
The sample code contains four event handlers:
Form.Load
Sets up the sample by loading a DataSet with the Customers table from the Northwind database. The default view of the table is bound to a data grid on the form.
Send Update Button.Click
Checks if the update message queue exists and creates it if necessary. A MessageQueue object is created to access the queue. A message is created containing a DiffGram of the changes made to the DataSet containing the Customers table. This message is sent to the update queue.
Process Update Button.Click
Checks if the update message queue exists and creates it if necessary. A MessageQueue object is created to access the queue. An attempt is made to receive a message from the queue, waiting one second before giving up. If a message is received, a DataAdapter that uses a CommandBuilder to generate updating logic is created and used to update the database with the changes in the DataSet contained in the body of the message. The latest version of the Customers table is retrieved into a new DataSet. A result queue is created if necessary and a message containing the latest DataSet is sent to the queue.
Receive Update Button.Click
Checks if the result message queue exists and creates it if necessary. A MessageQueue object is created to access the queue and the formatter is set to deserialize the DataSet in the message bodies. An attempt is made to receive a message from the queue, waiting one second before giving up. If a message is received, the DataSet in the body is deserialized and the default view of the Customers DataTable is bound to the data grid on the form.
The C# code is shown in Example 4-32.
Example 4-32. File: MessageQueueUpdateForm.cs
// Namespaces, variables, and constants
using System;
using System.Configuration;
using System.Windows.Forms;
using System.Messaging;
using System.Data;
using System.Data.SqlClient;

private const String CUSTOMERS_TABLE = "Customers";

private const String QUEUENAMEUPDATE = @".\Private$\adodotnetcb0413update";
private const String QUEUENAMERESULT = @".\Private$\adodotnetcb0413result";

private DataSet ds;

// . . .

private void MessageQueueUpdateForm_Load(object sender, System.EventArgs e)
{
    // As a starting point, load the data directly.

    // Create the DataAdapter to load customers data.
    SqlDataAdapter da = new SqlDataAdapter("SELECT * FROM Customers",
        ConfigurationSettings.AppSettings["Sql_ConnectString"]);

    // Get the schema and data for the customers table.
    ds = new DataSet( );
    da.FillSchema(ds, SchemaType.Source, CUSTOMERS_TABLE);
    da.Fill(ds, CUSTOMERS_TABLE);

    // Bind the default view of the customers table to the grid.
    dataGrid.DataSource = ds.Tables[CUSTOMERS_TABLE].DefaultView;
}

private void sendUpdateButton_Click(object sender, System.EventArgs e)
{
    // Create the update queue if it does not exist.
    if(!MessageQueue.Exists(QUEUENAMEUPDATE))
        MessageQueue.Create(QUEUENAMEUPDATE);

    // Create an object to access the update queue.
    MessageQueue mq = new MessageQueue(QUEUENAMEUPDATE);
    // Set the formatter for serialization of message bodies.
    mq.Formatter = new XmlMessageFormatter(new Type[] {typeof(DataSet)});

    // Send a message containing the changes (GetChanges( ) returns
    // null if the DataSet has no pending changes).
    mq.Send(ds.GetChanges( ));

    MessageBox.Show("Update message sent.", "MessageQueue Update",
        MessageBoxButtons.OK, MessageBoxIcon.Information);
}

private void processUpdateButton_Click(object sender, System.EventArgs e)
{
    // Create the update queue if it does not exist.
    if(!MessageQueue.Exists(QUEUENAMEUPDATE))
        MessageQueue.Create(QUEUENAMEUPDATE);

    // Create an object to access the update queue.
    MessageQueue mq = new MessageQueue(QUEUENAMEUPDATE);
    // Set the formatter for deserialization of message bodies.
    mq.Formatter = new XmlMessageFormatter(new Type[] {typeof(DataSet)});

    // Receive a message from the update queue, waiting up to one second.
    System.Messaging.Message msg;
    try
    {
        msg = mq.Receive(new TimeSpan(0,0,1));
    }
    catch(MessageQueueException ex)
    {
        MessageBox.Show(ex.Message, "MessageQueue Receive Error",
            MessageBoxButtons.OK, MessageBoxIcon.Error);
        return;
    }

    // Create the DataAdapter and CommandBuilder to update.
    SqlDataAdapter da =
        new SqlDataAdapter("SELECT * FROM " + CUSTOMERS_TABLE,
        ConfigurationSettings.AppSettings["Sql_ConnectString"]);
    SqlCommandBuilder cb = new SqlCommandBuilder(da);

    if (msg.BodyStream.Length > 0)
    {
        // Get the DataSet of changes from the message body.
        DataSet dsChanges = (DataSet)msg.Body;

        // Process the updates.
        da.Update(dsChanges, CUSTOMERS_TABLE);
    }

    // Get the updated DataSet.
    DataSet dsUpdate = new DataSet( );
    da.Fill(dsUpdate, CUSTOMERS_TABLE);

    // Create the result queue if it does not exist.
    if(!MessageQueue.Exists(QUEUENAMERESULT))
        MessageQueue.Create(QUEUENAMERESULT);

    // Create an object to access the result queue.
    mq = new MessageQueue(QUEUENAMERESULT);
    // Send a message with the updated DataSet to the queue.
    mq.Send(dsUpdate);

    MessageBox.Show("Update processed. Refreshed DataSet sent.",
        "MessageQueue Process", MessageBoxButtons.OK,
        MessageBoxIcon.Information);
}

private void receiveUpdateButton_Click(object sender, System.EventArgs e)
{
    // Create the result queue if it does not exist.
    if(!MessageQueue.Exists(QUEUENAMERESULT))
        MessageQueue.Create(QUEUENAMERESULT);

    // Create an object to access the result queue.
    MessageQueue mq = new MessageQueue(QUEUENAMERESULT);
    // Set the formatter for deserialization of message bodies.
    mq.Formatter = new XmlMessageFormatter(new Type[] {typeof(DataSet)});

    // Receive a message from the result queue, waiting up to one second.
    System.Messaging.Message msg;
    try
    {
        msg = mq.Receive(new TimeSpan(0,0,1));
    }
    catch(MessageQueueException ex)
    {
        MessageBox.Show("ERROR: " + ex.Message, "MessageQueue Receive",
            MessageBoxButtons.OK, MessageBoxIcon.Error);
        return;
    }

    // Refresh the DataSet underlying the DataGrid.
    ds = (DataSet)msg.Body;
    dataGrid.DataSource = ds.Tables[CUSTOMERS_TABLE].DefaultView;

    MessageBox.Show("Retrieved and loaded refreshed data.",
        "MessageQueue Receive", MessageBoxButtons.OK,
        MessageBoxIcon.Information);
}
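The send handler in Example 4-32 passes the result of GetChanges( ) directly to Send( ). Because GetChanges( ) returns null when the DataSet has no pending changes, a sender that prefers not to queue a message in that case could check first. The following is only a minimal sketch of that variation, not part of the original sample: the class name, the SendIfChanged( ) helper, and the send delegate (standing in for a call such as mq.Send) are hypothetical, and the in-memory table is made-up data.

```csharp
using System;
using System.Data;

public static class PendingChangesSketch
{
    // Hypothetical helper: invokes 'send' only when the DataSet has pending
    // changes. 'send' stands in for a call such as mq.Send(changes).
    // Returns true if something was sent.
    public static bool SendIfChanged(DataSet ds, Action<DataSet> send)
    {
        // GetChanges() returns null when there are no added, modified,
        // or deleted rows.
        DataSet changes = ds.GetChanges();
        if (changes == null)
            return false;

        send(changes);
        return true;
    }

    public static void Main()
    {
        // Made-up stand-in for the Customers table.
        DataSet ds = new DataSet("Demo");
        DataTable table = ds.Tables.Add("Customers");
        table.Columns.Add("CustomerID", typeof(string));
        table.Rows.Add("ALFKI");
        ds.AcceptChanges();

        // Nothing has been edited yet, so the delegate is not invoked.
        bool sent = SendIfChanged(ds, delegate(DataSet c) { });
        Console.WriteLine("Sent before edit: " + sent);

        // Edit a row; the helper now passes a DataSet of changes to 'send'.
        table.Rows[0]["CustomerID"] = "ALFKX";
        sent = SendIfChanged(ds, delegate(DataSet c)
        {
            Console.WriteLine("Rows to send: " + c.Tables["Customers"].Rows.Count);
        });
        Console.WriteLine("Sent after edit: " + sent);
    }
}
```

Whether to skip the message or send an empty one is a design choice; the process handler in Example 4-32 already tolerates an empty body by testing msg.BodyStream.Length before reading it.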
Discussion
The discussion in Recipe 2.22 provides an overview of Message Queuing (MSMQ).
For more information about DiffGrams, see Recipe 8.8.
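To make the role of the DiffGram concrete, the following sketch writes the changes in a DataSet as DiffGram XML and reads them back into a DataSet with the same schema, without involving a queue. It is an illustration under stated assumptions rather than the recipe's own code: the class and method names are hypothetical, the two-row Customers table is made-up data, and WriteXml( ) and ReadXml( ) are called explicitly here, whereas the sample leaves serialization to the message formatter.

```csharp
using System;
using System.Data;
using System.IO;

public static class DiffGramSketch
{
    // Builds a small in-memory stand-in for the Customers table
    // (made-up rows; the real sample loads the table from Northwind).
    public static DataSet CreateCustomers()
    {
        DataSet ds = new DataSet("CustomersDataSet");
        DataTable table = ds.Tables.Add("Customers");
        table.Columns.Add("CustomerID", typeof(string));
        table.Columns.Add("CompanyName", typeof(string));
        table.PrimaryKey = new DataColumn[] { table.Columns["CustomerID"] };
        table.Rows.Add("ALFKI", "Alfreds Futterkiste");
        table.Rows.Add("ANATR", "Ana Trujillo");
        ds.AcceptChanges();
        return ds;
    }

    // Serializes a DataSet of changes as DiffGram XML.
    public static string ToDiffGram(DataSet changes)
    {
        using (StringWriter writer = new StringWriter())
        {
            changes.WriteXml(writer, XmlWriteMode.DiffGram);
            return writer.ToString();
        }
    }

    // Reads DiffGram XML into an empty copy of the schema of schemaSource.
    // A DiffGram carries no schema, so the target must already have it.
    public static DataSet FromDiffGram(string xml, DataSet schemaSource)
    {
        DataSet target = schemaSource.Clone();
        using (StringReader reader = new StringReader(xml))
        {
            target.ReadXml(reader, XmlReadMode.DiffGram);
        }
        return target;
    }

    public static void Main()
    {
        DataSet ds = CreateCustomers();
        ds.Tables["Customers"].Rows[0]["CompanyName"] = "Alfreds Futterkiste GmbH";

        // Only the modified row travels in the DiffGram.
        string xml = ToDiffGram(ds.GetChanges());
        DataSet received = FromDiffGram(xml, ds);

        // The row state and both row versions survive the round trip.
        DataRow row = received.Tables["Customers"].Rows[0];
        Console.WriteLine(row.RowState);
        Console.WriteLine(row["CompanyName", DataRowVersion.Original]);
        Console.WriteLine(row["CompanyName", DataRowVersion.Current]);
    }
}
```

Because the row state and original values are preserved, the receiving side can hand the DataSet to a DataAdapter, which uses them to decide which rows to insert, update, or delete.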