이번 글에서는 async/await의 동작을 이해하고, 파이썬에서 비동기가 어떤 방식으로 동작하는지 asyncio를 통해 이야기해보려 한다.
먼저 이번 글에서 asyncio의 동작 원리에 대한 부분은 아래 블로그의 도움을 정말 많이 받았다.(필사하며 공부하는 느낌😁)

시간날 때 원본을 읽어보는 것을 추천!

 

[Python] 비동기 프로그래밍 동작 원리 (asyncio)

JavaScript와 달리 Python은 비동기 프로그래밍에 어색하다. 애초에 JavaScript는 비동기 방식으로 동작하도록 설계된 언어인 반면, Python은 동기 방식으로 동작하도록 설계된 언어이기 때문이다. 그래

it-eldorado.tistory.com

Asyncio란

async/awiat 구문을 사용하여 파이썬에서 concurrent code를 작성할 수 있도록 하는 라이브러리
Python3.4에서 표준 라이브러리로 채택되었으며, 파이썬의 많은 비동기 프레임워크들이 asyncio를 기반으로 만들어졌다.


파이썬은 자바스크립트처럼 처음부터 비동기 프로그래밍을 위해 만들어진 언어가 아니다. 때문에 파이썬의 많은 내장 API들 역시 동기 방식으로 작동하는 경우가 많다. 하지만 Python3.4에서 asyncio가 비동기 코드를 위한 표준 라이브러리로 채택이 되고, Python3.5에서 async/await 구문이 추가되면서 파이썬에서도 비동기 프로그래밍을 위한 기반이 마련 되었다. 그렇다면 asyncio를 사용하기 전에는 파이썬에서 비동기를 구현할 수 없었을까?

Concurrent와 Async는 같은 의미인가요?
Asynchronous(비동기)라는 것은, 실행한 태스크의 처리가 완료될 때까지 기다리지 않고, 그 시간 동안 다른 작업을 처리하는 방식을 말한다. 프로그래밍에서는 Asyncronous를 아래와 같은 2가지 방식으로 구현할 수 있다.
*️⃣ Concurrency(동시성): context switching을 통해 여러 개의 태스크가 동시에 실행되는 것처럼 보이는 것
*️⃣ Parallelism(병렬성): 물리적으로 여러 개의 태스크가 동시에 실행되는 것

Python3.4 이전의 비동기 프로그래밍

파이썬은 GIL에 의해서, Parallelism을 통한 비동기 프로그래밍이 불가능하기 때문에, Concurrency를 통한 비동기 프로그래밍을 구현하는 경우만 생각하기로 한다.

Threading

멀티 스레드를 사용하여 비동기를 구현하는 것은 가장 단순하고 직관적인 방법인데, 그만큼 발생할 수 있는 문제점이 많다. 아래의 코드를 보자.

import time
import threading

class Mayhem(threading.Thread):
    def __init__(self, map: Mapping[str, int]) -> None:
        super().__init__()
        self.map = map
    def run(self):
        for key, value in self.map.items():
            time.sleep(value)

d = {"k1": 1, "k2": 2, "k3": 3}
m = Mayhem(d)
m.start()

d["k4"] = 4 # 이 코드 때문에 에러남

위 코드를 실행하면 RuntimeError가 발생한다. m.start()로 스레스를 실행시키고, run 메서드 내의 for문에서 map에 대해 iteration이 일어나고 있는데, 그 와중에 메인스레드에서 map에 새로운 요소를 추가하려 했기 때문이다.

이처럼 스레딩을 사용하면, 여러 스레드가 같은 자원에 동시 접근하는 경우 에러가 발생할 수 있다. 이 문제를 해결하기 위한 가장 간단한 방법이 바로 자원에 lock을 거는 방법인데, 같은 자원에 한번에 하나의 스레드만 접근할 수 있도록 하는 방식이다. 하지만 lock 방식에도 2가지 문제점이 있다.

  • 스레드들이 대부분의 시간을 해당 자원에 접근하기 위해 기다리는 데에 사용한다는 것 ➡️ lock contention
  • 자원에 접근할 수 있는 권한이 모든 스레드에게 공정하게 주어지지 않는다는 것 ➡️ lock starvation

