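Running a thread pool in production means balancing resource management, task handling, error recovery, and monitoring. The following is a guide to production-grade thread pool usage: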
I. Best practices for creating a thread pool
1. Avoid the Executors shortcut methods

```java
// Anti-pattern: both can end in OutOfMemoryError under sustained load
ExecutorService unsafe = Executors.newCachedThreadPool();    // unbounded number of threads
ExecutorService unsafe2 = Executors.newFixedThreadPool(10);  // unbounded queue

// Preferred: construct ThreadPoolExecutor explicitly
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maxPoolSize = corePoolSize * 2;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100); // bounded queue
RejectedExecutionHandler handler = new CustomRejectionPolicy();
ExecutorService executor = new ThreadPoolExecutor(
        corePoolSize,
        maxPoolSize,
        60L, TimeUnit.SECONDS,
        workQueue,
        new CustomThreadFactory("app-worker-"),
        handler
);
```
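To make the effect of a bounded queue concrete, here is a minimal sketch of how `ThreadPoolExecutor` fills up: core threads first, then the queue, then extra threads up to the maximum, and only then rejection. The class name `SaturationDemo` and the tiny sizes (1 core thread, 2 maximum, queue capacity 1) are assumptions chosen for illustration, not recommended production values.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SaturationDemo {

    /** Submits four blocking tasks to a tiny pool and reports what happened to each. */
    static String run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        // Every task blocks until the latch opens, so nothing completes during the loop
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder log = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.append("task").append(i)
                       .append(": threads=").append(pool.getPoolSize())
                       .append(" queued=").append(pool.getQueue().size())
                       .append('\n');
                } catch (RejectedExecutionException e) {
                    log.append("task").append(i).append(": rejected\n");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.print(run());
    }
}
```

The first task starts the core thread, the second waits in the queue, the third forces a second thread because the queue is full, and the fourth is rejected because both the queue and the thread limit are exhausted.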
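2. Key configuration parameters
- corePoolSize: number of threads kept alive permanently (tune by workload type)
- maximumPoolSize: upper bound on threads (a common suggestion is to stay at or below 100); threads beyond the core size are created only once the queue is full
- keepAliveTime: how long an idle non-core thread survives (typically 30 to 120 seconds)
- workQueue: must be a bounded queue (to avoid OOM)
- threadFactory: a custom thread factory
- handler (a RejectedExecutionHandler): a custom rejection policy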
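As a starting point for "tune by workload type", a widely cited heuristic sizes a pool as cores × target CPU utilization × (1 + wait time / compute time). The sketch below applies that formula; the class and parameter names are hypothetical, the wait and compute times have to come from your own measurements, and the result should be treated as a first guess to validate under load rather than a definitive value.

```java
class PoolSizing {

    /**
     * Estimates a thread count from the heuristic
     * threads = cores * utilization * (1 + wait / compute).
     *
     * @param cpuCores          number of available cores
     * @param targetUtilization desired CPU utilization in (0, 1]
     * @param waitMillis        average time a task spends blocked (I/O, locks)
     * @param computeMillis     average time a task spends on the CPU
     */
    static int threadsFor(int cpuCores, double targetUtilization,
                          double waitMillis, double computeMillis) {
        if (cpuCores <= 0 || targetUtilization <= 0 || targetUtilization > 1
                || waitMillis < 0 || computeMillis <= 0) {
            throw new IllegalArgumentException("invalid sizing input");
        }
        double threads = cpuCores * targetUtilization * (1 + waitMillis / computeMillis);
        return Math.max(1, (int) Math.round(threads));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // CPU-bound work: almost no waiting, so roughly one thread per core
        System.out.println("cpu-bound: " + threadsFor(cores, 1.0, 0, 10));
        // I/O-bound work: 90 ms waiting per 10 ms of computation
        System.out.println("io-bound:  " + threadsFor(cores, 1.0, 90, 10));
    }
}
```

With 8 cores, the CPU-bound case yields 8 threads and the I/O-bound case yields 80, which is why the two kinds of work are usually better served by separate pools.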
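II. Implementing the key components
1. Custom thread factory (naming, exception handling)

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomThreadFactory implements ThreadFactory {
    // Assumes SLF4J for logging; AlertManager below is a placeholder for your alerting client
    private static final Logger logger = LoggerFactory.getLogger(CustomThreadFactory.class);

    private final AtomicInteger counter = new AtomicInteger(0);
    private final String namePrefix;
    private final ThreadGroup group;

    public CustomThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix;
        // SecurityManager is deprecated for removal since Java 17, so use the caller's group directly
        this.group = Thread.currentThread().getThreadGroup();
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(group, r, namePrefix + counter.incrementAndGet(), 0);
        thread.setDaemon(false);
        thread.setPriority(Thread.NORM_PRIORITY);
        // Handler for exceptions that escape a task started with execute()
        thread.setUncaughtExceptionHandler((t, e) -> {
            logger.error("Uncaught exception in thread: {}", t.getName(), e);
            // Send an alert
            AlertManager.notify(e);
        });
        return thread;
    }
}
```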
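2. Custom rejection policy (production grade)

```java
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomRejectionPolicy implements RejectedExecutionHandler {
    private static final Logger logger = LoggerFactory.getLogger(CustomRejectionPolicy.class);

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            // The pool no longer accepts work; log instead of dropping the task silently
            logger.warn("Task rejected because the pool is shut down: {}", r);
            return;
        }
        // 1. Record the rejected task
        logger.warn("Task rejected: {}", r);
        // 2. Retry the enqueue with a timeout (blocks the submitting thread for up to 1 second)
        try {
            if (executor.getQueue().offer(r, 1, TimeUnit.SECONDS)) {
                return;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Re-enqueue interrupted", e);
        }
        // 3. Persist the task so it is not lost, whether the retry timed out or was interrupted
        persistTask(r);
        logger.info("Task persisted to storage: {}", r);
    }

    private void persistTask(Runnable task) {
        // Persistence logic (database, file, message queue); TaskStorage is a placeholder,
        // and the task must carry serializable state for this to work
        TaskStorage.save(task);
    }
}
```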
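For comparison with a custom policy, the JDK's built-in `ThreadPoolExecutor.CallerRunsPolicy` applies backpressure by running a rejected task on the thread that submitted it, which slows the producer down instead of dropping or persisting work. The following is a small sketch of that behavior; `CallerRunsDemo` and the pool sizes are illustrative assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {

    /** Returns the name of the thread that ended up running the rejected task. */
    static String run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = new String[1];
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the queue
            // Pool and queue are full: CallerRunsPolicy runs this task right here,
            // synchronously, on the submitting thread
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println("rejected task ran on: " + run());
    }
}
```

The trade-off is that the submitting thread is busy for the duration of the task, so this policy is a poor fit when the submitter is a latency-sensitive request thread or an event loop.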
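III. Best practices for submitting and executing tasks
1. Wrapping tasks (with monitoring)

```java
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class MonitoredTask implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(MonitoredTask.class);

    private final Runnable actualTask;
    private final long submitTime;

    public MonitoredTask(Runnable task) {
        this.actualTask = task;
        this.submitTime = System.currentTimeMillis();
    }

    @Override
    public void run() {
        long start = System.currentTimeMillis();
        // Time spent waiting in the queue before a worker picked the task up
        logger.debug("Task waited {} ms in queue", start - submitTime);
        try {
            // Set MDC context for log tracing; this creates a new ID rather than
            // propagating the submitter's trace ID
            MDC.put("traceId", UUID.randomUUID().toString());
            actualTask.run();
            // Metrics is a placeholder for your metrics client
            Metrics.recordSuccess(System.currentTimeMillis() - start);
        } catch (Exception e) {
            Metrics.recordFailure(System.currentTimeMillis() - start);
            // Retry logic
            if (shouldRetry(e)) {
                retryTask();
            } else {
                logger.error("Task execution failed", e);
            }
        } finally {
            MDC.clear();
        }
    }

    // Placeholder hooks: the original snippet referenced these without defining them
    protected boolean shouldRetry(Exception e) {
        return false;
    }

    protected void retryTask() {
        // e.g. resubmit with a retry counter and back-off
    }

    // Use this when submitting tasks
    public static void submit(ExecutorService executor, Runnable task) {
        executor.execute(new MonitoredTask(task));
    }
}
```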
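An alternative to wrapping every task is to override the `beforeExecute` and `afterExecute` hooks that `ThreadPoolExecutor` provides, so that timing is recorded for all tasks regardless of how they were submitted. The sketch below assumes a hypothetical `TimingThreadPool` class and keeps its counters in memory; a real setup would forward them to a metrics system.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class TimingThreadPool extends ThreadPoolExecutor {
    // Start time of the task currently running on each worker thread
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final AtomicLong finished = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();
    private final AtomicLong totalNanos = new AtomicLong();

    TimingThreadPool(int threads, int queueCapacity) {
        super(threads, threads, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            totalNanos.addAndGet(System.nanoTime() - startNanos.get());
            finished.incrementAndGet();
            // t is non-null only for tasks started with execute(); tasks passed to
            // submit() keep their exception inside the returned Future
            if (t != null) {
                failed.incrementAndGet();
            }
        } finally {
            startNanos.remove();
            super.afterExecute(r, t);
        }
    }

    long finishedCount() { return finished.get(); }
    long failedCount() { return failed.get(); }
    long totalNanos() { return totalNanos.get(); }

    /** Runs three trivial tasks and returns how many the hooks observed. */
    static long demo() {
        TimingThreadPool pool = new TimingThreadPool(2, 10);
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.finishedCount();
    }
}
```

The hooks run on the worker thread, so they should stay cheap and must not throw; anything heavier is better handed off to a separate reporter.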
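2. Task timeout control

```java
Future<?> future = executor.submit(task);
try {
    // Wait at most 30 seconds for the task to finish
    future.get(30, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    // 1. Cancel the task; cancel(true) interrupts the worker, so the task only stops
    //    if it responds to interruption
    future.cancel(true);
    // 2. Log the timeout
    logger.warn("Task timed out: {}", task);
    // 3. Run the fallback strategy (fallbackHandler is a placeholder)
    fallbackHandler.handle(task);
} catch (ExecutionException e) {
    // The task itself threw; the original exception is the cause
    logger.error("Task failed", e.getCause());
} catch (InterruptedException e) {
    // The waiting thread was interrupted: stop the task and restore the interrupt flag
    future.cancel(true);
    Thread.currentThread().interrupt();
}
```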
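Where a default value is an acceptable fallback, the same timeout idea can be expressed with `CompletableFuture.completeOnTimeout`, available since Java 9. This is a sketch under that assumption; `TimeoutFallback` and `callWithFallback` are hypothetical names. Note one difference from `future.cancel(true)`: the timed-out task is not interrupted and keeps its worker thread until it finishes on its own.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class TimeoutFallback {

    /** Runs the task on the pool and returns the fallback if it takes longer than the timeout. */
    static <T> T callWithFallback(Executor pool, Supplier<T> task,
                                  long timeoutMillis, T fallback) {
        return CompletableFuture.supplyAsync(task, pool)
                .completeOnTimeout(fallback, timeoutMillis, TimeUnit.MILLISECONDS)
                .join();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** One slow call that hits the timeout and one fast call that does not. */
    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        try {
            String slow = callWithFallback(pool, () -> {
                sleepQuietly(5_000);
                return "slow";
            }, 100, "fallback");
            String fast = callWithFallback(pool, () -> "fast", 5_000, "fallback");
            return slow + "," + fast;
        } finally {
            // Interrupts the still-sleeping slow task so the pool can terminate
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```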
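IV. Monitoring and managing the thread pool
1. Collecting metrics

```java
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThreadPoolMonitor implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ThreadPoolMonitor.class);

    private final ThreadPoolExecutor executor;

    public ThreadPoolMonitor(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Collect key metrics
                int activeCount = executor.getActiveCount();
                long completedTaskCount = executor.getCompletedTaskCount();
                int queueSize = executor.getQueue().size();
                int poolSize = executor.getPoolSize();

                // Publish to the monitoring system (Metrics is a placeholder)
                Metrics.gauge("threadpool.active.count", activeCount);
                Metrics.gauge("threadpool.pool.size", poolSize);
                Metrics.gauge("threadpool.queue.size", queueSize);
                // getCompletedTaskCount() is already a running total, not a delta
                Metrics.counter("threadpool.completed.tasks", completedTaskCount);

                // Detect potential problems: total capacity is queued plus remaining slots
                long capacity = (long) queueSize + executor.getQueue().remainingCapacity();
                if (capacity > 0 && queueSize > capacity * 0.8) {
                    logger.warn("Thread pool queue is approaching capacity");
                }

                // Sample every 30 seconds
                Thread.sleep(30_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```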
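The queue-pressure check is easy to get wrong because `remainingCapacity()` returns the free slots, not the total capacity. The sketch below isolates that calculation and shows one way to sample it with a `ScheduledExecutorService` instead of a sleep loop; `PoolGauges`, `sampleEvery`, and the `sink` callback are illustrative names, and the two queue reads are not atomic, so the value is approximate.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleConsumer;

class PoolGauges {

    /** Fraction of the work queue in use, from 0.0 (empty) to 1.0 (full). */
    static double queueUtilization(ThreadPoolExecutor pool) {
        BlockingQueue<Runnable> queue = pool.getQueue();
        long size = queue.size();
        // long arithmetic: unbounded queues report Integer.MAX_VALUE remaining
        long capacity = size + queue.remainingCapacity();
        return capacity == 0 ? 0.0 : (double) size / capacity;
    }

    /** Reports the utilization to the sink at a fixed rate; cancel the returned future to stop. */
    static ScheduledFuture<?> sampleEvery(ScheduledExecutorService scheduler,
                                          ThreadPoolExecutor pool,
                                          long periodSeconds,
                                          DoubleConsumer sink) {
        return scheduler.scheduleAtFixedRate(
                () -> sink.accept(queueUtilization(pool)),
                periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    /** Blocks the single worker, queues 8 tasks into 10 slots, and returns the utilization. */
    static double demo() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker);          // taken directly by the worker
            for (int i = 0; i < 8; i++) {
                pool.execute(blocker);      // these wait in the queue
            }
            return queueUtilization(pool);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```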
2. Adjusting pool parameters at runtime

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DynamicThreadPool extends ThreadPoolExecutor {

    public DynamicThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime,
                             TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
    }

    // Change the core pool size at runtime
    @Override
    public void setCorePoolSize(int corePoolSize) {
        // Since Java 9 the superclass throws if the core size exceeds the maximum
        if (corePoolSize >= 0 && corePoolSize <= getMaximumPoolSize()) {
            super.setCorePoolSize(corePoolSize);
            Metrics.gauge("threadpool.core.size", corePoolSize);
        }
    }

    // Change the maximum pool size at runtime
    public void setMaxPoolSize(int maxPoolSize) {
        if (maxPoolSize > 0 && maxPoolSize >= getCorePoolSize()) {
            super.setMaximumPoolSize(maxPoolSize);
            Metrics.gauge("threadpool.max.size", maxPoolSize);
        }
    }

    // Queue capacity cannot be changed by swapping the queue: ThreadPoolExecutor keeps its
    // work queue in a final field and has no setQueue method, and ArrayBlockingQueue has a
    // fixed capacity. Resizing requires a custom BlockingQueue implementation whose capacity
    // can be adjusted, passed to the constructor above.
}
```
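When both sizes change together, the order of the two setter calls matters, because since Java 9 `setCorePoolSize` rejects a value above the current maximum and `setMaximumPoolSize` rejects a value below the current core size. A minimal sketch of an order-safe helper follows; `PoolResizer` is a hypothetical name, and the two calls are not atomic, so the pool briefly runs with a mixed configuration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolResizer {

    /** Applies a new core/max pair in an order that never violates core <= max. */
    static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newCore < 0 || newMax <= 0 || newMax < newCore) {
            throw new IllegalArgumentException(
                    "need 0 <= core <= max and max > 0, got core=" + newCore + " max=" + newMax);
        }
        if (newMax >= pool.getMaximumPoolSize()) {
            // Growing: raise the ceiling first so the new core size fits under it
            pool.setMaximumPoolSize(newMax);
            pool.setCorePoolSize(newCore);
        } else {
            // Shrinking: lower the core size first so the new ceiling stays above it
            pool.setCorePoolSize(newCore);
            pool.setMaximumPoolSize(newMax);
        }
    }

    /** Grows a 2/4 pool to 8/16, then shrinks it to 1/2, and reports both results. */
    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        try {
            resize(pool, 8, 16);
            String grown = pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize();
            resize(pool, 1, 2);
            String shrunk = pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize();
            return grown + " " + shrunk;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```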
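V. Graceful shutdown and resource cleanup
1. Handling application shutdown

```java
@PreDestroy
public void shutdownExecutor() {
    // 1. Stop accepting new tasks; queued and running tasks continue
    executor.shutdown();
    try {
        // 2. Wait for existing tasks to finish
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            // 3. Interrupt running tasks and discard the ones still queued
            executor.shutdownNow();
            // 4. Wait again for tasks to respond to the interrupt
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                logger.error("Thread pool did not terminate");
            }
        }
    } catch (InterruptedException e) {
        // 5. The shutdown thread itself was interrupted: force the stop and restore the flag
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
    // 6. Release other resources (cleanupResources is a placeholder)
    cleanupResources();
}
```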
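2. Recovering unfinished tasks

```java
// Call after shutdown(), so no new tasks arrive while the queue is drained
public void recoverPendingTasks() {
    BlockingQueue<Runnable> pendingQueue = executor.getQueue();
    List<Runnable> pendingTasks = new ArrayList<>();
    pendingQueue.drainTo(pendingTasks);

    for (Runnable task : pendingTasks) {
        // Tasks passed to submit() are wrapped in a FutureTask, so this check only
        // matches tasks handed to execute(). RecoverableTask and TaskStorage are placeholders.
        if (task instanceof RecoverableTask) {
            // Persist to reliable storage
            TaskStorage.save((RecoverableTask) task);
            logger.info("Recovered pending task: {}", task);
        } else {
            logger.warn("Pending task is not recoverable and will be lost: {}", task);
        }
    }
}
```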
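If the pool is being stopped anyway, `shutdownNow()` already returns the tasks that never started, which avoids draining the queue by hand. The sketch below only counts the returned tasks; the class name `DrainOnShutdown` is an assumption, and what to do with the list (persist, resubmit elsewhere, log) depends on the application.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DrainOnShutdown {

    /** Stops the pool immediately and returns the tasks that were still waiting in the queue. */
    static List<Runnable> stopAndCollect(ThreadPoolExecutor pool) {
        // Interrupts running tasks and hands back everything that never started
        List<Runnable> neverStarted = pool.shutdownNow();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return neverStarted;
    }

    /** One long-running task occupies the worker while three more wait in the queue. */
    static int demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        pool.execute(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // shutdownNow() interrupts the sleep, so the worker exits promptly
                Thread.currentThread().interrupt();
            }
        });
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { });
        }
        return stopAndCollect(pool).size();
    }

    public static void main(String[] args) {
        System.out.println("tasks that never started: " + demo());
    }
}
```

As with a manual drain, tasks that were passed to `submit()` come back as `FutureTask` wrappers rather than the original objects.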
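VI. Production recommendations
- Thread isolation:
  - CPU-bound tasks: a dedicated pool
  - I/O-bound tasks: a dedicated pool
  - Critical business paths: a dedicated pool (so workloads cannot starve each other)
- Resource limits:

  ```java
  // Use a Semaphore to cap concurrent use of a limited resource
  private final Semaphore concurrencySemaphore = new Semaphore(50);

  executor.execute(() -> {
      try {
          concurrencySemaphore.acquire();
      } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return; // no permit was acquired, so none may be released
      }
      try {
          // Operate on the limited resource
      } finally {
          concurrencySemaphore.release();
      }
  });
  ```
- Context propagation:

  ```java
  // Propagate context with TransmittableThreadLocal (alibaba/transmittable-thread-local)
  TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
  context.set("value-from-parent");
  executor.execute(TtlRunnable.get(() -> {
      // Sees the value captured from the submitting thread
      String value = context.get();
  }));
  ```
- Circuit breaking and degradation:

  ```java
  // CircuitBreaker and fallbackService are placeholders, not a specific library's API
  CircuitBreaker circuitBreaker = new CircuitBreaker();
  executor.execute(() -> {
      if (circuitBreaker.allowExecution()) {
          try {
              // Run the business logic
          } catch (Exception e) {
              circuitBreaker.recordFailure();
          }
      } else {
          // Run the fallback logic
          fallbackService.executeFallback();
      }
  });
  ```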
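The `CircuitBreaker` type in the snippet above is not defined in this guide. As one possible shape for it, here is a deliberately small count-based sketch: it opens after a number of consecutive failures and lets calls through again after a cool-down. The class name, the threshold semantics, and the injected clock are all assumptions for illustration; production systems usually rely on an established resilience library instead.

```java
import java.util.function.LongSupplier;

class SimpleCircuitBreaker {
    private final int failureThreshold;
    private final long openMillis;
    private final LongSupplier clock;   // injected so the cool-down can be tested
    private int consecutiveFailures;
    private long openedAt = -1;         // -1 means the breaker is closed

    SimpleCircuitBreaker(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    /** True while closed, or once the cool-down has elapsed after opening. */
    synchronized boolean allowExecution() {
        if (openedAt < 0) {
            return true;
        }
        // Simplification: after the cool-down every caller is let through,
        // not just a single probe request
        return clock.getAsLong() - openedAt >= openMillis;
    }

    /** A success closes the breaker and resets the failure count. */
    synchronized void recordSuccess() {
        consecutiveFailures = 0;
        openedAt = -1;
    }

    /** A failure at or beyond the threshold opens the breaker or restarts the cool-down. */
    synchronized void recordFailure() {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            openedAt = clock.getAsLong();
        }
    }

    public static void main(String[] args) {
        SimpleCircuitBreaker breaker =
                new SimpleCircuitBreaker(3, 1_000, System::currentTimeMillis);
        for (int i = 0; i < 3; i++) {
            breaker.recordFailure();
        }
        System.out.println("allowed right after opening: " + breaker.allowExecution());
    }
}
```

Unlike the snippet it complements, this sketch also has a `recordSuccess` method; without one, a breaker that has opened could never close again.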
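VII. Common problems and remedies
| Problem | Symptom | Remedy |
|---|---|---|
| Thread leak | Thread count keeps growing | 1. Check that threads terminate normally 2. Monitor thread creation 3. Cap the maximum thread count |
| Task backlog | Queue keeps growing | 1. Add consumer threads 2. Speed up task processing 3. Degrade or shed tasks |
| High CPU usage | CPU stays saturated | 1. Analyze thread stacks (jstack) 2. Optimize hot code paths 3. Limit the pool size |
| Task starvation | Low-priority tasks wait indefinitely | 1. Use a priority queue 2. Split pools by priority 3. Implement fair scheduling |
| Lost context | Worker threads cannot see the caller's context | 1. Use TransmittableThreadLocal 2. Pass the context explicitly 3. Carry it in the logging MDC, copied into the worker by a task wrapper |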
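For the "pass the context explicitly" remedy in the last row, the idea can be sketched without any library: capture the value on the submitting thread, install it on the worker for the duration of the task, and restore the previous value afterwards. `ContextPropagation`, `TRACE_ID`, and `wrap` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ContextPropagation {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Captures the caller's trace ID now and installs it around the task later. */
    static Runnable wrap(Runnable task) {
        String captured = TRACE_ID.get(); // read on the submitting thread
        return () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                task.run();
            } finally {
                // Restore, so pooled threads do not leak context into the next task
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        };
    }

    /** Shows what an unwrapped and a wrapped task each see on the worker thread. */
    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        AtomicReference<String> plainSeen = new AtomicReference<>("unset");
        AtomicReference<String> wrappedSeen = new AtomicReference<>("unset");
        TRACE_ID.set("req-42");
        try {
            pool.execute(() -> plainSeen.set(TRACE_ID.get()));
            pool.execute(wrap(() -> wrappedSeen.set(TRACE_ID.get())));
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            TRACE_ID.remove();
        }
        return plainSeen.get() + "," + wrappedSeen.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The unwrapped task sees no trace ID on the worker thread, while the wrapped one sees the value set by the submitter.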
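In short, a production thread pool needs deliberate choices about resource configuration, task management, error handling, and monitoring and alerting. Choose the strategies that fit your workload, and back them with thorough monitoring and alerting.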