学习是件开心事

多线程-ThreadPoolExecutor(不定时更新)

一直很想写一篇关于多线程的文章,于是搜了很多文章,自己琢磨了许久才敢下笔 ,那么开始吧。

ThreadPoolExecutor线程池初探


jdk8线程池类ThreadPoolExecutor的完整构造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);



corePoolSize: 核心线程数,会一直存活,即使没有任务,线程池也会维护线程的最少数量。
maximumPoolSize: 最大线程数,一个线程池最大的可运行线程数目。
keepAliveTime: 线程等待新任务的时间。线程会等待分配新任务的时间,如果超时还没有等到新任务那么线程会退出,直到当前的线程数=corePoolSize。
unit: 线程等待新任务的时间单位。
workQueue: 线程池用的缓冲队列,常用的是:java.util.concurrent. ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue
PrivilegedThreadFactory: 默认的线程工厂,创建线程用的。
defaultHandler: 线程池中的数量大于maximumPoolSize,对拒绝任务的处理策略,默认值ThreadPoolExecutor.AbortPolicy()。

线程池运行过程猜想


下面上一段代码测试:
public static void main(String[] args) {
BlockingQueue queue=new LinkedBlockingQueue<>(4);
final ExecutorService threadPool = new ThreadPoolExecutor(2,6,1,TimeUnit.DAYS,queue);
for (int i=1;i<=15;i++) {
final int finalI = i;
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + “开始任务” + finalI);
System.out.println();
try {

Thread.sleep(1000 * 60);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + “结束任务” + finalI);
}
});
}
threadPool.shutdown();
System.out.println(“主线程结束了”);
}



测试代码运行结果:
pool-1-thread-1开始任务1
pool-1-thread-2开始任务2
pool-1-thread-3开始任务7
pool-1-thread-4开始任务8
pool-1-thread-5开始任务9
pool-1-thread-6开始任务10
Exception in thread “main” java.util.concurrent.RejectedExecutionException: Task com.houbank.mls.test.ThreadTest$1@7eda2dbb rejected from java.util.concurrent.ThreadPoolExecutor@6576fe71[Running, pool size = 6, active threads = 6, queued tasks = 4, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)


运行结果总共6个线程在执行任务,线程1执行任务1,线程2执行任务2,线程3执行任务7?,线程4执行任务8?,线程5执行任务9?,线程6执行任务10? 为什么会是这种结果呢? 原来:

首先创建线程池的的时候,核心线程数为2,最大线程数为6,线程等待任务的时间为1 day,同时队列采用LinkedbBlockingQuenel来缓存任务,队列缓存任务的长度是4。
每次执行execute()方法的时候线程池
step1:如果当前运行的线程数小于corePoolSize,会立即创建一个线程运行任务。
step2:如果当前运行的线程数大于等于corePoolSize,会把任务缓存到队列中。
step3:如果队列放满了,而且此时的当前运行线程数小于maximumPoolSize,还是要创建一个线程来运行任务。
step4:如果队列满了,同时前运行线程数等于或大于maximumPoolSize时,拒绝新任务抛出异常。
即:任务1和2 execute()时,创建线程执行任务。此时当前运行线程为2,达到了corPoolSize指定核心线程数。任务3,4,5,6 execute()时直接把任务放到缓存队列里,同时缓存队列size=4达到饱和。任务7,8,9,10 execute()时还是要创建线程来执行的,同时当前运行线程数累加到6达到maximumPoolSize饱和。所以余下的任务被拒绝并抛出异常。

jdk8的相关源码


    public class ThreadPoolExecutor extends AbstractExecutorService {
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
            //step1:如果当前运行线程数小于corePoolSize,addWorker(创建线程并执行任务)
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //step2:如果当前运行线程数大于等于corePoolSize,会把任务加到队列中
            //下面是拒绝任务的情况 一般不会进 主要是线程池被急停
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
          //如果队列满了即step2的offer结果为fasle 往后执行
          //step3: 继续创建线程来执行任务,
        else if (!addWorker(command, false))
          //如果创建失败,那就应该是达到maximumPoolSize饱和了,并采用策略拒绝任务
            reject(command);
    } 

}

创建线程启动任务


