Skip to content
SP StackPractices
intermediate By StackPractices

Sequential Convoy Pattern

Preserve message ordering for related messages in a distributed system by grouping them into ordered sequences and processing them one at a time through a single consumer.

Note: This guide follows English-language naming conventions and terminology standards common in international development teams. Examples use English identifiers and comments to maximize compatibility across codebases and tooling.

Sequential Convoy Pattern

Overview

The Sequential Convoy Pattern preserves the order of related messages in a distributed messaging system. While many message brokers guarantee ordering within a single partition or queue, they typically process unrelated messages in parallel for throughput. When messages have a causal relationship — for example, “create user” followed by “update user” — processing them out of order leads to inconsistent state.

This pattern groups related messages into a convoy (a sequence) and ensures they are processed by a single consumer in order. Unrelated convoys can still be processed in parallel, preserving both correctness and throughput.

When to Use

  • Messages for the same entity must be processed in the order they were produced
  • Event sourcing where events for an aggregate must be applied sequentially
  • Order processing pipelines where status transitions depend on prior states
  • Inventory systems where stock movements must be applied chronologically
  • Multi-step workflows where step N cannot begin until step N-1 completes

When to Avoid

  • Messages have no causal relationship — parallel processing is simpler and faster
  • Strict ordering is not required (e.g., independent analytics events)
  • The system can tolerate eventual consistency without ordering guarantees
  • Message volumes per convoy are so high that single-consumer processing creates a bottleneck

Solution

Python (Kafka with Partition Key)

from kafka import KafkaProducer, KafkaConsumer
from kafka.partitioner import DefaultPartitioner
import json
import time

class OrderedMessageProducer:
    """Produce messages that maintain ordering per entity"""

    def __init__(self, bootstrap_servers):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            # Partition by entity ID ensures all messages for the same entity
            # go to the same partition, preserving order
            partitioner=lambda key, partitions, topic: (
                hash(key) % len(partitions) if key else 0
            )
        )

    def send_event(self, entity_id: str, event_type: str, payload: dict):
        """Send event with entity_id as partition key"""
        message = {
            'entity_id': entity_id,
            'event_type': event_type,
            'payload': payload,
            'timestamp': time.time(),
            'sequence_number': self._get_next_sequence(entity_id)
        }
        # Using entity_id as key ensures all events for this entity
        # land in the same partition, preserving order
        self.producer.send(
            'entity-events',
            key=entity_id.encode('utf-8'),
            value=message
        )
        self.producer.flush()

    def _get_next_sequence(self, entity_id: str) -> int:
        # In production, use a database or Redis counter
        # This is a simplified example
        return int(time.time() * 1000)

class SequentialConvoyConsumer:
    """Process messages in order per entity"""

    def __init__(self, bootstrap_servers, topic, group_id):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',
            # Critical: max one consumer per partition to preserve order
            max_poll_records=1,
            enable_auto_commit=False
        )
        self.pending_sequences: dict = {}
        self.last_processed: dict = {}

    def process_messages(self):
        for message in self.consumer:
            event = json.loads(message.value)
            entity_id = event['entity_id']
            seq_num = event['sequence_number']

            # Ensure in-order processing
            expected_seq = self.last_processed.get(entity_id, 0) + 1

            if seq_num == expected_seq:
                self._process_event(event)
                self.last_processed[entity_id] = seq_num
                self._check_pending(entity_id)
            elif seq_num > expected_seq:
                # Out of order — buffer and wait for missing messages
                self.pending_sequences.setdefault(entity_id, {})[seq_num] = event
                print(f"Buffering out-of-order message {seq_num} for {entity_id}")
            else:
                # Duplicate or already processed
                print(f"Skipping duplicate/late message {seq_num} for {entity_id}")

            self.consumer.commit()

    def _process_event(self, event):
        print(f"Processing {event['event_type']} for {event['entity_id']}")
        # Business logic here

    def _check_pending(self, entity_id):
        """Process any buffered messages that are now in order"""
        pending = self.pending_sequences.get(entity_id, {})
        expected = self.last_processed.get(entity_id, 0) + 1

        while expected in pending:
            event = pending.pop(expected)
            self._process_event(event)
            self.last_processed[entity_id] = expected
            expected += 1

