What is an asynchronous component?

A explained here a component consists of two pieces of code: a contract and an implementation. For synchronous components that normally means defining an interface in the contract component with several methods that can be called and (synchronously) return a result, and on the other Hand having an implementation component that simply implements this interface. But what´s an asynchronous component?

Well, an asynchronous component (or async component for short) also consists of a contract and an implementation. The contract, though, is not an interface! Interfaces are means to describe sync communication via the stack. We at Xcoordination don´t see them fit to describe async components. Instead we use Microsoft´s Concurrency Coordination Runtime (CCR) ports as the basic building blocks for async contracts. Please see the CCR documentation for details on how to use its API. Here we want to focus on how the AppSpace builds upon the CCR.

Async component contract

Using the CCR we define an async component to be a class deriving from a CCR Port<T> or PortSet<T0, T1, ...>. The contract of an async component thus is a list of 1 or more ports. Whether you derive directly from Port/PortSet

class MyAsyncService : PortSet<int, string>
{
    ...
}
or indirectly
class PAsyncService : PortSet<int, string> {}
...
class MyAsyncService : PAsyncService
{
    ...
}
is a matter of taste and convenience. We recommend to do it indirectly except for most trivial cases. It will be easier to use async contracts with the AppSpace and a dedicated contract class makes it easier to change the contract; this way you keep your code DRY.

An async component defined this way is supposed to be only used through its contract, i.e. by sending messages to its ports. Be careful when you add methods to such contracts; they, too, should only use the contract ports to delegate work to the implementation. View async contracts as stateless facades before the implementations.

Async component implementation

The implementation of an async component is about processing messages received on its contract´s ports. For each port implement a corresponding message processor method with return type void and just one parameter. The parameter´s type needs to be the same as the port´s message type.

using Microsoft.Ccr.Core;
using XcoAppSpace.Core;
...
class PAsyncService : PortSet<int, string, RequestData> {}
...
class MyAsyncService : PAsyncService
{
    [XcoConcurrent]    
    void ProcessInt(int msg) { ... }

    [XcoConcurrent]    
    void ProcessString(string msg) { ... }

    [XcoConcurrent]    
    void ProcessRequestData(RequestData req) { ... }
}
Choose any name for the processing methods you like. We, however, often use the above pattern of naming them ProcessMSGTYPE(MSGTYPE ...). Also you don´t need to make them public, internal or private is fine. Remember: the sole interface with any client code is the port-based contract.

No kind of overloading is supported for message processing. You may only define one processing method for each contract port; and each contract port needs to be matched by one processing method. Ports and methods have a 1:1 relationship.

Finally it´s important to annotate the processing methods with an attribute! Only this way the AppSpace can wire them automatically to the contract ports as "event handlers". Because event-driven architectures you need to design when you work with async components. Choose among these coordination attributes:
  • [XcoConcurrent]
  • [XcoExclusive]
  • [XcoTeardown]

They advise the AppSpace (or the CCR to be more precise) to coordinate message processing between the methods. Methods marked with [XcoConcurrent] may run in parallel, but those marked with [XcoExclusive] will be run sequentially. The same is true for [XcoTeardown] methods; but after they have run, no other methods may be executed. See the CCR documentation on Interleave Arbiter for more details on concurrent/exclusive/teardown message processing. Internally the AppSpace maps the attributes to an Interleave Arbiter and its receiver groups.

The purpose of these attributes should be obvious: they help protect shared state without the need for explicit locks. Think of an async repository component keeping shared data to be accessed by multiple concurrent components:

class PRespository : PortSet<PRespository.Store, PRespository.Delete, PRespository.FindById, PRespository.GetCount> 
{
    public class Store {...}
    public class Delete {...}
    public class FindById {...}
    public class GetCount {...}
}
...
class TransientRepository : PRepository
{
    [XcoExclusive]
    void ProcessStore(Store cmd) {...}

    [XcoExclusive]
    void ProcessDelete(Delete cmd) {...}

    [XcoConcurrent]
    void ProcessFindById(FindById query) {...}

    [XcoConcurrent]
    void ProcessGetCound(GetCount query) {...}
}
All commands are marked as to run exclusively, all queries may run concurrently. If you follow the command-query separation pattern then it´s easy to decide which attribute to use.

Using a component

So far you´ve only seen how to define an async component. But how do you use it? It´s almost like synchronous components:

using Microsoft.Ccr.Core;
using XcoAppSpaces.Core;
...
using(XcoAppSpace space = new XcoAppSpace())
{
    space.RunWorker<PRepository, TransientRepository>();
    ...
    PRepository r = space.Resolve<PRepository>();
    ...
}
You just register an async component with the AppSpace using RunWorker<TContract, TImplementation>() or RunWorker<TContract>(TContract worker). That´s all. From then on you can get the worker from the AppSpace by calling Resolve<TContract>(). If you like, you can additionally give each worker a name, which also allows you to run several workers of the same contract type within one appspace instance.

Note that if you are using RunWorker<TContract, TImplementation>(), the implementation type needs a parameterless ctor. Ctor-injection therefore is not possible with workers. However, you can also instantiate the worker yourself and just use the RunWorker<TContract>(TContract worker) method to hand in your worker instance. The AppSpace then does the wiring as well, it just doesn't instantiate the worker itself.

Asking a worker instance for some service is just a matter of sending it a message through one of its ports. How you pass in data, pass it on to other workers or ports, return results, handle exceptions... that´s all a matter of how you use the CCR API.

PRepository r = space.Resolve<PRepository>();

r.Post(new PRepository.Store(...));
This means, you can use worker ports in CCR join or choice receivers or you can surround message posts with causalities etc. It also means, you can use iterators. Instead of returning void from a worker method you can return IEnumerator<ITask> if you want to implement a "workflow" within a message processor:

class MyBusinessWorker : PBusinesWorker
{
    IEnumerator<ITask> ProcessWorkflow(WorkflowTask task)
    {
       Port<ActivityResult> pResult;
        ...
       yield return pResult.Receive();
       ActivityResult result = pResult;
       ...
    }
}

Last edited Feb 22, 2010 at 4:04 PM by thomass, version 8

Comments

No comments yet.