r"""HR (High-Resolution) evaluation.

We found using numpy is very slow for high resolution, so the metric
computation was moved to PyTorch using CUDA.

Note, the script only does evaluation. You will need to run inference yourself
first and save the results to disk.

Expected directory format for both prediction and ground-truth is
``<root>/<dataset>/<clip>/<pha|fgr>/<frame>`` (this is the layout the code
below actually reads):

    videomatte_1920x1080
        ├── videomatte_motion
          ├── 0000
            ├── pha
              ├── 0000.png
            ├── fgr
              ├── 0000.png
        ├── videomatte_static
          ├── 0000
            ├── pha
              ├── 0000.png
            ├── fgr
              ├── 0000.png

Prediction must have the exact file structure and file name as the
ground-truth, meaning that if the ground-truth is png/jpg, prediction should
be png/jpg.

Example usage:

    python evaluate.py \
        --pred-dir pred/videomatte_1920x1080 \
        --true-dir true/videomatte_1920x1080

An excel sheet with evaluation results will be written to
"pred/videomatte_1920x1080/videomatte_1920x1080.xlsx"
"""

import argparse
import os
from concurrent.futures import ThreadPoolExecutor

import cv2
import kornia
import numpy as np
import torch
import xlsxwriter
from tqdm import tqdm

# Metrics this script is able to compute. A connectivity metric ('pha_conn')
# is deliberately not listed: no such metric is implemented in this file.
SUPPORTED_METRICS = ('pha_mad', 'pha_mse', 'pha_grad', 'pha_dtssd', 'fgr_mse')


