0%

Java多线程

Java 多线程相关的基础知识。

概念

线程和进程

何为线程?

线程与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中可以产生多个线程。与进程不同的是同类的多个线程共享同一块内存空间和一组系统资源,所以系统在产生一个线程,或是在各个线程之间作切换工作时,负担要比进程小得多,也正因为如此,线程也被称为轻量级进程。

何为进程?

进程是程序的一次执行过程,是系统运行程序的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。简单来说,一个进程就是一个执行中的程序,它在计算机中一个指令接着一个指令地执行着,同时,每个进程还占有某些系统资源如CPU时间,内存空间,文件,文件,输入输出设备的使用权等等。换句话说,当程序在执行时,将会被操作系统载入内存中。

线程和进程有何不同?

线程是进程划分成的更小的运行单位。线程和进程最大的不同在于基本上各进程是独立的,而各线程则不一定,因为同一进程中的线程极有可能会相互影响。从另一角度来说,进程属于操作系统的范畴,主要是同一段时间内,可以同时执行一个以上的程序,而线程则是在同一程序内几乎同时执行一个以上的程序段。

生命周期

一个线程有五个基本状态

  • 新建状态:当线程对象创建后,即进入新建状态,如:Thread t = new MyThread();
  • 就绪状态:当调用线程对象的start()方法时,线程即进入就绪状态。处于就绪状态的线程只是说明此线程已经做好准备,随时等待 CPU 调度执行,并不是说执行了start()方法就立即执行。
  • 运行状态:当 CPU 开始调度处于就绪状态的线程时,此时线程才得以真正执行,即进入到运行状态。
  • 阻塞状态:处于运行状态中的线程由于某种原因,暂时放弃对CPU的使用权,停止执行,此时进入阻塞状态,直到其进入到就绪状态,才有机会再次被CPU调用以进入到运行状态。阻塞状态分为三种:
    • 等待阻塞
    • 同步阻塞
    • 其他阻塞
  • 死亡状态:线程执行完毕或者异常退出,该线程结束生命周期。

重要概念

synchronized 关键字

synchronized是 Java 中的关键字,是利用锁的机制来实现同步的。

锁机制有如下两种特性:

  • 互斥性:即在同一时间只允许一个线程持有某个对象锁,通过这种特性来实现多线程中的协调机制,这样在同一时间只有一个线程对需同步的代码块(复合操作)进行访问。互斥性我们也往往称为操作的原子性。
  • 可见性:必须确保在锁被释放之前,对共享变量所做的修改,对于随后获得该锁的另一个线程是可见的(即在获得锁时应获得最新共享变量的值),否则另一个线程可能是在本地缓存的某个副本上继续操作从而引起不一致。

synchronized的作用域包括以下几个部分:

对象

一个线程访问一个对象中的synchronized(this)同步代码块时,其他试图访问该对象的线程将被阻塞。

1
public void run() {
2
    synchronized (this){
3
        try {
4
            System.out.println(Thread.currentThread().getId() + " running");
5
            Thread.sleep(2000);
6
            System.out.println(Thread.currentThread().getId() + " complete");
7
        } catch (InterruptedException e) {
8
            e.printStackTrace();
9
        }
10
    }
11
}

同一时刻只有一个线程可以执行run()方法

或者锁一个对象

1
public Integer count=0;
2
3
public void add(){
4
    synchronized (count){
5
        try {
6
            System.out.println(Thread.currentThread().getId() + " running");
7
            Thread.sleep(2000);
8
            count++;
9
            System.out.println(Thread.currentThread().getId() + " complete");
10
        } catch (InterruptedException e) {
11
            e.printStackTrace();
12
        }
13
    }
14
}

起到同样的作用,count这个对象同一时间只能一个线程访问。

方法

1
public synchronized void run() {
2
    try {
3
        System.out.println(Thread.currentThread().getId() + " running");
4
        Thread.sleep(2000);
5
        System.out.println(Thread.currentThread().getId() + " complete");
6
    } catch (InterruptedException e) {
7
        e.printStackTrace();
8
    }
9
}

代码块(类)

synchronized关键字同样可以同步代码块,但是只有代码块内部的代码被锁,同一个方法里的其他代码仍然可以并行执行。另外需要注意,synchronized其实作用的是对象,所以如果这里锁中是this,并不能保证代码被顺序执行,只能保证同一个对象内的代码。因此要改成锁住类。

