Error unable to intercept a read client packet

Introduction

Here is the source code for com.comphenix.protocol.injector.netty.ChannelInjector.java

Source

/**
 *  ProtocolLib - Bukkit server library that allows access to the Minecraft protocol.
 *  Copyright (C) 2015 dmulloy2
 *
 *  This program is free software; you can redistribute it and/or modify it under the terms of the
 *  GNU General Public License as published by the Free Software Foundation; either version 2 of
 *  the License, or (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 *  without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *  See the GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License along with this program;
 *  if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
 *  02111-1307 USA
 */
package com.comphenix.protocol.injector.netty;

import java.lang.reflect.InvocationTargetException;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.ListIterator;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;

import net.sf.cglib.proxy.Factory;

import org.bukkit.Bukkit;
import org.bukkit.entity.Player;

import com.comphenix.protocol.PacketType;
import com.comphenix.protocol.PacketType.Protocol;
import com.comphenix.protocol.ProtocolLibrary;
import com.comphenix.protocol.error.Report;
import com.comphenix.protocol.error.ReportType;
import com.comphenix.protocol.events.ConnectionSide;
import com.comphenix.protocol.events.NetworkMarker;
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.injector.NetworkProcessor;
import com.comphenix.protocol.injector.server.SocketInjector;
import com.comphenix.protocol.reflect.FuzzyReflection;
import com.comphenix.protocol.reflect.VolatileField;
import com.comphenix.protocol.reflect.accessors.Accessors;
import com.comphenix.protocol.reflect.accessors.FieldAccessor;
import com.comphenix.protocol.reflect.accessors.MethodAccessor;
import com.comphenix.protocol.utility.MinecraftFields;
import com.comphenix.protocol.utility.MinecraftMethods;
import com.comphenix.protocol.utility.MinecraftProtocolVersion;
import com.comphenix.protocol.utility.MinecraftReflection;
import com.comphenix.protocol.wrappers.WrappedGameProfile;
import com.google.common.base.Preconditions;
import com.google.common.collect.MapMaker;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.TypeParameterMatcher;

/**
 * Represents a channel injector.
 * @author Kristian
 */
public class ChannelInjector extends ByteToMessageDecoder implements Injector {
    public static final ReportType REPORT_CANNOT_INTERCEPT_SERVER_PACKET = new ReportType(
            "Unable to intercept a written server packet.");
    public static final ReportType REPORT_CANNOT_INTERCEPT_CLIENT_PACKET = new ReportType(
            "Unable to intercept a read client packet.");
    public static final ReportType REPORT_CANNOT_EXECUTE_IN_CHANNEL_THREAD = new ReportType(
            "Cannot execute code in channel thread.");
    public static final ReportType REPORT_CANNOT_FIND_GET_VERSION = new ReportType(
            "Cannot find getVersion() in NetworkMananger");
    public static final ReportType REPORT_CANNOT_SEND_PACKET = new ReportType("Unable to send packet %s to %s");

    /**
     * Indicates that a packet has bypassed packet listeners.
     */
    private static final PacketEvent BYPASSED_PACKET = new PacketEvent(ChannelInjector.class);

    // The login packet
    private static Class<?> PACKET_LOGIN_CLIENT = null;
    private static FieldAccessor LOGIN_GAME_PROFILE = null;

    // Versioning
    private static Class<?> PACKET_SET_PROTOCOL = null;
    private static AttributeKey<Integer> PROTOCOL_KEY = AttributeKey.valueOf("PROTOCOL");

    // Saved accessors
    private static MethodAccessor DECODE_BUFFER;
    private static MethodAccessor ENCODE_BUFFER;
    private static FieldAccessor ENCODER_TYPE_MATCHER;

    // For retrieving the protocol
    private static FieldAccessor PROTOCOL_ACCESSOR;

    // The factory that created this injector
    private InjectionFactory factory;

    // The player, or temporary player
    private Player player;
    private Player updated;
    private String playerName;

    // The player connection
    private Object playerConnection;

    // The current network manager and channel
    private final Object networkManager;
    private final Channel originalChannel;
    private VolatileField channelField;

    // Known network markers
    private ConcurrentMap<Object, NetworkMarker> packetMarker = new MapMaker().weakKeys().makeMap();

