

Introduction

With the Functions feature, you can manage and execute Python functions within ONE DATA. Functions let you access and interact with resources inside and outside of ONE DATA; for example, a Function can be triggered from an App via an Endpoint.


Functions are an advanced feature. To use them efficiently, you should be familiar with Python scripting and with working with APIs.


Note that the Functions feature does not support versioning at the moment!


General Explanation

Functions Overview

Functions are accessible via the "Functions" tab in the Project Overview, which lists all existing Functions in the Project. The menu on the left-hand side lets you:

  1. Create new Functions
  2. Delete Functions


Function Detail Page

Clicking on a Function opens its detail page. The top of the page shows several details about the resource. Below that is an editor for creating and adapting Functions.


Function Meta Information

Most of the Function information at the top of the page is straightforward, but two flags are worth mentioning:

  • Executable version exists: Indicates that an executable version of the Function exists on the server.
  • Is last executable version: Indicates whether the latest deployed version is executable. If this flag is false, a new attempt to deploy the Function to the infrastructure is triggered upon the next execution.


Editor Actions

In the menu on the left-hand side you find the general actions:

  1. Save and Deploy & Run: Saves the Function on the server if it was changed, and then deploys and executes it on the infrastructure.
  2. Save & Deploy: Saves the Function on the server and deploys it on the infrastructure.
  3. Delete: Deletes the Function from the server and removes it from the infrastructure.

On the right-hand side, above the editor, you can find the editor options:

  1. Only show the code editor
  2. Show a split view between editor and result
  3. Only show the result of the executed Function
  4. Show the arguments view. There you can define test input parameters for the Function.
  5. Undo
  6. Redo
  7. Search
  8. Replace
  9. Copy code to clipboard
  10. Paste from clipboard


How it Works

In this section, we will have a look at how to work with Functions and give you some information on arguments and predefined variables.


Create a Function

Functions can be created by clicking on the "Create New Function" button on the left action bar of the list view. You are then redirected to the detail page of a new Function template.


The initial Function editor view shows a pre-filled code template and contains no meta information, such as a creation date or an ID, because the Function does not exist on the ONE DATA server yet.

The Function is created on the first save. Saving builds a self-contained image and deploys it to ONE DATA's infrastructure, so it may take some time.

Deployed Functions are persistently available until a user deletes them. Also, once saved, the meta information is available.

The deployed Function is then executable via the execute button on the left of the details page or an API call.


It is important to note that each Function is limited to:
  • 0.5 cores
  • 256 MB of memory
  • a timeout of 1 hour

Also, each Function reserves memory on the ONE DATA Server, even when it is not executed.



Function Arguments

As mentioned in the explanation of the UI, you can pass test arguments to your Function. The inputs have to be defined in JSON format. To pass multiple arguments, wrap them in an object. A definition could look like this:

{
    "exampleVariable1": "hello",
    "exampleVariable2": "world"
}

You can then access the defined arguments via the req parameter of the handle function and use them in your code.

def handle(req):
    """Handles a request to the function.

    Args:
        req (dict): Request body containing arguments (req["args"]) and configuration (req["config"]).

    Returns:
        dict|str: The return value of the function. Can be a dictionary, string or None.
    """

    var1 = req["args"]["exampleVariable1"]  # hello
    var2 = req["args"]["exampleVariable2"]  # world

    return {
        "output": var1 + var2
    }
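
Because handle is an ordinary Python function, it can be tried out locally before saving and deploying. The following is a minimal sketch that assumes only the request shape described above (a dictionary with "args" and "config"); the sample values are illustrative, and on the platform ONE DATA builds this dictionary itself.

```python
def handle(req):
    """Concatenate the two example arguments, as in the snippet above."""
    var1 = req["args"]["exampleVariable1"]
    var2 = req["args"]["exampleVariable2"]
    return {"output": var1 + var2}


# Hypothetical request for a local dry run. The "config" part is left empty
# here because this handler does not read it.
sample_req = {
    "args": {"exampleVariable1": "hello", "exampleVariable2": "world"},
    "config": {},
}

result = handle(sample_req)
print(result)  # {'output': 'helloworld'}
```

Note that the two strings are concatenated without a separator, so the output is "helloworld".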


Function Config Variables

The req parameter also offers other useful values, which can be accessed via "config".

It comes with the values:

  • "authorization": Contains the user's authorization token.
  • "onedataBaseurl": Contains the base URL of the ONE DATA instance.


This makes it easier to use the ONE DATA REST API, as you already have the information needed for building requests.

Accessing the values in Python code would look like this:

instanceUrl = req["config"]["onedataBaseurl"]
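
Since almost every request to the REST API needs both config values, it can help to read them once in a small helper. The sketch below is one possible way to do that; the helper name build_request_context and the placeholder values are assumptions for illustration, and the key spelling "onedataBaseurl" follows the example scenarios in this document.

```python
def build_request_context(req):
    """Return (base_url, headers) derived from the Function's config values."""
    config = req["config"]
    # Strip a trailing slash so that API paths can be appended uniformly.
    base_url = config["onedataBaseurl"].rstrip("/")
    headers = {
        "Authorization": config["authorization"],
        "Content-Type": "application/json",
    }
    return base_url, headers


# Hypothetical request with placeholder values, for a local dry run only.
example_req = {
    "args": {},
    "config": {
        "authorization": "<token>",
        "onedataBaseurl": "https://onedata.example.com/",
    },
}

base_url, headers = build_request_context(example_req)
print(base_url)  # https://onedata.example.com
```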


Examples

Now we will have a look at some example scenarios on how Functions can be used to interact with ONE DATA.


Scenario 1: Execute a Workflow From a Function

To execute a Workflow from a Function, the corresponding endpoint of the ONE DATA REST API has to be called (/api/v1/workflows/{workflowId}/versions/{workflowVersion}/jobs).


The Workflow ID and version must be specified in the URL. The request body must also contain a field "variableAssignments", even if it is an empty array []. Each Variable must be referenced by its technical name. In this example, the Workflow ID and version as well as the Variable name and value are stored in the arguments of the Function.

Function code:

def handle(req):
    import requests

    headers = {
        "Authorization": req["config"]["authorization"],
        "Content-Type": "application/json"
    }
    body = {
        "variableAssignments": [{
            "variableName": req["args"]["variableName"],
            "variableType": "string",
            "variableValue": req["args"]["variableValue"]
        }]
    }
    # Build the job endpoint URL from the Workflow ID and version given in the arguments
    url = (req["config"]["onedataBaseurl"]
           + "/api/v1/workflows/" + req["args"]["workflowId"]
           + "/versions/" + req["args"]["workflowVersion"] + "/jobs")
    # Note: verify=False disables TLS certificate verification
    response = requests.post(url, headers=headers, json=body, verify=False).json()
    return response


Arguments:


{
"workflowVersion": "5",
"workflowId": "21b651aa-b90f-4714-bb5c-cc3a9dfe8713",
"variableName": "variable_WZN",
"variableValue": "Demo Data Set"
}
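
The URL and body construction described above can be separated from the actual HTTP call, which makes it easy to check without a running instance. The following is a sketch under the assumptions of this scenario: the helper name build_workflow_job_request is hypothetical, and every Variable is sent with the type "string", as in the Function code above.

```python
def build_workflow_job_request(base_url, workflow_id, workflow_version, variables=None):
    """Return (url, body) for starting a Workflow job.

    `variables` maps technical Variable names to string values. If it is
    omitted, the body still contains an empty "variableAssignments" list.
    """
    url = (f"{base_url.rstrip('/')}/api/v1/workflows/{workflow_id}"
           f"/versions/{workflow_version}/jobs")
    assignments = [
        {"variableName": name, "variableType": "string", "variableValue": value}
        for name, value in (variables or {}).items()
    ]
    return url, {"variableAssignments": assignments}


# Placeholder base URL; the IDs and values are taken from the arguments above.
url, body = build_workflow_job_request(
    "https://onedata.example.com",
    "21b651aa-b90f-4714-bb5c-cc3a9dfe8713",
    "5",
    {"variable_WZN": "Demo Data Set"},
)
print(url)
print(body)
```

The returned pair can then be passed to requests.post(url, headers=headers, json=body) inside the Function.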



Scenario 2: Retrieve a Dataset in a Function

In this example, the content of a dataset is retrieved using the /data/content REST API endpoint. The dataset ID must be specified in the request body.

The specified Function looks as follows:

def handle(req):
    import requests

    headers = {
        "Authorization": req["config"]["authorization"],
        "Content-Type": "application/json"
    }
    body = [{
        'dataId': req["args"]["datasetId"],
        'dataRequest': {
            'id': req["args"]["datasetId"],
            'transformations': [],
            'statistics': [],
            'limiter': {
                'type': 'COMPLETE'
            }
        },
        'errorMessageFormat': 'ERROR_CODE_AND_MESSAGE'
    }]

    response = requests.post(req["config"]["onedataBaseurl"] + "/api/v1/data/content", headers=headers, json=body, verify=False).json()
    return response


Arguments:


{"datasetId": "4e4338f9-8478-477d-80aa-c8da2e5c7f45"}



Scenario 3: Calculate and Write to a Dataset

In this example, the result of a multiplication operation is calculated and written to the specified Data Table as a new entry.

The specified Function looks as follows:


def handle(req):

    import requests
    import json
    import datetime

    # get the needed values from the config and store them in variables for easier usage
    authorization_token = req['config']['authorization']
    base_url = req['config']['onedataBaseurl']

    # replace the dataset id with your dataset
    dataset_id = '02902af0-c7df-4930-8a61-667caac40f9a'

    # create the headers for the requests to the OD API
    headers = {
        'Authorization': authorization_token,
        'Content-Type': 'application/json;charset=UTF-8'
    }

    # get the numbers from the parameters so we don't have to get them twice when calculating the value
    try:
        number_1 = float(req['args']['number1'])
        number_2 = float(req['args']['number2'])
    except (KeyError, TypeError, ValueError):
        raise Exception('The numbers you entered have the wrong format or are empty.')

    # get the current time
    timestamp = datetime.datetime.now().isoformat()

    # calculate the result of the multiplication
    result = number_1 * number_2

    # create the request body for the request to update the dataset
    # it contains all changed rows
    body = json.dumps({
        'added': [{
            'result': result,
            'number1': number_1,
            'number2': number_2,
            'timestamp': str(timestamp)
        }]
    })

    # request to update the dataset
    update = requests.patch(base_url + '/api/v1/data/content?dataId=' + dataset_id, headers=headers, data=body)

    # the OD API returns 204 No Content on success
    if update.status_code != 204:
        raise Exception('The function execution failed.')

    return {
        'message': f'The calculation was successful. The result is {int(result)}.'
    }


Arguments:


{
"number1": "2",
"number2": "3"
}
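
The argument handling in this scenario can also be written as a small helper that reports which argument is at fault. This is a sketch only; the helper name parse_factor is hypothetical and not part of ONE DATA.

```python
def parse_factor(args, key):
    """Read args[key] and convert it to float, raising a descriptive error."""
    if key not in args or args[key] in (None, ""):
        raise ValueError(f"Argument '{key}' is missing or empty.")
    try:
        return float(args[key])
    except (TypeError, ValueError):
        raise ValueError(f"Argument '{key}' is not a number: {args[key]!r}") from None


# The arguments from this scenario, as they would appear in req["args"].
args = {"number1": "2", "number2": "3"}
product = parse_factor(args, "number1") * parse_factor(args, "number2")
print(product)  # 6.0
```

Inside handle, the two calls would replace the try/except block, so that the caller sees whether number1 or number2 caused the failure.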



Scenario 4: Save to Function

In this example, we will introduce a number of changes to a Data Table.

These changes, when applied, call a function that writes to the specified Data Table and trigger a corresponding Workflow Job.

Once the user saves the changes, the Function counts the number of added, updated and deleted rows and shows a message to the user.

The specified Function looks as follows:


import requests
import json

def update_data_table(data_id, frt_id, target_project, edit_data, row_id_column, od_base_url, jwt):
    """
    Updates a data table via the ONE DATA API.
    :param data_id: The ID of the data table to edit. May be None if frt_id is present.
    :param frt_id: The legacy FRT-ID of the data table to edit. May be None if data_id is present.
    :param target_project: The ID of a project in context of which to execute the edit.
    :param edit_data: The changeset to be applied to the data table.
    :param row_id_column: The name of the column containing the row IDs.
    :param od_base_url: The base URL of the ONE DATA server.
    :param jwt: The invoking user's JWT.
    """
    # Construct the parameterized endpoint URL for editing the data
    edit_endpoint_url = "%s/api/v1/data/content?" % od_base_url
    if data_id is not None:
        edit_endpoint_url += "dataId=%s" % data_id
    elif frt_id is not None:
        edit_endpoint_url += "filterableResultTableId=%s" % frt_id
    else:
        raise Exception('No data table ID or FRT-ID present.')
    # If the Spark Execution Context is ambiguous, we need to specify a target project
    if target_project is not None:
        edit_endpoint_url += "&target-project=%s" % target_project

    # The added/updated/deleted rows are in 'data'
    added_rows = edit_data['added_rows']
    updated_rows = edit_data['updated_rows']
    # The OD API expects only row IDs for the deleted rows. Extract them
    deleted_row_ids = [r[row_id_column] for r in edit_data['deleted_rows']]
    # The payload for the Data Table edit as JSON:
    edit_request_payload = json.dumps({
        'added': added_rows,
        'updated': updated_rows,
        'deleted': deleted_row_ids
    })

    # We need to send headers with the content type and the user's bearer token
    headers = {
        'Authorization': jwt,
        'Content-Type': 'application/json;charset=UTF-8'
    }

    print('Executing data edit: PATCH %s' % edit_endpoint_url)
    od_edit_response = requests.patch(edit_endpoint_url, headers=headers, data=edit_request_payload)

    # The OD API returns 204 No Content on success
    if od_edit_response.status_code != 204:
        raise Exception("ONE DATA reported error on data edit. With errors: %s" % od_edit_response.json())
    print('Successfully updated data.')

def handle(req):
    """
    Saves the changes to the edited Data Table and invokes a workflow afterwards.
    :param req: The save-to-function request payload.
    :return: Returns messages to be shown in Apps.
    """
    # Validate that this is really a save-to-function call
    if 'edit_operation_info' not in req['args'] or 'data' not in req['args'] or 'payload' not in req['args']:
        raise Exception('This function must be executed in context of save-to-function.')

    edit_data = req['args']['data']
    edit_operation_info = req['args']['edit_operation_info']
    payload = req['args']['payload']

    # Validate that a user name was specified (exists and not empty)
    if 'name' not in payload or not payload['name']:
        raise Exception('This function must be executed with a "name" in the payload.')

    # Get the OD URL and the invoking user's JWT from the invocation context
    od_base_url = req["config"]["onedataBaseurl"]
    jwt = req["config"]["authorization"]

    # The metadata contains either the Data Table ID or legacy FRT-ID of the edited data and the row-ID column
    data_id = edit_operation_info['data_id']
    frt_id = edit_operation_info['frt_id']
    row_id_column = edit_operation_info['row_id_column']

    # A project can be specified in the function payload, e.g. if the Spark Execution Context is ambiguous
    target_project = payload['target_project'] if 'target_project' in payload else None

    # Update the data table
    update_data_table(data_id, frt_id, target_project, edit_data, row_id_column, od_base_url, jwt)

    num_added = len(edit_data['added_rows'])
    num_updated = len(edit_data['updated_rows'])
    num_deleted = len(edit_data['deleted_rows'])

    return {
        'messages': [{
            'headline': 'Changes of %s' % payload['name'],
            'message': 'Added %s rows, updated %s rows and deleted %s rows' % (num_added, num_updated, num_deleted),
            'type': 'SUCCESS'
        }]
    }
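
To make the shape of the save-to-function arguments more concrete, the sketch below builds a sample req["args"] from the keys that the Function above reads (edit_operation_info, data, payload) and derives the row counts and deleted row IDs from it. The column names, row values and the helper name summarize_changes are made up for illustration; the actual request may contain further fields.

```python
def summarize_changes(edit_data, row_id_column):
    """Return the row counts and the IDs of the deleted rows of a changeset."""
    deleted_ids = [row[row_id_column] for row in edit_data["deleted_rows"]]
    counts = {
        "added": len(edit_data["added_rows"]),
        "updated": len(edit_data["updated_rows"]),
        "deleted": len(deleted_ids),
    }
    return counts, deleted_ids


# Hypothetical arguments, limited to the keys read by the Function above.
sample_args = {
    "edit_operation_info": {
        "data_id": "<data-table-id>",
        "frt_id": None,
        "row_id_column": "row_id",
    },
    "data": {
        "added_rows": [{"row_id": 4, "city": "Berlin"}],
        "updated_rows": [{"row_id": 1, "city": "Paris"}, {"row_id": 2, "city": "Rome"}],
        "deleted_rows": [{"row_id": 3, "city": "Oslo"}],
    },
    "payload": {"name": "Jane Doe"},
}

counts, deleted_ids = summarize_changes(
    sample_args["data"],
    sample_args["edit_operation_info"]["row_id_column"],
)
print(counts, deleted_ids)
```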


The following configuration shows how a Function can be defined as consumer of changes to a table element: 


"saveToFunction": {
    // The ID of the function to invoke when saving changes to the table
    "id": "b37d1f6b-cfe2-4a09-85cd-b87e4d29ffdf",
    // Whether messages of uncaught exceptions from the function should be shown to the user
    "showSpecificErrorMessages": true,
    // Whether all columns of deleted rows should be sent - otherwise only the row ID column will be present
    "fullDeletedRows": true,
    "payload": {
        // An arbitrary payload can be defined here (object, array, string, ...). It is then available at args.payload in the function
        "input": {
            "name": "",
            "$name": {
                "value": "{{user_name}}"
            }
        }
    }
}

Scenario 5: Using an Exposed Credential Key

An exposed Key's information (username, password, etc.) can be retrieved via the API in Functions and Python Processors. So, if a Function requires Credentials to gain access to some service, it can obtain them without storing them in plaintext in the code.

The following code snippet first retrieves the Key's properties and then prints them.

def handle(req):
    # required package for sending requests
    import requests
    # create the header for the request using authorization
    headers = {'Authorization': req["config"]["authorization"]}
    # performing a get request to retrieve the exposed key
    # note that the server url provided in the request config can be used
    r = requests.get(req["config"]["onedataBaseurl"] + "/api/v1/keys/Key-UUID/exposed", headers=headers)
    # parse json result and read the keyInformation
    if 'keyInformation' in r.json():
        print(r.json()["keyInformation"])
    else:
        print(r.json()["errors"])
    return {}
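
In a real Function, the retrieved properties are usually passed on to other code rather than printed. The following sketch separates the parsing step from the HTTP call so it can be checked locally. It assumes only what the snippet above shows, namely that the parsed response contains either "keyInformation" or "errors"; the helper name and the sample field names are hypothetical.

```python
def extract_key_information(response_json):
    """Return the exposed Key's properties, or raise if the API reported errors."""
    if "keyInformation" in response_json:
        return response_json["keyInformation"]
    raise RuntimeError(f"Could not read exposed key: {response_json.get('errors')}")


# Hypothetical parsed response; the field names inside "keyInformation"
# are placeholders for illustration.
sample_response = {"keyInformation": {"username": "svc-user", "password": "<secret>"}}

credentials = extract_key_information(sample_response)
print(sorted(credentials))  # ['password', 'username']
```

Inside handle, the helper would be called as extract_key_information(r.json()).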