diff --git a/splitio/engine/evaluator.py b/splitio/engine/evaluator.py index 5cbbd205..26875a68 100644 --- a/splitio/engine/evaluator.py +++ b/splitio/engine/evaluator.py @@ -57,23 +57,9 @@ def eval_with_context(self, key, bucketing, feature_name, attrs, ctx): label = Label.KILLED _treatment = feature.default_treatment else: - if feature.prerequisites is not None: - prerequisites_matcher = PrerequisitesMatcher(feature.prerequisites) - if not prerequisites_matcher.match(key, attrs, { - 'evaluator': self, - 'bucketing_key': bucketing, - 'ec': ctx}): - label = Label.PREREQUISITES_NOT_MET - _treatment = feature.default_treatment + label, _treatment = self._check_prerequisites(feature, bucketing, key, attrs, ctx, label, _treatment) + label, _treatment = self._get_treatment(feature, bucketing, key, attrs, ctx, label, _treatment) - if _treatment == CONTROL: - treatment, label = self._treatment_for_flag(feature, key, bucketing, attrs, ctx) - if treatment is None: - label = Label.NO_CONDITION_MATCHED - _treatment = feature.default_treatment - else: - _treatment = treatment - return { 'treatment': _treatment, 'configurations': feature.get_configurations_for(_treatment) if feature else None, @@ -84,6 +70,30 @@ def eval_with_context(self, key, bucketing, feature_name, attrs, ctx): 'impressions_disabled': feature.impressions_disabled if feature else None } + def _get_treatment(self, feature, bucketing, key, attrs, ctx, label, _treatment): + if _treatment == CONTROL: + treatment, label = self._treatment_for_flag(feature, key, bucketing, attrs, ctx) + if treatment is None: + label = Label.NO_CONDITION_MATCHED + _treatment = feature.default_treatment + else: + _treatment = treatment + + return label, _treatment + + def _check_prerequisites(self, feature, bucketing, key, attrs, ctx, label, _treatment): + if feature.prerequisites is not None: + prerequisites_matcher = PrerequisitesMatcher(feature.prerequisites) + if not prerequisites_matcher.match(key, attrs, { + 'evaluator': self, + 'bucketing_key': bucketing, + 'ec': ctx}): + label = Label.PREREQUISITES_NOT_MET + _treatment = feature.default_treatment + + return label, _treatment + + def _treatment_for_flag(self, flag, key, bucketing, attributes, ctx): """ ... diff --git a/splitio/models/grammar/matchers/rule_based_segment.py b/splitio/models/grammar/matchers/rule_based_segment.py index 6c89c98c..6e4c8023 100644 --- a/splitio/models/grammar/matchers/rule_based_segment.py +++ b/splitio/models/grammar/matchers/rule_based_segment.py @@ -65,10 +65,8 @@ def _match_dep_rb_segments(self, excluded_rb_segments, key, attributes, context) if key in excluded_segment.excluded.get_excluded_keys(): return False - if self._match_dep_rb_segments(excluded_segment.excluded.get_excluded_segments(), key, attributes, context): + if self._match_dep_rb_segments(excluded_segment.excluded.get_excluded_segments(), key, attributes, context) \ + or self._match_conditions(excluded_segment.conditions, key, attributes, context): return True - - if self._match_conditions(excluded_segment.conditions, key, attributes, context): - return True - + return False diff --git a/splitio/push/workers.py b/splitio/push/workers.py index e4888f36..e0dd8369 100644 --- a/splitio/push/workers.py +++ b/splitio/push/workers.py @@ -35,6 +35,8 @@ class CompressionMode(Enum): class WorkerBase(object, metaclass=abc.ABCMeta): """Worker template.""" + _fetching_segment = "Fetching new segment {segment_name}" + @abc.abstractmethod def is_running(self): """Return whether the working is running.""" @@ -226,20 +228,18 @@ def _apply_iff_if_needed(self, event): segment_list = update_feature_flag_storage(self._feature_flag_storage, [new_feature_flag], event.change_number) for segment_name in segment_list: if self._segment_storage.get(segment_name) is None: - _LOGGER.debug('Fetching new segment %s', segment_name) + _LOGGER.debug(self._fetching_segment.format(segment_name=segment_name)) self._segment_handler(segment_name, event.change_number) referenced_rbs = self._get_referenced_rbs(new_feature_flag) - if len(referenced_rbs) > 0 and not self._rule_based_segment_storage.contains(referenced_rbs): - _LOGGER.debug('Fetching new rule based segment(s) %s', referenced_rbs) - self._handler(None, event.change_number) + self._fetch_rbs_segment_if_needed(referenced_rbs, event) self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE) else: new_rbs = rbs_from_raw(json.loads(self._get_object_definition(event))) segment_list = update_rule_based_segment_storage(self._rule_based_segment_storage, [new_rbs], event.change_number) for segment_name in segment_list: if self._segment_storage.get(segment_name) is None: - _LOGGER.debug('Fetching new segment %s', segment_name) + _LOGGER.debug(self._fetching_segment.format(segment_name=segment_name)) self._segment_handler(segment_name, event.change_number) self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.RBS_UPDATE) return True @@ -247,6 +247,11 @@ def _apply_iff_if_needed(self, event): except Exception as e: raise SplitStorageException(e) + def _fetch_rbs_segment_if_needed(self, referenced_rbs, event): + if len(referenced_rbs) > 0 and not self._rule_based_segment_storage.contains(referenced_rbs): + _LOGGER.debug('Fetching new rule based segment(s) %s', referenced_rbs) + self._handler(None, event.change_number) + def _check_instant_ff_update(self, event): if event.update_type == UpdateType.SPLIT_UPDATE and event.compression is not None and event.previous_change_number == self._feature_flag_storage.get_change_number(): return True @@ -264,16 +269,15 @@ def _run(self): break if event == self._centinel: continue + _LOGGER.debug('Processing feature flag update %d', event.change_number) try: if self._apply_iff_if_needed(event): continue + till = None rbs_till = None - if event.update_type == UpdateType.SPLIT_UPDATE: - till = event.change_number - else: - rbs_till = event.change_number + till, rbs_till = self._check_update_type(till, rbs_till, event) sync_result = self._handler(till, rbs_till) if not sync_result.success and sync_result.error_code is not None and sync_result.error_code == 414: _LOGGER.error("URI too long exception caught, sync failed") @@ -288,6 +292,14 @@ def _run(self): _LOGGER.error('Exception raised in feature flag synchronization') _LOGGER.debug('Exception information: ', exc_info=True) + def _check_update_type(self, till, rbs_till, event): + if event.update_type == UpdateType.SPLIT_UPDATE: + till = event.change_number + else: + rbs_till = event.change_number + + return till, rbs_till + def start(self): """Start worker.""" if self.is_running(): @@ -354,20 +366,18 @@ async def _apply_iff_if_needed(self, event): segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, [new_feature_flag], event.change_number) for segment_name in segment_list: if await self._segment_storage.get(segment_name) is None: - _LOGGER.debug('Fetching new segment %s', segment_name) + _LOGGER.debug(self._fetching_segment.format(segment_name=segment_name)) await self._segment_handler(segment_name, event.change_number) referenced_rbs = self._get_referenced_rbs(new_feature_flag) - if len(referenced_rbs) > 0 and not await self._rule_based_segment_storage.contains(referenced_rbs): - await self._handler(None, event.change_number) - + await self._fetch_rbs_segment_if_needed(referenced_rbs, event) await self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE) else: new_rbs = rbs_from_raw(json.loads(self._get_object_definition(event))) segment_list = await update_rule_based_segment_storage_async(self._rule_based_segment_storage, [new_rbs], event.change_number) for segment_name in segment_list: if await self._segment_storage.get(segment_name) is None: - _LOGGER.debug('Fetching new segment %s', segment_name) + _LOGGER.debug(self._fetching_segment.format(segment_name=segment_name)) await self._segment_handler(segment_name, event.change_number) await self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.RBS_UPDATE) return True @@ -375,6 +385,11 @@ async def _apply_iff_if_needed(self, event): except Exception as e: raise SplitStorageException(e) + async def _fetch_rbs_segment_if_needed(self, referenced_rbs, event): + if len(referenced_rbs) > 0 and not await self._rule_based_segment_storage.contains(referenced_rbs): + _LOGGER.debug('Fetching new rule based segment(s) %s', referenced_rbs) + await self._handler(None, event.change_number) + async def _check_instant_ff_update(self, event): if event.update_type == UpdateType.SPLIT_UPDATE and event.compression is not None and event.previous_change_number == await self._feature_flag_storage.get_change_number(): return True diff --git a/splitio/sync/split.py b/splitio/sync/split.py index e5d1f645..c1b5aa39 100644 --- a/splitio/sync/split.py +++ b/splitio/sync/split.py @@ -75,6 +75,12 @@ def _get_config_sets(self): return ','.join(self._feature_flag_storage.flag_set_filter.sorted_flag_sets) + def _check_exit_conditions(self, till, rbs_till, change_number, rbs_change_number): + return (till is not None and till < change_number) or (rbs_till is not None and rbs_till < rbs_change_number) + + def _check_return_conditions(self, feature_flag_changes): + return feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s'] + class SplitSynchronizer(SplitSynchronizerBase): """Feature Flag changes synchronizer.""" @@ -119,7 +125,7 @@ def _fetch_until(self, fetch_options, till=None, rbs_till=None): if rbs_change_number is None: rbs_change_number = -1 - if (till is not None and till < change_number) or (rbs_till is not None and rbs_till < rbs_change_number): + if self._check_exit_conditions(till, rbs_till, change_number, rbs_change_number): # the passed till is less than change_number, no need to perform updates return change_number, rbs_change_number, segment_list @@ -142,7 +148,7 @@ def _fetch_until(self, fetch_options, till=None, rbs_till=None): segment_list.update(update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes.get('ff')['t'], self._api.clear_storage)) segment_list.update(rbs_segment_list) - if feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']: + if self._check_return_conditions(feature_flag_changes): return feature_flag_changes.get('ff')['t'], feature_flag_changes.get('rbs')['t'], segment_list def _attempt_feature_flag_sync(self, fetch_options, till=None, rbs_till=None): @@ -278,7 +284,7 @@ async def _fetch_until(self, fetch_options, till=None, rbs_till=None): if rbs_change_number is None: rbs_change_number = -1 - if (till is not None and till < change_number) or (rbs_till is not None and rbs_till < rbs_change_number): + if self._check_exit_conditions(till, rbs_till, change_number, rbs_change_number): # the passed till is less than change_number, no need to perform updates return change_number, rbs_change_number, segment_list @@ -301,7 +307,7 @@ async def _fetch_until(self, fetch_options, till=None, rbs_till=None): segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes.get('ff')['t'], self._api.clear_storage) segment_list.update(rbs_segment_list) - if feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']: + if self._check_return_conditions(feature_flag_changes): return feature_flag_changes.get('ff')['t'], feature_flag_changes.get('rbs')['t'], segment_list async def _attempt_feature_flag_sync(self, fetch_options, till=None, rbs_till=None):