JAVA并发编程(八)-线程池的管理

Scroll Down

JAVA并发编程(八)-线程池的管理

线程的未捕获异常与监控

如果线程的run方法抛出未被捕获的异常,那么run方法和线程都会终止,对于这种线程异常情况,JDK1.5引入了java.lang.Thread.UncaughtExceptionHandler,该接口是在Thread内部定义的,只定义了一个方法

void uncaughtException(Thread t, Throwable e);

两个参数包括了异常终止的线程本身以及导致线程提前终止的异常,比如某个线程因为空指针等未捕获的情况停止了,需要启动一个新的线程为他完成任务,则需要实现这个方法.

public class ThreadMonitorDemo {
    final static Logger LOGGER = Logger.getAnonymousLogger();
    static int threadIndex = 0;
    final BlockingQueue<String> channel = new ArrayBlockingQueue<String>(100);
    volatile boolean inited = false;

    public static void main(String[] args) throws InterruptedException {
        ThreadMonitorDemo demo = new ThreadMonitorDemo();
        demo.init();
        for (int i = 0; i < 100; i++) {
            demo.service("test-" + i);
        }

        Thread.sleep(2000);
        System.exit(0);
    }

    public synchronized void init() {
        if (inited) {
            return;
        }
        Debug.info("init...");
        WokrerThread t = new WokrerThread();
        t.setName("Worker0-" + threadIndex++);
        // 为线程t关联一个UncaughtExceptionHandler
        t.setUncaughtExceptionHandler(new ThreadMonitor());
        t.start();
        inited = true;
    }

    public void service(String message) throws InterruptedException {
        channel.put(message);
    }

    private class ThreadMonitor implements Thread.UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            Debug.info("Current thread is `t`:%s, it is still alive:%s",
                    Thread.currentThread() == t, t.isAlive());

            // 将线程异常终止的相关信息记录到日志中
            String threadInfo = t.getName();
            LOGGER.log(Level.SEVERE, threadInfo + " terminated:", e);

            // 创建并启动替代线程
            LOGGER.info("About to restart " + threadInfo);
            // 重置线程启动标记
            inited = false;
            init();
        }

    }// 类ThreadMonitor定义结束

    private class WokrerThread extends Thread {
        @Override
        public void run() {
            Debug.info("Do something important...");
            String msg;
            try {
                for (; ; ) {
                    msg = channel.take();
                    process(msg);
                }
            } catch (InterruptedException e) {
                // 什么也不做
            }
        }

        private void process(String message) {
            Debug.info(message);
            // 模拟随机性异常
            int i = (int) (Math.random() * 100);
            if (i < 2) {
                throw new RuntimeException("test");
            }
            Tools.randomPause(100);
        }
    }// 类ThreadMonitorDemo定义结束
}

最终结果即使是线程报错了他也会启动一个新的线程来继续执行任务,直到任务结束,而且uncaughtException方法是抛出异常的线程t中执行的,执行uncaughtException方法的时候t还是存活的,执行完之后t就被终止了.

线程工厂

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

ThreadFactory接口中的newThread()方法,我们可以在这方法中封装线程创建的逻辑,使得用统一的方式创建线程并配置一些信息,例如统一增加线程日志,线程异常监控等

public class XThreadFactory implements ThreadFactory {
    final static Logger LOGGER = Logger.getAnonymousLogger();
    private final UncaughtExceptionHandler ueh;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    // 所创建的线程的线程名前缀
    private final String namePrefix;

    public XThreadFactory(UncaughtExceptionHandler ueh, String name) {
        this.ueh = ueh;
        this.namePrefix = name;
    }

    public XThreadFactory(String name) {
        this(new LoggingUncaughtExceptionHandler(), name);
    }

    public XThreadFactory(UncaughtExceptionHandler ueh) {
        this(ueh, "thread");
    }

    public XThreadFactory() {
        this(new LoggingUncaughtExceptionHandler(), "thread");
    }

    protected Thread doMakeThread(final Runnable r) {
        return new Thread(r) {
            @Override
            public String toString() {
                // 返回对问题定位更加有益的信息
                ThreadGroup group = getThreadGroup();
                String groupName = null == group ? "" : group.getName();
                String threadInfo = getClass().getSimpleName() + "[" + getName() + ","
                        + getId() + ","
                        + groupName + "]@" + hashCode();
                return threadInfo;
            }
        };
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = doMakeThread(r);
        t.setUncaughtExceptionHandler(ueh);
        t.setName(namePrefix + "-" + threadNumber.getAndIncrement());
        if (t.isDaemon()) {
            t.setDaemon(false);
        }
        if (t.getPriority() != Thread.NORM_PRIORITY) {
            t.setPriority(Thread.NORM_PRIORITY);
        }
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("new thread created" + t);
        }
        return t;
    }

    static class LoggingUncaughtExceptionHandler implements
            UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            // 将线程异常终止的相关信息记录到日志中
            LOGGER.log(Level.SEVERE, t + " terminated:", e);
        }
    }// LoggingUncaughtExceptionHandler类定义结束
}

