thread,threadlocal,InheritableThreadLocal源码详解

Author Avatar
Sean Yu 11月 20, 2020
  • 在其它设备中阅读本文章

Thread 源码解析

构造方法:

所有构造方法都调用了init(),直接看此方法:

private void init(ThreadGroup g, Runnable target, String name,
                    long stackSize, AccessControlContext acc) {
      if (name == null) {
          throw new NullPointerException("name cannot be null");
      }
      this.name = name.toCharArray();
      // 父线程
      Thread parent = currentThread();
      SecurityManager security = System.getSecurityManager();
      if (g == null) {
          // 实际是Thread.currentThread().getThreadGroup();
          // 拿到threadgroup以后会对其进行操作
          // threadgroup里面有nUnstartedThreads代表未启动线程数
          // nthreads启动线程数
          // threads[]线程的数组
          /* Determine if it's an applet or not */
          /* If there is a security manager, ask the security manager
             what to do. */
          if (security != null) {
              g = security.getThreadGroup();
          }
          /* If the security doesn't have a strong opinion of the matter
             use the parent thread group. */
          if (g == null) {
              g = parent.getThreadGroup();
          }
      }
      /* checkAccess regardless of whether or not threadgroup is
         explicitly passed in. */
      g.checkAccess();
      /*
       * Do we have the required permissions?
       */
      if (security != null) {
          if (isCCLOverridden(getClass())) {
              security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
          }
      }
      // 在ThreadGroup中标记增加了一个未启动的线程,里面操作很简单,nUnstartedThreads++;
      g.addUnstarted();
      this.group = g;
      // 继承父线程的属性
      this.daemon = parent.isDaemon();
      this.priority = parent.getPriority();
      if (security == null || isCCLOverridden(parent.getClass()))
          this.contextClassLoader = parent.getContextClassLoader();
      else
          this.contextClassLoader = parent.contextClassLoader;
      this.inheritedAccessControlContext =
              acc != null ? acc : AccessController.getContext();
      this.target = target;
      // 再设置线程优先级
      setPriority(priority);
      // 设置inheritableThreadLocals用于父子线程变量共享
      if (parent.inheritableThreadLocals != null)
          this.inheritableThreadLocals =
              ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
      /* Stash the specified stack size in case the VM cares */
      // 给这个线程设置的栈的大小,默认为0 
      this.stackSize = stackSize;
      // 设置线程id
      tid = nextThreadID();
  }

start():

 public synchronized void start() {
        /**
         * This method is not invoked for the main method thread or "system"
         * group threads created/set up by the VM. Any new functionality added
         * to this method in the future may have to also be added to the VM.
         *
         * A zero status value corresponds to state "NEW".
         */
        // 一个新建的未启动的线程的threadStatus必须为0
        // 0 就代表“new”
        if (threadStatus != 0)
            throw new IllegalThreadStateException();

        /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. */
        // 将当前线程加入group
        // group中的启动线程数++ 未启动线程数--
        group.add(this);

        boolean started = false;
        try {
            // 调用native方法
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }

exit():

/**
 * This method is called by the system to give a Thread
 * a chance to clean up before it actually exits.
 */
private void exit() {
    if (group != null) {
        group.threadTerminated(this);
        group = null;
    }
    /* Aggressively null out all reference fields: see bug 4006245 */
    target = null;
    /* Speed the release of some of these resources */
    threadLocals = null;
    inheritableThreadLocals = null;
    inheritedAccessControlContext = null;
    blocker = null;
    uncaughtExceptionHandler = null;
}

exit()方法没啥可说的,更重要的是其中调用的group.threadTerminated(this);

 /**
     * Notifies the group that the thread {@code t} has terminated.
     *
     * <p> Destroy the group if all of the following conditions are
     * true: this is a daemon thread group; there are no more alive
     * or unstarted threads in the group; there are no subgroups in
     * this thread group.
     *
     * @param  t
     *         the Thread that has terminated
     */
void threadTerminated(Thread t) {
        synchronized (this) {
            // 从group中删除当前线程
            remove(t);

            if (nthreads == 0) {
                // 唤醒其他线程
                // 这就是为什么被join挂起的线程会被唤醒
                notifyAll();
            }
            if (daemon && (nthreads == 0) &&
                (nUnstartedThreads == 0) && (ngroups == 0))
            {
                destroy();
            }
        }
    }

sleep():

public static void sleep(long millis, int nanos)
    throws InterruptedException {
        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }
        if (nanos < 0 || nanos > 999999) {
            throw new IllegalArgumentException(
                                "nanosecond timeout value out of range");
        }
        if (nanos >= 500000 || (nanos != 0 && millis == 0)) {
            millis++;
        }
        // 直接调用native方法
        sleep(millis);
    }

