Skip to content

Commit be29b32

Browse files
committed
init
1 parent 0ced605 commit be29b32

File tree

384 files changed

+58883
-15214
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

384 files changed

+58883
-15214
lines changed
Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
namespace FlinkDotNet.DataStream.Checkpoint
18+
{
19+
/// <summary>
/// Mutable, fluent holder for checkpointing settings.
/// This corresponds to org.apache.flink.streaming.api.environment.CheckpointConfig in Apache Flink.
///
/// The settings stored here are:
/// - The checkpoint storage and the path it reports
/// - The checkpoint timeout and the tolerable number of checkpoint failures
/// - The maximum number of concurrent checkpoints
/// - The minimum pause between checkpoints
/// - Whether externalized checkpoints are enabled, and their cleanup behavior
///
/// Every setter validates its argument before changing any field, so a rejected
/// call leaves the configuration exactly as it was, and every setter returns this
/// instance so that calls can be chained.
/// </summary>
public class CheckpointConfig
{
    // Default checkpoint timeout: 10 minutes, expressed in milliseconds.
    private const long DefaultCheckpointTimeoutMs = 600000;

    private ICheckpointStorage? _checkpointStorage;
    private string? _checkpointStoragePath;
    private long _checkpointTimeout = DefaultCheckpointTimeoutMs;
    private long _minPauseBetweenCheckpoints = 0;
    private int _maxConcurrentCheckpoints = 1;
    private int _tolerableCheckpointFailureNumber = int.MaxValue;
    private bool _externalizedCheckpointsEnabled = false;
    private ExternalizedCheckpointCleanup _externalizedCheckpointCleanup = ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION;

    /// <summary>
    /// Creates a new CheckpointConfig with default settings: no storage configured,
    /// a 10 minute timeout, no minimum pause, one concurrent checkpoint,
    /// int.MaxValue tolerable failures and externalized checkpoints disabled.
    /// </summary>
    public CheckpointConfig()
    {
    }

    /// <summary>
    /// Sets the checkpoint storage to a file system path.
    /// This is a convenience method that creates a FileSystemCheckpointStorage internally.
    /// </summary>
    /// <param name="path">The checkpoint storage path (local, HDFS, S3, etc.)</param>
    /// <returns>This CheckpointConfig instance for method chaining</returns>
    /// <exception cref="System.ArgumentException">
    /// Thrown when <paramref name="path"/> is null, empty or whitespace.
    /// </exception>
    public CheckpointConfig SetCheckpointStorage(string path)
    {
        if (string.IsNullOrWhiteSpace(path))
        {
            throw new System.ArgumentException("Checkpoint storage path cannot be null or empty", nameof(path));
        }

        // Build the storage before touching any field so that a failure while
        // constructing it cannot leave the path and the storage out of sync.
        ICheckpointStorage storage = new FileSystemCheckpointStorage(path);

        _checkpointStorage = storage;
        _checkpointStoragePath = path;
        return this;
    }

    /// <summary>
    /// Sets the checkpoint storage implementation.
    /// The path reported by the storage at the time of this call is captured and
    /// later returned by <see cref="GetCheckpointStoragePath"/>.
    /// </summary>
    /// <param name="storage">The checkpoint storage implementation</param>
    /// <returns>This CheckpointConfig instance for method chaining</returns>
    /// <exception cref="System.ArgumentNullException">
    /// Thrown when <paramref name="storage"/> is null.
    /// </exception>
    public CheckpointConfig SetCheckpointStorage(ICheckpointStorage storage)
    {
        if (storage is null)
        {
            throw new System.ArgumentNullException(nameof(storage));
        }

        // Query the path first: a custom implementation may throw here, and in
        // that case neither field must have been updated yet.
        string storagePath = storage.GetCheckpointPath();

        _checkpointStorage = storage;
        _checkpointStoragePath = storagePath;
        return this;
    }

    /// <summary>
    /// Gets the configured checkpoint storage implementation.
    /// </summary>
    /// <returns>The checkpoint storage, or null if not configured</returns>
    public ICheckpointStorage? GetCheckpointStorage() => _checkpointStorage;

    /// <summary>
    /// Gets the checkpoint storage path captured when the storage was set.
    /// </summary>
    /// <returns>The storage path, or null if not configured</returns>
    public string? GetCheckpointStoragePath() => _checkpointStoragePath;

    /// <summary>
    /// Sets the maximum time that a checkpoint may take before being aborted.
    /// </summary>
    /// <param name="timeoutMs">The checkpoint timeout in milliseconds; must be greater than zero</param>
    /// <returns>This CheckpointConfig instance for method chaining</returns>
    /// <exception cref="System.ArgumentException">
    /// Thrown when <paramref name="timeoutMs"/> is zero or negative.
    /// </exception>
    public CheckpointConfig SetCheckpointTimeout(long timeoutMs)
    {
        if (timeoutMs <= 0)
        {
            throw new System.ArgumentException("Checkpoint timeout must be positive", nameof(timeoutMs));
        }

        _checkpointTimeout = timeoutMs;
        return this;
    }

    /// <summary>
    /// Gets the checkpoint timeout in milliseconds.
    /// </summary>
    /// <returns>The checkpoint timeout</returns>
    public long GetCheckpointTimeout() => _checkpointTimeout;

    /// <summary>
    /// Sets the minimal pause between consecutive checkpoint attempts.
    /// This defines how soon the checkpoint coordinator may trigger another checkpoint
    /// after the last checkpoint has completed.
    /// </summary>
    /// <param name="pauseMs">The minimum pause in milliseconds; zero means no pause</param>
    /// <returns>This CheckpointConfig instance for method chaining</returns>
    /// <exception cref="System.ArgumentException">
    /// Thrown when <paramref name="pauseMs"/> is negative.
    /// </exception>
    public CheckpointConfig SetMinPauseBetweenCheckpoints(long pauseMs)
    {
        if (pauseMs < 0)
        {
            throw new System.ArgumentException("Minimum pause must be non-negative", nameof(pauseMs));
        }

        _minPauseBetweenCheckpoints = pauseMs;
        return this;
    }

    /// <summary>
    /// Gets the minimum pause between checkpoints in milliseconds.
    /// </summary>
    /// <returns>The minimum pause</returns>
    public long GetMinPauseBetweenCheckpoints() => _minPauseBetweenCheckpoints;

    /// <summary>
    /// Sets the maximum number of concurrent checkpoint attempts that may be in progress at the same time.
    /// For most setups, one concurrent checkpoint is sufficient and preferred for consistency.
    /// </summary>
    /// <param name="maxConcurrent">The maximum number of concurrent checkpoints; must be at least 1</param>
    /// <returns>This CheckpointConfig instance for method chaining</returns>
    /// <exception cref="System.ArgumentException">
    /// Thrown when <paramref name="maxConcurrent"/> is less than 1.
    /// </exception>
    public CheckpointConfig SetMaxConcurrentCheckpoints(int maxConcurrent)
    {
        if (maxConcurrent < 1)
        {
            throw new System.ArgumentException("Max concurrent checkpoints must be at least 1", nameof(maxConcurrent));
        }

        _maxConcurrentCheckpoints = maxConcurrent;
        return this;
    }

    /// <summary>
    /// Gets the maximum number of concurrent checkpoints.
    /// </summary>
    /// <returns>The maximum concurrent checkpoints</returns>
    public int GetMaxConcurrentCheckpoints() => _maxConcurrentCheckpoints;

    /// <summary>
    /// Sets the tolerable checkpoint failure number.
    /// If this value is exceeded, the job fails.
    /// </summary>
    /// <param name="tolerableFailures">The number of tolerable checkpoint failures; zero or greater</param>
    /// <returns>This CheckpointConfig instance for method chaining</returns>
    /// <exception cref="System.ArgumentException">
    /// Thrown when <paramref name="tolerableFailures"/> is negative.
    /// </exception>
    public CheckpointConfig SetTolerableCheckpointFailureNumber(int tolerableFailures)
    {
        if (tolerableFailures < 0)
        {
            throw new System.ArgumentException("Tolerable failures must be non-negative", nameof(tolerableFailures));
        }

        _tolerableCheckpointFailureNumber = tolerableFailures;
        return this;
    }

    /// <summary>
    /// Gets the tolerable checkpoint failure number.
    /// </summary>
    /// <returns>The tolerable failure count (int.MaxValue unless changed)</returns>
    public int GetTolerableCheckpointFailureNumber() => _tolerableCheckpointFailureNumber;

    /// <summary>
    /// Enables externalized checkpoints, which persist checkpoints after job termination.
    /// Externalized checkpoints can be used to recover from job failures or for manual savepoints.
    /// </summary>
    /// <param name="cleanup">The cleanup behavior for externalized checkpoints</param>
    /// <returns>This CheckpointConfig instance for method chaining</returns>
    /// <exception cref="System.ArgumentOutOfRangeException">
    /// Thrown when <paramref name="cleanup"/> is not a declared
    /// <see cref="ExternalizedCheckpointCleanup"/> member (for example a cast integer).
    /// </exception>
    public CheckpointConfig EnableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanup)
    {
        // C# enums accept arbitrary cast integers; reject those before flipping
        // the enabled flag so an invalid mode can never be stored.
        if (!System.Enum.IsDefined(typeof(ExternalizedCheckpointCleanup), cleanup))
        {
            throw new System.ArgumentOutOfRangeException(
                nameof(cleanup),
                cleanup,
                "Unknown externalized checkpoint cleanup mode");
        }

        _externalizedCheckpointsEnabled = true;
        _externalizedCheckpointCleanup = cleanup;
        return this;
    }

    /// <summary>
    /// Disables externalized checkpoints.
    /// The previously configured cleanup behavior is kept and is still returned by
    /// <see cref="GetExternalizedCheckpointCleanup"/>.
    /// </summary>
    /// <returns>This CheckpointConfig instance for method chaining</returns>
    public CheckpointConfig DisableExternalizedCheckpoints()
    {
        _externalizedCheckpointsEnabled = false;
        return this;
    }

    /// <summary>
    /// Gets whether externalized checkpoints are enabled.
    /// </summary>
    /// <returns>True if externalized checkpoints are enabled</returns>
    public bool IsExternalizedCheckpointsEnabled() => _externalizedCheckpointsEnabled;

    /// <summary>
    /// Gets the externalized checkpoint cleanup behavior.
    /// </summary>
    /// <returns>
    /// The cleanup behavior; DELETE_ON_CANCELLATION until a different one is passed to
    /// <see cref="EnableExternalizedCheckpoints"/>
    /// </returns>
    public ExternalizedCheckpointCleanup GetExternalizedCheckpointCleanup() => _externalizedCheckpointCleanup;
}
235+
236+
/// <summary>
/// Selects what happens to externalized checkpoints once the job is cancelled.
/// Corresponds to org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.
/// </summary>
public enum ExternalizedCheckpointCleanup
{
    /// <summary>
    /// Externalized checkpoints are removed when the job is cancelled,
    /// but are kept when the job fails.
    /// </summary>
    DELETE_ON_CANCELLATION = 0,

