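Let's implement a custom thread pool in Java.

First, let's define an interface with two methods, enqueue and dequeue. The implementation behind it will act as a blocking queue: when the queue holds an item, a call to dequeue returns it right away; when the queue is empty, the calling thread waits until an item is added.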

package com.pract.threadpool;

public interface CustomQueue<E>{

    public void enqueue(E e);

    public E dequeue();

}

Here is the implementation:

package com.pract.threadpool;

import java.util.LinkedList;
import java.util.Queue;


public class MyQueue<E> implements CustomQueue<E>{

    // queue backed by a linkedlist
    private Queue<E> queue = new LinkedList<E>();


    /**
     * Adds an object to this queue and notifies any waiting threads that
     * an object is now available.
     *
     * This method simply adds the element without checking the queue size;
     * a size check could be introduced here as well.
     */
    @Override
    public synchronized void enqueue(E e) {
        queue.add(e);
        // Wake up any threads waiting in dequeue() for an item.
        notifyAll();
    }

    /**
     * Blocks until the queue has something on it and then returns that item.
     * Returns null if the waiting thread is interrupted.
     */
    @Override
    public synchronized E dequeue() {
        while (queue.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException ex) {
                // Restore the interrupt flag so the caller can see the interruption.
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return queue.remove();
    }
}
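The comment on enqueue mentions that a size check could be added. A minimal sketch of that idea follows; the BoundedQueue class, its capacity parameter, and the demo class are hypothetical names introduced here for illustration and are not part of the original code. Unlike MyQueue, this version lets InterruptedException propagate to the caller.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

// Hypothetical bounded variant of MyQueue (name and capacity are assumptions).
class BoundedQueue<E> {

    private final Queue<E> queue = new LinkedList<E>();
    private final int capacity;

    BoundedQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Blocks while the queue is full, then adds the item and wakes up waiters.
    public synchronized void enqueue(E e) throws InterruptedException {
        while (queue.size() >= capacity) {
            wait();
        }
        queue.add(e);
        notifyAll();
    }

    // Blocks while the queue is empty, then removes the head and wakes up waiters.
    public synchronized E dequeue() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        E e = queue.remove();
        notifyAll();
        return e;
    }
}

class BoundedQueueDemo {

    // A producer pushes 1, 2, 3 through a one-slot queue while this thread consumes.
    static List<Integer> produceAndConsume() {
        final BoundedQueue<Integer> q = new BoundedQueue<Integer>(1);
        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 1; i <= 3; i++) {
                        q.enqueue(i); // waits whenever the single slot is taken
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();

        List<Integer> out = new ArrayList<Integer>();
        try {
            for (int i = 0; i < 3; i++) {
                out.add(q.dequeue());
            }
            producer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume()); // prints [1, 2, 3]
    }
}
```

Both methods call notifyAll() because producers and consumers wait on the same monitor: a dequeue may free a slot for a blocked producer, just as an enqueue may supply an item to a blocked consumer.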

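Now let's think about the thread pool manager. It takes the number of worker threads, that is, the pool size, as a parameter, and its worker threads listen on the queue to which tasks are submitted.

Here is the implementation: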

package com.pract.threadpool;

public class ThreadPoolManager {

    private final int THREADPOOL_CAPACITY;
    private MyQueue<Runnable> myQueue = new MyQueue<Runnable>();

    public ThreadPoolManager(int capacity){
        this.THREADPOOL_CAPACITY = capacity;
        initAllConsumers();
    }

    private void initAllConsumers(){
        for (int i = 0; i < THREADPOOL_CAPACITY; i++) {
            // each worker is named after its index
            Thread thread = new Thread(new Worker(myQueue, String.valueOf(i)));
            thread.start();
        }
    }


    public void submitTask(Runnable r){
        myQueue.enqueue(r);
    }

}

If you look at the implementation above, you will see that we initialized the queue as a MyQueue:

MyQueue<Runnable> myQueue = new MyQueue<Runnable>();

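MyQueue holds all submitted tasks until our worker threads execute them. The manager above also creates Worker objects. A Worker is what each pool thread runs: it keeps listening on the queue and processes the submitted tasks.

Here is the Worker implementation: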

package com.pract.threadpool;

public class Worker implements Runnable{

    private MyQueue<Runnable> myQueue;
    private String name;

    public Worker(MyQueue<Runnable> myQueue, String name){
        this.myQueue = myQueue;
        this.name = name;
    }


    @Override
    public void run() {
        while (true) {
            Runnable r = myQueue.dequeue();
            if (r == null) {
                // dequeue() returns null when this thread was interrupted: stop working
                break;
            }
            // report which worker took the task
            System.out.println(" Taken Item by thread name:" + this.name);
            // run it
            r.run();
            System.out.println(" Task completed of thread:" + this.name);
        }
    }
}
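Because the worker loop never ends on its own, the worker threads keep the JVM alive even after every task has finished. One common way to stop such workers is a sentinel ("poison pill") task. The sketch below is only an illustration under stated assumptions: StoppablePool, STOP, shutdown and the demo class are hypothetical names, and it swaps in the JDK's LinkedBlockingQueue in place of MyQueue to stay short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical pool with a shutdown method (not part of the original code).
class StoppablePool {

    // Sentinel task: a worker that dequeues it leaves its loop.
    private static final Runnable STOP = new Runnable() {
        @Override
        public void run() { }
    };

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    private final List<Thread> workers = new ArrayList<Thread>();

    StoppablePool(int capacity) {
        for (int i = 0; i < capacity; i++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            Runnable r = queue.take(); // blocks while the queue is empty
                            if (r == STOP) {
                                return;
                            }
                            r.run();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            workers.add(t);
            t.start();
        }
    }

    void submitTask(Runnable r) {
        queue.add(r);
    }

    // Queues one sentinel per worker, then waits for every worker to exit.
    // Tasks submitted before shutdown() sit ahead of the sentinels and still run.
    void shutdown() {
        for (int i = 0; i < workers.size(); i++) {
            queue.add(STOP);
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class StoppablePoolDemo {

    // Submits the given number of counting tasks and returns how many ran.
    static int runTasks(int tasks) {
        final AtomicInteger done = new AtomicInteger();
        StoppablePool pool = new StoppablePool(3);
        for (int i = 0; i < tasks; i++) {
            pool.submitTask(new Runnable() {
                @Override
                public void run() {
                    done.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(5)); // prints 5
    }
}
```

Since the queue is FIFO, each sentinel is only reached after the tasks queued before it have been taken, so shutdown() lets pending work finish instead of cutting it off.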

Now let's test the implementation above:

package com.pract.threadpool;

public class TestThreadPoolManager {

    public static void main(String[] args) {
        ThreadPoolManager poolManager = new ThreadPoolManager(10);

        // now let's submit some tasks
        poolManager.submitTask(new Runnable() {
            @Override
            public void run() {
                System.out.println("Starting Task A....");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task A Completed....");
            }
        });

        poolManager.submitTask(new Runnable() {
            @Override
            public void run() {
                System.out.println("Starting Task B....");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task B Completed....");
            }
        });
    }
}

Here is the output (which worker picks up which task can differ between runs):

Taken Item by thread name:9
Taken Item by thread name:0

Starting Task A....
Starting Task B....

Task B Completed....
Task completed of thread:0

Task A Completed....
Task completed of thread:9
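
For comparison, the same experiment can be run on the fixed-size pool that ships with the JDK in java.util.concurrent. This is a rough sketch rather than a drop-in replacement for the classes above: the ExecutorComparison class and its helper methods are hypothetical names, and the sleep times are shortened to 500 ms and 100 ms so the example finishes quickly.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical comparison class (not part of the original code).
class ExecutorComparison {

    // Builds a task that sleeps for the given time and then records its name.
    static Runnable task(final String name, final long sleepMillis, final List<String> finished) {
        return new Runnable() {
            @Override
            public void run() {
                System.out.println("Starting Task " + name + "....");
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                finished.add(name);
                System.out.println("Task " + name + " Completed....");
            }
        };
    }

    // Runs tasks A and B on a pool of 10 threads and returns their completion order.
    static List<String> runTwoTasks() {
        List<String> finished = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService pool = Executors.newFixedThreadPool(10);
        pool.execute(task("A", 500, finished));
        pool.execute(task("B", 100, finished));
        pool.shutdown(); // stop accepting new tasks; already submitted tasks still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished;
    }

    public static void main(String[] args) {
        System.out.println(runTwoTasks());
    }
}
```

As in the output above, the shorter task B is expected to complete before task A, because both tasks start on separate threads at nearly the same time.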