跳到主要內容

Python 併發總結,多線程,多進程,異步IO

1 測量函數運行時間
import time 
def profile(func):
def wrapper(*args, **kwargs):
import time
start
= time.time()
func(
*args, **kwargs)
end
= time.time()
print 'COST: {}'.format(end - start)
return wrapper

@profile
def fib(n):
if n<= 2:
return 1
return fib(n-1) + fib(n-2)

fib(
35)

 

2 啟動多個線程,並等待完成   2.1 使用threading.enumerate()
import threading 
for i in range(2):
t
= threading.Thread(target=fib, args=(35,))
t.start()
main_thread
= threading.currentThread()

for t in threading.enumerate():
if t is main_thread:
continue
t.join()

 

2.2 先保存啟動的線程
threads = [] 
for i in range(5):
t
= Thread(target=foo, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
  3 使用信號量,限制同時能有幾個線程訪問臨界區
from threading import Semaphore 
import time

sema
= Semaphore(3)

def foo(tid):
with sema:
print('{} acquire sema'.format(tid))
wt
= random() * 2
time.sleep(wt)
print('{} release sema'.format(tid))

 

4 鎖,相當於信號量為1的情況
from threading import Thread Lock 
value
= 0
lock
= Lock()
def getlock():
global lock
with lock:
new
= value + 1
time.sleep(
0.001)
value
= new

 

  5 可重入鎖RLock     acquire() 可以不被阻塞的被同一個線程調用多次,release()需要和acquire()調用次數匹配才能釋放鎖 6 條件 Condition 一個線程發出信號,另一個線程等待信號 常用於生產者-消費者模型
import time 
import threading

def consumer(cond):
t
= threading.currentThread()
with cond:
cond.wait()
print("{}: Resource is available to sonsumer".format(t.name))

def producer(cond):
t
= threading.currentThread()
with cond:
print("{}: Making resource available".format(t.name))
cond.notifyAll()

condition
= threading.Condition()
c1
= threading.Thread(name='c1', target=consumer, args=(condition,))
c2
= threading.Thread(name='c2', target=consumer, args=(condition,))
p
= threading.Thread(name='p', target=producer, args=(condition,))

c1.start()
c2.start()
p.start()

 

  7 事件 Event 感覺和Condition 差不多
import time 
import threading
from random import randint

TIMEOUT
= 2

def consumer(event, l):
t
= threading.currentThread()
while 1:
event_is_set
= event.wait(TIMEOUT)
if event_is_set:
try:
integer
= l.pop()
print '{} popped from list by {}'.format(integer, t.name)
event.clear()
# 重置事件狀態
except IndexError: # 為了讓剛啟動時容錯
pass

def producer(event, l):
t
= threading.currentThread()
while 1:
integer
= randint(10, 100)
l.append(integer)
print '{} appended to list by {}'.format(integer, t.name)
event.set()
# 設置事件
time.sleep(1)

event
= threading.Event()
l
= []

threads
= []

for name in ('consumer1', 'consumer2'):
t
= threading.Thread(name=name, target=consumer, args=(event, l))
t.start()
threads.append(t)

p
= threading.Thread(name='producer1', target=producer, args=(event, l))
p.start()
threads.append(p)


for t in threads:
t.join()

 

  8 線程隊列  線程隊列有task_done() 和 join() 標準庫里的例子 往隊列內放結束標誌,注意do_work阻塞可能無法結束,需要用超時
import queue 
def worker():
while True:
item
= q.get()
if item is None:
break
do_work(item)
q.task_done()
q
= queue.Queue()
threads
= []
for i in range(num_worker_threads):
t
= threading.Thread(target=worker)
t.start()
threads.append(t)
for item in source():
q.put(item)
q.join()
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()

 

  9 優先級隊列 PriorityQueue
import threading 
from random import randint
from queue import PriorityQueue

q
= PriorityQueue()

def double(n):
return n * 2

def producer():
count
= 0
while 1:
if count > 5:
break
pri
= randint(0, 100)
print('put :{}'.format(pri))
q.put((pri, double, pri))
# (priority, func, args)
count += 1

def consumer():
while 1:
if q.empty():
break
pri, task, arg
= q.get()
print('[PRI:{}] {} * 2 = {}'.format(pri, arg, task(arg)))
q.task_done()
time.sleep(
0.1)

t
= threading.Thread(target=producer)
t.start()
time.sleep(
1)
t
= threading.Thread(target=consumer)
t.start()

 

  10 線程池 當線程執行相同的任務時用線程池 10.1 multiprocessing.pool 中的線程池
from multiprocessing.pool import ThreadPool 
pool
= ThreadPool(5)
pool.map(
lambda x: x**2, range(5))

 

10.2 multiprocessing.dummy
from multiprocessing.dummy import Pool

 

10.3 concurrent.futures.ThreadPoolExecutor
from concurrent.futures improt ThreadPoolExecutor 
from concurrent.futures import as_completed
import urllib.request

URLS
= ['http://www.baidu.com', 'http://www.hao123.com']

def load_url(url, timeout):
with urllib.request.urlopen(url, timeout
=timeout) as conn:
return conn.read()

with ThreadPoolExecutor(max_workers
=5) as executor:
future_to_url
= {executor.submit(load_url, url, 60): url for url in URLS}
for future in as_completed(future_to_url):
url
= future_to_url[future]
try:
data
= future.result()
execpt Exception as exc:
print("%r generated an exception: %s" % (url, exc))
else:
print("%r page is %d bytes" % (url, len(data)))

 

11 啟動多進程,等待多個進程結束
import multiprocessing 
jobs
= []
for i in range(2):
p
= multiprocessing.Process(target=fib, args=(12,))
p.start()
jobs.append(p)
for p in jobs:
p.join()

 

12 進程池 12.1 multiprocessing.Pool
from multiprocessing import Pool 
pool
= Pool(2)
pool.map(fib, [
36] * 2)

 

  12.2 concurrent.futures.ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor 
import math

PRIMES
= [ 112272535095293, 112582705942171]

def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n
= int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True

if __name__ == "__main__":
with ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print("%d is prime: %s" % (number, prime))

 

  13 asyncio   13.1 最基本的示例,單個任務
import asyncio 

async
def hello():
print("Hello world!")
await asyncio.sleep(
1)
print("Hello again")

loop
= asyncio.get_event_loop()
loop.run_until_complete(hello())
loop.close()

 

13.2 最基本的示例,多個任務
import asyncio 

async
def hello():
print("Hello world!")
await asyncio.sleep(
1)
print("Hello again")

loop
= asyncio.get_event_loop()
tasks
= [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

 

  13.3 結合httpx 執行多個任務並接收返回結果 httpx 接口和 requests基本一致
import asyncio 
import httpx


async
def get_url():
r
= await httpx.get("http://www.baidu.com")
return r.status_code


loop
= asyncio.get_event_loop()
tasks
= [get_url() for i in range(10)]
results
= loop.run_until_complete(asyncio.gather(*tasks))
loop.close()


for num, result in zip(range(10), results):
print(num, result)

 

   本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【精選推薦文章】



如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!



想要讓你的商品在網路上成為最夯、最多人討論的話題?



網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線



不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務



想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師"嚨底家"!!



Orignal From: Python 併發總結,多線程,多進程,異步IO

留言