kryonet源码分析
前一段时间需要选择一个Java RPC库,看了Avro,Thrift,Protocol Buffer等,感觉使用起来都太复杂,最后遇到了kryonet,试了试,确实使用简单,拥有简洁的API,实现一个RPC,只需要几行代码,并且是纯Java开发,轻量级,代码量小,没有多余的复杂功能,与kryo集成。借此机会,通过这篇文章对kryonet进行了简单的分析。
以下的代码去掉了日志记录代码和处理UDP协议的代码
##Listener机制
kryonet通过为客户端和服务器端添加Listener来处理接收到的数据。Client端和Server都有addListener
方法来添加自定义的Listener
对象,Listener类的代码如下:
/** 用于通知连接事件 */
public class Listener {
/**当连接建立时,这个方法会被调用。这个方法会在任何对象被接受之前调用。
由于这个方法在Client.update(int)和Server.update(int)这两个线程中
调用,所以不应该长时间阻塞**/
public void connected (Connection connection) {
}
/**当远程连接断开时,被调用。这个方法不保证一定会被调用**/
public void disconnected (Connection connection) {
}
/**当从远程连接中接收了数据时,这个方法会被调用,同样在Client.update(int)和Server.update(int)这两个线程中
调用,所以不应该长时间阻塞**/
public void received (Connection connection, Object object) {
}
/** Called when the connection is below the {@link Connection#setIdleThreshold(float) idle threshold}. */
/**当Connection的idle阈值低于Connection.setIdleThreshold(float)时会
被调用**/
public void idle (Connection connection) {
}
}
由上面的方法可以看到,Listener机制主要应用于连接建立与断开和接收到数据时的处理,在kryonet中,调用Server或者Client的addListener方法添加自定义的Listener对象,每次连接建立、断开或接收到数据时,调用每个Listener的相应方法处理。
kryonet默认提供了ReflectionListener
,QueuedListener
,ThreadedListener
,LagListener
等实现。
##服务器端 在kryonet中,不管是启动服务器端还是客户端,都要先调用start()方法,然后服务端调用bind方法,客户端调用connect方法。服务端调用start方法后,就启动了一个服务线程,这个线程,run方法如下:
public void run () {
shutdown = false;
while (!shutdown) {
try {
update(250);
} catch (IOException ex) {
close();
}
}
}
从上面的代码可以看到,服务端不断的调用update方法来处理客户端请求,在update方法中接收客户端的连接请求,从socket中读写数据。这三种操作都是常规的Java NIO操作。 处理客户端连接请求部分的代码如下:
if ((ops & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
ServerSocketChannel serverChannel = this.serverChannel;
if (serverChannel == null) continue;
try {
SocketChannel socketChannel = serverChannel.accept();
if (socketChannel != null) acceptOperation(socketChannel);
} catch (IOException ex) {
if (DEBUG) debug("kryonet", "Unable to accept new connection.", ex);
}
continue;
}
主要调用Server.acceptOperation()
方法进行处理接收连接请求的处理,代码如下:
private void acceptOperation (SocketChannel socketChannel) {
Connection connection = newConnection();//创建一个Connection
connection.initialize(serialization, writeBufferSize, objectBufferSize);
connection.endPoint = this;
UdpConnection udp = this.udp;
if (udp != null) connection.udp = udp;
try {
SelectionKey selectionKey = connection.tcp.accept(selector, socketChannel);
selectionKey.attach(connection);
int id = nextConnectionID++;
if (nextConnectionID == -1) nextConnectionID = 1;
connection.id = id;
connection.setConnected(true);
connection.addListener(dispatchListener);
if (udp == null)
addConnection(connection);
else
pendingConnections.put(id, connection);
//客户端connect成功之后,就向客户端发送一个RegisterTCP的包
RegisterTCP registerConnection = new RegisterTCP();
registerConnection.connectionID = id;
connection.sendTCP(registerConnection);
if (udp == null) connection.notifyConnected();
} catch (IOException ex) {
connection.close();
}
}
在acceptOperation
方法中,首先创建了一个对应的连接,并进行相应的初始化,如初始化读写缓存大小,分配连接ID号等等,其中有一个重要的操作是connection.addListener(dispatchListener)
,这行代码将Server中所有的Listener
对象与该连接关联,这样,对于该连接的所有操作,都可以通过这些Listener
对象进行处理。待连接建立完成后,服务端就向客户端发送一个RegisterTCP
对象所表示的数据包,通知客户端连接建立成功,连接ID是connectionID
。
处理向客户端写数据的操作部分代码如下:
//Write操作
if ((ops & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
try {
fromConnection.tcp.writeOperation();
} catch (IOException ex) {
fromConnection.close();
}
}
即直接向客户端发送数据,在Connection对象中,有读缓存和写缓存,和TcpConnection对象,TcpConnection类抽象了TCP连接,发送数据和接收数据都是通过TcpConnection类完成的。
处理读取客户端数据的操作部分代码如下:
//read操作
if ((ops & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
try {
while (true) {
Object object = fromConnection.tcp.readObject(fromConnection);
if (object == null) break;
fromConnection.notifyReceived(object);
}
} catch (IOException ex) {
fromConnection.close();
} catch (KryoNetException ex) {
fromConnection.close();
}
}
首先读取出数据对象,然后调用Connection.notifyReceived()
来处理读取到的数据,notifyReceived
方法代码如下:
void notifyReceived (Object object) {
if (object instanceof Ping) {
Ping ping = (Ping)object;
if (ping.isReply) {
if (ping.id == lastPingID - 1) {
returnTripTime = (int)(System.currentTimeMillis() - lastPingSendTime);
}
} else {
ping.isReply = true;
sendTCP(ping);
}
}
Listener[] listeners = this.listeners;
for (int i = 0, n = listeners.length; i < n; i++)
listeners[i].received(this, object);
}
如果是要给Ping请求消息,那么就回复一个Ping回复消息,如果是一个Ping回复消息,那么就记录Ping消息往返的时间,如果是其他消息,那么就调用与当前Connection关联的所有的Listener进行处理。
##客户端
Client类表示客户端,与服务器端Server类似,Client对象创建以后,Client要执行Client.start()方法和Client.connect()方法,并且start方法要先于connect方法执行,否则会客户会失败。
客户端在建立连接时,在connect方法中向服务端发送一个tcp连接请求,服务端在接受到这个连接请求后就会向客户端返回一个RegisterTCP
对象,包含了连接的ID,而接收数据包的操作在另一个线程中执行,Client是通过start方法启动这个线程的。即先启动读写Socket的线程,再向Server发送连接请求,这样才能正确建立KryoNet的连接。
客户端的读写Socket线程用于周期性的从服务器端读数据或想服务器端写数据,由于客户端不涉及到接收请求,所以在这个线程中只有对Socket的读写操作,这个处理过程也是常规的Java NIO操作,向服务器端写数据时,直接调用TcpConnection.writeOperation()
方法,从服务器端读数据时,读到数据对象后,如果是RegisterTCP
对象,则调用Connection.notifyConnected()
方法,否则调用Connection.notifyReceived()
方法。
从服务器端读数据的部分代码如下:
if ((ops & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
if (selectionKey.attachment() == tcp) {
while (true) {
Object object = tcp.readObject(this);
if (object == null) break;
if (!tcpRegistered) {
if (object instanceof RegisterTCP) {
id = ((RegisterTCP)object).connectionID;
synchronized (tcpRegistrationLock) {
tcpRegistered = true;
tcpRegistrationLock.notifyAll();
if (udp == null) setConnected(true);
}
if (udp == null) notifyConnected();
}
continue;
}
if (!isConnected) continue;
notifyReceived(object);
}
}
}
在notifyReceived
和notifyConnected
方法中都是调用了注册的Listener
来处理读取到的数据。
##keepalive
kryonet中有一种keepalive机制来检测tcp双方的连接是否正常,在TcpConnection类中方法needsKeepAlive
用于判断是否需要发送keepalive数据包。
Client.keepAlive()
方法和Server.keepAlive()
分别在各自的update(int)
方法中被调用,实现若需要保持keeplive就发送一个空包FrameworkMessage.KeepAlive
来检测是否正常,有keepAliveMillis
和timeoutMillis
属性分别设置keepalive时间和timeout时间,timeout时间大于keepalive时间。TcpConnection.needsKeepAlive()
用于判断是否需要发送keeplive包。
public boolean needsKeepAlive (long time) {
//没有超时
return socketChannel != null && keepAliveMillis > 0 && time - lastWriteTime > keepAliveMillis;//当前时间距离上次向Socket写数据的时间大于keepAliveMillis
}
在客户端的update(int)
方法的最后会检查此次调用是否超时,如果超时,会关闭连接:
if (isConnected) {
long time = System.currentTimeMillis();
if (tcp.isTimedOut(time)) {
if (DEBUG) debug("kryonet", this + " timed out.");
info("kryonet", this + " timed out.");
close();//在这里关闭连接
} else
keepAlive();
if (isIdle()) notifyIdle();
}
在服务器端的update(int)
方法的最后也会检测所有的连接是否超时,如果有超时的连接,就关闭,如果连接没超时就根据需要发送Keeplive,代码如下:
long time = System.currentTimeMillis();
Connection[] connections = this.connections;
for (int i = 0, n = connections.length; i < n; i++) {
Connection connection = connections[i];
if (connection.tcp.isTimedOut(time)) {
connection.close();
} else {
if (connection.tcp.needsKeepAlive(time)) connection.sendTCP(FrameworkMessage.keepAlive);
}
if (connection.isIdle()) connection.notifyIdle();
}
##RPC
当使用kryonet做RPC时,kryonet会要求用户针对每个Connection创建一个ObjectSpace对象,同时需要调用ObjectSpace.addConnection()
方法将该Connection对象添加到Connection集合,在addConnection中,会默认为该Connection对象注册一个名为invokeListener
的Listener对象,这个对象作用是在服务端执行真正的RPC方法,它是一个ObjectSpace类的匿名类,代码如下:
private final Listener invokeListener = new Listener() {
public void received (final Connection connection, Object object) {
if (!(object instanceof InvokeMethod)) return;
if (connections != null) {
int i = 0, n = connections.length;
for (; i < n; i++)
if (connection == connections[i]) break;
if (i == n) return; // The InvokeMethod message is not for a connection in this ObjectSpace.
}
final InvokeMethod invokeMethod = (InvokeMethod)object;
final Object target = idToObject.get(invokeMethod.objectID);
if (target == null) {
if (WARN) warn("kryonet", "Ignoring remote invocation request for unknown object ID: " + invokeMethod.objectID);
return;
}
if (executor == null)
invoke(connection, target, invokeMethod);
else {
executor.execute(new Runnable() {
public void run () {
invoke(connection, target, invokeMethod);
}
});
}
}
该匿名类重写了Listener的received()
方法,在服务器端接收到客户端的调用后,就使用反射调用目标方法,从上面的代码中可以看到,如果executor为空,则在当前线程中调用,否则,就在通过该连接池启动一个新的线程调用,由于received()
方法在服务器端的update(int)
方法执行的线程中调用,所以为了其他RPC方法的调用不超时,最好的方式时在创建ObjectSpace对象时,为其传递一个Executor实例。
RPC客户端则通过Java动态代理机制来实现,主要代码在ObjectSpace.getRemoteObject()
方法和RemoteInvocationHandler
类中
##总结
kryonet确实是一个简洁的Java网络库,通过Listener机制,kryonet能够很方便的进行扩展,例如通过实现一个Listener.received()
松实现了RPC功能。其TCP和UDP通信建立在Java NIO基础之上,比较适合简单的网络高效通信,如果是实现一个简单的RPC功能,确实是一个不错的选择。