线程池

线程是一种昂贵的资源,主要体现在:

  • 线程的创建于启动的开销,与普通的对象相比,线程占用了额外的栈空间,并且线程启动会导致产生相应的线程调度开销
  • 线程的销毁也有额外的开销
  • 线程调度的开销,线程的调度会导致上下文切换,从而增加处理器资源的消耗

线程池介绍

线程池本身是一个对象,线程池内部预先创建一定数量的工作者线程(核心线程),客户端线程只需要向线程池提交任务对象即可,线程池可能将这些任务对象缓存在任务队列中,由工作者线程去不断的去除任务并执行.线程池可以看做是消费者-生产者模式的一种服务.

线程池的继承关系:

image-20200402222237345

线程池的运行机制:

image-20200402222309482

//corePoolSize 核心线程数  maximumPoolSize 最大线程数    keepAliveTime和unit指定空闲线程的最大存活时间 workQueue工作队列(阻塞) threadFactory 线程工厂,通常用于设置线程名,RejectedExecutionHandler 线程执行被拒绝后的处理接口
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
参数名含义作用
corePoolSize核心线程池大小线程池大小达到核心线程池大小的时候,新来的任务会被存放到工作队列之中,直到队列满了之后才会进行创建新的工作者线程,直到线程池满
maximumPoolSize最大线程数最大线程数,达到最大线程数之后会触发线程池拒绝策略
long keepAliveTime,
TimeUnit unit
存活时间非核心线程的空闲线程的最大存活时间
workQueue工作队列一个阻塞式的工作队列
threadFactory线程工厂默认使用Executors.defaultThreadFactory()创建
RejectedExecutionHandler handler拒绝策略当线程池满之后触发的拒绝策略

这个是ThreadPoolExecutor最完整的构造方法,客户端每提交一个任务就会创建一个工作者线程来处理该任务,**(如果调用了prestartCoreThread()或者 prestartAllCoreThreads(),线程池创建的时候所有的核心线程都会被创建并且启动,这样可以减少线程池处理的所需时间)**随着客户端不断的提交任务,当前线程池大小也相应增加,在当前线程池大小达到核心线程池大小的时候,新来的任务会被存放到工作队列之中,缓存的任务由线程池中所有工作者线程负责取出并进行执行,线程池将任务存入工作队列的时候调用的就是BlockingQueue的非阻塞方法offer(E e),因此即使是工作队列满了,线程池还是会继续创建新的工作者线程,工作队列满时,线程池才会继续创建新的工作者线程,直到当前线程池大小达到最大线程池大小,如果工作队列满并且线程池大小也达到最大值的时候,客户端提交任务会被拒绝,即需要使用RejectedExecutionHandler handler方法用于封装任务被拒绝的策略.线程池创建线程默认是用Executors.defaultThreadFactory()来创建的.

image-20200402222354278

使用不同的工作队列可以获取不同的策略:

image-20200402222414430

任务的申请过程

线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现

640-1585834913414

线程池的拒绝策略

ThreadPoolExecutor默认自带了4种拒绝策略,可供不同的形式下使用,如果不够满足业务需求,可以自己实现RejectedExecutionHandler 接口进行处理RejectedExecutionHandler_thumb
640-1585834983213

线程池的生命周期

线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。

关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:

为什么说是ctl的前三位来存储状态,后29位来存储数量呢

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

其中COUNT_BITS = 32 - 3 = 29,CAPACITY = (1 << 29) - 1;

CAPACITY 使用位运算的效果就是

1       	= 00000000 00000000 00000000 00000001
1 << 29 	= 00100000 00000000 00000000 00000000
1 << 29 - 1 = 00011111 11111111 11111111 11111111
private static int runStateOf(int c)     { return c & ~CAPACITY; } //计算当前运行状态
private static int workerCountOf(int c)  { return c & CAPACITY; }  //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; }   //通过状态和线程数生成ctl

这样状态值取的是 c & ~CAPACITY,~CAPACITY = 11100000...,这样状态值只需要取前三位即可,而c & CAPACITY中,CAPACITY的前三位是0,证明是永久舍弃前三位,最多只支持29位数的线程数量,大小为:536870911,所以可以达到一个原子变量数字既表示状态,又表示线程数量

640

关闭线程池

ThreadPoolExecutor中关闭线程池也分为两种:

  • 平缓关闭:ThreadPoolExecutor#shutdown关闭的时候线程池中已提交的任务会继续执行,新提交的任务会像线程池饱和时一样被拒绝,客户端代码可以使用ThreadPoolExecutor#awaitTermination来等待线程池关闭结束
  • 立即关闭:ThreadPoolExecutor#shutdownNow关闭的时候正在执行任务会停止,已提交正在等待执行的任务也不会执行,该方法会返回已提交未被执行的任务列表,通过调用工作者线程的interrupt方法来停止正在执行的任务的,因此某些无法响应中断的任务可能永远也不会停止.

