Skip to content

Commit 8779f50

Browse files
added wrappers and updated knative functions
Signed-off-by: Abhishek Kumar <[email protected]>
1 parent f74a2df commit 8779f50

File tree

12 files changed

+499
-98
lines changed

12 files changed

+499
-98
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
const { CloudEvent, HTTP } = require('cloudevents');
2+
const handler = require('./function').handler;
3+
4+
async function handle(context, event) {
5+
const startTime = new Date();
6+
7+
try {
8+
// Ensure event data is parsed correctly
9+
const eventData = event ? event : context.body;
10+
context.log.info(`Received event: ${JSON.stringify(eventData)}`);
11+
12+
// Call the handler function with the event data
13+
const result = await handler(eventData);
14+
const endTime = new Date();
15+
16+
context.log.info(`Function result: ${JSON.stringify(result)}`);
17+
const resultTime = (endTime - startTime) / 1000; // Time in seconds
18+
19+
// Create a response
20+
const response = {
21+
begin: startTime.toISOString(),
22+
end: endTime.toISOString(),
23+
results_time: resultTime,
24+
result: result
25+
};
26+
27+
// Return the response
28+
return {
29+
data: response,
30+
headers: { 'Content-Type': 'application/json' },
31+
statusCode: 200
32+
};
33+
} catch (error) {
34+
const endTime = new Date();
35+
const resultTime = (endTime - startTime) / 1000; // Time in seconds
36+
37+
context.log.error(`Error - invocation failed! Reason: ${error.message}`);
38+
const response = {
39+
begin: startTime.toISOString(),
40+
end: endTime.toISOString(),
41+
results_time: resultTime,
42+
result: `Error - invocation failed! Reason: ${error.message}`
43+
};
44+
45+
// Return the error response
46+
return {
47+
data: response,
48+
headers: { 'Content-Type': 'application/json' },
49+
statusCode: 500
50+
};
51+
}
52+
}
53+
54+
module.exports = handle;
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
2+
const minio = require('minio'),
3+
path = require('path'),
4+
uuid = require('uuid'),
5+
util = require('util'),
6+
stream = require('stream'),
7+
fs = require('fs');
8+
9+
class minio_storage {
10+
11+
constructor() {
12+
let address = process.env.MINIO_STORAGE_CONNECTION_URL;
13+
let access_key = process.env.MINIO_STORAGE_ACCESS_KEY;
14+
let secret_key = process.env.MINIO_STORAGE_SECRET_KEY;
15+
16+
this.client = new minio.Client(
17+
{
18+
endPoint: address.split(':')[0],
19+
port: parseInt(address.split(':')[1], 10),
20+
accessKey: access_key,
21+
secretKey: secret_key,
22+
useSSL: false
23+
}
24+
);
25+
}
26+
27+
unique_name(file) {
28+
let name = path.parse(file);
29+
let uuid_name = uuid.v4().split('-')[0];
30+
return path.join(name.dir, util.format('%s.%s%s', name.name, uuid_name, name.ext));
31+
}
32+
33+
upload(bucket, file, filepath) {
34+
let uniqueName = this.unique_name(file);
35+
return [uniqueName, this.client.fPutObject(bucket, uniqueName, filepath)];
36+
};
37+
38+
download(bucket, file, filepath) {
39+
return this.client.fGetObject(bucket, file, filepath);
40+
};
41+
42+
uploadStream(bucket, file) {
43+
var write_stream = new stream.PassThrough();
44+
let uniqueName = this.unique_name(file);
45+
let promise = this.client.putObject(bucket, uniqueName, write_stream, write_stream.size);
46+
return [write_stream, promise, uniqueName];
47+
};
48+
49+
downloadStream(bucket, file) {
50+
var read_stream = new stream.PassThrough();
51+
return this.client.getObject(bucket, file);
52+
};
53+
54+
static get_instance() {
55+
if(!this.instance) {
56+
this.instance = new storage();
57+
}
58+
return this.instance;
59+
}
60+
61+
62+
};
63+
exports.storage = minio_storage;
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import logging
2+
import datetime
3+
from flask import jsonify
4+
from parliament import Context
5+
from function import handler
6+
7+
8+
def main(context: Context):
9+
logging.getLogger().setLevel(logging.INFO)
10+
11+
try:
12+
# Extract JSON data from the request
13+
event = context.request.json
14+
15+
begin = datetime.datetime.now()
16+
# Pass the extracted JSON data to the handler function
17+
ret = handler(event)
18+
end = datetime.datetime.now()
19+
logging.info(f"Function result: {ret}")
20+
results_time = (end - begin) / datetime.timedelta(microseconds=1)
21+
22+
response = {
23+
"begin": begin.strftime("%s.%f"),
24+
"end": end.strftime("%s.%f"),
25+
"results_time": results_time,
26+
"result": ret,
27+
}
28+
29+
return jsonify(response), 200
30+
31+
except Exception as e:
32+
end = datetime.datetime.now()
33+
results_time = (end - begin) / datetime.timedelta(microseconds=1)
34+
logging.error(f"Error - invocation failed! Reason: {e}")
35+
response = {
36+
"begin": begin.strftime("%s.%f"),
37+
"end": end.strftime("%s.%f"),
38+
"results_time": results_time,
39+
"result": f"Error - invocation failed! Reason: {e}",
40+
}
41+
return jsonify(response), 500
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import os
2+
import uuid
3+
import json
4+
import minio
5+
import logging
6+
7+
8+
class storage:
9+
instance = None
10+
client = None
11+
12+
def __init__(self):
13+
try:
14+
"""
15+
Minio does not allow another way of configuring timeout for connection.
16+
The rest of configuration is copied from source code of Minio.
17+
"""
18+
import urllib3
19+
from datetime import timedelta
20+
21+
timeout = timedelta(seconds=1).seconds
22+
23+
mgr = urllib3.PoolManager(
24+
timeout=urllib3.util.Timeout(connect=timeout, read=timeout),
25+
maxsize=10,
26+
retries=urllib3.Retry(
27+
total=5, backoff_factor=0.2, status_forcelist=[500, 502, 503, 504]
28+
)
29+
)
30+
self.client = minio.Minio(
31+
os.getenv("MINIO_STORAGE_CONNECTION_URL"),
32+
access_key=os.getenv("MINIO_STORAGE_ACCESS_KEY"),
33+
secret_key=os.getenv("MINIO_STORAGE_SECRET_KEY"),
34+
secure=False,
35+
http_client=mgr
36+
)
37+
except Exception as e:
38+
logging.info(e)
39+
raise e
40+
41+
@staticmethod
42+
def unique_name(name):
43+
name, extension = os.path.splitext(name)
44+
return '{name}.{random}{extension}'.format(
45+
name=name,
46+
extension=extension,
47+
random=str(uuid.uuid4()).split('-')[0]
48+
)
49+
50+
51+
def upload(self, bucket, file, filepath):
52+
key_name = storage.unique_name(file)
53+
self.client.fput_object(bucket, key_name, filepath)
54+
return key_name
55+
56+
def download(self, bucket, file, filepath):
57+
self.client.fget_object(bucket, file, filepath)
58+
59+
def download_directory(self, bucket, prefix, path):
60+
objects = self.client.list_objects(bucket, prefix, recursive=True)
61+
for obj in objects:
62+
file_name = obj.object_name
63+
self.download(bucket, file_name, os.path.join(path, file_name))
64+
65+
def upload_stream(self, bucket, file, bytes_data):
66+
key_name = storage.unique_name(file)
67+
self.client.put_object(
68+
bucket, key_name, bytes_data, bytes_data.getbuffer().nbytes
69+
)
70+
return key_name
71+
72+
def download_stream(self, bucket, file):
73+
data = self.client.get_object(bucket, file)
74+
return data.read()
75+
76+
@staticmethod
77+
def get_instance():
78+
if storage.instance is None:
79+
storage.instance = storage()
80+
return storage.instance

