The Labs \ Source Viewer \ SSCLI \ System.Runtime.Remoting.Channels \ AsyncWorkItem

  1. // ==++==
  2. //
  3. //
  4. // Copyright (c) 2006 Microsoft Corporation. All rights reserved.
  5. //
  6. // The use and distribution terms for this software are contained in the file
  7. // named license.txt, which can be found in the root of this distribution.
  8. // By using this software in any fashion, you are agreeing to be bound by the
  9. // terms of this license.
  10. //
  11. // You must not remove this notice, or any other, from this software.
  12. //
  13. //
  14. // ==--==
  15. //
  16. // Remoting Infrastructure Sink for making calls across context
  17. // boundaries.
  18. //
  19. namespace System.Runtime.Remoting.Channels
  20. {
  21.    
  22.     using System;
  23.     using System.Collections;
  24.     using System.Threading;
  25.     using System.Runtime.Remoting;
  26.     using System.Runtime.Remoting.Contexts;
  27.     using System.Runtime.Remoting.Messaging;
  28.     using System.Runtime.Serialization;
  29.    
  30. /* package scope */   
  31.     // deliberately not [serializable]
  32.     internal class CrossContextChannel : InternalSink, IMessageSink
  33.     {
  34.         private const string _channelName = "XCTX";
  35.         private const int _channelCapability = 0;
  36.         private const string _channelURI = "XCTX_URI";
  37.        
  38.         private static CrossContextChannel messageSink {
  39.             get { return Thread.GetDomain().RemotingData.ChannelServicesData.xctxmessageSink; }
  40.             set { Thread.GetDomain().RemotingData.ChannelServicesData.xctxmessageSink = value; }
  41.         }
  42.        
  43.         private static object staticSyncObject = new object();
  44.         private static InternalCrossContextDelegate s_xctxDel = new InternalCrossContextDelegate(SyncProcessMessageCallback);
  45.        
  46.         static internal IMessageSink MessageSink {
  47.             get {
  48.                 if (messageSink == null) {
  49.                     CrossContextChannel tmpSink = new CrossContextChannel();
  50.                    
  51.                     lock (staticSyncObject) {
  52.                         if (messageSink == null) {
  53.                             messageSink = tmpSink;
  54.                         }
  55.                     }
  56.                     //Interlocked.CompareExchange(out messageSink, tmpSink, null);
  57.                 }
  58.                 return messageSink;
  59.             }
  60.         }
  61.        
  62.         static internal object SyncProcessMessageCallback(object[] args)
  63.         {
  64.             IMessage reqMsg = args[0] as IMessage;
  65.             Context srvCtx = args[1] as Context;
  66.             IMessage replyMsg = null;
  67.            
  68.             // If profiling of remoting is active, must tell the profiler that we have received
  69.             // a message.
  70.             if (RemotingServices.CORProfilerTrackRemoting()) {
  71.                 Guid g = Guid.Empty;
  72.                
  73.                 if (RemotingServices.CORProfilerTrackRemotingCookie()) {
  74.                     object obj = reqMsg.Properties["CORProfilerCookie"];
  75.                    
  76.                     if (obj != null) {
  77.                         g = (Guid)obj;
  78.                     }
  79.                 }
  80.                
  81.                 RemotingServices.CORProfilerRemotingServerReceivingMessage(g, false);
  82.             }
  83.            
  84.             Message.DebugOut("::::::::::::::::::::::::: CrossContext Channel: passing to ServerContextChain");
  85.            
  86.             // Server side notifications for dynamic sinks are done
  87.             // in the x-context channel ... this is to maintain
  88.             // symmetry of the point of notification between
  89.             // the client and server context
  90.                 // bCliSide
  91.                 // bStart
  92.                 // bAsync
  93.             srvCtx.NotifyDynamicSinks(reqMsg, false, true, false, true);
  94.             // bNotifyGlobals
  95.             replyMsg = srvCtx.GetServerContextChain().SyncProcessMessage(reqMsg);
  96.                 // bCliSide
  97.                 // bStart
  98.                 // bAsync
  99.             srvCtx.NotifyDynamicSinks(replyMsg, false, false, false, true);
  100.             // bNotifyGlobals
  101.             Message.DebugOut("::::::::::::::::::::::::: CrossContext Channel: back from ServerContextChain");
  102.            
  103.             // If profiling of remoting is active, must tell the profiler that we are sending a
  104.             // reply message.
  105.             if (RemotingServices.CORProfilerTrackRemoting()) {
  106.                 Guid g;
  107.                
  108.                 RemotingServices.CORProfilerRemotingServerSendingReply(out g, false);
  109.                
  110.                 if (RemotingServices.CORProfilerTrackRemotingCookie()) {
  111.                     replyMsg.Properties["CORProfilerCookie"] = g;
  112.                 }
  113.             }
  114.             return replyMsg;
  115.         }
  116.        
  117.         public virtual IMessage SyncProcessMessage(IMessage reqMsg)
  118.         {
  119.             object[] args = new object[] {null, null};
  120.             IMessage replyMsg = null;
  121.            
  122.             try {
  123.                 Message.DebugOut("\n::::::::::::::::::::::::: CrossContext Channel: Sync call starting");
  124.                 IMessage errMsg = ValidateMessage(reqMsg);
  125.                 if (errMsg != null) {
  126.                     return errMsg;
  127.                 }
  128.                
  129.                 ServerIdentity srvID = GetServerIdentity(reqMsg);
  130.                 Message.DebugOut("Got Server identity \n");
  131.                 BCLDebug.Assert(null != srvID, "null != srvID");
  132.                
  133.                
  134.                 BCLDebug.Assert(null != srvID.ServerContext, "null != srvID.ServerContext");
  135.                
  136.                 args[0] = reqMsg;
  137.                 args[1] = srvID.ServerContext;
  138.                 replyMsg = (IMessage)Thread.CurrentThread.InternalCrossContextCallback(srvID.ServerContext, s_xctxDel, args);
  139.             }
  140.             catch (Exception e) {
  141.                 Message.DebugOut("Arrgh.. XCTXSink::throwing exception " + e + "\n");
  142.                 replyMsg = new ReturnMessage(e, (IMethodCallMessage)reqMsg);
  143.                 if (reqMsg != null) {
  144.                     ((ReturnMessage)replyMsg).SetLogicalCallContext((LogicalCallContext)reqMsg.Properties[Message.CallContextKey]);
  145.                 }
  146.             }
  147.            
  148.             Message.DebugOut("::::::::::::::::::::::::::: CrossContext Channel: Sync call returning!!\n");
  149.             return replyMsg;
  150.         }
  151.        
  152.         static internal object AsyncProcessMessageCallback(object[] args)
  153.         {
  154.             AsyncWorkItem workItem = null;
  155.            
  156.             IMessage reqMsg = (IMessage)args[0];
  157.             IMessageSink replySink = (IMessageSink)args[1];
  158.             Context oldCtx = (Context)args[2];
  159.             Context srvCtx = (Context)args[3];
  160.             IMessageCtrl msgCtrl = null;
  161.            
  162.             // we use the work item just as our replySink in this case
  163.             if (replySink != null) {
  164.                 workItem = new AsyncWorkItem(replySink, oldCtx);
  165.             }
  166.            
  167.             Message.DebugOut("::::::::::::::::::::::::: CrossContext Channel: passing to ServerContextChain");
  168.            
  169.                 // bCliSide
  170.                 // bStart
  171.                 // bAsync
  172.             srvCtx.NotifyDynamicSinks(reqMsg, false, true, true, true);
  173.             // bNotifyGlobals
  174.             // call the server context chain
  175.             msgCtrl = srvCtx.GetServerContextChain().AsyncProcessMessage(reqMsg, (IMessageSink)workItem);
  176.            
  177.             // Note: for async calls, we will do the return notification
  178.             // for dynamic properties only when the async call
  179.             // completes (i.e. when the replySink gets called)
  180.            
  181.             Message.DebugOut("::::::::::::::::::::::::: CrossContext Channel: back from ServerContextChain");
  182.            
  183.             return msgCtrl;
  184.         }
  185.        
  186.         public virtual IMessageCtrl AsyncProcessMessage(IMessage reqMsg, IMessageSink replySink)
  187.         {
  188.             Message.DebugOut("::::::::::::::::::::::::::: CrossContext Channel: Async call starting!!\n");
  189.             // One way Async notifications may potentially pass a null reply sink.
  190.             IMessage errMsg = ValidateMessage(reqMsg);
  191.            
  192.             object[] args = new object[] {null, null, null, null};
  193.            
  194.             IMessageCtrl msgCtrl = null;
  195.             if (errMsg != null) {
  196.                 if (replySink != null) {
  197.                     replySink.SyncProcessMessage(errMsg);
  198.                 }
  199.             }
  200.             else {
  201.                 ServerIdentity srvID = GetServerIdentity(reqMsg);
  202.                
  203.                 // If active, notify the profiler that an asynchronous remoting message was received.
  204.                 if (RemotingServices.CORProfilerTrackRemotingAsync()) {
  205.                     Guid g = Guid.Empty;
  206.                    
  207.                     if (RemotingServices.CORProfilerTrackRemotingCookie()) {
  208.                         object obj = reqMsg.Properties["CORProfilerCookie"];
  209.                        
  210.                         if (obj != null) {
  211.                             g = (Guid)obj;
  212.                         }
  213.                     }
  214.                    
  215.                     RemotingServices.CORProfilerRemotingServerReceivingMessage(g, true);
  216.                    
  217.                     // Only wrap the replySink if the call wants a reply
  218.                     if (replySink != null) {
  219.                         // Now wrap the reply sink in our own so that we can notify the profiler of
  220.                         // when the reply is sent. Upon invocation, it will notify the profiler
  221.                         // then pass control on to the replySink passed in above.
  222.                         IMessageSink profSink = new ServerAsyncReplyTerminatorSink(replySink);
  223.                        
  224.                         // Replace the reply sink with our own
  225.                         replySink = profSink;
  226.                     }
  227.                 }
  228.                
  229.                 Context srvCtx = srvID.ServerContext;
  230.                 if (srvCtx.IsThreadPoolAware) {
  231.                     // this is the case when we do not queue the work item since the
  232.                     // server context claims to be doing its own threading.
  233.                    
  234.                     args[0] = reqMsg;
  235.                     args[1] = replySink;
  236.                     args[2] = Thread.CurrentContext;
  237.                     args[3] = srvCtx;
  238.                    
  239.                     InternalCrossContextDelegate xctxDel = new InternalCrossContextDelegate(AsyncProcessMessageCallback);
  240.                    
  241.                     msgCtrl = (IMessageCtrl)Thread.CurrentThread.InternalCrossContextCallback(srvCtx, xctxDel, args);
  242.                 }
  243.                 else {
  244.                     AsyncWorkItem workItem = null;
  245.                    
  246.                     // This is the case where we take care of returning the calling
  247.                     // thread asap by using the ThreadPool for completing the call.
  248.                    
  249.                     // we use a more elaborate WorkItem and delegate the work to the thread pool
  250.                     workItem = new AsyncWorkItem(reqMsg, replySink, Thread.CurrentContext, srvID);
  251.                    
  252.                     WaitCallback threadFunc = new WaitCallback(workItem.FinishAsyncWork);
  253.                     // Note: Dynamic sinks are notified in the threadFunc
  254.                     ThreadPool.QueueUserWorkItem(threadFunc);
  255.                 }
  256.             }
  257.            
  258.             Message.DebugOut("::::::::::::::::::::::::::: CrossContext Channel: Async call returning!!\n");
  259.             return msgCtrl;
  260.         }
  261.         // AsyncProcessMessage
  262.         static internal object DoAsyncDispatchCallback(object[] args)
  263.         {
  264.             AsyncWorkItem workItem = null;
  265.            
  266.             IMessage reqMsg = (IMessage)args[0];
  267.             IMessageSink replySink = (IMessageSink)args[1];
  268.             Context oldCtx = (Context)args[2];
  269.             Context srvCtx = (Context)args[3];
  270.             IMessageCtrl msgCtrl = null;
  271.            
  272.            
  273.             // we use the work item just as our replySink in this case
  274.             if (replySink != null) {
  275.                 workItem = new AsyncWorkItem(replySink, oldCtx);
  276.             }
  277.             Message.DebugOut("::::::::::::::::::::::::: CrossContext Channel: passing to ServerContextChain");
  278.             // call the server context chain
  279.             msgCtrl = srvCtx.GetServerContextChain().AsyncProcessMessage(reqMsg, (IMessageSink)workItem);
  280.             Message.DebugOut("::::::::::::::::::::::::: CrossContext Channel: back from ServerContextChain");
  281.            
  282.             return msgCtrl;
  283.         }
  284.        
  285.        
  286.         static internal IMessageCtrl DoAsyncDispatch(IMessage reqMsg, IMessageSink replySink)
  287.         {
  288.             object[] args = new object[] {null, null, null, null};
  289.            
  290.             ServerIdentity srvID = GetServerIdentity(reqMsg);
  291.            
  292.             // If active, notify the profiler that an asynchronous remoting message was received.
  293.             if (RemotingServices.CORProfilerTrackRemotingAsync()) {
  294.                 Guid g = Guid.Empty;
  295.                
  296.                 if (RemotingServices.CORProfilerTrackRemotingCookie()) {
  297.                     object obj = reqMsg.Properties["CORProfilerCookie"];
  298.                     if (obj != null)
  299.                         g = (Guid)obj;
  300.                 }
  301.                
  302.                 RemotingServices.CORProfilerRemotingServerReceivingMessage(g, true);
  303.                
  304.                 // Only wrap the replySink if the call wants a reply
  305.                 if (replySink != null) {
  306.                     // Now wrap the reply sink in our own so that we can notify the profiler of
  307.                     // when the reply is sent. Upon invocation, it will notify the profiler
  308.                     // then pass control on to the replySink passed in above.
  309.                     IMessageSink profSink = new ServerAsyncReplyTerminatorSink(replySink);
  310.                    
  311.                     // Replace the reply sink with our own
  312.                     replySink = profSink;
  313.                 }
  314.             }
  315.            
  316.             IMessageCtrl msgCtrl = null;
  317.             Context srvCtx = srvID.ServerContext;
  318.            
  319.             //if (srvCtx.IsThreadPoolAware)
  320.             //{
  321.             // this is the case when we do not queue the work item since the
  322.             // server context claims to be doing its own threading.
  323.            
  324.             args[0] = reqMsg;
  325.             args[1] = replySink;
  326.             args[2] = Thread.CurrentContext;
  327.             args[3] = srvCtx;
  328.            
  329.             InternalCrossContextDelegate xctxDel = new InternalCrossContextDelegate(DoAsyncDispatchCallback);
  330.            
  331.             msgCtrl = (IMessageCtrl)Thread.CurrentThread.InternalCrossContextCallback(srvCtx, xctxDel, args);
  332.            
  333.             //}
  334.            
  335.             return msgCtrl;
  336.         }
  337.         // DoDispatch
  338.         public IMessageSink NextSink {
  339. // We are a terminating sink for this chain.
  340.             get { return null; }
  341.         }
  342.     }
  343.    
  344. /* package */   
  345.     internal class AsyncWorkItem : IMessageSink
  346.     {
  347.         // the replySink passed in to us in AsyncProcessMsg
  348.         private IMessageSink _replySink;
  349.        
  350.         // the server identity we are calling
  351.         private ServerIdentity _srvID;
  352.        
  353.         // the original context of the thread calling AsyncProcessMsg
  354.         private Context _oldCtx;
  355.        
  356.         private LogicalCallContext _callCtx;
  357.        
  358.         // the request msg passed in
  359.         private IMessage _reqMsg;
  360.        
  361.        
  362.         internal AsyncWorkItem(IMessageSink replySink, Context oldCtx) : this(null, replySink, oldCtx, null)
  363.         {
  364.         }
  365.        
  366.         internal AsyncWorkItem(IMessage reqMsg, IMessageSink replySink, Context oldCtx, ServerIdentity srvID)
  367.         {
  368.             _reqMsg = reqMsg;
  369.             _replySink = replySink;
  370.             _oldCtx = oldCtx;
  371.             _callCtx = CallContext.GetLogicalCallContext();
  372.             _srvID = srvID;
  373.         }
  374.        
  375.         static internal object SyncProcessMessageCallback(object[] args)
  376.         {
  377.             IMessageSink replySink = (IMessageSink)args[0];
  378.             IMessage msg = (IMessage)args[1];
  379.            
  380.             return replySink.SyncProcessMessage(msg);
  381.         }
  382.        
  383.         public virtual IMessage SyncProcessMessage(IMessage msg)
  384.         {
  385.             // This gets called when the called object finishes the AsyncWork...
  386.            
  387.             // This is called irrespective of whether we delegated the initial
  388.             // work to a thread pool thread or not. Quite likely it will be
  389.             // called on a user thread (i.e. a thread different from the
  390.             // forward call thread)
  391.            
  392.             // we just switch back to the old context before calling
  393.             // the next replySink
  394.            
  395.             IMessage retMsg = null;
  396.            
  397.             if (_replySink != null) {
  398.                 // This assert covers the common case (ThreadPool)
  399.                 // and checks that the reply thread for the async call
  400.                 // indeed emerges from the server context.
  401.                 BCLDebug.Assert((_srvID == null) || (_srvID.ServerContext == Thread.CurrentContext), "Thread expected to be in the server context!");
  402.                
  403.                 // Call the dynamic sinks to notify that the async call
  404.                 // has completed
  405.                     // this is the async reply
  406.                     // bCliSide
  407.                     // bStart
  408.                     // bAsync
  409.                 Thread.CurrentContext.NotifyDynamicSinks(msg, false, false, true, true);
  410.                 // bNotifyGlobals
  411.                 object[] args = new object[] {_replySink, msg};
  412.                
  413.                 InternalCrossContextDelegate xctxDel = new InternalCrossContextDelegate(SyncProcessMessageCallback);
  414.                
  415.                 retMsg = (IMessage)Thread.CurrentThread.InternalCrossContextCallback(_oldCtx, xctxDel, args);
  416.             }
  417.             return retMsg;
  418.         }
  419.        
  420.         public virtual IMessageCtrl AsyncProcessMessage(IMessage msg, IMessageSink replySink)
  421.         {
  422.             // Can't call the reply sink asynchronously!
  423.             throw new NotSupportedException(Environment.GetResourceString("NotSupported_Method"));
  424.         }
  425.        
  426.         public IMessageSink NextSink {
  427.             get { return _replySink; }
  428.         }
  429.        
  430.         static internal object FinishAsyncWorkCallback(object[] args)
  431.         {
  432.             AsyncWorkItem This = (AsyncWorkItem)args[0];
  433.             Context srvCtx = This._srvID.ServerContext;
  434.            
  435.             LogicalCallContext threadPoolCallCtx = CallContext.SetLogicalCallContext(This._callCtx);
  436.            
  437.             // Call the server context chain Async. We provide workItem as our
  438.             // replySink ... this will cause the replySink.ProcessMessage
  439.             // to switch back to the context of the original caller thread.
  440.            
  441.             // Call the dynamic sinks to notify that the async call
  442.             // is starting
  443.                 // bCliSide
  444.                 // bStart
  445.                 // bAsync
  446.             srvCtx.NotifyDynamicSinks(This._reqMsg, false, true, true, true);
  447.             // bNotifyGlobals
  448.             IMessageCtrl ctrl = srvCtx.GetServerContextChain().AsyncProcessMessage(This._reqMsg, (IMessageSink)This);
  449.            
  450.             // change back to the old context
  451.             CallContext.SetLogicalCallContext(threadPoolCallCtx);
  452.            
  453.             return null;
  454.         }
  455.        
  456. /* package */       
  457.         internal virtual void FinishAsyncWork(object stateIgnored)
  458.         {
  459.             InternalCrossContextDelegate xctxDel = new InternalCrossContextDelegate(FinishAsyncWorkCallback);
  460.            
  461.             object[] args = new object[] {this};
  462.            
  463.             Thread.CurrentThread.InternalCrossContextCallback(_srvID.ServerContext, xctxDel, args);
  464.         }
  465.     }
  466. }

Developer Fusion