Skip to content

Commit 16cf4e9

Browse files
otherviewlibotony
andauthored
Enhance deferred tx accounting (#1598)
Co-authored-by: libotony <tony.li@vechain.org>
1 parent 7b976c6 commit 16cf4e9

File tree

3 files changed

+126
-10
lines changed

3 files changed

+126
-10
lines changed

txpool/tx_object_map.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,15 @@ func (m *txObjectMap) RemoveByHash(txHash thor.Bytes32) bool {
138138
return false
139139
}
140140

141+
func (m *txObjectMap) PendingCostOf(payer thor.Address) *big.Int {
142+
m.lock.RLock()
143+
defer m.lock.RUnlock()
144+
if cost := m.cost[payer]; cost != nil {
145+
return new(big.Int).Set(cost)
146+
}
147+
return new(big.Int)
148+
}
149+
141150
func (m *txObjectMap) UpdatePendingCost(txObj *TxObject) {
142151
m.lock.Lock()
143152
defer m.lock.Unlock()

txpool/tx_pool.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,6 @@ func (p *TxPool) wash(
502502
) {
503503
all := p.all.ToTxObjects()
504504
var toRemove []*TxObject
505-
var toUpdateCost []*TxObject
506505
defer func() {
507506
if err != nil {
508507
// in case of error, simply cut pool size to limit
@@ -527,13 +526,9 @@ func (p *TxPool) wash(
527526
}
528527
}
529528
}
530-
// update pending cost
531-
for _, txObj := range toUpdateCost {
532-
p.all.UpdatePendingCost(txObj)
533-
}
534529
}()
535530

536-
// recreate state every time to avoid high RAM usage when the pool at hight water-mark.
531+
// recreate state every time to avoid high RAM usage when the pool at high water-mark.
537532
newState := func() *state.State {
538533
return p.stater.NewState(headSummary.Root())
539534
}
@@ -660,16 +655,24 @@ func (p *TxPool) wash(
660655
var toBroadcast tx.Transactions
661656

662657
for _, obj := range executableObjs {
663-
executables = append(executables, obj.Transaction)
664-
// the tx is not executable previously
658+
// the tx was not executable previously: validate payer energy before promoting
665659
if !obj.executable {
660+
payer := *obj.Payer()
661+
needs := new(big.Int).Add(p.all.PendingCostOf(payer), obj.Cost())
662+
energy, err := builtin.Energy.Native(newState(), headSummary.Header.Timestamp()+thor.BlockInterval()).Get(payer)
663+
if err != nil || energy.Cmp(needs) < 0 {
664+
toRemove = append(toRemove, obj)
665+
logger.Trace("tx washed out", "id", obj.ID(), "err", "insufficient energy for overall pending cost")
666+
continue
667+
}
666668
obj.executable = true
667-
toUpdateCost = append(toUpdateCost, obj)
669+
p.all.UpdatePendingCost(obj)
668670
toBroadcast = append(toBroadcast, obj.Transaction)
669671
} else if obj.localSubmitted {
670672
// broadcast local submitted even it's already executable
671673
toBroadcast = append(toBroadcast, obj.Transaction)
672674
}
675+
executables = append(executables, obj.Transaction)
673676
}
674677

675678
p.goes.Go(func() {
@@ -681,7 +684,7 @@ func (p *TxPool) wash(
681684
return executables, 0, 0, nil
682685
}
683686

684-
// Get length of the `all` field
687+
// Len returns the length of the `all` field
685688
func (p *TxPool) Len() int {
686689
return p.all.Len()
687690
}

txpool/tx_pool_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1684,6 +1684,110 @@ func TestAddOverPendingCostDynamicFee(t *testing.T) {
16841684
assert.EqualError(t, err, "tx rejected: insufficient energy for overall pending cost")
16851685
}
16861686

1687+
func TestWashDeferredTxPendingCostEnforcement(t *testing.T) {
1688+
// This test asserts the correct behaviour: only 2 of 3 deferred txs
1689+
// should be promoted; the third exceeds the payer's energy and must be evicted.
1690+
now := uint64(time.Now().Unix() - time.Now().Unix()%10 - 10)
1691+
db := muxdb.NewMem()
1692+
1693+
// energy = 42*10^18 covers exactly 2 txs at gas=21000, InitialBaseGasPrice=1e15.
1694+
energy, _ := new(big.Int).SetString("42000000000000000000", 10)
1695+
1696+
builder := new(genesis.Builder).
1697+
GasLimit(thor.InitialGasLimit).
1698+
ForkConfig(&thor.NoFork).
1699+
Timestamp(now).
1700+
State(func(st *state.State) error {
1701+
if err := st.SetCode(builtin.Params.Address, builtin.Params.RuntimeBytecodes()); err != nil {
1702+
return err
1703+
}
1704+
if err := st.SetCode(builtin.Prototype.Address, builtin.Prototype.RuntimeBytecodes()); err != nil {
1705+
return err
1706+
}
1707+
st.SetEnergy(devAccounts[0].Address, energy, now)
1708+
return nil
1709+
})
1710+
1711+
setMethod, found := builtin.Params.ABI.MethodByName("set")
1712+
require.True(t, found)
1713+
var executor thor.Address
1714+
data, err := setMethod.EncodeInput(thor.KeyExecutorAddress, new(big.Int).SetBytes(executor[:]))
1715+
require.NoError(t, err)
1716+
builder.Call(tx.NewClause(&builtin.Params.Address).WithData(data), thor.Address{})
1717+
data, err = setMethod.EncodeInput(thor.KeyLegacyTxBaseGasPrice, thor.InitialBaseGasPrice)
1718+
require.NoError(t, err)
1719+
builder.Call(tx.NewClause(&builtin.Params.Address).WithData(data), executor)
1720+
1721+
b0, _, _, err := builder.Build(state.NewStater(db))
1722+
require.NoError(t, err)
1723+
1724+
// Commit genesis state so the pool stater can read it.
1725+
st := state.New(db, trie.Root{Hash: b0.Header().StateRoot()})
1726+
stage, err := st.Stage(trie.Version{Major: 1})
1727+
require.NoError(t, err)
1728+
root, err := stage.Commit()
1729+
require.NoError(t, err)
1730+
1731+
b1 := new(block.Builder).
1732+
ParentID(b0.Header().ID()).
1733+
StateRoot(root).
1734+
TotalScore(100).
1735+
Timestamp(now + 10).
1736+
GasLimit(thor.InitialGasLimit).
1737+
Build()
1738+
1739+
repo, _ := chain.NewRepository(db, b0)
1740+
require.NoError(t, repo.AddBlock(b1, tx.Receipts{}, 0, true))
1741+
1742+
pool := New(repo, state.NewStater(db), Options{
1743+
Limit: 50, // non-executable cap = 50*2/10 = 10, enough for 3 deferred txs
1744+
LimitPerAccount: 10,
1745+
MaxLifetime: time.Hour,
1746+
}, &thor.NoFork)
1747+
defer pool.Close()
1748+
1749+
// BlockRef=3: deferred while best is b1 (nextBlockNum=2), executable when best is b2 (nextBlockNum=3).
1750+
deferredRef := tx.NewBlockRef(b1.Header().Number() + 2)
1751+
tx1 := newTx(tx.TypeLegacy, pool.repo.ChainTag(), nil, 21000, deferredRef, 100, nil, tx.Features(0), devAccounts[0])
1752+
tx2 := newTx(tx.TypeLegacy, pool.repo.ChainTag(), nil, 21000, deferredRef, 100, nil, tx.Features(0), devAccounts[0])
1753+
tx3 := newTx(tx.TypeLegacy, pool.repo.ChainTag(), nil, 21000, deferredRef, 100, nil, tx.Features(0), devAccounts[0])
1754+
1755+
// All 3 are admitted: cost==nil for deferred txs so Add() skips the energy check.
1756+
require.NoError(t, pool.AddLocal(tx1))
1757+
require.NoError(t, pool.AddLocal(tx2))
1758+
require.NoError(t, pool.AddLocal(tx3))
1759+
require.Equal(t, 3, pool.Len(), "all 3 deferred txs must be admitted without energy check")
1760+
1761+
// Advance chain to block 2: nextBlockNum becomes 3, so BlockRef=3 is no longer deferred.
1762+
// Commit the same state at version 2 so wash() can read it at the new head.
1763+
st2 := state.New(db, trie.Root{Hash: root})
1764+
stage2, err := st2.Stage(trie.Version{Major: 2})
1765+
require.NoError(t, err)
1766+
root2, err := stage2.Commit()
1767+
require.NoError(t, err)
1768+
b2 := new(block.Builder).
1769+
ParentID(b1.Header().ID()).
1770+
StateRoot(root2).
1771+
TotalScore(200).
1772+
Timestamp(now + 20).
1773+
GasLimit(thor.InitialGasLimit).
1774+
Build()
1775+
require.NoError(t, repo.AddBlock(b2, tx.Receipts{}, 0, true))
1776+
1777+
executables, _, _, err := pool.wash(repo.BestBlockSummary(), true)
1778+
require.NoError(t, err)
1779+
1780+
promoted := 0
1781+
for _, etx := range executables {
1782+
if etx.ID() == tx1.ID() || etx.ID() == tx2.ID() || etx.ID() == tx3.ID() {
1783+
promoted++
1784+
}
1785+
}
1786+
// Payer energy covers only 2 txs; the 3rd must be evicted during promotion.
1787+
assert.Equal(t, 2, promoted, "only 2 deferred txs should be promoted; payer energy is exhausted after 2")
1788+
assert.Equal(t, 2, pool.Len(), "over-budget tx should be evicted from the pool during wash")
1789+
}
1790+
16871791
func TestValidateTxBasics(t *testing.T) {
16881792
pool := newPool(1, LIMIT_PER_ACCOUNT, &thor.ForkConfig{GALACTICA: 0})
16891793
defer pool.Close()

0 commit comments

Comments
 (0)