热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

GoMicroClient源码分析

概述Client主要是用来执行请求服务和订阅发布事件。是对于broker,Transort的一种封装方便使用。Init初始化客户端函数初始化连接池数量和连接池TTL调用注入的opt

概述

Client 主要是用来执行请求服务和订阅发布事件。是对于broker,Transort的一种封装方便使用。

Init

初始化客户端函数

  1. 初始化连接池数量和连接池TTL
  2. 调用注入的opts函数列表
  3. 最后初始化连接池

func (r *rpcClient) Init(opts ...Option) error {
size := r.opts.PoolSize
ttl := r.opts.PoolTTL
for _, o := range opts {
o(&r.opts)
}
// update pool configuration if the options changed
if size != r.opts.PoolSize || ttl != r.opts.PoolTTL {
r.pool.Lock()
r.pool.size = r.opts.PoolSize
r.pool.ttl = int64(r.opts.PoolTTL.Seconds())
r.pool.Unlock()
}
return nil
}

==Call==

Call是Client接口中最主要的方法,在之前Go Micro Selector 源码分析

  1. Client调用Call方法
  2. Call方法调用selector组件的Select方法,获取next函数
  3. call匿名函数中调用next函数(默认为CacheSelector 随机获取服务列表中的节点, Go Micro Selector 源码分析) 返回node
  4. 以grpcClient为例,调用grpcClient.call
  5. call函数中获取conn,然后Invoke调用服务端函数

func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
// 复制出options
callOpts := g.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
// 调用next函数 获取selector
next, err := g.next(req, callOpts)
if err != nil {
return err
}
// 检查context Deadline
d, ok := ctx.Deadline()
if !ok {
// 没有deadline 创建一个新的
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
} else {
// 获取到deadline设置context
opt := client.WithRequestTimeout(time.Until(d))
opt(&callOpts)
}
// should we noop right here?
select {
case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
default:
}
// 复制call函数 在下面的goroutine中使用
gcall := g.call
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {
gcall = callOpts.CallWrappers[i-1](gcall)
}
// return errors.New("go.micro.client", "request timeout", 408)
call := func(i int) error {
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, req, i)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// only sleep if greater than 0
if t.Seconds() > 0 {
time.Sleep(t)
}
// select next node
node, err := next()
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// 调用call 正式调用服务端接口
err = gcall(ctx, node, req, rsp, callOpts)
g.opts.Selector.Mark(req.Service(), node, err)
return err
}
ch := make(chan error, callOpts.Retries+1)
var gerr error
// 重试
for i := 0; i <= callOpts.Retries; i++ {
go func(i int) {
// 调动call 返回channel
ch <- call(i)
}(i)
select {
case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
case err := <-ch:
// if the call succeeded lets bail early
if err == nil {
return nil
}
retry, rerr := callOpts.Retry(ctx, req, i, err)
if rerr != nil {
return rerr
}
if !retry {
return err
}
gerr = err
}
}
return gerr
}

Stream

Stream跟call的逻辑几乎是一样的,不过stream调用的是rpc_client.stream函数。这边就不过多的分析了

func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
// make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
next, err := r.next(request, callOpts)
if err != nil {
return nil, err
}
// should we noop right here?
select {
case <-ctx.Done():
return nil, errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
default:
}
call := func(i int) (Stream, error) {
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
}
// only sleep if greater than 0
if t.Seconds() > 0 {
time.Sleep(t)
}
node, err := next()
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
}
stream, err := r.stream(ctx, node, request, callOpts)
r.opts.Selector.Mark(request.Service(), node, err)
return stream, err
}
type response struct {
stream Stream
err error
}
ch := make(chan response, callOpts.Retries+1)
var grr error
for i := 0; i <= callOpts.Retries; i++ {
go func(i int) {
s, err := call(i)
ch <- response{s, err}
}(i)
select {
case <-ctx.Done():
return nil, errors.Timeout("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()))
case rsp := <-ch:
// if the call succeeded lets bail early
if rsp.err == nil {
return rsp.stream, nil
}
retry, rerr := callOpts.Retry(ctx, request, i, rsp.err)
if rerr != nil {
return nil, rerr
}
if !retry {
return nil, rsp.err
}
grr = rsp.err
}
}
return nil, grr
}

Publish

Client中的Publish主要是调用broker中的publish:r.opts.Broker.Publish
然而在client的publish函数中,获取了topic准备了body 最后调用broker的publish

func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
options := PublishOptions{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
md, ok := metadata.FromContext(ctx)
if !ok {
md = make(map[string]string)
}
id := uuid.New().String()
md["Content-Type"] = msg.ContentType()
md["Micro-Topic"] = msg.Topic()
md["Micro-Id"] = id
// set the topic
topic := msg.Topic()
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
options.Exchange = prx
}
// get the exchange
if len(options.Exchange) > 0 {
topic = options.Exchange
}
// encode message body
cf, err := r.newCodec(msg.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{
Target: topic,
Type: codec.Event,
Header: map[string]string{
"Micro-Id": id,
"Micro-Topic": msg.Topic(),
},
}, msg.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
r.once.Do(func() {
r.opts.Broker.Connect()
})
return r.opts.Broker.Publish(topic, &broker.Message{
Header: md,
Body: b.Bytes(),
})
}

