Celery란?
Celery는 Python 기반의 분산 비동기 작업 큐 시스템입니다. 웹 요청과 분리해야 하는 시간 소요 작업을 백그라운드에서 처리합니다.
기본 설정
from celery import Celery
app = Celery('myapp', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Asia/Seoul',
enable_utc=True,
worker_concurrency=4,
worker_max_tasks_per_child=100,
result_expires=86400,
)
태스크 정의
from celery_app import app
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task(bind=True, max_retries=3)
def send_email(self, to, subject, body):
try:
logger.info(f"Sending email to {to}")
smtp_client.send(to=to, subject=subject, body=body)
return {'status': 'sent', 'to': to}
except ConnectionError as exc:
raise self.retry(exc=exc, countdown=10 * (2 ** self.request.retries))
@app.task(bind=True, rate_limit='10/m')
def process_image(self, image_path, operations):
logger.info(f"Processing {image_path}")
result = apply_operations(image_path, operations)
return result
@app.task(time_limit=300, soft_time_limit=240)
def generate_report(report_id):
try:
data = fetch_report_data(report_id)
pdf = render_pdf(data)
upload_to_s3(pdf)
return {'report_id': report_id, 'url': pdf.url}
except SoftTimeLimitExceeded:
logger.warning(f"Report {report_id} soft time limit exceeded")
save_partial_result(report_id)
raise
태스크 호출
result = send_email.delay('user@test.com', '제목', '본문')
result = send_email.apply_async(
args=['user@test.com', '제목', '본문'],
countdown=60, expires=3600, queue='email-queue'
)
print(result.id)
print(result.status)
print(result.get(timeout=30))
작업 체이닝과 그룹
from celery import chain, group, chord
workflow = chain(fetch_data.s(user_id), process_data.s(), send_notification.s(user_email))
result = workflow.apply_async()
batch = group(
process_image.s(f'img_{i}.jpg', ['resize', 'watermark']) for i in range(100)
)
result = batch.apply_async()
workflow = chord(
[fetch_partial_data.s(shard) for shard in shards],
merge_results.s()
)
result = workflow.apply_async()
주기적 작업 (Celery Beat)
from celery.schedules import crontab
app.conf.beat_schedule = {
'cleanup-expired-sessions': {
'task': 'tasks.cleanup_sessions',
'schedule': crontab(minute=0, hour='*/6'),
},
'generate-daily-report': {
'task': 'tasks.generate_report',
'schedule': crontab(minute=0, hour=9),
'args': ('daily',),
},
'health-check': {
'task': 'tasks.health_check',
'schedule': 60.0,
},
}
celery -A celery_app worker -l info -Q default,email-queue
celery -A celery_app beat -l info
celery -A celery_app flower --port=5555
프로덕션 운영 팁
| 설정 | 권장값 | 설명 |
|---|---|---|
| task_acks_late | True | 작업 완료 후 ACK |
| task_reject_on_worker_lost | True | 워커 사망 시 재큐잉 |
| worker_prefetch_multiplier | 1 | 공정 분배 |
| worker_max_tasks_per_child | 100-1000 | 메모리 누수 방지 |
- 작업 내에서 DB 커넥션을 직접 관리하지 말고, 커넥션 풀을 사용하세요.
- Flower나 Prometheus + Grafana로 큐 길이, 실패율, 처리 시간을 모니터링하세요.
- 작업 단위로 독립된 큐를 사용하여, 느린 작업이 빠른 작업을 블로킹하지 않게 하세요.
- task_acks_late=True와 멱등성(idempotency) 설계를 함께 적용하면 안정적인 작업 처리가 가능합니다.
댓글 0