Skip to content

Commit 77c0db4

Browse files
committed
tweaks to the dispatcher and session entity queue
1 parent 7a044de commit 77c0db4

File tree

2 files changed

+9
-7
lines changed

2 files changed

+9
-7
lines changed

engine/dispatcher/dispatcher.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (d *dis) DispatchEvent(e *et.Event) error {
7373
}
7474

7575
func (d *dis) maintainPipelines() {
76-
ctick := time.NewTimer(5 * time.Second)
76+
ctick := time.NewTimer(time.Second)
7777
defer ctick.Stop()
7878
mtick := time.NewTimer(10 * time.Second)
7979
defer mtick.Stop()
@@ -87,7 +87,7 @@ loop:
8787
mtick.Reset(10 * time.Second)
8888
case <-ctick.C:
8989
d.fillPipelineQueues()
90-
ctick.Reset(5 * time.Second)
90+
ctick.Reset(time.Second)
9191
case e := <-d.dchan:
9292
if err := d.safeDispatch(e); err != nil {
9393
d.logger.Error(fmt.Sprintf("Failed to dispatch event: %s", err.Error()))
@@ -172,8 +172,8 @@ func (d *dis) completedCallback(data interface{}) {
172172
}
173173

174174
func (d *dis) safeDispatch(e *et.Event) error {
175-
ap, err := d.reg.GetPipeline(e.Entity.Asset.AssetType())
176-
if err != nil {
175+
// there is not need to dispatch the event if there's no associated asset pipeline
176+
if ap, err := d.reg.GetPipeline(e.Entity.Asset.AssetType()); err != nil || ap == nil {
177177
return err
178178
}
179179

@@ -182,7 +182,7 @@ func (d *dis) safeDispatch(e *et.Event) error {
182182
return nil
183183
}
184184

185-
err = e.Session.Queue().Append(e.Entity)
185+
err := e.Session.Queue().Append(e.Entity)
186186
if err != nil {
187187
return err
188188
}
@@ -194,7 +194,7 @@ func (d *dis) safeDispatch(e *et.Event) error {
194194
stats.Unlock()
195195
}
196196

197-
if qlen := ap.Queue.Len(); e.Meta != nil || qlen < MinPipelineQueueSize {
197+
if e.Meta != nil {
198198
if err := d.appendToPipeline(e); err != nil {
199199
d.logger.Error(fmt.Sprintf("Failed to append to a data pipeline: %s", err.Error()))
200200
return err

engine/sessions/queuedb/queue_db.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ type Element struct {
2828

2929
func NewQueueDB(dbPath string) (*QueueDB, error) {
3030
db, err := gorm.Open(sqlite.Open(dbPath), &gorm.Config{
31-
Logger: logger.Default.LogMode(logger.Silent),
31+
PrepareStmt: false,
32+
SkipDefaultTransaction: true,
33+
Logger: logger.Default.LogMode(logger.Silent),
3234
})
3335
if err != nil {
3436
return nil, err

0 commit comments

Comments
 (0)