Skip to content

Commit b83d2eb

Browse files
author
Jacob Marble
committed
feat(backup): influx backup creates data backup
1 parent 9151384 commit b83d2eb

File tree

20 files changed

+579
-23
lines changed

20 files changed

+579
-23
lines changed

authorizer/authorize.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,23 @@ func IsAllowed(ctx context.Context, p influxdb.Permission) error {
2525

2626
return nil
2727
}
28+
29+
// IsAllowedAll checks to see if an action is authorized by ALL permissions.
30+
// Also see IsAllowed.
31+
func IsAllowedAll(ctx context.Context, permissions []influxdb.Permission) error {
32+
a, err := influxdbcontext.GetAuthorizer(ctx)
33+
if err != nil {
34+
return err
35+
}
36+
37+
for _, p := range permissions {
38+
if !a.Allowed(p) {
39+
return &influxdb.Error{
40+
Code: influxdb.EUnauthorized,
41+
Msg: fmt.Sprintf("%s is unauthorized", p),
42+
}
43+
}
44+
}
45+
46+
return nil
47+
}

authorizer/backup.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package authorizer
2+
3+
import (
4+
"context"
5+
"io"
6+
7+
"github.com/influxdata/influxdb/kit/tracing"
8+
9+
"github.com/influxdata/influxdb"
10+
)
11+
12+
var _ influxdb.BackupService = (*BackupService)(nil)
13+
14+
// BackupService wraps a influxdb.BackupService and authorizes actions
15+
// against it appropriately.
16+
type BackupService struct {
17+
s influxdb.BackupService
18+
}
19+
20+
// NewBackupService constructs an instance of an authorizing backup service.
21+
func NewBackupService(s influxdb.BackupService) *BackupService {
22+
return &BackupService{
23+
s: s,
24+
}
25+
}
26+
27+
func (b BackupService) CreateBackup(ctx context.Context) (int, []string, error) {
28+
span, ctx := tracing.StartSpanFromContext(ctx)
29+
defer span.Finish()
30+
31+
if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil {
32+
return 0, nil, err
33+
}
34+
return b.s.CreateBackup(ctx)
35+
}
36+
37+
func (b BackupService) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error {
38+
span, ctx := tracing.StartSpanFromContext(ctx)
39+
defer span.Finish()
40+
41+
if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil {
42+
return nil
43+
}
44+
return b.s.FetchBackupFile(ctx, backupID, backupFile, w)
45+
}
46+
47+
func (b BackupService) InternalBackupPath(backupID int) string {
48+
return b.s.InternalBackupPath(backupID)
49+
}

authz.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,16 @@ func OperPermissions() []Permission {
335335
return ps
336336
}
337337

338+
// ReadAllPermissions represents permission to read all data and metadata.
339+
// Like OperPermissions, but allows read-only users.
340+
func ReadAllPermissions() []Permission {
341+
ps := make([]Permission, len(AllResourceTypes))
342+
for i, t := range AllResourceTypes {
343+
ps[i] = Permission{Action: ReadAction, Resource: Resource{Type: t}}
344+
}
345+
return ps
346+
}
347+
338348
// OwnerPermissions are the default permissions for those who own a resource.
339349
func OwnerPermissions(orgID ID) []Permission {
340350
ps := []Permission{}

backup.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package influxdb
2+
3+
import (
4+
"context"
5+
"io"
6+
)
7+
8+
type BackupService interface {
9+
CreateBackup(context.Context) (int, []string, error)
10+
FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error
11+
InternalBackupPath(backupID int) string
12+
}
13+
14+
type KVBackupService interface {
15+
Backup(ctx context.Context, w io.Writer) error
16+
}

bolt/kv.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@ package bolt
33
import (
44
"context"
55
"fmt"
6+
"io"
67
"os"
78
"path/filepath"
89
"time"
910

1011
bolt "github.com/coreos/bbolt"
12+
"go.uber.org/zap"
13+
1114
"github.com/influxdata/influxdb/kit/tracing"
1215
"github.com/influxdata/influxdb/kv"
13-
"go.uber.org/zap"
1416
)
1517

1618
// KVStore is a kv.Store backed by boltdb.
@@ -125,6 +127,17 @@ func (s *KVStore) Update(ctx context.Context, fn func(tx kv.Tx) error) error {
125127
})
126128
}
127129

