//
// Copyright (c) 2008-2011, Kenneth Bell
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//
using System;
using System.Collections.Generic;
using System.IO;
namespace DiscUtils.Streams
{
/// <summary>
/// Provides a thread-safe wrapping around a sparse stream.
/// </summary>
/// <remarks>
/// <para>Streams are inherently not thread-safe (because read/write is not atomic w.r.t. Position).
/// This class enables multiple 'views' of a stream to be created (each with its own Position), and ensures
/// only a single operation is executing on the wrapped stream at any time.</para>
/// <para>This example shows the pattern of use:</para>
/// <example>
/// <code>
/// SparseStream baseStream = ...;
/// ThreadSafeStream tss = new ThreadSafeStream(baseStream);
/// for(int i = 0; i &lt; 10; ++i)
/// {
///     SparseStream streamForThread = tss.OpenView();
/// }
/// </code>
/// </example>
/// <para>This results in 11 streams that can be used in different threads - tss and ten 'views' created from tss.
/// The per-view position is read and written without taking the shared lock, so a single view
/// should not itself be used from several threads at once.</para>
/// <para>Note, the stream length cannot be changed.</para>
/// </remarks>
public class ThreadSafeStream : SparseStream
{
    // State shared by the original instance and every view opened from it; it is also the lock object.
    // Set to null when this particular instance is disposed.
    private CommonState _common;

    // True only for the instance created by a public constructor; views never tear down the shared state.
    private readonly bool _ownsCommon;

    // Position of this view only; copied onto the wrapped stream at the start of each Read/Write.
    private long _position;

    /// <summary>
    /// Initializes a new instance of the ThreadSafeStream class.
    /// </summary>
    /// <param name="toWrap">The stream to wrap.</param>
    /// <remarks>Do not directly modify <c>toWrap</c> after wrapping it, unless the thread-safe views
    /// will no longer be used.</remarks>
    /// <exception cref="ArgumentNullException"><paramref name="toWrap"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="toWrap"/> does not support seeking.</exception>
    public ThreadSafeStream(SparseStream toWrap)
        : this(toWrap, Ownership.None)
    {
    }

    /// <summary>
    /// Initializes a new instance of the ThreadSafeStream class.
    /// </summary>
    /// <param name="toWrap">The stream to wrap.</param>
    /// <param name="ownership">Whether to transfer ownership of <c>toWrap</c> to the new instance.</param>
    /// <remarks>Do not directly modify <c>toWrap</c> after wrapping it, unless the thread-safe views
    /// will no longer be used.</remarks>
    /// <exception cref="ArgumentNullException"><paramref name="toWrap"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="toWrap"/> does not support seeking.</exception>
    public ThreadSafeStream(SparseStream toWrap, Ownership ownership)
    {
        // Fail with a descriptive argument exception rather than a NullReferenceException below.
        if (toWrap == null)
        {
            throw new ArgumentNullException(nameof(toWrap));
        }

        if (!toWrap.CanSeek)
        {
            throw new ArgumentException("Wrapped stream must support seeking", nameof(toWrap));
        }

        _common = new CommonState
        {
            WrappedStream = toWrap,
            WrappedStreamOwnership = ownership
        };
        _ownsCommon = true;
    }

    /// <summary>
    /// Initializes a new view that shares the state of an existing instance.
    /// </summary>
    /// <param name="toClone">The instance whose shared state the new view attaches to.</param>
    /// <exception cref="ObjectDisposedException"><paramref name="toClone"/> has been disposed.</exception>
    private ThreadSafeStream(ThreadSafeStream toClone)
    {
        // _ownsCommon stays false: a view never disposes the shared state. The view starts at position 0.
        _common = toClone.GetCommon();
    }

    /// <summary>
    /// Gets a value indicating if this stream supports reads (as reported by the wrapped stream).
    /// </summary>
    /// <exception cref="ObjectDisposedException">This instance, or the instance that owns the
    /// wrapped stream, has been disposed.</exception>
    public override bool CanRead
    {
        get
        {
            CommonState common = GetCommon();
            lock (common)
            {
                return GetWrapped(common).CanRead;
            }
        }
    }

    /// <summary>
    /// Gets a value indicating if this stream supports seeking (always true).
    /// </summary>
    public override bool CanSeek
    {
        get { return true; }
    }

