UP | HOME

java-concurrency-in-practice

Table of Contents

Chapter 1: Introduction

  • 线程是JAVA无法回避的重要特性,它可以帮助我们把复杂的异步程序变成简单的几行代码
  • 另外,线程是最容易利用多核威力的方法,随着cpu核心数的增加, 提高并发效率就变的 尤为重要

A (Very) Brief History of Concurrency

  • 在计算机的原始时代, 是没有操作系统的, 在计算机上一个程序从头跑到尾,而且能动 用计算机所有的资源.这种系统,不仅仅是写程序麻烦,而且每次只能运行一个程序,对于 计算机资源来说,是极大的浪费
  • 操作系统的引入,让两个以上的程序同时允许成为了可能:
    • 程序都被操作系统设计成一个进程, 进程之间的资源是操作系统分配的, 相互之间是 独立的,相互隔离的
    • 如果两个进程之间想相互通信, 那么可以通过以下几种方式:
      1. sockets
      2. signal handlers
      3. shared memory
      4. semaphores
      5. file
  • 操作系统设计成多个程序(运行起来才叫进程)可以同时运行的初衷有以下几点:
    1. 资源的利用: 程序可能会时常需要等待外部的输入输出, 在等待的时候,如果能够做 一些其他的事情,有助于资源更好的利用
    2. 公平: 多个用户或者多个程序在同时使用一台机器的时候, 都有平等的使用资源的 权利, 这种平等的实现应该是通过时间片的等额分割,而不是说某个用户的一个程序 执行完,然后让另外一个用户执行其他程序
    3. 方便:写多个程序,每个简单的功能,然后让多个程序合作,比起写一个"多功能"的 程序要方便的多
  • 早期的共享时间的系统当中, 每个进程都是一个虚拟的"冯诺依曼计算机":
    • 每个进程都有自己的内存空间, 包括数据和指令(代码)
    • 顺序执行当前的指令, 对于每一个指令来说,永远存在"下一条"指令, 指令的指定 要么是事先写好的,要么逻辑控制的
    • 通过操作系统指定的一系列的IO原语与外界进行联系
  • 线性的编程模型是自然的而直观的, 因为人类活动就和它很像:每次只做一件事情, 大部分情况下,顺序执行,举个例子:
    1. 起床
    2. 穿好衣服
    3. 下楼
    4. 打开茶叶盒,放入茶杯茶叶
    5. 查看是否有足够的热水:
    6. if 有足够的热水, 开始泡茶
    7. else 没有足够热水, 开始烧水
  • 上面7)是烧水,在烧水的空间,你可以选择烤面包,或者看报纸. 烤面包机和烧水壶这种 设备知道自己经常会被"异步使用",所以它们会发出声音来提示使用者
  • 找到"线性"工作和"异步"使用的平衡是一个高效人士的要务,这对程序来说也是一样的
  • 发明进程的几个原因(资源利用, 公平, 方便)也催生出了线程的发明.线程的特点有:
    1. 允许多个程序控制流在一个进程里面同时存在:
    2. 这多个线程共享进程的资源:内存,文件描述符;但是这些线程拥有自己的程序计数器, stack,和局部变量
    3. 多个线程在同一时间可以被调度到不同的CPU,而达到共同运行的目的
  • Thread在很多时候也叫做lightweight process, 很多操作系统会把线程作为调度的最 基本单位,而不是进程
  • 因为线程之间是共享'他们的进程'的内存的,所以一个线程可能改了某个内存数据,会影 响另外线程的工作

Benefits of Threads

  • 如果运用的得当,线程可以减少开发和维护的费用,提高复杂系统的性能.
  • 线程是一种通过把"线性"模式转换成"异步"模式,来提高效率的方法
  • 线程在GUI应用中开业大幅度的提高用户借口的响应率,在服务器开发中可以提高资源 的利用率和输出, 而且简化了JVM的GC(JVM的GC就是在某几个特定的thread里面进行的)

Exploiting Multiple Processors

  • 由于处理器频率的提高越来越难,CPU提供商越来越倾向于在一个chip上面放置更多的 processor core.
  • 因为调度的最小单位是线程,所以如果一个程序只有一个线程,那么每次它只能运行在 一个核上. 如果这个CPU是双核的,那么它放弃了50%的计算力,如果是100核,那就是99%…
  • 即便是在单核的CPU上面,多线程也是可以提高效率的,比如一个程序是单线程的,那么 如果遇到了I/O,操作,那么唯一的线程就得busy waiting了. 如果这个程序还有其他 线程B,那么可以在线程A等待I/O的空隙里面,被调度到,然后运转. (就好像,一边看着 报纸,一边等待水开)

Simplicity of Modeling

  • 生活中我们有这样的体验, 如果专心做一件事情,比如修复12个bug,通常要比干很多 乱七八糟的事情(比如,同时修复bug,面试,给自己的下属打分)要来的复杂的多
  • 线程就是这样一种架构: 在程序内部,把不同的事情分开,每种事情写到一块,相互尽量 不打扰
  • 我们说到的这些好处,通常是各种框架帮我们完成的.框架一般要处理各种细节:相应要求, 创建线程,处理负载,等等. 而servlet用户完全不用关心这些东西 #TODO#

Simplified Handling of Asynchronous Events

  • 作为一个server端程序的话,如果只有一个线程,那么如果和外界connect成功了的话,如果 当前线程不结束,后续的其他connect也就被block住了,这显然是不对的.
  • 所以:如果server坚持single-thread,'多进程'工作的话,那么会有两种办法:
    • non-blocking I/O技术:不让socket block
    • 轮询socket:*nix上面的select, poll等:socke block了就处理其他的socket
  • 当然还可以每个connection都有一个thread, '多线程'的工作但是由于历史原因, 操 作系统对于每个进程能创建的最大线程是有限制的,顶多几百个.所以server还大部分 是single-thread的
  • 但是,新的OS开始逐渐增加对每个process里面拥有的thread个数, 不久的将来,每个connection 都有一个thread就会变成可能.

More Responsive User Interfaces

  • GUI应用曾经是单线程的(Emacs现在还是…), 也就是说,你要么频繁的应付相应输入操 作,要么在间接的在"main event loop"里面执行代码. 如果"main event loop"里面 的代码执行时间过长,那么用户界面就会看起来"卡", 因为只有前面的代码返回以后,用户 的操作才能进行.
  • 现代的GUI框架,都使用了EDT(event dispatch thread),也就当用户按一个按钮的时候, 一个相应的thread会运行.

Risks of Threads

  • java内置的对线程的支持是一把双刃剑,这提高了对程序员的要求, 一旦使用多线程, 就要明白什么是线程安全

Safety Hazards

  • 下面这段代码就是线程不安全的
    package org.hfeng.jcip.ch1.unsafe;
    
    import org.apache.http.annotation.NotThreadSafe;
    
    @NotThreadSafe
    public class UnSafeSequence {
        private int value;
    
        public int getNext() {
            return value++;
        }
    
        public UnSafeSequence() {
            this.value = 0;
        }
    }
    
  • 代码线程不安全的原因,是存在下面一种调用过程(非常巧合是吧..对的,就是非常巧, 也就是说线程的错误不是每次都能重现的)让两个线程调用getNext函数,却得到了同 样的值(本来这个函数是希望返回值都是独一无二的)
           +--------+        +--------+        +--------+
    A      | value  +------->| 9+1    +------->+ value  |
           |   9    |        | = 10   |        |  10    |
           +--------+        +--------+        +--------+
    
    
                    +--------+        +--------+        +--------+
    B      -------->| value  +------->| 9+1    +------->+ value  |
                    |   9    |        | = 10   |        |  10    |
                    +--------+        +--------+        +--------+
    
  • UnsafeSequence描述了一个常见的concurrency hazard, 叫做"race condition"
  • 因为所有的线程共享内存,而且同时运行, 他们就可能更改其他用户正在使用的数据, 这本来是一个巨大的方便之处,因为比起其他的进程间通信, 这种使用数据的方式明 显容易很多. 但是这样也会存在前面说到的你无法知道当前变量是否正被使用. 这个 时候,就要java提供的同步方法(synchronization mechanism)
    package org.hfeng.jcip.ch1.safe;
    
    import org.apache.http.annotation.GuardedBy;
    import org.apache.http.annotation.ThreadSafe;
    
    @ThreadSafe
    public class SafeSequence {
        @GuardedBy("this") private int nextValue;
    
        public synchronized int getNext() {
            return nextValue++;
        }
    
        public SafeSequence() {
            nextValue = 0;
        }
    }
    
  • 如果没有这些同步方法(synchronization mechanism), 编译器,硬件会被赋予相当 大的自由, 比如会缓存当前变量到寄存器(这个还好,全局可见), 或者缓存变量到某个 核(processor-local)的cache,这就麻烦了,因为如果线程不在这个核上运行的,根本 看不到这个cache. 这些操作原本是为了提高程序的运行效率,但是这些操心无法让多个 线程安全的共享数据
  • @GuardedBy简言之就是为class加了一个锁,必须得到这个锁才可以继续访问class的 内容. 也就是利用了mutual exclusion来做到thread safe,另外三个thread safe的 方法是:
    • re-entrancy
    • Atomic operations
    • Thread-local storage

Liveness Hazards

  • 前面说的safety意味着"nothing bad ever happens", 而这里的liveness意味着"something good eventually happens".
  • 在single-threaded程序里面,safety不能保证liveness(我们想做的肯定能做到), 比 如while(true)这种循环.
  • 但是,在多线程程序里面, safety不能保证liveness的情况就更多了, 比如:
    • deadlock: Thread(A)有lock(X),还需要lock(Y), Thread(B)有lock(Y),还需要lock(X).
    • starvation: 别的threads总是来拿到lock,然后释放,再拿到再释放,当前thread总 是拿不到lock
    • livelock: 两个人在狭窄的走道上碰面了,都想'先左后右'的变换位置,结果虽然两 个人都在不停运动,但是还是没有什么进展

Performance Hazards

  • 前面说的liveness意味着"something good eventually happens", 这个eventually有时候 远远不够,因为我们想好事情快快来.也就是说我们要提高多线程程序的效率
  • 效率这个东西涉及到很多:
    • 服务时间
    • 响应率
    • 输出
    • 资源消耗
    • 可扩展性
  • 就像liveness一样,多线程程序的效率的难题不仅仅来自传统的问题,还来自于多线程 的引入带来的问题
  • 在理想的情况下,利用多核系统带来的都是净收益,但这明显不现实的.线程多的情况下, 上下文切换更加频繁, 也意味着更多的资源消耗:
    • 保存上下文
    • CPU调度
    • 使用syn, 无法享受编译器优化, 无法享受缓存

Threads are Everywhere

  • 即便你没有显示创建过线程, 框架可能就为你创建过thread,而这些thread调用的代码 必须线程安全
  • 每一个Java程序都使用了线程
    1. 当JVM开始的时候,会创建houskeeping task 线程负责回收, 还有一个main 线程来运行main方法.
    2. AWT(Abstract Windows Tookit) 创建了一个线程来管理用户事件
    3. Timer创建了一个线程来管理被拒绝的任务
    4. Servlet创建了线程池,并且调用这些线程里面的方法
  • 如果你是使用了前面的这些工具, 就需要了解线程安全, 因为"框架会使用线程来调用 你的代码"
  • 比如Java设计了servlet这种server端的framework来处理来自client的请求,而多个 client(可以认为是thread)很可能是同时访问servlet的,所以servlet必须是thread-safe 的.

