Ruby多線程
傳統程序有一個單獨的線程執行,包含該程序的語句或指令順序執行直到程序終止。
一個多線程的程序有多個線程的執行。在每個線程是按順序執行的,但是在多核CPU機器上線程可能並行地執行。例如,通常情況下在單一CPU的機器,多個線程實際上不是並行執行的,而是模擬並行交叉的線程的執行。
Ruby的可以使用 Thread 類很容易地編寫多線程程序。 Ruby線程是一個輕量級的和高效的在代碼中實現並行性。
創建Ruby線程:
要啟動一個新線程,關聯一個塊通過調用Thread.new。將創建一個新的線程執行的代碼塊,原始線程將立即從Thread.new返回並繼續執行下一個語句:
# Thread #1 is running here Thread.new { # Thread #2 runs this code } # Thread #1 runs this code
例如:
這裡是一個例子說明,我們如何能夠利用多線程的Ruby的程序。
#!/usr/bin/ruby def func1 i=0 while i<=2 puts "func1 at: #{Time.now}" sleep(2) i=i+1 end end def func2 j=0 while j<=2 puts "func2 at: #{Time.now}" sleep(1) j=j+1 end end puts "Started At #{Time.now}" t1=Thread.new{func1()} t2=Thread.new{func2()} t1.join t2.join puts "End at #{Time.now}"
這將產生以下結果:
Started At Wed May 14 08:21:54 -0700 2008 func1 at: Wed May 14 08:21:54 -0700 2008 func2 at: Wed May 14 08:21:54 -0700 2008 func2 at: Wed May 14 08:21:55 -0700 2008 func1 at: Wed May 14 08:21:56 -0700 2008 func2 at: Wed May 14 08:21:56 -0700 2008 func1 at: Wed May 14 08:21:58 -0700 2008 End at Wed May 14 08:22:00 -0700 2008
線程的生命周期:
創建一個新的線程用 Thread.new。也可以使用了同義詞用 Thread.Start 和 Thread.fork。
冇有必要啟動一個線程在它被創建後,它會自動開始運行時,CPU 資源成為可用。
Thread 類定義了一些方法來查詢和處理的線程在運行時。運行一個線程塊中的代碼調用Thread.new,然後它停止運行。
該塊中的最後一個表達式的值是線程的值,可以通過調用 Thread對象值的方法。如果線程運行完成,則該值為線程的返回值。否則,該值方法會阻塞不會返回,直到該線程已完成。
可以等待一個特定的線程通過調用該線程的Thread.Join方法來完成。調用線程將被阻塞,直到給定線程完成。
線程和異常:
如果在主線程中引發一個異常,並冇有任何地方處理,Ruby解釋器打印一條消息並退出。在主線程以外的其他線程,未處理的異常導致線程停止運行。
如果線程 t 退出,因為未處理的異常,而另一個線程調用t.join或t.value,那麼所發生的異常在 t 中提出的線程 s。
如果 Thread.abort_on_exception 為 false,默認情況下,出現未處理的異常隻是殺死當前線程和所有其餘的繼續運行。
如果想在任何線程中的任何未處理的異常導致解釋退出中,設置類方法Thread.abort_on_exception 為 true。
t = Thread.new { ... } t.abort_on_exception = true
線程變量:
一個線程可以正常訪問是在範圍內的任何變量的線程被創建時。一個線程塊的局部變量是線程的局部,而不是共享。
Thread類提供一個特殊的功能,允許通過名稱來創建和存取線程局部變量。隻需把線程對象,如果它是一個Hash,寫入元素使用[] =和讀取他們帶回使用[]。
在這個例子中,每個線程記錄計數變量的當前值與該鍵mycount的一個threadlocal變量。
#!/usr/bin/ruby count = 0 arr = [] 10.times do |i| arr[i] = Thread.new { sleep(rand(0)/10.0) Thread.current["mycount"] = count count += 1 } end arr.each {|t| t.join; print t["mycount"], ", " } puts "count = #{count}"
這將產生下麵的結果:
8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10
主線程等待子線程完成,然後打印出每個捕獲count的值。
線程優先級:
影響線程調度的第一因素,是線程的優先級:高優先級線程之前計劃的低優先級的線程。更確切地說,一個線程將隻獲得CPU時間,如果冇有更高優先級的線程等待運行。
可以設置和查詢一個Ruby線程對象的優先級=和優先級的優先級。新創建的線程開始在相同的優先級的線程創建它。啟動主線程優先級為0。
冇有任何方法設置線程優先級在開始運行前。然而,一個線程可以提高或降低自己的優先級的第一次操作。
線程排斥:
如果兩個線程共享訪問相同的數據,至少有一個線程修改數據,你必須要特彆小心,以確保任何線程都不能看到數據處於不一致的狀態。這稱為線程排除。
Mutex類是一些共享資源的互斥訪問,實現了一個簡單的信號鎖定。即,隻有一個線程可持有的鎖在給定時間。其他線程可能選擇排隊等候的鎖變得可用,或者可以簡單地選擇立即得到錯誤,表示鎖定不可用。
通過將所有訪問共享數據的互斥體的控製下,我們確保一致性和原子操作。我們的嘗試例子,第一個無需mutax,第二個使用mutax:
無需Mutax的例子:
#!/usr/bin/ruby require 'thread' count1 = count2 = 0 difference = 0 counter = Thread.new do loop do count1 += 1 count2 += 1 end end spy = Thread.new do loop do difference += (count1 - count2).abs end end sleep 1 puts "count1 : #{count1}" puts "count2 : #{count2}" puts "difference : #{difference}"
這將產生以下結果:
count1 : 1583766 count2 : 1583766 difference : 637992
#!/usr/bin/ruby require 'thread' mutex = Mutex.new count1 = count2 = 0 difference = 0 counter = Thread.new do loop do mutex.synchronize do count1 += 1 count2 += 1 end end end spy = Thread.new do loop do mutex.synchronize do difference += (count1 - count2).abs end end end sleep 1 mutex.lock puts "count1 : #{count1}" puts "count2 : #{count2}" puts "difference : #{difference}"
這將產生以下結果:
count1 : 696591 count2 : 696591 difference : 0
處理死鎖:
當我們開始使用互斥對象的線程排除,我們必須小心地避免死鎖。死鎖的情況發生時,所有線程正在等待獲取另一個線程持有的資源。因為所有的線程被阻塞,他們不能釋放其所持有的鎖。因為他們可以不釋放鎖,其它線程不能獲得這些鎖。
一個條件變量僅僅是一個信號,與資源相關聯,並用於特定互斥鎖的保護範圍內的。當需要一個資源不可用,等待一個條件變量。這一行動釋放相應的互斥鎖。當一些其他線程發送信號的資源是可用的,原來的線程來等待,並同時恢複上的鎖臨界區。
例子:
#!/usr/bin/ruby require 'thread' mutex = Mutex.new cv = ConditionVariable.new a = Thread.new { mutex.synchronize { puts "A: I have critical section, but will wait for cv" cv.wait(mutex) puts "A: I have critical section again! I rule!" } } puts "(Later, back at the ranch...)" b = Thread.new { mutex.synchronize { puts "B: Now I am critical, but am done with cv" cv.signal puts "B: I am still critical, finishing up" } } a.join b.join
這將產生以下結果:
A: I have critical section, but will wait for cv (Later, back at the ranch...) B: Now I am critical, but am done with cv B: I am still critical, finishing up A: I have critical section again! I rule!
線程狀態:
有五種可能的返回值對應於下表中所示的5個可能的狀態。該的狀態方法返回的線程狀態。
線程狀態 | 返回值 |
---|---|
Runnable | run |
Sleeping | Sleeping |
Aborting | aborting |
Terminated normally | false |
Terminated with exception | nil |
Thread類的方法:
Thread類提供以下方法,它們適用程序的所有線程。這些方法它們使用Thread類的名稱來調用,如下所示:
Thread.abort_on_exception = true
這裡是所有類方法的完整列表:
SN | 方法描述 |
---|---|
1 |
Thread.abort_on_exception Returns the status of the global abort on exception condition. The default is false. When set to true, will cause all threads to abort (the process will exit(0)) if an exception is raised in any thread. |
2 |
Thread.abort_on_exception= When set to true, all threads will abort if an exception is raised. Returns the new state. |
3 |
Thread.critical Returns the status of the global thread critical condition. |
4 |
Thread.critical= Sets the status of the global thread critical condition and returns it. When set to true, prohibits scheduling of any existing thread. Does not block new threads from being created and run. Certain thread operations (such as stopping or killing a thread, sleeping in the current thread, and raising an exception) may cause a thread to be scheduled even when in a critical section. |
5 |
Thread.current Returns the currently executing thread. |
6 |
Thread.exit Terminates the currently running thread and schedules another thread to be run. If this thread is already marked to be killed, exit returns the Thread. If this is the main thread, or the last thread, exit the process. |
7 |
Thread.fork { block } Synonym for Thread.new . |
8 |
Thread.kill( aThread ) Causes the given aThread to exit |
9 |
Thread.list Returns an array of Thread objects for all threads that are either runnable or stopped. Thread. |
10 |
Thread.main Returns the main thread for the process. |
11 |
Thread.new( [ arg ]* ) {| args | block } Creates a new thread to execute the instructions given in block, and begins running it. Any arguments passed to Thread.new are passed into the block. |
12 |
Thread.pass Invokes the thread scheduler to pass execution to another thread. |
13 |
Thread.start( [ args ]* ) {| args | block } Basically the same as Thread.new . However, if class Thread is subclassed, then calling start in that subclass will not invoke the subclass's initialize method. |
14 |
Thread.stop Stops execution of the current thread, putting it into a sleep state, and schedules execution of another thread. Resets the critical condition to false. |
線程實例方法:
這些方法是適用於一個線程的一個實例。這些方法將被調用,使用一個線程的一個實例如下:
#!/usr/bin/ruby thr = Thread.new do # Calling a class method new puts "In second thread" raise "Raise exception" end thr.join # Calling an instance method join
這裡是所有實例方法的完整列表:
SN | 方法及描述 |
---|---|
1 |
thr[ aSymbol ] Attribute Reference - Returns the value of a thread-local variable, using either a symbol or a aSymbol name. If the specified variable does not exist, returns nil. |
2 |
thr[ aSymbol ] = Attribute Assignment - Sets or creates the value of a thread-local variable, using either a symbol or a string. |
3 |
thr.abort_on_exception Returns the status of the abort on exception condition for thr. The default is false. |
4 |
thr.abort_on_exception= When set to true, causes all threads (including the main program) to abort if an exception is raised in thr. The process will effectively exit(0). |
5 |
thr.alive? Returns true if thr is running or sleeping. |
6 |
thr.exit Terminates thr and schedules another thread to be run. If this thread is already marked to be killed, exit returns the Thread. If this is the main thread, or the last thread, exits the process. |
7 |
thr.join The calling thread will suspend execution and run thr. Does not return until thr exits. Any threads not joined will be killed when the main program exits. |
8 |
thr.key? Returns true if the given string (or symbol) exists as a thread-local variable. |
9 |
thr.kill Synonym for Thread.exit . |
10 |
thr.priority Returns the priority of thr. Default is zero; higher-priority threads will run before lower priority threads. |
11 |
thr.priority= Sets the priority of thr to an Integer. Higher-priority threads will run before lower priority threads. |
12 |
thr.raise( anException ) Raises an exception from thr. The caller does not have to be thr. |
13 |
thr.run Wakes up thr, making it eligible for scheduling. If not in a critical section, then invokes the scheduler. |
14 |
thr.safe_level Returns the safe level in effect for thr. |
15 |
thr.status Returns the status of thr: sleep if thr is sleeping or waiting on I/O, run if thr is executing, false if thr terminated normally, and nil if thr terminated with an exception. |
16 |
thr.stop? Returns true if thr is dead or sleeping. |
17 |
thr.value Waits for thr to complete via Thread.join and returns its value. |
18 |
thr.wakeup Marks thr as eligible for scheduling, it may still remain blocked on I/O, however. |