1
public void synchronizedCodeBlock() {
2
    System.out.println(Thread.currentThread().getId() + ":同一个方法里没有被锁的部分可以同步执行");
3
    synchronized (this.getClass()) {
4
        try {
5
            Thread.sleep(100);
6
        } catch (InterruptedException e) {
7
            e.printStackTrace();
8
9
        }
10
        System.out.println(Thread.currentThread().getId() + ":被锁住的代码部分必须顺序执行");
11
    }
12
}

静态变量、方法、代码块

synchronize关键字如果修饰的是一个静态变量、静态方法或者静态代码块的时候,同步的是这个类的所有实例。

锁静态方法、变量的话,会作用于该类的所有实例。

1
public synchronized static void synchronizedStaticFunction() {
2
    try {
3
        Thread.sleep(200);
4
    } catch (InterruptedException e) {
5
        e.printStackTrace();
6
    }
7
    System.out.println(Thread.currentThread().getId() + ":静态方法");
8
}

Lock 接口

Lock不是一个关键字,而是一个接口,使用方式跟synchronized类似

总结来说,Locksynchronized有以下几点不同:

  • Lock是一个接口,而synchronized是Java中的关键字,synchronized是内置的语言实现;
  • synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock时需要在finally块中释放锁;
  • Lock可以让等待锁的线程响应中断,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断;
  • 通过Lock可以知道有没有成功获取锁,而synchronized却无法办到。
  • Lock可以提高多个线程进行读操作的效率。

在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。所以说,在具体使用时要根据适当情况选择。

volatile 关键字

volatile修饰的共享变量,就具有了以下两点特性:

  1. 保证了不同线程对该变量操作的内存可见性;

  2. 禁止指令重排序

JMM主要就是围绕着如何在并发过程中如何处理原子性、可见性和有序性这3个特征来建立的,通过解决这三个问题,可以解除缓存不一致的问题。而volatile跟可见性和有序性都有关。volatile不保证原子性。

一个简单的测试

1
public class MyVolatileClass {
2
    int a = 0;
3
    
4
    public void add() {
5
        a = a+1;
6
    }
7
}

开启多个线程执行一万次add()方法之后,可以看到a的结果并不是一万,而是要少于一万。无论int a前面加不加volatile结果都一样。所以volatile不能保证操作的原子性。那么volatile的作用是什么?

1
static volatile boolean stop = false;
2
public static void main(String[] args) {
3
    new Thread(() -> {
4
        while(!stop) {
5
        }
6
        System.out.println("停止了");
7
    }).start();
8
    try {
9
        Thread.sleep(1000);
10
    } catch (InterruptedException e) {
11
        e.printStackTrace();
12
    }
13
    stop = true;
14
    System.out.println("赶快停止吧 " + stop);
15
}

如果不带volatile关键字,那么循环很可能不会终止。

锁的种类

可中断锁

Lock是可中断锁,而synchronized不是可中断锁

线程 A 和 B 都要获取对象 O 的锁定,假设 A 获取了对象 O 锁,B 将等待 A 释放对 O 的锁定,如果使用synchronized,如果 A 不释放,B 将一直等下去,不能被中断。

如果使用ReentrantLock,即使 A 不释放,也可以使 B 在等待了足够长的时间以后,中断等待,而干别的事情。

公平锁/非公平锁

公平锁是指多个线程按照申请锁的顺序来获取锁。非公平锁是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。有可能,会造成优先级反转或者饥饿现象。对于Java ReentrantLock而言,通过构造函数指定该锁是否是公平锁,默认是非公平锁。非公平锁的优点在于吞吐量比公平锁大。对于synchronized而言,也是一种非公平锁。由于其并不像ReentrantLock是通过 AQS 来实现线程调度,所以并没有任何办法使其变成公平锁。

乐观锁/悲观锁

乐观锁与悲观锁不是指具体的什么类型的锁,而是指看待并发同步的角度。悲观锁认为对于同一个数据的并发操作,一定是会发生修改的,哪怕没有修改,也会认为修改。因此对于同一个数据的并发操作,悲观锁采取加锁的形式。悲观的认为,不加锁的并发操作一定会出问题。乐观锁则认为对于同一个数据的并发操作,是不会发生修改的。在更新数据的时候,会采用尝试更新,不断重新的方式更新数据。乐观的认为,不加锁的并发操作是没有事情的。从上面的描述我们可以看出,悲观锁适合写操作非常多的场景,乐观锁适合读操作非常多的场景,不加锁会带来大量的性能提升。悲观锁在 Java 中的使用,就是利用各种锁。乐观锁在 Java 中的使用,是无锁编程,常常采用的是 CAS 算法,典型的例子就是原子类,通过 CAS 自旋实现原子操作的更新。

