experimental.cc 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
#include "rocksdb/experimental.h"

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "db/db_impl/db_impl.h"
#include "db/manifest_ops.h"
#include "db/version_edit_handler.h"
#include "db/version_util.h"
#include "logging/logging.h"
#include "util/atomic.h"
  18. namespace ROCKSDB_NAMESPACE::experimental {
  19. Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family,
  20. const Slice* begin, const Slice* end) {
  21. if (db == nullptr) {
  22. return Status::InvalidArgument("DB is empty");
  23. }
  24. return db->SuggestCompactRange(column_family, begin, end);
  25. }
  26. Status PromoteL0(DB* db, ColumnFamilyHandle* column_family, int target_level) {
  27. if (db == nullptr) {
  28. return Status::InvalidArgument("Didn't recognize DB object");
  29. }
  30. return db->PromoteL0(column_family, target_level);
  31. }
  32. Status SuggestCompactRange(DB* db, const Slice* begin, const Slice* end) {
  33. return SuggestCompactRange(db, db->DefaultColumnFamily(), begin, end);
  34. }
  35. Status GetFileChecksumsFromCurrentManifest(FileSystem* fs,
  36. const std::string& dbname,
  37. FileChecksumList* checksum_list) {
  38. std::string manifest_path;
  39. uint64_t manifest_file_number;
  40. Status s = GetCurrentManifestPath(dbname, fs, true /* is_retry */,
  41. &manifest_path, &manifest_file_number);
  42. if (!s.ok()) {
  43. return s;
  44. }
  45. if (checksum_list == nullptr) {
  46. return Status::InvalidArgument("checksum_list is nullptr");
  47. }
  48. assert(checksum_list);
  49. const ReadOptions read_options(
  50. Env::IOActivity::kGetFileChecksumsFromCurrentManifest);
  51. checksum_list->reset();
  52. std::unique_ptr<SequentialFileReader> file_reader;
  53. {
  54. std::unique_ptr<FSSequentialFile> file;
  55. s = fs->NewSequentialFile(manifest_path,
  56. fs->OptimizeForManifestRead(FileOptions()), &file,
  57. nullptr /* dbg */);
  58. if (!s.ok()) {
  59. return s;
  60. }
  61. file_reader.reset(new SequentialFileReader(std::move(file), manifest_path));
  62. }
  63. struct LogReporter : public log::Reader::Reporter {
  64. Status* status_ptr;
  65. void Corruption(size_t /*bytes*/, const Status& st,
  66. uint64_t /*log_number*/ = kMaxSequenceNumber) override {
  67. if (status_ptr->ok()) {
  68. *status_ptr = st;
  69. }
  70. }
  71. } reporter;
  72. reporter.status_ptr = &s;
  73. log::Reader reader(nullptr, std::move(file_reader), &reporter,
  74. true /* checksum */, 0 /* log_number */);
  75. // Read all records from the manifest file...
  76. uint64_t manifest_file_size = std::numeric_limits<uint64_t>::max();
  77. FileChecksumRetriever retriever(read_options, manifest_file_size);
  78. retriever.Iterate(reader, &s);
  79. if (!retriever.status().ok()) {
  80. return retriever.status();
  81. }
  82. return retriever.FetchFileChecksumList(*checksum_list);
  83. }
