Generator Pipelines using Workbench:

If you don't know what Generator Pipelines are, here are some resources:

tl;dr - Good for streaming data and building pipelines that iterate over streaming data.
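If the idea is new, here's a minimal sketch in plain Python (no workbench involved, and all the function names are made up for illustration). Each stage is a generator that pulls from the stage before it, so nothing is computed until the last line asks for results.

```python
# A generic generator pipeline: each stage pulls items from the previous
# stage on demand, so no work happens until something iterates the pipeline.

def read_numbers(limit):
    """Source stage: lazily produce 0, 1, ..., limit - 1."""
    for n in range(limit):
        yield n

def squares(numbers):
    """Transform stage: square each incoming number."""
    for n in numbers:
        yield n * n

def only_even(numbers):
    """Filter stage: pass along even values only."""
    for n in numbers:
        if n % 2 == 0:
            yield n

# Wiring the stages together builds the pipeline but runs nothing yet
pipeline = only_even(squares(read_numbers(10)))

# Iterating (here via list) is what actually pulls data through the stages
results = list(pipeline)
print(results)
```

The same shape (source, transform, filter) is what the rest of this notebook builds, just with workbench as the source.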

How easy is it to get workbench to build streaming data pipeline thingys?

  • Pretty darn easy...

Let's start up the workbench server...

Run the workbench server (from somewhere, for the demo we're just going to start a local one)

$ workbench_server

Okay, so when the server starts up, it autoloads any worker plugins in the server/worker directory and then dynamically monitors that directory. If a new Python file shows up, it's validated as a properly formed plugin, and if it passes, it's added to the list of workers.
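To give a feel for what "validated as a properly formed plugin" could mean, here's a rough sketch. This is NOT workbench's actual validation code: the rule used here (a class with a list-valued `dependencies` attribute and a callable `execute` method) and the helper name `find_valid_workers` are assumptions made purely for illustration.

```python
import inspect

def find_valid_workers(module_namespace):
    """Return names of classes that look like worker plugins.

    Hypothetical rule (an assumption, not workbench's real check): a class
    needs a list-valued 'dependencies' attribute and a callable 'execute'.
    """
    workers = []
    for name, obj in module_namespace.items():
        if not inspect.isclass(obj):
            continue  # skip plain values, functions, modules, etc.
        has_deps = isinstance(getattr(obj, 'dependencies', None), list)
        has_execute = callable(getattr(obj, 'execute', None))
        if has_deps and has_execute:
            workers.append(name)
    return sorted(workers)

class GoodWorker(object):
    """Toy worker that satisfies the hypothetical rule."""
    dependencies = ['sample']
    def execute(self, input_data):
        return {'length': len(input_data['sample'])}

class BrokenWorker(object):
    """Toy class with neither attribute, so it should be rejected."""
    pass

# Stand-in for the namespace of a freshly loaded plugin file
namespace = {'GoodWorker': GoodWorker, 'BrokenWorker': BrokenWorker, 'os_sep': '/'}
found = find_valid_workers(namespace)
print(found)
```

A real server would run a check like this each time the directory watcher sees a new file, and only register the classes that pass.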

In [76]:
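# Let's start to interact with workbench. Please note there is NO specific client to workbench,
# just use the ZeroRPC Python, Node.js, or CLI interfaces.
import zerorpc
c = zerorpc.Client()
# Assumption: a local server listening on port 4242; adjust the endpoint for your setup
c.connect('tcp://127.0.0.1:4242')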
In [77]:
# Load in 100 PE Files
import os
file_list = [os.path.join('../data/pe/bad', child) for child in os.listdir('../data/pe/bad')]
file_list += [os.path.join('../data/pe/good', child) for child in os.listdir('../data/pe/good')]
md5_list = []
for filename in file_list:
    with open(filename,'rb') as f:
        md5_list.append(c.store_sample(f.read(), filename, 'exe'))
print 'Files loaded: %d' % len(md5_list)
Files loaded: 100

Notice the level of control on our batch operations

Your database may have tons of files of different types. We literally control execution at the per-sample level with md5 lists. Alternatively, we can specify specific types or simply query the database to get exactly what we want and build our own md5 list.
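Here's a small sketch of the "build our own md5 list" idea using only the standard library. The sample records, the `type_tag` field name, and the `md5_of` helper are hypothetical stand-ins, and it assumes a sample's md5 is simply the MD5 hex digest of its raw bytes.

```python
import hashlib

def md5_of(data):
    """MD5 hex digest of raw bytes (assumed here to be the sample identifier)."""
    return hashlib.md5(data).hexdigest()

# Hypothetical in-memory stand-in for what a sample database might hold
samples = [
    {'type_tag': 'exe', 'bytes': b'MZ fake pe one'},
    {'type_tag': 'pdf', 'bytes': b'%PDF fake document'},
    {'type_tag': 'exe', 'bytes': b'MZ fake pe two'},
]

# Index every sample by md5, keeping its type alongside
records = [{'md5': md5_of(s['bytes']), 'type_tag': s['type_tag']} for s in samples]

# "Query" for exactly the samples we care about and build our own md5 list
exe_md5_list = [r['md5'] for r in records if r['type_tag'] == 'exe']
print(exe_md5_list)
```

A list like `exe_md5_list` is the kind of thing you'd hand to a batch request so that only those samples get processed.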

Also notice that we can specify ^exactly^ what data we want, down to arbitrary depth. Here we want just the imported_symbols from the sparse features from the pe_features worker.
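One way to picture the dotted subkeys is as paths into a nested dictionary. The sketch below is an illustration, not workbench's implementation: `extract_subkeys` and the toy `worker_output` are made up, and keying each result by the last path component is an assumption, chosen because the filter code later in this notebook reads `import_info['imported_symbols']`.

```python
def extract_subkeys(data, subkeys):
    """Pull dotted paths out of a nested dict.

    Each result is stored under the last component of its path
    (an assumption made for this sketch).
    """
    output = {}
    for path in subkeys:
        parts = path.split('.')
        value = data
        for part in parts:
            value = value[part]  # walk one level deeper per path component
        output[parts[-1]] = value
    return output

# Toy worker output with more data than we want to ship to the client
worker_output = {
    'md5': 'abc123',
    'sparse_features': {
        'imported_symbols': ['connect', 'send'],
        'section_names': ['.text', '.data'],
    },
    'dense_features': {'number_of_sections': 2},
}

trimmed = extract_subkeys(worker_output, ['md5', 'sparse_features.imported_symbols'])
print(trimmed)
```

Everything not named in the subkey list (section names, dense features) never leaves the server side in this picture, which is the whole point.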

In [78]:
# Compute pe_features on every sample in md5_list, just pull back the md5 and imported symbols
imports = c.batch_work_request('pe_features', {'md5_list': md5_list, 'subkeys':['md5','sparse_features.imported_symbols']})
imports
<generator object iterator at 0x10b545cd0>

Holy s#@&! The server batch request returned a generator?

Yes generators are awesome but getting one from a server request! Are u serious?! Yes, thanks to ZeroRPC... dead serious.. like chopping off your head and kicking your body into a shallow grave and putting your head on a stick... serious.

Now that we have a server generator from workbench, we set up our pipeline and client generators to precisely control server execution on the streaming samples.

In [79]:
# Client generators

# First we're going to filter PE Files, only getting ones with communication related imports
def comm_imports(import_info_list):
    # Named comm_symbols so the list doesn't shadow the function's own name
    comm_symbols = ['accept', 'bind', 'connect', 'connectnamedpipe', 'gethostbyname', 'gethostname', 'inet_addr', 'recv', 'send']
    for import_info in import_info_list:
        md5 = import_info['md5']
        import_symbols = import_info['imported_symbols']
        if any(comm in sym for comm in comm_symbols for sym in import_symbols):
            yield md5

# Then we run the pe_peid worker, one request per md5 that made it through the filter
def peid_sigs(md5_list):
    for md5 in md5_list:
        yield c.work_request('pe_peid', md5)
In [80]:
# Now and only now will our generator pipeline unwind. The work will get pulled from the workbench
# server and ONLY what needs to get processed based on our generator filters will get processed.
# Note: Out of 100 PE Files, only 19 matched our filter, so only 19 will get pulled through the peid 
# worker. Imagine a generator pipeline that ended in a dynamic sandbox.. super handy to downselect first.
peid_results = peid_sigs(comm_imports(imports))
for peid_info in peid_results:
    print peid_info
{'pe_peid': {'match_list': ['Microsoft Visual C++ v6.0'], 'md5': '0cb9aa6fb9c4aa3afad7a303e21ac0f3'}}
{'pe_peid': {'match_list': [], 'md5': '0e8b030fb6ae48ffd29e520fc16b5641'}}
{'pe_peid': {'match_list': [], 'md5': '0eb9e990c521b30428a379700ec5ab3e'}}
{'pe_peid': {'match_list': ['Microsoft Visual C++ v6.0'], 'md5': '13dcc5b4570180118eb65529b77f6d89'}}
{'pe_peid': {'match_list': ['Armadillo v4.x'], 'md5': '1cac80a2147cd8f3860547e43edcaa00'}}
{'pe_peid': {'match_list': [], 'md5': '2058c50de5976c67a09dfa5e0e1c7eb5'}}
{'pe_peid': {'match_list': ['UPX ->'], 'md5': '2d09e4aff42aebac87ae2fd737aba94f'}}
{'pe_peid': {'match_list': [], 'md5': '9ceccd9f32cb2ad0b140b6d15d8993b6'}}
{'pe_peid': {'match_list': [], 'md5': 'b681485cb9e0cad73ee85b9274c0d3c2'}}
{'pe_peid': {'match_list': [], 'md5': '093dee8d97fd9d35884ed52179b3d142'}}
{'pe_peid': {'match_list': [], 'md5': '2352ab5f9f8f097bf9d41d5a4718a041'}}
{'pe_peid': {'match_list': ['Installer VISE Custom'], 'md5': '2459a629ace148286360b860442221a2'}}
{'pe_peid': {'match_list': ['Microsoft Visual C++ 8'], 'md5': '2cbfc3993bd5134d58d12668f21d91da'}}
{'pe_peid': {'match_list': ['Microsoft Visual C++ 8.0 (DLL)'], 'md5': '4d71227301dd8d09097b9e4cc6527e5a'}}
{'pe_peid': {'match_list': ['Microsoft Visual C++ 8'], 'md5': '550388c7aecde4bf6fae9f77755d54de'}}
{'pe_peid': {'match_list': ['Microsoft Visual C++ 8.0 (DLL)'], 'md5': '8003518cf919df3475531124b327ceb8'}}
{'pe_peid': {'match_list': [], 'md5': '803b370865d907ea21dc0c2b6a8936b5'}}
{'pe_peid': {'match_list': [], 'md5': '9333096b59dad0c4294749755963dba7'}}
{'pe_peid': {'match_list': ['Borland Delphi 3.0 (???)'], 'md5': 'a3661a61f7e7b7d37e6d037ed747e7ef'}}


This simple set of Python commands loaded a bunch of files into workbench (100 in this case, but real applications will have a LOT more). We then set up a pipeline on the client side that controlled which workers were invoked and in which order (that's a pipeline), and we also placed a filter that explicitly controlled which samples went further down the pipeline.
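To make the "only what needs to get processed" point concrete without a server, here's a self-contained simulation. Everything in it is a stand-in: `fake_feature_stream` and `fake_peid` are hypothetical local generators playing the role of the workbench workers, and the call counters exist only to show how much work each stage did.

```python
# Count how many items each simulated stage actually handles
calls = {'features': 0, 'peid': 0}

def fake_feature_stream(sample_ids):
    """Stand-in for the server generator: one feature record per sample."""
    for sample_id in sample_ids:
        calls['features'] += 1
        # Pretend every fifth sample imports networking symbols
        symbols = ['connect', 'send'] if sample_id % 5 == 0 else ['createfilea']
        yield {'md5': sample_id, 'imported_symbols': symbols}

def with_comm_imports(feature_stream):
    """Filter stage: pass along only samples with communication imports."""
    comm_symbols = ['connect', 'send', 'recv']
    for info in feature_stream:
        if any(comm in sym for comm in comm_symbols for sym in info['imported_symbols']):
            yield info['md5']

def fake_peid(md5_stream):
    """Stand-in for the expensive downstream worker."""
    for md5 in md5_stream:
        calls['peid'] += 1
        yield {'md5': md5, 'match_list': []}

# Building the pipeline does no work at all
pipeline = fake_peid(with_comm_imports(fake_feature_stream(range(100))))
calls_before = dict(calls)

# Iterating pulls samples through; the filter decides what reaches fake_peid
results = list(pipeline)
print(calls_before)
print(calls)
```

The counters show the expensive stage only ever sees the samples that survive the filter, which is why putting a cheap filter in front of something slow (like a sandbox) pays off.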

Super important: The amount of data pulled back to the client was minuscule.

- Just the import symbols for 100 samples
- Just the PEID signature matches for 19 samples

We got exactly the data we needed from the samples and we controlled the whole server pipeline with ease and elegance.