Redis getset()方法_Redis使用setnx實現分佈式鎖和issues,優化


Redis getset()方法_Redis使用setnx實現分佈式鎖和issues,優化

redis getset() 方法

最近,我一直在工作中使用分佈式鎖定,並且看到了許多實現分佈式鎖定的方法。 我對redis比較熟悉,或者說是比較好用,所以看了一下redis是如何使用setnx實現分佈式鎖的。其中一篇文章被搜索最多,我不確定哪一篇是原創文章,所以請把你看到的鏈接貼出來https://blog.csdn.net/lihao21/article/details/49104695。

reids > setnx(key,value)  //设置key.redis > delete(key)  //将key删除
复制代码

如果 key 存在,則 set 將失敗並返回 -1。如果密鑰不存在,則成功返回 0。

實現方法一:

lock_key = "distribute_lock"def get_lock():      while True:          lock = redis_client.setnx(lock_key,1)          if lock: # 设置成功              break          else:              time.sleep(0.5)  #如果没有获取成功,等待0.5继续获取,当然这个地方你也可以设置重试次数。  return Trueif get_lock():    # do your work,处理临界资源    redis_client.delete(lock_key) //为防止死锁,处理完临界资源要及时释放锁。

你知道這段代碼有什麼問題嗎?

問題在於鎖的最終釋放,如果進程在處理關鍵資源時掛起,或者在執行刪除操作時redis鏈接斷開,分佈式鎖將不會釋放,會發生死鎖。所以接下來的優化是在進程掛起時釋放鎖。超時機制。

實現方法二:

添加了超時機制以避免模式 1 問題。

setnx(key, lock timeout + 1>)

本質是將key的值設置為超時,按照方法1的流程。

lock_key = "distribute_lock"def get_lock():    while True:      lock = redis_client.setnx(lock_key,lock       if lock: # 设置成功          break      now_time = <current Unix time + lock timeout + 1>      lock_time_out = redis.client.get(lock_key)      if now_time > lock_time_out: #检查是否超时          # 如果已经超时          redis_client.delete(lock_key) # 将这个key删除,进行重新设置。          lock = redis_client.setnx(lock_key,lock       if lock: # 设置成功          break      else:        time.sleep(0.5)  #如果没有获取成功,等待0.5继续获取,当然这个地方你也可以设置重试次数。  return Trueif get_lock():    # do your work,处理临界资源    redis_client.delete(lock_key) //为防止死锁,处理完临界资源要及时释放锁。

去想想吧。這有什麼問題?

  1. 假設進程 p1 獲得了一個鎖,並且進程 p2 和 p3 不斷地檢查鎖是否超時。

  2. 然後p1進程掛了,沒有及時解除鎖。

  3. 一段時間後,p2 和 p3 都檢測到它們的鎖已超時。即程序的 now_time > lock_time_out 成立。

  4. 首先,p2 移除鎖,然後將鎖設置為超時,p2 獲取鎖。

  5. p3也檢測到鎖超時了,但是運行比較慢,直接解除鎖,其實p3解除了p2設置的鎖,p3和p2都獲得了鎖。

實施方法3

第二種實現的主要問題是什麼? 當 p3 移除鎖時,它不檢查是否有新進程獲得了鎖。為避免這種情況,請在 p3 執行配置操作時使用以下命令:

getset(lock_key,now_time+lock timeout + 1)

此命令返回舊值並設置新值。設置前,判斷當前時間是否大於lock_key的舊值。如果大於,則表示超時已過,已獲取鎖。如果再次發生上述情況,p2 和 p3 將同時看到鎖超時,p2 將丟棄並獲取鎖。 p3 執行 getset 操作,將當前時間與 lock_key 的舊值(由 p2 設置)進行比較。當前時間小於舊值。獲取鎖失敗。繼續下一輪等待。

其次,當你最終刪除它時,你不能像前幾次那樣直接刪除它。首先判斷當前時間是否小於鎖超時時間並刪除。不要刪除其他進程設置的鎖。以下是步驟:

def get_lock():    LOCK_TIMEOUT = 3    lock = 0    lock_timeout = 0    lock_key = 'distribute_lock'    # 获取锁    while lock != 1:        now = int(time.time())        lock_timeout = now + LOCK_TIMEOUT + 1        lock = redis_client.setnx(lock_key, lock_timeout)        if lock == 1 or (now > int(redis_client.get(lock_key))) and now > int(redis_client.getset(lock_key, lock_timeout)):            break        else:            time.sleep(0.001)if get_lock():    # dou your lock.处理临界资源()    # 释放锁    now = int(time.time())    if now < lock_timeout:        redis_client.delete(lock_key)

繼續讀下去,怎麼了?

  1. 為什麼是當前的Unix時間+鎖超時+1+1?

+1 因為 Redis 2.4 的過期延遲小於 1 秒。這意味著即使您的密鑰已過期,您仍然可以在過期後的一秒內訪問它,並且新的 Redis 2.6 版本減少了延遲。 1毫秒內。

  1. 這段代碼有什麼問題? 1) 客戶端會生成自己的過期時間,因此需要在分佈式環境中為每個客戶端強制時間同步。 2)當鎖過期時,多個客戶端同時執行jedis.getSet()方法時,只有一個客戶端可以加鎖,但是這個客戶端的鎖過期會被其他客戶端覆蓋。 3) 鎖沒有所有者 ID。任何客戶端都可以解鎖。

