Minecraft at Scale: Cross-Server Communication

Jul 7, 2017   #bukkit  #java 

Intro

I’ve written a lot of “Cores” for Minecraft servers in the past. These monolithic messes are the beating heart of your average Minecraft network, handling everything from player teleportation between servers and cross-server communication to player metadata, metrics, and sometimes general persistence abstractions.

Servers need to communicate with each other for a number of different things, such as:

  • Transporting players from server to server
  • Sending messages to players
  • Kicking players from the network as a whole
  • Notifying a server of an incoming player’s desire to play a specific game, or enter a specific area immediately upon join

With a well-defined domain, this isn’t too hard to manage, but the real problem starts when people write plugins that also need to do cross-node communication of their own.

Although it’s not a particularly complex idea, I think my most recent stab at the problem stumbled upon a solution that will satisfy external plugins for a long while.

Services: an example

The solution that we arrived at was something we called “Services.”

Services are sets of methods that a node can “declare” to implement. A lobby server can therefore, given a reference to another lobby server, ask for a specific “Service” from its target, and communicate using whatever language the two plugins desire.

PlayerService

This is an example of a PlayerService, which is used to message players and kick them on another server.

package sh.joey.core.services.player;

import sh.joey.core.player.CPlayer;
import sh.joey.core.services.RpcCall;
import sh.joey.core.services.Service;

public interface PlayerService extends Service {
    RpcCall<Void> messagePlayer(CPlayer target, String message);
    RpcCall<Void> kickPlayer(CPlayer target, String message);
}

Neat, right?

Here’s the real question, though: how do you actually implement such an interface?

BukkitPlayerService

To keep things simple, I’ll show the BukkitPlayerService implementing this service:

package sh.joey.core.bukkit.services;

import sh.joey.core.player.CPlayer;
import sh.joey.core.services.RpcCall;
import sh.joey.core.services.player.PlayerService;
import org.bukkit.Bukkit;
import org.bukkit.entity.Player;

public final class BukkitPlayerService implements PlayerService {
    @Override
    public RpcCall<Void> messagePlayer(CPlayer target, String message) {
        resolvePlayer(target).sendMessage(message);
        return RpcCall.ok();
    }

    @Override
    public RpcCall<Void> kickPlayer(CPlayer target, String message) {
        resolvePlayer(target).kickPlayer(message);
        return RpcCall.ok();
    }

    private static Player resolvePlayer(CPlayer from) {
        Player player = Bukkit.getPlayer(from.getUuid());
        if (player == null)
            throw new IllegalArgumentException("player is not on this server");

        return player;
    }
}

Registering a service implementation

A node clearly has a need to “declare” itself an implementor of a service. When the server starts, a plugin providing “parties” functionality will register its presence on the server so that parties plugins on other servers can message it. You can do something as trivial as:

selfNode.implementService(PartyService.class, new PartyServiceImpl());
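What implementService does internally is not shown in this article, so the following is only a minimal sketch of the bookkeeping it might do, assuming a plain map from service interface to implementation. The names Service (as a stand-in for the real type), GreetingService, and LocalServiceRegistry are hypothetical and exist only for this illustration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for the article's Service marker interface (assumption).
interface Service {}

// Hypothetical example service, not part of the original code base.
interface GreetingService extends Service {
    String greet(String name);
}

// Sketch of a per-node registry: one implementation per service interface.
final class LocalServiceRegistry {
    private final Map<Class<? extends Service>, Service> implementations =
            new ConcurrentHashMap<>();

    <T extends Service> void implementService(Class<T> type, T implementation) {
        if (!type.isInterface()) {
            throw new IllegalArgumentException(type.getName() + " is not an interface");
        }
        // Refuse to silently replace an implementation that is already registered.
        if (implementations.putIfAbsent(type, implementation) != null) {
            throw new IllegalStateException(type.getName() + " is already implemented here");
        }
    }

    <T extends Service> Optional<T> localImplementation(Class<T> type) {
        // Class.cast passes null through, so a missing entry becomes an empty Optional.
        return Optional.ofNullable(type.cast(implementations.get(type)));
    }
}
```

Keying the map by the interface rather than the implementation class means the remote side only ever needs to know the interface name, which is also what the call message carries later on.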

Using the service remotely

Let’s say you’d like to send a message to a player, then wait for 3 seconds, and kick them from the server. Let’s also say, for the sake of a beautiful contrived example, that you want to do that all remotely to a player on a different server.

Using some simple RxJava operators, we can implement this trivially.

public static Single<Void> warnPlayerKick(Node node, CPlayer target, String msg) {
  PlayerService service = node.getService(PlayerService.class);

  return service.messagePlayer(target, msg)
             .fire()
             .delay(3, TimeUnit.SECONDS, syncScheduler)
             .flatMap(ignored -> service.kickPlayer(target, msg).fire());
}

Implementation

There are three main parts to an implementation of such a system.

  • Transport
  • Encoding
  • Java-fu (abusing proxies)

Transport: AMQP

My actual implementation of this sort of system is nearly identical to the code samples provided, but those samples obscure a pretty important detail: how the actual RPC calls take place.

I’ve often read that inventing your own RPC is normally not a very good idea, but I’ve read just as much on the specifics of implementing a simple RPC system. In the actual implementation, I did end up writing my own RPC system of sorts, but with AMQP backing it.

I specifically chose AMQP to do my messaging because I trusted it to deliver the message to the target node more often than a publish/subscribe system based on Redis. Using a queue to do my RPC calls guarantees that the code on the “other end” at least checks the message. AMQP goes a step further with a very nice NACK/ACK system so that I can requeue messages automatically if there’s a failure on the receiver end.
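To make the ACK/NACK idea concrete, here is a small in-memory sketch of requeue-on-failure. It is not an AMQP client and uses no broker; RequeueingQueue and its methods are hypothetical names, and the handler returning true or false stands in for acking or nacking a delivery.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// In-memory stand-in for a broker queue (assumption, for illustration only).
final class RequeueingQueue {
    private final Deque<String> messages = new ArrayDeque<>();

    void publish(String message) {
        messages.addLast(message);
    }

    // Delivers messages until the queue is empty or maxDeliveries is reached.
    // The handler returns true to ack and false to nack with requeue.
    int drain(Predicate<String> handler, int maxDeliveries) {
        int deliveries = 0;
        while (!messages.isEmpty() && deliveries < maxDeliveries) {
            String message = messages.pollFirst();
            deliveries++;
            if (!handler.test(message)) {
                // nack: the message goes back on the queue for another attempt
                messages.addLast(message);
            }
        }
        return deliveries;
    }

    int size() {
        return messages.size();
    }
}
```

The delivery cap is there because a message that always fails would otherwise be redelivered forever, which is worth keeping in mind with requeueing on a real broker as well.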

Encoding: GSON

There’s clearly no need for a section on the magic of GSON. If you’re unfamiliar with the GSON library, which is used for JSON encoding, its documentation is the place to read more.

The actual structure of the messages is quite simple to implement. The first thing I implemented was the message format for the “call.”

package sh.joey.core.rpc.mq;

import com.google.gson.JsonElement;
import java.util.List;

final class MqRpcMethodCall {
    private String id;
    private String fromId;
    private String serviceFQCN;
    private String methodName;
    private List<JsonElement> parameters;
    private List<String> parameterTypes;

    ...
}

The other part of this equation is the format of replies:

package sh.joey.core.rpc.mq;

import com.google.gson.Gson;
import com.google.gson.JsonElement;

final class MqRpcReply {
    public static final String RPC_ERROR = "err",
                               RPC_REPLY = "reply";

    private String id;
    private String from;
    private JsonElement value;
    private String valueType;
    private String type; //"err" or "reply" from above

    ...
}

Java Proxies, making the RPC calls

Okay, so now we have the format defined, and it looks like it can encapsulate all the data we’re interested in. The next step is to actually make method calls to an instance of an interface cause the messages to get queued.

This can be achieved by a mechanism called dynamic proxies, or sometimes just “proxies.”
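If you have not used java.lang.reflect.Proxy before, this is a minimal sketch of the mechanism on its own, with no messaging involved. EchoService and RecordingProxyDemo are hypothetical names for this illustration; the handler just records each call as a string instead of sending it anywhere.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical service interface used only for this illustration.
interface EchoService {
    String echo(String text, int times);
}

final class RecordingProxyDemo {
    // Calls made through the proxy are described here instead of being executed.
    static final List<String> recordedCalls = new ArrayList<>();

    @SuppressWarnings("unchecked")
    static <T> T recordingProxy(Class<T> type) {
        InvocationHandler handler = (proxy, method, args) -> {
            // toString, hashCode and equals are routed to the handler too,
            // so answer them locally rather than treating them as remote calls.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "toString": return "proxy for " + type.getName();
                    case "hashCode": return System.identityHashCode(proxy);
                    case "equals": return proxy == args[0];
                    default: throw new UnsupportedOperationException(method.getName());
                }
            }
            // args is null (not an empty array) for zero-argument methods.
            Object[] safeArgs = args == null ? new Object[0] : args;
            recordedCalls.add(type.getSimpleName() + "#" + method.getName()
                    + Arrays.toString(safeArgs));
            // null is fine for reference return types; a primitive return type would fail here.
            return null;
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler);
    }
}
```

Calling recordingProxy(EchoService.class).echo("hi", 2) runs the handler rather than any real implementation. That is the hook an RPC layer needs: the handler sees the Method and its arguments, which is everything required to describe the call to another node.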

To accomplish this, we’ll need a few things:

  • We need to create a dynamic proxy to accept calls to methods
  • After sending messages, we’ll need to represent a “subscription” to the reply somehow
  • We’ll need to listen to our “reply” queue and notify the caller once we get a relevant message in that queue

For context, here are some fields on the class we’ll be building throughout this section of the article.

@Inject private MessageQueue messageQueue;
@Inject private Gson gson;
@Inject private NodeSelf nodeSelf;
@Inject @Named("net") private Scheduler scheduler;
@Inject private ServiceExceptionHandler exceptionHandler;

private final Map<String, SingleSubscriber<?>> pendingRpcSubscribers = new ConcurrentHashMap<>();

Here’s the first bit of code, constructing the dynamic proxy:

@SuppressWarnings("unchecked")
@Override
public <T> T getService(Class<T> type, String nodeId) {
    return (T) Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class[]{type},
                (proxy, method, args) -> prepareRpcCall(nodeId, type, method, args));
}

You see here, though, we’ve hidden the important bit. The prepareRpcCall definition has within it the actual code that gets run on any proxied call. Let’s have a closer look at that:

private RpcCall prepareRpcCall(String nodeId, Class type, Method method, Object[] args) {
    if (args == null) {
        args = new Object[0];
    }

    Object[] finalArgs = args;

    return new RpcCall() {
        @Override
        public Single fire() {
            return sendRpcCall(nodeId,
                               newRpcCall(),
                               MqConstants.TIMEOUT,
                               MqConstants.TIMEOUT_UNIT);
        }

        @Override
        public void fireAndForget() {
            String queue = MqConstants.queueName(nodeId, MqConstants.MQ_RPC_SEND_CHAN);
            messageQueue.sendMessage(queue, newRpcCall())
                        .subscribeOn(scheduler)
                        // handle errors in subscribe(); a bare subscribe() would rethrow
                        // them as OnErrorNotImplementedException after doOnError ran
                        .subscribe(ignored -> {}, err -> {
                            logger.severe("error during fire and forget mq rpc call");
                            exceptionHandler.log("fire and forget", err);
                            err.printStackTrace();
                        });
        }

        private MqRpcMethodCall newRpcCall() {
            List<JsonElement> argList = Arrays.stream(finalArgs)
                                              .map(gson::toJsonTree)
                                              .collect(Collectors.toList());

            List<String> argListTypes = Arrays.stream(finalArgs)
                                              .map(o -> o.getClass().getName())
                                              .collect(Collectors.toList());

            return new MqRpcMethodCall(nodeSelf.getId(),
                                       type.getName(),
                                       method.getName(),
                                       argList,
                                       argListTypes);
        }
    };
}

Notice the important section: newRpcCall, which generates an instance of the previously described MqRpcMethodCall class. This describes, to the remote host, which method to call and with what parameters (along with the exact runtime types to decode them to).
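The type-name half of that is easy to try in isolation. The sketch below mirrors the argListTypes logic with plain Java and adds the reverse step the remote end needs; ArgumentTypeNames and its two methods are hypothetical names, and the null check is an addition of mine, since getClass() on a null argument would otherwise throw.

```java
import java.util.ArrayList;
import java.util.List;

final class ArgumentTypeNames {
    // Mirrors argListTypes in newRpcCall: records each argument's runtime class name.
    static List<String> describe(Object... args) {
        List<String> names = new ArrayList<>();
        for (Object arg : args) {
            if (arg == null) {
                throw new IllegalArgumentException("a null argument carries no type information");
            }
            names.add(arg.getClass().getName());
        }
        return names;
    }

    // What the receiving side has to do before decoding: turn names back into classes.
    static List<Class<?>> resolve(List<String> names) {
        List<Class<?>> types = new ArrayList<>();
        for (String name : names) {
            try {
                types.add(Class.forName(name));
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException("unknown parameter type " + name, e);
            }
        }
        return types;
    }
}
```

One consequence worth knowing: the recorded name is the runtime class, not the declared parameter type. An ArrayList passed to a method that takes a List is recorded as java.util.ArrayList, so the receiver cannot look the method up by these types alone when a parameter is declared as an interface or superclass.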

We’ll get into how the remote end actually invokes the method in a moment, but let’s move on to how a reply is received by the local end. You’ll notice another cop-out method call hiding the juicy implementation details in the earlier block of code. We need to see the definition of sendRpcCall, which should return a Single (from RxJava).

//we have no type on this Single because we don't actually know what the return type will be
private Single sendRpcCall(String to, MqRpcMethodCall call, long timeout, TimeUnit unit) {
    return Single.create(subscriber -> {
        String id = call.getId();
        pendingRpcSubscribers.put(id, subscriber);
        subscriber.add(Subscriptions.create(() -> pendingRpcSubscribers.remove(id)));
        String queue = MqConstants.queueName(to, MqConstants.MQ_RPC_SEND_CHAN);
        subscriber.add(messageQueue.sendMessage(queue, call).subscribe(ignored -> {}, subscriber::onError));
    }).timeout(timeout, unit, scheduler);
}

And here’s the magic. Notice that pendingRpcSubscribers is used here, along with the call that actually adds the message to the queue. The mechanism by which the reply is sent to the subscriber is the last lingering interesting detail. Let’s have a look at that:

@SuppressWarnings({"unchecked", "IfCanBeSwitch"})
@PostConstruct
private void listenForReplies() {
    String queue = MqConstants.queueName(nodeSelf.getId(), MqConstants.MQ_RPC_REPLY_CHAN);
    messageQueue.watchMessages(queue).subscribe(message -> {
        MqRpcReply reply = message.readAs(MqRpcReply.class);
        String id = reply.getId();
        SingleSubscriber sub = pendingRpcSubscribers.remove(id);
        if (sub == null) {
            logger.info("there was no subscriber for " + id);
            return;
        }

        Class<?> valueType;
        try {
            valueType = Class.forName(reply.getValueType());
        } catch (ClassNotFoundException e) {
            exceptionHandler.log("reading reply value class", e);
            return;
        }

        Object value;
        try {
            value = gson.fromJson(reply.getValue(), valueType);
        } catch (JsonParseException e) {
            exceptionHandler.log("parsing json value", e, valueType.getName());
            return;
        }

        String type = reply.getType();
        if (type.equals(MqRpcReply.RPC_REPLY)) {
            sub.onSuccess(value);
        } else if (type.equals(MqRpcReply.RPC_ERROR)) {
            sub.onError((Throwable) value);
        }
    });
}

Thankfully, this code is simple. The first few lines look up the pending subscriber by the reply’s id; the rest decodes the value and then notifies that subscriber with either the result or the error.
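The correlation pattern shared by sendRpcCall and listenForReplies (park a subscriber under the call id, complete it when a reply with that id shows up, give up after a timeout) can be sketched without RxJava or a message queue. This version swaps in CompletableFuture from the JDK, which is not what the code above uses; PendingReplies is a hypothetical name and orTimeout requires Java 9 or later.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

final class PendingReplies {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Caller side: register interest in the reply to callId before sending the call.
    CompletableFuture<String> await(String callId, long timeout, TimeUnit unit) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(callId, future);
        return future
                // fails the future with a TimeoutException if no reply arrives in time
                .orTimeout(timeout, unit)
                // whatever the outcome, stop tracking the call so the map cannot leak
                .whenComplete((value, error) -> pending.remove(callId));
    }

    // Reply-listener side: returns false when nobody is waiting for this id.
    boolean complete(String callId, String value) {
        CompletableFuture<String> future = pending.remove(callId);
        return future != null && future.complete(value);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The false return from complete corresponds to the “there was no subscriber” branch above: a reply that arrives after the timeout finds nothing in the map and is simply dropped.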

Handling the incoming calls

The final piece missing from all of this is how messages are handled on the remote end.

The code is actually very similar to the code used to watch for replies on the local end:

@PostConstruct
private void listenForCalls() {
    String queue = MqConstants.queueName(nodeSelf.getId(), MqConstants.MQ_RPC_SEND_CHAN);
    messageQueue.watchMessages(queue).subscribe(message -> {
        MqRpcMethodCall rpcCall = message.readAs(MqRpcMethodCall.class);
        MqRpcReply rpcReply;
        try {
            rpcReply = handleMessage(message, rpcCall);
        } catch (MqRpcHandleException e) {
            e.printTo(exceptionHandler);
            message.nack(false)
                   .timeout(MqConstants.TIMEOUT, MqConstants.TIMEOUT_UNIT)
                   .subscribe();
            return;
        } catch (Exception e) {
            exceptionHandler.log("handling message", e);
            return;
        }

        try {
            // reply to the node that made the call (assumes a getter for the fromId field)
            String queueName = MqConstants.queueName(rpcCall.getFromId(), MqConstants.MQ_RPC_REPLY_CHAN);
            messageQueue.sendMessage(queueName, rpcReply)
                        .timeout(MqConstants.TIMEOUT, MqConstants.TIMEOUT_UNIT)
                        .subscribe(ignored -> {}, err -> exceptionHandler.log("sending reply", err));
        } catch (Exception e) {
            exceptionHandler.log("sending reply", e);
        }
    });
}

You can see the simplicity of all of this. handleMessage might hide some of the important information as to how it’s actually handled, but given the code seen above for registering a service instance (recall selfNode.implementService(PartyService.class, new PartyServiceImpl());) we can assume it’s as simple as getting the service from a map and reflectively invoking the method.
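Since handleMessage itself is not shown, here is a rough sketch of what “getting the service from a map and reflectively invoking the method” could look like. It is not the actual implementation: ReflectiveDispatcher and MathService are hypothetical, JSON decoding is left out, and matching a method by name and argument count is a simplifying assumption that breaks down for overloads with the same arity.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical service used only for this illustration.
interface MathService {
    int add(int a, int b);
}

final class ReflectiveDispatcher {
    // Keyed by the service interface's fully qualified name, as carried in the call message.
    private final Map<String, Map.Entry<Class<?>, Object>> services = new ConcurrentHashMap<>();

    <T> void register(Class<T> type, T implementation) {
        services.put(type.getName(), Map.<Class<?>, Object>entry(type, implementation));
    }

    Object dispatch(String serviceFQCN, String methodName, Object... args) {
        Map.Entry<Class<?>, Object> entry = services.get(serviceFQCN);
        if (entry == null) {
            throw new IllegalArgumentException("no implementation of " + serviceFQCN + " on this node");
        }
        // Look the method up on the interface, not on the implementation class.
        for (Method method : entry.getKey().getMethods()) {
            if (method.getName().equals(methodName) && method.getParameterCount() == args.length) {
                try {
                    return method.invoke(entry.getValue(), args);
                } catch (InvocationTargetException e) {
                    // the service method itself threw; surface its exception as the cause
                    throw new IllegalStateException("service method failed", e.getCause());
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("cannot invoke " + methodName, e);
                }
            }
        }
        throw new IllegalArgumentException(serviceFQCN + " has no method " + methodName
                + " taking " + args.length + " argument(s)");
    }
}
```

In the real system the returned value (or the thrown exception) would then be wrapped in an MqRpcReply and sent back to the caller’s reply queue.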