Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 60 additions & 3 deletions models/actions/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,63 @@ func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) err
return err
}

// GetCommitStatusEventNameAndCommitID returns the event name and commit SHA used when creating commit statuses for this run.
// Unsupported events return empty values.
func (run *ActionRun) GetCommitStatusEventNameAndCommitID() (event, commitID string, _ error) {
switch run.Event {
case webhook_module.HookEventPush:
event = "push"
payload, err := run.GetPushEventPayload()
if err != nil {
return "", "", fmt.Errorf("GetPushEventPayload: %w", err)
}
if payload.HeadCommit == nil {
return "", "", errors.New("head commit is missing in event payload")
}
commitID = payload.HeadCommit.ID
case webhook_module.HookEventPullRequest,
webhook_module.HookEventPullRequestSync,
webhook_module.HookEventPullRequestAssign,
webhook_module.HookEventPullRequestLabel,
webhook_module.HookEventPullRequestReviewRequest,
webhook_module.HookEventPullRequestMilestone:
if run.TriggerEvent == "pull_request_target" {
event = "pull_request_target"
} else {
event = "pull_request"
}
payload, err := run.GetPullRequestEventPayload()
if err != nil {
return "", "", fmt.Errorf("GetPullRequestEventPayload: %w", err)
}
if payload.PullRequest == nil {
return "", "", errors.New("pull request is missing in event payload")
} else if payload.PullRequest.Head == nil {
return "", "", errors.New("head of pull request is missing in event payload")
}
commitID = payload.PullRequest.Head.Sha
case webhook_module.HookEventPullRequestReviewApproved,
webhook_module.HookEventPullRequestReviewRejected,
webhook_module.HookEventPullRequestReviewComment:
event = run.TriggerEvent
payload, err := run.GetPullRequestEventPayload()
if err != nil {
return "", "", fmt.Errorf("GetPullRequestEventPayload: %w", err)
}
if payload.PullRequest == nil {
return "", "", errors.New("pull request is missing in event payload")
} else if payload.PullRequest.Head == nil {
return "", "", errors.New("head of pull request is missing in event payload")
}
commitID = payload.PullRequest.Head.Sha
case webhook_module.HookEventRelease:
event = string(run.Event)
commitID = run.CommitSHA
default:
}
return event, commitID, nil
}

// CancelPreviousJobs cancels all previous jobs of the same repository, reference, workflow, and event.
// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) ([]*ActionRunJob, error) {
Expand Down Expand Up @@ -322,16 +379,16 @@ func GetRunByRepoAndID(ctx context.Context, repoID, runID int64) (*ActionRun, er
return &run, nil
}