멀티 스레드를 사용하여 비동기를 구현할 때 발생할 수 있는 또 따른 문제점이 있다. 바로 context switching 비용이 많이 발생한다는 것이다. 하나의 코어에서 여러 개의 스레드를 번갈아가며 실행해야 하기 때문에, CPU는 임의의 간격마다 스레드의 모든 context 정보를 저장하고, 다른 스레드로 전환하게 된다. 또한 context switching은 개별 스레드의 상태를 고려하지 않고 일어나기 때문에, 현재 동작하지 않고 대기중인 스레드에게도 context switching이 일어날 수 있다. 이 경우, 스레드는 처리할 작업이 없기 때문에 바로 sleep으로 들어가고 다시 context switching이 일어나게 된다.

Coroutine과 Eventloop

코루틴은 특정 시점에 자신의 실행과 관련된 상태를 어딘가에 저장한 뒤, 실행을 중단하고, 나중에 그 상태를 복원하여 실행을 재개할 수 있는 서브루틴을 의미한다. (여기서 서브루틴은 우리가 일반적으로 아는 함수를 가리킴)
이벤트루프는 무한 루프를 돌며 매 루프마다 태스크를 하나씩 실행시키는 로직을 의미한다.

코루틴과 이벤트루프에 대한 더 자세한 내용은 아래에서 다루도록 하자.
파이썬에서 싱글 스레드를 사용하는 Concurrent code를 위한 라이브러리들은 모두 코루틴과 이벤트루프를 이용해서 구현이 되어있고, 다만 어떤 방식으로 추상화하여 개발자에게 제공하느냐에 따라 아래와 같은 다양한 종류가 있다.

잠깐! 
Python3.5부터 파이썬에는 3가지 종류의 코루틴이 존재한다. 관련 내용이 궁금하다면 아래 글을 확인하자
Fluent Python, Second Edition

 

Green Threads

OS가 아닌, 런타임 라이브러리 또는 가상머신에 의해 예약되는 스레드
Kernel mode가 아닌 User mode에서 관리되는 것이 특징이다.

멀티 스레드를 사용할 때의 context switching 비용을 줄이기 위한 목적으로 등장했으며, '경량 스레드' 라고도 불린다. 그린 스레드는 스케줄링을 OS가 아닌 애플리케이션 코드에서 한다는 점이 일반 스레드와 다르다. 우리에게 익숙한 워딩을 사용하면 '가상 스레드' 또는 '논리적 스레드'라고 생각하는 것이 이해가 쉬울 것이다.
그린 스레드는 파이썬 뿐만이 아니라, 다른 언어에서도 사용되는 개념이고, 각 프로그래밍 언어마다 그 구현 방식에 차이가 있다. 파이썬에서는 그린 스레드를 사용한 비동기 프로그래밍 방법 중 Gevent라는 라이브러리가 유명한데, 그 내부는 코루틴과 이벤트루프를 사용하여 구현되어 있다. 하지만 개발자는 그 내부에서 코루틴이 어떻게 동작하는지 이해할 필요없이 기존의 스레드를 사용하는 방식과 비슷하게 비동기 코드를 작성할 수 있다.

  • 스레드들이 커널 레벨 대신 애플리케이션 레벨에서 관리됨
  • 일반 스레드와 유사함 ➡️ 스레드를 이해하고 있는 사람들이 사용하기 좋음
  • CPU context switching 문제를 제외한 일반 스레드 프로그래밍이 가진 문제점들을 가지고 있음

 

Callback fuctions

다른 함수의 파라미터로 전달된 실행 가능한 함수
주로 비동기 작업이 완료된 이후에 실행할 코드를 넘길 때 사용한다.

스레드와는 다른 방식의 비동기 구현 방식이다. 파이썬에는 콜백 스타일을 사용하여 비동기 네트워크 I/O를 구현한 Tornado라는 웹 프레임워크가 존재한다. 하지만 파이썬에서는 이 역시도 내부가 코루틴과 이벤트루프로 구현이 되어있다.

  • 프로그래머는 스레드와 코루틴을 직접 볼 수 없음
  • 콜백은 예외를 발생시키지 않음
  • 콜백의 콜백은 복잡하며 디버깅이 어려움
  • 콜백의 콜백의 콜백같은 콜백 지옥의 위험성

