diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchException.java b/core/src/main/java/org/elasticsearch/ElasticsearchException.java
index 93576c790c4..7830d521ff5 100644
--- a/core/src/main/java/org/elasticsearch/ElasticsearchException.java
+++ b/core/src/main/java/org/elasticsearch/ElasticsearchException.java
@@ -101,7 +101,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
public ElasticsearchException(StreamInput in) throws IOException {
super(in.readOptionalString(), in.readException());
readStackTrace(this, in);
- headers.putAll(in.readMapOfLists());
+ headers.putAll(in.readMapOfLists(StreamInput::readString, StreamInput::readString));
}
/**
@@ -196,7 +196,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
out.writeOptionalString(this.getMessage());
out.writeException(this.getCause());
writeStackTraces(this, out);
- out.writeMapOfLists(headers);
+ out.writeMapOfLists(headers, StreamOutput::writeString, StreamOutput::writeString);
}
public static ElasticsearchException readException(StreamInput input, int id) throws IOException {
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java
index 4c5f9757caa..794ed6f36fa 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java
+++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java
@@ -431,27 +431,35 @@ public abstract class StreamInput extends InputStream {
return map;
}
+ /**
+ * Read a {@link Map} of {@code K}-type keys to {@code V}-type {@link List}s.
+ *
+ * Map<String, List<String>> map = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
+ *
+ *
+ * @param keyReader The key reader
+ * @param valueReader The value reader
+ * @return Never {@code null}.
+ */
+ public Map> readMapOfLists(final Writeable.Reader keyReader, final Writeable.Reader valueReader)
+ throws IOException {
+ final int size = readVInt();
+ if (size == 0) {
+ return Collections.emptyMap();
+ }
+ final Map> map = new HashMap<>(size);
+ for (int i = 0; i < size; ++i) {
+ map.put(keyReader.read(this), readList(valueReader));
+ }
+ return map;
+ }
+
@Nullable
@SuppressWarnings("unchecked")
public Map readMap() throws IOException {
return (Map) readGenericValue();
}
- /**
- * Read a map of strings to string lists.
- */
- public Map> readMapOfLists() throws IOException {
- int size = readVInt();
- if (size == 0) {
- return Collections.emptyMap();
- }
- Map> map = new HashMap<>(size);
- for (int i = 0; i < size; ++i) {
- map.put(readString(), readList(StreamInput::readString));
- }
- return map;
- }
-
@SuppressWarnings({"unchecked"})
@Nullable
public Object readGenericValue() throws IOException {
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
index 24350936fa2..1176f54e88a 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
+++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
@@ -32,6 +32,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint;
+import org.elasticsearch.common.io.stream.Writeable.Writer;
import org.elasticsearch.common.text.Text;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableInstant;
@@ -413,25 +414,28 @@ public abstract class StreamOutput extends OutputStream {
}
/**
- * Writes a map of strings to string lists.
+ * Write a {@link Map} of {@code K}-type keys to {@code V}-type {@link List}s.
+ *
+ * Map<String, List<String>> map = ...;
+ * out.writeMapOfLists(map, StreamOutput::writeString, StreamOutput::writeString);
+ *
+ *
+ * @param keyWriter The key writer
+ * @param valueWriter The value writer
*/
- public void writeMapOfLists(Map> map) throws IOException {
+ public void writeMapOfLists(final Map> map, final Writer keyWriter, final Writer valueWriter)
+ throws IOException {
writeVInt(map.size());
- for (Map.Entry> entry : map.entrySet()) {
- writeString(entry.getKey());
+ for (final Map.Entry> entry : map.entrySet()) {
+ keyWriter.write(this, entry.getKey());
writeVInt(entry.getValue().size());
- for (String v : entry.getValue()) {
- writeString(v);
+ for (final V value : entry.getValue()) {
+ valueWriter.write(this, value);
}
}
}
- @FunctionalInterface
- interface Writer {
- void write(StreamOutput o, Object value) throws IOException;
- }
-
private static final Map, Writer> WRITERS;
static {
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java b/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java
index 16497533e29..30607f33759 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java
+++ b/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java
@@ -25,26 +25,69 @@ import java.io.IOException;
* Implementers can be written to a {@linkplain StreamOutput} and read from a {@linkplain StreamInput}. This allows them to be "thrown
* across the wire" using Elasticsearch's internal protocol. If the implementer also implements equals and hashCode then a copy made by
* serializing and deserializing must be equal and have the same hashCode. It isn't required that such a copy be entirely unchanged.
- *
+ *
* Prefer implementing this interface over implementing {@link Streamable} where possible. Lots of code depends on {@linkplain Streamable}
* so this isn't always possible.
*/
public interface Writeable {
+
/**
* Write this into the {@linkplain StreamOutput}.
*/
- void writeTo(StreamOutput out) throws IOException;
+ void writeTo(final StreamOutput out) throws IOException;
+
+ /**
+ * Reference to a method that can write some object to a {@link StreamOutput}.
+ *
+ * By convention this is a method from {@link StreamOutput} itself (e.g., {@link StreamOutput#writeString}). If the value can be
+ * {@code null}, then the "optional" variant of methods should be used!
+ *
+ * Most classes should implement {@link Writeable} and the {@link Writeable#writeTo(StreamOutput)} method should use
+ * {@link StreamOutput} methods directly or this indirectly:
+ *
+ * public void writeTo(StreamOutput out) throws IOException {
+ * out.writeVInt(someValue);
+ * out.writeMapOfLists(someMap, StreamOutput::writeString, StreamOutput::writeString);
+ * }
+ *
+ */
+ @FunctionalInterface
+ interface Writer {
+
+ /**
+ * Write {@code V}-type {@code value} to the {@code out}put stream.
+ *
+ * @param out Output to write the {@code value} too
+ * @param value The value to add
+ */
+ void write(final StreamOutput out, final V value) throws IOException;
+
+ }
/**
* Reference to a method that can read some object from a stream. By convention this is a constructor that takes
* {@linkplain StreamInput} as an argument for most classes and a static method for things like enums. Returning null from one of these
* is always wrong - for that we use methods like {@link StreamInput#readOptionalWriteable(Reader)}.
+ *
+ * As most classes will implement this via a constructor (or a static method in the case of enumerations), it's something that should
+ * look like:
+ *
+ * public MyClass(final StreamInput in) throws IOException {
+ * this.someValue = in.readVInt();
+ * this.someMap = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
+ * }
+ *
*/
@FunctionalInterface
- interface Reader {
+ interface Reader {
+
/**
- * Read R from a stream.
+ * Read {@code V}-type value from a stream.
+ *
+ * @param in Input to read the value from
*/
- R read(StreamInput in) throws IOException;
+ V read(final StreamInput in) throws IOException;
+
}
+
}
diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java
index bf1ef6a563f..8c04c24ec5b 100644
--- a/core/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java
+++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java
@@ -269,7 +269,7 @@ public final class ThreadContext implements Closeable, Writeable {
}
this.requestHeaders = requestHeaders;
- this.responseHeaders = in.readMapOfLists();
+ this.responseHeaders = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
this.transientHeaders = Collections.emptyMap();
}
@@ -370,7 +370,7 @@ public final class ThreadContext implements Closeable, Writeable {
out.writeString(entry.getValue());
}
- out.writeMapOfLists(responseHeaders);
+ out.writeMapOfLists(responseHeaders, StreamOutput::writeString, StreamOutput::writeString);
}
}
diff --git a/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java b/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java
index 9e5beabd9b7..52676444d3c 100644
--- a/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java
+++ b/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java
@@ -30,7 +30,6 @@ import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -42,7 +41,6 @@ import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.startsWith;
/**
* Tests for {@link BytesStreamOutput} paging behaviour.
@@ -462,11 +460,11 @@ public class BytesStreamsTests extends ESTestCase {
}
final BytesStreamOutput out = new BytesStreamOutput();
- out.writeMapOfLists(expected);
+ out.writeMapOfLists(expected, StreamOutput::writeString, StreamOutput::writeString);
final StreamInput in = StreamInput.wrap(BytesReference.toBytes(out.bytes()));
- final Map> loaded = in.readMapOfLists();
+ final Map> loaded = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
assertThat(loaded.size(), equalTo(expected.size()));