我有多个 Python 脚本通过 PyMongo 向 MongoDB 写入数据。另一个 Python 脚本该如何监听某个 Mongo 查询结果的变化,并在发生变化时执行某些函数?MongoDB 已配置为启用 oplog。
前一段时间我用 Python 编写了一个 MongoDB 增量备份工具。该工具通过持续跟踪(tail)oplog 来监控数据变化。以下是代码的相关部分。
更新的答案,pymongo 3
from time import sleep

from pymongo import MongoClient, ASCENDING
from pymongo.cursor import CursorType
from pymongo.errors import AutoReconnect

# Pause (in seconds) used both between polls of a live cursor and
# before retrying after a lost connection.
_SLEEP = 1.0

if __name__ == '__main__':
    oplog = MongoClient().local.oplog.rs

    # Seed the resume point with the 'ts' of the first entry returned when
    # the oplog is read in ascending $natural order.
    first_entry = next(oplog.find().sort('$natural', ASCENDING).limit(-1))
    last_ts = first_entry['ts']

    while True:
        # Open a fresh tailable cursor that only yields entries newer than
        # the last one handled, so a restart continues where it left off.
        cursor = oplog.find(
            filter={'ts': {'$gt': last_ts}},
            cursor_type=CursorType.TAILABLE_AWAIT,
            oplog_replay=True,
        )
        try:
            while cursor.alive:
                for entry in cursor:
                    last_ts = entry['ts']
                    print(entry)  # Do something with the entry.
                # The for-loop ended without the cursor dying: wait a bit
                # before asking it for more entries.
                sleep(_SLEEP)
        except AutoReconnect:
            # Connection dropped; wait, then rebuild the cursor from last_ts.
            sleep(_SLEEP)
另请参阅http://api.mongodb.com/python/current/examples/tailable.html.
原始答案,pymongo 2
from time import sleep

from pymongo import MongoClient
from pymongo.cursor import _QUERY_OPTIONS
from pymongo.errors import AutoReconnect
from bson.timestamp import Timestamp

# Tailable cursor options.
_TAIL_OPTS = {'tailable': True, 'await_data': True}
# Time (in seconds) to wait for data or connection.
_SLEEP = 10

if __name__ == '__main__':
    db = MongoClient().local

    # Resume point for the tail. `some_timestamp` is a placeholder carried
    # over from the original snippet: replace it with your own start time.
    stamp = Timestamp(some_timestamp, 0)

    while True:
        # Build the query from the resume point rather than from the fixed
        # start time, so that a re-created cursor does not re-read entries
        # that were already processed.
        query = {'ts': {'$gt': stamp}}  # Replace with your query.
        cursor = db.oplog.rs.find(query, **_TAIL_OPTS)
        cursor.add_option(_QUERY_OPTIONS['oplog_replay'])
        try:
            while cursor.alive:
                try:
                    doc = next(cursor)
                    # Advance the resume point past this entry.
                    stamp = doc['ts']
                    # Do something with doc.
                except (AutoReconnect, StopIteration):
                    # No data available yet, or the connection dropped:
                    # wait before polling the cursor again.
                    sleep(_SLEEP)
        finally:
            # Always release the cursor before the outer loop opens a new one.
            cursor.close()