본문 바로가기
Backend2024년 7월 18일7분 읽기

Python Celery 비동기 작업 큐 실전 활용

YS
김영삼
조회 755

Celery란?

Celery는 Python 기반의 분산 비동기 작업 큐 시스템입니다. 웹 요청과 분리해야 하는 시간 소요 작업을 백그라운드에서 처리합니다.

기본 설정

from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Seoul',
    enable_utc=True,
    worker_concurrency=4,
    worker_max_tasks_per_child=100,
    result_expires=86400,
)

태스크 정의

from celery_app import app
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task(bind=True, max_retries=3)
def send_email(self, to, subject, body):
    try:
        logger.info(f"Sending email to {to}")
        smtp_client.send(to=to, subject=subject, body=body)
        return {'status': 'sent', 'to': to}
    except ConnectionError as exc:
        raise self.retry(exc=exc, countdown=10 * (2 ** self.request.retries))

@app.task(bind=True, rate_limit='10/m')
def process_image(self, image_path, operations):
    logger.info(f"Processing {image_path}")
    result = apply_operations(image_path, operations)
    return result

@app.task(time_limit=300, soft_time_limit=240)
def generate_report(report_id):
    try:
        data = fetch_report_data(report_id)
        pdf = render_pdf(data)
        upload_to_s3(pdf)
        return {'report_id': report_id, 'url': pdf.url}
    except SoftTimeLimitExceeded:
        logger.warning(f"Report {report_id} soft time limit exceeded")
        save_partial_result(report_id)
        raise

태스크 호출

result = send_email.delay('user@test.com', '제목', '본문')

result = send_email.apply_async(
    args=['user@test.com', '제목', '본문'],
    countdown=60, expires=3600, queue='email-queue'
)

print(result.id)
print(result.status)
print(result.get(timeout=30))

작업 체이닝과 그룹

from celery import chain, group, chord

workflow = chain(fetch_data.s(user_id), process_data.s(), send_notification.s(user_email))
result = workflow.apply_async()

batch = group(
    process_image.s(f'img_{i}.jpg', ['resize', 'watermark']) for i in range(100)
)
result = batch.apply_async()

workflow = chord(
    [fetch_partial_data.s(shard) for shard in shards],
    merge_results.s()
)
result = workflow.apply_async()

주기적 작업 (Celery Beat)

from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-expired-sessions': {
        'task': 'tasks.cleanup_sessions',
        'schedule': crontab(minute=0, hour='*/6'),
    },
    'generate-daily-report': {
        'task': 'tasks.generate_report',
        'schedule': crontab(minute=0, hour=9),
        'args': ('daily',),
    },
    'health-check': {
        'task': 'tasks.health_check',
        'schedule': 60.0,
    },
}
celery -A celery_app worker -l info -Q default,email-queue
celery -A celery_app beat -l info
celery -A celery_app flower --port=5555

프로덕션 운영 팁

설정권장값설명
task_acks_lateTrue작업 완료 후 ACK
task_reject_on_worker_lostTrue워커 사망 시 재큐잉
worker_prefetch_multiplier1공정 분배
worker_max_tasks_per_child100-1000메모리 누수 방지
  • 작업 내에서 DB 커넥션을 직접 관리하지 말고, 커넥션 풀을 사용하세요.
  • Flower나 Prometheus + Grafana로 큐 길이, 실패율, 처리 시간을 모니터링하세요.
  • 작업 단위로 독립된 큐를 사용하여, 느린 작업이 빠른 작업을 블로킹하지 않게 하세요.
  • task_acks_late=True와 멱등성(idempotency) 설계를 함께 적용하면 안정적인 작업 처리가 가능합니다.

댓글 0

아직 댓글이 없습니다.
Ctrl+Enter로 등록