From 1ee2f80e688e875824b56071ea54543b579e3ba6 Mon Sep 17 00:00:00 2001
From: kimchy <kimchy@gmail.com>
Date: Sun, 15 Aug 2010 02:57:22 +0300
Subject: [PATCH] Transport: add global compression support compressing all
 internal transport communication (using lzf), closes #321.

---
 .idea/dictionaries/kimchy.xml                 |   1 +
 ...arch_Tests.xml => ElasticSearch_Tests.xml} |   2 +-
 ...l_.xml => ElasticSearch_Tests__Local_.xml} |   4 +-
 .../ElasticSearch_Tests__compress_.xml        |  40 +++++
 .../common/compress/lzf/ChunkEncoder.java     |  20 +++
 .../common/compress/lzf/LZFChunk.java         |  25 +++
 .../common/compress/lzf/LZFEncoder.java       |  22 +++
 .../common/compress/lzf/LZFInputStream.java   |  12 +-
 .../common/compress/lzf/LZFOutputStream.java  |   8 +-
 .../common/io/stream/CachedStreamInput.java   |  61 +++++++
 .../common/io/stream/CachedStreamOutput.java  |  27 +++-
 .../common/io/stream/HandlesStreamInput.java  |  20 ---
 .../common/io/stream/HandlesStreamOutput.java |   7 +-
 .../common/io/stream/LZFStreamInput.java      | 130 +++++++++++++++
 .../common/io/stream/LZFStreamOutput.java     | 109 +++++++++++++
 .../zen/ping/multicast/MulticastZenPing.java  |   4 +-
 .../http/netty/NettyHttpServerTransport.java  |   2 +-
 .../elasticsearch/transport/Transport.java    |  28 ----
 .../transport/TransportRequestOptions.java    |  11 ++
 .../transport/TransportResponseOptions.java   |  11 ++
 .../transport/local/LocalTransport.java       |  12 +-
 .../local/LocalTransportChannel.java          |  14 +-
 .../netty/ChannelBufferStreamInput.java       |  84 ++++++++--
 .../netty/MessageChannelHandler.java          |  40 +++--
 .../transport/netty/NettyTransport.java       |  41 ++---
 .../netty/NettyTransportChannel.java          |  24 +--
 .../transport/support/TransportStreams.java   | 151 ++++++++++++++++++
 .../compress/lzf/LZFInputStreamTests.java     | 110 +++++++++++++
 .../compress/lzf/LZFOutputStreamTests.java    |  96 +++++++++++
 .../AbstractSimpleTransportTests.java         |  48 ++++++
 .../netty/NettyMemcachedServerTransport.java  |   2 +-
 31 files changed, 1017 insertions(+), 149 deletions(-)
 rename .idea/runConfigurations/{Elastic_Search_Tests.xml => ElasticSearch_Tests.xml} (94%)
 rename .idea/runConfigurations/{Elastic_Search_Tests__Local_.xml => ElasticSearch_Tests__Local_.xml} (87%)
 create mode 100644 .idea/runConfigurations/ElasticSearch_Tests__compress_.xml
 create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java
 create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java
 create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java
 create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/transport/support/TransportStreams.java
 create mode 100644 modules/elasticsearch/src/test/java/org/elasticsearch/common/compress/lzf/LZFInputStreamTests.java
 create mode 100644 modules/elasticsearch/src/test/java/org/elasticsearch/common/compress/lzf/LZFOutputStreamTests.java

diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml
index 34aab1f0327..5a4fc0f8438 100644
--- a/.idea/dictionaries/kimchy.xml
+++ b/.idea/dictionaries/kimchy.xml
@@ -39,6 +39,7 @@
       <w>deserialize</w>
       <w>docid</w>
       <w>elasticsearch</w>
+      <w>encodable</w>
       <w>estab</w>
       <w>failover</w>
       <w>flushable</w>
diff --git a/.idea/runConfigurations/Elastic_Search_Tests.xml b/.idea/runConfigurations/ElasticSearch_Tests.xml
similarity index 94%
rename from .idea/runConfigurations/Elastic_Search_Tests.xml
rename to .idea/runConfigurations/ElasticSearch_Tests.xml
index 40d12d65d0f..ec3cd7fb2ca 100644
--- a/.idea/runConfigurations/Elastic_Search_Tests.xml
+++ b/.idea/runConfigurations/ElasticSearch_Tests.xml
@@ -1,5 +1,5 @@
 <component name="ProjectRunConfigurationManager">
-  <configuration default="false" name="Elastic Search Tests" type="TestNG" factoryName="TestNG">
+  <configuration default="false" name="ElasticSearch Tests" type="TestNG" factoryName="TestNG">
     <extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
     <extension name="snapshooter" />
     <module name="" />
diff --git a/.idea/runConfigurations/Elastic_Search_Tests__Local_.xml b/.idea/runConfigurations/ElasticSearch_Tests__Local_.xml
similarity index 87%
rename from .idea/runConfigurations/Elastic_Search_Tests__Local_.xml
rename to .idea/runConfigurations/ElasticSearch_Tests__Local_.xml
index 888fc4d08c8..33f7f8f324f 100644
--- a/.idea/runConfigurations/Elastic_Search_Tests__Local_.xml
+++ b/.idea/runConfigurations/ElasticSearch_Tests__Local_.xml
@@ -1,5 +1,5 @@
 <component name="ProjectRunConfigurationManager">
-  <configuration default="false" name="Elastic Search Tests (Local)" type="TestNG" factoryName="TestNG">
+  <configuration default="false" name="ElasticSearch Tests (Local)" type="TestNG" factoryName="TestNG">
     <extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
     <extension name="snapshooter" />
     <module name="" />
@@ -32,7 +32,7 @@
       <option name="LOCAL" value="true" />
     </RunnerSettings>
     <RunnerSettings RunnerId="Profile ">
-      <option name="myExternalizedOptions" value="&#10;snapshots-dir=&#10;additional-options2=onexit\=snapshot&#10;" />
+      <option name="myExternalizedOptions" value="&#10;additional-options2=onexit\=snapshot&#10;" />
     </RunnerSettings>
     <RunnerSettings RunnerId="Run" />
     <ConfigurationWrapper RunnerId="Debug" />
