From 0e3a836af16c26d251882477b70dbe663a453f70 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 12 Dec 2025 17:42:58 +0100 Subject: [PATCH 01/14] fix(block): fix init logic sequencer for da epoch fetching --- block/components.go | 2 ++ .../internal/da/forced_inclusion_retriever.go | 18 +++++++-------- block/internal/executing/executor.go | 13 +++++++++-- block/internal/submitting/submitter.go | 23 +++++++++++++++---- sequencers/based/sequencer.go | 4 +++- sequencers/single/sequencer.go | 5 +++- 6 files changed, 48 insertions(+), 17 deletions(-) diff --git a/block/components.go b/block/components.go index 0b7996623f..fa7db7edef 100644 --- a/block/components.go +++ b/block/components.go @@ -258,6 +258,8 @@ func NewAggregatorComponents( logger, errorCh, ) + // Set sequencer on submitter so it can initialize sequencer DA height from genesis inclusion + submitter.SetSequencer(sequencer) return &Components{ Executor: executor, diff --git a/block/internal/da/forced_inclusion_retriever.go b/block/internal/da/forced_inclusion_retriever.go index c5ff15945a..40340905b3 100644 --- a/block/internal/da/forced_inclusion_retriever.go +++ b/block/internal/da/forced_inclusion_retriever.go @@ -18,10 +18,10 @@ var ErrForceInclusionNotConfigured = errors.New("forced inclusion namespace not // ForcedInclusionRetriever handles retrieval of forced inclusion transactions from DA. type ForcedInclusionRetriever struct { - client Client - genesis genesis.Genesis - logger zerolog.Logger - daEpochSize uint64 + client Client + logger zerolog.Logger + daEpochSize uint64 + daStartHeight uint64 } // ForcedInclusionEvent contains forced inclusion transactions retrieved from DA. @@ -39,10 +39,10 @@ func NewForcedInclusionRetriever( logger zerolog.Logger, ) *ForcedInclusionRetriever { return &ForcedInclusionRetriever{ - client: client, - genesis: genesis, - logger: logger.With().Str("component", "forced_inclusion_retriever").Logger(), - daEpochSize: genesis.DAEpochForcedInclusion, + client: client, + daStartHeight: genesis.DAStartHeight, // TODO: this should be genesis da start height (for full nodes) or store metadata da start height (for sequencers) + logger: logger.With().Str("component", "forced_inclusion_retriever").Logger(), + daEpochSize: genesis.DAEpochForcedInclusion, } } @@ -53,7 +53,7 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context return nil, ErrForceInclusionNotConfigured } - epochStart, epochEnd, currentEpochNumber := types.CalculateEpochBoundaries(daHeight, r.genesis.DAStartHeight, r.daEpochSize) + epochStart, epochEnd, currentEpochNumber := types.CalculateEpochBoundaries(daHeight, r.daStartHeight /* this should be fetch from store once */, r.daEpochSize) if daHeight != epochEnd { r.logger.Debug(). diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index f376a0cb0d..a8352239f2 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -197,12 +197,21 @@ func (e *Executor) initializeState() error { LastBlockHeight: e.genesis.InitialHeight - 1, LastBlockTime: e.genesis.StartTime, AppHash: stateRoot, - DAHeight: e.genesis.DAStartHeight, + // DA start height is usually 0 at InitChain unless it is a re-genesis. + // The sequencer does not know at which DA block its first block will be included. + DAHeight: e.genesis.DAStartHeight, } } e.setLastState(state) - e.sequencer.SetDAHeight(state.DAHeight) + // Defer setting sequencer DA height at genesis. 
At chain genesis there are no + // included DA blocks yet, so the sequencer shouldn't be updated with the + // state's DA height until we've produced/observed at least the first included + // block. Only set the sequencer DA height when the chain has progressed past + // the initial genesis height. + if state.LastBlockHeight >= e.genesis.InitialHeight { + e.sequencer.SetDAHeight(state.DAHeight) + } // Initialize store height using batch for atomicity batch, err := e.store.NewBatch(e.ctx) diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 8af2a76055..e42dee0144 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -15,6 +15,7 @@ import ( "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" coreexecutor "github.com/evstack/ev-node/core/execution" + coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/signer" @@ -31,10 +32,11 @@ type daSubmitterAPI interface { // Submitter handles DA submission and inclusion processing for both sync and aggregator nodes type Submitter struct { // Core components - store store.Store - exec coreexecutor.Executor - config config.Config - genesis genesis.Genesis + store store.Store + exec coreexecutor.Executor + sequencer coresequencer.Sequencer + config config.Config + genesis genesis.Genesis // Shared components cache cache.Manager @@ -93,6 +95,13 @@ func NewSubmitter( } } +// SetSequencer assigns the sequencer instance to the submitter. +// This allows the submitter to update the sequencer's DA height when the first +// DA inclusion (genesis) is observed. +func (s *Submitter) SetSequencer(seq coresequencer.Sequencer) { + s.sequencer = seq +} + // Start begins the submitting component func (s *Submitter) Start(ctx context.Context) error { s.ctx, s.cancel = context.WithCancel(ctx) @@ -364,6 +373,12 @@ func (s *Submitter) setSequencerHeightToDAHeight(ctx context.Context, height uin if err := s.store.SetMetadata(ctx, store.GenesisDAHeightKey, genesisDAIncludedHeightBytes); err != nil { return err } + + // the sequencer will process DA epochs from this height. 
+ if s.sequencer != nil { + s.sequencer.SetDAHeight(genesisDAIncludedHeight) + s.logger.Debug().Uint64("genesis_da_height", genesisDAIncludedHeight).Msg("initialized sequencer DA height from persisted genesis DA height") + } } return nil diff --git a/sequencers/based/sequencer.go b/sequencers/based/sequencer.go index 7960cd52df..40c61f4a01 100644 --- a/sequencers/based/sequencer.go +++ b/sequencers/based/sequencer.go @@ -54,7 +54,9 @@ func NewBasedSequencer( logger: logger.With().Str("component", "based_sequencer").Logger(), checkpointStore: seqcommon.NewCheckpointStore(db, ds.NewKey("/based/checkpoint")), } - bs.SetDAHeight(genesis.DAStartHeight) // will be overridden by the executor + + // will be overridden by the executor or submitter (at genesis) + bs.SetDAHeight(genesis.DAStartHeight) // Load checkpoint from DB, or initialize if none exists loadCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) diff --git a/sequencers/single/sequencer.go b/sequencers/single/sequencer.go index 8b114e1e06..e887cc7af7 100644 --- a/sequencers/single/sequencer.go +++ b/sequencers/single/sequencer.go @@ -74,7 +74,10 @@ func NewSequencer( fiRetriever: fiRetriever, checkpointStore: seqcommon.NewCheckpointStore(db, ds.NewKey("/single/checkpoint")), } - s.SetDAHeight(genesis.DAStartHeight) // will be overridden by the executor + // will be overridden by the executor or submitter (at genesis) + // during genesis time, the sequencer will fetch unnecessary heights from DA genesis + // this is kept on purpose as some DAs (like local-da), do start at genesis. + s.SetDAHeight(genesis.DAStartHeight) loadCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() From 8d1c9f4d98f4295af00ac3c20eb24f049290b0d2 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 17 Dec 2025 16:27:53 +0100 Subject: [PATCH 02/14] updates --- .mockery.yaml | 4 +- block/components.go | 4 +- block/internal/submitting/submitter.go | 9 +- block/internal/submitting/submitter_test.go | 6 +- sequencers/based/sequencer.go | 4 +- sequencers/single/sequencer.go | 5 +- test/mocks/da.go | 181 ++++++++++++++++++ test/mocks/da_verifier.go | 193 -------------------- 8 files changed, 191 insertions(+), 215 deletions(-) delete mode 100644 test/mocks/da_verifier.go diff --git a/.mockery.yaml b/.mockery.yaml index 5944b9d1ae..f2671aabfc 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -63,13 +63,11 @@ packages: dir: ./test/mocks pkgname: mocks filename: da.go - github.com/evstack/ev-node/pkg/da/types: - interfaces: Verifier: config: dir: ./test/mocks pkgname: mocks - filename: da_verifier.go + filename: da.go github.com/evstack/ev-node/pkg/da/jsonrpc: interfaces: BlobModule: diff --git a/block/components.go b/block/components.go index c5a90aa304..5602e5a8e6 100644 --- a/block/components.go +++ b/block/components.go @@ -161,6 +161,7 @@ func NewSyncComponents( config, genesis, daSubmitter, + nil, // No sequencer for sync nodes nil, // No signer for sync nodes logger, errorCh, @@ -250,12 +251,11 @@ func NewAggregatorComponents( config, genesis, daSubmitter, + sequencer, signer, // Signer for aggregator nodes to submit to DA logger, errorCh, ) - // Set sequencer on submitter so it can initialize sequencer DA height from genesis inclusion - submitter.SetSequencer(sequencer) return &Components{ Executor: executor, diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index e42dee0144..7340cfbd6e 100644 --- a/block/internal/submitting/submitter.go +++ 
b/block/internal/submitting/submitter.go @@ -76,6 +76,7 @@ func NewSubmitter( config config.Config, genesis genesis.Genesis, daSubmitter daSubmitterAPI, + sequencer coresequencer.Sequencer, // Can be nil for sync nodes signer signer.Signer, // Can be nil for sync nodes logger zerolog.Logger, errorCh chan<- error, @@ -88,6 +89,7 @@ func NewSubmitter( config: config, genesis: genesis, daSubmitter: daSubmitter, + sequencer: sequencer, signer: signer, daIncludedHeight: &atomic.Uint64{}, errorCh: errorCh, @@ -95,13 +97,6 @@ func NewSubmitter( } } -// SetSequencer assigns the sequencer instance to the submitter. -// This allows the submitter to update the sequencer's DA height when the first -// DA inclusion (genesis) is observed. -func (s *Submitter) SetSequencer(seq coresequencer.Sequencer) { - s.sequencer = seq -} - // Start begins the submitting component func (s *Submitter) Start(ctx context.Context) error { s.ctx, s.cancel = context.WithCancel(ctx) diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index b3a778a4eb..269aedd0e0 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -168,7 +168,7 @@ func TestSubmitter_setSequencerHeightToDAHeight(t *testing.T) { daClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() daClient.On("HasForcedInclusionNamespace").Return(false).Maybe() daSub := NewDASubmitter(daClient, cfg, genesis.Genesis{}, common.BlockOptions{}, metrics, zerolog.Nop()) - s := NewSubmitter(mockStore, nil, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, zerolog.Nop(), nil) + s := NewSubmitter(mockStore, nil, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, nil, zerolog.Nop(), nil) s.ctx = ctx h, d := newHeaderAndData("chain", 1, true) @@ -252,7 +252,7 @@ func TestSubmitter_processDAInclusionLoop_advances(t *testing.T) { daClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() daClient.On("HasForcedInclusionNamespace").Return(false).Maybe() daSub := NewDASubmitter(daClient, cfg, genesis.Genesis{}, common.BlockOptions{}, metrics, zerolog.Nop()) - s := NewSubmitter(st, exec, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, zerolog.Nop(), nil) + s := NewSubmitter(st, exec, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, nil, zerolog.Nop(), nil) // prepare two consecutive blocks in store with DA included in cache h1, d1 := newHeaderAndData("chain", 1, true) @@ -442,7 +442,7 @@ func TestSubmitter_CacheClearedOnHeightInclusion(t *testing.T) { daClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() daClient.On("HasForcedInclusionNamespace").Return(false).Maybe() daSub := NewDASubmitter(daClient, cfg, genesis.Genesis{}, common.BlockOptions{}, metrics, zerolog.Nop()) - s := NewSubmitter(st, exec, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, zerolog.Nop(), nil) + s := NewSubmitter(st, exec, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, nil, zerolog.Nop(), nil) // Create test blocks h1, d1 := newHeaderAndData("chain", 1, true) diff --git a/sequencers/based/sequencer.go b/sequencers/based/sequencer.go index ae23a1c8fc..c9407a9cc4 100644 --- a/sequencers/based/sequencer.go +++ b/sequencers/based/sequencer.go @@ -53,11 +53,9 @@ func NewBasedSequencer( fiRetriever: fiRetriever, logger: logger.With().Str("component", "based_sequencer").Logger(), checkpointStore: seqcommon.NewCheckpointStore(db, ds.NewKey("/based/checkpoint")), + daHeight: atomic.Uint64{}, // empty, set by executor or submitter } - // will be overridden by the executor or 
submitter (at genesis) - bs.SetDAHeight(genesis.DAStartHeight) - // Load checkpoint from DB, or initialize if none exists loadCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() diff --git a/sequencers/single/sequencer.go b/sequencers/single/sequencer.go index 8758b61f76..5e2efec636 100644 --- a/sequencers/single/sequencer.go +++ b/sequencers/single/sequencer.go @@ -73,11 +73,8 @@ func NewSequencer( proposer: proposer, fiRetriever: fiRetriever, checkpointStore: seqcommon.NewCheckpointStore(db, ds.NewKey("/single/checkpoint")), + daHeight: atomic.Uint64{}, // empty, set by executor or submitter } - // will be overridden by the executor or submitter (at genesis) - // during genesis time, the sequencer will fetch unnecessary heights from DA genesis - // this is kept on purpose as some DAs (like local-da), do start at genesis. - s.SetDAHeight(genesis.DAStartHeight) loadCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() diff --git a/test/mocks/da.go b/test/mocks/da.go index c6df9b1c41..0b5c71a49c 100644 --- a/test/mocks/da.go +++ b/test/mocks/da.go @@ -431,3 +431,184 @@ func (_c *MockClient_Submit_Call) RunAndReturn(run func(ctx context.Context, dat _c.Call.Return(run) return _c } + +// NewMockVerifier creates a new instance of MockVerifier. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockVerifier(t interface { + mock.TestingT + Cleanup(func()) +}) *MockVerifier { + mock := &MockVerifier{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} + +// MockVerifier is an autogenerated mock type for the Verifier type +type MockVerifier struct { + mock.Mock +} + +type MockVerifier_Expecter struct { + mock *mock.Mock +} + +func (_m *MockVerifier) EXPECT() *MockVerifier_Expecter { + return &MockVerifier_Expecter{mock: &_m.Mock} +} + +// GetProofs provides a mock function for the type MockVerifier +func (_mock *MockVerifier) GetProofs(ctx context.Context, ids []da.ID, namespace []byte) ([]da.Proof, error) { + ret := _mock.Called(ctx, ids, namespace) + + if len(ret) == 0 { + panic("no return value specified for GetProofs") + } + + var r0 []da.Proof + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, []da.ID, []byte) ([]da.Proof, error)); ok { + return returnFunc(ctx, ids, namespace) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, []da.ID, []byte) []da.Proof); ok { + r0 = returnFunc(ctx, ids, namespace) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]da.Proof) + } + } + if returnFunc, ok := ret.Get(1).(func(context.Context, []da.ID, []byte) error); ok { + r1 = returnFunc(ctx, ids, namespace) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockVerifier_GetProofs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetProofs' +type MockVerifier_GetProofs_Call struct { + *mock.Call +} + +// GetProofs is a helper method to define mock.On call +// - ctx context.Context +// - ids []da.ID +// - namespace []byte +func (_e *MockVerifier_Expecter) GetProofs(ctx interface{}, ids interface{}, namespace interface{}) *MockVerifier_GetProofs_Call { + return &MockVerifier_GetProofs_Call{Call: _e.mock.On("GetProofs", ctx, ids, namespace)} +} + +func (_c *MockVerifier_GetProofs_Call) Run(run func(ctx context.Context, ids []da.ID, namespace []byte)) *MockVerifier_GetProofs_Call { + 
_c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 []da.ID + if args[1] != nil { + arg1 = args[1].([]da.ID) + } + var arg2 []byte + if args[2] != nil { + arg2 = args[2].([]byte) + } + run( + arg0, + arg1, + arg2, + ) + }) + return _c +} + +func (_c *MockVerifier_GetProofs_Call) Return(vs []da.Proof, err error) *MockVerifier_GetProofs_Call { + _c.Call.Return(vs, err) + return _c +} + +func (_c *MockVerifier_GetProofs_Call) RunAndReturn(run func(ctx context.Context, ids []da.ID, namespace []byte) ([]da.Proof, error)) *MockVerifier_GetProofs_Call { + _c.Call.Return(run) + return _c +} + +// Validate provides a mock function for the type MockVerifier +func (_mock *MockVerifier) Validate(ctx context.Context, ids []da.ID, proofs []da.Proof, namespace []byte) ([]bool, error) { + ret := _mock.Called(ctx, ids, proofs, namespace) + + if len(ret) == 0 { + panic("no return value specified for Validate") + } + + var r0 []bool + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, []da.ID, []da.Proof, []byte) ([]bool, error)); ok { + return returnFunc(ctx, ids, proofs, namespace) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, []da.ID, []da.Proof, []byte) []bool); ok { + r0 = returnFunc(ctx, ids, proofs, namespace) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]bool) + } + } + if returnFunc, ok := ret.Get(1).(func(context.Context, []da.ID, []da.Proof, []byte) error); ok { + r1 = returnFunc(ctx, ids, proofs, namespace) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockVerifier_Validate_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Validate' +type MockVerifier_Validate_Call struct { + *mock.Call +} + +// Validate is a helper method to define mock.On call +// - ctx context.Context +// - ids []da.ID +// - proofs []da.Proof +// - namespace []byte +func (_e *MockVerifier_Expecter) Validate(ctx interface{}, ids interface{}, proofs interface{}, namespace interface{}) *MockVerifier_Validate_Call { + return &MockVerifier_Validate_Call{Call: _e.mock.On("Validate", ctx, ids, proofs, namespace)} +} + +func (_c *MockVerifier_Validate_Call) Run(run func(ctx context.Context, ids []da.ID, proofs []da.Proof, namespace []byte)) *MockVerifier_Validate_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 []da.ID + if args[1] != nil { + arg1 = args[1].([]da.ID) + } + var arg2 []da.Proof + if args[2] != nil { + arg2 = args[2].([]da.Proof) + } + var arg3 []byte + if args[3] != nil { + arg3 = args[3].([]byte) + } + run( + arg0, + arg1, + arg2, + arg3, + ) + }) + return _c +} + +func (_c *MockVerifier_Validate_Call) Return(bools []bool, err error) *MockVerifier_Validate_Call { + _c.Call.Return(bools, err) + return _c +} + +func (_c *MockVerifier_Validate_Call) RunAndReturn(run func(ctx context.Context, ids []da.ID, proofs []da.Proof, namespace []byte) ([]bool, error)) *MockVerifier_Validate_Call { + _c.Call.Return(run) + return _c +} diff --git a/test/mocks/da_verifier.go b/test/mocks/da_verifier.go deleted file mode 100644 index fe64098bec..0000000000 --- a/test/mocks/da_verifier.go +++ /dev/null @@ -1,193 +0,0 @@ -// Code generated by mockery; DO NOT EDIT. 
-// github.com/vektra/mockery -// template: testify - -package mocks - -import ( - "context" - - "github.com/evstack/ev-node/pkg/da/types" - mock "github.com/stretchr/testify/mock" -) - -// NewMockVerifier creates a new instance of MockVerifier. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewMockVerifier(t interface { - mock.TestingT - Cleanup(func()) -}) *MockVerifier { - mock := &MockVerifier{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} - -// MockVerifier is an autogenerated mock type for the Verifier type -type MockVerifier struct { - mock.Mock -} - -type MockVerifier_Expecter struct { - mock *mock.Mock -} - -func (_m *MockVerifier) EXPECT() *MockVerifier_Expecter { - return &MockVerifier_Expecter{mock: &_m.Mock} -} - -// GetProofs provides a mock function for the type MockVerifier -func (_mock *MockVerifier) GetProofs(ctx context.Context, ids []da.ID, namespace []byte) ([]da.Proof, error) { - ret := _mock.Called(ctx, ids, namespace) - - if len(ret) == 0 { - panic("no return value specified for GetProofs") - } - - var r0 []da.Proof - var r1 error - if returnFunc, ok := ret.Get(0).(func(context.Context, []da.ID, []byte) ([]da.Proof, error)); ok { - return returnFunc(ctx, ids, namespace) - } - if returnFunc, ok := ret.Get(0).(func(context.Context, []da.ID, []byte) []da.Proof); ok { - r0 = returnFunc(ctx, ids, namespace) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]da.Proof) - } - } - if returnFunc, ok := ret.Get(1).(func(context.Context, []da.ID, []byte) error); ok { - r1 = returnFunc(ctx, ids, namespace) - } else { - r1 = ret.Error(1) - } - return r0, r1 -} - -// MockVerifier_GetProofs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetProofs' -type MockVerifier_GetProofs_Call struct { - *mock.Call -} - -// GetProofs is a helper method to define mock.On call -// - ctx context.Context -// - ids []da.ID -// - namespace []byte -func (_e *MockVerifier_Expecter) GetProofs(ctx interface{}, ids interface{}, namespace interface{}) *MockVerifier_GetProofs_Call { - return &MockVerifier_GetProofs_Call{Call: _e.mock.On("GetProofs", ctx, ids, namespace)} -} - -func (_c *MockVerifier_GetProofs_Call) Run(run func(ctx context.Context, ids []da.ID, namespace []byte)) *MockVerifier_GetProofs_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - var arg1 []da.ID - if args[1] != nil { - arg1 = args[1].([]da.ID) - } - var arg2 []byte - if args[2] != nil { - arg2 = args[2].([]byte) - } - run( - arg0, - arg1, - arg2, - ) - }) - return _c -} - -func (_c *MockVerifier_GetProofs_Call) Return(vs []da.Proof, err error) *MockVerifier_GetProofs_Call { - _c.Call.Return(vs, err) - return _c -} - -func (_c *MockVerifier_GetProofs_Call) RunAndReturn(run func(ctx context.Context, ids []da.ID, namespace []byte) ([]da.Proof, error)) *MockVerifier_GetProofs_Call { - _c.Call.Return(run) - return _c -} - -// Validate provides a mock function for the type MockVerifier -func (_mock *MockVerifier) Validate(ctx context.Context, ids []da.ID, proofs []da.Proof, namespace []byte) ([]bool, error) { - ret := _mock.Called(ctx, ids, proofs, namespace) - - if len(ret) == 0 { - panic("no return value specified for Validate") - } - - var r0 []bool - var r1 error - if returnFunc, ok := ret.Get(0).(func(context.Context, []da.ID, 
[]da.Proof, []byte) ([]bool, error)); ok { - return returnFunc(ctx, ids, proofs, namespace) - } - if returnFunc, ok := ret.Get(0).(func(context.Context, []da.ID, []da.Proof, []byte) []bool); ok { - r0 = returnFunc(ctx, ids, proofs, namespace) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]bool) - } - } - if returnFunc, ok := ret.Get(1).(func(context.Context, []da.ID, []da.Proof, []byte) error); ok { - r1 = returnFunc(ctx, ids, proofs, namespace) - } else { - r1 = ret.Error(1) - } - return r0, r1 -} - -// MockVerifier_Validate_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Validate' -type MockVerifier_Validate_Call struct { - *mock.Call -} - -// Validate is a helper method to define mock.On call -// - ctx context.Context -// - ids []da.ID -// - proofs []da.Proof -// - namespace []byte -func (_e *MockVerifier_Expecter) Validate(ctx interface{}, ids interface{}, proofs interface{}, namespace interface{}) *MockVerifier_Validate_Call { - return &MockVerifier_Validate_Call{Call: _e.mock.On("Validate", ctx, ids, proofs, namespace)} -} - -func (_c *MockVerifier_Validate_Call) Run(run func(ctx context.Context, ids []da.ID, proofs []da.Proof, namespace []byte)) *MockVerifier_Validate_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - var arg1 []da.ID - if args[1] != nil { - arg1 = args[1].([]da.ID) - } - var arg2 []da.Proof - if args[2] != nil { - arg2 = args[2].([]da.Proof) - } - var arg3 []byte - if args[3] != nil { - arg3 = args[3].([]byte) - } - run( - arg0, - arg1, - arg2, - arg3, - ) - }) - return _c -} - -func (_c *MockVerifier_Validate_Call) Return(bools []bool, err error) *MockVerifier_Validate_Call { - _c.Call.Return(bools, err) - return _c -} - -func (_c *MockVerifier_Validate_Call) RunAndReturn(run func(ctx context.Context, ids []da.ID, proofs []da.Proof, namespace []byte) ([]bool, error)) *MockVerifier_Validate_Call { - _c.Call.Return(run) - return _c -} From be30808a3508caaf08fe2049819f1e633aa6d9ec Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 17 Dec 2025 17:10:26 +0100 Subject: [PATCH 03/14] updates --- apps/evm/cmd/run.go | 2 +- apps/grpc/cmd/run.go | 2 +- apps/testapp/cmd/run.go | 2 +- .../internal/da/forced_inclusion_retriever.go | 33 +++++++++++--- .../da/forced_inclusion_retriever_test.go | 43 +++++++++++++++---- block/internal/syncing/syncer.go | 2 +- .../syncing/syncer_forced_inclusion_test.go | 16 +++---- block/public.go | 6 ++- node/full.go | 18 +++----- pkg/store/kv.go | 18 +++++++- sequencers/single/queue.go | 8 +--- sequencers/single/queue_test.go | 7 +-- 12 files changed, 105 insertions(+), 52 deletions(-) diff --git a/apps/evm/cmd/run.go b/apps/evm/cmd/run.go index cb1431f133..e2175345e9 100644 --- a/apps/evm/cmd/run.go +++ b/apps/evm/cmd/run.go @@ -161,7 +161,7 @@ func createSequencer( genesis genesis.Genesis, daClient block.FullDAClient, ) (coresequencer.Sequencer, error) { - fiRetriever := block.NewForcedInclusionRetriever(daClient, genesis, logger) + fiRetriever := block.NewForcedInclusionRetriever(daClient, genesis, datastore, logger) if nodeConfig.Node.BasedSequencer { // Based sequencer mode - fetch transactions only from DA diff --git a/apps/grpc/cmd/run.go b/apps/grpc/cmd/run.go index 6198fb7727..622b0813cc 100644 --- a/apps/grpc/cmd/run.go +++ b/apps/grpc/cmd/run.go @@ -120,7 +120,7 @@ func createSequencer( } daClient := block.NewDAClient(blobClient, nodeConfig, logger) - fiRetriever := 
block.NewForcedInclusionRetriever(daClient, genesis, logger) + fiRetriever := block.NewForcedInclusionRetriever(daClient, genesis, datastore, logger) if nodeConfig.Node.BasedSequencer { // Based sequencer mode - fetch transactions only from DA diff --git a/apps/testapp/cmd/run.go b/apps/testapp/cmd/run.go index 78c8bdaeaa..5df52be3ed 100644 --- a/apps/testapp/cmd/run.go +++ b/apps/testapp/cmd/run.go @@ -121,7 +121,7 @@ func createSequencer( } daClient := block.NewDAClient(blobClient, nodeConfig, logger) - fiRetriever := block.NewForcedInclusionRetriever(daClient, genesis, logger) + fiRetriever := block.NewForcedInclusionRetriever(daClient, genesis, datastore, logger) if nodeConfig.Node.BasedSequencer { // Based sequencer mode - fetch transactions only from DA diff --git a/block/internal/da/forced_inclusion_retriever.go b/block/internal/da/forced_inclusion_retriever.go index 2ececf6af1..d99bdb400a 100644 --- a/block/internal/da/forced_inclusion_retriever.go +++ b/block/internal/da/forced_inclusion_retriever.go @@ -2,6 +2,7 @@ package da import ( "context" + "encoding/binary" "errors" "fmt" "time" @@ -10,6 +11,7 @@ import ( datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" ) @@ -20,6 +22,7 @@ var ErrForceInclusionNotConfigured = errors.New("forced inclusion namespace not type ForcedInclusionRetriever struct { client Client logger zerolog.Logger + store store.Store daEpochSize uint64 daStartHeight uint64 } @@ -36,24 +39,30 @@ type ForcedInclusionEvent struct { func NewForcedInclusionRetriever( client Client, genesis genesis.Genesis, + store store.Store, logger zerolog.Logger, ) *ForcedInclusionRetriever { - return &ForcedInclusionRetriever{ - client: client, - daStartHeight: genesis.DAStartHeight, // TODO: this should be genesis da start height (for full nodes) or store metadata da start height (for sequencers) - logger: logger.With().Str("component", "forced_inclusion_retriever").Logger(), - daEpochSize: genesis.DAEpochForcedInclusion, + r := &ForcedInclusionRetriever{ + client: client, + store: store, + logger: logger.With().Str("component", "forced_inclusion_retriever").Logger(), + daEpochSize: genesis.DAEpochForcedInclusion, } + + // check for inclusion da height on store (sequencer) or genesis da height (full nodes) + r.daStartHeight = max(r.getInitialDAStartHeight(context.Background()), genesis.DAStartHeight) + return r } // RetrieveForcedIncludedTxs retrieves forced inclusion transactions at the given DA height. // It respects epoch boundaries and only fetches at epoch start. func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { - if !r.client.HasForcedInclusionNamespace() { + // when daStartHeight is not set or no namespace is configured, we retrieve nothing. + if !r.client.HasForcedInclusionNamespace() || r.daStartHeight == 0 { return nil, ErrForceInclusionNotConfigured } - epochStart, epochEnd, currentEpochNumber := types.CalculateEpochBoundaries(daHeight, r.daStartHeight /* this should be fetch from store once */, r.daEpochSize) + epochStart, epochEnd, currentEpochNumber := types.CalculateEpochBoundaries(daHeight, r.daStartHeight, r.daEpochSize) if daHeight != epochEnd { r.logger.Debug(). @@ -171,3 +180,13 @@ func (r *ForcedInclusionRetriever) processForcedInclusionBlobs( return nil } + +// getInitialDAStartHeight retrieves the DA height of the first included chain height from store. 
+func (r *ForcedInclusionRetriever) getInitialDAStartHeight(ctx context.Context) uint64 { + daIncludedHeightBytes, err := r.store.GetMetadata(ctx, store.GenesisDAHeightKey) + if err != nil || len(daIncludedHeightBytes) != 8 { + return 0 + } + + return binary.LittleEndian.Uint64(daIncludedHeightBytes) +} diff --git a/block/internal/da/forced_inclusion_retriever_test.go b/block/internal/da/forced_inclusion_retriever_test.go index f4ab3544f2..55c9a8a4a5 100644 --- a/block/internal/da/forced_inclusion_retriever_test.go +++ b/block/internal/da/forced_inclusion_retriever_test.go @@ -5,12 +5,15 @@ import ( "testing" "time" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" "gotest.tools/v3/assert" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/test/mocks" ) @@ -24,7 +27,10 @@ func TestNewForcedInclusionRetriever(t *testing.T) { DAEpochForcedInclusion: 10, } - retriever := NewForcedInclusionRetriever(client, gen, zerolog.Nop()) + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + retriever := NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) assert.Assert(t, retriever != nil) } @@ -37,7 +43,10 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NoNamespace(t *testi DAEpochForcedInclusion: 10, } - retriever := NewForcedInclusionRetriever(client, gen, zerolog.Nop()) + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + retriever := NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) ctx := context.Background() _, err := retriever.RetrieveForcedIncludedTxs(ctx, 100) @@ -56,7 +65,10 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NotAtEpochStart(t *t DAEpochForcedInclusion: 10, } - retriever := NewForcedInclusionRetriever(client, gen, zerolog.Nop()) + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + retriever := NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) ctx := context.Background() // Height 105 is not an epoch start (100, 110, 120, etc. 
are epoch starts) @@ -89,7 +101,10 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EpochStartSuccess(t DAEpochForcedInclusion: 1, // Single height epoch } - retriever := NewForcedInclusionRetriever(client, gen, zerolog.Nop()) + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + retriever := NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) ctx := context.Background() // Height 100 is an epoch start @@ -116,7 +131,10 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EpochStartNotAvailab DAEpochForcedInclusion: 10, } - retriever := NewForcedInclusionRetriever(client, gen, zerolog.Nop()) + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + retriever := NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) ctx := context.Background() // Epoch boundaries: [100, 109] - retrieval happens at epoch end (109) @@ -139,7 +157,10 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NoBlobsAtHeight(t *t DAEpochForcedInclusion: 1, // Single height epoch } - retriever := NewForcedInclusionRetriever(client, gen, zerolog.Nop()) + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + retriever := NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) ctx := context.Background() event, err := retriever.RetrieveForcedIncludedTxs(ctx, 100) @@ -177,7 +198,10 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_MultiHeightEpoch(t * DAEpochForcedInclusion: 3, // Epoch: 100-102 } - retriever := NewForcedInclusionRetriever(client, gen, zerolog.Nop()) + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + retriever := NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) ctx := context.Background() // Epoch boundaries: [100, 102] - retrieval happens at epoch end (102) @@ -201,7 +225,10 @@ func TestForcedInclusionRetriever_processForcedInclusionBlobs(t *testing.T) { DAEpochForcedInclusion: 10, } - retriever := NewForcedInclusionRetriever(client, gen, zerolog.Nop()) + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + retriever := NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) tests := []struct { name string diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 812b40b206..4d3565cbc1 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -187,7 +187,7 @@ func (s *Syncer) Start(ctx context.Context) error { // Initialize handlers s.daRetriever = NewDARetriever(s.daClient, s.cache, s.genesis, s.logger) - s.fiRetriever = da.NewForcedInclusionRetriever(s.daClient, s.genesis, s.logger) + s.fiRetriever = da.NewForcedInclusionRetriever(s.daClient, s.genesis, s.store, s.logger) s.p2pHandler = NewP2PHandler(s.headerStore.Store(), s.dataStore.Store(), s.cache, s.genesis, s.logger) if currentHeight, err := s.store.Height(s.ctx); err != nil { s.logger.Error().Err(err).Msg("failed to set initial processed height for p2p handler") diff --git a/block/internal/syncing/syncer_forced_inclusion_test.go b/block/internal/syncing/syncer_forced_inclusion_test.go index 111eb9e5ad..66546eae00 100644 --- a/block/internal/syncing/syncer_forced_inclusion_test.go +++ b/block/internal/syncing/syncer_forced_inclusion_test.go @@ -370,7 +370,7 @@ func TestVerifyForcedInclusionTxs_AllTransactionsIncluded(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() 
client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, gen, zerolog.Nop()) + fiRetriever := da.NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) s := NewSyncer( st, @@ -443,7 +443,7 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, gen, zerolog.Nop()) + fiRetriever := da.NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) s := NewSyncer( st, @@ -546,7 +546,7 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, gen, zerolog.Nop()) + fiRetriever := da.NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) s := NewSyncer( st, @@ -653,7 +653,7 @@ func TestVerifyForcedInclusionTxs_NoForcedTransactions(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, gen, zerolog.Nop()) + fiRetriever := da.NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) s := NewSyncer( st, @@ -719,7 +719,7 @@ func TestVerifyForcedInclusionTxs_NamespaceNotConfigured(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() client.On("HasForcedInclusionNamespace").Return(false).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, gen, zerolog.Nop()) + fiRetriever := da.NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) s := NewSyncer( st, @@ -784,7 +784,7 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, gen, zerolog.Nop()) + fiRetriever := da.NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) s := NewSyncer( st, @@ -907,7 +907,7 @@ func TestVerifyForcedInclusionTxs_MaliciousAfterEpochEnd(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, gen, zerolog.Nop()) + fiRetriever := da.NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) s := NewSyncer( st, @@ -996,7 +996,7 @@ func TestVerifyForcedInclusionTxs_SmoothingExceedsEpoch(t *testing.T) { client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, gen, zerolog.Nop()) + fiRetriever := da.NewForcedInclusionRetriever(client, gen, 
st, zerolog.Nop()) s := NewSyncer( st, diff --git a/block/public.go b/block/public.go index c8765d29ec..e42cefc641 100644 --- a/block/public.go +++ b/block/public.go @@ -8,6 +8,8 @@ import ( "github.com/evstack/ev-node/pkg/config" blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/store" + "github.com/ipfs/go-datastore" "github.com/rs/zerolog" ) @@ -75,7 +77,9 @@ type ForcedInclusionRetriever interface { func NewForcedInclusionRetriever( client DAClient, genesis genesis.Genesis, + ds datastore.Batching, logger zerolog.Logger, ) ForcedInclusionRetriever { - return da.NewForcedInclusionRetriever(client, genesis, logger) + mainKV := store.NewEvNodeKVStore(ds) + return da.NewForcedInclusionRetriever(client, genesis, store.New(mainKV), logger) } diff --git a/node/full.go b/node/full.go index 7af73ad474..c6086bb5da 100644 --- a/node/full.go +++ b/node/full.go @@ -11,7 +11,6 @@ import ( "time" ds "github.com/ipfs/go-datastore" - ktds "github.com/ipfs/go-datastore/keytransform" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog" @@ -30,9 +29,6 @@ import ( evsync "github.com/evstack/ev-node/pkg/sync" ) -// prefixes used in KV store to separate rollkit data from execution environment data (if the same data base is reused) -var EvPrefix = "0" - const ( // genesisChunkSize is the maximum size, in bytes, of each // chunk in the genesis structure for the chunked API @@ -83,8 +79,8 @@ func newFullNode( blockMetrics, _ := metricsProvider(genesis.ChainID) - mainKV := newPrefixKV(database, EvPrefix) - rktStore := store.New(mainKV) + mainKV := store.NewEvNodeKVStore(database) + evstore := store.New(mainKV) headerSyncService, err := initHeaderSyncService(mainKV, nodeConfig, genesis, p2pClient, logger) if err != nil { @@ -101,7 +97,7 @@ func newFullNode( blockComponents, err = block.NewAggregatorComponents( nodeConfig, genesis, - rktStore, + evstore, exec, sequencer, daClient, @@ -116,7 +112,7 @@ func newFullNode( blockComponents, err = block.NewSyncComponents( nodeConfig, genesis, - rktStore, + evstore, exec, daClient, headerSyncService, @@ -136,7 +132,7 @@ func newFullNode( p2pClient: p2pClient, blockComponents: blockComponents, daClient: daClient, - Store: rktStore, + Store: evstore, hSyncService: headerSyncService, dSyncService: dataSyncService, } @@ -471,7 +467,3 @@ func (n *FullNode) GetGenesisChunks() ([]string, error) { func (n *FullNode) IsRunning() bool { return n.blockComponents != nil } - -func newPrefixKV(kvStore ds.Batching, prefix string) ds.Batching { - return ktds.Wrap(kvStore, ktds.PrefixTransform{Prefix: ds.NewKey(prefix)}) -} diff --git a/pkg/store/kv.go b/pkg/store/kv.go index e270962133..296796cfb8 100644 --- a/pkg/store/kv.go +++ b/pkg/store/kv.go @@ -7,18 +7,32 @@ import ( "strings" ds "github.com/ipfs/go-datastore" + ktds "github.com/ipfs/go-datastore/keytransform" dsq "github.com/ipfs/go-datastore/query" badger4 "github.com/ipfs/go-ds-badger4" ) +// EvPrefix is used in KV store to separate ev-node data from execution environment data (if the same data base is reused) +const EvPrefix = "0" + // NewDefaultKVStore creates instance of default key-value store. 
func NewDefaultKVStore(rootDir, dbPath, dbName string) (ds.Batching, error) { path := filepath.Join(rootify(rootDir, dbPath), dbName) return badger4.NewDatastore(path, nil) } -// PrefixEntries retrieves all entries in the datastore whose keys have the supplied prefix -func PrefixEntries(ctx context.Context, store ds.Datastore, prefix string) (dsq.Results, error) { +// NewPrefixKVStore creates a new key-value store with a prefix applied to all keys. +func NewPrefixKVStore(kvStore ds.Batching, prefix string) ds.Batching { + return ktds.Wrap(kvStore, ktds.PrefixTransform{Prefix: ds.NewKey(prefix)}) +} + +// NewEvNodeKVStore creates a new key-value store with EvPrefix prefix applied to all keys. +func NewEvNodeKVStore(kvStore ds.Batching) ds.Batching { + return NewPrefixKVStore(kvStore, EvPrefix) +} + +// GetPrefixEntries retrieves all entries in the datastore whose keys have the supplied prefix +func GetPrefixEntries(ctx context.Context, store ds.Datastore, prefix string) (dsq.Results, error) { results, err := store.Query(ctx, dsq.Query{Prefix: prefix}) if err != nil { return nil, err diff --git a/sequencers/single/queue.go b/sequencers/single/queue.go index 707fa7ad5f..ffea618fb5 100644 --- a/sequencers/single/queue.go +++ b/sequencers/single/queue.go @@ -8,11 +8,11 @@ import ( "sync" ds "github.com/ipfs/go-datastore" - ktds "github.com/ipfs/go-datastore/keytransform" "github.com/ipfs/go-datastore/query" "google.golang.org/protobuf/proto" coresequencer "github.com/evstack/ev-node/core/sequencer" + "github.com/evstack/ev-node/pkg/store" pb "github.com/evstack/ev-node/types/pb/evnode/v1" ) @@ -20,10 +20,6 @@ import ( // ErrQueueFull is returned when the batch queue has reached its maximum size var ErrQueueFull = errors.New("batch queue is full") -func newPrefixKV(kvStore ds.Batching, prefix string) ds.Batching { - return ktds.Wrap(kvStore, ktds.PrefixTransform{Prefix: ds.NewKey(prefix)}) -} - // BatchQueue implements a persistent queue for transaction batches type BatchQueue struct { queue []coresequencer.Batch @@ -40,7 +36,7 @@ func NewBatchQueue(db ds.Batching, prefix string, maxSize int) *BatchQueue { queue: make([]coresequencer.Batch, 0), head: 0, maxQueueSize: maxSize, - db: newPrefixKV(db, prefix), + db: store.NewPrefixKVStore(db, prefix), } } diff --git a/sequencers/single/queue_test.go b/sequencers/single/queue_test.go index 39ac725cd9..e79c28e08a 100644 --- a/sequencers/single/queue_test.go +++ b/sequencers/single/queue_test.go @@ -17,6 +17,7 @@ import ( "google.golang.org/protobuf/proto" coresequencer "github.com/evstack/ev-node/core/sequencer" + "github.com/evstack/ev-node/pkg/store" pb "github.com/evstack/ev-node/types/pb/evnode/v1" ) @@ -31,7 +32,7 @@ func createTestBatch(t *testing.T, txCount int) coresequencer.Batch { func setupTestQueue(t *testing.T) *BatchQueue { // Create an in-memory thread-safe datastore - memdb := newPrefixKV(ds.NewMapDatastore(), "single") + memdb := store.NewPrefixKVStore(ds.NewMapDatastore(), "single") return NewBatchQueue(memdb, "batching", 0) // 0 = unlimited for existing tests } @@ -406,7 +407,7 @@ func TestBatchQueue_QueueLimit(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { // Create in-memory datastore and queue with specified limit - memdb := newPrefixKV(ds.NewMapDatastore(), "single") + memdb := store.NewPrefixKVStore(ds.NewMapDatastore(), "single") bq := NewBatchQueue(memdb, "batching", tc.maxSize) ctx := context.Background() @@ -454,7 +455,7 @@ func TestBatchQueue_QueueLimit(t *testing.T) { func 
TestBatchQueue_QueueLimit_WithNext(t *testing.T) { // Test that removing batches with Next() allows adding more batches maxSize := 3 - memdb := newPrefixKV(ds.NewMapDatastore(), "single") + memdb := store.NewPrefixKVStore(ds.NewMapDatastore(), "single") bq := NewBatchQueue(memdb, "batching", maxSize) ctx := context.Background() From e80835736c1bffe601682f8f3a3bd255329b2dfb Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 18 Dec 2025 17:09:45 +0100 Subject: [PATCH 04/14] fix build --- apps/evm/cmd/rollback.go | 8 +------- apps/testapp/cmd/rollback.go | 8 +------- 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/apps/evm/cmd/rollback.go b/apps/evm/cmd/rollback.go index 552d296e8a..f28ebb8bd5 100644 --- a/apps/evm/cmd/rollback.go +++ b/apps/evm/cmd/rollback.go @@ -5,12 +5,9 @@ import ( "errors" "fmt" - ds "github.com/ipfs/go-datastore" - kt "github.com/ipfs/go-datastore/keytransform" "github.com/spf13/cobra" goheaderstore "github.com/celestiaorg/go-header/store" - "github.com/evstack/ev-node/node" rollcmd "github.com/evstack/ev-node/pkg/cmd" "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" @@ -50,10 +47,7 @@ func NewRollbackCmd() *cobra.Command { }() // prefixed evolve db - evolveDB := kt.Wrap(rawEvolveDB, &kt.PrefixTransform{ - Prefix: ds.NewKey(node.EvPrefix), - }) - + evolveDB := store.NewEvNodeKVStore(rawEvolveDB) evolveStore := store.New(evolveDB) if height == 0 { currentHeight, err := evolveStore.Height(goCtx) diff --git a/apps/testapp/cmd/rollback.go b/apps/testapp/cmd/rollback.go index 5600eaeb90..dfea32176f 100644 --- a/apps/testapp/cmd/rollback.go +++ b/apps/testapp/cmd/rollback.go @@ -6,14 +6,11 @@ import ( "fmt" kvexecutor "github.com/evstack/ev-node/apps/testapp/kv" - "github.com/evstack/ev-node/node" rollcmd "github.com/evstack/ev-node/pkg/cmd" "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" goheaderstore "github.com/celestiaorg/go-header/store" - ds "github.com/ipfs/go-datastore" - kt "github.com/ipfs/go-datastore/keytransform" "github.com/spf13/cobra" ) @@ -51,10 +48,7 @@ func NewRollbackCmd() *cobra.Command { }() // prefixed evolve db - evolveDB := kt.Wrap(rawEvolveDB, &kt.PrefixTransform{ - Prefix: ds.NewKey(node.EvPrefix), - }) - + evolveDB := store.NewEvNodeKVStore(rawEvolveDB) evolveStore := store.New(evolveDB) if height == 0 { currentHeight, err := evolveStore.Height(goCtx) From fa3ab8d6e926f10532b13c01a7948a511e0cc08b Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 18 Dec 2025 17:47:21 +0100 Subject: [PATCH 05/14] updates --- .../internal/da/forced_inclusion_retriever.go | 4 +++ block/internal/executing/executor.go | 12 ++------- sequencers/based/sequencer.go | 8 +++--- sequencers/single/sequencer.go | 25 ++++++++++--------- 4 files changed, 23 insertions(+), 26 deletions(-) diff --git a/block/internal/da/forced_inclusion_retriever.go b/block/internal/da/forced_inclusion_retriever.go index d99bdb400a..3665dd5388 100644 --- a/block/internal/da/forced_inclusion_retriever.go +++ b/block/internal/da/forced_inclusion_retriever.go @@ -62,6 +62,10 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context return nil, ErrForceInclusionNotConfigured } + if daHeight < r.daStartHeight { + return nil, ErrForceInclusionNotConfigured + } + epochStart, epochEnd, currentEpochNumber := types.CalculateEpochBoundaries(daHeight, r.daStartHeight, r.daEpochSize) if daHeight != epochEnd { diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 
a8352239f2..845e18727e 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -197,21 +197,13 @@ func (e *Executor) initializeState() error { LastBlockHeight: e.genesis.InitialHeight - 1, LastBlockTime: e.genesis.StartTime, AppHash: stateRoot, - // DA start height is usually 0 at InitChain unless it is a re-genesis. - // The sequencer does not know at which DA block its first block will be included. + // DA start height is usually 0 at InitChain unless it is a re-genesis or a based sequencer. DAHeight: e.genesis.DAStartHeight, } } e.setLastState(state) - // Defer setting sequencer DA height at genesis. At chain genesis there are no - // included DA blocks yet, so the sequencer shouldn't be updated with the - // state's DA height until we've produced/observed at least the first included - // block. Only set the sequencer DA height when the chain has progressed past - // the initial genesis height. - if state.LastBlockHeight >= e.genesis.InitialHeight { - e.sequencer.SetDAHeight(state.DAHeight) - } + e.sequencer.SetDAHeight(state.DAHeight) // Initialize store height using batch for atomicity batch, err := e.store.NewBatch(e.ctx) diff --git a/sequencers/based/sequencer.go b/sequencers/based/sequencer.go index 911973844d..e72e297a63 100644 --- a/sequencers/based/sequencer.go +++ b/sequencers/based/sequencer.go @@ -53,8 +53,8 @@ func NewBasedSequencer( fiRetriever: fiRetriever, logger: logger.With().Str("component", "based_sequencer").Logger(), checkpointStore: seqcommon.NewCheckpointStore(db, ds.NewKey("/based/checkpoint")), - daHeight: atomic.Uint64{}, // empty, set by executor or submitter } + bs.SetDAHeight(genesis.DAStartHeight) // based sequencers need community approval of da start height given no submission are done // Load checkpoint from DB, or initialize if none exists loadCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -96,11 +96,11 @@ func (s *BasedSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.S // GetNextBatch retrieves the next batch of transactions from the DA layer using the checkpoint // It treats DA as a queue and only persists where it is in processing func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { - // If we have no cached transactions or we've consumed all from the current DA block, - // fetch the next DA epoch daHeight := s.GetDAHeight() - if len(s.currentBatchTxs) == 0 || s.checkpoint.TxIndex >= uint64(len(s.currentBatchTxs)) { + // If we have no cached transactions or we've consumed all from the current DA block, + // fetch the next DA epoch + if daHeight > 0 && (len(s.currentBatchTxs) == 0 || s.checkpoint.TxIndex >= uint64(len(s.currentBatchTxs))) { daEndTime, daEndHeight, err := s.fetchNextDAEpoch(ctx, req.MaxBytes) if err != nil { return nil, err diff --git a/sequencers/single/sequencer.go b/sequencers/single/sequencer.go index 5e2efec636..a2ab334376 100644 --- a/sequencers/single/sequencer.go +++ b/sequencers/single/sequencer.go @@ -86,16 +86,8 @@ func NewSequencer( // Load checkpoint from DB, or initialize if none exists checkpoint, err := s.checkpointStore.Load(loadCtx) - if err != nil { - if errors.Is(err, seqcommon.ErrCheckpointNotFound) { - // No checkpoint exists, initialize with current DA height - s.checkpoint = &seqcommon.Checkpoint{ - DAHeight: s.GetDAHeight(), - TxIndex: 0, - } - } else { - return nil, fmt.Errorf("failed to load checkpoint from DB: %w", err) - } + if err 
!= nil && errors.Is(err, seqcommon.ErrCheckpointNotFound) { + return nil, fmt.Errorf("failed to load checkpoint from DB: %w", err) } else { s.checkpoint = checkpoint // If we had a non-zero tx index, we're resuming from a crash mid-block @@ -147,10 +139,19 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB return nil, ErrInvalidId } + daHeight := c.GetDAHeight() + + // checkpoint init path, only hit when sequencer is bootstrapping + if daHeight > 0 && c.checkpoint == nil { + c.checkpoint = &seqcommon.Checkpoint{ + DAHeight: daHeight, + TxIndex: 0, + } + } + // If we have no cached transactions or we've consumed all from the current cache, // fetch the next DA epoch - daHeight := c.GetDAHeight() - if len(c.cachedForcedInclusionTxs) == 0 || c.checkpoint.TxIndex >= uint64(len(c.cachedForcedInclusionTxs)) { + if daHeight > 0 && (len(c.cachedForcedInclusionTxs) == 0 || c.checkpoint.TxIndex >= uint64(len(c.cachedForcedInclusionTxs))) { daEndHeight, err := c.fetchNextDAEpoch(ctx, req.MaxBytes) if err != nil { return nil, err From f930626a5ff08a90fad174d185b413e107a22837 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 18 Dec 2025 17:54:35 +0100 Subject: [PATCH 06/14] footgun for based sequencer --- pkg/cmd/run_node.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index 40e8b1fc13..8192a46804 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -135,6 +135,11 @@ func StartNode( return fmt.Errorf("unknown signer type: %s", nodeConfig.Signer.SignerType) } + // sanity check for based sequencer + if nodeConfig.Node.BasedSequencer && genesis.DAStartHeight == 0 { + return fmt.Errorf("A based sequencer requires DAStartHeight to be set in genesis. This value should be identical for all nodes of the chain.") + } + metrics := node.DefaultMetricsProvider(nodeConfig.Instrumentation) // Create and start the node From 9f8790264c7ad77c1d6cd9a425210c0462dc8bdf Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 18 Dec 2025 21:52:40 +0100 Subject: [PATCH 07/14] fixes --- pkg/cmd/run_node.go | 2 +- sequencers/single/sequencer.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index 8192a46804..adcd7d6b34 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -137,7 +137,7 @@ func StartNode( // sanity check for based sequencer if nodeConfig.Node.BasedSequencer && genesis.DAStartHeight == 0 { - return fmt.Errorf("A based sequencer requires DAStartHeight to be set in genesis. This value should be identical for all nodes of the chain.") + return fmt.Errorf("A based sequencer requires DAStartHeight to be set in genesis. 
This value should be identical for all nodes of the chain") } metrics := node.DefaultMetricsProvider(nodeConfig.Instrumentation) diff --git a/sequencers/single/sequencer.go b/sequencers/single/sequencer.go index a2ab334376..41dc85f992 100644 --- a/sequencers/single/sequencer.go +++ b/sequencers/single/sequencer.go @@ -86,7 +86,7 @@ func NewSequencer( // Load checkpoint from DB, or initialize if none exists checkpoint, err := s.checkpointStore.Load(loadCtx) - if err != nil && errors.Is(err, seqcommon.ErrCheckpointNotFound) { + if err != nil && !errors.Is(err, seqcommon.ErrCheckpointNotFound) { return nil, fmt.Errorf("failed to load checkpoint from DB: %w", err) } else { s.checkpoint = checkpoint From 9a09230e3acf3febcae0383048c5f7184813fb13 Mon Sep 17 00:00:00 2001 From: julienrbrt Date: Fri, 19 Dec 2025 07:36:54 +0100 Subject: [PATCH 08/14] Update pkg/cmd/run_node.go Co-authored-by: Marko --- pkg/cmd/run_node.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index adcd7d6b34..99777d3dba 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -137,7 +137,7 @@ func StartNode( // sanity check for based sequencer if nodeConfig.Node.BasedSequencer && genesis.DAStartHeight == 0 { - return fmt.Errorf("A based sequencer requires DAStartHeight to be set in genesis. This value should be identical for all nodes of the chain") + return fmt.Errorf("based sequencing requires DAStartHeight to be set in genesis. This value should be identical for all nodes of the chain") } metrics := node.DefaultMetricsProvider(nodeConfig.Instrumentation) From 168d73b0c2b3fe0dc520aa36b935f54d79b0b16b Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 19 Dec 2025 10:29:08 +0100 Subject: [PATCH 09/14] fixes --- sequencers/based/sequencer.go | 2 +- sequencers/single/sequencer.go | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/sequencers/based/sequencer.go b/sequencers/based/sequencer.go index e72e297a63..acc0b634eb 100644 --- a/sequencers/based/sequencer.go +++ b/sequencers/based/sequencer.go @@ -54,7 +54,7 @@ func NewBasedSequencer( logger: logger.With().Str("component", "based_sequencer").Logger(), checkpointStore: seqcommon.NewCheckpointStore(db, ds.NewKey("/based/checkpoint")), } - bs.SetDAHeight(genesis.DAStartHeight) // based sequencers need community approval of da start height given no submission are done + bs.SetDAHeight(genesis.DAStartHeight) // based sequencers need community consensus about the da start height given no submission are done // Load checkpoint from DB, or initialize if none exists loadCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) diff --git a/sequencers/single/sequencer.go b/sequencers/single/sequencer.go index 41dc85f992..5f613a7478 100644 --- a/sequencers/single/sequencer.go +++ b/sequencers/single/sequencer.go @@ -73,8 +73,8 @@ func NewSequencer( proposer: proposer, fiRetriever: fiRetriever, checkpointStore: seqcommon.NewCheckpointStore(db, ds.NewKey("/single/checkpoint")), - daHeight: atomic.Uint64{}, // empty, set by executor or submitter } + s.SetDAHeight(genesis.DAStartHeight) // default value, will be overriden by executor or submitter loadCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -86,8 +86,16 @@ func NewSequencer( // Load checkpoint from DB, or initialize if none exists checkpoint, err := s.checkpointStore.Load(loadCtx) - if err != nil && !errors.Is(err, seqcommon.ErrCheckpointNotFound) { - return nil, 
fmt.Errorf("failed to load checkpoint from DB: %w", err) + if err != nil { + if errors.Is(err, seqcommon.ErrCheckpointNotFound) { + // No checkpoint exists, initialize with current DA height + s.checkpoint = &seqcommon.Checkpoint{ + DAHeight: s.GetDAHeight(), + TxIndex: 0, + } + } else { + return nil, fmt.Errorf("failed to load checkpoint from DB: %w", err) + } } else { s.checkpoint = checkpoint // If we had a non-zero tx index, we're resuming from a crash mid-block @@ -142,7 +150,7 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB daHeight := c.GetDAHeight() // checkpoint init path, only hit when sequencer is bootstrapping - if daHeight > 0 && c.checkpoint == nil { + if c.checkpoint.DAHeight == 0 { c.checkpoint = &seqcommon.Checkpoint{ DAHeight: daHeight, TxIndex: 0, From a976da94ad419821e1668a6792a03afeb33446b1 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 19 Dec 2025 10:32:11 +0100 Subject: [PATCH 10/14] lint --- sequencers/single/sequencer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sequencers/single/sequencer.go b/sequencers/single/sequencer.go index 5f613a7478..db27cfe662 100644 --- a/sequencers/single/sequencer.go +++ b/sequencers/single/sequencer.go @@ -74,7 +74,7 @@ func NewSequencer( fiRetriever: fiRetriever, checkpointStore: seqcommon.NewCheckpointStore(db, ds.NewKey("/single/checkpoint")), } - s.SetDAHeight(genesis.DAStartHeight) // default value, will be overriden by executor or submitter + s.SetDAHeight(genesis.DAStartHeight) // default value, will be overridden by executor or submitter loadCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() From 1f8f9630b5d0b3db67acd7e014b8e362e3168fbd Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 19 Dec 2025 16:18:00 +0100 Subject: [PATCH 11/14] refactor fi retriever instantiation to get correct start height --- apps/evm/cmd/run.go | 11 +-- apps/grpc/cmd/run.go | 7 +- apps/testapp/cmd/run.go | 7 +- .../internal/da/forced_inclusion_retriever.go | 35 ++------ .../da/forced_inclusion_retriever_test.go | 43 ++-------- block/internal/syncing/syncer.go | 2 +- .../syncing/syncer_forced_inclusion_test.go | 16 ++-- block/public.go | 9 +-- sequencers/based/sequencer.go | 17 ++-- sequencers/based/sequencer_test.go | 6 +- sequencers/single/sequencer.go | 79 +++++++++++-------- sequencers/single/sequencer_test.go | 10 +-- 12 files changed, 91 insertions(+), 151 deletions(-) diff --git a/apps/evm/cmd/run.go b/apps/evm/cmd/run.go index e2175345e9..fd073c93ae 100644 --- a/apps/evm/cmd/run.go +++ b/apps/evm/cmd/run.go @@ -88,7 +88,7 @@ var RunCmd = &cobra.Command{ } // Create sequencer based on configuration - sequencer, err := createSequencer(context.Background(), logger, datastore, nodeConfig, genesis, daClient) + sequencer, err := createSequencer(logger, datastore, nodeConfig, genesis, daClient) if err != nil { return err } @@ -154,22 +154,20 @@ func init() { // If BasedSequencer is enabled, it creates a based sequencer that fetches transactions from DA. // Otherwise, it creates a single (traditional) sequencer. 
func createSequencer( - ctx context.Context, logger zerolog.Logger, datastore datastore.Batching, nodeConfig config.Config, genesis genesis.Genesis, daClient block.FullDAClient, ) (coresequencer.Sequencer, error) { - fiRetriever := block.NewForcedInclusionRetriever(daClient, genesis, datastore, logger) - if nodeConfig.Node.BasedSequencer { // Based sequencer mode - fetch transactions only from DA if !nodeConfig.Node.Aggregator { return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled") } - basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger) + fiRetriever := block.NewForcedInclusionRetriever(daClient, logger, genesis.DAStartHeight, genesis.DAEpochForcedInclusion) + basedSeq, err := based.NewBasedSequencer(fiRetriever, datastore, genesis, logger) if err != nil { return nil, fmt.Errorf("failed to create based sequencer: %w", err) } @@ -183,15 +181,12 @@ func createSequencer( } sequencer, err := single.NewSequencer( - ctx, logger, datastore, daClient, []byte(genesis.ChainID), nodeConfig.Node.BlockTime.Duration, - nodeConfig.Node.Aggregator, 1000, - fiRetriever, genesis, ) if err != nil { diff --git a/apps/grpc/cmd/run.go b/apps/grpc/cmd/run.go index 622b0813cc..51a1ad9d5d 100644 --- a/apps/grpc/cmd/run.go +++ b/apps/grpc/cmd/run.go @@ -120,7 +120,6 @@ func createSequencer( } daClient := block.NewDAClient(blobClient, nodeConfig, logger) - fiRetriever := block.NewForcedInclusionRetriever(daClient, genesis, datastore, logger) if nodeConfig.Node.BasedSequencer { // Based sequencer mode - fetch transactions only from DA @@ -128,7 +127,8 @@ func createSequencer( return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled") } - basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger) + fiRetriever := block.NewForcedInclusionRetriever(daClient, logger, genesis.DAStartHeight, genesis.DAEpochForcedInclusion) + basedSeq, err := based.NewBasedSequencer(fiRetriever, datastore, genesis, logger) if err != nil { return nil, fmt.Errorf("failed to create based sequencer: %w", err) } @@ -142,15 +142,12 @@ func createSequencer( } sequencer, err := single.NewSequencer( - ctx, logger, datastore, daClient, []byte(genesis.ChainID), nodeConfig.Node.BlockTime.Duration, - nodeConfig.Node.Aggregator, 1000, - fiRetriever, genesis, ) if err != nil { diff --git a/apps/testapp/cmd/run.go b/apps/testapp/cmd/run.go index 5df52be3ed..abe4923b7a 100644 --- a/apps/testapp/cmd/run.go +++ b/apps/testapp/cmd/run.go @@ -121,7 +121,6 @@ func createSequencer( } daClient := block.NewDAClient(blobClient, nodeConfig, logger) - fiRetriever := block.NewForcedInclusionRetriever(daClient, genesis, datastore, logger) if nodeConfig.Node.BasedSequencer { // Based sequencer mode - fetch transactions only from DA @@ -129,7 +128,8 @@ func createSequencer( return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled") } - basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger) + fiRetriever := block.NewForcedInclusionRetriever(daClient, logger, genesis.DAStartHeight, genesis.DAEpochForcedInclusion) + basedSeq, err := based.NewBasedSequencer(fiRetriever, datastore, genesis, logger) if err != nil { return nil, fmt.Errorf("failed to create based sequencer: %w", err) } @@ -143,15 +143,12 @@ func createSequencer( } sequencer, err := single.NewSequencer( - ctx, logger, datastore, daClient, []byte(genesis.ChainID), nodeConfig.Node.BlockTime.Duration, - nodeConfig.Node.Aggregator, 1000, - 
fiRetriever, genesis, ) if err != nil { diff --git a/block/internal/da/forced_inclusion_retriever.go b/block/internal/da/forced_inclusion_retriever.go index 3665dd5388..651c071a38 100644 --- a/block/internal/da/forced_inclusion_retriever.go +++ b/block/internal/da/forced_inclusion_retriever.go @@ -2,7 +2,6 @@ package da import ( "context" - "encoding/binary" "errors" "fmt" "time" @@ -10,8 +9,6 @@ import ( "github.com/rs/zerolog" datypes "github.com/evstack/ev-node/pkg/da/types" - "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" ) @@ -22,7 +19,6 @@ var ErrForceInclusionNotConfigured = errors.New("forced inclusion namespace not type ForcedInclusionRetriever struct { client Client logger zerolog.Logger - store store.Store daEpochSize uint64 daStartHeight uint64 } @@ -38,32 +34,27 @@ type ForcedInclusionEvent struct { // NewForcedInclusionRetriever creates a new forced inclusion retriever. func NewForcedInclusionRetriever( client Client, - genesis genesis.Genesis, - store store.Store, logger zerolog.Logger, + daStartHeight, daEpochSize uint64, ) *ForcedInclusionRetriever { - r := &ForcedInclusionRetriever{ - client: client, - store: store, - logger: logger.With().Str("component", "forced_inclusion_retriever").Logger(), - daEpochSize: genesis.DAEpochForcedInclusion, + return &ForcedInclusionRetriever{ + client: client, + logger: logger.With().Str("component", "forced_inclusion_retriever").Logger(), + daStartHeight: daStartHeight, + daEpochSize: daEpochSize, } - - // check for inclusion da height on store (sequencer) or genesis da height (full nodes) - r.daStartHeight = max(r.getInitialDAStartHeight(context.Background()), genesis.DAStartHeight) - return r } // RetrieveForcedIncludedTxs retrieves forced inclusion transactions at the given DA height. // It respects epoch boundaries and only fetches at epoch start. func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { // when daStartHeight is not set or no namespace is configured, we retrieve nothing. - if !r.client.HasForcedInclusionNamespace() || r.daStartHeight == 0 { + if !r.client.HasForcedInclusionNamespace() { return nil, ErrForceInclusionNotConfigured } if daHeight < r.daStartHeight { - return nil, ErrForceInclusionNotConfigured + return nil, fmt.Errorf("DA height %d is before the configured start height %d", daHeight, r.daStartHeight) } epochStart, epochEnd, currentEpochNumber := types.CalculateEpochBoundaries(daHeight, r.daStartHeight, r.daEpochSize) @@ -184,13 +175,3 @@ func (r *ForcedInclusionRetriever) processForcedInclusionBlobs( return nil } - -// getInitialDAStartHeight retrieves the DA height of the first included chain height from store. 
-func (r *ForcedInclusionRetriever) getInitialDAStartHeight(ctx context.Context) uint64 { - daIncludedHeightBytes, err := r.store.GetMetadata(ctx, store.GenesisDAHeightKey) - if err != nil || len(daIncludedHeightBytes) != 8 { - return 0 - } - - return binary.LittleEndian.Uint64(daIncludedHeightBytes) -} diff --git a/block/internal/da/forced_inclusion_retriever_test.go b/block/internal/da/forced_inclusion_retriever_test.go index 55c9a8a4a5..6f35d0783f 100644 --- a/block/internal/da/forced_inclusion_retriever_test.go +++ b/block/internal/da/forced_inclusion_retriever_test.go @@ -5,15 +5,12 @@ import ( "testing" "time" - "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" "gotest.tools/v3/assert" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/test/mocks" ) @@ -27,10 +24,7 @@ func TestNewForcedInclusionRetriever(t *testing.T) { DAEpochForcedInclusion: 10, } - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - - retriever := NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) assert.Assert(t, retriever != nil) } @@ -43,10 +37,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NoNamespace(t *testi DAEpochForcedInclusion: 10, } - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - - retriever := NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) ctx := context.Background() _, err := retriever.RetrieveForcedIncludedTxs(ctx, 100) @@ -65,10 +56,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NotAtEpochStart(t *t DAEpochForcedInclusion: 10, } - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - - retriever := NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) ctx := context.Background() // Height 105 is not an epoch start (100, 110, 120, etc. 
are epoch starts) @@ -101,10 +89,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EpochStartSuccess(t DAEpochForcedInclusion: 1, // Single height epoch } - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - - retriever := NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) ctx := context.Background() // Height 100 is an epoch start @@ -131,10 +116,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EpochStartNotAvailab DAEpochForcedInclusion: 10, } - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - - retriever := NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) ctx := context.Background() // Epoch boundaries: [100, 109] - retrieval happens at epoch end (109) @@ -157,10 +139,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NoBlobsAtHeight(t *t DAEpochForcedInclusion: 1, // Single height epoch } - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - - retriever := NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) ctx := context.Background() event, err := retriever.RetrieveForcedIncludedTxs(ctx, 100) @@ -198,10 +177,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_MultiHeightEpoch(t * DAEpochForcedInclusion: 3, // Epoch: 100-102 } - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - - retriever := NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) ctx := context.Background() // Epoch boundaries: [100, 102] - retrieval happens at epoch end (102) @@ -225,10 +201,7 @@ func TestForcedInclusionRetriever_processForcedInclusionBlobs(t *testing.T) { DAEpochForcedInclusion: 10, } - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - - retriever := NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) tests := []struct { name string diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 4d3565cbc1..266bc55e40 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -187,7 +187,7 @@ func (s *Syncer) Start(ctx context.Context) error { // Initialize handlers s.daRetriever = NewDARetriever(s.daClient, s.cache, s.genesis, s.logger) - s.fiRetriever = da.NewForcedInclusionRetriever(s.daClient, s.genesis, s.store, s.logger) + s.fiRetriever = da.NewForcedInclusionRetriever(s.daClient, s.logger, s.genesis.DAStartHeight, s.genesis.DAEpochForcedInclusion) s.p2pHandler = NewP2PHandler(s.headerStore.Store(), s.dataStore.Store(), s.cache, s.genesis, s.logger) if currentHeight, err := s.store.Height(s.ctx); err != nil { s.logger.Error().Err(err).Msg("failed to set initial processed height for p2p handler") diff --git a/block/internal/syncing/syncer_forced_inclusion_test.go b/block/internal/syncing/syncer_forced_inclusion_test.go index 66546eae00..8b0622a42d 100644 --- a/block/internal/syncing/syncer_forced_inclusion_test.go +++ 
b/block/internal/syncing/syncer_forced_inclusion_test.go @@ -370,7 +370,7 @@ func TestVerifyForcedInclusionTxs_AllTransactionsIncluded(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) + fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) s := NewSyncer( st, @@ -443,7 +443,7 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) + fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) s := NewSyncer( st, @@ -546,7 +546,7 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) + fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) s := NewSyncer( st, @@ -653,7 +653,7 @@ func TestVerifyForcedInclusionTxs_NoForcedTransactions(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) + fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) s := NewSyncer( st, @@ -719,7 +719,7 @@ func TestVerifyForcedInclusionTxs_NamespaceNotConfigured(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() client.On("HasForcedInclusionNamespace").Return(false).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) + fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) s := NewSyncer( st, @@ -784,7 +784,7 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) + fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) s := NewSyncer( st, @@ -907,7 +907,7 @@ func TestVerifyForcedInclusionTxs_MaliciousAfterEpochEnd(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := 
da.NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) + fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) s := NewSyncer( st, @@ -996,7 +996,7 @@ func TestVerifyForcedInclusionTxs_SmoothingExceedsEpoch(t *testing.T) { client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, gen, st, zerolog.Nop()) + fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) s := NewSyncer( st, diff --git a/block/public.go b/block/public.go index e42cefc641..61a1e068a4 100644 --- a/block/public.go +++ b/block/public.go @@ -7,9 +7,6 @@ import ( "github.com/evstack/ev-node/block/internal/da" "github.com/evstack/ev-node/pkg/config" blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" - "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/store" - "github.com/ipfs/go-datastore" "github.com/rs/zerolog" ) @@ -76,10 +73,8 @@ type ForcedInclusionRetriever interface { // NewForcedInclusionRetriever creates a new forced inclusion retriever func NewForcedInclusionRetriever( client DAClient, - genesis genesis.Genesis, - ds datastore.Batching, logger zerolog.Logger, + daStartHeight, daEpochSize uint64, ) ForcedInclusionRetriever { - mainKV := store.NewEvNodeKVStore(ds) - return da.NewForcedInclusionRetriever(client, genesis, store.New(mainKV), logger) + return da.NewForcedInclusionRetriever(client, logger, daStartHeight, daEpochSize) } diff --git a/sequencers/based/sequencer.go b/sequencers/based/sequencer.go index acc0b634eb..93960a304b 100644 --- a/sequencers/based/sequencer.go +++ b/sequencers/based/sequencer.go @@ -17,20 +17,15 @@ import ( seqcommon "github.com/evstack/ev-node/sequencers/common" ) -// ForcedInclusionRetriever defines the interface for retrieving forced inclusion transactions from DA -type ForcedInclusionRetriever interface { - RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*block.ForcedInclusionEvent, error) -} - var _ coresequencer.Sequencer = (*BasedSequencer)(nil) // BasedSequencer is a sequencer that only retrieves transactions from the DA layer // via the forced inclusion mechanism. It does not accept transactions from the reaper. // It uses DA as a queue and only persists a checkpoint of where it is in processing. 
type BasedSequencer struct { - fiRetriever ForcedInclusionRetriever - logger zerolog.Logger + logger zerolog.Logger + fiRetriever block.ForcedInclusionRetriever daHeight atomic.Uint64 checkpointStore *seqcommon.CheckpointStore checkpoint *seqcommon.Checkpoint @@ -43,18 +38,18 @@ type BasedSequencer struct { // NewBasedSequencer creates a new based sequencer instance func NewBasedSequencer( - ctx context.Context, - fiRetriever ForcedInclusionRetriever, + fiRetriever block.ForcedInclusionRetriever, db ds.Batching, genesis genesis.Genesis, logger zerolog.Logger, ) (*BasedSequencer, error) { bs := &BasedSequencer{ - fiRetriever: fiRetriever, logger: logger.With().Str("component", "based_sequencer").Logger(), checkpointStore: seqcommon.NewCheckpointStore(db, ds.NewKey("/based/checkpoint")), + fiRetriever: fiRetriever, } - bs.SetDAHeight(genesis.DAStartHeight) // based sequencers need community consensus about the da start height given no submission are done + // based sequencers need community consensus about the da start height given no submission are done + bs.SetDAHeight(genesis.DAStartHeight) // Load checkpoint from DB, or initialize if none exists loadCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) diff --git a/sequencers/based/sequencer_test.go b/sequencers/based/sequencer_test.go index 7d74cb52a8..60525f09c4 100644 --- a/sequencers/based/sequencer_test.go +++ b/sequencers/based/sequencer_test.go @@ -38,7 +38,7 @@ func createTestSequencer(t *testing.T, mockRetriever *MockForcedInclusionRetriev // Create in-memory datastore db := syncds.MutexWrap(ds.NewMapDatastore()) - seq, err := NewBasedSequencer(context.Background(), mockRetriever, db, gen, zerolog.Nop()) + seq, err := NewBasedSequencer(mockRetriever, db, gen, zerolog.Nop()) require.NoError(t, err) return seq } @@ -455,7 +455,7 @@ func TestBasedSequencer_CheckpointPersistence(t *testing.T) { db := syncds.MutexWrap(ds.NewMapDatastore()) // Create first sequencer - seq1, err := NewBasedSequencer(context.Background(), mockRetriever, db, gen, zerolog.Nop()) + seq1, err := NewBasedSequencer(mockRetriever, db, gen, zerolog.Nop()) require.NoError(t, err) req := coresequencer.GetNextBatchRequest{ @@ -470,7 +470,7 @@ func TestBasedSequencer_CheckpointPersistence(t *testing.T) { assert.Equal(t, 2, len(resp.Batch.Transactions)) // Create a new sequencer with the same datastore (simulating restart) - seq2, err := NewBasedSequencer(context.Background(), mockRetriever, db, gen, zerolog.Nop()) + seq2, err := NewBasedSequencer(mockRetriever, db, gen, zerolog.Nop()) require.NoError(t, err) // Checkpoint should be loaded from DB diff --git a/sequencers/single/sequencer.go b/sequencers/single/sequencer.go index db27cfe662..28d0960135 100644 --- a/sequencers/single/sequencer.go +++ b/sequencers/single/sequencer.go @@ -4,6 +4,7 @@ package single import ( "bytes" "context" + "encoding/binary" "encoding/hex" "errors" "fmt" @@ -17,32 +18,29 @@ import ( coresequencer "github.com/evstack/ev-node/core/sequencer" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/store" seqcommon "github.com/evstack/ev-node/sequencers/common" ) // ErrInvalidId is returned when the chain id is invalid var ErrInvalidId = errors.New("invalid chain id") -// ForcedInclusionRetriever defines the interface for retrieving forced inclusion transactions from DA -type ForcedInclusionRetriever interface { - RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) 
(*block.ForcedInclusionEvent, error) -} - var _ coresequencer.Sequencer = (*Sequencer)(nil) // Sequencer implements core sequencing interface type Sequencer struct { - fiRetriever ForcedInclusionRetriever - logger zerolog.Logger - proposer bool + logger zerolog.Logger + genesis genesis.Genesis + db ds.Batching - Id []byte - daVerifier block.DAVerifier + Id []byte + daClient block.FullDAClient batchTime time.Duration queue *BatchQueue // single queue for immediate availability // Forced inclusion support + fiRetriever block.ForcedInclusionRetriever daHeight atomic.Uint64 checkpointStore *seqcommon.CheckpointStore checkpoint *seqcommon.Checkpoint @@ -53,28 +51,25 @@ type Sequencer struct { // NewSequencer creates a new Single Sequencer func NewSequencer( - ctx context.Context, logger zerolog.Logger, db ds.Batching, - daVerifier block.DAVerifier, + daClient block.FullDAClient, id []byte, batchTime time.Duration, - proposer bool, maxQueueSize int, - fiRetriever ForcedInclusionRetriever, genesis genesis.Genesis, ) (*Sequencer, error) { s := &Sequencer{ + db: db, logger: logger, - daVerifier: daVerifier, + daClient: daClient, batchTime: batchTime, Id: id, queue: NewBatchQueue(db, "batches", maxQueueSize), - proposer: proposer, - fiRetriever: fiRetriever, checkpointStore: seqcommon.NewCheckpointStore(db, ds.NewKey("/single/checkpoint")), + genesis: genesis, } - s.SetDAHeight(genesis.DAStartHeight) // default value, will be overridden by executor or submitter + s.SetDAHeight(genesis.DAStartHeight) // default value, will be overriden by executor or submitter loadCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -93,6 +88,8 @@ func NewSequencer( DAHeight: s.GetDAHeight(), TxIndex: 0, } + + s.fiRetriever = block.NewForcedInclusionRetriever(daClient, logger, s.GetDAHeight(), genesis.DAEpochForcedInclusion) } else { return nil, fmt.Errorf("failed to load checkpoint from DB: %w", err) } @@ -106,11 +103,24 @@ func NewSequencer( Uint64("da_height", checkpoint.DAHeight). Msg("resuming from checkpoint within DA epoch") } + + s.fiRetriever = block.NewForcedInclusionRetriever(daClient, logger, s.GetDAHeight(), genesis.DAEpochForcedInclusion) } return s, nil } +// getInitialDAStartHeight retrieves the DA height of the first included chain height from store. +func getInitialDAStartHeight(ctx context.Context, db ds.Batching) uint64 { + s := store.New(store.NewEvNodeKVStore(db)) + daIncludedHeightBytes, err := s.GetMetadata(ctx, store.GenesisDAHeightKey) + if err != nil || len(daIncludedHeightBytes) != 8 { + return 0 + } + + return binary.LittleEndian.Uint64(daIncludedHeightBytes) +} + // SubmitBatchTxs implements sequencing.Sequencer. // It adds mempool transactions to a batch. 
func (c *Sequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { @@ -150,11 +160,13 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB daHeight := c.GetDAHeight() // checkpoint init path, only hit when sequencer is bootstrapping - if c.checkpoint.DAHeight == 0 { + if daHeight > 0 && c.checkpoint.DAHeight == 0 { c.checkpoint = &seqcommon.Checkpoint{ DAHeight: daHeight, TxIndex: 0, } + + c.fiRetriever = block.NewForcedInclusionRetriever(c.daClient, c.logger, getInitialDAStartHeight(ctx, c.db), c.genesis.DAEpochForcedInclusion) } // If we have no cached transactions or we've consumed all from the current cache, @@ -274,25 +286,21 @@ func (c *Sequencer) VerifyBatch(ctx context.Context, req coresequencer.VerifyBat return nil, ErrInvalidId } - if !c.proposer { - proofs, err := c.daVerifier.GetProofs(ctx, req.BatchData, c.Id) - if err != nil { - return nil, fmt.Errorf("failed to get proofs: %w", err) - } + proofs, err := c.daClient.GetProofs(ctx, req.BatchData, c.Id) + if err != nil { + return nil, fmt.Errorf("failed to get proofs: %w", err) + } - valid, err := c.daVerifier.Validate(ctx, req.BatchData, proofs, c.Id) - if err != nil { - return nil, fmt.Errorf("failed to validate proof: %w", err) - } + valid, err := c.daClient.Validate(ctx, req.BatchData, proofs, c.Id) + if err != nil { + return nil, fmt.Errorf("failed to validate proof: %w", err) + } - for _, v := range valid { - if !v { - return &coresequencer.VerifyBatchResponse{Status: false}, nil - } + for _, v := range valid { + if !v { + return &coresequencer.VerifyBatchResponse{Status: false}, nil } - return &coresequencer.VerifyBatchResponse{Status: true}, nil } - return &coresequencer.VerifyBatchResponse{Status: true}, nil } @@ -320,7 +328,8 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint Uint64("tx_index", c.checkpoint.TxIndex). Msg("fetching forced inclusion transactions from DA") - forcedTxsEvent, err := c.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight) + fiRetriever := block.NewForcedInclusionRetriever(c.daClient, c.logger, currentDAHeight, currentDAHeight+1) + forcedTxsEvent, err := fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight) if err != nil { if errors.Is(err, datypes.ErrHeightFromFuture) { c.logger.Debug(). diff --git a/sequencers/single/sequencer_test.go b/sequencers/single/sequencer_test.go index a40ee29f81..8e656980fc 100644 --- a/sequencers/single/sequencer_test.go +++ b/sequencers/single/sequencer_test.go @@ -38,7 +38,7 @@ func newDummyDA(maxBlobSize uint64) *testda.DummyDA { } // newTestSequencer creates a sequencer for tests that don't need full initialization -func newTestSequencer(t *testing.T, db ds.Batching, fiRetriever ForcedInclusionRetriever, proposer bool) *Sequencer { +func newTestSequencer(t *testing.T, db ds.Batching, daClient block.FullDAClient) *Sequencer { ctx := context.Background() logger := zerolog.Nop() @@ -51,12 +51,10 @@ func newTestSequencer(t *testing.T, db ds.Batching, fiRetriever ForcedInclusionR ctx, logger, db, - nil, + daClient, []byte("test"), 1*time.Second, - proposer, 0, // unlimited queue - fiRetriever, gen, ) require.NoError(t, err) @@ -73,7 +71,7 @@ func TestSequencer_SubmitBatchTxs(t *testing.T) { mockRetriever := new(MockForcedInclusionRetriever) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). 
Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - seq, err := NewSequencer(ctx, logger, db, dummyDA, Id, 10*time.Second, false, 1000, mockRetriever, genesis.Genesis{}) + seq, err := NewSequencer(ctx, logger, db, dummyDA, Id, 10*time.Second, 1000, genesis.Genesis{}) if err != nil { t.Fatalf("Failed to create sequencer: %v", err) } @@ -127,7 +125,7 @@ func TestSequencer_SubmitBatchTxs_EmptyBatch(t *testing.T) { mockRetriever := new(MockForcedInclusionRetriever) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - seq, err := NewSequencer(ctx, logger, db, dummyDA, Id, 10*time.Second, false, 1000, mockRetriever, genesis.Genesis{}) + seq, err := NewSequencer(ctx, logger, db, dummyDA, Id, 10*time.Second, 1000, genesis.Genesis{}) require.NoError(t, err, "Failed to create sequencer") defer func() { err := db.Close() From eadee6546c5028e119bee10387d3b8355766281b Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 19 Dec 2025 18:07:22 +0100 Subject: [PATCH 12/14] fixes --- sequencers/single/sequencer.go | 3 +- sequencers/single/sequencer_test.go | 460 ++++++++++------------------ 2 files changed, 168 insertions(+), 295 deletions(-) diff --git a/sequencers/single/sequencer.go b/sequencers/single/sequencer.go index 28d0960135..f64bd49db3 100644 --- a/sequencers/single/sequencer.go +++ b/sequencers/single/sequencer.go @@ -328,8 +328,7 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint Uint64("tx_index", c.checkpoint.TxIndex). Msg("fetching forced inclusion transactions from DA") - fiRetriever := block.NewForcedInclusionRetriever(c.daClient, c.logger, currentDAHeight, currentDAHeight+1) - forcedTxsEvent, err := fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight) + forcedTxsEvent, err := c.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight) if err != nil { if errors.Is(err, datypes.ErrHeightFromFuture) { c.logger.Debug(). 
diff --git a/sequencers/single/sequencer_test.go b/sequencers/single/sequencer_test.go index 8e656980fc..3c0afdbf53 100644 --- a/sequencers/single/sequencer_test.go +++ b/sequencers/single/sequencer_test.go @@ -15,6 +15,7 @@ import ( "github.com/evstack/ev-node/block" coresequencer "github.com/evstack/ev-node/core/sequencer" + datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" damocks "github.com/evstack/ev-node/test/mocks" "github.com/evstack/ev-node/test/testda" @@ -33,13 +34,25 @@ func (m *MockForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Con return args.Get(0).(*block.ForcedInclusionEvent), args.Error(1) } +// MockFullDAClient combines MockClient and MockVerifier to implement FullDAClient +type MockFullDAClient struct { + *damocks.MockClient + *damocks.MockVerifier +} + +func newMockFullDAClient(t *testing.T) *MockFullDAClient { + return &MockFullDAClient{ + MockClient: damocks.NewMockClient(t), + MockVerifier: damocks.NewMockVerifier(t), + } +} + func newDummyDA(maxBlobSize uint64) *testda.DummyDA { return testda.New(testda.WithMaxBlobSize(maxBlobSize)) } // newTestSequencer creates a sequencer for tests that don't need full initialization func newTestSequencer(t *testing.T, db ds.Batching, daClient block.FullDAClient) *Sequencer { - ctx := context.Background() logger := zerolog.Nop() gen := genesis.Genesis{ @@ -48,7 +61,6 @@ func newTestSequencer(t *testing.T, db ds.Batching, daClient block.FullDAClient) } seq, err := NewSequencer( - ctx, logger, db, daClient, @@ -64,14 +76,9 @@ func newTestSequencer(t *testing.T, db ds.Batching, daClient block.FullDAClient) func TestSequencer_SubmitBatchTxs(t *testing.T) { dummyDA := newDummyDA(100_000_000) db := ds.NewMapDatastore() - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() Id := []byte("test1") logger := zerolog.Nop() - mockRetriever := new(MockForcedInclusionRetriever) - mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). - Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - seq, err := NewSequencer(ctx, logger, db, dummyDA, Id, 10*time.Second, 1000, genesis.Genesis{}) + seq, err := NewSequencer(logger, db, dummyDA, Id, 10*time.Second, 1000, genesis.Genesis{}) if err != nil { t.Fatalf("Failed to create sequencer: %v", err) } @@ -118,14 +125,9 @@ func TestSequencer_SubmitBatchTxs(t *testing.T) { func TestSequencer_SubmitBatchTxs_EmptyBatch(t *testing.T) { dummyDA := newDummyDA(100_000_000) db := ds.NewMapDatastore() - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() Id := []byte("test1") logger := zerolog.Nop() - mockRetriever := new(MockForcedInclusionRetriever) - mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). - Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - seq, err := NewSequencer(ctx, logger, db, dummyDA, Id, 10*time.Second, 1000, genesis.Genesis{}) + seq, err := NewSequencer(logger, db, dummyDA, Id, 10*time.Second, 1000, genesis.Genesis{}) require.NoError(t, err, "Failed to create sequencer") defer func() { err := db.Close() @@ -165,10 +167,7 @@ func TestSequencer_GetNextBatch_NoLastBatch(t *testing.T) { db := ds.NewMapDatastore() ctx := context.Background() logger := zerolog.Nop() - - mockRetriever := new(MockForcedInclusionRetriever) - mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). 
- Return(nil, block.ErrForceInclusionNotConfigured).Maybe() + dummyDA := newDummyDA(100_000_000) gen := genesis.Genesis{ ChainID: "test", @@ -176,15 +175,12 @@ func TestSequencer_GetNextBatch_NoLastBatch(t *testing.T) { } seq, err := NewSequencer( - ctx, logger, db, - nil, + dummyDA, []byte("test"), 1*time.Second, - true, 0, // unlimited queue - mockRetriever, gen, ) require.NoError(t, err) @@ -219,11 +215,8 @@ func TestSequencer_GetNextBatch_Success(t *testing.T) { db := ds.NewMapDatastore() - mockRetriever := new(MockForcedInclusionRetriever) - mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). - Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - - seq := newTestSequencer(t, db, mockRetriever, true) + dummyDA := newDummyDA(100_000_000) + seq := newTestSequencer(t, db, dummyDA) defer func() { err := db.Close() if err != nil { @@ -271,152 +264,41 @@ func TestSequencer_VerifyBatch(t *testing.T) { require.NoError(err, "Failed to close datastore") }() - Id := []byte("test") batchData := [][]byte{[]byte("batch1"), []byte("batch2")} - proofs := [][]byte{[]byte("proof1"), []byte("proof2")} - - t.Run("Proposer Mode", func(t *testing.T) { - mockDAVerifier := damocks.NewMockVerifier(t) - mockRetriever := new(MockForcedInclusionRetriever) - mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). - Return(nil, block.ErrForceInclusionNotConfigured).Maybe() + t.Run("Valid Batch", func(t *testing.T) { + dummyDA := newDummyDA(100_000_000) db := ds.NewMapDatastore() - seq := newTestSequencer(t, db, mockRetriever, true) - seq.daVerifier = mockDAVerifier + seq := newTestSequencer(t, db, dummyDA) defer db.Close() + // DummyDA always validates successfully res, err := seq.VerifyBatch(context.Background(), coresequencer.VerifyBatchRequest{Id: seq.Id, BatchData: batchData}) assert.NoError(err) assert.NotNil(res) - assert.True(res.Status, "Expected status to be true in proposer mode") - - mockDAVerifier.AssertNotCalled(t, "GetProofs", context.Background(), mock.Anything) - mockDAVerifier.AssertNotCalled(t, "Validate", mock.Anything, mock.Anything, mock.Anything) + assert.True(res.Status, "Expected status to be true for valid batch") }) - t.Run("Non-Proposer Mode", func(t *testing.T) { - t.Run("Valid Proofs", func(t *testing.T) { - mockDAVerifier := damocks.NewMockVerifier(t) - mockRetriever := new(MockForcedInclusionRetriever) - mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). - Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - - db := ds.NewMapDatastore() - seq := newTestSequencer(t, db, mockRetriever, false) - seq.daVerifier = mockDAVerifier - defer db.Close() - - mockDAVerifier.On("GetProofs", context.Background(), batchData, Id).Return(proofs, nil).Once() - mockDAVerifier.On("Validate", mock.Anything, batchData, proofs, Id).Return([]bool{true, true}, nil).Once() - - res, err := seq.VerifyBatch(context.Background(), coresequencer.VerifyBatchRequest{Id: seq.Id, BatchData: batchData}) - assert.NoError(err) - assert.NotNil(res) - assert.True(res.Status, "Expected status to be true for valid proofs") - mockDAVerifier.AssertExpectations(t) - }) - - t.Run("Invalid Proof", func(t *testing.T) { - mockDAVerifier := damocks.NewMockVerifier(t) - mockRetriever := new(MockForcedInclusionRetriever) - mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). 
- Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - - db := ds.NewMapDatastore() - seq := newTestSequencer(t, db, mockRetriever, false) - seq.daVerifier = mockDAVerifier - defer db.Close() - - mockDAVerifier.On("GetProofs", context.Background(), batchData, Id).Return(proofs, nil).Once() - mockDAVerifier.On("Validate", mock.Anything, batchData, proofs, Id).Return([]bool{true, false}, nil).Once() - - res, err := seq.VerifyBatch(context.Background(), coresequencer.VerifyBatchRequest{Id: seq.Id, BatchData: batchData}) - assert.NoError(err) - assert.NotNil(res) - assert.False(res.Status, "Expected status to be false for invalid proof") - mockDAVerifier.AssertExpectations(t) - }) - - t.Run("GetProofs Error", func(t *testing.T) { - mockDAVerifier := damocks.NewMockVerifier(t) - mockRetriever := new(MockForcedInclusionRetriever) - mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). - Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - - db := ds.NewMapDatastore() - seq := newTestSequencer(t, db, mockRetriever, false) - seq.daVerifier = mockDAVerifier - defer db.Close() - expectedErr := errors.New("get proofs failed") - - mockDAVerifier.On("GetProofs", context.Background(), batchData, Id).Return(nil, expectedErr).Once() - - res, err := seq.VerifyBatch(context.Background(), coresequencer.VerifyBatchRequest{Id: seq.Id, BatchData: batchData}) - assert.Error(err) - assert.Nil(res) - assert.Contains(err.Error(), expectedErr.Error()) - mockDAVerifier.AssertExpectations(t) - mockDAVerifier.AssertNotCalled(t, "Validate", mock.Anything, mock.Anything, mock.Anything) - }) - - t.Run("Validate Error", func(t *testing.T) { - mockDAVerifier := damocks.NewMockVerifier(t) - mockRetriever := new(MockForcedInclusionRetriever) - mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). - Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - - db := ds.NewMapDatastore() - seq := newTestSequencer(t, db, mockRetriever, false) - seq.daVerifier = mockDAVerifier - defer db.Close() - expectedErr := errors.New("validate failed") - - mockDAVerifier.On("GetProofs", context.Background(), batchData, Id).Return(proofs, nil).Once() - mockDAVerifier.On("Validate", mock.Anything, batchData, proofs, Id).Return(nil, expectedErr).Once() - - res, err := seq.VerifyBatch(context.Background(), coresequencer.VerifyBatchRequest{Id: seq.Id, BatchData: batchData}) - assert.Error(err) - assert.Nil(res) - assert.Contains(err.Error(), expectedErr.Error()) - mockDAVerifier.AssertExpectations(t) - }) - - t.Run("Invalid ID", func(t *testing.T) { - mockDAVerifier := damocks.NewMockVerifier(t) - mockRetriever := new(MockForcedInclusionRetriever) - mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). 
- Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - - db := ds.NewMapDatastore() - seq := newTestSequencer(t, db, mockRetriever, false) - seq.daVerifier = mockDAVerifier - defer db.Close() - - invalidId := []byte("invalid") - res, err := seq.VerifyBatch(context.Background(), coresequencer.VerifyBatchRequest{Id: invalidId, BatchData: batchData}) - assert.Error(err) - assert.Nil(res) - assert.ErrorIs(err, ErrInvalidId) - - mockDAVerifier.AssertNotCalled(t, "GetProofs", context.Background(), mock.Anything) - mockDAVerifier.AssertNotCalled(t, "Validate", mock.Anything, mock.Anything, mock.Anything) - }) + t.Run("Invalid ID", func(t *testing.T) { + dummyDA := newDummyDA(100_000_000) + db := ds.NewMapDatastore() + seq := newTestSequencer(t, db, dummyDA) + defer db.Close() + + invalidId := []byte("invalid") + res, err := seq.VerifyBatch(context.Background(), coresequencer.VerifyBatchRequest{Id: invalidId, BatchData: batchData}) + assert.Error(err) + assert.Nil(res) + assert.ErrorIs(err, ErrInvalidId) }) } func TestSequencer_GetNextBatch_BeforeDASubmission(t *testing.T) { - t.Skip() - // Initialize a new sequencer with mock DA - mockDA := damocks.NewMockVerifier(t) + // Initialize a new sequencer with dummy DA + dummyDA := newDummyDA(100_000_000) db := ds.NewMapDatastore() - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() logger := zerolog.Nop() - mockRetriever := new(MockForcedInclusionRetriever) - mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). - Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - seq, err := NewSequencer(ctx, logger, db, mockDA, []byte("test1"), 1*time.Second, false, 1000, mockRetriever, genesis.Genesis{}) + seq, err := NewSequencer(logger, db, dummyDA, []byte("test1"), 1*time.Second, 1000, genesis.Genesis{}) if err != nil { t.Fatalf("Failed to create sequencer: %v", err) } @@ -454,8 +336,6 @@ func TestSequencer_GetNextBatch_BeforeDASubmission(t *testing.T) { t.Fatal("Expected transaction to match submitted transaction") } - // Verify all mock expectations were met - mockDA.AssertExpectations(t) } func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) { @@ -464,32 +344,41 @@ func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) { // Create in-memory datastore db := ds.NewMapDatastore() + defer db.Close() - // Create mock forced inclusion retriever with txs that are 50 bytes each - mockFI := &MockForcedInclusionRetriever{} + // Create mock DA client with forced inclusion namespace + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + // Setup namespace methods + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // Create forced inclusion txs that are 50 and 60 bytes forcedTx1 := make([]byte, 50) forcedTx2 := make([]byte, 60) - mockFI.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ - Txs: [][]byte{forcedTx1, forcedTx2}, // Total 110 bytes - StartDaHeight: 100, - EndDaHeight: 100, - }, nil) + + // Mock Retrieve to return forced inclusion txs at DA height 100 + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: 
datypes.BaseResult{Code: datypes.StatusSuccess}, + Data: [][]byte{forcedTx1, forcedTx2}, + }).Once() gen := genesis.Genesis{ - ChainID: "test-chain", - DAStartHeight: 100, + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, } seq, err := NewSequencer( - ctx, logger, db, - nil, + mockDA, []byte("test-chain"), 1*time.Second, - true, 100, - mockFI, gen, ) require.NoError(t, err) @@ -536,8 +425,6 @@ func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) { assert.GreaterOrEqual(t, len(resp.Batch.Transactions), 2, "Should have at least forced inclusion txs") assert.Equal(t, forcedTx1, resp.Batch.Transactions[0]) assert.Equal(t, forcedTx2, resp.Batch.Transactions[1]) - - mockFI.AssertExpectations(t) } func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { @@ -545,40 +432,46 @@ func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { logger := zerolog.New(zerolog.NewConsoleWriter()) db := ds.NewMapDatastore() + defer db.Close() + + // Create mock DA client + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() // Create forced inclusion txs where combined they exceed maxBytes - mockFI := &MockForcedInclusionRetriever{} forcedTx1 := make([]byte, 100) forcedTx2 := make([]byte, 80) // This would be deferred - mockFI.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ - Txs: [][]byte{forcedTx1, forcedTx2}, - StartDaHeight: 100, - EndDaHeight: 100, - }, nil).Once() - - // Second call won't fetch from DA - tx2 is still in cache - // Only after both txs are consumed will we fetch from DA height 101 - mockFI.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(101)).Return(&block.ForcedInclusionEvent{ - Txs: [][]byte{}, - StartDaHeight: 101, - EndDaHeight: 101, - }, nil).Maybe() + + // First DA retrieve at height 100 + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess}, + Data: [][]byte{forcedTx1, forcedTx2}, + }).Once() + + // Second DA retrieve at height 101 (empty) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess}, + Data: [][]byte{}, + }).Maybe() gen := genesis.Genesis{ - ChainID: "test-chain", - DAStartHeight: 100, + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, } seq, err := NewSequencer( - ctx, logger, db, - nil, + mockDA, []byte("test-chain"), 1*time.Second, - true, 100, - mockFI, gen, ) require.NoError(t, err) @@ -610,8 +503,6 @@ func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { // Checkpoint should have moved to next DA height after consuming all cached txs assert.Equal(t, uint64(101), seq.checkpoint.DAHeight, "Should have moved to next DA height") assert.Equal(t, uint64(0), seq.checkpoint.TxIndex, "TxIndex should be reset") - - mockFI.AssertExpectations(t) } func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) { @@ -619,40 +510,43 @@ func 
TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) logger := zerolog.New(zerolog.NewConsoleWriter()) db := ds.NewMapDatastore() + defer db.Close() - mockFI := &MockForcedInclusionRetriever{} + // Create mock DA client + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") - // First call returns a large forced tx that will get evicted + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // First call returns large forced txs largeForcedTx1, largeForcedTx2 := make([]byte, 75), make([]byte, 75) - mockFI.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ - Txs: [][]byte{largeForcedTx1, largeForcedTx2}, - StartDaHeight: 100, - EndDaHeight: 100, - }, nil).Once() - - // Second call won't fetch from DA - forced tx is still in cache - // Only after the forced tx is consumed will we fetch from DA height 101 - mockFI.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(101)).Return(&block.ForcedInclusionEvent{ - Txs: [][]byte{}, - StartDaHeight: 101, - EndDaHeight: 101, - }, nil).Maybe() + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess}, + Data: [][]byte{largeForcedTx1, largeForcedTx2}, + }).Once() + + // Second call (height 101) returns empty + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess}, + Data: [][]byte{}, + }).Maybe() gen := genesis.Genesis{ - ChainID: "test-chain", - DAStartHeight: 100, + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, } seq, err := NewSequencer( - ctx, logger, db, - nil, + mockDA, []byte("test-chain"), 1*time.Second, - true, 100, - mockFI, gen, ) require.NoError(t, err) @@ -668,7 +562,7 @@ func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) _, err = seq.SubmitBatchTxs(ctx, submitReq) require.NoError(t, err) - // First call with maxBytes = 100 + // First call with maxBytes = 125 getReq := coresequencer.GetNextBatchRequest{ Id: []byte("test-chain"), MaxBytes: 125, @@ -678,11 +572,11 @@ func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) resp, err := seq.GetNextBatch(ctx, getReq) require.NoError(t, err) require.NotNil(t, resp.Batch) - assert.Equal(t, 2, len(resp.Batch.Transactions), "Should have 1 batch tx + 1 forced tx") + assert.Equal(t, 2, len(resp.Batch.Transactions), "Should have 1 forced tx + 1 batch tx") assert.Equal(t, 75, len(resp.Batch.Transactions[0])) // forced tx is 75 bytes assert.Equal(t, 50, len(resp.Batch.Transactions[1])) // batch tx is 50 bytes - // Verify checkpoint shows no forced tx was consumed (tx too large) + // Verify checkpoint shows one forced tx was consumed assert.Equal(t, uint64(1), seq.checkpoint.TxIndex, "Only one forced tx should be consumed") assert.Greater(t, len(seq.cachedForcedInclusionTxs), 1, "Remaining forced tx should still be cached") @@ -703,21 +597,10 @@ func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) // Checkpoint should reflect that forced tx was consumed assert.Equal(t, uint64(101), seq.checkpoint.DAHeight, 
"Should have moved to next DA height") assert.Equal(t, uint64(0), seq.checkpoint.TxIndex, "TxIndex should be reset after consuming all") - - mockFI.AssertExpectations(t) } func TestSequencer_QueueLimit_Integration(t *testing.T) { // Test integration between sequencer and queue limits to demonstrate backpressure - db := ds.NewMapDatastore() - defer db.Close() - - mockDA := damocks.NewMockVerifier(t) - mockRetriever := new(MockForcedInclusionRetriever) - mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). - Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - - // Create a sequencer with a small queue limit for testing ctx := context.Background() logger := zerolog.Nop() @@ -726,16 +609,18 @@ func TestSequencer_QueueLimit_Integration(t *testing.T) { DAStartHeight: 100, } + db := ds.NewMapDatastore() + defer db.Close() + + dummyDA := newDummyDA(100_000_000) + seq, err := NewSequencer( - ctx, logger, db, - mockDA, + dummyDA, []byte("test"), time.Second, - true, 2, // Very small limit for testing - mockRetriever, gen, ) require.NoError(t, err) @@ -844,20 +729,14 @@ func TestSequencer_DAFailureAndQueueThrottling_Integration(t *testing.T) { // Create sequencer with small queue size to trigger throttling quickly queueSize := 3 // Small for testing logger := zerolog.Nop() - mockRetriever := new(MockForcedInclusionRetriever) - mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). - Return(nil, block.ErrForceInclusionNotConfigured).Maybe() seq, err := NewSequencer( - context.Background(), logger, db, dummyDA, []byte("test-chain"), 100*time.Millisecond, - true, // proposer queueSize, - mockRetriever, // fiRetriever - genesis.Genesis{}, // genesis + genesis.Genesis{}, ) require.NoError(t, err) @@ -968,33 +847,40 @@ func TestSequencer_CheckpointPersistence_CrashRecovery(t *testing.T) { db := ds.NewMapDatastore() defer db.Close() + // Create mock DA client + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // Create forced inclusion txs at DA height 100 - mockFI := &MockForcedInclusionRetriever{} forcedTx1 := make([]byte, 100) forcedTx2 := make([]byte, 80) forcedTx3 := make([]byte, 90) - mockFI.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ - Txs: [][]byte{forcedTx1, forcedTx2, forcedTx3}, - StartDaHeight: 100, - EndDaHeight: 100, - }, nil) + + // Will be called twice - once for seq1, once for seq2 + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess}, + Data: [][]byte{forcedTx1, forcedTx2, forcedTx3}, + }).Twice() gen := genesis.Genesis{ - ChainID: "test-chain", - DAStartHeight: 100, + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, } // Create first sequencer instance seq1, err := NewSequencer( - ctx, logger, db, - nil, + mockDA, []byte("test-chain"), 1*time.Second, - true, 100, - mockFI, gen, ) require.NoError(t, err) @@ -1029,15 +915,12 @@ func TestSequencer_CheckpointPersistence_CrashRecovery(t *testing.T) { // SIMULATE CRASH: Create new sequencer instance with same DB // This simulates a node restart/crash 
seq2, err := NewSequencer( - ctx, logger, db, - nil, + mockDA, []byte("test-chain"), 1*time.Second, - true, 100, - mockFI, gen, ) require.NoError(t, err) @@ -1058,58 +941,51 @@ func TestSequencer_CheckpointPersistence_CrashRecovery(t *testing.T) { assert.Equal(t, uint64(0), seq2.checkpoint.TxIndex, "TxIndex should be reset") t.Log("✅ Checkpoint system successfully prevented re-execution of DA transactions after crash") - mockFI.AssertExpectations(t) } func TestSequencer_GetNextBatch_EmptyDABatch_IncreasesDAHeight(t *testing.T) { db := ds.NewMapDatastore() + defer db.Close() ctx := context.Background() - mockRetriever := new(MockForcedInclusionRetriever) + // Create mock DA client + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() // First DA epoch returns empty transactions - mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)). - Return(&block.ForcedInclusionEvent{ - Txs: [][]byte{}, - StartDaHeight: 100, - EndDaHeight: 105, - }, nil).Once() + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess}, + Data: [][]byte{}, + }).Once() // Second DA epoch also returns empty transactions - mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(106)). - Return(&block.ForcedInclusionEvent{ - Txs: [][]byte{}, - StartDaHeight: 106, - EndDaHeight: 111, - }, nil).Once() + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess}, + Data: [][]byte{}, + }).Once() gen := genesis.Genesis{ ChainID: "test", DAStartHeight: 100, - DAEpochForcedInclusion: 5, + DAEpochForcedInclusion: 1, } seq, err := NewSequencer( - ctx, zerolog.Nop(), db, - nil, + mockDA, []byte("test"), 1*time.Second, - true, 1000, - mockRetriever, gen, ) require.NoError(t, err) - defer func() { - err := db.Close() - if err != nil { - t.Fatalf("Failed to close sequencer: %v", err) - } - }() - req := coresequencer.GetNextBatchRequest{ Id: seq.Id, MaxBytes: 1000000, @@ -1127,22 +1003,20 @@ func TestSequencer_GetNextBatch_EmptyDABatch_IncreasesDAHeight(t *testing.T) { require.NotNil(t, resp.Batch) assert.Equal(t, 0, len(resp.Batch.Transactions)) - // DA height should have increased to 106 even though no transactions were processed - assert.Equal(t, uint64(106), seq.GetDAHeight()) - assert.Equal(t, uint64(106), seq.checkpoint.DAHeight) + // DA height should have increased to 101 even though no transactions were processed + assert.Equal(t, uint64(101), seq.GetDAHeight()) + assert.Equal(t, uint64(101), seq.checkpoint.DAHeight) assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) - // Second batch - empty DA block at height 106 + // Second batch - empty DA block at height 101 resp, err = seq.GetNextBatch(ctx, req) require.NoError(t, err) require.NotNil(t, resp) require.NotNil(t, resp.Batch) assert.Equal(t, 0, len(resp.Batch.Transactions)) - // DA height should have increased to 112 - assert.Equal(t, uint64(112), seq.GetDAHeight()) - assert.Equal(t, uint64(112), seq.checkpoint.DAHeight) + // DA height should have increased to 102 + 
assert.Equal(t, uint64(102), seq.GetDAHeight()) + assert.Equal(t, uint64(102), seq.checkpoint.DAHeight) assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) - - mockRetriever.AssertExpectations(t) } From 55060e710116306b54fe4bda3cc2f7e5cb853155 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 19 Dec 2025 18:11:27 +0100 Subject: [PATCH 13/14] typo --- sequencers/single/sequencer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sequencers/single/sequencer.go b/sequencers/single/sequencer.go index f64bd49db3..918721d5fe 100644 --- a/sequencers/single/sequencer.go +++ b/sequencers/single/sequencer.go @@ -69,7 +69,7 @@ func NewSequencer( checkpointStore: seqcommon.NewCheckpointStore(db, ds.NewKey("/single/checkpoint")), genesis: genesis, } - s.SetDAHeight(genesis.DAStartHeight) // default value, will be overriden by executor or submitter + s.SetDAHeight(genesis.DAStartHeight) // default value, will be overridden by executor or submitter loadCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() From c2e2cb846c1cbba42ce21909e14e6ec9e9b29677 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 19 Dec 2025 18:19:33 +0100 Subject: [PATCH 14/14] fix --- sequencers/single/sequencer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sequencers/single/sequencer.go b/sequencers/single/sequencer.go index 918721d5fe..7575a398f6 100644 --- a/sequencers/single/sequencer.go +++ b/sequencers/single/sequencer.go @@ -104,7 +104,7 @@ func NewSequencer( Msg("resuming from checkpoint within DA epoch") } - s.fiRetriever = block.NewForcedInclusionRetriever(daClient, logger, s.GetDAHeight(), genesis.DAEpochForcedInclusion) + s.fiRetriever = block.NewForcedInclusionRetriever(daClient, logger, getInitialDAStartHeight(context.Background(), s.db), genesis.DAEpochForcedInclusion) } return s, nil
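
Net effect of the series on DA start height resolution: full nodes and based sequencers keep using genesis.DAStartHeight (with the new startup check rejecting based mode when it is unset), while the single sequencer prefers the DA height at which the chain's first block was actually included, persisted by the submitter under store.GenesisDAHeightKey, and only relies on the genesis value before that metadata exists. The sketch below condenses that resolution into a single helper for illustration only; resolveDAStartHeight is not a function in the tree (the real logic lives inline in NewSequencer and GetNextBatch via getInitialDAStartHeight), and the explicit fallback to gen.DAStartHeight mirrors the max(...) selection that patch 11 removed from the retriever rather than the exact final call sites.

package single // hypothetical placement, illustration only

import (
	"context"
	"encoding/binary"

	ds "github.com/ipfs/go-datastore"

	"github.com/evstack/ev-node/pkg/genesis"
	"github.com/evstack/ev-node/pkg/store"
)

// resolveDAStartHeight returns the DA height the forced-inclusion retriever
// should scan from: the recorded genesis DA inclusion height when the
// submitter has already written it, otherwise the genesis DA start height.
func resolveDAStartHeight(ctx context.Context, db ds.Batching, gen genesis.Genesis) uint64 {
	s := store.New(store.NewEvNodeKVStore(db))
	b, err := s.GetMetadata(ctx, store.GenesisDAHeightKey)
	if err == nil && len(b) == 8 {
		if h := binary.LittleEndian.Uint64(b); h > 0 {
			// Sequencer restart: the DA height of the first included block is known.
			return h
		}
	}
	// Fresh chain or no metadata yet: fall back to the community-agreed genesis value.
	return gen.DAStartHeight
}

Under this sketch the retriever would be wired up as block.NewForcedInclusionRetriever(daClient, logger, resolveDAStartHeight(ctx, db, gen), gen.DAEpochForcedInclusion), matching the (daStartHeight, daEpochSize) constructor signature introduced in patch 11.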