Redis getset()方法_Redis使用setnx實現分佈式鎖和issues,優化
最近,我一直在工作中使用分佈式鎖定,並且看到了許多實現分佈式鎖定的方法。 我對redis比較熟悉,或者說是比較好用,所以看了一下redis是如何使用setnx實現分佈式鎖的。其中一篇文章被搜索最多,我不確定哪一篇是原創文章,所以請把你看到的鏈接貼出來https://blog.csdn.net/lihao21/article/details/49104695。
reids > setnx(key,value) //设置key.redis > delete(key) //将key删除
复制代码
如果 key 存在,則 set 將失敗並返回 -1。如果密鑰不存在,則成功返回 0。
實現方法一:
lock_key = "distribute_lock"def get_lock(): while True: lock = redis_client.setnx(lock_key,1) if lock: # 设置成功 break else: time.sleep(0.5) #如果没有获取成功,等待0.5继续获取,当然这个地方你也可以设置重试次数。 return Trueif get_lock(): # do your work,处理临界资源 redis_client.delete(lock_key) //为防止死锁,处理完临界资源要及时释放锁。
你知道這段代碼有什麼問題嗎?
問題在於鎖的最終釋放,如果進程在處理關鍵資源時掛起,或者在執行刪除操作時redis鏈接斷開,分佈式鎖將不會釋放,會發生死鎖。所以接下來的優化是在進程掛起時釋放鎖。超時機制。
實現方法二:
添加了超時機制以避免模式 1 問題。
setnx(key, lock timeout + 1>)
本質是將key的值設置為超時,按照方法1的流程。
lock_key = "distribute_lock"def get_lock(): while True: lock = redis_client.setnx(lock_key,lock if lock: # 设置成功 break now_time = <current Unix time + lock timeout + 1> lock_time_out = redis.client.get(lock_key) if now_time > lock_time_out: #检查是否超时 # 如果已经超时 redis_client.delete(lock_key) # 将这个key删除,进行重新设置。 lock = redis_client.setnx(lock_key,lock if lock: # 设置成功 break else: time.sleep(0.5) #如果没有获取成功,等待0.5继续获取,当然这个地方你也可以设置重试次数。 return Trueif get_lock(): # do your work,处理临界资源 redis_client.delete(lock_key) //为防止死锁,处理完临界资源要及时释放锁。
去想想吧。這有什麼問題?
假設進程 p1 獲得了一個鎖,並且進程 p2 和 p3 不斷地檢查鎖是否超時。
然後p1進程掛了,沒有及時解除鎖。
一段時間後,p2 和 p3 都檢測到它們的鎖已超時。即程序的 now_time > lock_time_out 成立。
首先,p2 移除鎖,然後將鎖設置為超時,p2 獲取鎖。
p3也檢測到鎖超時了,但是運行比較慢,直接解除鎖,其實p3解除了p2設置的鎖,p3和p2都獲得了鎖。
實施方法3
第二種實現的主要問題是什麼? 當 p3 移除鎖時,它不檢查是否有新進程獲得了鎖。為避免這種情況,請在 p3 執行配置操作時使用以下命令:
getset(lock_key,now_time+lock timeout + 1)
此命令返回舊值並設置新值。設置前,判斷當前時間是否大於lock_key的舊值。如果大於,則表示超時已過,已獲取鎖。如果再次發生上述情況,p2 和 p3 將同時看到鎖超時,p2 將丟棄並獲取鎖。 p3 執行 getset 操作,將當前時間與 lock_key 的舊值(由 p2 設置)進行比較。當前時間小於舊值。獲取鎖失敗。繼續下一輪等待。
其次,當你最終刪除它時,你不能像前幾次那樣直接刪除它。首先判斷當前時間是否小於鎖超時時間並刪除。不要刪除其他進程設置的鎖。以下是步驟:
def get_lock(): LOCK_TIMEOUT = 3 lock = 0 lock_timeout = 0 lock_key = 'distribute_lock' # 获取锁 while lock != 1: now = int(time.time()) lock_timeout = now + LOCK_TIMEOUT + 1 lock = redis_client.setnx(lock_key, lock_timeout) if lock == 1 or (now > int(redis_client.get(lock_key))) and now > int(redis_client.getset(lock_key, lock_timeout)): break else: time.sleep(0.001)if get_lock(): # dou your lock.处理临界资源() # 释放锁 now = int(time.time()) if now < lock_timeout: redis_client.delete(lock_key)
繼續讀下去,怎麼了?
為什麼是當前的Unix時間+鎖超時+1+1?
+1 因為 Redis 2.4 的過期延遲小於 1 秒。這意味著即使您的密鑰已過期,您仍然可以在過期後的一秒內訪問它,並且新的 Redis 2.6 版本減少了延遲。 1毫秒內。
這段代碼有什麼問題? 1) 客戶端會生成自己的過期時間,因此需要在分佈式環境中為每個客戶端強制時間同步。 2)當鎖過期時,多個客戶端同時執行jedis.getSet()方法時,只有一個客戶端可以加鎖,但是這個客戶端的鎖過期會被其他客戶端覆蓋。 3) 鎖沒有所有者 ID。任何客戶端都可以解鎖。
(1)存在第一個問題,一般同一個分佈組的項目需要時間同步。一般是互聯網設置的時區時間來同步互聯網的全局設置。 (2) Redis是單線程的,所以我沒有你說的情況。它們不能同時運行。甚至客戶端 A、B 和 C 同時發送 getset 到 redis(納秒精度)。 Redis 也按隊列順序順序運行。所以保證只有一個客戶端獲取鎖,做一些業務,最後釋放鎖,其他客戶端肯定會返回大於當前時間的結果,可能導致鎖失敗。是不可能的。允許其他客戶端解鎖。 您可能會說“同時”。但是當你說“同時”的時候,我知道你並不了解redis的底層,如果你改變redis的單線程設計理念,你就不會認同redis作者的理念。不過redis的單線程並不影響它的高性能。
我實際發現的問題。
if lock == 1 or (now > int(redis_client.get(lock_key))) and now > int(redis_client.getset(lock_key, lock_timeout))
解除鎖後,如果碰巧被 redis_client.get(lock_key) 查到了,此時鎖也被解除了,因此出現了這個bug。所以 redis_client.get(lock_key) 是 None,int(None)。發生類型轉換錯誤。那麼,如果此時為None,即使原樣代入0也沒有問題,所以鎖本身的釋放方式和timeout一樣,所以我們提出優化。
if lock == 1 or (now > int(redis_client.get(lock_key) or 0)) and now > int(redis_client.getset(lock_key, lock_timeout) or 0)
然後,為了便於使用,我將其優化為類。
class DistributeLock(object): def __init__(self, lock_key=None, lock_timeout=2): """ :param lock_key: 分布式锁的key :param lock_timeout: 锁的超时时间 """ self.LOCK_TIMEOUT = lock_timeout self.lock = 0 self.lock_timeout = 0 self.lock_key = lock_key # # 分布式锁的key self.redis_client = #初始化redis客户端。 def __enter__(self): # 获取锁 while self.lock != 1: now = int(time.time()) self.lock_timeout = now + self.LOCK_TIMEOUT + 1 lock = self.redis_client.setnx(self.lock_key, self.lock_timeout) if lock == 1 or (now > int(self.redis_client.get(self.lock_key) or 0)) and now > int( self.redis_client.getset(self.lock_key, self.lock_timeout) or 0): break else: time.sleep(0.001) def __exit__(self, ex_type, ex_value, traceback): # 释放锁 now = int(time.time()) if now < self.lock_timeout: self.redis_client.delete(self.lock_key)# 使用方式: with DistributeLock(lock_key="your_distribute_lock",lock_timeout=3): # do your work. do_work() # 不用在关心锁的获取或者释放。
其實上面還是有一些問題的。這意味著不考慮redis掛起和主從切換的情況,稍後會更新。
關注我,讓我和你一起成長。
原文鏈接:https://blog.csdn.net/weixin_42394054/article/details/113581407