#!/usr/bin/env python
# coding: utf-8

# # Exporting ATT&CK Group Navigator Layers

# # Get Relationship STIX Objects - (Manual)
# -----------------------

# I believe it is important to understand the code behind the main functions
# available in the Python library
# [attackcti](https://attackcti.readthedocs.io/en/latest/index.html).
# I highly recommend first reading the docs I put together about the
# [cti-taxii-client](https://attackcti.readthedocs.io/en/latest/taxii_client.html)
# and [cti-python-stix2](https://attackcti.readthedocs.io/en/latest/stix.html)
# libraries.
# Those two summarize several of the concepts that I had to read to understand
# how to perform a simple query against ATT&CK's TAXII server.

# ## Import STIX and TAXII Libraries

# In[1]:


from stix2 import CompositeDataSource, Filter, TAXIICollectionSource
from taxii2client import Collection


# ## Set ATT&CK TAXII Collection ID Variables
# The public ATT&CK TAXII instance has three main collections (Enterprise, Pre
# and Mobile). Every collection has an ID which attackcti uses to retrieve
# ATT&CK STIX objects from all those matrices.

# In[2]:


# Base URL shared by every collection; a collection ID is appended to it.
ATTCK_STIX_COLLECTIONS = "https://cti-taxii.mitre.org/stix/collections/"

# One collection ID per ATT&CK matrix.
ENTERPRISE_ATTCK = "95ecc380-afe9-11e4-9b6c-751b66dd541e"
PRE_ATTCK = "062767bd-02d2-4b72-84ba-56caef0f8658"
MOBILE_ATTCK = "2f669986-b40b-4423-b720-4396ca6a462b"


# ## Initialize TAXII Collection Sources
# According to [STIX2 docs](https://stix2.readthedocs.io/en/latest/index.html),
# the [TAXIICollectionSource API](https://stix2.readthedocs.io/en/latest/api/datastore/stix2.datastore.taxii.html#stix2.datastore.taxii.TAXIICollectionSource)
# provides an interface for searching/retrieving STIX objects from a
# local/remote TAXII Collection endpoint.
# In our case, we are pointing to our ATT&CK TAXII Collection instances
# (https://cti-taxii.mitre.org/stix/collections/)

# In[3]:


# Each collection URL is "<base><collection id>/"; every collection is wrapped
# in a TAXIICollectionSource right after it is created.
ENTERPRISE_COLLECTION = Collection(
    "".join((ATTCK_STIX_COLLECTIONS, ENTERPRISE_ATTCK, "/"))
)
TC_ENTERPRISE_SOURCE = TAXIICollectionSource(ENTERPRISE_COLLECTION)

PRE_COLLECTION = Collection(
    "".join((ATTCK_STIX_COLLECTIONS, PRE_ATTCK, "/"))
)
TC_PRE_SOURCE = TAXIICollectionSource(PRE_COLLECTION)

MOBILE_COLLECTION = Collection(
    "".join((ATTCK_STIX_COLLECTIONS, MOBILE_ATTCK, "/"))
)
TC_MOBILE_SOURCE = TAXIICollectionSource(MOBILE_COLLECTION)


# ## Initialize a Composite Data Source
# According to [STIX2 docs](https://stix2.readthedocs.io/en/latest/index.html),
# a user can have a single
# [CompositeDataSource](https://stix2.readthedocs.io/en/latest/api/stix2.datastore.html#stix2.datastore.CompositeDataSource)
# as an interface to a set of DataSources. When an API call is made to the
# CompositeDataSource, it is delegated to each of the (real) DataSources that
# are attached to it. In our case, we have three TAXIICollection sources
# (Enterprise, PRE and Mobile) as defined in our previous step. Therefore, we
# can use the CompositeDataSource class and the add_data_sources method to
# attach every ATT&CK TAXIICollection source and be able to query all of them
# at the same time.

# In[4]:


COMPOSITE_DS = CompositeDataSource()
COMPOSITE_DS.add_data_sources(
    [
        TC_ENTERPRISE_SOURCE,
        TC_PRE_SOURCE,
        TC_MOBILE_SOURCE,
    ]
)


# ## Retrieve all relationships
# Now that we can query all the ATT&CK TAXIICollection sources at once, we can
# use the query method and a set of filters to retrieve STIX objects of type
# relationship

# In[5]:


relationship_filter = Filter("type", "=", "relationship")
rels = COMPOSITE_DS.query(relationship_filter)
rels[0]


# ## Retrieve all relationships from a specific STIX object
# What if you want to be very specific and get relationships from a specific
# STIX object?
# You can use the
# [relationships](https://stix2.readthedocs.io/en/latest/api/stix2.datastore.html#stix2.datastore.CompositeDataSource.relationships)
# method from the
# [CompositeDataSource](https://stix2.readthedocs.io/en/latest/api/stix2.datastore.html#stix2.datastore.CompositeDataSource)
# class to retrieve relationships involving a given STIX object.

