jsonrpc_engine.go 9.8 KB


  1. package jsonrpclite
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "net/http"
  8. "strconv"
  9. "strings"
  10. "time"
  11. )
  12. type RpcServerEngine interface {
  13. // GetName Get the engine name.
  14. GetName() string
  15. //Start the engine and initialize the router.
  16. Start(router *rpcRouter)
  17. //Stop the engine and free the router.
  18. Stop()
  19. }
  20. type RpcClientEngine interface {
  21. // GetName Get the engine name.
  22. GetName() string
  23. // ProcessString Send the rpc request string to the server.
  24. ProcessString(serviceName string, requestStr string) string
  25. // ProcessData Send the rpc request data to the server
  26. ProcessData(serviceName string, method string, params []any) string
  27. //Close the engine and free the router.
  28. Close()
  29. }
  30. // RpcServerEngineCore The basic server engine for other engines
  31. type RpcServerEngineCore struct {
  32. _router *rpcRouter
  33. }
  34. // SetRouter Initialize the router for the engine.
  35. func (engine *RpcServerEngineCore) SetRouter(router *rpcRouter) {
  36. engine._router = router
  37. }
  38. // ServiceExists check whether the service is available.
  39. func (engine *RpcServerEngineCore) ServiceExists(serviceName string) bool {
  40. return engine._router.getService(serviceName) != nil
  41. }
  42. // Dispatch the request string to the services.
  43. func (engine *RpcServerEngineCore) Dispatch(serviceName string, requestStr string) string {
  44. if engine._router == nil {
  45. var err any = errors.New(" The rpc router has not been initialized. ")
  46. panic(err)
  47. }
  48. service := engine._router.getService(serviceName)
  49. if service != nil {
  50. requests := decodeRequestString(service, requestStr)
  51. responses := engine._router.dispatchRequests(serviceName, requests)
  52. if len(responses) > 0 {
  53. result := encodeResponses(responses)
  54. return string(result)
  55. } else {
  56. return ""
  57. }
  58. } else {
  59. var err any = errors.New("Service " + serviceName + " does not exist.")
  60. panic(err)
  61. }
  62. }
  63. //The engine for in-process communication
  64. type rpcInProcessEngine struct {
  65. requestId int
  66. *RpcServerEngineCore
  67. }
  68. // GetName Get the engine name.
  69. func (engine *rpcInProcessEngine) GetName() string {
  70. return "RpcInProcessEngine"
  71. }
  72. //Start the engine and initialize the router.
  73. func (engine *rpcInProcessEngine) Start(router *rpcRouter) {
  74. engine.RpcServerEngineCore.SetRouter(router)
  75. }
  76. //Stop the engine and free the router.
  77. func (engine *rpcInProcessEngine) Stop() {
  78. engine.RpcServerEngineCore.SetRouter(nil)
  79. }
  80. // ProcessString Send the rpc request string to the server.
  81. func (engine *rpcInProcessEngine) ProcessString(serviceName string, requestStr string) string {
  82. return engine.RpcServerEngineCore.Dispatch(serviceName, requestStr)
  83. }
  84. // ProcessData Send the rpc request data to the server.
  85. func (engine *rpcInProcessEngine) ProcessData(serviceName string, method string, params []any) string {
  86. engine.requestId++
  87. data := requestData{"2.0", engine.requestId, method, nil}
  88. if len(params) > 0 {
  89. if len(params) == 1 {
  90. paramData, err := json.Marshal(params[0])
  91. if err == nil {
  92. data.Params = paramData
  93. } else {
  94. var sendErr any = errors.New("Send request error: " + err.Error())
  95. panic(sendErr)
  96. }
  97. } else {
  98. paramsData, err := json.Marshal(params)
  99. if err == nil {
  100. data.Params = paramsData
  101. } else {
  102. var sendErr any = errors.New("Send request error: " + err.Error())
  103. panic(sendErr)
  104. }
  105. }
  106. }
  107. requestData, err := json.Marshal(data)
  108. if err == nil {
  109. return engine.RpcServerEngineCore.Dispatch(serviceName, string(requestData))
  110. } else {
  111. var sendErr any = errors.New("Send request error: " + err.Error())
  112. panic(sendErr)
  113. }
  114. }
  115. //Close the engine and free the router.
  116. func (engine *rpcInProcessEngine) Close() {
  117. engine.RpcServerEngineCore.SetRouter(nil)
  118. }
  119. // NewInProcessEngine Create an InProcessEngine, the results are the same instance.
  120. func NewInProcessEngine() (RpcServerEngine, RpcClientEngine) {
  121. engine := new(rpcInProcessEngine)
  122. engine.RpcServerEngineCore = new(RpcServerEngineCore)
  123. return engine, engine
  124. }
  125. type rpcHttpServerHandler struct {
  126. engine *rpcHttpServerEngine
  127. }
  128. func (handler *rpcHttpServerHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
  129. engine := handler.engine
  130. defer func() {
  131. var p = any(recover())
  132. if p != nil {
  133. responseErr, ok := p.(*RpcResponseError)
  134. if ok {
  135. engine.WriteResponseData(writer, http.StatusOK, "application/json", responseErr.response)
  136. } else {
  137. //Unhandled system error
  138. errStr := "Server error: " + fmt.Sprintf("%v", p)
  139. engine.WriteResponseData(writer, http.StatusInternalServerError, "text/html", errStr)
  140. }
  141. }
  142. }()
  143. serviceName := strings.Replace(request.URL.Path, "/", "", -1)
  144. if request.Method == "POST" {
  145. contentLength := request.ContentLength
  146. buffer := new(bytes.Buffer)
  147. for contentLength > 0 {
  148. size, err := buffer.ReadFrom(request.Body)
  149. if err == nil {
  150. contentLength -= size
  151. } else {
  152. break
  153. }
  154. }
  155. if engine.ServiceExists(serviceName) {
  156. response := engine.RpcServerEngineCore.Dispatch(serviceName, string(buffer.Bytes()))
  157. if response != "" {
  158. engine.WriteResponseData(writer, http.StatusOK, "application/json", response)
  159. } else {
  160. engine.WriteResponseData(writer, http.StatusOK, "", "")
  161. }
  162. } else {
  163. errStr := "Service " + serviceName + " does not exist."
  164. engine.WriteResponseData(writer, http.StatusServiceUnavailable, "text/html", errStr)
  165. }
  166. } else if request.Method == "OPTIONS" {
  167. engine.WriteResponseData(writer, http.StatusOK, "text/html", "")
  168. } else {
  169. errStr := "Invalid http-method: " + request.Method
  170. engine.WriteResponseData(writer, http.StatusMethodNotAllowed, "text/html", errStr)
  171. }
  172. }
  173. //A basic http server engine which uses the build-in http lib.
  174. type rpcHttpServerEngine struct {
  175. server *http.Server
  176. port int
  177. *RpcServerEngineCore
  178. }
  179. // GetName Get the engine name.
  180. func (engine *rpcHttpServerEngine) GetName() string {
  181. return "RpcHttpServerEngine"
  182. }
  183. // WriteResponseData Common way to write string data to client.
  184. func (engine *rpcHttpServerEngine) WriteResponseData(writer http.ResponseWriter, statusCode int, contentType string, content string) {
  185. contentData := []byte(content)
  186. writer.Header().Set("Server", "JsonRpcLite-Go")
  187. writer.Header().Set("Access-Control-Allow-Origin", "*")
  188. writer.Header().Set("Access-Control-Allow-Methods", "*")
  189. writer.Header().Set("Access-Control-Allow-Headers", "*")
  190. if contentType != "" {
  191. writer.Header().Set("Content-Type", contentType+"; charset=utf-8")
  192. }
  193. if len(contentData) > 0 {
  194. writer.Header().Set("Content-Length", strconv.Itoa(len(contentData)))
  195. }
  196. writer.WriteHeader(statusCode)
  197. if len(contentData) > 0 {
  198. _, err := writer.Write(contentData)
  199. if err != nil {
  200. logger.Warning("Write data to client error: " + err.Error())
  201. }
  202. }
  203. }
  204. //Start the engine and initialize the router.
  205. func (engine *rpcHttpServerEngine) Start(router *rpcRouter) {
  206. if engine.server != nil {
  207. logger.Warning("The server of engine already started, will be closed.")
  208. engine.Stop()
  209. }
  210. server := new(http.Server)
  211. engine.RpcServerEngineCore.SetRouter(router)
  212. handler := new(rpcHttpServerHandler)
  213. handler.engine = engine
  214. server.Handler = handler
  215. server.Addr = ":" + strconv.Itoa(engine.port)
  216. go func() {
  217. logger.Info(server.ListenAndServe().Error())
  218. }()
  219. }
  220. //Stop the engine and free the router.
  221. func (engine *rpcHttpServerEngine) Stop() {
  222. if engine.server != nil {
  223. err := engine.server.Close()
  224. if err != nil {
  225. logger.Warning("Close the server of engine error: " + err.Error())
  226. }
  227. engine.server = nil
  228. }
  229. engine.RpcServerEngineCore.SetRouter(nil)
  230. }
  231. // NewRpcHttpServerEngine Create a new RpcServerHttpEngine which based on the build-in http lib
  232. func NewRpcHttpServerEngine(port int) RpcServerEngine {
  233. engine := new(rpcHttpServerEngine)
  234. engine.port = port
  235. engine.RpcServerEngineCore = new(RpcServerEngineCore)
  236. return engine
  237. }
  238. //A basic http client engine which uses the build-in http lib.
  239. type rpcHttpClientEngine struct {
  240. requestId int
  241. serverHost string
  242. }
  243. // GetName Get the engine name.
  244. func (engine *rpcHttpClientEngine) GetName() string {
  245. return "RpcHttpClientEngine"
  246. }
  247. // ProcessData Send the rpc request data to the server.
  248. func (engine *rpcHttpClientEngine) ProcessData(serviceName string, method string, params []any) string {
  249. engine.requestId++
  250. data := requestData{"2.0", engine.requestId, method, nil}
  251. if len(params) > 0 {
  252. if len(params) == 1 {
  253. paramData, err := json.Marshal(params[0])
  254. if err == nil {
  255. data.Params = paramData
  256. } else {
  257. var sendErr any = errors.New("Send request error: " + err.Error())
  258. panic(sendErr)
  259. }
  260. } else {
  261. paramsData, err := json.Marshal(params)
  262. if err == nil {
  263. data.Params = paramsData
  264. } else {
  265. var sendErr any = errors.New("Send request error: " + err.Error())
  266. panic(sendErr)
  267. }
  268. }
  269. }
  270. requestData, err := json.Marshal(data)
  271. if err == nil {
  272. return engine.ProcessString(serviceName, string(requestData))
  273. } else {
  274. var sendErr any = errors.New("Send request error: " + err.Error())
  275. panic(sendErr)
  276. }
  277. }
  278. // ProcessString Process Send the rpc request to the server.
  279. func (engine *rpcHttpClientEngine) ProcessString(serviceName string, requestStr string) string {
  280. buffer := bytes.NewBufferString(requestStr)
  281. http.DefaultClient.Timeout = 5 * time.Second
  282. response, err := http.Post(engine.serverHost+"/"+serviceName, "application/json; charset=utf-8", buffer)
  283. if err == nil {
  284. contentLength := response.ContentLength
  285. buffer = new(bytes.Buffer)
  286. for contentLength > 0 {
  287. size, err := buffer.ReadFrom(response.Body)
  288. if err == nil {
  289. contentLength -= size
  290. } else {
  291. break
  292. }
  293. }
  294. return string(buffer.Bytes())
  295. } else {
  296. var sendErr any = errors.New("Send request error: " + err.Error())
  297. panic(sendErr)
  298. }
  299. }
  300. //Close the engine and free the router.
  301. func (engine *rpcHttpClientEngine) Close() {
  302. //DoNothing
  303. }
  304. // NewRpcHttpClientEngine Create a new rpc http client based on the build-in http lib
  305. func NewRpcHttpClientEngine(serverHost string) RpcClientEngine {
  306. engine := new(rpcHttpClientEngine)
  307. engine.serverHost = serverHost
  308. return engine
  309. }