    /**
     * Indicate that this packet has been processed by event listeners.
     * <p>
     * This must never be set outside the channel pipeline's thread.
     */
    private PacketEvent currentEvent;

    /**
     * A packet event that should be processed by the write method.
     */
    private PacketEvent finalEvent;

    /**
     * A flag set by the main thread to indiciate that a packet should not be processed.
     */
    private final ThreadLocal<Boolean> scheduleProcessPackets = new ThreadLocal<Boolean>() {
        @Override
        protected Boolean initialValue() {
            return true;
        };
    };

    // Other handlers
    private ByteToMessageDecoder vanillaDecoder;
    private MessageToByteEncoder<Object> vanillaEncoder;

    private Deque<PacketEvent> finishQueue = new ArrayDeque<PacketEvent>();

    // The channel listener
    private ChannelListener channelListener;

    // Processing network markers
    private NetworkProcessor processor;

    // Closed
    private boolean injected;
    private boolean closed;

    /**
     * Construct a new channel injector.
     * @param player - the current player, or temporary player.
     * @param networkManager - its network manager.
     * @param channel - its channel.
     * @param channelListener - a listener.
     * @param factory - the factory that created this injector
     */
    public ChannelInjector(Player player, Object networkManager, Channel channel, ChannelListener channelListener,
            InjectionFactory factory) {
        this.player = Preconditions.checkNotNull(player, "player cannot be NULL");
        this.networkManager = Preconditions.checkNotNull(networkManager, "networkMananger cannot be NULL");
        this.originalChannel = Preconditions.checkNotNull(channel, "channel cannot be NULL");
        this.channelListener = Preconditions.checkNotNull(channelListener, "channelListener cannot be NULL");
        this.factory = Preconditions.checkNotNull(factory, "factory cannot be NULL");
        this.processor = new NetworkProcessor(ProtocolLibrary.getErrorReporter());

        // Get the channel field
        this.channelField = new VolatileField(
                FuzzyReflection.fromObject(networkManager, true).getFieldByType("channel", Channel.class),
                networkManager, true);
    }

    /**
     * Get the version of the current protocol.
     * @return The version.
     */
    @Override
    public int getProtocolVersion() {
        Integer value = originalChannel.attr(PROTOCOL_KEY).get();
        return value != null ? value : MinecraftProtocolVersion.getCurrentVersion();
    }

    @Override
    @SuppressWarnings("unchecked")
    public boolean inject() {
        synchronized (networkManager) {
            if (closed)
                return false;
            if (originalChannel instanceof Factory)
                return false;
            if (!originalChannel.isActive())
                return false;

            // Main thread? We should synchronize with the channel thread, otherwise we might see a
            // pipeline with only some of the handlers removed
            if (Bukkit.isPrimaryThread()) {
                // Just like in the close() method, we'll avoid blocking the main thread
                executeInChannelThread(new Runnable() {
                    @Override
                    public void run() {
                        inject();
                    }
                });
                return false; // We don't know
            }

            // Don't inject the same channel twice
            if (findChannelHandler(originalChannel, ChannelInjector.class) != null) {
                return false;
            }

            // Get the vanilla decoder, so we don't have to replicate the work
            vanillaDecoder = (ByteToMessageDecoder) originalChannel.pipeline().get("decoder");
            vanillaEncoder = (MessageToByteEncoder<Object>) originalChannel.pipeline().get("encoder");

            if (vanillaDecoder == null)
                throw new IllegalArgumentException(
                        "Unable to find vanilla decoder in " + originalChannel.pipeline());
            if (vanillaEncoder == null)
                throw new IllegalArgumentException(
                        "Unable to find vanilla encoder in " + originalChannel.pipeline());
            patchEncoder(vanillaEncoder);

            if (DECODE_BUFFER == null)
                DECODE_BUFFER = Accessors.getMethodAccessor(vanillaDecoder.getClass(), "decode",
                        ChannelHandlerContext.class, ByteBuf.class, List.class);
            if (ENCODE_BUFFER == null)
                ENCODE_BUFFER = Accessors.getMethodAccessor(vanillaEncoder.getClass(), "encode",
                        ChannelHandlerContext.class, Object.class, ByteBuf.class);

            // Intercept sent packets
            MessageToByteEncoder<Object> protocolEncoder = new MessageToByteEncoder<Object>() {
                @Override
                protected void encode(ChannelHandlerContext ctx, Object packet, ByteBuf output) throws Exception {
                    if (packet instanceof WirePacket) {
                        // Special case for wire format
                        ChannelInjector.this.encodeWirePacket((WirePacket) packet, output);
                    } else {
                        ChannelInjector.this.encode(ctx, packet, output);
                    }
                }

                @Override
                public void write(ChannelHandlerContext ctx, Object packet, ChannelPromise promise)
                        throws Exception {
                    super.write(ctx, packet, promise);
                    ChannelInjector.this.finalWrite(ctx, packet, promise);
                }
            };

            // Intercept recieved packets
            ChannelInboundHandlerAdapter finishHandler = new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    // Execute context first
                    ctx.fireChannelRead(msg);
                    ChannelInjector.this.finishRead(ctx, msg);
                }
            };

