314 lines
10 KiB
Python
314 lines
10 KiB
Python
import argparse
|
|
import os
|
|
import struct
|
|
import torch
|
|
from ultralytics.yolo.utils.torch_utils import select_device
|
|
|
|
|
|
class Layers(object):
    """Emit a text network description and a text weights dump for a YOLOv8 model.

    Each public ``Conv``/``C2f``/``SPPF``/``Upsample``/``Concat``/``Detect`` method
    handles one top-level child of the model and writes one or more INI-style
    sections (``[convolutional]``, ``[route]``, ...) to ``fc``. Weights met along
    the way are written to ``fw`` as one line per tensor.

    Attributes:
        blocks: ``blocks[i]`` is the number of cfg sections emitted so far for
            model child ``i``; used to turn model indices into cfg indices.
        current: index (``child.i``) of the model child being processed.
        width, height: inference size written to the ``[net]`` section.
        fw, fc: writable text streams for weights and cfg respectively.
        wc: number of tensor lines written to ``fw``.
    """

    def __init__(self, n, size, fw, fc):
        """Set up the counters and immediately write the ``[net]`` header.

        Args:
            n: number of top-level model children (length of ``blocks``).
            size: ``[S]`` for a square input or ``[H, W]``.
            fw: text stream receiving the weight lines.
            fc: text stream receiving the cfg sections.
        """
        self.blocks = [0] * n
        self.current = -1

        # A single value means a square input; otherwise the order is H, W.
        self.height = size[0]
        self.width = size[0] if len(size) == 1 else size[1]

        self.fw = fw
        self.fc = fc
        self.wc = 0

        self.net()

    # ------------------------------------------------------------------
    # One handler per supported top-level module type.
    # ------------------------------------------------------------------

    def Conv(self, child):
        """Emit a single ``[convolutional]`` section for a ``Conv`` child."""
        self.current = child.i
        self.fc.write('\n# Conv\n')

        self.convolutional(child)

    def C2f(self, child):
        """Emit ``cv1``, the fused ``[c2f]`` section for ``child.m``, then ``cv2``."""
        self.current = child.i
        self.fc.write('\n# C2f\n')

        self.convolutional(child.cv1)
        self.c2f(child.m)
        self.convolutional(child.cv2)

    def SPPF(self, child):
        """Emit ``cv1``, three pooling sections, a 4-way route and ``cv2``."""
        self.current = child.i
        self.fc.write('\n# SPPF\n')

        self.convolutional(child.cv1)
        # The same pooling module is emitted three times in a row.
        for _ in range(3):
            self.maxpool(child.m)
        # Gather cv1's output together with the three pooled outputs.
        self.route('-4, -3, -2, -1')
        self.convolutional(child.cv2)

    def Upsample(self, child):
        """Emit an ``[upsample]`` section for an ``Upsample`` child."""
        self.current = child.i
        self.fc.write('\n# Upsample\n')

        self.upsample(child)

    def Concat(self, child):
        """Emit a ``[route]`` joining the previous section with ``child.f[1:]``.

        The first source is always written as ``-1``; child.f[0] itself is not
        read (presumably it is always -1 in supported models - TODO confirm).
        """
        self.current = child.i
        self.fc.write('\n# Concat\n')

        sources = [self.get_route(src) for src in child.f[1:]]
        self.route('-1, %s' % ', '.join(str(src) for src in sources))

    def Detect(self, child):
        """Emit the per-scale head branches, join them and write ``[detect_v8]``.

        For each of the ``child.nl`` scales: route to the source feature map,
        emit the ``cv3[i]`` convolutions, route back to the same source, emit
        the ``cv2[i]`` convolutions, join both branch outputs and reshape them
        to ``[child.no, -1]``.
        """
        self.current = child.i
        self.fc.write('\n# Detect\n')

        output_idxs = []
        for i in range(child.nl):
            cv3_branch = child.cv3[i]
            cv2_branch = child.cv2[i]

            self.route('%d' % self.get_route(child.f[i]))
            for conv in cv3_branch:
                self.convolutional(conv)
            # Jump back over the cv3 branch to the route emitted above.
            self.route('%d' % (-1 - len(cv3_branch)))
            for conv in cv2_branch:
                self.convolutional(conv)
            # Join the cv2 output (-1) with the last cv3 output.
            self.route('-1, %d' % (-2 - len(cv2_branch)))
            self.shuffle(reshape=[child.no, -1])
            # Relative offset of this scale's shuffle as computed by the
            # original formula: one scale spans 4 extra sections + both branches.
            output_idxs.append(-1 + i * (-4 - len(cv3_branch) - len(cv2_branch)))

        self.route(', '.join(str(idx) for idx in reversed(output_idxs)), axis=1)
        self.yolo(child)

    # ------------------------------------------------------------------
    # Section writers.
    # ------------------------------------------------------------------

    def net(self):
        """Write the ``[net]`` header with the configured width and height."""
        self.fc.write('[net]\n' +
                      'width=%d\n' % self.width +
                      'height=%d\n' % self.height +
                      'channels=3\n' +
                      'letter_box=1\n')

    def _conv_options(self, conv, bn, act):
        """Format the option lines shared by ``[convolutional]`` and ``[c2f]``.

        Args:
            conv: object exposing ``out_channels``, ``kernel_size``, ``stride``,
                ``padding``, ``groups`` and ``bias``.
            bn: whether a ``batch_normalize=1`` line is written.
            act: activation name written on the last line.
        """
        batch_norm = 'batch_normalize=1\n' if bn else ''
        groups = 'groups=%d\n' % conv.groups if conv.groups > 1 else ''
        # A bias line is only written for two of the four combinations:
        # bias together with batch norm -> bias=1, neither -> bias=0.
        if conv.bias is not None and bn:
            bias = 'bias=1\n'
        elif conv.bias is None and not bn:
            bias = 'bias=0\n'
        else:
            bias = ''

        return (batch_norm +
                'filters=%d\n' % conv.out_channels +
                'size=%s\n' % self.get_value(conv.kernel_size) +
                'stride=%s\n' % self.get_value(conv.stride) +
                'pad=%s\n' % self.get_value(conv.padding) +
                groups +
                bias +
                'activation=%s\n' % act)

    def convolutional(self, cv, act=None, detect=False):
        """Dump the weights of ``cv`` and write its ``[convolutional]`` section.

        Args:
            cv: either a module whose ``_get_name()`` is ``'Conv2d'`` or a
                wrapper exposing ``.conv`` (and optionally ``.bn`` / ``.act``).
            act: activation override for wrapper modules; ignored for a plain
                ``Conv2d``, whose activation depends only on ``detect``.
            detect: for a plain ``Conv2d``, write ``logistic`` instead of
                ``linear``.
        """
        self.blocks[self.current] += 1

        self.get_state_dict(cv.state_dict())

        if cv._get_name() == 'Conv2d':
            conv = cv
            bn = False
            act = 'logistic' if detect else 'linear'
        else:
            conv = cv.conv
            bn = hasattr(cv, 'bn')
            if act is None:
                act = self.get_activation(cv.act._get_name()) if hasattr(cv, 'act') else 'linear'

        self.fc.write('\n[convolutional]\n' + self._conv_options(conv, bn, act))

    def c2f(self, m):
        """Dump the weights of every module in ``m`` and write one ``[c2f]`` section.

        The section parameters are all read from the first module only
        (``m[0].add`` and ``m[0].cv1``).
        """
        self.blocks[self.current] += 1

        for bottleneck in m:
            self.get_state_dict(bottleneck.state_dict())

        first = m[0]
        first_cv = first.cv1
        act = 'linear'
        if hasattr(first_cv, 'act'):
            act = self.get_activation(first_cv.act._get_name())

        self.fc.write('\n[c2f]\n' +
                      'n=%d\n' % len(m) +
                      'shortcut=%d\n' % (1 if first.add else 0) +
                      self._conv_options(first_cv.conv, hasattr(first_cv, 'bn'), act))

    def route(self, layers, axis=0):
        """Write a ``[route]`` section; ``axis`` is only written when non-zero."""
        self.blocks[self.current] += 1

        axis_line = 'axis=%d\n' % axis if axis != 0 else ''

        self.fc.write('\n[route]\n' +
                      'layers=%s\n' % layers +
                      axis_line)

    def shortcut(self, r, ew='add', act='linear'):
        """Write a ``[shortcut]`` section; ``mode=mul`` is added when ``ew == 'mul'``."""
        self.blocks[self.current] += 1

        mode_line = 'mode=mul\n' if ew == 'mul' else ''

        self.fc.write('\n[shortcut]\n' +
                      'from=%d\n' % r +
                      mode_line +
                      'activation=%s\n' % act)

    def maxpool(self, m):
        """Write a ``[maxpool]`` section, or ``[maxpool_up]`` when ``m.ceil_mode`` is set."""
        self.blocks[self.current] += 1

        section = 'maxpool_up' if m.ceil_mode else 'maxpool'

        self.fc.write('\n[%s]\n' % section +
                      'stride=%d\n' % m.stride +
                      'size=%d\n' % m.kernel_size)

    def upsample(self, child):
        """Write an ``[upsample]`` section using ``child.scale_factor`` as the stride."""
        self.blocks[self.current] += 1

        self.fc.write('\n[upsample]\n' +
                      'stride=%d\n' % child.scale_factor)

    def shuffle(self, reshape=None, transpose1=None, transpose2=None):
        """Write a ``[shuffle]`` section with whichever options are not ``None``."""
        self.blocks[self.current] += 1

        lines = []
        for key, values in (('reshape', reshape),
                            ('transpose1', transpose1),
                            ('transpose2', transpose2)):
            if values is not None:
                lines.append('%s=%s\n' % (key, ', '.join(str(v) for v in values)))

        self.fc.write('\n[shuffle]\n' + ''.join(lines))

    def yolo(self, child):
        """Write the ``[detect_v8]`` section from ``child.reg_max`` and ``child.nc``."""
        self.blocks[self.current] += 1

        self.fc.write('\n[detect_v8]\n' +
                      'num=%d\n' % (child.reg_max * 4) +
                      'classes=%d\n' % child.nc)

    # ------------------------------------------------------------------
    # Weight dumping.
    # ------------------------------------------------------------------

    def _write_values(self, name, values):
        """Write one weights line and bump ``wc``.

        The line is ``"<name> <count> "`` followed by ``" <hex>"`` per value,
        where ``<hex>`` is the big-endian float32 encoding of the value.
        """
        encoded = ''.join(' ' + struct.pack('>f', float(v)).hex() for v in values)
        self.fw.write('{} {} '.format(name, len(values)) + encoded + '\n')
        self.wc += 1

    def get_state_dict(self, state_dict):
        """Write every entry of ``state_dict`` except ``num_batches_tracked`` ones."""
        for key, tensor in state_dict.items():
            if 'num_batches_tracked' in key:
                continue
            self._write_values(key, tensor.reshape(-1).numpy())

    def get_anchors(self, anchor_points, stride_tensor):
        """Write the ``anchor_points`` and ``stride_tensor`` lines.

        Both arguments are iterated element by element, so callers are expected
        to pass them already flattened.
        """
        self._write_values('anchor_points', anchor_points.numpy())
        self._write_values('stride_tensor', stride_tensor.numpy())

    # ------------------------------------------------------------------
    # Small helpers.
    # ------------------------------------------------------------------

    def get_value(self, key):
        """Collapse a size-like value for the cfg.

        Returns ``key`` itself for an int, the first element when both
        elements of a pair are equal, otherwise the text ``"a, b"``.
        """
        if isinstance(key, int):
            return key
        return key[0] if key[0] == key[1] else str(key)[1:-1]

    def get_route(self, n):
        """Return the cfg index of the last section emitted for model child ``n``.

        This is the number of sections emitted for children ``0..n`` minus one;
        a negative ``n`` therefore yields ``-1``.
        """
        return sum(self.blocks[:max(n + 1, 0)]) - 1

    def get_activation(self, act):
        """Map a module class name to its cfg activation name (default ``linear``)."""
        names = {
            'Hardswish': 'hardswish',
            'LeakyReLU': 'leaky',
            'SiLU': 'silu',
        }
        return names.get(act, 'linear')
