
Get malware security feeds, and output to file

Submitted by: @import:stackexchange-codereview

Problem

The idea is that local security/SIEM solutions want to populate their database with information gathered from different security feeds. This is done through importing and monitoring .txt files (for some reason).

I want each feed to have its own function, so that feeds are easier to filter out or add. Not to mention that almost every feed has its own format.

The output of all this needs to be in the current format, which is:

IP [FEED NAME]

DOMAIN [FEED NAME]


I am sure there are better ways, especially to generate the .txt file outputs.

The idea is that the script is to be run by itself, without any input, in a cron file, and if something has to be changed, the script itself should be edited.

All feeds are directly available to everyone, so if you are curious about any formats, the URLs are in the script. The exception is the first feed (OTX), which requires you to sign up; if needed I can paste an output from this feed.

So, you can copy and paste this script in and run it out of the box, except you might want to comment out get_alienvault.

Questions:

  • Since every feed has a different format, I am guessing there is no way for me to have one function that goes through all feeds. Or does someone see a way?



  • I am sure the output can be heavily modified at a later date. I was thinking of either outputting to .txt, or using the csv lib to writerows the same way I do now with the .txt files.



  • Do I need the main method? Or is there a better way to call all the different functions? Maybe with the possibility to ignore something in the future? Without commenting out the function call?



  • The systems that will be importing this are very sensitive to extra whitespace and the like, so I am currently using lstrip/rstrip and splitlines() for this. Any better way?



Script:

`import requests
import re
from OTXv2 import OTXv2
from collections import defaultdict
from datetime import datetime, timedelta

# Defines the list that

Solution

Regarding your questions:

  • See below. The code is very repetitive, and when you notice a case like that there's almost always a way to reduce the repetition.

  • I'm not sure what you are concerned about. If you need to disable something temporarily, consider argparse with feature flags, provided you drive the script from the command line.

  • That seems sensible; normalising the input by stripping out whitespace is pretty common.
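
As a rough sketch of the argparse idea, the feed functions can be registered in a dict and individual feeds switched off from the command line. The --skip flag, the FEEDS registry and the placeholder function bodies are assumptions for illustration; only the names get_alienvault and get_tornodes come from the original script.

```python
import argparse


def get_alienvault():
    """Placeholder for the real feed function (hypothetical body)."""
    return ["198.51.100.7"]


def get_tornodes():
    """Placeholder for the real feed function (hypothetical body)."""
    return ["203.0.113.9"]


# Registry of every known feed: name -> function that fetches it.
FEEDS = {
    "alienvault": get_alienvault,
    "tornodes": get_tornodes,
}


def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="Collect security feeds")
    # --skip may be repeated; choices rejects unknown feed names.
    parser.add_argument(
        "--skip",
        action="append",
        default=[],
        choices=sorted(FEEDS),
        help="feed to leave out of this run (may be given several times)",
    )
    return parser.parse_args(argv)


def run(argv=None):
    """Fetch every registered feed except the ones named with --skip."""
    args = parse_args(argv)
    results = {}
    for name, fetch in FEEDS.items():
        if name in args.skip:
            continue
        results[name] = fetch()
    return results
```

With something like this, a cron entry could pass --skip alienvault to leave that feed out without anyone commenting out a function call.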

Other remarks about the code:

  • Use a with statement with open() so that files are always closed.



  • Using a class is fine I guess to keep things together, but seems a bit pointless given that it's mostly just procedural code.

  • I'd probably remove all the temporary variables and inline things to save a few lines.

  • The fixed URLs might be better off as constants grouped at the top.



  • d is a global. That's ... not good. The one thing that really should be a member variable is not. Consider moving that into the constructor if you keep it as a class.
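
A minimal sketch of several of these remarks taken together, assuming the class is kept: the URL lives in a module-level constant, the dict becomes per-instance state created in the constructor, and the output file is opened in a with block. The class name FeedCollector, its methods and the placeholder URL are made up for illustration and are not from the original script.

```python
from collections import defaultdict

# Fixed URLs grouped as constants (placeholder value, not a real feed URL).
TOR_NODES_URL = "https://example.invalid/tor-nodes.txt"


class FeedCollector:
    def __init__(self):
        # Per-instance storage instead of a module-level global `d`.
        self.d = defaultdict(list)

    def add(self, key, indicator):
        """Store one indicator under the given list name."""
        self.d[key].append(indicator)

    def write(self, path, key, name):
        """Write every indicator stored under key as 'VALUE<TAB>[NAME]'."""
        # The with block closes the file even if a write raises.
        with open(path, "w") as f:
            for indicator in self.d[key]:
                f.write("{}\t[{}]\n".format(indicator, name))
```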

  • Even if the structure isn't exactly the same, at least extract some common code into separate methods, e.g.:

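def get_source(self, url, key):
    # Look the target list up once, outside the loop.
    result = d[key]
    for line in self.download_file(url).splitlines():
        # strip() removes leading and trailing whitespace in one call.
        line = line.strip()
        if line and self.ipgrabber(line):
            result.append(line)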


or something along those lines. I've put result at the start because it's a minor optimisation; the d[key] lookup could stay in the inner loop instead if you find the extra variable too verbose.
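
To try the strip-and-filter step on its own, here is a small self-contained sketch. It uses the standard library ipaddress module as a stand-in for the script's ipgrabber check, which is an assumption since ipgrabber's body is not shown here; the function names and sample text are invented for the example.

```python
import ipaddress


def is_ip(text):
    """Stand-in for the script's ipgrabber check (assumption)."""
    try:
        ipaddress.ip_address(text)
    except ValueError:
        return False
    return True


def extract_ips(raw_text):
    """Return the IP addresses found one per line, surrounding whitespace removed."""
    found = []
    for line in raw_text.splitlines():
        candidate = line.strip()  # same effect as .lstrip().rstrip()
        if candidate and is_ip(candidate):
            found.append(candidate)
    return found


sample = "  192.0.2.1 \n\n# comment line\n198.51.100.23\t\nnot-an-ip\n"
print(extract_ips(sample))  # ['192.0.2.1', '198.51.100.23']
```

Because the stripped value is what gets stored, nothing downstream has to worry about stray spaces or tabs.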

  • The main method has what, about ten duplicate loops? Again, extract the shared code with the name of the list as the parameter, e.g.:

def write_list(self, f, key, name):
    f.write("".join("{}\t[{}]\n".format(t, name) for t in d[key]))
...
# in main
    self.write_list(f, 'ransomwaretrackerdomain', 'Ransomewaretracker')
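
Taking this one step further, the remaining calls in main could be driven from a table of (key, name) pairs. The sketch below is an assumption about how that might look, written as plain functions that receive the dict instead of reading a global; the OUTPUTS table and the 'tornodes'/'TOR' pair are invented for the example, and io.StringIO stands in for the real output file.

```python
import io

# (key in the dict, label written to the file); the second pair is hypothetical.
OUTPUTS = [
    ("ransomwaretrackerdomain", "Ransomewaretracker"),
    ("tornodes", "TOR"),
]


def write_list(f, indicators, name):
    """Write one 'VALUE<TAB>[NAME]' line per indicator."""
    f.writelines("{}\t[{}]\n".format(t, name) for t in indicators)


def write_all(f, d, outputs):
    """Write every configured list; keys missing from d are skipped."""
    for key, name in outputs:
        write_list(f, d.get(key, []), name)


collected = {
    "ransomwaretrackerdomain": ["bad.example"],
    "tornodes": ["203.0.113.9"],
}
buffer = io.StringIO()
write_all(buffer, collected, OUTPUTS)
print(buffer.getvalue(), end="")
```

Dropping a pair from OUTPUTS is then enough to leave a list out of the output file.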


  • Typo in get_tornodes: if len(results) > 0: should use result.

  • The methods are all returning d for no reason.



  • In get_alienvault the loop could be simplified a bit with a definition like the following (or with a try/except around the mappings[t] access in case the key wasn't defined in mappings, but I dislike that pattern):

mappings = {'IPv4': 'alienvaultip', 'URL': 'alienvaulturl', 'domain': 'alienvaultdomain'}
# Iterate over the pulses directly; no index is needed.
for feed in pulses:
    for pulse in feed['indicators']:
        t = pulse['type']
        # Only keep indicator types that have a target list.
        if t in mappings:
            d[mappings[t]].append(pulse['indicator'])
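
The mapping idea can be tried without an OTX account by feeding it hand-written data. In this sketch the sample pulses are invented and only mimic the shape the loop above relies on (a list of dicts, each with an 'indicators' list of dicts carrying 'type' and 'indicator'); the function name and the use of dict.get in place of the membership test are also illustrative choices.

```python
from collections import defaultdict

MAPPINGS = {
    "IPv4": "alienvaultip",
    "URL": "alienvaulturl",
    "domain": "alienvaultdomain",
}


def collect_indicators(pulses):
    """Group indicators by target list name, ignoring unmapped types."""
    d = defaultdict(list)
    for pulse in pulses:
        for indicator in pulse["indicators"]:
            key = MAPPINGS.get(indicator["type"])  # None for unmapped types
            if key is not None:
                d[key].append(indicator["indicator"])
    return d


# Invented sample data; the second entry has a type with no mapping.
sample_pulses = [
    {"indicators": [
        {"type": "IPv4", "indicator": "192.0.2.10"},
        {"type": "FileHash-MD5", "indicator": "d41d8cd98f00b204e9800998ecf8427e"},
    ]},
    {"indicators": [{"type": "domain", "indicator": "example.com"}]},
]

print(dict(collect_indicators(sample_pulses)))
# {'alienvaultip': ['192.0.2.10'], 'alienvaultdomain': ['example.com']}
```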

Context

StackExchange Code Review Q#147106, answer score: 3
