在Java7中体会NIO.2异步执行

  categories:资料  author:

简单介绍 Asynchronous I/O

JDK7 已经大致确定发布时间。JSR 203 提出很久了。2009.11.13,JDK7 M5(b76)已经发布。JSR 203 习惯上称为 NIO.2,主要包括新的:

  • 异步 I/O(简称 AIO);
  • Multicase 多播;
  • Stream Control Transport Protocol(SCTP);
  • 文件系统 API;
  • 以及一些 I/O API 的更新,例如:java.io.File.toPath,NetworkChannel 的完整抽象,等等。

本文将主要关注 AIO。AIO 包括 Sockets 和 Files 两部分的异步通道接口及其实现,并尽量使用操作系统提供的原生本地 I/O 功能进行实现。例如 Windows 版本的实现就使用了所谓的完成端口模型(IOCP)。其实 JDK 7 中 AIO 实现本质上说应该是 Proactor 模式的实现。Alexander Libman 提供 NIO 版本的 JProactor 的实现。NIO.2 版本 JProactor 正在进行。Grizzly 也已经提供新的基于 AIO 的实现。如果您只想检查这些最新的 API,NIO.2 项目的 Javadoc 站点只列出了 NIO.2 部分的 API。

  • AIO 的核心概念:发起非阻塞方式的 I/O 操作。当 I/O 操作完成时通知。
  • 应用程序的责任就是:什么时候发起操作? I/O 操作完成时通知谁?

AIO 的 I/O 操作,有两种方式的 API 可以进行:

  • Future 方式;
  • Callback 方式。

下面我们分别对这两种方式的 API 进行举例说明。

Future 方式

Future 方式:即提交一个 I/O 操作请求,返回一个 Future。然后您可以对 Future 进行检查,确定它是否完成,或者阻塞 IO 操作直到操作正常完成或者超时异常。使用 Future 方式很简单,比较典型的代码通常像清单 1 所示。

清单 1. 使用 Future 方式的代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
// 连接远程服务器,等待连接完成或者失败
Future<Void> result = ch.connect(remote);
// 进行其他工作,例如,连接后的准备环境,f.e.
//prepareForConnection();
//Future 返回 null 表示连接成功
if(result.get()!=null){
   // 连接失败,清理刚才准备好的环境,f.e.
   //clearPreparation();
   return;
}
// 网络连接正常建立
...
ByteBuffer buffer = ByteBuffer.allocateDirect(8192);
// 进行读操作
Future<Integer> result = ch.read(buffer);
// 此时可以进行其他工作,f.e.
//prepareLocalFile();
// 然后等待读操作完成
try {
   int bytesRead = result.get();
   if(bytesRead==-1){
   // 返回 -1 表示没有数据了而且通道已经结束,即远程服务器正常关闭连接。
       //clear();
       return;
   }
   
   // 处理读到的内容,例如,写入本地文件,f.e.
   //writeToLocolFile(buffer);
} catch (ExecutionExecption x) {
   //failed
}

需要注意的是,因为 Future.get()是同步的,所以如果不仔细考虑使用场合,使用 Future 方式可能很容易进入完全同步的编程模式,从而使得异步操作成为一个摆设。如果这样,那么原来旧版本的 Socket API 便可以完全胜任,大可不必使用异步 I/O。

Callback 方式

Callback 方式:即提交一个 I/O 操作请求,并且指定一个 CompletionHandler。当异步 I/O 操作完成时,便发送一个通知,此时这个 CompletionHandler 对象的 completed 或者 failed 方法将会被调用,样例代码如清单 2 所示。

清单 2. 完成处理接口
1
2
3
4
5
6
7
8
public interface CompletionHandler<V,A> {
   // 当操作完成后被调用,result 参数表示操作结果,
   //attachment 参数表示提交操作请求时的参数。
   void completed(V result, A attachment);
   // 当操作失败是调用,exc 参数表示失败原因。attachment 参数同上。
   void failed(Throwable exc, A attachment);
}
  • V表示结果值的类型。对于异步网络通道的读写操作而言,这个结果值 V 都是整数类型,表示已经操作的卦数,如果是 -1,NIO.2 内核实现保证传递的 ByteBuffer参数不会有变化。
  • A表示关联到 I/O 操作的对象的类型。用于传递操作环境。通常会封装一个连接环境。
  • 如果成功则 completed 方法被调用。如果失败则 failed 方法被调用。

准备好 CompletionHandler 之后,如何使用 CompletionHandler 呢? AIO 提供四种类型的异步通道以及不同的 I/O 操作接受一个 CompletionHandler 对象,它们分别是:

  • AsynchronousSocketChannel:connect,read,write
  • AsynchronousFileChannel:lock,read,write
  • AsynchronousServerSocketChannel:accept
  • AsynchronousDatagramChannel:read,write,send,receive

本文重点关注 AsynchronousSocketChannel 的使用,首先简单浏览一下该类型的 API。

AsynchronousSocketChannel
1
2
public abstract class AsynchronousSocketChannel
   implements AsynchronousByteChannel, NetworkChannel