(1)存在第一個問題,一般同一個分佈組的項目需要時間同步。一般是互聯網設置的時區時間來同步互聯網的全局設置。 (2) Redis是單線程的,所以我沒有你說的情況。它們不能同時運行。甚至客戶端 A、B 和 C 同時發送 getset 到 redis(納秒精度)。 Redis 也按隊列順序順序運行。所以保證只有一個客戶端獲取鎖,做一些業務,最後釋放鎖,其他客戶端肯定會返回大於當前時間的結果,可能導致鎖失敗。是不可能的。允許其他客戶端解鎖。 您可能會說“同時”。但是當你說“同時”的時候,我知道你並不了解redis的底層,如果你改變redis的單線程設計理念,你就不會認同redis作者的理念。不過redis的單線程並不影響它的高性能。

  1. 我實際發現的問題。

if lock == 1 or (now > int(redis_client.get(lock_key))) and now > int(redis_client.getset(lock_key, lock_timeout))

解除鎖後,如果碰巧被 redis_client.get(lock_key) 查到了,此時鎖也被解除了,因此出現了這個bug。所以 redis_client.get(lock_key) 是 None,int(None)。發生類型轉換錯誤。那麼,如果此時為None,即使原樣代入0也沒有問題,所以鎖本身的釋放方式和timeout一樣,所以我們提出優化。

if lock == 1 or (now > int(redis_client.get(lock_key) or 0)) and now > int(redis_client.getset(lock_key, lock_timeout) or 0)

然後,為了便於使用,我將其優化為類。

class DistributeLock(object):    def __init__(self, lock_key=None, lock_timeout=2):        """        :param lock_key: 分布式锁的key        :param lock_timeout: 锁的超时时间        """        self.LOCK_TIMEOUT = lock_timeout        self.lock = 0        self.lock_timeout = 0        self.lock_key = lock_key  #  # 分布式锁的key        self.redis_client = #初始化redis客户端。    def __enter__(self):        # 获取锁        while self.lock != 1:            now = int(time.time())            self.lock_timeout = now + self.LOCK_TIMEOUT + 1            lock = self.redis_client.setnx(self.lock_key, self.lock_timeout)            if lock == 1 or (now > int(self.redis_client.get(self.lock_key) or 0)) and now > int(                    self.redis_client.getset(self.lock_key, self.lock_timeout) or 0):                break            else:                time.sleep(0.001)    def __exit__(self, ex_type, ex_value, traceback):        # 释放锁        now = int(time.time())        if now < self.lock_timeout:            self.redis_client.delete(self.lock_key)# 使用方式:  with DistributeLock(lock_key="your_distribute_lock",lock_timeout=3):      # do your work.        do_work() # 不用在关心锁的获取或者释放。

其實上面還是有一些問題的。這意味著不考慮redis掛起和主從切換的情況,稍後會更新。

關注我,讓我和你一起成長。

025e3107a563165626380163d27b794a.png