asyncio가 표준으로 자리잡기 전에는 대부분 그린스레드 및 콜백 스타일을 사용하여 비동기 프로그래밍을 구현했다고 한다.
그렇다면 지금부터 asyncio에서의 비동기는 어떤 원리로 동작하는지 알아보도록 하자.

Asyncio의 동작 원리

Coroutine ↔️ Generator

제일 먼저 비동기 프로그래밍의 기반이 되는 코루틴에 대해서 알 필요가 있다. 위에서 이미 언급했다시피, 코루틴은 특정 시점에 자신의 상태를 저장해서 실행을 멈추고, 이후에 다시 저장한 상태를 복원해서 실행을 재개할 수 있는 서브루틴을 말한다.
파이썬에서는 일반 함수인 def 키워드 앞에 async를 붙이면 코루틴이 된다.

def fuction1():
	print("This is Subroutine")
    
async def function2():
	print("This is Coroutine!")

파이썬에서 코루틴은 제너레이터를 기반으로 구현된다. 파이썬에서 코루틴은 곧 제너레이터라고 말할 수 있다. 제너레이터는 yield 키워드를 기준으로 현재 함수의 실행을 중단하고, 후에 해당 yield 이후부터 실행을 재개할 수 있기 때문이다. Python3.5 이전에 async키워드가 있기 전에는 코루틴을 제너레이터로 직접 구현해야 했다.
즉, async는 제너레이터를 더 쉽게 작성할 수 있도록 도와주는 역할을 하는 키워드이다.

❓Iterator(이터레이터)와 Generator(제너레이터)란?

이터레이터는 next() 함수 호출 시 계속 그다음 값을 반환하는 객체이다.

next() 함수를 호출할 때마다 이터레이터 객체는 요소를 차례대로 반환하며, 더 반환할 값이 없을 때 StopIteration 예외가 발생한다.
*️⃣ next()로 그 값을 한 번 읽으면 해당 값을 다시 읽을 수 없음

제너레이터는 이터레이터를 생성해주는 함수를 말한다.

보통 함수는 하나의 값(정수, 리스트, 딕셔너리 등)을 반환한다. 만약 함수가 하나의 값이 아니라, 연속된 값을 차례대로 하나씩 반환할 수 있다면 어떨까? 이런 발상에서 나온 것이 제너레이터이다.
이터레이터는 클래스에 __iter__, __next__ 메서드를 구현해야 하지만, 제너레이터는 함수 안에서 yield라는 키워드만 사용해서 훨씬 간단하게 작성할 수 있다.
*️⃣ 모든 generator는 iterator임
*️⃣ 모든 데이터를 한꺼번에 만들어서 메모리에 적재해놓는 것이 아니고, 필요할 때마다 하나씩 만들어냄
➡️ 위 이유 덕분에 제너레이터를 통해 무한한 순서가 있는 객체를 모델링할 수 있음
  • 이터레이터 클래스 예시
더보기
# 이터레이터 클래스
class MyItertor:
    def __init__(self, data):
        self.data = data
        self.position = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.position >= len(self.data):
            raise StopIteration
        result = self.data[self.position]
        self.position += 1
        return result


if __name__ == "__main__":
    i = MyItertor([1,2,3])
    for item in i:
        print(item)
  • 제너레이터의 원리와 yield from
더보기

다음과 같은 제너레이터가 있다고 가정해보자

def number_generator():
    yield 0
    yield 1
    yield 2

위 제너레이터의 객체를 얻어, for문을 통해 값을 읽어낼 때의 플로우는 아래 그림과 같다.

참고로 제너레이터 객체에서 __iter__를 호출하면 self를 반환하므로 본인과 동일한 객체를 얻는다.