130+
// Backup copies all K:Vs to a writer, in BoltDB format.
131+
func (s *KVStore) Backup(ctx context.Context, w io.Writer) error {
132+
span, ctx := tracing.StartSpanFromContext(ctx)
133+
defer span.Finish()
134+
135+
return s.db.View(func(tx *bolt.Tx) error {
136+
_, err := tx.WriteTo(w)
137+
return err
138+
})
139+
}
140+
128141
// Tx is a light wrapper around a boltdb transaction. It implements kv.Tx.
129142
type Tx struct {
130143
tx *bolt.Tx

cmd/influx/backup.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/influxdata/influxdb"
7+
"github.com/influxdata/influxdb/http"
8+
"github.com/spf13/cobra"
9+
"github.com/spf13/viper"
10+
"go.uber.org/multierr"
11+
"os"
12+
"path/filepath"
13+
)
14+
15+
var backupCmd = &cobra.Command{
16+
Use: "backup",
17+
Short: "Backup the data in InfluxDB",
18+
Long: `Backs up data and meta data for the InfluxDB instance. OSS only.`,
19+
RunE: backupF,
20+
}
21+
22+
var backupFlags struct {
23+
Path string
24+
}
25+
26+
func init() {
27+
backupCmd.PersistentFlags().StringVarP(&backupFlags.Path, "path", "p", "", "directory path to write backup files to")
28+
err := viper.BindEnv("PATH")
29+
if err != nil {
30+
panic(err)
31+
}
32+
if h := viper.GetString("PATH"); h != "" {
33+
backupFlags.Path = h
34+
}
35+
}
36+
37+
func newBackupService(flags Flags) (influxdb.BackupService, error) {
38+
return &http.BackupService{
39+
Addr: flags.host,
40+
Token: flags.token,
41+
}, nil
42+
}
43+
44+
func backupF(cmd *cobra.Command, args []string) error {
45+
ctx := context.Background()
46+
47+
if flags.local {
48+
return fmt.Errorf("local flag not supported for backup command")
49+
}
50+
51+
if backupFlags.Path == "" {
52+
return fmt.Errorf("must specify path")
53+
}
54+
55+
err := os.Mkdir(backupFlags.Path, 0770)
56+
if err != nil && !os.IsExist(err) {
57+
return err
58+
}
59+
60+
backupService, err := newBackupService(flags)
61+
if err != nil {
62+
return err
63+
}
64+
65+
id, backupFilenames, err := backupService.CreateBackup(ctx)
66+
if err != nil {
67+
return err
68+
}
69+
70+
fmt.Printf("Backup ID %d contains %d files\n", id, len(backupFilenames))
71+
72+
for _, backupFilename := range backupFilenames {
73+
dest := filepath.Join(backupFlags.Path, backupFilename)
74+
w, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
75+
if err != nil {
76+
return err
77+
}
78+
err = backupService.FetchBackupFile(ctx, id, backupFilename, w)
79+
if err != nil {
80+
return multierr.Append(err, w.Close())
81+
}
82+
if err = w.Close(); err != nil {
83+
return err
84+
}
85+
}
86+
87+
fmt.Printf("Backup complete")
88+
89+
return nil
90+
}