# In[6]:


from attackcti import attack_client

lift = attack_client()
# Fetch the groups and drop the revoked ones in a single step.
groups = lift.remove_revoked(lift.get_groups())
# Only 'uses' relationships where the first group is the source.
rels = COMPOSITE_DS.relationships(groups[0], 'uses', source_only=True)
rels[0]


# # Get Relationship STIX Objects - (Automatic)
# -----------------------

# ## Retrieve all relationships

# In[7]:


from attackcti import attack_client

lift = attack_client()


# In[8]:


get_ipython().run_line_magic(
    'time',
    'all_relationships = lift.get_relationships()',
)


# In[9]:


all_relationships[0]


# ## Retrieve all relationships from a specific STIX object

# In[10]:


groups = lift.remove_revoked(lift.get_groups())


# In[11]:


get_ipython().run_line_magic(
    'time',
    'group_relationships = lift.get_relationships_by_object(groups[0])',
)


# In[12]:


group_relationships[0]


# # Retrieve Techniques used by one Group - (Manual)
# -----------------------

# In this case we want relationship objects that have target_ref values of
# type attack-pattern.
# Following the manual code I shared above, and the results from the
# `get_relationships_by_object()` function, you can simply query the ATT&CK
# Enterprise TAXIICollection source with the filter below

# In[13]:


# Every technique id targeted by the relationships of the selected group.
target_refs = [relationship.target_ref for relationship in group_relationships]
filter_objects = [
    Filter('type', '=', 'attack-pattern'),
    Filter('id', '=', target_refs),
]
techniques_used = TC_ENTERPRISE_SOURCE.query(filter_objects)
techniques_used[0]


# # Retrieve Techniques used by one Group - (Automatic)
# -----------------------

# In[14]:


from attackcti import attack_client

lift = attack_client()
groups = lift.remove_revoked(lift.get_groups())
group_techniques = lift.get_techniques_used_by_group(groups[0])
group_techniques[0]


# # Retrieve all Techniques used by all Groups - (Manual)
# -----------------------

# You can apply the same get_techniques_used_by_group() function, but against
# all the groups STIX objects that the get_groups() function retrieves. You
# can do a simple for loop over more than 90 groups. However, it takes longer
# than what I would like it to take. Therefore, I decided to go a different
# route and started testing some code.

# ## Get all groups and techniques

# In[15]:


from attackcti import attack_client

lift = attack_client()
groups = lift.get_groups()
techniques = lift.get_techniques()


# ## Filter Group objects using techniques

# In[16]:


from stix2.utils import get_type_from_id

# Keep only the relationships that go from a group (intrusion-set) to a
# technique (attack-pattern).
group_relationships = [
    candidate
    for candidate in all_relationships
    if get_type_from_id(candidate.source_ref) == 'intrusion-set'
    and get_type_from_id(candidate.target_ref) == 'attack-pattern'
]
len(group_relationships)


print(group_relationships[0])


# ## Match Group -> Relationships Intrusion-set ID
# Then, I just take all the group_relationships I got, and look for the
# specific `intrusion-set (Group)` id in the groups STIX objects.
# Once there is a match, I create new fields on the `intrusion-set (Group)`
# STIX object to add additional information about the
# `attack-pattern (Technique)` in the relationship object. The most important
# additional metadata is the target_ref field which points to the specific
# `attack-pattern (Technique)` id involving the `group`. The results are then
# added to a new list named `group_techniques_ref`.

# In[17]:


import json
from collections import defaultdict

# Index the relationships by their source (group) id in one pass, so each
# group is matched with a dictionary lookup instead of a scan over every
# relationship. Appending keeps the original relationship order per group.
relationships_by_group = defaultdict(list)
for rel in group_relationships:
    relationships_by_group[rel['source_ref']].append(rel)

group_techniques_ref = []
for g in groups:
    matching_relationships = relationships_by_group.get(g['id'])
    if not matching_relationships:
        continue
    # Serialize the group once; every match decodes its own independent
    # dictionary so later per-entry additions never leak between entries.
    serialized_group = g.serialize()
    for rel in matching_relationships:
        gs = json.loads(serialized_group)
        gs['technique_ref'] = rel['target_ref']
        gs['relationship_description'] = rel['description']
        gs['relationship_id'] = rel['id']
        group_techniques_ref.append(gs)


# ## Match Attack-patterns -> Intrusion-set object ID
# I apply the same concept as before, and just loop through all the
# attack-pattern objects and look for the specific attack-pattern id in the
# initial relationships STIX objects. Once there is a match, I add additional
# information from the attack-pattern itself to the python dictionaries in the
# `group_techniques_ref` list. The results then get added to a new list named
# `groups_use_techniques`.
# In[18]:


from collections import defaultdict

# Index the techniques by id in one pass so each group/technique reference is
# resolved with a dictionary lookup instead of a scan over every technique.
# A list per id keeps the behaviour for repeated ids: one append per match.
techniques_by_id = defaultdict(list)
for t in techniques:
    techniques_by_id[t['id']].append(t)

