我有一个简单的应用程序,我正在阅读MongoDB的复制oplog,将结果序列化为Go结构并将其发送到要处理的通道.目前我正在从该频道阅读并简单地打印出结构内部的值.
我尝试使用for/range读取通道中的值,直接从中读取简单的值,并将其置于带超时的select中.结果都是一样的.每次运行代码时,我都会从频道获得不同的结果.我每次都会看到频道被写入一次但是从该频道读取有时我会读出相同的值1-3,有时甚至是4次,即使只有一次写入.
这通常仅在初始加载时发生(拉入较旧的记录),并且在读取对通道的实时添加时似乎不会发生.是否存在一些问题,即在第一次读取该项目之前,从该频道读取的速度过快发生?
package main import ( "fmt" "labix.org/v2/mgo" "labix.org/v2/mgo/bson" ) type Operation struct { Id int64 `bson:"h" json:"id"` Operator string `bson:"op" json:"operator"` Namespace string `bson:"ns" json:"namespace"` Select bson.M `bson:"o" json:"select"` Update bson.M `bson:"o2" json:"update"` Timestamp int64 `bson:"ts" json:"timestamp"` } func Tail(collection *mgo.Collection, Out chan<- *Operation) { iter := collection.Find(nil).Tail(-1) var oper *Operation for { for iter.Next(&oper) { fmt.Println("\n<<", oper.Id) Out <- oper } if err := iter.Close(); err != nil { fmt.Println(err) return } } } func main() { session, err := mgo.Dial("127.0.0.1") if err != nil { panic(err) } defer session.Close() c := session.DB("local").C("oplog.rs") cOper := make(chan *Operation, 1) go Tail(c, cOper) for operation := range cOper { fmt.Println() fmt.Println("Id: ", operation.Id) fmt.Println("Operator: ", operation.Operator) fmt.Println("Namespace: ", operation.Namespace) fmt.Println("Select: ", operation.Select) fmt.Println("Update: ", operation.Update) fmt.Println("Timestamp: ", operation.Timestamp) } }
Caleb.. 5
我认为你正在重复使用*Operation
导致问题的因素.例如:
http://play.golang.org/p/_MeSBLWPwN
c := make(chan *int, 1) go func() { val := new(int) for i :=0; i<10; i++ { *val = i c <- val } close(c) }() for val := range c { time.Sleep(time.Millisecond * 1) fmt.Println(*val) }
此代码导致:
2 3 4 5 6 7 8 9 9 9
更重要的是,它不是线程安全的.尝试这样做可能:
for { for { var oper *Operation if !iter.Next(&oper) { break } fmt.Println("\n<<", oper.Id) Out <- oper } ... }
或者使用普通Operation
而不是*Operation
.(因为没有指针,值被复制)
我认为你正在重复使用*Operation
导致问题的因素.例如:
http://play.golang.org/p/_MeSBLWPwN
c := make(chan *int, 1) go func() { val := new(int) for i :=0; i<10; i++ { *val = i c <- val } close(c) }() for val := range c { time.Sleep(time.Millisecond * 1) fmt.Println(*val) }
此代码导致:
2 3 4 5 6 7 8 9 9 9
更重要的是,它不是线程安全的.尝试这样做可能:
for { for { var oper *Operation if !iter.Next(&oper) { break } fmt.Println("\n<<", oper.Id) Out <- oper } ... }
或者使用普通Operation
而不是*Operation
.(因为没有指针,值被复制)