




func init() { 	go func() { 		ticker := time.NewTicker(time.Second) 		defer ticker.Stop() 		for tm := range ticker.C {  			t := uint64(tm.Unix()) 			atomic.StoreUint64(&currentTimestamp, t) 		} 	}() }  var currentTimestamp = uint64(time.Now().Unix())  // UnixTimestamp returns the current unix timestamp in seconds. // // It is faster than time.Now().Unix() func UnixTimestamp() uint64 { 	return atomic.LoadUint64(&currentTimestamp) } 





slice := []int{1,2,3,4,5,6,7,8,9,10} fmt.Println(unsafe.Sizeof(slice)) //24 
type Key struct { 	Part interface{} 	Offset uint64 }  func (k *Key) hashUint64() uint64 { 	buf := (*[unsafe.Sizeof(*k)]byte)(unsafe.Pointer(k)) 	return xxhash.Sum64(buf[:]) } 



str := "1231445" arr := []byte{1, 2, 3} arr = append(arr, str...) 



func int64ToByteSlice(a []int64) (b []byte) {    sh := (*reflect.SliceHeader)(unsafe.Pointer(&b))    sh.Data = uintptr(unsafe.Pointer(&a[0]))    sh.Len = len(a) * int(unsafe.Sizeof(a[0]))    sh.Cap = sh.Len    return } 



// WaitGroup wraps sync.WaitGroup and makes safe to call Add/Wait // from concurrent goroutines. // // An additional limitation is that call to Wait prohibits further calls to Add // until return. type WaitGroup struct { 	sync.WaitGroup 	mu sync.Mutex }  // Add registers n additional workers. Add may be called from concurrent goroutines. func (wg *WaitGroup) Add(n int) { 	wg.WaitGroup.Add(n) }  // Wait waits until all the goroutines call Done. // // Wait may be called from concurrent goroutines. // // Further calls to Add are blocked until return from Wait. func (wg *WaitGroup) Wait() { 	wg.WaitGroup.Wait() }  // WaitAndBlock waits until all the goroutines call Done and then prevents // from new goroutines calling Add. // // Further calls to Add are always blocked. This is useful for graceful shutdown // when other goroutines calling Add must be stopped. // // wg cannot be used after this call. func (wg *WaitGroup) WaitAndBlock() { 	wg.WaitGroup.Wait()  	// Do not unlock, so other goroutines calling Add are blocked. }  // There is no need in wrapping WaitGroup.Done, since it is already goroutine-safe. 



// Get returns a timer for the given duration d from the pool. // // Return back the timer to the pool with Put. func Get(d time.Duration) *time.Timer { 	if v := timerPool.Get(); v != nil { 		t := v.(*time.Timer) 		if t.Reset(d) { 			logger.Panicf("BUG: active timer trapped to the pool!") 		} 		return t 	} 	return time.NewTimer(d) }  // Put returns t to the pool. // // t cannot be accessed after returning to the pool. func Put(t *time.Timer) { 	if !t.Stop() { 		// Drain t.C if it wasn't obtained by the caller yet. 		select { 		case <-t.C: 		default: 		} 	} 	timerPool.Put(t) }  var timerPool sync.Pool 


victoriaMetrics的vminsert作为vmagentvmstorage之间的组件,接收vmagent的流量并将其转发到vmstorage。在vmstorage卡死、处理过慢或下线的情况下,有可能会导致无法转发流量,进而造成vminsert CPU和内存飙升,造成组件故障。为了防止这种情况,vminsert使用了限速器,当接收到的流量激增时,可以在牺牲一部分数据的情况下保证系统的稳定性。


Limit the number of conurrent f calls in order to prevent from excess memory usage and CPU thrashing

限速器使用了两个参数:maxConcurrentInsertsmaxQueueDuration,前者给出了突发情况下可以处理的最大请求数,后者给出了某个请求的最大超时时间。需要注意的是Do(f func() error)是异步执行的,而ch又是全局的,因此会异步等待其他请求释放资源(struct{})。

可以看到限速器使用了指标来指示当前的限速状态。同时使用cgroup.AvailableCPUs()*4 (即runtime.GOMAXPROCS(-1)*4)来设置默认的maxConcurrentInserts长度。


