type Context interface { Deadline() (deadline time.Time, ok bool) Done() <-chan struct{} Err() error Value(key interface{}) interface{} }
// Done is provided for use in select statements: // // Stream generates values with DoSomething and sends them to out // // until DoSomething returns an error or ctx.Done is closed. // func Stream(ctx context.Context, out chan<- Value) error { // for { // v, err := DoSomething(ctx) // if err != nil { // return err // } // select { // case <-ctx.Done(): // return ctx.Err() // case out <- v: // } // } // } // See https://blog.golang.org/pipelines for more examples of how to use // a Done channel for cancellation.
// // Package user defines a User type that's stored in Contexts. // package user // import "context" // // User is the type of value stored in the Contexts. // type User struct {...} // // // key is an unexported type for keys defined in this package. // // This prevents collisions with keys defined in other packages. // type key int // // userKey is the key for user.User values in Contexts. It is // // unexported; clients use user.NewContext and user.FromContext // // instead of using this key directly. // var userKey key // // NewContext returns a new Context that carries value u. // func NewContext(ctx context.Context, u *User) context.Context { // return context.WithValue(ctx, userKey, u) // } // // FromContext returns the User value stored in ctx, if any. // func FromContext(ctx context.Context) (*User, bool) { // u, ok := ctx.Value(userKey).(*User) // return u, ok // }
value, ok := x.(T) x 表示一个接口的类型,T 表示一个具体的类型(也可为接口类型) 该断言表达式会返回 x 的值(也就是 value)和一个布尔值(也就是 ok),可根据该布尔值判断 x 是否为 T 类型: 如果 T 是具体某个类型,类型断言会检查 x 的动态类型是否等于具体类型 T。如果检查成功,类型断言返回的结果是 x 的动态值,其类型是 T。 如果 T 是接口类型,类型断言会检查 x 的动态类型是否满足 T。如果检查成功,x 的动态值不会被提取,返回值是一个类型为 T 的接口值。 无论 T 是什么类型,如果 x 是 nil 接口值,类型断言都会失败。
// An emptyCtx is never canceled, has no values, and has no deadline. It is not // struct{}, since vars of this type must have distinct addresses. type emptyCtx int func (*emptyCtx) Deadline() (deadline time.Time, ok bool) { return } func (*emptyCtx) Done() <-chan struct{} { return nil } func (*emptyCtx) Err() error { return nil } func (*emptyCtx) Value(key any) any { return nil } func (e *emptyCtx) String() string { switch e { case background: return "context.Background" case todo: return "context.TODO" } return "unknown empty Context" } var ( background = new(emptyCtx) todo = new(emptyCtx) )
// Background returns a non-nil, empty Context. It is never canceled, has no // values, and has no deadline. It is typically used by the main function, // initialization, and tests, and as the top-level Context for incoming // requests. func Background() Context { return background } // TODO returns a non-nil, empty Context. Code should use context.TODO when // it's unclear which Context to use or it is not yet available (because the // surrounding function has not yet been extended to accept a Context // parameter). func TODO() Context { return todo }
func FixLeakingByContex() { //创建上下文用于管理子协程 ctx, cancel := context.WithCancel(context.Background()) //结束前清理未结束协程 defer cancel() ch := make(chan int) go CancelByContext(ctx, ch) go CancelByContext(ctx, ch) go CancelByContext(ctx, ch) // 随机触发某个子协程退出 ch <- 1 } func CancelByContext(ctx context.Context, ch chan (int)) int { select { case <-ctx.Done(): //fmt.Println("cancel by ctx.") return 0 case n := <-ch : return n } }
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) { if parent == nil { panic("cannot create context from nil parent") } // WithCancel通过一个父级Context来创建出一个cancelCtx c := newCancelCtx(parent) // 调用propagateCancel根据父级context的状态来关联cancelCtx的cancel行为 propagateCancel(parent, &c) // 返回c和一个方法,方法中调用c.cancel并传递Canceled变量 return &c, func() { c.cancel(true, Canceled) } } func newCancelCtx(parent Context) cancelCtx { return cancelCtx{Context: parent} } var Canceled = errors.New("context canceled")
func newCancelCtx(parent Context) cancelCtx { return cancelCtx{Context: parent} }
// A cancelCtx can be canceled. When canceled, it also cancels any children // that implement canceler. type cancelCtx struct { Context // 内嵌结构体 mu sync.Mutex // protects following fields done atomic.Value // of chan struct{}, created lazily, closed by first cancel call children map[canceler]struct{} // set to nil by the first cancel call err error // set to non-nil by the first cancel call }
// A canceler is a context type that can be canceled directly. The // implementations are *cancelCtx and *timerCtx. type canceler interface { cancel(removeFromParent bool, err error) Done() <-chan struct{} }
// propagateCancel arranges for child to be canceled when parent is. func propagateCancel(parent Context, child canceler) { // 如果父元素的Done方法返回为空,也就是说父context是emptyCtx // 直接返回,因为父上下文不会做任何处理 done := parent.Done() if done == nil { return // parent is never canceled } // 如果父上下文不是emptyCtx类型,使用select来判断一下父上下文的done channel是不是已经被关闭掉了 // 关闭则调用child的cancel方法 // select其实会阻塞,但这里给了一个default方法,所以如果父上下文的done channel没有被关闭则继续之心后续代码 // 这里相当于利用了select的阻塞性来做if-else判断 select { case <-done: // parent is already canceled child.cancel(false, parent.Err()) return default: } // parentCancelCtx目的在于寻找父上下文中最底层的cancelCtx,因为像timerCtx等会内嵌cancelCtx if p, ok := parentCancelCtx(parent); ok { // 如果找的到,就把最内层的cancelCtx跟child的设置好关联关系 // 这里要考虑到多线程环境,所以是加锁处理 p.mu.Lock() if p.err != nil { // 如果祖先cancelCtx已经被取消了,那么也调用child的cancel方法 // parent has already been canceled child.cancel(false, p.err) } else { // 这里设置内层cancelCtx与child的父子层级关系 if p.children == nil { p.children = make(map[canceler]struct{}) } p.children[child] = struct{}{} } p.mu.Unlock() } else { // 这里代表没有找到祖先cancelCtx,单启了一个协程来进行监听(因为select是阻塞的),如果父上下文的done 关闭了,则子上下文取消 // goroutines在别的地方代码中没有使用,不知道为什么要做增加操作,看源码英文解释也是为了测试使用 // 单独的协程会在阻塞完毕后被GC回收,不会有泄露风险 atomic.AddInt32(&goroutines, +1) go func() { select { case <-parent.Done(): child.cancel(false, parent.Err()) case <-child.Done(): } }() } }
// parentCancelCtx returns the underlying *cancelCtx for parent. // It does this by looking up parent.Value(&cancelCtxKey) to find // the innermost enclosing *cancelCtx and then checking whether // parent.Done() matches that *cancelCtx. (If not, the *cancelCtx // has been wrapped in a custom implementation providing a // different done channel, in which case we should not bypass it.) func parentCancelCtx(parent Context) (*cancelCtx, bool) { done := parent.Done() if done == closedchan || done == nil { return nil, false } p, ok := parent.Value(&cancelCtxKey).(*cancelCtx) if !ok { return nil, false } pdone, _ := p.done.Load().(chan struct{}) if pdone != done { return nil, false } return p, true }
} else { // 这里代表没有找到祖先cancelCtx,单启了一个协程来进行监听(因为select是阻塞的),如果父上下文的done 关闭了,则子上下文取消 // goroutines在别的地方代码中没有使用,不知道为什么要做增加操作,看源码英文解释也是为了测试使用 // 单独的协程会在阻塞完毕后被GC回收,不会有泄露风险 atomic.AddInt32(&goroutines, +1) go func() { select { case <-parent.Done(): child.cancel(false, parent.Err()) case <-child.Done(): } }() }
func (c *cancelCtx) Value(key any) any { if key == &cancelCtxKey { return c } return value(c.Context, key) }
func value(c Context, key any) any { for { switch ctx := c.(type) { case *valueCtx: if key == ctx.key { return ctx.val } c = ctx.Context case *cancelCtx: if key == &cancelCtxKey { return c } c = ctx.Context case *timerCtx: if key == &cancelCtxKey { return &ctx.cancelCtx } c = ctx.Context case *emptyCtx: return nil default: return c.Value(key) } } }
func (c *cancelCtx) Done() <-chan struct{} { // 返回atomic.Value中存储的值 d := c.done.Load() if d != nil { // atomic.Value类型的Load方法返回的是ifaceWords类型,所以这里是利用了类型断言 // 把ifaceWords类型转换为 struct类型的chan return d.(chan struct{}) } // 这里是并发场景要考虑的问题,因为会存在多个线程并发进行的过程,所以不一定哪个goroutine就对c.done进行了修改 // 所以这里不能直接像单线程一样,if d!=nil else。。。;首先得抢锁。 c.mu.Lock() defer c.mu.Unlock() d = c.done.Load() // 上面抢锁的过程可能抢到了,也可能没抢到,所以到这里是抢到了锁,但是c.done未必还是nil; // 所以这里要再次做判断 if d == nil { d = make(chan struct{}) c.done.Store(d) } return d.(chan struct{}) }
func (c *cancelCtx) Err() error { c.mu.Lock() err := c.err c.mu.Unlock() return err }
func (c *cancelCtx) cancel(removeFromParent bool, err error) { if err == nil { panic("context: internal error: missing cancel error") } // 因为后面要对c.err和c.done进行更新,所以这里要抢锁 c.mu.Lock() if c.err != nil { // if这部分放到锁的外部是否可以?看起来是可以的,但是如果放到外面,if判断不通过此时c.err为nil // 接着进行抢锁,那么在抢到锁之后仍然要对c.err判断是否还是nil,才能进行更新 // 因为在抢锁过程中,可能c.err已经被某个协程修改了 // 所以把这部分放到锁之后是合理的。 c.mu.Unlock() return // already canceled } c.err = err // 赋值 d, _ := c.done.Load().(chan struct{}) // 读取done的值 if d == nil { // 如果done为nil,就把一个内部的closedchan存入c.done中; // closedchan是一个channel类型,在context包的init函数中就会把它close掉 c.done.Store(closedchan) } else { close(d) } // 遍历c的children调用他们的cancel; for child := range c.children { // NOTE: acquiring the child's lock while holding parent's lock. child.cancel(false, err) } c.children = nil c.mu.Unlock() // 这部分没有在锁的代码中,是因为函数中会自己加锁? if removeFromParent { removeChild(c.Context, c) } }
// removeChild removes a context from its parent. func removeChild(parent Context, child canceler) { p, ok := parentCancelCtx(parent) if !ok { return } p.mu.Lock() if p.children != nil { delete(p.children, child) } p.mu.Unlock() }
type timerCtx struct { cancelCtx timer *time.Timer // Under cancelCtx.mu. deadline time.Time }
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) { return c.deadline, true } func (c *timerCtx) String() string { return contextName(c.cancelCtx.Context) + ".WithDeadline(" + c.deadline.String() + " [" + time.Until(c.deadline).String() + "])" } func (c *timerCtx) cancel(removeFromParent bool, err error) { c.cancelCtx.cancel(false, err) if removeFromParent { // Remove this timerCtx from its parent cancelCtx's children. removeChild(c.cancelCtx.Context, c) } c.mu.Lock() if c.timer != nil { c.timer.Stop() c.timer = nil } c.mu.Unlock() }
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) { if parent == nil { panic("cannot create context from nil parent") } // 如果parent的deadline小于当前时间,直接创建cancelCtx,里面会调用propagateCancel方法 // 来根据父上下文状态进行处理 if cur, ok := parent.Deadline(); ok && cur.Before(d) { // The current deadline is already sooner than the new one. return WithCancel(parent) } // 创建timerCtx,这里可以看到cancelCtx是私有变量,而cancelCtx中的Context字段是公有变量 c := &timerCtx{ cancelCtx: newCancelCtx(parent), deadline: d, } // 设置层级取消关联 propagateCancel(parent, c) dur := time.Until(d) // 如果已经超时直接取消 if dur <= 0 { c.cancel(true, DeadlineExceeded) // deadline has already passed return c, func() { c.cancel(false, Canceled) } } c.mu.Lock() defer c.mu.Unlock() // 如果没有超时并且没有被调用过cancel,那么设置timer,超时则调用cancel方法; if c.err == nil { c.timer = time.AfterFunc(dur, func() { c.cancel(true, DeadlineExceeded) }) } return c, func() { c.cancel(true, Canceled) } }
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) { return WithDeadline(parent, time.Now().Add(timeout)) }
type valueCtx struct { Context key, val any }
func WithValue(parent Context, key, val any) Context { if parent == nil { panic("cannot create context from nil parent") } if key == nil { panic("nil key") } if !reflectlite.TypeOf(key).Comparable() { panic("key is not comparable") } return &valueCtx{parent, key, val} }
func (c *valueCtx) Value(key any) any { if c.key == key { return c.val } return value(c.Context, key) }