||
- # Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- # This source code is licensed under both the GPLv2 (found in the
- # COPYING file in the root directory) and Apache 2.0 License
- # (found in the LICENSE.Apache file in the root directory).
- from abc import ABC, abstractmethod
- from advisor.db_log_parser import DataSource, NO_COL_FAMILY
- from advisor.db_timeseries_parser import TimeSeriesData
- from enum import Enum
- from advisor.ini_parser import IniParser
- import re
class Section(ABC):
    """Abstract base for a named section of the rules spec.

    Concrete sections must implement both `set_parameter` and
    `perform_checks`.
    """

    def __init__(self, name):
        """Store the name of the section."""
        self.name = name

    @abstractmethod
    def set_parameter(self, key, value):
        """Record one `key`/`value` setting on this section."""

    @abstractmethod
    def perform_checks(self):
        """Validate the section; implemented by each concrete subclass."""
class Rule(Section):
    """A rule section: named conditions paired with named suggestions.

    Without an overlap period, the rule is triggered when every one of its
    conditions is triggered and the conditions share at least one entity or
    column family. With an overlap period ('overlap_time_period'), the rule
    must have exactly 2 TIME_SERIES conditions and is triggered when both
    fired within that many seconds of each other for a common entity.
    """

    # Accepted 'overlap_time_period' values: digits followed by one unit
    # letter. Both cases are listed explicitly (instead of re.IGNORECASE) so
    # that only these ASCII letters can match.
    _TIME_PERIOD_REGEX = re.compile(r'\d+[smhdSMHD]')
    # Multiplier that converts each (lower-cased) unit letter to seconds.
    _SECONDS_PER_UNIT = {'s': 1, 'm': 60, 'h': 60 * 60, 'd': 24 * 60 * 60}

    def __init__(self, name):
        super().__init__(name)
        self.conditions = None
        self.suggestions = None
        # Holds the raw 'overlap_time_period' string until perform_checks()
        # converts it to an integer number of seconds.
        self.overlap_time_seconds = None
        self.trigger_entities = None
        self.trigger_column_families = None

    def set_parameter(self, key, value):
        """Store a 'conditions', 'suggestions' or 'overlap_time_period' value.

        Other keys are ignored.
        """
        # If the Rule is associated with a single suggestion/condition, then
        # value will be a string and not a list. Hence, convert it to a single
        # element list before storing it in self.suggestions or
        # self.conditions.
        if key == 'conditions':
            self.conditions = [value] if isinstance(value, str) else value
        elif key == 'suggestions':
            self.suggestions = [value] if isinstance(value, str) else value
        elif key == 'overlap_time_period':
            self.overlap_time_seconds = value

    def get_suggestions(self):
        """Return the list of suggestion names attached to this rule."""
        return self.suggestions

    def perform_checks(self):
        """Validate the rule and convert the overlap period to seconds.

        Raises:
            ValueError: if the rule has no condition or no suggestion, if an
                overlap period is given without exactly 2 conditions, or if
                the overlap period is not digits followed by s, m, h or d.
        """
        if not self.conditions:
            raise ValueError(
                self.name + ': rule must have at least one condition'
            )
        if not self.suggestions:
            raise ValueError(
                self.name + ': rule must have at least one suggestion'
            )
        if not self.overlap_time_seconds:
            return
        if len(self.conditions) != 2:
            raise ValueError(
                self.name + ": rule must be associated with 2 conditions "
                "in order to check for a time dependency between them"
            )
        if isinstance(self.overlap_time_seconds, int):
            # Already converted by an earlier call; nothing left to do.
            return
        period = self.overlap_time_seconds
        if not self._TIME_PERIOD_REGEX.fullmatch(period):
            raise ValueError(
                self.name + r": overlap_time_seconds format: \d+[s|m|h|d]"
            )
        # The unit is lower-cased so that e.g. '5M' means 5 minutes rather
        # than silently falling through as 5 seconds.
        unit = period[-1].lower()
        self.overlap_time_seconds = (
            int(period[:-1]) * self._SECONDS_PER_UNIT[unit]
        )

    def get_overlap_timestamps(self, key1_trigger_epochs, key2_trigger_epochs):
        """Find the first pair of trigger times lying within the overlap.

        Takes the timestamps at which the rule's 2 TIME_SERIES conditions
        were triggered and looks for a key2 timestamp that lies within
        'overlap_time_seconds' (before or after) of a key1 timestamp.
        Neither input list is modified.

        Returns:
            A tuple (key2_timestamp, key1_timestamp) for the first such pair
            found, or None if there is none (including when either input is
            empty).
        """
        window = self.overlap_time_seconds
        key1_lower_bounds = sorted(
            epoch - window for epoch in key1_trigger_epochs
        )
        key2_epochs = sorted(key2_trigger_epochs)
        num_key2 = len(key2_epochs)
        trigger_ix = 0
        for key1_lb in key1_lower_bounds:
            # The bounds check must come first: indexing before checking
            # raises IndexError once every key2 timestamp has been skipped.
            while trigger_ix < num_key2 and key2_epochs[trigger_ix] < key1_lb:
                trigger_ix += 1
            if trigger_ix >= num_key2:
                break
            if key2_epochs[trigger_ix] <= key1_lb + (2 * window):
                # key1_lb + window is the original key1 timestamp
                return (key2_epochs[trigger_ix], key1_lb + window)
        return None

    def get_trigger_entities(self):
        """Return the entities recorded by the last is_triggered() call."""
        return self.trigger_entities

    def get_trigger_column_families(self):
        """Return the column families recorded by the last is_triggered()."""
        return self.trigger_column_families

    def is_triggered(self, conditions_dict, column_families):
        """Evaluate the rule against the current condition triggers.

        Args:
            conditions_dict: mapping of condition name to condition object.
            column_families: iterable of the column family names in scope.

        Returns:
            True if the rule is triggered, else False. On success the
            triggering entities and/or column families are recorded on the
            rule.

        Raises:
            ValueError: if an overlap period is set and the 2 conditions are
                not both TIME_SERIES conditions.
        """
        # Start from a clean slate so that scope recorded by an earlier
        # evaluation of this rule cannot leak into this one.
        self.trigger_entities = None
        self.trigger_column_families = None
        if self.overlap_time_seconds:
            return self._is_triggered_within_overlap(
                conditions_dict, column_families
            )
        return self._are_all_conditions_triggered(
            conditions_dict, column_families
        )

    def _is_triggered_within_overlap(self, conditions_dict, column_families):
        # Overlap case: both conditions must fire close together in time for
        # at least one entity common to both.
        condition1 = conditions_dict[self.conditions[0]]
        condition2 = conditions_dict[self.conditions[1]]
        if not (
            condition1.get_data_source() is DataSource.Type.TIME_SERIES and
            condition2.get_data_source() is DataSource.Type.TIME_SERIES
        ):
            raise ValueError(self.name + ': need 2 timeseries conditions')
        map1 = condition1.get_trigger()
        map2 = condition2.get_trigger()
        if not (map1 and map2):
            return False
        self.trigger_entities = {}
        common_entities = set(map1.keys()).intersection(set(map2.keys()))
        for entity in common_entities:
            overlap_timestamps_pair = self.get_overlap_timestamps(
                list(map1[entity].keys()), list(map2[entity].keys())
            )
            if overlap_timestamps_pair:
                self.trigger_entities[entity] = overlap_timestamps_pair
        if not self.trigger_entities:
            return False
        self.trigger_column_families = set(column_families)
        return True

    def _are_all_conditions_triggered(self, conditions_dict, column_families):
        # Plain case: every condition must be triggered and the running
        # intersection of their scopes must not become empty.
        all_conditions_triggered = True
        self.trigger_column_families = set(column_families)
        for cond_name in self.conditions:
            cond = conditions_dict[cond_name]
            trigger = cond.get_trigger()
            if not trigger:
                all_conditions_triggered = False
                break
            source = cond.get_data_source()
            if (
                source is DataSource.Type.LOG or
                source is DataSource.Type.DB_OPTIONS
            ):
                cond_col_fam = set(trigger.keys())
                if NO_COL_FAMILY in cond_col_fam:
                    # a trigger keyed by NO_COL_FAMILY is treated as
                    # applying to every column family in scope
                    cond_col_fam = set(column_families)
                self.trigger_column_families = (
                    self.trigger_column_families.intersection(cond_col_fam)
                )
            elif source is DataSource.Type.TIME_SERIES:
                cond_entities = set(trigger.keys())
                if self.trigger_entities is None:
                    self.trigger_entities = cond_entities
                else:
                    self.trigger_entities = (
                        self.trigger_entities.intersection(cond_entities)
                    )
            if not (self.trigger_entities or self.trigger_column_families):
                all_conditions_triggered = False
                break
        if not all_conditions_triggered:  # clean up if rule not triggered
            self.trigger_column_families = None
            self.trigger_entities = None
        return all_conditions_triggered

    def __repr__(self):
        # Conditions are joined with ' AND ', suggestions with ', '.
        rule_string = (
            "Rule: " + self.name + " has conditions:: " +
            " AND ".join(self.conditions)
        )
        rule_string += "\nsuggestions:: " + ", ".join(self.suggestions)
        if self.trigger_entities:
            rule_string += (', entities:: ' + str(self.trigger_entities))
        if self.trigger_column_families:
            rule_string += (', col_fam:: ' + str(self.trigger_column_families))
        return rule_string
