I'll try to:
I won't:
source: www.spec.org, github.com/samuelcolvin/analyze-spec-benchmarks
Machine = host/computer/virtual machine/container
import requests

def count_words(year: int):
    resp = requests.get(f'https://ep{year}.europython.eu/en/')
    print(f'{year}: {len(resp.text.split())}')
worker.py
from redis import Redis
from rq import Queue
from worker import count_words
q = Queue(connection=Redis())
for year in range(2016, 2020):
    print(q.enqueue(count_words, year))
rq_example.py
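Nothing runs until an RQ worker picks the jobs up; assuming a local Redis server is running, the worker is typically started in another shell with something like:

➤ rq worker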
from multiprocessing import Process, JoinableQueue

import requests

def count_words(year: int):
    resp = requests.get(f'https://ep{year}.europython.eu/en/')
    print(f'{year}: {len(resp.text.split())} words')

def worker(id):
    while True:
        item = q.get()
        if item is None:
            # None is the sentinel telling this worker to exit
            print('quitting worker', id)
            break
        count_words(item)
        q.task_done()

q = JoinableQueue()

processes = []
for id in range(2):
    p = Process(target=worker, args=(id,))
    p.start()
    processes.append(p)

for year in range(2016, 2020):
    q.put(year)

q.join()
# one sentinel per worker process, then wait for them to finish
for _ in processes:
    q.put(None)
for p in processes:
    p.join()
➤ python multiprocessing_example.py
2017: 4123 words
2016: 3794 words
2019: 1953 words
2018: 4334 words
quitting worker 0
quitting worker 1
from queue import Queue
from threading import Thread

import requests

def count_words(year: int):
    resp = requests.get(f'https://ep{year}.europython.eu/en/')
    print(f'{year}: {len(resp.text.split())} words')

def worker(id):
    while True:
        item = q.get()
        if item is None:
            print('quitting thread', id)
            break
        count_words(item)
        q.task_done()

q = Queue()

threads = []
for id in range(2):
    t = Thread(target=worker, args=(id,))
    t.start()
    threads.append(t)

for year in range(2016, 2020):
    q.put(year)

q.join()
for _ in threads:
    q.put(None)
for t in threads:
    t.join()
➤ python threading_example.py
2017: 4123 words
2016: 3794 words
2019: 1953 words
2018: 4334 words
quitting thread 0
quitting thread 1
Memory locking is horrid.
The GIL limits the usefulness of threading with Python:
Do not communicate by sharing memory; instead, share memory by communicating.
- Go Proverb
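A tiny sketch of the difference (illustrative names, not from the talk): in the first pattern every thread mutates shared state under a lock; in the second, threads only put values on a queue and one place drains it.

from queue import Queue
from threading import Lock, Thread

# "communicate by sharing memory": every thread mutates shared state,
# and correctness depends on taking the lock everywhere
total = 0
lock = Lock()

def add_shared(n: int):
    global total
    with lock:
        total += n

# "share memory by communicating": threads only put values on a queue
results = Queue()

def add_queued(n: int):
    results.put(n)

for target in (add_shared, add_queued):
    threads = [Thread(target=target, args=(n,)) for n in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print(total)                                  # 45
print(sum(results.get() for _ in range(10)))  # 45

The queue-based examples above follow the second pattern, which is why none of them need explicit locks.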
GIL ... protects access to Python objects, preventing multiple threads from executing Python bytecodes at once
- Python Wiki
from queue import Queue
from threading import Thread
from time import time

def do_calcs(year: int):
    print(sum(range(year * int(1e5))))

t1 = time()
for year in range(2016, 2020):
    do_calcs(year)
t2 = time()
print(f'Time taken without threads: {t2 - t1:0.2f}s')

def worker(id):
    while True:
        item = q.get()
        if item is None:
            print('quitting thread', id)
            break
        do_calcs(item)
        q.task_done()

t3 = time()
...
for year in range(2016, 2020):
    q.put(year)
...
t4 = time()
print(f'Time taken with 2 threads: {t4 - t3:0.2f}s')
➤ python gil.py
20321279899200000
20341444899150000
20361619899100000
20381804899050000
Time taken without threads: 7.63s
20321279899200000
20341444899150000
20361619899100000
20381804899050000
quitting thread 1
quitting thread 0
Time taken with 2 threads: 7.65s
from queue import Queue
from threading import Thread
from time import time

import numpy as np

def do_calcs(year: int):
    # np.sum runs in C and releases the GIL, so threads can compute in parallel
    print(np.sum(np.arange(year * int(1e5))))

t1 = time()
for year in range(2016, 2020):
    do_calcs(year)
t2 = time()
print(f'Time taken without threads: {t2 - t1:0.2f}s')

def worker(id):
    while True:
        item = q.get()
        if item is None:
            print('quitting thread', id)
            break
        do_calcs(item)
        q.task_done()

t3 = time()
...
for year in range(2016, 2020):
    q.put(year)
...
t4 = time()
print(f'Time taken with 2 threads: {t4 - t3:0.2f}s')
➤ python gil_numpy.py
20321279899200000
20341444899150000
20361619899100000
20381804899050000
Time taken without threads: 2.36s
20321279899200000
20341444899150000
20381804899050000
20361619899100000
quitting thread 1
quitting thread 0
Time taken with 2 threads: 1.34s
from aiohttp import ClientSession

import asyncio

async def count_words(year: int):
    async with ClientSession() as session:
        async with session.get(f'https://ep{year}.europython.eu/en/') as resp:
            text = await resp.text()
            print(f'{year}: {len(text.split())} words')

async def main():
    coroutines = []
    for year in range(2016, 2020):
        coroutines.append(count_words(year))
    await asyncio.gather(*coroutines)

asyncio.run(main())
➤ python asyncio_example.py
2019: 1953 words
2017: 4123 words
2016: 3782 words
2018: 4334 words
explicit cooperative scheduling is awesome, but it can't be implicit
- me
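In other words, a coroutine only gives up control at an explicit await; a plain blocking call never does. A minimal sketch (illustrative, not from the talk):

import asyncio
import time

async def polite(name: str):
    await asyncio.sleep(1)  # explicitly hands control back to the event loop
    print(name, 'done')

async def rude(name: str):
    time.sleep(1)  # never awaits, so it blocks every other coroutine for 1s
    print(name, 'done')

async def main():
    # two polite() calls finish together after ~1s;
    # swap in rude() and they run one after the other, ~2s total
    await asyncio.gather(polite('a'), polite('b'))

asyncio.run(main())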
Machines
Processes
Threads
Asyncio
rq forks the main process to run the worker
ThreadPoolExecutor
ProcessPoolExecutor
aiohttp, arq
multiprocessing.Queue
from concurrent.futures import ThreadPoolExecutor
import asyncio
from time import time

import numpy as np

def do_calcs(year: int):
    print(np.sum(np.arange(year * int(1e5))))

async def main():
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor(max_workers=2) as pool:
        coroutines = [
            loop.run_in_executor(pool, do_calcs, v)
            for v in range(2016, 2020)
        ]
        await asyncio.gather(*coroutines)

t1 = time()
asyncio.run(main())
print(f'Time taken with 2 threads: {time() - t1:0.2f}s')
➤ python asyncio_numpy.py
20321279899200000
20341444899150000
20381804899050000
20361619899100000
Time taken with 2 threads: 1.27s
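The summary above also lists ProcessPoolExecutor; for CPU-bound pure-Python work that the GIL would otherwise serialise, a sketch (not from the talk, same shape as the threaded example) only needs the pool type swapped:

from concurrent.futures import ProcessPoolExecutor
import asyncio
from time import time

def do_calcs(year: int):
    # pure-Python arithmetic: each worker process has its own interpreter and GIL
    print(sum(range(year * int(1e5))))

async def main():
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor(max_workers=2) as pool:
        coroutines = [
            loop.run_in_executor(pool, do_calcs, v)
            for v in range(2016, 2020)
        ]
        await asyncio.gather(*coroutines)

if __name__ == '__main__':  # required when child processes are spawned
    t1 = time()
    asyncio.run(main())
    print(f'Time taken with 2 processes: {time() - t1:0.2f}s')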
It's easy to read the docs; the tricky thing (and what I've tried to do today) is understanding the big picture.
this presentation: tiny.cc/pythonsppp