From 62b9a8ea20aaedb5802e57f9852525f9d871d07e Mon Sep 17 00:00:00 2001 From: Tushar Goel Date: Fri, 28 Nov 2025 19:16:24 +0530 Subject: [PATCH 1/6] Collect SSVC trees Signed-off-by: Tushar Goel --- vulnerabilities/migrations/0104_ssvc.py | 51 ++++++++ vulnerabilities/models.py | 18 +++ .../v2_importers/vulnrichment_importer.py | 115 +----------------- .../v2_improvers/collect_ssvc_trees.py | 112 +++++++++++++++++ vulnerabilities/utils.py | 115 ++++++++++++++++++ 5 files changed, 297 insertions(+), 114 deletions(-) create mode 100644 vulnerabilities/migrations/0104_ssvc.py create mode 100644 vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py diff --git a/vulnerabilities/migrations/0104_ssvc.py b/vulnerabilities/migrations/0104_ssvc.py new file mode 100644 index 000000000..82e54b8db --- /dev/null +++ b/vulnerabilities/migrations/0104_ssvc.py @@ -0,0 +1,51 @@ +# Generated by Django 4.2.25 on 2025-11-26 13:31 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ("vulnerabilities", "0103_codecommit_impactedpackage_affecting_commits_and_more"), + ] + + operations = [ + migrations.CreateModel( + name="SSVC", + fields=[ + ( + "id", + models.AutoField( + auto_created=True, primary_key=True, serialize=False, verbose_name="ID" + ), + ), + ( + "vector", + models.CharField( + help_text="The vector string representing the SSVC.", max_length=255 + ), + ), + ( + "options", + models.JSONField(help_text="A JSON object containing the SSVC options."), + ), + ( + "decision", + models.CharField(help_text="The decision string for the SSVC.", max_length=255), + ), + ( + "advisory", + models.ForeignKey( + help_text="The advisory associated with this SSVC.", + on_delete=django.db.models.deletion.CASCADE, + related_name="ssvc_entries", + to="vulnerabilities.advisoryv2", + ), + ), + ], + options={ + "unique_together": {("vector", "advisory", "decision")}, + }, + ), + ] diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index e1c4ddc6b..8358ae558 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -3414,3 +3414,21 @@ class CodeCommit(models.Model): class Meta: unique_together = ("commit_hash", "vcs_url") + + +class SSVC(models.Model): + vector = models.CharField(max_length=255, help_text="The vector string representing the SSVC.") + options = models.JSONField(help_text="A JSON object containing the SSVC options.") + advisory = models.ForeignKey( + AdvisoryV2, + on_delete=models.CASCADE, + related_name="ssvc_entries", + help_text="The advisory associated with this SSVC.", + ) + decision = models.CharField(max_length=255, help_text="The decision string for the SSVC.") + + def __str__(self): + return f"SSVC for Advisory {self.advisory.advisory_id}: {self.decision}" + + class Meta: + unique_together = ("vector", "advisory", "decision") diff --git a/vulnerabilities/pipelines/v2_importers/vulnrichment_importer.py b/vulnerabilities/pipelines/v2_importers/vulnrichment_importer.py index a596d8d65..7de7ff7d7 100644 --- a/vulnerabilities/pipelines/v2_importers/vulnrichment_importer.py +++ b/vulnerabilities/pipelines/v2_importers/vulnrichment_importer.py @@ -16,6 +16,7 @@ from vulnerabilities.utils import get_advisory_url from vulnerabilities.utils import get_cwe_id from vulnerabilities.utils import get_reference_id +from vulnerabilities.utils import ssvc_calculator logger = logging.getLogger(__name__) @@ -210,117 +211,3 @@ def clean_downloads(self): def on_failure(self): self.clean_downloads() - - -def ssvc_calculator(ssvc_data): - """ - Return the ssvc vector and the decision value - """ - options = ssvc_data.get("options", []) - timestamp = ssvc_data.get("timestamp") - - # Extract the options into a dictionary - options_dict = {k: v.lower() for option in options for k, v in option.items()} - - # We copied the table value from this link. - # https://www.cisa.gov/sites/default/files/publications/cisa-ssvc-guide%20508c.pdf - - # Determining Mission and Well-Being Impact Value - mission_well_being_table = { - # (Mission Prevalence, Public Well-being Impact) : "Mission & Well-being" - ("minimal", "minimal"): "low", - ("minimal", "material"): "medium", - ("minimal", "irreversible"): "high", - ("support", "minimal"): "medium", - ("support", "material"): "medium", - ("support", "irreversible"): "high", - ("essential", "minimal"): "high", - ("essential", "material"): "high", - ("essential", "irreversible"): "high", - } - - if "Mission Prevalence" not in options_dict: - options_dict["Mission Prevalence"] = "minimal" - - if "Public Well-being Impact" not in options_dict: - options_dict["Public Well-being Impact"] = "material" - - options_dict["Mission & Well-being"] = mission_well_being_table[ - (options_dict["Mission Prevalence"], options_dict["Public Well-being Impact"]) - ] - - decision_key = ( - options_dict.get("Exploitation"), - options_dict.get("Automatable"), - options_dict.get("Technical Impact"), - options_dict.get("Mission & Well-being"), - ) - - decision_points = { - "Exploitation": {"E": {"none": "N", "poc": "P", "active": "A"}}, - "Automatable": {"A": {"no": "N", "yes": "Y"}}, - "Technical Impact": {"T": {"partial": "P", "total": "T"}}, - "Public Well-being Impact": {"B": {"minimal": "M", "material": "A", "irreversible": "I"}}, - "Mission Prevalence": {"P": {"minimal": "M", "support": "S", "essential": "E"}}, - "Mission & Well-being": {"M": {"low": "L", "medium": "M", "high": "H"}}, - } - - # Create the SSVC vector - ssvc_vector = "SSVCv2/" - for key, value_map in options_dict.items(): - options_key = decision_points.get(key) - for lhs, rhs_map in options_key.items(): - ssvc_vector += f"{lhs}:{rhs_map.get(value_map)}/" - - # "Decision": {"D": {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"}}, - decision_values = {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"} - - decision_lookup = { - ("none", "no", "partial", "low"): "Track", - ("none", "no", "partial", "medium"): "Track", - ("none", "no", "partial", "high"): "Track", - ("none", "no", "total", "low"): "Track", - ("none", "no", "total", "medium"): "Track", - ("none", "no", "total", "high"): "Track*", - ("none", "yes", "partial", "low"): "Track", - ("none", "yes", "partial", "medium"): "Track", - ("none", "yes", "partial", "high"): "Attend", - ("none", "yes", "total", "low"): "Track", - ("none", "yes", "total", "medium"): "Track", - ("none", "yes", "total", "high"): "Attend", - ("poc", "no", "partial", "low"): "Track", - ("poc", "no", "partial", "medium"): "Track", - ("poc", "no", "partial", "high"): "Track*", - ("poc", "no", "total", "low"): "Track", - ("poc", "no", "total", "medium"): "Track*", - ("poc", "no", "total", "high"): "Attend", - ("poc", "yes", "partial", "low"): "Track", - ("poc", "yes", "partial", "medium"): "Track", - ("poc", "yes", "partial", "high"): "Attend", - ("poc", "yes", "total", "low"): "Track", - ("poc", "yes", "total", "medium"): "Track*", - ("poc", "yes", "total", "high"): "Attend", - ("active", "no", "partial", "low"): "Track", - ("active", "no", "partial", "medium"): "Track", - ("active", "no", "partial", "high"): "Attend", - ("active", "no", "total", "low"): "Track", - ("active", "no", "total", "medium"): "Attend", - ("active", "no", "total", "high"): "Act", - ("active", "yes", "partial", "low"): "Attend", - ("active", "yes", "partial", "medium"): "Attend", - ("active", "yes", "partial", "high"): "Act", - ("active", "yes", "total", "low"): "Attend", - ("active", "yes", "total", "medium"): "Act", - ("active", "yes", "total", "high"): "Act", - } - - decision = decision_lookup.get(decision_key, "") - - if decision: - ssvc_vector += f"D:{decision_values.get(decision)}/" - - if timestamp: - timestamp_formatted = dateparser.parse(timestamp).strftime("%Y-%m-%dT%H:%M:%SZ") - - ssvc_vector += f"{timestamp_formatted}/" - return ssvc_vector, decision diff --git a/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py b/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py new file mode 100644 index 000000000..ccc6ae455 --- /dev/null +++ b/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py @@ -0,0 +1,112 @@ +import json +import logging +from pathlib import Path +from typing import Iterable, List + +from fetchcode.vcs import fetch_via_vcs + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.models import SSVC, AdvisoryV2 +from vulnerabilities.pipelines import VulnerableCodePipeline +from vulnerabilities.severity_systems import SCORING_SYSTEMS +from vulnerabilities.utils import ssvc_calculator + +logger = logging.getLogger(__name__) + + +class CollectSSVCPipeline(VulnerableCodePipeline): + """ + Collect SSVC Pipeline + + This pipeline collects SSVC from Vulnrichment project and associates them with existing advisories. + """ + + pipeline_id = "collect_ssvc" + spdx_license_expression = "CC0-1.0" + license_url = "https://github.com/cisagov/vulnrichment/blob/develop/LICENSE" + repo_url = "git+https://github.com/cisagov/vulnrichment.git" + + @classmethod + def steps(cls): + return ( + cls.clone, + cls.collect_ssvc_data, + cls.clean_downloads, + ) + + def clone(self): + self.log(f"Cloning `{self.repo_url}`") + self.vcs_response = fetch_via_vcs(self.repo_url) + + def collect_ssvc_data(self): + self.log(self.vcs_response.dest_dir) + base_path = Path(self.vcs_response.dest_dir) + for file_path in base_path.glob("**/**/*.json"): + self.log(f"Processing file: {file_path}") + if not file_path.name.startswith("CVE-"): + continue + with open(file_path) as f: + raw_data = json.load(f) + file_name = file_path.name + # strip .json from file name + cve_id = file_name[:-5] + advisories = list(AdvisoryV2.objects.filter(advisory_id=cve_id)) + if not advisories: + self.log(f"No advisories found for CVE ID: {cve_id}") + continue + self.parse_cve_advisory(raw_data, advisories) + + def parse_cve_advisory(self, raw_data, advisories: List[AdvisoryV2]): + self.log(f"Processing CVE data") + cve_metadata = raw_data.get("cveMetadata", {}) + cve_id = cve_metadata.get("cveId") + + containers = raw_data.get("containers", {}) + adp_data = containers.get("adp", {}) + self.log(f"Processing ADP") + + metrics = [ + adp_metrics for data in adp_data for adp_metrics in data.get("metrics", []) + ] + + vulnrichment_scoring_system = { + "other": { + "ssvc": SCORING_SYSTEMS["ssvc"], + }, # ignore kev + } + + for metric in metrics: + self.log(metric) + self.log(f"Processing metric") + for metric_type, metric_value in metric.items(): + if metric_type not in vulnrichment_scoring_system: + continue + + if metric_type == "other": + other_types = metric_value.get("type") + self.log(f"Processing SSVC") + if other_types == "ssvc": + content = metric_value.get("content", {}) + options = content.get("options", {}) + vector_string, decision = ssvc_calculator(content) + advisories = list(AdvisoryV2.objects.filter(advisory_id=cve_id)) + if not advisories: + continue + ssvc_trees = [] + for advisory in advisories: + obj = SSVC( + advisory=advisory, + options=options, + decision=decision, + vector=vector_string, + ) + ssvc_trees.append(obj) + SSVC.objects.bulk_create(ssvc_trees, ignore_conflicts=True, batch_size=1000) + + def clean_downloads(self): + if self.vcs_response: + self.log("Removing cloned repository") + self.vcs_response.delete() + + def on_failure(self): + self.clean_downloads() diff --git a/vulnerabilities/utils.py b/vulnerabilities/utils.py index b65726a5d..67c6bc10f 100644 --- a/vulnerabilities/utils.py +++ b/vulnerabilities/utils.py @@ -26,6 +26,7 @@ from unittest.mock import MagicMock from urllib.parse import urljoin +import dateparser import requests import saneyaml import toml @@ -678,3 +679,117 @@ def create_registry(pipelines): registry[key] = pipeline return registry + + +def ssvc_calculator(ssvc_data): + """ + Return the ssvc vector and the decision value + """ + options = ssvc_data.get("options", []) + timestamp = ssvc_data.get("timestamp") + + # Extract the options into a dictionary + options_dict = {k: v.lower() for option in options for k, v in option.items()} + + # We copied the table value from this link. + # https://www.cisa.gov/sites/default/files/publications/cisa-ssvc-guide%20508c.pdf + + # Determining Mission and Well-Being Impact Value + mission_well_being_table = { + # (Mission Prevalence, Public Well-being Impact) : "Mission & Well-being" + ("minimal", "minimal"): "low", + ("minimal", "material"): "medium", + ("minimal", "irreversible"): "high", + ("support", "minimal"): "medium", + ("support", "material"): "medium", + ("support", "irreversible"): "high", + ("essential", "minimal"): "high", + ("essential", "material"): "high", + ("essential", "irreversible"): "high", + } + + if "Mission Prevalence" not in options_dict: + options_dict["Mission Prevalence"] = "minimal" + + if "Public Well-being Impact" not in options_dict: + options_dict["Public Well-being Impact"] = "material" + + options_dict["Mission & Well-being"] = mission_well_being_table[ + (options_dict["Mission Prevalence"], options_dict["Public Well-being Impact"]) + ] + + decision_key = ( + options_dict.get("Exploitation"), + options_dict.get("Automatable"), + options_dict.get("Technical Impact"), + options_dict.get("Mission & Well-being"), + ) + + decision_points = { + "Exploitation": {"E": {"none": "N", "poc": "P", "active": "A"}}, + "Automatable": {"A": {"no": "N", "yes": "Y"}}, + "Technical Impact": {"T": {"partial": "P", "total": "T"}}, + "Public Well-being Impact": {"B": {"minimal": "M", "material": "A", "irreversible": "I"}}, + "Mission Prevalence": {"P": {"minimal": "M", "support": "S", "essential": "E"}}, + "Mission & Well-being": {"M": {"low": "L", "medium": "M", "high": "H"}}, + } + + # Create the SSVC vector + ssvc_vector = "SSVCv2/" + for key, value_map in options_dict.items(): + options_key = decision_points.get(key) + for lhs, rhs_map in options_key.items(): + ssvc_vector += f"{lhs}:{rhs_map.get(value_map)}/" + + # "Decision": {"D": {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"}}, + decision_values = {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"} + + decision_lookup = { + ("none", "no", "partial", "low"): "Track", + ("none", "no", "partial", "medium"): "Track", + ("none", "no", "partial", "high"): "Track", + ("none", "no", "total", "low"): "Track", + ("none", "no", "total", "medium"): "Track", + ("none", "no", "total", "high"): "Track*", + ("none", "yes", "partial", "low"): "Track", + ("none", "yes", "partial", "medium"): "Track", + ("none", "yes", "partial", "high"): "Attend", + ("none", "yes", "total", "low"): "Track", + ("none", "yes", "total", "medium"): "Track", + ("none", "yes", "total", "high"): "Attend", + ("poc", "no", "partial", "low"): "Track", + ("poc", "no", "partial", "medium"): "Track", + ("poc", "no", "partial", "high"): "Track*", + ("poc", "no", "total", "low"): "Track", + ("poc", "no", "total", "medium"): "Track*", + ("poc", "no", "total", "high"): "Attend", + ("poc", "yes", "partial", "low"): "Track", + ("poc", "yes", "partial", "medium"): "Track", + ("poc", "yes", "partial", "high"): "Attend", + ("poc", "yes", "total", "low"): "Track", + ("poc", "yes", "total", "medium"): "Track*", + ("poc", "yes", "total", "high"): "Attend", + ("active", "no", "partial", "low"): "Track", + ("active", "no", "partial", "medium"): "Track", + ("active", "no", "partial", "high"): "Attend", + ("active", "no", "total", "low"): "Track", + ("active", "no", "total", "medium"): "Attend", + ("active", "no", "total", "high"): "Act", + ("active", "yes", "partial", "low"): "Attend", + ("active", "yes", "partial", "medium"): "Attend", + ("active", "yes", "partial", "high"): "Act", + ("active", "yes", "total", "low"): "Attend", + ("active", "yes", "total", "medium"): "Act", + ("active", "yes", "total", "high"): "Act", + } + + decision = decision_lookup.get(decision_key, "") + + if decision: + ssvc_vector += f"D:{decision_values.get(decision)}/" + + if timestamp: + timestamp_formatted = dateparser.parse(timestamp).strftime("%Y-%m-%dT%H:%M:%SZ") + + ssvc_vector += f"{timestamp_formatted}/" + return ssvc_vector, decision From 0c4ebfeb79e94e7be96bbfa0e1c50458fc41775f Mon Sep 17 00:00:00 2001 From: Tushar Goel Date: Mon, 15 Dec 2025 20:48:53 +0530 Subject: [PATCH 2/6] Add pipeline to collect SSVC Trees Signed-off-by: Tushar Goel --- vulnerabilities/api_v2.py | 17 +- vulnerabilities/improvers/__init__.py | 2 + vulnerabilities/migrations/0104_ssvc.py | 18 +- vulnerabilities/models.py | 17 +- .../v2_improvers/collect_ssvc_trees.py | 186 +++++++++--------- 5 files changed, 139 insertions(+), 101 deletions(-) diff --git a/vulnerabilities/api_v2.py b/vulnerabilities/api_v2.py index ba41d0906..09dbca880 100644 --- a/vulnerabilities/api_v2.py +++ b/vulnerabilities/api_v2.py @@ -146,6 +146,18 @@ class AdvisoryV2Serializer(serializers.ModelSerializer): references = AdvisoryReferenceSerializer(many=True) severities = AdvisorySeveritySerializer(many=True) advisory_id = serializers.CharField(source="avid", read_only=True) + ssvc_trees = serializers.SerializerMethodField() + + def get_ssvc_trees(self, obj): + ssvc_trees = obj.ssvc_entries.all() + return [ + { + "vector": ssvc.vector, + "decision": ssvc.decision, + "options": ssvc.options, + } + for ssvc in ssvc_trees + ] class Meta: model = AdvisoryV2 @@ -160,6 +172,7 @@ class Meta: "exploitability", "weighted_severity", "risk_score", + "ssvc_trees", ] def get_aliases(self, obj): @@ -1033,13 +1046,13 @@ def list(self, request, *args, **kwargs): return self.get_paginated_response({"advisories": advisory_data, "packages": data}) # If pagination is not applied, collect vulnerabilities for all packages - for package in queryset: + for package in filtered_queryset: advisories.update({impact.advisory for impact in package.affected_in_impacts.all()}) advisories.update({impact.advisory for impact in package.fixed_in_impacts.all()}) advisory_data = {f"{adv.avid}": AdvisoryV2Serializer(adv).data for adv in advisories} - serializer = self.get_serializer(queryset, many=True) + serializer = self.get_serializer(filtered_queryset, many=True) data = serializer.data return Response({"advisories": advisory_data, "packages": data}) diff --git a/vulnerabilities/improvers/__init__.py b/vulnerabilities/improvers/__init__.py index aa9312ec1..7735ad816 100644 --- a/vulnerabilities/improvers/__init__.py +++ b/vulnerabilities/improvers/__init__.py @@ -19,6 +19,7 @@ from vulnerabilities.pipelines import flag_ghost_packages from vulnerabilities.pipelines import populate_vulnerability_summary_pipeline from vulnerabilities.pipelines import remove_duplicate_advisories +from vulnerabilities.pipelines.v2_improvers import collect_ssvc_trees from vulnerabilities.pipelines.v2_improvers import compute_advisory_todo as compute_advisory_todo_v2 from vulnerabilities.pipelines.v2_improvers import compute_package_risk as compute_package_risk_v2 from vulnerabilities.pipelines.v2_improvers import ( @@ -70,5 +71,6 @@ compute_advisory_todo_v2.ComputeToDo, unfurl_version_range_v2.UnfurlVersionRangePipeline, compute_advisory_todo.ComputeToDo, + collect_ssvc_trees.CollectSSVCPipeline, ] ) diff --git a/vulnerabilities/migrations/0104_ssvc.py b/vulnerabilities/migrations/0104_ssvc.py index 82e54b8db..1e9100eae 100644 --- a/vulnerabilities/migrations/0104_ssvc.py +++ b/vulnerabilities/migrations/0104_ssvc.py @@ -1,4 +1,4 @@ -# Generated by Django 4.2.25 on 2025-11-26 13:31 +# Generated by Django 4.2.25 on 2025-12-15 15:15 from django.db import migrations, models import django.db.models.deletion @@ -35,17 +35,25 @@ class Migration(migrations.Migration): models.CharField(help_text="The decision string for the SSVC.", max_length=255), ), ( - "advisory", + "related_advisories", + models.ManyToManyField( + help_text="Advisories associated with this SSVC.", + related_name="related_ssvcs", + to="vulnerabilities.advisoryv2", + ), + ), + ( + "source_advisory", models.ForeignKey( - help_text="The advisory associated with this SSVC.", + help_text="The advisory that was used to generate this SSVC decision.", on_delete=django.db.models.deletion.CASCADE, - related_name="ssvc_entries", + related_name="source_ssvcs", to="vulnerabilities.advisoryv2", ), ), ], options={ - "unique_together": {("vector", "advisory", "decision")}, + "unique_together": {("vector", "source_advisory")}, }, ), ] diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index 8358ae558..9a0d05c8f 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -3419,16 +3419,21 @@ class Meta: class SSVC(models.Model): vector = models.CharField(max_length=255, help_text="The vector string representing the SSVC.") options = models.JSONField(help_text="A JSON object containing the SSVC options.") - advisory = models.ForeignKey( + decision = models.CharField(max_length=255, help_text="The decision string for the SSVC.") + related_advisories = models.ManyToManyField( + AdvisoryV2, + related_name="related_ssvcs", + help_text="Advisories associated with this SSVC.", + ) + source_advisory = models.ForeignKey( AdvisoryV2, on_delete=models.CASCADE, - related_name="ssvc_entries", - help_text="The advisory associated with this SSVC.", + related_name="source_ssvcs", + help_text="The advisory that was used to generate this SSVC decision.", ) - decision = models.CharField(max_length=255, help_text="The decision string for the SSVC.") def __str__(self): - return f"SSVC for Advisory {self.advisory.advisory_id}: {self.decision}" + return f"SSVC Decision: {self.vector} -> {self.decision}" class Meta: - unique_together = ("vector", "advisory", "decision") + unique_together = ("vector", "source_advisory") diff --git a/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py b/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py index ccc6ae455..c310f012c 100644 --- a/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py +++ b/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py @@ -1,15 +1,20 @@ -import json -import logging -from pathlib import Path -from typing import Iterable, List +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# -from fetchcode.vcs import fetch_via_vcs +import logging -from vulnerabilities.importer import AdvisoryData -from vulnerabilities.models import SSVC, AdvisoryV2 +from django.db.models import Q +from vulnerabilities.models import SSVC +from vulnerabilities.models import AdvisoryV2 from vulnerabilities.pipelines import VulnerableCodePipeline +from vulnerabilities.pipelines.v2_importers.vulnrichment_importer import VulnrichImporterPipeline from vulnerabilities.severity_systems import SCORING_SYSTEMS -from vulnerabilities.utils import ssvc_calculator logger = logging.getLogger(__name__) @@ -21,92 +26,97 @@ class CollectSSVCPipeline(VulnerableCodePipeline): This pipeline collects SSVC from Vulnrichment project and associates them with existing advisories. """ - pipeline_id = "collect_ssvc" + pipeline_id = "collect_ssvc_tree_v2" spdx_license_expression = "CC0-1.0" - license_url = "https://github.com/cisagov/vulnrichment/blob/develop/LICENSE" - repo_url = "git+https://github.com/cisagov/vulnrichment.git" @classmethod def steps(cls): return ( - cls.clone, cls.collect_ssvc_data, - cls.clean_downloads, ) - def clone(self): - self.log(f"Cloning `{self.repo_url}`") - self.vcs_response = fetch_via_vcs(self.repo_url) - def collect_ssvc_data(self): - self.log(self.vcs_response.dest_dir) - base_path = Path(self.vcs_response.dest_dir) - for file_path in base_path.glob("**/**/*.json"): - self.log(f"Processing file: {file_path}") - if not file_path.name.startswith("CVE-"): - continue - with open(file_path) as f: - raw_data = json.load(f) - file_name = file_path.name - # strip .json from file name - cve_id = file_name[:-5] - advisories = list(AdvisoryV2.objects.filter(advisory_id=cve_id)) - if not advisories: - self.log(f"No advisories found for CVE ID: {cve_id}") - continue - self.parse_cve_advisory(raw_data, advisories) - - def parse_cve_advisory(self, raw_data, advisories: List[AdvisoryV2]): - self.log(f"Processing CVE data") - cve_metadata = raw_data.get("cveMetadata", {}) - cve_id = cve_metadata.get("cveId") - - containers = raw_data.get("containers", {}) - adp_data = containers.get("adp", {}) - self.log(f"Processing ADP") - - metrics = [ - adp_metrics for data in adp_data for adp_metrics in data.get("metrics", []) - ] - - vulnrichment_scoring_system = { - "other": { - "ssvc": SCORING_SYSTEMS["ssvc"], - }, # ignore kev - } - - for metric in metrics: - self.log(metric) - self.log(f"Processing metric") - for metric_type, metric_value in metric.items(): - if metric_type not in vulnrichment_scoring_system: - continue - - if metric_type == "other": - other_types = metric_value.get("type") - self.log(f"Processing SSVC") - if other_types == "ssvc": - content = metric_value.get("content", {}) - options = content.get("options", {}) - vector_string, decision = ssvc_calculator(content) - advisories = list(AdvisoryV2.objects.filter(advisory_id=cve_id)) - if not advisories: - continue - ssvc_trees = [] - for advisory in advisories: - obj = SSVC( - advisory=advisory, - options=options, - decision=decision, - vector=vector_string, - ) - ssvc_trees.append(obj) - SSVC.objects.bulk_create(ssvc_trees, ignore_conflicts=True, batch_size=1000) - - def clean_downloads(self): - if self.vcs_response: - self.log("Removing cloned repository") - self.vcs_response.delete() - - def on_failure(self): - self.clean_downloads() + vulnrichment_advisories = AdvisoryV2.objects.filter( + datasource_id=VulnrichImporterPipeline.pipeline_id, + ) + for advisory in vulnrichment_advisories: + severities = advisory.severities.filter(scoring_system=SCORING_SYSTEMS["ssvc"]) + for severity in severities: + ssvc_vector = severity.scoring_elements + try: + ssvc_tree, decision = convert_vector_to_tree_and_decision(ssvc_vector) + self.log(f"Advisory: {advisory.advisory_id}, SSVC Tree: {ssvc_tree}, Decision: {decision}, vector: {ssvc_vector}") + ssvc_obj, _ = SSVC.objects.get_or_create( + source_advisory=advisory, + defaults={ + "options": ssvc_tree, + "decision": decision, + }, + ) + # All advisories that have advisory.advisory_id in their aliases or advisory_id same as advisory.advisory_id + related_advisories = AdvisoryV2.objects.filter( + Q(advisory_id=advisory.advisory_id) | + Q(aliases__alias=advisory.advisory_id) + ).distinct() + # remove the current advisory from related advisories + related_advisories = related_advisories.exclude(id=advisory.id) + ssvc_obj.related_advisories.set(related_advisories) + except ValueError as e: + logger.error(f"Failed to parse SSVC vector '{ssvc_vector}' for advisory '{advisory}': {e}") + +REVERSE_POINTS = { + "E": ("Exploitation", {"N": "none", "P": "poc", "A": "active"}), + "A": ("Automatable", {"N": "no", "Y": "yes"}), + "T": ("Technical Impact", {"P": "partial", "T": "total"}), + "P": ("Mission Prevalence", {"M": "minimal", "S": "support", "E": "essential"}), + "B": ("Public Well-being Impact", {"M": "minimal", "A": "material", "I": "irreversible"}), + "M": ("Mission & Well-being", {"L": "low", "M": "medium", "H": "high"}), +} + +REVERSE_DECISION = { + "T": "Track", + "R": "Track*", + "A": "Attend", + "C": "Act", +} + +VECTOR_ORDER = ["E", "A", "T", "P", "B", "M"] + +def convert_vector_to_tree_and_decision(vector: str): + """ + Convert a given SSVC vector string into a structured tree and decision. + + Args: + vector (str): The SSVC vector string. + + Returns: + tuple: A tuple containing the SSVC tree (dict) and decision (str). + """ + if not vector.startswith("SSVCv2/"): + raise ValueError("Invalid SSVC vector") + + parts = [p for p in vector.replace("SSVCv2/", "").split("/") if p] + + options = [] + decision = None + + for part in parts: + if ":" not in part: + continue + + key, value = part.split(":", 1) + + if key == "D": + decision = REVERSE_DECISION.get(value) + continue + + if key in REVERSE_POINTS: + name, mapping = REVERSE_POINTS[key] + options.append({name: mapping[value]}) + + # Preserve canonical SSVC order + options.sort(key=lambda o: VECTOR_ORDER.index( + next(k for k, _ in REVERSE_POINTS.values() if k == next(iter(o))) + ) if False else 0) + + return options, decision From ea03fa0d259492b43708e384e88096a9e107fde3 Mon Sep 17 00:00:00 2001 From: Tushar Goel Date: Mon, 15 Dec 2025 22:27:53 +0530 Subject: [PATCH 3/6] Adjust tests Signed-off-by: Tushar Goel --- vulnerabilities/api_v2.py | 37 +++-- .../v2_improvers/collect_ssvc_trees.py | 60 ++++--- .../templates/advisory_detail.html | 152 ++++++------------ vulnerabilities/tests/test_api_v2.py | 10 +- vulnerabilities/views.py | 70 +++++--- 5 files changed, 167 insertions(+), 162 deletions(-) diff --git a/vulnerabilities/api_v2.py b/vulnerabilities/api_v2.py index 09dbca880..39b2e60cd 100644 --- a/vulnerabilities/api_v2.py +++ b/vulnerabilities/api_v2.py @@ -146,18 +146,31 @@ class AdvisoryV2Serializer(serializers.ModelSerializer): references = AdvisoryReferenceSerializer(many=True) severities = AdvisorySeveritySerializer(many=True) advisory_id = serializers.CharField(source="avid", read_only=True) - ssvc_trees = serializers.SerializerMethodField() + related_ssvc_trees = serializers.SerializerMethodField() - def get_ssvc_trees(self, obj): - ssvc_trees = obj.ssvc_entries.all() - return [ - { - "vector": ssvc.vector, - "decision": ssvc.decision, - "options": ssvc.options, - } - for ssvc in ssvc_trees - ] + def get_related_ssvc_trees(self, obj): + related_ssvcs = obj.related_ssvcs.all().select_related("source_advisory") + source_ssvcs = obj.source_ssvcs.all().select_related("source_advisory") + + seen = set() + result = [] + + for ssvc in list(related_ssvcs) + list(source_ssvcs): + key = (ssvc.vector, ssvc.source_advisory_id) + if key in seen: + continue + seen.add(key) + + result.append( + { + "vector": ssvc.vector, + "decision": ssvc.decision, + "options": ssvc.options, + "source_url": ssvc.source_advisory.url, + } + ) + + return result class Meta: model = AdvisoryV2 @@ -172,7 +185,7 @@ class Meta: "exploitability", "weighted_severity", "risk_score", - "ssvc_trees", + "related_ssvc_trees", ] def get_aliases(self, obj): diff --git a/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py b/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py index c310f012c..4afe8b2a1 100644 --- a/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py +++ b/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py @@ -9,8 +9,11 @@ import logging +from django.db.models import Prefetch from django.db.models import Q + from vulnerabilities.models import SSVC +from vulnerabilities.models import AdvisorySeverity from vulnerabilities.models import AdvisoryV2 from vulnerabilities.pipelines import VulnerableCodePipeline from vulnerabilities.pipelines.v2_importers.vulnrichment_importer import VulnrichImporterPipeline @@ -26,43 +29,60 @@ class CollectSSVCPipeline(VulnerableCodePipeline): This pipeline collects SSVC from Vulnrichment project and associates them with existing advisories. """ - pipeline_id = "collect_ssvc_tree_v2" - spdx_license_expression = "CC0-1.0" + pipeline_id = "collect_ssvc_trees" @classmethod def steps(cls): - return ( - cls.collect_ssvc_data, - ) + return (cls.collect_ssvc_data,) def collect_ssvc_data(self): - vulnrichment_advisories = AdvisoryV2.objects.filter( - datasource_id=VulnrichImporterPipeline.pipeline_id, + vulnrichment_advisories = ( + AdvisoryV2.objects.filter( + datasource_id=VulnrichImporterPipeline.pipeline_id, + severities__scoring_system=SCORING_SYSTEMS["ssvc"], + ) + .distinct() + .prefetch_related( + Prefetch( + "severities", + queryset=AdvisorySeverity.objects.filter( + scoring_system=SCORING_SYSTEMS["ssvc"] + ), + ) + ) + ) + + self.log( + f"Found {vulnrichment_advisories.count()} advisories from Vulnrichment with SSVC severities." ) for advisory in vulnrichment_advisories: - severities = advisory.severities.filter(scoring_system=SCORING_SYSTEMS["ssvc"]) - for severity in severities: + self.log(f"Processing advisory: {advisory.advisory_id}") + for severity in advisory.severities.all(): ssvc_vector = severity.scoring_elements try: ssvc_tree, decision = convert_vector_to_tree_and_decision(ssvc_vector) - self.log(f"Advisory: {advisory.advisory_id}, SSVC Tree: {ssvc_tree}, Decision: {decision}, vector: {ssvc_vector}") + self.log( + f"Advisory: {advisory.advisory_id}, SSVC Tree: {ssvc_tree}, Decision: {decision}, vector: {ssvc_vector}" + ) ssvc_obj, _ = SSVC.objects.get_or_create( source_advisory=advisory, defaults={ "options": ssvc_tree, "decision": decision, + "vector": ssvc_vector, }, ) # All advisories that have advisory.advisory_id in their aliases or advisory_id same as advisory.advisory_id related_advisories = AdvisoryV2.objects.filter( - Q(advisory_id=advisory.advisory_id) | - Q(aliases__alias=advisory.advisory_id) + Q(advisory_id=advisory.advisory_id) | Q(aliases__alias=advisory.advisory_id) ).distinct() - # remove the current advisory from related advisories related_advisories = related_advisories.exclude(id=advisory.id) ssvc_obj.related_advisories.set(related_advisories) except ValueError as e: - logger.error(f"Failed to parse SSVC vector '{ssvc_vector}' for advisory '{advisory}': {e}") + logger.error( + f"Failed to parse SSVC vector '{ssvc_vector}' for advisory '{advisory}': {e}" + ) + REVERSE_POINTS = { "E": ("Exploitation", {"N": "none", "P": "poc", "A": "active"}), @@ -82,6 +102,7 @@ def collect_ssvc_data(self): VECTOR_ORDER = ["E", "A", "T", "P", "B", "M"] + def convert_vector_to_tree_and_decision(vector: str): """ Convert a given SSVC vector string into a structured tree and decision. @@ -114,9 +135,12 @@ def convert_vector_to_tree_and_decision(vector: str): name, mapping = REVERSE_POINTS[key] options.append({name: mapping[value]}) - # Preserve canonical SSVC order - options.sort(key=lambda o: VECTOR_ORDER.index( - next(k for k, _ in REVERSE_POINTS.values() if k == next(iter(o))) - ) if False else 0) + options.sort( + key=lambda o: VECTOR_ORDER.index( + next(k for k, _ in REVERSE_POINTS.values() if k == next(iter(o))) + ) + if False + else 0 + ) return options, decision diff --git a/vulnerabilities/templates/advisory_detail.html b/vulnerabilities/templates/advisory_detail.html index c3d93619a..976ae80de 100644 --- a/vulnerabilities/templates/advisory_detail.html +++ b/vulnerabilities/templates/advisory_detail.html @@ -44,13 +44,6 @@ -
  • - - - Severity details ({{ severity_vectors|length }}) - - -
  • {% if advisory.exploits %}
  • @@ -70,6 +63,16 @@
  • + {% if ssvcs %} +
  • + + + Related SSVCS ({{ ssvcs|length }}) + + +
  • + {% endif %} +