AsyncBufferReader.cs 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. using System;
  2. using System.IO;
  3. using System.Threading.Tasks;
  4. using Vinno.IUS.Common.IO;
  5. namespace Vinno.IUS.Common.Network.IO
  6. {
  7. public class AsyncBufferReader
  8. {
  9. /// <summary>
  10. /// Size > 4MB will be consided to large buffer.
  11. /// </summary>
  12. private const int LargeSize = 4194304;
  13. private readonly Stream _stream;
  14. public AsyncBufferReader(Stream stream)
  15. {
  16. _stream = stream;
  17. }
  18. /// <summary>
  19. /// Read a int data from stream.
  20. /// </summary>
  21. /// <returns></returns>
  22. private async Task<int> ReadIntAsync()
  23. {
  24. var readData = await ReadBinaryAsync(sizeof(int)).ConfigureAwait(false);
  25. return BitConverter.ToInt32(readData, 0);
  26. }
  27. /// <summary>
  28. /// Read binary data by size.
  29. /// </summary>
  30. /// <param name="size"></param>
  31. /// <returns></returns>
  32. private async Task<byte[]> ReadBinaryAsync(int size)
  33. {
  34. var data = new byte[size];
  35. var dataLength = data.Length;
  36. var startPosition = 0;
  37. var bytesRead = 0;
  38. do
  39. {
  40. dataLength -= bytesRead;
  41. if (dataLength == 0)
  42. {
  43. break;
  44. }
  45. bytesRead = await _stream.ReadAsync(data, startPosition, dataLength).ConfigureAwait(false);
  46. if (bytesRead == 0)
  47. {
  48. //If the bytes readed is zero, it means the network connection is aborted.
  49. throw new ConnectionAbortException();
  50. }
  51. if (bytesRead != dataLength)
  52. {
  53. startPosition += bytesRead;
  54. }
  55. } while (bytesRead != dataLength);
  56. return data;
  57. }
  58. /// <summary>
  59. /// Read buffer from stream.
  60. /// </summary>
  61. /// <returns></returns>
  62. public async Task<IBuffer> ReadBufferAsync()
  63. {
  64. IBuffer result;
  65. var readData = await ReadIntAsync().ConfigureAwait(false);
  66. var bufferType = (BufferType)readData;
  67. if (bufferType == BufferType.Composit)
  68. {
  69. var cb = new CompositBuffer();
  70. var bufferCount = await ReadIntAsync().ConfigureAwait(false);
  71. for (var i = 0; i < bufferCount; i++)
  72. {
  73. var buffer = await ReadBufferAsync().ConfigureAwait(false);
  74. cb.Add(buffer);
  75. }
  76. result = cb;
  77. }
  78. else
  79. {
  80. //Small size, read directly
  81. var bufferSize = await ReadIntAsync().ConfigureAwait(false);
  82. if (bufferSize <= LargeSize)
  83. {
  84. var buffer = await ReadBinaryAsync(bufferSize).ConfigureAwait(false);
  85. result = new ByteBuffer(buffer);
  86. }
  87. else
  88. {
  89. //Large size, use a cache to read.
  90. var cacheStream = new CacheStream(IoCache.CreateCache());
  91. var largeBuffer = new CompositCacheBuffer(cacheStream);
  92. while (bufferSize > LargeSize)
  93. {
  94. var buffer = await ReadBinaryAsync(LargeSize).ConfigureAwait(false);
  95. largeBuffer.Add(new CacheBuffer(cacheStream, buffer));
  96. bufferSize -= LargeSize;
  97. }
  98. if (bufferSize > 0)
  99. {
  100. var buffer = await ReadBinaryAsync(bufferSize).ConfigureAwait(false);
  101. largeBuffer.Add(new CacheBuffer(cacheStream, buffer));
  102. }
  103. CacheStreamManager.Register(cacheStream);
  104. result = largeBuffer;
  105. }
  106. }
  107. ////HashEngine.HashSize is 4, it's a int, use ReadInt directly, the compare hascode is no
  108. var hashCode = await ReadIntAsync();
  109. if (hashCode!=BitConverter.ToInt32(result.HashCode, 0))
  110. {
  111. throw new ErrorDataException("Hash error");
  112. }
  113. return result;
  114. }
  115. }
  116. }