LockSupport简介

LockSupport可以控制线程的状态,从而达到线程在等待唤醒之间切换的目的,并且不用担心阻塞和唤醒操作的顺序,但要注意连续多次唤醒的效果和一次唤醒是一样的。

注意:unpark 函数可以先于 park 调用。

【LockSupport与的区别】

  • LockSupport.park和unpark不需要在同步代码块中,wait和notify是需要的。
  • LockSupport的pork和unpark是针对线程的,而wait和notify是可以是任意对象。
  • LockSupport的unpark可以让指定线程被唤醒,但是notify是随机唤醒一个,notifyAll是全部唤醒,不够灵活。

park和unpark都是调用native方法,由JVM实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// java/util/concurrent/locks/LockSupport.java
public static void park() {
    U.park(false, 0L);
}

// jdk/internal/misc/Unsafe.java
@HotSpotIntrinsicCandidate
public native void park(boolean isAbsolute, long time);

// java/util/concurrent/locks/LockSupport.java
public static void unpark(Thread thread) {
    if (thread != null)
        U.unpark(thread);
}

// jdk/internal/misc/Unsafe.java
@HotSpotIntrinsicCandidate
public native void unpark(Object thread);

Parker

1
2
3
4
5
6
// src/hotspot/share/runtime/thread.hpp
  // JSR166 per-thread parker
 private:
  Parker*    _parker;
 public:
  Parker*     parker() { return _parker; }

线程内持有一个Parker类型的指针_parker,Parker类的定义里有个_counter变量:

1
2
3
4
5
6
public:
  Parker() : PlatformParker() {
    _counter       = 0 ;
    FreeNext       = NULL ;
    AssociatedWith = NULL ;
  }

unpark

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// src/hotspot/share/prims/unsafe.cpp
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread)) {
  Parker* p = NULL;

  if (jthread != NULL) {
    ThreadsListHandle tlh;
    JavaThread* thr = NULL;
     // oop是oopDesc*的别名,oopDesc是object类的基类,是对Java对象的描述,便于在C++中访问对象中的域。
    oop java_thread = NULL;
      // 将线程引用转为Java线程对象
    (void) tlh.cv_internal_thread_to_JavaThread(jthread, &thr, &java_thread);
    if (java_thread != NULL) {
        // 尝试通过缓存的_parker的偏移量(_park_event_offset)来获取_parker的地址
      jlong lp = java_lang_Thread::park_event(java_thread);
      if (lp != 0) {
          // 将地址转为Parker指针
        p = (Parker*)addr_from_java(lp);
      } else {
          // 如果没有缓存_parker的偏移量(偏移量默认为0)
        if (thr != NULL) {
          p = thr->parker();
          if (p != NULL) {
              // 缓存_parker的偏移量
            java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
          }
        }
      }
    }
  }

  if (p != NULL) {
    HOTSPOT_THREAD_UNPARK((uintptr_t) p);
      // 调用Parker的unpark方法
    p->unpark();
  }
} UNSAFE_END
1
2
3
4
5
6
7
// src/hotspot/share/classfile/javaClasses.cpp
jlong java_lang_Thread::park_event(oop java_thread) {
  if (_park_event_offset > 0) {
    return java_thread->long_field(_park_event_offset);
  }
  return 0;
}

oopDesc是object类的基类,是对Java对象的描述,便于在C++中访问对象中的域。上面的代码主要是利用_parker在线程对象内的偏移量来获取Parker对象_parker,并调用其unpark方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// src/hotspot/os/posix/os_posix.cpp
void Parker::unpark() {
  int status = pthread_mutex_lock(_mutex);
  assert_status(status == 0, status, "invariant");
  const int s = _counter;
  _counter = 1;
  // must capture correct index before unlocking
  int index = _cur_index;
  status = pthread_mutex_unlock(_mutex);
  assert_status(status == 0, status, "invariant");

// s记录的是unpark之前的_counter数,如果s < 1,说明有可能该线程在等待状态,需要唤醒。
  if (s < 1 && index != -1) {
    // 发信号唤醒线程
    status = pthread_cond_signal(&_cond[index]);
    assert_status(status == 0, status, "invariant");
  }
}

_cur_index 有3个值:-1,0,1,默认是-1。_cur_index代表被使用cond的index。

【pthread_mutex_unlock、pthread_cond_signal先后顺序】

解锁互斥量mutex和发出唤醒信号condition_signal是两个单独的操作,那么就存在一个顺序的问题。谁先随后可能会产生不同的结果。如下:

  1. 按照 condition_signal(); unlock(mutext)顺序,当等待线程被唤醒时,它试图锁住mutex,但是如果此时mutex还未解锁,则线程又进入睡眠,mutex成功解锁后,此线程在再次被唤醒并锁住mutex,从而从condition_wait()中返回。
  2. 按照 unlock(mutex); condition_signal()顺序, 当等待的线程被唤醒时,因为mutex已经解锁,因此被唤醒的线程很容易就锁住了mutex然后从conditon_wait()中返回了。

对于1,等待线程可能会发生2次的上下文切换,严重影响性能。可以使用wait morphing优化:如果线程被唤醒但是不能锁住mutex,则线程被转移(morphing)到互斥量mutex的等待队列中,避免了上下文的切换造成的开销。

对于2,是一种优化,不过可能引起的问题是线程的优先级倒置,实时系统对可预测性要求高,而普通的应用程序不是问题。

pthread简介