独享锁/共享锁

独享锁是指该锁一次只能被一个线程所持有。共享锁是指该锁可被多个线程所持有。对于Java ReentrantLock而言,其是独享锁。但是对于Lock的另一个实现类ReentrantReadWriteLock,其读锁是共享锁,其写锁是独享锁。读锁的共享锁可保证并发读是非常高效的,读写,写读 ,写写的过程是互斥的。独享锁与共享锁也是通过AQS来实现的,通过实现不同的方法,来实现独享或者共享。对于synchronized 而言,当然是独享锁。

可重入锁/不可重入锁

可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。

读写锁

ReentrantLock 属于排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个线程访问,但是在写线程访问时,所有的读和其他写线程都被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。

自旋锁

阻塞或唤醒一个 Java 线程需要操作系统切换 CPU 状态来完成,这种状态转换需要耗费处理器时间。如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码执行的时间还要长。

在许多场景中,同步资源的锁定时间很短,为了这一小段时间去切换线程,线程挂起和恢复现场的花费可能会让系统得不偿失。如果物理机器有多个处理器,能够让两个或以上的线程同时并行执行,我们就可以让后面那个请求锁的线程不放弃 CPU 的执行时间,看看持有锁的线程是否很快就会释放锁。

而为了让当前线程“稍等一下”,我们需让当前线程进行自旋,如果在自旋完成后前面锁定同步资源的线程已经释放了锁,那么当前线程就可以不必阻塞而是直接获取同步资源,从而避免切换线程的开销。这就是自旋锁。

借一张网图来总结:

基本使用方式

继承 Thread 类

1
public class MyThread extends Thread {
2
    @Override
3
    public void run() {
4
        System.out.println("MyThread is running");
5
    }
6
}

实现 Runnable 接口

1
public class MyRunnable implements Runnable {
2
    @Override
3
    public void run() {
4
        System.out.println("MyRunnable is running");
5
    }
6
}

实现 Callable 接口

1
public class MyCallable implements Callable {
2
    @Override
3
    public Object call() {
4
        System.out.println("MyCallable is running");
5
        return null;
6
    }
7
}

三者区别

  • 继承Thread不方便共享变量,由于 Java 的单继承机制,继承了Thread类之后不能再继承别的类。
  • RunnableCallable功能比较相似,主要区别有:
    • Runnable是自从 Java 1.1就有了,而Callable是1.5之后才加上去的
    • Callable规定的方法是call()Runnable规定的方法是run()
    • Callable的任务执行后可返回值,而Runnable的任务没有返回值(是void)
    • call()方法可以抛出异常,run()方法不可以
    • 运行Callable任务可以拿到一个Future对象,表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。通过Future对象可以了解任务执行情况,可取消任务的执行,还可获取执行结果。
    • 加入线程池运行,Runnable使用ExecutorService的execute方法,Callable使用submit方法

执行run方法和start方法的区别

start()的作用是启动一个新的线程。

通过start()方法来启动的新线程,处于就绪(可运行)状态,并没有运行,一旦得到cpu时间片,就开始执行相应线程的run()方法,这里方法run()称为线程体,它包含了要执行的这个线程的内容,run方法运行结束,此线程随即终止。start()不能被重复调用。用start方法来启动线程,真正实现了多线程运行,即无需等待某个线程的run方法体代码执行完毕就直接继续执行下面的代码。这里无需等待run方法执行完毕,即可继续执行下面的代码,即进行了线程切换。

run()就和普通的成员方法一样,可以被重复调用。

如果直接调用run方法,并不会启动新线程!程序中依然只有主线程这一个线程,其程序执行路径还是只有一条,还是要顺序执行,还是要等待run方法体执行完毕后才可继续执行下面的代码,这样就没有达到多线程的目的。

总结:调用start方法方可启动线程,而run方法只是Thread的一个普通方法调用,还是在主线程里执行。

Future

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