var ( 	maxConcurrentInserts = flag.Int("maxConcurrentInserts", cgroup.AvailableCPUs()*4, "The maximum number of concurrent inserts. Default value should work for most cases, "+ 		"since it minimizes the overhead for concurrent inserts. This option is tigthly coupled with -insert.maxQueueDuration") 	maxQueueDuration = flag.Duration("insert.maxQueueDuration", time.Minute, "The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts") )  // ch is the channel for limiting concurrent calls to Do. var ch chan struct{}  // Init initializes concurrencylimiter. // // Init must be called after flag.Parse call. func Init() { 	ch = make(chan struct{}, *maxConcurrentInserts) //初始化limiter,最大突发并行请求量为maxConcurrentInserts }  // Do calls f with the limited concurrency. func Do(f func() error) error { 	// Limit the number of conurrent f calls in order to prevent from excess 	// memory usage and CPU thrashing. 	select { 	case ch <- struct{}{}: //在channel中添加一个元素,表示开始处理一个请求 		err := f() //阻塞等大请求处理结束 		<-ch //请求处理完之后释放channel中的一个元素,释放出的空间可以用于处理下一个请求 		return err 	default: 	}      //如果当前达到处理上限maxConcurrentInserts,则需要等到其他Do(f func() error)释放资源。 	// All the workers are busy. 	// Sleep for up to *maxQueueDuration. 	concurrencyLimitReached.Inc() 	t := timerpool.Get(*maxQueueDuration) //获取一个timer,设置等待超时时间为 maxQueueDuration 	select { 	case ch <- struct{}{}: //在maxQueueDuration时间内等待其他请求释放资源,如果获取到资源,则回收timer,继续处理 		timerpool.Put(t) 		err := f() 		<- 		return err 	case <-t.C: //在maxQueueDuration时间内没有获取到资源,定时器超时后回收timer,丢弃请求并返回错误信息 		timerpool.Put(t) 		concurrencyLimitTimeout.Inc() 		return &httpserver.ErrorWithStatusCode{ 			Err: fmt.Errorf("cannot handle more than %d concurrent inserts during %s; possible solutions: "+ 				"increase `-insert.maxQueueDuration`, increase `-maxConcurrentInserts`, increase server capacity", *maxConcurrentInserts, *maxQueueDuration), 			StatusCode: http.StatusServiceUnavailable, 		} 	} }  var ( 	concurrencyLimitReached = metrics.NewCounter(`vm_concurrent_insert_limit_reached_total`) 	concurrencyLimitTimeout = metrics.NewCounter(`vm_concurrent_insert_limit_timeout_total`)  	_ = metrics.NewGauge(`vm_concurrent_insert_capacity`, func() float64 { 		return float64(cap(ch)) 	}) 	_ = metrics.NewGauge(`vm_concurrent_insert_current`, func() float64 { 		return float64(len(ch)) 	}) ) 



// PaceLimiter throttles WaitIfNeeded callers while the number of Inc calls is bigger than the number of Dec calls. // // It is expected that Inc is called before performing high-priority work, // while Dec is called when the work is done. // WaitIfNeeded must be called inside the work which must be throttled (i.e. lower-priority work). // It may be called in the loop before performing a part of low-priority work. type PaceLimiter struct { 	mu          sync.Mutex 	cond        *sync.Cond 	delaysTotal uint64 	n           int32 }  // New returns pace limiter that throttles WaitIfNeeded callers while the number of Inc calls is bigger than the number of Dec calls. func New() *PaceLimiter { 	var pl PaceLimiter 	pl.cond = sync.NewCond(& 	return &pl }  // Inc increments pl. func (pl *PaceLimiter) Inc() { 	atomic.AddInt32(&pl.n, 1) }  // Dec decrements pl. func (pl *PaceLimiter) Dec() { 	if atomic.AddInt32(&pl.n, -1) == 0 { 		// Wake up all the goroutines blocked in WaitIfNeeded, 		// since the number of Dec calls equals the number of Inc calls. 		pl.cond.Broadcast() 	} }  // WaitIfNeeded blocks while the number of Inc calls is bigger than the number of Dec calls. func (pl *PaceLimiter) WaitIfNeeded() { 	if atomic.LoadInt32(&pl.n) <= 0 { 		// Fast path - there is no need in lock. 		return 	} 	// Slow path - wait until Dec is called. 	for atomic.LoadInt32(&pl.n) > 0 { 		pl.delaysTotal++ 		pl.cond.Wait() 	} }  // DelaysTotal returns the number of delays inside WaitIfNeeded. func (pl *PaceLimiter) DelaysTotal() uint64 { 	n := pl.delaysTotal 	return n } 
