| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706 |
- #!/usr/bin/env python3
- from hypothesis import given, settings
- import hypothesis.strategies as st
- from multiprocessing import Process, Queue
- import numpy as np
- import os
- import pickle
- import tempfile
- import shutil
- from caffe2.python import core, workspace, dyndep
- import caffe2.python.hypothesis_test_util as hu
- from gloo.python import IoError
- dyndep.InitOpsLibrary("@/caffe2/caffe2/distributed:file_store_handler_ops")
- dyndep.InitOpsLibrary("@/caffe2/caffe2/distributed:redis_store_handler_ops")
- dyndep.InitOpsLibrary("@/caffe2/caffe2/distributed:store_ops")
- dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/gloo:gloo_ops")
- dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/gloo:gloo_ops_gpu")
- op_engine = 'GLOO'
- class TemporaryDirectory:
- def __enter__(self):
- self.tmpdir = tempfile.mkdtemp()
- return self.tmpdir
- def __exit__(self, type, value, traceback):
- shutil.rmtree(self.tmpdir)
- class TestCase(hu.HypothesisTestCase):
- test_counter = 0
- sync_counter = 0
- def run_test_locally(self, fn, device_option=None, **kwargs):
- # Queue for assertion errors on subprocesses
- queue = Queue()
- # Capture any exception thrown by the subprocess
- def run_fn(*args, **kwargs):
- try:
- with core.DeviceScope(device_option):
- fn(*args, **kwargs)
- workspace.ResetWorkspace()
- queue.put(True)
- except Exception as ex:
- queue.put(ex)
- # Start N processes in the background
- procs = []
- for i in range(kwargs['comm_size']):
- kwargs['comm_rank'] = i
- proc = Process(
- target=run_fn,
- kwargs=kwargs)
- proc.start()
- procs.append(proc)
- # Test complete, join background processes
- while len(procs) > 0:
- proc = procs.pop(0)
- while proc.is_alive():
- proc.join(10)
- # Raise exception if we find any. Otherwise each worker
- # should put a True into the queue
- # Note that the following is executed ALSO after
- # the last process was joined, so if ANY exception
- # was raised, it will be re-raised here.
- self.assertFalse(queue.empty(), "Job failed without a result")
- o = queue.get()
- if isinstance(o, Exception):
- raise o
- else:
- self.assertTrue(o)
- def run_test_distributed(self, fn, device_option=None, **kwargs):
- comm_rank = os.getenv('COMM_RANK')
- self.assertIsNotNone(comm_rank)
- comm_size = os.getenv('COMM_SIZE')
- self.assertIsNotNone(comm_size)
- kwargs['comm_rank'] = int(comm_rank)
- kwargs['comm_size'] = int(comm_size)
- with core.DeviceScope(device_option):
- fn(**kwargs)
- workspace.ResetWorkspace()
- def create_common_world(self, comm_rank, comm_size, tmpdir=None, existing_cw=None):
- store_handler = "store_handler"
- # If REDIS_HOST is set, use RedisStoreHandler for rendezvous.
- if existing_cw is None:
- redis_host = os.getenv("REDIS_HOST")
- redis_port = int(os.getenv("REDIS_PORT", 6379))
- if redis_host is not None:
- workspace.RunOperatorOnce(
- core.CreateOperator(
- "RedisStoreHandlerCreate",
- [],
- [store_handler],
- prefix=str(TestCase.test_counter) + "/",
- host=redis_host,
- port=redis_port))
- else:
- workspace.RunOperatorOnce(
- core.CreateOperator(
- "FileStoreHandlerCreate",
- [],
- [store_handler],
- path=tmpdir))
- common_world = "common_world"
- else:
- common_world = str(existing_cw) + ".forked"
- if existing_cw is not None:
- workspace.RunOperatorOnce(
- core.CreateOperator(
- "CloneCommonWorld",
- [existing_cw],
- [common_world],
- sync=True,
- engine=op_engine))
- else:
- workspace.RunOperatorOnce(
- core.CreateOperator(
- "CreateCommonWorld",
- [store_handler],
- [common_world],
- size=comm_size,
- rank=comm_rank,
- sync=True,
- engine=op_engine))
- return (store_handler, common_world)
- def synchronize(self, store_handler, value, comm_rank=None):
- TestCase.sync_counter += 1
- blob = "sync_{}".format(TestCase.sync_counter)
- if comm_rank == 0:
- workspace.FeedBlob(blob, pickle.dumps(value))
- workspace.RunOperatorOnce(
- core.CreateOperator(
- "StoreSet",
- [store_handler, blob],
- []))
- else:
- workspace.RunOperatorOnce(
- core.CreateOperator(
- "StoreGet",
- [store_handler],
- [blob]))
- return pickle.loads(workspace.FetchBlob(blob))
- def _test_broadcast(self,
- comm_rank=None,
- comm_size=None,
- blob_size=None,
- num_blobs=None,
- tmpdir=None,
- use_float16=False,
- ):
- store_handler, common_world = self.create_common_world(
- comm_rank=comm_rank,
- comm_size=comm_size,
- tmpdir=tmpdir)
- blob_size = self.synchronize(
- store_handler,
- blob_size,
- comm_rank=comm_rank)
- num_blobs = self.synchronize(
- store_handler,
- num_blobs,
- comm_rank=comm_rank)
- for i in range(comm_size):
- blobs = []
- for j in range(num_blobs):
- blob = "blob_{}".format(j)
- offset = (comm_rank * num_blobs) + j
- value = np.full(blob_size, offset,
- np.float16 if use_float16 else np.float32)
- workspace.FeedBlob(blob, value)
- blobs.append(blob)
- net = core.Net("broadcast")
- net.Broadcast(
- [common_world] + blobs,
- blobs,
- root=i,
- engine=op_engine)
- workspace.CreateNet(net)
- workspace.RunNet(net.Name())
- for j in range(num_blobs):
- np.testing.assert_array_equal(
- workspace.FetchBlob(blobs[j]),
- i * num_blobs)
- # Run the net a few more times to check the operator
- # works not just the first time it's called
- for _tmp in range(4):
- workspace.RunNet(net.Name())
- @given(comm_size=st.integers(min_value=2, max_value=8),
- blob_size=st.integers(min_value=int(1e3), max_value=int(1e6)),
- num_blobs=st.integers(min_value=1, max_value=4),
- device_option=st.sampled_from([hu.cpu_do]),
- use_float16=st.booleans())
- @settings(deadline=10000)
- def test_broadcast(self, comm_size, blob_size, num_blobs, device_option,
- use_float16):
- TestCase.test_counter += 1
- if os.getenv('COMM_RANK') is not None:
- self.run_test_distributed(
- self._test_broadcast,
- blob_size=blob_size,
- num_blobs=num_blobs,
- use_float16=use_float16,
- device_option=device_option)
- else:
- with TemporaryDirectory() as tmpdir:
- self.run_test_locally(
- self._test_broadcast,
- comm_size=comm_size,
- blob_size=blob_size,
- num_blobs=num_blobs,
- device_option=device_option,
- tmpdir=tmpdir,
- use_float16=use_float16)
- def _test_allreduce(self,
- comm_rank=None,
- comm_size=None,
- blob_size=None,
- num_blobs=None,
- tmpdir=None,
- use_float16=False
- ):
- store_handler, common_world = self.create_common_world(
- comm_rank=comm_rank,
- comm_size=comm_size,
- tmpdir=tmpdir)
- blob_size = self.synchronize(
- store_handler,
- blob_size,
- comm_rank=comm_rank)
- num_blobs = self.synchronize(
- store_handler,
- num_blobs,
- comm_rank=comm_rank)
- blobs = []
- for i in range(num_blobs):
- blob = "blob_{}".format(i)
- value = np.full(blob_size, (comm_rank * num_blobs) + i,
- np.float16 if use_float16 else np.float32)
- workspace.FeedBlob(blob, value)
- blobs.append(blob)
- net = core.Net("allreduce")
- net.Allreduce(
- [common_world] + blobs,
- blobs,
- engine=op_engine)
- workspace.CreateNet(net)
- workspace.RunNet(net.Name())
- for i in range(num_blobs):
- np.testing.assert_array_equal(
- workspace.FetchBlob(blobs[i]),
- (num_blobs * comm_size) * (num_blobs * comm_size - 1) / 2)
- # Run the net a few more times to check the operator
- # works not just the first time it's called
- for _tmp in range(4):
- workspace.RunNet(net.Name())
- def _test_allreduce_multicw(self,
- comm_rank=None,
- comm_size=None,
- tmpdir=None
- ):
- _store_handler, common_world = self.create_common_world(
- comm_rank=comm_rank,
- comm_size=comm_size,
- tmpdir=tmpdir)
- _, common_world2 = self.create_common_world(
- comm_rank=comm_rank,
- comm_size=comm_size,
- tmpdir=tmpdir,
- existing_cw=common_world)
- blob_size = int(1e4)
- num_blobs = 4
- for cw in [common_world, common_world2]:
- blobs = []
- for i in range(num_blobs):
- blob = "blob_{}".format(i)
- value = np.full(blob_size, (comm_rank * num_blobs) + i, np.float32)
- workspace.FeedBlob(blob, value)
- blobs.append(blob)
- net = core.Net("allreduce_multicw")
- net.Allreduce(
- [cw] + blobs,
- blobs,
- engine=op_engine)
- workspace.RunNetOnce(net)
- for i in range(num_blobs):
- np.testing.assert_array_equal(
- workspace.FetchBlob(blobs[i]),
- (num_blobs * comm_size) * (num_blobs * comm_size - 1) / 2)
- @given(comm_size=st.integers(min_value=2, max_value=8),
- blob_size=st.integers(min_value=int(1e3), max_value=int(1e6)),
- num_blobs=st.integers(min_value=1, max_value=4),
- device_option=st.sampled_from([hu.cpu_do]),
- use_float16=st.booleans())
- @settings(deadline=10000)
- def test_allreduce(self, comm_size, blob_size, num_blobs, device_option,
- use_float16):
- TestCase.test_counter += 1
- if os.getenv('COMM_RANK') is not None:
- self.run_test_distributed(
- self._test_allreduce,
- blob_size=blob_size,
- num_blobs=num_blobs,
- use_float16=use_float16,
- device_option=device_option)
- else:
- with TemporaryDirectory() as tmpdir:
- self.run_test_locally(
- self._test_allreduce,
- comm_size=comm_size,
- blob_size=blob_size,
- num_blobs=num_blobs,
- device_option=device_option,
- tmpdir=tmpdir,
- use_float16=use_float16)
- def _test_reduce_scatter(self,
- comm_rank=None,
- comm_size=None,
- blob_size=None,
- num_blobs=None,
- tmpdir=None,
- use_float16=False
- ):
- store_handler, common_world = self.create_common_world(
- comm_rank=comm_rank,
- comm_size=comm_size,
- tmpdir=tmpdir)
- blob_size = self.synchronize(
- store_handler,
- blob_size,
- comm_rank=comm_rank)
- num_blobs = self.synchronize(
- store_handler,
- num_blobs,
- comm_rank=comm_rank)
- blobs = []
- for i in range(num_blobs):
- blob = "blob_{}".format(i)
- value = np.full(blob_size, (comm_rank * num_blobs) + i,
- np.float16 if use_float16 else np.float32)
- workspace.FeedBlob(blob, value)
- blobs.append(blob)
- # Specify distribution among ranks i.e. number of elements
- # scattered/distributed to each process.
- recv_counts = np.zeros(comm_size, dtype=np.int32)
- remaining = blob_size
- chunk_size = (blob_size + comm_size - 1) / comm_size
- for i in range(comm_size):
- recv_counts[i] = min(chunk_size, remaining)
- remaining = remaining - chunk_size if remaining > chunk_size else 0
- recv_counts_blob = "recvCounts"
- workspace.FeedBlob(recv_counts_blob, recv_counts)
- blobs.append(recv_counts_blob)
- net = core.Net("reduce_scatter")
- net.ReduceScatter(
- [common_world] + blobs,
- blobs,
- engine=op_engine)
- workspace.CreateNet(net)
- workspace.RunNet(net.Name())
- for i in range(num_blobs):
- np.testing.assert_array_equal(
- np.resize(workspace.FetchBlob(blobs[i]), recv_counts[comm_rank]),
- (num_blobs * comm_size) * (num_blobs * comm_size - 1) / 2)
- # Run the net a few more times to check the operator
- # works not just the first time it's called
- for _tmp in range(4):
- workspace.RunNet(net.Name())
- @given(comm_size=st.integers(min_value=2, max_value=8),
- blob_size=st.integers(min_value=int(1e3), max_value=int(1e6)),
- num_blobs=st.integers(min_value=1, max_value=4),
- device_option=st.sampled_from([hu.cpu_do]),
- use_float16=st.booleans())
- @settings(deadline=10000)
- def test_reduce_scatter(self, comm_size, blob_size, num_blobs,
- device_option, use_float16):
- TestCase.test_counter += 1
- if os.getenv('COMM_RANK') is not None:
- self.run_test_distributed(
- self._test_reduce_scatter,
- blob_size=blob_size,
- num_blobs=num_blobs,
- use_float16=use_float16,
- device_option=device_option)
- else:
- with TemporaryDirectory() as tmpdir:
- self.run_test_locally(
- self._test_reduce_scatter,
- comm_size=comm_size,
- blob_size=blob_size,
- num_blobs=num_blobs,
- device_option=device_option,
- tmpdir=tmpdir,
- use_float16=use_float16)
- def _test_allgather(self,
- comm_rank=None,
- comm_size=None,
- blob_size=None,
- num_blobs=None,
- tmpdir=None,
- use_float16=False
- ):
- store_handler, common_world = self.create_common_world(
- comm_rank=comm_rank,
- comm_size=comm_size,
- tmpdir=tmpdir)
- blob_size = self.synchronize(
- store_handler,
- blob_size,
- comm_rank=comm_rank)
- num_blobs = self.synchronize(
- store_handler,
- num_blobs,
- comm_rank=comm_rank)
- blobs = []
- for i in range(num_blobs):
- blob = "blob_{}".format(i)
- value = np.full(blob_size, (comm_rank * num_blobs) + i,
- np.float16 if use_float16 else np.float32)
- workspace.FeedBlob(blob, value)
- blobs.append(blob)
- net = core.Net("allgather")
- net.Allgather(
- [common_world] + blobs,
- ["Gathered"],
- engine=op_engine)
- workspace.CreateNet(net)
- workspace.RunNet(net.Name())
- # create expected output
- expected_output = np.array([])
- for i in range(comm_size):
- for j in range(num_blobs):
- value = np.full(blob_size, (i * num_blobs) + j,
- np.float16 if use_float16 else np.float32)
- expected_output = np.concatenate((expected_output, value))
- np.testing.assert_array_equal(
- workspace.FetchBlob("Gathered"), expected_output)
- # Run the net a few more times to check the operator
- # works not just the first time it's called
- for _tmp in range(4):
- workspace.RunNet(net.Name())
- @given(comm_size=st.integers(min_value=2, max_value=8),
- blob_size=st.integers(min_value=int(1e3), max_value=int(1e6)),
- num_blobs=st.integers(min_value=1, max_value=4),
- device_option=st.sampled_from([hu.cpu_do]),
- use_float16=st.booleans())
- @settings(max_examples=10, deadline=None)
- def test_allgather(self, comm_size, blob_size, num_blobs, device_option,
- use_float16):
- TestCase.test_counter += 1
- if os.getenv('COMM_RANK') is not None:
- self.run_test_distributed(
- self._test_allgather,
- blob_size=blob_size,
- num_blobs=num_blobs,
- use_float16=use_float16,
- device_option=device_option)
- else:
- with TemporaryDirectory() as tmpdir:
- self.run_test_locally(
- self._test_allgather,
- comm_size=comm_size,
- blob_size=blob_size,
- num_blobs=num_blobs,
- device_option=device_option,
- tmpdir=tmpdir,
- use_float16=use_float16)
- @given(device_option=st.sampled_from([hu.cpu_do]))
- @settings(deadline=10000)
- def test_forked_cw(self, device_option):
- TestCase.test_counter += 1
- if os.getenv('COMM_RANK') is not None:
- self.run_test_distributed(
- self._test_allreduce_multicw,
- device_option=device_option)
- else:
- # Note: this test exercises the path where we fork a common world.
- # We therefore don't need a comm size larger than 2. It used to be
- # run with comm_size=8, which causes flaky results in a stress run.
- # The flakiness was caused by too many listening sockets being
- # created by Gloo context initialization (8 processes times
- # 7 sockets times 20-way concurrency, plus TIME_WAIT).
- with TemporaryDirectory() as tmpdir:
- self.run_test_locally(
- self._test_allreduce_multicw,
- comm_size=2,
- device_option=device_option,
- tmpdir=tmpdir)
- def _test_barrier(
- self,
- comm_rank=None,
- comm_size=None,
- tmpdir=None,
- ):
- store_handler, common_world = self.create_common_world(
- comm_rank=comm_rank, comm_size=comm_size, tmpdir=tmpdir
- )
- net = core.Net("barrier")
- net.Barrier(
- [common_world],
- [],
- engine=op_engine)
- workspace.CreateNet(net)
- workspace.RunNet(net.Name())
- # Run the net a few more times to check the operator
- # works not just the first time it's called
- for _tmp in range(4):
- workspace.RunNet(net.Name())
- @given(comm_size=st.integers(min_value=2, max_value=8),
- device_option=st.sampled_from([hu.cpu_do]))
- @settings(deadline=10000)
- def test_barrier(self, comm_size, device_option):
- TestCase.test_counter += 1
- if os.getenv('COMM_RANK') is not None:
- self.run_test_distributed(
- self._test_barrier,
- device_option=device_option)
- else:
- with TemporaryDirectory() as tmpdir:
- self.run_test_locally(
- self._test_barrier,
- comm_size=comm_size,
- device_option=device_option,
- tmpdir=tmpdir)
- def _test_close_connection(
- self,
- comm_rank=None,
- comm_size=None,
- tmpdir=None,
- ):
- '''
- One node calls close connection, others wait it on barrier.
- Test will check that all will exit eventually.
- '''
- # Caffe's for closers only:
- # https://www.youtube.com/watch?v=QMFwFgG9NE8
- closer = comm_rank == comm_size // 2,
- store_handler, common_world = self.create_common_world(
- comm_rank=comm_rank, comm_size=comm_size, tmpdir=tmpdir
- )
- net = core.Net("barrier_or_close")
- if not closer:
- net.Barrier(
- [common_world],
- [],
- engine=op_engine)
- else:
- net.DestroyCommonWorld(
- [common_world], [common_world], engine=op_engine)
- # Sleep a bit to ensure others start the barrier
- import time
- time.sleep(0.1)
- workspace.CreateNet(net)
- workspace.RunNet(net.Name())
- @given(comm_size=st.integers(min_value=2, max_value=8),
- device_option=st.sampled_from([hu.cpu_do]))
- @settings(deadline=10000)
- def test_close_connection(self, comm_size, device_option):
- import time
- start_time = time.time()
- TestCase.test_counter += 1
- if os.getenv('COMM_RANK') is not None:
- self.run_test_distributed(
- self._test_close_connection,
- device_option=device_option)
- else:
- with TemporaryDirectory() as tmpdir:
- self.run_test_locally(
- self._test_close_connection,
- comm_size=comm_size,
- device_option=device_option,
- tmpdir=tmpdir)
- # Check that test finishes quickly because connections get closed.
- # This assert used to check that the end to end runtime was less
- # than 2 seconds, but this may not always be the case if there
- # is significant overhead in starting processes. Ideally, this
- # assert is replaced by one that doesn't depend on time but rather
- # checks the success/failure status of the barrier that is run.
- self.assertLess(time.time() - start_time, 20.0)
- def _test_io_error(
- self,
- comm_rank=None,
- comm_size=None,
- tmpdir=None,
- ):
- '''
- Only one node will participate in allreduce, resulting in an IoError
- '''
- store_handler, common_world = self.create_common_world(
- comm_rank=comm_rank,
- comm_size=comm_size,
- tmpdir=tmpdir)
- if comm_rank == 0:
- blob_size = 1000
- num_blobs = 1
- blobs = []
- for i in range(num_blobs):
- blob = "blob_{}".format(i)
- value = np.full(
- blob_size, (comm_rank * num_blobs) + i, np.float32
- )
- workspace.FeedBlob(blob, value)
- blobs.append(blob)
- net = core.Net("allreduce")
- net.Allreduce(
- [common_world] + blobs,
- blobs,
- engine=op_engine)
- workspace.CreateNet(net)
- workspace.RunNet(net.Name())
- @given(comm_size=st.integers(min_value=2, max_value=8),
- device_option=st.sampled_from([hu.cpu_do]))
- @settings(deadline=10000)
- def test_io_error(self, comm_size, device_option):
- TestCase.test_counter += 1
- with self.assertRaises(IoError):
- if os.getenv('COMM_RANK') is not None:
- self.run_test_distributed(
- self._test_io_error,
- device_option=device_option)
- else:
- with TemporaryDirectory() as tmpdir:
- self.run_test_locally(
- self._test_io_error,
- comm_size=comm_size,
- device_option=device_option,
- tmpdir=tmpdir)
- if __name__ == "__main__":
- import unittest
- unittest.main()
|