//
// Copyright (c) 2008-2011, Kenneth Bell
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//

using System;
using System.Collections.Generic;
using System.IO;

namespace DiscUtils.Streams
{
    /// <summary>
    /// Provides a thread-safe wrapping around a sparse stream.
    /// </summary>
    /// <remarks>
    /// <para>Streams are inherently not thread-safe (because read/write is not atomic w.r.t. Position).
    /// This class enables multiple 'views' of a stream to be created (each with their own Position), and ensures
    /// only a single operation is executing on the wrapped stream at any time.</para>
    /// <para>This example shows the pattern of use:</para>
    /// <example>
    /// <code>
    /// SparseStream baseStream = ...;
    /// ThreadSafeStream tss = new ThreadSafeStream(baseStream);
    /// for(int i = 0; i &lt; 10; ++i)
    /// {
    ///   SparseStream streamForThread = tss.OpenView();
    /// }
    /// </code>
    /// </example>
    /// <para>This results in 11 streams that can be used in different threads - tss and ten 'views' created from tss.</para>
    /// <para>Note, the stream length cannot be changed.</para>
    /// </remarks>
    public class ThreadSafeStream : SparseStream
    {
        // State shared between the owning stream and all of its views; also the lock object.
        // Set to null once this instance has been disposed.
        private CommonState _common;

        // True only for the instance created through a public constructor; views never own the state.
        private readonly bool _ownsCommon;

        // Per-instance position; applied to the wrapped stream under the lock before each read/write.
        private long _position;

        /// <summary>
        /// Initializes a new instance of the ThreadSafeStream class.
        /// </summary>
        /// <param name="toWrap">The stream to wrap.</param>
        /// <remarks>Do not directly modify <c>toWrap</c> after wrapping it, unless the thread-safe views
        /// will no longer be used.</remarks>
        public ThreadSafeStream(SparseStream toWrap)
            : this(toWrap, Ownership.None) {}

        /// <summary>
        /// Initializes a new instance of the ThreadSafeStream class.
        /// </summary>
        /// <param name="toWrap">The stream to wrap.</param>
        /// <param name="ownership">Whether to transfer ownership of <c>toWrap</c> to the new instance.</param>
        /// <remarks>Do not directly modify <c>toWrap</c> after wrapping it, unless the thread-safe views
        /// will no longer be used.</remarks>
        /// <exception cref="ArgumentNullException"><c>toWrap</c> is null.</exception>
        /// <exception cref="ArgumentException"><c>toWrap</c> does not support seeking.</exception>
        public ThreadSafeStream(SparseStream toWrap, Ownership ownership)
        {
            if (toWrap == null)
            {
                throw new ArgumentNullException(nameof(toWrap));
            }

            if (!toWrap.CanSeek)
            {
                throw new ArgumentException("Wrapped stream must support seeking", nameof(toWrap));
            }

            _common = new CommonState
            {
                WrappedStream = toWrap,
                WrappedStreamOwnership = ownership
            };
            _ownsCommon = true;
        }

        /// <summary>
        /// Creates a view sharing the state of an existing instance. The view's own position starts at zero
        /// and it never owns the shared state.
        /// </summary>
        /// <param name="toClone">The instance whose shared state is reused.</param>
        private ThreadSafeStream(ThreadSafeStream toClone)
        {
            _common = toClone._common;
            if (_common == null)
            {
                throw new ObjectDisposedException("toClone");
            }
        }

        /// <summary>
        /// Gets a value indicating if this stream supports reads.
        /// </summary>
        public override bool CanRead
        {
            get
            {
                CommonState common = Common;
                lock (common)
                {
                    return GetWrapped(common).CanRead;
                }
            }
        }

        /// <summary>
        /// Gets a value indicating if this stream supports seeking (always true).
        /// </summary>
        public override bool CanSeek
        {
            get { return true; }
        }

        /// <summary>
        /// Gets a value indicating if this stream supports writes (as reported by the wrapped stream).
        /// </summary>
        public override bool CanWrite
        {
            get
            {
                CommonState common = Common;
                lock (common)
                {
                    return GetWrapped(common).CanWrite;
                }
            }
        }

        /// <summary>
        /// Gets the parts of the stream that are stored.
        /// </summary>
        /// <value>This may be an empty enumeration if all bytes are zero.</value>
        public override IEnumerable<StreamExtent> Extents
        {
            get
            {
                CommonState common = Common;
                lock (common)
                {
                    // Snapshot while holding the lock: a lazily-evaluated enumeration would otherwise
                    // touch the wrapped stream after the lock has been released.
                    return new List<StreamExtent>(GetWrapped(common).Extents);
                }
            }
        }

        /// <summary>
        /// Gets the length of the stream.
        /// </summary>
        public override long Length
        {
            get
            {
                CommonState common = Common;
                lock (common)
                {
                    return GetWrapped(common).Length;
                }
            }
        }

        /// <summary>
        /// Gets the current stream position - each 'view' has its own Position.
        /// </summary>
        public override long Position
        {
            get { return _position; }
            set { _position = value; }
        }

