{{tag>Brouillon Python CA}}

= Notes Python multithreading

See:
  * https://plainenglish.io/blog/send-http-requests-as-fast-as-possible-in-python-304134d46604
  * https://cppsecrets.com/users/1102811497104117108109111104116975048484864103109971051084699111109/Blocking-and-non-blocking-threads-in-python-Simplified.php
  * asyncio / aiohttp / multiprocess / trio / grequests

Equivalent of ''defer'' (Go); see the sketch at the end of this section:
  * https://stackoverflow.com/questions/34625089/python-equivalent-of-golangs-defer-statement
  * https://pypi.org/project/pygolang/
  * https://pypi.org/project/python-defer/

Thread safety; see the ''threading.Lock'' sketch at the end of this section:
  * https://plainenglish.io/blog/thread-safety-in-python
  * https://jonbleiberg.medium.com/how-python-keeps-your-queues-thread-safe-4b66a2f3e692

Number of logical CPUs:

<code python>
import multiprocessing

multiprocessing.cpu_count()
</code>

== Examples

<code python>
from threading import Thread
import requests

THREAD_COUNT = 6

def callback():
    try:
        while True:
            r = requests.get('http://www.emrecetin.net')
            print(r)
    except KeyboardInterrupt:
        # Note: CPython delivers KeyboardInterrupt to the main thread only,
        # so this handler will not fire inside a worker thread.
        return

if __name__ == '__main__':
    threads = []
    for i in range(THREAD_COUNT):
        t = Thread(target=callback)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
</code>

Source: https://gist.github.com/emrectn/aea6d955b37bd15687d0112d236f8a3b

<code python>
import requests
import threading

def make_request(url):
    response = requests.get(url)
    print(f"Response from {url}: {response.status_code}")

# List of URLs to make requests to
urls = [
    "https://www.example.com",
    "https://www.google.com",
    "https://www.wikipedia.org",
    "https://www.python.org"
]

# Create and start threads for each URL
threads = []
for url in urls:
    thread = threading.Thread(target=make_request, args=(url,))
    thread.start()
    threads.append(thread)

# Wait for all threads to finish
for thread in threads:
    thread.join()
</code>

Source: https://www.w3resource.com/python-exercises/threading/python-multi-threading-exercise-7.php

<code python>
import grequests

class Test:
    def __init__(self):
        self.urls = [
            'http://www.example.com',
            'http://www.google.com',
            'http://www.yahoo.com',
            'http://www.stackoverflow.com/',
            'http://www.reddit.com/'
        ]

    def exception(self, request, exception):
        print("Problem: {}: {}".format(request.url, exception))

    # Renamed from "async", a reserved keyword since Python 3.7.
    def fetch(self):
        # Send up to 5 requests concurrently.
        results = grequests.map((grequests.get(u) for u in self.urls),
                                exception_handler=self.exception,
                                size=5)
        print(results)

test = Test()
test.fetch()
</code>

Source: https://stackoverflow.com/questions/38280094/python-requests-with-multithreading

== Multithreaded alternatives to map

=== Example 1

See:
  * https://www.digitalocean.com/community/tutorials/how-to-use-threadpoolexecutor-in-python-3-fr
  * https://www.youtube.com/watch?v=OB0_a8P-aqY

<code python>
import concurrent.futures

# get_dataset and URLS are defined elsewhere
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(get_dataset, URLS)
</code>

or:

<code python>
from multiprocessing import Pool

# process_archive and zip_files are defined elsewhere
p = Pool(12)
p.map(process_archive, zip_files)
</code>

See: https://softhints.com/parallel-processing-zip-archive-csv-files-python-and-pandas/

=== Example 2

<code python>
from multiprocessing import Pool

# solve1, solve2, A and B are defined elsewhere
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

args = [A, B]
results = pool.map(solve1, args)
</code>

Source: https://stackoverflow.com/questions/20548628/how-to-do-parallel-programming-in-python

=== Example 3

You can apply a function to every element of a list with the built-in ''map()'' function:

<code python>
def square(x):
    return x * x

list(map(square, [1, 2, 3, 4, 5, 6]))
</code>

The ''multiprocessing.pool.Pool'' class provides an equivalent but parallelized (via multiprocessing) way of doing this. By default, the pool creates one process per CPU and spreads the calculations over the list across those processes:

<code python>
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':   # required on platforms that spawn processes
    with Pool() as pool:
        pool.map(square, [1, 2, 3, 4, 5, 6])
</code>

Source: https://aaltoscicomp.github.io/python-for-scicomp/parallel/
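Following up on the ''defer'' links above: a minimal sketch of emulating Go's ''defer'' with the standard library's ''contextlib.ExitStack'' (the file name and cleanup actions are illustrative):

<code python>
import contextlib

def process_file():
    # ExitStack runs the registered callbacks in LIFO order when the
    # block exits, like Go's defer -- even if an exception is raised.
    with contextlib.ExitStack() as stack:
        f = open('data.txt', 'w')
        stack.callback(f.close)          # "defer f.close()"
        stack.callback(print, 'done')    # "defer print('done')"
        f.write('hello')

process_file()
</code>

For a single cleanup action, a plain ''try''/''finally'' does the same job.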
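And following up on the thread-safety links: a minimal sketch of protecting shared state with ''threading.Lock''. Without the lock, concurrent ''counter += 1'' operations can lose updates, because the increment is a read, an add, and a write rather than one atomic step:

<code python>
import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        # The lock ensures only one thread at a time runs the increment.
        with lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # always 400000 with the lock; unpredictable without it
</code>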
----

== Notes async asyncio await

See:
  * [[Python - callback]]
  * https://www.metal3d.org/blog/2020/d%C3%A9mystifier-python-async/
  * https://fastapi.tiangolo.com/fr/async/
  * https://docs.python.org/3/library/asyncio-task.html
  * https://stackless.readthedocs.io/en/v3.6.13-slp/library/asyncio-task.html
  * https://stackoverflow.com/questions/65973786/why-does-asyncio-sleep0-make-my-code-faster
  * https://zestedesavoir.com/articles/3306/plongee-au-coeur-de-lasynchrone-en-python/
  * https://copyprogramming.com/howto/how-to-use-python-async-task-without-await
  * https://www.pythontutorial.net/python-concurrency/python-asyncio-wait/
  * https://realpython.com/async-io-python/
  * https://www.youtube.com/watch?v=FqXmE8KaIR0
  * https://www.youtube.com/watch?v=0GVLtTnebNA

Async queues:
  * https://superfastpython.com/asyncio-queue/

Key points (see the sketch at the end of this section):
  * ''await'' can only be used inside functions defined with ''async def''.
  * A coroutine declared with ''async def'' can only be invoked through:
    * ''await'', when the result must be waited for;
    * ''create_task()'' (''asyncio.create_task()'' or ''asyncio.TaskGroup'') to schedule it without waiting for the result;
    * ''asyncio.gather()'';
    * ''asyncio.run()''.
  * "**Coroutine**" is just a fancy term for what a function defined with ''async def'' returns. Python knows it behaves like a regular function that starts at some point and finishes at another, but that it can also be paused, as long as there is an ''await'' somewhere in its body.
  * ''time.sleep()'' (like other blocking libraries/functions) is not compatible with ''asyncio''. Use ''asyncio.sleep()'' instead.

=== Sleep

An important use of sleep in asyncio programs is to suspend the current task and let other coroutines execute. This matters because, although a task or coroutine can easily schedule new tasks via ''create_task()'' or ''gather()'', the scheduled tasks will not begin executing until the current task is suspended.

Even sleeping for zero seconds is enough to suspend the current task and give other tasks an opportunity to run. For example:

<code python>
# allow other tasks to run for a moment
await asyncio.sleep(0)
</code>

Finally, sleep is also useful to simulate blocking tasks in a concurrent program.

Source: https://superfastpython.com/asyncio-sleep/

=== Other

Do not quit before all scheduled tasks have finished:

<code python>
import asyncio

async def main():
    # Create some tasks.
    for _ in range(10):
        asyncio.create_task(asyncio.sleep(10))
    # Wait for all tasks to finish, except the current one, i.e. main().
    await asyncio.gather(*(asyncio.all_tasks() - {asyncio.current_task()}))

asyncio.run(main())
</code>

Source: https://stackoverflow.com/questions/27796294/when-using-asyncio-how-do-you-allow-all-running-tasks-to-finish-before-shutting
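The sketch promised after the key points above: ''await'' runs coroutines one after another, while ''asyncio.create_task()'' schedules them to run concurrently (the coroutine name ''fetch'' is illustrative):

<code python>
import asyncio
import time

async def fetch(name, delay):
    await asyncio.sleep(delay)   # non-blocking sleep, stands in for real I/O
    return name

async def main():
    start = time.perf_counter()

    # Sequential: each await suspends main() until fetch() completes.
    await fetch('a', 1)
    await fetch('b', 1)
    print('awaited one by one :', round(time.perf_counter() - start, 1), 's')  # ~2.0 s

    start = time.perf_counter()

    # Concurrent: create_task() schedules both coroutines immediately;
    # gather() waits for both at once.
    t1 = asyncio.create_task(fetch('a', 1))
    t2 = asyncio.create_task(fetch('b', 1))
    await asyncio.gather(t1, t2)
    print('scheduled as tasks :', round(time.perf_counter() - start, 1), 's')  # ~1.0 s

asyncio.run(main())
</code>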
asyncio & threading:

<code python>
import asyncio
import threading

async def something_async():
    print('something_async start in thread:', threading.current_thread())
    await asyncio.sleep(1)
    print('something_async done in thread:', threading.current_thread())

def main():
    # Each thread runs its own event loop via asyncio.run().
    t1 = threading.Thread(target=asyncio.run, args=(something_async(),))
    t2 = threading.Thread(target=asyncio.run, args=(something_async(),))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

if __name__ == '__main__':
    main()
</code>

Source: https://stackoverflow.com/questions/57234827/wait-for-async-function-to-complete

Another example:

<code python>
import asyncio
import time
from random import randint

period = 1  # second

def get_epoch_ms():
    return int(time.time() * 1000.0)

async def do_something(name):
    print("Start  :", name, get_epoch_ms())
    try:
        # Do something which may take more than 1 second.
        slp = randint(1, 5)
        print("Sleep  :", name, get_epoch_ms(), slp)
        await asyncio.sleep(slp)
    except Exception as e:
        print("Error  :", e)
    print("Finish :", name, get_epoch_ms())

# Note: calling get_event_loop() outside a running loop is deprecated
# since Python 3.10; prefer asyncio.run() in new code.
loop = asyncio.get_event_loop()
futures = [loop.create_task(do_something('T' + str(i))) for i in range(5)]

#loop.run_forever()
#for f in futures:
#    f.cancel()

for f in futures:
    loop.run_until_complete(f)
</code>

Source: https://stackoverflow.com/questions/56318648/how-to-run-an-asyncio-task-without-awaiting

Thread safe: ''deque.append()'' and ''deque.pop()'' are atomic, so a ''collections.deque'' can be shared between threads without a lock:

<code python>
from collections import deque

thread_safe_deque = deque()

# Thread 1
thread_safe_deque.append(1)

# Thread 2
element = thread_safe_deque.pop()
</code>

Source: https://www.cloudthat.com/resources/blog/writing-thread-safe-programs-in-python

----

Source: nsclient ''NSCP-0.8.0-x64/scripts/python''

''badapp.py'':

<code python>
#! /usr/bin/env python3
import threading

class BadThread(threading.Thread):
    id = -1

    def __init__(self, id):
        self.id = id
        threading.Thread.__init__(self)

    def run(self):
        # Busy-loop forever; there is no way to stop these threads cleanly.
        i = 0
        while True:
            i = i + 1
            if i > 100000:
                print('Processing: %d' % self.id)
                i = 0

for x in range(1000):
    BadThread(x).start()
</code>
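''badapp.py'' spawns worker threads that can never be asked to stop. As a counterpoint, a minimal sketch of a stoppable worker using ''threading.Event'' (the class name ''GoodThread'' is illustrative):

<code python>
#! /usr/bin/env python3
import threading
import time

class GoodThread(threading.Thread):
    def __init__(self, id):
        super().__init__()
        self.id = id
        self.stop_event = threading.Event()

    def run(self):
        # Check the event regularly instead of looping forever.
        while not self.stop_event.is_set():
            print('Processing: %d' % self.id)
            self.stop_event.wait(1)   # sleep, but wake up early on stop()

    def stop(self):
        self.stop_event.set()

threads = [GoodThread(x) for x in range(3)]
for t in threads:
    t.start()
time.sleep(3)
for t in threads:
    t.stop()
for t in threads:
    t.join()
</code>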