Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 115 additions & 45 deletions sequencers/single/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package single

import (
"context"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"strconv"
"sync"

ds "github.com/ipfs/go-datastore"
Expand All @@ -20,26 +22,52 @@ import (
// ErrQueueFull is returned when the batch queue has reached its maximum size
var ErrQueueFull = errors.New("batch queue is full")

// initialSeqNum is the starting sequence number for new queues.
// It is set to the middle of the uint64 range to allow for both
// appending (incrementing) and prepending (decrementing) transactions.
const initialSeqNum = uint64(0x8000000000000000)

// queuedItem holds a batch and its associated persistence key
type queuedItem struct {
Batch coresequencer.Batch
Key string
}

// BatchQueue implements a persistent queue for transaction batches
type BatchQueue struct {
queue []coresequencer.Batch
queue []queuedItem
head int // index of the first element in the queue
maxQueueSize int // maximum number of batches allowed in queue (0 = unlimited)
mu sync.Mutex
db ds.Batching

// Sequence numbers for generating new keys
nextAddSeq uint64
nextPrependSeq uint64

mu sync.Mutex
db ds.Batching
}

// NewBatchQueue creates a new BatchQueue with the specified maximum size.
// If maxSize is 0, the queue will be unlimited.
func NewBatchQueue(db ds.Batching, prefix string, maxSize int) *BatchQueue {
return &BatchQueue{
queue: make([]coresequencer.Batch, 0),
head: 0,
maxQueueSize: maxSize,
db: store.NewPrefixKVStore(db, prefix),
queue: make([]queuedItem, 0),
head: 0,
maxQueueSize: maxSize,
db: store.NewPrefixKVStore(db, prefix),
nextAddSeq: initialSeqNum,
nextPrependSeq: initialSeqNum - 1,
}
}

// seqToKey converts a sequence number to a hex-encoded big-endian key.
// We use big-endian so that lexicographical sort order matches numeric order.
func seqToKey(seq uint64) string {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, seq)
return hex.EncodeToString(b)
}

// AddBatch adds a new transaction to the queue and writes it to the WAL.
// Returns ErrQueueFull if the queue has reached its maximum size.
func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) error {
Expand All @@ -53,12 +81,14 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e
return ErrQueueFull
}

