@@ -19,8 +19,9 @@ import (
19
19
)
20
20
21
21
const (
22
- readLabel = "read"
23
- writeLabel = "write"
22
+ readLabel = "read"
23
+ writeLabel = "write"
24
+ minWriteCapacity = 1
24
25
)
25
26
26
27
var (
@@ -119,6 +120,12 @@ func (m *DynamoTableManager) loop() {
119
120
ticker := time .NewTicker (m .cfg .DynamoDBPollInterval )
120
121
defer ticker .Stop ()
121
122
123
+ if err := instrument .TimeRequestHistogram (context .Background (), "DynamoTableManager.syncTables" , syncTableDuration , func (ctx context.Context ) error {
124
+ return m .syncTables (ctx )
125
+ }); err != nil {
126
+ log .Errorf ("Error syncing tables: %v" , err )
127
+ }
128
+
122
129
for {
123
130
select {
124
131
case <- ticker .C :
@@ -161,43 +168,55 @@ func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
161
168
func (a byName ) Less (i , j int ) bool { return a [i ].name < a [j ].name }
162
169
163
170
func (m * DynamoTableManager ) calculateExpectedTables (_ context.Context ) []tableDescription {
171
+ if ! m .cfg .UsePeriodicTables {
172
+ return []tableDescription {
173
+ {
174
+ name : m .tableName ,
175
+ provisionedRead : m .cfg .ProvisionedReadThroughput ,
176
+ provisionedWrite : m .cfg .ProvisionedWriteThroughput ,
177
+ },
178
+ }
179
+ }
180
+
164
181
result := []tableDescription {}
165
182
183
+ var (
184
+ tablePeriodSecs = int64 (m .cfg .TablePeriod / time .Second )
185
+ gracePeriodSecs = int64 (m .cfg .CreationGracePeriod / time .Second )
186
+ maxChunkAgeSecs = int64 (m .cfg .MaxChunkAge / time .Second )
187
+ firstTable = m .cfg .PeriodicTableStartAt .Unix () / tablePeriodSecs
188
+ lastTable = (mtime .Now ().Unix () + gracePeriodSecs ) / tablePeriodSecs
189
+ now = mtime .Now ().Unix ()
190
+ )
191
+
166
192
// Add the legacy table
167
193
{
168
194
legacyTable := tableDescription {
169
- name : m .tableName ,
170
- provisionedRead : m .cfg .ProvisionedReadThroughput ,
195
+ name : m .tableName ,
196
+ provisionedRead : m .cfg .ProvisionedReadThroughput ,
197
+ provisionedWrite : minWriteCapacity ,
171
198
}
172
199
173
200
// if we are before the switch to periodic table, we need to give this table write throughput
174
- if ! m . cfg . UsePeriodicTables || mtime . Now (). Before ( m . cfg . PeriodicTableStartAt . Add ( m . cfg . CreationGracePeriod ). Add ( m . cfg . MaxChunkAge )) {
201
+ if now < ( firstTable * tablePeriodSecs ) + gracePeriodSecs + maxChunkAgeSecs {
175
202
legacyTable .provisionedWrite = m .cfg .ProvisionedWriteThroughput
176
203
}
177
204
result = append (result , legacyTable )
178
205
}
179
206
180
- if m .cfg .UsePeriodicTables {
181
- tablePeriodSecs := int64 (m .cfg .TablePeriod / time .Second )
182
- gracePeriodSecs := int64 (m .cfg .CreationGracePeriod / time .Second )
183
- maxChunkAgeSecs := int64 (m .cfg .MaxChunkAge / time .Second )
184
- firstTable := m .cfg .PeriodicTableStartAt .Unix () / tablePeriodSecs
185
- lastTable := (mtime .Now ().Unix () + gracePeriodSecs ) / tablePeriodSecs
186
-
187
- for i := firstTable ; i <= lastTable ; i ++ {
188
- table := tableDescription {
189
- // Name construction needs to be consistent with chunk_store.bigBuckets
190
- name : m .cfg .TablePrefix + strconv .Itoa (int (i )),
191
- provisionedRead : m .cfg .ProvisionedReadThroughput ,
192
- }
207
+ for i := firstTable ; i <= lastTable ; i ++ {
208
+ table := tableDescription {
209
+ // Name construction needs to be consistent with chunk_store.bigBuckets
210
+ name : m .cfg .TablePrefix + strconv .Itoa (int (i )),
211
+ provisionedRead : m .cfg .ProvisionedReadThroughput ,
212
+ provisionedWrite : minWriteCapacity ,
213
+ }
193
214
194
- // if now is within table [start - grace, end + grace), then we need some write throughput
195
- now := mtime .Now ().Unix ()
196
- if (i * tablePeriodSecs )- gracePeriodSecs <= now && now < (i * tablePeriodSecs )+ tablePeriodSecs + gracePeriodSecs + maxChunkAgeSecs {
197
- table .provisionedWrite = m .cfg .ProvisionedWriteThroughput
198
- }
199
- result = append (result , table )
215
+ // if now is within table [start - grace, end + grace), then we need some write throughput
216
+ if (i * tablePeriodSecs )- gracePeriodSecs <= now && now < (i * tablePeriodSecs )+ tablePeriodSecs + gracePeriodSecs + maxChunkAgeSecs {
217
+ table .provisionedWrite = m .cfg .ProvisionedWriteThroughput
200
218
}
219
+ result = append (result , table )
201
220
}
202
221
203
222
sort .Sort (byName (result ))
@@ -306,10 +325,11 @@ func (m *DynamoTableManager) updateTables(ctx context.Context, descriptions []ta
306
325
tableCapacity .WithLabelValues (writeLabel , desc .name ).Set (float64 (* out .Table .ProvisionedThroughput .WriteCapacityUnits ))
307
326
308
327
if * out .Table .ProvisionedThroughput .ReadCapacityUnits == desc .provisionedRead && * out .Table .ProvisionedThroughput .WriteCapacityUnits == desc .provisionedWrite {
328
+ log .Infof (" Provisioned throughput: read = %d, write = %d, skipping." , * out .Table .ProvisionedThroughput .ReadCapacityUnits , * out .Table .ProvisionedThroughput .WriteCapacityUnits )
309
329
continue
310
330
}
311
331
312
- log .Infof ("Updating provisioned throughput on table %s" , desc .name )
332
+ log .Infof (" Updating provisioned throughput on table %s to read = %d, write = %d " , desc .name , desc . provisionedRead , desc . provisionedWrite )
313
333
if err := instrument .TimeRequestHistogram (ctx , "DynamoDB.DescribeTable" , dynamoRequestDuration , func (_ context.Context ) error {
314
334
_ , err := m .dynamodb .UpdateTable (& dynamodb.UpdateTableInput {
315
335
TableName : aws .String (desc .name ),
0 commit comments