fix(fcm): Optimize SendEachInBatch with worker pool #695

Draft
wants to merge 8 commits into dev

Conversation

lahirumaramba
Member

The `sendEachInBatch` function, used for sending FCM messages individually within a batch, previously created a new goroutine for each message. This could lead to high CPU usage for large batches.

This commit refactors `sendEachInBatch` to use a fixed-size worker pool (10 workers) to manage message sending. This caps the number of active goroutines, reducing CPU overhead and improving resource utilization.

Key changes:
- Implemented early validation: All messages are now validated upfront. If any message is invalid, the function returns an error immediately without attempting to send any messages.
- Introduced a worker pool: a fixed number of goroutines processes message-sending tasks, with messages distributed via channels (see the sketch after this list).
- Ensured result ordering: The order of responses in `BatchResponse.Responses` correctly matches the order of the input messages, even with concurrent processing.
- Updated unit tests in `messaging_batch_test.go` to comprehensively cover the new implementation, including scenarios for varying batch sizes, partial failures, early validation, and response ordering.
- Confirmed that the existing HTTP client continues to leverage HTTP/2.
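
The sketch below illustrates the shape of this approach: a fixed pool of workers pulls jobs from a channel and writes each result into its original slot. The pool size, function signature, and the `send` callback are illustrative placeholders rather than the SDK's actual internals, which may differ in `messaging/messaging_batch.go`:

```go
package messaging

import (
	"context"
	"fmt"
	"sync"
)

// numWorkers bounds the number of goroutines sending messages concurrently.
const numWorkers = 10

// sendEachInBatchSketch validates all messages up front, fans work out over a
// fixed number of goroutines via a channel, and writes each result into its
// original index to preserve response ordering. Message, SendResponse,
// BatchResponse, and validateMessage stand in for the SDK's existing types
// and helpers; the send callback stands in for the per-message HTTP call.
func sendEachInBatchSketch(ctx context.Context, messages []*Message,
	send func(context.Context, *Message) (string, error)) (*BatchResponse, error) {

	// Early validation: return before any message is sent if one is invalid.
	for i, m := range messages {
		if err := validateMessage(m); err != nil {
			return nil, fmt.Errorf("invalid message at index %d: %v", i, err)
		}
	}

	type job struct {
		index int
		msg   *Message
	}
	jobs := make(chan job)
	responses := make([]*SendResponse, len(messages)) // indexed writes keep input order

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				if id, err := send(ctx, j.msg); err != nil {
					responses[j.index] = &SendResponse{Success: false, Error: err}
				} else {
					responses[j.index] = &SendResponse{Success: true, MessageID: id}
				}
			}
		}()
	}

	for i, m := range messages {
		jobs <- job{index: i, msg: m}
	}
	close(jobs)
	wg.Wait()

	br := &BatchResponse{Responses: responses}
	for _, r := range responses {
		if r.Success {
			br.SuccessCount++
		} else {
			br.FailureCount++
		}
	}
	return br, nil
}
```

Because each worker writes only to its own slot in the `responses` slice, no extra locking is needed to keep responses aligned with the input order.
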
@lahirumaramba lahirumaramba added the release:stage Stage a release candidate label May 13, 2025
google-labs-jules bot and others added 7 commits May 13, 2025 22:39
This commit increases the number of goroutines in the
`sendEachInBatch` function from 10 to 50.

This change is based on reviewer feedback and is intended to improve
throughput for large batches of FCM messages. The number 50 was
chosen as a significant increase over the previous conservative
value of 10, aiming to provide better concurrency for I/O-bound
operations without being excessively high.

The goal is to allow for more parallel processing of messages up
to the maximum batch limit of 500. Performance in specific
environments should be monitored to ensure this change has the
desired effect without causing undue resource strain.

Unit tests in `messaging_batch_test.go` have been reviewed and
adjusted to ensure they remain meaningful with the new pool size,
particularly scenarios testing behavior when the number of messages
is less than, equal to, or greater than the number of concurrent
processes.
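
Concretely, the change amounts to bumping a package-level pool-size constant; the name below is assumed for illustration and may not match the SDK source:

```go
// Illustrative only: the constant name is assumed, not taken from the SDK.
// Bounds the number of goroutines sending messages concurrently; FCM caps
// sendEach batches at 500 messages, so 50 workers leaves ample headroom.
const numWorkers = 50 // previously 10
```
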

This commit addresses two issues introduced in previous changes:

1.  A syntax error in `messaging/messaging_batch_test.go` caused by a duplicated `t.Run` call and associated redundant variable declarations.
2.  An "imported and not used" error for the "sync" package in `messaging/messaging_batch.go`, which occurred after `sync.WaitGroup` was removed but the import statement was not.

These fixes ensure the code builds cleanly and passes lint checks.

This commit re-adds the `import "sync"` statement to
`messaging/messaging_batch_test.go`.

The `sync` package, specifically `sync.Mutex`, is used within
the test suite (e.g., in `TestSendEachWorkerPoolScenarios` and
`TestSendEachResponseOrderWithConcurrency`) to protect shared
variables like hit counters and logs from race conditions when
accessed by concurrently running mock HTTP server handlers.

A previous change inadvertently removed this import while cleaning
up unused imports in other files, leading to "undefined: sync"
build errors in the test file. This commit corrects that oversight.
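
The pattern being restored looks roughly like the following; the test name, counter variable, and response body are illustrative rather than copied from `messaging_batch_test.go`:

```go
package messaging

import (
	"net/http"
	"net/http/httptest"
	"sync"
	"testing"
)

func TestConcurrentHandlerHitCount(t *testing.T) {
	var (
		mu             sync.Mutex
		serverHitCount int
	)

	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Handlers run concurrently when the worker pool sends in parallel,
		// so the shared counter must be guarded by a mutex.
		mu.Lock()
		serverHitCount++
		mu.Unlock()

		w.Header().Set("Content-Type", "application/json")
		w.Write([]byte(`{"name": "projects/test-project/messages/1"}`))
	}))
	defer ts.Close()

	// ... exercise SendEach against ts.URL, then assert on serverHitCount
	// under the same mutex.
}
```
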
This commit addresses several issues in the messaging test suite
(`messaging_batch_test.go`) that were causing test failures,
including data races and incorrect test logic:

1.  **Data Races:**
    *   In `TestSendEachPartialFailure` and `TestSendEachTotalFailure`,
        the `serverHitCount` variable in mock server handlers was accessed
        concurrently without synchronization. This has been fixed by
        introducing a `sync.Mutex` to protect access to `serverHitCount`
        within each test case's server setup.

2.  **Early Validation Test (`TestSendEachEarlyValidationSkipsSend`):**
    *   The `invalid_last_message` test was failing because the
        type of invalidity used (large payload) was not caught by the
        early `validateMessage` scan in `sendEachInBatch`.
    *   Fixed by changing the invalid message to one with both Topic and
        Token defined, which `validateMessage` correctly flags, ensuring
        the test now accurately verifies early exit behavior and that no
        server calls are made.

3.  **Partial Failure Scenario (`TestSendEachWorkerPoolScenarios`):**
    *   The test for partial failures
        (`NumMessages_75_AllSuccess_false_...`) was failing due to a
        mismatch between the mock server's failure simulation (based on
        request arrival order) and the test's assertions (based on
        original message index).
    *   Fixed by modifying the mock server to parse the original message
        index from the message content (topic) and use this for
        failure simulation, ensuring deterministic alignment with assertions (see the sketch below).

These changes should ensure the stability and correctness of the
messaging batch processing tests, especially those verifying
concurrent operations and error handling.
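
To illustrate the fix described in item 3, the sketch below shows a mock handler that recovers the original message index from a topic of the form `test-topic-<index>` and fails deterministically based on that index, independent of request arrival order. The handler name, topic prefix, and failure rule are illustrative; only the request/response shapes follow the FCM v1 HTTP API:

```go
package messaging

import (
	"encoding/json"
	"fmt"
	"net/http"
	"strconv"
	"strings"
)

// newIndexAwareHandler returns a mock FCM handler that parses the original
// message index from the topic in the request body and simulates a failure
// for every index at or above failFrom, regardless of the order in which
// concurrent requests arrive.
func newIndexAwareHandler(failFrom int) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var req struct {
			Message struct {
				Topic string `json:"topic"`
			} `json:"message"`
		}
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, "bad request", http.StatusBadRequest)
			return
		}
		idx, err := strconv.Atoi(strings.TrimPrefix(req.Message.Topic, "test-topic-"))
		if err != nil {
			http.Error(w, "unexpected topic", http.StatusBadRequest)
			return
		}

		if idx >= failFrom {
			// Failure keyed to the original message index, not arrival order.
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte(`{"error": {"status": "INTERNAL", "message": "simulated failure"}}`))
			return
		}
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprintf(w, `{"name": "projects/test-project/messages/%d"}`, idx)
	}
}
```
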
I applied `gofmt -s .` to ensure consistent code formatting,
particularly for `messaging/messaging_batch.go` and
`messaging/messaging_batch_test.go`.

This commit reflects any adjustments made by gofmt to align
with standard Go formatting practices, including simplification
of struct initializations and consistent spacing.

This commit applies gofmt formatting, with a specific focus
on removing trailing whitespace from messaging/messaging_batch_test.go
as identified by previous `gofmt -d` outputs.

This is part of a series of fixes to ensure the codebase
conforms to standard Go formatting practices and to resolve
various test failures and linting issues.