123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590 |
- # -*- coding: utf-8 -*-
- # @ File : predict.py
- # @ Author : Guido LuXiaohao
- # @ Date : 2021/8/30
- # @ Software : PyCharm
- # @ Description: 用于模型推理
- import json
- import math
- import os
- import sys
- import time
- from argparse import ArgumentParser
- from pathlib import Path
- import cv2
- import numpy as np
- import onnxruntime as ort
- import torch
- from PIL import Image, ImageDraw, ImageFont
- from dataset import LoadImages, resize_LongestMaxSize
- from evaluation.utils.metric_file_generate import \
- prfile_generate_semantic_segmentation
- FILE = Path(__file__).resolve()
- ROOT = FILE.parents[0] # project root directory
- if str(ROOT) not in sys.path:
- sys.path.append(str(ROOT)) # add ROOT to PATH
- ROOT = Path(os.path.relpath(ROOT, Path.cwd())) # relative path
- # {label: [min_area, area_rate, max_count], ...}
- "甲状腺横切": [0, 1 / 25, False],
- "甲状腺纵切": [0, 1 / 25, True],
- "颈动脉短轴": [0, 1 / 100, False],
- "颈动脉长轴": [0, 1 / 100, True],
- "颈部气管": [0, 1 / 25, True]
- }
- {
- "甲状腺横切": [0, 1 / 25, False],
- "甲状腺纵切": [0, 1 / 25, True],
- "颈动脉短轴": [0, 1 / 100, False],
- "颈动脉长轴": [0, 1 / 100, True]},
- {
- "颈部气管": [0, 1 / 25, True]},
- ]
- "斑块或内中膜增厚": [50, 0, False]
- }
- "神经": [0, 1 / 100, False],
- "动脉": [0, 1 / 100, False],
- "静脉": [0, 1 / 100, False]}
- def resize_LongestMaxSize_back(image, ori_size, resize_mode=cv2.INTER_CUBIC):
- # 将经过resize_LongestMaxSize的图像去除pad填充,再resize回原始尺寸
- image_height = ori_size[0]
- image_width = ori_size[1]
- norm_size = image.shape[0]
- if len(image.shape) != 3:
- image = np.expand_dims(image, -1)
- if image_height > image_width:
- pad_len = int((norm_size - norm_size * image_width / image_height) / 2)
- image_before_pad = image[0:norm_size, pad_len:norm_size - pad_len]
- image = cv2.resize(image_before_pad, (image_width, image_height), interpolation=resize_mode)
- else:
- pad_len = int((norm_size - norm_size * image_height / image_width) / 2)
- image_before_pad = image[pad_len:norm_size - pad_len, 0:norm_size]
- image = cv2.resize(image_before_pad, (image_width, image_height), interpolation=resize_mode)
- return image
- def semantics_segmentation_postprocess(input_tensor,
- label_index,
- min_area,
- area_rate,
- max_count=True):
- # 筛选语义分割结果
- select_mask = (input_tensor[..., label_index].squeeze() > 0.5) * 255
- select_mask = np.array(select_mask, np.uint8)
- all_area = select_mask.shape[0] * select_mask.shape[1]
- output_mask = np.zeros(select_mask.shape[:2], dtype=np.uint8)
- # 对应轮廓寻找
- contours, _ = cv2.findContours(select_mask.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
- contours = list(contours)
- output_contours = []
- if len(contours) != 0:
- contours.sort(key=lambda x: cv2.contourArea(x), reverse=True)
- for contour_idx in range(len(contours)):
- # 轮廓为一点时,是个向量
- if np.squeeze(contours[contour_idx]).shape[0] > 10 and \
- cv2.contourArea(contours[contour_idx]) > min_area and \
- cv2.contourArea(contours[contour_idx]) > (all_area * area_rate):
- output_contours.append(contours[contour_idx])
- if len(output_contours) != 0:
- if max_count:
- output_mask = cv2.drawContours(output_mask.copy(), [output_contours[0]], -1, label_index, cv2.FILLED)
- else:
- output_mask = cv2.drawContours(output_mask.copy(), output_contours, -1, label_index, cv2.FILLED)
- return output_mask
- def contour_sort(mask, output_label, label_mapper):
- each_image_label_list = []
- # 对应轮廓寻找
- contours, _ = cv2.findContours(mask.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
- contours = list(contours)
- contours.sort(key=lambda x: cv2.contourArea(x), reverse=True)
- for i in range(len(contours)):
- each_label_dict = {
- "contour": contours[i],
- "label_value": label_mapper[output_label],
- "area": cv2.contourArea(contours[i])}
- each_image_label_list.append(each_label_dict)
- return each_image_label_list
- def draw_contours(im0, pr_contour, class_name, fontText, color_list):
- im0 = Image.fromarray(im0) # BGR
- color = tuple([int(x) for x in color_list[pr_contour['label_value'] - 1]]) # B, G, R
- # color = (0, 0, 255) # red
- draw = ImageDraw.Draw(im0)
- draw.text(pr_contour['contour'][0][0], class_name[pr_contour['label_value'] - 1],
- fill=color, font=fontText, stroke_width=1)
- im0 = np.asarray(im0)
- cv2.drawContours(im0, pr_contour['contour'], contourIdx=-1, color=color, thickness=2)
- return im0
- def get_bounding_box(contours, img_size, expand_pixel=40):
- # 默认外扩40个像素
- roi_points = [list(i[0]) for i in contours['contour']]
- x, y = [], []
- point_count = len(roi_points)
- for ni in range(point_count):
- point_x, point_y = roi_points[ni][:]
- x.append(int(point_x))
- y.append(int(point_y))
- left, right = max(min(x) - expand_pixel, 0), min(max(x) + expand_pixel, img_size[1]) # 防止超出边界
- top, bottom = max(min(y) - expand_pixel, 0), min(max(y) + expand_pixel, img_size[0])
- return [top, bottom, left, right]
- def generate_pred_file(predict_info, index_class_map):
- predict_info.sort(key=lambda x: x["Label"], reverse=False)
- output_info = [{
- "FileResultInfos": [{
- "Index": 0,
- "FrameStatus": None,
- "LabeledResult": {"Rois": []}}]}]
- for label_idx, label_info in enumerate(predict_info):
- if len(label_info["Contours"]) > 0:
- pts = [{"X": x, "Y": y} for x, y in label_info["Contours"][0]]
- output_info[0]['FileResultInfos'][0]['LabeledResult']['Rois'].append({
- "Index": label_idx,
- "Points": pts,
- "Conclusion": {
- "Title": index_class_map[label_info["Label"]],
- "Confidence": label_info["Confidence"]}})
- else:
- continue
- return json.dumps(output_info, ensure_ascii=False)
- def model_prepare(model, num_classes, device='cuda'):
- if model.endswith(".onnx"):
- _backend = "ONNX"
- elif model.endswith((".pt", ".pth")):
- _backend = "PyTorch"
- else:
- _backend = None
- raise RuntimeError("args model must be supported format files: {'.onnx', '.pt', '.pth'}!")
- if _backend == "ONNX":
- sess_options = ort.SessionOptions()
- sess_options.execution_mode = ort.ExecutionMode.ORT_PARALLEL
- sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_DISABLE_ALL
- providers = ['CPUExecutionProvider']
- if device == "cuda":
- providers = ['CUDAExecutionProvider'] + providers
- elif device == "limit_cpu":
- sess_options.inter_op_num_threads = 1 # number of threads used to parallelize the execution across nodes
- sess_options.intra_op_num_threads = 1 # number of threads used to parallelize the execution within nodes
- # load ONNX model
- seg_model = ort.InferenceSession(model, sess_options, providers=providers)
- nc = []
- for output in seg_model.get_outputs():
- nc += [output.shape[1]]
- elif _backend == "PyTorch":
- print("You are using PyTorch as inference backend. Please make sure you call the correct model module!")
- import models
- seg_model = models.PPLiteSeg(
- num_classes=num_classes, backbone=models.STDC1(attn=True), arm_type="UAFM_ChAtten", apply_nonlin=True)
- csd = torch.load(model, map_location='cpu')['model'].float().state_dict()
- seg_model.load_state_dict(csd) # load model state_dict
- seg_model.to(device).eval()
- nc = model.num_classes
- else:
- seg_model = None
- nc = None
- return seg_model, nc, _backend
- def focus_seg(image,
- model,
- label_mapper,
- image_pre_size=(256, 256),
- resize_mode="normal",
- preprocess_mode=None,
- inference_backend="ONNX",
- postprocess_cfgs=None):
- """分割模型识别函数
- Args:
- image: 输入为裁切掉医院等无关信息的图
- model: 分割模型
- label_mapper: 用于对模型输出类别的映射
- image_pre_size: 图像预处理尺寸
- resize_mode: 图像缩放方式
- preprocess_mode: 图像预处理方式
- inference_backend: 推理后端类型
- postprocess_cfgs: 后处理配置
- Returns:
- tuple: tuple contains:
- output_contours: 模型预测的3维数组, 原图像尺寸
- output_mask: 形状为[N, H, W]的原尺寸掩模图像
- T0: 图片前处理耗时
- T1: 模型推理耗时
- """
- t0_0 = time.time()
- image_src = image.copy()
- # 尺寸放缩到固定大小
- if resize_mode == "normal":
- h = image_pre_size[0]
- w = image_pre_size[1]
- image = cv2.resize(image, (w, h))
- elif resize_mode == "fitLargeSizeAndPad":
- image = resize_LongestMaxSize(image, image_pre_size[0], resize_mode=cv2.INTER_CUBIC)
- else:
- image = image
- # 预测网络固定前处理部分
- if preprocess_mode == "normalization1":
- predict_image = (image / 255.0).astype("float32")
- elif preprocess_mode == "normalization2":
- predict_image = ((image / 255.0).astype("float32") - 0.5) / 2.0
- elif preprocess_mode == "decentralization":
- img_mean = np.mean(image)
- img_std = np.std(image)
- predict_image = ((image - img_mean + 10e-7) / (img_std + 10e-7)).astype("float32")
- elif preprocess_mode == "int8":
- predict_image = (image - 255).astype("int8")
- else:
- predict_image = image
- ############################
- t0_1 = time.time()
- T0 = t0_1 - t0_0
- print("图片前处理时间:%0.8f" % T0)
- t1_0 = time.time()
- # 模型预测
- predict_image = np.expand_dims(predict_image, axis=0)
- input_image = predict_image.transpose((0, 3, 1, 2))
- if inference_backend == "ONNX":
- input_name = model.get_inputs()[0].name
- pr_masks = model.run([], {input_name: input_image})
- pr_masks = [pr_masks] if not isinstance(pr_masks, list) else pr_masks
- elif inference_backend == "PyTorch":
- input_image = torch.from_numpy(input_image).to("cuda" if torch.cuda.is_available() else "cpu")
- pr_masks = model(input_image)
- pr_masks = [pr_masks] if not isinstance(pr_masks, list) else pr_masks
- pr_masks = [pr_mask.detach().cpu().numpy() for pr_mask in pr_masks]
- else:
- raise RuntimeError("Unsupported backend type! Only ONNX and PyTorch are supported.")
- t1_1 = time.time()
- T1 = t1_1 - t1_0
- print("模型预测时间:%0.8f" % T1)
- pr_masks = [pr_mask.squeeze(0).transpose((1, 2, 0)) for pr_mask in pr_masks]
- if resize_mode == "fitLargeSizeAndPad":
- pr_masks = [
- resize_LongestMaxSize_back(pr_mask, image_src.shape[0:2], resize_mode=cv2.INTER_LINEAR
- ) for pr_mask in pr_masks
- ]
- else:
- pr_masks = [
- cv2.resize(pr_mask, (image_src.shape[1], image_src.shape[0]), interpolation=cv2.INTER_LINEAR
- ) for pr_mask in pr_masks
- ]
- # mask后处理
- output_contours = []
- for out_idx, mask in enumerate(pr_masks):
- for class_idx, label in enumerate(postprocess_cfgs[out_idx].keys()):
- min_area, area_rate, max_count = postprocess_cfgs[out_idx][label]
- pr_class_mask = semantics_segmentation_postprocess(mask, class_idx + 1, min_area, area_rate, max_count)
- pr_class_contour = contour_sort(pr_class_mask, class_idx + 1, label_mapper[out_idx])
- output_contours.extend(pr_class_contour)
- output_contours.sort(key=lambda x: x["area"], reverse=True)
- output_mask = np.zeros((image_src.shape[0], image_src.shape[1]), dtype=np.uint8)
- for single_contour in output_contours:
- output_mask = cv2.drawContours(output_mask.copy(), [single_contour["contour"]], -1,
- single_contour["label_value"], cv2.FILLED)
- return output_contours, output_mask, T0, T1
- def run(model,
- source,
- imgsz=(256, 256),
- postprocess_cfgs=None,
- save_dir=None,
- device='cuda',
- save_txt=False):
- """用于非串联检测,如仅检测颈动脉
- """
- source = str(source)
- fontText = ImageFont.truetype(r"C:\Windows\Fonts\msyhl.ttc", 18, encoding="utf-8")
- if not isinstance(postprocess_cfgs, list):
- postprocess_cfgs = [postprocess_cfgs]
- class_name = []
- for cfg in postprocess_cfgs:
- class_name += list(cfg.keys())
- # prepare model
- seg_model, nc, _backend = model_prepare(model, 3, device) # 3为模型的输出类别数量,依据实际情况修改
- if not isinstance(nc, list):
- nc = [nc]
- nc = [c - 1 for c in nc]
- label_mapper = [{i: i+sum(nc[:idx]) if idx > 0 else i for i in range(1, c+1)} for idx, c in enumerate(nc)]
- # Dataloader
- dataset = LoadImages(source)
- bs = len(dataset)
- vid_path, vid_writer = [None] * bs, [None] * bs
- # Run inference
- Time = []
- for path, im, im0, vid_cap, s in dataset:
- print(f'{s}')
- save_path = str(save_dir / Path(path).name)
- pr_contours, pr_focus_mask, T0, T1 = focus_seg(im, seg_model, label_mapper,
- image_pre_size=imgsz,
- resize_mode="normal",
- preprocess_mode="normalization1",
- inference_backend=_backend,
- postprocess_cfgs=postprocess_cfgs)
- Time.append(T1) # record inference time
- # draw contours
- for pr_contour in pr_contours:
- im0 = draw_contours(im0, pr_contour, class_name, fontText, COLOR_LIST_ORGAN)
- if dataset.mode == 'image':
- cv2.imencode(".jpg", im0, [int(cv2.IMWRITE_JPEG_QUALITY), 100])[1].tofile(save_path)
- else: # 'video'
- if vid_path[dataset.count] != save_path: # new video
- vid_path[dataset.count] = save_path
- if isinstance(vid_writer[dataset.count], cv2.VideoWriter):
- vid_writer[dataset.count].release() # release previous video writer
- if vid_cap: # video
- fps = vid_cap.get(cv2.CAP_PROP_FPS)
- w = im0.shape[1]
- h = im0.shape[0]
- else:
- fps, w, h = 30, im.shape[1], im.shape[0]
- save_path = str(Path(save_path).with_suffix('.mp4'))
- vid_writer[dataset.count] = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
- vid_writer[dataset.count].write(im0)
- if save_txt:
- pred_info = prfile_generate_semantic_segmentation(pr_contours, im.shape[:2])
- prf = generate_pred_file(pred_info, {k + 1: v for k, v in enumerate(class_name)})
- with open(Path(save_path).with_suffix(".txt"), "w", encoding="utf-8") as f:
- f.write(prf)
- # exclude warmup inference
- warmup = 5
- for _ in range(warmup):
- Time.pop(0)
- inference_speed = sum(Time) / len(Time)
- print('#################################\n'
- '平均每张图像推理速度为:{:.6f}\n'.format(inference_speed),
- '#################################\n')
- def run_pipeline(organ_model,
- lesion_model,
- source,
- imgsz=(256, 256),
- postprocess_cfgs_organ=None,
- postprocess_cfgs_lesion=None,
- save_dir=None,
- device='cuda'):
- '''
- 用于串联检测,脏器+病灶,依据脏器的检测结果作为病灶的输入图像,例如颈动脉+斑块
- '''
- source = str(source)
- fontText = ImageFont.truetype(r"C:\Windows\Fonts\msyhl.ttc", 18, encoding="utf-8")
- class_name_organ = list(postprocess_cfgs_organ.keys())
- class_name_lesion = list(postprocess_cfgs_lesion.keys())
- # prepare model
- seg_organ_model, organ_backend = model_prepare(organ_model, 3, device) # 3为模型的输出类别数量,依据实际情况修改
- seg_lesion_model, lesion_backend = model_prepare(lesion_model, 2, device)
- # Dataloader
- dataset = LoadImages(source)
- bs = len(dataset) # batch size
- vid_path, vid_writer = [None] * bs, [None] * bs
- # Run inference
- Time = []
- for path, im, im0, vid_cap, s in dataset:
- print(f'{s}')
- save_path = str(save_dir / Path(path).name)
- pr_contours_organ, pr_focus_mask_organ, T0, T1 = \
- focus_seg(
- im, seg_organ_model,
- image_pre_size=imgsz,
- resize_mode="normal",
- preprocess_mode="normalization1",
- inference_backend=organ_backend,
- postprocess_cfgs=postprocess_cfgs_organ)
- for pr_contour_organ in pr_contours_organ:
- # TODO 需要进行病灶检测的脏器类别id,可能为多种脏器,待组成一个list
- if pr_contour_organ['label_value'] == 2:
- # 获取外接扩展矩形,如若不需要扩充像素,expand_pixel设置为0
- organ_bounding_box = get_bounding_box(pr_contour_organ,
- im.shape[:2],
- expand_pixel=40)
- im_lesion = im[organ_bounding_box[0]: organ_bounding_box[1],
- organ_bounding_box[2]: organ_bounding_box[3]]
- # 裁切好的图像送入病灶检测模型
- pr_contours_lesion, pr_focus_mask_lesion, T0, T1 = \
- focus_seg(
- im_lesion, seg_lesion_model,
- image_pre_size=imgsz,
- resize_mode="fitLargeSizeAndPad",
- preprocess_mode="normalization1",
- inference_backend=lesion_backend,
- postprocess_cfgs=postprocess_cfgs_lesion)
- # draw contours
- for pr_contour_lesion in pr_contours_lesion:
- im_lesion = draw_contours(im_lesion, pr_contour_lesion, class_name_lesion, fontText,
- im0 = im0.copy()
- im0[organ_bounding_box[0]: organ_bounding_box[1],
- organ_bounding_box[2]: organ_bounding_box[3]] = im_lesion
- im0 = draw_contours(im0, pr_contour_organ, class_name_organ, fontText, COLOR_LIST_ORGAN)
- Time.append(T1) # record inference time
- if dataset.mode == 'image':
- cv2.imencode(".jpg", im0, [int(cv2.IMWRITE_JPEG_QUALITY), 100])[1].tofile(save_path)
- else: # 'video'
- if vid_path[dataset.count] != save_path: # new video
- vid_path[dataset.count] = save_path
- if isinstance(vid_writer[dataset.count], cv2.VideoWriter):
- vid_writer[dataset.count].release() # release previous video writer
- if vid_cap: # video
- fps = vid_cap.get(cv2.CAP_PROP_FPS)
- w = im0.shape[1]
- h = im0.shape[0]
- else:
- fps, w, h = 30, im.shape[1], im.shape[0]
- save_path = str(Path(save_path).with_suffix('.mp4'))
- vid_writer[dataset.count] = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
- vid_writer[dataset.count].write(im0)
- Time.pop(0)
- inference_speed = sum(Time) / len(Time)
- print('#################################\n'
- '平均每张图像推理速度为:{:.6f}\n'.format(inference_speed),
- '#################################\n')
- def parse_args():
- parser = ArgumentParser()
- parser.add_argument(
- "--organ_model",
- help="organ model path",
- type=str,
- default=r"model.onnx")
- parser.add_argument(
- "--lesion_model",
- help="lesion model path",
- type=str,
- default=None)
- parser.add_argument(
- "--source",
- help="root path to predict",
- type=str,
- default=None)
- parser.add_argument(
- "--imgsz",
- help="inference size h,w",
- type=tuple,
- default=(256, 256))
- parser.add_argument(
- "--cfg_organ",
- help="postprocess configurations",
- default=NECKORGAN_CFGS2)
- parser.add_argument(
- "--cfg_lesion",
- help="postprocess configurations",
- parser.add_argument(
- "--save_dir",
- help="directory to save predicting results",
- type=str,
- default=None)
- parser.add_argument(
- "--device",
- help="select using cuda, cpu, or cpu with one core and one thread",
- type=str,
- choices=['cuda', 'cpu', 'limit_cpu'],
- default="cuda")
- parser.add_argument(
- "--save_txt",
- help="If true, save results to *.txt",
- type=bool,
- default=False)
- return parser.parse_args()
- if __name__ == '__main__':
- opt = parse_args()
- if not isinstance(opt.cfg_organ, list):
- opt.cfg_organ = [opt.cfg_organ]
- organ_categories = []
- for cfg in opt.cfg_organ:
- organ_categories += list(cfg.keys())
- seed_arr_organ = np.array([range(1, 255, math.ceil(255 / len(organ_categories)))]).astype(np.uint8)
- COLOR_LIST_ORGAN = cv2.applyColorMap(seed_arr_organ, cv2.COLORMAP_RAINBOW)[0]
- seed_arr_lesion = np.array([range(1, 255, math.ceil(255 / len(opt.cfg_lesion)))]).astype(np.uint8)
- COLOR_LIST_LESION = cv2.applyColorMap(seed_arr_lesion, cv2.COLORMAP_RAINBOW)[0]
- # Directories
- save_dir = Path(str(opt.save_dir)) / 'predict'
- save_dir.mkdir(exist_ok=True)
- if opt.lesion_model is not None:
- # 串联检测,如检测颈动脉+斑块,目前仅支持串联两个模型,而串联三个模型的有待后续使用到再添加
- run_pipeline(
- opt.organ_model,
- opt.lesion_model,
- opt.source,
- imgsz=opt.imgsz,
- postprocess_cfgs_organ=opt.cfg_organ,
- postprocess_cfgs_lesion=opt.cfg_lesion,
- save_dir=save_dir,
- device=opt.device)
- else:
- # 非串联检测,即单个模型推理
- run(
- opt.organ_model,
- opt.source,
- imgsz=opt.imgsz,
- postprocess_cfgs=opt.cfg_organ,
- save_dir=save_dir,
- device=opt.device,
- save_txt=opt.save_txt)