muji.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
## @package muji
# Module caffe2.python.muji
"""muji.py does multi-gpu training for caffe2 with no need to change the c++
side code. Everything is defined on the computation graph level.

We support the following use cases:
  - 2 gpus, where peer access is enabled between them.
  - 4 gpus, where peer access is enabled between all of them.
  - 4 gpus, where peer access is enabled in two groups,
    between {1, 2} and {3, 4}
  - 8 gpus, where peer access is enabled in two groups,
    between {1, 2, 3, 4} and {5, 6, 7, 8}.
If the above cases are not satisfied, a fallback function which does not rely
on peer access will be called.
"""
  15. import numpy as np
  16. from caffe2.proto import caffe2_pb2
  17. from caffe2.python import workspace
  18. def OnGPU(gpu_id):
  19. """A utility function that returns a device option protobuf of the
  20. specified gpu id.
  21. """
  22. device_option = caffe2_pb2.DeviceOption()
  23. device_option.device_type = workspace.GpuDeviceType
  24. device_option.device_id = gpu_id
  25. return device_option
  26. def OnCPU():
  27. device_option = caffe2_pb2.DeviceOption()
  28. device_option.device_type = caffe2_pb2.CPU
  29. return device_option
  30. def Allreduce(net, blobs, reduced_affix="_reduced", gpu_indices=None):
  31. """The general Allreduce interface that reroutes the function calls.
  32. CPUs and AMD GPUs are not supported because
  33. GetGpuPeerAccessPattern is called to get gpu peer access pattern.
  34. """
  35. if gpu_indices is None:
  36. gpu_indices = list(range(len(blobs)))
  37. if len(gpu_indices) != len(blobs):
  38. raise RuntimeError(
  39. "gpu_indices length and blobs length mismatch: %d vs %d" %
  40. (len(gpu_indices), len(blobs))
  41. )
  42. pattern = workspace.GetGpuPeerAccessPattern()
  43. if len(blobs) == 2 and pattern.shape[0] >= 2 and np.all(pattern[:2, :2]):
  44. return Allreduce2(net, blobs, reduced_affix, gpu_indices)
  45. elif len(blobs) == 4 and pattern.shape[0] >= 4 and np.all(pattern[:4, :4]):
  46. return Allreduce4(net, blobs, reduced_affix, gpu_indices)
  47. elif len(blobs) == 4 and pattern.shape[0] >= 4 and np.all(pattern[:2, :2]) and np.all(pattern[2:4, 2:4]):
  48. return Allreduce4Group2(net, blobs, reduced_affix, gpu_indices)
  49. elif len(blobs) == 8 and pattern.shape[0] >= 8 and np.all(pattern[:8, :8]):
  50. return Allreduce8(net, blobs, reduced_affix, gpu_indices)
  51. else:
  52. return AllreduceFallback(net, blobs, reduced_affix, gpu_indices)
  53. def Allreduce2(net, blobs, reduced_affix, gpu_indices):
  54. """Allreduce for 2 gpus.
  55. Algorithm: 0r <- 0 + 1, 1r <- 0r, where r means "reduced"
  56. """
  57. a, b = blobs
  58. gpu_a, gpu_b = gpu_indices
  59. a_reduced = net.Add([a, b], a + reduced_affix, device_option=OnGPU(gpu_a))
  60. b_reduced = a_reduced.Copy(
  61. [],
  62. b + reduced_affix,
  63. device_option=OnGPU(gpu_b)
  64. )
  65. return a_reduced, b_reduced
  66. def Allreduce4(net, blobs, reduced_affix, gpu_indices):
  67. """Allreduce for 4 gpus.
  68. Algorithm: 2 level reduction.
  69. 0r <- 0 + 1, 2r <- 2 + 3
  70. 0r <- 0r + 2r
  71. 2r <- 0r,
  72. 1r <- 0r, 3r <- 2r
  73. """
  74. a, b, c, d = blobs
  75. gpu_a, gpu_b, gpu_c, gpu_d = gpu_indices
  76. # a_reduced <- a+b, c_reduced <- c + d
  77. a_reduced = net.Add(
  78. [a, b],
  79. str(a) + reduced_affix,
  80. device_option=OnGPU(gpu_a)
  81. )
  82. c_reduced = net.Add(
  83. [c, d],
  84. str(c) + reduced_affix,
  85. device_option=OnGPU(gpu_c)
  86. )
  87. # a_reduced <- a_reduced + c_reduced
  88. a_reduced = a_reduced.Add(c_reduced, a_reduced, device_option=OnGPU(gpu_a))
  89. # broadcast a_reduced to c_reduced
  90. c_reduced = a_reduced.Copy([], c_reduced, device_option=OnGPU(gpu_c))
  91. # broadcast to b and d
  92. b_reduced = a_reduced.Copy(
  93. [],
  94. str(b) + reduced_affix,
  95. device_option=OnGPU(gpu_b)
  96. )
  97. d_reduced = c_reduced.Copy(
  98. [],
  99. str(d) + reduced_affix,
  100. device_option=OnGPU(gpu_d)
  101. )
  102. return a_reduced, b_reduced, c_reduced, d_reduced
  103. def Allreduce4Group2(net, blobs, reduced_affix, gpu_indices):
  104. """Allreduce for 4 gpus where peer access are enabled in {0,1} and {2,3}
  105. Algorithm: 2 level reduction.
  106. 0r <- 0 + 1, 2r <- 2 + 3
  107. 0r <- 0r + 2r
  108. 2r <- 0r,
  109. 1r <- 0r, 3r <- 2r
  110. """
  111. a, b, c, d = blobs
  112. gpu_a, gpu_b, gpu_c, gpu_d = gpu_indices
  113. # a_reduced <- a+b, c_reduced <- c + d
  114. a_reduced = net.Add(
  115. [a, b],
  116. str(a) + reduced_affix,
  117. device_option=OnGPU(gpu_a)
  118. )
  119. c_reduced = net.Add(
  120. [c, d],
  121. str(c) + reduced_affix,
  122. device_option=OnGPU(gpu_c)
  123. )
  124. # copy from c_reduce(gpu_c) to c_reduce_copy(gpu_a)
  125. c_reduced_copy = c_reduced.Copy(
  126. [],
  127. str(c_reduced) + '_copy',
  128. device_option=OnGPU(gpu_a)
  129. )
  130. # a_reduced <- a_reduced + c_reduced_copy
  131. a_reduced = a_reduced.Add(c_reduced_copy, a_reduced, device_option=OnGPU(gpu_a))
  132. # broadcast a_reduced to c_reduced
  133. c_reduced = a_reduced.Copy([], c_reduced, device_option=OnGPU(gpu_c))
  134. # broadcast to b and d
  135. b_reduced = a_reduced.Copy(
  136. [],
  137. str(b) + reduced_affix,
  138. device_option=OnGPU(gpu_b)
  139. )
  140. d_reduced = c_reduced.Copy(
  141. [],
  142. str(d) + reduced_affix,
  143. device_option=OnGPU(gpu_d)
  144. )
  145. return a_reduced, b_reduced, c_reduced, d_reduced
  146. def Allreduce8(net, blobs, reduced_affix, gpu_indices):
  147. """Allreduce for 8 gpus.
  148. Algorithm: 3 level reduction.
  149. 0r <- 0 + 1, 2r <- 2 + 3, 4r <- 4 + 5, 6r <- 6 + 7
  150. 0r <- 0r + 2r, 4r <- 4r + 6r
  151. 0r <- 0r + 4r
  152. 4r <- 0r
  153. 2r <- 0r, 6r <- 4r
  154. 1r <- 0r, 3r <- 2r, 5r <- 4r, 7r <- 6r
  155. """
  156. reduced = [None] * 8
  157. # Reduction level 1
  158. for i in [0, 2, 4, 6]:
  159. reduced[i] = net.Add(
  160. [blobs[i], blobs[i + 1]],
  161. blobs[i] + reduced_affix,
  162. device_option=OnGPU(gpu_indices[i])
  163. )
  164. # Reduction level 2
  165. for i in [0, 4]:
  166. reduced[i] = net.Add(
  167. [reduced[i], reduced[i + 2]],
  168. str(blobs[i]) + reduced_affix,
  169. device_option=OnGPU(gpu_indices[i])
  170. )
  171. # Reduction level 3: this involves a copy.
  172. reduced_4_copy = reduced[4].Copy(
  173. [],
  174. str(reduced[4]) + '_copy',
  175. device_option=OnGPU(gpu_indices[0])
  176. )
  177. reduced[0] = reduced[0].Add(
  178. reduced_4_copy,
  179. reduced[0],
  180. device_option=OnGPU(gpu_indices[0])
  181. )
  182. # Broadcast level 1
  183. reduced[4] = reduced[0].Copy(
  184. [],
  185. reduced[4],
  186. device_option=OnGPU(gpu_indices[4])
  187. )
  188. # Broadcast level 2
  189. for i in [2, 6]:
  190. reduced[i] = reduced[i - 2].Copy(
  191. [],
  192. reduced[i],
  193. device_option=OnGPU(gpu_indices[i])
  194. )
  195. # Broadcast level 3
  196. for i in [1, 3, 5, 7]:
  197. reduced[i] = reduced[i - 1].Copy(
  198. [],
  199. blobs[i] + reduced_affix,
  200. device_option=OnGPU(gpu_indices[i])
  201. )
  202. return reduced
  203. def AllreduceFallback(net, blobs, reduced_affix, gpu_indices):
  204. """A fallback option for Allreduce with no assumption on p2p.
  205. Algorithm: a flat operation on gpu 0
  206. 0r <- 0
  207. 0r <- 0r + i for i in gpu_indices[1:]
  208. ir <- 0r for i in gpu_indices[1:]
  209. """
  210. reduced = [None] * len(gpu_indices)
  211. if reduced_affix != '':
  212. # copy first
  213. reduced[0] = net.Copy(
  214. blobs[0],
  215. blobs[0] + reduced_affix,
  216. device_option=OnGPU(gpu_indices[0])
  217. )
  218. else:
  219. reduced[0] = blobs[0]
  220. # do temp copy and add
  221. temp_name = reduced[0] + '_temp_copy'
  222. for i in range(1, len(gpu_indices)):
  223. temp = net.Copy(
  224. blobs[i],
  225. temp_name,
  226. device_option=OnGPU(gpu_indices[0])
  227. )
  228. reduced[0] = net.Add(
  229. [temp, reduced[0]],
  230. reduced[0],
  231. device_option=OnGPU(gpu_indices[0])
  232. )
  233. # Broadcast to everyone else
  234. for i in range(1, len(gpu_indices)):
  235. reduced[i] = net.Copy(
  236. reduced[0],
  237. blobs[i] + reduced_affix,
  238. device_option=OnGPU(gpu_indices[i])
  239. )
  240. return reduced