更多详情内容请访问:JUC 系列文章导读

1、基本概念

1.1 进程、线程、管程

  • 进程:指在系统中正在运行的一个应用程序;程序一旦运行就是进程。进程——资源分配的最小单位。
  • 线程:系统分配处理器时间资源的基本单元,或者说进程之内独立执行的一个单元执行流。线程——程序执行的最小单位。
  • 管程:Monitor(监视器),也就是我们平时所说的,是一种同步机制,保证同一时间只有一个线程可以访问被保护的数据和代码。JVM 中同步是基于进入和退出管程(monitor)对象实现的,每个对象都会有一个管程对象,管程会随着 Java 对象一同创建和销毁

一个进程在其执行的过程中可以产生多个线程。同类的多个线程共享进程的方法区资源,但每个线程有自己的程序计数器虚拟机栈本地方法栈

1.2 并发与并行

  • 并发:两个及两个以上的作业在同一时间段内执行。例子:电商秒杀...
  • 并行:两个及两个以上的作业在同一时刻执行。例子:泡方便面,电水壶烧水,一边撕调料倒入桶中

1.3 线程的生命周期和状态

Java 线程在运行的生命周期中的指定时刻只可能处于下面 6 种不同状态的其中一个状态

image-20220816024054541

状态名称 说明
NEW 初始状态,线程被构建,但是还没有调用 start() 方法
RUNNABLE 运行状态,Java 线程将操作系统中就绪运行两种状态统称作“运行中”
BLOCKED 阻塞状态,表示线程阻塞于锁
WAITING 等待状态,进入该状态表示当前线程需要等待其他线程做出一些特定动作(通知或中断)
TIME_WAITING 超时等待状态,不同意于 WAITING 可以在指定的时间自行返回
TERMINATED 终止状态,表示当前线程已经执行完毕

image-20220305234020909

1.4 用户线程和守护线程

  • 用户线程:平时用到的普通线程,自定义线程
  • 守护线程:运行在后台,是一种特殊的线程,比如垃圾回收。当主线程结束后,用户线程还在运行,JVM 存活;如果没有用户线程,都是守护线程,JVM 结束。

setDaemon(true) 方法可以将该线程修改为守护线程。setDaemon(true) 方法必须在 start() 之前设置,否则报 IIIegalThreadStateException 异常

1.5 线程的优先级

  1. 线程的优先级等级
    • MAX_PRIORITY:10
    • MIN _PRIORITY:1
    • NORM_PRIORITY:5(默认)
  2. 涉及的方法:getPriority()setPriority(int newPriority)
  3. 线程创建时继承父线程的优先级。低优先级只是获得调度的概率低,并非一定是在高优先级线程之后才被调用

2、创建线程的 4 种方式

使用较少,因为 Java 只支持单继承

public class CreateThread1 {

    public static void main(String[] args) {
        new MyThread().start();
        System.out.println(Thread.currentThread().getName() + " is running...");
        try { TimeUnit.SECONDS.sleep(0); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println(Thread.currentThread().getName() + " -> over");
    }
}

class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " is running...");
        try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println(Thread.currentThread().getName() + " -> over");
    }
}

任务不需要返回结果或者抛出异常推荐使用 Runnable 接口,这样代码看起来会更加简洁

public class CreateThread2 {

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " is running...");
                try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
                System.out.println(Thread.currentThread().getName() + " -> over");
            }
        }, "分支线程").start();

        System.out.println(Thread.currentThread().getName() + " is running...");
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println(Thread.currentThread().getName() + " -> over");
    }
}

Callable 接口就是为了处理 Runnable 接口的增强体:Callable 接口可以返回结果或抛出异常检查

public class CreateThread3 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyThread3 callable = new MyThread3();
        FutureTask<String> futureTask = new FutureTask<>(callable);
        Thread thread = new Thread(futureTask);
        thread.start();
        System.out.println(Thread.currentThread().getName() + " is running...");
        try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println(Thread.currentThread().getName() + " -> over");
        System.out.println(futureTask.get());
    }
}

class MyThread3 implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println(Thread.currentThread().getName() + " is running...");
        return "MyThread Return";
    }
}

public class CreateThread4 {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;

    public static void main(String[] args) {
        // 使用阿里巴巴推荐的创建线程池的方式:通过ThreadPoolExecutor构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, // 1. corePoolSize: 核心线程数为 5。
                MAX_POOL_SIZE, // 2. maximumPoolSize :最大线程数 10
                KEEP_ALIVE_TIME, // 3. keepAliveTime : 等待时间为 1L。
                TimeUnit.SECONDS, // 4. unit: 等待时间的单位为 TimeUnit.SECONDS。
                new ArrayBlockingQueue<>(QUEUE_CAPACITY), // 5. workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;
                new ThreadPoolExecutor.CallerRunsPolicy()); //6. handler:饱和策略为 CallerRunsPolicy。

