Skip to content

Improve queue & process & stacktrace #24636

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions modules/queue/base_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ func (q *baseChannel) PopItem(ctx context.Context) ([]byte, error) {
func (q *baseChannel) HasItem(ctx context.Context, data []byte) (bool, error) {
q.mu.Lock()
defer q.mu.Unlock()

if !q.isUnique {
return false, nil
}
return q.set.Contains(string(data)), nil
}

Expand All @@ -107,7 +109,9 @@ func (q *baseChannel) Close() error {
defer q.mu.Unlock()

close(q.c)
q.set = container.Set[string]{}
if q.isUnique {
q.set = container.Set[string]{}
}

return nil
}
Expand All @@ -119,5 +123,9 @@ func (q *baseChannel) RemoveAll(ctx context.Context) error {
for q.c != nil && len(q.c) > 0 {
<-q.c
}

if q.isUnique {
q.set = container.Set[string]{}
}
return nil
}
13 changes: 8 additions & 5 deletions modules/queue/base_levelqueue_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,20 @@ func (q *baseLevelQueueUnique) RemoveAll(ctx context.Context) error {
}
lq := (*levelUniqueQueue)(unsafe.Pointer(q.internal))

for lq.q.Len() > 0 {
if _, err := lq.q.LPop(); err != nil {
return err
}
}

// the "set" must be cleared after the "list" because there is no transaction.
// it's better to have duplicate items than losing items.
members, err := lq.set.Members()
if err != nil {
return err // seriously corrupted
}
for _, v := range members {
_, _ = lq.set.Remove(v)
}
for lq.q.Len() > 0 {
if _, err = lq.q.LPop(); err != nil {
return err
}
}
return nil
}
3 changes: 3 additions & 0 deletions modules/queue/base_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ func (q *baseRedis) Close() error {
func (q *baseRedis) RemoveAll(ctx context.Context) error {
q.mu.Lock()
defer q.mu.Unlock()

c1 := q.client.Del(ctx, q.cfg.QueueFullName)
// the "set" must be cleared after the "list" because there is no transaction.
// it's better to have duplicate items than losing items.
c2 := q.client.Del(ctx, q.cfg.SetFullName)
if c1.Err() != nil {
return c1.Err()
Expand Down
3 changes: 3 additions & 0 deletions modules/queue/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type ManagedWorkerPoolQueue interface {
// FlushWithContext tries to make the handler process all items in the queue synchronously.
// It is for testing purpose only. It's not designed to be used in a cluster.
FlushWithContext(ctx context.Context, timeout time.Duration) error

// RemoveAllItems removes all items in the base queue (on-the-fly items are not affected)
RemoveAllItems(ctx context.Context) error
}

var manager *Manager
Expand Down
5 changes: 5 additions & 0 deletions modules/queue/workerqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ func (q *WorkerPoolQueue[T]) FlushWithContext(ctx context.Context, timeout time.
}
}

// RemoveAllItems removes all items in the baes queue
func (q *WorkerPoolQueue[T]) RemoveAllItems(ctx context.Context) error {
return q.baseQueue.RemoveAll(ctx)
}

func (q *WorkerPoolQueue[T]) marshal(data T) []byte {
bs, err := json.Marshal(data)
if err != nil {
Expand Down
53 changes: 7 additions & 46 deletions options/locale/locale_en-US.ini
Original file line number Diff line number Diff line change
Expand Up @@ -3040,8 +3040,9 @@ monitor.next = Next Time
monitor.previous = Previous Time
monitor.execute_times = Executions
monitor.process = Running Processes
monitor.stacktrace = Stacktraces
monitor.goroutines = %d Goroutines
monitor.stacktrace = Stacktrace
monitor.processes_count = %d Processes
monitor.download_diagnosis_report = Download diagnosis report
monitor.desc = Description
monitor.start = Start Time
monitor.execute_time = Execution Time
Expand All @@ -3050,6 +3051,7 @@ monitor.process.cancel = Cancel process
monitor.process.cancel_desc = Cancelling a process may cause data loss
monitor.process.cancel_notices = Cancel: <strong>%s</strong>?
monitor.process.children = Children

monitor.queues = Queues
monitor.queue = Queue: %s
monitor.queue.name = Name
Expand All @@ -3060,56 +3062,15 @@ monitor.queue.maxnumberworkers = Max Number of Workers
monitor.queue.numberinqueue = Number in Queue
monitor.queue.review = Review Config
monitor.queue.review_add = Review/Add Workers
monitor.queue.configuration = Initial Configuration
monitor.queue.nopool.title = No Worker Pool
monitor.queue.nopool.desc = This queue wraps other queues and does not itself have a worker pool.
monitor.queue.wrapped.desc = A wrapped queue wraps a slow starting queue, buffering queued requests in a channel. It does not have a worker pool itself.
monitor.queue.persistable-channel.desc = A persistable-channel wraps two queues, a channel queue that has its own worker pool and a level queue for persisted requests from previous shutdowns. It does not have a worker pool itself.
monitor.queue.flush = Flush worker
monitor.queue.pool.timeout = Timeout
monitor.queue.pool.addworkers.title = Add Workers
monitor.queue.pool.addworkers.submit = Add Workers
monitor.queue.pool.addworkers.desc = Add Workers to this pool with or without a timeout. If you set a timeout these workers will be removed from the pool after the timeout has lapsed.
monitor.queue.pool.addworkers.numberworkers.placeholder = Number of Workers
monitor.queue.pool.addworkers.timeout.placeholder = Set to 0 for no timeout
monitor.queue.pool.addworkers.mustnumbergreaterzero = Number of Workers to add must be greater than zero
monitor.queue.pool.addworkers.musttimeoutduration = Timeout must be a golang duration eg. 5m or be 0
monitor.queue.pool.flush.title = Flush Queue
monitor.queue.pool.flush.desc = Flush will add a worker that will terminate once the queue is empty, or it times out.
monitor.queue.pool.flush.submit = Add Flush Worker
monitor.queue.pool.flush.added = Flush Worker added for %[1]s
monitor.queue.pool.pause.title = Pause Queue
monitor.queue.pool.pause.desc = Pausing a Queue will prevent it from processing data
monitor.queue.pool.pause.submit = Pause Queue
monitor.queue.pool.resume.title = Resume Queue
monitor.queue.pool.resume.desc = Set this queue to resume work
monitor.queue.pool.resume.submit = Resume Queue

monitor.queue.settings.title = Pool Settings
monitor.queue.settings.desc = Pools dynamically grow with a boost in response to their worker queue blocking. These changes will not affect current worker groups.
monitor.queue.settings.timeout = Boost Timeout
monitor.queue.settings.timeout.placeholder = Currently %[1]v
monitor.queue.settings.timeout.error = Timeout must be a golang duration eg. 5m or be 0
monitor.queue.settings.numberworkers = Boost Number of Workers
monitor.queue.settings.numberworkers.placeholder = Currently %[1]d
monitor.queue.settings.numberworkers.error = Number of Workers to add must be greater than or equal to zero
monitor.queue.settings.desc = Pools dynamically grow in response to their worker queue blocking.
monitor.queue.settings.maxnumberworkers = Max Number of workers
monitor.queue.settings.maxnumberworkers.placeholder = Currently %[1]d
monitor.queue.settings.maxnumberworkers.error = Max number of workers must be a number
monitor.queue.settings.submit = Update Settings
monitor.queue.settings.changed = Settings Updated
monitor.queue.settings.blocktimeout = Current Block Timeout
monitor.queue.settings.blocktimeout.value = %[1]v

monitor.queue.pool.none = This queue does not have a Pool
monitor.queue.pool.added = Worker Group Added
monitor.queue.pool.max_changed = Maximum number of workers changed
monitor.queue.pool.workers.title = Active Worker Groups
monitor.queue.pool.workers.none = No worker groups.
monitor.queue.pool.cancel = Shutdown Worker Group
monitor.queue.pool.cancelling = Worker Group shutting down
monitor.queue.pool.cancel_notices = Shutdown this group of %s workers?
monitor.queue.pool.cancel_desc = Leaving a queue without any worker groups may cause requests to block indefinitely.
monitor.queue.settings.remove_all_items = Remove all
monitor.queue.settings.remove_all_items_done = All items in the queue have been removed.

notices.system_notice_list = System Notices
notices.view_detail_header = View Notice Details
Expand Down
47 changes: 7 additions & 40 deletions routers/web/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
activities_model "code.gitea.io/gitea/models/activities"
"code.gitea.io/gitea/modules/base"
"code.gitea.io/gitea/modules/context"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/updatechecker"
"code.gitea.io/gitea/modules/web"
Expand All @@ -24,7 +22,8 @@ import (

const (
tplDashboard base.TplName = "admin/dashboard"
tplMonitor base.TplName = "admin/monitor"
tplCron base.TplName = "admin/cron"
tplQueue base.TplName = "admin/queue"
tplStacktrace base.TplName = "admin/stacktrace"
tplQueueManage base.TplName = "admin/queue_manage"
)
Expand Down Expand Up @@ -142,47 +141,15 @@ func DashboardPost(ctx *context.Context) {
}
}
if form.From == "monitor" {
ctx.Redirect(setting.AppSubURL + "/admin/monitor")
ctx.Redirect(setting.AppSubURL + "/admin/monitor/cron")
} else {
ctx.Redirect(setting.AppSubURL + "/admin")
}
}

// Monitor show admin monitor page
func Monitor(ctx *context.Context) {
ctx.Data["Title"] = ctx.Tr("admin.monitor")
ctx.Data["PageIsAdminMonitor"] = true
ctx.Data["Processes"], ctx.Data["ProcessCount"] = process.GetManager().Processes(false, true)
func CronTasks(ctx *context.Context) {
ctx.Data["Title"] = ctx.Tr("admin.monitor.cron")
ctx.Data["PageIsAdminMonitorCron"] = true
ctx.Data["Entries"] = cron.ListTasks()
ctx.Data["Queues"] = queue.GetManager().ManagedQueues()

ctx.HTML(http.StatusOK, tplMonitor)
}

// GoroutineStacktrace show admin monitor goroutines page
func GoroutineStacktrace(ctx *context.Context) {
ctx.Data["Title"] = ctx.Tr("admin.monitor")
ctx.Data["PageIsAdminMonitor"] = true

processStacks, processCount, goroutineCount, err := process.GetManager().ProcessStacktraces(false, false)
if err != nil {
ctx.ServerError("GoroutineStacktrace", err)
return
}

ctx.Data["ProcessStacks"] = processStacks

ctx.Data["GoroutineCount"] = goroutineCount
ctx.Data["ProcessCount"] = processCount

ctx.HTML(http.StatusOK, tplStacktrace)
}

// MonitorCancel cancels a process
func MonitorCancel(ctx *context.Context) {
pid := ctx.Params("pid")
process.GetManager().Cancel(process.IDType(pid))
ctx.JSON(http.StatusOK, map[string]interface{}{
"redirect": setting.AppSubURL + "/admin/monitor",
})
ctx.HTML(http.StatusOK, tplCron)
}
61 changes: 61 additions & 0 deletions routers/web/admin/diagnosis.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll store this code in my quiver 😁

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully we never need such code 🤣 , no bug, no use

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 The Gitea Authors.
// SPDX-License-Identifier: MIT

package admin

import (
"archive/zip"
"fmt"
"runtime/pprof"
"time"

"code.gitea.io/gitea/modules/context"
"code.gitea.io/gitea/modules/httplib"
)

func MonitorDiagnosis(ctx *context.Context) {
seconds := ctx.FormInt64("seconds")
if seconds <= 5 {
seconds = 5
}
if seconds > 300 {
seconds = 300
}

httplib.ServeSetHeaders(ctx.Resp, &httplib.ServeHeaderOptions{
ContentType: "application/zip",
Disposition: "attachment",
Filename: fmt.Sprintf("gitea-diagnosis-%s.zip", time.Now().Format("20060102-150405")),
})

zipWriter := zip.NewWriter(ctx.Resp)
defer zipWriter.Close()

f, err := zipWriter.CreateHeader(&zip.FileHeader{Name: "goroutine-before.txt", Method: zip.Deflate, Modified: time.Now()})
if err != nil {
ctx.ServerError("Failed to create zip file", err)
return
}
_ = pprof.Lookup("goroutine").WriteTo(f, 1)

f, err = zipWriter.CreateHeader(&zip.FileHeader{Name: "cpu-profile.dat", Method: zip.Deflate, Modified: time.Now()})
if err != nil {
ctx.ServerError("Failed to create zip file", err)
return
}

err = pprof.StartCPUProfile(f)
if err == nil {
time.Sleep(time.Duration(seconds) * time.Second)
pprof.StopCPUProfile()
} else {
_, _ = f.Write([]byte(err.Error()))
}

f, err = zipWriter.CreateHeader(&zip.FileHeader{Name: "goroutine-after.txt", Method: zip.Deflate, Modified: time.Now()})
if err != nil {
ctx.ServerError("Failed to create zip file", err)
return
}
_ = pprof.Lookup("goroutine").WriteTo(f, 1)
}
34 changes: 32 additions & 2 deletions routers/web/admin/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,18 @@ import (
"code.gitea.io/gitea/modules/setting"
)

// Queue shows details for a specific queue
func Queue(ctx *context.Context) {
func Queues(ctx *context.Context) {
if !setting.IsProd {
initTestQueueOnce()
}
ctx.Data["Title"] = ctx.Tr("admin.monitor.queue")
ctx.Data["PageIsAdminMonitorQueue"] = true
ctx.Data["Queues"] = queue.GetManager().ManagedQueues()
ctx.HTML(http.StatusOK, tplQueue)
}

// QueueManage shows details for a specific queue
func QueueManage(ctx *context.Context) {
qid := ctx.ParamsInt64("qid")
mq := queue.GetManager().GetManagedQueue(qid)
if mq == nil {
Expand Down Expand Up @@ -57,3 +67,23 @@ func QueueSet(ctx *context.Context) {
ctx.Flash.Success(ctx.Tr("admin.monitor.queue.settings.changed"))
ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
}

func QueueRemoveAllItems(ctx *context.Context) {
// Gitea's queue doesn't have transaction support
// So in rare cases, the queue could be corrupted/out-of-sync
// Site admin could remove all items from the queue to make it work again
qid := ctx.ParamsInt64("qid")
mq := queue.GetManager().GetManagedQueue(qid)
if mq == nil {
ctx.Status(http.StatusNotFound)
return
}

if err := mq.RemoveAllItems(ctx); err != nil {
ctx.ServerError("RemoveAllItems", err)
return
}

ctx.Flash.Success(ctx.Tr("admin.monitor.queue.settings.remove_all_items_done"))
ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
}
Loading