Java并發(fā)編程(四)--- 死鎖的發(fā)生與避免
前言
上一篇我們介紹了如何通過synchronized 來加鎖保護(hù)資源。但是,不當(dāng)?shù)募渔i方式可能就會導(dǎo)致死鎖。
死鎖發(fā)生的場景
最典型的就是哲學(xué)家問題,
場景:5個哲學(xué)家,5跟筷子,5盤意大利面,大家圍繞桌子而坐,進(jìn)行思考與進(jìn)食活動。
哲學(xué)家的活動描述:
哲學(xué)家除了吃面、還要思考、所以要么放下左右手筷子進(jìn)行思考、要么拿起兩個筷子(自己兩側(cè)的)開始吃面。
哲學(xué)家從不交談,這就很危險了,很可能會發(fā)生死鎖,假設(shè)每個人都是先拿到左邊的筷子,然后去拿右邊的筷子,那么就可能會出現(xiàn)如下情況。
通過代碼模擬:
public class DeadLockTest2 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
int sum = 5;
Chopsticks[] chopsticks = new Chopsticks[sum];
for (int i = 0; i < sum; i++) {
chopsticks[i] = new Chopsticks();
}
for (int i = 0; i < sum; i++) {
executorService.execute(new Philosopher(chopsticks[i], chopsticks[(i + 1) % sum]));
}
}
// 筷子
static class Chopsticks {
}
//哲學(xué)家
static class Philosopher implements Runnable {
private Chopsticks left;
private Chopsticks right;
public Philosopher(Chopsticks left, Chopsticks right) {
this.left = left;
this.right = right;
}
@Override
public void run() {
try {
//思考一段時間
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (left) {
try {
//拿到左邊的筷子之后等待一段時間
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (right) {
try {
System.out.println("********開始吃飯");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
如上程序:定義了一個哲學(xué)家類,該類的主要任務(wù)要么是思考,要么是吃飯,吃飯的話,首先拿到其左邊的筷子,等待一段時間后再去拿其右邊的筷子。在此處因為每個哲學(xué)家都是占用自己左邊的筷子等待拿右邊的筷子。所以,就會出現(xiàn)循環(huán)等待,導(dǎo)致死鎖。下面我們就來查看下:
如何查看死鎖的發(fā)生
我們可以通過java命令很方便的查看是否有死鎖發(fā)生。首先通過jps命令查看當(dāng)前程序所占的進(jìn)程如下:
找到對應(yīng)的進(jìn)程之后,接著通過jstack 命令查看程序運行情況。如下:
通過上述分析我們發(fā)現(xiàn)死鎖發(fā)生的條件是如下四個(必須同時滿足):
互斥,共享資源A和B只能被一個線程占用,就是本例中的,一根筷子同一時刻只能被一個哲學(xué)家獲得
占有且等待:線程T1持有共享資源A,在等待共享資源B時,不釋放占用的資源,在本例中就是:哲學(xué)家1獲得他左邊的筷子,等待獲得他右邊的筷子,即使沒有得到也不會放回其獲得的筷子。
不可搶占:其他線程不能強行占用線程T1占用的資源,在本例中就是:每個哲學(xué)家獲得的筷子不能被其他哲學(xué)家搶走。
循環(huán)等待:線程T1等待線程T2占用的資源,線程T2等待線程T1占用的資源。在本例中:所有哲學(xué)家圍坐一桌,已經(jīng)形成了一個申請資源的環(huán)。
如何避免死鎖
前面我們說了,死鎖的發(fā)生條件是必須同時滿足上述四個條件。那么避免死鎖的方式就是破壞掉其中的一個條件就可以了。
對于占用且等待
對于占用且等待的情況,我們只需要一次性申請所有的資源,只有申請到了才會往下面走。對于這種情況,我們需要一個調(diào)度者,由它來統(tǒng)一申請資源。調(diào)度者必須是單例的,由他給哲學(xué)家分配筷子。
public class Allocator {
private List applyList = new ArrayList();
private final static Allocator allocator = new Allocator();
private Allocator() {
}
/**
* 只能由一個人完成,所以是單例模式
* @return
*/
public static Allocator getAllocator() {
return allocator;
}
/**
* 申請資源
*/
synchronized boolean applyResource(Object from, Object to) {
if (applyList.contains(from) ||
applyList.contains(to)) {
return false;
}
applyList.add(from);
applyList.add(to);
return true;
}
/**
* 釋放資源
*/
synchronized void free(Object from, Object to) {
applyList.remove(from);
applyList.remove(to);
}
調(diào)度者會一直申請拿到兩個資源,如果能拿到這執(zhí)行后續(xù)流程,拿不到的話則一直循環(huán)申請。
public void eat(Account2 target) {
//沒有申請到鎖就一直循環(huán)下去,直到成功
while (!Allocator.getAllocator().applyResource(this, target)) {
return;
}
try {
//左邊
synchronized (this) {
//右邊
synchronized (target) {
}
}
} finally {
//釋放已經(jīng)申請的資源
Allocator.getAllocator().free(this, target);
}
}
對于不可搶占資源
對于不可搶占資源,占有部分資源的線程進(jìn)一步申請其他資源,如果申請不到則主動釋放它占用的資源。在后面我們會運用lock來實現(xiàn)。給鎖設(shè)定超時時間。如果在超時未獲得需要的資源,則釋放其所占資源。
class Philosopher extends Thread{
private ReentrantLock left,right;
public Philosopher(ReentrantLock left, ReentrantLock right) {
super();
this.left = left;
this.right = right;
}
public void run(){
try {
while(true){
Thread.sleep(1000);//思考一段時間
left.lock();
try{
if(right.tryLock(1000,TimeUnit.MILLISECONDS)){
try{
Thread.sleep(1000);//進(jìn)餐一段時間
}finally {
right.unlock();
}
}else{
//沒有獲取到右手的筷子,放棄并繼續(xù)思考
}
}finally {
left.unlock();
}
}
} catch (InterruptedException e) {
}
}
}
如上程序,我們將right鎖的超時時間設(shè)置為1秒,如果不能獲取到,右手的筷子,則放棄吃面并繼續(xù)思考。
對于循環(huán)等待
對于循環(huán)等待,我們可以按序申請資源來預(yù)防,所謂的按序申請,是指資源是有線性順序的,申請的時候可以先申請序號小的,再申請序號大的。
我們在Chopsticks中添加一個id 字段,作為資源的序號。然后在申請資源時按照序號從小到大開始申請。
static class Chopsticks {
private int id;
public Chopsticks(int id) {
this.id = id;
}
public int getId() {
return id;
}
}
public Philosopher(Chopsticks left, Chopsticks right) {
if (left.getId() > right.getId()) {
this.left = right;
this.right = left;
} else {
this.left = left;
this.right = right;
}
}
死鎖發(fā)生后如何處理
當(dāng)檢測到死鎖時,一個可行的做法是釋放所有鎖,回退,并且等待一段隨機時間后重試。這個和簡單的加鎖超時類似,不一樣的是只有死鎖已經(jīng)發(fā)生了才回退 。而不會是因為加鎖的請求超時了,雖然有回退和等待,但是如果有大量線程競爭同一批鎖,它們還是會重復(fù)地死鎖。
一個更好的方案是給這些線程設(shè)置優(yōu)先級,讓一個(或幾個)線程回退,剩下的線程就像沒發(fā)生死鎖一樣繼續(xù)保持著它們需要的鎖??梢栽偎梨i發(fā)生的時候設(shè)置隨機的優(yōu)先級。
總結(jié)
本文通過一個經(jīng)典的哲學(xué)家就餐的問題,引入了死鎖發(fā)生的場景及發(fā)生的條件。然后,針對這些條件介紹了避免死鎖的三種方式。
作者:碼農(nóng)飛哥
微信公眾號:碼農(nóng)飛哥