Transport: add global compression support compressing all internal transport communication (using lzf), closes #321.

This commit is contained in:
kimchy 2010-08-15 02:57:22 +03:00
parent c10544479f
commit 1ee2f80e68
31 changed files with 1017 additions and 149 deletions

View File

@ -39,6 +39,7 @@
<w>deserialize</w>
<w>docid</w>
<w>elasticsearch</w>
<w>encodable</w>
<w>estab</w>
<w>failover</w>
<w>flushable</w>

View File

@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Elastic Search Tests" type="TestNG" factoryName="TestNG">
<configuration default="false" name="ElasticSearch Tests" type="TestNG" factoryName="TestNG">
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
<extension name="snapshooter" />
<module name="" />

View File

@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Elastic Search Tests (Local)" type="TestNG" factoryName="TestNG">
<configuration default="false" name="ElasticSearch Tests (Local)" type="TestNG" factoryName="TestNG">
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
<extension name="snapshooter" />
<module name="" />
@ -32,7 +32,7 @@
<option name="LOCAL" value="true" />
</RunnerSettings>
<RunnerSettings RunnerId="Profile ">
<option name="myExternalizedOptions" value="&#10;snapshots-dir=&#10;additional-options2=onexit\=snapshot&#10;" />
<option name="myExternalizedOptions" value="&#10;additional-options2=onexit\=snapshot&#10;" />
</RunnerSettings>
<RunnerSettings RunnerId="Run" />
<ConfigurationWrapper RunnerId="Debug" />

View File

@ -0,0 +1,40 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="ElasticSearch Tests (compress)" type="TestNG" factoryName="TestNG">
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
<extension name="snapshooter" />
<module name="" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
<option name="ALTERNATIVE_JRE_PATH" value="" />
<option name="SUITE_NAME" value="" />
<option name="PACKAGE_NAME" value="" />
<option name="MAIN_CLASS_NAME" value="" />
<option name="METHOD_NAME" value="" />
<option name="GROUP_NAME" value="" />
<option name="TEST_OBJECT" value="PACKAGE" />
<option name="VM_PARAMETERS" value="-Des.transport.tcp.compress=true -Xmx512m" />
<option name="PARAMETERS" value="" />
<option name="WORKING_DIRECTORY" value="file://$PROJECT_DIR$" />
<option name="OUTPUT_DIRECTORY" value="" />
<option name="ANNOTATION_TYPE" value="JDK" />
<option name="ENV_VARIABLES" />
<option name="PASS_PARENT_ENVS" value="true" />
<option name="TEST_SEARCH_SCOPE">
<value defaultName="wholeProject" />
</option>
<option name="USE_DEFAULT_REPORTERS" value="false" />
<option name="PROPERTIES_FILE" value="" />
<envs />
<properties />
<listeners />
<RunnerSettings RunnerId="Debug">
<option name="DEBUG_PORT" value="57221" />
<option name="TRANSPORT" value="0" />
<option name="LOCAL" value="true" />
</RunnerSettings>
<RunnerSettings RunnerId="Profile ">
<option name="myExternalizedOptions" value="&#10;additional-options2=onexit\=snapshot&#10;" />
</RunnerSettings>
<ConfigurationWrapper RunnerId="Debug" />
<method />
</configuration>
</component>

View File

@ -30,6 +30,9 @@
package org.elasticsearch.common.compress.lzf;
import java.io.IOException;
import java.io.OutputStream;
/**
* Class that handles actual encoding of individual chunks.
* Resulting chunks can be compressed or non-compressed; compression
@ -77,6 +80,23 @@ public class ChunkEncoder {
_encodeBuffer = new byte[bufferLen];
}
/**
* Method for compressing (or not) individual chunks
*/
public int encodeChunk(OutputStream os, byte[] data, int offset, int len) throws IOException {
if (len >= MIN_BLOCK_TO_COMPRESS) {
/* If we have non-trivial block, and can compress it by at least
* 2 bytes (since header is 2 bytes longer), let's compress:
*/
int compLen = tryCompress(data, offset, offset + len, _encodeBuffer, 0);
if (compLen < (len - 2)) { // nah; just return uncompressed
return LZFChunk.createCompressed(os, len, _encodeBuffer, 0, compLen);
}
}
// Otherwise leave uncompressed:
return LZFChunk.createNonCompressed(os, data, offset, len);
}
/**
* Method for compressing (or not) individual chunks
*/

View File

@ -30,6 +30,9 @@
package org.elasticsearch.common.compress.lzf;
import java.io.IOException;
import java.io.OutputStream;
/**
* Helper class used to store LZF encoded segments (compressed and non-compressed)
* that can be sequenced to produce LZF files/streams.
@ -59,6 +62,18 @@ public class LZFChunk {
_data = data;
}
public static int createCompressed(OutputStream os, int origLen, byte[] encData, int encPtr, int encLen) throws IOException {
os.write(BYTE_Z);
os.write(BYTE_V);
os.write(BLOCK_TYPE_COMPRESSED);
os.write(encLen >> 8);
os.write(encLen);
os.write((origLen >> 8));
os.write(origLen);
os.write(encData, encPtr, encLen);
return encLen + 7;
}
/**
* Factory method for constructing compressed chunk
*/
@ -75,6 +90,16 @@ public class LZFChunk {
return new LZFChunk(result);
}
public static int createNonCompressed(OutputStream os, byte[] plainData, int ptr, int len) throws IOException {
os.write(BYTE_Z);
os.write(BYTE_V);
os.write(BLOCK_TYPE_NON_COMPRESSED);
os.write(len >> 8);
os.write(len);
os.write(plainData, ptr, len);
return len + 5;
}
/**
* Factory method for constructing compressed chunk
*/