Chapter 2: Thread Safety

  • 有点令人出乎意料的是, 并发编程并不是和'线程或者锁'有很大关系.就像工程学也并不是 和'铆钉,工字梁'有很大关系(not … any more than表示前后都否定: Concurrent programming isn't so much about thread or locks, any more than civil engineering is about rivets and I-beams)
  • 建造大桥,肯定要用很多的'铆钉,工字梁', 写多线程代码,也要用很多的'线程或者锁', 但这 只是表象和机制.
  • 写出线程安全代码更重的是管理"状态的准入"(accesss to state), 特别是对共享的(Shared state) 和可变的(Mutable state)状态的访问
  • 一般来说,一个对象的状态就是它的数据(存储在instance或者static field).
  • 某些情况下,一个对象的状态有时候也会和其他对象有关,这要看是哪种数据结构:比如 HashMap的状态一部分存在hashmap objec里面,另一部分存在成员Map.Entry里面
  • 比如下面的例子, hm里面肯定存有一部分状态,但是同时foo1,foo2里面也有,因 为java的hashmap其实存储的不过是一个reference, 下面例子中,通过1, hashmap 只能找到foo1这个object, foo的val值是多少,hashmap不会存储的
    public class TestCode {
        public static void main(String[] args) {
            HashMap<Integer, Foo> hm = new HashMap<Integer, Foo>();
            Foo foo = new Foo(15);
            Foo foo2 = new Foo(35);
            hm.put(1, foo);
            hm.put(2, foo2);
    
            for (Map.Entry<Integer, Foo> e : hm.entrySet()) {
                System.out.println(e.getKey() + " " + e.getValue());
            }
            foo.setVal(115);
            System.out.println(hm.entrySet());
        }
    }
    
    class Foo{
        public void setVal(int val) {
            this.val = val;
        }
    
        private int val;
        public Foo(int v) {
            val = v;
        }
    
        public String toString() {
            return "" + val;
        }
    }
    
    ////////////////////////////////////////////////////
    // <===================OUTPUT===================> //
    // 1 15                                           //
    // 2 35                                           //
    // [1=115, 2=35]                                  //
    ////////////////////////////////////////////////////
    
  • 所谓"共享",是说一个变量可以被多个线程访问到,所谓"可变性"是说一个变量的值是可 以改变的.所谓线程安全,就是要防止"可变"的数据被"多个线程无法控制的并发访问"
  • 一个对象是否需要线程安全,不是看这个对象是"做什么的", 而是看他"怎么被使用"(是 否是被多个线程访问到)
  • 把一个对象做成thread-safe的话,就要求使用synchronization手段来使得"多个thread" 的访问"相互协调"
  • 我们这里说的"synchronization手段"包括如下形式:
    • synchronized关键字:其实就是'互斥锁'
    • volatile关键字
    • explicit lock
    • atomic variable
  • 如果多个线程访问同一个数据导致错误, 那么可以有如下几个方法来改正:
    • 不要在线程直接分享state
    • 把state标记成immutable
    • 在所有能够访问到state的地方,使用synchronization
  • 上面的建议看起来简单,但是如果你一开始设计的时候没有考虑到线程安全,那么改起来 确实是不那么容易,所以最好的方法,就是在你设计一个class的时候,就把他设计成线程 安全的
  • 把一个class设计成thread-safe,比你后来改写代码来控制对这个class的兵法访问,要 容易的多.
  • 在一个大型的系统上面,确认一个变量是否被多个线程访问,是非常困难的.幸运的是,面向 对象的很多设计方法能够帮助我们设计出更好,更容易维护的class. 比如封装和数据掩盖 可以帮助你更容易的设计线程安全:越少的代码能够接触到某些代码, 越容易保证使用的 同步性.

What is Thread Safety:

  • google下来thread-safe的定义一般是"一个类可以被多个thread访问,并且不需要在调 用类的代码里面做任何特殊的处理"
  • 这种定义非常模糊,因为"可以安全被多个thread"访问就是thread-safe,那么"thread-不 safe"是什么样子? safe是怎么个safe呢?都没有说清楚
  • 我们先定义correctness:"某个class如果是single-thread的话能够按照specification" 的要求,正确的运行.那么就是correct single-thread process
  • 如果一个类能在多个线程同时调用,且不管运行时如何调度其他线程, 都可以在调用代 码不用附加任何同步机制的情况下, 行为正常的运行, 那么就是thread-safe
    A class is thread-safe if it behaves correctly when accessed
    from multiple threads, regardless of the scheduling or interlevaing
    of the execution of thosed threads by the runtime environment, and
    with no additional synchronization other coordination on the part
    of the calling code
    

Example: A Statelses Servlet

  • 第一章我们说过, 有些框架会创建线程然后从这些线程里面,调用你的代码, 从而要 求你自己的代码也是线程安全的. 其实很多时候,线程安全的要求,并非是你想要使用 线程,而是你想要使用某些功能,比如Servlet framework
  • 下面这个例子,从request接受请求,factor这个请求,然后放到response传回去
    @ThreadSafe
    public class StatelessFactorizer implements Servlet {
        public void service(ServletRequest req, ServletResponse resp) {
            BigInteger i = extractFromRequest(req);
            BigInteger[] factors = factor(i);
            encodeIntoResponse(resp, factors);
        }
    }
    
  • StatelessFactorizer, 和大多数的servlet一样是没有状态的(stateless):
    • 自己没有任何的数据
    • 也不引用任何其他类的数据
  • StatelessFactorizer拥有短暂的state, 局部变量, 但是这些局部变量是在线程 自己的stack上面的,只对当前运行的线程可见.线程A访问StatelessFactorizer不 会影响线程B访问StatelessFactorizer,因为他们之间不会共享state.
  • 因为一个线程访问stateless的对象不会影响其他线程的正确运行,所以,
    stateless的对象总是线程安全的
    

Atomicity

  • 如果我们想加入一个变量来计算当前的处理的请求要求.那么一个常见的错误 加变量的方法:
    @NotThreadSafe
    public class UnsafeCountingFactorizer implements Servlet {
        private long count = 0;
    
        public long getCount() {
            return count;
        }
    
        public void service(ServletRequest req, ServletRequest resp) {
            BigInteger i = extractFromRequest(req);
            BigInteger[] factors = factor(i);
            ++count;
            encodeIntoResponse(resp, factors);
        }
    }
    
  • 这种例子有个别名叫做read-modify-write的race condition:因为这个动作其实是三个 动作的合体:
    1. fetch the current value
    2. add one to it
    3. write the new value back
  • atomicity的原则就是消灭上面的"三步走",把操作变成"一步走",就可以达到thread-safe 的目的
  • 第一章的例子已经讲到, 如果调度非常不幸运的话,会出现两个线程返回值相同的情况, 或许你觉得,极端返回一个相同的计数,这种低概率的精度错误可以忽略,那你可就打错特 错了. 如果这个计数是用来产生序列,或者对象标识的话,那么从不同的调用处得到相同 的这种标识(或者序列)会导致数据一致性的多种问题,最常见的就是race condition

Race Conditions

  • 当一个正确的结果,需要依赖幸运的时机掌握的时候(不完全靠自己就行), 竞争环境就 会发生
  • 最常见的一种race condition就是check-then-act(要观察下当前的情况,然后做 决定):
    • 某天你和X约好去大学城旁边的地铁站见面
    • 到了你发现有两家星巴克A和星巴克B
    • 你去星巴克A, 没发现X, 去星巴克B, 又没发现X. 然后去A,然后去B…非常繁忙, 但是还是找不到X,这时候存在很多种情况:
      1. 你朋友就没来
      2. 你朋友刚才才星巴克B,当你去星巴克B的时候,他已经去星巴克A啊.
    • 有可能这个下午你可能都见不到你的朋友,因为你的策略是:
      1. 去某个星巴克,发现X不在check
      2. 然后去另外一个星巴克then-act
    • 问题的根源在于[去某个星巴克, 发现X不在], 这个过程只是"当时正确",一旦你离开 你的朋友X可能就来了. 这个观察是无效的,换句话说就是, "无效的观察,导致了大多 数的race condition"

Example: Race Conditions in Lazy Initialization

  • 另外一个使用check-then-act的例子是lazy initialization: getInstance首先 测算一下ExpensiveObject是不是已经创建了,如果还没有创建, 就创建,如果已经创建了 就返回存在的object
    @NotThreadSafe
    public class LazyInitRace {
        private ExpensiveObject instance = null;
    
        public ExpensiveObject getInstance() {
            if (instance == null) {
                instance = new ExpensiveObject();
            }
        }
    }
    
  • LazyInitRace也存在race condition: 线程A和B同时调用getInstance, A看到的是 null, 然后创建ExpensiveObject, 恰巧在创建的同时(unluck timing)B也看到的是 null, 然后也创建ExpensiveObject.所以不同的调用者会收到不同的对象

Compound Actions

  • LazyInitRace 和 UnsafeCountingFactorizer 的数据都需要一种原子性的操作,数据 更改的过程变的不可分: 保证其他线程观察或者修改state的时候,要么是在我们开始之前, 要么是在我们修改之后, 而不是在这两者之间
  • 为了保证线程安全,check-then-act(lazy initialization)和read-modify-write(increment) 操作都必须是原子性的.
  • 我们把check-then-act和read-modify-write这种必须通过原子性来保证线程 安全的操作叫做 compound action
    @ThreadSafe
    public class CountingFactorizer implements Servlet {
        private final AtomicLong count = new AtomicLong(0);
    
        public long getCount() {
            return count.get();
        }
    
        public void service(ServletRequest req, ServletRequest resp) {
            BigInteger i = extractFromRequest(req);
            BigInteger[] factors = factors(i);
            count.incrementAndGet();
            encodeIntoResponse(resp, factors);
        }
    }
    
  • 上面这个例子就是利用了java的java.util.concurrent.atomic, 这个package能够使 得一切操作原子化.
  • 在实际操作中,尽可能的使用已有的线程安全的类(比如AtomicLong)来管理类的状态.

Locking

  • 我们前面通过一个线程安全的AtomicLong类来管理计数,保证了整个大类的线程安全, 如果我 要加入更多,类型更为复杂(不是long)的state,我还可以像下面的例子一样,全部都使用Atomic 帮手(这里是AtomicReference)么
  • 比如为了提高servlet的效率,我们设计了一个cache的机制, 如果新的请求和上一个请求是一样 的,那么我们就可以不用重复计算,而直接返回上次的计算结果
    @NotThreadSafe
    public class UnsafeCachingFactorizer implements Servlet {
        private final AtomicReference<BigInteger> lastNumber
            = new AtomicReference<BigInteger> ();
        private final AtomicReference<BigInteger[]> lastFactors
            = new AtomicReference<BigInteger[]>();
    
        public void service(ServletRequest req, ServletResponse resp) {
            BigInteger i = extractFromRequest(req);
            if (i.equals(lastNumber.get())) {
                encodeIntoResponse(resp, lastFactors.get());
            } else {
                // factor是因式分解 6 = 3 * 2
                BigInteger[] factors = factor(i);
                lastNumber.set(i);
                lastFactors.set(factors);
                encodeIntoResponse(resp, factors);
            }
        }
    }
    
  • 尽管上面的例子中,每一个变量都是线程安全的,但是整个类却无法达到线程安全.
  • 线程安全的定义,要求"不变式"一直有效, 我们这个逻辑的"不变式"就是:
    lastFactors中缓存的factors数组的乘积,应该等于lastNumber中缓存的数
    

    一定要是lastNumber计算的结果,

  • 很遗憾,上面的代码无法满足这个要求.除非我们"原子 性"的同时更新两个变量: 线程A获取这两个变量的时候(分两次),线程B可能已经更改了他们

Intrinsic Locks

  • java提供了一种最简单的内置锁,叫做同步代码块(synchronized block)
    synchronized (lock) {
        // Access or modify shared state guarded by lock
    }
    
  • 同步代码块包括了两个部分:
    • reference to an objct : 作为lock
    • block of code : 作为被锁关照的部分
  • java中的所有对象都可以作为"lock"存在, 换句话说,所有的对象内部都实现了一个锁, 而java中所有的block(包括函数),都是在一定的object内部的(或者class内部, 比如 static method).所以最正常的方法是把ref to current object(也就是this)作为lock
    public void addName(String name) {
        synchronized(this) {
            lastName = name;
            nameCount++;
        }
        nameList.add(name);
    }
    
  • 只有进入"同步代码块"才能获得锁,退出或者抛出异常才能放弃"锁", 同一时间之恩 能够有一个thread拥有这个锁
  • 同步函数(synchronized method)就是同步代码块的一个个例:
    • lock是调用这个函数的object:就不用特别指定了,其实就是this
    • 被锁关照的部分是整个函数: 函数的{},作为block
  • 同步函数默认使用当前object的this,所以static method和common method所使用的 this是不同的,锁也就不同了.
  • 下面就是使用synchronized method来获得thread-safe的办法
    @ThreadSafe
    public class SynchronizedFactorizer implements Servlet {
        @GuardedBy("this") private BigInteger lastNumber;
        @GuardedBy("this") private BigInteger[] lastFactors;
    
        public synchronized void service(ServletRequest req,
                                         ServletResponse resp) {
            BigInteger i = extractFromRequest(req);
            if (i.equals(lastNumber)) {
                encodeIntoResponse(resp, lastFactors);
            } else {
                BigInteger[] factors = factor(i);
                lastNumber = i;
                lastFactors = factors;
                encodeIntoResponse(resp, factors);
            }
        }
    }
    
  • 这种原子化虽然达到了线程安全的目的,但是确是以巨大的效率牺牲为代价的:因为内置锁本 质上是一种互斥锁,同一时间只能有一个线程访问函数.

Reentrancy

  • 内置锁都是可重入(renentrancy)的, 所谓可重入,就是说一个线程自己已经获得了某个内 置锁, 当它试图再次获得这个内置锁的时候,是成功的!
  • java实现了可重用,意味着java的锁的是每个线程获取一次, 而不是Pthread里面的每 个调用获取一次
  • 可是一个线程为什么会再次请求自己已经拥有的锁呢? 在面向对象当中,这种例子非 常常见. 可以说可重入的设计,极大的简化了面向对象的并发开发.
    public class Widget {
        public synchronized void doSomething() {
        }
    }
    
    public class LoggingWidget extends Widget {
        pblic synchronized void doSomething() {
            System.out.println(toString() + ": calling doSomething");
            super.doSomething();
        }
    }
    
  • 上面的例子中,如果内置锁不是可重入的话, 在调用super.doSomething()的时候,就会永 远的等待下去,(因为子类的doSomething开始的时候,肯定会获取内置锁,而调用super父类 函数的时候,会再次试图获取内置锁)
  • java实现内置锁的可重入的方法,就是计数,线程首次获得这个锁,计数为0, 如果其他线程线 程来,发现是0,就等待,而本线程再次获取这个锁,会导致计数变成1

Guarding State with Locks

  • 我们上一节介绍了锁(主要是同步代码块), 是通过对"一个block"的访问限制(每次一 个访问)来达到对state排他("eclusive")访问的
  • 前面说到的check-then-act和read-modify-write需要的是atomic(一个操作之内的不间 断性), 当然可以用锁实现这种atomic(只有一个线程访问,当然是atomic的), 但是要变得非常复杂:
  • 锁的引入,让多线程有序访问变量(一个接一个,而不是同时)变得方便. 前面说到的check-then-act 和read-modify-write也可以使用锁, 但是要变得非常复杂:
    • 在整个 compound action的过程当中都要用锁
    • 每个变量用到的地方还要用锁, 比如上面CountingFactorizer例子中,每次变量被使 用的时候,都是使用的自己的操作(比如.get(), .incrementAndGet())
  • 一个常见的错误是,认为只有'共享变量被写入的时候,才需要锁',这是不对的.
    For each mutable state variable that my be accessed by more than one thread,
    all accesses to that variable must be performed with the same lock held. In this
    case, we say that the variable is 'guarded by that lock'.
    
  • 上面的定义,终于点了我们的题:guarding state with locks: 一个state是不是被保护 了,是看你是否synchronized了所有的access, 如果能成功保护了的话,可以使用@Guardedby annotion来标记(Guardedby更多的是一种文档,起作用的还是synchronized), 再来看看 上一遍synchronized method的例子:
    @ThreadSafe
    public class SynchronizedFactorizer implements Servlet {
        @GuardedBy("this") private BigInteger lastNumber;
        @GuardedBy("this") private BigInteger[] lastFactors;
    
        public synchronized void service(ServletRequest req,
                                         ServletResponse resp) {
            BigInteger i = extractFromRequest(req);
            if (i.equals(lastNumber)) {
                encodeIntoResponse(resp, lastFactors);
            } else {
                BigInteger[] factors = factor(i);
                lastNumber = i;
                lastFactors = factors;
                encodeIntoResponse(resp, factors);
            }
        }
    }
    
  • 一个'object的内置锁(synchronized(this))'和这个'object的state'之间是没有任何 联系的:一个object的state完全可以使用其他的object ref作为锁. 只不过在一个object 内部, 使用this作为锁是synchronized method的默认行为,这样做更简单.
  • 一种常见的locking策略,是把所有的mutable变量都加内置锁:方法是所有能够接触到这些mutable 变量的函数(或者path)都加synzhronized.有些jdk内部的实现,就把object的内置锁和 object的state强行联系起来了.比如Vector, 其所有函数都被synchronized保护了
    public synchronized void ensureCapacity(int minCapacity) {
        if (minCapacity > 0) {
            modCount++;
            ensureCapacityHelper(minCapacity);
        }
    }
    
  • Vector的这种做法其实是不太可取(虽然也凑合用), 因为一旦加入一个新的函数,忘了 加synchronized的话,就破坏了原来的约定.
  • 并不是每一个变量都需要加锁保护,但是如果这个变量能够被多个thread访问到,那么就 必须加锁保护
  • 更进一步的,如果class内部有两个变量都能被多个thread访问到, 而且class的'不变 式(正常工作的要求)'涉及道了这多个变量,那么这多个变量不仅仅要加锁,还要加"同一 个锁", 在class内部,这同一个锁就显然是this(内置锁), 其他锁当然也可以,只要是 同一个锁
    For every invariant that involves more than one variabl, all the variables
    involved in that invariant must be guarded by the same lock
    
  • 既然synchronized是治愈race condition的良药,为什么说前面的Vector'给所有method 都加synchronized'的方法是不妥的呢?原因有如下:
    • 即便为每一个method都加了锁,这些method配合的时候,还是会导致race condition, 比如下面的contains和add都加了锁,但是这两个动作之间,锁可能还是会丢失(因为执 行完contains, 某个thread就会离开Vector block), 从而导致race condition
      if (!vector.contains(element)) {
          vector.add(element);
      }
      
    • 为每个method加锁,还会导致liveness或者performance问题

Liveness and Performance

  • 前面我们通过synchronized method来让serverlet的cache功能实现了thread-safe,但 是这样做存在着巨大的性能隐患:
    • 每次只能有一个thread访问servlet,这违背了servlet需要被多个thread同时访问的设计初衷
    • 如果某个thread占用servlet时间过长会极大的影响server的响应率, 降低CPU的使用率
  • 造成性能隐患的原因是我们锁的层次太高了, 锁的内容太多,容易造成性能的缺失
  • 刚才我们的锁是加在了函数层次,这里我们可以重新设计代码,让锁的层次更低一些, 用 了两次synchronized(this)来替代原来的整个函数synchronized(注意hits, 和cacheHits 只是新加的bonus而已,不是非得要才能达到threadsafe
    @ThreadSafe
    public class CachedFactorizer impements Servlet {
        @GuardedBy("this") private BigInteger lastNumber;
        @GuardedBy("this") private BigInteger[] lastFactors;
        @GuardedBy("this") private long hits;
        @GuardedBy("this") private long cacheHits;
    
        public synchronized long getHits() {
            return hits;
        }
    
        public synchronized double getCacheHitRatio() {
            return (double) cacheHits / (double) hits;
        }
    
        public void service(ServletRequest req, ServletResponse resp) {
            BigInteger i = extractFromRequest(req);
            BigInteger[] factors = null;
            synchronized(this) {
                ++hits;
                if (i.equals(lastNumber)) {
                    ++cacheHits;
                    factors = lastFactors.clone();
                }
            }
            if (factors == null) {
                factors = factor(i);
                synchronized(this) {
                    lastNumber = i;
                    lastFactors = factors.clone();
                }
            }
            encodeIntoResponse(resp, factors);
        }
    }
    
  • 从代码中我们看到,因为使用了内置锁, AtomaticLong这种线程安全类的方式被抛弃了,因为两种 同步方式常常会引发错误
  • 我们代码加锁的区域不大,但是必要的地方也都加了锁, 这样既坚固了效率,又保障了安全
  • 不要试图在如下两种情况下加锁:
    • 时间较长的计算
    • 有可能无法快速完成的任务(比如网络IO, 控制台IO)

Chapter 3: Sharing Objects

  • 第二章我们讲到过了,并发编程的关键,是管理对共享,可变数据的访问.
  • 所以第二章主要讲的是,如何利用'各种技巧(锁,原子化)’来防止'多个线程在同一时间 访问同一个数据'.
  • 这一章讲的则是,通过"其他技巧"(而不是靠锁)让对象能够共享,而且被多个线程同时访 问,当然是线程安全的访问
  • 第二章的介绍,很容易让人误以为同步"只能":
    1. 让操作原子化
    2. 创建critical section
  • 其实同步还有其他的不易察觉却非常重要的作用:内存的可见性(memory visibility).
  • 所谓内存可见性,是指:我们不仅仅想阻止线程A在线程B使用数据X的时候得到这个数据, 我们还想让A可以在B更改X的时候,能够"看到"数据X的最新值.如果没有正确的同步,是 不可能做到内存可见的

Visibility

  • 在single-thread进程环境里面, 如果你写入一个值到一个变量,然后再读取,那么结果 是显而易见的.但是,如果'读取'和'写入'操作是在两个不同的线程里面的话,结果很可能 是reading thread无法看到writing thread写入的值
  • 下面这个例子,有两个线程,main线程和reader线程, main线程首先启动reader线程, 然后设置ready为true, 并给予number一个非零值.reader线程不停的check readay 的状态是否为true,如果是的情况下,就打印number的值
    public class NoVisibility {
        private static boolean ready;
        private static int number;
    
        private static class ReaderThread extends Thread {
            public void run() {
                while (!ready) {
                    Thread.yield();
                }
                System.out.println(number);
            }
        }
    
        public static void main(String[] args) {
            new ReaderThread().start();
            number = 42;
            ready = true;
        }
    }
    
  • 虽然上面的例子在大多数情况下都能成功打印42, 但是还是存在着下面两种可能的错误:
    1. 死循环:因为读不到ready变成true
    2. 打印0:因为读不到number变成42
  • 造成上面两种错误的原因是:在没有同步的情况下, JVM有权利把所有的指令按照自己 的想法重新安排顺序(通常是为了缓存,效率等原因)
  • 如果想避免上述情况的发生,说起来也是很简单的:如果一个数据会在多个线程间共享, 那么就一定要使用合适的同步机制

Stale Date

  • 上面错误发生的原因根本上在于,如果不使用同步机制, 某个线程很多时候会发现"失效" 的数据, 更坏的是,"失效"数据很可能是部分发作的,比如上面的例子, 有两个变量,可能 有时候是一个失效,或另一个失效,或者都失效
  • 失效的数据,会造成巨大的危害.下面的例子MutableInteger不是线程安全的,线程A正在 getter的时候,线程B可能在setter.
    @NotThreadSafe
    public class MutableInteger {
        private int value;
    
        public int get() {
            return value;
        }
        public void set(int value) {
            this.value = value;
        }
    }
    
  • 更改的方法,就是给两个函数都加上同步机制
    @ThreadSafe
    public class SynchronizedInteger {
        @GuardeBy("this") private int value;
    
        public synchronized int get() {
            return value;
        }
        public synchronized void set(int value) {
            this.value = value;
        }
    }
    
  • 仅仅给予setter同步是不够的,因为某个线程还是可以在getter的时候,得到"失效"的数据:
    • value开始的时候值是5
    • 线程A进入setter,然后被调度出去,
    • 线程B虽然不能进入setter,但是还是可以进入getter,也就看到了老的value值5
    • 线程A重新被调度到,把value改成了100
    • 线程B拿到的是老的值5

Non-atomic 64-bit Operations

  • 如果没有使用同步机制,线程可能会读取到"失效"的变量,但至少这个变量还是其他线 程更改的,还不是很离谱.有些时候,会遇到更离谱的情况:读取的值直接就是一个随机 值
  • 这种随机值的情况,主要发生在64位的变量(long 和double)上面,JVM读取和存储32 位的数字是原子的,但是读取和存储64位的数字是分两次读取的(64位java应该就没有 这个问题).所以如果读写64位在两个不同的线程里面,会出现前32位和后32位的数字 来自不同64位数字的尴尬随机数.
  • 所以即便你不担心会读取到"失效"的变量,如果一旦有64位变量存在被多个线程读取的 情况(而且没有加锁,没有标记成volatile), 还是加个同步机制吧

Locking and Visiility

  • 内置锁可以用来保证一个线程可以"保证"看到另一个线程所做的操作, 比如下面的例 子, 如果没有同步机制, 线程B无法保证看到x的新值1, 因为它可能由于优化的原因 直接取用了x在缓存里面的值
        Thread A
    +---------------+
    |   y = 1       |
    +---------------+
    
    +---------------+
    |   lock M      |
    +---------------+
    
    +---------------+
    |   x = 1       |
    +---------------+
    
    +---------------+
    |   unlock M    |                            Thread B
    +---------------+ --------------------->  +---------------+
                                              |  lock M       |
                                              +---------------+
    
                                              +---------------+
                                              |  i = x        |
                                              +---------------+
    
                                              +---------------+
                                              |  unlock M     |
                                              +---------------+
    
                                              +---------------+
                                              |  j = y        |
                                              +---------------+
    
  • 所以,"锁"并不是只是为了"互相排斥(mutual excusion)", 而且也为了能够让其他 线程看到"有效的"内存值
    Locking is not just about mutual exclusion; it is also about memory visibility.
    To ensure that all threads see the most up-to-date values of shared mutable variables,
    the reading and riting threads must synchronize on a common lock.
    

Volatile Variables

  • Java还提供了一个简单,轻量级的synchronizated, 就是关键字volatile.编译器 看到volatile的时候,就知道这个变量是共享的, 所以:
    1. 编译器不会把对volatile变量的操作reorder
    2. 编译器不会把volatile变量放入寄存器进行缓存(缓存是其他的cpu core看不到的)
  • 既然是轻量级的lock,那么volatile还有其局限性:
    • lock可以保证原子性和内存可见性
    • 轻量级lock, volatile变量只能保证内存可见性
  • volatile非常的脆弱,甚至无法保证++的原子性(count++), 最常见的volatile变量 的用法也就是flag:
    volatile boolean asleep;
    
    while(!asleep) {
        countSomeSheep();
    }
    
  • 只有保证如下的三个条件,才能使用volatile变量:
    1. 对这个变量的写入操作,不涉及到它原来的值
    2. 这个变量不涉及其他变量的"不变式"(invariants)
    3. 没有任何理由要为访问此变量加锁

Publication and Escape

  • 发布(publish)一个对象的意思是,扩展某个对象的作用域,让它以前作用域以外的代码 能够看到它, 方法有一下:
    1. 存储一个指向对象的引用reference
    2. 从一个非私有的函数里面返回某个对象
    3. 把一个对象当作某个函数的参数
  • 发布一个内部的state是对封装的妥协,会更加难以保证"不变式"
  • 发布一个还没有创建好的对象,是对线程安全的妥协
  • 如果一个对象还没准备好, 就被不小心的发布了,叫做逃逸(escaped)
  • 最明显的发布方式,就是'存储一个对象的引用在public static的变量'里面, 这样所有 的人都可以通过knownSecrets得到引用然后知道了Secret的所有内容
    public static Set<Secret> knownSecrets;
    
    public void initialize() {
        knownSecrets = new HashSet<Secret>();
    }
    
  • 通过'从非private的函数里面返回值'的方式,也容易publish不合适的内容, 比如下面这 个例子,通过返回的引用,我可以轻松的改动本来private的值
    package org.hfeng.jcip.ch3;
    
    public class UnsafeStates {
        public String[] getStates() {
            return states;
        }
    
        private String[] states = new String[] { "AA", "BB"};
    
        public static void main(String[] args) {
            UnsafeStates us = new UnsafeStates();
            System.out.println(us.getStates()[0]);
            us.getStates()[0] = new String("ZZ");
            System.out.println(us.getStates()[0]);
        }
    }
    
    ////////////////////////////////////////////////////
    // <===================OUTPUT===================> //
    // AA                                             //
    // ZZ                                             //
    ////////////////////////////////////////////////////
    
  • '把一个对象当作某个函数的参数'是最后一种可能的发布内部state的方法. 下面是返回 一个inner class instance的例子,下面虽然看似是publish了EventListener, 但是其 实EventListener里面含有ref到ThisEscape, 所以其实也同时publish了ThisEscape. 因为EventListener是一个inner class(interface), 它必然包括对enclosing instance 的引用
    public class ThisEscape {
        public ThisEscape(EventSource source) {
            source.registerListener(new EventListener() {
                @Override
                public void onEvent(Event e) {
                    doSomething(e);
                }
            });
        }
    
        void doSomething(Event e) {
        }
    
        interface EventSource {
            void registerListener(EventListener e);
        }
    
        interface EventListener {
            void onEvent(Event e);
        }
    
        interface Event {
        }
    }
    

Safte Construction Practices

  • 上面的例子告诉我们,在ctor里面publish 对象(在ctor里面有一个register的过程,其 实就是发布)的结果,很可能是publish了一个没有构造好的对象
  • 一个常见的在构造阶段escape this引用的做法就是在构造函数里面开启线程. 因为构造 函数里面this是和其他线程共享的, 所以:
    • this引用可以作为参数"显示"的传递给线程
    • this引用也可以"隐性"的被调用,因为Thread或者Runnable是当前class的inner class 的话
  • 一旦在ctor里面创建了新的thread, 新的thread会看到没有创建好的当前的class
  • 但这不是错误的关键,关键是不要那么着急start().
  • 从构造函数里面调用被重载的函数(比如上面的调用public void onEvent)肯定会this引用泄漏.
  • 如果实在想在构造函数里面注册一个event listenr或者start()一个线程,那么可以 选择把构造函数设计成private, 然后用public的工厂方法,如下:
    public class SafeListener {
        private final EventListener listener;
    
        private SafeListener() {
            listener = new EventListener() {
                @Override
                public void onEvent(Event e) {
                    doSomething();
                }
            };
        }
    
        public static SafeListener newInstance(EventSource source) {
            SafeListener safe = new SafeListener();
            source.registerListener(safe.listener);
            return safe;
        }
    
        void doSomething() {
        }
    
        interface EventSource {
            void registerListener(EventListener e);
        }
    
        interface EventListener {
            void onEvent(Event e);
        }
    
        interface Event {
        }
    }
    

Thread Confinement

  • 前面说过, "共享且可变"的函数访问必须需要同步机制, 一个不需要同步的方法说来 简单,就是"不要共享", 如果一个变量只被一个线程访问,当然用不到同步机制.
  • 这种不共享数据的方法又叫thread confinement,是最简单的线程安全的方法.
  • 一个object就算自己不是thread-safe的,但是从来不共享它的话,那么可以看做'对这个 object的访问是thread-safe的'
  • 一个常见的使用thread confinment来实现线程安全的例子是JDBC(Java Database Connectivity):
    • JDBC规范没有要求Connection object是thread-safe的(当然connection pool必须 是thread-safe的,否则无法满足多个线程访问)
    • 在一个服务器应用中, '一个线程'会从connection pool中请求一个connection,然 后等用完了之后,再还回去,期间不会把这个connection共享给其他线程.所以,通过 不共享资源的方法, 实现了线程安全.
  • 不共享资源的方法不是从java语言的角度来保证的.(前面的JDBC的例子, JDBC的spec 没有要求一定要线程安全, 而线程不共享connection是由servlet 和EJB request的 结构造成的), 而是要求软件的设计来保证的.
  • 与之相反,如果软件设计有漏洞,无法做到"thread不safe的object不被其他thread访问" 的话,我们可以借助java语言的两个特性来让变量无法被其他thread访问:
    • local variable
    • ThreadLocal class

Ad-hoc Thread Confinement

  • 临时性的线程封闭(ad-hoc thread confinement)只是说如果你的代码保证是单线程了 才可以使用(比如GUI), 其实不是很鼓励使用
  • 比如,使用volatile variable的时候,需要保证'只有一个writer', 多个reader才可 以读到最update的值.

Stack Confinement

  • stack confinement是thread confinement的一种, 其实就是用local variable来 代替全局或者作用域更大的变量.
  • 就像封装可以更容易实现"不变式"一样, local variable可以更容易的把object限定 在thread里面. 其实java的local variable更容易表示,因为它一般'不能'加public private, protected等符号
  • 之所以叫stack,是因为local variable一般都是存放在stack上的, 而每个thread都有 自己的stack(call stack), 不和其他thread共享:每当一个thread进入这个函数的时 候都会压(自己的)栈来创建local variable
  • stack confinement更易于实现,健壮性也强于ad-hoc thread confinement
    public int loadTheArk(Collection<Animal> candidates) {
        SortedSet<Animal> animals;
        int numPairs = 0;
        Animal candidate = null;
    
        //animals confined to method, don't let them escape!
        animals = new TreeSet<Animal>(new SpeciesGenderComparator());
        animals.addAll(candidates);
    
        for (Animal a : animals) {
            if (candidate == null || !candidate.isPotentialMate(a))
                candidate = a;
            else {
                ark.load(new AnimalPair(candidate, a));
                ++numPairs;
                candidate = null;
            }
        }
        return numPairs;
    }
    
  • 比如上面的例子中, animals, numParis都是local variable,不能被其他thread共享:
    • numParis,是primitive类型,所以不用担心ref泄露
    • animals, 是object reference,所以要防止ref泄露. 也就是不能publish animals 这个ref到外部
  • 在线程内部使用线程不安全的对象(比如TreeSet类型的animals)保证其local也是可以 保证整个线程安全的, 但是要注意:
    • 这个线程不安全对象只属于这个线程
    • 这个线程不安全这件事情要文档记录

ThreadLocal

  • 最正式的保护线程confinement的方法就是ThreadLocal
  • 其实ThreadLocal这个东西叫做ThreadLocalVariable 更合适, 其实现机制就是为每一个 使用该变量的线程提供一个变量值的副本(通过new)
  • ThreadLocal为每个线程创建了私有的变量, 这个其实就是"用空间换时间", 每个线程 都有自己的变量,就不会打架了.
  • 而同步机制,就是"用时间换空间":只有一份变量,大家排好队依次取用.
    package org.hfeng.jcip.ch3;
    
    class Connection {
        private String url;
    
        Connection(String url) {
            this.url = url;
        }
    }
    
    class DriverManager {
    
        public static Connection getConnection(String db_url) {
            return new Connection(db_url);
        }
    }
    
    public class TestThreadLocal {
        public static final String DB_URL = "jdbc:mysql://localhost/mydatabase";
    
        private ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>() {
            public Connection initialValue() {
                return DriverManager.getConnection(DB_URL);
            }
        };
    
        private class Thread1 extends Thread {
            public void run() {
                System.out.println("Connection is " + connectionHolder.get());
            }
        }
    
        private class Thread2 extends Thread {
            public void run() {
                System.out.println("Connection is " + connectionHolder.get());
            }
        }
    
        public void display() {
            new Thread1().start();
            new Thread2().start();
        }
    
        public static void main(String[] args) {
            TestThreadLocal testThreadLocal = new TestThreadLocal();
            testThreadLocal.display();
        }
    }
    
    ////////////////////////////////////////////////////////
    // <===================OUTPUT===================>     //
    // Connection is org.hfeng.jcip.ch3.Connection@fe268a //
    // Connection is org.hfeng.jcip.ch3.Connection@52b16b //
    ////////////////////////////////////////////////////////
    
  • Thread设计的原理很想Map,也就是ThreadLocal<T>其实就是Map<Thread, T>,也就是为 每一个Thread创建并存储一份T的instance.
  • ThreadLocal最开始的初衷就是建立thread confinement的,为每个线程创建一份自己 的数据,所以不要在其他方面滥用

Immutability

  • 同步的对象是"共享的变量", 我们打完"共享"的主意,现在是打"变量"主意的时候了, 把"变量"变成"常量"是一个更简单的注意:常量天生就线程安全
    Immutable objects are always thread-safe
    
  • 当然我们这里的常量是immutable objects: 就是在构造之后就无法改变的对象
  • immutable对象不是所有的域都是final, 那远不够, 要达到下面的条件
    • 对象的state在构造以后无法改变
    • 对象所有的域都是final的 (String这种天然的immutable当然可以)
    • 对象构造的过程是没有差错的(不会导致引用逃逸 reference escape)
  • 如果细心会发现,Immutable对象的构建过程并没有要求自己所有的成员都是immutable 的, 如果那样要求的话,java除了final static的内置类型,和本身就不可变的String 以外就没其他候选了.
  • 下面就是一个Immutable对象的内部使用mutable的例子
    public class ThreeStooges {
        private final Set<String> stooges = new HashSet<String>();
    
        public ThreeStooges() {
            stooges.add("Moe");
            stooges.add("Larry");
            stooges.add("Curly");
        }
    
        public boolean isStooge(String name) {
            return stooges.contains(name);
        }
    
    }
    
  • 这里的Set类型肯定是可以改变的.(内容改变), 因为java里面final 修饰的只不过是 stooges本身, 它无法再指向其他的Set类(或者Set的子类)了, 但是你final了stooges, 它内部的数据还是可以不停的增加的.
  • 我们通过将这个类的Set成员设计成private,那么别人就不可能看到它,也就不可能再 给它增加新的内容了!
  • 同时,看我们的ctor: public ThreeStooges, 过程本身没有泄露reference
  • 刚接触到immutable的新手可能会觉得immutable object没有什么用,其实不然:
    • nonfinal ref 指向一个immutabl object, object是不可变的
    • 但是ref是可以指向新的object
    • 创建一个新的immutable object(老的就放弃了),然后ref指向它
    • 从ref的角度看,其值肯定是改变了,而且object都是immutable,肯定是thread-safe

Final Fields

  • final变量无法改变(虽然他们指向的对象可能改变). 使用final能够保证:
    • 初始化安全(initialization safety)
    • 不需要同步机制,保证线程安全
  • 除非某个变量可能会改变,把所有的变量声明为final是一个good practice
    Just as it is a good practice to make all fields private unless they need greater
    visibility, it is a good practice to make all fields final unless they need to
    be mutable.
    

Example: Using Volatile to Publish Immutable Objects

  • 下面这个例子就是创建了一个immutable holder class,来确保线程安全
  • 如果用mutable holder object,你必须使用lock. 下面的immutable object 一旦创建就无法更改. 如果想要更改变量,那么就要创建一个新的immutable object.
  • 下面这个例子能够成功也要得益于在ctor和返回值里面都用到的java6的新特性, Arrays.copyOf. 虽然没有使用任何的lock手段,但由于对象是immutable的,所以可以 成功的保证值在设置以后马上就能被其他函数"看到"
    package org.hfeng.jcip.ch3;
    
    import org.apache.http.annotation.Immutable;
    
    import java.math.BigInteger;
    import java.util.Arrays;
    
    @Immutable
    public class OneValueCache {
        private final BigInteger lastNumber;
        private final BigInteger[] lastFactors;
    
        public OneValueCache(BigInteger i, BigInteger[] factors) {
            lastNumber = i;
            lastFactors = Arrays.copyOf(factors, factors.length);
        }
    
        public BigInteger[] getLastFactors(BigInteger i) {
            if (lastNumber == null || !lastNumber.equals(i)) {
                return null;
            } else {
                return Arrays.copyOf(lastFactors, lastFactors.length);
            }
        }
    }
    
  • 我们可以看到数组的ref value都不一样,肯定是thread-safe的
    package org.hfeng.jcip.ch3;
    
    import java.math.BigInteger;
    
    public class TestImmutable {
        private static BigInteger[] bigIntegers = new BigInteger[] {BigInteger.ONE, BigInteger.TEN};
        private static OneValueCache oneValueCache = new OneValueCache(BigInteger.ONE, bigIntegers);
    
        private static class Thread1 extends Thread {
            @Override
            public void run() {
                System.out.println(oneValueCache.getLastFactors(BigInteger.ONE));
            }
        }
    
    
        public static void main(String[] args) {
            for (int i = 0; i < 10; i++) {
                new Thread1().start();
            }
            System.out.println(oneValueCache.getLastFactors(BigInteger.ONE));
        }
    }
    
    ////////////////////////////////////////////////////
    // <===================OUTPUT===================> //
    // [Ljava.math.BigInteger;@ff5996                 //
    // [Ljava.math.BigInteger;@52b16b                 //
    // [Ljava.math.BigInteger;@fe268a                 //
    // [Ljava.math.BigInteger;@a39c81                 //
    // [Ljava.math.BigInteger;@178069d                //
    // [Ljava.math.BigInteger;@a3bdc                  //
    // [Ljava.math.BigInteger;@186a830                //
    // [Ljava.math.BigInteger;@25c6e8                 //
    // [Ljava.math.BigInteger;@25cc4c                 //
    // [Ljava.math.BigInteger;@18797cf                //
    // [Ljava.math.BigInteger;@ff99df                 //
    ////////////////////////////////////////////////////
    

Safe Publication

  • 到现在为止,我们的策略主要是"堵":就是不让对象publish, 或者object只在某个thread 里面,当然就不会有问题
  • 但是有时候,我们的确要发布一个对象,这个时候就要注意很多问题, 下面的这种简单 的把一个指向对象的ref抛到public区域的做法无法保证publish的安全
    // Unsafe publication
    public Holder holder;
    
    public void initialize() {
        holder = new Holder(42);
    }
    
  • 这个例子问题是在于:会展示给其他线程一个"不完全构造"的对象

Improper Publication: When Good Objects Go Bad

  • 上面的例子Holder如果应用起来的话,竟然会出现'刚看到的n'和'过一会看到的n'值 不相等
  • 如下, 由于没有进行必要的"同步", 一个线程可能开始看到一个值是"过时的", 但是紧接着,它有看到了正确的值,这种情况下,会让下面的assertSanity函数抛出 AssertionError
    public class Holder {
        private int n;
        public Holder(int n) {
            this.n = n;
        }
        public void assertSanity() {
            if (n != n) {
                throw new AssertionError("This is statement is false");
            }
        }
    }
    
  • 不光是我们刚才描述的那些情况,一旦数据在多个线程间共享,而且没有正确的进行同 步,那么很多更加奇怪的事情会发生.

Immutable Objects and Initialization Safety

  • 通过上面的例子我们可以看到"一个object reference对其他thread看起来变的visible, 并不意味着这个object的state对这个thureadvisible"
    An object reference becomes  visible to another thread does not
    necessarily mean that the state of that object is visible to the
    consuming thread.
    
  • 所以为了保证mutable object的"consistent view", 如果要publish mutable object 的话,同步手段是必须的
  • 反之,对于immutable对象, java虚拟机专门提供了一个保证:"共享的不变量的初始化 是安全的"
  • java虚拟机提供的这一保证特别重要, 它让线程间共享数据有了一个捷径(不使用同 步手段的捷径),那就是使用immutable对象(不变对象的条件:无法更改的state, 所有域都 是final的, proper construction).
    Immutable objects can be used safely by any thread without additional synchronization,
    even when synchronization is not used to publish them
    

Safe Publication Idioms

  • 如果我们不能使用不变对象,那么就要采用"同步"的手段来保证publish的正确,而且:
    • 不仅仅要synchronized"发布(publish)对象"的线程
    • 还要synchronized"消费(consuming)这些发布的对象"的线程
  • 我们首先讨论的是,如何保证"consuming thread"看到"成功发布的对象",换句话说下 面是保证safe publish的几种方法, 这几种方法可以看做是在发布时候的synchronized 手段(在后面的叙述中,我们提到safely publish意思就是如下的几种方式,也即意味着 在publishing端的synchronized手段):
    • 最常见的办法:用static initializer来初始化object reference, JVM会在 class初始化的时候(而不是instance初始化),就初始化static initializer, 借助 于'JVM内部的同步机制',这种初始化的方法可以保证安全的发布(publish)对象
      public static Holder holder = new Holder(42);
      
    • 把object reference存储到一个volatile或者AutomicReference里面
    • 把object reference存储到一个properly constructed对象的final域里面
    • 把object reference存储到一个被lock很好保护的域里面(比如collection), 这一 条举例就是线程A把对象X放入到线程安全的vector里面(或者其他collection), 线 程B紧接着从这个vector里面读取对象X,虽然没有任何的"同步"手段,但是我们能保 证线程B能够看到准确的X的state
  • 上面我们提到了可以使用thread-safe library collection来"转存"object,从而能 够做到safe publication的保证.

Effectively(事实上的) Immutable Objects

  • 前面讲的safe publication object的几种手段,都是从"publishing的"thread的角度 来看的.做到诸如static initialization的话,在publishing端就没有其他的好说了.
  • 这些诸如static initialization的做法,相当于在publishing端加了synchronized机制
  • 前面我们讲了,除了在publishing thread端加synchronized机制外,还要在consuming thread 端加同步机制
  • 所谓effectively(事实上)的immutable objects, 意思是,发布的时候,已经做了 synchronized,但是consuming thread在"业务逻辑"上不会去改动这个object,这个 object虽然在technically上说肯定不算immutable的,但是因为不会被改动,所以我们 叫它"事实上的immutable object"
  • 下面是一个effectively immutable的例子
    public Map<String, Date> lastLogin =
        Collections.synchronizedMap(new HashMap<String, Date>());
    
  • Date不是immutable的, 把它放入到线程安全的Map中以后,就做到了publishing thread 端的synchronized机制,也可以被其他线程看到了.
  • 如果我们再能保证这个Date不再改变(String是immutable的),那么就可以在consuming thread端,也就是使用(access)这个变量的时候,不使用任何其他的"同步"手段了.

Mutable Objects

  • 如果我们不能保证一个对象在"安全发布"之后保证不更改,那么除了在publising thread 端的努力,我们还要在consuming thread端使用synchronized手段:
    • 把object做成, thread-safe(immutable天生thread-safe)
    • 使用锁(synchronized block, synchronized method)
  • 我们总结一下就是安全的发布一个object,并安全的让其被使用,其难度是根据mutability 不同而不同的:
    • 如果是immutable的话,publishing thread和consuming thread都不需要做任何处理
    • 如果是"事实上immutable"的话, publishing thread需要safe publish
    • 如果是mutable的话,publishing thread端需要safely publish, consuming端需要 thread-safe或者lock

Sharing Objects Safety

  • 最后,我们从consuming thread端的角度来看看这个问题
  • 在consuming thread端,如果你得到了一个reference(能得到,说明你已经可以visible 这个object了), 但是能visible的话,却不一定能使用,还是要根据mutability:
    • 如果是immutable object的话: 直接使用
    • 如果是"事实上immutable"的话: 可以使用但是千万不要去更改ref的值
    • 如果是mutable的话,如果这个object不是thread-safe的话,要加锁访问
  • 所有这些情况的正确处理,在于publishing thread端能够正确的document自己所publish 的object的性质,以及consuming thread端能够做什么
  • 就我们所学过的几种情况,来做下总结, 如果我们在consuming thread里面拥有了如下 的reference:
    • Thread-confined object: 可以"读取"并且"更改"ref所指的object内容
    • Shared-read-only object: 包括immutable和"effectively immutable", 只可以 "读取",不能"更改"
    • Shared-thred-safe object: 因为对象是thread-safe的,所以可以"读取"并且"更改"
    • Guarded: 因为对象有锁保护: 只有在获取了"那个"锁的情况下,可以"读取"并且"更改"

Chapter 4: Composing Objects

  • 前面我们学到了如何在底层处理"线程安全"和"同步", 但是我们不希望分析每次内存访 问, 然后来保证线程安全, 这一章就来分析如何使用线程安全的component来更加容易 的达到整体的线程安全

Designing a Thread-safe Class

  • 虽然我们可以通过把class所有的state存储到public static field来达到safely, publish 然后再来保证consuming thread的synchronized, 但这样显然太麻烦了
  • 更简单的方法是使用encapsulation, encapsulation做到好可以不用像前面几章一样 一点的分析整个程序,就可以保证thread-safe
  • 设计一个thread-safe类需要包括下面三个基本原则:
    • 确定组成object state的那些个variable
    • 确定这些state variable组成的"不变式"
    • 管理对这些state varible的并发访问
  • 一个对象的state来源于他的数据域, 如果这些数据都是内置类型(primitive type) 的话, 这些数据就构成了整个对象的state.下面的例子中Counter只有一个内置类型 的数据,所以这唯一的数据线程安全了,整个对象就线程安全了.
    @ThreadSafe
    public final class Counter {
        @GuardedBy("this") private long value = 0;
    
        public synchronized long getValue() {
            return value;
        }
        public synchronized long increment() {
            if (value == long.MAX_VALUE) {
                throw new illegalStateException("counter overflow");
            }
            return ++value;
        }
    }
    
  • 如果对象有多个数据域,那么肯定对象的state是和多个值相关的
  • 更进一步的,如果对象含有指向其他对象的域, 那么对象的state是和这些域都相关的. 比如如果一个LinkedList包含很多node,那么他的state就包括这些node的state

Gathering Synchronization Requirements

  • 保证一个class的线程安全,其实就是保证在多个线程访问的情况下, 保证class的"不 变性": 这个不变性,需要我们对state加以控制:
    • 这个state越小,越容易控制
    • 如果尽可能的使用final来修饰field,你的state的变化区间就小了.
    • immutable 对象就更容易控制,因为它只有一种state
  • 上面的例子中, Counter的唯一一个域是value, 它是long, 取值范围是Long.MIN_VALUE 和Long.MAX_VALUE. 但是因为是counter,所以负数是不合法的
  • 同样的,一些操作可以通过post-condition来判断现在的state是不是合法的, 比如 当前的counter是17,那么下一个唯一合法的数字是18.
  • 如果下一个state一定要来自于当前的state, 那么这种操作就必须是compound action(概 念来自第二章:我们把check-then-act和read-modify-write这种必须通过原子性来保证线 程安全的操作叫做compound action)
  • 并不是所有的操作都有要求和前一个state相关,比如测量当前的温度.就更上一次的温度没有 关系
  • 而compound action操作就意味着"更多的同步或者封装请求".比如,一旦某个state invalid 了,我们要好好"保护"它,否则这个invalid的数据会被做为基础被其他thread拿去产生 更坏的后果.
  • 从另外一个房间讲,如果我们的操作不算compound action,那么我们就不需要保护中间 状态,那么我们可以去掉synchronized机制,从而做到更好的性能.
  • 一个class可能会有多个的变量,然后他们共同维护一个不变式,比如一个Range类,有个最大 值,有个最小值. 对这种变量的操作,一定要要求"atomically"的更改"最大值"和"最小 值"(换句话说,要求他俩加同一个锁)否则你更新完了其中一个,再去拿另外一个锁,中 间这个Range就是invalid的状态啊.
    You cannot ensure thread safety without understanding an object's invariants and
    post-conditions. Constraints on the valid values or state transitions for state
    variables can create atomicity and encapsulation requirements.
    

State-dependent Operations

  • 很多时候,某个对象存在一种叫做state-based的 precondition. 比如,一个队列里面必须 要有值,你才能从里面取值. 这个队列"非空"就是取值的precondition.
  • 而需要这种precondition的操作(也就是函数)就叫做state-depdent的操作.
  • 在单线程的程序中,如果precondition没有达到,那么相应的操作一定会失败的.但是在多线程 程序中,我们可以提供另外的选择,因为同一时段可能有其他线程放入数据,我们可以选择等一会 等有数据了再取.
  • 选择内置的方法来实现"等到有数据了通知我(wait-and-notify)"的策略,会和内置锁 (intrinsic lock)有非常紧密联系的,非常难以实现.
  • 我们推荐使用已有的Blocking library class.比如BlockingQueue, Semaphore.来实 现"wati for a precondition to become tre before proceeding".

State Ownership

  • 前面我们说了,并不是所有的object的field都会成为object state的一部分.
  • 当我们要定义哪些变量组成对象的state的时候,我们只考虑了对象自己的数据.
  • "所有权(Ownership)"这个概念,不是存在于语言里面的,而是一种class的设计.当你创建了一个 HashMap的时候,你就创建了多个对象: HashMap对象, 很多Map.Entry对象. 从逻辑上 来讲,不止HashMap对象,其他的Entry对象也是state的一个部分
  • 当传递一个object给cpp method的时候,你必须认真思考自己是不是:
    • 传递了你的ownership?
    • 短暂的借出了ownership
    • 长期共享ownership
  • 在java里面,上面三种ownsership模型都是存在的,但是由于GC的存在,ownership造成 的错误比较少,让我们很少在Java里面考虑ownership.
  • 在很多情况下,ownership和encapsulation总是在一起的:只有state的owner才能决定 "加锁"策略.
  • ownership意味着控制权,但是一旦你把你的ref to mutable object发布出去的话,你 就不再拥有对这个object"排他性的控制权"了,因为其他class也会能够更改并使用它了.
  • 一般来说,一个class通常不会拥有"以参数形式传入到它的method或者它的ctor的object" 的所有权.除非这个method是故意接受一个参数然后传递ownership的.比如synchronized collection wrapper factory method.
  • Collection class通常呈现出来的是一种"split ownership":
    • collection 拥有对collection infrastructure的ownership
    • client code拥有对存在collection内部object的ownership
  • 举个例子:
    • ServletContext就是一个类似Map的object container
  • Java由于有GC的存在,我们很少去考虑"所有权"的问题了. 在java中一个对象封装了state,并且 拥有这个state. 只有你"拥有"这个state,你才能来决定应该采取怎样的同步策略.因为"拥有权" 就意味着"控制权"
  • 但是,如果一旦你把一个reference发布出去,给了一个mutable的对象,那么那就没有"独享"的 所有权,以及控制权了.
  • 另外,一般来说,一个类对它的ctor的参数是没有所有权的,对它的内部函数的参数传入变量也是 没有所有权的.
  • Collection class经常出现所谓的"split ownership", 因为collection拥有collection 自己的state, 用户代码拥有collection里面的对象的state.
  • 一个"split ownership"的例子就是servlet框架里面ServletContext, 它提供了一个类似Map 的容器对象. 因为ServletContext会被多个线程同时访问.当Servlet需要从ServletContext 里面取得数据的时候,就一定要使用"同步方法"了,因为ServletContext里面的对象是application 拥有的. 所以访问他们的是application众多的线程.在使用的时候,还是要做到:
    • 要么object本身就是thread-safe
    • 要么object是immutable
    • 要么object被锁保护.

Instance Confinement

  • 如果一个object不是thread-safe的,那么你依然可以通过如下的手段,使其在多线程的 环境中使用:
    • 只能通过一个thread访问(thread confinement)
    • 访问这个"线程不安全"的object的,所有的access都guarded by lock的
  • 封装通过增加对object的"约束"(confine)来让class做到thread-safe
  • 具体做法就是把一个线程不安全的object放入到另外的一个class里面. 并且辅助以 相应的"锁"的机制.就可以做到让这个本来'线程不安全的object'能够以线程安全的方 式使用.
    Encapsulating data within an object confines access to the data
    to the object's methods, making it easier to ensure that the datat
    is always accessed with the appropriate lock held
    
  • "约束"(confine)的方法有很多种,常见的就是把一个object限定在:
    • class instance (private class member)
    • lexical scope (local variable)
    • 某个thread内部的method之间传递,而不要在不同threads之间传递
  • 下面的例子就是利用confinement和lock共同努力,把一个thread不safe的HashSet使用 的thread-safe起来
    @ThreadSafe
    public class PersonSet {
        @GuardedBy("this")
        private final Set<Person> mySet = new HashSet<Person>();
    
        public synchronized void addPerson(Person p) {
            mySet.add(p);
        }
    
        public synchronized boolean containsPerson(Person p) {
            return mySet.contains(p);
        }
    }
    
  • 上例中的mySet是PersonSet的state, HashSet本身是不threadsafe的,但是因为被包裹 在PersonSet里面(作为private),所以所有对mySet的访问途径被削减到两个函数.
  • 我们对这个两个函数加上intrinsic lock就可以了.注意使用intrinsic lock只是碰巧, 或者说,是一种惯例.用其他lock也是可以达到同等的需求的,只不过intrinsic用起来 更容易
  • 上面的例子中,我们没有对Person做任何说明,但是读过前面的介绍,不难发现.其实Person 也是State的一个部分,也要保证其访问的thread安全性.所以:
    • 如果Person是thread-safe class,那最好
    • 如果Person不是thread-safe,那么访问它的途径也是通过我们PersonSet的函数, 当 前没有能访问Person的函数,但是如果以后要加入的话,要使用intrinsic锁(或其他锁) 来保证Person的thread安全
  • Jdk中常用的collection比如ArrayList和HashMap都不是线程安全的.但是可以通过wrapper factory methods(比如Collections.synchronizedList)来达到thread-safe.
  • wrapper factory methods的原理就是,通过"Decortor模式"给一个collection加一个层 次,collection的所有函数都被加了一个synchronized method的外衣.这样,所有对 collection 的成员的访问'如果是通过wrapper object'的话,就都是被synchronized了 的了.wrapper object当然就是thread-safe的啦.
  • 需要注意的是wrapper object是thread-safe的,但是要保证对underlying的collection 的访问都是通过wrapper object的(否则synchronized 函数就没起作用)
    package org.hfeng.jcip.ch4;
    
    import java.util.*;
    
    public class CollectionsDemo {
        public static void main(String[] args) {
            List<String> list = new ArrayList<String>();
    
            list.add("1");
            list.add("2");
            list.add("3");
            list.add("4");
            list.add("5");
    
            // create a synchronized list
            List<String> synlist = Collections.synchronizedList(list);
    
            System.out.println("SynChronized list is " + synlist);
        }
    }
    
    ////////////////////////////////////////////////////
    // <===================OUTPUT===================> //
    // SynChronized list is [1, 2, 3, 4, 5]           //
    ////////////////////////////////////////////////////
    

The Java Monitor Pattern

  • 从上一节的instance confinement借鉴来的行为(也即object放到其他class里面,所有 通过class的method访问object的函数都加synchronized), 这种保护object线程安全 访问的手段,叫做Java monitor pattern
  • Java monitor pattern中使用intrinsic lock 仅仅是一种"约定俗成". 你完全可以 使用任意的object作为lock,比如下面,就是使用了一个private lock
    public class PrivateLock {
        private final object myLock = new Object();
        @GuardedBy("myLock") Widget widget;
    
        void someMethod() {
            synchronized(myLock){
                // Access or modify the state of widget
            }
        }
    }
    
  • 使用private lock object,而不是this(this是可以public acces的,因为它就是class 的instance)来作为lock,是有其优点的.因为lock是private的话,client code就无法 获得这个lock,也就不会参与到synchronized的过程,也就不会来捣乱了.

Example: Tracking Fleet Vehicles

  • 我们来看一个Java monitor pattern的例子"追踪汽车位置":
    • MutablePoint类用来记录位置(x,y), 它是线程"不安全的"
      package org.hfeng.jcip.ch4;
      
      import org.apache.http.annotation.NotThreadSafe;
      
      @NotThreadSafe
      public class MutablePoint {
          public double x, y;
      
          public MutablePoint(double x, double y) {
              this.x = x;
              this.y = y;
          }
      
          public MutablePoint(MutablePoint p) {
              this(p.x, p.y);
          }
      }
      
    • MonitorVehicleTracker类使用了Java Monitor 模式用来把汽车id(一个String)和 他的位置(x,y)进行包装.其是thread-safe的
      package org.hfeng.jcip.ch4;
      
      import org.apache.http.annotation.ThreadSafe;
      
      import java.util.HashMap;
      import java.util.Map;
      
      @ThreadSafe
      public class MonitorVehicleTracker {
          private final Map<String, MutablePoint> locations;
      
          public MonitorVehicleTracker(Map<String, MutablePoint> data) {
              locations = deepCopy(data);
          }
      
          public synchronized Map<String, MutablePoint> getLocations() {
              return deepCopy(locations);
          }
      
          public synchronized void setLocation(String name, MutablePoint point) {
              MutablePoint loc = safeGet(name);
              loc.x = point.x;
              loc.y = point.y;
          }
      
          public synchronized MutablePoint getLocation(String name) {
              return new MutablePoint(safeGet(name));
          }
      
          private Map<String, MutablePoint> deepCopy(Map<String, MutablePoint> src) {
              Map<String, MutablePoint> copy = new HashMap<String, MutablePoint>();
              for (String id : src.keySet()) {
                  MutablePoint point = src.get(id);
                  copy.put(id, new MutablePoint(point));
              }
              return copy;
          }
      
          private MutablePoint safeGet(String name) {
              MutablePoint loc = locations.get(name);
              if (loc == null) throw new IllegalArgumentException("No such ID: " + name);
              return loc;
          }
      }
      
    • 然后作为一个整体, VehicleTracker instance可能会在多个threads之间共享.比如 View thread就会获取vehicle的名字和位置,然后把他们打印在显示器上.
      Map<String, Point> locations = vehicles.getLocations();
      for (String key : locations.keySet()) {
          renderVehicle(key, locations.get(key));
      }
      
    • updater threads会从GPS(或者死机手动输入)取得汽车的新位置,然后更改
      void vehicleMoved(VehicleMovedEvent evt) {
          Point loc = evt.getNewLocation();
          vehicles.setLocation(evt.getVehicleId(), loc.x, loc.y);
      }
      
  • 由于view thread和updater thread都会同时访问data model, 所以thread safe对它 来说,是必须的.
  • 虽然内部的MutablePointer不是线程安全的, 但是我们只要包成tracker class线程 安全就可以了,因为所有的访问都是通过tracker class
  • 但是这个实现也有其缺点, 因为它是以deep copy的形式来返回所有的location. 如 果src.keySet()的值特别大,那么程序效率就会非常差,因为getLocations拿着intrinsic lock太久的时间,必然对性能造成巨大影响.
    public synchronized Map<String, MutablePoint> getLocations() {
        return deepCopy(locations);
    }
    
    private Map<String, MutablePoint> deepCopy(Map<String, MutablePoint> src) {
        Map<String, MutablePoint> copy = new HashMap<String, MutablePoint>();
        for (String id : src.keySet()) {
            MutablePoint point = src.get(id);
            copy.put(id, new MutablePoint(point));
        }
        return copy;
    }
    
  • 另外一个可能的影响是,你通过deepCopy拷贝了一份所有汽车某个时段的"snapshot", 一旦返回,这些数据就不会改变了.如果你想要返回的copy(Map类型)里面存的是"动态 的数据",那么这个实现显然不能满足

Delegating Thread Safety

  • java monitor pattern对于从零开始创建项目,或者把一个"thread不safe"的object(比 如map)包装成thread-safe的,是非常有用的
  • 如果一个class包含的那些component(比如map是thread-safe的map)本身就thread-safe 了,我们还需要使用java monitor pattern来包装他们么?
  • 答案是"不一定":因为class的每个成员都thread-safe, 只是保证整个class thread-safe 的一个良好开始.有些时候,还是需要增加其他辅助来保证整体thread-safe的
  • 回顾一下我们前面的例子CountingFactorizer.我们可以说CountingFactorizer让AtomicLong 的count来代理了它的thread-safe的责任:CountingFactorizer之所以thread-safe,是 因为AtomicLong count自己thread-safe
    @ThreadSafe
    public class CountingFactorizer implements Servlet {
        private final AtomicLong count = new AtomicLong(0);
    
        public long getCount() {
            return count.get();
        }
    
        public void service(ServletRequest req, ServletRequest resp) {
            BigInteger i = extractFromRequest(req);
            BigInteger[] factors = factors(i);
            count.incrementAndGet();
            encodeIntoResponse(resp, factors);
        }
    }
    
  • 下面来看几个把class自己的thread代理给自己的component的例子

Vehicle Tracker Using Delegation

  • 第一个例子是"成员(component)"是thread-safe, 而且只有一个成员(就不牵涉到成员之间的"不变式")
    package org.hfeng.book.jcip.ch4.immutable;
    
    
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    
    public class DelegatingVehicleTracker {
        private final ConcurrentMap<String, Point> locations;
        private final Map<String, Point> unmodifiableMap;
    
        public DelegatingVehicleTracker(Map<String, Point> points) {
            locations = new ConcurrentHashMap<String, Point>(points);
            unmodifiableMap = Collections.unmodifiableMap(locations);
        }
    
        public Map<String, Point> getLocations() {
            return unmodifiableMap;
        }
    
        public Point getLocation(String id) {
            return locations.get(id);
        }
    
        public void setLocation(String id, int x, int y) {
            if (locations.replace(id, new Point(x, y)) == null) {
                throw new IllegalArgumentException("invalid vehicle name:" + id);
            }
        }
    
        // Alternate version fo getLocations
        public Map<String, Point> getLocationsAsStatic() {
            return Collections.unmodifiableMap(new HashMap<String, Point>(locations));
        }
    }
    
  • 包裹它的class不需要对访问成员的函数加synchronized就可以自动做到thread-safe 当然,也可以说
    DelegatingVehicleTracker把自己的thread-safe的责任代理给了ConcurrentHashMap
    
  • 注意!我这里使用的Point是immutable的, 这里使用immutable的Point不是为了让locations 变的thread-safe(因为location本身就是thread-safe的map实现). 这里使用immutable point是因为我们要publish point
    package org.hfeng.book.jcip.ch4.immutable;
    
    import org.apache.http.annotation.ThreadSafe;
    
    @ThreadSafe
    public class Point {
        public final int x, y;
    
        public Point(int x, int y) {
            this.x = x;
            this.y = y;
        }
    }
    
  • 注意!我们这里getLocations函数的行为已经变化了:
    • Monitor版本getLocations返回的是一个deepCopy版本的snapshot,数据是固定不变的.
    • Delegate版本getLocations返回的是一个unmodifiableMap版本的map.但是每个key 对应的value更新的话
    • Collections.unmodifiableMap通过无法让你put来保证你无法更改这个map
      public V put(K key, V value) {
          throw new UnsupportedOperationException();
      }
      
    • 只是无法通过unmodifiableMap来增加,但是还是可以通过原来的map增加.
      package org.hfeng.book.jcip.ch4;
      
      import java.util.Collections;
      import java.util.HashMap;
      import java.util.Map;
      
      public class TestUnmodified {
      
          public static void main(String[] args) {
              Map<String, String> map = new HashMap<String, String>();
              Map<String, String> umap = Collections.unmodifiableMap(map);
              map.put("1", "a");
              map.put("2", "b");
              map.put("3", "c");
      
              System.out.println(umap.get("1"));
              map.put("1", "A");
              System.out.println(umap.get("1"));
          }
      }
      
      ////////////////////////////////////////////////////
      // <===================OUTPUT===================> //
      // a                                              //
      // A                                              //
      ////////////////////////////////////////////////////
      

Independent State Variables

  • 第二个例子是"成员(component)"是thread-safe的, 而且不止有一个成员.只是成员 之间没有"不变式"关系(也即independent)!
  • VisualComponent是一个允许client来注册为鼠标和键盘准备的listener.
    package org.hfeng.book.jcip.ch4.multiple;
    
    import java.awt.event.KeyListener;
    import java.awt.event.MouseListener;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    public class VisualComponent {
        private final List<KeyListener> keyListeners
                = new CopyOnWriteArrayList<KeyListener>();
        private final List<MouseListener> mouseListeners
                = new CopyOnWriteArrayList<MouseListener>();
    
        public void addKeyListener(KeyListener listener) {
            keyListeners.add(listener);
        }
    
        public void addMouseListener(MouseListener listener) {
            mouseListeners.add(listener);
        }
    
        public void removeKeyListener(KeyListener listener) {
            keyListeners.remove(listener);
        }
    
        public void removeMouseListener(KeyListener listener) {
            mouseListeners.remove(listener);
        }
    }
    
  • 上面的例子中,我们使用了两个线程安全的CopyOnWriteArrayList, 而这两个list又 是independent的, 所以总体上是线程安全的,也可以说
    VisualComponent把自己的的thread-safe的责任代理给了多个CopyOnWriteArrayList的
    state variable. 如果这些variable之间没有'不变式'关系,那么VisualComponent就线程安全
    
  • 上例中使用的CopyOnWriteArrayList是一种thread-safe的List实现.其所有的可变操 作(add、set 等)都是通过对底层数组进行一次新的复制来实现的,代价昂贵.

When Delegation Fails

  • 第三个例子是"成员(component)"是thread-safe的,而且不止有一个成员,同时,成员 之间还有"不变式"关系
  • NumberRange有两个成员,其中lower的值必须必upper小.在这种'不变式'关系的存在下, 仅仅包裹"成员thread-safe的component"而不对access函数做synchronized处理的话 是无法做到thread-safe的
    package org.hfeng.book.jcip.ch4.delegate.invariant;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class NumberRange {
        // INVARIANT: lower <= upper
        private final AtomicInteger lower = new AtomicInteger(0);
        private final AtomicInteger upper = new AtomicInteger(0);
    
        public void setLower(int i) {
            // Warning -- unsafe check-then-act
            if (i > upper.get()) {
                throw new IllegalArgumentException("can't set lower to " + i + " > upper");
            }
            lower.set(i);
        }
    
        public void setUpper(int i) {
            // Warning -- unsafe check-then-act
            if (i < lower.get()) {
                throw new IllegalArgumentException("can't set upper to " + i + " < lower");
            }
            upper.set(i);
        }
    
        public boolean isInRange(int i) {
            return (i >= lower.get() && i <= upper.get());
        }
    }
    
  • 比如本来range是(0, 10)某一个thread设置setLower(5), 另外一个thread设置setUpper(4), 在"某些很不幸的情况下", 两个thread都通过了check,导致最后的range结果是(5,4) – 一个invalid的结果.
  • 让这个情况能够thread的方法:
    • 用同一个lock(通常是synchronized函数)来包含lower和upper
    • 不要publishing upper和lower这两个reference,防止他们被用户更改.

Publishing Underlying State Variables

  • 大多数情况下都不要在"代理模式"下面去publishing underlying state,除非
    If a state variable is thread-safe, does not participate in any invariants
    that constrain its value, and has not prohibited state transitions for any of
    its operations, then it can safely be published
    
  • 举例来说,上面的mouseListeners和keyListeners是可以被安全publish的

Vehicel Tracker that Publishes Its State

  • 说到publish,我们刚才的vehicle tracker是一个"必须"publish自己的内容的class 那么他就要做到上面的几点:
    • state variable thread-safe
    • does not participate in any invariants
    • has not prohibited state transitions for any of its operations
  • 我们这次的例子跟上次Immutable Point很像,只是使用了thread-safe的SafePoint
    package org.hfeng.book.jcip.ch4.delegate.publish;
    
    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    public class PublishingVehicleTracker {
        private final Map<String, SafePoint> locations;
        private final Map<String, SafePoint> unmodifiableMap;
    
        public PublishingVehicleTracker(Map<String, SafePoint> locations) {
            this.locations = new ConcurrentHashMap<String, SafePoint>(locations);
            this.unmodifiableMap = Collections.unmodifiableMap(this.locations);
        }
    
        public Map<String, SafePoint> getLocations() {
            return unmodifiableMap;
        }
    
        public SafePoint getLocation(String id) {
            return locations.get(id);
        }
    
        public void setLocations(String id, int x, int y) {
            if (!locations.containsKey(id)) {
                throw new IllegalArgumentException("invalid vehicle name:" + id);
            }
            locations.get(id).set(x, y);
        }
    }
    
  • 这次的SafePoint如下,是一个可以改变的thread-safe Point
    package org.hfeng.book.jcip.ch4.delegate.publish;
    
    import org.apache.http.annotation.GuardedBy;
    import org.apache.http.annotation.ThreadSafe;
    
    @ThreadSafe
    public class SafePoint {
        @GuardedBy("this")
        private int x, y;
    
        private SafePoint(int[] a) {
            this(a[0], a[1]);
        }
    
        public SafePoint(SafePoint p) {
            this(p.get());
        }
    
        public SafePoint(int x, int y) {
            this.set(x, y);
        }
    
        public synchronized int[] get() {
            return new int[] {x, y};
        }
    
        public synchronized void set(int x, int y) {
            this.x = x;
            this.y = y;
        }
    }
    
  • 为了保证这个SafePoint的thread-safe,也是煞费苦心:
    • 首先看private SafetPoint(int[] a), 其有两个作用:
      • 首先private ctor肯定是不想被外人调用用来初始化,比如
        int[] arr = new int[] {1, 2};
        // other thread got *arr* can modified it while constructing
        SafetPoint safepoint = new SafetPoint(arr);
        
      • 其次private ctor还可以防止其他人'错误的实现copy ctor',怎么防止?通过自己 实现一个copy ctor, 而且在这个copy ctor里面使用了刚才的private ctor
        public SafePoint(SafePoint p) {
            // Do not worry, p.get() return a new ref, which can
            // not be obtained by other threads!
            this(p.get());
        }
        
        // p.get() return a new ref, which can not be obtained by other threads
        public synchronized int[] get() {
            return new int[] {x, y};
        }
        

Adding Functionality to Existing Thread-safe Classes

  • 在开发中,使用已经存在的库显然是一个好的注意,因为经过很多年的发展,这些库稳定 而且经过长期测试.
  • 但是已经存在的库不一定"百分百"的满足我们的要求,所以我们要增加一些功能.本书 是讨论多线程的,所以这里主要讨论在增加功能的同时,不要破坏原来代码的thread-safe性
  • 我们举个例子,比如原来有个线程安全的SynList, 其有两个函数contains和add, 我们希望 这个SynList里面不能有重复的元素(对List的扩展),所以我们要实现一个putIfAbsent的 函数
  • 新增加的函数不能破坏原有SynList的thread-safe,所以必须保证新增加的函数"必须是 atomic"的.这样才会排除SynList暴露invalid state的风险
  • 为了保证函数的"原子性",我们有两条路:
    1. 更改源代码:在lib里面加入一个新的synchronized函数(或其他同步方法). 这个通 常是一个可以接受的做法,因为一个类的所有操作在一个源文件里面.但是需要你能 够得到源代码,这不是一定能做到的
    2. 继承原来的类,并增加同步方法:这种做法需要父类清晰的在文档里面说明了自己的 同步策略,否则是不可取的(万一父类更改了同步策略怎么办), 下面例子中的Vector 在文档中明确了自己的同步策略,所以方法是可行的
      package org.hfeng.book.jcip.ch4;
      
      import org.apache.http.annotation.ThreadSafe;
      
      import java.util.Vector;
      
      @ThreadSafe
      public class BetterVector<E> extends Vector<E> {
          // When extending a serializable class, you should redefine serialVersionUID
          static final long serialVersionUID = -3963416950630760744L;
      
          public synchronized  boolean putIfAbsent(E x) {
              boolean absent = !contains(x);
              if (absent) {
                  add(x);
              }
              return absent;
          }
      
      }
      

Client-side Locking

  • 有时候你既无法更改源代码,又不知道所操作容器的真实具体类型,比如通过Collections.synchronizedList wrapper返回的容器.这种情况下,还有第三种增加功能的办法:helper函数
  • 下面是一个错误的helper例子,其原因是加错了锁!这里使用了BadListHelper的intrinsic 锁,这显然是不对的.list肯定是使用自己的锁(不一定是自己的instrinsic)
    package org.hfeng.book.jcip.ch4.function;
    
    import org.apache.http.annotation.NotThreadSafe;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    
    @NotThreadSafe
    public class BadListHelper<E> {
        public List<E> list = Collections.synchronizedList(new ArrayList<E>());
    
        public synchronized boolean putIfAbsent(E x) {
            boolean absent = !list.contains(x);
            if (absent) {
                list.add(x);
            }
            return absent;
        }
    }
    
  • list具体使用哪个锁是内部实现,我们通过查阅文档知道它是使用了intrinsic锁,所以 我们可以如下实现helper函数
    package org.hfeng.book.jcip.ch4.function;
    
    import org.apache.http.annotation.ThreadSafe;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    
    @ThreadSafe
    public class GoodListHelper<E> {
        public List<E> list = Collections.synchronizedList(new ArrayList<E>());
    
        public boolean putIfAbsent(E x) {
            synchronized (list) {
                boolean absent = !list.contains(x);
                if (absent) {
                    list.add(x);
                }
                return absent;
            }
        }
    }
    
  • list具体使用哪个锁按说是不应该在文档中记录的(因为这个是内部实现,说出来违反 封装要求). 但是没有这个文档,无论是extend类还是client helper函数都无法实现 因为无法知道是哪个锁.
  • 换句话说, extend类和client helper都"事实上破坏了封装"不应该采用.正确的方法 是组合(composition)
  • 下面的例子就通过包含一个List, 并且对所有access List的函数加锁即便底层List是 一个thread-不safe的实现,我们上面所做的也可以保证最终整个class的thread-safe.
  • 同时它还增加了一个自己的功能函数putIfAbsent
    package org.hfeng.book.jcip.ch4.function;
    
    import org.apache.http.annotation.ThreadSafe;
    
    import java.util.*;
    
    @ThreadSafe
    public class ImprovedList<T> implements List<T> {
        private final List<T> list;
    
        /**
         * PRE: list argument is thread-safe.
         */
        public ImprovedList(List<T> list) { this.list = list; }
    
        public synchronized boolean putIfAbsent(T x) {
            boolean contains = list.contains(x);
            if (contains)
                list.add(x);
            return !contains;
        }
    
        // Plain vanilla delegation for List methods.
        // Mutative methods must be synchronized to ensure atomicity of putIfAbsent.
    
        public int size() {
            return list.size();
        }
    
        public boolean isEmpty() {
            return list.isEmpty();
        }
    
        public boolean contains(Object o) {
            return list.contains(o);
        }
    
        public Iterator<T> iterator() {
            return list.iterator();
        }
    
        public Object[] toArray() {
            return list.toArray();
        }
    
        public <T> T[] toArray(T[] a) {
            return list.toArray(a);
        }
    
        public synchronized boolean add(T e) {
            return list.add(e);
        }
    
        public synchronized boolean remove(Object o) {
            return list.remove(o);
        }
    
        public boolean containsAll(Collection<?> c) {
            return list.containsAll(c);
        }
    
        public synchronized boolean addAll(Collection<? extends T> c) {
            return list.addAll(c);
        }
    
        public synchronized boolean addAll(int index, Collection<? extends T> c) {
            return list.addAll(index, c);
        }
    
        public synchronized boolean removeAll(Collection<?> c) {
            return list.removeAll(c);
        }
    
        public synchronized boolean retainAll(Collection<?> c) {
            return list.retainAll(c);
        }
    
        public boolean equals(Object o) {
            return list.equals(o);
        }
    
        public int hashCode() {
            return list.hashCode();
        }
    
        public T get(int index) {
            return list.get(index);
        }
    
        public T set(int index, T element) {
            return list.set(index, element);
        }
    
        public void add(int index, T element) {
            list.add(index, element);
        }
    
        public T remove(int index) {
            return list.remove(index);
        }
    
        public int indexOf(Object o) {
            return list.indexOf(o);
        }
    
        public int lastIndexOf(Object o) {
            return list.lastIndexOf(o);
        }
    
        public ListIterator<T> listIterator() {
            return list.listIterator();
        }
    
        public ListIterator<T> listIterator(int index) {
            return list.listIterator(index);
        }
    
        public List<T> subList(int fromIndex, int toIndex) {
            return list.subList(fromIndex, toIndex);
        }
    
        public synchronized void clear() { list.clear(); }
    }
    

Documenting Synchronization Policies

  • 文档是管理thread-safe的最有力工具:
    • 用户通过文档来得知class是否thread-safe
    • 维护者通过文档来理解实现的细节,才能在后续的维护中,不破坏原来的thread-safe

Chapter 5: Building Blocks

  • 上一章,我们介绍了如何将class的thread-safety代理给某些已经thread-safe的class 从而做到class自己的thread safe
  • java的lib在5.0和6.0引入了许多的concurrent building blocks,我们在这一章进行介 绍

Synchronized Collections

  • synchronized collection class主要包括:
    • Vector
    • HashTable
    • Collections.synchronizedXXX factory methods
  • synchronized collection class的主要原理是:
    • encapsulating states (private 或者 protected-private)
    • synchronized every public method, 从而保障每次只能有一个thread真正使用collection

Problems with Synchronized Collections

  • synchronized collection是thread-safe的,但是这只能保证你使用一个method之内 是thread-safe的,如果你要多个method连续使用,中间有被其他thread抢占lock的 可能.
  • 如果在你连续调用method的中间被其他thread抢占了lock, 你当前的调用会抛出异常 (而不会进入invalid state), 所以从技术角度讲,还算是thread-safe的
  • compound action就是连续调用method,所以compound action在client side被使用的 时候,是需要额外加锁的. 常见的compound action主要有:
    • iteration : repleatedly fetch elements until the collection is exhausted
    • navigation: find the next element after this one according to some order
    • conditional operations such as put-if-absent : check if a Map has a mapping for key K, and if not, add the mapping (K, V)
  • 下面是一个compund action的例子: 一个client helper调用Vector的method来完成自己的method
    package org.hfeng.book.jcip.ch5;
    
    import java.util.Vector;
    
    public class UnSafeVectorHelpers {
        public static Object getLast(Vector list) {
            int lastIndex = list.size() - 1;
            return list.get(lastIndex);
        }
    
        public static void deleteLast(Vector list) {
            int lastIndex = list.size() - 1;
            list.remove(lastIndex);
        }
    }
    
  • 上面的代码即便被多个thread同时访问,也"不会"破坏Vector,但是在多thread访问的 情况下却不会"完全按照"我们的想法运行
               +-----------+      +-----------+
    Thread A   | size = 10 +----->| remove(9) |
               +-----------+      +-----------+
    
               +-----------+                     +-----------+    +-----------+
    Thread B   | size = 10 +-------------------->|  get(9)   +--->|   boom    |
               +-----------+                     +-----------+    +-----------+
    
  • 比如,当按照上图访问的顺序访问的时候:
    • Thread B在和Thread A的争夺中, 获得lock,调用size函数,获得size为10, 放弃lock
    • Thread A在和Thread B的争夺中, 获得lock,调用size函数,获得size为10, 放弃lock
    • Thread A在和Thread B的争夺中, 再次获得lock, 然后调用remove(9), 放弃lock
    • Thread B在无其他Thread竞争的情况下,获得lock,然后调用get(9), 程序抛出异常 ArrayIndexOutOfBoundsException
  • 上面本来在单线程访问的时候,正常的代码,由于有其他thread的同时运行,产生了意 想不到的结果(虽然还是thread-safe的,因为没有invalid state出现)
  • 为了防止上面的情况出现,我们查阅了synchronized collection的文档,得知,我们可 以使用client side的lock, 同时也必然通知了我们collection使用了intrinsic lock
    package org.hfeng.book.jcip.ch5;
    
    import java.util.Vector;
    
    public class SafeVectorHelpers {
        public static Object getLast(Vector list) {
            synchronized (list) {
                int lastIndex = list.size() - 1;
                return list.get(lastIndex);
            }
        }
    
        public static void deleteLast(Vector list) {
            synchronized (list) {
                int lastIndex = list.size() - 1;
                list.remove(lastIndex);
            }
        }
    }
    
  • 下面的iteration也会遇到相同的情况(被其他thread抢到lock,然后自己的thread会 抛异常).
    for (int i = 0; i < vector.size(); i++) {
        doSomething(vector.get(i));
    }
    
  • 解决方法也是client加锁
    synchronized(vector) {
        for (int i = 0; i < vector.size(); i++) {
            doSomething(vector.get(i));
        }
    }
    

Iterators and Concurrentmodificationexception

  • Vector的循环道size()是比较原始的遍历手段,比较新的java遍历手段是iterator,有 两种表现形式:
    • 通过明确的Iterator
    • 通过foreach
  • iterator的手段并不是说能够"不显式加锁"就做到thread-safe,而是
    把一个modification count和collection联系起来,在遍历的时候,如果发现
    modification count改变了(通常是被其他thread改变)hasNext,或者next
    就会抛出ConcurrentModificationException
    
  • 下面就是一个Iterator的例子(通过foreach实现), 如果有其他thread来更改widgetList 的内容, foreach内部实现的hashNext就会抛ConcurrentModificationException,如果 想防止这种事情的发生,那么就必须synchronized(widgetList)
    List<widget> wdigetList =
        Collections.synchronizedList(new ArrayList<widget>());
    
    //...
    // May throw ConcurrentModificationException
    for (widget w : widgetList) {
        doSomething(w);
    }
    
  • 上面的foreach会抛出异常,但是手段却"不是通过synchronized", 这种做法是存在如 下一种可能, 从而导致invalid state的
    modification count的stale value被hasNext()看到了,从而导致iterator
    没有意识到别的thread更改了数据
    
  • 但是新的jdk这样做的原因是为了提高并发访问下, 程序performance的一种tradeoff
  • foreach是由缺陷,但是在client side对遍历代码加锁的方法,缺陷更大:
    • 其他的thread在你遍历的时候是无法进入的,如果遍历是一个非常漫长的过程,那么 这会极大的降低性能
    • 如果在for循环里面doSomething,的话,很可能会导致deadlock
  • 另外一个遍历的方法是clone整个collection,然后遍历整个克隆.因为clone是thread-confined 所以不会存在线程安全的问题.当然了,性能方面是得优点损失.

Hidden Iterators

  • 下面的例子,在最后输出DEBUG信息的时候,使用了+,也就会"遍历的"调用toString函数
    package org.hfeng.book.jcip.ch5;
    
    import org.apache.http.annotation.GuardedBy;
    
    import java.util.HashSet;
    import java.util.Random;
    import java.util.Set;
    
    public class HiddenIterator {
        @GuardedBy("this")
        private final Set<Integer> set = new HashSet<Integer>();
    
        public synchronized void add(Integer i) {
            set.add(i);
        }
    
        public synchronized void remove(Integer i) {
            set.remove(i);
        }
    
        public void addTenThings() {
            Random r = new Random();
            for (int i = 0; i < 10; i++) {
                add(r.nextInt());
            }
            System.out.println("DEBUG: added ten elements to " + set);
        }
    }
    
  • "遍历的"调用toString函数,在多线程访问的情况下会导致抛出ConcurrentModificationException
  • 更重要的错误,是上面的代码thread-safe不safe(如果thread-safe了,可能会像for size()循环一样返回其他异常), 因为println代码附近没有加锁!(调试代码经常忘了加锁)
  • 如果想让上面的代码变成thread-safe,那么一个可行的办法是把set的类型声明成synchronizedSet, 此外就不需要再加其他同步手段.这等于把同步封装在了class的成员变量里面
    Just as encapsulating a object's state make it easier to preserve
    its invariants, encapsulating its synchronization makes it easier
    to enforce its synchronization policy
    
  • Iteration还会出现在调用collection的hashCode和equals函数的时候(因为collection 会把所有的成员都遍历一遍,计算自己的hashCode),所有这种"隐藏"的遍历,都可能会 抛出ConcurrentModificationException

Concurrent Collections

  • Java 5.0的贡献,就是提供了concurrent collection class:
    • jdk 1.0开始提供的synchronized collection是通过每次"只允许一个thread access" collection来做到线程安全的,这种做法的代价太大,效率太低
    • 而concurrent collection则采取了完全不同的策略:它允许多个thread共同访问collection
  • Java 5.0几个常见的concurrent collection是
    • ConcurrentHashMap -> Hashed Maps
    • CopyOnWriteArrayList -> Lists
    • interface ConcurrentMap 新增了put-if-absent, replace, conditional remove等 操作
  • Java 5.0增加了两个新的collection type:
    • Queue(interface) : 提供了排队策略. 如果队列是空的,从这个队列retrieval的操作 就会返回空. 继承这个接口的主要有如下
      • LinkedList (FIFO, 非concurrent)
      • ConcurrentLinkedQueue(FIFO, concurrent)
      • PriorityQueue (优先队列, 非concurrent)
    • BlockingQueue(interface): 其extends了Queue, 也就有了Queue所有的功能,只是如 果队列是空的,从这个队列retrieval element的操作会block,等待其他thread放进去! 相似的,如果队列是满的,向里面加入element的操作也会block.这对于Producer-consumer 设计来说很适用

ConcurrentHashMap

  • synchronized collection的每一个操作,都必须一直hold住锁,但是很多的操作非常费 时间,比如HashTable.get或者是Vector.contains(因为要遍历整个数据结构, 对于HashTable 来说,有时候hash function设计不好,hashCode的collision过多,如果collision出现 冲突,那么那个hash table entry就需要一个LinkedList来防止所有的值). 这样一来 在寻找的时候,就会让其他thread等待过久
  • ConcurrentHashMap是一个hash-based Map, 只不过它使用了一个完全不同的lock策略 来加锁, 总结起来就是:
    • 很多reader可以共同外加writer访问map
    • 限定个数的writer(不止一个)可以和reader一起访问map
  • Concurrent collection的另外一个提升是:
    • 他们提供了不会抛出异常的iterator, 这样一来就不必再iterator的时候对collection加锁了
    • 他们提供的iterator可以容忍concurrent的改动(也就是说在遍历的同时,更改map的 值是被允许的,不会抛出异常的), 虽然可以容忍concurrent的改动,但是最后只能 "尽最大努力"的反应当时concurrent map的状况,不保证完全重现.这和原来synchronized 的map实现是不一样
      package org.hfeng.book.jcip.ch5;
      
      import java.util.HashMap;
      import java.util.Iterator;
      import java.util.Map;
      import java.util.concurrent.ConcurrentHashMap;
      
      public class ConcurrentHashMapExample {
          public static void main(String[] args) {
              //ConcurrentHashMap
              Map<String, String> myMap = new ConcurrentHashMap<String, String>();
              myMap.put("1", "1");
              myMap.put("2", "2");
              myMap.put("3", "3");
              System.out.println("ConcurrentHashMap before iterator" + myMap);
      
              Iterator<String> it = myMap.keySet().iterator();
      
              while (it.hasNext()) {
                  String key = it.next();
                  if (key.equals("3")) {
                      myMap.put(key + "new", "new3");
                  }
              }
              System.out.println("ConcurrentHashMap after iterator" + myMap);
      
              //HashMap
              myMap = new HashMap<String, String>();
              myMap.put("1", "1");
              myMap.put("2", "2");
              myMap.put("3", "3");
              System.out.println("HashMap before iterator" + myMap);
              Iterator<String> it1 = myMap.keySet().iterator();
      
              while (it1.hasNext()) {
                  String key = it1.next();
                  if (key.equals("3")) {
                      myMap.put(key + "new", "new3");
                  }
              }
              System.out.println("HashMap after iterator" + myMap);
          }
      }
      
      ///////////////////////////////////////////////////////////////////////////////////////////////////////
      // <===================OUTPUT===================>                                                    //
      // ConcurrentHashMap before iterator{1=1, 3=3, 2=2}                                                  //
      // ConcurrentHashMap after iterator{3new=new3, 1=1, 3=3, 2=2}                                        //
      // HashMap before iterator{3=3, 2=2, 1=1}                                                            //
      // java.lang.reflect.InvocationTargetException                                                       //
      //  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)                                   //
      //  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)                 //
      //  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)         //
      //  at java.lang.reflect.Method.invoke(Method.java:606)                                              //
      //  at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)                              //
      //  at java.lang.Thread.run(Thread.java:745)                                                         //
      // Caused by: java.util.ConcurrentModificationException                                              //
      //  at java.util.HashMap$HashIterator.nextEntry(HashMap.java:922)                                    //
      //  at java.util.HashMap$KeyIterator.next(HashMap.java:956)                                          //
      //  at org.hfeng.book.jcip.ch5.ConcurrentHashMapExample.main(ConcurrentHashMapExample.java:36)       //
      //  ... 6 more                                                                                       //
      ///////////////////////////////////////////////////////////////////////////////////////////////////////
      
  • Concurrent collection新的加锁策略也是有一些牺牲的,比如,对全局进行操作的函 数(像size(), isEmpty()), 都是"没有那么真实的"反映collection的concurrent nature
  • 这也很好理解,因为在size()在被计算的时候,map的大小可能已经改变了,返回的值可 能过时了.所以这里的size()只是一个大概的估计,而不是准确的描述
  • 仔细一想,size()这类函数在多线程环境下确实不是很重要,因为这些值都是不停的在 改变, Java牺牲了这些函数的准确性,提高了"最重要操作get, put, containsKey remove" 的性能
  • ConcurrentHashMap唯一"不能"提供的而synchronized Map能够提供的功能就是exclusive access.也只有在这种情况下,synchronized Map是比ConcurrentHashMap更好的选择.

Additional Atomic Map Operations

  • 因为ConcurrentHashMap不能exclusive access,所以内部实现的时候也没有对this加 锁,所以不能使用client side的lock来创建atomic的操作(比如put-if-absent), 为了 弥补,JDK在concurrent collection的内部实现了常用atomic操作
    public interface ConcurrentMap<K, V> extends Map<K, V> {
        // Inert into map only if no value is mapped from K
        V putIfAbsent(K key, V value);
    
        // Remove only if K is mapped to V
        boolean remove(K key, V value);
    
        // Replace value only if K is mpaaed to oldValue
        boolean replace(K key, V oldValue, V newValue);
    }
    

CopyOnWriteArrayList

  • CopyOnWriteArrayList是在并发访问下面效率更高,遍历的时候不需要加锁的List
  • 同样的,CopyOnWriteArraySet也是在并发访问下面效率更高,遍历的时候不需要加锁的Set
  • CopyOnWriteArrayList的原理来自于:immutable对象如果合适的publish,那么对于这 个对象的访问就不再需要加锁
  • CopyOnWriteArrayList的"原数组"可以认为是immutable的,每次write操作,都会触发 一次拷贝整个数组,然后write是在这个新的数组上面完成的,更改完以后再把原来的ref 指向新的数组
  • CopyOnWriteArrayList的操作都是在"原数组"上进行的,因为是"immutable"的,所以读 也不需要加任何的锁
  • CopyOnWriteArrayList的iterator不会抛出ConcurrentModificationException, 而且返回值能够"完全精确"的反应collection当前的状况
  • 当然CopyOnWriteArrayList的效率不高,因为write的时候要重新拷贝.所以CopyOnWriteArrayList 非常适合的领域必定是read操作远大于write操作的领域.

Blocking Queues and the Producer-consumer Pattern

  • Blocking queue的特点如下:
    • 提供queue的常规操作put, take
    • 提供等待一定时间的操作offer(对应put), poll(对应take)
    • 常规put, take如果在queue满的情况下会block
  • Blocking queue的设计简化了java实现producer-consumer pattern的方法:
    • Producer不用管有多少Consumer(甚至有多少Producer也不用管), 只管把数据放到 queue里面(只要queue ready)
    • Consumer不管有多少Producer,只管从queue里面取数据(只要queue ready)
  • 最常见的producer-consumer设计就是thread-pool
  • 类库里面有多重BlockingQueue的FIFO实现(都是concurrent collection):
    • LinkedBlockingQueue 对应LinkedList
    • ArrayBlockingQueue 对应ArrayList
  • Blocking Queue的另外一个实现是PriorityBlockingQueue, 这个queue"不是"按照某 种"先来后到"的顺序,而是按照natural order,或者是Comparator定义的order来行事
  • BlockingQueue还有一种特殊实现叫做SynchronousQueue,这个queue很奇特,它自己不维 护任何的存储空间(好像一个零仓储的超市!)produce的element,马上就被consume光了. 这种queue适合consumer特别多,然后producer特别少,每次基本只有一个element可供 consumer抢夺的情况(也就没有必要维护存储空间)

Example: Desktop Search

  • producer-consumer的一个例子是desktop search, 一般来说desktop search都有如下 两个步骤:
    • 首先是"爬取"所有的文件,放入blockingQueue(如果满就block):
      static class FileCrawler implements Runnable {
          private final BlockingQueue<File> fileQueue;
          private final FileFilter fileFilter;
          private final File root;
      
          public FileCrawler(BlockingQueue<File> fileQueue,
                             final FileFilter fileFilter,
                             File root) {
              this.fileQueue = fileQueue;
              this.root = root;
              this.fileFilter = new FileFilter() {
                      public boolean accept(File pathname) {
                          return true;
                      }
                  };
          }
      
          public void run() {
              try {
                  crawl(root);
              } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
              }
          }
      
          private void crawl(File root) throws InterruptedException {
              File[] entries = root.listFiles(fileFilter);
              if (entries != null) {
                  for (File entry : entries) {
                      if (entry.isDirectory()) {
                          crawl(entry);
                      } else if (!alreadyIndexed(entry)) {
                          fileQueue.put(entry);
                      }
                  }
              }
          }
      
          private boolean alreadyIndexed(File f) {
              return false;
          }
      
      }
      
    • 其次是"索引"所有文件,从blockingQueue中取出(如果空就block)
      static class Indexer implements Runnable {
          private final BlockingQueue<File> queue;
      
          public Indexer(BlockingQueue<File> queue) {
              this.queue = queue;
          }
      
          public void run() {
              try {
                  while (true) {
                      indexFile(queue.take());
                  }
              } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
              }
          }
      
          public void indexFile(File file) {
              // Index the file
              System.out.println("Indexing at file -> " + file.getName());
          }
      
      }
      
  • 调用代码如下
    public static void startIndexing(File[] roots) {
        BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
        FileFilter filter = new FileFilter() {
                public boolean accept(File file) {
                    return true;
                }
            };
    
        for (File root : roots) {
            new Thread(new FileCrawler(queue, filter, root)).start();
        }
    
        for (int i = 0; i < N_CONSUMERS; i++) {
            new Thread(new Indexer(queue)).start();
        }
    }
    