    /// <summary>
    /// Externalized checkpoints are kept both when the job is cancelled and when it fails.
    /// They have to be deleted manually.
    /// </summary>
    RETAIN_ON_CANCELLATION = 1
}
254+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
namespace FlinkDotNet.DataStream.Checkpoint
18+
{
19+
/// <summary>
/// Checkpoint storage implementation that stores checkpoints on a file system.
/// This corresponds to org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage in Apache Flink.
///
/// Example checkpoint paths for various file systems:
/// - Local file system: file:///path/to/checkpoints
/// - HDFS: hdfs://namenode:port/path/to/checkpoints
/// - Amazon S3: s3://bucket/path/to/checkpoints
/// - Azure Blob Storage: wasb://container@account/path/to/checkpoints
/// - Google Cloud Storage: gs://bucket/path/to/checkpoints
///
/// This type is an immutable holder for the path and the file size threshold:
/// the path is stored as given (only null/empty/whitespace is rejected) and its
/// scheme is not inspected here.
/// </summary>
public class FileSystemCheckpointStorage : ICheckpointStorage
{
    // Sentinel meaning "no explicit threshold chosen; use the default".
    private const int UseDefaultFileSizeThreshold = -1;

    private readonly string _checkpointPath;
    private readonly int _fileSizeThreshold;

    /// <summary>
    /// Creates a new FileSystemCheckpointStorage with the specified checkpoint path.
    /// </summary>
    /// <param name="checkpointPath">The base path where checkpoints will be stored</param>
    /// <param name="fileSizeThreshold">
    /// The file size threshold (in bytes) below which state is stored inline.
    /// Must be zero or greater, or -1 (the default) to indicate that no explicit
    /// threshold was chosen. This class only records the value; it does not resolve
    /// -1 to a concrete size itself.
    /// NOTE(review): the default that Flink applies for -1 depends on the Flink
    /// version and deployment configuration - confirm before quoting a number.
    /// NOTE(review): Apache Flink also enforces an upper bound on this threshold;
    /// that bound is not enforced here - confirm whether it should be.
    /// </param>
    /// <exception cref="System.ArgumentException">
    /// Thrown when <paramref name="checkpointPath"/> is null, empty or whitespace.
    /// </exception>
    /// <exception cref="System.ArgumentOutOfRangeException">
    /// Thrown when <paramref name="fileSizeThreshold"/> is less than -1.
    /// </exception>
    public FileSystemCheckpointStorage(string checkpointPath, int fileSizeThreshold = -1)
    {
        if (string.IsNullOrWhiteSpace(checkpointPath))
        {
            throw new System.ArgumentException(
                "Checkpoint path cannot be null or empty",
                nameof(checkpointPath));
        }

        // -1 is the only meaningful negative value (the "use default" sentinel);
        // any other negative byte count is a caller error.
        if (fileSizeThreshold < UseDefaultFileSizeThreshold)
        {
            throw new System.ArgumentOutOfRangeException(
                nameof(fileSizeThreshold),
                fileSizeThreshold,
                "File size threshold must be -1 (use default) or non-negative");
        }

        _checkpointPath = checkpointPath;
        _fileSizeThreshold = fileSizeThreshold;
    }

    /// <summary>
    /// Gets the base path where checkpoints are stored, exactly as passed to the constructor.
    /// </summary>
    /// <returns>The checkpoint storage path</returns>
    public string GetCheckpointPath()
    {
        return _checkpointPath;
    }

    /// <summary>
    /// Gets the file size threshold for inline state storage.
    /// </summary>
    /// <returns>The file size threshold in bytes, or -1 for default</returns>
    public int GetFileSizeThreshold()
    {
        return _fileSizeThreshold;
    }

    /// <summary>
    /// Gets whether this storage implementation supports high availability.
    /// File system checkpoint storage supports high availability when using
    /// distributed file systems (HDFS, S3, etc.).
    /// Note that this method returns true unconditionally: the configured path
    /// is not examined, so a local file:// path also reports true.
    /// </summary>
    /// <returns>Always true</returns>
    public bool SupportsHighAvailability()
    {
        return true;
    }
}
90+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
namespace FlinkDotNet.DataStream.Checkpoint
18+
{
19+
/// <summary>
/// Contract for a checkpoint storage, i.e. the place where completed checkpoints are persisted.
/// This corresponds to org.apache.flink.runtime.state.storage.CheckpointStorage in Apache Flink.
///
/// Checkpoint storage variants named by this API:
/// - FileSystemCheckpointStorage: keeps checkpoints on a file system (local, HDFS, S3, etc.)
/// - JobManagerCheckpointStorage: keeps small checkpoints in JobManager memory
/// </summary>
public interface ICheckpointStorage
{
    /// <summary>
    /// Reports the base location under which this storage keeps its checkpoints.
    /// </summary>
    /// <returns>The checkpoint storage path</returns>
    string GetCheckpointPath();

    /// <summary>
    /// Reports whether this storage can be used for high availability setups.
    /// High availability requires persistent storage that survives JobManager failures.
    /// </summary>
    /// <returns>True if high availability is supported</returns>
    bool SupportsHighAvailability();
}
42+
}

0 commit comments

Comments
 (0)