GAE/Pでスレッドの排他制御(synchronized)をする

Javaのsynchronized修飾子みたいなのを、デコレータで出来るようにした。
Memcacheを使ってる。
デコレートしたメソッドごとに排他制御出来るようにした。
synchronizedの引数でオプション指定が出来る。
オプションは、key / key_prefix / sleep_time / retry_count。
詳しくはソース参照。
 

memcache_lock.py

# set encoding=utf8

from google.appengine.api import memcache
import time
from types import *


UNLOCKED = 0
LOCKED = 1

KEY_PREFIX = 'MemcacheLock.'
SLEEP_TIME = 0.1
RETRY_COUNT = 5


class MemcacheLock:
    def __init__(self, key, key_prefix=KEY_PREFIX, sleep_time=SLEEP_TIME, retry_count=RETRY_COUNT):
        self.key = key_prefix + key
        self.sleep_time = sleep_time
        self.retry_count = retry_count
        self.is_locked = False
        self.lock_value = None

    def lock(self):
        if self.is_locked:
            return True

        self.lock_value = memcache.incr(self.key, initial_value=0)
        if self.lock_value==LOCKED:
            self.is_locked = True
            return True

        memcache.decr(self.key)
        return False

    def unlock(self):
        if self.is_locked:
            memcache.decr(self.key)
            self.is_locked = False
            return True
        return False

    def runSynchronized(self, func, *args, **kwargs):
        for i in xrange(self.retry_count):
            if self.lock():
                try:
                    return func(*args, **kwargs)
                finally:
                    self.unlock()
            if i+1 != self.retry_count:
                time.sleep(self.sleep_time)
        raise Exception('can not lock: %s = %d' % (self.key, self.lock_value))


# decorator

def synchronized(**options):
    def deco(func):
        if type(func)!=FunctionType:
            raise Exception('func is not function')

        # key
        options['key'] = str(options.get('key', ''))
        if len(options['key'])==0:
            options['key'] = str(func.func_code.__hash__())

        def wrap(*args, **kwargs):
            ml = MemcacheLock(**options)
            ml.runSynchronized(func, *args, **kwargs)
        return wrap
    return deco

 

使い方

# set encoding=utf8

#from google.appengine.ext import webapp
import webapp2 as webapp
from google.appengine.api import memcache
from memcache_lock import synchronized
import time

class SingleHandler(webapp.RequestHandler):
    def get(self):
        @synchronized()
        def runner(sleep_time=1.0):
            key = 'single'
            self.response.out.write('%d<br />\n' % memcache.incr(key, initial_value=0))
            time.sleep(sleep_time)
            self.response.out.write('%d<br />\n' % memcache.incr(key, initial_value=0))
            time.sleep(sleep_time)
            self.response.out.write('%d<br />\n' % memcache.incr(key, initial_value=0))
            time.sleep(sleep_time)
            self.response.out.write('%d<br />\n' % memcache.incr(key, initial_value=0))
            time.sleep(sleep_time)
            self.response.out.write('%d<br />\n' % memcache.incr(key, initial_value=0))

        try:
            runner(sleep_time=0.2)
        except Exception, e:
            self.response.out.write('exception: %s' % (str(e)))

    def post(self):
        self.get()

 

参考URL

appengineでMemcacheを使ったLockを実装した - あおうさ@日記
http://d.hatena.ne.jp/bluerabbit/20091008/1255008730