more improved transport stream size, duplicate strings are serialized once and use handles to deserialize

This commit is contained in:
kimchy 2010-03-21 10:35:25 +02:00
parent 1e455789d0
commit 77564cb14f
15 changed files with 439 additions and 16 deletions

View File

@ -41,7 +41,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (shay.banon)
*/ */
public class FetchPhase implements SearchPhase { public class FetchPhase implements SearchPhase {

View File

@ -43,7 +43,7 @@ import static org.elasticsearch.util.json.Jackson.*;
import static org.elasticsearch.util.lucene.Lucene.*; import static org.elasticsearch.util.lucene.Lucene.*;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (shay.banon)
*/ */
public class InternalSearchHit implements SearchHit { public class InternalSearchHit implements SearchHit {

View File

@ -30,7 +30,7 @@ import java.io.IOException;
import static org.elasticsearch.search.internal.InternalSearchHit.*; import static org.elasticsearch.search.internal.InternalSearchHit.*;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (shay.banon)
*/ */
public class InternalSearchHits implements SearchHits { public class InternalSearchHits implements SearchHits {

View File

@ -28,10 +28,7 @@ import org.elasticsearch.util.Nullable;
import org.elasticsearch.util.component.AbstractComponent; import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.component.Lifecycle; import org.elasticsearch.util.component.Lifecycle;
import org.elasticsearch.util.io.ThrowableObjectInputStream; import org.elasticsearch.util.io.ThrowableObjectInputStream;
import org.elasticsearch.util.io.stream.BytesStreamInput; import org.elasticsearch.util.io.stream.*;
import org.elasticsearch.util.io.stream.BytesStreamOutput;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.settings.ImmutableSettings; import org.elasticsearch.util.settings.ImmutableSettings;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.transport.BoundTransportAddress; import org.elasticsearch.util.transport.BoundTransportAddress;
@ -120,7 +117,7 @@ public class LocalTransport extends AbstractComponent implements Transport {
@Override public <T extends Streamable> void sendRequest(final Node node, final long requestId, final String action, @Override public <T extends Streamable> void sendRequest(final Node node, final long requestId, final String action,
final Streamable message, final TransportResponseHandler<T> handler) throws IOException, TransportException { final Streamable message, final TransportResponseHandler<T> handler) throws IOException, TransportException {
BytesStreamOutput stream = BytesStreamOutput.Cached.cached(); HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles();
stream.writeLong(requestId); stream.writeLong(requestId);
byte status = 0; byte status = 0;
@ -135,7 +132,7 @@ public class LocalTransport extends AbstractComponent implements Transport {
throw new ConnectTransportException(node, "Failed to connect"); throw new ConnectTransportException(node, "Failed to connect");
} }
final byte[] data = stream.copiedByteArray(); final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
threadPool.execute(new Runnable() { threadPool.execute(new Runnable() {
@Override public void run() { @Override public void run() {
targetTransport.messageReceived(data, action, LocalTransport.this, handler); targetTransport.messageReceived(data, action, LocalTransport.this, handler);
@ -148,7 +145,8 @@ public class LocalTransport extends AbstractComponent implements Transport {
} }
void messageReceived(byte[] data, String action, LocalTransport sourceTransport, @Nullable final TransportResponseHandler responseHandler) { void messageReceived(byte[] data, String action, LocalTransport sourceTransport, @Nullable final TransportResponseHandler responseHandler) {
BytesStreamInput stream = new BytesStreamInput(data); StreamInput stream = new BytesStreamInput(data);
stream = HandlesStreamInput.Cached.cached(stream);
try { try {
long requestId = stream.readLong(); long requestId = stream.readLong();

View File

@ -25,6 +25,7 @@ import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.util.io.ThrowableObjectOutputStream; import org.elasticsearch.util.io.ThrowableObjectOutputStream;
import org.elasticsearch.util.io.stream.BytesStreamOutput; import org.elasticsearch.util.io.stream.BytesStreamOutput;
import org.elasticsearch.util.io.stream.HandlesStreamOutput;
import org.elasticsearch.util.io.stream.Streamable; import org.elasticsearch.util.io.stream.Streamable;
import java.io.IOException; import java.io.IOException;
@ -56,13 +57,13 @@ public class LocalTransportChannel implements TransportChannel {
} }
@Override public void sendResponse(Streamable message) throws IOException { @Override public void sendResponse(Streamable message) throws IOException {
BytesStreamOutput stream = BytesStreamOutput.Cached.cached(); HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles();
stream.writeLong(requestId); stream.writeLong(requestId);
byte status = 0; byte status = 0;
status = Transport.Helper.setResponse(status); status = Transport.Helper.setResponse(status);
stream.writeByte(status); // 0 for request, 1 for response. stream.writeByte(status); // 0 for request, 1 for response.
message.writeTo(stream); message.writeTo(stream);
final byte[] data = stream.copiedByteArray(); final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
targetTransport.threadPool().execute(new Runnable() { targetTransport.threadPool().execute(new Runnable() {
@Override public void run() { @Override public void run() {
targetTransport.messageReceived(data, action, sourceTransport, null); targetTransport.messageReceived(data, action, sourceTransport, null);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.transport.netty;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
import org.elasticsearch.util.io.ThrowableObjectInputStream; import org.elasticsearch.util.io.ThrowableObjectInputStream;
import org.elasticsearch.util.io.stream.HandlesStreamInput;
import org.elasticsearch.util.io.stream.StreamInput; import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.Streamable; import org.elasticsearch.util.io.stream.Streamable;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
@ -56,6 +57,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
ChannelBuffer buffer = (ChannelBuffer) event.getMessage(); ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
StreamInput streamIn = new ChannelBufferStreamInput(buffer); StreamInput streamIn = new ChannelBufferStreamInput(buffer);
streamIn = HandlesStreamInput.Cached.cached(streamIn);
long requestId = buffer.readLong(); long requestId = buffer.readLong();
byte status = buffer.readByte(); byte status = buffer.readByte();

View File

@ -31,6 +31,7 @@ import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractComponent; import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.component.Lifecycle; import org.elasticsearch.util.component.Lifecycle;
import org.elasticsearch.util.io.stream.BytesStreamOutput; import org.elasticsearch.util.io.stream.BytesStreamOutput;
import org.elasticsearch.util.io.stream.HandlesStreamOutput;
import org.elasticsearch.util.io.stream.Streamable; import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.transport.BoundTransportAddress; import org.elasticsearch.util.transport.BoundTransportAddress;
@ -378,7 +379,7 @@ public class NettyTransport extends AbstractComponent implements Transport {
Channel targetChannel = nodeChannel(node); Channel targetChannel = nodeChannel(node);
BytesStreamOutput stream = BytesStreamOutput.Cached.cached(); HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles();
stream.writeBytes(LENGTH_PLACEHOLDER); // fake size stream.writeBytes(LENGTH_PLACEHOLDER); // fake size
stream.writeLong(requestId); stream.writeLong(requestId);
@ -389,7 +390,8 @@ public class NettyTransport extends AbstractComponent implements Transport {
stream.writeUTF(action); stream.writeUTF(action);
streamable.writeTo(stream); streamable.writeTo(stream);
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(stream.copiedByteArray()); byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);
int size = buffer.writerIndex() - 4; int size = buffer.writerIndex() - 4;
if (size == 0) { if (size == 0) {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.util.io.ThrowableObjectOutputStream; import org.elasticsearch.util.io.ThrowableObjectOutputStream;
import org.elasticsearch.util.io.stream.BytesStreamOutput; import org.elasticsearch.util.io.stream.BytesStreamOutput;
import org.elasticsearch.util.io.stream.HandlesStreamOutput;
import org.elasticsearch.util.io.stream.Streamable; import org.elasticsearch.util.io.stream.Streamable;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.buffer.ChannelBuffers;
@ -61,14 +62,15 @@ public class NettyTransportChannel implements TransportChannel {
} }
@Override public void sendResponse(Streamable message) throws IOException { @Override public void sendResponse(Streamable message) throws IOException {
BytesStreamOutput stream = BytesStreamOutput.Cached.cached(); HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles();
stream.writeBytes(LENGTH_PLACEHOLDER); // fake size stream.writeBytes(LENGTH_PLACEHOLDER); // fake size
stream.writeLong(requestId); stream.writeLong(requestId);
byte status = 0; byte status = 0;
status = setResponse(status); status = setResponse(status);
stream.writeByte(status); // 0 for request, 1 for response. stream.writeByte(status); // 0 for request, 1 for response.
message.writeTo(stream); message.writeTo(stream);
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(stream.copiedByteArray()); byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);
buffer.setInt(0, buffer.writerIndex() - 4); // update real size. buffer.setInt(0, buffer.writerIndex() - 4); // update real size.
channel.write(buffer); channel.write(buffer);
} }

View File

@ -38,6 +38,12 @@ public class BytesStreamOutput extends StreamOutput {
} }
}; };
private static final ThreadLocal<HandlesStreamOutput> cacheHandles = new ThreadLocal<HandlesStreamOutput>() {
@Override protected HandlesStreamOutput initialValue() {
return new HandlesStreamOutput(new BytesStreamOutput());
}
};
/** /**
* Returns the cached thread local byte stream, with its internal stream cleared. * Returns the cached thread local byte stream, with its internal stream cleared.
*/ */
@ -46,6 +52,12 @@ public class BytesStreamOutput extends StreamOutput {
os.reset(); os.reset();
return os; return os;
} }
public static HandlesStreamOutput cachedHandles() throws IOException {
HandlesStreamOutput os = cacheHandles.get();
os.reset();
return os;
}
} }
/** /**

View File

@ -46,6 +46,10 @@ public class DataOutputStreamOutput extends StreamOutput {
// nothing to do there... // nothing to do there...
} }
@Override public void reset() throws IOException {
// nothing to do there...
}
@Override public void close() throws IOException { @Override public void close() throws IOException {
if (out instanceof Closeable) { if (out instanceof Closeable) {
((Closeable) out).close(); ((Closeable) out).close();

View File

@ -0,0 +1,108 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.io.stream;
import org.elasticsearch.util.gnu.trove.TIntObjectHashMap;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class HandlesStreamInput extends StreamInput {
public static class Cached {
private static final ThreadLocal<HandlesStreamInput> cache = new ThreadLocal<HandlesStreamInput>() {
@Override protected HandlesStreamInput initialValue() {
return new HandlesStreamInput();
}
};
/**
* Returns the cached thread local byte stream, with its internal stream cleared.
*/
public static HandlesStreamInput cached(StreamInput in) {
HandlesStreamInput os = cache.get();
os.reset(in);
return os;
}
}
private StreamInput in;
private final TIntObjectHashMap<String> handles = new TIntObjectHashMap<String>();
private final TIntObjectHashMap<String> identityHandles = new TIntObjectHashMap<String>();
HandlesStreamInput() {
}
public HandlesStreamInput(StreamInput in) {
this.in = in;
}
@Override public String readUTF() throws IOException {
byte b = in.readByte();
if (b == 0) {
// full string with handle
int handle = in.readVInt();
String s = in.readUTF();
handles.put(handle, s);
return s;
} else if (b == 1) {
return handles.get(in.readVInt());
} else if (b == 2) {
// full string with handle
int handle = in.readVInt();
String s = in.readUTF();
identityHandles.put(handle, s);
return s;
} else if (b == 3) {
return identityHandles.get(in.readVInt());
} else {
throw new IOException("Expected handle header");
}
}
@Override public byte readByte() throws IOException {
return in.readByte();
}
@Override public void readBytes(byte[] b, int offset, int len) throws IOException {
in.readBytes(b, offset, len);
}
@Override public void reset() throws IOException {
in.reset();
handles.clear();
}
public void reset(StreamInput in) {
this.in = in;
handles.clear();
identityHandles.clear();
}
@Override public void close() throws IOException {
in.close();
}
}

View File

@ -0,0 +1,231 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.io.stream;
import org.elasticsearch.util.gnu.trove.TObjectIntHashMap;
import org.elasticsearch.util.trove.ExtTObjectIntHasMap;
import java.io.IOException;
import java.util.Arrays;
/**
* @author kimchy (shay.banon)
*/
public class HandlesStreamOutput extends StreamOutput {
private static final int DEFAULT_IDENTITY_THRESHOLD = 50;
// a threshold above which strings will use identity check
private final int identityThreshold;
private final StreamOutput out;
private final TObjectIntHashMap<String> handles = new ExtTObjectIntHasMap<String>().defaultReturnValue(-1);
private final HandleTable identityHandles = new HandleTable(10, (float) 3.00);
public HandlesStreamOutput(StreamOutput out) {
this(out, DEFAULT_IDENTITY_THRESHOLD);
}
public HandlesStreamOutput(StreamOutput out, int identityThreshold) {
this.out = out;
this.identityThreshold = identityThreshold;
}
@Override public void writeUTF(String s) throws IOException {
if (s.length() < identityThreshold) {
int handle = handles.get(s);
if (handle == -1) {
handle = handles.size();
handles.put(s, handle);
out.writeByte((byte) 0);
out.writeVInt(handle);
out.writeUTF(s);
} else {
out.writeByte((byte) 1);
out.writeVInt(handle);
}
} else {
int handle = identityHandles.lookup(s);
if (handle == -1) {
handle = identityHandles.assign(s);
out.writeByte((byte) 2);
out.writeVInt(handle);
out.writeUTF(s);
} else {
out.writeByte((byte) 3);
out.writeVInt(handle);
}
}
}
@Override public void writeByte(byte b) throws IOException {
out.writeByte(b);
}
@Override public void writeBytes(byte[] b, int offset, int length) throws IOException {
out.writeBytes(b, offset, length);
}
@Override public void reset() throws IOException {
handles.clear();
identityHandles.clear();
out.reset();
}
@Override public void flush() throws IOException {
out.flush();
}
@Override public void close() throws IOException {
out.close();
}
public StreamOutput wrappedOut() {
return this.out;
}
/**
* Lightweight identity hash table which maps objects to integer handles,
* assigned in ascending order.
*/
private static class HandleTable {
/* number of mappings in table/next available handle */
private int size;
/* size threshold determining when to expand hash spine */
private int threshold;
/* factor for computing size threshold */
private final float loadFactor;
/* maps hash value -> candidate handle value */
private int[] spine;
/* maps handle value -> next candidate handle value */
private int[] next;
/* maps handle value -> associated object */
private Object[] objs;
/**
* Creates new HandleTable with given capacity and load factor.
*/
HandleTable(int initialCapacity, float loadFactor) {
this.loadFactor = loadFactor;
spine = new int[initialCapacity];
next = new int[initialCapacity];
objs = new Object[initialCapacity];
threshold = (int) (initialCapacity * loadFactor);
clear();
}
/**
* Assigns next available handle to given object, and returns handle
* value. Handles are assigned in ascending order starting at 0.
*/
int assign(Object obj) {
if (size >= next.length) {
growEntries();
}
if (size >= threshold) {
growSpine();
}
insert(obj, size);
return size++;
}
/**
* Looks up and returns handle associated with given object, or -1 if
* no mapping found.
*/
int lookup(Object obj) {
if (size == 0) {
return -1;
}
int index = hash(obj) % spine.length;
for (int i = spine[index]; i >= 0; i = next[i]) {
if (objs[i] == obj) {
return i;
}
}
return -1;
}
/**
* Resets table to its initial (empty) state.
*/
void clear() {
Arrays.fill(spine, -1);
Arrays.fill(objs, 0, size, null);
size = 0;
}
/**
* Returns the number of mappings currently in table.
*/
int size() {
return size;
}
/**
* Inserts mapping object -> handle mapping into table. Assumes table
* is large enough to accommodate new mapping.
*/
private void insert(Object obj, int handle) {
int index = hash(obj) % spine.length;
objs[handle] = obj;
next[handle] = spine[index];
spine[index] = handle;
}
/**
* Expands the hash "spine" -- equivalent to increasing the number of
* buckets in a conventional hash table.
*/
private void growSpine() {
spine = new int[(spine.length << 1) + 1];
threshold = (int) (spine.length * loadFactor);
Arrays.fill(spine, -1);
for (int i = 0; i < size; i++) {
insert(objs[i], i);
}
}
/**
* Increases hash table capacity by lengthening entry arrays.
*/
private void growEntries() {
int newLength = (next.length << 1) + 1;
int[] newNext = new int[newLength];
System.arraycopy(next, 0, newNext, 0, size);
next = newNext;
Object[] newObjs = new Object[newLength];
System.arraycopy(objs, 0, newObjs, 0, size);
objs = newObjs;
}
/**
* Returns hash value for given object.
*/
private int hash(Object obj) {
return System.identityHashCode(obj) & 0x7FFFFFFF;
}
}
}

View File

@ -41,6 +41,10 @@ public class OutputStreamStreamOutput extends StreamOutput {
os.write(b, offset, length); os.write(b, offset, length);
} }
@Override public void reset() throws IOException {
// nothing to do
}
@Override public void flush() throws IOException { @Override public void flush() throws IOException {
os.flush(); os.flush();
} }

View File

@ -150,6 +150,8 @@ public abstract class StreamOutput extends OutputStream {
*/ */
public abstract void close() throws IOException; public abstract void close() throws IOException;
public abstract void reset() throws IOException;
@Override public void write(int b) throws IOException { @Override public void write(int b) throws IOException {
writeByte((byte) b); writeByte((byte) b);
} }

View File

@ -0,0 +1,57 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.io.streams;
import org.elasticsearch.util.io.stream.BytesStreamInput;
import org.elasticsearch.util.io.stream.BytesStreamOutput;
import org.elasticsearch.util.io.stream.HandlesStreamInput;
import org.elasticsearch.util.io.stream.HandlesStreamOutput;
import org.testng.annotations.Test;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
@Test
public class HandlesStreamsTests {
@Test public void testSharedUTFHandles() throws Exception {
BytesStreamOutput bytesOut = new BytesStreamOutput();
HandlesStreamOutput out = new HandlesStreamOutput(bytesOut, 5);
String lowerThresholdValue = "test";
String higherThresholdValue = "something that is higher than 5";
out.writeUTF(lowerThresholdValue);
out.writeUTF(higherThresholdValue);
out.writeInt(1);
out.writeUTF("else");
out.writeUTF(higherThresholdValue);
out.writeUTF(lowerThresholdValue);
HandlesStreamInput in = new HandlesStreamInput(new BytesStreamInput(bytesOut.copiedByteArray()));
assertThat(in.readUTF(), equalTo(lowerThresholdValue));
assertThat(in.readUTF(), equalTo(higherThresholdValue));
assertThat(in.readInt(), equalTo(1));
assertThat(in.readUTF(), equalTo("else"));
assertThat(in.readUTF(), equalTo(higherThresholdValue));
assertThat(in.readUTF(), equalTo(lowerThresholdValue));
}
}