    /// <summary>
    /// Gets a value indicating if this stream supports writes (as reported by the wrapped stream).
    /// </summary>
    /// <exception cref="ObjectDisposedException">This instance, or the instance that owns the
    /// wrapped stream, has been disposed.</exception>
    public override bool CanWrite
    {
        get
        {
            CommonState common = GetCommon();
            lock (common)
            {
                return GetWrapped(common).CanWrite;
            }
        }
    }

    // NOTE(review): as written, this signature (and GetExtentsInRange below) names the non-generic
    // IEnumerable, which the file's using directives (System.Collections.Generic only) do not bring
    // into scope; the element type argument may have been lost - confirm against the SparseStream
    // base declaration.

    /// <summary>
    /// Gets the parts of the stream that are stored.
    /// </summary>
    /// <value>This may be an empty enumeration if all bytes are zero.</value>
    /// <remarks>The lock is held only while the wrapped stream's <c>Extents</c> getter runs.
    /// NOTE(review): if the wrapped stream returns a lazily evaluated sequence, enumerating it
    /// happens outside the lock - confirm whether callers need a snapshot.</remarks>
    /// <exception cref="ObjectDisposedException">This instance, or the instance that owns the
    /// wrapped stream, has been disposed.</exception>
    public override IEnumerable Extents
    {
        get
        {
            CommonState common = GetCommon();
            lock (common)
            {
                return GetWrapped(common).Extents;
            }
        }
    }

    /// <summary>
    /// Gets the length of the stream.
    /// </summary>
    /// <exception cref="ObjectDisposedException">This instance, or the instance that owns the
    /// wrapped stream, has been disposed.</exception>
    public override long Length
    {
        get
        {
            CommonState common = GetCommon();
            lock (common)
            {
                return GetWrapped(common).Length;
            }
        }
    }

    /// <summary>
    /// Gets or sets the current stream position - each 'view' has its own Position.
    /// </summary>
    public override long Position
    {
        get { return _position; }
        set { _position = value; }
    }

    /// <summary>
    /// Opens a new thread-safe view on the stream.
    /// </summary>
    /// <returns>The new view.</returns>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    public SparseStream OpenView()
    {
        return new ThreadSafeStream(this);
    }

    /// <summary>
    /// Gets the parts of a stream that are stored, within a specified range.
    /// </summary>
    /// <param name="start">The offset of the first byte of interest.</param>
    /// <param name="count">The number of bytes of interest.</param>
    /// <returns>An enumeration of stream extents, indicating stored bytes.</returns>
    /// <remarks>The lock is held only for the call into the wrapped stream.
    /// NOTE(review): a lazily evaluated result would be enumerated outside the lock - confirm.</remarks>
    /// <exception cref="ObjectDisposedException">This instance, or the instance that owns the
    /// wrapped stream, has been disposed.</exception>
    public override IEnumerable GetExtentsInRange(long start, long count)
    {
        CommonState common = GetCommon();
        lock (common)
        {
            return GetWrapped(common).GetExtentsInRange(start, count);
        }
    }

    /// <summary>
    /// Causes the stream to flush all changes.
    /// </summary>
    /// <exception cref="ObjectDisposedException">This instance, or the instance that owns the
    /// wrapped stream, has been disposed.</exception>
    public override void Flush()
    {
        CommonState common = GetCommon();
        lock (common)
        {
            GetWrapped(common).Flush();
        }
    }

    /// <summary>
    /// Reads data from the stream.
    /// </summary>
    /// <param name="buffer">The buffer to fill.</param>
    /// <param name="offset">The first byte in buffer to fill.</param>
    /// <param name="count">The requested number of bytes to read.</param>
    /// <returns>The actual number of bytes read.</returns>
    /// <exception cref="ObjectDisposedException">This instance, or the instance that owns the
    /// wrapped stream, has been disposed.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        CommonState common = GetCommon();
        lock (common)
        {
            SparseStream wrapped = GetWrapped(common);

            // The wrapped stream's position is shared by all views, so it is re-established
            // from this view's own position on every operation.
            wrapped.Position = _position;
            int numRead = wrapped.Read(buffer, offset, count);
            _position += numRead;
            return numRead;
        }
    }

