[python]子プロセス中で行うMySQLへのクエリをabortしたい時はpymysqlを使えば良いという話

最近prefork型デーモンとしてクローラを実装していて、子プロセスで発行中のクエリをabortしたいシーンが割とあるのですが、標準のMySQLdbには中断する手段が準備されていません。コードまで終えなかったのですが、この辺の情報によるとMySQLへのクエリを発行する箇所がCで書かれている為にVMがクエリ終了までブロックしてしまうようです。それは困るので、あれこれ試行錯誤を繰り返したんですが、結論から言えばpymysqlという別のMySQLドライバを使うと発行中のクエリをabortできました。

想定しているのは以下のコードになります。子プロセスを3つforkし、runを実行させています。runの中ではSELECT SLEEP(30)というSQLを実行しています。親プロセスにシグナルが送られた際に子プロセスに対しても同様のシグナルを送信しているので、期待する挙動は親プロセスを殺した際に子プロセスも同時に死ぬというものですが、実際はそうはなりません。

#!/usr/bin/python
# -*- coding: utf-8 -*-

import MySQLdb, pymysql
import signal, os, sys

workers = {}

def run():
  # 後で下の2行は入れ替える
  con = MySQLdb.connect(host='localhost', db='test', user='testuser', passwd='password')
  #con = pymysql.connect(host='localhost', db='test', user='testuser', passwd='password')
  signal.signal(signal.SIGTERM, lambda sig, status: sys.exit(0))
  cur = con.cursor()
  cur.execute("SELECT SLEEP(30)")

def killall(sig, status):
  for pid in workers.keys():
    os.kill(pid, signal.SIGTERM)

def waitall():
  for pid in workers.keys():
    try:
      os.waitpid(pid, 0)
    except:
      print "waitpid: interrupted exception"

def main():
  print os.getpid()
  signal.signal(signal.SIGTERM, killall)
  for i in range(3):
    pid = os.fork()
    if pid == 0:
      try:
        run()
      except:
        print "run: interrupted exception" # ここで中断処理
        sys.exit(0)
    else:
      workers[pid] = 1
  waitall()

if __name__ == '__main__':
  main()

実際にこのプログラムを実行して、親プロセスを殺して見たのが以下です。親プロセスを殺しても子プロセスが残っています。子プロセスに対してSIGKILLを送るとクエリ発行中であっても殺せるのですが、流石に普段運用でSIGKILLを使うのは抵抗があります。(上記プログラムではSIGTERMを使用)

$ ./mysqldb_test.py &
[1] 87020
$ kill 87020
waitpid: interrupted exception
$ ps a | grep mysqldb_test.py
87020 s003  SN     0:00.24 /System/Library/Frameworks/Python.framework/Versions/2.5/Resources/Python.app/Contents/MacOS/Python ./mysqldb_test.py
87022 s003  SN     0:00.08 /System/Library/Frameworks/Python.framework/Versions/2.5/Resources/Python.app/Contents/MacOS/Python ./mysqldb_test.py
87023 s003  SN     0:00.08 /System/Library/Frameworks/Python.framework/Versions/2.5/Resources/Python.app/Contents/MacOS/Python ./mysqldb_test.py
87024 s003  SN     0:00.08 /System/Library/Frameworks/Python.framework/Versions/2.5/Resources/Python.app/Contents/MacOS/Python ./mysqldb_test.py

そこで今度は先ほどのプログラムのrun内にあったコメントを外して、pymysqlを有効化してMySQLdbの行と入れ替えて見ます。pymysqlはpure pythonで記述されたMySQLのpythonバインディングで、使い方も同じです。ソースコードまで追ってはいませんが、中断処理を隠蔽せずにinterrupt exceptionを発火してくれるようです。pymysqlを使用して再度、先ほどのオペレーションを行ってみたのが以下です。

$ ./mysqldb_test.py &
[1] 87052
$ kill 87052
$ run: interrupted exception
waitpid: interrupted exception
run: interrupted exception
run: interrupted exception
[1]  + done       ./mysqldb_test.py
$ ps a | grep mysqldb_test.py
$ 

子プロセスもきちんと死んでいますね。また出力を見ると解る通り、interrupt exceptionをキャッチすれば、例外処理も普通に行うことが出来ます。恐らくパフォーマンスロスはあると思いますが、今回の例ではpymysqlを使うのが賢そうですね。

3 thoughts on “[python]子プロセス中で行うMySQLへのクエリをabortしたい時はpymysqlを使えば良いという話”

  1. MySQLdbでも以下のような面倒な手順を踏めばうまく子プロセスが死んでくれそうです。
    以下の例では、子プロセスを殺す前に、子プロセスが実行しているMySQLのクエリの実行を殺しています。
    こうすることで、子プロセスがブロックされる原因自体がなくなるため、問題なく子プロセスが殺せます。

    というか、killやctrl-cで、OSのプロセスだけでなく、MySQL内で実行中のクエリも止めたいと思いググっていたところこのWebサイトに辿り着きました。
    とても参考になりました。

    —————————————————-
    #!/usr/bin/python
    # coding: UTF-8

    import MySQLdb
    import signal, os, sys

    workers = {}
    END_SIGNAL = “\n”

    def connect():
    con = MySQLdb.connect(host=’localhost’, db=’test’, user=’testuser’, passwd=’password’)
    return con

    def get_mysql_pid(con):
    cur = con.cursor()
    cur.execute(‘SELECT CONNECTION_ID()’) #自分自身のmysqlセッションを取得
    return cur.fetchall()[0][0]

    def kill_mysql_pid(con,mysql_pid):
    cur = con.cursor()
    cur.execute(‘KILL ‘ + mysql_pid) #mysqlセッションをKILL

    def run(con, query):
    signal.signal(signal.SIGTERM, lambda sig, status: sys.exit(0))
    cur = con.cursor()
    cur.execute(query)

    def killall(sig, status):
    con = connect()
    for pid in workers.keys():
    kill_mysql_pid(con,workers[pid]) #子プロセスのmysqlセッションをKILLする
    os.kill(pid, signal.SIGTERM)

    def waitall():
    for pid in workers.keys():
    try:
    os.waitpid(pid, 0)
    except:
    print “waitpid: interrupted exception”

    def main():
    signal.signal(signal.SIGINT, killall)
    signal.signal(signal.SIGTERM, killall)
    r_num, w_num = os.pipe()
    pid = os.fork()
    if pid == 0:
    os.close(r_num)
    wpipe = os.fdopen(w_num, ‘w’)
    con = connect()
    mysql_pid = “%d\n” % get_mysql_pid(con) #親プロセスへ子プロセスのmysqlセッションidを送る
    wpipe.write(mysql_pid)
    wpipe.flush()
    try:
    run(con, “SELECT SLEEP(30)”)
    except:
    print “run: interrupted exception”
    sys.exit(0)
    else:
    os.close(w_num)
    rpipe = os.fdopen(r_num, ‘r’)
    mysql_pid = rpipe.readline()
    mysql_pid = mysql_pid[:-1]
    workers[pid] = mysql_pid #子プロセスのmysqlセッションidを保存しておく
    print workers[pid]
    waitall()

    if __name__ == ‘__main__’:
    main()
    —————————————————-

  2. インデントが崩れたので代わりにgistにコードを上げておきました。
    https://gist.github.com/2902213
    あとpipeが面倒だったので、子プロセスは1個しか作らないようにしてます。
    適当ですみません。

Leave a Reply

Your email address will not be published. Required fields are marked *