热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

关于rpc:RPCX源码学习server端

协定的次要内容是什么?该协定容许运行于一台计算机中的程序调用另一个地址空间(通常为一个凋谢网络中的一台计算机)的子程序,而程序员就像调用本地程序一

意识RPC

  1. RPC是什么货色?
    RPC: Remote Procedure Call(近程过程调用),是一个计算机通信协议。
  2. 协定的次要内容是什么?
    该协定容许运行于一台计算机中的程序调用另一个地址空间(通常为一个凋谢网络中的一台计算机)的子程序,而程序员就像调用本地程序一样,无需额定的为这个交互作用编程(无需关注细节)。
  3. 次要用来解决什么问题?
    解决分布式系统中服务之间的调用问题。
    使近程调用像本地办法调用一样不便,暗藏底层网络通信的复杂性,使咱们更专一于业务。
  4. 一次rpc通信会波及到哪些角色?流程是什么样的?
    client: 客户端程序,调用的发起者
    client-stub: 将调用参数和办法依照约定的协定进行编码,而后传输到server
    server: 服务端程序,解决client的调用
    server-stub: 收到client的音讯后,依照约定的协定进行解码,而后调用本地办法进行解决

RPCX框架简介

rpcx是一个分布式的RPC框架,由go语言开发,反对Zookepper、etcd、consul多种服务发现形式,多种服务路由形式,是目前性能最好的RPC框架之一。

详情见官网介绍: https://books.studygolang.com…

server端源码分析

从入口开始,咱们启动一个rpc服务时,须要通过NewServer办法去创立一个Server对象,源码如下:

// NewServer returns a server.
func NewServer(options ...OptionFn) *Server {
    s := &Server{ // 创立一个Server对象,给一些字段赋默认值
        Plugins:    &pluginContainer{},
        options:    make(map[string]interface{}),
        activeConn: make(map[net.Conn]struct{}),
        doneChan:   make(chan struct{}),
        serviceMap: make(map[string]*service),
        router:     make(map[string]Handler),
        AsyncWrite: false, // 除非你想benchmark或者极致优化,否则倡议你设置为false
    }

    for _, op := range options {
        op(s)
    }

    // 设置tcp连贯的keepAlive参数
    if s.options["TCPKeepAlivePeriod"] == nil {
        s.options["TCPKeepAlivePeriod"] = 3 * time.Minute
    }
    return s
}

Server构造如下:

type Handler func(ctx *Context) error

// Server is rpcx server that use TCP or UDP.
type Server struct {
    ln                 net.Listener // 监听器,用来监听服务端的端口
    readTimeout        time.Duration // 读取client申请数据包的超时工夫
    writeTimeout       time.Duration // 给client写响应数据包的超时工夫
    gatewayHTTPServer  *http.Server // http网管
    DisableHTTPGateway bool // should disable http invoke or not.
    DisableJSONRPC     bool // should disable json rpc or not.
    AsyncWrite         bool // set true if your server only serves few clients

    serviceMapMu sync.RWMutex // 读写锁,爱护sericeMap的并发读写
    serviceMap   map[string]*service // 服务记录表

    router map[string]Handler

    mu         sync.RWMutex // 读写锁,爱护activeConn的并发读写
    activeConn map[net.Conn]struct{} // 沉闷连贯记录表
    doneChan   chan struct{}
    seq        uint64 // server端id

    inShutdown int32
    onShutdown []func(s *Server)
    onRestart  []func(s *Server)

    // TLSConfig for creating tls tcp connection.
    tlsConfig *tls.Config
    // BlockCrypt for kcp.BlockCrypt
    options map[string]interface{}

    // CORS options 跨域
    corsOptions *CORSOptions
    // 插件
    Plugins PluginContainer

    // AuthFunc can be used to auth.
    AuthFunc func(ctx context.Context, req *protocol.Message, token string) error

    handlerMsgNum int32

    HandleServiceError func(error)
}

对象创立实现之后,须要咱们来注册对外提供的服务,通过RegisterName办法来实现:


/*
    @name: 对外提供的服务名称
    @rcvr: 对外提供的服务对象的实例
*/
func (s *Server) RegisterName(name string, rcvr interface{}, metadata string) error {
    _, err := s.register(rcvr, name, true) // 创立service对象,存储到对象表中
    if err != nil {
        return err
    }
    if s.Plugins == nil {
        s.Plugins = &pluginContainer{}
    }
    return s.Plugins.DoRegister(name, rcvr, metadata)
}

