Jade Dungeon

Java TCP/IP Socket 编程

第三章:发送和接收数据

TCP/IP协议的唯一约束是:

  • 信息必须在块(chunks)中发送和接收
  • 而块的长度必须是8位的倍数,因此,我们可以认为在TCP/IP协议中传输的信息是字节序列。

可以进一步把传输的信息看作数字序列或数组,每个数字的取值范围是0到255。即8位编码 的00000000 ~ 11111111

一个程序使用套接字与其他程序交换信息,通常符合下面两种情况之一:

  • 设计和编写了套接字的客户端和服务器端,这样能够随心所欲地定义自己的应用程序协议;
  • 要么是实现了一个已经存在的协议,或许是一个协议标准。

信息编码

简单数据类型如intlongcharString等通过套接字发送和接收。传输信息时 可以:

  • 通过套接字将字节信息写入一个已经关联Socket的OutputStream实例中。
  • 将其封装进一个DatagramPacket实例中由DatagramSocket发送。

这些操作所能处理的唯一数据类型是字节和字节数组。Java的内置工具能把intString等其他数据类型显式转换成字节数组,如:String.getBytes()

基本整型

TCP和UDP套接字使我们能发送和接收范围在0 ~ 255之间的字节序列数组。Java程序中:

  • int数据类型由32位表示,因此可以使用4个字节来传输任意的int型变量或常量;
  • short数据类型由16位表示,传输short类型的数据只需要两个字节
  • 同理传输64位的long类型数据则需要8个字节。

例:考虑如何对一个包含了4个整数的序列进行编码:一个byte型,一个short型, 一个int型,以及一个long型,按照这个顺序从发送者传输到接收者。我们总共需要 15个字节:

  • 第一个字节存放byte型数据,
  • 接下来两个字节存放short型数据,
  • 再后面4个字节存放int型数据,
  • 最后8个字节存放long型数据,

如下所示:

+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
byte| short |<---- int ---->|<---         long          --->|

大端与小端

对于需要超过一个字节来表示的数据类型,必须确定字节的发送顺序。显然有两种选择:

  • 从整数的右边开始,由低位到高位地发送,即little-endian顺序;
  • 从左边开始,由高位到低位发送,即big-endian顺序。

注意, 同一个字节中,位的顺序在实现时是以标准的方式处理的。

例:考虑长整型数123456787654321L,以64位十六进制形式表示为0x0000704885F926B1

big-endian顺序,十进制数值序列:

00 00 70 48 85 F9 26 B1
0 0 112 72 133 249 38 177

little-endian顺序,十进制数组序列:

B1 26 F9 85 48 70 00 00
177 38 249 133 72 112 0 0

Java中的ByteOrder类中有两个常量:BIG_ENDIANLITTLE_ENDIAN来表示大端与小端。

数值的符号位

