ClientChannel.cs 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. using System;
  2. using System.Threading.Tasks;
  3. using Vinno.IUS.Common.IO;
  4. using Vinno.IUS.Common.Network.Tcp;
  5. using Vinno.IUS.Common.Network.Tcp.Extensions;
  6. using Vinno.IUS.Common.Network.IO;
  7. namespace Vinno.IUS.Common.Network.Channels
  8. {
  9. public class ClientChannel:Channel
  10. {
  11. private readonly ITcp _tcp;
  12. private int _readTimeoutTranscationLevel;
  13. private int _writeTimeoutTranscationLevel;
  14. /// <summary>
  15. /// Gets the total data transfered include send and receive.
  16. /// </summary>
  17. public long DataTransfered { get; private set; }
  18. public ClientChannel(int channelId, ITcp tcp):base(channelId)
  19. {
  20. _tcp = tcp;
  21. Remote = _tcp.Remote;
  22. Local = _tcp.Local;
  23. }
  24. /// <summary>
  25. /// Read and validate the header from remote.
  26. /// </summary>
  27. /// <param name="timeout">Wait forever if the value is 0</param>
  28. protected override void ReadHeader(int timeout = 0)
  29. {
  30. //Notice that when network is shutdown, it may not know it's shutdown,
  31. //But when read data from socket, it will read 0 byte, that's can used for
  32. //Check if network is shutdown.
  33. using (new ReadTimeoutTranscation(this, timeout))
  34. {
  35. //ReadBinary may throw exception when network is shutdown, and readed data length is zero.
  36. var headerData = _tcp.ReadBinary(8);
  37. ValidateHeader(headerData);
  38. DataTransfered += 8;
  39. }
  40. }
  41. /// <summary>
  42. /// Read and validate the header from remote by async way.
  43. /// </summary>
  44. /// <param name="timeout">Wait forever if the value is 0</param>
  45. protected override async Task ReadHeaderAsync(int timeout = 0)
  46. {
  47. //Notice that when network is shutdown, it may not know it's shutdown,
  48. //But when read data from socket, it will read 0 byte, that's can used for
  49. //Check if network is shutdown.
  50. using (new ReadTimeoutTranscation(this, timeout))
  51. {
  52. //ReadBinary may throw exception when network is shutdown, and read data length is zero.
  53. var headerData = await _tcp.ReadBinaryAsync(8).ConfigureAwait(false);
  54. ValidateHeader(headerData);
  55. DataTransfered += 8;
  56. }
  57. }
  58. /// <summary>
  59. /// Write the header before write buffer.
  60. /// </summary>
  61. /// <param name="timeout">Wait forever if the value is 0</param>
  62. protected override void WriteHeader(int timeout = 0)
  63. {
  64. using (new WriteTimeoutTranscation(this, timeout))
  65. {
  66. var headerData = GetHeaderData();
  67. //Write header
  68. _tcp.WriteBinary(headerData);
  69. DataTransfered += 8;
  70. }
  71. }
  72. /// <summary>
  73. /// Write the header before write buffer by async way.
  74. /// </summary>
  75. /// <param name="timeout">Wait forever if the value is 0</param>
  76. protected override async Task WriteHeaderAsync(int timeout = 0)
  77. {
  78. using (new WriteTimeoutTranscation(this, timeout))
  79. {
  80. var headerData = GetHeaderData();
  81. //Write header
  82. await _tcp.WriteBinaryAsync(headerData).ConfigureAwait(false);
  83. DataTransfered += 8;
  84. }
  85. }
  86. /// <summary>
  87. /// Read buffer from remote side.
  88. /// </summary>
  89. /// <param name="timeout">Wait forever if the value is 0</param>
  90. /// <returns>The buffer readed from network.</returns>
  91. protected IBuffer ReadBuffer(int timeout = 0)
  92. {
  93. using (new ReadTimeoutTranscation(this, timeout))
  94. {
  95. var reader = new BufferReader(_tcp.ReadStream);
  96. var buffer = reader.ReadBuffer();
  97. DataTransfered += buffer.Size;
  98. return buffer;
  99. }
  100. }
  101. /// <summary>
  102. /// Read buffer from remote side by async way.
  103. /// </summary>
  104. /// <param name="timeout">Wait forever if the value is 0</param>
  105. /// <returns>The buffer readed from network.</returns>
  106. protected async Task<IBuffer> ReadBufferAsync(int timeout = 0)
  107. {
  108. using (new ReadTimeoutTranscation(this, timeout))
  109. {
  110. var reader = new AsyncBufferReader(_tcp.ReadStream);
  111. var buffer = await reader.ReadBufferAsync().ConfigureAwait(false);
  112. DataTransfered += buffer.Size;
  113. return buffer;
  114. }
  115. }
  116. /// <summary>
  117. /// Write buffer to remote side.
  118. /// </summary>
  119. /// <param name="buffer">The buffer to be writen</param>
  120. /// <param name="timeout">Wait forever if the value is 0</param>
  121. protected void WriteBuffer(IBuffer buffer, int timeout = 0)
  122. {
  123. using (new WriteTimeoutTranscation(this, timeout))
  124. {
  125. var writer = new BufferWriter(_tcp.WriteStream);
  126. writer.WriteBuffer(buffer);
  127. DataTransfered += buffer.Size;
  128. }
  129. }
  130. /// <summary>
  131. /// Write buffer to remote side by async way.
  132. /// </summary>
  133. /// <param name="buffer">The buffer to be writen</param>
  134. /// <param name="timeout">Wait forever if the value is 0</param>
  135. protected async Task WriteBufferAsync(IBuffer buffer, int timeout = 0)
  136. {
  137. using (new WriteTimeoutTranscation(this, timeout))
  138. {
  139. var writer = new AsyncBufferWriter(_tcp.WriteStream);
  140. await writer.WriteBufferAsync(buffer).ConfigureAwait(false);
  141. DataTransfered += buffer.Size;
  142. }
  143. }
  144. protected override void OnClosed()
  145. {
  146. _tcp.Disconnect();
  147. base.OnClosed();
  148. }
  149. public override string ToString()
  150. {
  151. return $"{Remote}<->{Local}";
  152. }
  153. private class ReadTimeoutTranscation : IDisposable
  154. {
  155. private readonly ClientChannel _channel;
  156. public ReadTimeoutTranscation(ClientChannel channel, int timeout)
  157. {
  158. _channel = channel;
  159. if (_channel._readTimeoutTranscationLevel == 0)
  160. {
  161. try
  162. {
  163. _channel._tcp.ReadTimeout = timeout;
  164. }
  165. catch
  166. {
  167. //DO Nothing.
  168. }
  169. }
  170. _channel._readTimeoutTranscationLevel++;
  171. }
  172. public void Dispose()
  173. {
  174. _channel._readTimeoutTranscationLevel--;
  175. if (_channel._readTimeoutTranscationLevel == 0)
  176. {
  177. try
  178. {
  179. _channel._tcp.ReadTimeout = 0;
  180. }
  181. catch
  182. {
  183. //DO Nothing.
  184. }
  185. }
  186. }
  187. }
  188. private class WriteTimeoutTranscation : IDisposable
  189. {
  190. private readonly ClientChannel _channel;
  191. public WriteTimeoutTranscation(ClientChannel channel, int timeout)
  192. {
  193. _channel = channel;
  194. if (_channel._writeTimeoutTranscationLevel == 0)
  195. {
  196. try
  197. {
  198. _channel._tcp.SendTimeout = timeout;
  199. }
  200. catch
  201. {
  202. //DO Nothing.
  203. }
  204. }
  205. _channel._writeTimeoutTranscationLevel++;
  206. }
  207. public void Dispose()
  208. {
  209. _channel._writeTimeoutTranscationLevel--;
  210. if (_channel._writeTimeoutTranscationLevel == 0)
  211. {
  212. try
  213. {
  214. _channel._tcp.SendTimeout = 0;
  215. }
  216. catch
  217. {
  218. //DO Nothing.
  219. }
  220. }
  221. }
  222. }
  223. }
  224. }