Table des matières

, ,

Notes Python multithreading

Voir :

Équivalent à defer (go lang) :

Thread safe :

Nombre de CPU logiques

import multiprocessing
 
# Query the number of logical CPUs; in a script, assign or print the result to see it.
multiprocessing.cpu_count()

Exemples

from threading import Thread
import requests
 
THREAD_COUNT = 6
 
def callback(stop_event=None):
    """Fetch the test URL in a loop and print each response.

    Args:
        stop_event: Optional ``threading.Event``. When given, the loop ends
            once the event is set. When omitted, the loop only ends on a
            KeyboardInterrupt raised in the calling thread.
    """
    try:
        # KeyboardInterrupt is only ever raised in the main thread, so worker
        # threads cannot rely on it; they poll stop_event instead.
        while stop_event is None or not stop_event.is_set():
            try:
                # The timeout keeps a stalled connection from blocking the
                # worker (and therefore shutdown) indefinitely.
                r = requests.get('http://www.emrecetin.net', timeout=10)
            except requests.RequestException as exc:
                print('Request failed:', exc)
                continue
            print(r)
    except KeyboardInterrupt:
        # Only reachable when callback() runs in the main thread.
        return


if __name__ == '__main__':
    from threading import Event

    stop_event = Event()
    threads = []
    for i in range(THREAD_COUNT):
        t = Thread(target=callback, args=(stop_event,))
        threads.append(t)
        t.start()

    try:
        for t in threads:
            # Join in short slices so Ctrl+C is noticed promptly on every platform.
            while t.is_alive():
                t.join(timeout=0.5)
    except KeyboardInterrupt:
        # Ask the workers to stop after their current request, then wait for them.
        stop_event.set()
        for t in threads:
            t.join()

Source : https://gist.github.com/emrectn/aea6d955b37bd15687d0112d236f8a3b

import requests
import threading
def make_request(url, timeout=10):
    """Fetch *url* and print its HTTP status code.

    Args:
        url: Address to request.
        timeout: Seconds to wait for the server before giving up, so that a
            stalled host cannot block the calling thread forever.
    """
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # Report the failure instead of letting the thread die with a traceback.
        print(f"Request to {url} failed: {exc}")
        return
    print(f"Response from {url}: {response.status_code}")
# Targets to fetch, one worker thread each
urls = [
    "https://www.example.com",
    "https://www.google.com",
    "https://www.wikipedia.org",
    "https://www.python.org"
]
# Spawn the workers; each one is started as soon as it has been created
threads = []
for url in urls:
    threads.append(threading.Thread(target=make_request, args=(url,)))
    threads[-1].start()
# Block until every worker has completed its request
for thread in threads:
    thread.join()

Source : https://www.w3resource.com/python-exercises/threading/python-multi-threading-exercise-7.php

import grequests
 
class Test:
    """Fetch a fixed list of URLs concurrently with grequests."""

    def __init__(self):
        self.urls = [
            'http://www.example.com',
            'http://www.google.com',
            'http://www.yahoo.com',
            'http://www.stackoverflow.com/',
            'http://www.reddit.com/'
        ]

    def exception(self, request, exception):
        """Print a failed request; passed to grequests.map as exception_handler."""
        print("Problem: {}: {}".format(request.url, exception))

    def run_async(self):
        """Send one GET per URL through grequests.map (size=5) and print the results.

        Named ``run_async`` because ``async`` has been a reserved keyword
        since Python 3.7 and can no longer be used as a method name.
        """
        results = grequests.map(
            (grequests.get(u) for u in self.urls),
            exception_handler=self.exception,
            size=5,
        )
        print(results)


test = Test()
test.run_async()

Source : https://stackoverflow.com/questions/38280094/python-requests-with-multithreading

Alternatives multithread à map

Exemple 1

Voir :

import concurrent.futures
 
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # executor.map() returns a lazy iterator: an exception raised inside
    # get_dataset only surfaces when its result is retrieved, so consume the
    # iterator instead of discarding it.
    results = list(executor.map(get_dataset, URLS))

ou encore

from multiprocessing import Pool
 
# The context manager shuts the 12 worker processes down once the work is
# done; map() blocks until every archive has been processed.
with Pool(12) as p:
    results = p.map(process_archive, zip_files)

Voir : https://softhints.com/parallel-processing-zip-archive-csv-files-python-and-pandas/

Exemple 2

from multiprocessing import Pool
 
# The context manager releases the worker processes when the block ends.
with Pool() as pool:
    # Submit both jobs before collecting anything so they can run in parallel.
    result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
    result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
    # get() waits at most 10 seconds for each result.
    answer1 = result1.get(timeout=10)
    answer2 = result2.get(timeout=10)
    # map() applies solve1 to every element of args and blocks until all are done.
    args = [A, B]
    results = pool.map(solve1, args)

Source : https://stackoverflow.com/questions/20548628/how-to-do-parallel-programming-in-python

Exemple 3

You can apply the function to every element in a list using the map() function:

# Sequential baseline: map() applies square to each element, list() collects the results.
list(map(square, [1, 2, 3, 4, 5, 6]))

The multiprocessing.pool.Pool class provides an equivalent but parallelized (via multiprocessing) way of doing this. The pool class, by default, creates one new process per CPU and does parallel calculations on the list:

from multiprocessing import Pool
 
