1.内置锁的弊端

内置锁的使用:

synchronized(object){
    //使用共享资源
}

内置锁虽然方便但是又很多限制:

  1. 一个线程因为等待内置锁而进入阻塞之后,就无法终端该线程
  2. 尝试获取内置锁时,无法设置超时
  3. 获得内置锁,必须使用synchronized块

2. ReentrantLock

超越内置锁就是靠ReentrantLock替代synchronized工作,弥补内置锁的问题。用法如下:

Lock lock = new ReentrantLock();
lock.lock();
try{
    //使用共享资源
} finally{
 lock.unlock();     //这个属于最佳实践,无论内部代码发生什么错误都会释放锁
}

2.1 可中断的锁

一个死锁的例子如下。你只能通过终止JVM才能结束这个死锁。

/***
 * Excerpted from "Seven Concurrency Models in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material, 
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose. 
 * Visit http://www.pragmaticprogrammer.com/titles/pb7con for more book information.
***/
package com.paulbutcher;

public class Uninterruptible {

  public static void main(String[] args) throws InterruptedException {

    final Object o1 = new Object(); final Object o2 = new Object();

    Thread t1 = new Thread() {
      public void run() {
        try {
          synchronized(o1) {
            Thread.sleep(1000);
            synchronized(o2) {}
          }
        } catch (InterruptedException e) { System.out.println("t1 interrupted"); }
      }
    };

    Thread t2 = new Thread() {
      public void run() {
        try {
          synchronized(o2) {
            Thread.sleep(1000);
            synchronized(o1) {}
          }
        } catch (InterruptedException e) { System.out.println("t2 interrupted"); }
      }
    };

    t1.start(); t2.start();
    Thread.sleep(2000);
    t1.interrupt(); t2.interrupt();     //该方法无法中断死锁线程
    t1.join(); t2.join();
  }
}

如果使用ReentrantLock可以使用它的lockInterruptibly()方法来终止死锁的线程。

/***
 * Excerpted from "Seven Concurrency Models in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material, 
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose. 
 * Visit http://www.pragmaticprogrammer.com/titles/pb7con for more book information.
***/
package com.paulbutcher;

import java.util.concurrent.locks.ReentrantLock;

public class Interruptible {

  public static void main(String[] args) throws InterruptedException {

    final ReentrantLock l1 = new ReentrantLock();
    final ReentrantLock l2 = new ReentrantLock();

    Thread t1 = new Thread() {
      public void run() {
        try {
          l1.lockInterruptibly();       //获取可中断的锁l1,和明显l1是为了t1的互斥访问而加的锁
          //访问t1的互斥资源
          Thread.sleep(1000);
          l2.lockInterruptibly();       //获取可中断的锁t2
            //访问t2的互斥资源(虽然这里什么都没做,但是这里加锁的意思是因为要访问临界资源)
        } catch (InterruptedException e) { System.out.println("t1 interrupted"); }
      }
    };

    Thread t2 = new Thread() {
      public void run() {
        try {
          l2.lockInterruptibly();
          Thread.sleep(1000);
          l1.lockInterruptibly();
        } catch (InterruptedException e) { System.out.println("t2 interrupted"); }
      }
    };

    t1.start(); t2.start();
    Thread.sleep(2000);
    t1.interrupt(); t2.interrupt();     //这时候再调用interrupt方法就可以终止线程了。
    t1.join(); t2.join();
  }
}

2.2 设置获取锁超时时间

获取锁一直获取不到,可以设置超时。之前哲学家进餐的代码,我们可以通过设定一个全局固定顺序获取锁来避免死锁。在下面的哲学家类中我们通过设置超时使得即使发生了死锁,也可以从中恢复出来。在例子中我们可以看到通过使用ReentrantLock的tryLock方法可以尝试获取锁,并且设定超时时间,如果超时了就不继续等待了。

