[파이썬 100강] 97강. concurrent.futures 고급 패턴으로 대량 작업 파이프라인 운영하기
대량 작업을 처리할 때 가장 흔한 오해는 "스레드 풀만 쓰면 성능이 자동으로 좋아진다"는 믿음입니다. 실제로는 작업 성격(CPU 바운드/IO 바운드), 실패 처리 방식, 타임아웃 정책, 재시도 규칙을 같이 설계하지 않으면 오히려 느려지거나 장애를 더 크게 만들 수 있습니다. 이번 강의에서는 concurrent.futures를 단순 병렬 실행 도구가 아니라 운영 가능한 파이프라인 구성 도구로 다루겠습니다.
핵심 목표는 세 가지입니다. 첫째, ThreadPoolExecutor와 ProcessPoolExecutor를 언제 선택해야 하는지 기준을 명확히 잡습니다. 둘째, submit, as_completed, result(timeout=...)를 조합해 실패를 통제하는 패턴을 익힙니다. 셋째, 실무에서 바로 가져다 쓸 수 있는 "입력 검증-실행-집계-리포트" 흐름을 만듭니다.
핵심 개념
concurrent.futures는 "작업 제출"과 "결과 수거"를 분리해서, 병렬 실행을 구조적으로 관리하게 해 줍니다.ThreadPoolExecutor는 네트워크/파일 I-O 같은 대기 시간이 긴 작업에 유리하고,ProcessPoolExecutor는 CPU 계산처럼 GIL 영향을 크게 받는 작업에 유리합니다.- 실무 품질은 병렬화 자체보다 예외·타임아웃·부분 실패를 어떻게 다루는지에서 갈립니다.
많은 사람이 병렬화 코드를 처음 쓸 때 executor.map() 한 줄로 끝내고 만족합니다. 그런데 운영 환경에서는 작업마다 소요 시간이 다르고, 일부는 실패하며, 일부는 타임아웃이 납니다. 이때 map()만 쓰면 어느 작업이 실패했는지, 어떤 입력이 병목인지, 실패율이 얼마인지 추적이 어려워집니다. 그래서 고급 패턴에서는 보통 submit()으로 Future를 명시적으로 만들고, as_completed()로 완료 순서대로 수거하며, 각 Future에서 성공/실패 메타데이터를 남깁니다. 이렇게 해 두면 장애가 나도 "어떤 입력에서, 어떤 에러가, 얼마나 자주" 발생하는지 빠르게 확인할 수 있습니다.
기본 사용
예제 1) IO 바운드 작업을 ThreadPoolExecutor로 처리
>>> import time
>>> from concurrent.futures import ThreadPoolExecutor, as_completed
>>>
>>> def fetch_like_io(task_id: int) -> dict:
... # 네트워크 대기처럼 흉내 내기
... time.sleep(0.2)
... return {"task_id": task_id, "status": "ok"}
...
>>> tasks = [1, 2, 3, 4, 5]
>>> results = []
>>> with ThreadPoolExecutor(max_workers=3) as ex:
... futures = [ex.submit(fetch_like_io, t) for t in tasks]
... for fut in as_completed(futures):
... results.append(fut.result())
...
>>> len(results)
5
>>> all(r["status"] == "ok" for r in results)
True
해설:
as_completed()를 쓰면 "입력 순서"가 아니라 "완료 순서"로 결과를 받습니다. 느린 작업 하나 때문에 전체 진행이 막히는 느낌을 줄일 수 있습니다.- 결과를 리스트에 그냥 넣기보다, 실제 업무에서는
task_id, 시작/종료 시각, 예외 여부를 함께 기록하면 이후 분석이 쉬워집니다.
예제 2) CPU 바운드 작업을 ProcessPoolExecutor로 처리
>>> from concurrent.futures import ProcessPoolExecutor
>>>
>>> def heavy_sum(n: int) -> int:
... total = 0
... for i in range(n):
... total += i * i
... return total
...
>>> inputs = [50_000, 60_000, 70_000]
>>> with ProcessPoolExecutor(max_workers=2) as ex:
... out = list(ex.map(heavy_sum, inputs))
...
>>> len(out)
3
>>> out[0] > 0
True
해설:
- CPU 계산은 스레드보다 프로세스 풀이 유리한 경우가 많습니다.
- 단, 프로세스 풀은 직렬화(pickle) 가능한 함수/인자를 써야 하므로, 람다/클로저/열린 파일 핸들 전달 같은 패턴은 피하는 것이 안전합니다.
예제 3) 타임아웃과 예외를 분리 수집하는 운영형 패턴
>>> import time
>>> from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
>>>
>>> def unreliable_job(x: int) -> int:
... if x == 3:
... raise ValueError("bad input: 3")
... if x == 5:
... time.sleep(1.2) # 일부러 느리게
... return x * 10
...
>>> report = {"ok": [], "fail": []}
>>> with ThreadPoolExecutor(max_workers=4) as ex:
... fut_to_input = {ex.submit(unreliable_job, i): i for i in [1,2,3,4,5]}
... for fut in as_completed(fut_to_input):
... src = fut_to_input[fut]
... try:
... value = fut.result(timeout=1.0)
... report["ok"].append({"input": src, "value": value})
... except TimeoutError:
... report["fail"].append({"input": src, "error": "timeout"})
... except Exception as e:
... report["fail"].append({"input": src, "error": str(e)})
...
>>> len(report["ok"]) + len(report["fail"])
5
>>> any("bad input" in x["error"] for x in report["fail"])
True
해설:
- 운영에서는 "실패를 없애는 것"보다 "실패를 분류하고 다음 행동으로 연결하는 것"이 더 중요합니다.
fut_to_input매핑을 유지하면, 실패 메시지만 봐도 어떤 입력이 원인인지 즉시 파악할 수 있습니다.
예제 4) 파이프라인 집계(성공률/재시도 후보) 만들기
>>> def summarize(report: dict) -> dict:
... total = len(report["ok"]) + len(report["fail"])
... success = len(report["ok"])
... rate = round((success / total) * 100, 1) if total else 0.0
... retry_candidates = [f["input"] for f in report["fail"] if f["error"] == "timeout"]
... return {
... "total": total,
... "success": success,
... "success_rate": rate,
... "retry_candidates": retry_candidates,
... }
...
>>> sample = {
... "ok": [{"input": 1, "value": 10}, {"input": 2, "value": 20}],
... "fail": [{"input": 3, "error": "bad input: 3"}, {"input": 5, "error": "timeout"}]
... }
>>> summarize(sample)
{'total': 4, 'success': 2, 'success_rate': 50.0, 'retry_candidates': [5]}
해설:
- 병렬 처리는 결국 "결과를 다음 단계가 쓰기 좋은 형태로 만드는 것"까지 포함됩니다.
- 이렇게 집계 객체를 만들면 대시보드, 슬랙 알림, 일간 리포트로 쉽게 확장할 수 있습니다.
자주 하는 실수
실수 1) CPU 작업을 ThreadPoolExecutor로 밀어 넣기
>>> from concurrent.futures import ThreadPoolExecutor
>>> def cpu_work(n):
... s = 0
... for i in range(n):
... s += i * i
... return s
...
>>> # 문법상 문제는 없지만, 성능 향상이 거의 없거나 오히려 나빠질 수 있음
>>> with ThreadPoolExecutor(max_workers=8) as ex:
... _ = list(ex.map(cpu_work, [200000]*8))
원인:
- CPU 바운드 작업에서 스레드는 GIL 영향으로 병렬 이점이 제한될 수 있습니다.
해결:
>>> from concurrent.futures import ProcessPoolExecutor
>>> with ProcessPoolExecutor(max_workers=4) as ex:
... _ = list(ex.map(cpu_work, [200000]*8))
핵심은 "스레드=무조건 빠름"이 아니라 "작업 특성에 맞는 실행기 선택"입니다.
실수 2) Future.result()를 즉시 호출해 병렬성을 스스로 무력화
>>> from concurrent.futures import ThreadPoolExecutor
>>> import time
>>> def slow(x):
... time.sleep(0.2)
... return x
...
>>> with ThreadPoolExecutor(max_workers=4) as ex:
... futures = [ex.submit(slow, i) for i in range(4)]
... # 아래처럼 제출 직후 순서대로 result()를 기다리면
... # 느린 첫 작업이 끝날 때까지 나머지 이점을 잘 못 살릴 수 있음
... ordered = [f.result() for f in futures]
>>> ordered
[0, 1, 2, 3]
원인:
- 완료 순서를 활용하지 않고 입력 순서로 무조건 대기해서 전체 처리 효율이 떨어집니다.
해결:
>>> from concurrent.futures import as_completed
>>> with ThreadPoolExecutor(max_workers=4) as ex:
... futures = [ex.submit(slow, i) for i in range(4)]
... done_order = [f.result() for f in as_completed(futures)]
>>> sorted(done_order)
[0, 1, 2, 3]
as_completed()는 특히 작업 시간이 들쭉날쭉할 때 효과가 큽니다.
실수 3) 예외를 한데 뭉개서 실패 원인을 잃어버리기
- 증상: "실패 17건"은 보이는데, 무엇 때문에 실패했는지 모릅니다.
- 원인:
except Exception: return None같은 코드로 맥락을 버렸습니다. - 해결: 입력값, 예외 타입, 메시지를 함께 구조화해 저장하세요.
>>> def normalize_error(input_id, exc):
... return {
... "input": input_id,
... "error_type": type(exc).__name__,
... "message": str(exc),
... }
...
>>> try:
... raise RuntimeError("queue closed")
... except Exception as e:
... normalize_error("job-44", e)
{'input': 'job-44', 'error_type': 'RuntimeError', 'message': 'queue closed'}
이 구조 하나만 있어도 사후 분석 속도가 크게 올라갑니다.
실무 패턴
-
입력 검증 규칙:
- 작업 시작 전에 "필수 필드"와 "허용 범위"를 먼저 확인합니다. 병렬 실행 중간에 잘못된 입력이 섞이면 장애 범위가 커집니다.
- 입력 단위를 가능한 작게 쪼개면 부분 실패 시 재처리가 쉬워집니다.
-
로그/예외 처리 규칙:
- 최소 로그 스키마를 고정하세요:
job_id,input,status,latency_ms,error_type. - 타임아웃, 비즈니스 오류(검증 실패), 시스템 오류(네트워크/프로세스 종료)를 분리 집계하면 대응 전략이 명확해집니다.
- 최소 로그 스키마를 고정하세요:
-
재사용 함수/구조화 팁:
run_pipeline(inputs, worker, executor_factory)처럼 실행기 주입 구조를 만들면 테스트에서 스레드/프로세스를 바꿔 검증하기 쉽습니다.- 결과를 즉시 파일/DB에 append하는 방식보다, 우선 메모리에서 구조화한 뒤 배치 저장하는 편이 일관성 관리에 유리합니다.
-
성능/메모리 체크 포인트:
max_workers는 크게 잡을수록 좋지 않습니다. I-O 작업은 외부 시스템 한계(API rate limit, DB 연결 수)에 맞춰 조정하세요.- 처리량(throughput)만 보지 말고 p95 지연, 실패율, 재시도율을 같이 봐야 진짜 운영 품질을 판단할 수 있습니다.
실무에서 concurrent.futures를 잘 쓴다는 건 "병렬로 돌렸다"가 아니라 "문제 생겼을 때 제어 가능하다"는 뜻입니다. 작업 수가 10개일 때는 수동 확인으로 버티지만, 10만 개가 되면 구조화된 설계 없이는 운영이 불가능합니다. 그래서 이번 강의 패턴(입력 검증 → Future 매핑 → 결과 분류 → 집계/재시도 후보 생성)은 프로젝트 규모가 커질수록 더 큰 가치를 냅니다.
오늘의 결론
한 줄 요약: concurrent.futures의 고급 사용은 속도보다 통제력(실패 분류, 타임아웃, 재처리 가능성)을 확보하는 데 목적이 있습니다.
기억할 것:
- IO 바운드는 스레드, CPU 바운드는 프로세스라는 기본 축을 먼저 잡으세요.
submit + as_completed조합은 운영 추적성과 장애 대응력을 크게 높여 줍니다.- 결과는 "성공/실패"만 남기지 말고, 다음 액션(재시도/폐기/수동 검토)으로 연결 가능한 형태로 저장하세요.
연습문제
ThreadPoolExecutor로 URL 목록을 병렬 요청한다고 가정하고, 타임아웃 실패만 따로 모아retry_list를 만드는 함수를 작성해 보세요.- 같은 작업을
max_workers=4,8,16으로 돌렸을 때 성공률과 평균 지연시간을 비교하는 실험 코드를 작성해 보세요. submit + as_completed구조에서 예외 타입별(TimeoutError,ValueError, 기타) 카운트를 집계하는 리포트 함수를 구현해 보세요.
이전 강의 정답
- URL 결합 시 문자열 더하기 대신
urljoin사용
>>> from urllib.parse import urljoin
>>> base = "https://api.example.com/v1/"
>>> urljoin(base, "users")
'https://api.example.com/v1/users'
>>> urljoin(base, "/health")
'https://api.example.com/health'
- 쿼리 파라미터 인코딩
>>> from urllib.parse import urlencode
>>> params = {"q": "python futures", "page": 2}
>>> urlencode(params)
'q=python+futures&page=2'
- URL 파싱 후 안전하게 구성 요소 확인
>>> from urllib.parse import urlparse
>>> p = urlparse("https://example.com:443/docs?id=10")
>>> (p.scheme, p.netloc, p.path)
('https', 'example.com:443', '/docs')
실습 환경/재현 정보
- 실행 환경:
condaenvpython100(Python 3.11.14) - 가정한 OS: macOS/Linux 공통
- 권장 실행 방식:
- REPL에서 예제 함수 단위 검증
- 스크립트 파일로 분리 후
python lesson97_demo.py실행
- 재현 체크포인트:
as_completed사용 시 결과 순서가 입력 순서와 다를 수 있음- 타임아웃/예외가
report['fail']에 구조화되어 남는지 확인 max_workers변경 시 처리 시간과 실패율 변화를 함께 기록