?
Dieses Dokument verwendet PHP-Handbuch für chinesische Websites Freigeben
Ruby給了你兩個基本的方法來組織你的程序,使它同時能運行自己的不同部分。你可以使用線程在程序內(nèi)部將任務(wù)分割,或者將任務(wù)分解為不同的程序,使用多進(jìn)程來運行。下面我們輪流看一下這兩種方法。
一般來說在Ruby中同時做兩件事情最簡單的是使用Ruby線程。線程在進(jìn)程中,由Ruby解釋器實現(xiàn)。這使得Ruby線程也能完全的可移至,因為它不需要依賴特定的操作系統(tǒng),但是這樣你也不能利用本地線程(native threads)的優(yōu)點了。你也許有過線程饑餓得經(jīng)驗(優(yōu)先級低的線程沒有機會運行)。也許你會遇到線程死鎖,整個進(jìn)程都被掛起?;蛘咭恍┚€程的某些操作占用了CPU的太多時間,以至于其它線程不得不等待。但是,不要被這些潛在的問題嚇倒,Ruby線程是使你程序并行運行的輕量而有效的方法。
創(chuàng)建一個新的線程十分簡單,下面的部分代碼并行的下載一些網(wǎng)頁,每次有請求調(diào)用,這段代碼都將產(chǎn)生一個獨立的線程處理HTTP傳輸。
require?'net/http' pages?=?%w(?www.rubycentral.com ????????????www.awl.com ????????????www.pragmaticprogrammer.com ???????????) threads?=?[] for?page?in?pages ??threads?<<?Thread.new(page)?{?|myPage| ????h?=?Net::HTTP.new(myPage,?80) ????puts?"Fetching:?#{myPage}" ????resp,?data?=?h.get('/',?nil?) ????puts?"Got?#{myPage}:??#{resp.message}" ??} end threads.each?{?|aThread|??aThread.join?} |
Fetching:?www.rubycentral.com Fetching:?www.awl.com Fetching:?www.pragmaticprogrammer.com Got?www.rubycentral.com:??OK Got?www.pragmaticprogrammer.com:??OK Got?www.awl.com:??OK |
讓我們更詳細(xì)的看看這段代碼,這里有一些新技巧在里面。
新線程用 Thread.new
創(chuàng)建,這個方法接收一個block作為線程中要運行的代碼,在我們的例子里面,這個block使用net/http庫從每個指定的站點抓取首頁,從我們打印出來的信息來看,這些抓取活動是同時進(jìn)行的。
當(dāng)我們創(chuàng)建線程的時候,我們將這個HTML頁面作為參數(shù)傳入,這個參數(shù)然后作為myPage參數(shù)傳給了block。為什么我們這么做而不是直接在block里面用page這個變量那?
一個線程共享了所有在它啟動之前已經(jīng)存在的所有全局變量,實例變量和局部變量。善意的人有時候會告訴你,共享有時候不一定是好事。在這個例子里面,3個線程將共享page變量,第一個線程啟動之后,page被設(shè)為http://www.rubycentral.com,在這個時候,創(chuàng)建線程的循環(huán)還沒有結(jié)束,第二次,page被設(shè)為http://www.awl.com,如果第一個線程還沒有使用page變量運行完畢,那么可能這個線程會使用page的新值。這個bug將很難被跟蹤發(fā)現(xiàn)。
但是,在線程塊中創(chuàng)建的局部變量的作用域只在創(chuàng)建它的線程里,而不能被其它線程共享。在我們的例子里面,變量myPage將在線程被創(chuàng)建時賦值,每個線程都有自己的myPage變量的拷貝。
另一個很微妙的地方是程序的最后一行,為什么我們調(diào)用所創(chuàng)每個建線程的join方法?
當(dāng)一個Ruby程序結(jié)束退出的時候,它會殺死所有的線程,而不管它們的狀態(tài)。但是,你可以通過線程的 Thread#join
方法,使得主程序在等待這個線程結(jié)束后再退出。調(diào)用它的線程將會被阻塞,直到給定的線程結(jié)束。通過調(diào)用3個線程的join方法,你可以確定這三個請求將會在主程序退出之前完成。
?除了join,還有其它幾個用于控制線程的方便的方法。首先,你可以用 Thread.current
來訪問當(dāng)前線程,你可以用 Thread.list
來取得所有線程列表,這個列表包括所有可運行或者已停止的線程。為了檢測一個線程的狀態(tài),可以用方法 Thread#status
和 Thread#alive?
。
另外,你可以使用 Thread#priority=
來設(shè)置線程的不同的優(yōu)先級別。高優(yōu)先級的將會先于低優(yōu)先級的線程執(zhí)行。
在上面我們已經(jīng)說過,一個線程可以訪問在它定義之前已經(jīng)存在的,在作用域范圍內(nèi)的變量,但是,在線程內(nèi)部定義的變量的作用域只在這個線程內(nèi)部有效。
但是,如果你想一個線程的變量能被其它線程訪問,包括主線程,該怎么辦呢?Ruby中的線程提供了一個功能,就是能夠按名字創(chuàng)建和訪問線程內(nèi)的局部變量。你可以簡單的把這個線程對象作為一個哈希,使用[ ]=方法寫這個對象的變量值,用 [ ]來讀它的值。在這個例子里,?每個線程記錄當(dāng)前變量count的值,把它存到線程的局部變量,名字(key)為mycount。(這里有競爭條件的出現(xiàn),但是我們還沒有談到同步問題,這里我們只是忽略它們。)
count?=?0 arr?=?[] 10.times?do?|i| ??arr[i]?=?Thread.new?{ ????sleep(rand(0)/10.0) ????Thread.current["mycount"]?=?count ????count?+=?1 ??} end arr.each?{|t|?t.join;?print?t["mycount"],?",?"?} puts?"count?=?#{count}" |
8,?0,?3,?7,?2,?1,?6,?5,?4,?9,?count?=?10 |
主線程等待每個子線程結(jié)束之后,打印出來每個線程得到的count的值。我們?nèi)藶榈淖屆總€線程在取得count值之前休眠隨機的時間,這只是為了增加點趣味而已。
如果一個線程拋出一個沒有被處理的異常,將會怎樣呢?這依賴于系統(tǒng)設(shè)置Thread.abort_on_exception?,這個設(shè)置在第384頁和387頁。
如果abort_on_exception
被設(shè)置為false,這也是默認(rèn)的缺省值,那么如果一個線程出現(xiàn)錯誤而沒有處理,則這個線程將會被殺死,其它沒有遇到異常的線程將繼續(xù)運行。在下面的例子里,編號為3的線程將產(chǎn)生一個異常,而不會輸出任何東西,但是,你仍然可以看到其它線程的輸出。
threads?=?[] 6.times?{?|i| ??threads?<<?Thread.new(i)?{ ????raise?"Boom!"?if?i?==?3 ????puts?i ??} } threads.each?{|t|?t.join?} |
01 2 45prog.rb:4:?Boom!?(RuntimeError) from?prog.rb:8:in?`join' from?prog.rb:8 from?prog.rb:8:in?`each' from?prog.rb:8 |
但是,如果將abort_on_exception
設(shè)為true,一個線程出現(xiàn)沒有捕獲(處理)的異常,則所有的線程將都被殺死,上面的例子,如果編號為3的線程出錯,所有后面的線程都被殺死,不會產(chǎn)生任何輸出。
Thread.abort_on_exception?=?true threads?=?[] 6.times?{?|i| ??threads?<<?Thread.new(i)?{ ????raise?"Boom!"?if?i?==?3 ????puts?i ??} } threads.each?{|t|?t.join?} |
01 2 prog.rb:5:?Boom!?(RuntimeError) from?prog.rb:7:in?`initialize' from?prog.rb:7:in?`new' from?prog.rb:7 from?prog.rb:3:in?`times' from?prog.rb:3 |
在一個設(shè)計良好的應(yīng)用程序中,你應(yīng)該讓線程只做自己改作的事情;在一個多線程環(huán)境中創(chuàng)建一個基于時間的系統(tǒng)一般來說不是一個好主意。
但是,有時候我們需要控制線程的運行。也許我們的自動點唱機有一個線程用來控制指示燈,我們希望在音樂停止播放的時候也停止指示燈。你也許在一個經(jīng)典的生產(chǎn)者-消費者關(guān)系中有兩個線程,一個消費者在生產(chǎn)者掛起的時候也必須掛起。
類Thread
提供了很多方法用來控制線程調(diào)度,調(diào)用 Thread.stop
能停止當(dāng)前線程,而
Thread#run
將使某個線程啟動運行,調(diào)用
Thread.pass
將告訴線程調(diào)度器去執(zhí)行另外一個線程。?
Thread#join
和
Thread#value
將使調(diào)用者掛起,直到這個線程結(jié)束。
我們可以用下面代碼來示范一下上面的特點。
t?=?Thread.new?{?sleep?.1;?Thread.pass;?Thread.stop;?} | ||
t.status |
? |
"sleep" |
t.run | ||
t.status |
? |
"run" |
t.run | ||
t.status |
? |
false |
但是,使用這些原始的方法來控制線程調(diào)度實現(xiàn)同步,不管怎么說,都可能會遇到競爭條件。如果你需要在線程中共享數(shù)據(jù),競爭條件將會一直存在并且給調(diào)試帶來麻煩。幸運的是,線程還有另一個工具:臨界區(qū)(critical section),使用它,我們能編寫一個安全的同步方案。
用來阻塞一個線程運行的低層的方法是使用全局的"線程臨界"(thread critical)條件。當(dāng)這個條件被設(shè)為true(用 Thread.critical=
方法)時,調(diào)度器將不會讓任何已經(jīng)存在地線程執(zhí)行,但是,它不會阻止新線程的建立和運行;一些特定的線程操作(比如停止或者殺死一個線程,在當(dāng)前線程中休眠,或者拋出一個異常)都會引起一個線程被調(diào)度,即使在臨界區(qū)之內(nèi)。
直接使用 Thread.critical=
雖然可行,但是它并不是很方便。幸運的是,Ruby自帶了很多其它選項,當(dāng)然,最好的兩個是thread庫模塊中的類Mutex和類ConditionVariable。關(guān)于它們的文檔從第457頁開始。
Mutex
是一個為對互斥地訪問某一共享對象而設(shè)計的一個簡單的信號量鎖。也就是說,在一個時候,只有一個線程能持有這個鎖。其它線程可以繼續(xù)等待直到這個鎖可用,或者立即返回一個錯誤不再繼續(xù)等待。
一個mutex經(jīng)常用于原子性的對一個共享對象進(jìn)行修改更新。假設(shè)我們需要更新一個事務(wù)中的兩個變量,比如下面的程序模擬增加兩個數(shù)的計數(shù)。這個更新假定是原子性的,外面的世界不可能看到這兩個數(shù)有不同的值。如果不使用互斥,則不能達(dá)到該目的。
count1?=?count2?=?0 | ||
difference?=?0 | ||
counter?=?Thread.new?do | ||
??loop?do | ||
????count1?+=?1 | ||
????count2?+=?1 | ||
??end | ||
end | ||
spy?=?Thread.new?do | ||
??loop?do | ||
????difference?+=?(count1?-?count2).abs | ||
??end | ||
end | ||
sleep?1 | ||
Thread.critical?=?1 | ||
count1 |
? |
184846 |
count2 |
? |
184846 |
difference |
? |
58126 |
這個例子顯示了在執(zhí)行的過程中count1和count2的值曾經(jīng)出現(xiàn)過不同,盡管最后還是一樣的。
幸運的是,我們可以用互斥來改善這個例子。require?'thread' mutex?=?Mutex.new count1?=?count2?=?0 difference?=?0 counter?=?Thread.new?do ??loop?do ????mutex.synchronize?do ??????count1?+=?1 ??????count2?+=?1 ????end ??end end spy?=?Thread.new?do ??loop?do ????mutex.synchronize?do ??????difference?+=?(count1?-?count2).abs ????end ??end end |
sleep?1 | ||
mutex.lock | ||
count1 |
? |
21192 |
count2 |
? |
21192 |
difference |
? |
0 |
通過把需要訪問共享數(shù)據(jù)的代碼放到muxtex的控制下,我們確保了數(shù)據(jù)的一致性。但不幸的是,你也從這些數(shù)字看到了,我們在經(jīng)受著性能上的損失。
有時候使用互斥(mutex )來保護(hù)對臨界數(shù)據(jù)的訪問并不能滿足要求,比如假設(shè)我們在一個臨界區(qū)內(nèi),但是你還需要等待一個特殊的資源,如果你的線程這時候為了等待這個資源而休眠,可能會導(dǎo)致其它線程不能釋放這個資源,因為它們都無法進(jìn)入這個臨界區(qū),原來的線程一直在鎖定著這個臨界區(qū)。你也許需要暫時的放棄對臨界區(qū)的控制,同時告訴其它線程你在等待某一資源。當(dāng)這個資源可用之后,你的線程同時需要重新得到對臨界區(qū)的控制權(quán)。
條件變量正是用在此處。一個條件變量是一個簡單的信號量,它關(guān)聯(lián)一個特定的資源,在臨界區(qū)的保護(hù)范圍內(nèi)使用。當(dāng)你需要一個資源而這個資源暫時不可用的時候,你等待一個條件變量,這個操作將放棄對這個條件變量所在互斥(臨界區(qū)?)的鎖定。當(dāng)其它線程發(fā)送信號告訴這個變量可用之后,原來的線程停止等待立即取得對臨界區(qū)的鎖定。
require?'thread' mutex?=?Mutex.new cv?=?ConditionVariable.new a?=?Thread.new?{ ??mutex.synchronize?{ ????puts?"A:?I?have?critical?section,?but?will?wait?for?cv" ????cv.wait(mutex) ????puts?"A:?I?have?critical?section?again!?I?rule!" ??} } puts?"(Later,?back?at?the?ranch...)" b?=?Thread.new?{ ??mutex.synchronize?{ ????puts?"B:?Now?I?am?critical,?but?am?done?with?cv" ????cv.signal ????puts?"B:?I?am?still?critical,?finishing?up" ??} } a.join b.join |
A:?I?have?critical?section,?but?will?wait?for?cv(Later,?back?at?the?ranch...) B:?Now?I?am?critical,?but?am?done?with?cv B:?I?am?still?critical,?finishing?up A:?I?have?critical?section?again!?I?rule! |
另一個同步機制的實現(xiàn),可以參考Ruby發(fā)布程序中l(wèi)ib文件夾下的文件 monitor.rb
和sync.rb。
有時候你可能需要把一個任務(wù)分成幾個進(jìn)程級別的子任務(wù),或者你需要運行一個另外的不使用Ruby寫的程序,沒關(guān)系,Ruby有好幾種方法能使你創(chuàng)建和管理其它獨立的進(jìn)程。
在Ruby中產(chǎn)生一個新的進(jìn)程有好幾種方法,最簡單的方法是運行一個命令并且等到它結(jié)束。你也許運行一些其它的獨立的命令,并且從主機得到返回的結(jié)果,Ruby提供了system方法和反引號方法。(反引號即"`")
system("tar?xzf?test.tgz") | ? | tar:?test.tgz:?Cannot?open:?No?such?file?or?directory\ntar: ?Error?is?not?recoverable:?exiting?now\ntar:?Child?returned?status?2\ntar: ?Error?exit?delayed?from?previous?errors\nfalse |
result?=?`date` | ||
result | ? | "Sun?Jun??9?00:08:50?CDT?2002\n" |
方法 Kernel::system
運行一個指定的命令,如果這個命令存在且正確的運行結(jié)束,這個方法返回true,否則返回false。如果這個命令運行失敗,你可以從全局變量$?得到這個命令的返回代碼。
但system也有一個問題,就是它所運行程序的輸出簡單的被指定到了你的程序的輸出,這可能不是你想要的。為了取得子進(jìn)程的標(biāo)準(zhǔn)輸出,你可以用反引號,比如上面例子的 `date`
。注意,你需要用 String#chomp
來去除返回結(jié)果最后的換行符。
這中方法對簡單的場合比較合適,我們只需要運行一個命令,然后取得它的返回結(jié)果。但是,很多時候我們都需要對進(jìn)程有更多的控制,比如我們需要和子進(jìn)程進(jìn)行會話,向它輸入數(shù)據(jù),并且從它取回數(shù)據(jù)。方法 IO.popen
正是具有這樣的作用。popen方法以一個子進(jìn)程來運行一個命令,并且把這個進(jìn)程的標(biāo)準(zhǔn)輸入和標(biāo)準(zhǔn)輸出綁定到Ruby的IO對象。向IO對象寫數(shù)據(jù),子進(jìn)程就可以從它的標(biāo)準(zhǔn)輸入讀取數(shù)據(jù),而子進(jìn)程輸出的數(shù)據(jù),也可以通過Ruby的IO對象讀出來。
pig?=?IO.popen("pig",?"w+") pig.puts?"ice?cream?after?they?go?to?bed" pig.close_write puts?pig.gets |
iceway?eamcray?afterway?eythay?ogay?otay?edbay |
這個例子看起來很簡單,打開這個管道(pipe),寫入一個短語,然后讀取返回結(jié)果。但是pig程序并不會立即將它寫的東西flush。假如上面例子中,如果pig.puts 后面緊跟pig.gets的話,程序?qū)⒈粧炱?,pig程序處理了我們的輸入,但是返回結(jié)果卻一直不會被寫到管道。我們必須在這兩行之間插入 pig.close_write
,這將給pig的標(biāo)準(zhǔn)輸入發(fā)送一個文件結(jié)束標(biāo)志(end-of-file),然后我們需要的結(jié)果就會返回。
popen方法還有另外一些注意事項。如果指定的命令是一個減號("-"),Ruby將強迫產(chǎn)生一個新的Ruby解釋器,它將和原來的解釋器一起運行。原來的解釋器進(jìn)程將得到一個IO對象作為返回結(jié)果,而子解釋器將得到nil。
pipe?=?IO.popen("-","w+") if?pipe ??pipe.puts?"Get?a?job!" ??$stderr.puts?"Child?says?'#{pipe.gets.chomp}'" else ??$stderr.puts?"Dad?says?'#{gets.chomp}'" ??puts?"OK" end |
Dad?says?'Get?a?job!' Child?says?'OK' |
除了popen方法,傳統(tǒng)的Unix調(diào)用 Kernel::fork
, IO.pipe
和 Kernel::exec
也可以在支持它們的系統(tǒng)上使用。許多IO方法和 Kernel::open
也能產(chǎn)生新的子進(jìn)程,使用方法是將文件名前面加上一個豎線``|
'' 。注意你不能用 File.new
來產(chǎn)生一個子進(jìn)程,這個方法只是用于文件。
有時候我們并不需要這樣處理:我們只想把產(chǎn)生的子進(jìn)程賦給一個變量,然后繼續(xù)處理自己的事務(wù)。一段時間以后,我們也許還需要一下這個進(jìn)程是否結(jié)束。比如,我們需要從主程序分離一個需要很長運行時間的外部排序:
exec("sort?testfile?>?output.txt")?if?fork?==?nil #?The?sort?is?now?running?in?a?child?process #?carry?on?processing?in?the?main?program #?then?wait?for?the?sort?to?finish Process.wait |
系統(tǒng)調(diào)用 Kernel::fork
在父進(jìn)程中返回fork產(chǎn)生的子進(jìn)程id,在子進(jìn)程中返回nil,所以,上面例子中子進(jìn)程將調(diào)用 Kernel::exec
來運行一個外部的排序。一段時間以后,我們使用 Process::wait
,這將等待排序完成,然后返回這個進(jìn)程的id。(pid)
如果你需要在子進(jìn)程退出后通知父進(jìn)程(而不是等待子進(jìn)程結(jié)束),可以用 Kernel::trap
來對返回的信號進(jìn)行處理。比如這里我們建立了一個用于捕獲SIGCLD
信號的trap,這個信號的意思是“子進(jìn)程結(jié)束(死亡)”
trap("CLD")?{ ??pid?=?Process.wait ??puts?"Child?pid?#{pid}:?terminated" ??exit } exec("sort?testfile?>?output.txt")?if?fork?==?nil #?do?other?stuff... |
Child?pid?31842:?terminated |
IO.popen
也能像
File.open
那樣接受一個block。通過傳遞一個參數(shù)給 popen
作為一個命令,比如 date
,然后,這個block將得到一個IO對象作為參數(shù)。
IO.popen?("date")?{?|f|?puts?"Date?is?#{f.gets}"?} |
Date?is?Sun?Jun??9?00:08:50?CDT?2002 |
這個IO對象將會在BLOCK結(jié)束之后自動關(guān)閉,就如同 File.open
一樣。
如果你給 Kernel::fork
提供一個block,那么這些block中的代碼將在Ruby的子進(jìn)程中運行,而父進(jìn)程在block結(jié)束后繼續(xù)運行。
fork?do ??puts?"In?child,?pid?=?#$$" ??exit?99 end pid?=?Process.wait puts?"Child?terminated,?pid?=?#{pid},?exit?code?=?#{$??>>?8}" |
In?child,?pid?=?31849 Child?terminated,?pid?=?31849,?exit?code?=?99 |
最后一個問題,為什么我們子進(jìn)程的返回代碼 $?
要右移8位?這是Posix系統(tǒng)的特點,退出代碼(exit code)的低8位是程序結(jié)束的原因,高8位才是真正的退出代碼。