The Labs \ Source Viewer \ SSCLI \ System.Net \ AsyncConnectionPoolRequest

  1. //------------------------------------------------------------------------------
  2. // <copyright file="ConnectionPool.cs" company="Microsoft">
  3. //
  4. // Copyright (c) 2006 Microsoft Corporation. All rights reserved.
  5. //
  6. // The use and distribution terms for this software are contained in the file
  7. // named license.txt, which can be found in the root of this distribution.
  8. // By using this software in any fashion, you are agreeing to be bound by the
  9. // terms of this license.
  10. //
  11. // You must not remove this notice, or any other, from this software.
  12. //
  13. // </copyright>
  14. //------------------------------------------------------------------------------
  15. namespace System.Net
  16. {
  17.    
  18.     using System;
  19.     using System.Net.Sockets;
  20.     using System.Collections;
  21.     using System.Diagnostics;
  22.     using System.Globalization;
  23.     using System.Runtime.InteropServices;
  24.     using System.Security;
  25.     using System.Security.Permissions;
  26.     using System.Threading;
  27.    
  28.     internal delegate void GeneralAsyncDelegate(object request, object state);
  29.     internal delegate PooledStream CreateConnectionDelegate(ConnectionPool pool);
  30.    
  31.     /// <devdoc>
  32.     /// <para>
  33.     /// Impliments basic ConnectionPooling by pooling PooledStreams
  34.     /// </para>
  35.     /// </devdoc>
  36.     internal class ConnectionPool
  37.     {
  38.         private enum State
  39.         {
  40.             Initializing,
  41.             Running,
  42.             ShuttingDown
  43.         }
  44.        
  45.         private static TimerThread.Callback s_CleanupCallback = new TimerThread.Callback(CleanupCallbackWrapper);
  46.         private static TimerThread.Callback s_CancelErrorCallback = new TimerThread.Callback(CancelErrorCallbackWrapper);
  47.         private static TimerThread.Queue s_CancelErrorQueue = TimerThread.GetOrCreateQueue(ErrorWait);
  48.        
  49.         private const int MaxQueueSize = (int)1048576;
  50.        
  51.         // The order of these is important; we want the WaitAny call to be signaled
  52.         // for a free object before a creation signal. Only the index first signaled
  53.         // object is returned from the WaitAny call.
  54.         private const int SemaphoreHandleIndex = (int)0;
  55.         private const int ErrorHandleIndex = (int)1;
  56.         private const int CreationHandleIndex = (int)2;
  57.        
  58.         private const int WaitTimeout = (int)258;
  59.         private const int WaitAbandoned = (int)128;
  60.        
  61.         private const int ErrorWait = 5 * 1000;
  62.         // 5 seconds
  63.         private readonly TimerThread.Queue m_CleanupQueue;
  64.        
  65.         private State m_State;
  66.         private InterlockedStack m_StackOld;
  67.         private InterlockedStack m_StackNew;
  68.        
  69.         private int m_WaitCount;
  70.         private WaitHandle[] m_WaitHandles;
  71.        
  72.         private Exception m_ResError;
  73.         private volatile bool m_ErrorOccured;
  74.        
  75.         private TimerThread.Timer m_ErrorTimer;
  76.        
  77.         private ArrayList m_ObjectList;
  78.         private int m_TotalObjects;
  79.        
  80.         private Queue m_QueuedRequests;
  81.         private Thread m_AsyncThread;
  82.        
  83.         private int m_MaxPoolSize;
  84.         private int m_MinPoolSize;
  85.         private ServicePoint m_ServicePoint;
  86.         private CreateConnectionDelegate m_CreateConnectionCallback;
  87.        
  88.         private Mutex CreationMutex {
  89.             get { return (Mutex)m_WaitHandles[CreationHandleIndex]; }
  90.         }
  91.        
  92.         private ManualResetEvent ErrorEvent {
  93.             get { return (ManualResetEvent)m_WaitHandles[ErrorHandleIndex]; }
  94.         }
  95.        
  96.         private Semaphore Semaphore {
  97.             get { return (Semaphore)m_WaitHandles[SemaphoreHandleIndex]; }
  98.         }
  99.        
  100.         /// <summary>
  101.         /// <para>Constructor - binds pool with a servicePoint and sets up a cleanup Timer to nuke Idle Connections</para>
  102.         /// </summary>
  103.         internal ConnectionPool(ServicePoint servicePoint, int maxPoolSize, int minPoolSize, int idleTimeout, CreateConnectionDelegate createConnectionCallback) : base()
  104.         {
  105.             m_State = State.Initializing;
  106.            
  107.             m_CreateConnectionCallback = createConnectionCallback;
  108.             m_MaxPoolSize = maxPoolSize;
  109.             m_MinPoolSize = minPoolSize;
  110.             m_ServicePoint = servicePoint;
  111.            
  112.             Initialize();
  113.            
  114.             if (idleTimeout > 0) {
  115.                 m_CleanupQueue = TimerThread.GetOrCreateQueue(idleTimeout / 2);
  116.                 m_CleanupQueue.CreateTimer(s_CleanupCallback, this);
  117.             }
  118.         }
  119.        
  120.         /// <summary>
  121.         /// <para>Internal init stuff, creates stacks, queue, wait handles etc</para>
  122.         /// </summary>
  123.         private void Initialize()
  124.         {
  125.             m_StackOld = new InterlockedStack();
  126.             m_StackNew = new InterlockedStack();
  127.            
  128.             m_QueuedRequests = new Queue();
  129.            
  130.             m_WaitHandles = new WaitHandle[3];
  131.             m_WaitHandles[SemaphoreHandleIndex] = new Semaphore(0, MaxQueueSize);
  132.             m_WaitHandles[ErrorHandleIndex] = new ManualResetEvent(false);
  133.             m_WaitHandles[CreationHandleIndex] = new Mutex();
  134.            
  135.             m_ErrorTimer = null;
  136.             // No error yet.
  137.             m_ObjectList = new ArrayList();
  138.             m_State = State.Running;
  139.         }
  140.        
  141.        
  142.         /// <summary>
  143.         /// <para>Async state object, used for storing state on async calls</para>
  144.         /// </summary>
  145.         private class AsyncConnectionPoolRequest
  146.         {
  147.             public AsyncConnectionPoolRequest(ConnectionPool pool, object owningObject, GeneralAsyncDelegate asyncCallback, int creationTimeout)
  148.             {
  149.                 Pool = pool;
  150.                 OwningObject = owningObject;
  151.                 AsyncCallback = asyncCallback;
  152.                 CreationTimeout = creationTimeout;
  153.             }
  154.             public object OwningObject;
  155.             public GeneralAsyncDelegate AsyncCallback;
  156.             public bool Completed;
  157.             public ConnectionPool Pool;
  158.             public int CreationTimeout;
  159.         }
  160.        
  161.         /// <summary>
  162.         /// <para>Queues a AsyncConnectionPoolRequest to our queue of requests needing
  163.         /// a pooled stream. If an AsyncThread is not created, we create one,
  164.         /// and let it process the queued items</para>
  165.         /// </summary>
  166.         private void QueueRequest(AsyncConnectionPoolRequest asyncRequest)
  167.         {
  168.             lock (m_QueuedRequests) {
  169.                 m_QueuedRequests.Enqueue(asyncRequest);
  170.                 if (m_AsyncThread == null) {
  171.                     m_AsyncThread = new Thread(new ThreadStart(AsyncThread));
  172.                     m_AsyncThread.IsBackground = true;
  173.                     m_AsyncThread.Start();
  174.                 }
  175.             }
  176.         }
  177.        
  178.         /// <summary>
  179.         /// <para>Processes async queued requests that are blocked on needing a free pooled stream
  180.         /// works as follows:
  181.         /// 1. while there are blocked requests, take one out of the queue
  182.         /// 2. Wait for a free connection, when one becomes avail, then notify the request that its there
  183.         /// 3. repeat 1 until there are no more queued requests
  184.         /// 4. if there are no more requests waiting to for a free stream, then close down this thread
  185.         ///</para>
  186.         /// </summary>
  187.         private void AsyncThread()
  188.         {
  189.             do {
  190.                 while (m_QueuedRequests.Count > 0) {
  191.                     bool continueLoop = true;
  192.                     AsyncConnectionPoolRequest asyncState = null;
  193.                     lock (m_QueuedRequests) {
  194.                         asyncState = (AsyncConnectionPoolRequest)m_QueuedRequests.Dequeue();
  195.                     }
  196.                    
  197.                     WaitHandle[] localWaitHandles = m_WaitHandles;
  198.                     PooledStream PooledStream = null;
  199.                     try {
  200.                         while ((PooledStream == null) && continueLoop) {
  201.                             int result = WaitHandle.WaitAny(localWaitHandles, asyncState.CreationTimeout, false);
  202.                             PooledStream = Get(asyncState.OwningObject, result, ref continueLoop, ref localWaitHandles);
  203.                         }
  204.                        
  205.                         PooledStream.Activate(asyncState.OwningObject, asyncState.AsyncCallback);
  206.                     }
  207.                     catch (Exception e) {
  208.                         if (PooledStream != null) {
  209.                             PooledStream.Close();
  210.                             PutConnection(PooledStream, asyncState.OwningObject, asyncState.CreationTimeout);
  211.                         }
  212.                         asyncState.AsyncCallback(asyncState.OwningObject, e);
  213.                     }
  214.                     catch {
  215.                         if (PooledStream != null) {
  216.                             PooledStream.Close();
  217.                             PutConnection(PooledStream, asyncState.OwningObject, asyncState.CreationTimeout);
  218.                         }
  219.                         asyncState.AsyncCallback(asyncState.OwningObject, new Exception(SR.GetString(SR.net_nonClsCompliantException)));
  220.                     }
  221.                 }
  222.                 Thread.Sleep(500);
  223.                 lock (m_QueuedRequests) {
  224.                     if (m_QueuedRequests.Count == 0) {
  225.                         m_AsyncThread = null;
  226.                         break;
  227.                     }
  228.                 }
  229.             }
  230.             while (true);
  231.         }
  232.        
  233.         /// <summary>
  234.         /// <para>Count of total pooled streams associated with this pool, including streams that are being used</para>
  235.         /// </summary>
  236.         internal int Count {
  237.             get { return (m_TotalObjects); }
  238.         }
  239.        
  240.         /// <summary>
  241.         /// <para>Our ServicePoint, used for IP resolution</para>
  242.         /// </summary>
  243.         internal ServicePoint ServicePoint {
  244.             get { return m_ServicePoint; }
  245.         }
  246.        
  247.         /// <summary>
  248.         /// <para>Our Max Size of outstanding pooled streams</para>
  249.         /// </summary>
  250.         internal int MaxPoolSize {
  251.             get { return m_MaxPoolSize; }
  252.         }
  253.        
  254.         /// <summary>
  255.         /// <para>Our Min Size of the pool to remove idled items down to</para>
  256.         /// </summary>
  257.         internal int MinPoolSize {
  258.             get { return m_MinPoolSize; }
  259.         }
  260.        
  261.         /// <summary>
  262.         /// <para>An Error occurred usually due to an abort</para>
  263.         /// </summary>
  264.         private bool ErrorOccurred {
  265.             get { return m_ErrorOccured; }
  266.         }
  267.        
  268.         private static void CleanupCallbackWrapper(TimerThread.Timer timer, int timeNoticed, object context)
  269.         {
  270.             ConnectionPool pThis = (ConnectionPool)context;
  271.            
  272.             try {
  273.                 pThis.CleanupCallback();
  274.             }
  275.             finally {
  276.                 pThis.m_CleanupQueue.CreateTimer(s_CleanupCallback, context);
  277.             }
  278.         }
  279.        
  280.         /// <summary>
  281.         /// <para>This is called by a timer, to check for needed cleanup of idle pooled streams</para>
  282.         /// </summary>
  283.         private void CleanupCallback()
  284.         {
  285.             // Called when the cleanup-timer ticks over.
  286.             //
  287.             // This is the automatic prunning method. Every period, we will perform a two-step
  288.             // process. First, for the objects above MinPool, we will obtain the semaphore for
  289.             // the object and then destroy it if it was on the old stack. We will continue this
  290.             // until we either reach MinPool size, or we are unable to obtain a free object, or
  291.             // until we have exhausted all the objects on the old stack. After that, push all
  292.             // objects on the new stack to the old stack. So, every period the objects on the
  293.             // old stack are destroyed and the objects on the new stack are pushed to the old
  294.             // stack. All objects that are currently out and in use are not on either stack.
  295.             // With this logic, a object is prunned if unused for at least one period but not
  296.             // more than two periods.
  297.            
  298.             // Destroy free objects above MinPool size from old stack.
  299.             while (Count > MinPoolSize) {
  300.                 // While above MinPoolSize...
  301.                 if (Semaphore.WaitOne(0, false)) {
  302.                     // != WaitTimeout
  303.                     // We obtained a objects from the semaphore.
  304.                     PooledStream pooledStream = (PooledStream)m_StackOld.Pop();
  305.                    
  306.                     if (null != pooledStream) {
  307.                         // If we obtained one from the old stack, destroy it.
  308.                         Destroy(pooledStream);
  309.                     }
  310.                     else {
  311.                         // Else we exhausted the old stack, so break.
  312.                         Semaphore.ReleaseSemaphore();
  313.                         break;
  314.                     }
  315.                 }
  316.                 else
  317.                     break;
  318.             }
  319.            
  320.             // Push to the old-stack. For each free object, move object from new stack
  321.             // to old stack.
  322.             if (Semaphore.WaitOne(0, false)) {
  323.                 // != WaitTimeout
  324.                 for (;;) {
  325.                     PooledStream pooledStream = (PooledStream)m_StackNew.Pop();
  326.                    
  327.                     if (null == pooledStream)
  328.                         break;
  329.                    
  330.                     GlobalLog.Assert(!pooledStream.IsEmancipated, "Pooled object not in pool.");
  331.                     GlobalLog.Assert(pooledStream.CanBePooled, "Pooled object is not poolable.");
  332.                    
  333.                     m_StackOld.Push(pooledStream);
  334.                 }
  335.                 Semaphore.ReleaseSemaphore();
  336.             }
  337.         }
  338.        
  339.         /// <summary>
  340.         /// <para>Creates a new PooledStream, performs checks as well on the new stream</para>
  341.         /// </summary>
  342.         private PooledStream Create(CreateConnectionDelegate createConnectionCallback)
  343.         {
  344.             GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::Create");
  345.             PooledStream newObj = null;
  346.            
  347.             try {
  348.                 newObj = createConnectionCallback(this);
  349.                
  350.                 if (null == newObj)
  351.                     throw new InternalException();
  352.                 // Create succeeded, but null object
  353.                 if (!newObj.CanBePooled)
  354.                     throw new InternalException();
  355.                 // Create succeeded, but non-poolable object
  356.                 newObj.PrePush(null);
  357.                
  358.                 lock (m_ObjectList.SyncRoot) {
  359.                     m_ObjectList.Add(newObj);
  360.                     m_TotalObjects = m_ObjectList.Count;
  361.                 }
  362.                
  363.                 GlobalLog.Print("Create pooledStream#" + ValidationHelper.HashString(newObj));
  364.             }
  365.             catch (Exception e) {
  366.                 GlobalLog.Print("Pool Exception: " + e.Message);
  367.                
  368.                 newObj = null;
  369.                 // set to null, so we do not return bad new object
  370.                 // Failed to create instance
  371.                 m_ResError = e;
  372.                 Abort();
  373.             }
  374.             catch {
  375.                 GlobalLog.Print("Pool Exception: Non-CLS Compliant Exception");
  376.                
  377.                 newObj = null;
  378.                 // set to null, so we do not return bad new object
  379.                 // Failed to create instance
  380.                 m_ResError = new Exception(SR.GetString(SR.net_nonClsCompliantException));
  381.                 Abort();
  382.             }
  383.             GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::Create", ValidationHelper.HashString(newObj));
  384.             return newObj;
  385.         }
  386.        
  387.        
  388.         /// <summary>
  389.         /// <para>Destroys a pooled stream from the pool</para>
  390.         /// </summary>
  391.         private void Destroy(PooledStream pooledStream)
  392.         {
  393.             GlobalLog.Print("Destroy pooledStream#" + ValidationHelper.HashString(pooledStream));
  394.            
  395.             try {
  396.                 lock (m_ObjectList.SyncRoot) {
  397.                     m_ObjectList.Remove(pooledStream);
  398.                     m_TotalObjects = m_ObjectList.Count;
  399.                 }
  400.             }
  401.             finally {
  402.                 if (null != pooledStream) {
  403.                     pooledStream.Destroy();
  404.                 }
  405.             }
  406.         }
  407.        
  408.         private static void CancelErrorCallbackWrapper(TimerThread.Timer timer, int timeNoticed, object context)
  409.         {
  410.             ((ConnectionPool)context).CancelErrorCallback();
  411.         }
  412.        
  413.         /// <summary>
  414.         /// <para>Called on error, after we waited a set amount of time from aborting</para>
  415.         /// </summary>
  416.         private void CancelErrorCallback()
  417.         {
  418.             TimerThread.Timer timer = m_ErrorTimer;
  419.             if (timer != null && timer.Cancel()) {
  420.                 m_ErrorOccured = false;
  421.                 ErrorEvent.Reset();
  422.                 m_ErrorTimer = null;
  423.                 m_ResError = null;
  424.             }
  425.         }
  426.        
  427.         /// <summary>
  428.         /// <para>Retrieves a pooled stream from the pool proper
  429.         /// this work by first attemting to find something in the pool on the New stack
  430.         /// and then trying the Old stack if something is not there availble </para>
  431.         /// </summary>
  432.         private PooledStream GetFromPool(object owningObject)
  433.         {
  434.             PooledStream res = null;
  435.             GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetFromPool");
  436.             res = (PooledStream)m_StackNew.Pop();
  437.             if (null == res) {
  438.                 res = (PooledStream)m_StackOld.Pop();
  439.             }
  440.            
  441.             // Shouldn't be null, we could assert here.
  442.             GlobalLog.Assert(res != null, "GetFromPool called with nothing in the pool!");
  443.            
  444.             if (null != res) {
  445.                 res.PostPop(owningObject);
  446.                 GlobalLog.Print("GetFromGeneralPool pooledStream#" + ValidationHelper.HashString(res));
  447.             }
  448.            
  449.             GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetFromPool", ValidationHelper.HashString(res));
  450.             return (res);
  451.         }
  452.        
  453.         /// <summary>
  454.         /// <para>Retrieves the pooled stream out of the pool, does this by using the result
  455.         /// of a WaitAny as input, and then based on whether it has a mutex, event, semaphore,
  456.         /// or timeout decides what action to take</para>
  457.         /// </summary>
  458.         private PooledStream Get(object owningObject, int result, ref bool continueLoop, ref WaitHandle[] waitHandles)
  459.         {
  460.             PooledStream pooledStream = null;
  461.             GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::Get", result.ToString());
  462.            
  463.            
  464.             // From the WaitAny docs: "If more than one object became signaled during
  465.             // the call, this is the array index of the signaled object with the
  466.             // smallest index value of all the signaled objects." This is important
  467.             // so that the free object signal will be returned before a creation
  468.             // signal.
  469.            
  470.             switch (result) {
  471.                 case WaitTimeout:
  472.                     Interlocked.Decrement(ref m_WaitCount);
  473.                     continueLoop = false;
  474.                     GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::Get", "throw Timeout WebException");
  475.                     throw new WebException(NetRes.GetWebStatusString("net_timeout", WebExceptionStatus.ConnectFailure), WebExceptionStatus.Timeout);
  476.                     break;
  477.                 case ErrorHandleIndex:
  478.                    
  479.                    
  480.                     // Throw the error that PoolCreateRequest stashed.
  481.                     int newWaitCount = Interlocked.Decrement(ref m_WaitCount);
  482.                     continueLoop = false;
  483.                     Exception exceptionToThrow = m_ResError;
  484.                     if (newWaitCount == 0) {
  485.                         CancelErrorCallback();
  486.                     }
  487.                     throw exceptionToThrow;
  488.                     break;
  489.                 case CreationHandleIndex:
  490.                    
  491.                     try {
  492.                         continueLoop = true;
  493.                         pooledStream = UserCreateRequest();
  494.                        
  495.                         if (null != pooledStream) {
  496.                             pooledStream.PostPop(owningObject);
  497.                             Interlocked.Decrement(ref m_WaitCount);
  498.                             continueLoop = false;
  499.                            
  500.                         }
  501.                         else {
  502.                             // If we were not able to create an object, check to see if
  503.                             // we reached MaxPoolSize. If so, we will no longer wait on
  504.                             // the CreationHandle, but instead wait for a free object or
  505.                             // the timeout.
  506.                            
  507.                             // BUG - if we receive the CreationHandle midway into the wait
  508.                             // period and re-wait, we will be waiting on the full period
  509.                             if (Count >= MaxPoolSize && 0 != MaxPoolSize) {
  510.                                 if (!ReclaimEmancipatedObjects()) {
  511.                                     // modify handle array not to wait on creation mutex anymore
  512.                                     waitHandles = new WaitHandle[2];
  513.                                     waitHandles[0] = m_WaitHandles[0];
  514.                                     waitHandles[1] = m_WaitHandles[1];
  515.                                 }
  516.                             }
  517.                            
  518.                         }
  519.                     }
  520.                     finally {
  521.                         CreationMutex.ReleaseMutex();
  522.                     }
  523.                     break;
  524.                 default:
  525.                    
  526.                     //
  527.                     // guaranteed available inventory
  528.                     //
  529.                     Interlocked.Decrement(ref m_WaitCount);
  530.                     pooledStream = GetFromPool(owningObject);
  531.                     continueLoop = false;
  532.                     break;
  533.             }
  534.             GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::Get", ValidationHelper.HashString(pooledStream));
  535.             return pooledStream;
  536.         }
  537.        
  538.         /// <devdoc>
  539.         /// <para>Aborts the queued requests to the pool</para>
  540.         /// </devdoc>
  541.         internal void Abort()
  542.         {
  543.             if (m_ResError == null) {
  544.                 m_ResError = new WebException(NetRes.GetWebStatusString("net_requestaborted", WebExceptionStatus.RequestCanceled), WebExceptionStatus.RequestCanceled);
  545.             }
  546.             ErrorEvent.Set();
  547.             m_ErrorOccured = true;
  548.             m_ErrorTimer = s_CancelErrorQueue.CreateTimer(s_CancelErrorCallback, this);
  549.         }
  550.        
  551.         /// <devdoc>
  552.         /// <para>Attempts to create a PooledStream, by trying to get a pooled Connection,
  553.         /// or by creating its own new one</para>
  554.         /// </devdoc>
  555.         internal PooledStream GetConnection(object owningObject, GeneralAsyncDelegate asyncCallback, int creationTimeout)
  556.         {
  557.             int result;
  558.             PooledStream stream = null;
  559.             bool continueLoop = true;
  560.             bool async = (asyncCallback != null) ? true : false;
  561.            
  562.             GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetConnection");
  563.            
  564.             if (m_State != State.Running) {
  565.                 throw new InternalException();
  566.             }
  567.            
  568.             Interlocked.Increment(ref m_WaitCount);
  569.             WaitHandle[] localWaitHandles = m_WaitHandles;
  570.            
  571.             if (async) {
  572.                 result = WaitHandle.WaitAny(localWaitHandles, 0, false);
  573.                 if (result != WaitTimeout) {
  574.                     stream = Get(owningObject, result, ref continueLoop, ref localWaitHandles);
  575.                 }
  576.                 if (stream == null) {
  577.                     GlobalLog.Print("GetConnection:" + ValidationHelper.HashString(this) + " going async");
  578.                     AsyncConnectionPoolRequest asyncState = new AsyncConnectionPoolRequest(this, owningObject, asyncCallback, creationTimeout);
  579.                     QueueRequest(asyncState);
  580.                 }
  581.             }
  582.             else {
  583.                 // loop while we don't have an error/timeout and we haven't gotten a stream yet
  584.                 while ((stream == null) && continueLoop) {
  585.                     result = WaitHandle.WaitAny(localWaitHandles, creationTimeout, false);
  586.                     stream = Get(owningObject, result, ref continueLoop, ref localWaitHandles);
  587.                 }
  588.             }
  589.            
  590.             if (null != stream) {
  591.                 // if there is already a stream, then we're not going async
  592.                 if (!stream.IsInitalizing) {
  593.                     asyncCallback = null;
  594.                 }
  595.                
  596.                 try {
  597.                     // If activate returns false, it is going to finish asynchronously
  598.                     // and therefore the stream will be returned in a callback and
  599.                     // we should not return it here (return null)
  600.                     if (stream.Activate(owningObject, asyncCallback) == false)
  601.                         stream = null;
  602.                 }
  603.                 catch {
  604.                     stream.Close();
  605.                     PutConnection(stream, owningObject, creationTimeout);
  606.                     throw;
  607.                 }
  608.             }
  609.             else if (!async) {
  610.                 throw new InternalException();
  611.             }
  612.            
  613.             GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetConnection", ValidationHelper.HashString(stream));
  614.             return (stream);
  615.         }
  616.        
  617.         /// <devdoc>
  618.         /// <para>Attempts to return a PooledStream to the pool</para>
  619.         /// </devdoc>
  620.         internal void PutConnection(PooledStream pooledStream, object owningObject, int creationTimeout)
  621.         {
  622.             GlobalLog.Print("ConnectionPool#" + ValidationHelper.HashString(this) + "::PutConnection");
  623.             if (pooledStream == null) {
  624.                 throw new ArgumentNullException("pooledStream");
  625.             }
  626.            
  627.             pooledStream.PrePush(owningObject);
  628.            
  629.             if (m_State != State.ShuttingDown) {
  630.                 pooledStream.Deactivate();
  631.                
  632.                 // cancel our error status, if we have no new requests waiting anymore
  633.                 if (m_WaitCount == 0) {
  634.                     CancelErrorCallback();
  635.                 }
  636.                
  637.                 if (pooledStream.CanBePooled) {
  638.                     PutNew(pooledStream);
  639.                 }
  640.                 else {
  641.                     Destroy(pooledStream);
  642.                    
  643.                     // Make sure we recreate a new pooled stream, if there are requests for a stream
  644.                     // at this point
  645.                     if (m_WaitCount > 0) {
  646.                         if (!CreationMutex.WaitOne(creationTimeout, false)) {
  647.                             Abort();
  648.                         }
  649.                         else {
  650.                             try {
  651.                                 pooledStream = UserCreateRequest();
  652.                                 if (null != pooledStream) {
  653.                                     PutNew(pooledStream);
  654.                                 }
  655.                             }
  656.                             finally {
  657.                                 CreationMutex.ReleaseMutex();
  658.                             }
  659.                         }
  660.                     }
  661.                 }
  662.             }
  663.             else {
  664.                 // If we're shutting down, we destroy the object.
  665.                 Destroy(pooledStream);
  666.             }
  667.         }
  668.        
  669.        
  670.         /// <devdoc>
  671.         /// <para>Places a new/reusable stream in the new stack of the pool</para>
  672.         /// </devdoc>
  673.         private void PutNew(PooledStream pooledStream)
  674.         {
  675.             GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::PutNew", "#" + ValidationHelper.HashString(pooledStream));
  676.            
  677.             GlobalLog.Assert(null != pooledStream, "Why are we adding a null object to the pool?");
  678.             GlobalLog.Assert(pooledStream.CanBePooled, "Non-poolable object in pool.");
  679.            
  680.             m_StackNew.Push(pooledStream);
  681.             Semaphore.ReleaseSemaphore();
  682.             GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::PutNew");
  683.         }
  684.        
  685.        
  686.         /// <devdoc>
  687.         /// <para>Reclaim any pooled Streams that have seen their users/WebRequests GCed away</para>
  688.         /// </devdoc>
  689.         private bool ReclaimEmancipatedObjects()
  690.         {
  691.             bool emancipatedObjectFound = false;
  692.             GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::ReclaimEmancipatedObjects");
  693.            
  694.             lock (m_ObjectList.SyncRoot) {
  695.                
  696.                 object[] objectList = m_ObjectList.ToArray();
  697.                 if (null != objectList) {
  698.                    
  699.                     for (int i = 0; i < objectList.Length; ++i) {
  700.                         PooledStream pooledStream = (PooledStream)objectList[i];
  701.                        
  702.                         if (null != pooledStream) {
  703.                             bool locked = false;
  704.                            
  705.                             try {
  706.                                 locked = Monitor.TryEnter(pooledStream);
  707.                                
  708.                                 if (locked) {
  709.                                     if (pooledStream.IsEmancipated) {
  710.                                        
  711.                                         GlobalLog.Print("EmancipatedObject pooledStream#" + ValidationHelper.HashString(pooledStream));
  712.                                         PutConnection(pooledStream, null, Timeout.Infinite);
  713.                                         emancipatedObjectFound = true;
  714.                                     }
  715.                                 }
  716.                             }
  717.                             finally {
  718.                                 if (locked)
  719.                                     Monitor.Exit(pooledStream);
  720.                             }
  721.                         }
  722.                     }
  723.                 }
  724.             }
  725.             GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::ReclaimEmancipatedObjects", emancipatedObjectFound.ToString());
  726.             return emancipatedObjectFound;
  727.         }
  728.        
  729.         /// <devdoc>
  730.         /// <para>Creates a new PooledStream is allowable</para>
  731.         /// </devdoc>
  732.         private PooledStream UserCreateRequest()
  733.         {
  734.             // called by user when they were not able to obtain a free object but
  735.             // instead obtained creation mutex
  736.             GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::UserCreateRequest");
  737.            
  738.             PooledStream pooledStream = null;
  739.            
  740.             if (!ErrorOccurred) {
  741.                 if (Count < MaxPoolSize || 0 == MaxPoolSize) {
  742.                    
  743.                     if ((Count & 1) == 1 || !ReclaimEmancipatedObjects())
  744.                         pooledStream = Create(m_CreateConnectionCallback);
  745.                 }
  746.             }
  747.             GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::UserCreateRequest", ValidationHelper.HashString(pooledStream));
  748.             return pooledStream;
  749.         }
  750.     }
  751.    
  752.    
  753.     /// <devdoc>
  754.     /// <para>Used to Pool streams in a thread safe manner</para>
  755.     /// </devdoc>
  756.     internal sealed class InterlockedStack
  757.     {
  758.         private readonly Stack _stack = new Stack();
  759.         private int _count;
  760.        
  761.         #if DEBUG
  762.         private readonly Hashtable doublepush = new Hashtable();
  763.         #endif
  764.        
  765.         internal InterlockedStack()
  766.         {
  767.         }
  768.        
  769.         internal void Push(object pooledStream)
  770.         {
  771.             GlobalLog.Assert(null != pooledStream, "push null");
  772.             if (null == pooledStream) {
  773.                 throw new ArgumentNullException("pooledStream");
  774.             }
  775.             lock (_stack.SyncRoot) {
  776.                 #if DEBUG
  777.                 GlobalLog.Assert(null == doublepush[pooledStream], "object already in stack");
  778.                 doublepush[pooledStream] = _stack.Count;
  779.                 #endif
  780.                 _stack.Push(pooledStream);
  781.                 #if DEBUG
  782.                 GlobalLog.Assert(_count + 1 == _stack.Count, "push count mishandle");
  783.                 #endif
  784.                 _count = _stack.Count;
  785.             }
  786.         }
  787.        
  788.         internal object Pop()
  789.         {
  790.             lock (_stack.SyncRoot) {
  791.                 object pooledStream = null;
  792.                 if (0 < _stack.Count) {
  793.                     pooledStream = _stack.Pop();
  794.                     #if DEBUG
  795.                     GlobalLog.Assert(_count - 1 == _stack.Count, "pop count mishandle");
  796.                     doublepush.Remove(pooledStream);
  797.                     #endif
  798.                     _count = _stack.Count;
  799.                 }
  800.                 return pooledStream;
  801.             }
  802.         }
  803.     }
  804.    
  805. }

Developer Fusion