Java JUC 库完整指南

1

Java JUC 库完整指南

思维导图

Java JUC (java.util.concurrent)
├── 执行器框架 (Executor Framework)
│   ├── Executor 接口
│   ├── ExecutorService 接口  
│   ├── ThreadPoolExecutor 类
│   ├── ScheduledExecutorService 接口
│   └── Executors 工厂类
│
├── 并发集合 (Concurrent Collections)
│   ├── ConcurrentHashMap - 线程安全的哈希表
│   ├── ConcurrentLinkedQueue - 无界线程安全队列
│   ├── ConcurrentSkipListMap - 线程安全的跳跃表
│   ├── CopyOnWriteArrayList - 写时复制数组列表
│   └── BlockingQueue 系列
│       ├── ArrayBlockingQueue - 有界阻塞队列
│       ├── LinkedBlockingQueue - 链式阻塞队列
│       └── PriorityBlockingQueue - 优先级阻塞队列
│
├── 同步工具 (Synchronization Utilities)
│   ├── CountDownLatch - 倒计时门闩
│   ├── CyclicBarrier - 循环屏障
│   ├── Semaphore - 信号量
│   ├── Exchanger - 交换器
│   └── Phaser - 移相器
│
├── 锁机制 (Lock Framework)
│   ├── Lock 接口
│   ├── ReentrantLock - 可重入锁
│   ├── ReadWriteLock 接口
│   ├── ReentrantReadWriteLock - 可重入读写锁
│   └── StampedLock - 邮戳锁
│
├── 原子类 (Atomic Classes)
│   ├── AtomicInteger/Long/Boolean
│   ├── AtomicReference
│   ├── AtomicIntegerArray
│   └── AtomicIntegerFieldUpdater
│
└── Future 和 异步编程
    ├── Future 接口
    ├── CompletableFuture 类
    └── ForkJoinPool 框架

核心组件详解与示例

1. 执行器框架 (Executor Framework)

执行器框架是JUC的核心,提供了线程池管理和任务执行的抽象。

// 执行器框架示例
public class ExecutorExample {
    
    // 创建固定大小线程池
    public void fixedThreadPoolExample() {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        
        // 提交任务
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务 " + taskId + " 在线程 " + 
                    Thread.currentThread().getName() + " 中执行");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        // 优雅关闭
        executor.shutdown();
    }
    
    // 定时任务示例
    public void scheduledExecutorExample() {
        ScheduledExecutorService scheduler = 
            Executors.newScheduledThreadPool(2);
        
        // 延迟执行
        scheduler.schedule(() -> 
            System.out.println("延迟3秒执行"), 3, TimeUnit.SECONDS);
        
        // 周期性执行
        scheduler.scheduleAtFixedRate(() -> 
            System.out.println("每2秒执行一次"), 
            0, 2, TimeUnit.SECONDS);
    }
    
    // 自定义线程池
    public void customThreadPoolExample() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2,           // 核心线程数
            4,           // 最大线程数
            60L,         // 空闲时间
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100), // 任务队列
            new ThreadFactory() {           // 线程工厂
                private AtomicInteger counter = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "自定义线程-" + counter.incrementAndGet());
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );
    }
}

2. 并发集合 (Concurrent Collections)

线程安全的集合类,提供高并发性能。

// 并发集合示例
public class ConcurrentCollectionExample {
    
    // ConcurrentHashMap 示例
    public void concurrentHashMapExample() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        
        // 原子性操作
        map.putIfAbsent("key1", 1);
        map.compute("key2", (k, v) -> v == null ? 1 : v + 1);
        map.merge("key3", 1, Integer::sum);
        
        // 并行流操作
        map.entrySet().parallelStream()
           .filter(entry -> entry.getValue() > 0)
           .forEach(System.out::println);
    }
    
    // BlockingQueue 生产者消费者模式
    public void blockingQueueExample() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        
        // 生产者
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put("产品-" + i);
                    System.out.println("生产了: 产品-" + i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // 消费者
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String product = queue.take();
                    System.out.println("消费了: " + product);
                    Thread.sleep(2000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        producer.start();
        consumer.start();
    }
    
    // CopyOnWriteArrayList 示例
    public void copyOnWriteArrayListExample() {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        
        // 适合读多写少的场景
        list.add("元素1");
        list.add("元素2");
        
        // 迭代器不会抛出 ConcurrentModificationException
        for (String item : list) {
            System.out.println(item);
            // 在迭代过程中修改集合是安全的
            if ("元素1".equals(item)) {
                list.add("新元素");
            }
        }
    }
}

