bigbully +

靠谱的线程工厂类

有经验的程序员会在创建线程池时,传入一个名字前缀,这样打印日志的时候就可以看到每个线程在做了什么,万一需要jmap时,也能知道每个现成正在做什么。

就像这样:

class SimpleThreadFactory(namePrefix:String) extends ThreadFactory {

  private val threadIndex: AtomicInteger = new AtomicInteger(0)

  override def newThread(r: Runnable): Thread = {
    new Thread(r, String.format(namePrefix + "_%d", this.threadIndex.incrementAndGet))
  }
}

我每次都是这么做的,不同线程有代表它职责的名字帮助我更容易排查问题,这让我心满意足。

不过当阅读Akka源码的时候,我终于意识:还能做的更好。

这是ActorSystem中用到的一个线程工厂:

case class MonitorableThreadFactory(name: String,
                                daemonic: Boolean,
                                contextClassLoader: Option[ClassLoader],
                                exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThreadFactory.doNothing,
                                protected val counter: AtomicLong = new AtomicLong) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {

  def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
    val t = wire(new MonitorableThreadFactory.AkkaForkJoinWorkerThread(pool))
    // Name of the threads for the ForkJoinPool are not customizable. Change it here.
    t.setName(name + "-" + counter.incrementAndGet())
    t
  }

  def newThread(runnable: Runnable): Thread = wire(new Thread(runnable, name + "-" + counter.incrementAndGet()))

  def withName(newName: String): MonitorableThreadFactory = copy(newName)

  protected def wire[T <: Thread](t: T): T = {
    t.setUncaughtExceptionHandler(exceptionHandler)
    t.setDaemon(daemonic)
    contextClassLoader foreach t.setContextClassLoader
    t
  }
}

构造函数中不仅传入了线程名前缀,还可以指定是否是守护线程,以及classLoader,更重要的是,它允许我们传入特定的线程异常处理器:UncaughtExceptionHandler

要知道任何一个线程在处理任务时,如果有没catch的异常,那么这个线程就会被销毁重建。

但很少有人使用线程异常处理器,而是在Runnable中catch住所有异常,包括Throwable。Throwable之所以称之为Throwable,意味着它作为影响应用正常运行的严重错误,我们无法在应用内部处理,而是应该任由他被抛到最外层,甚至使应用崩溃(之后通过重启应用的方式恢复)。

protected def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler =
new Thread.UncaughtExceptionHandler() {
  def uncaughtException(thread: Thread, cause: Throwable): Unit = {
    cause match {
      case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable ⇒ log.error(cause, "Uncaught error from thread [{}]", thread.getName)
      case _ ⇒
        if (settings.JvmExitOnFatalError) {
          try {
            log.error(cause, "Uncaught error from thread [{}] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled", thread.getName)
            import System.err
            err.print("Uncaught error from thread [")
            err.print(thread.getName)
            err.print("] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[")
            err.print(name)
            err.println("]")
            cause.printStackTrace(System.err)
            System.err.flush()
          } finally {
            System.exit(-1)
          }
        } else {
          log.error(cause, "Uncaught fatal error from thread [{}] shutting down ActorSystem [{}]", thread.getName, name)
          shutdown()
        }
    }
  }
}

这个异常处理器把异常分为两类:

  1. 不严重的异常。这种异常我们在处理器中打印异常日志
  2. 如果我们有设定严重错误时应用退出,则进行异常退出。否则只是打印异常日志,ActorSystem退出

由此可见,java或scala中的异常处理,指的是当我知道如何处理异常时才去catch,如果遇到的是Throwable,则让他抛到最外层好了。这样比仅仅打印一行日志更容易也更早的能够发现问题。