View File

@ -31,6 +31,7 @@
package org.elasticsearch.common.compress.lzf;
import java.io.IOException;
import java.io.OutputStream;
/**
* Encoder that handles splitting of input into chunks to encode,
@ -49,6 +50,27 @@ public class LZFEncoder {
return encode(data, data.length);
}
public static void encode(OutputStream os, byte[] data, int length) throws IOException {
int left = length;
ChunkEncoder enc = new ChunkEncoder(left);
int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
enc.encodeChunk(os, data, 0, chunkLen);
left -= chunkLen;
// shortcut: if it all fit in, no need to coalesce:
if (left < 1) {
return;
}
int inputOffset = chunkLen;
do {
chunkLen = Math.min(left, LZFChunk.MAX_CHUNK_LEN);
enc.encodeChunk(os, data, inputOffset, chunkLen);
inputOffset += chunkLen;
left -= chunkLen;
} while (left > 0);
}
/**
* Method for compressing given input data using LZF encoding and
* block structure (compatible with lzf command line utility).

View File

@ -61,20 +61,26 @@ public class LZFInputStream extends InputStream {
}
public int read(final byte[] buffer, final int offset, final int length) throws IOException {
// FIXED HERE: handle 0 length cases
if (length == 0) {
return 0;
}
int outputPos = offset;
readyBuffer();
if (bufferLength == -1) {
return -1;
}
while (outputPos < buffer.length && bufferPosition < bufferLength) {
int chunkLength = Math.min(bufferLength - bufferPosition, buffer.length - outputPos);
// FIXED HERE: fixed to use length
while (outputPos < length && bufferPosition < bufferLength) {
int chunkLength = Math.min(bufferLength - bufferPosition, length - outputPos);
System.arraycopy(uncompressedBytes, bufferPosition, buffer, outputPos, chunkLength);
outputPos += chunkLength;
bufferPosition += chunkLength;
readyBuffer();
}
return outputPos;
// FIXED HERE: fixed to return actual length read
return outputPos - offset;
}
public void close() throws IOException {

View File

@ -28,7 +28,8 @@ public class LZFOutputStream extends OutputStream {
private final OutputStream outputStream;
private byte[] outputBuffer = new byte[OUTPUT_BUFFER_SIZE];
private final byte[] outputBuffer = new byte[OUTPUT_BUFFER_SIZE];
private final ChunkEncoder encoder = new ChunkEncoder(OUTPUT_BUFFER_SIZE);
private int position = 0;
public LZFOutputStream(final OutputStream outputStream) {
@ -81,8 +82,7 @@ public class LZFOutputStream extends OutputStream {
* Compress and write the current block to the OutputStream
*/
private void writeCompressedBlock() throws IOException {
final byte[] compressedBytes = LZFEncoder.encode(outputBuffer, position);
outputStream.write(compressedBytes);
encoder.encodeChunk(outputStream, outputBuffer, 0, position);
position = 0;
}
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.thread.ThreadLocals;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class CachedStreamInput {
static class Entry {
final HandlesStreamInput handles;
final LZFStreamInput lzf;
Entry(HandlesStreamInput handles, LZFStreamInput lzf) {
this.handles = handles;
this.lzf = lzf;
}
}
private static final ThreadLocal<ThreadLocals.CleanableValue<Entry>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Entry>>() {
@Override protected ThreadLocals.CleanableValue<Entry> initialValue() {
HandlesStreamInput handles = new HandlesStreamInput();
LZFStreamInput lzf = new LZFStreamInput();
return new ThreadLocals.CleanableValue<Entry>(new Entry(handles, lzf));
}
};
public static HandlesStreamInput cachedHandles(StreamInput in) {
HandlesStreamInput handles = cache.get().get().handles;
handles.reset(in);
return handles;
}
public static HandlesStreamInput cachedHandlesLzf(StreamInput in) throws IOException {
Entry entry = cache.get().get();
entry.lzf.reset(in);
entry.handles.reset(entry.lzf);
return entry.handles;
}
}

View File