# Usage
producer = OrderedMessageProducer(['localhost:9092'])
producer.send_event('user-123', 'created', {'name': 'Alice'})
producer.send_event('user-123', 'updated', {'name': 'Alice Smith'})
producer.send_event('user-123', 'deleted', {})

consumer = SequentialConvoyConsumer(['localhost:9092'], 'entity-events', 'convoy-group')
consumer.process_messages()

Java (Azure Service Bus Sessions)

import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class SequentialConvoyServiceBus {

    private static final String CONNECTION_STRING = "<connection-string>";
    private static final String QUEUE_NAME = "ordered-queue";

    // Producer: uses session ID to group ordered messages
    public static class OrderedProducer {
        private final ServiceBusSenderClient sender;

        public OrderedProducer() {
            this.sender = new ServiceBusClientBuilder()
                .connectionString(CONNECTION_STRING)
                .sender()
                .queueName(QUEUE_NAME)
                .buildClient();
        }

        public void sendOrderedEvents(String entityId, List<DomainEvent> events) {
            for (int i = 0; i < events.size(); i++) {
                ServiceBusMessage message = new ServiceBusMessage(
                    serializeEvent(events.get(i))
                );
                // Session ID ensures all messages with the same ID
                // are processed by the same consumer in order
                message.setSessionId(entityId);
                message.setApplicationProperty("sequenceNumber", i);

                sender.sendMessage(message);
            }
        }
    }

    // Consumer: processes one session at a time, preserving order
    public static class OrderedConsumer {

        public void startProcessing() {
            ServiceBusProcessorClient processor = new ServiceBusClientBuilder()
                .connectionString(CONNECTION_STRING)
                .processor()
                .queueName(QUEUE_NAME)
                .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
                .processMessage(this::processMessage)
                .processError(this::handleError)
                // Prefetch count 1 ensures strict ordering within session
                .prefetchCount(1)
                .buildProcessorClient();

            processor.start();
        }

        private void processMessage(ServiceBusReceivedMessageContext context) {
            ServiceBusReceivedMessage message = context.getMessage();
            String sessionId = message.getSessionId();
            int sequenceNumber = (int) message.getApplicationProperties()
                .get("sequenceNumber");

            // Process in order — Service Bus guarantees this
            // because only one processor handles a session at a time
            DomainEvent event = deserializeEvent(message.getBody().toString());
            applyEvent(sessionId, event);

            context.complete();
        }

        private void handleError(ServiceBusErrorContext context) {
            System.err.println("Error: " + context.getException().getMessage());
        }
    }
}

JavaScript (Redis Streams with Consumer Groups)

const Redis = require('ioredis');

class SequentialConvoyProcessor {
    constructor(redis, streamKey) {
        this.redis = redis;
        this.streamKey = streamKey;
        this.groupName = 'convoy-processors';
    }

    async initialize() {
        try {
            await this.redis.xgroup('CREATE', this.streamKey,
                this.groupName, '0', 'MKSTREAM');
        } catch (err) {
            if (!err.message.includes('already exists')) throw err;
        }
    }

    async produceEvent(entityId, eventType, payload) {
        const sequence = await this.redis.incr(`seq:${entityId}`);

        await this.redis.xadd(this.streamKey, '*',
            'entityId', entityId,
            'sequence', sequence.toString(),
            'eventType', eventType,
            'payload', JSON.stringify(payload)
        );
    }

    async consumeOrdered(consumerId) {
        const results = await this.redis.xreadgroup(
            'GROUP', this.groupName, consumerId,
            'COUNT', 1,
            'BLOCK', 5000,
            'STREAMS', this.streamKey, '>'
        );

        if (!results || results.length === 0) return null;

        const [[, messages]] = results;
        const [id, fields] = messages[0];

        const event = this.parseFields(fields);
        const entityId = event.entityId;
        const sequence = parseInt(event.sequence);

        // Check ordering
        const lastProcessed = await this.redis.get(`last:${entityId}`);
        const expected = lastProcessed ? parseInt(lastProcessed) + 1 : 1;

        if (sequence === expected) {
            await this.processEvent(event);
            await this.redis.set(`last:${entityId}`, sequence);
            await this.redis.xack(this.streamKey, this.groupName, id);
            return event;
        } else if (sequence > expected) {
            // Requeue — will be retried when sequence is complete
            console.log(`Out of order: expected ${expected}, got ${sequence}`);
            return null;
        } else {
            // Duplicate
            await this.redis.xack(this.streamKey, this.groupName, id);
            return null;
        }
    }

