explicitly clean stream handles

This commit is contained in:
kimchy 2010-08-15 03:08:01 +03:00
parent c18904eb96
commit 6d509a5e4e
4 changed files with 20 additions and 6 deletions

View File

@ -84,6 +84,10 @@ public class HandlesStreamInput extends StreamInput {
in.readBytes(b, offset, len); in.readBytes(b, offset, len);
} }
public void cleanHandles() {
handles.clear();
}
@Override public void reset() throws IOException { @Override public void reset() throws IOException {
in.reset(); in.reset();
handles.clear(); handles.clear();

View File

@ -30,7 +30,7 @@ import java.util.Arrays;
*/ */
public class HandlesStreamOutput extends StreamOutput { public class HandlesStreamOutput extends StreamOutput {
private static final int DEFAULT_IDENTITY_THRESHOLD = 50; private static final int DEFAULT_IDENTITY_THRESHOLD = 100;
// a threshold above which strings will use identity check // a threshold above which strings will use identity check
private final int identityThreshold; private final int identityThreshold;
@ -85,6 +85,11 @@ public class HandlesStreamOutput extends StreamOutput {
out.writeBytes(b, offset, length); out.writeBytes(b, offset, length);
} }
public void cleanHandles() {
handles.clear();
identityHandles.clear();
}
@Override public void reset() throws IOException { @Override public void reset() throws IOException {
handles.clear(); handles.clear();
identityHandles.clear(); identityHandles.clear();

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport.netty;
import org.elasticsearch.common.io.ThrowableObjectInputStream; import org.elasticsearch.common.io.ThrowableObjectInputStream;
import org.elasticsearch.common.io.stream.CachedStreamInput; import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.HandlesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
@ -73,14 +74,15 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
byte status = buffer.readByte(); byte status = buffer.readByte();
boolean isRequest = TransportStreams.statusIsRequest(status); boolean isRequest = TransportStreams.statusIsRequest(status);
HandlesStreamInput handlesStream;
if (TransportStreams.statusIsCompress(status)) { if (TransportStreams.statusIsCompress(status)) {
streamIn = CachedStreamInput.cachedHandlesLzf(streamIn); handlesStream = CachedStreamInput.cachedHandlesLzf(streamIn);
} else { } else {
streamIn = CachedStreamInput.cachedHandles(streamIn); handlesStream = CachedStreamInput.cachedHandles(streamIn);
} }
if (isRequest) { if (isRequest) {
String action = handleRequest(event, streamIn, requestId); String action = handleRequest(event, handlesStream, requestId);
if (buffer.readerIndex() != expectedIndexReader) { if (buffer.readerIndex() != expectedIndexReader) {
if (buffer.readerIndex() < expectedIndexReader) { if (buffer.readerIndex() < expectedIndexReader) {
logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action); logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
@ -95,9 +97,9 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
// ignore if its null, the adapter logs it // ignore if its null, the adapter logs it
if (handler != null) { if (handler != null) {
if (TransportStreams.statusIsError(status)) { if (TransportStreams.statusIsError(status)) {
handlerResponseError(streamIn, handler); handlerResponseError(handlesStream, handler);
} else { } else {
handleResponse(streamIn, handler); handleResponse(handlesStream, handler);
} }
} else { } else {
// if its null, skip those bytes // if its null, skip those bytes
@ -113,6 +115,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
} }
} }
} }
handlesStream.cleanHandles();
} }
private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) { private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) {

View File

@ -108,12 +108,14 @@ public class TransportStreams {
message.writeTo(stream); message.writeTo(stream);
stream.flush(); stream.flush();
wrapped = ((BytesStreamOutput) ((LZFStreamOutput) stream.wrappedOut()).wrappedOut()); wrapped = ((BytesStreamOutput) ((LZFStreamOutput) stream.wrappedOut()).wrappedOut());
stream.cleanHandles();
} else { } else {
HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes(); HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes();
stream.writeUTF(action); stream.writeUTF(action);
message.writeTo(stream); message.writeTo(stream);
stream.flush(); stream.flush();
wrapped = ((BytesStreamOutput) stream.wrappedOut()); wrapped = ((BytesStreamOutput) stream.wrappedOut());
stream.cleanHandles();
} }
byte[] data = new byte[HEADER_SIZE + wrapped.size()]; byte[] data = new byte[HEADER_SIZE + wrapped.size()];