为什么我的Go通道多次返回相同的元素

 黑糖老姜茶 发布于 2023-01-30 09:04

我有一个简单的应用程序,我正在阅读MongoDB的复制oplog,将结果序列化为Go结构并将其发送到要处理的通道.目前我正在从该频道阅读并简单地打印出结构内部的值.

我尝试使用for/range读取通道中的值,直接从中读取简单的值,并将其置于带超时的select中.结果都是一样的.每次运行代码时,我都会从频道获得不同的结果.我每次都会看到频道被写入一次但是从该频道读取有时我会读出相同的值1-3,有时甚至是4次,即使只有一次写入.

这通常仅在初始加载时发生(拉入较旧的记录),并且在读取对通道的实时添加时似乎不会发生.是否存在一些问题,即在第一次读取该项目之前,从该频道读取的速度过快发生?

package main

import (
    "fmt"
    "labix.org/v2/mgo"
    "labix.org/v2/mgo/bson"
)

type Operation struct {
    Id        int64  `bson:"h" json:"id"`
    Operator  string `bson:"op" json:"operator"`
    Namespace string `bson:"ns" json:"namespace"`
    Select    bson.M `bson:"o" json:"select"`
    Update    bson.M `bson:"o2" json:"update"`
    Timestamp int64  `bson:"ts" json:"timestamp"`
}

func Tail(collection *mgo.Collection, Out chan<- *Operation) {
    iter := collection.Find(nil).Tail(-1)
    var oper *Operation

    for {
        for iter.Next(&oper) {
            fmt.Println("\n<<", oper.Id)
            Out <- oper
        }

        if err := iter.Close(); err != nil {
            fmt.Println(err)
            return
        }
    }
}

func main() {
    session, err := mgo.Dial("127.0.0.1")

    if err != nil {
        panic(err)
    }
    defer session.Close()

    c := session.DB("local").C("oplog.rs")

    cOper := make(chan *Operation, 1)

    go Tail(c, cOper)

    for operation := range cOper {
        fmt.Println()
        fmt.Println("Id: ", operation.Id)
        fmt.Println("Operator: ", operation.Operator)
        fmt.Println("Namespace: ", operation.Namespace)
        fmt.Println("Select: ", operation.Select)
        fmt.Println("Update: ", operation.Update)
        fmt.Println("Timestamp: ", operation.Timestamp)
    }
}

Caleb.. 5

我认为你正在重复使用*Operation导致问题的因素.例如:

http://play.golang.org/p/_MeSBLWPwN

c := make(chan *int, 1)

go func() {
    val := new(int)
    for i :=0; i<10; i++ {
        *val = i
        c <- val
    }
    close(c)
}()


for val := range c {
    time.Sleep(time.Millisecond * 1)
    fmt.Println(*val)
}

此代码导致:

2
3
4
5
6
7
8
9
9
9

更重要的是,它不是线程安全的.尝试这样做可能:

for {
    for { 
        var oper *Operation
        if !iter.Next(&oper) {
            break
        }
        fmt.Println("\n<<", oper.Id)
        Out <- oper
    }
    ...
}

或者使用普通Operation而不是*Operation.(因为没有指针,值被复制)

1 个回答
  • 我认为你正在重复使用*Operation导致问题的因素.例如:

    http://play.golang.org/p/_MeSBLWPwN

    c := make(chan *int, 1)
    
    go func() {
        val := new(int)
        for i :=0; i<10; i++ {
            *val = i
            c <- val
        }
        close(c)
    }()
    
    
    for val := range c {
        time.Sleep(time.Millisecond * 1)
        fmt.Println(*val)
    }
    

    此代码导致:

    2
    3
    4
    5
    6
    7
    8
    9
    9
    9
    

    更重要的是,它不是线程安全的.尝试这样做可能:

    for {
        for { 
            var oper *Operation
            if !iter.Next(&oper) {
                break
            }
            fmt.Println("\n<<", oper.Id)
            Out <- oper
        }
        ...
    }
    

    或者使用普通Operation而不是*Operation.(因为没有指针,值被复制)

    2023-01-30 09:07 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有