123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- /* Copyright (C) 2016-2017 Tal Aloni <tal.aloni.il@gmail.com>. All rights reserved.
- *
- * You can redistribute this program and/or modify it under the terms of
- * the GNU Lesser Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- */
- using System;
- using System.Collections.Generic;
- using System.IO;
- using System.Text;
- using System.Threading;
- namespace Utilities
- {
/// <summary>
/// Wraps a readable, seekable stream and prefetches up to <see cref="CacheSize"/> bytes
/// ahead of the current position on a background thread, so that sequential reads can be
/// served from memory.
/// </summary>
/// <remarks>
/// All access to the underlying stream and to the cache is serialized by locking on the
/// underlying stream instance. The cache is addressed by absolute stream position, so
/// seeking never has to invalidate it; operations that modify the stream contents do.
/// </remarks>
public class PrefetchedStream : Stream
{
    public const int CacheSize = 524288; // 512 KB
    // The misspelled name is part of the public API and is kept for backward compatibility.
    public const int ReadAheadThershold = 65536; // 64 KB

    // Absolute position in the underlying stream of m_cache[0].
    private long m_cacheOffset;
    // Prefetched bytes; an empty array means "nothing cached".
    private byte[] m_cache = new byte[0];
    private Stream m_stream;
    // True while a read-ahead thread has been started but has not yet taken the lock.
    private bool m_readAheadPending;
    // Set by Close() so that a read-ahead scheduled earlier does not touch a closed stream.
    private bool m_closed;

    /// <summary>
    /// Creates the wrapper and immediately starts prefetching from the stream's current position.
    /// </summary>
    /// <param name="stream">The stream to wrap. It is closed when this instance is closed.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    public PrefetchedStream(Stream stream)
    {
        if (stream == null)
        {
            throw new ArgumentNullException("stream");
        }

        m_stream = stream;
        // Read-ahead has to save and restore Position, so it needs a seekable stream.
        if (m_stream.CanRead && m_stream.CanSeek)
        {
            ScheduleReadAhead();
        }
    }

    /// <summary>
    /// Starts a background read-ahead unless one is already waiting to run.
    /// </summary>
    private void ScheduleReadAhead()
    {
        lock (m_stream)
        {
            // Coalesce requests: a pending read-ahead uses the position that is current
            // when it actually runs, so starting a second thread would only repeat the work.
            if (m_readAheadPending || m_closed)
            {
                return;
            }
            m_readAheadPending = true;
        }

        Thread thread = new Thread(delegate()
        {
            ReadAhead();
        });
        // Prefetching is disposable work and must not keep the process alive.
        thread.IsBackground = true;
        thread.Start();
    }

    /// <summary>
    /// Returns true when <paramref name="position"/> falls inside the cached range.
    /// Must be called while holding the lock.
    /// </summary>
    private bool IsInCache(long position)
    {
        return (position >= m_cacheOffset) && (position < m_cacheOffset + m_cache.Length);
    }

    /// <summary>
    /// Refills the cache so that it starts at the current position, reusing any bytes
    /// that are already cached from that position onwards. The stream position is left
    /// where it was. Failures are not propagated: read-ahead is best effort, and the
    /// cache is simply dropped so the next foreground read hits the underlying stream
    /// (and surfaces the error there, on the caller's thread).
    /// </summary>
    private void ReadAhead()
    {
        lock (m_stream)
        {
            m_readAheadPending = false;
            if (m_closed)
            {
                return;
            }

            try
            {
                long position = this.Position;
                // Fill a local buffer and publish it only after the read succeeded,
                // so a failed read can never expose zero-filled bytes as cached data.
                byte[] cache = new byte[CacheSize];
                int bytesAlreadyRead = 0;
                if (IsInCache(position))
                {
                    int offsetInCache = (int)(position - m_cacheOffset);
                    bytesAlreadyRead = m_cache.Length - offsetInCache;
                    Array.Copy(m_cache, offsetInCache, cache, 0, bytesAlreadyRead);
                    // Continue reading right after the bytes we already have.
                    this.Position = position + bytesAlreadyRead;
                }

                int bytesRead;
                try
                {
                    bytesRead = m_stream.Read(cache, bytesAlreadyRead, CacheSize - bytesAlreadyRead);
                }
                finally
                {
                    // The caller must not observe the position moved by prefetching.
                    this.Position = position;
                }
                System.Diagnostics.Debug.Print("[{0}] {1} bytes have been read ahead from offset {2}.", DateTime.Now.ToString("HH:mm:ss:ffff"), bytesRead, position);

                int bytesCached = bytesAlreadyRead + bytesRead;
                if (bytesCached < CacheSize)
                {
                    // EOF (or a short read): trim the cache to the bytes actually available.
                    Array.Resize<byte>(ref cache, bytesCached);
                }
                m_cache = cache;
                m_cacheOffset = position;
            }
            catch (IOException)
            {
                m_cache = new byte[0];
            }
            catch (ObjectDisposedException)
            {
                m_cache = new byte[0];
            }
            catch (NotSupportedException)
            {
                m_cache = new byte[0];
            }
        }
    }

    /// <summary>
    /// Reads up to <paramref name="count"/> bytes into <paramref name="buffer"/> starting at
    /// <paramref name="offset"/>, serving them from the cache when possible.
    /// </summary>
    /// <returns>
    /// The total number of bytes placed in <paramref name="buffer"/>; 0 at end of stream.
    /// </returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        int bytesCopied;
        lock (m_stream)
        {
            long position = this.Position;
            if (IsInCache(position))
            {
                int offsetInCache = (int)(position - m_cacheOffset);
                int bytesAvailableInCache = m_cache.Length - offsetInCache;
                bytesCopied = Math.Min(count, bytesAvailableInCache);
                Array.Copy(m_cache, offsetInCache, buffer, offset, bytesCopied);
                this.Position = position + bytesCopied;
                if (bytesCopied < count)
                {
                    // The cache ran out: fetch the remainder directly and count it,
                    // since the stream position advances past these bytes as well.
                    int bytesMissing = count - bytesCopied;
                    bytesCopied += m_stream.Read(buffer, offset + bytesCopied, bytesMissing);
                }
                if (offsetInCache + ReadAheadThershold >= m_cache.Length)
                {
                    ScheduleReadAhead();
                }
            }
            else
            {
                bytesCopied = m_stream.Read(buffer, offset, count);
                // Nothing was read at end of stream (or for count == 0), so there is nothing to prefetch.
                if (bytesCopied > 0)
                {
                    ScheduleReadAhead();
                }
            }
        }
        return bytesCopied;
    }

    /// <summary>
    /// Writes to the underlying stream and discards the cache, which may now be stale.
    /// </summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        lock (m_stream)
        {
            m_cache = new byte[0];
            m_stream.Write(buffer, offset, count);
        }
    }

    /// <summary>
    /// Closes the underlying stream and prevents any pending read-ahead from using it.
    /// </summary>
    public override void Close()
    {
        lock (m_stream)
        {
            m_closed = true;
            m_cache = new byte[0];
            m_stream.Close();
        }
        base.Close();
    }

    public override bool CanRead
    {
        get
        {
            return m_stream.CanRead;
        }
    }

    public override bool CanSeek
    {
        get
        {
            return m_stream.CanSeek;
        }
    }

    public override bool CanWrite
    {
        get
        {
            return m_stream.CanWrite;
        }
    }

    public override long Length
    {
        get
        {
            lock (m_stream)
            {
                return m_stream.Length;
            }
        }
    }

    /// <summary>
    /// The position of the underlying stream. The cache is keyed by absolute position,
    /// so changing the position does not invalidate it.
    /// </summary>
    public override long Position
    {
        get
        {
            lock (m_stream)
            {
                return m_stream.Position;
            }
        }
        set
        {
            lock (m_stream)
            {
                m_stream.Position = value;
            }
        }
    }

    public override void Flush()
    {
        lock (m_stream)
        {
            m_stream.Flush();
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        lock (m_stream)
        {
            return m_stream.Seek(offset, origin);
        }
    }

    /// <summary>
    /// Changes the length of the underlying stream and discards the cache, because
    /// truncation can leave cached bytes that no longer exist in the stream.
    /// </summary>
    public override void SetLength(long value)
    {
        lock (m_stream)
        {
            m_cache = new byte[0];
            m_stream.SetLength(value);
        }
    }
}
- }
|