Java并发(4)深入分析java线程池框架及实现原理(一)

前言

先说说我个人对线程池的理解:线程池顾名思义是一个装有很多线程的池子,这个池子维护着从线程创建到销毁的怎个生命周期以及线程的分配,用户只需要把任务提交给这个线程池而不用去关心线程池如何创建线程,线程池会自己给这些任务分配线程资源来完成任务。

正文

java的Executor线程池框架类图大致如下: ​​​​​​​​

  • Executor:执行者,java线程池框架的最上层父接口,地位类似于spring的BeanFactry、集合框架的Collection接口,在Executor这个接口中只有一个execute方法,该方法的作用是向线程池提交任务并执行。

  • ExecutorService:该接口继承自Executor接口,添加了shutdown、shutdownAll、submit、invokeAll等一系列对线程的操作方法,该接口比较重要,在使用线程池框架的时候,经常用到该接口。

  • AbstractExecutorService:这是一个抽象类,实现ExecuotrService接口。

  • ThreadPoolExecutor:这是Java线程池最核心的一个类,该类继承自AbstractExecutorService,主要功能是创建线程池,给任务分配线程资源,执行任务。

  • ScheduledExecutorSerivce 和 ScheduledThreadPoolExecutor 提供了另一种线程池:延迟执行和周期性执行的线程池。

  • Executors:这是一个静态工厂类,该类定义了一系列静态工厂方法,通过这些工厂方法可以返回各种不同的线程池。

    要深入分析这个线程池框架,我们先从使用者的角度入手进行分析,要使用这个框架首先得有个可执行的线程任务(实现了Runnable或Callable接口的类):

    1
    2
    3
    4
    5
    6
    7
    8
    public class Task implements Runnable {
    @Override
    public void run() {
    for (int i = 0; i < 10; i++) {
    System.out.println(Thread.currentThread().getName()+"---"+i);
    }
    }
    }

    然后通过静态工厂类Executors产生一个线程池,然后将这个任务提交给线程池。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public class TaskTest {
    public static void main(String[] args){
    ExecutorService exec = Executors.newFixedThreadPool(3); // 核心线程为3的线程池,最大线程为3的线程池,池子中只会有三个线程
    // ExecutorService exec = Executors.newCachedThreadPool(); // 核心线程为0,最大线程为Integer.MAX_VALUE的线程池
    // ExecutorService exec = Executors.newScheduledThreadPool(3); // 核心线程为3的延迟线程池
    // ExecutorService exec = Executors.newSingleThreadExecutor(); // 相当于Executors.newFixedThreadPool(1);
    for (int i = 0; i < 5; i++) {
    exec.execute(new Task());// 提交五个任务给线程池
    }
    }
    }

    从结果中可以看到,虽然提交了五个任务,但是始终是由三个线程执行的。

    在以上的使用中我们通过静态工厂类产生了一种类型的线程池,辣么,就从这个Executors入手进行分析java的这个线程池框架,先看看Execuotrs中几种线程池的静态工厂方法:

  1. newFixedThreadPool

    1
    2
    3
    4
    5
    6
    7
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads,
    nThreads,
    0L,
    TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
    1
    2
    3
    4
    5
    6
    7
    8
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads,
    nThreads,
    0L,
    TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory);
    }
  2. newCachedThreadPool

    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }
  3. newCachedThreadPool

    1
    2
    3
    4
    5
    6
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }

    对于每一种的线程池工厂方法都可以在参数处传入一个ThreadFactroy对象,用来构造线程,线程池将按照你的ThreadFactroy中的newThread方法来产生线程,如果不传入这个参数,默认使用Executors的静态内部类DefaultThreadFactroy作为线程生成策略。

    DefaultThreadFactroy源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    /**
    * The default thread factory
    */
    static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
    SecurityManager s = System.getSecurityManager();
    group = (s != null) ? s.getThreadGroup() :
    Thread.currentThread().getThreadGroup();
    namePrefix = "pool-" +
    poolNumber.getAndIncrement() +
    "-thread-";
    }

    public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
    namePrefix + threadNumber.getAndIncrement(),
    0);
    if (t.isDaemon())
    t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
    t.setPriority(Thread.NORM_PRIORITY);
    return t;
    }
    }

    默认的线程工厂会给线程分好组然后给线程设置名字,设置线程为非守护线程,并且将线程优先级设为Thread.NORM_PRIORITY (该值为5,默认线程优先级都是5)

    从上面的三种线程池的静态工厂方法中可以看出,通过在new ThreadPoolExecutor的时候传入不同的参数就可以得到不同的线程池了,下面先看看ThreadPoolExecutor构造方法中的各个参数及含义:

    1
    2
    3
    4
    5
    6
    7
    8
    public ThreadPoolExecutor(int corePoolSize,     //核心线程数
    int maximumPoolSize, //最大线程数
    long keepAliveTime, //非核心线程存活时间
    TimeUnit unit, //时间单位
    BlockingQueue<Runnable> workQueue, //排队线程的阻塞队列
    ThreadFactory threadFactory, //线程工厂
    RejectedExecutionHandler handler) { //当前线程已达到最大线程数,且阻塞队列已满时的异常处理策略
    }

    对于以上各参数的理解,可以举一个栗子进行讲解

    我们可以自定义一个 核心线程数为1,最大线程数为2,非核心闲置线程存活时间为100毫秒,阻塞队列大小为2的线程池如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    ExecutorService executorService = new ThreadPoolExecutor( 1, //核心线程
    2, //最大线程
    100, //闲置线程存活时间
    TimeUnit.MILLISECONDS, //时间单位:毫秒
    new ArrayBlockingQueue<Runnable>(2), //大小为3的阻塞队列
    Executors.defaultThreadFactory(), //使用Executors类的默认线程工厂
    new RejectedExecutionHandler() { //new 一个匿名内部类实现RejectedExecutionHandler接口,作为异常处理策略
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    if (r instanceof Task){
    System.out.println( "task:" + ((Task) r).taskId + " is Handel!");
    try {
    TimeUnit.MILLISECONDS.sleep(51);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    });

    创建一个任务,每个任务有一个自己的id:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public class Task implements Runnable {

    private static AtomicInteger number = new AtomicInteger(0);

    public int taskId = number.addAndGet(1);

    @Override
    public void run() {
    System.out.println(Thread.currentThread().getName() + " task:" + taskId + " start...");
    try {
    TimeUnit.MILLISECONDS.sleep(100);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println(Thread.currentThread().getName() + " task:" + taskId + " end...");
    }
    }

    接下来向这个线程池提交10个任务:

    1
    2
    3
    4
    5
    for (int i = 0; i < 10; i++) {
    executorService.execute(new Task()); //提交十个任务给线程池
    }
    executorService.shutdown();
    System.out.println("main is over");

    先看看运行结果:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    pool-1-thread-1 task:1 start...
    pool-1-thread-2 task:4 start...
    task:5 is Handel!
    task:6 is Handel!
    pool-1-thread-1 task:1 end...
    pool-1-thread-2 task:4 end...
    pool-1-thread-1 task:2 start...
    pool-1-thread-2 task:3 start...
    task:9 is Handel!
    task:10 is Handel!
    pool-1-thread-1 task:2 end...
    pool-1-thread-2 task:3 end...
    pool-1-thread-1 task:7 start...
    pool-1-thread-2 task:8 start...
    main is over
    pool-1-thread-1 task:7 end...

    pool-1-thread-2 task:8 end...

    结果中: poo-1表示线程属于那个线程池,-thread-1表示哪一个线程在执行任务(线程名),task:1表示哪一个任务被执行, start… end…表示任务开和结束

    分析一下运行过程:

  4. 在for循环中向线程池提交第一个任务的时候由于线程池中还没有任何线程,并且给线程池设置的核心线程数为1,所以,这时候线程池会通过ThreadFactroy创建一个核心线程1并用这个线程执行任务1.

  5. 在向线程池提交任务2时,由于线程池中核心线程数已经用完(Task任务会sleep 100毫秒,所以这时任务1还未结束,线程1还在被使用),阻塞队列还没满,所以向阻塞队列里面添加任务2,,任务3同理进入阻塞队列

  6. 在向线程池提交任务4时,由于线程池中核心线程数已经用完,阻塞队列已经满了,但线程池中线程数还未达到最大线程数,所以创建非核心线程2,并用这个线程执行任务4.

  7. 在向线程池中提交任务5时,由于此时线程池中,线程数已经达到最大值,并且所有线程都在被使用,并且阻塞队列也已经满了,所以此时线程池就会回调你传入的RejectedExecutionHandler的rejectedExecution方法作为异常处理策略处理任务5(该方法其实是在主线程中处理的,在该策略中我让线程sleep 51秒,以便用该策略处理两个Task之后,核心线程1执行的任务已经结束),任务6同理会被该策略处理。

  8. 在向线程池中提交第7个任务前2毫秒(理论值),此时,核心线程1、非核心线程2的任务1、任务4已经执行完成,当核心线程1执行完任务之后,发现阻塞队列中还有任务在排队等待,这时根据队列先进先出依次取出队列中的任务2和任务3分配给两个线程,所以当任务7、任务8提交的时候发现阻塞队列还没满,会依次被线程池添加进阻塞队列.

  9. 在向线程池提交任务9、任务10的时候同第4步原理,会被异常处理策略处理。

  10. 至此,所有任务都被提交,接着任务2、任务3结束,阻塞队列中的任务7、任务8被执行,然后任务7、任务8结束,程序结束!

    总结一下:当向线程池中添加任务时,首先会从核心线程池中获取线程执行任务,核心线程池中没有空余线程时,任务会进入阻塞队列,阻塞队列满了之后,线程池会分配非核心线程,直到池子里的所有线程达到最大值,此时会将任务交由异常处理策略处理,非核心线程在执行完任务之后如果空闲时间超过设定值将会被回收,下一次需要的时候又重新被创建,核心线程不会被回收。

    通过以上例子,相信大家对线程池已经有了一个比较深入的了解,对于静态工厂类Executors提供的几个线程池也能比较好的理解了,在接下来的一篇文章中,我会深入剖析在jdk1.8中线程池核心类ThreadPoolExecutor这个类是如何实现以上的功能的。
    .