        for (int i = 0; i < 10; i++) {
            //创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
            Runnable worker = new MyRunnable("" + i);
            //执行Runnable
            executor.execute(worker);
        }
        //终止线程池
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}


/**
 * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。
 * @author shuang.kou
 */
class MyRunnable implements Runnable {

    private String command;

    public MyRunnable(String s) {
        this.command = s;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
        try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
    }

    @Override
    public String toString() {
        return this.command;
    }
}

3、常用类及方法

3.1 wait() 与 sleep()

sleep() wait()
一旦执行方法,都可以使得当前的线程进入阻塞状态 一旦执行方法,都可以使得当前的线程进入阻塞状态
Thread 类中声明 sleep() Object 类中声明 wait()
可以在任何需要的场景下调用 必须使用在同步代码块或同步方法中
sleep() 不会释放锁 wait() 会释放锁
常被用于暂停执行,调用后线程会自动苏醒 常被用于线程间的交互/通信,被调用后线程不会自动苏醒

3.2 join()

是 Thread 类中的一个方法,该方法的定义是等待该线程执行直到终止。其实就是说:join() 方法将挂起调用线程的执行,直到被调用的对象完成执行

  • 调用线程:当前线程,即调用了 t.join() 语句的线程
  • 被调用对象:t.join() 当中的线程对象 t

在很多情况下,主线程创建并启动子线程,如果子线程中要进行大量耗时运算,主线程将可能早于子线程结束。如果主线程需知道子线程的执行结果,就需要等待子线程执行结束。

主线程虽然可以使用 sleep(xxx) 这种方法,但是由于执行时间不好确定,因此 join() 方法更加适合这种场景

3.3 TimeUnit 类

TimeUnit 是 java.util.concurrent 包下面的一个类,TimeUnit 提供了可读性更好的线程暂停操作,通常用来替换Thread.sleep() 方法,然而底层实现还是使用的 Thread.sleep( )

//停顿3秒
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) {e.printStackTrace();}
//停顿3分钟
try { TimeUnit.MINUTES.sleep(3); } catch (InterruptedException e) {e.printStackTrace();}
//停顿3小时
try { TimeUnit.HOURS.sleep(3); } catch (InterruptedException e) {e.printStackTrace();}
//停顿3天
try { TimeUnit.DAYS.sleep(3); } catch (InterruptedException e) {e.printStackTrace();}

4、Synchronized 关键字与 Lock 接口

4.1 Lock 和 Synchronized

synchronized lock
存在层次 Java 关键字,在 JVM 层面上,依赖 monitor 对象实现 是一个类,在 API 层面上,主要使用 ReentrantLock 实现
锁的释放 ①以获取锁的线程执行完同步代码后,自动释放锁
②线程执行发生异常,JVM强制让线程释放锁
在 finally 中必须释放锁,需要手动释放
锁的获取 不可中断,除非抛出异常或者正常运行完成 可中断,设置超时时间 tryLock(long timeout,TimeUnit unit) 或调用 interrupt() 方法中断
锁的状态 无法判断。synchronized 没有 Condition,要么随机唤醒一个线程,要么会唤醒多个线程 可以判断,ReentrantLock 用来实现分组唤醒需要唤醒线程们,可以精确唤醒
锁的类型 可重入,非公平 可重入,可公平
锁的性能 少量同步,两者性能差不多 大量同步,性能更好,具备更好的扩展性

4.2 Synchronized 关键字

  1. 修饰一个代码块,被修饰的代码块称为同步语句块,其作用的范围是大括号 {} 括起来的代码,作用的对象是调用这个代码块的对象;

  2. 修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象;

    虽然可以使用 synchronized 来定义方法,但 synchronized 并不属于方法定义的一部分,因此,synchronized 关键字不能被继承。

  3. 修改一个静态的方法,其作用的范围是整个静态方法,作用的对象是这个类的所有对象;

  4. 修改一个类,其作用的范围是 synchronized 后面括号括起来的部分,作用的对象是这个类的所有对象。

使用 synchronized 模拟售票过程

public class SaleTicket {

    // 第二步:创建多线程吗,调用资源类的操作方法
    public static void main(String[] args) {
        Tick tick = new Tick();
        // 创建三个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 调用资源类中操作方法
                for (int i = 0; i < 40; i++) {
                    tick.sale();
                }
            }
        }, "B").start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 调用资源类中操作方法
                for (int i = 0; i < 40; i++) {
                    tick.sale();
                }
            }
        }, "A").start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 调用资源类中操作方法
                for (int i = 0; i < 40; i++) {
                    tick.sale();
                }
            }
        }, "C").start();
    }
}

