import warnings
warnings.filterwarnings('ignore')
warnings.simplefilter('ignore')
import torch, yaml, cv2, os, shutil, sys, copy
import numpy as np
np.random.seed(0)
import matplotlib.pyplot as plt
from tqdm import trange
from PIL import Image
from ultralytics import YOLO
from ultralytics.nn.tasks import attempt_load_weights
from ultralytics.utils.torch_utils import intersect_dicts
from ultralytics.utils.ops import xywh2xyxy, non_max_suppression
from pytorch_grad_cam import GradCAMPlusPlus, GradCAM, XGradCAM, EigenCAM, HiResCAM, LayerCAM, RandomCAM, EigenGradCAM, KPCA_CAM, AblationCAM
from pytorch_grad_cam.utils.image import show_cam_on_image, scale_cam_image
from pytorch_grad_cam.activations_and_gradients import ActivationsAndGradients

def letterbox(im, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
    # Resize and pad image while meeting stride-multiple constraints
    shape = im.shape[:2]  # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)

    # Scale ratio (new / old)
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    if not scaleup:  # only scale down, do not scale up (for better val mAP)
        r = min(r, 1.0)

    # Compute padding
    ratio = r, r  # width, height ratios
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
    if auto:  # minimum rectangle
        dw, dh = np.mod(dw, stride), np.mod(dh, stride)  # wh padding
    elif scaleFill:  # stretch
        dw, dh = 0.0, 0.0
        new_unpad = (new_shape[1], new_shape[0])
        ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratios

    dw /= 2  # divide padding into 2 sides
    dh /= 2

    if shape[::-1] != new_unpad:  # resize
        im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add border
    return im, ratio, (top, bottom, left, right)

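# Usage sketch for letterbox (hypothetical input file): scale a BGR frame to fit the
# target canvas, then pad with gray borders:
#   im = cv2.imread('bus.jpg')                                   # hypothetical path
#   im_lb, ratio, (top, bottom, left, right) = letterbox(im, new_shape=(640, 640), auto=True)
# With auto=True the padding is only rounded up to the next multiple of `stride`, so the
# result may stay rectangular; pass auto=False to force exactly new_shape.
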
class ActivationsAndGradients:
    """ Class for extracting activations and
    registering gradients from targeted intermediate layers.
    Intentionally shadows the version imported from pytorch_grad_cam,
    adding YOLO-aware post-processing of the model output. """

    def __init__(self, model, target_layers, reshape_transform):
        self.model = model
        self.gradients = []
        self.activations = []
        self.reshape_transform = reshape_transform
        self.handles = []
        for target_layer in target_layers:
            self.handles.append(
                target_layer.register_forward_hook(self.save_activation))
            # Because of https://github.com/pytorch/pytorch/issues/61519,
            # we don't use backward hook to record gradients.
            self.handles.append(
                target_layer.register_forward_hook(self.save_gradient))

    def save_activation(self, module, input, output):
        activation = output

        if self.reshape_transform is not None:
            activation = self.reshape_transform(activation)
        self.activations.append(activation.cpu().detach())

    def save_gradient(self, module, input, output):
        if not hasattr(output, "requires_grad") or not output.requires_grad:
            # You can only register hooks on a tensor that requires grad.
            return

        # Gradients are computed in reverse order
        def _store_grad(grad):
            if self.reshape_transform is not None:
                grad = self.reshape_transform(grad)
            self.gradients = [grad.cpu().detach()] + self.gradients

        output.register_hook(_store_grad)

    def post_process(self, result):
        # Sort the raw head outputs by best class confidence so the target
        # modules can walk the top `ratio` fraction of predictions in order.
        if self.model.end2end:
            logits_ = result[:, :, 4:]
            boxes_ = result[:, :, :4]
            _, indices = torch.sort(logits_[:, :, 0], descending=True)
            return logits_[0][indices[0]], boxes_[0][indices[0]]
        elif self.model.task == 'detect':
            logits_ = result[:, 4:]
            boxes_ = result[:, :4]
            _, indices = torch.sort(logits_.max(1)[0], descending=True)
            return torch.transpose(logits_[0], dim0=0, dim1=1)[indices[0]], torch.transpose(boxes_[0], dim0=0, dim1=1)[indices[0]]
        elif self.model.task == 'segment':
            logits_ = result[0][:, 4:4 + self.model.nc]
            boxes_ = result[0][:, :4]
            mask_p, mask_nm = result[1][2].squeeze(), result[1][1].squeeze().transpose(1, 0)
            c, h, w = mask_p.size()
            mask = (mask_nm @ mask_p.view(c, -1))
            _, indices = torch.sort(logits_.max(1)[0], descending=True)
            return torch.transpose(logits_[0], dim0=0, dim1=1)[indices[0]], torch.transpose(boxes_[0], dim0=0, dim1=1)[indices[0]], mask[indices[0]]
        elif self.model.task == 'pose':
            logits_ = result[:, 4:4 + self.model.nc]
            boxes_ = result[:, :4]
            poses_ = result[:, 4 + self.model.nc:]
            _, indices = torch.sort(logits_.max(1)[0], descending=True)
            return torch.transpose(logits_[0], dim0=0, dim1=1)[indices[0]], torch.transpose(boxes_[0], dim0=0, dim1=1)[indices[0]], torch.transpose(poses_[0], dim0=0, dim1=1)[indices[0]]
        elif self.model.task == 'obb':
            logits_ = result[:, 4:4 + self.model.nc]
            boxes_ = result[:, :4]
            angles_ = result[:, 4 + self.model.nc:]
            _, indices = torch.sort(logits_.max(1)[0], descending=True)
            return torch.transpose(logits_[0], dim0=0, dim1=1)[indices[0]], torch.transpose(boxes_[0], dim0=0, dim1=1)[indices[0]], torch.transpose(angles_[0], dim0=0, dim1=1)[indices[0]]
        elif self.model.task == 'classify':
            return result[0]

    def __call__(self, x):
        self.gradients = []
        self.activations = []
        model_output = self.model(x)
        if self.model.task == 'detect':
            post_result, pre_post_boxes = self.post_process(model_output[0])
            return [[post_result, pre_post_boxes]]
        elif self.model.task == 'segment':
            post_result, pre_post_boxes, pre_post_mask = self.post_process(model_output)
            return [[post_result, pre_post_boxes, pre_post_mask]]
        elif self.model.task == 'pose':
            post_result, pre_post_boxes, pre_post_pose = self.post_process(model_output[0])
            return [[post_result, pre_post_boxes, pre_post_pose]]
        elif self.model.task == 'obb':
            post_result, pre_post_boxes, pre_post_angle = self.post_process(model_output[0])
            return [[post_result, pre_post_boxes, pre_post_angle]]
        elif self.model.task == 'classify':
            data = self.post_process(model_output)
            return [data]

    def release(self):
        for handle in self.handles:
            handle.remove()

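# Standalone usage sketch (assumes `model` is an ultralytics model with model.task
# set and that layer index 10 exists):
#   acts = ActivationsAndGradients(model, [model.model[10]], None)
#   outputs = acts(torch.zeros(1, 3, 640, 640))   # forward pass fills acts.activations
#   acts.release()                                # always remove the hooks when finished
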
class yolo_detect_target(torch.nn.Module):
    def __init__(self, output_type, conf, ratio, end2end) -> None:
        super().__init__()
        self.output_type = output_type
        self.conf = conf
        self.ratio = ratio
        self.end2end = end2end

    def forward(self, data):
        post_result, pre_post_boxes = data
        result = []
        for i in trange(int(post_result.size(0) * self.ratio)):
            if (self.end2end and float(post_result[i, 0]) < self.conf) or (not self.end2end and float(post_result[i].max()) < self.conf):
                break
            if self.output_type == 'class' or self.output_type == 'all':
                if self.end2end:
                    result.append(post_result[i, 0])
                else:
                    result.append(post_result[i].max())
            if self.output_type == 'box' or self.output_type == 'all':
                for j in range(4):
                    result.append(pre_post_boxes[i, j])
        return sum(result)

class yolo_segment_target(yolo_detect_target):
    def __init__(self, output_type, conf, ratio, end2end):
        super().__init__(output_type, conf, ratio, end2end)

    def forward(self, data):
        post_result, pre_post_boxes, pre_post_mask = data
        result = []
        for i in trange(int(post_result.size(0) * self.ratio)):
            if float(post_result[i].max()) < self.conf:
                break
            if self.output_type == 'class' or self.output_type == 'all':
                result.append(post_result[i].max())
            if self.output_type == 'box' or self.output_type == 'all':
                for j in range(4):
                    result.append(pre_post_boxes[i, j])
            if self.output_type == 'segment' or self.output_type == 'all':
                result.append(pre_post_mask[i].mean())
        return sum(result)

class yolo_pose_target(yolo_detect_target):
    def __init__(self, output_type, conf, ratio, end2end):
        super().__init__(output_type, conf, ratio, end2end)

    def forward(self, data):
        post_result, pre_post_boxes, pre_post_pose = data
        result = []
        for i in trange(int(post_result.size(0) * self.ratio)):
            if float(post_result[i].max()) < self.conf:
                break
            if self.output_type == 'class' or self.output_type == 'all':
                result.append(post_result[i].max())
            if self.output_type == 'box' or self.output_type == 'all':
                for j in range(4):
                    result.append(pre_post_boxes[i, j])
            if self.output_type == 'pose' or self.output_type == 'all':
                result.append(pre_post_pose[i].mean())
        return sum(result)

class yolo_obb_target(yolo_detect_target):
    def __init__(self, output_type, conf, ratio, end2end):
        super().__init__(output_type, conf, ratio, end2end)

    def forward(self, data):
        post_result, pre_post_boxes, pre_post_angle = data
        result = []
        for i in trange(int(post_result.size(0) * self.ratio)):
            if float(post_result[i].max()) < self.conf:
                break
            if self.output_type == 'class' or self.output_type == 'all':
                result.append(post_result[i].max())
            if self.output_type == 'box' or self.output_type == 'all':
                for j in range(4):
                    result.append(pre_post_boxes[i, j])
            if self.output_type == 'obb' or self.output_type == 'all':
                result.append(pre_post_angle[i])
        return sum(result)

class yolo_classify_target(yolo_detect_target):
    def __init__(self, output_type, conf, ratio, end2end):
        super().__init__(output_type, conf, ratio, end2end)

    def forward(self, data):
        # For classification only the top class score is backpropagated.
        return data.max()

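# How the target modules above are consumed (sketch of pytorch_grad_cam's contract):
# the CAM object calls each target with the post-processed model output and
# backpropagates the returned scalar through the hooked layers, roughly:
#   target = yolo_detect_target('class', conf=0.2, ratio=0.02, end2end=False)
#   loss = target([post_result, pre_post_boxes])  # scalar sum of the selected scores
#   loss.backward()                               # populates the gradient hooks above
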
class yolo_heatmap:
    def __init__(self, weight, device, method, layer, backward_type, conf_threshold, ratio, show_result, renormalize, task, img_size):
        device = torch.device(device)
        model_yolo = YOLO(weight)
        model_names = model_yolo.names
        print(f'model class info: {model_names}')
        model = copy.deepcopy(model_yolo.model)
        model.to(device)
        model.info()
        for p in model.parameters():
            p.requires_grad_(True)
        model.eval()

        model.task = task
        if not hasattr(model, 'end2end'):
            model.end2end = False

        if task == 'detect':
            target = yolo_detect_target(backward_type, conf_threshold, ratio, model.end2end)
        elif task == 'segment':
            target = yolo_segment_target(backward_type, conf_threshold, ratio, model.end2end)
        elif task == 'pose':
            target = yolo_pose_target(backward_type, conf_threshold, ratio, model.end2end)
        elif task == 'obb':
            target = yolo_obb_target(backward_type, conf_threshold, ratio, model.end2end)
        elif task == 'classify':
            target = yolo_classify_target(backward_type, conf_threshold, ratio, model.end2end)
        else:
            raise ValueError(f'unsupported task ({task}).')

        target_layers = [model.model[l] for l in layer]
        method = eval(method)(model, target_layers)
        method.activations_and_grads = ActivationsAndGradients(model, target_layers, None)

        colors = np.random.uniform(0, 255, size=(len(model_names), 3)).astype(np.int32)
        self.__dict__.update(locals())

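    # Layer-selection sketch (assumption: the `layer` indices follow the module order
    # printed by model.info(), i.e. the parsed-YAML order). To list the candidates:
    #   m = YOLO('yolo11n.pt').model
    #   for i, layer_ in enumerate(m.model):
    #       print(i, layer_.__class__.__name__)
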
    def post_process(self, result):
        # non_max_suppression returns one (n, 6) tensor [xyxy, conf, cls] per image.
        result = non_max_suppression(result, conf_thres=self.conf_threshold, iou_thres=0.65)[0]
        return result

    def draw_detections(self, box, color, name, img):
        xmin, ymin, xmax, ymax = list(map(int, list(box)))
        cv2.rectangle(img, (xmin, ymin), (xmax, ymax), tuple(int(x) for x in color), 2)  # draw the detection box
        cv2.putText(img, str(name), (xmin, ymin - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.8, tuple(int(x) for x in color), 2, lineType=cv2.LINE_AA)  # draw class name and confidence
        return img

    def renormalize_cam_in_bounding_boxes(self, boxes, image_float_np, grayscale_cam):
        """Normalize the CAM to be in the range [0, 1]
        inside every bounding box, and zero outside of the bounding boxes. """
        renormalized_cam = np.zeros(grayscale_cam.shape, dtype=np.float32)
        for x1, y1, x2, y2 in boxes:
            x1, y1 = max(x1, 0), max(y1, 0)
            x2, y2 = min(grayscale_cam.shape[1] - 1, x2), min(grayscale_cam.shape[0] - 1, y2)
            renormalized_cam[y1:y2, x1:x2] = scale_cam_image(grayscale_cam[y1:y2, x1:x2].copy())
        renormalized_cam = scale_cam_image(renormalized_cam)
        eigencam_image_renormalized = show_cam_on_image(image_float_np, renormalized_cam, use_rgb=True)
        return eigencam_image_renormalized

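    # Usage sketch (mirrors the call in process() below): `boxes` are int xyxy pixels in
    # letterboxed-image coordinates, `img` is float32 RGB in [0, 1]:
    #   boxes = pred.boxes.xyxy.cpu().numpy().astype(np.int32)
    #   overlay = self.renormalize_cam_in_bounding_boxes(boxes, img, grayscale_cam)
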
    def process(self, img_path, save_path):
        # image preprocessing
        try:
            img = cv2.imdecode(np.fromfile(img_path, np.uint8), cv2.IMREAD_COLOR)
        except Exception:
            img = None
        if img is None:  # cv2.imdecode returns None on undecodable data instead of raising
            print(f"Warning... {img_path} read failure.")
            return
        img, _, (top, bottom, left, right) = letterbox(img, new_shape=(self.img_size, self.img_size), auto=True)  # set auto=False if you need the output to be exactly img_size x img_size
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = np.float32(img) / 255.0
        tensor = torch.from_numpy(np.transpose(img, axes=[2, 0, 1])).unsqueeze(0).to(self.device)
        print(f'tensor size: {tensor.size()}')

        try:
            grayscale_cam = self.method(tensor, [self.target])
        except AttributeError as e:
            print(f"Warning... self.method(tensor, [self.target]) failure. ({e})")
            return

        grayscale_cam = grayscale_cam[0, :]
        cam_image = show_cam_on_image(img, grayscale_cam, use_rgb=True)

        pred = self.model_yolo.predict(tensor, conf=self.conf_threshold, iou=0.7)[0]
        if self.renormalize and self.task in ['detect', 'segment', 'pose']:
            cam_image = self.renormalize_cam_in_bounding_boxes(pred.boxes.xyxy.cpu().detach().numpy().astype(np.int32), img, grayscale_cam)
        if self.show_result:
            cam_image = pred.plot(img=cam_image,
                                  conf=True,  # show confidence scores
                                  font_size=None,  # font size; None scales with the image size
                                  line_width=None,  # line width; None scales with the image size
                                  labels=False,  # show class labels
                                  )

        # crop away the letterbox padding
        cam_image = cam_image[top:cam_image.shape[0] - bottom, left:cam_image.shape[1] - right]
        cam_image = Image.fromarray(cam_image)
        cam_image.save(save_path)

    def __call__(self, img_path, save_path):
        # remove the output dir if it exists
        if os.path.exists(save_path):
            shutil.rmtree(save_path)
        # make the output dir
        os.makedirs(save_path, exist_ok=True)

        if os.path.isdir(img_path):
            for img_path_ in os.listdir(img_path):
                self.process(f'{img_path}/{img_path_}', f'{save_path}/{img_path_}')
        else:
            self.process(img_path, f'{save_path}/result.png')

def get_params():
    params = {
        'weight': 'yolo11n.pt',  # only the weight file is needed now; no cfg required
        'device': 'cuda:0',
        'method': 'GradCAMPlusPlus',  # GradCAMPlusPlus, GradCAM, XGradCAM, EigenCAM, HiResCAM, LayerCAM, RandomCAM, EigenGradCAM, KPCA_CAM
        'layer': [10, 12, 14, 16, 18],
        'backward_type': 'all',  # detect:<class, box, all> segment:<class, box, segment, all> pose:<class, box, pose, all> obb:<class, box, obb, all> classify:<all>
        'conf_threshold': 0.2,  # 0.2
        'ratio': 0.02,  # 0.02-0.1
        'show_result': True,  # set to False if you don't want the detections drawn
        'renormalize': False,  # set to True to restrict the heatmap to the boxes (detect, segment and pose only)
        'task': 'detect',  # task (detect, segment, pose, obb, classify)
        'img_size': 640,  # image size
    }
    return params


# pip install grad-cam==1.5.4 --no-deps
if __name__ == '__main__':
    model = yolo_heatmap(**get_params())
    model(r'/home/hjj/Desktop/dataset/dataset_coco/coco/images/val2017/000000361238.jpg', 'result')
    # model(r'/home/hjj/Desktop/dataset/dataset_coco/coco/images/val2017', 'result')
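
# Override sketch (hypothetical values): tweak get_params() before constructing the
# heatmap object, e.g. to run a segmentation checkpoint on CPU:
#   params = get_params()
#   params.update({'weight': 'yolo11n-seg.pt', 'task': 'segment', 'device': 'cpu'})
#   yolo_heatmap(**params)('bus.jpg', 'result_seg')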