image-20200402222439559

Worker线程管理

线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread;//Worker持有的线程
    Runnable firstTask;//初始化的任务,可以为null
}

Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。

Worker执行任务的模型如下图所示:

640-1585835923244

线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。

Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
  2. 如果正在执行任务,则不应该中断线程。
  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

在线程回收过程中就使用到了这种特性,回收过程如下图所示:
image-20200402222508267

增加worker线程

增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,其执行流程如下图所示:

640-1585836082984

worker线程回收

线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。

try {
  while (task != null || (task = getTask()) != null) {
    //执行任务
  }
} finally {
  processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己
}

这里消除引用是从HashSet workers中直接workers.remove(w);

Worker线程执行任务

在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:

  1. while循环不断地通过getTask()方法获取任务。
  2. getTask()方法从阻塞队列中取任务。
  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
  4. 执行任务。
  5. 如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。

执行流程如下图所示:

image-20200402222533078

获取任务的处理结果,异常处理与取消

ThreadPoolExecutor中向线程池提交任务有两种方式,一种是submit,一种是execute,submit具有返回值,execute没有返回值.

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

其中submit中相当于是使用的Callable接口,Callable接口相当于一个增强的Runnable实现类,因为他具有特定的返回值,而submit(Runnable task, T result) 也可以将Runnable接口转换为Callable接口实例.submit的返回值类型都是java.util.concurrent.Future<T>,我们可以使用Future.get()来获取任务的执行结果.

Future类

Future.get()方法:

  • Future.get()被调用时,如果任务还未执行完毕,那么Future.get()会使当前线程暂停,直到相应的任务执行结束,因此Future.get()是个阻塞方法,并且Future.get()会抛出InterrupuedException异常,说明它可以响应线程中断,如果Future.get()抛出了未捕获的异常,那么Future.get()会抛出一个ExecutionException异常,可以从ExecutionException.getCause()中获取当前抛出的异常.
  • 因为Future.get()会导致上下文切换,所以代码中提交给线程池执行任务的代码要尽量早,Future.get()获取结果的代码要尽量晚,这样可以使得线程等待的时间变少

Future.cancel(boolean mayInterruptIfRunning)方法:

  • 该方法的返回值可以表示相应的任务是否敲成功,参数mayInterruptIfRunning代表着是否允许发送interrupt来取消任务,如果任务取消成功的话再使用Future.get()会抛出CancellationException异常

Future.isCancelled()方法

  • 该方法的返回值是判断任务是否被取消成功,如果前面调用了Future.cancel()方法,那么调用Future.get()之前需要判断一下任务是否已经被取消了,被取消了则不调用Future.get()方法

Future.isDone()方法

  • 可以检测任务是否执行完毕,任务执行完毕或者抛出异常或者任务被取消都会返回true

线程池的监控

因为线程池的大小,工作队列的容量等属性是没有一个标准答案的,所以我们可能需要在测试的时候对线程池进行监控来确定具体的参数

方法含义
getPoolSize()获取线程池当前的大小
getQueue()返回工作队列的实例,可以根据这个再获取具体的工作队列大小
getLargestPoolSize()获取线程池的峰值
getActiveCount()获取线程池中正在工作的线程数
getTaskCount()获得线程池目前为止接收到的任务数
getCompletedTaskCount()获取线程池中已经处理完毕的任务数

ThreadPoolExecutor池中还提供了两个钩子方法,beforeExecute(Thread t, Runnable r);afterExecute(Runnable r, Throwable t);,当一个任务r在被线程池中的线程t执行前,会先触发beforeExecute(Thread t, Runnable r)方法,当线程t执行完了r之后,无论是否成功,都会执行afterExecute(Runnable r, Throwable t)方法,我们可以在通过创建ThreadPoolExecutor的子类来计算任务执行的平均耗时或者监控其他数据.

线程池中的异常死锁

如果线程池中执行的任务会向同一个线程池中提交另一个任务,而且前一个任务的执行结束又依赖于后一个任务的执行结果,那么就有可能导致所有的工作者线程都在等待其他任务的工作结果,但是其他任务都仍然在工作队列中等待,导致线程池死锁.一般同一个线程池只用户执行相互独立的任务,这样可以避免死锁.

线程的异常终止

如果是使用ThreadPoolExecutor#submit方法,即使代码中抛出了异常,也不会导致正在执行工作者线程的异常终止,需要使用Future.get()方法来捕获ExecutionException异常来获取

如果是使用ThreadPoolExecutor#executor方法来提交给线程池的,一旦执行时出现异常,那么工作者线程就会异常终止,我们可以通过创建ThreadPoolExecutor的时候设置线程工厂,这个工厂创建线程时可以关联一个UncaughtExceptionHandler接口,这样出现未捕获的异常时,可以进行一定的处理.

总结

线程管理

参考:

《JAVA多线程编程实战指南》

Java线程池实现原理及其在美团业务中的实践 ps:写的太好了,看原文比我总结的更好