The Labs \ Source Viewer \ SSCLI \ System.Runtime.Remoting.Channels \ RequestQueue

  1. // ==++==
  2. //
  3. //
  4. // Copyright (c) 2006 Microsoft Corporation. All rights reserved.
  5. //
  6. // The use and distribution terms for this software are contained in the file
  7. // named license.txt, which can be found in the root of this distribution.
  8. // By using this software in any fashion, you are agreeing to be bound by the
  9. // terms of this license.
  10. //
  11. // You must not remove this notice, or any other, from this software.
  12. //
  13. //
  14. // ==--==
  15. //
  16. // Request Queue
  17. // queues up the requests to avoid thread pool starvation,
  18. // making sure that there are always available threads to process requests
  19. namespace System.Runtime.Remoting.Channels
  20. {
  21.     using System.Threading;
  22.     using System.Collections;
  23.    
  // Request queue used to avoid thread pool starvation.
  //
  // Incoming requests are either handed straight back to the caller for
  // execution (when the thread pool reports enough free threads and nothing is
  // waiting) or parked in one of two FIFO queues -- one for local requests, one
  // for external ones. Parked requests are picked up again either by a later
  // caller of GetRequestToExecute or by a thread pool work item scheduled via
  // ScheduleMoreWorkIfNeeded.
  //
  // Threading: both queues and _count are only modified while holding _gate.
  // _count is additionally read without the lock on fast paths, which is why it
  // is declared volatile. _workItemCount is only modified through Interlocked.
  internal class RequestQueue
  {
      // configuration params (fixed at construction time)
      private readonly int _minExternFreeThreads;
      private readonly int _minLocalFreeThreads;
      private readonly int _queueLimit;

      // private lock object guarding the two queues and _count; a dedicated
      // object is used instead of 'this' so that no code outside this class can
      // take the same lock
      private readonly object _gate = new object();

      // two queues -- one for local requests, one for external
      private readonly Queue _localQueue = new Queue();
      private readonly Queue _externQueue = new Queue();

      // total count of queued requests (local + external); written under _gate,
      // read lock-free on the fast paths, hence volatile
      private volatile int _count;

      // work items queued to pick up new work
      private readonly WaitCallback _workItemCallback;
      private int _workItemCount;
      private const int _workItemLimit = 2;

      // how long a work item waits before rescheduling itself when it could
      // not service a request
      private const int _retryDelayMilliseconds = 250;


      // helpers

      // Returns whatever the handler itself reports from IsLocal().
      private static bool IsLocal(SocketHandler sh)
      {
          return sh.IsLocal();
      }

      // Appends the request to the local or external queue unless the total
      // number of queued requests has already reached _queueLimit.
      // The limit check and the enqueue happen under the same lock so that
      // concurrent callers cannot push the queue past the limit.
      // Returns true if the request was queued, false if the queue was full.
      private bool TryQueueRequest(SocketHandler sh, bool isLocal)
      {
          lock (_gate) {
              if (_count >= _queueLimit) {
                  return false;
              }

              if (isLocal) {
                  _localQueue.Enqueue(sh);
              }
              else {
                  _externQueue.Enqueue(sh);
              }

              _count++;
              return true;
          }
      }

      // Removes and returns the next queued request, or null if none is
      // eligible. Local requests are always served first; external requests
      // are considered only when localOnly is false.
      private SocketHandler DequeueRequest(bool localOnly)
      {
          // cheap lock-free check so that an empty queue never takes the lock
          if (_count == 0) {
              return null;
          }

          object sh = null;

          lock (_gate) {
              if (_localQueue.Count > 0) {
                  sh = _localQueue.Dequeue();
                  _count--;
              }
              else if (!localOnly && _externQueue.Count > 0) {
                  sh = _externQueue.Dequeue();
                  _count--;
              }
          }

          return (SocketHandler)sh;
      }

      // ctor
      //   minExternFreeThreads - free threads required before an external
      //                          request is executed
      //   minLocalFreeThreads  - free threads required before a local request
      //                          is executed
      //   queueLimit           - number of queued requests at which new
      //                          requests are rejected
      internal RequestQueue(int minExternFreeThreads, int minLocalFreeThreads, int queueLimit)
      {
          _minExternFreeThreads = minExternFreeThreads;
          _minLocalFreeThreads = minLocalFreeThreads;
          _queueLimit = queueLimit;

          _workItemCallback = new WaitCallback(this.WorkItemCallback);
      }


      // method called to process the next request: asks GetRequestToExecute
      // which request (if any) may run on this thread and runs it
      internal void ProcessNextRequest(SocketHandler sh)
      {
          sh = GetRequestToExecute(sh);

          if (sh != null) {
              sh.ProcessRequestNow();
          }
      }
      // ProcessNextRequest

      // method called when data arrives for incoming requests
      //
      // Returns the request the caller should execute on the current thread:
      // either sh itself (fast path), a previously queued request, or null
      // when nothing should be executed on this thread (sh was queued or
      // rejected).
      internal SocketHandler GetRequestToExecute(SocketHandler sh)
      {
          int workerThreads;
          int ioThreads;
          ThreadPool.GetAvailableThreads(out workerThreads, out ioThreads);

          // the scarcer of the two pools decides how much headroom there is
          int freeThreads = (ioThreads > workerThreads) ? workerThreads : ioThreads;

          // fast path when there are threads available and nothing queued
          if (freeThreads >= _minExternFreeThreads && _count == 0) {
              return sh;
          }

          bool isLocal = IsLocal(sh);

          // fast path when there are threads for local requests available and nothing queued
          if (isLocal && freeThreads >= _minLocalFreeThreads && _count == 0) {
              return sh;
          }

          // can't execute the current request on the current thread -- need to
          // queue; reject if queue limit exceeded (the handler is notified
          // outside the lock)
          if (!TryQueueRequest(sh, isLocal)) {
              sh.RejectRequestNowSinceServerIsBusy();
              return null;
          }

          // maybe can execute a request previously queued
          SocketHandler next = null;

          if (freeThreads >= _minExternFreeThreads) {
              // enough threads to process even external requests
              next = DequeueRequest(false);
          }
          else if (freeThreads >= _minLocalFreeThreads) {
              // enough threads to process only local requests
              next = DequeueRequest(true);
          }

          if (next == null) {
              // not enough threads -> do nothing on this thread,
              // try to schedule to worker thread
              ScheduleMoreWorkIfNeeded();
          }

          return next;
      }

      // method called from SocketHandler at the end of request
      //
      // Queues a thread pool work item that will try to drain the queue,
      // unless the queue is empty or _workItemLimit work items are already
      // outstanding.
      internal void ScheduleMoreWorkIfNeeded()
      {
          // is queue empty?
          if (_count == 0) {
              return;
          }

          // atomically reserve a work item slot; a plain "check then
          // increment" lets concurrent callers exceed the limit
          int observed;
          do {
              observed = _workItemCount;

              // already scheduled enough work items
              if (observed >= _workItemLimit) {
                  return;
              }
          } while (Interlocked.CompareExchange(ref _workItemCount, observed + 1, observed) != observed);

          // queue the work item; give the slot back if queuing fails or throws,
          // otherwise the slot would be lost permanently
          bool queued = false;
          try {
              queued = ThreadPool.UnsafeQueueUserWorkItem(_workItemCallback, null);
          }
          finally {
              if (!queued) {
                  Interlocked.Decrement(ref _workItemCount);
              }
          }
      }

      // method called to pick up more work (runs as a thread pool work item;
      // the state argument is not used)
      private void WorkItemCallback(object state)
      {
          // this work item is now running, so its slot is free again
          Interlocked.Decrement(ref _workItemCount);

          // is queue empty?
          if (_count == 0) {
              return;
          }

          int workerThreads;
          int ioThreads;
          ThreadPool.GetAvailableThreads(out workerThreads, out ioThreads);

          bool handledRequest = false;

          // service another request if enough worker threads are available
          if (workerThreads >= _minLocalFreeThreads) {
              // pick up request from the queue; restrict to local requests
              // when there is not enough headroom for external ones
              SocketHandler sh = DequeueRequest(workerThreads < _minExternFreeThreads);
              if (sh != null) {
                  sh.ProcessRequestNow();
                  handledRequest = true;
              }
          }

          if (!handledRequest) {
              // nothing could be serviced: back off before trying again.
              // NOTE: this blocks the current thread pool thread for the
              // duration of the delay.
              Thread.Sleep(_retryDelayMilliseconds);
              ScheduleMoreWorkIfNeeded();
          }
      }

  }
  199. }

Developer Fusion