不积跬步,无以至千里;不积小流,无以成江海。

Dean's blog

  • Join Us on Facebook!
  • Follow Us on Twitter!
  • LinkedIn
  • Subcribe to Our RSS Feed

基于 Redis 实现分布式锁

日常开发中,经常需要使用到分布式锁,避免冲突的发生。在redis中提供了一个Setnx命令,可以实现类似的效果。使用Python实现类似效果:

from redis import StrictRedis, ConnectionPool
class Lock:
    def __init__(self, key):
        self.key = key
        pool = ConnectionPool(host='localhost')
        self.redis = StrictRedis(connection_pool=pool)

    def try_enter(self):
        """尝试进入,进入成功则返回True"""
        return self.redis.setnx(self.key, 1)

    def try_exit(self):
        """尝试退出,退出成功则返回True"""
        self.redis.delete(self.key)

    def __enter__(self):
        """获取失败时,会抛出LockFailureException异常"""
        isGet = self.try_enter()
        if(not isGet): raise LockFailureException()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        '''退出时调用'''
        self.try_exit()

这里使用到Python的上下文管理器协议,这个协议由两个方法实现:

__enter___

进入上下文,如果不能进入时应抛出异常

__exit__

退出上下文或执行上下文有异常时执行的方法

这样,就可以使用with语句来引用上面的类:

with(Lock('lockKey')):
     #todo .....

一个非常简单实用的分布式锁,但是这里有一个小小的缺陷,假设有一个这样的需求:

爬取某宝的一个类目所有的商品详情和评论

通过分析基本可以将爬取过程规划为三个大过程:

爬取列表

爬取商品详情

爬取商品评论

对每个过程的工作量预估可能为:

爬取列表:100页,最多为100个请求,所需时间预计为几分钟以内

爬取商品详情:列表100页 * 50个/页,最多为5000个请求,所需时间预计为几分钟至几小时

爬取商品评论:5000个商品 * 10页评论(预估值),约为50000个请求,所需时间预计为几小时至几天

可以发现每个过程所消耗的时间是不一样的。如果希望可以快速抓取,可以使用分布式爬虫,启动多个实例并发抓取。但如果不做控制,每个实例都只会过程顺序执行,这样就需要等几分钟或几小时才可以看到商品详情或评论数据了。为了实现快速反馈数据,可以分配每个过程的实例数量数量,例如:

爬取列表:1个实例

爬取商品详情:3个实例

爬取商品评论:N个实例

如果要实现上面的分布控制,那么上面的Lock就无能为力了,这个是由于Setnx命令的限制导致的。我们可以对其进行优化:

from redis import StrictRedis, ConnectionPool, WatchError

class Lock:
    '''
    基于Raids实现分布式锁
    使用方式:
        with(Lock('锁key', 2)):
            获取锁后的代码
    '''
    def __init__(self, key, max = 1):
        '''
        初始化
            key 锁对应的key
            max 最多分配的数量,默认为1
        '''
        self.key = "Locks_" + key
        self.max = max
        pool = ConnectionPool(host='localhost')
        self.redis = StrictRedis(connection_pool=pool)

    def try_enter(self):
        """尝试进入,进入成功则返回True"""
        return self.try_add(self.key, 1, self.max)

    def try_exit(self):
        """尝试退出,退出成功则返回True"""
        self.try_add(self.key, -1)

    def __enter__(self):
        """获取失败时,会抛出LockFailureException异常"""
        isGet = self.try_enter()
        if(not isGet): raise LockFailureException()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.try_exit()


    #在原增加值的基础上加上number,最大值为max
    def try_add(self, key, number, maxValue = sys.maxsize):
        with self.redis.pipeline() as pipe:
            try:
                pipe.watch(key)
                val = pipe.get(key)
                pipe.multi()
                val = 0 if val == None else int(val)
                if(val >= maxValue): return False
                val = val + number
                pipe.set(key, val)

                #如果为0则删除之
                if(val == 0): pipe.delete(key)

                pipe.execute()
                return True
            except WatchError:
                pipe.unwatch()

这里改由使用pipeline实现计数,例如实现上面的过程实例分布,可以这样子:

with(Lock('lockList')):
      #抓取商品列表

with(Lock('lockProduct', 3)):
      #抓取商品详情

#抓取商品评论

还是保留了原来的优雅实现,但应用的场景更多了。

 

不允许评论
粤ICP备17049187号-1