Skip to content

Commit 9013f1a

Browse files
committed
add mux as http router. add support to path methods and variables. refine some features. fix some issues. add tests.
close OpenFunction#51 Signed-off-by: Lize Cai <[email protected]>
1 parent 83d5da8 commit 9013f1a

24 files changed

+757
-34
lines changed

.github/workflows/main.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ jobs:
5555
e2e: "test/declarative/sync-http-multiple/e2e.yaml"
5656
- name: Declarative multiple Sync Cloudevent e2e test
5757
e2e: "test/declarative/sync-cloudevent-multiple/e2e.yaml"
58+
- name: Declarative multiple functions with variables e2e test
59+
e2e: "test/declarative/sync-http-variables/e2e.yaml"
5860
steps:
5961
- uses: actions/checkout@v2
6062

context/context.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
cloudevents "github.com/cloudevents/sdk-go/v2"
1717
dapr "github.com/dapr/go-sdk/client"
1818
"github.com/dapr/go-sdk/service/common"
19+
"github.com/gorilla/mux"
1920
"k8s.io/klog/v2"
2021
agentv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
2122
)
@@ -886,3 +887,25 @@ func ConvertUserDataToBytes(data interface{}) []byte {
886887
return d
887888
}
888889
}
890+
891+
type contextKey int
892+
893+
const (
894+
varsKey contextKey = iota
895+
)
896+
897+
// Vars returns the route variables for the current request, if any.
898+
var (
899+
Vars = mux.Vars
900+
)
901+
902+
func CtxWithVars(ctx context.Context, vars map[string]string) context.Context {
903+
return context.WithValue(ctx, varsKey, vars)
904+
}
905+
906+
func VarsFromCtx(ctx context.Context) map[string]string {
907+
if rv := ctx.Value(varsKey); rv != nil {
908+
return rv.(map[string]string)
909+
}
910+
return nil
911+
}

context/context_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"os"
55
"strings"
66
"testing"
7+
"net/http"
8+
"reflect"
79
)
810

911
var (
@@ -292,3 +294,36 @@ func TestParseFunctionContext(t *testing.T) {
292294
t.Fatal("Error set function context env")
293295
}
294296
}
297+
298+
299+
func TestGetVarsFromContext(t *testing.T) {
300+
301+
tests := []struct {
302+
name string
303+
request *http.Request
304+
vars map[string]string
305+
}{
306+
{
307+
name: "single variable",
308+
request: &http.Request{},
309+
vars: map[string]string{"key1": "val1"},
310+
},
311+
{
312+
name: "multi variables",
313+
request: &http.Request{},
314+
vars: map[string]string{"key1": "val1", "key2": "val2"},
315+
},
316+
}
317+
for _, tt := range tests {
318+
t.Run(tt.name, func(t *testing.T) {
319+
r := tt.request
320+
ctx := r.Context()
321+
ctx = CtxWithVars(ctx, tt.vars)
322+
got := VarsFromCtx(ctx)
323+
if !reflect.DeepEqual(got, tt.vars) {
324+
t.Errorf("VarsFromCtx = %v, want %v", got, tt.vars)
325+
}
326+
})
327+
}
328+
329+
}

framework/framework.go

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"net/http"
77
"os"
8+
"fmt"
89