            // Insert our handlers - note that we effectively replace the vanilla encoder/decoder
            originalChannel.pipeline().addBefore("decoder", "protocol_lib_decoder", this);
            originalChannel.pipeline().addBefore("protocol_lib_decoder", "protocol_lib_finish", finishHandler);
            originalChannel.pipeline().addAfter("encoder", "protocol_lib_encoder", protocolEncoder);

            // Intercept all write methods
            channelField.setValue(new ChannelProxy(originalChannel, MinecraftReflection.getPacketClass()) {
                // Compatibility with Spigot 1.8
                private final PipelineProxy pipelineProxy = new PipelineProxy(originalChannel.pipeline(), this) {
                    @Override
                    public ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
                        // Correct the position of the decoder
                        if ("decoder".equals(baseName)) {
                            if (super.get("protocol_lib_decoder") != null && guessCompression(handler)) {
                                super.addBefore("protocol_lib_decoder", name, handler);
                                return this;
                            }
                        }

                        return super.addBefore(baseName, name, handler);
                    }
                };

                @Override
                public ChannelPipeline pipeline() {
                    return pipelineProxy;
                }

                @Override
                protected <T> Callable<T> onMessageScheduled(final Callable<T> callable,
                        FieldAccessor packetAccessor) {
                    final PacketEvent event = handleScheduled(callable, packetAccessor);

                    // Handle cancelled events
                    if (event != null && event.isCancelled())
                        return null;

                    return new Callable<T>() {
                        @Override
                        public T call() throws Exception {
                            T result = null;

                            // This field must only be updated in the pipeline thread
                            currentEvent = event;
                            result = callable.call();
                            currentEvent = null;
                            return result;
                        }
                    };
                }

                @Override
                protected Runnable onMessageScheduled(final Runnable runnable, FieldAccessor packetAccessor) {
                    final PacketEvent event = handleScheduled(runnable, packetAccessor);

                    // Handle cancelled events
                    if (event != null && event.isCancelled())
                        return null;

                    return new Runnable() {
                        @Override
                        public void run() {
                            currentEvent = event;
                            runnable.run();
                            currentEvent = null;
                        }
                    };
                }

                protected PacketEvent handleScheduled(Object instance, FieldAccessor accessor) {
                    // Let the filters handle this packet
                    Object original = accessor.get(instance);

                    // See if we've been instructed not to process packets
                    if (!scheduleProcessPackets.get()) {
                        NetworkMarker marker = getMarker(original);

                        if (marker != null) {
                            PacketEvent result = new PacketEvent(ChannelInjector.class);
                            result.setNetworkMarker(marker);
                            return result;
                        } else {
                            return BYPASSED_PACKET;
                        }
                    }
                    PacketEvent event = processSending(original);

                    if (event != null && !event.isCancelled()) {
                        Object changed = event.getPacket().getHandle();

                        // Change packet to be scheduled
                        if (original != changed)
                            accessor.set(instance, changed);
                    }
                    ;
                    return event != null ? event : BYPASSED_PACKET;
                }
            });

            injected = true;
            return true;
        }
    }

    /**
     * Determine if the given object is a compressor or decompressor.
     * @param handler - object to test.
     * @return TRUE if it is, FALSE if not or unknown.
     */
    private boolean guessCompression(ChannelHandler handler) {
        String className = handler != null ? handler.getClass().getCanonicalName() : "";
        return className.contains("Compressor") || className.contains("Decompressor");
    }

    /**
     * Process a given message on the packet listeners.
     * @param message - the message/packet.
     * @return The resulting message/packet.
     */
    private PacketEvent processSending(Object message) {
        return channelListener.onPacketSending(ChannelInjector.this, message, getMarker(message));
    }

    /**
     * This method patches the encoder so that it skips already created packets.
     * @param encoder - the encoder to patch.
     */
    private void patchEncoder(MessageToByteEncoder<Object> encoder) {
        if (ENCODER_TYPE_MATCHER == null) {
            ENCODER_TYPE_MATCHER = Accessors.getFieldAccessor(encoder.getClass(), "matcher", true);
        }
        ENCODER_TYPE_MATCHER.set(encoder, TypeParameterMatcher.get(MinecraftReflection.getPacketClass()));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (channelListener.isDebug())
            cause.printStackTrace();
        super.exceptionCaught(ctx, cause);
    }

    protected void encodeWirePacket(WirePacket packet, ByteBuf output) throws Exception {
        packet.writeId(output);
        packet.writeBytes(output);
    }

    /**
     * Encode a packet to a byte buffer, taking over for the standard Minecraft encoder.
     * @param ctx - the current context.
     * @param packet - the packet to encode to a byte array.
     * @param output - the output byte array.
     * @throws Exception If anything went wrong.
     */
    protected void encode(ChannelHandlerContext ctx, Object packet, ByteBuf output) throws Exception {
        NetworkMarker marker = null;
        PacketEvent event = currentEvent;

        try {
            // Skip every kind of non-filtered packet
            if (!scheduleProcessPackets.get()) {
                return;
            }

            // This packet has not been seen by the main thread
            if (event == null) {
                Class<?> clazz = packet.getClass();

                // Schedule the transmission on the main thread instead
                if (channelListener.hasMainThreadListener(clazz)) {
                    // Delay the packet
                    scheduleMainThread(packet);
                    packet = null;

                } else {
                    event = processSending(packet);

                    // Handle the output
                    if (event != null) {
                        packet = !event.isCancelled() ? event.getPacket().getHandle() : null;
                    }
                }
            }
            if (event != null) {
                // Retrieve marker without accidentally constructing it
                marker = NetworkMarker.getNetworkMarker(event);
            }

            // Process output handler
            if (packet != null && event != null && NetworkMarker.hasOutputHandlers(marker)) {
                ByteBuf packetBuffer = ctx.alloc().buffer();
                ENCODE_BUFFER.invoke(vanillaEncoder, ctx, packet, packetBuffer);

                // Let each handler prepare the actual output
                byte[] data = processor.processOutput(event, marker, getBytes(packetBuffer));

                // Write the result
                output.writeBytes(data);
                packet = null;

                // Sent listeners?
                finalEvent = event;
                return;
            }
        } catch (Exception e) {
            channelListener.getReporter().reportDetailed(this,
                    Report.newBuilder(REPORT_CANNOT_INTERCEPT_SERVER_PACKET).callerParam(packet).error(e).build());
        } finally {
            // Attempt to handle the packet nevertheless
            if (packet != null) {
                ENCODE_BUFFER.invoke(vanillaEncoder, ctx, packet, output);
                finalEvent = event;
            }
        }
    }

    /**
     * Invoked when a packet has been written to the channel.
     * @param ctx - current context.
     * @param packet - the packet that has been written.
     * @param promise - a promise.
     */
    protected void finalWrite(ChannelHandlerContext ctx, Object packet, ChannelPromise promise) {
        PacketEvent event = finalEvent;

        if (event != null) {
            // Necessary to prevent infinite loops
            finalEvent = null;
            currentEvent = null;

            processor.invokePostEvent(event, NetworkMarker.getNetworkMarker(event));
        }
    }

    private void scheduleMainThread(final Object packetCopy) {
        // Don't use BukkitExecutors for this - it has a bit of overhead
        Bukkit.getScheduler().scheduleSyncDelayedTask(factory.getPlugin(), new Runnable() {
            @Override
            public void run() {
                invokeSendPacket(packetCopy);
            }
        });
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuffer, List<Object> packets) throws Exception {
        byteBuffer.markReaderIndex();
        DECODE_BUFFER.invoke(vanillaDecoder, ctx, byteBuffer, packets);

        try {
            // Reset queue
            finishQueue.clear();

            for (ListIterator<Object> it = packets.listIterator(); it.hasNext();) {
                Object input = it.next();
                Class<?> packetClass = input.getClass();
                NetworkMarker marker = null;

                // Special case!
                handleLogin(packetClass, input);

                if (channelListener.includeBuffer(packetClass)) {
                    byteBuffer.resetReaderIndex();
                    marker = new NettyNetworkMarker(ConnectionSide.CLIENT_SIDE, getBytes(byteBuffer));
                }

                PacketEvent output = channelListener.onPacketReceiving(this, input, marker);

                // Handle packet changes
                if (output != null) {
                    if (output.isCancelled()) {
                        it.remove();
                        continue;
                    } else if (output.getPacket().getHandle() != input) {
                        it.set(output.getPacket().getHandle());
                    }

                    finishQueue.addLast(output);
                }
            }
        } catch (Exception e) {
            channelListener.getReporter().reportDetailed(this, Report
                    .newBuilder(REPORT_CANNOT_INTERCEPT_CLIENT_PACKET).callerParam(byteBuffer).error(e).build());
        }
    }

    /**
     * Invoked after our decoder.
     * @param ctx - current context.
     * @param msg - the current packet.
     */
    protected void finishRead(ChannelHandlerContext ctx, Object msg) {
        // Assume same order
        PacketEvent event = finishQueue.pollFirst();

        if (event != null) {
            NetworkMarker marker = NetworkMarker.getNetworkMarker(event);

            if (marker != null) {
                processor.invokePostEvent(event, marker);
            }
        }
    }

    /**
     * Invoked when we may need to handle the login packet.
     * @param packetClass - the packet class.
     * @param packet - the packet.
     */
    protected void handleLogin(Class<?> packetClass, Object packet) {
        // Try to find the login packet class
        if (PACKET_LOGIN_CLIENT == null) {
            PACKET_LOGIN_CLIENT = PacketType.Login.Client.START.getPacketClass();
        }

        // If we can't, there's an issue
        if (PACKET_LOGIN_CLIENT == null) {
            throw new IllegalStateException(
                    "Failed to obtain login start packet. Did you build Spigot with BuildTools?");
        }

        if (LOGIN_GAME_PROFILE == null) {
            LOGIN_GAME_PROFILE = Accessors.getFieldAccessor(PACKET_LOGIN_CLIENT,
                    MinecraftReflection.getGameProfileClass(), true);
        }

        // See if we are dealing with the login packet
        if (PACKET_LOGIN_CLIENT.equals(packetClass)) {
            WrappedGameProfile profile = WrappedGameProfile.fromHandle(LOGIN_GAME_PROFILE.get(packet));

            // Save the channel injector
            factory.cacheInjector(profile.getName(), this);
        }

        if (PACKET_SET_PROTOCOL == null) {
            try {
                PACKET_SET_PROTOCOL = PacketType.Handshake.Client.SET_PROTOCOL.getPacketClass();
            } catch (Throwable ex) {
                PACKET_SET_PROTOCOL = getClass(); // If we can't find it don't worry about it
            }
        }

        if (PACKET_SET_PROTOCOL.equals(packetClass)) {
            FuzzyReflection fuzzy = FuzzyReflection.fromObject(packet);
            try {
                int protocol = (int) fuzzy.invokeMethod(packet, "getProtocol", int.class);
                originalChannel.attr(PROTOCOL_KEY).set(protocol);
            } catch (Throwable ex) {
                // Oh well
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);

        // See NetworkManager.channelActive(ChannelHandlerContext) for why
        if (channelField != null) {
            channelField.refreshValue();
        }
    }

    /**
     * Retrieve every byte in the given byte buffer.
     * @param buffer - the buffer.
     * @return The bytes.
     */
    private byte[] getBytes(ByteBuf buffer) {
        byte[] data = new byte[buffer.readableBytes()];

        buffer.readBytes(data);
        return data;
    }

    /**
     * Disconnect the current player.
     * @param message - the disconnect message, if possible.
     */
    private void disconnect(String message) {
        // If we're logging in, we can only close the channel
        if (playerConnection == null || player instanceof Factory) {
            originalChannel.disconnect();
        } else {
            // Call the disconnect method
            try {
                MinecraftMethods.getDisconnectMethod(playerConnection.getClass()).invoke(playerConnection, message);
            } catch (Exception e) {
                throw new IllegalArgumentException("Unable to invoke disconnect method.", e);
            }
        }
    }

    @Override
    public void sendServerPacket(Object packet, NetworkMarker marker, boolean filtered) {
        saveMarker(packet, marker);

        try {
            scheduleProcessPackets.set(filtered);
            invokeSendPacket(packet);
        } finally {
            scheduleProcessPackets.set(true);
        }
    }

    /**
     * Invoke the sendPacket method in Minecraft.
     * @param packet - the packet to send.
      */
    private void invokeSendPacket(Object packet) {
        // Attempt to send the packet with NetworkMarker.handle(), or the PlayerConnection if its active
        try {
            if (player instanceof Factory) {
                MinecraftMethods.getNetworkManagerHandleMethod().invoke(networkManager, packet,
                        new GenericFutureListener[0]);
            } else {
                MinecraftMethods.getSendPacketMethod().invoke(getPlayerConnection(), packet);
            }
        } catch (Throwable ex) {
            ProtocolLibrary.getErrorReporter().reportWarning(this, Report.newBuilder(REPORT_CANNOT_SEND_PACKET)
                    .messageParam(packet, playerName).error(ex).build());
        }
    }

    @Override
    public void recieveClientPacket(final Object packet) {
        // TODO: Ensure the packet listeners are executed in the channel thread.

        // Execute this in the channel thread
        Runnable action = new Runnable() {
            @Override
            public void run() {
                try {
                    MinecraftMethods.getNetworkManagerReadPacketMethod().invoke(networkManager, null, packet);
                } catch (Exception e) {
                    // Inform the user
                    ProtocolLibrary.getErrorReporter().reportMinimal(factory.getPlugin(), "recieveClientPacket", e);
                }
            }
        };

        // Execute in the worker thread
        if (originalChannel.eventLoop().inEventLoop()) {
            action.run();
        } else {
            originalChannel.eventLoop().execute(action);
        }
    }

    @Override
    public Protocol getCurrentProtocol() {
        if (PROTOCOL_ACCESSOR == null) {
            PROTOCOL_ACCESSOR = Accessors.getFieldAccessor(networkManager.getClass(),
                    MinecraftReflection.getEnumProtocolClass(), true);
        }
        return Protocol.fromVanilla((Enum<?>) PROTOCOL_ACCESSOR.get(networkManager));
    }

    /**
     * Retrieve the player connection of the current player.
     * @return The player connection.
     */
    private Object getPlayerConnection() {
        if (playerConnection == null) {
            playerConnection = MinecraftFields.getPlayerConnection(getPlayer());
        }
        return playerConnection;
    }

    @Override
    public NetworkMarker getMarker(Object packet) {
        return packetMarker.get(packet);
    }

    @Override
    public void saveMarker(Object packet, NetworkMarker marker) {
        if (marker != null) {
            packetMarker.put(packet, marker);
        }
    }

    @Override
    public Player getPlayer() {
        if (player == null && playerName != null) {
            return Bukkit.getPlayer(playerName);
        }

        return player;
    }

    /**
     * Set the player instance.
     * @param player - current instance.
     */
    @Override
    public void setPlayer(Player player) {
        this.player = player;
        this.playerName = player.getName();
    }

    /**
     * Set the updated player instance.
     * @param updated - updated instance.
     */
    @Override
    public void setUpdatedPlayer(Player updated) {
        this.updated = updated;
        this.playerName = updated.getName();
    }

    @Override
    public boolean isInjected() {
        return injected;
    }

    /**
     * Determine if this channel has been closed and cleaned up.
     * @return TRUE if it has, FALSE otherwise.
     */
    @Override
    public boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;

            if (injected) {
                channelField.revertValue();

                // Calling remove() in the main thread will block the main thread, which may lead
                // to a deadlock:
                //    http://pastebin.com/L3SBVKzp
                //
                // ProtocolLib executes this close() method through a PlayerQuitEvent in the main thread,
                // which has implicitly aquired a lock on SimplePluginManager (see SimplePluginManager.callEvent(Event)).
                // Unfortunately, the remove() method will schedule the removal on one of the Netty worker threads if
                // it's called from a different thread, blocking until the removal has been confirmed.
                //
                // This is bad enough (Rule #1: Don't block the main thread), but the real trouble starts if the same
                // worker thread happens to be handling a server ping connection when this removal task is scheduled.
                // In that case, it may attempt to invoke an asynchronous ServerPingEvent (see PacketStatusListener)
                // using SimplePluginManager.callEvent(). But, since this has already been locked by the main thread,
                // we end up with a deadlock. The main thread is waiting for the worker thread to process the task, and
                // the worker thread is waiting for the main thread to finish executing PlayerQuitEvent.
                //
                // TL;DR: Concurrency is hard.
                executeInChannelThread(new Runnable() {
                    @Override
                    public void run() {
                        String[] handlers = new String[] { "protocol_lib_decoder", "protocol_lib_finish",
                                "protocol_lib_encoder" };

                        for (String handler : handlers) {
                            try {
                                originalChannel.pipeline().remove(handler);
                            } catch (NoSuchElementException e) {
                                // Ignore
                            }
                        }
                    }
                });

                // Clear cache
                factory.invalidate(player);

                // Clear player instances
                // Should help fix memory leaks
                this.player = null;
                this.updated = null;
            }
        }
    }

    /**
     * Execute a specific command in the channel thread.
     * <p>
     * Exceptions are printed through the standard error reporter mechanism.
     * @param command - the command to execute.
     */
    private void executeInChannelThread(final Runnable command) {
        originalChannel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    command.run();
                } catch (Exception e) {
                    ProtocolLibrary.getErrorReporter().reportDetailed(ChannelInjector.this,
                            Report.newBuilder(REPORT_CANNOT_EXECUTE_IN_CHANNEL_THREAD).error(e).build());
                }
            }
        });
    }

    /**
     * Find the first channel handler that is assignable to a given type.
     * @param channel - the channel.
     * @param clazz - the type.
     * @return The first handler, or NULL.
     */
    public static ChannelHandler findChannelHandler(Channel channel, Class<?> clazz) {
        for (Entry<String, ChannelHandler> entry : channel.pipeline()) {
            if (clazz.isAssignableFrom(entry.getValue().getClass())) {
                return entry.getValue();
            }
        }
        return null;
    }

    /**
     * Represents a socket injector that foreards to the current channel injector.
     * @author Kristian
     */
    public static class ChannelSocketInjector implements SocketInjector {
        private final ChannelInjector injector;

        public ChannelSocketInjector(ChannelInjector injector) {
            this.injector = Preconditions.checkNotNull(injector, "injector cannot be NULL");
        }

        @Override
        public Socket getSocket() throws IllegalAccessException {
            return SocketAdapter.adapt((SocketChannel) injector.originalChannel);
        }

        @Override
        public SocketAddress getAddress() throws IllegalAccessException {
            return injector.originalChannel.remoteAddress();
        }

        @Override
        public void disconnect(String message) throws InvocationTargetException {
            injector.disconnect(message);
        }

        @Override
        public void sendServerPacket(Object packet, NetworkMarker marker, boolean filtered)
                throws InvocationTargetException {
            injector.sendServerPacket(packet, marker, filtered);
        }

        @Override
        public Player getPlayer() {
            return injector.getPlayer();
        }

        @Override
        public Player getUpdatedPlayer() {
            return injector.updated;
        }

        @Override
        public void transferState(SocketInjector delegate) {
            // Do nothing
        }

        @Override
        public void setUpdatedPlayer(Player updatedPlayer) {
            injector.setPlayer(updatedPlayer);
        }

        public ChannelInjector getChannelInjector() {
            return injector;
        }
    }

    public Channel getChannel() {
        return originalChannel;
    }
}