Deques and Work Stealing

  • java 6又在原来的基础上增加了可以在首位两端进行插入删除操作的双向队列:
    1. Deque(interface) : ArrayDeque(implementation)
    2. BlockingDeque(inteface) : LinkedBlockingDeque(implementation)
  • dqueue也是为了一种pattern而创造的, producer-consumer pattern的变形: Work stealing:
    • 每个consumer 拥有自己的deque(这样多个consumer就不会去竞争一个queue)
    • 每个consumer 在自己的deque使用完了以后,可以去其他consumer的deque里面stealing element. 而且是在尾部进行stealing(这样也会减少和deque consumer的正面冲突, 只会和其他前来stealing的consumer进行争夺)
  • Work stealing在解决consumer同时也是producer的问题中非常的合适:比如在web crawler的处理中:
    • 从自己的deque里面得到一个任务. 在处理这个任务的同时,会发现这个网页有很多 链接,同样需要抓取,
    • 就从"后面"把这些链接任务放入到自己的deque(像producer一样),
    • 而如果自己的deque里面没有任务了,它就会从其他consumer deque的"后面(也就是 producer放入的地方)获得任务"
  • 总体上来讲, working stealing减少了竞争,让每个consumer都处于busy的状态,从而 提高了效率

Blocking and Interruptible Methods

  • Thread有很多种情况下都会被block,常见的情况有:
    • 等待IO完成
    • 等待获得一个锁
    • 等待自己调用的Thread.sleep完成
    • 等待另外一个线程计算结果
  • 当一个thread进入block状态的时候, 它通常会被"搁置(suspended)",然后被置于如下 的一种blocked thread state:
    • BLOCKED
    • WAITING
    • TIMED_WAITING
  • 一个blocking operation和'一个普通的但是需要较长时间完成的'operation之间的差 距是: blocked thread必须等待一个"它自己无法控制"的event,然后才能继续执行.这些 event就是前面说的几种情况:
    • I/O 结束
    • lock available了
    • 外部的计算结束了
  • 当"thread自己无法控制,但又需要"的event出现以后, thread就会又进入RUNNABLE state 重新可以被调度运行了
  • BlockingQueue里面的操作put和take都是checked InterruptedException,也就是说, 这些操作suppose是会被interrupted(而且是checked的,所以调用者要自己处理,或者throw 到上一层解决).
  • 一旦put和take等抛出InterruptedException的操作被interrupted, 他们就有机会停止 自己的blocking状态
  • Thread也提供了一些interrupt的函数, 比如:
    • interrupting a thread的函数
    • querying whether a thread has been interrupted的函数
  • Interruption是一个"合作"的机制,它并不能"强行的"让其他thread停止:
    Thread A interrupt了 Thread B,并不是"强行"停止Thread B,而是"建议" Thread B 停止
    
  • ThreadA interrupt另外的threads的例子
    package org.hfeng.book.jcip.ch5;
    
    public class InterruptOthers {
        final Thread subject1 = new Thread(new Runnable() {
            public void run() {
                while (!Thread.interrupted()) {
                    Thread.yield();
                }
                System.out.println("subject 1 stopped!");
            }
        });
    
        final Thread subject2 = new Thread(new Runnable() {
            public void run() {
                while (!Thread.interrupted()) {
                    Thread.yield();
                }
                System.out.println("subject 2 stopped!");
            }
        });
    
        final Thread coordinator = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException exe) {
    
                }
                System.out.println("coordinator stopping!");
                subject1.interrupt();
                subject2.interrupt();
            }
        });
    
        public static void main(String[] args) {
            InterruptOthers interruptOthers = new InterruptOthers();
            interruptOthers.subject1.start();
            interruptOthers.subject2.start();
            interruptOthers.coordinator.start();
        }
    }
    
    ////////////////////////////////////////////////////
    // <===================OUTPUT===================> //
    // coordinator stopping!                          //
    // subject 1 stopped!                             //
    // subject 2 stopped!                             //
    ////////////////////////////////////////////////////
    
  • 如果一个函数会抛出InterruptedException,那么说明你的函数是blocking method,也 就意味着你的函数必须对于interruption的到来,有"预案":
    • 可以选择把exception抛给上层
    • 也可以选择restore这个interrupt(也就是让当前thread重新call一遍interrupt): 这是因为某些情况下你无法throw exception,那么你要选择让其他code(higher up the call stack)来发现这个interrupt,然后来处理
      public class TaskRunnable implements Runnable {
          BlockingQueue<Task> queue;
      
          public void run() {
              try {
                  // take is a blocking method
                  processTask(queue.take());
              } catch (InterruptException e) {
                  // restore interrupted status
                  Thread.currentThread().interrupt();
              }
          }
      }
      
  • 上面是应该"如何处理中断", 而下面的我们要介绍一种"不应该如何处理中断"
           捕获了总段,但是不做任何的响应, 这样做会丢失中断的证据
    
  • 只有一种情况允许掩盖中断,那就是你extends了Thread,因此控制了所有处于调用栈上 层, 就像我们上面的InterruptOthers代码.