|
|
|
|
|
|
def parse_args():
    """Parse the command line and return ``(weights_path, size)``.

    Returns:
        A tuple of the ``--weights`` path and the ``--size`` list (one value
        for a square input, or two values in H, W order; defaults to ``[640]``).

    Raises:
        SystemExit: with ``'Invalid weights file'`` when the weights path is
            not an existing file, or via ``parser.error`` when more than two
            size values are given.
    """
    parser = argparse.ArgumentParser(description='PyTorch YOLOv8 conversion')
    parser.add_argument('-w', '--weights', required=True, help='Input weights (.pt) file path (required)')
    parser.add_argument(
        '-s', '--size', nargs='+', type=int, default=[640], help='Inference size [H,W] (default [640])')
    args = parser.parse_args()

    # Only size[0] and size[1] are ever read downstream; extra values used to be
    # dropped silently, which hides typos on the command line.
    if len(args.size) > 2:
        parser.error('--size takes one value (square) or two values (H W)')

    if not os.path.isfile(args.weights):
        raise SystemExit('Invalid weights file')
    return args.weights, args.size
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Script body: load the checkpoint, walk its top-level children and write
# <name>.cfg and <name>.wts next to the current working directory.
# ---------------------------------------------------------------------------
pt_file, inference_size = parse_args()

