1. 概览

这一小节内容之所取名站在巨人肩膀上,是告诉我们要善用那些已经有大牛写好的工具。当然我们这里说的还是java

2.创建线程

在服务器上,每个请求有一个对应的处理线程,如果使用普通创建线程的方法,会有以下问题:

  1. 不断的创建和销毁线程存在代价
  2. 当请求链接的速度高于处理链接的速度时,线程数量会不断增长,导致服务器停止服务而崩溃。这给那些相对服务器进行拒绝服务供给的人提供了可乘之机。

可以采用Executors里面的方法来创建线程池

线程池设置多大的一般性准则:对于CPU密集型的任务,线程大小应该接近于核数;对于IO密集型任务,线程池可以设置的大一些。

/***
 * Excerpted from "Seven Concurrency Models in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material, 
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose. 
 * Visit http://www.pragmaticprogrammer.com/titles/pb7con for more book information.
***/
package com.paulbutcher;

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.io.InputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.ServerSocket;

public class EchoServer {

  public static void main(String[] args) throws IOException {

    class ConnectionHandler implements Runnable {
      InputStream in; OutputStream out;

      ConnectionHandler(Socket socket) throws IOException {
        in = socket.getInputStream();
        out = socket.getOutputStream();
      }

      public void run() {
        try {
          int n;
          byte[] buffer = new byte[1024];
          while((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
            out.flush();
          }
        } catch (IOException e) {}
      }
    }

    ServerSocket server = new ServerSocket(4567);
    int threadPoolSize = Runtime.getRuntime().availableProcessors() * 2;
    ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
    while (true) {
      Socket socket = server.accept();
      executor.execute(new ConnectionHandler(socket));
    }
  }
}

3. 写入时复制CopyOnWriteArrayList

之前在讲不要使用外星方法的那一节中我们学习了如何在并发程序中安全地调用监听器。当时采用一个保护性复制的方法。Java标准库提供了现成的更加优雅地方案——CopyOnWriteArrayList

/***
 * Excerpted from "Seven Concurrency Models in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material, 
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose. 
 * Visit http://www.pragmaticprogrammer.com/titles/pb7con for more book information.
***/
package com.paulbutcher;

import java.io.*;
import java.net.URL;
import java.util.concurrent.CopyOnWriteArrayList;

class Downloader extends Thread {
  private InputStream in;
  private OutputStream out;
  private CopyOnWriteArrayList<ProgressListener> listeners;         //使用一个写入时拷贝对象。当原对象要发生写入的时候,对其做一份克隆,然后对克隆对象操作。

  public Downloader(URL url, String outputFilename) throws IOException {
    in = url.openConnection().getInputStream();
    out = new FileOutputStream(outputFilename);
    listeners = new CopyOnWriteArrayList<ProgressListener>();
  }
  public void addListener(ProgressListener listener) {
    listeners.add(listener);
  }
  public void removeListener(ProgressListener listener) {
    listeners.remove(listener);
  }
  private void updateProgress(int n) {
    for (ProgressListener listener: listeners)
      listener.onProgress(n);
  }

  public void run() {
    int n = 0, total = 0;
    byte[] buffer = new byte[1024];

    try {
      while((n = in.read(buffer)) != -1) {
        out.write(buffer, 0, n);
        total += n;
        updateProgress(total);
      }
      out.flush();
    } catch (IOException e) { }
  }
}

4.完整的程序

此处作者写了个完整的程序用于并发地去统计词频。作者从简单到复杂,而且还提到了各种坑,十分棒。程序太长这里不罗列了。仅仅写一些注意的地方。

  1. 可以采用的典型生产者消费者模式。获取网页放入队列就是生产者。从队列取得网页进行词频统计就是消费者。
  2. 存放网页的队列可以采用ArrayBlockingQueue。之所以采用阻塞队列,是因为如果生产者的速度比消费者快很多,很容易就占用大量内存空间
  3. 毒丸对象: “毒丸”是指一个放在队列上的对象,其含义是:“当得到这个对象时,立即停止。自己可以定义一个毒丸对象,用于告诉消费者可以停止了。
  4. 过度竞争: 作者采用2个消费者的时候并没有改善性能,因为由于竞争导致大量的等待。
  5. 为了避免过度竞争,作者引入了java.util.concurrent包中的ConcurrentHashMap。这个包提供了原子的读-改-写方法,还使用了更高级的并发访问(锁分段,lock striping)。所谓锁分段就是在一个HashMap上划分成多个数据段,每个数据段有自己的一个粒度更小的锁。具体可以查看这篇文章深入分析ConcurrentHashMap。此处用到的putIfAbsent方法把判断如果不存在值则放入新值这2个操作合在一起了,就完成了一个原子性操作。
  6. 作者发现并发提升的性能还不够,因为只用了一个count,所以考虑引入一个计数map,最后再对结果汇总。

5. 本节总结

  1. 使用线程池而不是直接创建线程
  2. 使用CopyOnWriterArrayList让监听器相关的代码更加简单高效
  3. 使用ArrayBlockingQueue让生产者和消费者之间高效协作
  4. ConcurrentHashMao提供了更好的并发访问

6.第二章章节总结

线程与锁并发模型的优点和缺点。

6.1 优点

  1. 适用面广
  2. 与本质比较接近,是对硬件工作方式的形式化
  3. 可以轻松继承到大多数编程语言中,让一门指令式语言或者面向对象语言支持线程与锁模型。

6.2 缺点

  1. 线程与锁模型没有为并行提供直接的支持。之前词频统计的程序是一个并发的形式,但是引入了不确定性的隐患。
  2. 仅仅支持共享内存模型,不支持分布式内存模型。不适用于单个系统武力解决的问题。
  3. 缺乏语言层面的强力支持。语言设计者很容易将该模型集成到一门语言中,但是对程序员来说在编程语言层面没有提供足够的帮助。(坑太多)
  4. 难以测试。之前应用多线程内存的例子,有可见性的问题,这种奇怪的问题难以测试。没有好的测试来证明访问内存时响应的代码都有同步保护。
  5. BUG很难重现。(也是导致测试难的问题之一)
  6. 维护难:无法对多线程代码进行可靠的重构。我们要保证所有对象都是正确的,必须按照顺序来获取多把锁,持有锁时不调用外星方法巴拉巴拉。。你除了谨慎的编程和思考,没办法了。

虽然我们仅仅讨论了Java的内存模型,但是会对内存访问进行乱序执行的不止Java。大多数语言没有对内存模型做出完善的定义。

另外java.util.concurrent包的作者攥写的Java编程实践欢迎查看!在我的读书笔记中也有。