    parseFields(fields) {
        const obj = {};
        for (let i = 0; i < fields.length; i += 2) {
            obj[fields[i]] = fields[i + 1];
        }
        return obj;
    }

    async processEvent(event) {
        console.log(`Processing ${event.eventType} for ${event.entityId}`);
        // Business logic
    }
}

Explanation

The pattern relies on two key mechanisms:

  1. Partitioning by entity ID: Messages for the same entity are routed to the same partition/queue/session. This is done using a partition key (Kafka), session ID (Service Bus), or entity field (Redis).

  2. Single consumer per partition: Only one consumer processes messages from a given partition at a time. This prevents two consumers from handling different messages for the same entity simultaneously, which would violate ordering.

The trade-off is reduced parallelism per entity — all messages for user-123 must be processed sequentially. However, messages for user-456 can be processed in parallel on a different partition.

Variants

VariantMechanismBest For
Kafka partition keyHash-based partition assignmentHigh throughput, simple ordering
Service Bus sessionsSession-aware load balancingCloud-native, exactly-once per session
RabbitMQ single active consumerExclusive consumer per queueSimple queue-based ordering
Database sequence tableOptimistic locking on sequence numbersSystems without message broker ordering
Sagas with orchestrationExplicit step ordering in workflow engineComplex multi-step business processes

Best Practices

  • Use a deterministic partition key. The entity ID should consistently map to the same partition. Changing the key invalidates ordering.
  • Monitor partition skew. If one entity generates 90% of messages, its partition becomes a bottleneck. Consider splitting hot entities.
  • Handle missing messages gracefully. If sequence N never arrives, the convoy stalls. Implement timeouts and alerts.
  • Keep convoys small. Long-running convoys hold up new messages. Design for short, bounded sequences.
  • Idempotent processing within convoys. Even with ordering, retries can cause duplicates. Make individual operations idempotent.

Common Mistakes

  • Changing partition keys. Rebalancing Kafka partitions changes which consumer handles which entity, violating ordering assumptions.
  • Multiple consumers per partition. Two consumers reading the same partition will process messages in parallel for the same entity.
  • Not handling sequence gaps. A lost message in a sequence blocks all subsequent messages forever.
  • Overly large convoys. A convoy that processes thousands of messages for one entity creates a hotspot.
  • Ignoring producer retries. A retried message may be reordered relative to a newer message if not using the same partition key.

Real-World Examples

Kafka Partitioning

Kafka guarantees order within a partition. By using the user ID as the partition key, all events for a user are ordered. Uber uses this for trip events: trip-created, driver-assigned, trip-started, trip-completed must be processed in order for fare calculation.

Azure Service Bus Sessions

Service Bus sessions provide FIFO ordering within a session. An e-commerce platform uses sessions per shopping cart: item-added, quantity-changed, checkout-initiated, payment-received must be processed sequentially to maintain cart consistency.

Event Store DB

Event Store DB uses optimistic concurrency control on streams. Each aggregate (e.g., an order) is a stream, and events are appended with expected version numbers. Concurrent writers fail if the stream has been modified, preserving ordering.

Frequently Asked Questions

Q: How is this different from just using a single queue? A: A single queue forces ALL messages to be sequential, destroying throughput. The convoy pattern only sequences messages for the same entity; different entities process in parallel.

Q: What happens if a message in the sequence is lost? A: The convoy stalls at the gap. Solutions: implement a timeout with alerting, use a dead letter queue for gaps, or design messages to not depend on every intermediate step.

Q: Can I have multiple consumers for fault tolerance? A: Yes — Kafka consumer groups balance partitions across consumers, but each partition is owned by exactly one consumer at a time. If a consumer fails, its partitions are reassigned to another.

Q: Does this pattern work across regions? A: Cross-region ordering is extremely difficult. Most systems accept eventual consistency across regions and use the convoy pattern within a single region.

Q: How do I handle a consumer that is slow but correct? A: Slow consumers delay all messages in their partitions. Solutions: split the entity into sub-entities with different keys, or use a dedicated high-priority consumer for the slow partition.