class Suggestion(Section):
    """A suggestion section: either a free-text description or an
    option/action pair, optionally with suggested values."""

    class Action(Enum):
        set = 1
        increase = 2
        decrease = 3

    def __init__(self, name):
        super().__init__(name)
        self.option = None
        self.action = None
        self.suggested_values = None
        self.description = None

    def set_parameter(self, key, value):
        """Store an 'option', 'action', 'suggested_values' or 'description'.

        Other keys are ignored. An 'action' value is looked up by name in
        `Suggestion.Action`.
        """
        if key == 'option':
            # Note:
            # case 1: 'option' is supported by Rocksdb OPTIONS file; in this
            # case the option belongs to one of the sections in the config
            # file and its name is prefixed by "<section_type>."
            # case 2: 'option' is not supported by Rocksdb OPTIONS file; the
            # option is not expected to have the character '.' in its name
            self.option = value
            return
        if key == 'action':
            if self.option and not value:
                raise ValueError(self.name + ': provide action for option')
            self.action = self.Action[value]
            return
        if key == 'suggested_values':
            # a lone value arrives as a string; always store a list
            self.suggested_values = (
                [value] if isinstance(value, str) else value
            )
            return
        if key == 'description':
            self.description = value

    def perform_checks(self):
        """Validate the suggestion.

        A suggestion with a description needs nothing else. Otherwise it
        needs an option and an action, and the 'set' action additionally
        needs suggested values; a ValueError is raised if any is missing.
        """
        if self.description:
            return
        if not self.option:
            raise ValueError(self.name + ': provide option or description')
        if not self.action:
            raise ValueError(self.name + ': provide action for option')
        needs_values = self.action is self.Action.set
        if needs_values and not self.suggested_values:
            raise ValueError(
                self.name + ': provide suggested value for option'
            )

    def __repr__(self):
        pieces = ["Suggestion: " + self.name]
        if self.description:
            pieces.append(' description : ' + self.description)
        else:
            pieces.append(
                ' option : ' + self.option + ' action : ' + self.action.name
            )
            if self.suggested_values:
                pieces.append(
                    ' suggested_values : ' + str(self.suggested_values)
                )
        return ''.join(pieces)
