jde_matching.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. # Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """
  15. This code is based on https://github.com/Zhongdao/Towards-Realtime-MOT/blob/master/tracker/matching.py
  16. """
  17. import lap
  18. import scipy
  19. import numpy as np
  20. from scipy.spatial.distance import cdist
  21. from ..motion import kalman_filter
  22. import warnings
  # NOTE: with no category given, this "ignore" filter applies to every warning
  # category, and it is installed as a side effect of importing this module.
  warnings.filterwarnings("ignore")

  # Names exported by ``from <this module> import *``.
  __all__ = [
      'merge_matches', 'linear_assignment', 'bbox_ious',
      'iou_distance', 'embedding_distance', 'fuse_motion',
  ]
  def merge_matches(m1, m2, shape):
      """
      Chain two sets of index matches through their shared middle index.

      ``m1`` holds (o, p) pairs and ``m2`` holds (p, q) pairs; an (o, q) pair
      is produced whenever at least one p links o to q.

      Args:
          m1: sequence of (o, p) integer index pairs; may be empty.
          m2: sequence of (p, q) integer index pairs; may be empty.
          shape: tuple ``(O, P, Q)`` with the size of each index range.

      Returns:
          tuple: ``(match, unmatched_O, unmatched_Q)`` where ``match`` is a
          list of (o, q) pairs, and the other two are ascending tuples of the
          indices of ``range(O)`` / ``range(Q)`` that occur in no merged pair.
      """
      # Imported locally so the submodule is guaranteed to be loaded even when
      # a bare ``import scipy`` does not expose ``scipy.sparse``.
      import scipy.sparse

      O, P, Q = shape
      # reshape(-1, 2) keeps an empty input two-dimensional, so the column
      # indexing below never fails on ``[]``.
      m1 = np.asarray(m1, dtype=int).reshape(-1, 2)
      m2 = np.asarray(m2, dtype=int).reshape(-1, 2)

      # Nothing can be chained when either side has no pairs.
      if len(m1) == 0 or len(m2) == 0:
          return [], tuple(range(O)), tuple(range(Q))

      M1 = scipy.sparse.coo_matrix(
          (np.ones(len(m1)), (m1[:, 0], m1[:, 1])), shape=(O, P))
      M2 = scipy.sparse.coo_matrix(
          (np.ones(len(m2)), (m2[:, 0], m2[:, 1])), shape=(P, Q))

      # Sparse matrix product: entry (o, q) is non-zero iff some p connects
      # o (via m1) to q (via m2).
      rows, cols = (M1 @ M2).nonzero()
      match = list(zip(rows.tolist(), cols.tolist()))

      matched_O = {o for o, _ in match}
      matched_Q = {q for _, q in match}
      unmatched_O = tuple(o for o in range(O) if o not in matched_O)
      unmatched_Q = tuple(q for q in range(Q) if q not in matched_Q)
      return match, unmatched_O, unmatched_Q
  def linear_assignment(cost_matrix, thresh):
      """
      Solve the assignment problem on ``cost_matrix`` with a cost cut-off.

      Args:
          cost_matrix: 2-D numpy array of assignment costs (rows vs. columns).
          thresh: passed to ``lap.lapjv`` as ``cost_limit``.

      Returns:
          tuple: ``(matches, unmatched_a, unmatched_b)``. ``matches`` is an
          integer array of shape (M, 2) holding (row, column) pairs; it is
          (0, 2) when nothing was matched. ``unmatched_a`` / ``unmatched_b``
          are the row / column indices left without a partner (tuples when
          the cost matrix is empty, index arrays otherwise).
      """
      if cost_matrix.size == 0:
          return (np.empty((0, 2), dtype=int),
                  tuple(range(cost_matrix.shape[0])),
                  tuple(range(cost_matrix.shape[1])))

      # x[i] is treated as the column assigned to row i and y[j] as the row
      # assigned to column j; negative values mark "no assignment".
      _, x, y = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
      x = np.asarray(x)
      y = np.asarray(y)

      matched_rows = np.where(x >= 0)[0]
      # column_stack always yields an (M, 2) array, including M == 0, so the
      # result has the same rank as in the empty-cost branch above.
      matches = np.column_stack((matched_rows, x[matched_rows]))
      unmatched_a = np.where(x < 0)[0]
      unmatched_b = np.where(y < 0)[0]
      return matches, unmatched_a, unmatched_b
  def bbox_ious(atlbrs, btlbrs):
      """
      Compute the pairwise IoU between two collections of boxes.

      Each box is read as four numbers where columns 0/1 are the minimum
      corner and columns 2/3 the maximum corner; every extent is measured as
      ``max - min + 1``.

      Args:
          atlbrs: array-like of N boxes, 4 values each.
          btlbrs: array-like of K boxes, 4 values each.

      Returns:
          np.ndarray: float64 array of shape (N, K); entry (n, k) is the IoU
          of box n from ``atlbrs`` with box k from ``btlbrs`` and is 0 where
          the boxes do not overlap.
      """
      # np.float64 replaces the ``np.float`` alias, which no longer exists in
      # current NumPy releases.
      boxes = np.ascontiguousarray(atlbrs, dtype=np.float64)
      query_boxes = np.ascontiguousarray(btlbrs, dtype=np.float64)
      N = boxes.shape[0]
      K = query_boxes.shape[0]
      ious = np.zeros((N, K), dtype=boxes.dtype)
      if N * K == 0:
          return ious

      box_areas = ((boxes[:, 2] - boxes[:, 0] + 1) *
                   (boxes[:, 3] - boxes[:, 1] + 1))
      query_areas = ((query_boxes[:, 2] - query_boxes[:, 0] + 1) *
                     (query_boxes[:, 3] - query_boxes[:, 1] + 1))

      # Broadcast (N, 1) against (1, K) to get every pairwise overlap extent.
      iw = (np.minimum(boxes[:, None, 2], query_boxes[None, :, 2]) -
            np.maximum(boxes[:, None, 0], query_boxes[None, :, 0]) + 1)
      ih = (np.minimum(boxes[:, None, 3], query_boxes[None, :, 3]) -
            np.maximum(boxes[:, None, 1], query_boxes[None, :, 1]) + 1)

      overlapping = (iw > 0) & (ih > 0)
      intersection = iw * ih
      union = box_areas[:, None] + query_areas[None, :] - intersection

      # Only overlapping pairs are written; the rest keep their initial 0.
      np.divide(intersection, union, out=ious, where=overlapping)
      return ious
  def iou_distance(atracks, btracks):
      """
      Build an IoU-based cost matrix for two track lists.

      When the first element of either argument is a numpy array, both
      arguments are used directly as box lists; otherwise the ``tlbr``
      attribute of every element is collected. The cost is ``1 - IoU``.
      """

      def _starts_with_array(items):
          # True when the sequence is non-empty and leads with an ndarray.
          return len(items) > 0 and isinstance(items[0], np.ndarray)

      if _starts_with_array(atracks) or _starts_with_array(btracks):
          boxes_a, boxes_b = atracks, btracks
      else:
          boxes_a = [trk.tlbr for trk in atracks]
          boxes_b = [trk.tlbr for trk in btracks]

      return 1 - bbox_ious(boxes_a, boxes_b)
  def embedding_distance(tracks, detections, metric='euclidean'):
      """
      Compute a feature-distance cost matrix between tracks and detections.

      Args:
          tracks: sequence of objects exposing ``smooth_feat``.
          detections: sequence of objects exposing ``curr_feat``.
          metric: distance metric name forwarded to
              ``scipy.spatial.distance.cdist``.

      Returns:
          np.ndarray: float64 array of shape (len(tracks), len(detections));
          all zeros (and possibly empty) when either input is empty.
      """
      # np.float64 replaces the ``np.float`` alias, which no longer exists in
      # current NumPy releases.
      cost_matrix = np.zeros(
          (len(tracks), len(detections)), dtype=np.float64)
      if cost_matrix.size == 0:
          return cost_matrix

      det_features = np.asarray(
          [det.curr_feat for det in detections], dtype=np.float64)
      track_features = np.asarray(
          [trk.smooth_feat for trk in tracks], dtype=np.float64)

      # Clamp at zero so no negative distance reaches the caller.
      # NOTE(review): features are presumably normalized upstream - confirm.
      return np.maximum(0.0, cdist(track_features, det_features, metric))
  def fuse_motion(kf, cost_matrix, tracks, detections,
                  only_position=False, lambda_=0.98):
      """
      Blend a motion gating distance into an existing cost matrix.

      For every track, ``kf.gating_distance`` is evaluated against all
      detections (metric ``'maha'``). Entries whose distance exceeds the
      ``kalman_filter.chi2inv95`` threshold are set to infinity, then the row
      becomes ``lambda_ * cost + (1 - lambda_) * distance``.

      ``cost_matrix`` is modified in place and also returned. An empty matrix
      is returned untouched.
      """
      if cost_matrix.size == 0:
          return cost_matrix

      # 2 degrees of freedom when gating on position only, 4 otherwise.
      dof = 2 if only_position else 4
      max_distance = kalman_filter.chi2inv95[dof]
      det_states = np.asarray([d.to_xyah() for d in detections])
      motion_weight = 1 - lambda_

      for idx, trk in enumerate(tracks):
          distance = kf.gating_distance(
              trk.mean, trk.covariance, det_states, only_position,
              metric='maha')
          # Gate first so rejected pairs stay infinite after the blend.
          cost_matrix[idx, distance > max_distance] = np.inf
          cost_matrix[idx] = (
              lambda_ * cost_matrix[idx] + motion_weight * distance)

      return cost_matrix