Thread 源码阅读
线程是操作系统进行调度的最小单位,Java 中的 Thread 是对操作系统线程的封装,很多实际的控制是靠底层 Native 方法实现的,因此 Java 的 Thread 类还是比较简单的。
成员
Thread 内部的成员变量还是比较多的,主要分为线程标识、状态标记、局部变量、同步相关等等:
public class Thread implements Runnable {
// 线程ID
private final long tid;
// 线程ID计数器
private static long threadSeqNumber;
// 线程名
private volatile String name;
// 线程取名的计数器
private static int threadInitNumber;
// 线程状态,0 表示 NEW
private volatile int threadStatus;
// 中断标记,注意仅仅用作标记
private volatile boolean interrupted;
// 守护线程标志,所有普通线程执行完毕后自动终止,常用于执行后台任务,例如 GC 线程、JIT线程
private boolean daemon = false;
// 线程优先级,1-10,默认 5
private int priority;
public static final int MIN_PRIORITY = 1;
public static final int NORM_PRIORITY = 5;
public static final int MAX_PRIORITY = 10;
// 所属线程组
private ThreadGroup group;
// 执行目标
private Runnable target;
// 线程私有的局部变量
ThreadLocal.ThreadLocalMap threadLocals = null;
// 可继承的线程局部变量,线程创建时会继承自父线程
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
// 支持 LockSupport
volatile Object parkBlocker;
// 可中断IO中用于阻塞线程的对象及对应的访问锁
private volatile Interruptible blocker;
private final Object blockerLock = new Object();
// 线程栈大小
private final long stackSize;
// 上下文类加载器,默认是父线程的类加载器
private ClassLoader contextClassLoader;
// 标识线程是否是死胎
private boolean stillborn = false;
// 存储线程在底层平台的线程 ID
private long eetop;
// 继承的访问控制上下文
private AccessControlContext inheritedAccessControlContext;
// 未捕获异常的处理器
private volatile UncaughtExceptionHandler uncaughtExceptionHandler;
private static volatile UncaughtExceptionHandler defaultUncaughtExceptionHandler;
// 用于生成高性能的伪随机数(PRNGs:Pseudo Random Number Generators)
long threadLocalRandomSeed;
int threadLocalRandomProbe;
int threadLocalRandomSecondarySeed;
}
构造器
Thread 提供了多种构造器的重载,主要调用的是下面这个:
private Thread(ThreadGroup g, // 线程组,默认 null
Runnable target, // 执行目标,默认 null
String name, // 线程名,默认 ["Thread-" + threadInitNumber++]
long stackSize, // 线程栈大小,默认 0 表示不指定,由 VM 决定如何使用
AccessControlContext acc, // 访问控制,默认 null,已废弃
boolean inheritThreadLocals) // 是否继承父线程私有变量,默认 false
{
if (name == null)
throw new NullPointerException("name cannot be null");
this.name = name;
// 如果没有指定线程组,依次从 SecurityManager->父线程 获取
Thread parent = currentThread();
SecurityManager security = System.getSecurityManager();
if (g == null) {
if (security != null) g = security.getThreadGroup();
if (g == null) g = parent.getThreadGroup();
}
// 访问校验
g.checkAccess();
if (security != null) {
if (isCCLOverridden(getClass())) {
security.checkPermission(SecurityConstants.SUBCLASS_IMPLEMENTATION_PERMISSION);
}
}
// 线程组记录新线程
g.addUnstarted();
// 注入成员
this.group = g;
// 默认根据父线程
this.daemon = parent.isDaemon();
// 默认根据父线程
this.priority = parent.getPriority();
if (security == null || isCCLOverridden(parent.getClass()))
this.contextClassLoader = parent.getContextClassLoader();
else
this.contextClassLoader = parent.contextClassLoader;
this.inheritedAccessControlContext = acc != null ? acc : AccessController.getContext();
this.target = target;
setPriority(priority);
// 继承父线程的私有变量
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
this.stackSize = stackSize;
// 设置线程ID = ++threadSeqNumber
this.tid = nextThreadID();
}
线程状态
Thread 共定义了 6 种状态,对应于 Thread.State 枚举,某一时刻某一线程只会处于其中一种状态,但和操作系统中的线程状态并没有对应关系。
// java.lang.Thread.State
public enum State {
// 创建完但尚未启动的状态
NEW,
// 启动后的就绪/运行中(可运行)的状态
RUNNABLE,
// 阻塞在某个 monitor 上所处的状态
BLOCKED,
// 等待另一线程执行操作的状态
WAITING,
// 指定超时时间的等待状态
TIMED_WAITING,
// 执行完毕的终止状态
TERMINATED;
}
状态转换以及触发条件如下:
start
Thread 里面有两种执行 target,也就是 Runnable 对象的方法:
- start():调用 native 方法创建实际的操作系统线程
- run():在当前线程里执行 target#run(),并不会创建新线程
所以要想真的利用多线程执行任务,要调用 Thread#start,而不是执行 Thread#run。注意 start() 不能重复调用,否则会抛出非法线程状态的异常。执行完 start() 后线程就进入了 Runnable 状态。
// java.lang.Thread#start
public synchronized void start() {
// 状态校验
if (threadStatus != 0)
throw new IllegalThreadStateException();
// 添加到线程组的【可运行线程】数组中,并修改相关的统计
group.add(this);
boolean started = false;
try {
// 本地方法创建新线程执行 target
start0();
started = true;
} finally {
try {
if (!started)
group.threadStartFailed(this);
} catch (Throwable ignore) {}
}
}
private native void start0();
// java.lang.Thread#run
public void run() {
if (target != null) {
target.run();
}
}
join
join() 可以在一个线程中等待另一个线程执行完毕(在当前这个线程等待另一个线程加入进来),例如在 a 线程中执行 b.join,那么线程 a 就会进入 Thread#join 方法循环等待,直到 b 线程执行结束 a 才会继续。join() 底层是利用 isAlive() 和 Object#wait() 实现有限阻塞,调用的线程执行完毕后会执行 notifyAll() 唤醒所有等待队列的线程进入同步队列。
public final void join() throws InterruptedException { join(0); }
public final synchronized void join(final long millis) {
if (millis > 0) {
if (isAlive()) {
// 不断循环,校验是否超时
final long startTime = System.nanoTime();
long delay = millis;
do {
wait(delay);
} while (isAlive() && (delay = millis - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)) > 0);
}
} else if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
throw new IllegalArgumentException("timeout value is negative");
}
}
interrupt
interrupt() 可以对一个已经启动的线程进行中断操作,中断操作会根据线程所处的不同状态有不同结果:
- 若线程阻塞在 wait/join/sleep 上:中断标记置 false,同时线程收到 InterruptedException
- 若线程阻塞在 NIO#InterruptibleChannel 上:关闭 channel,中断标记置 true,同时线程收到 ClosedByInterruptException
- 若线程阻塞在 NIO#Selector 上:阻塞方法立即返回,中断标记置 true
- 其他情况:中断标记置 true
public void interrupt() {
// 其它线程中断本线程
if (this != Thread.currentThread()) {
checkAccess();
// 先获取 blocker 的锁
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
// 标记当前线程被中断,注意仅仅是做标记
interrupted = true;
interrupt0();
b.interrupt(this);
return;
}
}
}
// 如果是自己调用的,直接设置标记即可
interrupted = true;
// 通知 JVM 中断事件
interrupt0();
}
private native void interrupt0();
Thread 里有两种方法获取中断状态,但有不同的副作用:
// 1. 获取中断状态,不清除中断标记
public boolean isInterrupted() { return interrupted; }
// 2. 静态方法,判断是否被中断,如果已经被中断会清除中断标记
public static boolean interrupted() {
Thread t = currentThread();
boolean interrupted = t.interrupted;
if (interrupted) {
t.interrupted = false;
clearInterruptEvent();
}
return interrupted;
}
exit
exit 会在线程退出前被调用,来做一些清理工作,这是 JVM 自动执行的退出。JDK 建议用一个自定义的变量用于标识线程是否需要退出,然后线程不断地去检查该变量从而决定要不要从 run 方法退出;或者调用线程对象的 interrupt 方法,通过中断异常来退出执行。
// java.lang.Thread#exit
private void exit() {
// 清理局部变量
if (threadLocals != null && TerminatingThreadLocal.REGISTRY.isPresent()) {
TerminatingThreadLocal.threadTerminated();
}
// 移除出线程组
if (group != null) {
group.threadTerminated(this);
group = null;
}
// 手动置 null 帮助 GC
target = null;
threadLocals = null;
inheritableThreadLocals = null;
inheritedAccessControlContext = null;
blocker = null;
uncaughtExceptionHandler = null;
}
其它
工具方法
// 获取当前所在线程
public static native Thread currentThread();
// 进入 Timed_Waiting,不会释放任何 monitor,可以被其它线程 interrupt
public static native void sleep(long millis);
// 提示调度器当前线程愿意让出 CPU,依赖于具体实现
public static native void yield();
// 设置当前所在线程的阻塞对象
static void blockedOn(Interruptible b);
// 当前线程是否持有该对象的 monitor 锁
public static native boolean holdsLock(Object obj);
// 告诉 JVM 当前线程正在忙等,用于优化
public static void onSpinWait() {}
// 是否存活
public final native boolean isAlive();
// 获取所属线程组内活跃线程数(包括子线程组)
public static int activeCount() {
return currentThread().getThreadGroup().activeCount();
}
// 获取所属线程组内所有线程,拷贝到 tarray 里(包括子线程组)
public static int enumerate(Thread tarray[]) {
return currentThread().getThreadGroup().enumerate(tarray);
}
// 获取当前线程的调用栈
private static final StackTraceElement[] EMPTY_STACK_TRACE = new StackTraceElement[0];
public StackTraceElement[] getStackTrace() {
// 其它线程调用
if (this != Thread.currentThread()) {
// 安全校验
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(SecurityConstants.GET_STACK_TRACE_PERMISSION);
}
if (!isAlive()) {
return EMPTY_STACK_TRACE;
}
// 拿到当前线程的调用栈
StackTraceElement[][] stackTraceArray = dumpThreads(new Thread[] {this});
StackTraceElement[] stackTrace = stackTraceArray[0];
if (stackTrace == null) {
stackTrace = EMPTY_STACK_TRACE;
}
return stackTrace;
} else {
// 当前线程自己调用,直接打印调用栈
return (new Exception()).getStackTrace();
}
}
// 拿到所有线程和对应的调用栈封装成 Map,原理同上
public static Map<Thread, StackTraceElement[]> getAllStackTraces();
// 获取所有线程对象
private static native Thread[] getThreads();
// 拿到参数线程组对应的所有调用栈
private static native StackTraceElement[][] dumpThreads(Thread[] threads);
// 打印调用栈
public static void dumpStack() {
new Exception("Stack trace").printStackTrace();
}
异常
单线程的程序发生一个未捕获的异常时我们可以采用 try-catch 进行异常捕获;但在多线程环境中,线程抛出的异常不能在调用的地方直接用 try-catch 捕获,进而导致一些资源问题。不过 Thread 提供了未捕获异常的处理器接口,能检测出某个由于未捕获的异常而终结的情况。
// 两个异常处理器类成员
private volatile UncaughtExceptionHandler uncaughtExceptionHandler;
private static volatile UncaughtExceptionHandler defaultUncaughtExceptionHandler;
// 未捕获异常处理器接口
public interface UncaughtExceptionHandler {
void uncaughtException(Thread t, Throwable e);
}
private void dispatchUncaughtException(Throwable e);
// getter/setter 略
安全审计
private static boolean isCCLOverridden(Class<?> cl);
static void processQueue(ReferenceQueue<Class<?>> queue, ConcurrentMap<? extends, WeakReference<Class<?>>, ?> map);
private static boolean auditSubclass(final Class<?> subcl);
private static class Caches {...}
static class WeakClassKey extends WeakReference<Class<?>> {...}
已废弃
下面是一些由于安全问题,不推荐使用的控制方法:
public final void stop() -> private native void stop0(Object o);
public final void suspend() -> private native void suspend0();
public final void resume() -> private native void resume0();
ThreadGroup
线程组 ThreadGroup 就是一个线程集合,用于更方便地管理线程。线程组是一种父子层级结构,一个线程组包括多个线程,同时还可以拥有多个子线程组。在 JVM 中线程组层级关系如下:
- system:用来处理 JVM 系统任务的线程组,例如对象的销毁等
- main:system 的直接子线程组,该组至少包含一个 main 线程
- 其它:应用程序创建的线程组
ThreadGroup 主要提供了对所管理线程/线程组的 CRUD、状态控制、统计、遍历等功能,但其中很多由于安全问题都是已废弃的,使用时要注意。
public class ThreadGroup implements Thread.UncaughtExceptionHandler {
// 父线程组
private final ThreadGroup parent;
// 线程组名称
String name;
// 线程组最大优先级
int maxPriority;
// 销毁标记
boolean destroyed;
// 守护标记
boolean daemon;
// 已加入,尚未启动的线程数
int nUnstartedThreads = 0;
// 已启动线程数
int nthreads;
// 管理的线程集合
Thread threads[];
// 管理的子线程组数
int ngroups;
// 管理的子线程组集合
ThreadGroup groups[];
}