推荐阅读
  • MySQL日志分析在应急响应中的应用与优化策略
    在应急响应中,MySQL日志分析对于检测和应对数据库攻击具有重要意义。常见的攻击手段包括弱口令、SQL注入、权限提升和备份数据窃取。通过对MySQL日志的深入分析,不仅可以及时发现潜在的攻击行为,还能详细还原攻击过程并追踪攻击源头。此外,优化日志记录和分析策略,能够提高安全响应效率,增强系统的整体安全性。 ... [详细]
  • Go语言中Goroutine与通道机制及其异常处理深入解析
    在Go语言中,Goroutine可视为一种轻量级的并发执行单元,其资源消耗远低于传统线程,初始栈大小仅为2KB,而普通线程则通常需要几MB。此外,Goroutine的调度由Go运行时自动管理,能够高效地支持成千上万个并发任务。本文深入探讨了Goroutine的工作原理及其与通道(channel)的配合使用,特别是在异常处理方面的最佳实践,为开发者提供了一套完整的解决方案,以确保程序的稳定性和可靠性。 ... [详细]
  • Spring Boot 实战(一):基础的CRUD操作详解
    在《Spring Boot 实战(一)》中,详细介绍了基础的CRUD操作,涵盖创建、读取、更新和删除等核心功能,适合初学者快速掌握Spring Boot框架的应用开发技巧。 ... [详细]
  • 本文作为“实现简易版Spring系列”的第五篇,继前文深入探讨了Spring框架的核心技术之一——控制反转(IoC)之后,将重点转向另一个关键技术——面向切面编程(AOP)。对于使用Spring框架进行开发的开发者来说,AOP是一个不可或缺的概念。了解AOP的背景及其基本原理,对于掌握这一技术至关重要。本文将通过具体示例,详细解析AOP的实现机制,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 在Hive中合理配置Map和Reduce任务的数量对于优化不同场景下的性能至关重要。本文探讨了如何控制Hive任务中的Map数量,分析了当输入数据超过128MB时是否会自动拆分,以及Map数量是否越多越好的问题。通过实际案例和实验数据,本文提供了具体的配置建议,帮助用户在不同场景下实现最佳性能。 ... [详细]
  • Go语言实现Redis客户端与服务器的交互机制深入解析
    在前文对Godis v1.0版本的基础功能进行了详细介绍后,本文将重点探讨如何实现客户端与服务器之间的交互机制。通过具体代码实现,使客户端与服务器能够顺利通信,赋予项目实际运行的能力。本文将详细解析Go语言在实现这一过程中的关键技术和实现细节,帮助读者深入了解Redis客户端与服务器的交互原理。 ... [详细]
  • MySQL:不仅仅是数据库那么简单
    MySQL不仅是一款高效、可靠的数据库管理系统,它还具备丰富的功能和扩展性,支持多种存储引擎,适用于各种应用场景。从简单的网站开发到复杂的企业级应用,MySQL都能提供强大的数据管理和优化能力,满足不同用户的需求。其开源特性也促进了社区的活跃发展,为技术进步提供了持续动力。 ... [详细]
  • 本文将介绍一种扩展的ASP.NET MVC三层架构框架,并通过使用StructureMap实现依赖注入,以降低代码间的耦合度。该方法不仅能够提高代码的可维护性和可测试性,还能增强系统的灵活性和扩展性。通过具体实践案例,详细阐述了如何在实际开发中有效应用这一技术。 ... [详细]
  • 深入解析十大经典排序算法:动画演示、原理分析与代码实现
    本文深入探讨了十种经典的排序算法,不仅通过动画直观展示了每种算法的运行过程,还详细解析了其背后的原理与机制,并提供了相应的代码实现,帮助读者全面理解和掌握这些算法的核心要点。 ... [详细]
  • 探讨 `org.openide.windows.TopComponent.componentOpened()` 方法的应用及其代码实例分析 ... [详细]
  • 题目《UVa 11978 福岛核爆问题》涉及圆与多边形交集面积的计算及二分法的应用。该问题的核心在于通过精确的几何运算与高效的算法实现来解决复杂图形的面积计算。在实现过程中,特别需要注意的是对多边形顶点的平移处理,确保所有顶点包括最后一个顶点 \( p[n] \) 都经过正确的位移,以避免因细节疏忽导致的错误。此外,使用循环次数为50次的二分法能够有效提高算法的精度和稳定性。 ... [详细]
  • 在处理历史交易表时,发现存在部分重复交易记录,需进行数据清理。为解决此问题,考虑构建一个临时表,并采用SQL Server ODBC工具进行数据的导入与导出操作,以实现高效去重。此外,建议结合索引优化和批处理技术,进一步提升数据处理效率和系统性能。 ... [详细]
  • Enhance Directives with a Dedicated Flag for Internet Explorer 11 Compatibility ... [详细]
  • 本文深入探讨了 `ExpressionChangedAfterItHasBeenCheckedError` 错误的原因及其解决方案。通过分析 Angular 的变更检测机制,详细解释了该错误的发生条件,并提供了多种有效的应对策略,帮助开发者在实际开发中避免这一常见问题。 ... [详细]
  • Android平台生活辅助应用的设计与开发实现
    随着移动互联网技术的迅猛发展,Android操作系统已成为移动设备中的主流平台。本文探讨了基于Android平台的生活辅助应用设计与开发,旨在通过创新的功能和用户友好的界面,提升用户的日常生活质量。研究不仅涵盖了应用的核心功能实现,还深入分析了用户体验优化的方法,为同类应用的开发提供了有价值的参考。 ... [详细]
author-avatar
mobiledu2502884967
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有