99 "errors"
1010 "fmt"
1111 "log"
12+ "strings"
1213 "sync"
1314 "time"
1415
@@ -71,7 +72,9 @@ func (c *RemoteClient) Get() (*remote.Payload, error) {
7172 c .mu .Lock ()
7273 defer c .mu .Unlock ()
7374
74- pair , _ , err := c .Client .KV ().Get (c .Path , nil )
75+ kv := c .Client .KV ()
76+
77+ chunked , hash , chunks , pair , err := c .chunkedMode ()
7578 if err != nil {
7679 return nil , err
7780 }
@@ -81,27 +84,102 @@ func (c *RemoteClient) Get() (*remote.Payload, error) {
8184
8285 c .modifyIndex = pair .ModifyIndex
8386
84- payload := pair .Value
87+ var payload []byte
88+ if chunked {
89+ for _ , c := range chunks {
90+ pair , _ , err := kv .Get (c , nil )
91+ if err != nil {
92+ return nil , err
93+ }
94+ if pair == nil {
95+ return nil , fmt .Errorf ("Key %q could not be found" , c )
96+ }
97+ payload = append (payload , pair .Value [:]... )
98+ }
99+ } else {
100+ payload = pair .Value
101+ }
102+
85103 // If the payload starts with 0x1f, it's gzip, not json
86- if len (pair .Value ) >= 1 && pair .Value [0 ] == '\x1f' {
87- if data , err := uncompressState (pair .Value ); err == nil {
88- payload = data
89- } else {
104+ if len (payload ) >= 1 && payload [0 ] == '\x1f' {
105+ payload , err = uncompressState (payload )
106+ if err != nil {
90107 return nil , err
91108 }
92109 }
93110
94- md5 := md5 .Sum (pair .Value )
111+ md5 := md5 .Sum (payload )
112+
113+ if hash != "" && fmt .Sprintf ("%x" , md5 ) != hash {
114+ return nil , fmt .Errorf ("The remote state does not match the expected hash" )
115+ }
116+
95117 return & remote.Payload {
96118 Data : payload ,
97119 MD5 : md5 [:],
98120 }, nil
99121}
100122
101123func (c * RemoteClient ) Put (data []byte ) error {
124+ // The state can be stored in 4 different ways, based on the payload size
125+ // and whether the user enabled gzip:
126+ // - single entry mode with plain JSON: a single JSON is stored at
127+ // "tfstate/my_project"
128+ // - single entry mode gzip: the JSON payload is first gziped and stored at
129+ // "tfstate/my_project"
130+ // - chunked mode with plain JSON: the JSON payload is split in pieces and
131+ // stored like so:
132+ // - "tfstate/my_project" -> a JSON payload that contains the path of
133+ // the chunks and an MD5 sum like so:
134+ // {
135+ // "current-hash": "abcdef1234",
136+ // "chunks": [
137+ // "tfstate/my_project/tfstate.abcdef1234/0",
138+ // "tfstate/my_project/tfstate.abcdef1234/1",
139+ // "tfstate/my_project/tfstate.abcdef1234/2",
140+ // ]
141+ // }
142+ // - "tfstate/my_project/tfstate.abcdef1234/0" -> The first chunk
143+ // - "tfstate/my_project/tfstate.abcdef1234/1" -> The next one
144+ // - ...
145+ // - chunked mode with gzip: the same system but we gziped the JSON payload
146+ // before splitting it in chunks
147+ //
148+ // When overwritting the current state, we need to clean the old chunks if
149+ // we were in chunked mode (no matter whether we need to use chunks for the
150+ // new one). To do so based on the 4 possibilities above we look at the
151+ // value at "tfstate/my_project" and if it is:
152+ // - absent then it's a new state and there will be nothing to cleanup,
153+ // - not a JSON payload we were in single entry mode with gzip so there will
154+ // be nothing to cleanup
155+ // - a JSON payload, then we were either single entry mode with plain JSON
156+ // or in chunked mode. To differentiate between the two we look whether a
157+ // "current-hash" key is present in the payload. If we find one we were
158+ // in chunked mode and we will need to remove the old chunks (whether or
159+ // not we were using gzip does not matter in that case).
160+
102161 c .mu .Lock ()
103162 defer c .mu .Unlock ()
104163
164+ kv := c .Client .KV ()
165+
166+ // First we determine what mode we were using and to prepare the cleanup
167+ chunked , hash , _ , _ , err := c .chunkedMode ()
168+ if err != nil {
169+ return err
170+ }
171+ cleanupOldChunks := func () {}
172+ if chunked {
173+ cleanupOldChunks = func () {
174+ // We ignore all errors that can happen here because we already
175+ // saved the new state and there is no way to return a warning to
176+ // the user. We may end up with dangling chunks but there is no way
177+ // to be sure we won't.
178+ path := strings .TrimRight (c .Path , "/" ) + fmt .Sprintf ("/tfstate.%s/" , hash )
179+ kv .DeleteTree (path , nil )
180+ }
181+ }
182+
105183 payload := data
106184 if c .GZip {
107185 if compressedState , err := compressState (data ); err == nil {
@@ -111,8 +189,6 @@ func (c *RemoteClient) Put(data []byte) error {
111189 }
112190 }
113191
114- kv := c .Client .KV ()
115-
116192 // default to doing a CAS
117193 verb := consulapi .KVCAS
118194
@@ -122,9 +198,44 @@ func (c *RemoteClient) Put(data []byte) error {
122198 verb = consulapi .KVSet
123199 }
124200
201+ // If the payload is too large we first write the chunks and replace it
202+ // 524288 is the default value, we just hope the user did not set a smaller
203+ // one but there is really no reason for them to do so, if they changed it
204+ // it is certainly to set a larger value.
205+ limit := 524288
206+ if len (payload ) > limit {
207+ md5 := md5 .Sum (data )
208+ chunks := split (payload , limit )
209+ chunkPaths := make ([]string , 0 )
210+
211+ // First we write the new chunks
212+ for i , p := range chunks {
213+ path := strings .TrimRight (c .Path , "/" ) + fmt .Sprintf ("/tfstate.%x/%d" , md5 , i )
214+ chunkPaths = append (chunkPaths , path )
215+ _ , err := kv .Put (& consulapi.KVPair {
216+ Key : path ,
217+ Value : p ,
218+ }, nil )
219+
220+ if err != nil {
221+ return err
222+ }
223+ }
224+
225+ // We update the link to point to the new chunks
226+ payload , err = json .Marshal (map [string ]interface {}{
227+ "current-hash" : fmt .Sprintf ("%x" , md5 ),
228+ "chunks" : chunkPaths ,
229+ })
230+ if err != nil {
231+ return err
232+ }
233+ }
234+
235+ var txOps consulapi.KVTxnOps
125236 // KV.Put doesn't return the new index, so we use a single operation
126237 // transaction to get the new index with a single request.
127- txOps : = consulapi.KVTxnOps {
238+ txOps = consulapi.KVTxnOps {
128239 & consulapi.KVTxnOp {
129240 Verb : verb ,
130241 Key : c .Path ,
@@ -137,7 +248,6 @@ func (c *RemoteClient) Put(data []byte) error {
137248 if err != nil {
138249 return err
139250 }
140-
141251 // transaction was rolled back
142252 if ! ok {
143253 return fmt .Errorf ("consul CAS failed with transaction errors: %v" , resp .Errors )
@@ -149,6 +259,10 @@ func (c *RemoteClient) Put(data []byte) error {
149259 }
150260
151261 c .modifyIndex = resp .Results [0 ].ModifyIndex
262+
263+ // We remove all the old chunks
264+ cleanupOldChunks ()
265+
152266 return nil
153267}
154268
@@ -157,7 +271,20 @@ func (c *RemoteClient) Delete() error {
157271 defer c .mu .Unlock ()
158272
159273 kv := c .Client .KV ()
160- _ , err := kv .Delete (c .Path , nil )
274+
275+ chunked , hash , _ , _ , err := c .chunkedMode ()
276+ if err != nil {
277+ return err
278+ }
279+
280+ _ , err = kv .Delete (c .Path , nil )
281+
282+ // If there were chunks we need to remove them
283+ if chunked {
284+ path := strings .TrimRight (c .Path , "/" ) + fmt .Sprintf ("/tfstate.%s/" , hash )
285+ kv .DeleteTree (path , nil )
286+ }
287+
161288 return err
162289}
163290
@@ -466,3 +593,42 @@ func uncompressState(data []byte) ([]byte, error) {
466593 }
467594 return b .Bytes (), nil
468595}
596+
597+ func split (payload []byte , limit int ) [][]byte {
598+ var chunk []byte
599+ chunks := make ([][]byte , 0 , len (payload )/ limit + 1 )
600+ for len (payload ) >= limit {
601+ chunk , payload = payload [:limit ], payload [limit :]
602+ chunks = append (chunks , chunk )
603+ }
604+ if len (payload ) > 0 {
605+ chunks = append (chunks , payload [:])
606+ }
607+ return chunks
608+ }
609+
610+ func (c * RemoteClient ) chunkedMode () (bool , string , []string , * consulapi.KVPair , error ) {
611+ kv := c .Client .KV ()
612+ pair , _ , err := kv .Get (c .Path , nil )
613+ if err != nil {
614+ return false , "" , nil , pair , err
615+ }
616+ if pair != nil {
617+ var d map [string ]interface {}
618+ err = json .Unmarshal (pair .Value , & d )
619+ // If there is an error when unmarshaling the payload, the state has
620+ // probably been gziped in single entry mode.
621+ if err == nil {
622+ // If we find the "current-hash" key we were in chunked mode
623+ hash , ok := d ["current-hash" ]
624+ if ok {
625+ chunks := make ([]string , 0 )
626+ for _ , c := range d ["chunks" ].([]interface {}) {
627+ chunks = append (chunks , c .(string ))
628+ }
629+ return true , hash .(string ), chunks , pair , nil
630+ }
631+ }
632+ }
633+ return false , "" , nil , pair , nil
634+ }
0 commit comments