golang倒腾一款简配的具有请求排队功能的并发受限服务器

golang官方指南给了一些代码片段来,层层递进演示了信道的能力:

1>. 信号量
2>. 限流能力

var sem = make(chan int, MaxOutstanding)    func Serve(queue chan *Request) {     for req := range queue {         req:= req         sem <- 1            go func() {   // 只会开启MaxOutstanding个并发协程             process(req)             <-sem         }()     } } 

上面出现了两个信道:

sem 提供了限制服务端并发处理请求的信号量
queue 提供了一个客户端请求队列,起媒介/解耦的作用


进一步指南给出了信道的另一个用法:
3>. 解多路复用

多路复用是网络编程中一个耳熟能详的概念,nginx redis等高性能web、内存kv都用到了这个技术 。

这个解多路复用是怎么理解呢?

离散/独立/并发的客户端请求被服务端Serve收敛之后, Serve就起到了多路复用的概念,在Request定义resultChan信道,就给每个客户端请求提供了独立获取请求结果的能力,这便是一种解多路复用。


从实际效果看这就是常见的互联网web服务器:一款具备请求排队功能的并发限流服务器

golang倒腾一款简配的具有请求排队功能的并发受限服务器

官方指南并没有完整实现客户端和服务器端工程。

下面是我的工程化实现, 记录下实践中遇到的问题。

并发受限服务器

  • 信道queue接收客户端请求,解耦客户端和服务器,天然具备排队能力
  • 信号量信道sem提供了并发受限的能力
  • 服务器处理完,向解多路复用信道req.resultChan写入响应结果。
/* 实现一个有请求队列功能的并发请求受限服务器*/  package main  import ( 	"fmt" 	"sync" 	"time" )  var sem = make(chan int, Maxoutstanding)  var wg2 sync.WaitGroup  func server(queue chan *Request) { 	fmt.Printf("Server is already, listen req n")  	for req := range queue { 		req := req 		sem <- 1  		wg2.Add(1) 		go func() { 			defer wg2.Done() 			process(req) 			<-sem 		}() 	} }  func process(req *Request) { 	s := sum(req.args) 	req.resultChan <- s } func sum(a []int) (s int) { 	for i := 1; i <= a[0]; i++ { 		s += i 	} 	time.Sleep(time.Millisecond * 20) 	return s }  

time.Sleep模拟服务器处理请求单次耗时20ms, 输出数字的累加,
eg: input: 100;
output: 1+100/2*100 =5050

wg2 sync.WaitGroup是一个动态活跃的Goroutine计数器,注意用法和位置,wg2的作用是:等待所有请求处理完成。

并发客户端请求

for循环开启并发客户端请求,

  • 每个请求入驻一个独立的Goroutine,独立向信道queue投递请求和接收响应
package main  import ( 	"fmt" 	"sync" )  type Request struct { 	args       []int 	resultChan chan int }  var wg1 sync.WaitGroup  func clients() { 	fmt.Printf("start %d concurrency client requestn ", concurrencyClients) 	for i := 1; i <= concurrencyClients; i++ { 		r := &Request{ 			args:       []int{i}, 			resultChan: make(chan int), 		} 		wg1.Add(1) 		go ClientReq(r) 	} 	wg1.Wait()   }  func ClientReq(r *Request) { 	defer wg1.Done() 	queue <- r 	go func() { 		res := <-r.resultChan 		fmt.Printf("current args is %d, the result is %d n", r.args[0], res) 	}() } 

wg1 WaitGroup的目的是确保所有的客户端请求都已经发出,之后客户端任务结束,所以此处我们新开Goroutine处理响应结果(这里又有闭包的参与)。

工程化

工程化代码的先后顺序,决定了代码是否死锁。
server需要处于监听状态,故先启动。

本处clients在主协程整体上是同步发送,如果放在clients()的后面,clients内的wg1可能会有部分请求Goroutine阻塞在信道queue且没法唤醒 运行时会检测到报死锁。

package main  import ( 	"fmt" 	"time" )  var concurrencyClients = 1000 var queueLength = 100 var queue = make(chan *Request, queueLength) // 请求队列长度 var Maxoutstanding int = 10                  // 服务器并发受限10  func main() {  	go server(queue) 	var start = time.Now()  	clients() // 确保所有的请求都已经发出去  	wg2.Wait() // 确保服务器处理完所有的请求 	fmt.Printf("客户端并发%d请求,服务器请求队列长度%d,服务器限流%d,总共耗时%d ms n", concurrencyClients, queueLength, Maxoutstanding, time.Since(start).Milliseconds()) } 

上面出现了3个配置变量
1>. 客户端并发请求数量concurrencyClients=100
2>. 服务器排队队列长度queueLength, 会作用到信道queue=50
3>. 服务器并发受限阈值Maxoutstanding=10

start 1000 concurrency client request  Server is already, listen req  current args is 14, the result is 105  current args is 2, the result is 3  current args is 3, the result is 6  current args is 1, the result is 1  current args is 4, the result is 10  current args is 8, the result is 36  current args is 6, the result is 21  current args is 12, the result is 78  current args is 5, the result is 15  current args is 7, the result is 28  current args is 18, the result is 171  current args is 16, the result is 136  current args is 15, the result is 120  current args is 20, the result is 210  current args is 19, the result is 190  current args is 13, the result is 91  current args is 21, the result is 231  current args is 10, the result is 55  current args is 17, the result is 153  current args is 9, the result is 45  current args is 22, the result is 253  current args is 28, the result is 406  current args is 27, the result is 378  current args is 11, the result is 66  current args is 26, the result is 351  current args is 30, the result is 465  current args is 23, the result is 276  current args is 25, the result is 325  current args is 29, the result is 435  current args is 24, the result is 300  current args is 31, the result is 496  current args is 34, the result is 595  current args is 38, the result is 741  current args is 36, the result is 666  current args is 41, the result is 861  current args is 32, the result is 528  current args is 35, the result is 630  current args is 33, the result is 561  current args is 37, the result is 703  current args is 39, the result is 780  current args is 52, the result is 1378  current args is 46, the result is 1081  current args is 47, the result is 1128  current args is 49, the result is 1225  current args is 45, the result is 1035  current args is 43, the result is 946  current args is 48, the result is 1176  current args is 40, the result is 820  current args is 42, the result is 903  current args is 44, the result is 990  current args is 59, the result is 1770  current args is 55, the result is 1540  current args is 53, the result is 1431  current args is 57, the result is 1653  current args is 51, the result is 1326  current args is 54, the result is 1485  current args is 50, the result is 1275  current args is 56, the result is 1596  current args is 58, the result is 1711  current args is 60, the result is 1830  current args is 66, the result is 2211  current args is 63, the result is 2016  current args is 70, the result is 2485  current args is 62, the result is 1953  current args is 61, the result is 1891  current args is 65, the result is 2145  current args is 67, the result is 2278  current args is 64, the result is 2080  current args is 68, the result is 2346  current args is 69, the result is 2415  current args is 76, the result is 2926  current args is 77, the result is 3003  current args is 71, the result is 2556  current args is 80, the result is 3240  current args is 75, the result is 2850  current args is 74, the result is 2775  current args is 73, the result is 2701  current args is 72, the result is 2628  current args is 78, the result is 3081  current args is 81, the result is 3321  current args is 89, the result is 4005  current args is 83, the result is 3486  current args is 88, the result is 3916  current args is 82, the result is 3403  current args is 79, the result is 3160  current args is 86, the result is 3741  current args is 84, the result is 3570  current args is 90, the result is 4095  current args is 85, the result is 3655  ......  current args is 981, the result is 481671  current args is 978, the result is 478731  current args is 982, the result is 482653  current args is 970, the result is 470935  current args is 979, the result is 479710  current args is 980, the result is 480690  current args is 983, the result is 483636  current args is 989, the result is 489555  current args is 986, the result is 486591  current args is 987, the result is 487578  current args is 985, the result is 485605  current args is 977, the result is 477753  current args is 988, the result is 488566  current args is 992, the result is 492528  current args is 976, the result is 476776  current args is 984, the result is 484620  current args is 995, the result is 495510  current args is 999, the result is 499500  current args is 1000, the result is 500500  current args is 990, the result is 490545  客户端并发1000请求,服务器请求队列长度100,服务器限流10,总共耗时2099 ms   

读者可以随意调整3个参数的大小,来感受服务器调参的魅力。

并发客户端请求数concurrencyClients 服务器请求队列queueLength 服务器限流阈值 Maxoutstanding 耗时ms
1000 100 10 2067
1000 100 50 454
1000 100 100 210
1000 300 10 2082
1000 500 10 2071
3000 100 10 6259
5000 500 10 10516

完整代码传送门

That’s All,本文根据golang有关信道的指南, 实现了一个带有请求队列功能的首先服务器, 巩固了信道、WaitGroup的用法。

发表评论

评论已关闭。

相关文章