| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
#pragma once
#include <c10/util/irange.h>
#include <c10d/Store.hpp>
#include <c10d/Types.hpp>
#include <sys/types.h>

#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <map>
#include <string>
#include <system_error>
#include <unordered_map>
#include <utility>
#include <vector>
- namespace c10d {
/// Store key holding the start-of-collective trace record of `rank` in
/// process group `pgName` (read back by retrieveDesyncReport).
/// Format: "<pgName>_<rank>_trace_start".
inline std::string getTraceStartKey(const std::string& pgName, int rank) {
  std::string key(pgName);
  key += '_';
  key += std::to_string(rank);
  key += "_trace_start";
  return key;
}
/// Store key holding the end-of-collective trace record of `rank` in
/// process group `pgName` (read back by retrieveDesyncReport).
/// Format: "<pgName>_<rank>_trace_end".
inline std::string getTraceEndKey(const std::string& pgName, int rank) {
  std::string key(pgName);
  key += '_';
  key += std::to_string(rank);
  key += "_trace_end";
  return key;
}
- inline bool traceUpdate(
- c10::intrusive_ptr<Store>& store,
- const std::string& key,
- uint64_t seq,
- const std::string& col) {
- std::vector<uint8_t> value(col.size() + sizeof(seq) + 1);
- memcpy(value.data(), &seq, sizeof(seq));
- memcpy(value.data() + sizeof(seq), col.data(), col.size());
- try {
- store->set(key, value);
- return true;
- } catch (...) {
- LOG(ERROR) << "Store is down while updating #" << seq << " with key "
- << key;
- return false;
- }
- return true;
- }
/// Latest trace event seen for a rank within one collective.
enum TraceDebugEvent {
  kEventStart = 0, // rank has joined (started) the collective
  kEventEnd = 1, // rank has also finished the collective
};

/// Trace table used to build desync reports:
///   seq -> rank -> (collective name, latest TraceDebugEvent)
/// Both levels are std::map, so iteration is ordered and begin() yields the
/// smallest sequence number.
using TraceMap = std::map<
    uint64_t,
    std::map<int, std::pair<std::string, TraceDebugEvent>>>;
/// Joins rank ids with ", " (e.g. {0, 3, 5} -> "0, 3, 5"); empty input
/// yields an empty string.
inline std::string ranksToString(const std::vector<int>& ranks) {
  std::string joined;
  bool first = true;
  for (const int rank : ranks) {
    if (!first) {
      joined += ", ";
    }
    joined += std::to_string(rank);
    first = false;
  }
  return joined;
}
/// Joins the rank ids (the `first` member of each pair) with ", ";
/// the string half of each pair is ignored. Empty input yields "".
inline std::string ranksFromTrace(
    const std::vector<std::pair<int, std::string>>& items) {
  std::string ranks;
  for (std::size_t i = 0; i < items.size(); ++i) {
    if (i != 0) {
      ranks += ", ";
    }
    ranks += std::to_string(items[i].first);
  }
  return ranks;
}
- inline std::string analyzeMissingRanks(const std::vector<int>& missingRanks) {
- return c10::str(
- "\n\t - To our best knowledge, ranks [",
- ranksToString(missingRanks),
- "] are the lagging ranks that caused this timeout. "
- "They never joined any collectives");
- }
- inline std::string analyzeLaggingRanks(const TraceMap& traceMap) {
- uint64_t lagSeq = traceMap.begin()->first;
- std::vector<int> startRanks;
- std::vector<int> endRanks;
- for (auto& p : traceMap.begin()->second) {
- if (p.second.second == kEventStart) {
- startRanks.push_back(p.first);
- } else {
- endRanks.push_back(p.first);
- }
- }
- std::string report =
- "\n\t - To our best knowledge, the lagging/dead/mismatched ranks "
- "that caused the desync are:";
- if (startRanks.size()) {
- report += c10::str(
- "\n\t - [",
- ranksToString(startRanks),
- "] joined but didn't finish collective #",
- lagSeq,
- " (count from 1)");
- }
- if (endRanks.size()) {
- report += c10::str(
- "\n\t [",
- ranksToString(endRanks),
- "] finished collective #",
- lagSeq,
- ", but didn't join collective #",
- lagSeq + 1,
- " (count from 1)");
- }
- return report;
- }
- inline std::string dumpSnapshot(TraceMap& traceMap) {
- std::string report = "\n\t - Snapshot of ranks' latest states:";
- for (auto& tracePair : traceMap) {
- uint64_t seq = tracePair.first;
- std::map<int, std::pair<std::string, TraceDebugEvent>>& subMap =
- tracePair.second;
- std::unordered_map<std::string, std::vector<int>> collectivesStart;
- std::unordered_map<std::string, std::vector<int>> collectivesEnd;
- for (auto& p : subMap) {
- int rank = p.first;
- const std::string& col = p.second.first;
- if (p.second.second == kEventStart) {
- collectivesStart[col].push_back(rank);
- } else {
- collectivesEnd[col].push_back(rank);
- }
- }
- if (collectivesStart.size()) {
- report += c10::str("\n\t #", seq, " started ranks:");
- for (auto& mapPair : collectivesStart) {
- report += c10::str(
- "\n\t [",
- ranksToString(mapPair.second),
- "] started ",
- mapPair.first);
- }
- }
- if (collectivesEnd.size()) {
- report += c10::str("\n\t #", seq, " finished ranks:");
- for (auto& mapPair : collectivesEnd) {
- report += c10::str(
- "\n\t [",
- ranksToString(mapPair.second),
- "] finished ",
- mapPair.first);
- }
- }
- }
- return report;
- }
- inline bool parseTraceValue(
- c10::intrusive_ptr<Store>& store,
- const std::string& key,
- uint64_t& seq,
- std::string& col) {
- try {
- std::vector<uint8_t> traceValue = store->get(key);
- memcpy(&seq, traceValue.data(), sizeof(seq));
- std::string colName((char*)traceValue.data() + sizeof(seq));
- col = colName;
- return true;
- } catch (...) {
- LOG(ERROR) << "Store is down while getting key " << key;
- return false;
- }
- return true;
- }
- inline std::string retrieveDesyncReport(
- c10::intrusive_ptr<Store>& store,
- const std::string& pgName,
- int myRank,
- int worldSize) {
- std::string report;
- uint64_t thisSeq;
- std::string thisCol;
- std::vector<int> missingRanks;
- TraceMap traceMap;
- for (const auto rank : c10::irange(worldSize)) {
- // Build traceMapStart.
- uint64_t seqStart;
- {
- std::string traceKeyStart = getTraceStartKey(pgName, rank);
- if (!store->check({traceKeyStart})) {
- missingRanks.push_back(rank);
- continue;
- }
- std::string col;
- if (!parseTraceValue(store, traceKeyStart, seqStart, col)) {
- return report;
- }
- traceMap[seqStart].emplace(rank, std::make_pair(col, kEventStart));
- if (rank == myRank) {
- thisSeq = seqStart;
- thisCol = std::move(col);
- }
- }
- // Build traceMapEnd.
- {
- std::string traceKeyEnd = getTraceEndKey(pgName, rank);
- if (!store->check({traceKeyEnd})) {
- continue;
- }
- uint64_t seq;
- std::string col;
- if (!parseTraceValue(store, traceKeyEnd, seq, col)) {
- return report;
- }
- if (seq == seqStart) {
- traceMap[seq][rank].second = kEventEnd;
- }
- }
- }
- TORCH_INTERNAL_ASSERT(
- !missingRanks.empty() || !traceMap.empty(),
- "Trace shouldn't be empty while enabled GLOO_ASYNC_TIMEOUT_DEBUG");
- TORCH_INTERNAL_ASSERT(
- !thisCol.empty(),
- "Timeout rank [",
- myRank,
- "] must have collective tracking iteam in c10::Store trace");
- TORCH_INTERNAL_ASSERT(
- traceMap[thisSeq][myRank].second == kEventStart,
- "Timeout rank [",
- myRank,
- "] last trace item must be kEventStart. thisSeq = ",
- thisSeq,
- ", col = ",
- thisCol);
- report += c10::str(
- "\n\t - [", myRank, "] Timeout at collective: ", thisCol, ", #", thisSeq);
- if (!missingRanks.empty()) {
- report += analyzeMissingRanks(missingRanks);
- } else {
- report += analyzeLaggingRanks(traceMap);
- report += dumpSnapshot(traceMap);
- }
- return report;
- }
- } // namespace c10d
|