Java JUC 库完整指南
Java JUC 库完整指南
思维导图
Java JUC (java.util.concurrent)
├── 执行器框架 (Executor Framework)
│ ├── Executor 接口
│ ├── ExecutorService 接口
│ ├── ThreadPoolExecutor 类
│ ├── ScheduledExecutorService 接口
│ └── Executors 工厂类
│
├── 并发集合 (Concurrent Collections)
│ ├── ConcurrentHashMap - 线程安全的哈希表
│ ├── ConcurrentLinkedQueue - 无界线程安全队列
│ ├── ConcurrentSkipListMap - 线程安全的跳跃表
│ ├── CopyOnWriteArrayList - 写时复制数组列表
│ └── BlockingQueue 系列
│ ├── ArrayBlockingQueue - 有界阻塞队列
│ ├── LinkedBlockingQueue - 链式阻塞队列
│ └── PriorityBlockingQueue - 优先级阻塞队列
│
├── 同步工具 (Synchronization Utilities)
│ ├── CountDownLatch - 倒计时门闩
│ ├── CyclicBarrier - 循环屏障
│ ├── Semaphore - 信号量
│ ├── Exchanger - 交换器
│ └── Phaser - 移相器
│
├── 锁机制 (Lock Framework)
│ ├── Lock 接口
│ ├── ReentrantLock - 可重入锁
│ ├── ReadWriteLock 接口
│ ├── ReentrantReadWriteLock - 可重入读写锁
│ └── StampedLock - 邮戳锁
│
├── 原子类 (Atomic Classes)
│ ├── AtomicInteger/Long/Boolean
│ ├── AtomicReference
│ ├── AtomicIntegerArray
│ └── AtomicIntegerFieldUpdater
│
└── Future 和 异步编程
├── Future 接口
├── CompletableFuture 类
└── ForkJoinPool 框架
核心组件详解与示例
1. 执行器框架 (Executor Framework)
执行器框架是JUC的核心,提供了线程池管理和任务执行的抽象。
// 执行器框架示例
public class ExecutorExample {
// 创建固定大小线程池
public void fixedThreadPoolExample() {
ExecutorService executor = Executors.newFixedThreadPool(4);
// 提交任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务 " + taskId + " 在线程 " +
Thread.currentThread().getName() + " 中执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 优雅关闭
executor.shutdown();
}
// 定时任务示例
public void scheduledExecutorExample() {
ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(2);
// 延迟执行
scheduler.schedule(() ->
System.out.println("延迟3秒执行"), 3, TimeUnit.SECONDS);
// 周期性执行
scheduler.scheduleAtFixedRate(() ->
System.out.println("每2秒执行一次"),
0, 2, TimeUnit.SECONDS);
}
// 自定义线程池
public void customThreadPoolExample() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60L, // 空闲时间
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100), // 任务队列
new ThreadFactory() { // 线程工厂
private AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "自定义线程-" + counter.incrementAndGet());
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
}
}
2. 并发集合 (Concurrent Collections)
线程安全的集合类,提供高并发性能。
// 并发集合示例
public class ConcurrentCollectionExample {
// ConcurrentHashMap 示例
public void concurrentHashMapExample() {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 原子性操作
map.putIfAbsent("key1", 1);
map.compute("key2", (k, v) -> v == null ? 1 : v + 1);
map.merge("key3", 1, Integer::sum);
// 并行流操作
map.entrySet().parallelStream()
.filter(entry -> entry.getValue() > 0)
.forEach(System.out::println);
}
// BlockingQueue 生产者消费者模式
public void blockingQueueExample() {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// 生产者
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
queue.put("产品-" + i);
System.out.println("生产了: 产品-" + i);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消费者
Thread consumer = new Thread(() -> {
try {
while (true) {
String product = queue.take();
System.out.println("消费了: " + product);
Thread.sleep(2000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
// CopyOnWriteArrayList 示例
public void copyOnWriteArrayListExample() {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
// 适合读多写少的场景
list.add("元素1");
list.add("元素2");
// 迭代器不会抛出 ConcurrentModificationException
for (String item : list) {
System.out.println(item);
// 在迭代过程中修改集合是安全的
if ("元素1".equals(item)) {
list.add("新元素");
}
}
}
}
3. 同步工具 (Synchronization Utilities)
提供线程间协调的工具类。
// 同步工具示例
public class SynchronizationUtilsExample {
// CountDownLatch 示例 - 等待多个任务完成
public void countDownLatchExample() {
int taskCount = 3;
CountDownLatch latch = new CountDownLatch(taskCount);
for (int i = 0; i < taskCount; i++) {
final int taskId = i;
new Thread(() -> {
try {
System.out.println("任务 " + taskId + " 开始执行");
Thread.sleep(2000);
System.out.println("任务 " + taskId + " 执行完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 计数器减1
}
}).start();
}
try {
latch.await(); // 等待所有任务完成
System.out.println("所有任务执行完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// CyclicBarrier 示例 - 多线程同步执行
public void cyclicBarrierExample() {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有线程都到达屏障点,开始下一阶段");
});
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("线程 " + threadId + " 到达屏障点");
barrier.await(); // 等待其他线程
System.out.println("线程 " + threadId + " 继续执行");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
// Semaphore 示例 - 限制资源访问
public void semaphoreExample() {
Semaphore semaphore = new Semaphore(2); // 只允许2个线程同时访问
for (int i = 0; i < 5; i++) {
final int threadId = i;
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println("线程 " + threadId + " 获得资源访问权限");
Thread.sleep(2000);
System.out.println("线程 " + threadId + " 释放资源");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 释放许可
}
}).start();
}
}
// Exchanger 示例 - 线程间数据交换
public void exchangerExample() {
Exchanger<String> exchanger = new Exchanger<>();
Thread thread1 = new Thread(() -> {
try {
String data = "来自线程1的数据";
String received = exchanger.exchange(data);
System.out.println("线程1发送: " + data + ", 接收: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread thread2 = new Thread(() -> {
try {
String data = "来自线程2的数据";
String received = exchanger.exchange(data);
System.out.println("线程2发送: " + data + ", 接收: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
thread1.start();
thread2.start();
}
}
4. 锁机制 (Lock Framework)
比synchronized更灵活的锁机制。
// 锁机制示例
public class LockExample {
private final ReentrantLock lock = new ReentrantLock();
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private int count = 0;
// ReentrantLock 基本使用
public void reentrantLockExample() {
lock.lock();
try {
count++;
System.out.println("当前计数: " + count);
} finally {
lock.unlock();
}
}
// 尝试获取锁
public boolean tryLockExample() {
if (lock.tryLock()) {
try {
count++;
return true;
} finally {
lock.unlock();
}
}
return false;
}
// 带超时的锁获取
public boolean tryLockWithTimeoutExample() {
try {
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
count++;
return true;
} finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
// 读写锁示例
public void readWriteLockExample() {
// 读操作
readLock.lock();
try {
System.out.println("读取数据: " + count);
} finally {
readLock.unlock();
}
// 写操作
writeLock.lock();
try {
count++;
System.out.println("写入数据: " + count);
} finally {
writeLock.unlock();
}
}
// StampedLock 示例 - 乐观读锁
private final StampedLock stampedLock = new StampedLock();
private double x, y;
public double distanceFromOrigin() {
long stamp = stampedLock.tryOptimisticRead(); // 乐观读
double currentX = x, currentY = y;
if (!stampedLock.validate(stamp)) { // 验证数据是否被修改
stamp = stampedLock.readLock(); // 获取悲观读锁
try {
currentX = x;
currentY = y;
} finally {
stampedLock.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
public void moveIfAtOrigin(double newX, double newY) {
long stamp = stampedLock.readLock();
try {
while (x == 0.0 && y == 0.0) {
long writeStamp = stampedLock.tryConvertToWriteLock(stamp);
if (writeStamp != 0L) {
stamp = writeStamp;
x = newX;
y = newY;
break;
} else {
stampedLock.unlockRead(stamp);
stamp = stampedLock.writeLock();
}
}
} finally {
stampedLock.unlock(stamp);
}
}
}
5. 原子类 (Atomic Classes)
提供无锁的原子操作。
// 原子类示例
public class AtomicExample {
private AtomicInteger counter = new AtomicInteger(0);
private AtomicReference<String> reference = new AtomicReference<>("初始值");
// 基本原子操作
public void basicAtomicOperations() {
// 原子递增
int newValue = counter.incrementAndGet();
System.out.println("递增后的值: " + newValue);
// 原子加法
int result = counter.addAndGet(5);
System.out.println("加5后的值: " + result);
// 比较并交换
boolean success = counter.compareAndSet(result, 100);
System.out.println("CAS操作成功: " + success);
}
// AtomicReference 示例
public void atomicReferenceExample() {
String oldValue = reference.get();
boolean updated = reference.compareAndSet(oldValue, "新值");
System.out.println("更新成功: " + updated);
// 使用 updateAndGet
String newValue = reference.updateAndGet(current -> current + "-更新");
System.out.println("更新后的值: " + newValue);
}
// 原子数组示例
public void atomicArrayExample() {
AtomicIntegerArray array = new AtomicIntegerArray(10);
// 并发更新数组元素
for (int i = 0; i < 10; i++) {
final int index = i;
new Thread(() -> {
for (int j = 0; j < 100; j++) {
array.incrementAndGet(index);
}
}).start();
}
}
// 原子字段更新器示例
static class Person {
volatile int age;
volatile String name;
private static final AtomicIntegerFieldUpdater<Person> ageUpdater =
AtomicIntegerFieldUpdater.newUpdater(Person.class, "age");
public void incrementAge() {
ageUpdater.incrementAndGet(this);
}
public boolean setAgeIfExpected(int expect, int update) {
return ageUpdater.compareAndSet(this, expect, update);
}
}
}
6. Future 和异步编程
处理异步计算结果。
// Future和异步编程示例
public class FutureExample {
// 基本Future使用
public void basicFutureExample() {
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交Callable任务
Future<Integer> future = executor.submit(() -> {
Thread.sleep(2000);
return 42;
});
try {
// 获取结果(阻塞)
Integer result = future.get(3, TimeUnit.SECONDS);
System.out.println("计算结果: " + result);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
executor.shutdown();
}
// CompletableFuture 示例
public void completableFutureExample() {
// 异步执行
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Hello";
})
.thenApply(result -> result + " World")
.thenApply(String::toUpperCase);
// 非阻塞回调
future.thenAccept(result ->
System.out.println("异步结果: " + result));
// 组合多个Future
CompletableFuture<String> future1 =
CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 =
CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combined = future1
.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
combined.thenAccept(System.out::println);
}
// 异常处理
public void exceptionHandlingExample() {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("随机异常");
}
return "成功";
})
.exceptionally(throwable -> {
System.out.println("捕获异常: " + throwable.getMessage());
return "默认值";
})
.handle((result, throwable) -> {
if (throwable != null) {
return "处理异常后的结果";
}
return result;
});
future.thenAccept(System.out::println);
}
// ForkJoinPool 示例
public void forkJoinPoolExample() {
ForkJoinPool pool = new ForkJoinPool();
// 计算数组和的任务
class SumTask extends RecursiveTask<Long> {
private final long[] array;
private final int start, end;
private static final int THRESHOLD = 1000;
public SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 直接计算
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// 分割任务
int middle = (start + end) / 2;
SumTask leftTask = new SumTask(array, start, middle);
SumTask rightTask = new SumTask(array, middle, end);
leftTask.fork(); // 异步执行左半部分
long rightResult = rightTask.compute(); // 执行右半部分
long leftResult = leftTask.join(); // 等待左半部分结果
return leftResult + rightResult;
}
}
}
long[] array = new long[10000];
for (int i = 0; i < array.length; i++) {
array[i] = i + 1;
}
SumTask task = new SumTask(array, 0, array.length);
Long result = pool.invoke(task);
System.out.println("数组和: " + result);
pool.shutdown();
}
}
实际应用场景与最佳实践
1. 生产者消费者模式
public class ProducerConsumerPattern {
private final BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>(100);
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private volatile boolean running = true;
public void startProducers(int producerCount) {
for (int i = 0; i < producerCount; i++) {
executorService.submit(() -> {
while (running) {
try {
Task task = generateTask();
taskQueue.put(task);
System.out.println("生产任务: " + task.getId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
}
public void startConsumers(int consumerCount) {
for (int i = 0; i < consumerCount; i++) {
executorService.submit(() -> {
while (running) {
try {
Task task = taskQueue.take();
processTask(task);
System.out.println("处理任务: " + task.getId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
}
private Task generateTask() {
return new Task("Task-" + System.currentTimeMillis());
}
private void processTask(Task task) {
// 模拟任务处理
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
static class Task {
private final String id;
public Task(String id) {
this.id = id;
}
public String getId() {
return id;
}
}
}
2. 缓存实现
public class ConcurrentCache<K, V> {
private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Function<K, V> loader;
public ConcurrentCache(Function<K, V> loader) {
this.loader = loader;
}
public V get(K key) {
// 先尝试从缓存获取
V value = cache.get(key);
if (value != null) {
return value;
}
// 双重检查锁定
lock.writeLock().lock();
try {
value = cache.get(key);
if (value == null) {
value = loader.apply(key);
cache.put(key, value);
}
return value;
} finally {
lock.writeLock().unlock();
}
}
public void put(K key, V value) {
cache.put(key, value);
}
public void remove(K key) {
cache.remove(key);
}
public int size() {
return cache.size();
}
}
总结
JUC库的核心价值
- 高性能并发: 提供比传统synchronized更高效的并发控制机制
- 灵活性: 提供多种同步工具,适应不同的并发场景
- 安全性: 通过原子类和并发集合避免线程安全问题
- 可扩展性: 支持大规模并发应用的开发
选择指南
场景 | 推荐工具 | 原因 |
---|---|---|
线程池管理 | ThreadPoolExecutor | 灵活配置,高效执行 |
高并发读写 | ConcurrentHashMap | 分段锁,高并发性能 |
生产者消费者 | BlockingQueue | 自动阻塞,简化编程 |
等待多任务完成 | CountDownLatch | 一次性同步工具 |
多阶段同步 | CyclicBarrier | 可重用的同步屏障 |
资源限制 | Semaphore | 控制并发访问数量 |
无锁编程 | Atomic类 | 高性能原子操作 |
异步编程 | CompletableFuture | 丰富的组合操作 |
最佳实践
- 合理设置线程池参数: 根据任务特性调整核心线程数和最大线程数
- 正确处理中断: 在可中断的阻塞操作中正确响应中断
- 避免死锁: 使用tryLock和超时机制
- 资源清理: 及时关闭ExecutorService和释放资源
- 异常处理: 在异步任务中正确处理异常
- 性能监控: 监控线程池状态和队列大小
JUC库是Java并发编程的基础,掌握这些工具类能够帮助开发者构建高性能、可扩展的并发应用程序。