Skip to content

Commit 787f6c3

Browse files
zeripath, delvh, and lunny
authored
Ensure that Webhook tasks are not double delivered (#21558)
When re-retrieving hook tasks from the DB, double-check that they have not been delivered in the meantime. Further, ensure that tasks are marked as delivered when they are being delivered.

In addition:

* Improve the error reporting and make sure that the webhook task population script runs in a separate goroutine.
* Only get hook task IDs out of the DB instead of the whole task when repopulating the queue.
* When repopulating the queue, make the DB request paged.

Ref #17940

Signed-off-by: Andrew Thornton <[email protected]>
Co-authored-by: delvh <[email protected]>
Co-authored-by: Lunny Xiao <[email protected]>
1 parent 4c00d8f commit 787f6c3

File tree

4 files changed

+99
-32
lines changed

4 files changed

+99
-32
lines changed

models/webhook/hooktask.go

+19-3
Original file line numberDiff line numberDiff line change
@@ -233,14 +233,30 @@ func ReplayHookTask(ctx context.Context, hookID int64, uuid string) (*HookTask,
233233
return newTask, db.Insert(ctx, newTask)
234234
}
235235

236-
// FindUndeliveredHookTasks represents find the undelivered hook tasks
237-
func FindUndeliveredHookTasks(ctx context.Context) ([]*HookTask, error) {
238-
tasks := make([]*HookTask, 0, 10)
236+
// FindUndeliveredHookTaskIDs will find the next 100 undelivered hook tasks with ID greater than the provided lowerID
237+
func FindUndeliveredHookTaskIDs(ctx context.Context, lowerID int64) ([]int64, error) {
238+
const batchSize = 100
239+
240+
tasks := make([]int64, 0, batchSize)
239241
return tasks, db.GetEngine(ctx).
242+
Select("id").
243+
Table(new(HookTask)).
240244
Where("is_delivered=?", false).
245+
And("id > ?", lowerID).
246+
Asc("id").
247+
Limit(batchSize).
241248
Find(&tasks)
242249
}
243250

251+
func MarkTaskDelivered(ctx context.Context, task *HookTask) (bool, error) {
252+
count, err := db.GetEngine(ctx).ID(task.ID).Where("is_delivered = ?", false).Cols("is_delivered").Update(&HookTask{
253+
ID: task.ID,
254+
IsDelivered: true,
255+
})
256+
257+
return count != 0, err
258+
}
259+
244260
// CleanupHookTaskTable deletes rows from hook_task as needed.
245261
func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType, olderThan time.Duration, numberToKeep int) error {
246262
log.Trace("Doing: CleanupHookTaskTable")

services/webhook/deliver.go

+55-19
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"code.gitea.io/gitea/modules/graceful"
2424
"code.gitea.io/gitea/modules/hostmatcher"
2525
"code.gitea.io/gitea/modules/log"
26+
"code.gitea.io/gitea/modules/process"
2627
"code.gitea.io/gitea/modules/proxy"
2728
"code.gitea.io/gitea/modules/queue"
2829
"code.gitea.io/gitea/modules/setting"
@@ -43,7 +44,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
4344
return
4445
}
4546
// There was a panic whilst delivering a hook...
46-
log.Error("PANIC whilst trying to deliver webhook[%d] to %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2))
47+
log.Error("PANIC whilst trying to deliver webhook task[%d] to webhook %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2))
4748
}()
4849

4950
t.IsDelivered = true
@@ -52,7 +53,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
5253

5354
switch w.HTTPMethod {
5455
case "":
55-
log.Info("HTTP Method for webhook %d empty, setting to POST as default", t.ID)
56+
log.Info("HTTP Method for webhook %s empty, setting to POST as default", w.URL)
5657
fallthrough
5758
case http.MethodPost:
5859
switch w.ContentType {
@@ -78,14 +79,14 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
7879
case http.MethodGet:
7980
u, err := url.Parse(w.URL)
8081
if err != nil {
81-
return err
82+
return fmt.Errorf("unable to deliver webhook task[%d] as cannot parse webhook url %s: %w", t.ID, w.URL, err)
8283
}
8384
vals := u.Query()
8485
vals["payload"] = []string{t.PayloadContent}
8586
u.RawQuery = vals.Encode()
8687
req, err = http.NewRequest("GET", u.String(), nil)
8788
if err != nil {
88-
return err
89+
return fmt.Errorf("unable to deliver webhook task[%d] as unable to create HTTP request for webhook url %s: %w", t.ID, w.URL, err)
8990
}
9091
case http.MethodPut:
9192
switch w.Type {
@@ -97,13 +98,13 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
9798
url := fmt.Sprintf("%s/%s", w.URL, url.PathEscape(txnID))
9899
req, err = http.NewRequest("PUT", url, strings.NewReader(t.PayloadContent))
99100
if err != nil {
100-
return err
101+
return fmt.Errorf("unable to deliver webhook task[%d] as cannot create matrix request for webhook url %s: %w", t.ID, w.URL, err)
101102
}
102103
default:
103-
return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod)
104+
return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod)
104105
}
105106
default:
106-
return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod)
107+
return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod)
107108
}
108109