Synchronizers

  • Blocking queues是一类特殊的容器:
    • 首先能够实现普通容器的功能: contains object
    • 其次能够对"进出容器进行梳理": 因为put和take都会在"不合适的时候",自动block
  • 所谓的Synchronizers,就是满足Blocking queue第二个特性的一种object
    A synchronizer is any object that coordinates the control flow of
    threads based on its state
    
  • 除了Blocking queue以外,jdk自带的synchronizer还有:
    • semaphores
    • barriers
    • latches
    • 根据需要,自己创建的synchronizer
  • 所有的synchronizer都有一些共性:
    • 封装state: 这些state可以决定到达synchronizer的thread是否被允许进入,还是要 wait
    • 管理state: 提供一些函数来管理这些state
    • 提高等待效率: 提供一些函数来提高仅需desired state的等待效率

Latches

  • Latch是一种synchronizer,它的特点是
    • 在latch到达它的terminal state之前, 所有的thread都是被它block住的,没有一个thread 可以通过.
    • 一旦latch到达了它的terminal state,所有的thread都可以通过,
    • 而且latch一旦到达它的terminal state,它再也无法改变了,只能一直允许thread通过.
  • latch的这种特性让它在比较适合应用在"直到某些one-time activity完成,某些 thread才能往前走!", 例子有如下:
    • 直到运算的某些资源被创建完成, 这些运算才能开始
    • 一个service运行需要"另外的service"先运行.
    • 等待自己所有的"组成部分"都完成,然后向前进.
  • CountDownLatch就是java里面一种简单的latch实现:
    • 初始化的时候设置一个正整数, 就是内部的counter number, 也就说,这个latch会在"多少"次后"开门"
    • countDown()函数会减少内部的count number
    • await()函数会等待如下事件:
      1. counter number变成0
      2. 等待的thread 被interrupted
      3. wait超时
  • 下面的Testharness就是介绍了两种常见的latch实现:
    • "starting gate"的count是1, 也就是说有一thread到达了,它就terminal了, 保证 是最快的thread开启了"前门"
    • "ending gate"的count是thread总的数目,也就是说,所有的thread都到达的时候,才 开启了"后门".这样来计算"thread运行的时间", 才能保证准确!
  • 例子如下
    package org.hfeng.book.jcip.ch5;
    
    import java.util.concurrent.CountDownLatch;
    
    public class TestHarness {
        public long timeTasks(int nThreads, final Runnable task) throws InterruptedException{
            final CountDownLatch startGate = new CountDownLatch(1);
            final CountDownLatch endGate = new CountDownLatch(nThreads);
    
            for (int i = 0; i < nThreads; i++) {
                Thread t = new Thread() {
                    @Override
                    public void run() {
                        try {
                            startGate.await();
                            try {
                                task.run();
                            }finally {
                                endGate.countDown();
                            }
                        } catch (InterruptedException ignored) {
    
                        }
                    }
                };
                t.start();
    
            }
    
            long start = System.nanoTime();
            startGate.countDown();
            endGate.await();
            long end = System.nanoTime();
            return end - start;
        }
    
        public static void main(String[] args) {
            System.out.println("Start!");
            TestHarness testHarness = new TestHarness();
            final int count = 10;
            try {
                long ii = testHarness.timeTasks(count, new TaskRunnable());
                System.out.println("Time (in nano second) is " + ii);
            } catch (InterruptedException e) {
    
            }
    
        }
    }
    
    class TaskRunnable implements Runnable {
        public void run() {
            System.out.println("In task!");
        }
    }
    
    ////////////////////////////////////////////////////
    // <===================OUTPUT===================> //
    // Start!                                         //
    // In task!                                       //
    // In task!                                       //
    // In task!                                       //
    // In task!                                       //
    // In task!                                       //
    // In task!                                       //
    // In task!                                       //
    // In task!                                       //
    // In task!                                       //
    // In task!                                       //
    // Time (in nano second) is 479000                //
    ////////////////////////////////////////////////////
    
  • 使用上面的方法,而不是直接new了thread以后就马上start是因为, 使用CountDownLatch 的方法,计算的更准确:
    • 在循环里面,虽然每个thread都start了,但是因为其run()内部调用了startGate.await() 鉴于startGate开始的时候还是1, 所以,所有的循环里面初始化的thread都会block!
    • 直到startGate.countDown()的一瞬间,所有thread开始"竞争". 在startGate.countDown() 之前,已经开始计时
    • endGate会一直等到所有thread都完结,才结束计时
  • 反观设置thread,马上开始start的话. 就会出现某些thread先完成了.比如3个, 而我 们想测试的是"N个thread同时并发的时间", 少了这三个,我们的测试变成了"N-3个thread 同时并发的时间.