Future接口中声明了5个方法,下面依次解释每个方法的作用:

  • cancel(boolean mayInterruptIfRunning)方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunningtrue还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunningtrue还是false,肯定返回true
  • isCancelled()方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true
  • isDone()方法表示任务是否已经完成,若任务完成,则返回true
  • get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
  • get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null

也就是说Future提供了三种功能:

  • 判断任务是否完成;
  • 能够中断任务;
  • 能够获取任务执行结果。

FutureTask

FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现:

1
public interface RunnableFuture extends Runnable, Future {
2
    void run();
3
}

可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

FutureTask提供了2个构造器:

1
public FutureTask(Callable callable) {}
2
public FutureTask(Runnable runnable, V result) {}

事实上,FutureTaskFuture接口的一个唯一实现类。

线程通信和控制

在调用wait(), notify()notifyAll()的时候,必须先获得锁,且状态变量须由该锁保护,而固有锁对象与固有条件队列对象又是同一个对象。也就是说,要在某个对象上执行waitnotify,先必须锁定该对象,而对应的状态变量也是由该对象锁保护的。

调用一个Objectwaitnotify/notifyAll的时候,必须保证调用代码对该Object是同步的,也就是说必须在作用等同于synchronized(obj){......}的内部才能够去调用objwaitnotify/notifyAll三个方法,否则就会报错:

1
java.lang.IllegalMonitorStateException:current thread not owner

wait/notify/notifyAll

一个简单的等待、唤醒示例:

1
public class WaitAndNotify {
2
    Object lock = new Object();
3
4
    public void prevFunc() {
5
        try {
6
            Thread.sleep(2000);
7
            synchronized (lock) {
8
                lock.notifyAll();
9
            }
10
11
        } catch (InterruptedException e) {
12
            e.printStackTrace();
13
        }
14
    }
15
16
    public void nextFunc() {
17
        try {
18
            synchronized (lock) {
19
                System.out.println(Thread.currentThread().getId() + "等待任务执行,线程wait");
20
                lock.wait();
21
                System.out.println(Thread.currentThread().getId() + "任务执行完毕,线程notify");
22
            }
23
24
        } catch (InterruptedException e) {
25
            e.printStackTrace();
26
        }
27
28
    }
29
30
    public static void main(String[] args) {
31
        WaitAndNotify waitAndNotify = new WaitAndNotify();
32
        new Thread(waitAndNotify::nextFunc).start();
33
        new Thread(waitAndNotify::nextFunc).start();
34
        new Thread(waitAndNotify::nextFunc).start();
35
        new Thread(waitAndNotify::prevFunc).start();
36
    }
37
}

notifynotifyAll的区别是前者只随机唤醒一个线程,后者唤醒所有。

join

join的作用很简单:让一个线程等待另一个线结束之后才能继续运行。

举个栗子,主线程创造一个子线程执行耗时操作,等子线程执行完之后回到主线程继续处理。

1
public class ThreadJoin {
2
    void funcA() {
3
        try {
4
            System.out.println(Thread.currentThread().getId() + "开始耗时操作");
5
            Thread.sleep(2000);
6
            System.out.println(Thread.currentThread().getId() + "耗时操作完成");
7
        } catch (InterruptedException e) {
8
            e.printStackTrace();
9
        }
10
    }
11
12
    public static void main(String[] args) {
13
        ThreadJoin threadJoin = new ThreadJoin();
14
        Thread thread = new Thread(threadJoin::funcA);
15
        thread.start();
16
        try {
17
            System.out.println("主线程等待结果");
18
            thread.join();
19
            System.out.println("主线程结束");
20
        } catch (InterruptedException e) {
21
            e.printStackTrace();
22
        }
23
    }
24
}

yield

yield的作用也很简单,就是让出当前时间片,让其他线程使用 CPU,自身从运行状态重新变成就绪状态,然后重新竞争 CPU 的使用权。

平时几乎没用过yield,看到一篇博客上这么说明的:

yield 方法可以很好的控制多线程,如执行某项复杂的任务时,如果担心占用资源过多,可以在完成某个重要的工作后使用 yield 方法让掉当前 CPU 的调度权,等下次获取到再继续执行,这样不但能完成自己的重要工作,也能给其他线程一些运行的机会,避免一个线程长时间占有 CPU 资源。

Interrupt