generator는 왜 yield라는 키워드를 사용할까? yield는 생산하다라는 뜻과 함께 양보하다라는 뜻도 가지고 있다. 즉, yield를 사용하면 값을 함수 외부로 전달하면서 코드 실행도 외부에 양보한다. 따라서 yield는 현재 함수를 잠시 중단하고 함수 바깥의 코드가 실행되도록 만든다.

 

yield from 키워드는 Python3.3이상부터 사용 가능하며, yield from <iterable>처럼 뒤에 제너레이터 같은 iterable 객체를 둘 수 있으며, 이는 해당 iterable 객체의 모든 값이 소진될 때까지 실행하는 역할을 한다.

# yield 키워드 사용
>>> def three_generator():
...     a = [1, 2, 3]
...     for i in a:
...             yield i
... 
>>> gen = three_generator()
>>> list(gen)
[1, 2, 3]


# yield from 키워드 사용
>>> def three_generator():
...     a = [1, 2, 3]
...     yield from a
... 
>>> gen = three_generator()
>>> list(gen)
[1, 2, 3]

async def 함수를 호출하면 코루틴 객체가 반환된다. 이 때 중요한 점은 코루틴 객체를 생성해서 반환할 뿐, 해당 코루틴이 실행되지는 않는다는 것이다. 실제 코루틴의 실행이 어떻게 일어나는지는 조금 더 아래서 다루도록 하겠다.

async 키워드가 제너레이터의 생성을 더 쉽게 해주는 역할을 한 것처럼, await 키워드는 yield from 구문을 더 쉽게 작성할 수 있도록하는 역할을 한다. yield from 키워드 뒤에 제너레이터 객체를 두면 제너레이터 객체 안에서 또 다른 제너레이터 객체를 실행하는 것이 가능하다.

위에서 본 내용을 요약하여 코루틴과 제너레이터의 관계를 나타내면 아래 그림과 같다.

Future와 Task

 

코루틴과 퓨처, 태스크 객체 관계도

asyncio에서는 코루틴과 더불어, 퓨처와 태스크 객체도 사용되는데, 이들 셋 모두 Awaitable이기 때문에 await 키워드 뒤에 오는 것이 가능하다. asyncio에서의 이벤트루프 동작을 알아보기 전에, 각각의 객체들의 특성을 확인해보도록 하자.

 

Future 객체 

퓨처 객체는 작업의 실행 상태 및 결과를 저장하는 객체이다. 여기서의 실행 상태는 아래의 3가지 값을 가질 수 있다.

  • PENDING (진행 중)
  • CANCELED (취소)
  • FINISHED (완료)

작업이 완료되었다는 것은, 작업의 실행 상태가 CANCELED 혹은 FINISHED임을 의미하며, 예외가 발생한 경우에도 실행 상태는 FINISHED가 된다는 점을 명심하자.

 

실행 결과는 다음과 같은 값을 가질 수 있다.

  • 해당 작업의 결과값
  • 해당 작업에서 발생한 예외 객체

아래 코드에서 Future 클래스의 필드를 확인해보자.

class Future:
    _state = _PENDING # 작업의 실행 상태
    _result = None # 작업의 결과값
    _exception = None # 작업에서 발생한 예외
    _loop = None
    _source_traceback = None
    _cancel_message = None
    _cancelled_exc = None
    _asyncio_future_blocking = False
    __log_traceback = False

퓨처 객체는 어떠한 작업의 실행 상태와 결과를 저장할 뿐, 작업의 실행을 개시하는 역할은 수행하지 않다는 점이 중요하다. 

 

Task 객체

태스크 객체는 퓨처 객체를 상속하는 클래스이다. 따라서 퓨처 객체처럼 '작업의 실행 상태와 결과를 저장' 할 수도 있고, 추가로 '작업의 실행 개시'하는 역할도 수행한다. 작업의 실행을 개시하기 위해 필요한 것이 바로 코루틴이다. 

아래 코드에서 확인할 수 있듯이, 태스크 객체는 생성 시 코루틴 객체를 인자로 넘겨받아 _coro 필드에 저장한다.

