你好,游客 登录 注册 搜索
背景:
阅读新闻

向Hadoop学习NIO的使用

[日期:2016-04-17] 来源:李乾坤的博客  作者: [字体: ]

前言

如果没有专门进行过网络开发,那么科班出身的程序员对java网络编程的了解到bio就为止了,即便知道nio,也是一个简单地eventloop的例子,这个demo代码应付实际的业务是远远不够的。于是更多的人用了netty,netty将java nio封装的很好,也许封装的太好了,但netty也有它的局限性,比如对大文件传输的支持不是很好。

所以,不管从了解java nio的角度,还是为了在java网络编程时有更多的选择,都应该了解下java nio开发的一些套路。

一切都来自SelectorProvider

java nio中几个基本操作对象SocketChannel、ServerSocketChannel和Selector,它们的创建本质上都是调用了SelectorProvider。其OpenJDK实现如下

public static SelectorProvider create() {  
      PrivilegedAction pa = new GetPropertyAction("os.name");  
      String osname = (String) AccessController.doPrivileged(pa);  
      if ("SunOS".equals(osname)) {//1、如果SunOS  
          return new sun.nio.ch.DevPollSelectorProvider();  
      }  
      //2、Linux 内核>=2.6  
      // use EPollSelectorProvider for Linux kernels >= 2.6  
      if ("Linux".equals(osname)) {  
          pa = new GetPropertyAction("os.version");  
          String osversion = (String) AccessController.doPrivileged(pa);  
          String[] vers = osversion.split("\\.", 0);  
          if (vers.length >= 2) {  
              try {  
                  int major = Integer.parseInt(vers[0]);  
                  int minor = Integer.parseInt(vers[1]);  
                  if (major > 2 || (major == 2 && minor >= 6)) {  
                      return new sun.nio.ch.EPollSelectorProvider();  
                  }  
              } catch (NumberFormatException x) {  
                  // format not recognized  
              }  
          }  
      }  
      return new sun.nio.ch.PollSelectorProvider();  
}  

可以简单归纳如下:在Solaris系统下将会使用DevPollSelectorProdiver;在Linux系统下(2.6+版本),将会使用EPollSelectorProvider;否则将会使用PollSelectorProvider.对于SunJDK,DefaultSelectorProvider将直接返回WindowsSelectorProvider.这是一种基于Poll机制.

本节引用自 NIO中Channel.spi学习

牛逼的org.apache.hadoop.net包

众所周知,hadoop将文件分块存储,每块默认有3个副本,那hadoop内部肯定有文件(块)传输操作,这在我们自己实现文件传输时有很大的借鉴意义。相关的代码在org.apache.hadoop.net 中,主要涉及到四个类SocketInputStream,SocketOutputStream,SocketIOWithTimeout,StandardSocketFactory。笔者初看完这4个类的实现,只有一个感觉:鬼斧神工。

StandardSocketFactory负责创建Socket对象,SocketIOWithTimeout(封装Socket)提供带超时设置的io和connect操作。SocketInputStream和SocketOutputStream有个内部类作为SocketIOWithTimeout的子类,负责对外提供读写方法。很明显,SocketIOWithTimeout是核心,SocketIOWithTimeout的核心是 int doIO(ByteBuffer buf, int ops) 方法,其简化逻辑是

int doIO(ByteBuffer,ops){
    buffer// 对于read操作,就要要填充的buffer,对于写操作,就是要输出的buffer
while(buffer.hasremaing()){
// channel尝试读写,performIO留给子类实现(决定是读还是写)
int n = performIO(buffer)
// 如果读到就返回
if(n != 0){
    return n;
}
// 如果设定的时间内不能readable/wriable
count = selectorPool.select(channel, ops, timeout)
if(count == 0){
    抛出SocketTimeoutException
}
}
return 0
}

由此可以看到nio使用bytebuffer的一个优势,那就是操作的统一。不管read和write,不管有没有读到或写入数据,不管读取或写入了一个字节还是多个字节,buffer都是参数,不像bio,本质上是 int read() 和 write(int b) ,一次只能一个字节,并且这个字节,在read操作中作为返回值,在write操作中作为参数。读者可以参见InputStream和OutputStream抽象类的源码, read(byte[]) 和read(byte[],off,len) 本质上是操作read实现的。

SelectorPool

SocketIOWithTimeout中另一个比较厉害的操作就是selectorPool.select(channel, ops, timeout) 。selectorPool是一个selector池,调用 select(SelectableChannel channel, int ops, long timeout) 方法就为channel分配一个空闲的selector监听ops操作,一个selector在某个时刻只负责一个channel((如果监听多个的话,timeout就不准了)),用完回收,如果一个selector空闲时间太长,就关闭它。select方法简化逻辑如下

int select(SelectableChannel channel, int ops, long timeout){
    // 为channel选择一个空闲的selector,如果没有则创建一个新的selector
    while(true){
        channel.register(selector, ops);
        int ret = selector.select(timeout);
        if (ret != 0) {
    return ret;
}
// 记录循环的耗时,如果timeout了return 0
    }
    // 释放一些资源,回收selector
}

SelectorPool提供的select方法,意义还是蛮大的

  1. 以前一个selector负责多个socketchannel,现在一对多变成一对一
  2. 我们知道,nio设置成非阻塞后,没有超时一说(只是buffer中有没有数据的区别),而它提供了一种判断超时的机制。
  3. 隐藏了selector的创建和销毁。这样, nio的数据读写就不用eventloop ,编写代码的感觉回归到以socketchannel为主角,跟bio就比较像了。这为我们实现基于nio的socket pool提供了便利。

     public class TestSocketOutputStream {
     	public static void main(String[] args) throws UnknownHostException, IOException {
     		SocketFactory socketFactory = StandardSocketFactory.getDefault();
     		Socket socket = socketFactory.createSocket("127.0.0.1", 8080);
     		SocketOutputStream outputStream = new SocketOutputStream(socket, 5000);
     		outputStream.write("abc".getBytes());
     	}
     }




收藏 推荐 打印 | 录入:elainebo | 阅读:
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款