The Labs \ Source Viewer \ SSCLI \ System.IO \ BufferedStream

  1. // ==++==
  2. //
  3. //
  4. // Copyright (c) 2006 Microsoft Corporation. All rights reserved.
  5. //
  6. // The use and distribution terms for this software are contained in the file
  7. // named license.txt, which can be found in the root of this distribution.
  8. // By using this software in any fashion, you are agreeing to be bound by the
  9. // terms of this license.
  10. //
  11. // You must not remove this notice, or any other, from this software.
  12. //
  13. //
  14. // ==--==
  15. /*============================================================
  16. **
  17. ** Class:  BufferedStream
  18. **
  19. ** Purpose: A composable Stream that buffers reads & writes
  20. ** to the underlying stream.
  21. **
  22. **
  23. ===========================================================*/
  24. using System;
  25. using System.Runtime.InteropServices;
  26. using System.Globalization;
  27. namespace System.IO
  28. {
  29.     //
  30.     //
  31.     //
  32.     [ComVisible(true)]
  33.     public sealed class BufferedStream : Stream
  34.     {
  35.         private Stream _s;
  36.         // Underlying stream. Close sets _s to null.
  37.         private byte[] _buffer;
  38.         // Shared read/write buffer. Alloc on first use.
  39.         private int _readPos;
  40.         // Read pointer within shared buffer.
  41.         private int _readLen;
  42.         // Number of bytes read in buffer from _s.
  43.         private int _writePos;
  44.         // Write pointer within shared buffer.
  45.         private int _bufferSize;
  46.         // Length of internal buffer, if it's allocated.
  47.        
  48.         private const int _DefaultBufferSize = 4096;
  49.        
  50.         private BufferedStream()
  51.         {
  52.         }
  53.        
  54.         public BufferedStream(Stream stream) : this(stream, _DefaultBufferSize)
  55.         {
  56.         }
  57.        
  58.         public BufferedStream(Stream stream, int bufferSize)
  59.         {
  60.             if (stream == null)
  61.                 throw new ArgumentNullException("stream");
  62.             if (bufferSize <= 0)
  63.                 throw new ArgumentOutOfRangeException("bufferSize", String.Format(CultureInfo.CurrentCulture, Environment.GetResourceString("ArgumentOutOfRange_MustBePositive"), "bufferSize"));
  64.             BCLDebug.Perf(!(stream is FileStream), "FileStream is buffered - don't wrap it in a BufferedStream");
  65.             BCLDebug.Perf(!(stream is MemoryStream), "MemoryStream shouldn't be wrapped in a BufferedStream!");
  66.            
  67.             _s = stream;
  68.             _bufferSize = bufferSize;
  69.             // Allocate _buffer on its first use - it will not be used if all reads
  70.             // & writes are greater than or equal to buffer size.
  71.             if (!_s.CanRead && !_s.CanWrite)
  72.                 __Error.StreamIsClosed();
  73.         }
  74.        
  75.         public override bool CanRead {
  76.             get { return _s != null && _s.CanRead; }
  77.         }
  78.        
  79.         public override bool CanWrite {
  80.             get { return _s != null && _s.CanWrite; }
  81.         }
  82.        
  83.         public override bool CanSeek {
  84.             get { return _s != null && _s.CanSeek; }
  85.         }
  86.        
  87.         public override long Length {
  88.             get {
  89.                 if (_s == null)
  90.                     __Error.StreamIsClosed();
  91.                 if (_writePos > 0)
  92.                     FlushWrite();
  93.                 return _s.Length;
  94.             }
  95.         }
  96.        
  97.         public override long Position {
  98.             get {
  99.                 if (_s == null)
  100.                     __Error.StreamIsClosed();
  101.                 if (!_s.CanSeek)
  102.                     __Error.SeekNotSupported();
  103.                 // return _s.Seek(0, SeekOrigin.Current) + (_readPos + _writePos - _readLen);
  104.                 return _s.Position + (_readPos - _readLen + _writePos);
  105.             }
  106.             set {
  107.                 if (value < 0)
  108.                     throw new ArgumentOutOfRangeException("value", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
  109.                 if (_s == null)
  110.                     __Error.StreamIsClosed();
  111.                 if (!_s.CanSeek)
  112.                     __Error.SeekNotSupported();
  113.                 if (_writePos > 0)
  114.                     FlushWrite();
  115.                 _readPos = 0;
  116.                 _readLen = 0;
  117.                 _s.Seek(value, SeekOrigin.Begin);
  118.             }
  119.         }
  120.        
  121.         protected override void Dispose(bool disposing)
  122.         {
  123.             try {
  124.                 if (disposing && _s != null) {
  125.                     try {
  126.                         Flush();
  127.                     }
  128.                     finally {
  129.                         _s.Close();
  130.                     }
  131.                 }
  132.             }
  133.             finally {
  134.                 _s = null;
  135.                 _buffer = null;
  136.                
  137.                 // Call base.Dispose(bool) to cleanup async IO resources
  138.                 base.Dispose(disposing);
  139.             }
  140.         }
  141.        
  142.         public override void Flush()
  143.         {
  144.             if (_s == null)
  145.                 __Error.StreamIsClosed();
  146.             if (_writePos > 0) {
  147.                 FlushWrite();
  148.             }
  149.             else if (_readPos < _readLen && _s.CanSeek) {
  150.                 FlushRead();
  151.             }
  152.         }
  153.        
  154.         // Reading is done by blocks from the file, but someone could read
  155.         // 1 byte from the buffer then write. At that point, the OS's file
  156.         // pointer is out of sync with the stream's position. All write
  157.         // functions should call this function to preserve the position in the file.
  158.         private void FlushRead()
  159.         {
  160.             BCLDebug.Assert(_writePos == 0, "BufferedStream: Write buffer must be empty in FlushRead!");
  161.             if (_readPos - _readLen != 0)
  162.                 _s.Seek(_readPos - _readLen, SeekOrigin.Current);
  163.             _readPos = 0;
  164.             _readLen = 0;
  165.         }
  166.        
  167.         // Writes are buffered. Anytime the buffer fills up
  168.         // (_writePos + delta > _bufferSize) or the buffer switches to reading
  169.         // and there is dirty data (_writePos > 0), this function must be called.
  170.         private void FlushWrite()
  171.         {
  172.             BCLDebug.Assert(_readPos == 0 && _readLen == 0, "BufferedStream: Read buffer must be empty in FlushWrite!");
  173.             _s.Write(_buffer, 0, _writePos);
  174.             _writePos = 0;
  175.             _s.Flush();
  176.         }
  177.        
  178.         public override int Read(        [In(), Out()]
  179. byte[] array, int offset, int count)
  180.         {
  181.             if (array == null)
  182.                 throw new ArgumentNullException("array", Environment.GetResourceString("ArgumentNull_Buffer"));
  183.             if (offset < 0)
  184.                 throw new ArgumentOutOfRangeException("offset", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
  185.             if (count < 0)
  186.                 throw new ArgumentOutOfRangeException("count", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
  187.             if (array.Length - offset < count)
  188.                 throw new ArgumentException(Environment.GetResourceString("Argument_InvalidOffLen"));
  189.            
  190.             if (_s == null)
  191.                 __Error.StreamIsClosed();
  192.            
  193.             int n = _readLen - _readPos;
  194.             // if the read buffer is empty, read into either user's array or our
  195.             // buffer, depending on number of bytes user asked for and buffer size.
  196.             if (n == 0) {
  197.                 if (!_s.CanRead)
  198.                     __Error.ReadNotSupported();
  199.                 if (_writePos > 0)
  200.                     FlushWrite();
  201.                 if (count >= _bufferSize) {
  202.                     n = _s.Read(array, offset, count);
  203.                     // Throw away read buffer.
  204.                     _readPos = 0;
  205.                     _readLen = 0;
  206.                     return n;
  207.                 }
  208.                 if (_buffer == null)
  209.                     _buffer = new byte[_bufferSize];
  210.                 n = _s.Read(_buffer, 0, _bufferSize);
  211.                 if (n == 0)
  212.                     return 0;
  213.                 _readPos = 0;
  214.                 _readLen = n;
  215.             }
  216.             // Now copy min of count or numBytesAvailable (ie, near EOF) to array.
  217.             if (n > count)
  218.                 n = count;
  219.             Buffer.InternalBlockCopy(_buffer, _readPos, array, offset, n);
  220.             _readPos += n;
  221.            
  222.            
  223.             if (n < count) {
  224.                 int moreBytesRead = _s.Read(array, offset + n, count - n);
  225.                 n += moreBytesRead;
  226.                 _readPos = 0;
  227.                 _readLen = 0;
  228.             }
  229.            
  230.             return n;
  231.         }
  232.        
  233.         // Reads a byte from the underlying stream. Returns the byte cast to an int
  234.         // or -1 if reading from the end of the stream.
  235.         public override int ReadByte()
  236.         {
  237.             if (_s == null)
  238.                 __Error.StreamIsClosed();
  239.             if (_readLen == 0 && !_s.CanRead)
  240.                 __Error.ReadNotSupported();
  241.             if (_readPos == _readLen) {
  242.                 if (_writePos > 0)
  243.                     FlushWrite();
  244.                 if (_buffer == null)
  245.                     _buffer = new byte[_bufferSize];
  246.                 _readLen = _s.Read(_buffer, 0, _bufferSize);
  247.                 _readPos = 0;
  248.             }
  249.             if (_readPos == _readLen)
  250.                 return -1;
  251.            
  252.             return _buffer[_readPos++];
  253.         }
  254.        
  255.         public override void Write(byte[] array, int offset, int count)
  256.         {
  257.             if (array == null)
  258.                 throw new ArgumentNullException("array", Environment.GetResourceString("ArgumentNull_Buffer"));
  259.             if (offset < 0)
  260.                 throw new ArgumentOutOfRangeException("offset", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
  261.             if (count < 0)
  262.                 throw new ArgumentOutOfRangeException("count", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
  263.             if (array.Length - offset < count)
  264.                 throw new ArgumentException(Environment.GetResourceString("Argument_InvalidOffLen"));
  265.            
  266.             if (_s == null)
  267.                 __Error.StreamIsClosed();
  268.             if (_writePos == 0) {
  269.                 // Ensure we can write to the stream, and ready buffer for writing.
  270.                 if (!_s.CanWrite)
  271.                     __Error.WriteNotSupported();
  272.                 if (_readPos < _readLen)
  273.                     FlushRead();
  274.                 else {
  275.                     _readPos = 0;
  276.                     _readLen = 0;
  277.                 }
  278.             }
  279.            
  280.             // If our buffer has data in it, copy data from the user's array into
  281.             // the buffer, and if we can fit it all there, return. Otherwise, write
  282.             // the buffer to disk and copy any remaining data into our buffer.
  283.             // The assumption here is memcpy is cheaper than disk (or net) IO.
  284.             // (10 milliseconds to disk vs. ~20-30 microseconds for a 4K memcpy)
  285.             // So the extra copying will reduce the total number of writes, in
  286.             // non-pathological cases (ie, write 1 byte, then write for the buffer
  287.             // size repeatedly)
  288.             if (_writePos > 0) {
  289.                 int numBytes = _bufferSize - _writePos;
  290.                 // space left in buffer
  291.                 if (numBytes > 0) {
  292.                     if (numBytes > count)
  293.                         numBytes = count;
  294.                     Buffer.InternalBlockCopy(array, offset, _buffer, _writePos, numBytes);
  295.                     _writePos += numBytes;
  296.                     if (count == numBytes)
  297.                         return;
  298.                     offset += numBytes;
  299.                     count -= numBytes;
  300.                 }
  301.                 // Reset our buffer. We essentially want to call FlushWrite
  302.                 // without calling Flush on the underlying Stream.
  303.                 _s.Write(_buffer, 0, _writePos);
  304.                 _writePos = 0;
  305.             }
  306.             // If the buffer would slow writes down, avoid buffer completely.
  307.             if (count >= _bufferSize) {
  308.                 BCLDebug.Assert(_writePos == 0, "BufferedStream cannot have buffered data to write here! Your stream will be corrupted.");
  309.                 _s.Write(array, offset, count);
  310.                 return;
  311.             }
  312.             else if (count == 0)
  313.                 return;
  314.             // Don't allocate a buffer then call memcpy for 0 bytes.
  315.             if (_buffer == null)
  316.                 _buffer = new byte[_bufferSize];
  317.             // Copy remaining bytes into buffer, to write at a later date.
  318.             Buffer.InternalBlockCopy(array, offset, _buffer, 0, count);
  319.             _writePos = count;
  320.         }
  321.        
  322.         public override void WriteByte(byte value)
  323.         {
  324.             if (_s == null)
  325.                 __Error.StreamIsClosed();
  326.             if (_writePos == 0) {
  327.                 if (!_s.CanWrite)
  328.                     __Error.WriteNotSupported();
  329.                 if (_readPos < _readLen)
  330.                     FlushRead();
  331.                 else {
  332.                     _readPos = 0;
  333.                     _readLen = 0;
  334.                 }
  335.                 if (_buffer == null)
  336.                     _buffer = new byte[_bufferSize];
  337.             }
  338.             if (_writePos == _bufferSize)
  339.                 FlushWrite();
  340.            
  341.             _buffer[_writePos++] = value;
  342.         }
  343.        
  344.        
  345.         public override long Seek(long offset, SeekOrigin origin)
  346.         {
  347.             if (_s == null)
  348.                 __Error.StreamIsClosed();
  349.             if (!_s.CanSeek)
  350.                 __Error.SeekNotSupported();
  351.             // If we've got bytes in our buffer to write, write them out.
  352.             // If we've read in and consumed some bytes, we'll have to adjust
  353.             // our seek positions ONLY IF we're seeking relative to the current
  354.             // position in the stream.
  355.             BCLDebug.Assert(_readPos <= _readLen, "_readPos <= _readLen");
  356.             if (_writePos > 0) {
  357.                 FlushWrite();
  358.             }
  359.             else if (origin == SeekOrigin.Current) {
  360.                 // Don't call FlushRead here, which would have caused an infinite
  361.                 // loop. Simply adjust the seek origin. This isn't necessary
  362.                 // if we're seeking relative to the beginning or end of the stream.
  363.                 BCLDebug.Assert(_readLen - _readPos >= 0, "_readLen (" + _readLen + ") - _readPos (" + _readPos + ") >= 0");
  364.                 offset -= (_readLen - _readPos);
  365.             }
  366.             /*
  367.             _readPos = 0;
  368.             _readLen = 0;
  369.             return _s.Seek(offset, origin);
  370.             */           
  371. long oldPos = _s.Position + (_readPos - _readLen);
  372.             long pos = _s.Seek(offset, origin);
  373.            
  374.             // We now must update the read buffer. We can in some cases simply
  375.             // update _readPos within the buffer, copy around the buffer so our
  376.             // Position property is still correct, and avoid having to do more
  377.             // reads from the disk. Otherwise, discard the buffer's contents.
  378.             if (_readLen > 0) {
  379.                 // We can optimize the following condition:
  380.                 // oldPos - _readPos <= pos < oldPos + _readLen - _readPos
  381.                 if (oldPos == pos) {
  382.                     if (_readPos > 0) {
  383.                         //Console.WriteLine("Seek: seeked for 0, adjusting buffer back by: "+_readPos+" _readLen: "+_readLen);
  384.                         Buffer.InternalBlockCopy(_buffer, _readPos, _buffer, 0, _readLen - _readPos);
  385.                         _readLen -= _readPos;
  386.                         _readPos = 0;
  387.                     }
  388.                     // If we still have buffered data, we must update the stream's
  389.                     // position so our Position property is correct.
  390.                     if (_readLen > 0)
  391.                         _s.Seek(_readLen, SeekOrigin.Current);
  392.                 }
  393.                 else if (oldPos - _readPos < pos && pos < oldPos + _readLen - _readPos) {
  394.                     int diff = (int)(pos - oldPos);
  395.                     //Console.WriteLine("Seek: diff was "+diff+", readpos was "+_readPos+" adjusting buffer - shrinking by "+ (_readPos + diff));
  396.                     Buffer.InternalBlockCopy(_buffer, _readPos + diff, _buffer, 0, _readLen - (_readPos + diff));
  397.                     _readLen -= (_readPos + diff);
  398.                     _readPos = 0;
  399.                     if (_readLen > 0)
  400.                         _s.Seek(_readLen, SeekOrigin.Current);
  401.                 }
  402.                 else {
  403.                     // Lose the read buffer.
  404.                     _readPos = 0;
  405.                     _readLen = 0;
  406.                 }
  407.                 BCLDebug.Assert(_readLen >= 0 && _readPos <= _readLen, "_readLen should be nonnegative, and _readPos should be less than or equal _readLen");
  408.                 BCLDebug.Assert(pos == Position, "Seek optimization: pos != Position! Buffer math was mangled.");
  409.             }
  410.             return pos;
  411.         }
  412.        
  413.         public override void SetLength(long value)
  414.         {
  415.             if (value < 0)
  416.                 throw new ArgumentOutOfRangeException("value", Environment.GetResourceString("ArgumentOutOfRange_NegFileSize"));
  417.             if (_s == null)
  418.                 __Error.StreamIsClosed();
  419.             if (!_s.CanSeek)
  420.                 __Error.SeekNotSupported();
  421.             if (!_s.CanWrite)
  422.                 __Error.WriteNotSupported();
  423.             if (_writePos > 0) {
  424.                 FlushWrite();
  425.             }
  426.             else if (_readPos < _readLen) {
  427.                 FlushRead();
  428.             }
  429.             _s.SetLength(value);
  430.         }
  431.     }
  432. }

Developer Fusion