线程的interrupt()方法是中断线程,将会设置该线程的中断状态位,即设置为true,中断的结果线程是死亡、还是等待新的任务或是继续运行至下一步,就取决于这个程序本身。线程会不时地检测这个中断标示位,以判断线程是否应该被中断(中断标示值是否为true)。它并不像stop方法那样会中断一个正在运行的线程。

一个最简单的用法,在线程内判断是否被中断,线程外进行中断操作:

1
while (!Thread.interrupted()) {
2
    System.out.println("running");
3
}

底层实现方式

synchronized 的实现方式

由于对 c++ 的源码不熟悉,仅从现有的博客中总结synchronized关键字实现方式。

每个对象都拥有自己的监视器,当这个对象由同步块或者这个对象的同步方法调用时,执行方法的线程必须先获取该对象的监视器才能进入同步块和同步方法,如果没有获取到监视器的线程将会被阻塞在同步块和同步方法的入口处,进入到 BLOCKED 状态。

对于同步方法:方法级同步没有通过字节码指令来控制,它实现在方法调用和返回操作之中。当方法调用时,调用指令会检查方法ACC_SYNCHRONIZED访问标志是否被设置,若设置了则执行线程需要持有监视器(Monitor)才能运行方法,当方法完成(无论是否出现异常)时释放监视器。

对于同步代码块:synchronized关键字经过编译后,会在同步块的前后分别形成monitorentermonitorexit两个字节码指令,每条monitorenter指令都必须执行其对应的monitorexit指令,为了保证方法异常完成时这两条指令依然能正确执行,编译器还会自动产生一个异常处理器,其目的就是用来执行monitorexit指令。

Monitor是线程私有的数据结构,每个线程都有一个可用monitor record列表,同时 还有一个全局的可用列表。每一个被锁住的对象都会和一个monitor关联(对象头的MarkWord中的LockWord指向monitor的起始地址),同时monitor中有一个owner字段存放拥有该锁的线程的唯一标识,表示该锁被这个线程占用。其结构如下:

Owner:初始时为 NULL 表示当前没有任何线程拥有该monitor record,当线程成功拥有该锁后保存线程唯一标识,当锁被释放时又设置为 NULL
EntryQ:关联一个系统互斥锁(semaphore),阻塞所有试图锁住monitor record失败的线程
RcThis:表示blockedwaiting在该monitor record上的所有线程的个数
Nest:用来实现重入锁的计数
HashCode:保存从对象头拷贝过来的HashCode值(可能还包含GC age)
Candidate:用来避免不必要的阻塞或等待线程唤醒,因为每一次只有一个线程能够成功拥有锁,如果每次前一个释放锁的线程唤醒所有正在阻塞或等待的线程,会引起不必要的上下文切换(从阻塞到就绪然后因为竞争锁失败又被阻塞)从而导致性能严重下降。Candidate只有两种可能的值 0 表示没有需要唤醒的线程 1 表示要唤醒一个继任线程来竞争锁。

Lock 实现原理

加锁过程

ReentrantLock为例,我们看看他都做了些什么:

1
public ReentrantLock() {
2
    sync = new NonfairSync();
3
}

在构造方法中,就是初始化了一个静态内部类,叫做非公平同步,继续点进去,可以看到类似这个NonfairSync,还有个FairSync,都是继承于Sync:

1
static final class NonfairSync extends Sync
2
static final class FairSync extends Sync

Sync由继承自AbstractQueuedSynchronizer,这么长一坨是个什么东西?从名字里可以看出是个抽象的队列同步器。而他由继承自AbstractOwnableSynchronizer(AOS),整个继承关系如下:

FairSyncNonfairSync的区别在于,是不是保证获取锁的公平性,因为默认是NonfairSync,我们就先看这个。

点进AbstractQueuedSynchronizer代码里面,可以看到内部维护了一个双向链表,锁的存储结构归根结底就是两个东西:双向链表+int类型的状态。

可以看到,都是用volatile关键字修饰的,链表的头尾还有transient关键字。

获取锁的时候,又做了什么?我们查看ReentrantLocklock()方法:

1
public void lock() {
2
    sync.acquire(1);
3
}

继续追踪,借着注释的帮助:

1
/**
2
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
3
 * by invoking at least once {@link #tryAcquire},
4
 * returning on success.  Otherwise the thread is queued, possibly
5
 * repeatedly blocking and unblocking, invoking {@link
6
 * #tryAcquire} until success.  This method can be used
7
 * to implement method {@link Lock#lock}.
8
 *
9
 * @param arg the acquire argument.  This value is conveyed to
10
 *        {@link #tryAcquire} but is otherwise uninterpreted and
11
 *        can represent anything you like.
12
 */
