有一個(gè)循環(huán)里面套循環(huán)的模式,
在內(nèi)循環(huán)的循環(huán)體內(nèi)要同時(shí)用到大循環(huán)和小循環(huán)的變量。
我這里是簡(jiǎn)化成了一個(gè)簡(jiǎn)單的模型,
這種模式如果函數(shù)復(fù)雜的話速度超級(jí)慢,
想問(wèn)一下如何使用多進(jìn)程的辦法來(lái)解決速度問(wèn)題?
我的思路是,只對(duì)小循環(huán)采用多進(jìn)程,
在大循環(huán)的循環(huán)體內(nèi)寫(xiě)多進(jìn)程的代碼,
但是一直失敗,
求大神給出正確的代碼。
拜謝!
import random as r
list1=list(range(100))
i=0
reslist=[]
while i<2000:#大循環(huán)
alist=[]#三個(gè)列表變量,每次循環(huán)開(kāi)始時(shí)清空
blist=[]
clist=[]
for each in list1:#小循環(huán)
x=r.randint(i+30,i+60)+each#涉及到大、小循環(huán)變量的幾個(gè)函數(shù),這里用random示意
y=r.randint(i+60,i+120)+each
z=r.randint(i+60,i+180)+each
res=2.5*x-y-z
reslist.append(res)#對(duì)函數(shù)結(jié)果進(jìn)行操作
if res>=50:
alist.append(each)
if -50<res<50:
blist.append(each)
if res<=-50:
clist.append(each)
for each in alist:#在大循環(huán)中對(duì)小循環(huán)中得出的結(jié)果進(jìn)行進(jìn)一步其他操作
print(each)
for each in blist:
print(each)
for each in clist:
print(each)
i+=1
首先,并行計(jì)算需要各個(gè)并行運(yùn)算的子程序間沒(méi)有相互因果關(guān)系。
小循環(huán)內(nèi),res與x,y,z,與alist,blist,clist,都是因果關(guān)系密切的,很難拆分并行計(jì)算。
題主貼上來(lái)的雖然不是原始代碼,不知道原始代碼里大循環(huán)間有沒(méi)有因果關(guān)系,不過(guò)從示意代碼來(lái)看,
把大循環(huán)拆分為N個(gè)線程(用不到進(jìn)程吧)應(yīng)該是可以的,每個(gè)線程計(jì)算2000/N次。
例如,分為8個(gè)線程,線程1計(jì)算i=0到249,線程2計(jì)算i=250到499,依次類推。。。
這里N的大小,可以根據(jù)CPU的核數(shù)來(lái)定,如果N超過(guò)CPU的核數(shù),就沒(méi)有太大意義了,反而有可能會(huì)降低效率。
可以在大循環(huán)這里開(kāi)多進(jìn)程,比如大循環(huán)2000次,如CPU的核數(shù)是4,則開(kāi)4個(gè)進(jìn)程,每個(gè)進(jìn)程負(fù)責(zé)運(yùn)行500個(gè)
小循環(huán)結(jié)束后,可以開(kāi)子線程去執(zhí)行下面的這些后續(xù)操作,大循環(huán)繼續(xù)往前處理
for each in alist:#在大循環(huán)中對(duì)小循環(huán)中得出的結(jié)果進(jìn)行進(jìn)一步其他操作
print(each)
for each in blist:
print(each)
for each in clist:
print(each)
可以將小循環(huán)用子進(jìn)程去處理 不過(guò)這樣 你需要兩個(gè)大循環(huán)。一個(gè)循環(huán)處理小循環(huán) ,等處理完這個(gè)循環(huán)在來(lái)個(gè)大循環(huán)處理后面的事情
像這樣
import random as r
def cumput(i, list1):
alist = []
blist = []
clist = []
reslist = []
for each in list1: # 小循環(huán)
x = r.randint(i + 30, i + 60) + each # 涉及到大、小循環(huán)變量的幾個(gè)函數(shù),這里用random示意
y = r.randint(i + 60, i + 120) + each
z = r.randint(i + 60, i + 180) + each
res = 2.5 * x - y - z
reslist.append(res) # 對(duì)函數(shù)結(jié)果進(jìn)行操作
if res >= 50:
alist.append(each)
if -50 < res < 50:
blist.append(each)
if res <= -50:
clist.append(each)
return alist, blist, clist, reslist
if __name__ == '__main__':
multiprocessing.freeze_support()
list1 = list(range(100))
i = 0
pool = multiprocessing.Pool(2)
res = {}
while i < 2000: # 大循環(huán)
res[i]=pool.apply_async(cumput, (i, list1,))
i += 1
pool.close()
pool.join()
for i in res:
for each in res[i].get()[0]: # 在大循環(huán)中對(duì)小循環(huán)中得出的結(jié)果進(jìn)行進(jìn)一步其他操作
print(each)
for each in res[i].get()[1]:
print(each)
for each in res[i].get()[2]:
print(each)
如果小循環(huán)中執(zhí)行的函數(shù)比較耗時(shí)的話可以考慮生產(chǎn)者-消費(fèi)者模型
import random
from threading import Thread
from Queue import Queue
resqueue = Queue()
aqueue = Queue()
bqueue = Queue()
cqueue = Queue()
def producer():
list1=list(range(100))
for _ in range(2000):
for each in list1:
x=r.randint(i+30,i+60)+each
y=r.randint(i+60,i+120)+each
z=r.randint(i+60,i+180)+each
res=2.5*x-y-z
resqueue.put(res)
if res>=50:
aqueue.put(each)
if -50<res<50:
bqueue.put(each)
if res<=-50:
cqueue.put(each)
def consumer_a():
while True:
try:
data = aqueue.get(timeout=5)
except Queue.Empty:
return
else:
# 耗時(shí)操作
deal_data(data)
aqueue.task_done()
def consumer_b():
while True:
try:
data = bqueue.get(timeout=5)
except Queue.Empty:
return
else:
# 耗時(shí)操作
deal_data(data)
bqueue.task_done()
def consumer_c():
while True:
try:
data = cqueue.get(timeout=5)
except Queue.Empty:
return
else:
# 耗時(shí)操作
deal_data(data)
cqueue.task_done()
def consumer_res():
while True:
try:
data = resqueue.get(timeout=5)
except Queue.Empty:
return
else:
# 耗時(shí)操作
deal_data(data)
resqueue.task_done()
if __name__ == "__main__":
t1 = Thread(target=producer)
t2 = Thread(target=consumer_a)
...
t1.start()
t2.start()
題主是不是應(yīng)該先設(shè)計(jì)好進(jìn)程的輸入與輸出,多進(jìn)程做并行計(jì)算的話進(jìn)程之間的通信是最重要的,據(jù)我了解的應(yīng)該是MPI,比如多層循環(huán),應(yīng)該是先分發(fā)部分?jǐn)?shù)據(jù)到每個(gè)進(jìn)程,每個(gè)進(jìn)程做計(jì)算后再返回?cái)?shù)據(jù)整合點(diǎn),然后合并結(jié)果輸出。
還有一個(gè)比較重要的點(diǎn)是估算每個(gè)進(jìn)程的執(zhí)行時(shí)間,畢竟有進(jìn)程間的通信的話等待時(shí)間也會(huì)導(dǎo)致效率下降。
@一代鍵客 所說(shuō),你的嵌套不太符合并行計(jì)算的輸入規(guī)則,可以看看這個(gè)例子
http://blog.csdn.net/zouxy09/...
之前測(cè)試過(guò)文中的例子,沒(méi)啥問(wèn)題,你沿著這些做的話應(yīng)該是可以搞出來(lái)的