Big update
utils/export_ppyoloe.py (normal file, 79 lines added)
@@ -0,0 +1,79 @@
import os
import sys
import onnx
import paddle
import paddle.nn as nn
from ppdet.core.workspace import load_config, merge_config
from ppdet.utils.check import check_version, check_config
from ppdet.utils.cli import ArgsParser
from ppdet.engine import Trainer
from ppdet.slim import build_slim_model


class DeepStreamOutput(nn.Layer):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        boxes = x['bbox']
        x['bbox_num'] = x['bbox_num'].transpose([0, 2, 1])
        scores = paddle.max(x['bbox_num'], 2, keepdim=True)
        classes = paddle.cast(paddle.argmax(x['bbox_num'], 2, keepdim=True), dtype='float32')
        return paddle.concat((boxes, scores, classes), axis=2)


def ppyoloe_export(FLAGS):
    cfg = load_config(FLAGS.config)
    FLAGS.opt['weights'] = FLAGS.weights
    FLAGS.opt['exclude_nms'] = True
    merge_config(FLAGS.opt)
    if FLAGS.slim_config:
        cfg = build_slim_model(cfg, FLAGS.slim_config, mode='test')
    merge_config(FLAGS.opt)
    check_config(cfg)
    check_version()
    trainer = Trainer(cfg, mode='test')
    trainer.load_weights(cfg.weights)
    trainer.model.eval()
    if not os.path.exists('.tmp'):
        os.makedirs('.tmp')
    static_model, _ = trainer._get_infer_cfg_and_input_spec('.tmp')
    os.system('rm -r .tmp')
    return cfg, static_model


def main(FLAGS):
    paddle.set_device('cpu')
    cfg, model = ppyoloe_export(FLAGS)

    model = nn.Sequential(model, DeepStreamOutput())

    img_size = [cfg.eval_height, cfg.eval_width]

    onnx_input_im = {}
    onnx_input_im['image'] = paddle.static.InputSpec(shape=[None, 3, *img_size], dtype='float32', name='image')
    onnx_input_im['scale_factor'] = paddle.static.InputSpec(shape=[None, 2], dtype='float32', name='scale_factor')
    onnx_output_file = cfg.filename + '.onnx'

    paddle.onnx.export(model, cfg.filename, input_spec=[onnx_input_im], opset_version=FLAGS.opset)

    if FLAGS.simplify:
        import onnxsim
        model_onnx = onnx.load(onnx_output_file)
        model_onnx, _ = onnxsim.simplify(model_onnx)
        onnx.save(model_onnx, onnx_output_file)


def parse_args():
    parser = ArgsParser()
    parser.add_argument('-w', '--weights', required=True, help='Input weights (.pdparams) file path (required)')
    parser.add_argument('--slim_config', default=None, type=str, help='Slim configuration file of slim method')
    parser.add_argument('--opset', type=int, default=11, help='ONNX opset version')
    parser.add_argument('--simplify', action='store_true', help='ONNX simplify model')
    args = parser.parse_args()
    return args


if __name__ == '__main__':
    FLAGS = parse_args()
    sys.exit(main(FLAGS))
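
Note: for reference, a minimal sketch of the tensor surgery DeepStreamOutput performs, run on dummy data (the 8400-box and 80-class sizes below are illustrative only, and the snippet assumes paddlepaddle is installed; the indexing mirrors what the wrapper above implies about the exclude_nms head output):

import paddle

# Dummy PP-YOLOE head output with NMS excluded: boxes plus
# classes-first score matrix.
pred = {
    'bbox': paddle.rand([1, 8400, 4]),       # x1, y1, x2, y2
    'bbox_num': paddle.rand([1, 80, 8400]),  # per-class scores
}
pred['bbox_num'] = pred['bbox_num'].transpose([0, 2, 1])  # -> [1, 8400, 80]
scores = paddle.max(pred['bbox_num'], 2, keepdim=True)
classes = paddle.cast(paddle.argmax(pred['bbox_num'], 2, keepdim=True), dtype='float32')
out = paddle.concat((pred['bbox'], scores, classes), axis=2)
print(out.shape)  # [1, 8400, 6]: boxes, best score, class id
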
utils/export_yoloV5.py (normal file, 82 lines added)
@@ -0,0 +1,82 @@
import os
import sys
import argparse
import warnings
import onnx
import torch
import torch.nn as nn
from models.experimental import attempt_load
from utils.torch_utils import select_device
from models.yolo import Detect


class DeepStreamOutput(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        x = x[0]
        boxes = x[:, :, :4]
        objectness = x[:, :, 4:5]
        scores, classes = torch.max(x[:, :, 5:], 2, keepdim=True)
        return torch.cat((boxes, scores, classes, objectness), dim=2)


def suppress_warnings():
    warnings.filterwarnings('ignore', category=torch.jit.TracerWarning)
    warnings.filterwarnings('ignore', category=UserWarning)
    warnings.filterwarnings('ignore', category=DeprecationWarning)


def yolov5_export(weights, device):
    model = attempt_load(weights, device=device, inplace=True, fuse=True)
    model.eval()
    for k, m in model.named_modules():
        if isinstance(m, Detect):
            m.inplace = False
            m.dynamic = False
            m.export = True
    return model


def main(args):
    suppress_warnings()
    device = select_device('cpu')
    model = yolov5_export(args.weights, device)

    model = nn.Sequential(model, DeepStreamOutput())

    img_size = args.size * 2 if len(args.size) == 1 else args.size

    if img_size == [640, 640] and args.p6:
        img_size = [1280] * 2

    onnx_input_im = torch.zeros(1, 3, *img_size).to(device)
    onnx_output_file = os.path.basename(args.weights).split('.pt')[0] + '.onnx'

    torch.onnx.export(model, onnx_input_im, onnx_output_file, verbose=False, opset_version=args.opset,
                      do_constant_folding=True, input_names=['input'], output_names=['output'], dynamic_axes=None)

    if args.simplify:
        import onnxsim
        model_onnx = onnx.load(onnx_output_file)
        model_onnx, _ = onnxsim.simplify(model_onnx)
        onnx.save(model_onnx, onnx_output_file)


def parse_args():
    parser = argparse.ArgumentParser(description='DeepStream YOLOv5 conversion')
    parser.add_argument('-w', '--weights', required=True, help='Input weights (.pt) file path (required)')
    parser.add_argument('-s', '--size', nargs='+', type=int, default=[640], help='Inference size [H,W] (default [640])')
    parser.add_argument('--p6', action='store_true', help='P6 model')
    parser.add_argument('--opset', type=int, default=17, help='ONNX opset version')
    parser.add_argument('--simplify', action='store_true', help='ONNX simplify model')
    args = parser.parse_args()
    if not os.path.isfile(args.weights):
        raise SystemExit('Invalid weights file')
    return args


if __name__ == '__main__':
    args = parse_args()
    sys.exit(main(args))
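
Note: as a shape reference for the wrapper above, a minimal sketch on a dummy YOLOv5 prediction (85 = 4 box coordinates + 1 objectness + 80 class scores; 25200 is just the anchor count for a 640x640 input, both values illustrative):

import torch

x = torch.rand(1, 25200, 85)  # raw [batch, anchors, 4 + 1 + num_classes]
boxes = x[:, :, :4]
objectness = x[:, :, 4:5]
scores, classes = torch.max(x[:, :, 5:], 2, keepdim=True)
out = torch.cat((boxes, scores, classes.float(), objectness), dim=2)  # cast class indices to float
print(out.shape)  # torch.Size([1, 25200, 7])
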
utils/export_yoloV6.py (normal file, 88 lines added)
@@ -0,0 +1,88 @@
import os
import sys
import argparse
import warnings
import onnx
import torch
import torch.nn as nn
from yolov6.utils.checkpoint import load_checkpoint
from yolov6.layers.common import RepVGGBlock, ConvModule, SiLU
from yolov6.models.effidehead import Detect


class DeepStreamOutput(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        boxes = x[:, :, :4]
        objectness = x[:, :, 4:5]
        scores, classes = torch.max(x[:, :, 5:], 2, keepdim=True)
        return torch.cat((boxes, scores, classes, objectness), dim=2)


def suppress_warnings():
    warnings.filterwarnings('ignore', category=torch.jit.TracerWarning)
    warnings.filterwarnings('ignore', category=UserWarning)
    warnings.filterwarnings('ignore', category=DeprecationWarning)


def yolov6_export(weights, device):
    model = load_checkpoint(weights, map_location=device, inplace=True, fuse=True)
    for layer in model.modules():
        if isinstance(layer, RepVGGBlock):
            layer.switch_to_deploy()
        elif isinstance(layer, nn.Upsample) and not hasattr(layer, 'recompute_scale_factor'):
            layer.recompute_scale_factor = None
    model.eval()
    for k, m in model.named_modules():
        if isinstance(m, ConvModule):
            if hasattr(m, 'act') and isinstance(m.act, nn.SiLU):
                m.act = SiLU()
        elif isinstance(m, Detect):
            m.inplace = False
    return model


def main(args):
    suppress_warnings()
    device = torch.device('cpu')
    model = yolov6_export(args.weights, device)

    model = nn.Sequential(model, DeepStreamOutput())

    img_size = args.size * 2 if len(args.size) == 1 else args.size

    if img_size == [640, 640] and args.p6:
        img_size = [1280] * 2

    onnx_input_im = torch.zeros(1, 3, *img_size).to(device)
    onnx_output_file = os.path.basename(args.weights).split('.pt')[0] + '.onnx'

    torch.onnx.export(model, onnx_input_im, onnx_output_file, verbose=False, opset_version=args.opset,
                      do_constant_folding=True, input_names=['input'], output_names=['output'], dynamic_axes=None)

    if args.simplify:
        import onnxsim
        model_onnx = onnx.load(onnx_output_file)
        model_onnx, _ = onnxsim.simplify(model_onnx)
        onnx.save(model_onnx, onnx_output_file)


def parse_args():
    parser = argparse.ArgumentParser(description='DeepStream YOLOv6 conversion')
    parser.add_argument('-w', '--weights', required=True, help='Input weights (.pt) file path (required)')
    parser.add_argument('-s', '--size', nargs='+', type=int, default=[640], help='Inference size [H,W] (default [640])')
    parser.add_argument('--p6', action='store_true', help='P6 model')
    parser.add_argument('--opset', type=int, default=13, help='ONNX opset version')
    parser.add_argument('--simplify', action='store_true', help='ONNX simplify model')
    args = parser.parse_args()
    if not os.path.isfile(args.weights):
        raise SystemExit('Invalid weights file')
    return args


if __name__ == '__main__':
    args = parse_args()
    sys.exit(main(args))
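
Note: a quick post-export sanity check that applies to any of these scripts is a sketch like the following (assumes onnxruntime is installed; 'yolov6s.onnx' is a hypothetical output file name, and 'input' matches the name passed to torch.onnx.export above):

import numpy as np
import onnxruntime as ort

session = ort.InferenceSession('yolov6s.onnx', providers=['CPUExecutionProvider'])
im = np.zeros((1, 3, 640, 640), dtype=np.float32)
(pred,) = session.run(None, {'input': im})
print(pred.shape)  # expected (1, num_boxes, 7) for this exporter
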
utils/export_yoloV7.py (normal file, 86 lines added)
@@ -0,0 +1,86 @@
import os
import sys
import argparse
import warnings
import onnx
import torch
import torch.nn as nn
import models
from models.experimental import attempt_load
from utils.torch_utils import select_device
from utils.activations import Hardswish, SiLU


class DeepStreamOutput(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        boxes = x[:, :, :4]
        objectness = x[:, :, 4:5]
        scores, classes = torch.max(x[:, :, 5:], 2, keepdim=True)
        return torch.cat((boxes, scores, classes, objectness), dim=2)


def suppress_warnings():
    warnings.filterwarnings('ignore', category=torch.jit.TracerWarning)
    warnings.filterwarnings('ignore', category=UserWarning)
    warnings.filterwarnings('ignore', category=DeprecationWarning)


def yolov7_export(weights, device):
    model = attempt_load(weights, map_location=device)
    for k, m in model.named_modules():
        m._non_persistent_buffers_set = set()
        if isinstance(m, models.common.Conv):
            if isinstance(m.act, nn.Hardswish):
                m.act = Hardswish()
            elif isinstance(m.act, nn.SiLU):
                m.act = SiLU()
    model.model[-1].export = False
    model.model[-1].concat = True
    model.eval()
    return model


def main(args):
    suppress_warnings()
    device = select_device('cpu')
    model = yolov7_export(args.weights, device)

    model = nn.Sequential(model, DeepStreamOutput())

    img_size = args.size * 2 if len(args.size) == 1 else args.size

    if img_size == [640, 640] and args.p6:
        img_size = [1280] * 2

    onnx_input_im = torch.zeros(1, 3, *img_size).to(device)
    onnx_output_file = os.path.basename(args.weights).split('.pt')[0] + '.onnx'

    torch.onnx.export(model, onnx_input_im, onnx_output_file, verbose=False, opset_version=args.opset,
                      do_constant_folding=True, input_names=['input'], output_names=['output'], dynamic_axes=None)

    if args.simplify:
        import onnxsim
        model_onnx = onnx.load(onnx_output_file)
        model_onnx, _ = onnxsim.simplify(model_onnx)
        onnx.save(model_onnx, onnx_output_file)


def parse_args():
    parser = argparse.ArgumentParser(description='DeepStream YOLOv7 conversion')
    parser.add_argument('-w', '--weights', required=True, help='Input weights (.pt) file path (required)')
    parser.add_argument('-s', '--size', nargs='+', type=int, default=[640], help='Inference size [H,W] (default [640])')
    parser.add_argument('--p6', action='store_true', help='P6 model')
    parser.add_argument('--opset', type=int, default=12, help='ONNX opset version')
    parser.add_argument('--simplify', action='store_true', help='ONNX simplify model')
    args = parser.parse_args()
    if not os.path.isfile(args.weights):
        raise SystemExit('Invalid weights file')
    return args


if __name__ == '__main__':
    args = parse_args()
    sys.exit(main(args))
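
Note: the activation swap in yolov7_export exists because nn.SiLU and nn.Hardswish have not always traced to ops that older ONNX opsets and parsers accept, so the repo substitutes export-friendly re-implementations that decompose into elementary ops. A rough sketch of the idea (these are illustrative classes, not the repo's exact ones):

import torch
import torch.nn as nn

class ExportSiLU(nn.Module):
    # Traces to Sigmoid + Mul instead of a fused SiLU op.
    @staticmethod
    def forward(x):
        return x * torch.sigmoid(x)

class ExportHardswish(nn.Module):
    # Hardswish via hardtanh, the usual export-time decomposition.
    @staticmethod
    def forward(x):
        return x * nn.functional.hardtanh(x + 3, 0.0, 6.0) / 6.0
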
utils/export_yoloV8.py (normal file, 85 lines added)
@@ -0,0 +1,85 @@
import os
import sys
import argparse
import warnings
import onnx
import torch
import torch.nn as nn
from copy import deepcopy
from ultralytics import YOLO
from ultralytics.yolo.utils.torch_utils import select_device
from ultralytics.nn.modules import C2f, Detect


class DeepStreamOutput(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        x = x.transpose(1, 2)
        boxes = x[:, :, :4]
        scores, classes = torch.max(x[:, :, 4:], 2, keepdim=True)
        return torch.cat((boxes, scores, classes), dim=2)


def suppress_warnings():
    warnings.filterwarnings('ignore', category=torch.jit.TracerWarning)
    warnings.filterwarnings('ignore', category=UserWarning)
    warnings.filterwarnings('ignore', category=DeprecationWarning)


def yolov8_export(weights, device):
    model = YOLO(weights)
    model = deepcopy(model.model).to(device)
    for p in model.parameters():
        p.requires_grad = False
    model.eval()
    model.float()
    model = model.fuse()
    for k, m in model.named_modules():
        if isinstance(m, Detect):
            m.dynamic = False
            m.export = True
            m.format = 'onnx'
        elif isinstance(m, C2f):
            m.forward = m.forward_split
    return model


def main(args):
    suppress_warnings()
    device = select_device('cpu')
    model = yolov8_export(args.weights, device)

    model = nn.Sequential(model, DeepStreamOutput())

    img_size = args.size * 2 if len(args.size) == 1 else args.size

    onnx_input_im = torch.zeros(1, 3, *img_size).to(device)
    onnx_output_file = os.path.basename(args.weights).split('.pt')[0] + '.onnx'

    torch.onnx.export(model, onnx_input_im, onnx_output_file, verbose=False, opset_version=args.opset,
                      do_constant_folding=True, input_names=['input'], output_names=['output'], dynamic_axes=None)

    if args.simplify:
        import onnxsim
        model_onnx = onnx.load(onnx_output_file)
        model_onnx, _ = onnxsim.simplify(model_onnx)
        onnx.save(model_onnx, onnx_output_file)


def parse_args():
    parser = argparse.ArgumentParser(description='DeepStream YOLOv8 conversion')
    parser.add_argument('-w', '--weights', required=True, help='Input weights (.pt) file path (required)')
    parser.add_argument('-s', '--size', nargs='+', type=int, default=[640], help='Inference size [H,W] (default [640])')
    parser.add_argument('--opset', type=int, default=16, help='ONNX opset version')
    parser.add_argument('--simplify', action='store_true', help='ONNX simplify model')
    args = parser.parse_args()
    if not os.path.isfile(args.weights):
        raise SystemExit('Invalid weights file')
    return args


if __name__ == '__main__':
    args = parse_args()
    sys.exit(main(args))
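
Note: unlike the anchor-based heads above, the YOLOv8 head emits [batch, 4 + num_classes, anchors] with no objectness column, which is why this wrapper transposes first. A minimal sketch on dummy data (84 = 4 + 80 classes; 8400 anchors is just the 640x640 default):

import torch

x = torch.rand(1, 84, 8400)
x = x.transpose(1, 2)                 # -> [1, 8400, 84]
boxes = x[:, :, :4]
scores, classes = torch.max(x[:, :, 4:], 2, keepdim=True)
out = torch.cat((boxes, scores, classes.float()), dim=2)
print(out.shape)  # torch.Size([1, 8400, 6])
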
utils/export_yolonas.py (normal file, 75 lines added)
@@ -0,0 +1,75 @@
import os
import sys
import argparse
import warnings
import onnx
import torch
import torch.nn as nn
from super_gradients.training import models


class DeepStreamOutput(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        boxes = x[0]
        scores, classes = torch.max(x[1], 2, keepdim=True)
        return torch.cat((boxes, scores, classes), dim=2)


def suppress_warnings():
    warnings.filterwarnings('ignore', category=torch.jit.TracerWarning)
    warnings.filterwarnings('ignore', category=UserWarning)
    warnings.filterwarnings('ignore', category=DeprecationWarning)


def yolonas_export(model_name, weights, num_classes, size):
    img_size = size * 2 if len(size) == 1 else size
    model = models.get(model_name, num_classes=num_classes, checkpoint_path=weights)
    model.eval()
    model.prep_model_for_conversion(input_size=[1, 3, *img_size])
    return model


def main(args):
    suppress_warnings()
    device = torch.device('cpu')
    model = yolonas_export(args.model, args.weights, args.classes, args.size)

    model = nn.Sequential(model, DeepStreamOutput())

    img_size = args.size * 2 if len(args.size) == 1 else args.size

    onnx_input_im = torch.zeros(1, 3, *img_size).to(device)
    onnx_output_file = os.path.basename(args.weights).split('.pt')[0] + '.onnx'

    torch.onnx.export(model, onnx_input_im, onnx_output_file, verbose=False, opset_version=args.opset,
                      do_constant_folding=True, input_names=['input'], output_names=['output'], dynamic_axes=None)

    if args.simplify:
        import onnxsim
        model_onnx = onnx.load(onnx_output_file)
        model_onnx, _ = onnxsim.simplify(model_onnx)
        onnx.save(model_onnx, onnx_output_file)


def parse_args():
    parser = argparse.ArgumentParser(description='DeepStream YOLO-NAS conversion')
    parser.add_argument('-m', '--model', required=True, help='Model name (required)')
    parser.add_argument('-w', '--weights', required=True, help='Input weights (.pth) file path (required)')
    parser.add_argument('-n', '--classes', type=int, default=80, help='Number of trained classes (default 80)')
    parser.add_argument('-s', '--size', nargs='+', type=int, default=[640], help='Inference size [H,W] (default [640])')
    parser.add_argument('--opset', type=int, default=14, help='ONNX opset version')
    parser.add_argument('--simplify', action='store_true', help='ONNX simplify model')
    args = parser.parse_args()
    if args.model == '':
        raise SystemExit('Invalid model name')
    if not os.path.isfile(args.weights):
        raise SystemExit('Invalid weights file')
    return args


if __name__ == '__main__':
    args = parse_args()
    sys.exit(main(args))
utils/export_yolor.py (normal file, 99 lines added)
@@ -0,0 +1,99 @@
import os
import sys
import argparse
import warnings
import onnx
import torch
import torch.nn as nn


class DeepStreamOutput(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        x = x[0]
        boxes = x[:, :, :4]
        objectness = x[:, :, 4:5]
        scores, classes = torch.max(x[:, :, 5:], 2, keepdim=True)
        return torch.cat((boxes, scores, classes, objectness), dim=2)


def suppress_warnings():
    warnings.filterwarnings('ignore', category=torch.jit.TracerWarning)
    warnings.filterwarnings('ignore', category=UserWarning)
    warnings.filterwarnings('ignore', category=DeprecationWarning)


def yolor_export(weights, cfg, size, device):
    if os.path.isfile('models/experimental.py'):
        import models
        from models.experimental import attempt_load
        from utils.activations import Hardswish
        model = attempt_load(weights, map_location=device)
        for k, m in model.named_modules():
            m._non_persistent_buffers_set = set()
            if isinstance(m, models.common.Conv) and isinstance(m.act, nn.Hardswish):
                m.act = Hardswish()
            elif isinstance(m, nn.Upsample) and not hasattr(m, 'recompute_scale_factor'):
                m.recompute_scale_factor = None
        model.model[-1].training = False
        model.model[-1].export = False
    else:
        from models.models import Darknet
        model_name = os.path.basename(weights).split('.pt')[0]
        if cfg == '':
            cfg = 'cfg/' + model_name + '.cfg'
        if not os.path.isfile(cfg):
            raise SystemExit('CFG file not found')
        model = Darknet(cfg, img_size=size[::-1]).to(device)
        model.load_state_dict(torch.load(weights, map_location=device)['model'])
        model.float()
        model.fuse()
        model.eval()
        model.module_list[-1].training = False
    return model


def main(args):
    suppress_warnings()
    device = torch.device('cpu')
    model = yolor_export(args.weights, args.cfg, args.size, device)

    model = nn.Sequential(model, DeepStreamOutput())

    img_size = args.size * 2 if len(args.size) == 1 else args.size

    if img_size == [640, 640] and args.p6:
        img_size = [1280] * 2

    onnx_input_im = torch.zeros(1, 3, *img_size).to(device)
    onnx_output_file = os.path.basename(args.weights).split('.pt')[0] + '.onnx'

    torch.onnx.export(model, onnx_input_im, onnx_output_file, verbose=False, opset_version=args.opset,
                      do_constant_folding=True, input_names=['input'], output_names=['output'], dynamic_axes=None)

    if args.simplify:
        import onnxsim
        model_onnx = onnx.load(onnx_output_file)
        model_onnx, _ = onnxsim.simplify(model_onnx)
        onnx.save(model_onnx, onnx_output_file)


def parse_args():
    parser = argparse.ArgumentParser(description='DeepStream YOLOR conversion')
    parser.add_argument('-w', '--weights', required=True, help='Input weights (.pt) file path (required)')
    parser.add_argument('-c', '--cfg', default='', help='Input cfg (.cfg) file path')
    parser.add_argument('-s', '--size', nargs='+', type=int, default=[640], help='Inference size [H,W] (default [640])')
    parser.add_argument('--p6', action='store_true', help='P6 model')
    parser.add_argument('--opset', type=int, default=12, help='ONNX opset version')
    parser.add_argument('--simplify', action='store_true', help='ONNX simplify model')
    args = parser.parse_args()
    if not os.path.isfile(args.weights):
        raise SystemExit('Invalid weights file')
    return args


if __name__ == '__main__':
    args = parse_args()
    sys.exit(main(args))
utils/export_yolox.py (normal file, 81 lines added)
@@ -0,0 +1,81 @@
import os
import sys
import argparse
import warnings
import onnx
import torch
import torch.nn as nn
from yolox.exp import get_exp
from yolox.utils import replace_module
from yolox.models.network_blocks import SiLU


class DeepStreamOutput(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        boxes = x[:, :, :4]
        objectness = x[:, :, 4:5]
        scores, classes = torch.max(x[:, :, 5:], 2, keepdim=True)
        return torch.cat((boxes, scores, classes, objectness), dim=2)


def suppress_warnings():
    warnings.filterwarnings('ignore', category=torch.jit.TracerWarning)
    warnings.filterwarnings('ignore', category=UserWarning)
    warnings.filterwarnings('ignore', category=DeprecationWarning)


def yolox_export(weights, exp_file):
    exp = get_exp(exp_file)
    model = exp.get_model()
    ckpt = torch.load(weights, map_location='cpu')
    model.eval()
    if 'model' in ckpt:
        ckpt = ckpt['model']
    model.load_state_dict(ckpt)
    model = replace_module(model, nn.SiLU, SiLU)
    model.head.decode_in_inference = True
    return model, exp


def main(args):
    suppress_warnings()
    device = torch.device('cpu')
    model, exp = yolox_export(args.weights, args.exp)

    model = nn.Sequential(model, DeepStreamOutput())

    img_size = [exp.input_size[1], exp.input_size[0]]

    onnx_input_im = torch.zeros(1, 3, *img_size).to(device)
    onnx_output_file = os.path.basename(args.weights).split('.pt')[0] + '.onnx'

    torch.onnx.export(model, onnx_input_im, onnx_output_file, verbose=False, opset_version=args.opset,
                      do_constant_folding=True, input_names=['input'], output_names=['output'], dynamic_axes=None)

    if args.simplify:
        import onnxsim
        model_onnx = onnx.load(onnx_output_file)
        model_onnx, _ = onnxsim.simplify(model_onnx)
        onnx.save(model_onnx, onnx_output_file)


def parse_args():
    parser = argparse.ArgumentParser(description='DeepStream YOLOX conversion')
    parser.add_argument('-w', '--weights', required=True, help='Input weights (.pth) file path (required)')
    parser.add_argument('-c', '--exp', required=True, help='Input exp (.py) file path (required)')
    parser.add_argument('--opset', type=int, default=11, help='ONNX opset version')
    parser.add_argument('--simplify', action='store_true', help='ONNX simplify model')
    args = parser.parse_args()
    if not os.path.isfile(args.weights):
        raise SystemExit('Invalid weights file')
    if not os.path.isfile(args.exp):
        raise SystemExit('Invalid exp file')
    return args


if __name__ == '__main__':
    args = parse_args()
    sys.exit(main(args))
@@ -1,432 +0,0 @@
import os
import struct
import paddle
import numpy as np
from ppdet.core.workspace import load_config, merge_config
from ppdet.utils.check import check_version, check_config
from ppdet.utils.cli import ArgsParser
from ppdet.engine import Trainer
from ppdet.slim import build_slim_model


class Layers(object):
    def __init__(self, size, fw, fc, letter_box):
        self.blocks = [0 for _ in range(300)]
        self.current = -1

        self.backbone_outs = []
        self.neck_fpn_feats = []
        self.neck_pan_feats = []
        self.yolo_head_cls = []
        self.yolo_head_reg = []

        self.width = size[0] if len(size) == 1 else size[1]
        self.height = size[0]
        self.letter_box = letter_box

        self.fw = fw
        self.fc = fc
        self.wc = 0

        self.net()

    def ConvBNLayer(self, child):
        self.current += 1

        self.convolutional(child, act='swish')

    def CSPResStage(self, child, ret):
        self.current += 1

        if child.conv_down is not None:
            self.convolutional(child.conv_down, act='swish')
        self.convolutional(child.conv1, act='swish')
        self.route('-2')
        self.convolutional(child.conv2, act='swish')
        idx = -3
        for m in child.blocks:
            self.convolutional(m.conv1, act='swish')
            self.convolutional(m.conv2, act='swish')
            self.shortcut(-3)
            idx -= 3
        self.route('%d, -1' % idx)
        if child.attn is not None:
            self.reduce((1, 2), mode='mean', keepdim=True)
            self.convolutional(child.attn.fc, act='hardsigmoid')
            self.shortcut(-3, ew='mul')
        self.convolutional(child.conv3, act='swish')
        if ret is True:
            self.backbone_outs.append(self.current)

    def CSPStage(self, child, stage):
        self.current += 1

        self.convolutional(child.conv1, act='swish')
        self.route('-2')
        self.convolutional(child.conv2, act='swish')
        idx = -3
        for m in child.convs:
            if m.__class__.__name__ == 'BasicBlock':
                self.convolutional(m.conv1, act='swish')
                self.convolutional(m.conv2, act='swish')
                idx -= 2
            elif m.__class__.__name__ == 'SPP':
                self.maxpool(m.pool0)
                self.route('-2')
                self.maxpool(m.pool1)
                self.route('-4')
                self.maxpool(m.pool2)
                self.route('-6, -5, -3, -1')
                self.convolutional(m.conv, act='swish')
                idx -= 7
        self.route('%d, -1' % idx)
        self.convolutional(child.conv3, act='swish')
        if stage == 'fpn':
            self.neck_fpn_feats.append(self.current)
        elif stage == 'pan':
            self.neck_pan_feats.append(self.current)

    def Concat(self, route):
        self.current += 1

        r = self.get_route(route)
        self.route('-1, %d' % r)

    def Upsample(self):
        self.current += 1

        self.upsample()

    def AvgPool2d(self, route=None):
        self.current += 1

        if route is not None:
            r = self.get_route(route)
            self.route('%d' % r)
        self.avgpool()

    def ESEAttn(self, child, route=0):
        self.current += 1

        if route < 0:
            self.route('%d' % route)
        self.convolutional(child.fc, act='sigmoid')
        self.shortcut(route - 3, ew='mul')
        self.convolutional(child.conv, act='swish')
        if route == 0:
            self.shortcut(-5)

    def Conv2D(self, child, act='linear'):
        self.current += 1

        self.convolutional(child, act=act)

    def Shuffle(self, reshape=None, transpose1=None, transpose2=None, output=''):
        self.current += 1

        self.shuffle(reshape=reshape, transpose1=transpose1, transpose2=transpose2)
        if output == 'cls':
            self.yolo_head_cls.append(self.current)
        elif output == 'reg':
            self.yolo_head_reg.append(self.current)

    def SoftMax(self, axes):
        self.current += 1

        self.softmax(axes)

    def Detect(self, output):
        self.current += 1

        routes = self.yolo_head_cls if output == 'cls' else self.yolo_head_reg

        for i, route in enumerate(routes):
            routes[i] = self.get_route(route)
        self.route(str(routes)[1:-1], axis=-1)
        self.yolo(output)

    def net(self):
        lb = 'letter_box=1\n' if self.letter_box else ''

        self.fc.write('[net]\n' +
                      'width=%d\n' % self.width +
                      'height=%d\n' % self.height +
                      'channels=3\n' +
                      lb)

    def convolutional(self, cv, act='linear'):
        self.blocks[self.current] += 1

        self.get_state_dict(cv.state_dict())

        if cv.__class__.__name__ == 'Conv2D':
            filters = cv._out_channels
            size = cv._kernel_size
            stride = cv._stride
            pad = cv._padding
            groups = cv._groups
            bias = cv.bias
            bn = False
        else:
            filters = cv.conv._out_channels
            size = cv.conv._kernel_size
            stride = cv.conv._stride
            pad = cv.conv._padding
            groups = cv.conv._groups
            bias = cv.conv.bias
            bn = True if hasattr(cv, 'bn') else False

        b = 'batch_normalize=1\n' if bn is True else ''
        g = 'groups=%d\n' % groups if groups > 1 else ''
        w = 'bias=1\n' if bias is not None and bn is not False else 'bias=0\n' if bias is None and bn is False else ''

        self.fc.write('\n[convolutional]\n' +
                      b +
                      'filters=%d\n' % filters +
                      'size=%s\n' % self.get_value(size) +
                      'stride=%s\n' % self.get_value(stride) +
                      'pad=%s\n' % self.get_value(pad) +
                      g +
                      w +
                      'activation=%s\n' % act)

    def route(self, layers, axis=0):
        self.blocks[self.current] += 1

        a = 'axis=%d\n' % axis if axis != 0 else ''

        self.fc.write('\n[route]\n' +
                      'layers=%s\n' % layers +
                      a)

    def shortcut(self, r, ew='add', act='linear'):
        self.blocks[self.current] += 1

        m = 'mode=mul\n' if ew == 'mul' else ''

        self.fc.write('\n[shortcut]\n' +
                      'from=%d\n' % r +
                      m +
                      'activation=%s\n' % act)

    def reduce(self, dim, mode='mean', keepdim=False):
        self.blocks[self.current] += 1

        self.fc.write('\n[reduce]\n' +
                      'mode=%s\n' % mode +
                      'axes=%s\n' % str(dim)[1:-1] +
                      'keep=%d\n' % keepdim)

    def maxpool(self, m):
        self.blocks[self.current] += 1

        stride = m.stride
        size = m.ksize
        mode = m.ceil_mode

        m = 'maxpool_up' if mode else 'maxpool'

        self.fc.write('\n[%s]\n' % m +
                      'stride=%d\n' % stride +
                      'size=%d\n' % size)

    def upsample(self):
        self.blocks[self.current] += 1

        stride = 2

        self.fc.write('\n[upsample]\n' +
                      'stride=%d\n' % stride)

    def avgpool(self):
        self.blocks[self.current] += 1

        self.fc.write('\n[avgpool]\n')

    def shuffle(self, reshape=None, transpose1=None, transpose2=None):
        self.blocks[self.current] += 1

        r = 'reshape=%s\n' % ', '.join(str(x) for x in reshape) if reshape is not None else ''
        t1 = 'transpose1=%s\n' % ', '.join(str(x) for x in transpose1) if transpose1 is not None else ''
        t2 = 'transpose2=%s\n' % ', '.join(str(x) for x in transpose2) if transpose2 is not None else ''

        self.fc.write('\n[shuffle]\n' +
                      r +
                      t1 +
                      t2)

    def softmax(self, axes):
        self.blocks[self.current] += 1

        self.fc.write('\n[softmax]\n' +
                      'axes=%d\n' % axes)

    def yolo(self, output):
        self.blocks[self.current] += 1

        self.fc.write('\n[%s]\n' % output)

    def get_state_dict(self, state_dict):
        for k, v in state_dict.items():
            if 'alpha' not in k:
                vr = v.reshape([-1]).numpy()
                self.fw.write('{} {} '.format(k, len(vr)))
                for vv in vr:
                    self.fw.write(' ')
                    self.fw.write(struct.pack('>f', float(vv)).hex())
                self.fw.write('\n')
                self.wc += 1

    def get_anchors(self, anchor_points, stride_tensor):
        vr = anchor_points.numpy()
        self.fw.write('{} {} '.format('anchor_points', len(vr)))
        for vv in vr:
            self.fw.write(' ')
            self.fw.write(struct.pack('>f', float(vv)).hex())
        self.fw.write('\n')
        self.wc += 1
        vr = stride_tensor.numpy()
        self.fw.write('{} {} '.format('stride_tensor', len(vr)))
        for vv in vr:
            self.fw.write(' ')
            self.fw.write(struct.pack('>f', float(vv)).hex())
        self.fw.write('\n')
        self.wc += 1

    def get_value(self, key):
        if type(key) == int:
            return key
        return key[0] if key[0] == key[1] else str(key)[1:-1]

    def get_route(self, n):
        r = 0
        for i, b in enumerate(self.blocks):
            if i <= n:
                r += b
            else:
                break
        return r - 1


def export_model():
    paddle.set_device('cpu')

    FLAGS = parse_args()

    cfg = load_config(FLAGS.config)

    FLAGS.opt['weights'] = FLAGS.weights
    FLAGS.opt['exclude_nms'] = True

    if 'norm_type' in cfg and cfg['norm_type'] == 'sync_bn':
        FLAGS.opt['norm_type'] = 'bn'
    merge_config(FLAGS.opt)

    if FLAGS.slim_config:
        cfg = build_slim_model(cfg, FLAGS.slim_config, mode='test')

    merge_config(FLAGS.opt)
    check_config(cfg)
    check_version()

    trainer = Trainer(cfg, mode='test')
    trainer.load_weights(cfg.weights)

    trainer.model.eval()

    if not os.path.exists('.tmp'):
        os.makedirs('.tmp')

    static_model, _ = trainer._get_infer_cfg_and_input_spec('.tmp')

    os.system('rm -r .tmp')

    return cfg, static_model


def parse_args():
    parser = ArgsParser()
    parser.add_argument('-w', '--weights', required=True, type=str, help='Input weights (.pdparams) file path (required)')
    parser.add_argument('--slim_config', default=None, type=str, help='Slim configuration file of slim method')
    args = parser.parse_args()
    return args


cfg, model = export_model()

model_name = cfg.filename
inference_size = (cfg.eval_height, cfg.eval_width)
letter_box = False

for sample_transforms in cfg['EvalReader']['sample_transforms']:
    if 'Resize' in sample_transforms:
        letter_box = sample_transforms['Resize']['keep_ratio']

backbone = cfg[cfg.architecture]['backbone']
neck = cfg[cfg.architecture]['neck']
yolo_head = cfg[cfg.architecture]['yolo_head']

wts_file = model_name + '.wts' if 'ppyoloe' in model_name else 'ppyoloe_' + model_name + '.wts'
cfg_file = model_name + '.cfg' if 'ppyoloe' in model_name else 'ppyoloe_' + model_name + '.cfg'

with open(wts_file, 'w') as fw, open(cfg_file, 'w') as fc:
    layers = Layers(inference_size, fw, fc, letter_box)

    if backbone == 'CSPResNet':
        layers.fc.write('\n# CSPResNet\n')

        for child in model.backbone.stem:
            layers.ConvBNLayer(child)
        for i, child in enumerate(model.backbone.stages):
            ret = True if i in model.backbone.return_idx else False
            layers.CSPResStage(child, ret)
    else:
        raise SystemExit('Model not supported')

    if neck == 'CustomCSPPAN':
        layers.fc.write('\n# CustomCSPPAN\n')

        blocks = layers.backbone_outs[::-1]
        for i, block in enumerate(blocks):
            if i > 0:
                layers.Concat(block)
            layers.CSPStage(model.neck.fpn_stages[i][0], 'fpn')
            if i < model.neck.num_blocks - 1:
                layers.ConvBNLayer(model.neck.fpn_routes[i])
                layers.Upsample()
        layers.neck_pan_feats = [layers.neck_fpn_feats[-1], ]
        for i in reversed(range(model.neck.num_blocks - 1)):
            layers.ConvBNLayer(model.neck.pan_routes[i])
            layers.Concat(layers.neck_fpn_feats[i])
            layers.CSPStage(model.neck.pan_stages[i][0], 'pan')
        layers.neck_pan_feats = layers.neck_pan_feats[::-1]
    else:
        raise SystemExit('Model not supported')

    if yolo_head == 'PPYOLOEHead':
        layers.fc.write('\n# PPYOLOEHead\n')

        reg_max = model.yolo_head.reg_max + 1 if hasattr(model.yolo_head, 'reg_max') else model.yolo_head.reg_range[1]

        for i, feat in enumerate(layers.neck_pan_feats):
            if i > 0:
                layers.AvgPool2d(route=feat)
            else:
                layers.AvgPool2d()
            layers.ESEAttn(model.yolo_head.stem_cls[i])
            layers.Conv2D(model.yolo_head.pred_cls[i], act='sigmoid')
            layers.Shuffle(reshape=[model.yolo_head.num_classes, 'hw'], output='cls')
            layers.ESEAttn(model.yolo_head.stem_reg[i], route=-7)
            layers.Conv2D(model.yolo_head.pred_reg[i])
            layers.Shuffle(reshape=[4, reg_max, 'hw'], transpose2=[1, 0, 2])
            layers.SoftMax(0)
            layers.Conv2D(model.yolo_head.proj_conv)
            layers.Shuffle(reshape=['h', 'w'], output='reg')
        layers.Detect('cls')
        layers.Detect('reg')
        layers.get_anchors(model.yolo_head.anchor_points.reshape([-1]), model.yolo_head.stride_tensor)
    else:
        raise SystemExit('Model not supported')

os.system('echo "%d" | cat - %s > temp && mv temp %s' % (layers.wc, wts_file, wts_file))
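
Note: the deleted generator above serializes every tensor to the .wts file as space-separated big-endian IEEE-754 hex words (via struct.pack('>f', v).hex()). A short worked example of that encoding:

import struct

print(struct.pack('>f', 1.0).hex())   # 3f800000
print(struct.pack('>f', -2.5).hex())  # c0200000
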
@@ -1,394 +0,0 @@
import argparse
import os
import struct
import torch
from utils.torch_utils import select_device


class Layers(object):
    def __init__(self, n, size, fw, fc):
        self.blocks = [0 for _ in range(n)]
        self.current = 0

        self.width = size[0] if len(size) == 1 else size[1]
        self.height = size[0]

        self.num = 0
        self.nc = 0
        self.anchors = ''
        self.masks = []

        self.fw = fw
        self.fc = fc
        self.wc = 0

        self.net()

    def Focus(self, child):
        self.current = child.i
        self.fc.write('\n# Focus\n')

        self.reorg()
        self.convolutional(child.conv)

    def Conv(self, child):
        self.current = child.i
        self.fc.write('\n# Conv\n')

        self.convolutional(child)

    def BottleneckCSP(self, child):
        self.current = child.i
        self.fc.write('\n# BottleneckCSP\n')

        self.convolutional(child.cv2)
        self.route('-2')
        self.convolutional(child.cv1)
        idx = -3
        for m in child.m:
            if m.add:
                self.convolutional(m.cv1)
                self.convolutional(m.cv2)
                self.shortcut(-3)
                idx -= 3
            else:
                self.convolutional(m.cv1)
                self.convolutional(m.cv2)
                idx -= 2
        self.convolutional(child.cv3)
        self.route('-1, %d' % (idx - 1))
        self.batchnorm(child.bn, child.act)
        self.convolutional(child.cv4)

    def C3(self, child):
        self.current = child.i
        self.fc.write('\n# C3\n')

        self.convolutional(child.cv2)
        self.route('-2')
        self.convolutional(child.cv1)
        idx = -3
        for m in child.m:
            if m.add:
                self.convolutional(m.cv1)
                self.convolutional(m.cv2)
                self.shortcut(-3)
                idx -= 3
            else:
                self.convolutional(m.cv1)
                self.convolutional(m.cv2)
                idx -= 2
        self.route('-1, %d' % idx)
        self.convolutional(child.cv3)

    def SPP(self, child):
        self.current = child.i
        self.fc.write('\n# SPP\n')

        self.convolutional(child.cv1)
        self.maxpool(child.m[0])
        self.route('-2')
        self.maxpool(child.m[1])
        self.route('-4')
        self.maxpool(child.m[2])
        self.route('-6, -5, -3, -1')
        self.convolutional(child.cv2)

    def SPPF(self, child):
        self.current = child.i
        self.fc.write('\n# SPPF\n')

        self.convolutional(child.cv1)
        self.maxpool(child.m)
        self.maxpool(child.m)
        self.maxpool(child.m)
        self.route('-4, -3, -2, -1')
        self.convolutional(child.cv2)

    def Upsample(self, child):
        self.current = child.i
        self.fc.write('\n# Upsample\n')

        self.upsample(child)

    def Concat(self, child):
        self.current = child.i
        self.fc.write('\n# Concat\n')

        r = []
        for i in range(1, len(child.f)):
            r.append(self.get_route(child.f[i]))
        self.route('-1, %s' % str(r)[1:-1])

    def Detect(self, child):
        self.current = child.i
        self.fc.write('\n# Detect\n')

        self.get_anchors(child.state_dict(), child.m[0].out_channels)

        for i, m in enumerate(child.m):
            r = self.get_route(child.f[i])
            self.route('%d' % r)
            self.convolutional(m, detect=True)
            self.yolo(i)

    def net(self):
        self.fc.write('[net]\n' +
                      'width=%d\n' % self.width +
                      'height=%d\n' % self.height +
                      'channels=3\n' +
                      'letter_box=1\n')

    def CBH(self, child):
        self.current = child.i
        self.fc.write('\n# CBH\n')

        self.convolutional(child.conv, act='hardswish')

    def LC_Block(self, child):
        self.current = child.i
        self.fc.write('\n# LC_Block\n')

        self.convolutional(child.dw_conv, act='hardswish')
        if child.use_se:
            self.avgpool()
            self.convolutional(child.se.conv1, act='relu')
            self.convolutional(child.se.conv2, act='silu')
            self.shortcut(-4, ew='mul')
        self.convolutional(child.pw_conv, act='hardswish')

    def Dense(self, child):
        self.current = child.i
        self.fc.write('\n# Dense\n')

        self.convolutional(child.dense_conv, act='hardswish')

    def reorg(self):
        self.blocks[self.current] += 1

        self.fc.write('\n[reorg]\n')

    def convolutional(self, cv, act=None, detect=False):
        self.blocks[self.current] += 1

        self.get_state_dict(cv.state_dict())

        if cv._get_name() == 'Conv2d':
            filters = cv.out_channels
            size = cv.kernel_size
            stride = cv.stride
            pad = cv.padding
            groups = cv.groups
            bias = cv.bias
            bn = False
            act = 'linear' if not detect else 'logistic'
        else:
            filters = cv.conv.out_channels
            size = cv.conv.kernel_size
            stride = cv.conv.stride
            pad = cv.conv.padding
            groups = cv.conv.groups
            bias = cv.conv.bias
            bn = True if hasattr(cv, 'bn') else False
            if act is None:
                act = self.get_activation(cv.act._get_name()) if hasattr(cv, 'act') else 'linear'

        b = 'batch_normalize=1\n' if bn is True else ''
        g = 'groups=%d\n' % groups if groups > 1 else ''
        w = 'bias=1\n' if bias is not None and bn is not False else 'bias=0\n' if bias is None and bn is False else ''

        self.fc.write('\n[convolutional]\n' +
                      b +
                      'filters=%d\n' % filters +
                      'size=%s\n' % self.get_value(size) +
                      'stride=%s\n' % self.get_value(stride) +
                      'pad=%s\n' % self.get_value(pad) +
                      g +
                      w +
                      'activation=%s\n' % act)

    def batchnorm(self, bn, act):
        self.blocks[self.current] += 1

        self.get_state_dict(bn.state_dict())

        filters = bn.num_features
        act = self.get_activation(act._get_name())

        self.fc.write('\n[batchnorm]\n' +
                      'filters=%d\n' % filters +
                      'activation=%s\n' % act)

    def route(self, layers):
        self.blocks[self.current] += 1

        self.fc.write('\n[route]\n' +
                      'layers=%s\n' % layers)

    def shortcut(self, r, ew='add', act='linear'):
        self.blocks[self.current] += 1

        m = 'mode=mul\n' if ew == 'mul' else ''

        self.fc.write('\n[shortcut]\n' +
                      'from=%d\n' % r +
                      m +
                      'activation=%s\n' % act)

    def maxpool(self, m):
        self.blocks[self.current] += 1

        stride = m.stride
        size = m.kernel_size
        mode = m.ceil_mode

        m = 'maxpool_up' if mode else 'maxpool'

        self.fc.write('\n[%s]\n' % m +
                      'stride=%d\n' % stride +
                      'size=%d\n' % size)

    def upsample(self, child):
        self.blocks[self.current] += 1

        stride = child.scale_factor

        self.fc.write('\n[upsample]\n' +
                      'stride=%d\n' % stride)

    def avgpool(self):
        self.blocks[self.current] += 1

        self.fc.write('\n[avgpool]\n')

    def yolo(self, i):
        self.blocks[self.current] += 1

        self.fc.write('\n[yolo]\n' +
                      'mask=%s\n' % self.masks[i] +
                      'anchors=%s\n' % self.anchors +
                      'classes=%d\n' % self.nc +
                      'num=%d\n' % self.num +
                      'scale_x_y=2.0\n' +
                      'new_coords=1\n')

    def get_state_dict(self, state_dict):
        for k, v in state_dict.items():
            if 'num_batches_tracked' not in k:
                vr = v.reshape(-1).numpy()
                self.fw.write('{} {} '.format(k, len(vr)))
                for vv in vr:
                    self.fw.write(' ')
                    self.fw.write(struct.pack('>f', float(vv)).hex())
                self.fw.write('\n')
                self.wc += 1

    def get_anchors(self, state_dict, out_channels):
        anchor_grid = state_dict['anchor_grid']
        aa = anchor_grid.reshape(-1).tolist()
        am = anchor_grid.tolist()

        self.num = (len(aa) / 2)
        self.nc = int((out_channels / (self.num / len(am))) - 5)
        self.anchors = str(aa)[1:-1]

        n = 0
        for m in am:
            mask = []
            for _ in range(len(m)):
                mask.append(n)
                n += 1
            self.masks.append(str(mask)[1:-1])

    def get_value(self, key):
        if type(key) == int:
            return key
        return key[0] if key[0] == key[1] else str(key)[1:-1]

    def get_route(self, n):
        r = 0
        if n < 0:
            for i, b in enumerate(self.blocks[self.current-1::-1]):
                if i < abs(n) - 1:
                    r -= b
                else:
                    break
        else:
            for i, b in enumerate(self.blocks):
                if i <= n:
                    r += b
                else:
                    break
        return r - 1

    def get_activation(self, act):
        if act == 'Hardswish':
            return 'hardswish'
        elif act == 'LeakyReLU':
            return 'leaky'
        elif act == 'SiLU':
            return 'silu'
        return 'linear'


def parse_args():
    parser = argparse.ArgumentParser(description='PyTorch YOLOv5 conversion')
    parser.add_argument('-w', '--weights', required=True, help='Input weights (.pt) file path (required)')
    parser.add_argument('-s', '--size', nargs='+', type=int, help='Inference size [H,W] (default [640])')
    parser.add_argument('--p6', action='store_true', help='P6 model')
    args = parser.parse_args()
    if not os.path.isfile(args.weights):
        raise SystemExit('Invalid weights file')
    if not args.size:
        args.size = [1280] if args.p6 else [640]
    return args.weights, args.size


pt_file, inference_size = parse_args()

model_name = os.path.basename(pt_file).split('.pt')[0]
wts_file = model_name + '.wts' if 'yolov5' in model_name else 'yolov5_' + model_name + '.wts'
cfg_file = model_name + '.cfg' if 'yolov5' in model_name else 'yolov5_' + model_name + '.cfg'

device = select_device('cpu')
model = torch.load(pt_file, map_location=device)['model'].float()

anchor_grid = model.model[-1].anchors * model.model[-1].stride[..., None, None]
delattr(model.model[-1], 'anchor_grid')
model.model[-1].register_buffer('anchor_grid', anchor_grid)

model.to(device).eval()

with open(wts_file, 'w') as fw, open(cfg_file, 'w') as fc:
    layers = Layers(len(model.model), inference_size, fw, fc)

    for child in model.model.children():
        if child._get_name() == 'Focus':
            layers.Focus(child)
        elif child._get_name() == 'Conv':
            layers.Conv(child)
        elif child._get_name() == 'BottleneckCSP':
            layers.BottleneckCSP(child)
        elif child._get_name() == 'C3':
            layers.C3(child)
        elif child._get_name() == 'SPP':
            layers.SPP(child)
        elif child._get_name() == 'SPPF':
            layers.SPPF(child)
        elif child._get_name() == 'Upsample':
            layers.Upsample(child)
        elif child._get_name() == 'Concat':
            layers.Concat(child)
        elif child._get_name() == 'Detect':
            layers.Detect(child)
        elif child._get_name() == 'CBH':
            layers.CBH(child)
        elif child._get_name() == 'LC_Block':
            layers.LC_Block(child)
        elif child._get_name() == 'Dense':
            layers.Dense(child)
        else:
            raise SystemExit('Model not supported')

os.system('echo "%d" | cat - %s > temp && mv temp %s' % (layers.wc, wts_file, wts_file))
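
Note: for reference, a worked example of the bookkeeping get_anchors does in the deleted YOLOv5 generator: three detection layers with three anchors each give num = 9 anchor pairs, and each layer's mask indexes its own slice. The anchor values below are the classic COCO defaults, used here only as an illustration of the arithmetic:

anchor_grid = [  # 3 layers x 3 anchors x (w, h)
    [[10, 13], [16, 30], [33, 23]],
    [[30, 61], [62, 45], [59, 119]],
    [[116, 90], [156, 198], [373, 326]],
]
aa = [v for layer in anchor_grid for wh in layer for v in wh]
num = len(aa) // 2  # 9
masks, n = [], 0
for layer in anchor_grid:
    masks.append(', '.join(str(n + i) for i in range(len(layer))))
    n += len(layer)
print(num, masks)   # 9 ['0, 1, 2', '3, 4, 5', '6, 7, 8']
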
@@ -1,588 +0,0 @@
import argparse
import os
import struct
import torch
from yolov6.assigners.anchor_generator import generate_anchors


class Layers(object):
    def __init__(self, size, fw, fc):
        self.blocks = [0 for _ in range(300)]
        self.current = -1

        self.width = size[0] if len(size) == 1 else size[1]
        self.height = size[0]

        self.backbone_outs = []
        self.fpn_feats = []
        self.pan_feats = []
        self.yolo_head_cls = []
        self.yolo_head_reg = []

        self.fw = fw
        self.fc = fc
        self.wc = 0

        self.net()

    def BaseConv(self, child):
        self.current += 1

        if child._get_name() == 'RepVGGBlock':
            self.convolutional(child.rbr_reparam, act=self.get_activation(child.nonlinearity._get_name()))
        elif child._get_name() == 'ConvWrapper' or child._get_name() == 'SimConvWrapper':
            self.convolutional(child.block)
        else:
            raise SystemExit('Model not supported')

    def RepBlock(self, child, stage=''):
        self.current += 1

        if child.conv1._get_name() == 'RepVGGBlock':
            self.convolutional(child.conv1.rbr_reparam, act=self.get_activation(child.conv1.nonlinearity._get_name()))
            if child.block is not None:
                for m in child.block:
                    self.convolutional(m.rbr_reparam, act=self.get_activation(m.nonlinearity._get_name()))
        elif child.conv1._get_name() == 'ConvWrapper' or child.conv1._get_name() == 'SimConvWrapper':
            self.convolutional(child.conv1.block)
            if child.block is not None:
                for m in child.block:
                    self.convolutional(m.block)
        else:
            raise SystemExit('Model not supported')

        if stage == 'backbone':
            self.backbone_outs.append(self.current)
        elif stage == 'pan':
            self.pan_feats.append(self.current)

    def BepC3(self, child, stage=''):
        self.current += 1

        if child.concat is True:
            self.convolutional(child.cv2)
            self.route('-2')
            self.convolutional(child.cv1)
        idx = -3
        if child.m.conv1.conv1._get_name() == 'RepVGGBlock':
            self.convolutional(child.m.conv1.conv1.rbr_reparam,
                               act=self.get_activation(child.m.conv1.conv1.nonlinearity._get_name()))
            self.convolutional(child.m.conv1.conv2.rbr_reparam,
                               act=self.get_activation(child.m.conv1.conv2.nonlinearity._get_name()))
            idx -= 2
            if child.m.conv1.shortcut:
                self.shortcut(-3)
                idx -= 1
            if child.m.block is not None:
                for m in child.m.block:
                    self.convolutional(m.conv1.rbr_reparam, act=self.get_activation(m.conv1.nonlinearity._get_name()))
                    self.convolutional(m.conv2.rbr_reparam, act=self.get_activation(m.conv2.nonlinearity._get_name()))
                    idx -= 2
                    if m.shortcut:
                        self.shortcut(-3)
                        idx -= 1
        elif child.m.conv1.conv1._get_name() == 'ConvWrapper' or child.m.conv1.conv1._get_name() == 'SimConvWrapper':
            self.convolutional(child.m.conv1.conv1.block)
            self.convolutional(child.m.conv1.conv2.block)
            idx -= 2
            if child.m.conv1.shortcut:
                self.shortcut(-3)
                idx -= 1
            if child.m.block is not None:
                for m in child.m.block:
                    self.convolutional(m.conv1.block)
                    self.convolutional(m.conv2.block)
                    idx -= 2
                    if m.shortcut:
                        self.shortcut(-3)
                        idx -= 1
        else:
            raise SystemExit('Model not supported')

        if child.concat is True:
            self.route('-1, %d' % idx)
            self.convolutional(child.cv3)

        if stage == 'backbone':
            self.backbone_outs.append(self.current)
        elif stage == 'pan':
            self.pan_feats.append(self.current)

    def CSPSPPF(self, child):
        self.current += 1

        self.convolutional(child.cv2)
        self.route('-2')
        self.convolutional(child.cv1)
        self.convolutional(child.cv3)
        self.convolutional(child.cv4)
        self.maxpool(child.m)
        self.maxpool(child.m)
        self.maxpool(child.m)
        self.route('-4, -3, -2, -1')
        self.convolutional(child.cv5)
        self.convolutional(child.cv6)
        self.route('-11, -1')
        self.convolutional(child.cv7)
        self.backbone_outs.append(self.current)

    def SPPF(self, child):
        self.current += 1

        self.convolutional(child.cv1)
        self.maxpool(child.m)
        self.maxpool(child.m)
        self.maxpool(child.m)
        self.route('-4, -3, -2, -1')
        self.convolutional(child.cv2)
        self.backbone_outs.append(self.current)

    def SimConv(self, child, stage=''):
        self.current += 1

        self.convolutional(child)
        if stage == 'fpn':
            self.fpn_feats.append(self.current)

    def BiFusion(self, child, idx):
        self.current += 1

        self.deconvolutional(child.upsample.upsample_transpose)
        r = self.get_route(self.backbone_outs[-idx - 2])
        self.route('%d' % r)
        self.convolutional(child.cv1)
        r = self.get_route(self.backbone_outs[-idx - 3])
        self.route('%d' % r)
        self.convolutional(child.cv2)
        self.convolutional(child.downsample)
        self.route('-6, -4, -1')
        self.convolutional(child.cv3)

    def Upsample(self, child):
        self.current += 1

        self.deconvolutional(child.upsample_transpose)

    def Conv(self, child, act=None):
        self.current += 1

        self.convolutional(child, act=act)

    def Concat(self, route):
        self.current += 1

        r = self.get_route(route)
        self.route('-1, %d' % r)

    def Route(self, route):
        self.current += 1

        if route > 0:
            r = self.get_route(route)
            self.route('%d' % r)
        else:
            self.route('%d' % route)

    def Shuffle(self, reshape=None, transpose1=None, transpose2=None, output=''):
        self.current += 1

        self.shuffle(reshape=reshape, transpose1=transpose1, transpose2=transpose2)
        if output == 'cls':
            self.yolo_head_cls.append(self.current)
        elif output == 'reg':
            self.yolo_head_reg.append(self.current)

    def SoftMax(self, axes):
        self.current += 1

        self.softmax(axes)

    def Detect(self, output):
        self.current += 1

        routes = self.yolo_head_cls if output == 'cls' else self.yolo_head_reg

        for i, route in enumerate(routes):
            routes[i] = self.get_route(route)
        self.route(str(routes)[1:-1], axis=-1)
        self.yolo(output)

    def net(self):
        self.fc.write('[net]\n' +
                      'width=%d\n' % self.width +
                      'height=%d\n' % self.height +
                      'channels=3\n' +
                      'letter_box=1\n')

    def convolutional(self, cv, act=None, detect=False):
        self.blocks[self.current] += 1

        self.get_state_dict(cv.state_dict())

        if cv._get_name() == 'Conv2d':
            filters = cv.out_channels
            size = cv.kernel_size
            stride = cv.stride
            pad = cv.padding
            groups = cv.groups
            bias = cv.bias
            bn = False
            act = act if act is not None else 'linear'
        else:
            filters = cv.conv.out_channels
            size = cv.conv.kernel_size
            stride = cv.conv.stride
            pad = cv.conv.padding
            groups = cv.conv.groups
            bias = cv.conv.bias
            bn = True if hasattr(cv, 'bn') else False
            if act is None:
                act = self.get_activation(cv.act._get_name()) if hasattr(cv, 'act') else 'linear'

        b = 'batch_normalize=1\n' if bn is True else ''
        g = 'groups=%d\n' % groups if groups > 1 else ''
        w = 'bias=1\n' if bias is not None and bn is not False else 'bias=0\n' if bias is None and bn is False else ''

        self.fc.write('\n[convolutional]\n' +
                      b +
                      'filters=%d\n' % filters +
                      'size=%s\n' % self.get_value(size) +
                      'stride=%s\n' % self.get_value(stride) +
                      'pad=%s\n' % self.get_value(pad) +
                      g +
                      w +
                      'activation=%s\n' % act)

    def deconvolutional(self, cv):
        self.blocks[self.current] += 1

        self.get_state_dict(cv.state_dict())

        filters = cv.out_channels
        size = cv.kernel_size
        stride = cv.stride
        pad = cv.padding
        groups = cv.groups
        bias = cv.bias

        g = 'groups=%d\n' % groups if groups > 1 else ''
        w = 'bias=0\n' if bias is None else ''

        self.fc.write('\n[deconvolutional]\n' +
                      'filters=%d\n' % filters +
|
||||
'size=%s\n' % self.get_value(size) +
|
||||
'stride=%s\n' % self.get_value(stride) +
|
||||
'pad=%s\n' % self.get_value(pad) +
|
||||
g +
|
||||
w)
|
||||
|
||||
def route(self, layers, axis=0):
|
||||
self.blocks[self.current] += 1
|
||||
|
||||
a = 'axis=%d\n' % axis if axis != 0 else ''
|
||||
|
||||
self.fc.write('\n[route]\n' +
|
||||
'layers=%s\n' % layers +
|
||||
a)
|
||||
|
||||
def shortcut(self, r, ew='add', act='linear'):
|
||||
self.blocks[self.current] += 1
|
||||
|
||||
m = 'mode=mul\n' if ew == 'mul' else ''
|
||||
|
||||
self.fc.write('\n[shortcut]\n' +
|
||||
'from=%d\n' % r +
|
||||
m +
|
||||
'activation=%s\n' % act)
|
||||
|
||||
def maxpool(self, m):
|
||||
self.blocks[self.current] += 1
|
||||
|
||||
stride = m.stride
|
||||
size = m.kernel_size
|
||||
mode = m.ceil_mode
|
||||
|
||||
m = 'maxpool_up' if mode else 'maxpool'
|
||||
|
||||
self.fc.write('\n[%s]\n' % m +
|
||||
'stride=%d\n' % stride +
|
||||
'size=%d\n' % size)
|
||||
|
||||
def shuffle(self, reshape=None, transpose1=None, transpose2=None):
|
||||
self.blocks[self.current] += 1
|
||||
|
||||
r = 'reshape=%s\n' % ', '.join(str(x) for x in reshape) if reshape is not None else ''
|
||||
t1 = 'transpose1=%s\n' % ', '.join(str(x) for x in transpose1) if transpose1 is not None else ''
|
||||
t2 = 'transpose2=%s\n' % ', '.join(str(x) for x in transpose2) if transpose2 is not None else ''
|
||||
|
||||
self.fc.write('\n[shuffle]\n' +
|
||||
r +
|
||||
t1 +
|
||||
t2)
|
||||
|
||||
def softmax(self, axes):
|
||||
self.blocks[self.current] += 1
|
||||
|
||||
self.fc.write('\n[softmax]\n' +
|
||||
'axes=%d\n' % axes)
|
||||
|
||||
def yolo(self, output):
|
||||
self.blocks[self.current] += 1
|
||||
|
||||
self.fc.write('\n[%s]\n' % output)
|
||||
|
||||
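    # A note on the .wts layout produced by the two methods below: each tensor
    # is one line of 'key count' followed by every float32 value hex-encoded
    # big-endian; the total line count (self.wc) is prepended as a header by
    # the echo/cat call at the bottom of the script.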
    def get_state_dict(self, state_dict):
        for k, v in state_dict.items():
            if 'num_batches_tracked' not in k:
                vr = v.reshape(-1).numpy()
                self.fw.write('{} {} '.format(k, len(vr)))
                for vv in vr:
                    self.fw.write(' ')
                    self.fw.write(struct.pack('>f', float(vv)).hex())
                self.fw.write('\n')
                self.wc += 1

    def get_anchors(self, anchor_points, stride_tensor):
        vr = anchor_points.numpy()
        self.fw.write('{} {} '.format('anchor_points', len(vr)))
        for vv in vr:
            self.fw.write(' ')
            self.fw.write(struct.pack('>f', float(vv)).hex())
        self.fw.write('\n')
        self.wc += 1
        vr = stride_tensor.numpy()
        self.fw.write('{} {} '.format('stride_tensor', len(vr)))
        for vv in vr:
            self.fw.write(' ')
            self.fw.write(struct.pack('>f', float(vv)).hex())
        self.fw.write('\n')
        self.wc += 1

    def get_value(self, key):
        if isinstance(key, int):
            return key
        return key[0] if key[0] == key[1] else str(key)[1:-1]
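    # get_route turns a high-level block index into the absolute index of the
    # last cfg layer emitted for that block, by accumulating the per-block
    # layer counts kept in self.blocks.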
    def get_route(self, n):
        r = 0
        for i, b in enumerate(self.blocks):
            if i <= n:
                r += b
            else:
                break
        return r - 1

    def get_activation(self, act):
        if act == 'Hardswish':
            return 'hardswish'
        elif act == 'LeakyReLU':
            return 'leaky'
        elif act == 'SiLU':
            return 'silu'
        elif act == 'ReLU':
            return 'relu'
        return 'linear'


def parse_args():
    parser = argparse.ArgumentParser(description='PyTorch YOLOv6 conversion')
    parser.add_argument('-w', '--weights', required=True, help='Input weights (.pt) file path (required)')
    parser.add_argument(
        '-s', '--size', nargs='+', type=int, help='Inference size [H,W] (default [640])')
    parser.add_argument('--p6', action='store_true', help='P6 model')
    args = parser.parse_args()
    if not os.path.isfile(args.weights):
        raise SystemExit('Invalid weights file')
    if not args.size:
        args.size = [1280] if args.p6 else [640]
    return args.weights, args.size


pt_file, inference_size = parse_args()

model_name = os.path.basename(pt_file).split('.pt')[0]
wts_file = model_name + '.wts' if 'yolov6' in model_name else 'yolov6_' + model_name + '.wts'
cfg_file = model_name + '.cfg' if 'yolov6' in model_name else 'yolov6_' + model_name + '.cfg'

model = torch.load(pt_file, map_location='cpu')['model'].float()
model.to('cpu').eval()

for layer in model.modules():
    if layer._get_name() == 'RepVGGBlock':
        layer.switch_to_deploy()

backbones = ['EfficientRep', 'CSPBepBackbone']
necks = ['RepBiFPANNeck', 'CSPRepBiFPANNeck', 'RepPANNeck', 'CSPRepPANNeck']
backbones_p6 = ['EfficientRep6', 'CSPBepBackbone_P6']
necks_p6 = ['RepBiFPANNeck6', 'CSPRepBiFPANNeck_P6', 'RepPANNeck6', 'CSPRepPANNeck_P6']

with open(wts_file, 'w') as fw, open(cfg_file, 'w') as fc:
    layers = Layers(inference_size, fw, fc)

    if model.backbone._get_name() in backbones:
        layers.fc.write('\n# %s\n' % model.backbone._get_name())

        if model.backbone._get_name() == 'EfficientRep':
            block1 = layers.RepBlock
        elif model.backbone._get_name() == 'CSPBepBackbone':
            block1 = layers.BepC3

        if model.backbone.ERBlock_5[2]._get_name() == 'CSPSPPF' or model.backbone.ERBlock_5[2]._get_name() == 'SimCSPSPPF':
            block2 = layers.CSPSPPF
        elif model.backbone.ERBlock_5[2]._get_name() == 'SPPF' or model.backbone.ERBlock_5[2]._get_name() == 'SimSPPF':
            block2 = layers.SPPF
        else:
            raise SystemExit('Model not supported')

        layers.BaseConv(model.backbone.stem)
        layers.BaseConv(model.backbone.ERBlock_2[0])
        block1(model.backbone.ERBlock_2[1], 'backbone' if hasattr(model.backbone, 'fuse_P2') and
               model.backbone.fuse_P2 else '')
        layers.BaseConv(model.backbone.ERBlock_3[0])
        block1(model.backbone.ERBlock_3[1], 'backbone')
        layers.BaseConv(model.backbone.ERBlock_4[0])
        block1(model.backbone.ERBlock_4[1], 'backbone')
        layers.BaseConv(model.backbone.ERBlock_5[0])
        block1(model.backbone.ERBlock_5[1])
        block2(model.backbone.ERBlock_5[2])

    elif model.backbone._get_name() in backbones_p6:
        layers.fc.write('\n# %s\n' % model.backbone._get_name())

        if model.backbone._get_name() == 'EfficientRep6':
            block1 = layers.RepBlock
        elif model.backbone._get_name() == 'CSPBepBackbone_P6':
            block1 = layers.BepC3

        if model.backbone.ERBlock_6[2]._get_name() == 'CSPSPPF' or model.backbone.ERBlock_6[2]._get_name() == 'SimCSPSPPF':
            block2 = layers.CSPSPPF
        elif model.backbone.ERBlock_6[2]._get_name() == 'SPPF' or model.backbone.ERBlock_6[2]._get_name() == 'SimSPPF':
            block2 = layers.SPPF
        else:
            raise SystemExit('Model not supported')

        layers.BaseConv(model.backbone.stem)
        layers.BaseConv(model.backbone.ERBlock_2[0])
        block1(model.backbone.ERBlock_2[1], 'backbone' if model.backbone._get_name() == 'CSPBepBackbone_P6' or
               (hasattr(model.backbone, 'fuse_P2') and model.backbone.fuse_P2) else '')
        layers.BaseConv(model.backbone.ERBlock_3[0])
        block1(model.backbone.ERBlock_3[1], 'backbone')
        layers.BaseConv(model.backbone.ERBlock_4[0])
        block1(model.backbone.ERBlock_4[1], 'backbone')
        layers.BaseConv(model.backbone.ERBlock_5[0])
        block1(model.backbone.ERBlock_5[1], 'backbone')
        layers.BaseConv(model.backbone.ERBlock_6[0])
        block1(model.backbone.ERBlock_6[1])
        block2(model.backbone.ERBlock_6[2])

    else:
        raise SystemExit('Model not supported')

    if model.neck._get_name() in necks:
        layers.fc.write('\n# %s\n' % model.neck._get_name())

        if model.neck._get_name() == 'RepBiFPANNeck' or model.neck._get_name() == 'RepPANNeck':
            block = layers.RepBlock
        elif model.neck._get_name() == 'CSPRepBiFPANNeck' or model.neck._get_name() == 'CSPRepPANNeck':
            block = layers.BepC3

        layers.SimConv(model.neck.reduce_layer0, 'fpn')
        if 'Bi' in model.neck._get_name():
            layers.BiFusion(model.neck.Bifusion0, 0)
        else:
            layers.Upsample(model.neck.upsample0)
            layers.Concat(layers.backbone_outs[-2])
        block(model.neck.Rep_p4)
        layers.SimConv(model.neck.reduce_layer1, 'fpn')
        if 'Bi' in model.neck._get_name():
            layers.BiFusion(model.neck.Bifusion1, 1)
        else:
            layers.Upsample(model.neck.upsample1)
            layers.Concat(layers.backbone_outs[-3])
        block(model.neck.Rep_p3, 'pan')
        layers.SimConv(model.neck.downsample2)
        layers.Concat(layers.fpn_feats[1])
        block(model.neck.Rep_n3, 'pan')
        layers.SimConv(model.neck.downsample1)
        layers.Concat(layers.fpn_feats[0])
        block(model.neck.Rep_n4, 'pan')
        layers.pan_feats = layers.pan_feats[::-1]

    elif model.neck._get_name() in necks_p6:
        layers.fc.write('\n# %s\n' % model.neck._get_name())

        if model.neck._get_name() == 'RepBiFPANNeck6' or model.neck._get_name() == 'RepPANNeck6':
            block = layers.RepBlock
        elif model.neck._get_name() == 'CSPRepBiFPANNeck_P6' or model.neck._get_name() == 'CSPRepPANNeck_P6':
            block = layers.BepC3

        layers.SimConv(model.neck.reduce_layer0, 'fpn')
        if 'Bi' in model.neck._get_name():
            layers.BiFusion(model.neck.Bifusion0, 0)
        else:
            layers.Upsample(model.neck.upsample0)
            layers.Concat(layers.backbone_outs[-2])
        block(model.neck.Rep_p5)
        layers.SimConv(model.neck.reduce_layer1, 'fpn')
        if 'Bi' in model.neck._get_name():
            layers.BiFusion(model.neck.Bifusion1, 1)
        else:
            layers.Upsample(model.neck.upsample1)
            layers.Concat(layers.backbone_outs[-3])
        block(model.neck.Rep_p4)
        layers.SimConv(model.neck.reduce_layer2, 'fpn')
        if 'Bi' in model.neck._get_name():
            layers.BiFusion(model.neck.Bifusion2, 2)
        else:
            layers.Upsample(model.neck.upsample2)
            layers.Concat(layers.backbone_outs[-4])
        block(model.neck.Rep_p3, 'pan')
        layers.SimConv(model.neck.downsample2)
        layers.Concat(layers.fpn_feats[2])
        block(model.neck.Rep_n4, 'pan')
        layers.SimConv(model.neck.downsample1)
        layers.Concat(layers.fpn_feats[1])
        block(model.neck.Rep_n5, 'pan')
        layers.SimConv(model.neck.downsample0)
        layers.Concat(layers.fpn_feats[0])
        block(model.neck.Rep_n6, 'pan')
        layers.pan_feats = layers.pan_feats[::-1]

    else:
        raise SystemExit('Model not supported')

    if model.detect._get_name() == 'Detect':
        layers.fc.write('\n# Detect\n')

        for i, feat in enumerate(layers.pan_feats):
            idx = len(layers.pan_feats) - i - 1
            if i > 0:
                layers.Route(feat)
            layers.Conv(model.detect.stems[idx])
            layers.Conv(model.detect.cls_convs[idx])
            layers.Conv(model.detect.cls_preds[idx], act='sigmoid')
            layers.Shuffle(reshape=[model.detect.nc, 'hw'], output='cls')
            layers.Route(-4)
            layers.Conv(model.detect.reg_convs[idx])
            layers.Conv(model.detect.reg_preds[idx])
            if model.detect.use_dfl:
                layers.Shuffle(reshape=[4, model.detect.reg_max + 1, 'hw'], transpose2=[1, 0, 2])
                layers.SoftMax(0)
                layers.Conv(model.detect.proj_conv)
                layers.Shuffle(reshape=['h', 'w'], output='reg')
            else:
                layers.Shuffle(reshape=[4, 'hw'], output='reg')
        layers.Detect('cls')
        layers.Detect('reg')

        x = []
        for stride in model.detect.stride.tolist()[::-1]:
            x.append(torch.zeros([1, 1, int(layers.height / stride), int(layers.width / stride)], dtype=torch.float32))
        anchor_points, stride_tensor = generate_anchors(x, model.detect.stride.flip((0,)), model.detect.grid_cell_size,
                                                        model.detect.grid_cell_offset, device='cpu', is_eval=True, mode='af')
        layers.get_anchors(anchor_points.reshape([-1]), stride_tensor)

    else:
        raise SystemExit('Model not supported')

os.system('echo "%d" | cat - %s > temp && mv temp %s' % (layers.wc, wts_file, wts_file))
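# Usage sketch (the script filename is an assumption; run it from a YOLOv6
# repo checkout so the model modules and generate_anchors are importable):
#   python3 export_yoloV6.py -w yolov6s.pt -s 640
# It writes <model_name>.wts and <model_name>.cfg into the working directory.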
@@ -1,357 +0,0 @@
import argparse
import os
import struct
import torch
from utils.torch_utils import select_device


class Layers(object):
    def __init__(self, n, size, fw, fc):
        self.blocks = [0 for _ in range(n)]
        self.current = 0

        self.width = size[0] if len(size) == 1 else size[1]
        self.height = size[0]

        self.num = 0
        self.nc = 0
        self.anchors = ''
        self.masks = []

        self.fw = fw
        self.fc = fc
        self.wc = 0

        self.net()

    def ReOrg(self, child):
        self.current = child.i
        self.fc.write('\n# ReOrg\n')

        self.reorg()

    def Conv(self, child):
        self.current = child.i
        self.fc.write('\n# Conv\n')

        if child.f != -1:
            r = self.get_route(child.f)
            self.route('%d' % r)
        self.convolutional(child)

    def DownC(self, child):
        self.current = child.i
        self.fc.write('\n# DownC\n')

        self.maxpool(child.mp)
        self.convolutional(child.cv3)
        self.route('-3')
        self.convolutional(child.cv1)
        self.convolutional(child.cv2)
        self.route('-1, -4')

    def MP(self, child):
        self.current = child.i
        self.fc.write('\n# MP\n')

        self.maxpool(child.m)

    def SP(self, child):
        self.current = child.i
        self.fc.write('\n# SP\n')

        if child.f != -1:
            r = self.get_route(child.f)
            self.route('%d' % r)
        self.maxpool(child.m)

    def SPPCSPC(self, child):
        self.current = child.i
        self.fc.write('\n# SPPCSPC\n')

        self.convolutional(child.cv2)
        self.route('-2')
        self.convolutional(child.cv1)
        self.convolutional(child.cv3)
        self.convolutional(child.cv4)
        self.maxpool(child.m[0])
        self.route('-2')
        self.maxpool(child.m[1])
        self.route('-4')
        self.maxpool(child.m[2])
        self.route('-6, -5, -3, -1')
        self.convolutional(child.cv5)
        self.convolutional(child.cv6)
        self.route('-1, -13')
        self.convolutional(child.cv7)

    def RepConv(self, child):
        self.current = child.i
        self.fc.write('\n# RepConv\n')

        if child.f != -1:
            r = self.get_route(child.f)
            self.route('%d' % r)
        self.convolutional(child.rbr_1x1)
        self.route('-2')
        self.convolutional(child.rbr_dense)
        self.shortcut(-3, act=self.get_activation(child.act._get_name()))

    def Upsample(self, child):
        self.current = child.i
        self.fc.write('\n# Upsample\n')

        self.upsample(child)

    def Concat(self, child):
        self.current = child.i
        self.fc.write('\n# Concat\n')

        r = []
        for i in range(1, len(child.f)):
            r.append(self.get_route(child.f[i]))
        self.route('-1, %s' % str(r)[1:-1])

    def Shortcut(self, child):
        self.current = child.i
        self.fc.write('\n# Shortcut\n')

        r = self.get_route(child.f[1])
        self.shortcut(r)

    def Detect(self, child):
        self.current = child.i
        self.fc.write('\n# Detect\n')

        self.get_anchors(child.state_dict(), child.m[0].out_channels)

        for i, m in enumerate(child.m):
            r = self.get_route(child.f[i])
            self.route('%d' % r)
            self.convolutional(m, detect=True)
            self.yolo(i)

    def net(self):
        self.fc.write('[net]\n' +
                      'width=%d\n' % self.width +
                      'height=%d\n' % self.height +
                      'channels=3\n' +
                      'letter_box=1\n')

    def reorg(self):
        self.blocks[self.current] += 1

        self.fc.write('\n[reorg]\n')

    def convolutional(self, cv, act=None, detect=False):
        self.blocks[self.current] += 1

        self.get_state_dict(cv.state_dict())

        if cv._get_name() == 'Conv2d':
            filters = cv.out_channels
            size = cv.kernel_size
            stride = cv.stride
            pad = cv.padding
            groups = cv.groups
            bias = cv.bias
            bn = False
            act = 'linear' if not detect else 'logistic'
        elif cv._get_name() == 'Sequential':
            filters = cv[0].out_channels
            size = cv[0].kernel_size
            stride = cv[0].stride
            pad = cv[0].padding
            groups = cv[0].groups
            bias = cv[0].bias
            bn = True if cv[1]._get_name() == 'BatchNorm2d' else False
            act = 'linear'
        else:
            filters = cv.conv.out_channels
            size = cv.conv.kernel_size
            stride = cv.conv.stride
            pad = cv.conv.padding
            groups = cv.conv.groups
            bias = cv.conv.bias
            bn = True if hasattr(cv, 'bn') else False
            if act is None:
                act = self.get_activation(cv.act._get_name()) if hasattr(cv, 'act') else 'linear'

        b = 'batch_normalize=1\n' if bn is True else ''
        g = 'groups=%d\n' % groups if groups > 1 else ''
        w = 'bias=1\n' if bias is not None and bn is not False else 'bias=0\n' if bias is None and bn is False else ''

        self.fc.write('\n[convolutional]\n' +
                      b +
                      'filters=%d\n' % filters +
                      'size=%s\n' % self.get_value(size) +
                      'stride=%s\n' % self.get_value(stride) +
                      'pad=%s\n' % self.get_value(pad) +
                      g +
                      w +
                      'activation=%s\n' % act)

    def route(self, layers):
        self.blocks[self.current] += 1

        self.fc.write('\n[route]\n' +
                      'layers=%s\n' % layers)

    def shortcut(self, r, act='linear'):
        self.blocks[self.current] += 1

        self.fc.write('\n[shortcut]\n' +
                      'from=%d\n' % r +
                      'activation=%s\n' % act)

    def maxpool(self, m):
        self.blocks[self.current] += 1

        stride = m.stride
        size = m.kernel_size
        mode = m.ceil_mode

        m = 'maxpool_up' if mode else 'maxpool'

        self.fc.write('\n[%s]\n' % m +
                      'stride=%d\n' % stride +
                      'size=%d\n' % size)

    def upsample(self, child):
        self.blocks[self.current] += 1

        stride = child.scale_factor

        self.fc.write('\n[upsample]\n' +
                      'stride=%d\n' % stride)

    def yolo(self, i):
        self.blocks[self.current] += 1

        self.fc.write('\n[yolo]\n' +
                      'mask=%s\n' % self.masks[i] +
                      'anchors=%s\n' % self.anchors +
                      'classes=%d\n' % self.nc +
                      'num=%d\n' % self.num +
                      'scale_x_y=2.0\n' +
                      'new_coords=1\n')

    def get_state_dict(self, state_dict):
        for k, v in state_dict.items():
            if 'num_batches_tracked' not in k:
                vr = v.reshape(-1).numpy()
                self.fw.write('{} {} '.format(k, len(vr)))
                for vv in vr:
                    self.fw.write(' ')
                    self.fw.write(struct.pack('>f', float(vv)).hex())
                self.fw.write('\n')
                self.wc += 1

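    # Recovers the static anchor setup from the Detect buffers: 'num' is the
    # total number of anchor pairs, 'nc' follows from the head's output
    # channels (per-anchor channels minus the 5 box/objectness terms), and
    # each scale receives a mask of consecutive anchor indices.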
    def get_anchors(self, state_dict, out_channels):
        anchor_grid = state_dict['anchor_grid']
        aa = anchor_grid.reshape(-1).tolist()
        am = anchor_grid.tolist()

        self.num = len(aa) // 2
        self.nc = int((out_channels / (self.num / len(am))) - 5)
        self.anchors = str(aa)[1:-1]

        n = 0
        for m in am:
            mask = []
            for _ in range(len(m)):
                mask.append(n)
                n += 1
            self.masks.append(str(mask)[1:-1])

    def get_value(self, key):
        if isinstance(key, int):
            return key
        return key[0] if key[0] == key[1] else str(key)[1:-1]

    def get_route(self, n):
        r = 0
        if n < 0:
            for i, b in enumerate(self.blocks[self.current - 1::-1]):
                if i < abs(n) - 1:
                    r -= b
                else:
                    break
        else:
            for i, b in enumerate(self.blocks):
                if i <= n:
                    r += b
                else:
                    break
        return r - 1

    def get_activation(self, act):
        if act == 'Hardswish':
            return 'hardswish'
        elif act == 'LeakyReLU':
            return 'leaky'
        elif act == 'SiLU':
            return 'silu'
        return 'linear'


def parse_args():
    parser = argparse.ArgumentParser(description='PyTorch YOLOv7 conversion')
    parser.add_argument('-w', '--weights', required=True, help='Input weights (.pt) file path (required)')
    parser.add_argument(
        '-s', '--size', nargs='+', type=int, help='Inference size [H,W] (default [640])')
    parser.add_argument('--p6', action='store_true', help='P6 model')
    args = parser.parse_args()
    if not os.path.isfile(args.weights):
        raise SystemExit('Invalid weights file')
    if not args.size:
        args.size = [1280] if args.p6 else [640]
    return args.weights, args.size


pt_file, inference_size = parse_args()

model_name = os.path.basename(pt_file).split('.pt')[0]
wts_file = model_name + '.wts' if 'yolov7' in model_name else 'yolov7_' + model_name + '.wts'
cfg_file = model_name + '.cfg' if 'yolov7' in model_name else 'yolov7_' + model_name + '.cfg'

device = select_device('cpu')
model = torch.load(pt_file, map_location=device)
model = model['ema' if model.get('ema') else 'model'].float()

anchor_grid = model.model[-1].anchors * model.model[-1].stride[..., None, None]
delattr(model.model[-1], 'anchor_grid')
model.model[-1].register_buffer('anchor_grid', anchor_grid)

model.to(device).eval()

with open(wts_file, 'w') as fw, open(cfg_file, 'w') as fc:
    layers = Layers(len(model.model), inference_size, fw, fc)

    for child in model.model.children():
        if child._get_name() == 'ReOrg':
            layers.ReOrg(child)
        elif child._get_name() == 'Conv':
            layers.Conv(child)
        elif child._get_name() == 'DownC':
            layers.DownC(child)
        elif child._get_name() == 'MP':
            layers.MP(child)
        elif child._get_name() == 'SP':
            layers.SP(child)
        elif child._get_name() == 'SPPCSPC':
            layers.SPPCSPC(child)
        elif child._get_name() == 'RepConv':
            layers.RepConv(child)
        elif child._get_name() == 'Upsample':
            layers.Upsample(child)
        elif child._get_name() == 'Concat':
            layers.Concat(child)
        elif child._get_name() == 'Shortcut':
            layers.Shortcut(child)
        elif child._get_name() == 'Detect':
            layers.Detect(child)
        else:
            raise SystemExit('Model not supported')

os.system('echo "%d" | cat - %s > temp && mv temp %s' % (layers.wc, wts_file, wts_file))
@@ -1,323 +0,0 @@
import argparse
import os
import struct
import torch
from ultralytics.yolo.utils.torch_utils import select_device
from ultralytics.yolo.utils.tal import make_anchors


class Layers(object):
    def __init__(self, n, size, fw, fc):
        self.blocks = [0 for _ in range(n)]
        self.current = -1

        self.width = size[0] if len(size) == 1 else size[1]
        self.height = size[0]

        self.fw = fw
        self.fc = fc
        self.wc = 0

        self.net()

    def Conv(self, child):
        self.current = child.i
        self.fc.write('\n# Conv\n')

        self.convolutional(child)

    def C2f(self, child):
        self.current = child.i
        self.fc.write('\n# C2f\n')

        self.convolutional(child.cv1)
        self.c2f(child.m)
        self.convolutional(child.cv2)

    def SPPF(self, child):
        self.current = child.i
        self.fc.write('\n# SPPF\n')

        self.convolutional(child.cv1)
        self.maxpool(child.m)
        self.maxpool(child.m)
        self.maxpool(child.m)
        self.route('-4, -3, -2, -1')
        self.convolutional(child.cv2)

    def Upsample(self, child):
        self.current = child.i
        self.fc.write('\n# Upsample\n')

        self.upsample(child)

    def Concat(self, child):
        self.current = child.i
        self.fc.write('\n# Concat\n')

        r = []
        for i in range(1, len(child.f)):
            r.append(self.get_route(child.f[i]))
        self.route('-1, %s' % str(r)[1:-1])

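    # Per scale: route to the input feature, emit the cv3 branch, route back,
    # emit the cv2 branch, then concat both and flatten to (no, -1). The final
    # route stacks every scale on the channel axis for the [detect_v8] block.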
    def Detect(self, child):
        self.current = child.i
        self.fc.write('\n# Detect\n')

        output_idxs = [0 for _ in range(child.nl)]
        for i in range(child.nl):
            r = self.get_route(child.f[i])
            self.route('%d' % r)
            for j in range(len(child.cv3[i])):
                self.convolutional(child.cv3[i][j])
            self.route('%d' % (-1 - len(child.cv3[i])))
            for j in range(len(child.cv2[i])):
                self.convolutional(child.cv2[i][j])
            self.route('-1, %d' % (-2 - len(child.cv2[i])))
            self.shuffle(reshape=[child.no, -1])
            output_idxs[i] = (-1 + i * (-4 - len(child.cv3[i]) - len(child.cv2[i])))
        self.route('%s' % str(output_idxs[::-1])[1:-1], axis=1)
        self.yolo(child)

    def net(self):
        self.fc.write('[net]\n' +
                      'width=%d\n' % self.width +
                      'height=%d\n' % self.height +
                      'channels=3\n' +
                      'letter_box=1\n')

    def convolutional(self, cv, act=None, detect=False):
        self.blocks[self.current] += 1

        self.get_state_dict(cv.state_dict())

        if cv._get_name() == 'Conv2d':
            filters = cv.out_channels
            size = cv.kernel_size
            stride = cv.stride
            pad = cv.padding
            groups = cv.groups
            bias = cv.bias
            bn = False
            act = 'linear' if not detect else 'logistic'
        else:
            filters = cv.conv.out_channels
            size = cv.conv.kernel_size
            stride = cv.conv.stride
            pad = cv.conv.padding
            groups = cv.conv.groups
            bias = cv.conv.bias
            bn = True if hasattr(cv, 'bn') else False
            if act is None:
                act = self.get_activation(cv.act._get_name()) if hasattr(cv, 'act') else 'linear'

        b = 'batch_normalize=1\n' if bn is True else ''
        g = 'groups=%d\n' % groups if groups > 1 else ''
        w = 'bias=1\n' if bias is not None and bn is not False else 'bias=0\n' if bias is None and bn is False else ''

        self.fc.write('\n[convolutional]\n' +
                      b +
                      'filters=%d\n' % filters +
                      'size=%s\n' % self.get_value(size) +
                      'stride=%s\n' % self.get_value(stride) +
                      'pad=%s\n' % self.get_value(pad) +
                      g +
                      w +
                      'activation=%s\n' % act)

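    # C2f is written as one fused [c2f] cfg block: the weights of all n
    # bottlenecks are streamed into the .wts file, while the header carries n,
    # the shortcut flag, and conv parameters read from the first bottleneck's
    # cv1 (this assumes every bottleneck shares the same geometry).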
    def c2f(self, m):
        self.blocks[self.current] += 1

        for x in m:
            self.get_state_dict(x.state_dict())

        n = len(m)
        shortcut = 1 if m[0].add else 0
        filters = m[0].cv1.conv.out_channels
        size = m[0].cv1.conv.kernel_size
        stride = m[0].cv1.conv.stride
        pad = m[0].cv1.conv.padding
        groups = m[0].cv1.conv.groups
        bias = m[0].cv1.conv.bias
        bn = True if hasattr(m[0].cv1, 'bn') else False
        act = 'linear'
        if hasattr(m[0].cv1, 'act'):
            act = self.get_activation(m[0].cv1.act._get_name())

        b = 'batch_normalize=1\n' if bn is True else ''
        g = 'groups=%d\n' % groups if groups > 1 else ''
        w = 'bias=1\n' if bias is not None and bn is not False else 'bias=0\n' if bias is None and bn is False else ''

        self.fc.write('\n[c2f]\n' +
                      'n=%d\n' % n +
                      'shortcut=%d\n' % shortcut +
                      b +
                      'filters=%d\n' % filters +
                      'size=%s\n' % self.get_value(size) +
                      'stride=%s\n' % self.get_value(stride) +
                      'pad=%s\n' % self.get_value(pad) +
                      g +
                      w +
                      'activation=%s\n' % act)

    def route(self, layers, axis=0):
        self.blocks[self.current] += 1

        a = 'axis=%d\n' % axis if axis != 0 else ''

        self.fc.write('\n[route]\n' +
                      'layers=%s\n' % layers +
                      a)

    def shortcut(self, r, ew='add', act='linear'):
        self.blocks[self.current] += 1

        m = 'mode=mul\n' if ew == 'mul' else ''

        self.fc.write('\n[shortcut]\n' +
                      'from=%d\n' % r +
                      m +
                      'activation=%s\n' % act)

    def maxpool(self, m):
        self.blocks[self.current] += 1

        stride = m.stride
        size = m.kernel_size
        mode = m.ceil_mode

        m = 'maxpool_up' if mode else 'maxpool'

        self.fc.write('\n[%s]\n' % m +
                      'stride=%d\n' % stride +
                      'size=%d\n' % size)

    def upsample(self, child):
        self.blocks[self.current] += 1

        stride = child.scale_factor

        self.fc.write('\n[upsample]\n' +
                      'stride=%d\n' % stride)

    def shuffle(self, reshape=None, transpose1=None, transpose2=None):
        self.blocks[self.current] += 1

        r = 'reshape=%s\n' % ', '.join(str(x) for x in reshape) if reshape is not None else ''
        t1 = 'transpose1=%s\n' % ', '.join(str(x) for x in transpose1) if transpose1 is not None else ''
        t2 = 'transpose2=%s\n' % ', '.join(str(x) for x in transpose2) if transpose2 is not None else ''

        self.fc.write('\n[shuffle]\n' +
                      r +
                      t1 +
                      t2)

    def yolo(self, child):
        self.blocks[self.current] += 1

        self.fc.write('\n[detect_v8]\n' +
                      'num=%d\n' % (child.reg_max * 4) +
                      'classes=%d\n' % child.nc)

    def get_state_dict(self, state_dict):
        for k, v in state_dict.items():
            if 'num_batches_tracked' not in k:
                vr = v.reshape(-1).numpy()
                self.fw.write('{} {} '.format(k, len(vr)))
                for vv in vr:
                    self.fw.write(' ')
                    self.fw.write(struct.pack('>f', float(vv)).hex())
                self.fw.write('\n')
                self.wc += 1

    def get_anchors(self, anchor_points, stride_tensor):
        vr = anchor_points.numpy()
        self.fw.write('{} {} '.format('anchor_points', len(vr)))
        for vv in vr:
            self.fw.write(' ')
            self.fw.write(struct.pack('>f', float(vv)).hex())
        self.fw.write('\n')
        self.wc += 1
        vr = stride_tensor.numpy()
        self.fw.write('{} {} '.format('stride_tensor', len(vr)))
        for vv in vr:
            self.fw.write(' ')
            self.fw.write(struct.pack('>f', float(vv)).hex())
        self.fw.write('\n')
        self.wc += 1

    def get_value(self, key):
        if isinstance(key, int):
            return key
        return key[0] if key[0] == key[1] else str(key)[1:-1]

    def get_route(self, n):
        r = 0
        for i, b in enumerate(self.blocks):
            if i <= n:
                r += b
            else:
                break
        return r - 1

    def get_activation(self, act):
        if act == 'Hardswish':
            return 'hardswish'
        elif act == 'LeakyReLU':
            return 'leaky'
        elif act == 'SiLU':
            return 'silu'
        return 'linear'


def parse_args():
    parser = argparse.ArgumentParser(description='PyTorch YOLOv8 conversion')
    parser.add_argument('-w', '--weights', required=True, help='Input weights (.pt) file path (required)')
    parser.add_argument(
        '-s', '--size', nargs='+', type=int, default=[640], help='Inference size [H,W] (default [640])')
    args = parser.parse_args()
    if not os.path.isfile(args.weights):
        raise SystemExit('Invalid weights file')
    return args.weights, args.size


pt_file, inference_size = parse_args()

model_name = os.path.basename(pt_file).split('.pt')[0]
wts_file = model_name + '.wts' if 'yolov8' in model_name else 'yolov8_' + model_name + '.wts'
cfg_file = model_name + '.cfg' if 'yolov8' in model_name else 'yolov8_' + model_name + '.cfg'

device = select_device('cpu')
model = torch.load(pt_file, map_location=device)['model'].float()
model.to(device).eval()

if model.names and model.nc:
    with open('labels.txt', 'w') as fw:
        for i in range(model.nc):
            fw.write(model.names[i] + '\n')

with open(wts_file, 'w') as fw, open(cfg_file, 'w') as fc:
    layers = Layers(len(model.model), inference_size, fw, fc)

    for child in model.model.children():
        if child._get_name() == 'Conv':
            layers.Conv(child)
        elif child._get_name() == 'C2f':
            layers.C2f(child)
        elif child._get_name() == 'SPPF':
            layers.SPPF(child)
        elif child._get_name() == 'Upsample':
            layers.Upsample(child)
        elif child._get_name() == 'Concat':
            layers.Concat(child)
        elif child._get_name() == 'Detect':
            layers.Detect(child)
            x = []
            for stride in model.stride.tolist():
                x.append(torch.zeros([1, 1, int(layers.height / stride), int(layers.width / stride)], dtype=torch.float32))
            anchor_points, stride_tensor = (x.transpose(0, 1) for x in make_anchors(x, child.stride, 0.5))
            layers.get_anchors(anchor_points.reshape([-1]), stride_tensor.reshape([-1]))
        else:
            raise SystemExit('Model not supported')

os.system('echo "%d" | cat - %s > temp && mv temp %s' % (layers.wc, wts_file, wts_file))
@@ -1,56 +0,0 @@
import argparse
import os
import struct
import torch
from utils.torch_utils import select_device
from models.models import Darknet


def parse_args():
    parser = argparse.ArgumentParser(description='PyTorch YOLOR conversion (main branch)')
    parser.add_argument('-w', '--weights', required=True, help='Input weights (.pt) file path (required)')
    parser.add_argument('-c', '--cfg', default='', help='Input cfg (.cfg) file path')
    args = parser.parse_args()
    if not os.path.isfile(args.weights):
        raise SystemExit('Invalid weights file')
    if args.cfg != '' and not os.path.isfile(args.cfg):
        raise SystemExit('Invalid cfg file')
    return args.weights, args.cfg


pt_file, cfg_file = parse_args()

model_name = os.path.basename(pt_file).split('.pt')[0]
wts_file = model_name + '.wts' if 'yolor' in model_name else 'yolor_' + model_name + '.wts'
new_cfg_file = model_name + '.cfg' if 'yolor' in model_name else 'yolor_' + model_name + '.cfg'

if cfg_file == '':
    cfg_file = 'cfg/' + model_name + '.cfg'
    if not os.path.isfile(cfg_file):
        raise SystemExit('CFG file not found')
elif not os.path.isfile(cfg_file):
    raise SystemExit('Invalid CFG file')

device = select_device('cpu')
model = Darknet(cfg_file).to(device)
model.load_state_dict(torch.load(pt_file, map_location=device)['model'])
model.to(device).eval()

with open(wts_file, 'w') as f:
    wts_write = ''
    conv_count = 0
    for k, v in model.state_dict().items():
        if 'num_batches_tracked' not in k:
            vr = v.reshape(-1).cpu().numpy()
            wts_write += '{} {} '.format(k, len(vr))
            for vv in vr:
                wts_write += ' '
                wts_write += struct.pack('>f', float(vv)).hex()
            wts_write += '\n'
            conv_count += 1
    f.write('{}\n'.format(conv_count))
    f.write(wts_write)

if not os.path.isfile(new_cfg_file):
    os.system('cp %s %s' % (cfg_file, new_cfg_file))
@@ -1,370 +0,0 @@
import argparse
import os
import struct
import torch
from yolox.exp import get_exp


class Layers(object):
    def __init__(self, size, fw, fc):
        self.blocks = [0 for _ in range(300)]
        self.current = -1

        self.width = size[0] if len(size) == 1 else size[1]
        self.height = size[0]

        self.backbone_outs = []
        self.fpn_feats = []
        self.pan_feats = []
        self.yolo_head = []

        self.fw = fw
        self.fc = fc
        self.wc = 0

        self.net()

    def Conv(self, child):
        self.current += 1

        if child._get_name() == 'DWConv':
            self.convolutional(child.dconv)
            self.convolutional(child.pconv)
        else:
            self.convolutional(child)

    def Focus(self, child):
        self.current += 1

        self.reorg()
        self.convolutional(child.conv)

    def BaseConv(self, child, stage='', act=None):
        self.current += 1

        self.convolutional(child, act=act)
        if stage == 'fpn':
            self.fpn_feats.append(self.current)

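    # idx tracks how many layers back the conv1 branch output sits so the
    # closing route can concat both CSP branches after a variable number of
    # bottlenecks; DWConv bottlenecks emit one extra conv, hence the larger
    # decrements.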
    def CSPLayer(self, child, stage=''):
        self.current += 1

        self.convolutional(child.conv2)
        self.route('-2')
        self.convolutional(child.conv1)
        idx = -3
        for m in child.m:
            if m.use_add:
                self.convolutional(m.conv1)
                if m.conv2._get_name() == 'DWConv':
                    self.convolutional(m.conv2.dconv)
                    self.convolutional(m.conv2.pconv)
                    self.shortcut(-4)
                    idx -= 4
                else:
                    self.convolutional(m.conv2)
                    self.shortcut(-3)
                    idx -= 3
            else:
                self.convolutional(m.conv1)
                if m.conv2._get_name() == 'DWConv':
                    self.convolutional(m.conv2.dconv)
                    self.convolutional(m.conv2.pconv)
                    idx -= 3
                else:
                    self.convolutional(m.conv2)
                    idx -= 2
        self.route('-1, %d' % idx)
        self.convolutional(child.conv3)
        if stage == 'backbone':
            self.backbone_outs.append(self.current)
        elif stage == 'pan':
            self.pan_feats.append(self.current)

    def SPPBottleneck(self, child):
        self.current += 1

        self.convolutional(child.conv1)
        self.maxpool(child.m[0])
        self.route('-2')
        self.maxpool(child.m[1])
        self.route('-4')
        self.maxpool(child.m[2])
        self.route('-6, -5, -3, -1')
        self.convolutional(child.conv2)

    def Upsample(self, child):
        self.current += 1

        self.upsample(child)

    def Concat(self, route):
        self.current += 1

        r = self.get_route(route)
        self.route('-1, %d' % r)

    def Route(self, route):
        self.current += 1

        if route > 0:
            r = self.get_route(route)
            self.route('%d' % r)
        else:
            self.route('%d' % route)

    def RouteShuffleOut(self, route):
        self.current += 1

        self.route(route)
        self.shuffle(reshape=['c', 'hw'])
        self.yolo_head.append(self.current)

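    # The per-scale outputs collected in yolo_head are concatenated along
    # axis 1, transposed so predictions come last, and handed to the
    # [detect_x] block together with the stride list for decoding.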
    def Detect(self, strides):
        self.current += 1

        routes = self.yolo_head[::-1]

        for i, route in enumerate(routes):
            routes[i] = self.get_route(route)
        self.route(str(routes)[1:-1], axis=1)
        self.shuffle(transpose1=[1, 0])
        self.yolo(strides)

    def net(self):
        self.fc.write('[net]\n' +
                      'width=%d\n' % self.width +
                      'height=%d\n' % self.height +
                      'channels=3\n' +
                      'letter_box=1\n')

    def reorg(self):
        self.blocks[self.current] += 1

        self.fc.write('\n[reorg]\n')

    def convolutional(self, cv, act=None, detect=False):
        self.blocks[self.current] += 1

        self.get_state_dict(cv.state_dict())

        if cv._get_name() == 'Conv2d':
            filters = cv.out_channels
            size = cv.kernel_size
            stride = cv.stride
            pad = cv.padding
            groups = cv.groups
            bias = cv.bias
            bn = False
            act = act if act is not None else 'linear'
        else:
            filters = cv.conv.out_channels
            size = cv.conv.kernel_size
            stride = cv.conv.stride
            pad = cv.conv.padding
            groups = cv.conv.groups
            bias = cv.conv.bias
            bn = True if hasattr(cv, 'bn') else False
            if act is None:
                act = self.get_activation(cv.act._get_name()) if hasattr(cv, 'act') else 'linear'

        b = 'batch_normalize=1\n' if bn is True else ''
        g = 'groups=%d\n' % groups if groups > 1 else ''
        w = 'bias=1\n' if bias is not None and bn is not False else 'bias=0\n' if bias is None and bn is False else ''

        self.fc.write('\n[convolutional]\n' +
                      b +
                      'filters=%d\n' % filters +
                      'size=%s\n' % self.get_value(size) +
                      'stride=%s\n' % self.get_value(stride) +
                      'pad=%s\n' % self.get_value(pad) +
                      g +
                      w +
                      'activation=%s\n' % act)

    def route(self, layers, axis=0):
        self.blocks[self.current] += 1

        a = 'axis=%d\n' % axis if axis != 0 else ''

        self.fc.write('\n[route]\n' +
                      'layers=%s\n' % layers +
                      a)

    def shortcut(self, r, ew='add', act='linear'):
        self.blocks[self.current] += 1

        m = 'mode=mul\n' if ew == 'mul' else ''

        self.fc.write('\n[shortcut]\n' +
                      'from=%d\n' % r +
                      m +
                      'activation=%s\n' % act)

    def maxpool(self, m):
        self.blocks[self.current] += 1

        stride = m.stride
        size = m.kernel_size
        mode = m.ceil_mode

        m = 'maxpool_up' if mode else 'maxpool'

        self.fc.write('\n[%s]\n' % m +
                      'stride=%d\n' % stride +
                      'size=%d\n' % size)

    def upsample(self, child):
        self.blocks[self.current] += 1

        stride = child.scale_factor

        self.fc.write('\n[upsample]\n' +
                      'stride=%d\n' % stride)

    def shuffle(self, reshape=None, transpose1=None, transpose2=None):
        self.blocks[self.current] += 1

        r = 'reshape=%s\n' % ', '.join(str(x) for x in reshape) if reshape is not None else ''
        t1 = 'transpose1=%s\n' % ', '.join(str(x) for x in transpose1) if transpose1 is not None else ''
        t2 = 'transpose2=%s\n' % ', '.join(str(x) for x in transpose2) if transpose2 is not None else ''

        self.fc.write('\n[shuffle]\n' +
                      r +
                      t1 +
                      t2)

    def yolo(self, strides):
        self.blocks[self.current] += 1

        self.fc.write('\n[detect_x]\n' +
                      'strides=%s\n' % str(strides)[1:-1])

    def get_state_dict(self, state_dict):
        for k, v in state_dict.items():
            if 'num_batches_tracked' not in k:
                vr = v.reshape(-1).numpy()
                self.fw.write('{} {} '.format(k, len(vr)))
                for vv in vr:
                    self.fw.write(' ')
                    self.fw.write(struct.pack('>f', float(vv)).hex())
                self.fw.write('\n')
                self.wc += 1

    def get_value(self, key):
        if isinstance(key, int):
            return key
        return key[0] if key[0] == key[1] else str(key)[1:-1]

    def get_route(self, n):
        r = 0
        for i, b in enumerate(self.blocks):
            if i <= n:
                r += b
            else:
                break
        return r - 1

    def get_activation(self, act):
        if act == 'Hardswish':
            return 'hardswish'
        elif act == 'LeakyReLU':
            return 'leaky'
        elif act == 'SiLU':
            return 'silu'
        return 'linear'


def parse_args():
    parser = argparse.ArgumentParser(description='PyTorch YOLOX conversion')
    parser.add_argument('-w', '--weights', required=True, help='Input weights (.pth) file path (required)')
    parser.add_argument('-e', '--exp', required=True, help='Input exp (.py) file path (required)')
    args = parser.parse_args()
    if not os.path.isfile(args.weights):
        raise SystemExit('Invalid weights file')
    if not os.path.isfile(args.exp):
        raise SystemExit('Invalid exp file')
    return args.weights, args.exp


pth_file, exp_file = parse_args()

exp = get_exp(exp_file)
model = exp.get_model()
model.load_state_dict(torch.load(pth_file, map_location='cpu')['model'])
model.to('cpu').eval()

model_name = exp.exp_name
inference_size = (exp.input_size[1], exp.input_size[0])

backbone = model.backbone._get_name()
head = model.head._get_name()

wts_file = model_name + '.wts' if 'yolox' in model_name else 'yolox_' + model_name + '.wts'
cfg_file = model_name + '.cfg' if 'yolox' in model_name else 'yolox_' + model_name + '.cfg'

with open(wts_file, 'w') as fw, open(cfg_file, 'w') as fc:
    layers = Layers(inference_size, fw, fc)

    if backbone == 'YOLOPAFPN':
        layers.fc.write('\n# YOLOPAFPN\n')

        layers.Focus(model.backbone.backbone.stem)
        layers.Conv(model.backbone.backbone.dark2[0])
        layers.CSPLayer(model.backbone.backbone.dark2[1])
        layers.Conv(model.backbone.backbone.dark3[0])
        layers.CSPLayer(model.backbone.backbone.dark3[1], 'backbone')
        layers.Conv(model.backbone.backbone.dark4[0])
        layers.CSPLayer(model.backbone.backbone.dark4[1], 'backbone')
        layers.Conv(model.backbone.backbone.dark5[0])
        layers.SPPBottleneck(model.backbone.backbone.dark5[1])
        layers.CSPLayer(model.backbone.backbone.dark5[2], 'backbone')
        layers.BaseConv(model.backbone.lateral_conv0, 'fpn')
        layers.Upsample(model.backbone.upsample)
        layers.Concat(layers.backbone_outs[1])
        layers.CSPLayer(model.backbone.C3_p4)
        layers.BaseConv(model.backbone.reduce_conv1, 'fpn')
        layers.Upsample(model.backbone.upsample)
        layers.Concat(layers.backbone_outs[0])
        layers.CSPLayer(model.backbone.C3_p3, 'pan')
        layers.Conv(model.backbone.bu_conv2)
        layers.Concat(layers.fpn_feats[1])
        layers.CSPLayer(model.backbone.C3_n3, 'pan')
        layers.Conv(model.backbone.bu_conv1)
        layers.Concat(layers.fpn_feats[0])
        layers.CSPLayer(model.backbone.C3_n4, 'pan')
        layers.pan_feats = layers.pan_feats[::-1]
    else:
        raise SystemExit('Model not supported')

    if head == 'YOLOXHead':
        layers.fc.write('\n# YOLOXHead\n')

        for i, feat in enumerate(layers.pan_feats):
            idx = len(layers.pan_feats) - i - 1
            dw = True if model.head.cls_convs[idx][0]._get_name() == 'DWConv' else False
            if i > 0:
                layers.Route(feat)
            layers.BaseConv(model.head.stems[idx])
            layers.Conv(model.head.cls_convs[idx][0])
            layers.Conv(model.head.cls_convs[idx][1])
            layers.BaseConv(model.head.cls_preds[idx], act='sigmoid')
            if dw:
                layers.Route(-6)
            else:
                layers.Route(-4)
            layers.Conv(model.head.reg_convs[idx][0])
            layers.Conv(model.head.reg_convs[idx][1])
            layers.BaseConv(model.head.obj_preds[idx], act='sigmoid')
            layers.Route(-2)
            layers.BaseConv(model.head.reg_preds[idx])
            if dw:
                layers.RouteShuffleOut('-1, -3, -9')
            else:
                layers.RouteShuffleOut('-1, -3, -7')
        layers.Detect(model.head.strides)
    else:
        raise SystemExit('Model not supported')

os.system('echo "%d" | cat - %s > temp && mv temp %s' % (layers.wc, wts_file, wts_file))