赛迪网 > IT技术 Java > 技术动态
  IT资讯搜索
 
IT产品搜索
[程序开发][网管世界][网络安全][数据库技术]
[操作系统][嘉宾聊天·在线访谈][活动集锦]
[精彩专题][Symantec专区][订阅IT技术周刊]
[开发论坛][网管论坛][安全论坛][数据库论坛]
[操作系统论坛][Sybase专区][IBM dW技术专区]
[病毒求助][病毒与漏洞播报][文档·源码下载]

J2SE综合--Java 5.0 多线程编程实践

发布时间:2008.01.15 04:33     来源:赛迪网    作者:huanglz

Java5增加了新的类库并发集java.util.concurrent,该类库为并发程序提供了丰富的API多线程编程在Java 5中更加容易,灵活。本文通过一

个网络服务器模型,来实践Java5的多线程编程,该模型中使用了Java5中的线程池,阻塞队列,可重入锁等,还实践了Callable, Future等接

口,并使用了Java 5的另外一个新特性泛型。 

  简介 

  本文将实现一个网络服务器模型,一旦有客户端连接到该服务器,则启动一个新线程为该连接服务,服务内容为往客户端输送一些字符信

息。一个典型的网络服务器模型如下: 

  1. 建立监听端口。 

  2. 发现有新连接,接受连接,启动线程,执行服务线程。 3. 服务完毕,关闭线程。 

  这个模型在大部分情况下运行良好,但是需要频繁的处理用户请求而每次请求需要的服务又是简短的时候,系统会将大量的时间花费在线

程的创建销毁。Java 5的线程池克服了这些缺点。通过对重用线程来执行多个任务,避免了频繁线程的创建与销毁开销,使得服务器的性能方

面得到很大提高。因此,本文的网络服务器模型将如下: 

  1. 建立监听端口,创建线程池。 

  2. 发现有新连接,使用线程池来执行服务任务。 

  3. 服务完毕,释放线程到线程池。 

  下面详细介绍如何使用Java 5的concurrent包提供的API来实现该服务器。 

  初始化 

  初始化包括创建线程池以及初始化监听端口。创建线程池可以通过调用java.util.concurrent.Executors类里的静态方法

newChahedThreadPool或是newFixedThreadPool来创建,也可以通过新建一个java.util.concurrent.ThreadPoolExecutor实例来执行任务。这

里我们采用newFixedThreadPool方法来建立线程池。 

  ExecutorService pool = Executors.newFixedThreadPool(10); 

  表示新建了一个线程池,线程池里面有10个线程为任务队列服务。 

  使用ServerSocket对象来初始化监听端口。 

  private static final int PORT = 19527; 
  serverListenSocket = new ServerSocket(PORT); 
  serverListenSocket.setReuseAddress(true); 
  serverListenSocket.setReuseAddress(true); 

  服务新连接 

  当有新连接建立时,accept返回时,将服务任务提交给线程池执行。 

  while(true){ 
  Socket socket = serverListenSocket.accept(); 
  pool.execute(new ServiceThread(socket)); 
  } 

  这里使用线程池对象来执行线程,减少了每次线程创建和销毁的开销。任务执行完毕,线程释放到线程池。 

  服务任务 

  服务线程ServiceThread维护一个count来记录服务线程被调用的次数。每当服务任务被调用一次时,count的值自增1,因此ServiceThread

提供一个increaseCount和getCount的方法,分别将count值自增1和取得该count值。由于可能多个线程存在竞争,同时访问count,因此需要加

锁机制,在Java 5之前,我们只能使用synchronized来锁定。Java 5中引入了性能更加粒度更细的重入锁ReentrantLock。我们使用

ReentrantLock保证代码线程安全。下面是具体代码: 

  private static ReentrantLock lock = new ReentrantLock (); 
  private static int count = 0; 
  private int getCount(){ 
  int ret = 0; 
  try{ 
  lock.lock(); 
  ret = count; 
  }finally{ 
  lock.unlock(); 
  } 
  return ret; 
  } 
  private void increaseCount(){ 
  try{ 
  lock.lock(); 
  ++count; 
  }finally{ 
  lock.unlock(); 
  } 
  } 

  服务线程在开始给客户端打印一个欢迎信息, 

  increaseCount(); 
  int curCount = getCount(); 
  helloString = "hello, id = " + curCount+"\r\n"; 
  dos = new DataOutputStream(connectedSocket.getOutputStream()); 
  dos.write(helloString.getBytes()); 

  然后使用ExecutorService的submit方法提交一个Callable的任务,返回一个Future接口的引用。这种做法对费时的任务非常有效,submit