13
public final void acquire(int arg) {
14
    if (!tryAcquire(arg) &&
15
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
16
        selfInterrupt();
17
}

可以看到这里实际上做了三个动作:

  1. 尝试设置state,也就是获取锁,这里的tryAcquire实际上对应了FairNonefair两种情况,我们继续以默认的非公平来看。(公平的情况其实类似,只不过在加锁之前判断了一下当前线程是不是在等待队列的队首,以保证公平性。)

    1
    final boolean nonfairTryAcquire(int acquires) {
    2
        final Thread current = Thread.currentThread();
    3
        int c = getState();
    4
        if (c == 0) {
    5
            if (compareAndSetState(0, acquires)) {
    6
                setExclusiveOwnerThread(current);
    7
                return true;
    8
            }
    9
        }
    10
        else if (current == getExclusiveOwnerThread()) {
    11
            int nextc = c + acquires;
    12
            if (nextc < 0) // overflow
    13
                throw new Error("Maximum lock count exceeded");
    14
            setState(nextc);
    15
            return true;
    16
        }
    17
        return false;
    18
    }

    很好理解,如果state==0,就是说锁没有被占用,就把state原子操作地设置为 1,也就是占用锁,然后记录当前线程。如果请求占用锁的是当前持有锁的线程,就把state加一。其他情况获取不到锁,返回失败。

    这一步如果获取锁成功,就没有后续步骤了,如果失败,就进行第二步。

  2. addWaiter:将当前线程加入上面锁的双向链表(等待队列)中。这里用的是CAS的方式,这一步比较简单就不放源码了。

  3. acquireQueued()

    1
    /**
    2
     * Acquires in exclusive uninterruptible mode for thread already in
    3
     * queue. Used by condition wait methods as well as acquire.
    4
     *
    5
     * @param node the node
    6
     * @param arg the acquire argument
    7
     * @return {@code true} if interrupted while waiting
    8
     */    
    9
    final boolean acquireQueued(final Node node, int arg) {
    10
        boolean interrupted = false;
    11
        try {
    12
            for (;;) {
    13
                final Node p = node.predecessor();
    14
                if (p == head && tryAcquire(arg)) {
    15
                    setHead(node);
    16
                    p.next = null; // help GC
    17
                    return interrupted;
    18
                }
    19
                if (shouldParkAfterFailedAcquire(p, node))
    20
                    interrupted |= parkAndCheckInterrupt();
    21
            }
    22
        } catch (Throwable t) {
    23
            cancelAcquire(node);
    24
            if (interrupted)
    25
                selfInterrupt();
    26
            throw t;
    27
        }
    28
    }

    前面我们已经把当前线程加到队列的队尾了,acquireQueued()的作用就是逐步的去执行等待队列的线程,如果当前线程获取到了锁,则返回;否则,当前线程进行休眠,直到唤醒并重新获取锁了才返回。

    shouldParkAfterFailedAcquire()方法的作用是判断当前线程是否需要被阻塞,具体的判断规则如下:

    • 规则1:如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。parkAndCheckInterrupt()的作用是阻塞当前线程,并且返回“线程被唤醒之后”的中断状态。
    • 规则2:如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false
    • 规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false

    看代码:

    1
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    2
        int ws = pred.waitStatus;// 前继节点的状态
    3
        if (ws == Node.SIGNAL)// 如果前继节点是SIGNAL状态,则意味这当前线程需要被unpark唤醒。此时,返回true。
    4
            /*
    5
             * This node has already set status asking a release
    6
             * to signal it, so it can safely park.
    7
             */
    8
            return true;
    9
        if (ws > 0) {// 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点”  为  “‘原前继节点’的前继节点”。
    10
            /*
    11
             * Predecessor was cancelled. Skip over predecessors and
    12
             * indicate retry.
    13
             */
    14
            do {
    15
                node.prev = pred = pred.prev;
    16
            } while (pred.waitStatus > 0);
    17
            pred.next = node;
    18
        } else {  // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
    19
            /*
    20
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
    21
             * need a signal, but don't park yet.  Caller will need to
    22
             * retry to make sure it cannot acquire before parking.
    23
             */
    24
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    25
        }
    26
        return false;
    27
    }

    如果“规则1”发生,即“前继节点是SIGNAL”状态,则意味着“当前线程”需要被阻塞。接下来会调用parkAndCheckInterrupt()阻塞当前线程,直到当前先被唤醒才从parkAndCheckInterrupt()中返回。parkAndCheckInterrupt()的作用是阻塞当前线程,并且返回“线程被唤醒之后”的中断状态。