创建一个异步网络通道,并且绑定到一个默认组。
1
public static AsynchronousSocketChannel open() throws IOException
将异步网络通道连接到远程服务器,使用指定的 CompletionHandler 听候完成通知。
1
2
3
public abstract <A> void connect(SocketAddress remote,
   A attachment,
   CompletionHandler<Void,? super A> handler)
从异步网络通道读取数据到指定的缓冲区,使用指定的 CompletionHandler 听候完成通知。
1
2
3
public final <A> void read(ByteBuffer dst,
   A attachment,
   CompletionHandler<Integer,? super A> handler)
向异步网络通道写缓冲区中的数据,使用指定的 CompletionHandler 听候完成通知。
1
2
3
public final <A> void write(ByteBuffer src,
   A attachment,
   CompletionHandler<Integer,? super A> handler)

开始简单的异步 I/O 网络客户端程序

本文重点关注 AIO 的 socket 部分。接下来,我们以 AIO 方式的 FTP 客户端程序为例,开始体会异步执行的快乐。需要提醒的是:快乐和痛苦如影随行。好,那“痛并快乐着”吧。

使用 AIO,可以想象一个在线视频播放的应用场景。使用异步 I/O 回调方式,可以这样描述一边下载视频一边播放的功能:

  1. 准备好网络连接
  2. 准备一个缓冲区,提交读操作希望下载部分视频内容,(这个读请求马上完成)
  3. 等待读请求完成操作,此时可以进行其他工作,比如播放广告
  4. 读操作真正完成,得到通知,CompletionHandler#completed 方法被调用,
  5. 启动另外的播放线程,从下载的缓冲区读取内容播放视频。
  6. 再准备一个另外的缓冲区,回到第二步

这样,第二步到第六步自动构成一个执行循环,但不是 while 之类的代码循环。

本文以 FTP 客户端程序为例,来阐述如何使用异步 I/O 进行网络程序的编写。

FTP 分为两个通道进行处理:控制通道和数据通道。

首先,开始 FTP 的控制通道的编程。FTP 的控制通道使用 telnet 行命令方式进行请求和响应处理。 第一个例子不会复杂,我们只是连接到一个远程服务器,并且检查某个文件的大小,然后退出。基本步骤如下:

  1. 连接到 FTP 服务器。为了便于测试,本文将“攻击”ftp.gnu.org 服务器。
  2. 读取服务器的欢迎信息,检查远程服务器是否已经准备就绪。
  3. 如果服务器没有准备好,关闭连接,退出
  4. 如果服务器没有问题,发送登录命令。
  5. 检查登录命令结果。如果登录失败,转到第 8 步。
  6. 如果服务器没有问题,发送检查文件大小的命令。
  7. 检查命令结果。并且显示结果。
  8. 发送退出命令
  9. 关闭连接。

使用进行一个简单的设计:

第一个简单有问题的例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class FTPClient1 {
    public static void main(String[] args) throws IOException {
        // 第一个,创建异步网络通道
        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
        // 连接到服务器,以 ftp.gnu.org 为目标
        channel.connect(new InetSocketAddress("ftp.gnu.org", 21), channel,
            // 使用连接完成的回调
            new CompletionHandler<Void, AsynchronousSocketChannel>() {
            @Override
            public void completed(Void result, AsynchronousSocketChannel attachment) {
                // 完成连接后,启动 FTP 的控制逻辑
                FTPClient1 client = new FTPClient1();
                client.start(attachment);
            }
            @Override
            public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
                exc.printStackTrace();
            }
        });
        //connect 的调用异步执行,马上完成,阻止 JVM 退出
        System.in.read();
    }
    AsynchronousSocketChannel channel;
    public void start(AsynchronousSocketChannel channel) {
        this.channel = channel;
        // 准备读缓冲区
        ByteBuffer buffer = ByteBuffer.allocateDirect(128);
        // 发出读操作请求,
        channel.read(buffer, buffer,
        // 读操作完成后通知
        new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (result > 0) {
                    // 简单处理读到的响应结果
                    int position = attachment.position() - 1;
                    if (attachment.get(position - 1) == 13 &&
                    attachment.get(position) == 10) {
                        if (isValidReply(attachment)) {
                            attachment.flip();
                            showReply(attachment);
                            if (getReplyCode(attachment) == 220)
                                login();
                        }
                    } else {
                       // 继续读
                        FTPClient1.this.channel.read(attachment, attachment, this);
                    }
                 } else {
                     System.out.println("remote server closed");
                 }
             }
             @Override
             public void failed(Throwable exc, ByteBuffer attachment) {
                 exc.printStackTrace();
             }
         });
     }
     protected void login() {
        // 准备写缓冲区
         String user = "user anonymous\r\n";
         ByteBuffer buffer = ByteBuffer.wrap(user.getBytes());
         // 发出写操作请求
         channel.write(buffer, buffer,
         // 写操作完成通知
         new CompletionHandler<Integer, ByteBuffer>() {
             @Override
             public void completed(Integer result, ByteBuffer attachment) {
                 if (attachment.hasRemaining())
                     channel.write(attachment, attachment, this);
                 else {
                     // channel.read(dst, attachment, handler);
                     // readReply();
                     // 此处有问题
                 }
             }
             @Override
             public void failed(Throwable exc, ByteBuffer attachment) {
                 exc.printStackTrace();
             }
         });
     }
     protected void showReply(ByteBuffer attachment) {
         while (attachment.hasRemaining())
             System.out.print((char) attachment.get());
     }
     public static int getReplyCode(ByteBuffer buffer) {
         return Character.digit(buffer.get(0), 10) * 100 +
         Character.digit(buffer.get(1), 10) * 10
                 + Character.digit(buffer.get(2), 10);
     }
     public static boolean isValidReply(ByteBuffer buffer) {
         return buffer.get(3) == 32 && Character.isDigit(buffer.get(0))
         && Character.isDigit(buffer.get(1))
                 && Character.isDigit(buffer.get(2));
     }
 }