// 注册service
func (s *Server) register(rcvr interface{}, name string, useName bool) (string, error) {
    // 加写锁
    s.serviceMapMu.Lock()
    defer s.serviceMapMu.Unlock()

    // 结构service对象
    service := new(service)
    service.typ = reflect.TypeOf(rcvr)
    service.rcvr = reflect.ValueOf(rcvr)
    sname := reflect.Indirect(service.rcvr).Type().Name() // Type
    if useName {
        sname = name
    }
    if sname == "" {
        errorStr := "rpcx.Register: no service name for type " + service.typ.String()
        log.Error(errorStr)
        return sname, errors.New(errorStr)
    }
    if !useName && !isExported(sname) {
        errorStr := "rpcx.Register: type " + sname + " is not exported"
        log.Error(errorStr)
        return sname, errors.New(errorStr)
    }
    service.name = sname

    // Install the methods
    // 将rcvr对象的所有办法查找进去
    service.method = suitableMethods(service.typ, true)

    if len(service.method) == 0 {
        var errorStr string

        // To help the user, see if a pointer receiver would work.
        method := suitableMethods(reflect.PtrTo(service.typ), false)
        if len(method) != 0 {
            errorStr = "rpcx.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
        } else {
            errorStr = "rpcx.Register: type " + sname + " has no exported methods of suitable type"
        }
        log.Error(errorStr)
        return sname, errors.New(errorStr)
    }
    // 更新服务记录表
    s.serviceMap[service.name] = service
    return sname, nil
}

// DoRegister invokes DoRegister plugin.
func (p *pluginContainer) DoRegister(name string, rcvr interface{}, metadata string) error {
    var es []error
    for _, rp := range p.plugins {
        // 如果是RegisterPlugin类型的插件,会调用它的Register函数进行注册
        if plugin, ok := rp.(RegisterPlugin); ok {
            err := plugin.Register(name, rcvr, metadata)
            if err != nil {
                es = append(es, err)
            }
        }
    }

    if len(es) > 0 {
        return errors.NewMultiError(es)
    }
    return nil
}

服务注册结束之后,就开始启动服务了:

// Serve starts and listens RPC requests.
// It is blocked until receiving connections from clients.
func (s *Server) Serve(network, address string) (err error) {
    var ln net.Listener
    // 设置监听器,监听地址中配置的端口
    ln, err = s.makeListener(network, address)
    if err != nil {
        return
    }
    // 如果抉择http协定,走这里
    if network == "http" {
        s.serveByHTTP(ln, "")
        return nil
    }
    // 抉择ws/wss协定走这里
    if network == "ws" || network == "wss" {
        s.serveByWS(ln, "")
        return nil
    }

    // try to start gateway
    // 尝试启动网关,如果是tcp协定,则会通过goroutine启动一个网关服务
    ln = s.startGateway(network, ln)
    // tcp协定走这里,开始解决监听端口处的数据
    return s.serveListener(ln)
}

// 获取端口数据并启动goroutine解决
func (s *Server) serveListener(ln net.Listener) error {
    var tempDelay time.Duration

    s.mu.Lock()
    s.ln = ln
    s.mu.Unlock()

    for {// 死循环
        // 有申请数据达到时,channel会返回数据
        conn, e := ln.Accept()
        if e != nil { // 错误处理
            select {
            case <-s.getDoneChan(): // 连贯敞开,报错,跳出循环
                return ErrServerClosed
            default:
            }

            if ne, ok := e.(net.Error); ok && ne.Temporary() { // 短暂谬误
                if tempDelay == 0 {
                    tempDelay = 5 * time.Millisecond
                } else {
                    tempDelay *= 2
                }

                if max := 1 * time.Second; tempDelay > max {
                    tempDelay = max
                }

                log.Errorf("rpcx: Accept error: %v; retrying in %v", e, tempDelay)
                time.Sleep(tempDelay)
                continue
            }

            if strings.Contains(e.Error(), "listener closed") {
                return ErrServerClosed
            }
            return e
        }
        tempDelay = 0

        // 如果是tcp连贯,设置keepAlive相干参数
        if tc, ok := conn.(*net.TCPConn); ok {
            period := s.options["TCPKeepAlivePeriod"]
            if period != nil {
                tc.SetKeepAlive(true)
                tc.SetKeepAlivePeriod(period.(time.Duration))
                tc.SetLinger(10)
            }
        }

        // 有PostConnAcceptPlugin类型插件时,进行解决
        conn, ok := s.Plugins.DoPostConnAccept(conn)
        if !ok {
            conn.Close()
            continue
        }

        s.mu.Lock()
        // 记录沉闷连贯
        s.activeConn[conn] = struct{}{}
        s.mu.Unlock()

        if share.Trace {
            log.Debugf("server accepted an conn: %v", conn.RemoteAddr().String())
        }
        // 启动goroutine异步解决申请,持续开始下一次循环
        go s.serveConn(conn)
    }
}

