Appendix-Low-Level-Concurrency
附录: 并发底层原理
尽管不建议你自己编写底层
Java 并发代码,但是这样通常有助于了解它是如何工作的。
并发编程 章节中介绍了一些用于高级并发的概念,包括为
在
什么是线程?
并发将程序划分成独立分离运行的任务。每个任务都由一个 执行线程 来驱动,我们通常将其简称为 线程 。而一个 线程 就是操作系统进程中单一顺序的控制流。因此,单个进程可以有多个并发执行的任务,但是你的程序使得每个任务都好像有自己的处理器一样。此线程模型为编程带来了便利,它简化了在单一程序中处理变戏法般的多任务过程。操作系统则从处理器上分配时间片到你程序的所有线程中。
Thread(线程) 是将任务关联到处理器的软件概念。虽然创建和使用
- 程序计数器,指明要执行的下一个
JVM 字节码指令。 - 用于支持
Java 代码执行的栈,包含有关此线程已到达当时执行位置所调用方法的信息。它也包含每个正在执行的方法的所有局部变量( 包括原语和堆对象的引用) 。每个线程的栈通常在64K 到1M 之间 1 。 - 第二个则用于
native code (本机方法代码)执行的栈 - thread-local variables (线程本地变量)的存储区域
- 用于控制线程的状态管理变量
包括 main()
在内的所有代码都会在某个线程内运行。 每当调用一个方法时,当前程序计数器被推到该线程的栈上,然后栈指针向下移动以足够来创建一个栈帧,其栈帧里存储该方法的所有局部变量,参数和返回值。所有基本类型变量都直接在栈上,虽然方法中创建(或方法中使用)对象的任何引用都位于栈帧中,但对象本身存于堆中。这仅且只有一个堆,被程序中所有线程所共享。
除此以外,线程必须绑定到操作系统,这样它就可以在某个时候连接到处理器。这是作为线程构建过程的一部分为你管理的。
最佳线程数
如果你查看第main()
方法的主线程,它巧妙地将其用作额外的并行流range()
方法中的上限值,你会看到没有创建额外的线程。这是为什么?
我们可以查出当前机器上处理器的数量:
// lowlevel/NumberOfProcessors.java
public class NumberOfProcessors {
public static void main(String[] args) {
System.out.println(
Runtime.getRuntime().availableProcessors());
}
}
/* Output:
8
*/
在我的机器上(使用英特尔酷睿
你的操作系统可能有办法来查出关于处理器的更多信息,例如,在
事实证明
定义了 “逻辑处理器” 数量的
我可以创建多少个线程?
Thread(线程)对象的最大部分是用于执行方法的
// lowlevel/ThreadSize.java
// {ExcludeFromGradle} Takes a long time or hangs
import java.util.concurrent.*;
import onjava.Nap;
public class ThreadSize {
static class Dummy extends Thread {
@Override
public void run() { new Nap(1); }
}
public static void main(String[] args) {
ExecutorService exec =
Executors.newCachedThreadPool();
int count = 0;
try {
while(true) {
exec.execute(new Dummy());
count++;
}
} catch(Error e) {
System.out.println(
e.getClass().getSimpleName() + ": " + count);
System.exit(0);
} finally {
exec.shutdown();
}
}
}
只要你不断递交任务,execute()
方法以开始任务,如果线程池无可用线程,则分配一个新线程。执行的暂停方法 pause()
运行时间必须足够长,使任务不会开始即完成
我并不总是能够在我尝试的每台机器上造成内存不足的错误。在一台机器上,我看到这样的结果
> java ThreadSize
OutOfMemoryError: 2816
我们可以使用
>java -Xss64K ThreadSize
OutOfMemoryError: 4952
如果我们将线程栈大小增加到
>java -Xss2M ThreadSize
OutOfMemoryError: 722
>java -Xss320K ThreadSize
OutOfMemoryError: 2816
你还可以使用
>java -Xss64K -Xmx5M ThreadSize
OutOfMemoryError: 5703
请注意的是操作系统还可能对允许的线程数施加限制。
因此
The WorkStealingPool ( 工作窃取线程池)
这是一个
// lowlevel/WorkStealingPool.java
import java.util.stream.*;
import java.util.concurrent.*;
class ShowThread implements Runnable {
@Override
public void run() {
System.out.println(
Thread.currentThread().getName());
}
}
public class WorkStealingPool {
public static void main(String[] args)
throws InterruptedException {
System.out.println(
Runtime.getRuntime().availableProcessors());
ExecutorService exec =
Executors.newWorkStealingPool();
IntStream.range(0, 10)
.mapToObj(n -> new ShowThread())
.forEach(exec::execute);
exec.awaitTermination(1, TimeUnit.SECONDS);
}
}
/* Output:
8
ForkJoinPool-1-worker-2
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-2
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-2
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-4
ForkJoinPool-1-worker-2
*/
工作窃取算法允许已经耗尽输入队列中的工作项的线程从其他队列“窃取”工作项。目标是在处理器之间分配工作项,从而最大限度地利用所有可用的处理器来完成计算密集型任务。这项算法也用于
异常捕获
这可能会让你感到惊讶:
// lowlevel/SwallowedException.java
import java.util.concurrent.*;
public class SwallowedException {
public static void main(String[] args)
throws InterruptedException {
ExecutorService exec =
Executors.newSingleThreadExecutor();
exec.submit(() -> {
throw new RuntimeException();
});
exec.shutdown();
}
}
这个程序什么也不输出(然而,如果你用submit()
方法,你就将会看到异常抛出。这说明在线程中抛出异常是很棘手的,需要特别注意的事情。
你无法捕获到从线程逃逸的异常。一旦异常越过了任务的 run()
方法,它就会传递至控制台,除非你采取特殊步骤来捕获此类错误异常。
下面是一个抛出异常的代码,该异常会传递到它的 run()
方法之外,而 main()
方法会显示运行它时会发生什么:
// lowlevel/ExceptionThread.java
// {ThrowsException}
import java.util.concurrent.*;
public class ExceptionThread implements Runnable {
@Override
public void run() {
throw new RuntimeException();
}
public static void main(String[] args) {
ExecutorService es =
Executors.newCachedThreadPool();
es.execute(new ExceptionThread());
es.shutdown();
}
}
/* Output:
___[ Error Output ]___
Exception in thread "pool-1-thread-1"
java.lang.RuntimeException
at ExceptionThread.run(ExceptionThread.java:8)
at java.util.concurrent.ThreadPoolExecutor.runW
orker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Work
er.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
*/
输出是
Exception in thread "pool-1-thread-1" RuntimeException
at ExceptionThread.run(ExceptionThread.java:9)
at ThreadPoolExecutor.runWorker(...)
at ThreadPoolExecutor$Worker.run(...)
at java.lang.Thread.run(Thread.java:745)
即使在 main()
方法体内包裹
// lowlevel/NaiveExceptionHandling.java
// {ThrowsException}
import java.util.concurrent.*;
public class NaiveExceptionHandling {
public static void main(String[] args) {
ExecutorService es =
Executors.newCachedThreadPool();
try {
es.execute(new ExceptionThread());
} catch(RuntimeException ue) {
// This statement will NOT execute!
System.out.println("Exception was handled!");
} finally {
es.shutdown();
}
}
}
/* Output:
___[ Error Output ]___
Exception in thread "pool-1-thread-1"
java.lang.RuntimeException
at ExceptionThread.run(ExceptionThread.java:8)
at java.util.concurrent.ThreadPoolExecutor.runW
orker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Work
er.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
*/
这会产生与前一个示例相同的结果
为解决这个问题,需要改变
当该线程即将死于未捕获的异常时,将自动调用 Thread.UncaughtExceptionHandler.uncaughtException()
方法。为了调用该方法,我们创建一个新的
// lowlevel/CaptureUncaughtException.java
import java.util.concurrent.*;
class ExceptionThread2 implements Runnable {
@Override
public void run() {
Thread t = Thread.currentThread();
System.out.println("run() by " + t.getName());
System.out.println(
"eh = " + t.getUncaughtExceptionHandler());
throw new RuntimeException();
}
}
class MyUncaughtExceptionHandler implements
Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("caught " + e);
}
}
class HandlerThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
System.out.println(this + " creating new Thread");
Thread t = new Thread(r);
System.out.println("created " + t);
t.setUncaughtExceptionHandler(
new MyUncaughtExceptionHandler());
System.out.println(
"eh = " + t.getUncaughtExceptionHandler());
return t;
}
}
public class CaptureUncaughtException {
public static void main(String[] args) {
ExecutorService exec =
Executors.newCachedThreadPool(
new HandlerThreadFactory());
exec.execute(new ExceptionThread2());
exec.shutdown();
}
}
/* Output:
HandlerThreadFactory@4e25154f creating new Thread
created Thread[Thread-0,5,main]
eh = MyUncaughtExceptionHandler@70dea4e
run() by Thread-0
eh = MyUncaughtExceptionHandler@70dea4e
caught java.lang.RuntimeException
*/
额外会在代码中添加跟踪机制,用来验证工厂对象创建的线程是否获得新
上面的示例根据具体情况来设置处理器。如果你知道你将要在代码中处处使用相同的异常处理器,那么更简单的方式是在
// lowlevel/SettingDefaultHandler.java
import java.util.concurrent.*;
public class SettingDefaultHandler {
public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler(
new MyUncaughtExceptionHandler());
ExecutorService es =
Executors.newCachedThreadPool();
es.execute(new ExceptionThread());
es.shutdown();
}
}
/* Output:
caught java.lang.RuntimeException
*/
只有在每个线程没有设置异常处理器时候,默认处理器才会被调用。系统会检查线程专有的版本,如果没有,则检查是否线程组中有专有的 uncaughtException()
方法;如果都没有,就会调用
可以将此方法与
资源共享
你可以将单线程程序看作一个孤独的实体,在你的问题空间中移动并同一时间只做一件事。因为只有一个实体,你永远不会想到两个实体试图同时使用相同资源的问题:问题犹如两个人试图同时停放在同一个空间,同时走过一扇门,甚至同时说话。
通过并发,事情不再孤单,但现在两个或更多任务可能会相互干扰。如果你不阻止这种冲突,你将有两个任务同时尝试访问同一个银行帐户,打印到同一个打印机,调整同一个阀门,等等。
资源竞争
当你启动一个任务来执行某些工作时,可以通过两种不同的方式捕获该工作的结果
从编程方式上看,副作用似乎更容易
伴随这种方式的问题是集合通常是共享资源。当运行多个任务时,任何任务都可能同时读写 共享资源 。这揭示了 资源竞争 问题,这是处理任务时的主要陷阱之一。
在单线程系统中,你不需要考虑资源竞争,因为你永远不可能同时做多件事。当你有多个任务时,你就必须始终防止资源竞争。
解决此问题的的一种方法是使用能够应对资源竞争的集合,如果多个任务同时尝试对此类集合进行写入,那么此类集合可以应付该问题。在
请思考以下的示例,其中一个任务负责生成偶数,其他任务则负责消费这些数字。在这里,消费者任务的唯一工作就是检查偶数的有效性。
我们将定义消费者任务next()
方法,以及可以取消它执行生成的方法。
// lowlevel/IntGenerator.java
import java.util.concurrent.atomic.AtomicBoolean;
public abstract class IntGenerator {
private AtomicBoolean canceled =
new AtomicBoolean();
public abstract int next();
public void cancel() { canceled.set(true); }
public boolean isCanceled() {
return canceled.get();
}
}
cancel()
方法改变isCanceled()
方法则告诉标志位是否设置。因为
任何
// lowlevel/EvenChecker.java
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;
import onjava.TimedAbort;
public class EvenChecker implements Runnable {
private IntGenerator generator;
private final int id;
public EvenChecker(IntGenerator generator, int id) {
this.generator = generator;
this.id = id;
}
@Override
public void run() {
while(!generator.isCanceled()) {
int val = generator.next();
if(val % 2 != 0) {
System.out.println(val + " not even!");
generator.cancel(); // Cancels all EvenCheckers
}
}
}
// Test any IntGenerator:
public static void test(IntGenerator gp, int count) {
List<CompletableFuture<Void>> checkers =
IntStream.range(0, count)
.mapToObj(i -> new EvenChecker(gp, i))
.map(CompletableFuture::runAsync)
.collect(Collectors.toList());
checkers.forEach(CompletableFuture::join);
}
// Default value for count:
public static void test(IntGenerator gp) {
new TimedAbort(4, "No odd numbers discovered");
test(gp, 10);
}
}
test()
方法开启了许多访问同一个test()
方法会报告并返回。
依赖于generator.isCanceled()
返回值为run()
方法返回。 任何cancel()
,这会导致使用该
在本设计中,共享公共资源( IntGenerator )的任务会监视该资源的终止信号。这消除所谓的竞争条件,其中两个或更多的任务竞争响应某个条件并因此冲突或不一致结果的情况。
你必须仔细考虑并防止并发系统失败的所有可能途径。例如,一个任务不能依赖于另一个任务,因为任务关闭的顺序无法得到保证。这里,通过使任务依赖于非任务对象,我们可以消除潜在的竞争条件。
一般来说,我们假设 test()
方法最终失败,因为各个
// onjava/TimedAbort.java
// Terminate a program after t seconds
package onjava;
import java.util.concurrent.*;
public class TimedAbort {
private volatile boolean restart = true;
public TimedAbort(double t, String msg) {
CompletableFuture.runAsync(() -> {
try {
while(restart) {
restart = false;
TimeUnit.MILLISECONDS
.sleep((int)(1000 * t));
}
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(msg);
System.exit(0);
});
}
public TimedAbort(double t) {
this(t, "TimedAbort " + t);
}
public void restart() { restart = true; }
}
我们使用runAsync()
静态方法执行。 runAsync()
方法的值会立即返回。 因此,
restart()
方法重启任务,在有某些有用的活动进行时保持程序打开。
我们可以看到正在运行的
// lowlevel/TestAbort.java
import onjava.*;
public class TestAbort {
public static void main(String[] args) {
new TimedAbort(1);
System.out.println("Napping for 4");
new Nap(4);
}
}
/* Output:
Napping for 4
TimedAbort 1.0
*/
如果你注释掉
我们将看到第一个next()
方法:
// lowlevel/EvenProducer.java
// When threads collide
// {VisuallyInspectOutput}
public class EvenProducer extends IntGenerator {
private int currentEvenValue = 0;
@Override
public int next() {
++currentEvenValue; // [1]
++currentEvenValue;
return currentEvenValue;
}
public static void main(String[] args) {
EvenChecker.test(new EvenProducer());
}
}
/* Output:
419 not even!
425 not even!
423 not even!
421 not even!
417 not even!
*/
[1] 一个任务有可能在另外一个任务执行第一个对currentEvenValue 的自增操作之后,但是没有执行第二个操作之前,调用 next()
方法。这将使这个值处于 “不恰当” 的状态。
为了证明这是可能发生的, EvenChecker.test()
创建了一组
多线程程序的部分问题是,即使存在
重要的是要注意到自增操作自身需要多个步骤,并且在自增过程中任务可能会被线程机制挂起
该示例程序并不总是在第一次非偶数产生时终止。所有任务都不会立即关闭,这是并发程序的典型特征。
解决资源竞争
前面的示例揭示了当你使用线程时的基本问题:你永远不知道线程哪个时刻运行。想象一下坐在一张桌子上,用叉子,将最后一块食物放在盘子上,当叉子到达时,食物突然消失…仅因为你的线程被挂起而另一个用餐者进来吃了食物了。这就是在编写并发程序时要处理的问题。为了使并发工作有效,你需要某种方式来阻止两个任务访问同一个资源,至少在关键时期是这样。
防止这种冲突的方法就是当资源被一个任务使用时,在其上加锁。第一个访问某项资源的任务必须锁定这项资源,使其他任务在其被解锁之前,就无法访问它,而在其被解锁时候,另一个任务就可以锁定并使用它,以此类推。如果汽车前排座位是受限资源,那么大喊着 “冲呀” 的孩子就会(在这次旅途过程中)获得该资源的锁。
为了解决线程冲突的问题,基本的并发方案将序列化访问共享资源。这意味着一次只允许一个任务访问共享资源。这通常是通过在访问资源的代码片段周围加上一个子句来实现的,该子句一次只允许一个任务访问这段代码。因为这个子句产生 互斥 效果,所以这种机制的通常称为是
考虑一下屋子里的浴室:多个人(即多个由线程驱动的任务)都希望能独立使用浴室(即共享资源
当浴室使用完毕,就是时候给其他任务进入,这时比喻就有点不准确了。事实上没有人排队,我们也不知道下一个使用浴室是谁,因为线程调度机制并不是确定性的。相反,就好像在浴室前面有一组被阻止的任务一样,当锁定浴室的任务解锁并出现时,线程调度机制将会决定下一个要进入的任务。
共享资源一般是以对象形式存在的内存片段,但也可以是文件、
通常你会将字段设为
synchronized void f() { /* ... */ }
synchronized void g() { /* ... */ }
所有对象都自动包含独立的锁(也称为f()
,对于同一个对象而言,就只能等到 f()
调用结束并释放了锁之后,其他任务才能调用 f()
和 g()
。所以,某个特定对象的所有
在使用并发时,将字段设为
一个线程可以获取对象的锁多次。如果一个方法调用在同一个对象上的第二个方法,而后者又在同一个对象上调用另一个方法,就会发生这种情况。
你应该什么时候使用同步呢?可以永远
如果你正在写一个变量,它可能接下来被另一个线程读取,或者正在读取一个上一次已经被另一个线程写过的变量,那么你必须使用同步,并且,读写线程都必须用相同的监视器锁同步。
如果在你的类中有超过一个方法在处理临界数据,那么你必须同步所有相关方法。如果只同步其中一个方法,那么其他方法可以忽略对象锁,并且可以不受惩罚地调用。这是很重要的一点:每个访问临界共享资源的方法都必须被同步,否则将不会正确地工作。
同步控制EventProducer
通过在
// lowlevel/SynchronizedEvenProducer.java
// Simplifying mutexes with the synchronized keyword
import onjava.Nap;
public class
SynchronizedEvenProducer extends IntGenerator {
private int currentEvenValue = 0;
@Override
public synchronized int next() {
++currentEvenValue;
new Nap(0.01); // Cause failure faster
++currentEvenValue;
return currentEvenValue;
}
public static void main(String[] args) {
EvenChecker.test(new SynchronizedEvenProducer());
}
}
/* Output:
No odd numbers discovered
*/
在两个自增操作之间插入 Nap()
构造器方法,以提高在next()
方法的任务将获得锁,任何试图获取锁的后续任务都将被阻塞,直到第一个任务释放锁。此时,调度机制选择另一个等待锁的任务。通过这种方式,任何时刻只能有一个任务通过互斥锁保护的代码。
volatile 关键字
使用
字分裂
当你的
在缺乏任何其他保护的情况下,用
可见性
第二个问题属于
出现这个问题是因为
每个线程都可以在处理器缓存中存储变量的本地副本。将字段定义为
- 该变量同时被多个任务访问。
- 这些访问中至少有一个是写操作。
- 你尝试避免同步 (在现代
Java 中,你可以使用高级工具来避免进行同步) 。
举个例子,如果你使用变量作为停止任务的标志值。那么该变量至少必须声明为
任务对其自身变量所做的任何写操作都始终对该任务可见,因此,如果只在任务中使用变量,你不需要使其变量声明为
如果单个线程对变量写入而其他线程只读取它,你可以放弃该变量声明为
重要的是要理解原子性和可见性是两个不同的概念。在非
同步也会让主内存刷新,所以如果一个变量完全由
重排与Happen-Before 原则
只要结果不会改变程序表现,
这项原则保证在
// lowlevel/ReOrdering.java
public class ReOrdering implements Runnable {
int one, two, three, four, five, six;
volatile int volaTile;
@Override
public void run() {
one = 1;
two = 2;
three = 3;
volaTile = 92;
int x = four;
int y = five;
int z = six;
}
}
例子中
什么时候使用volatile
对于
如果你尝试使用
如果你正在尝试调试其他人的并发代码,请首先查找使用
原子性
在
Goetz 测试:如果你可以编写用于现代微处理器的高性能JVM ,那么就有资格考虑是否可以避免同步4 。
了解原子性是很有用的,并且知道它与其他高级技术一起用于实现一些更加巧妙的
原子性可以应用于除
因为原子操作不能被线程机制中断。专家程序员可以利用这个来编写无锁代码(lock-free code
在多核处理器系统,相对于单核处理器而言,可见性问题远比原子性问题多得多。一个任务所做的修改,即使它们是原子性的,也可能对其他任务不可见(例如,修改只是暂时性存储在本地处理器缓存中
什么才属于原子操作时?对于属性中的值做赋值和返回操作通常都是原子性的,但是在
i++; // Might be atomic in C++
i += 2; // Might be atomic in C++
但是在
在
// lowlevel/NotAtomic.java
// {javap -c NotAtomic}
// {VisuallyInspectOutput}
public class NotAtomic {
int i;
void f1() { i++; }
void f2() { i += 3; }
}
/* Output:
Compiled from "NotAtomic.java"
public class NotAtomic {
int i;
public NotAtomic();
Code:
0: aload_0
1: invokespecial #1 // Method
java/lang/Object."<init>":()V
4: return
void f1();
Code:
0: aload_0
1: dup
2: getfield #2 // Field
i:I
5: iconst_1
6: iadd
7: putfield #2 // Field
i:I
10: return
void f2();
Code:
0: aload_0
1: dup
2: getfield #2 // Field
i:I
5: iconst_3
6: iadd
7: putfield #2 // Field
i:I
10: return
}
*/
每条指令都会产生一个 “get” 和 “put”,它们之间还有一些其他指令。因此在获取指令和放置指令之间,另有一个任务可能会修改这个属性,所有,这些操作不是原子性的。
让我们通过定义一个抽象类来测试原子性的概念,这个抽象类的方法是将一个整数类型进行偶数自增,并且 run()
不断地调用这个方法
// lowlevel/IntTestable.java
import java.util.function.*;
public abstract class
IntTestable implements Runnable, IntSupplier {
abstract void evenIncrement();
@Override
public void run() {
while(true)
evenIncrement();
}
}
getAsInt()
方法的函数式接口。
现在我们可以创建一个测试,它作为一个独立的任务启动 run()
方法 ,然后获取值来检查它们是否为偶数
// lowlevel/Atomicity.java
import java.util.concurrent.*;
import onjava.TimedAbort;
public class Atomicity {
public static void test(IntTestable it) {
new TimedAbort(4, "No failures found");
CompletableFuture.runAsync(it);
while(true) {
int val = it.getAsInt();
if(val % 2 != 0) {
System.out.println("failed with: " + val);
System.exit(0);
}
}
}
}
很容易盲目地应用原子性的概念。在这里,getAsInt()
似乎是安全的原子性方法:
// lowlevel/UnsafeReturn.java
import java.util.function.*;
import java.util.concurrent.*;
public class UnsafeReturn extends IntTestable {
private int i = 0;
public int getAsInt() { return i; }
public synchronized void evenIncrement() {
i++; i++;
}
public static void main(String[] args) {
Atomicity.test(new UnsafeReturn());
}
}
/* Output:
failed with: 79
*/
但是, Atomicity.test()
方法还是出现有非偶数的失败。尽管,返回getValue()
和 evenIncrement()
都必须同步
// lowlevel/SafeReturn.java
import java.util.function.*;
import java.util.concurrent.*;
public class SafeReturn extends IntTestable {
private int i = 0;
public synchronized int getAsInt() { return i; }
public synchronized void evenIncrement() {
i++; i++;
}
public static void main(String[] args) {
Atomicity.test(new SafeReturn());
}
}
/* Output:
No failures found
*/
只有并发编程专家有能力去尝试做像前面例子情况的优化;再次强调,请遵循
Josh 的序列号
作为第二个示例,考虑某些更简单的东西:创建一个产生序列号的类,灵感启发于nextSerialNumber()
都必须返回唯一值。
// lowlevel/SerialNumbers.java
public class SerialNumbers {
private volatile int serialNumber = 0;
public int nextSerialNumber() {
return serialNumber++; // Not thread-safe
}
}
我们在这里加入nextSerialNumber()
方法在不进行线程同步的情况下访问共享的可变变量值。
为了测试
// lowlevel/CircularSet.java
// Reuses storage so we don't run out of memory
import java.util.*;
public class CircularSet {
private int[] array;
private int size;
private int index = 0;
public CircularSet(int size) {
this.size = size;
array = new int[size];
// Initialize to a value not produced
// by SerialNumbers:
Arrays.fill(array, -1);
}
public synchronized void add(int i) {
array[index] = i;
// Wrap index and write over old elements:
index = ++index % size;
}
public synchronized boolean contains(int val) {
for(int i = 0; i < size; i++)
if(array[i] == val) return true;
return false;
}
}
add()
和 contains()
方法是线程同步的,以防止线程冲突。
The add() and contains() methods are synchronized to prevent thread collisions.
run()
方法。
// lowlevel/SerialNumberChecker.java
// Test SerialNumbers implementations for thread-safety
import java.util.concurrent.*;
import onjava.Nap;
public class SerialNumberChecker implements Runnable {
private CircularSet serials = new CircularSet(1000);
private SerialNumbers producer;
public SerialNumberChecker(SerialNumbers producer) {
this.producer = producer;
}
@Override
public void run() {
while(true) {
int serial = producer.nextSerialNumber();
if(serials.contains(serial)) {
System.out.println("Duplicate: " + serial);
System.exit(0);
}
serials.add(serial);
}
}
static void test(SerialNumbers producer) {
for(int i = 0; i < 10; i++)
CompletableFuture.runAsync(
new SerialNumberChecker(producer));
new Nap(4, "No duplicates detected");
}
}
test()
方法创建多个任务来竞争单独的
当我们测试基本的
// lowlevel/SerialNumberTest.java
public class SerialNumberTest {
public static void main(String[] args) {
SerialNumberChecker.test(new SerialNumbers());
}
}
/* Output:
Duplicate: 148044
*/
nextSerialNumber()
方法
// lowlevel/SynchronizedSerialNumbers.java
public class
SynchronizedSerialNumbers extends SerialNumbers {
private int serialNumber = 0;
public synchronized int nextSerialNumber() {
return serialNumber++;
}
public static void main(String[] args) {
SerialNumberChecker.test(
new SynchronizedSerialNumbers());
}
}
/* Output:
No duplicates detected
*/
读取和赋值原语应该是安全的原子操作。然后,正如在
原子类
下面,我们可以使用
// lowlevel/AtomicIntegerTest.java
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;
import onjava.*;
public class AtomicIntegerTest extends IntTestable {
private AtomicInteger i = new AtomicInteger(0);
public int getAsInt() { return i.get(); }
public void evenIncrement() { i.addAndGet(2); }
public static void main(String[] args) {
Atomicity.test(new AtomicIntegerTest());
}
}
/* Output:
No failures found
*/
现在,我们通过使用
下面使用
// lowlevel/AtomicEvenProducer.java
// Atomic classes: occasionally useful in regular code
import java.util.concurrent.atomic.*;
public class AtomicEvenProducer extends IntGenerator {
private AtomicInteger currentEvenValue =
new AtomicInteger(0);
@Override
public int next() {
return currentEvenValue.addAndGet(2);
}
public static void main(String[] args) {
EvenChecker.test(new AtomicEvenProducer());
}
}
/* Output:
No odd numbers discovered
*/
再次,使用
下面是一个使用
// lowlevel/AtomicSerialNumbers.java
import java.util.concurrent.atomic.*;
public class
AtomicSerialNumbers extends SerialNumbers {
private AtomicInteger serialNumber =
new AtomicInteger();
public int nextSerialNumber() {
return serialNumber.getAndIncrement();
}
public static void main(String[] args) {
SerialNumberChecker.test(
new AtomicSerialNumbers());
}
}
/* Output:
No duplicates detected
*/
这些都是对单一字段的简单示例; 当你创建更复杂的类时,你必须确定哪些字段需要保护,在某些情况下,你可能仍然最后在方法上使用
临界区
有时,你只是想防止多线程访问方法中的部分代码,而不是整个方法。要隔离的代码部分称为临界区,它使用我们用于保护整个方法相同的
synchronized(syncObject) {
// This code can be accessed
// by only one task at a time
}
这也被称为 同步控制块 (synchronized block
使用同步控制块而不是同步控制整个方法的主要动机是性能(有时,算法确实聪明,但还是要特别警惕来自并发性问题上的聪明method()
的计数并且发起一些任务来尝试竞争调用 method()
方法。
// lowlevel/SynchronizedComparison.java
// speeds up access.
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import onjava.Nap;
abstract class Guarded {
AtomicLong callCount = new AtomicLong();
public abstract void method();
@Override
public String toString() {
return getClass().getSimpleName() +
": " + callCount.get();
}
}
class SynchronizedMethod extends Guarded {
public synchronized void method() {
new Nap(0.01);
callCount.incrementAndGet();
}
}
class CriticalSection extends Guarded {
public void method() {
new Nap(0.01);
synchronized(this) {
callCount.incrementAndGet();
}
}
}
class Caller implements Runnable {
private Guarded g;
Caller(Guarded g) { this.g = g; }
private AtomicLong successfulCalls =
new AtomicLong();
private AtomicBoolean stop =
new AtomicBoolean(false);
@Override
public void run() {
new Timer().schedule(new TimerTask() {
public void run() { stop.set(true); }
}, 2500);
while(!stop.get()) {
g.method();
successfulCalls.getAndIncrement();
}
System.out.println(
"-> " + successfulCalls.get());
}
}
public class SynchronizedComparison {
static void test(Guarded g) {
List<CompletableFuture<Void>> callers =
Stream.of(
new Caller(g),
new Caller(g),
new Caller(g),
new Caller(g))
.map(CompletableFuture::runAsync)
.collect(Collectors.toList());
callers.forEach(CompletableFuture::join);
System.out.println(g);
}
public static void main(String[] args) {
test(new CriticalSection());
test(new SynchronizedMethod());
}
}
/* Output:
-> 243
-> 243
-> 243
-> 243
CriticalSection: 972
-> 69
-> 61
-> 83
-> 36
SynchronizedMethod: 249
*/
method()
的次数。method
方法,而method
方法的一部分代码。这样,耗时的method()
有多少。
请记住,使用同步控制块是有风险;它要求你确切知道同步控制块外的非同步代码是实际上要线程安全的。
method()
方法(并报告调用次数)的任务。为了构建这个时间周期,我们会使用虽然有点过时但仍然可以很好地工作的
test()
方法接收一个method()
方法的锁。
你通常会看到从一次运行到下一次运行的输出变化。结果表明, method()
方法。这通常是使用
在其他对象上同步
有时必须在另一个对象上同步,但是如果你要这样做,就必须确保所有相关的任务都是在同一个任务上同步的。下面的示例演示了当对象中的方法在不同的锁上同步时,两个任务可以同时进入同一对象:
// lowlevel/SyncOnObject.java
// Synchronizing on another object
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;
import onjava.Nap;
class DualSynch {
ConcurrentLinkedQueue<String> trace =
new ConcurrentLinkedQueue<>();
public synchronized void f(boolean nap) {
for(int i = 0; i < 5; i++) {
trace.add(String.format("f() " + i));
if(nap) new Nap(0.01);
}
}
private Object syncObject = new Object();
public void g(boolean nap) {
synchronized(syncObject) {
for(int i = 0; i < 5; i++) {
trace.add(String.format("g() " + i));
if(nap) new Nap(0.01);
}
}
}
}
public class SyncOnObject {
static void test(boolean fNap, boolean gNap) {
DualSynch ds = new DualSynch();
List<CompletableFuture<Void>> cfs =
Arrays.stream(new Runnable[] {
() -> ds.f(fNap), () -> ds.g(gNap) })
.map(CompletableFuture::runAsync)
.collect(Collectors.toList());
cfs.forEach(CompletableFuture::join);
ds.trace.forEach(System.out::println);
}
public static void main(String[] args) {
test(true, false);
System.out.println("****");
test(false, true);
}
}
/* Output:
f() 0
g() 0
g() 1
g() 2
g() 3
g() 4
f() 1
f() 2
f() 3
f() 4
****
f() 0
g() 0
f() 1
f() 2
f() 3
f() 4
g() 1
g() 2
g() 3
g() 4
*/
DualSync.f()
方法(通过同步整个方法)在g()
方法有一个在test()
方法中运行的两个调用 f()
和 g()
方法的独立任务演示了这一点。f()
和 g()
是否应该在其Nap()
方法。例如,当g()
,反之亦然。
使用显式锁对象
// lowlevel/MutexEvenProducer.java
// Preventing thread collisions with mutexes
import java.util.concurrent.locks.*;
import onjava.Nap;
public class MutexEvenProducer extends IntGenerator {
private int currentEvenValue = 0;
private Lock lock = new ReentrantLock();
@Override
public int next() {
lock.lock();
try {
++currentEvenValue;
new Nap(0.01); // Cause failure faster
++currentEvenValue;
return currentEvenValue;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
EvenChecker.test(new MutexEvenProducer());
}
}
/*
No odd numbers discovered
*/
next()
中使用 lock()
和 unlock()
方法创建一个临界区。当你使用Lock()
之后,你必须放置unlock()
方法
尽管
一般来说,当你使用
// lowlevel/AttemptLocking.java
// Locks in the concurrent library allow you
// to give up on trying to acquire a lock
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import onjava.Nap;
public class AttemptLocking {
private ReentrantLock lock = new ReentrantLock();
public void untimed() {
boolean captured = lock.tryLock();
try {
System.out.println("tryLock(): " + captured);
} finally {
if(captured)
lock.unlock();
}
}
public void timed() {
boolean captured = false;
try {
captured = lock.tryLock(2, TimeUnit.SECONDS);
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
try {
System.out.println(
"tryLock(2, TimeUnit.SECONDS): " + captured);
} finally {
if(captured)
lock.unlock();
}
}
public static void main(String[] args) {
final AttemptLocking al = new AttemptLocking();
al.untimed(); // True -- lock is available
al.timed(); // True -- lock is available
// Now create a second task to grab the lock:
CompletableFuture.runAsync( () -> {
al.lock.lock();
System.out.println("acquired");
});
new Nap(0.1); // Give the second task a chance
al.untimed(); // False -- lock grabbed by task
al.timed(); // False -- lock grabbed by task
}
}
/* Output:
tryLock(): true
tryLock(2, TimeUnit.SECONDS): true
acquired
tryLock(): false
tryLock(2, TimeUnit.SECONDS): false
*/
untimed()
方法那样。而在 timed()
方法中,则尝试获取可能在main()
方法中,一个单独的线程被匿名类所创建,并且它会获得锁,因此让 untimed()
和 timed()
方法有东西可以去竞争。
显式锁比起内置同步锁提供更细粒度的加锁和解锁控制。这对于实现专门的同步并发结构,比如用于遍历链表节点的 交替锁
库组件
在本节中,我们将看一些使用不同组件的示例,然后讨论一下
DelayQueue
这是一个无界阻塞队列 ( BlockingQueue poll()
将返回
下面是一个示例,其中的
// lowlevel/DelayQueueDemo.java
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;
import static java.util.concurrent.TimeUnit.*;
class DelayedTask implements Runnable, Delayed {
private static int counter = 0;
private final int id = counter++;
private final int delta;
private final long trigger;
protected static List<DelayedTask> sequence =
new ArrayList<>();
DelayedTask(int delayInMilliseconds) {
delta = delayInMilliseconds;
trigger = System.nanoTime() +
NANOSECONDS.convert(delta, MILLISECONDS);
sequence.add(this);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(
trigger - System.nanoTime(), NANOSECONDS);
}
@Override
public int compareTo(Delayed arg) {
DelayedTask that = (DelayedTask)arg;
if(trigger < that.trigger) return -1;
if(trigger > that.trigger) return 1;
return 0;
}
@Override
public void run() {
System.out.print(this + " ");
}
@Override
public String toString() {
return
String.format("[%d] Task %d", delta, id);
}
public String summary() {
return String.format("(%d:%d)", id, delta);
}
public static class EndTask extends DelayedTask {
EndTask(int delay) { super(delay); }
@Override
public void run() {
sequence.forEach(dt ->
System.out.println(dt.summary()));
}
}
}
public class DelayQueueDemo {
public static void
main(String[] args) throws Exception {
DelayQueue<DelayedTask> tasks =
Stream.concat( // Random delays:
new Random(47).ints(20, 0, 4000)
.mapToObj(DelayedTask::new),
// Add the summarizing task:
Stream.of(new DelayedTask.EndTask(4000)))
.collect(Collectors
.toCollection(DelayQueue::new));
while(tasks.size() > 0)
tasks.take().run();
}
}
/* Output:
[128] Task 12 [429] Task 6 [551] Task 13 [555] Task 2
[693] Task 3 [809] Task 15 [961] Task 5 [1258] Task 1
[1258] Task 20 [1520] Task 19 [1861] Task 4 [1998] Task
17 [2200] Task 8 [2207] Task 10 [2288] Task 11 [2522]
Task 9 [2589] Task 14 [2861] Task 18 [2868] Task 7
[3278] Task 16 (0:4000)
(1:1258)
(2:555)
(3:693)
(4:1861)
(5:961)
(6:429)
(7:2868)
(8:2200)
(9:2522)
(10:2207)
(11:2288)
(12:128)
(13:551)
(14:2589)
(15:809)
(16:3278)
(17:1998)
(18:2861)
(19:1520)
(20:1258)
*/
getDelay()
, 该方法用来告知延迟到期有多长时间,或者延迟在多长时间之前已经到期了。这个方法强制我们去使用System.nanoTime()
产生的时间则是以纳秒为单位的。你可以转换
NANOSECONDS.convert(delta, MILLISECONDS);
在 getDelay()
中, 所希望的单位是作为
为了排序, compareTo()
从输出中可以看到,任务创建的顺序对执行顺序没有任何影响
PriorityBlockingQueue
这是一个很基础的优先级队列,它具有可阻塞的读取操作。在下面的示例中, take()
时会显示多个选项,
在
// lowlevel/PriorityBlockingQueueDemo.java
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import onjava.Nap;
class Prioritized implements Comparable<Prioritized> {
private static AtomicInteger counter =
new AtomicInteger();
private final int id = counter.getAndIncrement();
private final int priority;
private static List<Prioritized> sequence =
new CopyOnWriteArrayList<>();
Prioritized(int priority) {
this.priority = priority;
sequence.add(this);
}
@Override
public int compareTo(Prioritized arg) {
return priority < arg.priority ? 1 :
(priority > arg.priority ? -1 : 0);
}
@Override
public String toString() {
return String.format(
"[%d] Prioritized %d", priority, id);
}
public void displaySequence() {
int count = 0;
for(Prioritized pt : sequence) {
System.out.printf("(%d:%d)", pt.id, pt.priority);
if(++count % 5 == 0)
System.out.println();
}
}
public static class EndSentinel extends Prioritized {
EndSentinel() { super(-1); }
}
}
class Producer implements Runnable {
private static AtomicInteger seed =
new AtomicInteger(47);
private SplittableRandom rand =
new SplittableRandom(seed.getAndAdd(10));
private Queue<Prioritized> queue;
Producer(Queue<Prioritized> q) {
queue = q;
}
@Override
public void run() {
rand.ints(10, 0, 20)
.mapToObj(Prioritized::new)
.peek(p -> new Nap(rand.nextDouble() / 10))
.forEach(p -> queue.add(p));
queue.add(new Prioritized.EndSentinel());
}
}
class Consumer implements Runnable {
private PriorityBlockingQueue<Prioritized> q;
private SplittableRandom rand =
new SplittableRandom(47);
Consumer(PriorityBlockingQueue<Prioritized> q) {
this.q = q;
}
@Override
public void run() {
while(true) {
try {
Prioritized pt = q.take();
System.out.println(pt);
if(pt instanceof Prioritized.EndSentinel) {
pt.displaySequence();
break;
}
new Nap(rand.nextDouble() / 10);
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
public class PriorityBlockingQueueDemo {
public static void main(String[] args) {
PriorityBlockingQueue<Prioritized> queue =
new PriorityBlockingQueue<>();
CompletableFuture.runAsync(new Producer(queue));
CompletableFuture.runAsync(new Producer(queue));
CompletableFuture.runAsync(new Producer(queue));
CompletableFuture.runAsync(new Consumer(queue))
.join();
}
}
/* Output:
[15] Prioritized 2
[17] Prioritized 1
[17] Prioritized 5
[16] Prioritized 6
[14] Prioritized 9
[12] Prioritized 0
[11] Prioritized 4
[11] Prioritized 12
[13] Prioritized 13
[12] Prioritized 16
[14] Prioritized 18
[15] Prioritized 23
[18] Prioritized 26
[16] Prioritized 29
[12] Prioritized 17
[11] Prioritized 30
[11] Prioritized 24
[10] Prioritized 15
[10] Prioritized 22
[8] Prioritized 25
[8] Prioritized 11
[8] Prioritized 10
[6] Prioritized 31
[3] Prioritized 7
[2] Prioritized 20
[1] Prioritized 3
[0] Prioritized 19
[0] Prioritized 8
[0] Prioritized 14
[0] Prioritized 21
[-1] Prioritized 28
(0:12)(2:15)(1:17)(3:1)(4:11)
(5:17)(6:16)(7:3)(8:0)(9:14)
(10:8)(11:8)(12:11)(13:13)(14:0)
(15:10)(16:12)(17:12)(18:14)(19:0)
(20:2)(21:0)(22:10)(23:15)(24:11)
(25:8)(26:18)(27:-1)(28:-1)(29:16)
(30:11)(31:6)(32:-1)
*/
与前面的示例一样,
无锁集合
集合 章节强调集合是基本的编程工具,这也要求包含并发性。因此,早期的集合比如
无锁集合有一个有趣的特性:只要读取者仅能看到已完成修改的结果,对集合的修改就可以同时发生在读取发生时。这是通过一些策略实现的。为了让你了解它们是如何工作的,我们来看看其中的一些。
复制策略
使用“复制”策略,修改是在数据结构一部分的单独副本(或有时是整个数据的副本)上进行的,并且在整个修改过程期间这个副本是不可见的。仅当修改完成时,修改后的结构才与“主”数据结构安全地交换,然后读取者才会看到修改。
在
比较并交换(CAS)
在 比较并交换
如果内存仅轻量竞争,
最重要的是,许多现代处理器的汇编语言中都有一条
本章小结
本附录主要是为了让你在遇到底层并发代码时能对此有一定的了解,尽管本文还远没对这个主题进行全面的讨论。为此,你需要先从阅读由
以下是并发编程的步骤
- 不要使用它。想一些其他方法来使你写的程序变的更快。
- 如果你必须使用它,请使用在 并发编程
- parallel Streams and CompletableFutures 中展示的现代高级工具。 - 不要在任务间共享变量,在任务之间必须传递的任何信息都应该使用
Java.util.concurrent 库中的并发数据结构。 - 如果必须在任务之间共享变量,请使用
java.util.concurrent.atomic 里面其中一种类型,或在任何直接或间接访问这些变量的方法上应用synchronized 。 当你不这样做时,很容易被愚弄,以为你已经把所有东西都包括在内。 说真的,尝试使用步骤3 。 - 如果步骤
4 产生的结果太慢,你可以尝试使用volatile 或其他技术来调整代码,但是如果你正在阅读本书并认为你已经准备好尝试这些方法,那么你就超出了你的深度。 返回步骤#1。
通常可以只使用
-
在某些平台上,特别是
Windows ,默认值可能非常难以查明。你可以使用-Xss 标志调整堆栈大小。 ↩︎ -
引自
Brian Goetz, Java Concurrency in Practice 一书的作者, 该书由Brian Goetz, Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes, and Doug Lea 联合著作(Addison-Wesley 出版社, 2006) 。↩ ↩︎ -
请注意,在
64 位处理器上可能不会发生这种情况,从而消除了这个问题。 ↩︎ -
这个测试的推论是
, “如果某人表示线程是容易并且简单的,请确保这个人没有对你的项目做出重要的决策。如果那个人已经做出,那么你就已经陷入麻烦之中了。 ” ↩︎ -
这在即将产生的
C++ 的标准中得到了补救。 ↩︎