| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- import numpy as np
- import time
- import unittest
- import onnx
- import onnx.defs
- from onnx.backend.base import namedtupledict
- from onnx.helper import make_node, make_graph, make_tensor_value_info, make_model
- from caffe2.proto import caffe2_pb2
- from caffe2.python import core, workspace
- from caffe2.python.models.download import ModelDownloader
- from caffe2.python.onnx.onnxifi import onnxifi_caffe2_net
- from caffe2.python.onnx.tests.test_utils import TestCase
- ONNXIFI_DATATYPE_FLOAT32 = 1
- def _print_net(net):
- for i in net.external_input:
- print("Input: {}".format(i))
- for i in net.external_output:
- print("Output: {}".format(i))
- for op in net.op:
- print("Op {}".format(op.type))
- for x in op.input:
- print(" input: {}".format(x))
- for y in op.output:
- print(" output: {}".format(y))
- class OnnxifiTest(TestCase):
- @unittest.skip("Need ONNXIFI backend support")
- def test_relu_graph(self):
- batch_size = 1
- X = np.random.randn(batch_size, 1, 3, 2).astype(np.float32)
- graph_def = make_graph(
- [make_node("Relu", ["X"], ["Y"])],
- name="test",
- inputs=[make_tensor_value_info("X", onnx.TensorProto.FLOAT,
- [batch_size, 1, 3, 2])],
- outputs=[make_tensor_value_info("Y", onnx.TensorProto.FLOAT,
- [batch_size, 1, 3, 2])])
- model_def = make_model(graph_def, producer_name='relu-test')
- op = core.CreateOperator(
- "Onnxifi",
- ["X"],
- ["Y"],
- onnx_model=model_def.SerializeToString(),
- input_names=["X"],
- output_names=["Y"],
- output_shape_hint_0=[ONNXIFI_DATATYPE_FLOAT32, batch_size, 1, 3, 2])
- workspace.FeedBlob("X", X)
- workspace.RunOperatorOnce(op)
- Y = workspace.FetchBlob("Y")
- np.testing.assert_almost_equal(Y, np.maximum(X, 0))
- @unittest.skip("Need ONNXIFI backend support")
- def test_conv_graph(self):
- X = np.array([[[[0., 1., 2., 3., 4.], # (1, 1, 5, 5) input tensor
- [5., 6., 7., 8., 9.],
- [10., 11., 12., 13., 14.],
- [15., 16., 17., 18., 19.],
- [20., 21., 22., 23., 24.]]]]).astype(np.float32)
- W = np.array([[[[1., 1., 1.], # (1, 1, 3, 3) tensor for convolution weights
- [1., 1., 1.],
- [1., 1., 1.]]]]).astype(np.float32)
- Y_without_padding = np.array([[[[54., 63., 72.], # (1, 1, 3, 3) output tensor
- [99., 108., 117.],
- [144., 153., 162.]]]]).astype(np.float32)
- graph_def = make_graph(
- [make_node(
- 'Conv',
- inputs=['X', 'W'],
- outputs=['Y'],
- kernel_shape=[3, 3],
- # Default values for other attributes: strides=[1, 1], dilations=[1, 1], groups=1
- pads=[0, 0, 0, 0],
- )],
- name="test",
- inputs=[make_tensor_value_info("X", onnx.TensorProto.FLOAT, [1, 1, 5, 5]),
- make_tensor_value_info("W", onnx.TensorProto.FLOAT, [1, 1, 3, 3]),
- ],
- outputs=[make_tensor_value_info("Y", onnx.TensorProto.FLOAT,
- [1, 1, 3, 3])])
- model_def = make_model(graph_def, producer_name='conv-test')
- # We intentional rewrite the input/output name so test that the
- # input/output binding of c2 op is positional
- op = core.CreateOperator(
- "Onnxifi",
- ["X0"],
- ["Y0"],
- onnx_model=model_def.SerializeToString(),
- initializers=["W", "W0"],
- input_names=["X"],
- output_names=["Y"],
- output_shape_hint_0=[ONNXIFI_DATATYPE_FLOAT32, 1, 1, 3, 3])
- workspace.FeedBlob("X0", X)
- workspace.FeedBlob("W0", W)
- workspace.RunOperatorOnce(op)
- Y = workspace.FetchBlob("Y0")
- np.testing.assert_almost_equal(Y, Y_without_padding)
- class OnnxifiTransformTest(TestCase):
- def setUp(self):
- self.model_downloader = ModelDownloader()
- def _add_head_tail(self, pred_net, new_head, new_tail):
- orig_head = pred_net.external_input[0]
- orig_tail = pred_net.external_output[0]
- # Add head
- head = caffe2_pb2.OperatorDef()
- head.type = "Copy"
- head.input.append(new_head)
- head.output.append(orig_head)
- dummy = caffe2_pb2.NetDef()
- dummy.op.extend(pred_net.op)
- del pred_net.op[:]
- pred_net.op.extend([head])
- pred_net.op.extend(dummy.op)
- pred_net.external_input[0] = new_head
- # Add tail
- tail = caffe2_pb2.OperatorDef()
- tail.type = "Copy"
- tail.input.append(orig_tail)
- tail.output.append(new_tail)
- pred_net.op.extend([tail])
- pred_net.external_output[0] = new_tail
- @unittest.skip("Need ONNXIFI backend support")
- def test_resnet50_core(self):
- N = 1
- repeat = 1
- print("Batch size: {}, repeat inference {} times".format(N, repeat))
- init_net, pred_net, _ = self.model_downloader.get_c2_model('resnet50')
- self._add_head_tail(pred_net, 'real_data', 'real_softmax')
- input_blob_dims = (N, 3, 224, 224)
- input_name = "real_data"
- device_option = core.DeviceOption(caffe2_pb2.CPU, 0)
- init_net.device_option.CopyFrom(device_option)
- pred_net.device_option.CopyFrom(device_option)
- for op in pred_net.op:
- op.device_option.CopyFrom(device_option)
- net_outputs = pred_net.external_output
- Y_c2 = None
- data = np.random.randn(*input_blob_dims).astype(np.float32)
- c2_time = 1
- workspace.SwitchWorkspace("onnxifi_test", True)
- with core.DeviceScope(device_option):
- workspace.FeedBlob(input_name, data)
- workspace.RunNetOnce(init_net)
- workspace.CreateNet(pred_net)
- start = time.time()
- for _ in range(repeat):
- workspace.RunNet(pred_net.name)
- end = time.time()
- c2_time = end - start
- output_values = [workspace.FetchBlob(name) for name in net_outputs]
- Y_c2 = namedtupledict('Outputs', net_outputs)(*output_values)
- workspace.ResetWorkspace()
- # Fill the workspace with the weights
- with core.DeviceScope(device_option):
- workspace.RunNetOnce(init_net)
- # Cut the graph
- start = time.time()
- pred_net_cut = onnxifi_caffe2_net(pred_net,
- {input_name: input_blob_dims},
- infer_shapes=True)
- del init_net, pred_net
- #_print_net(pred_net_cut)
- Y_trt = None
- input_name = pred_net_cut.external_input[0]
- print("C2 runtime: {}s".format(c2_time))
- with core.DeviceScope(device_option):
- workspace.FeedBlob(input_name, data)
- workspace.CreateNet(pred_net_cut)
- end = time.time()
- print("Conversion time: {:.2f}s".format(end - start))
- start = time.time()
- for _ in range(repeat):
- workspace.RunNet(pred_net_cut.name)
- end = time.time()
- trt_time = end - start
- print("Onnxifi runtime: {}s, improvement: {}%".format(trt_time, (c2_time - trt_time) / c2_time * 100))
- output_values = [workspace.FetchBlob(name) for name in net_outputs]
- Y_trt = namedtupledict('Outputs', net_outputs)(*output_values)
- np.testing.assert_allclose(Y_c2, Y_trt, rtol=1e-3)
|