// 第一步:创建资源类,在资源类创建属性和操作方法
class Tick {
    private int number = 30;

    public synchronized void sale() {
        // 判断当前是否还有票
        if (number > 0) {
            System.out.println(Thread.currentThread().getName() + "卖出:" + number-- + ",剩下" + number);
        }
    }
}

4.3 Lock 接口

4.3.1 什么是 Lock 接口

如果一个代码块被 synchronized 修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里获取锁的线程释放锁只会有两种情况:

  1. 获取锁的线程执行完了该代码块,然后线程释放对锁的占有;
  2. 线程执行发生异常,此时 JVM 会让线程自动释放锁。

那么如果这个获取锁的线程由于要等待 IO 或者其他原因(比如调用 sleep 方法)被阻塞了,但是又没有释放锁,其他线程便只能干巴巴地等待,试想一下,这多么影响程序执行效率。因此就需要有一种机制可以不让等待的线程一直无期限地等待下去(比如只等待一定的时间或者能够响应中断),通过 Lock 就可以办到。

Lock 锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象。Lock 提供了比 synchronized 更多的功能。

4.3.2 Lock 接口常用方法

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

lock() 方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。

采用 Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用 Lock 必须在 try{}catch{} 块中进行,并且将释放锁的操作放在 finally 块中进行,以保证锁一定被被释放,防止死锁的发生。

通常使用 Lock 来进行同步的话,是以下面这种形式去使用的:

Lock lock = ...;
lock.lock(); // 上锁
try {
    // 处理任务
} finally {
    lock.unlock(); // 释放锁
}

一个简单案例

public class SaleTicket {

    // 第二步:创建多个线程,调用资源类的方法
    public static void main(String[] args) {
        Ticket ticket = new Ticket();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 40; i++)
                ticket.sale();
            }
        }, "A").start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 调用资源类中操作方法
                for (int i = 0; i < 40; i++) {
                    ticket.sale();
                }
            }
        }, "B").start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 调用资源类中操作方法
                for (int i = 0; i < 40; i++) {
                    ticket.sale();
                }
            }
        }, "C").start();
    }
}

// 第一步:创建资源类,定义属性和操作方法
class Ticket {

    // 创建可重入锁
    private final ReentrantLock lock = new ReentrantLock();

    private int number = 30;

    public void sale() {
        // 上锁
        lock.lock();

        try {
            // 判断是否有票
            if (number > 0) {
                System.out.println(Thread.currentThread().getName() + ":卖出了" + number-- + ",还剩余" + number);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 解锁
            lock.unlock();
        }
    }
}

tryLock() 有一个重载方法,这个方法就是:tryLock(long time , TimeUnit unit) 方法,这个方法去限定了一个尝试获取锁的时间。

  • 获取锁成功则返回 true;
  • 当失败是分为两种情况:
    1. 在参数范围内,则不会立即返回值,会等待一段时间,这个时间就是传入的具体参数值,在这个时间内获取锁成功,则依旧返回true;
    2. 当过了参数范围后,还是获取锁失败,则立即返回false。

使用形式如下:

Lock lock = ...;
//根据尝试获取锁的值来判断具体执行的代码
if(lock.tryLock(long time , TimeUnit unit)) {
     try{
         //处理任务
     }catch(Exception ex){

     }finally{
        //当获取锁成功时最后一定要记住finally去关闭锁
         lock.unlock();   //释放锁
     } 
}else {
    //else时为未获取锁,则无需去关闭锁
    //如果不能获取锁,则直接做其他事情
}

一个简单案例

public class TryLockDemo {

    //实例化Lock对象
    Lock lock = new ReentrantLock();

    public static void main(String[] args) {
        //实例化本类对象,目的是调用runThread方法
        TryLockDemo tl = new TryLockDemo();
        //匿名对象创建线程1,并重写run方法,启动线程
        new Thread(() -> {tl.runThreadNow(Thread.currentThread());}, "T1").start();
        //匿名对象创建线程2,并重写run方法,启动线程
        new Thread(() -> {tl.runThreadNow(Thread.currentThread());}, "T2").start();
        //匿名对象创建线程3,并重写run方法,启动线程
        new Thread(() -> { try { tl.runThreadMonent(Thread.currentThread()); } catch (InterruptedException e) { e.printStackTrace(); } }, "T3").start();
        //匿名对象创建线程4,并重写run方法,启动线程
        new Thread(() -> { try { tl.runThreadMonent(Thread.currentThread()); } catch (InterruptedException e) { e.printStackTrace(); } }, "T4").start();
    }