任务之后可以继续执行下面的代码,然后在适当的位置可以使用Future的get方法来获取结果,如果这时候该方法已经执行完毕,则无需等待即

可获得结果,如果还在执行,则等待到运行完毕。 

  ExecutorService executor = Executors.newSingleThreadExecutor(); 
  Future future = executor.submit(new TimeConsumingTask()); 
  dos.write("let's do soemthing other".getBytes()); 
  String result = future.get(); 
  dos.write(result.getBytes()); 

  其中TimeConsumingTask实现了Callable接口 

  class TimeConsumingTask implements Callable { 
  public String call() throws Exception { 
  System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture"); 
  return "ok, here's the result: It takes me lots of time to produce this result"; 
  } 
  } 

  这里使用了Java 5的另外一个新特性泛型,声明TimeConsumingTask的时候使用了String做为类型参数。必须实现Callable接口的call函数

,其作用类似与Runnable中的run函数,在call函数里写入要执行的代码,其返回值类型等同于在类声明中传入的类型值。在这段程序中,我们

提交了一个Callable的任务,然后程序不会堵塞,而是继续执行dos.write("let's do soemthing other".getBytes());当程序执行到String 

result = future.get()时如果call函数已经执行完毕,则取得返回值,如果还在执行,则等待其执行完毕。 


