我正在尝试建立一个系统,优雅地将数据库操作推迟到一个单独的线程,以避免在Twisted回调期间阻塞.
到目前为止,这是我的方法:
from contextlib import contextmanager from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session, sessionmaker from twisted.internet.threads import deferToThread _engine = create_engine(initialization_string) Session = scoped_session(sessionmaker(bind=_engine)) @contextmanager def transaction_context(): session = Session() try: yield session session.commit() except: # No need to do session.rollback(). session.remove will do it. raise finally: session.remove() def threaded(fn): @wraps(fn) def wrapper(*args, **kwargs): return deferToThread(fn, *args, **kwargs) return wrapper
这应该允许我用threaded
装饰器包装一个函数,然后transaction_context
在所述函数体中使用上下文管理器.以下是一个例子:
from __future__ import print_function from my_lib.orm import User, transaction_context, threaded from twisted.internet import reactor @threaded def get_n_users(n): with transaction_context() as session: return session.query(User).limit(n).all() if __name__ == '__main__': get_n_users(n).addBoth(len) reactor.run()
但是,当我运行上面的脚本时,我得到一个包含以下回溯的失败:
Unhandled error in Deferred: Unhandled Error Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 781, in __bootstrap self.__bootstrap_inner() File "/usr/lib/python2.7/threading.py", line 808, in __bootstrap_inner self.run() File "/usr/lib/python2.7/threading.py", line 761, in run self.__target(*self.__args, **self.__kwargs) ------ File "/usr/local/lib/python2.7/dist-packages/twisted/python/threadpool.py", line 191, in _worker result = context.call(ctx, function, *args, **kwargs) File "/usr/local/lib/python2.7/dist-packages/twisted/python/context.py", line 118, in callWithContext return self.currentContext().callWithContext(ctx, func, *args, **kw) File "/usr/local/lib/python2.7/dist-packages/twisted/python/context.py", line 81, in callWithContext return func(*args,**kw) File "testaccess.py", line 9, in get_n_users return session.query(User).limit(n).all() File "/usr/lib/python2.7/contextlib.py", line 24, in __exit__ self.gen.next() File "/home/louis/Documents/Python/knacki/knacki/db.py", line 36, in transaction_context session.remove() exceptions.AttributeError: 'Session' object has no attribute 'remove'
我根本没想到这一点.我错过了什么?我没有scoped_session
正确地实例化吗?
编辑: 这是一个关于将此设置与Twisted集成的相关问题.它可能有助于澄清我正在努力实现的目标.
简短的回答
打电话.remove()
的Session
,不是session
.
答案很长:
scoped_session
并没有真正回归一Session
堂课.相反,它创建了一个对象,该对象关注它所调用的线程.调用它将返回Session
与该线程关联的现有实例或关联新线程并返回该线程.一个线程本地就是一个线程与会话关联.
对象remove
上的方法scoped_session
删除当前与调用它的线程关联的会话对象.这意味着它是相反的scoped_session.__call__
,这是一种令人困惑的API.
这是一个简短的Python脚本来说明行为.
import threading from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session, sessionmaker _engine = create_engine('sqlite:///:memory:') Session = scoped_session(sessionmaker(_engine)) def scoped_session_demo(remove=False): ids = [] def push_ids(): thread_name = threading.currentThread().getName() data = [thread_name] data.append(Session()) if remove: Session.remove() data.append(Session()) ids.append(data) t = threading.Thread(target=push_ids) t.start() t.join() push_ids() sub_thread, main_thread = ids sub_name, sub_session_a, sub_session_b = sub_thread main_name, main_session_a, main_session_b = main_thread print sub_name, sub_session_a == sub_session_b print main_name, main_session_a == main_session_b print sub_name, '==', main_name, sub_session_a == main_session_b print 'Without remove:' scoped_session_demo() print 'With remove:' scoped_session_demo(True)
它的输出:
Without remove: Thread-1 True MainThread True Thread-1 == MainThread False With remove: Thread-2 False MainThread False Thread-2 == MainThread False