# Notebook 환경에서는 이미 이벤트 루프안에서 동작하고 있기 때문에
# asyncio.run()은 사용할 수 없습니다. 따라서 Notebook 환경에서는
# asyncio.run(X)는 await X로 바꿔 읽기 바랍니다.
asyncio.run(main())
<_UnixSelectorEventLoop running=True closed=False debug=False>
── 비동기 처리 캡슐화 실행
Executor
클래스의 .submin()
메서드로 전달.submit()
메서드는 Future
에 스케쥴링 포함된 인스턴스를 출력# ThreadPoolExecutor는 Executor의 구현 서브 클래스
from concurrent.futures import (
Future, ThreadPoolExecutor,
)
# 비동기로 수행할 처리
def func():
return 1
# 비동기로 수행할 처리를 submit()에 전달
future = ThreadPoolExecutor().submit(func)
isinstance(future, Future)
# 비동기로 수행한 처리의 반환값 취득
print("비동기 처리 반환 값 :", future.result())
print("현재상태 :", future.done())
print("Future Status :", future.running())
print("Future Cancelled :", future.running())
비동기 처리 반환 값 : 1 현재상태 : True Future Status : False Future Cancelled : False
urls = ['https://twitter.com', 'https://facebook.com', 'https://instagram.com']
from hashlib import md5
from pathlib import Path
from urllib import request
def download(url):
req = request.Request(url)
# 파일 이름에 / 등이 포함되지 않도록 함
name = md5(url.encode('utf-8')).hexdigest()
file_path = './' + name
with request.urlopen(req) as res:
Path(file_path).write_bytes(res.read())
return url, file_path
import time
def elapsed_time(f):
def wrapper(*args, **kwargs):
st = time.time()
v = f(*args, **kwargs)
print(f"{f.__name__}: {time.time() - st}")
return v
return wrapper
@elapsed_time
def get_sequential():
[print(download(url)) for url in urls]
get_sequential()
('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0') ('https://facebook.com', './a023cfbf5f1c39bdf8407f28b60cd134') ('https://instagram.com', './09f8b89478d7e1046fa93c7ee4afa99e') get_sequential: 1.7453134059906006
get_multi_thread()
를 활용한 다중 스레드 구현
ThreadPoolExecutor
클래스 인스턴스는 콘텍스트 관리자 로 with 문을 사용할 수 있다.max_workers
의 기본값은 코어 수 * 5 를 쓴다from concurrent.futures import (
ThreadPoolExecutor,
as_completed
)
@elapsed_time
def get_multi_thread():
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [
executor.submit(download, url)
for url in urls
]
# 완료된 것부터 얻을 수 있다
[print(future.result()) for future in as_completed(futures)]
get_multi_thread()
('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0') ('https://facebook.com', './a023cfbf5f1c39bdf8407f28b60cd134') ('https://instagram.com', './09f8b89478d7e1046fa93c7ee4afa99e') get_multi_thread: 0.7480037212371826
다중 실행을 하면 동작하지 않는 경우가 발생하는데. 스레드 세이프한 구성 으로 변경한다
from concurrent.futures import (
ThreadPoolExecutor,
wait
)
class Counter:
def __init__(self):
self.count = 0
def increment(self):
self.count = self.count + 1
# 1,000,000회 증가
def count_up(counter):
for _ in range(1000000):
counter.increment()
counter = Counter()
# 2개의 스레드를 준비하고, 각각 count_up을 호출
threads = 2
with ThreadPoolExecutor() as e:
futures = [
e.submit(count_up, counter)
for _ in range(threads)
]
done, not_done = wait(futures)
# 2,000,000 을 출력하지 않고 중간에 엉성하게 중복연산이 실행
print(f'{counter.count=:,}')
counter.count=1,765,225
── 스레트 세이프티한 카운터 예시
threading.Lock
객체를 활용하여 exclusive control
을 추가한다Lock
을 얻은으면 해당 스레드만 실행할 수 있고, Lock
의 처리가 끝나면 빠르게 해제된다Lock
이 뒤늦게 해제되는 상황을 방지하기 위해 Lock
객체는 with
문과 함께 사용한다── 주의 할 점
Lock
객체를 감싸도록 한다import threading
class ThreadSafeCounter:
lock = threading.Lock() # 록을 준비
def __init__(self):
self.count = 0
# with 로 감싼 `배타 제어처리`를 `Lock` 내부에 정의한다
def increment(self):
with self.lock:
self.count = self.count + 1
counter = ThreadSafeCounter()
threads = 2
with ThreadPoolExecutor() as e:
futures = [e.submit(count_up, counter)
for _ in range(threads)]
done, not_done = wait(futures)
print(f'{counter.count=:,}') # 기대한 값이 출력된다
counter.count=2,000,000
N = 100000
import os
import time
import sys
def fibonacci(n):
a, b = 0, 1
for _ in range(n):
a, b = b, b + a
else:
return a
def elapsed_time(f):
def wrapper(args, **kwargs):
st = time.time()
v = f(args, **kwargs)
print(f"{f.__name__}: {time.time() - st}")
return v
return wrapper
@elapsed_time
def get_sequential(nums):
[ fibonacci(num) for num in nums]
def main(n):
# n = int(sys.argv[1])
print(f'Os Count is : {os.cpu_count()}')
nums = [n] * os.cpu_count()
get_sequential(nums)
main(N)
Os Count is : 12 get_sequential: 0.89145827293396
다중 프로세스를 사용하여 병렬화 합니다
from concurrent.futures import (
ProcessPoolExecutor, as_completed )
def fibonacci(n):
a, b = 0, 1
for _ in range(n):
a, b = b, b + a
else: return a
def elapsed_time(f):
def wrapper(args, **kwargs):
st = time.time()
v = f(args, **kwargs)
print(f"{f.__name__}: {time.time() - st}")
return v
return wrapper
@elapsed_time
def get_sequential(nums):
[fibonacci(num) for num in nums]
@elapsed_time
def get_multi_process(nums):
with ProcessPoolExecutor() as e:
futures = [e.submit(fibonacci, num) for num in nums]
[future.result() for future in as_completed(futures)]
def main(n):
# n = int(sys.argv[1])
nums = [n] * os.cpu_count()
get_multi_process(nums)
main(N)
get_multi_process: 0.23077726364135742
ProcessPoolExecutor
클래스는 queue
를 사용하여 프로세스간 객체를 전달 합니다.
queue
는 multiprocessing.Queue
클래스로 구현되고, pickle
형식으로 직렬화 됩니다pickle
화 불가능한 객체로 lambda
식으로 정의한 객체가 있습니다.from concurrent.futures import (
ProcessPoolExecutor,
wait
)
func = lambda: 1
def main():
with ProcessPoolExecutor() as e:
future = e.submit(func)
done, _ = wait([future])
print(future.result())
try:
main()
except Exception as e:
import termcolor # termcolor.COLORS
print(termcolor.colored(e, 'red'))
Can't pickle <function <lambda> at 0x7fe0a0107700>: attribute lookup <lambda> on __main__ failed
from concurrent.futures import (
ProcessPoolExecutor,
as_completed
)
import numpy as np
def use_numpy_random(): # 난수 생성기를 초기화
return np.random.random()
def main():
with ProcessPoolExecutor() as e:
futures = [
e.submit(use_numpy_random)
for _ in range(3)]
for future in as_completed(futures):
print(future.result())
if __name__ == '__main__':
main()
0.7548810825156606 0.7548810825156606 0.7548810825156606
from concurrent.futures import (
ProcessPoolExecutor,
as_completed
)
import random
def use_starndard_random():
return random.random()
def main():
with ProcessPoolExecutor() as e:
futures = [
e.submit(use_starndard_random)
for _ in range(3)]
for future in as_completed(futures):
print(future.result())
if __name__ == '__main__':
main()
0.21848652968019733 0.7947343618065531 0.9513253851747362
async def coro():
return 1
coro() # 반환값은 1이 아닌 코루틴 객체
<coroutine object coro at 0x7fe07807e0c0>
await coro()
1
import asyncio
import random
async def call_web_api(url):
# Web API 처리를 여기에서는 슬립(sleep)으로 대신함
print(f'send a request: {url}')
await asyncio.sleep(random.random())
print(f'got a response: {url}')
return url
async def async_download(url): # await를 사용해 코루틴을 호출
response = await call_web_api(url)
return response
result = await async_download('https://twitter.com/')
result
send a request: https://twitter.com/ got a response: https://twitter.com/
'https://twitter.com/'
async def main():
task = asyncio.gather(
async_download('https://twitter.com/'),
async_download('https://facebook.com'),
async_download('https://instagram.com'),
)
return await task
# Notebook 환경에서는 이미 이벤트 루프안에서 동작하고 있기 때문에 asyncio.run()은 사용할 수 없습니다.
# 따라서 Notebook 환경에서는 asyncio.run(X)는 await X로 바꿔 읽기 바랍니다.>>>
# result = asyncio.run(main())
result = await main()
result
send a request: https://twitter.com/ send a request: https://facebook.com send a request: https://instagram.com got a response: https://facebook.com got a response: https://twitter.com/ got a response: https://instagram.com
['https://twitter.com/', 'https://facebook.com', 'https://instagram.com']
import asyncio
async def main():
loop = asyncio.get_running_loop()
print(loop)
await main()
<_UnixSelectorEventLoop running=True closed=False debug=False>
── 스케줄링한 코루틴의 테스크 실행
asyncio.run()
, 2) 코루틴 내부에서 await
로 실행, 3) 테크스
를 만들어서 실행asyncio.Tast()
클래스의 인스턴스, asyncio.Future
의 서브 클래스 입니다.result()
를 사용하지 않고, 대신 await
키워드를 사용 합니다async def coro(n):
await asyncio.sleep(n)
return n
async def main():
task = asyncio.create_task(coro(1))
print(task)
return await task
# print() 시점에서는 아직 Pending 상태
await main()
<Task pending name='Task-11' coro=<coro() running at /tmp/ipykernel_37364/658887625.py:1>>
1
# 태스크를 작성해 실행
# 3초에 완료됨
async def main():
task1 = asyncio.create_task(coro(1))
task2 = asyncio.create_task(coro(2))
task3 = asyncio.create_task(coro(3))
print(await task1)
print(await task2)
print(await task3)
await main()
1 2 3
# 코루틴인 상태로 실행
# 6초에 완료됨
async def main():
print(await coro(1))
print(await coro(2))
print(await coro(3))
await main()
1 2 3
# 동기 I/O를 이용하는 처리의 태스크화
async def main():
loop = asyncio.get_running_loop()
# 동기 I/O를 이용하는 download에서 태스크를 작성
futures = [loop.run_in_executor(
None, download, url) for url in urls]
for result in await asyncio.gather(*futures):
print(result)
await main()
('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0') ('https://facebook.com', './a023cfbf5f1c39bdf8407f28b60cd134') ('https://instagram.com', './09f8b89478d7e1046fa93c7ee4afa99e')
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch(session, 'http://python.org')
print(html)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
except Exception as e:
import termcolor # termcolor.COLORS
print(termcolor.colored(e, 'red'))
This event loop is already running
/tmp/ipykernel_37364/1232096121.py:18: RuntimeWarning: coroutine 'main' was never awaited print(termcolor.colored(e, 'red')) RuntimeWarning: Enable tracemalloc to get the object allocation traceback