How to load balance and not overload workers

Oct 18, 2010 at 10:57 AM

Hi,

I am transferring a discussion I have been having with Ralf over e-mail regarding how to solve the following problem with XCoAppSpace.

As follows:

I have a database table with tens of thousands of entries that need processing. By processing I mean picking up a row and executing several serial steps on the row data (most of these steps are very short-running, but one of them can take up to 6 seconds). At the moment, I have a single Windows service on one machine that pulls 'n' rows from the database. It submits these to a CCR Port, which handles each row of data with a CCR iterator (each step is a separate stage in the iterator). The Windows service keeps the processing Port filled up by checking whether it has dropped below a certain threshold and, if so, grabbing more entries from the DB table and posting them to the Port. So far, so good. I achieve good throughput on the machine, and it scales with the number of cores/CPUs on the machine.

Now I want to scale my service out to multiple machines, so the proposed solution is to have a Dispatcher on one machine that farms out DB rows to the other machines (workers) for processing. This cannot be a pure round-robin solution, because we have no idea what the capacity of each worker machine is and do not want to "flood" a worker machine that cannot handle the load. So Ralf recommended the following "pattern", which I really like:

no, the dispatcher (or load balancer) does not keep state about workers. that's the beauty of the solution. and it does no round robin! it is passive except for the notifications.

- dispatcher watches the database table
- dispatcher notifies all workers about new work items (but does not send the work items themselves with the notification!). use the XcoPublisher for notifications via pub/sub.
- workers ask dispatcher for new work items when they are ready to take them up

why is it so easy compared to other solutions you might have seen? because the appspace can do bidirectional communication. notifications are dead easy. just a line of code or two.

if a dispatcher could only send work to workers, then it would need to do that in a round robin fashion or something similar. that would be push. but with a pull model, where workers pull their work, it's easier. just let them know there is work to do, and then trust them to ask for it when they can. this also scales very well. and with the appspace you can scale it to dozens of worker nodes distributed across the world ;-)
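To make the shape of this pattern concrete, here is a minimal in-process sketch in plain C#. It does not use XCoAppSpace at all: an Action callback stands in for the pub/sub notification, a direct method call stands in for the worker's request message, and the names PullDispatcher, AddWorkItems and TakeWorkItems are hypothetical. What it illustrates is that the notification carries no work and each worker decides how much to take.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-process model of the notify/pull pattern (no XCoAppSpace types).
// In the distributed version the notification would go out via pub/sub and
// TakeWorkItems would be a request message answered over a response port.
public class PullDispatcher
{
    private readonly Queue<int> pending = new Queue<int>();      // row ids waiting to be processed
    private readonly List<Action> subscribers = new List<Action>();
    private readonly object gate = new object();

    // A worker registers interest; it is only told that work exists.
    public void Subscribe(Action onWorkAvailable)
    {
        lock (gate) subscribers.Add(onWorkAvailable);
    }

    // Called when new rows are found in the database table.
    public void AddWorkItems(IEnumerable<int> rowIds)
    {
        List<Action> toNotify;
        lock (gate)
        {
            foreach (var id in rowIds) pending.Enqueue(id);
            toNotify = new List<Action>(subscribers);
        }
        // Notify outside the lock; the notification itself carries no work items.
        foreach (var notify in toNotify) notify();
    }

    // A worker pulls at most 'max' items when it has spare capacity.
    public List<int> TakeWorkItems(int max)
    {
        var batch = new List<int>();
        lock (gate)
        {
            while (batch.Count < max && pending.Count > 0)
                batch.Add(pending.Dequeue());
        }
        return batch;
    }
}
```

The dispatcher keeps only a queue and a subscriber list, with no per-worker capacity state, which matches the point that it "does not keep state about workers".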

What I'm not sure about is how to set this up using XcoAppSpace (I'm sure it's very simple). Is this a combination of Pub/Sub and bidirectional ports? i.e. Pub/Sub to let the workers know there is work to be done, and then some other bidirectional port that lets a worker tell the dispatcher it can take some work items/rows (that it needs more work), and then lets the dispatcher tell the worker which rows it needs to work on.

Would appreciate any help (pseudo code) in how to go about this.

Many thanks in advance

Mike

Coordinator
Oct 20, 2010 at 8:28 AM

Hi Mike!

The contract for your Dispatcher Worker could look like this:

public class PDispatcherWorker : PortSet<RequestWorkItem, Subscribe<NewWorkItemAvailable>, Unsubscribe<NewWorkItemAvailable>> {}

public class RequestWorkItem
{
   public Port<WorkItem> ResponsePort;
}

As you already said, it is a combination of Pub/Sub and Request/Response. For the Pub/Sub implementation you can use XcoPublisher, which makes it really easy. A NewWorkItemAvailable message is published whenever new work items arrive. Then you just wait for incoming RequestWorkItem messages and hand over the available work items via the response port. That way the dispatcher is completely unaware of who does the processing, or how many processors there are, which is a nice thing.

A processing entity doesn't even need a published worker itself, it just subscribes to the Dispatcher Worker:

var dispatcher = space.ConnectWorker<PDispatcherWorker>(...);
var workItemsAvailablePort = space.Receive<NewWorkItemAvailable>(msg => { /* if you can process additional work items, send a request message to the dispatcher */ });
dispatcher.Post(new Subscribe<NewWorkItemAvailable>(workItemsAvailablePort));

// for requesting work items, you use a simple response port
var workItemsPort = space.Receive<WorkItem>(item => { /* process any received work items */ });
dispatcher.Post(new RequestWorkItem { ResponsePort = workItemsPort });

The thing that could be a bit tricky is deciding when to request new work items, because you can of course process multiple work items in parallel. But I think you already solved this problem in your current solution by keeping the ports filled up to a certain level. You could probably use the same technique for your distributed processors.
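One way to carry the "keep the port filled up to a certain level" idea over to a remote processor is a small in-flight counter. The sketch below is an assumption rather than XCoAppSpace API: InFlightTracker, lowWatermark and batchSize are hypothetical names, and the caller would post the actual RequestWorkItem whenever TryBeginRequest returns a non-zero count.

```csharp
using System;
using System.Threading;

// Hypothetical helper: tracks how many work items this node has in flight and
// decides when to ask the dispatcher for more. Threshold values are illustrative.
public class InFlightTracker
{
    private int inFlight;              // items received but not yet finished
    private int requestOutstanding;    // 1 while a request is awaiting its response
    private readonly int lowWatermark;
    private readonly int batchSize;

    public InFlightTracker(int lowWatermark, int batchSize)
    {
        this.lowWatermark = lowWatermark;
        this.batchSize = batchSize;
    }

    public int InFlight => Volatile.Read(ref inFlight);

    // Returns how many items to request, or 0 if no request should be sent now.
    // At most one request is outstanding, so a node never stacks up duplicate requests.
    public int TryBeginRequest()
    {
        if (Volatile.Read(ref inFlight) >= lowWatermark) return 0;
        if (Interlocked.CompareExchange(ref requestOutstanding, 1, 0) != 0) return 0;
        return batchSize;
    }

    // Call from the response-port handler when a batch arrives.
    public void OnItemsReceived(int count)
    {
        Interlocked.Add(ref inFlight, count);
        Volatile.Write(ref requestOutstanding, 0);
    }

    // Call when processing of a single item has finished.
    public void OnItemCompleted() => Interlocked.Decrement(ref inFlight);
}
```

A natural place to call TryBeginRequest is both in the NewWorkItemAvailable handler and after each OnItemCompleted, so the node tops itself up as work drains.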

As an alternative solution, you could even leave out the whole pub/sub mechanism and do it just with response ports:
- When a processing worker needs new work items, it sends a request to the dispatcher
- If the dispatcher has work items available, it sends one or more items back over the response port
- If not, the dispatcher stores the response port and sends work items as soon as some are available
- A processing worker doesn't send a new request to the dispatcher until its previous request has been answered with work items

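The response-port-only variant can be sketched the same way. Here an Action<int> is a hypothetical stand-in for a Port<WorkItem>, each request is answered with a single item, and ParkingDispatcher is an illustrative name; since a real dispatcher handler would run concurrently, the shared queues are guarded by a lock.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of the "no pub/sub" variant: the dispatcher either answers a
// request immediately or parks the response callback until new work arrives.
public class ParkingDispatcher
{
    private readonly Queue<int> pending = new Queue<int>();              // unassigned work items
    private readonly Queue<Action<int>> parked = new Queue<Action<int>>(); // requests waiting for work
    private readonly object gate = new object();

    public void RequestWorkItem(Action<int> responsePort)
    {
        int item;
        lock (gate)
        {
            if (pending.Count == 0)
            {
                parked.Enqueue(responsePort);   // no work yet: remember who asked
                return;
            }
            item = pending.Dequeue();
        }
        responsePort(item);                     // reply outside the lock
    }

    public void AddWorkItem(int item)
    {
        Action<int> waiter = null;
        lock (gate)
        {
            if (parked.Count > 0) waiter = parked.Dequeue();
            else pending.Enqueue(item);
        }
        if (waiter != null) waiter(item);       // hand new work straight to a waiting worker
    }
}
```

Because a worker only re-requests after its previous request has been answered, each worker holds at most one parked entry, so the parked queue stays bounded by the number of workers.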
Hope I could help you!

Thomas

Oct 21, 2010 at 1:45 PM

Hi Thomas,

Yes, that definitely helps.

Is it possible you could show me what the Dispatcher looks like just so I can confirm my understanding?

Also, just to confirm, I would essentially be doing this:

var workItemsAvailablePort = space.Receive<NewWorkItemAvailable>(msg => { if (canHandleNewRequests) dispatcher.Post(new RequestWorkItem { ResponsePort = workItemsPort }); });

Many thanks in advance

Mike

Coordinator
Oct 22, 2010 at 1:43 PM

Your Dispatcher could look something like this:

class DispatcherWorker : PDispatcherWorker
{
   XcoPublisher<NewWorkItemAvailable> publisher = new XcoPublisher<NewWorkItemAvailable>();

   public void AddNewWorkItems(...)
   {
      ... adds new work items to a list of work items when needed...
      
      publisher.Publish(new NewWorkItemAvailable());
   }

   [XcoConcurrent]
   void Process(RequestWorkItem msg)
   {
      var workItem = ... take a work item from the list of work items ...;
      msg.ResponsePort.Post(workItem);

      ... trigger loading new work items if needed ...;
   }
}

And yes, what you wrote is basically what you need to do on the client side.

Thomas

Oct 22, 2010 at 2:10 PM

Hi Thomas,

Thanks for this. I'm almost with you, but how is your AddNewWorkItems method called to trigger the publish? Is it also marked with [XcoConcurrent], and then from the actual host you just post a NewWorkItemAvailable message?

i.e. something like:

var w = space.RunWorker<PDispatcherWorker, MyDispatcher>("SourceToProcess");
Console.WriteLine("worker running...");

Thread.Sleep(3000); // give clients some time to subscribe

// Check if we have entries in the DB to process
if (haveEntriesToProcess)
    w.Post(new NewWorkItemAvailable());

or am I missing something?

Thanks in advance for all the help 

Mike

Coordinator
Oct 22, 2010 at 2:45 PM

Hi,

Well, you could do it as you said and also mark AddNewWorkItems with XcoConcurrent, but then you would of course also have to define an additional port in the worker's contract (e.g. for an "AddNewWorkItems" message). In my example I just assumed there would be some way for new work items to be handed over to the dispatcher worker. This could also just be a public method that is called directly from your local application:

var worker = new MyDispatcher();
space.RunWorker<PDispatcherWorker>(worker);
worker.AddNewWorkItems(...);

Or another way could be that you hand over a work items datasource to the worker, and it loads the data whenever it needs to:

class MyDispatcher : PDispatcherWorker
{
   ...
   public MyDispatcher(IWorkItemsSource source)
   {
      ... do initial loading of work items ...
   }
   ...
}

So, I would say how you do this is mainly a matter of what you think suits the architecture of your application best :-)

Thomas

Oct 22, 2010 at 3:32 PM

Hi Thomas,

Thanks for this. That makes sense now, and the Pub/Sub is working fine. However, I cannot get anything to work for my Req/Resp port, i.e. the worker is not receiving the message.

Should I be able to hook it up using the same ConnectWorker call, like below?

var w = space.ConnectWorker<PDispatcherWorker>("localhost:9000");

// subscribe to news agency
var subscription = space.Receive<NewWorkItemAvailable>(msg => Console.WriteLine("New items are available"));
w.Post(new Subscribe<NewWorkItemAvailable>(subscription));

var workItemsPort = space.Receive<WorkItem>(msg => Console.WriteLine("Processing work item " + msg.MessagesToProcess[0]));
Thread.Sleep(3000);
Console.WriteLine("Posting request for id's");
w.Post(new RequestWorkItem() { ResponsePort = workItemsPort, NumberOfMessagesToReceive = 2 });

My worker is not receiving the RequestWorkItem message:

[XcoConcurrent]
public void ProcessNewRequest(RequestWorkItem msg)
{
    Console.WriteLine("Received request and dispatching {0} messages", msg.NumberOfMessagesToReceive);
    var workItem = new WorkItem();
    List<int> ids = new List<int>();
    ids.Add(1);
    ids.Add(2);
    workItem.MessagesToProcess = ids; // attach the ids (assumes MessagesToProcess is a List<int>)
    msg.ResponsePort.Post(workItem);
}

Thanks in advance

 

Mike

Oct 22, 2010 at 7:59 PM

Hi Thomas,

I had not marked the RequestWorkItem class as serializable, so it did not work. It's all working now, so thanks for your help!
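For anyone hitting the same problem: in this thread the fix was marking the message class as serializable. Below is a minimal sketch of message classes marked with the standard .NET [Serializable] attribute; the field names follow the snippets above, while the List<int> type is an assumption. RequestWorkItem would get the same attribute; it is left out here only because its Port<WorkItem> field needs the CCR assemblies.

```csharp
using System;
using System.Collections.Generic;

// Messages that travel between AppSpaces are marked [Serializable].
// The payload shape is an assumption based on the snippets in this thread.
[Serializable]
public class NewWorkItemAvailable { }

[Serializable]
public class WorkItem
{
    // Row ids the worker should process.
    public List<int> MessagesToProcess = new List<int>();
}
```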

Many thanks

Mike

Oct 25, 2010 at 3:55 PM

Hi Thomas,

I've got this working now, but I've come across something and wondered if you could help. Once the response port delivers the 'n' messages I requested, I want to post each of those messages to another AppSpace that processes it with an iterator, and get a response back after each message has been processed. The reason is to maintain a global count of how many messages are in flight; once this drops to a low threshold, the worker can tell the dispatcher that it can take more messages to process. However, the handler is a static method, so it cannot access the other AppSpace (the one processing the messages) or the global variable that holds the number of in-flight messages. Is there a way or pattern to solve this?

i.e.

int globalInFlightCount = 0;

var responsePort = space.Receive...etc....(msg => DecrementGlobalCount());
var workItemsPort = space.Receive<WorkItem>(item => ProcessMessages(item));

.
.
private static void ProcessMessages(WorkItem msg)
{
    foreach (SQCarrier sq in msg.MessagesToProcess)
    {
        Console.WriteLine("Processing Message with Id {0}", sq.MessageId);
        _w.Post(sq, ResponsePort = responsePort); // cannot do this, as the method is static
    }
}

This is really down to my lack of knowledge, so I'd appreciate any tips on how to solve this (if I've made it clear enough!)

Thanks in advance

Mike

 

Coordinator
Oct 25, 2010 at 6:53 PM

Hi Mike,

I'm afraid I don't completely understand what your problem is. Couldn't you just make your ProcessMessages method non-static? Alternatively, you could hand over the needed worker as a method parameter:
var workItemsPort = space.Receive<WorkItem>(item => ProcessMessages(item, worker));

If that didn't help you, could you describe what you would like to do in more detail, perhaps with pseudo code?

Thomas

Oct 29, 2010 at 11:47 AM

Hi Thomas,

My fault really. My test app is a console app, so I had used a static method. Once I slotted it into my Windows service, I used an instance method and it works just fine!

Thanks again for your help - This is all working really well now!

Cheers 

Mike

Coordinator
Oct 29, 2010 at 12:08 PM

Glad to hear that :-)

Thomas

Oct 29, 2010 at 1:17 PM

Hey Mike! This looks like some interesting work. Any chance you could post your working code for download? I would like to apply this same request/response theory to a project I am working on as well. Thanks!

Nov 9, 2010 at 6:48 PM

Hi Husterk!

Sure, no problem sharing the code. I'm slightly up to my eyeballs in work this week, but should be able to put something together to illustrate the point, maybe next week.

Not really sure how to send the code to you, so shout if you have any suggestions.

Cheers

Mike