Skip to content

[sql-9] sessions: small sql preparations #967

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 0 additions & 127 deletions session/db.go

This file was deleted.

9 changes: 9 additions & 0 deletions session/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package session

import "errors"

var (
// ErrSessionNotFound is an error returned when we attempt to retrieve
// information about a session but it is not found.
ErrSessionNotFound = errors.New("session not found")
)
141 changes: 127 additions & 14 deletions session/store.go → session/kvdb_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package session

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"os"
"path/filepath"
"time"

"github.com/btcsuite/btcd/btcec/v2"
Expand Down Expand Up @@ -46,15 +49,125 @@ var (
// IDs associated with the given group ID.
sessionIDKey = []byte("session-id")

// ErrSessionNotFound is an error returned when we attempt to retrieve
// information about a session but it is not found.
ErrSessionNotFound = errors.New("session not found")

// ErrDBInitErr is returned when a bucket that we expect to have been
// set up during DB initialisation is not found.
ErrDBInitErr = errors.New("db did not initialise properly")

// byteOrder is the default byte order we'll use for serialization
// within the database.
byteOrder = binary.BigEndian
)

const (
// DBFilename is the default filename of the session database.
DBFilename = "session.db"

// dbFilePermission is the default permission the session database file
// is created with.
dbFilePermission = 0600

// DefaultSessionDBTimeout is the default maximum time we wait for the
// session bbolt database to be opened. If the database is already
// opened by another process, the unique lock cannot be obtained. With
// the timeout we error out after the given time instead of just
// blocking for forever.
DefaultSessionDBTimeout = 5 * time.Second
)

// BoltStore is a bolt-backed persistent store.
type BoltStore struct {
*bbolt.DB
}

// A compile-time check to ensure that BoltStore implements the Store interface.
var _ Store = (*BoltStore)(nil)

// NewDB creates a new bolt database that can be found at the given directory.
func NewDB(dir, fileName string) (*BoltStore, error) {
firstInit := false
path := filepath.Join(dir, fileName)

// If the database file does not exist yet, create its directory.
if !fileExists(path) {
if err := os.MkdirAll(dir, 0700); err != nil {
return nil, err
}
firstInit = true
}

db, err := initDB(path, firstInit)
if err != nil {
return nil, err
}

// Attempt to sync the database's current version with the latest known
// version available.
if err := syncVersions(db); err != nil {
return nil, err
}

return &BoltStore{DB: db}, nil
}

// fileExists reports whether the named file or directory exists.
func fileExists(path string) bool {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}

// initDB initializes all the required top-level buckets for the database.
func initDB(filepath string, firstInit bool) (*bbolt.DB, error) {
db, err := bbolt.Open(filepath, dbFilePermission, &bbolt.Options{
Timeout: DefaultSessionDBTimeout,
})
if err == bbolt.ErrTimeout {
return nil, fmt.Errorf("error while trying to open %s: timed "+
"out after %v when trying to obtain exclusive lock",
filepath, DefaultSessionDBTimeout)
}
if err != nil {
return nil, err
}

err = db.Update(func(tx *bbolt.Tx) error {
if firstInit {
metadataBucket, err := tx.CreateBucketIfNotExists(
metadataBucketKey,
)
if err != nil {
return err
}
err = setDBVersion(metadataBucket, latestDBVersion)
if err != nil {
return err
}
}

sessionBkt, err := tx.CreateBucketIfNotExists(sessionBucketKey)
if err != nil {
return err
}

_, err = sessionBkt.CreateBucketIfNotExists(idIndexKey)
if err != nil {
return err
}

_, err = sessionBkt.CreateBucketIfNotExists(groupIDIndexKey)

return err
})
if err != nil {
return nil, err
}

return db, nil
}

