Remove HandlesStreamInput/Output

The optimization we do in the HandlesStreamInput / Output
adds a lot of complexity with a rather unknown benefit. It tries
to compress commonly used strings and write ids instead. This
should rather be done on a lower level if at all necessary for
the small message we send over the network.
This commit is contained in:
Simon Willnauer 2015-01-29 16:49:39 +01:00
parent 1d77c3af82
commit c0fa60eb26
34 changed files with 112 additions and 903 deletions

View File

@ -247,7 +247,7 @@ public class BulkItemResponse implements Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
id = in.readVInt();
opType = in.readSharedString();
opType = in.readString();
byte type = in.readByte();
if (type == 0) {
@ -262,8 +262,8 @@ public class BulkItemResponse implements Streamable {
}
if (in.readBoolean()) {
String fIndex = in.readSharedString();
String fType = in.readSharedString();
String fIndex = in.readString();
String fType = in.readString();
String fId = in.readOptionalString();
String fMessage = in.readString();
RestStatus status = RestStatus.readFrom(in);
@ -274,7 +274,7 @@ public class BulkItemResponse implements Streamable {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
out.writeSharedString(opType);
out.writeString(opType);
if (response == null) {
out.writeByte((byte) 2);
@ -292,8 +292,8 @@ public class BulkItemResponse implements Streamable {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeSharedString(failure.getIndex());
out.writeSharedString(failure.getType());
out.writeString(failure.getIndex());
out.writeString(failure.getType());
out.writeOptionalString(failure.getId());
out.writeString(failure.getMessage());
RestStatus.writeTo(out, failure.getStatus());

View File

@ -223,7 +223,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest<DeleteReques
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readSharedString();
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
refresh = in.readBoolean();
@ -234,7 +234,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest<DeleteReques
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeSharedString(type);
out.writeString(type);
out.writeString(id);
out.writeOptionalString(routing());
out.writeBoolean(refresh);

View File

@ -89,8 +89,8 @@ public class DeleteResponse extends ActionWriteResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readSharedString();
type = in.readSharedString();
index = in.readString();
type = in.readString();
id = in.readString();
version = in.readLong();
found = in.readBoolean();
@ -99,8 +99,8 @@ public class DeleteResponse extends ActionWriteResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeSharedString(index);
out.writeSharedString(type);
out.writeString(index);
out.writeString(type);
out.writeString(id);
out.writeLong(version);
out.writeBoolean(found);

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.get;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
@ -287,7 +286,7 @@ public class GetRequest extends SingleShardOperationRequest<GetRequest> {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readSharedString();
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
preference = in.readOptionalString();
@ -316,7 +315,7 @@ public class GetRequest extends SingleShardOperationRequest<GetRequest> {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeSharedString(type);
out.writeString(type);
out.writeString(id);
out.writeOptionalString(routing);
out.writeOptionalString(preference);

View File

@ -22,7 +22,6 @@ package org.elasticsearch.action.get;
import com.google.common.collect.Iterators;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.*;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Nullable;
@ -174,8 +173,8 @@ public class MultiGetRequest extends ActionRequest<MultiGetRequest> implements I
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readSharedString();
type = in.readOptionalSharedString();
index = in.readString();
type = in.readOptionalString();
id = in.readString();
routing = in.readOptionalString();
int size = in.readVInt();
@ -193,8 +192,8 @@ public class MultiGetRequest extends ActionRequest<MultiGetRequest> implements I
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeSharedString(index);
out.writeOptionalSharedString(type);
out.writeString(index);
out.writeOptionalString(type);
out.writeString(id);
out.writeOptionalString(routing);
if (fields == null) {

View File

@ -676,7 +676,7 @@ public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest>
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readSharedString();
type = in.readString();
id = in.readOptionalString();
routing = in.readOptionalString();
parent = in.readOptionalString();
@ -695,7 +695,7 @@ public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest>
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeSharedString(type);
out.writeString(type);
out.writeOptionalString(id);
out.writeOptionalString(routing);
out.writeOptionalString(parent);

View File

@ -89,8 +89,8 @@ public class IndexResponse extends ActionWriteResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readSharedString();
type = in.readSharedString();
index = in.readString();
type = in.readString();
id = in.readString();
version = in.readLong();
created = in.readBoolean();
@ -99,8 +99,8 @@ public class IndexResponse extends ActionWriteResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeSharedString(index);
out.writeSharedString(type);
out.writeString(index);
out.writeString(type);
out.writeString(id);
out.writeLong(version);
out.writeBoolean(created);

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.support.replication;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
@ -200,7 +199,7 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
replicationType = ReplicationType.fromId(in.readByte());
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
timeout = TimeValue.readTimeValue(in);
index = in.readSharedString();
index = in.readString();
canHaveDuplicates = in.readBoolean();
// no need to serialize threaded* parameters, since they only matter locally
}
@ -211,7 +210,7 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
out.writeByte(replicationType.id());
out.writeByte(consistencyLevel.id());
timeout.writeTo(out);
out.writeSharedString(index);
out.writeString(index);
out.writeBoolean(canHaveDuplicates);
}

View File

@ -631,7 +631,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
super.readFrom(in);
replicationType = ReplicationType.fromId(in.readByte());
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
type = in.readSharedString();
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
script = in.readOptionalString();
@ -669,7 +669,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
super.writeTo(out);
out.writeByte(replicationType.id());
out.writeByte(consistencyLevel.id());
out.writeSharedString(type);
out.writeString(type);
out.writeString(id);
out.writeOptionalString(routing);
out.writeOptionalString(script);

View File

@ -104,8 +104,8 @@ public class UpdateResponse extends ActionWriteResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readSharedString();
type = in.readSharedString();
index = in.readString();
type = in.readString();
id = in.readString();
version = in.readLong();
created = in.readBoolean();
@ -117,8 +117,8 @@ public class UpdateResponse extends ActionWriteResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeSharedString(index);
out.writeSharedString(type);
out.writeString(index);
out.writeString(type);
out.writeString(id);
out.writeLong(version);
out.writeBoolean(created);

View File

@ -66,7 +66,6 @@ import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.io.CachedStreams;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@ -286,8 +285,6 @@ public class TransportClient extends AbstractClient {
}
injector.getInstance(PageCacheRecycler.class).close();
CachedStreams.clear();
}
@Override

View File

@ -1,29 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io;
import org.elasticsearch.common.io.stream.CachedStreamInput;
public class CachedStreams {
public static void clear() {
CachedStreamInput.clear();
}
}

View File

@ -1,174 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.text.Text;
import java.io.IOException;
/**
*/
public abstract class AdapterStreamInput extends StreamInput {
protected StreamInput in;
protected AdapterStreamInput() {
}
public AdapterStreamInput(StreamInput in) {
this.in = in;
super.setVersion(in.getVersion());
}
@Override
public StreamInput setVersion(Version version) {
in.setVersion(version);
return super.setVersion(version);
}
public void reset(StreamInput in) {
this.in = in;
}
@Override
public byte readByte() throws IOException {
return in.readByte();
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
in.readBytes(b, offset, len);
}
@Override
public BytesReference readBytesReference() throws IOException {
return in.readBytesReference();
}
@Override
public BytesReference readBytesReference(int length) throws IOException {
return in.readBytesReference(length);
}
@Override
public void reset() throws IOException {
in.reset();
}
@Override
public void close() throws IOException {
in.close();
}
@Override
public int read() throws IOException {
return in.read();
}
// override ones to direct them
@Override
public void readFully(byte[] b) throws IOException {
in.readFully(b);
}
@Override
public short readShort() throws IOException {
return in.readShort();
}
@Override
public int readInt() throws IOException {
return in.readInt();
}
@Override
public int readVInt() throws IOException {
return in.readVInt();
}
@Override
public long readLong() throws IOException {
return in.readLong();
}
@Override
public long readVLong() throws IOException {
return in.readVLong();
}
@Override
public String readString() throws IOException {
return in.readString();
}
@Override
public String readSharedString() throws IOException {
return in.readSharedString();
}
@Override
public Text readText() throws IOException {
return in.readText();
}
@Override
public Text readSharedText() throws IOException {
return in.readSharedText();
}
@Override
public int read(byte[] b) throws IOException {
return in.read(b);
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
return in.read(b, off, len);
}
@Override
public long skip(long n) throws IOException {
return in.skip(n);
}
@Override
public int available() throws IOException {
return in.available();
}
@Override
public void mark(int readlimit) {
in.mark(readlimit);
}
@Override
public boolean markSupported() {
return in.markSupported();
}
@Override
public String toString() {
return in.toString();
}
}

View File

@ -1,183 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.text.Text;
import java.io.IOException;
/**
*/
public class AdapterStreamOutput extends StreamOutput {
protected StreamOutput out;
public AdapterStreamOutput(StreamOutput out) {
this.out = out;
super.setVersion(out.getVersion());
}
@Override
public StreamOutput setVersion(Version version) {
out.setVersion(version);
return super.setVersion(version);
}
public void setOut(StreamOutput out) {
this.out = out;
}
public StreamOutput wrappedOut() {
return this.out;
}
@Override
public boolean seekPositionSupported() {
return out.seekPositionSupported();
}
@Override
public long position() throws IOException {
return out.position();
}
@Override
public void seek(long position) throws IOException {
out.seek(position);
}
@Override
public void writeByte(byte b) throws IOException {
out.writeByte(b);
}
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
out.writeBytes(b, offset, length);
}
@Override
public void flush() throws IOException {
out.flush();
}
@Override
public void close() throws IOException {
out.close();
}
@Override
public void reset() throws IOException {
out.reset();
}
@Override
public void writeBytes(byte[] b) throws IOException {
out.writeBytes(b);
}
@Override
public void writeBytes(byte[] b, int length) throws IOException {
out.writeBytes(b, length);
}
@Override
public void writeBytesReference(@Nullable BytesReference bytes) throws IOException {
out.writeBytesReference(bytes);
}
@Override
public void writeInt(int i) throws IOException {
out.writeInt(i);
}
@Override
public void writeVInt(int i) throws IOException {
out.writeVInt(i);
}
@Override
public void writeLong(long i) throws IOException {
out.writeLong(i);
}
@Override
public void writeVLong(long i) throws IOException {
out.writeVLong(i);
}
@Override
public void writeString(String str) throws IOException {
out.writeString(str);
}
@Override
public void writeSharedString(String str) throws IOException {
out.writeSharedString(str);
}
@Override
public void writeText(Text text) throws IOException {
out.writeText(text);
}
@Override
public void writeSharedText(Text text) throws IOException {
out.writeSharedText(text);
}
@Override
public void writeFloat(float v) throws IOException {
out.writeFloat(v);
}
@Override
public void writeDouble(double v) throws IOException {
out.writeDouble(v);
}
@Override
public void writeBoolean(boolean b) throws IOException {
out.writeBoolean(b);
}
@Override
public void write(int b) throws IOException {
out.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
}
@Override
public void write(byte[] b) throws IOException {
out.write(b);
}
@Override
public String toString() {
return out.toString();
}
}

View File

@ -1,72 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.compress.Compressor;
import java.io.IOException;
import java.lang.ref.SoftReference;
/**
*
*/
public class CachedStreamInput {
static class Entry {
final HandlesStreamInput handles;
Entry(HandlesStreamInput handles) {
this.handles = handles;
}
}
private static final ThreadLocal<SoftReference<Entry>> cache = new ThreadLocal<>();
static Entry instance() {
SoftReference<Entry> ref = cache.get();
Entry entry = ref == null ? null : ref.get();
if (entry == null) {
HandlesStreamInput handles = new HandlesStreamInput();
entry = new Entry(handles);
cache.set(new SoftReference<>(entry));
}
return entry;
}
public static void clear() {
cache.remove();
}
public static StreamInput compressed(Compressor compressor, StreamInput in) throws IOException {
return compressor.streamInput(in);
}
public static HandlesStreamInput cachedHandles(StreamInput in) {
HandlesStreamInput handles = instance().handles;
handles.reset(in);
return handles;
}
public static HandlesStreamInput cachedHandlesCompressed(Compressor compressor, StreamInput in) throws IOException {
Entry entry = instance();
entry.handles.reset(compressor.streamInput(in));
return entry.handles;
}
}

View File

@ -1,96 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import com.carrotsearch.hppc.IntObjectOpenHashMap;
import org.elasticsearch.common.text.Text;
import java.io.IOException;
/**
*
*/
public class HandlesStreamInput extends AdapterStreamInput {
private final IntObjectOpenHashMap<String> handles = new IntObjectOpenHashMap<>();
private final IntObjectOpenHashMap<Text> handlesText = new IntObjectOpenHashMap<>();
HandlesStreamInput() {
super();
}
public HandlesStreamInput(StreamInput in) {
super(in);
}
@Override
public String readSharedString() throws IOException {
byte b = in.readByte();
if (b == 0) {
// full string with handle
int handle = in.readVInt();
String s = in.readString();
handles.put(handle, s);
return s;
} else if (b == 1) {
return handles.get(in.readVInt());
} else {
throw new IOException("Expected handle header, got [" + b + "]");
}
}
@Override
public String readString() throws IOException {
return in.readString();
}
@Override
public Text readSharedText() throws IOException {
byte b = in.readByte();
if (b == 0) {
int handle = in.readVInt();
Text s = in.readText();
handlesText.put(handle, s);
return s;
} else if (b == 1) {
return handlesText.get(in.readVInt());
} else if (b == 2) {
return in.readText();
} else {
throw new IOException("Expected handle header, got [" + b + "]");
}
}
@Override
public void reset() throws IOException {
super.reset();
cleanHandles();
}
public void reset(StreamInput in) {
super.reset(in);
cleanHandles();
}
public void cleanHandles() {
handles.clear();
handlesText.clear();
}
}

View File

@ -1,84 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import com.carrotsearch.hppc.ObjectIntOpenHashMap;
import org.elasticsearch.common.text.Text;
import java.io.IOException;
/**
*
*/
public class HandlesStreamOutput extends AdapterStreamOutput {
private final ObjectIntOpenHashMap<String> handles = new ObjectIntOpenHashMap<>();
private final ObjectIntOpenHashMap<Text> handlesText = new ObjectIntOpenHashMap<>();
public HandlesStreamOutput(StreamOutput out) {
super(out);
}
@Override
public void writeSharedString(String str) throws IOException {
if (handles.containsKey(str)) {
out.writeByte((byte) 1);
out.writeVInt(handles.lget());
} else {
int handle = handles.size();
handles.put(str, handle);
out.writeByte((byte) 0);
out.writeVInt(handle);
out.writeString(str);
}
}
@Override
public void writeString(String s) throws IOException {
out.writeString(s);
}
@Override
public void writeSharedText(Text text) throws IOException {
if (handlesText.containsKey(text)) {
out.writeByte((byte) 1);
out.writeVInt(handlesText.lget());
} else {
int handle = handlesText.size();
handlesText.put(text, handle);
out.writeByte((byte) 0);
out.writeVInt(handle);
out.writeText(text);
}
}
@Override
public void reset() throws IOException {
clear();
if (out != null) {
out.reset();
}
}
public void clear() {
handles.clear();
handlesText.clear();
}
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.text.StringAndBytesText;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.text.Text;
import org.joda.time.DateTime;
@ -222,22 +221,6 @@ public abstract class StreamInput extends InputStream {
return new StringAndBytesText(readBytesReference(length));
}
public Text[] readTextArray() throws IOException {
int size = readVInt();
if (size == 0) {
return StringText.EMPTY_ARRAY;
}
Text[] ret = new Text[size];
for (int i = 0; i < size; i++) {
ret[i] = readText();
}
return ret;
}
public Text readSharedText() throws IOException {
return readText();
}
@Nullable
public String readOptionalString() throws IOException {
if (readBoolean()) {
@ -246,14 +229,6 @@ public abstract class StreamInput extends InputStream {
return null;
}
@Nullable
public String readOptionalSharedString() throws IOException {
if (readBoolean()) {
return readSharedString();
}
return null;
}
private final CharsRefBuilder spare = new CharsRefBuilder();
public String readString() throws IOException {
@ -286,10 +261,6 @@ public abstract class StreamInput extends InputStream {
return spare.toString();
}
public String readSharedString() throws IOException {
return readString();
}
public final float readFloat() throws IOException {
return Float.intBitsToFloat(readInt());
@ -400,14 +371,14 @@ public abstract class StreamInput extends InputStream {
int size9 = readVInt();
Map map9 = new LinkedHashMap(size9);
for (int i = 0; i < size9; i++) {
map9.put(readSharedString(), readGenericValue());
map9.put(readString(), readGenericValue());
}
return map9;
case 10:
int size10 = readVInt();
Map map10 = new HashMap(size10);
for (int i = 0; i < size10; i++) {
map10.put(readSharedString(), readGenericValue());
map10.put(readString(), readGenericValue());
}
return map10;
case 11:

View File

@ -176,15 +176,6 @@ public abstract class StreamOutput extends OutputStream {
}
}
public void writeOptionalSharedString(@Nullable String str) throws IOException {
if (str == null) {
writeBoolean(false);
} else {
writeBoolean(true);
writeSharedString(str);
}
}
public void writeOptionalText(@Nullable Text text) throws IOException {
if (text == null) {
writeInt(-1);
@ -208,17 +199,6 @@ public abstract class StreamOutput extends OutputStream {
}
}
public void writeTextArray(Text[] array) throws IOException {
writeVInt(array.length);
for (Text t : array) {
writeText(t);
}
}
public void writeSharedText(Text text) throws IOException {
writeText(text);
}
public void writeString(String str) throws IOException {
int charCount = str.length();
writeVInt(charCount);
@ -238,10 +218,6 @@ public abstract class StreamOutput extends OutputStream {
}
}
public void writeSharedString(String str) throws IOException {
writeString(str);
}
public void writeFloat(float v) throws IOException {
writeInt(Float.floatToIntBits(v));
}
@ -368,7 +344,7 @@ public abstract class StreamOutput extends OutputStream {
Map<String, Object> map = (Map<String, Object>) value;
writeVInt(map.size());
for (Map.Entry<String, Object> entry : map.entrySet()) {
writeSharedString(entry.getKey());
writeString(entry.getKey());
writeGenericValue(entry.getValue());
}
} else if (type == Byte.class) {

View File

@ -175,15 +175,15 @@ public abstract class TimeZoneRounding extends Rounding {
unit = DateTimeUnit.resolve(in.readByte());
field = unit.field();
durationField = field.getDurationField();
preTz = DateTimeZone.forID(in.readSharedString());
postTz = DateTimeZone.forID(in.readSharedString());
preTz = DateTimeZone.forID(in.readString());
postTz = DateTimeZone.forID(in.readString());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeByte(unit.id());
out.writeSharedString(preTz.getID());
out.writeSharedString(postTz.getID());
out.writeString(preTz.getID());
out.writeString(postTz.getID());
}
}
@ -287,15 +287,15 @@ public abstract class TimeZoneRounding extends Rounding {
unit = DateTimeUnit.resolve(in.readByte());
field = unit.field();
durationField = field.getDurationField();
preTz = DateTimeZone.forID(in.readSharedString());
postTz = DateTimeZone.forID(in.readSharedString());
preTz = DateTimeZone.forID(in.readString());
postTz = DateTimeZone.forID(in.readString());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeByte(unit.id());
out.writeSharedString(preTz.getID());
out.writeSharedString(postTz.getID());
out.writeString(preTz.getID());
out.writeString(postTz.getID());
}
}
@ -390,15 +390,15 @@ public abstract class TimeZoneRounding extends Rounding {
@Override
public void readFrom(StreamInput in) throws IOException {
interval = in.readVLong();
preTz = DateTimeZone.forID(in.readSharedString());
postTz = DateTimeZone.forID(in.readSharedString());
preTz = DateTimeZone.forID(in.readString());
postTz = DateTimeZone.forID(in.readString());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(interval);
out.writeSharedString(preTz.getID());
out.writeSharedString(postTz.getID());
out.writeString(preTz.getID());
out.writeString(postTz.getID());
}
}
@ -447,15 +447,15 @@ public abstract class TimeZoneRounding extends Rounding {
@Override
public void readFrom(StreamInput in) throws IOException {
interval = in.readVLong();
preTz = DateTimeZone.forID(in.readSharedString());
postTz = DateTimeZone.forID(in.readSharedString());
preTz = DateTimeZone.forID(in.readString());
postTz = DateTimeZone.forID(in.readString());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(interval);
out.writeSharedString(preTz.getID());
out.writeSharedString(postTz.getID());
out.writeString(preTz.getID());
out.writeString(postTz.getID());
}
}
}

View File

@ -249,8 +249,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
private void sendPingRequest(int id) {
try {
BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput out = new HandlesStreamOutput(bStream);
BytesStreamOutput out = new BytesStreamOutput();
out.writeBytes(INTERNAL_HEADER);
// TODO: change to min_required version!
Version.writeVersion(version, out);
@ -258,7 +257,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
clusterName.writeTo(out);
contextProvider.nodes().localNode().writeTo(out);
out.close();
multicastChannel.send(bStream.bytes());
multicastChannel.send(out.bytes());
if (logger.isTraceEnabled()) {
logger.trace("[{}] sending ping request", id);
}
@ -404,7 +403,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
}
}
if (internal) {
StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(new BytesArray(data.toBytes(), INTERNAL_HEADER.length, data.length() - INTERNAL_HEADER.length)));
StreamInput input = new BytesStreamInput(new BytesArray(data.toBytes(), INTERNAL_HEADER.length, data.length() - INTERNAL_HEADER.length));
Version version = Version.readVersion(input);
input.setVersion(version);
id = input.readInt();

View File

@ -105,7 +105,7 @@ public class PublishClusterStateAction extends AbstractComponent {
if (bytes == null) {
try {
BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput stream = new HandlesStreamOutput(CompressorFactory.defaultCompressor().streamOutput(bStream));
StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream);
stream.setVersion(node.version());
ClusterState.Builder.writeTo(clusterState, stream);
stream.close();
@ -173,9 +173,9 @@ public class PublishClusterStateAction extends AbstractComponent {
Compressor compressor = CompressorFactory.compressor(request.bytes());
StreamInput in;
if (compressor != null) {
in = CachedStreamInput.cachedHandlesCompressed(compressor, request.bytes().streamInput());
in = compressor.streamInput(request.bytes().streamInput());
} else {
in = CachedStreamInput.cachedHandles(request.bytes().streamInput());
in = request.bytes().streamInput();
}
in.setVersion(request.version());
ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode(), clusterName);

View File

@ -261,8 +261,8 @@ public class GetResult implements Streamable, Iterable<GetField>, ToXContent {
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readSharedString();
type = in.readOptionalSharedString();
index = in.readString();
type = in.readOptionalString();
id = in.readString();
version = in.readLong();
exists = in.readBoolean();
@ -286,8 +286,8 @@ public class GetResult implements Streamable, Iterable<GetField>, ToXContent {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeSharedString(index);
out.writeOptionalSharedString(type);
out.writeString(index);
out.writeOptionalString(type);
out.writeString(id);
out.writeLong(version);
out.writeBoolean(exists);

View File

@ -43,7 +43,6 @@ import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Injectors;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.io.CachedStreams;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
@ -395,7 +394,6 @@ public final class InternalNode implements Node {
injector.getInstance(NodeEnvironment.class).close();
injector.getInstance(PageCacheRecycler.class).close();
CachedStreams.clear();
logger.info("closed");
}

View File

@ -100,9 +100,9 @@ public class SearchShardTarget implements Streamable, Serializable, Comparable<S
@Override
public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
nodeId = in.readSharedText();
nodeId = in.readText();
}
index = in.readSharedText();
index = in.readText();
shardId = in.readVInt();
}
@ -112,9 +112,9 @@ public class SearchShardTarget implements Streamable, Serializable, Comparable<S
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeSharedText(nodeId);
out.writeText(nodeId);
}
out.writeSharedText(index);
out.writeText(index);
out.writeVInt(shardId);
}

View File

@ -86,7 +86,7 @@ public class HighlightField implements Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
name = in.readSharedString();
name = in.readString();
if (in.readBoolean()) {
int size = in.readVInt();
if (size == 0) {
@ -102,7 +102,7 @@ public class HighlightField implements Streamable {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeSharedString(name);
out.writeString(name);
if (fragments == null) {
out.writeBoolean(false);
} else {

View File

@ -555,7 +555,7 @@ public class InternalSearchHit implements SearchHit {
public void readFrom(StreamInput in, InternalSearchHits.StreamContext context) throws IOException {
score = in.readFloat();
id = in.readText();
type = in.readSharedText();
type = in.readText();
nestedIdentity = in.readOptionalStreamable(new InternalNestedIdentity());
version = in.readLong();
source = in.readBytesReference();
@ -701,7 +701,7 @@ public class InternalSearchHit implements SearchHit {
public void writeTo(StreamOutput out, InternalSearchHits.StreamContext context) throws IOException {
out.writeFloat(score);
out.writeText(id);
out.writeSharedText(type);
out.writeText(type);
out.writeOptionalStreamable(nestedIdentity);
out.writeLong(version);
out.writeBytesReference(source);

View File

@ -94,7 +94,7 @@ public class InternalSearchHitField implements SearchHitField {
@Override
public void readFrom(StreamInput in) throws IOException {
name = in.readSharedString();
name = in.readString();
int size = in.readVInt();
values = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
@ -104,7 +104,7 @@ public class InternalSearchHitField implements SearchHitField {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeSharedString(name);
out.writeString(name);
out.writeVInt(values.size());
for (Object value : values) {
out.writeGenericValue(value);

View File

@ -29,10 +29,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.ThrowableObjectInputStream;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
@ -197,35 +194,35 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
final Version version = Version.smallest(node.version(), this.version);
BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput stream = new HandlesStreamOutput(bStream);
stream.setVersion(version);
try (BytesStreamOutput stream = new BytesStreamOutput()) {
stream.setVersion(version);
stream.writeLong(requestId);
byte status = 0;
status = TransportStatus.setRequest(status);
stream.writeByte(status); // 0 for request, 1 for response.
stream.writeLong(requestId);
byte status = 0;
status = TransportStatus.setRequest(status);
stream.writeByte(status); // 0 for request, 1 for response.
stream.writeString(action);
request.writeTo(stream);
stream.writeString(action);
request.writeTo(stream);
stream.close();
stream.close();
final LocalTransport targetTransport = connectedNodes.get(node);
if (targetTransport == null) {
throw new NodeNotConnectedException(node, "Node not connected");
}
final byte[] data = bStream.bytes().toBytes();
transportServiceAdapter.sent(data.length);
targetTransport.workers().execute(new Runnable() {
@Override
public void run() {
targetTransport.messageReceived(data, action, LocalTransport.this, version, requestId);
final LocalTransport targetTransport = connectedNodes.get(node);
if (targetTransport == null) {
throw new NodeNotConnectedException(node, "Node not connected");
}
});
final byte[] data = stream.bytes().toBytes();
transportServiceAdapter.sent(data.length);
targetTransport.workers().execute(new Runnable() {
@Override
public void run() {
targetTransport.messageReceived(data, action, LocalTransport.this, version, requestId);
}
});
}
}
ThreadPoolExecutor workers() {
@ -237,7 +234,6 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
try {
transportServiceAdapter.received(data.length);
StreamInput stream = new BytesStreamInput(data, false);
stream = CachedStreamInput.cachedHandles(stream);
stream.setVersion(version);
long requestId = stream.readLong();

View File

@ -22,7 +22,6 @@ package org.elasticsearch.transport.local;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStatus;
@ -62,22 +61,21 @@ public class LocalTransportChannel implements TransportChannel {
@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput stream = new HandlesStreamOutput(bStream);
stream.setVersion(version);
stream.writeLong(requestId);
byte status = 0;
status = TransportStatus.setResponse(status);
stream.writeByte(status); // 0 for request, 1 for response.
response.writeTo(stream);
stream.close();
final byte[] data = bStream.bytes().toBytes();
targetTransport.workers().execute(new Runnable() {
@Override
public void run() {
targetTransport.messageReceived(data, action, sourceTransport, version, null);
}
});
try (BytesStreamOutput stream = new BytesStreamOutput()) {
stream.setVersion(version);
stream.writeLong(requestId);
byte status = 0;
status = TransportStatus.setResponse(status);
stream.writeByte(status); // 0 for request, 1 for response.
response.writeTo(stream);
final byte[] data = stream.bytes().toBytes();
targetTransport.workers().execute(new Runnable() {
@Override
public void run() {
targetTransport.messageReceived(data, action, sourceTransport, version, null);
}
});
}
}
@Override

View File

@ -25,7 +25,6 @@ import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.ThrowableObjectInputStream;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
@ -102,9 +101,9 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
sb.append("]");
throw new ElasticsearchIllegalStateException(sb.toString());
}
wrappedStream = CachedStreamInput.cachedHandlesCompressed(compressor, streamIn);
wrappedStream = compressor.streamInput(streamIn);
} else {
wrappedStream = CachedStreamInput.cachedHandles(streamIn);
wrappedStream = streamIn;
}
wrappedStream.setVersion(version);

View File

@ -31,7 +31,6 @@ import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasables;
@ -665,7 +664,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
status = TransportStatus.setCompress(status);
stream = CompressorFactory.defaultCompressor().streamOutput(stream);
}
stream = new HandlesStreamOutput(stream);
// we pick the smallest of the 2, to support both backward and forward compatibility
// note, this is the only place we need to do this, since from here on, we use the serialized version

View File

@ -25,7 +25,6 @@ import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasables;
@ -86,7 +85,6 @@ public class NettyTransportChannel implements TransportChannel {
status = TransportStatus.setCompress(status);
stream = CompressorFactory.defaultCompressor().streamOutput(stream);
}
stream = new HandlesStreamOutput(stream);
stream.setVersion(version);
response.writeTo(stream);
stream.close();

View File

@ -1,80 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.streams;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.HandlesStreamInput;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
/**
*
*/
public class HandlesStreamsTests extends ElasticsearchTestCase {
@Test
public void testSharedStringHandles() throws Exception {
String test1 = "test1";
String test2 = "test2";
String test3 = "test3";
String test4 = "test4";
String test5 = "test5";
String test6 = "test6";
BytesStreamOutput bout = new BytesStreamOutput();
HandlesStreamOutput out = new HandlesStreamOutput(bout);
out.writeString(test1);
out.writeString(test1);
out.writeString(test2);
out.writeString(test3);
out.writeSharedString(test4);
out.writeSharedString(test4);
out.writeSharedString(test5);
out.writeSharedString(test6);
BytesStreamInput bin = new BytesStreamInput(bout.bytes());
HandlesStreamInput in = new HandlesStreamInput(bin);
String s1 = in.readString();
String s2 = in.readString();
String s3 = in.readString();
String s4 = in.readString();
String s5 = in.readSharedString();
String s6 = in.readSharedString();
String s7 = in.readSharedString();
String s8 = in.readSharedString();
assertThat(s1, equalTo(test1));
assertThat(s2, equalTo(test1));
assertThat(s3, equalTo(test2));
assertThat(s4, equalTo(test3));
assertThat(s5, equalTo(test4));
assertThat(s6, equalTo(test4));
assertThat(s7, equalTo(test5));
assertThat(s8, equalTo(test6));
assertThat(s1, not(sameInstance(s2)));
assertThat(s5, sameInstance(s6));
}
}