🚀本篇章代码 Demo

# 多线程概念

  • 在现代操作系统中启动某一个程序时,操作系统就会为其创建一个进程,而在操作系统中调度最小单位是线程,也被称为轻量级进程,在一个进程里面可以创建多个线程,它们都拥有各自的计数器、栈、堆、局部变量等属性,并且可以访问共享内存变量。
  • 进程:操作系统中正在运行的程序,是系统进行资源分配和调用的独立单位,每一个进程都拥有它自己的内存空间和系统资源。
  • 线程:线程是进程中的单个顺序控制流,一个进程如果只有一条执行路径则被称为单线程程序,而如果一个进程有多条执行路径则被称为多线程程序。

# 多线程创建 & 启动

多线程有两种创建方法,第一种是继承 Thread 类并重写 run 方法,第二种是实现 Runnable 接口并重写 run 方法。

第一种继承 Thread 类并重写 run 方法,示例代码如下:

InheritThread.java
package top.rem.rain.demo1;
/**
 * @Author: LightRain
 * @Description: 继承 Thread 实现线程
 * @DateTime: 2024-01-07 17:52
 * @Version:1.0
 **/
public class InheritThread extends Thread {
    /**
     * 重写 run 方法
     */
    @Override
    public void run() {
        System.out.println("第一种继承Thread实现线程。");
    }
}

第二种实现 Runnable 接口并重写 run 方法,示例代码如下:

ImplementRunnable.java
package top.rem.rain.demo1;
/**
 * @Author: LightRain
 * @Description: 实现 Runnable 接口来实现线程
 * @DateTime: 2024-01-07 17:54
 * @Version:1.0
 **/
public class ImplementRunnable implements Runnable {
    /**
     * 使用 Runnable 接口必须重写 run 方法
     */
    @Override
    public void run() {
        System.out.println("第二种实现Runnable接口并重写run方法。");
    }
}

线程启动代码如下:

ThreadTest.java
package top.rem.rain.demo1;
/**
 * @Author: LightRain
 * @Description: 线程测试
 * @DateTime: 2024-01-07 17:58
 * @Version:1.0
 **/
public class ThreadTest {
    public static void main(String[] args) {
        // 第一种继承 Thread 的启动方法
        InheritThread inheritThread = new InheritThread();
        // 启动线程
        inheritThread.start();
        // 第二种实现 Runnable 接口的启动方法
        ImplementRunnable implementRunnable = new ImplementRunnable();
        Thread thread = new Thread(implementRunnable);
        // 启动线程
        thread.start();
        /*
        执行结果:
        第一种继承 Thread 并重写 run 方法实现线程。
        第二种实现 Runnable 接口并重写 run 方法。
         */
    }
}

冷知识:当调用 start() 方法时并不是立即执行多线程的代码,而是使该线程变为可运行状态,什么时候运行多线程代码是由操作系统来决定的。

# 中断线程

首先什么是中断线程?当线程的 run() 方法执行方法体中的最后一条语句后并经由执行 return 语句返回时,或出现方法中没有捕获的异常时线程将终止。在 Java 早期版本中有一个 stop() 方法,其它线程可以调用它进行终止操作,但现在这个方法已经被弃用了,原因是这个方法会造成一些线程不安全问题。

可以将中断理解为一个 flag 属性,它表示一个运行中的线程是否被其它线程进行了中断操作,其它线程通过调用该线程的 interrupt() 方法对其进行中断操作,当一个线程调用 interrupt() 方法时,线程中断状态的 flag 将被改变,这是每个线程都具有的 boolean 类型的标志,每个线程都应该不时的检查这个标志来判断线程是否被中断,判断线程是否被中断的代码是: Thread.currentThread().isInterrupted() 默认没有被中断的状态下是 false

如果当前线程处于阻塞状态 (sleep|wait) , 就会无法检测中断状态,此时将会抛出 InterruptedException(中断异常) 。如果在每次迭代之后都调用 sleep() 方法或其它线程中断方法,此时 isInterrupted() 检测就没有必要了,如果在中断状态被改变时调用 sleep() 方法,它不会休眠反而会清除这一休眠状态并抛出 InterruptedException 异常。所以如果在循环中调用 sleep() 方法时不要去检测中断状态,只需要捕获 InterruptedException 异常即可,示例代码如下:

@Override
public void run() {
    while(flag){
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // 线程在睡眠期间中断
            e.printStackTrace();
        }finally{
           // ...
        }
    }
}

注意:在捕捉中断异常时,不要在 catch 块中什么都不处理。

不正确的处理方式如下:

public void test() {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
    }
}

正确的处理方式 1:

public void test()throw InterruptedException{
    Thread.sleep(5000);
}

正确的处理方式 2:

public void test() {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        // 中断当前线程
        Thread.currentThread().interrupt();
    }
}

关于中断线程的一些主要方法:

返回类型方法名称方法说明
voidinterrupt()向线程发送中断请求,线程中断状态将被设置为 true , 如果当前线程被一个 sleep() 调用阻塞,将会抛出 InterruptedException(中断异常)
static booleaninterrupted()测试当前正在执行的线程是否被中断,这是一个静态方法,调用这个方法将会产生一个副作用,那就是它会将当前线程的中断重置为 false
booleanisInterrupted()判断线程是否被中断,当前方法调用不会产生副作用 (不会改变线程当前的中断状态)。
static native ThreadcurrentThread()返回当前执行线程的 Thread 对象。

# 守护线程

守护线程的唯一作用就是为其它线程提供服务,可以使用 thread.setDaemon(true) 的方法将线程转为守护线程,计时线程就是一个典型例子,它定时发送 (计时器) 信号来告诉其它线程去执行某项任务。当只剩下守护线程时,虚拟机将会退出,因为如果只剩下守护线程,程序就没有必要执行了。

另外 JVM 的垃圾回收、内存管理等都是守护线程,最后要注意的就是在 Java 虚拟机退出时线程中的 finally 代码块并不一定会执行,因此在使用线程时,不能依靠 finally 代码块来确保执行关闭或清理占用资源的逻辑,代码示例如下:

GuardianThread.java
package top.rem.rain.demo1;
/**
 * @Author: LightRain
 * @Description: 守护线程示例
 * @DateTime: 2024-01-08 01:26
 * @Version:1.0
 **/
public class GuardianThread {
    public static void main(String[] args) {
        Thread thread = new Thread(new GuardianThreadRunner(), "GuardianThreadRunner");
        // 设置为守护线程
        thread.setDaemon(true);
        // 启动线程
        thread.start();
    }
    
    static class GuardianThreadRunner implements Runnable{
        @Override
        public void run() {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally{
                System.out.println("线程中的finally代码块并不一定会执行。");
            }
        }
    }
}

# 线程优先级

什么是线程优先级?在现代操作系统中基本采用时分的形式调度运行的线程,操作系统会分出一个个时间片,线程会分配到若干时间片,当线程的时间片用完了就会发生线程调度,并等待着下一次分配。线程分配到的时间片多少也决定了线程使用处理器资源的多少,而线程优先级就是决定线程需要多少或者少分配一些处理器资源的线程属性。

Java 线程中,通过一个整形成员变量 priority 来控制优先级,优先级的范围从 1 ~ 10 ,在线程构建的时候可以通过 serPriority() 方法来修改优先级,默认优先级是 5 ,优先级高的线程分配时间片的数量要多于优先级低的线程。设置线程优先级时,针对频繁阻塞 (休眠或者 I/O 操作) 的线程需要设置较高优先级,而偏重计算 (需要较多 CPU 时间或者偏运算) 的线程则设置较低的优先级,确保处理器不会被独占。在不同的 JVM 以及操作系统上,线程规划会存在差异,有些操作系统甚至会忽略对线程优先级的设定。 (Java并发编程的艺术)

Thread 类中有三种静态常量的优先级,分别是: Thread.MIN_PRIORITY = 1Thread.NORM_PRIORITY = 5Thread.MAX_PRIORITY = 10