FutureTask

  • FutureTask是一种非常特别的"能够返回值"的Thread,原因在于:
    • 使用Callable作为参数, 就能返回值
    • 它继承了Runnable接口, 就本质上说,就是一个thread了. 当然使用的时候是作为Thread class的参数.
  • 下面是一个使用FutureTask的例子
    package org.hfeng.book.jcip.ch5;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;
    
    public class PreLoader {
        ProductInfo loadProductInfo() throws DataLoadException {
            return null;
        }
    
        private final FutureTask<ProductInfo> future =
                new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
                    public ProductInfo call() throws Exception {
                        return loadProductInfo();
                    }
                });
    
        private final Thread thread = new Thread(future);
    
        public void start() {
            thread.start();
        }
    
        public ProductInfo get() throws DataLoadException, InterruptedException {
            try {
                return future.get();
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof DataLoadException) {
                    throw (DataLoadException) cause;
                } else {
                    throw launderThrowable(cause);
                }
            }
        }
    
        public static RuntimeException launderThrowable(Throwable t) {
            if (t instanceof RuntimeException) {
                return (RuntimeException) t;
            } else if (t instanceof  Error) {
                throw (Error) t;
            } else {
                throw new IllegalStateException("Not unchecked", t);
            }
        }
    
        interface ProductInfo {
        }
    }
    
    class DataLoadException extends Exception {}
    
  • 从上面的例子我们可以看到:
    • future初始化的时候,成员是Callable类型的匿名class
    • future.get()会:
      • 如果callable的instance运行完了以后,能够返回值,get()也就会返回
      • 如果callable的instance还没运行完,那么就会block住.
    • 因为Callable是一种会抛出异常的thread,那么,我们的future.get()也要处理各种 异常

