Skip to content

Make WorkerPools and Queues flushable #10001

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jan 29, 2020
92 changes: 92 additions & 0 deletions cmd/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package cmd

import (
"fmt"
"net/http"
"os"
"time"

"code.gitea.io/gitea/modules/private"

"github.com/urfave/cli"
)

var (
// CmdManager represents the manager command
CmdManager = cli.Command{
Name: "manager",
Usage: "Manage the running gitea process",
Description: "This is a command for managing the running gitea process",
Subcommands: []cli.Command{
subcmdShutdown,
subcmdRestart,
subcmdFlushQueues,
},
}
subcmdShutdown = cli.Command{
Name: "shutdown",
Usage: "Gracefully shutdown the running process",
Action: runShutdown,
}
subcmdRestart = cli.Command{
Name: "restart",
Usage: "Gracefully restart the running process - (not implemented for windows servers)",
Action: runRestart,
}
subcmdFlushQueues = cli.Command{
Name: "flush-queues",
Usage: "Flush queues in the running process",
Action: runFlushQueues,
Flags: []cli.Flag{
cli.DurationFlag{
Name: "timeout",
Value: 60 * time.Second,
Usage: "Timeout for the flushing process",
},
cli.BoolFlag{
Name: "non-blocking",
Usage: "Set to true to not wait for flush to complete before returning",
},
},
}
)

func runShutdown(c *cli.Context) error {
setup("manager", false)
statusCode, msg := private.Shutdown()
switch statusCode {
case http.StatusInternalServerError:
fail("InternalServerError", msg)
}

fmt.Fprintln(os.Stdout, msg)
return nil
}

func runRestart(c *cli.Context) error {
setup("manager", false)
statusCode, msg := private.Restart()
switch statusCode {
case http.StatusInternalServerError:
fail("InternalServerError", msg)
}

fmt.Fprintln(os.Stdout, msg)
return nil
}

func runFlushQueues(c *cli.Context) error {
setup("manager", false)
statusCode, msg := private.FlushQueues(c.Duration("timeout"), c.Bool("non-blocking"))
switch statusCode {
case http.StatusInternalServerError:
fail("InternalServerError", msg)
}

fmt.Fprintln(os.Stdout, msg)
return nil
}
6 changes: 6 additions & 0 deletions integrations/testlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@
package integrations

import (
"context"
"encoding/json"
"fmt"
"os"
"runtime"
"strings"
"sync"
"testing"
"time"

"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/queue"
)

