Skip to content

Commit c2a340a

Browse files
committed
fix
1 parent 2912174 commit c2a340a

2 files changed

Lines changed: 60 additions & 52 deletions

File tree

modules/git/catfile_batch_reader.go

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,40 +16,52 @@ import (
1616

1717
"code.gitea.io/gitea/modules/git/gitcmd"
1818
"code.gitea.io/gitea/modules/log"
19+
"code.gitea.io/gitea/modules/util"
1920
)
2021

21-
var catFileBatchDebugPipeClose atomic.Pointer[func(stdPipeClose func())]
22-
2322
type catFileBatchCommunicator struct {
24-
closeFunc func(err error)
23+
closeFunc atomic.Pointer[func(err error)]
2524
reqWriter io.Writer
2625
respReader *bufio.Reader
2726
debugGitCmd *gitcmd.Command
2827
}
2928

30-
func (b *catFileBatchCommunicator) Close() {
31-
if b.closeFunc != nil {
32-
b.closeFunc(nil)
33-
b.closeFunc = nil
29+
func (b *catFileBatchCommunicator) Close(err ...error) {
30+
if fn := b.closeFunc.Load(); fn != nil {
31+
(*fn)(util.OptionalArg(err))
32+
b.closeFunc.Store(nil)
3433
}
3534
}
3635

36+
func (b *catFileBatchCommunicator) debugKill() (ret struct {
37+
beforeClose chan struct{}
38+
blockClose chan struct{}
39+
afterClose chan struct{}
40+
}) {
41+
ret.beforeClose = make(chan struct{})
42+
ret.blockClose = make(chan struct{})
43+
ret.afterClose = make(chan struct{})
44+
oldCloseFunc := b.closeFunc.Load()
45+
b.closeFunc.Store(new(func(err error) {
46+
b.closeFunc.Store(oldCloseFunc)
47+
close(ret.beforeClose)
48+
<-ret.blockClose
49+
(*oldCloseFunc)(err)
50+
close(ret.afterClose)
51+
}))
52+
b.debugGitCmd.DebugKill()
53+
return ret
54+
}
55+
3756
// newCatFileBatch opens git cat-file --batch in the provided repo and returns a stdin pipe, a stdout reader and cancel function
3857
func newCatFileBatch(ctx context.Context, repoPath string, cmdCatFile *gitcmd.Command) (ret *catFileBatchCommunicator) {
3958
ctx, ctxCancel := context.WithCancelCause(ctx)
4059

4160
// We often want to feed the commits in order into cat-file --batch, followed by their trees and subtrees as necessary.
4261
stdinWriter, stdoutReader, stdPipeClose := cmdCatFile.MakeStdinStdoutPipe()
43-
pipeClose := func() {
44-
if fn := catFileBatchDebugPipeClose.Load(); fn != nil {
45-
(*fn)(stdPipeClose)
46-
} else {
47-
stdPipeClose()
48-
}
49-
}
5062
closeFunc := func(err error) {
5163
ctxCancel(err)
52-
pipeClose()
64+
stdPipeClose()
5365
}
5466
return newCatFileBatchWithCloseFunc(ctx, repoPath, cmdCatFile, stdinWriter, stdoutReader, closeFunc)
5567
}
@@ -59,17 +71,17 @@ func newCatFileBatchWithCloseFunc(ctx context.Context, repoPath string, cmdCatFi
5971
) *catFileBatchCommunicator {
6072
ret := &catFileBatchCommunicator{
6173
debugGitCmd: cmdCatFile,
62-
closeFunc: closeFunc,
6374
reqWriter: stdinWriter,
6475
respReader: bufio.NewReaderSize(stdoutReader, 32*1024), // use a buffered reader for rich operations
6576
}
77+
ret.closeFunc.Store(&closeFunc)
6678

6779
err := cmdCatFile.WithDir(repoPath).StartWithStderr(ctx)
6880
if err != nil {
6981
log.Error("Unable to start git command %v: %v", cmdCatFile.LogString(), err)
7082
// ideally here it should return the error, but it would require refactoring all callers
7183
// so just return a dummy communicator that does nothing, almost the same behavior as before, not bad
72-
closeFunc(err)
84+
ret.Close(err)
7385
return ret
7486
}
7587

@@ -78,7 +90,7 @@ func newCatFileBatchWithCloseFunc(ctx context.Context, repoPath string, cmdCatFi
7890
if err != nil && !errors.Is(err, context.Canceled) {
7991
log.Error("cat-file --batch command failed in repo %s, error: %v", repoPath, err)
8092
}
81-
closeFunc(err)
93+
ret.Close(err)
8294
}()
8395

8496
return ret

modules/git/catfile_batch_test.go

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -37,53 +37,49 @@ func testCatFileBatch(t *testing.T) {
3737
require.Error(t, err)
3838
})
3939

40-
setupTerminatedBatch := func(t *testing.T) (*catFileBatchCommunicator, func()) {
41-
t.Helper()
40+
simulateQueryTerminated := func(t *testing.T, errBeforePipeClose, errAfterPipeClose error) {
41+
readError := func(t *testing.T, r io.Reader, expectedErr error) {
42+
if expectedErr == nil {
43+
return // expectedErr == nil means this read should be skipped
44+
}
45+
n, err := r.Read(make([]byte, 100))
46+
assert.Zero(t, n)
47+
assert.ErrorIs(t, err, expectedErr)
48+
}
49+
4250
batch, err := NewBatch(t.Context(), filepath.Join(testReposDir, "repo1_bare"))
4351
require.NoError(t, err)
44-
_, _ = batch.QueryInfo("e2129701f1a4d54dc44f03c93bca0a2aec7c5449")
45-
var c *catFileBatchCommunicator
52+
_, err = batch.QueryInfo("e2129701f1a4d54dc44f03c93bca0a2aec7c5449")
53+
require.NoError(t, err)
54+
55+
var batchComm *catFileBatchCommunicator
4656
switch b := batch.(type) {
4757
case *catFileBatchLegacy:
48-
c = b.batchCheck
49-
_, _ = c.reqWriter.Write([]byte("in-complete-line-"))
58+
batchComm = b.batchCheck
59+
_, _ = batchComm.reqWriter.Write([]byte("in-complete-line-"))
5060
case *catFileBatchCommand:
51-
c = b.batch
52-
_, _ = c.reqWriter.Write([]byte("info"))
61+
batchComm = b.batch
62+
_, _ = batchComm.reqWriter.Write([]byte("info"))
5363
default:
5464
t.FailNow()
5565
}
56-
return c, func() { batch.Close() }
57-
}
66+
defer batchComm.Close()
67+
68+
require.True(t, (errBeforePipeClose != nil) != (errAfterPipeClose != nil), "must set exactly one of the expected errors")
5869

70+
inceptor := batchComm.debugKill()
71+
<-inceptor.beforeClose // wait for the command's Close to be called, the pipe is not closed yet
72+
readError(t, batchComm.respReader, errBeforePipeClose) // then caller will read on an open pipe which will be closed soon
73+
close(inceptor.blockClose) // continue to close the pipe
74+
<-inceptor.afterClose // wait for the pipe to be closed
75+
readError(t, batchComm.respReader, errAfterPipeClose) // then caller will read on a closed pipe
76+
}
5977
t.Run("QueryTerminated", func(t *testing.T) {
6078
t.Run("PipeClosedBeforeRead", func(t *testing.T) {
61-
closed := make(chan struct{})
62-
hook := func(doClose func()) { doClose(); close(closed) }
63-
c, cleanup := setupTerminatedBatch(t)
64-
defer cleanup()
65-
catFileBatchDebugPipeClose.Store(&hook)
66-
defer catFileBatchDebugPipeClose.Store(nil)
67-
c.debugGitCmd.DebugKill()
68-
<-closed
69-
n, err := c.respReader.Read(make([]byte, 100))
70-
assert.Zero(t, n)
71-
assert.ErrorIs(t, err, os.ErrClosed)
79+
simulateQueryTerminated(t, io.EOF, nil)
7280
})
7381
t.Run("ReadBeforePipeClose", func(t *testing.T) {
74-
ready := make(chan struct{})
75-
proceed := make(chan struct{})
76-
hook := func(doClose func()) { close(ready); <-proceed; doClose() }
77-
c, cleanup := setupTerminatedBatch(t)
78-
defer cleanup()
79-
catFileBatchDebugPipeClose.Store(&hook)
80-
defer catFileBatchDebugPipeClose.Store(nil)
81-
c.debugGitCmd.DebugKill()
82-
<-ready
83-
n, err := c.respReader.Read(make([]byte, 100))
84-
assert.Zero(t, n)
85-
assert.ErrorIs(t, err, io.EOF)
86-
close(proceed)
82+
simulateQueryTerminated(t, nil, os.ErrClosed)
8783
})
8884
})
8985

0 commit comments

Comments
 (0)