class Task(futures._PyFuture):  # Inherit Python Task implementation

    _log_destroy_pending = True

    def __init__(self, coro, *, loop=None, name=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro # 코루틴 객체
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

태스크 객체는 코루틴을 인자로 넣은 asyncio.run() 함수 또는 asyncio.create_task() 함수를 호출하여 생성할 수 있다. 태스크 객체는 생성되는 즉시 현재 스레드에 있는 이벤트루프에게 자신의 __step() 메서드를 호출해 줄 것을 요청한다. 

__step()은 자신의 코루틴 객체를 이용해서 해당 코루틴을 실행하는 메서드이다. 이것을 '코루틴이 태스크로서 실행되도록 이벤트루프에 예약을 건다'라고 한다. 이렇게 태스크 객체의 __step() 메서드가 호출되면 필드에 가지고 있는 코루틴의 실행이 개시되며, 해당 코루틴은 그 안에서 다른 코루틴을, 그 코루틴은 또 다른 코루틴을 계속해서 호출할 수 있다. 이를 코루틴 체인이라고 부른다.

결국 하나의 태스크 객체는 코루틴 체인의 실행을 관장하는 역할을 한다고 볼 수 있다.

 

코루틴 체인에서 sleep 또는 I/O관련 코루틴을 await하는 코드를 만날 경우, 태스크 객체는 자신의 실행을 중단하고 이벤트루프에게 제어를 넘긴다. 제어를 넘겨받은 이벤트루프는 실행이 예약된 태스크들 중 우선순위가 높은 것을 선택하여 실행시킨다. 이후 아까 중단되었던 태스크가 다시 실행 가능한 상태가 되면, 해당 태스크는 다시 이벤트 루프에 자신의 실행을 예약해두고, 이벤트루프는 제어를 넘겨받을 때마다 예약된 태스크들을 하나씩 실행한다.

 

태스크 객체가 처음 실행한 코루틴의 실행이 완료되면, 태스크 객체는 해당 코루틴으로부터 반환값을 얻어 자신의 결과값에 저장한다. 이는 태스크의 실행이 완료된 상태임을 의미하며, 해당 태스크는 더 이상 이벤트루프에 실행을 예약할 수 없게 된다.

이벤트루프의 동작 원리

파이썬에서 코루틴을 실행시키는 방법은 아래와 같다.

  • await 키워드 ➡️ 코루틴 내에서만 사용 가능
  • asyncio.run() 함수 
  • asyncio.create_task() 함수

위 3가지 방법 중, 맨 처음 코루틴의 실행이 가능한 방법은 asyncio.run() 함수와 asyncio.create_task() 함수 2가지이다.  2개의 함수를 차근차근 살펴보자.

 

asyncio.run() ➡️ Python3.7 이상부터 사용 가능

asyncio.run() 함수는 현재의 스레드에 새로운 이벤트루프를 생성하고, 인자로 받은 코루틴 객체를 태스크로 실행을 예약한 뒤, 해당 태스크의 실행이 완료되면 이벤트루프를 닫는 역할을 수행한다. 해당 함수는 Python3.7 이상부터 사용할 수 있고, 이전 버전에서는 아래의 코드로 같은 기능을 실행할 수 있다.

loop = asyncio.get_event_loop() # 이벤트루프가 있으면 반환 / 없으면 새로 생성 후 반환
loop.run_until_complete(coroutine()) # 코루틴 실행!
loop.close() # 이벤트루프 닫음

1️⃣ loop = asyncio.get_event_loop()

현재 스레드에 생성된 이벤트루프가 있으면 해당 루프를 반환하고, 없으면 새로 생성 후 반환하는 함수이다. 이벤트루프는 무한 루프를 돌면서 한번에 하나의 태스크를 실행시키는 로직을 의미한다. 

2️⃣ loop.run_until_complete(coroutine())

태스크 객체의 __step() 메서드 코드와 함께 아래 설명을 살펴보자.

class Task(futures._PyFuture):

	...
    
    def __step(self, exc=None):
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, exceptions.CancelledError):
                exc = self._make_cancelled_error()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None # 퓨처 객체 바인딩 해제

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc: # 태스크가 실행한 최초의 코루틴 실행 완료
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(exc.value) # 발생한 예외 객체의 value 필드에 결과 저장
        except exceptions.CancelledError as exc:
            # Save the original exception so we can chain it later.
            self._cancelled_exc = exc
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context) # 퓨처 객체의 실행 상태가 완료로 변경될 때 실행할 콜백 등록
                        self._fut_waiter = result # 반환받은 퓨처 객체를 _fut_waiter에 저장
                        if self._must_cancel:
                            if self._fut_waiter.cancel(
                                    msg=self._cancel_message):
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.
  1. 태스크 실행
    • 인자로 받은 코루틴 객체로 태스크 객체 생성 후, 태스크 객체의 실행이 즉시 예약됨
    • 다른 태스크가 없을 경우 즉시 실행됨
    • 태스크의 __step() 메서드가 실행됨(메서드에서 코루틴 객체의 send() 메서드로 코루틴 실행)
  2. 코루틴 체인이 sleep 또는 I/O 코루틴을 만났을 때
    • 코루틴 체인에서 asyncio.sleep() 또는 I/O관련 코루틴을 만날 수 있는데, 이러한 코루틴들은 퓨처 객체를 await 하도록 구현되어 있음
    • I/O관련 코루틴의 경우, 이 코루틴은 특정 소켓에 데이터를 읽거나 쓰기 위해 해당 소켓의 상태를 검사함. 당장 읽거나 쓸 수 있는 상태가 아니라면 select() 함수를 이용해서 해당 소켓을 등록해두고, 해당 소켓에 바인딩된 퓨처 객체를 생성하여 await 함(퓨처 객체의 __await__() 메서드는 자기 자신을 yield하도록 구현되어 있음➡️해당 퓨처는 코루틴 체인을 따라 태스크 객체의 __step() 메서드로 전달됨)
    • asyncio.sleep(1) 코루틴의 경우, 이 코루틴은 퓨처 객체를 하나 생성한 뒤 이벤트루프에게는 1초 뒤에 해당 퓨처 객체의 결과값을 업데이트하도록 요청함. 그 후 해당 퓨처 객체를 await
  3. 태스크 객체의 퓨처 객체 처리
    • 퓨처 객체의 add_done_callback() 메서드를 호출함(퓨처 객체가 완료 상태가 되었을 때 이벤트루프에 등록할 콜백 함수 추가 ➡️ 이 때 등록되는 것은 자기 자신(태스크 객체)의 __step() 메서드 🟰 자기 자신의 실행을 예약)
    • 반환받은 퓨처 객체를 _fut_waiter 필드에 저장
    • 태스크 객체는 자신의 실행을 중단한 후, 이벤트루프에 제어 넘김
    • _fut_waiter가 바인딩 되어있는 퓨처 객체는 이벤트루프에 의해 실행되지 못함
    • 이벤트루프는 실행이 예약된 태스크들 중 우선 순위가 높은 것들을 적절히 선택하여 실행시킴 ➡️ 이벤트루프는 이러한 과정을 반복하며 concurrent 하게 태스크들을 실행함
  4. 이벤트루프의 polling
    • 이벤트루프는 시간이 날 때마다 select() 함수를 이용해서 데이터를 읽거나 쓸 준비가 된 소켓을 계속 찾음
    • 그런 소켓을 찾으면, 해당 소켓에 바인딩되어 있는 퓨처 객체의 결과값을 업데이트함
    • 바로 이 순간, 아까 등록된 콜백 함수의 실행이 이벤트루프에 예약됨
  5. 태스크 객체의 실행 재개
    • 태스크의 실행은 곧 __step() 메서드가 호출되는 것을 의미함
    • 이 __step() 메서드는 제일 먼저 바인딩된 퓨처 객체를 해제함 (self._fut_waiter = None)
    • 자신의 코루틴 객체의 send() 메서드를 호출해서 코루틴 실행 재개
    • 해당 퓨처 객체의 __await__() 메서드에서 자기 자신을 yield하는 부분(중단되었던 부분)에 도달
    • I/O 관련 코루틴 때문에 기다리고 있었던 경우, 해당 소켓에 대해 데이터를 읽거나 쓸 준비가 되었다는 것이므로 해당 소켓(자기 자신에 바인딩되어 있음)에 대해 데이터를 읽거나 쓴 다음 그 값을 return함
    • asyncio.sleep() 관련 코루틴 때문이었을 경우, 바로 return함
  6. 최초 코루틴의 return
    • 이러한 과정이 반복되다가, 태스크가 실행한 최초의 코루틴이 return 하는 경우, 태스크 객체의 __step() 메서드에서 StopIteration 예외가 발생함
    • 그러면 태스크 객체가, 발생한 StopIteration 예외 객체의 value 필드에 코루틴의 return 값을 저장하고 본인의 실행을 종료함 ➡️ 이 때 loop.run_until_completed() 함수의 실행이 끝남

