diff --git a/cmd/crowdsec/main.go b/cmd/crowdsec/main.go index 2ca229ed8c1..69da385f543 100644 --- a/cmd/crowdsec/main.go +++ b/cmd/crowdsec/main.go @@ -120,7 +120,7 @@ func LoadAcquisition(cConfig *csconfig.Config) ([]acquisition.DataSource, error) return nil, fmt.Errorf("failed to configure datasource for %s: %w", flags.OneShotDSN, err) } } else { - dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec, cConfig.Prometheus) + dataSources, err = acquisition.LoadAcquisitionFromFiles(cConfig.Crowdsec, cConfig.Prometheus) if err != nil { return nil, err } diff --git a/pkg/acquisition/acquisition.go b/pkg/acquisition/acquisition.go index 9f587da2e02..779a6e6fa09 100644 --- a/pkg/acquisition/acquisition.go +++ b/pkg/acquisition/acquisition.go @@ -216,113 +216,129 @@ func GetMetricsLevelFromPromCfg(prom *csconfig.PrometheusCfg) int { return configuration.METRICS_FULL } -// LoadAcquisitionFromFile unmarshals the configuration item and checks its availability -func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg, prom *csconfig.PrometheusCfg) ([]DataSource, error) { - var sources []DataSource - - metrics_level := GetMetricsLevelFromPromCfg(prom) - for _, acquisFile := range config.AcquisitionFiles { - log.Infof("loading acquisition file : %s", acquisFile) - - yamlFile, err := os.Open(acquisFile) - if err != nil { - return nil, err - } +// sourcesFromFile reads and parses one acquisition file into DataSources. +func sourcesFromFile(acquisFile string, metrics_level int) ([]DataSource, error) { + var sources []DataSource - defer yamlFile.Close() + log.Infof("loading acquisition file : %s", acquisFile) - acquisContent, err := io.ReadAll(yamlFile) - if err != nil { - return nil, fmt.Errorf("failed to read %s: %w", acquisFile, err) - } + yamlFile, err := os.Open(acquisFile) + if err != nil { + return nil, err + } - expandedAcquis := csstring.StrictExpand(string(acquisContent), os.LookupEnv) + defer yamlFile.Close() - dec := yaml.NewDecoder(strings.NewReader(expandedAcquis)) - dec.SetStrict(true) + acquisContent, err := io.ReadAll(yamlFile) + if err != nil { + return nil, fmt.Errorf("failed to read %s: %w", acquisFile, err) + } - idx := -1 + expandedAcquis := csstring.StrictExpand(string(acquisContent), os.LookupEnv) - for { - var sub configuration.DataSourceCommonCfg + dec := yaml.NewDecoder(strings.NewReader(expandedAcquis)) + dec.SetStrict(true) - idx += 1 + idx := -1 - err = dec.Decode(&sub) - if err != nil { - if !errors.Is(err, io.EOF) { - return nil, fmt.Errorf("failed to parse %s: %w", acquisFile, err) - } + for { + var sub configuration.DataSourceCommonCfg - log.Tracef("End of yaml file") + idx += 1 - break + err = dec.Decode(&sub) + if err != nil { + if !errors.Is(err, io.EOF) { + return nil, fmt.Errorf("failed to parse %s: %w", acquisFile, err) } - // for backward compat ('type' was not mandatory, detect it) - if guessType := detectBackwardCompatAcquis(sub); guessType != "" { - log.Debugf("datasource type missing in %s (position %d): detected 'source=%s'", acquisFile, idx, guessType) + log.Tracef("End of yaml file") - if sub.Source != "" && sub.Source != guessType { - log.Warnf("datasource type mismatch in %s (position %d): found '%s' but should probably be '%s'", acquisFile, idx, sub.Source, guessType) - } + break + } - sub.Source = guessType - } - // it's an empty item, skip it - if len(sub.Labels) == 0 { - if sub.Source == "" { - log.Debugf("skipping empty item in %s", acquisFile) - continue - } + // for backward compat ('type' was not mandatory, detect it) + if guessType := detectBackwardCompatAcquis(sub); guessType != "" { + log.Debugf("datasource type missing in %s (position %d): detected 'source=%s'", acquisFile, idx, guessType) - if sub.Source != "docker" { - // docker is the only source that can be empty - return nil, fmt.Errorf("missing labels in %s (position %d)", acquisFile, idx) - } + if sub.Source != "" && sub.Source != guessType { + log.Warnf("datasource type mismatch in %s (position %d): found '%s' but should probably be '%s'", acquisFile, idx, sub.Source, guessType) } + sub.Source = guessType + } + // it's an empty item, skip it + if len(sub.Labels) == 0 { if sub.Source == "" { - return nil, fmt.Errorf("data source type is empty ('source') in %s (position %d)", acquisFile, idx) + log.Debugf("skipping empty item in %s", acquisFile) + continue } - // pre-check that the source is valid - _, err := GetDataSourceIface(sub.Source) - if err != nil { - return nil, fmt.Errorf("in file %s (position %d) - %w", acquisFile, idx, err) + if sub.Source != "docker" { + // docker is the only source that can be empty + return nil, fmt.Errorf("missing labels in %s (position %d)", acquisFile, idx) } + } - uniqueId := uuid.NewString() - sub.UniqueId = uniqueId + if sub.Source == "" { + return nil, fmt.Errorf("data source type is empty ('source') in %s (position %d)", acquisFile, idx) + } - src, err := DataSourceConfigure(sub, metrics_level) - if err != nil { - var dserr *DataSourceUnavailableError - if errors.As(err, &dserr) { - log.Error(err) - continue - } + // pre-check that the source is valid + _, err := GetDataSourceIface(sub.Source) + if err != nil { + return nil, fmt.Errorf("in file %s (position %d) - %w", acquisFile, idx, err) + } - return nil, fmt.Errorf("while configuring datasource of type %s from %s (position %d): %w", sub.Source, acquisFile, idx, err) + uniqueId := uuid.NewString() + sub.UniqueId = uniqueId + + src, err := DataSourceConfigure(sub, metrics_level) + if err != nil { + var dserr *DataSourceUnavailableError + if errors.As(err, &dserr) { + log.Error(err) + continue } - if sub.TransformExpr != "" { - vm, err := expr.Compile(sub.TransformExpr, exprhelpers.GetExprOptions(map[string]interface{}{"evt": &types.Event{}})...) - if err != nil { - return nil, fmt.Errorf("while compiling transform expression '%s' for datasource %s in %s (position %d): %w", sub.TransformExpr, sub.Source, acquisFile, idx, err) - } + return nil, fmt.Errorf("while configuring datasource of type %s from %s (position %d): %w", sub.Source, acquisFile, idx, err) + } - transformRuntimes[uniqueId] = vm + if sub.TransformExpr != "" { + vm, err := expr.Compile(sub.TransformExpr, exprhelpers.GetExprOptions(map[string]interface{}{"evt": &types.Event{}})...) + if err != nil { + return nil, fmt.Errorf("while compiling transform expression '%s' for datasource %s in %s (position %d): %w", sub.TransformExpr, sub.Source, acquisFile, idx, err) } - sources = append(sources, src) + transformRuntimes[uniqueId] = vm } + + sources = append(sources, src) } return sources, nil } + +// LoadAcquisitionFromFiles unmarshals the configuration item and checks its availability +func LoadAcquisitionFromFiles(config *csconfig.CrowdsecServiceCfg, prom *csconfig.PrometheusCfg) ([]DataSource, error) { + var allSources []DataSource + + metrics_level := GetMetricsLevelFromPromCfg(prom) + + for _, acquisFile := range config.AcquisitionFiles { + sources, err := sourcesFromFile(acquisFile, metrics_level) + if err != nil { + return nil, err + } + + allSources = append(allSources, sources...) + } + + return allSources, nil +} + func GetMetrics(sources []DataSource, aggregated bool) error { var metrics []prometheus.Collector diff --git a/pkg/acquisition/acquisition_test.go b/pkg/acquisition/acquisition_test.go index c0927c6d8fe..037cc87c64a 100644 --- a/pkg/acquisition/acquisition_test.go +++ b/pkg/acquisition/acquisition_test.go @@ -215,7 +215,7 @@ wowo: ajsajasjas } } -func TestLoadAcquisitionFromFile(t *testing.T) { +func TestLoadAcquisitionFromFiles(t *testing.T) { appendMockSource() t.Setenv("TEST_ENV", "test_value2") @@ -293,7 +293,7 @@ func TestLoadAcquisitionFromFile(t *testing.T) { } for _, tc := range tests { t.Run(tc.TestName, func(t *testing.T) { - dss, err := LoadAcquisitionFromFile(&tc.Config, nil) + dss, err := LoadAcquisitionFromFiles(&tc.Config, nil) cstest.RequireErrorContains(t, err, tc.ExpectedError) if tc.ExpectedError != "" {