    //线程共同调用方法
    public void runThreadNow(Thread t) {
        //lock对象调用trylock()方法尝试获取锁
        if (lock.tryLock()) {
            //获锁成功代码段
            System.out.println("线程" + t.getName() + "获取锁成功");
            try {
                //执行的代码
                Thread.sleep(5000);
            } catch (Exception e) {
                //异常处理内容,比如中断异常,需要恢复等
            } finally {
                //获取锁成功之后,一定记住加finally并unlock()方法,释放锁
                System.out.println("线程" + t.getName() + "释放锁");
                lock.unlock();
            }
        } else {
            //获锁失败代码段
            //具体获取锁失败的回复响应
            System.out.println("线程" + t.getName() + "获取锁失败");
        }
    }
    //线程共同调用方法
    public void runThreadMonent(Thread t) throws InterruptedException {
        //lock对象调用trylock()方法尝试获取锁
        if (lock.tryLock(5, TimeUnit.SECONDS)) {
            //获锁成功代码段
            System.out.println("线程" + t.getName() + "获取锁成功");
            try {
                //执行的代码
                Thread.sleep(5000);
            } catch (Exception e) {
                //异常处理内容,比如中断异常,需要恢复等
            } finally {
                //获取锁成功之后,一定记住加finally并unlock()方法,释放锁
                System.out.println("线程" + t.getName() + "释放锁");
                lock.unlock();
            }
        } else {
            //获锁失败代码段
            //具体获取锁失败的回复响应
            System.out.println("线程" + t.getName() + "获取锁失败");
        }
    }
}
/**
 线程T1获取锁成功
 线程T2获取锁失败
 线程T1释放锁
 线程T4获取锁成功
 线程T3获取锁失败
 线程T4释放锁
 */

主要和 lock() 方法进行对比:

  • lock.lock():必须等持有锁对象的线程做完事情,其他等待的线程才可以做事情。而且中途不能退出。
  • lock.lockInterruptibly():也必须是等待持有锁对象的线程做完事情,其他线程才能做事情,但中途可以退出。

lockInterruptibly() 通过这个方法去获取锁时,如果线程正在等待获取锁,则这个线程能够响应中断,即中断线程的等待状态。也就使说,当两个线程同时通过 lock.lockInterruptibly() 想获取某个锁时,假若此时线程 A 获取到了锁,而线程 B 只有在等待,那么对线程 B 调用 threadB.interrupt() 方法能够中断线程 B 的等待过程

一个简单案例

public class LockInterruptiblyDemo {

    private Lock lock = new ReentrantLock();

    public void bFuction() {
        String tName = Thread.currentThread().getName();
        try {
            System.out.println(tName + "-开始获取锁..........");
            lock.lockInterruptibly();

            System.out.println(tName + "-获取到锁了!!!!");
            System.out.println(tName + "-睡觉了,睡个30秒!");
            Thread.sleep(30000);
            System.out.println(tName + "-睡醒了,干活!");
        } catch (Exception e) {
            System.out.println(tName + "-我好像被中断了!");
            e.printStackTrace();
        } finally {
            lock.unlock();
            System.out.println(tName + "-释放了锁");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockInterruptiblyDemo bc=new LockInterruptiblyDemo();

        Thread t0 = new Thread(() -> bc.bFuction());
        Thread t1 = new Thread(() -> bc.bFuction());

        String tName=Thread.currentThread().getName();

        System.out.println(tName+"-启动t0!");
        t0.start();
        System.out.println(tName+"-我等个5秒,再启动t1");
        Thread.sleep(5000);
        System.out.println(tName+"-启动t1");
        t1.start();

        System.out.println(tName+"-t1获取不到锁,t0这货睡觉了,没释放,我等个5秒!");
        Thread.sleep(5000);
        System.out.println(tName+"-等了5秒了,不等了,把t1中断了!");
        t1.interrupt();

        Thread.sleep(Long.MAX_VALUE);
    }
}

/**
 * main-启动t0!
 * main-我等个5秒,再启动t1
 * Thread-0-开始获取锁..........
 * Thread-0-获取到锁了!!!!
 * Thread-0-睡觉了,睡个30秒!
 * main-启动t1
 * main-t1获取不到锁,t0这货睡觉了,没释放,我等个5秒!
 * Thread-1-开始获取锁..........
 * main-等了5秒了,不等了,把t1中断了!
 * Thread-1-我好像被中断了!
 * java.lang.InterruptedException
 * Thread-0-睡醒了,干活!
 */

关键字 synchronized 与 wait() / notify() 这两个方法一起使用可以实现等待/通知模式, Lock 锁的 newContition()方法返回 Condition 对象,Condition 类也可以实现等待/通知模式。

用 notify() 通知时,JVM 会随机唤醒某个等待的线程, 使用 Condition 类可以进行选择性通知, Condition 比较常用的两个方法:

  • await():使当前线程等待,同时会释放锁,当其他线程调用 signal() 时,线程会重新获得锁并继续执行
  • signal():用于唤醒一个等待的线程

一个简单案例

public class NewConditionDemo {

    private Lock lock = new ReentrantLock();
    private Condition conditionA = lock.newCondition();
    private Condition conditionB = lock.newCondition();
    private Condition conditionC = lock.newCondition();
    private int number = 1;

    public void printA() {
        lock.lock();
        try {
            // 判断
            while (number != 1) { conditionA.await(); }
            // 打印
            System.out.println(Thread.currentThread().getName());
            // 唤醒
            number++;
            conditionB.signal(); // 此处会使 printB() 方法中的 conditionB.await() 的线程唤醒
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void printB() {
        lock.lock();
        try {
            // 判断
            while (number != 2) { conditionB.await(); }
            // 打印
            System.out.println(Thread.currentThread().getName());
            // 唤醒
            number++;
            conditionC.signal(); // 此处会使 printB() 方法中的 conditionB.await() 的线程唤醒
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void printC() {
        lock.lock();
        try {
            // 判断
            while (number != 3) { conditionC.await(); }
            // 打印
            System.out.println(Thread.currentThread().getName());
            // 唤醒
            number++;
            conditionA.signal(); // 此处会使 printB() 方法中的 conditionB.await() 的线程唤醒
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        NewConditionDemo demo = new NewConditionDemo();
        new Thread(() -> demo.printA()).start();
        new Thread(() -> demo.printB()).start();
        new Thread(() -> demo.printC()).start();
    }
}

4.3.3 Lock 接口实现类 ReentrantLock

ReentrantLock,意思是“可重入锁”,是唯一实现了 Lock 接口的类,并且 ReentrantLock 提供了更多的方法。

4.4 ReadWriteLock 接口

public interface ReadWriteLock {
    // 返回用于读取的锁
    Lock readLock();
    // 返回用于写入的锁
    Lock writeLock();
}

一个用来获取读锁,一个用来获取写锁。也就是说将文件的读写操作分开,分成 2 个锁来分配给线程,从而使得多个线程可以同时进行读操作。

ReentrantReadWriteLock 实现了 ReadWriteLock 接口,提供了很多丰富的方法。

  • 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
  • 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。

5、线程间通信(生产者和消费者模式)

线程间通信的模型有两种:共享内存和消息传递,以下方式都是基本这两种模型来实现的。

class Share {
    private int number = 0;

    // 创建 Lock
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    // +1
    public void incr() throws InterruptedException {
        // 上锁
        lock.lock();
        try {
            // 判断
            while (number != 0) {
                condition.await();;
            }
            // 干活
            number++;
            System.out.println(Thread.currentThread().getName() + "::" + number);
            // 通知
            condition.signalAll();
        } finally {
            // 解锁
            lock.unlock();
        }
    }

    // -1
    public void decr() throws InterruptedException {
        lock.lock();
        try {
            while (number != 1) {
                condition.await();
            }
            number--;
            System.out.println(Thread.currentThread().getName() + "::" + number);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

public class ThreadDemp2 {

    public static void main(String[] args) {
        Share share = new Share();
        new Thread(() -> {
            for (int i = 0; i <= 10; i++) {
                try {
                    share.incr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "AA").start();
        new Thread(() -> {
            for (int i = 0; i <= 10; i++) {
                try {
                    share.decr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "BB").start();
        new Thread(() -> {
            for (int i = 0; i <= 10; i++) {
                try {
                    share.incr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "CC").start();
        new Thread(() -> {
            for (int i = 0; i <= 10; i++) {
                try {
                    share.decr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "DD").start();
    }
}

  1. wait():令当前线程挂起并放弃 CPU、同步资源并等待,使别的线程可访问并修改共享资源,而当前线程排队等候其他线程调用 notify() 或 notifyAll() 方法唤醒,唤醒后等待重新获得对监视器的所有权后才能继续执行
    • 使当前线程进入等待(某对象)状态 ,直到另一线程对该对象发出 notify() 或 notifyAll() 为止
    • 调用此方法后,当前线程将释放对象监控权 ,然后进入等待
    • 在当前线程被 notify() 后,要重新获得监控权,然后从断点处继续代码的执行
  2. notify():唤醒正在排队等待同步资源的线程中优先级最高者结束等待,只能唤醒一个wait()线程
  3. notifyAll():唤醒正在排队等待资源的所有线程结束等待
  4. 这三个方法只有在 synchronized 方法或 synchronized 代码块中才能使用,否则会报java.lang.IllegalMonitorStateException 异常
  5. 因为这三个方法必须有锁对象调用,而任意对象都可以作为 synchronized 的同步锁,因此这三个方法只能在 Object 类中声明
  6. 当每个线程都有特定锁的时候,只有等待这个锁的线程才能被唤醒,也就是线程 2 的 notify() 或 notifyAll() 不能唤醒线程 1 的 wait();
// 第一步:创建资源类,定义属性和操作方法
class Share {
    // 初始化
    private int number = 0;

    // +1 的方法
    public synchronized void incr() throws InterruptedException {
        // 第二步:判断、干活、通知
        while (number != 0) {
            this.wait(); // 在哪里睡就会在哪里醒
        }
        // 如果 number 值是 0,就 +1
        number++;
        System.out.println(Thread.currentThread().getName() + "::" + number);
        // 通知其他线程
        this.notifyAll();
    }

    // -1 的方法
    public synchronized void decr() throws InterruptedException {
        // 判断
        while (number != 1) {
            this.wait();
        }
        // 干活
        number--;
        System.out.println(Thread.currentThread().getName() + "::" + number);
        // 通知
        this.notifyAll();
    }
}

public class ThreadDemo {

    // 第三步:创建多个线程,调用资源类的操作方法
    public static void main(String[] args) {
        Share share = new Share();
        // 创建线程
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    share.incr();
                } catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }, "AA").start();

        // 创建线程
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    share.decr();
                } catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }, "BB").start();

        // 创建线程
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    share.incr();
                } catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }, "CC").start();

        // 创建线程
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    share.decr();
                } catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }, "DD").start();
    }
}

6、集合的线程安全问题

  1. 线程安全与线程不安全集合,常见例如(以下都是通过 synchronized 关键字实现,效率较低):

    ArrayList ---> Vector

    HashMap ---> HashTable

  2. Collections 构建的线程安全集合

  3. java.util.concurrent 并发包下CopyOnWriteArrayListCopyOnWriteArraySetConcurrentHashMap 类型,通过动态数组与线程安全个方面保证线程安全

7、JUC 三大辅助类

7.1 减少计数 CountDownLatch

CountDownLatch 类可以设置一个计数器,然后通过 countDown 方法来进行减 1 的操作,使用 await() 方法等待计数器不大于 0,然后继续执行 await() 方法之后的语句。

  • CountDownLatch 主要有两个方法,当一个或多个线程调用 await() 方法时,这些线程会阻塞
  • 其它线程调用 countDown 方法会将计数器 -1(调用 countDown 方法的线程不会阻塞)
  • 当计数器的值变为 0 时,因 await() 方法阻塞的线程会被唤醒,继续执行
public class CountDownLatchDemo {

    // 6个同学陆续离开教室之后,班长锁门
    public static void main(String[] args) throws InterruptedException {

        // 创建CountDownLatch对象,设置初始值
        CountDownLatch countDownLatch = new CountDownLatch(6);

        // 6个同学陆续离开教室之后
        for (int i = 1; i <=6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+" 号同学离开了教室");

                //计数  -1
                countDownLatch.countDown();
            },String.valueOf(i)).start();
        }

        // 等待
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName()+" 班长锁门走人了");
    }
}

7.2 循环栅栏 CyclicBarrier

CyclicBarrier 的构造方法第一个参数是目标障碍数,每次执行 CyclicBarrier 一次障碍数会加一,如果达到了目标障碍数,才会执行 cyclicBarrier.await() 之后的语句。可以将 CyclicBarrier 理解为加 1 操作

public class CyclicBarrierDemo {

    //创建固定值
    private static final int NUMBER = 7;

    public static void main(String[] args) {
        //创建CyclicBarrier
        CyclicBarrier cyclicBarrier =
                new CyclicBarrier(NUMBER,()->{
                    System.out.println("*****集齐7颗龙珠就可以召唤神龙");
                });

        //集齐七颗龙珠过程
        for (int i = 1; i <=7; i++) {
            new Thread(()->{
                try {
                    System.out.println(Thread.currentThread().getName()+" 星龙被收集到了");
                    //等待
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }
}

7.3 信号灯 Semaphore

Semaphore 的构造方法中传入的第一个参数是最大信号量(可以看成最大线程池),每个信号量初始化为一个最多只能分发一个许可证。

  • acquire() 方法获得许可证
  • release() 方法释放许可
public class SemaphoreDemo {
    public static void main(String[] args) {
        //创建Semaphore,设置许可数量
        Semaphore semaphore = new Semaphore(3);

        //模拟6辆汽车
        for (int i = 1; i <=6; i++) {
            new Thread(()->{
                try {
                    //抢占
                    semaphore.acquire();

                    System.out.println(Thread.currentThread().getName()+" 抢到了车位");

                    //设置随机停车时间
                    TimeUnit.SECONDS.sleep(new Random().nextInt(5));

                    System.out.println(Thread.currentThread().getName()+" ------离开了车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //释放
                    semaphore.release();
                }
            },String.valueOf(i)).start();
        }
    }
}

8、阻塞队列

8.1 什么是 BlockingQueue

image-20220718141957742

常用的队列主要有以下两种:

  • 先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性
  • 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件(栈)

使用 BlockingQueue 的好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了

8.2 BlockingQueue 核心方法

方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移出 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用
情况 说明
抛出异常 当阻塞队列满时,再往队列里 add 插入元素会抛 IllegalStateException:Queue full
当阻塞队列空时,再往队列里 remove 移除元素会抛出 NoSuchElementException
特殊值 插入方法时,成功返回 true,失败 false
移除方法时,成功返回队列的元素,队列里没有就返回 null
一直阻塞 当阻塞队列满时,生产者线程继续往队列里 put 元素,队列会一直阻塞生产者线程直到 put 数据或者响应中断退出
当阻塞队列空时,消费者线程试图从队列里 take 元素,队列会一直阻塞消费者线程直到队列可用
超时退出 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出

以下是部分方法的说明:

  • offer(anObject):表示如果可能的话,将 anObject 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true;否则返回 false(本方法不阻塞当前执行方法的线程)
  • offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入 BlockingQueue,则返回失败
  • put(anObject):把 anObject 加到 BlockingQueue 里,如果 BlockQueue 没有空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续

  • poll(time):取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 time 参数规定的时间,取不到时返回 null
  • poll(long timeout, TimeUnit unit):从 BlockingQueue 取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。
  • take():取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 BlockingQueue 有新的数据被加入;
  • drainTo():一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

public class BlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        //创建阻塞队列
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

        //第一组
//        System.out.println(blockingQueue.add("a"));
//        System.out.println(blockingQueue.add("b"));
//        System.out.println(blockingQueue.add("c"));
//        //System.out.println(blockingQueue.element());
//
//        //System.out.println(blockingQueue.add("w"));
//        System.out.println(blockingQueue.remove());
//        System.out.println(blockingQueue.remove());
//        System.out.println(blockingQueue.remove());
//        System.out.println(blockingQueue.remove());

        //第二组
//        System.out.println(blockingQueue.offer("a"));
//        System.out.println(blockingQueue.offer("b"));
//        System.out.println(blockingQueue.offer("c"));
//        System.out.println(blockingQueue.offer("www"));
//
//        System.out.println(blockingQueue.poll());
//        System.out.println(blockingQueue.poll());
//        System.out.println(blockingQueue.poll());
//        System.out.println(blockingQueue.poll());

        //第三组
//        blockingQueue.put("a");
//        blockingQueue.put("b");
//        blockingQueue.put("c");
//        //blockingQueue.put("w");
//
//        System.out.println(blockingQueue.take());
//        System.out.println(blockingQueue.take());
//        System.out.println(blockingQueue.take());
//        System.out.println(blockingQueue.take());

        //第四组
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
        System.out.println(blockingQueue.offer("w",3L, TimeUnit.SECONDS));
    }
}

8.3 常见的 BlockingQueue

  1. ArrayBlockingQueue:由数组结构组成的有界阻塞队列

  2. LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列

  3. DelayQueu:使用优先级队列实现的延迟无界阻塞队列

  4. PriorityBlockingQueue:支持优先级排序的无界阻塞队列

  5. SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列

    1. SynchronousQueue没有容量,与其他 BlcokingQueue 不同,SynchronousQueue 是一个不存储元素的 BlcokingQueue
    2. 每个 put 操作必须要等待一个 take 操作,否则不能继续添加元素,反之亦然
  6. LinkedTransferQueue:由链表组成的无界阻塞队列

  7. LinkedBlockingDeque:由链表组成的双向阻塞队列

9、线程池

9.1 为什么使用线程池

线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
  • 提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

9.2 Executors 创建线程池

Executors 工具类中的方法如图所示:

image-20220804011642467

image-20220816025825433

通过 Executor 框架的工具类 Executors 来实现,可以创建四种类型的 ThreadPoolExecutor:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  • 作用:创建一个可重用固定线程数的线程池,可控制线程的最大并发数,超出的线程会在队列中等待
  • 特征
    1. 线程池中的线程处于一定的量,可以很好的控制线程的并发量
    2. 线程可以重复被使用,在显示关闭之前,都将一直存在
    3. 创建的线程池 corePoolSize 和 MaxmumPoolSize 是相等的,它使用的是 LinkedBlockingQueue
  • 场景:适用于可以预测线程数量的业务中,或者服务器负载较重,对线程数有严格限制的场景

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • 作用:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务都按照指定顺序执行。
  • 特征:创建的线程池 corePoolSize 和 MaxmumPoolSize 都设置为 1,它使用的是 LinkedBlockingQueue
  • 场景:适用于需要保证顺序执行各个任务,并且在任意时间点,不会同时有多个线程的场景

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
  • 作用:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
  • 特点:
    1. 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE)
    2. 来了任务就创建线程运行,如果线程空闲超过 60 秒,就销毁线程
    3. 创建的线程池 corePoolSize 设置为 0,MaxmumPoolSize 设置为 Interger. MAX_VALUE,它使用的是 SynchronousQueue
  • 场景:适用于创建一个可无限扩大的线程池,服务器负载压力较轻,执行时间较短,任务多的场景

主要用来在给定的延迟后运行任务,或者定期执行任务。这个在实际项目中基本不会被用到,也不推荐使用。只需要简单了解一下它的思想即可。

《阿里巴巴Java开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。Executors 返回线程池对象的弊端如下:

  1. CachedThreadPoolScheduledThreadPoolExecutor:允许创建的线程数量为 Integer.MAX_VALUE,可能会创建大量线程导致 OOM
  2. FixedThreadPoolSingleThreadExecutor:允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM

9.3 ThreadPoolExecutor 创建线程池

image-20220804182835092

  1. corePoolSize:保留在池中的线程数,即使它们是空闲的,除非设置 allowCoreThreadTimeOut
  2. maximumPoolSize:池中允许的最大线程数
  3. keepAliveTime:当线程数大于核心时,这是多余的空闲线程在终止前等待新任务的最长时间
  4. unit:keepAliveTime 参数的时间单位
  5. workQueue:用于在执行任务之前保存任务的队列。此队列将仅保存由 execute 方法提交的 Runnable 任务
  6. threadFactory:表示生成线程池中工作线程的线程工厂,用户创建新线程,一般用默认即可
  7. handler:拒绝策略

9.4 拒绝策略

当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到 maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了

image-20220718212314441

9.5 线程池底层工作原理

image-20220718190806638

  1. 在创建了线程池后,线程池中的线程数为零

  2. 当调用 execute() 方法添加一个请求任务时,线程池会做出如下判断:

    • 如果正在运行的线程数量小于 corePoolSize(线程池的核心线程数),那么马上创建线程运行这个任务;
    • 如果正在运行的线程数量大于或等于 corePoolSize(线程池的核心线程数),那么将这个任务放入队列;
      • 如果这个时候队列满了且正在运行的线程数量还小于 maximumPoolSize(容纳的最大线程数),那么还是要创建非核心线程立刻运行这个任务;
      • 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize(容纳的最大线程数),那么线程池会启动饱和拒绝策略来执行。
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行

  4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:

    • 如果当前运行的线程数大于 corePoolSize(线程池的核心线程数),那么这个线程就被停掉。
    • 所以线程池的所有任务完成后,它最终会收缩到 corePoolSize(线程池的核心线程数)的大小。

9.6 如何合理分配核心线程数

9.6.1 CPU 密集型

CPU 密集型的意思是该任务需要大量的运算,而没有阻塞,CPU 一直全速运行。

CPU 密集型任务只有在真正的多核 CPU 上才可能得到加速(通过多线程),而在单核 CPU 上,无论你开几个模拟的多线程,该任务都不可能得到加速,因为 CPU 总的运算能力就那些。

CPU 密集型任务配置尽可能少的线程数量,一般公式:CPU 核数 + 1 个线程的线程池

9.6.2 IO 密集型

由于 IO 密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如 CPU * 2

IO 密集型,即该任务需要大量的 IO,即大量的阻塞。

在单线程上运行 IO 密集型的任务会导致浪费大量的 CPU 运算能力浪费在等待。所以在 IO 密集型任务中使用多线程可以大大地加速程序运行,即使在单核 CPU 上,这种加速主要就是利用了被浪费掉的阻塞时间。

IO 密集型时,大部分线程都阻塞,故需要多配制线程数:

  • 参考公式:CPU 核数 / (1 - 阻塞系数)
  • 阻塞系数:在 0.8~0.9 之间

比如一个 8 核 CPU,8 / (1 - 0.9) = 80 个线程数


END

本文作者:
文章标题:JUC 入门知识概述
本文地址:https://www.pendulumye.com/juc/438.html
版权说明:若无注明,本文皆PendulumYe原创,转载请保留文章出处。
最后修改:2022 年 09 月 08 日
千山万水总是情,给个一毛行不行💋