// getSessionKey returns the key for a session.
func getSessionKey(session *Session) []byte {
return session.LocalPublicKey.SerializeCompressed()
Expand All @@ -64,7 +177,7 @@ func getSessionKey(session *Session) []byte {
// local public key already exists an error is returned.
//
// NOTE: this is part of the Store interface.
func (db *DB) CreateSession(session *Session) error {
func (db *BoltStore) CreateSession(session *Session) error {
var buf bytes.Buffer
if err := SerializeSession(&buf, session); err != nil {
return err
Expand Down Expand Up @@ -158,7 +271,7 @@ func (db *DB) CreateSession(session *Session) error {
// to the session with the given local pub key.
//
// NOTE: this is part of the Store interface.
func (db *DB) UpdateSessionRemotePubKey(localPubKey,
func (db *BoltStore) UpdateSessionRemotePubKey(localPubKey,
remotePubKey *btcec.PublicKey) error {

key := localPubKey.SerializeCompressed()
Expand Down Expand Up @@ -196,7 +309,7 @@ func (db *DB) UpdateSessionRemotePubKey(localPubKey,
// GetSession fetches the session with the given key.
//
// NOTE: this is part of the Store interface.
func (db *DB) GetSession(key *btcec.PublicKey) (*Session, error) {
func (db *BoltStore) GetSession(key *btcec.PublicKey) (*Session, error) {
var session *Session
err := db.View(func(tx *bbolt.Tx) error {
sessionBucket, err := getBucket(tx, sessionBucketKey)
Expand Down Expand Up @@ -226,7 +339,7 @@ func (db *DB) GetSession(key *btcec.PublicKey) (*Session, error) {
// ListSessions returns all sessions currently known to the store.
//
// NOTE: this is part of the Store interface.
func (db *DB) ListSessions(filterFn func(s *Session) bool) ([]*Session, error) {
func (db *BoltStore) ListSessions(filterFn func(s *Session) bool) ([]*Session, error) {
var sessions []*Session
err := db.View(func(tx *bbolt.Tx) error {
sessionBucket, err := getBucket(tx, sessionBucketKey)
Expand Down Expand Up @@ -266,7 +379,7 @@ func (db *DB) ListSessions(filterFn func(s *Session) bool) ([]*Session, error) {
// public key to be revoked.
//
// NOTE: this is part of the Store interface.
func (db *DB) RevokeSession(key *btcec.PublicKey) error {
func (db *BoltStore) RevokeSession(key *btcec.PublicKey) error {
var session *Session
return db.Update(func(tx *bbolt.Tx) error {
sessionBucket, err := getBucket(tx, sessionBucketKey)
Expand Down Expand Up @@ -299,7 +412,7 @@ func (db *DB) RevokeSession(key *btcec.PublicKey) error {
// GetSessionByID fetches the session with the given ID.
//
// NOTE: this is part of the Store interface.
func (db *DB) GetSessionByID(id ID) (*Session, error) {
func (db *BoltStore) GetSessionByID(id ID) (*Session, error) {
var session *Session
err := db.View(func(tx *bbolt.Tx) error {
sessionBucket, err := getBucket(tx, sessionBucketKey)
Expand Down Expand Up @@ -337,7 +450,7 @@ func (db *DB) GetSessionByID(id ID) (*Session, error) {
// used or discarded.
//
// NOTE: this is part of the Store interface.
func (db *DB) GetUnusedIDAndKeyPair() (ID, *btcec.PrivateKey, error) {
func (db *BoltStore) GetUnusedIDAndKeyPair() (ID, *btcec.PrivateKey, error) {
var (
id ID
privKey *btcec.PrivateKey
Expand Down Expand Up @@ -383,7 +496,7 @@ func (db *DB) GetUnusedIDAndKeyPair() (ID, *btcec.PrivateKey, error) {
// GetGroupID will return the group ID for the given session ID.
//
// NOTE: this is part of the IDToGroupIndex interface.
func (db *DB) GetGroupID(sessionID ID) (ID, error) {
func (db *BoltStore) GetGroupID(sessionID ID) (ID, error) {
var groupID ID
err := db.View(func(tx *bbolt.Tx) error {
sessionBkt, err := getBucket(tx, sessionBucketKey)
Expand Down Expand Up @@ -423,7 +536,7 @@ func (db *DB) GetGroupID(sessionID ID) (ID, error) {
// group with the given ID.
//
// NOTE: this is part of the IDToGroupIndex interface.
func (db *DB) GetSessionIDs(groupID ID) ([]ID, error) {
func (db *BoltStore) GetSessionIDs(groupID ID) ([]ID, error) {
var (
sessionIDs []ID
err error
Expand All @@ -450,7 +563,7 @@ func (db *DB) GetSessionIDs(groupID ID) ([]ID, error) {
// each session passes.
//
// NOTE: this is part of the Store interface.
func (db *DB) CheckSessionGroupPredicate(groupID ID,
func (db *BoltStore) CheckSessionGroupPredicate(groupID ID,
fn func(s *Session) bool) (bool, error) {

var (
Expand Down
6 changes: 1 addition & 5 deletions session_rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type sessionRpcServer struct {
// sessionRpcServerConfig holds the values used to configure the
// sessionRpcServer.
type sessionRpcServerConfig struct {
db *session.DB
db session.Store
basicAuth string
grpcOptions []grpc.ServerOption
registerGrpcServers func(server *grpc.Server)
Expand Down Expand Up @@ -175,10 +175,6 @@ func (s *sessionRpcServer) start(ctx context.Context) error {
func (s *sessionRpcServer) stop() error {
var returnErr error
s.stopOnce.Do(func() {
if err := s.cfg.db.Close(); err != nil {
log.Errorf("Error closing session DB: %v", err)
returnErr = err
}
s.sessionServer.Stop()

close(s.quit)
Expand Down
Loading
Loading