Redisson 高性能 Redis 分布式鎖源碼分析
建議閱讀時(shí)間: 2min
Redisson 實(shí)現(xiàn)分布式鎖的機(jī)制如下:
原理描述
先線程 1 獲取鎖,如果獲取鎖成功,那么會(huì)開啟一個(gè)后臺(tái)線程,每次間隔 10 秒進(jìn)行續(xù)期。
并發(fā)情況,線程 2 會(huì)進(jìn)行加鎖,如果無法獲取鎖,那么就會(huì)進(jìn)行自旋等待,等待到達(dá)一定次數(shù)過后,就會(huì)進(jìn)行線程阻塞,并且訂閱解鎖消息。
當(dāng)線程 1 釋放鎖之后,會(huì)觸發(fā) redis 的解鎖消息,消息的觀察者會(huì)觀察到然后去喚醒解鎖的邏輯,線程 2 繼續(xù)競爭鎖。
對于鎖的重入,Redisson 是通過 hash 為數(shù)據(jù)類型的,會(huì)存儲(chǔ)當(dāng)前線程的 tid (本質(zhì)是生成的 uuid 唯一id).
測試代碼
下面我們將以一個(gè)秒殺的例子來說明:
依賴版本
implementation 'org.redisson:redisson-spring-boot-starter:3.17.0'
測試代碼
下面是模擬一個(gè)商品秒殺的場景,示例代碼如下:
public class RedissonTest {
public static void main(String[] args) {
//1. 配置部分
Config config = new Config();
String address = "redis://127.0.0.1:6379";
SingleServerConfig serverConfig = config.useSingleServer();
serverConfig.setAddress(address);
serverConfig.setDatabase(0);
config.setLockWatchdogTimeout(5000);
Redisson redisson = (Redisson) Redisson.create(config);
RLock rLock = redisson.getLock("goods:1000:1");
//2. 加鎖
rLock.lock();
try {
System.out.println("todo 邏輯處理 1000000.");
} finally {
if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
//3. 解鎖
rLock.unlock();
}
}
}
}
加鎖設(shè)計(jì)
rLock.lock();是加鎖的核心代碼,我們一起來看看調(diào)用棧
加鎖的核心方法是:org.redisson.RedissonLock#tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
其實(shí)它的本質(zhì)是調(diào)用一段 LUA 腳本進(jìn)行加鎖, 需要注意的是這個(gè)地方使用的數(shù)據(jù)類型是 hash。這里是用 hash 的好處就是可以通過同一個(gè) key 來存儲(chǔ)重入的 tid
鎖續(xù)期設(shè)計(jì)
鎖的續(xù)期是在 org.redisson.RedissonLock#tryAcquireAsync方法中調(diào)用 scheduleExpirationRenewal實(shí)現(xiàn)的。
續(xù)期需要注意的是,看門狗是設(shè)置在主線程的延遲隊(duì)列的線程中。
這里的好處就是如果我在一個(gè)進(jìn)程中,同時(shí)加了 1000 把鎖,我們不需要啟動(dòng) 1000 個(gè)子線程去續(xù)期,只需要?jiǎng)?chuàng)建 1000 個(gè)續(xù)期任務(wù)對象即可,在到達(dá)續(xù)期時(shí)間才會(huì)喚醒續(xù)期線程。
tryAcquireAsync 代碼如下:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 鎖過期時(shí)間續(xù)期
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
鎖續(xù)期 scheduleExpirationRenewal代碼如下:
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
然后在調(diào)用 renewExpiration(); 執(zhí)行續(xù)期邏輯, 其實(shí)這里是一個(gè)定時(shí)任務(wù) + 遞歸的方式實(shí)現(xiàn)續(xù)期的,用定時(shí)任務(wù)的好處就是不用去開 N 個(gè)字線程,只需要?jiǎng)?chuàng)建對應(yīng)的任務(wù)對象即可。
備注:如果超級(jí)極端的情況下 N 把鎖,同時(shí)加鎖,同時(shí)需求。我們可以考慮在鎖的有效期上,給它加一個(gè)浮動(dòng)時(shí)間比如 100 - 500ms. 這樣就能一定程度上避免 (參考的是緩存失效/擊穿的解決方案)
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 創(chuàng)建延遲任務(wù)
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 真正的續(xù)期,調(diào)用 LUA 腳本續(xù)期
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
// 如果續(xù)期成功
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
這里還有一個(gè)小的點(diǎn),就是續(xù)期的時(shí)間是 1/3 為什么呢?保證在下次續(xù)期的時(shí)候鎖不過期,如果是 1/2 可能在下次定時(shí)任務(wù)執(zhí)行的時(shí)候 key 已經(jīng)過期,如果小于 1/3 會(huì)導(dǎo)致頻繁續(xù)期,任務(wù)代價(jià)/收益比不高。
renewExpirationAsync方法, 里面還是一段 LUA 腳本,進(jìn)行重新設(shè)置鎖的過期時(shí)間。
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
鎖的自旋重試
org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)在執(zhí)行獲取鎖失敗的時(shí)候,會(huì)進(jìn)入重試。其實(shí)這里就會(huì)執(zhí)行 18 行以后的 while (true)邏輯
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// 訂閱鎖過期的消息
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
// 阻塞鎖的超時(shí)時(shí)間,等鎖過期后再嘗試加鎖
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(entry, threadId);
}
// get(lockAsync(leaseTime, unit));
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);其實(shí)這里就是一個(gè)間歇性自旋。等到上次鎖過期的時(shí)間,在喚醒進(jìn)行搶鎖 entry.getLatch().acquire();
訂閱鎖失效
還有一個(gè)邏輯就是
CompletableFuture future = subscribe(threadId);
這里其實(shí)是會(huì)訂閱一個(gè)消息,如果解鎖過后,會(huì)發(fā)布解鎖的消息。然后再喚醒當(dāng)前多次競爭鎖進(jìn)入休眠的線程。
解鎖設(shè)計(jì)
rLock.unlock(); 的核心就是釋放鎖,撤銷續(xù)期和喚醒在等待加鎖的線程(發(fā)布解鎖成功消息)。
核心方法(解鎖): org.redisson.RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
// 發(fā)布解鎖成功消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
還是 LUA 的執(zhí)行方式。
撤銷鎖續(xù)期
核心方法 org.redisson.RedissonBaseLock#unlockAsync(long)
@Override
public RFuture<Void> unlockAsync(long threadId) {
// 解鎖
RFuture<Boolean> future = unlockInnerAsync(threadId);
// 撤銷續(xù)期
CompletionStage<Void> f = future.handle((opStatus, e) -> {
cancelExpirationRenewal(threadId);
if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
return new CompletableFutureWrapper<>(f);
}
解鎖成功喚排隊(duì)線程
在 org.redisson.pubsub.LockPubSub#onMessage中回去喚醒阻塞的線程,讓執(zhí)行前面的鎖自旋邏輯,具體代碼如下:
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(UNLOCK_MESSAGE)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
value.getLatch().release();
} else if (message.equals(READ_UNLOCK_MESSAGE)) {
while (true) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
runnableToExecute.run();
}
value.getLatch().release(value.getLatch().getQueueLength());
}
}
作者:老鄭
歡迎關(guān)注:運(yùn)維開發(fā)故事