본문 바로가기
Backend2024년 5월 13일7분 읽기

Redis Streams로 이벤트 드리븐 파이프라인 구축

YS
김영삼
조회 563

Redis Streams란?

Redis Streams는 Redis 5.0에서 도입된 로그 기반 데이터 구조입니다. Kafka와 유사한 개념이지만, 훨씬 가볍고 설정이 간단합니다. 소비자 그룹(Consumer Group)을 지원하여 여러 워커가 메시지를 분산 처리할 수 있습니다.

기본 명령어

XADD mystream * name "order" action "created" orderId "123"
XRANGE mystream - +
XREAD BLOCK 5000 STREAMS mystream $
XINFO STREAM mystream
XLEN mystream

Consumer Group 설정

XGROUP CREATE mystream order-processors 0
XREADGROUP GROUP order-processors worker1 COUNT 10 BLOCK 5000 STREAMS mystream >
XACK mystream order-processors 1234567890-0
XPENDING mystream order-processors

Node.js에서 이벤트 파이프라인 구현

Producer (이벤트 발행)

const Redis = require('ioredis');
const redis = new Redis();

async function publishEvent(stream, eventType, data) {
  const id = await redis.xadd(
    stream, '*',
    'type', eventType,
    'data', JSON.stringify(data),
    'timestamp', Date.now().toString()
  );
  console.log(\`Published \${eventType} to \${stream}: \${id}\`);
  return id;
}

await publishEvent('orders', 'order.created', {
  orderId: 'ORD-001',
  userId: 42,
  items: [{ productId: 'P1', qty: 2, price: 29900 }],
  total: 59800
});

Consumer (이벤트 소비)

const Redis = require('ioredis');
const redis = new Redis();

async function initConsumerGroup(stream, group) {
  try {
    await redis.xgroup('CREATE', stream, group, '0', 'MKSTREAM');
  } catch (err) {
    if (!err.message.includes('BUSYGROUP')) throw err;
  }
}

async function consumeEvents(stream, group, consumer, handler) {
  await initConsumerGroup(stream, group);
  while (true) {
    try {
      const results = await redis.xreadgroup(
        'GROUP', group, consumer,
        'COUNT', 10, 'BLOCK', 5000,
        'STREAMS', stream, '>'
      );
      if (!results) continue;
      for (const [, messages] of results) {
        for (const [id, fields] of messages) {
          const event = {
            id,
            type: fields[1],
            data: JSON.parse(fields[3]),
            timestamp: parseInt(fields[5])
          };
          try {
            await handler(event);
            await redis.xack(stream, group, id);
          } catch (err) {
            console.error(\`Failed to process \${id}:\`, err);
          }
        }
      }
    } catch (err) {
      console.error('Consumer error:', err);
      await new Promise(r => setTimeout(r, 1000));
    }
  }
}

미처리 메시지 재처리

async function claimStaleMessages(stream, group, consumer, minIdleMs) {
  const pending = await redis.xpending(stream, group, '-', '+', 100);
  for (const [id, owner, idle] of pending) {
    if (idle > minIdleMs) {
      await redis.xclaim(stream, group, consumer, minIdleMs, id);
      console.log(\`Claimed stale message \${id} from \${owner}\`);
    }
  }
}

Kafka 대비 Redis Streams 비교

항목Redis StreamsApache Kafka
설치 복잡도낮음높음 (Zookeeper 등)
처리량중간 (~수십만/s)높음 (~수백만/s)
영속성RDB/AOF디스크 기반
소비자 그룹지원지원
메시지 보관수동 TRIM보존 기간 설정
적합한 규모소~중규모대규모

운영 팁

  • MAXLEN으로 스트림 크기를 제한하여 메모리 사용량을 관리하세요: XADD stream MAXLEN ~ 10000 * ...
  • PEL(Pending Entries List)을 주기적으로 모니터링하여 처리되지 않은 메시지를 확인하세요.
  • XCLAIM을 활용하여 장애가 발생한 소비자의 미처리 메시지를 다른 소비자에게 재할당하세요.
  • 소규모~중규모 이벤트 파이프라인에서는 Kafka 대신 Redis Streams가 운영 부담이 훨씬 적습니다.

댓글 0

아직 댓글이 없습니다.
Ctrl+Enter로 등록