rule_parser.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528
  1. # Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. # This source code is licensed under both the GPLv2 (found in the
  3. # COPYING file in the root directory) and Apache 2.0 License
  4. # (found in the LICENSE.Apache file in the root directory).
  5. from abc import ABC, abstractmethod
  6. from advisor.db_log_parser import DataSource, NO_COL_FAMILY
  7. from advisor.db_timeseries_parser import TimeSeriesData
  8. from enum import Enum
  9. from advisor.ini_parser import IniParser
  10. import re
  11. class Section(ABC):
  12. def __init__(self, name):
  13. self.name = name
  14. @abstractmethod
  15. def set_parameter(self, key, value):
  16. pass
  17. @abstractmethod
  18. def perform_checks(self):
  19. pass
  20. class Rule(Section):
  21. def __init__(self, name):
  22. super().__init__(name)
  23. self.conditions = None
  24. self.suggestions = None
  25. self.overlap_time_seconds = None
  26. self.trigger_entities = None
  27. self.trigger_column_families = None
  28. def set_parameter(self, key, value):
  29. # If the Rule is associated with a single suggestion/condition, then
  30. # value will be a string and not a list. Hence, convert it to a single
  31. # element list before storing it in self.suggestions or
  32. # self.conditions.
  33. if key == 'conditions':
  34. if isinstance(value, str):
  35. self.conditions = [value]
  36. else:
  37. self.conditions = value
  38. elif key == 'suggestions':
  39. if isinstance(value, str):
  40. self.suggestions = [value]
  41. else:
  42. self.suggestions = value
  43. elif key == 'overlap_time_period':
  44. self.overlap_time_seconds = value
  45. def get_suggestions(self):
  46. return self.suggestions
  47. def perform_checks(self):
  48. if not self.conditions or len(self.conditions) < 1:
  49. raise ValueError(
  50. self.name + ': rule must have at least one condition'
  51. )
  52. if not self.suggestions or len(self.suggestions) < 1:
  53. raise ValueError(
  54. self.name + ': rule must have at least one suggestion'
  55. )
  56. if self.overlap_time_seconds:
  57. if len(self.conditions) != 2:
  58. raise ValueError(
  59. self.name + ": rule must be associated with 2 conditions\
  60. in order to check for a time dependency between them"
  61. )
  62. time_format = '^\d+[s|m|h|d]$'
  63. if (
  64. not
  65. re.match(time_format, self.overlap_time_seconds, re.IGNORECASE)
  66. ):
  67. raise ValueError(
  68. self.name + ": overlap_time_seconds format: \d+[s|m|h|d]"
  69. )
  70. else: # convert to seconds
  71. in_seconds = int(self.overlap_time_seconds[:-1])
  72. if self.overlap_time_seconds[-1] == 'm':
  73. in_seconds *= 60
  74. elif self.overlap_time_seconds[-1] == 'h':
  75. in_seconds *= (60 * 60)
  76. elif self.overlap_time_seconds[-1] == 'd':
  77. in_seconds *= (24 * 60 * 60)
  78. self.overlap_time_seconds = in_seconds
  79. def get_overlap_timestamps(self, key1_trigger_epochs, key2_trigger_epochs):
  80. # this method takes in 2 timeseries i.e. timestamps at which the
  81. # rule's 2 TIME_SERIES conditions were triggered and it finds
  82. # (if present) the first pair of timestamps at which the 2 conditions
  83. # were triggered within 'overlap_time_seconds' of each other
  84. key1_lower_bounds = [
  85. epoch - self.overlap_time_seconds
  86. for epoch in key1_trigger_epochs
  87. ]
  88. key1_lower_bounds.sort()
  89. key2_trigger_epochs.sort()
  90. trigger_ix = 0
  91. overlap_pair = None
  92. for key1_lb in key1_lower_bounds:
  93. while (
  94. key2_trigger_epochs[trigger_ix] < key1_lb and
  95. trigger_ix < len(key2_trigger_epochs)
  96. ):
  97. trigger_ix += 1
  98. if trigger_ix >= len(key2_trigger_epochs):
  99. break
  100. if (
  101. key2_trigger_epochs[trigger_ix] <=
  102. key1_lb + (2 * self.overlap_time_seconds)
  103. ):
  104. overlap_pair = (
  105. key2_trigger_epochs[trigger_ix],
  106. key1_lb + self.overlap_time_seconds
  107. )
  108. break
  109. return overlap_pair
  110. def get_trigger_entities(self):
  111. return self.trigger_entities
  112. def get_trigger_column_families(self):
  113. return self.trigger_column_families
  114. def is_triggered(self, conditions_dict, column_families):
  115. if self.overlap_time_seconds:
  116. condition1 = conditions_dict[self.conditions[0]]
  117. condition2 = conditions_dict[self.conditions[1]]
  118. if not (
  119. condition1.get_data_source() is DataSource.Type.TIME_SERIES and
  120. condition2.get_data_source() is DataSource.Type.TIME_SERIES
  121. ):
  122. raise ValueError(self.name + ': need 2 timeseries conditions')
  123. map1 = condition1.get_trigger()
  124. map2 = condition2.get_trigger()
  125. if not (map1 and map2):
  126. return False
  127. self.trigger_entities = {}
  128. is_triggered = False
  129. entity_intersection = (
  130. set(map1.keys()).intersection(set(map2.keys()))
  131. )
  132. for entity in entity_intersection:
  133. overlap_timestamps_pair = (
  134. self.get_overlap_timestamps(
  135. list(map1[entity].keys()), list(map2[entity].keys())
  136. )
  137. )
  138. if overlap_timestamps_pair:
  139. self.trigger_entities[entity] = overlap_timestamps_pair
  140. is_triggered = True
  141. if is_triggered:
  142. self.trigger_column_families = set(column_families)
  143. return is_triggered
  144. else:
  145. all_conditions_triggered = True
  146. self.trigger_column_families = set(column_families)
  147. for cond_name in self.conditions:
  148. cond = conditions_dict[cond_name]
  149. if not cond.get_trigger():
  150. all_conditions_triggered = False
  151. break
  152. if (
  153. cond.get_data_source() is DataSource.Type.LOG or
  154. cond.get_data_source() is DataSource.Type.DB_OPTIONS
  155. ):
  156. cond_col_fam = set(cond.get_trigger().keys())
  157. if NO_COL_FAMILY in cond_col_fam:
  158. cond_col_fam = set(column_families)
  159. self.trigger_column_families = (
  160. self.trigger_column_families.intersection(cond_col_fam)
  161. )
  162. elif cond.get_data_source() is DataSource.Type.TIME_SERIES:
  163. cond_entities = set(cond.get_trigger().keys())
  164. if self.trigger_entities is None:
  165. self.trigger_entities = cond_entities
  166. else:
  167. self.trigger_entities = (
  168. self.trigger_entities.intersection(cond_entities)
  169. )
  170. if not (self.trigger_entities or self.trigger_column_families):
  171. all_conditions_triggered = False
  172. break
  173. if not all_conditions_triggered: # clean up if rule not triggered
  174. self.trigger_column_families = None
  175. self.trigger_entities = None
  176. return all_conditions_triggered
  177. def __repr__(self):
  178. # Append conditions
  179. rule_string = "Rule: " + self.name + " has conditions:: "
  180. is_first = True
  181. for cond in self.conditions:
  182. if is_first:
  183. rule_string += cond
  184. is_first = False
  185. else:
  186. rule_string += (" AND " + cond)
  187. # Append suggestions
  188. rule_string += "\nsuggestions:: "
  189. is_first = True
  190. for sugg in self.suggestions:
  191. if is_first:
  192. rule_string += sugg
  193. is_first = False
  194. else:
  195. rule_string += (", " + sugg)
  196. if self.trigger_entities:
  197. rule_string += (', entities:: ' + str(self.trigger_entities))
  198. if self.trigger_column_families:
  199. rule_string += (', col_fam:: ' + str(self.trigger_column_families))
  200. # Return constructed string
  201. return rule_string
  202. class Suggestion(Section):
  203. class Action(Enum):
  204. set = 1
  205. increase = 2
  206. decrease = 3
  207. def __init__(self, name):
  208. super().__init__(name)
  209. self.option = None
  210. self.action = None
  211. self.suggested_values = None
  212. self.description = None
  213. def set_parameter(self, key, value):
  214. if key == 'option':
  215. # Note:
  216. # case 1: 'option' is supported by Rocksdb OPTIONS file; in this
  217. # case the option belongs to one of the sections in the config
  218. # file and it's name is prefixed by "<section_type>."
  219. # case 2: 'option' is not supported by Rocksdb OPTIONS file; the
  220. # option is not expected to have the character '.' in its name
  221. self.option = value
  222. elif key == 'action':
  223. if self.option and not value:
  224. raise ValueError(self.name + ': provide action for option')
  225. self.action = self.Action[value]
  226. elif key == 'suggested_values':
  227. if isinstance(value, str):
  228. self.suggested_values = [value]
  229. else:
  230. self.suggested_values = value
  231. elif key == 'description':
  232. self.description = value
  233. def perform_checks(self):
  234. if not self.description:
  235. if not self.option:
  236. raise ValueError(self.name + ': provide option or description')
  237. if not self.action:
  238. raise ValueError(self.name + ': provide action for option')
  239. if self.action is self.Action.set and not self.suggested_values:
  240. raise ValueError(
  241. self.name + ': provide suggested value for option'
  242. )
  243. def __repr__(self):
  244. sugg_string = "Suggestion: " + self.name
  245. if self.description:
  246. sugg_string += (' description : ' + self.description)
  247. else:
  248. sugg_string += (
  249. ' option : ' + self.option + ' action : ' + self.action.name
  250. )
  251. if self.suggested_values:
  252. sugg_string += (
  253. ' suggested_values : ' + str(self.suggested_values)
  254. )
  255. return sugg_string
  256. class Condition(Section):
  257. def __init__(self, name):
  258. super().__init__(name)
  259. self.data_source = None
  260. self.trigger = None
  261. def perform_checks(self):
  262. if not self.data_source:
  263. raise ValueError(self.name + ': condition not tied to data source')
  264. def set_data_source(self, data_source):
  265. self.data_source = data_source
  266. def get_data_source(self):
  267. return self.data_source
  268. def reset_trigger(self):
  269. self.trigger = None
  270. def set_trigger(self, condition_trigger):
  271. self.trigger = condition_trigger
  272. def get_trigger(self):
  273. return self.trigger
  274. def is_triggered(self):
  275. if self.trigger:
  276. return True
  277. return False
  278. def set_parameter(self, key, value):
  279. # must be defined by the subclass
  280. raise NotImplementedError(self.name + ': provide source for condition')
  281. class LogCondition(Condition):
  282. @classmethod
  283. def create(cls, base_condition):
  284. base_condition.set_data_source(DataSource.Type['LOG'])
  285. base_condition.__class__ = cls
  286. return base_condition
  287. def set_parameter(self, key, value):
  288. if key == 'regex':
  289. self.regex = value
  290. def perform_checks(self):
  291. super().perform_checks()
  292. if not self.regex:
  293. raise ValueError(self.name + ': provide regex for log condition')
  294. def __repr__(self):
  295. log_cond_str = "LogCondition: " + self.name
  296. log_cond_str += (" regex: " + self.regex)
  297. # if self.trigger:
  298. # log_cond_str += (" trigger: " + str(self.trigger))
  299. return log_cond_str
  300. class OptionCondition(Condition):
  301. @classmethod
  302. def create(cls, base_condition):
  303. base_condition.set_data_source(DataSource.Type['DB_OPTIONS'])
  304. base_condition.__class__ = cls
  305. return base_condition
  306. def set_parameter(self, key, value):
  307. if key == 'options':
  308. if isinstance(value, str):
  309. self.options = [value]
  310. else:
  311. self.options = value
  312. elif key == 'evaluate':
  313. self.eval_expr = value
  314. def perform_checks(self):
  315. super().perform_checks()
  316. if not self.options:
  317. raise ValueError(self.name + ': options missing in condition')
  318. if not self.eval_expr:
  319. raise ValueError(self.name + ': expression missing in condition')
  320. def __repr__(self):
  321. opt_cond_str = "OptionCondition: " + self.name
  322. opt_cond_str += (" options: " + str(self.options))
  323. opt_cond_str += (" expression: " + self.eval_expr)
  324. if self.trigger:
  325. opt_cond_str += (" trigger: " + str(self.trigger))
  326. return opt_cond_str
  327. class TimeSeriesCondition(Condition):
  328. @classmethod
  329. def create(cls, base_condition):
  330. base_condition.set_data_source(DataSource.Type['TIME_SERIES'])
  331. base_condition.__class__ = cls
  332. return base_condition
  333. def set_parameter(self, key, value):
  334. if key == 'keys':
  335. if isinstance(value, str):
  336. self.keys = [value]
  337. else:
  338. self.keys = value
  339. elif key == 'behavior':
  340. self.behavior = TimeSeriesData.Behavior[value]
  341. elif key == 'rate_threshold':
  342. self.rate_threshold = float(value)
  343. elif key == 'window_sec':
  344. self.window_sec = int(value)
  345. elif key == 'evaluate':
  346. self.expression = value
  347. elif key == 'aggregation_op':
  348. self.aggregation_op = TimeSeriesData.AggregationOperator[value]
  349. def perform_checks(self):
  350. if not self.keys:
  351. raise ValueError(self.name + ': specify timeseries key')
  352. if not self.behavior:
  353. raise ValueError(self.name + ': specify triggering behavior')
  354. if self.behavior is TimeSeriesData.Behavior.bursty:
  355. if not self.rate_threshold:
  356. raise ValueError(self.name + ': specify rate burst threshold')
  357. if not self.window_sec:
  358. self.window_sec = 300 # default window length is 5 minutes
  359. if len(self.keys) > 1:
  360. raise ValueError(self.name + ': specify only one key')
  361. elif self.behavior is TimeSeriesData.Behavior.evaluate_expression:
  362. if not (self.expression):
  363. raise ValueError(self.name + ': specify evaluation expression')
  364. else:
  365. raise ValueError(self.name + ': trigger behavior not supported')
  366. def __repr__(self):
  367. ts_cond_str = "TimeSeriesCondition: " + self.name
  368. ts_cond_str += (" statistics: " + str(self.keys))
  369. ts_cond_str += (" behavior: " + self.behavior.name)
  370. if self.behavior is TimeSeriesData.Behavior.bursty:
  371. ts_cond_str += (" rate_threshold: " + str(self.rate_threshold))
  372. ts_cond_str += (" window_sec: " + str(self.window_sec))
  373. if self.behavior is TimeSeriesData.Behavior.evaluate_expression:
  374. ts_cond_str += (" expression: " + self.expression)
  375. if hasattr(self, 'aggregation_op'):
  376. ts_cond_str += (" aggregation_op: " + self.aggregation_op.name)
  377. if self.trigger:
  378. ts_cond_str += (" trigger: " + str(self.trigger))
  379. return ts_cond_str
  380. class RulesSpec:
  381. def __init__(self, rules_path):
  382. self.file_path = rules_path
  383. def initialise_fields(self):
  384. self.rules_dict = {}
  385. self.conditions_dict = {}
  386. self.suggestions_dict = {}
  387. def perform_section_checks(self):
  388. for rule in self.rules_dict.values():
  389. rule.perform_checks()
  390. for cond in self.conditions_dict.values():
  391. cond.perform_checks()
  392. for sugg in self.suggestions_dict.values():
  393. sugg.perform_checks()
  394. def load_rules_from_spec(self):
  395. self.initialise_fields()
  396. with open(self.file_path, 'r') as db_rules:
  397. curr_section = None
  398. for line in db_rules:
  399. line = IniParser.remove_trailing_comment(line)
  400. if not line:
  401. continue
  402. element = IniParser.get_element(line)
  403. if element is IniParser.Element.comment:
  404. continue
  405. elif element is not IniParser.Element.key_val:
  406. curr_section = element # it's a new IniParser header
  407. section_name = IniParser.get_section_name(line)
  408. if element is IniParser.Element.rule:
  409. new_rule = Rule(section_name)
  410. self.rules_dict[section_name] = new_rule
  411. elif element is IniParser.Element.cond:
  412. new_cond = Condition(section_name)
  413. self.conditions_dict[section_name] = new_cond
  414. elif element is IniParser.Element.sugg:
  415. new_suggestion = Suggestion(section_name)
  416. self.suggestions_dict[section_name] = new_suggestion
  417. elif element is IniParser.Element.key_val:
  418. key, value = IniParser.get_key_value_pair(line)
  419. if curr_section is IniParser.Element.rule:
  420. new_rule.set_parameter(key, value)
  421. elif curr_section is IniParser.Element.cond:
  422. if key == 'source':
  423. if value == 'LOG':
  424. new_cond = LogCondition.create(new_cond)
  425. elif value == 'OPTIONS':
  426. new_cond = OptionCondition.create(new_cond)
  427. elif value == 'TIME_SERIES':
  428. new_cond = TimeSeriesCondition.create(new_cond)
  429. else:
  430. new_cond.set_parameter(key, value)
  431. elif curr_section is IniParser.Element.sugg:
  432. new_suggestion.set_parameter(key, value)
  433. def get_rules_dict(self):
  434. return self.rules_dict
  435. def get_conditions_dict(self):
  436. return self.conditions_dict
  437. def get_suggestions_dict(self):
  438. return self.suggestions_dict
  439. def get_triggered_rules(self, data_sources, column_families):
  440. self.trigger_conditions(data_sources)
  441. triggered_rules = []
  442. for rule in self.rules_dict.values():
  443. if rule.is_triggered(self.conditions_dict, column_families):
  444. triggered_rules.append(rule)
  445. return triggered_rules
  446. def trigger_conditions(self, data_sources):
  447. for source_type in data_sources:
  448. cond_subset = [
  449. cond
  450. for cond in self.conditions_dict.values()
  451. if cond.get_data_source() is source_type
  452. ]
  453. if not cond_subset:
  454. continue
  455. for source in data_sources[source_type]:
  456. source.check_and_trigger_conditions(cond_subset)
  457. def print_rules(self, rules):
  458. for rule in rules:
  459. print('\nRule: ' + rule.name)
  460. for cond_name in rule.conditions:
  461. print(repr(self.conditions_dict[cond_name]))
  462. for sugg_name in rule.suggestions:
  463. print(repr(self.suggestions_dict[sugg_name]))
  464. if rule.trigger_entities:
  465. print('scope: entities:')
  466. print(rule.trigger_entities)
  467. if rule.trigger_column_families:
  468. print('scope: col_fam:')
  469. print(rule.trigger_column_families)