From c0fa60eb26100803ebc654b51069c6a6b289ac1d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 29 Jan 2015 16:49:39 +0100 Subject: [PATCH] Remove HandlesStreamInput/Output The optimization we do in the HandlesStreamInput / Output adds a lot of complexity with a rather unknown benefit. It tries to compress commonly used strings and write ids instead. This should rather be done on a lower level if at all necessary for the small message we send over the network. --- .../action/bulk/BulkItemResponse.java | 12 +- .../action/delete/DeleteRequest.java | 4 +- .../action/delete/DeleteResponse.java | 8 +- .../elasticsearch/action/get/GetRequest.java | 5 +- .../action/get/MultiGetRequest.java | 9 +- .../action/index/IndexRequest.java | 4 +- .../action/index/IndexResponse.java | 8 +- .../ShardReplicationOperationRequest.java | 5 +- .../action/update/UpdateRequest.java | 4 +- .../action/update/UpdateResponse.java | 8 +- .../client/transport/TransportClient.java | 3 - .../common/io/CachedStreams.java | 29 --- .../common/io/stream/AdapterStreamInput.java | 174 ----------------- .../common/io/stream/AdapterStreamOutput.java | 183 ------------------ .../common/io/stream/CachedStreamInput.java | 72 ------- .../common/io/stream/HandlesStreamInput.java | 96 --------- .../common/io/stream/HandlesStreamOutput.java | 84 -------- .../common/io/stream/StreamInput.java | 33 +--- .../common/io/stream/StreamOutput.java | 26 +-- .../common/rounding/TimeZoneRounding.java | 32 +-- .../zen/ping/multicast/MulticastZenPing.java | 7 +- .../publish/PublishClusterStateAction.java | 6 +- .../elasticsearch/index/get/GetResult.java | 8 +- .../node/internal/InternalNode.java | 2 - .../search/SearchShardTarget.java | 8 +- .../search/highlight/HighlightField.java | 4 +- .../search/internal/InternalSearchHit.java | 4 +- .../internal/InternalSearchHitField.java | 4 +- .../transport/local/LocalTransport.java | 52 +++-- .../local/LocalTransportChannel.java | 32 ++- .../netty/MessageChannelHandler.java | 5 +- .../transport/netty/NettyTransport.java | 2 - .../netty/NettyTransportChannel.java | 2 - .../io/streams/HandlesStreamsTests.java | 80 -------- 34 files changed, 112 insertions(+), 903 deletions(-) delete mode 100644 src/main/java/org/elasticsearch/common/io/CachedStreams.java delete mode 100644 src/main/java/org/elasticsearch/common/io/stream/AdapterStreamInput.java delete mode 100644 src/main/java/org/elasticsearch/common/io/stream/AdapterStreamOutput.java delete mode 100644 src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java delete mode 100644 src/main/java/org/elasticsearch/common/io/stream/HandlesStreamInput.java delete mode 100644 src/main/java/org/elasticsearch/common/io/stream/HandlesStreamOutput.java delete mode 100644 src/test/java/org/elasticsearch/common/io/streams/HandlesStreamsTests.java diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index a95a7ab0c8a..3ffa1ddcb62 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -247,7 +247,7 @@ public class BulkItemResponse implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { id = in.readVInt(); - opType = in.readSharedString(); + opType = in.readString(); byte type = in.readByte(); if (type == 0) { @@ -262,8 +262,8 @@ public class BulkItemResponse implements Streamable { } if (in.readBoolean()) { - String fIndex = in.readSharedString(); - String fType = in.readSharedString(); + String fIndex = in.readString(); + String fType = in.readString(); String fId = in.readOptionalString(); String fMessage = in.readString(); RestStatus status = RestStatus.readFrom(in); @@ -274,7 +274,7 @@ public class BulkItemResponse implements Streamable { @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(id); - out.writeSharedString(opType); + out.writeString(opType); if (response == null) { out.writeByte((byte) 2); @@ -292,8 +292,8 @@ public class BulkItemResponse implements Streamable { out.writeBoolean(false); } else { out.writeBoolean(true); - out.writeSharedString(failure.getIndex()); - out.writeSharedString(failure.getType()); + out.writeString(failure.getIndex()); + out.writeString(failure.getType()); out.writeOptionalString(failure.getId()); out.writeString(failure.getMessage()); RestStatus.writeTo(out, failure.getStatus()); diff --git a/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index ba6cf69f84d..22e10ee12de 100644 --- a/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -223,7 +223,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - type = in.readSharedString(); + type = in.readString(); id = in.readString(); routing = in.readOptionalString(); preference = in.readOptionalString(); @@ -316,7 +315,7 @@ public class GetRequest extends SingleShardOperationRequest { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeSharedString(type); + out.writeString(type); out.writeString(id); out.writeOptionalString(routing); out.writeOptionalString(preference); diff --git a/src/main/java/org/elasticsearch/action/get/MultiGetRequest.java b/src/main/java/org/elasticsearch/action/get/MultiGetRequest.java index d1e3814abf1..74e76253ca2 100644 --- a/src/main/java/org/elasticsearch/action/get/MultiGetRequest.java +++ b/src/main/java/org/elasticsearch/action/get/MultiGetRequest.java @@ -22,7 +22,6 @@ package org.elasticsearch.action.get; import com.google.common.collect.Iterators; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.Version; import org.elasticsearch.action.*; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Nullable; @@ -174,8 +173,8 @@ public class MultiGetRequest extends ActionRequest implements I @Override public void readFrom(StreamInput in) throws IOException { - index = in.readSharedString(); - type = in.readOptionalSharedString(); + index = in.readString(); + type = in.readOptionalString(); id = in.readString(); routing = in.readOptionalString(); int size = in.readVInt(); @@ -193,8 +192,8 @@ public class MultiGetRequest extends ActionRequest implements I @Override public void writeTo(StreamOutput out) throws IOException { - out.writeSharedString(index); - out.writeOptionalSharedString(type); + out.writeString(index); + out.writeOptionalString(type); out.writeString(id); out.writeOptionalString(routing); if (fields == null) { diff --git a/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 68b3f0326c5..cce251384b7 100644 --- a/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -676,7 +676,7 @@ public class IndexRequest extends ShardReplicationOperationRequest @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - type = in.readSharedString(); + type = in.readString(); id = in.readOptionalString(); routing = in.readOptionalString(); parent = in.readOptionalString(); @@ -695,7 +695,7 @@ public class IndexRequest extends ShardReplicationOperationRequest @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeSharedString(type); + out.writeString(type); out.writeOptionalString(id); out.writeOptionalString(routing); out.writeOptionalString(parent); diff --git a/src/main/java/org/elasticsearch/action/index/IndexResponse.java b/src/main/java/org/elasticsearch/action/index/IndexResponse.java index 818fd635981..0074d87b563 100644 --- a/src/main/java/org/elasticsearch/action/index/IndexResponse.java +++ b/src/main/java/org/elasticsearch/action/index/IndexResponse.java @@ -89,8 +89,8 @@ public class IndexResponse extends ActionWriteResponse { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - index = in.readSharedString(); - type = in.readSharedString(); + index = in.readString(); + type = in.readString(); id = in.readString(); version = in.readLong(); created = in.readBoolean(); @@ -99,8 +99,8 @@ public class IndexResponse extends ActionWriteResponse { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeSharedString(index); - out.writeSharedString(type); + out.writeString(index); + out.writeString(type); out.writeString(id); out.writeLong(version); out.writeBoolean(created); diff --git a/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java b/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java index acf1594a4bf..069036d6c87 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java +++ b/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.support.replication; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; @@ -200,7 +199,7 @@ public abstract class ShardReplicationOperationRequest super.readFrom(in); replicationType = ReplicationType.fromId(in.readByte()); consistencyLevel = WriteConsistencyLevel.fromId(in.readByte()); - type = in.readSharedString(); + type = in.readString(); id = in.readString(); routing = in.readOptionalString(); script = in.readOptionalString(); @@ -669,7 +669,7 @@ public class UpdateRequest extends InstanceShardOperationRequest super.writeTo(out); out.writeByte(replicationType.id()); out.writeByte(consistencyLevel.id()); - out.writeSharedString(type); + out.writeString(type); out.writeString(id); out.writeOptionalString(routing); out.writeOptionalString(script); diff --git a/src/main/java/org/elasticsearch/action/update/UpdateResponse.java b/src/main/java/org/elasticsearch/action/update/UpdateResponse.java index b63ecd07165..fe71cc093dc 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateResponse.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateResponse.java @@ -104,8 +104,8 @@ public class UpdateResponse extends ActionWriteResponse { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - index = in.readSharedString(); - type = in.readSharedString(); + index = in.readString(); + type = in.readString(); id = in.readString(); version = in.readLong(); created = in.readBoolean(); @@ -117,8 +117,8 @@ public class UpdateResponse extends ActionWriteResponse { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeSharedString(index); - out.writeSharedString(type); + out.writeString(index); + out.writeString(type); out.writeString(id); out.writeLong(version); out.writeBoolean(created); diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/src/main/java/org/elasticsearch/client/transport/TransportClient.java index f10cdd995de..c382a70a789 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -66,7 +66,6 @@ import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.ModulesBuilder; -import org.elasticsearch.common.io.CachedStreams; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; @@ -286,8 +285,6 @@ public class TransportClient extends AbstractClient { } injector.getInstance(PageCacheRecycler.class).close(); - - CachedStreams.clear(); } @Override diff --git a/src/main/java/org/elasticsearch/common/io/CachedStreams.java b/src/main/java/org/elasticsearch/common/io/CachedStreams.java deleted file mode 100644 index 12af1f056b6..00000000000 --- a/src/main/java/org/elasticsearch/common/io/CachedStreams.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io; - -import org.elasticsearch.common.io.stream.CachedStreamInput; - -public class CachedStreams { - - public static void clear() { - CachedStreamInput.clear(); - } -} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamInput.java b/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamInput.java deleted file mode 100644 index e2838f1482c..00000000000 --- a/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamInput.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io.stream; - -import org.elasticsearch.Version; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.text.Text; - -import java.io.IOException; - -/** - */ -public abstract class AdapterStreamInput extends StreamInput { - - protected StreamInput in; - - protected AdapterStreamInput() { - } - - public AdapterStreamInput(StreamInput in) { - this.in = in; - super.setVersion(in.getVersion()); - } - - @Override - public StreamInput setVersion(Version version) { - in.setVersion(version); - return super.setVersion(version); - } - - public void reset(StreamInput in) { - this.in = in; - } - - @Override - public byte readByte() throws IOException { - return in.readByte(); - } - - @Override - public void readBytes(byte[] b, int offset, int len) throws IOException { - in.readBytes(b, offset, len); - } - - @Override - public BytesReference readBytesReference() throws IOException { - return in.readBytesReference(); - } - - @Override - public BytesReference readBytesReference(int length) throws IOException { - return in.readBytesReference(length); - } - - @Override - public void reset() throws IOException { - in.reset(); - } - - @Override - public void close() throws IOException { - in.close(); - } - - @Override - public int read() throws IOException { - return in.read(); - } - - // override ones to direct them - - - @Override - public void readFully(byte[] b) throws IOException { - in.readFully(b); - } - - @Override - public short readShort() throws IOException { - return in.readShort(); - } - - @Override - public int readInt() throws IOException { - return in.readInt(); - } - - @Override - public int readVInt() throws IOException { - return in.readVInt(); - } - - @Override - public long readLong() throws IOException { - return in.readLong(); - } - - @Override - public long readVLong() throws IOException { - return in.readVLong(); - } - - @Override - public String readString() throws IOException { - return in.readString(); - } - - @Override - public String readSharedString() throws IOException { - return in.readSharedString(); - } - - @Override - public Text readText() throws IOException { - return in.readText(); - } - - @Override - public Text readSharedText() throws IOException { - return in.readSharedText(); - } - - @Override - public int read(byte[] b) throws IOException { - return in.read(b); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return in.read(b, off, len); - } - - @Override - public long skip(long n) throws IOException { - return in.skip(n); - } - - @Override - public int available() throws IOException { - return in.available(); - } - - @Override - public void mark(int readlimit) { - in.mark(readlimit); - } - - @Override - public boolean markSupported() { - return in.markSupported(); - } - - @Override - public String toString() { - return in.toString(); - } -} diff --git a/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamOutput.java b/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamOutput.java deleted file mode 100644 index add9b49068b..00000000000 --- a/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamOutput.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io.stream; - -import org.elasticsearch.Version; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.text.Text; - -import java.io.IOException; - -/** - */ -public class AdapterStreamOutput extends StreamOutput { - - protected StreamOutput out; - - public AdapterStreamOutput(StreamOutput out) { - this.out = out; - super.setVersion(out.getVersion()); - } - - @Override - public StreamOutput setVersion(Version version) { - out.setVersion(version); - return super.setVersion(version); - } - - public void setOut(StreamOutput out) { - this.out = out; - } - - public StreamOutput wrappedOut() { - return this.out; - } - - @Override - public boolean seekPositionSupported() { - return out.seekPositionSupported(); - } - - @Override - public long position() throws IOException { - return out.position(); - } - - @Override - public void seek(long position) throws IOException { - out.seek(position); - } - - @Override - public void writeByte(byte b) throws IOException { - out.writeByte(b); - } - - @Override - public void writeBytes(byte[] b, int offset, int length) throws IOException { - out.writeBytes(b, offset, length); - } - - @Override - public void flush() throws IOException { - out.flush(); - } - - @Override - public void close() throws IOException { - out.close(); - } - - @Override - public void reset() throws IOException { - out.reset(); - } - - @Override - public void writeBytes(byte[] b) throws IOException { - out.writeBytes(b); - } - - @Override - public void writeBytes(byte[] b, int length) throws IOException { - out.writeBytes(b, length); - } - - @Override - public void writeBytesReference(@Nullable BytesReference bytes) throws IOException { - out.writeBytesReference(bytes); - } - - @Override - public void writeInt(int i) throws IOException { - out.writeInt(i); - } - - @Override - public void writeVInt(int i) throws IOException { - out.writeVInt(i); - } - - @Override - public void writeLong(long i) throws IOException { - out.writeLong(i); - } - - @Override - public void writeVLong(long i) throws IOException { - out.writeVLong(i); - } - - @Override - public void writeString(String str) throws IOException { - out.writeString(str); - } - - @Override - public void writeSharedString(String str) throws IOException { - out.writeSharedString(str); - } - - @Override - public void writeText(Text text) throws IOException { - out.writeText(text); - } - - @Override - public void writeSharedText(Text text) throws IOException { - out.writeSharedText(text); - } - - @Override - public void writeFloat(float v) throws IOException { - out.writeFloat(v); - } - - @Override - public void writeDouble(double v) throws IOException { - out.writeDouble(v); - } - - @Override - public void writeBoolean(boolean b) throws IOException { - out.writeBoolean(b); - } - - @Override - public void write(int b) throws IOException { - out.write(b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - out.write(b, off, len); - } - - @Override - public void write(byte[] b) throws IOException { - out.write(b); - } - - @Override - public String toString() { - return out.toString(); - } -} diff --git a/src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java b/src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java deleted file mode 100644 index 6ad6f3b0e12..00000000000 --- a/src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io.stream; - -import org.elasticsearch.common.compress.Compressor; - -import java.io.IOException; -import java.lang.ref.SoftReference; - -/** - * - */ -public class CachedStreamInput { - - static class Entry { - final HandlesStreamInput handles; - - Entry(HandlesStreamInput handles) { - this.handles = handles; - } - } - - private static final ThreadLocal> cache = new ThreadLocal<>(); - - static Entry instance() { - SoftReference ref = cache.get(); - Entry entry = ref == null ? null : ref.get(); - if (entry == null) { - HandlesStreamInput handles = new HandlesStreamInput(); - entry = new Entry(handles); - cache.set(new SoftReference<>(entry)); - } - return entry; - } - - public static void clear() { - cache.remove(); - } - - public static StreamInput compressed(Compressor compressor, StreamInput in) throws IOException { - return compressor.streamInput(in); - } - - public static HandlesStreamInput cachedHandles(StreamInput in) { - HandlesStreamInput handles = instance().handles; - handles.reset(in); - return handles; - } - - public static HandlesStreamInput cachedHandlesCompressed(Compressor compressor, StreamInput in) throws IOException { - Entry entry = instance(); - entry.handles.reset(compressor.streamInput(in)); - return entry.handles; - } -} diff --git a/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamInput.java b/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamInput.java deleted file mode 100644 index 9ae8890e0bf..00000000000 --- a/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamInput.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io.stream; - -import com.carrotsearch.hppc.IntObjectOpenHashMap; -import org.elasticsearch.common.text.Text; - -import java.io.IOException; - -/** - * - */ -public class HandlesStreamInput extends AdapterStreamInput { - - private final IntObjectOpenHashMap handles = new IntObjectOpenHashMap<>(); - private final IntObjectOpenHashMap handlesText = new IntObjectOpenHashMap<>(); - - HandlesStreamInput() { - super(); - } - - public HandlesStreamInput(StreamInput in) { - super(in); - } - - @Override - public String readSharedString() throws IOException { - byte b = in.readByte(); - if (b == 0) { - // full string with handle - int handle = in.readVInt(); - String s = in.readString(); - handles.put(handle, s); - return s; - } else if (b == 1) { - return handles.get(in.readVInt()); - } else { - throw new IOException("Expected handle header, got [" + b + "]"); - } - } - - @Override - public String readString() throws IOException { - return in.readString(); - } - - @Override - public Text readSharedText() throws IOException { - byte b = in.readByte(); - if (b == 0) { - int handle = in.readVInt(); - Text s = in.readText(); - handlesText.put(handle, s); - return s; - } else if (b == 1) { - return handlesText.get(in.readVInt()); - } else if (b == 2) { - return in.readText(); - } else { - throw new IOException("Expected handle header, got [" + b + "]"); - } - } - - @Override - public void reset() throws IOException { - super.reset(); - cleanHandles(); - } - - public void reset(StreamInput in) { - super.reset(in); - cleanHandles(); - } - - public void cleanHandles() { - handles.clear(); - handlesText.clear(); - } -} diff --git a/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamOutput.java b/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamOutput.java deleted file mode 100644 index 35da9913120..00000000000 --- a/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamOutput.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io.stream; - -import com.carrotsearch.hppc.ObjectIntOpenHashMap; -import org.elasticsearch.common.text.Text; - -import java.io.IOException; - -/** - * - */ -public class HandlesStreamOutput extends AdapterStreamOutput { - - private final ObjectIntOpenHashMap handles = new ObjectIntOpenHashMap<>(); - private final ObjectIntOpenHashMap handlesText = new ObjectIntOpenHashMap<>(); - - public HandlesStreamOutput(StreamOutput out) { - super(out); - } - - @Override - public void writeSharedString(String str) throws IOException { - if (handles.containsKey(str)) { - out.writeByte((byte) 1); - out.writeVInt(handles.lget()); - } else { - int handle = handles.size(); - handles.put(str, handle); - out.writeByte((byte) 0); - out.writeVInt(handle); - out.writeString(str); - } - } - - @Override - public void writeString(String s) throws IOException { - out.writeString(s); - } - - @Override - public void writeSharedText(Text text) throws IOException { - if (handlesText.containsKey(text)) { - out.writeByte((byte) 1); - out.writeVInt(handlesText.lget()); - } else { - int handle = handlesText.size(); - handlesText.put(text, handle); - out.writeByte((byte) 0); - out.writeVInt(handle); - out.writeText(text); - } - } - - @Override - public void reset() throws IOException { - clear(); - if (out != null) { - out.reset(); - } - } - - public void clear() { - handles.clear(); - handlesText.clear(); - } -} diff --git a/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 40f8ccc38ef..ed11a6cdd26 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.text.StringAndBytesText; -import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.Text; import org.joda.time.DateTime; @@ -222,22 +221,6 @@ public abstract class StreamInput extends InputStream { return new StringAndBytesText(readBytesReference(length)); } - public Text[] readTextArray() throws IOException { - int size = readVInt(); - if (size == 0) { - return StringText.EMPTY_ARRAY; - } - Text[] ret = new Text[size]; - for (int i = 0; i < size; i++) { - ret[i] = readText(); - } - return ret; - } - - public Text readSharedText() throws IOException { - return readText(); - } - @Nullable public String readOptionalString() throws IOException { if (readBoolean()) { @@ -246,14 +229,6 @@ public abstract class StreamInput extends InputStream { return null; } - @Nullable - public String readOptionalSharedString() throws IOException { - if (readBoolean()) { - return readSharedString(); - } - return null; - } - private final CharsRefBuilder spare = new CharsRefBuilder(); public String readString() throws IOException { @@ -286,10 +261,6 @@ public abstract class StreamInput extends InputStream { return spare.toString(); } - public String readSharedString() throws IOException { - return readString(); - } - public final float readFloat() throws IOException { return Float.intBitsToFloat(readInt()); @@ -400,14 +371,14 @@ public abstract class StreamInput extends InputStream { int size9 = readVInt(); Map map9 = new LinkedHashMap(size9); for (int i = 0; i < size9; i++) { - map9.put(readSharedString(), readGenericValue()); + map9.put(readString(), readGenericValue()); } return map9; case 10: int size10 = readVInt(); Map map10 = new HashMap(size10); for (int i = 0; i < size10; i++) { - map10.put(readSharedString(), readGenericValue()); + map10.put(readString(), readGenericValue()); } return map10; case 11: diff --git a/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index 0b42e58192d..53586bf5867 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -176,15 +176,6 @@ public abstract class StreamOutput extends OutputStream { } } - public void writeOptionalSharedString(@Nullable String str) throws IOException { - if (str == null) { - writeBoolean(false); - } else { - writeBoolean(true); - writeSharedString(str); - } - } - public void writeOptionalText(@Nullable Text text) throws IOException { if (text == null) { writeInt(-1); @@ -208,17 +199,6 @@ public abstract class StreamOutput extends OutputStream { } } - public void writeTextArray(Text[] array) throws IOException { - writeVInt(array.length); - for (Text t : array) { - writeText(t); - } - } - - public void writeSharedText(Text text) throws IOException { - writeText(text); - } - public void writeString(String str) throws IOException { int charCount = str.length(); writeVInt(charCount); @@ -238,10 +218,6 @@ public abstract class StreamOutput extends OutputStream { } } - public void writeSharedString(String str) throws IOException { - writeString(str); - } - public void writeFloat(float v) throws IOException { writeInt(Float.floatToIntBits(v)); } @@ -368,7 +344,7 @@ public abstract class StreamOutput extends OutputStream { Map map = (Map) value; writeVInt(map.size()); for (Map.Entry entry : map.entrySet()) { - writeSharedString(entry.getKey()); + writeString(entry.getKey()); writeGenericValue(entry.getValue()); } } else if (type == Byte.class) { diff --git a/src/main/java/org/elasticsearch/common/rounding/TimeZoneRounding.java b/src/main/java/org/elasticsearch/common/rounding/TimeZoneRounding.java index 4a07d27fc39..1dee393d8cf 100644 --- a/src/main/java/org/elasticsearch/common/rounding/TimeZoneRounding.java +++ b/src/main/java/org/elasticsearch/common/rounding/TimeZoneRounding.java @@ -175,15 +175,15 @@ public abstract class TimeZoneRounding extends Rounding { unit = DateTimeUnit.resolve(in.readByte()); field = unit.field(); durationField = field.getDurationField(); - preTz = DateTimeZone.forID(in.readSharedString()); - postTz = DateTimeZone.forID(in.readSharedString()); + preTz = DateTimeZone.forID(in.readString()); + postTz = DateTimeZone.forID(in.readString()); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeByte(unit.id()); - out.writeSharedString(preTz.getID()); - out.writeSharedString(postTz.getID()); + out.writeString(preTz.getID()); + out.writeString(postTz.getID()); } } @@ -287,15 +287,15 @@ public abstract class TimeZoneRounding extends Rounding { unit = DateTimeUnit.resolve(in.readByte()); field = unit.field(); durationField = field.getDurationField(); - preTz = DateTimeZone.forID(in.readSharedString()); - postTz = DateTimeZone.forID(in.readSharedString()); + preTz = DateTimeZone.forID(in.readString()); + postTz = DateTimeZone.forID(in.readString()); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeByte(unit.id()); - out.writeSharedString(preTz.getID()); - out.writeSharedString(postTz.getID()); + out.writeString(preTz.getID()); + out.writeString(postTz.getID()); } } @@ -390,15 +390,15 @@ public abstract class TimeZoneRounding extends Rounding { @Override public void readFrom(StreamInput in) throws IOException { interval = in.readVLong(); - preTz = DateTimeZone.forID(in.readSharedString()); - postTz = DateTimeZone.forID(in.readSharedString()); + preTz = DateTimeZone.forID(in.readString()); + postTz = DateTimeZone.forID(in.readString()); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(interval); - out.writeSharedString(preTz.getID()); - out.writeSharedString(postTz.getID()); + out.writeString(preTz.getID()); + out.writeString(postTz.getID()); } } @@ -447,15 +447,15 @@ public abstract class TimeZoneRounding extends Rounding { @Override public void readFrom(StreamInput in) throws IOException { interval = in.readVLong(); - preTz = DateTimeZone.forID(in.readSharedString()); - postTz = DateTimeZone.forID(in.readSharedString()); + preTz = DateTimeZone.forID(in.readString()); + postTz = DateTimeZone.forID(in.readString()); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(interval); - out.writeSharedString(preTz.getID()); - out.writeSharedString(postTz.getID()); + out.writeString(preTz.getID()); + out.writeString(postTz.getID()); } } } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java index 8a4d8212e06..c1d4058a6d8 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java @@ -249,8 +249,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem private void sendPingRequest(int id) { try { - BytesStreamOutput bStream = new BytesStreamOutput(); - StreamOutput out = new HandlesStreamOutput(bStream); + BytesStreamOutput out = new BytesStreamOutput(); out.writeBytes(INTERNAL_HEADER); // TODO: change to min_required version! Version.writeVersion(version, out); @@ -258,7 +257,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem clusterName.writeTo(out); contextProvider.nodes().localNode().writeTo(out); out.close(); - multicastChannel.send(bStream.bytes()); + multicastChannel.send(out.bytes()); if (logger.isTraceEnabled()) { logger.trace("[{}] sending ping request", id); } @@ -404,7 +403,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem } } if (internal) { - StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(new BytesArray(data.toBytes(), INTERNAL_HEADER.length, data.length() - INTERNAL_HEADER.length))); + StreamInput input = new BytesStreamInput(new BytesArray(data.toBytes(), INTERNAL_HEADER.length, data.length() - INTERNAL_HEADER.length)); Version version = Version.readVersion(input); input.setVersion(version); id = input.readInt(); diff --git a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 1e46bbb0171..ba9ca114c4b 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -105,7 +105,7 @@ public class PublishClusterStateAction extends AbstractComponent { if (bytes == null) { try { BytesStreamOutput bStream = new BytesStreamOutput(); - StreamOutput stream = new HandlesStreamOutput(CompressorFactory.defaultCompressor().streamOutput(bStream)); + StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream); stream.setVersion(node.version()); ClusterState.Builder.writeTo(clusterState, stream); stream.close(); @@ -173,9 +173,9 @@ public class PublishClusterStateAction extends AbstractComponent { Compressor compressor = CompressorFactory.compressor(request.bytes()); StreamInput in; if (compressor != null) { - in = CachedStreamInput.cachedHandlesCompressed(compressor, request.bytes().streamInput()); + in = compressor.streamInput(request.bytes().streamInput()); } else { - in = CachedStreamInput.cachedHandles(request.bytes().streamInput()); + in = request.bytes().streamInput(); } in.setVersion(request.version()); ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode(), clusterName); diff --git a/src/main/java/org/elasticsearch/index/get/GetResult.java b/src/main/java/org/elasticsearch/index/get/GetResult.java index 445da7b0ac0..ad3fc6ab646 100644 --- a/src/main/java/org/elasticsearch/index/get/GetResult.java +++ b/src/main/java/org/elasticsearch/index/get/GetResult.java @@ -261,8 +261,8 @@ public class GetResult implements Streamable, Iterable, ToXContent { @Override public void readFrom(StreamInput in) throws IOException { - index = in.readSharedString(); - type = in.readOptionalSharedString(); + index = in.readString(); + type = in.readOptionalString(); id = in.readString(); version = in.readLong(); exists = in.readBoolean(); @@ -286,8 +286,8 @@ public class GetResult implements Streamable, Iterable, ToXContent { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeSharedString(index); - out.writeOptionalSharedString(type); + out.writeString(index); + out.writeOptionalString(type); out.writeString(id); out.writeLong(version); out.writeBoolean(exists); diff --git a/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/src/main/java/org/elasticsearch/node/internal/InternalNode.java index 5bcc7bdb3ff..45c89262192 100644 --- a/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -43,7 +43,6 @@ import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.Injectors; import org.elasticsearch.common.inject.ModulesBuilder; -import org.elasticsearch.common.io.CachedStreams; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; @@ -395,7 +394,6 @@ public final class InternalNode implements Node { injector.getInstance(NodeEnvironment.class).close(); injector.getInstance(PageCacheRecycler.class).close(); - CachedStreams.clear(); logger.info("closed"); } diff --git a/src/main/java/org/elasticsearch/search/SearchShardTarget.java b/src/main/java/org/elasticsearch/search/SearchShardTarget.java index ce489a188a3..c39515ec4c7 100644 --- a/src/main/java/org/elasticsearch/search/SearchShardTarget.java +++ b/src/main/java/org/elasticsearch/search/SearchShardTarget.java @@ -100,9 +100,9 @@ public class SearchShardTarget implements Streamable, Serializable, Comparable(size); for (int i = 0; i < size; i++) { @@ -104,7 +104,7 @@ public class InternalSearchHitField implements SearchHitField { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeSharedString(name); + out.writeString(name); out.writeVInt(values.size()); for (Object value : values) { out.writeGenericValue(value); diff --git a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 9c6bc45bc2e..0e73a96e072 100644 --- a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -29,10 +29,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.ThrowableObjectInputStream; import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.CachedStreamInput; -import org.elasticsearch.common.io.stream.HandlesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress; @@ -197,35 +194,35 @@ public class LocalTransport extends AbstractLifecycleComponent implem public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { final Version version = Version.smallest(node.version(), this.version); - BytesStreamOutput bStream = new BytesStreamOutput(); - StreamOutput stream = new HandlesStreamOutput(bStream); - stream.setVersion(version); + try (BytesStreamOutput stream = new BytesStreamOutput()) { + stream.setVersion(version); - stream.writeLong(requestId); - byte status = 0; - status = TransportStatus.setRequest(status); - stream.writeByte(status); // 0 for request, 1 for response. + stream.writeLong(requestId); + byte status = 0; + status = TransportStatus.setRequest(status); + stream.writeByte(status); // 0 for request, 1 for response. - stream.writeString(action); - request.writeTo(stream); + stream.writeString(action); + request.writeTo(stream); - stream.close(); + stream.close(); - final LocalTransport targetTransport = connectedNodes.get(node); - if (targetTransport == null) { - throw new NodeNotConnectedException(node, "Node not connected"); - } - - final byte[] data = bStream.bytes().toBytes(); - - transportServiceAdapter.sent(data.length); - - targetTransport.workers().execute(new Runnable() { - @Override - public void run() { - targetTransport.messageReceived(data, action, LocalTransport.this, version, requestId); + final LocalTransport targetTransport = connectedNodes.get(node); + if (targetTransport == null) { + throw new NodeNotConnectedException(node, "Node not connected"); } - }); + + final byte[] data = stream.bytes().toBytes(); + + transportServiceAdapter.sent(data.length); + + targetTransport.workers().execute(new Runnable() { + @Override + public void run() { + targetTransport.messageReceived(data, action, LocalTransport.this, version, requestId); + } + }); + } } ThreadPoolExecutor workers() { @@ -237,7 +234,6 @@ public class LocalTransport extends AbstractLifecycleComponent implem try { transportServiceAdapter.received(data.length); StreamInput stream = new BytesStreamInput(data, false); - stream = CachedStreamInput.cachedHandles(stream); stream.setVersion(version); long requestId = stream.readLong(); diff --git a/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java b/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java index f316e9ba69d..44c0aa6ad9e 100644 --- a/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java +++ b/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java @@ -22,7 +22,6 @@ package org.elasticsearch.transport.local; import org.elasticsearch.Version; import org.elasticsearch.common.io.ThrowableObjectOutputStream; import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.HandlesStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.transport.*; import org.elasticsearch.transport.support.TransportStatus; @@ -62,22 +61,21 @@ public class LocalTransportChannel implements TransportChannel { @Override public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { - BytesStreamOutput bStream = new BytesStreamOutput(); - StreamOutput stream = new HandlesStreamOutput(bStream); - stream.setVersion(version); - stream.writeLong(requestId); - byte status = 0; - status = TransportStatus.setResponse(status); - stream.writeByte(status); // 0 for request, 1 for response. - response.writeTo(stream); - stream.close(); - final byte[] data = bStream.bytes().toBytes(); - targetTransport.workers().execute(new Runnable() { - @Override - public void run() { - targetTransport.messageReceived(data, action, sourceTransport, version, null); - } - }); + try (BytesStreamOutput stream = new BytesStreamOutput()) { + stream.setVersion(version); + stream.writeLong(requestId); + byte status = 0; + status = TransportStatus.setResponse(status); + stream.writeByte(status); // 0 for request, 1 for response. + response.writeTo(stream); + final byte[] data = stream.bytes().toBytes(); + targetTransport.workers().execute(new Runnable() { + @Override + public void run() { + targetTransport.messageReceived(data, action, sourceTransport, version, null); + } + }); + } } @Override diff --git a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index c74ffa6fd60..e415de5118e 100644 --- a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.ThrowableObjectInputStream; -import org.elasticsearch.common.io.stream.CachedStreamInput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.transport.InetSocketTransportAddress; @@ -102,9 +101,9 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { sb.append("]"); throw new ElasticsearchIllegalStateException(sb.toString()); } - wrappedStream = CachedStreamInput.cachedHandlesCompressed(compressor, streamIn); + wrappedStream = compressor.streamInput(streamIn); } else { - wrappedStream = CachedStreamInput.cachedHandles(streamIn); + wrappedStream = streamIn; } wrappedStream.setVersion(version); diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index d7a06234bb4..a46233cc338 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -31,7 +31,6 @@ import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.HandlesStreamOutput; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasables; @@ -665,7 +664,6 @@ public class NettyTransport extends AbstractLifecycleComponent implem status = TransportStatus.setCompress(status); stream = CompressorFactory.defaultCompressor().streamOutput(stream); } - stream = new HandlesStreamOutput(stream); // we pick the smallest of the 2, to support both backward and forward compatibility // note, this is the only place we need to do this, since from here on, we use the serialized version diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index 924ec2c74da..e1d0b5b21ae 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.ThrowableObjectOutputStream; import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.HandlesStreamOutput; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasables; @@ -86,7 +85,6 @@ public class NettyTransportChannel implements TransportChannel { status = TransportStatus.setCompress(status); stream = CompressorFactory.defaultCompressor().streamOutput(stream); } - stream = new HandlesStreamOutput(stream); stream.setVersion(version); response.writeTo(stream); stream.close(); diff --git a/src/test/java/org/elasticsearch/common/io/streams/HandlesStreamsTests.java b/src/test/java/org/elasticsearch/common/io/streams/HandlesStreamsTests.java deleted file mode 100644 index a28082f83a4..00000000000 --- a/src/test/java/org/elasticsearch/common/io/streams/HandlesStreamsTests.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io.streams; - -import org.elasticsearch.common.io.stream.BytesStreamInput; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.HandlesStreamInput; -import org.elasticsearch.common.io.stream.HandlesStreamOutput; -import org.elasticsearch.test.ElasticsearchTestCase; -import org.junit.Test; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; - -/** - * - */ -public class HandlesStreamsTests extends ElasticsearchTestCase { - - @Test - public void testSharedStringHandles() throws Exception { - String test1 = "test1"; - String test2 = "test2"; - String test3 = "test3"; - String test4 = "test4"; - String test5 = "test5"; - String test6 = "test6"; - - BytesStreamOutput bout = new BytesStreamOutput(); - HandlesStreamOutput out = new HandlesStreamOutput(bout); - out.writeString(test1); - out.writeString(test1); - out.writeString(test2); - out.writeString(test3); - out.writeSharedString(test4); - out.writeSharedString(test4); - out.writeSharedString(test5); - out.writeSharedString(test6); - - BytesStreamInput bin = new BytesStreamInput(bout.bytes()); - HandlesStreamInput in = new HandlesStreamInput(bin); - String s1 = in.readString(); - String s2 = in.readString(); - String s3 = in.readString(); - String s4 = in.readString(); - String s5 = in.readSharedString(); - String s6 = in.readSharedString(); - String s7 = in.readSharedString(); - String s8 = in.readSharedString(); - - assertThat(s1, equalTo(test1)); - assertThat(s2, equalTo(test1)); - assertThat(s3, equalTo(test2)); - assertThat(s4, equalTo(test3)); - assertThat(s5, equalTo(test4)); - assertThat(s6, equalTo(test4)); - assertThat(s7, equalTo(test5)); - assertThat(s8, equalTo(test6)); - - assertThat(s1, not(sameInstance(s2))); - assertThat(s5, sameInstance(s6)); - } -}