    /// <summary>
    /// Changes the current stream position (each view has its own Position).
    /// </summary>
    /// <param name="offset">The relative location to move to.</param>
    /// <param name="origin">The origin of the location.</param>
    /// <returns>The new location as an absolute position.</returns>
    /// <exception cref="IOException">The resulting position would be negative.</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        // Any origin other than Current or End is treated as an absolute offset.
        long effectiveOffset = offset;
        if (origin == SeekOrigin.Current)
        {
            effectiveOffset += _position;
        }
        else if (origin == SeekOrigin.End)
        {
            effectiveOffset += Length;
        }

        if (effectiveOffset < 0)
        {
            throw new IOException("Attempt to move before beginning of disk");
        }

        _position = effectiveOffset;
        return _position;
    }

    /// <summary>
    /// Sets the length of the stream (not supported).
    /// </summary>
    /// <param name="value">The new length.</param>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Writes data to the stream. The stream cannot be extended by writing.
    /// </summary>
    /// <param name="buffer">The data to write.</param>
    /// <param name="offset">The first byte to write.</param>
    /// <param name="count">The number of bytes to write.</param>
    /// <exception cref="IOException">The write would end beyond the current length of the wrapped stream.</exception>
    /// <exception cref="ObjectDisposedException">This instance, or the instance that owns the
    /// wrapped stream, has been disposed.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        CommonState common = GetCommon();
        lock (common)
        {
            SparseStream wrapped = GetWrapped(common);

            // Views assume a fixed length, so writes that would grow the stream are rejected.
            if (_position + count > wrapped.Length)
            {
                throw new IOException("Attempt to extend stream");
            }

            wrapped.Position = _position;
            wrapped.Write(buffer, offset, count);
            _position += count;
        }
    }

    /// <summary>
    /// Disposes of this instance, invalidating any remaining views.
    /// </summary>
    /// <param name="disposing"><c>true</c> if disposing, else <c>false</c>.</param>
    /// <remarks>Only the instance created through a public constructor tears down the shared state
    /// (and, when ownership was transferred, the wrapped stream). Disposing a view only detaches
    /// that view.</remarks>
    protected override void Dispose(bool disposing)
    {
        try
        {
            // Detach first so this instance reports as disposed even if the teardown below throws.
            CommonState common = _common;
            _common = null;

            if (disposing && _ownsCommon && common != null)
            {
                lock (common)
                {
                    SparseStream wrapped = common.WrappedStream;

                    // Invalidate the views before disposing the wrapped stream, so that they are
                    // invalidated even when the wrapped stream's Dispose throws.
                    common.Dispose();

                    if (wrapped != null && common.WrappedStreamOwnership == Ownership.Dispose)
                    {
                        wrapped.Dispose();
                    }
                }
            }
        }
        finally
        {
            base.Dispose(disposing);
        }
    }

    /// <summary>
    /// Gets the shared state, failing if this instance has been disposed.
    /// </summary>
    /// <returns>The shared state; callers keep it in a local so that a concurrent Dispose of this
    /// instance cannot null it between taking the lock and using it.</returns>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    private CommonState GetCommon()
    {
        CommonState common = _common;
        if (common == null)
        {
            throw new ObjectDisposedException(nameof(ThreadSafeStream));
        }

        return common;
    }

    /// <summary>
    /// Gets the wrapped stream from the shared state, failing if the owning instance has been disposed.
    /// </summary>
    /// <param name="common">The shared state (callers hold its lock).</param>
    /// <returns>The wrapped stream.</returns>
    /// <exception cref="ObjectDisposedException">The shared state has been disposed.</exception>
    private static SparseStream GetWrapped(CommonState common)
    {
        SparseStream wrapped = common.WrappedStream;
        if (wrapped == null)
        {
            throw new ObjectDisposedException(nameof(ThreadSafeStream));
        }

        return wrapped;
    }

    /// <summary>
    /// State shared between a ThreadSafeStream and all of its views; instances are also used as the lock object.
    /// </summary>
    private sealed class CommonState : IDisposable
    {
        // The stream all views operate on; null once the owning ThreadSafeStream has been disposed.
        public SparseStream WrappedStream;

        // Whether the owning ThreadSafeStream disposes WrappedStream when it is itself disposed.
        public Ownership WrappedStreamOwnership;

        /// <summary>
        /// Drops the reference to the wrapped stream, which marks the shared state as disposed.
        /// The wrapped stream itself is not disposed here.
        /// </summary>
        public void Dispose()
        {
            WrappedStream = null;
        }
    }
}
}