| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557 |
- # TODO(jiayq): as more and more tests are moving to hypothesis test, we
- # can gradually remove this test script. DO NOT ADD MORE TESTS TO THIS
- # FILE.
- import numpy as np
- from caffe2.python import (
- brew,
- core,
- device_checker,
- gradient_checker,
- model_helper,
- test_util,
- workspace,
- )
- from caffe2.python.gradient_checker import NetGradientChecker
- from caffe2.python.net_builder import ops, NetBuilder
- from caffe2.proto import caffe2_pb2
- import unittest
- from typing import Optional
- if workspace.has_gpu_support and workspace.NumGpuDevices() > 0:
- _gpu_dev_option = caffe2_pb2.DeviceOption()
- _gpu_dev_option.device_type = workspace.GpuDeviceType
- cpu_device_option = caffe2_pb2.DeviceOption()
- gpu_device_checker = device_checker.DeviceChecker(
- 0.01, [_gpu_dev_option]
- )
- device_checker = device_checker.DeviceChecker(
- 0.01, [_gpu_dev_option, cpu_device_option]
- )
- gpu_gradient_checkers = [
- gradient_checker.GradientChecker(
- 0.005, 0.05, _gpu_dev_option, "gpu_checker_ws"
- ),
- ]
- gradient_checkers = [
- gradient_checker.GradientChecker(
- 0.005, 0.05, _gpu_dev_option, "gpu_checker_ws"
- ),
- gradient_checker.GradientChecker(
- 0.01, 0.05, cpu_device_option, "cpu_checker_ws"
- ),
- ]
- gpu_device_option: Optional[caffe2_pb2.DeviceOption] = _gpu_dev_option
- else:
- cpu_device_option = caffe2_pb2.DeviceOption()
- gpu_device_option = None
- gpu_device_checker = device_checker.DeviceChecker(
- 0.01, []
- )
- device_checker = device_checker.DeviceChecker(0.01, [cpu_device_option])
- gradient_checkers = [
- gradient_checker.GradientChecker(
- 0.01, 0.05, cpu_device_option, "cpu_checker_ws"
- )
- ]
- gpu_gradient_checkers = []
- class TestLRN(test_util.TestCase):
- def setUp(self):
- self.test_configs = [(6, 10), (3, 13), ]
- def testLRN(self):
- for input_size, depth in self.test_configs:
- op = core.CreateOperator("LRN",
- ["X"],
- ["Y", "Y_scale"],
- size=11,
- alpha=0.001,
- beta=0.5,
- bias=2.0,
- order="NHWC"
- )
- X = np.random.rand(2, input_size, input_size,
- depth).astype(np.float32)
- res = device_checker.CheckSimple(op, [X], [0])
- self.assertTrue(res)
- for checker in gradient_checkers:
- res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0])
- self.assertTrue(res)
- class TestFlatten(test_util.TestCase):
- def testFlatten(self):
- op = core.CreateOperator("Flatten", ["X"], ["Y"])
- X = np.random.rand(2, 3, 4, 5).astype(np.float32)
- res = device_checker.CheckSimple(op, [X], [0])
- self.assertTrue(res)
- for checker in gradient_checkers:
- res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0])
- self.assertTrue(res)
- class TestConcat(test_util.TestCase):
- def setUp(self):
- self.test_configs = [
- # input_size, depth1, depth2, depth3, depth4
- (3, 2, 3, 4, 5),
- (4, 5, 4, 3, 2),
- ]
- def testConcatNHWC(self):
- for input_size, d1, d2, d3, d4 in self.test_configs:
- op = core.CreateOperator("Concat",
- ["X1", "X2", "X3", "X4"],
- ["Y", "Y_dims"],
- order="NHWC"
- )
- Xs = [
- np.random.rand(2, input_size, input_size,
- d1).astype(np.float32),
- np.random.rand(2, input_size, input_size,
- d2).astype(np.float32),
- np.random.rand(2, input_size, input_size,
- d3).astype(np.float32),
- np.random.rand(2, input_size, input_size, d4).astype(np.float32)
- ]
- for i in range(4):
- res = device_checker.CheckSimple(op, Xs, [0])
- self.assertTrue(res)
- for checker in gradient_checkers:
- res, grad, grad_estimated = checker.CheckSimple(op, Xs, i,
- [0])
- self.assertTrue(res)
- def testConcatNCHW(self):
- for input_size, d1, d2, d3, d4 in self.test_configs:
- op = core.CreateOperator("Concat",
- ["X1", "X2", "X3", "X4"],
- ["Y", "Y_dims"],
- order="NCHW"
- )
- Xs = [
- np.random.rand(2, d1, input_size,
- input_size).astype(np.float32),
- np.random.rand(2, d2, input_size,
- input_size).astype(np.float32),
- np.random.rand(2, d3, input_size,
- input_size).astype(np.float32),
- np.random.rand(2, d4, input_size, input_size).astype(np.float32)
- ]
- for i in range(4):
- res = device_checker.CheckSimple(op, Xs, [0])
- self.assertTrue(res)
- for checker in gradient_checkers:
- res, grad, grad_estimated = checker.CheckSimple(op, Xs, i,
- [0])
- self.assertTrue(res)
- class TestRelu(test_util.TestCase):
- def setUp(self):
- self.test_configs = [
- # input size
- # (0, 1),
- (1, 1),
- (2, 1),
- (1, 3, 3, 1),
- (2, 3, 3, 1),
- (1, 5, 5, 3),
- (2, 5, 5, 3),
- ]
- def testRelu(self):
- for input_size in self.test_configs:
- op = core.CreateOperator("Relu", ["X"], ["Y"])
- X = np.random.rand(*input_size).astype(np.float32)
- # go away from the origin point to avoid kink problems
- X += 0.01 * np.sign(X)
- X[X == 0] = 0.01
- res = device_checker.CheckSimple(op, [X], [0])
- self.assertTrue(res)
- for checker in gradient_checkers:
- res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0])
- self.assertTrue(res)
- class TestTanh(test_util.TestCase):
- def setUp(self):
- self.test_configs = [
- # (0, 1),
- (1, 1),
- (2, 1),
- (1, 2, 3, 4),
- ]
- def testTanh(self):
- for input_size in self.test_configs:
- op = core.CreateOperator("Tanh", ["X"], ["Y"])
- X = np.random.rand(*input_size).astype(np.float32) - 0.5
- res = device_checker.CheckSimple(op, [X], [0])
- self.assertTrue(res)
- for checker in gradient_checkers:
- res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0])
- self.assertTrue(res)
- class TestAbs(test_util.TestCase):
- def setUp(self):
- self.test_configs = [
- (1, 1),
- (2, 3),
- (2, 3, 4),
- (2, 3, 4, 5),
- ]
- def testAbs(self):
- for input_size in self.test_configs:
- op = core.CreateOperator("Abs", ["X"], ["Y"])
- X = np.random.rand(*input_size).astype(np.float32)
- # go away from the origin point to avoid kink problems
- X += 0.01 * np.sign(X)
- X[X == 0] = 0.01
- res = device_checker.CheckSimple(op, [X], [0])
- self.assertTrue(res)
- for checker in gradient_checkers:
- res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0])
- self.assertTrue(res)
- class TestExp(test_util.TestCase):
- def setUp(self):
- self.test_configs = [
- # (0, 1),
- (1, 1),
- (2, 1),
- (1, 2, 3, 4),
- ]
- def testExp(self):
- for input_size in self.test_configs:
- op = core.CreateOperator("Exp", ["X"], ["Y"])
- X = np.random.rand(*input_size).astype(np.float32) - 0.5
- res = device_checker.CheckSimple(op, [X], [0])
- self.assertTrue(res)
- for checker in gradient_checkers:
- res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0])
- self.assertTrue(res)
- class TestCos(test_util.TestCase):
- def setUp(self):
- self.test_configs = [
- (1, 1),
- (2, 3),
- (2, 3, 4),
- (2, 3, 4, 5),
- ]
- def testCos(self):
- for input_size in self.test_configs:
- op = core.CreateOperator("Cos", ["X"], ["Y"])
- X = np.random.rand(*input_size).astype(np.float32) - 0.5
- res = device_checker.CheckSimple(op, [X], [0])
- self.assertTrue(res)
- for checker in gradient_checkers:
- res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0])
- self.assertTrue(res)
- class TestSin(test_util.TestCase):
- def setUp(self):
- self.test_configs = [
- (1, 1),
- (2, 3),
- (2, 3, 4),
- (2, 3, 4, 5),
- ]
- def testSin(self):
- for input_size in self.test_configs:
- op = core.CreateOperator("Sin", ["X"], ["Y"])
- X = np.random.rand(*input_size).astype(np.float32) - 0.5
- res = device_checker.CheckSimple(op, [X], [0])
- self.assertTrue(res)
- for checker in gradient_checkers:
- res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0])
- self.assertTrue(res)
- class TestSigmoid(test_util.TestCase):
- def setUp(self):
- self.test_configs = [
- # (0, 1),
- (1, 1),
- (2, 1),
- (1, 2, 3, 4),
- ]
- def testSigmoid(self):
- for input_size in self.test_configs:
- op = core.CreateOperator("Sigmoid", ["X"], ["Y"])
- X = np.random.rand(*input_size).astype(np.float32) - 0.5
- res = device_checker.CheckSimple(op, [X], [0])
- self.assertTrue(res)
- for checker in gradient_checkers:
- res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0])
- self.assertTrue(res)
- class TestSum(test_util.TestCase):
- def setUp(self):
- self.test_configs = [
- # ((0, 1), False),
- ((1, 2, 3, 4), True),
- ((1, 2, 3, 4), False)]
- def testSum(self):
- for (input_size, in_place) in self.test_configs:
- op = core.CreateOperator("Sum", ["X1", "X2"],
- ["Y" if not in_place else "X1"])
- X1 = np.random.rand(*input_size).astype(np.float32) - 0.5
- X2 = np.random.rand(*input_size).astype(np.float32) - 0.5
- res = device_checker.CheckSimple(op, [X1, X2], [0])
- self.assertTrue(res)
- for checker in gradient_checkers:
- res, grad, grad_estimated = checker.CheckSimple(
- op, [X1, X2], 0, [0])
- self.assertTrue(res)
- res, grad, grad_estimated = checker.CheckSimple(
- op, [X1, X2], 1, [0])
- self.assertTrue(res)
- class TestMakeTwoClass(test_util.TestCase):
- def setUp(self):
- self.test_configs = [
- # input size
- # (0, 1),
- (1,),
- (7,),
- (1, 3),
- (2, 5),
- ]
- def testMakeTwoClass(self):
- for input_size in self.test_configs:
- op = core.CreateOperator("MakeTwoClass", ["X"], ["Y"])
- X = np.random.rand(*input_size).astype(np.float32)
- # step a little to avoid gradient problems
- X[X < 0.01] += 0.01
- X[X > 0.99] -= 0.01
- res = device_checker.CheckSimple(op, [X], [0])
- self.assertTrue(res)
- for checker in gradient_checkers:
- res, grad, grad_estimated = checker.CheckSimple(op, [X], 0, [0])
- self.assertTrue(res)
- class TestNetGradientChecker(test_util.TestCase):
- def test_net_gradient_checker(self):
- model = model_helper.ModelHelper(name="test")
- const = model.net.AddExternalInputs("const1", "const2")
- fc = brew.fc(model, dim_in=3, dim_out=4, blob_in="X", blob_out="Y", axis=0)
- dist = [model.net.SquaredL2Distance([fc, c]) for c in const]
- losses = [model.net.AveragedLoss(d) for d in dist] # using two losses here
- workspace.RunNetOnce(model.param_init_net)
- NetGradientChecker.Check(
- model.net,
- outputs_with_grad=losses,
- input_values={"X": np.array([1, 2, 3], dtype="float32"),
- const[0]: np.array([1, 1, 1, 1], dtype="float32"),
- const[1]: np.array([2, 2, 2, 2], dtype="float32")},
- input_to_check="X",
- )
- def test_net_comparison(self):
- # (a + b) * (c + d) == a * c + a * d + b * c + b * d
- net1 = core.Net("net1")
- a, b, c, d = net1.AddExternalInputs("a", "b", "c", "d")
- a_b = net1.Sum([a, b], "a+b")
- c_d = net1.Sum([c, d], "c+d")
- x = net1.Mul([a_b, c_d], "x")
- net2 = core.Net("net2")
- ac = net2.Mul([a, c], "ac")
- ad = net2.Mul([a, d], "ad")
- bc = net2.Mul([b, c], "bc")
- bd = net2.Mul([b, d], "bd")
- y = net2.Sum([ac, ad, bc, bd], "y")
- input_values = {blob: np.array([i], dtype=np.float32)
- for i, blob in enumerate([a, b, c, d])}
- NetGradientChecker.CompareNets(
- [net1, net2], [[x], [y]], [0],
- inputs_with_grads=[a, b, c, d],
- input_values=input_values,
- )
- class TestIf(test_util.TestCase):
- def testIf(self):
- W_a_values = [2.0, 1.5]
- B_a_values = [0.5]
- W_b_values = [7.0, 3.5]
- B_b_values = [1.5]
- with NetBuilder(_use_control_ops=True) as init_nb:
- W_a = ops.UniformFill([], "W_a", shape=[1, 2], min=-1., max=1.)
- B_a = ops.ConstantFill([], "B_a", shape=[1], value=0.0)
- W_b = ops.UniformFill([], "W_b", shape=[1, 2], min=-1., max=1.)
- B_b = ops.ConstantFill([], "B_b", shape=[1], value=0.0)
- W_gt_a = ops.GivenTensorFill(
- [], "W_gt_a", shape=[1, 2], values=W_a_values)
- B_gt_a = ops.GivenTensorFill([], "B_gt_a", shape=[1], values=B_a_values)
- W_gt_b = ops.GivenTensorFill(
- [], "W_gt_b", shape=[1, 2], values=W_b_values)
- B_gt_b = ops.GivenTensorFill([], "B_gt_b", shape=[1], values=B_b_values)
- params = [W_gt_a, B_gt_a, W_a, B_a, W_gt_b, B_gt_b, W_b, B_b]
- with NetBuilder(_use_control_ops=True, initial_scope=params) as train_nb:
- Y_pred = ops.ConstantFill([], "Y_pred", shape=[1], value=0.0)
- Y_noise = ops.ConstantFill([], "Y_noise", shape=[1], value=0.0)
- switch = ops.UniformFill(
- [], "switch", shape=[1], min=-1., max=1., run_once=0)
- zero = ops.ConstantFill([], "zero", shape=[1], value=0.0)
- X = ops.GaussianFill(
- [], "X", shape=[4096, 2], mean=0.0, std=1.0, run_once=0)
- noise = ops.GaussianFill(
- [], "noise", shape=[4096, 1], mean=0.0, std=1.0, run_once=0)
- with ops.IfNet(ops.LT([switch, zero])):
- Y_gt = ops.FC([X, W_gt_a, B_gt_a], "Y_gt")
- ops.Add([Y_gt, noise], Y_noise)
- ops.FC([X, W_a, B_a], Y_pred)
- with ops.Else():
- Y_gt = ops.FC([X, W_gt_b, B_gt_b], "Y_gt")
- ops.Add([Y_gt, noise], Y_noise)
- ops.FC([X, W_b, B_b], Y_pred)
- dist = ops.SquaredL2Distance([Y_noise, Y_pred], "dist")
- loss = dist.AveragedLoss([], ["loss"])
- assert len(init_nb.get()) == 1, "Expected a single init net produced"
- assert len(train_nb.get()) == 1, "Expected a single train net produced"
- train_net = train_nb.get()[0]
- gradient_map = train_net.AddGradientOperators([loss])
- init_net = init_nb.get()[0]
- ITER = init_net.ConstantFill(
- [], "ITER", shape=[1], value=0, dtype=core.DataType.INT64)
- train_net.Iter(ITER, ITER)
- LR = train_net.LearningRate(ITER, "LR", base_lr=-0.1,
- policy="step", stepsize=20, gamma=0.9)
- ONE = init_net.ConstantFill([], "ONE", shape=[1], value=1.)
- train_net.WeightedSum([W_a, ONE, gradient_map[W_a], LR], W_a)
- train_net.WeightedSum([B_a, ONE, gradient_map[B_a], LR], B_a)
- train_net.WeightedSum([W_b, ONE, gradient_map[W_b], LR], W_b)
- train_net.WeightedSum([B_b, ONE, gradient_map[B_b], LR], B_b)
- workspace.RunNetOnce(init_net)
- workspace.CreateNet(train_net)
- # print("Before training, W_a is: {}".format(workspace.FetchBlob("W_a")))
- # print("Before training, B_a is: {}".format(workspace.FetchBlob("B_a")))
- # print("Before training, W_b is: {}".format(workspace.FetchBlob("W_b")))
- # print("Before training, B_b is: {}".format(workspace.FetchBlob("B_b")))
- for _epoch in range(1000):
- workspace.RunNet(train_net.Proto().name)
- # print("After training, W_a is: {}".format(workspace.FetchBlob("W_a")))
- # print("After training, B_a is: {}".format(workspace.FetchBlob("B_a")))
- # print("After training, W_b is: {}".format(workspace.FetchBlob("W_b")))
- # print("After training, B_b is: {}".format(workspace.FetchBlob("B_b")))
- # print("Ground truth W_a is: {}".format(workspace.FetchBlob("W_gt_a")))
- # print("Ground truth B_a is: {}".format(workspace.FetchBlob("B_gt_a")))
- # print("Ground truth W_b is: {}".format(workspace.FetchBlob("W_gt_b")))
- # print("Ground truth B_b is: {}".format(workspace.FetchBlob("B_gt_b")))
- values_map = {
- "W_a": W_a_values,
- "B_a": B_a_values,
- "W_b": W_b_values,
- "B_b": B_b_values,
- }
- train_eps = 0.01
- for blob_name, values in values_map.items():
- trained_values = workspace.FetchBlob(blob_name)
- if trained_values.ndim == 2:
- self.assertEqual(trained_values.shape[0], 1)
- trained_values = trained_values[0][:]
- else:
- self.assertEqual(trained_values.ndim, 1)
- self.assertEqual(trained_values.size, len(values))
- for idx in range(len(trained_values)):
- self.assertTrue(abs(trained_values[idx] - values[idx]) < train_eps)
- class TestWhile(test_util.TestCase):
- @unittest.skip("Skip flaky test.")
- def testWhile(self):
- with NetBuilder(_use_control_ops=True) as nb:
- ops.Copy(ops.Const(0), "i")
- ops.Copy(ops.Const(1), "one")
- ops.Copy(ops.Const(2), "two")
- ops.Copy(ops.Const(2.0), "x")
- ops.Copy(ops.Const(3.0), "y")
- ops.Copy(ops.Const(2.0), "z")
- # raises x to the power of 4 and y to the power of 2
- # and z to the power of 3
- with ops.WhileNet():
- with ops.Condition():
- ops.Add(["i", "one"], "i")
- ops.LE(["i", "two"])
- ops.Pow("x", "x", exponent=2.0)
- with ops.IfNet(ops.LT(["i", "two"])):
- ops.Pow("y", "y", exponent=2.0)
- with ops.Else():
- ops.Pow("z", "z", exponent=3.0)
- ops.Add(["x", "y"], "x_plus_y")
- ops.Add(["x_plus_y", "z"], "s")
- assert len(nb.get()) == 1, "Expected a single net produced"
- net = nb.get()[0]
- net.AddGradientOperators(["s"])
- workspace.RunNetOnce(net)
- # (x^4)' = 4x^3
- self.assertAlmostEqual(workspace.FetchBlob("x_grad"), 32)
- self.assertAlmostEqual(workspace.FetchBlob("x"), 16)
- # (y^2)' = 2y
- self.assertAlmostEqual(workspace.FetchBlob("y_grad"), 6)
- self.assertAlmostEqual(workspace.FetchBlob("y"), 9)
- # (z^3)' = 3z^2
- self.assertAlmostEqual(workspace.FetchBlob("z_grad"), 12)
- self.assertAlmostEqual(workspace.FetchBlob("z"), 8)
- if __name__ == '__main__':
- workspace.GlobalInit(["python"])
- unittest.main()
|