Python multithreading notes

Examples

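from threading import Event, Thread
import requests

THREAD_COUNT = 6
stop = Event()  # set by the main thread to ask the workers to exit

def callback():
    # KeyboardInterrupt is only raised in the main thread, so the
    # workers poll a shared Event instead of catching it themselves.
    while not stop.is_set():
        r = requests.get('http://www.emrecetin.net')
        print(r)

if __name__ == '__main__':
    threads = []
    for i in range(THREAD_COUNT):
        t = Thread(target=callback)
        threads.append(t)
        t.start()

    try:
        for t in threads:
            t.join()
    except KeyboardInterrupt:
        # Ctrl+C: signal the workers, then wait for them to finish.
        stop.set()
        for t in threads:
            t.join()

Source: https://gist.github.com/emrectn/aea6d955b37bd15687d0112d236f8a3b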

import requests
import threading

def make_request(url):
    # Fetch one URL and report its HTTP status code.
    response = requests.get(url)
    print(f"Response from {url}: {response.status_code}")

# List of URLs to make requests to
urls = [
    "https://www.example.com",
    "https://www.google.com",
    "https://www.wikipedia.org",
    "https://www.python.org"
]

# Create and start threads for each URL
threads = []
for url in urls:
    thread = threading.Thread(target=make_request, args=(url,))
    thread.start()
    threads.append(thread)

# Wait for all threads to finish
for thread in threads:
    thread.join()

Source: https://www.w3resource.com/python-exercises/threading/python-multi-threading-exercise-7.php

import grequests

class Test:
    def __init__(self):
        self.urls = [
            'http://www.example.com',
            'http://www.google.com',
            'http://www.yahoo.com',
            'http://www.stackoverflow.com/',
            'http://www.reddit.com/'
        ]

    def exception(self, request, exception):
        # Called by grequests.map() for every request that fails.
        print("Problem: {}: {}".format(request.url, exception))

    def run_async(self):
        # Renamed from "async", which is a reserved keyword since Python 3.7.
        results = grequests.map((grequests.get(u) for u in self.urls), exception_handler=self.exception, size=5)
        print(results)

test = Test()
test.run_async()

Source: https://stackoverflow.com/questions/38280094/python-requests-with-multithreading

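Multithreaded alternatives to map

Example 1

See:

import concurrent.futures

# get_dataset (a function taking one URL) and URLS (an iterable of URLs)
# are assumed to be defined elsewhere.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # list() consumes the lazy iterator, so worker exceptions surface here.
    results = list(executor.map(get_dataset, URLS))

or alternatively

from multiprocessing import Pool

# process_archive and zip_files are assumed to be defined elsewhere.
if __name__ == '__main__':
    with Pool(12) as p:
        p.map(process_archive, zip_files)

See: https://softhints.com/parallel-processing-zip-archive-csv-files-python-and-pandas/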

Example 2

from multiprocessing import Pool

# solve1, solve2, A and B are assumed to be defined elsewhere.
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)          # wait at most 10 seconds for the result
answer2 = result2.get(timeout=10)
args = [A, B]
results = pool.map(solve1, args)           # evaluate solve1 on every element of args

Source: https://stackoverflow.com/questions/20548628/how-to-do-parallel-programming-in-python

Example 3

You can apply the function to every element in a list using the map() function:

list(map(square, [1, 2, 3, 4, 5, 6]))

The multiprocessing.pool.Pool class provides an equivalent but parallelized (via multiprocessing) way of doing this. The pool class, by default, creates one new process per CPU and does parallel calculations on the list:

from multiprocessing import Pool
 
with Pool() as pool:
    pool.map(square, [1, 2, 3, 4, 5, 6])

Source: https://aaltoscicomp.github.io/python-for-scicomp/parallel/
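
A minimal, self-contained sketch of the equivalence described above. The square function is not shown in the source, so its definition here is an assumption. To keep the snippet runnable without an if __name__ == '__main__': guard, it swaps in multiprocessing.pool.ThreadPool, the thread-backed class that exposes the same map() interface. With the process-based Pool, the same calls should sit under that guard.

```python
from multiprocessing.pool import ThreadPool


def square(x):
    # Assumed definition: the source uses square() without showing it.
    return x * x


values = [1, 2, 3, 4, 5, 6]

# Sequential version with the built-in map().
sequential = list(map(square, values))

# Pool-style version: the work is spread over 4 worker threads, and
# map() still returns the results in input order.
with ThreadPool(4) as pool:
    parallel = pool.map(square, values)

print(sequential == parallel)
```

Because map() preserves input order in both cases, the two result lists can be compared directly.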


Notes on async, asyncio and await

See:

Async queues:

Good to know:

  • You can only use await inside functions defined with async def.
  • A coroutine declared with async def can only be run through:
    • await, if the result must be waited for
    • create_task() (asyncio.create_task() or asyncio.TaskGroup), to avoid waiting for the result
    • asyncio.gather()
    • asyncio.run()
  • "Coroutine" is just an elaborate term for what a function defined with async def returns. Python knows that it is like a regular function that starts at some point and finishes at another, but that it can also be paused whenever there is an await in its body.
  • time.sleep() (like other blocking libraries and functions) is not compatible with asyncio, because it blocks the event loop. Use asyncio.sleep() instead.
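
The four ways of running a coroutine listed above can be sketched as follows. The double coroutine and the values passed to it are illustrative assumptions, not taken from any of the linked sources.

```python
import asyncio


async def double(x):
    # Hypothetical coroutine used only for illustration.
    await asyncio.sleep(0.01)  # non-blocking pause; time.sleep() would block the loop
    return 2 * x


async def main():
    # 1. await: run the coroutine and wait for its result.
    a = await double(1)

    # 2. create_task(): schedule it now, collect the result later.
    task = asyncio.create_task(double(2))

    # 3. gather(): run several coroutines concurrently, results in order.
    c, d = await asyncio.gather(double(3), double(4))

    b = await task
    return [a, b, c, d]


# 4. asyncio.run(): entry point from synchronous code.
results = asyncio.run(main())
print(results)
```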

Sleep

An important use for sleep in asyncio programs is to suspend the current task and allow other coroutines to execute.

It is important because although a task or coroutine can easily schedule new tasks via the create_task() or gather() function, the scheduled tasks will not begin executing until the current task is suspended.

Even sleeping for zero seconds is enough to suspend the current task and give an opportunity to other tasks to run.

For example:

# allow other tasks to run for a moment
await asyncio.sleep(0)

Finally, a good use for sleep is to simulate blocking tasks in a concurrent program.

Source: https://superfastpython.com/asyncio-sleep/
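
The scheduling behaviour described above can be observed with a small sketch. The worker coroutine and the log list are hypothetical names introduced for illustration. The task created with create_task() only starts once main() suspends at asyncio.sleep(0).

```python
import asyncio


async def worker(log):
    # Hypothetical task body: just records that it ran.
    log.append("worker ran")


async def main():
    log = []
    task = asyncio.create_task(worker(log))
    log.append("task created")    # the worker has not started yet
    await asyncio.sleep(0)        # suspend main() so the worker can run
    log.append("after sleep(0)")
    await task
    return log


events = asyncio.run(main())
print(events)
```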

Other

Do not exit before all tasks have finished

import asyncio

async def main():
    # Create some tasks.
    for _ in range(10):
        asyncio.create_task(asyncio.sleep(10))
    # Wait for every task other than the current one, i.e. main(), to finish.
    await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})

asyncio.run(main())

Source: https://stackoverflow.com/questions/27796294/when-using-asyncio-how-do-you-allow-all-running-tasks-to-finish-before-shutting
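
An alternative sketch, assuming you create the tasks yourself: keep the task objects in a list and gather that list, instead of querying asyncio.all_tasks(). The job coroutine and its short delay are hypothetical. Keeping references also follows the advice in the asyncio documentation to save the result of create_task(), since the event loop only holds weak references to tasks.

```python
import asyncio


async def job(i):
    # Hypothetical unit of work; a short sleep stands in for real I/O.
    await asyncio.sleep(0.01)
    return i


async def main():
    # Keep a reference to every task we create.
    tasks = [asyncio.create_task(job(i)) for i in range(10)]
    # Do not return before all of them have finished.
    return await asyncio.gather(*tasks)


finished = asyncio.run(main())
print(len(finished))
```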

asyncio & threading

import asyncio
import threading
 
async def something_async():
    print('something_async start in thread:', threading.current_thread())
    await asyncio.sleep(1)
    print('something_async done in thread:', threading.current_thread())
 
def main():
    t1 = threading.Thread(target=asyncio.run, args=(something_async(), ))
    t2 = threading.Thread(target=asyncio.run, args=(something_async(), ))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
 
if __name__ == '__main__':
    main()

Source: https://stackoverflow.com/questions/57234827/wait-for-async-function-to-complete
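
The example above starts event loops from threads. For the opposite direction (calling blocking code from a coroutine), a possible sketch uses asyncio.to_thread(), available from Python 3.9. The blocking_work function is a hypothetical stand-in for any blocking call.

```python
import asyncio
import threading
import time


def blocking_work():
    # Hypothetical blocking function; time.sleep() stands in for slow I/O.
    time.sleep(0.05)
    return threading.get_ident()


async def main():
    loop_thread = threading.get_ident()
    # to_thread() runs blocking_work in a worker thread, so the event loop
    # stays free to run the asyncio.sleep() below in the meantime.
    worker_thread, _ = await asyncio.gather(
        asyncio.to_thread(blocking_work),
        asyncio.sleep(0.05),
    )
    return loop_thread != worker_thread


ran_elsewhere = asyncio.run(main())
print(ran_elsewhere)
```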

Other

import asyncio
import time
from random import randint

period = 1  # Second

def get_epoch_ms():
    return int(time.time() * 1000.0)

async def do_something(name):
    print("Start  :", name, get_epoch_ms())
    try:
        # Do something which may take more than 1 second.
        slp = randint(1, 5)
        print("Sleep  :", name, get_epoch_ms(), slp)
        await asyncio.sleep(slp)
    except Exception as e:
        print("Error  :", e)

    print("Finish :", name, get_epoch_ms())

# Create the loop explicitly: relying on asyncio.get_event_loop() to
# create one implicitly is deprecated.
loop = asyncio.new_event_loop()
futures = [loop.create_task(do_something('T' + str(i)))
           for i in range(5)]

#loop.run_forever()

#for f in futures:
#    f.cancel()

for f in futures:
    loop.run_until_complete(f)
loop.close()

Source: https://stackoverflow.com/questions/56318648/how-to-run-an-asyncio-task-without-awaiting

Thread safe

from collections import deque
 
thread_safe_deque = deque()
 
# Thread 1
thread_safe_deque.append(1)
 
# Thread 2
element = thread_safe_deque.pop()

Source: https://www.cloudthat.com/resources/blog/writing-thread-safe-programs-in-python
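
A slightly larger sketch of the same idea, with an assumed thread count and item count: several threads append to one deque concurrently without an explicit lock, and no item is lost. Only the individual append() and pop() calls are atomic. A compound operation such as "pop only if non-empty" still needs a lock or a try/except IndexError.

```python
import threading
from collections import deque

shared = deque()
N_THREADS = 4            # assumed values, chosen for illustration
ITEMS_PER_THREAD = 1000


def producer(thread_id):
    for i in range(ITEMS_PER_THREAD):
        shared.append((thread_id, i))  # atomic, no lock needed


threads = [threading.Thread(target=producer, args=(n,)) for n in range(N_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = len(shared)
print(total)

# pop() on an empty deque raises IndexError, so guard against it.
empty = deque()
try:
    empty.pop()
    popped = True
except IndexError:
    popped = False
```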


Source: nsclient NSCP-0.8.0-x64/scripts/python

badapp.py

#! /usr/bin/env python3

import threading

class BadThread(threading.Thread):
    id = -1

    def __init__(self, id):
        self.id = id
        threading.Thread.__init__(self)

    def run(self):
        # Busy loop that never ends: the thread burns CPU and prints periodically.
        i = 0
        while True:
            i = i + 1
            if i > 100000:
                print('Processing: %d' % self.id)
                i = 0

# Start 1000 of these threads.
for x in range(1000):
    BadThread(x).start()

tech/notes_python_multithreading.txt · Last modified: by Jean-Baptiste