需要注意的是:不要太过于依赖优先级,如果确实要用,则应该避免常犯的一个错误,如果有几个高优先级的线程没有进入非活动状态,优先级低的线程可能永远也不会执行。每当调度器决定运行一个新线程时,首先会在具有高优先级的线程中进行选择,尽管这样使低优先级的线程可能永远不会被执行到。因此我们需要在设置优先级时,针对频繁阻塞的线程需要设置较高的优先级,而偏重计算的线程则设置较低的优先级,这样才能确保处理器不会被长久独占。

# 线程状态转化关系

  • New(新建状态) :新建一个线程对象。
  • Runnable(就绪状态) :在线程对象创建后,其它线程调用该对象的 start() 方法,该状态的线程位于可运行线程池中变得可运行并等待 CPU 的使用权。
  • Running(运行状态) :就绪状态的线程获取了 CPU 并执行程序代码。
  • Blocked(阻塞状态) :阻塞状态是线程因为某种原因放弃 CPU 使用权,暂时停止运行,直到线程进入就绪状态下才有机会转到运行状态。阻塞状态分为三种情况:
    • Waiting(等待阻塞) :运行的线程执行 wait() 方法, JVM 会把该线程放入等待池中。
    • Blocked(同步阻塞) :运行的线程在获取对象的同步锁时,若该同步锁被别的线程占用则 JVM 会把该线程放入锁池中。
    • TimeWaiting(超时阻塞) :运行的线程执行 sleep()join() 方法以及发出 I/O 请求时, JVM 会把该线程置为阻塞状态。
  • Dead(死亡状态) :线程执行完毕或因异常退出了 run() 方法时该线程将结束生命周期。

pFSNgzt.png

上图中的方法解析如下:

  • Thread.sleep() :在指定时间内让当前正在执行的线程暂停工作,但不会释放 锁标志 ,不推荐使用。
  • Thread.sleep(long) :使当前线程进入阻塞状态,在指定时间内不会执行。
  • Object.notify() :从对象等待池中唤醒其中一个线程。
  • Object.notifyAll() :从对象等待池中唤醒所有等待线程。
  • Thread.yieId() :暂停当前正在执行的线程对象, yieId() 方法只是使当前线程重新回到可执行状态,所以执行 yieId() 方法的线程有可能再进入到可执行状态后马上又被执行, yieId() 方法只能使同优先级或更高优先级的线程有机会执行。
  • Thread.join() :把指定的线程加入到当前线程,可以将两个交替执行的线程合并为顺序执行的线程。如:在线程 B 中调用了线程 Ajoin() 方法,那么直到线程 A 执行完毕后才会继续执行线程 B
  • Object.wait()/Object.wait(long) :在其它线程调用对象的 notify()notifyAll() 方法前,导致当前线程等待,线程会释放掉他所占用的 锁标志 ,从而使别的线程有机会抢占该锁,当前线程必须拥有当前对象锁,如果当前线程不是此锁的拥有者将会抛出 IllegalMonitorStateException 异常。唤醒当前对象锁的等待线程使用 notify()notifyAll() 方法,也必须拥有相同的对象锁否则也会抛出 IllegalMonitorStateException 异常, waite()notify() 方法必须在 synchronized 方法或者是 synchronized 代码块中调用,如果不在 synchronized 方法或代码块中进行调用则会在运行期间抛出 IllegalMonitorStateException 异常。

# 线程同步问题

线程同步问题的产生,先来看下面示例代码的卖票系统:

package top.rem.rain.demo1;
/**
 * @Author: LightRain
 * @Description: 卖票类 会产生线程同步问题
 * @DateTime: 2024-01-08 18:47
 * @Version:1.0
 **/
public class SellTicket implements Runnable {
  /**
   * 当前票数
   */
  private int num = 100;
  @Override
  public void run() {
    while (true) {
      try {
        if (num > 0) {
          Thread.sleep(10);
          // 打印卖票信息
          System.out.println(Thread.currentThread().getName() + "......" + num--);
        } else {
          break;
        }
      } catch (InterruptedException ignored) {
      }
    }
  }
  public static void main(String[] args) {
    SellTicket sellTicket = new SellTicket();
    // 创建 4 个线程同时卖票
    Thread t1 = new Thread(sellTicket);
    Thread t2 = new Thread(sellTicket);
    Thread t3 = new Thread(sellTicket);
    Thread t4 = new Thread(sellTicket);
    // 启动线程
    t1.start();
    t2.start();
    t3.start();
    t4.start();
  }
}
执行结果
...............
Thread-2......6
Thread-0......5
Thread-1......4
Thread-0......2
Thread-2......3
Thread-3......4
Thread-0......1
Thread-2......1
Thread-1......1
Thread-3......0

从执行结果中可以看出三个售票窗口同时卖出了 1 号票,这当然是不正确的逻辑,这个问题就是线程同步问题,不同的线程都对同一个数据进行操作就很容易导致数据错乱问题,也就是所谓的线程不同步。

在解决这个问题之前先来分析一下为什么会产生这种结果?当声明一个 SellTicket 线程类,在这个类中又声明了一个成员变量 num , 然后通过 run() 方法不断的去获取 num 的值并输出,在输出时并且又进行了 num-1 操作,最后在 main 方法中创建了四个线程同时操作这个数据,当程序运行后就出现了线程同步问题,可以看出产生线程同步问题的条件有两个:第一种是多个线程在操作共享数据 (num) , 第二种是操作共享数据的线程有多条。

Java 中有两种防止线程安全问题的发生, Java 提供了一个 synchronized 关键字来解决这个问题,同时在 JavaSE 5.0 中还引入了 Lock 锁对象的相关类。线程同步问题的解决

# 锁对象的介绍

# Lock - 通过锁

Lock 锁对象,在 Java 中锁是用来控制多个线程访问共享数据的方式,一个锁能够防止多个线程同时访问共享数据 (但读写锁可以允许多个线程同时访问共享数据),在 LOck 接口出现之前, Java 是靠 synchronized 关键字实现的锁功能,在 JavaSE 5.0 之后并发包中新增了 Lock 接口用来实现锁功能,它提供了与 synchronized 关键字类似的同步功能,只是在使用时需要显式地获取和释放锁,缺点就是缺少像 synchronized 那样隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性,可中断获取锁以及超时获取锁等多种 synchronized 关键字所不具备的同步特性。

返回类型方法名称方法说明
voidlock()获取锁,调用该方法会使当前线程获取锁,当获取锁后从该方法返回。
voidlockInterruptibly()可中断获取锁和 lock() 方法不同的是该方法会响应中断,即在获取锁中可以中断当前线程。
booleantryLock()尝试非阻塞获取锁,调用该方法后立即返回,如果能够获取锁则返回 true 否则返回 false
booleantryLock(long time, TimeUnit unit)超时获取锁,当前线程会在三种情况下返回,1. 当前线程在超时时间内获取了锁,2. 当前线程在超时时间被中断,3. 当前线程超时时间结束将返回 false
voidunlock()释放锁。
ConditionnewCondition()条件对象,获取等待通知组件,该组件和当前的锁绑定,当前线程只有获取了锁,才可以调用该组件的 await() 方法,在调用后当前线程将释放锁。

在后面将结合 Lock 接口的子实现类 ReentrantLock 类来使用某些方法。

# ReentrantLock - 重入锁

重入锁就是支持重新进入的锁,它表示该锁能够支持一个线程对资源的重复加锁,就是说在调用 lock() 方法时,已经获取到锁的线程并且能够再次调用 lock() 方法获取锁而不被阻塞,同时还支持获取锁的公平性和非公平性。公平性是在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平锁反之是不公平锁,示例代码如下:

ReentrantLockDemo.java
package top.rem.rain.demo1;
import java.util.concurrent.locks.ReentrantLock;
/**
 * @Author: LightRain
 * @Description: 类似于 synchronized 的同步执行
 * @DateTime: 2024-01-09 01:50
 * @Version:1.0
 **/
public class ReentrantLockDemo {
    public static void main(String[] args) {
        /// 参数默认 false,不公平锁  
        ReentrantLock lock = new ReentrantLock();
        // 公平锁
        ReentrantLock lock2 = new ReentrantLock(true); 
        // 如果被其它资源锁定,会在此等待锁释放,达到暂停效果
        lock.lock();
        try {
            // 操作  
        } finally {
            lock.unlock();  // 释放锁
        }
    }
}
ReentrantLockDemo2.java
package top.rem.rain.demo1;
import java.util.concurrent.locks.ReentrantLock;
/**
 * @Author: LightRain
 * @Description: 防止重复执行代码
 * @DateTime: 2024-01-09 01:53
 * @Version:1.0
 **/
public class ReentrantLockDemo2 {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        // 如果已经被 lock,则立即返回 false 不会等待,达到忽略操作的效果
        if (lock.tryLock()) {
            try {
                // 操作
            } finally {
                lock.unlock();
            }
        }
    }
}
ReentrantLockDemo3.java
package top.rem.rain.demo1;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
 * @Author: LightRain
 * @Description: 尝试等待执行
 * @DateTime: 2024-01-09 02:03
 * @Version:1.0
 **/