由上可知 addWoeker()方法是来创建线程启动任务的,那么线程把起始任务执行完毕 又是如何从队列中获取新任务来执行的呢?直接看代码

  private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // step1:参数检验 运行状态是否为RUNNING 是否有任务 是否有缓存队列 
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //step1:参数检验完再看此时的运行状态  如果不是RUNNING 赶紧跳转到第一行retry从头开始检验
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
             //step2:核心代码 把任务委托给Worker来创建线程并启动任务
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                //step3:加锁 把创建的的worker加入到set集合里,并设置标识。
                //因为这里存在并发,操作的是成员变量,workers是Set,每个子线程共享
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                //step3: 操作完数据后解锁
                    mainLock.unlock();
                }
                if (workerAdded) {
            //step4: threadFactory新创建的线程包装了Worker对象,在执行新创建线程的run方法时,调用到了Worker对象的run方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
以下是内部类Worker的相关方法:

        ----------每次创建一个worker的时候,都先从工厂里创建一个线程,同时把任务赋给该线程,worker持有该线程。
        Worker(Runnable firstTask) {
            setState(-1); 
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        ----------需要注意worker也是实现runnable的 所以也需要实现任务方法 run()
           public void run() {
            runWorker(this);
        }
        ----------运行任务 
        final void runWorker(Worker w) {
        //step1: 从当前的worker中获得含有任务的线程
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        //while循环 ,第一次从worker中获得任务,以后都是getTask()方法来获得任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    //task是获得的任务 执行任务的执行体
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
----------从队列中获得新任务:
<pre><code>private Runnable getTask() {
    boolean timedOut = false; 
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
        //从队列中获取任务 如果有时间限制 那么用poll  没有时间限制 用take 
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

最后附上简单的流程图
Image text

以上就是本人的一点小分析,还有很多遗漏的地方等待后续补进
感谢博客:
https://my.oschina.net/u/1398304/blog/376827
http://shmilyaw-hotmail-com.iteye.com/blog/1897638

ps: 为什么ThreadPoolExecutor的addWorker()方法里调用t.start() 会调用Worker()的run方法


解释:

  Worker w = null;
w = new Worker(firstTask);
final Thread t = w.thread;

t是Worker类的一个成员变量thread,上面代码意思是传一个初始任务给Worker给Worker,在把Worker创建的线程赋值给t。

Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

Worker是ThreadPoolExecutor的内部类,接收到firstTask后调用线程工厂来创建线程,并把自己即this传进去,
getThreadFactory()返回的是ThreadPoolExecutor的成员变量threadFactory,如果自己没有自定义并setThreadFactory的话,默认使用Executors.defaultThreadFactory(),这个在定义线程池就已经赋值了,就不贴代码了。查看Executors内部类defaultThreadFactory的newThread()方法

 public Thread newThread(Runnable r) {//Worker也是实现Runnable的
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}

可以看出newThread()就是定义了即将要创建线程所在的group,线程要执行的目标任务r,线程的名称name (DefaultThreadFactory构造方法里有赋值namePrefix,有兴趣自看),还有运行的stacksize大小(0就是忽略的意思,最后创建线程还是调用Thread的构造方法。


private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc) {……

经过Thread多个构造方法最终进入上面的构造方法,里面定义了线程的安全性,执行等级等,有机会独立开一章。可以看出我们传进的Worker对象最终变成了target,target就是目标任务,线程启动后就得要执行目标任务的run()方法
线程启动需要调用Thread的start(),Thread的start()方法里调用了start0(),

private native void start0();

jvm调用start0的时候会调用Thread的run方法

public void run() {
if (target != null) {
target.run();
}
}

很不幸的是run方法就是启动目标任务的run方法,目标任务就是我们之前传进来的Worker。

ps:idea多线程测试


第一,idea的dubug默认是查看所有线程,多线程调试最好修改idea配置
Ctrl+shift+F8 -> 去掉All 选择Thread 这样就可以进行单条线程测试了。如果有子线程运行并且卡在breakpoint会有弹窗征求你是否切换线程
第二:t.start()方法启动线程,线程全部进入Runnable状态等待cpu分配时间碎片,如果系统是单核,正常运行时只有一个线程是在running其他线程都是sleeping。