Handling Exceptions without blocking Worker

Jan 31, 2010 at 9:12 PM

Dear Ralf,

I am working intensively with XcoAppSpace and CcrFlows to build a data feed server application, and I have developed some good patterns for working with them so far. Both are great - and smart! - projects.

However, I think we need to add support for exception scenarios. For sequential processing, such as a worker method marked with [XcoExclusive] or a SeqStage in CcrFlows, any exception thrown during processing blocks all further processing: new messages arriving on the input port are no longer handled. For now, I use the following approach to work around this issue:

1) Since I mainly use CcrFlows as the real "meat" to process messages within Xco workers, the choice between [XcoExclusive] and [XcoConcurrent] is not that important to me. Sequential processing and the corresponding locking are assured by the CcrFlow anyway. So I simply decided to change all my [XcoExclusive] attributes to [XcoConcurrent]. Nevertheless, in the long run I would like to find a better solution.

2) Regarding the SeqStages in CcrFlows, I added a causality in the input handler that is posted together with the input task. This causality catches any exception thrown down the road and ensures that the receiver task on the input port is activated again (this avoids the blocking). Here are my changes in the SeqStage class:

    class SeqStage<TIn, TOut> : IteratorStage<TIn, TOut>
    {
        DispatcherQueue _exceptionQueue = new DispatcherQueue();
        Port<Exception> _exceptionPort = new Port<Exception>();
        string _exceptionCausalityID = Guid.NewGuid().ToString();
        Causality _exceptionCausality = null;

        internal SeqStage(SeqHandler<TIn, TOut> handler)
        {
            _exceptionCausality = new Causality(_exceptionCausalityID, _exceptionPort);
            Arbiter.Activate(_exceptionQueue, Arbiter.Receive<Exception>(true, _exceptionPort, HandleException));
            base.InitializeInputHandler(Arbiter.Receive(
                                        false,
                                        (Port<TIn>)base.inputPorts[0],
                                        i =>
                                        {
                                            Dispatcher.AddCausality(_exceptionCausality);
                                            handler(i, (Port<TOut>)base.outputPorts[0]);
                                            Dispatcher.RemoveCausality(_exceptionCausality);
                                            base.WaitForNextInput();
                                        }
                                        ));
        }

        /// <summary>
        /// Sole purpose of this handler:
        /// Avoid that SeqStage gets blocked in case an exception was thrown during processing the SeqHandler.
        /// This is simply done by re-activating the Input Receiver.
        /// </summary>
        /// <param name="e"> The exception.</param>
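        private void HandleException(Exception e)
        {
            // The failed handler never reached its own RemoveCausality call, so clean up here.
            Dispatcher.RemoveCausality(_exceptionCausalityID);
            // Re-activate the input receiver so the SeqStage is not blocked by the exception.
            base.WaitForNextInput();
            throw e; // Rethrow so the failure is not swallowed (note: "throw e" resets the stack trace)
        }
    }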
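To illustrate why a one-shot receiver blocks after an exception, here is a minimal sketch in plain C# without the CCR. It is a simulation, not the causality mechanism above: OneShotStage, its members, and the Demo class are hypothetical names invented for this example. It only shows that a re-arm step placed after the handler call is skipped when the handler throws, while a re-arm in a finally block always runs.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a sequential stage: it handles one message at a
// time and must be re-armed after each message, like a non-persistent receiver.
class OneShotStage
{
    private readonly Queue<int> _pending = new Queue<int>();
    private readonly Action<int> _handler;
    private readonly bool _rearmInFinally;
    private bool _armed = true;

    public OneShotStage(Action<int> handler, bool rearmInFinally)
    {
        _handler = handler;
        _rearmInFinally = rearmInFinally;
    }

    // Number of messages whose handler completed without throwing.
    public int Handled { get; private set; }

    public void Post(int message)
    {
        _pending.Enqueue(message);
        Pump();
    }

    private void Pump()
    {
        while (_armed && _pending.Count > 0)
        {
            _armed = false; // one-shot: the receiver is consumed by this message
            int message = _pending.Dequeue();
            try
            {
                _handler(message);
                Handled++;
                if (!_rearmInFinally) _armed = true; // skipped when the handler throws
            }
            catch (Exception ex)
            {
                Console.WriteLine("handler failed: " + ex.Message);
            }
            finally
            {
                if (_rearmInFinally) _armed = true; // runs whether or not the handler threw
            }
        }
    }
}

static class Demo
{
    // Fails for message 2 only.
    static void Handler(int i)
    {
        if (i == 2) throw new InvalidOperationException("bad message " + i);
    }

    public static int Run(bool rearmInFinally)
    {
        var stage = new OneShotStage(Handler, rearmInFinally);
        for (int i = 1; i <= 4; i++) stage.Post(i);
        return stage.Handled;
    }

    static void Main()
    {
        Console.WriteLine(Run(false)); // stage stays blocked after message 2: 1 handled
        Console.WriteLine(Run(true));  // stage keeps going after message 2: 3 handled
    }
}
```

A try/finally like this only covers exceptions thrown synchronously inside the handler. As far as I understand the CCR, a causality also catches exceptions thrown in tasks that are triggered further down the flow, which is why I used one in the SeqStage.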

Please don't take this as a bug report. These are rather some thoughts for discussion. Tell me if I'm wrong - it will make me happy.

See you - and best regards,

Claas

 

Coordinator
Feb 1, 2010 at 7:57 AM
Edited Feb 1, 2010 at 8:05 AM

Hi, Claas!

Great to hear you're exercising the AppSpace quite a bit ;-)

Unfortunately I don't really understand what you're trying to do, i.e. the "business scenario". Could you explain it, maybe in pseudo-code? A small sample would do.

Also:

Have you tried the most recent AppSpace version? It supports sequential ports (for remoting).

Have you tried the CCR Space (http://code.google.com/p/ccrspace/) for its flow features instead of CCR Flows? I'm not supporting the CCR Flows anymore.

-Ralf

 

Feb 2, 2010 at 10:32 PM

Hi Ralf,

thanks for your hints!

Regarding 2) (AppSpace version) and 3) (CcrFlows <-> CCR Space):

2) I had indeed been working with CTP5 and am now upgrading to CTP6. The newly introduced separation of AppSpace and UnityContainer added a few items to my to-do list, but the first components have been ported to the new version and their unit tests pass. I also tested a bit more around the "blocking" question I raised at the beginning. With CTP6, I could not reproduce the blocking of an [XcoExclusive] method. But, as already mentioned, I had changed the SeqStage class of the CcrFlows framework to avoid any blocking even when exceptions are thrown within a flow. So I must admit that the SeqStage may have been the real problem, not the AppSpace [XcoExclusive] implementation itself. In any case, with CTP6 and the changed SeqStage class it works well.

3) Thanks for the information that you do not support CcrFlows anymore. It was and is - nevertheless - a very good framework, I think. At least to my eyes, it provides the missing coding metaphor for working with the asynchronous paradigm. With it, I could avoid writing code with C# iterators and lots of "yield return" statements, which I find hard to understand and debug when it comes to longer process chains. I have not yet had a look at CCR Space. As all business code is 95% finished, I honestly want to put my application into action first, before starting the next big refactoring, perhaps towards the CCR Space. But I will have a look at it, and you can be sure I will get back to you on this. :-)

Finally, some words on 1) (my business scenario).

I am developing a server application that autonomously refreshes position information for different ships (which users can configure). Simply put, it all revolves around a user service, a ship service and an autonomous refresh engine. All of these components run on a server. A small WPF client application lets users quickly manage the set of ships whose positions the server should refresh for them.

The above-mentioned services work on the server side and make up my components (among some others). They live in an AppSpace, so they are "workers", to speak in AppSpace terms. Each of the methods these workers publish simply relays the work to a corresponding "process" (not an operating system process, but rather a class that defines the business process delivering the functionality published by the worker method). And it is these processes that I implemented using CcrFlows. So XcoAppSpace and CcrFlows take on different responsibilities: the AppSpace hosts the workers and assures connectivity, whereas CcrFlows provides the framework to define the business processes. While the AppSpace workers define the big picture, the flow comes into play for the plumbing on a lower, more code-related level. Of course these business processes span multiple AppSpace workers where needed. A common pattern within my server-side code is the following (shortened code):

Worker:

        [XcoExclusive]
        public void ProcessCreateShip(PShipService.CreateShip request)
        {
            _createShipProcess.Flow.Input.Post(request);
        }

Process (instantiated and referenced by Worker class. The code snippet shows the constructor):

        public CreateShipProcess(EDMMetaData edmMetaData, PShipService shipService, PAuthService authService)
        {
            _edmMetaData = edmMetaData;
            _shipService = shipService;
            _authService = authService;
            this.Flow = Flow<PShipService.CreateShip>
                        .Do<ProcessCard>(CreateProcessCard)
                        .Scatter<ProcessCard>(GetCallerName) //Call to external Service
                        .Scatter<ProcessCard>(ValidateCallerToken) //Call to external Service
                        .Do<ProcessCard>(ValidateRequest)
                        .Do<ProcessCard>(Query)
                        .Do<ProcessCard>(EnableChangeTracking)
                        .Do<ProcessCard>(CreateResponse)
                        .Do<ProcessCard>(RespondToPortAndCausality)
                        .Do<PShipService.CreateShipResponse>(DiscardProcessCard);
        }

The state of the process is held in a "ProcessCard", so different stages can easily share state. Such a ProcessCard is newly created for every request and is held in memory for the duration of the flow. It's cheap, easy, and seems to work pretty well.
So, as you see, this pattern might change with CCR Space, but as mentioned I have not had a look at it yet.
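To make the ProcessCard idea a bit more concrete, here is a minimal sketch in plain C# without CcrFlows. The fields of ProcessCard, the stage bodies, and the ProcessCardDemo class are assumptions made up for this illustration and are not my real code. It only shows the principle: one card is created per request and handed from stage to stage.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical per-request state holder; the field names are illustrative only.
class ProcessCard
{
    public string ShipName;
    public string CallerName;
    public bool IsValid;
    public string Response;
}

static class ProcessCardDemo
{
    // Each stage takes the card, reads or updates it, and passes it on.
    static ProcessCard GetCallerName(ProcessCard card)
    {
        card.CallerName = "demo-user"; // stands in for a call to an external service
        return card;
    }

    static ProcessCard ValidateRequest(ProcessCard card)
    {
        card.IsValid = !string.IsNullOrEmpty(card.ShipName);
        return card;
    }

    static ProcessCard CreateResponse(ProcessCard card)
    {
        card.Response = card.IsValid
            ? "created " + card.ShipName + " for " + card.CallerName
            : "rejected";
        return card;
    }

    public static string Run(string shipName)
    {
        // A plain list of delegates stands in for the flow definition.
        var stages = new List<Func<ProcessCard, ProcessCard>>
        {
            GetCallerName, ValidateRequest, CreateResponse
        };

        // One fresh card per request; it lives only for the duration of the run.
        var card = new ProcessCard { ShipName = shipName };
        foreach (var stage in stages) card = stage(card);
        return card.Response;
    }

    static void Main()
    {
        Console.WriteLine(Run("Aurora")); // created Aurora for demo-user
        Console.WriteLine(Run(""));       // rejected
    }
}
```

Because every stage has the same input and output type, stages can be added, removed or reordered without touching the signatures of their neighbours.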

I hope this is of some (informative) value to you.

My very best regards!

Claas

Coordinator
Feb 3, 2010 at 9:10 PM

Hi, Claas!

Thx for the info on your application. This sounds like what I intended with the CCR Flows ;-) Use the AppSpace to distribute code, use some flow API to wire-up processes from local and/or distributed stages.

If the CCR Flows suit you, just use them. Even though I'm not supporting them anymore (and they were never an official part of the AppSpace), they won't go bad ;-) You have the source code and could even maintain them.

But have a look at the CCR Space. It's a more general layer on top of the CCR which integrates the AppSpace with the CCR quite naturally, I'd say. It also features flows.

Have you done any performance/scalability comparisons between some "usual" implementation of your server side stuff and the CCR based version?

-Ralf