109110
var signatureSHA1 string
@@ -159,6 +160,20 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
159160
Headers: map[string]string{},
160161
}
161162

163+
// OK We're now ready to attempt to deliver the task - we must double check that it
164+
// has not been delivered in the meantime
165+
updated, err := webhook_model.MarkTaskDelivered(ctx, t)
166+
if err != nil {
167+
log.Error("MarkTaskDelivered[%d]: %v", t.ID, err)
168+
return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err)
169+
}
170+
if !updated {
171+
// This webhook task has already been attempted to be delivered or is in the process of being delivered
172+
log.Trace("Webhook Task[%d] already delivered", t.ID)
173+
return nil
174+
}
175+
176+
// All code from this point will update the hook task
162177
defer func() {
163178
t.Delivered = time.Now().UnixNano()
164179
if t.IsSucceed {
@@ -190,13 +205,14 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
190205
}
191206

192207
if !w.IsActive {
208+
log.Trace("Webhook %s in Webhook Task[%d] is not active", w.URL, t.ID)
193209
return nil
194210
}
195211

196212
resp, err := webhookHTTPClient.Do(req.WithContext(ctx))
197213
if err != nil {
198214
t.ResponseInfo.Body = fmt.Sprintf("Delivery: %v", err)
199-
return err
215+
return fmt.Errorf("unable to deliver webhook task[%d] in %s due to error in http client: %w", t.ID, w.URL, err)
200216
}
201217
defer resp.Body.Close()
202218

@@ -210,7 +226,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
210226
p, err := io.ReadAll(resp.Body)
211227
if err != nil {
212228
t.ResponseInfo.Body = fmt.Sprintf("read body: %s", err)
213-
return err
229+
return fmt.Errorf("unable to deliver webhook task[%d] in %s as unable to read response body: %w", t.ID, w.URL, err)
214230
}
215231
t.ResponseInfo.Body = string(p)
216232
return nil
@@ -272,17 +288,37 @@ func Init() error {
272288
}
273289
go graceful.GetManager().RunWithShutdownFns(hookQueue.Run)
274290

275-
tasks, err := webhook_model.FindUndeliveredHookTasks(graceful.GetManager().HammerContext())
276-
if err != nil {
277-
log.Error("FindUndeliveredHookTasks failed: %v", err)
278-
return err
279-
}
291+
go graceful.GetManager().RunWithShutdownContext(populateWebhookSendingQueue)
292+
293+
return nil
294+
}
295+
296+
func populateWebhookSendingQueue(ctx context.Context) {
297+
ctx, _, finished := process.GetManager().AddContext(ctx, "Webhook: Populate sending queue")
298+
defer finished()
280299

281-
for _, task := range tasks {
282-
if err := enqueueHookTask(task); err != nil {
283-
log.Error("enqueueHookTask failed: %v", err)
300+
lowerID := int64(0)
301+
for {
302+
taskIDs, err := webhook_model.FindUndeliveredHookTaskIDs(ctx, lowerID)
303+
if err != nil {
304+
log.Error("Unable to populate webhook queue as FindUndeliveredHookTaskIDs failed: %v", err)
305+
return
306+
}
307+
if len(taskIDs) == 0 {
308+
return
309+
}
310+
lowerID = taskIDs[len(taskIDs)-1]
311+
312+
for _, taskID := range taskIDs {
313+
select {
314+
case <-ctx.Done():
315+
log.Warn("Shutdown before Webhook Sending queue finishing being populated")
316+
return
317+
default:
318+
}
319+
if err := enqueueHookTask(taskID); err != nil {
320+
log.Error("Unable to push HookTask[%d] to the Webhook Sending queue: %v", taskID, err)
321+
}
284322
}
285323
}
286-
287-
return nil
288324
}

services/webhook/deliver_test.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"code.gitea.io/gitea/models/unittest"
1717
webhook_model "code.gitea.io/gitea/models/webhook"
1818
"code.gitea.io/gitea/modules/setting"
19+
api "code.gitea.io/gitea/modules/structs"
1920

2021
"github.com/stretchr/testify/assert"
2122
)
@@ -67,8 +68,15 @@ func TestWebhookDeliverAuthorizationHeader(t *testing.T) {
6768
err := hook.SetHeaderAuthorization("Bearer s3cr3t-t0ken")
6869
assert.NoError(t, err)
6970
assert.NoError(t, webhook_model.CreateWebhook(db.DefaultContext, hook))
71+
db.GetEngine(db.DefaultContext).NoAutoTime().DB().Logger.ShowSQL(true)
7072

71-
hookTask := &webhook_model.HookTask{HookID: hook.ID, EventType: webhook_model.HookEventPush}
73+
hookTask := &webhook_model.HookTask{HookID: hook.ID, EventType: webhook_model.HookEventPush, Payloader: &api.PushPayload{}}
74+
75+
hookTask, err = webhook_model.CreateHookTask(db.DefaultContext, hookTask)
76+
assert.NoError(t, err)
77+
if !assert.NotNil(t, hookTask) {
78+
return
79+
}
7280

7381
assert.NoError(t, Deliver(context.Background(), hookTask))
7482
select {

services/webhook/webhook.go

+16-9
Original file line numberDiff line numberDiff line change
@@ -116,19 +116,26 @@ func handle(data ...queue.Data) []queue.Data {
116116
for _, taskID := range data {
117117
task, err := webhook_model.GetHookTaskByID(ctx, taskID.(int64))
118118
if err != nil {
119-
log.Error("GetHookTaskByID failed: %v", err)
120-
} else {
121-
if err := Deliver(ctx, task); err != nil {
122-
log.Error("webhook.Deliver failed: %v", err)
123-
}
119+
log.Error("GetHookTaskByID[%d] failed: %v", taskID.(int64), err)
120+
continue
121+
}
122+
123+
if task.IsDelivered {
124+
// Already delivered in the meantime
125+
log.Trace("Task[%d] has already been delivered", task.ID)
126+
continue
127+
}
128+
129+
if err := Deliver(ctx, task); err != nil {
130+
log.Error("Unable to deliver webhook task[%d]: %v", task.ID, err)
124131
}
125132
}
126133

127134
return nil
128135
}
129136

130-
func enqueueHookTask(task *webhook_model.HookTask) error {
131-
err := hookQueue.PushFunc(task.ID, nil)
137+
func enqueueHookTask(taskID int64) error {
138+
err := hookQueue.Push(taskID)
132139
if err != nil && err != queue.ErrAlreadyInQueue {
133140
return err
134141
}
@@ -205,7 +212,7 @@ func PrepareWebhook(ctx context.Context, w *webhook_model.Webhook, event webhook
205212
return fmt.Errorf("CreateHookTask: %w", err)
206213
}
207214

208-
return enqueueHookTask(task)
215+
return enqueueHookTask(task.ID)
209216
}
210217

211218
// PrepareWebhooks adds new webhooks to task queue for given payload.
@@ -265,5 +272,5 @@ func ReplayHookTask(ctx context.Context, w *webhook_model.Webhook, uuid string)
265272
return err
266273
}
267274

268-
return enqueueHookTask(task)
275+
return enqueueHookTask(task.ID)
269276
}

0 commit comments

Comments
 (0)