监听到申请后,会启动一个goroutine进行异步解决,接下来看看申请的解决逻辑:

func (s *Server) serveConn(conn net.Conn) {
    if s.isShutdown() { // 异样解决,敞开连贯
        s.closeConn(conn)
        return
    }

    defer func() {// 捕捉异样,打印谬误堆栈信息,敞开连贯
        if err := recover(); err != nil {
            const size = 64 <<10
            buf := make([]byte, size)
            ss := runtime.Stack(buf, false)
            if ss > size {
                ss = size
            }
            buf = buf[:ss]
            log.Errorf("serving %s panic error: %s, stack:\n %s", conn.RemoteAddr(), err, buf)
        }

        if share.Trace {
            log.Debugf("server closed conn: %v", conn.RemoteAddr().String())
        }

        s.closeConn(conn)
    }()

    // TLS解决
    if tlsConn, ok := conn.(*tls.Conn); ok {
        if d := s.readTimeout; d != 0 {
            conn.SetReadDeadline(time.Now().Add(d))
        }
        if d := s.writeTimeout; d != 0 {
            conn.SetWriteDeadline(time.Now().Add(d))
        }
        if err := tlsConn.Handshake(); err != nil {
            log.Errorf("rpcx: TLS handshake error from %s: %v", conn.RemoteAddr(), err)
            return
        }
    }

    // 申请空间,寄存申请数据
    r := bufio.NewReaderSize(conn, ReaderBuffsize)

    var writeCh chan *[]byte
    if s.AsyncWrite { // 如果设置了同步写,走这里
        writeCh = make(chan *[]byte, 1)
        defer close(writeCh)
        go s.serveAsyncWrite(conn, writeCh)
    }
    for {
        if s.isShutdown() {
            return
        }

        t0 := time.Now()
        if s.readTimeout != 0 { // 设置超时工夫
            conn.SetReadDeadline(t0.Add(s.readTimeout))
        }

        // 创立上下文
        ctx := share.WithValue(context.Background(), RemoteConnContextKey, conn)
        // 获取申请数据
        req, err := s.readRequest(ctx, r)
        if err != nil { // 获取失败的解决逻辑
            protocol.FreeMsg(req)

            if err == io.EOF {
                log.Infof("client has closed this connection: %s", conn.RemoteAddr().String())
            } else if strings.Contains(err.Error(), "use of closed network connection") {
                log.Infof("rpcx: connection %s is closed", conn.RemoteAddr().String())
            } else if errors.Is(err, ErrReqReachLimit) {
                if !req.IsOneway() {
                    res := req.Clone()
                    res.SetMessageType(protocol.Response)
                    if len(res.Payload) > 1024 && req.CompressType() != protocol.None {
                        res.SetCompressType(req.CompressType())
                    }
                    handleError(res, err)
                    s.Plugins.DoPreWriteResponse(ctx, req, res, err)
                    data := res.EncodeSlicePointer()
                    if s.AsyncWrite {
                        writeCh <- data
                    } else {
                        conn.Write(*data)
                        protocol.PutData(data)
                    }
                    s.Plugins.DoPostWriteResponse(ctx, req, res, err)
                    protocol.FreeMsg(res)
                } else {
                    s.Plugins.DoPreWriteResponse(ctx, req, nil, err)
                }
                protocol.FreeMsg(req)
                continue
            } else {
                log.Warnf("rpcx: failed to read request: %v", err)
            }
            return
        }

        if s.writeTimeout != 0 {
            conn.SetWriteDeadline(t0.Add(s.writeTimeout))
        }

        if share.Trace {
            log.Debugf("server received an request %+v from conn: %v", req, conn.RemoteAddr().String())
        }

        // 更新上下文的值
        ctx = share.WithLocalValue(ctx, StartRequestContextKey, time.Now().UnixNano())
        closeConn := false
        if !req.IsHeartbeat() { // 不是心跳包
            err = s.auth(ctx, req) // 如果设置有auth防备,执行auth逻辑
            closeCOnn= err != nil
        }

        if err != nil { // auth失败的解决逻辑
            if !req.IsOneway() {
                res := req.Clone()
                res.SetMessageType(protocol.Response)
                if len(res.Payload) > 1024 && req.CompressType() != protocol.None {
                    res.SetCompressType(req.CompressType())
                }
                handleError(res, err)
                s.Plugins.DoPreWriteResponse(ctx, req, res, err)
                data := res.EncodeSlicePointer()
                if s.AsyncWrite {
                    writeCh <- data
                } else {
                    conn.Write(*data)
                    protocol.PutData(data)
                }
                s.Plugins.DoPostWriteResponse(ctx, req, res, err)
                protocol.FreeMsg(res)
            } else {
                s.Plugins.DoPreWriteResponse(ctx, req, nil, err)
            }
            protocol.FreeMsg(req)
            // auth failed, closed the connection
            if closeConn {
                log.Infof("auth failed for conn %s: %v", conn.RemoteAddr().String(), err)
                return
            }
            continue
        }

        go func() {//启动goroutine解决
            defer func() { // 解决完结,捕捉异样
                if r := recover(); r != nil {
                    // maybe panic because the writeCh is closed.
                    log.Errorf("failed to handle request: %v", r)
                }
            }()

            // 原子操作,记录服务解决的音讯数量
            atomic.AddInt32(&s.handlerMsgNum, 1)
            defer atomic.AddInt32(&s.handlerMsgNum, -1)

            if req.IsHeartbeat() { // 心跳包的解决逻辑
                s.Plugins.DoHeartbeatRequest(ctx, req)
                req.SetMessageType(protocol.Response)
                data := req.EncodeSlicePointer()
                if s.AsyncWrite {
                    writeCh <- data
                } else {
                    conn.Write(*data)
                    protocol.PutData(data)
                }
                protocol.FreeMsg(req)
                return
            }

            resMetadata := make(map[string]string)
            ctx = share.WithLocalValue(share.WithLocalValue(ctx, share.ReqMetaDataKey, req.Metadata),
                share.ResMetaDataKey, resMetadata)
            // 如果服务通过metadata设置了超时工夫,则创立一个超时上下文
            cancelFunc := parseServerTimeout(ctx, req)
            if cancelFunc != nil {
                defer cancelFunc()
            }
            // 插件解决
            s.Plugins.DoPreHandleRequest(ctx, req)

            if share.Trace {
                log.Debugf("server handle request %+v from conn: %v", req, conn.RemoteAddr().String())
            }

            // first use handler
            if handler, ok := s.router[req.ServicePath+"."+req.ServiceMethod]; ok {
                sctx := NewContext(ctx, conn, req, writeCh)
                err := handler(sctx)
                if err != nil {
                    log.Errorf("[handler internal error]: servicepath: %s, servicemethod, err: %v", req.ServicePath, req.ServiceMethod, err)
                }

                return
            }

            // 解决申请
            res, err := s.handleRequest(ctx, req)
            if err != nil {
                if s.HandleServiceError != nil {
                    s.HandleServiceError(err)
                } else {
                    log.Warnf("rpcx: failed to handle request: %v", err)
                }
            }

            s.Plugins.DoPreWriteResponse(ctx, req, res, err)
            if !req.IsOneway() {
                if len(resMetadata) > 0 { // copy meta in context to request
                    meta := res.Metadata
                    if meta == nil {
                        res.Metadata = resMetadata
                    } else {
                        for k, v := range resMetadata {
                            if meta[k] == "" {
                                meta[k] = v
                            }
                        }
                    }
                }

                if len(res.Payload) > 1024 && req.CompressType() != protocol.None {
                    res.SetCompressType(req.CompressType())
                }
                data := res.EncodeSlicePointer()
                if s.AsyncWrite {
                    writeCh <- data
                } else {
                    conn.Write(*data)
                    protocol.PutData(data)
                }

            }

            s.Plugins.DoPostWriteResponse(ctx, req, res, err)

            if share.Trace {
                log.Debugf("server write response %+v for an request %+v from conn: %v", res, req, conn.RemoteAddr().String())
            }

            // 开释对象,放回对象池
            protocol.FreeMsg(req)
            protocol.FreeMsg(res)
        }()
    }
}