Semaphores

  • Counting semaphores用来控制访问某种resource的数量.
  • Counting semaphores可以用来实现资源库,或者给一个collection增加"额度"
  • 一个semaphore开始的时候会被设置一个初始的数字,代表它可以发放的permits数量:
    • thread在开始"真正运行以前"先acquire锁
    • thread在离开之前,释放这个锁
  • Semaphore在实现数据库连接池(Data Connection Pool)的时候非常管用.因为连接池 里面就是"一定数量"的资源,如果用尽,再请求的话,就是block了
    package org.hfeng.misc.concurrent.semaphore;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    public class SemaphoreTest {
        private static final int THREAD_COUNT = 10;
    
        private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
    
        private static Semaphore s = new Semaphore(3);
    
        public static void main(String[] args) {
            for (int i = 0; i < THREAD_COUNT; i++) {
                threadPool.execute(new Runnable() {
                    public void run() {
                        try {
                            s.acquire();
                            System.out.println("Save data\n");
                            Thread.sleep(5000);
                            s.release();
                        } catch (InterruptedException e) {
    
                        }
                    }
                });
            }
            threadPool.shutdown();
        }
    }
    
  • 可以使用Semaphore把任何一个collection都变成blocking bounded collection.
    • 初始化Semaphore为collection的长度
    • add会减少permit
    • remove会增加permit