三个步骤都完成之后,if()中的判断终于结束了,进入到方法selfInterrupt()。这个就很简单了,当前线程自己执行一个中断。

再回顾整个过程:

  1. 先是通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,再通过acquireQueued()获取锁。
  2. 尝试失败的情况下,会先通过addWaiter()来将当前线程加入到CLH队列末尾;然后调用acquireQueued(),在CLH队列中排序等待获取锁,在此过程中,线程处于休眠状态。直到获取锁了才返回。 如果在休眠等待过程中被中断过,则调用selfInterrupt()来自己产生一个中断。

释放锁过程

释放锁过程相对简单:

1
public void unlock() {
2
    sync.release(1);
3
}

继续追踪:

1
public final boolean release(int arg) {
2
    if (tryRelease(arg)) {
3
        Node h = head;
4
        if (h != null && h.waitStatus != 0)
5
            unparkSuccessor(h);
6
        return true;
7
    }
8
    return false;
9
}

可以看到,先尝试释放锁,这里就是判断下锁的state的数字是不是 0,当前线程是不是锁的持有者。释放成功之后,唤醒等待队列的后续节点,因为已经让出锁了,可以让后续的节点进行操作。

可能遇到的问题和解决办法

死锁

有可能产生死锁的情况:

系统资源的竞争

通常系统中拥有的不可剥夺资源,其数量不足以满足多个线程运行的需要,使得线程在 运行过程中,会因争夺资源而陷入僵局,如磁带机、打印机等。只有对不可剥夺资源的竞争 才可能产生死锁,对可剥夺资源的竞争是不会引起死锁的。

线程推进顺序非法

线程在运行过程中,请求和释放资源的顺序不当,也同样会导致死锁。例如,并发线程 P1、P2 分别保持了资源 R1、R2,而线程 P1 申请资源 R2,线程 P2 申请资源 R1 时,两者都 会因为所需资源被占用而阻塞。

信号量使用不当也会造成死锁。线程间彼此相互等待对方发来的消息,结果也会使得这 些线程间无法继续向前推进。例如,线程A等待线程B发的消息,线程B又在等待线程A 发的消息,可以看出线程A和B不是因为竞争同一资源,而是在等待对方的资源导致死锁。

死锁是由四个必要条件导致的,所以一般来说,只要破坏这四个必要条件中的一个条件,死锁情况就应该不会发生。

  1. 如果想要打破互斥条件,我们需要允许进程同时访问某些资源,这种方法受制于实际场景,不太容易实现条件;
  2. 打破不可抢占条件,这样需要允许进程强行从占有者那里夺取某些资源,或者简单一点理解,占有资源的进程不能再申请占有其他资源,必须释放手上的资源之后才能发起申请,这个其实也很难找到适用场景;
  3. 进程在运行前申请得到所有的资源,否则该进程不能进入准备执行状态。这个方法看似有点用处,但是它的缺点是可能导致资源利用率和进程并发性降低;
  4. 避免出现资源申请环路,即对资源事先分类编号,按号分配。这种方式可以有效提高资源的利用率和系统吞吐量,但是增加了系统开销,增大了进程对资源的占用时间。

面试题

多个线程交替顺序打印

比如两个线程,从零开始,一个打印偶数一个打印奇数。

方法一,轮流休眠唤醒:

1
int times = 100 * 10000;
2
Thread thread1;
3
Thread thread2;
4
5
thread1 = new Thread(() -> {
6
    for (int i = 0; i < times; i++) {
7
        final int index1 = 2 * i;
8
        synchronized (this) {
9
            print("偶数打印:" + index1);
10
            if (i == times - 1)
11
                break;
12
            try {
13
                this.notify();
14
                this.wait();
15
            } catch (InterruptedException e) {
16
                e.printStackTrace();
17
            }
18
        }
19
    }
20
});
21
22
thread2 = new Thread(() -> {
23
    for (int i = 0; i < times; i++) {
24
        final int index2 = 2 * i + 1;
25
        synchronized (this) {
26
            print("奇数打印:" + index2);
27
            if (i == times - 1)
28
                break;
29
            try {
30
                this.notify();
31
                this.wait(100);
32
            } catch (InterruptedException e) {
33
                e.printStackTrace();
34
            }
35
        }
36
    }
37
});
38
thread1.start();
39
thread2.start();
40
thread2.join();

