<Java并发编程之美>笔记
Contents
线程回调
public static class CallTask implements Callable<String> {
@Override
public String call() throws Exception {
return "hello world";
}
}
public void call() {
FutureTask<String> futureTask = new FutureTask<>(new CallTask());
new Thread(futureTask).start();
try {
String result = futureTask.get();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testFutureTask() {
call();
}
Object 中的函数
wait 函数 / wait(timeout) 函数
调用后, 会被阻塞挂起. 直到下面条件之一才返回. 需要注意的是, 调用 wait()
要先获取该对象的监视器锁. (获取监视器锁, 可以通过在方法里执行 synchronized (共享变量){}
或 在方法级添加签名 public synchronized void yourMethodName(){}
- 调用该共享变量的
notify()
或notifyAll()
方法 - 调用该共享变量的
interrupt()
方法, 这时抛出InterruptedException
异常返回
虚假唤醒
还需要注意的是, 一个线程可以从挂起状态变为可运行状态(也就是被唤醒), 即使该线程没有被其他线程调用 notify()
或 notifyAll()
方法, 或被中断, 或等待超时. 这就是虚假唤醒.
所以, 防范的做法是. 在一个循环中调用 wait()
方法, 不停地测试线程被唤醒的条件是否满足.即
synchronized (共享变量) {
while (条件不满足) {
共享变量.wait();
}
}
释放锁
当线程调用共享对象的 wait()
方法时, 当前线程只会释放当前共享对象的锁, 当前线程持有的其他共享对象的监视器锁并不会释放.
notify / notifyAll 函数
- notify 会唤醒一个在该共享变量上调用 wait 系列方法后被挂起的线程
- notifyall 会唤醒所有在该共享变量上由于调用 wait 系列方法后被挂起的线程
Thread 中的线程方法
join 等待线程结束
调用者会一直等待该线程结束后才会返回.
sleep 休眠
调用后, 只会让出 CPU 资源, 但锁是不释放的.sleep 后, 调度器不会去调度它.
yield 让出 CPU 执行权
yield 表示当前线程让出 CPU 使用权, 然后处于就绪状态, 线程调度器会从线程就绪队列中获取一个线程优先级最高的线程, 当然, 也有可能刚让出, 就被调度器再次调度执行了.
线程中断
- interrupt 方法: 设置中断标志. 实际线程并没有中断, 依然会继续往下执行. 如果有其他地方调用了该线程的 wait , join, sleep 方法而被挂起时, 这些方法会抛出 InterruptedException 异常而返回
- boolean isInterrupted : 判断是否被中断
- static boolean interrupted : 返回中断标志. 如果当前被中断, 则会清除中断标志.(要注意, 这个是静态方法, 即判断的是当前所在的线程.而不是实例线程)
死锁
产生四条件
- 互斥条件
- 请求并持有条件
- 不可剥夺条件
- 环路等待条件
避免
只要破坏上面其中之一条件即可. 通常只有 请求并持有和环路等待
可以破坏.
造成死锁的原因, 其实和申请资源的顺序有很大关系, 使用资源申请的有序性原则
就可以避免死锁.
守护线程和用户线程
守护线程并不会影响 JVM 结束. 但只要有一个用户线程还没结束, 正常情况下 JVM 就不会退出.
Main 线程结束后, JVM 会自动启动一个叫作 DestroyJavaVM 的线程, 它会等待所有用户线程结束后终止 JVM 进程.
ThreadLocal
只是一个工具类, 实际的本地变量存放在调用线程的 threadLocals 变量里.
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
ThreadLocal 是线程间独立的. 也就是说, 子线程并不能获取父线程中的值. 如果想让子线程获取父线程的值, 则可以使用 InheritableThreadLocal 类来代替 ThreadLocal
private static final ThreadLocal<String> STRING_THREAD_LOCAL = new InheritableThreadLocal<>();
@Test
public void test() {
STRING_THREAD_LOCAL.set("hello world from parent value");
new Thread(() -> System.out.println("in sub thread get value => " + STRING_THREAD_LOCAL.get())).start();
}
共享变量的内存可见性
Java 内存模型规定, 将所有的变量都存放在主内存中, 当线程使用变量时, 会把主内存里面的变量复制到自己的工作空间或叫工作内存, 线程读写变量时操作的是自己工作内存中的变量.
synchronized
一种原子性内置锁, 也称为内部锁, 监视器锁.(它是排他的. ) 代码进入 synchronized
前会自动获得内部锁, 其他线程访问时会被阻塞挂起.
它会在正常退出同步代码块或抛出异常或调用了内置锁资源的 wait 系列方法时, 会自动释放内置锁.
由于 Java 线程与系统原生线程是一对一的关系, 当阻塞一个线程时, 需要从用户态切换到内核态. 这比较耗时, 并导致上下文切换
.
它的内存语义 : 在 synchronized 块内使用到的变量从线程的工作内存中清除, 这样在 synchronized 块使用该变量时就不会从线程的工作内存中获取, 而是直接从主内存中获取. 退出 synchronized 的内存语义是把 synchronized 块内对共享变量的修改刷新到主内存
. 这也是加锁和释放锁的语义.
volatile
该关键字可以确保对一个变量的更新, 对其他线程立即可见. 当一个变量声明为 volatile 时, 线程在写入变量时, 不会把值缓存在寄存器或其他地方, 而是会把值刷新回主内存. 其他线程读取该共享变量时, 会从主内存重新获取最新值, 而不是使用当前线程的工作内存中的值.
它也会防止指令重排.
CAS
非阻塞原子操作. 通过硬件保证. CAS 的四个操作数分别为对象内存位置, 对象中的变量的偏移量, 变量预期值和新值.
这是CPU 提供的一个原子性指令. 但这种会有一个 ABA 的问题. JDK 中的 AtomicStampedReference 类给每个变量状态值都配备了一个 timestamp , 从而避免了 ABA 的问题.
Unsafe 类
正常情况并不能正确访问它. 可通过反射来获取实例.
伪共享
CPU 缓存, 是按行存储的. 每一行称为一个 Cache 行, Cache 行是 Cache 与主内存进行数据交换的单位, Cache 行一般是 2 的幂次数字节.
当 CPU 访问某个变量时, 首先会去看 CPU Cache 内是否有. 如果有直接从中获取, 否则就去主内存里获取该变量, 然后把该变量所在内存区域的一个 Cache 行大小的内存复制到 Cache 行中. 由于存放到 Cache 行的是内存块, 而不是单个变量, 所以可能会把多个变量存放到一个 Cache 行中. 当多个线程同时修改一个缓存行里面的多个变量时
, 由于 同时只能有一个线程操作缓存行
, 所以相比将每个变量放到一个缓存行, 性能会有所下降, 这就是伪共享.
何时出现
因为多个变量被放入一个缓存行中, 并且多个线程同时去写入缓存行中不同的变量.
因为缓存与内存交换数据的单位就是缓存行. 当 CPU 要访问的变量没有在缓存中找到时, 根据程序运行的局部性原理, 会把该变量所在内存中大小为缓存行的内存放入缓存行.
避免伪共享
JDK 8 之前一般是通过字节填充的方式来避免该问题.
JDK 8 提供了一个 sun.misc.Contended
注释来解决伪共享问题.
默认情况下, @Contended
注释只用于 Java 核心类, 比如 rt 包下的类. 如果用户路径下的类需要使用这个注解, 则需要添加 JVM 参数: -XX:-RestrictContended
. 填充的宽度默认为 128. 可以通过参数 -XX:ContendedPaddingWidth
设置.
其他资料
- https://www.cnblogs.com/diegodu/p/9340243.html
- https://dzone.com/articles/what-false-sharing-is-and-how-jvm-prevents-it
- https://www.cnblogs.com/cyfonly/p/5800758.html
锁
乐观与悲观锁
- 悲观锁 : 对数据被外界修改持保守态度. 访问数据时就加排他锁. 一般使用数据库提供的锁机制.
- 乐观锁 : 认为数据一般不会造成冲突. 所以访问数据时不会加排他锁, 而是在进行数据提交更新时, 才会正式对数据冲突与否进行检测. 它不会使用数据库提供的锁机制, 在一般在表中添加 version 或业务状态字段来实现. 它是直到提交才锁定, 所以不会产生死锁.
公平与非公平锁
- 公平锁 : 表示线程获取锁的顺序是按照线程请求锁的时间早晚来决定的.
final ReentrantLock reentrantLock = new ReentrantLock(true);
- 非公平锁 : 在运行时闯入, 不一定先来先得.
final ReentrantLock reentrantLock = new ReentrantLock(false);
公平锁会有一定的额外开销. 默认是非公平锁.
独占与共享锁
- 独占锁 : 任一时刻, 只有一个线程能得到锁.
ReentrantLock
就是独占式的. (悲观锁) - 共享锁 : 可以同时由多个线程持有. 例如
ReadWriteLock
读写锁, 允许一个资源可以被多线程同时进行读操作. (乐观锁)
可重入锁
如果一个线程再次获取它自己已经获取的锁时, 不会被阻塞, 那就是可重入的.
synchronized
也是一种可重入锁.
自旋锁
当前线程在获取锁时, 如果发现锁被其他线程占有, 它不马上阻塞自己, 在不放弃 CPU 使用权的情况下, 多次尝试获取(默认次数是 10, 可以使用 -XX:PreBlockSpinsh
参数设置), 很可能在后面几次尝试中其他线程已经释放了锁. 如果尝试指定的次数后, 仍没有获取到锁则当前线程才会被阻塞挂起.
由此看来, 自旋锁是使用 CPU 时间换取线程阻塞与调度的开销, 但很可能这些 CPU 时间白白浪费了.
ThreadLocalRandom
Random 在多线程下的缺陷 : 每个 Random 实例里面都有一个原子性的种子变量用来记录当前的种子值, 当要生成新的随机数时需要根据当前种子计算新种子并更新回原子变量. 在多线程下使用单个 Random 实例生成随机数时, 当多个线程同时计算随机数来计算新的种子时, 多个线程会竞争同一个原子变量的更新操作, 由于原子变量的更新是 CAS 操作, 同时只有一个线程会成功, 所以会造成大量线程进行自旋重试, 这会降低并发性能. 所以 ThreadLocalRandom 应运而生
ThreadLocalRandom 并没有存放具体的种子, 具体的种子存放在具体的调用线程 Thread 的 threadLocalRandomSeed
变量里面. 它类似 ThreadLoal , 就是个工具类. 这个变量是 long 类型的, 因为它是线程级别的, 所以不需要是原子性变量.
Thread.java
:
// The following three initially uninitialized fields are exclusively
// managed by class java.util.concurrent.ThreadLocalRandom. These
// fields are used to build the high-performance PRNGs in the
// concurrent code, and we can not risk accidental false sharing.
// Hence, the fields are isolated with @Contended.
/** The current seed for a ThreadLocalRandom */
@sun.misc.Contended("tlr")
long threadLocalRandomSeed;
/** Probe hash value; nonzero if threadLocalRandomSeed initialized */
@sun.misc.Contended("tlr")
int threadLocalRandomProbe;
/** Secondary seed isolated from public ThreadLocalRandom sequence */
@sun.misc.Contended("tlr")
int threadLocalRandomSecondarySeed;
另外, 在 ThreadLocalRandom 里有个 instance 变量, 是 static 的, 即多线程共有同一个实例, 只是具体的种子是存放在线程里面的.
ThreadLocalRandom.java
:
/** The common ThreadLocalRandom */
static final ThreadLocalRandom instance = new ThreadLocalRandom();
原子类中的 CAS
内部都是使用 Unsafe 来实现.
AtomicLong
在高并发个还会存在性能问题(大量线程会同时去竞争更新同一个原子变量
, 但由于同时只有一个线程的 CAS 会成功, 这就造成了大量线程竞争失败后, 会通过无限循环不断进行自旋尝试 CAS 的操作, 而这会白白浪费 CPU 资源). JDK 8 提供了一个在高并发下性能更好的 LongAdder
类.
LongAdder
在内部维护多个 Cell 变量, 每个 Cell 变量里有一个初始值为 0 的 long 型变量, 这样, 在同等并发量下, 争夺单个变量更新操作的线程量会减少, 这变相地减少了争夺共享资源的并发量. 另外, 多个线程在争夺同一个 Cell 原子变量时如果失败了, 它并不是在当前 Cell 变量上一直自旋 CAS 重试, 而是尝试在其他 Cell 变量上进行 CAS 尝试, 这个改变增加了当前线程重试 CAS 成功的可能性. 最后, 在获取 LongAdder
当前值时, 是把所有 Cell 变量 value 值累加后再甽 base 返回.
LongAdder
是 LongAccumulator
的一个特例. LongAccumulator
相比 LongAdder
可以为累加器提供非 0 初始值, 后者只能提供默认的 0 值. 另外, LongAccumulator
还可以指定累加规则, 比如不进行累加, 而进行相乘, 在构造 LongAccumulator
时传入自定义的双目运算器即可, 而 LongAdder
则内置累加规则.
COW List
CopyOnWriteArrayList
是一个线程安全的 ArrayList. 对其修改, 都是在底层一个复制的数组(快照)上进行的, 也就是使用了写时复制策略. (所以会产生弱一致性问题
)
里面有个 array
成员数组, 用来存放具体元素. ReentrantLock
独占锁用来保证同时只有一个线程对 array 进行修改.
添加元素时, 首先复制一个快照, 然后在快照上进行添加, 而不是在原来数组上进行.
迭代时(即获取 Iterator 对象后, 它实际是 COWItertor 对象), 其他线程对该 list 进行的增删改操作不可见, 因为它们是两个不同的数组. 这就是弱一致性.
JUC 中锁原理
LockSupport 工具类
它与每个使用它的线程都会关联一个许可证, 在默认情况下调用 LockSupport 类的方法的线程是不持有许可证的. 底层用 Unsafe 实现.
park()
方法 : 如果当前线程拿到许可证, 则该方法会立即返回, 否则阻塞挂起.- 其他线程调用
unpark(Thread)
时, 调用 park 方法而被阻塞的线程会返回 - 调用了阻塞线程的
interrupt()
方法, 设置中断标志或虚假唤醒, 则阻塞线程也会返回. 所以最好使用循环条件判断方式 - 调用
park()
而阻塞的线程, 被其他线程中断而返回时, 并不会抛出InterruptedException
异常 park(nano)
park(blocker)
建议使用, 这样子在排查时, 可以知道哪个类被阻塞了
- 其他线程调用
unpark(Thread)
方法- 如果 thread 没有持有 thread 与 LockSupport 类关联的许可证, 则让 thread 持有
- 如果 thread 之前因调用
park()
而被挂起, 则调用后, 该线程会被唤醒 - 如果 thread 之前没有调用
park()
, 则调用unpark()
后, 再调用park()
, 会立即返回
AQS 抽象同步队列
AbstractQueuedSynchronizer
- 一个 FIFO 的双向队列
- Head, tail 分别记录首, 尾元素
- 元素类型为 Node
- 内部 thread 用来存放进入 AQS 队列里面的线程
- 内部的 SHARED 用来标识该线程是获取共享资源时被阻塞挂起后放入 AQS 队列
- 内部的 EXCLUSIVE 用来标识该线程是获取独占资源时被阻塞挂起后放入 AQS 队列
- waitStatus : 记录当前线程等待状态
- CANCELLED : 线程被取消
- SIGNAL : 需要被唤醒
- CONDITION : 在条件队列里等待
- PROPAGATE : 释放共享资源时需要通知其他节点
- Pre / next : 前驱和后继节点
- State
- 对于 ReentrantLock 实现, 它表示当前线程获取锁的可重入次数
- 对于 ReentrantReadWriteLock 来说, state 高 16 位表示读状态. 即获取读锁的次数. 低 16 位表示写锁的可重入次数.
- 对于 Semaphore 来说, 它表示可用信号的个数
- 对于 CountDownLatch 来说, 它表示计数器当前值
- 操作方式
- 独占方式下的获取和释放
acquire(int arg)
: 不对中断进行响应acquireInterruptibly(int arg)
: 对中断进行响应release(int arg)
- 共享方式下的获取和释放
acquireShared(int arg)
acquireSharedInterruptibly(int arg)
releaseShared(int arg)
- 内部类 ConditionObject : 用来结合锁实现线程同步.
条件变量
notify
和 wait
是配合 synchronized 实现同步的基础设施.
条件变量 Condition 的 signal
和 await
是配合 AQS 实现的锁的同步基础设施.
final ReentrantLock reentrantLock = new ReentrantLock();
final Condition condition = reentrantLock.newCondition();
@Test
public void testLock() throws InterruptedException {
new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("begin wait");
condition.await();
System.out.println("end wait");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}).start();
TimeUnit.MILLISECONDS.sleep(10);
new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("begin signal");
condition.signal();
System.out.println("end signal");
} finally {
reentrantLock.unlock();
}
}).start();
}
ReentrantLock
可重入独占锁
(即同一时刻, 只有一个线程可以获取该锁, 其他线程再获取再要阻塞挂起.). 最终还是使用 AQS 来实现. 可以看到, 内部类 Sync 直接继承 AQS.State 表示线程获取该锁的可重入次数
- 0 表示没有被任何线程持有
- 之后线程进行 CAS 设置 state 为 1, 成功则获取该锁
- 释放锁后, CAS 进行减 1
- 直到为 0 则线程释放该锁
获取锁:
lock()
, 内部调用的是sync.lock()
非公平锁实现
检测 state 是否为 0, 为 0 则尝试 CAS 获取. (即不管阻塞队列里是否有更早的线程在排队)
如果不为 0, 则判断当前线程是否该锁的持有者, 如果是, state + 1 , 然后返回 true
如果不为 0, 也不是锁持有者, 则放入 AQS 阻塞队列
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
公平锁实现
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
可以看到, 公平锁实现, 会判断 hasQueuedPredecessors()
- 调用
await
后, 会自动释放该锁.然后阻塞挂起.
ReentrantReadWriteLock
ReentrantLock
是独占锁, 同一时刻, 只有一个线程可以获取该锁.
而 ReentrantReadWriteLock
采用读写分离策略, 允许多个线程可以同时获取 读锁
- State 的高 16 位表示读状态
- State 的低 16 位表示写状态
- 写锁
WriteLock
是独占的. - 写锁是可多次获取的
- 有写锁时, 其他线程 请求读/写 会阻塞
- 有读锁时, 其他线程 请求写锁会阻塞
StampedLock
它提供三种模式的读写控制, 获取锁时, 返回一个 long 型的 stamp, 它代表了锁的状态. 失败返回为 0 的 stamp .
- writeLock : 它是一个排他/独占锁. 类似
ReentrantLock
(不同的是, 这里的是不可重入的) - readLock : 悲观读锁. 它是共享锁. 类似
ReentrantReadWriteLock
(不同的是, 这里是不可重入的). 悲观是指在具体操作数据前其会悲观地认为其他线程可能要对自己操作的数据进行修改, 所以需要先对数据加锁. 这是在读少写多的情况下的一种考虑. - tryOptimisticRead : 相对于悲观读锁, 它在操作数据前并没有通过 CAS 设置锁的状态, 仅通过位运算测试. 获取 stamp 后, 在具体操作数据前还需要调用 validate 方法验证该 stamp 是否已经不可用, 也就是在获取 stamp 到操作数据前期间, 是否有其他线程持有了写锁, 如果是, 则 validate 返回 0, 否则就可以正常操作数据. 适合读多写少的场景.
并且, 它还支持在这三种锁在一定条件下进行相互转换. 它的 tryOptimisticRead
性能比 ReentrantReadWriteLock
更好. 因为它只是简单位运算测试, 而不用 CAS
并发队列
ConcurrentLinkedQueue
- 线程安全
- 无界非阻塞队列
- 底层用单向链表实现
- 通过 CAS 来实现线程安全
操作
- offer : 向队尾添加一个元素
- add : 内部也是调用 offer
- poll : 从队头获取并移除一个元素
- peek : 从队头获取元素, 但不移除
- size : 由于是 CAS, 可能导致不精确
- remove : 删除第一个出现的元素. 并返回 true.
- contains : 是否包含指定对象
LinkedBlockingQueue
- 独占锁
- 阻塞队列
- 单向链表
- 有界(默认容量为 0x7fffffff, 也可以自定)
- size : 操作不一定是精确的
ArrayBlockingQueue
类似 LinkedBlockingQueue
. 只是实现是用数组.
- size 操作是精确的
PriorityBlockingQueue
- 优先级队列
- 无界
- 阻塞
- 内部使用平衡二叉树堆实现
- 默认使用对象的 compareTo 方法提供比较规则
- 默认情况下, 队列元素要实现 Comparable 接口(如果不提供比较器的话)
元素的顺序跟插入顺序无关, 而是和他们的优先级有关
DelayQueue
- 无界
- 阻塞
- 延迟队列
- 每个元素都有过期时间. 只有过期元素才会出队
- 队列头元素是最快要过期的元素
- 元素要实现 Delayed 接口
- Size 包括过期和没过期的元素
ThreadPoolExecutor
- 成员 ctl 用来记录线程池状态和线程池中线程个数
- 高 3 位表示状态
- 后 29 位表示线程池线程个数
- 状态
- RUNNING : 接受新任务, 并处理队列任务
- SHUTDOWN : 拒绝新任务, 但处理队列任务
- STOP : 拒绝新任务, 抛弃队列任务, 中断正在处理任务
- TIDYING : 所有任务执行完后, 当前线程池活动数为 0, 将要调用 terminated 方法
- TERMINATED : 终止状态. 调用 terminated 方法完成后的状态
- 转换
RUNNING -> SHUTDOWN
: 显式调用 shutdown 方法或隐式调用finalize()
RUNNING 或 SHUTDOWN -> STOP
: 显式调用shutdownNow()
方法SHUTDOWN -> TIDYING
: 线程池和任务队列都为空时TIDYING -> TERMINATED
: 当terminated()
完成时
- 参数
- corePoolSize : 核心线程数
- workQueue : 保存等待执行的任务的阻塞队列.
- maximunPoolSize : 最大线程数
- ThreadFactory : 创建线程的工厂
- RejectExecutioinHandler : 饱和策略. 当队列满且线程个数达到 maximunPoolSize 后采取的策略.
AbortPolicy
: 抛出异常CallerRunsPolicy
: 使用调用者所在线程来运行任务DiscardPolicy
: 默默丢弃, 不抛出异常DiscardOldestPolicy
: 调用poll
丢弃队列里的一个任务, 执行当前任务.- keepAliveTime/TimeUnit : 存活时间. 线程池的线程数量比 corePoolSize 数量多, 并且是闲置状态, 则这些闲置线程能存活的最大时间.
keekAliveTime = 0
, 表示只要线程个数比核心线程多并且当前空闲, 则回收.
- 线程池类型
- newFixedThreadPool : 创建一个核心线程和最大线程都为 N 的线程池, 并且阻塞队列长度为
Integer.MAX_VALUE
- newSingleThreadExecutor : 创建一个核心线程和最大线程都为 1 的线程池, 并且阻塞队列长度为
Integer.MAX_VALUE
- newCachedThreadPool : 按需创建线程的线程池. 初始为 0, 最大为
Integer.MAX_VALUE
, 并且阻塞队列为同步队列. 这个类型的特殊之处在于, 加入同步队列的任务会被马上执行, 同步队列里最多只有一个任务.
- newFixedThreadPool : 创建一个核心线程和最大线程都为 N 的线程池, 并且阻塞队列长度为
方法
void execute(Runnable)
: 提交任务到线程池执行.void shutdown()
: 这时线程池不会再接受新任务, 但工作队列里的任务还是要执行. 但它会立即返回, 并不会等待所有任务完成.List<Runnable> shutdownNow()
它也会立即返回, 返回值是队列里被丢弃的任务列表.awaitTermination(timeout)
: 当前线程会阻塞, 直到线程池状态变为TERMINATED
才返回, 或者等待时间超时才返回.import org.apache.commons.lang3.concurrent.BasicThreadFactory; @Bean public ThreadPoolExecutor threadPoolExecutor() { final ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("app-executor-%d-").build(); final int cpus = Runtime.getRuntime().availableProcessors(); ThreadPoolExecutor executor = new ThreadPoolExecutor(cpus, cpus * 2, 10, TimeUnit.MINUTES, new ArrayBlockingQueue<>(QUEUE_SIZE), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); return executor; }
ScheduledThreadPoolExecutor
- 继承 ThreadPoolExecutor
- 实现 ScheduleExecutorService 接口
- 线程池队列是 DelayedWorkQueue, 和 DelayedQueue 类似, 是一个延迟队列
- 内部
ScheduledFutureTask
是具有返回值的任务, 继承自FutureTask
. FutureTask 内部有一个 state 表示任务状态.- state
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
ScheduledFutureTask
的 period- 0 表示一次性
> 0
表示fixed-rate
任务. 固定频率的可重复执行任务.< 0
表示fixed-delay
任务, 固定延迟的定时可重复执行任务.
重要方法
schedule(Runnable command, long delay, TimeUnit unit)
: 一次性任务. 在延迟指定时间后执行.scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
:initialDelay
表示提交任务后延迟多久开始执行任务delay
表示任务执行完后, 延迟多久再次执行执行- 一直重复 delay 执行
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
:- 相对起始时间点以固定频率调用指定任务(
fixed-rate
) - 提交任务后
initialDelay
时间后开始执行任务 - 然后从
initialDelay + period
时间点再次执行 然后再从
initialDelay + 2*period
时间点再次执行, 以此循环类推@Bean public ScheduledExecutorService scheduledThreadPoolExecutor() { final ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("app-schedule-%d-").build(); final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); return executor; }
线程同步器
CountDownLatch
等待线程计数器. 不可重用. 计数器递减
CyclicBarrier
达到状态后, 可被重用.计数器递减
它可以让一组线程全部达到一个状态后, 再全部同时执行
Semaphore
与前两个不同. 计数器是递增的
调用 release()
方法, 相当于让计数器递增 1.
void acquire()
: 获取一个信号量资源. 如果当前信号量大于 0, 则当前信号量会减1, 然后直接返回. 如果为 0, 则会放到阻塞队列.acquire(permits)
: 获取指定个信号量资源release()
: 把当前信号量值增加 1.release(permits)
: 把当前信号量值增加 permits .