Barriers

  • 前面我们看到了,latch是一种"一次性"开关的存在,一旦进入terminal state,就不允许改变state了
  • Barrier和latch不同,它在释放后,可以重复利用
  • Barrier最本质的用法和latch也有点"相反",比如去饭店吃饭:
    • latch是"只要有一个人来了"就开饭
    • barrier是"要等待所有人都来齐了"才开饭
  • CyclicBarrier例子如下
    package org.hfeng.misc.concurrent.barrier;
    
    import java.util.Random;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class CyclicBarrierTest {
        public static void main(String[] args) {
            ExecutorService exec = Executors.newCachedThreadPool();
            final Random random = new Random();
    
            final CyclicBarrier barrier = new CyclicBarrier(4, new Runnable() {
                public void run() {
                    System.out.println("All people are here, go to lunch!");
                }
            });
    
            for (int i = 0; i < 4; i++) {
                exec.execute(new Runnable() {
                    public void run() {
                        try {
                            Thread.sleep(random.nextInt(1000));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName() + " is here");
                        try {
                            barrier.await(); // wait for others
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (BrokenBarrierException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            exec.shutdown();
        }
    }
    
    ////////////////////////////////////////////////////
    // <===================OUTPUT===================> //
    // pool-1-thread-2 is here                        //
    // pool-1-thread-4 is here                        //
    // pool-1-thread-1 is here                        //
    // pool-1-thread-3 is here                        //
    // All people are here, go to lunch!              //
    ////////////////////////////////////////////////////
    

Chapter 6: Task Execution

  • 大多数的并发应用程序都是以task为基础单位组合起来的.
  • 所谓task,就是高度抽象,单独的一份工作
  • 如果能够清晰的划分task,其实也会提高并发程序的响应

Executing Tasks in Threads

  • 划分task的第一步就是区分出"可以感知的task边界(sensible task boundaries)"
  • 理想情况下, task都是独立的行动, 他们不依赖于任何的其他task的state, result, 或者side effect
  • 独立的task会提高并发性能, 因为相互独立,就意味着可以在资源足够多的情况下,尽可 能的并发运行!
  • 为了提高1调度的灵活性和2task的负载均衡, 每个task使用的资源的不易过大
  • 在正常的吞吐情况下, server应该兼具如下两点:
    • 好的吞吐量(good throughput)
    • 好的向英雄(good responsiveness)
  • 而server在承受更大的overload的时候,良好的设计会'平缓的裂化'(graceful degradation) 而拙劣的设计则会迅速宕机.
  • 清晰的划分task边界,配合清晰的task执行政策,能够让server得到良好的设计避免过 载时候的简单粗暴宕机.
  • 大部分的服务器都会选择一个最简单的task边界区分方法:
           每个单独的用户请求(individual client request),作为一个单独的task
    
  • 这种task的"区分"方法非常的明确且简单,所以,很多服务器程序都是这样区分task的:
    • Web server
    • mail server
    • file server
    • EJB container
    • database server (remote client request)
  • 这种区分很明显是不会影响其他'任务'的. 比如你'发送一封邮件到mail server'这个 task,显然是不受'别的邮件发送到同一个mail server'的影响的

Executing Tasks Sequentially

  • 前面说到的是server如何区分task, 下面我们来讲的都是server如何在多个task到来 的情况下处理这些请求(task),并返回结果的
  • 最简单同时也是最安全,也是最低效的方法就是'线性的'(也就是整个进程只有一个线程的)处理这些请求
  • 下面就是一个单线程处理请求的例子, HTTP请求都是通过80端口到达这个程序
    public class SingleThreadWebServer {
        public static void main(String[] args) throws IOException{
            ServerSocket socket = new ServerSocket(80);
            while (true) {
                Socket connection = socket.accept();
                handleRequest(connection);
            }
        }
    
        private static void handleRequest(Socket connection) {
            // reuest-handling logic here
        }
    }
    
  • 这种方法正确性是无疑的,但是在生产环境中效率也必定是非常的差.每次只能处理一 个请求的server在实际情况中不存在.
  • 理想情况下,如果server处理request的时间短到忽略不计,那么single thread server 还是有市场的,但现实的情况下, server至少会在如下两个情况下block:
    • perform socket I/O
    • perform database
  • 在桌面CS系统中,由于没有上述两种IO,而single thread server的设计简单而又安全 所以GUI框架往往采取single thread的架构

Explicity Creating Threads for Tasks

  • 一个更高效的方法是为每一个请求都创建一个thread
    public class ThreadPerTaskWebServer {
        public static void main(String[] args) throws IOException{
            ServerSocket socket = new ServerSocket(80);
            while (true) {
                final Socket connectio = socket.accept();
                Runnable task = new Runnable() {
                    public void run() {
                        handleRequest(connection);
                    }
                };
                new Thread(task).start();
            }
        }
    
        private static void handleRequest(Socket connection) {
            // request-handling logic here
        }
    }
    
  • 这里的main thread在接受到一个新的请求以后,不再是简单的自己埋头处理了,而是 创建了一个新的thread来处理这个task. 这样做会产生如下的后果:
    • 处理task的线程会和main线程分道扬镳(有不同的call stack), 所以会提高main thread的响应速率
    • 所有被接收的task都有了自己的thread,在资源多的情况下,可以"并行"的执行,在 IO block的时候,也不至于让CPU空转(可以调度给其他thread).
    • 函数handleRequest必须是thread-safe的, 因为它会同时被多个thread访问

Disadvantages of Unbounded Thread Creation

  • 为每一个thread分别一个线程的作法,其实也无法达到在production的应用强度.因为 这个方法也有比较明显的问题,特别是在thread数量过多的时候:
    • Thread的创建和删除其实也是需要消耗一定的CPU时间的
    • Thread的创建也是需要浪费内存的,特别是thread数目超过了processor的数目,那 么多出来的thread只是白白的占用内存而已.
    • 一个进程可以创建的thread数目是有限制的,这个限制通常是操作系统给的.一旦超 过这个限制, OutOfMemoryError是肯定无法避免的了.
  • 为每一个request创建一个thread的方法,在实验的时候还可以试试,但是它"无法控制 thread创建的数目", 所以一旦布置上线,很可能会被黑客攻击.即便不算黑客,用户过 多也会让整个server突然宕机.

The Executor Framework

  • task是从逻辑上来区分出的一个块工作,而thread是java中能够支持"一个快工作"的机 制. 从逻辑上来讲,每个task一个thread是合乎情理的,但是task来了以后,多久给一个 thread是前面争论的焦点:
    • Single Thread Server是说每一个时刻都只给最前面的task建立一个thread. 这种 方法对server来说,效率太差
    • Thread Per Task Server是说,所有的task,只要来了,立马给你建立一个thread. 这 种方法,资源申请容易超出控制.
  • 所以很明显上面两种做法是两个极端,正确的做法是:
    • 给一个task赋予一个thread是肯定的
    • 但是要控制thread的规模,在某一个时段,只能有N个thread.如果超过了这个thread数 目,再有task来,就block他们.
  • java 5给出的答案是Executor的一个implementation(因为Executor):
    fixed-sized thread pool (能够保证同时只有一定数目thread的存在)
    
  • 我们首先来看看Executor(它是一个接口)
    public interface Executor {
        void execute(Runnable command);
    }
    
  • Executor接口本身没有多么代码,但是它确有强大的意义, 因为它提供了一种规范(standard), 把task submission和task execution区分开了.它使用了Runnable 来描述task.
    Executor provides a standard means of decoupling task submission
    from task execution, describing tasks as Runnable.
    
  • Executor主要关注producer-consumer 模式,也可以这么认为,如果你的代码主要是这 种模式,那么你可以使用Executor,放弃Thread的方式来创建新线程. 而且不出所料的在 某个Executor的实现里面我们发现了BlockingQueue的身影
    private final BlockingQueue<Runnable> workQueue;
    public class ThreadPoolExecutor extends AbstractExecutorService {
        //...
        private final BlockingQueue<Runnable> workQueue;
        //...
    }
    

Example: Web Server Using Executor

  • 下面就是一个使用executor来创建thread的例子
    package org.hfeng.book.jcip.ch6;
    
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    
    public class TaskExecutionWebServer {
        private static final int NTHREADS = 100;
        private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
    
        public static void main(String[] args) throws IOException{
            ServerSocket socket = new ServerSocket(80);
            while (true) {
                final Socket connection = socket.accept();
                Runnable task = new Runnable() {
                    public void run() {
                        handleRequest(connection);
                    }
                };
                exec.execute(task);
            }
        }
    
        private static void handleRequest(Socket connection) {
            // request-handling logic here
        }
    }
    
  • 我们也可以使用executor来实现ThreadPer-TaskWebServer, 代码如下
    public class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable r) {
            new Thread(r).start();
        }
    }
    

Execution Policies

  • 所谓execution policy是用来决定"what, where, when, and how" of task execution, 具体来说就是:
    • task会在哪个thread里面执行(并不是每个task都有一个thread)
    • task执行的顺序是什么
    • 最多有多少task"并发concurrently"执行
    • 最多有多少task可以pending等待执行
    • 如果系统过载,需要杀掉一些task, 哪些task会被首先牺牲
    • 执行task之前和之后要有什么操作
  • Execution policy是一种资源管理池(resource management tool): 通过限制同时执 行的task的数目, 来达到保证系统不会因为资源耗尽而崩溃,或者太多线程争抢系统 资源大打出手,导致系统响应过慢。
  • 因为executor的天然的将"submission"和"execution"分开了,导致我们更加容易的更 改execution policy: 比如我们可以在编译阶段再根据硬件状况来决定使用何种的 execution policy
  • 每当你看到如下代码. 你就可以想办法把它变成executor了, 因为后者更加的灵活,更 容易面对不断更改的需求.
    new Thread(runnable).start()
    

Thread Pools

  • 所谓Thread Pool,就是管理一系列相似的worker thread的"容器"
  • 一个thread pool都会和一个work queue合作
  • work thread会从work queue里面申请一个任务,执行完之后,再放回去.
  • 使用thread pool相比于thread-per-task有很多的优势:
    • 重用thread,就不必为每个thread都创建和销毁thread: 创建和销毁thread通常是 消耗系统资源的
    • task来的时候,thread已经在了.所以反应会很快
    • thread pool的大小设计的合适,会在机器能够承受的情况下,最大限度利用机器资源
  • java的Executors(注意有个s) class里面有如下的static factory method来提供 thread pool实现:
    • newFixedThreadPool : 线程数固定
    • newCachedThreadPool : 线程数可以在一定限度下增加
    • newSingleThraedExecutor: 只有一个线程的池子
    • newScheduledThreadPool: 线程数目固定:主要用来支持delayed, periodic task(和 后面的Timer类似)

Executor Lifecycle

  • 下面我们来看看如何关闭executor. 关闭executor就好像关闭一个网上服务:
    • 优雅的关闭方式: 不在接受新的任务,老的任务逐渐停止,一旦全部停止了,服务也 就停止
    • "粗暴"的关闭方式:直接关机, 老的不再管,新的也不会接受成功.
    • 介于"粗暴"和"优雅"之间的方式
  • ExecutorService借口继承了Executor接口,提供了一系列的处理executor lifecycle 的函数
    public interface ExecutorService extends Executor {
        void shutdown();
        List<Runnable> shutdownNow();
        boolean isShutdown();
        boolean isTrminated();
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
        // ... additional convenience methods for task submission
    }
    
  • 对于ExecutorService的Executor来说,它有三种state:
    • running: 程序开始的时候就是这个状态
    • shutting down:
      1. 调用了shutdown函数后:此时执行的是graceful shutdown
      2. 调用了shutdownNow函数后: 此时执行的是abrupt shutdown
    • terminated: 所有的task都结束以后(比如graceful shutdown了,还要把手头的task 结束掉), 就进入了terminated模式
  • 下面就是一个使用ExecutorService接口的实现
    package org.hfeng.book.jcip.ch6;
    
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    public class LifecycleWebServer {
        private final ExecutorService exec = Executors.newCachedThreadPool();
    
        public void start() throws IOException {
            ServerSocket socket = new ServerSocket(80);
    
            while (!exec.isShutdown()) {
                try {
                    final Socket conn = socket.accept();
                    exec.execute(new Runnable() {
                        public void run() {
                            handleRequest(conn);
                        }
                    });
                } catch (RejectedExecutionException e) {
                    if (!exec.isShutdown()) {
                        log("task submission rejected", e);
                    }
                }
            }
        }
    
        public void stop() {
            exec.shutdown();
        }
    
        private void log(String msg, Exception e) {
            Logger.getAnonymousLogger().log(Level.WARNING, msg, e);
        }
    
        void handleRequest(Socket connection) {
            Request req = readRequest(connection);
            if (isShutdownRequest(req)) {
                stop();
            } else {
                dispatchRequest(req);
            }
        }
    
        private Request readRequest(Socket s) {
            return null;
        }
    
        private void dispatchRequest(Request r) {
        }
    
        private boolean isShutdownRequest(Request r) {
            return false;
        }
    
        interface Request {
        }
    }
    
  • 从上面的代码我们可以看到,如果想关闭这个executor,有两个办法:
    • 本地stop()
    • client端还可以发送一个request过来,如果是一个shutdownrequest(函数isShutdownRequest(req) 的结果为true的话), 那么也会导致executor关闭

Delayed and Periodic Tasks

  • Java里面有Timer一类的函数,用来运行 deferred 和 periodic的事件:
    • deferred task: run this task in 100ms
    • periodic task: run this task every 10 ms
  • Timer类函数,有一些自身的缺点,比如只能使用absolute的time设置,无法设置relative 的time设置. 所以一旦系统时间有改变,对于Time来说就可能会有问题.
  • Timer的问题还主要在它内部给"所有"Timer task分配的"总共"只有一个thread,所以 如果一个TimerTask被设定为没10ms运行一次,而另外一个设置为每40ms运行一次. 那 么可能的结果是:
    1. 要么40ms的先得到那个thread运行,然后结束了.另外一个task猜得到thread,会接 连运行两次
    2. 要么10ms的先得到thread,然后运行, 40ms的就被完全忘记了
  • Timer的再一个问题也是"自己只有一个thread为所有的TimeTask服务"这个原因造成的, 比如其中一个TimeTask抛出异常,这个内部的thread就结束了,全然不顾可能还有其他 TimeTask还要用到它, 例子如下
    package org.hfeng.book.jcip.ch6;
    
    import java.util.Timer;
    import java.util.TimerTask;
    import static java.util.concurrent.TimeUnit.SECONDS;
    
    public class OutOfTime {
        public static void main(String[] args) throws Exception{
            Timer timer = new Timer();
            timer.schedule(new ThrowTask(), 1);
            SECONDS.sleep(1);
            timer.schedule(new ThrowTask(), 1);
            SECONDS.sleep(5);
        }
    
        static class ThrowTask extends TimerTask {
            public void run() {
                throw new RuntimeException();
            }
        }
    }
    
    /////////////////////////////////////////////////////////////////////////////////////////////////////
    // <===================OUTPUT===================>                                                  //
    // Exception in thread "Timer-0" java.lang.RuntimeException                                        //
    //  at org.hfeng.book.jcip.ch6.OutOfTime$ThrowTask.run(OutOfTime.java:18)                          //
    //  at java.util.TimerThread.mainLoop(Timer.java:555)                                              //
    //  at java.util.TimerThread.run(Timer.java:505)                                                   //
    // Exception in thread "main" java.lang.IllegalStateException: Timer already cancelled.            //
    //  at java.util.Timer.sched(Timer.java:397)                                                       //
    //  at java.util.Timer.schedule(Timer.java:193)                                                    //
    //  at org.hfeng.book.jcip.ch6.OutOfTime.main(OutOfTime.java:12)                                   //
    //  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)                                 //
    //  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)               //
    //  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)       //
    //  at java.lang.reflect.Method.invoke(Method.java:606)                                            //
    //  at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)                        //
    /////////////////////////////////////////////////////////////////////////////////////////////////////
    
  • ScheduledThreadPoolExecutor一定程度上弥补了Timer的缺点,可以看做是Timer实现 delay或periodic task的一种replacement

Finding Exploitable Parallelism

  • 在server系统中,一个request就天然的是一个task,也就天然的可以分给一个thread去 使用
  • 但是在desktop开发中,却没有request这么好的"自然边界"存在.需要我们自己去找寻.

Example: Sequential Page Renderer

  • 处理HTML解析最简单的办法,当然就是线性的处理HTML文件:
    • 遇到文字markup, 放到处理结果里面
    • 遇到图片markup, 下载图片, 然后放到处理结果里面
    • 最后都解析完,把结果展示给用户
  • 上述线性处理HTML解析的方法非常容易实现,但是这可能会让用户不满:因为你要过很 久才能把网页全部解析完,然后用户才能看. 但是很多时候,用户不想傻等,他想先看到 点什么
  • 一个微小的改进,就是先显示"文字",因为文字解析的快.然后图片是作为"占位符"放在 那里.等待下载完成. 下面就是这样一个例子: 先renderText, image在后面慢慢来.
    package org.hfeng.book.jcip.ch6;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public abstract class SingleThreadRenderer {
        void renderPage(CharSequence source) {
            renderText(source);
            List<ImageData> imageData = new ArrayList<ImageData>();
            for (ImageInfo imageinfo: scanForImageInfo(source)) {
                imageData.add(imageinfo.downloadImage());
            }
            for (ImageData data : imageData) {
                renderImage(data);
            }
        }
    
        interface ImageData {
        }
    
        interface ImageInfo {
            ImageData downloadImage();
        }
    
        abstract void renderText(CharSequence s);
        abstract List<ImageInfo> scanForImageInfo(CharSequence s);
        abstract void renderImage(ImageData i);
    }
    
  • 上面的操作还是线性的, 更好的方法当然是把整个解析HTML语言的过程分成'不同的 相互不干扰'的task,然后分给不同的thread去做

Result-bearing Tasks: Callable and Future

  • Exacutor内部使用了Runnable接口来表达"task"的概念, 但是Runnable接口有如下的 缺点:
    • 无法返回值
    • 无法抛出checked exception
  • Callable弥补了Runnable的两个缺陷,可以看做是一种良好的replacement.Executor是 可以include callable的
    public interface Callable<V> {
        V call() throws Exception;
    }
    
  • 无论是Runnable还是Callable其实都是描述task的, 而这个"task"一旦被Executor所 包括,那么它通常有如下四个生命周期:
    • created
    • submitted
    • started
    • completed
  • 我们前面说了,Executor的提出就是用有限的几个thread来运行"多得多的"task, 也 就是说,一个task可能要过很久才能轮的上去执行.那么我们就会想着去cancle某些task, 在Executor framework下,cancle规则如下:
    • submitted了但是没started的,总是可以cancle
    • started了,某些情况下可以cancle
    • 如果已经completed了,那么即便你调用了cancle,也不会有副作用
  • Future用来管理(比如查看task是否完成, 是否被cancel, 读取结果等待)task的声明 周期.
    public interface Future<V> {
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        V get() throws InterruptedException, ExecutionException,
                       CancellationException;
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException,
                   CancellationException, TimeoutException;
    }
    
  • ExecutorService的submit函数会返回一个Future的instance,用来追踪这个task的状 态.
  • FutureTask是一个实现了1.Runnable和2.Future接口的类, 所以把它自己就可以检测 自己的状态

Example: Page Renderer with Future

  • 我们来实现我们刚才的设想: 把处理HTML代码的过程分成多个task. 这里我们的初 步设想是分成两个task:
    • 一个转换text markup (CPU-bound)
    • 一个下载图片 (I/O-bound)
  • 下面就是我们的代码, 我们使用Callable来下载图片,而在main thread里面进行进行 文本的转换. 而一旦所有的文本都下载下来的话,我们可以把他们都显示出来
    package org.hfeng.book.jcip.ch6;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    
    public abstract class FutureRenderer {
        private final ExecutorService executor = Executors.newCachedThreadPool();
    
        void renderPage(CharSequence source) {
            final List<ImageInfo> imageInfos = scanForImageInfo(source);
            Callable<List<ImageData>> task =
                    new Callable<List<ImageData>>() {
                        public List<ImageData> call() throws Exception {
                            List<ImageData> result = new ArrayList<ImageData>();
                            for (ImageInfo imageInfo : imageInfos) {
                                result.add(imageInfo.downloadImage());
                            }
                            return result;
                        }
                    };
            Future<List<ImageData>> future = executor.submit(task);
            renderText(source);
    
            try {
                List<ImageData> imageData = future.get();
                for (ImageData data : imageData) {
                    renderImage(data);
                }
            } catch (InterruptedException e) {
                // Re-assert the thread's interrupted status
                Thread.currentThread().interrupt();
                // Wd don't need the result, so cancel the task too
                future.cancel(true);
            } catch (ExecutionException e) {
    
            }
        }
    
        interface ImageData {
        }
    
        interface ImageInfo {
            ImageData downloadImage();
        }
    
        abstract void renderText(CharSequence s);
    
        abstract List<ImageInfo> scanForImageInfo(CharSequence s);
    
        abstract void renderImage(ImageData i);
    }
    
  • 这个例子中已经有了一点"并发"了: 文本处理的同时,在下载图片. 但是其实我们可 以做的更加的并发:因为所有图片都下载好了再呈现,显然不如'有个图片就呈现'更好

Limitations of Parallelizing Heterogeneous Tasks

  • 上面我们区分task的方法是:把"不同"的task分给"不同"的thread:
    • 下载的task分给callable thread
    • 处理文字的分给main thread
  • 这种分配任务的方法有很大的缺点:因为任务不同,所以万一某个任务比较容易完成, 而另外一个需要的时间过长.那么其中有一个thread很快就结束了.使用多线程的意义 不大.
  • 比如上面的下载task用了10秒钟, 而文字处理使用了1秒钟,那么总共使用10秒,很原来 线性的11秒比起来,并没有多大的提升(9%而已), 而我们却使用了两个thread, 理论 最大提升是50%
  • 所以多线程的正道是把"相同"的task分到"不同"的thread里面
    The real performance payoff of dividing a program's workload into
    tasks comes when there are a large number of independent, homogeneous
    tasks that can be processed concurrently
    

CompletionService: Executor Meets BlockingQueue

  • 下面我们的做法就应该是找到那些"相同"的task了. 一个页面上面有n个图片. 每个图 片的下载过程就是一个task
  • 我们有多个task,但是每个task的完成时间是有限度的.我们不可能busy waiting来 不停的去submit返回的那个Future来查看哪些已经可以了.这太麻烦了.我们期望有一 个container来管理这些Future, 而且希望能够做到每次take都能返回完成的工作, 没 有完成的就block (这很像blockingQueue)
  • 于是CompletionService在聚合(aggregate)了Executor和BlockingQueue之后为大家 提供了一个既"能够以Executor的形式运行相同task"又"能以容器的结构保存Future结果"
  • 下面是CompletionService的一个实现: ExecutorCompletionService
    public class ExecutorCompletionService<V> implements CompletionService<V> {
        private final Executor executor;
        private final AbstractExecutorService aes;
        private final BlockingQueue<Future<V>> completionQueue;
        // ...
    }
    

Example: Page Renderer with CompletionService

  • 下面就是使用CompletionService的一个例子.
  • 从例子中我们可以看到, CompletionService使用的是一个executor (具体使用哪种 executor,可以在实例化的时候再决定).
  • 这里的CompletionService: 处理了多个thread的返回结果(用BlockingQueue保存), 其实相当于处理"一个"thread时候的Future.

Placing Time Limits on Tasks

  • 处理task的时候,如果过了一定的时间,这个task还没有完,那么就没有必要再等了.我 们可以使用timed版本的Future.get(): 一旦过了一定时间,它就不再去get了,而是抛 出TimeoutException
  • 下面就是一个timed版本的Future.get使用方法(多了两个时间参数), 如果在规定时间 内得不到值,那么就可以cancle这个task(通过Future), 然后调用default的广告.
    Page renderPageWithAd() throws InterruptedException {
        long endnanos = System.nanoTime() + TIME_BUDGET;
        Future<Ad> f = exec.submit(new FetchAdTask());
        // Render the page while waiting for the ad
        Page page = renderPageBody();
        Ad ad;
        try {
            // Only wait for the remaining time budget
            long timeLeft = endNanos - System.nanoTime();
            ad = f.get(timeLeft, NANOSECONDS);
        } catch (ExecutionException e) {
            ad = DEFAULT_AD;
        } catch (TimeoutException e) {
            ad = DEFAULT_AD;
            f.cancel(true);
        }
        page.setAd(ad);
        return page;
    }
    

Example: A Travel Reservation Portal

  • 前面讲了如何给一个Future.get设置timeout时间.如果有很多个task,要设置同样的 timeout的话,我们给出的解决方案是invokeAll
  • 下面是一个invokeAll的例子:航空公司选择网站
    private class QuoteTask implements Callable<TrabelQueue> {
        private final TravelCompany company;
        private final TravelInfo travelInfo;
        //...
        public TravelQuote call() throws Exception {
            return company.solicitQuote(travelInfo);
        }
    }
    
    public class Portal {
        public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo,
                                                       Set<TravelCompany> companies,
                                                       Comparator<TravelQuote> ranking,
                                                       long time,
                                                       TimeUnit unit) throws InterruptedException {
            List<QuoteTask> tasks = new ArrayList<QuoteTask>();
            for (TravelCompany company : companies) {
                tasks.add(new QuoteTask(company, travelInfo));
            }
    
            List<Future<TravelQuote>> futures =
                exec.invokeAll(tasks, time, unit);
    
            List<TravelQuote> quotes =
                new ArrayList<TravelQuote>(tasks.size());
    
            Iterator<QuoteTask> taskIter = tasks.iterator();
            for (Future<TravelQuote> f : futures) {
                QuoteTask task = tastIter.next();
                try {
                    quotes.add(f.get());
                } catch (ExecutionException e) {
                    quotes.add(task.getFailureQuote(e.getCause()));
                } catch (CancellationException e) {
                    quotes.add(task.getTimeoutQuote(e));
                }
            }
        }
        Collections.sort(quotes, ranking);
        return quotes;
    }
    
  • 用户在输入了旅行时间和旅行要求以后.网站利用各个航空公司网站的API去查询. 显 然等到所有的航空公司都返回数据然后显示显然是不正确的.
  • 正确的做法是等待一定的时间,没返回数据的航空公司就不管了.然后把取到数据的航 空公司排序(因为要排序,所以不能出现一个显示一个), 然后显示给用户. 没返回数 据的航空公司就显示一条"没有从Airline Java得到数据"就可以了
  • 给他所有航空公司的timeout是一样的,所以我们在invokeAll里面一次设置就可以了.