1 測量函數運行時間
import time
def profile(func):
def wrapper(*args, **kwargs):
import time
start = time.time()
func(*args, **kwargs)
end = time.time()
print 'COST: {}'.format(end - start)
return wrapper
@profile
def fib(n):
if n<= 2:
return 1
return fib(n-1) + fib(n-2)
fib(35)
2 啟動多個線程,並等待完成 2.1 使用threading.enumerate()
import threading
for i in range(2):
t = threading.Thread(target=fib, args=(35,))
t.start()
main_thread = threading.currentThread()
for t in threading.enumerate():
if t is main_thread:
continue
t.join()
2.2 先保存啟動的線程
threads = []3 使用信號量,限制同時能有幾個線程訪問臨界區
for i in range(5):
t = Thread(target=foo, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
from threading import Semaphore
import time
sema = Semaphore(3)
def foo(tid):
with sema:
print('{} acquire sema'.format(tid))
wt = random() * 2
time.sleep(wt)
print('{} release sema'.format(tid))
4 鎖,相當於信號量為1的情況
from threading import Thread Lock
value = 0
lock = Lock()
def getlock():
global lock
with lock:
new = value + 1
time.sleep(0.001)
value = new
5 可重入鎖RLock acquire() 可以不被阻塞的被同一個線程調用多次,release()需要和acquire()調用次數匹配才能釋放鎖 6 條件 Condition 一個線程發出信號,另一個線程等待信號 常用於生產者-消費者模型
import time
import threading
def consumer(cond):
t = threading.currentThread()
with cond:
cond.wait()
print("{}: Resource is available to sonsumer".format(t.name))
def producer(cond):
t = threading.currentThread()
with cond:
print("{}: Making resource available".format(t.name))
cond.notifyAll()
condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
p = threading.Thread(name='p', target=producer, args=(condition,))
c1.start()
c2.start()
p.start()
7 事件 Event 感覺和Condition 差不多
import time
import threading
from random import randint
TIMEOUT = 2
def consumer(event, l):
t = threading.currentThread()
while 1:
event_is_set = event.wait(TIMEOUT)
if event_is_set:
try:
integer = l.pop()
print '{} popped from list by {}'.format(integer, t.name)
event.clear() # 重置事件狀態
except IndexError: # 為了讓剛啟動時容錯
pass
def producer(event, l):
t = threading.currentThread()
while 1:
integer = randint(10, 100)
l.append(integer)
print '{} appended to list by {}'.format(integer, t.name)
event.set() # 設置事件
time.sleep(1)
event = threading.Event()
l = []
threads = []
for name in ('consumer1', 'consumer2'):
t = threading.Thread(name=name, target=consumer, args=(event, l))
t.start()
threads.append(t)
p = threading.Thread(name='producer1', target=producer, args=(event, l))
p.start()
threads.append(p)
for t in threads:
t.join()
8 線程隊列 線程隊列有task_done() 和 join() 標準庫里的例子 往隊列內放結束標誌,注意do_work阻塞可能無法結束,需要用超時
import queue
def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for item in source():
q.put(item)
q.join()
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
9 優先級隊列 PriorityQueue
import threading
from random import randint
from queue import PriorityQueue
q = PriorityQueue()
def double(n):
return n * 2
def producer():
count = 0
while 1:
if count > 5:
break
pri = randint(0, 100)
print('put :{}'.format(pri))
q.put((pri, double, pri)) # (priority, func, args)
count += 1
def consumer():
while 1:
if q.empty():
break
pri, task, arg = q.get()
print('[PRI:{}] {} * 2 = {}'.format(pri, arg, task(arg)))
q.task_done()
time.sleep(0.1)
t = threading.Thread(target=producer)
t.start()
time.sleep(1)
t = threading.Thread(target=consumer)
t.start()
10 線程池 當線程執行相同的任務時用線程池 10.1 multiprocessing.pool 中的線程池
from multiprocessing.pool import ThreadPool
pool = ThreadPool(5)
pool.map(lambda x: x**2, range(5))
10.2 multiprocessing.dummy
from multiprocessing.dummy import Pool
10.3 concurrent.futures.ThreadPoolExecutor
from concurrent.futures improt ThreadPoolExecutor
from concurrent.futures import as_completed
import urllib.request
URLS = ['http://www.baidu.com', 'http://www.hao123.com']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
with ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
execpt Exception as exc:
print("%r generated an exception: %s" % (url, exc))
else:
print("%r page is %d bytes" % (url, len(data)))
11 啟動多進程,等待多個進程結束
import multiprocessing
jobs = []
for i in range(2):
p = multiprocessing.Process(target=fib, args=(12,))
p.start()
jobs.append(p)
for p in jobs:
p.join()
12 進程池 12.1 multiprocessing.Pool
from multiprocessing import Pool
pool = Pool(2)
pool.map(fib, [36] * 2)
12.2 concurrent.futures.ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import math
PRIMES = [ 112272535095293, 112582705942171]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
if __name__ == "__main__":
with ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print("%d is prime: %s" % (number, prime))
13 asyncio 13.1 最基本的示例,單個任務
import asyncio
async def hello():
print("Hello world!")
await asyncio.sleep(1)
print("Hello again")
loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
loop.close()
13.2 最基本的示例,多個任務
import asyncio
async def hello():
print("Hello world!")
await asyncio.sleep(1)
print("Hello again")
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
13.3 結合httpx 執行多個任務並接收返回結果 httpx 接口和 requests基本一致
import asyncio
import httpx
async def get_url():
r = await httpx.get("http://www.baidu.com")
return r.status_code
loop = asyncio.get_event_loop()
tasks = [get_url() for i in range(10)]
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
for num, result in zip(range(10), results):
print(num, result)
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【精選推薦文章】
如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!
想要讓你的商品在網路上成為最夯、最多人討論的話題?
網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線
不管是台北網頁設計公司、台中網頁設計公司,全省皆有專員為您服務
想知道最厲害的台北網頁設計公司推薦、台中網頁設計公司推薦專業設計師"嚨底家"!!
Orignal From: Python 併發總結,多線程,多進程,異步IO
留言
張貼留言