|
@@ -0,0 +1,278 @@
|
|
|
+using System;
|
|
|
+using System.Net;
|
|
|
+using System.Text;
|
|
|
+using System.Threading;
|
|
|
+using System.Threading.Tasks;
|
|
|
+using JsonRpcLite.Log;
|
|
|
+using JsonRpcLite.Network;
|
|
|
+using JsonRpcLite.Rpc;
|
|
|
+using JsonRpcLite.Services;
|
|
|
+using JsonRpcLite.Utilities;
|
|
|
+using KcpNet;
|
|
|
+
|
|
|
+namespace JsonRpcLite.Kcp
|
|
|
+{
|
|
|
+ public class JsonRpcKcpServerEngine : IJsonRpcServerEngine
|
|
|
+ {
|
|
|
+ private readonly IPAddress _address;
|
|
|
+ private readonly int _port;
|
|
|
+
|
|
|
+ private IJsonRpcRouter _router;
|
|
|
+ private bool _stopped;
|
|
|
+ private KcpListener _listener;
|
|
|
+
|
|
|
+ public string Name { get; }
|
|
|
+
|
|
|
+ public JsonRpcKcpServerEngine(IPAddress address, int port)
|
|
|
+ {
|
|
|
+ Name = nameof(JsonRpcKcpServerEngine);
|
|
|
+ _address = address;
|
|
|
+ _port = port;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void Start(IJsonRpcRouter router)
|
|
|
+ {
|
|
|
+ _router = router;
|
|
|
+ _stopped = false;
|
|
|
+ _listener = new KcpListener(new IPEndPoint(_address, _port));
|
|
|
+ _listener.Start();
|
|
|
+ Task.Factory.StartNew(async () =>
|
|
|
+ {
|
|
|
+ while (!_stopped)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ var connection = await _listener.GetConnectionAsync().ConfigureAwait(false);
|
|
|
+ HandleConnection(connection, CancellationToken.None);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ Logger.WriteError($"GetConnection error:{ex.Message}");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }, TaskCreationOptions.LongRunning);
|
|
|
+ Logger.WriteInfo("JsonRpc http server engine started.");
|
|
|
+ }
|
|
|
+
|
|
|
+ public void Stop()
|
|
|
+ {
|
|
|
+ _listener.Close();
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task<byte[]> ReadBytesAsync(KcpConnection connection, int size, CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ var bytes = new byte[size];
|
|
|
+ var length = size;
|
|
|
+ var startIndex = 0;
|
|
|
+ while (length > 0)
|
|
|
+ {
|
|
|
+ var buffer = new byte[length];
|
|
|
+ var bytesRead = await connection.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
|
|
|
+ if (bytesRead == 0)
|
|
|
+ {
|
|
|
+ throw new InvalidOperationException("Connection could be closed.");
|
|
|
+ }
|
|
|
+ Array.Copy(buffer, 0, bytes, startIndex, bytesRead);
|
|
|
+ startIndex += bytesRead;
|
|
|
+ length -= bytesRead;
|
|
|
+ }
|
|
|
+
|
|
|
+ return bytes;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private async Task<int> ReadIntAsync(KcpConnection connection, CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ var intData = await ReadBytesAsync(connection, 4, cancellationToken).ConfigureAwait(false);
|
|
|
+ return BitConverter.ToInt32(intData);
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task<string> ReadStringAsync(KcpConnection connection, CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ var dataLength = await ReadIntAsync(connection, cancellationToken).ConfigureAwait(false);
|
|
|
+ var data = await ReadBytesAsync(connection, dataLength, cancellationToken).ConfigureAwait(false);
|
|
|
+ return Encoding.UTF8.GetString(data);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// Dispatch request to specified service.
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="connection">The KcpConnection</param>
|
|
|
+ /// <param name="router">The router to handle the rpc request</param>
|
|
|
+ /// <param name="serviceName">The name of the service</param>
|
|
|
+ /// <param name="cancellationToken">The cancellation token which can cancel this method</param>
|
|
|
+ /// <returns>Void</returns>
|
|
|
+ private async Task DispatchAsync(KcpConnection connection, IJsonRpcRouter router, string serviceName, CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ var dataLength = await ReadIntAsync(connection,cancellationToken).ConfigureAwait(false);
|
|
|
+ var requestData = await ReadBytesAsync(connection, dataLength,cancellationToken).ConfigureAwait(false);
|
|
|
+ if (Logger.DebugMode)
|
|
|
+ {
|
|
|
+ var requestString = Encoding.UTF8.GetString(requestData);
|
|
|
+ Logger.WriteDebug($"Receive request data:{requestString}");
|
|
|
+ }
|
|
|
+ var requests = await JsonRpcCodec.DecodeRequestsAsync(requestData, cancellationToken, dataLength).ConfigureAwait(false);
|
|
|
+ var responses = await router.DispatchRequestsAsync(serviceName, requests, cancellationToken).ConfigureAwait(false);
|
|
|
+ await WriteRpcResponsesAsync(connection, responses, cancellationToken).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private void HandleConnection(KcpConnection connection, CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ Task.Run(async () =>
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ while (true)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ var serviceName = await ReadStringAsync(connection, cancellationToken).ConfigureAwait(false);
|
|
|
+ if (string.IsNullOrEmpty(serviceName))
|
|
|
+ {
|
|
|
+ Logger.WriteWarning($"Service {serviceName} not found.");
|
|
|
+ throw new ServerErrorException("Service does not exist.", $"Service [{serviceName}] does not exist.");
|
|
|
+ }
|
|
|
+ if (!_router.ServiceExists(serviceName))
|
|
|
+ {
|
|
|
+ Logger.WriteWarning($"Service {serviceName} not found.");
|
|
|
+ throw new ServerErrorException("Service does not exist.", $"Service [{serviceName}] does not exist.");
|
|
|
+ }
|
|
|
+
|
|
|
+ try
|
|
|
+ {
|
|
|
+ await DispatchAsync(connection, _router, serviceName, cancellationToken).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ if (ex is RpcException)
|
|
|
+ {
|
|
|
+ throw;
|
|
|
+ }
|
|
|
+
|
|
|
+ throw new ServerErrorException("Internal server error.", ex.Message);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ Logger.WriteError($"Handle request error: {ex.Message}");
|
|
|
+ if (ex is HttpException httpException)
|
|
|
+ {
|
|
|
+ await WriteExceptionAsync(connection, httpException, cancellationToken).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ var response = new JsonRpcResponse();
|
|
|
+ if (ex is RpcException rpcException)
|
|
|
+ {
|
|
|
+ response.WriteResult(rpcException);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ var serverError = new InternalErrorException($"Handle request error: {ex.Message}");
|
|
|
+ response.WriteResult(serverError);
|
|
|
+ }
|
|
|
+
|
|
|
+ await WriteRpcResponsesAsync(connection, new[] { response }, cancellationToken).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception)
|
|
|
+ {
|
|
|
+ //
|
|
|
+ }
|
|
|
+ }, cancellationToken);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// Write exception back to the client.
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="connection">The KcpConnection</param>
|
|
|
+ /// <param name="exception">The exception to write back.</param>
|
|
|
+ /// <param name="cancellationToken">The cancel token which will cancel this method.</param>
|
|
|
+ /// <returns>Void</returns>
|
|
|
+ private async Task WriteExceptionAsync(KcpConnection connection, HttpException exception, CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ await WriteResultAsync(connection, exception.Message, cancellationToken).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ Logger.WriteWarning($"Write http exception back to client error:{ex}");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// Write http message to remote side.
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="connection">The KcpConnection.</param>
|
|
|
+ /// <param name="message">The message to write back.</param>
|
|
|
+ /// <param name="cancellationToken">The cancellation token which will cancel this method.</param>
|
|
|
+ /// <returns>Void</returns>
|
|
|
+ private async Task WriteResultAsync(KcpConnection connection, string message, CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ var data = Encoding.UTF8.GetBytes(message);
|
|
|
+ var lengthData = BitConverter.GetBytes(data.Length);
|
|
|
+ await connection.SendAsync(lengthData, cancellationToken).ConfigureAwait(false);
|
|
|
+ await connection.SendAsync(data, cancellationToken).ConfigureAwait(false);
|
|
|
+ if (Logger.DebugMode)
|
|
|
+ {
|
|
|
+ Logger.WriteDebug($"Response data sent:{message}");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// Write rpc responses back to the client.
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="connection">The KcpConnection</param>
|
|
|
+ /// <param name="responses">The responses to write back.</param>
|
|
|
+ /// <param name="cancellationToken">The cancel token which will cancel this method.</param>
|
|
|
+ /// <returns>Void</returns>
|
|
|
+ private async Task WriteRpcResponsesAsync(KcpConnection connection, JsonRpcResponse[] responses, CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ var resultData = await JsonRpcCodec.EncodeResponsesAsync(responses, cancellationToken).ConfigureAwait(false);
|
|
|
+ await WriteRpcResultAsync(connection, resultData, cancellationToken).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ Logger.WriteWarning($"Write rpc response back to client error:{ex}");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// Write rpc result struct data to remote side.
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="connection">The KcpConnection.</param>
|
|
|
+ /// <param name="result">The result data to write</param>
|
|
|
+ /// <param name="cancellationToken">The cancellation which can cancel this method</param>
|
|
|
+ /// <returns>Void</returns>
|
|
|
+ private async Task WriteRpcResultAsync(KcpConnection connection, byte[] result, CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ if (result != null)
|
|
|
+ {
|
|
|
+ var data = result;
|
|
|
+ var lengthData = BitConverter.GetBytes(data.Length);
|
|
|
+ await connection.SendAsync(lengthData, cancellationToken).ConfigureAwait(false);
|
|
|
+ await connection.SendAsync(data, cancellationToken).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ if (Logger.DebugMode)
|
|
|
+ {
|
|
|
+ if (result != null)
|
|
|
+ {
|
|
|
+ var resultString = Encoding.UTF8.GetString(result);
|
|
|
+ Logger.WriteDebug($"Response data sent:{resultString}");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+}
|