diff --git a/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedDecoder.java b/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedDecoder.java index 3d8d060b8c2..e8ea51473d1 100644 --- a/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedDecoder.java +++ b/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedDecoder.java @@ -19,6 +19,7 @@ package org.elasticsearch.memcached.netty; +import org.elasticsearch.Version; import org.elasticsearch.common.Unicode; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.netty.buffer.ChannelBuffer; @@ -49,6 +50,7 @@ public class MemcachedDecoder extends FrameDecoder { private volatile StringBuffer sb = new StringBuffer(); private volatile MemcachedRestRequest request; + private volatile boolean ending = false; public MemcachedDecoder(ESLogger logger) { super(false); @@ -65,7 +67,10 @@ public class MemcachedDecoder extends FrameDecoder { } short magic = buffer.readUnsignedByte(); if (magic == 0x80) { - if (buffer.readableBytes() < 23) return null; + if (buffer.readableBytes() < 23) { + buffer.resetReaderIndex(); // but back magic + return null; + } short opcode = buffer.readUnsignedByte(); short keyLength = buffer.readShort(); short extraLength = buffer.readUnsignedByte(); @@ -139,22 +144,23 @@ public class MemcachedDecoder extends FrameDecoder { int readableBytes = buffer.readableBytes(); for (int i = 0; i < readableBytes; i++) { byte next = buffer.readByte(); - if (next == CR) { - next = buffer.readByte(); - if (next == LF) { - done = true; - break; - } - } else if (next == LF) { + if (!ending && next == CR) { + ending = true; + } else if (ending && next == LF) { + ending = false; done = true; break; + } else if (ending) { + logger.error("Corrupt stream, expected LF, found [0x{}]", Integer.toHexString(next)); + throw new StreamCorruptedException("Expecting LF after CR"); } else { sb.append((char) next); } } if (!done) { - sb.setLength(0); - buffer.resetReaderIndex(); + // let's keep the buffer and bytes read +// buffer.discardReadBytes(); + buffer.markReaderIndex(); return null; } @@ -163,27 +169,36 @@ public class MemcachedDecoder extends FrameDecoder { sb.setLength(0); String cmd = args[0]; - String uri = args[1]; if ("get".equals(cmd)) { - request = new MemcachedRestRequest(RestRequest.Method.GET, uri, null, -1, false); + request = new MemcachedRestRequest(RestRequest.Method.GET, args[1], null, -1, false); if (args.length > 3) { request.setData(Unicode.fromStringAsBytes(args[2])); } return request; } else if ("delete".equals(cmd)) { - request = new MemcachedRestRequest(RestRequest.Method.DELETE, uri, null, -1, false); + request = new MemcachedRestRequest(RestRequest.Method.DELETE, args[1], null, -1, false); // if (args.length > 3) { // request.setData(Unicode.fromStringAsBytes(args[2])); // } return request; } else if ("set".equals(cmd)) { - this.request = new MemcachedRestRequest(RestRequest.Method.POST, uri, null, Integer.parseInt(args[4]), false); + this.request = new MemcachedRestRequest(RestRequest.Method.POST, args[1], null, Integer.parseInt(args[4]), false); buffer.markReaderIndex(); + } else if ("version".equals(cmd)) { // sent as a noop + byte[] bytes = Version.full().getBytes(); + ChannelBuffer writeBuffer = ChannelBuffers.dynamicBuffer(bytes.length); + writeBuffer.writeBytes(bytes); + channel.write(writeBuffer); + return MemcachedDispatcher.IGNORE_REQUEST; } else if ("quit".equals(cmd)) { - channel.disconnect(); + if (channel.isConnected()) { // we maybe in the process of clearing the queued bits + channel.disconnect(); + } } else { logger.error("Unsupported command [{}], ignoring and closing connection", cmd); - channel.disconnect(); + if (channel.isConnected()) { // we maybe in the process of clearing the queued bits + channel.disconnect(); + } return null; } } @@ -215,7 +230,13 @@ public class MemcachedDecoder extends FrameDecoder { @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { this.request = null; - sb.setLength(0); - e.getCause().printStackTrace(); + this.ending = false; + this.sb.setLength(0); + + if (ctx.getChannel().isConnected()) { + ctx.getChannel().disconnect(); + } + + logger.error("caught exception on memcached decoder", e); } -} +} \ No newline at end of file