123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
- """
- # File : test.py
- # Time :2024-06-11 10:15
- # Author :FEANGYANG
- # version :python 3.7
- # Contact :1071082183@qq.com
- # Description:
- """
- import json
- import itertools
- import random
- import cv2
- from PIL import Image, ImageDraw
- import os
- from superpoint_superglue_deployment import Matcher
- import numpy as np
- from loguru import logger
- import cv2 as cv
- # 定义大类
- class StructureClass:
- def __init__(self, ref_image_path, query_image_path, json_path, save_image_path, json_mask_path):
- self.weldmentclasses = []
- self.ref_image = []
- self.query_image = []
- self.boxes_xy_label = {}
- self.scale_factor = 0.37
- self.read_ref_image(ref_image_path)
- self.read_query_image(query_image_path)
- self.read_json(json_path)
- self.registration_demo(save_image_path, json_mask_path)
- def read_ref_image(self, path):
- self.ref_image = self.process_image_data(path)
- def read_query_image(self, path):
- self.query_image = self.process_image_data(path)
- def replace_query_image(self, wrap_image):
- self.query_image = wrap_image
- def read_json(self, json_path):
- with open(json_path, 'r') as f:
- data = json.load(f)
- for shape in data['shapes']:
- if 'points' in shape:
- shape['points'] = [[int(round(x)), int(round(y))] for x, y in shape['points']]
- x1, y1 = shape['points'][0]
- x2, y2 = shape['points'][1]
- label = shape['label']
- self.boxes_xy_label[label] = [x1, y1, x2, y2]
- def process_image_data(self, data):
- extensions = ['.PNG', '.png', '.jpg', '.jpeg', '.JPG', '.JPEG']
- if any(data.endswith(ext) for ext in extensions):
- if not os.path.exists(data):
- raise FileNotFoundError(f"Image file {data} not found")
- image = cv2.imread(data)
- return image
- else:
- if isinstance(data, np.ndarray):
- return data
- else:
- raise FileNotFoundError(f"Image file {data} not found")
- def registration_demo(self,save_image_path, json_mask_path):
- height, width = self.ref_image.shape[:2]
- ref_image_resize = cv2.resize(self.ref_image, dsize=None, fx=self.scale_factor, fy=self.scale_factor)
- query_image_resize = cv2.resize(self.query_image, dsize=None, fx=self.scale_factor, fy=self.scale_factor)
- ref_gray = cv2.cvtColor(ref_image_resize, cv2.COLOR_BGR2GRAY)
- query_gray = cv2.cvtColor(query_image_resize, cv2.COLOR_BGR2GRAY)
- if os.path.exists(json_path):
- with open(json_mask_path, 'r') as f:
- data = json.load(f)
- shapes = data['shapes']
- shape = shapes[0]
- if shape['shape_type'] == 'polygon':
- coords = [(int(round(x * self.scale_factor)), int(round(y * self.scale_factor))) for x, y in
- shape['points']]
- else:
- coords = []
- mask = np.zeros(ref_gray.shape, dtype=np.uint8) * 255
- pts = np.array(coords, np.int32)
- cv2.fillPoly(mask, [pts], 1)
- ref_gray = cv2.bitwise_and(ref_gray, ref_gray, mask=mask)
- superglue_matcher = Matcher(
- {
- "superpoint": {
- "input_shape": (-1, -1),
- "keypoint_threshold": 0.003,
- },
- "superglue": {
- "match_threshold": 0.5,
- },
- "use_gpu": True,
- }
- )
- query_kpts, ref_kpts, _, _, matches = superglue_matcher.match(query_gray, ref_gray)
- M, mask = cv2.findHomography(
- np.float64([query_kpts[m.queryIdx].pt for m in matches]).reshape(-1, 1, 2),
- np.float64([ref_kpts[m.trainIdx].pt for m in matches]).reshape(-1, 1, 2),
- method=cv2.USAC_MAGSAC,
- ransacReprojThreshold=5.0,
- maxIters=10000,
- confidence=0.95,
- )
- logger.info(f"number of inliers: {mask.sum()}")
- matches = np.array(matches)[np.all(mask > 0, axis=1)]
- matches = sorted(matches, key=lambda match: match.distance)
- matched_image = cv2.drawMatches(
- query_image_resize,
- query_kpts,
- ref_image_resize,
- ref_kpts,
- matches[:50],
- None,
- flags=2,
- )
- match_file_name = f"match.jpg"
- cv2.imwrite(os.path.join(save_image_path, match_file_name), matched_image)
- wrap_file_name = f"wrap.jpg"
- wrap_image = cv.warpPerspective(query_image_resize, M, (ref_image_resize.shape[1], ref_image_resize.shape[0]))
- wrap_image = cv2.resize(wrap_image, (self.ref_image.shape[1], self.ref_image.shape[0]))
- cv2.imwrite(os.path.join(save_image_path, wrap_file_name), wrap_image)
- sub_file_name = f"result.jpg"
- sub_image = cv2.subtract(self.ref_image, wrap_image)
- cv2.imwrite(os.path.join(save_image_path, sub_file_name), sub_image)
- return matched_image, wrap_image, sub_image
- # def process_image_with_mask(self, json_mask_path, ref_gray):
- # with open(json_mask_path, 'r') as f:
- # data = json.load(f)
- # shapes = data['shapes']
- # shape = shapes[0]
- # if shape['shape_type'] == 'polygon':
- # coords = [(int(round(x * self.scale_factor)), int(round(y * self.scale_factor))) for x, y in shape['points']]
- # else:
- # coords = []
- # mask = np.zeros(ref_gray.shape, dtype=np.uint8) * 255
- # pts = np.array(coords, np.int32)
- # cv2.fillPoly(mask, [pts], 1)
- # ref_gray_masked = cv2.bitwise_and(ref_gray, ref_gray, mask=mask)
- # cv2.imwrite('ref_gray_mask.jpg', ref_gray_masked)
- # return ref_gray_masked
- def add_weldmentclass(self, weldmentclass):
- self.weldmentclasses.append(weldmentclass)
- # 焊接件类
- class WeldmentClass(StructureClass):
- def __init__(self, name):
- self.ployclasses = []
- self.name = name
- def add_ploy(self, ployclass):
- self.ployclasses.append(ployclass)
- class SSIMClass:
- def __init__(self, label, x1, y1, x2, y2):
- self.name = 'SSIM'
- self.label = label
- self.x1 = x1
- self.x2 = x2
- self.y1 = y1
- self.y2 = y2
- self.result = self.SSIMfunc()
- def SSIMfunc(self):
- return self.label, self.x1, self.x2, self.y1, self.y2
- class Ploy1Class:
- def __init__(self, label, x1, y1, x2, y2):
- self.name = 'Ploy1'
- self.label = label
- self.x1 = x1
- self.x2 = x2
- self.y1 = y1
- self.y2 = y2
- self.result = self.ploy1func()
- def ploy1func(self):
- return self.label, self.x1, self.x2, self.y1, self.y2
- class Ploy2Class:
- def __init__(self, label, x1, y1, x2, y2):
- self.name = 'Ploy2'
- self.label = label
- self.x1 = x1
- self.x2 = x2
- self.y1 = y1
- self.y2 = y2
- self.result = self.ploy2func()
- def ploy2func(self):
- return self.label, self.x1, self.x2, self.y1, self.y2
- class Ploy3Class:
- def __init__(self, label, x1, y1, x2, y2):
- self.name = 'Ploy3'
- self.label = label
- self.x1 = x1
- self.x2 = x2
- self.y1 = y1
- self.y2 = y2
- self.result = self.ploy3func()
- def ploy3func(self):
- return self.label, self.x1, self.x2, self.y1, self.y2
- # 定义一个函数来获取每个元素的首字母
- def get_first_letter(item):
- return item[0]
- if __name__ == '__main__':
- ref_image_path = './data/yongsheng_image/ref_image/DSC_0452.JPG'
- query_image_path = './data/yongsheng_image/test_image_query/DSC_0445.JPG'
- json_path = './data/yongsheng_image/json/DSC_0452.json'
- save_image_path = './data/yongsheng_image/test_regis_result'
- json_mask_path = './data/yongsheng_image/json/DSC_0452_mask.json'
- for filename in os.listdir(image_dir):
- # struct = StructureClass('./data/yongsheng_image/ref_image/DSC_0452.JPG', './data/yongsheng_image/test_image_query/DSC_0445.JPG', './data/yongsheng_image/json/DSC_0452.json')
- struct = StructureClass(ref_image_path, query_image_path, json_path, save_image_path, json_mask_path)
- grouped_data = {}
- for key, group in itertools.groupby(sorted(struct.boxes_xy_label), get_first_letter):
- grouped_data[key] = list(group)
- # 创建子类实例并添加到大类中
- for key, group in grouped_data.items():
- subclass = WeldmentClass(key)
- for g in group:
- if len(g) == 1:
- xy = struct.boxes_xy_label.get(g)
- ssim = SSIMClass(g, xy[0], xy[1], xy[2], xy[3])
- subclass.add_ploy(ssim)
- else:
- xy = struct.boxes_xy_label.get(g)
- if str(g).endswith('1'):
- poly = Ploy1Class(g, xy[0], xy[1], xy[2], xy[3])
- elif str(g).endswith('2'):
- poly = SSIMClass(g, xy[0], xy[1], xy[2], xy[3])
- else:
- poly = Ploy3Class(g, xy[0], xy[1], xy[2], xy[3])
- subclass.add_ploy(poly)
- struct.add_weldmentclass(subclass)
- w = WeldmentClass('A')
- struct.add_weldmentclass(w)
- print()
- # with open('./DSC_0452.json', 'r') as f:
- # data = json.load(f)
- # save_value = {}
- # for shape in data['shapes']:
- # if 'points' in shape:
- # shape['points'] = [[int(round(x)), int(round(y))] for x, y in shape['points']]
- # x1, y1 = shape['points'][0]
- # x2, y2 = shape['points'][1]
- # label = shape['label']
- # save_value[label] = [x1, y1, x2, y2]
- #
- # # 使用groupby函数根据首字母分组
- # grouped_data = {}
- # for key, group in itertools.groupby(sorted(save_value), get_first_letter):
- # grouped_data[key] = list(group)
- #
- # # 打印分组后的结果
- # for key, group in grouped_data.items():
- # print(f"{key}: {group}")
- #
- # # 创建大类实例
- # big_class = StructureClass()
- # # 创建子类实例并添加到大类中
- # for key, group in grouped_data.items():
- # subclass = WeldmentClass(key)
- # for g in group:
- # if len(g) == 1:
- # xy = save_value.get(g)
- # ssim = SSIMClass(g, xy[0], xy[1], xy[2], xy[3])
- # subclass.add_ploy(ssim)
- # else:
- # xy = save_value.get(g)
- # r = random.randint(1, 4)
- # if r == 1:
- # poly = Ploy1Class(g, xy[0], xy[1], xy[2], xy[3])
- # elif r == 2:
- # poly = Ploy2Class(g, xy[0], xy[1], xy[2], xy[3])
- # else:
- # poly = Ploy3Class(g, xy[0], xy[1], xy[2], xy[3])
- #
- # subclass.add_ploy(poly)
- # big_class.add_weldmentclass(subclass)
- #
- # for subclass in big_class.weldmentclasses:
- # print(subclass)
|