Technical Insights
Preconditions for working on the SDK
- Python installed on your machine (we are using 3.9)
- PyCharm or another IDE of your choice installed on your machine
- Access to the repository https://gitlab.service.onedata.de/software/random/od-python-sdk
- Rights or support to install the project dependencies after opening the project
Introduction
Basic Examples
Architecture
The SDK consists of four major parts:
User Facing
The user-facing part consists of the OneDataApi and the methods it provides. The OneDataApi is structured similarly to the actual OD API: all methods for interacting with OD workflows, for instance, are located in the OneDataWorkflowApi submodule of OneDataApi, available as OneDataApi.workflows. To create an OD workflow, use OneDataApi.workflows.create(...).
Besides the submodules for Workflows, DataTables, etc., there are also methods that let the user change the internal behavior and configuration. This includes changing the behavior, requests, and responses of the submodules' methods. It is also possible to change how the response is deserialized and to extend OneDataApi with entirely new features.
Command Execution
- Commands let you extend the SDK with additional requests easily.
- You only have to add the command and an executor; the existing SDK infrastructure handles the rest.
- Ideally, each command has exactly one executor.
- For example, a new GET request can be added by creating a NewGetCommand and a NewGetCommandExecutor:
- NewGetCommand: the command should include the parameters expected by the OD API.
- NewGetCommandExecutor: the executor has to inherit from BaseCommandExecutor, which provides the retry logic and forces you to implement the execute method that processes the command.
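The command/executor split described above can be sketched in a few lines. This is a minimal, self-contained illustration and not the SDK's implementation: SketchCommandExecutor is a hypothetical stand-in for BaseCommandExecutor, the retry loop is an assumption about what "retry logic" might look like, and the resource_id parameter and the simulated failures exist only to make the example runnable.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class NewGetCommand:
    """Hypothetical command: carries only the parameters the endpoint expects."""
    resource_id: str


class SketchCommandExecutor(ABC):
    """Stand-in for BaseCommandExecutor (assumed shape, not the SDK class)."""

    def __init__(self, retries: int = 0):
        self.retries = retries

    def run(self, command):
        # Try once, then retry up to `retries` times on connection errors.
        last_error = None
        for _ in range(self.retries + 1):
            try:
                return self.execute(command)
            except ConnectionError as error:
                last_error = error
        raise last_error

    @abstractmethod
    def execute(self, command):
        """Subclasses implement the actual request here."""


class NewGetCommandExecutor(SketchCommandExecutor):
    def __init__(self, retries: int = 0, failures_before_success: int = 0):
        super().__init__(retries)
        self.remaining_failures = failures_before_success  # simulation only
        self.attempts = 0

    def execute(self, command: NewGetCommand):
        self.attempts += 1
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            raise ConnectionError("simulated transient failure")
        # A real executor would build and send a request here.
        return {"id": command.resource_id}


executor = NewGetCommandExecutor(retries=2, failures_before_success=2)
result = executor.run(NewGetCommand("abc"))
```

The point of the split is that the command stays a plain data object while everything about how it is sent lives in the executor, so swapping the executor changes the behavior without touching the caller.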
Request transformation and execution
- The RequestService contains all HTTP request methods, such as GET and POST.
- Before calling an endpoint, the RequestService transforms the request with the provided request transformers, for example the BaseUrlTransformer.
- System transformers are defined when OneDataApi instantiates the RequestService with the given parameters.
- Optionally, you can define transformers directly when instantiating OneDataApi.
- You can also add transformers when calling the methods of the API.
- After the response is received, it is transformed by the given response transformers for further processing.
- Response transformers are provided by the same rules as request transformers.
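The flow described in this list can be sketched as a small pipeline. Everything below is a hypothetical stand-in (SketchRequest, the Sketch* transformers, perform, fake_send) rather than the SDK's RequestService, and it assumes that per-call transformers run after the system transformers; no network call is made.

```python
from dataclasses import dataclass, field, replace
from typing import Optional


@dataclass(frozen=True)
class SketchRequest:
    """Hypothetical stand-in for the SDK's Request type."""
    method: str
    path: str
    url: str = ""
    headers: dict = field(default_factory=dict)
    timeout: Optional[float] = None


class SketchBaseUrlTransformer:
    """Request transformer: prefixes the path with a base URL."""

    def __init__(self, base_url: str):
        self.base_url = base_url

    def transform(self, request: SketchRequest) -> SketchRequest:
        url = self.base_url.rstrip("/") + "/" + request.path.lstrip("/")
        return replace(request, url=url)


class SketchTimeoutTransformer:
    """Request transformer: sets the timeout."""

    def __init__(self, timeout: float):
        self.timeout = timeout

    def transform(self, request: SketchRequest) -> SketchRequest:
        return replace(request, timeout=self.timeout)


class SketchStatusTransformer:
    """Response transformer: annotates the response with an 'ok' flag."""

    def transform(self, response: dict) -> dict:
        return {**response, "ok": response["status"] == 200}


def perform(request, system_transformers, call_transformers, response_transformers, send):
    # 1. transform the request, 2. send it, 3. transform the response
    for transformer in [*system_transformers, *call_transformers]:
        request = transformer.transform(request)
    response = send(request)
    for transformer in response_transformers:
        response = transformer.transform(response)
    return response


def fake_send(request: SketchRequest) -> dict:
    """Pretends to call the endpoint and echoes what it was given."""
    return {"status": 200, "url": request.url, "timeout": request.timeout}


result = perform(
    SketchRequest("GET", "/api/v1/features"),
    system_transformers=[SketchBaseUrlTransformer("https://example.invalid"),
                         SketchTimeoutTransformer(30)],
    call_transformers=[SketchTimeoutTransformer(100)],
    response_transformers=[SketchStatusTransformer()],
    send=fake_send,
)
```

In this sketch the per-call timeout wins because it is applied last, which mirrors the idea that later transformers overwrite earlier ones.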
Response Deserialization
- After the transformation, the response is sent to the respective deserializer service.
- The mapping between the deserializer and the given SDK command is made during OneDataApi initialization.
- In the deserializer service, the response is converted to the respective object and returned to the caller.
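The command-to-deserializer mapping can be pictured as a registry keyed by command type. The sketch below is an assumption-laden illustration: SketchDeserializerService, the two Sketch* deserializers, and SketchGetCommand are hypothetical names, the "response" is just a JSON string, and falling back to the identity deserializer for unmapped commands is a choice made for this example, not documented SDK behavior.

```python
import json


class SketchIdentityDeserializer:
    """Returns the response unchanged."""

    def deserialize(self, response):
        return response


class SketchJsonDeserializer:
    """Parses a JSON string into Python objects."""

    def deserialize(self, response):
        return json.loads(response)


class SketchGetCommand:
    """Hypothetical command used as the registry key."""


class SketchDeserializerService:
    def __init__(self):
        self._by_command = {}

    def set_deserializer(self, command_type, deserializer_type):
        # Map a command type to an instance of the given deserializer class.
        self._by_command[command_type] = deserializer_type()

    def deserialize(self, command, response):
        # Unmapped commands fall back to the identity deserializer (assumption).
        deserializer = self._by_command.get(type(command), SketchIdentityDeserializer())
        return deserializer.deserialize(response)


service = SketchDeserializerService()
raw = '{"name": "demo"}'

service.set_deserializer(SketchGetCommand, SketchJsonDeserializer)
as_dict = service.deserialize(SketchGetCommand(), raw)

# Re-mapping the same command changes what the caller receives.
service.set_deserializer(SketchGetCommand, SketchIdentityDeserializer)
as_raw = service.deserialize(SketchGetCommand(), raw)
```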
Examples
Initialize the OneDataApi
The OneDataApi needs the base URL of the API and some form of authorization. The authorization can be provided via a combination of username and password (which results in an automatic login attempt) or directly via a valid token.
Via username and password:
od = OneDataApi(base_url="https://onedata.de", username="firstname.lastname@onedata.de", password="123456")
Via token:
od = OneDataApi(base_url="https://onedata.de", token="Bearer abc123")
Get an OdFunction by ID
# Note that the returned object is of type Function.
function_fetched: Function = od.functions.get(function_created.id)
Change the return type of a predefined method in OneDataApi
Each method in the OneDataApi modules corresponds to one command. For example, OneDataApi.functions.get results in a FunctionGetCommand. By default, this command is executed by its corresponding executor, FunctionGetCommandExecutor. The response from the API is deserialized by ResponseToFunctionDeserializer, the default deserializer assigned to the FunctionGetCommand.
Let's assume you want the entire response from the API request instead of the deserialized payload (type Function). To achieve this, you can change the default deserializer. The replacement can be a custom deserializer that produces exactly the result you need, or, as in this case, the IdentityDeserializer, which simply returns its input as output (identity).
od.set_deserializer(FunctionGetCommand, IdentityDeserializer)
The next time a FunctionGetCommand is executed via the corresponding method (e.g., OneDataApi.functions.get(...)) and its response is deserialized, the unmodified response (of type requests.models.Response) is returned.
# Note that the returned entity is now of type Response instead of Function.
function_fetched: Response = od.functions.get(function_created.id)
Change timeout or turn off SSL verification for a single request
function_fetched: Response = od.functions.get(function_created.id, options=Options(timeout=100, verify=False))
Internally, these properties are transformed to RequestTransformers that adapt the request accordingly before sending it.
You can also define the timeout and SSL verification via RequestTransformers. There are two transformers provided to do this, TimeoutTransformer and SslVerificationTransformer.
function_fetched: Response = od.functions.get(function_created.id, options=Options(request_transformers=[TimeoutTransformer(100), SslVerificationTransformer(False)]))
You can also define your own transformers with custom behavior. For example, if you want to wait (sleep) before and after a request is sent, you can add a RequestTransformer and a ResponseTransformer accordingly.
import time
from dataclasses import dataclass, field


@dataclass(init=True)
class RequestSleepTransformer(RequestTransformer):
    n: int = field()

    def transform(self, request: Request):
        # wait n seconds before the request is sent
        print(f"sleeping...{self.n}")
        time.sleep(self.n)
        return request


@dataclass(init=True)
class ResponseSleepTransformer(ResponseTransformer):
    n: int = field()

    def transform(self, response: Response):
        # wait n seconds after the response was received
        print(f"sleeping...{self.n}")
        time.sleep(self.n)
        return response


function_result = od.functions.execute(function_created.id,
                                       options=Options(request_transformers=[RequestSleepTransformer(4)],
                                                       response_transformers=[ResponseSleepTransformer(4)]))
Change timeout or turn off SSL verification for all performed requests
The easiest way to set the default timeout and SSL verification is via the constructor of the OneDataApi by providing the corresponding parameters.
od = OneDataApi(base_url="...", token="...", timeout=100, verify=False)
You can also change the behavior at runtime. To understand how, some background information helps:
The default parameters of the OneDataApi constructor are translated into RequestTransformers. Each RequestTransformer is responsible for setting one of the default values of the request. In total, there are four default RequestTransformers, applied in a specific order. To let the user introduce additional transformers in between, their order values are spaced by 100.
100: HeaderTransformer
200: BaseUrlTransformer
300: TimeoutTransformer
400: SslVerificationTransformer
You can replace the default transformers by reusing the order values shown above, but we suggest using an order >= 1000 for your custom transformers. Transformers with a higher order overwrite changes made by transformers with a lower order.
# CAUTION: This irreversibly replaces the default behavior
od.set_request_transformer(400, SslVerificationTransformer(False))  # replaces the default SSL verification; all following requests use the new setting
od.set_request_transformer(300, TimeoutTransformer(100))  # replaces the default timeout; all following requests use the new timeout
# SUGGESTED: Add new transformers after the default transformers (using a higher order) to overwrite values as required
od.set_request_transformer(1010, SslVerificationTransformer(False))  # overwrites the SSL verification set by the default transformer
od.set_request_transformer(1020, TimeoutTransformer(100))  # overwrites the timeout set by the default transformer
# do some other stuff
od.set_request_transformer(1010, None)  # removes your custom SslVerificationTransformer if you no longer need it
od.set_request_transformer(1020, None)  # removes your custom TimeoutTransformer if you no longer need it
This also works for ResponseTransformers (using OneDataApi.set_response_transformers). There are no default ResponseTransformers right now, but we suggest using an order >= 1000 here as well.
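The ordering rule for the default and custom transformers can be illustrated with a tiny registry. This is a sketch under stated assumptions, not the SDK's code: SketchOrderedTransformers and the helper functions are hypothetical, requests are plain dicts, and passing None to remove an entry mirrors the set_request_transformer(order, None) calls shown earlier in this section.

```python
class SketchOrderedTransformers:
    """Applies transformers in ascending order of their order value."""

    def __init__(self):
        self._by_order = {}

    def set(self, order: int, transformer):
        # None removes the transformer registered under this order, if any.
        if transformer is None:
            self._by_order.pop(order, None)
        else:
            self._by_order[order] = transformer

    def apply(self, request: dict) -> dict:
        for order in sorted(self._by_order):
            request = self._by_order[order](request)
        return request


def set_timeout(seconds):
    """Builds a transformer that sets the timeout on a request dict."""
    return lambda request: {**request, "timeout": seconds}


def set_verify(flag):
    """Builds a transformer that sets SSL verification on a request dict."""
    return lambda request: {**request, "verify": flag}


registry = SketchOrderedTransformers()
registry.set(300, set_timeout(30))    # default timeout
registry.set(400, set_verify(True))   # default SSL verification

# A custom transformer with a higher order runs later and wins.
registry.set(1020, set_timeout(100))
with_custom = registry.apply({"path": "/api/v1/features"})

# Removing the custom transformer restores the default value.
registry.set(1020, None)
without_custom = registry.apply({"path": "/api/v1/features"})
```

Keeping custom transformers at a higher order means they can be removed again without losing the defaults, which is why reusing the default order values is the riskier option.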
Add a custom header to a single performed request
function_fetched: Function = od.functions.get(function_created.id, options=Options(request_transformers=[HeaderTransformer("my-custom-auth-token", "abcde12345")]))
Add a custom header to all performed requests
od.set_request_transformer(1000, HeaderTransformer("my-custom-auth-token", "abcde12345"))
function_fetched: Function = od.functions.get(function_created.id)
Creating your own submodule
If you want to extend the OneDataApi with a submodule that is not (yet) available in the od-python-sdk, you can create your own, for example a custom submodule for the /features endpoint.
So let's create a submodule to get the features of the OD instance via OneDataApi.features.get().
First, we need to create a Command and its Executor. In this case we have the FeaturesGetCommand (note that the command has no parameters since the GET /features call also has no parameters) and its specific executor FeaturesGetCommandExecutor.
Second, we create a new module FeatureApi (which extends OneDataApiModule) and add the methods we want, in this case only get(). In this method, the command is created and passed to the executor service.
Optionally, we can create a Deserializer for the API response. In this case, we want the JSON payload of the response as dict, therefore we can use the generic ResponseToJsonDeserializer.
Lastly, we need to connect all these things by setting the executor as well as the deserializer to the right command and introduce the new module to OneDataApi.
Now we can use the new module via OneDataApi.features.get().
import requests
from dataclasses import dataclass
from onedata.api import OneDataApi
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from onedata.common.api import OneDataApiModule
from onedata.common.commands import BaseCommand
from onedata.common.deserializers import ResponseToJsonDeserializer
from onedata.common.executors import BaseCommandExecutor
from onedata.util.request import Options, Request

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

od = OneDataApi(base_url="...",
                username="...",
                password="...",
                verify=False,
                timeout=1)


class FeaturesGetCommand(BaseCommand):
    pass


class FeaturesGetCommandExecutor(BaseCommandExecutor):
    def execute(self, command: FeaturesGetCommand, options: Options = None):
        request = Request(method="GET", path="/api/v1/features")
        response = self._request_service.perform(request, options)
        if response.status_code == 200:
            return response
        else:
            # TODO fix error message
            raise Exception(f"Wrong error code returned: {response.status_code}")


@dataclass()
class FeatureApi(OneDataApiModule):
    def get(self, options: Options = None):
        return self._execute(FeaturesGetCommand(), options)


od.features = FeatureApi(od._executor_service)
od.set_executor(FeaturesGetCommand, FeaturesGetCommandExecutor)
od.set_deserializer(FeaturesGetCommand, ResponseToJsonDeserializer)
print(od.features.get())
Example Function
Workflow API
import requests
from onedata.api import OneDataApi
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from onedata.common.types import SearchParameters, Paginated
from onedata.workflows.types import Workflow, WorkflowCreate, WorkflowInformation
from onedata.jobs.types import WorkflowJob


def handle(req):
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    project_id = "5db58c7d-2874-485c-9d15-b299f235f852"
    wf_id = "e2a2750c-1b5c-4d56-8a71-bb8384991fce"
    od = OneDataApi(base_url=req["config"]["onedataBaseurl"], token=req["config"]["authorization"], timeout=2)

    # get an existing workflow
    workflow: Workflow = od.workflows.get(wf_id)
    print(f"fetched workflow with name '{workflow.name}'")

    # create a new workflow in a project
    workflow_new: Workflow = od.workflows.create(data=WorkflowCreate(name="WF via SDK",
                                                                     description="A Workflow created via od-python-sdk",
                                                                     tags=[]),
                                                 project=project_id)
    print(f"workflow '{workflow_new.name}' ({workflow_new.id}) created in project '{workflow.projects[0].name}'")

    # copy nodes from one WF to another
    workflow_new.nodes = workflow.nodes
    workflow_new.name = "WF via SDK - and Nodes!"
    workflow_new = od.workflows.update(workflow_new)

    # execute the workflow async
    job: WorkflowJob = od.workflows.execute_async(workflow.id)
    # execution takes some time, therefore a result is not immediately there
    print(f"execute_async: received job with state {job.executionState} and results {job.results}")
    # get job again and wait (auto-retry) until job is finished (success, aborted, failed)
    job = od.jobs.get(id=job.id, retries=5)
    print(f"get: received job with state {job.executionState} and results {job.results}")

    # execute the workflow sync
    job: WorkflowJob = od.workflows.execute_sync(workflow.id)
    # execution takes some time, therefore a result is not immediately there
    print(f"execute_sync: received job with state {job.executionState} and results {job.results}")
    # get job again and wait (auto-retry) until job is finished (success, aborted, failed)
    job = od.jobs.get(id=job.id, retries=5)
    print(f"get: received job with state {job.executionState} and results {job.results}")

    # get all workflows created via SDK
    paginated: Paginated = od.workflows.list(search_parameters=SearchParameters(page=0,
                                                                               limit=10,
                                                                               search="WF via SDK",
                                                                               projects=[project_id]))
    print(f"total number of SDK workflows: {paginated.totalItems}")

    # clean up by deleting all workflows created via SDK
    wf: WorkflowInformation
    for wf in paginated.items:
        success: bool = od.workflows.delete(wf.id)
        print(f"deleted wf with id {wf.id}: {success}")
    return {}
Function API
import time
from requests import ReadTimeout
from onedata.api import OneDataApi
from onedata.common.commands import BaseCommand
from onedata.common.types import SearchParameters, SortOrder, ResourceSortProperty, OneDataModule
from onedata.common.executors import BaseCommandExecutor
from onedata.functions.commands import FunctionExecuteCommand, FunctionUpdateCommand
from onedata.functions.executors import FunctionExecuteCommandExecutor
from onedata.common.transformers import TimeoutTransformer, BaseUrlTransformer, HeaderTransformer, \
    SslVerificationTransformer, RequestSleepTransformer, ResponseSleepTransformer, RequestTransformer
from onedata.common.types import Paginated
from onedata.common.deserializers import IdentityDeserializer, ResponseToDictDeserializer
from onedata.functions.types import FunctionCreate, FunctionEdit, Function, FunctionResult
from onedata.util.error_handler import handle_error
from onedata.util.request import Request
from onedata.common.util import Options
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning


def handle(req):
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    project_id = "5db58c7d-2874-485c-9d15-b299f235f852"

    # basic example
    od = OneDataApi(base_url=req["config"]["onedataBaseurl"], token=req["config"]["authorization"], timeout=5)
    function_create: FunctionCreate = FunctionCreate(name="DummyFunction", description="", notes="", tags=[],
                                                     rawCode="def handle(req):\n return 'original run'",
                                                     project=project_id)
    try:
        od.functions.create(function_create)
    except ReadTimeout:
        print("created: Timed out as expected")

    # options to increase timeout
    time_out_20_options = Options(request_transformers=[TimeoutTransformer(30)])

    class RequestPrintTransformer(RequestTransformer):
        def transform(self, request: Request) -> Request:
            print(request.__dict__)
            return request

    function_created: Function = od.functions.create(function_create,
                                                     options=Options(timeout=20,
                                                                     request_transformers=[RequestPrintTransformer()]))
    print(f"created: {function_created}")
    to_dict = function_created.to_dict()
    print(f"to_dict: {to_dict}")
    from_dct = Function.from_dict(to_dict)
    print(f"from_dict: {from_dct}")
    function_fetched: Function = od.functions.get(function_created.id)
    print(f"fetched: {function_fetched}")
    function_result: FunctionResult = od.functions.execute(function_fetched.id, {"msg": "hello"})
    print(f"result: {function_result.response}")
    od.set_deserializer(FunctionUpdateCommand, ResponseToDictDeserializer)
    function_edited = od.functions.update(function_fetched.id,
                                          FunctionEdit(name="DummyFunction", notes="", tags=[],
                                                       rawCode="def handle(req):\n return 'edited run'"),
                                          options=time_out_20_options)
    print(f"edited: {function_edited}")
    function_result = od.functions.execute(function_created.id)
    print(f"result: {function_result.response}")

    # expert part
    # logging example
    class LoggingFunctionExecuteCommandExecutor(FunctionExecuteCommandExecutor):
        def execute(self, command: FunctionExecuteCommand, options: Options = None):
            start = time.time()
            print(f"Function execution started at: {start}")
            result = super().execute(command, options)
            end = time.time()
            print(f"Function execution ended at: {end} (took {end - start})")
            return result

    od.set_executor(FunctionExecuteCommand, LoggingFunctionExecuteCommandExecutor)
    function_result = od.functions.execute(function_created.id)
    print(f"result: {function_result.response}")
    function_result = od.functions.execute(function_created.id,
                                           options=Options(request_transformers=[RequestSleepTransformer(4)],
                                                           response_transformers=[ResponseSleepTransformer(4)]))
    print(f"result: {function_result.response}")
    search_parameters = SearchParameters(page=0, ordering=SortOrder.ASC, limit=10, sortBy=ResourceSortProperty.created,
                                         search="DummyFunction",
                                         project_types=[OneDataModule.USE_CASES, OneDataModule.DATAHUB],
                                         projects=[project_id])
    function_list: Paginated = od.functions.list(search_parameters)
    print(f"list size: {len(function_list.items)}")
    for function in function_list.items:
        print(
            f"delete function '{function.name} (with id {function.id})':"
            f" {od.functions.delete(function.id, options=time_out_20_options)}")

    # Very expert part
    # replace the default auth header, base url, and SSL verification transformers with custom ones
    od.set_request_transformer(100, HeaderTransformer("token",
                                                      "YU4ZMc7WELOhL4THHhhK78pHVMYwtBYqZozdo6wMxRxGMF1FcyrT9fSZ3K2vbbJVGdrWLuZipAtA3yvy"))
    od.set_request_transformer(200, BaseUrlTransformer("https://148.251.11.188/smartmirror/"))
    od.set_request_transformer(400, SslVerificationTransformer(False))
    od.set_deserializer(IdentityDeserializer, IdentityDeserializer)
    # show the transformers that are now active
    print(f"active transformer {od._request_transformers}")

    # introduce new command
    class WeatherGetCommand(BaseCommand):
        noop: str = None

    # introduce new command executor
    class WeatherGetCommandExecutor(BaseCommandExecutor):
        def execute(self, command: WeatherGetCommand, options: Options = None):
            request = Request(method="GET", path="api/weather/current")
            response = self._request_service.perform(request, options)
            if response.status_code == 200:
                return response.json()
            else:
                handle_error(response)

    # set new executor for the new command
    od.set_executor(WeatherGetCommand, WeatherGetCommandExecutor)
    # pass command to executorService to be executed with its assigned executor
    weather = od.execute_command(WeatherGetCommand())
    print(f"weather: {weather}")
    print(f"active transformer {od._request_transformers}")
    return {}
Custom module
import requests
from dataclasses import dataclass
from onedata.api import OneDataApi
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from onedata.common.api import OneDataApiModule
from onedata.common.commands import BaseCommand
from onedata.common.deserializers import ResponseToJsonDeserializer
from onedata.common.executors import BaseCommandExecutor
from onedata.util.error_handler import handle_error
from onedata.util.request import Request
from onedata.common.util import Options


def handle(req):
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    od = OneDataApi(base_url=req["config"]["onedataBaseurl"], token=req["config"]["authorization"], timeout=5)

    class FeaturesGetCommand(BaseCommand):
        pass

    class FeaturesGetCommandExecutor(BaseCommandExecutor):
        def execute(self, command: FeaturesGetCommand, options: Options = None):
            request = Request(method="GET", path="/api/v1/features")
            response = self._request_service.perform(request, options)
            if response.status_code == 200:
                return response
            else:
                handle_error(response)

    @dataclass()
    class FeatureApi(OneDataApiModule):
        def get(self, options: Options = None):
            return self._execute(FeaturesGetCommand(), options)

    # use the executor service of the instance, as in the submodule example above
    od.features = FeatureApi(od._executor_service)
    od.set_executor(FeaturesGetCommand, FeaturesGetCommandExecutor)
    od.set_deserializer(FeaturesGetCommand, ResponseToJsonDeserializer)
    print(od.features.get())
    return {}