Redis Streams란?
Redis Streams는 Redis 5.0에서 도입된 로그 기반 데이터 구조입니다. Kafka와 유사한 개념이지만, 훨씬 가볍고 설정이 간단합니다. 소비자 그룹(Consumer Group)을 지원하여 여러 워커가 메시지를 분산 처리할 수 있습니다.
기본 명령어
XADD mystream * name "order" action "created" orderId "123"
XRANGE mystream - +
XREAD BLOCK 5000 STREAMS mystream $
XINFO STREAM mystream
XLEN mystream
Consumer Group 설정
XGROUP CREATE mystream order-processors 0
XREADGROUP GROUP order-processors worker1 COUNT 10 BLOCK 5000 STREAMS mystream >
XACK mystream order-processors 1234567890-0
XPENDING mystream order-processors
Node.js에서 이벤트 파이프라인 구현
Producer (이벤트 발행)
const Redis = require('ioredis');
const redis = new Redis();
async function publishEvent(stream, eventType, data) {
const id = await redis.xadd(
stream, '*',
'type', eventType,
'data', JSON.stringify(data),
'timestamp', Date.now().toString()
);
console.log(\`Published \${eventType} to \${stream}: \${id}\`);
return id;
}
await publishEvent('orders', 'order.created', {
orderId: 'ORD-001',
userId: 42,
items: [{ productId: 'P1', qty: 2, price: 29900 }],
total: 59800
});
Consumer (이벤트 소비)
const Redis = require('ioredis');
const redis = new Redis();
async function initConsumerGroup(stream, group) {
try {
await redis.xgroup('CREATE', stream, group, '0', 'MKSTREAM');
} catch (err) {
if (!err.message.includes('BUSYGROUP')) throw err;
}
}
async function consumeEvents(stream, group, consumer, handler) {
await initConsumerGroup(stream, group);
while (true) {
try {
const results = await redis.xreadgroup(
'GROUP', group, consumer,
'COUNT', 10, 'BLOCK', 5000,
'STREAMS', stream, '>'
);
if (!results) continue;
for (const [, messages] of results) {
for (const [id, fields] of messages) {
const event = {
id,
type: fields[1],
data: JSON.parse(fields[3]),
timestamp: parseInt(fields[5])
};
try {
await handler(event);
await redis.xack(stream, group, id);
} catch (err) {
console.error(\`Failed to process \${id}:\`, err);
}
}
}
} catch (err) {
console.error('Consumer error:', err);
await new Promise(r => setTimeout(r, 1000));
}
}
}
미처리 메시지 재처리
async function claimStaleMessages(stream, group, consumer, minIdleMs) {
const pending = await redis.xpending(stream, group, '-', '+', 100);
for (const [id, owner, idle] of pending) {
if (idle > minIdleMs) {
await redis.xclaim(stream, group, consumer, minIdleMs, id);
console.log(\`Claimed stale message \${id} from \${owner}\`);
}
}
}
Kafka 대비 Redis Streams 비교
| 항목 | Redis Streams | Apache Kafka |
|---|---|---|
| 설치 복잡도 | 낮음 | 높음 (Zookeeper 등) |
| 처리량 | 중간 (~수십만/s) | 높음 (~수백만/s) |
| 영속성 | RDB/AOF | 디스크 기반 |
| 소비자 그룹 | 지원 | 지원 |
| 메시지 보관 | 수동 TRIM | 보존 기간 설정 |
| 적합한 규모 | 소~중규모 | 대규모 |
운영 팁
- MAXLEN으로 스트림 크기를 제한하여 메모리 사용량을 관리하세요:
XADD stream MAXLEN ~ 10000 * ... - PEL(Pending Entries List)을 주기적으로 모니터링하여 처리되지 않은 메시지를 확인하세요.
- XCLAIM을 활용하여 장애가 발생한 소비자의 미처리 메시지를 다른 소비자에게 재할당하세요.
- 소규모~중규모 이벤트 파이프라인에서는 Kafka 대신 Redis Streams가 운영 부담이 훨씬 적습니다.
댓글 0