/***
 * Excerpted from "Seven Concurrency Models in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material, 
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose. 
 * Visit http://www.pragmaticprogrammer.com/titles/pb7con for more book information.
***/
package com.paulbutcher;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class Philosopher extends Thread {
  private ReentrantLock leftChopstick, rightChopstick;
  private Random random;
  private int thinkCount;

  public Philosopher(ReentrantLock leftChopstick, ReentrantLock rightChopstick) {
    this.leftChopstick = leftChopstick; this.rightChopstick = rightChopstick;
    random = new Random();
  }

  public void run() {
    try {
      while(true) {
        ++thinkCount;
        if (thinkCount % 10 == 0)
          System.out.println("Philosopher " + this + " has thought " + thinkCount + " times");
        Thread.sleep(random.nextInt(1000)); // Think for a while
        leftChopstick.lock();
        try {
          if (rightChopstick.tryLock(1000, TimeUnit.MILLISECONDS)) {
            // Got the right chopstick
            try {
              Thread.sleep(random.nextInt(1000)); // Eat for a while
            } finally { rightChopstick.unlock(); }
          } else {
            // Didn't get the right chopstick - give up and go back to thinking
            System.out.println("Philosopher " + this + " timed out");
          }
        } finally { leftChopstick.unlock(); }
      }
    } catch(InterruptedException e) {}
  }
}

采用超时的方式来从死锁中恢复出来并不是一个好的方式,因为这样无法避免死锁。而且可能出现活锁的问题,即大家同时放弃获取锁,又同时开始获取锁。虽然可以通过设置不同的超时时间来避免,但是后面还有更好的方式。

2.3 交替锁

链表中需要插入一个节点,一种做法是用锁保护整个链表,显然这种方式效率太低了。交替锁可以只锁住链表的一部分。在链表中交替加锁的过程如下,即不断的加锁和解锁,直到找到要插入的位置对两边的节点加锁。


内置锁无法完成这种效果,可以采用ReentrantLock在需要的地方使用lock和unlock方法即可。交替锁的有序链表实现如下:

/***
 * Excerpted from "Seven Concurrency Models in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material, 
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose. 
 * Visit http://www.pragmaticprogrammer.com/titles/pb7con for more book information.
***/
package com.paulbutcher;

import java.util.concurrent.locks.ReentrantLock;

class ConcurrentSortedList {

  private class Node {
    int value;
    Node prev;
    Node next;
    ReentrantLock lock = new ReentrantLock();

    Node() {}

    Node(int value, Node prev, Node next) {
      this.value = value; this.prev = prev; this.next = next;
    }
  }

  private final Node head;
  private final Node tail;

  public ConcurrentSortedList() {
    head = new Node(); tail = new Node();
    head.next = tail; tail.prev = head;
  }

  public void insert(int value) {           //insert方法用于在两个链表节点之间插入节点
    Node current = head;
    current.lock.lock(); 
    Node next = current.next;
    try {
      while (true) {        
        next.lock.lock(); 
        try {
          if (next == tail || next.value < value) {         //找到插入位置则进行插入并且跳出while,要不然就一直获取锁判断,再释放锁(finally方法中释放了锁)
            Node node = new Node(value, current, next); 
            next.prev = node;
            current.next = node;
            return; 
          }
        } finally { current.lock.unlock(); } 
        current = next;
        next = current.next;
      }
    } finally { next.lock.unlock(); }       //没找到插入位置,则释放锁
  }

  public int size() {           //求长度的方法,也采用交替锁从尾巴遍历到头部,获得整个链表的大小。
    Node current = tail;
    int count = 0;

    while (current.prev != head) {
      ReentrantLock lock = current.lock;
      lock.lock();
      try {
        ++count;
        current = current.prev;
      } finally { lock.unlock(); }
    }

    return count;
  }

  public boolean isSorted() {
    Node current = head;
    while (current.next.next != tail) {
      current = current.next;
      if (current.value < current.next.value)
        return false;
    }
    return true;
  }
}

size()方法获取锁的顺序和insert()方法不同,这个没有关系,不违背全局固定顺序获取锁的准则,因为size()在某一个时间上并不持有一把以上的锁。

2.4 条件变量

并发编程经常需要等待某个事件发生。比如,从队列删除元素前需要等待队列非空。条件变量就是为这种情况而设计。推荐的使用条件变量的模式如下:

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

