avatar

目录
Java多线程基础

Java多线程基础

### 线程的出现

  • 线程可以认为是轻量级的进程,所以线程的创建、销毁比进程更快
  • 在多核CPU中,利用多线程可以实现真正意义的并行执行(单核cpu中线程是通过cpu时间片不断切换执行的,在任意时刻只会有一个线程会被cpu调度)
  • 在一个应用进程中,会存在多个同时执行的任务,如果其中一个任务被阻塞,将会导致其他任务也会被阻塞。通过对不同任务创建不同的线程去处理,可以提升程序的实时性

线程生命周期(6种状态)

NEW、RUNNABLE、BLOCKED、WAITING、TIME_WAITING、TERMINATED(详情见Thread源码)

  • NEW:初始状态,线程,但是还没有调用线程的start方法
  • RUNNABLE:运行状态,JAVA把操作系统中线程的就绪和运行两种状态统一称为“运行中”
  • BLOCKED:阻塞状态,表示线程进入等待状态,线程因为某种原因放弃了CPU使用权,阻塞分以下几种情况
    • 等待阻塞:运行的线程执行wait方法(只有获得锁,才能执行wait方法),jvm会把当前的线程放入到等待队列
    • 同步阻塞:运行的线程在获取对象的同步锁时,如果该同步锁被其他线程占用了,那么jvm会把当前线程放入到锁池中
    • 其他阻塞:运行的线程执行了Thread.sleep或者join方法,或者发出了IO请求时,jvm会帮当前线程设置为阻塞状态,当sleep结束、join线程终止、io处理完毕,线程恢复
    • TIME_WAITING:超时等待状态,超时后自动返回
    • TERMINATED:终止状态,表示当前线程执行完毕
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
* A thread state. A thread can be in one of the following states:
* <ul>
* <li>{@link #NEW}<br>
* A thread that has not yet started is in this state.
* </li>
* <li>{@link #RUNNABLE}<br>
* A thread executing in the Java virtual machine is in this state.
* </li>
* <li>{@link #BLOCKED}<br>
* A thread that is blocked waiting for a monitor lock
* is in this state.
* </li>
* <li>{@link #WAITING}<br>
* A thread that is waiting indefinitely for another thread to
* perform a particular action is in this state.
* </li>
* <li>{@link #TIMED_WAITING}<br>
* A thread that is waiting for another thread to perform an action
* for up to a specified waiting time is in this state.
* </li>
* <li>{@link #TERMINATED}<br>
* A thread that has exited is in this state.
* </li>
* </ul>
*
* <p>
* A thread can be in only one state at a given point in time.
* These states are virtual machine states which do not reflect
* any operating system thread states.
*
* @since 1.5
* @see #getState
*/
public enum State {
/**
* Thread state for a thread which has not yet started.
*/
NEW,

/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
RUNNABLE,

/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
BLOCKED,

/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
*/
WAITING,

/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
TIMED_WAITING,

/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
TERMINATED;
}

查看线程的状态及jvm内存监控可以参考以下命令:

  • jps:查看本机的Java中进程信息

  • jstack:打印线程的栈信息,制作线程Dump(jstack pid<进程id>)

  • jmap:打印内存映射,制作堆Dump(jmap -dump:format-b,file=保存文件路径)

  • jstat:性能监控工具

  • jhat:内存分析工具 jhat -port file<dump文件>

  • jconsole:简易的可视化控制台

  • jvisualvm:功能强大的控制台

线程的创建

  • 继承Thread类(本质是对Runnable接口的实现)
  • 实现Runnable接口
  • 使用ExecutorService
  • Callable、Future(带返回值)

启动线程的唯一方法就是通过Thread类的start方法(srart方法是一个native方法,它会启动一个新的线程,并执行run方法)

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/** Thread class source code */
public class Thread implements Runnable {
/* Make sure registerNatives is the first thing <clinit> does. */
private static native void registerNatives();
static {
registerNatives();
}

/* What will be run. */
private Runnable target;

public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}

public Thread(Runnable target, String name) {
init(null, target, name, 0);
}

public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();

/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);

boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
/** native method */
private native void start0();

@Override
public void run() {
if (target != null) {
target.run();
}
}
}

jvm线程创建、启动源码

打开OpenJDK / jdk8 / jdk8 / jdk源码,找到文件->src/share/native/java/lang/Thread.c @ 2362:00cd9dc3c2b5,可以看到java中Thread类中调用的本地方法,在虚拟机jvm.cpp文件中的定义,以及映射关系,本地方法通过JNI技术暴露给外部调用。

Thread.c文件源码:

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include "jni.h"
#include "jvm.h"
#include "java_lang_Thread.h"

#define THD "Ljava/lang/Thread;"
#define OBJ "Ljava/lang/Object;"
#define STE "Ljava/lang/StackTraceElement;"
#define ARRAY_LENGTH(a) (sizeof(a)/sizeof(a[0]))

static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
};

#undef THD
#undef OBJ
#undef STE

JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
(*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}

/src/share/vm/prims/jvm.cpp源码(线程启动):

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_StartThread");
JavaThread *native_thread = NULL;

// We cannot hold the Threads_lock when we throw an exception,
// due to rank ordering issues. Example: we might need to grab the
// Heap_lock while we construct the exception.
bool throw_illegal_thread_state = false;

// We must release the Threads_lock before we can post a jvmti event
// in Thread::start.
{
// Ensure that the C++ Thread and OSThread structures aren't freed before
// we operate.
MutexLocker mu(Threads_lock);

// Since JDK 5 the java.lang.Thread threadStatus is used to prevent
// re-starting an already started thread, so we should usually find
// that the JavaThread is null. However for a JNI attached thread
// there is a small window between the Thread object being created
// (with its JavaThread set) and the update to its threadStatus, so we
// have to check for this
if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
throw_illegal_thread_state = true;
} else {
// We could also check the stillborn flag to see if this thread was already stopped, but
// for historical reasons we let the thread detect that itself when it starts running

jlong size =
java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
// Allocate the C++ Thread structure and create the native thread. The
// stack size retrieved from java is signed, but the constructor takes
// size_t (an unsigned type), so avoid passing negative values which would
// result in really large stacks.
size_t sz = size > 0 ? (size_t) size : 0;
native_thread = new JavaThread(&thread_entry, sz);

// At this point it may be possible that no osthread was created for the
// JavaThread due to lack of memory. Check for this situation and throw
// an exception if necessary. Eventually we may want to change this so
// that we only grab the lock if the thread was created successfully -
// then we can also do this check and throw the exception in the
// JavaThread constructor.
if (native_thread->osthread() != NULL) {
// Note: the current thread is not being used within "prepare".
native_thread->prepare(jthread);
}
}
}

if (throw_illegal_thread_state) {
THROW(vmSymbols::java_lang_IllegalThreadStateException());
}

assert(native_thread != NULL, "Starting null thread?");

if (native_thread->osthread() == NULL) {
// No one should hold a reference to the 'native_thread'.
delete native_thread;
if (JvmtiExport::should_post_resource_exhausted()) {
JvmtiExport::post_resource_exhausted(
JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
"unable to create new native thread");
}
THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
"unable to create new native thread");
}

Thread::start(native_thread);

JVM_END

从源码native_thread = new JavaThread(&thread_entry, sz);可以看出,本地线程native_thread是通过创建JAVA线程来创建的,通过Thread::start(native_thread)来启动线程,最终通过操作系统来创建线程。

/src/share/vm/runtime/thread.cpp源码:

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void Thread::start(Thread* thread) {
trace("start", thread);
// Start is different from resume in that its safety is guaranteed by context or
// being called from a Java method synchronized on the Thread object.
if (!DisableStartThread) {
if (thread->is_Java_thread()) {
// Initialize the thread state to RUNNABLE before starting this thread.
// Can not set it after the thread started because we do not know the
// exact thread state at that time. It could be in MONITOR_WAIT or
// in SLEEPING or some other state.
java_lang_Thread::set_thread_status(((JavaThread*)thread)->threadObj(),
java_lang_Thread::RUNNABLE);
}
os::start_thread(thread);
}
}

jvm中断线程方法

/src/share/vm/prims/jvm.cpp源码,jvm最后调用Thread.cpp中的is_interrupted方法去中断某个线程

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_Interrupt");

// Ensure that the C++ Thread and OSThread structures aren't freed before we operate
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
// We need to re-resolve the java_thread, since a GC might have happened during the
// acquire of the lock
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr != NULL) {
Thread::interrupt(thr);
}
JVM_END

