Java并發(fā)編程(四)--- 死鎖的發(fā)生與避免
前言
上一篇我們介紹了如何通過synchronized 來加鎖保護(hù)資源。但是,不當(dāng)?shù)募渔i方式可能就會(huì)導(dǎo)致死鎖。
死鎖發(fā)生的場(chǎng)景
最典型的就是哲學(xué)家問題,
場(chǎng)景:5個(gè)哲學(xué)家,5跟筷子,5盤意大利面,大家圍繞桌子而坐,進(jìn)行思考與進(jìn)食活動(dòng)。
哲學(xué)家的活動(dòng)描述:
哲學(xué)家除了吃面、還要思考、所以要么放下左右手筷子進(jìn)行思考、要么拿起兩個(gè)筷子(自己兩側(cè)的)開始吃面。
哲學(xué)家從不交談,這就很危險(xiǎn)了,很可能會(huì)發(fā)生死鎖,假設(shè)每個(gè)人都是先拿到左邊的筷子,然后去拿右邊的筷子,那么就可能會(huì)出現(xiàn)如下情況。
通過代碼模擬:
public class DeadLockTest2 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
int sum = 5;
Chopsticks[] chopsticks = new Chopsticks[sum];
for (int i = 0; i < sum; i++) {
chopsticks[i] = new Chopsticks();
}
for (int i = 0; i < sum; i++) {
executorService.execute(new Philosopher(chopsticks[i], chopsticks[(i + 1) % sum]));
}
}
// 筷子
static class Chopsticks {
}
//哲學(xué)家
static class Philosopher implements Runnable {
private Chopsticks left;
private Chopsticks right;
public Philosopher(Chopsticks left, Chopsticks right) {
this.left = left;
this.right = right;
}
@Override
public void run() {
try {
//思考一段時(shí)間
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (left) {
try {
//拿到左邊的筷子之后等待一段時(shí)間
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (right) {
try {
System.out.println("********開始吃飯");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
如上程序:定義了一個(gè)哲學(xué)家類,該類的主要任務(wù)要么是思考,要么是吃飯,吃飯的話,首先拿到其左邊的筷子,等待一段時(shí)間后再去拿其右邊的筷子。在此處因?yàn)槊總€(gè)哲學(xué)家都是占用自己左邊的筷子等待拿右邊的筷子。所以,就會(huì)出現(xiàn)循環(huán)等待,導(dǎo)致死鎖。下面我們就來查看下:
如何查看死鎖的發(fā)生
我們可以通過java命令很方便的查看是否有死鎖發(fā)生。首先通過jps命令查看當(dāng)前程序所占的進(jìn)程如下:
找到對(duì)應(yīng)的進(jìn)程之后,接著通過jstack 命令查看程序運(yùn)行情況。如下:
通過上述分析我們發(fā)現(xiàn)死鎖發(fā)生的條件是如下四個(gè)(必須同時(shí)滿足):
互斥,共享資源A和B只能被一個(gè)線程占用,就是本例中的,一根筷子同一時(shí)刻只能被一個(gè)哲學(xué)家獲得
占有且等待:線程T1持有共享資源A,在等待共享資源B時(shí),不釋放占用的資源,在本例中就是:哲學(xué)家1獲得他左邊的筷子,等待獲得他右邊的筷子,即使沒有得到也不會(huì)放回其獲得的筷子。
不可搶占:其他線程不能強(qiáng)行占用線程T1占用的資源,在本例中就是:每個(gè)哲學(xué)家獲得的筷子不能被其他哲學(xué)家搶走。
循環(huán)等待:線程T1等待線程T2占用的資源,線程T2等待線程T1占用的資源。在本例中:所有哲學(xué)家圍坐一桌,已經(jīng)形成了一個(gè)申請(qǐng)資源的環(huán)。
如何避免死鎖
前面我們說了,死鎖的發(fā)生條件是必須同時(shí)滿足上述四個(gè)條件。那么避免死鎖的方式就是破壞掉其中的一個(gè)條件就可以了。
對(duì)于占用且等待
對(duì)于占用且等待的情況,我們只需要一次性申請(qǐng)所有的資源,只有申請(qǐng)到了才會(huì)往下面走。對(duì)于這種情況,我們需要一個(gè)調(diào)度者,由它來統(tǒng)一申請(qǐng)資源。調(diào)度者必須是單例的,由他給哲學(xué)家分配筷子。
public class Allocator {
private List applyList = new ArrayList();
private final static Allocator allocator = new Allocator();
private Allocator() {
}
/**
* 只能由一個(gè)人完成,所以是單例模式
* @return
*/
public static Allocator getAllocator() {
return allocator;
}
/**
* 申請(qǐng)資源
*/
synchronized boolean applyResource(Object from, Object to) {
if (applyList.contains(from) ||
applyList.contains(to)) {
return false;
}
applyList.add(from);
applyList.add(to);
return true;
}
/**
* 釋放資源
*/
synchronized void free(Object from, Object to) {
applyList.remove(from);
applyList.remove(to);
}
調(diào)度者會(huì)一直申請(qǐng)拿到兩個(gè)資源,如果能拿到這執(zhí)行后續(xù)流程,拿不到的話則一直循環(huán)申請(qǐng)。
public void eat(Account2 target) {
//沒有申請(qǐng)到鎖就一直循環(huán)下去,直到成功
while (!Allocator.getAllocator().applyResource(this, target)) {
return;
}
try {
//左邊
synchronized (this) {
//右邊
synchronized (target) {
}
}
} finally {
//釋放已經(jīng)申請(qǐng)的資源
Allocator.getAllocator().free(this, target);
}
}
對(duì)于不可搶占資源
對(duì)于不可搶占資源,占有部分資源的線程進(jìn)一步申請(qǐng)其他資源,如果申請(qǐng)不到則主動(dòng)釋放它占用的資源。在后面我們會(huì)運(yùn)用lock來實(shí)現(xiàn)。給鎖設(shè)定超時(shí)時(shí)間。如果在超時(shí)未獲得需要的資源,則釋放其所占資源。
class Philosopher extends Thread{
private ReentrantLock left,right;
public Philosopher(ReentrantLock left, ReentrantLock right) {
super();
this.left = left;
this.right = right;
}
public void run(){
try {
while(true){
Thread.sleep(1000);//思考一段時(shí)間
left.lock();
try{
if(right.tryLock(1000,TimeUnit.MILLISECONDS)){
try{
Thread.sleep(1000);//進(jìn)餐一段時(shí)間
}finally {
right.unlock();
}
}else{
//沒有獲取到右手的筷子,放棄并繼續(xù)思考
}
}finally {
left.unlock();
}
}
} catch (InterruptedException e) {
}
}
}
如上程序,我們將right鎖的超時(shí)時(shí)間設(shè)置為1秒,如果不能獲取到,右手的筷子,則放棄吃面并繼續(xù)思考。
對(duì)于循環(huán)等待
對(duì)于循環(huán)等待,我們可以按序申請(qǐng)資源來預(yù)防,所謂的按序申請(qǐng),是指資源是有線性順序的,申請(qǐng)的時(shí)候可以先申請(qǐng)序號(hào)小的,再申請(qǐng)序號(hào)大的。
我們?cè)贑hopsticks中添加一個(gè)id 字段,作為資源的序號(hào)。然后在申請(qǐng)資源時(shí)按照序號(hào)從小到大開始申請(qǐng)。
static class Chopsticks {
private int id;
public Chopsticks(int id) {
this.id = id;
}
public int getId() {
return id;
}
}
public Philosopher(Chopsticks left, Chopsticks right) {
if (left.getId() > right.getId()) {
this.left = right;
this.right = left;
} else {
this.left = left;
this.right = right;
}
}
死鎖發(fā)生后如何處理
當(dāng)檢測(cè)到死鎖時(shí),一個(gè)可行的做法是釋放所有鎖,回退,并且等待一段隨機(jī)時(shí)間后重試。這個(gè)和簡單的加鎖超時(shí)類似,不一樣的是只有死鎖已經(jīng)發(fā)生了才回退 。而不會(huì)是因?yàn)榧渔i的請(qǐng)求超時(shí)了,雖然有回退和等待,但是如果有大量線程競爭同一批鎖,它們還是會(huì)重復(fù)地死鎖。
一個(gè)更好的方案是給這些線程設(shè)置優(yōu)先級(jí),讓一個(gè)(或幾個(gè))線程回退,剩下的線程就像沒發(fā)生死鎖一樣繼續(xù)保持著它們需要的鎖??梢栽偎梨i發(fā)生的時(shí)候設(shè)置隨機(jī)的優(yōu)先級(jí)。
總結(jié)
本文通過一個(gè)經(jīng)典的哲學(xué)家就餐的問題,引入了死鎖發(fā)生的場(chǎng)景及發(fā)生的條件。然后,針對(duì)這些條件介紹了避免死鎖的三種方式。
作者:碼農(nóng)飛哥
微信公眾號(hào):碼農(nóng)飛哥