class Evaluator:
    """Command-line driver: parse args, evaluate every clip, write the xlsx.

    Constructing the object runs the whole pipeline (argument parsing, metric
    setup, evaluation and Excel export), so ``Evaluator()`` is the entry point.
    """

    def __init__(self):
        self.parse_args()
        self.init_metrics()
        self.evaluate()
        self.write_excel()

    def parse_args(self):
        """Parse command-line arguments into ``self.args``."""
        parser = argparse.ArgumentParser()
        parser.add_argument('--pred-dir', type=str, required=True)
        parser.add_argument('--true-dir', type=str, required=True)
        parser.add_argument('--num-workers', type=int, default=48)
        # `choices` rejects unknown metric names up front instead of letting
        # them fail (or be silently ignored) inside a worker thread.
        parser.add_argument('--metrics', type=str, nargs='+',
                            choices=SUPPORTED_METRICS,
                            default=list(SUPPORTED_METRICS))
        self.args = parser.parse_args()

    def init_metrics(self):
        """Instantiate the metric callables shared by all workers."""
        self.mad = MetricMAD()
        self.mse = MetricMSE()
        self.grad = MetricGRAD()
        self.dtssd = MetricDTSSD()

    def evaluate(self):
        """Evaluate every ``<dataset>/<clip>`` under ``--pred-dir`` in a thread pool.

        Populates ``self.results`` with ``(dataset, clip, metrics)`` tuples in
        sorted dataset/clip order, where ``metrics`` is the dict returned by
        :meth:`evaluate_worker`.
        """
        tasks = []

        with ThreadPoolExecutor(max_workers=self.args.num_workers) as executor:
            for dataset in sorted(os.listdir(self.args.pred_dir)):
                dataset_dir = os.path.join(self.args.pred_dir, dataset)
                # Skips plain files such as a previously written .xlsx report.
                if not os.path.isdir(dataset_dir):
                    continue
                for clip in sorted(os.listdir(dataset_dir)):
                    # Stray files next to the clip folders are not clips.
                    if not os.path.isdir(os.path.join(dataset_dir, clip)):
                        continue
                    # len(tasks) doubles as the tqdm progress-bar row index.
                    future = executor.submit(
                        self.evaluate_worker, dataset, clip, len(tasks))
                    tasks.append((dataset, clip, future))

            self.results = [(dataset, clip, future.result())
                            for dataset, clip, future in tasks]

    def write_excel(self):
        """Write ``self.results`` to ``<pred-dir>/<basename of pred-dir>.xlsx``.

        The workbook has a ``summary`` sheet plus one sheet per metric. Each
        metric sheet holds one row per clip (dataset, clip, per-frame values),
        a per-frame average row, and the overall average in cell B2, which
        the summary sheet references.

        Raises:
            RuntimeError: if no clip was evaluated.
        """
        if not self.results:
            raise RuntimeError(
                f'No clips found under "{self.args.pred_dir}"; nothing to write.')

        # normpath drops a trailing slash, which would otherwise make the
        # basename empty and the workbook be named just ".xlsx".
        report_name = os.path.basename(os.path.normpath(self.args.pred_dir))
        report_path = os.path.join(self.args.pred_dir, f'{report_name}.xlsx')
        metric_names = list(self.results[0][2].keys())

        with xlsxwriter.Workbook(report_path) as workbook:
            summarysheet = workbook.add_worksheet('summary')
            for i, metric_name in enumerate(metric_names):
                summarysheet.write(i, 0, metric_name)
                summarysheet.write(i, 1, f'={metric_name}!B2')

            for metric_name in metric_names:
                metricsheet = workbook.add_worksheet(metric_name)

                # Size the header from the longest clip so that every frame
                # column gets an index and an average, not only those of the
                # first clip.
                num_frames = max(len(metrics[metric_name])
                                 for _, _, metrics in self.results)

                # Write the header
                metricsheet.write(1, 0, 'Average')
                metricsheet.write(1, 1, '=AVERAGE(C2:ZZ2)')
                for col in range(num_frames):
                    metricsheet.write(0, col + 2, col)
                    colname = xlsxwriter.utility.xl_col_to_name(col + 2)
                    metricsheet.write(
                        1, col + 2, f'=AVERAGE({colname}3:{colname}9999)')

                for row, (dataset, clip, metrics) in enumerate(self.results):
                    metricsheet.write(row + 2, 0, dataset)
                    metricsheet.write(row + 2, 1, clip)
                    # Metric values are 0-dim tensors (or the int 0 for the
                    # first dtSSD frame); convert explicitly to plain floats.
                    metricsheet.write_row(
                        row + 2, 2, [float(value) for value in metrics[metric_name]])

    @staticmethod
    def _load_frame(path, flags):
        """Read an image with OpenCV and return it as a CUDA float tensor in [0, 1].

        Raises:
            FileNotFoundError: if OpenCV cannot read ``path`` (``cv2.imread``
                signals failure by returning ``None`` rather than raising).
        """
        image = cv2.imread(path, flags)
        if image is None:
            raise FileNotFoundError(f'Could not read image: {path}')
        return torch.from_numpy(image).cuda(non_blocking=True).float().div_(255)

    def evaluate_worker(self, dataset, clip, position):
        """Compute the requested metrics for every frame of one clip.

        Args:
            dataset: dataset directory name under the pred/true roots.
            clip: clip directory name under ``dataset``.
            position: row index for this clip's tqdm progress bar.

        Returns:
            Dict mapping each requested metric name to a list with one value
            per frame, in sorted frame-name order.
        """
        pred_clip_dir = os.path.join(self.args.pred_dir, dataset, clip)
        true_clip_dir = os.path.join(self.args.true_dir, dataset, clip)

        framenames = sorted(os.listdir(os.path.join(pred_clip_dir, 'pha')))
        metrics = {metric_name: [] for metric_name in self.args.metrics}

        pred_pha_tm1 = None
        true_pha_tm1 = None

        for i, framename in enumerate(tqdm(framenames, desc=f'{dataset} {clip}', position=position, dynamic_ncols=True)):
            true_pha = self._load_frame(
                os.path.join(true_clip_dir, 'pha', framename), cv2.IMREAD_GRAYSCALE)
            pred_pha = self._load_frame(
                os.path.join(pred_clip_dir, 'pha', framename), cv2.IMREAD_GRAYSCALE)

            if 'pha_mad' in metrics:
                metrics['pha_mad'].append(self.mad(pred_pha, true_pha))
            if 'pha_mse' in metrics:
                metrics['pha_mse'].append(self.mse(pred_pha, true_pha))
            if 'pha_grad' in metrics:
                metrics['pha_grad'].append(self.grad(pred_pha, true_pha))
            if 'pha_dtssd' in metrics:
                if i == 0:
                    # No previous frame exists to form a temporal difference.
                    metrics['pha_dtssd'].append(0)
                else:
                    metrics['pha_dtssd'].append(
                        self.dtssd(pred_pha, pred_pha_tm1, true_pha, true_pha_tm1))

            pred_pha_tm1 = pred_pha
            true_pha_tm1 = true_pha

            if 'fgr_mse' in metrics:
                # Loaded onto the GPU like the alpha frames: the mask below
                # lives on CUDA and cannot index CPU tensors.
                true_fgr = self._load_frame(
                    os.path.join(true_clip_dir, 'fgr', framename), cv2.IMREAD_COLOR)
                pred_fgr = self._load_frame(
                    os.path.join(pred_clip_dir, 'fgr', framename), cv2.IMREAD_COLOR)

                # Foreground error is only measured where the true alpha is
                # non-zero.
                true_msk = true_pha > 0
                metrics['fgr_mse'].append(
                    self.mse(pred_fgr[true_msk], true_fgr[true_msk]))

        return metrics