POSIX线程(POSIX threads),简称Pthreads,是线程的POSIX标准。该标准定义了创建和操纵线程的一整套API。在类Unix操作系统(Unix、Linux、Mac OS X等)中,都使用Pthreads作为操作系统的线程。

Pthreads API中的函数可以非正式的划分为三大类:

  • 线程管理(Thread management): 第一类函数直接用于线程:创建(creating),分离(detaching),连接(joining)等等。包含了用于设置和查询线程属性(可连接,调度属性等)的函数。
  • 互斥量(Mutexes): 第二类函数是用于线程同步的,称为互斥量(mutexes),是"mutual exclusion"的缩写。 Mutex函数提供了创建,销毁,锁定和解锁互斥量的功能。同时还包括了一些用于设定或修改互斥量属性的函数。
  • 条件变量(Condition variables):第三类函数处理共享一个互斥量的线程间的通信,基于程序员指定的条件。这类函数包括指定的条件变量的创建,销毁,等待和受信(signal)。设置查询条件变量属性的函数也包含其中。

条件变量

条件变量对于 wait 端:

  1. 必须与 mutex 一起使用,该布尔表达式的读写需受此 mutex 保护。
  2. 在 mutex 已上锁的时候才能调用 wait()。
  3. 把判断布尔条件和 wait() 放到 while 循环中。

对于 signal/broadcast 端:

  1. 不一定要在 mutex 已上锁的情况下调用 signal (理论上)。
  2. 在 signal 之前一般要修改布尔表达式。
  3. 修改布尔表达式通常要用 mutex 保护(至少用作 full memory barrier)。
  4. 注意区分 signal 与 broadcast:“broadcast 通常用于表明状态变化,signal 通常用于表示资源可用。(broadcast should generally be used to indicate state change rather than resource availability。)

总之,使用条件变量,调用 signal() 的时候无法知道是否已经有线程等待在 wait() 上。因此一般总是 要先修改“条件”,使其为 true,再调用 signal();这样 wait 线程先检查“条件”,只有当条件不成立时才去 wait(),避免了丢事件的可能 。换言之,通过使用“条件”,将边沿触发(edge trigger)改为电平触发(level trigger)。这里 “修改条件”和“检查条件”都必须在 mutex 保护下进行,而且这个 mutex 必须用于配合 wait()

park

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// src/hotspot/os/posix/os_posix.cpp
void Parker::park(bool isAbsolute, jlong time) {
    // 原子性地将_counter置为0,如果_counter原值大于0,说明已获得许可,直接返回
  if (Atomic::xchg(0, &_counter) > 0) return;

  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;

    // 如果线程被中断,直接返回
  if (Thread::is_interrupted(thread, false)) {
    return;
  }

  struct timespec absTime;
    // 如果isAbsolute为true,time表示等到某个时刻,否则time表示等待多久
    // 如果time不合法,或是要等到纪元时间时间(显然已过),则直接返回
  if (time < 0 || (isAbsolute && time == 0)) { // don't wait at all
    return;
  }
    // 将time统一转为绝对时刻
  if (time > 0) {
    to_abstime(&absTime, time, isAbsolute);
  }

    // 进入安全区
  ThreadBlockInVM tbivm(jt);

    // 再次判断线程是否被中断,如果没有被中断,尝试获得互斥锁,如果获取失败,直接返回
  if (Thread::is_interrupted(thread, false) ||
      pthread_mutex_trylock(_mutex) != 0) {
    return;
  }

  int status;
    // 再次检查_counter,如果>0,不需要等待
  if (_counter > 0)  { // no wait needed
      // 将_counter置为0,并释放锁
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert_status(status == 0, status, "invariant");
      // 插入内存屏障,确保_counter的可见性
    OrderAccess::fence();
    return;
  }

   // 设置线程状态为CONDVAR_WAIT
  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
  jt->set_suspend_equivalent();

  assert(_cur_index == -1, "invariant");
  if (time == 0) {
      // 如果time为0,表示不设等待超时
      // 让线程等待_cond[REL_INDEX]信号,线程进入等待状态
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    status = pthread_cond_wait(&_cond[_cur_index], _mutex);
    assert_status(status == 0, status, "cond_timedwait");
  }
  else {
      // 线程进入有超时的等待
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
    assert_status(status == 0 || status == ETIMEDOUT,
                  status, "cond_timedwait");
  }
    // 等待结束后,恢复变量,释放锁
  _cur_index = -1;

  _counter = 0;
  status = pthread_mutex_unlock(_mutex);
  assert_status(status == 0, status, "invariant");
    // 插入内存屏障
  OrderAccess::fence();

  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
}

xchg

xchg 在x86平台是直接调用xchg指令:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// src/hotspot/os_cpu/linux_x86/atomic_linux_x86.hpp
template<>
template<typename T>
inline T Atomic::PlatformXchg<4>::operator()(T exchange_value,
                                             T volatile* dest,
                                             atomic_memory_order order) const {
  STATIC_ASSERT(4 == sizeof(T));
  __asm__ volatile (  "xchgl (%2),%0"
                    : "=r" (exchange_value)
                    : "0" (exchange_value), "r" (dest)
                    : "memory");
  return exchange_value;
}

内存屏障

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// src/hotspot/os_cpu/linux_x86/orderAccess_linux_x86.hpp
inline void OrderAccess::fence() {
   // always use locked addl since mfence is sometimes expensive
#ifdef AMD64
  __asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory");
#else
  __asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory");
#endif
  compiler_barrier();
}

参考资料