热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Context是如何影响grpc通信超时控制的行扩容且扩容后的元素数量超过原始切片容量时,底层数组会迁移到另片内存区域

Golang中那些隐秘的角落https:mp.weixin.qq.comsp572g5KcSwy2ri40d1cPTgGolang中那些隐秘的角落原创 张千风(千风) 阿里开发者 

Golang 中那些隐秘的角落 https://mp.weixin.qq.com/s/p572g5KcSwy2ri40d1cPTg


Golang 中那些隐秘的角落

图片

本文记录了一些在使用 Golang 开发函数计算以及系统问题排查过程中遇到的值得学习的知识点,希望能够分享给大家。

我们真的有用好 recover() 吗?

在一次系统报警调查中,发现某个组件 panic 且没有恢复运行,panic 的日志为是 "fatal error: concurrent map writes",当时只能手动重启该组件。查看其源码时发现,panic 位置对应的函数中,已经存在 recover() 函数,抽象得到的源码逻辑如下所示:

package main
import ( "fmt")
func concurrentMapWrite() { defer func() { if err := recover(); err != nil { fmt.Printf("Panic occurred due to %+v, Recovered in f", err) } }() m := map[int]int{} idx := 0 for { go func() { m[idx] = 1 }() idx++ }}
func main() { concurrentMapWrite()}

当时初步判断 recover 并没有捕获到 "fatal error: concurrent map writes",为了验证自己的猜想,进行了如下的一系列调查。


 

在 defer 中使用 recover()


Golang 程序运行不符合预期时往往会通过“错误”以及“异常”来反馈给用户。前者是代码逻辑出现错误时返回,是编程者意料之中的错误,不会破坏程序的运行;后者往往是代码中出现了不可预期的错误,导致程序无法继续运行下去,如果不加以处理,就会导致程序异常退出,这是十分危险的。

为了提高程序的稳健性,我们需要依赖 Golang 的 recover 以及 defer 机制来保证程序在发生异常后能够继续维持运行,避免程序意外退出,保证业务的稳定运行。defer 关键字中包含的内容会在其所在函数返回之前执行;recover 函数用于将 goroutine 从异常场景下恢复,只有在 defer 中调用才会生效。其使用方式如下如下:

func div(x, y int) int { return x / y}func f() { defer func() { if err := recover(); err != nil { fmt.Printf("Panic occurred due to %+v, Recovered in f", err) } }() fmt.Println(div(1, 0))}

上述 defer...recover 相当于 java 中的 try/catch 关键字,能够保证程序能够不被异常中断执行,我们知道使用 try...catch 可以捕获所有类型的异常,只要 catch 后跟所有异常的基类 Exception 即可;然后Golang为什么却不是这样呢?

 

不可恢复的 panic

不同于 try...catch,在 Golang 中并不是所有异常都能够被 recover 捕获到:



  • 当异常是通过 runtime.panic() 抛出时,能够被 recover 方法捕获;



  • 当异常是通过 runtime.throw() 或者 runtime.fatal() 抛出时,不能够被 recover 方法捕获。

在上述实际场景中遇到的 “concurrent map writes” 异常就是通过 runtime.fatal() 抛出来的,具体源码(runtime/map.go):

// Like mapaccess, but allocates a slot for the key if it is not present in the map.func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer { if h == nil { panic(plainError("assignment to entry in nil map")) } // 省略 ... if h.flags&hashWriting != 0 { fatal("concurrent map writes") } hash := t.hasher(key, uintptr(h.hash0)) // Set hashWriting after calling t.hasher, since t.hasher may panic, // in which case we have not actually done a write. h.flags ^= hashWriting if h.buckets == nil { h.buckets = newobject(t.bucket) // newarray(t.bucket, 1) }again: // 省略 ...bucketloop: // 省略 ...done: if h.flags&hashWriting == 0 { fatal("concurrent map writes") } h.flags &^= hashWriting if t.indirectelem() { elem = *((*unsafe.Pointer)(elem)) } return elem}

map 通过标志位 h.flags 来检查 map 是否存并发写情况,如果存在,则调用 fatal 方法,此时错误为 "fatal error",会强制退出程序,详情见 fatal 源码:

// fatal triggers a fatal error that dumps a stack trace and exits.//// fatal is equivalent to throw, but is used when user code is expected to be// at fault for the failure, such as racing map writes.//// fatal does not include runtime frames, system goroutines, or frame metadata// (fp, sp, pc) in the stack trace unless GOTRACEBACK=system or higher.////go:nosplitfunc fatal(s string) { // Everything fatal does should be recursively nosplit so it // can be called even when it's unsafe to grow the stack. systemstack(func() { print("fatal error: ", s, "\n") }) fatalthrow(throwTypeUser)}// fatalthrow implements an unrecoverable runtime throw. It freezes the// system, prints stack traces starting from its caller, and terminates the// process.////go:nosplitfunc fatalthrow(t throwType) { pc := getcallerpc() sp := getcallersp() gp := getg() if gp.m.throwing == throwTypeNone { gp.m.throwing = t } // Switch to the system stack to avoid any stack growth, which may make // things worse if the runtime is in a bad state. systemstack(func() { startpanic_m() if dopanic_m(gp, pc, sp) { // crash uses a decent amount of nosplit stack and we're already // low on stack in throw, so crash on the system stack (unlike // fatalpanic). crash() } exit(2) }) *(*int)(nil) = 0 // not reached}

从 fatal 方法的注释中可知,该方法等同于 throw 方法,但是只会抛出用户层面的异常,系统层面的异常由 runtime.throw 抛出。fatal 方法中又调用了 fatalthrow 方法, 该方法的注释中明确表示:"fatalthrow implements an unrecoverable runtime throw",因此通过这个方法抛出的异常均属于不可恢复异常。"concurrent map writes" 之所以被视为不可恢复异常,是因为 Golang 检测到数据竞争时,map 内部的结构已经被破坏了,继续运行可能会产生不可预期的结果,因此会强制结束程序。 以下罗列了一些其他不可恢复的异常种类:



  • Out of memory



  • Concurrent map writes



  • Stack memory exhaustion



  • Attempting to launch a nil function as a goroutine



  • All goroutines are asleep - deadlock



  • Thread limit exhaustion



参考:

[1] https://go-review.googlesource.com/c/go/+/390421

[2] https://github.com/golang/go/blob/master/src/runtime/map.go#L578

[3] https://stackoverflow.com/questions/57486620/are-all-runtime-errors-recoverable-in-go

[4] https://www.jianshu.com/p/15c459c85141

[5] https://www.zhihu.com/question/305845656/answer/728440889

切片扩容有哪些坑?

在开发过程中,将切片作为参数传递到函数中,然后在函数中修改切片内容,对应的修改预期一定能够同步更新到实参中,然而实际开发测试中发现,有的场景符合预期,有的却不符合预期。若在函数中对该切片进行扩容且扩容后的切片大小不超过其原始容量,此时修改切片中已有的元素,则修改会同步到实参切片中,而扩容不会同步到实参切片中;若在函数中对该切片进行扩容且扩容后的切片大小超过其原始容量,则修改不会同步到实参切片中,同时扩容不会同步到实参切片中,示例代码如下:

package main