3️⃣ loop.close()

이벤트루프를 닫는 함수이다. 해당 함수를 실행했을 때 아직 완료되지 못한 태스크들이 남아있으면, Task was destroyed but it is pending! 이라는 경고 메세지가 출력된다.

asyncio.create_task() ➡️ Python3.7 이상부터 사용 가능

앞서 살펴본 asyncio.run() 함수는 사실 태스크를 1개만 실행시키므로, concurrent하게 실행되지는 않는다.(태스크가 1개밖에 없으므로) 해당 함수는 Python3.7 이상부터 사용가능하므로, 이전 버전에서는 asyncio.ensure_future() 함수를 대신 사용한다.

asyncio.gather() 함수는 인자로 여러 개의 awaitable 객체를 받는다. 만약 코루틴 객체를 받으면 자동으로 태스크 객체로 래핑이 되기 때문에 사실상 퓨처 객체(태스크 포함)만 넘어간다고 생각해도 된다.

 

아래 코드를 보며, 2개의 태스크가 concurrent하게 실행되는 과정을 이해해보자.

import asyncio
import time

async def sleep(sec):
    await asyncio.sleep(sec)
    return sec

async def main():
    sec_list = [1, 2]
    tasks = [asyncio.create_task(sleep(sec)) for sec in sec_list]  # [Task 1 객체, Task 2 객체]
    tasks_results = await asyncio.gather(*tasks)  # [Task 1 객체의 결과 값, Task 2 객체의 결과 값]
    return tasks_results