diff --git a/.idea/runConfigurations/ElasticSearch_Tests__compress_.xml b/.idea/runConfigurations/ElasticSearch_Tests__compress_.xml
new file mode 100644
index 00000000000..2d8bf86b0a2
--- /dev/null
+++ b/.idea/runConfigurations/ElasticSearch_Tests__compress_.xml
@@ -0,0 +1,40 @@
+<component name="ProjectRunConfigurationManager">
+  <configuration default="false" name="ElasticSearch Tests (compress)" type="TestNG" factoryName="TestNG">
+    <extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
+    <extension name="snapshooter" />
+    <module name="" />
+    <option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
+    <option name="ALTERNATIVE_JRE_PATH" value="" />
+    <option name="SUITE_NAME" value="" />
+    <option name="PACKAGE_NAME" value="" />
+    <option name="MAIN_CLASS_NAME" value="" />
+    <option name="METHOD_NAME" value="" />
+    <option name="GROUP_NAME" value="" />
+    <option name="TEST_OBJECT" value="PACKAGE" />
+    <option name="VM_PARAMETERS" value="-Des.transport.tcp.compress=true -Xmx512m" />
+    <option name="PARAMETERS" value="" />
+    <option name="WORKING_DIRECTORY" value="file://$PROJECT_DIR$" />
+    <option name="OUTPUT_DIRECTORY" value="" />
+    <option name="ANNOTATION_TYPE" value="JDK" />
+    <option name="ENV_VARIABLES" />
+    <option name="PASS_PARENT_ENVS" value="true" />
+    <option name="TEST_SEARCH_SCOPE">
+      <value defaultName="wholeProject" />
+    </option>
+    <option name="USE_DEFAULT_REPORTERS" value="false" />
+    <option name="PROPERTIES_FILE" value="" />
+    <envs />
+    <properties />
+    <listeners />
+    <RunnerSettings RunnerId="Debug">
+      <option name="DEBUG_PORT" value="57221" />
+      <option name="TRANSPORT" value="0" />
+      <option name="LOCAL" value="true" />
+    </RunnerSettings>
+    <RunnerSettings RunnerId="Profile ">
+      <option name="myExternalizedOptions" value="&#10;additional-options2=onexit\=snapshot&#10;" />
+    </RunnerSettings>
+    <ConfigurationWrapper RunnerId="Debug" />
+    <method />
+  </configuration>
+</component>
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java
index 2a35b562bcf..7c9bab7951e 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java
@@ -30,6 +30,9 @@
 
 package org.elasticsearch.common.compress.lzf;
 
+import java.io.IOException;
+import java.io.OutputStream;
+
 /**
  * Class that handles actual encoding of individual chunks.
  * Resulting chunks can be compressed or non-compressed; compression
@@ -77,6 +80,23 @@ public class ChunkEncoder {
         _encodeBuffer = new byte[bufferLen];
     }
 
+    /**
+     * Method for compressing (or not) individual chunks
+     */
+    public int encodeChunk(OutputStream os, byte[] data, int offset, int len) throws IOException {
+        if (len >= MIN_BLOCK_TO_COMPRESS) {
+            /* If we have non-trivial block, and can compress it by at least
+             * 2 bytes (since header is 2 bytes longer), let's compress:
+             */
+            int compLen = tryCompress(data, offset, offset + len, _encodeBuffer, 0);
+            if (compLen < (len - 2)) { // nah; just return uncompressed
+                return LZFChunk.createCompressed(os, len, _encodeBuffer, 0, compLen);
+            }
+        }
+        // Otherwise leave uncompressed:
+        return LZFChunk.createNonCompressed(os, data, offset, len);
+    }
+
     /**
      * Method for compressing (or not) individual chunks
      */
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFChunk.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFChunk.java
index 50ad3d8b97a..c89ab1324d4 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFChunk.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFChunk.java
@@ -30,6 +30,9 @@
 
 package org.elasticsearch.common.compress.lzf;
 
+import java.io.IOException;
+import java.io.OutputStream;
+
 /**
  * Helper class used to store LZF encoded segments (compressed and non-compressed)
  * that can be sequenced to produce LZF files/streams.
@@ -59,6 +62,18 @@ public class LZFChunk {
         _data = data;
     }
 
+    public static int createCompressed(OutputStream os, int origLen, byte[] encData, int encPtr, int encLen) throws IOException {
+        os.write(BYTE_Z);
+        os.write(BYTE_V);
+        os.write(BLOCK_TYPE_COMPRESSED);
+        os.write(encLen >> 8);
+        os.write(encLen);
+        os.write((origLen >> 8));
+        os.write(origLen);
+        os.write(encData, encPtr, encLen);
+        return encLen + 7;
+    }
+
     /**
      * Factory method for constructing compressed chunk
      */
@@ -75,6 +90,16 @@ public class LZFChunk {
         return new LZFChunk(result);
     }
 
+    public static int createNonCompressed(OutputStream os, byte[] plainData, int ptr, int len) throws IOException {
+        os.write(BYTE_Z);
+        os.write(BYTE_V);
+        os.write(BLOCK_TYPE_NON_COMPRESSED);
+        os.write(len >> 8);
+        os.write(len);
+        os.write(plainData, ptr, len);
+        return len + 5;
+    }
+
     /**
      * Factory method for constructing compressed chunk
      */
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java
index 6e9f6435ab0..34432160b43 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java
@@ -31,6 +31,7 @@
 package org.elasticsearch.common.compress.lzf;
 
 import java.io.IOException;
+import java.io.OutputStream;
 
 /**
  * Encoder that handles splitting of input into chunks to encode,
@@ -49,6 +50,27 @@ public class LZFEncoder {
         return encode(data, data.length);
     }
 
+
+    public static void encode(OutputStream os, byte[] data, int length) throws IOException {
+        int left = length;
+        ChunkEncoder enc = new ChunkEncoder(left);
+        int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
+        enc.encodeChunk(os, data, 0, chunkLen);
+        left -= chunkLen;
+        // shortcut: if it all fit in, no need to coalesce:
+        if (left < 1) {
+            return;
+        }
+        int inputOffset = chunkLen;
+
+        do {
+            chunkLen = Math.min(left, LZFChunk.MAX_CHUNK_LEN);
+            enc.encodeChunk(os, data, inputOffset, chunkLen);
+            inputOffset += chunkLen;
+            left -= chunkLen;
+        } while (left > 0);
+    }
+
     /**
      * Method for compressing given input data using LZF encoding and
      * block structure (compatible with lzf command line utility).
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFInputStream.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFInputStream.java
index 3f98a53074d..768852c0999 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFInputStream.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFInputStream.java
@@ -61,20 +61,26 @@ public class LZFInputStream extends InputStream {
     }
 
     public int read(final byte[] buffer, final int offset, final int length) throws IOException {
+        // FIXED HERE: handle 0 length cases
+        if (length == 0) {
+            return 0;
+        }
         int outputPos = offset;
         readyBuffer();
         if (bufferLength == -1) {
             return -1;
         }
 
-        while (outputPos < buffer.length && bufferPosition < bufferLength) {
-            int chunkLength = Math.min(bufferLength - bufferPosition, buffer.length - outputPos);
+        // FIXED HERE: fixed to use length
+        while (outputPos < length && bufferPosition < bufferLength) {
+            int chunkLength = Math.min(bufferLength - bufferPosition, length - outputPos);
             System.arraycopy(uncompressedBytes, bufferPosition, buffer, outputPos, chunkLength);
             outputPos += chunkLength;
             bufferPosition += chunkLength;
             readyBuffer();
         }
-        return outputPos;
+        // FIXED HERE: fixed to return actual length read
+        return outputPos - offset;
     }
 
     public void close() throws IOException {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFOutputStream.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFOutputStream.java
index 01c3837568b..e20b6bb5abc 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFOutputStream.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFOutputStream.java
@@ -28,7 +28,8 @@ public class LZFOutputStream extends OutputStream {
 
     private final OutputStream outputStream;
 
-    private byte[] outputBuffer = new byte[OUTPUT_BUFFER_SIZE];
+    private final byte[] outputBuffer = new byte[OUTPUT_BUFFER_SIZE];
+    private final ChunkEncoder encoder = new ChunkEncoder(OUTPUT_BUFFER_SIZE);
     private int position = 0;
 
     public LZFOutputStream(final OutputStream outputStream) {
@@ -81,8 +82,7 @@ public class LZFOutputStream extends OutputStream {
      * Compress and write the current block to the OutputStream
      */
     private void writeCompressedBlock() throws IOException {
-        final byte[] compressedBytes = LZFEncoder.encode(outputBuffer, position);
-        outputStream.write(compressedBytes);
+        encoder.encodeChunk(outputStream, outputBuffer, 0, position);
         position = 0;
-	}
+    }
 }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java