3. 同步工具 (Synchronization Utilities)

提供线程间协调的工具类。

// 同步工具示例
public class SynchronizationUtilsExample {
    
    // CountDownLatch 示例 - 等待多个任务完成
    public void countDownLatchExample() {
        int taskCount = 3;
        CountDownLatch latch = new CountDownLatch(taskCount);
        
        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    System.out.println("任务 " + taskId + " 开始执行");
                    Thread.sleep(2000);
                    System.out.println("任务 " + taskId + " 执行完成");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // 计数器减1
                }
            }).start();
        }
        
        try {
            latch.await(); // 等待所有任务完成
            System.out.println("所有任务执行完成");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // CyclicBarrier 示例 - 多线程同步执行
    public void cyclicBarrierExample() {
        int threadCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            System.out.println("所有线程都到达屏障点,开始下一阶段");
        });
        
        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    System.out.println("线程 " + threadId + " 到达屏障点");
                    barrier.await(); // 等待其他线程
                    System.out.println("线程 " + threadId + " 继续执行");
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
    
    // Semaphore 示例 - 限制资源访问
    public void semaphoreExample() {
        Semaphore semaphore = new Semaphore(2); // 只允许2个线程同时访问
        
        for (int i = 0; i < 5; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 获取许可
                    System.out.println("线程 " + threadId + " 获得资源访问权限");
                    Thread.sleep(2000);
                    System.out.println("线程 " + threadId + " 释放资源");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release(); // 释放许可
                }
            }).start();
        }
    }
    
    // Exchanger 示例 - 线程间数据交换
    public void exchangerExample() {
        Exchanger<String> exchanger = new Exchanger<>();
        
        Thread thread1 = new Thread(() -> {
            try {
                String data = "来自线程1的数据";
                String received = exchanger.exchange(data);
                System.out.println("线程1发送: " + data + ", 接收: " + received);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        Thread thread2 = new Thread(() -> {
            try {
                String data = "来自线程2的数据";
                String received = exchanger.exchange(data);
                System.out.println("线程2发送: " + data + ", 接收: " + received);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        thread1.start();
        thread2.start();
    }
}

4. 锁机制 (Lock Framework)

比synchronized更灵活的锁机制。

// 锁机制示例
public class LockExample {
    private final ReentrantLock lock = new ReentrantLock();
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();
    private int count = 0;
    
    // ReentrantLock 基本使用
    public void reentrantLockExample() {
        lock.lock();
        try {
            count++;
            System.out.println("当前计数: " + count);
        } finally {
            lock.unlock();
        }
    }
    
    // 尝试获取锁
    public boolean tryLockExample() {
        if (lock.tryLock()) {
            try {
                count++;
                return true;
            } finally {
                lock.unlock();
            }
        }
        return false;
    }
    
    // 带超时的锁获取
    public boolean tryLockWithTimeoutExample() {
        try {
            if (lock.tryLock(1, TimeUnit.SECONDS)) {
                try {
                    count++;
                    return true;
                } finally {
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }
    
    // 读写锁示例
    public void readWriteLockExample() {
        // 读操作
        readLock.lock();
        try {
            System.out.println("读取数据: " + count);
        } finally {
            readLock.unlock();
        }
        
        // 写操作
        writeLock.lock();
        try {
            count++;
            System.out.println("写入数据: " + count);
        } finally {
            writeLock.unlock();
        }
    }
    
    // StampedLock 示例 - 乐观读锁
    private final StampedLock stampedLock = new StampedLock();
    private double x, y;
    
    public double distanceFromOrigin() {
        long stamp = stampedLock.tryOptimisticRead(); // 乐观读
        double currentX = x, currentY = y;
        
        if (!stampedLock.validate(stamp)) { // 验证数据是否被修改
            stamp = stampedLock.readLock(); // 获取悲观读锁
            try {
                currentX = x;
                currentY = y;
            } finally {
                stampedLock.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
    
    public void moveIfAtOrigin(double newX, double newY) {
        long stamp = stampedLock.readLock();
        try {
            while (x == 0.0 && y == 0.0) {
                long writeStamp = stampedLock.tryConvertToWriteLock(stamp);
                if (writeStamp != 0L) {
                    stamp = writeStamp;
                    x = newX;
                    y = newY;
                    break;
                } else {
                    stampedLock.unlockRead(stamp);
                    stamp = stampedLock.writeLock();
                }
            }
        } finally {
            stampedLock.unlock(stamp);
        }
    }
}

5. 原子类 (Atomic Classes)

提供无锁的原子操作。

// 原子类示例
public class AtomicExample {
    private AtomicInteger counter = new AtomicInteger(0);
    private AtomicReference<String> reference = new AtomicReference<>("初始值");
    
    // 基本原子操作
    public void basicAtomicOperations() {
        // 原子递增
        int newValue = counter.incrementAndGet();
        System.out.println("递增后的值: " + newValue);
        
        // 原子加法
        int result = counter.addAndGet(5);
        System.out.println("加5后的值: " + result);
        
        // 比较并交换
        boolean success = counter.compareAndSet(result, 100);
        System.out.println("CAS操作成功: " + success);
    }
    
    // AtomicReference 示例
    public void atomicReferenceExample() {
        String oldValue = reference.get();
        boolean updated = reference.compareAndSet(oldValue, "新值");
        System.out.println("更新成功: " + updated);
        
        // 使用 updateAndGet
        String newValue = reference.updateAndGet(current -> current + "-更新");
        System.out.println("更新后的值: " + newValue);
    }
    
    // 原子数组示例
    public void atomicArrayExample() {
        AtomicIntegerArray array = new AtomicIntegerArray(10);
        
        // 并发更新数组元素
        for (int i = 0; i < 10; i++) {
            final int index = i;
            new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    array.incrementAndGet(index);
                }
            }).start();
        }
    }
    
    // 原子字段更新器示例
    static class Person {
        volatile int age;
        volatile String name;
        
        private static final AtomicIntegerFieldUpdater<Person> ageUpdater =
            AtomicIntegerFieldUpdater.newUpdater(Person.class, "age");
        
        public void incrementAge() {
            ageUpdater.incrementAndGet(this);
        }
        
        public boolean setAgeIfExpected(int expect, int update) {
            return ageUpdater.compareAndSet(this, expect, update);
        }
    }
}

6. Future 和异步编程

处理异步计算结果。

// Future和异步编程示例
public class FutureExample {
    
    // 基本Future使用
    public void basicFutureExample() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        
        // 提交Callable任务
        Future<Integer> future = executor.submit(() -> {
            Thread.sleep(2000);
            return 42;
        });
        
        try {
            // 获取结果(阻塞)
            Integer result = future.get(3, TimeUnit.SECONDS);
            System.out.println("计算结果: " + result);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        }
        
        executor.shutdown();
    }
    
    // CompletableFuture 示例
    public void completableFutureExample() {
        // 异步执行
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "Hello";
            })
            .thenApply(result -> result + " World")
            .thenApply(String::toUpperCase);
        
        // 非阻塞回调
        future.thenAccept(result -> 
            System.out.println("异步结果: " + result));
        
        // 组合多个Future
        CompletableFuture<String> future1 = 
            CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> future2 = 
            CompletableFuture.supplyAsync(() -> "World");
        
        CompletableFuture<String> combined = future1
            .thenCombine(future2, (s1, s2) -> s1 + " " + s2);
        
        combined.thenAccept(System.out::println);
    }
    
    // 异常处理
    public void exceptionHandlingExample() {
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {
                if (Math.random() > 0.5) {
                    throw new RuntimeException("随机异常");
                }
                return "成功";
            })
            .exceptionally(throwable -> {
                System.out.println("捕获异常: " + throwable.getMessage());
                return "默认值";
            })
            .handle((result, throwable) -> {
                if (throwable != null) {
                    return "处理异常后的结果";
                }
                return result;
            });
        
        future.thenAccept(System.out::println);
    }
    
    // ForkJoinPool 示例
    public void forkJoinPoolExample() {
        ForkJoinPool pool = new ForkJoinPool();
        
        // 计算数组和的任务
        class SumTask extends RecursiveTask<Long> {
            private final long[] array;
            private final int start, end;
            private static final int THRESHOLD = 1000;
            
            public SumTask(long[] array, int start, int end) {
                this.array = array;
                this.start = start;
                this.end = end;
            }
            
            @Override
            protected Long compute() {
                if (end - start <= THRESHOLD) {
                    // 直接计算
                    long sum = 0;
                    for (int i = start; i < end; i++) {
                        sum += array[i];
                    }
                    return sum;
                } else {
                    // 分割任务
                    int middle = (start + end) / 2;
                    SumTask leftTask = new SumTask(array, start, middle);
                    SumTask rightTask = new SumTask(array, middle, end);
                    
                    leftTask.fork(); // 异步执行左半部分
                    long rightResult = rightTask.compute(); // 执行右半部分
                    long leftResult = leftTask.join(); // 等待左半部分结果
                    
                    return leftResult + rightResult;
                }
            }
        }
        
        long[] array = new long[10000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i + 1;
        }
        
        SumTask task = new SumTask(array, 0, array.length);
        Long result = pool.invoke(task);
        System.out.println("数组和: " + result);
        
        pool.shutdown();
    }
}

实际应用场景与最佳实践

1. 生产者消费者模式

public class ProducerConsumerPattern {
    private final BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>(100);
    private final ExecutorService executorService = Executors.newFixedThreadPool(10);
    private volatile boolean running = true;
    
    public void startProducers(int producerCount) {
        for (int i = 0; i < producerCount; i++) {
            executorService.submit(() -> {
                while (running) {
                    try {
                        Task task = generateTask();
                        taskQueue.put(task);
                        System.out.println("生产任务: " + task.getId());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
        }
    }
    
    public void startConsumers(int consumerCount) {
        for (int i = 0; i < consumerCount; i++) {
            executorService.submit(() -> {
                while (running) {
                    try {
                        Task task = taskQueue.take();
                        processTask(task);
                        System.out.println("处理任务: " + task.getId());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
        }
    }
    
    private Task generateTask() {
        return new Task("Task-" + System.currentTimeMillis());
    }
    
    private void processTask(Task task) {
        // 模拟任务处理
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    static class Task {
        private final String id;
        
        public Task(String id) {
            this.id = id;
        }
        
        public String getId() {
            return id;
        }
    }
}

2. 缓存实现

public class ConcurrentCache<K, V> {
    private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Function<K, V> loader;
    
    public ConcurrentCache(Function<K, V> loader) {
        this.loader = loader;
    }
    
    public V get(K key) {
        // 先尝试从缓存获取
        V value = cache.get(key);
        if (value != null) {
            return value;
        }
        
        // 双重检查锁定
        lock.writeLock().lock();
        try {
            value = cache.get(key);
            if (value == null) {
                value = loader.apply(key);
                cache.put(key, value);
            }
            return value;
        } finally {
            lock.writeLock().unlock();
        }
    }
    
    public void put(K key, V value) {
        cache.put(key, value);
    }
    
    public void remove(K key) {
        cache.remove(key);
    }
    
    public int size() {
        return cache.size();
    }
}

总结

JUC库的核心价值

  1. 高性能并发: 提供比传统synchronized更高效的并发控制机制
  2. 灵活性: 提供多种同步工具,适应不同的并发场景
  3. 安全性: 通过原子类和并发集合避免线程安全问题
  4. 可扩展性: 支持大规模并发应用的开发

选择指南

场景推荐工具原因
线程池管理ThreadPoolExecutor灵活配置,高效执行
高并发读写ConcurrentHashMap分段锁,高并发性能
生产者消费者BlockingQueue自动阻塞,简化编程
等待多任务完成CountDownLatch一次性同步工具
多阶段同步CyclicBarrier可重用的同步屏障
资源限制Semaphore控制并发访问数量
无锁编程Atomic类高性能原子操作
异步编程CompletableFuture丰富的组合操作

最佳实践

  1. 合理设置线程池参数: 根据任务特性调整核心线程数和最大线程数
  2. 正确处理中断: 在可中断的阻塞操作中正确响应中断
  3. 避免死锁: 使用tryLock和超时机制
  4. 资源清理: 及时关闭ExecutorService和释放资源
  5. 异常处理: 在异步任务中正确处理异常
  6. 性能监控: 监控线程池状态和队列大小

JUC库是Java并发编程的基础,掌握这些工具类能够帮助开发者构建高性能、可扩展的并发应用程序。