TraceUtils.h 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. #pragma once
  #include <c10/util/irange.h>
  #include <c10d/Store.hpp>
  #include <c10d/Types.hpp>
  #include <sys/types.h>
  #include <cstdint>
  #include <cstdlib>
  #include <cstring>
  #include <map>
  #include <string>
  #include <system_error>
  #include <unordered_map>
  #include <utility>
  #include <vector>
  10. namespace c10d {
  // Builds the store key "<pgName>_<rank>_trace_start" for the given process
  // group name and rank.
  inline std::string getTraceStartKey(const std::string& pgName, int rank) {
    std::string key(pgName);
    key += "_";
    key += std::to_string(rank);
    key += "_trace_start";
    return key;
  }
  // Builds the store key "<pgName>_<rank>_trace_end" for the given process
  // group name and rank.
  inline std::string getTraceEndKey(const std::string& pgName, int rank) {
    std::string key(pgName);
    key += "_";
    key += std::to_string(rank);
    key += "_trace_end";
    return key;
  }
  17. inline bool traceUpdate(
  18. c10::intrusive_ptr<Store>& store,
  19. const std::string& key,
  20. uint64_t seq,
  21. const std::string& col) {
  22. std::vector<uint8_t> value(col.size() + sizeof(seq) + 1);
  23. memcpy(value.data(), &seq, sizeof(seq));
  24. memcpy(value.data() + sizeof(seq), col.data(), col.size());
  25. try {
  26. store->set(key, value);
  27. return true;
  28. } catch (...) {
  29. LOG(ERROR) << "Store is down while updating #" << seq << " with key "
  30. << key;
  31. return false;
  32. }
  33. return true;
  34. }
  // Which kind of trace record is being described: the start of a collective
  // or its end.
  enum TraceDebugEvent {
    kEventStart = 0,
    kEventEnd = 1,
  };
  39. // <seq, <rank, <col, start/end>>>
  40. using TraceMap =
  41. std::map<uint64_t, std::map<int, std::pair<std::string, TraceDebugEvent>>>;
  // Joins the given ranks into a ", "-separated list, e.g. "0, 3, 7".
  // Returns an empty string for an empty input.
  inline std::string ranksToString(const std::vector<int>& ranks) {
    std::string joined;
    const char* separator = "";
    for (const int rank : ranks) {
      joined += separator;
      joined += std::to_string(rank);
      separator = ", ";
    }
    return joined;
  }
  // Joins the rank (first element) of every item into a ", "-separated list.
  // The string half of each pair is ignored. Returns an empty string for an
  // empty input.
  inline std::string ranksFromTrace(
      const std::vector<std::pair<int, std::string>>& items) {
    std::string joined;
    const char* separator = "";
    for (const auto& item : items) {
      joined += separator;
      joined += std::to_string(item.first);
      separator = ", ";
    }
    return joined;
  }
  65. inline std::string analyzeMissingRanks(const std::vector<int>& missingRanks) {
  66. return c10::str(
  67. "\n\t - To our best knowledge, ranks [",
  68. ranksToString(missingRanks),
  69. "] are the lagging ranks that caused this timeout. "
  70. "They never joined any collectives");
  71. }
  72. inline std::string analyzeLaggingRanks(const TraceMap& traceMap) {
  73. uint64_t lagSeq = traceMap.begin()->first;
  74. std::vector<int> startRanks;
  75. std::vector<int> endRanks;
  76. for (auto& p : traceMap.begin()->second) {
  77. if (p.second.second == kEventStart) {
  78. startRanks.push_back(p.first);
  79. } else {
  80. endRanks.push_back(p.first);
  81. }
  82. }
  83. std::string report =
  84. "\n\t - To our best knowledge, the lagging/dead/mismatched ranks "
  85. "that caused the desync are:";
  86. if (startRanks.size()) {
  87. report += c10::str(
  88. "\n\t - [",
  89. ranksToString(startRanks),
  90. "] joined but didn't finish collective #",
  91. lagSeq,
  92. " (count from 1)");
  93. }
  94. if (endRanks.size()) {
  95. report += c10::str(
  96. "\n\t [",
  97. ranksToString(endRanks),
  98. "] finished collective #",
  99. lagSeq,
  100. ", but didn't join collective #",
  101. lagSeq + 1,
  102. " (count from 1)");
  103. }
  104. return report;
  105. }
  106. inline std::string dumpSnapshot(TraceMap& traceMap) {
  107. std::string report = "\n\t - Snapshot of ranks' latest states:";
  108. for (auto& tracePair : traceMap) {
  109. uint64_t seq = tracePair.first;
  110. std::map<int, std::pair<std::string, TraceDebugEvent>>& subMap =
  111. tracePair.second;
  112. std::unordered_map<std::string, std::vector<int>> collectivesStart;
  113. std::unordered_map<std::string, std::vector<int>> collectivesEnd;
  114. for (auto& p : subMap) {
  115. int rank = p.first;
  116. const std::string& col = p.second.first;
  117. if (p.second.second == kEventStart) {
  118. collectivesStart[col].push_back(rank);
  119. } else {
  120. collectivesEnd[col].push_back(rank);
  121. }
  122. }
  123. if (collectivesStart.size()) {
  124. report += c10::str("\n\t #", seq, " started ranks:");
  125. for (auto& mapPair : collectivesStart) {
  126. report += c10::str(
  127. "\n\t [",
  128. ranksToString(mapPair.second),
  129. "] started ",
  130. mapPair.first);
  131. }
  132. }
  133. if (collectivesEnd.size()) {
  134. report += c10::str("\n\t #", seq, " finished ranks:");
  135. for (auto& mapPair : collectivesEnd) {
  136. report += c10::str(
  137. "\n\t [",
  138. ranksToString(mapPair.second),
  139. "] finished ",
  140. mapPair.first);
  141. }
  142. }
  143. }
  144. return report;
  145. }
  146. inline bool parseTraceValue(
  147. c10::intrusive_ptr<Store>& store,
  148. const std::string& key,
  149. uint64_t& seq,
  150. std::string& col) {
  151. try {
  152. std::vector<uint8_t> traceValue = store->get(key);
  153. memcpy(&seq, traceValue.data(), sizeof(seq));
  154. std::string colName((char*)traceValue.data() + sizeof(seq));
  155. col = colName;
  156. return true;
  157. } catch (...) {
  158. LOG(ERROR) << "Store is down while getting key " << key;
  159. return false;
  160. }
  161. return true;
  162. }
  163. inline std::string retrieveDesyncReport(
  164. c10::intrusive_ptr<Store>& store,
  165. const std::string& pgName,
  166. int myRank,
  167. int worldSize) {
  168. std::string report;
  169. uint64_t thisSeq;
  170. std::string thisCol;
  171. std::vector<int> missingRanks;
  172. TraceMap traceMap;
  173. for (const auto rank : c10::irange(worldSize)) {
  174. // Build traceMapStart.
  175. uint64_t seqStart;
  176. {
  177. std::string traceKeyStart = getTraceStartKey(pgName, rank);
  178. if (!store->check({traceKeyStart})) {
  179. missingRanks.push_back(rank);
  180. continue;
  181. }
  182. std::string col;
  183. if (!parseTraceValue(store, traceKeyStart, seqStart, col)) {
  184. return report;
  185. }
  186. traceMap[seqStart].emplace(rank, std::make_pair(col, kEventStart));
  187. if (rank == myRank) {
  188. thisSeq = seqStart;
  189. thisCol = std::move(col);
  190. }
  191. }
  192. // Build traceMapEnd.
  193. {
  194. std::string traceKeyEnd = getTraceEndKey(pgName, rank);
  195. if (!store->check({traceKeyEnd})) {
  196. continue;
  197. }
  198. uint64_t seq;
  199. std::string col;
  200. if (!parseTraceValue(store, traceKeyEnd, seq, col)) {
  201. return report;
  202. }
  203. if (seq == seqStart) {
  204. traceMap[seq][rank].second = kEventEnd;
  205. }
  206. }
  207. }
  208. TORCH_INTERNAL_ASSERT(
  209. !missingRanks.empty() || !traceMap.empty(),
  210. "Trace shouldn't be empty while enabled GLOO_ASYNC_TIMEOUT_DEBUG");
  211. TORCH_INTERNAL_ASSERT(
  212. !thisCol.empty(),
  213. "Timeout rank [",
  214. myRank,
  215. "] must have collective tracking iteam in c10::Store trace");
  216. TORCH_INTERNAL_ASSERT(
  217. traceMap[thisSeq][myRank].second == kEventStart,
  218. "Timeout rank [",
  219. myRank,
  220. "] last trace item must be kEventStart. thisSeq = ",
  221. thisSeq,
  222. ", col = ",
  223. thisCol);
  224. report += c10::str(
  225. "\n\t - [", myRank, "] Timeout at collective: ", thisCol, ", #", thisSeq);
  226. if (!missingRanks.empty()) {
  227. report += analyzeMissingRanks(missingRanks);
  228. } else {
  229. report += analyzeLaggingRanks(traceMap);
  230. report += dumpSnapshot(traceMap);
  231. }
  232. return report;
  233. }
  234. } // namespace c10d