问题:上面的代码中,login 方法中,完成 login 命令之后,如何继续?

答案是:不能继续。实际上,上面的例子代码回到了同步处理时代。典型的错误使用方式。痛。 同时,CompletionHandler 的创建也成了问题,需要不停地创建这种类型的对象吗?痛! 回顾前面提到的核心:应用程序的责任:什么时候发起操作? I/O 操作完成时通知谁?

就本例而言,FTPClient 本身应该承担应用程序的责任,正如 Client 名称所示,应该由 Client 来实现 CompletionHandler。 Client 负责发出 I/O 操作请求,I/O 操作完成通知 Client。正如世界上其他诸多问题一样,名称本身就是个问题。此处的 Client 的意思是真正的顾客。

可以想象另外一个场景:去一个有叫号机的银行大厅办理业务。“我”到银行,“我”决定办理个人业务,所以取个人业务的号码。然后看看前面等待的其他客人还不少,计算一下时间,“我”决定去隔壁馋嘴一个冰淇淋,回来后,在大厅到处晃晃,这时候,大厅广播通知,333 号顾客请到 3 号窗口办理业务,“我”听到了,检查一下号码,“我”持有 333 号,所以“我”去 3 号窗口。

上面这个场景中有几个非常重要的事实 “我”决定取个人业务号码,“我”听到了,“我”是顾客。 因此,上面例子应该让 FTPClient1 实现 CompletionHandler。这是对的。但是 FTPClient1 有两个重要的职责:发出读操作请求和发出写操作请求。需要两个 CompletionHandler 的角色,但是不能重复实现 CompletionHandler 接口,此时内部类是个不错的选择。修改上面的例子,如下:

第二个简单的例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class FTPClient2 {
    public static void main(String[] args) throws IOException {
        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
        channel.connect(new InetSocketAddress("ftp.gnu.org", 21), channel,
                new CompletionHandler<Void, AsynchronousSocketChannel>() {
                    @Override
                    public void completed(Void result,
                        AsynchronousSocketChannel attachment) {
                        FTPClient2 client = new FTPClient2();
                        client.start(attachment);
                    }
                    @Override
                    public void failed(Throwable exc,
                        AsynchronousSocketChannel attachment) {
                        exc.printStackTrace();
                    }
                });
        System.in.read();
    }
    AsynchronousSocketChannel channel;
    void readResponse() {
        ByteBuffer buffer = ByteBuffer.allocateDirect(128);
        read(buffer);
    }
    void read(ByteBuffer buffer) {
        channel.read(buffer, buffer, reader);
    }
   // 使用内部类接收读操作完成通知   
    CompletionHandler<Integer, ByteBuffer> reader =
    new CompletionHandler<Integer, ByteBuffer>() {
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            if (result > 0) {
                int position = attachment.position() - 1;
                if (attachment.get(position - 1) == 13 &&
                    attachment.get(position) == 10) {
                    if (isValidReply(attachment, 0)) {
                        attachment.flip();
                        showReply(attachment);
                        // 状态逻辑,处理响应
                        onReply(getReplyCode(attachment, 0));
                    } else {
                        removeLine(attachment, position - 2);
                        if (isValidReply(attachment, 0)) {
                            attachment.flip();
                            showReply(attachment);
                            onReply(getReplyCode(attachment, 0));
                        } else
                            read(attachment);
                    }
                } else {
                    if (!attachment.hasRemaining())
                        removeLine(attachment, position - 1);
                    read(attachment);
                }
            } else {
                System.out.println("remote server closed");
            }
        }
         @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
        }
    };
    public void start(AsynchronousSocketChannel channel) {
        this.channel = channel;
        readResponse();
    }
    protected void onReply(int replyCode) {
       // 按照前面定义好的步骤,处理状态逻辑
        if (replyCode == 220)
            login();
        if (replyCode == 230)
            writeCommand("size README");
        else if (replyCode == 213)
            writeCommand("QUIT");
        else if (replyCode == 221)
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
    }
    void writeCommand(String string) {
        System.out.print("==>");
        System.out.println(string);
        ByteBuffer buffer = ByteBuffer.wrap((string + "\r\n").getBytes());
        write(buffer);
    }
    void write(ByteBuffer buffer) {
        channel.write(buffer, buffer, writer);
    }
    // 使用内部类接收写操作完成通知
    CompletionHandler<Integer, ByteBuffer> writer =
    new CompletionHandler<Integer, ByteBuffer>() {
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            if (attachment.hasRemaining())
                channel.write(attachment, attachment, this);
            else
                readResponse();
        }
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
             exc.printStackTrace();
        }
    };
    protected void login() {
        String user = "user anonymous";
        writeCommand(user);
    }
    
   // 处理多行响应
    protected void removeLine(ByteBuffer buffer, int position) {
        int limit = buffer.position();
        byte c;
        while (position >= 0) {
            c = buffer.get(position);
            if (c == 13 || c == 10) {
                showReply(buffer, position);
                buffer.position(position + 1);
                buffer.limit(limit);
                buffer.compact();
                break;
            }
            position--;
        }
    }
   // 针对多数 FTP 服务器的响应的偷懒的方法,不用费劲处理 String。
    protected void showReply(ByteBuffer buffer) {
        while (buffer.hasRemaining())
            System.out.print((char) buffer.get());
    }
    protected void showReply(ByteBuffer buffer, int position) {
        for (int i = 0; i < position; i++)
            System.out.print((char) buffer.get(i));
    }
    public static int getReplyCode(ByteBuffer buffer, int start) {
        return Character.digit(buffer.get(start), 10) * 100 +
        Character.digit(buffer.get(start + 1), 10) * 10
                + Character.digit(buffer.get(start + 2), 10);
    }
    public static boolean isValidReply(ByteBuffer buffer, int start) {
        return buffer.get(start + 3) == 32 &&
        Character.isDigit(buffer.get(start))
                && Character.isDigit(buffer.get(start + 1))
                && Character.isDigit(buffer.get(start + 2));
    }
    public static int findCRLF(ByteBuffer buffer, int start, int end) {
        while (start < end) {
            if (buffer.get(start++) == 13) {
                if (start < end) {
                    if (buffer.get(start) == 10) {
                        return start + 1;
                    }
                }
            }
        }
        return -1;
    }
}

