123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- package jsonrpclite
- import (
- "bytes"
- "encoding/json"
- "errors"
- "fmt"
- "net/http"
- "strconv"
- "strings"
- "time"
- )
- type RpcServerEngine interface {
- // GetName Get the engine name.
- GetName() string
- //Start the engine and initialize the router.
- Start(router *rpcRouter)
- //Stop the engine and free the router.
- Stop()
- }
// RpcClientEngine is the contract every client-side transport engine fulfils.
type RpcClientEngine interface {
	// GetName returns the name of the engine.
	GetName() string

	// ProcessString sends an already encoded rpc request string to the
	// named service and returns the response string.
	ProcessString(serviceName, requestStr string) string

	// ProcessData sends an rpc request described by a method name and its
	// parameters to the named service and returns the response string.
	ProcessData(serviceName, method string, params []any) string

	// Close closes the engine and releases whatever it holds.
	Close()
}
- // RpcServerEngineCore The basic server engine for other engines
- type RpcServerEngineCore struct {
- _router *rpcRouter
- }
- // SetRouter Initialize the router for the engine.
- func (engine *RpcServerEngineCore) SetRouter(router *rpcRouter) {
- engine._router = router
- }
- // ServiceExists check whether the service is available.
- func (engine *RpcServerEngineCore) ServiceExists(serviceName string) bool {
- return engine._router.getService(serviceName) != nil
- }
- // Dispatch the request string to the services.
- func (engine *RpcServerEngineCore) Dispatch(serviceName string, requestStr string) string {
- if engine._router == nil {
- var err any = errors.New(" The rpc router has not been initialized. ")
- panic(err)
- }
- service := engine._router.getService(serviceName)
- if service != nil {
- requests := decodeRequestString(service, requestStr)
- responses := engine._router.dispatchRequests(serviceName, requests)
- if len(responses) > 0 {
- result := encodeResponses(responses)
- return string(result)
- } else {
- return ""
- }
- } else {
- var err any = errors.New("Service " + serviceName + " does not exist.")
- panic(err)
- }
- }
- //The engine for in-process communication
- type rpcInProcessEngine struct {
- requestId int
- *RpcServerEngineCore
- }
- // GetName Get the engine name.
- func (engine *rpcInProcessEngine) GetName() string {
- return "RpcInProcessEngine"
- }
- //Start the engine and initialize the router.
- func (engine *rpcInProcessEngine) Start(router *rpcRouter) {
- engine.RpcServerEngineCore.SetRouter(router)
- }
- //Stop the engine and free the router.
- func (engine *rpcInProcessEngine) Stop() {
- engine.RpcServerEngineCore.SetRouter(nil)
- }
- // ProcessString Send the rpc request string to the server.
- func (engine *rpcInProcessEngine) ProcessString(serviceName string, requestStr string) string {
- return engine.RpcServerEngineCore.Dispatch(serviceName, requestStr)
- }
- // ProcessData Send the rpc request data to the server.
- func (engine *rpcInProcessEngine) ProcessData(serviceName string, method string, params []any) string {
- engine.requestId++
- data := requestData{"2.0", engine.requestId, method, nil}
- if len(params) > 0 {
- if len(params) == 1 {
- paramData, err := json.Marshal(params[0])
- if err == nil {
- data.Params = paramData
- } else {
- var sendErr any = errors.New("Send request error: " + err.Error())
- panic(sendErr)
- }
- } else {
- paramsData, err := json.Marshal(params)
- if err == nil {
- data.Params = paramsData
- } else {
- var sendErr any = errors.New("Send request error: " + err.Error())
- panic(sendErr)
- }
- }
- }
- requestData, err := json.Marshal(data)
- if err == nil {
- return engine.RpcServerEngineCore.Dispatch(serviceName, string(requestData))
- } else {
- var sendErr any = errors.New("Send request error: " + err.Error())
- panic(sendErr)
- }
- }
- //Close the engine and free the router.
- func (engine *rpcInProcessEngine) Close() {
- engine.RpcServerEngineCore.SetRouter(nil)
- }
- // NewInProcessEngine Create an InProcessEngine, the results are the same instance.
- func NewInProcessEngine() (RpcServerEngine, RpcClientEngine) {
- engine := new(rpcInProcessEngine)
- engine.RpcServerEngineCore = new(RpcServerEngineCore)
- return engine, engine
- }
- type rpcHttpServerHandler struct {
- engine *rpcHttpServerEngine
- }
- func (handler *rpcHttpServerHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
- engine := handler.engine
- defer func() {
- var p = any(recover())
- if p != nil {
- responseErr, ok := p.(*RpcResponseError)
- if ok {
- engine.WriteResponseData(writer, http.StatusOK, "application/json", responseErr.response)
- } else {
- //Unhandled system error
- errStr := "Server error: " + fmt.Sprintf("%v", p)
- engine.WriteResponseData(writer, http.StatusInternalServerError, "text/html", errStr)
- }
- }
- }()
- serviceName := strings.Replace(request.URL.Path, "/", "", -1)
- if request.Method == "POST" {
- contentLength := request.ContentLength
- buffer := new(bytes.Buffer)
- for contentLength > 0 {
- size, err := buffer.ReadFrom(request.Body)
- if err == nil {
- contentLength -= size
- } else {
- break
- }
- }
- if engine.ServiceExists(serviceName) {
- response := engine.RpcServerEngineCore.Dispatch(serviceName, string(buffer.Bytes()))
- if response != "" {
- engine.WriteResponseData(writer, http.StatusOK, "application/json", response)
- } else {
- engine.WriteResponseData(writer, http.StatusOK, "", "")
- }
- } else {
- errStr := "Service " + serviceName + " does not exist."
- engine.WriteResponseData(writer, http.StatusServiceUnavailable, "text/html", errStr)
- }
- } else if request.Method == "OPTIONS" {
- engine.WriteResponseData(writer, http.StatusOK, "text/html", "")
- } else {
- errStr := "Invalid http-method: " + request.Method
- engine.WriteResponseData(writer, http.StatusMethodNotAllowed, "text/html", errStr)
- }
- }
- //A basic http server engine which uses the build-in http lib.
- type rpcHttpServerEngine struct {
- server *http.Server
- port int
- *RpcServerEngineCore
- }
- // GetName Get the engine name.
- func (engine *rpcHttpServerEngine) GetName() string {
- return "RpcHttpServerEngine"
- }
- // WriteResponseData Common way to write string data to client.
- func (engine *rpcHttpServerEngine) WriteResponseData(writer http.ResponseWriter, statusCode int, contentType string, content string) {
- contentData := []byte(content)
- writer.Header().Set("Server", "JsonRpcLite-Go")
- writer.Header().Set("Access-Control-Allow-Origin", "*")
- writer.Header().Set("Access-Control-Allow-Methods", "*")
- writer.Header().Set("Access-Control-Allow-Headers", "*")
- if contentType != "" {
- writer.Header().Set("Content-Type", contentType+"; charset=utf-8")
- }
- if len(contentData) > 0 {
- writer.Header().Set("Content-Length", strconv.Itoa(len(contentData)))
- }
- writer.WriteHeader(statusCode)
- if len(contentData) > 0 {
- _, err := writer.Write(contentData)
- if err != nil {
- logger.Warning("Write data to client error: " + err.Error())
- }
- }
- }
- //Start the engine and initialize the router.
- func (engine *rpcHttpServerEngine) Start(router *rpcRouter) {
- if engine.server != nil {
- logger.Warning("The server of engine already started, will be closed.")
- engine.Stop()
- }
- server := new(http.Server)
- engine.RpcServerEngineCore.SetRouter(router)
- handler := new(rpcHttpServerHandler)
- handler.engine = engine
- server.Handler = handler
- server.Addr = ":" + strconv.Itoa(engine.port)
- go func() {
- logger.Info(server.ListenAndServe().Error())
- }()
- }
- //Stop the engine and free the router.
- func (engine *rpcHttpServerEngine) Stop() {
- if engine.server != nil {
- err := engine.server.Close()
- if err != nil {
- logger.Warning("Close the server of engine error: " + err.Error())
- }
- engine.server = nil
- }
- engine.RpcServerEngineCore.SetRouter(nil)
- }
- // NewRpcHttpServerEngine Create a new RpcServerHttpEngine which based on the build-in http lib
- func NewRpcHttpServerEngine(port int) RpcServerEngine {
- engine := new(rpcHttpServerEngine)
- engine.port = port
- engine.RpcServerEngineCore = new(RpcServerEngineCore)
- return engine
- }
// rpcHttpClientEngine is a basic http client engine built on the standard
// net/http package.
type rpcHttpClientEngine struct {
	// requestId is the id put into the most recent request built by
	// ProcessData.
	requestId int
	// serverHost is the prefix placed in front of "/<service name>" to form
	// the request URL.
	serverHost string
}
- // GetName Get the engine name.
- func (engine *rpcHttpClientEngine) GetName() string {
- return "RpcHttpClientEngine"
- }
- // ProcessData Send the rpc request data to the server.
- func (engine *rpcHttpClientEngine) ProcessData(serviceName string, method string, params []any) string {
- engine.requestId++
- data := requestData{"2.0", engine.requestId, method, nil}
- if len(params) > 0 {
- if len(params) == 1 {
- paramData, err := json.Marshal(params[0])
- if err == nil {
- data.Params = paramData
- } else {
- var sendErr any = errors.New("Send request error: " + err.Error())
- panic(sendErr)
- }
- } else {
- paramsData, err := json.Marshal(params)
- if err == nil {
- data.Params = paramsData
- } else {
- var sendErr any = errors.New("Send request error: " + err.Error())
- panic(sendErr)
- }
- }
- }
- requestData, err := json.Marshal(data)
- if err == nil {
- return engine.ProcessString(serviceName, string(requestData))
- } else {
- var sendErr any = errors.New("Send request error: " + err.Error())
- panic(sendErr)
- }
- }
- // ProcessString Process Send the rpc request to the server.
- func (engine *rpcHttpClientEngine) ProcessString(serviceName string, requestStr string) string {
- buffer := bytes.NewBufferString(requestStr)
- http.DefaultClient.Timeout = 5 * time.Second
- response, err := http.Post(engine.serverHost+"/"+serviceName, "application/json; charset=utf-8", buffer)
- if err == nil {
- contentLength := response.ContentLength
- buffer = new(bytes.Buffer)
- for contentLength > 0 {
- size, err := buffer.ReadFrom(response.Body)
- if err == nil {
- contentLength -= size
- } else {
- break
- }
- }
- return string(buffer.Bytes())
- } else {
- var sendErr any = errors.New("Send request error: " + err.Error())
- panic(sendErr)
- }
- }
- //Close the engine and free the router.
- func (engine *rpcHttpClientEngine) Close() {
- //DoNothing
- }
- // NewRpcHttpClientEngine Create a new rpc http client based on the build-in http lib
- func NewRpcHttpClientEngine(serverHost string) RpcClientEngine {
- engine := new(rpcHttpClientEngine)
- engine.serverHost = serverHost
- return engine
- }
|