Java TCP/IP Socket 编程
第三章:发送和接收数据
TCP/IP协议的唯一约束是:
- 信息必须在块(chunks)中发送和接收
- 而块的长度必须是8位的倍数,因此,我们可以认为在TCP/IP协议中传输的信息是字节序列。
可以进一步把传输的信息看作数字序列或数组,每个数字的取值范围是0到255。即8位编码
的00000000 ~ 11111111
。
一个程序使用套接字与其他程序交换信息,通常符合下面两种情况之一:
- 设计和编写了套接字的客户端和服务器端,这样能够随心所欲地定义自己的应用程序协议;
- 要么是实现了一个已经存在的协议,或许是一个协议标准。
信息编码
简单数据类型如int
,long
,char
,String
等通过套接字发送和接收。传输信息时
可以:
-
通过套接字将字节信息写入一个已经关联Socket的
OutputStream
实例中。 -
将其封装进一个
DatagramPacket
实例中由DatagramSocket
发送。
这些操作所能处理的唯一数据类型是字节和字节数组。Java的内置工具能把int
或
String
等其他数据类型显式转换成字节数组,如:String.getBytes()
。
基本整型
TCP和UDP套接字使我们能发送和接收范围在0 ~ 255
之间的字节序列数组。Java程序中:
-
int
数据类型由32位表示,因此可以使用4个字节来传输任意的int
型变量或常量; -
short
数据类型由16位表示,传输short
类型的数据只需要两个字节 -
同理传输64位的
long
类型数据则需要8个字节。
例:考虑如何对一个包含了4个整数的序列进行编码:一个byte
型,一个short
型,
一个int
型,以及一个long
型,按照这个顺序从发送者传输到接收者。我们总共需要
15个字节:
- 第一个字节存放byte型数据,
- 接下来两个字节存放short型数据,
- 再后面4个字节存放int型数据,
- 最后8个字节存放long型数据,
如下所示:
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ byte| short |<---- int ---->|<--- long --->|
大端与小端
对于需要超过一个字节来表示的数据类型,必须确定字节的发送顺序。显然有两种选择:
- 从整数的右边开始,由低位到高位地发送,即little-endian顺序;
- 从左边开始,由高位到低位发送,即big-endian顺序。
注意, 同一个字节中,位的顺序在实现时是以标准的方式处理的。
例:考虑长整型数123456787654321L
,以64位十六进制形式表示为0x0000704885F926B1
big-endian顺序,十进制数值序列:
00 | 00 | 70 | 48 | 85 | F9 | 26 | B1 |
---|---|---|---|---|---|---|---|
0 | 0 | 112 | 72 | 133 | 249 | 38 | 177 |
little-endian顺序,十进制数组序列:
B1 | 26 | F9 | 85 | 48 | 70 | 00 | 00 |
---|---|---|---|---|---|---|---|
177 | 38 | 249 | 133 | 72 | 112 | 0 | 0 |
Java中的ByteOrder
类中有两个常量:BIG_ENDIAN
和LITTLE_ENDIAN
来表示大端与小端。
数值的符号位
传输的数值是有符号的(signed)还是无符号的(unsigned)。Java中的四种基本整型都是 有符号的,它们的值以二进制补码(two's-complement)的方式存储。
-
有符号数值用二进制补码的形式表示负整数。在处理有\(k\)位的有符号数时,
\(-n\)(其中\( n \in [1, 2^{k-1}]\)),则补码的二进制值就为\(2^k-n\)。
- 非负整数\(p\)(其中\(p \in [0, 2^{k-1}-1]\)),只是简单地用\(k\)位二进制数来表示\(p\)的值。
- 因此,对于给定的\(k\)位,我们可以通过二进制补码来表示\(-2^{k-1}\)到\(2^{k-1}-1\)范围 的值。
- 注意,最高位(msb)标识了该数是正数(msb = 0)还是负数(msb = 1)。
-
无符号(unsigned)编码,\(k\)位可以直接表示\(0\)到\(2^k-1\)之间的数值。例如,32位
数值
0xffffffff
(所有位全为1),将其解析为有符号数时,二进制补码整数表示\(-1\); 将其解析为无符号数时,它表示\(4294967295\)。
由于Java并不支持无符号整型,如果要在Java中编码和解码无符号数,则需要额外的工作。 在此假设我们处理的都是有符号整数数据。
移位和屏蔽符号位的例子
使用「位操作(bit-diddling)」(移位和屏蔽)来显式编码进行介绍。示例程序
BruteForceCoding.java
中
-
发送时用
encodeIntBigEndian()
对基本类型进行编码。 -
接收端
decodeIntBigEndian()
进行解码到一个Java的long
型整数。
// define how many byte every type use public static final int BYTE_SIZE = Byte.SIZE / Byte.SIZE; public static final int SHORT_SIZE = Short.SIZE / Byte.SIZE; public static final int INT_SIZE = Integer.SIZE / Byte.SIZE; public static final int LONG_SIZE = Long.SIZE / Byte.SIZE; public static void main(String[] args) { // prepare data for test byte byteVal = 101; short shortVal = 10001; int intVal = 100000001; long longVal = 1000000000001L; System.out.printf(" In Logic: %02X %04X %08X %016X \n", byteVal, shortVal, intVal, longVal); // Encode the fields in the target byte array // stream of byte code byte[] byteStream = new byte[BYTE_SIZE + SHORT_SIZE + INT_SIZE + LONG_SIZE]; // encode the datas int offset = 0; offset = encodeIntBigEndian(byteStream, offset, byteVal, BYTE_SIZE); offset = encodeIntBigEndian(byteStream, offset, shortVal, SHORT_SIZE); offset = encodeIntBigEndian(byteStream, offset, intVal, INT_SIZE); offset = encodeIntBigEndian(byteStream, offset, longVal, LONG_SIZE); // print the stream in hex System.out.printf("Encoded message: %s\n", transByteStreamToHexString(byteStream)); // Decode several fields long value = 0L; offset = 0; value = decodeIntBigEndian(byteStream, offset, BYTE_SIZE); System.out.println("Decoded byte = " + value); offset += BYTE_SIZE; value = decodeIntBigEndian(byteStream, offset, SHORT_SIZE); System.out.println("Decoded short = " + value); offset += SHORT_SIZE; value = decodeIntBigEndian(byteStream, offset, INT_SIZE); System.out.println("Decoded int = " + value); offset += INT_SIZE; value = decodeIntBigEndian(byteStream, offset, LONG_SIZE); System.out.println("Decoded long = " + value); offset += LONG_SIZE; // Demonstrate dangers of conversion offset = 4; value = decodeIntBigEndian(byteStream, offset, BYTE_SIZE); System.out.printf("Decoded value (offset %d, size %d) = %d\n", offset, BYTE_SIZE, value); byte bVal = (byte) decodeIntBigEndian(byteStream, offset, BYTE_SIZE); System.out.printf("Same value as byte = %d\n", bVal); } // 把长度为size个字节的变量long存入数组dst,dst前offset个字节已经存放了 // 其他内容了。 // @param byteStream 存放消息的数组 // @param offset 存放的起始位置 // @param data 要存放的值 // @param dataSize 存放的值占几个字节 // @return 存放后数组被使用了多少个字节(新的offset) public static int encodeIntBigEndian(byte[] byteStream, int offset, long data, int dataSize) { // Warning: Untested preconditions (e.g., 0 <= dataSize <= 8) for (int i = 0; i < dataSize; i++) { // 先通过位移把要取的那8位移到最低, // 通过强制转换把最低的8位转为一个byte。 // // 赋值语句的右边,首先将数值向右移动, // 以使我们需要的字节处于该数值的低8位中。然后,将移位后的数转换成byte型, // 并存入字节数组的适当位置。在转换过程中,除了低8位以外,其他位都将丢弃。 // 这个过程将根据给定数值所占字节数迭代进行。 // 该方法还将返回存入数值后字节数组中新的偏移位置, // 因此我们不必做额外的工作来跟踪偏移量。 byteStream[offset++] = (byte) (data >> ((dataSize - i - 1) * Byte.SIZE)); } return offset; } // 从数组中取出数据 // @param byteStream 存放消息的数组 // @param offset 存放的起始位置 // @param dataSize 存放的值占几个字节 // @return 读出的值 public static long decodeIntBigEndian(byte[] byteStream, int offset,// int dataSize) { // Warning: Untested preconditions (e.g., 0 <= dataSize <= 8) long data = 0; // 根据给定数组的字节大小进行迭代,通过每次迭代的左移操作, // 将所取得字节的值累积到一个long型整数中。 for (int i = 0; i < dataSize; i++) { // 先把当前的内容左移一个字节 // 空出的字节中填入流中指定位置的字节 data = (data << Byte.SIZE) | ((long) byteStream[offset + i] & 0xFF); } return data; } // 用十进制显示数组中的每个字节 // @param byteStream 要显示的数组 // @return 显示的字符串 public static String transByteStreamToHexString(byte[] byteStream) { StringBuilder rtn = new StringBuilder(); for (byte b : byteStream) { // 用十六进制字符串显示一个字节的内容 // 位与`0xFF`只取最后两个字节 rtn.append(String.format("%02X", b & 0xFF)).append(" "); } return rtn.toString(); }
在字节数组偏移量为4的位置,该字节的十进制值是245,然而,当将其作为一个有符号字节
读取时,其值则为-11。如果我们将返回值直接存入一个long型整数,它只是简单地变成
这个long型整数的最后一个字节,值为245。如果将返回值放入一个字节型整数,其值则为
-11。到底哪个值正确取决于你的应用程序。如果你从N个字节解码后希望得到一个有符号的
数值,就必须将解码结果(长的结果)存入一个刚好占用N个字节的基本整型中。如果你
希望得到一个无符号的数组,就必须将解码结果存入更长的基本整型中,该整型至少要占用
N+1
个字节。
注意,在encodeIntBigEndian()
和decodeIntBigEndian()
方法的开始部分,我们可能
需要做一些前提条件检测,如\(0 \le size \le 8\)和\(dst \neq null\)等。你能举出需要做的其他前期检测吗?
以上程序中数组内的数据如下:
byte | short | int | long | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
HEX | 65 | 27 | 11 | 05 | F5 | E1 | 01 | 00 | 00 | 00 | E8 | D4 | A5 | 10 | 01 |
DEC | 101 | 39 | 17 | 5 | 245 | 225 | 1 | 0 | 0 | 0 | 232 | 212 | 165 | 16 | 1 |
使用IO流
构建本例中的消息的一个相对简单的方法是使用DataOutputStream
类和
ByteArrayOutputStream
类。
-
DataOutputStream
类允许你将基本数据类型,如上述整型,写入一个流中:它提供了writeInt()
、wirteByte()
等方法按照big-endian顺序把数据二进制补码写到流中。 -
ByteArrayOutputStream()
类获取写到流中的字节序列,并将其转换成一个字节数组。
下面的代码和BruteForceEncoding.java
的效果一样:
ByteArrayOutputStream buf = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(buf); try { out.writeByte(byteVal); out.writeShort(shortVal); out.writeInt(intVal); out.writeLong(longVal); out.flush(); } catch (IOException e) { e.printStackTrace(); } System.out.printf("Encoded message: %s\n", transByteStreamToHexString(buf.toByteArray()));
类似在接收时可以用DataInputStream
类和ByteArrayInputStream
类。
字符串和文本
Java使用Unicode编码字符集表示char
型和String
型值。
String
的getBytes()
方法返回的数组根据平台默认字符集(default charset)。很多
平台的默认字符集都是UTF-8
,然而一些地区情况有所不同。保证特定编码要将字符集
名字作为参数传递给getBytes()
方法。
例如:"Test!".getBytes()
:
+-------+-------+-------+-------+-------+ | 84| 101| 115| 116| 33| +-------+-------+-------+-------+-------+
"Test!".getBytes("UTF-16BE")
,每个值被编码成了两个字节的序列,高位在前;
+---+---+---+---+---+---+---+---+---+---+ | 0| 84| 0|101| 0|115| 0|116| 0| 33| +---+---+---+---+---+---+---+---+---+---+
调用"Test!".getBytes("IBM037")
,返回结果将是:
+-------+-------+-------+-------+-------+ | 227| 133| 162| 163| 90| +-------+-------+-------+-------+-------+
上面的例子说明,发送者和接收者必须在文本字符串的表示方式上达成共识。 最简单的方法就是定义一个标准字符集。
位操作:布尔值编码
位图(Bitmaps)是对布尔信息进行编码的一种非常紧凑的方式,通常用在协议中。 掩码(mask)是一个的整数值,其中有一位或多位被设为1,其他各位被清空(即,设为0)。 在这里我们处理的是int大小的位图和掩码(32位),但这些方法对其他类型的整数也同样适用。
我们将int中的各位从0到31进行编号,其中0代表最低位。一般来说,如果一个int值在 第i位值为1,其他位都为0的话,该int型整数的值就是\(2^i\)。因此编号为5的位表示32, 编号为12的位表示4096,等等。这里有一些掩码声明的例子:
final int BIT5 = (1<<5); final int BIT7 = 0x80; final int BITS2AND3 = 12; // 8+4 int bitmap = 1234567;
要设置int变量中的特定一位,需要将该int值与特定位对应的掩码进行按位或
(bitwise-OR)操作(|
),通过掩码指定位置1
或置0
:
bitmap |= BIT5; // bit 5 is now one
要清空特定一位,则将该整数与特定所对应的掩码的按位补码(特定位为0,其他位为1)
进行按位与(bitwise-AND)操作。Java中的按位与操作符是&
,按位补码操作符是~
:
bitmap &= ~BIT7; // bit 7 is now zero
也可以通过将相应的所有掩码进行按位或操作,一次设置和清空多位:
bitmap &= ~(BITS2AND3|BIT5); // clear bits 2, 3 and 5
要测试一个整数的特定位是否已经被设置,可以将该整数与特定位对应的掩码进行按位与
&
,并将操作结果与0比较:
boolean bit6Set = (bitmap & (1<<6)) != 0;
组合输入输出流
Java中与流相关的类可以组合起来从而提供强大的功能。例如:
-
将一个Socket实例的
OutputStream
包装在一个BufferedOutputStream
实例中, 这样可以先将字节暂时缓存在一起,然后再一次全部发送到底层的通信信道中以提高 程序的性能。 -
再将这个
BufferedOutputStream
实例包裹在一个DataOutputStream
实例中,以实现 发送基本数据类型的功能。以下是实现这种组合的代码:
Socket socket = new Socket(server, port); DataOutputStream out = new DataOutputStream( new BufferedOutputStream(socket.getOutputStream()));
Java基本输入输出类:
输入输出 | 类型 |
---|---|
Buffered[Input/Output]Stream | 通过缓存性能优化 |
Checked[Input/Output]Stream | 维护数据检查 |
Cipher[Input/Output]Stream | 加密/解密 |
Data[Input/Output]Stream | 数据处理 |
Digest[Input/Output]Stream | 维护数据摘要 |
GZIP[Input/Output]Stream | 压缩/解压缩 |
Object[Input/Output]Stream | 数据处理 |
PushbackInputStream | 允许一个字节或是字节是「末读的」 |
PrintOutputStream | 输出数据类型的字符串表示法 |
Zip[Input/Output]Stream | 以zip格式 |
成帧与解析
数据转换成在线路上传输的格式只完成了一半工作,应用程序协议必须指定消息的接收者 如何确定何时消息已完整接收。成帧(Framing)技术则解决了接收端如何定位消息的 首尾位置的问题。
如果在一个DatagramPacket
中发送数据就有一个确定的长度,接收者能够准确地知道消息
的结束位置。
如果通过TCP套接字来发送消息,情况将变得更复杂,因为TCP协议中没有消息边界的概念。
如果一个消息中的所有字段都有固定的长度,同时每个消息又是由固定数量的字段组成
的话,消息的长度就能够确定,接收者就可以简单地将消息长度对应的字节数读到一个
byte[]
缓存区中。在TCPEchoClient.java
示例程序中我们就是用的这个方法。
但是如果消息的长度是可变的(例如消息中包含了一些变长的文本字符串),就无法知道 需要读取多少字节。
如果接收者试图从套接字中读取比消息本身更多的字节,将可能发生以下两种情况之一:
- 如果信道中没有其他消息,接收者将阻塞等待,同时无法处理接收到的消息; 如果发送者也在等待接收端的响应信息,则会形成死锁(deadlock);
- 如果信道中还有其他消息,则接收者会将后面消息的一部分甚至全部读到第一条消息 中去,这将产生一些协议错误。因此,在使用TCP套接字时,成帧就是一个非常重要的 考虑因素。
一些相同的考虑也适用于查找消息中每个字段的边界: 接收者需要知道每个字段的结束位置和下一个字段的开始位置。因此消息成帧技术几乎 都可以应用到字段上。
然而,最简单并使代码最简洁的方法是将这两个问题分开处理: 首先定位消息的结束位置,然后将消息作为一个整体进行解析。 在此我们专注于将整个消息作为一帧进行处理。
主要有两个技术使接收者能够准确地找到消息的结束位置:
- 基于定界符(Delimiter-based):消息的结束由一个唯一的标记(unique marker) 指出,即发送者在传输完数据后显式添加的一个特殊字节序列。这个特殊标记不能在 传输的数据中出现。
- 显式长度(Explicit length):在变长字段或消息前附加一个固定大小的字段, 用来指示该字段或消息中包含了多少字节。
基于定界符的方法的一个特殊情况是,可以用在TCP连接上传输的最后一个消息上:
在发送完这个消息后,发送者就简单地关闭(使用shutdownOutput()
或close()
方法)
发送端的TCP连接。接收者读取完这条消息的最后一个字节后,将接收到一个流结束标记
(即read()
方法返回-1
),该标记指示出已经读取到达了消息的末尾。
基于定界符的方法的缺点是消息本身不能包含有定界字符。幸运的是填充(stuffing)技术 能够对消息中出现的定界符进行修改,从而是接收者不将其识别为定界符。在接收者扫描 定界符时,还能识别出修改过的数据,并在输出消息中对其进行还原,从而使其与原始消息 一致。这个技术的缺点是发送者和接收者双方都必须扫描消息。
基于长度的方法更简单一些,不过要使用这种方法必须知道消息长度的上限。发送者先要 确定消息的长度,将长度信息存入一个整数,作为消息的前缀。消息的长度上限定义了用来 编码消息长度所需要的字节数:如果消息的长度小于256字节,则需要1个字节;如果消息的 长度小于65536字节,则需要2个字节,等等。
基于定界符的例子
定义的Framer.java
接口。它有两个方法:
-
frameMsg()
方法用来添加成帧信息并将指定消息输出到指定流 -
nextMsg()
方法则扫描指定的流,从中抽取出下一条消息。
public interface Framer { void frameMsg(byte[] message, OutputStream out) throws IOException; byte[] nextMsg() throws IOException; }
DelimFramer.java
类实现了基于定界符的成帧方法,其定界符为「换行」符("\n"
,
字节值为10
)。
-
frameMethod()
方法并没有实现填充,当成帧的字节序列中包含有定界符时,它只是 简单地抛出异常。(扩展该方法以实现填充功能将作为练习留给读者) -
nextMsg()
方法扫描流,直到读取到了定界符,并返回定界符前面的所有字符,如果 流为空则返回null
。如果累积了一个消息的不少字符,但直到流结束也没有找到 定界符程序将抛出一个异常来指示成帧错误。
public class DelimFramer implements Framer { private InputStream in; // data source private static final byte DELIMITER = '\n'; // message delimiter // 获取消息的输入流作为参数传递给该函数。 public DelimFramer(InputStream in) { this.in = in; } // 输出一帧信息 public void frameMsg(byte[] message, OutputStream out) throws IOException { // ensure that the message does not contain the delimiter // 检查消息中是否包含了定界符,如果包含,则抛出一个异常。 for (byte b : message) { if (b == DELIMITER) { throw new IOException("Message contains delimiter"); } } // 将成帧的消息输出到流中。 out.write(message); out.write(DELIMITER); out.flush(); } // 从输入中提取一帧消息 public byte[] nextMsg() throws IOException { ByteArrayOutputStream messageBuffer = new ByteArrayOutputStream(); int nextByte; // fetch bytes until find delimiter // 读取流中的每个字节,直到遇到定界符为止:第35行 while ((nextByte = in.read()) != DELIMITER) { if (nextByte == -1) { // end of stream? // 如果把输入流读完了都没有读到分隔符 if (messageBuffer.size() == 0) { // if no byte read // 整个消息就是空的,直接反回空 return null; } else { // if bytes followed by end of stream: framing error // 输入流里有数据但没有分隔符,那么认为是数据丢失,抛异常 throw new EOFException("Non-empty message without delimiter"); } } // 将无定界符的字节写入消息缓存区 messageBuffer.write(nextByte); // write byte to buffer } // 将消息缓存区中的内容以字节数组的形式返回 return messageBuffer.toByteArray(); } }
以上定界符帧有一个限制,即它不支持多字节定界符。如何对其进行修改以支持多字节 定界符将作为练习留给我们的读者。
基于长度的例子
LengthFramer.java
类实现了基于长度的成帧方法,适用于长度小于65535
(\(2^{16}-1\))字节的消息。
- 发送者首先给出指定消息的长度,并将长度信息以big-endian顺序存入两个字节的整数 中。
- 再将这两个字节放在完整的消息内容前,连同消息一起写入输出流。
在接收端:
-
使用
DataInputStream
以读取整型的长度信息;readFully()
方法将阻塞等待, 直到给定的数组完全填满。值得注意的是,使用这种成帧方法,发送者不需要检查要 成帧的消息内容,而只需要检查消息的长度是否超出了限制。
public class LengthFramer implements Framer { public static final int MAXMESSAGELENGTH = 65535; public static final int BYTEMASK = 0xff; public static final int SHORTMASK = 0xffff; public static final int BYTESHIFT = 8; private DataInputStream in; // wrapper for data I/O public LengthFramer(InputStream in) throws IOException { this.in = new DataInputStream(in); } // 写入一帧的数据 public void frameMsg(byte[] message, OutputStream out) throws IOException { // 由于用两个字节,因此消息的长度不能超过`65535`。 // 注意该值太大而不能存入一个short型整数中,因此每次只写一个字节 if (message.length > MAXMESSAGELENGTH) { throw new IOException("message too long"); } // write length prefix // 添加长度信息(无符号short型整数)前缀,输出消息的字节数。 out.write((message.length >> BYTESHIFT) & BYTEMASK); out.write(message.length & BYTEMASK); // 输出信息 out.write(message); out.flush(); } // 读取一帧的数据 public byte[] nextMsg() throws IOException { int length; try { // 取两个字节,将它们作为big-endian整数进行解释,并以int型返回 length = in.readUnsignedShort(); // read 2 bytes } catch (EOFException e) { // no (or 1 byte) message return null; } // 0 <= length <= 65535 byte[] msg = new byte[length]; // 阻塞等待,直到接收到足够的字节来填满指定的数组 in.readFully(msg); // if exception, it's a framing error. // 以字节的形式返回消息 return msg; } }
Java特定编码
当使用套接字时需要同时创建通信信道两端的程序,这种情况下你也拥有了协议的完全 控制权。这时你可以选择实现特定的协议进行通信。在以下的情况下:
- 通信双方都使用Java实现
- 你拥有对协议的完全控制权
那么就可以使用Java的内置工具如Serializable
接口或远程方法调用(Remote Method
Invocation,RMI)工具。RMI能够调用不同Java虚拟机上的方法,并隐藏了所有繁琐的
参数编码解码细节。序列化(Serialization)处理了将实际的Java对象转换成字节序列的
工作,因此你可以在不同虚拟机之间传递Java对象实例。
但是在实际中,由于种种原因,它们并不总是最好的解决方案:
- 首先,由于它们都是很笼统的工具,因而在通信开销上不能做到最高效。例如,一个对象的 序列化形式,其包含的信息在Java虚拟机(JVM)环境以外是毫无意义的。
-
其次,
Serializable
和Externalizable
接口不能用于已经定义了不同传输格式的 情况:如一个标准的协议。 - 最后,用户自定义的类必须自己实现它们的序列化接口,而这项工作很容易出错。
再强调一次,在某些情况下,这些Java的内置工具的确很有用,但是有些时候, 「实现你自己的方法」可能更简单、容易或更有效。
构建和解析协议消息
本章结束时我们再看一个简单的例子,对在实现别人定义的协议时可能用到的技术进行了介绍。这个例子程序是一个简单的"投票"协议,如图3.2所示。这里,一个客户端向服务器发送了一个请求消息,消息中包含了一个候选人ID,范围是0至1000。
- Vote Reques:投票请求
- Candidate:候选人
- Vote Count:选票总数
程序支持两种请求:
- 一种是查询(inquiry),即向服务器询问给定候选人当前获得的投票总数。服务器 发回一个响应消息,包含了原来的候选人ID和该候选人当前(查询请求收到时)获得的 选票总数。
- 另一种是投票(voting)请求,即向指定候选人投一票。服务器对这种请求也发回 响应消息,包含了候选人ID和其获得的选票数(包括了刚投的一票)。
用一个类来表示客户端和服务器端的两种消息VoteMsg.java
:
public class VoteMsg { // 其值为true时表示该消息是查询请求(为false时表示该消息是投票信息); private boolean isInquiry; // true if inquiry; false if vote // 指示该消息是响应(由服务器发送)还是请求; private boolean isResponse; // true if response from server private int candidateID; // 候选人的ID in [0,1000] // 查询的候选人获得的总选票数。 // voteCount在响应消息中只能是一个非零值(isResponse为true)。 // voteCount 不能为负数。 private long voteCount; // nonzero only in response public static final int MAX_CANDIDATE_ID = 1000; public VoteMsg(boolean isResponse, boolean isInquiry, int candidateID, long voteCount) throws IllegalArgumentException { // check invariants if (voteCount != 0 && !isResponse) { throw new IllegalArgumentException("Request vote count must be zero"); } if (candidateID < 0 || candidateID > MAX_CANDIDATE_ID) { throw new IllegalArgumentException("Bad Candidate ID: " + candidateID); } if (voteCount < 0) { throw new IllegalArgumentException("Total must be >= zero"); } this.candidateID = candidateID; this.isResponse = isResponse; this.isInquiry = isInquiry; this.voteCount = voteCount; } public void setInquiry(boolean isInquiry) { this.isInquiry = isInquiry; } public void setResponse(boolean isResponse) { this.isResponse = isResponse; } public boolean isInquiry() { return isInquiry; } public boolean isResponse() { return isResponse; } public void setCandidateID(int candidateID) throws IllegalArgumentException { if (candidateID < 0 || candidateID > MAX_CANDIDATE_ID) { throw new IllegalArgumentException("Bad Candidate ID: " + candidateID); } this.candidateID = candidateID; } public int getCandidateID() { return candidateID; } public void setVoteCount(long count) { if ((count != 0 && !isResponse) || count < 0) { throw new IllegalArgumentException("Bad vote count"); } voteCount = count; } public long getVoteCount() { return voteCount; } public String toString() { String res = (isInquiry ? "inquiry" : "vote") + " for candidate " + candidateID; if (isResponse) { res = "response to " + res + " who now has " + voteCount + " vote(s)"; } return res; } }
现在我们有了一个用Java表示的投票消息,还需要根据一定的协议来对其进行编码和解码。
VoteMsgCoder.java
接口提供了对投票消息进行序列化和反序列化的方法。
public interface VoteMsgCoder { // 序列化 byte[] toWire(VoteMsg msg) throws IOException; // 反序列化 VoteMsg fromWire(byte[] input) throws IOException; }
基于文本的表示方法
用文本方式编码的版本使用US-ASCII字符集。消息的开头是一个「魔术字符串」Voting
。
具体各个字段:
-
'v'
表示投票消息 -
'i'
表示查询消息。 -
'R'
表示消息的状态,即是否为服务器的响应, -
状态标记后面是候选人
ID
,其后跟的是选票总数,它们都编码成十进制字符串。
VoteMsgTextCoder
类提供了一种基于文本的VoteMsg编码方法。
public class VoteMsgTextCoder implements VoteMsgCoder { // Wire Format "VOTEPROTO" <"v" | "i"> [<RESPFLAG>] <CANDIDATE> [<VOTECNT>] // Charset is fixed by the wire format. // Manifest constants for encoding public static final String MAGIC = "Voting"; public static final String VOTESTR = "v"; public static final String INQSTR = "i"; public static final String RESPONSESTR = "R"; public static final String CHARSETNAME = "US-ASCII"; public static final String DELIMSTR = " "; public static final int MAX_WIRE_LENGTH = 2000; // 简单地创建一个字符串,该字符串中包含了消息的所有字段,并由空白符隔开 public byte[] toWire(VoteMsg msg) throws IOException { String msgString = MAGIC + DELIMSTR + (msg.isInquiry() ? INQSTR : VOTESTR) + DELIMSTR + (msg.isResponse() ? RESPONSESTR + DELIMSTR : "") + Integer.toString(msg.getCandidateID()) + DELIMSTR + Long.toString(msg.getVoteCount()); byte data[] = msgString.getBytes(CHARSETNAME); return data; } public VoteMsg fromWire(byte[] message) throws IOException { // 就使用Scanner实例,根据空白符一个一个地获取字段。 // 消息的字段数与其是请求消息(由客户端发送)还是响应消息(由服务器发送) // 有关。如果输入流提前结束或格式错误将抛出一个异常。 ByteArrayInputStream msgStream = new ByteArrayInputStream(message); Scanner s = new Scanner(new InputStreamReader(msgStream, CHARSETNAME)); boolean isInquiry; boolean isResponse; int candidateID; long voteCount; String token; try { token = s.next(); // 方法首先检查"魔术"字符串, // 如果在消息最前面没有魔术字符串,则抛出一个异常。 if (!token.equals(MAGIC)) { throw new IOException("Bad magic string: " + token); } token = s.next(); if (token.equals(VOTESTR)) { isInquiry = false; } else if (!token.equals(INQSTR)) { throw new IOException("Bad vote/inq indicator: " + token); } else { isInquiry = true; } token = s.next(); if (token.equals(RESPONSESTR)) { isResponse = true; token = s.next(); } else { isResponse = false; } // Current token is candidateID // Note: isResponse now valid candidateID = Integer.parseInt(token); if (isResponse) { token = s.next(); voteCount = Long.parseLong(token); } else { voteCount = 0; } } catch (IOException ioe) { throw new IOException("Parse error..."); } return new VoteMsg(isResponse, isInquiry, candidateID, voteCount); } }
二进制表示方法
下面我们将展示二进制格式对投票协议消息进行编码的方法。
与基于文本的格式相反,二进制格式使用固定大小的消息:
-
每条消息由一个特殊字节开始,该字节的最高六位为一个「魔术值」
010101
。 -
该字节的最低两位对两个布尔值进行了编码。消息的第二个字节总是
0
, -
第三、第四个字节包含了
candidateID
值。 - 只有响应消息的最后8个字节才包含了选票总数信息。
/* Wire Format * 1 1 1 1 1 1 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 * +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ * | Magic |Flags| ZERO | * +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ * | Candidate ID | * +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ * | | * | Vote Count (only in response) | * | | * | | * +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ */ public class VoteMsgBinCoder implements VoteMsgCoder { // manifest constants for encoding public static final int MIN_WIRE_LENGTH = 4; public static final int MAX_WIRE_LENGTH = 16; public static final int MAGIC = 0x5400; public static final int MAGIC_MASK = 0xfc00; public static final int MAGIC_SHIFT = 8; public static final int RESPONSE_FLAG = 0x0200; public static final int INQUIRE_FLAG = 0x0100; public byte[] toWire(VoteMsg msg) throws IOException { // 创建了一个ByteArrayOutputStream // 并将其包裹在一个DataOutputStream中来接收结果。 ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(byteStream); // converts ints // 使用按位或操作,使用1位对每个布尔值进行编码。 short magicAndFlags = MAGIC; if (msg.isInquiry()) { magicAndFlags |= INQUIRE_FLAG; } if (msg.isResponse()) { magicAndFlags |= RESPONSE_FLAG; } out.writeShort(magicAndFlags); // We know the candidate ID will fit in a short: it's > 0 && < 1000 out.writeShort((short) msg.getCandidateID()); if (msg.isResponse()) { out.writeLong(msg.getVoteCount()); } out.flush(); byte[] data = byteStream.toByteArray(); return data; } public VoteMsg fromWire(byte[] input) throws IOException { // sanity checks if (input.length < MIN_WIRE_LENGTH) { throw new IOException("Runt message"); } ByteArrayInputStream bs = new ByteArrayInputStream(input); DataInputStream in = new DataInputStream(bs); int magic = in.readShort(); if ((magic & MAGIC_MASK) != MAGIC) { throw new IOException("Bad Magic #: " + ((magic & MAGIC_MASK) >> MAGIC_SHIFT)); } boolean resp = ((magic & RESPONSE_FLAG) != 0); boolean inq = ((magic & INQUIRE_FLAG) != 0); int candidateID = in.readShort(); if (candidateID < 0 || candidateID > 1000) { throw new IOException("Bad candidate ID: " + candidateID); } long count = 0; if (resp) { count = in.readLong(); if (count < 0) { throw new IOException("Bad vote count: " + count); } } // Ignore any extra bytes return new VoteMsg(resp, inq, candidateID, count); } }
发送和接收
投票服务要有以下功能:
- 维护一个候选人ID与其获得选票数的映射,
- 记录提交的投票,
- 根据其获得的选票数,对查询指定的候选人和为其投票的消息做出响应。
首先,我们实现一个投票服务器所用到的服务。当接收到投票消息时,投票服务器将调用
VoteService
类的handleRequest()
方法对请求进行处理。
public class VoteService { // Map of candidates to number of votes // 创建候选人ID与选票数量的映射 private Map<Integer, Long> results = new HashMap<Integer, Long>(); public VoteMsg handleRequest(VoteMsg msg) { // 如果投票消息已经是一个响应信息,则直接发回而不对其进行处理和修改。 // 否则,对其响应消息标志进行设置。 if (msg.isResponse()) { // If response, just send it back return msg; } msg.setResponse(true); // Make message a response // 根据候选人ID从映射中获取其获得的选票总数 // Get candidate ID and vote count int candidate = msg.getCandidateID(); Long count = results.get(candidate); if (count == null) { count = 0L; // Candidate does not exist } // 有新的投票更新选票总数 if (!msg.isInquiry()) { results.put(candidate, ++count); // If vote, increment count } msg.setVoteCount(count); return msg; } }
TCP版本Client
下面我们将展示如何实现一个TCP投票客户端,该客户端通过TCP套接字连接到投票服务器, 在一次投票后发送一个查询请求,并接收查询和投票结果。
public class VoteClientTCP { public static final int CANDIDATEID = 888; public static void main(String args[]) throws Exception { if (args.length != 2) { // Test for correct # of args throw new IllegalArgumentException("Parameter(s): <Server> <Port>"); } String destAddr = args[0]; // Destination address int destPort = Integer.parseInt(args[1]); // Destination port // 创建套接字,获取输出流 Socket sock = new Socket(destAddr, destPort); OutputStream out = sock.getOutputStream(); // 创建二进制编码器和基于长度的成帧器 // Change Bin to Text for a different framing strategy VoteMsgCoder coder = new VoteMsgBinCoder(); // Change Length to Delim for a different encoding strategy Framer framer = new LengthFramer(sock.getInputStream()); // 创建和发送消息: // Create an inquiry request (2nd arg = true) VoteMsg msg = new VoteMsg(false, true, CANDIDATEID, 0); byte[] encodedMsg = coder.toWire(msg); // Send request System.out.println("Sending Inquiry (" + encodedMsg.length + " bytes): "); System.out.println(msg); framer.frameMsg(encodedMsg, out); // Now send a vote msg.setInquiry(false); encodedMsg = coder.toWire(msg); System.out.println("Sending Vote (" + encodedMsg.length + " bytes): "); framer.frameMsg(encodedMsg, out); // 获取和解析响应 // Receive inquiry response encodedMsg = framer.nextMsg(); msg = coder.fromWire(encodedMsg); System.out.println("Received Response (" + encodedMsg.length + " bytes): "); System.out.println(msg); // Receive vote response msg = coder.fromWire(framer.nextMsg()); System.out.println("Received Response (" + encodedMsg.length + " bytes): "); System.out.println(msg); sock.close(); } }
由于TCP协议是一个基于流的服务,我们需要提供字节的帧。在此,我们使用
LengthFramer
类,它为每条消息添加一个长度前缀。注意,我们只需要改变具体的类,
就能方便地转换成基于定界符的成帧方法和基于文本的编码方式,这里将VoteMsgCoder
和Framer
换成VoteMsgTextCoder
和DelimFramer
即可。
TCP版本Server
下面我们示范TCP版本的投票服务器。该服务器反复地接收新的客户端连接,
并使用VoteService
类为客户端的投票消息作出响应。
public class VoteServerTCP { public static void main(String args[]) throws Exception { if (args.length != 1) { // Test for correct # of args throw new IllegalArgumentException("Parameter(s): <Port>"); } int port = Integer.parseInt(args[0]); // Receiving Port ServerSocket servSock = new ServerSocket(port); // 为服务器端建立编码器和投票服务 // Change Bin to Text on both client and server // for different encoding VoteMsgCoder coder = new VoteMsgBinCoder(); VoteService service = new VoteService(); // 循环,接收和处理客户端连接 while (true) { Socket clntSock = servSock.accept(); System.out.println("Handling client at " + clntSock.getRemoteSocketAddress()); // Change Length to Delim for a different framing strategy // 为客户端创建成帧器:第28行 Framer framer = new LengthFramer(clntSock.getInputStream()); try { // 从客户端获取消息并对其解码 byte[] req; // 反复地向成帧器发送获取下一条消息的请求, // 直到其返回null,即指示了消息的结束。 while ((req = framer.nextMsg()) != null) { System.out.println("Received message (" + req.length + " bytes)"); VoteMsg responseMsg = service.handleRequest(coder.fromWire(req)); framer.frameMsg(coder.toWire(responseMsg), clntSock.getOutputStream()); } } catch (IOException ioe) { System.err.println("Error handling client: " + ioe.getMessage()); } finally { System.out.println("Closing connection"); clntSock.close(); } } } }
UDP版本的Client
UDP版本的投票客户端与TCP版本非常相似。在UDP客户端中我们不需要使用成帧器, UDP协议为我们维护了消息的边界信息。对于UDP协议使用基于文本的编码方式对消息进行 编码,不过只要客户端与服务器能达成一致,也能够很方便地改成其他编码方式。
public class VoteClientUDP { public static void main(String args[]) throws IOException { if (args.length != 3) { // Test for correct # of args throw new IllegalArgumentException( // "Parameter(s): <Destination>" + " <Port> <Candidate#>"); } // Destination addr InetAddress destAddr = InetAddress.getByName(args[0]); // Destination port int destPort = Integer.parseInt(args[1]); // 0 <= candidate <= 1000 req'd int candidate = Integer.parseInt(args[2]); // UDP socket for sending // 设置DatagramSocket 和连接: // 通过调用connect()方法,我们不必 // * 为发送的每个数据报文指定远程地址和端口, // * 也不必测试接收到的每个数据报文的源地址。 DatagramSocket sock = new DatagramSocket(); sock.connect(destAddr, destPort); // 这次使用的是文本编码器,但我们也可以很容易地换成二进制编码器。 // 注意这里我们不需要成帧器,因为只要每次发送都只有一个投票消息, // UDP协议就已经为我们保留了边界信息。 // Create a voting message (2nd param false = vote) VoteMsg vote = new VoteMsg(false, false, candidate, 0); // Change Text to Bin here for a different coding strategy VoteMsgCoder coder = new VoteMsgTextCoder(); // Send request byte[] encodedVote = coder.toWire(vote); System.out.println("Sending Text-Encoded Request (" + encodedVote.length + " bytes): "); System.out.println(vote); DatagramPacket message = new DatagramPacket(encodedVote,encodedVote.length); sock.send(message); // 接收解码和打印服务器响应信息:第36-45行 // Receive response // // 在创建DatagramPacket时,我们需要知道消息的最大长度,以避免数据被截断。 // 当然,在对数据报文进行解码时,我们只使用数据报文中包含的实际字节, // 因此调用了`Arrays.copyOfRange()`方法来复制返回的数据报文中数组的子序列。 message = new DatagramPacket( new byte[VoteMsgTextCoder.MAX_WIRE_LENGTH], VoteMsgTextCoder.MAX_WIRE_LENGTH); sock.receive(message); encodedVote = Arrays.copyOfRange(message.getData(), 0, message.getLength()); System.out.println("Received Text-Encoded Response (" + encodedVote.length + " bytes): "); vote = coder.fromWire(encodedVote); System.out.println(vote); } }
UDP版Server
最后是UDP投票服务器,同样,也与TCP版本非常相似。
public class VoteServerUDP { public static void main(String[] args) throws IOException { if (args.length != 1) { // Test for correct # of args throw new IllegalArgumentException("Parameter(s): <Port>"); } int port = Integer.parseInt(args[0]); // Receiving Port DatagramSocket sock = new DatagramSocket(port); // Receive socket // 为服务器创建接收缓存区,编码器,以及投票服务。 byte[] inBuffer = new byte[VoteMsgTextCoder.MAX_WIRE_LENGTH]; // Change Bin to Text for a different coding approach VoteMsgCoder coder = new VoteMsgTextCoder(); VoteService service = new VoteService(); // 循环接收和处理客户端的投票消息: while (true) { // 为接收数据报文创建DatagramPacket: // 在每次迭代中将数据区重置为输入缓存区。 DatagramPacket packet = new DatagramPacket(inBuffer, inBuffer.length); // 接收数据报文,抽取数据: sock.receive(packet); byte[] encodedMsg = Arrays.copyOfRange(packet.getData(), 0, packet.getLength()); System.out.println("Handling request from " + packet.getSocketAddress() + " (" + encodedMsg.length + " bytes)"); // 解码和处理请求 try { VoteMsg msg = coder.fromWire(encodedMsg); msg = service.handleRequest(msg); // 服务将响应返回给消息。 // 编码并发送响应消息: packet.setData(coder.toWire(msg)); System.out.println("Sending response (" + packet.getLength() + " bytes):"); System.out.println(msg); sock.send(packet); } catch (IOException ioe) { System.err.println("Parse error in message: " + ioe.getMessage()); } } } }