Skip to content

Add finalizers to ensure that repos are closed and blobreaders are closed #19495

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 13 commits into from
Closed
66 changes: 55 additions & 11 deletions modules/git/blob_nogogit.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ import (
"bytes"
"io"
"math"
"runtime"
"sync"

"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
)

// Blob represents a Git object.
Expand Down Expand Up @@ -53,11 +56,15 @@ func (b *Blob) DataAsync() (io.ReadCloser, error) {
return io.NopCloser(bytes.NewReader(bs)), err
}

return &blobReader{
br := &blobReader{
repo: b.repo,
rd: rd,
n: size,
cancel: cancel,
}, nil
}
runtime.SetFinalizer(br, (*blobReader).finalizer)

return br, nil
}

// Size returns the uncompressed size of the blob
Expand Down Expand Up @@ -85,6 +92,10 @@ func (b *Blob) Size() int64 {
}

type blobReader struct {
lock sync.Mutex
closed bool

repo *Repository
rd *bufio.Reader
n int64
cancel func()
Expand All @@ -103,27 +114,60 @@ func (b *blobReader) Read(p []byte) (n int, err error) {
}

// Close implements io.Closer
func (b *blobReader) Close() error {
defer b.cancel()
func (b *blobReader) Close() (err error) {
b.lock.Lock()
defer b.lock.Unlock()
if b.closed {
return
}
return b.close()
}

func (b *blobReader) close() (err error) {
defer func() {
b.cancel()
runtime.SetFinalizer(b, nil)
}()
b.closed = true
if b.n > 0 {
var n int
for b.n > math.MaxInt32 {
n, err := b.rd.Discard(math.MaxInt32)
n, err = b.rd.Discard(math.MaxInt32)
b.n -= int64(n)
if err != nil {
return err
return
}
b.n -= math.MaxInt32
}
n, err := b.rd.Discard(int(b.n))
n, err = b.rd.Discard(int(b.n))
b.n -= int64(n)
if err != nil {
return err
return
}
}
if b.n == 0 {
_, err := b.rd.Discard(1)
_, err = b.rd.Discard(1)
b.n--
return err
return
}
return
}

// finalizer is the function attached to a blobReader with runtime.SetFinalizer.
// When it runs on a reader that was never closed it logs an error naming the
// owning repository's path (and the process ID, if the repository carries a
// context) and then closes the reader.
//
// It returns nil when b is nil or the reader has already been closed;
// otherwise it returns the result of close.
func (b *blobReader) finalizer() error {
	if b == nil {
		return nil
	}
	b.lock.Lock()
	defer b.lock.Unlock()
	if b.closed {
		return nil
	}

	pid := ""
	if b.repo.Ctx != nil {
		pid = " from PID: " + string(process.GetPID(b.repo.Ctx))
	}
	// Exactly one verb per argument: pid (possibly empty) and the repository path.
	log.Error("Finalizer running on unclosed blobReader%s: %s", pid, b.repo.Path)

	// The lock is already held, so call the unlocked close directly.
	return b.close()
}
48 changes: 44 additions & 4 deletions modules/git/repo_base_gogit.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ import (
"context"
"errors"
"path/filepath"
"runtime"
"sync"

"code.gitea.io/gitea/modules/log"
gitealog "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/setting"

"github.com/go-git/go-billy/v5/osfs"
Expand All @@ -27,6 +31,9 @@ type Repository struct {

tagCache *ObjectCache

lock sync.Mutex
closed bool

gogitRepo *gogit.Repository
gogitStorage *filesystem.Storage
gpgSettings *GPGSettings
Expand Down Expand Up @@ -62,26 +69,59 @@ func OpenRepository(ctx context.Context, repoPath string) (*Repository, error) {
return nil, err
}

return &Repository{
repo := &Repository{
Path: repoPath,
gogitRepo: gogitRepo,
gogitStorage: storage,
tagCache: newObjectCache(),
Ctx: ctx,
}, nil
}

runtime.SetFinalizer(repo, (*Repository).finalizer)

return repo, nil
}

// Close this repository, in particular close the underlying gogitStorage if this is not nil
func (repo *Repository) Close() (err error) {
if repo == nil || repo.gogitStorage == nil {
if repo == nil {
return
}
repo.lock.Lock()
defer repo.lock.Unlock()
return repo.close()
}

func (repo *Repository) close() (err error) {
repo.closed = true
if repo.gogitStorage == nil {
return
}
if err := repo.gogitStorage.Close(); err != nil {
err = repo.gogitStorage.Close()
if err != nil {
gitealog.Error("Error closing storage: %v", err)
}
return
}

// finalizer is the function attached to a Repository with runtime.SetFinalizer.
// If the repository has not been marked closed it logs an error naming the
// repository path (and the process ID, if a context is set). In every non-nil
// case it then runs close while holding repo.lock and returns its result.
func (repo *Repository) finalizer() error {
	if repo == nil {
		return nil
	}
	repo.lock.Lock()
	defer repo.lock.Unlock()
	if !repo.closed {
		pid := ""
		if repo.Ctx != nil {
			pid = " from PID: " + string(process.GetPID(repo.Ctx))
		}
		// Exactly one verb per argument: pid (possibly empty) and the repository path.
		log.Error("Finalizer running on unclosed repository%s: %s", pid, repo.Path)
	}

	// We still need to run the close fn as it may be possible to reopen the gogitrepo after close
	return repo.close()
}

// GoGitRepo gets the go-git repo representation
func (repo *Repository) GoGitRepo() *gogit.Repository {
return repo.gogitRepo
Expand Down
61 changes: 59 additions & 2 deletions modules/git/repo_base_nogogit.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ import (
"context"
"errors"
"path/filepath"
"runtime"
"sync"

"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
)

// Repository represents a Git repository.
Expand All @@ -24,6 +27,10 @@ type Repository struct {

gpgSettings *GPGSettings

lock sync.Mutex

closed bool

batchCancel context.CancelFunc
batchReader *bufio.Reader
batchWriter WriteCloserError
Expand Down Expand Up @@ -63,29 +70,57 @@ func OpenRepository(ctx context.Context, repoPath string) (*Repository, error) {
repo.batchWriter, repo.batchReader, repo.batchCancel = CatFileBatch(ctx, repoPath)
repo.checkWriter, repo.checkReader, repo.checkCancel = CatFileBatchCheck(ctx, repo.Path)

runtime.SetFinalizer(repo, (*Repository).finalizer)

return repo, nil
}

// CatFileBatch obtains a CatFileBatch for this repository
func (repo *Repository) CatFileBatch(ctx context.Context) (WriteCloserError, *bufio.Reader, func()) {
if repo.batchCancel == nil || repo.batchReader.Buffered() > 0 {
repo.lock.Lock()
defer repo.lock.Unlock()

if repo.closed || repo.batchReader.Buffered() > 0 {
log.Debug("Opening temporary cat file batch for: %s", repo.Path)
return CatFileBatch(ctx, repo.Path)
}

if repo.batchCancel == nil {
repo.batchWriter, repo.batchReader, repo.batchCancel = CatFileBatch(ctx, repo.Path)
}

return repo.batchWriter, repo.batchReader, func() {}
}

// CatFileBatchCheck obtains a CatFileBatchCheck for this repository
func (repo *Repository) CatFileBatchCheck(ctx context.Context) (WriteCloserError, *bufio.Reader, func()) {
if repo.checkCancel == nil || repo.checkReader.Buffered() > 0 {
repo.lock.Lock()
defer repo.lock.Unlock()

if repo.closed || repo.checkReader.Buffered() > 0 {
log.Debug("Opening temporary cat file batch-check: %s", repo.Path)
return CatFileBatchCheck(ctx, repo.Path)
}

if repo.checkCancel == nil {
repo.checkWriter, repo.checkReader, repo.checkCancel = CatFileBatchCheck(ctx, repo.Path)
}

return repo.checkWriter, repo.checkReader, func() {}
}

// Close shuts this repository down. Calling it on a nil *Repository is a
// no-op that returns nil. Otherwise it holds repo.lock for the duration of
// the call and returns whatever the unexported close reports.
func (repo *Repository) Close() (err error) {
	if repo != nil {
		repo.lock.Lock()
		defer repo.lock.Unlock()

		err = repo.close()
	}
	return err
}

func (repo *Repository) close() (err error) {
if repo == nil {
return
}
Expand All @@ -101,5 +136,27 @@ func (repo *Repository) Close() (err error) {
repo.checkReader = nil
repo.checkWriter = nil
}
repo.closed = true
runtime.SetFinalizer(repo, nil)
return
}

// finalizer is the function attached to a Repository with runtime.SetFinalizer.
// It does nothing for a nil or already-closed repository. Otherwise, if either
// the batch or the batch-check cancel function is still set, it logs an error
// naming the repository path (and the process ID, if a context is set), and
// then runs close while holding repo.lock, returning its result.
func (repo *Repository) finalizer() (err error) {
	if repo == nil {
		return nil
	}
	repo.lock.Lock()
	defer repo.lock.Unlock()
	if repo.closed {
		return nil
	}

	if repo.batchCancel != nil || repo.checkCancel != nil {
		pid := ""
		if repo.Ctx != nil {
			pid = " from PID: " + string(process.GetPID(repo.Ctx))
		}
		// Exactly one verb per argument: pid (possibly empty) and the repository path.
		log.Error("Finalizer running on unclosed repository%s: %s", pid, repo.Path)
	}
	return repo.close()
}