var prefix string
Expand Down Expand Up @@ -98,6 +101,9 @@ func PrintCurrentTest(t testing.TB, skip ...int) func() {
}
writerCloser.setT(&t)
return func() {
if err := queue.GetManager().FlushAll(context.Background(), 20*time.Second); err != nil {
t.Errorf("Flushing queues failed with error %v", err)
}
_ = writerCloser.Close()
}
}
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ arguments - which can alternatively be run by running the subcommand web.`
cmd.CmdKeys,
cmd.CmdConvert,
cmd.CmdDoctor,
cmd.CmdManager,
}
// Now adjust these commands to add our global configuration options

Expand Down
46 changes: 31 additions & 15 deletions modules/graceful/manager_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,36 +110,27 @@ func (g *Manager) handleSignals(ctx context.Context) {
case sig := <-signalChannel:
switch sig {
case syscall.SIGHUP:
if setting.GracefulRestartable {
log.Info("PID: %d. Received SIGHUP. Forking...", pid)
err := g.doFork()
if err != nil && err.Error() != "another process already forked. Ignoring this one" {
log.Error("Error whilst forking from PID: %d : %v", pid, err)
}
} else {
log.Info("PID: %d. Received SIGHUP. Not set restartable. Shutting down...", pid)

g.doShutdown()
}
log.Info("PID: %d. Received SIGHUP. Attempting GracefulShutdown...", pid)
g.DoGracefulShutdown()
case syscall.SIGUSR1:
log.Info("PID %d. Received SIGUSR1.", pid)
case syscall.SIGUSR2:
log.Warn("PID %d. Received SIGUSR2. Hammering...", pid)
g.doHammerTime(0 * time.Second)
g.DoImmediateHammer()
case syscall.SIGINT:
log.Warn("PID %d. Received SIGINT. Shutting down...", pid)
g.doShutdown()
g.DoGracefulShutdown()
case syscall.SIGTERM:
log.Warn("PID %d. Received SIGTERM. Shutting down...", pid)
g.doShutdown()
g.DoGracefulShutdown()
case syscall.SIGTSTP:
log.Info("PID %d. Received SIGTSTP.", pid)
default:
log.Info("PID %d. Received %v.", pid, sig)
}
case <-ctx.Done():
log.Warn("PID: %d. Background context for manager closed - %v - Shutting down...", pid, ctx.Err())
g.doShutdown()
g.DoGracefulShutdown()
}
}
}
Expand All @@ -160,6 +151,31 @@ func (g *Manager) doFork() error {
return err
}

// DoGracefulRestart causes a graceful restart
func (g *Manager) DoGracefulRestart() {
if setting.GracefulRestartable {
log.Info("PID: %d. Forking...", os.Getpid())
err := g.doFork()
if err != nil && err.Error() != "another process already forked. Ignoring this one" {
log.Error("Error whilst forking from PID: %d : %v", os.Getpid(), err)
}
} else {
log.Info("PID: %d. Not set restartable. Shutting down...", os.Getpid())

g.doShutdown()
}
}

// DoImmediateHammer causes an immediate hammer
func (g *Manager) DoImmediateHammer() {
g.doHammerTime(0 * time.Second)
}

// DoGracefulShutdown causes a graceful shutdown
func (g *Manager) DoGracefulShutdown() {
g.doShutdown()
}

// RegisterServer registers the running of a listening server, in the case of unix this means that the parent process can now die.
// Any call to RegisterServer must be matched by a call to ServerDone
func (g *Manager) RegisterServer() {
Expand Down
33 changes: 28 additions & 5 deletions modules/graceful/manager_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Manager struct {
runningServerWaitGroup sync.WaitGroup
createServerWaitGroup sync.WaitGroup
terminateWaitGroup sync.WaitGroup
shutdownRequested chan struct{}
}

func newGracefulManager(ctx context.Context) *Manager {
Expand All @@ -62,6 +63,7 @@ func (g *Manager) start() {
g.shutdown = make(chan struct{})
g.hammer = make(chan struct{})
g.done = make(chan struct{})
g.shutdownRequested = make(chan struct{})

// Set the running state
g.setState(stateRunning)
Expand Down Expand Up @@ -107,20 +109,23 @@ loop:
for {
select {
case <-g.ctx.Done():
g.doShutdown()
g.DoGracefulShutdown()
waitTime += setting.GracefulHammerTime
break loop
case <-g.shutdownRequested:
waitTime += setting.GracefulHammerTime
break loop
case change := <-changes:
switch change.Cmd {
case svc.Interrogate:
status <- change.CurrentStatus
case svc.Stop, svc.Shutdown:
g.doShutdown()
g.DoGracefulShutdown()
waitTime += setting.GracefulHammerTime
break loop
case hammerCode:
g.doShutdown()
g.doHammerTime(0 * time.Second)
g.DoGracefulShutdown()
g.DoImmediateHammer()
break loop
default:
log.Debug("Unexpected control request: %v", change.Cmd)
Expand All @@ -140,7 +145,7 @@ hammerLoop:
case svc.Interrogate:
status <- change.CurrentStatus
case svc.Stop, svc.Shutdown, hammerCmd:
g.doHammerTime(0 * time.Second)
g.DoImmediateHammer()
break hammerLoop
default:
log.Debug("Unexpected control request: %v", change.Cmd)
Expand All @@ -152,6 +157,24 @@ hammerLoop:
return false, 0
}

// DoImmediateHammer causes an immediate hammer
func (g *Manager) DoImmediateHammer() {
g.doHammerTime(0 * time.Second)
}

// DoGracefulShutdown causes a graceful shutdown
func (g *Manager) DoGracefulShutdown() {
g.lock.Lock()
select {
case <-g.shutdownRequested:
g.lock.Unlock()
default:
close(g.shutdownRequested)
g.lock.Unlock()
g.doShutdown()
}
}

// RegisterServer registers the running of a listening server.
// Any call to RegisterServer must be matched by a call to ServerDone
func (g *Manager) RegisterServer() {
Expand Down
83 changes: 83 additions & 0 deletions modules/private/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package private

import (
"encoding/json"
"fmt"
"net/http"
"time"

"code.gitea.io/gitea/modules/setting"
)

// Shutdown calls the internal shutdown function
func Shutdown() (int, string) {
reqURL := setting.LocalURL + fmt.Sprintf("api/internal/manager/shutdown")

req := newInternalRequest(reqURL, "POST")
resp, err := req.Response()
if err != nil {
return http.StatusInternalServerError, fmt.Sprintf("Unable to contact gitea: %v", err.Error())
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return resp.StatusCode, decodeJSONError(resp).Err
}

return http.StatusOK, "Shutting down"
}

// Restart calls the internal restart function
func Restart() (int, string) {
reqURL := setting.LocalURL + fmt.Sprintf("api/internal/manager/restart")

req := newInternalRequest(reqURL, "POST")
resp, err := req.Response()
if err != nil {
return http.StatusInternalServerError, fmt.Sprintf("Unable to contact gitea: %v", err.Error())
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return resp.StatusCode, decodeJSONError(resp).Err
}

return http.StatusOK, "Restarting"
}

// FlushOptions represents the options for the flush call
type FlushOptions struct {
Timeout time.Duration
NonBlocking bool
}

// FlushQueues calls the internal flush-queues function
func FlushQueues(timeout time.Duration, nonBlocking bool) (int, string) {
reqURL := setting.LocalURL + fmt.Sprintf("api/internal/manager/flush-queues")

req := newInternalRequest(reqURL, "POST")
if timeout > 0 {
req.SetTimeout(timeout+10*time.Second, timeout+10*time.Second)
}
req = req.Header("Content-Type", "application/json")
jsonBytes, _ := json.Marshal(FlushOptions{
Timeout: timeout,
NonBlocking: nonBlocking,
})
req.Body(jsonBytes)
resp, err := req.Response()
if err != nil {
return http.StatusInternalServerError, fmt.Sprintf("Unable to contact gitea: %v", err.Error())
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return resp.StatusCode, decodeJSONError(resp).Err
}

return http.StatusOK, "Flushed"
}
Loading