-
Notifications
You must be signed in to change notification settings - Fork 17
5091 Multi threaded state store committer #6189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
5091 Multi threaded state store committer #6189
Conversation
| <Match> | ||
| <Class name="sleeper.statestore.committer.MultiThreadedStateStoreCommitter" /> | ||
| <Bug pattern="DM_GC" /> | ||
| </Match> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To ignore specific failures like this, we use a SuppressFBWarnings annotation. That's in the Maven dependency com.github.spotbugs:spotbugs-annotations.
| commitQueue = sqsQueueForStateStoreCommitter(policiesStack, deadLetters); | ||
| lambdaToCommitStateStoreUpdates( | ||
|
|
||
| if (this.instanceProperties.getEnumValue(TableStateProperty.STATESTORE_COMMITTER_PLATFORM, StateStoreCommitterPlatform.class).equals(StateStoreCommitterPlatform.EC2)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this might be a bit easier to read if you static import the instance property STATESTORE_COMMITTER_PLATFORM.
There are also other references to instance properties that could be static imported in this file.
| } | ||
|
|
||
| private void ecsTaskToCommitStateStoreUpdates(LoggingStack loggingStack, ConfigBucketStack configBucketStack, TableIndexStack tableIndexStack, StateStoreStacks stateStoreStacks, Queue commitQueue) { | ||
| String instanceId = this.instanceProperties.get(ID); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are unnecessary uses of "this" in this file. Please see our coding conventions:
| Map<String, String> environmentVariables = EnvironmentUtils.createDefaultEnvironment(this.instanceProperties); | ||
| environmentVariables.put(Utils.AWS_REGION, this.instanceProperties.get(REGION)); | ||
|
|
||
| if (this.instanceProperties.getEnumValue(TableStateProperty.STATESTORE_COMMITTER_PLATFORM, StateStoreCommitterPlatform.class).equals(StateStoreCommitterPlatform.EC2)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This duplicates the check in the constructor, so the else branch of this if statement will never be executed. Should this be removed?
| private Stream<StackDockerImage> dockerDeploymentImages(Collection<OptionalStack> stacks) { | ||
| return dockerDeployments.stream() | ||
| .filter(deployment -> stacks.contains(deployment.getOptionalStack())) | ||
| .filter(deployment -> deployment.getOptionalStack() == null || stacks.contains(deployment.getOptionalStack())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is failing some tests in AdminClientPropertiesStoreIT, because the getImagesToUploadOnUpdate method assumes that the only images to be uploaded are for optional stacks.
The intended behaviour of getImagesToUploadOnUpdate is to upload images for optional stacks that weren't in the instance before.
We could handle optional stack images separately from core images, or potentially treat the new state store platform similarly to optional stacks, so that the image will only be uploaded when you choose that platform. In either case this needs unit test coverage, ideally in UploadDockerImagesToEcrTest.
| } | ||
|
|
||
| public List<RunTaskResponse> run(String instanceId, int numWritersPerTable, int numIngestsPerWriter, long recordsPerIngest) { | ||
| SystemTestProperties systemTestProperties = SystemTestProperties.loadFromS3GivenInstanceId(this.s3Client, instanceId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are unnecessary uses of "this" in this file. Please see our coding conventions:
| String instanceId = args[0]; | ||
| int numWritersPerTable = args.length > 1 ? Integer.parseInt(args[1]) : 1; | ||
| int numIngestsPerWriter = args.length > 2 ? Integer.parseInt(args[2]) : 1; | ||
| long recordsPerIngest = args.length > 3 ? Long.parseLong(args[3]) : 10_000_000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems odd that the default values aren't taken from SystemTestProperties like in RunWriteRandomDataTaskOnECS.
|
|
||
| public List<String> createTables(String instanceId, int tableCount, Path tablePropertiesFile, Path schemaFile, String splitPointsFile) throws IOException { | ||
| InstanceProperties instanceProperties = S3InstanceProperties.loadGivenInstanceId(this.s3Client, instanceId); | ||
| String tablePrefix = "table-" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyMMdd-HHmm")) + '-'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be good to use Instant rather than LocalDateTime, to keep the times consistent with elsewhere in the system. Instant uses UTC, whereas I think LocalDateTime will be in the local time zone, although without time zone information in the object.
| } | ||
|
|
||
| public void takeAllTablesOffline(String instanceId) { | ||
| TakeAllTablesOffline offliner = new TakeAllTablesOffline(this.s3Client, this.dynamoClient); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are unnecessary uses of "this" in this file. Please see our coding conventions:
|
|
||
| int numWritersPerTable = args.length > 5 ? Integer.parseInt(args[5]) : 1; | ||
| int numIngestsPerWriter = args.length > 6 ? Integer.parseInt(args[6]) : 1; | ||
| long recordsPerIngest = args.length > 7 ? Long.parseLong(args[7]) : 10_000_000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems odd that the default values aren't taken from SystemTestProperties like in RunWriteRandomDataTaskOnECS.
| } | ||
| } | ||
|
|
||
| private void ensureEnoughHeapSpaceAvailable(Set<String> requiredTableIds) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like it could be moved into StateStoreProvider? The processedTableOrder field seems to duplicate the field StateStoreProvider.tableIds.
Make sure you have checked all steps below.
Issue
Feature". Note that before an issue is finished, you can still make a pull request by raising a separate issue
for your progress.
Tests
Documentation
separate issue for that below.