Java并发编程基础:从线程管理到高并发应用实践

本篇主要是多线程的基础知识,代码示例较多,有时间的可以逐个分析,具体细节都放在代码注释中了。

1. 理解线程:多任务执行的基石

1.1 什么是线程?

在现代操作系统中,进程是资源分配的基本单位,而线程是CPU调度的最小单位。可以把进程想象成一家公司,线程就是公司里的员工。

/**  * 演示Java程序天生就是多线程程序  * 即使最简单的main方法也会启动多个系统线程  */ public class MultiThread {     public static void main(String[] args) {         // 获取Java线程管理MXBean         ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();         // 不需要获取同步的monitor和synchronizer信息,仅获取线程和线程堆栈信息         ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);                  // 遍历线程信息         System.out.println("=== Java程序启动的线程列表 ===");         for (ThreadInfo threadInfo : threadInfos) {             System.out.println("[" + threadInfo.getThreadId() + "] " +                               threadInfo.getThreadName());         }     } } 

输出示例:

=== Java程序启动的线程列表 === [4] Signal Dispatcher    // 分发处理发送给JVM信号的线程 [3] Finalizer           // 调用对象finalize方法的线程   [2] Reference Handler   // 清除Reference的线程 [1] main               // main线程,用户程序入口 

1.2 为什么需要多线程?

三大核心优势

  1. 充分利用多核处理器 - 避免CPU资源闲置
  2. 提升响应速度 - 后台任务不阻塞用户操作
  3. 更好的编程模型 - Java提供一致的多线程API

1.3 线程状态生命周期

新建(NEW) → 可运行(RUNNABLE) → 运行中     ↓ 超时等待(TIMED_WAITING) ← 等待(WAITING) ← 阻塞(BLOCKED)     ↓     终止(TERMINATED) 

2. 线程的启动与安全终止

2.1 正确启动线程

/**  * 线程启动最佳实践示例  * 重点:设置有意义的线程名称,合理设置守护线程标志  */ public class ThreadStartExample {     public static void main(String[] args) {         // 推荐:为线程设置有意义的名称,便于问题排查         Thread worker = new Thread(new Task(), "Data-Processor-1");         worker.setDaemon(false); // 明确设置是否为守护线程         worker.start(); // 正确启动方式,不要直接调用run()                  System.out.println("主线程继续执行,不会等待worker线程");     }          static class Task implements Runnable {         @Override         public void run() {             System.out.println(Thread.currentThread().getName() + " 开始执行");             try {                 // 模拟工作任务                 Thread.sleep(1000);             } catch (InterruptedException e) {                 System.out.println("任务被中断");             }             System.out.println(Thread.currentThread().getName() + " 执行完成");         }     } } 

2.2 安全终止线程的两种方式

方式一:使用中断机制

/**  * 使用中断机制安全终止线程  * 重点:理解中断异常处理的最佳实践  */ public class InterruptExample {     public static void main(String[] args) throws InterruptedException {         Thread worker = new Thread(new InterruptibleTask(), "Interruptible-Worker");         worker.start();                  // 主线程等待2秒后中断工作线程         TimeUnit.SECONDS.sleep(2);         System.out.println("主线程发送中断信号");         worker.interrupt(); // 发送中断信号                  // 等待工作线程完全退出         worker.join();         System.out.println("工作线程已安全退出");     }          static class InterruptibleTask implements Runnable {         @Override         public void run() {             while (!Thread.currentThread().isInterrupted()) {                 try {                     // 模拟工作 - 这里可能抛出InterruptedException                     System.out.println("Working...");                     TimeUnit.MILLISECONDS.sleep(500);                 } catch (InterruptedException e) {                     /**                      * 关键理解点:为什么需要重新设置中断状态?                      *                       * 当线程在阻塞状态(如sleep、wait、join)时被中断,                      * Java会做两件事:                      * 1. 抛出InterruptedException                      * 2. 清除线程的中断状态(设为false)                      *                       * 这导致循环条件 !Thread.currentThread().isInterrupted()                       * 会继续为true,线程无法退出。                      *                       * 因此我们需要在捕获异常后重新设置中断状态,                      * 这样循环条件就能检测到中断,安全退出。                      */                     System.out.println("捕获到中断异常,重新设置中断状态");                     Thread.currentThread().interrupt(); // 重新设置中断标志                 }             }             System.out.println("线程安全退出,中断状态: " +                  Thread.currentThread().isInterrupted());         }     } } 

方式二:使用标志位

/**  * 使用volatile标志位安全终止线程  * 适用于没有阻塞调用或需要更复杂退出逻辑的场景  */ public class FlagShutdownExample {     // volatile保证可见性,确保所有线程看到最新的值     private volatile boolean running = true;     private final Thread workerThread;          public FlagShutdownExample() {         this.workerThread = new Thread(this::doWork, "Flag-Controlled-Worker");     }          public void start() {         workerThread.start();     }          /**      * 优雅停止工作线程      */     public void stop() {         System.out.println("请求停止工作线程");         running = false;         // 同时发送中断,处理可能存在的阻塞情况         workerThread.interrupt();     }          /**      * 工作线程的主循环      * 同时检查标志位和中断状态,提供双重保障      */     private void doWork() {         try {             while (running && !Thread.currentThread().isInterrupted()) {                 // 执行工作任务                 processData();             }         } finally {             // 无论何种方式退出,都执行清理工作             cleanup();         }         System.out.println("工作线程已安全退出");     }          private void processData() {         try {             // 模拟数据处理             System.out.println("处理数据中...");             Thread.sleep(300);         } catch (InterruptedException e) {             System.out.println("处理数据时被中断");             // 收到中断,但可能还想继续处理,所以不重新设置中断             // 让循环条件来检查running标志         }     }          private void cleanup() {         System.out.println("执行资源清理工作...");         // 关闭文件、数据库连接等资源     }          public static void main(String[] args) throws InterruptedException {         FlagShutdownExample example = new FlagShutdownExample();         example.start();                  // 运行3秒后停止         Thread.sleep(3000);         example.stop();                  // 等待工作线程退出         example.workerThread.join();     } } 

3. 线程间通信:协作的艺术

3.1 volatile关键字:共享状态可见性

/**  * volatile关键字示例  * 保证多线程间的可见性,但不保证原子性  */ public class VolatileExample {     // volatile确保shutdownRequested的修改对所有线程立即可见     private volatile boolean shutdownRequested = false;     private int operationCount = 0; // 非volatile,不保证可见性          public void shutdown() {         shutdownRequested = true; // 所有线程立即可见         System.out.println("关闭请求已设置");     }          public void doWork() {         while (!shutdownRequested) {             // 正常工作循环             operationCount++; // 非原子操作,可能有问题             try {                 Thread.sleep(100);             } catch (InterruptedException e) {                 System.out.println("工作被中断");                 Thread.currentThread().interrupt();                 break;             }         }         System.out.println("工作线程退出,操作次数: " + operationCount);     } } 

3.2 synchronized关键字:互斥访问

/**  * synchronized关键字示例  * 保证原子性和可见性,但可能影响性能  */ public class SynchronizedCounter {     private int count = 0;          /**      * 同步方法 - 锁对象是当前实例(this)      */     public synchronized void increment() {         count++; // 原子操作     }          /**      * 同步块 - 可以更细粒度控制锁的范围      */     public void decrement() {         // 只同步关键部分,减少锁持有时间         synchronized (this) {             count--;         }         // 这里可以执行非同步操作     }          /**      * 同步的get方法,保证看到最新值      */     public synchronized int getCount() {         return count;     }          /**      * 静态同步方法 - 锁对象是类的Class对象      */     public static synchronized void staticMethod() {         // 静态同步方法使用Class对象作为锁     } } 

3.3 等待/通知机制:经典生产者-消费者模式

/**  * 生产者-消费者模式示例  * 演示wait/notify机制的正确使用  */ public class WaitNotifyExample {     private final Object lock = new Object(); // 共享锁对象     private final Queue<String> queue = new LinkedList<>();     private final int MAX_SIZE = 5;          /**      * 生产者方法      */     public void produce(String data) throws InterruptedException {         synchronized (lock) {             // 必须使用while循环检查条件,避免虚假唤醒             while (queue.size() >= MAX_SIZE) {                 System.out.println("队列已满(" + queue.size() + "),生产者等待");                 lock.wait(); // 释放锁并等待             }                          queue.offer(data);             System.out.println("生产: " + data + ",队列大小: " + queue.size());                          // 通知所有等待的消费者             lock.notifyAll();         }     }          /**      * 消费者方法      */     public String consume() throws InterruptedException {         synchronized (lock) {             // 必须使用while循环检查条件             while (queue.isEmpty()) {                 System.out.println("队列为空,消费者等待");                 lock.wait(); // 释放锁并等待             }                          String data = queue.poll();             System.out.println("消费: " + data + ",队列大小: " + queue.size());                          // 通知所有等待的生产者             lock.notifyAll();             return data;         }     }          /**      * 测试生产者消费者模式      */     public static void main(String[] args) {         WaitNotifyExample example = new WaitNotifyExample();                  // 启动生产者线程         Thread producer = new Thread(() -> {             try {                 for (int i = 0; i < 10; i++) {                     example.produce("Data-" + i);                     Thread.sleep(200);                 }             } catch (InterruptedException e) {                 Thread.currentThread().interrupt();             }         }, "Producer");                  // 启动消费者线程         Thread consumer = new Thread(() -> {             try {                 for (int i = 0; i < 10; i++) {                     example.consume();                     Thread.sleep(300);                 }             } catch (InterruptedException e) {                 Thread.currentThread().interrupt();             }         }, "Consumer");                  producer.start();         consumer.start();     } } 

等待/通知经典范式

// 消费者范式 - 永远在循环中调用wait() synchronized(锁对象) {     while(条件不满足) {         锁对象.wait();  // 等待时会释放锁     }     // 条件满足,处理业务逻辑 }  // 生产者范式   synchronized(锁对象) {     改变条件;           // 改变等待条件     锁对象.notifyAll(); // 通知所有等待线程 } 

3.4 Thread.join():线程依赖执行

/**  * Thread.join()使用示例  * 实现线程间的顺序执行依赖  */ public class JoinExample {     public static void main(String[] args) throws InterruptedException {         System.out.println("主线程开始");                  Thread previous = Thread.currentThread();                  // 创建5个有依赖关系的线程         for (int i = 0; i < 5; i++) {             Thread thread = new Thread(new DependentTask(previous), "Worker-" + i);             thread.start();             previous = thread; // 设置依赖链         }                  // 主线程先做一些工作         TimeUnit.SECONDS.sleep(1);         System.out.println(Thread.currentThread().getName() + " 完成初始化工作");                  // 等待所有线程完成(实际上由最后一个Worker-4 join主线程)     }          static class DependentTask implements Runnable {         private final Thread dependency; // 依赖的线程                  public DependentTask(Thread dependency) {             this.dependency = dependency;         }                  @Override         public void run() {             try {                 // 等待依赖的线程执行完成                 System.out.println(Thread.currentThread().getName() + " 等待 " + dependency.getName());                 dependency.join();                                  // 依赖线程完成后开始自己的工作                 System.out.println(Thread.currentThread().getName() + " 开始工作");                 TimeUnit.MILLISECONDS.sleep(500); // 模拟工作                 System.out.println(Thread.currentThread().getName() + " 完成工作");                              } catch (InterruptedException e) {                 System.out.println(Thread.currentThread().getName() + " 被中断");                 Thread.currentThread().interrupt();             }         }     } } 

3.5 ThreadLocal深入解析:线程局部变量

/**  * ThreadLocal深度解析  * 理解原理、使用场景和内存泄漏防护  */ public class ThreadLocalExample {          /**      * ThreadLocal基本使用:每个线程独立的SimpleDateFormat      * 避免SimpleDateFormat的线程安全问题      */     private static final ThreadLocal<SimpleDateFormat> DATE_FORMATTER =         ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));          /**      * ThreadLocal用于用户上下文传递      * 在Web应用中非常有用,避免在方法参数中传递用户信息      */     private static final ThreadLocal<UserContext> USER_CONTEXT =          new ThreadLocal<>();          /**      * ThreadLocal用于事务上下文      */     private static final ThreadLocal<TransactionContext> TRANSACTION_CONTEXT =         new ThreadLocal<>();          /**      * 可继承的ThreadLocal:子线程可以继承父线程的值      */     private static final InheritableThreadLocal<String> INHERITABLE_CONTEXT =         new InheritableThreadLocal<>();          /**      * 处理用户请求的示例方法      */     public void processRequest(User user) {         // 设置用户上下文到当前线程         USER_CONTEXT.set(new UserContext(user));                  try {             // 使用线程安全的日期格式化             String timestamp = DATE_FORMATTER.get().format(new Date());             System.out.println(Thread.currentThread().getName() +                  " - 用户: " + user.getName() + ", 时间: " + timestamp);                          // 执行业务逻辑 - 任何方法都可以获取用户上下文,无需传递参数             doBusinessLogic();                      } finally {             /**              * 关键:必须清理ThreadLocal,防止内存泄漏!              *               * 原因:              * 1. ThreadLocalMap的key是弱引用,会被GC回收              * 2. 但value是强引用,不会被自动回收              * 3. 如果线程长时间存活(如线程池中的线程),会导致value无法释放              * 4. 调用remove()方法显式清理              */             USER_CONTEXT.remove();             DATE_FORMATTER.remove(); // 清理所有使用的ThreadLocal         }     }          private void doBusinessLogic() {         // 在任何地方都可以获取用户上下文,无需方法参数传递         UserContext context = USER_CONTEXT.get();         if (context != null) {             System.out.println("执行业务逻辑,用户: " + context.getUser().getName());         }                  // 使用线程安全的日期格式化         String now = DATE_FORMATTER.get().format(new Date());         System.out.println("业务执行时间: " + now);     }          /**      * 演示ThreadLocal的内存泄漏问题      */     public void demonstrateMemoryLeak() {         // 错误的用法:不清理ThreadLocal         ThreadLocal<byte[]> leakyLocal = new ThreadLocal<>();         leakyLocal.set(new byte[1024 * 1024]); // 1MB数据                  // 如果没有调用 leakyLocal.remove(), 即使leakyLocal=null,         // 线程的ThreadLocalMap中仍然保留着这个Entry         // 在线程池场景下,线程重用会导致内存不断增长     }          /**      * ThreadLocal最佳实践:使用try-finally确保清理      */     public void bestPractice(User user) {         USER_CONTEXT.set(new UserContext(user));         try {             // 业务处理             doBusinessLogic();         } finally {             // 确保清理,即使在业务逻辑中发生异常             USER_CONTEXT.remove();         }     }          /**      * 测试多线程环境下的ThreadLocal      */     public static void main(String[] args) throws InterruptedException {         ThreadLocalExample example = new ThreadLocalExample();                  // 创建多个线程,每个线程有独立的ThreadLocal值         Thread[] threads = new Thread[3];         for (int i = 0; i < threads.length; i++) {             final int userId = i;             threads[i] = new Thread(() -> {                 User user = new User("User-" + userId);                 example.processRequest(user);             }, "Thread-" + i);                          threads[i].start();         }                  // 等待所有线程完成         for (Thread thread : threads) {             thread.join();         }                  System.out.println("所有线程执行完成");     }          // 辅助类定义     static class UserContext {         private final User user;         public UserContext(User user) { this.user = user; }         public User getUser() { return user; }     }          static class User {         private final String name;         public User(String name) { this.name = name; }         public String getName() { return name; }     }          static class TransactionContext {         // 事务相关信息     } }  /**  * ThreadLocal高级用法:自定义ThreadLocal子类  */ class AdvancedThreadLocal<T> extends ThreadLocal<T> {          /**      * 初始值 - 当线程第一次调用get()时,如果还没有设置值,会调用此方法      */     @Override     protected T initialValue() {         System.out.println(Thread.currentThread().getName() + " - 初始化ThreadLocal值");         return null; // 返回默认初始值     }          /**      * 子线程值继承 - 仅对InheritableThreadLocal有效      * 当创建新线程时,可以控制如何从父线程继承值      */     protected T childValue(T parentValue) {         System.out.println("子线程继承父线程的值: " + parentValue);         return parentValue; // 直接继承,也可以进行转换     } } 

4. 线程应用实例:从理论到实践

4.1 等待超时模式:避免无限期等待

/**  * 等待超时模式实现  * 在等待/通知机制基础上增加超时控制  */ public class TimeoutWait<T> {     private T result;          /**      * 带超时的获取方法      * @param timeoutMs 超时时间(毫秒)      * @return 结果,超时返回null      */     public synchronized T get(long timeoutMs) throws InterruptedException {         long endTime = System.currentTimeMillis() + timeoutMs;         long remaining = timeoutMs;                  // 循环检查条件和剩余时间         while (result == null && remaining > 0) {             wait(remaining); // 等待剩余时间             remaining = endTime - System.currentTimeMillis(); // 更新剩余时间         }                  return result; // 可能为null(超时)     }          /**      * 设置结果并通知所有等待线程      */     public synchronized void set(T value) {         this.result = value;         notifyAll(); // 通知所有等待的线程     }          /**      * 演示超时等待的使用      */     public static void main(String[] args) throws InterruptedException {         TimeoutWait<String> waitObject = new TimeoutWait<>();                  // 消费者线程 - 等待结果,最多等3秒         Thread consumer = new Thread(() -> {             try {                 System.out.println("消费者开始等待结果...");                 String result = waitObject.get(3000);                 if (result != null) {                     System.out.println("消费者收到结果: " + result);                 } else {                     System.out.println("消费者等待超时");                 }             } catch (InterruptedException e) {                 System.out.println("消费者被中断");             }         });                  // 生产者线程 - 2秒后产生结果         Thread producer = new Thread(() -> {             try {                 Thread.sleep(2000); // 模拟生产耗时                 waitObject.set("生产完成的数据");                 System.out.println("生产者完成工作");             } catch (InterruptedException e) {                 System.out.println("生产者被中断");             }         });                  consumer.start();         producer.start();                  consumer.join();         producer.join();     } } 

4.2 数据库连接池实现

/**  * 简易数据库连接池实现  * 演示资源池化和等待超时模式的实际应用  */ public class SimpleConnectionPool {     private final LinkedList<Connection> pool = new LinkedList<>();     private final int maxSize;     private int createdCount = 0;          public SimpleConnectionPool(int initialSize, int maxSize) {         this.maxSize = maxSize;         // 初始化连接池         for (int i = 0; i < initialSize; i++) {             pool.add(createConnection());         }         System.out.println("连接池初始化完成,初始连接数: " + initialSize);     }          /**      * 获取连接,支持超时      */     public Connection getConnection(long timeoutMs) throws InterruptedException, TimeoutException {         synchronized (pool) {             // 如果池中有可用连接,立即返回             if (!pool.isEmpty()) {                 return pool.removeFirst();             }                          // 池为空,但还可以创建新连接             if (createdCount < maxSize) {                 Connection conn = createConnection();                 System.out.println("创建新连接,当前连接数: " + createdCount);                 return conn;             }                          // 等待可用连接             long endTime = System.currentTimeMillis() + timeoutMs;             long remaining = timeoutMs;                          while (pool.isEmpty() && remaining > 0) {                 System.out.println(Thread.currentThread().getName() + " 等待连接,剩余时间: " + remaining + "ms");                 pool.wait(remaining);                 remaining = endTime - System.currentTimeMillis();             }                          if (!pool.isEmpty()) {                 return pool.removeFirst();             }             throw new TimeoutException("获取连接超时,等待 " + timeoutMs + "ms");         }     }          /**      * 归还连接到池中      */     public void releaseConnection(Connection conn) {         if (conn != null) {             synchronized (pool) {                 if (pool.size() < maxSize) {                     pool.addLast(conn);                     pool.notifyAll(); // 通知等待的线程                     System.out.println("连接已归还,当前池大小: " + pool.size());                 } else {                     // 连接数超过上限,关闭连接                     closeConnection(conn);                     createdCount--;                     System.out.println("连接池已满,关闭连接");                 }             }         }     }          /**      * 创建新连接      */     private Connection createConnection() {         createdCount++;         // 这里应该是真实的数据库连接创建逻辑         System.out.println("创建第 " + createdCount + " 个连接");         return new MockConnection();     }          /**      * 关闭连接      */     private void closeConnection(Connection conn) {         try {             conn.close();         } catch (Exception e) {             System.err.println("关闭连接失败: " + e.getMessage());         }     }          /**      * 获取连接池状态      */     public synchronized void printStatus() {         System.out.println("连接池状态 - 池中连接: " + pool.size() +                            ", 总创建数: " + createdCount +                            ", 最大限制: " + maxSize);     }          // 模拟数据库连接     static class MockConnection implements Connection {         private final String id = UUID.randomUUID().toString().substring(0, 8);                  @Override         public void close() {             System.out.println("关闭连接: " + id);         }                  @Override         public String toString() {             return "MockConnection{" + "id='" + id + ''' + '}';         }                  // 其他Connection接口方法...         @Override public void commit() {}         @Override public void rollback() {}         // ... 简化实现     }          static class TimeoutException extends Exception {         public TimeoutException(String message) { super(message); }     } } 

4.3 线程池核心技术实现

/**  * 简易线程池实现  * 理解线程池的核心原理和工作机制  */ public class SimpleThreadPool implements Executor {     private final BlockingQueue<Runnable> workQueue;     private final List<WorkerThread> workers;     private volatile boolean isShutdown = false;     private final int poolSize;          /**      * 创建线程池      */     public SimpleThreadPool(int poolSize) {         this.poolSize = poolSize;         this.workQueue = new LinkedBlockingQueue<>();         this.workers = new ArrayList<>(poolSize);                  System.out.println("初始化线程池,大小: " + poolSize);                  // 创建工作线程         for (int i = 0; i < poolSize; i++) {             WorkerThread worker = new WorkerThread("Pool-Worker-" + i);             workers.add(worker);             worker.start();         }     }          /**      * 提交任务到线程池      */     @Override     public void execute(Runnable task) {         if (isShutdown) {             throw new RejectedExecutionException("线程池已关闭,拒绝新任务");         }                  if (task == null) {             throw new NullPointerException("任务不能为null");         }                  try {             workQueue.put(task); // 阻塞直到有空间             System.out.println("任务已提交,队列大小: " + workQueue.size());         } catch (InterruptedException e) {             Thread.currentThread().interrupt();             throw new RejectedExecutionException("提交任务时被中断", e);         }     }          /**      * 优雅关闭线程池      */     public void shutdown() {         System.out.println("开始关闭线程池...");         isShutdown = true;                  // 中断所有工作线程         for (WorkerThread worker : workers) {             worker.interrupt();         }     }          /**      * 强制关闭线程池      */     public void shutdownNow() {         shutdown();         workQueue.clear(); // 清空等待队列     }          /**      * 等待线程池完全终止      */     public boolean awaitTermination(long timeout, TimeUnit unit)              throws InterruptedException {         long endTime = System.currentTimeMillis() + unit.toMillis(timeout);                  for (WorkerThread worker : workers) {             long remaining = endTime - System.currentTimeMillis();             if (remaining <= 0) {                 return false; // 超时             }             worker.join(remaining);         }                  return true;     }          /**      * 获取线程池状态      */     public void printStatus() {         System.out.println("线程池状态 - 工作线程: " + workers.size() +                            ", 等待任务: " + workQueue.size() +                            ", 已关闭: " + isShutdown);     }          /**      * 工作线程实现      */     private class WorkerThread extends Thread {         public WorkerThread(String name) {             super(name);         }                  @Override         public void run() {             System.out.println(getName() + " 开始运行");                          while (!isShutdown || !workQueue.isEmpty()) {                 try {                     // 从队列获取任务,支持超时以便检查关闭状态                     Runnable task = workQueue.poll(1, TimeUnit.SECONDS);                                          if (task != null) {                         System.out.println(getName() + " 开始执行任务");                         task.run();                         System.out.println(getName() + " 任务执行完成");                     }                                      } catch (InterruptedException e) {                     // 响应中断,退出线程                     System.out.println(getName() + " 收到中断信号");                     break;                 } catch (Exception e) {                     // 捕获任务执行异常,避免工作线程退出                     System.err.println(getName() + " 任务执行异常: " + e.getMessage());                 }             }                          System.out.println(getName() + " 退出");         }     }          /**      * 测试线程池      */     public static void main(String[] args) throws InterruptedException {         SimpleThreadPool pool = new SimpleThreadPool(3);                  // 提交10个任务         for (int i = 0; i < 10; i++) {             final int taskId = i;             pool.execute(() -> {                 System.out.println(Thread.currentThread().getName() +                      " 执行任务 " + taskId);                 try {                     Thread.sleep(1000); // 模拟任务执行                 } catch (InterruptedException e) {                     Thread.currentThread().interrupt();                 }             });         }                  // 查看状态         pool.printStatus();                  // 等待任务执行         Thread.sleep(5000);                  // 关闭线程池         pool.shutdown();         if (pool.awaitTermination(3, TimeUnit.SECONDS)) {             System.out.println("线程池已完全关闭");         } else {             System.out.println("线程池关闭超时,强制关闭");             pool.shutdownNow();         }     } } 

4.4 基于线程池的Web服务器

/**  * 基于线程池的简易Web服务器  * 演示线程池在实际项目中的应用  */ public class SimpleHttpServer {     private final ExecutorService threadPool;     private final ServerSocket serverSocket;     private final String basePath;     private volatile boolean isRunning = false;          /**      * 创建HTTP服务器      */     public SimpleHttpServer(int port, int poolSize, String basePath) throws IOException {         this.threadPool = Executors.newFixedThreadPool(poolSize);         this.serverSocket = new ServerSocket(port);         this.basePath = basePath;                  // 确保基础路径存在         File baseDir = new File(basePath);         if (!baseDir.exists() || !baseDir.isDirectory()) {             throw new IllegalArgumentException("基础路径不存在或不是目录: " + basePath);         }     }          /**      * 启动服务器      */     public void start() {         if (isRunning) {             throw new IllegalStateException("服务器已经在运行");         }                  isRunning = true;         System.out.println("HTTP服务器启动,端口: " + serverSocket.getLocalPort() +                            ", 基础路径: " + basePath);                  // 主接受循环         Thread acceptorThread = new Thread(this::acceptConnections, "Server-Acceptor");         acceptorThread.setDaemon(false);         acceptorThread.start();     }          /**      * 接受客户端连接      */     private void acceptConnections() {         while (isRunning) {             try {                 Socket clientSocket = serverSocket.accept();                 System.out.println("接受客户端连接: " +                      clientSocket.getInetAddress().getHostAddress());                                  // 提交到线程池处理                 threadPool.execute(new HttpHandler(clientSocket, basePath));                              } catch (IOException e) {                 if (isRunning) {                     System.err.println("接受连接错误: " + e.getMessage());                 }                 // 服务器关闭时的异常是正常的             }         }         System.out.println("服务器停止接受新连接");     }          /**      * 停止服务器      */     public void stop() {         System.out.println("正在停止服务器...");         isRunning = false;                  try {             serverSocket.close();         } catch (IOException e) {             System.err.println("关闭ServerSocket错误: " + e.getMessage());         }                  // 优雅关闭线程池         threadPool.shutdown();         try {             if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {                 System.out.println("强制关闭线程池");                 threadPool.shutdownNow();             }         } catch (InterruptedException e) {             threadPool.shutdownNow();             Thread.currentThread().interrupt();         }                  System.out.println("服务器已停止");     }          /**      * HTTP请求处理器      */     private static class HttpHandler implements Runnable {         private final Socket socket;         private final String basePath;                  public HttpHandler(Socket socket, String basePath) {             this.socket = socket;             this.basePath = basePath;         }                  @Override         public void run() {             // 使用ThreadLocal记录请求上下文             ThreadLocal<String> requestId = ThreadLocal.withInitial(                 () -> UUID.randomUUID().toString().substring(0, 8)             );                          try (BufferedReader in = new BufferedReader(                     new InputStreamReader(socket.getInputStream()));                  PrintWriter out = new PrintWriter(socket.getOutputStream())) {                                  String requestIdValue = requestId.get();                 System.out.println("[" + requestIdValue + "] 开始处理请求");                                  // 解析HTTP请求                 String requestLine = in.readLine();                 if (requestLine == null || requestLine.isEmpty()) {                     sendError(out, 400, "Bad Request");                     return;                 }                                  String[] parts = requestLine.split(" ");                 if (parts.length < 2) {                     sendError(out, 400, "Bad Request");                     return;                 }                                  String method = parts[0];                 String path = parts[1];                                  System.out.println("[" + requestIdValue + "] " + method + " " + path);                                  // 只处理GET请求                 if (!"GET".equals(method)) {                     sendError(out, 405, "Method Not Allowed");                     return;                 }                                  // 处理请求路径                 handleRequest(path, out, requestIdValue);                              } catch (IOException e) {                 System.err.println("处理请求IO错误: " + e.getMessage());             } finally {                 // 清理ThreadLocal                 requestId.remove();                                  try {                     socket.close();                 } catch (IOException e) {                     // 忽略关闭异常                 }             }         }                  private void handleRequest(String path, PrintWriter out, String requestId) {             try {                 // 简单路径安全校验                 if (path.contains("..")) {                     sendError(out, 403, "Forbidden");                     return;                 }                                  // 默认页面                 if ("/".equals(path)) {                     path = "/index.html";                 }                                  File file = new File(basePath + path);                                  // 文件不存在                 if (!file.exists() || !file.isFile()) {                     sendError(out, 404, "Not Found");                     return;                 }                                  // 安全检查:确保文件在基础路径内                 if (!file.getCanonicalPath().startsWith(new File(basePath).getCanonicalPath())) {                     sendError(out, 403, "Forbidden");                     return;                 }                                  // 根据文件类型设置Content-Type                 String contentType = getContentType(file.getName());                                  // 读取文件内容                 byte[] content = Files.readAllBytes(file.toPath());                                  // 发送HTTP响应                 out.println("HTTP/1.1 200 OK");                 out.println("Server: SimpleHttpServer");                 out.println("Content-Type: " + contentType);                 out.println("Content-Length: " + content.length);                 out.println("Connection: close");                 out.println(); // 空行分隔头部和主体                 out.flush();                                  // 发送文件内容                 socket.getOutputStream().write(content);                 socket.getOutputStream().flush();                                  System.out.println("[" + requestId + "] 响应发送完成,文件: " + file.getName());                              } catch (IOException e) {                 System.err.println("[" + requestId + "] 处理请求错误: " + e.getMessage());                 sendError(out, 500, "Internal Server Error");             }         }                  private String getContentType(String filename) {             if (filename.endsWith(".html") || filename.endsWith(".htm")) {                 return "text/html; charset=UTF-8";             } else if (filename.endsWith(".css")) {                 return "text/css";             } else if (filename.endsWith(".js")) {                 return "application/javascript";             } else if (filename.endsWith(".jpg") || filename.endsWith(".jpeg")) {                 return "image/jpeg";             } else if (filename.endsWith(".png")) {                 return "image/png";             } else {                 return "application/octet-stream";             }         }                  private void sendError(PrintWriter out, int code, String message) {             out.println("HTTP/1.1 " + code + " " + message);             out.println("Content-Type: text/html");             out.println("Connection: close");             out.println();             out.println("<html><body><h1>" + code + " " + message + "</h1></body></html>");             out.flush();         }     }          /**      * 启动服务器示例      */     public static void main(String[] args) {         try {             // 创建服务器,端口8080,线程池大小10,基础路径为当前目录             SimpleHttpServer server = new SimpleHttpServer(8080, 10, ".");             server.start();                          System.out.println("服务器已启动,访问 http://localhost:8080/");             System.out.println("按Enter键停止服务器...");                          // 等待用户输入停止服务器             System.in.read();             server.stop();                      } catch (Exception e) {             System.err.println("服务器启动失败: " + e.getMessage());             e.printStackTrace();         }     } } 

5. 性能优化与最佳实践

5.1 线程池大小配置策略

/**  * 线程池配置策略  * 根据任务类型合理配置线程池参数  */ public class ThreadPoolConfig {          /**      * CPU密集型任务配置      * 特点:大量计算,很少IO等待      * 策略:线程数 ≈ CPU核心数,避免过多线程竞争CPU      */     public static ExecutorService newCpuIntensivePool() {         int coreCount = Runtime.getRuntime().availableProcessors();         int threadCount = coreCount + 1; // +1 确保CPU不会空闲         System.out.println("CPU密集型线程池: " + threadCount + " 线程");         return Executors.newFixedThreadPool(threadCount);     }          /**      * IO密集型任务配置      * 特点:大量等待(网络、磁盘IO)      * 策略:线程数 ≈ CPU核心数 * (1 + 等待时间/计算时间)      */     public static ExecutorService newIoIntensivePool() {         int coreCount = Runtime.getRuntime().availableProcessors();         int threadCount = coreCount * 2; // 经验值,可根据实际情况调整         System.out.println("IO密集型线程池: " + threadCount + " 线程");         return Executors.newFixedThreadPool(threadCount);     }          /**      * 混合型任务配置      * 根据CPU和IO比例动态调整      */     public static ExecutorService newMixedPool(double cpuRatio, double ioRatio) {         int coreCount = Runtime.getRuntime().availableProcessors();         int threadCount = (int) (coreCount * cpuRatio + ioRatio);         threadCount = Math.max(1, Math.min(threadCount, 100)); // 合理范围限制         System.out.println("混合型线程池: " + threadCount + " 线程");         return Executors.newFixedThreadPool(threadCount);     }          /**      * 自定义线程池 - 更精细的控制      */     public static ThreadPoolExecutor newCustomPool(int corePoolSize,                                                    int maxPoolSize,                                                   long keepAliveTime,                                                   int queueSize) {         return new ThreadPoolExecutor(             corePoolSize,             maxPoolSize,             keepAliveTime,             TimeUnit.SECONDS,             new LinkedBlockingQueue<>(queueSize),             new CustomThreadFactory(),             new CustomRejectionPolicy()         );     }          /**      * 自定义线程工厂,设置更有意义的线程名称      */     static class CustomThreadFactory implements ThreadFactory {         private final AtomicInteger counter = new AtomicInteger(1);                  @Override         public Thread newThread(Runnable r) {             Thread thread = new Thread(r, "CustomPool-Thread-" + counter.getAndIncrement());             thread.setDaemon(false);             thread.setPriority(Thread.NORM_PRIORITY);             return thread;         }     }          /**      * 自定义拒绝策略      */     static class CustomRejectionPolicy implements RejectedExecutionHandler {         @Override         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {             System.err.println("任务被拒绝,当前活跃线程: " + executor.getActiveCount() +                               ", 队列大小: " + executor.getQueue().size());             // 可以记录日志、发送告警等             throw new RejectedExecutionException("线程池已满,拒绝新任务");         }     } } 

5.2 避免常见陷阱

1. 死锁预防与检测

/**  * 死锁预防示例  * 演示如何避免和检测死锁  */ public class DeadlockPrevention {          /**      * 死锁产生示例 - 错误的锁顺序      */     public static class DeadlockExample {         private final Object lock1 = new Object();         private final Object lock2 = new Object();                  public void method1() {             synchronized (lock1) {                 System.out.println(Thread.currentThread().getName() + " 获得 lock1");                 try { Thread.sleep(100); } catch (InterruptedException e) {}                                  synchronized (lock2) {  // 可能死锁                     System.out.println(Thread.currentThread().getName() + " 获得 lock2");                 }             }         }                  public void method2() {             synchronized (lock2) {  // 不同的锁顺序                 System.out.println(Thread.currentThread().getName() + " 获得 lock2");                 try { Thread.sleep(100); } catch (InterruptedException e) {}                                  synchronized (lock1) {  // 可能死锁                     System.out.println(Thread.currentThread().getName() + " 获得 lock1");                 }             }         }     }          /**      * 死锁预防 - 统一的锁顺序      */     public static class DeadlockPreventionExample {         private final Object lock1 = new Object();         private final Object lock2 = new Object();                  /**          * 使用统一的锁获取顺序来预防死锁          * 总是先获取lock1,再获取lock2          */         public void method1() {             synchronized (lock1) {                 System.out.println(Thread.currentThread().getName() + " 获得 lock1");                 try { Thread.sleep(100); } catch (InterruptedException e) {}                                  synchronized (lock2) {                     System.out.println(Thread.currentThread().getName() + " 获得 lock2");                     // 业务逻辑                 }             }         }                  public void method2() {             synchronized (lock1) {  // 相同的锁顺序                 System.out.println(Thread.currentThread().getName() + " 获得 lock1");                 try { Thread.sleep(100); } catch (InterruptedException e) {}                                  synchronized (lock2) {                     System.out.println(Thread.currentThread().getName() + " 获得 lock2");                     // 业务逻辑                 }             }         }     }          /**      * 使用tryLock避免死锁      */     public static class TryLockExample {         private final Lock lock1 = new ReentrantLock();         private final Lock lock2 = new ReentrantLock();                  public boolean tryDoWork() {             // 尝试获取第一个锁             if (lock1.tryLock()) {                 try {                     System.out.println(Thread.currentThread().getName() + " 获得 lock1");                                          // 尝试获取第二个锁                     if (lock2.tryLock()) {                         try {                             System.out.println(Thread.currentThread().getName() + " 获得 lock2");                             // 执行业务逻辑                             return true;                         } finally {                             lock2.unlock();                         }                     }                 } finally {                     lock1.unlock();                 }             }             return false; // 获取锁失败         }     }          /**      * 死锁检测工具      */     public static void detectDeadlock() {         ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();         long[] threadIds = threadBean.findDeadlockedThreads();                  if (threadIds != null) {             System.err.println("检测到死锁!");             ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds);                          for (ThreadInfo threadInfo : threadInfos) {                 System.err.println("死锁线程: " + threadInfo.getThreadName());                 System.err.println("等待锁: " + threadInfo.getLockName());                 System.err.println("被线程持有: " + threadInfo.getLockOwnerName());             }         } else {             System.out.println("未检测到死锁");         }     } } 

2. 资源清理最佳实践

/**  * 资源清理最佳实践  * 演示如何正确管理和清理多线程资源  */ public class ResourceCleanup implements AutoCloseable {     private final ExecutorService executor;     private final List<AutoCloseable> resources;          public ResourceCleanup(int threadPoolSize) {         this.executor = Executors.newFixedThreadPool(threadPoolSize);         this.resources = new ArrayList<>();         System.out.println("资源管理器初始化完成,线程池大小: " + threadPoolSize);     }          /**      * 提交任务      */     public <T> Future<T> submit(Callable<T> task) {         return executor.submit(task);     }          /**      * 注册需要管理的资源      */     public void registerResource(AutoCloseable resource) {         synchronized (resources) {             resources.add(resource);         }     }          /**      * 实现AutoCloseable,支持try-with-resources      */     @Override     public void close() {         System.out.println("开始清理资源...");                  // 1. 关闭线程池         shutdownExecutor();                  // 2. 关闭所有注册的资源         closeRegisteredResources();                  System.out.println("资源清理完成");     }          /**      * 优雅关闭线程池      */     private void shutdownExecutor() {         executor.shutdown(); // 停止接受新任务         try {             // 等待现有任务完成             if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {                 System.out.println("线程池关闭超时,尝试强制关闭");                 // 取消所有未开始的任务                 executor.shutdownNow();                                  // 再次等待                 if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {                     System.err.println("线程池无法完全关闭");                 }             }         } catch (InterruptedException e) {             // 重新中断并强制关闭             executor.shutdownNow();             Thread.currentThread().interrupt();         }     }          /**      * 关闭所有注册的资源      */     private void closeRegisteredResources() {         synchronized (resources) {             for (AutoCloseable resource : resources) {                 try {                     resource.close();                 } catch (Exception e) {                     System.err.println("关闭资源时出错: " + e.getMessage());                     // 继续关闭其他资源,不抛出异常                 }             }             resources.clear();         }     }          /**      * 使用示例      */     public static void main(String[] args) {         // 使用try-with-resources确保资源清理         try (ResourceCleanup manager = new ResourceCleanup(3)) {                          // 注册一些资源             manager.registerResource(() -> System.out.println("关闭数据库连接"));             manager.registerResource(() -> System.out.println("关闭网络连接"));                          // 提交任务             Future<String> future = manager.submit(() -> {                 Thread.sleep(1000);                 return "任务完成";             });                          // 获取结果             String result = future.get();             System.out.println("任务结果: " + result);                      } catch (Exception e) {             System.err.println("执行出错: " + e.getMessage());         }         // 这里会自动调用close()方法清理资源     } } 

6. 总结与核心要点

6.1 关键知识点回顾

1. 中断异常处理的核心理解

try {     Thread.sleep(1000); } catch (InterruptedException e) {     /**      * 必须重新设置中断状态的原因:      * 1. 当阻塞方法抛出InterruptedException时,会清除线程的中断状态      * 2. 如果不重新设置,调用者无法知道线程曾被中断      * 3. 这破坏了中断的传播机制      */     Thread.currentThread().interrupt(); // 恢复中断状态     // 或者直接抛出异常:throw new RuntimeException(e); } 

2. ThreadLocal内存管理

public void usingThreadLocal() {     try {         threadLocal.set(someValue);         // 使用threadLocal     } finally {         /**          * 必须清理ThreadLocal的原因:          * 1. ThreadLocalMap使用弱引用作为key,但value是强引用          * 2. 如果线程长时间存活(线程池),value不会被GC回收          * 3. 导致内存泄漏,特别是存储大对象时          */         threadLocal.remove(); // 必须调用!     } } 

6.2 最佳实践清单

  1. 线程命名:为所有线程设置有意义的名字
  2. 异常处理:在Runnable.run()中捕获所有异常
  3. 资源清理:使用try-finally或try-with-resources
  4. 中断响应:合理处理InterruptedException
  5. 锁顺序:统一锁获取顺序避免死锁
  6. 线程池:优先使用线程池而非直接创建线程
  7. volatile:仅用于简单的状态标志
  8. ThreadLocal清理:使用后必须调用remove()

6.3 性能调优建议

场景 推荐配置 说明
CPU密集型 线程数 = CPU核心数 + 1 减少线程切换开销
IO密集型 线程数 = CPU核心数 × 2 充分利用等待时间
混合型 根据监控动态调整 结合实际负载

6.4 常见问题排查

  1. 死锁检测:使用jstack或ThreadMXBean
  2. 内存泄漏:检查ThreadLocal使用,特别是线程池场景
  3. CPU过高:检查是否存在忙等待或过多线程竞争
  4. 响应慢:检查锁竞争、IO阻塞或线程池配置

掌握这些Java并发编程的基础知识和最佳实践,能够帮助开发者构建出高性能、高可靠的多线程应用程序。记住,并发编程的核心在于正确的同步、合理的资源管理和清晰的线程通信

发表评论

评论已关闭。

相关文章