stop():
stop( )方法是停止线程的执行,这个方法是一个不推荐使用的方法,已经被废弃了,因为使用该方法会出现异常情况。
因为它是天生不安全的。停止一个线程会导致它解锁它所锁定的所有monitor(当一个ThreadDeath Exception沿着栈向上传播时会解锁monitor),如果这些被释放的锁所保护的objects有任何一个进入一个不一致的状态,其他将要访问该objects的线程也会以一种不一致的状态来访问这些objects。这种objects称为“被损坏了”。当线程对被损坏的objects上做操作时,可能会产生意想不到的结果,这些行为可能是很严重的,并且难以探测到,

@Deprecated
  public final void stop() {
      SecurityManager security = System.getSecurityManager();
      if (security != null) {
        //检查是否有权限
          checkAccess();
          if (this != Thread.currentThread()) {
              security.checkPermission(SecurityConstants.STOP_THREAD_PERMISSION);
          }
      }
      // A zero status value corresponds to "NEW", it can't change to
      // not-NEW because we hold the lock.
      if (threadStatus != 0) {
          resume(); // Wake up thread if it was suspended; no-op otherwise
      }
      // The VM can handle all thread states
      stop0(new ThreadDeath());
  }

join():

join方法是等待该线程执行,直到超时或者终止,可以作为线程通信的一种方式,A线程调用B线程的join(阻塞),等待B完成后再往下执行。 join()方法中重载了多个方法,但是主要的方法是下面的方法。

public final synchronized void join(long millis)
   throws InterruptedException {
       //得到当前的系统给时间
       long base = System.currentTimeMillis();
       long now = 0;
       if (millis < 0) {
           throw new IllegalArgumentException("timeout value is negative");
       }
       if (millis == 0) {
           //如果是活跃的的
           while (isAlive()) {
           //无限期的等待
               wait(0);
           }
       } else {
           while (isAlive()) {
               long delay = millis - now;
               if (delay <= 0) {
                   break;
               }
               //有限期的进行等待
               wait(delay);
               now = System.currentTimeMillis() - base;
           }
       }
   }

interrupt():

public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();
    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

interrupt()方法是中断当前的线程, 其实Thread类中有三个方法,比较容易混淆,在这里解释一下。

  • interrupt: 置为中断状态
  • isInterrupt: 线程是否中断
  • interrupted: 返回线程的上次的中断状态,并清除中断状态。

一般来说,阻塞函数:如Thread.sleep、Thread.join、Object.wait等在检查到线程的中断状态的时候,会抛出InteruptedExeption, 同时会清除线程的中断状态。

对于InterruptedException的处理,可以有两种情况:

外层代码可以处理这个异常,直接抛出这个异常即可
如果不能抛出这个异常,比如在run()方法内,因为在得到这个异常的同时,线程的中断状态已经被清除了,需要保留线程的中断状态,则需要调用Thread.currentThread().interrupt()

threadlocal

以下内容参考: https://juejin.cn/post/6844903552477822989 https://www.jianshu.com/p/9d289d33f696

threadLocal是每线程独有的

DlmQnP.png

每个Thread维护一个ThreadLocalMap,存储在ThreadLocalMap内的就是一个以Entry为元素的数组,Entry就是一个key-value结构,key为ThreadLocal,value为存储的值。类比HashMap的实现,其实就是每个线程借助于一个哈希表,存储线程独立的值。