910
cloudevents "github.com/cloudevents/sdk-go/v2"
1011
"k8s.io/klog/v2"
@@ -22,6 +23,7 @@ import (
2223

2324
type functionsFrameworkImpl struct {
2425
funcContext ofctx.RuntimeContext
26+
funcContextMap map[string]ofctx.RuntimeContext
2527
prePlugins []plugin.Plugin
2628
postPlugins []plugin.Plugin
2729
pluginMap map[string]plugin.Plugin
@@ -34,6 +36,7 @@ type Framework interface {
3436
Register(ctx context.Context, fn interface{}) error
3537
RegisterPlugins(customPlugins map[string]plugin.Plugin)
3638
Start(ctx context.Context) error
39+
StartRegisteringFunctions(ctx context.Context) error
3740
GetRuntime() runtime.Interface
3841
}
3942

@@ -50,6 +53,8 @@ func NewFramework() (*functionsFrameworkImpl, error) {
5053
} else {
5154
fwk.funcContext = ctx
5255
}
56+
// for multi functions use cases
57+
fwk.funcContextMap = map[string]ofctx.RuntimeContext{}
5358

5459
// Scan the local directory and register the plugins if exist
5560
// Register the framework default plugins under `plugin` directory
@@ -100,7 +105,8 @@ func (fwk *functionsFrameworkImpl) Register(ctx context.Context, fn interface{})
100105
return nil
101106
}
102107

103-
func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {
108+
func (fwk *functionsFrameworkImpl) StartRegisteringFunctions(ctx context.Context) error {
109+
104110

105111
target := os.Getenv("FUNCTION_TARGET")
106112

@@ -110,14 +116,25 @@ func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {
110116
klog.Infof("registering function: %s on path: %s", target, fn.GetPath())
111117
switch fn.GetFunctionType() {
112118
case functions.HTTPType:
113-
fwk.Register(ctx, fn.GetHTTPFunction())
119+
if err := fwk.Register(ctx, fn.GetHTTPFunction()); err != nil {
120+
klog.Errorf("failed to register function: %v", err)
121+
return err
122+
}
114123
case functions.CloudEventType:
115-
fwk.Register(ctx, fn.GetCloudEventFunction())
124+
if err := fwk.Register(ctx, fn.GetCloudEventFunction()); err != nil {
125+
klog.Errorf("failed to register function: %v", err)
126+
return err
127+
}
116128
case functions.OpenFunctionType:
117-
fwk.Register(ctx, fn.GetOpenFunctionFunction())
129+
if err := fwk.Register(ctx, fn.GetOpenFunctionFunction()); err != nil {
130+
klog.Errorf("failed to register function: %v", err)
131+
return err
132+
}
133+
default:
134+
return fmt.Errorf("Unkown function type: %s", fn.GetFunctionType())
118135
}
119136
} else {
120-
klog.Errorf("function not found: %s", target)
137+
return fmt.Errorf("function not found: %s", target)
121138
}
122139
} else {
123140
// if FUNCTION_TARGET is not provided but user uses declarative function, by default all registered functions will be deployed.
@@ -129,19 +146,26 @@ func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {
129146
for _, name := range funcNames {
130147
if rf, ok := fwk.registry.GetRegisteredFunction(name); ok {
131148
klog.Infof("registering function: %s on path: %s", rf.GetName(), rf.GetPath())
149+
// Parse OpenFunction FunctionContext
150+
if ctx, err := ofctx.GetRuntimeContext(); err != nil {
151+
klog.Errorf("failed to parse OpenFunction FunctionContext: %v\n", err)
152+
return err
153+
} else {
154+
fwk.funcContextMap[rf.GetName()] = ctx
155+
}
132156
switch rf.GetFunctionType() {
133157
case functions.HTTPType:
134-
if err := fwk.runtime.RegisterHTTPFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil {
158+
if err := fwk.runtime.RegisterHTTPFunction(fwk.funcContextMap[rf.GetName()], fwk.prePlugins, fwk.postPlugins, rf); err != nil {
135159
klog.Errorf("failed to register function: %v", err)
136160
return err
137161
}
138162
case functions.CloudEventType:
139-
if err := fwk.runtime.RegisterCloudEventFunction(ctx, fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil {
163+
if err := fwk.runtime.RegisterCloudEventFunction(ctx, fwk.funcContextMap[rf.GetName()], fwk.prePlugins, fwk.postPlugins, rf); err != nil {
140164
klog.Errorf("failed to register function: %v", err)
141165
return err
142166
}
143167
case functions.OpenFunctionType:
144-
if err := fwk.runtime.RegisterOpenFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil {
168+
if err := fwk.runtime.RegisterOpenFunction(fwk.funcContextMap[rf.GetName()], fwk.prePlugins, fwk.postPlugins, rf); err != nil {
145169
klog.Errorf("failed to register function: %v", err)
146170
return err
147171
}
@@ -150,8 +174,18 @@ func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {
150174
}
151175
}
152176
}
177+
return nil
178+
}
179+
180+
func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {
181+
182+
err := fwk.StartRegisteringFunctions(ctx)
183+
if err != nil {
184+
klog.Error("failed to start registering functions")
185+
return err
186+
}
153187

154-
err := fwk.runtime.Start(ctx)
188+
err = fwk.runtime.Start(ctx)
155189
if err != nil {
156190
klog.Error("failed to start runtime service")
157191
return err

framework/framework_test.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/stretchr/testify/assert"
1919

2020
ofctx "github.com/OpenFunction/functions-framework-go/context"
21+
"github.com/OpenFunction/functions-framework-go/functions"
2122
"github.com/OpenFunction/functions-framework-go/runtime/async"
2223
)
2324

@@ -166,6 +167,158 @@ func TestCloudEventFunction(t *testing.T) {
166167
}
167168
}
168169

170+
171+
func TestMultipleFunctions(t *testing.T) {
172+
env := `{
173+
"name": "function-demo",
174+
"version": "v1.0.0",
175+
"port": "8080",
176+
"runtime": "Knative",
177+
"httpPattern": "/"
178+
}`
179+
var ceDemo = struct {
180+
message map[string]string
181+
headers map[string]string
182+
}{
183+
message: map[string]string{
184+
"msg": "Hello World!",
185+
},
186+
headers: map[string]string{
187+
"Ce-Specversion": "1.0",
188+
"Ce-Type": "cloudevents.openfunction.samples.helloworld",
189+
"Ce-Source": "cloudevents.openfunction.samples/helloworldsource",
190+
"Ce-Id": "536808d3-88be-4077-9d7a-a3f162705f79",
191+
},
192+
}
193+
194+
ctx := context.Background()
195+
fwk, err := createFramework(env)
196+
if err != nil {
197+
t.Fatalf("failed to create framework: %v", err)
198+
}
199+
200+
fwk.RegisterPlugins(nil)
201+
202+
// register multiple functions
203+
functions.HTTP("http", fakeHTTPFunction,
204+
functions.WithFunctionPath("/http"),
205+
functions.WithFunctionMethods("GET"),
206+
)
207+
208+
functions.CloudEvent("ce", fakeCloudEventsFunction,
209+
functions.WithFunctionPath("/ce"),
210+
)
211+
212+
functions.OpenFunction("ofn", fakeBindingsFunction,
213+
functions.WithFunctionPath("/ofn"),
214+
functions.WithFunctionMethods("GET", "POST"),
215+
)
216+
217+
if err := fwk.StartRegisteringFunctions(ctx); err != nil {
218+
t.Fatalf("failed to start registering functions: %v", err)
219+
}
220+
221+
if fwk.GetRuntime() == nil {
222+
t.Fatal("failed to create runtime")
223+
}
224+
handler := fwk.GetRuntime().GetHandler()
225+
if handler == nil {
226+
t.Fatal("handler is nil")
227+
}
228+
229+
srv := httptest.NewServer(handler.(http.Handler))
230+
defer srv.Close()
231+
232+
// test http
233+
t.Run("sending http", func(t *testing.T) {
234+
resp, err := http.Get(srv.URL + "/http")
235+
if err != nil {
236+
t.Fatalf("http.Get: %v", err)
237+
}
238+
239+
defer resp.Body.Close()
240+
body, err := ioutil.ReadAll(resp.Body)
241+
if err != nil {
242+
t.Fatalf("ioutil.ReadAll: %v", err)
243+
}
244+
245+
if got, want := string(body), "Hello World!"; got != want {
246+
t.Fatalf("TestHTTPFunction: got %v; want %v", got, want)
247+
}
248+
})
249+
250+
// test http to openfunction
251+
t.Run("sending http to openfunction", func(t *testing.T) {
252+
resp, err := http.Get(srv.URL + "/ofn")
253+
if err != nil {
254+
t.Fatalf("http.Get: %v", err)
255+
}
256+
257+
defer resp.Body.Close()
258+
body, err := ioutil.ReadAll(resp.Body)
259+
if err != nil {
260+
t.Fatalf("ioutil.ReadAll: %v", err)
261+
}
262+
263+
if got, want := string(body), "hello there"; got != want {
264+
t.Fatalf("TestHTTPFunction: got %v; want %v", got, want)
265+
}
266+
})
267+
268+
// test cloudevent
269+
t.Run("sending cloudevent", func(t *testing.T) {
270+
messageByte, err := json.Marshal(ceDemo.message)
271+
if err != nil {
272+
t.Fatalf("failed to marshal message: %v", err)
273+
}
274+
275+
req, err := http.NewRequest("POST", srv.URL+"/ce", bytes.NewBuffer(messageByte))
276+
if err != nil {
277+
t.Fatalf("error creating HTTP request for test: %v", err)
278+
}
279+
req.Header.Set("Content-Type", "application/json")
280+
for k, v := range ceDemo.headers {
281+
req.Header.Set(k, v)
282+
}
283+
client := &http.Client{}
284+
resp, err := client.Do(req)
285+
if err != nil {
286+
t.Fatalf("failed to do client.Do: %v", err)
287+
}
288+
289+
if resp.StatusCode != http.StatusOK {
290+
t.Fatalf("failed to test cloudevents function: response status = %v, want %v", resp.StatusCode, http.StatusOK)
291+
}
292+
})
293+
294+
// test cloudevent to openfunction
295+
t.Run("sending cloudevent to openfunction", func(t *testing.T) {
296+
messageByte, err := json.Marshal(ceDemo.message)
297+
if err != nil {
298+
t.Fatalf("failed to marshal message: %v", err)
299+
}
300+
301+
req, err := http.NewRequest("POST", srv.URL+"/ofn", bytes.NewBuffer(messageByte))
302+
if err != nil {
303+
t.Fatalf("error creating HTTP request for test: %v", err)
304+
}
305+
req.Header.Set("Content-Type", "application/json")
306+
for k, v := range ceDemo.headers {
307+
req.Header.Set(k, v)
308+
}
309+
client := &http.Client{}
310+
resp, err := client.Do(req)
311+
if err != nil {
312+
t.Fatalf("failed to do client.Do: %v", err)
313+
}
314+
315+
if resp.StatusCode != http.StatusOK {
316+
t.Fatalf("failed to test cloudevents function: response status = %v, want %v", resp.StatusCode, http.StatusOK)
317+
}
318+
})
319+
320+
}
321+
169322
func TestAsyncBindingsFunction(t *testing.T) {
170323
env := `{
171324
"name": "function-demo",

functions/options.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ import (
77
type FunctionOption = functions.FunctionOption
88

99
var (
10-
WithFunctionPath = functions.WithFunctionPath
10+
WithFunctionPath = functions.WithFunctionPath
11+
WithFunctionMethods = functions.WithFunctionMethods
1112
)

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ require (
1010
github.com/fatih/structs v1.1.0
1111
github.com/golang/protobuf v1.5.2
1212
github.com/google/uuid v1.3.0
13+
github.com/gorilla/mux v1.8.0
1314
github.com/pkg/errors v0.9.1
1415
github.com/stretchr/testify v1.7.0
1516
google.golang.org/grpc v1.40.0

0 commit comments

Comments
 (0)