RX Extensions & Threading

Like a lot of people, I’ve been playing with Rx recently, the Reactive Extensions for .NET. To find out a bit more about what these are, have a look here. One of the things I found interesting was the use of Rx as a fluent interface for creating sets of asynchronous tasks and then subscribing to the collective result with both a result handler and an error handler.

I’ve worked up a little example here that shows some code performing some fairly standard asynchronous tasks. When called, my code needs to call a web service, query a database and fetch a value from some cache. It then needs to combine the results. Since these processes each take different and often varying amounts of time, and I want to minimise the execution time of my code, I’m going to run them in parallel.

I have shown four different methods of achieving this: using my own threads, the thread pool’s threads by way of BeginInvoke(), the Tasks namespace new in .NET 4.0, and Rx.

namespace RXThreadingExample
{
	using System;
	using System.Collections.Generic;
	using System.Diagnostics;
	using System.Linq;
	using System.Threading;
	using System.Threading.Tasks;

	public class Program
	{
		public static void Main()
		{
			const bool throwException = false;

			ClassicAsync(throwException);
			WaitHandles(throwException);
			Tasks(throwException);
			RXExtensions(throwException);

			Console.ReadKey(true);
		}

		private static void ClassicAsync(bool throwException)
		{
			var sw = Stopwatch.StartNew();
			var wsResult = 0;
			string dbResult = null;
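			string cacheResult = null;
			Exception dbError = null;

			var callWebService = new Thread(() => wsResult = CallWebService());
			// An exception that escapes a thread's delegate is never seen by the catch block
			// below (it takes the process down instead), so capture it and rethrow after Join().
			var queryDB = new Thread(() =>
			{
				try { dbResult = QueryDB(throwException, "Async"); }
				catch (Exception dbEx) { dbError = dbEx; }
			});
			var fetchCacheItem = new Thread(() => cacheResult = FetchCacheItem());

			try
			{
				callWebService.Start();
				queryDB.Start();
				fetchCacheItem.Start();

				callWebService.Join();
				queryDB.Join();
				fetchCacheItem.Join();

				if (dbError != null)
				{
					throw dbError;
				}

				Console.WriteLine(dbResult, wsResult, cacheResult, sw.ElapsedMilliseconds);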
			}
			catch (Exception ex)
			{
				Console.WriteLine("Exception : {0}", ex);
			}
		}

		private static void WaitHandles(bool throwException)
		{
			var sw = Stopwatch.StartNew();

			var waitHandles = new List<WaitHandle>();
			Func<int> callWebService = CallWebService;
			var wsAsyncResult = callWebService.BeginInvoke(null, null);			
			waitHandles.Add(wsAsyncResult.AsyncWaitHandle);

			Func<bool, string, string> queryDB = QueryDB;
			var dbAsyncResult = queryDB.BeginInvoke(throwException, "WaitHandles", null, null);
			waitHandles.Add(dbAsyncResult.AsyncWaitHandle);

			Func<string> queryLocalCache = FetchCacheItem;
			var cacheAsyncResult = queryLocalCache.BeginInvoke(null, null);
			waitHandles.Add(cacheAsyncResult.AsyncWaitHandle);

			try
			{
				WaitHandle.WaitAll(waitHandles.ToArray());

				var wsResult = callWebService.EndInvoke(wsAsyncResult);
				var dbResult = queryDB.EndInvoke(dbAsyncResult);
				var cacheResult = queryLocalCache.EndInvoke(cacheAsyncResult);

				Console.WriteLine(dbResult, wsResult, cacheResult, sw.ElapsedMilliseconds);
			}
			catch(Exception ex)
			{
				Console.WriteLine("Exception : {0}", ex);
			}
		}

		private static void Tasks(bool throwException)
		{
			var sw = Stopwatch.StartNew();
			var wsResult = 0;
			string dbResult = null;
			string cacheResult = null;

			var tasks = new List<Task>
				{
					new Task(() => wsResult = CallWebService()),
					new Task(() => dbResult = QueryDB(throwException, "Tasks")),
					new Task(() => cacheResult = FetchCacheItem())
				};

			try
			{
				tasks.ForEach(t => t.Start());
				Task.WaitAll(tasks.ToArray());
				Console.WriteLine(dbResult, wsResult, cacheResult, sw.ElapsedMilliseconds);
			}
			catch (Exception ex)
			{
				Console.WriteLine("Exception : {0}", ex);
			}

		}

		private static void RXExtensions(bool throwException)
		{
			var sw = Stopwatch.StartNew();
			Observable.Join(
				Observable.ToAsync<int>(CallWebService)()
					.And(Observable.ToAsync<bool, string, string>(QueryDB)(throwException, "RX"))
					.And(Observable.ToAsync<string>(FetchCacheItem)())
					.Then((wsResult, dbResult, cacheValue) =>
						new { WebServiceResult = wsResult, DatabaseResult = dbResult, CacheValue = cacheValue })
				).Subscribe(
					o => Console.WriteLine(o.DatabaseResult, o.WebServiceResult, o.CacheValue, sw.ElapsedMilliseconds),
					e => Console.WriteLine("Exception: {0}", e));
		}

		private static int CallWebService()
		{
			Thread.Sleep(500);
			return new Random().Next(1,33);
		}

		private static string QueryDB(bool throwException, string name)
		{
			Thread.Sleep(1500);
			if (throwException)
			{
				throw new Exception("You asked for it !");
			}
			return name + " can rescue {0} Chilean miners in {2} ms. {1}";
		}

		private static string FetchCacheItem()
		{
			return new[]{"Awesome!", "Cool!", "Amazing!", "Jinkies!"}[new Random().Next(0,4)];
		}
	}
}

I have to say, I like the syntactic sugar of Rx. In the tests I have run, however, using Task consistently produces the fastest results. More to follow, I think.

Events in C# 4.0

Rx : Reactive Extensions for .Net and JavaScript.

Rx is a set of framework extensions that allow developers to do asynchronous programming easily via familiar interfaces. If you are familiar with LINQ, IEnumerable<T> and IEnumerator<T>, you will instantly be able to grasp and use the new interfaces IObservable<T> and IObserver<T>. The enumerating interfaces allow you to PULL sequences of T from an interface in your code. The observing interfaces allow you to have sequences of T PUSHED to handlers in your code in response to asynchronous LINQ queries. Rx is implemented over the top of PFX (the Parallel Extensions to the .NET Framework), so all the threading and concurrency is handled for you, including synchronization contexts, which is especially useful if you are programming UI. It is also particularly significant that the Rx extension methods include a FromEvent() method that allows you to attach handlers to a stream of events in a very fluid way.
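To make the pull/push distinction concrete, here is a minimal sketch that uses only the IObservable<T> and IObserver<T> interfaces from the System namespace in .NET 4.0, with no Rx library at all. The NumberSource and CollectingObserver classes are hypothetical names made up for this illustration, and the source pushes its values synchronously inside Subscribe() purely to keep the example short; a real Rx source would normally deliver them asynchronously.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical source: pushes a fixed set of integers to whoever subscribes.
public sealed class NumberSource : IObservable<int>
{
    private readonly int[] _values;

    public NumberSource(params int[] values)
    {
        _values = values;
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // PUSH: the source decides when each value is handed to the observer.
        foreach (var value in _values)
        {
            observer.OnNext(value);
        }
        observer.OnCompleted();
        return new NoOpSubscription();
    }

    // Nothing to unsubscribe from, because all values were already delivered.
    private sealed class NoOpSubscription : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer: records everything it is sent.
public sealed class CollectingObserver : IObserver<int>
{
    public readonly List<int> Received = new List<int>();
    public bool Completed { get; private set; }

    public void OnNext(int value) { Received.Add(value); }
    public void OnError(Exception error) { Console.WriteLine("Error: {0}", error.Message); }
    public void OnCompleted() { Completed = true; }
}

public static class PushPullDemo
{
    public static void Main()
    {
        // PULL: the consumer asks the enumerator for each value in turn.
        IEnumerable<int> pulled = new[] { 1, 2, 3 };
        foreach (var value in pulled)
        {
            Console.WriteLine("Pulled {0}", value);
        }

        // PUSH: the consumer hands over an observer and waits to be called.
        var observer = new CollectingObserver();
        using (new NumberSource(1, 2, 3).Subscribe(observer))
        {
            Console.WriteLine("Pushed {0} values, completed: {1}",
                observer.Received.Count, observer.Completed);
        }
    }
}
```

The two halves mirror each other: MoveNext()/Current on the pull side correspond to OnNext() on the push side, and the end of the enumeration corresponds to OnCompleted().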

If you want to know more watch the video on the bottom of this page where Rx author Erik Meijer explains it in 14 minutes.

If you want to see it in action have a look at this Channel9 video with Wes Dyer where he implements drag & drop from scratch using Rx in 6 minutes.

Also, check out the Rx Team Blog for downloads, updates and samples.

Finally, back to Channel9 to look at RxJS, the Reactive Extensions for JavaScript, so the UI I mentioned above now includes web UI too. As Jeffrey Van Gogh says in the RxJS video, web programming is about “asynchronous stuff” but JavaScript is an imperative language. It’s really hard to do good asynchronous stuff in JavaScript. RxJS changes that in a big way.

A Couple of Bugs (or not)

A couple of niggles have come to light today. The first one is that Windows Azure does not seem to support the latest version of the .NET Service Bus. The recent release of the .NET Services SDK, the July CTP, included a new version of Microsoft.ServiceBus.dll (0.16.0.0). This version is not available on Windows Azure. After I had updated my SDK I rebuilt and redeployed a Windows Azure cloud application with a Web Role that calls service bus endpoints. This started throwing all kinds of odd exceptions, which was confusing as it (of course) worked fine on my machine. I eventually tracked the issue down, and the only way round it at the moment is to set the Copy Local property of the reference to Microsoft.ServiceBus.dll to true. That way it will get deployed with the application.

The second issue was a kind of two in one. I’m developing some prototypes for Windows Mobile using the .NET Compact Framework 3.5. One of the things I’m working on is a method of keeping mobile devices synchronised via .NET Service Bus queues and routers. There is no native support for the Service Bus on .NETCF, but fortunately I only need to make REST calls using a meteor pattern to poll a queue for sync messages.

I had two problems. The first was that I could only poll a queue twice using HttpWebRequest before I’d start getting timeouts. This turned out to be because I was hitting the max connections limit (2 by default), even though I was carefully disposing of responses and closing response streams etc. The second was that I would always get an ObjectDisposedException at System.Threading.WaitHandle.CheckResultInternal when I closed the application. The polling of the queue was of course being done on a background thread, so looking at the two together it did seem that some resource was not being correctly released somewhere.

Looking into the stack trace of the ObjectDisposedException I could see references to HttpWebRequest and stream writes. It occurred to me that because I was using REST I was only ever sending headers in my HttpWebRequests, and no body. It was possible that when I called WebRequest.Create() the request stream was being opened and, because I wasn’t using it, it never got closed. As these requests were being created on my background thread they would not dispose correctly, and they would continue to consume connection resources and possibly cause thread termination exceptions.

The answer was to close the request stream before sending the request:

try
{
    HttpWebRequest dequeueRequest = (HttpWebRequest)WebRequest.Create(_QueueHeadUri.AbsoluteUri + "?encoding=asreply&maxmessages=1&timeout=15");
    dequeueRequest.ConnectionGroupName = "queueclient";
    dequeueRequest.Method = "DELETE";
    dequeueRequest.Timeout = 30000;
    dequeueRequest.ContentLength = 0;
    dequeueRequest.Headers.Add("X-MS-Identity-Token", this._AccessToken);

    // this next line is the key
    dequeueRequest.GetRequestStream().Close();

    using (HttpWebResponse response = (HttpWebResponse)dequeueRequest.GetResponse())
    {
        if (response.StatusCode == HttpStatusCode.OK)
        {
            string viaString = response.Headers["X-REQUEST-HTTP-URI"];
            Uri viaUri = new Uri(viaString);
            if (this.OnMessageDequeued != null)
            {
                this.OnMessageDequeued(this, viaUri.Query);
            }
            continue;
        }
        if (response.StatusCode == HttpStatusCode.NoContent)
        {
            continue;
        }
        if (this.OnMessageDequeueError != null)
        {
            this.OnMessageDequeueError(this, new Exception(string.Format("HTTP Status Code : {0} : {1}", ((int)response.StatusCode), response.StatusCode)));
        }
    }
}
catch (Exception ex)
{
    if (this.OnMessageDequeueError != null)
    {
        this.OnMessageDequeueError(this, ex);
    }
}

Generics in Serviced Component interfaces.

Here’s a thing that’s bugging me right now. I’m refactoring a client server application to use Enterprise Services. It’s already split into UI, Business Logic and Data Access on the client, with direct calls to the SQL Server stored procedures, so we’re just moving some of those layers to the server and turning them into serviced components. Simple and effective… OK, so it’s not THAT simple, but in broad strokes that’s the strategy.
 
So I’m looking at the newly formed COM+ API with its interfaces and methods using the Component Services explorer and I notice that lots of the methods are missing. I mean, I know they’re there right, because I can call them and they work, but COM+ is telling me that they don’t exist.
 
It turns out that the thing all of the “missing” methods have in common is that they have a generic somewhere in the method signature, like 
int foo(Nullable<int> bar);
 or 
List<string> foo(string bar);
 So I ask around and I get a number of explanations/guesses, a common one being “you can’t use generics across a COM+ boundary”. But, sure I can, because these methods are working. So what’s the deal here ?
I know that if I have a simple signature like : 
int foo(string bar);
 then COM+ will have no problem with it, and I am wondering if this is similar to the issue with the serializer. There are some CLR types (mainly the value types) that COM+ understands, so if a method signature contains only these types the COM+ serializer will be used when the method is called. However, if the method signature contains a type that COM+ does not understand and cannot serialize, it will defer to the .NET remoting serializer when the method is called. Similarly, when I query the interface, COM+ simply doesn’t have the language to communicate the signatures of the methods that contain generics, because it’s stuck with IDL, so it doesn’t bother trying. But then if I look at method signatures that have DataSets, or strongly typed derivatives thereof, in them, they appear just fine, so IDL is evidently able to communicate those.
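As a rough way of predicting which methods are likely to go “missing”, the sketch below uses reflection to list the methods of an interface whose return type or parameters are generic. ISampleService is a hypothetical interface made up for this illustration, not the real API, and the check only captures the pattern described above (a generic somewhere in the signature); it says nothing about what COM+ actually does internally.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical interface standing in for the real serviced component API.
public interface ISampleService
{
    int Foo(string bar);               // no generics in the signature
    int FooNullable(Nullable<int> bar); // generic parameter type
    List<string> FooList(string bar);   // generic return type
}

public static class GenericSignatureFinder
{
    // True when the return type or any parameter type is a generic type.
    public static bool UsesGenerics(MethodInfo method)
    {
        return method.ReturnType.IsGenericType
            || method.GetParameters().Any(p => p.ParameterType.IsGenericType);
    }

    // Names of the methods on the given type that have a generic in their signature.
    public static List<string> FindGenericMethods(Type type)
    {
        return type.GetMethods()
            .Where(m => UsesGenerics(m))
            .Select(m => m.Name)
            .OrderBy(name => name, StringComparer.Ordinal)
            .ToList();
    }

    public static void Main()
    {
        foreach (var name in FindGenericMethods(typeof(ISampleService)))
        {
            Console.WriteLine(name);
        }
    }
}
```

For the sample interface this lists FooList and FooNullable but not Foo, which matches the split between the visible and the “missing” methods in the Component Services explorer.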
 
So maybe I shouldn’t be worrying about this because it works and if it ain’t broke….right ? But now I’m looking at profiling this application and I’m looking at various tools that will give me some metrics on calls to the methods in the COM+ API and I have a sneaking suspicion that the metrics for all my missing methods are going to get lumped together under calls to IRemoteDispatch or some such horror.
 
Anyway, I banged my head on this for a while and in the end I took a flyer and emailed Juval Lowy. Yes, me. I did that. So over the weekend I exchange a few emails with Juval and you know what ? He didn’t tell me the answer. I think he kind of hinted at it but in the end what he actually did was make me feel a whole lot better about learning not to care. I mean, if you look at this method : 
DataSet GetData(Nullable<int> id);
why would I spend hours agonising over the performance issues that might be associated with the difference between Nullable<int> and just int? I’ve already made my bed in performance terms by deciding to pass a DataSet. At the end of the day I have a working application, the client is happy and, if performance is adequate, why should I care? The answer is, I don’t know. I just know that I do, and I find it hard not to.