        /// <summary>
        /// Gets the shared state, throwing if this instance has been disposed.
        /// </summary>
        /// <remarks>The field is read exactly once so callers lock on, and then use, the same object.</remarks>
        private CommonState Common
        {
            get
            {
                CommonState common = _common;
                if (common == null)
                {
                    throw new ObjectDisposedException("ThreadSafeStream");
                }

                return common;
            }
        }

        /// <summary>
        /// Opens a new thread-safe view on the stream.
        /// </summary>
        /// <returns>The new view.</returns>
        public SparseStream OpenView()
        {
            return new ThreadSafeStream(this);
        }

        /// <summary>
        /// Gets the parts of a stream that are stored, within a specified range.
        /// </summary>
        /// <param name="start">The offset of the first byte of interest.</param>
        /// <param name="count">The number of bytes of interest.</param>
        /// <returns>An enumeration of stream extents, indicating stored bytes.</returns>
        public override IEnumerable<StreamExtent> GetExtentsInRange(long start, long count)
        {
            CommonState common = Common;
            lock (common)
            {
                // Snapshot under the lock so enumeration never runs against the wrapped stream unlocked.
                return new List<StreamExtent>(GetWrapped(common).GetExtentsInRange(start, count));
            }
        }

        /// <summary>
        /// Causes the stream to flush all changes.
        /// </summary>
        public override void Flush()
        {
            CommonState common = Common;
            lock (common)
            {
                GetWrapped(common).Flush();
            }
        }

        /// <summary>
        /// Reads data from the stream.
        /// </summary>
        /// <param name="buffer">The buffer to fill.</param>
        /// <param name="offset">The first byte in buffer to fill.</param>
        /// <param name="count">The requested number of bytes to read.</param>
        /// <returns>The actual number of bytes read.</returns>
        public override int Read(byte[] buffer, int offset, int count)
        {
            CommonState common = Common;
            lock (common)
            {
                SparseStream wrapped = GetWrapped(common);

                // The wrapped stream's position is shared, so re-establish this view's position first.
                wrapped.Position = _position;
                int numRead = wrapped.Read(buffer, offset, count);
                _position += numRead;
                return numRead;
            }
        }

        /// <summary>
        /// Changes the current stream position (each view has its own Position).
        /// </summary>
        /// <param name="offset">The relative location to move to.</param>
        /// <param name="origin">The origin of the location.</param>
        /// <returns>The new location as an absolute position.</returns>
        /// <exception cref="IOException">The resulting position would be negative.</exception>
        public override long Seek(long offset, SeekOrigin origin)
        {
            long effectiveOffset = offset;
            if (origin == SeekOrigin.Current)
            {
                effectiveOffset += _position;
            }
            else if (origin == SeekOrigin.End)
            {
                effectiveOffset += Length;
            }

            if (effectiveOffset < 0)
            {
                throw new IOException("Attempt to move before beginning of disk");
            }

            _position = effectiveOffset;
            return _position;
        }

        /// <summary>
        /// Sets the length of the stream (not supported).
        /// </summary>
        /// <param name="value">The new length.</param>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        /// <summary>
        /// Writes data to the stream; the write must fit within the current stream length.
        /// </summary>
        /// <param name="buffer">The data to write.</param>
        /// <param name="offset">The first byte to write.</param>
        /// <param name="count">The number of bytes to write.</param>
        /// <exception cref="IOException">The write would extend the stream.</exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            CommonState common = Common;
            lock (common)
            {
                SparseStream wrapped = GetWrapped(common);

                if (_position + count > wrapped.Length)
                {
                    throw new IOException("Attempt to extend stream");
                }

                // The wrapped stream's position is shared, so re-establish this view's position first.
                wrapped.Position = _position;
                wrapped.Write(buffer, offset, count);
                _position += count;
            }
        }

        /// <summary>
        /// Disposes of this instance, invalidating any remaining views.
        /// </summary>
        /// <param name="disposing"><c>true</c> if disposing, else <c>false</c>.</param>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                CommonState common = _common;

                // Only the instance that created the shared state tears it down; disposing a view
                // just detaches that view.
                if (_ownsCommon && common != null)
                {
                    lock (common)
                    {
                        if (common.WrappedStreamOwnership == Ownership.Dispose)
                        {
                            common.WrappedStream.Dispose();
                        }

                        // Clears the wrapped stream reference, so remaining views report disposed.
                        common.Dispose();
                    }
                }
            }

            _common = null;

            base.Dispose(disposing);
        }

        /// <summary>
        /// Gets the wrapped stream from the shared state, throwing if the owner has been disposed.
        /// </summary>
        /// <param name="common">The shared state, which the caller is expected to hold the lock on.</param>
        /// <returns>The wrapped stream.</returns>
        private static SparseStream GetWrapped(CommonState common)
        {
            SparseStream wrapped = common.WrappedStream;
            if (wrapped == null)
            {
                throw new ObjectDisposedException("ThreadSafeStream");
            }

            return wrapped;
        }

        /// <summary>
        /// State shared by the owning stream and every view opened from it.
        /// </summary>
        private sealed class CommonState : IDisposable
        {
            public SparseStream WrappedStream;
            public Ownership WrappedStreamOwnership;

            #region IDisposable Members

            /// <summary>
            /// Drops the reference to the wrapped stream (does not dispose it).
            /// </summary>
            public void Dispose()
            {
                WrappedStream = null;
            }

            #endregion
        }
    }
}