Reactive Rx for MSMF

I started wondering how they might add Reactive Extentions to MF without Generics or Linq as a metal question. You know how that goes. 10 hours later, I have something that works. Need to clean and test, but will post soon. I was wonder if I should use CodePlex for something like this and be able to leverage Codeplex source control or just put on project page, or both? Naturally, without generics or extention method support, it makes calling different and have to old school casting. But still a lot of value I think for reactive sensors and such.

Anyone care to fixup and rebuild the MF Extention method project on Codeplex and get it running on 4.1? Would not compile for me under 4.1. Extention methods would be most welcome.

What you are doing is great but it is more advanced than most users on here. It may help if you explain the benefit and usage of what you are doing

Hi Gus. The internals are fairly complex, but the user interface is hopefully ez. The goal of Rx is make complicated things easier. Rx is many things in one. Rx is a publisher/subscriber model. It is the reverse of IEnumerable. Collections and streams push data at you, instead of enumerating it. You can Compose “events” together, filter interesting data, etc. Simplifies concurrency patterns. Some simple examples below:

private static void TestRxSample()
{
    // Sample 1:
    //   - Gets ticks every 100ms, take first 10 ticks, find the even ones and dump.
    Debug.Print("Sample 1:");
    var timer = Observable.Interval(TimeSpan.FromTicks(TimeSpan.TicksPerMillisecond * 100), 10);
    timer = Observable.Take(timer, 10);
    timer = Observable.Where(timer, i => (int)i % 2 == 0);
    using (Observable.Dump(timer, "Interval: "))
    {
        Thread.Sleep(2000); // Give timer a chance to run the ticks.
    }

    // Sample 2:
    //   - In a range of numbers, take the last 3 and sum them.
    Debug.Print("\nSample 2:");
    var nums = Observable.Range(0, 10);
    nums = Observable.TakeLast(nums, 3);
    Observable.Dump(nums, "Numbers: ");
    Observable.Dump(Observable.Sum(nums), "Sum: ");

    /* Output:
    Sample 1:
    Interval: 0
    Interval: 2
    Interval: 4
    Interval: 6
    Interval: 8
    Interval: OnCompleted()
    Disposed timer...

    Sample 2:
    Numbers: 7
    Numbers: 8
    Numbers: 9
    Numbers: OnCompleted()
    Sum: 24
    Sum: OnCompleted()
        */
}

If we had extention methods, this could be written more straight forward, such as

Observable.Range(0,100).Where(i=>(int)i % 2 == 0).Take(3).Dump();

See: [url]Microsoft Learn: Build skills that open doors in your career for Rx in the big framework.

I never used Rx but for me it’s event filtering using Linq. Your example doesn’t really show that, you never use Subscribe()

And you can use extension methods, you just need to add this to your project :

namespace System.Runtime.CompilerServices
{
    [AttributeUsage(AttributeTargets.Assembly | AttributeTargets.Class | AttributeTargets.Method)]
    public sealed class ExtensionAttribute : Attribute { }
}

“I never used Rx but for me it’s event filtering using Linq. Your example doesn’t really show that, you never use Subscribe()”

Hi. I use Subscribe(). Dump is just a helper that calls Subscribe internally and does Debug.Print() instead of writting that code over and over during development. Thanks a ton for the extention method thing! That will make this much smoother.

Here is the updated sample running on MF with extention methods enabled.

private static void TestRxSample()
{
    // Sample 1:
    //   - Gets ticks every 100ms, take first 10 ticks, find the even ones and dump.
    Debug.Print("Sample 1:");
    Observable.Interval(100, 10).Take(10).Where(i=>(int)i%2==0).Dump();
    Thread.Sleep(2000);

    // Sample 2:
    //   - In a range of numbers, take the last 3 and sum them.
    Debug.Print("\nSample 2:");
    Observable.Range(0, 10).TakeLast(3).Sum().Dump("Sum is:");
    Thread.Sleep(5000);

    /* Output:
        Sample 1:
        0
        2
        4
        6
        8
        OnCompleted()

        Sample 2:
        Sum is:24
        Sum is:OnCompleted()
    */
}