本篇主要是多线程的基础知识,代码示例较多,有时间的可以逐个分析,具体细节都放在代码注释中了。
1. 理解线程:多任务执行的基石
1.1 什么是线程?
在现代操作系统中,进程是资源分配的基本单位,而线程是CPU调度的最小单位。可以把进程想象成一家公司,线程就是公司里的员工。
/** * 演示Java程序天生就是多线程程序 * 即使最简单的main方法也会启动多个系统线程 */ public class MultiThread { public static void main(String[] args) { // 获取Java线程管理MXBean ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); // 不需要获取同步的monitor和synchronizer信息,仅获取线程和线程堆栈信息 ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false); // 遍历线程信息 System.out.println("=== Java程序启动的线程列表 ==="); for (ThreadInfo threadInfo : threadInfos) { System.out.println("[" + threadInfo.getThreadId() + "] " + threadInfo.getThreadName()); } } }
输出示例:
=== Java程序启动的线程列表 === [4] Signal Dispatcher // 分发处理发送给JVM信号的线程 [3] Finalizer // 调用对象finalize方法的线程 [2] Reference Handler // 清除Reference的线程 [1] main // main线程,用户程序入口
1.2 为什么需要多线程?
三大核心优势:
- 充分利用多核处理器 - 避免CPU资源闲置
- 提升响应速度 - 后台任务不阻塞用户操作
- 更好的编程模型 - Java提供一致的多线程API
1.3 线程状态生命周期
新建(NEW) → 可运行(RUNNABLE) → 运行中 ↓ 超时等待(TIMED_WAITING) ← 等待(WAITING) ← 阻塞(BLOCKED) ↓ 终止(TERMINATED)
2. 线程的启动与安全终止
2.1 正确启动线程
/** * 线程启动最佳实践示例 * 重点:设置有意义的线程名称,合理设置守护线程标志 */ public class ThreadStartExample { public static void main(String[] args) { // 推荐:为线程设置有意义的名称,便于问题排查 Thread worker = new Thread(new Task(), "Data-Processor-1"); worker.setDaemon(false); // 明确设置是否为守护线程 worker.start(); // 正确启动方式,不要直接调用run() System.out.println("主线程继续执行,不会等待worker线程"); } static class Task implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 开始执行"); try { // 模拟工作任务 Thread.sleep(1000); } catch (InterruptedException e) { System.out.println("任务被中断"); } System.out.println(Thread.currentThread().getName() + " 执行完成"); } } }
2.2 安全终止线程的两种方式
方式一:使用中断机制
/** * 使用中断机制安全终止线程 * 重点:理解中断异常处理的最佳实践 */ public class InterruptExample { public static void main(String[] args) throws InterruptedException { Thread worker = new Thread(new InterruptibleTask(), "Interruptible-Worker"); worker.start(); // 主线程等待2秒后中断工作线程 TimeUnit.SECONDS.sleep(2); System.out.println("主线程发送中断信号"); worker.interrupt(); // 发送中断信号 // 等待工作线程完全退出 worker.join(); System.out.println("工作线程已安全退出"); } static class InterruptibleTask implements Runnable { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { try { // 模拟工作 - 这里可能抛出InterruptedException System.out.println("Working..."); TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { /** * 关键理解点:为什么需要重新设置中断状态? * * 当线程在阻塞状态(如sleep、wait、join)时被中断, * Java会做两件事: * 1. 抛出InterruptedException * 2. 清除线程的中断状态(设为false) * * 这导致循环条件 !Thread.currentThread().isInterrupted() * 会继续为true,线程无法退出。 * * 因此我们需要在捕获异常后重新设置中断状态, * 这样循环条件就能检测到中断,安全退出。 */ System.out.println("捕获到中断异常,重新设置中断状态"); Thread.currentThread().interrupt(); // 重新设置中断标志 } } System.out.println("线程安全退出,中断状态: " + Thread.currentThread().isInterrupted()); } } }
方式二:使用标志位
/** * 使用volatile标志位安全终止线程 * 适用于没有阻塞调用或需要更复杂退出逻辑的场景 */ public class FlagShutdownExample { // volatile保证可见性,确保所有线程看到最新的值 private volatile boolean running = true; private final Thread workerThread; public FlagShutdownExample() { this.workerThread = new Thread(this::doWork, "Flag-Controlled-Worker"); } public void start() { workerThread.start(); } /** * 优雅停止工作线程 */ public void stop() { System.out.println("请求停止工作线程"); running = false; // 同时发送中断,处理可能存在的阻塞情况 workerThread.interrupt(); } /** * 工作线程的主循环 * 同时检查标志位和中断状态,提供双重保障 */ private void doWork() { try { while (running && !Thread.currentThread().isInterrupted()) { // 执行工作任务 processData(); } } finally { // 无论何种方式退出,都执行清理工作 cleanup(); } System.out.println("工作线程已安全退出"); } private void processData() { try { // 模拟数据处理 System.out.println("处理数据中..."); Thread.sleep(300); } catch (InterruptedException e) { System.out.println("处理数据时被中断"); // 收到中断,但可能还想继续处理,所以不重新设置中断 // 让循环条件来检查running标志 } } private void cleanup() { System.out.println("执行资源清理工作..."); // 关闭文件、数据库连接等资源 } public static void main(String[] args) throws InterruptedException { FlagShutdownExample example = new FlagShutdownExample(); example.start(); // 运行3秒后停止 Thread.sleep(3000); example.stop(); // 等待工作线程退出 example.workerThread.join(); } }
3. 线程间通信:协作的艺术
3.1 volatile关键字:共享状态可见性
/** * volatile关键字示例 * 保证多线程间的可见性,但不保证原子性 */ public class VolatileExample { // volatile确保shutdownRequested的修改对所有线程立即可见 private volatile boolean shutdownRequested = false; private int operationCount = 0; // 非volatile,不保证可见性 public void shutdown() { shutdownRequested = true; // 所有线程立即可见 System.out.println("关闭请求已设置"); } public void doWork() { while (!shutdownRequested) { // 正常工作循环 operationCount++; // 非原子操作,可能有问题 try { Thread.sleep(100); } catch (InterruptedException e) { System.out.println("工作被中断"); Thread.currentThread().interrupt(); break; } } System.out.println("工作线程退出,操作次数: " + operationCount); } }
3.2 synchronized关键字:互斥访问
/** * synchronized关键字示例 * 保证原子性和可见性,但可能影响性能 */ public class SynchronizedCounter { private int count = 0; /** * 同步方法 - 锁对象是当前实例(this) */ public synchronized void increment() { count++; // 原子操作 } /** * 同步块 - 可以更细粒度控制锁的范围 */ public void decrement() { // 只同步关键部分,减少锁持有时间 synchronized (this) { count--; } // 这里可以执行非同步操作 } /** * 同步的get方法,保证看到最新值 */ public synchronized int getCount() { return count; } /** * 静态同步方法 - 锁对象是类的Class对象 */ public static synchronized void staticMethod() { // 静态同步方法使用Class对象作为锁 } }
3.3 等待/通知机制:经典生产者-消费者模式
/** * 生产者-消费者模式示例 * 演示wait/notify机制的正确使用 */ public class WaitNotifyExample { private final Object lock = new Object(); // 共享锁对象 private final Queue<String> queue = new LinkedList<>(); private final int MAX_SIZE = 5; /** * 生产者方法 */ public void produce(String data) throws InterruptedException { synchronized (lock) { // 必须使用while循环检查条件,避免虚假唤醒 while (queue.size() >= MAX_SIZE) { System.out.println("队列已满(" + queue.size() + "),生产者等待"); lock.wait(); // 释放锁并等待 } queue.offer(data); System.out.println("生产: " + data + ",队列大小: " + queue.size()); // 通知所有等待的消费者 lock.notifyAll(); } } /** * 消费者方法 */ public String consume() throws InterruptedException { synchronized (lock) { // 必须使用while循环检查条件 while (queue.isEmpty()) { System.out.println("队列为空,消费者等待"); lock.wait(); // 释放锁并等待 } String data = queue.poll(); System.out.println("消费: " + data + ",队列大小: " + queue.size()); // 通知所有等待的生产者 lock.notifyAll(); return data; } } /** * 测试生产者消费者模式 */ public static void main(String[] args) { WaitNotifyExample example = new WaitNotifyExample(); // 启动生产者线程 Thread producer = new Thread(() -> { try { for (int i = 0; i < 10; i++) { example.produce("Data-" + i); Thread.sleep(200); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, "Producer"); // 启动消费者线程 Thread consumer = new Thread(() -> { try { for (int i = 0; i < 10; i++) { example.consume(); Thread.sleep(300); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, "Consumer"); producer.start(); consumer.start(); } }
等待/通知经典范式:
// 消费者范式 - 永远在循环中调用wait() synchronized(锁对象) { while(条件不满足) { 锁对象.wait(); // 等待时会释放锁 } // 条件满足,处理业务逻辑 } // 生产者范式 synchronized(锁对象) { 改变条件; // 改变等待条件 锁对象.notifyAll(); // 通知所有等待线程 }
3.4 Thread.join():线程依赖执行
/** * Thread.join()使用示例 * 实现线程间的顺序执行依赖 */ public class JoinExample { public static void main(String[] args) throws InterruptedException { System.out.println("主线程开始"); Thread previous = Thread.currentThread(); // 创建5个有依赖关系的线程 for (int i = 0; i < 5; i++) { Thread thread = new Thread(new DependentTask(previous), "Worker-" + i); thread.start(); previous = thread; // 设置依赖链 } // 主线程先做一些工作 TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName() + " 完成初始化工作"); // 等待所有线程完成(实际上由最后一个Worker-4 join主线程) } static class DependentTask implements Runnable { private final Thread dependency; // 依赖的线程 public DependentTask(Thread dependency) { this.dependency = dependency; } @Override public void run() { try { // 等待依赖的线程执行完成 System.out.println(Thread.currentThread().getName() + " 等待 " + dependency.getName()); dependency.join(); // 依赖线程完成后开始自己的工作 System.out.println(Thread.currentThread().getName() + " 开始工作"); TimeUnit.MILLISECONDS.sleep(500); // 模拟工作 System.out.println(Thread.currentThread().getName() + " 完成工作"); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + " 被中断"); Thread.currentThread().interrupt(); } } } }
3.5 ThreadLocal深入解析:线程局部变量
/** * ThreadLocal深度解析 * 理解原理、使用场景和内存泄漏防护 */ public class ThreadLocalExample { /** * ThreadLocal基本使用:每个线程独立的SimpleDateFormat * 避免SimpleDateFormat的线程安全问题 */ private static final ThreadLocal<SimpleDateFormat> DATE_FORMATTER = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); /** * ThreadLocal用于用户上下文传递 * 在Web应用中非常有用,避免在方法参数中传递用户信息 */ private static final ThreadLocal<UserContext> USER_CONTEXT = new ThreadLocal<>(); /** * ThreadLocal用于事务上下文 */ private static final ThreadLocal<TransactionContext> TRANSACTION_CONTEXT = new ThreadLocal<>(); /** * 可继承的ThreadLocal:子线程可以继承父线程的值 */ private static final InheritableThreadLocal<String> INHERITABLE_CONTEXT = new InheritableThreadLocal<>(); /** * 处理用户请求的示例方法 */ public void processRequest(User user) { // 设置用户上下文到当前线程 USER_CONTEXT.set(new UserContext(user)); try { // 使用线程安全的日期格式化 String timestamp = DATE_FORMATTER.get().format(new Date()); System.out.println(Thread.currentThread().getName() + " - 用户: " + user.getName() + ", 时间: " + timestamp); // 执行业务逻辑 - 任何方法都可以获取用户上下文,无需传递参数 doBusinessLogic(); } finally { /** * 关键:必须清理ThreadLocal,防止内存泄漏! * * 原因: * 1. ThreadLocalMap的key是弱引用,会被GC回收 * 2. 但value是强引用,不会被自动回收 * 3. 如果线程长时间存活(如线程池中的线程),会导致value无法释放 * 4. 调用remove()方法显式清理 */ USER_CONTEXT.remove(); DATE_FORMATTER.remove(); // 清理所有使用的ThreadLocal } } private void doBusinessLogic() { // 在任何地方都可以获取用户上下文,无需方法参数传递 UserContext context = USER_CONTEXT.get(); if (context != null) { System.out.println("执行业务逻辑,用户: " + context.getUser().getName()); } // 使用线程安全的日期格式化 String now = DATE_FORMATTER.get().format(new Date()); System.out.println("业务执行时间: " + now); } /** * 演示ThreadLocal的内存泄漏问题 */ public void demonstrateMemoryLeak() { // 错误的用法:不清理ThreadLocal ThreadLocal<byte[]> leakyLocal = new ThreadLocal<>(); leakyLocal.set(new byte[1024 * 1024]); // 1MB数据 // 如果没有调用 leakyLocal.remove(), 即使leakyLocal=null, // 线程的ThreadLocalMap中仍然保留着这个Entry // 在线程池场景下,线程重用会导致内存不断增长 } /** * ThreadLocal最佳实践:使用try-finally确保清理 */ public void bestPractice(User user) { USER_CONTEXT.set(new UserContext(user)); try { // 业务处理 doBusinessLogic(); } finally { // 确保清理,即使在业务逻辑中发生异常 USER_CONTEXT.remove(); } } /** * 测试多线程环境下的ThreadLocal */ public static void main(String[] args) throws InterruptedException { ThreadLocalExample example = new ThreadLocalExample(); // 创建多个线程,每个线程有独立的ThreadLocal值 Thread[] threads = new Thread[3]; for (int i = 0; i < threads.length; i++) { final int userId = i; threads[i] = new Thread(() -> { User user = new User("User-" + userId); example.processRequest(user); }, "Thread-" + i); threads[i].start(); } // 等待所有线程完成 for (Thread thread : threads) { thread.join(); } System.out.println("所有线程执行完成"); } // 辅助类定义 static class UserContext { private final User user; public UserContext(User user) { this.user = user; } public User getUser() { return user; } } static class User { private final String name; public User(String name) { this.name = name; } public String getName() { return name; } } static class TransactionContext { // 事务相关信息 } } /** * ThreadLocal高级用法:自定义ThreadLocal子类 */ class AdvancedThreadLocal<T> extends ThreadLocal<T> { /** * 初始值 - 当线程第一次调用get()时,如果还没有设置值,会调用此方法 */ @Override protected T initialValue() { System.out.println(Thread.currentThread().getName() + " - 初始化ThreadLocal值"); return null; // 返回默认初始值 } /** * 子线程值继承 - 仅对InheritableThreadLocal有效 * 当创建新线程时,可以控制如何从父线程继承值 */ protected T childValue(T parentValue) { System.out.println("子线程继承父线程的值: " + parentValue); return parentValue; // 直接继承,也可以进行转换 } }
4. 线程应用实例:从理论到实践
4.1 等待超时模式:避免无限期等待
/** * 等待超时模式实现 * 在等待/通知机制基础上增加超时控制 */ public class TimeoutWait<T> { private T result; /** * 带超时的获取方法 * @param timeoutMs 超时时间(毫秒) * @return 结果,超时返回null */ public synchronized T get(long timeoutMs) throws InterruptedException { long endTime = System.currentTimeMillis() + timeoutMs; long remaining = timeoutMs; // 循环检查条件和剩余时间 while (result == null && remaining > 0) { wait(remaining); // 等待剩余时间 remaining = endTime - System.currentTimeMillis(); // 更新剩余时间 } return result; // 可能为null(超时) } /** * 设置结果并通知所有等待线程 */ public synchronized void set(T value) { this.result = value; notifyAll(); // 通知所有等待的线程 } /** * 演示超时等待的使用 */ public static void main(String[] args) throws InterruptedException { TimeoutWait<String> waitObject = new TimeoutWait<>(); // 消费者线程 - 等待结果,最多等3秒 Thread consumer = new Thread(() -> { try { System.out.println("消费者开始等待结果..."); String result = waitObject.get(3000); if (result != null) { System.out.println("消费者收到结果: " + result); } else { System.out.println("消费者等待超时"); } } catch (InterruptedException e) { System.out.println("消费者被中断"); } }); // 生产者线程 - 2秒后产生结果 Thread producer = new Thread(() -> { try { Thread.sleep(2000); // 模拟生产耗时 waitObject.set("生产完成的数据"); System.out.println("生产者完成工作"); } catch (InterruptedException e) { System.out.println("生产者被中断"); } }); consumer.start(); producer.start(); consumer.join(); producer.join(); } }
4.2 数据库连接池实现
/** * 简易数据库连接池实现 * 演示资源池化和等待超时模式的实际应用 */ public class SimpleConnectionPool { private final LinkedList<Connection> pool = new LinkedList<>(); private final int maxSize; private int createdCount = 0; public SimpleConnectionPool(int initialSize, int maxSize) { this.maxSize = maxSize; // 初始化连接池 for (int i = 0; i < initialSize; i++) { pool.add(createConnection()); } System.out.println("连接池初始化完成,初始连接数: " + initialSize); } /** * 获取连接,支持超时 */ public Connection getConnection(long timeoutMs) throws InterruptedException, TimeoutException { synchronized (pool) { // 如果池中有可用连接,立即返回 if (!pool.isEmpty()) { return pool.removeFirst(); } // 池为空,但还可以创建新连接 if (createdCount < maxSize) { Connection conn = createConnection(); System.out.println("创建新连接,当前连接数: " + createdCount); return conn; } // 等待可用连接 long endTime = System.currentTimeMillis() + timeoutMs; long remaining = timeoutMs; while (pool.isEmpty() && remaining > 0) { System.out.println(Thread.currentThread().getName() + " 等待连接,剩余时间: " + remaining + "ms"); pool.wait(remaining); remaining = endTime - System.currentTimeMillis(); } if (!pool.isEmpty()) { return pool.removeFirst(); } throw new TimeoutException("获取连接超时,等待 " + timeoutMs + "ms"); } } /** * 归还连接到池中 */ public void releaseConnection(Connection conn) { if (conn != null) { synchronized (pool) { if (pool.size() < maxSize) { pool.addLast(conn); pool.notifyAll(); // 通知等待的线程 System.out.println("连接已归还,当前池大小: " + pool.size()); } else { // 连接数超过上限,关闭连接 closeConnection(conn); createdCount--; System.out.println("连接池已满,关闭连接"); } } } } /** * 创建新连接 */ private Connection createConnection() { createdCount++; // 这里应该是真实的数据库连接创建逻辑 System.out.println("创建第 " + createdCount + " 个连接"); return new MockConnection(); } /** * 关闭连接 */ private void closeConnection(Connection conn) { try { conn.close(); } catch (Exception e) { System.err.println("关闭连接失败: " + e.getMessage()); } } /** * 获取连接池状态 */ public synchronized void printStatus() { System.out.println("连接池状态 - 池中连接: " + pool.size() + ", 总创建数: " + createdCount + ", 最大限制: " + maxSize); } // 模拟数据库连接 static class MockConnection implements Connection { private final String id = UUID.randomUUID().toString().substring(0, 8); @Override public void close() { System.out.println("关闭连接: " + id); } @Override public String toString() { return "MockConnection{" + "id='" + id + ''' + '}'; } // 其他Connection接口方法... @Override public void commit() {} @Override public void rollback() {} // ... 简化实现 } static class TimeoutException extends Exception { public TimeoutException(String message) { super(message); } } }
4.3 线程池核心技术实现
/** * 简易线程池实现 * 理解线程池的核心原理和工作机制 */ public class SimpleThreadPool implements Executor { private final BlockingQueue<Runnable> workQueue; private final List<WorkerThread> workers; private volatile boolean isShutdown = false; private final int poolSize; /** * 创建线程池 */ public SimpleThreadPool(int poolSize) { this.poolSize = poolSize; this.workQueue = new LinkedBlockingQueue<>(); this.workers = new ArrayList<>(poolSize); System.out.println("初始化线程池,大小: " + poolSize); // 创建工作线程 for (int i = 0; i < poolSize; i++) { WorkerThread worker = new WorkerThread("Pool-Worker-" + i); workers.add(worker); worker.start(); } } /** * 提交任务到线程池 */ @Override public void execute(Runnable task) { if (isShutdown) { throw new RejectedExecutionException("线程池已关闭,拒绝新任务"); } if (task == null) { throw new NullPointerException("任务不能为null"); } try { workQueue.put(task); // 阻塞直到有空间 System.out.println("任务已提交,队列大小: " + workQueue.size()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RejectedExecutionException("提交任务时被中断", e); } } /** * 优雅关闭线程池 */ public void shutdown() { System.out.println("开始关闭线程池..."); isShutdown = true; // 中断所有工作线程 for (WorkerThread worker : workers) { worker.interrupt(); } } /** * 强制关闭线程池 */ public void shutdownNow() { shutdown(); workQueue.clear(); // 清空等待队列 } /** * 等待线程池完全终止 */ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long endTime = System.currentTimeMillis() + unit.toMillis(timeout); for (WorkerThread worker : workers) { long remaining = endTime - System.currentTimeMillis(); if (remaining <= 0) { return false; // 超时 } worker.join(remaining); } return true; } /** * 获取线程池状态 */ public void printStatus() { System.out.println("线程池状态 - 工作线程: " + workers.size() + ", 等待任务: " + workQueue.size() + ", 已关闭: " + isShutdown); } /** * 工作线程实现 */ private class WorkerThread extends Thread { public WorkerThread(String name) { super(name); } @Override public void run() { System.out.println(getName() + " 开始运行"); while (!isShutdown || !workQueue.isEmpty()) { try { // 从队列获取任务,支持超时以便检查关闭状态 Runnable task = workQueue.poll(1, TimeUnit.SECONDS); if (task != null) { System.out.println(getName() + " 开始执行任务"); task.run(); System.out.println(getName() + " 任务执行完成"); } } catch (InterruptedException e) { // 响应中断,退出线程 System.out.println(getName() + " 收到中断信号"); break; } catch (Exception e) { // 捕获任务执行异常,避免工作线程退出 System.err.println(getName() + " 任务执行异常: " + e.getMessage()); } } System.out.println(getName() + " 退出"); } } /** * 测试线程池 */ public static void main(String[] args) throws InterruptedException { SimpleThreadPool pool = new SimpleThreadPool(3); // 提交10个任务 for (int i = 0; i < 10; i++) { final int taskId = i; pool.execute(() -> { System.out.println(Thread.currentThread().getName() + " 执行任务 " + taskId); try { Thread.sleep(1000); // 模拟任务执行 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } // 查看状态 pool.printStatus(); // 等待任务执行 Thread.sleep(5000); // 关闭线程池 pool.shutdown(); if (pool.awaitTermination(3, TimeUnit.SECONDS)) { System.out.println("线程池已完全关闭"); } else { System.out.println("线程池关闭超时,强制关闭"); pool.shutdownNow(); } } }
4.4 基于线程池的Web服务器
/** * 基于线程池的简易Web服务器 * 演示线程池在实际项目中的应用 */ public class SimpleHttpServer { private final ExecutorService threadPool; private final ServerSocket serverSocket; private final String basePath; private volatile boolean isRunning = false; /** * 创建HTTP服务器 */ public SimpleHttpServer(int port, int poolSize, String basePath) throws IOException { this.threadPool = Executors.newFixedThreadPool(poolSize); this.serverSocket = new ServerSocket(port); this.basePath = basePath; // 确保基础路径存在 File baseDir = new File(basePath); if (!baseDir.exists() || !baseDir.isDirectory()) { throw new IllegalArgumentException("基础路径不存在或不是目录: " + basePath); } } /** * 启动服务器 */ public void start() { if (isRunning) { throw new IllegalStateException("服务器已经在运行"); } isRunning = true; System.out.println("HTTP服务器启动,端口: " + serverSocket.getLocalPort() + ", 基础路径: " + basePath); // 主接受循环 Thread acceptorThread = new Thread(this::acceptConnections, "Server-Acceptor"); acceptorThread.setDaemon(false); acceptorThread.start(); } /** * 接受客户端连接 */ private void acceptConnections() { while (isRunning) { try { Socket clientSocket = serverSocket.accept(); System.out.println("接受客户端连接: " + clientSocket.getInetAddress().getHostAddress()); // 提交到线程池处理 threadPool.execute(new HttpHandler(clientSocket, basePath)); } catch (IOException e) { if (isRunning) { System.err.println("接受连接错误: " + e.getMessage()); } // 服务器关闭时的异常是正常的 } } System.out.println("服务器停止接受新连接"); } /** * 停止服务器 */ public void stop() { System.out.println("正在停止服务器..."); isRunning = false; try { serverSocket.close(); } catch (IOException e) { System.err.println("关闭ServerSocket错误: " + e.getMessage()); } // 优雅关闭线程池 threadPool.shutdown(); try { if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) { System.out.println("强制关闭线程池"); threadPool.shutdownNow(); } } catch (InterruptedException e) { threadPool.shutdownNow(); Thread.currentThread().interrupt(); } System.out.println("服务器已停止"); } /** * HTTP请求处理器 */ private static class HttpHandler implements Runnable { private final Socket socket; private final String basePath; public HttpHandler(Socket socket, String basePath) { this.socket = socket; this.basePath = basePath; } @Override public void run() { // 使用ThreadLocal记录请求上下文 ThreadLocal<String> requestId = ThreadLocal.withInitial( () -> UUID.randomUUID().toString().substring(0, 8) ); try (BufferedReader in = new BufferedReader( new InputStreamReader(socket.getInputStream())); PrintWriter out = new PrintWriter(socket.getOutputStream())) { String requestIdValue = requestId.get(); System.out.println("[" + requestIdValue + "] 开始处理请求"); // 解析HTTP请求 String requestLine = in.readLine(); if (requestLine == null || requestLine.isEmpty()) { sendError(out, 400, "Bad Request"); return; } String[] parts = requestLine.split(" "); if (parts.length < 2) { sendError(out, 400, "Bad Request"); return; } String method = parts[0]; String path = parts[1]; System.out.println("[" + requestIdValue + "] " + method + " " + path); // 只处理GET请求 if (!"GET".equals(method)) { sendError(out, 405, "Method Not Allowed"); return; } // 处理请求路径 handleRequest(path, out, requestIdValue); } catch (IOException e) { System.err.println("处理请求IO错误: " + e.getMessage()); } finally { // 清理ThreadLocal requestId.remove(); try { socket.close(); } catch (IOException e) { // 忽略关闭异常 } } } private void handleRequest(String path, PrintWriter out, String requestId) { try { // 简单路径安全校验 if (path.contains("..")) { sendError(out, 403, "Forbidden"); return; } // 默认页面 if ("/".equals(path)) { path = "/index.html"; } File file = new File(basePath + path); // 文件不存在 if (!file.exists() || !file.isFile()) { sendError(out, 404, "Not Found"); return; } // 安全检查:确保文件在基础路径内 if (!file.getCanonicalPath().startsWith(new File(basePath).getCanonicalPath())) { sendError(out, 403, "Forbidden"); return; } // 根据文件类型设置Content-Type String contentType = getContentType(file.getName()); // 读取文件内容 byte[] content = Files.readAllBytes(file.toPath()); // 发送HTTP响应 out.println("HTTP/1.1 200 OK"); out.println("Server: SimpleHttpServer"); out.println("Content-Type: " + contentType); out.println("Content-Length: " + content.length); out.println("Connection: close"); out.println(); // 空行分隔头部和主体 out.flush(); // 发送文件内容 socket.getOutputStream().write(content); socket.getOutputStream().flush(); System.out.println("[" + requestId + "] 响应发送完成,文件: " + file.getName()); } catch (IOException e) { System.err.println("[" + requestId + "] 处理请求错误: " + e.getMessage()); sendError(out, 500, "Internal Server Error"); } } private String getContentType(String filename) { if (filename.endsWith(".html") || filename.endsWith(".htm")) { return "text/html; charset=UTF-8"; } else if (filename.endsWith(".css")) { return "text/css"; } else if (filename.endsWith(".js")) { return "application/javascript"; } else if (filename.endsWith(".jpg") || filename.endsWith(".jpeg")) { return "image/jpeg"; } else if (filename.endsWith(".png")) { return "image/png"; } else { return "application/octet-stream"; } } private void sendError(PrintWriter out, int code, String message) { out.println("HTTP/1.1 " + code + " " + message); out.println("Content-Type: text/html"); out.println("Connection: close"); out.println(); out.println("<html><body><h1>" + code + " " + message + "</h1></body></html>"); out.flush(); } } /** * 启动服务器示例 */ public static void main(String[] args) { try { // 创建服务器,端口8080,线程池大小10,基础路径为当前目录 SimpleHttpServer server = new SimpleHttpServer(8080, 10, "."); server.start(); System.out.println("服务器已启动,访问 http://localhost:8080/"); System.out.println("按Enter键停止服务器..."); // 等待用户输入停止服务器 System.in.read(); server.stop(); } catch (Exception e) { System.err.println("服务器启动失败: " + e.getMessage()); e.printStackTrace(); } } }
5. 性能优化与最佳实践
5.1 线程池大小配置策略
/** * 线程池配置策略 * 根据任务类型合理配置线程池参数 */ public class ThreadPoolConfig { /** * CPU密集型任务配置 * 特点:大量计算,很少IO等待 * 策略:线程数 ≈ CPU核心数,避免过多线程竞争CPU */ public static ExecutorService newCpuIntensivePool() { int coreCount = Runtime.getRuntime().availableProcessors(); int threadCount = coreCount + 1; // +1 确保CPU不会空闲 System.out.println("CPU密集型线程池: " + threadCount + " 线程"); return Executors.newFixedThreadPool(threadCount); } /** * IO密集型任务配置 * 特点:大量等待(网络、磁盘IO) * 策略:线程数 ≈ CPU核心数 * (1 + 等待时间/计算时间) */ public static ExecutorService newIoIntensivePool() { int coreCount = Runtime.getRuntime().availableProcessors(); int threadCount = coreCount * 2; // 经验值,可根据实际情况调整 System.out.println("IO密集型线程池: " + threadCount + " 线程"); return Executors.newFixedThreadPool(threadCount); } /** * 混合型任务配置 * 根据CPU和IO比例动态调整 */ public static ExecutorService newMixedPool(double cpuRatio, double ioRatio) { int coreCount = Runtime.getRuntime().availableProcessors(); int threadCount = (int) (coreCount * cpuRatio + ioRatio); threadCount = Math.max(1, Math.min(threadCount, 100)); // 合理范围限制 System.out.println("混合型线程池: " + threadCount + " 线程"); return Executors.newFixedThreadPool(threadCount); } /** * 自定义线程池 - 更精细的控制 */ public static ThreadPoolExecutor newCustomPool(int corePoolSize, int maxPoolSize, long keepAliveTime, int queueSize) { return new ThreadPoolExecutor( corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueSize), new CustomThreadFactory(), new CustomRejectionPolicy() ); } /** * 自定义线程工厂,设置更有意义的线程名称 */ static class CustomThreadFactory implements ThreadFactory { private final AtomicInteger counter = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "CustomPool-Thread-" + counter.getAndIncrement()); thread.setDaemon(false); thread.setPriority(Thread.NORM_PRIORITY); return thread; } } /** * 自定义拒绝策略 */ static class CustomRejectionPolicy implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.err.println("任务被拒绝,当前活跃线程: " + executor.getActiveCount() + ", 队列大小: " + executor.getQueue().size()); // 可以记录日志、发送告警等 throw new RejectedExecutionException("线程池已满,拒绝新任务"); } } }
5.2 避免常见陷阱
1. 死锁预防与检测
/** * 死锁预防示例 * 演示如何避免和检测死锁 */ public class DeadlockPrevention { /** * 死锁产生示例 - 错误的锁顺序 */ public static class DeadlockExample { private final Object lock1 = new Object(); private final Object lock2 = new Object(); public void method1() { synchronized (lock1) { System.out.println(Thread.currentThread().getName() + " 获得 lock1"); try { Thread.sleep(100); } catch (InterruptedException e) {} synchronized (lock2) { // 可能死锁 System.out.println(Thread.currentThread().getName() + " 获得 lock2"); } } } public void method2() { synchronized (lock2) { // 不同的锁顺序 System.out.println(Thread.currentThread().getName() + " 获得 lock2"); try { Thread.sleep(100); } catch (InterruptedException e) {} synchronized (lock1) { // 可能死锁 System.out.println(Thread.currentThread().getName() + " 获得 lock1"); } } } } /** * 死锁预防 - 统一的锁顺序 */ public static class DeadlockPreventionExample { private final Object lock1 = new Object(); private final Object lock2 = new Object(); /** * 使用统一的锁获取顺序来预防死锁 * 总是先获取lock1,再获取lock2 */ public void method1() { synchronized (lock1) { System.out.println(Thread.currentThread().getName() + " 获得 lock1"); try { Thread.sleep(100); } catch (InterruptedException e) {} synchronized (lock2) { System.out.println(Thread.currentThread().getName() + " 获得 lock2"); // 业务逻辑 } } } public void method2() { synchronized (lock1) { // 相同的锁顺序 System.out.println(Thread.currentThread().getName() + " 获得 lock1"); try { Thread.sleep(100); } catch (InterruptedException e) {} synchronized (lock2) { System.out.println(Thread.currentThread().getName() + " 获得 lock2"); // 业务逻辑 } } } } /** * 使用tryLock避免死锁 */ public static class TryLockExample { private final Lock lock1 = new ReentrantLock(); private final Lock lock2 = new ReentrantLock(); public boolean tryDoWork() { // 尝试获取第一个锁 if (lock1.tryLock()) { try { System.out.println(Thread.currentThread().getName() + " 获得 lock1"); // 尝试获取第二个锁 if (lock2.tryLock()) { try { System.out.println(Thread.currentThread().getName() + " 获得 lock2"); // 执行业务逻辑 return true; } finally { lock2.unlock(); } } } finally { lock1.unlock(); } } return false; // 获取锁失败 } } /** * 死锁检测工具 */ public static void detectDeadlock() { ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); long[] threadIds = threadBean.findDeadlockedThreads(); if (threadIds != null) { System.err.println("检测到死锁!"); ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds); for (ThreadInfo threadInfo : threadInfos) { System.err.println("死锁线程: " + threadInfo.getThreadName()); System.err.println("等待锁: " + threadInfo.getLockName()); System.err.println("被线程持有: " + threadInfo.getLockOwnerName()); } } else { System.out.println("未检测到死锁"); } } }
2. 资源清理最佳实践
/** * 资源清理最佳实践 * 演示如何正确管理和清理多线程资源 */ public class ResourceCleanup implements AutoCloseable { private final ExecutorService executor; private final List<AutoCloseable> resources; public ResourceCleanup(int threadPoolSize) { this.executor = Executors.newFixedThreadPool(threadPoolSize); this.resources = new ArrayList<>(); System.out.println("资源管理器初始化完成,线程池大小: " + threadPoolSize); } /** * 提交任务 */ public <T> Future<T> submit(Callable<T> task) { return executor.submit(task); } /** * 注册需要管理的资源 */ public void registerResource(AutoCloseable resource) { synchronized (resources) { resources.add(resource); } } /** * 实现AutoCloseable,支持try-with-resources */ @Override public void close() { System.out.println("开始清理资源..."); // 1. 关闭线程池 shutdownExecutor(); // 2. 关闭所有注册的资源 closeRegisteredResources(); System.out.println("资源清理完成"); } /** * 优雅关闭线程池 */ private void shutdownExecutor() { executor.shutdown(); // 停止接受新任务 try { // 等待现有任务完成 if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { System.out.println("线程池关闭超时,尝试强制关闭"); // 取消所有未开始的任务 executor.shutdownNow(); // 再次等待 if (!executor.awaitTermination(3, TimeUnit.SECONDS)) { System.err.println("线程池无法完全关闭"); } } } catch (InterruptedException e) { // 重新中断并强制关闭 executor.shutdownNow(); Thread.currentThread().interrupt(); } } /** * 关闭所有注册的资源 */ private void closeRegisteredResources() { synchronized (resources) { for (AutoCloseable resource : resources) { try { resource.close(); } catch (Exception e) { System.err.println("关闭资源时出错: " + e.getMessage()); // 继续关闭其他资源,不抛出异常 } } resources.clear(); } } /** * 使用示例 */ public static void main(String[] args) { // 使用try-with-resources确保资源清理 try (ResourceCleanup manager = new ResourceCleanup(3)) { // 注册一些资源 manager.registerResource(() -> System.out.println("关闭数据库连接")); manager.registerResource(() -> System.out.println("关闭网络连接")); // 提交任务 Future<String> future = manager.submit(() -> { Thread.sleep(1000); return "任务完成"; }); // 获取结果 String result = future.get(); System.out.println("任务结果: " + result); } catch (Exception e) { System.err.println("执行出错: " + e.getMessage()); } // 这里会自动调用close()方法清理资源 } }
6. 总结与核心要点
6.1 关键知识点回顾
1. 中断异常处理的核心理解
try { Thread.sleep(1000); } catch (InterruptedException e) { /** * 必须重新设置中断状态的原因: * 1. 当阻塞方法抛出InterruptedException时,会清除线程的中断状态 * 2. 如果不重新设置,调用者无法知道线程曾被中断 * 3. 这破坏了中断的传播机制 */ Thread.currentThread().interrupt(); // 恢复中断状态 // 或者直接抛出异常:throw new RuntimeException(e); }
2. ThreadLocal内存管理
public void usingThreadLocal() { try { threadLocal.set(someValue); // 使用threadLocal } finally { /** * 必须清理ThreadLocal的原因: * 1. ThreadLocalMap使用弱引用作为key,但value是强引用 * 2. 如果线程长时间存活(线程池),value不会被GC回收 * 3. 导致内存泄漏,特别是存储大对象时 */ threadLocal.remove(); // 必须调用! } }
6.2 最佳实践清单
- 线程命名:为所有线程设置有意义的名字
- 异常处理:在Runnable.run()中捕获所有异常
- 资源清理:使用try-finally或try-with-resources
- 中断响应:合理处理InterruptedException
- 锁顺序:统一锁获取顺序避免死锁
- 线程池:优先使用线程池而非直接创建线程
- volatile:仅用于简单的状态标志
- ThreadLocal清理:使用后必须调用remove()
6.3 性能调优建议
| 场景 | 推荐配置 | 说明 |
|---|---|---|
| CPU密集型 | 线程数 = CPU核心数 + 1 | 减少线程切换开销 |
| IO密集型 | 线程数 = CPU核心数 × 2 | 充分利用等待时间 |
| 混合型 | 根据监控动态调整 | 结合实际负载 |
6.4 常见问题排查
- 死锁检测:使用jstack或ThreadMXBean
- 内存泄漏:检查ThreadLocal使用,特别是线程池场景
- CPU过高:检查是否存在忙等待或过多线程竞争
- 响应慢:检查锁竞争、IO阻塞或线程池配置
掌握这些Java并发编程的基础知识和最佳实践,能够帮助开发者构建出高性能、高可靠的多线程应用程序。记住,并发编程的核心在于正确的同步、合理的资源管理和清晰的线程通信。