The Labs \ Source Viewer \ SSCLI \ System.Runtime.Remoting.Contexts \ SynchronizedServerContextSink

  1. // ==++==
  2. //
  3. //
  4. // Copyright (c) 2006 Microsoft Corporation. All rights reserved.
  5. //
  6. // The use and distribution terms for this software are contained in the file
  7. // named license.txt, which can be found in the root of this distribution.
  8. // By using this software in any fashion, you are agreeing to be bound by the
  9. // terms of this license.
  10. //
  11. // You must not remove this notice, or any other, from this software.
  12. //
  13. //
  14. // ==--==
  15. //
  16. // Synchronization Property for URT Contexts. Uses the ThreadPool API.
  17. // An instance of this property in a context enforces a synchronization
  18. // domain for the context (and all contexts that share the same instance).
  19. // This means that at any instant, at most 1 thread could be executing
  20. // in all contexts that share an instance of this property.
  21. //
  22. // This is done by contributing sinks that intercept and serialize in-coming
  23. // calls for the respective contexts.
  24. //
  25. // If the property is marked for re-entrancy, then call-outs are
  26. // intercepted too. The call-out interception allows other waiting threads
  27. // to enter the synchronization domain for maximal throughput.
  28. //
  29. namespace System.Runtime.Remoting.Contexts
  30. {
  31.     using System.Threading;
  32.     using System.Runtime.Remoting;
  33.     using System.Runtime.Remoting.Messaging;
  34.     using System.Runtime.Remoting.Activation;
  35.     using System.Security.Permissions;
  36.     using System;
  37.     using Queue = System.Collections.Queue;
  38.     using ArrayList = System.Collections.ArrayList;
  39.     [Serializable()]
  40.     [AttributeUsage(AttributeTargets.Class)]
  41.     [SecurityPermissionAttribute(SecurityAction.LinkDemand, Flags = SecurityPermissionFlag.Infrastructure)]
  42.     [SecurityPermissionAttribute(SecurityAction.InheritanceDemand, Flags = SecurityPermissionFlag.Infrastructure)]
  43.     [System.Runtime.InteropServices.ComVisible(true)]
  44.     public class SynchronizationAttribute : ContextAttribute, IContributeServerContextSink, IContributeClientContextSink
  45.     {
  46.         // The class should not be instantiated in a context that has Synchronization
  47.         public const int NOT_SUPPORTED = 1;
  48.        
  49.         // The class does not care if the context has Synchronization or not
  50.         public const int SUPPORTED = 2;
  51.        
  52.         // The class should be instantiated in a context that has Synchronization
  53.         public const int REQUIRED = 4;
  54.        
  55.         // The class should be instantiated in a context with a new instance of
  56.         // Synchronization property each time
  57.         public const int REQUIRES_NEW = 8;
  58.        
  59.         private const string PROPERTY_NAME = "Synchronization";
  60.        
  61.         private static readonly UInt32 _timeOut = (UInt32)2147483647;
  62.         // event that releases a thread-pool worker thread
  63.         [NonSerialized()]
  64.         internal AutoResetEvent _asyncWorkEvent;
  65.         [NonSerialized()]
  66.         private RegisteredWaitHandle _waitHandle;
  67.        
  68.         // queue of work items.
  69.         [NonSerialized()]
  70.         internal Queue _workItemQueue;
  71.         // flag for the domain lock (access always synchronized on the _workItemQueue)
  72.         [NonSerialized()]
  73.         internal bool _locked;
  74.         // flag to indicate if the lock should be released during call-outs
  75.         internal bool _bReEntrant;
  76.         // flag for use as an attribute on types
  77.         internal int _flavor;
  78.        
  79.         [NonSerialized()]
  80.         private SynchronizationAttribute _cliCtxAttr;
  81.         // Logical call id (used only in non-reentrant case for deadlock avoidance)
  82.         [NonSerialized()]
  83.         private string _syncLcid;
  84.         [NonSerialized()]
  85.         private ArrayList _asyncLcidList;
  86.        
  87.        
  88.         public virtual bool Locked {
  89.             get { return _locked; }
  90.             set { _locked = value; }
  91.         }
  92.         public virtual bool IsReEntrant {
  93.             get { return _bReEntrant; }
  94.         }
  95.        
  96.         internal string SyncCallOutLCID {
  97.             get {
  98.                 BCLDebug.Assert(!_bReEntrant, "Should not use this for the reentrant case");
  99.                
  100.                 return _syncLcid;
  101.             }
  102.            
  103.             set {
  104.                 BCLDebug.Assert(!_bReEntrant, "Should not use this for the reentrant case");
  105.                
  106.                 BCLDebug.Assert(_syncLcid == null || (_syncLcid != null && value == null) || _syncLcid.Equals(value), "context can be associated with one logical call at a time");
  107.                
  108.                 _syncLcid = value;
  109.             }
  110.         }
  111.        
  112.         internal ArrayList AsyncCallOutLCIDList {
  113.             get { return _asyncLcidList; }
  114.         }
  115.        
  116.         internal bool IsKnownLCID(IMessage reqMsg)
  117.         {
  118.             string msgLCID = ((LogicalCallContext)reqMsg.Properties[Message.CallContextKey]).RemotingData.LogicalCallID;
  119.             return (msgLCID.Equals(_syncLcid) || _asyncLcidList.Contains(msgLCID));
  120.            
  121.         }
  122.        
  123.        
  124. /*
  125.         *  Constructor for the synchronized dispatch property
  126.         */       
  127.        
  128.         public SynchronizationAttribute() : this(REQUIRED, false)
  129.         {
  130.         }
  131.        
  132. /*
  133.         *  Constructor.
  134.         *  If reEntrant is true, we allow other calls to come in
  135.         *  if the currently running call leaves the domain for a call-out.
  136.         */       
  137.        
  138.         public SynchronizationAttribute(bool reEntrant) : this(REQUIRED, reEntrant)
  139.         {
  140.         }
  141.        
  142.        
  143.         public SynchronizationAttribute(int flag) : this(flag, false)
  144.         {
  145.         }
  146.        
  147.        
  148.         // Invoke ContextProperty ctor!
  149.         public SynchronizationAttribute(int flag, bool reEntrant) : base(PROPERTY_NAME)
  150.         {
  151.            
  152.             _bReEntrant = reEntrant;
  153.            
  154.             switch (flag) {
  155.                 case NOT_SUPPORTED:
  156.                 case SUPPORTED:
  157.                 case REQUIRED:
  158.                 case REQUIRES_NEW:
  159.                     _flavor = flag;
  160.                     break;
  161.                 default:
  162.                     throw new ArgumentException(Environment.GetResourceString("Argument_InvalidFlag"), "flag");
  163.                     break;
  164.             }
  165.         }
  166.        
  167.         // Dispose off the WaitHandle registered in Initialization
  168.         internal void Dispose()
  169.         {
  170.             //Unregister the RegisteredWaitHandle
  171.             if (_waitHandle != null)
  172.                 _waitHandle.Unregister(null);
  173.         }
  174.        
  175.         // Override ContextAttribute's implementation of IContextAttribute::IsContextOK
  176.         [System.Runtime.InteropServices.ComVisible(true)]
  177.         public override bool IsContextOK(Context ctx, IConstructionCallMessage msg)
  178.         {
  179.             if (ctx == null)
  180.                 throw new ArgumentNullException("ctx");
  181.             if (msg == null)
  182.                 throw new ArgumentNullException("msg");
  183.            
  184.             // If the context has ThreadAffinity then it is synchronized by default!
  185.             bool isOK = true;
  186.             if (_flavor == REQUIRES_NEW) {
  187.                 isOK = false;
  188.                 // Each activation request instantiates a new attribute class.
  189.                 // We are relying on that for the REQUIRES_NEW case!
  190.                 BCLDebug.Assert(ctx.GetProperty(PROPERTY_NAME) != this, "ctx.GetProperty(PROPERTY_NAME) != this");
  191.             }
  192.             else {
  193.                 SynchronizationAttribute syncProp = (SynchronizationAttribute)ctx.GetProperty(PROPERTY_NAME);
  194.                 if (((_flavor == NOT_SUPPORTED) && (syncProp != null)) || ((_flavor == REQUIRED) && (syncProp == null))) {
  195.                     isOK = false;
  196.                 }
  197.                
  198.                 if (_flavor == REQUIRED) {
  199.                     // pick up the property from the current context
  200.                     _cliCtxAttr = syncProp;
  201.                 }
  202.             }
  203.             return isOK;
  204.         }
  205.        
  206.         // Override ContextAttribute's impl. of IContextAttribute::GetPropForNewCtx
  207.         [System.Runtime.InteropServices.ComVisible(true)]
  208.         public override void GetPropertiesForNewContext(IConstructionCallMessage ctorMsg)
  209.         {
  210.             if ((_flavor == NOT_SUPPORTED) || (_flavor == SUPPORTED) || (null == ctorMsg)) {
  211.                 return;
  212.             }
  213.            
  214.             if (_cliCtxAttr != null) {
  215.                 BCLDebug.Assert(_flavor == REQUIRED, "Use cli-ctx property only for the REQUIRED flavor");
  216.                 ctorMsg.ContextProperties.Add((IContextProperty)_cliCtxAttr);
  217.                 _cliCtxAttr = null;
  218.             }
  219.             else {
  220.                 ctorMsg.ContextProperties.Add((IContextProperty)this);
  221.             }
  222.         }
  223.        
  224.         // We need this to make the use of the property as an attribute
  225.         // light-weight. This allows us to delay initialize everything we
  226.         // need to fully function as a ContextProperty.
  227.         internal virtual void InitIfNecessary()
  228.         {
  229.             lock (this) {
  230.                 if (_asyncWorkEvent == null) {
  231.                     // initialize thread pool event to non-signaled state.
  232.                     _asyncWorkEvent = new AutoResetEvent(false);
  233.                    
  234.                     _workItemQueue = new Queue();
  235.                     _asyncLcidList = new ArrayList();
  236.                    
  237.                     WaitOrTimerCallback callBackDelegate = new WaitOrTimerCallback(this.DispatcherCallBack);
  238.                    
  239.                     // Register a callback to be executed by the thread-pool
  240.                     // each time the event is signaled.
  241.                         // state info
  242.                     _waitHandle = ThreadPool.RegisterWaitForSingleObject(_asyncWorkEvent, callBackDelegate, null, _timeOut, false);
  243.                     // bExecuteOnlyOnce
  244.                 }
  245.             }
  246.         }
  247.        
  248. /*
  249.         * Call back function -- executed for each work item that
  250.         * was enqueued. This is invoked by a thread-pool thread for
  251.         * async work items and the caller thread for sync items.
  252.         */       
  253.         private void DispatcherCallBack(object stateIgnored, bool ignored)
  254.         {
  255.             // This function should be called by only one thread at a time. We will
  256.             // ensure this by releasing exactly one waiting thread to go work on
  257.             // a WorkItem
  258.            
  259.             //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] --- In DispatherCallBack ");
  260.            
  261.             BCLDebug.Assert(_locked == true, "_locked==true");
  262.             WorkItem work;
  263.             // get the work item out of the queue.
  264.             lock (_workItemQueue) {
  265.                 work = (WorkItem)_workItemQueue.Dequeue();
  266.                 //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] --- Dequeued Work for: " + work._thread.GetHashCode());
  267.             }
  268.             BCLDebug.Assert(work != null, "work!=null");
  269.             BCLDebug.Assert(work.IsSignaled() && !(work.IsDummy()), "work.IsSignaled() && !(work.IsDummy())");
  270.             // execute the work item (WorkItem.Execute will switch to the proper context)
  271.             ExecuteWorkItem(work);
  272.             HandleWorkCompletion();
  273.             //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] --- CallBack finished for: " + work._thread.GetHashCode());
  274.         }
  275.        
  276. /*
  277.         *  This is used by the call-out (client context) sinks to notify
  278.         *  the domain manager that the thread is leaving
  279.         */       
  280.         internal virtual void HandleThreadExit()
  281.         {
  282.             //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~~ Thread EXIT ~~~~");
  283.             // For now treat this as if the work was completed!
  284.             BCLDebug.Assert(_locked == true, "_locked==true");
  285.             HandleWorkCompletion();
  286.         }
  287.        
  288. /*
  289.         *  This is used by a returning call-out thread to request
  290.         *  that it be queued for re-entry into the domain.
  291.         */       
  292.         internal virtual void HandleThreadReEntry()
  293.         {
  294.             //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~~ Thread REQUEST REENTRY ~~~~");
  295.             // Treat this as if a new work item needs to be done
  296.             WorkItem work = new WorkItem(null, null, null);
  297.             work.SetDummy();
  298.             HandleWorkRequest(work);
  299.         }
  300.        
  301. /*
  302.         *  This gets called at the end of work.Execute and from
  303.         *  HandleThreadExit() in the re-entrant scenario.
  304.         *  This is the point where we decide what to do next!
  305.         */       
  306.         internal virtual void HandleWorkCompletion()
  307.         {
  308.             // We should still have the lock held for the workItem that just completed
  309.             BCLDebug.Assert(_locked == true, "_locked==true");
  310.             // Now we check the queue to see if we need to release any one?
  311.             WorkItem nextWork = null;
  312.             bool bNotify = false;
  313.             lock (_workItemQueue) {
  314.                 if (_workItemQueue.Count >= 1) {
  315.                     nextWork = (WorkItem)_workItemQueue.Peek();
  316.                     bNotify = true;
  317.                     nextWork.SetSignaled();
  318.                 }
  319.                 else {
  320.                     // We set locked to false only in the case there is no
  321.                     // next work to be done.
  322.                     // NOTE: this is the only place _locked in ever set to false!
  323.                     //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] Domain UNLOCKED!");
  324.                     _locked = false;
  325.                 }
  326.             }
  327.             // See if we found a non-signaled work item at the head.
  328.             if (bNotify) {
  329.                 // In both sync and async cases we just hand off the _locked state to
  330.                 // the next thread which will execute.
  331.                 if (nextWork.IsAsync()) {
  332.                     // Async-WorkItem: signal ThreadPool event to release one thread
  333.                     _asyncWorkEvent.Set();
  334.                     //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ### Signal " + nextWork._thread.GetHashCode() + (nextWork.IsDummy()?" DUMMY ":" REAL "));
  335.                 }
  336.                 else {
  337.                     // Sync-WorkItem: notify the waiting sync-thread.
  338.                     lock (nextWork) {
  339.                         Monitor.Pulse(nextWork);
  340.                         //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ Notify " + nextWork._thread.GetHashCode() + (nextWork.IsDummy()?" DUMMY ":" REAL ") );
  341.                     }
  342.                 }
  343.             }
  344.         }
  345.        
  346. /*
  347.         *  This is called by any new incoming thread or from
  348.         *  HandleThreadReEntry() when a call-out thread wants to
  349.         *  re-enter the domain.
  350.         *  In the latter case, the WorkItem is a dummy item, it
  351.         *  just serves the purpose of something to block on till
  352.         *  the thread is given a green signal to re-enter.
  353.         */       
  354.         internal virtual void HandleWorkRequest(WorkItem work)
  355.         {
  356.             bool bQueued;
  357.            
  358.             // Check for nested call backs
  359.             if (!IsNestedCall(work._reqMsg)) {
  360.                 // See what type of work it is
  361.                 if (work.IsAsync()) {
  362.                     // Async work is always queued.
  363.                     bQueued = true;
  364.                     // Enqueue the workItem
  365.                     lock (_workItemQueue) {
  366.                         //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ### Async Item EnQueue " + work._thread.GetHashCode());
  367.                         work.SetWaiting();
  368.                         _workItemQueue.Enqueue(work);
  369.                         // If this is the only work item in the queue we will
  370.                         // have to trigger the thread-pool event ourselves
  371.                         if ((!_locked) && (_workItemQueue.Count == 1)) {
  372.                             //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ### Async Signal Self: " + work._thread.GetHashCode());
  373.                             work.SetSignaled();
  374.                             //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ### Domain Locked!");
  375.                             _locked = true;
  376.                             _asyncWorkEvent.Set();
  377.                         }
  378.                     }
  379.                 }
  380.                 else {
  381.                     // Sync work is queued only if there are other items
  382.                     // already in the queue.
  383.                     lock (work) {
  384.                         // Enqueue if we need to
  385.                         lock (_workItemQueue) {
  386.                             if ((!_locked) && (_workItemQueue.Count == 0)) {
  387.                                 _locked = true;
  388.                                 //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ### Domain Locked!");
  389.                                 bQueued = false;
  390.                             }
  391.                             else {
  392.                                 //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ ENQUEUE Sync!" + (work.IsDummy()?" DUMMY ":" REAL ") + work._thread);
  393.                                 bQueued = true;
  394.                                 work.SetWaiting();
  395.                                 _workItemQueue.Enqueue(work);
  396.                             }
  397.                         }
  398.                        
  399.                         if (bQueued == true) {
  400.                             // If we queued a work item we must wait for some
  401.                             // other thread to peek at it and Notify us.
  402.                            
  403.                             //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ WORK::WAIT" + work._thread);
  404.                             Monitor.Wait(work);
  405.                             //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ FINISH Work::WAIT" + work._thread);
  406.                             BCLDebug.Assert(_locked == true, "_locked==true");
  407.                             // Our turn to complete the work!
  408.                             // Execute the callBack only if this is real work
  409.                             if (!work.IsDummy()) {
  410.                                 //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ Invoke DispatcherCallBack " + work._thread);
  411.                                 // We invoke the callback here that does exactly
  412.                                 // what we need to do ... dequeue work, execute, checkForMore
  413.                                 DispatcherCallBack(null, true);
  414.                             }
  415.                             else {
  416.                                 // DummyWork is just use to block/unblock a returning call.
  417.                                 // Throw away our dummy WorkItem.
  418.                                 lock (_workItemQueue) {
  419.                                     _workItemQueue.Dequeue();
  420.                                 }
  421.                                 // We don't check for more work here since we are already
  422.                                 // in the midst of an executing WorkItem (at the end of which
  423.                                 // the check will be performed)
  424.                             }
  425.                         }
  426.                         else {
  427.                             // We did not queue the work item.
  428.                             if (!work.IsDummy()) {
  429.                                 //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ Execute direct" + work._thread);
  430.                                 // Execute the work.
  431.                                 BCLDebug.Assert(_locked == true, "_locked==true");
  432.                                 work.SetSignaled();
  433.                                 ExecuteWorkItem(work);
  434.                                 // Check for more work
  435.                                 HandleWorkCompletion();
  436.                             }
  437.                         }
  438.                     }
  439.                 }
  440.             }
  441.             else {
  442.                 // We allow the nested calls to execute directly
  443.                
  444.                 //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ Execute Nested Call direct" + work._thread);
  445.                 // Execute the work.
  446.                 BCLDebug.Assert(_locked == true, "_locked==true");
  447.                 work.SetSignaled();
  448.                 work.Execute();
  449.                 // We are still inside the top level call ...
  450.                 // so after work.Execute finishes we don't check for more work
  451.                 // or unlock the domain as we do elsewhere.
  452.             }
  453.         }
  454.        
  455.         internal void ExecuteWorkItem(WorkItem work)
  456.         {
  457.             work.Execute();
  458.         }
  459.        
  460.         internal bool IsNestedCall(IMessage reqMsg)
  461.         {
  462.             // This returns TRUE only if it is a non-reEntrant context
  463.             // AND
  464.             // (the LCID of the reqMsg matches that of
  465.             // the top level sync call lcid associated with the context.
  466.             // OR
  467.             // it matches one of the async call out lcids)
  468.            
  469.             bool bNested = false;
  470.             if (!IsReEntrant) {
  471.                 string lcid = SyncCallOutLCID;
  472.                 if (lcid != null) {
  473.                     // This means we are inside a top level call
  474.                     LogicalCallContext callCtx = (LogicalCallContext)reqMsg.Properties[Message.CallContextKey];
  475.                    
  476.                     if (callCtx != null && lcid.Equals(callCtx.RemotingData.LogicalCallID)) {
  477.                         // This is a nested call (we made a call out during
  478.                         // the top level call and eventually that has resulted
  479.                         // in an incoming call with the same lcid)
  480.                         bNested = true;
  481.                     }
  482.                 }
  483.                 if (!bNested && AsyncCallOutLCIDList.Count > 0) {
  484.                     // This means we are inside a top level call
  485.                     LogicalCallContext callCtx = (LogicalCallContext)reqMsg.Properties[Message.CallContextKey];
  486.                     if (AsyncCallOutLCIDList.Contains(callCtx.RemotingData.LogicalCallID)) {
  487.                         bNested = true;
  488.                     }
  489.                 }
  490.             }
  491.             return bNested;
  492.         }
  493.        
  494.        
  495. /*
  496.         *  Implements IContributeServerContextSink::GetServerContextSink
  497.         *  Create a SynchronizedDispatch sink and return it.
  498.         */       
  499.         public virtual IMessageSink GetServerContextSink(IMessageSink nextSink)
  500.         {
  501.             InitIfNecessary();
  502.            
  503.             SynchronizedServerContextSink propertySink = new SynchronizedServerContextSink(this, nextSink);
  504.            
  505.             return (IMessageSink)propertySink;
  506.         }
  507.        
  508. /*
  509.         *  Implements IContributeClientContextSink::GetClientContextSink
  510.         *  Create a CallOut sink and return it.
  511.         */       
  512.         public virtual IMessageSink GetClientContextSink(IMessageSink nextSink)
  513.         {
  514.             InitIfNecessary();
  515.            
  516.             SynchronizedClientContextSink propertySink = new SynchronizedClientContextSink(this, nextSink);
  517.            
  518.             return (IMessageSink)propertySink;
  519.         }
  520.        
  521.     }
  522.    
  523. /*************************************** SERVER SINK ********************************/   
  524. /*
  525.     *  Implements the sink contributed by the Synch-Dispatch
  526.     *  Property. The sink holds a back pointer to the property.
  527.     *  The sink intercepts incoming calls to objects resident in
  528.     *  the Context and co-ordinates with the property to enforce
  529.     *  the domain policy.
  530.     */   
  531.     internal class SynchronizedServerContextSink : InternalSink, IMessageSink
  532.     {
  533.         internal IMessageSink _nextSink;
  534.         internal SynchronizationAttribute _property;
  535.        
  536.         internal SynchronizedServerContextSink(SynchronizationAttribute prop, IMessageSink nextSink)
  537.         {
  538.             _property = prop;
  539.             _nextSink = nextSink;
  540.         }
  541.        
  542.         ~SynchronizedServerContextSink()
  543.         {
  544.             _property.Dispose();
  545.         }
  546.        
  547. /*
  548.         * Implements IMessageSink::SyncProcessMessage
  549.         */       
  550.         public virtual IMessage SyncProcessMessage(IMessage reqMsg)
  551.         {
  552.             // 1. Create a work item
  553.                 /* replySink */            WorkItem work = new WorkItem(reqMsg, _nextSink, null);
  554.            
  555.             // 2. Notify the property to handle the WorkItem
  556.             // The work item may get put in a Queue or may execute directly
  557.             // if the domain is free.
  558.             _property.HandleWorkRequest(work);
  559.            
  560.             // 3. Pick up retMsg from the WorkItem and return
  561.             return work.ReplyMessage;
  562.         }
  563.        
  564. /*
  565.         *  Implements IMessageSink::AsyncProcessMessage
  566.         */       
  567.         public virtual IMessageCtrl AsyncProcessMessage(IMessage reqMsg, IMessageSink replySink)
  568.         {
  569.             // 1. Create a work item
  570.             WorkItem work = new WorkItem(reqMsg, _nextSink, replySink);
  571.             work.SetAsync();
  572.             // 2. We always queue the work item in async case
  573.             _property.HandleWorkRequest(work);
  574.             // 3. Return an IMsgCtrl
  575.             return null;
  576.         }
  577.        
  578. /* 
  579.         * Implements IMessageSink::GetNextSink
  580.         */       
  581.         public IMessageSink NextSink {
  582.             get { return _nextSink; }
  583.         }
  584.     }
  585.    
  586.     //*************************************** WORK ITEM ********************************//
  587. /*
  588.     *  A work item holds the info about a call to Sync or
  589.     *  Async-ProcessMessage.
  590.     */   
  591.     internal class WorkItem
  592.     {
  593.         private const int FLG_WAITING = 1;
  594.         private const int FLG_SIGNALED = 2;
  595.         private const int FLG_ASYNC = 4;
  596.         private const int FLG_DUMMY = 8;
  597.        
  598.         internal int _flags;
  599.         internal IMessage _reqMsg;
  600.         internal IMessageSink _nextSink;
  601.         // ReplySink will be null for an sync work item.
  602.         internal IMessageSink _replySink;
  603.         // ReplyMsg is set once the sync call is completed
  604.         internal IMessage _replyMsg;
  605.        
  606.         // Context in which the work should execute.
  607.         internal Context _ctx;
  608.        
  609.         internal LogicalCallContext _callCtx;
  610.         static internal InternalCrossContextDelegate _xctxDel = new InternalCrossContextDelegate(ExecuteCallback);
  611.        
  612.         //DBGDBG
  613.         //internal int _thread;
  614.        
  615.         internal WorkItem(IMessage reqMsg, IMessageSink nextSink, IMessageSink replySink)
  616.         {
  617.             _reqMsg = reqMsg;
  618.             _replyMsg = null;
  619.             _nextSink = nextSink;
  620.             _replySink = replySink;
  621.             _ctx = Thread.CurrentContext;
  622.             _callCtx = CallContext.GetLogicalCallContext();
  623.             //DBGDBG
  624.             //_thread = Thread.CurrentThread.GetHashCode();
  625.         }
  626.        
  627.         // To mark a work item being enqueued
  628.         internal virtual void SetWaiting()
  629.         {
  630.             BCLDebug.Assert(!IsWaiting(), "!IsWaiting()");
  631.             _flags |= FLG_WAITING;
  632.         }
  633.        
  634.         internal virtual bool IsWaiting()
  635.         {
  636.             return (_flags & FLG_WAITING) == FLG_WAITING;
  637.         }
  638.        
  639.         // To mark a work item that has been given the green light!
  640.         internal virtual void SetSignaled()
  641.         {
  642.             BCLDebug.Assert(!IsSignaled(), "!IsSignaled()");
  643.             _flags |= FLG_SIGNALED;
  644.         }
  645.        
  646.         internal virtual bool IsSignaled()
  647.         {
  648.             return (_flags & FLG_SIGNALED) == FLG_SIGNALED;
  649.         }
  650.        
  // Flags this work item as async. ExecuteCallback consults IsAsync()
  // to decide between AsyncProcessMessage and SyncProcessMessage.
  internal virtual void SetAsync()
  {
      _flags |= FLG_ASYNC;
  }
  655.        
  // True when the FLG_ASYNC bits are all set in _flags.
  internal virtual bool IsAsync()
  {
      bool isAsync = FLG_ASYNC == (_flags & FLG_ASYNC);
      return isAsync;
  }
  660.        
  // Sets the FLG_DUMMY bits in _flags.
  // NOTE(review): what a "dummy" work item is used for is decided by
  // callers outside this block - confirm before relying on it.
  internal virtual void SetDummy()
  {
      _flags |= FLG_DUMMY;
  }
  665.        
  // True when the FLG_DUMMY bits are all set in _flags.
  internal virtual bool IsDummy()
  {
      bool dummy = FLG_DUMMY == (_flags & FLG_DUMMY);
      return dummy;
  }
  670.        
  671.        
  // Cross-context callback target (see _xctxDel). args[0] is the WorkItem
  // to run.
  //   - async item: forwards the request and the reply sink to
  //     _nextSink.AsyncProcessMessage.
  //   - sync item with a next sink: runs _nextSink.SyncProcessMessage and
  //     stores the result in _replyMsg.
  //   - sync item without a next sink: does nothing.
  // Always returns null.
  static internal object ExecuteCallback(object[] args)
  {
      WorkItem item = (WorkItem)args[0];

      if (item.IsAsync()) {
          item._nextSink.AsyncProcessMessage(item._reqMsg, item._replySink);
          return null;
      }

      IMessageSink target = item._nextSink;
      if (target != null) {
          item._replyMsg = target.SyncProcessMessage(item._reqMsg);
      }
      return null;
  }
  686.        
  /*
   *  Completes a work item (sync or async).
   *  Execute makes no policy decisions: it assumes the synchronization
   *  domain is already locked for this item, and it switches to the
   *  item's captured context itself via a cross-context callback.
   *
   *  It is reached from three places:
   *      1. a thread pool thread running the callback for an async item
   *      2. the calling thread running the callback for a queued sync item
   *      3. the calling thread calling Execute directly for a non-queued sync item
   */
  internal virtual void Execute()
  {
      // The domain policy must already be enforced for this item,
      // i.e. it must have been signaled.
      BCLDebug.Assert(IsSignaled(), "IsSignaled()");

      // ExecuteCallback expects the work item as args[0].
      object[] callbackArgs = new object[] { this };
      Thread.CurrentThread.InternalCrossContextCallback(_ctx, _xctxDel, callbackArgs);
  }
  // The reply stored by a completed sync call; null until then.
  internal virtual IMessage ReplyMessage
  {
      get
      {
          return _replyMsg;
      }
  }
  708.     }
  709.    
  710.     //*************************************** CLIENT SINK ********************************//
  711.    
  /*
   *  Client context sink contributed by the Synchronization property.
   *  The sink holds a back pointer to the property, intercepts outgoing
   *  calls made by objects in the context, and co-ordinates with the
   *  property to enforce the domain policy.
   *
   *  Domain state changed around a call-out is restored in finally
   *  blocks, so a throwing downstream sink cannot leave the domain
   *  half-exited or stuck in a call-out state.
   */
  internal class SynchronizedClientContextSink : InternalSink, IMessageSink
  {
      // Next sink in the client context sink chain.
      internal IMessageSink _nextSink;
      // The property whose domain policy this sink enforces.
      internal SynchronizationAttribute _property;

      internal SynchronizedClientContextSink(SynchronizationAttribute prop, IMessageSink nextSink)
      {
          _property = prop;
          _nextSink = nextSink;
      }

      ~SynchronizedClientContextSink()
      {
          // A finalizer must never throw: tolerate a sink that was
          // constructed without a property.
          if (_property != null) {
              _property.Dispose();
          }
      }

      /*
       *  Implements IMessageSink::SyncProcessMessage for the call-out sink.
       *
       *  Re-entrant domain: the thread leaves the domain for the duration
       *  of the call-out so that waiting work can run, then blocks until
       *  it is allowed back in.
       *
       *  Non-re-entrant domain: the call is passed through, but the domain
       *  is tagged with the logical call id (LCID) of the call-out so that
       *  call-backs belonging to it are allowed to enter.
       *
       *  Returns the reply produced by the next sink. Exceptions thrown by
       *  the next sink propagate after the domain state has been restored.
       */
      public virtual IMessage SyncProcessMessage(IMessage reqMsg)
      {
          BCLDebug.Assert(_property.Locked == true, "_property.Locked == true");
          IMessage replyMsg = null;
          if (_property.IsReEntrant) {
              // Let anybody waiting for the domain enter and execute:
              // notify the property that we are leaving.
              _property.HandleThreadExit();
              try {
                  replyMsg = _nextSink.SyncProcessMessage(reqMsg);
              }
              finally {
                  // Re-enter even when the call-out threw: the exception
                  // unwinds into code that runs inside the domain. This
                  // blocks here if someone else is in the domain.
                  _property.HandleThreadReEntry();
                  BCLDebug.Assert(_property.Locked == true, "_property.Locked == true");
              }
          }
          else {
              // In the non-reentrant case we are just a pass-through sink,
              // but we mark the domain with our LCID so that call-backs
              // are allowed to enter.
              LogicalCallContext cctx = (LogicalCallContext)reqMsg.Properties[Message.CallContextKey];

              string lcid = cctx.RemotingData.LogicalCallID;
              bool bClear = false;
              if (lcid == null) {
                  // Call ids are assigned lazily here (rather than at the
                  // start of every proxy Invoke) since only Synchronization
                  // needs them. A sync call inherits an existing LCID and
                  // only gets a new one when it has none; async calls
                  // always get a new LCID.
                  lcid = Identity.GetNewLogicalCallID();
                  cctx.RemotingData.LogicalCallID = lcid;
                  bClear = true;

                  BCLDebug.Assert(_property.SyncCallOutLCID == null, "Synchronization domain is already in a callOut state");
              }

              bool bTopLevel = false;
              if (_property.SyncCallOutLCID == null) {
                  _property.SyncCallOutLCID = lcid;
                  bTopLevel = true;
              }

              BCLDebug.Assert(lcid.Equals(_property.SyncCallOutLCID), "Bad synchronization domain state!");

              try {
                  replyMsg = _nextSink.SyncProcessMessage(reqMsg);
              }
              finally {
                  // A top-level call-out is over (normally or not): clear
                  // the call id in the domain so it does not stay in a
                  // call-out state.
                  if (bTopLevel) {
                      _property.SyncCallOutLCID = null;
                  }
              }

              // The sync call-out is done, so the LCID associated with the
              // call is no longer needed (clear it only if we added it).
              // The change is made on the call context of the reply
              // message, since that is the one that gets installed back on
              // the thread that called the proxy.
              if (bTopLevel && bClear && replyMsg != null) {
                  LogicalCallContext cctxRet = (LogicalCallContext)replyMsg.Properties[Message.CallContextKey];
                  BCLDebug.Assert(cctxRet != null, "CallContext should be non-null");
                  // The assert above is debug-only; do not fail a completed
                  // call in other builds just because there is nothing to clear.
                  if (cctxRet != null) {
                      cctxRet.RemotingData.LogicalCallID = null;
                  }
              }
          }
          return replyMsg;
      }

      /*
       *  Implements IMessageSink::AsyncProcessMessage for the call-out sink.
       *
       *  The call is dispatched on the calling thread. The caller's reply
       *  sink is wrapped in an AsyncReplySink so that the completion
       *  notification has to ask the property for permission to enter the
       *  domain, just like any other call-in. This is done whether or not
       *  the domain is re-entrant.
       *
       *  Returns whatever the next sink returns. If the dispatch throws,
       *  the LCID registered for the call-out is unregistered again before
       *  the exception propagates.
       */
      public virtual IMessageCtrl AsyncProcessMessage(IMessage reqMsg, IMessageSink replySink)
      {
          BCLDebug.Assert(_property.Locked == true, "_property.Locked == true");

          // LCID registered for this call-out; stays null for re-entrant domains.
          string lcid = null;

          if (!_property.IsReEntrant) {
              // New calls are not allowed to enter the domain, so we track
              // the (potentially several) outstanding async call-outs and
              // let the completion notifications for those come in.
              LogicalCallContext cctx = (LogicalCallContext)reqMsg.Properties[Message.CallContextKey];

              // Every async call-out gets a fresh LCID. Per the original
              // notes, the proxy clones the call context at the start of
              // each async call, so we are not stomping someone else's lcid.
              lcid = Identity.GetNewLogicalCallID();
              cctx.RemotingData.LogicalCallID = lcid;

              BCLDebug.Assert(_property.SyncCallOutLCID == null, "Cannot handle async call outs when already in a top-level sync call out");
              _property.AsyncCallOutLCIDList.Add(lcid);
          }

          // Wrap the caller provided replySink so that the callback is
          // intercepted on the reply sink chain.
          // NOTE: the Synchronization Domain has to be yielded at some
          // point for our own callback to complete.
          AsyncReplySink mySink = new AsyncReplySink(replySink, _property);

          try {
              return _nextSink.AsyncProcessMessage(reqMsg, mySink);
          }
          catch {
              // The dispatch failed, so do not leave its LCID in the
              // list of outstanding call-outs.
              if (lcid != null) {
                  _property.AsyncCallOutLCIDList.Remove(lcid);
              }
              throw;
          }
      }

      /*
       *  Implements IMessageSink::GetNextSink
       */
      public IMessageSink NextSink {
          get { return _nextSink; }
      }


      /*
       *  CallBack sink used to intercept the callback of an async
       *  out-call. It ensures that arbitrary threads do not enter the
       *  Synchronization Domain without asking the property first.
       */
      internal class AsyncReplySink : IMessageSink
      {
          // The caller provided reply sink that finally receives the reply.
          internal IMessageSink _nextSink;
          internal SynchronizationAttribute _property;

          internal AsyncReplySink(IMessageSink nextSink, SynchronizationAttribute prop)
          {
              _nextSink = nextSink;
              _property = prop;
          }

          /*
           *  Delivers the reply of an async call-out. The delivery is
           *  handled as a regular new sync work item, so it may be queued
           *  or executed right away depending on the state of the domain.
           *  Returns the reply message recorded on the work item.
           */
          public virtual IMessage SyncProcessMessage(IMessage reqMsg)
          {
              // 1. Create a work item (sync, hence no reply sink).
              WorkItem work = new WorkItem(reqMsg, _nextSink, null);

              try {
                  // 2. Notify the property to handle the WorkItem.
                  _property.HandleWorkRequest(work);
              }
              finally {
                  // Remove the async lcid that was added to the call-out
                  // list, also when handling the work threw; otherwise the
                  // entry would stay in the list.
                  if (!_property.IsReEntrant) {
                      LogicalCallContext cctx = (LogicalCallContext)reqMsg.Properties[Message.CallContextKey];
                      // Null check keeps this finally block from throwing
                      // and masking the original exception.
                      if (cctx != null) {
                          _property.AsyncCallOutLCIDList.Remove(cctx.RemotingData.LogicalCallID);
                      }
                  }
              }

              // 3. Pick up the reply from the WorkItem and return it.
              return work.ReplyMessage;
          }

          // Replies are only ever delivered synchronously to this sink.
          public virtual IMessageCtrl AsyncProcessMessage(IMessage reqMsg, IMessageSink replySink)
          {
              throw new NotSupportedException();
          }

          /*
           * Implements IMessageSink::GetNextSink
           */
          public IMessageSink NextSink {
              get { return _nextSink; }
          }
      }
  }
  924.    
  925. }

Developer Fusion