# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
9-
9+ import datetime
1010from pathlib import Path
1111
12+ import yaml
1213from aboutcode .pipeline import LoopProgress
1314from fetchcode .vcs import fetch_via_vcs
14- from yaml import YAMLError
1515
1616from vulnerabilities .models import AdvisoryAlias
1717from vulnerabilities .models import DetectionRule
@@ -40,58 +40,58 @@ def clone_repo(self):
def collect_and_store_rules(self):
    """
    Collect Sigma YAML rules from the destination directory and store/update
    them as DetectionRule objects.

    Every ``*.yml`` file found under ``self.vcs_response.dest_dir`` is read,
    except files located below a ``.github``, ``images`` or ``documentation``
    directory of the checkout. CVE ids are extracted from the file path with
    ``find_all_cve``; one DetectionRule is stored per advisory reachable
    through a matching AdvisoryAlias, or a single rule with ``advisory=None``
    when no advisory is found.

    Files that cannot be decoded as UTF-8 or parsed as YAML are logged and
    skipped so that one bad file does not abort the whole run.
    """
    excluded_directories = {".github", "images", "documentation"}
    base_directory = Path(self.vcs_response.dest_dir)

    # Only look at the path parts below the checkout root: the absolute
    # location of the clone must never cause rule files to be excluded.
    yaml_files = [
        path
        for path in base_directory.rglob("**/*.yml")
        if excluded_directories.isdisjoint(path.relative_to(base_directory).parts)
    ]

    rules_count = len(yaml_files)
    self.log(f"Enhancing the vulnerability with {rules_count:,d} rule records")
    progress = LoopProgress(total_iterations=rules_count, logger=self.log)
    for file_path in progress.iter(yaml_files):
        try:
            raw_text = file_path.read_text(encoding="utf-8")
            # NOTE(review): the rule files come from an external repository;
            # consider yaml.SafeLoader unless FullLoader-only tags are needed.
            rule_documents = list(yaml.load_all(raw_text, yaml.FullLoader))
        except (UnicodeDecodeError, yaml.YAMLError) as err:
            self.log(f"Invalid YAML in {file_path}: {err}. Skipping.")
            continue

        rule_metadata = extract_sigma_metadata(rule_documents)

        # as_posix() keeps forward slashes in the URL on every platform.
        relative_path = file_path.relative_to(base_directory).as_posix()
        rule_url = (
            "https://raw.githubusercontent.com/SigmaHQ/sigma/refs/heads/master/"
            f"{relative_path}"
        )
        cve_ids = find_all_cve(str(file_path))

        found_advisories = set()
        for cve_id in cve_ids:
            try:
                alias = AdvisoryAlias.objects.get(alias=cve_id)
            except AdvisoryAlias.DoesNotExist:
                self.log(f"AdvisoryAlias {cve_id}: {file_path.name} not found.")
                continue
            found_advisories.update(alias.advisories.all())

        # A rule without any known advisory is still stored once, unlinked.
        for adv in found_advisories or [None]:
            DetectionRule.objects.update_or_create(
                rule_text=raw_text,
                rule_type=DetectionRuleTypes.SIGMA,
                advisory=adv,
                defaults={
                    "rule_metadata": rule_metadata,
                    "source_url": rule_url,
                },
            )
9696 def clean_downloads (self ):
9797 if self .vcs_response :
@@ -100,3 +100,27 @@ def clean_downloads(self):
100100
def on_failure(self):
    """
    Delegate to ``clean_downloads`` and return nothing.

    NOTE(review): presumably invoked by the pipeline runner when a step
    fails -- confirm against the base pipeline class.
    """
    self.clean_downloads()
103+
104+
def extract_sigma_metadata(rule_documents):
    """
    Extract Sigma metadata from Sigma YAML rules

    ``rule_documents`` is the list of documents parsed from one rule file;
    only the first document is inspected.

    Return a dict with the ``status``, ``author``, ``date``, ``title`` and
    ``id`` values of that document (``None`` for missing keys). A ``date``
    parsed by YAML into a date or datetime object is converted to its ISO 8601
    string; any other value is returned unchanged.

    Return None when there is no document or when the first document is not a
    mapping (for instance an empty ``---`` document parsed as None).
    """
    if not rule_documents:
        return None

    first_document = rule_documents[0]
    # A YAML document can be a scalar, a list or None: none of these carry
    # rule metadata and none of them support ``.get``.
    if not isinstance(first_document, dict):
        return None

    metadata = {
        key: first_document.get(key)
        for key in ("status", "author", "date", "title", "id")
    }

    rule_date = metadata["date"]
    # datetime.datetime is a subclass of datetime.date, so one check covers both.
    if isinstance(rule_date, datetime.date):
        metadata["date"] = rule_date.isoformat()

    return metadata
0 commit comments