多次测试平均耗时大概 7.7 秒左右。

方法二,利用volitale关键字实现 CAS:

1
private static volatile boolean flag = true;
2
3
public void func2() {
4
    int times = 100 * 10000;
5
    ExecutorService service = Executors.newFixedThreadPool(2);
6
    service.execute(() -> {
7
        int i = 0;
8
        while (i < times) {
9
            if (flag) {
10
                print("偶数打印:" + 2 * i++);
11
                flag = false;
12
            }
13
        }
14
    });
15
    service.execute(() -> {
16
        int i = 0;
17
        while (i < times) {
18
            if (!flag) {
19
                print("奇数打印:" + (1 + 2 * i++));
20
                flag = true;
21
            }
22
        }
23
    });
24
    service.shutdown();
25
    try {
26
        service.awaitTermination(Integer.MAX_VALUE, TimeUnit.HOURS);
27
    } catch (InterruptedException e) {
28
        e.printStackTrace();
29
    }
30
}

多次测试平均时间消耗大概在 3.6 秒,可以看出不用锁会快很多。

方法三,类似方法一,用LockCondition轮流休眠和唤醒,效率略高:

1
int times = 100 * 10000;
2
Lock lock = new ReentrantLock();
3
Condition condition = lock.newCondition();
4
5
Thread thread1 = new Thread(() -> {
6
    int i = 0;
7
    while (i < times) {
8
        lock.lock();
9
        print("偶数打印:" + 2 * i++);
10
        try {
11
            condition.signal();
12
            condition.await();
13
        } catch (InterruptedException e) {
14
            e.printStackTrace();
15
        }
16
        lock.unlock();
17
    }
18
});
19
Thread thread2 = new Thread(() -> {
20
    int i = 0;
21
    while (i < times) {
22
        lock.lock();
23
        print("奇数打印:" + (1 + 2 * i++));
24
        if (i == times)
25
            return;
26
        try {
27
            condition.signal();
28
            condition.await();
29
        } catch (InterruptedException e) {
30
            e.printStackTrace();
31
        }
32
        lock.unlock();
33
    }
34
});
35
thread1.start();
36
thread2.start();
37
try {
38
    thread2.join();
39
} catch (InterruptedException e) {
40
    e.printStackTrace();
41
}

多次测试执行时间在 6.4 毫秒左右。

线程池

Java 通过 Executors 提供四种线程池,分别为:

  • newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  • newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • newScheduledThreadPool创建一个定长线程池,支持定时及周期性任务执行。
  • newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

后面还新增了两个:

  • newSingleThreadScheduledExecutor单线程的任务队列。

  • newWorkStealingPool工作窃取线程池,内部通过 ForkJoinPool 来实现,通过切分任务让所有 CPU 都不闲置。

在阿里巴巴Java开发手册中也明确指出,是『不允许』使用Executors创建线程池。

避免使用 Executors 创建线程池,主要是避免使用其中的默认实现,那么我们可以自己直接调用ThreadPoolExecutor的构造函数来自己创建线程池。在创建的同时,给BlockQueue指定容量就可以了。

应用

java.util.concurrent包

  • 数据结构:ConcurrentHashMap, BlockingQueue 系列
  • 线程池:Executor, ExecutorService, ThreadPoolExecutor, ScheduledThreadPoolExecutor, Executors(工厂类), Callable, Runnable, Future
  • 锁:ReentrantLock, ReentrantReadWriteLock, Condition
  • 线程同步:CountDownLatch, CyclicBarrier

本来想一篇都写写,发现内容实在太多了,这部分后续再单开博客详细论述吧…这篇够长了。

参考链接

Java多线程学习(一)Java多线程入门

java-线程中start和run的区别

Java多线程学习(吐血超详细总结)

Java并发编程:Callable、Future和FutureTask

Java 之 synchronized 详解

Java并发编程:Lock

java中几种锁,分别是什么?

Thread的中断机制(interrupt)

Java并发——关键字synchronized解析