new file mode 100644
index 00000000000..853291d9af4
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.io.stream;
+
+import org.elasticsearch.common.thread.ThreadLocals;
+
+import java.io.IOException;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class CachedStreamInput {
+
+    static class Entry {
+        final HandlesStreamInput handles;
+        final LZFStreamInput lzf;
+
+        Entry(HandlesStreamInput handles, LZFStreamInput lzf) {
+            this.handles = handles;
+            this.lzf = lzf;
+        }
+    }
+
+    private static final ThreadLocal<ThreadLocals.CleanableValue<Entry>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Entry>>() {
+        @Override protected ThreadLocals.CleanableValue<Entry> initialValue() {
+            HandlesStreamInput handles = new HandlesStreamInput();
+            LZFStreamInput lzf = new LZFStreamInput();
+            return new ThreadLocals.CleanableValue<Entry>(new Entry(handles, lzf));
+        }
+    };
+
+    public static HandlesStreamInput cachedHandles(StreamInput in) {
+        HandlesStreamInput handles = cache.get().get().handles;
+        handles.reset(in);
+        return handles;
+    }
+
+    public static HandlesStreamInput cachedHandlesLzf(StreamInput in) throws IOException {
+        Entry entry = cache.get().get();
+        entry.lzf.reset(in);
+        entry.handles.reset(entry.lzf);
+        return entry.handles;
+    }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java
index d439f1e26ed..6e9d3a118c2 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java
@@ -31,10 +31,12 @@ public class CachedStreamOutput {
     static class Entry {
         final BytesStreamOutput bytes;
         final HandlesStreamOutput handles;
+        final LZFStreamOutput lzf;
 
-        Entry(BytesStreamOutput bytes, HandlesStreamOutput handles) {
+        Entry(BytesStreamOutput bytes, HandlesStreamOutput handles, LZFStreamOutput lzf) {
             this.bytes = bytes;
             this.handles = handles;
+            this.lzf = lzf;
         }
     }
 
@@ -42,7 +44,8 @@ public class CachedStreamOutput {
         @Override protected ThreadLocals.CleanableValue<Entry> initialValue() {
             BytesStreamOutput bytes = new BytesStreamOutput();
             HandlesStreamOutput handles = new HandlesStreamOutput(bytes);
-            return new ThreadLocals.CleanableValue<Entry>(new Entry(bytes, handles));
+            LZFStreamOutput lzf = new LZFStreamOutput(bytes);
+            return new ThreadLocals.CleanableValue<Entry>(new Entry(bytes, handles, lzf));
         }
     };
 
@@ -55,9 +58,23 @@ public class CachedStreamOutput {
         return os;
     }
 
-    public static HandlesStreamOutput cachedHandles() throws IOException {
-        HandlesStreamOutput os = cache.get().get().handles;
-        os.reset();
+    public static LZFStreamOutput cachedLZFBytes() throws IOException {
+        LZFStreamOutput lzf = cache.get().get().lzf;
+        lzf.reset();
+        return lzf;
+    }
+
+    public static HandlesStreamOutput cachedHandlesLzfBytes() throws IOException {
+        Entry entry = cache.get().get();
+        HandlesStreamOutput os = entry.handles;
+        os.reset(entry.lzf);
+        return os;
+    }
+
+    public static HandlesStreamOutput cachedHandlesBytes() throws IOException {
+        Entry entry = cache.get().get();
+        HandlesStreamOutput os = entry.handles;
+        os.reset(entry.bytes);
         return os;
     }
 }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamInput.java
index 4151b30ff36..75f33432170 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamInput.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamInput.java
@@ -19,7 +19,6 @@
 
 package org.elasticsearch.common.io.stream;
 
-import org.elasticsearch.common.thread.ThreadLocals;
 import org.elasticsearch.common.trove.TIntObjectHashMap;
 
 import java.io.IOException;
@@ -29,25 +28,6 @@ import java.io.IOException;
  */
 public class HandlesStreamInput extends StreamInput {
 
-    public static class Cached {
-
-        private static final ThreadLocal<ThreadLocals.CleanableValue<HandlesStreamInput>> cache = new ThreadLocal<ThreadLocals.CleanableValue<HandlesStreamInput>>() {
-            @Override protected ThreadLocals.CleanableValue<HandlesStreamInput> initialValue() {
-                return new ThreadLocals.CleanableValue<HandlesStreamInput>(new HandlesStreamInput());
-            }
-        };
-
-        /**
-         * Returns the cached thread local byte stream, with its internal stream cleared.
-         */
-        public static HandlesStreamInput cached(StreamInput in) {
-            HandlesStreamInput os = cache.get().get();
-            os.reset(in);
-            return os;
-        }
-    }
-
-
     private StreamInput in;
 
     private final TIntObjectHashMap<String> handles = new TIntObjectHashMap<String>();
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamOutput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamOutput.java
index a86d90cf013..a2c6a23da85 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamOutput.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamOutput.java
@@ -35,7 +35,7 @@ public class HandlesStreamOutput extends StreamOutput {
     // a threshold above which strings will use identity check
     private final int identityThreshold;
 
-    private final StreamOutput out;
+    private StreamOutput out;
 
     private final TObjectIntHashMap<String> handles = new ExtTObjectIntHasMap<String>().defaultReturnValue(-1);
 
@@ -91,6 +91,11 @@ public class HandlesStreamOutput extends StreamOutput {
         out.reset();
     }
 
+    public void reset(StreamOutput out) throws IOException {
+        this.out = out;
+        reset();
+    }
+
     @Override public void flush() throws IOException {
         out.flush();
     }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java
new file mode 100644
index 00000000000..dc7851d041b
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.io.stream;
+
+import org.elasticsearch.common.compress.lzf.LZFChunk;
+import org.elasticsearch.common.compress.lzf.LZFDecoder;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class LZFStreamInput extends StreamInput {
+    public static int EOF_FLAG = -1;
+
+    /* the current buffer of compressed bytes */
+    private final byte[] compressedBytes = new byte[LZFChunk.MAX_CHUNK_LEN];
+
+    /* the buffer of uncompressed bytes from which */
+    private final byte[] uncompressedBytes = new byte[LZFChunk.MAX_CHUNK_LEN];
+
+    /* The current position (next char to output) in the uncompressed bytes buffer. */
+    private int bufferPosition = 0;
+
+    /* Length of the current uncompressed bytes buffer */
+    private int bufferLength = 0;
+
+    private StreamInput in;
+
+    public LZFStreamInput() {
+    }
+
+    public LZFStreamInput(StreamInput in) throws IOException {
+        this.in = in;
+        // we need to read the first buffer here, since it might be a VOID message, and we need to at least read the LZF header
+        readyBuffer();
+    }
+
+    @Override public int read() throws IOException {
+        int returnValue = EOF_FLAG;
+        readyBuffer();
+        if (bufferPosition < bufferLength) {
+            returnValue = (uncompressedBytes[bufferPosition++] & 255);
+        }
+        return returnValue;
+    }
+
+    @Override public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+        int outputPos = off;
+        readyBuffer();
+        if (bufferLength == -1) {
+            return -1;
+        }
+
+        while (outputPos < len && bufferPosition < bufferLength) {
+            int chunkLength = Math.min(bufferLength - bufferPosition, len - outputPos);
+            System.arraycopy(uncompressedBytes, bufferPosition, b, outputPos, chunkLength);
+            outputPos += chunkLength;
+            bufferPosition += chunkLength;
+            readyBuffer();
+        }
+        return outputPos - off;
+    }
+
+    @Override public byte readByte() throws IOException {
+        readyBuffer();
+        if (bufferPosition < bufferLength) {
+            return (uncompressedBytes[bufferPosition++]);
+        }
+        throw new EOFException();
+    }
+
+    @Override public void readBytes(byte[] b, int offset, int len) throws IOException {
+        int result = read(b, offset, len);
+        if (result < len) {
+            throw new EOFException();
+        }
+    }
+
+    @Override public void reset() throws IOException {
+        this.bufferPosition = 0;
+        this.bufferLength = 0;
+        in.reset();
+    }
+
+    public void reset(StreamInput in) throws IOException {
+        this.in = in;
+        this.bufferPosition = 0;
+        this.bufferLength = 0;
+        // we need to read the first buffer here, since it might be a VOID message, and we need to at least read the LZF header
+        readyBuffer();
+    }
+
+    @Override public void close() throws IOException {
+        in.close();
+    }
+
+    /**
+     * Fill the uncompressed bytes buffer by reading the underlying inputStream.
+     *
+     * @throws java.io.IOException
+     */
+    private void readyBuffer() throws IOException {
+        if (bufferPosition >= bufferLength) {
+            bufferLength = LZFDecoder.decompressChunk(in, compressedBytes, uncompressedBytes);
+            bufferPosition = 0;
+        }
+    }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java
new file mode 100644
index 00000000000..d5718a98de6
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.io.stream;
+
+import org.elasticsearch.common.compress.lzf.ChunkEncoder;
+import org.elasticsearch.common.compress.lzf.LZFChunk;
+
+import java.io.IOException;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class LZFStreamOutput extends StreamOutput {
+
+    private StreamOutput out;
+
+    private final byte[] outputBuffer = new byte[LZFChunk.MAX_CHUNK_LEN];
+    private final ChunkEncoder encoder = new ChunkEncoder(LZFChunk.MAX_CHUNK_LEN);
+
+    private int position = 0;
+
+    public LZFStreamOutput(StreamOutput out) {
+        this.out = out;
+    }
+
+    @Override public void write(final int singleByte) throws IOException {
+        if (position >= outputBuffer.length) {
+            writeCompressedBlock();
+        }
+        outputBuffer[position++] = (byte) (singleByte & 0xff);
+    }
+
+    @Override public void writeByte(byte b) throws IOException {
+        if (position >= outputBuffer.length) {
+            writeCompressedBlock();
+        }
+        outputBuffer[position++] = b;
+    }
+
+    @Override public void writeBytes(byte[] b, int offset, int length) throws IOException {
+        int inputCursor = offset;
+        int remainingBytes = length;
+        while (remainingBytes > 0) {
+            if (position >= outputBuffer.length) {
+                writeCompressedBlock();
+            }
+            int chunkLength = (remainingBytes > (outputBuffer.length - position)) ? outputBuffer.length - position : remainingBytes;
+            System.arraycopy(b, inputCursor, outputBuffer, position, chunkLength);
+            position += chunkLength;
+            remainingBytes -= chunkLength;
+            inputCursor += chunkLength;
+        }
+    }
+
+    @Override public void flush() throws IOException {
+        try {
+            writeCompressedBlock();
+        } finally {
+            out.flush();
+        }
+    }
+
+    @Override public void close() throws IOException {
+        try {
+            flush();
+        } finally {
+            out.close();
+        }
+    }
+
+    @Override public void reset() throws IOException {
+        this.position = 0;
+        out.reset();
+    }
+
+    public void reset(StreamOutput out) throws IOException {
+        this.out = out;
+        reset();
+    }
+
+    public StreamOutput wrappedOut() {
+        return this.out;
+    }
+
+    /**
+     * Compress and write the current block to the OutputStream
+     */
+    private void writeCompressedBlock() throws IOException {
+        encoder.encodeChunk(out, outputBuffer, 0, position);
+        position = 0;
+    }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java
index 40698abadc4..57c385f0bc2 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java
@@ -221,7 +221,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
     private void sendPingRequest(int id) {
         synchronized (sendMutex) {
             try {
-                HandlesStreamOutput out = CachedStreamOutput.cachedHandles();
+                HandlesStreamOutput out = CachedStreamOutput.cachedHandlesBytes();
                 out.writeInt(id);
                 clusterName.writeTo(out);
                 nodesProvider.nodes().localNode().writeTo(out);
@@ -311,7 +311,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
                             continue;
                         }
                         try {
-                            StreamInput input = HandlesStreamInput.Cached.cached(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength()));
+                            StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength()));
                             id = input.readInt();
                             clusterName = ClusterName.readClusterName(input);
                             requestingNodeX = readNode(input);
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java
index 753915b6fc6..a94c06ea629 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java
@@ -107,7 +107,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
         this.networkService = networkService;
         ByteSizeValue maxContentLength = componentSettings.getAsBytesSize("max_content_length", settings.getAsBytesSize("http.max_content_length", new ByteSizeValue(100, ByteSizeUnit.MB)));
         this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors());
-        this.blockingServer = componentSettings.getAsBoolean("http.blocking_server", componentSettings.getAsBoolean(TCP_BLOCKING_SERVER, componentSettings.getAsBoolean(TCP_BLOCKING, false)));
+        this.blockingServer = settings.getAsBoolean("http.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
         this.port = componentSettings.get("port", settings.get("http.port", "9200-9300"));
         this.bindHost = componentSettings.get("bind_host");
         this.publishHost = componentSettings.get("publish_host");
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java
index 9c39aa520ac..78550cc2a06 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java
@@ -32,34 +32,6 @@ import java.io.IOException;
  */
 public interface Transport extends LifecycleComponent<Transport> {
 
-    class Helper {
-        public static final byte TRANSPORT_TYPE = 1 << 0;
-        public static final byte ERROR = 1 << 1;
-
-        public static boolean isRequest(byte value) {
-            return (value & TRANSPORT_TYPE) == 0;
-        }
-
-        public static byte setRequest(byte value) {
-            value &= ~TRANSPORT_TYPE;
-            return value;
-        }
-
-        public static byte setResponse(byte value) {
-            value |= TRANSPORT_TYPE;
-            return value;
-        }
-
-        public static boolean isError(byte value) {
-            return (value & ERROR) != 0;
-        }
-
-        public static byte setError(byte value) {
-            value |= ERROR;
-            return value;
-        }
-    }
-
     void transportServiceAdapter(TransportServiceAdapter service);
 
     /**
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java
index beeb8fd586f..4ca6c380a74 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java
@@ -34,6 +34,8 @@ public class TransportRequestOptions {
 
     private TimeValue timeout;
 
+    private boolean compress;
+
     public TransportRequestOptions withTimeout(long timeout) {
         return withTimeout(TimeValue.timeValueMillis(timeout));
     }
@@ -43,7 +45,16 @@ public class TransportRequestOptions {
         return this;
     }
 
+    public TransportRequestOptions withCompress() {
+        this.compress = true;
+        return this;
+    }
+
     public TimeValue timeout() {
         return this.timeout;
     }
+
+    public boolean compress() {
+        return this.compress;
+    }
 }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java
index b4e642e552d..bffbbe5ee55 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java
@@ -29,4 +29,15 @@ public class TransportResponseOptions {
     public static TransportResponseOptions options() {
         return new TransportResponseOptions();
     }
+
+    private boolean compress;
+
+    public TransportResponseOptions withCompress() {
+        this.compress = true;
+        return this;
+    }
+
+    public boolean compress() {
+        return this.compress;
+    }
 }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java
index abc2b915e2d..1ee8784c756 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java
@@ -32,6 +32,7 @@ import org.elasticsearch.common.transport.LocalTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.*;
+import org.elasticsearch.transport.support.TransportStreams;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -40,7 +41,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*;
-import static org.elasticsearch.transport.Transport.Helper.*;
 
 /**
  * @author kimchy (shay.banon)
@@ -135,11 +135,11 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
     }
 
     @Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException {
-        HandlesStreamOutput stream = CachedStreamOutput.cachedHandles();
+        HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes();
 
         stream.writeLong(requestId);
         byte status = 0;
-        status = setRequest(status);
+        status = TransportStreams.statusSetRequest(status);
         stream.writeByte(status); // 0 for request, 1 for response.
 
         stream.writeUTF(action);
@@ -169,12 +169,12 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
         transportServiceAdapter.received(data.length);
 
         StreamInput stream = new BytesStreamInput(data);
-        stream = HandlesStreamInput.Cached.cached(stream);
+        stream = CachedStreamInput.cachedHandles(stream);
 
         try {
             long requestId = stream.readLong();
             byte status = stream.readByte();
-            boolean isRequest = isRequest(status);
+            boolean isRequest = TransportStreams.statusIsRequest(status);
 
             if (isRequest) {
                 handleRequest(stream, requestId, sourceTransport);
@@ -182,7 +182,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
                 final TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
                 // ignore if its null, the adapter logs it
                 if (handler != null) {
-                    if (Transport.Helper.isError(status)) {
+                    if (TransportStreams.statusIsError(status)) {
                         handlerResponseError(stream, handler);
                     } else {
                         handleResponse(stream, handler);
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java
index 09ff76b3cb8..7a105297e43 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java
@@ -24,7 +24,11 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.CachedStreamOutput;
 import org.elasticsearch.common.io.stream.HandlesStreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
-import org.elasticsearch.transport.*;
+import org.elasticsearch.transport.NotSerializableTransportException;
+import org.elasticsearch.transport.RemoteTransportException;
+import org.elasticsearch.transport.TransportChannel;
+import org.elasticsearch.transport.TransportResponseOptions;
+import org.elasticsearch.transport.support.TransportStreams;
 
 import java.io.IOException;
 import java.io.NotSerializableException;
@@ -59,10 +63,10 @@ public class LocalTransportChannel implements TransportChannel {
     }
 
     @Override public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException {
-        HandlesStreamOutput stream = CachedStreamOutput.cachedHandles();
+        HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes();
         stream.writeLong(requestId);
         byte status = 0;
-        status = Transport.Helper.setResponse(status);
+        status = TransportStreams.statusSetResponse(status);
         stream.writeByte(status); // 0 for request, 1 for response.
         message.writeTo(stream);
         final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
@@ -101,8 +105,8 @@ public class LocalTransportChannel implements TransportChannel {
     private void writeResponseExceptionHeader(BytesStreamOutput stream) throws IOException {
         stream.writeLong(requestId);
         byte status = 0;
-        status = Transport.Helper.setResponse(status);
-        status = Transport.Helper.setError(status);
+        status = TransportStreams.statusSetResponse(status);
+        status = TransportStreams.statusSetError(status);
         stream.writeByte(status);
     }
 }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/ChannelBufferStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/ChannelBufferStreamInput.java
index 7866856055e..84c2f8e739b 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/ChannelBufferStreamInput.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/ChannelBufferStreamInput.java
@@ -22,6 +22,7 @@ package org.elasticsearch.transport.netty;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.netty.buffer.ChannelBuffer;
 
+import java.io.EOFException;
 import java.io.IOException;
 
 /**
@@ -32,34 +33,91 @@ import java.io.IOException;
 public class ChannelBufferStreamInput extends StreamInput {
 
     private final ChannelBuffer buffer;
+    private final int startIndex;
+    private final int endIndex;
 
-    public ChannelBufferStreamInput(ChannelBuffer buffer) {
+    public ChannelBufferStreamInput(ChannelBuffer buffer, int length) {
+        if (length > buffer.readableBytes()) {
+            throw new IndexOutOfBoundsException();
+        }
         this.buffer = buffer;
+        startIndex = buffer.readerIndex();
+        endIndex = startIndex + length;
+        buffer.markReaderIndex();
     }
 
-    // Not really maps to InputStream, but good enough for us
+    /**
+     * Returns the number of read bytes by this stream so far.
+     */
+    public int readBytes() {
+        return buffer.readerIndex() - startIndex;
+    }
+
+    @Override public int available() throws IOException {
+        return endIndex - buffer.readerIndex();
+    }
+
+    @Override public void mark(int readlimit) {
+        buffer.markReaderIndex();
+    }
+
+    @Override public boolean markSupported() {
+        return true;
+    }
 
     @Override public int read() throws IOException {
-        return buffer.readByte() & 0xFF;
+        if (available() == 0) {
+            return -1;
+        }
+        return buffer.readByte() & 0xff;
     }
 
-    @Override public int read(byte[] b, int off, int len) throws IOException {
-        readBytes(b, off, len);
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        int available = available();
+        if (available == 0) {
+            return -1;
+        }
+
+        len = Math.min(available, len);
+        buffer.readBytes(b, off, len);
         return len;
     }
 
-    @Override public byte readByte() throws IOException {
-        return buffer.readByte();
-    }
-
-    @Override public void readBytes(byte[] b, int offset, int len) throws IOException {
-        buffer.readBytes(b, offset, len);
-    }
-
     @Override public void reset() throws IOException {
         buffer.resetReaderIndex();
     }
 
+    @Override
+    public long skip(long n) throws IOException {
+        if (n > Integer.MAX_VALUE) {
+            return skipBytes(Integer.MAX_VALUE);
+        } else {
+            return skipBytes((int) n);
+        }
+    }
+
+    public int skipBytes(int n) throws IOException {
+        int nBytes = Math.min(available(), n);
+        buffer.skipBytes(nBytes);
+        return nBytes;
+    }
+
+
+    @Override public byte readByte() throws IOException {
+        if (available() == 0) {
+            throw new EOFException();
+        }
+        return buffer.readByte();
+    }
+
+    @Override public void readBytes(byte[] b, int offset, int len) throws IOException {
+        int read = read(b, offset, len);
+        if (read < len) {
+            throw new EOFException();
+        }
+    }
+
     @Override public void close() throws IOException {
         // nothing to do here
     }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java
index 062dd5a8dee..370b60d28d5 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java
@@ -20,7 +20,7 @@
 package org.elasticsearch.transport.netty;
 
 import org.elasticsearch.common.io.ThrowableObjectInputStream;
-import org.elasticsearch.common.io.stream.HandlesStreamInput;
+import org.elasticsearch.common.io.stream.CachedStreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.Streamable;
 import org.elasticsearch.common.logging.ESLogger;
@@ -28,11 +28,10 @@ import org.elasticsearch.common.netty.buffer.ChannelBuffer;
 import org.elasticsearch.common.netty.channel.*;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.*;
+import org.elasticsearch.transport.support.TransportStreams;
 
 import java.io.IOException;
 
-import static org.elasticsearch.transport.Transport.Helper.*;
-
 /**
  * @author kimchy (shay.banon)
  */
@@ -68,24 +67,34 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
         int markedReaderIndex = buffer.readerIndex();
         int expectedIndexReader = markedReaderIndex + size;
 
-        StreamInput streamIn = new ChannelBufferStreamInput(buffer);
-        streamIn = HandlesStreamInput.Cached.cached(streamIn);
+        StreamInput streamIn = new ChannelBufferStreamInput(buffer, size);
 
         long requestId = buffer.readLong();
         byte status = buffer.readByte();
-        boolean isRequest = isRequest(status);
+        boolean isRequest = TransportStreams.statusIsRequest(status);
+
+        if (TransportStreams.statusIsCompress(status)) {
+            streamIn = CachedStreamInput.cachedHandlesLzf(streamIn);
+        } else {
+            streamIn = CachedStreamInput.cachedHandles(streamIn);
+        }
 
         if (isRequest) {
             String action = handleRequest(event, streamIn, requestId);
-            if (buffer.readerIndex() < expectedIndexReader) {
-                logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
-                buffer.readerIndex(expectedIndexReader);
+            if (buffer.readerIndex() != expectedIndexReader) {
+                if (buffer.readerIndex() < expectedIndexReader) {
+                    logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
+                    buffer.readerIndex(expectedIndexReader);
+                } else {
+                    logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action);
+                    buffer.readerIndex(expectedIndexReader);
+                }
             }
         } else {
             TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
             // ignore if its null, the adapter logs it
             if (handler != null) {
-                if (isError(status)) {
+                if (TransportStreams.statusIsError(status)) {
                     handlerResponseError(streamIn, handler);
                 } else {
                     handleResponse(streamIn, handler);
@@ -94,9 +103,14 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
                 // if its null, skip those bytes
                 buffer.readerIndex(markedReaderIndex + size);
             }
-            if (buffer.readerIndex() < expectedIndexReader) {
-                logger.warn("Message not fully read (response) for [{}] and handler {}, resetting", requestId, handler);
-                buffer.readerIndex(expectedIndexReader);
+            if (buffer.readerIndex() != expectedIndexReader) {
+                if (buffer.readerIndex() < expectedIndexReader) {
+                    logger.warn("Message not fully read (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStreams.statusIsError(status));
+                    buffer.readerIndex(expectedIndexReader);
+                } else if (buffer.readerIndex() > expectedIndexReader) {
+                    logger.warn("Message read past expected size (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStreams.statusIsError(status));
+                    buffer.readerIndex(expectedIndexReader);
+                }
             }
         }
     }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java
index 0fa94911e2f..69746c70128 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java
@@ -26,9 +26,6 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Lists;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.io.stream.CachedStreamOutput;
-import org.elasticsearch.common.io.stream.HandlesStreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
 import org.elasticsearch.common.netty.OpenChannelsHandler;
 import org.elasticsearch.common.netty.bootstrap.ClientBootstrap;
@@ -53,6 +50,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.*;
+import org.elasticsearch.transport.support.TransportStreams;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -72,7 +70,6 @@ import static org.elasticsearch.common.transport.NetworkExceptionHelper.*;
 import static org.elasticsearch.common.unit.TimeValue.*;
 import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*;
 import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
-import static org.elasticsearch.transport.Transport.Helper.*;
 
 /**
  * @author kimchy (shay.banon)
@@ -101,6 +98,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
 
     final String publishHost;
 
+    final boolean compress;
+
     final TimeValue connectTimeout;
 
     final Boolean tcpNoDelay;
@@ -145,11 +144,12 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
         this.networkService = networkService;
 
         this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors());
-        this.blockingServer = componentSettings.getAsBoolean("transport.tcp.blocking_server", componentSettings.getAsBoolean(TCP_BLOCKING_SERVER, componentSettings.getAsBoolean(TCP_BLOCKING, false)));
-        this.blockingClient = componentSettings.getAsBoolean("transport.tcp.blocking_client", componentSettings.getAsBoolean(TCP_BLOCKING_CLIENT, componentSettings.getAsBoolean(TCP_BLOCKING, false)));
+        this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
+        this.blockingClient = settings.getAsBoolean("transport.tcp.blocking_client", settings.getAsBoolean(TCP_BLOCKING_CLIENT, settings.getAsBoolean(TCP_BLOCKING, false)));
         this.port = componentSettings.get("port", settings.get("transport.tcp.port", "9300-9400"));
         this.bindHost = componentSettings.get("bind_host");
         this.publishHost = componentSettings.get("publish_host");
+        this.compress = settings.getAsBoolean("transport.tcp.compress", false);
         this.connectTimeout = componentSettings.getAsTime("connect_timeout", settings.getAsTime("transport.tcp.connect_timeout", timeValueSeconds(1)));
         this.tcpNoDelay = componentSettings.getAsBoolean("tcp_no_delay", settings.getAsBoolean(TCP_NO_DELAY, true));
         this.tcpKeepAlive = componentSettings.getAsBoolean("tcp_keep_alive", settings.getAsBoolean(TCP_KEEP_ALIVE, null));
@@ -381,31 +381,16 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
         return new InetSocketTransportAddress((InetSocketAddress) socketAddress);
     }
 
-    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
-
-    @Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable streamable, TransportRequestOptions options) throws IOException, TransportException {
-
+    @Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException {
         Channel targetChannel = nodeChannel(node);
 
-        HandlesStreamOutput stream = CachedStreamOutput.cachedHandles();
-        stream.writeBytes(LENGTH_PLACEHOLDER); // fake size
-
-        stream.writeLong(requestId);
-        byte status = 0;
-        status = setRequest(status);
-        stream.writeByte(status); // 0 for request, 1 for response.
-
-        stream.writeUTF(action);
-        streamable.writeTo(stream);
-
-        byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
-        ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);
-
-        int size = buffer.writerIndex() - 4;
-        if (size == 0) {
-            throw new ElasticSearchIllegalStateException("Trying to send a stream with 0 size");
+        if (compress) {
+            options.withCompress();
         }
-        buffer.setInt(0, size); // update real size.
+
+        byte[] data = TransportStreams.buildRequest(requestId, action, message, options);
+
+        ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);
         ChannelFuture channelFuture = targetChannel.write(buffer);
         // We handle close connection exception in the #exceptionCaught method, which is the main reason we want to add this future
 //        channelFuture.addListener(new ChannelFutureListener() {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java
index 259aa034b28..adafb73ed53 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java
@@ -22,7 +22,6 @@ package org.elasticsearch.transport.netty;
 import org.elasticsearch.common.io.ThrowableObjectOutputStream;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.CachedStreamOutput;
-import org.elasticsearch.common.io.stream.HandlesStreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
 import org.elasticsearch.common.netty.buffer.ChannelBuffer;
 import org.elasticsearch.common.netty.buffer.ChannelBuffers;
@@ -31,14 +30,13 @@ import org.elasticsearch.transport.NotSerializableTransportException;
 import org.elasticsearch.transport.RemoteTransportException;
 import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportResponseOptions;
+import org.elasticsearch.transport.support.TransportStreams;
 
 import java.io.IOException;
 import java.io.NotSerializableException;
 
-import static org.elasticsearch.transport.Transport.Helper.*;
-
 /**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
  */
 public class NettyTransportChannel implements TransportChannel {
 
@@ -68,17 +66,11 @@ public class NettyTransportChannel implements TransportChannel {
     }
 
     @Override public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException {
-        HandlesStreamOutput stream = CachedStreamOutput.cachedHandles();
-        stream.writeBytes(LENGTH_PLACEHOLDER); // fake size
-        stream.writeLong(requestId);
-        byte status = 0;
-        status = setResponse(status);
-        stream.writeByte(status); // 0 for request, 1 for response.
-        message.writeTo(stream);
-        stream.flush();
-        byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
+        if (transport.compress) {
+            options.withCompress();
+        }
+        byte[] data = TransportStreams.buildResponse(requestId, message, options);
         ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);
-        buffer.setInt(0, buffer.writerIndex() - 4); // update real size.
         channel.write(buffer);
     }
 
@@ -108,8 +100,8 @@ public class NettyTransportChannel implements TransportChannel {
         stream.writeBytes(LENGTH_PLACEHOLDER);
         stream.writeLong(requestId);
         byte status = 0;
-        status = setResponse(status);
-        status = setError(status);
+        status = TransportStreams.statusSetResponse(status);
+        status = TransportStreams.statusSetError(status);
         stream.writeByte(status);
     }
 }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/support/TransportStreams.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/support/TransportStreams.java
new file mode 100644
index 00000000000..f4d950cab6f
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/support/TransportStreams.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.transport.support;
+
+import org.elasticsearch.common.io.stream.*;
+import org.elasticsearch.transport.TransportRequestOptions;
+import org.elasticsearch.transport.TransportResponseOptions;
+
+import java.io.IOException;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class TransportStreams {
+
+    public static final int HEADER_SIZE = 4 + 8 + 1;
+
+    public static void writeHeader(byte[] data, int dataLength, long requestId, byte status) {
+        writeInt(data, 0, dataLength + 9);  // add the requestId and the status
+        writeLong(data, 4, requestId);
+        data[12] = status;
+    }
+
+    // same as writeLong in StreamOutput
+
+    private static void writeLong(byte[] buffer, int offset, long value) {
+        buffer[offset++] = ((byte) (value >> 56));
+        buffer[offset++] = ((byte) (value >> 48));
+        buffer[offset++] = ((byte) (value >> 40));
+        buffer[offset++] = ((byte) (value >> 32));
+        buffer[offset++] = ((byte) (value >> 24));
+        buffer[offset++] = ((byte) (value >> 16));
+        buffer[offset++] = ((byte) (value >> 8));
+        buffer[offset] = ((byte) (value));
+    }
+
+    // same as writeInt in StreamOutput
+
+    private static void writeInt(byte[] buffer, int offset, int value) {
+        buffer[offset++] = ((byte) (value >> 24));
+        buffer[offset++] = ((byte) (value >> 16));
+        buffer[offset++] = ((byte) (value >> 8));
+        buffer[offset] = ((byte) (value));
+    }
+
+    private static final byte STATUS_REQRES = 1 << 0;
+    private static final byte STATUS_ERROR = 1 << 1;
+    private static final byte STATUS_COMPRESS = 1 << 2;
+
+    public static boolean statusIsRequest(byte value) {
+        return (value & STATUS_REQRES) == 0;
+    }
+
+    public static byte statusSetRequest(byte value) {
+        value &= ~STATUS_REQRES;
+        return value;
+    }
+
+    public static byte statusSetResponse(byte value) {
+        value |= STATUS_REQRES;
+        return value;
+    }
+
+    public static boolean statusIsError(byte value) {
+        return (value & STATUS_ERROR) != 0;
+    }
+
+    public static byte statusSetError(byte value) {
+        value |= STATUS_ERROR;
+        return value;
+    }
+
+    public static boolean statusIsCompress(byte value) {
+        return (value & STATUS_COMPRESS) != 0;
+    }
+
+    public static byte statusSetCompress(byte value) {
+        value |= STATUS_COMPRESS;
+        return value;
+    }
+
+    public static byte[] buildRequest(final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException {
+        byte status = 0;
+        status = TransportStreams.statusSetRequest(status);
+
+        BytesStreamOutput wrapped;
+        if (options.compress()) {
+            status = TransportStreams.statusSetCompress(status);
+            HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesLzfBytes();
+            stream.writeUTF(action);
+            message.writeTo(stream);
+            stream.flush();
+            wrapped = ((BytesStreamOutput) ((LZFStreamOutput) stream.wrappedOut()).wrappedOut());
+        } else {
+            HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes();
+            stream.writeUTF(action);
+            message.writeTo(stream);
+            stream.flush();
+            wrapped = ((BytesStreamOutput) stream.wrappedOut());
+        }
+
+        byte[] data = new byte[HEADER_SIZE + wrapped.size()];
+        TransportStreams.writeHeader(data, wrapped.size(), requestId, status);
+        System.arraycopy(wrapped.unsafeByteArray(), 0, data, HEADER_SIZE, wrapped.size());
+
+        return data;
+    }
+
+    public static byte[] buildResponse(final long requestId, Streamable message, TransportResponseOptions options) throws IOException {
+        byte status = 0;
+        status = TransportStreams.statusSetResponse(status);
+
+        BytesStreamOutput wrapped;
+        if (options.compress()) {
+            status = TransportStreams.statusSetCompress(status);
+            HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesLzfBytes();
+            message.writeTo(stream);
+            stream.flush();
+            wrapped = ((BytesStreamOutput) ((LZFStreamOutput) stream.wrappedOut()).wrappedOut());
+        } else {
+            HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes();
+            message.writeTo(stream);
+            stream.flush();
+            wrapped = ((BytesStreamOutput) stream.wrappedOut());
+        }
+
+
+        byte[] data = new byte[HEADER_SIZE + wrapped.size()];
+        TransportStreams.writeHeader(data, wrapped.size(), requestId, status);
+        System.arraycopy(wrapped.unsafeByteArray(), 0, data, HEADER_SIZE, wrapped.size());
+
+        return data;
+    }
+}
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/common/compress/lzf/LZFInputStreamTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/common/compress/lzf/LZFInputStreamTests.java
new file mode 100644
index 00000000000..145ad86f054
--- /dev/null
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/common/compress/lzf/LZFInputStreamTests.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.compress.lzf;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.io.*;
+import java.security.SecureRandom;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class LZFInputStreamTests {
+
+    private static int BUFFER_SIZE = LZFChunk.MAX_CHUNK_LEN * 64;
+    private byte[] nonEncodableBytesToWrite = new byte[BUFFER_SIZE];
+    private byte[] bytesToWrite = new byte[BUFFER_SIZE];
+    private ByteArrayOutputStream nonCompressed;
+    private ByteArrayOutputStream compressed;
+
+    @BeforeTest(alwaysRun = true)
+    public void setUp() throws Exception {
+        SecureRandom.getInstance("SHA1PRNG").nextBytes(nonEncodableBytesToWrite);
+        String phrase = "all work and no play make Jack a dull boy";
+        byte[] bytes = phrase.getBytes();
+        int cursor = 0;
+        while (cursor <= bytesToWrite.length) {
+            System.arraycopy(bytes, 0, bytesToWrite, cursor, (bytes.length + cursor < bytesToWrite.length) ? bytes.length : bytesToWrite.length - cursor);
+            cursor += bytes.length;
+        }
+        nonCompressed = new ByteArrayOutputStream();
+        OutputStream os = new LZFOutputStream(nonCompressed);
+        os.write(nonEncodableBytesToWrite);
+        os.close();
+
+        compressed = new ByteArrayOutputStream();
+        os = new LZFOutputStream(compressed);
+        os.write(bytesToWrite);
+        os.close();
+    }
+
+    @Test
+    public void testDecompressNonEncodableReadByte() throws IOException {
+        doDecompressReadBlock(nonCompressed.toByteArray(), nonEncodableBytesToWrite);
+    }
+
+    @Test
+    public void testDecompressNonEncodableReadBlock() throws IOException {
+        doDecompressReadBlock(nonCompressed.toByteArray(), nonEncodableBytesToWrite);
+    }
+
+    @Test
+    public void testDecompressEncodableReadByte() throws IOException {
+        doDecompressReadBlock(compressed.toByteArray(), bytesToWrite);
+    }
+
+    @Test
+    public void testDecompressEncodableReadBlock() throws IOException {
+        doDecompressReadBlock(compressed.toByteArray(), bytesToWrite);
+    }
+
+    public void doDecompressNonEncodableReadByte(byte[] bytes, byte[] reference) throws IOException {
+        ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        int outputBytes = 0;
+        InputStream is = new LZFInputStream(bis);
+        int val;
+        while ((val = is.read()) != -1) {
+            byte testVal = (byte) (val & 255);
+            Assert.assertTrue(testVal == reference[outputBytes]);
+            outputBytes++;
+        }
+        Assert.assertTrue(outputBytes == reference.length);
+    }
+
+
+    private void doDecompressReadBlock(byte[] bytes, byte[] reference) throws IOException {
+        ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        int outputBytes = 0;
+        InputStream is = new LZFInputStream(bis);
+        int val;
+        byte[] buffer = new byte[65536 + 23];
+        while ((val = is.read(buffer)) != -1) {
+            for (int i = 0; i < val; i++) {
+                byte testVal = buffer[i];
+                Assert.assertTrue(testVal == reference[outputBytes]);
+                outputBytes++;
+            }
+        }
+        Assert.assertTrue(outputBytes == reference.length);
+    }
+}
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/common/compress/lzf/LZFOutputStreamTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/common/compress/lzf/LZFOutputStreamTests.java
new file mode 100644
index 00000000000..8451db30776
--- /dev/null
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/common/compress/lzf/LZFOutputStreamTests.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.compress.lzf;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.security.SecureRandom;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class LZFOutputStreamTests {
+
+    private static int BUFFER_SIZE = LZFChunk.MAX_CHUNK_LEN * 64;
+    private byte[] nonEncodableBytesToWrite = new byte[BUFFER_SIZE];
+    private byte[] bytesToWrite = new byte[BUFFER_SIZE];
+
+    @BeforeTest(alwaysRun = true)
+    public void setUp() throws Exception {
+        SecureRandom.getInstance("SHA1PRNG").nextBytes(nonEncodableBytesToWrite);
+        String phrase = "all work and no play make Jack a dull boy";
+        byte[] bytes = phrase.getBytes();
+        int cursor = 0;
+        while (cursor <= bytesToWrite.length) {
+            System.arraycopy(bytes, 0, bytesToWrite, cursor, (bytes.length + cursor < bytesToWrite.length) ? bytes.length : bytesToWrite.length - cursor);
+            cursor += bytes.length;
+        }
+    }
+
+    @Test
+    public void testUnencodable() throws Exception {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        OutputStream os = new LZFOutputStream(bos);
+        os.write(nonEncodableBytesToWrite);
+        os.close();
+        Assert.assertTrue(bos.toByteArray().length > nonEncodableBytesToWrite.length);
+    }
+
+    @Test
+    public void testStreaming() throws Exception {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        OutputStream os = new LZFOutputStream(bos);
+        os.write(bytesToWrite);
+        os.close();
+        Assert.assertTrue(bos.toByteArray().length > 10);
+        Assert.assertTrue(bos.toByteArray().length < bytesToWrite.length * .5);
+    }
+
+    @Test
+    public void testSingleByte() throws Exception {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        OutputStream os = new LZFOutputStream(bos);
+        for (int idx = 0; idx < BUFFER_SIZE; idx++) {
+            os.write(bytesToWrite[idx]);
+            if (idx % 1023 == 0) {
+                os.flush();
+            }
+        }
+        os.close();
+        Assert.assertTrue(bos.toByteArray().length > 10);
+        Assert.assertTrue(bos.toByteArray().length < bytesToWrite.length * .5);
+    }
+
+    @Test
+    public void testPartialBuffer() throws Exception {
+        int offset = 255;
+        int len = 1 << 17;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        OutputStream os = new LZFOutputStream(bos);
+        os.write(bytesToWrite, offset, len);
+        os.close();
+        Assert.assertTrue(bos.toByteArray().length > 10);
+        Assert.assertTrue(bos.toByteArray().length < bytesToWrite.length * .5);
+    }
+}
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java
index e8cb04110fd..e9617b31e66 100644
--- a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java
@@ -116,6 +116,54 @@ public abstract class AbstractSimpleTransportTests {
         System.out.println("after ...");
     }
 
+
+    @Test public void testHelloWorldCompressed() {
+        serviceA.registerHandler("sayHello", new BaseTransportRequestHandler<StringMessage>() {
+            @Override public StringMessage newInstance() {
+                return new StringMessage();
+            }
+
+            @Override public void messageReceived(StringMessage request, TransportChannel channel) {
+                System.out.println("got message: " + request.message);
+                assertThat("moshe", equalTo(request.message));
+                try {
+                    channel.sendResponse(new StringMessage("hello " + request.message), TransportResponseOptions.options().withCompress());
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    assertThat(e.getMessage(), false, equalTo(true));
+                }
+            }
+        });
+
+        TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHello",
+                new StringMessage("moshe"), TransportRequestOptions.options().withCompress(), new BaseTransportResponseHandler<StringMessage>() {
+                    @Override public StringMessage newInstance() {
+                        return new StringMessage();
+                    }
+
+                    @Override public void handleResponse(StringMessage response) {
+                        System.out.println("got response: " + response.message);
+                        assertThat("hello moshe", equalTo(response.message));
+                    }
+
+                    @Override public void handleException(RemoteTransportException exp) {
+                        exp.printStackTrace();
+                        assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true));
+                    }
+                });
+
+        try {
+            StringMessage message = res.get();
+            assertThat("hello moshe", equalTo(message.message));
+        } catch (Exception e) {
+            assertThat(e.getMessage(), false, equalTo(true));
+        }
+
+        serviceA.removeHandler("sayHello");
+
+        System.out.println("after ...");
+    }
+
     @Test public void testErrorMessage() {
         serviceA.registerHandler("sayHelloException", new BaseTransportRequestHandler<StringMessage>() {
             @Override public StringMessage newInstance() {
diff --git a/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java b/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java
index 214bfc3787c..a14485a57fc 100644
--- a/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java
+++ b/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java
@@ -94,7 +94,7 @@ public class NettyMemcachedServerTransport extends AbstractLifecycleComponent<Me
         this.networkService = networkService;
 
         this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors());
-        this.blockingServer = componentSettings.getAsBoolean("memcached.blocking_server", componentSettings.getAsBoolean(TCP_BLOCKING_SERVER, componentSettings.getAsBoolean(TCP_BLOCKING, false)));
+        this.blockingServer = componentSettings.getAsBoolean("memcached.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
         this.port = componentSettings.get("port", settings.get("memcached.port", "11211-11311"));
         this.bindHost = componentSettings.get("bind_host");
         this.publishHost = componentSettings.get("publish_host");