{{tag>Brouillon Python CA}}
= Python multithreading notes
See:
* https://plainenglish.io/blog/send-http-requests-as-fast-as-possible-in-python-304134d46604
* https://cppsecrets.com/users/1102811497104117108109111104116975048484864103109971051084699111109/Blocking-and-non-blocking-threads-in-python-Simplified.php
* asyncio / aiohttp / multiprocess / trio / grequests
Equivalent of Go's ''defer'':
* https://stackoverflow.com/questions/34625089/python-equivalent-of-golangs-defer-statement
* https://pypi.org/project/pygolang/
* https://pypi.org/project/python-defer/
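Without any third-party package, the standard library's ''contextlib.ExitStack'' can approximate ''defer''. The sketch below is only an illustration: the ''process'' function and the ''log'' list are hypothetical names used to make the execution order visible.

```python
from contextlib import ExitStack


def process(log):
    """Append markers to `log` (hypothetical helper) to show the call order."""
    with ExitStack() as stack:
        # Callbacks registered here run when the `with` block exits,
        # in reverse (LIFO) order, much like Go's deferred calls.
        stack.callback(log.append, "deferred 1")
        stack.callback(log.append, "deferred 2")
        log.append("body")
    return log


if __name__ == "__main__":
    print(process([]))
```

The callbacks also run when the body raises an exception, which is the main reason to reach for ''defer'' in the first place.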
Thread safety:
* https://plainenglish.io/blog/thread-safety-in-python
* https://jonbleiberg.medium.com/how-python-keeps-your-queues-thread-safe-4b66a2f3e692
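As a minimal sketch of what thread safety means in practice: ''value += 1'' is a read-modify-write sequence, so concurrent increments are usually protected with a ''threading.Lock''. The ''Counter'' class and ''run_counter'' function are hypothetical names chosen for this illustration.

```python
import threading


class Counter:
    """Hypothetical shared counter protected by a lock."""

    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self):
        # Only one thread at a time may execute the read-modify-write.
        with self._lock:
            self.value += 1


def run_counter(n_threads=8, n_increments=10_000):
    counter = Counter()

    def work():
        for _ in range(n_increments):
            counter.increment()

    threads = [threading.Thread(target=work) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value


if __name__ == "__main__":
    print(run_counter())
```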
Number of logical CPUs:
import multiprocessing
multiprocessing.cpu_count()
== Examples
from threading import Thread
import requests
THREAD_COUNT = 6
def callback():
    # Note: KeyboardInterrupt is only raised in the main thread,
    # so this handler never fires inside a worker thread.
    try:
        while True:
            r = requests.get('http://www.emrecetin.net')
            print(r)
    except KeyboardInterrupt:
        return
if __name__ == '__main__':
    threads = []
    for i in range(THREAD_COUNT):
        t = Thread(target=callback)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
Source: https://gist.github.com/emrectn/aea6d955b37bd15687d0112d236f8a3b
import requests
import threading
def make_request(url):
    response = requests.get(url)
    print(f"Response from {url}: {response.status_code}")
# List of URLs to make requests to
urls = [
    "https://www.example.com",
    "https://www.google.com",
    "https://www.wikipedia.org",
    "https://www.python.org"
]
# Create and start threads for each URL
threads = []
for url in urls:
    thread = threading.Thread(target=make_request, args=(url,))
    thread.start()
    threads.append(thread)
# Wait for all threads to finish
for thread in threads:
    thread.join()
Source: https://www.w3resource.com/python-exercises/threading/python-multi-threading-exercise-7.php
import grequests
class Test:
    def __init__(self):
        self.urls = [
            'http://www.example.com',
            'http://www.google.com',
            'http://www.yahoo.com',
            'http://www.stackoverflow.com/',
            'http://www.reddit.com/'
        ]
    def exception(self, request, exception):
        print("Problem: {}: {}".format(request.url, exception))
    # Renamed from 'async', which is a reserved keyword since Python 3.7.
    def run_async(self):
        results = grequests.map((grequests.get(u) for u in self.urls), exception_handler=self.exception, size=5)
        print(results)
test = Test()
test.run_async()
Source: https://stackoverflow.com/questions/38280094/python-requests-with-multithreading
== Multithreaded alternatives to map
=== Example 1
See:
* https://www.digitalocean.com/community/tutorials/how-to-use-threadpoolexecutor-in-python-3-fr
* https://www.youtube.com/watch?v=OB0_a8P-aqY
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(get_dataset, URLS)
or alternatively:
from multiprocessing import Pool
p = Pool(12)
p.map(process_archive, zip_files)
See: https://softhints.com/parallel-processing-zip-archive-csv-files-python-and-pandas/
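The ''ThreadPoolExecutor'' fragment above leaves ''get_dataset'' and ''URLS'' undefined. Here is a self-contained sketch, assuming hypothetical stand-ins for both: the download is simulated with ''time.sleep()'' so no network access is needed, and ''fetch_all'' is a helper name invented for the example.

```python
import concurrent.futures
import time

# Hypothetical stand-ins for the undefined names in the fragment above.
URLS = ["https://example.com/a", "https://example.com/b", "https://example.com/c"]


def get_dataset(url):
    """Simulate a slow download; no real request is sent."""
    time.sleep(0.1)
    return f"data from {url}"


def fetch_all(urls, max_workers=5):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map() yields results in the same order as the inputs,
        # even when the calls finish out of order.
        return list(executor.map(get_dataset, urls))


if __name__ == "__main__":
    for line in fetch_all(URLS):
        print(line)
```

Wrapping the result in ''list()'' matters: ''executor.map()'' returns a lazy iterator, and an exception raised in a worker only surfaces when its result is consumed.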
=== Example 2
from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A]) # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B]) # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)
args = [A, B]
results = pool.map(solve1, args)
Source: https://stackoverflow.com/questions/20548628/how-to-do-parallel-programming-in-python
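A runnable variant of the ''apply_async'' pattern, as a sketch only. It swaps in ''multiprocessing.pool.ThreadPool'', which exposes the same interface as ''Pool'' but uses threads, so the example runs without pickling or a ''__main__'' guard. ''solve1'', ''solve2'', ''A'', ''B'' and ''run_both'' are placeholder definitions, not taken from the source.

```python
from multiprocessing.pool import ThreadPool


# Placeholder functions and inputs (assumptions for the sake of the example).
def solve1(x):
    return x + 1


def solve2(x):
    return x * 2


A, B = 10, 20


def run_both():
    with ThreadPool(processes=2) as pool:
        # Both calls are submitted immediately and run concurrently.
        result1 = pool.apply_async(solve1, [A])
        result2 = pool.apply_async(solve2, [B])
        # get() blocks until the result is ready or the timeout expires.
        answer1 = result1.get(timeout=10)
        answer2 = result2.get(timeout=10)
        # map() applies one function to every element and keeps input order.
        results = pool.map(solve1, [A, B])
    return answer1, answer2, results


if __name__ == "__main__":
    print(run_both())
```

For CPU-bound work, ''Pool'' (processes) is usually the better fit, since threads in CPython share the GIL.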
=== Example 3
You can apply the function to every element in a list using the map() function:
list(map(square, [1, 2, 3, 4, 5, 6]))
The multiprocessing.pool.Pool class provides an equivalent but parallelized (via multiprocessing) way of doing this. The pool class, by default, creates one new process per CPU and does parallel calculations on the list:
from multiprocessing import Pool
with Pool() as pool:
    pool.map(square, [1, 2, 3, 4, 5, 6])
Source: https://aaltoscicomp.github.io/python-for-scicomp/parallel/
------------------
== Notes on async, asyncio and await
See:
* [[Python - callback]]
* https://www.metal3d.org/blog/2020/d%C3%A9mystifier-python-async/
* https://fastapi.tiangolo.com/fr/async/
* https://docs.python.org/3/library/asyncio-task.html
* https://stackless.readthedocs.io/en/v3.6.13-slp/library/asyncio-task.html
* https://stackoverflow.com/questions/65973786/why-does-asyncio-sleep0-make-my-code-faster
* https://zestedesavoir.com/articles/3306/plongee-au-coeur-de-lasynchrone-en-python/
* https://copyprogramming.com/howto/how-to-use-python-async-task-without-await
* https://www.pythontutorial.net/python-concurrency/python-asyncio-wait/
* https://realpython.com/async-io-python/
* https://www.youtube.com/watch?v=FqXmE8KaIR0
* https://www.youtube.com/watch?v=0GVLtTnebNA
Async queues:
* https://superfastpython.com/asyncio-queue/
Good to know:
* ''await'' can only be used inside functions defined with ''async def''.
* A coroutine declared with ''async def'' can only be run through one of:
  * ''await'', when the result must be waited for
  * ''create_task()'' (''asyncio.create_task()'' or ''asyncio.TaskGroup''), to avoid waiting for the result
  * ''asyncio.gather()''
  * ''asyncio.run()''
* "**Coroutine**" is just an elaborate term for what a function defined with ''async def'' returns. Python knows that, like a regular function, it starts at some point and finishes at another, but that it can also be paused wherever its body contains an ''await''.
* ''time.sleep()'' (like other blocking libraries and functions) is not compatible with ''asyncio'': it blocks the whole event loop. Use ''asyncio.sleep()'' instead.
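The four ways of running a coroutine listed above can be sketched in one short program. ''double'' is a hypothetical coroutine used only for illustration.

```python
import asyncio


async def double(x):
    """Hypothetical coroutine: pause briefly, then return twice the input."""
    await asyncio.sleep(0.01)  # suspension point: other tasks may run here
    return 2 * x


async def main():
    # 1. await: run the coroutine and wait for its result.
    a = await double(1)
    # 2. create_task(): schedule it now, collect the result later.
    task = asyncio.create_task(double(2))
    # 3. gather(): run several coroutines concurrently; results keep input order.
    b, c = await asyncio.gather(double(3), double(4))
    d = await task
    return [a, d, b, c]


if __name__ == "__main__":
    # 4. asyncio.run(): entry point from synchronous code.
    print(asyncio.run(main()))
```

Calling ''double(1)'' without any of these only creates a coroutine object; nothing executes until it is awaited or scheduled.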
=== Sleep
An important use for sleep in asyncio programs is to suspend the current task and allow other coroutines to execute.
It is important because although a task or coroutine can easily schedule new tasks via the create_task() or gather() function, the scheduled tasks will not begin executing until the current task is suspended.
Even sleeping for zero seconds is enough to suspend the current task and give an opportunity to other tasks to run.
For example:
# allow other tasks to run for a moment
await asyncio.sleep(0)
Finally, a good use for sleep is to simulate blocking tasks in a concurrent program.
Source: https://superfastpython.com/asyncio-sleep/
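The effect of ''asyncio.sleep(0)'' can be seen by recording the order in which things happen. This is a small sketch: ''worker'', the ''log'' list and the ''yield_control'' flag are hypothetical names for this illustration.

```python
import asyncio


async def worker(log):
    log.append("worker ran")


async def main(yield_control):
    log = []
    # The task is scheduled here but has not started executing yet.
    task = asyncio.create_task(worker(log))
    if yield_control:
        # Suspend main() for one loop iteration so the worker can run.
        await asyncio.sleep(0)
    log.append("main continues")
    await task
    return log


if __name__ == "__main__":
    print(asyncio.run(main(yield_control=True)))
    print(asyncio.run(main(yield_control=False)))
```

With ''yield_control=True'' the worker's entry appears first; without it, ''main()'' keeps running until its first real suspension point (''await task'').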
=== Other
Do not exit before all tasks have finished:
import asyncio
async def main():
    # Create some tasks.
    for _ in range(10):
        asyncio.create_task(asyncio.sleep(10))
    # Wait for all tasks other than the current one, i.e. main(), to finish.
    await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})
Source: https://stackoverflow.com/questions/27796294/when-using-asyncio-how-do-you-allow-all-running-tasks-to-finish-before-shutting
asyncio and threading:
import asyncio
import threading
async def something_async():
    print('something_async start in thread:', threading.current_thread())
    await asyncio.sleep(1)
    print('something_async done in thread:', threading.current_thread())
def main():
    t1 = threading.Thread(target=asyncio.run, args=(something_async(), ))
    t2 = threading.Thread(target=asyncio.run, args=(something_async(), ))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
if __name__ == '__main__':
    main()
Source: https://stackoverflow.com/questions/57234827/wait-for-async-function-to-complete
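The opposite direction, calling blocking code from a coroutine, can be sketched with ''asyncio.to_thread()'' (Python 3.9+). ''blocking_io'' is a hypothetical function in which ''time.sleep()'' stands in for any non-async library call.

```python
import asyncio
import threading
import time


def blocking_io(delay):
    """Hypothetical blocking call; returns the name of the thread it ran in."""
    time.sleep(delay)
    return threading.current_thread().name


async def main():
    loop_thread = threading.current_thread().name
    # Each blocking call runs in a worker thread, so the event loop
    # stays free and the two calls overlap instead of running in sequence.
    names = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.2),
        asyncio.to_thread(blocking_io, 0.2),
    )
    return loop_thread, names


if __name__ == "__main__":
    print(asyncio.run(main()))
```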
Another example:
import asyncio
import time
from random import randint
period = 1 # Second
def get_epoch_ms():
    return int(time.time() * 1000.0)
async def do_something(name):
    print("Start :", name, get_epoch_ms())
    try:
        # Do something which may take more than 1 second.
        slp = randint(1, 5)
        print("Sleep :", name, get_epoch_ms(), slp)
        await asyncio.sleep(slp)
    except Exception as e:
        print("Error :", e)
    print("Finish :", name, get_epoch_ms())
# asyncio.get_event_loop() without a running loop is deprecated in recent
# Python versions, so the loop is created explicitly.
loop = asyncio.new_event_loop()
futures = [loop.create_task(do_something('T' + str(i)))
           for i in range(5)]
#loop.run_forever()
#for f in futures:
#    f.cancel()
for f in futures:
    loop.run_until_complete(f)
loop.close()
Source: https://stackoverflow.com/questions/56318648/how-to-run-an-asyncio-task-without-awaiting
Thread-safe ''deque'':
from collections import deque
thread_safe_deque = deque()
# Thread 1
thread_safe_deque.append(1)
# Thread 2
element = thread_safe_deque.pop()
Source: https://www.cloudthat.com/resources/blog/writing-thread-safe-programs-in-python
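''deque.pop()'' raises ''IndexError'' when the deque is empty, so a consumer thread would have to poll. When one thread must wait for another, ''queue.Queue'' is a common alternative because ''get()'' blocks until an item arrives. A minimal producer/consumer sketch follows; the function names and the ''None'' sentinel are choices made for this example.

```python
import queue
import threading


def producer(q, n):
    for i in range(n):
        q.put(i)
    q.put(None)  # sentinel: tells the consumer to stop


def consumer(q, out):
    while True:
        item = q.get()  # blocks until an item is available
        if item is None:
            break
        out.append(item)


def run_pipeline(n=100):
    q = queue.Queue()
    out = []
    threads = [
        threading.Thread(target=producer, args=(q, n)),
        threading.Thread(target=consumer, args=(q, out)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return out


if __name__ == "__main__":
    print(run_pipeline(10))
```

With a single producer and a single consumer, the FIFO queue preserves the order of the items.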
---------
Source: nsclient ''NSCP-0.8.0-x64/scripts/python''
''badapp.py''
#! /usr/bin/env python3
import threading
class BadThread(threading.Thread):
    id = -1
    def __init__(self, id):
        self.id = id
        threading.Thread.__init__(self)
    def run(self):
        i = 0
        while(True):
            i = i + 1
            if i > 100000:
                print('Processing: %d'%self.id)
                i = 0
for x in range(1000):
    BadThread(x).start()