Technical Insights

Preconditions for working on the SDK

Introduction

Basic Examples

Architecture

The SDK consists of four major parts:

User Facing

The user facing part consists of the OneDataApi and the methods it provides. The OneDataApi is structured similar to the actual OD API. For example, all methods to interact with OD workflows are located in the OneDataWorkflowApi submodule of OneDataApi, available as OneDataApi.workflows. For example, to create a OD workflow, use OneDataApi.workflows.create(...).

Besides the submodules for Workflows, DataTables, etc, there are also methods that let the user change the internal behavior and configuration. This includes manipulating the submodules' methods behavior, requests, and responses. It is also possible to change the deserialization of the response and extend OneDataApi by entirely new features.

Command Execution

  • Commands let you extend the sdk with additional requests easily
  • You only have to add the command and an executor, the rest will be handled by the existing sdk infrastructure
    • Each command has ideally one executor
    • For example a new get request can easily be added by creating a NewGetCommand and a NewGetCommandExecutor
      • NewGetCommand
        • The command should include the parameters expected by the OD API
      • NewGetCommandExecutor
        • The executor has to inherit from the BaseCommandExecutor which provides the retry logic and forces you to implement the execute method which processes the command

Request transformation and execution

  • The RequestService contains all request http methods like GET, POST ...
  • Before calling an endpoint, the RequestService transforms the request with the provided request transformers, for example the BaseUrlTransformer
    • Transformers(called here system transformers) are defined while instantiating RequestService by OneDataApi with the given parameters
      • Optional it is possible define transformers directly in the OneDataApi instantiation
    • You can also add Transformers while calling the methods of the api
  • After receiving the response, it will be transformed by the given response transformers for further processing
    • You can provide response transformers by the same rules as for request transformers

Response Deserialization

  • After the transformation, the response is sent to the respective deserializer service.
  • The mapping between the deserializer and the given sdk command is made in the OneDataApi initialization.
  • In the deserializer service the response is converted to the respective object and returned to the caller






Examples

Initialize the OneDataApi

The OneDataApi needs a base URL of the API and some authorization. The authorization can be provided via combination of username and password (which will result in an automatic login attempt) or immediately via a valid token.

Via username and password:

od = OneDataApi(base_url="https://onedata.de", username="firstname.lastname@onedata.de", password="123456")

Via token:

od = OneDataApi(base_url="https://onedata.de", token="Bearer abc123")

Get an OdFunction by ID

# Note that the returned object is of type Function.
function_fetched: Function = od.functions.get(function_created.id)

Change the return type of a predefined method in OneDataApi

Each method in the OneDataApi modules corresponds to one command. For example, OneDataApi.functions.get will result in a FunctionGetCommand. This command will (by default) be executed by its corresponding executer FunctionGetCommandExecutor. The response from the API is deserialized by ResponseToFunctionDeserializer, the default deserializer assigned to the initial FunctionGetCommand.

Let's assume you want the entire response from the API request instead of the deserialized payload (type Function). In order to achieve this, the default deserializer can be changed. This can be a custom deserializer that prepares you the exact result you need, or - in this case - the IdentityDeserializer, which just returns the input as output (identity).

od.set_deserializer(FunctionGetCommand, IdentityDeserializer)

The next time a FunctionGetCommand is executed via the corresponding methid (e.g., OneDataApi.functions.get(...)) and its response is deserialized, the unmodified response (of type requests.models.Response) is returned.

# Note that the returned entity is now of type Response instead of Function.
function_fetched: Response = od.functions.get(function_created.id)

Change timeout or turn off SSL verification for a single request