class MetricMAD:
    """Mean absolute difference, scaled by 1e3."""

    def __call__(self, pred, true):
        return (pred - true).abs_().mean() * 1e3


class MetricMSE:
    """Mean squared error, scaled by 1e3."""

    def __call__(self, pred, true):
        return ((pred - true) ** 2).mean() * 1e3


class MetricGRAD:
    """Gradient error between two 2-D maps.

    Both inputs are filtered with first-order Gaussian derivative kernels;
    the result is the sum of squared differences between the two gradient
    magnitudes, divided by 1000.
    """

    def __init__(self, sigma=1.4):
        self.filter_x, self.filter_y = self.gauss_filter(sigma)
        self.filter_x = torch.from_numpy(self.filter_x).unsqueeze(0).cuda()
        self.filter_y = torch.from_numpy(self.filter_y).unsqueeze(0).cuda()
        # kornia renamed `filter2D` to `filter2d` and later removed the old
        # spelling; prefer the new name and fall back for old installations.
        self._filter2d = (getattr(kornia.filters, 'filter2d', None)
                          or kornia.filters.filter2D)

    def __call__(self, pred, true):
        true_grad = self.gauss_gradient(true)
        pred_grad = self.gauss_gradient(pred)
        return ((true_grad - pred_grad) ** 2).sum() / 1000

    def gauss_gradient(self, img):
        """Return the gradient magnitude of a 2-D (H, W) tensor."""
        batched = img[None, None, :, :]
        img_filtered_x = self._filter2d(batched, self.filter_x, border_type='replicate')[0, 0]
        img_filtered_y = self._filter2d(batched, self.filter_y, border_type='replicate')[0, 0]
        return (img_filtered_x**2 + img_filtered_y**2).sqrt()

    @staticmethod
    def gauss_filter(sigma, epsilon=1e-2):
        """Build the x/y Gaussian-derivative kernels as square numpy arrays.

        Returns:
            ``(filter_x, filter_y)`` where ``filter_x`` is normalised to unit
            L2 norm and ``filter_y`` is its transpose.
        """
        half_size = np.ceil(sigma * np.sqrt(-2 * np.log(np.sqrt(2 * np.pi) * sigma * epsilon)))
        # Built-in int: the `np.int` alias was removed in NumPy 1.24.
        size = int(2 * half_size + 1)

        # create filter in x axis:
        # filter_x[i, j] = gaussian(i - half_size) * dgaussian(j - half_size)
        offsets = np.arange(size) - half_size
        filter_x = np.outer(MetricGRAD.gaussian(offsets, sigma),
                            MetricGRAD.dgaussian(offsets, sigma))

        # normalize filter
        norm = np.sqrt((filter_x**2).sum())
        filter_x = filter_x / norm
        filter_y = np.transpose(filter_x)

        return filter_x, filter_y

    @staticmethod
    def gaussian(x, sigma):
        """Gaussian probability density with standard deviation ``sigma``."""
        return np.exp(-x**2 / (2 * sigma**2)) / (sigma * np.sqrt(2 * np.pi))

    @staticmethod
    def dgaussian(x, sigma):
        """First derivative of :meth:`gaussian` with respect to ``x``."""
        return -x * MetricGRAD.gaussian(x, sigma) / sigma**2


class MetricDTSSD:
    """Temporal coherence error (dtSSD).

    Root of the mean squared difference between the predicted and the true
    frame-to-frame change, scaled by 1e2.
    """

    def __call__(self, pred_t, pred_tm1, true_t, true_tm1):
        dtSSD = ((pred_t - pred_tm1) - (true_t - true_tm1)) ** 2
        dtSSD = dtSSD.sum() / true_t.numel()
        dtSSD = dtSSD.sqrt()
        return dtSSD * 1e2


if __name__ == '__main__':
    Evaluator()