config/systems.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@
249249
"username": "docker_user",
250250
"deployment": {
251251
"files": [
252-
"handler.py",
252+
"func.py",
253253
"storage.py"
254254
],
255255
"packages": {
@@ -269,7 +269,7 @@
269269
"username": "docker_user",
270270
"deployment": {
271271
"files": [
272-
"handler.js",
272+
"index.js",
273273
"storage.js"
274274
],
275275
"packages": []

sebs/knative/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from .knative import KnativeSystem # noqa
2+
from .config import KnativeConfig # noqa

sebs/knative/config.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from sebs.cache import Cache
2-
from sebs.faas.config import Credentials, Resources, Config
2+
from sebs.faas.config import Resources, Config
33
from sebs.utils import LoggingHandlers
44
from sebs.storage.config import MinioConfig
55

@@ -53,9 +53,9 @@ def registry_updated(self) -> bool:
5353
@staticmethod
5454
def initialize(res: Resources, dct: dict):
5555
ret = cast(KnativeResources, res)
56-
ret._docker_registry = dct["registry"]
57-
ret._docker_username = dct["username"]
58-
ret._docker_password = dct["password"]
56+
ret._docker_registry = dct.get("registry")
57+
ret._docker_username = dct.get("username")
58+
ret._docker_password = dct.get("password")
5959

6060
@staticmethod
6161
def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources:
@@ -101,7 +101,9 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resour
101101
# Check for new config
102102
if "storage" in config:
103103
ret._storage = MinioConfig.deserialize(config["storage"])
104-
ret.logging.info("Using user-provided configuration of storage for Knative.")
104+
ret.logging.info(
105+
"Using user-provided configuration of storage for Knative."
106+
)
105107

106108
# check if there has been an update
107109
if not (
@@ -122,21 +124,26 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resour
122124
and "resources" in cached_config
123125
and "storage" in cached_config["resources"]
124126
):
125-
ret._storage = MinioConfig.deserialize(cached_config["resources"]["storage"])
127+
ret._storage = MinioConfig.deserialize(
128+
cached_config["resources"]["storage"]
129+
)
126130
ret.logging.info("Using cached configuration of storage for Knative.")
127131

128132
return ret
129133

130134
def update_cache(self, cache: Cache):
131135
super().update_cache(cache)
132136
cache.update_config(
133-
val=self.docker_registry, keys=["knative", "resources", "docker", "registry"]
137+
val=self.docker_registry,
138+
keys=["knative", "resources", "docker", "registry"],
134139
)
135140
cache.update_config(
136-
val=self.docker_username, keys=["knative", "resources", "docker", "username"]
141+
val=self.docker_username,
142+
keys=["knative", "resources", "docker", "username"],
137143
)
138144
cache.update_config(
139-
val=self.docker_password, keys=["knative", "resources", "docker", "password"]
145+
val=self.docker_password,
146+
keys=["knative", "resources", "docker", "password"],
140147
)
141148
if self._storage:
142149
self._storage.update_cache(["knative", "resources", "storage"], cache)
@@ -186,7 +193,7 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config
186193
KnativeResources, KnativeResources.deserialize(config, cache, handlers)
187194
)
188195

189-
res = KnativeConfig(config, cached_config)
196+
res = KnativeConfig(config, cache)
190197
res.logging_handlers = handlers
191198
res._resources = resources
192199
return res

sebs/knative/function.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from sebs.faas.function import Function, FunctionConfig, Runtime
88
from sebs.storage.config import MinioConfig
99

10+
1011
@dataclass
1112
class KnativeFunctionConfig(FunctionConfig):
1213
docker_image: str = ""
@@ -31,9 +32,14 @@ def from_benchmark(benchmark: Benchmark) -> KnativeFunctionConfig:
3132
benchmark, KnativeFunctionConfig
3233
)
3334

35+
3436
class KnativeFunction(Function):
3537
def __init__(
36-
self, name: str, benchmark: str, code_package_hash: str, cfg: KnativeFunctionConfig
38+
self,
39+
name: str,
40+
benchmark: str,
41+
code_package_hash: str,
42+
cfg: KnativeFunctionConfig,
3743
):
3844
super().__init__(benchmark, name, code_package_hash, cfg)
3945

@@ -55,12 +61,17 @@ def deserialize(cached_config: dict) -> KnativeFunction:
5561

5662
cfg = KnativeFunctionConfig.deserialize(cached_config["config"])
5763
ret = KnativeFunction(
58-
cached_config["name"], cached_config["benchmark"], cached_config["hash"], cfg
64+
cached_config["name"],
65+
cached_config["benchmark"],
66+
cached_config["hash"],
67+
cfg,
5968
)
6069
for trigger in cached_config["triggers"]:
6170
trigger_type = cast(
6271
Trigger,
63-
{"Library": KnativeLibraryTrigger, "HTTP": KnativeHTTPTrigger}.get(trigger["type"]),
72+
{"Library": KnativeLibraryTrigger, "HTTP": KnativeHTTPTrigger}.get(
73+
trigger["type"]
74+
),
6475
)
6576
assert trigger_type, "Unknown trigger type {}".format(trigger["type"])
6677
ret.add_trigger(trigger_type.deserialize(trigger))

0 commit comments

Comments
 (0)