这里ThreadLocal和key之间的线是虚线,因为Entry是继承了WeakReference实现的,当ThreadLocalRef销毁时,指向堆中ThreadLocal实例的唯一一条强引用消失了,只有Entry有一条指向ThreadLocal实例的弱引用,那么这里ThreadLocal实例是可以被GC掉的。这时Entry里的key为null了,但是enrty本身还是有一个强引用链:Thread Ref -> Thread -> ThreaLocalMap -> Entry -> value,如果线程迟迟没有死亡,那么永远无法回收,造成内存泄漏。下面会讲。

ThreadLocalMap用来存储用户的value,这个map的引用在Thread类里,是全线程唯一的。

static class ThreadLocalMap {
    // 注意虚引用
    static class Entry extends WeakReference<ThreadLocal<?>> {
        /** The value associated with this ThreadLocal. */
        Object value;

        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }
    /**
     * The table, resized as necessary.
     * table.length MUST always be a power of two.
     */
    // 主要数据结构就是一个Entry的数组table;
    private Entry[] table;

    ...

几个方法一起说了:


public void set(T value) {
    // 获取当前的Thread对象,通过getMap获取Thread内的ThreadLocalMap
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }


public T get() {
    // 获取当前的Thread对象,通过getMap获取Thread内的ThreadLocalMap
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            // 如果map已经存在,以当前的ThreadLocal为键,获取Entry对象,并从从Entry中取出值
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        // 否则,调用setInitialValue进行初始化。
        return setInitialValue();
    }

ThreadLocalMap getMap(Thread t) {
        // ThreadLocalMap在当前线程被所有ThreadLocal共享
        return t.threadLocals;
}

void createMap(Thread t, T firstValue) {
    // 初始化map,构建table与Enrty
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

private T setInitialValue() {
    // 首先是调用initialValue生成一个初始的value值,深入initialValue函数,我们可知它就是返回一个null;
    T value = initialValue();
    Thread t = Thread.currentThread();
    // 然后还是在get以下Map
    ThreadLocalMap map = getMap(t);
    if (map != null)
    // 如果map存在,则直接map.set
        map.set(this, value);
    else
    // 如果不存在则会调用createMap创建ThreadLocalMap
        createMap(t, value);
    return value;
}

每一个ThreadLocal对象是如何区分的:

    //java提供的,可以用原子方式更新的 int值的类。
    private static AtomicInteger nextHashCode = new AtomicInteger();
    private static final int HASH_INCREMENT = 0x61c88647;
    private final int threadLocalHashCode = nextHashCode();

    private static int nextHashCode() {
        //原子性加一
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }

对于每一个ThreadLocal对象,都有一个final的int值threadLocalHashCode;AtomicInteger 是static修饰的,全局唯一,每一次加一之后的值仍然可用,并且保证原子性。所以,每一个线程的ThreadLocal对象都有唯一的threadLocalHashCode值。

为什么不使用当前线程作为key?

上面知道,每一个线程周期,都有一个全线程唯一的map用于存储value,如果线程内多个ThreadLocal对象set了value,那么以当前线程作为键是不能保证key的唯一性的;而每一个ThreadLocal对象都可以由threadLocalHashCode进行唯一区分,所以key使用为ThreadLocal方便存取。

解决内存泄漏

private Entry getEntry(ThreadLocal<?> key) {
    // 首先是计算索引位置i
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    if (e != null && e.get() == key)
    // 根据获取Entry,如果Entry存在且Entry的key恰巧等于ThreadLocal,那么直接返回Entry对象;
        return e;
    else
    // 否则,也就是在此位置上找不到对应的Entry,那么就调用getEntryAfterMiss。
        return getEntryAfterMiss(key, i, e);
}


private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;

    while (e != null) {
        ThreadLocal<?> k = e.get();
        if (k == key)
        // 如果k==key,那么代表找到了这个所需要的Entry,直接返回;
            return e;
        if (k == null)
        // 如果k==null,那么证明这个Entry中key已经为null,那么这个Entry就是一个过期对象,这里调用expungeStaleEntry清理该Entry。
        // 这里解决了内存泄漏问题
        // ThreadLocal实例由于只有Entry中的一条弱引用指着,那么就会被GC掉,Entry的key没了,value可能会内存泄露的
        // 其实在每一个get,set操作时都会不断清理掉这种key为null的Entry的。
            expungeStaleEntry(i);
        else
            i = nextIndex(i, len);
        e = tab[i];
    }
    return null;
}

private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;