class Condition(Section):
    """Base condition section: tracks a data source and a trigger.

    Parameters can only be set once the condition has been specialised into
    a subclass that overrides `set_parameter`.
    """

    def __init__(self, name):
        super().__init__(name)
        self.data_source = None
        self.trigger = None

    def perform_checks(self):
        """Raise ValueError unless a data source has been set."""
        if self.data_source:
            return
        raise ValueError(self.name + ': condition not tied to data source')

    def set_data_source(self, data_source):
        self.data_source = data_source

    def get_data_source(self):
        return self.data_source

    def reset_trigger(self):
        """Forget any previously recorded trigger."""
        self.trigger = None

    def set_trigger(self, condition_trigger):
        self.trigger = condition_trigger

    def get_trigger(self):
        return self.trigger

    def is_triggered(self):
        """Return True when a non-empty trigger is recorded, else False."""
        return bool(self.trigger)

    def set_parameter(self, key, value):
        # must be defined by the subclass
        raise NotImplementedError(self.name + ': provide source for condition')
class LogCondition(Condition):
    """Condition whose data source is LOG, described by a regex."""

    @classmethod
    def create(cls, base_condition):
        """Turn `base_condition` into a LogCondition in place and return it.

        The object's data source is set to LOG and its class is reassigned,
        so no `__init__` runs and 'regex' stays unset until it is provided
        through `set_parameter`.
        """
        base_condition.set_data_source(DataSource.Type['LOG'])
        base_condition.__class__ = cls
        return base_condition

    def set_parameter(self, key, value):
        """Store the 'regex' parameter; other keys are ignored."""
        if key == 'regex':
            self.regex = value

    def perform_checks(self):
        """Raise ValueError if the data source or the regex is missing."""
        super().perform_checks()
        # 'regex' may never have been assigned (see create()), so it is read
        # with getattr to report the intended ValueError rather than an
        # AttributeError.
        if not getattr(self, 'regex', None):
            raise ValueError(self.name + ': provide regex for log condition')

    def __repr__(self):
        # The trigger is deliberately not part of the output.
        log_cond_str = "LogCondition: " + self.name
        log_cond_str += (" regex: " + self.regex)
        return log_cond_str
