Channel 是 Go 的一種,它與 GoRoutine 一起為 Go 提供併發技術,在開發中被廣泛使用。 Go 鼓勵人們通過通道在 goroutine 之間傳遞對資料的引用(就像將資料的所有者從乙個 goroutine 傳遞到另乙個 goroutine),而 Effective Go 總結了這句話:
do not communicate by sharing memory; instead, share memory by communicating.Go 記憶體模型指出了通道作為併發控制的乙個特性
a send on a channel happens before the corresponding receive from that channel completes. (golang spec)除了goroutine之間共享資料的正常安全傳輸外,通道還可以發揮很多技巧(模式),本文列舉了一些通道的應用模式。
促成本文誕生的主要因素包括:
Eapache 的 Channels Library、Go 中的併發、Francesc Campoy 的 JustForfun 系列、關於合併 Channel 實現、我在 Scala Collections Handbook 中對 Scala Collections 的啟發,讓我們以這個模式為例。
我們知道,GO的標準庫sync
是的mutex
,但是可以用作鎖mutex
但它沒有實現trylock
方法。
我們是為了trylock
定義是當前的 goroutine 正在嘗試獲取乙個鎖,如果成功,則獲取該鎖,返回 true,否則返回 false。 我們可以使用這種方法來避免當前的 goroutine 在獲取鎖時被阻塞。
本來這是乙個常用的功能,在一些其他程式語言中實現,那麼為什麼沒有在 Go 中實現呢? 正如第 6123 期中詳細討論的那樣,在我看來,go 核心組的成員本身並不熱衷於這個功能,並認為同樣的方式可以通過渠道實現。 實際上,對於標準庫sync.mutex
新增此功能很容易,方法是通過hack
道路:mutex
實現trylock
特徵。
const mutexlocked = 1 “在上面的**中還有乙個額外的這主要是通過利用通道邊界情況下的阻塞功能來實現的。islocked
但是,方法並不常用,因為查詢和鎖定方法不是原子操作,並且此方法可能可用於除錯和日誌記錄。由於標準庫尚未準備就緒
mutex
讓我們看看如何使用頻道,而不是使用頻道。type mutex struct }func newmutex() mutex , 1)} mu.ch <-struct{}func (m *mutex) lock() func (m *mutex) unlock() default: panic("unlock of unlocked mutex") }func (m *mutex) trylock() bool return false}func (m *mutex) islocked() bool
您還可以將快取的大小從 1 更改為 n 以處理 n 個鎖(資源)。
有時候,當我們拿到一把鎖時,由於競爭,當這個鎖被另乙個 goroutine 擁有時,當前的 goroutine 沒有辦法立即拿到鎖,只能阻塞和等待。 標準庫不提供等待超時的功能,我們嘗試實現它。
type mutex struct }func newmutex() mutex , 1)} mu.ch <-struct{}func (m *mutex) lock() func (m *mutex) unlock() default: panic("unlock of unlocked mutex") }func (m *mutex) trylock(timeout time.duration) bool return false}func (m *mutex) islocked() bool你也可以使用它
context
轉換,不是使用超時,而是使用context
若要取消獲取鎖的超時,此作業留給讀取器來實現。
當您等待多個訊號時,如果接收到任何乙個訊號,則執行業務邏輯,忽略尚未接收到的其他訊號。
例如,如果我們向提供相同服務的 n 個節點傳送請求,只要任何乙個服務節點返回乙個結果,我們就可以執行以下業務邏輯,其他 n-1 個節點的請求都可以被取消或忽略。 當 n=2 時,就是這樣back request
模式。 這允許以增加延遲為代價來交換資源。
應該注意的是,當接收到任何乙個訊號時,所有其他訊號都將被忽略。如果您使用乙個通道,只要您從任何通道接收單個資料,就可以關閉所有通道(取決於您的實現,但輸出通道肯定會關閉)。
有三種方法可以做到這一點:goroutine、reflect 和 recursion。
func or(chans ..chan interface{})chan interface{} go func() case <-out: }c) }return out}
or
該函式可以處理 n 個通道,它為每個通道啟動乙個 goroutine,一旦任何 goroutine 從通道讀取資料,輸出通道就會關閉。
為避免同時關閉輸出通道的問題,關斷操作僅執行一次。
Go 的反射庫有專用資料(用於 select 語句reflect.selectcase
) 和函式 (reflect.select
)處理。
因此,我們可以使用反射來“隨機”接收來自一組可選通道的資料並關閉輸出通道。
這樣看起來更簡潔。
func or(channels ..chan interface{})chan interface{} ordone := make(chan interface{})go func() reflect.select(cases) }return ordone}遞迴方法一直都是開竅實現,接下來的方法就是分而治之的方法,逐步合併通道,最終返回乙個通道。
func or(channels ..chan interface{})chan interface{} ordone := make(chan interface{})go func() default: m := len(channels) / 2 select }return ordone}在後面的扇入(merge)模式下,我們仍然會使用相同的遞迴模式來合併多個輸入通道,這比goroutines更有效率,並根據JustForfun的測試結果進行反映。
這是我們經常使用的一種模式,使用訊號通道(done)來控制(取消)輸入通道的處理。
一旦從完成通道讀取訊號,或者完成通道關閉,輸入通道的處理就會被取消。
此模式提供了一種將完成通道和輸入通道合併為輸出通道的簡單方法。
func ordone(done <-chan struct{},c <-chan interface{})chan interface{} go func() select }return valstream}Fanin模式是將多個相同型別的輸入通道合併為乙個相同型別的輸出通道,即通道的合併。
每個通道都有乙個 goroutine。
func fanin(chans ..chan interface{})chan interface{} go func() wg.done() c) }wg.wait() close(out) }return out}利用反射庫對 select 語句的處理來合併輸入通道。
下面的實現其實還是有點問題,輸入通道讀取更均勻的時候效果更好,否則效能會降低。
func faninreflect(chans ..chan interface{})chan interface{} go func() for len(cases) >0 out <-v.interface() return out}雖然這種方法不直觀,但效能還是不錯的(遞迴電平不會高,在輸入通道不是很大的時候也不會成為瓶頸)。
func faninrec(chans ..chan interface{})chan interface{} close(c) return c case 1: return chans[0] case 2: return mergetwo(chans[0], chans[1]) default: m := len(chans) / 2 return mergetwo( faninrec(chans[:m]..faninrec(chans[m:].func mergetwo(a, b <-chan interface{})chan interface{} go func() c <-v case v, ok := <-b: if !ok c <-v } return c}扇出模式是將輸入通道扇出到多個通道中。
扇出行為至少可分為兩種型別:
從輸入通道讀取一條資料併發送到每個輸入通道,這種模式稱為 T 型,從輸入通道讀取一條資料,從輸出通道中選擇乙個通道傳送 本節只介紹第一種情況,下一節介紹第二種情況。
讀取值被傳送到每個輸出通道,非同步模式會導致大量的 goroutine。
func fanout(ch <-chan interface{},out chan interface{},async bool) for v := range ch ()else }在此模式下,一旦輸出通道被阻塞,可能會導致後續處理延遲。
func fanoutreflect(ch <-chan interface{},out chan interface{})cases := make(reflect.selectcase, len(out)) for i := range cases for v := range ch for _ = range cases }分配模式將從輸入通道讀取的值傳送到其中乙個輸出通道。
Roundrobin 選擇輸出通道的方式。
func fanout(ch <-chan interface{},out chan interface{})// roundrobin var i = 0 var n = len(out) for v := range ch }利用隨機選擇的發射。
func fanoutreflect(ch <-chan interface{},out chan interface{})cases := make(reflect.selectcase, len(out)) for i := range cases for v := range ch _= reflect.select(cases) }eapache 通道提供了一些將模式應用於通道的方法,例如上面的扇入和扇出模式。 因為圍棋本身的通道已經無法再延伸了
eapache/channels
該庫定義了自己的通道介面,並提供方便的通道轉換。
eapache/channels
提供了四種方法:
distribute:從輸入通道讀取值並將其傳送到其中乙個輸出通道。 當輸入通道關閉時,輸出通道被關閉:從輸入通道讀取值併發送到所有輸出通道。 當輸入通道關閉時,輸出通道在多路復用中關閉:輸入通道合併為乙個輸出通道,當所有輸入都關閉時輸出關閉 offpipe:還為上述四個功能提供了兩個通道的串weakxxx
,輸入關閉,輸出未關閉。
讓我們看乙個對應函式的示例。
func testdist() channels.distribute(a, outputs[0], outputs[1], outputs[2], outputs[3]) outputs[0], outputs[1], outputs[2], outputs[3]) go func() a.close() for i := 0; i < 6; i++ var j int select fmt.printf("channel#%d: %d", j, v) }
func testtee() channels.tee(a, outputs[0], outputs[1], outputs[2], outputs[3]) outputs[0], outputs[1], outputs[2], outputs[3]) go func() a.close() for i := 0; i < 20; i++ var j int select fmt.printf("channel#%d: %d", j, v) }
func testmulti() channels.multiplex(a, inputs[0], inputs[1], inputs[2], inputs[3]) inputs[0], inputs[1], inputs[2], inputs[3]) go func() for i := range inputs }for v := range a.out()
func testpipe() a.close() for v := range b.out()從通道行為的角度來看,它看起來很像資料流,因此我們可以實現類似 scala 集合的東西。
Scala 的集合類提供了廣泛的操作(方法),但其他程式語言或框架也提供了類似的方法,例如 Apache Spark、J**a Stream、Reactivex 等。
下面列舉了一些方法的一些實現,相信經過一些人的深入研究,相關的方法可以變成乙個好的類庫,但現在我們來看一些例子。
skip 函式是在頻道開始讀取之前跳過頻道中的一些資料。
skipnskipn 跳過前 n 個資料。
func skipn(done <-chan struct{},valuestream <-chan interface{},num int)func skipfn(done <-chan struct{},valuestream <-chan interface{},fn func(interface{})bool)func skipwhile(done <-chan struct{}, valuestream <-chan interface{},fn func(interface{})bool) takentaken 讀取前 n 個資料。Map 和 Reduce 是一組常見的操作。func taken(done <-chan struct{},valuestream <-chan interface{},num int)func takefn(done <-chan struct{},valuestream <-chan interface{},fn func(interface{})bool)func takewhile(done <-chan struct{}, valuestream <-chan interface{},fn func(interface{})bool) 如果輸入是乙個通道,並且通道中的資料仍然是同一型別的通道,那麼 flat 將返回乙個輸出通道, 輸出通道中的資料就是輸入通道中的資料。
它與扇入不同,在扇入中,輸入通道在呼叫時是固定的,並以陣列形式提供,而 flat 的輸入是可以在執行時新增到通道中的通道。
func ordone(done <-chan struct{},c <-chan interface{})chan interface{} go func() select }return valstream}func flat(done <-chan struct{},chanstream <-chan <-chan interface{})chan interface{} go func() select stream = maybestream case <-done: return } for val := range ordone(done, stream) return valstream}
地圖將乙個通道對映到另乙個通道,通道的型別可以不同。
func mapchan(in <-chan interface{},fn func(interface{})interface{})chan interface{} if in == nil go func() return out}因為
map
是 go 的關鍵字,因此我們不能將函式型別命名為map
,此處使用mapchan
鑑於。
例如,您可以處理公司中的員工工資渠道,並輸出扣除稅後的員工工資渠道。
func reduce(in <-chan interface{},fn func(r, v interface{})interface{})interface{} out := <-in for v := range in return out}您可以:
reduce
實現sum
max
min
和其他聚合操作。
本文列出了一些深入的通道應用模式,相信通過閱讀本文,可以更深入地了解go的通道型別,並在開發中靈活應用通道。 也歡迎您在評論中為頻道提出更多應用模式建議。
所有這些都可以在 GitHub 上找到:Smallnest Channels。