utils.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. import collections
  2. import torch
  3. import torch.distributed as dist
  4. from torch.nn.parallel._functions import _get_stream
  5. from torch.nn.parallel.scatter_gather import ( # type: ignore[attr-defined]
  6. is_namedtuple as _is_namedtuple
  7. )
  8. from typing import Dict, Any, List
  9. __all__ = [] # type: ignore[var-annotated]
  def _recursive_to(inputs, target_gpu, use_side_stream_for_tensor_copies):
      r"""
      Recursively moves input to the target_gpu.

      Args:
          inputs: a tensor, or an arbitrarily nested structure of named tuples,
              tuples, sequences and mappings that may contain tensors.
          target_gpu: CUDA device index the tensors are moved to.
          use_side_stream_for_tensor_copies: when true, tensor copies are issued
              on the stream returned by ``_get_stream(target_gpu)`` and the
              current stream of ``target_gpu`` is made to wait for them.

      Returns:
          A sequence wrapping the converted input: a 1-tuple for a tensor,
          otherwise a list. Containers are rebuilt around their converted
          elements; strings and any other object are returned as ``[obj]``.
      """

      def to_map(obj):
          if isinstance(obj, torch.Tensor):
              # Already on the requested device: nothing to copy.
              if obj.device == torch.device("cuda", target_gpu):
                  return (obj,)
              if not use_side_stream_for_tensor_copies:
                  return (obj.to(target_gpu),)
              else:
                  # Perform CPU -> GPU copies in a background stream. This code is
                  # motivated from similar logic in torch/nn/parallel/_functions.py
                  stream = _get_stream(target_gpu)
                  with torch.cuda.stream(stream):
                      output = obj.to(target_gpu)
                  # synchronize with the copy stream
                  with torch.cuda.device(target_gpu):
                      current_stream = torch.cuda.current_stream()
                      # Sync the current stream with the copy stream
                      current_stream.wait_stream(stream)
                      # Ensure tensor memory is not reused until work on
                      # main stream is complete
                      output.record_stream(current_stream)  # type: ignore[arg-type]
                  return (output,)
          if _is_namedtuple(obj):
              return [type(obj)(*args) for args in zip(*map(to_map, obj))]
          if isinstance(obj, tuple) and len(obj) > 0:
              return list(zip(*map(to_map, obj)))
          if isinstance(obj, str):
              # Needs to be checked, otherwise it's taken as a sequence infinitely.
              # This is because the elements of a string are also strings, and so on.
              return [obj]
          if isinstance(obj, collections.abc.Sequence) and len(obj) > 0:
              # Convert the elements exactly once. Re-running the recursion in
              # the fallback below would traverse the container (and copy every
              # contained tensor) a second time.
              moved = list(zip(*map(to_map, obj)))
              try:
                  return [type(obj)(i) for i in moved]  # type: ignore[call-arg]
              except TypeError:
                  # The sequence type may not support `__init__(iterable)` (e.g., `range`).
                  return [list(i) for i in moved]
          if isinstance(obj, collections.abc.Mapping) and len(obj) > 0:
              # Same single-pass conversion as for sequences, over (key, value) pairs.
              moved = list(zip(*map(to_map, obj.items())))
              try:
                  return [type(obj)(i) for i in moved]  # type: ignore[call-arg]
              except TypeError:
                  # The mapping type may not support `__init__(iterable)`.
                  return [dict(i) for i in moved]
          return [obj]

      # Avoid reference cycle: ``to_map`` refers to itself through its closure,
      # so the name is cleared once the conversion is finished.
      try:
          res = to_map(inputs)
      finally:
          to_map = None  # type: ignore[assignment]
      return res
  def _to_kwargs(inputs, kwargs, device_id, use_side_stream_for_tensor_copies):
      """
      Moves positional and keyword arguments to ``device_id`` and pairs them up.

      Each non-empty argument collection is passed through ``_recursive_to``;
      an empty (falsy) one becomes an empty list. The shorter of the two
      results is then padded -- with ``()`` for positional arguments or ``{}``
      for keyword arguments -- until both have the same length.

      Returns:
          A pair ``(inputs, kwargs)`` where both members are tuples of equal
          length.
      """
      moved_args = []
      if inputs:
          moved_args = _recursive_to(
              inputs, device_id, use_side_stream_for_tensor_copies
          )

      moved_kwargs = []
      if kwargs:
          moved_kwargs = _recursive_to(
              kwargs, device_id, use_side_stream_for_tensor_copies
          )

      # Positive: kwargs entries are missing; negative: positional entries are.
      surplus_args = len(moved_args) - len(moved_kwargs)
      if surplus_args < 0:
          moved_args.extend(() for _ in range(-surplus_args))
      elif surplus_args > 0:
          moved_kwargs.extend({} for _ in range(surplus_args))

      return tuple(moved_args), tuple(moved_kwargs)
  def _verify_param_shape_across_processes(process_group, tensors, logger=None):
      """
      Forwards ``process_group``, ``tensors`` and ``logger`` to
      ``dist._verify_params_across_processes`` and returns its result.

      NOTE(review): judging by the names, the callee presumably checks that
      the given tensors have matching shapes on every rank -- confirm against
      the ``torch.distributed`` implementation.
      """
      verify = dist._verify_params_across_processes
      return verify(process_group, tensors, logger)
  def _sync_module_states(
      module,
      process_group,
      broadcast_bucket_size,
      src,
      params_and_buffers_to_ignore,
  ):
      """
      Syncs ``module``'s parameters and buffers state so that all ranks contain
      the same module state across all ranks. Note that this API assumes that all
      parameter shapes are consistent before running the synchronization. This can
      be checked with ``_verify_param_shape_across_processes``.

      Parameters and buffers whose name appears in
      ``params_and_buffers_to_ignore`` are left out. The remaining tensors are
      detached and handed, parameters first and buffers second, to
      ``_sync_params_and_buffers`` together with ``process_group``,
      ``broadcast_bucket_size`` and ``src``.
      """
      kept_params = [
          tensor.detach()
          for state_name, tensor in module.named_parameters()
          if state_name not in params_and_buffers_to_ignore
      ]
      kept_buffers = [
          tensor.detach()
          for state_name, tensor in module.named_buffers()
          if state_name not in params_and_buffers_to_ignore
      ]
      _sync_params_and_buffers(
          process_group,
          kept_params + kept_buffers,
          broadcast_bucket_size,
          src,
      )
  def _sync_params_and_buffers(
      process_group: dist.ProcessGroup,
      module_states: List[torch.Tensor],
      broadcast_bucket_size: int,
      src: int,
  ):
      """
      Synchronizes ``module_states`` (list of tensors) across all processes by
      passing them to ``dist._broadcast_coalesced`` with ``src`` as the
      broadcast source and ``broadcast_bucket_size`` as the bucket size.

      Nothing is broadcast when ``module_states`` is empty.
      """
      if len(module_states) == 0:
          return
      dist._broadcast_coalesced(
          process_group, module_states, broadcast_bucket_size, src
      )
  def _replace_by_prefix(
      state_dict: Dict[str, Any],
      old_prefix: str,
      new_prefix: str,
  ) -> None:
      """
      Replace all keys that match a given old_prefix with a new_prefix (in-place).

      The rename happens in two phases: every matching entry is first removed
      from ``state_dict``, then re-inserted under its new key. This keeps a
      freshly written key from clobbering, or being renamed a second time as,
      an original key that has not been processed yet (e.g. ``"a."`` ->
      ``"a.a."`` when both ``"a.x"`` and ``"a.a.x"`` exist). If a new key
      equals a key that does not match ``old_prefix``, that entry is
      overwritten.

      Usage::
          state_dict = {"layer.xyz": torch.tensor(1)}
          _replace_by_prefix(state_dict, "layer.", "module.layer.")
          assert state_dict == {"module.layer.xyz": torch.tensor(1)}

      Raises:
          ValueError: if ``old_prefix`` and ``new_prefix`` are equal.
      """
      if old_prefix == new_prefix:
          raise ValueError("old_prefix and new_prefix must be distinct")

      prefix_len = len(old_prefix)
      # Phase 1: detach every matching entry (iterating over a key snapshot,
      # since the dict is mutated while we go).
      renamed = [
          (new_prefix + key[prefix_len:], state_dict.pop(key))
          for key in list(state_dict.keys())
          if key.startswith(old_prefix)
      ]
      # Phase 2: re-insert under the new keys, in the original order.
      for new_key, value in renamed:
          state_dict[new_key] = value
  141. del state_dict[key]