start = time.time()

loop = asyncio.get_event_loop()
result = loop.run_until_complete(main())
loop.close()

end = time.time()

print('result : {}'.format(result))
print('total time : {0:.2f} sec'.format(end - start))
  1. loop.run_until_complete() 함수에 의해 Task 0가 실행되고, 이로 인해 main() 코루틴이 실행된다.
  2. main() 코루틴은 asyncio.create_task() 함수를 통해 Task 1, Task 2 객체를 생성하고 실행을 예약한다.
  3. asyncio.gather() 코루틴은 Task 1 객체를 await 한다.
  4. Task 0는 Task 1 객체가 완료 상태가 될 때까지 기다리도록 하고, 이벤트 루프에게 제어를 넘긴다.
  5. 이벤트 루프가 Task 1을 실행한다.
  6. Task 1은 sleep(1) 코루틴을 실행하고, 다시 asyncio.sleep(1) 코루틴을 실행한다.
  7. asyncio.sleep(1) 코루틴은 Future 1 객체를 만들고, 1초 뒤에 Future 1 객체의 결과 값이 갱신되도록 이벤트 루프에 예약을 건 뒤, Future 1 객체를 await 한다.
  8. Task 1은 Future 1 객체가 완료 상태가 될 때까지 기다리도록 하고, 이벤트 루프에게 제어를 넘긴다.
  9. 이벤트 루프가 Task 2를 실행한다.
  10. Task 2는 sleep(2) 코루틴을 실행하고, 다시 asyncio.sleep(2) 코루틴을 실행한다.
  11. asyncio.sleep(2) 코루틴은 Future 2 객체를 만들고, 2초 뒤에 Future 2 객체의 결과 값이 갱신되도록 이벤트 루프에 예약을 건 뒤, Future 2 객체를 await 한다.
  12. Task 2는 Future 2 객체가 완료 상태가 될 때까지 기다리도록 하고, 이벤트 루프에게 제어를 넘긴다.
  13. 이제 이벤트 루프는 실행할 태스크가 없으므로 아무것도 하지 않는다.
  14. 그러다가 1초가 지나면 이벤트 루프는 Future 1 객체의 결과 값을 갱신한다. 이로 인해 Future 1 객체가 완료 상태가 될 때까지 기다리던 Task 1의 실행이 다시 예약된다.
  15. 이벤트 루프가 Task 1을 실행한다.
  16. asyncio.sleep(1) 코루틴으로 돌아가서 실행이 중단되었던 부분부터 실행을 재개한다.
  17. asyncio.sleep(1) 코루틴이 리턴하고, sleep(1) 코루틴도 리턴한다. 이때 반환 값은 1이다.
  18. Task 1 객체의 결과 값이 1로 설정되면서 Task 1의 실행이 완료된다. 이로 인해 Task 1 객체가 완료 상태가 될 때까지 기다리던 Task 0의 실행이 다시 예약된다.
  19. 이벤트 루프가 Task 0를 실행한다.
  20. asyncio.gather() 코루틴으로 돌아가서 실행이 중단되었던 부분부터 실행을 재개한다.
  21. asyncio.gather() 코루틴은 Task 1 객체의 결과 값을 저장하고, Task 2 객체를 await 한다.
  22. Task 0는 Task 2 객체가 완료 상태가 될 때까지 기다리도록 하고, 이벤트 루프에게 제어를 넘긴다.
  23. 이제 이벤트 루프는 실행할 태스크가 없으므로 아무것도 하지 않는다.
  24. 그러다가 1초가 더 지나면 이벤트 루프는 Future 2 객체의 결과 값을 갱신한다. 이로 인해 Future 2 객체가 완료 상태가 될 때까지 기다리던 Task 2의 실행이 다시 예약된다.
  25. 이벤트 루프가 Task 2를 실행한다.
  26. asyncio.sleep(2) 코루틴으로 돌아가서 실행이 중단되었던 부분부터 실행을 재개한다.
  27. asyncio.sleep(2) 코루틴이 리턴하고, sleep(2) 코루틴도 리턴한다. 이때 반환 값은 2이다.
  28. Task 2 객체의 결과 값이 2로 설정되면서 Task 2의 실행이 완료된다. 이로 인해 Task 2 객체가 완료 상태가 될 때까지 기다리던 Task 0의 실행이 다시 예약된다.
  29. 이벤트 루프가 Task 0를 실행한다.
  30. asyncio.gather() 코루틴으로 돌아가서 실행이 중단되었던 부분부터 실행을 재개한다.
  31. asyncio.gather() 코루틴은 [Task1 객체의 결과 값, Task 2 객체의 결과 값], 즉 [1, 2]를 리턴한다.
  32. main() 코루틴도 리턴한다. 이때 반환 값은 [1, 2]이다.
  33. Task 0 객체의 결과 값이 [1, 2]로 설정되면서 Task 0의 실행이 완료된다.
  34. loop.run_until_complete()의 실행이 완료되고, 이벤트 루프를 닫는다.

동기 함수를 코루틴처럼 사용하기(loop.run_in_executer() 메서드)

결국 파이썬 비동기 프로그래밍이 효과적으로 동작하기 위해서는, 현재의 스레드와 무관한 다른 곳에서 작업을 처리할 수 있어야한다. sleep과 I/O 관련 코루틴처럼 말이다. 

그러나 처음에도 언급했다시피, 파이썬은 동기적으로 작동하도록 설계된 언어이기 때문에 가지고 있는 대부분의 API들이 모두 동기 방식으로 동작한다.

이런 동기 방식의 코드를 이벤트루프가 존재하는 스레드가 아닌, 별도의 스레드에서 실행시킴으로써 sleep 혹은 I/O 관련 코루틴처럼 사용할 수 있게 해주는 함수가 바로 loop.run_in_executer() 이다.

복사했습니다!