RX Extensions & Threading

Like a lot of people, I’ve been playing recently with Rx, the Reactive Extensions for .NET. To find out a bit more about what these are, have a look here. One of the things I found interesting was the use of Rx as a fluent interface for creating sets of asynchronous tasks and then subscribing to the combined result with both a result handler and an error handler.

I’ve worked up a little example here that shows some code performing some fairly standard asynchronous tasks. When called, my code needs to call a web service, query a database and fetch a value from a cache. It then needs to combine the results. Since each of these operations takes a different and often varying amount of time, and I want to minimise the execution time of my code, I’m going to run them in parallel.

I have shown four different ways of achieving this: my own threads, the thread pool’s threads by way of BeginInvoke(), the System.Threading.Tasks namespace new in .NET 4.0, and Rx.

namespace RXThreadingExample
{
	using System;
	using System.Collections.Generic;
	using System.Diagnostics;
	using System.Linq;
	using System.Threading;
	using System.Threading.Tasks;

	public class Program
	{
		public static void Main()
		{
			const bool throwException = false;

			ClassicAsync(throwException);
			WaitHandles(throwException);
			Tasks(throwException);
			RXExtensions(throwException);

			Console.ReadKey(true);
		}

		private static void ClassicAsync(bool throwException)
		{
			var sw = Stopwatch.StartNew();
			var wsResult = 0;
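			string dbResult = null;
			string cacheResult = null;
			Exception dbError = null;

			var callWebService = new Thread(() => wsResult = CallWebService());
			// An exception that escapes a thread's delegate never reaches the try/catch below;
			// it terminates the process. Capture it on the worker and rethrow after Join().
			var queryDB = new Thread(() =>
				{
					try { dbResult = QueryDB(throwException, "Async"); }
					catch (Exception e) { dbError = e; }
				});
			var fetchCacheItem = new Thread(() => cacheResult = FetchCacheItem());

			try
			{
				callWebService.Start();
				queryDB.Start();
				fetchCacheItem.Start();

				callWebService.Join();
				queryDB.Join();
				fetchCacheItem.Join();

				if (dbError != null)
				{
					throw new Exception("QueryDB failed on its worker thread", dbError);
				}

				Console.WriteLine(dbResult, wsResult, cacheResult, sw.ElapsedMilliseconds);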
			}
			catch (Exception ex)
			{
				Console.WriteLine("Exception : {0}", ex);
			}
		}

		private static void WaitHandles(bool throwException)
		{
			var sw = Stopwatch.StartNew();

			var waitHandles = new List<WaitHandle>();
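			// Delegate.BeginInvoke runs the call on a thread-pool thread. It is supported on
			// .NET Framework only; on .NET Core and later it throws PlatformNotSupportedException.
			Func<int> callWebService = CallWebService;
			var wsAsyncResult = callWebService.BeginInvoke(null, null);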
			waitHandles.Add(wsAsyncResult.AsyncWaitHandle);

			Func<bool, string, string> queryDB = QueryDB;
			var dbAsyncResult = queryDB.BeginInvoke(throwException, "WaitHandles", null, null);
			waitHandles.Add(dbAsyncResult.AsyncWaitHandle);

			Func<string> queryLocalCache = FetchCacheItem;
			var cacheAsyncResult = queryLocalCache.BeginInvoke(null, null);
			waitHandles.Add(cacheAsyncResult.AsyncWaitHandle);

			try
			{
				WaitHandle.WaitAll(waitHandles.ToArray());

				var wsResult = callWebService.EndInvoke(wsAsyncResult);
				var dbResult = queryDB.EndInvoke(dbAsyncResult);
				var cacheResult = queryLocalCache.EndInvoke(cacheAsyncResult);

				Console.WriteLine(dbResult, wsResult, cacheResult, sw.ElapsedMilliseconds);
			}
			catch(Exception ex)
			{
				Console.WriteLine("Exception : {0}", ex);
			}
		}

		private static void Tasks(bool throwException)
		{
			var sw = Stopwatch.StartNew();
			var wsResult = 0;
			string dbResult = null;
			string cacheResult = null;

			var tasks = new List<Task>
				{
					new Task(() => wsResult = CallWebService()),
					new Task(() => dbResult = QueryDB(throwException, "Tasks")),
					new Task(() => cacheResult = FetchCacheItem())
				};

			try
			{
				tasks.ForEach(t => t.Start());
				Task.WaitAll(tasks.ToArray());
				Console.WriteLine(dbResult, wsResult, cacheResult, sw.ElapsedMilliseconds);
			}
			catch (Exception ex)
			{
				Console.WriteLine("Exception : {0}", ex);
			}

		}

		private static void RXExtensions(bool throwException)
		{
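			var sw = Stopwatch.StartNew();
			// Written against an early Rx release. In later releases the Observable class lives in
			// System.Reactive.Linq and this And/Then join is spelled Observable.When.
			// Subscribe() returns immediately; the handlers run once all three calls have completed.
			Observable.Join(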
				Observable.ToAsync<int>(CallWebService)()
					.And(Observable.ToAsync<bool, string, string>(QueryDB)(throwException, "RX"))
					.And(Observable.ToAsync<string>(FetchCacheItem)())
					.Then((wsResult, dbResult, cacheValue) =>
						new { WebServiceResult = wsResult, DatabaseResult = dbResult, CacheValue = cacheValue })
				).Subscribe(
					o => Console.WriteLine(o.DatabaseResult, o.WebServiceResult, o.CacheValue, sw.ElapsedMilliseconds),
					e => Console.WriteLine("Exception: {0}", e));
		}

		private static int CallWebService()
		{
			Thread.Sleep(500);
			return new Random().Next(1,33);
		}

		private static string QueryDB(bool throwException, string name)
		{
			Thread.Sleep(1500);
			if (throwException)
			{
				throw new Exception("You asked for it !");
			}
			return name + " can rescue {0} Chilean miners in {2} ms. {1}";
		}

		private static string FetchCacheItem()
		{
			return new[]{"Awesome!", "Cool!", "Amazing!", "Jinkies!"}[new Random().Next(0,4)];
		}
	}
}

I have to say, I like the syntactic sugar of Rx. In the tests I have run, however, using Task consistently produces the fastest results. More to follow, I think.
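As an aside on the Tasks version: it writes its results into captured locals, much as the raw-thread version does. A minimal sketch of the same fan-out using Task<T>, so that each result travels with its task, might look like the following. The class name TypedTasksSketch and the shortened stand-in methods are assumptions made for illustration and are not part of the listing above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TypedTasksSketch
{
    // Stand-ins for the three operations in the listing, with shortened delays (assumption).
    private static int CallWebService() { Thread.Sleep(50); return 7; }

    private static string QueryDB(bool throwException)
    {
        Thread.Sleep(100);
        if (throwException) throw new InvalidOperationException("You asked for it !");
        return "db";
    }

    private static string FetchCacheItem() { return "Cool!"; }

    public static string Run(bool throwException)
    {
        // StartNew with a value-returning lambda yields Task<int> / Task<string>.
        var ws = Task.Factory.StartNew(() => CallWebService());
        var db = Task.Factory.StartNew(() => QueryDB(throwException));
        var cache = Task.Factory.StartNew(() => FetchCacheItem());

        try
        {
            // WaitAll rethrows any task failures wrapped in an AggregateException.
            Task.WaitAll(ws, db, cache);
            return string.Format("{0} {1} {2}", db.Result, ws.Result, cache.Result);
        }
        catch (AggregateException ex)
        {
            return "Exception: " + ex.Flatten().InnerExceptions[0].Message;
        }
    }

    public static void Main()
    {
        Console.WriteLine(Run(false)); // prints "db 7 Cool!"
        Console.WriteLine(Run(true));  // prints "Exception: You asked for it !"
    }
}
```

Reading Result after WaitAll does not block, and the failure arrives as an AggregateException rather than as the original exception type, which is worth knowing when comparing the error paths of the four approaches.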

Rx: Reactive Extensions for .NET and JavaScript

Rx is a set of framework extensions that let developers do asynchronous programming through familiar interfaces. If you are familiar with LINQ, IEnumerable<T> and IEnumerator<T>, you will quickly grasp the new interfaces IObservable<T> and IObserver<T>. The enumerating interfaces allow you to PULL sequences of T from a source in your code. The observing interfaces allow you to have sequences of T PUSHED to handlers in your code in response to asynchronous LINQ queries. Rx is implemented on top of PFx (Parallel Framework extensions), so all the threading and concurrency is handled for you, including synchronization contexts, which is especially useful if you are programming UI. It is also particularly significant that the Rx extension methods include a FromEvent() method that allows you to attach handlers to a stream of events in a very fluid way.
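To make the pull/push distinction concrete, here is a rough sketch that uses only the IObservable<T> and IObserver<T> interfaces that ship in the .NET 4.0 base class library, with no Rx operators at all. The types FixedSource, CollectingObserver and NoOp are hypothetical helpers written for this illustration; a real Rx program would normally get its observables from the library rather than hand-rolling them.

```csharp
using System;
using System.Collections.Generic;

public static class PullVersusPush
{
    // Hypothetical minimal observable: pushes a fixed set of values to each subscriber.
    private sealed class FixedSource : IObservable<int>
    {
        private readonly int[] values;
        public FixedSource(int[] values) { this.values = values; }

        public IDisposable Subscribe(IObserver<int> observer)
        {
            foreach (var value in values) observer.OnNext(value); // the source decides when values arrive
            observer.OnCompleted();
            return new NoOp();
        }
    }

    private sealed class NoOp : IDisposable { public void Dispose() { } }

    // Hypothetical observer that simply records what it is handed.
    private sealed class CollectingObserver : IObserver<int>
    {
        public readonly List<int> Received = new List<int>();
        public void OnNext(int value) { Received.Add(value); }
        public void OnError(Exception error) { Console.WriteLine("Error: {0}", error); }
        public void OnCompleted() { }
    }

    // PULL: the consumer asks the enumerator for each value in turn.
    public static List<int> Pull(IEnumerable<int> source)
    {
        var result = new List<int>();
        using (var enumerator = source.GetEnumerator())
        {
            while (enumerator.MoveNext()) result.Add(enumerator.Current);
        }
        return result;
    }

    // PUSH: the consumer registers a handler and the source calls it.
    public static List<int> Push(params int[] values)
    {
        var observer = new CollectingObserver();
        using (new FixedSource(values).Subscribe(observer)) { }
        return observer.Received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Pull(new[] { 1, 2, 3 }))); // prints "1, 2, 3"
        Console.WriteLine(string.Join(", ", Push(1, 2, 3)));           // prints "1, 2, 3"
    }
}
```

The two methods return the same values; what differs is who drives the loop. In Pull the caller calls MoveNext(), while in Push the source calls OnNext(), which is what lets an observable deliver values whenever they happen to become available.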

If you want to know more, watch the video at the bottom of this page, in which Rx author Erik Meijer explains it in 14 minutes.

If you want to see it in action, have a look at this Channel9 video with Wes Dyer, in which he implements drag & drop from scratch using Rx in 6 minutes.

Also, check out the Rx Team Blog for downloads, updates and samples.

Finally, back to Channel9 to look at RxJS, the Reactive Extensions for JavaScript, so the UI I mentioned above now includes web UI too. As Jeffrey Van Gogh says in the RxJS video, web programming is about “asynchronous stuff”, but JavaScript is an imperative language. It’s really hard to do good asynchronous stuff in JavaScript. RxJS changes that in a big way.