func GetRunByIndex(ctx context.Context, repoID, index int64) (*ActionRun, error) {
func GetRunByRepoAndIndex(ctx context.Context, repoID, runIndex int64) (*ActionRun, error) {
run := &ActionRun{
RepoID: repoID,
Index: index,
Index: runIndex,
}
has, err := db.GetEngine(ctx).Get(run)
if err != nil {
return nil, err
} else if !has {
return nil, fmt.Errorf("run with index %d %d: %w", repoID, index, util.ErrNotExist)
return nil, fmt.Errorf("run with repo_id %d and index %d: %w", repoID, runIndex, util.ErrNotExist)
}

return run, nil
Expand Down
2 changes: 1 addition & 1 deletion models/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func prepareMigrationTasks() []*migration {
newMigration(323, "Add support for actions concurrency", v1_26.AddActionsConcurrency),
newMigration(324, "Fix closed milestone completeness for milestones with no issues", v1_26.FixClosedMilestoneCompleteness),
newMigration(325, "Fix missed repo_id when migrate attachments", v1_26.FixMissedRepoIDWhenMigrateAttachments),
newMigration(326, "Migrate commit status target URL to use run ID and job ID", v1_26.FixCommitStatusTargetURLToUseRunAndJobID),
newMigration(326, "Partially migrate commit status target URL to use run ID and job ID", v1_26.FixCommitStatusTargetURLToUseRunAndJobID),
newMigration(327, "Add disabled state to action runners", v1_26.AddDisabledToActionRunner),
newMigration(328, "Add TokenPermissions column to ActionRunJob", v1_26.AddTokenPermissionsToActionRunJob),
newMigration(329, "Add unique constraint for user badge", v1_26.AddUniqueIndexForUserBadge),
Expand Down
230 changes: 131 additions & 99 deletions models/migrations/v1_26/v326.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,21 @@ import (
"strconv"
"strings"

actions_model "code.gitea.io/gitea/models/actions"
Comment thread
Zettat123 marked this conversation as resolved.
Outdated
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
webhook_module "code.gitea.io/gitea/modules/webhook"

"xorm.io/xorm"
)

const actionsRunPath = "/actions/runs/"
const (
actionsRunPath = "/actions/runs/"

// Only commit status target URLs whose resolved run ID is smaller than this
// threshold are rewritten by this partial migration.
legacyURLIDThreshold int64 = 1000
)

type migrationRepository struct {
ID int64
Expand All @@ -24,9 +32,13 @@ type migrationRepository struct {
}

type migrationActionRun struct {
ID int64
RepoID int64
Index int64
ID int64
RepoID int64
Index int64
CommitSHA string `xorm:"commit_sha"`
Event webhook_module.HookEventType
TriggerEvent string
EventPayload string
}

type migrationActionRunJob struct {
Expand All @@ -40,93 +52,142 @@ type migrationCommitStatus struct {
TargetURL string
}

type commitSHAAndRuns struct {
commitSHA string
runs map[int64]*migrationActionRun
}

// FixCommitStatusTargetURLToUseRunAndJobID partially migrates legacy Actions
// commit status target URLs to the new run/job ID-based form.
//
// Only rows whose resolved run ID is below legacyURLIDThreshold are rewritten.
// This is because smaller legacy run indexes are more likely to collide with run ID URLs during runtime resolution,
// so this migration prioritizes that lower range and leaves the remaining legacy target URLs to the web compatibility logic.
func FixCommitStatusTargetURLToUseRunAndJobID(x *xorm.Engine) error {
runByIndexCache := make(map[int64]map[int64]*migrationActionRun)
jobsByRunIDCache := make(map[int64][]int64)
repoLinkCache := make(map[int64]string)

if err := migrateCommitStatusTargetURL(x, "commit_status", runByIndexCache, jobsByRunIDCache, repoLinkCache); err != nil {
groups, err := loadLegacyMigrationRunGroups(x)
if err != nil {
return err
}
return migrateCommitStatusTargetURL(x, "commit_status_summary", runByIndexCache, jobsByRunIDCache, repoLinkCache)

for repoID, groupsBySHA := range groups {
for _, group := range groupsBySHA {
if err := migrateCommitStatusTargetURLForGroup(x, "commit_status", repoID, group.commitSHA, group.runs, jobsByRunIDCache, repoLinkCache); err != nil {
return err
}
if err := migrateCommitStatusTargetURLForGroup(x, "commit_status_summary", repoID, group.commitSHA, group.runs, jobsByRunIDCache, repoLinkCache); err != nil {
return err
}
}
}
return nil
}

func loadLegacyMigrationRunGroups(x *xorm.Engine) (map[int64]map[string]*commitSHAAndRuns, error) {
var runs []migrationActionRun
if err := x.Table("action_run").
Where("id < ?", legacyURLIDThreshold).
Cols("id", "repo_id", "`index`", "commit_sha", "event", "trigger_event", "event_payload").
Find(&runs); err != nil {
return nil, fmt.Errorf("query action_run: %w", err)
}

groups := make(map[int64]map[string]*commitSHAAndRuns)
for i := range runs {
run := runs[i]
_, commitID, err := (&actions_model.ActionRun{
ID: run.ID,
RepoID: run.RepoID,
Index: run.Index,
CommitSHA: run.CommitSHA,
Event: run.Event,
TriggerEvent: run.TriggerEvent,
EventPayload: run.EventPayload,
}).GetCommitStatusEventNameAndCommitID() // get the commitID used for creating commit statuses
if err != nil {
log.Warn("skip action_run id=%d when resolving commit status commit SHA: %v", run.ID, err)
continue
}
if commitID == "" {
// empty commitID means the run didn't create any commit status records, just skip
continue
}
if groups[run.RepoID] == nil {
groups[run.RepoID] = make(map[string]*commitSHAAndRuns)
}
if groups[run.RepoID][commitID] == nil {
groups[run.RepoID][commitID] = &commitSHAAndRuns{
commitSHA: commitID,
runs: make(map[int64]*migrationActionRun),
}
}
groups[run.RepoID][commitID].runs[run.Index] = &run
}
return groups, nil
}

func migrateCommitStatusTargetURL(
func migrateCommitStatusTargetURLForGroup(
x *xorm.Engine,
table string,
runByIndexCache map[int64]map[int64]*migrationActionRun,
repoID int64,
sha string,
runs map[int64]*migrationActionRun,
jobsByRunIDCache map[int64][]int64,
repoLinkCache map[int64]string,
) error {
const batchSize = 500
var lastID int64

for {
var rows []migrationCommitStatus
sess := x.Table(table).
Where("target_url LIKE ?", "%"+actionsRunPath+"%").
And("id > ?", lastID).
Asc("id").
Limit(batchSize)
if err := sess.Find(&rows); err != nil {
return fmt.Errorf("query %s: %w", table, err)
}
if len(rows) == 0 {
return nil
}

for _, row := range rows {
lastID = row.ID
if row.TargetURL == "" {
continue
}
var rows []migrationCommitStatus
if err := x.Table(table).
Where("repo_id = ?", repoID).
And("sha = ?", sha).
And("target_url LIKE ?", "%"+actionsRunPath+"%").
Comment thread
Zettat123 marked this conversation as resolved.
Outdated
Cols("id", "repo_id", "target_url").
Find(&rows); err != nil {
return fmt.Errorf("query %s for repo_id=%d sha=%s: %w", table, repoID, sha, err)
}

repoLink, err := getRepoLinkCached(x, repoLinkCache, row.RepoID)
if err != nil || repoLink == "" {
if err != nil {
log.Warn("convert %s id=%d getRepoLinkCached: %v", table, row.ID, err)
} else {
log.Warn("convert %s id=%d: repo=%d not found", table, row.ID, row.RepoID)
}
continue
for _, row := range rows {
repoLink, err := getRepoLinkCached(x, repoLinkCache, row.RepoID)
if err != nil || repoLink == "" {
if err != nil {
log.Warn("convert %s id=%d getRepoLinkCached: %v", table, row.ID, err)
} else {
log.Warn("convert %s id=%d: repo=%d not found", table, row.ID, row.RepoID)
}
continue
}

runNum, jobNum, ok := parseTargetURL(row.TargetURL, repoLink)
if !ok {
continue
}
runNum, jobNum, ok := parseTargetURL(row.TargetURL, repoLink)
if !ok {
continue
}

run, err := getRunByIndexCached(x, runByIndexCache, row.RepoID, runNum)
if err != nil || run == nil {
if err != nil {
log.Warn("convert %s id=%d getRunByIndexCached: %v", table, row.ID, err)
} else {
log.Warn("convert %s id=%d: run not found for repo_id=%d run_index=%d", table, row.ID, row.RepoID, runNum)
}
continue
}
run, ok := runs[runNum]
if !ok {
continue
}

jobID, ok, err := getJobIDByIndexCached(x, jobsByRunIDCache, run.ID, jobNum)
if err != nil || !ok {
if err != nil {
log.Warn("convert %s id=%d getJobIDByIndexCached: %v", table, row.ID, err)
} else {
log.Warn("convert %s id=%d: job not found for run_id=%d job_index=%d", table, row.ID, run.ID, jobNum)
}
continue
jobID, ok, err := getJobIDByIndexCached(x, jobsByRunIDCache, run.ID, jobNum)
if err != nil || !ok {
if err != nil {
log.Warn("convert %s id=%d getJobIDByIndexCached: %v", table, row.ID, err)
} else {
log.Warn("convert %s id=%d: job not found for run_id=%d job_index=%d", table, row.ID, run.ID, jobNum)
}
continue
}

oldURL := row.TargetURL
newURL := fmt.Sprintf("%s%s%d/jobs/%d", repoLink, actionsRunPath, run.ID, jobID) // expect: {repo_link}/actions/runs/{run_id}/jobs/{job_id}
if oldURL == newURL {
continue
}
oldURL := row.TargetURL
newURL := fmt.Sprintf("%s%s%d/jobs/%d", repoLink, actionsRunPath, run.ID, jobID)
if oldURL == newURL {
continue
}

if _, err := x.Table(table).ID(row.ID).Cols("target_url").Update(&migrationCommitStatus{TargetURL: newURL}); err != nil {
return fmt.Errorf("update %s id=%d target_url from %s to %s: %w", table, row.ID, oldURL, newURL, err)
}
if _, err := x.Table(table).ID(row.ID).Cols("target_url").Update(&migrationCommitStatus{TargetURL: newURL}); err != nil {
return fmt.Errorf("update %s id=%d target_url from %s to %s: %w", table, row.ID, oldURL, newURL, err)
}
}
return nil
}

func getRepoLinkCached(x *xorm.Engine, cache map[int64]string, repoID int64) (string, error) {
Expand All @@ -147,35 +208,6 @@ func getRepoLinkCached(x *xorm.Engine, cache map[int64]string, repoID int64) (st
return link, nil
}

func getRunByIndexCached(x *xorm.Engine, cache map[int64]map[int64]*migrationActionRun, repoID, runIndex int64) (*migrationActionRun, error) {
if repoCache, ok := cache[repoID]; ok {
if run, ok := repoCache[runIndex]; ok {
if run == nil {
return nil, fmt.Errorf("run repo_id=%d run_index=%d not found", repoID, runIndex)
}
return run, nil
}
}

var run migrationActionRun
has, err := x.Table("action_run").Where("repo_id=? AND `index`=?", repoID, runIndex).Get(&run)
if err != nil {
return nil, err
}
if !has {
if cache[repoID] == nil {
cache[repoID] = make(map[int64]*migrationActionRun)
}
cache[repoID][runIndex] = nil
return nil, fmt.Errorf("run repo_id=%d run_index=%d not found", repoID, runIndex)
}
if cache[repoID] == nil {
cache[repoID] = make(map[int64]*migrationActionRun)
}
cache[repoID][runIndex] = &run
return &run, nil
}

func getJobIDByIndexCached(x *xorm.Engine, cache map[int64][]int64, runID, jobIndex int64) (int64, bool, error) {
jobIDs, ok := cache[runID]
if !ok {
Expand All @@ -202,7 +234,7 @@ func parseTargetURL(targetURL, repoLink string) (runNum, jobNum int64, ok bool)
}
rest := targetURL[len(prefix):]

parts := strings.Split(rest, "/") // expect: {run_num}/jobs/{job_num}
parts := strings.Split(rest, "/")
if len(parts) == 3 && parts[1] == "jobs" {
runNum, err1 := strconv.ParseInt(parts[0], 10, 64)
jobNum, err2 := strconv.ParseInt(parts[2], 10, 64)
Expand Down
Loading
Loading