lock.lock();
try{
    while(!MyList.isEmpty())    //这里的MyList.isEmpty()可以是自己定义的任何条件,判断语句。这里一定要用循环,否则可能造成虚假唤醒(比如使用sinalAll()方法)。
        //await()方法将原子地解锁并且阻塞线程来等待该条件。此处的原子指的是其他线程看该线程要么是锁要么是没锁。 当另一线程调用了singnal()或者signalAll()则意味着对应条件可能为正了,await()将原子性地恢复运行并且重新加锁,所以要放在while中。
         condition.await();        
    //这里开始使用互斥资源
}finally{ lock,unlock()}        //最佳实践

接下来我们将使用条件变量来解决哲学家进餐问题,这个也是推荐的方式。因为只要条件满足,他都会去进餐,这样有非常好的并发度。具体解释见书本p25

/***
 * Excerpted from "Seven Concurrency Models in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material, 
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose. 
 * Visit http://www.pragmaticprogrammer.com/titles/pb7con for more book information.
***/
package com.paulbutcher;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import java.util.Random;

class Philosopher extends Thread {

  private boolean eating;
  private Philosopher left;
  private Philosopher right;
  private ReentrantLock table;      //整个桌子的锁
  private Condition condition;
  private Random random;
  private int thinkCount;

  public Philosopher(ReentrantLock table) {
    eating = false;
    this.table = table;
    condition = table.newCondition();
    random = new Random();
  }

  public void setLeft(Philosopher left) { this.left = left; }
  public void setRight(Philosopher right) { this.right = right; }

  public void run() {
    try {
      while (true) {
        think();
        eat();
      }
    } catch (InterruptedException e) {}
  }

  private void think() throws InterruptedException {        //思考的时候锁住整个桌子,保持现场,然后告诉身边的两位可以试试看eat了
    table.lock();
    try {
      eating = false;
      left.condition.signal();
      right.condition.signal();
    } finally { table.unlock(); }
    ++thinkCount;
    if (thinkCount % 10 == 0)
      System.out.println("Philosopher " + this + " has thought " + thinkCount + " times");
    Thread.sleep(1000);
  }

  private void eat() throws InterruptedException {      //被唤醒或者自己第一次开始准备吃,这时候也先锁住桌子,然后判断下条件变量。
    table.lock();
    try {
      while (left.eating || right.eating)
        condition.await();
      eating = true;
    } finally { table.unlock(); }
    Thread.sleep(1000);
  }
}

3. atomic包

java.util.concurrent.atomic包提供了使用一把锁保护互斥资源的更好方式。之前关于递增的操作可以使用如下的方式实现。

3.1 原子变量

使用atomic包里面的原子对象来完成互斥对象的加锁操作。

/***
 * Excerpted from "Seven Concurrency Models in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material, 
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose. 
 * Visit http://www.pragmaticprogrammer.com/titles/pb7con for more book information.
***/
package com.paulbutcher;

import java.util.concurrent.atomic.AtomicInteger;

public class Counting {
  public static void main(String[] args) throws InterruptedException {

    final AtomicInteger counter = new AtomicInteger();

    class CountingThread extends Thread {
      public void run() {
        for(int x = 0; x < 10000; ++x)
          counter.incrementAndGet();        //incrementAndGet操作就是对当前的AtomicInteger执行递增1的操作。
      }
    }

    CountingThread t1 = new CountingThread();
    CountingThread t2 = new CountingThread();

    t1.start(); t2.start();
    t1.join(); t2.join();

    System.out.println(counter.get());
  }
}

3.2 原子变量的优点

由于原子对象本身的特点支持互斥访问,所以不需要自己控制加锁。从而不会因此造成死锁问题已经由于没有对读写都进行同步而引起的可见性问题。

原子变量时无锁(lock-free)非阻塞(non-blocking)算法的基础。java.util.concurrent包中的类都尽量使用了无锁的代码。

3.3 关于volatile

volatile标记的变量得读写操作不会被乱序执行。这是一种低级形式的同步,不能解决例如count递增这种互斥对象修改。JVM的优化以及提供了更多轻量级的锁,推荐从java.util.concurrent.atomic包中寻找适合自己的工具。

4.总结

ReentrantLock和java.util.concurrent.atomic突破了使用内置锁的限制,利用新的工具我们可以做到:

  1. 在线程获取锁时中断它
  2. 设置线程获取锁的超时时间
  3. 按任意顺序获取和释放锁
  4. 用条件变量等待某个条件变为真
  5. 使用原子变量避免锁的使用