| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167316831693170317131723173317431753176317731783179318031813182318331843185318631873188318931903191319231933194319531963197319831993200320132023203320432053206320732083209321032113212321332143215321632173218321932203221322232233224322532263227322832293230323132323233323432353236323732383239324032413242 |
- import contextlib
- import io
- import logging
- import os
- import pickle
- import time
- import warnings
- from datetime import timedelta
- from typing import Callable, Dict, Optional, Tuple, Union
- import torch
- from torch._C._distributed_c10d import (
- AllreduceCoalescedOptions,
- AllreduceOptions,
- AllToAllOptions,
- BarrierOptions,
- BroadcastOptions,
- GatherOptions,
- PrefixStore,
- ProcessGroup,
- ReduceOp,
- ReduceOptions,
- ReduceScatterOptions,
- ScatterOptions,
- Store,
- DebugLevel,
- get_debug_level,
- )
- from torch._six import string_classes
- from .constants import default_pg_timeout
- from .rendezvous import register_rendezvous_handler, rendezvous # noqa: F401
- # This module is wildcard imported from torch.distributed.
- # TODO: specify __all__
- _MPI_AVAILABLE = True
- _NCCL_AVAILABLE = True
- _GLOO_AVAILABLE = True
- _pickler = pickle.Pickler
- _unpickler = pickle.Unpickler
- try:
- from torch._C._distributed_c10d import ProcessGroupMPI
- except ImportError:
- _MPI_AVAILABLE = False
- try:
- from torch._C._distributed_c10d import ProcessGroupNCCL
- except ImportError:
- _NCCL_AVAILABLE = False
- try:
- from torch._C._distributed_c10d import ProcessGroupGloo
- from torch._C._distributed_c10d import _ProcessGroupWrapper
- except ImportError:
- _GLOO_AVAILABLE = False
- logger = logging.getLogger(__name__)
- PG_WRAPPER_STORE_PREFIX = "pg_wrapper"
- # Some reduce ops are not supported by complex numbers and will result in an error.
- # We currently provide complex support to the distributed API by viewing
- # complex tensors as real (torch.view_as_real), meaning that calling
- # these unsupported ops will return garbage values rather than error out.
- # (e.g. max(2+3i, 3+2i) = 3+3i)
- # We'd like calls to unsupported ops to error out accordingly,
- # rather than returning garbage values.
- def supports_complex(reduceOp: ReduceOp) -> bool:
- denyList = [
- ReduceOp.MAX,
- ReduceOp.MIN,
- ReduceOp.PRODUCT,
- ReduceOp.BAND,
- ReduceOp.BOR,
- ReduceOp.BXOR,
- ]
- return reduceOp not in denyList
- class Backend(object):
- """
- An enum-like class of available backends: GLOO, NCCL, MPI, and other registered
- backends.
- The values of this class are lowercase strings, e.g., ``"gloo"``. They can
- be accessed as attributes, e.g., ``Backend.NCCL``.
- This class can be directly called to parse the string, e.g.,
- ``Backend(backend_str)`` will check if ``backend_str`` is valid, and
- return the parsed lowercase string if so. It also accepts uppercase strings,
- e.g., ``Backend("GLOO")`` returns ``"gloo"``.
- .. note:: The entry ``Backend.UNDEFINED`` is present but only used as
- initial value of some fields. Users should neither use it directly
- nor assume its existence.
- """
- UNDEFINED = "undefined"
- GLOO = "gloo"
- NCCL = "nccl"
- MPI = "mpi"
- TCP = "tcp"
- _plugins: Dict[str, Callable] = {}
- def __new__(cls, name: str):
- if not isinstance(name, string_classes):
- raise ValueError("Backend name must be a string, but got: {}".format(name))
- value = getattr(Backend, name.upper(), Backend.UNDEFINED)
- if value == Backend.TCP:
- raise ValueError(
- "TCP backend has been deprecated. Please use "
- "Gloo or MPI backend for collective operations "
- "on CPU tensors."
- )
- elif value == Backend.UNDEFINED:
- raise ValueError("Invalid backend: '{}'".format(name))
- elif value != Backend.GLOO and value != Backend.NCCL and value != Backend.MPI:
- value = name.lower()
- return value
- @classmethod
- def register_backend(cls, name, func):
- """
- Registers a new backend with the given name and instantiating function.
- This class method is used by 3rd party ``ProcessGroup`` extension to
- register new backends.
- Args:
- name (str): Backend name of the ``ProcessGroup`` extension. It
- should match the one in ``init_process_group()``.
- func (function): Function handler that instantiates the backend.
- The function should be implemented in the backend
- extension and takes four arguments, including
- ``store``, ``rank``, ``world_size``, and ``timeout``.
- .. note:: This support of 3rd party backend is experimental and subject to change.
- """
- assert not hasattr(Backend, name.upper()), (
- f"{name.upper()} c10d backend already exist"
- )
- assert name.upper() not in Backend._plugins, (
- f"{name.upper()} c10d backend creator function already exist"
- )
- setattr(Backend, name.upper(), name.upper())
- Backend._plugins[name.upper()] = func
- # `_backend`, `dist_backend`, and `reduce_op` are here to maintain backward
- # compatibility with pre-c10d distributed package.
- # TODO: remove them when users are ready to take a hard dependency on PyTorch 1.
- _backend: str = Backend.UNDEFINED
- dist_backend = Backend
- class _reduce_op(object):
- r"""
- Deprecated enum-like class for reduction operations: ``SUM``, ``PRODUCT``,
- ``MIN``, and ``MAX``.
- :class:`~torch.distributed.ReduceOp` is recommended to use instead.
- """
- def __init__(self):
- # __members__ is a dict storing key-value pairs for enum classes
- for k, v in ReduceOp.__members__.items():
- setattr(self, k, v)
- self.__members__ = ReduceOp.__members__
- def __getattribute__(self, key):
- warnings.warn(
- "torch.distributed.reduce_op is deprecated, please use "
- "torch.distributed.ReduceOp instead"
- )
- return object.__getattribute__(self, key)
- reduce_op = _reduce_op()
- class group(object):
- # Points to the default PG once initialized.
- WORLD: Optional[ProcessGroup] = None
- class GroupMember(object):
- # Alias to group.WORLD for backward compatibility
- WORLD = group.WORLD
- NON_GROUP_MEMBER = object()
- # Cached process groups
- # For NCCL and GLOO pg, it is a map from ProcessGroup to (Backend, Store)
- # For MPI pg, it is a map from ProcessGroup to (Backend, None)
- _pg_map: Dict[ProcessGroup, Tuple[str, Optional[Store]]] = {}
- # Process group's names, map from ProcessGroup to str
- _pg_names: Dict[ProcessGroup, str] = {}
- # Process group's global rank to local rank mapping
- _pg_group_ranks: Dict[ProcessGroup, Dict[int, int]] = {}
- # Default process group state
- _default_pg_init_method = None
- # Process group count for default naming
- _group_count = 0
- STORE_BASED_BARRIER_PREFIX = "store_based_barrier_key"
- def _store_based_barrier(rank, store, timeout):
- """
- Barrier based on store which is used for synchronizing processes after
- ``init_process_group`` or ``new_group``. Intended to be used only with
- those two methods and is not a generic alternative to ``barrier()``.
- """
- store_key = "{}:{}".format(STORE_BASED_BARRIER_PREFIX, _group_count)
- store.add(store_key, 1)
- logger.info("Added key: {} to store for rank: {}".format(store_key, rank))
- # Now wait for all workers to check in with the store.
- world_size = get_world_size()
- # Use 'add' instead of 'get' since for some store implementations 'add'
- # doesn't work well with 'get'. Ideally the store implementations should
- # be fixed, but for backward compatiblity reasons it is risky to change
- # the store implementations. Once, we completely migrate away from these
- # legacy stores, we can use 'get' here instead.
- worker_count = store.add(store_key, 0)
- start = time.time()
- log_time = time.time()
- while worker_count != world_size:
- time.sleep(0.01)
- worker_count = store.add(store_key, 0)
- # Print status periodically to keep track.
- if timedelta(seconds=(time.time() - log_time)) > timedelta(seconds=10):
- logger.info(
- "Waiting in store based barrier to initialize process group for "
- "rank: {}, key: {} (world_size={}, worker_count={}, timeout={})".format(
- rank, store_key, world_size, worker_count, timeout
- )
- )
- log_time = time.time()
- if timedelta(seconds=(time.time() - start)) > timeout:
- raise RuntimeError(
- "Timed out initializing process group in store based barrier on "
- "rank: {}, for key: {} (world_size={}, worker_count={}, timeout={})".format(
- rank, store_key, world_size, worker_count, timeout
- )
- )
- logger.info(
- f"Rank {rank}: Completed store-based barrier for key:{store_key} with {world_size} nodes."
- )
- def _rank_not_in_group(group: ProcessGroup):
- """
- Helper that checks if the current process's rank is not in a given group.
- """
- if group is None:
- return False
- return group == GroupMember.NON_GROUP_MEMBER
- def _warn_not_in_group(op_name):
- global_rank = -1 if GroupMember.WORLD is None else GroupMember.WORLD.rank()
- warnings.warn(
- f"Running {op_name} on global rank {global_rank} which does not "
- "belong to the given group."
- )
- def _get_group_rank(group: ProcessGroup, rank):
- """
- Helper that gets a given group's local rank in the group from a given global
- rank.
- """
- if group is GroupMember.WORLD:
- raise RuntimeError(
- "group.WORLD does not have local rank to global " "rank mapping"
- )
- if group not in _pg_group_ranks:
- raise RuntimeError("The given group does not exist")
- try:
- group_rank = _pg_group_ranks[group][rank]
- except KeyError:
- raise RuntimeError(
- f"The global rank {rank} is not part of the group {group}"
- ) from None
- return group_rank
- def _get_global_rank(group, group_rank):
- """
- Helper that gets a given group's global rank from a given local rank in the
- group.
- """
- if group is GroupMember.WORLD:
- raise RuntimeError(
- "group.WORLD does not have local rank to global " "rank mapping"
- )
- group_rank_map = _pg_group_ranks[group]
- for rank, grp_rank in group_rank_map.items():
- if grp_rank == group_rank:
- return rank
- raise RuntimeError("The group rank is not part of the group")
- def _get_group_size(group):
- """
- Helper that gets a given group's world size.
- """
- if group is GroupMember.WORLD or group is None:
- default_pg = _get_default_group()
- return default_pg.size()
- return group.size()
- def _check_single_tensor(param, param_name):
- """
- Helper to check that the parameter ``param_name`` is a single tensor.
- """
- if not isinstance(param, torch.Tensor):
- raise RuntimeError(
- "Invalid function argument. Expected parameter `{}` "
- "to be of type torch.Tensor.".format(param_name)
- )
- def _check_tensor_list(param, param_name):
- """
- Helper to check that the parameter ``param_name`` is a list of tensors.
- """
- if not isinstance(param, list) or not all(
- isinstance(p, torch.Tensor) for p in param
- ):
- raise RuntimeError(
- "Invalid function argument. Expected parameter `{}` "
- "to be of type List[torch.Tensor].".format(param_name)
- )
- def _check_op(op):
- """
- Helper to check that the ``op`` is either isend or irecv.
- """
- if op not in [isend, irecv]:
- raise RuntimeError(
- "Invalid ``op``. Expected ``op`` "
- "to be of type ``torch.distributed.isend`` or "
- "``torch.distributed.irecv``."
- )
- def _check_p2p_op_list(p2p_op_list):
- """
- Helper to check that the ``p2p_op_list`` is a list of P2POp instances and
- all ops use the same backend.
- """
- if not isinstance(p2p_op_list, list) or not all(
- isinstance(p2p_op, P2POp) for p2p_op in p2p_op_list
- ):
- raise RuntimeError(
- "Invalid ``p2p_op_list``. Each op is expected to "
- "to be of type ``torch.distributed.P2POp``."
- )
- backend = get_backend(p2p_op_list[0].group)
- if not all(backend == get_backend(p2p_op.group) for p2p_op in p2p_op_list):
- raise RuntimeError("All groups need to use the same backend.")
- def is_mpi_available():
- """
- Checks if the MPI backend is available.
- """
- return _MPI_AVAILABLE
- def is_nccl_available():
- """
- Checks if the NCCL backend is available.
- """
- return _NCCL_AVAILABLE
- def is_gloo_available():
- """
- Checks if the Gloo backend is available.
- """
- return _GLOO_AVAILABLE
- def is_initialized():
- """
- Checking if the default process group has been initialized
- """
- return GroupMember.WORLD is not None
- def is_torchelastic_launched():
- """
- Checks whether this process was launched with ``torch.distributed.elastic``
- (aka torchelastic). The existence of ``TORCHELASTIC_RUN_ID`` environment
- variable is used as a proxy to determine whether the current process
- was launched with torchelastic. This is a reasonable proxy since
- ``TORCHELASTIC_RUN_ID`` maps to the rendezvous id which is always a
- non-null value indicating the job id for peer discovery purposes..
- """
- return os.getenv("TORCHELASTIC_RUN_ID") is not None
- def _get_default_group():
- """
- Getting the default process group created by init_process_group
- """
- if not is_initialized():
- raise RuntimeError(
- "Default process group has not been initialized, "
- "please make sure to call init_process_group."
- )
- return GroupMember.WORLD
- def _get_default_store():
- """
- Getting the default store created by init_process_group
- """
- if not is_initialized():
- raise RuntimeError(
- "Default process group has not been initialized, "
- "please make sure to call init_process_group."
- )
- default_pg = _get_default_group()
- _, default_store = _pg_map[default_pg]
- return default_store
- def _update_default_pg(pg):
- GroupMember.WORLD = group.WORLD = pg
- def get_backend(group=None):
- """
- Returns the backend of the given process group.
- Args:
- group (ProcessGroup, optional): The process group to work on. The
- default is the general main process group. If another specific group
- is specified, the calling process must be part of :attr:`group`.
- Returns:
- The backend of the given process group as a lower case string.
- """
- if group is None:
- pg = _get_default_group()
- else:
- pg = group
- if _rank_not_in_group(pg):
- raise RuntimeError("Invalid process group specified")
- pg_store = _pg_map.get(pg, None)
- assert pg_store is not None
- return pg_store[0]
- def init_process_group(
- backend,
- init_method=None,
- timeout=default_pg_timeout,
- world_size=-1,
- rank=-1,
- store=None,
- group_name="",
- pg_options=None,
- ):
- """
- Initializes the default distributed process group, and this will also
- initialize the distributed package.
- There are 2 main ways to initialize a process group:
- 1. Specify ``store``, ``rank``, and ``world_size`` explicitly.
- 2. Specify ``init_method`` (a URL string) which indicates where/how
- to discover peers. Optionally specify ``rank`` and ``world_size``,
- or encode all required parameters in the URL and omit them.
- If neither is specified, ``init_method`` is assumed to be "env://".
- Args:
- backend (str or Backend): The backend to use. Depending on
- build-time configurations, valid values include ``mpi``, ``gloo``,
- and ``nccl``. This field should be given as a lowercase string
- (e.g., ``"gloo"``), which can also be accessed via
- :class:`Backend` attributes (e.g., ``Backend.GLOO``). If using
- multiple processes per machine with ``nccl`` backend, each process
- must have exclusive access to every GPU it uses, as sharing GPUs
- between processes can result in deadlocks.
- init_method (str, optional): URL specifying how to initialize the
- process group. Default is "env://" if no
- ``init_method`` or ``store`` is specified.
- Mutually exclusive with ``store``.
- world_size (int, optional): Number of processes participating in
- the job. Required if ``store`` is specified.
- rank (int, optional): Rank of the current process (it should be a
- number between 0 and ``world_size``-1).
- Required if ``store`` is specified.
- store(Store, optional): Key/value store accessible to all workers, used
- to exchange connection/address information.
- Mutually exclusive with ``init_method``.
- timeout (timedelta, optional): Timeout for operations executed against
- the process group. Default value equals 30 minutes.
- This is applicable for the ``gloo`` backend. For ``nccl``, this is
- applicable only if the environment variable ``NCCL_BLOCKING_WAIT``
- or ``NCCL_ASYNC_ERROR_HANDLING`` is set to 1. When
- ``NCCL_BLOCKING_WAIT`` is set, this is the duration for which the
- process will block and wait for collectives to complete before
- throwing an exception. When ``NCCL_ASYNC_ERROR_HANDLING`` is set,
- this is the duration after which collectives will be aborted
- asynchronously and the process will crash. ``NCCL_BLOCKING_WAIT``
- will provide errors to the user which can be caught and handled,
- but due to its blocking nature, it has a performance overhead. On
- the other hand, ``NCCL_ASYNC_ERROR_HANDLING`` has very little
- performance overhead, but crashes the process on errors. This is
- done since CUDA execution is async and it is no longer safe to
- continue executing user code since failed async NCCL operations
- might result in subsequent CUDA operations running on corrupted
- data. Only one of these two environment variables should be set.
- group_name (str, optional, deprecated): Group name.
- pg_options (ProcessGroupOptions, optional): process group options
- specifying what additional options need to be passed in during
- the construction of specific process groups. As of now, the only
- options we support is ``ProcessGroupNCCL.Options`` for the ``nccl``
- backend, ``is_high_priority_stream`` can be specified so that
- the nccl backend can pick up high priority cuda streams when
- there're compute kernels waiting.
- .. note:: To enable ``backend == Backend.MPI``, PyTorch needs to be built from source
- on a system that supports MPI.
- """
- global _pg_group_ranks
- global _backend
- global _default_pg_init_method
- if not isinstance(timeout, timedelta):
- raise RuntimeError(
- "Expected timeout argument to be of type" "datetime.timedelta"
- )
- if GroupMember.WORLD is not None:
- raise RuntimeError("trying to initialize the default process group " "twice!")
- assert (store is None) or (
- init_method is None
- ), "Cannot specify both init_method and store."
- if store is not None:
- assert world_size > 0, "world_size must be positive if using store"
- assert rank >= 0, "rank must be non-negative if using store"
- elif init_method is None:
- init_method = "env://"
- backend = Backend(backend)
- if backend == Backend.MPI:
- if world_size != -1 or rank != -1:
- warnings.warn(
- "For MPI backend, world_size ({}) and rank ({}) "
- "are ignored since they are assigned by the "
- "MPI runtime.".format(world_size, rank)
- )
- default_pg = _new_process_group_helper(
- -1, -1, [], Backend.MPI, None, group_name=group_name, timeout=timeout
- )
- _update_default_pg(default_pg)
- else:
- # backward compatible API
- if store is None:
- rendezvous_iterator = rendezvous(
- init_method, rank, world_size, timeout=timeout
- )
- store, rank, world_size = next(rendezvous_iterator)
- store.set_timeout(timeout)
- # Use a PrefixStore to avoid accidental overrides of keys used by
- # different systems (e.g. RPC) in case the store is multi-tenant.
- store = PrefixStore("default_pg", store)
- default_pg = _new_process_group_helper(
- world_size,
- rank,
- [],
- backend,
- store,
- pg_options=pg_options,
- group_name=group_name,
- timeout=timeout,
- )
- _update_default_pg(default_pg)
- _pg_group_ranks[GroupMember.WORLD] = {i: i for i in range(GroupMember.WORLD.size())} # type: ignore[attr-defined, index]
- _backend = _pg_map[GroupMember.WORLD][0] # type: ignore[index]
- _default_pg_init_method = init_method
- # barrier at the end to ensure that once we return from this method, all
- # process groups including global variables are updated correctly on all
- # ranks.
- if backend == Backend.MPI:
- # MPI backend doesn't use store.
- barrier()
- else:
- # Use store based barrier here since barrier() used a bunch of
- # default devices and messes up NCCL internal state.
- _store_based_barrier(rank, store, timeout)
- # Set sequence numbers for gloo and nccl process groups.
- if get_backend(default_pg) in [Backend.GLOO, Backend.NCCL]:
- default_pg._set_sequence_number_for_group()
- def _new_process_group_helper(
- world_size,
- rank,
- group_ranks,
- backend,
- store,
- pg_options=None,
- group_name=None,
- timeout=default_pg_timeout,
- ):
- """
- Create a new distributed process group.
- This function must be called by ALL processes in the global group, even if
- the calling process is not part of the newly created group. In that case,
- this function returns GroupMember.NON_GROUP_MEMBER.
- This function is called with ``group_ranks == []`` for the default group.
- """
- global _pg_map
- global _group_count
- global _pg_names
- if not group_name:
- group_name = str(_group_count)
- _group_count += 1
- if group_name in _pg_names.values():
- raise RuntimeError(
- "The specified group name has already been "
- "created, please use a different group name"
- )
- if not isinstance(timeout, timedelta):
- raise RuntimeError(
- "Expected timeout argument to be of type" "datetime.timedelta"
- )
- # The list of group ranks is empty if we're creating the default group.
- is_default_group = len(group_ranks) == 0
- backend = Backend(backend)
- pg: Union[ProcessGroupGloo, ProcessGroupMPI, ProcessGroupNCCL]
- if backend == Backend.MPI:
- if not is_mpi_available():
- raise RuntimeError(
- "Distributed package doesn't have MPI built in."
- " MPI is only included if you build PyTorch from"
- " source on a host that has MPI installed."
- )
- pg = ProcessGroupMPI.create(group_ranks)
- if not pg:
- return GroupMember.NON_GROUP_MEMBER
- _pg_map[pg] = (Backend.MPI, None)
- _pg_names[pg] = group_name
- else:
- # If this is a subgroup (which means group_ranks is specified),
- # we check if the current process is a member of the new group.
- if not is_default_group:
- global_rank = _get_default_group().rank()
- if global_rank not in group_ranks:
- return GroupMember.NON_GROUP_MEMBER
- # Use the group name as prefix in the default store, such that
- # a single store can be reused by multiple groups.
- prefix_store = PrefixStore(group_name, store)
- if backend == Backend.GLOO:
- if pg_options is not None:
- raise RuntimeError("GLOO options not supported")
- pg = ProcessGroupGloo(prefix_store, rank, world_size, timeout=timeout)
- # In debug mode and if GLOO is available, wrap in a wrapper PG that
- # enables enhanced collective checking for debugability.
- if get_debug_level() == DebugLevel.DETAIL:
- if not _GLOO_AVAILABLE:
- logger.info(
- """TORCH_DISTRIBUTED_DEBUG was set to DETAIL, but
- GLOO is not available. Build with Gloo to
- create a wrapper process group in debug mode
- to aid collective desynchronization debugging."""
- )
- else:
- pg = _create_process_group_wrapper(
- wrapped_pg=pg,
- store_prefix=group_name,
- store=store,
- rank=rank,
- world_size=world_size,
- timeout=timeout,
- )
- _pg_map[pg] = (Backend.GLOO, store)
- _pg_names[pg] = group_name
- elif backend == Backend.NCCL:
- if not is_nccl_available():
- raise RuntimeError("Distributed package doesn't have NCCL " "built in")
- if pg_options is not None:
- assert isinstance(
- pg_options, ProcessGroupNCCL.Options
- ), "Expected pg_options argument to be of type ProcessGroupNCCL.Options"
- else:
- # default pg_options for NCCL
- pg_options = ProcessGroupNCCL.Options()
- pg_options.is_high_priority_stream = False
- pg_options._timeout = timeout
- pg = ProcessGroupNCCL(prefix_store, rank, world_size, pg_options)
- # In debug mode and if GLOO is available, wrap in a wrapper PG that
- # enables enhanced collective checking for debugability.
- if get_debug_level() == DebugLevel.DETAIL:
- if not _GLOO_AVAILABLE:
- logger.info(
- """TORCH_DISTRIBUTED_DEBUG was set to DETAIL, but
- GLOO is not available. Build with Gloo to
- create a wrapper process group in debug mode
- to aid collective desynchronization debugging."""
- )
- else:
- pg = _create_process_group_wrapper(
- wrapped_pg=pg,
- store_prefix=group_name,
- store=store,
- rank=rank,
- world_size=world_size,
- timeout=timeout,
- )
- _pg_map[pg] = (Backend.NCCL, store)
- _pg_names[pg] = group_name
- else:
- assert backend.upper() in Backend._plugins, (
- f"unknown c10d backend type {backend.upper()}"
- )
- pg = Backend._plugins[backend.upper()](
- prefix_store, rank, world_size, timeout
- )
- _pg_map[pg] = (backend, store)
- _pg_names[pg] = group_name
- return pg
- def destroy_process_group(group=None):
- """
- Destroy a given process group, and deinitialize the distributed package
- Args:
- group (ProcessGroup, optional): The process group to be destroyed, if
- group.WORLD is given, all process
- groups including the default one will
- be destroyed.
- """
- global _pg_map
- global _pg_names
- global _pg_group_ranks
- global _default_pg_init_method
- global _group_count
- if group == GroupMember.NON_GROUP_MEMBER:
- return
- if group is None:
- pg = GroupMember.WORLD
- else:
- pg = group
- assert pg is not None
- if _pg_map.get(pg, None) is None:
- raise RuntimeError("Invalid process group specified")
- if group is None or group == GroupMember.WORLD:
- _update_default_pg(None)
- _default_pg_init_method = None
- _pg_map.clear()
- _pg_names.clear()
- _pg_group_ranks.clear()
- # when process group doesn't have an explicit name (only WORLD (default)
- # process group can have an explicit name), we use global _group_counter
- # to generate the name. We need to reset the counter on destruction to
- # allow consistent value to be generated when we re-create process
- # groups after some trainers recover from failure
- #
- # We only reset this when WORLD is being destroyed because if this
- # process group is in good state, we aren't dealing with failures.
- _group_count = 0
- else:
- del _pg_map[pg]
- del _pg_names[pg]
- del _pg_group_ranks[pg]
- def get_rank(group=None):
- """
- Returns the rank of the current process in the provided ``group`` or the
- default group if none was provided.
- Rank is a unique identifier assigned to each process within a distributed
- process group. They are always consecutive integers ranging from 0 to
- ``world_size``.
- Args:
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- Returns:
- The rank of the process group
- -1, if not part of the group
- """
- if _rank_not_in_group(group):
- return -1
- default_pg = _get_default_group()
- if group is None or group is GroupMember.WORLD:
- return default_pg.rank()
- return _get_group_rank(group, default_pg.rank())
- def get_world_size(group=None):
- """
- Returns the number of processes in the current process group
- Args:
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- Returns:
- The world size of the process group
- -1, if not part of the group
- """
- if _rank_not_in_group(group):
- return -1
- return _get_group_size(group)
- def isend(tensor, dst, group=None, tag=0):
- """
- Sends a tensor asynchronously.
- .. warning::
- Modifying ``tensor`` before the request completes causes undefined
- behavior.
- Args:
- tensor (Tensor): Tensor to send.
- dst (int): Destination rank.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- tag (int, optional): Tag to match send with remote recv
- Returns:
- A distributed request object.
- None, if not part of the group
- """
- _check_single_tensor(tensor, "tensor")
- if _rank_not_in_group(group):
- _warn_not_in_group("isend")
- return
- if group is None or group is GroupMember.WORLD:
- default_pg = _get_default_group()
- return default_pg.send([tensor], dst, tag)
- else:
- group_dst_rank = _get_group_rank(group, dst)
- return group.send([tensor], group_dst_rank, tag)
- def irecv(tensor, src=None, group=None, tag=0):
- """
- Receives a tensor asynchronously.
- Args:
- tensor (Tensor): Tensor to fill with received data.
- src (int, optional): Source rank. Will receive from any
- process if unspecified.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- tag (int, optional): Tag to match recv with remote send
- Returns:
- A distributed request object.
- None, if not part of the group
- """
- _check_single_tensor(tensor, "tensor")
- if _rank_not_in_group(group):
- _warn_not_in_group("irecv")
- return
- if group is None or group is GroupMember.WORLD:
- pg = _get_default_group()
- else:
- pg = group
- if src is None:
- return pg.recv_anysource([tensor], tag)
- else:
- if pg is GroupMember.WORLD:
- return pg.recv([tensor], src, tag)
- else:
- group_src_rank = _get_group_rank(pg, src)
- return pg.recv([tensor], group_src_rank, tag)
- def send(tensor, dst, group=None, tag=0):
- """
- Sends a tensor synchronously.
- Args:
- tensor (Tensor): Tensor to send.
- dst (int): Destination rank.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- tag (int, optional): Tag to match send with remote recv
- """
- _check_single_tensor(tensor, "tensor")
- if _rank_not_in_group(group):
- _warn_not_in_group("send")
- return
- if group is None or group is GroupMember.WORLD:
- default_pg = _get_default_group()
- default_pg.send([tensor], dst, tag).wait()
- else:
- group_dst_rank = _get_group_rank(group, dst)
- group.send([tensor], group_dst_rank, tag).wait()
- def recv(tensor, src=None, group=None, tag=0):
- """
- Receives a tensor synchronously.
- Args:
- tensor (Tensor): Tensor to fill with received data.
- src (int, optional): Source rank. Will receive from any
- process if unspecified.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- tag (int, optional): Tag to match recv with remote send
- Returns:
- Sender rank
- -1, if not part of the group
- """
- _check_single_tensor(tensor, "tensor")
- if _rank_not_in_group(group):
- _warn_not_in_group("recv")
- return -1
- if group is None:
- pg = _get_default_group()
- else:
- pg = group
- if src is None:
- work = pg.recv_anysource([tensor], tag)
- work.wait()
- src_rank = work._source_rank()
- if group is None or group is GroupMember.WORLD:
- return src_rank
- else:
- return _get_global_rank(pg, src_rank)
- else:
- if group is None or group is GroupMember.WORLD:
- pg.recv([tensor], src, tag).wait()
- else:
- group_src_rank = _get_group_rank(pg, src)
- pg.recv([tensor], group_src_rank, tag).wait()
- return src
- class P2POp(object):
- """
- A class to build point-to-point operations for ``batch_isend_irecv``.
- This class builds the type of P2P operation, communication buffer, peer rank,
- Process Group group, and tag. Instances of this class will be passed to
- ``batch_isend_irecv`` for point-to-point communications.
- Args:
- op (callable): A function to send data to or receive data from a peer process.
- The type of ``op`` is either ``torch.distributed.isend`` or
- ``torch.distributed.irecv``.
- tensor (Tensor): Tensor to send or receive.
- peer (int): Destination or source rank.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- tag (int, optional): Tag to match send with recv.
- """
- def __init__(self, op, tensor, peer, group=None, tag=0):
- self.op = op
- self.tensor = tensor
- self.peer = peer
- self.group = group
- self.tag = tag
- def __new__(cls, op, tensor, peer, group=None, tag=0):
- _check_op(op)
- _check_single_tensor(tensor, "tensor")
- return object.__new__(cls)
- @contextlib.contextmanager
- def _batch_p2p_manager(backend):
- if backend == Backend.NCCL:
- ProcessGroupNCCL._group_start()
- try:
- yield
- finally:
- if backend == Backend.NCCL:
- ProcessGroupNCCL._group_end()
- def batch_isend_irecv(p2p_op_list):
- """
- Send or Receive a batch of tensors asynchronously and return a list of requests.
- Process each of the operations in ``p2p_op_list`` and return the corresponding
- requests. NCCL and Gloo backend are currently supported.
- Args:
- p2p_op_list: A list of point-to-point operations(type of each operator is
- ``torch.distributed.P2POp``). The order of the isend/irecv in the list
- matters and it needs to match with corresponding isend/irecv on the
- remote end.
- Returns:
- A list of distributed request objects returned by calling the corresponding
- op in the op_list.
- Examples:
- >>> send_tensor = torch.arange(2) + 2 * rank
- >>> recv_tensor = torch.randn(2)
- >>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1)%world_size)
- >>> recv_op = dist.P2POp(dist.irecv, recv_tensor, (rank - 1 + world_size)%world_size)
- >>> reqs = batch_isend_irecv([send_op, recv_op])
- >>> for req in reqs:
- >>> req.wait()
- >>> recv_tensor
- tensor([2, 3]) # Rank 0
- tensor([0, 1]) # Rank 1
- .. note:: Note that when this API is used with the NCCL PG backend, users must set
- the current GPU device with `torch.cuda.set_device`, otherwise it will
- lead to unexpected hang issues.
- In addition, if this API is the first collective call in the ``group``
- passed to ``dist.P2POp``, all ranks of the ``group`` must participate in
- this API call; otherwise, the behavior is undefined. If this API call is
- not the first collective call in the ``group``, batched P2P operations
- involving only a subset of ranks of the ``group`` are allowed.
- """
- _check_p2p_op_list(p2p_op_list)
- backend = get_backend(p2p_op_list[0].group)
- reqs = []
- with _batch_p2p_manager(backend):
- for p2p_op in p2p_op_list:
- op = p2p_op.op
- tensor = p2p_op.tensor
- peer = p2p_op.peer
- curr_group = p2p_op.group
- tag = p2p_op.tag
- ret = op(tensor, peer, curr_group, tag)
- if ret is not None:
- reqs.append(ret)
- return reqs
- def broadcast_multigpu(tensor_list, src, group=None, async_op=False, src_tensor=0):
- """
- Broadcasts the tensor to the whole group with multiple GPU tensors
- per node.
- ``tensor`` must have the same number of elements in all the GPUs from
- all processes participating in the collective. each tensor in the list must
- be on a different GPU
- Only nccl and gloo backend are currently supported
- tensors should only be GPU tensors
- Args:
- tensor_list (List[Tensor]): Tensors that participate in the collective
- operation. If ``src`` is the rank, then the specified ``src_tensor``
- element of ``tensor_list`` (``tensor_list[src_tensor]``) will be
- broadcast to all other tensors (on different GPUs) in the src process
- and all tensors in ``tensor_list`` of other non-src processes.
- You also need to make sure that ``len(tensor_list)`` is the same
- for all the distributed processes calling this function.
- src (int): Source rank.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op
- src_tensor (int, optional): Source tensor rank within ``tensor_list``
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group
- """
- if _rank_not_in_group(group):
- _warn_not_in_group("broadcast_multigpu")
- return
- opts = BroadcastOptions()
- opts.rootRank = src
- opts.rootTensor = src_tensor
- if group is None or group is GroupMember.WORLD:
- default_pg = _get_default_group()
- work = default_pg.broadcast(tensor_list, opts)
- else:
- group_src_rank = _get_group_rank(group, src)
- opts.rootRank = group_src_rank
- work = group.broadcast(tensor_list, opts)
- if async_op:
- return work
- else:
- work.wait()
- def broadcast(tensor, src, group=None, async_op=False):
- """
- Broadcasts the tensor to the whole group.
- ``tensor`` must have the same number of elements in all processes
- participating in the collective.
- Args:
- tensor (Tensor): Data to be sent if ``src`` is the rank of current
- process, and tensor to be used to save received data otherwise.
- src (int): Source rank.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group
- """
- _check_single_tensor(tensor, "tensor")
- if _rank_not_in_group(group):
- _warn_not_in_group("broadcast")
- return
- opts = BroadcastOptions()
- opts.rootRank = src
- opts.rootTensor = 0
- if group is None or group is GroupMember.WORLD:
- default_pg = _get_default_group()
- work = default_pg.broadcast([tensor], opts)
- else:
- group_src_rank = _get_group_rank(group, src)
- opts.rootRank = group_src_rank
- work = group.broadcast([tensor], opts)
- if async_op:
- return work
- else:
- work.wait()
- def all_reduce_multigpu(tensor_list, op=ReduceOp.SUM, group=None, async_op=False):
- r"""
- Reduces the tensor data across all machines in such a way that all get
- the final result. This function reduces a number of tensors on every node,
- while each tensor resides on different GPUs.
- Therefore, the input tensor in the tensor list needs to be GPU tensors.
- Also, each tensor in the tensor list needs to reside on a different GPU.
- After the call, all ``tensor`` in ``tensor_list`` is going to be bitwise
- identical in all processes.
- Complex tensors are supported.
- Only nccl and gloo backend is currently supported
- tensors should only be GPU tensors
- Args:
- tensor_list (List[Tensor]): List of input and output tensors of
- the collective. The function operates in-place and requires that
- each tensor to be a GPU tensor on different GPUs.
- You also need to make sure that ``len(tensor_list)`` is the same for
- all the distributed processes calling this function.
- op (optional): One of the values from
- ``torch.distributed.ReduceOp``
- enum. Specifies an operation used for element-wise reductions.
- group (ProcessGroup, optional): The process group to work on. If
- ``None``, the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group
- """
- if _rank_not_in_group(group):
- return
- tensor_list = [
- t if not t.is_complex() else torch.view_as_real(t) for t in tensor_list
- ]
- opts = AllreduceOptions()
- opts.reduceOp = op
- if group is None:
- default_pg = _get_default_group()
- work = default_pg.allreduce(tensor_list, opts)
- else:
- work = group.allreduce(tensor_list, opts)
- if async_op:
- return work
- else:
- work.wait()
- def all_reduce(tensor, op=ReduceOp.SUM, group=None, async_op=False):
- """
- Reduces the tensor data across all machines in such a way that all get
- the final result.
- After the call ``tensor`` is going to be bitwise identical in all processes.
- Complex tensors are supported.
- Args:
- tensor (Tensor): Input and output of the collective. The function
- operates in-place.
- op (optional): One of the values from
- ``torch.distributed.ReduceOp``
- enum. Specifies an operation used for element-wise reductions.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group
- Examples:
- >>> # All tensors below are of torch.int64 type.
- >>> # We have 2 process groups, 2 ranks.
- >>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
- >>> tensor
- tensor([1, 2]) # Rank 0
- tensor([3, 4]) # Rank 1
- >>> dist.all_reduce(tensor, op=ReduceOp.SUM)
- >>> tensor
- tensor([4, 6]) # Rank 0
- tensor([4, 6]) # Rank 1
- >>> # All tensors below are of torch.cfloat type.
- >>> # We have 2 process groups, 2 ranks.
- >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
- >>> tensor
- tensor([1.+1.j, 2.+2.j]) # Rank 0
- tensor([3.+3.j, 4.+4.j]) # Rank 1
- >>> dist.all_reduce(tensor, op=ReduceOp.SUM)
- >>> tensor
- tensor([4.+4.j, 6.+6.j]) # Rank 0
- tensor([4.+4.j, 6.+6.j]) # Rank 1
- """
- _check_single_tensor(tensor, "tensor")
- if _rank_not_in_group(group):
- _warn_not_in_group("all_reduce")
- return
- if tensor.is_complex():
- if not supports_complex(op):
- raise RuntimeError(f"all_reduce does not support {op} on complex tensors")
- tensor = torch.view_as_real(tensor)
- opts = AllreduceOptions()
- opts.reduceOp = op
- if group is None:
- default_pg = _get_default_group()
- work = default_pg.allreduce([tensor], opts)
- else:
- work = group.allreduce([tensor], opts)
- if async_op:
- return work
- else:
- work.wait()
- def all_reduce_coalesced(tensors, op=ReduceOp.SUM, group=None, async_op=False):
- """
- WARNING: at this time individual shape checking is not implemented across nodes.
- For example, if the rank 0 node passes [torch.rand(4), torch.rand(2)] and the
- rank 1 node passes [torch.rand(2), torch.rand(2), torch.rand(2)], the allreduce
- operation will proceed without complaint and return erroneous outputs. This lack
- of shape checking results in significant performance improvements but users of this
- function should take extra care to ensure that each node passes in tensors whose
- shapes match across nodes.
- Reduces each tensor in tensors (residing on the same device) across all machines
- in such a way that all get the final result.
- After the call each tensor in tensors is going to bitwise identical
- in all processes.
- Complex tensors are supported.
- Args:
- tensors (List[Tensor]): Input and output of the collective. The function
- operates in-place.
- op (Optional[ReduceOp]): One of the values from
- ``torch.distributed.ReduceOp`` enum. Specifies an operation used for
- element-wise reductions.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (Optional[bool]): Whether this op should be an async op.
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group.
- """
- _check_tensor_list(tensors, "tensor")
- if _rank_not_in_group(group):
- _warn_not_in_group("all_reduce_coalesced")
- return
- if any([t.is_complex() for t in tensors]) and not supports_complex(op):
- raise RuntimeError(f"all_reduce does not support {op} on complex tensors")
- tensors = [t if not t.is_complex() else torch.view_as_real(t) for t in tensors]
- opts = AllreduceCoalescedOptions()
- opts.reduceOp = op
- if group is None:
- default_pg = _get_default_group()
- work = default_pg.allreduce_coalesced(tensors, opts)
- else:
- work = group.allreduce_coalesced(tensors, opts)
- if async_op:
- return work.get_future()
- else:
- work.wait()
- def reduce_multigpu(
- tensor_list, dst, op=ReduceOp.SUM, group=None, async_op=False, dst_tensor=0
- ):
- """
- Reduces the tensor data on multiple GPUs across all machines. Each tensor
- in ``tensor_list`` should reside on a separate GPU
- Only the GPU of ``tensor_list[dst_tensor]`` on the process with rank ``dst``
- is going to receive the final result.
- Only nccl backend is currently supported
- tensors should only be GPU tensors
- Args:
- tensor_list (List[Tensor]): Input and output GPU tensors of the
- collective. The function operates in-place.
- You also need to make sure that ``len(tensor_list)`` is the same for
- all the distributed processes calling this function.
- dst (int): Destination rank
- op (optional): One of the values from
- ``torch.distributed.ReduceOp``
- enum. Specifies an operation used for element-wise reductions.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op
- dst_tensor (int, optional): Destination tensor rank within
- ``tensor_list``
- Returns:
- Async work handle, if async_op is set to True.
- None, otherwise
- """
- if _rank_not_in_group(group):
- _warn_not_in_group("reduce_multigpu")
- return
- opts = ReduceOptions()
- opts.reduceOp = op
- opts.rootRank = dst
- opts.rootTensor = dst_tensor
- if group is None or group is GroupMember.WORLD:
- default_pg = _get_default_group()
- work = default_pg.reduce(tensor_list, opts)
- else:
- group_dst_rank = _get_group_rank(group, dst)
- opts.rootRank = group_dst_rank
- work = group.reduce(tensor_list, opts)
- if async_op:
- return work
- else:
- work.wait()
- def reduce(tensor, dst, op=ReduceOp.SUM, group=None, async_op=False):
- """
- Reduces the tensor data across all machines.
- Only the process with rank ``dst`` is going to receive the final result.
- Args:
- tensor (Tensor): Input and output of the collective. The function
- operates in-place.
- dst (int): Destination rank
- op (optional): One of the values from
- ``torch.distributed.ReduceOp``
- enum. Specifies an operation used for element-wise reductions.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group
- """
- _check_single_tensor(tensor, "tensor")
- if _rank_not_in_group(group):
- _warn_not_in_group("reduce")
- return
- opts = ReduceOptions()
- opts.reduceOp = op
- opts.rootRank = dst
- if group is None or group is GroupMember.WORLD:
- default_pg = _get_default_group()
- work = default_pg.reduce([tensor], opts)
- else:
- group_dst_rank = _get_group_rank(group, dst)
- opts.rootRank = group_dst_rank
- work = group.reduce([tensor], opts)
- if async_op:
- return work
- else:
- work.wait()
- def all_gather_multigpu(
- output_tensor_lists, input_tensor_list, group=None, async_op=False
- ):
- """
- Gathers tensors from the whole group in a list.
- Each tensor in ``tensor_list`` should reside on a separate GPU
- Only nccl backend is currently supported
- tensors should only be GPU tensors
- Complex tensors are supported.
- Args:
- output_tensor_lists (List[List[Tensor]]): Output lists. It should
- contain correctly-sized tensors on each GPU to be used for output
- of the collective, e.g. ``output_tensor_lists[i]`` contains the
- all_gather result that resides on the GPU of
- ``input_tensor_list[i]``.
- Note that each element of ``output_tensor_lists`` has the size of
- ``world_size * len(input_tensor_list)``, since the function all
- gathers the result from every single GPU in the group. To interpret
- each element of ``output_tensor_lists[i]``, note that
- ``input_tensor_list[j]`` of rank k will be appear in
- ``output_tensor_lists[i][k * world_size + j]``
- Also note that ``len(output_tensor_lists)``, and the size of each
- element in ``output_tensor_lists`` (each element is a list,
- therefore ``len(output_tensor_lists[i])``) need to be the same
- for all the distributed processes calling this function.
- input_tensor_list (List[Tensor]): List of tensors(on different GPUs) to
- be broadcast from current process.
- Note that ``len(input_tensor_list)`` needs to be the same for
- all the distributed processes calling this function.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group
- """
- if _rank_not_in_group(group):
- _warn_not_in_group("all_gather_multigpu")
- return
- output_tensor_lists = [
- [t if not t.is_complex() else torch.view_as_real(t) for t in l]
- for l in output_tensor_lists
- ]
- input_tensor_list = [
- t if not t.is_complex() else torch.view_as_real(t) for t in input_tensor_list
- ]
- if group is None:
- default_pg = _get_default_group()
- work = default_pg.allgather(output_tensor_lists, input_tensor_list)
- else:
- work = group.allgather(output_tensor_lists, input_tensor_list)
- if async_op:
- return work
- else:
- work.wait()
- def _object_to_tensor(obj):
- f = io.BytesIO()
- _pickler(f).dump(obj)
- byte_storage = torch.ByteStorage.from_buffer(f.getvalue()) # type: ignore[attr-defined]
- # Do not replace `torch.ByteTensor` or `torch.LongTensor` with torch.tensor and specifying dtype.
- # Otherwise, it will casue 100X slowdown.
- # See: https://github.com/pytorch/pytorch/issues/65696
- byte_tensor = torch.ByteTensor(byte_storage)
- local_size = torch.LongTensor([byte_tensor.numel()])
- return byte_tensor, local_size
- def _tensor_to_object(tensor, tensor_size):
- buf = tensor.numpy().tobytes()[:tensor_size]
- return _unpickler(io.BytesIO(buf)).load()
- def _check_for_nccl_backend(group):
- pg = group or _get_default_group()
- # Gate PG wrapper check on Gloo availability.
- if _GLOO_AVAILABLE:
- # It is not expected for PG to be wrapped many times, but support it just
- # in case
- while isinstance(pg, _ProcessGroupWrapper):
- pg = pg.wrapped_pg
- return (
- is_nccl_available() and
- isinstance(pg, ProcessGroupNCCL)
- )
- def all_gather_object(object_list, obj, group=None):
- """
- Gathers picklable objects from the whole group into a list. Similar to
- :func:`all_gather`, but Python objects can be passed in. Note that the object
- must be picklable in order to be gathered.
- Args:
- object_list (list[Any]): Output list. It should be correctly sized as the
- size of the group for this collective and will contain the output.
- object (Any): Pickable Python object to be broadcast from current process.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used. Default is ``None``.
- Returns:
- None. If the calling rank is part of this group, the output of the
- collective will be populated into the input ``object_list``. If the
- calling rank is not part of the group, the passed in ``object_list`` will
- be unmodified.
- .. note:: Note that this API differs slightly from the :func:`all_gather`
- collective since it does not provide an ``async_op`` handle and thus
- will be a blocking call.
- .. note:: For NCCL-based processed groups, internal tensor representations
- of objects must be moved to the GPU device before communication takes
- place. In this case, the device used is given by
- ``torch.cuda.current_device()`` and it is the user's responsiblity to
- ensure that this is set so that each rank has an individual GPU, via
- ``torch.cuda.set_device()``.
- .. warning::
- :func:`all_gather_object` uses ``pickle`` module implicitly, which is
- known to be insecure. It is possible to construct malicious pickle data
- which will execute arbitrary code during unpickling. Only call this
- function with data you trust.
- Example::
- >>> # Note: Process group initialization omitted on each rank.
- >>> import torch.distributed as dist
- >>> # Assumes world_size of 3.
- >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object
- >>> output = [None for _ in gather_objects]
- >>> dist.all_gather_object(output, gather_objects[dist.get_rank()])
- >>> output
- ['foo', 12, {1: 2}]
- """
- if _rank_not_in_group(group):
- _warn_not_in_group("all_gather_object")
- return
- input_tensor, local_size = _object_to_tensor(obj)
- current_device = torch.device("cpu")
- is_nccl_backend = _check_for_nccl_backend(group)
- if is_nccl_backend:
- # See note about using torch.cuda.current_device() here in docstring.
- # We cannot simply use my_rank since rank == device is not necessarily
- # true.
- current_device = torch.device("cuda", torch.cuda.current_device())
- input_tensor = input_tensor.to(current_device)
- local_size = local_size.to(current_device)
- # Gather all local sizes. This is so that we can find the max size, and index
- # until the correct size when deserializing the tensors.
- group_size = get_world_size(group=group)
- object_sizes_tensor = torch.zeros(
- group_size, dtype=torch.long, device=current_device
- )
- object_size_list = [
- object_sizes_tensor[i].unsqueeze(dim=0) for i in range(group_size)
- ]
- # Allgather tensor sizes
- all_gather(object_size_list, local_size, group=group)
- max_object_size = int(max(object_size_list).item()) # type: ignore[type-var]
- # Resize tensor to max size across all ranks.
- input_tensor.resize_(max_object_size)
- coalesced_output_tensor = torch.empty(
- max_object_size * group_size, dtype=torch.uint8, device=current_device
- )
- # Output tensors are nonoverlapping views of coalesced_output_tensor
- output_tensors = [
- coalesced_output_tensor[max_object_size * i : max_object_size * (i + 1)]
- for i in range(group_size)
- ]
- all_gather(output_tensors, input_tensor, group=group)
- # Deserialize outputs back to object.
- for i, tensor in enumerate(output_tensors):
- tensor = tensor.type(torch.uint8)
- if tensor.device != torch.device("cpu"):
- tensor = tensor.cpu()
- tensor_size = object_size_list[i]
- object_list[i] = _tensor_to_object(tensor, tensor_size)
- def gather_object(obj, object_gather_list=None, dst=0, group=None):
- """
- Gathers picklable objects from the whole group in a single process.
- Similar to :func:`gather`, but Python objects can be passed in. Note that the
- object must be picklable in order to be gathered.
- Args:
- obj (Any): Input object. Must be picklable.
- object_gather_list (list[Any]): Output list. On the ``dst`` rank, it
- should be correctly sized as the size of the group for this
- collective and will contain the output. Must be ``None`` on non-dst
- ranks. (default is ``None``)
- dst (int, optional): Destination rank. (default is 0)
- group: (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used. Default is ``None``.
- Returns:
- None. On the ``dst`` rank, ``object_gather_list`` will contain the
- output of the collective.
- .. note:: Note that this API differs slightly from the gather collective
- since it does not provide an async_op handle and thus will be a blocking
- call.
- .. note:: For NCCL-based processed groups, internal tensor representations
- of objects must be moved to the GPU device before communication takes
- place. In this case, the device used is given by
- ``torch.cuda.current_device()`` and it is the user's responsiblity to
- ensure that this is set so that each rank has an individual GPU, via
- ``torch.cuda.set_device()``.
- .. warning::
- :func:`gather_object` uses ``pickle`` module implicitly, which is
- known to be insecure. It is possible to construct malicious pickle data
- which will execute arbitrary code during unpickling. Only call this
- function with data you trust.
- Example::
- >>> # Note: Process group initialization omitted on each rank.
- >>> import torch.distributed as dist
- >>> # Assumes world_size of 3.
- >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object
- >>> output = [None for _ in gather_objects]
- >>> dist.gather_object(
- gather_objects[dist.get_rank()],
- output if dist.get_rank() == 0 else None,
- dst=0
- )
- >>> # On rank 0
- >>> output
- ['foo', 12, {1: 2}]
- """
- if _rank_not_in_group(group):
- _warn_not_in_group("gather_object")
- return
- # Ensure object_gather_list is specified appopriately.
- my_rank = get_rank()
- _validate_output_list_for_rank(my_rank, dst, object_gather_list)
- input_tensor, local_size = _object_to_tensor(obj)
- current_device = torch.device("cpu")
- is_nccl_backend = _check_for_nccl_backend(group)
- if is_nccl_backend:
- current_device = torch.device("cuda", torch.cuda.current_device())
- input_tensor = input_tensor.to(current_device)
- local_size = local_size.to(current_device)
- # Gather all local sizes. This is so that we can find the max size, and index
- # until the correct size when deserializing the tensors.
- group_size = get_world_size(group=group)
- object_sizes_tensor = torch.zeros(
- group_size, dtype=torch.long, device=current_device
- )
- object_size_list = [
- object_sizes_tensor[i].unsqueeze(dim=0) for i in range(group_size)
- ]
- # Allgather tensor sizes. An all-gather is needed here despite this being a
- # gather, since each rank needs to broadcast a tensor of the same (maximal)
- # size.
- all_gather(object_size_list, local_size, group=group)
- max_object_size = int(max(object_size_list).item()) # type: ignore[type-var]
- # Resize tensor to max size across all ranks.
- input_tensor.resize_(max_object_size)
- # Avoid populating output tensors if the result won't be gathered on this rank.
- if my_rank == dst:
- coalesced_output_tensor = torch.empty(
- max_object_size * group_size, dtype=torch.uint8, device=current_device
- )
- # Output tensors are nonoverlapping views of coalesced_output_tensor
- output_tensors = [
- coalesced_output_tensor[max_object_size * i : max_object_size * (i + 1)]
- for i in range(group_size)
- ]
- # All ranks call gather with equal-sized tensors.
- gather(
- input_tensor,
- gather_list=output_tensors if my_rank == dst else None,
- dst=dst,
- group=group,
- )
- if my_rank != dst:
- return
- for i, tensor in enumerate(output_tensors):
- tensor = tensor.type(torch.uint8)
- if tensor.device != torch.device("cpu"):
- tensor = tensor.cpu()
- tensor_size = object_size_list[i]
- object_gather_list[i] = _tensor_to_object(tensor, tensor_size)
- def broadcast_object_list(object_list, src=0, group=None, device=None):
- """
- Broadcasts picklable objects in ``object_list`` to the whole group. Similar
- to :func:`broadcast`, but Python objects can be passed in.
- Note that all objects in ``object_list`` must be picklable in order to be
- broadcasted.
- Args:
- object_list (List[Any]): List of input objects to broadcast.
- Each object must be picklable. Only objects on the ``src`` rank will
- be broadcast, but each rank must provide lists of equal sizes.
- src (int): Source rank from which to broadcast ``object_list``.
- group: (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used. Default is ``None``.
- device (``torch.device``, optional): If not None, the objects are
- serialized and converted to tensors which are moved to the
- ``device`` before broadcasting. Default is ``None``.
- Returns:
- ``None``. If rank is part of the group, ``object_list`` will contain the
- broadcasted objects from ``src`` rank.
- .. note:: For NCCL-based processed groups, internal tensor representations
- of objects must be moved to the GPU device before communication takes
- place. In this case, the device used is given by
- ``torch.cuda.current_device()`` and it is the user's responsiblity to
- ensure that this is set so that each rank has an individual GPU, via
- ``torch.cuda.set_device()``.
- .. note:: Note that this API differs slightly from the :func:`all_gather`
- collective since it does not provide an ``async_op`` handle and thus
- will be a blocking call.
- .. warning::
- :func:`broadcast_object_list` uses ``pickle`` module implicitly, which
- is known to be insecure. It is possible to construct malicious pickle
- data which will execute arbitrary code during unpickling. Only call this
- function with data you trust.
- Example::
- >>> # Note: Process group initialization omitted on each rank.
- >>> import torch.distributed as dist
- >>> if dist.get_rank() == 0:
- >>> # Assumes world_size of 3.
- >>> objects = ["foo", 12, {1: 2}] # any picklable object
- >>> else:
- >>> objects = [None, None, None]
- >>> # Assumes backend is not NCCL
- >>> device = torch.device("cpu")
- >>> dist.broadcast_object_list(objects, src=0, device=device)
- >>> objects
- ['foo', 12, {1: 2}]
- """
- if _rank_not_in_group(group):
- _warn_not_in_group("broadcast_object_list")
- return
- my_rank = get_rank()
- # Serialize object_list elements to tensors on src rank.
- if my_rank == src:
- tensor_list, size_list = zip(*[_object_to_tensor(obj) for obj in object_list])
- object_sizes_tensor = torch.cat(size_list)
- else:
- object_sizes_tensor = torch.empty(len(object_list), dtype=torch.long)
- # Current device selection.
- # To preserve backwards compatibility, ``device`` is default to ``None``
- # in which case we run current logic of device selection, i.e.
- # ``current_device`` is CUDA if backend is NCCL otherwise CPU device. In the
- # case it is not ``None`` we move the size and object tensors to be
- # broadcasted to this device.
- is_nccl_backend = _check_for_nccl_backend(group)
- current_device = None
- if device is not None:
- if is_nccl_backend and device.type != "cuda":
- raise ValueError("device type must be cuda for nccl backend")
- current_device = device
- else:
- current_device = torch.device("cpu")
- if is_nccl_backend:
- # See note about using torch.cuda.current_device() here in
- # docstring. We cannot simply use my_rank since rank == device is
- # not necessarily true.
- current_device = torch.device("cuda", torch.cuda.current_device())
- if is_nccl_backend:
- object_sizes_tensor = object_sizes_tensor.to(current_device)
- # Broadcast object sizes
- broadcast(object_sizes_tensor, src=src, group=group)
- # Concatenate and broadcast serialized object tensors
- if my_rank == src:
- object_tensor = torch.cat(tensor_list)
- else:
- object_tensor = torch.empty( # type: ignore[call-overload]
- torch.sum(object_sizes_tensor).item(), # type: ignore[arg-type]
- dtype=torch.uint8,
- )
- if is_nccl_backend:
- object_tensor = object_tensor.to(current_device)
- broadcast(object_tensor, src=src, group=group)
- # Deserialize objects using their stored sizes.
- offset = 0
- if my_rank != src:
- for i, obj_size in enumerate(object_sizes_tensor):
- obj_view = object_tensor[offset : offset + obj_size]
- obj_view = obj_view.type(torch.uint8)
- if obj_view.device != torch.device("cpu"):
- obj_view = obj_view.cpu()
- offset += obj_size
- object_list[i] = _tensor_to_object(obj_view, obj_size)
- def scatter_object_list(
- scatter_object_output_list, scatter_object_input_list, src=0, group=None
- ):
- """
- Scatters picklable objects in ``scatter_object_input_list`` to the whole
- group. Similar to :func:`scatter`, but Python objects can be passed in. On
- each rank, the scattered object will be stored as the first element of
- ``scatter_object_output_list``. Note that all objects in
- ``scatter_object_input_list`` must be picklable in order to be scattered.
- Args:
- scatter_object_output_list (List[Any]): Non-empty list whose first
- element will store the object scattered to this rank.
- scatter_object_input_list (List[Any]): List of input objects to scatter.
- Each object must be picklable. Only objects on the ``src`` rank will
- be scattered, and the argument can be ``None`` for non-src ranks.
- src (int): Source rank from which to scatter
- ``scatter_object_input_list``.
- group: (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used. Default is ``None``.
- Returns:
- ``None``. If rank is part of the group, ``scatter_object_output_list``
- will have its first element set to the scattered object for this rank.
- .. note:: Note that this API differs slightly from the scatter collective
- since it does not provide an ``async_op`` handle and thus will be a
- blocking call.
- .. note:: Note that this API does not support the NCCL backend, as the
- tensor-based scatter collective is not supported by ProcessGroupNCCL.
- .. warning::
- :func:`scatter_object_list` uses ``pickle`` module implicitly, which
- is known to be insecure. It is possible to construct malicious pickle
- data which will execute arbitrary code during unpickling. Only call this
- function with data you trust.
- Example::
- >>> # Note: Process group initialization omitted on each rank.
- >>> import torch.distributed as dist
- >>> if dist.get_rank() == 0:
- >>> # Assumes world_size of 3.
- >>> objects = ["foo", 12, {1: 2}] # any picklable object
- >>> else:
- >>> # Can be any list on non-src ranks, elements are not used.
- >>> objects = [None, None, None]
- >>> output_list = [None]
- >>> dist.scatter_object_list(output_list, objects, src=0)
- >>> # Rank i gets objects[i]. For example, on rank 2:
- >>> output_list
- [{1: 2}]
- """
- if _rank_not_in_group(group):
- _warn_not_in_group("scatter_object_list")
- return
- if (
- not isinstance(scatter_object_output_list, list)
- or len(scatter_object_output_list) < 1
- ):
- raise RuntimeError(
- "Expected argument scatter_object_output_list to be a list of size at least 1."
- )
- my_rank = get_rank(group)
- if my_rank == src:
- tensor_list, tensor_sizes = zip(
- *[_object_to_tensor(obj) for obj in scatter_object_input_list]
- )
- tensor_list, tensor_sizes = list(tensor_list), list(tensor_sizes)
- # Src rank broadcasts the maximum tensor size. This is because all ranks are
- # expected to call into scatter() with equal-sized tensors.
- if my_rank == src:
- max_tensor_size = max(tensor_sizes)
- for tensor in tensor_list:
- tensor.resize_(max_tensor_size)
- else:
- max_tensor_size = torch.tensor([0], dtype=torch.long)
- broadcast(max_tensor_size, src=src, group=group)
- # Scatter actual serialized objects
- output_tensor = torch.empty(max_tensor_size.item(), dtype=torch.uint8)
- scatter(
- output_tensor,
- scatter_list=None if my_rank != src else tensor_list,
- src=src,
- group=group,
- )
- # Scatter per-object sizes to trim tensors when deserializing back to object
- obj_tensor_size = torch.tensor([0], dtype=torch.long)
- scatter(
- obj_tensor_size,
- scatter_list=None if my_rank != src else tensor_sizes,
- src=src,
- group=group,
- )
- # Deserialize back to object
- scatter_object_output_list[0] = _tensor_to_object(output_tensor, obj_tensor_size)
- def all_gather(tensor_list, tensor, group=None, async_op=False):
- """
- Gathers tensors from the whole group in a list.
- Complex tensors are supported.
- Args:
- tensor_list (list[Tensor]): Output list. It should contain
- correctly-sized tensors to be used for output of the collective.
- tensor (Tensor): Tensor to be broadcast from current process.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group
- Examples:
- >>> # All tensors below are of torch.int64 dtype.
- >>> # We have 2 process groups, 2 ranks.
- >>> tensor_list = [torch.zeros(2, dtype=torch.int64) for _ in range(2)]
- >>> tensor_list
- [tensor([0, 0]), tensor([0, 0])] # Rank 0 and 1
- >>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
- >>> tensor
- tensor([1, 2]) # Rank 0
- tensor([3, 4]) # Rank 1
- >>> dist.all_gather(tensor_list, tensor)
- >>> tensor_list
- [tensor([1, 2]), tensor([3, 4])] # Rank 0
- [tensor([1, 2]), tensor([3, 4])] # Rank 1
- >>> # All tensors below are of torch.cfloat dtype.
- >>> # We have 2 process groups, 2 ranks.
- >>> tensor_list = [torch.zeros(2, dtype=torch.cfloat) for _ in range(2)]
- >>> tensor_list
- [tensor([0.+0.j, 0.+0.j]), tensor([0.+0.j, 0.+0.j])] # Rank 0 and 1
- >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
- >>> tensor
- tensor([1.+1.j, 2.+2.j]) # Rank 0
- tensor([3.+3.j, 4.+4.j]) # Rank 1
- >>> dist.all_gather(tensor_list, tensor)
- >>> tensor_list
- [tensor([1.+1.j, 2.+2.j]), tensor([3.+3.j, 4.+4.j])] # Rank 0
- [tensor([1.+1.j, 2.+2.j]), tensor([3.+3.j, 4.+4.j])] # Rank 1
- """
- _check_tensor_list(tensor_list, "tensor_list")
- _check_single_tensor(tensor, "tensor")
- if _rank_not_in_group(group):
- _warn_not_in_group("all_gather")
- return
- tensor_list = [
- t if not t.is_complex() else torch.view_as_real(t) for t in tensor_list
- ]
- tensor = tensor if not tensor.is_complex() else torch.view_as_real(tensor)
- if group is None:
- default_pg = _get_default_group()
- work = default_pg.allgather([tensor_list], [tensor])
- else:
- work = group.allgather([tensor_list], [tensor])
- if async_op:
- return work
- else:
- work.wait()
- def _all_gather_base(output_tensor, input_tensor, group=None, async_op=False):
- """
- Single tensor all gather. Gathers a single tensor from all ranks, and puts them in a single output tensor.
- Args:
- output_tensor (Tensor): Output tensor. It should contain
- correctly-sized tensors to be used for output of the collective.
- input_tensor (Tensor): Tensor to be broadcast from current process.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group
- Examples:
- >>> # All tensors below are of torch.int64 dtype.
- >>> # We have 2 process groups, 2 ranks.
- >>> output_tensor = torch.zeros(2, dtype=torch.int64)
- >>> output_tensor
- [tensor([0, 0])] # Rank 0 and 1
- >>> tensor = torch.arange(1, dtype=torch.int64) + 1 + rank
- >>> tensor
- tensor([1]) # Rank 0
- tensor([2]) # Rank 1
- >>> dist.all_gather_base(output_tensor, tensor)
- >>> output_tensor
- tensor([1,2]) # Rank 0
- tensor([1,2]) # Rank 1
- .. warning::
- `_all_gather_base` is experimental and subject to change.
- It is the caller's responsibility to ensure the output_tensor
- is correctly sized.
- """
- _check_single_tensor(input_tensor, "input_tensor")
- _check_single_tensor(output_tensor, "output_tensor")
- if _rank_not_in_group(group):
- _warn_not_in_group("_all_gather_base")
- return
- output_tensor = (
- output_tensor
- if not output_tensor.is_complex()
- else torch.view_as_real(output_tensor)
- )
- input_tensor = (
- input_tensor
- if not input_tensor.is_complex()
- else torch.view_as_real(input_tensor)
- )
- if group is None:
- default_pg = _get_default_group()
- work = default_pg._allgather_base(output_tensor, input_tensor)
- else:
- work = group._allgather_base(output_tensor, input_tensor)
- if async_op:
- return work
- else:
- work.wait()
- def all_gather_coalesced(
- output_tensor_lists, input_tensor_list, group=None, async_op=False
- ):
- """
- Gathers input tensors from the whole group in a list in a coalesced manner.
- Complex tensors are supported.
- Args:
- output_tensor_lists (list[list[Tensor]]): Output list. It should contain
- correctly-sized tensors to be used for output of the collective.
- input_tensor_list (list[Tensor]): Tensors to be broadcast from
- current process. At least one tensor has to be non empty.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op.
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group
- Example:
- we have 2 process groups, 2 ranks.
- rank 0 passes:
- input_tensor_list = [[[1, 1], [1, 1]], [2], [3, 3]]
- output_tensor_lists =
- [[[[-1, -1], [-1, -1]], [-1], [-1, -1]],
- [[[-1, -1], [-1, -1]], [-1], [-1, -1]]]
- rank 1 passes:
- input_tensor_list = [[[3, 3], [3, 3]], [5], [1, 1]]
- output_tensor_lists =
- [[[[-1, -1], [-1, -1]], [-1], [-1, -1]],
- [[[-1, -1], [-1, -1]], [-1], [-1, -1]]]
- both rank 0 and 1 get:
- output_tensor_lists =
- [[[1, 1], [1, 1]], [2], [3, 3]],
- [[3, 3], [3, 3]], [5], [1, 1]]].
- WARNING: at this time individual shape checking is not implemented across nodes.
- For example, if the rank 0 node passes [torch.rand(4), torch.rand(2)] and the
- rank 1 node passes [torch.rand(2), torch.rand(2), torch.rand(2)], the
- all_gather_coalesced operation will proceed without complaint and return
- erroneous outputs. This lack of shape checking results in significant
- performance improvements but users of this function should take extra care
- to ensure that each node passes in tensors whose shapes match across nodes.
- """
- # We only check basic compatibility with C++ params here, C++ code will
- # do shape and type checking.
- if _rank_not_in_group(group):
- _warn_not_in_group("all_gather_coalesced")
- return
- _check_tensor_list(input_tensor_list, "tensor_list")
- if not isinstance(output_tensor_lists, list):
- raise RuntimeError(
- "Invalid function argument: " "output_tensor_lists should be a list"
- )
- for output_tensor_list in output_tensor_lists:
- _check_tensor_list(output_tensor_list, "output_tensor_lists")
- output_tensor_lists = [
- [t if not t.is_complex() else torch.view_as_real(t) for t in l]
- for l in output_tensor_lists
- ]
- input_tensor_list = [
- t if not t.is_complex() else torch.view_as_real(t) for t in input_tensor_list
- ]
- if group is None:
- default_pg = _get_default_group()
- work = default_pg.allgather_coalesced(output_tensor_lists, input_tensor_list)
- else:
- work = group.allgather_coalesced(output_tensor_lists, input_tensor_list)
- if async_op:
- return work.get_future()
- else:
- work.wait()
- def _validate_output_list_for_rank(my_rank, dst, gather_list):
- if dst == my_rank:
- if not gather_list:
- raise ValueError(
- "Argument ``gather_list`` must be specified on destination rank."
- )
- elif gather_list:
- raise ValueError(
- "Argument ``gather_list`` must NOT be specified "
- "on non-destination ranks."
- )
- def gather(tensor, gather_list=None, dst=0, group=None, async_op=False):
- """
- Gathers a list of tensors in a single process.
- Args:
- tensor (Tensor): Input tensor.
- gather_list (list[Tensor], optional): List of appropriately-sized
- tensors to use for gathered data (default is None, must be specified
- on the destination rank)
- dst (int, optional): Destination rank (default is 0)
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group
- """
- _check_single_tensor(tensor, "tensor")
- # Parameter ``gather_list`` may be left unspecified on non-dst ranks.
- if gather_list:
- _check_tensor_list(gather_list, "gather_list")
- else:
- gather_list = []
- if _rank_not_in_group(group):
- _warn_not_in_group("gather")
- return
- my_rank = get_rank()
- _validate_output_list_for_rank(my_rank, dst, gather_list)
- output_tensors = [gather_list] if dst == my_rank else []
- input_tensors = [tensor]
- opts = GatherOptions()
- opts.rootRank = dst
- if group is None or group is GroupMember.WORLD:
- default_pg = _get_default_group()
- work = default_pg.gather(output_tensors, input_tensors, opts)
- else:
- group_dst_rank = _get_group_rank(group, dst)
- opts.rootRank = group_dst_rank
- work = group.gather(output_tensors, input_tensors, opts)
- if async_op:
- return work
- else:
- work.wait()
- def scatter(tensor, scatter_list=None, src=0, group=None, async_op=False):
- """
- Scatters a list of tensors to all processes in a group.
- Each process will receive exactly one tensor and store its data in the
- ``tensor`` argument.
- Complex tensors are supported.
- Args:
- tensor (Tensor): Output tensor.
- scatter_list (list[Tensor]): List of tensors to scatter (default is
- None, must be specified on the source rank)
- src (int): Source rank (default is 0)
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group
- """
- _check_single_tensor(tensor, "tensor")
- # Parameter ``scatter_list`` may be left unspecified on non-src ranks.
- if scatter_list:
- _check_tensor_list(scatter_list, "scatter_list")
- else:
- scatter_list = []
- if _rank_not_in_group(group):
- _warn_not_in_group("scatter")
- return
- scatter_list = [
- t if not t.is_complex() else torch.view_as_real(t) for t in scatter_list
- ]
- tensor = tensor if not tensor.is_complex() else torch.view_as_real(tensor)
- my_rank = get_rank()
- if src == my_rank:
- if not scatter_list:
- raise ValueError(
- "Argument ``scatter_list`` must be specified " "on source rank."
- )
- input_tensors = [scatter_list]
- output_tensors = [tensor]
- else:
- if scatter_list:
- raise ValueError(
- "Argument ``scatter_list`` must NOT be specified "
- "on non-source ranks."
- )
- input_tensors = []
- output_tensors = [tensor]
- opts = ScatterOptions()
- opts.rootRank = src
- if group is None or group is GroupMember.WORLD:
- default_pg = _get_default_group()
- work = default_pg.scatter(output_tensors, input_tensors, opts)
- else:
- group_src_rank = _get_group_rank(group, src)
- opts.rootRank = group_src_rank
- work = group.scatter(output_tensors, input_tensors, opts)
- if async_op:
- return work
- else:
- work.wait()
- def reduce_scatter_multigpu(
- output_tensor_list, input_tensor_lists, op=ReduceOp.SUM, group=None, async_op=False
- ):
- """
- Reduce and scatter a list of tensors to the whole group. Only nccl backend
- is currently supported.
- Each tensor in ``output_tensor_list`` should reside on a separate GPU, as
- should each list of tensors in ``input_tensor_lists``.
- Args:
- output_tensor_list (List[Tensor]): Output tensors (on different GPUs)
- to receive the result of the operation.
- Note that ``len(output_tensor_list)`` needs to be the same for all
- the distributed processes calling this function.
- input_tensor_lists (List[List[Tensor]]): Input lists. It should
- contain correctly-sized tensors on each GPU to be used for input of
- the collective, e.g. ``input_tensor_lists[i]`` contains the
- reduce_scatter input that resides on the GPU of
- ``output_tensor_list[i]``.
- Note that each element of ``input_tensor_lists`` has the size of
- ``world_size * len(output_tensor_list)``, since the function
- scatters the result from every single GPU in the group. To
- interpret each element of ``input_tensor_lists[i]``, note that
- ``output_tensor_list[j]`` of rank k receives the reduce-scattered
- result from ``input_tensor_lists[i][k * world_size + j]``
- Also note that ``len(input_tensor_lists)``, and the size of each
- element in ``input_tensor_lists`` (each element is a list,
- therefore ``len(input_tensor_lists[i])``) need to be the same for
- all the distributed processes calling this function.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op.
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group.
- """
- if _rank_not_in_group(group):
- _warn_not_in_group("reduce_scatter_multigpu")
- return
- opts = ReduceScatterOptions()
- opts.reduceOp = op
- if group is None:
- default_pg = _get_default_group()
- work = default_pg.reduce_scatter(output_tensor_list, input_tensor_lists, opts)
- else:
- work = group.reduce_scatter(output_tensor_list, input_tensor_lists, opts)
- if async_op:
- return work
- else:
- work.wait()
- def reduce_scatter(output, input_list, op=ReduceOp.SUM, group=None, async_op=False):
- """
- Reduces, then scatters a list of tensors to all processes in a group.
- Args:
- output (Tensor): Output tensor.
- input_list (list[Tensor]): List of tensors to reduce and scatter.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op.
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group.
- """
- _check_single_tensor(output, "output")
- _check_tensor_list(input_list, "input_list")
- if _rank_not_in_group(group):
- _warn_not_in_group("reduce_scatter")
- return
- opts = ReduceScatterOptions()
- opts.reduceOp = op
- if group is None:
- default_pg = _get_default_group()
- work = default_pg.reduce_scatter([output], [input_list], opts)
- else:
- work = group.reduce_scatter([output], [input_list], opts)
- if async_op:
- return work
- else:
- work.wait()
- def _reduce_scatter_base(output, input, op=ReduceOp.SUM, group=None, async_op=False):
- """
- Reduces, then scatters a flattened tensor to all processes in a group.
- Args:
- output (Tensor): Output tensor.
- input (Tensor): Input tensor that is of size output tensor size times world size
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op.
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group.
- """
- _check_single_tensor(output, "output")
- _check_single_tensor(input, "input")
- if _rank_not_in_group(group):
- _warn_not_in_group("_reduce_scatter_base")
- return
- opts = ReduceScatterOptions()
- opts.reduceOp = op
- if group is None:
- default_pg = _get_default_group()
- work = default_pg._reduce_scatter_base(output, input, opts)
- else:
- work = group._reduce_scatter_base(output, input, opts)
- if async_op:
- return work
- else:
- work.wait()
- def all_to_all_single(
- output,
- input,
- output_split_sizes=None,
- input_split_sizes=None,
- group=None,
- async_op=False,
- ):
- """
- Each process splits input tensor and then scatters the split list
- to all processes in a group. Then concatenate the received tensors from all
- the processes in the group and return single output tensor.
- Complex tensors are supported.
- Args:
- output (Tensor): Gathered cancatenated output tensor.
- input (Tensor): Input tensor to scatter.
- output_split_sizes: (list[Int], optional): Output split sizes for dim 0
- if specified None or empty, dim 0 of ``output`` tensor must divide
- equally by ``world_size``.
- input_split_sizes: (list[Int], optional): Input split sizes for dim 0
- if specified None or empty, dim 0 of ``input`` tensor must divide
- equally by ``world_size``.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op.
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group.
- .. warning::
- `all_to_all_single` is experimental and subject to change.
- Examples:
- >>> input = torch.arange(4) + rank * 4
- >>> input
- tensor([0, 1, 2, 3]) # Rank 0
- tensor([4, 5, 6, 7]) # Rank 1
- tensor([8, 9, 10, 11]) # Rank 2
- tensor([12, 13, 14, 15]) # Rank 3
- >>> output = torch.empty([4], dtype=torch.int64)
- >>> dist.all_to_all_single(output, input)
- >>> output
- tensor([0, 4, 8, 12]) # Rank 0
- tensor([1, 5, 9, 13]) # Rank 1
- tensor([2, 6, 10, 14]) # Rank 2
- tensor([3, 7, 11, 15]) # Rank 3
- >>> # Essentially, it is similar to following operation:
- >>> scatter_list = list(input.chunk(world_size))
- >>> gather_list = list(output.chunk(world_size))
- >>> for i in range(world_size):
- >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i)
- >>> # Another example with uneven split
- >>> input
- tensor([0, 1, 2, 3, 4, 5]) # Rank 0
- tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1
- tensor([20, 21, 22, 23, 24]) # Rank 2
- tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3
- >>> input_splits
- [2, 2, 1, 1] # Rank 0
- [3, 2, 2, 2] # Rank 1
- [2, 1, 1, 1] # Rank 2
- [2, 2, 2, 1] # Rank 3
- >>> output_splits
- [2, 3, 2, 2] # Rank 0
- [2, 2, 1, 2] # Rank 1
- [1, 2, 1, 2] # Rank 2
- [1, 2, 1, 1] # Rank 3
- >>> output = ...
- >>> dist.all_to_all_single(output, input, output_splits, input_splits)
- >>> output
- tensor([ 0, 1, 10, 11, 12, 20, 21, 30, 31]) # Rank 0
- tensor([ 2, 3, 13, 14, 22, 32, 33]) # Rank 1
- tensor([ 4, 15, 16, 23, 34, 35]) # Rank 2
- tensor([ 5, 17, 18, 24, 36]) # Rank 3
- >>> # Another example with tensors of torch.cfloat type.
- >>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j)
- >>> input
- tensor([1+1j, 2+2j, 3+3j, 4+4j]) # Rank 0
- tensor([5+5j, 6+6j, 7+7j, 8+8j]) # Rank 1
- tensor([9+9j, 10+10j, 11+11j, 12+12j]) # Rank 2
- tensor([13+13j, 14+14j, 15+15j, 16+16j]) # Rank 3
- >>> output = torch.empty([4], dtype=torch.int64)
- >>> dist.all_to_all_single(output, input)
- >>> output
- tensor([1+1j, 5+5j, 9+9j, 13+13j]) # Rank 0
- tensor([2+2j, 6+6j, 10+10j, 14+14j]) # Rank 1
- tensor([3+3j, 7+7j, 11+11j, 15+15j]) # Rank 2
- tensor([4+4j, 8+8j, 12+12j, 16+16j]) # Rank 3
- """
- if _rank_not_in_group(group):
- _warn_not_in_group("all_to_all_single")
- return
- opts = AllToAllOptions()
- _check_single_tensor(output, "output")
- _check_single_tensor(input, "input")
- if input.is_complex():
- input = torch.view_as_real(input)
- if output.is_complex():
- output = torch.view_as_real(output)
- output_split_sizes = [] if output_split_sizes is None else output_split_sizes
- input_split_sizes = [] if input_split_sizes is None else input_split_sizes
- if group is None:
- default_pg = _get_default_group()
- work = default_pg.alltoall_base(
- output, input, output_split_sizes, input_split_sizes, opts
- )
- else:
- work = group.alltoall_base(
- output, input, output_split_sizes, input_split_sizes, opts
- )
- if async_op:
- return work
- else:
- work.wait()
- def all_to_all(output_tensor_list, input_tensor_list, group=None, async_op=False):
- """
- Each process scatters list of input tensors to all processes in a group and
- return gathered list of tensors in output list.
- Complex tensors are supported.
- Args:
- output_tensor_list (list[Tensor]): List of tensors to be gathered one
- per rank.
- input_tensor_list (list[Tensor]): List of tensors to scatter one per rank.
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op.
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group.
- .. warning::
- `all_to_all` is experimental and subject to change.
- Examples:
- >>> input = torch.arange(4) + rank * 4
- >>> input = list(input.chunk(4))
- >>> input
- [tensor([0]), tensor([1]), tensor([2]), tensor([3])] # Rank 0
- [tensor([4]), tensor([5]), tensor([6]), tensor([7])] # Rank 1
- [tensor([8]), tensor([9]), tensor([10]), tensor([11])] # Rank 2
- [tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3
- >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4))
- >>> dist.all_to_all(output, input)
- >>> output
- [tensor([0]), tensor([4]), tensor([8]), tensor([12])] # Rank 0
- [tensor([1]), tensor([5]), tensor([9]), tensor([13])] # Rank 1
- [tensor([2]), tensor([6]), tensor([10]), tensor([14])] # Rank 2
- [tensor([3]), tensor([7]), tensor([11]), tensor([15])] # Rank 3
- >>> # Essentially, it is similar to following operation:
- >>> scatter_list = input
- >>> gather_list = output
- >>> for i in range(world_size):
- >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i)
- >>> input
- tensor([0, 1, 2, 3, 4, 5]) # Rank 0
- tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1
- tensor([20, 21, 22, 23, 24]) # Rank 2
- tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3
- >>> input_splits
- [2, 2, 1, 1] # Rank 0
- [3, 2, 2, 2] # Rank 1
- [2, 1, 1, 1] # Rank 2
- [2, 2, 2, 1] # Rank 3
- >>> output_splits
- [2, 3, 2, 2] # Rank 0
- [2, 2, 1, 2] # Rank 1
- [1, 2, 1, 2] # Rank 2
- [1, 2, 1, 1] # Rank 3
- >>> input = list(input.split(input_splits))
- >>> input
- [tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])] # Rank 0
- [tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1
- [tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])] # Rank 2
- [tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])] # Rank 3
- >>> output = ...
- >>> dist.all_to_all(output, input)
- >>> output
- [tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])] # Rank 0
- [tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])] # Rank 1
- [tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])] # Rank 2
- [tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])] # Rank 3
- >>> # Another example with tensors of torch.cfloat type.
- >>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j)
- >>> input = list(input.chunk(4))
- >>> input
- [tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])] # Rank 0
- [tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])] # Rank 1
- [tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])] # Rank 2
- [tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])] # Rank 3
- >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4))
- >>> dist.all_to_all(output, input)
- >>> output
- [tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])] # Rank 0
- [tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])] # Rank 1
- [tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])] # Rank 2
- [tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])] # Rank 3
- """
- if _rank_not_in_group(group):
- _warn_not_in_group("all_to_all")
- return
- opts = AllToAllOptions()
- _check_tensor_list(output_tensor_list, "output_tensor_list")
- _check_tensor_list(input_tensor_list, "input_tensor_list")
- input_tensor_list = [
- t if not t.is_complex() else torch.view_as_real(t) for t in input_tensor_list
- ]
- output_tensor_list = [
- t if not t.is_complex() else torch.view_as_real(t) for t in output_tensor_list
- ]
- if group is None:
- default_pg = _get_default_group()
- work = default_pg.alltoall(output_tensor_list, input_tensor_list, opts)
- else:
- work = group.alltoall(output_tensor_list, input_tensor_list, opts)
- if async_op:
- return work
- else:
- work.wait()
- def barrier(group=GroupMember.WORLD, async_op=False, device_ids=None):
- """
- Synchronizes all processes.
- This collective blocks processes until the whole group enters this function,
- if async_op is False, or if async work handle is called on wait().
- Args:
- group (ProcessGroup, optional): The process group to work on. If None,
- the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op
- device_ids ([int], optional): List of device/GPU ids.
- Valid only for NCCL backend.
- Returns:
- Async work handle, if async_op is set to True.
- None, if not async_op or if not part of the group
- """
- if _rank_not_in_group(group):
- _warn_not_in_group("barrier")
- return
- opts = BarrierOptions()
- if device_ids is not None:
- if get_backend(group) != Backend.NCCL:
- raise RuntimeError(
- "Function argument device_ids not supported "
- "for the selected backend {}".format(get_backend(group))
- )
- if isinstance(device_ids, list):
- opts.device_ids = device_ids
- else:
- raise RuntimeError(
- "Invalid function argument: " "device_ids type should be List[int]"
- )
- if group is None:
- default_pg = _get_default_group()
- work = default_pg.barrier(opts=opts)
- else:
- work = group.barrier(opts=opts)
- if async_op:
- return work
- else:
- work.wait()
- def monitored_barrier(group=GroupMember.WORLD, timeout=None, wait_all_ranks=False):
- """
- Synchronizes all processes similar to ``torch.distributed.barrier``, but takes
- a configurable timeout and is able to report ranks that did not pass this
- barrier within that timeout. Specifically, for non-zero ranks, will block
- until a send/recv is processed from rank 0. Rank 0 will block until all send
- /recv from other ranks are processed, and will report failures for ranks
- that failed to respond in time. Note that if one rank does not reach the
- monitored_barrier (for example due to a hang), all other ranks would fail
- in monitored_barrier.
- This collective will block all processes/ranks in the group, until the
- whole group exits the function successfully, making it useful for debugging
- and synchronizing. However, it can have a performance impact and should only
- be used for debugging or scenarios that require full synchronization points
- on the host-side. For debugging purposees, this barrier can be inserted
- before the application's collective calls to check if any ranks are
- desynchronized.
- .. note:: Note that this collective is only supported with the GLOO backend.
- Args:
- group (ProcessGroup, optional): The process group to work on. If
- ``None``, the default process group will be used.
- timeout (datetime.timedelta, optional): Timeout for monitored_barrier.
- If ``None``, the default process group timeout will be used.
- wait_all_ranks (bool, optional): Whether to collect all failed ranks or
- not. By default, this is ``False`` and ``monitored_barrier`` on rank 0
- will throw on the first failed rank it encounters in order to fail
- fast. By setting ``wait_all_ranks=True`` ``monitored_barrier`` will
- collect all failed ranks and throw an error containing information
- about all failed ranks.
- Returns:
- ``None``.
- Example::
- >>> # Note: Process group initialization omitted on each rank.
- >>> import torch.distributed as dist
- >>> if dist.get_rank() != 1:
- >>> dist.monitored_barrier() # Raises exception indicating that
- >>> # rank 1 did not call into monitored_barrier.
- >>> # Example with wait_all_ranks=True
- >>> if dist.get_rank() == 0:
- >>> dist.monitored_barrier(wait_all_ranks=True) # Raises exception
- >>> # indicating that ranks 1, 2, ... world_size - 1 did not call into
- >>> # monitored_barrier.
- """
- # Need to call rank not in group before using the group, otherwise
- # "Invalid process group" error is raised.
- if _rank_not_in_group(group):
- _warn_not_in_group("monitored_barrier")
- return
- if get_backend(group) != Backend.GLOO:
- raise RuntimeError("monitored_barrier is only implemented for GLOO backend.")
- if timeout is None:
- timeout = default_pg_timeout
- group_to_use = _get_default_group() if group is None else group
- return group_to_use.monitored_barrier(timeout, wait_all_ranks=wait_all_ranks)
- def _create_process_group_wrapper(
- wrapped_pg: ProcessGroup,
- store_prefix: str,
- store: Store,
- rank: int,
- world_size: int,
- timeout: timedelta = default_pg_timeout,
- ):
- # Create a separate prefix store for the helper process group.
- prefix = f"{PG_WRAPPER_STORE_PREFIX}:{store_prefix}"
- store = PrefixStore(prefix, store)
- helper_pg = ProcessGroupGloo(store, rank, world_size, timeout=timeout)
- # Wrap the underlying pg with ProcessGroupWrapper.
- wrapped_pg = _ProcessGroupWrapper(wrapped_pg, helper_pg)
- return wrapped_pg
- def new_group(ranks=None, timeout=default_pg_timeout, backend=None, pg_options=None):
- """
- Creates a new distributed group.
- This function requires that all processes in the main group (i.e. all
- processes that are part of the distributed job) enter this function, even
- if they are not going to be members of the group. Additionally, groups
- should be created in the same order in all processes.
- .. warning::
- Using multiple process groups with the ``NCCL`` backend concurrently
- is not safe and the user should perform explicit synchronization in
- their application to ensure only one process group is used at a time.
- This means collectives from one process group should have completed
- execution on the device (not just enqueued since CUDA execution is
- async) before collectives from another process group are enqueued.
- See `Using multiple NCCL communicators concurrently <https://docs.nvid
- ia.com/deeplearning/nccl/user-guide/docs/usage/communicators.html#using
- -multiple-nccl-communicators-concurrently>`_ for more details.
- Args:
- ranks (list[int]): List of ranks of group members. If ``None``, will be
- set to all ranks. Default is ``None``.
- timeout (timedelta, optional): Timeout for operations executed against
- the process group. Default value equals 30 minutes.
- This is applicable for the ``gloo`` backend. For ``nccl``, this is
- applicable only if the environment variable ``NCCL_BLOCKING_WAIT``
- or ``NCCL_ASYNC_ERROR_HANDLING`` is set to 1. When
- ``NCCL_BLOCKING_WAIT`` is set, this is the duration for which the
- process will block and wait for collectives to complete before
- throwing an exception. When ``NCCL_ASYNC_ERROR_HANDLING`` is set,
- this is the duration after which collectives will be aborted
- asynchronously and the process will crash. ``NCCL_BLOCKING_WAIT``
- will provide errors to the user which can be caught and handled,
- but due to its blocking nature, it has a performance overhead. On
- the other hand, ``NCCL_ASYNC_ERROR_HANDLING`` has very little
- performance overhead, but crashes the process on errors. This is
- done since CUDA execution is async and it is no longer safe to
- continue executing user code since failed async NCCL operations
- might result in subsequent CUDA operations running on corrupted
- data. Only one of these two environment variables should be set.
- backend (str or Backend, optional): The backend to use. Depending on
- build-time configurations, valid values are ``gloo`` and ``nccl``.
- By default uses the same backend as the global group. This field
- should be given as a lowercase string (e.g., ``"gloo"``), which can
- also be accessed via :class:`Backend` attributes (e.g.,
- ``Backend.GLOO``). If ``None`` is passed in, the backend
- corresponding to the default process group will be used. Default is
- ``None``.
- pg_options (ProcessGroupOptions, optional): process group options
- specifying what additional options need to be passed in during
- the construction of specific process groups. i.e. for the ``nccl``
- backend, ``is_high_priority_stream`` can be specified so that
- process group can pick up high priority cuda streams.
- Returns:
- A handle of distributed group that can be given to collective calls.
- """
- global _pg_group_ranks
- default_pg = _get_default_group()
- default_backend, default_store = _pg_map[default_pg]
- global_rank = default_pg.rank()
- global_world_size = default_pg.size()
- # Default to the same backend as the global process group
- # if the backend is not specified.
- if not backend:
- backend = default_backend
- # checks the input ranks
- if ranks is not None:
- ranks = sorted(ranks)
- group_world_size = len(ranks)
- if group_world_size > global_world_size:
- raise RuntimeError(
- "the new group's world size should be less or "
- "equal to the world size set by "
- "init_process_group"
- )
- # check ranks' sanity
- for rank in ranks:
- if rank < 0 or rank >= global_world_size:
- raise RuntimeError(
- "The new group's rank should be within the "
- "the world_size set by init_process_group"
- )
- if global_rank in ranks:
- group_rank = ranks.index(global_rank)
- else:
- group_rank = None
- else:
- ranks = list(range(global_world_size))
- group_world_size = global_world_size
- group_rank = global_rank
- backend = Backend(backend)
- pg = _new_process_group_helper(
- group_world_size,
- group_rank,
- ranks,
- backend,
- default_store,
- pg_options=pg_options,
- timeout=timeout,
- )
- # Create the global rank to group rank mapping
- _pg_group_ranks[pg] = {
- global_rank: group_rank for group_rank, global_rank in enumerate(ranks)
- }
- # barrier at the end to ensure that once we return from this method, all
- # process groups including global variables are updated correctly on all
- # ranks.
- if backend == Backend.MPI:
- # MPI doesn't have store.
- barrier()
- else:
- # Use store based barrier here since barrier() used a bunch of
- # default devices and messes up NCCL internal state.
- _store_based_barrier(global_rank, default_store, timeout)
- # Set sequence numbers for gloo and nccl process groups.
- if pg != GroupMember.NON_GROUP_MEMBER and get_backend(pg) in [
- Backend.GLOO,
- Backend.NCCL,
- ]:
- pg._set_sequence_number_for_group()
- return pg
- def new_subgroups(
- group_size=None,
- group=None,
- timeout=default_pg_timeout,
- backend=None,
- pg_options=None,
- ):
- """
- Creates GPU subgroups of equal size. By default, it creates intra-machine subgroups,
- where each of which contains all the ranks of a machine, based on the assumption
- that each machine has the same number of CUDA devices.
- This is a convenience API that calls ``new_group`` to generate multiple subgroups.
- It requires that all processes in the main group (i.e. all
- processes that are part of the distributed job) enter this function, even
- if they are not going to be members of the group.
- .. warning::
- This API only works when CUDA is available.
- .. warning::
- If ``group_size`` is passed in, the world size must be divisible by ``group_size``.
- If no ``group_size`` is passed in, and not all the machines have the same number
- of devices, the subgroup division will be different across nodes and can cause
- unexpected behaviors.
- .. warning::
- Using multiple process groups with the ``NCCL`` backend concurrently
- is not safe and the user should perform explicit synchronization in
- their application to ensure only one process group is used at a time.
- This means collectives from one process group should have completed
- execution on the device (not just enqueued since CUDA execution is
- async) before collectives from another process group are enqueued.
- See `Using multiple NCCL communicators concurrently <https://docs.nvid
- ia.com/deeplearning/nccl/user-guide/docs/usage/communicators.html#using
- -multiple-nccl-communicators-concurrently>`_ for more details.
- Args:
- group_size (int, optional): The size of each subgroup. If ``None``,
- the default subgroup size is equal to the number of devices on each machine,
- based on the assumption that each machine has exactly the same
- number of devices. Default is ``None``.
- timeout (timedelta, optional): Timeout for operations executed against
- the process group. Default value equals 30 minutes.
- This is applicable for the ``gloo`` backend. For ``nccl``, this is
- applicable only if the environment variable ``NCCL_BLOCKING_WAIT``
- or ``NCCL_ASYNC_ERROR_HANDLING`` is set to 1. When
- ``NCCL_BLOCKING_WAIT`` is set, this is the duration for which the
- process will block and wait for collectives to complete before
- throwing an exception. When ``NCCL_ASYNC_ERROR_HANDLING`` is set,
- this is the duration after which collectives will be aborted
- asynchronously and the process will crash. ``NCCL_BLOCKING_WAIT``
- will provide errors to the user which can be caught and handled,
- but due to its blocking nature, it has a performance overhead. On
- the other hand, ``NCCL_ASYNC_ERROR_HANDLING`` has very little
- performance overhead, but crashes the process on errors. This is
- done since CUDA execution is async and it is no longer safe to
- continue executing user code since failed async NCCL operations
- might result in subsequent CUDA operations running on corrupted
- data. Only one of these two environment variables should be set.
- backend (str or Backend, optional): The backend to use. Depending on
- build-time configurations, valid values are ``gloo`` and ``nccl``.
- By default uses the same backend as the global group. This field
- should be given as a lowercase string (e.g., ``"gloo"``), which can
- also be accessed via :class:`Backend` attributes (e.g.,
- ``Backend.GLOO``). If ``None`` is passed in, the backend
- corresponding to the default process group will be used. Default is
- ``None``.
- pg_options (ProcessGroupOptions, optional): process group options
- specifying what additional options need to be passed in during
- the construction of specific process groups. i.e. for the ``nccl``
- backend, ``is_high_priority_stream`` can be specified so that
- process group can pick up high priority cuda streams.
- Returns:
- The subgroup containing the current rank, and all the subgroups used for cleanup.
- Examples:
- >>> # Create intra-machine subgroups.
- >>> cur_subgroup, subgroups = dist.new_subgroups()
- >>> # Allreduce within the machine.
- >>> rank = dist.get_rank()
- >>> tensor = torch.ones(1, device=rank) * rank
- >>> dist.all_reduce(tensor, group=cur_subgroup)
- >>> tensor
- tensor([8]) # Assume 8 is the number of CUDA devices per machine.
- >>> # Cleanup.
- >>> for subgroup in subgroups:
- >>> dist.destroy_process_group(subgroup)
- """
- if not torch.cuda.is_available():
- raise ValueError("Subgroups can only be created when CUDA is available")
- if group_size is None:
- group_size = torch.cuda.device_count()
- world_size = get_world_size()
- if world_size < group_size:
- raise ValueError("The arg 'group_size' must not exceed the world size")
- if world_size % group_size != 0:
- raise ValueError("The world size must be divisible by 'group_size'")
- subgroups = []
- cur_subgroup = None
- for subgroup_id in range(world_size // group_size):
- start_rank = subgroup_id * group_size
- end_rank = start_rank + group_size
- ranks_in_subgroup = list(range(start_rank, end_rank))
- subgroup = new_group(
- ranks=ranks_in_subgroup,
- timeout=timeout,
- backend=backend,
- pg_options=pg_options,
- )
- subgroups.append(subgroup)
- rank = get_rank()
- if rank in ranks_in_subgroup:
- cur_subgroup = subgroup
- logger.info(
- "Rank {} is assigned to subgroup {}".format(rank, ranks_in_subgroup)
- )
- return cur_subgroup, subgroups
- def new_subgroups_by_enumeration(
- ranks_per_subgroup_list,
- timeout=default_pg_timeout,
- backend=None,
- pg_options=None,
- ):
- """
- Creates GPU subgroups by dividing the global world, where the division is specified by
- a nested list of ranks. The subgroups cannot have overlap, and some ranks may not have
- to be in any subgroup.
- This is a convenience API that calls ``new_group`` to generate multiple subgroups.
- It requires that all processes in the main group (i.e. all
- processes that are part of the distributed job) enter this function, even
- if they are not going to be members of the group.
- .. warning::
- Using multiple process groups with the ``NCCL`` backend concurrently
- is not safe and the user should perform explicit synchronization in
- their application to ensure only one process group is used at a time.
- This means collectives from one process group should have completed
- execution on the device (not just enqueued since CUDA execution is
- async) before collectives from another process group are enqueued.
- See `Using multiple NCCL communicators concurrently <https://docs.nvid
- ia.com/deeplearning/nccl/user-guide/docs/usage/communicators.html#using
- -multiple-nccl-communicators-concurrently>`_ for more details.
- Args:
- ranks_per_subgroup_list (list[list[int]]): A nested list of ranks of
- group members.
- timeout (timedelta, optional): Timeout for operations executed against
- the process group. Default value equals 30 minutes.
- This is applicable for the ``gloo`` backend. For ``nccl``, this is
- applicable only if the environment variable ``NCCL_BLOCKING_WAIT``
- or ``NCCL_ASYNC_ERROR_HANDLING`` is set to 1. When
- ``NCCL_BLOCKING_WAIT`` is set, this is the duration for which the
- process will block and wait for collectives to complete before
- throwing an exception. When ``NCCL_ASYNC_ERROR_HANDLING`` is set,
- this is the duration after which collectives will be aborted
- asynchronously and the process will crash. ``NCCL_BLOCKING_WAIT``
- will provide errors to the user which can be caught and handled,
- but due to its blocking nature, it has a performance overhead. On
- the other hand, ``NCCL_ASYNC_ERROR_HANDLING`` has very little
- performance overhead, but crashes the process on errors. This is
- done since CUDA execution is async and it is no longer safe to
- continue executing user code since failed async NCCL operations
- might result in subsequent CUDA operations running on corrupted
- data. Only one of these two environment variables should be set.
- backend (str or Backend, optional): The backend to use. Depending on
- build-time configurations, valid values are ``gloo`` and ``nccl``.
- By default uses the same backend as the global group. This field
- should be given as a lowercase string (e.g., ``"gloo"``), which can
- also be accessed via :class:`Backend` attributes (e.g.,
- ``Backend.GLOO``). If ``None`` is passed in, the backend
- corresponding to the default process group will be used. Default is
- ``None``.
- pg_options (ProcessGroupOptions, optional): process group options
- specifying what additional options need to be passed in during
- the construction of specific process groups. i.e. for the ``nccl``
- backend, ``is_high_priority_stream`` can be specified so that
- process group can pick up high priority cuda streams.
- Returns:
- The subgroup containing the current rank, and all the subgroups used for cleanup.
- Examples:
- >>> # Create two subgroups, where each has 2 processes.
- >>> cur_subgroup, subgroups = dist.new_subgroups(ranks=[[0, 2], [1, 3]])
- >>> rank = dist.get_rank()
- >>> tensor = torch.ones(1, device=rank) * rank
- >>> dist.all_reduce(tensor, group=cur_subgroup)
- >>> tensor
- tensor([2]) # Subgroup 0: ranks 0 and 2
- tensor([4]) # Subgroup 1: ranks 1 and 3
- """
- if not torch.cuda.is_available():
- raise ValueError("Subgroups can only be created when CUDA is available")
- if ranks_per_subgroup_list is None or len(ranks_per_subgroup_list) == 0:
- raise ValueError("The arg 'ranks_per_subgroup_list' cannot be empty")
- world_size = get_world_size()
- subgroups = []
- cur_subgroup = None
- # Create a mapping from rank to subgroup to check if there is any subgroup overlap.
- rank_to_ranks_dict = {} # type: ignore[var-annotated]
- for ranks in ranks_per_subgroup_list:
- subgroup = new_group(
- ranks=ranks,
- timeout=timeout,
- backend=backend,
- pg_options=pg_options,
- )
- subgroups.append(subgroup)
- my_rank = get_rank()
- for rank in ranks:
- if rank in rank_to_ranks_dict:
- raise ValueError(
- "Rank {} has appeared in both subgroup {} and {}".format(
- rank, rank_to_ranks_dict[rank], ranks
- )
- )
- rank_to_ranks_dict[rank] = ranks
- if my_rank == rank:
- cur_subgroup = subgroup
- logger.info("Rank {} is assigned to subgroup {}".format(rank, ranks))
- return cur_subgroup, subgroups
|