@ -31,10 +31,12 @@ public class CachedStreamOutput {
static class Entry {
final BytesStreamOutput bytes;
final HandlesStreamOutput handles;
final LZFStreamOutput lzf;
Entry(BytesStreamOutput bytes, HandlesStreamOutput handles) {
Entry(BytesStreamOutput bytes, HandlesStreamOutput handles, LZFStreamOutput lzf) {
this.bytes = bytes;
this.handles = handles;
this.lzf = lzf;
}
}
@ -42,7 +44,8 @@ public class CachedStreamOutput {
@Override protected ThreadLocals.CleanableValue<Entry> initialValue() {
BytesStreamOutput bytes = new BytesStreamOutput();
HandlesStreamOutput handles = new HandlesStreamOutput(bytes);
return new ThreadLocals.CleanableValue<Entry>(new Entry(bytes, handles));
LZFStreamOutput lzf = new LZFStreamOutput(bytes);
return new ThreadLocals.CleanableValue<Entry>(new Entry(bytes, handles, lzf));
}
};
@ -55,9 +58,23 @@ public class CachedStreamOutput {
return os;
}
public static HandlesStreamOutput cachedHandles() throws IOException {
HandlesStreamOutput os = cache.get().get().handles;
os.reset();
public static LZFStreamOutput cachedLZFBytes() throws IOException {
LZFStreamOutput lzf = cache.get().get().lzf;
lzf.reset();
return lzf;
}
public static HandlesStreamOutput cachedHandlesLzfBytes() throws IOException {
Entry entry = cache.get().get();
HandlesStreamOutput os = entry.handles;
os.reset(entry.lzf);
return os;
}
public static HandlesStreamOutput cachedHandlesBytes() throws IOException {
Entry entry = cache.get().get();
HandlesStreamOutput os = entry.handles;
os.reset(entry.bytes);
return os;
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.TIntObjectHashMap;
import java.io.IOException;
@ -29,25 +28,6 @@ import java.io.IOException;
*/
public class HandlesStreamInput extends StreamInput {
public static class Cached {
private static final ThreadLocal<ThreadLocals.CleanableValue<HandlesStreamInput>> cache = new ThreadLocal<ThreadLocals.CleanableValue<HandlesStreamInput>>() {
@Override protected ThreadLocals.CleanableValue<HandlesStreamInput> initialValue() {
return new ThreadLocals.CleanableValue<HandlesStreamInput>(new HandlesStreamInput());
}
};
/**
* Returns the cached thread local byte stream, with its internal stream cleared.
*/
public static HandlesStreamInput cached(StreamInput in) {
HandlesStreamInput os = cache.get().get();
os.reset(in);
return os;
}
}
private StreamInput in;
private final TIntObjectHashMap<String> handles = new TIntObjectHashMap<String>();

View File

@ -35,7 +35,7 @@ public class HandlesStreamOutput extends StreamOutput {
// a threshold above which strings will use identity check
private final int identityThreshold;
private final StreamOutput out;
private StreamOutput out;
private final TObjectIntHashMap<String> handles = new ExtTObjectIntHasMap<String>().defaultReturnValue(-1);
@ -91,6 +91,11 @@ public class HandlesStreamOutput extends StreamOutput {
out.reset();
}
public void reset(StreamOutput out) throws IOException {
this.out = out;
reset();
}
@Override public void flush() throws IOException {
out.flush();
}

View File

@ -0,0 +1,130 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.compress.lzf.LZFChunk;
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import java.io.EOFException;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class LZFStreamInput extends StreamInput {
public static int EOF_FLAG = -1;
/* the current buffer of compressed bytes */
private final byte[] compressedBytes = new byte[LZFChunk.MAX_CHUNK_LEN];
/* the buffer of uncompressed bytes from which */
private final byte[] uncompressedBytes = new byte[LZFChunk.MAX_CHUNK_LEN];
/* The current position (next char to output) in the uncompressed bytes buffer. */
private int bufferPosition = 0;
/* Length of the current uncompressed bytes buffer */
private int bufferLength = 0;
private StreamInput in;
public LZFStreamInput() {
}
public LZFStreamInput(StreamInput in) throws IOException {
this.in = in;
// we need to read the first buffer here, since it might be a VOID message, and we need to at least read the LZF header
readyBuffer();
}
@Override public int read() throws IOException {
int returnValue = EOF_FLAG;
readyBuffer();
if (bufferPosition < bufferLength) {
returnValue = (uncompressedBytes[bufferPosition++] & 255);
}
return returnValue;
}
@Override public int read(byte[] b, int off, int len) throws IOException {
if (len == 0) {
return 0;
}
int outputPos = off;
readyBuffer();
if (bufferLength == -1) {
return -1;
}
while (outputPos < len && bufferPosition < bufferLength) {
int chunkLength = Math.min(bufferLength - bufferPosition, len - outputPos);
System.arraycopy(uncompressedBytes, bufferPosition, b, outputPos, chunkLength);
outputPos += chunkLength;
bufferPosition += chunkLength;
readyBuffer();
}
return outputPos - off;
}
@Override public byte readByte() throws IOException {
readyBuffer();
if (bufferPosition < bufferLength) {
return (uncompressedBytes[bufferPosition++]);
}
throw new EOFException();
}
@Override public void readBytes(byte[] b, int offset, int len) throws IOException {
int result = read(b, offset, len);
if (result < len) {
throw new EOFException();
}
}
@Override public void reset() throws IOException {
this.bufferPosition = 0;
this.bufferLength = 0;
in.reset();
}
public void reset(StreamInput in) throws IOException {
this.in = in;
this.bufferPosition = 0;
this.bufferLength = 0;
// we need to read the first buffer here, since it might be a VOID message, and we need to at least read the LZF header
readyBuffer();
}
@Override public void close() throws IOException {
in.close();
}
/**
* Fill the uncompressed bytes buffer by reading the underlying inputStream.
*
* @throws java.io.IOException
*/
private void readyBuffer() throws IOException {
if (bufferPosition >= bufferLength) {
bufferLength = LZFDecoder.decompressChunk(in, compressedBytes, uncompressedBytes);
bufferPosition = 0;
}
}
}

View File

@ -0,0 +1,109 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.compress.lzf.ChunkEncoder;
import org.elasticsearch.common.compress.lzf.LZFChunk;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class LZFStreamOutput extends StreamOutput {
private StreamOutput out;
private final byte[] outputBuffer = new byte[LZFChunk.MAX_CHUNK_LEN];
private final ChunkEncoder encoder = new ChunkEncoder(LZFChunk.MAX_CHUNK_LEN);
private int position = 0;
public LZFStreamOutput(StreamOutput out) {
this.out = out;
}
@Override public void write(final int singleByte) throws IOException {
if (position >= outputBuffer.length) {
writeCompressedBlock();
}
outputBuffer[position++] = (byte) (singleByte & 0xff);
}
@Override public void writeByte(byte b) throws IOException {
if (position >= outputBuffer.length) {
writeCompressedBlock();
}
outputBuffer[position++] = b;
}
@Override public void writeBytes(byte[] b, int offset, int length) throws IOException {
int inputCursor = offset;
int remainingBytes = length;
while (remainingBytes > 0) {
if (position >= outputBuffer.length) {
writeCompressedBlock();
}
int chunkLength = (remainingBytes > (outputBuffer.length - position)) ? outputBuffer.length - position : remainingBytes;
System.arraycopy(b, inputCursor, outputBuffer, position, chunkLength);
position += chunkLength;
remainingBytes -= chunkLength;
inputCursor += chunkLength;
}
}
@Override public void flush() throws IOException {
try {
writeCompressedBlock();
} finally {
out.flush();
}
}
@Override public void close() throws IOException {
try {
flush();
} finally {
out.close();
}
}
@Override public void reset() throws IOException {
this.position = 0;
out.reset();
}
public void reset(StreamOutput out) throws IOException {
this.out = out;
reset();
}
public StreamOutput wrappedOut() {
return this.out;
}
/**
* Compress and write the current block to the OutputStream
*/
private void writeCompressedBlock() throws IOException {
encoder.encodeChunk(out, outputBuffer, 0, position);
position = 0;
}
}

View File

@ -221,7 +221,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
private void sendPingRequest(int id) {
synchronized (sendMutex) {
try {
HandlesStreamOutput out = CachedStreamOutput.cachedHandles();
HandlesStreamOutput out = CachedStreamOutput.cachedHandlesBytes();
out.writeInt(id);
clusterName.writeTo(out);
nodesProvider.nodes().localNode().writeTo(out);
@ -311,7 +311,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
continue;
}
try {
StreamInput input = HandlesStreamInput.Cached.cached(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength()));
StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength()));
id = input.readInt();
clusterName = ClusterName.readClusterName(input);
requestingNodeX = readNode(input);

View File

@ -107,7 +107,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
this.networkService = networkService;
ByteSizeValue maxContentLength = componentSettings.getAsBytesSize("max_content_length", settings.getAsBytesSize("http.max_content_length", new ByteSizeValue(100, ByteSizeUnit.MB)));
this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors());
this.blockingServer = componentSettings.getAsBoolean("http.blocking_server", componentSettings.getAsBoolean(TCP_BLOCKING_SERVER, componentSettings.getAsBoolean(TCP_BLOCKING, false)));
this.blockingServer = settings.getAsBoolean("http.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
this.port = componentSettings.get("port", settings.get("http.port", "9200-9300"));
this.bindHost = componentSettings.get("bind_host");
this.publishHost = componentSettings.get("publish_host");

View File

@ -32,34 +32,6 @@ import java.io.IOException;
*/
public interface Transport extends LifecycleComponent<Transport> {
class Helper {
public static final byte TRANSPORT_TYPE = 1 << 0;
public static final byte ERROR = 1 << 1;
public static boolean isRequest(byte value) {
return (value & TRANSPORT_TYPE) == 0;
}
public static byte setRequest(byte value) {
value &= ~TRANSPORT_TYPE;
return value;
}
public static byte setResponse(byte value) {
value |= TRANSPORT_TYPE;
return value;
}
public static boolean isError(byte value) {
return (value & ERROR) != 0;
}
public static byte setError(byte value) {
value |= ERROR;
return value;
}
}
void transportServiceAdapter(TransportServiceAdapter service);
/**

View File

@ -34,6 +34,8 @@ public class TransportRequestOptions {
private TimeValue timeout;
private boolean compress;
public TransportRequestOptions withTimeout(long timeout) {
return withTimeout(TimeValue.timeValueMillis(timeout));
}
@ -43,7 +45,16 @@ public class TransportRequestOptions {
return this;
}
public TransportRequestOptions withCompress() {
this.compress = true;
return this;
}
public TimeValue timeout() {
return this.timeout;
}
public boolean compress() {
return this.compress;
}
}

View File

@ -29,4 +29,15 @@ public class TransportResponseOptions {
public static TransportResponseOptions options() {
return new TransportResponseOptions();
}
private boolean compress;
public TransportResponseOptions withCompress() {
this.compress = true;
return this;
}
public boolean compress() {
return this.compress;
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStreams;
import javax.annotation.Nullable;
import java.io.IOException;
@ -40,7 +41,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*;
import static org.elasticsearch.transport.Transport.Helper.*;
/**
* @author kimchy (shay.banon)
@ -135,11 +135,11 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
}
@Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException {
HandlesStreamOutput stream = CachedStreamOutput.cachedHandles();
HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes();
stream.writeLong(requestId);
byte status = 0;
status = setRequest(status);
status = TransportStreams.statusSetRequest(status);
stream.writeByte(status); // 0 for request, 1 for response.
stream.writeUTF(action);
@ -169,12 +169,12 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
transportServiceAdapter.received(data.length);
StreamInput stream = new BytesStreamInput(data);
stream = HandlesStreamInput.Cached.cached(stream);
stream = CachedStreamInput.cachedHandles(stream);
try {
long requestId = stream.readLong();
byte status = stream.readByte();
boolean isRequest = isRequest(status);
boolean isRequest = TransportStreams.statusIsRequest(status);
if (isRequest) {
handleRequest(stream, requestId, sourceTransport);
@ -182,7 +182,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
final TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
if (Transport.Helper.isError(status)) {
if (TransportStreams.statusIsError(status)) {
handlerResponseError(stream, handler);
} else {
handleResponse(stream, handler);

View File

@ -24,7 +24,11 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.NotSerializableTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.support.TransportStreams;
import java.io.IOException;
import java.io.NotSerializableException;
@ -59,10 +63,10 @@ public class LocalTransportChannel implements TransportChannel {
}
@Override public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException {
HandlesStreamOutput stream = CachedStreamOutput.cachedHandles();
HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes();
stream.writeLong(requestId);
byte status = 0;
status = Transport.Helper.setResponse(status);
status = TransportStreams.statusSetResponse(status);
stream.writeByte(status); // 0 for request, 1 for response.
message.writeTo(stream);
final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
@ -101,8 +105,8 @@ public class LocalTransportChannel implements TransportChannel {
private void writeResponseExceptionHeader(BytesStreamOutput stream) throws IOException {
stream.writeLong(requestId);
byte status = 0;
status = Transport.Helper.setResponse(status);
status = Transport.Helper.setError(status);
status = TransportStreams.statusSetResponse(status);
status = TransportStreams.statusSetError(status);
stream.writeByte(status);
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.transport.netty;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.netty.buffer.ChannelBuffer;
import java.io.EOFException;
import java.io.IOException;
/**
@ -32,34 +33,91 @@ import java.io.IOException;
public class ChannelBufferStreamInput extends StreamInput {
private final ChannelBuffer buffer;
private final int startIndex;
private final int endIndex;
public ChannelBufferStreamInput(ChannelBuffer buffer) {
public ChannelBufferStreamInput(ChannelBuffer buffer, int length) {
if (length > buffer.readableBytes()) {
throw new IndexOutOfBoundsException();
}
this.buffer = buffer;
startIndex = buffer.readerIndex();
endIndex = startIndex + length;
buffer.markReaderIndex();
}
// Not really maps to InputStream, but good enough for us
/**
* Returns the number of read bytes by this stream so far.
*/
public int readBytes() {
return buffer.readerIndex() - startIndex;
}
@Override public int available() throws IOException {
return endIndex - buffer.readerIndex();
}
@Override public void mark(int readlimit) {
buffer.markReaderIndex();
}
@Override public boolean markSupported() {
return true;
}
@Override public int read() throws IOException {
return buffer.readByte() & 0xFF;
if (available() == 0) {
return -1;
}
return buffer.readByte() & 0xff;
}
@Override public int read(byte[] b, int off, int len) throws IOException {
readBytes(b, off, len);
@Override
public int read(byte[] b, int off, int len) throws IOException {
int available = available();
if (available == 0) {
return -1;
}
len = Math.min(available, len);
buffer.readBytes(b, off, len);
return len;
}
@Override public byte readByte() throws IOException {
return buffer.readByte();
}
@Override public void readBytes(byte[] b, int offset, int len) throws IOException {
buffer.readBytes(b, offset, len);
}
@Override public void reset() throws IOException {
buffer.resetReaderIndex();
}
@Override
public long skip(long n) throws IOException {
if (n > Integer.MAX_VALUE) {
return skipBytes(Integer.MAX_VALUE);
} else {
return skipBytes((int) n);
}
}
public int skipBytes(int n) throws IOException {
int nBytes = Math.min(available(), n);
buffer.skipBytes(nBytes);
return nBytes;
}
@Override public byte readByte() throws IOException {
if (available() == 0) {
throw new EOFException();
}
return buffer.readByte();
}
@Override public void readBytes(byte[] b, int offset, int len) throws IOException {
int read = read(b, offset, len);
if (read < len) {
throw new EOFException();
}
}
@Override public void close() throws IOException {
// nothing to do here
}

View File

@ -20,7 +20,7 @@
package org.elasticsearch.transport.netty;
import org.elasticsearch.common.io.ThrowableObjectInputStream;
import org.elasticsearch.common.io.stream.HandlesStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.logging.ESLogger;
@ -28,11 +28,10 @@ import org.elasticsearch.common.netty.buffer.ChannelBuffer;
import org.elasticsearch.common.netty.channel.*;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStreams;
import java.io.IOException;
import static org.elasticsearch.transport.Transport.Helper.*;
/**
* @author kimchy (shay.banon)
*/
@ -68,24 +67,34 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
int markedReaderIndex = buffer.readerIndex();
int expectedIndexReader = markedReaderIndex + size;
StreamInput streamIn = new ChannelBufferStreamInput(buffer);
streamIn = HandlesStreamInput.Cached.cached(streamIn);
StreamInput streamIn = new ChannelBufferStreamInput(buffer, size);
long requestId = buffer.readLong();
byte status = buffer.readByte();
boolean isRequest = isRequest(status);
boolean isRequest = TransportStreams.statusIsRequest(status);
if (TransportStreams.statusIsCompress(status)) {
streamIn = CachedStreamInput.cachedHandlesLzf(streamIn);
} else {
streamIn = CachedStreamInput.cachedHandles(streamIn);
}
if (isRequest) {
String action = handleRequest(event, streamIn, requestId);
if (buffer.readerIndex() < expectedIndexReader) {
logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
buffer.readerIndex(expectedIndexReader);
if (buffer.readerIndex() != expectedIndexReader) {
if (buffer.readerIndex() < expectedIndexReader) {
logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
buffer.readerIndex(expectedIndexReader);
} else {
logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action);
buffer.readerIndex(expectedIndexReader);
}
}
} else {
TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
if (isError(status)) {
if (TransportStreams.statusIsError(status)) {
handlerResponseError(streamIn, handler);
} else {
handleResponse(streamIn, handler);
@ -94,9 +103,14 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
// if its null, skip those bytes
buffer.readerIndex(markedReaderIndex + size);
}
if (buffer.readerIndex() < expectedIndexReader) {
logger.warn("Message not fully read (response) for [{}] and handler {}, resetting", requestId, handler);
buffer.readerIndex(expectedIndexReader);
if (buffer.readerIndex() != expectedIndexReader) {
if (buffer.readerIndex() < expectedIndexReader) {
logger.warn("Message not fully read (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStreams.statusIsError(status));
buffer.readerIndex(expectedIndexReader);
} else if (buffer.readerIndex() > expectedIndexReader) {
logger.warn("Message read past expected size (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStreams.statusIsError(status));
buffer.readerIndex(expectedIndexReader);
}
}
}
}

View File

@ -26,9 +26,6 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.netty.OpenChannelsHandler;
import org.elasticsearch.common.netty.bootstrap.ClientBootstrap;
@ -53,6 +50,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStreams;
import java.io.IOException;
import java.net.InetAddress;
@ -72,7 +70,6 @@ import static org.elasticsearch.common.transport.NetworkExceptionHelper.*;
import static org.elasticsearch.common.unit.TimeValue.*;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*;
import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
import static org.elasticsearch.transport.Transport.Helper.*;
/**
* @author kimchy (shay.banon)
@ -101,6 +98,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
final String publishHost;
final boolean compress;
final TimeValue connectTimeout;
final Boolean tcpNoDelay;
@ -145,11 +144,12 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
this.networkService = networkService;
this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors());
this.blockingServer = componentSettings.getAsBoolean("transport.tcp.blocking_server", componentSettings.getAsBoolean(TCP_BLOCKING_SERVER, componentSettings.getAsBoolean(TCP_BLOCKING, false)));
this.blockingClient = componentSettings.getAsBoolean("transport.tcp.blocking_client", componentSettings.getAsBoolean(TCP_BLOCKING_CLIENT, componentSettings.getAsBoolean(TCP_BLOCKING, false)));
this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
this.blockingClient = settings.getAsBoolean("transport.tcp.blocking_client", settings.getAsBoolean(TCP_BLOCKING_CLIENT, settings.getAsBoolean(TCP_BLOCKING, false)));
this.port = componentSettings.get("port", settings.get("transport.tcp.port", "9300-9400"));
this.bindHost = componentSettings.get("bind_host");
this.publishHost = componentSettings.get("publish_host");
this.compress = settings.getAsBoolean("transport.tcp.compress", false);
this.connectTimeout = componentSettings.getAsTime("connect_timeout", settings.getAsTime("transport.tcp.connect_timeout", timeValueSeconds(1)));
this.tcpNoDelay = componentSettings.getAsBoolean("tcp_no_delay", settings.getAsBoolean(TCP_NO_DELAY, true));
this.tcpKeepAlive = componentSettings.getAsBoolean("tcp_keep_alive", settings.getAsBoolean(TCP_KEEP_ALIVE, null));
@ -381,31 +381,16 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
return new InetSocketTransportAddress((InetSocketAddress) socketAddress);
}
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
@Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable streamable, TransportRequestOptions options) throws IOException, TransportException {
@Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException {
Channel targetChannel = nodeChannel(node);
HandlesStreamOutput stream = CachedStreamOutput.cachedHandles();
stream.writeBytes(LENGTH_PLACEHOLDER); // fake size
stream.writeLong(requestId);
byte status = 0;
status = setRequest(status);
stream.writeByte(status); // 0 for request, 1 for response.
stream.writeUTF(action);
streamable.writeTo(stream);
byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);
int size = buffer.writerIndex() - 4;
if (size == 0) {
throw new ElasticSearchIllegalStateException("Trying to send a stream with 0 size");
if (compress) {
options.withCompress();
}
buffer.setInt(0, size); // update real size.
byte[] data = TransportStreams.buildRequest(requestId, action, message, options);
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);
ChannelFuture channelFuture = targetChannel.write(buffer);
// We handle close connection exception in the #exceptionCaught method, which is the main reason we want to add this future
// channelFuture.addListener(new ChannelFutureListener() {

View File

@ -22,7 +22,6 @@ package org.elasticsearch.transport.netty;
import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.netty.buffer.ChannelBuffer;
import org.elasticsearch.common.netty.buffer.ChannelBuffers;
@ -31,14 +30,13 @@ import org.elasticsearch.transport.NotSerializableTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.support.TransportStreams;
import java.io.IOException;
import java.io.NotSerializableException;
import static org.elasticsearch.transport.Transport.Helper.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class NettyTransportChannel implements TransportChannel {
@ -68,17 +66,11 @@ public class NettyTransportChannel implements TransportChannel {
}
@Override public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException {
HandlesStreamOutput stream = CachedStreamOutput.cachedHandles();
stream.writeBytes(LENGTH_PLACEHOLDER); // fake size
stream.writeLong(requestId);
byte status = 0;
status = setResponse(status);
stream.writeByte(status); // 0 for request, 1 for response.
message.writeTo(stream);
stream.flush();
byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
if (transport.compress) {
options.withCompress();
}
byte[] data = TransportStreams.buildResponse(requestId, message, options);
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);
buffer.setInt(0, buffer.writerIndex() - 4); // update real size.
channel.write(buffer);
}
@ -108,8 +100,8 @@ public class NettyTransportChannel implements TransportChannel {
stream.writeBytes(LENGTH_PLACEHOLDER);
stream.writeLong(requestId);
byte status = 0;
status = setResponse(status);
status = setError(status);
status = TransportStreams.statusSetResponse(status);
status = TransportStreams.statusSetError(status);
stream.writeByte(status);
}
}

View File

@ -0,0 +1,151 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.support;
import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseOptions;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class TransportStreams {
public static final int HEADER_SIZE = 4 + 8 + 1;
public static void writeHeader(byte[] data, int dataLength, long requestId, byte status) {
writeInt(data, 0, dataLength + 9); // add the requestId and the status
writeLong(data, 4, requestId);
data[12] = status;
}
// same as writeLong in StreamOutput
private static void writeLong(byte[] buffer, int offset, long value) {
buffer[offset++] = ((byte) (value >> 56));
buffer[offset++] = ((byte) (value >> 48));
buffer[offset++] = ((byte) (value >> 40));
buffer[offset++] = ((byte) (value >> 32));
buffer[offset++] = ((byte) (value >> 24));
buffer[offset++] = ((byte) (value >> 16));
buffer[offset++] = ((byte) (value >> 8));
buffer[offset] = ((byte) (value));
}
// same as writeInt in StreamOutput
private static void writeInt(byte[] buffer, int offset, int value) {
buffer[offset++] = ((byte) (value >> 24));
buffer[offset++] = ((byte) (value >> 16));
buffer[offset++] = ((byte) (value >> 8));
buffer[offset] = ((byte) (value));
}
private static final byte STATUS_REQRES = 1 << 0;
private static final byte STATUS_ERROR = 1 << 1;
private static final byte STATUS_COMPRESS = 1 << 2;
public static boolean statusIsRequest(byte value) {
return (value & STATUS_REQRES) == 0;
}
public static byte statusSetRequest(byte value) {
value &= ~STATUS_REQRES;
return value;
}
public static byte statusSetResponse(byte value) {
value |= STATUS_REQRES;
return value;
}
public static boolean statusIsError(byte value) {
return (value & STATUS_ERROR) != 0;
}
public static byte statusSetError(byte value) {
value |= STATUS_ERROR;
return value;
}
public static boolean statusIsCompress(byte value) {
return (value & STATUS_COMPRESS) != 0;
}
public static byte statusSetCompress(byte value) {
value |= STATUS_COMPRESS;
return value;
}
public static byte[] buildRequest(final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException {
byte status = 0;
status = TransportStreams.statusSetRequest(status);
BytesStreamOutput wrapped;
if (options.compress()) {
status = TransportStreams.statusSetCompress(status);
HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesLzfBytes();
stream.writeUTF(action);
message.writeTo(stream);
stream.flush();
wrapped = ((BytesStreamOutput) ((LZFStreamOutput) stream.wrappedOut()).wrappedOut());
} else {
HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes();
stream.writeUTF(action);
message.writeTo(stream);
stream.flush();
wrapped = ((BytesStreamOutput) stream.wrappedOut());
}
byte[] data = new byte[HEADER_SIZE + wrapped.size()];
TransportStreams.writeHeader(data, wrapped.size(), requestId, status);
System.arraycopy(wrapped.unsafeByteArray(), 0, data, HEADER_SIZE, wrapped.size());
return data;
}
public static byte[] buildResponse(final long requestId, Streamable message, TransportResponseOptions options) throws IOException {
byte status = 0;
status = TransportStreams.statusSetResponse(status);
BytesStreamOutput wrapped;
if (options.compress()) {
status = TransportStreams.statusSetCompress(status);
HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesLzfBytes();
message.writeTo(stream);
stream.flush();
wrapped = ((BytesStreamOutput) ((LZFStreamOutput) stream.wrappedOut()).wrappedOut());
} else {
HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes();
message.writeTo(stream);
stream.flush();
wrapped = ((BytesStreamOutput) stream.wrappedOut());
}
byte[] data = new byte[HEADER_SIZE + wrapped.size()];
TransportStreams.writeHeader(data, wrapped.size(), requestId, status);
System.arraycopy(wrapped.unsafeByteArray(), 0, data, HEADER_SIZE, wrapped.size());
return data;
}
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.lzf;
import org.testng.Assert;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import java.io.*;
import java.security.SecureRandom;
/**
* @author kimchy (shay.banon)
*/
public class LZFInputStreamTests {
private static int BUFFER_SIZE = LZFChunk.MAX_CHUNK_LEN * 64;
private byte[] nonEncodableBytesToWrite = new byte[BUFFER_SIZE];
private byte[] bytesToWrite = new byte[BUFFER_SIZE];
private ByteArrayOutputStream nonCompressed;
private ByteArrayOutputStream compressed;
@BeforeTest(alwaysRun = true)
public void setUp() throws Exception {
SecureRandom.getInstance("SHA1PRNG").nextBytes(nonEncodableBytesToWrite);
String phrase = "all work and no play make Jack a dull boy";
byte[] bytes = phrase.getBytes();
int cursor = 0;
while (cursor <= bytesToWrite.length) {
System.arraycopy(bytes, 0, bytesToWrite, cursor, (bytes.length + cursor < bytesToWrite.length) ? bytes.length : bytesToWrite.length - cursor);
cursor += bytes.length;
}
nonCompressed = new ByteArrayOutputStream();
OutputStream os = new LZFOutputStream(nonCompressed);
os.write(nonEncodableBytesToWrite);
os.close();
compressed = new ByteArrayOutputStream();
os = new LZFOutputStream(compressed);
os.write(bytesToWrite);
os.close();
}
@Test
public void testDecompressNonEncodableReadByte() throws IOException {
doDecompressReadBlock(nonCompressed.toByteArray(), nonEncodableBytesToWrite);
}
@Test
public void testDecompressNonEncodableReadBlock() throws IOException {
doDecompressReadBlock(nonCompressed.toByteArray(), nonEncodableBytesToWrite);
}
@Test
public void testDecompressEncodableReadByte() throws IOException {
doDecompressReadBlock(compressed.toByteArray(), bytesToWrite);
}
@Test
public void testDecompressEncodableReadBlock() throws IOException {
doDecompressReadBlock(compressed.toByteArray(), bytesToWrite);
}
public void doDecompressNonEncodableReadByte(byte[] bytes, byte[] reference) throws IOException {
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
int outputBytes = 0;
InputStream is = new LZFInputStream(bis);
int val;
while ((val = is.read()) != -1) {
byte testVal = (byte) (val & 255);
Assert.assertTrue(testVal == reference[outputBytes]);
outputBytes++;
}
Assert.assertTrue(outputBytes == reference.length);
}
private void doDecompressReadBlock(byte[] bytes, byte[] reference) throws IOException {
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
int outputBytes = 0;
InputStream is = new LZFInputStream(bis);
int val;
byte[] buffer = new byte[65536 + 23];
while ((val = is.read(buffer)) != -1) {
for (int i = 0; i < val; i++) {
byte testVal = buffer[i];
Assert.assertTrue(testVal == reference[outputBytes]);
outputBytes++;
}
}
Assert.assertTrue(outputBytes == reference.length);
}
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.lzf;
import org.testng.Assert;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.security.SecureRandom;
/**
* @author kimchy (shay.banon)
*/
public class LZFOutputStreamTests {
private static int BUFFER_SIZE = LZFChunk.MAX_CHUNK_LEN * 64;
private byte[] nonEncodableBytesToWrite = new byte[BUFFER_SIZE];
private byte[] bytesToWrite = new byte[BUFFER_SIZE];
@BeforeTest(alwaysRun = true)
public void setUp() throws Exception {
SecureRandom.getInstance("SHA1PRNG").nextBytes(nonEncodableBytesToWrite);
String phrase = "all work and no play make Jack a dull boy";
byte[] bytes = phrase.getBytes();
int cursor = 0;
while (cursor <= bytesToWrite.length) {
System.arraycopy(bytes, 0, bytesToWrite, cursor, (bytes.length + cursor < bytesToWrite.length) ? bytes.length : bytesToWrite.length - cursor);
cursor += bytes.length;
}
}
@Test
public void testUnencodable() throws Exception {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
OutputStream os = new LZFOutputStream(bos);
os.write(nonEncodableBytesToWrite);
os.close();
Assert.assertTrue(bos.toByteArray().length > nonEncodableBytesToWrite.length);
}
@Test
public void testStreaming() throws Exception {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
OutputStream os = new LZFOutputStream(bos);
os.write(bytesToWrite);
os.close();
Assert.assertTrue(bos.toByteArray().length > 10);
Assert.assertTrue(bos.toByteArray().length < bytesToWrite.length * .5);
}
@Test
public void testSingleByte() throws Exception {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
OutputStream os = new LZFOutputStream(bos);
for (int idx = 0; idx < BUFFER_SIZE; idx++) {
os.write(bytesToWrite[idx]);
if (idx % 1023 == 0) {
os.flush();
}
}
os.close();
Assert.assertTrue(bos.toByteArray().length > 10);
Assert.assertTrue(bos.toByteArray().length < bytesToWrite.length * .5);
}
@Test
public void testPartialBuffer() throws Exception {
int offset = 255;
int len = 1 << 17;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
OutputStream os = new LZFOutputStream(bos);
os.write(bytesToWrite, offset, len);
os.close();
Assert.assertTrue(bos.toByteArray().length > 10);
Assert.assertTrue(bos.toByteArray().length < bytesToWrite.length * .5);
}
}

View File

@ -116,6 +116,54 @@ public abstract class AbstractSimpleTransportTests {
System.out.println("after ...");
}
@Test public void testHelloWorldCompressed() {
serviceA.registerHandler("sayHello", new BaseTransportRequestHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void messageReceived(StringMessage request, TransportChannel channel) {
System.out.println("got message: " + request.message);
assertThat("moshe", equalTo(request.message));
try {
channel.sendResponse(new StringMessage("hello " + request.message), TransportResponseOptions.options().withCompress());
} catch (IOException e) {
e.printStackTrace();
assertThat(e.getMessage(), false, equalTo(true));
}
}
});
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHello",
new StringMessage("moshe"), TransportRequestOptions.options().withCompress(), new BaseTransportResponseHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void handleResponse(StringMessage response) {
System.out.println("got response: " + response.message);
assertThat("hello moshe", equalTo(response.message));
}
@Override public void handleException(RemoteTransportException exp) {
exp.printStackTrace();
assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true));
}
});
try {
StringMessage message = res.get();
assertThat("hello moshe", equalTo(message.message));
} catch (Exception e) {
assertThat(e.getMessage(), false, equalTo(true));
}
serviceA.removeHandler("sayHello");
System.out.println("after ...");
}
@Test public void testErrorMessage() {
serviceA.registerHandler("sayHelloException", new BaseTransportRequestHandler<StringMessage>() {
@Override public StringMessage newInstance() {

View File

@ -94,7 +94,7 @@ public class NettyMemcachedServerTransport extends AbstractLifecycleComponent<Me
this.networkService = networkService;
this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors());
this.blockingServer = componentSettings.getAsBoolean("memcached.blocking_server", componentSettings.getAsBoolean(TCP_BLOCKING_SERVER, componentSettings.getAsBoolean(TCP_BLOCKING, false)));
this.blockingServer = componentSettings.getAsBoolean("memcached.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
this.port = componentSettings.get("port", settings.get("memcached.port", "11211-11311"));
this.bindHost = componentSettings.get("bind_host");
this.publishHost = componentSettings.get("publish_host");