123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- using System;
- using System.IO;
- using System.Threading.Tasks;
- using Vinno.IUS.Common.IO;
- namespace Vinno.IUS.Common.Network.IO
- {
- public class AsyncBufferReader
- {
- /// <summary>
- /// Size > 4MB will be consided to large buffer.
- /// </summary>
- private const int LargeSize = 4194304;
- private readonly Stream _stream;
- public AsyncBufferReader(Stream stream)
- {
- _stream = stream;
- }
- /// <summary>
- /// Read a int data from stream.
- /// </summary>
- /// <returns></returns>
- private async Task<int> ReadIntAsync()
- {
- var readData = await ReadBinaryAsync(sizeof(int)).ConfigureAwait(false);
- return BitConverter.ToInt32(readData, 0);
- }
- /// <summary>
- /// Read binary data by size.
- /// </summary>
- /// <param name="size"></param>
- /// <returns></returns>
- private async Task<byte[]> ReadBinaryAsync(int size)
- {
- var data = new byte[size];
- var dataLength = data.Length;
- var startPosition = 0;
- var bytesRead = 0;
- do
- {
- dataLength -= bytesRead;
- if (dataLength == 0)
- {
- break;
- }
- bytesRead = await _stream.ReadAsync(data, startPosition, dataLength).ConfigureAwait(false);
- if (bytesRead == 0)
- {
- //If the bytes readed is zero, it means the network connection is aborted.
- throw new ConnectionAbortException();
- }
- if (bytesRead != dataLength)
- {
- startPosition += bytesRead;
- }
- } while (bytesRead != dataLength);
- return data;
- }
- /// <summary>
- /// Read buffer from stream.
- /// </summary>
- /// <returns></returns>
- public async Task<IBuffer> ReadBufferAsync()
- {
- IBuffer result;
- var readData = await ReadIntAsync().ConfigureAwait(false);
- var bufferType = (BufferType)readData;
- if (bufferType == BufferType.Composit)
- {
- var cb = new CompositBuffer();
- var bufferCount = await ReadIntAsync().ConfigureAwait(false);
- for (var i = 0; i < bufferCount; i++)
- {
- var buffer = await ReadBufferAsync().ConfigureAwait(false);
- cb.Add(buffer);
- }
- result = cb;
- }
- else
- {
- //Small size, read directly
- var bufferSize = await ReadIntAsync().ConfigureAwait(false);
- if (bufferSize <= LargeSize)
- {
- var buffer = await ReadBinaryAsync(bufferSize).ConfigureAwait(false);
- result = new ByteBuffer(buffer);
- }
- else
- {
- //Large size, use a cache to read.
- var cacheStream = new CacheStream(IoCache.CreateCache());
- var largeBuffer = new CompositCacheBuffer(cacheStream);
- while (bufferSize > LargeSize)
- {
- var buffer = await ReadBinaryAsync(LargeSize).ConfigureAwait(false);
- largeBuffer.Add(new CacheBuffer(cacheStream, buffer));
- bufferSize -= LargeSize;
- }
- if (bufferSize > 0)
- {
- var buffer = await ReadBinaryAsync(bufferSize).ConfigureAwait(false);
- largeBuffer.Add(new CacheBuffer(cacheStream, buffer));
- }
- CacheStreamManager.Register(cacheStream);
- result = largeBuffer;
- }
- }
- ////HashEngine.HashSize is 4, it's a int, use ReadInt directly, the compare hascode is no
- var hashCode = await ReadIntAsync();
- if (hashCode!=BitConverter.ToInt32(result.HashCode, 0))
- {
- throw new ErrorDataException("Hash error");
- }
- return result;
- }
- }
- }
|