ultralytics/heatmap.py
2025-02-25 11:58:34 +08:00

364 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import warnings
warnings.filterwarnings('ignore')
warnings.simplefilter('ignore')
import torch, yaml, cv2, os, shutil, sys, copy
import numpy as np
np.random.seed(0)
import matplotlib.pyplot as plt
from tqdm import trange
from PIL import Image
from ultralytics import YOLO
from ultralytics.nn.tasks import attempt_load_weights
from ultralytics.utils.torch_utils import intersect_dicts
from ultralytics.utils.ops import xywh2xyxy, non_max_suppression
from pytorch_grad_cam import GradCAMPlusPlus, GradCAM, XGradCAM, EigenCAM, HiResCAM, LayerCAM, RandomCAM, EigenGradCAM, KPCA_CAM, AblationCAM
from pytorch_grad_cam.utils.image import show_cam_on_image, scale_cam_image
from pytorch_grad_cam.activations_and_gradients import ActivationsAndGradients
def letterbox(im, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
# Resize and pad image while meeting stride-multiple constraints
shape = im.shape[:2] # current shape [height, width]
if isinstance(new_shape, int):
new_shape = (new_shape, new_shape)
# Scale ratio (new / old)
r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
if not scaleup: # only scale down, do not scale up (for better val mAP)
r = min(r, 1.0)
# Compute padding
ratio = r, r # width, height ratios
new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] # wh padding
if auto: # minimum rectangle
dw, dh = np.mod(dw, stride), np.mod(dh, stride) # wh padding
elif scaleFill: # stretch
dw, dh = 0.0, 0.0
new_unpad = (new_shape[1], new_shape[0])
ratio = new_shape[1] / shape[1], new_shape[0] / shape[0] # width, height ratios
dw /= 2 # divide padding into 2 sides
dh /= 2
if shape[::-1] != new_unpad: # resize
im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) # add border
return im, ratio, (top, bottom, left, right)
class ActivationsAndGradients:
""" Class for extracting activations and
registering gradients from targetted intermediate layers """
def __init__(self, model, target_layers, reshape_transform):
self.model = model
self.gradients = []
self.activations = []
self.reshape_transform = reshape_transform
self.handles = []
for target_layer in target_layers:
self.handles.append(
target_layer.register_forward_hook(self.save_activation))
# Because of https://github.com/pytorch/pytorch/issues/61519,
# we don't use backward hook to record gradients.
self.handles.append(
target_layer.register_forward_hook(self.save_gradient))
def save_activation(self, module, input, output):
activation = output
if self.reshape_transform is not None:
activation = self.reshape_transform(activation)
self.activations.append(activation.cpu().detach())
def save_gradient(self, module, input, output):
if not hasattr(output, "requires_grad") or not output.requires_grad:
# You can only register hooks on tensor requires grad.
return
# Gradients are computed in reverse order
def _store_grad(grad):
if self.reshape_transform is not None:
grad = self.reshape_transform(grad)
self.gradients = [grad.cpu().detach()] + self.gradients
output.register_hook(_store_grad)
def post_process(self, result):
if self.model.end2end:
logits_ = result[:, :, 4:]
boxes_ = result[:, :, :4]
sorted, indices = torch.sort(logits_[:, :, 0], descending=True)
return logits_[0][indices[0]], boxes_[0][indices[0]]
elif self.model.task == 'detect':
logits_ = result[:, 4:]
boxes_ = result[:, :4]
sorted, indices = torch.sort(logits_.max(1)[0], descending=True)
return torch.transpose(logits_[0], dim0=0, dim1=1)[indices[0]], torch.transpose(boxes_[0], dim0=0, dim1=1)[indices[0]]
elif self.model.task == 'segment':
logits_ = result[0][:, 4:4 + self.model.nc]
boxes_ = result[0][:, :4]
mask_p, mask_nm = result[1][2].squeeze(), result[1][1].squeeze().transpose(1, 0)
c, h, w = mask_p.size()
mask = (mask_nm @ mask_p.view(c, -1))
sorted, indices = torch.sort(logits_.max(1)[0], descending=True)
return torch.transpose(logits_[0], dim0=0, dim1=1)[indices[0]], torch.transpose(boxes_[0], dim0=0, dim1=1)[indices[0]], mask[indices[0]]
elif self.model.task == 'pose':
logits_ = result[:, 4:4 + self.model.nc]
boxes_ = result[:, :4]
poses_ = result[:, 4 + self.model.nc:]
sorted, indices = torch.sort(logits_.max(1)[0], descending=True)
return torch.transpose(logits_[0], dim0=0, dim1=1)[indices[0]], torch.transpose(boxes_[0], dim0=0, dim1=1)[indices[0]], torch.transpose(poses_[0], dim0=0, dim1=1)[indices[0]]
elif self.model.task == 'obb':
logits_ = result[:, 4:4 + self.model.nc]
boxes_ = result[:, :4]
angles_ = result[:, 4 + self.model.nc:]
sorted, indices = torch.sort(logits_.max(1)[0], descending=True)
return torch.transpose(logits_[0], dim0=0, dim1=1)[indices[0]], torch.transpose(boxes_[0], dim0=0, dim1=1)[indices[0]], torch.transpose(angles_[0], dim0=0, dim1=1)[indices[0]]
elif self.model.task == 'classify':
return result[0]
def __call__(self, x):
self.gradients = []
self.activations = []
model_output = self.model(x)
if self.model.task == 'detect':
post_result, pre_post_boxes = self.post_process(model_output[0])
return [[post_result, pre_post_boxes]]
elif self.model.task == 'segment':
post_result, pre_post_boxes, pre_post_mask = self.post_process(model_output)
return [[post_result, pre_post_boxes, pre_post_mask]]
elif self.model.task == 'pose':
post_result, pre_post_boxes, pre_post_pose = self.post_process(model_output[0])
return [[post_result, pre_post_boxes, pre_post_pose]]
elif self.model.task == 'obb':
post_result, pre_post_boxes, pre_post_angle = self.post_process(model_output[0])
return [[post_result, pre_post_boxes, pre_post_angle]]
elif self.model.task == 'classify':
data = self.post_process(model_output)
return [data]
def release(self):
for handle in self.handles:
handle.remove()
class yolo_detect_target(torch.nn.Module):
def __init__(self, ouput_type, conf, ratio, end2end) -> None:
super().__init__()
self.ouput_type = ouput_type
self.conf = conf
self.ratio = ratio
self.end2end = end2end
def forward(self, data):
post_result, pre_post_boxes = data
result = []
for i in trange(int(post_result.size(0) * self.ratio)):
if (self.end2end and float(post_result[i, 0]) < self.conf) or (not self.end2end and float(post_result[i].max()) < self.conf):
break
if self.ouput_type == 'class' or self.ouput_type == 'all':
if self.end2end:
result.append(post_result[i, 0])
else:
result.append(post_result[i].max())
elif self.ouput_type == 'box' or self.ouput_type == 'all':
for j in range(4):
result.append(pre_post_boxes[i, j])
return sum(result)
class yolo_segment_target(yolo_detect_target):
def __init__(self, ouput_type, conf, ratio, end2end):
super().__init__(ouput_type, conf, ratio, end2end)
def forward(self, data):
post_result, pre_post_boxes, pre_post_mask = data
result = []
for i in trange(int(post_result.size(0) * self.ratio)):
if float(post_result[i].max()) < self.conf:
break
if self.ouput_type == 'class' or self.ouput_type == 'all':
result.append(post_result[i].max())
elif self.ouput_type == 'box' or self.ouput_type == 'all':
for j in range(4):
result.append(pre_post_boxes[i, j])
elif self.ouput_type == 'segment' or self.ouput_type == 'all':
result.append(pre_post_mask[i].mean())
return sum(result)
class yolo_pose_target(yolo_detect_target):
def __init__(self, ouput_type, conf, ratio, end2end):
super().__init__(ouput_type, conf, ratio, end2end)
def forward(self, data):
post_result, pre_post_boxes, pre_post_pose = data
result = []
for i in trange(int(post_result.size(0) * self.ratio)):
if float(post_result[i].max()) < self.conf:
break
if self.ouput_type == 'class' or self.ouput_type == 'all':
result.append(post_result[i].max())
elif self.ouput_type == 'box' or self.ouput_type == 'all':
for j in range(4):
result.append(pre_post_boxes[i, j])
elif self.ouput_type == 'pose' or self.ouput_type == 'all':
result.append(pre_post_pose[i].mean())
return sum(result)
class yolo_obb_target(yolo_detect_target):
def __init__(self, ouput_type, conf, ratio, end2end):
super().__init__(ouput_type, conf, ratio, end2end)
def forward(self, data):
post_result, pre_post_boxes, pre_post_angle = data
result = []
for i in trange(int(post_result.size(0) * self.ratio)):
if float(post_result[i].max()) < self.conf:
break
if self.ouput_type == 'class' or self.ouput_type == 'all':
result.append(post_result[i].max())
elif self.ouput_type == 'box' or self.ouput_type == 'all':
for j in range(4):
result.append(pre_post_boxes[i, j])
elif self.ouput_type == 'obb' or self.ouput_type == 'all':
result.append(pre_post_angle[i])
return sum(result)
class yolo_classify_target(yolo_detect_target):
def __init__(self, ouput_type, conf, ratio, end2end):
super().__init__(ouput_type, conf, ratio, end2end)
def forward(self, data):
return data.max()
class yolo_heatmap:
def __init__(self, weight, device, method, layer, backward_type, conf_threshold, ratio, show_result, renormalize, task, img_size):
device = torch.device(device)
model_yolo = YOLO(weight)
model_names = model_yolo.names
print(f'model class info:{model_names}')
model = copy.deepcopy(model_yolo.model)
model.to(device)
model.info()
for p in model.parameters():
p.requires_grad_(True)
model.eval()
model.task = task
if not hasattr(model, 'end2end'):
model.end2end = False
if task == 'detect':
target = yolo_detect_target(backward_type, conf_threshold, ratio, model.end2end)
elif task == 'segment':
target = yolo_segment_target(backward_type, conf_threshold, ratio, model.end2end)
elif task == 'pose':
target = yolo_pose_target(backward_type, conf_threshold, ratio, model.end2end)
elif task == 'obb':
target = yolo_obb_target(backward_type, conf_threshold, ratio, model.end2end)
elif task == 'classify':
target = yolo_classify_target(backward_type, conf_threshold, ratio, model.end2end)
else:
raise Exception(f"not support task({task}).")
target_layers = [model.model[l] for l in layer]
method = eval(method)(model, target_layers)
method.activations_and_grads = ActivationsAndGradients(model, target_layers, None)
colors = np.random.uniform(0, 255, size=(len(model_names), 3)).astype(np.int32)
self.__dict__.update(locals())
def post_process(self, result):
result = non_max_suppression(result, conf_thres=self.conf_threshold, iou_thres=0.65)[0]
return result
def draw_detections(self, box, color, name, img):
xmin, ymin, xmax, ymax = list(map(int, list(box)))
cv2.rectangle(img, (xmin, ymin), (xmax, ymax), tuple(int(x) for x in color), 2) # 绘制检测框
cv2.putText(img, str(name), (xmin, ymin - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.8, tuple(int(x) for x in color), 2, lineType=cv2.LINE_AA) # 绘制类别、置信度
return img
def renormalize_cam_in_bounding_boxes(self, boxes, image_float_np, grayscale_cam):
"""Normalize the CAM to be in the range [0, 1]
inside every bounding boxes, and zero outside of the bounding boxes. """
renormalized_cam = np.zeros(grayscale_cam.shape, dtype=np.float32)
for x1, y1, x2, y2 in boxes:
x1, y1 = max(x1, 0), max(y1, 0)
x2, y2 = min(grayscale_cam.shape[1] - 1, x2), min(grayscale_cam.shape[0] - 1, y2)
renormalized_cam[y1:y2, x1:x2] = scale_cam_image(grayscale_cam[y1:y2, x1:x2].copy())
renormalized_cam = scale_cam_image(renormalized_cam)
eigencam_image_renormalized = show_cam_on_image(image_float_np, renormalized_cam, use_rgb=True)
return eigencam_image_renormalized
def process(self, img_path, save_path):
# img process
try:
img = cv2.imdecode(np.fromfile(img_path, np.uint8), cv2.IMREAD_COLOR)
except:
print(f"Warning... {img_path} read failure.")
return
img, _, (top, bottom, left, right) = letterbox(img, new_shape=(self.img_size, self.img_size), auto=True) # 如果需要完全固定成宽高一样就把auto设置为False
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = np.float32(img) / 255.0
tensor = torch.from_numpy(np.transpose(img, axes=[2, 0, 1])).unsqueeze(0).to(self.device)
print(f'tensor size:{tensor.size()}')
try:
grayscale_cam = self.method(tensor, [self.target])
except AttributeError as e:
print(f"Warning... self.method(tensor, [self.target]) failure.")
return
grayscale_cam = grayscale_cam[0, :]
cam_image = show_cam_on_image(img, grayscale_cam, use_rgb=True)
pred = self.model_yolo.predict(tensor, conf=self.conf_threshold, iou=0.7)[0]
if self.renormalize and self.task in ['detect', 'segment', 'pose']:
cam_image = self.renormalize_cam_in_bounding_boxes(pred.boxes.xyxy.cpu().detach().numpy().astype(np.int32), img, grayscale_cam)
if self.show_result:
cam_image = pred.plot(img=cam_image,
conf=True, # 显示置信度
font_size=None, # 字体大小None为根据当前image尺寸计算
line_width=None, # 线条宽度None为根据当前image尺寸计算
labels=False, # 显示标签
)
# 去掉padding边界
cam_image = cam_image[top:cam_image.shape[0] - bottom, left:cam_image.shape[1] - right]
cam_image = Image.fromarray(cam_image)
cam_image.save(save_path)
def __call__(self, img_path, save_path):
# remove dir if exist
if os.path.exists(save_path):
shutil.rmtree(save_path)
# make dir if not exist
os.makedirs(save_path, exist_ok=True)
if os.path.isdir(img_path):
for img_path_ in os.listdir(img_path):
self.process(f'{img_path}/{img_path_}', f'{save_path}/{img_path_}')
else:
self.process(img_path, f'{save_path}/result.png')
def get_params():
params = {
'weight': 'yolo11n.pt', # 现在只需要指定权重即可,不需要指定cfg
'device': 'cuda:0',
'method': 'GradCAMPlusPlus', # GradCAMPlusPlus, GradCAM, XGradCAM, EigenCAM, HiResCAM, LayerCAM, RandomCAM, EigenGradCAM, KPCA_CAM
'layer': [10, 12, 14, 16, 18],
'backward_type': 'all', # detect:<class, box, all> segment:<class, box, segment, all> pose:<box, keypoint, all> obb:<box, angle, all> classify:<all>
'conf_threshold': 0.2, # 0.2
'ratio': 0.02, # 0.02-0.1
'show_result': True, # 不需要绘制结果请设置为False
'renormalize': False, # 需要把热力图限制在框内请设置为True(仅对detect,segment,pose有效)
'task':'detect', # 任务(detect,segment,pose,obb,classify)
'img_size':640, # 图像尺寸
}
return params
# pip install grad-cam==1.5.4 --no-deps
if __name__ == '__main__':
model = yolo_heatmap(**get_params())
model(r'/home/hjj/Desktop/dataset/dataset_coco/coco/images/val2017/000000361238.jpg', 'result')
# model(r'/home/hjj/Desktop/dataset/dataset_coco/coco/images/val2017', 'result')