import ( "fmt")func appendSliceWithinCap(s []string) { s = append(s, "two") s[0] = "appendSliceWithinCap"}func appendSliceOverCap(s []string) { s = append(s, "two") s = append(s, "three") s[0] = "appendSliceOverCap"}func main() { fmt.Println("hello main") s := make([]string, 1, 2) s[0] = "one" fmt.Println(s) // ["one"] appendSliceWithinCap(s) fmt.Println(s) // ["appendSliceWithinCap"] appendSliceOverCap(s) fmt.Println(s) // ["appendSliceWithinCap"]}

切片扩容机理


函数中对切片参数中已有元素的更新会影响实参

切片自身维护了一个指针属性,用于指向它底层数组中的某些元素的集合,其结构体如下所示:

type slice struct { array unsafe.Pointer // 指向底层数组的指针 len int // 切片的长度 cap int // 切片的容量}

Golang 官方文档声明:函数参数传参只有值传递一种方式。值传递方式会在调用函数时将实际参数拷贝一份传递到函数中,slice 参数被传递到函数中时,其 array、len 以及 cap 都被复制了一份,因此函数中 slice 和实参 slice 是共享了底层 slice 数组的,函数中对 slice 中已有元素的修改会同步到实参 slice 中。


切片的扩容策略

切片可以通过 append 函数以追加元素的方式进行动态扩容,扩容的元素会存储在切片已有的存储空间中,然而切片的存储空间上限由切片容量决定,当扩容的元素数量超过切片容量时,切片必须对底层数组进行扩容才能容纳这些元素,我们通过 go/src/runtime/slice.go 中的 growslice 方法来解析下此时 Golang(1.19.2+) 扩容切片的策略:

// growslice allocates new backing store for a slice.//// arguments:////  oldPtr = pointer to the slice's backing array//  newLen = new length (= oldLen + num)//  oldCap = original slice's capacity.//     num = number of elements being added//      et = element type//// return values:////  newPtr = pointer to the new backing store//  newLen = same value as the argument//  newCap = capacity of the new backing store//// Requires that uint(newLen) > uint(oldCap).// Assumes the original slice length is newLen - num//// A new backing store is allocated with space for at least newLen elements.// Existing entries [0, oldLen) are copied over to the new backing store.// Added entries [oldLen, newLen) are not initialized by growslice// (although for pointer-containing element types, they are zeroed). They// must be initialized by the caller.// Trailing entries [newLen, newCap) are zeroed.//// growslice's odd calling convention makes the generated code that calls// this function simpler. In particular, it accepts and returns the// new length so that the old length is not live (does not need to be// spilled/restored) and the new length is returned (also does not need// to be spilled/restored).func growslice(oldPtr unsafe.Pointer, newLen, oldCap, num int, et *_type) slice {  oldLen := newLen - num  // 省略 ...
  newcap := oldCap  doublecap := newcap + newcap  if newLen > doublecap {    newcap = newLen  } else {    const threshold = 256    if oldCap < threshold {      newcap = doublecap    } else {      // Check 0 < newcap to detect overflow      // and prevent an infinite loop.      for 0 < newcap && newcap < newLen {        // Transition from growing 2x for small slices        // to growing 1.25x for large slices. This formula        // gives a smooth-ish transition between the two.        newcap += (newcap + 3*threshold) / 4      }      // Set newcap to the requested cap when      // the newcap calculation overflowed.      if newcap <= 0 {        newcap = newLen      }    } } // 省略 ...  // The check of overflow in addition to capmem > maxAlloc is needed  // to prevent an overflow which can be used to trigger a segfault  // on 32bit architectures with this example program:  //  // type T [1<<27 + 1]int64  //  // var d T  // var s []T  //  // func main() {  //   s = append(s, d, d, d, d)  //   print(len(s), "\n")  // }  if overflow || capmem > maxAlloc {    panic(errorString("growslice: len out of range")) }
  var p unsafe.Pointer  if et.ptrdata == 0 {    p = mallocgc(capmem, nil, false)    // The append() that calls growslice is going to overwrite from oldLen to newLen.    // Only clear the part that will not be overwritten.    // The reflect_growslice() that calls growslice will manually clear    // the region not cleared here.    memclrNoHeapPointers(add(p, newlenmem), capmem-newlenmem)  } else {    // Note: can't use rawmem (which avoids zeroing of memory), because then GC can scan uninitialized memory.    p = mallocgc(capmem, et, true)    if lenmem > 0 && writeBarrier.enabled {      // Only shade the pointers in oldPtr since we know the destination slice p      // only contains nil pointers because it has been cleared during alloc.      bulkBarrierPreWriteSrcOnly(uintptr(p), uintptr(oldPtr), lenmem-et.size+et.ptrdata)    }  }  memmove(p, oldPtr, lenmem)  return slice{p, newLen, newcap}}

当扩容后的元素总数超过切片容量大小,其扩容策略如下:



  • threshold = 256



  • 若扩容后的元素数量超过两倍原始容量,则直接将扩容后元素数量赋值给新容量,否则执行如下



  • 若原容量小于 threshold,则将原始容量的两倍赋值给新容量,否则执行如下



  • 在原始容量基础上,每次增加 (原始容量 + threshold * 3)/ 4,直到其不小于扩容后的元素数量



扩容策略完成,得到新容量值后,会基于该值申请内存,然后将原数组中的数据以及扩容的数据拷贝到新内存中,此时完成切片的动态扩容,其公式如下:

图片

由上可知,当函数对形参 slice 进行扩容且扩容后的元素数量超过原始切片容量时,底层数组会迁移到另片内存区域,因此函数中对形参 slice 已有元素的更新无法影响到实参 slice。

参考:[1] https://www.liangtian.me/post/go-slice/[2] https://juejin.cn/post/6888117219213967368

[3]https://github.com/golang/go/blob/4c61e079c087052355c137ab8fcd9abf8728e50a/src/runtime/slice.go

Context 是如何影响 grpc 通信超时控制的

图片

上述场景是我在实际开发应用过程中抽象出来的 grpc 通信过程,这也是一个较为通用的过程,client 端将带有超时时间的 context 传递到 server 端,server 端在超时时间内需要完成请求处理并返回响应给 client 端,若超过超时请求时间,那么链接将会断开,client 端将不会收到任何响应。

然而在实际开发应用中,发现即便 server 端的 context 超时了,但是其请求响应仍会偶发性地传递到 client 端,导致我们的一个功能出现了不可预期的情况,为了用代码描述对应的交互流程,我在这里放了简化后的示例代码,描述了当时的交互逻辑。

https://github.com/git-qfzhang/hello-golang/tree/master/grpc-go/helloworld

图片

grpc 超时传递流程

在 Golang grpc 通信过程中,超时信息会在不同通信端进行传递的,传递的介质是 Http2 Request Frame。grpc client 在发送请求之前,会将信息封装在不同的的 Frame 中,例如 Data Frame 用来存放请求的 response payload;Header Frame 用户存在一些跨 goroutine 传递的数据,例如路径信息。而超时信息就存放在 Header Frame 中,其源码如下所示:

// NewStream 方法的调用链路:grpc.Invoke -> invoke -> sendRequest -> NewStream
// NewStream creates a stream and register it into the transport as "active"// streams.func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { // 省略 ... // HPACK encodes various headers. Note that once WriteField(...) is // called, the corresponding headers/continuation frame has to be sent // because hpack.Encoder is stateful.  t.hBuf.Reset()  t.hEnc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"})  t.hEnc.WriteField(hpack.HeaderField{Name: ":scheme", Value: t.scheme})  t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method})  t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host})  t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})  t.hEnc.WriteField(hpack.HeaderField{Name: "user-agent", Value: t.userAgent}) t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"})
  if callHdr.SendCompress != "" {    t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})  }  if dl, ok := ctx.Deadline(); ok {    // Send out timeout regardless its value. The server can detect timeout context by itself.    timeout := dl.Sub(time.Now())    t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})  } // 省略 ...}

client server 端在收到超时信息后,将 grpc-timeout 字段从 Header 中取出,基于该超时信息新建一个 context 实例,其源码如下所示:

// processHeaderField 方法调用链:grpc.Server -> handleRawConn -> serveNewHTTP2Transport -> serveStreams -> HandleStreams -> operateHeaders -> processHeaderField// operateHeader takes action on the decoded headers.func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) { buf := newRecvBuffer() s := &Stream{ id: frame.Header().StreamID, st: t, buf: buf, fc: &inFlow{limit: initialWindowSize}, } var state decodeState for _, hf := range frame.Fields { state.processHeaderField(hf) } // 省略 ... s.recvCompress = state.encoding if state.timeoutSet { s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout) } else { s.ctx, s.cancel = context.WithCancel(context.TODO()) } // 省略 ...}func (d *decodeState) processHeaderField(f hpack.HeaderField) { switch f.Name { // 省略 ... case "grpc-timeout": d.timeoutSet = true var err error d.timeout, err = decodeTimeout(f.Value) if err != nil { d.setErr(streamErrorf(codes.Internal, "transport: malformed time-out: %v", err)) return } // 省略 ... }}

在 grpc client 端,会去不断检查 context.Done() 来判断 context 是否超时,若超时,则会断开链接。然而,也会存在 context timeout races 的情况,例如,client 端 context 已经超时,但是此时下一轮检查还未开始,同时 server 端恰好返回了响应信息,此时虽然 client 端 context 超时了,但是仍然会接收到 server 端的响应并处理;更普遍的情况是 select { case <- ctx; ...; case <- response; ...},这就会导致有 50% 的概率未检测到 context 超时,详情请参考我之前在 grpc-go 中提的 issue。

 

确保 grpc 响应超时错误

在我之前经历的错误场景中, server 端 context 出现超时,并返回响应给 client 端,此时 client 端预期应该也会超时并断开链接,但实际是会成功接收到 client 端的响应,由于处理逻辑的问题,当时的响应并不包含超时错误,因此 client 端在接收到请求后会重新发送一次请求,重新发送完成后,才检测到 context 超时,最终断开链接,导致了错误的出现。

图片

因此,在应用过程中,需要在 server 端 context timeout 时,保证返回的 response 中的错误信息是 grpc.DeadlineExceeded,让 client 端也感知到 timeout 的发生,避免不必要逻辑的发生。

参考:[1] https://github.com/grpc/grpc-go[2] https://github.com/grpc/grpc-go/issues/5206#issuecomment-1058564271[3] https://xiaomi-info.github.io/2019/12/30/grpc-deadline/

 

 



推荐阅读
author-avatar
民主公园
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有