“[翻译]用Java实现自定义的线程池”
Contents
让我们用Java来实现自定义的线程池.
让我们首先定义一个类,它拥有两个方法enqueue
和dequeue
. 这个类的dequeue
方法将充当一个阻塞队列。即,当队列里有数据时,阻塞所有调用dequeue
方法,否则就一直等待。
package com.pract.threadpool;
public interface CustomQueue<E>{
public void enqueue(E e);
public E dequeue();
}
以下是实现
package com.pract.threadpool;
import java.util.LinkedList;
import java.util.Queue;
public class MyQueue<E> implements CustomQueue<E>{
// queue backed by a linkedlist
private Queue<E> queue = new LinkedList<E>();
/**
* Enqueue will add an object to this queue, and will notify any waiting
* threads that now there is an object available.
*
* In enqueue method we just adding the elements not caring of size,
* we can even introduce some check of size here also.
*/
@Override
public synchronized void enqueue(E e) {
queue.add(e);
// Wake up anyone waiting on the queue to put some item.
notifyAll();
}
/**
* Make a blocking call so that we will only return when the queue has
* something on it, otherwise wait until something is put on it.
*/
@Override
public synchronized E dequeue(){
E e = null;
while(queue.isEmpty()){
try {
wait();
} catch (InterruptedException e1) {
return e;
}
}
e = queue.remove();
return e;
}
}
现在,让我们思考下线程池管理器,通过名称以及工作线程的大小参数,即线程池大小。并且,worker
线程应该监听该任务被提交到的队列。
以下是实现:
package com.pract.threadpool;
public class ThreadPoolManager {
private final int THREADPOOL_CAPACITY;
private MyQueue<Runnable> myQueue = new MyQueue<Runnable>();
public ThreadPoolManager(int capacity){
this.THREADPOOL_CAPACITY = capacity;
initAllConsumers();
}
private void initAllConsumers(){
for(Integer i = 0; i < THREADPOOL_CAPACITY; i++){
Thread thread = new Thread(new Worker(myQueue, i.toString()));
thread.start();
}
}
public void submitTask(Runnable r){
myQueue.enqueue(r);
}
}
如果你检查上面的实现,你就会发现我们已经初始化的队列为MyQueue
:
MyQueue<Runnable> myQueue = new MyQueue<Runnable>();
MyQeueue
队列会一直保持所有任务会被我们的worker
线程执行。
基于上面,我们也创建一个Worker
类. Worker
类是我们的工作者,它会一直监听队列然后处理提交的任务。
以下是Worker
的实现:
package com.pract.threadpool;
public class Worker implements Runnable{
private MyQueue<Runnable> myQueue;
private String name;
public Worker(MyQueue<Runnable> myQueue, String name){
this.myQueue = myQueue;
this.name = name;
}
@Override
public void run() {
while(true){
Runnable r = myQueue.dequeue();
// print the dequeued item
System.out.println(" Taken Item by thread name:" + this.name );
// run it
r.run();
System.out.println(" Task completed of thread:" + this.name);
}
}
}
现在,让我们测试下我们以上的实现:
package com.pract.threadpool;
public class TestThreadPoolManager {
public static void main(String[] args) {
ThreadPoolManager poolManager = new ThreadPoolManager(10);
//now lets submit task
poolManager.submitTask(new Runnable() {
@Override
public void run() {
System.out.println("Starting Task A....");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task A Completed....");
}
});
poolManager.submitTask(new Runnable() {
@Override
public void run() {
System.out.println("Starting Task B....");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task B Completed....");
}
});
}
}
以下是输出:
Taken Item by thread name:9
Taken Item by thread name:0
Starting Task A....
Starting Task B....
Task B Completed....
Task completed of thread:0
Task A Completed....
Task completed of thread:9