class OptionCondition(Condition):
    """Condition whose data source is DB_OPTIONS: a list of options plus an
    expression to evaluate."""

    @classmethod
    def create(cls, base_condition):
        """Turn `base_condition` into an OptionCondition in place.

        The object's data source is set to DB_OPTIONS and its class is
        reassigned, so no `__init__` runs and 'options'/'eval_expr' stay
        unset until they are provided through `set_parameter`.
        """
        base_condition.set_data_source(DataSource.Type['DB_OPTIONS'])
        base_condition.__class__ = cls
        return base_condition

    def set_parameter(self, key, value):
        """Store the 'options' or 'evaluate' parameter; ignore other keys."""
        if key == 'options':
            # a lone option arrives as a string; always store a list
            self.options = [value] if isinstance(value, str) else value
        elif key == 'evaluate':
            self.eval_expr = value

    def perform_checks(self):
        """Raise ValueError if the data source, options or expression is
        missing."""
        super().perform_checks()
        # The attributes may never have been assigned (see create()), so
        # they are read with getattr to report the intended ValueError
        # rather than an AttributeError.
        if not getattr(self, 'options', None):
            raise ValueError(self.name + ': options missing in condition')
        if not getattr(self, 'eval_expr', None):
            raise ValueError(self.name + ': expression missing in condition')

    def __repr__(self):
        opt_cond_str = "OptionCondition: " + self.name
        opt_cond_str += (" options: " + str(self.options))
        opt_cond_str += (" expression: " + self.eval_expr)
        if self.trigger:
            opt_cond_str += (" trigger: " + str(self.trigger))
        return opt_cond_str
