|
@@ -0,0 +1,146 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+import json
|
|
|
+
|
|
|
+
|
|
|
+class CustomUtils():
|
|
|
+ def response_analysis(self, response):
|
|
|
+ """
|
|
|
+ 解析接口返回值:返回值分三种情况如下:
|
|
|
+ {"jsonrpc": "2.0","id": 1,"result": true}
|
|
|
+ {"jsonrpc": "2.0","id": 1,"error": {"code": 1002,"message": "Permission validation error"}}
|
|
|
+ {"jsonrpc": "2.0","id": 1,"result": {"LoginState": 0,"Token": "0126f5d7ef044088b075246c65a410ba","LockRemainingTimes":[{}],"PasswordExpired": false,"AccountName": ""}}
|
|
|
+
|
|
|
+ 返回值示例:
|
|
|
+ {'resultTag':'','content':'','message':''}
|
|
|
+ """
|
|
|
+ result = {'resultTag':'','content':'','code':'','message':''}
|
|
|
+ try:
|
|
|
+ if response.status_code == 200:
|
|
|
+ content = str(response.content, encoding = 'UTF-8')
|
|
|
+ # 若传入的数据为str类型需要将它转成dict类型
|
|
|
+ if isinstance(content, str):
|
|
|
+ content = json.loads(content)
|
|
|
+ # 若包含error则接口返回失败
|
|
|
+ if 'error' in content:
|
|
|
+ result['resultTag'] = 'false';
|
|
|
+ result['code'] = content['error']['code'];
|
|
|
+ result['message'] = content['error']['message'];
|
|
|
+ else :
|
|
|
+ # 若result为true时情况
|
|
|
+ if isinstance(content['result'], dict):
|
|
|
+ result['resultTag'] = 'true';
|
|
|
+ result['content'] = content['result'];
|
|
|
+ else :
|
|
|
+ result['resultTag'] = content['result'];
|
|
|
+ else:
|
|
|
+ raise Exception('Exception:'+str(response.status_code));
|
|
|
+ except Exception as ERROR:
|
|
|
+ print('Exception:'+str(ERROR));
|
|
|
+ raise ERROR;
|
|
|
+ return result;
|
|
|
+
|
|
|
+ def generate_params(self, method, *args):
|
|
|
+
|
|
|
+ """
|
|
|
+ 构建body体参数
|
|
|
+ {"jsonrpc": "2.0","method": "CommonSignUpAsync","params": [{"AnyAccount": "fnyml","AnyCode": "3245","Password": "222c33333","HeaderMap":{}}],"id": 1}
|
|
|
+
|
|
|
+ def loop(request):
|
|
|
+ # 元组遍历方式一:for
|
|
|
+ intupe_one = ('1', '2', '3', '4')
|
|
|
+ for value in tupe_one:
|
|
|
+ print(value)
|
|
|
+ # 元组遍历方式二:内置函数enumerate()
|
|
|
+ tupe_tow = ('1', '2', '3', '4')
|
|
|
+ for index, value in enumerate(tupe_tow):
|
|
|
+ print("下标:%s 值:%s" % (str(index), value))
|
|
|
+ # 元组遍历方式三:使用range()函数
|
|
|
+ tupe_three = ('1', '2', '3', '4')
|
|
|
+ for index in range(len(tupe_three)):
|
|
|
+ print(tupe_three[index])
|
|
|
+ # 元组遍历方式四: 使用iter()内置函数,返回迭代器对象
|
|
|
+ tupe_four = ('1', '2', '3', '4')for value in iter(tupe_four):
|
|
|
+ print(value)
|
|
|
+ # 列表遍历方式一:for
|
|
|
+ inlist_one = ('1', '2', '3', '4')
|
|
|
+ for value in list_one
|
|
|
+ :print(value)
|
|
|
+ # 列表遍历方式二:内置函数enumerate()
|
|
|
+ list_tow = ('1', '2', '3', '4')
|
|
|
+ for index, value in enumerate(list_tow):
|
|
|
+ print("下标:%s 值:%s" % (str(index), value))
|
|
|
+ # 列表遍历方式三:使用range()函数
|
|
|
+ list_three = ('1', '2', '3', '4')
|
|
|
+ for index in range(len(list_three)):
|
|
|
+ print(list_three[index])
|
|
|
+ # 列表遍历方式四: 使用iter()内置函数,返回迭代器对象
|
|
|
+ list_four = ('1', '2', '3', '4')
|
|
|
+ for value in iter(list_four):
|
|
|
+ print(value)
|
|
|
+ # 字典遍历方式一:for
|
|
|
+ indict_one = {"one": 1, "two": 2, "three": 3}
|
|
|
+ for value in dict_one:
|
|
|
+ print("K:%s Value:%d" % (value, dict_one[value]))
|
|
|
+ # 字典遍历方式二:使用dict的keys()方法
|
|
|
+ dict_two = {"one": 1, "two": 2, "three": 3}
|
|
|
+ for key in dict_two.keys():
|
|
|
+ print("K:%s Value:%d" % (key, dict_two[key]))
|
|
|
+ # 字典遍历方式三: 使用dict的values()方法
|
|
|
+ dict_three = {"one": 1, "two": 2, "three": 3}
|
|
|
+ for value in dict_three.values():
|
|
|
+ print("value:" + str(value))
|
|
|
+ # 字典遍历方式四: 使用dict的items()方法
|
|
|
+ dict_four = {"one": 1, "two": 2, "three": 3}
|
|
|
+ for item in dict_four.items():
|
|
|
+ print(item)
|
|
|
+ # 字典遍历方式五: 使用dict的items()方法,然后直接解包元组
|
|
|
+ dict_five = {"one": 1, "two": 2, "three": 3}
|
|
|
+ for key, value in dict_five.items():
|
|
|
+ print(key + ":" + str(value))
|
|
|
+ return response_success(message='列表、字典、元组遍历方式验证')
|
|
|
+ """
|
|
|
+ body = {"jsonrpc": "2.0","method": "","params": [],"id": 1}
|
|
|
+ try:
|
|
|
+ body['method'] = method;
|
|
|
+ for value in args:
|
|
|
+ body['params'].append(value);
|
|
|
+ except Exception as ERROR:
|
|
|
+ print('Exception:'+str(ERROR));
|
|
|
+ return body;
|
|
|
+
|
|
|
+ def result_analysis(self, result, resultTag, errorCode, message):
|
|
|
+ """
|
|
|
+ resultTag:返回标记:true/false
|
|
|
+ errorCode:错误码
|
|
|
+ message:错误内容
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ tag = result['resultTag']
|
|
|
+ code = result['code']
|
|
|
+ mes = result['message']
|
|
|
+ if not (resultTag is None):
|
|
|
+ if resultTag is tag:
|
|
|
+ return True;
|
|
|
+ else:
|
|
|
+ raise Exception('Exception:result tag not eq');
|
|
|
+ elif not (errorCode is None) and not (message is None):
|
|
|
+ if errorCode is code and message is mes:
|
|
|
+ return True;
|
|
|
+ else:
|
|
|
+ raise Exception('Exception:errorCode or message not eq');
|
|
|
+ else:
|
|
|
+ raise Exception('Exception:params error');
|
|
|
+ except Exception as ERROR:
|
|
|
+ raise ERROR;
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ utils = CustomUtils()
|
|
|
+ s = {"jsonrpc": "2.0","id": '1',"result": 'true'};
|
|
|
+ ss = {"jsonrpc": "2.0","id": 1,"error": {"code": 1002,"message": "Permission validation error"}}
|
|
|
+ sss = {"jsonrpc": "2.0","id": 1,"result": {"LoginState": 0,"Token": "0126f5d7ef044088b075246c65a410ba","LockRemainingTimes":[{}],"PasswordExpired": 'false',"AccountName": ""}}
|
|
|
+ print(utils.response_analysis(s));
|
|
|
+ print(utils.response_analysis(ss));
|
|
|
+ print(utils.response_analysis(sss));
|
|
|
+ print(utils.generate_params('CommonSignUpAsync',{"AnyAccount": "fnyml","AnyCode": "3245","Password": "222c33333","HeaderMap":{}},{"AnyAccount": "fnyml","AnyCode": "3245","Password": "222c33333","HeaderMap":{}}));
|