如何判断对sys.stdin.readline()(或者更常见的是基于任何基于文件描述符的文件对象的readline())的调用是否会阻塞?
当我在python中编写基于行的文本过滤程序时,会出现这种情况.也就是说,程序重复从输入中读取一行文本,可能会对其进行转换,然后将其写入输出.
我想实现一个合理的输出缓冲策略.我的标准是:
在批量处理数百万行时应该是有效的 - 主要是缓冲输出,偶尔刷新.
在保持缓冲输出时,它永远不应阻塞输入.
因此,无缓冲输出是不好的,因为它违反了(1)(对操作系统的写入太多).并且线路缓冲输出并不好,因为它仍然违反(1)(在批量的每一百万行上将输出刷新到OS是没有意义的).并且默认缓冲输出不好,因为它违反了(2)(如果输出到文件或管道,它将不适当地保留输出).
在大多数情况下,我认为一个好的解决方案是:"刷新sys.stdout(其缓冲区已满或)sys.stdin.readline()即将阻塞".可以实施吗?
(注意,我并不认为这种策略对于所有情况都是完美的.例如,在程序受cpu约束的情况下,它可能不理想;在这种情况下,更频繁地冲洗可能是明智的,以避免在保留输出的同时进行长时间的计算.)
对于确定性,假设我在python中实现了unix的"cat -n"程序.
(实际上"cat -n"比一次一行更聪明;也就是说,它知道如何在读取整行之前读取和写入部分行;但是,对于这个例子,我要去无论如何,一次一行地实施它.)
行缓冲实现
(表现良好,但违反了标准(1),即因为冲洗太多而无法缓慢):
#!/usr/bin/python
# cat-n.linebuffered.py
import sys
num_lines_read = 0
while True:
line = sys.stdin.readline()
if line == '': break
num_lines_read += 1
print("%d: %s" % (num_lines_read, line))
sys.stdout.flush()
默认缓冲实现
(快速但违反标准(2),即不友好的输出预扣)
#!/usr/bin/python
# cat-n.defaultbuffered.py
import sys
num_lines_read = 0
while True:
line = sys.stdin.readline()
if line == '': break
num_lines_read += 1
print("%d: %s" % (num_lines_read, line))
期望的实施:
#!/usr/bin/python
num_lines_read = 0
while True:
if sys_stdin_readline_is_about_to_block(): # <--- How do I implement this??
sys.stdout.flush()
line = sys.stdin.readline()
if line == '': break
num_lines_read += 1
print("%d: %s" % (num_lines_read, line))
所以问题是:是否可以实施sys_stdin_readline_is_about_to_block()
?
我想要一个适用于python2和python3的答案.我已经研究了以下每种技术,但到目前为止还没有任何进展.
用于select([sys.stdin],[],[],0)
确定从sys.stdin读取是否会阻止.(当sys.stdin是一个缓冲的文件对象时,这不起作用,至少有一个原因可能有两个原因:(1)如果部分行准备从底层输入管道读取,它将错误地说"不会阻塞", (2)如果sys.stdin的缓冲区包含一个完整的输入行,但是底层管道还没有准备好进行额外的读取,它会错误地说"将阻塞"...我认为).
非阻塞io,使用os.fdopen(sys.stdin.fileno(), 'r')
和fcntl
with O_NONBLOCK
(我无法在任何python版本中使用readline():在python2.7中,只要部分行进入就会丢失输入;在python3中,似乎无法区分在"阻止"和输入结束之间.??)
asyncio(我不清楚python2中有什么可用;我不认为它适用于sys.stdin;但是,我仍然对只有从子进程返回的管道读取时才有效的答案感兴趣.Popen()).
创建一个线程来执行readline()
循环并通过queue.Queue将每一行传递给主程序.然后主程序可以在从队列中读取每一行之前轮询队列,每当它看到它即将阻塞时,首先刷新stdout.(我试过这个,实际上让它工作,见下文,但它非常慢,比线缓冲慢得多.)
螺纹实施:
请注意,这并没有严格回答"如何判断sys.stdin.readline()是否会阻止"的问题,但它仍设法实现所需的缓冲策略.但这太慢了.
#!/usr/bin/python
# cat-n.threaded.py
import queue
import sys
import threading
def iter_with_abouttoblock_cb(callable, sentinel, abouttoblock_cb, qsize=100):
# child will send each item through q to parent.
q = queue.Queue(qsize)
def child_fun():
for item in iter(callable, sentinel):
q.put(item)
q.put(sentinel)
child = threading.Thread(target=child_fun)
# The child thread normally runs until it sees the sentinel,
# but we mark it daemon so that it won't prevent the parent
# from exiting prematurely if it wants.
child.daemon = True
child.start()
while True:
try:
item = q.get(block=False)
except queue.Empty:
# q is empty; call abouttoblock_cb before blocking
abouttoblock_cb()
item = q.get(block=True)
if item == sentinel:
break # do *not* yield sentinel
yield item
child.join()
num_lines_read = 0
for line in iter_with_abouttoblock_cb(sys.stdin.readline,
sentinel='',
abouttoblock_cb=sys.stdout.flush):
num_lines_read += 1
sys.stdout.write("%d: %s" % (num_lines_read, line))
验证缓冲行为:
以下命令(在linux上的bash中)显示了预期的缓冲行为:"defaultbuffered"缓冲区过于激进,而"linebuffered"和"threaded"缓冲区恰到好处.
(注意,| cat
管道的末尾是默认情况下使用python块缓冲区而不是行缓冲区.)
for which in defaultbuffered linebuffered threaded; do
for python in python2.7 python3.5; do
echo "$python cat-n.$which.py:"
(echo z; echo -n a; sleep 1; echo b; sleep 1; echo -n c; sleep 1; echo d; echo x; echo y; echo z; sleep 1; echo -n e; sleep 1; echo f) | $python cat-n.$which.py | cat
done
done
输出:
python2.7 cat-n.defaultbuffered.py:
[... pauses 5 seconds here. Bad! ...]
1: z
2: ab
3: cd
4: x
5: y
6: z
7: ef
python3.5 cat-n.defaultbuffered.py:
[same]
python2.7 cat-n.linebuffered.py:
1: z
[... pauses 1 second here, as expected ...]
2: ab
[... pauses 2 seconds here, as expected ...]
3: cd
4: x
5: y
6: z
[... pauses 2 seconds here, as expected ...]
6: ef
python3.5 cat-n.linebuffered.py:
[same]
python2.7 cat-n.threaded.py:
[same]
python3.5 cat-n.threaded.py:
[same]
时序:
(在linux上的bash中):
for which in defaultbuffered linebuffered threaded; do
for python in python2.7 python3.5; do
echo -n "$python cat-n.$which.py: "
timings=$(time (yes 01234567890123456789012345678901234567890123456789012345678901234567890123456789 | head -1000000 | $python cat-n.$which.py >| /tmp/REMOVE_ME) 2>&1)
echo $timings
done
done
/bin/rm /tmp/REMOVE_ME
输出:
python2.7 cat-n.defaultbuffered.py: real 0m1.490s user 0m1.191s sys 0m0.386s
python3.5 cat-n.defaultbuffered.py: real 0m1.633s user 0m1.007s sys 0m0.311s
python2.7 cat-n.linebuffered.py: real 0m5.248s user 0m2.198s sys 0m2.704s
python3.5 cat-n.linebuffered.py: real 0m6.462s user 0m3.038s sys 0m3.224s
python2.7 cat-n.threaded.py: real 0m25.097s user 0m18.392s sys 0m16.483s
python3.5 cat-n.threaded.py: real 0m12.655s user 0m11.722s sys 0m1.540s
重申一下,我想要一个永远不会阻塞的解决方案,同时保持缓冲输出("linebuffered"和"thread"在这方面都很好),而且速度也很快:即速度与"defaultbuffered"相当.