if err := bq.persistBatch(ctx, batch); err != nil {
key := seqToKey(bq.nextAddSeq)
if err := bq.persistBatch(ctx, batch, key); err != nil {
return err
}
bq.nextAddSeq++

// Then add to in-memory queue
bq.queue = append(bq.queue, batch)
bq.queue = append(bq.queue, queuedItem{Batch: batch, Key: key})

return nil
}
Expand All @@ -68,25 +98,27 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e
// The batch is persisted to the DB to ensure durability in case of crashes.
//
// NOTE: Prepend intentionally bypasses the maxQueueSize limit to ensure high-priority
// transactions can always be re-queued. This means the effective queue size may temporarily
// exceed the configured maximum when Prepend is used. This is by design to prevent loss
// of transactions that have already been accepted but couldn't fit in the current batch.
// transactions can always be re-queued.
func (bq *BatchQueue) Prepend(ctx context.Context, batch coresequencer.Batch) error {
bq.mu.Lock()
defer bq.mu.Unlock()

if err := bq.persistBatch(ctx, batch); err != nil {
key := seqToKey(bq.nextPrependSeq)
if err := bq.persistBatch(ctx, batch, key); err != nil {
return err
}
bq.nextPrependSeq--

item := queuedItem{Batch: batch, Key: key}

// Then add to in-memory queue
// If we have room before head, use it
if bq.head > 0 {
bq.head--
bq.queue[bq.head] = batch
bq.queue[bq.head] = item
} else {
// Need to expand the queue at the front
bq.queue = append([]coresequencer.Batch{batch}, bq.queue...)
bq.queue = append([]queuedItem{item}, bq.queue...)
}

return nil
Expand All @@ -102,8 +134,9 @@ func (bq *BatchQueue) Next(ctx context.Context) (*coresequencer.Batch, error) {
return &coresequencer.Batch{Transactions: nil}, nil
}

batch := bq.queue[bq.head]
bq.queue[bq.head] = coresequencer.Batch{} // Release memory for the dequeued element
item := bq.queue[bq.head]
// Release memory for the dequeued element
bq.queue[bq.head] = queuedItem{}
bq.head++

// Compact when head gets too large to prevent memory leaks
Expand All @@ -112,59 +145,102 @@ func (bq *BatchQueue) Next(ctx context.Context) (*coresequencer.Batch, error) {
// frequent compactions on small queues
if bq.head > len(bq.queue)/2 && bq.head > 100 {
remaining := copy(bq.queue, bq.queue[bq.head:])
// Zero out the rest of the slice to release memory
// Zero out the rest of the slice
for i := remaining; i < len(bq.queue); i++ {
bq.queue[i] = coresequencer.Batch{}
bq.queue[i] = queuedItem{}
}
bq.queue = bq.queue[:remaining]
bq.head = 0
}

hash, err := batch.Hash()
if err != nil {
return &coresequencer.Batch{Transactions: nil}, err
}
key := hex.EncodeToString(hash)

// Delete the batch from the WAL since it's been processed
err = bq.db.Delete(ctx, ds.NewKey(key))
if err != nil {
// Use the stored key directly
if err := bq.db.Delete(ctx, ds.NewKey(item.Key)); err != nil {
// Log the error but continue
fmt.Printf("Error deleting processed batch: %v\n", err)
}

return &batch, nil
return &item.Batch, nil
}

// Load reloads all batches from WAL file into the in-memory queue after a crash or restart
func (bq *BatchQueue) Load(ctx context.Context) error {
bq.mu.Lock()
defer bq.mu.Unlock()

// Clear the current queue
bq.queue = make([]coresequencer.Batch, 0)
// Clear the current queue and reset sequences
bq.queue = make([]queuedItem, 0)
bq.head = 0
bq.nextAddSeq = initialSeqNum
bq.nextPrependSeq = initialSeqNum - 1

q := query.Query{}
q := query.Query{
Orders: []query.Order{query.OrderByKey{}},
}
results, err := bq.db.Query(ctx, q)
if err != nil {
return fmt.Errorf("error querying datastore: %w", err)
}
defer results.Close()

// Load each batch
var legacyItems []queuedItem
for result := range results.Next() {
if result.Error != nil {
fmt.Printf("Error reading entry from datastore: %v\n", result.Error)
continue
}
pbBatch := &pb.Batch{}
err := proto.Unmarshal(result.Value, pbBatch)
// We care about the last part of the key (the sequence number)
// ds.Key usually has a leading slash.
keyName := ds.NewKey(result.Key).Name()

var pbBatch pb.Batch
err := proto.Unmarshal(result.Value, &pbBatch)
if err != nil {
fmt.Printf("Error decoding batch for key '%s': %v. Skipping entry.\n", result.Key, err)
fmt.Printf("Error decoding batch for key '%s': %v. Skipping entry.\n", keyName, err)
continue
}

batch := coresequencer.Batch{Transactions: pbBatch.Txs}

// Check if key is valid hex sequence number (16 hex chars)
// We use strict 16 check because seqToKey always produces 16 hex chars.
isValid := false
if len(keyName) == 16 {
if seq, err := strconv.ParseUint(keyName, 16, 64); err == nil {
isValid = true
if seq >= bq.nextAddSeq {
bq.nextAddSeq = seq + 1
}
if seq <= bq.nextPrependSeq {
bq.nextPrependSeq = seq - 1
}
}
}
if isValid {
bq.queue = append(bq.queue, queuedItem{Batch: batch, Key: keyName})
} else {
legacyItems = append(legacyItems, queuedItem{Batch: batch, Key: result.Key})
}
}
if len(legacyItems) == 0 {
return nil
}
fmt.Printf("Found %d legacy items to migrate...\n", len(legacyItems))

for _, item := range legacyItems {
newKeyName := seqToKey(bq.nextAddSeq)

if err := bq.persistBatch(ctx, item.Batch, newKeyName); err != nil {
fmt.Printf("Failed to migrate legacy item %s: %v\n", item.Key, err)
continue
}
bq.queue = append(bq.queue, coresequencer.Batch{Transactions: pbBatch.Txs})

if err := bq.db.Delete(ctx, ds.NewKey(item.Key)); err != nil {
fmt.Printf("Failed to delete legacy key %s after migration: %v\n", item.Key, err)
}

bq.queue = append(bq.queue, queuedItem{Batch: item.Batch, Key: newKeyName})
bq.nextAddSeq++
}

return nil
Expand All @@ -178,14 +254,8 @@ func (bq *BatchQueue) Size() int {
return len(bq.queue) - bq.head
}

// persistBatch persists a batch to the datastore
func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batch) error {
hash, err := batch.Hash()
if err != nil {
return err
}
key := hex.EncodeToString(hash)

// persistBatch persists a batch to the datastore with the given key
func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batch, key string) error {
pbBatch := &pb.Batch{
Txs: batch.Transactions,
}
Expand All @@ -195,7 +265,7 @@ func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batc
return err
}

// First write to DB for durability
// Write to DB
if err := bq.db.Put(ctx, ds.NewKey(key), encodedBatch); err != nil {
return err
}
Expand Down
Loading
Loading