// Offline (DB closed) repair: scans every live SST file of every initialized,
// non-dropped column family and rewrites the MANIFEST so that per-file state
// matches what is found on disk. Currently only the file temperature is
// reconciled, and only when opts.update_temperatures is set. Returns the
// first error encountered; each changed CF is committed via LogAndApply.
Status UpdateManifestForFilesState(
    const DBOptions& db_opts, const std::string& db_name,
    const std::vector<ColumnFamilyDescriptor>& column_families,
    const UpdateManifestForFilesStateOptions& opts) {
  // TODO: plumb Env::IOActivity, Env::IOPriority
  const ReadOptions read_options;
  const WriteOptions write_options;

  OfflineManifestWriter w(db_opts, db_name);
  Status s = w.Recover(column_families);

  size_t files_updated = 0;
  size_t cfs_updated = 0;
  auto fs = db_opts.env->GetFileSystem();

  for (auto cfd : *w.Versions().GetColumnFamilySet()) {
    // Stop scanning further CFs after the first failure (including a
    // failed Recover() above).
    if (!s.ok()) {
      break;
    }
    assert(cfd);

    if (cfd->IsDropped() || !cfd->initialized()) {
      continue;
    }

    const auto* current = cfd->current();
    assert(current);

    const auto* vstorage = current->storage_info();
    assert(vstorage);

    // Accumulate all fixes for this CF into a single VersionEdit.
    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());

    /* SST files */
    for (int level = 0; level < cfd->NumberLevels(); level++) {
      if (!s.ok()) {
        break;
      }
      const auto& level_files = vstorage->LevelFiles(level);

      for (const auto& lf : level_files) {
        assert(lf);

        uint64_t number = lf->fd.GetNumber();
        std::string fname =
            TableFileName(w.IOptions().db_paths, number, lf->fd.GetPathId());

        std::unique_ptr<FSSequentialFile> f;
        FileOptions fopts;
        // Use kUnknown to signal the FileSystem to search all tiers for the
        // file.
        fopts.temperature = Temperature::kUnknown;

        IOStatus file_ios =
            fs->NewSequentialFile(fname, fopts, &f, /*dbg*/ nullptr);
        if (file_ios.ok()) {
          if (opts.update_temperatures) {
            Temperature temp = f->GetTemperature();
            if (temp != Temperature::kUnknown && temp != lf->temperature) {
              // Current state inconsistent with manifest
              ++files_updated;
              // Replace the file's manifest entry: delete + re-add with the
              // observed temperature, all other metadata carried over.
              edit.DeleteFile(level, number);
              edit.AddFile(
                  level, number, lf->fd.GetPathId(), lf->fd.GetFileSize(),
                  lf->smallest, lf->largest, lf->fd.smallest_seqno,
                  lf->fd.largest_seqno, lf->marked_for_compaction, temp,
                  lf->oldest_blob_file_number, lf->oldest_ancester_time,
                  lf->file_creation_time, lf->epoch_number, lf->file_checksum,
                  lf->file_checksum_func_name, lf->unique_id,
                  lf->compensated_range_deletion_size, lf->tail_size,
                  lf->user_defined_timestamps_persisted);
            }
          }
        } else {
          // Live file should be openable; surface the failure.
          s = file_ios;
          break;
        }
      }
    }

    if (s.ok() && edit.NumEntries() > 0) {
      std::unique_ptr<FSDirectory> db_dir;
      s = fs->NewDirectory(db_name, IOOptions(), &db_dir, nullptr);
      if (s.ok()) {
        s = w.LogAndApply(read_options, write_options, cfd, &edit,
                          db_dir.get());
      }
      if (s.ok()) {
        ++cfs_updated;
      }
    }
  }

  if (cfs_updated > 0) {
    ROCKS_LOG_INFO(db_opts.info_log,
                   "UpdateManifestForFilesState: updated %zu files in %zu CFs",
                   files_updated, cfs_updated);
  } else if (s.ok()) {
    ROCKS_LOG_INFO(db_opts.info_log,
                   "UpdateManifestForFilesState: no updates needed");
  }
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_opts.info_log, "UpdateManifestForFilesState failed: %s",
                    s.ToString().c_str());
  }

  return s;
}
  178. // EXPERIMENTAL new filtering features
  179. namespace {
// Extractor producing N key segments of fixed byte widths known at compile
// time. Each segment end is "capped" by the key length: when the key is
// shorter than a segment's ideal end, that end is clamped to the key size.
template <size_t N>
class SemiStaticCappedKeySegmentsExtractor : public KeySegmentsExtractor {
 public:
  SemiStaticCappedKeySegmentsExtractor(const uint32_t* byte_widths) {
    id_ = kName();
    uint32_t prev_end = 0;
    if constexpr (N > 0) {  // Suppress a compiler warning
      for (size_t i = 0; i < N; ++i) {
        // Ideal end of segment i = cumulative width so far.
        prev_end = prev_end + byte_widths[i];
        ideal_ends_[i] = prev_end;
        // Id also encodes the widths, e.g. "CappedKeySegmentsExtractor4b8b".
        id_ += std::to_string(byte_widths[i]) + "b";
      }
    }
  }

  static const char* kName() { return "CappedKeySegmentsExtractor"; }

  const char* Name() const override { return kName(); }

  std::string GetId() const override { return id_; }

  void Extract(const Slice& key_or_bound, KeyKind /*kind*/,
               Result* result) const override {
    // Optimistic assignment
    result->segment_ends.assign(ideal_ends_.begin(), ideal_ends_.end());
    if constexpr (N > 0) {  // Suppress a compiler warning
      uint32_t key_size = static_cast<uint32_t>(key_or_bound.size());
      if (key_size < ideal_ends_.back()) {
        // Need to fix up (should be rare)
        for (size_t i = 0; i < N; ++i) {
          result->segment_ends[i] = std::min(key_size, result->segment_ends[i]);
        }
      }
    }
  }

 private:
  // Cumulative ideal segment end offsets (uncapped).
  std::array<uint32_t, N> ideal_ends_;
  std::string id_;
};
  215. class DynamicCappedKeySegmentsExtractor : public KeySegmentsExtractor {
  216. public:
  217. DynamicCappedKeySegmentsExtractor(const std::vector<uint32_t>& byte_widths) {
  218. id_ = kName();
  219. uint32_t prev_end = 0;
  220. for (size_t i = 0; i < byte_widths.size(); ++i) {
  221. prev_end = prev_end + byte_widths[i];
  222. ideal_ends_[i] = prev_end;
  223. id_ += std::to_string(byte_widths[i]) + "b";
  224. }
  225. final_ideal_end_ = prev_end;
  226. }
  227. static const char* kName() { return "CappedKeySegmentsExtractor"; }
  228. const char* Name() const override { return kName(); }
  229. std::string GetId() const override { return id_; }
  230. void Extract(const Slice& key_or_bound, KeyKind /*kind*/,
  231. Result* result) const override {
  232. // Optimistic assignment
  233. result->segment_ends = ideal_ends_;
  234. uint32_t key_size = static_cast<uint32_t>(key_or_bound.size());
  235. if (key_size < final_ideal_end_) {
  236. // Need to fix up (should be rare)
  237. for (size_t i = 0; i < ideal_ends_.size(); ++i) {
  238. result->segment_ends[i] = std::min(key_size, result->segment_ends[i]);
  239. }
  240. }
  241. }
  242. private:
  243. std::vector<uint32_t> ideal_ends_;
  244. uint32_t final_ideal_end_;
  245. std::string id_;
  246. };
// Resolves a FilterInput selector against `key` and its extracted segments.
// On return, *out_input is the selected byte range of `key` (possibly empty)
// and *out_leadup is the prefix of `key` leading up to that range — or the
// whole key when the selection is empty or does not point into `key`.
void GetFilterInput(FilterInput select, const Slice& key,
                    const KeySegmentsExtractor::Result& extracted,
                    Slice* out_input, Slice* out_leadup) {
  // Visitor mapping each selector variant to a sub-slice of `key`.
  struct FilterInputGetter {
    explicit FilterInputGetter(const Slice& _key,
                               const KeySegmentsExtractor::Result& _extracted)
        : key(_key), extracted(_extracted) {}
    const Slice& key;
    const KeySegmentsExtractor::Result& extracted;

    Slice operator()(SelectKeySegment select) {
      size_t count = extracted.segment_ends.size();
      if (count <= select.segment_index) {
        // Requested segment not present in this key
        return Slice();
      }
      assert(count > 0);
      // Segment i spans [segment_ends[i-1], segment_ends[i]) (0 for i == 0).
      size_t start = select.segment_index > 0
                         ? extracted.segment_ends[select.segment_index - 1]
                         : 0;
      size_t end =
          extracted
              .segment_ends[std::min(size_t{select.segment_index}, count - 1)];
      return Slice(key.data() + start, end - start);
    }

    Slice operator()(SelectKeySegmentRange select) {
      assert(select.from_segment_index <= select.to_segment_index);
      size_t count = extracted.segment_ends.size();
      if (count <= select.from_segment_index) {
        // Range starts past the available segments
        return Slice();
      }
      assert(count > 0);
      size_t start = select.from_segment_index > 0
                         ? extracted.segment_ends[select.from_segment_index - 1]
                         : 0;
      // Clamp the range's end to the last available segment.
      size_t end = extracted.segment_ends[std::min(
          size_t{select.to_segment_index}, count - 1)];
      return Slice(key.data() + start, end - start);
    }

    Slice operator()(SelectWholeKey) { return key; }

    Slice operator()(SelectLegacyKeyPrefix) {
      // TODO
      assert(false);
      return Slice();
    }

    Slice operator()(SelectUserTimestamp) {
      // TODO
      assert(false);
      return Slice();
    }

    Slice operator()(SelectColumnName) {
      // TODO
      assert(false);
      return Slice();
    }
  };

  Slice input = std::visit(FilterInputGetter(key, extracted), select);
  *out_input = input;

  // When the selection is empty or is not a sub-slice of `key`, there is no
  // meaningful lead-up; fall back to the whole key.
  if (input.empty() || input.data() < key.data() ||
      input.data() > key.data() + key.size()) {
    *out_leadup = key;
  } else {
    *out_leadup = Slice(key.data(), input.data() - key.data());
  }
}
  310. const char* DeserializeFilterInput(const char* p, const char* limit,
  311. FilterInput* out) {
  312. if (p >= limit) {
  313. return nullptr;
  314. }
  315. uint8_t b = static_cast<uint8_t>(*p++);
  316. if (b & 0x80) {
  317. // Reserved for future use to read more bytes
  318. return nullptr;
  319. }
  320. switch (b >> 4) {
  321. case 0:
  322. // Various cases that don't have an argument
  323. switch (b) {
  324. case 0:
  325. *out = SelectWholeKey{};
  326. return p;
  327. case 1:
  328. *out = SelectLegacyKeyPrefix{};
  329. return p;
  330. case 2:
  331. *out = SelectUserTimestamp{};
  332. return p;
  333. case 3:
  334. *out = SelectColumnName{};
  335. return p;
  336. default:
  337. // Reserved for future use
  338. return nullptr;
  339. }
  340. case 1:
  341. // First 16 cases of SelectKeySegment
  342. *out = SelectKeySegment{BitwiseAnd(b, 0xf)};
  343. return p;
  344. case 2:
  345. // First 16 cases of SelectKeySegmentRange
  346. // that are not a single key segment
  347. // 0: 0-1
  348. // 1: 0-2
  349. // 2: 1-2
  350. // 3: 0-3
  351. // 4: 1-3
  352. // 5: 2-3
  353. // 6: 0-4
  354. // 7: 1-4
  355. // 8: 2-4
  356. // 9: 3-4
  357. // 10: 0-5
  358. // 11: 1-5
  359. // 12: 2-5
  360. // 13: 3-5
  361. // 14: 4-5
  362. // 15: 0-6
  363. if (b < 6) {
  364. if (b >= 3) {
  365. *out = SelectKeySegmentRange{static_cast<uint8_t>(b - 3), 3};
  366. } else if (b >= 1) {
  367. *out = SelectKeySegmentRange{static_cast<uint8_t>(b - 1), 2};
  368. } else {
  369. *out = SelectKeySegmentRange{0, 1};
  370. }
  371. } else if (b < 10) {
  372. *out = SelectKeySegmentRange{static_cast<uint8_t>(b - 6), 4};
  373. } else if (b < 15) {
  374. *out = SelectKeySegmentRange{static_cast<uint8_t>(b - 10), 5};
  375. } else {
  376. *out = SelectKeySegmentRange{0, 6};
  377. }
  378. return p;
  379. default:
  380. // Reserved for future use
  381. return nullptr;
  382. }
  383. }
// Appends the 1-byte encoding of `select` to *out.
// Inverse of DeserializeFilterInput; see that function for the layout.
void SerializeFilterInput(std::string* out, const FilterInput& select) {
  struct FilterInputSerializer {
    std::string* out;
    void operator()(SelectWholeKey) { out->push_back(0); }
    void operator()(SelectLegacyKeyPrefix) { out->push_back(1); }
    void operator()(SelectUserTimestamp) { out->push_back(2); }
    void operator()(SelectColumnName) { out->push_back(3); }
    void operator()(SelectKeySegment select) {
      // TODO: expand supported cases
      assert(select.segment_index < 16);
      // High nibble 1 = single segment; low nibble = segment index.
      out->push_back(static_cast<char>((1 << 4) | select.segment_index));
    }
    void operator()(SelectKeySegmentRange select) {
      auto from = select.from_segment_index;
      auto to = select.to_segment_index;
      // TODO: expand supported cases
      assert(from < 6);
      assert(to < 6 || (to == 6 && from == 0));
      assert(from < to);
      // Triangular-number packing: ranges ending at `to` begin at code
      // (to - 1) * to / 2 and are then ordered by `from`.
      int start = (to - 1) * to / 2;
      assert(start + from < 16);
      // High nibble 2 = segment range; low nibble = packed range code.
      out->push_back(static_cast<char>((2 << 4) | (start + from)));
    }
  };
  std::visit(FilterInputSerializer{out}, select);
}
  410. size_t GetFilterInputSerializedLength(const FilterInput& /*select*/) {
  411. // TODO: expand supported cases
  412. return 1;
  413. }
  414. uint64_t CategorySetToUint(const KeySegmentsExtractor::KeyCategorySet& s) {
  415. static_assert(sizeof(KeySegmentsExtractor::KeyCategorySet) ==
  416. sizeof(uint64_t));
  417. return *reinterpret_cast<const uint64_t*>(&s);
  418. }
  419. KeySegmentsExtractor::KeyCategorySet UintToCategorySet(uint64_t s) {
  420. static_assert(sizeof(KeySegmentsExtractor::KeyCategorySet) ==
  421. sizeof(uint64_t));
  422. return *reinterpret_cast<const KeySegmentsExtractor::KeyCategorySet*>(&s);
  423. }
// Leading byte identifying the schema of each serialized SST query filter.
enum BuiltinSstQueryFilters : char {
  // Wraps a set of filters such that they use a particular
  // KeySegmentsExtractor and a set of categories covering all keys seen.
  // TODO: unit test category covering filtering
  kExtrAndCatFilterWrapper = 0x1,

  // Wraps a set of filters to limit their scope to a particular set of
  // categories. (Unlike kExtrAndCatFilterWrapper,
  // keys in other categories may have been seen so are not filtered here.)
  // TODO: unit test more subtleties
  kCategoryScopeFilterWrapper = 0x2,

  // ... (reserve some values for more wrappers)

  // A filter representing the bytewise min and max values of a numbered
  // segment or composite (range of segments). The empty value is tracked
  // and filtered independently because it might be a special case that is
  // not representative of the minimum in a spread of values.
  kBytewiseMinMaxFilter = 0x10,
  kRevBytewiseMinMaxFilter = 0x11,
};
// Interface for accumulating keys into one serialized SST query filter.
class SstQueryFilterBuilder {
 public:
  virtual ~SstQueryFilterBuilder() = default;
  // Adds a key (with its extracted segments) to the filter. `prev_key` /
  // `prev_extracted` are the previously added key when available, e.g. for
  // opportunistic invariant checking.
  virtual void Add(const Slice& key,
                   const KeySegmentsExtractor::Result& extracted,
                   const Slice* prev_key,
                   const KeySegmentsExtractor::Result* prev_extracted) = 0;
  // Any error detected while adding keys (e.g. ordering violation).
  virtual Status GetStatus() const = 0;
  // Exact number of bytes Finish() will append; 0 means "no filter".
  virtual size_t GetEncodedLength() const = 0;
  // Appends the serialized filter to `append_to`.
  virtual void Finish(std::string& append_to) = 0;
};
// Base class for filter configurations: pairs a filter input selector with
// the set of key categories the filter applies to, and acts as a factory
// for per-SST-file builders.
class SstQueryFilterConfigImpl : public SstQueryFilterConfig {
 public:
  explicit SstQueryFilterConfigImpl(
      const FilterInput& input,
      const KeySegmentsExtractor::KeyCategorySet& categories)
      : input_(input), categories_(categories) {}

  virtual ~SstQueryFilterConfigImpl() = default;

  // Creates a builder for one SST file's filter. `sanity_checks` enables
  // opportunistic invariant checking while keys are added.
  virtual std::unique_ptr<SstQueryFilterBuilder> NewBuilder(
      bool sanity_checks) const = 0;

 protected:
  FilterInput input_;
  KeySegmentsExtractor::KeyCategorySet categories_;
};
// Builder wrapper restricting a wrapped filter to keys whose extracted
// category is in a given set; keys in other categories are skipped.
// Serializes as a kCategoryScopeFilterWrapper around the wrapped filter.
class CategoryScopeFilterWrapperBuilder : public SstQueryFilterBuilder {
 public:
  explicit CategoryScopeFilterWrapperBuilder(
      KeySegmentsExtractor::KeyCategorySet categories,
      std::unique_ptr<SstQueryFilterBuilder> wrapped)
      : categories_(categories), wrapped_(std::move(wrapped)) {}

  void Add(const Slice& key, const KeySegmentsExtractor::Result& extracted,
           const Slice* prev_key,
           const KeySegmentsExtractor::Result* prev_extracted) override {
    if (!categories_.Contains(extracted.category)) {
      // Category not in scope of the constituent filters
      return;
    }
    wrapped_->Add(key, extracted, prev_key, prev_extracted);
  }

  Status GetStatus() const override { return wrapped_->GetStatus(); }

  size_t GetEncodedLength() const override {
    size_t wrapped_length = wrapped_->GetEncodedLength();
    if (wrapped_length == 0) {
      // Use empty filter
      // FIXME: needs unit test
      return 0;
    } else {
      // For now in the code, wraps only 1 filter, but schema supports multiple
      // Layout: wrapper byte + varint category set + varint filter count +
      // the wrapped filter itself.
      return 1 + VarintLength(CategorySetToUint(categories_)) +
             VarintLength(1) + wrapped_length;
    }
  }

  void Finish(std::string& append_to) override {
    size_t encoded_length = GetEncodedLength();
    if (encoded_length == 0) {
      // Nothing to do
      return;
    }
    size_t old_append_to_size = append_to.size();
    append_to.reserve(old_append_to_size + encoded_length);
    append_to.push_back(kCategoryScopeFilterWrapper);
    PutVarint64(&append_to, CategorySetToUint(categories_));
    // Wrapping just 1 filter for now
    PutVarint64(&append_to, 1);
    wrapped_->Finish(append_to);
  }

 private:
  KeySegmentsExtractor::KeyCategorySet categories_;
  std::unique_ptr<SstQueryFilterBuilder> wrapped_;
};
// Filter config tracking the bytewise minimum and maximum of one filter
// input (a key segment or range of segments). Serialized layout, after the
// leading kBytewiseMinMaxFilter / kRevBytewiseMinMaxFilter byte:
//   [flags byte] [FilterInput] [varint32 len(min)] [min bytes] [max bytes]
// where "min"/"max" swap roles for the reverse variant and "max" runs to
// the end of the filter.
class BytewiseMinMaxSstQueryFilterConfig : public SstQueryFilterConfigImpl {
 public:
  explicit BytewiseMinMaxSstQueryFilterConfig(
      const FilterInput& input,
      const KeySegmentsExtractor::KeyCategorySet& categories, bool reverse)
      : SstQueryFilterConfigImpl(input, categories), reverse_(reverse) {}

  std::unique_ptr<SstQueryFilterBuilder> NewBuilder(
      bool sanity_checks) const override {
    auto b = std::make_unique<MyBuilder>(*this, sanity_checks);
    if (categories_ != KeySegmentsExtractor::KeyCategorySet::All()) {
      // Limit the filter's scope to the configured categories.
      return std::make_unique<CategoryScopeFilterWrapperBuilder>(categories_,
                                                                 std::move(b));
    } else {
      return b;
    }
  }

  // Returns false only when the serialized `filter` proves no key between
  // the bounds can be present. Returns true ("may match") for any corrupt
  // or unsupported filter, or whenever filtering is not possible.
  static bool RangeMayMatch(
      const Slice& filter, const Slice& lower_bound_incl,
      const KeySegmentsExtractor::Result& lower_bound_extracted,
      const Slice& upper_bound_excl,
      const KeySegmentsExtractor::Result& upper_bound_extracted) {
    assert(!filter.empty() && (filter[0] == kBytewiseMinMaxFilter ||
                               filter[0] == kRevBytewiseMinMaxFilter));
    if (filter.size() <= 4) {
      // Missing some data
      return true;
    }
    bool reverse = (filter[0] == kRevBytewiseMinMaxFilter);
    bool empty_included = (filter[1] & kEmptySeenFlag) != 0;
    const char* p = filter.data() + 2;
    const char* limit = filter.data() + filter.size();

    FilterInput in;
    p = DeserializeFilterInput(p, limit, &in);
    if (p == nullptr) {
      // Corrupt or unsupported
      return true;
    }

    uint32_t smallest_size;
    p = GetVarint32Ptr(p, limit, &smallest_size);
    if (p == nullptr || static_cast<size_t>(limit - p) <= smallest_size) {
      // Corrupt
      return true;
    }
    Slice smallest = Slice(p, smallest_size);
    p += smallest_size;

    // The largest value occupies the remainder of the filter.
    size_t largest_size = static_cast<size_t>(limit - p);
    Slice largest = Slice(p, largest_size);

    Slice lower_bound_input, lower_bound_leadup;
    Slice upper_bound_input, upper_bound_leadup;
    GetFilterInput(in, lower_bound_incl, lower_bound_extracted,
                   &lower_bound_input, &lower_bound_leadup);
    GetFilterInput(in, upper_bound_excl, upper_bound_extracted,
                   &upper_bound_input, &upper_bound_leadup);

    if (lower_bound_leadup.compare(upper_bound_leadup) != 0) {
      // Unable to filter range when bounds have different lead-up to key
      // segment
      return true;
    }

    if (empty_included && lower_bound_input.empty()) {
      // May match on 0-length segment
      return true;
    }
    // TODO: potentially fix upper bound to actually be exclusive, but it's not
    // as simple as changing >= to > below, because it's upper_bound_excl that's
    // exclusive, and the upper_bound_input part extracted from it might not be.

    // May match if both the upper bound and lower bound indicate there could
    // be overlap
    if (reverse) {
      return upper_bound_input.compare(smallest) <= 0 &&
             lower_bound_input.compare(largest) >= 0;
    } else {
      return upper_bound_input.compare(smallest) >= 0 &&
             lower_bound_input.compare(largest) <= 0;
    }
  }

 protected:
  struct MyBuilder : public SstQueryFilterBuilder {
    MyBuilder(const BytewiseMinMaxSstQueryFilterConfig& _parent,
              bool _sanity_checks)
        : parent(_parent), sanity_checks(_sanity_checks) {}

    void Add(const Slice& key, const KeySegmentsExtractor::Result& extracted,
             const Slice* prev_key,
             const KeySegmentsExtractor::Result* prev_extracted) override {
      Slice input, leadup;
      GetFilterInput(parent.input_, key, extracted, &input, &leadup);

      if (sanity_checks && prev_key && prev_extracted) {
        // Opportunistic checking of segment ordering invariant
        Slice prev_input, prev_leadup;
        GetFilterInput(parent.input_, *prev_key, *prev_extracted, &prev_input,
                       &prev_leadup);

        int compare = prev_leadup.compare(leadup);
        if (compare == 0) {
          // On the same prefix leading up to the segment, the segments must
          // not be out of order.
          compare = prev_input.compare(input);
          if (parent.reverse_ ? compare < 0 : compare > 0) {
            status = Status::Corruption(
                "Ordering invariant violated from 0x" +
                prev_key->ToString(/*hex=*/true) + " with segment 0x" +
                prev_input.ToString(/*hex=*/true) + " to 0x" +
                key.ToString(/*hex=*/true) + " with segment 0x" +
                input.ToString(/*hex=*/true));
            return;
          }
        }
        // NOTE: it is not strictly required that the leadup be ordered, just
        // satisfy the "common segment prefix property" which would be
        // expensive to check
      }

      // Now actually update state for the filter inputs
      // TODO: shorten largest and smallest if appropriate
      if (input.empty()) {
        empty_seen = true;
      } else if (largest.empty()) {
        // Step for first non-empty input
        smallest = largest = input.ToString();
      } else if (input.compare(largest) > 0) {
        largest = input.ToString();
      } else if (input.compare(smallest) < 0) {
        smallest = input.ToString();
      }
    }

    Status GetStatus() const override { return status; }

    size_t GetEncodedLength() const override {
      if (largest.empty()) {
        // Not an interesting filter -> 0 to indicate no filter
        // FIXME: needs unit test
        return 0;
      }
      // Type byte + flags byte + input selector + varint length of the
      // serialized-first value + both values.
      return 2 + GetFilterInputSerializedLength(parent.input_) +
             VarintLength(parent.reverse_ ? largest.size() : smallest.size()) +
             smallest.size() + largest.size();
    }

    void Finish(std::string& append_to) override {
      assert(status.ok());
      size_t encoded_length = GetEncodedLength();
      if (encoded_length == 0) {
        // Nothing to do
        return;
      }
      size_t old_append_to_size = append_to.size();
      append_to.reserve(old_append_to_size + encoded_length);
      append_to.push_back(parent.reverse_ ? kRevBytewiseMinMaxFilter
                                          : kBytewiseMinMaxFilter);

      append_to.push_back(empty_seen ? kEmptySeenFlag : 0);

      SerializeFilterInput(&append_to, parent.input_);

      auto& minv = parent.reverse_ ? largest : smallest;
      auto& maxv = parent.reverse_ ? smallest : largest;
      PutVarint32(&append_to, static_cast<uint32_t>(minv.size()));
      append_to.append(minv);
      // The end of `maxv` is given by the end of the filter
      append_to.append(maxv);
      assert(append_to.size() == old_append_to_size + encoded_length);
    }

    const BytewiseMinMaxSstQueryFilterConfig& parent;
    const bool sanity_checks;
    // Smallest and largest segment seen, excluding the empty segment which
    // is tracked separately. "Reverse" from parent is only applied at
    // serialization time, for efficiency.
    std::string smallest;
    std::string largest;
    bool empty_seen = false;
    // Only for sanity checks
    Status status;
  };

  bool reverse_;

 private:
  // Flags-byte bit: an empty segment value was seen during building.
  static constexpr char kEmptySeenFlag = 0x1;
};
// Default-constructed (empty) configs instance returned when a lookup finds
// no matching configuration.
const SstQueryFilterConfigs kEmptyNotFoundSQFC{};
  682. class SstQueryFilterConfigsManagerImpl : public SstQueryFilterConfigsManager {
  683. public:
  684. using ConfigVersionMap = std::map<FilteringVersion, SstQueryFilterConfigs>;
  // Validates and ingests `data`, a map from filtering version to named
  // filter configurations. Rules enforced here:
  //  * version 0 is reserved for the empty configuration and may not appear
  //  * versions must be contiguous (each exactly prev + 1)
  //  * within one version, configuration names must be unique
  // On success, fills name_map_ (name -> per-version configs),
  // extractor_map_ (extractor id -> extractor), and [min_ver_, max_ver_].
  // Intended to be called at most once, on a freshly constructed manager.
  Status Populate(const Data& data) {
    if (data.empty()) {
      return Status::OK();
    }
    // Populate only once
    assert(min_ver_ == 0 && max_ver_ == 0);
    min_ver_ = max_ver_ = data.begin()->first;
    FilteringVersion prev_ver = 0;
    bool first_entry = true;
    for (const auto& ver_info : data) {
      if (ver_info.first == 0) {
        return Status::InvalidArgument(
            "Filtering version 0 is reserved for empty configuration and may "
            "not be overridden");
      }
      if (first_entry) {
        min_ver_ = ver_info.first;
        first_entry = false;
      } else if (ver_info.first != prev_ver + 1) {
        return Status::InvalidArgument(
            "Filtering versions must increase by 1 without repeating: " +
            std::to_string(prev_ver) + " -> " + std::to_string(ver_info.first));
      }
      max_ver_ = ver_info.first;
      // Detect duplicate config names within this version.
      UnorderedSet<std::string> names_seen_this_ver;
      for (const auto& config : ver_info.second) {
        if (!names_seen_this_ver.insert(config.first).second) {
          return Status::InvalidArgument(
              "Duplicate name in filtering version " +
              std::to_string(ver_info.first) + ": " + config.first);
        }
        auto& ver_map = name_map_[config.first];
        ver_map[ver_info.first] = config.second;
        if (config.second.extractor) {
          // Index the extractor by its id for lookup at read time.
          extractor_map_[config.second.extractor->GetId()] =
              config.second.extractor;
        }
      }
      prev_ver = ver_info.first;
    }
    return Status::OK();
  }
  727. struct MyCollector : public TablePropertiesCollector {
  728. // Keeps a reference to `configs` which should be kept alive by
  729. // SstQueryFilterConfigsManagerImpl, which should be kept alive by
  730. // any factories
  731. // TODO: sanity_checks option
  732. explicit MyCollector(const SstQueryFilterConfigs& configs,
  733. const SstQueryFilterConfigsManagerImpl& _parent)
  734. : parent(_parent),
  735. extractor(configs.extractor.get()),
  736. sanity_checks(true) {
  737. for (const auto& c : configs.filters) {
  738. builders.push_back(
  739. static_cast<SstQueryFilterConfigImpl&>(*c).NewBuilder(
  740. sanity_checks));
  741. }
  742. }
  743. Status AddUserKey(const Slice& key, const Slice& /*value*/,
  744. EntryType /*type*/, SequenceNumber /*seq*/,
  745. uint64_t /*file_size*/) override {
  746. // FIXME later: `key` might contain user timestamp. That should be
  747. // exposed properly in a future update to TablePropertiesCollector
  748. extracted.Reset();
  749. if (extractor) {
  750. extractor->Extract(key, KeySegmentsExtractor::kFullUserKey, &extracted);
  751. if (UNLIKELY(extracted.category >=
  752. KeySegmentsExtractor::kMinErrorCategory)) {
  753. // TODO: proper failure scopes
  754. Status s = Status::Corruption(
  755. "Extractor returned error category from key 0x" +
  756. Slice(key).ToString(/*hex=*/true));
  757. overall_status.UpdateIfOk(s);
  758. return s;
  759. }
  760. assert(extracted.category <= KeySegmentsExtractor::kMaxUsableCategory);
  761. bool new_category = categories_seen.Add(extracted.category);
  762. if (sanity_checks) {
  763. // Opportunistic checking of category ordering invariant
  764. if (!first_key) {
  765. if (prev_extracted.category != extracted.category &&
  766. !new_category) {
  767. Status s = Status::Corruption(
  768. "Category ordering invariant violated from key 0x" +
  769. Slice(prev_key).ToString(/*hex=*/true) + " to 0x" +
  770. key.ToString(/*hex=*/true));
  771. overall_status.UpdateIfOk(s);
  772. return s;
  773. }
  774. }
  775. }
  776. }
  777. for (const auto& b : builders) {
  778. if (first_key) {
  779. b->Add(key, extracted, nullptr, nullptr);
  780. } else {
  781. Slice prev_key_slice = Slice(prev_key);
  782. b->Add(key, extracted, &prev_key_slice, &prev_extracted);
  783. }
  784. }
  785. prev_key.assign(key.data(), key.size());
  786. std::swap(prev_extracted, extracted);
  787. first_key = false;
  788. return Status::OK();
  789. }
  790. Status Finish(UserCollectedProperties* properties) override {
  791. assert(properties != nullptr);
  792. if (!overall_status.ok()) {
  793. return overall_status;
  794. }
  795. size_t total_size = 1;
  796. autovector<std::pair<SstQueryFilterBuilder&, size_t>> filters_to_finish;
  797. // Need to determine number of filters before serializing them. Might
  798. // as well determine full length also.
  799. for (const auto& b : builders) {
  800. Status s = b->GetStatus();
  801. if (s.ok()) {
  802. size_t len = b->GetEncodedLength();
  803. if (len > 0) {
  804. total_size += VarintLength(len) + len;
  805. filters_to_finish.emplace_back(*b, len);
  806. }
  807. } else {
  808. // FIXME: no way to report partial failure without getting
  809. // remaining filters thrown out
  810. }
  811. }
  812. total_size += VarintLength(filters_to_finish.size());
  813. if (filters_to_finish.empty()) {
  814. // No filters to add
  815. return Status::OK();
  816. }
  817. // Length of the last filter is omitted
  818. total_size -= VarintLength(filters_to_finish.back().second);
  819. // Need to determine size of
  820. // kExtrAndCatFilterWrapper if used
  821. std::string extractor_id;
  822. if (extractor) {
  823. extractor_id = extractor->GetId();
  824. // identifier byte
  825. total_size += 1;
  826. // fields of the wrapper
  827. total_size += VarintLength(extractor_id.size()) + extractor_id.size() +
  828. VarintLength(CategorySetToUint(categories_seen));
  829. // outer layer will have just 1 filter in its count (added here)
  830. // and this filter wrapper will have filters_to_finish.size()
  831. // (added above).
  832. total_size += VarintLength(1);
  833. }
  834. std::string filters;
  835. filters.reserve(total_size);
  836. // Leave room for drastic changes in the future.
  837. filters.push_back(kSchemaVersion);
  838. if (extractor) {
  839. // Wrap everything in a kExtrAndCatFilterWrapper
  840. // TODO in future: put whole key filters outside of this wrapper.
  841. // Also TODO in future: order the filters starting with broadest
  842. // applicability.
  843. // Just one top-level filter (wrapper). Because it's last, we don't
  844. // need to encode its length.
  845. PutVarint64(&filters, 1);
  846. // The filter(s) wrapper itself
  847. filters.push_back(kExtrAndCatFilterWrapper);
  848. PutVarint64(&filters, extractor_id.size());
  849. filters += extractor_id;
  850. PutVarint64(&filters, CategorySetToUint(categories_seen));
  851. }
  852. PutVarint64(&filters, filters_to_finish.size());
  853. for (const auto& e : filters_to_finish) {
  854. // Encode filter length, except last filter
  855. if (&e != &filters_to_finish.back()) {
  856. PutVarint64(&filters, e.second);
  857. }
  858. // Encode filter
  859. e.first.Finish(filters);
  860. }
  861. if (filters.size() != total_size) {
  862. assert(false);
  863. return Status::Corruption(
  864. "Internal inconsistency building SST query filters");
  865. }
  866. (*properties)[kTablePropertyName] = std::move(filters);
  867. return Status::OK();
  868. }
  869. UserCollectedProperties GetReadableProperties() const override {
  870. // TODO?
  871. return {};
  872. }
  873. const char* Name() const override {
  874. // placeholder
  875. return "SstQueryFilterConfigsImpl::MyCollector";
  876. }
  877. Status overall_status;
  878. const SstQueryFilterConfigsManagerImpl& parent;
  879. const KeySegmentsExtractor* const extractor;
  880. const bool sanity_checks;
  881. std::vector<std::shared_ptr<SstQueryFilterBuilder>> builders;
  882. bool first_key = true;
  883. std::string prev_key;
  884. KeySegmentsExtractor::Result extracted;
  885. KeySegmentsExtractor::Result prev_extracted;
  886. KeySegmentsExtractor::KeyCategorySet categories_seen;
  887. };
  888. struct RangeQueryFilterReader {
  889. Slice lower_bound_incl;
  890. Slice upper_bound_excl;
  891. const KeySegmentsExtractor* extractor;
  892. const UnorderedMap<std::string,
  893. std::shared_ptr<const KeySegmentsExtractor>>&
  894. extractor_map;
  895. struct State {
  896. KeySegmentsExtractor::Result lb_extracted;
  897. KeySegmentsExtractor::Result ub_extracted;
  898. };
  899. bool MayMatch_CategoryScopeFilterWrapper(Slice wrapper,
  900. State& state) const {
  901. assert(!wrapper.empty() && wrapper[0] == kCategoryScopeFilterWrapper);
  902. // Regardless of the filter values (which we assume is not all
  903. // categories; that should skip the wrapper), we need upper bound and
  904. // lower bound to be in the same category to do any range filtering.
  905. // (There could be another category in range between the bounds.)
  906. if (state.lb_extracted.category != state.ub_extracted.category) {
  907. // Can't filter between categories
  908. return true;
  909. }
  910. const char* p = wrapper.data() + 1;
  911. const char* limit = wrapper.data() + wrapper.size();
  912. uint64_t cats_raw;
  913. p = GetVarint64Ptr(p, limit, &cats_raw);
  914. if (p == nullptr) {
  915. // Missing categories
  916. return true;
  917. }
  918. KeySegmentsExtractor::KeyCategorySet categories =
  919. UintToCategorySet(cats_raw);
  920. // Check category against those in scope
  921. if (!categories.Contains(state.lb_extracted.category)) {
  922. // Can't filter this category
  923. return true;
  924. }
  925. // Process the wrapped filters
  926. return MayMatch(Slice(p, limit - p), &state);
  927. }
  928. bool MayMatch_ExtrAndCatFilterWrapper(Slice wrapper) const {
  929. assert(!wrapper.empty() && wrapper[0] == kExtrAndCatFilterWrapper);
  930. if (wrapper.size() <= 4) {
  931. // Missing some data
  932. // (1 byte marker, >= 1 byte name length, >= 1 byte name, >= 1 byte
  933. // categories, ...)
  934. return true;
  935. }
  936. const char* p = wrapper.data() + 1;
  937. const char* limit = wrapper.data() + wrapper.size();
  938. uint64_t name_len;
  939. p = GetVarint64Ptr(p, limit, &name_len);
  940. if (p == nullptr || name_len == 0 ||
  941. static_cast<size_t>(limit - p) < name_len) {
  942. // Missing some data
  943. return true;
  944. }
  945. Slice name(p, name_len);
  946. p += name_len;
  947. const KeySegmentsExtractor* ex = nullptr;
  948. if (extractor && name == Slice(extractor->GetId())) {
  949. ex = extractor;
  950. } else {
  951. auto it = extractor_map.find(name.ToString());
  952. if (it != extractor_map.end()) {
  953. ex = it->second.get();
  954. } else {
  955. // Extractor mismatch / not found
  956. // TODO future: try to get the extractor from the ObjectRegistry
  957. return true;
  958. }
  959. }
  960. // TODO future: cache extraction?
  961. // Ready to run extractor
  962. assert(ex);
  963. State state;
  964. ex->Extract(lower_bound_incl, KeySegmentsExtractor::kInclusiveLowerBound,
  965. &state.lb_extracted);
  966. if (UNLIKELY(state.lb_extracted.category >=
  967. KeySegmentsExtractor::kMinErrorCategory)) {
  968. // TODO? Report problem
  969. // No filtering
  970. return true;
  971. }
  972. assert(state.lb_extracted.category <=
  973. KeySegmentsExtractor::kMaxUsableCategory);
  974. ex->Extract(upper_bound_excl, KeySegmentsExtractor::kExclusiveUpperBound,
  975. &state.ub_extracted);
  976. if (UNLIKELY(state.ub_extracted.category >=
  977. KeySegmentsExtractor::kMinErrorCategory)) {
  978. // TODO? Report problem
  979. // No filtering
  980. return true;
  981. }
  982. assert(state.ub_extracted.category <=
  983. KeySegmentsExtractor::kMaxUsableCategory);
  984. uint64_t cats_raw;
  985. p = GetVarint64Ptr(p, limit, &cats_raw);
  986. if (p == nullptr) {
  987. // Missing categories
  988. return true;
  989. }
  990. KeySegmentsExtractor::KeyCategorySet categories =
  991. UintToCategorySet(cats_raw);
  992. // Can only filter out based on category if upper and lower bound have
  993. // the same category. (Each category is contiguous by key order, but we
  994. // don't know the order between categories.)
  995. if (state.lb_extracted.category == state.ub_extracted.category &&
  996. !categories.Contains(state.lb_extracted.category)) {
  997. // Filtered out
  998. return false;
  999. }
  1000. // Process the wrapped filters
  1001. return MayMatch(Slice(p, limit - p), &state);
  1002. }
  1003. bool MayMatch(Slice filters, State* state = nullptr) const {
  1004. const char* p = filters.data();
  1005. const char* limit = p + filters.size();
  1006. uint64_t filter_count;
  1007. p = GetVarint64Ptr(p, limit, &filter_count);
  1008. if (p == nullptr || filter_count == 0) {
  1009. // TODO? Report problem
  1010. // No filtering
  1011. return true;
  1012. }
  1013. for (size_t i = 0; i < filter_count; ++i) {
  1014. uint64_t filter_len;
  1015. if (i + 1 == filter_count) {
  1016. // Last filter
  1017. filter_len = static_cast<uint64_t>(limit - p);
  1018. } else {
  1019. p = GetVarint64Ptr(p, limit, &filter_len);
  1020. if (p == nullptr || filter_len == 0 ||
  1021. static_cast<size_t>(limit - p) < filter_len) {
  1022. // TODO? Report problem
  1023. // No filtering
  1024. return true;
  1025. }
  1026. }
  1027. Slice filter = Slice(p, filter_len);
  1028. p += filter_len;
  1029. bool may_match = true;
  1030. char type = filter[0];
  1031. switch (type) {
  1032. case kExtrAndCatFilterWrapper:
  1033. may_match = MayMatch_ExtrAndCatFilterWrapper(filter);
  1034. break;
  1035. case kCategoryScopeFilterWrapper:
  1036. if (state == nullptr) {
  1037. // TODO? Report problem
  1038. // No filtering
  1039. return true;
  1040. }
  1041. may_match = MayMatch_CategoryScopeFilterWrapper(filter, *state);
  1042. break;
  1043. case kBytewiseMinMaxFilter:
  1044. case kRevBytewiseMinMaxFilter:
  1045. if (state == nullptr) {
  1046. // TODO? Report problem
  1047. // No filtering
  1048. return true;
  1049. }
  1050. may_match = BytewiseMinMaxSstQueryFilterConfig::RangeMayMatch(
  1051. filter, lower_bound_incl, state->lb_extracted, upper_bound_excl,
  1052. state->ub_extracted);
  1053. break;
  1054. default:
  1055. // TODO? Report problem
  1056. {}
  1057. // Unknown filter type
  1058. }
  1059. if (!may_match) {
  1060. // Successfully filtered
  1061. return false;
  1062. }
  1063. }
  1064. // Wasn't filtered
  1065. return true;
  1066. }
  1067. };
  1068. struct MyFactory : public Factory {
  1069. explicit MyFactory(
  1070. std::shared_ptr<const SstQueryFilterConfigsManagerImpl> _parent,
  1071. const std::string& _configs_name)
  1072. : parent(std::move(_parent)),
  1073. ver_map(parent->GetVerMap(_configs_name)),
  1074. configs_name(_configs_name) {}
  1075. TablePropertiesCollector* CreateTablePropertiesCollector(
  1076. TablePropertiesCollectorFactory::Context /*context*/) override {
  1077. auto& configs = GetConfigs();
  1078. if (configs.IsEmptyNotFound()) {
  1079. return nullptr;
  1080. }
  1081. return new MyCollector(configs, *parent);
  1082. }
  1083. const char* Name() const override {
  1084. // placeholder
  1085. return "SstQueryFilterConfigsManagerImpl::MyFactory";
  1086. }
  1087. Status SetFilteringVersion(FilteringVersion ver) override {
  1088. if (ver > 0 && ver < parent->min_ver_) {
  1089. return Status::InvalidArgument(
  1090. "Filtering version is before earliest known configuration: " +
  1091. std::to_string(ver) + " < " + std::to_string(parent->min_ver_));
  1092. }
  1093. if (ver > parent->max_ver_) {
  1094. return Status::InvalidArgument(
  1095. "Filtering version is after latest known configuration: " +
  1096. std::to_string(ver) + " > " + std::to_string(parent->max_ver_));
  1097. }
  1098. version.StoreRelaxed(ver);
  1099. return Status::OK();
  1100. }
  1101. FilteringVersion GetFilteringVersion() const override {
  1102. return version.LoadRelaxed();
  1103. }
  1104. const std::string& GetConfigsName() const override { return configs_name; }
  1105. const SstQueryFilterConfigs& GetConfigs() const override {
  1106. FilteringVersion ver = version.LoadRelaxed();
  1107. if (ver == 0) {
  1108. // Special case
  1109. return kEmptyNotFoundSQFC;
  1110. }
  1111. assert(ver >= parent->min_ver_);
  1112. assert(ver <= parent->max_ver_);
  1113. auto it = ver_map.upper_bound(ver);
  1114. if (it == ver_map.begin()) {
  1115. return kEmptyNotFoundSQFC;
  1116. } else {
  1117. --it;
  1118. return it->second;
  1119. }
  1120. }
  1121. // The buffers pointed to by the Slices must live as long as any read
  1122. // operations using this table filter function.
  1123. std::function<bool(const TableProperties&)> GetTableFilterForRangeQuery(
  1124. Slice lower_bound_incl, Slice upper_bound_excl) const override {
  1125. // TODO: cache extractor results between SST files, assuming most will
  1126. // use the same version
  1127. return
  1128. [rqf = RangeQueryFilterReader{
  1129. lower_bound_incl, upper_bound_excl, GetConfigs().extractor.get(),
  1130. parent->extractor_map_}](const TableProperties& props) -> bool {
  1131. auto it = props.user_collected_properties.find(kTablePropertyName);
  1132. if (it == props.user_collected_properties.end()) {
  1133. // No filtering
  1134. return true;
  1135. }
  1136. auto& filters = it->second;
  1137. // Parse the serialized filters string
  1138. if (filters.size() < 2 || filters[0] != kSchemaVersion) {
  1139. // TODO? Report problem
  1140. // No filtering
  1141. return true;
  1142. }
  1143. return rqf.MayMatch(Slice(filters.data() + 1, filters.size() - 1));
  1144. };
  1145. }
  1146. const std::shared_ptr<const SstQueryFilterConfigsManagerImpl> parent;
  1147. const ConfigVersionMap& ver_map;
  1148. const std::string configs_name;
  1149. RelaxedAtomic<FilteringVersion> version;
  1150. };
  1151. Status MakeSharedFactory(const std::string& configs_name,
  1152. FilteringVersion ver,
  1153. std::shared_ptr<Factory>* out) const override {
  1154. auto obj = std::make_shared<MyFactory>(
  1155. static_cast_with_check<const SstQueryFilterConfigsManagerImpl>(
  1156. shared_from_this()),
  1157. configs_name);
  1158. Status s = obj->SetFilteringVersion(ver);
  1159. if (s.ok()) {
  1160. *out = std::move(obj);
  1161. }
  1162. return s;
  1163. }
  1164. const ConfigVersionMap& GetVerMap(const std::string& configs_name) const {
  1165. static const ConfigVersionMap kEmptyMap;
  1166. auto it = name_map_.find(configs_name);
  1167. if (it == name_map_.end()) {
  1168. return kEmptyMap;
  1169. }
  1170. return it->second;
  1171. }
  1172. private:
  1173. static const std::string kTablePropertyName;
  1174. static constexpr char kSchemaVersion = 1;
  1175. private:
  1176. UnorderedMap<std::string, ConfigVersionMap> name_map_;
  1177. UnorderedMap<std::string, std::shared_ptr<const KeySegmentsExtractor>>
  1178. extractor_map_;
  1179. FilteringVersion min_ver_ = 0;
  1180. FilteringVersion max_ver_ = 0;
  1181. };
// SstQueryFilterConfigsManagerImpl static member definition (the table
// property key under which serialized filters are stored).
const std::string SstQueryFilterConfigsManagerImpl::kTablePropertyName =
    "rocksdb.sqfc";
  1185. } // namespace
  1186. std::shared_ptr<const KeySegmentsExtractor>
  1187. MakeSharedCappedKeySegmentsExtractor(const std::vector<size_t>& byte_widths) {
  1188. std::vector<uint32_t> byte_widths_checked;
  1189. byte_widths_checked.resize(byte_widths.size());
  1190. size_t final_end = 0;
  1191. for (size_t i = 0; i < byte_widths.size(); ++i) {
  1192. final_end += byte_widths[i];
  1193. if (byte_widths[i] > UINT32_MAX / 2 || final_end > UINT32_MAX) {
  1194. // Better to crash than to proceed unsafely
  1195. return nullptr;
  1196. }
  1197. byte_widths_checked[i] = static_cast<uint32_t>(byte_widths[i]);
  1198. }
  1199. switch (byte_widths_checked.size()) {
  1200. case 0:
  1201. return std::make_shared<SemiStaticCappedKeySegmentsExtractor<0>>(
  1202. byte_widths_checked.data());
  1203. case 1:
  1204. return std::make_shared<SemiStaticCappedKeySegmentsExtractor<1>>(
  1205. byte_widths_checked.data());
  1206. case 2:
  1207. return std::make_shared<SemiStaticCappedKeySegmentsExtractor<2>>(
  1208. byte_widths_checked.data());
  1209. case 3:
  1210. return std::make_shared<SemiStaticCappedKeySegmentsExtractor<3>>(
  1211. byte_widths_checked.data());
  1212. case 4:
  1213. return std::make_shared<SemiStaticCappedKeySegmentsExtractor<4>>(
  1214. byte_widths_checked.data());
  1215. case 5:
  1216. return std::make_shared<SemiStaticCappedKeySegmentsExtractor<5>>(
  1217. byte_widths_checked.data());
  1218. case 6:
  1219. return std::make_shared<SemiStaticCappedKeySegmentsExtractor<6>>(
  1220. byte_widths_checked.data());
  1221. default:
  1222. return std::make_shared<DynamicCappedKeySegmentsExtractor>(
  1223. byte_widths_checked);
  1224. }
  1225. }
  1226. bool SstQueryFilterConfigs::IsEmptyNotFound() const {
  1227. return this == &kEmptyNotFoundSQFC;
  1228. }
  1229. std::shared_ptr<SstQueryFilterConfig> MakeSharedBytewiseMinMaxSQFC(
  1230. FilterInput input, KeySegmentsExtractor::KeyCategorySet categories) {
  1231. return std::make_shared<BytewiseMinMaxSstQueryFilterConfig>(
  1232. input, categories,
  1233. /*reverse=*/false);
  1234. }
  1235. std::shared_ptr<SstQueryFilterConfig> MakeSharedReverseBytewiseMinMaxSQFC(
  1236. FilterInput input, KeySegmentsExtractor::KeyCategorySet categories) {
  1237. return std::make_shared<BytewiseMinMaxSstQueryFilterConfig>(input, categories,
  1238. /*reverse=*/true);
  1239. }
  1240. Status SstQueryFilterConfigsManager::MakeShared(
  1241. const Data& data, std::shared_ptr<SstQueryFilterConfigsManager>* out) {
  1242. auto obj = std::make_shared<SstQueryFilterConfigsManagerImpl>();
  1243. Status s = obj->Populate(data);
  1244. if (s.ok()) {
  1245. *out = std::move(obj);
  1246. }
  1247. return s;
  1248. }
  1249. } // namespace ROCKSDB_NAMESPACE::experimental