我遇到了一个模式和一个导致死锁问题的upsert存储过程.我有一个大概的想法,为什么这会导致死锁以及如何解决它.我可以重现它,但我没有清楚地了解导致它的步骤顺序.如果有人能够清楚地解释为什么会造成僵局,那将会很棒.
这是架构和存储过程.此代码正在PostgreSQL 9.2.2上执行.
CREATE TABLE counters ( count_type INTEGER NOT NULL, count_id INTEGER NOT NULL, count INTEGER NOT NULL ); CREATE TABLE primary_relation ( id INTEGER PRIMARY KEY, a_counter INTEGER NOT NULL DEFAULT 0 ); INSERT INTO primary_relation SELECT i FROM generate_series(1,5) AS i; CREATE OR REPLACE FUNCTION increment_count(ctype integer, cid integer, i integer) RETURNS VOID AS $$ BEGIN LOOP UPDATE counters SET count = count + i WHERE count_type = ctype AND count_id = cid; IF FOUND THEN RETURN; END IF; BEGIN INSERT INTO counters (count_type, count_id, count) VALUES (ctype, cid, i); RETURN; EXCEPTION WHEN OTHERS THEN END; END LOOP; END; $$ LANGUAGE PLPGSQL; CREATE OR REPLACE FUNCTION update_primary_a_count(ctype integer) RETURNS VOID AS $$ WITH deleted_counts_cte AS ( DELETE FROM counters WHERE count_type = ctype RETURNING * ), rollup_cte AS ( SELECT count_id, SUM(count) AS count FROM deleted_counts_cte GROUP BY count_id HAVING SUM(count) <> 0 ) UPDATE primary_relation SET a_counter = a_counter + rollup_cte.count FROM rollup_cte WHERE primary_relation.id = rollup_cte.count_id $$ LANGUAGE SQL;
这是一个重现死锁的python脚本.
import os import random import time import psycopg2 COUNTERS = 5 THREADS = 10 ITERATIONS = 500 def increment(): outf = open('synctest.out.%d' % os.getpid(), 'w') conn = psycopg2.connect(database="test") cur = conn.cursor() for i in range(0,ITERATIONS): time.sleep(random.random()) start = time.time() cur.execute("SELECT increment_count(0, %s, 1)", [random.randint(1,COUNTERS)]) conn.commit() outf.write("%f\n" % (time.time() - start)) conn.close() outf.close() def update(n): outf = open('synctest.update', 'w') conn = psycopg2.connect(database="test") cur = conn.cursor() for i in range(0,n): time.sleep(random.random()) start = time.time() cur.execute("SELECT update_primary_a_count(0)") conn.commit() outf.write("%f\n" % (time.time() - start)) conn.close() pids = [] for i in range(THREADS): pid = os.fork() if pid != 0: print 'Process %d spawned' % pid pids.append(pid) else: print 'Starting child %d' % os.getpid() increment() print 'Exiting child %d' % os.getpid() os._exit(0) update(ITERATIONS) for pid in pids: print "waiting on %d" % pid os.waitpid(pid, 0) # cleanup update(1)
我认识到这个问题的一个问题是upsert会产生重复的行(有多个写入器),这可能会导致一些重复计数.但为什么这会导致僵局?
我从PostgreSQL得到的错误如下:
process 91924 detected deadlock while waiting for ShareLock on transaction 4683083 after 100.559 ms",,,,,"SQL statement ""UPDATE counters
并且客户吐出这样的东西:
psycopg2.extensions.TransactionRollbackError: deadlock detected DETAIL: Process 91924 waits for ShareLock on transaction 4683083; blocked by process 91933. Process 91933 waits for ShareLock on transaction 4683079; blocked by process 91924. HINT: See server log for query details.CONTEXT: SQL statement "UPDATE counters SET count = count + i WHERE count_type = ctype AND count_id = cid" PL/pgSQL function increment_count(integer,integer,integer) line 4 at SQL statement
要解决此问题,您需要添加一个主键,如下所示:
ALTER TABLE counters ADD PRIMARY KEY (count_type, count_id);
任何见解将不胜感激.谢谢!