rendezvous.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. try:
  2. from urllib.parse import urlparse, urlunparse
  3. except ImportError:
  4. raise ImportError(
  5. "urllib cannot be found, urlparse from python2 is no longer supported."
  6. )
  7. import numbers
  8. import os
  9. import sys
  10. from datetime import timedelta
  11. from typing import Dict
  12. import torch._six as six
  13. from torch.distributed import FileStore, PrefixStore, Store, TCPStore
  14. from .constants import default_pg_timeout
  15. _rendezvous_handlers = {}
  16. def register_rendezvous_handler(scheme, handler):
  17. """Registers a new rendezvous handler.
  18. Before we can run collective algorithms, participating processes
  19. need to find each other and exchange information to be able to
  20. communicate. We call this process rendezvous.
  21. The outcome of the rendezvous process is a triplet containing a
  22. shared key/value store, the rank of the process, and the total
  23. number of participating processes.
  24. If none of the bundled rendezvous methods apply to your execution
  25. environment you can opt to register your own rendezvous handler.
  26. Pick a unique name and use the URL scheme to identify it when
  27. calling the `rendezvous()` function.
  28. Args:
  29. scheme (str): URL scheme to identify your rendezvous handler.
  30. handler (function): Handler that is invoked when the
  31. `rendezvous()` function is called with a URL that uses
  32. the corresponding scheme. It must be a generator function
  33. that yields the triplet.
  34. """
  35. global _rendezvous_handlers
  36. if scheme in _rendezvous_handlers:
  37. raise RuntimeError(
  38. "Rendezvous handler for {}:// already registered".format(scheme)
  39. )
  40. _rendezvous_handlers[scheme] = handler
  41. # Query will have format "rank=0&world_size=1" and is
  42. # converted into {"rank": 0, "world_size": 1}
  43. def _query_to_dict(query: str) -> Dict[str, str]:
  44. return dict((pair[0], pair[1]) for pair in (pair.split("=") for pair in filter(None, query.split("&"))))
  45. def rendezvous(url: str, rank: int = -1, world_size: int = -1, **kwargs):
  46. if not isinstance(url, six.string_classes):
  47. raise RuntimeError("`url` must be a string. {}: {}".format(type(url), url))
  48. if not isinstance(rank, numbers.Integral):
  49. raise RuntimeError("`rank` must be an integer. {}".format(rank))
  50. if not isinstance(world_size, numbers.Integral):
  51. raise RuntimeError("`world_size` must be an integer. {}".format(world_size))
  52. # Append node-specific arguments.
  53. result = urlparse(url)
  54. if rank != -1 or world_size != -1:
  55. query_dict = _query_to_dict(result.query)
  56. assert (
  57. "rank" not in query_dict and "world_size" not in query_dict
  58. ), "The url: {url} has node-specific arguments(rank, world_size) already.".format(
  59. url=url
  60. )
  61. if rank != -1:
  62. query_dict["rank"] = rank
  63. if world_size != -1:
  64. query_dict["world_size"] = world_size
  65. result = result._replace(
  66. query="{}".format(
  67. "&".join(["{}={}".format(k, v) for k, v in query_dict.items()])
  68. )
  69. )
  70. url = urlunparse(result)
  71. if result.scheme not in _rendezvous_handlers:
  72. raise RuntimeError("No rendezvous handler for {}://".format(result.scheme))
  73. return _rendezvous_handlers[result.scheme](url, **kwargs)
  74. def _create_store_from_options(backend_options, rank):
  75. result = urlparse(backend_options.init_method)
  76. # If using env initialization, get rank and world_size from env
  77. world_size = -1
  78. if result.scheme == "env":
  79. rank = os.environ.get("RANK", rank)
  80. # Here, the world_size has already beeen initialized to -1 in init_rpc
  81. # If the world_size env variable is also not present then it is a dynamic group
  82. world_size = int(os.environ.get("WORLD_SIZE", world_size))
  83. query_dict = _query_to_dict(result.query)
  84. # if rank is -1 then intentionally exclude rank for the query, error will be thrown later
  85. if rank != -1:
  86. query_dict["rank"] = str(rank)
  87. query_dict["world_size"] = str(world_size)
  88. result = result._replace(
  89. query="{}".format(
  90. "&".join(["{}={}".format(k, v) for k, v in query_dict.items()])
  91. )
  92. )
  93. url = urlunparse(result)
  94. if result.scheme not in _rendezvous_handlers:
  95. raise RuntimeError("No handler for {}://".format(result.scheme))
  96. store, _, _ = next(_rendezvous_handlers[result.scheme](url))
  97. return store
  98. def _rendezvous_error(msg):
  99. return ValueError("Error initializing torch.distributed using " + msg)
  100. def _file_rendezvous_handler(url: str, **kwargs):
  101. def _error(msg):
  102. return _rendezvous_error("file:// rendezvous: " + msg)
  103. result = urlparse(url)
  104. path = result.path
  105. if sys.platform == "win32":
  106. import urllib.request
  107. full_path = result.netloc + result.path
  108. path = urllib.request.url2pathname(full_path)
  109. if path:
  110. # Normalizing an empty string produces ".", which is not expected.
  111. path = os.path.normpath(path)
  112. if not path:
  113. raise _error("path missing")
  114. query_dict = _query_to_dict(result.query)
  115. if "rank" not in query_dict:
  116. raise _error("rank parameter missing")
  117. if "world_size" not in query_dict:
  118. raise _error("world size parameter missing")
  119. rank = int(query_dict["rank"])
  120. world_size = int(query_dict["world_size"])
  121. store = FileStore(path, world_size)
  122. yield (store, rank, world_size)
  123. # If this configuration is invalidated, there is nothing we can do about it
  124. raise RuntimeError("Unable to perform rerendezvous using file:// method")
  125. def _torchelastic_use_agent_store() -> bool:
  126. return os.environ.get("TORCHELASTIC_USE_AGENT_STORE", None) == str(True)
  127. def _create_c10d_store(hostname, port, rank, world_size, timeout) -> Store:
  128. """
  129. Smartly creates a c10d Store object on ``rank`` based on whether
  130. we need to re-use agent store. The TCPStore server is assumed to be hosted
  131. on ``hostname:port``.
  132. If ``torchelastic_use_agent_store()`` is ``True``, then it is assumed that
  133. the agent leader (node rank 0) hosts the TCPStore server (for which the
  134. endpoint is specified by the given ``hostname:port``). Hence
  135. ALL ranks will create and return a TCPStore client (e.g. ``start_daemon=False``).
  136. If ``torchelastic_use_agent_store()`` is ``False``, then rank 0 will host
  137. the TCPStore (with multi-tenancy) and it is assumed that rank 0's hostname
  138. and port are correctly passed via ``hostname`` and ``port``. All
  139. non-zero ranks will create and return a TCPStore client.
  140. """
  141. # check if port is uint16_t
  142. if not 0 <= port < 2**16:
  143. raise ValueError(f"port must have value from 0 to 65535 but was {port}.")
  144. if _torchelastic_use_agent_store():
  145. attempt = os.environ["TORCHELASTIC_RESTART_COUNT"]
  146. tcp_store = TCPStore(hostname, port, world_size, False, timeout)
  147. return PrefixStore(f"/worker/attempt_{attempt}", tcp_store)
  148. else:
  149. start_daemon = rank == 0
  150. return TCPStore(
  151. hostname, port, world_size, start_daemon, timeout, multi_tenant=True
  152. )
  153. def _tcp_rendezvous_handler(
  154. url: str, timeout: timedelta = default_pg_timeout, **kwargs
  155. ):
  156. def _error(msg):
  157. return _rendezvous_error("tcp:// rendezvous: " + msg)
  158. result = urlparse(url)
  159. if not result.port:
  160. raise _error("port number missing")
  161. query_dict = _query_to_dict(result.query)
  162. if "rank" not in query_dict:
  163. raise _error("rank parameter missing")
  164. if "world_size" not in query_dict:
  165. raise _error("world size parameter missing")
  166. rank = int(query_dict["rank"])
  167. world_size = int(query_dict["world_size"])
  168. assert result.hostname is not None
  169. store = _create_c10d_store(result.hostname, result.port, rank, world_size, timeout)
  170. yield (store, rank, world_size)
  171. # If this configuration is invalidated, there is nothing we can do about it
  172. raise RuntimeError("Unable to perform re-rendezvous using tcp:// method")
  173. def _env_rendezvous_handler(
  174. url: str, timeout: timedelta = default_pg_timeout, **kwargs
  175. ):
  176. def _error(msg):
  177. return _rendezvous_error("env:// rendezvous: " + msg)
  178. def _env_error(var):
  179. return _error("environment variable %s expected, but not set" % var)
  180. def _get_env_or_raise(env_var: str) -> str:
  181. env_val = os.environ.get(env_var, None)
  182. if not env_val:
  183. raise _env_error(env_var)
  184. else:
  185. return env_val
  186. result = urlparse(url)
  187. query_dict = _query_to_dict(result.query)
  188. rank: int
  189. world_size: int
  190. master_port: int
  191. master_addr: str
  192. if "rank" in query_dict:
  193. rank = int(query_dict["rank"])
  194. else:
  195. rank = int(_get_env_or_raise("RANK"))
  196. if "world_size" in query_dict:
  197. world_size = int(query_dict["world_size"])
  198. else:
  199. world_size = int(_get_env_or_raise("WORLD_SIZE"))
  200. master_addr = _get_env_or_raise("MASTER_ADDR")
  201. master_port = int(_get_env_or_raise("MASTER_PORT"))
  202. store = _create_c10d_store(master_addr, master_port, rank, world_size, timeout)
  203. yield (store, rank, world_size)
  204. # If this configuration is invalidated, there is nothing we can do about it
  205. raise RuntimeError("Unable to perform re-rendezvous using env:// method")
  206. register_rendezvous_handler("tcp", _tcp_rendezvous_handler)
  207. register_rendezvous_handler("env", _env_rendezvous_handler)
  208. register_rendezvous_handler("file", _file_rendezvous_handler)