# -*- coding: utf-8 -*- # @ File : predict.py # @ Author : Guido LuXiaohao # @ Date : 2021/8/30 # @ Software : PyCharm # @ Description: 用于模型推理 import json import math import os import sys import time from argparse import ArgumentParser from pathlib import Path import cv2 import numpy as np import onnxruntime as ort import torch from PIL import Image, ImageDraw, ImageFont from dataset import LoadImages, resize_LongestMaxSize from evaluation.utils.metric_file_generate import \ prfile_generate_semantic_segmentation FILE = Path(__file__).resolve() ROOT = FILE.parents[0] # project root directory if str(ROOT) not in sys.path: sys.path.append(str(ROOT)) # add ROOT to PATH ROOT = Path(os.path.relpath(ROOT, Path.cwd())) # relative path # {label: [min_area, area_rate, max_count], ...} NECKORGAN_CFGS = { "甲状腺横切": [0, 1 / 25, False], "甲状腺纵切": [0, 1 / 25, True], "颈动脉短轴": [0, 1 / 100, False], "颈动脉长轴": [0, 1 / 100, True], "颈部气管": [0, 1 / 25, True] } NECKORGAN_CFGS2 = [ { "甲状腺横切": [0, 1 / 25, False], "甲状腺纵切": [0, 1 / 25, True], "颈动脉短轴": [0, 1 / 100, False], "颈动脉长轴": [0, 1 / 100, True]}, { "颈部气管": [0, 1 / 25, True]}, ] NECKLESION_CFGS = { "斑块或内中膜增厚": [50, 0, False] } NERVE_CFGS = { "神经": [0, 1 / 100, False], "动脉": [0, 1 / 100, False], "静脉": [0, 1 / 100, False]} def resize_LongestMaxSize_back(image, ori_size, resize_mode=cv2.INTER_CUBIC): # 将经过resize_LongestMaxSize的图像去除pad填充,再resize回原始尺寸 image_height = ori_size[0] image_width = ori_size[1] norm_size = image.shape[0] if len(image.shape) != 3: image = np.expand_dims(image, -1) if image_height > image_width: pad_len = int((norm_size - norm_size * image_width / image_height) / 2) image_before_pad = image[0:norm_size, pad_len:norm_size - pad_len] image = cv2.resize(image_before_pad, (image_width, image_height), interpolation=resize_mode) else: pad_len = int((norm_size - norm_size * image_height / image_width) / 2) image_before_pad = image[pad_len:norm_size - pad_len, 0:norm_size] image = cv2.resize(image_before_pad, (image_width, image_height), interpolation=resize_mode) return image def semantics_segmentation_postprocess(input_tensor, label_index, min_area, area_rate, max_count=True): # 筛选语义分割结果 select_mask = (input_tensor[..., label_index].squeeze() > 0.5) * 255 select_mask = np.array(select_mask, np.uint8) all_area = select_mask.shape[0] * select_mask.shape[1] output_mask = np.zeros(select_mask.shape[:2], dtype=np.uint8) # 对应轮廓寻找 contours, _ = cv2.findContours(select_mask.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) contours = list(contours) output_contours = [] if len(contours) != 0: contours.sort(key=lambda x: cv2.contourArea(x), reverse=True) for contour_idx in range(len(contours)): # 轮廓为一点时,是个向量 if np.squeeze(contours[contour_idx]).shape[0] > 10 and \ cv2.contourArea(contours[contour_idx]) > min_area and \ cv2.contourArea(contours[contour_idx]) > (all_area * area_rate): output_contours.append(contours[contour_idx]) if len(output_contours) != 0: if max_count: output_mask = cv2.drawContours(output_mask.copy(), [output_contours[0]], -1, label_index, cv2.FILLED) else: output_mask = cv2.drawContours(output_mask.copy(), output_contours, -1, label_index, cv2.FILLED) return output_mask def contour_sort(mask, output_label, label_mapper): each_image_label_list = [] # 对应轮廓寻找 contours, _ = cv2.findContours(mask.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) contours = list(contours) contours.sort(key=lambda x: cv2.contourArea(x), reverse=True) for i in range(len(contours)): each_label_dict = { "contour": contours[i], "label_value": label_mapper[output_label], "area": cv2.contourArea(contours[i])} each_image_label_list.append(each_label_dict) return each_image_label_list def draw_contours(im0, pr_contour, class_name, fontText, color_list): im0 = Image.fromarray(im0) # BGR color = tuple([int(x) for x in color_list[pr_contour['label_value'] - 1]]) # B, G, R # color = (0, 0, 255) # red draw = ImageDraw.Draw(im0) draw.text(pr_contour['contour'][0][0], class_name[pr_contour['label_value'] - 1], fill=color, font=fontText, stroke_width=1) im0 = np.asarray(im0) cv2.drawContours(im0, pr_contour['contour'], contourIdx=-1, color=color, thickness=2) return im0 def get_bounding_box(contours, img_size, expand_pixel=40): # 默认外扩40个像素 roi_points = [list(i[0]) for i in contours['contour']] x, y = [], [] point_count = len(roi_points) for ni in range(point_count): point_x, point_y = roi_points[ni][:] x.append(int(point_x)) y.append(int(point_y)) left, right = max(min(x) - expand_pixel, 0), min(max(x) + expand_pixel, img_size[1]) # 防止超出边界 top, bottom = max(min(y) - expand_pixel, 0), min(max(y) + expand_pixel, img_size[0]) return [top, bottom, left, right] def generate_pred_file(predict_info, index_class_map): predict_info.sort(key=lambda x: x["Label"], reverse=False) output_info = [{ "FileResultInfos": [{ "Index": 0, "FrameStatus": None, "LabeledResult": {"Rois": []}}]}] for label_idx, label_info in enumerate(predict_info): if len(label_info["Contours"]) > 0: pts = [{"X": x, "Y": y} for x, y in label_info["Contours"][0]] output_info[0]['FileResultInfos'][0]['LabeledResult']['Rois'].append({ "Index": label_idx, "Points": pts, "Conclusion": { "Title": index_class_map[label_info["Label"]], "Confidence": label_info["Confidence"]}}) else: continue return json.dumps(output_info, ensure_ascii=False) def model_prepare(model, num_classes, device='cuda'): if model.endswith(".onnx"): _backend = "ONNX" elif model.endswith((".pt", ".pth")): _backend = "PyTorch" else: _backend = None raise RuntimeError("args model must be supported format files: {'.onnx', '.pt', '.pth'}!") if _backend == "ONNX": sess_options = ort.SessionOptions() sess_options.execution_mode = ort.ExecutionMode.ORT_PARALLEL sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_DISABLE_ALL providers = ['CPUExecutionProvider'] if device == "cuda": providers = ['CUDAExecutionProvider'] + providers elif device == "limit_cpu": sess_options.inter_op_num_threads = 1 # number of threads used to parallelize the execution across nodes sess_options.intra_op_num_threads = 1 # number of threads used to parallelize the execution within nodes # load ONNX model seg_model = ort.InferenceSession(model, sess_options, providers=providers) nc = [] for output in seg_model.get_outputs(): nc += [output.shape[1]] elif _backend == "PyTorch": print("You are using PyTorch as inference backend. Please make sure you call the correct model module!") import models seg_model = models.PPLiteSeg( num_classes=num_classes, backbone=models.STDC1(attn=True), arm_type="UAFM_ChAtten", apply_nonlin=True) csd = torch.load(model, map_location='cpu')['model'].float().state_dict() seg_model.load_state_dict(csd) # load model state_dict seg_model.to(device).eval() nc = model.num_classes else: seg_model = None nc = None return seg_model, nc, _backend def focus_seg(image, model, label_mapper, image_pre_size=(256, 256), resize_mode="normal", preprocess_mode=None, inference_backend="ONNX", postprocess_cfgs=None): """分割模型识别函数 Args: image: 输入为裁切掉医院等无关信息的图 model: 分割模型 label_mapper: 用于对模型输出类别的映射 image_pre_size: 图像预处理尺寸 resize_mode: 图像缩放方式 preprocess_mode: 图像预处理方式 inference_backend: 推理后端类型 postprocess_cfgs: 后处理配置 Returns: tuple: tuple contains: output_contours: 模型预测的3维数组, 原图像尺寸 output_mask: 形状为[N, H, W]的原尺寸掩模图像 T0: 图片前处理耗时 T1: 模型推理耗时 """ t0_0 = time.time() image_src = image.copy() # 尺寸放缩到固定大小 if resize_mode == "normal": h = image_pre_size[0] w = image_pre_size[1] image = cv2.resize(image, (w, h)) elif resize_mode == "fitLargeSizeAndPad": image = resize_LongestMaxSize(image, image_pre_size[0], resize_mode=cv2.INTER_CUBIC) else: image = image # 预测网络固定前处理部分 if preprocess_mode == "normalization1": predict_image = (image / 255.0).astype("float32") elif preprocess_mode == "normalization2": predict_image = ((image / 255.0).astype("float32") - 0.5) / 2.0 elif preprocess_mode == "decentralization": img_mean = np.mean(image) img_std = np.std(image) predict_image = ((image - img_mean + 10e-7) / (img_std + 10e-7)).astype("float32") elif preprocess_mode == "int8": predict_image = (image - 255).astype("int8") else: predict_image = image ############################ t0_1 = time.time() T0 = t0_1 - t0_0 print("图片前处理时间:%0.8f" % T0) t1_0 = time.time() # 模型预测 predict_image = np.expand_dims(predict_image, axis=0) input_image = predict_image.transpose((0, 3, 1, 2)) if inference_backend == "ONNX": input_name = model.get_inputs()[0].name pr_masks = model.run([], {input_name: input_image}) pr_masks = [pr_masks] if not isinstance(pr_masks, list) else pr_masks elif inference_backend == "PyTorch": input_image = torch.from_numpy(input_image).to("cuda" if torch.cuda.is_available() else "cpu") pr_masks = model(input_image) pr_masks = [pr_masks] if not isinstance(pr_masks, list) else pr_masks pr_masks = [pr_mask.detach().cpu().numpy() for pr_mask in pr_masks] else: raise RuntimeError("Unsupported backend type! Only ONNX and PyTorch are supported.") t1_1 = time.time() T1 = t1_1 - t1_0 print("模型预测时间:%0.8f" % T1) pr_masks = [pr_mask.squeeze(0).transpose((1, 2, 0)) for pr_mask in pr_masks] if resize_mode == "fitLargeSizeAndPad": pr_masks = [ resize_LongestMaxSize_back(pr_mask, image_src.shape[0:2], resize_mode=cv2.INTER_LINEAR ) for pr_mask in pr_masks ] else: pr_masks = [ cv2.resize(pr_mask, (image_src.shape[1], image_src.shape[0]), interpolation=cv2.INTER_LINEAR ) for pr_mask in pr_masks ] # mask后处理 output_contours = [] for out_idx, mask in enumerate(pr_masks): for class_idx, label in enumerate(postprocess_cfgs[out_idx].keys()): min_area, area_rate, max_count = postprocess_cfgs[out_idx][label] pr_class_mask = semantics_segmentation_postprocess(mask, class_idx + 1, min_area, area_rate, max_count) pr_class_contour = contour_sort(pr_class_mask, class_idx + 1, label_mapper[out_idx]) output_contours.extend(pr_class_contour) output_contours.sort(key=lambda x: x["area"], reverse=True) output_mask = np.zeros((image_src.shape[0], image_src.shape[1]), dtype=np.uint8) for single_contour in output_contours: output_mask = cv2.drawContours(output_mask.copy(), [single_contour["contour"]], -1, single_contour["label_value"], cv2.FILLED) return output_contours, output_mask, T0, T1 def run(model, source, imgsz=(256, 256), postprocess_cfgs=None, save_dir=None, device='cuda', save_txt=False): """用于非串联检测,如仅检测颈动脉 """ source = str(source) fontText = ImageFont.truetype(r"C:\Windows\Fonts\msyhl.ttc", 18, encoding="utf-8") if not isinstance(postprocess_cfgs, list): postprocess_cfgs = [postprocess_cfgs] class_name = [] for cfg in postprocess_cfgs: class_name += list(cfg.keys()) # prepare model seg_model, nc, _backend = model_prepare(model, 3, device) # 3为模型的输出类别数量,依据实际情况修改 if not isinstance(nc, list): nc = [nc] nc = [c - 1 for c in nc] label_mapper = [{i: i+sum(nc[:idx]) if idx > 0 else i for i in range(1, c+1)} for idx, c in enumerate(nc)] # Dataloader dataset = LoadImages(source) bs = len(dataset) vid_path, vid_writer = [None] * bs, [None] * bs # Run inference Time = [] for path, im, im0, vid_cap, s in dataset: print(f'{s}') save_path = str(save_dir / Path(path).name) pr_contours, pr_focus_mask, T0, T1 = focus_seg(im, seg_model, label_mapper, image_pre_size=imgsz, resize_mode="normal", preprocess_mode="normalization1", inference_backend=_backend, postprocess_cfgs=postprocess_cfgs) Time.append(T1) # record inference time # draw contours for pr_contour in pr_contours: im0 = draw_contours(im0, pr_contour, class_name, fontText, COLOR_LIST_ORGAN) if dataset.mode == 'image': cv2.imencode(".jpg", im0, [int(cv2.IMWRITE_JPEG_QUALITY), 100])[1].tofile(save_path) else: # 'video' if vid_path[dataset.count] != save_path: # new video vid_path[dataset.count] = save_path if isinstance(vid_writer[dataset.count], cv2.VideoWriter): vid_writer[dataset.count].release() # release previous video writer if vid_cap: # video fps = vid_cap.get(cv2.CAP_PROP_FPS) w = im0.shape[1] h = im0.shape[0] else: fps, w, h = 30, im.shape[1], im.shape[0] save_path = str(Path(save_path).with_suffix('.mp4')) vid_writer[dataset.count] = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h)) vid_writer[dataset.count].write(im0) if save_txt: pred_info = prfile_generate_semantic_segmentation(pr_contours, im.shape[:2]) prf = generate_pred_file(pred_info, {k + 1: v for k, v in enumerate(class_name)}) with open(Path(save_path).with_suffix(".txt"), "w", encoding="utf-8") as f: f.write(prf) # exclude warmup inference warmup = 5 for _ in range(warmup): Time.pop(0) inference_speed = sum(Time) / len(Time) print('#################################\n' '平均每张图像推理速度为:{:.6f}\n'.format(inference_speed), '#################################\n') def run_pipeline(organ_model, lesion_model, source, imgsz=(256, 256), postprocess_cfgs_organ=None, postprocess_cfgs_lesion=None, save_dir=None, device='cuda'): ''' 用于串联检测,脏器+病灶,依据脏器的检测结果作为病灶的输入图像,例如颈动脉+斑块 ''' source = str(source) fontText = ImageFont.truetype(r"C:\Windows\Fonts\msyhl.ttc", 18, encoding="utf-8") class_name_organ = list(postprocess_cfgs_organ.keys()) class_name_lesion = list(postprocess_cfgs_lesion.keys()) # prepare model seg_organ_model, organ_backend = model_prepare(organ_model, 3, device) # 3为模型的输出类别数量,依据实际情况修改 seg_lesion_model, lesion_backend = model_prepare(lesion_model, 2, device) # Dataloader dataset = LoadImages(source) bs = len(dataset) # batch size vid_path, vid_writer = [None] * bs, [None] * bs # Run inference Time = [] for path, im, im0, vid_cap, s in dataset: print(f'{s}') save_path = str(save_dir / Path(path).name) pr_contours_organ, pr_focus_mask_organ, T0, T1 = \ focus_seg( im, seg_organ_model, image_pre_size=imgsz, resize_mode="normal", preprocess_mode="normalization1", inference_backend=organ_backend, postprocess_cfgs=postprocess_cfgs_organ) for pr_contour_organ in pr_contours_organ: # TODO 需要进行病灶检测的脏器类别id,可能为多种脏器,待组成一个list if pr_contour_organ['label_value'] == 2: # 获取外接扩展矩形,如若不需要扩充像素,expand_pixel设置为0 organ_bounding_box = get_bounding_box(pr_contour_organ, im.shape[:2], expand_pixel=40) im_lesion = im[organ_bounding_box[0]: organ_bounding_box[1], organ_bounding_box[2]: organ_bounding_box[3]] # 裁切好的图像送入病灶检测模型 pr_contours_lesion, pr_focus_mask_lesion, T0, T1 = \ focus_seg( im_lesion, seg_lesion_model, image_pre_size=imgsz, resize_mode="fitLargeSizeAndPad", preprocess_mode="normalization1", inference_backend=lesion_backend, postprocess_cfgs=postprocess_cfgs_lesion) # draw contours for pr_contour_lesion in pr_contours_lesion: im_lesion = draw_contours(im_lesion, pr_contour_lesion, class_name_lesion, fontText, COLOR_LIST_LESION) im0 = im0.copy() im0[organ_bounding_box[0]: organ_bounding_box[1], organ_bounding_box[2]: organ_bounding_box[3]] = im_lesion im0 = draw_contours(im0, pr_contour_organ, class_name_organ, fontText, COLOR_LIST_ORGAN) Time.append(T1) # record inference time if dataset.mode == 'image': cv2.imencode(".jpg", im0, [int(cv2.IMWRITE_JPEG_QUALITY), 100])[1].tofile(save_path) else: # 'video' if vid_path[dataset.count] != save_path: # new video vid_path[dataset.count] = save_path if isinstance(vid_writer[dataset.count], cv2.VideoWriter): vid_writer[dataset.count].release() # release previous video writer if vid_cap: # video fps = vid_cap.get(cv2.CAP_PROP_FPS) w = im0.shape[1] h = im0.shape[0] else: fps, w, h = 30, im.shape[1], im.shape[0] save_path = str(Path(save_path).with_suffix('.mp4')) vid_writer[dataset.count] = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h)) vid_writer[dataset.count].write(im0) Time.pop(0) inference_speed = sum(Time) / len(Time) print('#################################\n' '平均每张图像推理速度为:{:.6f}\n'.format(inference_speed), '#################################\n') def parse_args(): parser = ArgumentParser() parser.add_argument( "--organ_model", help="organ model path", type=str, default=r"model.onnx") parser.add_argument( "--lesion_model", help="lesion model path", type=str, default=None) parser.add_argument( "--source", help="root path to predict", type=str, default=None) parser.add_argument( "--imgsz", help="inference size h,w", type=tuple, default=(256, 256)) parser.add_argument( "--cfg_organ", help="postprocess configurations", default=NECKORGAN_CFGS2) parser.add_argument( "--cfg_lesion", help="postprocess configurations", default=NECKLESION_CFGS) parser.add_argument( "--save_dir", help="directory to save predicting results", type=str, default=None) parser.add_argument( "--device", help="select using cuda, cpu, or cpu with one core and one thread", type=str, choices=['cuda', 'cpu', 'limit_cpu'], default="cuda") parser.add_argument( "--save_txt", help="If true, save results to *.txt", type=bool, default=False) return parser.parse_args() if __name__ == '__main__': opt = parse_args() if not isinstance(opt.cfg_organ, list): opt.cfg_organ = [opt.cfg_organ] organ_categories = [] for cfg in opt.cfg_organ: organ_categories += list(cfg.keys()) seed_arr_organ = np.array([range(1, 255, math.ceil(255 / len(organ_categories)))]).astype(np.uint8) COLOR_LIST_ORGAN = cv2.applyColorMap(seed_arr_organ, cv2.COLORMAP_RAINBOW)[0] seed_arr_lesion = np.array([range(1, 255, math.ceil(255 / len(opt.cfg_lesion)))]).astype(np.uint8) COLOR_LIST_LESION = cv2.applyColorMap(seed_arr_lesion, cv2.COLORMAP_RAINBOW)[0] # Directories save_dir = Path(str(opt.save_dir)) / 'predict' save_dir.mkdir(exist_ok=True) if opt.lesion_model is not None: # 串联检测,如检测颈动脉+斑块,目前仅支持串联两个模型,而串联三个模型的有待后续使用到再添加 run_pipeline( opt.organ_model, opt.lesion_model, opt.source, imgsz=opt.imgsz, postprocess_cfgs_organ=opt.cfg_organ, postprocess_cfgs_lesion=opt.cfg_lesion, save_dir=save_dir, device=opt.device) else: # 非串联检测,即单个模型推理 run( opt.organ_model, opt.source, imgsz=opt.imgsz, postprocess_cfgs=opt.cfg_organ, save_dir=save_dir, device=opt.device, save_txt=opt.save_txt)