The Labs \ Source Viewer \ SSCLI \ System.IO.Compression \ DeflateStreamAsyncResult

  1. //------------------------------------------------------------------------------
  2. // <copyright file="DeflateStream.cs" company="Microsoft">
  3. //
  4. // Copyright (c) 2006 Microsoft Corporation. All rights reserved.
  5. //
  6. // The use and distribution terms for this software are contained in the file
  7. // named license.txt, which can be found in the root of this distribution.
  8. // By using this software in any fashion, you are agreeing to be bound by the
  9. // terms of this license.
  10. //
  11. // You must not remove this notice, or any other, from this software.
  12. //
  13. // </copyright>
  14. //------------------------------------------------------------------------------
  15. namespace System.IO.Compression
  16. {
  17.     using System.IO;
  18.     using System.Diagnostics;
  19.     using System.Threading;
  20.     using System.Security.Permissions;
  21.    
  22.     public class DeflateStream : Stream
  23.     {
  24.         private const int bufferSize = 4096;
  25.        
  26.         internal delegate void AsyncWriteDelegate(byte[] array, int offset, int count, bool isAsync);
  27.        
  28.         private Stream _stream;
  29.         private CompressionMode _mode;
  30.         private bool _leaveOpen;
  31.         private Inflater inflater;
  32.         private Deflater deflater;
  33.         private byte[] buffer;
  34.        
  35.         private int asyncOperations;
  36.         private readonly AsyncCallback m_CallBack;
  37.         private readonly AsyncWriteDelegate m_AsyncWriterDelegate;
  38.        
  39.         public DeflateStream(Stream stream, CompressionMode mode) : this(stream, mode, false, false)
  40.         {
  41.         }
  42.        
  43.         public DeflateStream(Stream stream, CompressionMode mode, bool leaveOpen) : this(stream, mode, leaveOpen, false)
  44.         {
  45.         }
  46.        
  47.         internal DeflateStream(Stream stream, CompressionMode mode, bool leaveOpen, bool usingGZip)
  48.         {
  49.             _stream = stream;
  50.             _mode = mode;
  51.             _leaveOpen = leaveOpen;
  52.            
  53.             if (_stream == null) {
  54.                 throw new ArgumentNullException("stream");
  55.             }
  56.            
  57.             switch (_mode) {
  58.                 case CompressionMode.Decompress:
  59.                     if (!(_stream.CanRead)) {
  60.                         throw new ArgumentException(SR.GetString(SR.NotReadableStream), "stream");
  61.                     }
  62.                     inflater = new Inflater(usingGZip);
  63.                     m_CallBack = new AsyncCallback(ReadCallback);
  64.                     break;
  65.                 case CompressionMode.Compress:
  66.                    
  67.                     if (!(_stream.CanWrite)) {
  68.                         throw new ArgumentException(SR.GetString(SR.NotWriteableStream), "stream");
  69.                     }
  70.                     deflater = new Deflater(usingGZip);
  71.                     m_AsyncWriterDelegate = new AsyncWriteDelegate(this.InternalWrite);
  72.                     m_CallBack = new AsyncCallback(WriteCallback);
  73.                     break;
  74.                 default:
  75.                    
  76.                     throw new ArgumentException(SR.GetString(SR.ArgumentOutOfRange_Enum), "mode");
  77.                     break;
  78.             }
  79.             buffer = new byte[bufferSize];
  80.         }
  81.        
  82.         public override bool CanRead {
  83.             get {
  84.                 if (_stream == null) {
  85.                     return false;
  86.                 }
  87.                
  88.                 return (_mode == CompressionMode.Decompress && _stream.CanRead);
  89.             }
  90.         }
  91.        
  92.         public override bool CanWrite {
  93.             get {
  94.                 if (_stream == null) {
  95.                     return false;
  96.                 }
  97.                
  98.                 return (_mode == CompressionMode.Compress && _stream.CanWrite);
  99.             }
  100.         }
  101.        
  102.         public override bool CanSeek {
  103.             get { return false; }
  104.         }
  105.        
  106.         public override long Length {
  107.             get {
  108.                 throw new NotSupportedException(SR.GetString(SR.NotSupported));
  109.             }
  110.         }
  111.        
  112.         public override long Position {
  113.             get {
  114.                 throw new NotSupportedException(SR.GetString(SR.NotSupported));
  115.             }
  116.            
  117.             set {
  118.                 throw new NotSupportedException(SR.GetString(SR.NotSupported));
  119.             }
  120.         }
  121.        
  122.         public override void Flush()
  123.         {
  124.             if (_stream == null) {
  125.                 throw new ObjectDisposedException(null, SR.GetString(SR.ObjectDisposed_StreamClosed));
  126.             }
  127.             return;
  128.         }
  129.        
  130.         public override long Seek(long offset, SeekOrigin origin)
  131.         {
  132.             throw new NotSupportedException(SR.GetString(SR.NotSupported));
  133.         }
  134.        
  135.         public override void SetLength(long value)
  136.         {
  137.             throw new NotSupportedException(SR.GetString(SR.NotSupported));
  138.         }
  139.        
  140.         public override int Read(byte[] array, int offset, int count)
  141.         {
  142.             EnsureDecompressionMode();
  143.             ValidateParameters(array, offset, count);
  144.            
  145.             int bytesRead;
  146.             int currentOffest = offset;
  147.             int remainingCount = count;
  148.            
  149.             while (true) {
  150.                 bytesRead = inflater.Inflate(array, currentOffest, remainingCount);
  151.                 currentOffest += bytesRead;
  152.                 remainingCount -= bytesRead;
  153.                
  154.                 if (remainingCount == 0) {
  155.                     break;
  156.                 }
  157.                
  158.                 if (inflater.Finished()) {
  159.                     // if we finished decompressing, we can't have anything left in the outputwindow.
  160.                     Debug.Assert(inflater.AvailableOutput == 0, "We should have copied all stuff out!");
  161.                     break;
  162.                 }
  163.                
  164.                 Debug.Assert(inflater.NeedsInput(), "We can only run into this case if we are short of input");
  165.                
  166.                 int bytes = _stream.Read(buffer, 0, buffer.Length);
  167.                 if (bytes == 0) {
  168.                     break;
  169.                     //Do we want to throw an exception here?
  170.                 }
  171.                
  172.                 inflater.SetInput(buffer, 0, bytes);
  173.             }
  174.            
  175.             return count - remainingCount;
  176.         }
  177.        
  178.         private void ValidateParameters(byte[] array, int offset, int count)
  179.         {
  180.             if (array == null) {
  181.                 throw new ArgumentNullException("array");
  182.             }
  183.            
  184.             if (offset < 0) {
  185.                 throw new ArgumentOutOfRangeException("offset");
  186.             }
  187.            
  188.             if (count < 0) {
  189.                 throw new ArgumentOutOfRangeException("count");
  190.             }
  191.            
  192.             if (array.Length - offset < count) {
  193.                 throw new ArgumentException(SR.GetString(SR.InvalidArgumentOffsetCount));
  194.             }
  195.            
  196.             if (_stream == null) {
  197.                 throw new ObjectDisposedException(null, SR.GetString(SR.ObjectDisposed_StreamClosed));
  198.             }
  199.         }
  200.        
  201.         private void EnsureDecompressionMode()
  202.         {
  203.             if (_mode != CompressionMode.Decompress) {
  204.                 throw new InvalidOperationException(SR.GetString(SR.CannotReadFromDeflateStream));
  205.             }
  206.         }
  207.        
  208.         private void EnsureCompressionMode()
  209.         {
  210.             if (_mode != CompressionMode.Compress) {
  211.                 throw new InvalidOperationException(SR.GetString(SR.CannotWriteToDeflateStream));
  212.             }
  213.         }
  214.        
  215.         [HostProtection(ExternalThreading = true)]
  216.         public override IAsyncResult BeginRead(byte[] array, int offset, int count, AsyncCallback asyncCallback, object asyncState)
  217.         {
  218.             EnsureDecompressionMode();
  219.             if (asyncOperations != 0) {
  220.                 throw new InvalidOperationException(SR.GetString(SR.InvalidBeginCall));
  221.             }
  222.             Interlocked.Increment(ref asyncOperations);
  223.            
  224.             try {
  225.                 ValidateParameters(array, offset, count);
  226.                
  227.                 DeflateStreamAsyncResult userResult = new DeflateStreamAsyncResult(this, asyncState, asyncCallback, array, offset, count);
  228.                 userResult.isWrite = false;
  229.                
  230.                 // Try to read decompressed data in output buffer
  231.                 int bytesRead = inflater.Inflate(array, offset, count);
  232.                 if (bytesRead != 0) {
  233.                     // If decompression output buffer is not empty, return immediately.
  234.                     // 'true' means we complete synchronously.
  235.                     userResult.InvokeCallback(true, (object)bytesRead);
  236.                     return userResult;
  237.                 }
  238.                
  239.                 if (inflater.Finished()) {
  240.                     // end of compression stream
  241.                     userResult.InvokeCallback(true, (object)0);
  242.                     return userResult;
  243.                 }
  244.                
  245.                 // If there is no data on the output buffer and we are not at
  246.                 // the end of the stream, we need to get more data from the base stream
  247.                 _stream.BeginRead(buffer, 0, buffer.Length, m_CallBack, userResult);
  248.                 userResult.m_CompletedSynchronously &= userResult.IsCompleted;
  249.                
  250.                 return userResult;
  251.             }
  252.             catch {
  253.                 Interlocked.Decrement(ref asyncOperations);
  254.                 throw;
  255.             }
  256.         }
  257.        
  258.         // callback function for asynchrous reading on base stream
  259.         private void ReadCallback(IAsyncResult baseStreamResult)
  260.         {
  261.             DeflateStreamAsyncResult outerResult = (DeflateStreamAsyncResult)baseStreamResult.AsyncState;
  262.             outerResult.m_CompletedSynchronously &= baseStreamResult.CompletedSynchronously;
  263.             int bytesRead = 0;
  264.            
  265.             try {
  266.                 bytesRead = _stream.EndRead(baseStreamResult);
  267.             }
  268.             catch (Exception exc) {
  269.                 // Defer throwing this until EndXxx where we are ensured of user code on the stack.
  270.                 outerResult.InvokeCallback(exc);
  271.                 return;
  272.             }
  273.            
  274.             if (bytesRead <= 0) {
  275.                 // This indicates the base stream has received EOF
  276.                 outerResult.InvokeCallback((object)0);
  277.                 return;
  278.             }
  279.            
  280.             // Feed the data from base stream into decompression engine
  281.             inflater.SetInput(buffer, 0, bytesRead);
  282.             bytesRead = inflater.Inflate(outerResult.buffer, outerResult.offset, outerResult.count);
  283.             if (bytesRead == 0 && !inflater.Finished()) {
  284.                 // We could have read in head information and didn't get any data.
  285.                 // Read from the base stream again.
  286.                 // Need to solve recusion.
  287.                 _stream.BeginRead(buffer, 0, buffer.Length, m_CallBack, outerResult);
  288.             }
  289.             else {
  290.                 outerResult.InvokeCallback((object)bytesRead);
  291.             }
  292.         }
  293.        
  294.         public override int EndRead(IAsyncResult asyncResult)
  295.         {
  296.             EnsureDecompressionMode();
  297.            
  298.             if (asyncOperations != 1) {
  299.                 throw new InvalidOperationException(SR.GetString(SR.InvalidEndCall));
  300.             }
  301.            
  302.             if (asyncResult == null) {
  303.                 throw new ArgumentNullException("asyncResult");
  304.             }
  305.            
  306.             if (_stream == null) {
  307.                 throw new InvalidOperationException(SR.GetString(SR.ObjectDisposed_StreamClosed));
  308.             }
  309.            
  310.             DeflateStreamAsyncResult myResult = asyncResult as DeflateStreamAsyncResult;
  311.            
  312.             if (myResult == null) {
  313.                 throw new ArgumentNullException("asyncResult");
  314.             }
  315.            
  316.             try {
  317.                 if (!myResult.IsCompleted) {
  318.                     myResult.AsyncWaitHandle.WaitOne();
  319.                 }
  320.             }
  321.             finally {
  322.                 Interlocked.Decrement(ref asyncOperations);
  323.                 // this will just close the wait handle
  324.                 myResult.Close();
  325.             }
  326.            
  327.             if (myResult.Result is Exception) {
  328.                 throw (Exception)(myResult.Result);
  329.             }
  330.            
  331.             return (int)myResult.Result;
  332.         }
  333.        
  334.        
  335.         public override void Write(byte[] array, int offset, int count)
  336.         {
  337.             EnsureCompressionMode();
  338.             ValidateParameters(array, offset, count);
  339.             InternalWrite(array, offset, count, false);
  340.         }
  341.        
  342.         internal void InternalWrite(byte[] array, int offset, int count, bool isAsync)
  343.         {
  344.             int currentOffest = offset;
  345.             int remainingCount = count;
  346.             int bytesCompressed;
  347.            
  348.             // compressed the bytes we already passed to the deflater
  349.             while (!deflater.NeedsInput()) {
  350.                 bytesCompressed = deflater.GetDeflateOutput(buffer);
  351.                 if (bytesCompressed != 0) {
  352.                     if (isAsync) {
  353.                         IAsyncResult result = _stream.BeginWrite(buffer, 0, bytesCompressed, null, null);
  354.                         _stream.EndWrite(result);
  355.                     }
  356.                     else
  357.                         _stream.Write(buffer, 0, bytesCompressed);
  358.                 }
  359.             }
  360.            
  361.             deflater.SetInput(array, offset, count);
  362.            
  363.             // compressed the new input
  364.             while (!deflater.NeedsInput()) {
  365.                 bytesCompressed = deflater.GetDeflateOutput(buffer);
  366.                 if (bytesCompressed != 0) {
  367.                     if (isAsync) {
  368.                         IAsyncResult result = _stream.BeginWrite(buffer, 0, bytesCompressed, null, null);
  369.                         _stream.EndWrite(result);
  370.                     }
  371.                     else
  372.                         _stream.Write(buffer, 0, bytesCompressed);
  373.                 }
  374.             }
  375.         }
  376.        
  377.         protected override void Dispose(bool disposing)
  378.         {
  379.             try {
  380.                 // Flush on the underlying stream can throw (ex., low disk space)
  381.                 if (disposing && _stream != null) {
  382.                     Flush();
  383.                    
  384.                     // Need to do close the output stream in compression mode
  385.                     if (_mode == CompressionMode.Compress && _stream != null) {
  386.                         int bytesCompressed;
  387.                         // compress any bytes left.
  388.                         while (!deflater.NeedsInput()) {
  389.                             bytesCompressed = deflater.GetDeflateOutput(buffer);
  390.                             if (bytesCompressed != 0) {
  391.                                 _stream.Write(buffer, 0, bytesCompressed);
  392.                             }
  393.                         }
  394.                        
  395.                         // Write the end of compressed stream data.
  396.                         // We can safely do this since the buffer is large enough.
  397.                         bytesCompressed = deflater.Finish(buffer);
  398.                        
  399.                         if (bytesCompressed > 0)
  400.                             _stream.Write(buffer, 0, bytesCompressed);
  401.                     }
  402.                 }
  403.             }
  404.             finally {
  405.                 try {
  406.                     // Attempt to close the stream even if there was an IO error from Flushing.
  407.                     // Note that Stream.Close() can potentially throw here (may or may not be
  408.                     // due to the same Flush error). In this case, we still need to ensure
  409.                     // cleaning up internal resources, hence the finally block.
  410.                     if (disposing && !_leaveOpen && _stream != null) {
  411.                         _stream.Close();
  412.                     }
  413.                 }
  414.                 finally {
  415.                     _stream = null;
  416.                     base.Dispose(disposing);
  417.                 }
  418.             }
  419.         }
  420.        
  421.        
  422.         [HostProtection(ExternalThreading = true)]
  423.         public override IAsyncResult BeginWrite(byte[] array, int offset, int count, AsyncCallback asyncCallback, object asyncState)
  424.         {
  425.             EnsureCompressionMode();
  426.             if (asyncOperations != 0) {
  427.                 throw new InvalidOperationException(SR.GetString(SR.InvalidBeginCall));
  428.             }
  429.             Interlocked.Increment(ref asyncOperations);
  430.            
  431.             try {
  432.                 ValidateParameters(array, offset, count);
  433.                
  434.                 DeflateStreamAsyncResult userResult = new DeflateStreamAsyncResult(this, asyncState, asyncCallback, array, offset, count);
  435.                 userResult.isWrite = true;
  436.                
  437.                 m_AsyncWriterDelegate.BeginInvoke(array, offset, count, true, m_CallBack, userResult);
  438.                 userResult.m_CompletedSynchronously &= userResult.IsCompleted;
  439.                
  440.                 return userResult;
  441.             }
  442.             catch {
  443.                 Interlocked.Decrement(ref asyncOperations);
  444.                 throw;
  445.             }
  446.         }
  447.        
  448.         // callback function for asynchrous reading on base stream
  449.         private void WriteCallback(IAsyncResult asyncResult)
  450.         {
  451.             DeflateStreamAsyncResult outerResult = (DeflateStreamAsyncResult)asyncResult.AsyncState;
  452.             outerResult.m_CompletedSynchronously &= asyncResult.CompletedSynchronously;
  453.            
  454.             try {
  455.                 m_AsyncWriterDelegate.EndInvoke(asyncResult);
  456.             }
  457.             catch (Exception exc) {
  458.                 // Defer throwing this until EndXxx where we are ensured of user code on the stack.
  459.                 outerResult.InvokeCallback(exc);
  460.                 return;
  461.             }
  462.             outerResult.InvokeCallback(null);
  463.         }
  464.        
  465.         public override void EndWrite(IAsyncResult asyncResult)
  466.         {
  467.             EnsureCompressionMode();
  468.            
  469.             if (asyncOperations != 1) {
  470.                 throw new InvalidOperationException(SR.GetString(SR.InvalidEndCall));
  471.             }
  472.            
  473.             if (asyncResult == null) {
  474.                 throw new ArgumentNullException("asyncResult");
  475.             }
  476.            
  477.             if (_stream == null) {
  478.                 throw new InvalidOperationException(SR.GetString(SR.ObjectDisposed_StreamClosed));
  479.             }
  480.            
  481.             DeflateStreamAsyncResult myResult = asyncResult as DeflateStreamAsyncResult;
  482.            
  483.             if (myResult == null) {
  484.                 throw new ArgumentNullException("asyncResult");
  485.             }
  486.            
  487.             try {
  488.                 if (!myResult.IsCompleted) {
  489.                     myResult.AsyncWaitHandle.WaitOne();
  490.                 }
  491.             }
  492.             finally {
  493.                 Interlocked.Decrement(ref asyncOperations);
  494.                 // this will just close the wait handle
  495.                 myResult.Close();
  496.             }
  497.            
  498.             if (myResult.Result is Exception) {
  499.                 throw (Exception)(myResult.Result);
  500.             }
  501.         }
  502.        
  503.        
  504.         public Stream BaseStream {
  505.             get { return _stream; }
  506.         }
  507.     }
  508.    
  509.    
  510.     internal class DeflateStreamAsyncResult : IAsyncResult
  511.     {
  512.         public byte[] buffer;
  513.         public int offset;
  514.         public int count;
  515.         public bool isWrite;
  516.        
  517.         private object m_AsyncObject;
  518.         // Caller's async object.
  519.         private object m_AsyncState;
  520.         // Caller's state object.
  521.         private AsyncCallback m_AsyncCallback;
  522.         // Caller's callback method.
  523.         private object m_Result;
  524.         // Final IO result to be returned byt the End*() method.
  525.         internal bool m_CompletedSynchronously;
  526.         // true if the operation completed synchronously.
  527.         private int m_InvokedCallback;
  528.         // 0 is callback is not called
  529.         private int m_Completed;
  530.         // 0 if not completed >0 otherwise.
  531.         private object m_Event;
  532.         // lazy allocated event to be returned in the IAsyncResult for the client to wait on
  533.         public DeflateStreamAsyncResult(object asyncObject, object asyncState, AsyncCallback asyncCallback, byte[] buffer, int offset, int count)
  534.         {
  535.            
  536.             this.buffer = buffer;
  537.             this.offset = offset;
  538.             this.count = count;
  539.             m_CompletedSynchronously = true;
  540.             m_AsyncObject = asyncObject;
  541.             m_AsyncState = asyncState;
  542.             m_AsyncCallback = asyncCallback;
  543.         }
  544.        
  545.         // Interface method to return the caller's state object.
  546.         public object AsyncState {
  547.             get { return m_AsyncState; }
  548.         }
  549.        
  550.         // Interface property to return a WaitHandle that can be waited on for I/O completion.
  551.         // This property implements lazy event creation.
  552.         // the event object is only created when this property is accessed,
  553.         // since we're internally only using callbacks, as long as the user is using
  554.         // callbacks as well we will not create an event at all.
  555.         public WaitHandle AsyncWaitHandle {
  556.             get {
  557.                 // save a copy of the completion status
  558.                 int savedCompleted = m_Completed;
  559.                 if (m_Event == null) {
  560.                     // lazy allocation of the event:
  561.                     // if this property is never accessed this object is never created
  562.                     Interlocked.CompareExchange(ref m_Event, new ManualResetEvent(savedCompleted != 0), null);
  563.                 }
  564.                
  565.                 ManualResetEvent castedEvent = (ManualResetEvent)m_Event;
  566.                 if (savedCompleted == 0 && m_Completed != 0) {
  567.                     // if, while the event was created in the reset state,
  568.                     // the IO operation completed, set the event here.
  569.                     castedEvent.Set();
  570.                 }
  571.                 return castedEvent;
  572.             }
  573.         }
  574.        
  575.         // Interface property, returning synchronous completion status.
  576.         public bool CompletedSynchronously {
  577.             get { return m_CompletedSynchronously; }
  578.         }
  579.        
  580.         // Interface property, returning completion status.
  581.         public bool IsCompleted {
  582.             get { return m_Completed != 0; }
  583.         }
  584.        
  585.         // Internal property for setting the IO result.
  586.         internal object Result {
  587.             get { return m_Result; }
  588.         }
  589.        
  590.         internal void Close()
  591.         {
  592.             if (m_Event != null) {
  593.                 ((ManualResetEvent)m_Event).Close();
  594.             }
  595.         }
  596.        
  597.         internal void InvokeCallback(bool completedSynchronously, object result)
  598.         {
  599.             Complete(completedSynchronously, result);
  600.         }
  601.        
  602.         internal void InvokeCallback(object result)
  603.         {
  604.             Complete(result);
  605.         }
  606.        
  607.         // Internal method for setting completion.
  608.         // As a side effect, we'll signal the WaitHandle event and clean up.
  609.         private void Complete(bool completedSynchronously, object result)
  610.         {
  611.             m_CompletedSynchronously = completedSynchronously;
  612.             Complete(result);
  613.         }
  614.        
  615.         private void Complete(object result)
  616.         {
  617.             m_Result = result;
  618.            
  619.             // Set IsCompleted and the event only after the usercallback method.
  620.             Interlocked.Increment(ref m_Completed);
  621.            
  622.             if (m_Event != null) {
  623.                 ((ManualResetEvent)m_Event).Set();
  624.             }
  625.            
  626.             if (Interlocked.Increment(ref m_InvokedCallback) == 1) {
  627.                 if (m_AsyncCallback != null) {
  628.                     m_AsyncCallback(this);
  629.                 }
  630.             }
  631.         }
  632.        
  633.     }
  634. }

Developer Fusion