
feat(connectors): support Elasticsearch sink and source connectors #1872


Open · wants to merge 17 commits into master

Conversation


@kaori-seasons kaori-seasons commented Jun 17, 2025

Iggy Elasticsearch Connector

Related to issue #1851
An Elasticsearch connector for Apache Iggy providing bidirectional data synchronization capabilities.

Features

  • Elasticsearch Sink: Consumes messages from Iggy and writes them to Elasticsearch
  • Elasticsearch Source: Reads data from Elasticsearch and sends it to Iggy
  • Batch Processing: Supports batch read/write operations for performance optimization
  • Incremental Sync: Timestamp-based incremental data synchronization
  • Error Handling: Comprehensive retry mechanisms and error handling
  • State Management: Supports checkpoint recovery and state persistence

Quick Start

Installation

cargo add iggy-elasticsearch-connector
Sink Usage Example

use iggy_elasticsearch_connector::{ElasticsearchSink, ElasticsearchSinkConfig};  
use iggy::prelude::*;  
use std::time::Duration;  
  
#[tokio::main]  
async fn main() -> Result<(), Box<dyn std::error::Error>> {  
    // Create Iggy client  
    let iggy_client = IggyClient::from_connection_string("iggy://iggy:iggy@localhost:8090")?;  
      
    // Configure Elasticsearch Sink  
    let sink_config = ElasticsearchSinkConfig {  
        elasticsearch_url: "http://localhost:9200".to_string(),  
        index_pattern: "iggy-messages-{date}".to_string(),  
        batch_size: 1000,  
        flush_interval: Duration::from_secs(5),  
        retry_attempts: 3,  
        retry_interval: Duration::from_secs(1),  
        mapping_config: Some(r#"{  
            "properties": {  
                "@timestamp": {"type": "date"},  
                "message": {"type": "text"},  
                "level": {"type": "keyword"}  
            }  
        }"#.to_string()),  
        document_id_field: None,  
        timestamp_field: Some("@timestamp".to_string()),  
        enable_dlq: false,  
        dlq_topic: None,  
    };  
      
    // Create and start sink  
    let mut sink = ElasticsearchSink::new(  
        iggy_client,  
        "logs",  
        "application-logs",   
        "elasticsearch-sink-group",  
        sink_config,  
    ).await?;  
      
    sink.start().await?;  
    Ok(())  
}
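
Note: the index_pattern above contains a {date} placeholder. As a rough sketch only (the exact substitution and date format used by the connector are assumptions here), a daily index name could be derived like this:

use chrono::Utc;

// Illustrative helper, not part of the connector's public API: expands a
// "{date}" placeholder into a daily index name, e.g. "iggy-messages-2025.06.17".
fn resolve_index_name(pattern: &str) -> String {
    pattern.replace("{date}", &Utc::now().format("%Y.%m.%d").to_string())
}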

Source Usage Example

use iggy_elasticsearch_connector::{ElasticsearchSource, ElasticsearchSourceConfig};  
use iggy::prelude::*;  
use std::time::Duration;  
  
#[tokio::main]  
async fn main() -> Result<(), Box<dyn std::error::Error>> {  
    // Create Iggy client  
    let iggy_client = IggyClient::from_connection_string("iggy://iggy:iggy@localhost:8090")?;  
      
    // Configure Elasticsearch Source  
    let source_config = ElasticsearchSourceConfig {  
        elasticsearch_url: "http://localhost:9200".to_string(),  
        query: r#"{  
            "bool": {  
                "must": [  
                    {"range": {"@timestamp": {"gte": "now-1h"}}}  
                ]  
            }  
        }"#.to_string(),  
        index_pattern: "logs-*".to_string(),  
        poll_interval: Duration::from_secs(30),  
        batch_size: 500,  
        timestamp_field: Some("@timestamp".to_string()),  
        scroll_timeout: Duration::from_secs(60),  
        max_docs_per_poll: Some(5000),  
        state_file_path: Some("./elasticsearch_source_state.json".to_string()),  
        sort_field: Some("@timestamp".to_string()),  
    };  
      
    // Create and start source  
    let mut source = ElasticsearchSource::new(  
        iggy_client,  
        "external-logs",  
        "imported-logs",  
        source_config,  
    ).await?;  
      
    source.start().await?;  
    Ok(())  
}

Configuration
Core Configuration Structures

// Sink Configuration  
pub struct ElasticsearchSinkConfig {  
    pub elasticsearch_url: String,  
    pub index_pattern: String,  
    pub batch_size: usize,  
    pub flush_interval: Duration,  
    pub retry_attempts: u32,  
    pub retry_interval: Duration,  
    pub mapping_config: Option<String>,  
    pub document_id_field: Option<String>,  
    pub timestamp_field: Option<String>,  
    pub enable_dlq: bool,  
    pub dlq_topic: Option<String>,  
}  
  
// Source Configuration  
pub struct ElasticsearchSourceConfig {  
    pub elasticsearch_url: String,  
    pub query: String,  
    pub index_pattern: String,  
    pub poll_interval: Duration,  
    pub batch_size: usize,  
    pub timestamp_field: Option<String>,  
    pub scroll_timeout: Duration,  
    pub max_docs_per_poll: Option<usize>,  
    pub state_file_path: Option<String>,  
    pub sort_field: Option<String>,  
}
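
The Docker Compose setup below passes IGGY_URL and ELASTICSEARCH_URL as environment variables. A minimal sketch of building the sink configuration from the environment; only ELASTICSEARCH_URL is read here, and the remaining field values are illustrative defaults rather than the connector's own:

use std::env;
use std::time::Duration;

// Minimal sketch: populate ElasticsearchSinkConfig from the environment used in
// the Docker Compose file below. The defaults chosen here are illustrative.
fn sink_config_from_env() -> ElasticsearchSinkConfig {
    ElasticsearchSinkConfig {
        elasticsearch_url: env::var("ELASTICSEARCH_URL")
            .unwrap_or_else(|_| "http://localhost:9200".to_string()),
        index_pattern: "iggy-messages-{date}".to_string(),
        batch_size: 1000,
        flush_interval: Duration::from_secs(5),
        retry_attempts: 3,
        retry_interval: Duration::from_secs(1),
        mapping_config: None,
        document_id_field: None,
        timestamp_field: Some("@timestamp".to_string()),
        enable_dlq: false,
        dlq_topic: None,
    }
}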

Deployment
Docker Compose

version: '3.8'  
  
services:  
  elasticsearch:  
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0  
    environment:  
      - discovery.type=single-node  
      - xpack.security.enabled=false  
    ports:  
      - "9200:9200"  
  
  iggy:  
    image: apache/iggy:latest  
    ports:  
      - "8090:8090"  
      - "3000:3000"  
  
  elasticsearch-sink:  
    build: .  
    depends_on:  
      - elasticsearch  
      - iggy  
    environment:  
      - RUST_LOG=info  
      - IGGY_URL=iggy://iggy:iggy@iggy:8090  
      - ELASTICSEARCH_URL=http://elasticsearch:9200  
  
  elasticsearch-source:  
    build: .  
    depends_on:  
      - elasticsearch  
      - iggy  
    environment:  
      - RUST_LOG=info  
      - IGGY_URL=iggy://iggy:iggy@iggy:8090  
      - ELASTICSEARCH_URL=http://elasticsearch:9200

Quick deployment:

docker-compose up -d


@spetz
Contributor

spetz commented Jun 17, 2025

Hey,

Thank you for the contribution! Please check how the example connectors are implemented, by making use of either Sink or Source trait (exposing the basic functions), which can be then easily loaded by the runtime. The tutorial can be found under sinks / source readme files, as well as here iggy.apache.org/docs/connectors/sink.

I think that your code can be refactored to support the desired plugin system. Speaking of the configuration, as well as the basic overview of the connector capabilities, you could add the separate README.md files under e.g. /sinks/elastic_search_sink and /sources/elastic_search_source modules (we can also copy the docs to Apache Website).
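
(For illustration, a rough sketch of what a plugin-style sink implementation could look like; the trait below is hypothetical and does not reproduce the actual Sink trait signatures from iggy_connector_sdk, which should be consulted directly.)

use async_trait::async_trait;

// Hypothetical plugin-style trait, for illustration only; the real Sink trait
// lives in iggy_connector_sdk and has its own signatures.
#[async_trait]
pub trait Sink: Send + Sync {
    async fn open(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
    async fn consume(&mut self, payload: &[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
    async fn close(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}

// Would wrap the existing ElasticsearchSink internals.
pub struct ElasticsearchSinkPlugin;

#[async_trait]
impl Sink for ElasticsearchSinkPlugin {
    async fn open(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // Create indices / apply mappings on startup.
        Ok(())
    }
    async fn consume(&mut self, _payload: &[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // Buffer the message and bulk-write to Elasticsearch when the batch is full.
        Ok(())
    }
    async fn close(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // Flush any remaining buffered documents.
        Ok(())
    }
}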

@hubcio
Contributor

hubcio commented Jun 19, 2025

@kaori-seasons have you already joined our discord?

@kaori-seasons
Author

Hey,

> Thank you for the contribution! Please check how the example connectors are implemented, by making use of either Sink or Source trait (exposing the basic functions), which can be then easily loaded by the runtime. The tutorial can be found under sinks / source readme files, as well as here iggy.apache.org/docs/connectors/sink.
>
> I think that your code can be refactored to support the desired plugin system. Speaking of the configuration, as well as the basic overview of the connector capabilities, you could add the separate README.md files under e.g. /sinks/elastic_search_sink and /sources/elastic_search_source modules (we can also copy the docs to Apache Website).

OK, thanks for your suggestion. I am too busy at work during weekdays, so I will take some time to read the documentation of the relevant connectors on weekends.

@kaori-seasons
Author

kaori-seasons commented Jun 20, 2025

discord

My Discord shows that it does not support Chinese mobile phone numbers.
I think the community should support contact via Slack.

@hubcio
Contributor

hubcio commented Jun 21, 2025

@kaori-seasons we don't plan to use Slack, we already have a pretty big community on Discord. You can use GitHub Discussions if you have any questions.

@hubcio
Contributor

hubcio commented Jun 24, 2025

@kaori-seasons do you plan to change your code according to @spetz comments?

@kaori-seasons
Author

> @kaori-seasons do you plan to change your code according to @spetz comments?

I've been busy at work this week and will refactor as soon as possible.

@spetz
Contributor

spetz commented Jun 29, 2025

Thank you for the recent update, looks good now :) Could you please fix the markdown and update PR title (we use conventional commits) e.g. feat(connectors): support Elasticsearch sink and source connectors

@spetz
Contributor

spetz commented Jul 1, 2025

Hey @kaori-seasons just wanted to ask, whether you'll have time to complete this, or could I help you?

@kaori-seasons
Author

spetz

Dear spetz, best regards. I just saw your answer; I will make the changes as soon as possible this week.

kaori-seasons changed the title from "[ISSUE#1851] support Elasticsearch connectors" to "feat(connectors): support Elasticsearch sink and source connectors" on Jul 12, 2025
@spetz
Contributor

spetz commented Jul 12, 2025

Thanks @kaori-seasons, also please check the latest changes, which allow providing an optional state for the source connector (e.g. keeping track of the recently fetched record ID, or anything else, in a persistent way).

@kaori-seasons
Author

> Thanks @kaori-seasons, also please check the latest changes, which allow providing an optional state for the source connector (e.g. keeping track of the recently fetched record ID, or anything else, in a persistent way).

OK, I will change it.

@kaori-seasons
Author

Hi @spetz, I introduced a state manager to track the most recently fetched record ID in a persistent way, and added corresponding integration tests. Your review is welcome. The purpose of the change is as follows:

Core Functionality

  • Optional state management: enable or disable state management through configuration
  • Multiple storage backends: file, Elasticsearch, Redis, and other storage methods
  • Rich state information: tracks processing progress, error statistics, performance indicators, etc.
  • Automatic state saving: configurable automatic save interval
  • State recovery: continues processing from the last position after a restart

Technical Implementation

  • SDK extension: added StatefulSource and StatefulSink traits in iggy_connector_sdk
  • State storage abstraction: defined a StateStorage trait to support multiple storage implementations (a rough sketch follows this list)
  • ES connector enhancement: extended the ES connector to support state management
  • Configuration-driven: state management behavior is controlled through configuration files
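
A minimal sketch of the file-backed variant of this idea; the type and field names here are assumptions for illustration and do not necessarily match the exact definitions added in this PR:

use serde::{Deserialize, Serialize};
use std::path::PathBuf;

// Hypothetical state shape persisted between polls.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SourceState {
    pub last_document_id: Option<String>,
    pub last_poll_timestamp: Option<String>,
    pub total_documents: u64,
    pub error_count: u64,
}

pub struct FileStateStorage {
    pub path: PathBuf,
}

impl FileStateStorage {
    // Loads the previously saved state, or returns None on the first run.
    pub async fn load(&self) -> std::io::Result<Option<SourceState>> {
        match tokio::fs::read_to_string(&self.path).await {
            Ok(json) => Ok(serde_json::from_str(&json).ok()),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e),
        }
    }

    // Persists the state so processing can resume from the last position after a restart.
    pub async fn save(&self, state: &SourceState) -> std::io::Result<()> {
        let json = serde_json::to_string_pretty(state).expect("state serializes to JSON");
        tokio::fs::write(&self.path, json).await
    }
}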

State information

  • Processing status: Last polling time, total number of documents, number of polls, etc.
  • Cursor information: Last document ID, scroll ID, offset, etc.
  • Error tracking: Error count, last error information
  • Performance statistics: Processing time, number of bytes, success rate, etc.

How to use

  • Enable via configuration: add the state management settings to the configuration file
  • Automatic integration: the connector automatically loads and saves state
  • Manual control: an API is provided for manual state management
  • Tooling: helper functions for state statistics and cleanup

Contributor

@spetz spetz left a comment


Hey @kaori-seasons, please ensure that the code compiles locally and that all the other validation, such as running clippy and the tests, also succeeds. For example, some structures are duplicated (e.g. SourceState), and there's no need for the StatefulSource or StatefulSink traits.

}

#[tokio::test]
async fn test_state_manager_creation() {
Contributor


Please use the simple BDD convention, such as should/given/when, for example, state_manager_should_be_created() and so on.
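
For example, the test shown above might be renamed along these lines (illustrative only):

#[tokio::test]
async fn state_manager_should_be_created() {
    // same body as the original test_state_manager_creation
}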

@kaori-seasons
Author

Hi @spetz, I have changed all the files according to your suggestion and added the relevant tests, making sure all the tests pass. Could you take a look when you have time?


@kaori-seasons
Author

By the way, due to work reasons, I may not have much time in the future. I sincerely hope that this pull request can be merged as soon as possible.

@spetz
Contributor

spetz commented Jul 16, 2025

@kaori-seasons please take a look at https://github.com/apache/iggy/blob/master/CONTRIBUTING.md and https://github.com/apache/iggy/blob/master/PULL_REQUEST_TEMPLATE - on top of the required commands, please also invoke the following, to ensure the dependencies are up to date and all the source files have the license header included.

./scripts/licenses-list.sh --update

just licenses-fix 

You can either use just or include the headers manually. Also, please fix the markdown lint errors.