# Output files are named after the checkpoint, prefixed with "yolov8_" unless
# the checkpoint name already contains "yolov8".
model_name = os.path.basename(pt_file).split('.pt')[0]
output_stem = model_name if 'yolov8' in model_name else 'yolov8_' + model_name
wts_file = output_stem + '.wts'
cfg_file = output_stem + '.cfg'

device = select_device('cpu')
# NOTE(security): torch.load unpickles the checkpoint, which can execute
# arbitrary code. Only convert checkpoints obtained from a trusted source.
model = torch.load(pt_file, map_location=device)['model'].float()
model.to(device).eval()

# Module class names that Layers has a same-named handler method for.
supported_layers = ('Conv', 'C2f', 'SPPF', 'Upsample', 'Concat', 'Detect')

# The weights file must start with the number of tensor lines, which is only
# known once everything has been written. The lines therefore go to a side
# file first and are then streamed behind the count, without involving a shell.
wts_body_file = wts_file + '.tmp'

try:
    with open(wts_body_file, 'w') as fw, open(cfg_file, 'w') as fc:
        layers = Layers(len(model.model), inference_size, fw, fc)

        for child in model.model.children():
            layer_type = child._get_name()
            if layer_type not in supported_layers:
                raise SystemExit('Model not supported')

            getattr(layers, layer_type)(child)
            if layer_type == 'Detect':
                layers.get_anchors(child.anchors.reshape([-1]), child.strides.reshape([-1]))

    with open(wts_body_file, 'r') as body, open(wts_file, 'w') as out:
        out.write('%d\n' % layers.wc)
        # Copy in 1 MiB chunks so large weight dumps are never held in memory.
        for chunk in iter(lambda: body.read(1 << 20), ''):
            out.write(chunk)
finally:
    if os.path.exists(wts_body_file):
        os.remove(wts_body_file)
|