Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions services/actions/job_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,24 @@ func checkJobsOfCurrentRunAttempt(ctx context.Context, run *actions_model.Action
if err != nil {
return nil, nil, nil, err
}
// The resolver below only considers needs and job-level concurrency, so a run blocked
// solely by run-level concurrency would have its jobs unblocked here. checkRunConcurrency
// re-evaluates when the holding run finishes.
if run.Status.IsBlocked() {
attempt, has, err := run.GetLatestAttempt(ctx)
if err != nil {
return nil, nil, nil, fmt.Errorf("GetLatestAttempt: %w", err)
}
if has {
shouldBlock, err := shouldBlockRunByConcurrency(ctx, attempt)
if err != nil {
return nil, nil, nil, fmt.Errorf("shouldBlockRunByConcurrency: %w", err)
}
if shouldBlock {
return jobs, nil, nil, nil
}
}
Comment thread
silverwind marked this conversation as resolved.
}
vars, err := actions_model.GetVariablesOfRun(ctx, run)
if err != nil {
return nil, nil, nil, err
Expand Down
65 changes: 65 additions & 0 deletions tests/integration/actions_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/unittest"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
api "code.gitea.io/gitea/modules/structs"
"code.gitea.io/gitea/modules/timeutil"
Expand Down Expand Up @@ -1219,6 +1220,70 @@ jobs:
})
}

// TestScheduleConcurrencyBlockedRunStaysBlocked verifies that a run blocked solely by
// run-level concurrency is not unblocked by the job-status resolver while another run
// still holds the same concurrency group.
func TestScheduleConcurrencyBlockedRunStaysBlocked(t *testing.T) {
onGiteaRun(t, func(t *testing.T, u *url.URL) {
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
session := loginUser(t, user2.Name)
token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser)

apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false)
repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID})
httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository)
defer doAPIDeleteRepository(httpContext)(t)

runner := newMockRunner()
runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false)

wfTreePath := ".gitea/workflows/schedule-concurrency.yml"
wfFileContent := `name: schedule-concurrency
on:
push:
schedule:
- cron: '@every 1m'
concurrency:
group: schedule-concurrency
cancel-in-progress: ${{ gitea.event_name == 'push' }}
jobs:
job:
runs-on: ubuntu-latest
steps:
- run: echo 'schedule workflow'
`
opts := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wfTreePath, wfFileContent)
createWorkflowFile(t, token, user2.Name, repo.Name, wfTreePath, opts)

// Push run picks up and holds the "schedule-concurrency" group while it runs.
_ = runner.fetchTask(t)
Comment thread
silverwind marked this conversation as resolved.
Outdated

fireSchedule := func() {
t.Helper()
spec := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionScheduleSpec{RepoID: repo.ID})
spec.Next = timeutil.TimeStampNow()
require.NoError(t, actions_model.UpdateScheduleSpec(t.Context(), spec, "next"))
require.NoError(t, actions_service.StartScheduleTasks(t.Context()))
}

// Two schedule triggers: the first creates a Blocked run; the second creates another
// Blocked run and cancels the first (cancel-in-progress is false for schedule, so
// only Waiting/Blocked are cancelled). The cancellation queues EmitJobsIfReadyByRun
// and drives the resolver against the surviving Blocked run.
fireSchedule()
fireSchedule()
require.NoError(t, queue.GetManager().FlushAll(t.Context(), 15*time.Second))

blockedRuns, err := db.Find[actions_model.ActionRun](t.Context(), actions_model.FindRunOptions{
RepoID: repo.ID,
TriggerEvent: webhook_module.HookEventSchedule,
Status: []actions_model.Status{actions_model.StatusBlocked},
})
require.NoError(t, err)
require.Len(t, blockedRuns, 1)
})
}

func TestWorkflowAndJobConcurrency(t *testing.T) {
onGiteaRun(t, func(t *testing.T, u *url.URL) {
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
Expand Down