对比两个代码,可以发现:修改后的代码的 onReply 方法,与上文中描述的需求步骤基本上一模一样。与使用阻塞模式编写的代码相比,应该更加简洁。阻塞模式下,你至少需要一个控制循环。似乎有点快乐了。

继续 FTP 的编程,升华完成通知类型

因为读写操作的使用远远多于其他类型的操作,所以重点考虑如何处理读写操作。 回顾前面的第二个例子中的 reader 和 writer 成员, 其实与对象编程理论和实践中的一个很重要的原理“单一职责原理”比较吻合。 但是,如果需要写很多的网络程序,或者提供一个网络编程的框架(虽然现在有不少,例如:grizzly,JProactor),那么内部类的方式显然显得局限。

重用?重用什么?如何重用?

需要注意的是:AIO 读写操作并不保证操作一次全部完成。单个读写操作请求可能收到多次完成通知

多数网络应用程序发送响应或者请求消息,都需要将准备好的缓冲区全部内容发送出去。可以预见,前面的 writer 内部类成员可以独立,改编为抽象的 Writer 类型。这时候,前文中内部类的隐式引用好处就会失去,而且诞生出新的回调接口。

FTP 使用 telnet 协议的消息格式。消息以 <CRLF> 结束。 Telnet 协议家族的响应消息基本上都使用“code<SPACE>message<CRLF>”。

从处理 FTP 或者 telnet 协议家族的响应消息来看,前文的 reader 成员应该可以独立,至少可以抽象一个专门用于读取 Telnet 响应的 TelnetReader 类型。同样,也诞生出新的回调接口。对于 Reader 类型,还可以想象几种应用模式:

  • 读取指定长度的数据,SizeReader;
  • 一直读直到对方关闭通道,EOFReader。
  • 多数情况下读操作都会去检查读到的数据长度是否为 -1,以检测对方是否已经关闭通道

这样,对于 Reader 类型,某种程度的策略模式的应用需求已经浮现出来。

但是 Client 类型本身至少也可以实现一种类型的 CompletionHandler。如果这样,将产生一个争论:继承还是委托? 很多情况下这实际上是口味的问题,并非优劣的选择。

同时,对于读写操作而言,CompletionHandler 的类型是确定的 Integer 类型,似乎增加一个新的派生接口 Callback<T> 更加满足需要。

新的读写操作回调接口
1
2
3
4
5
6
7
public interface Callback<T> extends CompletionHandler<Integer, T> {
    @Override
    void completed(Integer result, T context);
    @Override
    void failed(Throwable cause, T context);
}

除上述考虑之外,最重要的一点是,有状态还是无状态。CompletionHandler 或者 Callback 接口本身无状态可言,但其实现存在有无状态的选择。AIO 内核并不关心 CompletionHandler 的 attachment 参数,内核不会使用也不会施加任何限制。但是实现类则大不同。有状态和无状态的设计将直接影响到 attachment 参数的使用。如您所看见,Callback 接口已经将 attachment 参数更名为 context。同时,因为 AsynchronousChannel 都需要 ByteBuffer,attachment 的使用也必须考虑 ByteBuffer 的使用方式。对于每一个读写操作而言,有三个因素是必须考虑的:AsynchronousChannel,ByteBuffer,attachment。普通应用程序也好,还是框架,实际上只考虑一个问题,就是如何组合这三个因素。某种程度上说,AIO 编程其实是 attachment编程,实不为过。怎一个痛字了得!

