Skip to content

Commit 6dbfc97

Browse files
author
Jacob Marble
committed
feat(backup): influx backup creates data backup
1 parent 8846882 commit 6dbfc97

File tree

23 files changed

+609
-22
lines changed

23 files changed

+609
-22
lines changed

authorizer/authorize.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,23 @@ import (
1111
// IsAllowed checks to see if an action is authorized by retrieving the authorizer
1212
// off of context and authorizing the action appropriately.
1313
func IsAllowed(ctx context.Context, p influxdb.Permission) error {
14+
return IsAllowedAll(ctx, []influxdb.Permission{p})
15+
}
16+
17+
// IsAllowedAll checks to see if an action is authorized by ALL permissions.
18+
// Also see IsAllowed.
19+
func IsAllowedAll(ctx context.Context, permissions []influxdb.Permission) error {
1420
a, err := influxdbcontext.GetAuthorizer(ctx)
1521
if err != nil {
1622
return err
1723
}
1824

19-
if !a.Allowed(p) {
20-
return &influxdb.Error{
21-
Code: influxdb.EUnauthorized,
22-
Msg: fmt.Sprintf("%s is unauthorized", p),
25+
for _, p := range permissions {
26+
if !a.Allowed(p) {
27+
return &influxdb.Error{
28+
Code: influxdb.EUnauthorized,
29+
Msg: fmt.Sprintf("%s is unauthorized", p),
30+
}
2331
}
2432
}
2533

authorizer/backup.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package authorizer
2+
3+
import (
4+
"context"
5+
"io"
6+
7+
"github.com/influxdata/influxdb"
8+
"github.com/influxdata/influxdb/kit/tracing"
9+
)
10+
11+
var _ influxdb.BackupService = (*BackupService)(nil)
12+
13+
// BackupService wraps a influxdb.BackupService and authorizes actions
14+
// against it appropriately.
15+
type BackupService struct {
16+
s influxdb.BackupService
17+
}
18+
19+
// NewBackupService constructs an instance of an authorizing backup service.
20+
func NewBackupService(s influxdb.BackupService) *BackupService {
21+
return &BackupService{
22+
s: s,
23+
}
24+
}
25+
26+
func (b BackupService) CreateBackup(ctx context.Context) (int, []string, error) {
27+
span, ctx := tracing.StartSpanFromContext(ctx)
28+
defer span.Finish()
29+
30+
if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil {
31+
return 0, nil, err
32+
}
33+
return b.s.CreateBackup(ctx)
34+
}
35+
36+
func (b BackupService) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error {
37+
span, ctx := tracing.StartSpanFromContext(ctx)
38+
defer span.Finish()
39+
40+
if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil {
41+
return nil
42+
}
43+
return b.s.FetchBackupFile(ctx, backupID, backupFile, w)
44+
}
45+
46+
func (b BackupService) InternalBackupPath(backupID int) string {
47+
return b.s.InternalBackupPath(backupID)
48+
}

authz.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,16 @@ func OperPermissions() []Permission {
335335
return ps
336336
}
337337

338+
// ReadAllPermissions represents permission to read all data and metadata.
339+
// Like OperPermissions, but allows read-only users.
340+
func ReadAllPermissions() []Permission {
341+
ps := make([]Permission, len(AllResourceTypes))
342+
for i, t := range AllResourceTypes {
343+
ps[i] = Permission{Action: ReadAction, Resource: Resource{Type: t}}
344+
}
345+
return ps
346+
}
347+
338348
// OwnerPermissions are the default permissions for those who own a resource.
339349
func OwnerPermissions(orgID ID) []Permission {
340350
ps := []Permission{}

backup.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package influxdb
2+
3+
import (
4+
"context"
5+
"io"
6+
)
7+
8+
// BackupService represents the data backup functions of InfluxDB.
9+
type BackupService interface {
10+
// CreateBackup creates a local copy (hard links) of the TSM data for all orgs and buckets.
11+
// The return values are used to download each backup file.
12+
CreateBackup(context.Context) (backupID int, backupFiles []string, err error)
13+
// FetchBackupFile downloads one backup file, data or metadata.
14+
FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error
15+
// InternalBackupPath is a utility to determine the on-disk location of a backup fileset.
16+
InternalBackupPath(backupID int) string
17+
}
18+
19+
// KVBackupService represents the meta data backup functions of InfluxDB.
20+
type KVBackupService interface {
21+
// Backup creates a live backup copy of the metadata database.
22+
Backup(ctx context.Context, w io.Writer) error
23+
}

bolt/bbolt.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,12 @@ import (
1414
"go.uber.org/zap"
1515
)
1616

17-
// OpPrefix is the prefix for bolt ops
18-
const OpPrefix = "bolt/"
17+
const (
18+
// OpPrefix is the prefix for bolt ops
19+
OpPrefix = "bolt/"
20+
21+
DefaultFilename = "influxd.bolt"
22+
)
1923

2024
func getOp(op string) string {
2125
return OpPrefix + op

bolt/kv.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package bolt
33
import (
44
"context"
55
"fmt"
6+
"io"
67
"os"
78
"path/filepath"
89
"time"
@@ -120,6 +121,17 @@ func (s *KVStore) Update(ctx context.Context, fn func(tx kv.Tx) error) error {
120121
})
121122
}
122123

124+
// Backup copies all K:Vs to a writer, in BoltDB format.
125+
func (s *KVStore) Backup(ctx context.Context, w io.Writer) error {
126+
span, _ := tracing.StartSpanFromContext(ctx)
127+
defer span.Finish()
128+
129+
return s.db.View(func(tx *bolt.Tx) error {
130+
_, err := tx.WriteTo(w)
131+
return err
132+
})
133+
}
134+
123135
// Tx is a light wrapper around a boltdb transaction. It implements kv.Tx.
124136
type Tx struct {
125137
tx *bolt.Tx

cmd/influx/backup.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"path/filepath"
8+
9+
"github.com/influxdata/influxdb"
10+
"github.com/influxdata/influxdb/bolt"
11+
"github.com/influxdata/influxdb/http"
12+
"github.com/spf13/cobra"
13+
"github.com/spf13/viper"
14+
"go.uber.org/multierr"
15+
)
16+
17+
var backupCmd = &cobra.Command{
18+
Use: "backup",
19+
Short: "Backup the data in InfluxDB",
20+
Long: fmt.Sprintf(
21+
`Backs up data and meta data for the running InfluxDB instance.
22+
Downloaded files are written to the directory indicated by --path.
23+
The target directory, and any parent directories, are created automatically.
24+
Data file have extension .tsm; meta data is written to %s in the same directory.`,
25+
bolt.DefaultFilename),
26+
RunE: backupF,
27+
}
28+
29+
var backupFlags struct {
30+
Path string
31+
}
32+
33+
func init() {
34+
backupCmd.PersistentFlags().StringVarP(&backupFlags.Path, "path", "p", "", "directory path to write backup files to")
35+
err := viper.BindEnv("PATH")
36+
if err != nil {
37+
panic(err)
38+
}
39+
if h := viper.GetString("PATH"); h != "" {
40+
backupFlags.Path = h
41+
}
42+
}
43+
44+
func newBackupService(flags Flags) (influxdb.BackupService, error) {
45+
return &http.BackupService{
46+
Addr: flags.host,
47+
Token: flags.token,
48+
}, nil
49+
}
50+
51+
func backupF(cmd *cobra.Command, args []string) error {
52+
ctx := context.Background()
53+
54+
if flags.local {
55+
return fmt.Errorf("local flag not supported for backup command")
56+
}
57+
58+
if backupFlags.Path == "" {
59+
return fmt.Errorf("must specify path")
60+
}
61+
62+
err := os.MkdirAll(backupFlags.Path, 0770)
63+
if err != nil && !os.IsExist(err) {
64+
return err
65+
}
66+
67+
backupService, err := newBackupService(flags)
68+
if err != nil {
69+
return err
70+
}
71+
72+
id, backupFilenames, err := backupService.CreateBackup(ctx)
73+
if err != nil {
74+
return err
75+
}
76+
77+
fmt.Printf("Backup ID %d contains %d files\n", id, len(backupFilenames))
78+
79+
for _, backupFilename := range backupFilenames {
80+
dest := filepath.Join(backupFlags.Path, backupFilename)
81+
w, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
82+
if err != nil {
83+
return err
84+
}
85+
err = backupService.FetchBackupFile(ctx, id, backupFilename, w)
86+
if err != nil {
87+
return multierr.Append(err, w.Close())
88+
}
89+
if err = w.Close(); err != nil {
90+
return err
91+
}
92+
}
93+
94+
fmt.Printf("Backup complete")
95+
96+
return nil
97+
}

cmd/influx/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ func influxCmd() *cobra.Command {
8585
}
8686
cmd.AddCommand(
8787
authCmd(),
88+
backupCmd,
8889
bucketCmd,
8990
deleteCmd,
9091
organizationCmd(),

cmd/influxd/launcher/engine.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package launcher
22

33
import (
44
"context"
5+
"io"
56
"io/ioutil"
67
"os"
78
"sync"
@@ -29,6 +30,7 @@ type Engine interface {
2930
storage.PointsWriter
3031
storage.BucketDeleter
3132
prom.PrometheusCollector
33+
influxdb.BackupService
3234

3335
SeriesCardinality() int64
3436

@@ -165,3 +167,15 @@ func (t *TemporaryEngine) Flush(ctx context.Context) {
165167
t.log.Fatal("unable to open engine", zap.Error(err))
166168
}
167169
}
170+
171+
func (t *TemporaryEngine) CreateBackup(ctx context.Context) (int, []string, error) {
172+
return t.engine.CreateBackup(ctx)
173+
}
174+
175+
func (t *TemporaryEngine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error {
176+
return t.engine.FetchBackupFile(ctx, backupID, backupFile, w)
177+
}
178+
179+
func (t *TemporaryEngine) InternalBackupPath(backupID int) string {
180+
return t.engine.InternalBackupPath(backupID)
181+
}

cmd/influxd/launcher/launcher.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func buildLauncherCommand(l *Launcher, cmd *cobra.Command) {
146146
{
147147
DestP: &l.boltPath,
148148
Flag: "bolt-path",
149-
Default: filepath.Join(dir, "influxd.bolt"),
149+
Default: filepath.Join(dir, bolt.DefaultFilename),
150150
Desc: "path to boltdb database",
151151
},
152152
{
@@ -590,6 +590,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
590590
var (
591591
deleteService platform.DeleteService = m.engine
592592
pointsWriter storage.PointsWriter = m.engine
593+
backupService platform.BackupService = m.engine
593594
)
594595

595596
// TODO(cwolff): Figure out a good default per-query memory limit:
@@ -796,6 +797,8 @@ func (m *Launcher) run(ctx context.Context) (err error) {
796797
NewQueryService: source.NewQueryService,
797798
PointsWriter: pointsWriter,
798799
DeleteService: deleteService,
800+
BackupService: backupService,
801+
KVBackupService: m.kvService,
799802
AuthorizationService: authSvc,
800803
// Wrap the BucketService in a storage backed one that will ensure deleted buckets are removed from the storage engine.
801804
BucketService: storage.NewBucketService(bucketSvc, m.engine),

cmd/influxd/launcher/launcher_helpers.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/influxdata/flux"
1818
"github.com/influxdata/flux/lang"
1919
platform "github.com/influxdata/influxdb"
20+
"github.com/influxdata/influxdb/bolt"
2021
influxdbcontext "github.com/influxdata/influxdb/context"
2122
"github.com/influxdata/influxdb/http"
2223
"github.com/influxdata/influxdb/kv"
@@ -79,7 +80,7 @@ func RunTestLauncherOrFail(tb testing.TB, ctx context.Context, args ...string) *
7980

8081
// Run executes the program with additional arguments to set paths and ports.
8182
func (tl *TestLauncher) Run(ctx context.Context, args ...string) error {
82-
args = append(args, "--bolt-path", filepath.Join(tl.Path, "influxd.bolt"))
83+
args = append(args, "--bolt-path", filepath.Join(tl.Path, bolt.DefaultFilename))
8384
args = append(args, "--engine-path", filepath.Join(tl.Path, "engine"))
8485
args = append(args, "--http-bind-address", "127.0.0.1:0")
8586
args = append(args, "--log-level", "debug")

http/api_handler.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ type APIBackend struct {
3636

3737
PointsWriter storage.PointsWriter
3838
DeleteService influxdb.DeleteService
39+
BackupService influxdb.BackupService
40+
KVBackupService influxdb.KVBackupService
3941
AuthorizationService influxdb.AuthorizationService
4042
BucketService influxdb.BucketService
4143
SessionService influxdb.SessionService
@@ -197,6 +199,10 @@ func NewAPIHandler(b *APIBackend, opts ...APIHandlerOptFn) *APIHandler {
197199
variableBackend.VariableService = authorizer.NewVariableService(b.VariableService)
198200
h.Mount(prefixVariables, NewVariableHandler(b.Logger, variableBackend))
199201

202+
backupBackend := NewBackupBackend(b)
203+
backupBackend.BackupService = authorizer.NewBackupService(backupBackend.BackupService)
204+
h.Mount(prefixBackup, NewBackupHandler(backupBackend))
205+
200206
writeBackend := NewWriteBackend(b.Logger.With(zap.String("handler", "write")), b)
201207
h.Mount(prefixWrite, NewWriteHandler(b.Logger, writeBackend))
202208

@@ -210,6 +216,7 @@ var apiLinks = map[string]interface{}{
210216
// when adding new links, please take care to keep this list alphabetical
211217
// as this makes it easier to verify values against the swagger document.
212218
"authorizations": "/api/v2/authorizations",
219+
"backup": "/api/v2/backup",
213220
"buckets": "/api/v2/buckets",
214221
"dashboards": "/api/v2/dashboards",
215222
"external": map[string]string{

0 commit comments

Comments
 (0)