function_fetched: Response = od.functions.get(function_created.id, options=Options(timeout=100, verify=False)

Internally, these properties are transformed to RequestTransformers that adapt the request accordingly before sending it.

You can also define the timeout and SSL verification via RequestTransformers. There are two transformers provided to do this, TimeoutTransformer and SslVerificationTransformer.

function_fetched: Response = od.functions.get(function_created.id, options=Options(request_transformers=[TimeoutTransformer(100), SslVerificationTransformer(False)]))

You can also define your own Transformers that have their custom behaviour. For example, if you want to wait (sleep) before and after a request is send, a RequestTransformer and a ResponseTransformer can be added accordingly.

@dataclass(init=True)
class RequestSleepTransformer(RequestTransformer):
    n: int = field()

    def transform(self, request: Request):
        print(f"sleeping...{self.n}")
        time.sleep(self.n)
        return request


@dataclass(init=True)
class ResponseSleepTransformer(ResponseTransformer):
    n: int = field()

    def transform(self, response: Response):
        print(f"sleeping...{self.n}")
        time.sleep(self.n)
        return response


function_result = od.functions.execute(function_created.id,
                                             options=Options(request_transformers=[RequestSleepTransformer(4)],
                                                             response_transformers=[ResponseSleepTransformer(4)]))

Change timeout or turn off SSL verification for all performed requests

The easiest way to set the default timeout and SSL verification is via the constructor of the OneDataApi by providing the corresponding parameters.

od = OneDataApi(base_url="...", token="...", timeout=100, verify=False)

But you can also change the behavior during runtime. But to understand this, here is a little background information:

The default parameters if the OneDataApi constructor are translated into RequestTransformers. Each RequestTransformer takes responsibility to set one of the default values of the request. In total, there are 4 default RequestTransformers with specific order. To give the user the option to introduce additional Transformers in-between, the transformers are spaced by 100.


100: HeaderTransformer


200: BaseUrlTransformer


300: TimeoutTransformer


400: SslVerificationTransformer


You are able to replace the default transformers by reusing the order values seen above, but we suggest to use order >=1000 for your custom transformers. Transformers with higher order will eventually overwrite changes done by transformers with lower order.

# CAUTION: This irreversibly replaces the default behavior
od.set_request_transformer( 200, SslVerificationTransformer(False)     # set the default SLL verification, all following requests will use the new base url
od.set_request_transformer( 300, TimeoutTransformer(100)               # set the default timeout, all following requests will use the new base url

# SUGGESTED: Add new transformer after the default transformers (using higher order) to overwrite values as required
od.set_request_transformer(1010, SslVerificationTransformer(False)     # will overwrite the base url, all following requests will use the new base url
od.set_request_transformer(1020, TimeoutTransformer(100)               # will overwrite the base url, all following requests will use the new base url

# do some other stuff

od.set_request_transformer(1010, None)                                 # will remove your custom SslVerificationTransformer if you no longer need it
od.set_request_transformer(1020, None)                                 # will remove your custom TimeoutTransformerif you no longer need it

This also works for ResponseTransformers (using OneDataApi.set_response_transformers). We have no default ResponseTransformers right now, but we suggest to use order >=1000 here as well.

Add a custom header to a single performed requests

function_fetched: Function = od.functions.get(function_created.id, options=Options(request_transformers=[HeaderTransformer("my-custom-auth-token", "abcde12345")])

Add a custom header to all performed requests

od.set_request_transformer(1000, HeaderTransformer("my-custom-auth-token", "abcde12345"))
function_fetched: Function = od.functions.get(function_created.id)

Creating your own submodule


In case you want to extend the OneDataApi with a submodule that is not (yet) available in the od-python-sdk, you can create your own submodule. For example, if you want to have a custom submodule for the /feature endpoint.


So let's create a submodule to get the features of the OD instance via OneDataApi.features.get().


First, we need to create a Command and its Executor. In this case we have the FeaturesGetCommand (note that the command has no parameters since the GET /features call also has no parameters) and its specific executor FeaturesGetCommandExecutor.


Second, we create a new module OdFeatureApi (which extends OneDataApiModule). Here we add the methods we want. In this case only get(). In this method, the command is created and passed to the executer service.

Optionally, we can create a Deserializer for the API response. In this case, we want the JSON payload of the response as dict, therefore we can use the generic ResponseToJsonDeserializer.


Lastly, we need to connect all these things by setting the executor as well as the deserializer to the right command and introduce the new module to OneDataApi.


Now we can use the new module via OneDataApi.features.get().

import requests
from dataclasses import dataclass
from onedata.api import OneDataApi
from requests.packages.urllib3.exceptions import InsecureRequestWarning

from onedata.common.api import OneDataApiModule
from onedata.common.commands import BaseCommand
from onedata.common.deserializers import ResponseToJsonDeserializer
from onedata.common.executors import BaseCommandExecutor
from onedata.util.request import Options, Request

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

od = OneDataApi(base_url="...",
              username="...",
              password="...",
              verify=False,
              timeout=1)


class FeaturesGetCommand(BaseCommand):
    pass


class FeaturesGetCommandExecutor(BaseCommandExecutor):
    def execute(self, command: FeaturesGetCommand, options: Options = None):
        request = Request(method="GET", path="/api/v1/features")
        response = self._request_service.perform(request, options)
        if response.status_code == 200:
            return response
        else:
            # TODO fix error message
            raise Exception(f"Wrong error code returned: {response.status_code}")


@dataclass()
class FeatureApi(OneDataApiModule):
    def get(self, options: Options = None):
        return self._execute(FeaturesGetCommand(), options)


od.features = FeatureApi(od._executor_service)
od.set_executor(FeaturesGetCommand, FeaturesGetCommandExecutor)
od.set_deserializer(FeaturesGetCommand, ResponseToJsonDeserializer)

print(od.features.get())

Example Function

workflow api

import requests
from onedata.api import OneDataApi
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from onedata.common.types import SearchParameters, Paginated
from onedata.workflows.types import Workflow, WorkflowCreate, WorkflowInformation
from onedata.jobs.types import WorkflowJob

def handle(req):
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    project_id = "5db58c7d-2874-485c-9d15-b299f235f852"
    wf_id = "e2a2750c-1b5c-4d56-8a71-bb8384991fce"

    od = OneDataApi(base_url=req["config"]["onedataBaseurl"], token=req["config"]["authorization"], timeout=2)

    # get an existing workflow
    workflow: Workflow = od.workflows.get(wf_id)
    print(f"fetched workflow with name '{workflow.name}'")

    # create a new workflow in a project
    workflow_new: Workflow = od.workflows.create(data=WorkflowCreate(name="WF via SDK",
                                                                        description="A Workflow created via od-python-sdk",
                                                                        tags=[]),
                                                    project=project_id)
    print(f"workflow '{workflow_new.name}' ({workflow_new.id}) created in project '{workflow.projects[0].name}'")

    # copy nodes from one WF to another
    workflow_new.nodes = workflow.nodes
    workflow_new.name = "WF via SDK - and Nodes!"
    workflow_new = od.workflows.update(workflow_new)

    # execute the workflow async
    job: WorkflowJob = od.workflows.execute_async(workflow.id)

    # execution takes some time, therefore a result is not immediately there
    print(f"execute_async: received job with state {job.executionState} and results {job.results}")

    # get job again and wait (auto-retry) until job is finished (success, aborted, failed)
    job = od.jobs.get(id=job.id, retries=5)
    # job = od.jobs.get(id=job.id, retries=5)
    print(f"get:        received job with state {job.executionState} and results {job.results}")

    # execute the workflow sync
    job: WorkflowJob = od.workflows.execute_sync(workflow.id)
    # execution takes some time, therefore a result is not immediately there
    print(f"execute_sync:  received job with state {job.executionState} and results {job.results}")

    # get job again and wait (auto-retry) until job is finished (success, aborted, failed)
    job = od.jobs.get(id=job.id, retries=5)
    print(f"get:        received job with state {job.executionState} and results {job.results}")

    # get all workflows created via SDK
    paginated: Paginated = od.workflows.list(search_parameters=SearchParameters(page=0,
                                                                                   limit=10,
                                                                                   search="WF via SDK",
                                                                                   projects=[project_id]))
    print(f"total number of SDK workflows: {paginated.totalItems}")

    # cleanup by deleting all workflow created via SDK
    wf: WorkflowInformation
    for wf in paginated.items:
        success: bool = od.workflows.delete(wf.id)
        print(f"deleted wf with id {wf.id}: {success}")

    return {}

Function API

import time
from requests import ReadTimeout
from onedata.api import OneDataApi
from onedata.common.commands import BaseCommand
from onedata.common.types import SearchParameters, SortOrder, ResourceSortProperty, OneDataModule
from onedata.common.executors import BaseCommandExecutor
from onedata.functions.commands import FunctionExecuteCommand, FunctionUpdateCommand
from onedata.functions.executors import FunctionExecuteCommandExecutor
from onedata.common.transformers import TimeoutTransformer, BaseUrlTransformer, HeaderTransformer, \
    SslVerificationTransformer, RequestSleepTransformer, ResponseSleepTransformer, RequestTransformer
from onedata.common.types import Paginated
from onedata.common.deserializers import IdentityDeserializer, ResponseToDictDeserializer
from onedata.functions.types import FunctionCreate, FunctionEdit, Function, FunctionResult
from onedata.util.error_handler import handle_error

from onedata.util.request import Request
from onedata.common.util import Options
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning

def handle(req):

    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    project_id = "5db58c7d-2874-485c-9d15-b299f235f852"

    # basic example

    od = OneDataApi(base_url=req["config"]["onedataBaseurl"], token=req["config"]["authorization"], timeout=5)

    function_create: FunctionCreate = FunctionCreate(name="DummyFunction", description="", notes="", tags=[],
                                                     rawCode="def handle(req):\n return 'original run'",
                                                     project=project_id)
    try:
        od.functions.create(function_create)
    except ReadTimeout:
        print(f"created: Timed out as expected")

    # options to increase timeout
    time_out_20_options = Options(request_transformers=[TimeoutTransformer(30)])

    class RequestPrintTransformer(RequestTransformer):
        def transform(self, request: Request) -> Request:
            print(request.__dict__)
            return request

    function_created: Function = od.functions.create(function_create,
                                                        options=Options(timeout=20, request_transformers=[RequestPrintTransformer()]))
    print(f"created: {function_created}")
    to_dict = function_created.to_dict()
    print(f"to_dict: {to_dict}")
    from_dct = Function.from_dict(to_dict)
    print(f"to_dict: {from_dct}")

    function_fetched: Function = od.functions.get(function_created.id)
    print(f"fetched: {function_fetched}")

    function_result: FunctionResult = od.functions.execute(function_fetched.id, {"msg": "hello"})
    print(f"result: {function_result.response}")

    od.set_deserializer(FunctionUpdateCommand, ResponseToDictDeserializer)
    function_edited = od.functions.update(function_fetched.id,
                                             FunctionEdit(name="DummyFunction", notes="", tags=[],
                                                          rawCode="def handle(req):\n return 'edited run'"),
                                             options=time_out_20_options)
    print(f"edited: {function_edited}")
    function_result = od.functions.execute(function_created.id)

    print(f"result: {function_result.response}")


    # expert part

    # logging example
    class LoggingFunctionExecuteCommandExecutor(FunctionExecuteCommandExecutor):
        def execute(self, command: FunctionExecuteCommand, options: Options = None):
            start = time.time()
            print(f"Function execution startet at: {start}")
            result = super().execute(command, options)
            end = time.time()
            print(f"Function execution ended at: {end} (took {end - start})")
            return result


    od.set_executor(FunctionExecuteCommand, LoggingFunctionExecuteCommandExecutor)

    function_result = od.functions.execute(function_created.id)
    print(f"result: {function_result.response}")



    function_result = od.functions.execute(function_created.id,
                                              options=Options(request_transformers=[RequestSleepTransformer(4)],
                                                              response_transformers=[ResponseSleepTransformer(4)]))
    print(f"result: {function_result.response}")

    search_parameters = SearchParameters(page=0, ordering=SortOrder.ASC, limit=10, sortBy=ResourceSortProperty.created,
                                         search="DummyFunction",
                                         project_types=[OneDataModule.USE_CASES, OneDataModule.DATAHUB],
                                         projects=[project_id])
    function_list: Paginated = od.functions.list(search_parameters)
    print(f"list size: {len(function_list.items)}")

    for function in function_list.items:
        print(
            f"delete function '{function.name} (with id {function.id})':"
            f" {od.functions.delete(function.id, options=time_out_20_options)}")

    # Very expert part

    # remove default auth header and base url transformer
    od.set_request_transformer(100, HeaderTransformer("token",
                                                         "YU4ZMc7WELOhL4THHhhK78pHVMYwtBYqZozdo6wMxRxGMF1FcyrT9fSZ3K2vbbJVGdrWLuZipAtA3yvy"))
    od.set_request_transformer(200, BaseUrlTransformer("https://148.251.11.188/smartmirror/"))
    od.set_request_transformer(400, SslVerificationTransformer(False))

    od.set_deserializer(IdentityDeserializer, IdentityDeserializer)

    # set custom transformers for auth header and base url

    print(f"active transformer {od._request_transformers}")


    # introduce new command
    class WeatherGetCommand(BaseCommand):
        noop: str = None


    # introduce new command executor
    class WeatherGetCommandExecutor(BaseCommandExecutor):
        def execute(self, command: WeatherGetCommand, options: Options = None):
            request = Request(method="GET", path=f"api/weather/current")
            response = self._request_service.perform(request, options)
            if response.status_code == 200:
                return response.json()
            else:
                handle_error(response)


    # set new executor for the new command
    od.set_executor(WeatherGetCommand, WeatherGetCommandExecutor)

    # pass command to executorService to be executed with its assigned executor
    weather = od.execute_command(WeatherGetCommand())
    print(f"weather: {weather}")

    print(f"active transformer {od._request_transformers}")

    return {}

Custom module

import requests
from dataclasses import dataclass
from onedata.api import OneDataApi
from requests.packages.urllib3.exceptions import InsecureRequestWarning

from onedata.common.api import OneDataApiModule
from onedata.common.commands import BaseCommand
from onedata.common.deserializers import ResponseToJsonDeserializer
from onedata.common.executors import BaseCommandExecutor
from onedata.util.error_handler import handle_error
from onedata.util.request import Request
from onedata.common.util import Options


def handle(req):
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    od = OneDataApi(base_url=req["config"]["onedataBaseurl"], token=req["config"]["authorization"], timeout=5)


    class FeaturesGetCommand(BaseCommand):
        pass


    class FeaturesGetCommandExecutor(BaseCommandExecutor):
        def execute(self, command: FeaturesGetCommand, options: Options = None):
            request = Request(method="GET", path="/api/v1/features")
            response = self._request_service.perform(request, options)
            if response.status_code == 200:
                return response
            else:
                handle_error(response)

    @dataclass()
    class FeatureApi(OneDataApiModule):
        def get(self, options: Options = None):
            return self._execute(FeaturesGetCommand(), options)


    od.features = FeatureApi(OneDataApi._executor_service)
    od.set_executor(FeaturesGetCommand, FeaturesGetCommandExecutor)
    od.set_deserializer(FeaturesGetCommand, ResponseToJsonDeserializer)

    print(od.features.get())

    return {}