与此同时,因为诞生新的回调接口,预示着 Client 的层次在不断增加,也意味着 Client 的职责在进行分化。某些网络应用框架中的 filter 类型与此类似。

在没有更好的方案的时候,作者选择有状态方式的设计。

简单的有状态写操作类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class BufferWriter implements Callback<WriteCallback> {
    private AsynchronousSocketChannel channel;
    private ByteBuffer buffer;
    private Charset charset;
    public BufferWriter(AsynchronousSocketChannel channel, Charset charset) {
        this.channel = channel;
        this.charset = charset;
    }
    public void write(String string, WriteCallback write) {
        buffer = ByteBuffer.wrap(string.getBytes(charset));
        channel.write(buffer, write, this);
    }
    @Override
    public void completed(Integer result, WriteCallback context) {
        if (buffer.hasRemaining())
            channel.write(buffer, context, this);
        else {
            buffer = null;
            context.writeCompleted();
        }
    }
    @Override
    public void failed(Throwable cause, WriteCallback context) {
        buffer = null;
        context.writeFailed(cause);
    }
}
抽象读操作模板类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public abstract class AbstractReadCallback<T> implements Callback<T> {
    protected abstract void readCompleted(Integer result, T context);
    protected abstract void onChannelClose(T context);
    @Override
    public void completed(Integer result, T context) {
       // 重新分发回调通知
        if (result > 0)
            readCompleted(result, context);
        else
            onChannelClose(context);
    }
}
简单的有状态读操作类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
public class TelnetReplyReader extends AbstractReadCallback
    <ResponseCallback<Reply>> {
    private AsynchronousSocketChannel channel;
    private CharsetDecoder decoder;
    // 简单的 ByteBuffer 工厂,来自 JDK 的 corba 中的实现
    private ByteBufferPool pool;
    private ByteBuffer buffer;
    //FTP 响应数据对象
    private Reply reply = new Reply();
    public TelnetReplyReader(AsynchronousSocketChannel channel,
    ByteBufferPool pool, Charset charset) {
        this.channel = channel;
        this.pool = pool;
        decoder = charset.newDecoder();
    }
    public void read(ResponseCallback<Reply> protocol) {
        reply.reset();
        if (buffer == null)
            buffer = pool.get(1024);
        buffer.clear();
        channel.read(buffer, protocol, this);
    }
    @Override
    protected void onChannelClose(ResponseCallback<Reply> context) {
        try {
            channel.close();
        } catch (IOException e) {
            // ignore;
        }
        // 转换为特定的异常类型
        failed(new ClosedChannelException(), context);
    }
    @Override
    protected void readCompleted(Integer result,ResponseCallback<Reply> context){
        ByteBuffer buffer = this.buffer;
        try {
           // 响应代码的处理逻辑,直到获得有效的响应代码,否则哭到长城
            int position = buffer.position();
            if (buffer.get(position - 2) == 13 &&
                buffer.get(position - 1) == 10) {
                // Yes check reply code;
                if (findReplyCode(buffer, position - 2)) {
                    // buffer position at the code first char;
                    int first = buffer.position();
                    reply.code = getReplyCode(buffer, first);
                    if (first > 0) {
                        buffer.flip();
                        reply.other.append(decoder.decode(buffer));
                    }
                    buffer.limit(position - 2);
                    buffer.position(first + 4);
                    reply.message = decoder.decode(buffer).toString();
                    returnBuffer();
                    context.onResponse(reply);
                    return;
                }
                buffer.flip();
                reply.other.append(decoder.decode(buffer));
                buffer.clear();
                channel.read(buffer, context, this);
                return;
            }
            // No reply code, consider cache other message
            if (buffer.hasRemaining()) {
                channel.read(buffer, context, this);
                return;
            }
         // Have to cache some message, but may be have reply code, so just check CRLF;
            int index = findLF(buffer, position - 2);
            if (index == -1) {
                buffer.flip();
                reply.other.append(decoder.decode(buffer));
            } else {
                buffer.position(0).limit(index + 1);
                reply.other.append(decoder.decode(buffer));
                buffer.position(index);
            }
            buffer.limit(position);
            buffer.compact();
            channel.read(buffer, context, this);
        } catch (CharacterCodingException ex) {
            failed(ex, context);
        }
    }
    @Override
    public void failed(Throwable cause, ResponseCallback<Reply> context) {
        returnBuffer();
        context.failed(cause);
    }
    private void returnBuffer() {
        pool.releaseBuffer(buffer);
        buffer = null;
    }
...
使用有状态读写操作类型的控制类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class FTPClient implements ResponseCallback<Reply>,
WriteCallback, CommandProvider {
    private TelnetReplyReader reader;
    private BufferWriter writer;
    private Semaphore semaphore = new Semaphore(0);
    // 传输通道的处理环境
    private TransferContext transferContext;
    protected void start(Context context, AsynchronousSocketChannel channel) {
        InetSocketAddress remote;
        try {
            remote = (InetSocketAddress) channel.getRemoteAddress();
        } catch (IOException e) {
            failed(e);
            return;
        }
        InetSocketAddress local;
        try {
            local = (InetSocketAddress) channel.getLocalAddress();
        } catch (IOException e) {
            failed(e);
            return;
        }
        Charset charset = Charset.forName("UTF-8");
        reader = new TelnetReplyReader(channel, context.pool(), charset);
        writer = new BufferWriter(channel, charset);
        // 发起读操作请求
        reader.read(this);
        // 同时,预备传输通道环境
        transferContext = new SimpleTransferContext(context,
         remote.getAddress(), local.getAddress());
    }
    @Override
    public void onResponse(Reply reply) {
       // 简单的响应处理逻辑
        try {
            transferContext.check(reply);
        } catch (Throwable ex) {
            ex.printStackTrace();
        }
        // If reply not process right, just pending any advance operation.
        if (reply.code / 100 == 1)
            reader.read(this);
        else
            semaphore.release();
    }
    @Override
    public void writeCompleted() {
       //FTP 规则,发出请求命令后,开始等待对方的响应
        reader.read(this);
    }
    ...

除协议相关的部分代码,其余的看上去还蛮简单,似乎抽象 Reader 和 Writer 的代价值得的。上面代码中的 Context,Reply 等小类型,可以在完整的源代码中检查。

继续 FTP,处理传输通道

本文之所以选择 FTP 作为 AIO 的实践例子,FTP 的控制通道必须协调单独的数据传输通道。不仅如此,使用 Port 方式的话,客户端程序还需要建立一个简单的网络服务器。

上文中,我们尽量回避建立网络连接的 CompletionHandler 的再处理问题。FTP 的数据传输通道

  • 要么使用服务器方式,使用 AsynchronousServerSocketChannel.accept 方法
  • 要么使用客户端方式,使用 AsynchronousSocketChannel.connect 方法,与前文类似

延续上文的处理思路,继续抽象用于 connect 的 CompletionHandler 类型。与前不同的是,该连接回调类型使用无状态方式设计。该例演示下载文件的处理。

无状态类型的连接回调类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class SocketConnector implements Connector<Object[]> {
    public void connect(InetSocketAddress remote, ConnectionCallback client)
    throws IOException {
        // 创建新的异步网络通道
        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
        // 无状态方式处理,将所有需要的参数打包为单个 attachment 参数
        Object[] attachment = { client, remote, channel };
        // 启动连接操作
        channel.connect(remote, attachment, this);
    }
    public void connect(InetSocketAddress remote, InetSocketAddress local,
    ConnectionCallback client)throws IOException {
        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
        // 绑定本地网络地址,对于客户端而言,通常是 IP,对于服务器而言,一定需要端口号
        channel.bind(local);
        Object[] attachment = { client, remote, channel };
        channel.connect(remote, attachment, this);
    }
    @Override
    public void completed(Void result, Object[] attachment) {
        // 连接完成,通知 Client 启动协议控制逻辑
        ((ConnectionCallback) attachment[0]).
        start((AsynchronousSocketChannel) attachment[2]);
    }
    @Override
    public void failed(Throwable cause, Object[] attachment) {
        ((ConnectionCallback) attachment[0]).
        connectFailed(new Exception(attachment[1].toString(), cause));
    }
}

但是,有些 FTP 服务器要求数据传输通道必须使用与控制通道相同的 ip 地址,导致连接必须知道并保持控制通道的 ip 地址。唉!又痛到有状态方式了。

有状态类型的连接回调类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class TransferConnector extends SocketConnector {
     private InetAddress localAddress;
     private InetAddress remoteAddress;
     public TransferConnector(InetAddress remoteAddress, InetAddress localAddress) {
         this.remoteAddress = remoteAddress;
         this.localAddress = localAddress;
     }
     protected InetSocketAddress createRemoteAddress(int port) {
         return new InetSocketAddress(remoteAddress, port);
     }
     protected InetSocketAddress createLocalAddress() {
         return new InetSocketAddress(localAddress, 0);
     }
     public void connect(int port, ConnectionCallback client) throws IOException {
         if (port < 1)
             throw new IOException("Error remote server port number: " + port);
         super.connect(createRemoteAddress(port), createLocalAddress(), client);
     }
 }
使用连接回调类型建立数据传输通道
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class SimpleTransferContext
 implements TransferContext, ConnectionCallback, FileLockCallback {
   ...
    // 使用单独的传输连接回调对象再次进行连接完成通知
   connector = new TransferConnector(remoteAddress, localAddress);
    ...
    // 发起传输通道的连接操作请求
    case RETR:
            connector.connect(port, this);
            // clear for next time
            port = 0;
            ...
   // 传输通道连接完成
    @Override
    public void start(AsynchronousSocketChannel channel) {
        this.channel = channel;
    }

因为涉及到文件的处理,FTP 的数据传输通道起始控制看起来相当简单。快乐其实是很简单的东西。

继续 FTP,使用 AIO 的异步文件操作

AsynchronousFileChannel 没有 connect 方法,但是有一个类似的方法 lock。JDK7 中该方法的声明如下:

异步文件通道的 lock 方法 API
1
2
3
4
5
public abstract <A> void lock(long position,
                             long size,
                             boolean shared,
                             A attachment,
                             CompletionHandler<FileLock,? super A> handler)
无状态的文件连接回调类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class FileLocker implements CompletionHandler<FileLock, FileLockCallback> {
     public void lock(String filename, long position, long size,
        boolean shared, FileLockCallback client,
         OpenOption... options) throws IOException {
         // 使用新的 AIO 中的 Path API
         Path path = Paths.get(filename);
         // 创建异步文件通道对象
         AsynchronousFileChannel file = AsynchronousFileChannel.open(path, options);
         // 锁定要写的区域
         file.lock(position, size, shared, client, this);
     }
     @Override
     public void completed(FileLock result, FileLockCallback attachment) {
         // 文件锁(或者文件连接)完成通知传输通道环境可以工作
         attachment.start(result);
     }
     @Override
     public void failed(Throwable cause, FileLockCallback attachment) {
         attachment.lockFailed(cause);
     }
}
使用文件连接回调类型建立文件通道
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class SimpleTransferContext
implements TransferContext, ConnectionCallback, FileLockCallback {
...
 
   @Override
   public void start(FileLock fileLock) {
       this.fileLock = fileLock;
       // at here socket channel already prepared
       // 启动下载过程
       startDownload();
   }
   private Downloader download;
   private void startDownload() {
       download = new Downloader(context, channel, fileLock, size);
       channel = null;
       fileLock = null;
       download.run();
   }
   
   public void check(Reply reply) {
       if (currentCommand == null) {
           System.out.println(reply);
           return;
       }
       int code = reply.code;
       String message = reply.message;
       switch (currentCommand) {
       case SIZE...
       case RETR:
           if (code == 150) {
               // 150 Opening BINARY mode data connection for README (1765 bytes).
               int end = message.lastIndexOf(')');
               if (end != -1) {
                   int start = message.lastIndexOf('(', end - 1);
                   if (start != -1) {
                      //RETR 命令响应正确,检查本地文件,预备下载
                       lockFile(checkSize(message.substring(start + 1, end - 6)));
                       break;
                   }
               }
               ...
   }
   
   protected void lockFile(long size) {
       try {
           locker.lock(filename, 0, size, false, this,
              StandardOpenOption.CREATE,
               StandardOpenOption.READ,
               StandardOpenOption.WRITE);
       } catch (IOException e) {
           e.printStackTrace();
       }
   }

当 FTP 的 RETR 命令正确响应后,准备下载文件。首先准备好要写入的本地文件通道,锁住文件。 文件锁完成后,创建新的 Downloader 对象,开始真正的下载操作。

使用文件连接回调类型建立文件通道
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public abstract class Transfer {
   // 用于 Socket 和 File 读写操作使用的 ByteBuffer 的交换队列
    protected BlockingDeque<ByteBuffer> bufferQueue =
    new LinkedBlockingDeque<ByteBuffer>();
    protected Context context;
    public Transfer(Context context) {
        this.context = context;
    }
    public ByteBuffer getBuffer(int size) {
        return context.pool().get(size);
    }
    protected void releaseBuffer(ByteBuffer buffer) {
        context.pool().releaseBuffer(buffer);
    }
}
下载实现,读和写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class Downloader extends Transfer
   implements ReadCallback, FileWriteCallback2, Runnable {
    // 读入指定长度内容的回调对象,处理网络内容
    private SizeReader reader;
    // 写入指定长度内容的回调对象,处理文件内容
    private FileWriter2 writer;
    private AtomicBoolean writable = new AtomicBoolean(true);
    // 用于显示网络数据传输速率的工具
    private ConsoleProgress progress = new ConsoleProgress();
    public Downloader(Context context, AsynchronousSocketChannel socket,
    FileLock fileLock, long size) {
        super(context);
        reader = new SizeReader(socket, size, this);
        writer = new FileWriter2(fileLock, this);
        progress.reset(size);
    }
    @Override
    public void run() {
        reader.read();
    }
    @Override
    public void writeCompleted(ByteBuffer buffer) {
       // 一个缓冲区写入文件完毕
        releaseBuffer(buffer);
        buffer = bufferQueue.poll();
        if (buffer != null)
           // 如果网络已经读好一个缓冲区,继续写入文件
            writer.write(buffer);
        else
           // 否则清除写状态
            writable.set(true);
    }
    @Override
    public void readCompletedBytes(Integer bytes, long start, long end) {
       // 显示网络传输进度
        progress.update(bytes, start, end);
        progress.run();
    }
    @Override
    public void completedReadBuffer(ByteBuffer buffer) {
        if (writable.compareAndSet(true, false)) {
           // 从网络下载了一个缓冲区的内容,如果写文件空闲,通知写文件
            writer.write(buffer);
        } else {
           // 如果文件正在写,将当前缓冲区放入后备队列
            bufferQueue.offer(buffer);
        }
    }
    @Override
    public void writeCompleted() {
        System.out.println("file saved OK");
    }
    @Override
    public void readCompleted() {
        System.out.println("file transfer OK");
    }
    ...
  • Downloader 使用 SizeReader 读取网络数据。SizeReader 使用自主的缓冲区申请,不需要调用者传递 ByteBuffer 参数。
  • Downloader 使用 FileWriter2 写文件内容。FileWriter2 使用一次性写完外部传递的缓冲区的策略。 需要调用者传递 ByteBuffer 参数。

限于篇幅,具体实现可以在源代码中检查。

总结

线程池和 Group

前文提到到 group,但是没有解释。group 指 AsynchronousChannelGroup,用于管理异步通道资源的环境对象,封装一个处理 I/O 完成的机制。 这个组对象关联一个线程池。可以将处理 I/O 事件的任务提交到这个线程池,通过 channel 的 read,write,connect 等方法进行。线程池中的工作线程将会带着 channel 上 I/O 操作结果调用 CompletionHandler.complete方法。除了处理 I/O 事件,组关联的线程池可能会执行其他与 I/O 操作相关的任务。这个 group 对象相当于 Proactor 模式中 Dispatcher。

四种异步通道的 open 方法可以指定 group 参数,或者不指定。 每个异步通道都必须关联一个组,要么是系统默认组,要么是创建的一个特定的组。例如,不能直接从一个 socket 对象上创建一个 AsynchronousSocketChannel。 如果不使用 group 参数,java 使用一个默认的系统范围的组对象。系统默认的组对象的线程池参数可以使用两个属性进行配置:

  1. java.nio.channels.DefaultThreadPool.threadFactory 默认组对象不会将其关联的线程池中的线程进行额外的配置,因此,这些线程都是 daemon 线程。
  2. java.nio.channels.DefaultThreadPool.initialSize: 处理 I/O 事件的最大线程数量。

是否使用自定义的 group 对象,各有优劣,由你决定。

  • 使用 group,好处是你可以将文件通常与网络通道分开,避免线程干扰。缺点是:使用者通常必须负责关闭组,多数时候取决于使用的现成工厂类型。组与 ExecutorService 类似,这意味着关闭过程通常是两步关闭方法。 在多层次 Client 结构(例如 FTP 的控制通道需要衍生新的数据传输通道)中,如果要使用 group,很讨厌的一点就是 group 参数传递。没有环境编程之类的工具进行辅助的话,使用者必须考虑如何有效传递 group 参数。
  • 不使用 group,最大的好处是不用传递 group 参数。缺点是:必须注意处理非 daemon 线程的完成和退出,不小心的话,将会导致异步通道的工作丢失;同时还需要处理线程工厂和最大线程数的配置。

*PendingException 和 AsynchronousChannel

AsynchronousChannel 设计为线程安全的,即可以同时进行读写操作,全双工模式操作。不少协议使用半双工模式。读完写或者写完读。什么时候会进行并发访问 AsynchronousChannel,即使用全双工模式?主要看协议的实现。例如 FTP 的 abort 命令,要求可以控制连接可以同时进行读写。数据连接在进行文件传输的时候,控制连接等待服务器响应。实际上此时也可以进行写操作,发送一个 abort 命令,促使数据传输过程中断。这个 abort 可以从 UI 线程或者从 UI 事件产生的线程中发出。虽然如此,但是不少系统实现最多只允许一个写操作和一个读操作。如果一个读写操作没有完成,程序又发送一个读写操作命令,则导致 ReadPendingException 或者 WritePendingException。如果你的程序非要这样的话,只有一个解决办法,将读写操作的命令使用队列排队进行。通常应该不会出现这种需求,如果有的话,很有可能是设计上的缺陷。

读写超时。AsynchronousChannel 的读写操作可以指定超时参数,但是超时发生之后,传递给读写操作的 ByteBuffer 参数不应该向正常读写完成一样进行处理。通常设计如果超时发生,一般应该丢弃当前期望数据结果。

ByteBuffer 和解码

AIO 鼓励使用 DirectByteBuffer。就算应用程序代码中不使用 DirectByteBuffer,AIO 内核实现也会使用 DirectByteBuffer 来复制外部传入的 HeadByteBuffer 内容。在某些情况下完全可以利用这一特征,偷懒而不会有损失。例如:传输协议中发送普通命令,完全可以不使用 DirectByteBuffer,这些命令的提供通常以 String 类型出现,而 String 到 DirectByteBuffer 无论如何必须经过两个步骤: String–byte[]–DirectByteBuffer. 第二步完全可以由 AIO 内核进行。

如果需要从 DirectByteBuffer 解码到 String,有选择余地:

  • 使用 Decoder 和 CharBuffer:DirectByteBuffer–CharBuffer–(char[])String。
  • 使用 String 和 byte[]:DirectByteBuffer–byte[]–(char[])String

可以看出,这种情况数组复制的工作量不小。如果没有使用 Javolution 方式的栈内存分配和对象工厂,其实没有什么区别。

 

来源: https://www.ibm.com/developerworks/cn/java/j-lo-nio2/index.html




快乐成长 每天进步一点点      京ICP备18032580号-1