The Labs \ Source Viewer \ SSCLI \ System.Diagnostics \ AsyncStreamReader

  1. // ==++==
  2. //
  3. //
  4. // Copyright (c) 2006 Microsoft Corporation. All rights reserved.
  5. //
  6. // The use and distribution terms for this software are contained in the file
  7. // named license.txt, which can be found in the root of this distribution.
  8. // By using this software in any fashion, you are agreeing to be bound by the
  9. // terms of this license.
  10. //
  11. // You must not remove this notice, or any other, from this software.
  12. //
  13. //
  14. // ==--==
  15. /*============================================================
  16. **
  17. ** Class:  AsyncStreamReader
  18. **
  19. ** Purpose: For reading text from streams using a particular
  20. ** encoding in an asychronous manner used by the process class
  21. **
  22. **
  23. ===========================================================*/
  24. namespace System.Diagnostics
  25. {
  26.     using System;
  27.     using System.IO;
  28.     using System.Text;
  29.     using System.Runtime.InteropServices;
  30.     using System.Threading;
  31.     using System.Collections;
  32.    
  33.     internal delegate void UserCallBack(string data);
  34.    
  35.     internal class AsyncStreamReader : IDisposable
  36.     {
  37.         internal const int DefaultBufferSize = 1024;
  38.         // Byte buffer size
  39.         private const int MinBufferSize = 128;
  40.        
  41.         private Stream stream;
  42.         private Encoding encoding;
  43.         private Decoder decoder;
  44.         private byte[] byteBuffer;
  45.         private char[] charBuffer;
  46.         // Record the number of valid bytes in the byteBuffer, for a few checks.
  47.        
  48.         // This is the maximum number of chars we can get from one call to
  49.         // ReadBuffer. Used so ReadBuffer can tell when to copy data into
  50.         // a user's char[] directly, instead of our internal char[].
  51.         private int _maxCharsPerBuffer;
  52.        
  53.         // Store a backpointer to the process class, to check for user callbacks
  54.         private Process process;
  55.        
  56.         // Delegate to call user function.
  57.         private UserCallBack userCallBack;
  58.        
  59.         // Internal Cancel operation
  60.         private bool cancelOperation;
  61.         private ManualResetEvent eofEvent;
  62.         private Queue messageQueue;
  63.         private StringBuilder sb;
  64.         private bool bLastCarriageReturn;
  65.        
  66.         internal AsyncStreamReader(Process process, Stream stream, UserCallBack callback, Encoding encoding) : this(process, stream, callback, encoding, DefaultBufferSize)
  67.         {
  68.         }
  69.        
  70.        
  71.         // Creates a new AsyncStreamReader for the given stream. The
  72.         // character encoding is set by encoding and the buffer size,
  73.         // in number of 16-bit characters, is set by bufferSize.
  74.         //
  75.         internal AsyncStreamReader(Process process, Stream stream, UserCallBack callback, Encoding encoding, int bufferSize)
  76.         {
  77.             Debug.Assert(process != null && stream != null && encoding != null && callback != null, "Invalid arguments!");
  78.             Debug.Assert(stream.CanRead, "Stream must be readable!");
  79.             Debug.Assert(bufferSize > 0, "Invalid buffer size!");
  80.            
  81.             Init(process, stream, callback, encoding, bufferSize);
  82.             messageQueue = new Queue();
  83.         }
  84.        
  85.         private void Init(Process process, Stream stream, UserCallBack callback, Encoding encoding, int bufferSize)
  86.         {
  87.             this.process = process;
  88.             this.stream = stream;
  89.             this.encoding = encoding;
  90.             this.userCallBack = callback;
  91.             decoder = encoding.GetDecoder();
  92.             if (bufferSize < MinBufferSize)
  93.                 bufferSize = MinBufferSize;
  94.             byteBuffer = new byte[bufferSize];
  95.             _maxCharsPerBuffer = encoding.GetMaxCharCount(bufferSize);
  96.             charBuffer = new char[_maxCharsPerBuffer];
  97.             cancelOperation = false;
  98.             eofEvent = new ManualResetEvent(false);
  99.             sb = null;
  100.             this.bLastCarriageReturn = false;
  101.         }
  102.        
  103.         public virtual void Close()
  104.         {
  105.             Dispose(true);
  106.         }
  107.        
  108.         void IDisposable.Dispose()
  109.         {
  110.             Dispose(true);
  111.         }
  112.        
  113.         protected virtual void Dispose(bool disposing)
  114.         {
  115.             if (disposing) {
  116.                 if (stream != null)
  117.                     stream.Close();
  118.             }
  119.             if (stream != null) {
  120.                 stream = null;
  121.                 encoding = null;
  122.                 decoder = null;
  123.                 byteBuffer = null;
  124.                 charBuffer = null;
  125.             }
  126.            
  127.             if (eofEvent != null) {
  128.                 eofEvent.Close();
  129.                 eofEvent = null;
  130.             }
  131.         }
  132.        
  133.         public virtual Encoding CurrentEncoding {
  134.             get { return encoding; }
  135.         }
  136.        
  137.         public virtual Stream BaseStream {
  138.             get { return stream; }
  139.         }
  140.        
  141.         // User calls BeginRead to start the asynchronous read
  142.         internal void BeginReadLine()
  143.         {
  144.             if (cancelOperation) {
  145.                 cancelOperation = false;
  146.             }
  147.            
  148.             if (sb == null) {
  149.                 sb = new StringBuilder(DefaultBufferSize);
  150.                 stream.BeginRead(byteBuffer, 0, byteBuffer.Length, new AsyncCallback(ReadBuffer), null);
  151.             }
  152.             else {
  153.                 FlushMessageQueue();
  154.             }
  155.         }
  156.        
  157.         internal void CancelOperation()
  158.         {
  159.             cancelOperation = true;
  160.         }
  161.        
  162.         // This is the async callback function. Only one thread could/should call this.
  163.         private void ReadBuffer(IAsyncResult ar)
  164.         {
  165.            
  166.             int byteLen;
  167.            
  168.             try {
  169.                 byteLen = stream.EndRead(ar);
  170.             }
  171.             catch (IOException) {
  172.                 // We should ideally consume errors from operations getting cancelled
  173.                 // so that we don't crash the unsuspecting parent with an unhandled exc.
  174.                 // This seems to come in 2 forms of exceptions (depending on platform and scenario),
  175.                 // namely OperationCanceledException and IOException (for errorcode that we don't
  176.                 // map explicitly).
  177.                 byteLen = 0;
  178.                 // Treat this as EOF
  179.             }
  180.             catch (OperationCanceledException) {
  181.                 // We should consume any OperationCanceledException from child read here
  182.                 // so that we don't crash the parent with an unhandled exc
  183.                 byteLen = 0;
  184.                 // Treat this as EOF
  185.             }
  186.            
  187.             if (byteLen == 0) {
  188.                 // We're at EOF, we won't call this function again from here on.
  189.                 lock (messageQueue) {
  190.                     if (sb.Length != 0) {
  191.                         messageQueue.Enqueue(sb.ToString());
  192.                         sb.Length = 0;
  193.                     }
  194.                     messageQueue.Enqueue(null);
  195.                 }
  196.                
  197.                 try {
  198.                     // UserCallback could throw, we should still set the eofEvent
  199.                     FlushMessageQueue();
  200.                 }
  201.                 finally {
  202.                     eofEvent.Set();
  203.                 }
  204.             }
  205.             else {
  206.                 int charLen = decoder.GetChars(byteBuffer, 0, byteLen, charBuffer, 0);
  207.                 sb.Append(charBuffer, 0, charLen);
  208.                 GetLinesFromStringBuilder();
  209.                 stream.BeginRead(byteBuffer, 0, byteBuffer.Length, new AsyncCallback(ReadBuffer), null);
  210.             }
  211.         }
  212.        
  213.        
  214.         // Read lines stored in StringBuilder and the buffer we just read into.
  215.         // A line is defined as a sequence of characters followed by
  216.         // a carriage return ('\r'), a line feed ('\n'), or a carriage return
  217.         // immediately followed by a line feed. The resulting string does not
  218.         // contain the terminating carriage return and/or line feed. The returned
  219.         // value is null if the end of the input stream has been reached.
  220.         //
  221.        
  222.         private void GetLinesFromStringBuilder()
  223.         {
  224.             int i = 0;
  225.             int lineStart = 0;
  226.             int len = sb.Length;
  227.            
  228.             // skip a beginning '\n' character of new block if last block ended
  229.             // with '\r'
  230.             if (bLastCarriageReturn && (len > 0) && sb[0] == '\n') {
  231.                 i = 1;
  232.                 lineStart = 1;
  233.                 bLastCarriageReturn = false;
  234.             }
  235.            
  236.             while (i < len) {
  237.                 char ch = sb[i];
  238.                 // Note the following common line feed chars:
  239.                 // \n - UNIX \r\n - DOS \r - Mac
  240.                 if (ch == '\r' || ch == '\n') {
  241.                     string s = sb.ToString(lineStart, i - lineStart);
  242.                     lineStart = i + 1;
  243.                     // skip the "\n" character following "\r" character
  244.                     if ((ch == '\r') && (lineStart < len) && (sb[lineStart] == '\n')) {
  245.                         lineStart++;
  246.                         i++;
  247.                     }
  248.                    
  249.                     lock (messageQueue) {
  250.                         messageQueue.Enqueue(s);
  251.                     }
  252.                 }
  253.                 i++;
  254.             }
  255.             if (sb[len - 1] == '\r') {
  256.                 bLastCarriageReturn = true;
  257.             }
  258.             // Keep the rest characaters which can't form a new line in string builder.
  259.             if (lineStart < len) {
  260.                 sb.Remove(0, lineStart);
  261.             }
  262.             else {
  263.                 sb.Length = 0;
  264.             }
  265.            
  266.             FlushMessageQueue();
  267.         }
  268.        
  269.         private void FlushMessageQueue()
  270.         {
  271.             while (true) {
  272.                
  273.                 // When we call BeginReadLine, we also need to flush the queue
  274.                 // So there could be a race between the ReadBuffer and BeginReadLine
  275.                 // We need to take lock before DeQueue.
  276.                 if (messageQueue.Count > 0) {
  277.                     lock (messageQueue) {
  278.                         if (messageQueue.Count > 0) {
  279.                             string s = (string)messageQueue.Dequeue();
  280.                             // skip if the read is the read is cancelled
  281.                             // this might happen inside UserCallBack
  282.                             // However, continue to drain the queue
  283.                             if (!cancelOperation) {
  284.                                 userCallBack(s);
  285.                             }
  286.                         }
  287.                     }
  288.                 }
  289.                 else {
  290.                     break;
  291.                 }
  292.             }
  293.         }
  294.        
  295.         // Wait until we hit EOF. This is called from Process.WaitForExit
  296.         // We will lose some information if we don't do this.
  297.         internal void WaitUtilEOF()
  298.         {
  299.             if (eofEvent != null) {
  300.                 eofEvent.WaitOne();
  301.                 eofEvent.Close();
  302.                 eofEvent = null;
  303.             }
  304.         }
  305.     }
  306. }

Developer Fusion