Skip to content

Commit 7c164d5

Browse files
authored
Use queue instead of memory queue in webhook send service (#19390)
1 parent 257cea6 commit 7c164d5

File tree

6 files changed

+66
-157
lines changed

6 files changed

+66
-157
lines changed

modules/sync/unique_queue.go

-104
This file was deleted.

routers/api/v1/repo/main_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,15 @@ import (
99
"testing"
1010

1111
"code.gitea.io/gitea/models/unittest"
12+
"code.gitea.io/gitea/modules/setting"
13+
webhook_service "code.gitea.io/gitea/services/webhook"
1214
)
1315

1416
func TestMain(m *testing.M) {
17+
setting.LoadForTest()
18+
setting.NewQueueService()
1519
unittest.MainTest(m, &unittest.TestOptions{
1620
GiteaRootPath: filepath.Join("..", "..", "..", ".."),
21+
SetUp: webhook_service.Init,
1722
})
1823
}

routers/init.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func GlobalInitInstalled(ctx context.Context) {
145145
mustInit(stats_indexer.Init)
146146

147147
mirror_service.InitSyncMirrors()
148-
webhook.InitDeliverHooks()
148+
mustInit(webhook.Init)
149149
mustInit(pull_service.Init)
150150
mustInit(task.Init)
151151
mustInit(repo_migrations.Init)

services/webhook/deliver.go

+16-43
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"io"
1616
"net/http"
1717
"net/url"
18-
"strconv"
1918
"strings"
2019
"sync"
2120
"time"
@@ -26,6 +25,7 @@ import (
2625
"code.gitea.io/gitea/modules/log"
2726
"code.gitea.io/gitea/modules/process"
2827
"code.gitea.io/gitea/modules/proxy"
28+
"code.gitea.io/gitea/modules/queue"
2929
"code.gitea.io/gitea/modules/setting"
3030

3131
"github.com/gobwas/glob"
@@ -202,10 +202,8 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
202202
return nil
203203
}
204204

205-
// DeliverHooks checks and delivers undelivered hooks.
206-
// FIXME: graceful: This would likely benefit from either a worker pool with dummy queue
207-
// or a full queue. Then more hooks could be sent at same time.
208-
func DeliverHooks(ctx context.Context) {
205+
// populateDeliverHooks checks and delivers undelivered hooks.
206+
func populateDeliverHooks(ctx context.Context) {
209207
select {
210208
case <-ctx.Done():
211209
return
@@ -226,42 +224,9 @@ func DeliverHooks(ctx context.Context) {
226224
return
227225
default:
228226
}
229-
if err = Deliver(ctx, t); err != nil {
230-
log.Error("deliver: %v", err)
231-
}
232-
}
233-
234-
// Start listening on new hook requests.
235-
for {
236-
select {
237-
case <-ctx.Done():
238-
hookQueue.Close()
239-
return
240-
case repoIDStr := <-hookQueue.Queue():
241-
log.Trace("DeliverHooks [repo_id: %v]", repoIDStr)
242-
hookQueue.Remove(repoIDStr)
243-
244-
repoID, err := strconv.ParseInt(repoIDStr, 10, 64)
245-
if err != nil {
246-
log.Error("Invalid repo ID: %s", repoIDStr)
247-
continue
248-
}
249227

250-
tasks, err := webhook_model.FindRepoUndeliveredHookTasks(repoID)
251-
if err != nil {
252-
log.Error("Get repository [%d] hook tasks: %v", repoID, err)
253-
continue
254-
}
255-
for _, t := range tasks {
256-
select {
257-
case <-ctx.Done():
258-
return
259-
default:
260-
}
261-
if err = Deliver(ctx, t); err != nil {
262-
log.Error("deliver: %v", err)
263-
}
264-
}
228+
if err := addToTask(t.RepoID); err != nil {
229+
log.Error("DeliverHook failed [%d]: %v", t.RepoID, err)
265230
}
266231
}
267232
}
@@ -297,8 +262,8 @@ func webhookProxy() func(req *http.Request) (*url.URL, error) {
297262
}
298263
}
299264

300-
// InitDeliverHooks starts the hooks delivery thread
301-
func InitDeliverHooks() {
265+
// Init starts the hooks delivery thread
266+
func Init() error {
302267
timeout := time.Duration(setting.Webhook.DeliverTimeout) * time.Second
303268

304269
allowedHostListValue := setting.Webhook.AllowedHostList
@@ -316,5 +281,13 @@ func InitDeliverHooks() {
316281
},
317282
}
318283

319-
go graceful.GetManager().RunWithShutdownContext(DeliverHooks)
284+
hookQueue = queue.CreateUniqueQueue("webhook_sender", handle, "")
285+
if hookQueue == nil {
286+
return fmt.Errorf("Unable to create webhook_sender Queue")
287+
}
288+
go graceful.GetManager().RunWithShutdownFns(hookQueue.Run)
289+
290+
populateDeliverHooks(graceful.GetManager().HammerContext())
291+
292+
return nil
320293
}

services/webhook/main_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,16 @@ import (
99
"testing"
1010

1111
"code.gitea.io/gitea/models/unittest"
12+
"code.gitea.io/gitea/modules/setting"
1213

1314
_ "code.gitea.io/gitea/models"
1415
)
1516

1617
func TestMain(m *testing.M) {
18+
setting.LoadForTest()
19+
setting.NewQueueService()
1720
unittest.MainTest(m, &unittest.TestOptions{
1821
GiteaRootPath: filepath.Join("..", ".."),
22+
SetUp: Init,
1923
})
2024
}

services/webhook/webhook.go

+40-9
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ import (
1212
repo_model "code.gitea.io/gitea/models/repo"
1313
webhook_model "code.gitea.io/gitea/models/webhook"
1414
"code.gitea.io/gitea/modules/git"
15+
"code.gitea.io/gitea/modules/graceful"
1516
"code.gitea.io/gitea/modules/log"
17+
"code.gitea.io/gitea/modules/queue"
1618
"code.gitea.io/gitea/modules/setting"
1719
api "code.gitea.io/gitea/modules/structs"
18-
"code.gitea.io/gitea/modules/sync"
1920
"code.gitea.io/gitea/modules/util"
2021

2122
"github.com/gobwas/glob"
@@ -80,7 +81,7 @@ func IsValidHookTaskType(name string) bool {
8081
}
8182

8283
// hookQueue is a global queue of web hooks
83-
var hookQueue = sync.NewUniqueQueue(setting.Webhook.QueueLength)
84+
var hookQueue queue.UniqueQueue
8485

8586
// getPayloadBranch returns branch for hook event, if applicable.
8687
func getPayloadBranch(p api.Payloader) string {
@@ -101,14 +102,47 @@ func getPayloadBranch(p api.Payloader) string {
101102
return ""
102103
}
103104

105+
// handle passed PR IDs and test the PRs
106+
func handle(data ...queue.Data) []queue.Data {
107+
for _, datum := range data {
108+
repoIDStr := datum.(string)
109+
log.Trace("DeliverHooks [repo_id: %v]", repoIDStr)
110+
111+
repoID, err := strconv.ParseInt(repoIDStr, 10, 64)
112+
if err != nil {
113+
log.Error("Invalid repo ID: %s", repoIDStr)
114+
continue
115+
}
116+
117+
tasks, err := webhook_model.FindRepoUndeliveredHookTasks(repoID)
118+
if err != nil {
119+
log.Error("Get repository [%d] hook tasks: %v", repoID, err)
120+
continue
121+
}
122+
for _, t := range tasks {
123+
if err = Deliver(graceful.GetManager().HammerContext(), t); err != nil {
124+
log.Error("deliver: %v", err)
125+
}
126+
}
127+
}
128+
return nil
129+
}
130+
131+
func addToTask(repoID int64) error {
132+
err := hookQueue.PushFunc(strconv.FormatInt(repoID, 10), nil)
133+
if err != nil && err != queue.ErrAlreadyInQueue {
134+
return err
135+
}
136+
return nil
137+
}
138+
104139
// PrepareWebhook adds special webhook to task queue for given payload.
105140
func PrepareWebhook(w *webhook_model.Webhook, repo *repo_model.Repository, event webhook_model.HookEventType, p api.Payloader) error {
106141
if err := prepareWebhook(w, repo, event, p); err != nil {
107142
return err
108143
}
109144

110-
go hookQueue.Add(strconv.FormatInt(repo.ID, 10))
111-
return nil
145+
return addToTask(repo.ID)
112146
}
113147

114148
func checkBranch(w *webhook_model.Webhook, branch string) bool {
@@ -188,8 +222,7 @@ func PrepareWebhooks(repo *repo_model.Repository, event webhook_model.HookEventT
188222
return err
189223
}
190224

191-
go hookQueue.Add(strconv.FormatInt(repo.ID, 10))
192-
return nil
225+
return addToTask(repo.ID)
193226
}
194227

195228
func prepareWebhooks(repo *repo_model.Repository, event webhook_model.HookEventType, p api.Payloader) error {
@@ -240,7 +273,5 @@ func ReplayHookTask(w *webhook_model.Webhook, uuid string) error {
240273
return err
241274
}
242275

243-
go hookQueue.Add(strconv.FormatInt(t.RepoID, 10))
244-
245-
return nil
276+
return addToTask(t.RepoID)
246277
}

0 commit comments

Comments
 (0)