Library to be used against a specific REST web service
Problem
I have been trying to write a basic library that I can use at work to make different REST calls against a software vendor's product that we are using.
I would love someone's opinion on it, and on what I could do better, hopefully with some examples or links, and some descriptions. I am by no means any good at Python, but I tried my best.
The library is meant to just be imported into an existing script.
Questions:
- From a library point of view, can this be designed any differently?
- How could I make it any shorter / more understandable?
- I feel that I need to learn more about `*args` and `**kwargs`; is that needed here?
- General coding style, naming conventions or breaking any standards (PEP8, I'm looking at you)?
```
#!/usr/bin/env python
"""Test SDK for Arcsight Logger"""
import time
import json
import datetime
import requests
import untangle
from requests.packages.urllib3.exceptions import InsecureRequestWarning


class ArcsightLogger(object):
    """
    Main Class to interact with Arcsight Logger REST API
    """

    def __init__(self):
        self.target = 'https://SOMETHING:9000'
        self.login = 'username'
        self.password = 'password'

    def post(self, url, data, isjson):
        """
        Post Call towards Arcsight Logger
        :param url: URL to retrieve
        :param data: Request Body
        :param isjson: Checks if post needs to be JSON
        :return: HTTP Response
        """
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        if data:
            if isjson:
                try:
                    r = requests.post(url, json=data, verify=False)
                    return r
                except requests.exceptions.RequestException as e:
                    print e
            else:
                try:
                    r = requests.post(url, data, verify=False)
                    return r
                except requests.exceptions.RequestException as e:
                    print e

    def arcsight_l
```
Solution
Creating a session
As a library, I find it barely usable. If I want to use it with my own credentials, I have to:
- either modify the source of your library to put them in `__init__`;
- or tamper with the attributes after building an `ArcsightLogger` object:
```
o = ArcsightLogger()
o.login = 'spam'
o.password = 'eggs'
o.main()
```
Moreover, having to manually store the generated token and feed it back to each method is unnecessary boilerplate.
Instead, I would try to log the user in as soon as possible and store the generated token in an attribute for easy access by each of your methods:
```
class ArcsightLogger(object):
    """
    Main Class to interact with Arcsight Logger REST API
    """
    TARGET = 'https://SOMETHING:9000'

    def __init__(self, username, password):
        """
        Log in the user whose credentials are provided and
        store the access token to be used with all requests
        against Arcsight
        """
        data = {
            'login': username,
            'password': password,
        }
        url = self.TARGET + '/core-service/rest/LoginService/login'
        r = self.post(url, data, False)
        r.raise_for_status()
        loginrequest = untangle.parse(r.content)
        self.token = loginrequest.ns3_loginResponse.ns3_return.cdata
```
Note the use of the class constant `TARGET` instead of an instance attribute, as this is not something meant to be changed when using the API.
Posting data
First off, a comment explaining why you call `requests.packages.urllib3.disable_warnings(InsecureRequestWarning)` would be more than helpful. Second, do you really need to call this at each request? I would rather put this line right after your last import. Or not at all, since the warning lets the user know about a potential MITM attack. Another solution would be a configurable behaviour that lets the warnings appear by default but can be disabled by the user if they so wish. I would, however, use the `warnings` module to control that.
Next, I would change the signature of `post` a bit. I would ask the caller to provide only the route part of the URL, as the domain is already stored as a class constant. This avoids some boilerplate, as the concatenation can be done in this method. I would also take advantage of Python's `**kwargs` syntax to collect `data` into a dictionary instead of letting the caller build it. I would change the `isjson` parameter to default to `True`, since this is what most methods use. And, lastly, I would name it `_post`, as it is mainly a helper function for your methods rather than part of your public API.
Finally, you should let the exceptions bubble up rather than print them, as they will most likely indicate an issue that will prevent further processing:
```
import warnings

import requests
import untangle
from requests.packages.urllib3.exceptions import InsecureRequestWarning


class ArcsightLogger(object):
    """
    Main Class to interact with Arcsight Logger REST API
    """
    TARGET = 'https://SOMETHING:9000'

    def __init__(self, username, password, disable_insecure_warning=False):
        """
        Log in the user whose credentials are provided and
        store the access token to be used with all requests
        against Arcsight
        """
        # Show the insecure-request warning once by default; silence it on demand.
        action = 'ignore' if disable_insecure_warning else 'once'
        warnings.simplefilter(action, InsecureRequestWarning)
        r = self.post(
            '/core-service/rest/LoginService/login',
            login=username, password=password, is_json=False)
        r.raise_for_status()
        loginrequest = untangle.parse(r.content)
        self.token = loginrequest.ns3_loginResponse.ns3_return.cdata

    def post(self, route, is_json=True, **data):
        """
        Post Call towards Arcsight Logger
        :param route: API endpoint to fetch
        :param is_json: Checks if post needs to be JSON
        :param data: Request Body
        :return: HTTP Response
        """
        if not data:
            return
        url = self.TARGET + route
        if is_json:
            return requests.post(url, json=data, verify=False)
        else:
            return requests.post(url, data, verify=False)
```
Verifying search status
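Your `arcsight_status` method is not optimal. As a user, knowing that an operation can take a huge amount of time, I prefer to have the ability to not perform a blocking call so I can perform other operations in the meantime. You could split the functionality by providing an `arcsight_search_complete` method returning a boolean and build `arcsight_status` on top of it:
```
def arcsight_search_complete(self, search_id):
    """
    Checks the current status of a search using the search_id
    :param search_id: The search_id that was generated when a new search was called
    :return: Whether or not the search finished already.
    """
    response = self.post(
        '/server/search/status',
        search_session_id=search_id,
        user_session_id=self.token)
    return response.json().get('status') == 'complete'

def arcsight_wait_for_search(self, search_id):
    """
    Blocks until the search represented by search_id completes
    :param search_id: The search_id that was generated when a new search was called
    :return: The status of the search.
    """
    while not self.arcsight_search_complete(search_id):
        time.sleep(5)
    return self.post(
        '/server/search/status',
        search_session_id=search_id,
        user_session_id=self.token).json()
```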
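To illustrate what the non-blocking check makes possible, here is a minimal sketch of a caller that does other work between status checks. `FakeLogger` and `run_while_searching` are hypothetical names introduced only for this example; the fake client stands in for `ArcsightLogger` so the snippet runs without a server, and only the `arcsight_search_complete` method name is taken from the code above.

```python
import time


class FakeLogger(object):
    """Hypothetical stand-in for ArcsightLogger (assumption, not the real client).

    Reports the search as complete on the third status check.
    """

    def __init__(self, polls_needed=3):
        self.polls_needed = polls_needed
        self.polls = 0

    def arcsight_search_complete(self, search_id):
        self.polls += 1
        return self.polls >= self.polls_needed


def run_while_searching(client, search_id, do_other_work, interval=0.0):
    """Call do_other_work() between status checks until the search completes.

    Returns the number of times do_other_work() was called.
    """
    rounds = 0
    while not client.arcsight_search_complete(search_id):
        do_other_work()
        rounds += 1
        time.sleep(interval)
    return rounds


work_log = []
client = FakeLogger(polls_needed=3)
rounds = run_while_searching(client, 'search-1', lambda: work_log.append('tick'))
```

With a real client, the caller would pick a longer `interval` and could just as well stop polling and come back later, which a single blocking method does not allow.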
Code Snippets
```
o = ArcsightLogger()
o.login = 'spam'
o.password = 'eggs'
o.main()
```
```
class ArcsightLogger(object):
    """
    Main Class to interact with Arcsight Logger REST API
    """
    TARGET = 'https://SOMETHING:9000'

    def __init__(self, username, password):
        """
        Log in the user whose credentials are provided and
        store the access token to be used with all requests
        against Arcsight
        """
        data = {
            'login': username,
            'password': password,
        }
        url = self.TARGET + '/core-service/rest/LoginService/login'
        r = self.post(url, data, False)
        r.raise_for_status()
        loginrequest = untangle.parse(r.content)
        self.token = loginrequest.ns3_loginResponse.ns3_return.cdata
```
```
import warnings

import requests
import untangle
from requests.packages.urllib3.exceptions import InsecureRequestWarning


class ArcsightLogger(object):
    """
    Main Class to interact with Arcsight Logger REST API
    """
    TARGET = 'https://SOMETHING:9000'

    def __init__(self, username, password, disable_insecure_warning=False):
        """
        Log in the user whose credentials are provided and
        store the access token to be used with all requests
        against Arcsight
        """
        # Show the insecure-request warning once by default; silence it on demand.
        action = 'ignore' if disable_insecure_warning else 'once'
        warnings.simplefilter(action, InsecureRequestWarning)
        r = self.post(
            '/core-service/rest/LoginService/login',
            login=username, password=password, is_json=False)
        r.raise_for_status()
        loginrequest = untangle.parse(r.content)
        self.token = loginrequest.ns3_loginResponse.ns3_return.cdata

    def post(self, route, is_json=True, **data):
        """
        Post Call towards Arcsight Logger
        :param route: API endpoint to fetch
        :param is_json: Checks if post needs to be JSON
        :param data: Request Body
        :return: HTTP Response
        """
        if not data:
            return
        url = self.TARGET + route
        if is_json:
            return requests.post(url, json=data, verify=False)
        else:
            return requests.post(url, data, verify=False)
```
```
def arcsight_search_complete(self, search_id):
    """
    Checks the current status of a search using the search_id
    :param search_id: The search_id that was generated when a new search was called
    :return: Whether or not the search finished already.
    """
    response = self.post(
        '/server/search/status',
        search_session_id=search_id,
        user_session_id=self.token)
    return response.json().get('status') == 'complete'

def arcsight_wait_for_search(self, search_id):
    """
    Blocks until the search represented by search_id completes
    :param search_id: The search_id that was generated when a new search was called
    :return: The status of the search.
    """
    while not self.arcsight_search_complete(search_id):
        time.sleep(5)
    return self.post(
        '/server/search/status',
        search_session_id=search_id,
        user_session_id=self.token).json()
```
```
def arcsight_events(self, search_id):
    """
    Gathers events from a finished search
    :param search_id: The search_id that was generated when a new search was called
    :return: The events generated by a search. This returns the default arcsight
    JSON format.
    """
    response = self.post(
        '/server/search/events',
        search_session_id=search_id,
        user_session_id=self.token)
    # Same (token, search id) argument order as the original call,
    # using the stored token and this method's search_id parameter.
    self.arcsight_stop(self.token, search_id)
    return response.json()

def arcsight_events_custom(self, search_id):
    """
    Gathers events from a finished search
    :param search_id: The search_id that was generated when a new search was called
    :return: The events generated by a search. This returns a custom JSON format
    """
    events = self.arcsight_events(search_id)
    # Pair each field name with the matching value of every result row.
    return [{
        field['name']: result
        for field, result in zip(events['fields'], results)
    } for results in events['results']]
```
Context
StackExchange Code Review Q#146925, answer score: 5