Skip to content

Commit 16e91b1

Browse files
committed
added admin app: fixes
1 parent 0a56522 commit 16e91b1

File tree

2 files changed

+35
-10
lines changed

2 files changed

+35
-10
lines changed

admin/failover_reprocessor.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@ import (
55
"compress/gzip"
66
"context"
77
"fmt"
8-
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
9-
"github.com/jitsucom/bulker/jitsubase/jsonorder"
10-
"github.com/jitsucom/bulker/jitsubase/utils"
118
"io"
129
"os"
1310
"path/filepath"
@@ -18,6 +15,10 @@ import (
1815
"sync/atomic"
1916
"time"
2017

18+
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
19+
"github.com/jitsucom/bulker/jitsubase/jsonorder"
20+
"github.com/jitsucom/bulker/jitsubase/utils"
21+
2122
"github.com/aws/aws-sdk-go-v2/aws"
2223
awsconfig "github.com/aws/aws-sdk-go-v2/config"
2324
"github.com/aws/aws-sdk-go-v2/service/s3"
@@ -323,7 +324,7 @@ func (m *ReprocessingJobManager) processJob(job *ReprocessingJob) {
323324
m.Infof("Reached event limit of %d, stopping processing", job.Config.Limit)
324325
break
325326
}
326-
327+
327328
select {
328329
case <-job.ctx.Done():
329330
return
@@ -493,13 +494,13 @@ func (m *ReprocessingJobManager) processFile(job *ReprocessingJob, filePath stri
493494
}
494495

495496
lineNum++
496-
497+
497498
// Check if we've reached the limit
498499
if job.Config.Limit > 0 && atomic.LoadInt64(&job.SuccessCount) >= job.Config.Limit {
499500
m.Infof("Reached event limit of %d, stopping processing", job.Config.Limit)
500501
break
501502
}
502-
503+
503504
line := scanner.Bytes()
504505
// Parse message as IngestMessage
505506
var ingestMsg IngestMessage
@@ -564,7 +565,8 @@ func (m *ReprocessingJobManager) filterMessage(job *ReprocessingJob, ingestMsg *
564565

565566
// Filter by streamIds if specified
566567
if len(job.Config.StreamIds) > 0 {
567-
return utils.ArrayContains(job.Config.StreamIds, ingestMsg.Origin.SourceId)
568+
return utils.ArrayContains(job.Config.StreamIds, ingestMsg.Origin.SourceId) ||
569+
utils.ArrayContains(job.Config.StreamIds, ingestMsg.Origin.Slug)
568570
}
569571

570572
return true
@@ -583,7 +585,7 @@ func (m *ReprocessingJobManager) processBatch(job *ReprocessingJob, batch []*Ing
583585
batch = batch[:remaining]
584586
}
585587
}
586-
588+
587589
if job.Config.DryRun {
588590
// In dry run mode, just count the messages
589591
atomic.AddInt64(&job.SuccessCount, int64(len(batch)))

admin/router.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package main
33
import (
44
"github.com/gin-gonic/gin"
55
"github.com/jitsucom/bulker/jitsubase/appbase"
6+
"github.com/jitsucom/bulker/jitsubase/utils"
7+
68
"net/http"
79
)
810

@@ -15,7 +17,7 @@ type Router struct {
1517
func NewRouter(context *Context) *Router {
1618
base := appbase.NewRouterBase(context.config.Config, []string{
1719
"/health",
18-
"/admin",
20+
"/",
1921
"/static",
2022
})
2123
router := &Router{
@@ -40,9 +42,30 @@ func NewRouter(context *Context) *Router {
4042
reprocessingAPI.POST("/jobs/:id/cancel", router.cancelReprocessingJob)
4143

4244
// Serve HTML interface for admin (no auth required for UI, auth handled via form)
43-
engine.GET("/admin", router.serveAdminHTML)
45+
engine.GET("/", router.serveAdminHTML)
4446
engine.Static("/static", "./static")
4547

4648
return router
4749

4850
}
51+
func (r *Router) CorsMiddleware(c *gin.Context) {
52+
origin := c.GetHeader("Origin")
53+
if c.Request.Method == "OPTIONS" {
54+
c.Header("Access-Control-Allow-Origin", utils.NvlString(origin, "*"))
55+
c.Header("Access-Control-Allow-Methods", "GET,POST,HEAD,OPTIONS")
56+
// x-jitsu-custom - in case client want to add some custom payload via header
57+
c.Header("Access-Control-Allow-Headers", "x-enable-debug, x-write-key, authorization, content-type, x-ip-policy, cache-control, x-jitsu-custom")
58+
c.Header("Access-Control-Allow-Credentials", "true")
59+
c.Header("Access-Control-Max-Age", "86400")
60+
c.AbortWithStatus(http.StatusOK)
61+
return
62+
} else if origin != "" {
63+
c.Header("Access-Control-Allow-Origin", origin)
64+
c.Header("Access-Control-Allow-Methods", "GET,POST,HEAD,OPTIONS")
65+
// x-jitsu-custom - in case client want to add some custom payload via header
66+
c.Header("Access-Control-Allow-Headers", "x-enable-debug, x-write-key, authorization, content-type, x-ip-policy, cache-control, x-jitsu-custom")
67+
c.Header("Access-Control-Allow-Credentials", "true")
68+
c.Header("Access-Control-Max-Age", "86400")
69+
}
70+
c.Next()
71+
}

0 commit comments

Comments
 (0)