Publish/Subscribe question

Jul 2, 2010 at 10:27 PM

Thanks for all the great work you've done.  I'm still trying to wrap my head around all of it!  I'm hoping you can give me some direction towards what I am trying to accomplish.

I have a computer that will act as the server.  This server has 200 registers that hold double values.  I have 3 other computers that I want to be notified when 1 or 2 of those registers on the server have changed.  I have seen in the examples how to use a lambda expression to subscribe only to certain posts.  So I could use this to subscribe only to, say, registers 100 and 101.  The problem with this is that I would be needlessly monitoring and posting all 200 registers when each of my clients will probably only want to know about a couple of them. So my questions are:

1. Would it be possible to pass in the numbers of the registers I want to monitor when a client subscribes?

2. What would be the best way to handle this monitoring of the registers in the server?  Should I have a thread running a constant loop monitoring the values and then call post to notify clients?

Thanks. 

Jul 3, 2010 at 5:53 PM

@jweav: You could use the XcoPublisher for your solution, but I think I'd handle the registration manually.

A client sends a (de)registration message to the server stating which registers it wants to subscribe to. Along with the registration, the client sends a notification port.

The server keeps a directory of the registers with their subscriptions, e.g.

...

register 99: subscriber A notification port, subscriber D notification port, ...
register 134: subscriber D notification port, ...

...

When a register changes, the server looks up the list of its notification ports. That's fast and easy, and only a minimum of messages is sent.
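
A minimal sketch of such a directory, assuming plain C# delegates stand in for the notification ports. `RegisterDirectory`, `Subscribe`, `Unsubscribe` and `Notify` are hypothetical names used for illustration only; they are not part of XcoAppSpaces or the CCR:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical directory: register number -> notification callbacks.
// In an AppSpaces setup each callback would wrap a subscriber's notification port.
public class RegisterDirectory
{
    private readonly object gate = new object();
    private readonly Dictionary<int, List<Action<int, double>>> subscribers =
        new Dictionary<int, List<Action<int, double>>>();

    // Adds a callback for one register; several subscribers may share a register.
    public void Subscribe(int register, Action<int, double> notify)
    {
        lock (gate)
        {
            List<Action<int, double>> list;
            if (!subscribers.TryGetValue(register, out list))
            {
                list = new List<Action<int, double>>();
                subscribers[register] = list;
            }
            list.Add(notify);
        }
    }

    // Removes a callback; the register entry is dropped once nobody listens to it.
    public void Unsubscribe(int register, Action<int, double> notify)
    {
        lock (gate)
        {
            List<Action<int, double>> list;
            if (subscribers.TryGetValue(register, out list)
                && list.Remove(notify) && list.Count == 0)
            {
                subscribers.Remove(register);
            }
        }
    }

    // Notifies only the subscribers of this register; returns how many were notified.
    public int Notify(int register, double newValue)
    {
        Action<int, double>[] targets;
        lock (gate)
        {
            List<Action<int, double>> list;
            if (!subscribers.TryGetValue(register, out list)) return 0;
            targets = list.ToArray(); // copy so callbacks run outside the lock
        }
        foreach (var target in targets) target(register, newValue);
        return targets.Length;
    }
}
```

Because registers without subscribers have no entry at all, the key set of the dictionary doubles as the list of registers that are worth monitoring.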

How do you detect that a value has changed? I don't know; it depends on how such a change happens. Wherever it happens, you could fire an event. This event (containing the register number and the old and new values) could dispatch the async notification of the subscribers.

Try not to start threads manually! Try to limit concurrent access to shared resources.

Jul 6, 2010 at 12:51 PM

Thanks for the advice.  The registers on the server are accessed via an API.  I have no choice but to poll them to look for a change.  So I'll use a timer to poll the registers that the clients have subscribed to and fire an event if any of them have changed.
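
The timer approach could look roughly like the sketch below. It assumes the register API can be reduced to a `Func<int, double>` that reads one register. `RegisterPoller`, `Watch`, `PollOnce` and `RegisterChanged` are hypothetical names, and `System.Threading.Timer` is just one of several timers that would do:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical poller: reads only the watched registers and raises an event on change.
public class RegisterPoller : IDisposable
{
    private readonly Func<int, double> readRegister; // stand-in for the register API
    private readonly Dictionary<int, double> lastValues = new Dictionary<int, double>();
    private readonly object gate = new object();
    private Timer timer;

    // Event arguments: register number, old value, new value.
    public event Action<int, double, double> RegisterChanged;

    public RegisterPoller(Func<int, double> readRegister)
    {
        this.readRegister = readRegister;
    }

    // Starts watching a register; its current value becomes the baseline.
    public void Watch(int register)
    {
        lock (gate) { lastValues[register] = readRegister(register); }
    }

    // Polls on a thread-pool timer instead of a manually started thread.
    public void Start(TimeSpan interval)
    {
        timer = new Timer(_ => PollOnce(), null, interval, interval);
    }

    // One polling pass; returns the number of changes detected.
    public int PollOnce()
    {
        var changes = new List<Tuple<int, double, double>>();
        lock (gate)
        {
            // Iterate over a copy of the keys because values are updated in the loop.
            foreach (var register in new List<int>(lastValues.Keys))
            {
                double current = readRegister(register);
                double previous = lastValues[register];
                if (current != previous)
                {
                    lastValues[register] = current;
                    changes.Add(Tuple.Create(register, previous, current));
                }
            }
        }
        // Raise events outside the lock so handlers cannot block the next pass.
        var handler = RegisterChanged;
        if (handler != null)
            foreach (var c in changes) handler(c.Item1, c.Item2, c.Item3);
        return changes.Count;
    }

    public void Dispose()
    {
        if (timer != null) timer.Dispose();
    }
}
```

A handler attached to `RegisterChanged` would then look up the subscribers of the changed register and post the notification to each of them.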

Jul 7, 2010 at 10:47 PM

Is there a complete example of optional subscription message handling? I tried following the example given under Publish/Subscribe but the example seems to be only bits and pieces and I can't figure it out. 

From the Publish/Subscribe demo:

namespace Server
{
	class TransientNewsAgency : PNewsAgency
    {
        [XcoWorkerExtension(Optional = new Type[] { typeof(Unsubscribe<OnNewsArrived>) })]
		private readonly XcoPublisher<OnNewsArrived> publisher = new XcoPublisher<OnNewsArrived>(); //XcoPublisher automatically manages incoming subscriptions

		[XcoConcurrent]
		public void ProcessPublishNews(PublishNews news)
		{
			var signal = new OnNewsArrived { PublisherName = news.PublisherName, NewsItem = news.NewsItem };
			publisher.Publish(signal); //simply call Publish() to publish messages to all subscribers
		}
        [XcoConcurrent]
        void ProcessUnsubscribeManually(Unsubscribe<OnNewsArrived> unsub)
        {

            publisher.Unsubscribe(unsub.EventPort);
        }
	}
}

When I try to run the server it complains with Message=Duplicate processor method found for type XcoAppSpaces.Core.Unsubscribe`1[Contract.OnNewsArrived]. There can only be one processor method per type!  

I thought maybe I should leave Unsubscribe off of 

public class PNewsAgency : PortSet<PublishNews, Subscribe<OnNewsArrived>>
	{
	}
but then I get:

No port found for required message type XcoAppSpaces.Core.Unsubscribe`1[Contract.OnNewsArrived]

So I have to specify somewhere to use the WorkerExtension Unsubscribe that I defined rather than the default Unsubscribe?

 

Jul 9, 2010 at 9:13 AM

Hi jweav,

You are right that this feature really wasn't working as expected - it seems an error slipped in there with the new release. So, what you did in your first code example (marking the type as optional and implementing your own processor method) was actually the right way to do it.

I corrected the error and uploaded new binaries and examples to the 1.0 release. With these your code should be working without problems now.

Jul 9, 2010 at 10:24 PM

Thanks for the update,

I do have a couple more questions though. 

When I run the code I previously posted it works great.  When I try to convert the same code to VB I have a problem.  Although I don't get any IntelliSense error, I suspect that it is a syntax error, but I don't know for sure.

 

<XcoWorkerExtension([Optional]:=New Type() {GetType(Unsubscribe(Of OnNewsArrived))})> _
        Private ReadOnly publisher As New XcoPublisher(Of OnNewsArrived)()

 

This generates the following exception when I try to run it.

The given assembly name or codebase was invalid. (Exception from HRESULT: 0x80131047)

If I run without the Extension using the default Subscribe/Unsubscribe the VB sample runs fine.

 

Back to the C# publish/subscribe example.  As I mentioned the Unsubscribe worked fine.  However, when I tried to do the same for subscribe, I get the intellisense error on Publisher.Subscribe:

Error 2 The best overloaded method match for 'XcoAppSpaces.Core.XcoPublisher<Contract.OnNewsArrived>.Subscribe(Microsoft.Ccr.Core.Port<Contract.OnNewsArrived>)' has some invalid arguments 

[XcoWorkerExtension(Optional = new Type[] { typeof(Subscribe<OnNewsArrived>) })]
      	private readonly XcoPublisher<OnNewsArrived> publisher = new XcoPublisher<OnNewsArrived>(); 

        [XcoConcurrent]
        void ProcessSubscribeManually(Subscribe<OnNewsArrived> SubS)
        {
            publisher.Subscribe(SubS.EventPort);
        }

The problem with the argument is  Error 3 Argument 1: cannot convert from 'Microsoft.Ccr.Core.IPort' to 'Microsoft.Ccr.Core.Port<Contract.OnNewsArrived>'

I don't understand what it is asking for.  Shouldn't it want the port from the client? 

Thanks,

Jon

 

 

Jul 12, 2010 at 8:38 AM
Edited Jul 12, 2010 at 8:39 AM

Hi Jon,

I fear I can't really help you with the first error, since I don't have experience with VB. After googling the error message, I don't think it's a syntax error though, and it doesn't seem to be specifically AppSpace-related.

As for the second error: In principle your code is correct, but in some circumstances the EventPort property of the Subscribe class is not of type Port<OnNewsArrived>, but of the more general type IPort. Because of that you need to make an explicit cast here:

[XcoConcurrent]
void ProcessSubscribeManually(Subscribe<OnNewsArrived> SubS)
{
     publisher.Subscribe((Port<OnNewsArrived>)SubS.EventPort);
}

Aug 13, 2010 at 3:33 PM
Edited Aug 13, 2010 at 4:18 PM

It's been a while since I've been able to play more with this.   I have figured out how to process the subscribe manually.  Using the example, how can I pass in additional information when a client subscribes?  For example, I would like to pass in an int value along with the port.

 

[XcoConcurrent ]
        private void ProcessSubscribeManually(Subscribe<OnNewsArrived> SubS, int SomeNum)
        {
            publisher.Subscribe((Port <OnNewsArrived>)SubS.EventPort);
            //do something with SomeNum
        }


How do I send the int from the client?

var subscription = new Port<OnNewsArrived>();
w.Post(new Subscribe<OnNewsArrived>(subscription));


I've also been trying to figure out this exception I'm getting when I try to use the worker extension in VB.  I'm convinced this is not a syntax error.  I've checked it using .NET reflector and it is correct.  Some forum posts suggest that it might have something to do with the name of an assembly.  I can't figure out why it would work in C# but not VB.

I noticed in the exception details that the file path begins with d:\.  I'm running everything from C:\ . Could this be the cause of the file load exception?  If so how do I change those paths?

System.IO.FileLoadException was unhandled
  Message=The given assembly name or codebase was invalid. (Exception from HRESULT: 0x80131047)
  Source=mscorlib
  StackTrace:
       at System.Reflection.CustomAttribute._GetPropertyOrFieldData(RuntimeModule pModule, Byte** ppBlobStart, Byte* pBlobEnd, String& name, Boolean& bIsProperty, RuntimeType& type, Object& value)
       at System.Reflection.CustomAttribute.GetCustomAttributes(RuntimeModule decoratedModule, Int32 decoratedMetadataToken, Int32 pcaCount, RuntimeType attributeFilterType, Boolean mustBeInheritable, IList derivedAttributes, Boolean isDecoratedTargetSecurityTransparent)
       at System.Reflection.CustomAttribute.GetCustomAttributes(RuntimeFieldInfo field, RuntimeType caType)
       at System.Reflection.RuntimeFieldInfo.GetCustomAttributes(Type attributeType, Boolean inherit)
       at XcoAppSpaces.Core.CcrWiring.WorkerFactory.Get_worker_extension_attribute(FieldInfo field) in d:\Development\Uni\XcoAppSpaces\source.implementation\XcoAppSpaces.Core.CcrWiring\XcoAppSpaces.Core.CcrWiring\WorkerFactory.cs:line 105
       at XcoAppSpaces.Core.CcrWiring.WorkerFactory.<>c__DisplayClass4.<Get_worker_extension_objects>b__2(FieldInfo f) in d:\Development\Uni\XcoAppSpaces\source.implementation\XcoAppSpaces.Core.CcrWiring\XcoAppSpaces.Core.CcrWiring\WorkerFactory.cs:line 99
       at System.Linq.Enumerable.WhereSelectArrayIterator`2.MoveNext()
       at XcoAppSpaces.Core.CcrWiring.WorkerFactory.Compile_message_processor_methods[TProcessorModeAttribute](Object worker, IEnumerable`1 workerExtensions) in d:\Development\Uni\XcoAppSpaces\source.implementation\XcoAppSpaces.Core.CcrWiring\XcoAppSpaces.Core.CcrWiring\WorkerFactory.cs:line 89
       at XcoAppSpaces.Core.CcrWiring.WorkerFactory.Wire[TMessageContract](TMessageContract worker, DispatcherQueue queue) in d:\Development\Uni\XcoAppSpaces\source.implementation\XcoAppSpaces.Core.CcrWiring\XcoAppSpaces.Core.CcrWiring\WorkerFactory.cs:line 59
       at XcoAppSpaces.Core.CcrWiring.WorkerFactory.Create[TMessageContract,TImplementation](DispatcherQueue queue) in d:\Development\Uni\XcoAppSpaces\source.implementation\XcoAppSpaces.Core.CcrWiring\XcoAppSpaces.Core.CcrWiring\WorkerFactory.cs:line 51
       at XcoAppSpaces.Core.CcrWiring.CcrWirer.CreateWorker[TMessageContract,TImplementation](DispatcherQueue dpq) in d:\Development\Uni\XcoAppSpaces\source.implementation\XcoAppSpaces.Core.CcrWiring\XcoAppSpaces.Core.CcrWiring\CcrWirer.cs:line 71
       at XcoAppSpaces.Core.XcoAppSpace.RunWorker[TMessageContract,TImplementation](PublishMode publishMode, String name, DispatcherQueue queue) in d:\Development\Uni\XcoAppSpaces\source.implementation\XcoAppSpaces.Core\XcoAppSpaces.Core\public\XcoAppSpace.cs:line 277
       at XcoAppSpaces.Core.XcoAppSpace.RunWorker[TMessageContract,TImplementation]() in d:\Development\Uni\XcoAppSpaces\source.implementation\XcoAppSpaces.Core\XcoAppSpaces.Core\public\XcoAppSpace.cs:line 174
       at Server.ServerPgm.Main() in C:\Documents and Settings\jweaver\My Documents\Visual Studio 10\Xcoordination\XcoAppSpaces Usage Demos\VBFilteredSub\xoAppSpaceServer1\ServerPgm.vb:line 11
  InnerException:

Aug 13, 2010 at 3:42 PM

Sorry, can't read what you posted. Please reformat.

Aug 13, 2010 at 4:23 PM
ralfw wrote:

Sorry, can't read what you posted. Please reformat.

 Ok you should be able to read it now.

Aug 14, 2010 at 4:46 PM
Edited Aug 14, 2010 at 4:47 PM

Hi Jon,

If you want to send additional information along with the subscription, you will need to replace the standard subscribe message with your own one. You could for example just extend the Subscribe<T> class and add your own information to it, like this:

[Serializable]
public class MySubscribe<T> : Subscribe<T>
{
  public int SomeNum { get; set; }
  ...
}

You would then just use your own type instead of Subscribe in the contract, in the processor method and for posting.

As for the VB-related error: The paths you are seeing in the exception are the ones stored in the PDB files for debugging. They are the paths where the appspace DLL was compiled, so they should not be related to the error in any way. If you remove the XcoAppSpace PDB files you won't see those paths any more; or, if you compile XcoAppSpaces on your local machine, you will see your local paths in the StackTrace.

Aug 14, 2010 at 6:22 PM

Thomass,

Thanks for the information.  I understand what you're explaining.  When I tried to run the server I got an exception that I can't seem to figure out.  It says that I don't have a required port.  Can you see my error? 

 

XcoAppSpaces.Contracts.Exceptions.XcoWorkerException was unhandled
  Message=No port found for required message type Microsoft.Ccr.Core.Port`1[Contract.PMirroringWorker+VarNotification]
  Source=XcoAppSpaces
  StackTrace:
       at XcoAppSpaces.Core.CcrWiring.WorkerFactory.Create_CCRreceiver_to_bind_message_processor_method_to_port(Dictionary`2 messageContractPorts, MessageProcessorInfo messageProcessor, DispatcherQueue queue, Boolean makePersistent, Dictionary`2 alreadyWiredProcessors) in d:\Development\Uni\XcoAppSpaces\source.implementation\XcoAppSpaces.Core.CcrWiring\XcoAppSpaces.Core.CcrWiring\WorkerFactory.cs:line 178
       at XcoAppSpaces.Core.CcrWiring.WorkerFactory.Create_CCRreceivers_for_message_processor_methods(Dictionary`2 messageContractPorts, IEnumerable`1 messageProcessors, DispatcherQueue queue, Boolean makePersistent, Dictionary`2 alreadyWiredProcessors) in d:\Development\Uni\XcoAppSpaces\source.implementation\XcoAppSpaces.Core.CcrWiring\XcoAppSpaces.Core.CcrWiring\WorkerFactory.cs:line 149
       at XcoAppSpaces.Core.CcrWiring.WorkerFactory.Wire_message_contract_ports_to_message_processor_methods(Dictionary`2 messageContractPorts, IEnumerable`1 exclusiveMessageProcessors, IEnumerable`1 concurrentMessageProcessors, IEnumerable`1 teardownMessageProcessors, DispatcherQueue queue) in d:\Development\Uni\XcoAppSpaces\source.implementation\XcoAppSpaces.Core.CcrWiring\XcoAppSpaces.Core.CcrWiring\WorkerFactory.cs:line 131
       at XcoAppSpaces.Core.CcrWiring.WorkerFactory.Wire[TMessageContract](TMessageContract worker, DispatcherQueue queue) in d:\Development\Uni\XcoAppSpaces\source.implementation\XcoAppSpaces.Core.CcrWiring\XcoAppSpaces.Core.CcrWiring\WorkerFactory.cs:line 66
       at XcoAppSpaces.Core.CcrWiring.WorkerFactory.Create[TMessageContract,TImplementation](DispatcherQueue queue) in d:\Development\Uni\XcoAppSpaces\source.implementation\XcoAppSpaces.Core.CcrWiring\XcoAppSpaces.Core.CcrWiring\WorkerFactory.cs:line 51
       at XcoAppSpaces.Core.CcrWiring.CcrWirer.CreateWorker[TMessageContract,TImplementation](DispatcherQueue dpq) in d:\Development\Uni\XcoAppSpaces\source.implementation\XcoAppSpaces.Core.CcrWiring\XcoAppSpaces.Core.CcrWiring\CcrWirer.cs:line 71
       at XcoAppSpaces.Core.XcoAppSpace.RunWorker[TMessageContract,TImplementation](PublishMode publishMode, String name, DispatcherQueue queue) in d:\Development\Uni\XcoAppSpaces\source.implementation\XcoAppSpaces.Core\XcoAppSpaces.Core\public\XcoAppSpace.cs:line 277
       at XcoAppSpaces.Core.XcoAppSpace.RunWorker[TMessageContract,TImplementation]() in d:\Development\Uni\XcoAppSpaces\source.implementation\XcoAppSpaces.Core\XcoAppSpaces.Core\public\XcoAppSpace.cs:line 174
       at Server.ServerPgm.Main(String[] args) in C:\Documents and Settings\jweaver\My Documents\Visual Studio 10\Xcoordination\XcoAppSpaces Usage Demos\VBFilteredSub\cServer\SvrProgram.cs:line 15
       at System.AppDomain._nExecuteAssembly(RuntimeAssembly assembly, String[] args)
       at System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assemblySecurity, String[] args)
       at Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
       at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
       at System.Threading.ThreadHelper.ThreadStart()
  InnerException:

 

This is what my contract looks like:

namespace Contract
{
    public class PMirroringWorker : PortSet<PMirroringWorker.Var, 
                                            Unsubscribe<PMirroringWorker.VarNotification>, 
                                            sscribe<PMirroringWorker.VarNotification>>
    {

        [Serializable()]
        public class VarNotification
        {
            public int VarNumber;
            public double Val;
        }

        [Serializable()]
        public class Var
        {
            public int VarNum;
            public double NewVal;
        }
    }

    public class sscribe<t> : Subscribe<t>
    {

        public sscribe(Port<PMirroringWorker.VarNotification> sport, int VarNum)
            : base(sport)
        {
        }

    }

}

And the server looks like this:

 

namespace Server
{
	static class ServerPgm
	{

        static void Main(string[] args)
		{
			Console.WriteLine("Xco Application Space - Filtered Subscriptions Server");
			using (var space = new XcoAppSpace("tcp.port=9000")) {
				var w = space.RunWorker<PMirroringWorker, MirroringWorker>();
				Console.WriteLine("running... Enter to continue");
				Console.ReadLine();
				Lp:
				//now post some data to the worker that should be published
				for (int i = 1; i <= 200; i++) {
                    w.Post(new PMirroringWorker.Var
                    {
						VarNum = i,
						NewVal = i * 1.234
					});
					Console.WriteLine(string.Format("Posting Var {0} with Val {1}", i, i * 1.234));
				}

				if (Console.ReadLine() == "r") {
					goto Lp;
				}

			}
		}

	}

}

And the worker:

 

namespace Server
{
    class MirroringWorker : PMirroringWorker
    {
        [XcoWorkerExtension(Required = new Type[] { typeof(sscribe<VarNotification>) })]
        private readonly XcoPublisher<VarNotification> VarSubscriptions = new XcoPublisher<VarNotification>();

        private Dictionary<int, Port<VarNotification>> slist = new Dictionary<int, Port<VarNotification>>();

        [XcoConcurrent()]
        private void ProcessVar(Var Val)
        {
            VarSubscriptions.Publish(new VarNotification
            {
                VarNumber = Val.VarNum,
                Val = Val.NewVal
            });
        }

        [XcoConcurrent()]
                private void ManualSubscribe(Port<VarNotification> sport, int VarNum)
        {
            slist.Add(VarNum, sport);
            //VarSubscriptions.sscribe(sport);
        }
    }

}

 

Aug 15, 2010 at 9:37 AM

I think there are two errors in your worker implementation:

1. You set the type "sscribe<VarNotification>" to required for the worker extension, but this type is not part of the worker extension. Instead, you would need to set the Subscribe<VarNotification> to optional since you don't use it and therefore have to tell the XcoPublisher that it is not required:

[XcoWorkerExtension(Optional = new Type[] { typeof(Subscribe<VarNotification>) })]
private readonly XcoPublisher<VarNotification> VarSubscriptions = new XcoPublisher<VarNotification>();

2. The processing method for the manual subscribe needs to have a parameter of your new message type sscribe<VarNotification>, or else it cannot be matched to the worker port:

[XcoConcurrent()]
private void ManualSubscribe(sscribe<VarNotification> subscription)
...

Aug 16, 2010 at 5:06 PM
Thanks Thomas it worked great. I appreciate all the help. Jon