introduce read/writeSharedString while streaming
currently, we treat all strings as shared (either by full equality or identity equality), while almost all times we know if they should be serialized as shared or not. Add an explicitly write/readSharedString, and use it where applicable, and all other write/readString will not treat them as shared relates to #3322
This commit is contained in:
parent
74a7c46b0e
commit
b12acbcf9e
|
@ -32,8 +32,6 @@ import java.io.IOException;
|
|||
/**
|
||||
* Represents a single item response for an action executed as part of the bulk API. Holds the index/type/id
|
||||
* of the relevant action, and if it has failed or not (with the failure message incase it failed).
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class BulkItemResponse implements Streamable {
|
||||
|
||||
|
@ -148,8 +146,7 @@ public class BulkItemResponse implements Streamable {
|
|||
return ((IndexResponse) response).getType();
|
||||
} else if (response instanceof DeleteResponse) {
|
||||
return ((DeleteResponse) response).getType();
|
||||
}
|
||||
else if (response instanceof UpdateResponse) {
|
||||
} else if (response instanceof UpdateResponse) {
|
||||
return ((UpdateResponse) response).getType();
|
||||
}
|
||||
return null;
|
||||
|
@ -230,7 +227,7 @@ public class BulkItemResponse implements Streamable {
|
|||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
id = in.readVInt();
|
||||
opType = in.readString();
|
||||
opType = in.readSharedString();
|
||||
|
||||
byte type = in.readByte();
|
||||
if (type == 0) {
|
||||
|
@ -245,14 +242,15 @@ public class BulkItemResponse implements Streamable {
|
|||
}
|
||||
|
||||
if (in.readBoolean()) {
|
||||
failure = new Failure(in.readString(), in.readString(), in.readString(), in.readString());
|
||||
failure = new Failure(in.readSharedString(), in.readSharedString(), in.readString(), in.readString());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(id);
|
||||
out.writeString(opType);
|
||||
out.writeSharedString(opType);
|
||||
|
||||
if (response == null) {
|
||||
out.writeByte((byte) 2);
|
||||
} else {
|
||||
|
@ -269,8 +267,8 @@ public class BulkItemResponse implements Streamable {
|
|||
out.writeBoolean(false);
|
||||
} else {
|
||||
out.writeBoolean(true);
|
||||
out.writeString(failure.getIndex());
|
||||
out.writeString(failure.getType());
|
||||
out.writeSharedString(failure.getIndex());
|
||||
out.writeSharedString(failure.getType());
|
||||
out.writeString(failure.getId());
|
||||
out.writeString(failure.getMessage());
|
||||
}
|
||||
|
|
|
@ -201,7 +201,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest<DeleteReques
|
|||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
type = in.readString();
|
||||
type = in.readSharedString();
|
||||
id = in.readString();
|
||||
routing = in.readOptionalString();
|
||||
refresh = in.readBoolean();
|
||||
|
@ -212,7 +212,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest<DeleteReques
|
|||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(type);
|
||||
out.writeSharedString(type);
|
||||
out.writeString(id);
|
||||
out.writeOptionalString(routing());
|
||||
out.writeBoolean(refresh);
|
||||
|
|
|
@ -89,9 +89,9 @@ public class DeleteResponse extends ActionResponse {
|
|||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
index = in.readString();
|
||||
index = in.readSharedString();
|
||||
type = in.readSharedString();
|
||||
id = in.readString();
|
||||
type = in.readString();
|
||||
version = in.readLong();
|
||||
notFound = in.readBoolean();
|
||||
}
|
||||
|
@ -99,9 +99,9 @@ public class DeleteResponse extends ActionResponse {
|
|||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(index);
|
||||
out.writeSharedString(index);
|
||||
out.writeSharedString(type);
|
||||
out.writeString(id);
|
||||
out.writeString(type);
|
||||
out.writeLong(version);
|
||||
out.writeBoolean(notFound);
|
||||
}
|
||||
|
|
|
@ -200,7 +200,7 @@ public class GetRequest extends SingleShardOperationRequest<GetRequest> {
|
|||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
type = in.readString();
|
||||
type = in.readSharedString();
|
||||
id = in.readString();
|
||||
routing = in.readOptionalString();
|
||||
preference = in.readOptionalString();
|
||||
|
@ -223,7 +223,7 @@ public class GetRequest extends SingleShardOperationRequest<GetRequest> {
|
|||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(type);
|
||||
out.writeSharedString(type);
|
||||
out.writeString(id);
|
||||
out.writeOptionalString(routing);
|
||||
out.writeOptionalString(preference);
|
||||
|
|
|
@ -118,14 +118,10 @@ public class MultiGetRequest extends ActionRequest<MultiGetRequest> {
|
|||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
index = in.readString();
|
||||
if (in.readBoolean()) {
|
||||
type = in.readString();
|
||||
}
|
||||
index = in.readSharedString();
|
||||
type = in.readOptionalSharedString();
|
||||
id = in.readString();
|
||||
if (in.readBoolean()) {
|
||||
routing = in.readString();
|
||||
}
|
||||
routing = in.readOptionalString();
|
||||
int size = in.readVInt();
|
||||
if (size > 0) {
|
||||
fields = new String[size];
|
||||
|
@ -137,20 +133,10 @@ public class MultiGetRequest extends ActionRequest<MultiGetRequest> {
|
|||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(index);
|
||||
if (type == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
out.writeBoolean(true);
|
||||
out.writeString(type);
|
||||
}
|
||||
out.writeSharedString(index);
|
||||
out.writeOptionalSharedString(type);
|
||||
out.writeString(id);
|
||||
if (routing == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
out.writeBoolean(true);
|
||||
out.writeString(routing);
|
||||
}
|
||||
out.writeOptionalString(routing);
|
||||
if (fields == null) {
|
||||
out.writeVInt(0);
|
||||
} else {
|
||||
|
|
|
@ -108,7 +108,7 @@ public class MultiGetShardRequest extends SingleShardOperationRequest<MultiGetSh
|
|||
for (int i = 0; i < size; i++) {
|
||||
locations.add(in.readVInt());
|
||||
if (in.readBoolean()) {
|
||||
types.add(in.readString());
|
||||
types.add(in.readSharedString());
|
||||
} else {
|
||||
types.add(null);
|
||||
}
|
||||
|
@ -145,7 +145,7 @@ public class MultiGetShardRequest extends SingleShardOperationRequest<MultiGetSh
|
|||
out.writeBoolean(false);
|
||||
} else {
|
||||
out.writeBoolean(true);
|
||||
out.writeString(types.get(i));
|
||||
out.writeSharedString(types.get(i));
|
||||
}
|
||||
out.writeString(ids.get(i));
|
||||
if (fields.get(i) == null) {
|
||||
|
|
|
@ -608,7 +608,7 @@ public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest>
|
|||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
type = in.readString();
|
||||
type = in.readSharedString();
|
||||
id = in.readOptionalString();
|
||||
routing = in.readOptionalString();
|
||||
parent = in.readOptionalString();
|
||||
|
@ -626,7 +626,7 @@ public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest>
|
|||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(type);
|
||||
out.writeSharedString(type);
|
||||
out.writeOptionalString(id);
|
||||
out.writeOptionalString(routing);
|
||||
out.writeOptionalString(parent);
|
||||
|
|
|
@ -89,9 +89,9 @@ public class IndexResponse extends ActionResponse {
|
|||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
index = in.readString();
|
||||
index = in.readSharedString();
|
||||
type = in.readSharedString();
|
||||
id = in.readString();
|
||||
type = in.readString();
|
||||
version = in.readLong();
|
||||
created = in.readBoolean();
|
||||
}
|
||||
|
@ -99,9 +99,9 @@ public class IndexResponse extends ActionResponse {
|
|||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(index);
|
||||
out.writeSharedString(index);
|
||||
out.writeSharedString(type);
|
||||
out.writeString(id);
|
||||
out.writeString(type);
|
||||
out.writeLong(version);
|
||||
out.writeBoolean(created);
|
||||
}
|
||||
|
|
|
@ -161,7 +161,7 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
|
|||
replicationType = ReplicationType.fromId(in.readByte());
|
||||
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
|
||||
timeout = TimeValue.readTimeValue(in);
|
||||
index = in.readString();
|
||||
index = in.readSharedString();
|
||||
// no need to serialize threaded* parameters, since they only matter locally
|
||||
}
|
||||
|
||||
|
@ -171,7 +171,7 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
|
|||
out.writeByte(replicationType.id());
|
||||
out.writeByte(consistencyLevel.id());
|
||||
timeout.writeTo(out);
|
||||
out.writeString(index);
|
||||
out.writeSharedString(index);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -580,7 +580,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
|
|||
super.readFrom(in);
|
||||
replicationType = ReplicationType.fromId(in.readByte());
|
||||
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
|
||||
type = in.readString();
|
||||
type = in.readSharedString();
|
||||
id = in.readString();
|
||||
routing = in.readOptionalString();
|
||||
script = in.readOptionalString();
|
||||
|
@ -615,7 +615,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
|
|||
super.writeTo(out);
|
||||
out.writeByte(replicationType.id());
|
||||
out.writeByte(consistencyLevel.id());
|
||||
out.writeString(type);
|
||||
out.writeSharedString(type);
|
||||
out.writeString(id);
|
||||
out.writeOptionalString(routing);
|
||||
out.writeOptionalString(script);
|
||||
|
|
|
@ -96,9 +96,9 @@ public class UpdateResponse extends ActionResponse {
|
|||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
index = in.readString();
|
||||
index = in.readSharedString();
|
||||
type = in.readSharedString();
|
||||
id = in.readString();
|
||||
type = in.readString();
|
||||
version = in.readLong();
|
||||
created = in.readBoolean();
|
||||
if (in.readBoolean()) {
|
||||
|
@ -109,9 +109,9 @@ public class UpdateResponse extends ActionResponse {
|
|||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(index);
|
||||
out.writeSharedString(index);
|
||||
out.writeSharedString(type);
|
||||
out.writeString(id);
|
||||
out.writeString(type);
|
||||
out.writeLong(version);
|
||||
out.writeBoolean(created);
|
||||
if (getResult == null) {
|
||||
|
|
|
@ -103,6 +103,11 @@ public abstract class AdapterStreamInput extends StreamInput {
|
|||
return in.readString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String readSharedString() throws IOException {
|
||||
return in.readSharedString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Text readText() throws IOException {
|
||||
return in.readText();
|
||||
|
|
|
@ -131,6 +131,11 @@ public class AdapterStreamOutput extends StreamOutput {
|
|||
out.writeString(str);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeSharedString(String str) throws IOException {
|
||||
out.writeSharedString(str);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeText(Text text) throws IOException {
|
||||
out.writeText(text);
|
||||
|
|
|
@ -30,8 +30,6 @@ import java.io.IOException;
|
|||
public class HandlesStreamInput extends AdapterStreamInput {
|
||||
|
||||
private final TIntObjectHashMap<String> handles = new TIntObjectHashMap<String>();
|
||||
private final TIntObjectHashMap<String> identityHandles = new TIntObjectHashMap<String>();
|
||||
|
||||
private final TIntObjectHashMap<Text> handlesText = new TIntObjectHashMap<Text>();
|
||||
|
||||
HandlesStreamInput() {
|
||||
|
@ -43,7 +41,7 @@ public class HandlesStreamInput extends AdapterStreamInput {
|
|||
}
|
||||
|
||||
@Override
|
||||
public String readString() throws IOException {
|
||||
public String readSharedString() throws IOException {
|
||||
byte b = in.readByte();
|
||||
if (b == 0) {
|
||||
// full string with handle
|
||||
|
@ -53,19 +51,16 @@ public class HandlesStreamInput extends AdapterStreamInput {
|
|||
return s;
|
||||
} else if (b == 1) {
|
||||
return handles.get(in.readVInt());
|
||||
} else if (b == 2) {
|
||||
// full string with handle
|
||||
int handle = in.readVInt();
|
||||
String s = in.readString();
|
||||
identityHandles.put(handle, s);
|
||||
return s;
|
||||
} else if (b == 3) {
|
||||
return identityHandles.get(in.readVInt());
|
||||
} else {
|
||||
throw new IOException("Expected handle header, got [" + b + "]");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String readString() throws IOException {
|
||||
return in.readString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Text readSharedText() throws IOException {
|
||||
byte b = in.readByte();
|
||||
|
@ -86,21 +81,15 @@ public class HandlesStreamInput extends AdapterStreamInput {
|
|||
@Override
|
||||
public void reset() throws IOException {
|
||||
super.reset();
|
||||
handles.clear();
|
||||
identityHandles.clear();
|
||||
handlesText.clear();
|
||||
cleanHandles();
|
||||
}
|
||||
|
||||
public void reset(StreamInput in) {
|
||||
super.reset(in);
|
||||
handles.clear();
|
||||
identityHandles.clear();
|
||||
handlesText.clear();
|
||||
cleanHandles();
|
||||
}
|
||||
|
||||
public void cleanHandles() {
|
||||
handles.clear();
|
||||
identityHandles.clear();
|
||||
handlesText.clear();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,69 +24,41 @@ import gnu.trove.map.hash.TObjectIntHashMap;
|
|||
import org.elasticsearch.common.text.Text;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class HandlesStreamOutput extends AdapterStreamOutput {
|
||||
|
||||
private static final int DEFAULT_IDENTITY_THRESHOLD = 50;
|
||||
|
||||
// a threshold above which strings will use identity check
|
||||
private final int identityThreshold;
|
||||
|
||||
private final TObjectIntHashMap<String> handles = new TObjectIntHashMap<String>(Constants.DEFAULT_CAPACITY, Constants.DEFAULT_LOAD_FACTOR, -1);
|
||||
private final HandleTable identityHandles = new HandleTable(10, (float) 3.00);
|
||||
|
||||
private final TObjectIntHashMap<Text> handlesText = new TObjectIntHashMap<Text>(Constants.DEFAULT_CAPACITY, Constants.DEFAULT_LOAD_FACTOR, -1);
|
||||
|
||||
public HandlesStreamOutput(StreamOutput out) {
|
||||
this(out, DEFAULT_IDENTITY_THRESHOLD);
|
||||
}
|
||||
|
||||
public HandlesStreamOutput(StreamOutput out, int identityThreshold) {
|
||||
super(out);
|
||||
this.identityThreshold = identityThreshold;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeString(String s) throws IOException {
|
||||
if (s.length() < identityThreshold) {
|
||||
int handle = handles.get(s);
|
||||
public void writeSharedString(String str) throws IOException {
|
||||
int handle = handles.get(str);
|
||||
if (handle == -1) {
|
||||
handle = handles.size();
|
||||
handles.put(s, handle);
|
||||
handles.put(str, handle);
|
||||
out.writeByte((byte) 0);
|
||||
out.writeVInt(handle);
|
||||
out.writeString(s);
|
||||
out.writeString(str);
|
||||
} else {
|
||||
out.writeByte((byte) 1);
|
||||
out.writeVInt(handle);
|
||||
}
|
||||
} else {
|
||||
int handle = identityHandles.lookup(s);
|
||||
if (handle == -1) {
|
||||
handle = identityHandles.assign(s);
|
||||
out.writeByte((byte) 2);
|
||||
out.writeVInt(handle);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeString(String s) throws IOException {
|
||||
out.writeString(s);
|
||||
} else {
|
||||
out.writeByte((byte) 3);
|
||||
out.writeVInt(handle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeSharedText(Text text) throws IOException {
|
||||
int length;
|
||||
if (text.hasBytes()) {
|
||||
length = text.bytes().length();
|
||||
} else {
|
||||
length = text.string().length();
|
||||
}
|
||||
if (length < identityThreshold) {
|
||||
int handle = handlesText.get(text);
|
||||
if (handle == -1) {
|
||||
handle = handlesText.size();
|
||||
|
@ -98,17 +70,11 @@ public class HandlesStreamOutput extends AdapterStreamOutput {
|
|||
out.writeByte((byte) 1);
|
||||
out.writeVInt(handle);
|
||||
}
|
||||
} else {
|
||||
out.writeByte((byte) 2);
|
||||
out.writeText(text);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() throws IOException {
|
||||
handles.clear();
|
||||
identityHandles.clear();
|
||||
handlesText.clear();
|
||||
clear();
|
||||
if (out != null) {
|
||||
out.reset();
|
||||
}
|
||||
|
@ -116,132 +82,6 @@ public class HandlesStreamOutput extends AdapterStreamOutput {
|
|||
|
||||
public void clear() {
|
||||
handles.clear();
|
||||
identityHandles.clear();
|
||||
handlesText.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Lightweight identity hash table which maps objects to integer handles,
|
||||
* assigned in ascending order.
|
||||
*/
|
||||
private static class HandleTable {
|
||||
|
||||
/* number of mappings in table/next available handle */
|
||||
private int size;
|
||||
/* size threshold determining when to expand hash spine */
|
||||
private int threshold;
|
||||
/* factor for computing size threshold */
|
||||
private final float loadFactor;
|
||||
/* maps hash value -> candidate handle value */
|
||||
private int[] spine;
|
||||
/* maps handle value -> next candidate handle value */
|
||||
private int[] next;
|
||||
/* maps handle value -> associated object */
|
||||
private Object[] objs;
|
||||
|
||||
/**
|
||||
* Creates new HandleTable with given capacity and load factor.
|
||||
*/
|
||||
HandleTable(int initialCapacity, float loadFactor) {
|
||||
this.loadFactor = loadFactor;
|
||||
spine = new int[initialCapacity];
|
||||
next = new int[initialCapacity];
|
||||
objs = new Object[initialCapacity];
|
||||
threshold = (int) (initialCapacity * loadFactor);
|
||||
clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Assigns next available handle to given object, and returns handle
|
||||
* value. Handles are assigned in ascending order starting at 0.
|
||||
*/
|
||||
int assign(Object obj) {
|
||||
if (size >= next.length) {
|
||||
growEntries();
|
||||
}
|
||||
if (size >= threshold) {
|
||||
growSpine();
|
||||
}
|
||||
insert(obj, size);
|
||||
return size++;
|
||||
}
|
||||
|
||||
/**
|
||||
* Looks up and returns handle associated with given object, or -1 if
|
||||
* no mapping found.
|
||||
*/
|
||||
int lookup(Object obj) {
|
||||
if (size == 0) {
|
||||
return -1;
|
||||
}
|
||||
int index = hash(obj) % spine.length;
|
||||
for (int i = spine[index]; i >= 0; i = next[i]) {
|
||||
if (objs[i] == obj) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resets table to its initial (empty) state.
|
||||
*/
|
||||
void clear() {
|
||||
Arrays.fill(spine, -1);
|
||||
Arrays.fill(objs, 0, size, null);
|
||||
size = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of mappings currently in table.
|
||||
*/
|
||||
int size() {
|
||||
return size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts mapping object -> handle mapping into table. Assumes table
|
||||
* is large enough to accommodate new mapping.
|
||||
*/
|
||||
private void insert(Object obj, int handle) {
|
||||
int index = hash(obj) % spine.length;
|
||||
objs[handle] = obj;
|
||||
next[handle] = spine[index];
|
||||
spine[index] = handle;
|
||||
}
|
||||
|
||||
/**
|
||||
* Expands the hash "spine" -- equivalent to increasing the number of
|
||||
* buckets in a conventional hash table.
|
||||
*/
|
||||
private void growSpine() {
|
||||
spine = new int[(spine.length << 1) + 1];
|
||||
threshold = (int) (spine.length * loadFactor);
|
||||
Arrays.fill(spine, -1);
|
||||
for (int i = 0; i < size; i++) {
|
||||
insert(objs[i], i);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Increases hash table capacity by lengthening entry arrays.
|
||||
*/
|
||||
private void growEntries() {
|
||||
int newLength = (next.length << 1) + 1;
|
||||
int[] newNext = new int[newLength];
|
||||
System.arraycopy(next, 0, newNext, 0, size);
|
||||
next = newNext;
|
||||
|
||||
Object[] newObjs = new Object[newLength];
|
||||
System.arraycopy(objs, 0, newObjs, 0, size);
|
||||
objs = newObjs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns hash value for given object.
|
||||
*/
|
||||
private int hash(Object obj) {
|
||||
return System.identityHashCode(obj) & 0x7FFFFFFF;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -245,6 +245,14 @@ public abstract class StreamInput extends InputStream {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public String readOptionalSharedString() throws IOException {
|
||||
if (readBoolean()) {
|
||||
return readSharedString();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public String readString() throws IOException {
|
||||
int charCount = readVInt();
|
||||
char[] chars = CachedStreamInput.getCharArray(charCount);
|
||||
|
@ -274,6 +282,10 @@ public abstract class StreamInput extends InputStream {
|
|||
return new String(chars, 0, charCount);
|
||||
}
|
||||
|
||||
public String readSharedString() throws IOException {
|
||||
return readString();
|
||||
}
|
||||
|
||||
|
||||
public final float readFloat() throws IOException {
|
||||
return Float.intBitsToFloat(readInt());
|
||||
|
@ -384,14 +396,14 @@ public abstract class StreamInput extends InputStream {
|
|||
int size9 = readVInt();
|
||||
Map map9 = new LinkedHashMap(size9);
|
||||
for (int i = 0; i < size9; i++) {
|
||||
map9.put(readString(), readGenericValue());
|
||||
map9.put(readSharedString(), readGenericValue());
|
||||
}
|
||||
return map9;
|
||||
case 10:
|
||||
int size10 = readVInt();
|
||||
Map map10 = new HashMap(size10);
|
||||
for (int i = 0; i < size10; i++) {
|
||||
map10.put(readString(), readGenericValue());
|
||||
map10.put(readSharedString(), readGenericValue());
|
||||
}
|
||||
return map10;
|
||||
case 11:
|
||||
|
|
|
@ -176,6 +176,15 @@ public abstract class StreamOutput extends OutputStream {
|
|||
}
|
||||
}
|
||||
|
||||
public void writeOptionalSharedString(@Nullable String str) throws IOException {
|
||||
if (str == null) {
|
||||
writeBoolean(false);
|
||||
} else {
|
||||
writeBoolean(true);
|
||||
writeSharedString(str);
|
||||
}
|
||||
}
|
||||
|
||||
public void writeOptionalText(@Nullable Text text) throws IOException {
|
||||
if (text == null) {
|
||||
writeInt(-1);
|
||||
|
@ -234,6 +243,10 @@ public abstract class StreamOutput extends OutputStream {
|
|||
}
|
||||
}
|
||||
|
||||
public void writeSharedString(String str) throws IOException {
|
||||
writeString(str);
|
||||
}
|
||||
|
||||
public void writeFloat(float v) throws IOException {
|
||||
writeInt(Float.floatToIntBits(v));
|
||||
}
|
||||
|
@ -360,7 +373,7 @@ public abstract class StreamOutput extends OutputStream {
|
|||
Map<String, Object> map = (Map<String, Object>) value;
|
||||
writeVInt(map.size());
|
||||
for (Map.Entry<String, Object> entry : map.entrySet()) {
|
||||
writeString(entry.getKey());
|
||||
writeSharedString(entry.getKey());
|
||||
writeGenericValue(entry.getValue());
|
||||
}
|
||||
} else if (type == Byte.class) {
|
||||
|
|
|
@ -46,21 +46,13 @@ import static org.elasticsearch.index.get.GetField.readGetField;
|
|||
public class GetResult implements Streamable, Iterable<GetField>, ToXContent {
|
||||
|
||||
private String index;
|
||||
|
||||
private String type;
|
||||
|
||||
private String id;
|
||||
|
||||
private long version;
|
||||
|
||||
private boolean exists;
|
||||
|
||||
private Map<String, GetField> fields;
|
||||
|
||||
private Map<String, Object> sourceAsMap;
|
||||
|
||||
private BytesReference source;
|
||||
|
||||
private byte[] sourceAsBytes;
|
||||
|
||||
GetResult() {
|
||||
|
@ -275,8 +267,8 @@ public class GetResult implements Streamable, Iterable<GetField>, ToXContent {
|
|||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
index = in.readString();
|
||||
type = in.readOptionalString();
|
||||
index = in.readSharedString();
|
||||
type = in.readOptionalSharedString();
|
||||
id = in.readString();
|
||||
version = in.readLong();
|
||||
exists = in.readBoolean();
|
||||
|
@ -300,8 +292,8 @@ public class GetResult implements Streamable, Iterable<GetField>, ToXContent {
|
|||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(index);
|
||||
out.writeOptionalString(type);
|
||||
out.writeSharedString(index);
|
||||
out.writeOptionalSharedString(type);
|
||||
out.writeString(id);
|
||||
out.writeLong(version);
|
||||
out.writeBoolean(exists);
|
||||
|
|
|
@ -86,7 +86,7 @@ public class HighlightField implements Streamable {
|
|||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
name = in.readString();
|
||||
name = in.readSharedString();
|
||||
if (in.readBoolean()) {
|
||||
int size = in.readVInt();
|
||||
if (size == 0) {
|
||||
|
@ -102,7 +102,7 @@ public class HighlightField implements Streamable {
|
|||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(name);
|
||||
out.writeSharedString(name);
|
||||
if (fragments == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
|
|
|
@ -91,7 +91,7 @@ public class InternalSearchHitField implements SearchHitField {
|
|||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
name = in.readString();
|
||||
name = in.readSharedString();
|
||||
int size = in.readVInt();
|
||||
values = new ArrayList<Object>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
|
@ -101,7 +101,7 @@ public class InternalSearchHitField implements SearchHitField {
|
|||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(name);
|
||||
out.writeSharedString(name);
|
||||
out.writeVInt(values.size());
|
||||
for (Object value : values) {
|
||||
out.writeGenericValue(value);
|
||||
|
|
|
@ -19,27 +19,18 @@
|
|||
|
||||
package org.elasticsearch.test.unit.common.io;
|
||||
|
||||
import static org.elasticsearch.common.io.Streams.copy;
|
||||
import static org.elasticsearch.common.io.Streams.copyToByteArray;
|
||||
import static org.elasticsearch.common.io.Streams.copyToString;
|
||||
import com.google.common.base.Charsets;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.Arrays;
|
||||
|
||||
import static org.elasticsearch.common.io.Streams.*;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
import java.io.StringWriter;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link org.elasticsearch.common.io.Streams}.
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class StreamsTests {
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.elasticsearch.common.io.stream.HandlesStreamOutput;
|
|||
import org.testng.annotations.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -35,24 +35,46 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
public class HandlesStreamsTests {
|
||||
|
||||
@Test
|
||||
public void testSharedUTFHandles() throws Exception {
|
||||
BytesStreamOutput bytesOut = new BytesStreamOutput();
|
||||
HandlesStreamOutput out = new HandlesStreamOutput(bytesOut, 5);
|
||||
String lowerThresholdValue = "test";
|
||||
String higherThresholdValue = "something that is higher than 5";
|
||||
out.writeString(lowerThresholdValue);
|
||||
out.writeString(higherThresholdValue);
|
||||
out.writeInt(1);
|
||||
out.writeString("else");
|
||||
out.writeString(higherThresholdValue);
|
||||
out.writeString(lowerThresholdValue);
|
||||
public void testSharedStringHandles() throws Exception {
|
||||
String test1 = "test1";
|
||||
String test2 = "test2";
|
||||
String test3 = "test3";
|
||||
String test4 = "test4";
|
||||
String test5 = "test5";
|
||||
String test6 = "test6";
|
||||
|
||||
HandlesStreamInput in = new HandlesStreamInput(new BytesStreamInput(bytesOut.bytes().toBytes(), false));
|
||||
assertThat(in.readString(), equalTo(lowerThresholdValue));
|
||||
assertThat(in.readString(), equalTo(higherThresholdValue));
|
||||
assertThat(in.readInt(), equalTo(1));
|
||||
assertThat(in.readString(), equalTo("else"));
|
||||
assertThat(in.readString(), equalTo(higherThresholdValue));
|
||||
assertThat(in.readString(), equalTo(lowerThresholdValue));
|
||||
BytesStreamOutput bout = new BytesStreamOutput();
|
||||
HandlesStreamOutput out = new HandlesStreamOutput(bout);
|
||||
out.writeString(test1);
|
||||
out.writeString(test1);
|
||||
out.writeString(test2);
|
||||
out.writeString(test3);
|
||||
out.writeSharedString(test4);
|
||||
out.writeSharedString(test4);
|
||||
out.writeSharedString(test5);
|
||||
out.writeSharedString(test6);
|
||||
|
||||
BytesStreamInput bin = new BytesStreamInput(bout.bytes());
|
||||
HandlesStreamInput in = new HandlesStreamInput(bin);
|
||||
String s1 = in.readString();
|
||||
String s2 = in.readString();
|
||||
String s3 = in.readString();
|
||||
String s4 = in.readString();
|
||||
String s5 = in.readSharedString();
|
||||
String s6 = in.readSharedString();
|
||||
String s7 = in.readSharedString();
|
||||
String s8 = in.readSharedString();
|
||||
|
||||
assertThat(s1, equalTo(test1));
|
||||
assertThat(s2, equalTo(test1));
|
||||
assertThat(s3, equalTo(test2));
|
||||
assertThat(s4, equalTo(test3));
|
||||
assertThat(s5, equalTo(test4));
|
||||
assertThat(s6, equalTo(test4));
|
||||
assertThat(s7, equalTo(test5));
|
||||
assertThat(s8, equalTo(test6));
|
||||
|
||||
assertThat(s1, not(sameInstance(s2)));
|
||||
assertThat(s5, sameInstance(s6));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue