日常开发中,经常需要使用到分布式锁,避免冲突的发生。在redis中提供了一个Setnx命令,可以实现类似的效果。使用Python实现类似效果:
from redis import StrictRedis, ConnectionPool
class Lock:
def __init__(self, key):
self.key = key
pool = ConnectionPool(host='localhost')
self.redis = StrictRedis(connection_pool=pool)
def try_enter(self):
"""尝试进入,进入成功则返回True"""
return self.redis.setnx(self.key, 1)
def try_exit(self):
"""尝试退出,退出成功则返回True"""
self.redis.delete(self.key)
def __enter__(self):
"""获取失败时,会抛出LockFailureException异常"""
isGet = self.try_enter()
if(not isGet): raise LockFailureException()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
'''退出时调用'''
self.try_exit()
这里使用到Python的上下文管理器协议,这个协议由两个方法实现:
__enter___
进入上下文,如果不能进入时应抛出异常
__exit__
退出上下文或执行上下文有异常时执行的方法
这样,就可以使用with语句来引用上面的类:
with(Lock('lockKey')):
#todo .....
一个非常简单实用的分布式锁,但是这里有一个小小的缺陷,假设有一个这样的需求:
爬取某宝的一个类目所有的商品详情和评论
通过分析基本可以将爬取过程规划为三个大过程:
爬取列表
爬取商品详情
爬取商品评论
对每个过程的工作量预估可能为:
爬取列表:100页,最多为100个请求,所需时间预计为几分钟以内
爬取商品详情:列表100页 * 50个/页,最多为5000个请求,所需时间预计为几分钟至几小时
爬取商品评论:5000个商品 * 10页评论(预估值),约为50000个请求,所需时间预计为几小时至几天
可以发现每个过程所消耗的时间是不一样的。如果希望可以快速抓取,可以使用分布式爬虫,启动多个实例并发抓取。但如果不做控制,每个实例都只会过程顺序执行,这样就需要等几分钟或几小时才可以看到商品详情或评论数据了。为了实现快速反馈数据,可以分配每个过程的实例数量数量,例如:
爬取列表:1个实例
爬取商品详情:3个实例
爬取商品评论:N个实例
如果要实现上面的分布控制,那么上面的Lock就无能为力了,这个是由于Setnx命令的限制导致的。我们可以对其进行优化:
from redis import StrictRedis, ConnectionPool, WatchError
class Lock:
'''
基于Raids实现分布式锁
使用方式:
with(Lock('锁key', 2)):
获取锁后的代码
'''
def __init__(self, key, max = 1):
'''
初始化
key 锁对应的key
max 最多分配的数量,默认为1
'''
self.key = "Locks_" + key
self.max = max
pool = ConnectionPool(host='localhost')
self.redis = StrictRedis(connection_pool=pool)
def try_enter(self):
"""尝试进入,进入成功则返回True"""
return self.try_add(self.key, 1, self.max)
def try_exit(self):
"""尝试退出,退出成功则返回True"""
self.try_add(self.key, -1)
def __enter__(self):
"""获取失败时,会抛出LockFailureException异常"""
isGet = self.try_enter()
if(not isGet): raise LockFailureException()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.try_exit()
#在原增加值的基础上加上number,最大值为max
def try_add(self, key, number, maxValue = sys.maxsize):
with self.redis.pipeline() as pipe:
try:
pipe.watch(key)
val = pipe.get(key)
pipe.multi()
val = 0 if val == None else int(val)
if(val >= maxValue): return False
val = val + number
pipe.set(key, val)
#如果为0则删除之
if(val == 0): pipe.delete(key)
pipe.execute()
return True
except WatchError:
pipe.unwatch()
这里改由使用pipeline实现计数,例如实现上面的过程实例分布,可以这样子:
with(Lock('lockList')):
#抓取商品列表
with(Lock('lockProduct', 3)):
#抓取商品详情
#抓取商品评论
还是保留了原来的优雅实现,但应用的场景更多了。