Table of contents
4 post(s) for January 2026
| AWX on K8S Kind - file sharing for blobs - execution pods | 2026/01/26 10:15 | Jean-Baptiste |
| rsh rcp notes | 2026/01/21 18:08 | Jean-Baptiste |
| Git - duplicating a repository | 2026/01/19 10:22 | Jean-Baptiste |
| Simple Nagios config example | 2026/01/14 10:07 | Jean-Baptiste |
RAID notes
echo 1000000 > /proc/sys/dev/raid/speed_limit_max
echo 200000 > /proc/sys/dev/raid/speed_limit_min
# cat /proc/sys/dev/raid/speed_limit_max
200000
# cat /proc/sys/dev/raid/speed_limit_min
1000
cat /proc/mdstat | grep -B2 reco
cat /proc/mdstat | grep recovery
mdadm -D /dev/md1
Python syntax notes
List of all Python keywords
# help("keywords")
import keyword
keyword.kwlist
['False', 'None', 'True', 'and', 'as', 'assert', 'async', 'await', 'break', 'class', 'continue', 'def', 'del', 'elif', 'else', 'except', 'finally', 'for', 'from', 'global', 'if', 'import', 'in', 'is', 'lambda', 'nonlocal', 'not', 'or', 'pass', 'raise', 'return', 'try', 'while', 'with', 'yield']
keyword.softkwlist
['_', 'case', 'match']
Functions
Function annotations
def func(a: int, b: int, c: int) -> int:
    return a + b + c
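Annotations are not enforced at runtime; Python simply stores them on the function's __annotations__ attribute, which can be inspected:

```python
def func(a: int, b: int, c: int) -> int:
    return a + b + c

# Annotations are metadata only; nothing checks the argument types at call time
print(func.__annotations__)
print(func(1, 2, 3))  # 6
```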
List of built-in functions
Syntax
kw = dict(
    changed=changed,
    query=cursor.query,
    statusmessage=statusmessage,
    query_result=query_result,
    rowcount=rowcount if rowcount >= 0 else 0,
)
In [7]: rowcount = -1
In [8]: print(rowcount) if rowcount >= 0 else 0
Out[8]: 0
i: int = 10
s: str = 'string'
Lambda
# Single-quote a string
q = lambda s: f"'{s}'"
# Double-quote a string
qq = lambda s: f'"{s}"'
Cast
String to bool
from os import environ

tls_verify = environ.get("VERIFY_SSL", "")
# String to bool: .lower() must be called, not just referenced
tls_verify = tls_verify.lower() in ("true",)
or alternatively:
def str2bool(v):
    return v.lower() in ("yes", "true", "t", "1")
Decorator
Example
def decorator_print(_):
    def print_tmp(*args, **kv):
        print("Function print called")
        print(*args, **kv)
    return print_tmp

@decorator_print
def print2(*args, **kv):
    print(*args, **kv)

print2("Foo", "Bar", sep=";")
Overriding a function
# Keep a copy of the original function
print_bak = print

def decorator_print(_):
    def print_tmp(*args, **kv):
        print_bak("Function print called")
        print_bak(*args, **kv)
    return print_tmp

@decorator_print
def print(*args, **kv):
    print_bak(*args, **kv)

print("Foo", "Bar", sep=";")
Generators
Yield vs yield from
The yield statement
A generator is a function that returns an iterator. A generator function normally contains a yield statement, as shown below:
def generate_number():
    for x in range(10):
        yield x
Since generate_number contains a yield statement, it is a generator function. Let's print the values from the generator:
gen = generate_number()
for number in gen:
    print(number)
0 1 2 3 4 5 6 7 8 9
The “yield from” statement
The “yield from” statement allows us to yield values from another generator function or from any iterable. Let's look at some examples.
Consider the following generator function:
def gen_number():
    yield 1
    yield 2
    yield 3
We can print the generator values using a loop.
for num in gen_number():
    print(num)
1 2 3
Let’s consider another function (gen_cool_numbers) that uses the gen_number generator function.
def gen_cool_numbers():
    yield 1000
    for num in gen_number():
        yield num
    yield 2000

for x in gen_cool_numbers():
    print(x)
1000 1 2 3 2000
We can update the gen_cool_numbers function by replacing the for loop with yield from as shown below:
def gen_cool_numbers():
    yield 1000
    yield from gen_number()
    yield 2000

for x in gen_cool_numbers():
    print(x)
1000 1 2 3 2000
yield from can also be used with any iterable (list, tuple, string, etc.):
def gen_cool_numbers():
    yield 1000
    yield from [50, 60, 80]
    yield 2000

for x in gen_cool_numbers():
    print(x)
1000 50 60 80 2000
Source : https://nolowiz.com/difference-between-yield-and-yield-from-in-python/
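Beyond flattening a for loop, yield from also forwards values sent with send() and captures the inner generator's return value. A small sketch (hypothetical inner/outer names, not from the article above):

```python
def inner():
    # A value sent by the caller passes through the delegating generator
    x = yield "ready"
    return x * 2  # becomes the value of the yield from expression

def outer():
    result = yield from inner()
    yield result

g = outer()
print(next(g))     # ready
print(g.send(21))  # 42
```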
Python multithreading notes
See:
- asyncio / aiohttp / multiprocess / trio / grequests
Equivalent of defer (Go):
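Python has no defer keyword; one common approximation (an assumption here, not taken from the original note) is contextlib.ExitStack, whose registered callbacks run in LIFO order on block exit, like Go's defer:

```python
import contextlib

log = []

def process():
    with contextlib.ExitStack() as stack:
        # Callbacks run in reverse registration order when the block exits
        stack.callback(log.append, "cleanup 1")
        stack.callback(log.append, "cleanup 2")
        log.append("work")

process()
print(log)  # ['work', 'cleanup 2', 'cleanup 1']
```

A plain try/finally gives the same guarantee for a single cleanup step.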
Thread safe:
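As a minimal sketch of thread safety with the standard library (a hypothetical example, not from the original note), a threading.Lock protecting a shared counter:

```python
import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        # Without the lock, concurrent += on a shared int can lose updates
        with lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 400000
```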
Number of logical CPUs
import multiprocessing
multiprocessing.cpu_count()
Examples
from threading import Thread

import requests

THREAD_COUNT = 6

def callback():
    try:
        while True:
            r = requests.get('http://www.emrecetin.net')
            print(r)
    except KeyboardInterrupt:
        return

if __name__ == '__main__':
    threads = []
    for i in range(THREAD_COUNT):
        t = Thread(target=callback)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
Source : https://gist.github.com/emrectn/aea6d955b37bd15687d0112d236f8a3b
import threading

import requests

def make_request(url):
    response = requests.get(url)
    print(f"Response from {url}: {response.status_code}")

# List of URLs to make requests to
urls = [
    "https://www.example.com",
    "https://www.google.com",
    "https://www.wikipedia.org",
    "https://www.python.org",
]

# Create and start threads for each URL
threads = []
for url in urls:
    thread = threading.Thread(target=make_request, args=(url,))
    thread.start()
    threads.append(thread)

# Wait for all threads to finish
for thread in threads:
    thread.join()
Source : https://www.w3resource.com/python-exercises/threading/python-multi-threading-exercise-7.php
import grequests

class Test:
    def __init__(self):
        self.urls = [
            'http://www.example.com',
            'http://www.google.com',
            'http://www.yahoo.com',
            'http://www.stackoverflow.com/',
            'http://www.reddit.com/'
        ]

    def exception(self, request, exception):
        print("Problem: {}: {}".format(request.url, exception))

    # Renamed from "async": it has been a reserved keyword since Python 3.7
    def fetch(self):
        results = grequests.map((grequests.get(u) for u in self.urls),
                                exception_handler=self.exception,
                                size=5)
        print(results)

test = Test()
test.fetch()
Source : https://stackoverflow.com/questions/38280094/python-requests-with-multithreading
Multithreaded alternatives to map
Example 1
See:
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(get_dataset, URLS)
or alternatively:
from multiprocessing import Pool

p = Pool(12)
p.map(process_archive, zip_files)
See: https://softhints.com/parallel-processing-zip-archive-csv-files-python-and-pandas/
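The executor.map fragment above relies on get_dataset and URLS being defined elsewhere; a self-contained variant with hypothetical stand-ins:

```python
import concurrent.futures

URLS = ["https://a.example", "https://bb.example"]

def get_dataset(url):
    # Placeholder work; a real version would download and parse the URL
    return len(url)

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # map() preserves the input order in its results
    results = list(executor.map(get_dataset, URLS))

print(results)  # [17, 18]
```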
Example 2
from multiprocessing import Pool

pool = Pool()
result1 = pool.apply_async(solve1, [A])  # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])  # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)
args = [A, B]
results = pool.map(solve1, args)
Source : https://stackoverflow.com/questions/20548628/how-to-do-parallel-programming-in-python
Example 3
You can apply the function to every element in a list using the map() function:
def square(x):
    return x * x

list(map(square, [1, 2, 3, 4, 5, 6]))  # [1, 4, 9, 16, 25, 36]
The multiprocessing.pool.Pool class provides an equivalent but parallelized (via multiprocessing) way of doing this. The pool class, by default, creates one new process per CPU and does parallel calculations on the list:
from multiprocessing import Pool

with Pool() as pool:
    pool.map(square, [1, 2, 3, 4, 5, 6])
Source : https://aaltoscicomp.github.io/python-for-scicomp/parallel/
async / asyncio / await notes
See:
Async queues:
Things to know:
- You can only use await inside functions created with async def.
- A coroutine declared with async def can only be invoked through:
  - await, when the result must be waited for;
  - create_task() (asyncio.create_task() or asyncio.TaskGroup), to avoid waiting for the result;
  - asyncio.gather();
  - asyncio.run().
- "Coroutine" is just a fancy term for what a function defined with async def returns. Python knows it behaves like a regular function that starts at some point and finishes at another, except that it can also be paused, as long as its body contains an await.
- time.sleep() (like various other libraries/functions) is not compatible with asyncio; use asyncio.sleep() instead.
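The points above can be sketched in a few lines (hypothetical work coroutine; the delays are arbitrary):

```python
import asyncio

async def work(name, delay):
    # asyncio.sleep() suspends this coroutine without blocking the event loop
    await asyncio.sleep(delay)
    return name

async def main():
    # create_task() schedules the coroutines without waiting for their result
    t1 = asyncio.create_task(work("a", 0.1))
    t2 = asyncio.create_task(work("b", 0.05))
    # gather() awaits both; results come back in argument order
    return await asyncio.gather(t1, t2)

results = asyncio.run(main())
print(results)  # ['a', 'b']
```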
Sleep
An important use for sleep in asyncio programs is to suspend the current task and allow other coroutines to execute.
It is important because although a task or coroutine can easily schedule new tasks via the create_task() or gather() function, the scheduled tasks will not begin executing until the current task is suspended.
Even sleeping for zero seconds is enough to suspend the current task and give an opportunity to other tasks to run.
For example:
# allow other tasks to run for a moment
await asyncio.sleep(0)
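This scheduling behaviour can be observed directly; in the sketch below (hypothetical child task), the scheduled task only gets to run once the current coroutine suspends itself with sleep(0):

```python
import asyncio

order = []

async def child():
    order.append("child runs")

async def main():
    asyncio.create_task(child())
    order.append("before sleep(0)")
    # Suspend main so the event loop can start the scheduled task
    await asyncio.sleep(0)
    order.append("after sleep(0)")

asyncio.run(main())
print(order)  # ['before sleep(0)', 'child runs', 'after sleep(0)']
```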
Finally, a good use for sleep is to simulate blocking tasks in a concurrent program.
Other
Do not exit before all tasks have been processed
import asyncio

async def main():
    # Create some tasks.
    for _ in range(10):
        asyncio.create_task(asyncio.sleep(10))
    # Wait for all tasks to finish other than the current task, i.e. main().
    await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})
asyncio & threading
import asyncio
import threading

async def something_async():
    print('something_async start in thread:', threading.current_thread())
    await asyncio.sleep(1)
    print('something_async done in thread:', threading.current_thread())

def main():
    t1 = threading.Thread(target=asyncio.run, args=(something_async(), ))
    t2 = threading.Thread(target=asyncio.run, args=(something_async(), ))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

if __name__ == '__main__':
    main()
Source : https://stackoverflow.com/questions/57234827/wait-for-async-function-to-complete
Other
import asyncio
import time
from random import randint

period = 1  # Second

def get_epoch_ms():
    return int(time.time() * 1000.0)

async def do_something(name):
    print("Start :", name, get_epoch_ms())
    try:
        # Do something which may take more than 1 sec.
        slp = randint(1, 5)
        print("Sleep :", name, get_epoch_ms(), slp)
        await asyncio.sleep(slp)
    except Exception as e:
        print("Error :", e)
    print("Finish :", name, get_epoch_ms())

loop = asyncio.get_event_loop()
futures = [loop.create_task(do_something('T' + str(i))) for i in range(5)]
#loop.run_forever()
#for f in futures:
#    f.cancel()
for f in futures:
    loop.run_until_complete(f)
Source : https://stackoverflow.com/questions/56318648/how-to-run-an-asyncio-task-without-awaiting
Thread safe
from collections import deque

thread_safe_deque = deque()

# Thread 1
thread_safe_deque.append(1)

# Thread 2
element = thread_safe_deque.pop()
Source : https://www.cloudthat.com/resources/blog/writing-thread-safe-programs-in-python
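Besides deque, the standard queue.Queue is also thread-safe and adds blocking semantics; a small producer/consumer sketch (hypothetical sentinel-based shutdown, not from the source above):

```python
import queue
import threading

q = queue.Queue()
out = []

def producer():
    for i in range(5):
        q.put(i)
    q.put(None)  # sentinel telling the consumer to stop

def consumer():
    while True:
        item = q.get()  # blocks until an item is available
        if item is None:
            break
        out.append(item)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()
print(out)  # [0, 1, 2, 3, 4]
```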
Source : nsclient NSCP-0.8.0-x64/scripts/python
badapp.py
#! /usr/bin/env python3
import threading

class BadThread(threading.Thread):
    id = -1

    def __init__(self, id):
        self.id = id
        threading.Thread.__init__(self)

    def run(self):
        i = 0
        while True:
            i = i + 1
            if i > 100000:
                print('Processing: %d' % self.id)
                i = 0

for x in range(1000):
    BadThread(x).start()
Python / Git / Debian deb package notes
If a debian directory already exists
sudo apt-get install devscripts fakeroot
Example with http://edf-hpc.github.io/slurm-web/installation.html
debuild -us -uc
Example with https://github.com/gingergeeks/pyslurm
debuild -us -uc -b
 dpkg-buildpackage -rfakeroot -D -us -uc -b
dpkg-buildpackage: source package python-pyslurm
dpkg-buildpackage: source version 2.2.8-1
dpkg-buildpackage: source distribution unstable
dpkg-buildpackage: source changed by Ramon Bastiaans <ramon.bastiaans@surfsara.nl>
 dpkg-source --before-build pyslurm
dpkg-buildpackage: host architecture amd64
dpkg-checkbuilddeps: Unmet build dependencies: libslurm-dev python-dev
dpkg-buildpackage: warning: build dependencies/conflicts unsatisfied; aborting
dpkg-buildpackage: warning: (Use -d flag to override.)
debuild: fatal error at line 1376:
dpkg-buildpackage -rfakeroot -D -us -uc -b failed
sudo apt-get install -y libslurm-dev python-dev
debuild -us -uc -b
If there is no debian directory
pip install --user make-deb
export PATH=$PATH:~/.local/bin/
Or see equivs: http://wiki.sc3.uis.edu.co/index.php/Slurm-web_Installation
