diff --git a/sequencers/single/queue.go b/sequencers/single/queue.go
index ffea618fb..e2b20b4f1 100644
--- a/sequencers/single/queue.go
+++ b/sequencers/single/queue.go
@@ -2,9 +2,11 @@ package single
 
 import (
 	"context"
+	"encoding/binary"
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"strconv"
 	"sync"
 
 	ds "github.com/ipfs/go-datastore"
@@ -20,26 +22,52 @@ import (
 // ErrQueueFull is returned when the batch queue has reached its maximum size
 var ErrQueueFull = errors.New("batch queue is full")
 
+// initialSeqNum is the starting sequence number for new queues.
+// It is set to the middle of the uint64 range to allow for both
+// appending (incrementing) and prepending (decrementing) transactions.
+const initialSeqNum = uint64(0x8000000000000000)
+
+// queuedItem holds a batch and its associated persistence key
+type queuedItem struct {
+	Batch coresequencer.Batch
+	Key   string
+}
+
 // BatchQueue implements a persistent queue for transaction batches
 type BatchQueue struct {
-	queue        []coresequencer.Batch
+	queue        []queuedItem
 	head         int // index of the first element in the queue
 	maxQueueSize int // maximum number of batches allowed in queue (0 = unlimited)
-	mu           sync.Mutex
-	db           ds.Batching
+
+	// Sequence numbers for generating new keys
+	nextAddSeq     uint64
+	nextPrependSeq uint64
+
+	mu sync.Mutex
+	db ds.Batching
 }
 
 // NewBatchQueue creates a new BatchQueue with the specified maximum size.
 // If maxSize is 0, the queue will be unlimited.
 func NewBatchQueue(db ds.Batching, prefix string, maxSize int) *BatchQueue {
 	return &BatchQueue{
-		queue:        make([]coresequencer.Batch, 0),
-		head:         0,
-		maxQueueSize: maxSize,
-		db:           store.NewPrefixKVStore(db, prefix),
+		queue:          make([]queuedItem, 0),
+		head:           0,
+		maxQueueSize:   maxSize,
+		db:             store.NewPrefixKVStore(db, prefix),
+		nextAddSeq:     initialSeqNum,
+		nextPrependSeq: initialSeqNum - 1,
 	}
 }
 
+// seqToKey converts a sequence number to a hex-encoded big-endian key.
+// We use big-endian so that lexicographical sort order matches numeric order.
+func seqToKey(seq uint64) string {
+	b := make([]byte, 8)
+	binary.BigEndian.PutUint64(b, seq)
+	return hex.EncodeToString(b)
+}
+
 // AddBatch adds a new transaction to the queue and writes it to the WAL.
 // Returns ErrQueueFull if the queue has reached its maximum size.
 func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) error {
@@ -53,12 +81,14 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e
 		return ErrQueueFull
 	}
 
-	if err := bq.persistBatch(ctx, batch); err != nil {
+	key := seqToKey(bq.nextAddSeq)
+	if err := bq.persistBatch(ctx, batch, key); err != nil {
 		return err
 	}
+	bq.nextAddSeq++
 
 	// Then add to in-memory queue
-	bq.queue = append(bq.queue, batch)
+	bq.queue = append(bq.queue, queuedItem{Batch: batch, Key: key})
 
 	return nil
 }
@@ -68,25 +98,27 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e
 // The batch is persisted to the DB to ensure durability in case of crashes.
 //
 // NOTE: Prepend intentionally bypasses the maxQueueSize limit to ensure high-priority
-// transactions can always be re-queued. This means the effective queue size may temporarily
-// exceed the configured maximum when Prepend is used. This is by design to prevent loss
-// of transactions that have already been accepted but couldn't fit in the current batch.
+// transactions can always be re-queued.
 func (bq *BatchQueue) Prepend(ctx context.Context, batch coresequencer.Batch) error {
 	bq.mu.Lock()
 	defer bq.mu.Unlock()
 
-	if err := bq.persistBatch(ctx, batch); err != nil {
+	key := seqToKey(bq.nextPrependSeq)
+	if err := bq.persistBatch(ctx, batch, key); err != nil {
 		return err
 	}
+	bq.nextPrependSeq--
+
+	item := queuedItem{Batch: batch, Key: key}
 
 	// Then add to in-memory queue
 	// If we have room before head, use it
 	if bq.head > 0 {
 		bq.head--
-		bq.queue[bq.head] = batch
+		bq.queue[bq.head] = item
 	} else {
 		// Need to expand the queue at the front
-		bq.queue = append([]coresequencer.Batch{batch}, bq.queue...)
+		bq.queue = append([]queuedItem{item}, bq.queue...)
 	}
 
 	return nil
@@ -102,8 +134,9 @@ func (bq *BatchQueue) Next(ctx context.Context) (*coresequencer.Batch, error) {
 		return &coresequencer.Batch{Transactions: nil}, nil
 	}
 
-	batch := bq.queue[bq.head]
-	bq.queue[bq.head] = coresequencer.Batch{} // Release memory for the dequeued element
+	item := bq.queue[bq.head]
+	// Release memory for the dequeued element
+	bq.queue[bq.head] = queuedItem{}
 	bq.head++
 
 	// Compact when head gets too large to prevent memory leaks
@@ -112,28 +145,22 @@ func (bq *BatchQueue) Next(ctx context.Context) (*coresequencer.Batch, error) {
 	// frequent compactions on small queues
 	if bq.head > len(bq.queue)/2 && bq.head > 100 {
 		remaining := copy(bq.queue, bq.queue[bq.head:])
-		// Zero out the rest of the slice to release memory
+		// Zero out the rest of the slice
 		for i := remaining; i < len(bq.queue); i++ {
-			bq.queue[i] = coresequencer.Batch{}
+			bq.queue[i] = queuedItem{}
 		}
 		bq.queue = bq.queue[:remaining]
 		bq.head = 0
 	}
 
-	hash, err := batch.Hash()
-	if err != nil {
-		return &coresequencer.Batch{Transactions: nil}, err
-	}
-	key := hex.EncodeToString(hash)
-
-	// Delete the batch from the WAL since it's been processed
-	err = bq.db.Delete(ctx, ds.NewKey(key))
-	if err != nil {
+	// Use the stored key directly
+	if err := bq.db.Delete(ctx, ds.NewKey(item.Key)); err != nil {
 		// Log the error but continue
 		fmt.Printf("Error deleting processed batch: %v\n", err)
 	}
 
-	return &batch, nil
+	return &item.Batch, nil
 }
 
 // Load reloads all batches from WAL file into the in-memory queue after a crash or restart
@@ -141,30 +168,79 @@ func (bq *BatchQueue) Load(ctx context.Context) error {
 	bq.mu.Lock()
 	defer bq.mu.Unlock()
 
-	// Clear the current queue
-	bq.queue = make([]coresequencer.Batch, 0)
+	// Clear the current queue and reset sequences
+	bq.queue = make([]queuedItem, 0)
 	bq.head = 0
+	bq.nextAddSeq = initialSeqNum
+	bq.nextPrependSeq = initialSeqNum - 1
 
-	q := query.Query{}
+	q := query.Query{
+		Orders: []query.Order{query.OrderByKey{}},
+	}
 	results, err := bq.db.Query(ctx, q)
 	if err != nil {
 		return fmt.Errorf("error querying datastore: %w", err)
 	}
 	defer results.Close()
 
-	// Load each batch
+	var legacyItems []queuedItem
 	for result := range results.Next() {
 		if result.Error != nil {
 			fmt.Printf("Error reading entry from datastore: %v\n", result.Error)
 			continue
 		}
 
-		pbBatch := &pb.Batch{}
-		err := proto.Unmarshal(result.Value, pbBatch)
+		// We care about the last part of the key (the sequence number).
+		// ds.Key usually has a leading slash.
+		keyName := ds.NewKey(result.Key).Name()
+
+		var pbBatch pb.Batch
+		err := proto.Unmarshal(result.Value, &pbBatch)
 		if err != nil {
-			fmt.Printf("Error decoding batch for key '%s': %v. Skipping entry.\n", result.Key, err)
+			fmt.Printf("Error decoding batch for key '%s': %v. Skipping entry.\n", keyName, err)
+			continue
+		}
+
+		batch := coresequencer.Batch{Transactions: pbBatch.Txs}
+
+		// Check if the key is a valid hex sequence number (16 hex chars).
+		// We use a strict 16-char check because seqToKey always produces 16 hex chars.
+		isValid := false
+		if len(keyName) == 16 {
+			if seq, err := strconv.ParseUint(keyName, 16, 64); err == nil {
+				isValid = true
+				if seq >= bq.nextAddSeq {
+					bq.nextAddSeq = seq + 1
+				}
+				if seq <= bq.nextPrependSeq {
+					bq.nextPrependSeq = seq - 1
+				}
+			}
+		}
+		if isValid {
+			bq.queue = append(bq.queue, queuedItem{Batch: batch, Key: keyName})
+		} else {
+			legacyItems = append(legacyItems, queuedItem{Batch: batch, Key: result.Key})
+		}
+	}
+	if len(legacyItems) == 0 {
+		return nil
+	}
+	fmt.Printf("Found %d legacy items to migrate...\n", len(legacyItems))
+
+	for _, item := range legacyItems {
+		newKeyName := seqToKey(bq.nextAddSeq)
+
+		if err := bq.persistBatch(ctx, item.Batch, newKeyName); err != nil {
+			fmt.Printf("Failed to migrate legacy item %s: %v\n", item.Key, err)
 			continue
 		}
-		bq.queue = append(bq.queue, coresequencer.Batch{Transactions: pbBatch.Txs})
+
+		if err := bq.db.Delete(ctx, ds.NewKey(item.Key)); err != nil {
+			fmt.Printf("Failed to delete legacy key %s after migration: %v\n", item.Key, err)
+		}
+
+		bq.queue = append(bq.queue, queuedItem{Batch: item.Batch, Key: newKeyName})
+		bq.nextAddSeq++
 	}
 
 	return nil
@@ -178,14 +254,8 @@ func (bq *BatchQueue) Size() int {
 	return len(bq.queue) - bq.head
 }
 
-// persistBatch persists a batch to the datastore
-func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batch) error {
-	hash, err := batch.Hash()
-	if err != nil {
-		return err
-	}
-	key := hex.EncodeToString(hash)
-
+// persistBatch persists a batch to the datastore with the given key
+func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batch, key string) error {
 	pbBatch := &pb.Batch{
 		Txs: batch.Transactions,
 	}
@@ -195,7 +265,7 @@ func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batc
 		return err
 	}
 
-	// First write to DB for durability
+	// Write to DB
 	if err := bq.db.Put(ctx, ds.NewKey(key), encodedBatch); err != nil {
 		return err
 	}
diff --git a/sequencers/single/queue_migration_test.go b/sequencers/single/queue_migration_test.go
new file mode 100644
index 000000000..355676553
--- /dev/null
+++ b/sequencers/single/queue_migration_test.go
@@ -0,0 +1,166 @@
+package single
+
+import (
+	"context"
+	"testing"
+
+	ds "github.com/ipfs/go-datastore"
+	"github.com/ipfs/go-datastore/query"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/protobuf/proto"
+
+	coresequencer "github.com/evstack/ev-node/core/sequencer"
+	pb "github.com/evstack/ev-node/types/pb/evnode/v1"
+)
+
+func TestLoad_MigratesLegacyKeys(t *testing.T) {
+	require := require.New(t)
+	ctx := context.Background()
+
+	// Use an in-memory datastore
+	memdb := ds.NewMapDatastore()
+	prefix := "batching" // Prefix used by the queue
+
+	// 1. Setup DB with mixed data: valid keys and legacy keys
+	// Valid key (seq 0x8000...00)
+	validSeq := initialSeqNum
+	validKey := seqToKey(validSeq) // 16-char hex
+	validBatch := coresequencer.Batch{Transactions: [][]byte{[]byte("valid-tx")}}
+
+	// Persist valid batch manually
+	pbValid := &pb.Batch{Txs: validBatch.Transactions}
+	validBytes, _ := proto.Marshal(pbValid)
+	// Note: NewBatchQueue uses the "batching" prefix, so keys are stored under /batching/
+	err := memdb.Put(ctx, ds.NewKey("/"+prefix+"/"+validKey), validBytes)
+	require.NoError(err)
+
+	// Legacy key (just a random hash string, not 16 hex chars)
+	// Let's use 32 bytes hex encoded -> 64 chars
+	legacyHash := "aabbccddeeff00112233445566778899aabbccddeeff00112233445566778899"
+	legacyBatch := coresequencer.Batch{Transactions: [][]byte{[]byte("legacy-tx")}}
+
+	pbLegacy := &pb.Batch{Txs: legacyBatch.Transactions}
+	legacyBytes, _ := proto.Marshal(pbLegacy)
+	err = memdb.Put(ctx, ds.NewKey("/"+prefix+"/"+legacyHash), legacyBytes)
+	require.NoError(err)
+
+	// Another legacy key (to test multiple)
+	legacyHash2 := "112233445566778899aabbccddeeff00112233445566778899aabbccddeeff00"
+	legacyBatch2 := coresequencer.Batch{Transactions: [][]byte{[]byte("legacy-tx-2")}}
+	pbLegacy2 := &pb.Batch{Txs: legacyBatch2.Transactions}
+	legacyBytes2, _ := proto.Marshal(pbLegacy2)
+	err = memdb.Put(ctx, ds.NewKey("/"+prefix+"/"+legacyHash2), legacyBytes2)
+	require.NoError(err)
+
+	// 2. Create Queue and call Load
+	bq := NewBatchQueue(memdb, prefix, 0)
+	err = bq.Load(ctx)
+	require.NoError(err)
+
+	// 3. Verify Queue State
+	// 1 valid + 2 legacy = 3 items total
+	require.Equal(3, bq.Size())
+
+	// Check order:
+	// Migrated (appended) items should come LAST.
+	// The valid key sorts before the migrated keys (migration uses nextAddSeq, which is > validSeq),
+	// so Next() should return the valid item first.
+	// Between legacy items, the order depends on how they were iterated from the DB and appended.
+	// Iteration order: L2 (11...), L1 (aa...)
+	// Process L2: append -> [Valid, L2]
+	// Process L1: append -> [Valid, L2, L1]
+
+	// So expected retrieval order: Valid, L2, L1.
+
+	// 1st item (Valid)
+	item1, err := bq.Next(ctx)
+	require.NoError(err)
+	require.Equal(validBatch.Transactions, item1.Transactions)
+
+	// 2nd item (L2)
+	item2, err := bq.Next(ctx)
+	require.NoError(err)
+	require.Equal(legacyBatch2.Transactions, item2.Transactions)
+
+	// 3rd item (L1)
+	item3, err := bq.Next(ctx)
+	require.NoError(err)
+	require.Equal(legacyBatch.Transactions, item3.Transactions)
+
+	// Queue empty
+	require.Equal(0, bq.Size())
+
+	// 4. Verify DB State
+	// Old legacy keys should be gone from the DB (under the prefix).
+	// New seq keys should exist.
+
+	q := query.Query{Prefix: "/" + prefix}
+	results, err := memdb.Query(ctx, q)
+	require.NoError(err)
+	defer results.Close()
+
+	keys := make([]string, 0)
+	for range results.Next() {
+		keys = append(keys, "found")
+	}
+
+	// We expect 0 keys in the DB because we called Next() 3 times, which deletes them from the DB as well.
+	require.Empty(keys, "Expected DB to be empty after processing all items")
+}
+
+func TestLoad_Migration_DBCheck(t *testing.T) {
+	require := require.New(t)
+	ctx := context.Background()
+	memdb := ds.NewMapDatastore()
+	prefix := "batching"
+
+	// Setup a valid existing item as well, to verify it persists alongside the migrated legacy item
+	validSeq := initialSeqNum
+	validKey := seqToKey(validSeq)
+	validBatch := coresequencer.Batch{Transactions: [][]byte{[]byte("valid-data")}}
+	pbValid := &pb.Batch{Txs: validBatch.Transactions}
+	validBytes, _ := proto.Marshal(pbValid)
+	err := memdb.Put(ctx, ds.NewKey("/"+prefix+"/"+validKey), validBytes)
+	require.NoError(err)
+
+	// Setup legacy key
+	legacyHash := "aabbccddeeff00112233445566778899aabbccddeeff00112233445566778899"
+	legacyBatch := coresequencer.Batch{Transactions: [][]byte{[]byte("legacy-data")}}
+	pbLegacy := &pb.Batch{Txs: legacyBatch.Transactions}
+	legacyBytes, _ := proto.Marshal(pbLegacy)
+	err = memdb.Put(ctx, ds.NewKey("/"+prefix+"/"+legacyHash), legacyBytes)
+	require.NoError(err)
+
+	// Load
+	bq := NewBatchQueue(memdb, prefix, 0)
+	require.NoError(bq.Load(ctx))
+
+	// Verify DB keys
+	q := query.Query{Prefix: "/" + prefix}
+	results, err := memdb.Query(ctx, q)
+	require.NoError(err)
+	defer results.Close()
+
+	foundLegacy := false
+	foundValid := false
+	for res := range results.Next() {
+		k := ds.NewKey(res.Key).Name()
+		if k == legacyHash {
+			t.Errorf("Legacy key %s still exists in DB", legacyHash)
+		}
+		// Check if it's the expected migrated key (initialSeqNum + 1).
+		// validSeq = initialSeqNum, so the legacy item should be migrated to initialSeqNum + 1.
+		expectedKey := seqToKey(initialSeqNum + 1)
+		if k == expectedKey {
+			foundLegacy = true
+			var pbCheck pb.Batch
+			_ = proto.Unmarshal(res.Value, &pbCheck)
+			require.Equal(legacyBatch.Transactions, pbCheck.Txs)
+		}
+		if k == validKey {
+			foundValid = true
+		}
+	}
+	require.True(foundValid, "Valid key should persist")
+	require.True(foundLegacy, "Migrated key not found in DB")
+}
diff --git a/sequencers/single/queue_test.go b/sequencers/single/queue_test.go
index e79c28e08..f344f27ec 100644
--- a/sequencers/single/queue_test.go
+++ b/sequencers/single/queue_test.go
@@ -228,6 +228,8 @@ func TestLoad_WithMixedData(t *testing.T) {
 	require.NotNil(bq)
 
 	// 1. Add valid batch data under the correct prefix
+	// Use valid hex sequence keys to ensure Load parses them correctly if needed
+	key1 := "8000000000000001"
 	validBatch1 := createTestBatch(t, 3)
 	hash1, err := validBatch1.Hash()
 	require.NoError(err)
@@ -235,9 +237,10 @@ func TestLoad_WithMixedData(t *testing.T) {
 	pbBatch1 := &pb.Batch{Txs: validBatch1.Transactions}
 	encodedBatch1, err := proto.Marshal(pbBatch1)
 	require.NoError(err)
-	err = rawDB.Put(ctx, ds.NewKey(queuePrefix+hexHash1), encodedBatch1)
+	err = rawDB.Put(ctx, ds.NewKey(queuePrefix+key1), encodedBatch1)
 	require.NoError(err)
 
+	key2 := "8000000000000002"
 	validBatch2 := createTestBatch(t, 5)
 	hash2, err := validBatch2.Hash()
 	require.NoError(err)
@@ -245,7 +248,7 @@ func TestLoad_WithMixedData(t *testing.T) {
 	pbBatch2 := &pb.Batch{Txs: validBatch2.Transactions}
 	encodedBatch2, err := proto.Marshal(pbBatch2)
 	require.NoError(err)
-	err = rawDB.Put(ctx, ds.NewKey(queuePrefix+hexHash2), encodedBatch2)
+	err = rawDB.Put(ctx, ds.NewKey(queuePrefix+key2), encodedBatch2)
 	require.NoError(err)
 
 	// 3. Add data outside the queue's prefix
@@ -258,8 +261,8 @@ func TestLoad_WithMixedData(t *testing.T) {
 
 	// Ensure all data is initially present in the raw DB
 	initialKeys := map[string]bool{
-		queuePrefix + hexHash1:  true,
-		queuePrefix + hexHash2:  true,
+		queuePrefix + key1:      true,
+		queuePrefix + key2:      true,
 		otherDataKey1.String(): true,
 		otherDataKey2.String(): true,
 	}
@@ -287,7 +290,7 @@ func TestLoad_WithMixedData(t *testing.T) {
 	loadedHashes := make(map[string]bool)
 	bq.mu.Lock()
 	for i := bq.head; i < len(bq.queue); i++ {
-		h, _ := bq.queue[i].Hash()
+		h, _ := bq.queue[i].Batch.Hash()
 		loadedHashes[hex.EncodeToString(h)] = true
 	}
 	bq.mu.Unlock()
@@ -303,6 +306,42 @@ func TestLoad_WithMixedData(t *testing.T) {
 	require.Equal([]byte("more data"), val)
 }
 
+func TestBatchQueue_Load_SetsSequencesProperly(t *testing.T) {
+	ctx := context.Background()
+	db := ds.NewMapDatastore()
+	prefix := "test-load-sequences"
+
+	// Build some persisted state with both AddBatch and Prepend so we have
+	// keys on both sides of the initialSeqNum.
+	q1 := NewBatchQueue(db, prefix, 0)
+	require.NoError(t, q1.Load(ctx))
+
+	require.NoError(t, q1.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("add-1")}})) // initialSeqNum
+	require.NoError(t, q1.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("add-2")}})) // initialSeqNum+1
+
+	require.NoError(t, q1.Prepend(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("pre-1")}})) // initialSeqNum-1
+	require.NoError(t, q1.Prepend(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("pre-2")}})) // initialSeqNum-2
+
+	// Simulate restart.
+	q2 := NewBatchQueue(db, prefix, 0)
+	require.NoError(t, q2.Load(ctx))
+
+	// After Load(), the sequences should be positioned to avoid collisions:
+	// - nextAddSeq should be (maxSeq + 1)
+	// - nextPrependSeq should be (minSeq - 1)
+	require.Equal(t, initialSeqNum+2, q2.nextAddSeq, "nextAddSeq should continue after the max loaded key")
+	require.Equal(t, initialSeqNum-3, q2.nextPrependSeq, "nextPrependSeq should continue before the min loaded key")
+
+	// Verify we actually use those sequences when persisting new items.
+	require.NoError(t, q2.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("add-after-load")}}))
+	_, err := q2.db.Get(ctx, ds.NewKey(seqToKey(initialSeqNum+2)))
+	require.NoError(t, err, "expected AddBatch after Load to persist using nextAddSeq key")
+
+	require.NoError(t, q2.Prepend(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("pre-after-load")}}))
+	_, err = q2.db.Get(ctx, ds.NewKey(seqToKey(initialSeqNum-3)))
+	require.NoError(t, err, "expected Prepend after Load to persist using nextPrependSeq key")
+}
+
 func TestConcurrency(t *testing.T) {
 	bq := setupTestQueue(t)
 	ctx := context.Background()