未完待续……


推荐阅读
  • 【重识云原生】第四章云网络4.8.3.2节——Open vSwitch工作原理详解
    2OpenvSwitch架构2.1OVS整体架构ovs-vswitchd:守护程序,实现交换功能,和Linux内核兼容模块一起,实现基于流的交换flow-basedswitchin ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • 本文介绍了RPC框架Thrift的安装环境变量配置与第一个实例,讲解了RPC的概念以及如何解决跨语言、c++客户端、web服务端、远程调用等需求。Thrift开发方便上手快,性能和稳定性也不错,适合初学者学习和使用。 ... [详细]
  • 本文介绍了在Windows环境下如何配置php+apache环境,包括下载php7和apache2.4、安装vc2015运行时环境、启动php7和apache2.4等步骤。希望对需要搭建php7环境的读者有一定的参考价值。摘要长度为169字。 ... [详细]
  • 本文介绍了在mac环境下使用nginx配置nodejs代理服务器的步骤,包括安装nginx、创建目录和文件、配置代理的域名和日志记录等。 ... [详细]
  • 本文介绍了Windows操作系统的版本及其特点,包括Windows 7系统的6个版本:Starter、Home Basic、Home Premium、Professional、Enterprise、Ultimate。Windows操作系统是微软公司研发的一套操作系统,具有人机操作性优异、支持的应用软件较多、对硬件支持良好等优点。Windows 7 Starter是功能最少的版本,缺乏Aero特效功能,没有64位支持,最初设计不能同时运行三个以上应用程序。 ... [详细]
  • phpcomposer 那个中文镜像是不是凉了 ... [详细]
  • Skywalking系列博客1安装单机版 Skywalking的快速安装方法
    本文介绍了如何快速安装单机版的Skywalking,包括下载、环境需求和端口检查等步骤。同时提供了百度盘下载地址和查询端口是否被占用的命令。 ... [详细]
  • Monkey《大话移动——Android与iOS应用测试指南》的预购信息发布啦!
    Monkey《大话移动——Android与iOS应用测试指南》的预购信息已经发布,可以在京东和当当网进行预购。感谢几位大牛给出的书评,并呼吁大家的支持。明天京东的链接也将发布。 ... [详细]
  • 本文介绍了lua语言中闭包的特性及其在模式匹配、日期处理、编译和模块化等方面的应用。lua中的闭包是严格遵循词法定界的第一类值,函数可以作为变量自由传递,也可以作为参数传递给其他函数。这些特性使得lua语言具有极大的灵活性,为程序开发带来了便利。 ... [详细]
  • 怎么在PHP项目中实现一个HTTP断点续传功能发布时间:2021-01-1916:26:06来源:亿速云阅读:96作者:Le ... [详细]
  • RouterOS 5.16软路由安装图解教程
    本文介绍了如何安装RouterOS 5.16软路由系统,包括系统要求、安装步骤和登录方式。同时提供了详细的图解教程,方便读者进行操作。 ... [详细]
  • 本文介绍了一种图的存储和遍历方法——链式前向星法,该方法在存储带边权的图时时间效率比vector略高且节省空间。然而,链式前向星法存图的最大问题是对一个点的出边进行排序去重不容易,但在平行边无所谓的情况下选择这个方法是非常明智的。文章还提及了图中搜索树的父子关系一般不是很重要,同时给出了相应的代码示例。 ... [详细]
  • 深入理解Java虚拟机的并发编程与性能优化
    本文主要介绍了Java内存模型与线程的相关概念,探讨了并发编程在服务端应用中的重要性。同时,介绍了Java语言和虚拟机提供的工具,帮助开发人员处理并发方面的问题,提高程序的并发能力和性能优化。文章指出,充分利用计算机处理器的能力和协调线程之间的并发操作是提高服务端程序性能的关键。 ... [详细]
  • Jquery 跨域问题
    为什么80%的码农都做不了架构师?JQuery1.2后getJSON方法支持跨域读取json数据,原理是利用一个叫做jsonp的概念。当然 ... [详细]
author-avatar
同亮uncle_847
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有