传输的数值是有符号的(signed)还是无符号的(unsigned)。Java中的四种基本整型都是 有符号的,它们的值以二进制补码(two's-complement)的方式存储。

  • 有符号数值用二进制补码的形式表示负整数。在处理有\(k\)位的有符号数时, \(-n\)(其中\( n \in [1, 2^{k-1}]\)),则补码的二进制值就为\(2^k-n\)。
    • 非负整数\(p\)(其中\(p \in [0, 2^{k-1}-1]\)),只是简单地用\(k\)位二进制数来表示\(p\)的值。
    • 因此,对于给定的\(k\)位,我们可以通过二进制补码来表示\(-2^{k-1}\)到\(2^{k-1}-1\)范围 的值。
    • 注意,最高位(msb)标识了该数是正数(msb = 0)还是负数(msb = 1)。
  • 无符号(unsigned)编码,\(k\)位可以直接表示\(0\)到\(2^k-1\)之间的数值。例如,32位 数值0xffffffff(所有位全为1),将其解析为有符号数时,二进制补码整数表示\(-1\); 将其解析为无符号数时,它表示\(4294967295\)。

由于Java并不支持无符号整型,如果要在Java中编码和解码无符号数,则需要额外的工作。 在此假设我们处理的都是有符号整数数据。

移位和屏蔽符号位的例子

使用「位操作(bit-diddling)」(移位和屏蔽)来显式编码进行介绍。示例程序 BruteForceCoding.java

  • 发送时用encodeIntBigEndian()对基本类型进行编码。
  • 接收端decodeIntBigEndian()进行解码到一个Java的long型整数。

// define how many byte every type use
public static final int  BYTE_SIZE =    Byte.SIZE / Byte.SIZE;
public static final int SHORT_SIZE =   Short.SIZE / Byte.SIZE;
public static final int   INT_SIZE = Integer.SIZE / Byte.SIZE;
public static final int  LONG_SIZE =    Long.SIZE / Byte.SIZE;

public static void main(String[] args) {
	// prepare data for test
	byte   byteVal = 101;
	short shortVal = 10001;
	int     intVal = 100000001;
	long   longVal = 1000000000001L;

	System.out.printf("       In Logic: %02X %04X %08X %016X \n",
			byteVal, shortVal, intVal, longVal);

	// Encode the fields in the target byte array
	// stream of byte code
	byte[] byteStream = 
			new byte[BYTE_SIZE + SHORT_SIZE + INT_SIZE + LONG_SIZE];
	// encode the datas
	int offset = 0;
	offset = encodeIntBigEndian(byteStream, offset,  byteVal,  BYTE_SIZE);
	offset = encodeIntBigEndian(byteStream, offset, shortVal, SHORT_SIZE);
	offset = encodeIntBigEndian(byteStream, offset,   intVal,   INT_SIZE);
	offset = encodeIntBigEndian(byteStream, offset,  longVal,  LONG_SIZE);
	// print the stream in hex
	System.out.printf("Encoded message: %s\n",
			transByteStreamToHexString(byteStream));

	// Decode several fields
	long value = 0L;
	offset = 0;

	value = decodeIntBigEndian(byteStream, offset, BYTE_SIZE);
	System.out.println("Decoded  byte = " + value);
	offset += BYTE_SIZE;

	value = decodeIntBigEndian(byteStream, offset, SHORT_SIZE);
	System.out.println("Decoded short = " + value);
	offset += SHORT_SIZE;

	value = decodeIntBigEndian(byteStream, offset, INT_SIZE);
	System.out.println("Decoded   int = " + value);
	offset += INT_SIZE;

	value = decodeIntBigEndian(byteStream, offset, LONG_SIZE);
	System.out.println("Decoded  long = " + value);
	offset += LONG_SIZE;

	// Demonstrate dangers of conversion
	offset = 4;
	value = decodeIntBigEndian(byteStream, offset, BYTE_SIZE);
	System.out.printf("Decoded value (offset %d, size %d) = %d\n", 
			offset, BYTE_SIZE, value);
	byte bVal = (byte) decodeIntBigEndian(byteStream, offset, BYTE_SIZE);
	System.out.printf("Same value as byte = %d\n", bVal);
}


// 把长度为size个字节的变量long存入数组dst,dst前offset个字节已经存放了
// 其他内容了。
// @param byteStream 存放消息的数组
// @param offset 存放的起始位置
// @param data 要存放的值
// @param dataSize 存放的值占几个字节
// @return 存放后数组被使用了多少个字节(新的offset)
public static int encodeIntBigEndian(byte[] byteStream, int offset,
		long data, int dataSize) 
{
	// Warning: Untested preconditions (e.g., 0 <= dataSize <= 8)
	for (int i = 0; i < dataSize; i++) {
		// 先通过位移把要取的那8位移到最低,
		// 通过强制转换把最低的8位转为一个byte。
		// 
		// 赋值语句的右边,首先将数值向右移动,
		// 以使我们需要的字节处于该数值的低8位中。然后,将移位后的数转换成byte型,
		// 并存入字节数组的适当位置。在转换过程中,除了低8位以外,其他位都将丢弃。
		// 这个过程将根据给定数值所占字节数迭代进行。
		// 该方法还将返回存入数值后字节数组中新的偏移位置,
		// 因此我们不必做额外的工作来跟踪偏移量。 
		byteStream[offset++] = (byte) (data >> ((dataSize - i - 1) * Byte.SIZE));
	}
	return offset;
}


// 从数组中取出数据
// @param byteStream 存放消息的数组
// @param offset 存放的起始位置
// @param dataSize 存放的值占几个字节
// @return 读出的值
public static long decodeIntBigEndian(byte[] byteStream, int offset,//
		int dataSize) 
{
	// Warning: Untested preconditions (e.g., 0 <= dataSize <= 8)
	long data = 0;
	// 根据给定数组的字节大小进行迭代,通过每次迭代的左移操作,
	// 将所取得字节的值累积到一个long型整数中。 
	for (int i = 0; i < dataSize; i++) {
		// 先把当前的内容左移一个字节
		// 空出的字节中填入流中指定位置的字节
		data = (data << Byte.SIZE) | ((long) byteStream[offset + i] & 0xFF);
	}
	return data;
}

// 用十进制显示数组中的每个字节
// @param byteStream 要显示的数组
// @return 显示的字符串
public static String transByteStreamToHexString(byte[] byteStream) {
	StringBuilder rtn = new StringBuilder();
	for (byte b : byteStream) {
		// 用十六进制字符串显示一个字节的内容
		// 位与`0xFF`只取最后两个字节
		rtn.append(String.format("%02X", b & 0xFF)).append(" ");
	}
	return rtn.toString();
}

在字节数组偏移量为4的位置,该字节的十进制值是245,然而,当将其作为一个有符号字节 读取时,其值则为-11。如果我们将返回值直接存入一个long型整数,它只是简单地变成 这个long型整数的最后一个字节,值为245。如果将返回值放入一个字节型整数,其值则为 -11。到底哪个值正确取决于你的应用程序。如果你从N个字节解码后希望得到一个有符号的 数值,就必须将解码结果(长的结果)存入一个刚好占用N个字节的基本整型中。如果你 希望得到一个无符号的数组,就必须将解码结果存入更长的基本整型中,该整型至少要占用 N+1个字节。

注意,在encodeIntBigEndian()decodeIntBigEndian()方法的开始部分,我们可能 需要做一些前提条件检测,如\(0 \le size \le 8\)和\(dst \neq null\)等。你能举出需要做的其他前期检测吗?

以上程序中数组内的数据如下:

  byte short int long
HEX 65 27 11 05 F5 E1 01 00 00 00 E8 D4 A5 10 01
DEC 101 39 17 5 245 225 1 0 0 0 232 212 165 16 1

使用IO流

构建本例中的消息的一个相对简单的方法是使用DataOutputStream类和 ByteArrayOutputStream类。

  • DataOutputStream类允许你将基本数据类型,如上述整型,写入一个流中:它提供了 writeInt()wirteByte()等方法按照big-endian顺序把数据二进制补码写到流中。
  • ByteArrayOutputStream()类获取写到流中的字节序列,并将其转换成一个字节数组。

下面的代码和BruteForceEncoding.java的效果一样:

ByteArrayOutputStream buf = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(buf);
try {
	out.writeByte(byteVal);
	out.writeShort(shortVal);
	out.writeInt(intVal);
	out.writeLong(longVal);
	out.flush();
} catch (IOException e) {
	e.printStackTrace();
}
System.out.printf("Encoded message: %s\n",
		transByteStreamToHexString(buf.toByteArray()));

类似在接收时可以用DataInputStream类和ByteArrayInputStream类。

字符串和文本

Java使用Unicode编码字符集表示char型和String型值。

StringgetBytes()方法返回的数组根据平台默认字符集(default charset)。很多 平台的默认字符集都是UTF-8,然而一些地区情况有所不同。保证特定编码要将字符集 名字作为参数传递给getBytes()方法。

例如:"Test!".getBytes()

+-------+-------+-------+-------+-------+
|     84|    101|    115|    116|     33|
+-------+-------+-------+-------+-------+

"Test!".getBytes("UTF-16BE"),每个值被编码成了两个字节的序列,高位在前;

+---+---+---+---+---+---+---+---+---+---+
|  0| 84|  0|101|  0|115|  0|116|  0| 33|
+---+---+---+---+---+---+---+---+---+---+

调用"Test!".getBytes("IBM037"),返回结果将是:

+-------+-------+-------+-------+-------+
|    227|    133|    162|    163|     90|
+-------+-------+-------+-------+-------+

上面的例子说明,发送者和接收者必须在文本字符串的表示方式上达成共识。 最简单的方法就是定义一个标准字符集。

位操作:布尔值编码

位图(Bitmaps)是对布尔信息进行编码的一种非常紧凑的方式,通常用在协议中。 掩码(mask)是一个的整数值,其中有一位或多位被设为1,其他各位被清空(即,设为0)。 在这里我们处理的是int大小的位图和掩码(32位),但这些方法对其他类型的整数也同样适用。

我们将int中的各位从0到31进行编号,其中0代表最低位。一般来说,如果一个int值在 第i位值为1,其他位都为0的话,该int型整数的值就是\(2^i\)。因此编号为5的位表示32, 编号为12的位表示4096,等等。这里有一些掩码声明的例子:

final int BIT5      = (1<<5); 
final int BIT7      = 0x80; 
final int BITS2AND3 = 12; // 8+4 
int bitmap = 1234567; 

要设置int变量中的特定一位,需要将该int值与特定位对应的掩码进行按位或 (bitwise-OR)操作(|),通过掩码指定位置1或置0

bitmap |=   BIT5;            // bit 5 is now one 

要清空特定一位,则将该整数与特定所对应的掩码的按位补码(特定位为0,其他位为1) 进行按位与(bitwise-AND)操作。Java中的按位与操作符是&,按位补码操作符是~

bitmap &=  ~BIT7;            // bit 7 is now zero 

也可以通过将相应的所有掩码进行按位或操作,一次设置和清空多位:

bitmap &= ~(BITS2AND3|BIT5); // clear bits 2, 3 and 5 

要测试一个整数的特定位是否已经被设置,可以将该整数与特定位对应的掩码进行按位与 &,并将操作结果与0比较:

boolean bit6Set = (bitmap & (1<<6)) != 0; 

组合输入输出流

Java中与流相关的类可以组合起来从而提供强大的功能。例如:

  • 将一个Socket实例的 OutputStream包装在一个BufferedOutputStream实例中, 这样可以先将字节暂时缓存在一起,然后再一次全部发送到底层的通信信道中以提高 程序的性能。
  • 再将这个BufferedOutputStream实例包裹在一个DataOutputStream实例中,以实现 发送基本数据类型的功能。以下是实现这种组合的代码:
Socket socket = new Socket(server, port); 
DataOutputStream out = new DataOutputStream( 
new BufferedOutputStream(socket.getOutputStream())); 

图 3.1:流组合

Java基本输入输出类:

输入输出 类型
Buffered[Input/Output]Stream 通过缓存性能优化
Checked[Input/Output]Stream 维护数据检查
Cipher[Input/Output]Stream 加密/解密
Data[Input/Output]Stream 数据处理
Digest[Input/Output]Stream 维护数据摘要
GZIP[Input/Output]Stream 压缩/解压缩
Object[Input/Output]Stream 数据处理
PushbackInputStream 允许一个字节或是字节是「末读的」
PrintOutputStream 输出数据类型的字符串表示法
Zip[Input/Output]Stream 以zip格式

成帧与解析

数据转换成在线路上传输的格式只完成了一半工作,应用程序协议必须指定消息的接收者 如何确定何时消息已完整接收。成帧(Framing)技术则解决了接收端如何定位消息的 首尾位置的问题。

如果在一个DatagramPacket中发送数据就有一个确定的长度,接收者能够准确地知道消息 的结束位置。

如果通过TCP套接字来发送消息,情况将变得更复杂,因为TCP协议中没有消息边界的概念。 如果一个消息中的所有字段都有固定的长度,同时每个消息又是由固定数量的字段组成 的话,消息的长度就能够确定,接收者就可以简单地将消息长度对应的字节数读到一个 byte[]缓存区中。在TCPEchoClient.java示例程序中我们就是用的这个方法。

但是如果消息的长度是可变的(例如消息中包含了一些变长的文本字符串),就无法知道 需要读取多少字节。

如果接收者试图从套接字中读取比消息本身更多的字节,将可能发生以下两种情况之一:

  • 如果信道中没有其他消息,接收者将阻塞等待,同时无法处理接收到的消息; 如果发送者也在等待接收端的响应信息,则会形成死锁(deadlock);
  • 如果信道中还有其他消息,则接收者会将后面消息的一部分甚至全部读到第一条消息 中去,这将产生一些协议错误。因此,在使用TCP套接字时,成帧就是一个非常重要的 考虑因素。

一些相同的考虑也适用于查找消息中每个字段的边界: 接收者需要知道每个字段的结束位置和下一个字段的开始位置。因此消息成帧技术几乎 都可以应用到字段上。

然而,最简单并使代码最简洁的方法是将这两个问题分开处理: 首先定位消息的结束位置,然后将消息作为一个整体进行解析。 在此我们专注于将整个消息作为一帧进行处理。

主要有两个技术使接收者能够准确地找到消息的结束位置:

  • 基于定界符(Delimiter-based):消息的结束由一个唯一的标记(unique marker) 指出,即发送者在传输完数据后显式添加的一个特殊字节序列。这个特殊标记不能在 传输的数据中出现。
  • 显式长度(Explicit length):在变长字段或消息前附加一个固定大小的字段, 用来指示该字段或消息中包含了多少字节。

基于定界符的方法的一个特殊情况是,可以用在TCP连接上传输的最后一个消息上: 在发送完这个消息后,发送者就简单地关闭(使用shutdownOutput()close()方法) 发送端的TCP连接。接收者读取完这条消息的最后一个字节后,将接收到一个流结束标记 (即read()方法返回-1),该标记指示出已经读取到达了消息的末尾。

基于定界符的方法的缺点是消息本身不能包含有定界字符。幸运的是填充(stuffing)技术 能够对消息中出现的定界符进行修改,从而是接收者不将其识别为定界符。在接收者扫描 定界符时,还能识别出修改过的数据,并在输出消息中对其进行还原,从而使其与原始消息 一致。这个技术的缺点是发送者和接收者双方都必须扫描消息。

基于长度的方法更简单一些,不过要使用这种方法必须知道消息长度的上限。发送者先要 确定消息的长度,将长度信息存入一个整数,作为消息的前缀。消息的长度上限定义了用来 编码消息长度所需要的字节数:如果消息的长度小于256字节,则需要1个字节;如果消息的 长度小于65536字节,则需要2个字节,等等。

基于定界符的例子

定义的Framer.java接口。它有两个方法:

  • frameMsg()方法用来添加成帧信息并将指定消息输出到指定流
  • nextMsg()方法则扫描指定的流,从中抽取出下一条消息。
public interface Framer {
	
	void   frameMsg(byte[] message, OutputStream out) throws IOException;

	byte[] nextMsg() throws IOException;
	
}

DelimFramer.java类实现了基于定界符的成帧方法,其定界符为「换行」符("\n", 字节值为10)。

  • frameMethod()方法并没有实现填充,当成帧的字节序列中包含有定界符时,它只是 简单地抛出异常。(扩展该方法以实现填充功能将作为练习留给读者)
  • nextMsg()方法扫描流,直到读取到了定界符,并返回定界符前面的所有字符,如果 流为空则返回null。如果累积了一个消息的不少字符,但直到流结束也没有找到 定界符程序将抛出一个异常来指示成帧错误。
public class DelimFramer implements Framer {

	private InputStream in; // data source
	private static final byte DELIMITER = '\n'; // message delimiter

	// 获取消息的输入流作为参数传递给该函数。 
	public DelimFramer(InputStream in) {
		this.in = in;
	}

	// 输出一帧信息
	public void frameMsg(byte[] message, OutputStream out) throws IOException {
		// ensure that the message does not contain the delimiter
		// 检查消息中是否包含了定界符,如果包含,则抛出一个异常。 
		for (byte b : message) {
			if (b == DELIMITER) {
				throw new IOException("Message contains delimiter");
			}
		}
		// 将成帧的消息输出到流中。 
		out.write(message);
		out.write(DELIMITER);
		out.flush();
	}

	// 从输入中提取一帧消息
	public byte[] nextMsg() throws IOException {
		ByteArrayOutputStream messageBuffer = new ByteArrayOutputStream();
		int nextByte;

		// fetch bytes until find delimiter
		// 读取流中的每个字节,直到遇到定界符为止:第35行 
		while ((nextByte = in.read()) != DELIMITER) {
			if (nextByte == -1) { // end of stream?
				// 如果把输入流读完了都没有读到分隔符
				if (messageBuffer.size() == 0) { // if no byte read
					// 整个消息就是空的,直接反回空
					return null;
				} else { // if bytes followed by end of stream: framing error
					// 输入流里有数据但没有分隔符,那么认为是数据丢失,抛异常
					throw new EOFException("Non-empty message without delimiter");
				}
			}
			// 将无定界符的字节写入消息缓存区
			messageBuffer.write(nextByte); // write byte to buffer
		}
		// 将消息缓存区中的内容以字节数组的形式返回
		return messageBuffer.toByteArray();
	}
}

以上定界符帧有一个限制,即它不支持多字节定界符。如何对其进行修改以支持多字节 定界符将作为练习留给我们的读者。

基于长度的例子

LengthFramer.java类实现了基于长度的成帧方法,适用于长度小于65535 (\(2^{16}-1\))字节的消息。

  • 发送者首先给出指定消息的长度,并将长度信息以big-endian顺序存入两个字节的整数 中。
  • 再将这两个字节放在完整的消息内容前,连同消息一起写入输出流。

在接收端:

  • 使用DataInputStream以读取整型的长度信息;readFully()方法将阻塞等待, 直到给定的数组完全填满。值得注意的是,使用这种成帧方法,发送者不需要检查要 成帧的消息内容,而只需要检查消息的长度是否超出了限制。
public class LengthFramer implements Framer {
	public static final int MAXMESSAGELENGTH = 65535;
	public static final int BYTEMASK = 0xff;
	public static final int SHORTMASK = 0xffff;
	public static final int BYTESHIFT = 8;

	private DataInputStream in; // wrapper for data I/O

	public LengthFramer(InputStream in) throws IOException {
		this.in = new DataInputStream(in);
	}

	// 写入一帧的数据
	public void frameMsg(byte[] message, OutputStream out) throws IOException {
		// 由于用两个字节,因此消息的长度不能超过`65535`。
		// 注意该值太大而不能存入一个short型整数中,因此每次只写一个字节 
		if (message.length > MAXMESSAGELENGTH) {
			throw new IOException("message too long");
		}
		// write length prefix
		// 添加长度信息(无符号short型整数)前缀,输出消息的字节数。 
		out.write((message.length >> BYTESHIFT) & BYTEMASK);
		out.write(message.length & BYTEMASK);
		// 输出信息
		out.write(message);
		out.flush();
	}

	// 读取一帧的数据
	public byte[] nextMsg() throws IOException {
		int length;
		try {
			// 取两个字节,将它们作为big-endian整数进行解释,并以int型返回 
			length = in.readUnsignedShort(); // read 2 bytes
		} catch (EOFException e) { // no (or 1 byte) message
			return null;
		}
		// 0 <= length <= 65535
		byte[] msg = new byte[length];
		// 阻塞等待,直到接收到足够的字节来填满指定的数组
		in.readFully(msg); // if exception, it's a framing error.
		// 以字节的形式返回消息
		return msg;
	}
}

Java特定编码

当使用套接字时需要同时创建通信信道两端的程序,这种情况下你也拥有了协议的完全 控制权。这时你可以选择实现特定的协议进行通信。在以下的情况下:

  • 通信双方都使用Java实现
  • 你拥有对协议的完全控制权

那么就可以使用Java的内置工具如Serializable接口或远程方法调用(Remote Method Invocation,RMI)工具。RMI能够调用不同Java虚拟机上的方法,并隐藏了所有繁琐的 参数编码解码细节。序列化(Serialization)处理了将实际的Java对象转换成字节序列的 工作,因此你可以在不同虚拟机之间传递Java对象实例。

但是在实际中,由于种种原因,它们并不总是最好的解决方案:

  • 首先,由于它们都是很笼统的工具,因而在通信开销上不能做到最高效。例如,一个对象的 序列化形式,其包含的信息在Java虚拟机(JVM)环境以外是毫无意义的。
  • 其次,SerializableExternalizable接口不能用于已经定义了不同传输格式的 情况:如一个标准的协议。
  • 最后,用户自定义的类必须自己实现它们的序列化接口,而这项工作很容易出错。

再强调一次,在某些情况下,这些Java的内置工具的确很有用,但是有些时候, 「实现你自己的方法」可能更简单、容易或更有效。

构建和解析协议消息

本章结束时我们再看一个简单的例子,对在实现别人定义的协议时可能用到的技术进行了介绍。这个例子程序是一个简单的"投票"协议,如图3.2所示。这里,一个客户端向服务器发送了一个请求消息,消息中包含了一个候选人ID,范围是0至1000。

图 3.2:投票协议

  • Vote Reques:投票请求
  • Candidate:候选人
  • Vote Count:选票总数

程序支持两种请求:

  • 一种是查询(inquiry),即向服务器询问给定候选人当前获得的投票总数。服务器 发回一个响应消息,包含了原来的候选人ID和该候选人当前(查询请求收到时)获得的 选票总数。
  • 另一种是投票(voting)请求,即向指定候选人投一票。服务器对这种请求也发回 响应消息,包含了候选人ID和其获得的选票数(包括了刚投的一票)。

用一个类来表示客户端和服务器端的两种消息VoteMsg.java

public class VoteMsg {

	// 其值为true时表示该消息是查询请求(为false时表示该消息是投票信息); 
	private boolean isInquiry;   // true if inquiry; false if vote
	// 指示该消息是响应(由服务器发送)还是请求;
	private boolean isResponse;  // true if response from server
	private int     candidateID; // 候选人的ID in [0,1000]
	// 查询的候选人获得的总选票数。 
	// voteCount在响应消息中只能是一个非零值(isResponse为true)。 
	// voteCount 不能为负数。 
	private long    voteCount;   // nonzero only in response

	public static final int MAX_CANDIDATE_ID = 1000;

	public VoteMsg(boolean isResponse, boolean isInquiry, int candidateID,
			long voteCount) throws IllegalArgumentException 
	{
		// check invariants
		if (voteCount != 0 && !isResponse) {
			throw new IllegalArgumentException("Request vote count must be zero");
		}
		if (candidateID < 0 || candidateID > MAX_CANDIDATE_ID) {
			throw new IllegalArgumentException("Bad Candidate ID: " + candidateID);
		}
		if (voteCount < 0) {
			throw new IllegalArgumentException("Total must be >= zero");
		}
		this.candidateID = candidateID;
		this.isResponse = isResponse;
		this.isInquiry = isInquiry;
		this.voteCount = voteCount;
	}

	public void setInquiry(boolean isInquiry) {
		this.isInquiry = isInquiry;
	}

	public void setResponse(boolean isResponse) {
		this.isResponse = isResponse;
	}

	public boolean isInquiry() {
		return isInquiry;
	}

	public boolean isResponse() {
		return isResponse;
	}

	public void setCandidateID(int candidateID) throws IllegalArgumentException {
		if (candidateID < 0 || candidateID > MAX_CANDIDATE_ID) {
			throw new IllegalArgumentException("Bad Candidate ID: " + candidateID);
		}
		this.candidateID = candidateID;
	}

	public int getCandidateID() {
		return candidateID;
	}

	public void setVoteCount(long count) {
		if ((count != 0 && !isResponse) || count < 0) {
			throw new IllegalArgumentException("Bad vote count");
		}
		voteCount = count;
	}

	public long getVoteCount() {
		return voteCount;
	}

	public String toString() {
		String res = (isInquiry ? "inquiry" : "vote") + " for candidate "
				+ candidateID;
		if (isResponse) {
			res = "response to " + res + " who now has " + voteCount
					+ " vote(s)";
		}
		return res;
	}
}

现在我们有了一个用Java表示的投票消息,还需要根据一定的协议来对其进行编码和解码。 VoteMsgCoder.java接口提供了对投票消息进行序列化和反序列化的方法。

public interface VoteMsgCoder {
	// 序列化
	byte[] toWire(VoteMsg msg) throws IOException;
	// 反序列化
	VoteMsg fromWire(byte[] input) throws IOException;
}

基于文本的表示方法

用文本方式编码的版本使用US-ASCII字符集。消息的开头是一个「魔术字符串」Voting

具体各个字段:

  • 'v'表示投票消息
  • 'i'表示查询消息。
  • 'R'表示消息的状态,即是否为服务器的响应,
  • 状态标记后面是候选人ID,其后跟的是选票总数,它们都编码成十进制字符串。

VoteMsgTextCoder类提供了一种基于文本的VoteMsg编码方法。

public class VoteMsgTextCoder implements VoteMsgCoder {

	// Wire Format "VOTEPROTO" <"v" | "i"> [<RESPFLAG>] <CANDIDATE> [<VOTECNT>]
	// Charset is fixed by the wire format.

	// Manifest constants for encoding
	public static final String MAGIC       = "Voting";
	public static final String VOTESTR     = "v";
	public static final String INQSTR      = "i";
	public static final String RESPONSESTR = "R";

	public static final String CHARSETNAME  = "US-ASCII";
	public static final String DELIMSTR     = " ";
	public static final int MAX_WIRE_LENGTH = 2000;

	// 简单地创建一个字符串,该字符串中包含了消息的所有字段,并由空白符隔开
	public byte[] toWire(VoteMsg msg) throws IOException {
		String msgString = MAGIC + DELIMSTR
				+ (msg.isInquiry() ? INQSTR : VOTESTR) + DELIMSTR
				+ (msg.isResponse() ? RESPONSESTR + DELIMSTR : "")
				+ Integer.toString(msg.getCandidateID()) + DELIMSTR
				+ Long.toString(msg.getVoteCount());
		byte data[] = msgString.getBytes(CHARSETNAME);
		return data;
	}

	public VoteMsg fromWire(byte[] message) throws IOException {
		// 就使用Scanner实例,根据空白符一个一个地获取字段。
		// 消息的字段数与其是请求消息(由客户端发送)还是响应消息(由服务器发送)
		// 有关。如果输入流提前结束或格式错误将抛出一个异常。
		ByteArrayInputStream msgStream = new ByteArrayInputStream(message);
		Scanner s = new Scanner(new InputStreamReader(msgStream, CHARSETNAME));
		boolean isInquiry;
		boolean isResponse;
		int candidateID;
		long voteCount;
		String token;

		try {
			token = s.next();
			// 方法首先检查"魔术"字符串,
			// 如果在消息最前面没有魔术字符串,则抛出一个异常。
			if (!token.equals(MAGIC)) {
				throw new IOException("Bad magic string: " + token);
			}
			token = s.next();
			if (token.equals(VOTESTR)) {
				isInquiry = false;
			} else if (!token.equals(INQSTR)) {
				throw new IOException("Bad vote/inq indicator: " + token);
			} else {
				isInquiry = true;
			}

			token = s.next();
			if (token.equals(RESPONSESTR)) {
				isResponse = true;
				token = s.next();
			} else {
				isResponse = false;
			}
			// Current token is candidateID
			// Note: isResponse now valid
			candidateID = Integer.parseInt(token);
			if (isResponse) {
				token = s.next();
				voteCount = Long.parseLong(token);
			} else {
				voteCount = 0;
			}
		} catch (IOException ioe) {
			throw new IOException("Parse error...");
		}
		return new VoteMsg(isResponse, isInquiry, candidateID, voteCount);
	}
}

二进制表示方法

下面我们将展示二进制格式对投票协议消息进行编码的方法。

与基于文本的格式相反,二进制格式使用固定大小的消息:

  • 每条消息由一个特殊字节开始,该字节的最高六位为一个「魔术值」010101
  • 该字节的最低两位对两个布尔值进行了编码。消息的第二个字节总是0
  • 第三、第四个字节包含了candidateID值。
  • 只有响应消息的最后8个字节才包含了选票总数信息。
/* Wire Format
 *                                1  1  1  1  1  1
 *  0  1  2  3  4  5  6  7  8  9  0  1  2  3  4  5
 * +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
 * |     Magic       |Flags|       ZERO            |
 * +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
 * |                  Candidate ID                 |
 * +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
 * |                                               |
 * |         Vote Count (only in response)         |
 * |                                               |
 * |                                               |
 * +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
 */
public class VoteMsgBinCoder implements VoteMsgCoder {

	// manifest constants for encoding
	public static final int MIN_WIRE_LENGTH = 4;
	public static final int MAX_WIRE_LENGTH = 16;
	public static final int MAGIC           = 0x5400;
	public static final int MAGIC_MASK      = 0xfc00;
	public static final int MAGIC_SHIFT     = 8;
	public static final int RESPONSE_FLAG   = 0x0200;
	public static final int INQUIRE_FLAG    = 0x0100;

	public byte[] toWire(VoteMsg msg) throws IOException {
		// 创建了一个ByteArrayOutputStream
		// 并将其包裹在一个DataOutputStream中来接收结果。
		ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
		DataOutputStream out = new DataOutputStream(byteStream); // converts ints

		// 使用按位或操作,使用1位对每个布尔值进行编码。 
		short magicAndFlags = MAGIC;
		if (msg.isInquiry()) {
			magicAndFlags |= INQUIRE_FLAG;
		}
		if (msg.isResponse()) {
			magicAndFlags |= RESPONSE_FLAG;
		}
		out.writeShort(magicAndFlags);
		// We know the candidate ID will fit in a short: it's > 0 && < 1000
		out.writeShort((short) msg.getCandidateID());
		if (msg.isResponse()) {
			out.writeLong(msg.getVoteCount());
		}
		out.flush();
		byte[] data = byteStream.toByteArray();
		return data;
	}

	public VoteMsg fromWire(byte[] input) throws IOException {
		// sanity checks
		if (input.length < MIN_WIRE_LENGTH) {
			throw new IOException("Runt message");
		}
		ByteArrayInputStream bs = new ByteArrayInputStream(input);
		DataInputStream in = new DataInputStream(bs);
		int magic = in.readShort();
		if ((magic & MAGIC_MASK) != MAGIC) {
			throw new IOException("Bad Magic #: "
					+ ((magic & MAGIC_MASK) >> MAGIC_SHIFT));
		}
		boolean resp = ((magic & RESPONSE_FLAG) != 0);
		boolean inq = ((magic & INQUIRE_FLAG) != 0);
		int candidateID = in.readShort();
		if (candidateID < 0 || candidateID > 1000) {
			throw new IOException("Bad candidate ID: " + candidateID);
		}
		long count = 0;
		if (resp) {
			count = in.readLong();
			if (count < 0) {
				throw new IOException("Bad vote count: " + count);
			}
		}
		// Ignore any extra bytes
		return new VoteMsg(resp, inq, candidateID, count);
	}
}

发送和接收

投票服务要有以下功能:

  • 维护一个候选人ID与其获得选票数的映射,
  • 记录提交的投票,
  • 根据其获得的选票数,对查询指定的候选人和为其投票的消息做出响应。

首先,我们实现一个投票服务器所用到的服务。当接收到投票消息时,投票服务器将调用 VoteService类的handleRequest()方法对请求进行处理。

public class VoteService {

	// Map of candidates to number of votes
	// 创建候选人ID与选票数量的映射
	private Map<Integer, Long> results = new HashMap<Integer, Long>();

	public VoteMsg handleRequest(VoteMsg msg) {
		// 如果投票消息已经是一个响应信息,则直接发回而不对其进行处理和修改。
		// 否则,对其响应消息标志进行设置。 
		if (msg.isResponse()) { // If response, just send it back
			return msg;
		}
		msg.setResponse(true); // Make message a response
		
		// 根据候选人ID从映射中获取其获得的选票总数
		// Get candidate ID and vote count
		int candidate = msg.getCandidateID();
		Long count = results.get(candidate);
		if (count == null) {
			count = 0L; // Candidate does not exist
		}
		// 有新的投票更新选票总数
		if (!msg.isInquiry()) {
			results.put(candidate, ++count); // If vote, increment count
		}
		msg.setVoteCount(count);
		return msg;
	}
}

TCP版本Client

下面我们将展示如何实现一个TCP投票客户端,该客户端通过TCP套接字连接到投票服务器, 在一次投票后发送一个查询请求,并接收查询和投票结果。

public class VoteClientTCP {

	public static final int CANDIDATEID = 888;

	public static void main(String args[]) throws Exception {

		if (args.length != 2) { // Test for correct # of args
			throw new IllegalArgumentException("Parameter(s): <Server> <Port>");
		}

		String destAddr = args[0]; // Destination address
		int destPort = Integer.parseInt(args[1]); // Destination port

		// 创建套接字,获取输出流
		Socket sock = new Socket(destAddr, destPort);
		OutputStream out = sock.getOutputStream();

		// 创建二进制编码器和基于长度的成帧器
		// Change Bin to Text for a different framing strategy
		VoteMsgCoder coder = new VoteMsgBinCoder();
		// Change Length to Delim for a different encoding strategy
		Framer framer = new LengthFramer(sock.getInputStream());

		// 创建和发送消息:
		// Create an inquiry request (2nd arg = true)
		VoteMsg msg = new VoteMsg(false, true, CANDIDATEID, 0);
		byte[] encodedMsg = coder.toWire(msg);

		// Send request
		System.out.println("Sending Inquiry (" + encodedMsg.length + " bytes): ");
		System.out.println(msg);
		framer.frameMsg(encodedMsg, out);

		// Now send a vote
		msg.setInquiry(false);
		encodedMsg = coder.toWire(msg);
		System.out.println("Sending Vote (" + encodedMsg.length + " bytes): ");
		framer.frameMsg(encodedMsg, out);

		// 获取和解析响应
		// Receive inquiry response
		encodedMsg = framer.nextMsg();
		msg = coder.fromWire(encodedMsg);
		System.out.println("Received Response (" + encodedMsg.length + " bytes): ");
		System.out.println(msg);

		// Receive vote response
		msg = coder.fromWire(framer.nextMsg());
		System.out.println("Received Response (" + encodedMsg.length + " bytes): ");
		System.out.println(msg);

		sock.close();
	}
}

由于TCP协议是一个基于流的服务,我们需要提供字节的帧。在此,我们使用 LengthFramer类,它为每条消息添加一个长度前缀。注意,我们只需要改变具体的类, 就能方便地转换成基于定界符的成帧方法和基于文本的编码方式,这里将VoteMsgCoderFramer换成VoteMsgTextCoderDelimFramer即可。

TCP版本Server

下面我们示范TCP版本的投票服务器。该服务器反复地接收新的客户端连接, 并使用VoteService类为客户端的投票消息作出响应。

public class VoteServerTCP {

	public static void main(String args[]) throws Exception {

		if (args.length != 1) { // Test for correct # of args
			throw new IllegalArgumentException("Parameter(s): <Port>");
		}

		int port = Integer.parseInt(args[0]); // Receiving Port

		ServerSocket servSock = new ServerSocket(port);
		

		// 为服务器端建立编码器和投票服务
		// Change Bin to Text on both client and server
		// for different encoding
		VoteMsgCoder coder = new VoteMsgBinCoder();
		VoteService service = new VoteService();


		// 循环,接收和处理客户端连接
		while (true) {
			Socket clntSock = servSock.accept();
			System.out.println("Handling client at "
					+ clntSock.getRemoteSocketAddress());
					
			// Change Length to Delim for a different framing strategy
			// 为客户端创建成帧器:第28行 
			Framer framer = new LengthFramer(clntSock.getInputStream());
			try {
				//	从客户端获取消息并对其解码
				byte[] req;
				// 反复地向成帧器发送获取下一条消息的请求,
				// 直到其返回null,即指示了消息的结束。 
				while ((req = framer.nextMsg()) != null) {
					System.out.println("Received message (" + req.length + " bytes)");
					VoteMsg responseMsg = service.handleRequest(coder.fromWire(req));
					framer.frameMsg(coder.toWire(responseMsg), clntSock.getOutputStream());
				}
			} catch (IOException ioe) {
				System.err.println("Error handling client: " + ioe.getMessage());
			} finally {
				System.out.println("Closing connection");
				clntSock.close();
			}
		}
	}
}

UDP版本的Client

UDP版本的投票客户端与TCP版本非常相似。在UDP客户端中我们不需要使用成帧器, UDP协议为我们维护了消息的边界信息。对于UDP协议使用基于文本的编码方式对消息进行 编码,不过只要客户端与服务器能达成一致,也能够很方便地改成其他编码方式。

public class VoteClientUDP {

	public static void main(String args[]) throws IOException {

		if (args.length != 3) { // Test for correct # of args
			throw new IllegalArgumentException( //
					"Parameter(s): <Destination>" + " <Port> <Candidate#>");
		}


		// Destination addr
		InetAddress destAddr = InetAddress.getByName(args[0]);
		// Destination port
		int destPort = Integer.parseInt(args[1]);
		// 0 <= candidate <= 1000 req'd
		int candidate = Integer.parseInt(args[2]);

		// UDP socket for sending
		// 设置DatagramSocket 和连接:
		// 通过调用connect()方法,我们不必
		// * 为发送的每个数据报文指定远程地址和端口,
		// * 也不必测试接收到的每个数据报文的源地址。 
		DatagramSocket sock = new DatagramSocket();
		sock.connect(destAddr, destPort);
		

		// 这次使用的是文本编码器,但我们也可以很容易地换成二进制编码器。
		// 注意这里我们不需要成帧器,因为只要每次发送都只有一个投票消息,
		// UDP协议就已经为我们保留了边界信息。 

		// Create a voting message (2nd param false = vote)
		VoteMsg vote = new VoteMsg(false, false, candidate, 0);

		// Change Text to Bin here for a different coding strategy
		VoteMsgCoder coder = new VoteMsgTextCoder();

		// Send request
		byte[] encodedVote = coder.toWire(vote);
		System.out.println("Sending Text-Encoded Request ("
				+ encodedVote.length + " bytes): ");
		System.out.println(vote);
		DatagramPacket message = new DatagramPacket(encodedVote,encodedVote.length);
		sock.send(message);

		// 接收解码和打印服务器响应信息:第36-45行 
		// Receive response
		//
		// 在创建DatagramPacket时,我们需要知道消息的最大长度,以避免数据被截断。
		// 当然,在对数据报文进行解码时,我们只使用数据报文中包含的实际字节,
		// 因此调用了`Arrays.copyOfRange()`方法来复制返回的数据报文中数组的子序列。 
		message = new DatagramPacket(
				new byte[VoteMsgTextCoder.MAX_WIRE_LENGTH],
				VoteMsgTextCoder.MAX_WIRE_LENGTH);
		sock.receive(message);
		encodedVote = Arrays.copyOfRange(message.getData(), 0, message.getLength());

		System.out.println("Received Text-Encoded Response ("
				+ encodedVote.length + " bytes): ");
		vote = coder.fromWire(encodedVote);
		System.out.println(vote);
	}
}

UDP版Server

最后是UDP投票服务器,同样,也与TCP版本非常相似。

public class VoteServerUDP {

	public static void main(String[] args) throws IOException {

		if (args.length != 1) { // Test for correct # of args
			throw new IllegalArgumentException("Parameter(s): <Port>");
		}

		int port = Integer.parseInt(args[0]); // Receiving Port

		DatagramSocket sock = new DatagramSocket(port); // Receive socket

		//	为服务器创建接收缓存区,编码器,以及投票服务。 
		byte[] inBuffer = new byte[VoteMsgTextCoder.MAX_WIRE_LENGTH];
		// Change Bin to Text for a different coding approach
		VoteMsgCoder coder = new VoteMsgTextCoder();
		VoteService service = new VoteService();

		// 循环接收和处理客户端的投票消息:
		while (true) {
			// 为接收数据报文创建DatagramPacket:
			// 在每次迭代中将数据区重置为输入缓存区。 
			DatagramPacket packet = new DatagramPacket(inBuffer, inBuffer.length);
			// 接收数据报文,抽取数据:
			sock.receive(packet);
			byte[] encodedMsg = Arrays.copyOfRange(packet.getData(), 0,
					packet.getLength());
			System.out.println("Handling request from "
					+ packet.getSocketAddress() + " (" + encodedMsg.length
					+ " bytes)");

			// 解码和处理请求
			try {
				VoteMsg msg = coder.fromWire(encodedMsg);
				msg = service.handleRequest(msg);
				// 服务将响应返回给消息。 
				// 编码并发送响应消息:
				packet.setData(coder.toWire(msg));
				System.out.println("Sending response (" + packet.getLength()
						+ " bytes):");
				System.out.println(msg);
				sock.send(packet);
			} catch (IOException ioe) {
				System.err.println("Parse error in message: "
						+ ioe.getMessage());
			}
		}
	}
}