This project is read-only.

Publish/Subscribe

Implementing the Publish/Subscribe pattern is easy with the Xcoordination Application Space (AppSpace). You can do in 2 simple steps:

1. Add Subscribe/Unsubscribe messages to worker contract

Think of the message types clients can subscribe to on a worker. Add Subscribe/Unscubscribe messages to the worker contract for them.

As a sample let´s implement a news ticker worker which broadcasts each newly added news item to all its subscribers:

class PNewsWorker : PortSet<PNewsWorker.AddNewsItem, Subscribe<NewsItem>, Unsubscribe<NewsItem>>
{
    [Serializable]
    public class AddNewsItem {...}
}

[Serializable]
class NewsItem {...}
Where do the Subscribe/Unsubscribe messages come from? They are predefined by the AppSpace. Parameterize them with the subscription event type. If a worker publishes several events repeat Subscribe/Unsubscribe for each of them.

2. Turn a worker into a publisher

Next you need to add just one line of code to turn a regular worker into a publishing worker. Just put a XcoPublisher<> field into it:

class SimpleNewsWorker : PNewsWorker
{
    private XcoPublisher<NewsItem> publisher = new XcoPublisher<NewsItem>();
    
    [XcoConcurrent]
    void ProcessNewItems(AddNewsItem msg)
    {...}
}
That´s it. You´re done. The SimpleNewsWorker now not only accepts AddNewsItem messages, but also Subscribe<NewsItem> and Unsubscribe<NewsItem> messages. They are automatically handled by the corresponding XcoPublisher<NewsItem> instance. No need to add message handlers of your own.

Whenever the worker wants to publish a NewsItem to all its subscribes it now just has to call this.publisher.Post(...) with a NewsItem. That´s it. The publisher sees to that all of them get notified. Of course that´s done asynchronously.

By the way: If you want a publisher to broadcast notifications of different types you can define it with several generic type parameters like XcoPublisher<T0, T1, T2, T3, T4>.

Subscribing to a publisher

From the point of view of a subscriber handling publications is like handling any notifications from a service. It needs to set up a Port with a message handler as the sink for the notifications. This Port is passed to the publishing worker using a Subscribe<> message.

PNewsWorker nw = space.ConnectWorker<PNewsWorker>("...");

Port<NewsItem> newsPublications = new Port<NewsItem>();
space.Receive(newsPublications, OnNewsItemPublication); // set up notification handler

nw.Post(new Subscribe<NewsItem>(newsPublications)); // turn on notifications
...
nw.Post(new Unsubscribe<NewsItem>(newsPublications)); // turn off notifications
When the publishing worker receices the Subscribe<> message it adds the Port it´s carrying to its internal list of Ports to broadcast new NewsItems to. (Please give it a moment to process the subscription before you send other messages to the worker which might generate notifications. Otherwise you might miss a notification.)

Filtered subscriptions

Sometimes a subscribe is not interested in all publications of a service. For example it might not want to receive its own news items or just the ones pertaining to a certain subject. Of course you could filter unwanted notifications in your message handler - but why have the server transport them to the client in the first place?

That´s why the AppSpace provides you with publisher side notification filtering. Yes, that´s right: it´s like executing queries in a database server. Here is how you use this feature. Suppose the NewsItem carries an info about the country it´s pertaining to:

[Serializable]
class NewsItem 
{
    public string Country;
}
Then a client could subscribe selectively by passing a lambda expression to the publisher:

nw.Post(new Subscribe<NewsItem>(newsPublications, ni => ni.Country == "Austria"));
The publisher would only send NewsItem instances to this client where this predicate evaluates to true. It truely executes it within the worker process because the lambda expression got sent there with the Subscribe<> message. The AppSpace serializes the lambda expression and transfers it to the receiving workers where it gets deserialized into a expression tree and then compiled.

In order for this to work you just need to ensure that all types uses in the predicate lambda expression are known at the server side. Otherwise the receiving AppSpace cannot deserialize the expression. Also keep the lambda expression short; blocks like ni => { return ni.Country == "Austria" } are unfortunately not allowed. So if you need to do lengthy calculations put them into a function for which you make sure the server knows about.

Passing predicates to the server is a powerful feature. For example, you can use it to selectively receive notifications on different Ports depending on their content. Or you could even filter them according to their type and turn a worker into a simple message bus.
(Note however, that the things you can do with predicates are still limited because they need to be serializable for transfer, e.g. you currently cannot call any methods from within the predicate.)

Optional subscription message handling

Mostly you´ll want the XcoPublisher<> to handle subscription messages automatically. Just list them in the contract and call Publish().

Sometimes, though, you want to intercept message handling. You might want to disallow to unsubscribe for example. Or you might want to receive the subscription port with another message and pass it manually to the XcoPublisher<>.

Both scenarios can be implemented using the [XcoWorkerExtension] attribute on the XcoPublisher<> field.

If you simply don´t want to offer unsubscription, make it optional on the publisher an leave out the Unsubscribe<> message from the contract:

class PNewsWorker : PortSet<PNewsWorker.AddNewsItem, Subscribe<NewsItem>>
{...}
...
[XcoWorkerExtension(Optional=new Type[]{typeof(Unsubscribe<NewsItem>)})]
private XcoPublisher<NewsItem> publisher = ...;
The publisher won´t expetc the Unsubscribe<NewsItem> message anymore on the contract. If it´s there it will wire-up a message handler for it. If not, though, that´s fine, too.

However, if the message is optional and (!) you provide your own message handler, that one will be called instead of the XcoPublisher<>´s! That´s one way of manual handling subscription messages. You could do this, for example, in case you want to keep track of the number of subscriptions.

class PNewsWorker : PortSet<PNewsWorker.AddNewsItem, Subscribe<NewsItem>, Unscribe<NewsItem>>
{...}

...

class SimpleNewsWorker : PNewsWorker
{
    [XcoWorkerExtension(Optional=new Type[]{typeof(Unsubscribe<NewsItem>)})]
    private XcoPublisher<NewsItem> publisher = new XcoPublisher<NewsItem>();
    
    [XcoConcurrent]
    void ProcessUnsubscribeManually(Unsubscribe<NewsItem> unsub)
    {
        ...
        publisher.Unsubscribe(unsub.EventPort);
    }
}
In your own message handler you can either discard the message or you handle it manually like above.

Last edited Feb 22, 2010 at 5:02 PM by thomass, version 8

Comments

No comments yet.