class TimeSeriesCondition(Condition):
    """Condition whose data source is TIME_SERIES.

    Supports two behaviors from `TimeSeriesData.Behavior`: 'bursty' (one
    key, a rate threshold and a window length) and 'evaluate_expression'
    (an expression, optionally with an aggregation operator).
    """

    @classmethod
    def create(cls, base_condition):
        """Turn `base_condition` into a TimeSeriesCondition in place.

        The object's data source is set to TIME_SERIES and its class is
        reassigned, so no `__init__` runs and every parameter stays unset
        until it is provided through `set_parameter`.
        """
        base_condition.set_data_source(DataSource.Type['TIME_SERIES'])
        base_condition.__class__ = cls
        return base_condition

    def set_parameter(self, key, value):
        """Store one supported parameter, converting it where needed.

        'behavior' and 'aggregation_op' are looked up by name in the
        corresponding `TimeSeriesData` enums; 'rate_threshold' is converted
        to float and 'window_sec' to int. Other keys are ignored.
        """
        if key == 'keys':
            # a lone key arrives as a string; always store a list
            self.keys = [value] if isinstance(value, str) else value
        elif key == 'behavior':
            self.behavior = TimeSeriesData.Behavior[value]
        elif key == 'rate_threshold':
            self.rate_threshold = float(value)
        elif key == 'window_sec':
            self.window_sec = int(value)
        elif key == 'evaluate':
            self.expression = value
        elif key == 'aggregation_op':
            self.aggregation_op = TimeSeriesData.AggregationOperator[value]

    def perform_checks(self):
        """Validate the condition and apply the default window length.

        Raises:
            ValueError: if keys or behavior are missing, if a bursty
                condition lacks a rate threshold or has more than one key,
                if an evaluate_expression condition lacks an expression, or
                if the behavior is not one of those two.
        """
        # Parameters may never have been assigned (see create()), so they
        # are read with getattr to report the intended ValueError rather
        # than an AttributeError.
        if not getattr(self, 'keys', None):
            raise ValueError(self.name + ': specify timeseries key')
        behavior = getattr(self, 'behavior', None)
        if not behavior:
            raise ValueError(self.name + ': specify triggering behavior')
        if behavior is TimeSeriesData.Behavior.bursty:
            if not getattr(self, 'rate_threshold', None):
                raise ValueError(self.name + ': specify rate burst threshold')
            if not getattr(self, 'window_sec', None):
                self.window_sec = 300  # default window length is 5 minutes
            if len(self.keys) > 1:
                raise ValueError(self.name + ': specify only one key')
        elif behavior is TimeSeriesData.Behavior.evaluate_expression:
            if not getattr(self, 'expression', None):
                raise ValueError(self.name + ': specify evaluation expression')
        else:
            raise ValueError(self.name + ': trigger behavior not supported')

    def __repr__(self):
        ts_cond_str = "TimeSeriesCondition: " + self.name
        ts_cond_str += (" statistics: " + str(self.keys))
        ts_cond_str += (" behavior: " + self.behavior.name)
        if self.behavior is TimeSeriesData.Behavior.bursty:
            ts_cond_str += (" rate_threshold: " + str(self.rate_threshold))
            ts_cond_str += (" window_sec: " + str(self.window_sec))
        if self.behavior is TimeSeriesData.Behavior.evaluate_expression:
            ts_cond_str += (" expression: " + self.expression)
            # the aggregation operator is optional
            if hasattr(self, 'aggregation_op'):
                ts_cond_str += (" aggregation_op: " + self.aggregation_op.name)
        if self.trigger:
            ts_cond_str += (" trigger: " + str(self.trigger))
        return ts_cond_str