# Optional technique properties and the key each one is copied to.
optional_fields = (
    ('x_mitre_platforms', 'platform'),
    ('x_mitre_data_sources', 'data_sources'),
    ('x_mitre_permissions_required', 'permissions_required'),
    ('x_mitre_effective_permissions', 'effective_permissions'),
)

groups_use_techniques = []
for gt in group_techniques_ref:
    for t in techniques_by_id.get(gt['technique_ref'], ()):
        # The first external reference supplies the technique id and matrix.
        primary_reference = t['external_references'][0]
        gt['technique'] = t['name']
        gt['technique_description'] = t['description']
        gt['tactic'] = [phase['phase_name'] for phase in t['kill_chain_phases']]
        gt['technique_id'] = primary_reference['external_id']
        gt['matrix'] = primary_reference['source_name']
        # Copy the optional properties only when the technique defines them.
        for source_key, target_key in optional_fields:
            if source_key in t:
                gt[target_key] = t[source_key]
        groups_use_techniques.append(gt)


groups_use_techniques[0]


# # Retrieve all Techniques used by all Groups - (Automatic)
# -----------------------

# In[19]:


from attackcti import attack_client

lift = attack_client()
get_ipython().run_line_magic('time', 'techniques_used = lift.get_techniques_used_by_all_groups()')


# In[20]:


len(techniques_used)


# In[21]:


techniques_used[0]


# # Create Navigator Group Layer Files - (Manual)
# -----------------------

# ## Create a list of group dictionaries
# To make things easier, I create a list of dictionaries where each group name
# is the main key and the value is a list where I append every single
# technique involving that group. I get that information from the
# `get_techniques_used_by_all_groups()` results.
# In[ ]:


groups = lift.get_groups()
groups = lift.remove_revoked(groups)

# One single-key dictionary per group: {group name: [techniques]}.
groups_list = [{g['name']: []} for g in groups]

# Preview one entry. A slice is used instead of a fixed index so the preview
# cannot raise IndexError when fewer groups than expected are returned.
groups_list[:1]


# ## Group techniques by group
# We can then use the output of the `get_techniques_used_by_all_groups()`
# function and start appending techniques to the dictionaries with the key
# name that matches the group name being involved with each technique.

# In[ ]:


from collections import defaultdict

# Bucket the technique entries by group name in one pass, so each group picks
# up its techniques with a lookup instead of a scan over every entry.
techniques_by_group = defaultdict(list)
for gut in techniques_used:
    techniques_by_group[gut['name']].append(gut)

for group in groups_list:
    for group_name, techniques_list in group.items():
        for gut in techniques_by_group.get(group_name, ()):
            techniques_list.append(
                {
                    'techniqueId': gut['technique_id'],
                    'techniqueName': gut['technique'],
                    'comment': gut['relationship_description'],
                    'tactic': gut['tactic'],
                    'group_id': gut['external_references'][0]['external_id'],
                }
            )


groups_list[:1]


# ## Run dynamic navigator layer template

# In[ ]:


import json

for group in groups_list:
    for k, v in group.items():
        # Groups without any technique do not get a layer file.
        if not v:
            continue
        # Every technique entry of a group carries the same group id.
        group_id = v[0]['group_id']
        actor_layer = {
            "description": ("Enterprise techniques used by {0}, ATT&CK group {1} v1.0".format(k, group_id)),
            "name": ("{0} ({1})".format(k, group_id)),
            "domain": "mitre-enterprise",
            "version": "2.2",
            "techniques": [
                {
                    "score": 1,
                    "techniqueID": technique['techniqueId'],
                    "techniqueName": technique['techniqueName'],
                    "comment": technique['comment'],
                }
                for technique in v
            ],
            "gradient": {
                "colors": [
                    "#ffffff",
                    "#ff6666",
                ],
                "minValue": 0,
                "maxValue": 1,
            },
            "legendItems": [
                {
                    "label": ("used by {}".format(k)),
                    "color": "#ff6666",
                }
            ],
        }
        # One layer file per group, named "<group name>_<group id>.json".
        with open(('{0}_{1}.json'.format(k, group_id)), 'w') as f:
            json.dump(actor_layer, f)


# In[ ]:


get_ipython().system(' ls *.json')


# We can delete all the files with the following command.
# In[ ]:


# NOTE: this removes every *.json file in the current working directory, not
# only the layer files written by this script.
shell = get_ipython()
shell.system(' rm *.json')


# # Create Navigator Group Layer Files - (Automatic)
# -----------------------

# In[ ]:


from attackcti import attack_client

lift = attack_client()
shell = get_ipython()
shell.run_line_magic('time', 'lift.export_groups_navigator_layers()')


# In[ ]:


shell = get_ipython()
shell.system(' ls *.json')


# We can delete all the files with the following command.

# In[ ]:


shell = get_ipython()
shell.system(' rm *.json')


# In[ ]: