123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- import http.client
- import shutil
- import os
- import cv2
- import numpy as np
- import torch
- import json
- # 获取训练文件夹中的图片数量
- # token:训练用的授权码
- def get_image_count(token):
- uri = "localhost:8888"
- conn = http.client.HTTPConnection(uri)
- conn.request("GET", "/GetFolderFileCount?token=" + token)
- res = conn.getresponse()
- if res.status == 200:
- count_data = res.read(4)
- #print(count_data)
- count = int.from_bytes(count_data, byteorder='little', signed=True)
- return count
- else:
- reason = str(res.reason)
- if len(reason) > 0 :
- reason = ", reason: " + reason
- raise Exception("Get file count failed, could be url error or token error. Code: " + str(res.status) + reason)
- # 获取图片和其对应的标注数据
- # token:训练用的授权码
- # index:图片序号
- def get_labeled_image(token, index):
- uri = "localhost:8888"
- conn = http.client.HTTPConnection(uri)
- conn.request("GET", "/GetTrainFile?token=" + token + "&index=" + str(index))
- res = conn.getresponse()
- if res.status == 200:
- image_size_data = res.read(4)
- image_size = int.from_bytes(image_size_data, byteorder='little', signed=True)
- image_data = res.read(image_size)
- label_size_data = res.read(4)
- label_size = int.from_bytes(label_size_data, byteorder='little', signed=True)
- label_data = res.read(label_size).decode("utf8")
- file_name_size_data = res.read(4)
- file_name_size = int.from_bytes(file_name_size_data, byteorder='little', signed=True)
- file_name = res.read(file_name_size).decode("utf8")
- return image_data, label_data, file_name
- else:
- raise Exception("Get image and label failed, could be url error or token error. Code: " + str(res.status))
- # 将训练结果的模型保存到系统可识别的路径下
- # trainedFile: 要保存的训练结果
- def save_output_model(trainedFile):
- uri = "localhost:8888"
- conn = http.client.HTTPConnection(uri)
- conn.request("GET", "/GetModelOutputFolder")
- res = conn.getresponse()
- if res.status == 200:
- outputFolder = res.readlines()[0].decode("utf8")
- sourceFolder, sourceFileName = os.path.split(trainedFile)
- outputFile = os.path.join(outputFolder, sourceFileName)
- shutil.copy(trainedFile, outputFile)
- else:
- raise Exception("Save output model failed, could be url error or token error. Code: " + str(res.status))
- # 获取测试文件夹中的图片数量
- # token:授权码
- def get_test_image_count(token):
- uri = "localhost:8888"
- conn = http.client.HTTPConnection(uri)
- conn.request("GET", "/GetTestFolderFileCount?token=" + token)
- res = conn.getresponse()
- if res.status == 200:
- count_data = res.read(4)
- count = int.from_bytes(count_data, byteorder='little', signed=True)
- return count
- else:
- raise Exception("Get test image count failed, could be url error or token error. Code: " + str(res.status))
- # 获取测试图片和其对应的标注数据
- # token:授权码
- # index:图片序号
- def get_test_labeled_image(token, index):
- uri = "localhost:8888"
- conn = http.client.HTTPConnection(uri)
- conn.request("GET", "/GetTestFile?token=" + token + "&index=" + str(index))
- res = conn.getresponse()
- if res.status == 200:
- image_size_data = res.read(4)
- image_size = int.from_bytes(image_size_data, byteorder='little', signed=True)
- image_data = res.read(image_size)
- label_size_data = res.read(4)
- label_size = int.from_bytes(label_size_data, byteorder='little', signed=True)
- label_data = res.read(label_size).decode("utf8")
- file_name_size_data = res.read(4)
- file_name_size = int.from_bytes(file_name_size_data, byteorder='little', signed=True)
- file_name = res.read(file_name_size).decode("utf8")
- return image_data, label_data, file_name
- else:
- return bytearray(), "", "" # 读取数据时有无法读到的现象
- # raise Exception("Get test image and label failed, could be url error or token error. Code: " + str(res.status))
-
- def label_preprocess(label):
- class_dict = {'BMode':0, 'BModeBlood':1,'Pseudocolor':2, 'PseudocolorBlood':3, 'Spectrogram':4, 'CEUS':5, 'SE':6,'STE':7,'FourDime':8}
- try:
- txt_info_dict = json.loads(label)
- classes = txt_info_dict[0]["FileResultInfos"][0]["LabeledResult"]["ImageResults"][0]["Conclusion"]["Title"]
- dst_label = class_dict[classes]
- return dst_label
- except Exception as e:
- print('label data process wrong!,{}'.format(e))
- def preprocess(image):
- nparr_data = np.frombuffer(image, dtype=np.uint8)
- img_data = cv2.imdecode(nparr_data, cv2.IMREAD_GRAYSCALE)
- img_data = cv2.cvtColor(img_data, cv2.COLOR_BGR2RGB)
- print(img_data)
- # img_data = transform_image(img_data)
- # print(img_data)
- img_data = cv2.resize(img_data, (224, 224))
- if len(img_data.shape) == 2:
- img_data = np.expand_dims(img_data, axis=2)
- if img_data.max() > 1:
- img_data = img_data.astype('float32') / 255
- img_data = img_data.transpose((2, 0, 1))
- img_data = torch.from_numpy(img_data).type(torch.FloatTensor)
- return img_data
- if __name__ == "__main__":
- image_count = get_test_image_count("4925EC4929684AA0ABB0173B03CFC8FF")
- for index in range(image_count):
- image, label, name = get_test_labeled_image("4925EC4929684AA0ABB0173B03CFC8FF", index)
- if label != "":
- dst_label = label_preprocess(label)
- txt_info_dict = json.loads(label)
- # print(index, name, txt_info_dict[0]["LabeledUltrasoundFileId"], dst_label)
- else:
- print(index)
|