From 6d509a5e4e068620188b4a19d3b704138d4f6aed Mon Sep 17 00:00:00 2001 From: kimchy Date: Sun, 15 Aug 2010 03:08:01 +0300 Subject: [PATCH] explicitly clean stream handles --- .../common/io/stream/HandlesStreamInput.java | 4 ++++ .../common/io/stream/HandlesStreamOutput.java | 7 ++++++- .../transport/netty/MessageChannelHandler.java | 13 ++++++++----- .../transport/support/TransportStreams.java | 2 ++ 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamInput.java index 75f33432170..46b3e136730 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamInput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamInput.java @@ -84,6 +84,10 @@ public class HandlesStreamInput extends StreamInput { in.readBytes(b, offset, len); } + public void cleanHandles() { + handles.clear(); + } + @Override public void reset() throws IOException { in.reset(); handles.clear(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamOutput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamOutput.java index a2c6a23da85..095d745b487 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamOutput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamOutput.java @@ -30,7 +30,7 @@ import java.util.Arrays; */ public class HandlesStreamOutput extends StreamOutput { - private static final int DEFAULT_IDENTITY_THRESHOLD = 50; + private static final int DEFAULT_IDENTITY_THRESHOLD = 100; // a threshold above which strings will use identity check private final int identityThreshold; @@ -85,6 +85,11 @@ public class HandlesStreamOutput extends StreamOutput { out.writeBytes(b, offset, length); } + public void cleanHandles() { + handles.clear(); + identityHandles.clear(); + } + @Override public void reset() throws IOException { handles.clear(); identityHandles.clear(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 370b60d28d5..cea5a16f66d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -21,6 +21,7 @@ package org.elasticsearch.transport.netty; import org.elasticsearch.common.io.ThrowableObjectInputStream; import org.elasticsearch.common.io.stream.CachedStreamInput; +import org.elasticsearch.common.io.stream.HandlesStreamInput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.logging.ESLogger; @@ -73,14 +74,15 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { byte status = buffer.readByte(); boolean isRequest = TransportStreams.statusIsRequest(status); + HandlesStreamInput handlesStream; if (TransportStreams.statusIsCompress(status)) { - streamIn = CachedStreamInput.cachedHandlesLzf(streamIn); + handlesStream = CachedStreamInput.cachedHandlesLzf(streamIn); } else { - streamIn = CachedStreamInput.cachedHandles(streamIn); + handlesStream = CachedStreamInput.cachedHandles(streamIn); } if (isRequest) { - String action = handleRequest(event, streamIn, requestId); + String action = handleRequest(event, handlesStream, requestId); if (buffer.readerIndex() != expectedIndexReader) { if (buffer.readerIndex() < expectedIndexReader) { logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action); @@ -95,9 +97,9 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { // ignore if its null, the adapter logs it if (handler != null) { if (TransportStreams.statusIsError(status)) { - handlerResponseError(streamIn, handler); + handlerResponseError(handlesStream, handler); } else { - handleResponse(streamIn, handler); + handleResponse(handlesStream, handler); } } else { // if its null, skip those bytes @@ -113,6 +115,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { } } } + handlesStream.cleanHandles(); } private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/support/TransportStreams.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/support/TransportStreams.java index f4d950cab6f..d3fd23c01ae 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/support/TransportStreams.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/support/TransportStreams.java @@ -108,12 +108,14 @@ public class TransportStreams { message.writeTo(stream); stream.flush(); wrapped = ((BytesStreamOutput) ((LZFStreamOutput) stream.wrappedOut()).wrappedOut()); + stream.cleanHandles(); } else { HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes(); stream.writeUTF(action); message.writeTo(stream); stream.flush(); wrapped = ((BytesStreamOutput) stream.wrappedOut()); + stream.cleanHandles(); } byte[] data = new byte[HEADER_SIZE + wrapped.size()];