class RulesSpec:
    """Loads rule, condition and suggestion sections from a rules spec file
    and evaluates which rules are triggered."""

    def __init__(self, rules_path):
        """Remember the spec file path and start with empty dictionaries.

        The dictionaries are created here so that the accessors and
        `get_triggered_rules` are usable before a spec has been loaded.
        """
        self.file_path = rules_path
        self.initialise_fields()

    def initialise_fields(self):
        """Reset the rule, condition and suggestion dictionaries to empty."""
        self.rules_dict = {}
        self.conditions_dict = {}
        self.suggestions_dict = {}

    def perform_section_checks(self):
        """Run `perform_checks` on every rule, condition and suggestion."""
        for rule in self.rules_dict.values():
            rule.perform_checks()
        for cond in self.conditions_dict.values():
            cond.perform_checks()
        for sugg in self.suggestions_dict.values():
            sugg.perform_checks()

    def load_rules_from_spec(self):
        """Parse the spec file and (re)populate the three dictionaries.

        A header line starts a new section; the key-value lines that follow
        are applied to the most recently started section. For a condition,
        the 'source' key selects the concrete condition class and every
        other key is passed to the condition's `set_parameter`.
        """
        self.initialise_fields()
        with open(self.file_path, 'r') as db_rules:
            curr_section = None
            for line in db_rules:
                line = IniParser.remove_trailing_comment(line)
                if not line:
                    continue
                element = IniParser.get_element(line)
                if element is IniParser.Element.comment:
                    continue
                if element is not IniParser.Element.key_val:
                    curr_section = element  # it's a new IniParser header
                    section_name = IniParser.get_section_name(line)
                    if element is IniParser.Element.rule:
                        new_rule = Rule(section_name)
                        self.rules_dict[section_name] = new_rule
                    elif element is IniParser.Element.cond:
                        new_cond = Condition(section_name)
                        self.conditions_dict[section_name] = new_cond
                    elif element is IniParser.Element.sugg:
                        new_suggestion = Suggestion(section_name)
                        self.suggestions_dict[section_name] = new_suggestion
                    continue
                key, value = IniParser.get_key_value_pair(line)
                if curr_section is IniParser.Element.rule:
                    new_rule.set_parameter(key, value)
                elif curr_section is IniParser.Element.cond:
                    if key == 'source':
                        # create() re-classes the existing object in place,
                        # so the entry already stored in conditions_dict is
                        # the specialised condition too.
                        if value == 'LOG':
                            new_cond = LogCondition.create(new_cond)
                        elif value == 'OPTIONS':
                            new_cond = OptionCondition.create(new_cond)
                        elif value == 'TIME_SERIES':
                            new_cond = TimeSeriesCondition.create(new_cond)
                    else:
                        new_cond.set_parameter(key, value)
                elif curr_section is IniParser.Element.sugg:
                    new_suggestion.set_parameter(key, value)

    def get_rules_dict(self):
        return self.rules_dict

    def get_conditions_dict(self):
        return self.conditions_dict

    def get_suggestions_dict(self):
        return self.suggestions_dict

    def get_triggered_rules(self, data_sources, column_families):
        """Trigger the conditions, then return the list of triggered rules.

        Args:
            data_sources: mapping of data source type to the sources of
                that type.
            column_families: column family names passed to each rule's
                `is_triggered`.
        """
        self.trigger_conditions(data_sources)
        return [
            rule
            for rule in self.rules_dict.values()
            if rule.is_triggered(self.conditions_dict, column_families)
        ]

    def trigger_conditions(self, data_sources):
        """Let every source check the conditions tied to its source type."""
        for source_type in data_sources:
            cond_subset = [
                cond
                for cond in self.conditions_dict.values()
                if cond.get_data_source() is source_type
            ]
            if not cond_subset:
                continue
            for source in data_sources[source_type]:
                source.check_and_trigger_conditions(cond_subset)

    def print_rules(self, rules):
        """Print each rule with its conditions, suggestions and scope."""
        for rule in rules:
            print('\nRule: ' + rule.name)
            for cond_name in rule.conditions:
                print(repr(self.conditions_dict[cond_name]))
            for sugg_name in rule.suggestions:
                print(repr(self.suggestions_dict[sugg_name]))
            if rule.trigger_entities:
                print('scope: entities:')
                print(rule.trigger_entities)
            if rule.trigger_column_families:
                print('scope: col_fam:')
                print(rule.trigger_column_families)
|