From 4112380714cd2dd2377e32c9fc2767962a12b73b Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Wed, 13 Apr 2022 11:08:39 +0800 Subject: [PATCH 1/6] Use queue instead of memory queue in webhook send service --- modules/sync/unique_queue.go | 104 ----------------------------------- routers/init.go | 2 +- services/webhook/deliver.go | 59 ++++++-------------- services/webhook/webhook.go | 49 ++++++++++++++--- 4 files changed, 59 insertions(+), 155 deletions(-) delete mode 100644 modules/sync/unique_queue.go diff --git a/modules/sync/unique_queue.go b/modules/sync/unique_queue.go deleted file mode 100644 index df115d7c96cc9..0000000000000 --- a/modules/sync/unique_queue.go +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright 2016 The Gogs Authors. All rights reserved. -// Copyright 2019 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package sync - -// UniqueQueue is a queue which guarantees only one instance of same -// identity is in the line. Instances with same identity will be -// discarded if there is already one in the line. -// -// This queue is particularly useful for preventing duplicated task -// of same purpose. -type UniqueQueue struct { - table *StatusTable - queue chan string - closed chan struct{} -} - -// NewUniqueQueue initializes and returns a new UniqueQueue object. -func NewUniqueQueue(queueLength int) *UniqueQueue { - if queueLength <= 0 { - queueLength = 100 - } - - return &UniqueQueue{ - table: NewStatusTable(), - queue: make(chan string, queueLength), - closed: make(chan struct{}), - } -} - -// Close closes this queue -func (q *UniqueQueue) Close() { - select { - case <-q.closed: - default: - q.table.lock.Lock() - select { - case <-q.closed: - default: - close(q.closed) - } - q.table.lock.Unlock() - } -} - -// IsClosed returns a channel that is closed when this Queue is closed -func (q *UniqueQueue) IsClosed() <-chan struct{} { - return q.closed -} - -// IDs returns the current ids in the pool -func (q *UniqueQueue) IDs() []string { - q.table.lock.Lock() - defer q.table.lock.Unlock() - ids := make([]string, 0, len(q.table.pool)) - for id := range q.table.pool { - ids = append(ids, id) - } - return ids -} - -// Queue returns channel of queue for retrieving instances. -func (q *UniqueQueue) Queue() <-chan string { - return q.queue -} - -// Exist returns true if there is an instance with given identity -// exists in the queue. -func (q *UniqueQueue) Exist(id string) bool { - return q.table.IsRunning(id) -} - -// AddFunc adds new instance to the queue with a custom runnable function, -// the queue is blocked until the function exits. -func (q *UniqueQueue) AddFunc(id string, fn func()) { - q.table.lock.Lock() - if _, ok := q.table.pool[id]; ok { - q.table.lock.Unlock() - return - } - q.table.pool[id] = struct{}{} - if fn != nil { - fn() - } - q.table.lock.Unlock() - select { - case <-q.closed: - return - case q.queue <- id: - return - } -} - -// Add adds new instance to the queue. -func (q *UniqueQueue) Add(id string) { - q.AddFunc(id, nil) -} - -// Remove removes instance from the queue. -func (q *UniqueQueue) Remove(id string) { - q.table.Stop(id) -} diff --git a/routers/init.go b/routers/init.go index 88c393736ef48..403fab00cd3b2 100644 --- a/routers/init.go +++ b/routers/init.go @@ -145,7 +145,7 @@ func GlobalInitInstalled(ctx context.Context) { mustInit(stats_indexer.Init) mirror_service.InitSyncMirrors() - webhook.InitDeliverHooks() + mustInit(webhook.Init) mustInit(pull_service.Init) mustInit(task.Init) mustInit(repo_migrations.Init) diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index 7998be53c2831..bd06d84d77b3a 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -15,7 +15,6 @@ import ( "io" "net/http" "net/url" - "strconv" "strings" "sync" "time" @@ -26,6 +25,7 @@ import ( "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/proxy" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" "github.com/gobwas/glob" @@ -202,17 +202,19 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { return nil } -// DeliverHooks checks and delivers undelivered hooks. +// populateDeliverHooks checks and delivers undelivered hooks. // FIXME: graceful: This would likely benefit from either a worker pool with dummy queue // or a full queue. Then more hooks could be sent at same time. -func DeliverHooks(ctx context.Context) { +func populateDeliverHooks(ctx context.Context) { select { case <-ctx.Done(): return default: } + ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: DeliverHooks", process.SystemProcessType, true) defer finished() + tasks, err := webhook_model.FindUndeliveredHookTasks() if err != nil { log.Error("DeliverHooks: %v", err) @@ -226,42 +228,9 @@ func DeliverHooks(ctx context.Context) { return default: } - if err = Deliver(ctx, t); err != nil { - log.Error("deliver: %v", err) - } - } - - // Start listening on new hook requests. - for { - select { - case <-ctx.Done(): - hookQueue.Close() - return - case repoIDStr := <-hookQueue.Queue(): - log.Trace("DeliverHooks [repo_id: %v]", repoIDStr) - hookQueue.Remove(repoIDStr) - - repoID, err := strconv.ParseInt(repoIDStr, 10, 64) - if err != nil { - log.Error("Invalid repo ID: %s", repoIDStr) - continue - } - tasks, err := webhook_model.FindRepoUndeliveredHookTasks(repoID) - if err != nil { - log.Error("Get repository [%d] hook tasks: %v", repoID, err) - continue - } - for _, t := range tasks { - select { - case <-ctx.Done(): - return - default: - } - if err = Deliver(ctx, t); err != nil { - log.Error("deliver: %v", err) - } - } + if err := addToTask(t.RepoID); err != nil { + log.Error("DeliverHook failed [%d]: %v", t.RepoID, err) } } } @@ -297,8 +266,8 @@ func webhookProxy() func(req *http.Request) (*url.URL, error) { } } -// InitDeliverHooks starts the hooks delivery thread -func InitDeliverHooks() { +// Init starts the hooks delivery thread +func Init() error { timeout := time.Duration(setting.Webhook.DeliverTimeout) * time.Second allowedHostListValue := setting.Webhook.AllowedHostList @@ -316,5 +285,13 @@ func InitDeliverHooks() { }, } - go graceful.GetManager().RunWithShutdownContext(DeliverHooks) + hookQueue = queue.CreateUniqueQueue("webhook_sender", handle, "") + if hookQueue == nil { + return fmt.Errorf("Unable to create webhook_sender Queue") + } + go graceful.GetManager().RunWithShutdownFns(hookQueue.Run) + + populateDeliverHooks(graceful.GetManager().HammerContext()) + + return nil } diff --git a/services/webhook/webhook.go b/services/webhook/webhook.go index a3efc7535fc3c..b15b8173f51fe 100644 --- a/services/webhook/webhook.go +++ b/services/webhook/webhook.go @@ -12,10 +12,11 @@ import ( repo_model "code.gitea.io/gitea/models/repo" webhook_model "code.gitea.io/gitea/models/webhook" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" api "code.gitea.io/gitea/modules/structs" - "code.gitea.io/gitea/modules/sync" "code.gitea.io/gitea/modules/util" "github.com/gobwas/glob" @@ -80,7 +81,7 @@ func IsValidHookTaskType(name string) bool { } // hookQueue is a global queue of web hooks -var hookQueue = sync.NewUniqueQueue(setting.Webhook.QueueLength) +var hookQueue queue.UniqueQueue // getPayloadBranch returns branch for hook event, if applicable. func getPayloadBranch(p api.Payloader) string { @@ -101,14 +102,47 @@ func getPayloadBranch(p api.Payloader) string { return "" } +// handle passed PR IDs and test the PRs +func handle(data ...queue.Data) []queue.Data { + for _, datum := range data { + repoIDStr := datum.(string) + log.Trace("DeliverHooks [repo_id: %v]", repoIDStr) + + repoID, err := strconv.ParseInt(repoIDStr, 10, 64) + if err != nil { + log.Error("Invalid repo ID: %s", repoIDStr) + continue + } + + tasks, err := webhook_model.FindRepoUndeliveredHookTasks(repoID) + if err != nil { + log.Error("Get repository [%d] hook tasks: %v", repoID, err) + continue + } + for _, t := range tasks { + if err = Deliver(graceful.GetManager().HammerContext(), t); err != nil { + log.Error("deliver: %v", err) + } + } + } + return nil +} + +func addToTask(repoID int64) error { + err := hookQueue.PushFunc(strconv.FormatInt(repoID, 10), nil) + if err != nil && err != queue.ErrAlreadyInQueue { + return err + } + return nil +} + // PrepareWebhook adds special webhook to task queue for given payload. func PrepareWebhook(w *webhook_model.Webhook, repo *repo_model.Repository, event webhook_model.HookEventType, p api.Payloader) error { if err := prepareWebhook(w, repo, event, p); err != nil { return err } - go hookQueue.Add(strconv.FormatInt(repo.ID, 10)) - return nil + return addToTask(repo.ID) } func checkBranch(w *webhook_model.Webhook, branch string) bool { @@ -188,8 +222,7 @@ func PrepareWebhooks(repo *repo_model.Repository, event webhook_model.HookEventT return err } - go hookQueue.Add(strconv.FormatInt(repo.ID, 10)) - return nil + return addToTask(repo.ID) } func prepareWebhooks(repo *repo_model.Repository, event webhook_model.HookEventType, p api.Payloader) error { @@ -240,7 +273,5 @@ func ReplayHookTask(w *webhook_model.Webhook, uuid string) error { return err } - go hookQueue.Add(strconv.FormatInt(t.RepoID, 10)) - - return nil + return addToTask(t.RepoID) } From 9d5aa4587b71d704cc042d79738d95cd1be561ed Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Sun, 17 Apr 2022 23:55:14 +0800 Subject: [PATCH 2/6] fix bug --- routers/api/v1/repo/main_test.go | 6 ++++++ services/webhook/main_test.go | 4 ++++ 2 files changed, 10 insertions(+) diff --git a/routers/api/v1/repo/main_test.go b/routers/api/v1/repo/main_test.go index 19e524d014f76..bb9289f258c0c 100644 --- a/routers/api/v1/repo/main_test.go +++ b/routers/api/v1/repo/main_test.go @@ -9,10 +9,16 @@ import ( "testing" "code.gitea.io/gitea/models/unittest" + "code.gitea.io/gitea/modules/setting" + webhook_service "code.gitea.io/gitea/services/webhook" ) func TestMain(m *testing.M) { + setting.LoadForTest() unittest.MainTest(m, &unittest.TestOptions{ GiteaRootPath: filepath.Join("..", "..", "..", ".."), + SetUp: func() error { + return webhook_service.Init() + }, }) } diff --git a/services/webhook/main_test.go b/services/webhook/main_test.go index 25b9df0af6688..538ce34788afd 100644 --- a/services/webhook/main_test.go +++ b/services/webhook/main_test.go @@ -9,6 +9,7 @@ import ( "testing" "code.gitea.io/gitea/models/unittest" + webhook_service "code.gitea.io/gitea/services/webhook" _ "code.gitea.io/gitea/models" ) @@ -16,5 +17,8 @@ import ( func TestMain(m *testing.M) { unittest.MainTest(m, &unittest.TestOptions{ GiteaRootPath: filepath.Join("..", ".."), + SetUp: func() error { + return webhook_service.Init() + }, }) } From 95043a3ca1eb61cc04a131ad636afecfd0767fe3 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Mon, 25 Apr 2022 15:05:15 +0800 Subject: [PATCH 3/6] Fix test --- routers/api/v1/repo/main_test.go | 4 +--- services/webhook/deliver.go | 4 ---- services/webhook/main_test.go | 5 +---- 3 files changed, 2 insertions(+), 11 deletions(-) diff --git a/routers/api/v1/repo/main_test.go b/routers/api/v1/repo/main_test.go index bb9289f258c0c..22cb3a8e6847e 100644 --- a/routers/api/v1/repo/main_test.go +++ b/routers/api/v1/repo/main_test.go @@ -17,8 +17,6 @@ func TestMain(m *testing.M) { setting.LoadForTest() unittest.MainTest(m, &unittest.TestOptions{ GiteaRootPath: filepath.Join("..", "..", "..", ".."), - SetUp: func() error { - return webhook_service.Init() - }, + SetUp: webhook_service.Init, }) } diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index bd06d84d77b3a..77744473f1ce3 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -203,18 +203,14 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { } // populateDeliverHooks checks and delivers undelivered hooks. -// FIXME: graceful: This would likely benefit from either a worker pool with dummy queue -// or a full queue. Then more hooks could be sent at same time. func populateDeliverHooks(ctx context.Context) { select { case <-ctx.Done(): return default: } - ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: DeliverHooks", process.SystemProcessType, true) defer finished() - tasks, err := webhook_model.FindUndeliveredHookTasks() if err != nil { log.Error("DeliverHooks: %v", err) diff --git a/services/webhook/main_test.go b/services/webhook/main_test.go index 538ce34788afd..e3b6f2f671b60 100644 --- a/services/webhook/main_test.go +++ b/services/webhook/main_test.go @@ -9,7 +9,6 @@ import ( "testing" "code.gitea.io/gitea/models/unittest" - webhook_service "code.gitea.io/gitea/services/webhook" _ "code.gitea.io/gitea/models" ) @@ -17,8 +16,6 @@ import ( func TestMain(m *testing.M) { unittest.MainTest(m, &unittest.TestOptions{ GiteaRootPath: filepath.Join("..", ".."), - SetUp: func() error { - return webhook_service.Init() - }, + SetUp: Init, }) } From 94cbce43932ae604f5d8cf03613e2eff6ed73f87 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Mon, 25 Apr 2022 20:57:04 +0800 Subject: [PATCH 4/6] Fix block --- services/webhook/main_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/services/webhook/main_test.go b/services/webhook/main_test.go index e3b6f2f671b60..f13288961fb29 100644 --- a/services/webhook/main_test.go +++ b/services/webhook/main_test.go @@ -9,6 +9,7 @@ import ( "testing" "code.gitea.io/gitea/models/unittest" + "code.gitea.io/gitea/modules/setting" _ "code.gitea.io/gitea/models" ) @@ -16,6 +17,9 @@ import ( func TestMain(m *testing.M) { unittest.MainTest(m, &unittest.TestOptions{ GiteaRootPath: filepath.Join("..", ".."), - SetUp: Init, + SetUp: func() error { + setting.NewQueueService() + return Init() + }, }) } From d7e0d499b736b2f23767e2ce8fb680a61e5d80f8 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Tue, 26 Apr 2022 00:10:09 +0800 Subject: [PATCH 5/6] Fix test --- services/webhook/main_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/services/webhook/main_test.go b/services/webhook/main_test.go index f13288961fb29..6254f482c71d0 100644 --- a/services/webhook/main_test.go +++ b/services/webhook/main_test.go @@ -15,11 +15,9 @@ import ( ) func TestMain(m *testing.M) { + setting.LoadForTest() unittest.MainTest(m, &unittest.TestOptions{ GiteaRootPath: filepath.Join("..", ".."), - SetUp: func() error { - setting.NewQueueService() - return Init() - }, + SetUp: Init, }) } From 632dfdc9d6b12d4aafb4b692047a3a8911006065 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Tue, 26 Apr 2022 00:54:00 +0800 Subject: [PATCH 6/6] Fix test --- routers/api/v1/repo/main_test.go | 1 + services/webhook/main_test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/routers/api/v1/repo/main_test.go b/routers/api/v1/repo/main_test.go index 22cb3a8e6847e..1f91a24937186 100644 --- a/routers/api/v1/repo/main_test.go +++ b/routers/api/v1/repo/main_test.go @@ -15,6 +15,7 @@ import ( func TestMain(m *testing.M) { setting.LoadForTest() + setting.NewQueueService() unittest.MainTest(m, &unittest.TestOptions{ GiteaRootPath: filepath.Join("..", "..", "..", ".."), SetUp: webhook_service.Init, diff --git a/services/webhook/main_test.go b/services/webhook/main_test.go index 6254f482c71d0..1dc2e1bd83fb3 100644 --- a/services/webhook/main_test.go +++ b/services/webhook/main_test.go @@ -16,6 +16,7 @@ import ( func TestMain(m *testing.M) { setting.LoadForTest() + setting.NewQueueService() unittest.MainTest(m, &unittest.TestOptions{ GiteaRootPath: filepath.Join("..", ".."), SetUp: Init,