JVM_QUICK_ENTRY(jboolean, JVM_IsInterrupted(JNIEnv* env, jobject jthread, jboolean clear_interrupted))
JVMWrapper("JVM_IsInterrupted");

// Ensure that the C++ Thread and OSThread structures aren't freed before we operate
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
// We need to re-resolve the java_thread, since a GC might have happened during the
// acquire of the lock
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr == NULL) {
return JNI_FALSE;
} else {
return (jboolean) Thread::is_interrupted(thr, clear_interrupted != 0);
}
JVM_END

/src/share/vm/runtime/thread.cpp源码,最终是调用操作系统的is_interrupted方法中断线程

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
void Thread::interrupt(Thread* thread) {
trace("interrupt", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
os::interrupt(thread);
}

bool Thread::is_interrupted(Thread* thread, bool clear_interrupted) {
trace("is_interrupted", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
// Note: If clear_interrupted==false, this simply fetches and
// returns the value of the field osthread()->interrupted().
return os::is_interrupted(thread, clear_interrupted);
}

/src/os/linux/vm/os_linux.cpp源代码(这里已linux操作系统平台为例),通过调用OSThread类的set_interrupted(true),将成员变量_interrupted设置为true,来设置线程中断,默认为false

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");

OSThread* osthread = thread->osthread();

if (!osthread->interrupted()) {
osthread->set_interrupted(true);
// More than one thread can get here with the same value of osthread,
// resulting in multiple notifications. We do, however, want the store
// to interrupted() to be visible to other threads before we execute unpark().
OrderAccess::fence();
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL) slp->unpark() ;
}

// For JSR166. Unpark even if interrupt status already was set
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();

ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;

}

bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");

OSThread* osthread = thread->osthread();

bool interrupted = osthread->interrupted();

if (interrupted && clear_interrupted) {
osthread->set_interrupted(false);
// consider thread->_SleepEvent->reset() ... optional optimization
}

return interrupted;
}

/src/share/vm/runtime/osThread.hpp源代码

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class OSThread: public CHeapObj<mtThread> {
friend class VMStructs;
private:
/** 省略其他代码 */
volatile ThreadState _state; // Thread state *hint*
volatile jint _interrupted; // Thread.isInterrupted state

// Note: _interrupted must be jint, so that Java intrinsics can access it.
// The value stored there must be either 0 or 1. It must be possible
// for Java to emulate Thread.currentThread().isInterrupted() by performing
// the double indirection Thread::current()->_osthread->_interrupted.

// Methods
public:
void set_state(ThreadState state) { _state = state; }
ThreadState get_state() { return _state; }

/** 省略其他代码 */

volatile bool interrupted() const { return _interrupted != 0; }
void set_interrupted(bool z) { _interrupted = z ? 1 : 0; }

/** ....... 剩余代码省略 */
}

线程Interrupte之后调用Thread.interrupted()复位

Thread.java

java
1
2
3
4
5
6
7
8
Thread.interrupted();  // 复位,回到初始状态

public static boolean interrupted() {
return currentThread().isInterrupted(true);
}

// 本地方法,参考上面已经提到的jvm线程中断源码
private native boolean isInterrupted(boolean ClearInterrupted);

中断一个处于阻塞状态(sleep/wait/join/…)的线程,会先复位,再抛出异常(为什么会抛出异常可以参考jvm.cpp文件中的JVM_Sleep方法)

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
JVMWrapper("JVM_Sleep");

if (millis < 0) {
THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
}

if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}

// Save current thread state and restore it at the end of this block.
// And set new thread state to SLEEPING.
JavaThreadSleepState jtss(thread);

#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__sleep__begin, millis);
#else /* USDT2 */
HOTSPOT_THREAD_SLEEP_BEGIN(
millis);
#endif /* USDT2 */

EventThreadSleep event;

if (millis == 0) {
// When ConvertSleepToYield is on, this matches the classic VM implementation of
// JVM_Sleep. Critical for similar threading behaviour (Win32)
// It appears that in certain GUI contexts, it may be beneficial to do a short sleep
// for SOLARIS
if (ConvertSleepToYield) {
os::yield();
} else {
ThreadState old_state = thread->osthread()->get_state();
thread->osthread()->set_state(SLEEPING);
os::sleep(thread, MinSleepInterval, false);
thread->osthread()->set_state(old_state);
}
} else {
ThreadState old_state = thread->osthread()->get_state();
thread->osthread()->set_state(SLEEPING);
if (os::sleep(thread, millis, true) == OS_INTRPT) {
// An asynchronous exception (e.g., ThreadDeathException) could have been thrown on
// us while we were sleeping. We do not overwrite those.
if (!HAS_PENDING_EXCEPTION) {
if (event.should_commit()) {
event.set_time(millis);
event.commit();
}
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1);
#else /* USDT2 */
HOTSPOT_THREAD_SLEEP_END(
1);
#endif /* USDT2 */
// TODO-FIXME: THROW_MSG returns which means we will not call set_state()
// to properly restore the thread state. That's likely wrong.
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
}
thread->osthread()->set_state(old_state);
}
if (event.should_commit()) {
event.set_time(millis);
event.commit();
}
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0);
#else /* USDT2 */
HOTSPOT_THREAD_SLEEP_END(
0);
#endif /* USDT2 */
JVM_END

实际应用

  • 线程池
  • zookeeper责任链,异步化任务
  • 跑批脚本、对账文件

并发编程相关文章持续更新中…

在这里插入图片描述

文章作者: Super Wein(星痕)
文章链接: https://superspeedone.github.io/2020/03/12/Java%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B%E5%9F%BA%E7%A1%80/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Super .Wein
打赏
  • 微信
    微信
  • 支付寶
    支付寶

评论