# Parallel counterpart of list(map(square, ...)): the same call spread over a process pool.
with Pool() as pool:

    # NOTE(review): the returned list is discarded here; assign it if the values are needed.
    pool.map(square, [1, 2, 3, 4, 5, 6])

Source : https://aaltoscicomp.github.io/python-for-scicomp/parallel/


Notes async asyncio await

Voir :

Async Queues :

À savoir :

Sleep

An important use for sleep in asyncio programs is to suspend the current task and allow other coroutines to execute.

It is important because although a task or coroutine can easily schedule new tasks via the create_task() or gather() function, the scheduled tasks will not begin executing until the current task is suspended.

Even sleeping for zero seconds is enough to suspend the current task and give an opportunity to other tasks to run.

For example:

# Suspend the current task for zero seconds so other scheduled tasks get a chance to run
await asyncio.sleep(0)

Finally, a good use for sleep is to simulate blocking tasks in a concurrent program.

Source : https://superfastpython.com/asyncio-sleep/

Autres

Ne pas quitter avant la fin du traitement des tâches

async def main(task_count=10, delay=10):
    """Spawn background tasks and return only once all of them have finished.

    Args:
        task_count: Number of sleeping tasks to create.
        delay: Seconds each created task sleeps.
    """
    # Create some fire-and-forget tasks.
    for _ in range(task_count):
        asyncio.create_task(asyncio.sleep(delay))
    # Collect every task known to the loop except the one running main(),
    # which would otherwise end up waiting on itself.
    others = asyncio.all_tasks() - {asyncio.current_task()}
    await asyncio.gather(*others)

Source : https://stackoverflow.com/questions/27796294/when-using-asyncio-how-do-you-allow-all-running-tasks-to-finish-before-shutting

asyncio & threading

import asyncio
import threading
 
async def something_async():
    """Print the current thread, sleep for one second, then print the thread again."""
    starting_thread = threading.current_thread()
    print('something_async start in thread:', starting_thread)
    await asyncio.sleep(1)
    finishing_thread = threading.current_thread()
    print('something_async done in thread:', finishing_thread)
 
def main():
    """Run something_async() on two threads, each through its own asyncio.run() call."""
    # Both coroutine objects are created here, before either thread starts.
    workers = [
        threading.Thread(target=asyncio.run, args=(something_async(), ))
        for _ in range(2)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
 
# Run the demo only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()

Source : https://stackoverflow.com/questions/57234827/wait-for-async-function-to-complete

Autre

import time
from random import randint
 
period = 1  # Second
 
def get_epoch_ms():
    """Return the current Unix time as a whole number of milliseconds."""
    seconds = time.time()
    return int(seconds * 1000.0)
 
async def do_something(name):
    """Simulate a job of random length and print timestamped progress lines.

    Prints a Start line, picks a random whole number of seconds between 1 and
    5, prints a Sleep line with that value, sleeps, and finally prints a
    Finish line. Each line carries *name* and the epoch time in milliseconds.
    """
    def report(label, *extra):
        # Every progress line shares the same "label, name, timestamp" prefix.
        print(label, name, get_epoch_ms(), *extra)

    report("Start  :")
    try:
        # Stand-in for work that may take longer than one second.
        delay = randint(1, 5)
        report("Sleep  :", delay)
        await asyncio.sleep(delay)
    except Exception as err:
        print("Error  :", err)

    report("Finish :")
 
import asyncio  # used below and by do_something(), but missing from this snippet's imports

# asyncio.get_event_loop() is deprecated when no loop is running and fails on
# recent Python versions, so create the loop explicitly and register it.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    futures = [loop.create_task(do_something('T' + str(i)))
               for i in range(5)]

    # The tasks run concurrently; gather() waits for the whole batch at once.
    # (To abandon them instead, call f.cancel() on each task.)
    loop.run_until_complete(asyncio.gather(*futures))
finally:
    # Release the loop's resources whether or not the tasks succeeded.
    loop.close()

Source : https://stackoverflow.com/questions/56318648/how-to-run-an-asyncio-task-without-awaiting

Thread safe

from collections import deque
 
# Shared container. The "Thread 1"/"Thread 2" labels mark which thread would
# perform each call; in this snippet both calls simply run one after the other.
thread_safe_deque = deque()
 
# Thread 1
# Producer side: add an item at the right end.
thread_safe_deque.append(1)
 
# Thread 2
# Consumer side: remove and return the rightmost item (the 1 appended above).
# pop() raises IndexError when the deque is empty.
element = thread_safe_deque.pop()

Source : https://www.cloudthat.com/resources/blog/writing-thread-safe-programs-in-python


Source : nsclient NSCP-0.8.0-x64/scripts/python

badapp.py

#! /usr/bin/env python3
 
import threading
 
class BadThread(threading.Thread):
    """Thread that spins forever, printing its id once every 100001 loop passes."""

    # Class-level default; each instance overrides it in __init__.
    id = -1

    def __init__(self, id):
        """Initialise the underlying Thread and remember the given *id*."""
        threading.Thread.__init__(self)
        self.id = id

    def run(self):
        """Count in an endless loop, printing a progress line whenever the counter passes 100000."""
        counter = 0
        while True:
            counter += 1
            if counter <= 100000:
                continue
            print('Processing: %d' % self.id)
            counter = 0
 
# Launch 1000 BadThread instances, numbered 0 through 999.
for x in range(1000):
    worker = BadThread(x)
    worker.start()