public class ReentrantLockDemo3 {
    public static void main(String[] args) {
        // 公平锁
        ReentrantLock lock = new ReentrantLock(true);
        try {
            if (lock.tryLock(3, TimeUnit.SECONDS)) {
                // 如果已经被 lock,尝试等待 3s,看是否可以获得锁,如果 5s 后仍然无法获得锁则返回 false 继续执行
                try {
                    // 操作
                } finally {
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            // 当前线程被中断时 (interrupt),会抛 InterruptedException
            e.printStackTrace();
        }
    }
}

注意:需要把解锁操作放在 finally 代码块中的第一行的位置这个十分重要。如在临界区的代码抛出异常,锁必须被释放否则其它线程将永远被阻塞。

# 线程同步问题的解决

# ReentrantLock

下面是通过 ReentrantLock 来解决卖票的线程同步问题,代码如下:

SellTicketLock.java
package top.rem.rain.demo1;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * @Author: LightRain
 * @Description: 卖票类:使用 ReentrantLock 解决线程同步问题
 * @DateTime: 2024-01-08 18:47
 * @Version:1.0
 **/
public class SellTicketLock implements Runnable {
    /**
     * 创建锁对象
     */
    private Lock lock = new ReentrantLock();
    /**
     * 当前票数
     */
    private int num = 100;
    @Override
    public void run() {
        while (true) {
            try {
                // 获取锁
                lock.lock();
                if (num > 0) {
                    Thread.sleep(10);
                    // 打印卖票信息
                    System.out.println(Thread.currentThread().getName() + "......" + num--);
                } else {
                    break;
                }
            } catch (InterruptedException e) {
                // 出现异常就中断
                Thread.currentThread().interrupt();
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) {
        SellTicketLock sellTicket = new SellTicketLock();
        // 创建 4 个线程同时卖票
        Thread t1 = new Thread(sellTicket);
        Thread t2 = new Thread(sellTicket);
        Thread t3 = new Thread(sellTicket);
        Thread t4 = new Thread(sellTicket);
        // 启动线程
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}
执行结果
Thread-0......100
Thread-0......99
Thread-0......98
Thread-0......97
Thread-0......96
Thread-0......95
Thread-0......94
Thread-0......93
Thread-0......92
Thread-0......91
Thread-0......90
Thread-0......89
Thread-0......88
Thread-0......87
Thread-0......86
Thread-0......85
Thread-0......84
Thread-0......83
Thread-0......82
Thread-0......81
Thread-0......80
Thread-0......79
Thread-0......78
Thread-0......77
Thread-0......76
Thread-0......75
Thread-0......74
Thread-0......73
Thread-0......72
Thread-0......71
Thread-0......70
Thread-0......69
Thread-0......68
Thread-0......67
Thread-0......66
Thread-0......65
Thread-0......64
Thread-0......63
Thread-0......62
Thread-0......61
Thread-0......60
Thread-0......59
Thread-0......58
Thread-0......57
Thread-0......56
Thread-0......55
Thread-0......54
Thread-0......53
Thread-0......52
Thread-0......51
Thread-0......50
Thread-0......49
Thread-0......48
Thread-0......47
Thread-0......46
Thread-0......45
Thread-0......44
Thread-0......43
Thread-0......42
Thread-0......41
Thread-0......40
Thread-0......39
Thread-0......38
Thread-0......37
Thread-0......36
Thread-0......35
Thread-0......34
Thread-0......33
Thread-0......32
Thread-0......31
Thread-0......30
Thread-0......29
Thread-0......28
Thread-0......27
Thread-0......26
Thread-0......25
Thread-0......24
Thread-0......23
Thread-0......22
Thread-0......21
Thread-0......20
Thread-0......19
Thread-0......18
Thread-0......17
Thread-0......16
Thread-0......15
Thread-0......14
Thread-0......13
Thread-0......12
Thread-0......11
Thread-0......10
Thread-0......9
Thread-0......8
Thread-0......7
Thread-0......6
Thread-0......5
Thread-0......4
Thread-0......3
Thread-0......2
Thread-0......1
进程已结束,退出代码 0

从执行结果中可以看出当前的执行结果没有卖出重复的车票,线程安全问题就此解决。

# synchronized

Java 中可以使用 synchronized 关键字来简化多线程中的代码,并且还可以解决线程安全问题,自 JavaSE 1.0 开始, Java 中的每一个对象都有一个内部锁,如果一个方法使用 synchronized 关键字进行声明,那么这个对象将会保护整个方法,也就是说在调用该方法时线程必须获得内部对象锁才可以进一步执行。

public synchronized void method(){
  //...
}
// ↑等价于↓
private Lock lock = new ReentrantLock();
public void method(){
    lock.lock();
    try{
        //.......
    }finally{
        lock.unlock();
    }
}

可以看的出来使用 synchronized 关键字代码简洁很多,我们必须知道每个对象都有一个内部锁,并且该锁有一个内部条件,由锁来管理那些试图进入 synchronized 方法的线程,由条件来管理那些调用 wait()/notify()/notifyAll() 的线程。同时还需要明白另一件事就是一旦有一个线程通过 synchronized 方法获取到内部锁,该类的所有 synchronized 方法或代码块都无法被其它线程访问,直到当前线程释放了内部锁。

synchronized 同步代码块如下:

Object obj = new Object();
synchronized(obj){
    // 需要同步的代码
}
//obj 是对象锁,它可以是任意对象,也可以使用 this 则代表使用当前对象作为锁对象
synchronized(this){
    // 需要同步的代码
}

下面是使用 synchronized 代码块来解决线程同步问题,代码如下:

SellTicketSynchronized.java
package top.rem.rain.demo1;
/**
 * @Author: LightRain
 * @Description: 卖票类 使用 synchronized 解决线程同步问题
 * @DateTime: 2024-01-08 18:47
 * @Version:1.0
 **/
public class SellTicketSynchronized implements Runnable {
    /**
     * 当前票数
     */
    private int num = 100;
    @Override
    public void run() {
        synchronized (this) {
            while (true) {
                try {
                    if (num > 0) {
                        Thread.sleep(10);
                        // 打印卖票信息
                        System.out.println(Thread.currentThread().getName() + "......" + num--);
                    } else {
                        break;
                    }
                } catch (InterruptedException ignored) {
                }
            }
        }
    }
    public static void main(String[] args) {
        SellTicketSynchronized sellTicket = new SellTicketSynchronized();
        // 创建 4 个线程同时卖票
        Thread t1 = new Thread(sellTicket);
        Thread t2 = new Thread(sellTicket);
        Thread t3 = new Thread(sellTicket);
        Thread t4 = new Thread(sellTicket);
        // 启动线程
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}
执行结果
C:\LightRainData\IDEA\JDK\JDK-17.0.6\bin\java.exe "-javaagent:C:\LightRainData\IDEA\IntelliJ IDEA 2022.3.2\lib\idea_rt.jar=1866:C:\LightRainData\IDEA\IntelliJ IDEA 2022.3.2\bin" -Dfile.encoding=UTF-8 -classpath D:\ 项目 \gitee\multi-threaded-example\target\classes top.rem.rain.demo1.SellTicketSynchronized
Thread-0......100
Thread-0......99
Thread-0......98
Thread-0......97
Thread-0......96
Thread-0......95
Thread-0......94
Thread-0......93
Thread-0......92
Thread-0......91
Thread-0......90
Thread-0......89
Thread-0......88
Thread-0......87
Thread-0......86
Thread-0......85
Thread-0......84
Thread-0......83
Thread-0......82
Thread-0......81
Thread-0......80
Thread-0......79
Thread-0......78
Thread-0......77
Thread-0......76
Thread-0......75
Thread-0......74
Thread-0......73
Thread-0......72
Thread-0......71
Thread-0......70
Thread-0......69
Thread-0......68
Thread-0......67
Thread-0......66
Thread-0......65
Thread-0......64
Thread-0......63
Thread-0......62
Thread-0......61
Thread-0......60
Thread-0......59
Thread-0......58
Thread-0......57
Thread-0......56
Thread-0......55
Thread-0......54
Thread-0......53
Thread-0......52
Thread-0......51
Thread-0......50
Thread-0......49
Thread-0......48
Thread-0......47
Thread-0......46
Thread-0......45
Thread-0......44
Thread-0......43
Thread-0......42
Thread-0......41
Thread-0......40
Thread-0......39
Thread-0......38
Thread-0......37
Thread-0......36
Thread-0......35
Thread-0......34
Thread-0......33
Thread-0......32
Thread-0......31
Thread-0......30
Thread-0......29
Thread-0......28
Thread-0......27
Thread-0......26
Thread-0......25
Thread-0......24
Thread-0......23
Thread-0......22
Thread-0......21
Thread-0......20
Thread-0......19
Thread-0......18
Thread-0......17
Thread-0......16
Thread-0......15
Thread-0......14
Thread-0......13
Thread-0......12
Thread-0......11
Thread-0......10
Thread-0......9
Thread-0......8
Thread-0......7
Thread-0......6
Thread-0......5
Thread-0......4
Thread-0......3
Thread-0......2
Thread-0......1
进程已结束,退出代码 0

这样就可以解决线程安全问题了,当然代码同步是要牺牲效率为前提的,同步的好处是解决了线程安全问题,同步的弊端是降低了执行效率,因为同步外的线程都会判断同步锁,同步的前提是同步中必须有多个线程并使用同一个锁。

# 线程间的通信

线程在运行时都拥有自动的栈空间,但如果每个线程在运行中仅仅是孤立地运行,那么这就将会变得毫无价值,如果多线程能够相互配合完成工作,这将带来巨大的价值,这也就是所谓的线程间的通信,在 Java 中多线程的通信使用的是等待 / 通知机制来实现的。

# 等待 / 通知 - synchronized

synchronized 等待 / 通知是指一个线程 A 调用了 Object 类中的 wait() 方法进入等待状态,而另一个线程 B 调用了 Object 类中的 notify()/notifyAll() 方法,当线程 A 收到通知后从对象 Objectwait() 方法返回进而执行后续操作。

上述中的两个线程通过 Object 对象来完成交互,而对象上的 wait()/notify()/notifyAll() 的关系就跟信号开关一样,用来完成等待方和通知方之间的交互。

等待 / 通知的机制主要用到的是 notify()notifyAll()wait()wait(long)wait(long,int) 这些方法,当然这是针对 synchronized 关键字修饰的方法或者代码块,因为要使用这些方法的前提是对调用对象加锁,也就是只能在同步方法或同步代码块中来使用。

# 等待 / 通知 - 条件对象

条件对象就是前面的 Lock 锁对象,通过锁对象的条件来实现等待 / 通知机制。

// 创建条件对象
Condition condition = lock.newCondition();

返回的对象就是与该锁 (lock) 相关的条件对象,条件对象 API 如下:

返回类型方法名称方法说明
voidawait()将该线程放到条件等待池中 (对应 wait() 方法)。
voidsignal()从该条件等待池中随机选择一个线程解除其阻塞状态 (对应 notify() 方法)。
voidsignalAll()解除该条件等待池中的所有线程阻塞状态 (对应 notifyAll() 方法)。

上述方法过程分析:当一个线程 A 调用了条件对象的 await() 方法进入等待状态,而另一个线程 B 调用了条件对象的 signal()signalAll() 方法,线程 A 收到通知后从条件对象的 await() 方法返回并执行后续操作。

这就是两个线程通过条件对象来完成交互,而对象上的 await()signal() 以及 signalAll() 的关系如同信号开关一样,用来完成等待方和通知方之间的交互。当然这样的操作都是必须基于对象锁,当前线程只有获取了锁才可以调用该条件对象的 await() 方法,调用后当前线程将释放锁。

注意:在上述两种等待 / 通知机制中无论是调用 signal()signalAll() 方法还是调用 notify() 以及 notifyAll() 方法都不会立即激活一个等待线程,它们仅仅只是解除等待线程的阻塞状态,以便这些线程可以在当前线程解锁或退出同步方法后,通过争夺 CPU 执行权来实现对象的访问。

# 生产者 & 消费者模式

# 单生产者 & 单消费者

单生产者与单消费者模式就是一个线程生产一个线程消费,下面是等待 / 通知机制下的单生产者与单消费者模式,如是卖北京烤鸭的一个店铺,现在只有一条生产线并只有一条消费路径,就是说只能等生产线生产完成后再通知消费路径去卖,如果消费路径没有烤鸭了就必须通知生产线去生产,此时消费路径将进入等待状态。

在这种场景下我们不仅要保证共享数据 (烤鸭数量) 的线程安全,而且还要保证烤鸭数量在消费之前必须有烤鸭,代码如下:

PekingDuck.java
package top.rem.rain.demo2;
/**
 * @Author: LightRain
 * @Description: 北京烤鸭 - 单生产者与单消费者
 * @DateTime: 2024-01-09 15:38
 * @Version:1.0
 **/
public class PekingDuck {
    /**
     * 烤鸭数量
     */
    private int count = 0;
    /**
     * 烤鸭名称
     */
    private String pekingDuckName;
    /**
     * 线程等待标志
     */
    private boolean flag = false;
    /**
     * 生产烤鸭
     *
     * @param pekingDuckName 烤鸭名称
     */
    public synchronized void production(String pekingDuckName) {
        if (flag) {
            try {
                // 有烤鸭进入等待状态
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 设置烤鸭名称和数量
        this.pekingDuckName = pekingDuckName;
        // 数量 + 1
        count++;
        System.out.println(Thread.currentThread().getName() + ".生产者:" + pekingDuckName + " " + count);
        // 有烤鸭后改变状态
        flag = true;
        // 通知消费线程来消费烤鸭
        notify();
    }
    /**
     * 消费烤鸭
     */
    public synchronized void consumption() {
        // 没有烤鸭就进入等待状态
        if (!flag) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 消费一只烤鸭
        System.out.println(Thread.currentThread().getName() + ".消费者:" + this.pekingDuckName + " " + count);
        // 没有烤鸭后改变状态
        flag = false;
        // 通知生产线程来生产烤鸭
        notify();
    }
    /**
     * 生产者线程
     */
    static class ProducerThread implements Runnable {
        private final PekingDuck pekingDuck;
        public ProducerThread(PekingDuck pekingDuck) {
            this.pekingDuck = pekingDuck;
        }
        @Override
        public void run() {
            while (true) {
                pekingDuck.production("北京烤鸭");
            }
        }
    }
    /**
     * 消费者线程
     */
    static class ConsumerThread implements Runnable {
        private final PekingDuck pekingDuck;
        public ConsumerThread(PekingDuck pekingDuck) {
            this.pekingDuck = pekingDuck;
        }
        @Override
        public void run() {
            while (true) {
                pekingDuck.consumption();
            }
        }
    }
    public static void main(String[] args) {
        // 创建烤鸭店铺对象
        PekingDuck pekingDuck = new PekingDuck();
        // 创建生产者
        ProducerThread producerThread = new ProducerThread(pekingDuck);
        // 创建消费者
        ConsumerThread consumerThread = new ConsumerThread(pekingDuck);
        // 创建生产者线程
        Thread t1 = new Thread(producerThread);
        // 创建消费者线程
        Thread t2 = new Thread(consumerThread);
        // 启动线程
        t1.start();
        t2.start();
    }
}

在上面的类中有两个 synchronized 同步方法,一个是生产烤鸭,另一个是消费烤鸭,之所以需要使用同步是因为我们操作了共享数据 count ,同时为了保证生产烤鸭后才可以进行消费,使用了等待 / 通知机制的 wait()notify() 方法。

当第一次运行生产时调用了生产方法,此时有 0 只烤鸭,即 flag = false 此时不需要等待消费者来消费,因为当前没有可消费的烤鸭,设置烤鸭名称然后烤鸭数量 +1 ,此时就生产了一只烤鸭,随后改变 flag = true , 同时通知消费线程可以来消费烤鸭了,即使此时生产线程再次抢到了执行权因为当前的 flag = true , 所以生产线程会进入等待的阻塞状态,消费者线程被唤醒后进入消费方法,等消费完成后又将 flag 的值设置为了 false 状态,通知生产线程可以再生产烤鸭了,以此循环,只要卖出一只我就生产一只。

main 方法中创建了两个线程,一个是生产线程,一个是消费线程,当开启这两个线程后就会不断的生产消费一直循环,下面是执行结果。

执行结果
C:\LightRainData\IDEA\JDK\JDK-17.0.6\bin\java.exe "-javaagent:C:\LightRainData\IDEA\IntelliJ IDEA 2022.3.2\lib\idea_rt.jar=8168:C:\LightRainData\IDEA\IntelliJ IDEA 2022.3.2\bin" -Dfile.encoding=UTF-8 -classpath D:\ 项目 \gitee\multi-threaded-example\target\classes top.rem.rain.demo2.PekingDuck
Thread-0. 生产者:北京烤鸭 1
Thread-1. 消费者:北京烤鸭 1
Thread-0. 生产者:北京烤鸭 2
Thread-1. 消费者:北京烤鸭 2
Thread-0. 生产者:北京烤鸭 3
Thread-1. 消费者:北京烤鸭 3
Thread-0. 生产者:北京烤鸭 4
Thread-1. 消费者:北京烤鸭 4
Thread-0. 生产者:北京烤鸭 5
Thread-1. 消费者:北京烤鸭 5
Thread-0. 生产者:北京烤鸭 6
Thread-1. 消费者:北京烤鸭 6
Thread-0. 生产者:北京烤鸭 7
Thread-1. 消费者:北京烤鸭 7
........................
进程已结束,退出代码 130

从执行结果中可以看出当生产一只烤鸭就会消费一只烤鸭,运行完全正常,这就是单生产者与单消费者模式,下面将使用对象锁的方式来修改,代码如下:

PekingDuckLock.java
package top.rem.rain.demo2;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * @Author: LightRain
 * @Description: 北京烤鸭 - 单生产者与单消费者
 * @DateTime: 2024-01-09 15:38
 * @Version:1.0
 **/
public class PekingDuckLock {
    /**
     * 烤鸭数量
     */
    private int count = 0;
    /**
     * 烤鸭名称
     */
    private String pekingDuckName;
    /**
     * 线程等待标志
     */
    private boolean flag = false;
    /**
     * 创建锁对象
     */
    private final Lock lock = new ReentrantLock();
    /**
     * 创建条件对象
     */
    private final Condition condition = lock.newCondition();
    /**
     * 生产烤鸭
     *
     * @param pekingDuckName 烤鸭名称
     */
    public void production(String pekingDuckName) {
        // 获取锁
        lock.lock();
        try {
            if (flag) {
                try {
                    // 有烤鸭进入等待状态
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 设置烤鸭名称和数量
            this.pekingDuckName = pekingDuckName;
            // 数量 + 1
            count++;
            System.out.println(Thread.currentThread().getName() + ".生产者:" + pekingDuckName + " " + count);
            // 有烤鸭后改变状态
            flag = true;
            // 通知消费线程来消费烤鸭
            condition.signal();
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
    /**
     * 消费烤鸭
     */
    public void consumption() {
        // 获取锁
        lock.lock();
        try {
            // 没有烤鸭就进入等待状态
            if (!flag) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 消费一只烤鸭
            System.out.println(Thread.currentThread().getName() + ".消费者:" + this.pekingDuckName + " " + count);
            // 没有烤鸭后改变状态
            flag = false;
            // 通知生产线程来生产烤鸭
            condition.signal();
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
    /**
     * 生产者线程
     */
    static class ProducerThread implements Runnable {
        private final PekingDuckLock pekingDuck;
        public ProducerThread(PekingDuckLock pekingDuck) {
            this.pekingDuck = pekingDuck;
        }
        @Override
        public void run() {
            while (true) {
                pekingDuck.production("北京烤鸭");
            }
        }
    }
    /**
     * 消费者线程
     */
    static class ConsumerThread implements Runnable {
        private final PekingDuckLock pekingDuck;
        public ConsumerThread(PekingDuckLock pekingDuck) {
            this.pekingDuck = pekingDuck;
        }
        @Override
        public void run() {
            while (true) {
                pekingDuck.consumption();
            }
        }
    }
    public static void main(String[] args) {
        // 创建烤鸭店铺对象
        PekingDuckLock pekingDuck = new PekingDuckLock();
        // 创建生产者
        ProducerThread producerThread = new ProducerThread(pekingDuck);
        // 创建消费者
        ConsumerThread consumerThread = new ConsumerThread(pekingDuck);
        // 创建生产者线程
        Thread t1 = new Thread(producerThread);
        // 创建消费者线程
        Thread t2 = new Thread(consumerThread);
        // 启动线程
        t1.start();
        t2.start();
    }
}

通过使用对象锁的方式来实现,首先创建一个对象锁,这里使用是 ReentrantLock 重入锁,需要手动设置 lock()unlock() 方法去获取锁和释放锁,为了实现等待 / 通知机制,此时还需要通过锁对象去创建一个条件对象 Condition ,然后通过锁对象的 await()signal() 方法来实现等待和通知操作,执行结果和上面使用 synchronized 关键字是一样的结果就不贴了。

# 多生产者 & 多消费者模式

多生产者与多消费者模式就是多条生产线与多条消费路径,在上面示例代码中的 main 方法中新添加两条线程即可,其它地方并未修改,代码如下:

PekingDuck2.java
package top.rem.rain.demo2;
/**
 * @Author: LightRain
 * @Description: 北京烤鸭 - 多生产者与多消费者
 * @DateTime: 2024-01-09 15:38
 * @Version:1.0
 **/
public class PekingDuck2 {
    /**
     * 烤鸭数量
     */
    private int count = 0;
    /**
     * 烤鸭名称
     */
    private String pekingDuckName;
    /**
     * 线程等待标志
     */
    private boolean flag = false;
    /**
     * 生产烤鸭
     *
     * @param pekingDuckName 烤鸭名称
     */
    public synchronized void production(String pekingDuckName) {
        if (flag) {
            try {
                // 有烤鸭进入等待状态
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 设置烤鸭名称和数量
        this.pekingDuckName = pekingDuckName;
        // 数量 + 1
        count++;
        System.out.println(Thread.currentThread().getName() + ".生产者:" + pekingDuckName + " " + count);
        // 有烤鸭后改变状态
        flag = true;
        // 通知消费线程来消费烤鸭
        notifyAll();
    }
    /**
     * 消费烤鸭
     */
    public synchronized void consumption() {
        // 没有烤鸭就进入等待状态
        if (!flag) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 消费一只烤鸭
        System.out.println(Thread.currentThread().getName() + ".消费者:" + this.pekingDuckName + " " + count);
        // 没有烤鸭后改变状态
        flag = false;
        // 通知生产线程来生产烤鸭
        notifyAll();
    }
    /**
     * 生产者线程
     */
    static class ProducerThread implements Runnable {
        private final PekingDuck pekingDuck;
        public ProducerThread(PekingDuck pekingDuck) {
            this.pekingDuck = pekingDuck;
        }
        @Override
        public void run() {
            while (true) {
                pekingDuck.production("北京烤鸭");
            }
        }
    }
    /**
     * 消费者线程
     */
    static class ConsumerThread implements Runnable {
        private final PekingDuck pekingDuck;
        public ConsumerThread(PekingDuck pekingDuck) {
            this.pekingDuck = pekingDuck;
        }
        @Override
        public void run() {
            while (true) {
                pekingDuck.consumption();
            }
        }
    }
    public static void main(String[] args) {
        // 创建烤鸭店铺对象
        PekingDuck pekingDuck = new PekingDuck();
        // 创建生产者
        ProducerThread producerThread = new ProducerThread(pekingDuck);
        // 创建消费者
        ConsumerThread consumerThread = new ConsumerThread(pekingDuck);
        // 创建生产者线程
        Thread t1 = new Thread(producerThread);
        Thread t2 = new Thread(producerThread);
        // 创建消费者线程
        Thread t3 = new Thread(consumerThread);
        Thread t4 = new Thread(consumerThread);
        // 启动线程
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}

新增了两条线程后来看下面的执行结果:

执行结果
C:\LightRainData\IDEA\JDK\JDK-17.0.6\bin\java.exe "-javaagent:C:\LightRainData\IDEA\IntelliJ IDEA 2022.3.2\lib\idea_rt.jar=9761:C:\LightRainData\IDEA\IntelliJ IDEA 2022.3.2\bin" -Dfile.encoding=UTF-8 -classpath D:\ 项目 \gitee\multi-threaded-example\target\classes top.rem.rain.demo2.PekingDuck
Thread-0. 生产者:北京烤鸭 1
Thread-3. 消费者:北京烤鸭 1
Thread-1. 生产者:北京烤鸭 2
Thread-3. 消费者:北京烤鸭 2
Thread-2. 消费者:北京烤鸭 2
Thread-1. 生产者:北京烤鸭 3
Thread-3. 消费者:北京烤鸭 3
Thread-2. 消费者:北京烤鸭 3
Thread-1. 生产者:北京烤鸭 4
Thread-3. 消费者:北京烤鸭 4
Thread-2. 消费者:北京烤鸭 4
Thread-1. 生产者:北京烤鸭 5
Thread-3. 消费者:北京烤鸭 5
Thread-2. 消费者:北京烤鸭 5
........................
Thread-1. 生产者:北京烤鸭 63710
Thread-0. 生产者:北京烤鸭 63711
Thread-1. 生产者:北京烤鸭 63712
Thread-0. 生产者:北京烤鸭 63713
Thread-2. 消费者:北京烤鸭 63713
Thread-3. 消费者:北京烤鸭 63713
Thread-2. 消费者:北京烤鸭 63713
Thread-3. 消费者:北京烤鸭 63713
Thread-2. 消费者:北京烤鸭 63713
Thread-3. 消费者:北京烤鸭 63713
Thread-2. 消费者:北京烤鸭 63713
Thread-3. 消费者:北京烤鸭 63713
Thread-2. 消费者:北京烤鸭 63713
Thread-3. 消费者:北京烤鸭 63713
Thread-2. 消费者:北京烤鸭 63713
Thread-3. 消费者:北京烤鸭 63713
Thread-2. 消费者:北京烤鸭 63713
............................
进程已结束,退出代码 - 1

仅仅新增了两条线程后从执行结果中就可以看出明显的不对了,生产到第 5 只烤鸭的时候第 5 只烤鸭居然被消费了两次,然后当生产到第 63713 只的时候紧接着当前烤鸭被消费了 n 次,有的烤鸭还没有被消费,难道共享数据没有进行线程同步么?先来看下面代码:

PekingDuck2.java
package top.rem.rain.demo2;
/**
 * @Author: LightRain
 * @Description: 北京烤鸭 - 多生产者与多消费者
 * @DateTime: 2024-01-09 15:38
 * @Version:1.0
 **/
public class PekingDuck2 {
    /**
     * 烤鸭数量
     */
    private int count = 0;
    /**
     * 烤鸭名称
     */
    private String pekingDuckName;
    /**
     * 线程等待标志
     */
    private boolean flag = false;
    /**
     * 生产烤鸭
     *
     * @param pekingDuckName 烤鸭名称
     */
    public synchronized void production(String pekingDuckName) {
        if (flag) {
            try {
                // 有烤鸭进入等待状态
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 设置烤鸭名称和数量
        this.pekingDuckName = pekingDuckName;
        // 数量 + 1
        count++;
        System.out.println(Thread.currentThread().getName() + ".生产者:" + pekingDuckName + " " + count);
        // 有烤鸭后改变状态
        flag = true;
        // 通知消费线程来消费烤鸭
        notifyAll();
    }
    /**
     * 消费烤鸭
     */
    public synchronized void consumption() {
        // 没有烤鸭就进入等待状态
        if (!flag) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 消费一只烤鸭
        System.out.println(Thread.currentThread().getName() + ".消费者:" + this.pekingDuckName + " " + count);
        // 没有烤鸭后改变状态
        flag = false;
        // 通知生产线程来生产烤鸭
        notifyAll();
    }
    
    // ....
}

从代码中可以看出共享数据 count 的获取方法都使用了 synchronized 关键字进行了同步,为什么还会出现数据错误的现象呢?

分析:确实对共享数据采用了同步措施,而且也应用了等待 / 通知机制,但是这样的措施只能在单生产者与单消费者的情况下才可以正确应用,从执行结果中可以看出之前的单生产者与单消费者安全处理措施就不适合多生产者与多消费者的情况了,看下图:

pFpyrL9.png

修正:上图中的 notify() 方法需要修改为 notifyAll() 才可正常执行, notify() 方法只适合在单生产者与单消费者模式中,在多生产者与多消费者中需要使用 notifyAll() 方法来唤醒所有的线程,并且两处的 if 都需要修改为 while 判断,修改后的正确代码如下:

PekingDuck2.java
package top.rem.rain.demo2;
/**
 * @Author: LightRain
 * @Description: 北京烤鸭 - 多生产者与多消费者
 * @DateTime: 2024-01-09 18:38
 * @Version:1.0
 **/
public class PekingDuck2 {
  /**
   * 烤鸭数量
   */
  private int count = 0;
  /**
   * 烤鸭名称
   */
  private String pekingDuckName;
  /**
   * 线程等待标志
   */
  private boolean flag = false;
  /**
   * 生产烤鸭
   *
   * @param pekingDuckName 烤鸭名称
   */
  public synchronized void production(String pekingDuckName) {
    while (flag) {
      try {
        // 有烤鸭进入等待状态
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    // 设置烤鸭名称和数量
    this.pekingDuckName = pekingDuckName;
    // 数量 + 1
    count++;
    System.out.println(Thread.currentThread().getName() + ".生产者:" + pekingDuckName + " " + count);
    // 有烤鸭后改变状态
    flag = true;
    // 通知消费线程来消费烤鸭
    notifyAll();
  }
  /**
   * 消费烤鸭
   */
  public synchronized void consumption() {
    // 没有烤鸭就进入等待状态
    while (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    // 消费一只烤鸭
    System.out.println(Thread.currentThread().getName() + ".消费者:" + this.pekingDuckName + " " + count);
    // 没有烤鸭后改变状态
    flag = false;
    // 通知生产线程来生产烤鸭
    notifyAll();
  }
  /**
   * 生产者线程
   */
  static class ProducerThread implements Runnable {
    private final PekingDuck2 pekingDuck;
    public ProducerThread(PekingDuck2 pekingDuck) {
      this.pekingDuck = pekingDuck;
    }
    @Override
    public void run() {
      while (true) {
        pekingDuck.production("北京烤鸭");
      }
    }
  }
  /**
   * 消费者线程
   */
  static class ConsumerThread implements Runnable {
    private final PekingDuck2 pekingDuck;
    public ConsumerThread(PekingDuck2 pekingDuck) {
      this.pekingDuck = pekingDuck;
    }
    @Override
    public void run() {
      while (true) {
        pekingDuck.consumption();
      }
    }
  }
  public static void main(String[] args) {
    // 创建烤鸭店铺对象
    PekingDuck2 pekingDuck = new PekingDuck2();
    // 创建生产者
    ProducerThread producerThread = new ProducerThread(pekingDuck);
    // 创建消费者
    ConsumerThread consumerThread = new ConsumerThread(pekingDuck);
    // 创建生产者线程
    Thread t1 = new Thread(producerThread);
    Thread t2 = new Thread(producerThread);
    // 创建消费者线程
    Thread t3 = new Thread(consumerThread);
    Thread t4 = new Thread(consumerThread);
    // 启动线程
    t1.start();
    t2.start();
    t3.start();
    t4.start();
  }
}

执行结果如下:

执行结果
Thread-0. 生产者:北京烤鸭 1
Thread-3. 消费者:北京烤鸭 1
Thread-1. 生产者:北京烤鸭 2
Thread-3. 消费者:北京烤鸭 2
Thread-1. 生产者:北京烤鸭 3
Thread-3. 消费者:北京烤鸭 3
Thread-1. 生产者:北京烤鸭 4
Thread-3. 消费者:北京烤鸭 4
Thread-1. 生产者:北京烤鸭 5
Thread-3. 消费者:北京烤鸭 5
Thread-1. 生产者:北京烤鸭 6
Thread-3. 消费者:北京烤鸭 6
Thread-1. 生产者:北京烤鸭 7
Thread-3. 消费者:北京烤鸭 7
Thread-1. 生产者:北京烤鸭 8
Thread-3. 消费者:北京烤鸭 8
Thread-1. 生产者:北京烤鸭 9
Thread-3. 消费者:北京烤鸭 9
Thread-1. 生产者:北京烤鸭 10
Thread-3. 消费者:北京烤鸭 10
Thread-1. 生产者:北京烤鸭 11
Thread-3. 消费者:北京烤鸭 11
Thread-1. 生产者:北京烤鸭 12
Thread-3. 消费者:北京烤鸭 12
Thread-1. 生产者:北京烤鸭 13
Thread-3. 消费者:北京烤鸭 13
Thread-1. 生产者:北京烤鸭 14
Thread-3. 消费者:北京烤鸭 14
Thread-1. 生产者:北京烤鸭 15
Thread-3. 消费者:北京烤鸭 15
Thread-1. 生产者:北京烤鸭 16
Thread-3. 消费者:北京烤鸭 16
Thread-1. 生产者:北京烤鸭 17
Thread-3. 消费者:北京烤鸭 17
Thread-1. 生产者:北京烤鸭 18
Thread-3. 消费者:北京烤鸭 18
Thread-1. 生产者:北京烤鸭 19
Thread-3. 消费者:北京烤鸭 19
Thread-1. 生产者:北京烤鸭 20
Thread-3. 消费者:北京烤鸭 20
Thread-1. 生产者:北京烤鸭 21
Thread-3. 消费者:北京烤鸭 21
Thread-1. 生产者:北京烤鸭 22
Thread-3. 消费者:北京烤鸭 22
Thread-1. 生产者:北京烤鸭 23
Thread-3. 消费者:北京烤鸭 23
Thread-1. 生产者:北京烤鸭 24
Thread-3. 消费者:北京烤鸭 24
Thread-1. 生产者:北京烤鸭 25
Thread-3. 消费者:北京烤鸭 25
Thread-1. 生产者:北京烤鸭 26
Thread-3. 消费者:北京烤鸭 26
Thread-1. 生产者:北京烤鸭 27
Thread-3. 消费者:北京烤鸭 27
Thread-1. 生产者:北京烤鸭 28
Thread-3. 消费者:北京烤鸭 28
Thread-1. 生产者:北京烤鸭 29
Thread-3. 消费者:北京烤鸭 29
Thread-1. 生产者:北京烤鸭 30
Thread-3. 消费者:北京烤鸭 30
Thread-1. 生产者:北京烤鸭 31
Thread-3. 消费者:北京烤鸭 31
Thread-1. 生产者:北京烤鸭 32
Thread-3. 消费者:北京烤鸭 32
Thread-1. 生产者:北京烤鸭 33
Thread-3. 消费者:北京烤鸭 33
Thread-1. 生产者:北京烤鸭 34
Thread-3. 消费者:北京烤鸭 34
Thread-1. 生产者:北京烤鸭 35
Thread-3. 消费者:北京烤鸭 35
Thread-1. 生产者:北京烤鸭 36
Thread-3. 消费者:北京烤鸭 36
Thread-1. 生产者:北京烤鸭 37
Thread-3. 消费者:北京烤鸭 37
Thread-1. 生产者:北京烤鸭 38
Thread-3. 消费者:北京烤鸭 38
Thread-1. 生产者:北京烤鸭 39
Thread-3. 消费者:北京烤鸭 39
Thread-1. 生产者:北京烤鸭 40
Thread-3. 消费者:北京烤鸭 40
.........................

从当前执行结果中可以看出当前数据是没有错乱的,这样数据就正常了,而锁对象的解决方法与此相同。

下面将介绍一种更有效得锁对象解决方法,使用两组条件对象 (Condition也被称为监视器) 来实现等待 / 通知机制,也就是通过已有的锁获取两组监视器,一组监视生产者,一组监视消费者,代码如下:

PekingDuckLockCondition.java
package top.rem.rain.demo2;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * @Author: LightRain
 * @Description: 北京烤鸭 - 多生产者与多消费者,通过已有的锁获取两组监视器,一组监视生产者,一组监视消费者
 * @DateTime: 2024-01-09 19:10
 * @Version:1.0
 **/
public class PekingDuckLockCondition {
    /**
     * 烤鸭数量
     */
    private int count = 0;
    /**
     * 烤鸭名称
     */
    private String pekingDuckName;
    /**
     * 线程等待标志
     */
    private boolean flag = false;
    /**
     * 创建锁对象
     */
    private final Lock lock = new ReentrantLock();
    /**
     * 监视生产者的监视器
     */
    private final Condition productionCondition = lock.newCondition();
    /**
     * 监视消费者的监视器
     */
    private final Condition consumptionCondition = lock.newCondition();
    /**
     * 生产烤鸭
     *
     * @param pekingDuckName 烤鸭名称
     */
    public void production(String pekingDuckName) {
        // 获取锁
        lock.lock();
        try {
            while (flag) {
                try {
                    // 有烤鸭进入等待状态
                    productionCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 设置烤鸭名称和数量
            this.pekingDuckName = pekingDuckName;
            // 数量 + 1
            count++;
            System.out.println(Thread.currentThread().getName() + ".生产者:" + pekingDuckName + " " + count);
            // 有烤鸭后改变状态
            flag = true;
            // 直接唤醒消费线程
            consumptionCondition.signal();
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
    /**
     * 消费烤鸭
     */
    public void consumption() {
        // 获取锁
        lock.lock();
        try {
            // 没有烤鸭就进入等待状态
            while (!flag) {
                try {
                    consumptionCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 消费一只烤鸭
            System.out.println(Thread.currentThread().getName() + ".消费者:" + this.pekingDuckName + " " + count);
            // 没有烤鸭后改变状态
            flag = false;
            // 直接唤醒生产线程
            productionCondition.signal();
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
    /**
     * 生产者线程
     */
    static class ProducerThread implements Runnable {
        private final PekingDuckLockCondition pekingDuck;
        public ProducerThread(PekingDuckLockCondition pekingDuck) {
            this.pekingDuck = pekingDuck;
        }
        @Override
        public void run() {
            while (true) {
                pekingDuck.production("北京烤鸭");
            }
        }
    }
    /**
     * 消费者线程
     */
    static class ConsumerThread implements Runnable {
        private final PekingDuckLockCondition pekingDuck;
        public ConsumerThread(PekingDuckLockCondition pekingDuck) {
            this.pekingDuck = pekingDuck;
        }
        @Override
        public void run() {
            while (true) {
                pekingDuck.consumption();
            }
        }
    }
    public static void main(String[] args) {
        // 创建烤鸭店铺对象
        PekingDuckLockCondition pekingDuck = new PekingDuckLockCondition();
        // 创建生产者
        ProducerThread producerThread = new ProducerThread(pekingDuck);
        // 创建消费者
        ConsumerThread consumerThread = new ConsumerThread(pekingDuck);
        // 创建生产者线程
        Thread t1 = new Thread(producerThread);
        Thread t2 = new Thread(producerThread);
        // 创建消费者线程
        Thread t3 = new Thread(consumerThread);
        Thread t4 = new Thread(consumerThread);
        // 启动线程
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}

在上述代码中创建了两个条件对象分别是 productionConditionconsumptionCondition 并且分别用于监视生产线程和监视消费线程,在 production() 方法中获取到锁后,此时 flag = true 的话也就是还有烤鸭未被消费,因此生产线程需要进行等待,所以调用生产线程的监视器 productionConditionawait() 方法进入阻塞等待池。

如果此时的 flagfalse 的话就说明烤鸭已被消费完,需要生产线程去生产烤鸭,那么生产线程将进行烤鸭的生产并通过消费线程的监视器 consumptionConditionsignal() 方法去通知消费线程对烤鸭进行消费,与此同时在 consumption() 方法中也是同样的道理。

可以发现使用这种方法比之前使用 synchronized 同步方法或是单监视器的锁对象都要高效和方便,在此之前都是使用 notify()/notifyAll()signal()/signalAll() 方法去唤醒等待池中的线程,然后让池中的线程又进入竞争队列去抢占 CPU 资源,这样不仅唤醒了无关的线程而且又让全部线程进入了竞争队列中。

然而当使用两种监视器分别监听生产者线程和消费者线程,这样的方式恰好解决了前面两种方式的问题所在,每次唤醒都只是生产者线程或是消费者线程而不会让两者都同时唤醒,这样不就可以更高效得执行程序了么。

# 线程死锁

关于线程死锁问题,通过上面代码示例,我们知道了锁是一个非常有用的工具,适用的场景非常多,因为他适用起来非常得简单,但它同时也会带来一些不必要的麻烦,那就是可能会引起线程死锁,一旦产生了死锁就会造成系统功能的不可用,下面的示例将演示线程 t1 和线程 t2 互相等待对方释放锁从而引起死锁,代码如下:

package top.rem.rain.demo2;
/**
 * @Author: LightRain
 * @Description: 线程死锁示例
 * @DateTime: 2024-01-09 23:07
 * @Version:1.0
 **/
public class DeadLock {
    private static String A = "A";
    private static String B = "B";
    static class DeadLockA implements Runnable{
        @SuppressWarnings("static-access")
        @Override
        public void run() {
            synchronized (A) {
                try {
                    Thread.currentThread().sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            synchronized (B) {
                System.out.println("1");
            }
        }
    }
    static class DeadLockB implements Runnable{
        @Override
        public void run() {
            synchronized (B) {
                synchronized (A) {
                    System.out.println("2");
                }
            }
        }
    }
    private void deadLock() {
        DeadLockA deadLockA = new DeadLockA();
        DeadLockB deadLockB = new DeadLockB();
        Thread t1 = new Thread(deadLockA);
        Thread t2 = new Thread(deadLockB);
        // 启动线程
        t1.start();
        t2.start();
    }
    public static void main(String[] args) {
        DeadLock deadLock = new DeadLock();
        while (true) {
            deadLock.deadLock();
        }
    }
}

同步嵌套是产生死锁的常见情景,从上面的代码中可以看出,当线程 t1 获取到锁 (A) 后休眠 2秒 ,此时线程 t2 刚好获取到了锁 (B) , 接着要获取锁 (A) , 但此时锁 (A) 正在被 t1 线程所持有,因此只能等待 t1 线程释放锁 (A) , 但遗憾的是在 t1 线程内又请求获取锁 (B) ,而锁 (B) 此时又被 t2 线程所持有,到此结果就是 t1 线程拿到了锁 (A) 同时等待 t2 线程释放锁 (B) ,而 t2 线程获取到了锁 (B) 也同时在等待 t1 线程释放锁 (A) , 然后彼此等待也就造成了线程死锁问题。

虽然在一般情况时不会这么写代码,但是有些 较为复杂的场景中,可能就会遇到这种问题,所以在写同步代码时需要多考虑死锁的情况,避免发生线程死锁,避免死锁常见的几种方法:1. 避免一个线程同时获取多个锁。2. 避免在一个资源内占用多个资源,尽量保证每个锁只占用一个资源。3. 尝试使用定时锁,使用 tryLock(timeout) 来代替内部锁机制。4. 对于数据库锁,加锁和解锁必须在同一个数据库连接里面,否则将会出现解锁失败的情况。5. 避免同步嵌套的发生。

# Thread.join () 方法

如果有一个线程 A 执行了 thread.join() 方法,其含义是当前线程 A 等待 thread 线程终止之后才能从 thread.join() 方法中返回。线程 Thread 除了提供 join() 方法之外,还提供了 join(long millis)join(long millis,int nanos) 两个具备超时特性的方法。

这两个超时方法表示如果线程在给定的超时时间里没有终止,那么将会从该超时方法中返回,来看下面代码示例,创建 10 个线程,从 0 ~ 9 每个线程调用前一个线程的 join() 方法,也就是线程 0 结束后线程 1 才能从 join() 方法中返回,而 0 需要等待 main 线程结束。

JoinDemo.java
package top.rem.rain.demo2;
/**
 * @Author: LightRain
 * @Description: join 示例
 * @DateTime: 2024-01-10 00:05
 * @Version:1.0
 **/
public class JoinDemo {
    public static void main(String[] args) {
        Thread previous = Thread.currentThread();
        for (int i = 0; i < 10; i++) {
            // 每个线程拥有前一个线程的引用。需要等待前一个线程终止,才能从等待中返回
            Thread thread = new Thread(new Domino(previous), String.valueOf(i));
            thread.start();
            previous = thread;
        }
        System.out.println(Thread.currentThread().getName() + " 线程结束");
        /*
        执行结果:
        main 线程结束
        0 线程结束
        1 线程结束
        2 线程结束
        3 线程结束
        4 线程结束
        5 线程结束
        6 线程结束
        7 线程结束
        8 线程结束
        9 线程结束
         */
    }
}
class Domino implements Runnable {
    private Thread thread;
    public Domino(Thread thread) {
        this.thread = thread;
    }
    @Override
    public void run() {
        try {
            thread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " 线程结束");
    }
}