Advanced Features / Routing

Jul 27, 2009 at 10:40 PM

Hi, I really like the Application Space. It has a very clear concept. I had a look at some open source message bus projects (NServiceBus, MassTransit, Simple Service Bus) and they are sometimes very confusing. But it would be nice to have the same control in the Application Space as in the projects I mentioned. For example, I would like to have routers such as aggregators, process managers and the like, and I don't really know how to implement them.

I want to give a short example using the aggregator. To implement such a router, all messages need a correlation id that defines which messages belong together. This id is not part of the header content and should be separated from the rest of the data. My idea was to make a message class which contains the original message:

sealed class ManagedMessage<T>
{
   public T Body { get; set; }
   public Guid CorrelationId { get; set; } // the type was missing here; Guid is an assumption
}

But the component that handles the message should not care about the correlation, so I need a way to separate the body from the rest of the message. My idea was to derive from Port<T>:

class ManagedPort<T> : Port<ManagedMessage<T>>
{
   void ProcessMessage(ManagedMessage<T> m)
   {
       // Call a message handler for example
   }
}
I don't know if it works, but in any case it looks very ugly and I cannot use the attributes (XcoExclusive...).

So my question is: what is the best way to extend the system for these scenarios? What is the best way to implement a message flow system?
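To make the aggregator idea above a bit more concrete, here is a minimal sketch of how messages could be grouped by correlation id. It does not use the AppSpace or the CCR at all. The Aggregator<T> class, the fixed expected count per group, and the Guid type of the correlation id are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Envelope as in the post above; Guid as the id type is an assumption.
sealed class ManagedMessage<T>
{
    public T Body { get; set; }
    public Guid CorrelationId { get; set; }
}

// Hypothetical aggregator, not part of the AppSpace: it collects message
// bodies per correlation id and reports a group once it is complete.
sealed class Aggregator<T>
{
    private readonly int _expectedCount;
    private readonly Action<Guid, IList<T>> _onComplete;
    private readonly Dictionary<Guid, List<T>> _pending = new Dictionary<Guid, List<T>>();
    private readonly object _sync = new object();

    public Aggregator(int expectedCount, Action<Guid, IList<T>> onComplete)
    {
        if (expectedCount <= 0) throw new ArgumentOutOfRangeException("expectedCount");
        if (onComplete == null) throw new ArgumentNullException("onComplete");
        _expectedCount = expectedCount;
        _onComplete = onComplete;
    }

    public void Add(ManagedMessage<T> message)
    {
        if (message == null) throw new ArgumentNullException("message");

        List<T> completed = null;
        lock (_sync)
        {
            List<T> group;
            if (!_pending.TryGetValue(message.CorrelationId, out group))
            {
                group = new List<T>();
                _pending.Add(message.CorrelationId, group);
            }

            group.Add(message.Body);

            if (group.Count == _expectedCount)
            {
                // The group is complete: forget it and hand it over below.
                _pending.Remove(message.CorrelationId);
                completed = group;
            }
        }

        // Call the handler outside the lock so it cannot block other senders.
        if (completed != null) _onComplete(message.CorrelationId, completed);
    }
}
```

The handler passed to the constructor only ever sees the bodies, so the correlation handling stays in the infrastructure. A real aggregator would probably also need a timeout for groups that never complete.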

Coordinator
Jul 28, 2009 at 7:15 AM

I'm very happy you like the AppSpace - and even better than the usual service buses ;-) (However, just to be clear, the AppSpace is not supposed to replace the service bus concept. Rather we see the AppSpace as an alternative just where it fits or - very importantly! - as a framework to build other communication infrastructure like service buses. Why not rebuild MassTransit with the AppSpace? :-)

But for your question: How to build "space components" like aggregators, routers or other Enterprise Integration Patterns and plug them together into message flows?

I guess you set out in the right direction with ManagedMessage<T> - although it (or at least its derivations) should be serializable.

Generic components need some generic message type to work on. Otherwise they cannot have any expectations about the data they process.

What I don't understand, though, is why your sample processor does not work. I set it up like you described, put [XcoExclusive] on the ProcessMessage() method and it worked like a charm.

And what is it that makes it look ugly to you? This is how you set up async workers. Or maybe I misunderstand you.

As for flows: The AppSpace has no concept of data flows so far. But have a look here: http://ccrflows.codeplex.com/ This is how I think flows could be described in an OO language like C#/VB using the CCR. And since the CCR Flows are based on the CCR you can use them with the AppSpace. You just need to be careful: the sequence of messages currently cannot be guaranteed across workers. So if you set up a CCR flow then it should run within a single worker. Only cross worker boundaries if it's ok for messages to change their order.

Let me know if this has brought you closer to an answer to your questions.

-Ralf

Jul 28, 2009 at 7:58 AM

Thanks for your answer.

For most of the routers in an ESB I need special information in my message; one example is the aggregator with the correlation id. These properties should be hidden from the user, because they are part of the infrastructure and (in most cases) not part of the business logic. I would like to extend the Application Space, and my idea was to derive from Port and name the result, for example, ManagedPort. The managed port handles the message and calls a handler which contains the business logic and can only access the Body. This would mean that

(1) The process methods are part of my infrastructure and thus the handler methods cannot carry the attributes (because the process method already exists). They also don't need them, because they are called by ProcessMessage(ManagedMessage<T> m)

(2) I must also derive from PortSet<T1, ... TN>

(3) It can be very confusing for other users whether to use Port<T> or ManagedPort<T>

Coordinator
Jul 28, 2009 at 8:28 AM

Sorry, but I don't see any difficulty here. Why can't you do something like this:

abstract class ManagedPort<T> : Port<ManagedMessage<T>>
{
    [XcoConcurrent]
    void ProcessMsg(ManagedMessage<T> msg)
    {
        // do some stuff with the message
        ProcessBody(msg.Body);
        // do some more stuff with the msg
    }

    // Implemented by derived classes; sees only the body, not the envelope
    protected abstract void ProcessBody(T body);
}

class ManagedIntPort : ManagedPort<int>
{
    protected override void ProcessBody(int body)
    {
        // do your stuff with an int
    }
}

Instead of using an abstract base class and deriving a class for processing concrete body types, you could pass a delegate to ManagedPort for processing the body.
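The delegate variant mentioned above could look roughly like the following sketch. To keep it self-contained it does not derive from the CCR Port<T>; EnvelopeProcessor<T> is a hypothetical stand-in for the envelope-handling part of ManagedPort<T>, and the Guid correlation id is an assumption.

```csharp
using System;

// Envelope as discussed in this thread; Guid as the id type is an assumption.
sealed class ManagedMessage<T>
{
    public T Body { get; set; }
    public Guid CorrelationId { get; set; }
}

// Hypothetical stand-in for ManagedPort<T>: the body handler is passed in
// as a delegate instead of being supplied by a derived class.
sealed class EnvelopeProcessor<T>
{
    private readonly Action<T> _processBody;

    public EnvelopeProcessor(Action<T> processBody)
    {
        if (processBody == null) throw new ArgumentNullException("processBody");
        _processBody = processBody;
    }

    // In a real ManagedPort<T> this would be the [Xco...]-attributed method.
    public void ProcessMsg(ManagedMessage<T> msg)
    {
        if (msg == null) throw new ArgumentNullException("msg");

        // infrastructure work before the handler (correlation, logging, ...)
        _processBody(msg.Body);
        // infrastructure work after the handler
    }
}
```

With this shape no subclass per body type is needed: `new EnvelopeProcessor<int>(body => Console.WriteLine(body))` is enough, and the handler still never sees the envelope.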

-Ralf

Jul 28, 2009 at 11:20 AM

I think you are right. I can also use the Xco attributes together with a reader-writer lock to get a syntax like that of the default ports.

abstract class ManagedPortSet<T0, T1> : PortSet<ManagedMessage<T0>, ManagedMessage<T1>>
{
    private readonly ReaderWriterLock _lock = new ReaderWriterLock();

    [XcoConcurrent]
    void ProcessMsg(ManagedMessage<T0> msg)
    {
        // do some stuff with the message

        // GetXcoAttributeId and XcoAttributeId are not shown in this post
        if (GetXcoAttributeId<T0>() == XcoAttributeId.Exclusive)
        {
            _lock.AcquireWriterLock(Timeout.Infinite);
            try
            {
                ProcessBody(msg.Body);
            }
            finally
            {
                // release the lock even if the handler throws
                _lock.ReleaseWriterLock();
            }
        }
        else
        {
            ProcessBody(msg.Body);
        }

        // do some more stuff with the msg
    }

    protected abstract void ProcessBody(T0 body);
}

Another drawback is that the subscription messages will no longer work with a PortSet like this:

class ManagedPortSet<T1, T2, T3, ... T19> : PortSet<ManagedMessage<T1>, ... , ManagedMessage<T19>>
{

}

 

I need to make my own publish/subscribe mechanism, which doesn't seem very hard. It's just too bad that it isn't possible to derive from multiple classes in .NET.


Btw: The worker extensions are very cool. Perhaps I will make some workers for statistics and management functions (number of handled messages, number of messages that could not be handled, ping requests...).

Aug 4, 2009 at 7:03 PM

Hi, just another question.

I extended the Port and PortSet as described before. But now I have to write a process method for each message and cannot use worker extensions anymore (because there is then a method for the same message in both the derived PortSet and the worker extension).

Do you have some ideas?

Coordinator
Aug 5, 2009 at 7:52 AM

Sorry, I don't understand your problem. Can you post a code snippet? What would you like to do? How do you expect the code to look?

Aug 5, 2009 at 8:15 AM

I did the same with PortSet as with Port before:

public class MyPortSet<T0, T1> : PortSet<MessageEnvelope<T0>, MessageEnvelope<T1>>
{

    private void Process(MessageEnvelope<T0> message)
    {
        // Do some stuff before
        // Call handler
        // Do some stuff after handling the message
    }

    private void Process(MessageEnvelope<T1> message)
    {
        // Do some stuff before
        // Call handler
        // Do some stuff after handling the message
    }

}

Works great so far.

Now I made my own publish/subscribe mechanism using worker extensions, which encapsulates all messages in an envelope.

But when I add them:

public class MyService : PortSet<Message, MySubscribe<Message>>
{
   MyPublisher<Message> publisher = new MyPublisher<Message>();

  public MyService()
  {
     RegisterHandler(MessageHandler);

  }

  private void MessageHandler(Message msg)
  {
  }

}

...there exist two message handler methods:

- One Method at my custom PortSet

- One Method at the publisher extension.

This does not work. (It works when I remove the second handler method in MyPortSet.)

Coordinator
Aug 5, 2009 at 9:47 AM

Thx for the code. Unfortunately I don't see a connection between MyService and MyPortSet. In addition you did not provide a definition of MyPublisher.

Aug 5, 2009 at 9:59 AM

Oh, sorry, MyService derives from MyPortSet:

public class MyService : MyPortSet<Message, MySubscribe<Message>>
{

   ...

}

 

MyPublisher is just a simple worker extension:

[XcoWorkerExtension]
class MyPublisher<T>
{

   [XcoConcurrent]
   private void HandleSubscribe(MySubscribe<T> s) { ... }

}

Coordinator
Aug 5, 2009 at 11:05 AM

I don't think this will work. See what you're doing:

MyService : MyPortSet<Message, MySubscribe<Message>>

this is the same as:

MyService : PortSet<MessageEnvelope<Message>, MessageEnvelope<MySubscribe<Message>>>

Is that what you want? I doubt it, since your publisher is registered as

MyPublisher<Message> publisher = new MyPublisher<Message>();

which means, it accepts only MySubscribe<Message> messages and not MessageEnvelope<MySubscribe<Message>>.

So what do you want to accomplish?

 

Aug 5, 2009 at 12:20 PM

Damn, third try. Now I have access to my code and can just copy & paste it :(

The Publisher is implemented this way:

    [XcoWorkerExtension]
    public class MyPublisher<T> : MyPublisher
    {
        [XcoConcurrent]
        private void ProcessSubscribe(MessageEnvelope<MySubscribe<T>> subscribe)
        {
            base.ProcessSubscribe(subscribe.Body);
        }

        public void Publish(T message)
        {
            base.Publish(message);
        }
    }


That seems correct to me, and as I mentioned before, the publisher WORKS when I remove from MyPortSet the Process method that is responsible for the second message type (MessageEnvelope<MySubscribe<Message>>).

 

In summary, each of these two options works on its own:

1. Only implement a Process method for the first message type in the custom PortSet and use MyPublisher as a worker extension.

2. Both Process methods in MyPortSet, but no MyPublisher.

 

If I want to have both Process methods in the custom PortSet (which I really need, in order to call the handlers when there is no worker extension that handles the message) AND a publisher, it throws the following exception when calling RunWorker:

No port found for required message type XcoTest.MessageEnvelope`1[XcoTest.MySubscribe`1[XcoTest.PContract+Ping]]

 

I hope it is understandable now.

Coordinator
Aug 5, 2009 at 12:47 PM

To sum up what you have:

public class MyPublisher<T> : MyPublisher
        [XcoConcurrent]
        private void ProcessSubscribe(MessageEnvelope<MySubscribe<T>> subscribe)

public class MyPortSet<T0, T1> : PortSet<MessageEnvelope<T0>, MessageEnvelope<T1>>
    private void Process(MessageEnvelope<T0> message)
    private void Process(MessageEnvelope<T1> message)

Please observe that there are no [Xco...] attributes on the Process() methods! I don't think they would work like this anyway.

public class MyService : MyPortSet<Message, MySubscribe<Message>>
       MyPublisher<Message> publisher = new MyPublisher<Message>();
  public MyService()
  private void MessageHandler(Message msg)

I don't understand what RegisterHandler() in MyService.ctor does. But again there are [Xco...] attributes missing, so no method will handle messages.

This last definition effectively leads to the following:

  • MyService is a PortSet<MessageEnvelope<Message>, MessageEnvelope<MySubscribe<Message>>>
  • MyService has 3 (!) processing methods:
    1. ProcessSubscribe(MessageEnvelope<MySubscribe<Message>> subscribe) from MyPublisher
    2. Process(MessageEnvelope<Message> message) from MyService
    3. Process(MessageEnvelope<MySubscribe<Message>> message) from MyService

You see, there are two message handlers with the same parameter signature: 1 and 3. Which one should the AppSpace bind the message MessageEnvelope<MySubscribe<Message>> to (if attributes were present)?

 

Aug 5, 2009 at 1:18 PM

Damn, not my day. There are [Xco...] attributes on the Process() methods; I just forgot to copy them. I think the 3(!) processing methods are the problem: two of them exist for the same message type, so the Application Space cannot decide which one to invoke when a message arrives, and it throws the exception.


I made a little example which shows the same problem: http://www.gpstudio.de/XcoTest.rar. I need all Processing methods, so the question is: Is this a bug or does it work as expected?

 

RegisterHandler just registers a delegate, so that the user does not have to care about the MessageEnvelope<T>, only about the body. Here is an example from MyPort (it is the same in MyPortSet):

public class MyPort<T> : Port<MessageEnvelope<T>>
    {
        private Action<T> _handler;

        public void Reply(T message)
        {
            Contract.Requires<ArgumentNullException>(message != null, "Message cannot be null.");

            this.PostEnveloped(message, MyPortHelper.GetLastMessageCorrelationId());
        }

        public void Post(T message)
        {
            Contract.Requires<ArgumentNullException>(message != null, "Message cannot be null.");

            this.PostEnveloped(message);
        }

        public void Register(Action<T> handler)
        {
            _handler = handler;
        }

        [XcoConcurrent]
        internal void Process(MessageEnvelope<T> message)
        {
            if (message != null && _handler != null && message.IsReceivedInTime())
            {
                MessageStorage.Current.StoreLastReceivedMessage(message);

                    _handler(message.Body);

                MessageStorage.Current.StoreLastReceivedMessage(null);
            }
        }
    }

Coordinator
Aug 5, 2009 at 1:45 PM

Well, I'd not call it a feature, but neither a bug. If there is more than one method which accepts message type X then the AppSpace cannot decide which one to use.

This means, you must not define more than one method attributed with [Xco...] with a particular message type.

To us this sounds pretty reasonable and thus we won't change it in the near future.
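The rule "at most one attributed method per message type" can be checked up front with a bit of reflection. The following is only a sketch of such a check and not how the AppSpace binds handlers internally: [Handler] is a hypothetical marker standing in for the [Xco...] attributes, and Envelope<T>, Subscribe, Worker and PublisherExtension are made-up types that mirror the situation in this thread.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical marker attribute standing in for the [Xco...] attributes.
[AttributeUsage(AttributeTargets.Method)]
sealed class HandlerAttribute : Attribute { }

static class HandlerCheck
{
    // Returns every message type that more than one attributed
    // single-parameter method accepts across the given classes.
    public static IList<Type> FindAmbiguousMessageTypes(params Type[] handlerTypes)
    {
        const BindingFlags flags =
            BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic;

        return handlerTypes
            .SelectMany(t => t.GetMethods(flags))
            .Where(m => m.IsDefined(typeof(HandlerAttribute), false)
                        && m.GetParameters().Length == 1)
            .GroupBy(m => m.GetParameters()[0].ParameterType)
            .Where(g => g.Count() > 1)
            .Select(g => g.Key)
            .ToList();
    }
}

// Made-up types that reproduce the conflict discussed above.
sealed class Envelope<T> { public T Body { get; set; } }
sealed class Subscribe { }

class Worker
{
    [Handler] void Process(Envelope<string> message) { }
    [Handler] void Process(Envelope<Subscribe> message) { }
}

class PublisherExtension
{
    [Handler] void ProcessSubscribe(Envelope<Subscribe> message) { }
}
```

Calling `HandlerCheck.FindAmbiguousMessageTypes(typeof(Worker), typeof(PublisherExtension))` reports Envelope<Subscribe>, because both the worker and the extension declare a handler for it. That is the same clash as between handlers 1 and 3 in the list earlier in the thread.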

There might be a way around this, though, as the final section on this page describes:

http://xcoappspace.codeplex.com/Wiki/View.aspx?title=PubSubFeature

Try using the Optional property of the XcoExtension attribute to hide a publisher method. Of course you then need to handle the subscription messages in your worker. I doubt this is what you want.