前一段时间需要选择一个Java RPC库,看了Avro,Thrift,Protocol Buffer等,感觉使用起来都太复杂,最后遇到了kryonet,试了试,确实使用简单,拥有简洁的API,实现一个RPC,只需要几行代码,并且是纯Java开发,轻量级,代码量小,没有多余的复杂功能,与kryo集成。借此机会,通过这篇文章对kryonet进行了简单的分析。

以下的代码去掉了日志记录代码和处理UDP协议的代码 ##Listener机制 kryonet通过为客户端和服务器端添加Listener来处理接收到的数据。Client端和Server都有addListener方法来添加自定义的Listener对象,Listener类的代码如下:

/** 用于通知连接事件 */
public class Listener {
	/**当连接建立时,这个方法会被调用。这个方法会在任何对象被接受之前调用。
	由于这个方法在Client.update(int)和Server.update(int)这两个线程中
	调用,所以不应该长时间阻塞**/
	public void connected (Connection connection) {
	}

	/**当远程连接断开时,被调用。这个方法不保证一定会被调用**/
	public void disconnected (Connection connection) {
	}

	 /**当从远程连接中接收了数据时,这个方法会被调用,同样在Client.update(int)和Server.update(int)这两个线程中
	调用,所以不应该长时间阻塞**/
	public void received (Connection connection, Object object) {
	}

	/** Called when the connection is below the {@link Connection#setIdleThreshold(float) idle threshold}. */
	/**当Connection的idle阈值低于Connection.setIdleThreshold(float)时会
	被调用**/
	public void idle (Connection connection) {
	}
}

由上面的方法可以看到,Listener机制主要应用于连接建立与断开和接收到数据时的处理,在kryonet中,调用Server或者Client的addListener方法添加自定义的Listener对象,每次连接建立、断开或接收到数据时,调用每个Listener的相应方法处理。 kryonet默认提供了ReflectionListenerQueuedListenerThreadedListenerLagListener等实现。

##服务器端 在kryonet中,不管是启动服务器端还是客户端,都要先调用start()方法,然后服务端调用bind方法,客户端调用connect方法。服务端调用start方法后,就启动了一个服务线程,这个线程,run方法如下:

public void run () {
		shutdown = false;
		while (!shutdown) {
			try {
				update(250);
			} catch (IOException ex) {
				close();
			}
		}
}

从上面的代码可以看到,服务端不断的调用update方法来处理客户端请求,在update方法中接收客户端的连接请求,从socket中读写数据。这三种操作都是常规的Java NIO操作。 处理客户端连接请求部分的代码如下:

if ((ops & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
  ServerSocketChannel serverChannel = this.serverChannel;
  if (serverChannel == null) continue;
  try {
    SocketChannel socketChannel = serverChannel.accept();
    if (socketChannel != null) acceptOperation(socketChannel);
  } catch (IOException ex) {
    if (DEBUG) debug("kryonet", "Unable to accept new connection.", ex);
  }
  continue;
}

主要调用Server.acceptOperation()方法进行处理接收连接请求的处理,代码如下:

private void acceptOperation (SocketChannel socketChannel) {
		Connection connection = newConnection();//创建一个Connection
		connection.initialize(serialization, writeBufferSize, objectBufferSize);
		connection.endPoint = this;
		UdpConnection udp = this.udp;
		if (udp != null) connection.udp = udp;
		try {
			SelectionKey selectionKey = connection.tcp.accept(selector, socketChannel);
			selectionKey.attach(connection);

			int id = nextConnectionID++;
			if (nextConnectionID == -1) nextConnectionID = 1;
			connection.id = id;
			connection.setConnected(true);
			connection.addListener(dispatchListener);

			if (udp == null)
				addConnection(connection);
			else
				pendingConnections.put(id, connection);
			//客户端connect成功之后,就向客户端发送一个RegisterTCP的包
			RegisterTCP registerConnection = new RegisterTCP();
			registerConnection.connectionID = id;
			connection.sendTCP(registerConnection);

			if (udp == null) connection.notifyConnected();
		} catch (IOException ex) {
			connection.close();
		}
	}

acceptOperation方法中,首先创建了一个对应的连接,并进行相应的初始化,如初始化读写缓存大小,分配连接ID号等等,其中有一个重要的操作是connection.addListener(dispatchListener),这行代码将Server中所有的Listener对象与该连接关联,这样,对于该连接的所有操作,都可以通过这些Listener对象进行处理。待连接建立完成后,服务端就向客户端发送一个RegisterTCP对象所表示的数据包,通知客户端连接建立成功,连接ID是connectionID

处理向客户端写数据的操作部分代码如下:

//Write操作
if ((ops & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
  try {
    fromConnection.tcp.writeOperation();
  } catch (IOException ex) {
    fromConnection.close();
  }
}

即直接向客户端发送数据,在Connection对象中,有读缓存和写缓存,和TcpConnection对象,TcpConnection类抽象了TCP连接,发送数据和接收数据都是通过TcpConnection类完成的。

处理读取客户端数据的操作部分代码如下:

//read操作
if ((ops & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
  try {
    while (true) {
      Object object = fromConnection.tcp.readObject(fromConnection);
      if (object == null) break;
      fromConnection.notifyReceived(object);
    }
  } catch (IOException ex) {
    fromConnection.close();
  } catch (KryoNetException ex) {
    fromConnection.close();
  }
}

首先读取出数据对象,然后调用Connection.notifyReceived()来处理读取到的数据,notifyReceived方法代码如下:

void notifyReceived (Object object) {
  if (object instanceof Ping) {
    Ping ping = (Ping)object;
    if (ping.isReply) {
      if (ping.id == lastPingID - 1) {
        returnTripTime = (int)(System.currentTimeMillis() - lastPingSendTime);
      }
    } else {
      ping.isReply = true;
      sendTCP(ping);
    }
  }
  Listener[] listeners = this.listeners;
  for (int i = 0, n = listeners.length; i < n; i++)
    listeners[i].received(this, object);
}

如果是要给Ping请求消息,那么就回复一个Ping回复消息,如果是一个Ping回复消息,那么就记录Ping消息往返的时间,如果是其他消息,那么就调用与当前Connection关联的所有的Listener进行处理。

##客户端 Client类表示客户端,与服务器端Server类似,Client对象创建以后,Client要执行Client.start()方法和Client.connect()方法,并且start方法要先于connect方法执行,否则会客户会失败。 客户端在建立连接时,在connect方法中向服务端发送一个tcp连接请求,服务端在接受到这个连接请求后就会向客户端返回一个RegisterTCP对象,包含了连接的ID,而接收数据包的操作在另一个线程中执行,Client是通过start方法启动这个线程的。即先启动读写Socket的线程,再向Server发送连接请求,这样才能正确建立KryoNet的连接。

客户端的读写Socket线程用于周期性的从服务器端读数据或想服务器端写数据,由于客户端不涉及到接收请求,所以在这个线程中只有对Socket的读写操作,这个处理过程也是常规的Java NIO操作,向服务器端写数据时,直接调用TcpConnection.writeOperation()方法,从服务器端读数据时,读到数据对象后,如果是RegisterTCP对象,则调用Connection.notifyConnected()方法,否则调用Connection.notifyReceived()方法。 从服务器端读数据的部分代码如下:

if ((ops & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
  if (selectionKey.attachment() == tcp) {
    while (true) {
      Object object = tcp.readObject(this);
      if (object == null) break;
      if (!tcpRegistered) {
        if (object instanceof RegisterTCP) {
          id = ((RegisterTCP)object).connectionID;
          synchronized (tcpRegistrationLock) {
            tcpRegistered = true;
            tcpRegistrationLock.notifyAll();
            if (udp == null) setConnected(true);
          }
          if (udp == null) notifyConnected();
        }
        continue;
      }
      if (!isConnected) continue;
      notifyReceived(object);
    }
  }
}

notifyReceivednotifyConnected方法中都是调用了注册的Listener来处理读取到的数据。

##keepalive kryonet中有一种keepalive机制来检测tcp双方的连接是否正常,在TcpConnection类中方法needsKeepAlive用于判断是否需要发送keepalive数据包。 Client.keepAlive()方法和Server.keepAlive() 分别在各自的update(int)方法中被调用,实现若需要保持keeplive就发送一个空包FrameworkMessage.KeepAlive来检测是否正常,有keepAliveMillistimeoutMillis属性分别设置keepalive时间和timeout时间,timeout时间大于keepalive时间。TcpConnection.needsKeepAlive()用于判断是否需要发送keeplive包。

public boolean needsKeepAlive (long time) {
		//没有超时
		return socketChannel != null && keepAliveMillis > 0 && time - lastWriteTime > keepAliveMillis;//当前时间距离上次向Socket写数据的时间大于keepAliveMillis
	}

在客户端的update(int)方法的最后会检查此次调用是否超时,如果超时,会关闭连接:

if (isConnected) {
	long time = System.currentTimeMillis();
	if (tcp.isTimedOut(time)) {
		if (DEBUG) debug("kryonet", this + " timed out.");
		info("kryonet", this + " timed out.");
		close();//在这里关闭连接
	} else
		keepAlive();
	if (isIdle()) notifyIdle();
}

在服务器端的update(int)方法的最后也会检测所有的连接是否超时,如果有超时的连接,就关闭,如果连接没超时就根据需要发送Keeplive,代码如下:

long time = System.currentTimeMillis();
	Connection[] connections = this.connections;
	for (int i = 0, n = connections.length; i < n; i++) {
		Connection connection = connections[i];
		if (connection.tcp.isTimedOut(time)) {
			connection.close();
		} else {
			if (connection.tcp.needsKeepAlive(time)) connection.sendTCP(FrameworkMessage.keepAlive);
		}
		if (connection.isIdle()) connection.notifyIdle();
}

##RPC 当使用kryonet做RPC时,kryonet会要求用户针对每个Connection创建一个ObjectSpace对象,同时需要调用ObjectSpace.addConnection()方法将该Connection对象添加到Connection集合,在addConnection中,会默认为该Connection对象注册一个名为invokeListener的Listener对象,这个对象作用是在服务端执行真正的RPC方法,它是一个ObjectSpace类的匿名类,代码如下:

private final Listener invokeListener = new Listener() {
  public void received (final Connection connection, Object object) {
    if (!(object instanceof InvokeMethod)) return;
    if (connections != null) {
      int i = 0, n = connections.length;
      for (; i < n; i++)
        if (connection == connections[i]) break;
      if (i == n) return; // The InvokeMethod message is not for a connection in this ObjectSpace.
    }
    final InvokeMethod invokeMethod = (InvokeMethod)object;
    final Object target = idToObject.get(invokeMethod.objectID);
    if (target == null) {
      if (WARN) warn("kryonet", "Ignoring remote invocation request for unknown object ID: " + invokeMethod.objectID);
      return;
    }
    if (executor == null)
      invoke(connection, target, invokeMethod);
    else {
      executor.execute(new Runnable() {
        public void run () {
          invoke(connection, target, invokeMethod);
        }
      });
    }
}

该匿名类重写了Listener的received()方法,在服务器端接收到客户端的调用后,就使用反射调用目标方法,从上面的代码中可以看到,如果executor为空,则在当前线程中调用,否则,就在通过该连接池启动一个新的线程调用,由于received()方法在服务器端的update(int)方法执行的线程中调用,所以为了其他RPC方法的调用不超时,最好的方式时在创建ObjectSpace对象时,为其传递一个Executor实例。 RPC客户端则通过Java动态代理机制来实现,主要代码在ObjectSpace.getRemoteObject()方法和RemoteInvocationHandler类中

##总结 kryonet确实是一个简洁的Java网络库,通过Listener机制,kryonet能够很方便的进行扩展,例如通过实现一个Listener.received()松实现了RPC功能。其TCP和UDP通信建立在Java NIO基础之上,比较适合简单的网络高效通信,如果是实现一个简单的RPC功能,确实是一个不错的选择。