服务器端的完整实现 

  服务器端的完整实现代肴缦拢?

  package com.andrew; 

  import java.io.DataOutputStream; 
  import java.io.IOException; 
  import java.io.Serializable; 
  import java.net.ServerSocket; 
  import java.net.Socket; 
  import java.util.concurrent.ArrayBlockingQueue; 
  import java.util.concurrent.BlockingQueue; 
  import java.util.concurrent.Callable; 
  import java.util.concurrent.ExecutionException; 
  import java.util.concurrent.ExecutorService; 
  import java.util.concurrent.Executors; 
  import java.util.concurrent.Future; 
  import java.util.concurrent.RejectedExecutionHandler; 
  import java.util.concurrent.ThreadPoolExecutor; 
  import java.util.concurrent.TimeUnit; 
  import java.util.concurrent.locks.ReentrantLock; 

  public class Server { 
  private static int produceTaskSleepTime = 100; 
  private static int consumeTaskSleepTime = 1200; 
  private static int produceTaskMaxNumber = 100; 
  private static final int CORE_POOL_SIZE = 2; 
  private static final int MAX_POOL_SIZE = 100; 
  private static final int KEEPALIVE_TIME = 3; 
  private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2; 
  private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; 
  private static final String HOST = "127.0.0.1"; 
  private static final int PORT = 19527; 
  private BlockingQueue workQueue = new ArrayBlockingQueue(QUEUE_CAPACITY); 
  //private ThreadPoolExecutor serverThreadPool = null; 
  private ExecutorService pool = null; 
  private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy(); 
  private ServerSocket serverListenSocket = null; 
  private int times = 5; 
  public void start() { 
  // You can also init thread pool in this way. 
  /*serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, 
  MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue, 
  rejectedExecutionHandler);*/ 
  pool = Executors.newFixedThreadPool(10); 
  try { 
   serverListenSocket = new ServerSocket(PORT); 
   serverListenSocket.setReuseAddress(true); 

   System.out.println("I'm listening"); 
   while (times-- > 0) { 
    Socket socket = serverListenSocket.accept(); 
    String welcomeString = "hello"; 
    //serverThreadPool.execute(new ServiceThread(socket, welcomeString)); 
    pool.execute(new ServiceThread(socket)); 
   } 
  } catch (IOException e) { 
   // TODO Auto-generated catch block 
   e.printStackTrace(); 
  } 
  cleanup(); 
  } 

  public void cleanup() { 
  if (null != serverListenSocket) { 
   try { 
    serverListenSocket.close(); 
   } catch (IOException e) { 
    // TODO Auto-generated catch block 
    e.printStackTrace(); 
   } 
  } 
  //serverThreadPool.shutdown(); 
  pool.shutdown(); 
  } 

  public static void main(String args[]) { 
  Server server = new Server(); 
  server.start(); 
  } 
  } 

  class ServiceThread implements Runnable, Serializable { 
  private static final long serialVersionUID = 0; 
  private Socket connectedSocket = null; 
  private String helloString = null; 
  private static int count = 0; 
  private static ReentrantLock lock = new ReentrantLock(); 

  ServiceThread(Socket socket) { 
  connectedSocket = socket; 
  } 

  public void run() { 
  increaseCount(); 
  int curCount = getCount(); 
  helloString = "hello, id = " + curCount + "\r\n"; 

  ExecutorService executor = Executors.newSingleThreadExecutor(); 
  Future future = executor.submit(new TimeConsumingTask()); 

  DataOutputStream dos = null; 
  try { 
   dos = new DataOutputStream(connectedSocket.getOutputStream()); 
   dos.write(helloString.getBytes()); 
   try { 
    dos.write("let's do soemthing other.\r\n".getBytes()); 
    String result = future.get(); 
    dos.write(result.getBytes()); 
   } catch (InterruptedException e) { 
    e.printStackTrace(); 
   } catch (ExecutionException e) { 
    e.printStackTrace(); 
   } 
  } catch (IOException e) { 
   // TODO Auto-generated catch block 
   e.printStackTrace(); 
  } finally { 
   if (null != connectedSocket) { 
    try { 
     connectedSocket.close(); 
    } catch (IOException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 
   } 
   if (null != dos) { 
    try { 
     dos.close(); 
    } catch (IOException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 
   } 
   executor.shutdown(); 
  } 
  } 

  private int getCount() { 
  int ret = 0; 
  try { 
   lock.lock(); 
   ret = count; 
  } finally { 
   lock.unlock(); 
  } 
  return ret; 
  } 

  private void increaseCount() { 
  try { 
   lock.lock(); 
   ++count; 
  } finally { 
   lock.unlock(); 
  } 
  } 
  } 

  class TimeConsumingTask implements Callable { 
  public String call() throws Exception { 
  System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture"); 
  return "ok, here's the result: It takes me lots of time to produce this result"; 
  } 

  } 
       (责任编辑:包春林)


[ 发表评论 ] 字体[  ] [ 打印 ] [ 进入博客 ] [ 进入论坛 ]  [ 推荐给朋友 ]
  相关文章
· 数据库相关:小结Hibernate的查询方式 (01-14) · 开发框架:初学者学习Hibernate的方法 (01-14)
· J2EE综合--分析Hibernate的缓存机制 (01-14) · J2EE综合--Hibernate的事务处理机制 (01-14)
· 开发框架--Hibernate中各个包的作用 (01-14) · 开发框架--Hibernate为什么如此成功 (01-14)
· 一篇不错的讲解Java异常的文章 (01-14) · 写Java程序最容易犯的21种错误实例分析 (01-14)
· 使用XML的五种场合 (01-14) · Hibernate应用中如何处理批量更新和删除 (01-11)
  客户需求反馈表
* 姓  名:
更多资料  了解方案  认识厂商
* 单位名称:
* 联系电话:
* 电子邮件:
  赛迪推荐  
  手机·资费 ·新品·导购·评测·手机资费·宽带
手机搜索  诺基亚 N73 MOTO Z6
  IT产品 ·笔记本·台式机·服务器·打印·投影
IT产品搜索 
  IT技术 ·开发·网管·安全·数据库·操作系统
  信息化 ·热点·专题·访谈·周刊·方案案例
· 信息化市场百家争鸣 SaaS深陷争议“泥潭”
· 提高管理水平 "两栖"CIO应具备的六大能力
· 国产ITIL运维先行者 四大厂商角力BI市场
· 金融行业GSN专题解决方案 企业网解决方案
  IT博客 ·曾剑秋·项立刚·Java学习·网管
  IT技术论坛 ·开发·网管·安全·数据库·系统