import itertools
import os
from collections import deque

import cv2
import numpy as np
# from numba import jit
import torch
import torchvision

from yolox.data.dataloading import get_yolox_datadir
from yolox.motdt_tracker import matching

from .basetrack import BaseTrack, TrackState
from .kalman_filter import KalmanFilter
from .reid_model import load_reid_model, extract_reid_features


class STrack(BaseTrack):

    def __init__(self, tlwh, score, max_n_features=100, from_det=True):
        # waiting to be activated
        self._tlwh = np.asarray(tlwh, dtype=np.float64)
        self.kalman_filter = None
        self.mean, self.covariance = None, None
        self.is_activated = False

        self.score = score
        self.max_n_features = max_n_features
        self.curr_feature = None
        self.last_feature = None
        self.features = deque([], maxlen=self.max_n_features)

        # classification
        self.from_det = from_det
        self.tracklet_len = 0
        self.time_by_tracking = 0

        # self-tracking
        self.tracker = None
    def set_feature(self, feature):
        if feature is None:
            return False
        self.features.append(feature)
        self.curr_feature = feature
        self.last_feature = feature
        return True
    def predict(self):
        if self.time_since_update > 0:
            self.tracklet_len = 0
        self.time_since_update += 1

        mean_state = self.mean.copy()
        if self.state != TrackState.Tracked:
            # damp the height velocity for tracks that are not actively tracked
            mean_state[7] = 0
        self.mean, self.covariance = self.kalman_filter.predict(mean_state, self.covariance)

        if self.tracker:
            self.tracker.update_roi(self.tlwh)
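    # The Kalman state is the 8-vector (cx, cy, aspect_ratio, height,
    # dx, dy, da, dh); index 7 in predict() above is the height-velocity term.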
    def self_tracking(self, image):
        tlwh = self.tracker.predict(image) if self.tracker else self.tlwh
        return tlwh
    def activate(self, kalman_filter, frame_id, image):
        """Start a new tracklet"""
        self.kalman_filter = kalman_filter  # type: KalmanFilter
        self.track_id = self.next_id()
        # state: cx, cy, aspect_ratio, height, dx, dy, da, dh
        self.mean, self.covariance = self.kalman_filter.initiate(self.tlwh_to_xyah(self._tlwh))
        del self._tlwh

        self.time_since_update = 0
        self.time_by_tracking = 0
        self.tracklet_len = 0
        self.state = TrackState.Tracked
        self.frame_id = frame_id
        self.start_frame = frame_id
    def re_activate(self, new_track, frame_id, image, new_id=False):
        self.mean, self.covariance = self.kalman_filter.update(
            self.mean, self.covariance, self.tlwh_to_xyah(new_track.tlwh)
        )

        self.time_since_update = 0
        self.time_by_tracking = 0
        self.tracklet_len = 0
        self.state = TrackState.Tracked
        self.is_activated = True
        self.frame_id = frame_id
        if new_id:
            self.track_id = self.next_id()

        self.set_feature(new_track.curr_feature)
    def update(self, new_track, frame_id, image, update_feature=True):
        """
        Update a matched track
        :type new_track: STrack
        :type frame_id: int
        :type image: np.ndarray
        :type update_feature: bool
        :return:
        """
        self.frame_id = frame_id
        self.time_since_update = 0
        if new_track.from_det:
            self.time_by_tracking = 0
        else:
            self.time_by_tracking += 1
        self.tracklet_len += 1

        new_tlwh = new_track.tlwh
        self.mean, self.covariance = self.kalman_filter.update(
            self.mean, self.covariance, self.tlwh_to_xyah(new_tlwh))
        self.state = TrackState.Tracked
        self.is_activated = True
        self.score = new_track.score

        if update_feature:
            self.set_feature(new_track.curr_feature)
            if self.tracker:
                self.tracker.update(image, self.tlwh)
    @property
    # @jit
    def tlwh(self):
        """Get current position in bounding box format `(top left x, top left y,
        width, height)`.
        """
        if self.mean is None:
            return self._tlwh.copy()
        ret = self.mean[:4].copy()
        ret[2] *= ret[3]
        ret[:2] -= ret[2:] / 2
        return ret
    @property
    # @jit
    def tlbr(self):
        """Convert bounding box to format `(min x, min y, max x, max y)`, i.e.,
        `(top left, bottom right)`.
        """
        ret = self.tlwh.copy()
        ret[2:] += ret[:2]
        return ret
    @staticmethod
    # @jit
    def tlwh_to_xyah(tlwh):
        """Convert bounding box to format `(center x, center y, aspect ratio,
        height)`, where the aspect ratio is `width / height`.
        """
        ret = np.asarray(tlwh).copy()
        ret[:2] += ret[2:] / 2
        ret[2] /= ret[3]
        return ret
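    # Example (illustrative values): tlwh = (10, 20, 40, 80) maps to
    # xyah = (10 + 40/2, 20 + 80/2, 40/80, 80) = (30, 60, 0.5, 80).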
    def to_xyah(self):
        return self.tlwh_to_xyah(self.tlwh)
    def tracklet_score(self):
        # Decay the score with the number of frames the target has survived on
        # self-tracking alone, and zero it unless the tracklet has been backed
        # by detections for more than two frames.
        score = max(0, 1 - np.log(1 + 0.05 * self.time_by_tracking)) * (self.tracklet_len - self.time_by_tracking > 2)
        return score
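    # For instance, time_by_tracking = 10 gives a decay factor of
    # 1 - ln(1.5) ~= 0.59; at time_by_tracking = 0 the factor is 1.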
    def __repr__(self):
        return 'OT_{}_({}-{})'.format(self.track_id, self.start_frame, self.end_frame)


class OnlineTracker(object):

    def __init__(self, model_folder, min_cls_score=0.4, min_ap_dist=0.8, max_time_lost=30,
                 use_tracking=True, use_refind=True):
        self.min_cls_score = min_cls_score  # confidence threshold for detections
        self.min_ap_dist = min_ap_dist      # appearance-distance gate for re-ID matching
        self.max_time_lost = max_time_lost  # frames before a lost track is removed

        self.kalman_filter = KalmanFilter()
        self.tracked_stracks = []   # type: list[STrack]
        self.lost_stracks = []      # type: list[STrack]
        self.removed_stracks = []   # type: list[STrack]

        self.use_refind = use_refind
        self.use_tracking = use_tracking
        self.classifier = None
        self.reid_model = load_reid_model(model_folder)

        self.frame_id = 0
    def update(self, output_results, img_info, img_size, img_file_name):
        img_file_name = os.path.join(get_yolox_datadir(), 'mot', 'train', img_file_name)
        image = cv2.imread(img_file_name)

        # post-process detections: combine objectness and class confidence,
        # then rescale boxes from the network input size back to the image
        output_results = output_results.cpu().numpy()
        confidences = output_results[:, 4] * output_results[:, 5]

        bboxes = output_results[:, :4]  # x1y1x2y2
        img_h, img_w = img_info[0], img_info[1]
        scale = min(img_size[0] / float(img_h), img_size[1] / float(img_w))
        bboxes /= scale
        tlwhs = self._xyxy_to_tlwh_array(bboxes)
        remain_inds = confidences > self.min_cls_score
        tlwhs = tlwhs[remain_inds]
        det_scores = confidences[remain_inds]

        self.frame_id += 1

        activated_stracks = []
        refind_stracks = []
        lost_stracks = []
        removed_stracks = []

        """step 1: prediction"""
        for strack in itertools.chain(self.tracked_stracks, self.lost_stracks):
            strack.predict()

        """step 2: scoring and selection"""
        if det_scores is None:
            det_scores = np.ones(len(tlwhs), dtype=float)
        detections = [STrack(tlwh, score, from_det=True) for tlwh, score in zip(tlwhs, det_scores)]
        if self.use_tracking:
            tracks = [STrack(t.self_tracking(image), 0.6 * t.tracklet_score(), from_det=False)
                      for t in itertools.chain(self.tracked_stracks, self.lost_stracks) if t.is_activated]
            detections.extend(tracks)
        rois = np.asarray([d.tlbr for d in detections], dtype=np.float32)
        scores = np.asarray([d.score for d in detections], dtype=np.float32)
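        # Class-agnostic NMS over detections and self-tracked boxes together:
        # passing a single (all-zero) class-id tensor to batched_nms makes it
        # suppress across the whole set rather than per class.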
        if len(detections) > 0:
            boxes_t = torch.from_numpy(rois)
            scores_t = torch.from_numpy(scores.reshape(-1)).to(boxes_t.dtype)
            nms_out_index = torchvision.ops.batched_nms(
                boxes_t,
                scores_t,
                torch.zeros_like(scores_t),
                0.7,
            )
            keep = nms_out_index.numpy()
            mask = np.zeros(len(rois), dtype=bool)
            mask[keep] = True
            keep = np.where(mask & (scores >= self.min_cls_score))[0]
            detections = [detections[i] for i in keep]
            scores = scores[keep]
            for d, score in zip(detections, scores):
                d.score = score
        pred_dets = [d for d in detections if not d.from_det]
        detections = [d for d in detections if d.from_det]

        # set features
        tlbrs = [det.tlbr for det in detections]
        features = extract_reid_features(self.reid_model, image, tlbrs)
        features = features.cpu().numpy()
        for i, det in enumerate(detections):
            det.set_feature(features[i])
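        # Association cascade: (1) re-ID distance with Kalman gating for
        # confirmed tracks, (2) the same matching against lost tracks, then
        # (3) IoU matching for the leftovers, with unconfirmed tracks handled
        # last at a looser IoU threshold.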
- """step 3: association for tracked"""
- # matching for tracked targets
- unconfirmed = []
- tracked_stracks = [] # type: list[STrack]
- for track in self.tracked_stracks:
- if not track.is_activated:
- unconfirmed.append(track)
- else:
- tracked_stracks.append(track)
- dists = matching.nearest_reid_distance(tracked_stracks, detections, metric='euclidean')
- dists = matching.gate_cost_matrix(self.kalman_filter, dists, tracked_stracks, detections)
- matches, u_track, u_detection = matching.linear_assignment(dists, thresh=self.min_ap_dist)
- for itracked, idet in matches:
- tracked_stracks[itracked].update(detections[idet], self.frame_id, image)
- # matching for missing targets
- detections = [detections[i] for i in u_detection]
- dists = matching.nearest_reid_distance(self.lost_stracks, detections, metric='euclidean')
- dists = matching.gate_cost_matrix(self.kalman_filter, dists, self.lost_stracks, detections)
- matches, u_lost, u_detection = matching.linear_assignment(dists, thresh=self.min_ap_dist)
- for ilost, idet in matches:
- track = self.lost_stracks[ilost] # type: STrack
- det = detections[idet]
- track.re_activate(det, self.frame_id, image, new_id=not self.use_refind)
- refind_stracks.append(track)
        # remaining tracked targets: fall back to IoU matching, with the
        # self-tracked predictions appended as extra candidates
        len_det = len(u_detection)
        detections = [detections[i] for i in u_detection] + pred_dets
        r_tracked_stracks = [tracked_stracks[i] for i in u_track]
        dists = matching.iou_distance(r_tracked_stracks, detections)
        matches, u_track, u_detection = matching.linear_assignment(dists, thresh=0.5)
        for itracked, idet in matches:
            r_tracked_stracks[itracked].update(detections[idet], self.frame_id, image, update_feature=True)
        for it in u_track:
            track = r_tracked_stracks[it]
            track.mark_lost()
            lost_stracks.append(track)

        # unconfirmed tracks: only match against real detections (i < len_det)
        detections = [detections[i] for i in u_detection if i < len_det]
        dists = matching.iou_distance(unconfirmed, detections)
        matches, u_unconfirmed, u_detection = matching.linear_assignment(dists, thresh=0.7)
        for itracked, idet in matches:
            unconfirmed[itracked].update(detections[idet], self.frame_id, image, update_feature=True)
        for it in u_unconfirmed:
            track = unconfirmed[it]
            track.mark_removed()
            removed_stracks.append(track)
- """step 4: init new stracks"""
- for inew in u_detection:
- track = detections[inew]
- if not track.from_det or track.score < 0.6:
- continue
- track.activate(self.kalman_filter, self.frame_id, image)
- activated_starcks.append(track)
- """step 6: update state"""
- for track in self.lost_stracks:
- if self.frame_id - track.end_frame > self.max_time_lost:
- track.mark_removed()
- removed_stracks.append(track)
- self.tracked_stracks = [t for t in self.tracked_stracks if t.state == TrackState.Tracked]
- self.lost_stracks = [t for t in self.lost_stracks if t.state == TrackState.Lost] # type: list[STrack]
- self.tracked_stracks.extend(activated_starcks)
- self.tracked_stracks.extend(refind_stracks)
- self.lost_stracks.extend(lost_stracks)
- self.removed_stracks.extend(removed_stracks)
- # output_stracks = self.tracked_stracks + self.lost_stracks
- # get scores of lost tracks
- output_tracked_stracks = [track for track in self.tracked_stracks if track.is_activated]
- output_stracks = output_tracked_stracks
- return output_stracks
    @staticmethod
    def _xyxy_to_tlwh_array(bbox_xyxy):
        if isinstance(bbox_xyxy, np.ndarray):
            bbox_tlwh = bbox_xyxy.copy()
        elif isinstance(bbox_xyxy, torch.Tensor):
            bbox_tlwh = bbox_xyxy.clone()
        else:
            raise TypeError('bbox_xyxy must be a np.ndarray or torch.Tensor')
        bbox_tlwh[:, 2] = bbox_xyxy[:, 2] - bbox_xyxy[:, 0]
        bbox_tlwh[:, 3] = bbox_xyxy[:, 3] - bbox_xyxy[:, 1]
        return bbox_tlwh
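

if __name__ == '__main__':
    # Minimal sanity-check sketch (not part of the tracker itself): it
    # exercises only the pure box-conversion helpers, since running
    # OnlineTracker end to end needs a re-ID model folder and MOT-style
    # image files on disk.
    boxes_xyxy = np.array([[10., 20., 50., 100.]])
    tlwh = OnlineTracker._xyxy_to_tlwh_array(boxes_xyxy)
    print('tlwh:', tlwh)                          # [[10. 20. 40. 80.]]
    print('xyah:', STrack.tlwh_to_xyah(tlwh[0]))  # [30. 60. 0.5 80.]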