| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- import unittest
- from caffe2.python import core
- from hypothesis import given
- import hypothesis.strategies as st
- import caffe2.python.hypothesis_test_util as hu
- from caffe2.python import workspace
- from caffe2.python.functional import Functional
- import numpy as np
- @st.composite
- def _tensor_splits(draw, add_axis=False):
- """Generates (axis, split_info, tensor_splits) tuples."""
- tensor = draw(hu.tensor(min_value=4)) # Each dim has at least 4 elements.
- axis = draw(st.integers(0, len(tensor.shape) - 1))
- if add_axis:
- # Simple case: get individual slices along one axis, where each of them
- # is (N-1)-dimensional. The axis will be added back upon concatenation.
- return (
- axis, np.ones(tensor.shape[axis], dtype=np.int32), [
- np.array(tensor.take(i, axis=axis))
- for i in range(tensor.shape[axis])
- ]
- )
- else:
- # General case: pick some (possibly consecutive, even non-unique)
- # indices at which we will split the tensor, along the given axis.
- splits = sorted(
- draw(
- st.
- lists(elements=st.integers(0, tensor.shape[axis]), max_size=4)
- ) + [0, tensor.shape[axis]]
- )
- return (
- axis, np.array(np.diff(splits), dtype=np.int32), [
- tensor.take(range(splits[i], splits[i + 1]), axis=axis)
- for i in range(len(splits) - 1)
- ],
- )
- class TestFunctional(hu.HypothesisTestCase):
- @given(X=hu.tensor(), engine=st.sampled_from(["", "CUDNN"]), **hu.gcs)
- def test_relu(self, X, engine, gc, dc):
- X += 0.02 * np.sign(X)
- X[X == 0.0] += 0.02
- output = Functional.Relu(X, device_option=gc)
- Y_l = output[0]
- Y_d = output["output_0"]
- with workspace.WorkspaceGuard("tmp_workspace"):
- op = core.CreateOperator("Relu", ["X"], ["Y"], engine=engine)
- workspace.FeedBlob("X", X)
- workspace.RunOperatorOnce(op)
- Y_ref = workspace.FetchBlob("Y")
- np.testing.assert_array_equal(
- Y_l, Y_ref, err_msg='Functional Relu result mismatch'
- )
- np.testing.assert_array_equal(
- Y_d, Y_ref, err_msg='Functional Relu result mismatch'
- )
- @given(tensor_splits=_tensor_splits(), **hu.gcs)
- def test_concat(self, tensor_splits, gc, dc):
- # Input Size: 1 -> inf
- axis, _, splits = tensor_splits
- concat_result, split_info = Functional.Concat(*splits, axis=axis, device_option=gc)
- concat_result_ref = np.concatenate(splits, axis=axis)
- split_info_ref = np.array([a.shape[axis] for a in splits])
- np.testing.assert_array_equal(
- concat_result,
- concat_result_ref,
- err_msg='Functional Concat result mismatch'
- )
- np.testing.assert_array_equal(
- split_info,
- split_info_ref,
- err_msg='Functional Concat split info mismatch'
- )
- @given(tensor_splits=_tensor_splits(), split_as_arg=st.booleans(), **hu.gcs)
- def test_split(self, tensor_splits, split_as_arg, gc, dc):
- # Output Size: 1 - inf
- axis, split_info, splits = tensor_splits
- split_as_arg = True
- if split_as_arg:
- input_tensors = [np.concatenate(splits, axis=axis)]
- kwargs = dict(axis=axis, split=split_info, num_output=len(splits))
- else:
- input_tensors = [np.concatenate(splits, axis=axis), split_info]
- kwargs = dict(axis=axis, num_output=len(splits))
- result = Functional.Split(*input_tensors, device_option=gc, **kwargs)
- def split_ref(input, split=split_info):
- s = np.cumsum([0] + list(split))
- return [
- np.array(input.take(np.arange(s[i], s[i + 1]), axis=axis))
- for i in range(len(split))
- ]
- result_ref = split_ref(*input_tensors)
- for i, ref in enumerate(result_ref):
- np.testing.assert_array_equal(
- result[i], ref, err_msg='Functional Relu result mismatch'
- )
- if __name__ == '__main__':
- unittest.main()
|