Writing Custom Worker Extensions

Worker extensions let you package functionality that several workers need into a single predefined module, which all of those workers can then reuse. One worker extension already ships with the XcoAppSpace: the XcoPublisher, which allows publishing of and subscribing to messages. This page shows how to write your own worker extension using a simple example (the example is also included in the downloadable usage demos).
The following example shows the SimpleLoadBalancer<TLoad> extension, which allows a worker to act as a load balancer for a certain type of message. The load balancer provides the following functionality:
  • Other workers can be registered as load carriers.
  • Messages can be posted, which are then distributed among all registered load carriers in round-robin order.

Creating the Worker Extension

A class that should serve as a worker extension must be marked with the XcoWorkerExtension attribute, which makes it usable as a worker extension within any worker. You can then define message processor methods within the worker extension class in exactly the same way as within a worker (using the XcoConcurrent or XcoExclusive attribute).
    [XcoWorkerExtension]
    public class SimpleLoadBalancer<TLoad>
    {
        private readonly List<RegisterLoadCarrier<TLoad>> carriers = new List<RegisterLoadCarrier<TLoad>>();
        private int currentCarrierIndex = -1;

        // Payload handler method of worker extension, will automatically be wired when RunWorker is called
        [XcoConcurrent]
        void ProcessRegisterLoadCarrier(RegisterLoadCarrier<TLoad> registration)
        {
            lock(this.carriers)
            {
                Console.WriteLine("  Load Balancer: Registering load carrier");
                this.carriers.Add(registration);
            }
        }

        // Method to be used by worker hosting the worker extension.
        public void Post(TLoad task)
        {
            lock (this.carriers)
            {
                if (this.carriers.Count > 0)
                {
                    this.currentCarrierIndex = (this.currentCarrierIndex + 1) % this.carriers.Count;
                    Console.WriteLine("  Load Balancer: Next load carrier = {0}", this.currentCarrierIndex);
                    this.carriers[this.currentCarrierIndex].LoadCarrier.Post(task);
                }
            }
        }
    }

    //Message class for registering a load carrier.
    [Serializable]
    public class RegisterLoadCarrier<TLoad>
    {
        public Port<TLoad> LoadCarrier;
    }
The code example shows that the SimpleLoadBalancer consists of two methods. The first, ProcessRegisterLoadCarrier, automatically handles all messages of type RegisterLoadCarrier<TLoad> that are posted to the worker using the extension. The second, Post, lets the worker using the extension post a load, which is then forwarded to the next registered load carrier in round-robin order. If no load carrier has been registered yet, Post does nothing and the load is dropped.
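The round-robin selection inside Post can be tried out in isolation. The following is a minimal sketch, not part of the XcoAppSpace: the names RoundRobinSketch and RoundRobinDemo are hypothetical, and plain Action<TLoad> delegates stand in for the Port<TLoad> load carriers so that the code runs without the CCR or XcoAppSpace assemblies. Unlike the original Post, this version returns the chosen index so the selection order can be inspected.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for SimpleLoadBalancer<TLoad>. Assumption: carriers are plain
// delegates instead of Port<TLoad>, so no CCR/XcoAppSpace reference is needed.
public class RoundRobinSketch<TLoad>
{
    private readonly List<Action<TLoad>> carriers = new List<Action<TLoad>>();
    private int currentCarrierIndex = -1;

    public void Register(Action<TLoad> carrier)
    {
        lock (this.carriers)
        {
            this.carriers.Add(carrier);
        }
    }

    // Returns the index of the carrier that received the task,
    // or -1 if no carrier is registered and the task was dropped.
    public int Post(TLoad task)
    {
        Action<TLoad> target;
        int index;
        lock (this.carriers)
        {
            if (this.carriers.Count == 0)
            {
                return -1;
            }
            // Same index arithmetic as SimpleLoadBalancer.Post.
            this.currentCarrierIndex = (this.currentCarrierIndex + 1) % this.carriers.Count;
            index = this.currentCarrierIndex;
            target = this.carriers[index];
        }
        target(task);
        return index;
    }
}

public static class RoundRobinDemo
{
    // Posts one task before any carrier exists, then four tasks to three carriers.
    public static List<int> Run()
    {
        var balancer = new RoundRobinSketch<string>();
        var indices = new List<int>();
        indices.Add(balancer.Post("too early"));

        for (int i = 0; i < 3; i++)
        {
            int id = i; // copy for the closure
            balancer.Register(task => Console.WriteLine("  Carrier {0} got {1}", id, task));
        }
        foreach (var task in new[] { "a", "b", "c", "d" })
        {
            indices.Add(balancer.Post(task));
        }
        return indices;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "-1, 0, 1, 2, 0"
    }
}
```

The first task is dropped because no carrier is registered yet. After that the index wraps around once all three carriers have been used, which is the behavior a worker hosting the real extension can expect.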

Using the Worker Extension

To use the worker extension, create a field of its type within the worker. The field must already hold an instance of the worker extension class when the worker is wired; otherwise the message processor methods of the worker extension cannot be wired correctly. Assigning the field in its declaration guarantees this, as in the following example:
    // Add the worker extension's messages (here: RegisterLoadCarrier<>)
    // to the worker message contract. They'll be handled by the worker extension
    // object of the worker.
    public class PWorkflowHead : PortSet<string, RegisterLoadCarrier<string>>
    {}

    public class WorkflowHeadWorker : PWorkflowHead
    {
        // Use worker extension as a field. Its message handler methods
        // get automatically wired up to the worker's PortSet.
        private readonly SimpleLoadBalancer<string> simpleLoadBalancer = new SimpleLoadBalancer<string>();

        // Focus worker's message handlers on the worker's own messages.
        [XcoConcurrent]
        private void ProcessString(string text)
        {
            text = new string(text.ToCharArray().Reverse().ToArray());
            Console.WriteLine("Workflow Head: {0}", text);

            this.simpleLoadBalancer.Post(text);
        }
    }
As you can see, the worker simply calls this.simpleLoadBalancer.Post() to distribute messages to the registered load carriers. It does not need to handle RegisterLoadCarrier<> messages itself, because the SimpleLoadBalancer takes care of this completely.

Required and Optional Message Types

By default, the worker MUST define a port for every message type handled by a message processor of the worker extension. For example, if a worker extension processes messages of types T1 and T2, the worker contract must contain both T1 and T2 in its PortSet, so the definition must look something like class WorkerContract : PortSet<...,T1,T2>.
However, the XcoWorkerExtension attribute also lets you declare message types as optional. If, for example, type T1 is declared optional, a worker can use the worker extension without supporting this type. The following example shows how this works:
    [XcoWorkerExtension(Optional = new Type[] { typeof(T1) })]
    class MyWorkerExtension
    {
        [XcoConcurrent]
        void ProcessT1(T1 msg) {...}

        [XcoConcurrent]
        void ProcessT2(T2 msg) {...}
    }
Instead of listing the optional types, you can list the required types (using the Required property of the XcoWorkerExtension attribute instead of Optional); all types not listed as required are then optional.
You can also override these Required/Optional settings for an existing worker extension within your worker by applying the XcoWorkerExtension attribute to the field that holds the extension. An example of doing this with the XcoPublisher can be found here.
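The two variations described above might look roughly as follows. This is a sketch based only on the description on this page and has not been checked against the library: the class names RequiredStyleExtension, StrictExtension, PMyWorker and MyWorker are hypothetical, T1 and T2 are the placeholder message types from the example above, and the using directives for the XcoAppSpace and CCR namespaces are left out.

```csharp
// Variant 1: list the required types on the extension class.
// T2 is required; T1 is not listed and is therefore optional.
[XcoWorkerExtension(Required = new Type[] { typeof(T2) })]
class RequiredStyleExtension
{
    [XcoConcurrent]
    void ProcessT1(T1 msg) { /* handle T1 */ }

    [XcoConcurrent]
    void ProcessT2(T2 msg) { /* handle T2 */ }
}

// Variant 2: an extension without Optional/Required settings,
// so by default both T1 and T2 are required.
[XcoWorkerExtension]
class StrictExtension
{
    [XcoConcurrent]
    void ProcessT1(T1 msg) { /* handle T1 */ }

    [XcoConcurrent]
    void ProcessT2(T2 msg) { /* handle T2 */ }
}

// The worker contract deliberately omits T1.
class PMyWorker : PortSet<string, T2>
{}

class MyWorker : PMyWorker
{
    // Assumption: the attribute on the field overrides the setting of the
    // extension class, making T1 optional for this worker only.
    [XcoWorkerExtension(Optional = new Type[] { typeof(T1) })]
    private readonly StrictExtension extension = new StrictExtension();

    [XcoConcurrent]
    private void ProcessString(string text) { /* worker's own message */ }
}
```

Declaring the setting on the field keeps the extension class unchanged, which is useful when the extension comes from a library, as is the case with the XcoPublisher.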

Last edited Mar 1, 2010 at 12:04 PM by thomass, version 6