cmd/influx/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const maxTCPConnections = 128
3333
func init() {
3434
influxCmd.AddCommand(
3535
authorizationCmd,
36+
backupCmd,
3637
bucketCmd,
3738
deleteCmd,
3839
organizationCmd,

cmd/influxd/launcher/launcher.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
578578

579579
var deleteService platform.DeleteService
580580
var pointsWriter storage.PointsWriter
581+
var backupService platform.BackupService
581582
{
582583
m.engine = storage.NewEngine(m.enginePath, m.StorageConfig, storage.WithRetentionEnforcer(bucketSvc))
583584
m.engine.WithLogger(m.logger)
@@ -591,6 +592,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
591592

592593
pointsWriter = m.engine
593594
deleteService = m.engine
595+
backupService = m.engine
594596

595597
// TODO(cwolff): Figure out a good default per-query memory limit:
596598
// https://github.com/influxdata/influxdb/issues/13642
@@ -801,6 +803,8 @@ func (m *Launcher) run(ctx context.Context) (err error) {
801803
NewQueryService: source.NewQueryService,
802804
PointsWriter: pointsWriter,
803805
DeleteService: deleteService,
806+
BackupService: backupService,
807+
KVBackupService: m.kvService,
804808
AuthorizationService: authSvc,
805809
// Wrap the BucketService in a storage backed one that will ensure deleted buckets are removed from the storage engine.
806810
BucketService: storage.NewBucketService(bucketSvc, m.engine),

http/api_handler.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type APIHandler struct {
2121
influxdb.HTTPErrorHandler
2222
AssetHandler *AssetHandler
2323
AuthorizationHandler *AuthorizationHandler
24+
BackupHandler *BackupHandler
2425
BucketHandler *BucketHandler
2526
CheckHandler *CheckHandler
2627
ChronografHandler *ChronografHandler
@@ -62,6 +63,8 @@ type APIBackend struct {
6263

6364
PointsWriter storage.PointsWriter
6465
DeleteService influxdb.DeleteService
66+
BackupService influxdb.BackupService
67+
KVBackupService influxdb.KVBackupService
6568
AuthorizationService influxdb.AuthorizationService
6669
BucketService influxdb.BucketService
6770
SessionService influxdb.SessionService
@@ -213,6 +216,10 @@ func NewAPIHandler(b *APIBackend, opts ...APIHandlerOptFn) *APIHandler {
213216
deleteBackend := NewDeleteBackend(b)
214217
h.DeleteHandler = NewDeleteHandler(deleteBackend)
215218

219+
backupBackend := NewBackupBackend(b)
220+
backupBackend.BackupService = authorizer.NewBackupService(backupBackend.BackupService)
221+
h.BackupHandler = NewBackupHandler(backupBackend)
222+
216223
fluxBackend := NewFluxBackend(b)
217224
h.QueryHandler = NewFluxHandler(fluxBackend)
218225

@@ -227,40 +234,41 @@ var apiLinks = map[string]interface{}{
227234
// when adding new links, please take care to keep this list alphabetical
228235
// as this makes it easier to verify values against the swagger document.
229236
"authorizations": "/api/v2/authorizations",
237+
"backup": "/api/v2/backup",
230238
"buckets": "/api/v2/buckets",
239+
"checks": "/api/v2/checks",
231240
"dashboards": "/api/v2/dashboards",
241+
"delete": "/api/v2/delete",
232242
"external": map[string]string{
233243
"statusFeed": "https://www.influxdata.com/feed/json",
234244
},
235245
"labels": "/api/v2/labels",
236-
"variables": "/api/v2/variables",
237246
"me": "/api/v2/me",
238-
"notificationRules": "/api/v2/notificationRules",
239247
"notificationEndpoints": "/api/v2/notificationEndpoints",
248+
"notificationRules": "/api/v2/notificationRules",
240249
"orgs": "/api/v2/orgs",
241250
"query": map[string]string{
242251
"self": "/api/v2/query",
243252
"ast": "/api/v2/query/ast",
244253
"analyze": "/api/v2/query/analyze",
245254
"suggestions": "/api/v2/query/suggestions",
246255
},
256+
"scrapers": "/api/v2/scrapers",
247257
"setup": "/api/v2/setup",
248258
"signin": "/api/v2/signin",
249259
"signout": "/api/v2/signout",
250260
"sources": "/api/v2/sources",
251-
"scrapers": "/api/v2/scrapers",
252261
"swagger": "/api/v2/swagger.json",
253262
"system": map[string]string{
254263
"metrics": "/metrics",
255264
"debug": "/debug/pprof",
256265
"health": "/health",
257266
},
258267
"tasks": "/api/v2/tasks",
259-
"checks": "/api/v2/checks",
260268
"telegrafs": "/api/v2/telegrafs",
261269
"users": "/api/v2/users",
270+
"variables": "/api/v2/variables",
262271
"write": "/api/v2/write",
263-
"delete": "/api/v2/delete",
264272
}
265273

266274
func (h *APIHandler) serveLinks(w http.ResponseWriter, r *http.Request) {
@@ -303,6 +311,11 @@ func (h *APIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
303311
return
304312
}
305313

314+
if strings.HasPrefix(r.URL.Path, "/api/v2/backup") {
315+
h.BackupHandler.ServeHTTP(w, r)
316+
return
317+
}
318+
306319
if strings.HasPrefix(r.URL.Path, "/api/v2/query") {
307320
h.QueryHandler.ServeHTTP(w, r)
308321
return

0 commit comments

Comments
 (0)