The Labs \ Source Viewer \ SSCLI \ System.IO \ Stream

  1. // ==++==
  2. //
  3. //
  4. // Copyright (c) 2006 Microsoft Corporation. All rights reserved.
  5. //
  6. // The use and distribution terms for this software are contained in the file
  7. // named license.txt, which can be found in the root of this distribution.
  8. // By using this software in any fashion, you are agreeing to be bound by the
  9. // terms of this license.
  10. //
  11. // You must not remove this notice, or any other, from this software.
  12. //
  13. //
  14. // ==--==
  15. /*============================================================
  16. **
  17. ** Class:  Stream
  18. **
  19. **
  20. ** Purpose: Abstract base class for all Streams.  Provides
  21. ** default implementations of asynchronous reads & writes, in
  22. ** terms of the synchronous reads & writes (and vice versa).
  23. **
  24. **
  25. ===========================================================*/
  26. using System;
  27. using System.Threading;
  28. using System.Runtime.InteropServices;
  29. using System.Runtime.Remoting.Messaging;
  30. using System.Security;
  31. using System.Security.Permissions;
  32. namespace System.IO
  33. {
  34.     [Serializable()]
  35.     [ComVisible(true)]
  36.     public abstract class Stream : MarshalByRefObject, IDisposable
  37.     {
  38.        
  39.         public static readonly Stream Null = new NullStream();
  40.        
  41.         // To implement Async IO operations on streams that don't support async IO
  42.         private delegate int ReadDelegate(        [In(), Out()]
  43. byte[] bytes, int index, int offset);
  44.         private delegate void WriteDelegate(byte[] bytes, int index, int offset);
  45.        
  46.         [NonSerialized()]
  47.         private ReadDelegate _readDelegate;
  48.         [NonSerialized()]
  49.         private WriteDelegate _writeDelegate;
  50.         // Use a semaphore here with a max count of 1. Note Mutex in Win32
  51.         // is very different from a semaphore, requiring thread affinity.
  52.         [NonSerialized()]
  53.         private AutoResetEvent _asyncActiveEvent;
  54.        
  55.         [NonSerialized()]
  56.         // Keeps track of pending IO and not close the event until after
  57.         // all operations are completed. This avoids NullRefExc for _asyncActiveEvent
  58.         private int _asyncActiveCount = 1;
  59.         public abstract bool CanRead {
  60.             get;
  61.         }
  62.        
  63.         // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
  64.         public abstract bool CanSeek {
  65.             get;
  66.         }
  67.        
  68.         [ComVisible(false)]
  69.         public virtual bool CanTimeout {
  70.             get { return false; }
  71.         }
  72.        
  73.         public abstract bool CanWrite {
  74.             get;
  75.         }
  76.        
  77.         public abstract long Length {
  78.             get;
  79.         }
  80.        
  81.         public abstract long Position {
  82.             get;
  83.             set;
  84.         }
  85.        
  86.         [ComVisible(false)]
  87.         public virtual int ReadTimeout {
  88.             get {
  89.                 throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
  90.             }
  91.             set {
  92.                 throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
  93.             }
  94.         }
  95.        
  96.         [ComVisible(false)]
  97.         public virtual int WriteTimeout {
  98.             get {
  99.                 throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
  100.             }
  101.             set {
  102.                 throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
  103.             }
  104.         }
  105.        
  106.         // Stream used to require that all cleanup logic went into Close(),
  107.         // which was thought up before we invented IDisposable. However, we
  108.         // need to follow the IDisposable pattern so that users can write
  109.         // sensible subclasses without needing to inspect all their base
  110.         // classes, and without worrying about version brittleness, from a
  111.         // base class switching to the Dispose pattern. We're moving
  112.         // Stream to the Dispose(bool) pattern - that's where all subclasses
  113.         // should put their cleanup starting in V2.
  114.         public virtual void Close()
  115.         {
  116.             Dispose(true);
  117.             GC.SuppressFinalize(this);
  118.         }
  119.        
  120.         public void Dispose()
  121.         {
  122.             Close();
  123.         }
  124.        
  125.         protected virtual void Dispose(bool disposing)
  126.         {
  127.             // Note: Never change this to call other virtual methods on Stream
  128.             // like Write, since the state on subclasses has already been
  129.             // torn down. This is the last code to run on cleanup for a stream.
  130.             if ((disposing) && (_asyncActiveEvent != null))
  131.                 _CloseAsyncActiveEvent(Interlocked.Decrement(ref _asyncActiveCount));
  132.         }
  133.        
  134.         private void _CloseAsyncActiveEvent(int asyncActiveCount)
  135.         {
  136.             BCLDebug.Assert(_asyncActiveCount >= 0, "ref counting mismatch, possible race in the code");
  137.            
  138.             // If no pending async IO, close the event
  139.             if ((_asyncActiveEvent != null) && (asyncActiveCount == 0)) {
  140.                 _asyncActiveEvent.Close();
  141.                 _asyncActiveEvent = null;
  142.             }
  143.         }
  144.        
  145.         public abstract void Flush();
  146.        
  147.        
  148.         [Obsolete("CreateWaitHandle will be removed eventually. Please use \"new ManualResetEvent(false)\" instead.")]
  149.         protected virtual WaitHandle CreateWaitHandle()
  150.         {
  151.             return new ManualResetEvent(false);
  152.         }
  153.        
  154.         [HostProtection(ExternalThreading = true)]
  155.         public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
  156.         {
  157.             if (!CanRead)
  158.                 __Error.ReadNotSupported();
  159.            
  160.             // Increment the count to account for this async operation
  161.             BCLDebug.Assert(_asyncActiveCount >= 1, "ref counting mismatch, possible race in the code");
  162.             Interlocked.Increment(ref _asyncActiveCount);
  163.            
  164.             ReadDelegate d = new ReadDelegate(Read);
  165.            
  166.             // To avoid a race with a stream's position pointer & generating race
  167.             // conditions with internal buffer indexes in our own streams that
  168.             // don't natively support async IO operations when there are multiple
  169.             // async requests outstanding, we will block the application's main
  170.             // thread if it does a second IO request until the first one completes.
  171.             if (_asyncActiveEvent == null) {
  172.                 lock (this) {
  173.                     if (_asyncActiveEvent == null)
  174.                         _asyncActiveEvent = new AutoResetEvent(true);
  175.                 }
  176.             }
  177.             bool r = _asyncActiveEvent.WaitOne();
  178.             BCLDebug.Assert(r, "AutoResetEvent didn't get a signal when we called WaitOne!");
  179.            
  180.             BCLDebug.Assert(_readDelegate == null && _writeDelegate == null, "Expected no other readers or writers!");
  181.            
  182.             // Set delegate before we call BeginInvoke, to avoid a race
  183.             _readDelegate = d;
  184.             IAsyncResult asyncResult = d.BeginInvoke(buffer, offset, count, callback, state);
  185.            
  186.             return asyncResult;
  187.         }
  188.        
  189.         public virtual int EndRead(IAsyncResult asyncResult)
  190.         {
  191.             if (asyncResult == null)
  192.                 throw new ArgumentNullException("asyncResult");
  193.            
  194.             // Ideally we want to throw InvalidOperationException but for ECMA conformance we need to throw ArgExc instead.
  195.             if (_readDelegate == null)
  196.                 throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
  197.            
  198.             int numRead = -1;
  199.             try {
  200.                 numRead = _readDelegate.EndInvoke(asyncResult);
  201.             }
  202.             finally {
  203.                 _readDelegate = null;
  204.                 _asyncActiveEvent.Set();
  205.                
  206.                 // Decrement the count to account for this async operation
  207.                 // and close the handle if no other IO is pending
  208.                 _CloseAsyncActiveEvent(Interlocked.Decrement(ref _asyncActiveCount));
  209.             }
  210.            
  211.             return numRead;
  212.         }
  213.        
  214.         [HostProtection(ExternalThreading = true)]
  215.         public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
  216.         {
  217.             if (!CanWrite)
  218.                 __Error.WriteNotSupported();
  219.            
  220.             // Increment the count to account for this async operation
  221.             BCLDebug.Assert(_asyncActiveCount >= 1, "ref counting mismatch, possible race in the code");
  222.             Interlocked.Increment(ref _asyncActiveCount);
  223.            
  224.             WriteDelegate d = new WriteDelegate(Write);
  225.            
  226.             // To avoid a race with a stream's position pointer & generating race
  227.             // conditions with internal buffer indexes in our own streams that
  228.             // don't natively support async IO operations when there are multiple
  229.             // async requests outstanding, we will block the application's main
  230.             // thread if it does a second IO request until the first one completes.
  231.             //Monitor.Enter(this);
  232.             if (_asyncActiveEvent == null) {
  233.                 lock (this) {
  234.                     if (_asyncActiveEvent == null)
  235.                         _asyncActiveEvent = new AutoResetEvent(true);
  236.                 }
  237.             }
  238.             bool r = _asyncActiveEvent.WaitOne();
  239.             BCLDebug.Assert(r, "AutoResetEvent didn't get a signal when we called WaitOne!");
  240.            
  241.             BCLDebug.Assert(_readDelegate == null && _writeDelegate == null, "Expected no other readers or writers!");
  242.            
  243.             // Set delegate before we call BeginInvoke, to avoid a race
  244.             _writeDelegate = d;
  245.             IAsyncResult asyncResult = d.BeginInvoke(buffer, offset, count, callback, state);
  246.             return asyncResult;
  247.         }
  248.        
  249.         public virtual void EndWrite(IAsyncResult asyncResult)
  250.         {
  251.             if (asyncResult == null)
  252.                 throw new ArgumentNullException("asyncResult");
  253.            
  254.             // Ideally we want to throw InvalidOperationException but for ECMA conformance we need to throw ArgExc instead.
  255.             if (_writeDelegate == null)
  256.                 throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
  257.            
  258.             try {
  259.                 _writeDelegate.EndInvoke(asyncResult);
  260.             }
  261.             finally {
  262.                 _writeDelegate = null;
  263.                 _asyncActiveEvent.Set();
  264.                
  265.                 // Decrement the count to account for this async operation
  266.                 // and close the handle if no other IO is pending
  267.                 _CloseAsyncActiveEvent(Interlocked.Decrement(ref _asyncActiveCount));
  268.             }
  269.         }
  270.        
  271.         public abstract long Seek(long offset, SeekOrigin origin);
  272.        
  273.         public abstract void SetLength(long value);
  274.        
  275.         public abstract int Read(        [In(), Out()]
  276. byte[] buffer, int offset, int count);
  277.        
  278.         // Reads one byte from the stream by calling Read(byte[], int, int).
  279.         // Will return an unsigned byte cast to an int or -1 on end of stream.
  280.         // This implementation does not perform well because it allocates a new
  281.         // byte[] each time you call it, and should be overridden by any
  282.         // subclass that maintains an internal buffer. Then, it can help perf
  283.         // significantly for people who are reading one byte at a time.
  284.         public virtual int ReadByte()
  285.         {
  286.             byte[] oneByteArray = new byte[1];
  287.             int r = Read(oneByteArray, 0, 1);
  288.             if (r == 0)
  289.                 return -1;
  290.             return oneByteArray[0];
  291.         }
  292.        
  293.         public abstract void Write(byte[] buffer, int offset, int count);
  294.        
  295.         // Writes one byte from the stream by calling Write(byte[], int, int).
  296.         // This implementation does not perform well because it allocates a new
  297.         // byte[] each time you call it, and should be overridden by any
  298.         // subclass that maintains an internal buffer. Then, it can help perf
  299.         // significantly for people who are writing one byte at a time.
  300.         public virtual void WriteByte(byte value)
  301.         {
  302.             byte[] oneByteArray = new byte[1];
  303.             oneByteArray[0] = value;
  304.             Write(oneByteArray, 0, 1);
  305.         }
  306.        
  307.         [HostProtection(Synchronization = true)]
  308.         public static Stream Synchronized(Stream stream)
  309.         {
  310.             if (stream == null)
  311.                 throw new ArgumentNullException("stream");
  312.             if (stream is SyncStream)
  313.                 return stream;
  314.            
  315.             return new SyncStream(stream);
  316.         }
  317.        
  318.         [Serializable()]
  319.         private sealed class NullStream : Stream
  320.         {
  321.             internal NullStream()
  322.             {
  323.             }
  324.            
  325.             public override bool CanRead {
  326.                 get { return true; }
  327.             }
  328.            
  329.             public override bool CanWrite {
  330.                 get { return true; }
  331.             }
  332.            
  333.             public override bool CanSeek {
  334.                 get { return true; }
  335.             }
  336.            
  337.             public override long Length {
  338.                 get { return 0; }
  339.             }
  340.            
  341.             public override long Position {
  342.                 get { return 0; }
  343.                 set { }
  344.             }
  345.            
  346.             // No need to override Close
  347.            
  348.             public override void Flush()
  349.             {
  350.             }
  351.            
  352.             public override int Read(            [In(), Out()]
  353. byte[] buffer, int offset, int count)
  354.             {
  355.                 return 0;
  356.             }
  357.            
  358.             public override int ReadByte()
  359.             {
  360.                 return -1;
  361.             }
  362.            
  363.             public override void Write(byte[] buffer, int offset, int count)
  364.             {
  365.             }
  366.            
  367.             public override void WriteByte(byte value)
  368.             {
  369.             }
  370.            
  371.             public override long Seek(long offset, SeekOrigin origin)
  372.             {
  373.                 return 0;
  374.             }
  375.            
  376.             public override void SetLength(long length)
  377.             {
  378.             }
  379.         }
  380.        
  381.         // Used as the IAsyncResult object when using asynchronous IO methods
  382.         // on the base Stream class. Note I'm not using async delegates, so
  383.         // this is necessary.
  384.        
  385.         // SyncStream is a wrapper around a stream that takes
  386.         // a lock for every operation making it thread safe.
  387.        
  388.         [Serializable()]
  389.         internal sealed class SyncStream : Stream, IDisposable
  390.         {
  391.             private Stream _stream;
  392.            
  393.             internal SyncStream(Stream stream)
  394.             {
  395.                 if (stream == null)
  396.                     throw new ArgumentNullException("stream");
  397.                 _stream = stream;
  398.             }
  399.            
  400.             public override bool CanRead {
  401.                 get { return _stream.CanRead; }
  402.             }
  403.            
  404.             public override bool CanWrite {
  405.                 get { return _stream.CanWrite; }
  406.             }
  407.            
  408.             public override bool CanSeek {
  409.                 get { return _stream.CanSeek; }
  410.             }
  411.            
  412.             [ComVisible(false)]
  413.             public override bool CanTimeout {
  414.                 get { return _stream.CanTimeout; }
  415.             }
  416.            
  417.             public override long Length {
  418.                 get {
  419.                     lock (_stream) {
  420.                         return _stream.Length;
  421.                     }
  422.                 }
  423.             }
  424.            
  425.             public override long Position {
  426.                 get {
  427.                     lock (_stream) {
  428.                         return _stream.Position;
  429.                     }
  430.                 }
  431.                 set {
  432.                     lock (_stream) {
  433.                         _stream.Position = value;
  434.                     }
  435.                 }
  436.             }
  437.            
  438.             [ComVisible(false)]
  439.             public override int ReadTimeout {
  440.                 get { return _stream.ReadTimeout; }
  441.                 set { _stream.ReadTimeout = value; }
  442.             }
  443.            
  444.             [ComVisible(false)]
  445.             public override int WriteTimeout {
  446.                 get { return _stream.WriteTimeout; }
  447.                 set { _stream.WriteTimeout = value; }
  448.             }
  449.            
  450.             // In the off chance that some wrapped stream has different
  451.             // semantics for Close vs. Dispose, let's preserve that.
  452.             public override void Close()
  453.             {
  454.                 lock (_stream) {
  455.                     try {
  456.                         _stream.Close();
  457.                     }
  458.                     finally {
  459.                         base.Dispose(true);
  460.                     }
  461.                 }
  462.             }
  463.            
  464.             protected override void Dispose(bool disposing)
  465.             {
  466.                 lock (_stream) {
  467.                     try {
  468.                         // Explicitly pick up a potentially methodimpl'ed Dispose
  469.                         if (disposing)
  470.                             ((IDisposable)_stream).Dispose();
  471.                     }
  472.                     finally {
  473.                         base.Dispose(disposing);
  474.                     }
  475.                 }
  476.             }
  477.            
  478.             public override void Flush()
  479.             {
  480.                 lock (_stream)
  481.                     _stream.Flush();
  482.             }
  483.            
  484.             public override int Read(            [In(), Out()]
  485. byte[] bytes, int offset, int count)
  486.             {
  487.                 lock (_stream)
  488.                     return _stream.Read(bytes, offset, count);
  489.             }
  490.            
  491.             public override int ReadByte()
  492.             {
  493.                 lock (_stream)
  494.                     return _stream.ReadByte();
  495.             }
  496.            
  497.             [HostProtection(ExternalThreading = true)]
  498.             public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
  499.             {
  500.                 lock (_stream)
  501.                     return _stream.BeginRead(buffer, offset, count, callback, state);
  502.             }
  503.            
  504.             public override int EndRead(IAsyncResult asyncResult)
  505.             {
  506.                 lock (_stream)
  507.                     return _stream.EndRead(asyncResult);
  508.             }
  509.            
  510.             public override long Seek(long offset, SeekOrigin origin)
  511.             {
  512.                 lock (_stream)
  513.                     return _stream.Seek(offset, origin);
  514.             }
  515.            
  516.             public override void SetLength(long length)
  517.             {
  518.                 lock (_stream)
  519.                     _stream.SetLength(length);
  520.             }
  521.            
  522.             public override void Write(byte[] bytes, int offset, int count)
  523.             {
  524.                 lock (_stream)
  525.                     _stream.Write(bytes, offset, count);
  526.             }
  527.            
  528.             public override void WriteByte(byte b)
  529.             {
  530.                 lock (_stream)
  531.                     _stream.WriteByte(b);
  532.             }
  533.            
  534.             [HostProtection(ExternalThreading = true)]
  535.             public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
  536.             {
  537.                 lock (_stream)
  538.                     return _stream.BeginWrite(buffer, offset, count, callback, state);
  539.             }
  540.            
  541.             public override void EndWrite(IAsyncResult asyncResult)
  542.             {
  543.                 lock (_stream)
  544.                     _stream.EndWrite(asyncResult);
  545.             }
  546.         }
  547.     }
  548. }

Developer Fusion