Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/livepeer/starter/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func NewLivepeerConfig(fs *flag.FlagSet) LivepeerConfig {
cfg.MaxTotalEV = fs.String("maxTotalEV", *cfg.MaxTotalEV, "The maximum acceptable expected value for one PM payment")
// Broadcaster deposit multiplier to determine max acceptable ticket faceValue
cfg.DepositMultiplier = fs.Int("depositMultiplier", *cfg.DepositMultiplier, "The deposit multiplier used to determine max acceptable faceValue for PM tickets")
cfg.IgnoreSenderReserve = fs.Bool("IgnoreSenderReserve", *cfg.IgnoreSenderReserve, "Skip sender reserve validation and rely solely on broadcaster deposits covering ticket face value (unsafe)")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
cfg.IgnoreSenderReserve = fs.Bool("IgnoreSenderReserve", *cfg.IgnoreSenderReserve, "Skip sender reserve validation and rely solely on broadcaster deposits covering ticket face value (unsafe)")
cfg.IgnoreSenderReserve = fs.Bool("ignoreSenderReserve", *cfg.IgnoreSenderReserve, "Ignore sender reserve; lowers gateway reserve needs but allows double-spending risk (unsafe)")

// Orchestrator base pricing info
cfg.PricePerUnit = fs.String("pricePerUnit", "0", "The price per 'pixelsPerUnit' amount pixels. Can be specified in wei or a custom currency in the format <price><currency> (e.g. 0.50USD). When using a custom currency, a corresponding price feed must be configured with -priceFeedAddr")
// Unit of pixels for both O's pricePerUnit and B's maxPricePerUnit
Expand Down
13 changes: 10 additions & 3 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ type LivepeerConfig struct {
MaxTicketEV *string
MaxTotalEV *string
DepositMultiplier *int
IgnoreSenderReserve *bool
PricePerUnit *string
PixelsPerUnit *string
PriceFeedAddr *string
Expand Down Expand Up @@ -263,6 +264,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultMaxPricePerUnit := "0"
defaultMaxPricePerCapability := ""
defaultIgnoreMaxPriceIfNeeded := false
defaultIgnoreSenderReserve := false
defaultPixelsPerUnit := "1"
defaultPriceFeedAddr := "0x639Fe6ab55C921f74e7fac1ee960C0B6293ba612" // ETH / USD price feed address on Arbitrum Mainnet
defaultAutoAdjustPrice := true
Expand Down Expand Up @@ -378,6 +380,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
MaxTicketEV: &defaultMaxTicketEV,
MaxTotalEV: &defaultMaxTotalEV,
DepositMultiplier: &defaultDepositMultiplier,
IgnoreSenderReserve: &defaultIgnoreSenderReserve,
MaxPricePerUnit: &defaultMaxPricePerUnit,
MaxPricePerCapability: &defaultMaxPricePerCapability,
IgnoreMaxPriceIfNeeded: &defaultIgnoreMaxPriceIfNeeded,
Expand Down Expand Up @@ -1015,9 +1018,10 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
defer sm.Stop()

tcfg := pm.TicketParamsConfig{
EV: ev,
RedeemGas: redeemGas,
TxCostMultiplier: txCostMultiplier,
EV: ev,
RedeemGas: redeemGas,
TxCostMultiplier: txCostMultiplier,
IgnoreSenderReserve: *cfg.IgnoreSenderReserve,
}
n.Recipient, err = pm.NewRecipient(
recipientAddr,
Expand All @@ -1032,6 +1036,9 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Errorf("Error setting up PM recipient: %v", err)
return
}
if *cfg.IgnoreSenderReserve {
glog.Warning("Sender reserve requirements disabled; relying on broadcaster deposit to cover ticket face value. Double-spend protection is reduced.")
}
mfv, _ := new(big.Int).SetString(*cfg.MaxFaceValue, 10)
if mfv == nil {
panic(fmt.Errorf("-maxFaceValue must be a valid integer, but %v provided. Restart the node with a different valid value for -maxFaceValue", *cfg.MaxFaceValue))
Expand Down
37 changes: 27 additions & 10 deletions pm/recipient.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ type TicketParamsConfig struct {
// TxCostMultiplier is the desired multiplier of the transaction
// cost for redemption
TxCostMultiplier int

// IgnoreSenderReserve instructs the recipient to skip sender reserve checks and accept tickets as long as the computed face value meets EV requirements
IgnoreSenderReserve bool
}

// GasPriceMonitor defines methods for monitoring gas prices
Expand Down Expand Up @@ -271,16 +274,28 @@ func (r *recipient) faceValue(sender ethcommon.Address) (*big.Int, error) {
faceValue = new(big.Int).Mul(r.cfg.EV, evMultiplier)
}

// Fetch current max float for sender
maxFloat, err := r.sm.MaxFloat(sender)
if err != nil {
return nil, err
}
var maxFloat *big.Int
if !r.cfg.IgnoreSenderReserve {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: how about revering the condition and check here if r.cfgSenderReserve and then have the logic for ignoring here and the rest in the else statement. Super nit, but it's harder to read when you have two reverse conditions (not and ignore, which effectively means if use sender reserve).

// Fetch current max float for sender
var err error
maxFloat, err = r.sm.MaxFloat(sender)
if err != nil {
return nil, err
}

if faceValue.Cmp(maxFloat) > 0 {
// If faceValue > maxFloat
// Set faceValue = maxFloat
faceValue = maxFloat
if faceValue.Cmp(maxFloat) > 0 {
// If faceValue > maxFloat
// Set faceValue = maxFloat
faceValue = maxFloat
}
} else {
available, err := r.sm.SenderFunds(sender)
if err != nil {
return nil, err
}
if available.Cmp(faceValue) < 0 {
return nil, errInsufficientSenderReserve
}
}

if r.maxfacevalue.Cmp(big.NewInt(0)) > 0 {
Expand All @@ -290,7 +305,9 @@ func (r *recipient) faceValue(sender ethcommon.Address) (*big.Int, error) {
}
if monitor.Enabled {
monitor.TicketFaceValue(sender.Hex(), faceValue)
monitor.MaxFloat(sender.Hex(), maxFloat)
if !r.cfg.IgnoreSenderReserve && maxFloat != nil {
monitor.MaxFloat(sender.Hex(), maxFloat)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so we monitor this metric only fi the IgnoreSenerReserve is false (default). Is that correct? I think it should be ok, but just want to confirm that's the intention.

}
}
if faceValue.Cmp(r.cfg.EV) < 0 {
return nil, errInsufficientSenderReserve
Expand Down
15 changes: 15 additions & 0 deletions pm/recipient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,21 @@ func TestTicketParams(t *testing.T) {
_, err = r.TicketParams(sender, big.NewRat(1, 1))
assert.EqualError(err, errInsufficientSenderReserve.Error())

// Test ignoring sender reserve requirements bypasses maxFloat enforcement
cfg.IgnoreSenderReserve = true
sm.maxFloat = big.NewInt(100)
sm.deposit = new(big.Int).Exp(big.NewInt(10), big.NewInt(18), nil)
rIgnore := NewRecipientWithSecret(recipient, b, v, gm, sm, tm, secret, cfg)
paramsIgnore, err := rIgnore.TicketParams(sender, big.NewRat(1, 1))
require.Nil(err)
assert.True(paramsIgnore.FaceValue.Cmp(sm.maxFloat) > 0)

// Test ignoring sender reserve requirements errors if deposit < faceValue
sm.deposit = new(big.Int).Sub(paramsIgnore.FaceValue, big.NewInt(1))
_, err = rIgnore.TicketParams(sender, big.NewRat(1, 1))
assert.EqualError(err, errInsufficientSenderReserve.Error())
cfg.IgnoreSenderReserve = false

// Test faceValue < txCostWithGasPrice(current gasPrice) and faceValue > txCostWithGasPrice(avg gasPrice)
// Set current gasPrice higher than avg gasPrice
gm.gasPrice = new(big.Int).Add(avgGasPrice, big.NewInt(1))
Expand Down
32 changes: 32 additions & 0 deletions pm/sendermonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type SenderMonitor interface {
// MaxFloat returns a remote sender's max float
MaxFloat(addr ethcommon.Address) (*big.Int, error)

// SenderFunds returns the broadcaster's spendable balance (deposit + total reserve - pending tickets)
SenderFunds(addr ethcommon.Address) (*big.Int, error)

// ValidateSender checks whether a sender's unlock period ends the round after the next round
ValidateSender(addr ethcommon.Address) error
}
Expand Down Expand Up @@ -162,6 +165,35 @@ func (sm *LocalSenderMonitor) MaxFloat(addr ethcommon.Address) (*big.Int, error)
return sm.maxFloat(addr)
}

// SenderFunds returns the sender's deposit plus total reserve minus pending tickets
func (sm *LocalSenderMonitor) SenderFunds(addr ethcommon.Address) (*big.Int, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance it's possible to add unit tests for this?

sm.mu.Lock()
defer sm.mu.Unlock()

sm.ensureCache(addr)

info, err := sm.smgr.GetSenderInfo(addr)
if err != nil {
return nil, err
}

totalReserve := new(big.Int).Set(info.Reserve.FundsRemaining)
if info.Reserve.ClaimedInCurrentRound != nil {
totalReserve.Add(totalReserve, info.Reserve.ClaimedInCurrentRound)
}

available := new(big.Int).Set(totalReserve)
if info.Deposit != nil {
available.Add(available, info.Deposit)
}
available.Sub(available, sm.senders[addr].pendingAmount)
Comment on lines +180 to +189
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment here or in the function comment explaining the calculations here?

I don't exactly get it, so the sender funds are calculated as:

funds = deposit + reserve + claimed in current round in pending amount

I think the most tricky part is why we add this claimed in current round.

if available.Sign() < 0 {
available = big.NewInt(0)
}

return available, nil
}

// QueueTicket adds a ticket to the queue for a remote sender
func (sm *LocalSenderMonitor) QueueTicket(ticket *SignedTicket) error {
sm.mu.Lock()
Expand Down
10 changes: 10 additions & 0 deletions pm/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,19 +381,22 @@ func (s *stubGasPriceMonitor) GasPrice() *big.Int {

type stubSenderMonitor struct {
maxFloat *big.Int
funds *big.Int
redeemable chan *redemption
queued []*SignedTicket
acceptable bool
addFloatErr error
maxFloatErr error
validateSenderErr error
shouldFail error
deposit *big.Int
}

func newStubSenderMonitor() *stubSenderMonitor {
return &stubSenderMonitor{
maxFloat: big.NewInt(0),
redeemable: make(chan *redemption),
deposit: big.NewInt(0),
}
}

Expand Down Expand Up @@ -433,6 +436,13 @@ func (s *stubSenderMonitor) MaxFloat(_ ethcommon.Address) (*big.Int, error) {
return s.maxFloat, nil
}

func (s *stubSenderMonitor) SenderFunds(_ ethcommon.Address) (*big.Int, error) {
if s.funds != nil {
return new(big.Int).Set(s.funds), nil
}
return s.deposit, nil
}

func (s *stubSenderMonitor) ValidateSender(_ ethcommon.Address) error { return s.validateSenderErr }

// MockRecipient is useful for testing components that depend on pm.Recipient
Expand Down
23 changes: 23 additions & 0 deletions server/redeemer.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,29 @@ func (r *RedeemerClient) MaxFloat(sender ethcommon.Address) (*big.Int, error) {
return mf, nil
}

// SenderFunds retrieves the sender's spendable balance directly from the local sender manager cache
func (r *RedeemerClient) SenderFunds(sender ethcommon.Address) (*big.Int, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance it's possible to add unit tests for this?

info, err := r.sm.GetSenderInfo(sender)
if err != nil {
return nil, err
}

totalReserve := new(big.Int).Set(info.Reserve.FundsRemaining)
if info.Reserve.ClaimedInCurrentRound != nil {
totalReserve.Add(totalReserve, info.Reserve.ClaimedInCurrentRound)
}

available := new(big.Int).Set(totalReserve)
if info.Deposit != nil {
available.Add(available, info.Deposit)
}
Comment on lines +277 to +290
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any change it's possible to extract it to some utils and reuse? The code looks very similar to some other parts and the LocalSenderMonitor.SenderFunds() implementation.


if available.Sign() < 0 {
available = big.NewInt(0)
}
return available, nil
}

// ValidateSender checks whether a sender has not recently unlocked its deposit and reserve
func (r *RedeemerClient) ValidateSender(sender ethcommon.Address) error {
info, err := r.sm.GetSenderInfo(sender)
Expand Down
Loading