    // expunge entry at staleSlot
    // 这段主要是将i位置上的Entry的value设为null,Entry的引用也设为null,那么系统GC的时候自然会清理掉这块内存;
    tab[staleSlot].value = null;
    tab[staleSlot] = null;
    size--;

    // Rehash until we encounter null
    // 这段就是扫描位置staleSlot之后,null之前的Entry数组,清除每一个key为null的Entry,同时若是key不为空,做rehash,调整其位置。
    Entry e;
    int i;
    for (i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();
        if (k == null) {
            e.value = null;
            tab[i] = null;
            size--;
        } else {
            int h = k.threadLocalHashCode & (len - 1);
            if (h != i) {
                tab[i] = null;

                // Unlike Knuth 6.4 Algorithm R, we must scan until
                // null because multiple entries could have been stale.
                while (tab[h] != null)
                    h = nextIndex(h, len);
                tab[h] = e;
            }
        }
    }
    return i;
}

我们发现无论是set,get还是remove方法,过程中key为null的Entry都会被擦除,那么Entry内的value也就没有强引用链,GC时就会被回收。那么怎么会存在内存泄露呢?但是以上的思路是假设你调用get或者set方法了,很多时候我们都没有调用过,所以最佳实践就是*
1 .使用者需要手动调用remove函数,删除不再使用的ThreadLocal.
2 .还有尽量将ThreadLocal设置成private static的,这样ThreadLocal会尽量和线程本身一起消亡。

InheritableThreadLocal父子线程共享变量

InheritableThreadLocal的源码非常简单,继承自ThreadLocal,重写其中三个方法。

public class InheritableThreadLocal<T> extends ThreadLocal<T> {
    public InheritableThreadLocal() {
    }

    protected T childValue(T var1) {
        return var1;
    }

    ThreadLocalMap getMap(Thread var1) {
        return var1.inheritableThreadLocals;
    }

    void createMap(Thread var1, T var2) {
        var1.inheritableThreadLocals = new ThreadLocalMap(this, var2);
    }
}

InheritableThreadLocal获取的也是ThreadLocalMap, 解决父子线程共享变量的地方实际在thread类里,上文说过:

/* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;

    /*
     * InheritableThreadLocal values pertaining to this thread. This map is
     * maintained by the InheritableThreadLocal class.
     */
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {
        //..........
        Thread parent = currentThread();
        //..........
        // 如果父线程的inheritableThreadLocals != null
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
        // 新线程:`this.inheritableThreadLocals=ThreadLocal.createInheritedMap(parent.inheritableThreadLocals)`
        // 传入父线程的inheritableThreadLocals 
            this.inheritableThreadLocals =
                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
        /* Stash the specified stack size in case the VM cares */
        this.stackSize = stackSize;

        /* Set thread ID */
        tid = nextThreadID();
    }
    static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
        return new ThreadLocalMap(parentMap);
    }

    // 实际上就是构造了一个新的包含所有parentMap元素的新map
    private ThreadLocalMap(ThreadLocalMap parentMap) {
        // 得到parent的数组
        Entry[] parentTable = parentMap.table;
        int len = parentTable.length;
        setThreshold(len);
        table = new Entry[len];

        for (int j = 0; j < len; j++) {
            Entry e = parentTable[j];
            if (e != null) {
                @SuppressWarnings("unchecked")
                ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                if (key != null) {
                    // InheritableThreadLocal重写了这个方法返回原值
                    Object value = key.childValue(e.value);
                    Entry c = new Entry(key, value);
                    //计算hash位置,与ThreadLocal的set方法一样
                    int h = key.threadLocalHashCode & (len - 1);
                    while (table[h] != null)
                        h = nextIndex(h, len);
                    table[h] = c;
                    size++;
                }
            }   
        }
    }