diff --git a/src/main/java/org/elasticsearch/cluster/ClusterState.java b/src/main/java/org/elasticsearch/cluster/ClusterState.java index fb17bc55fff..69eecccc10c 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -233,7 +233,7 @@ public class ClusterState { } public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throws IOException { - return readFrom(new BytesStreamInput(data), localNode); + return readFrom(new BytesStreamInput(data, false), localNode); } public static void writeTo(ClusterState state, StreamOutput out) throws IOException { diff --git a/src/main/java/org/elasticsearch/common/io/stream/BytesStreamInput.java b/src/main/java/org/elasticsearch/common/io/stream/BytesStreamInput.java index 19e9574c904..7177ea36991 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/BytesStreamInput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/BytesStreamInput.java @@ -35,18 +35,24 @@ public class BytesStreamInput extends StreamInput { protected int count; - public BytesStreamInput(byte buf[]) { - this(buf, 0, buf.length); + private final boolean unsafe; + + public BytesStreamInput(byte buf[], boolean unsafe) { + this(buf, 0, buf.length, unsafe); } - public BytesStreamInput(byte buf[], int offset, int length) { + public BytesStreamInput(byte buf[], int offset, int length, boolean unsafe) { this.buf = buf; this.pos = offset; this.count = Math.min(offset + length, buf.length); + this.unsafe = unsafe; } @Override public BytesHolder readBytesReference() throws IOException { + if (unsafe) { + return readBytesHolder(); + } int size = readVInt(); BytesHolder bytes = new BytesHolder(buf, pos, size); pos += size; diff --git a/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java b/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java index 75b70983afd..e170f66c6a3 100644 --- a/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java +++ b/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java @@ -40,7 +40,7 @@ public class XContentHelper { public static XContentParser createParser(byte[] data, int offset, int length) throws IOException { if (LZF.isCompressed(data, offset, length)) { - BytesStreamInput siBytes = new BytesStreamInput(data, offset, length); + BytesStreamInput siBytes = new BytesStreamInput(data, offset, length, false); LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); XContentType contentType = XContentFactory.xContentType(siLzf); siLzf.resetToBufferStart(); @@ -55,7 +55,7 @@ public class XContentHelper { XContentParser parser; XContentType contentType; if (LZF.isCompressed(data, offset, length)) { - BytesStreamInput siBytes = new BytesStreamInput(data, offset, length); + BytesStreamInput siBytes = new BytesStreamInput(data, offset, length, false); LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); contentType = XContentFactory.xContentType(siLzf); siLzf.resetToBufferStart(); diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java index 9c042216cf4..2693049d7b3 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java @@ -396,7 +396,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem } } if (internal) { - StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset() + INTERNAL_HEADER.length, datagramPacketReceive.getLength())); + StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset() + INTERNAL_HEADER.length, datagramPacketReceive.getLength(), true)); Version version = Version.readVersion(input); id = input.readInt(); clusterName = ClusterName.readClusterName(input); diff --git a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index db1ef3f1dd3..51970a98886 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -95,7 +95,7 @@ public class PublishClusterStateAction extends AbstractComponent { } } - private class PublishClusterStateRequest implements Streamable { + class PublishClusterStateRequest implements Streamable { private byte[] clusterStateInBytes; @@ -130,7 +130,7 @@ public class PublishClusterStateAction extends AbstractComponent { @Override public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception { - StreamInput in = CachedStreamInput.cachedHandlesLzf(new BytesStreamInput(request.clusterStateInBytes)); + StreamInput in = CachedStreamInput.cachedHandlesLzf(new BytesStreamInput(request.clusterStateInBytes, false)); ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); listener.onNewClusterState(clusterState); channel.sendResponse(VoidStreamable.INSTANCE); diff --git a/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java b/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java index 23463e7ccf6..07edbca12e8 100644 --- a/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java +++ b/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java @@ -207,7 +207,7 @@ public abstract class BlobStoreGateway extends SharedStorageGateway { XContentParser parser = null; try { if (LZF.isCompressed(data)) { - BytesStreamInput siBytes = new BytesStreamInput(data); + BytesStreamInput siBytes = new BytesStreamInput(data, false); LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf); } else { diff --git a/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java index f7fc9cf2d73..1fae23024c0 100644 --- a/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java +++ b/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -374,7 +374,7 @@ public class LocalGateway extends AbstractLifecycleComponent implements XContentParser parser = null; try { if (LZF.isCompressed(data)) { - BytesStreamInput siBytes = new BytesStreamInput(data); + BytesStreamInput siBytes = new BytesStreamInput(data, false); LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf); } else { @@ -392,7 +392,7 @@ public class LocalGateway extends AbstractLifecycleComponent implements XContentParser parser = null; try { if (LZF.isCompressed(data)) { - BytesStreamInput siBytes = new BytesStreamInput(data); + BytesStreamInput siBytes = new BytesStreamInput(data, false); LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf); } else { diff --git a/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java index b6de2744636..45fcb2af431 100644 --- a/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java +++ b/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java @@ -462,7 +462,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo if (bos.size() < 4) { return; } - BytesStreamInput si = new BytesStreamInput(bos.underlyingBytes(), 0, bos.size()); + BytesStreamInput si = new BytesStreamInput(bos.underlyingBytes(), 0, bos.size(), false); int position; while (true) { try { diff --git a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index ecca0b93068..24e70962431 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -596,13 +596,13 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I switch (operation.opType()) { case CREATE: Translog.Create create = (Translog.Create) operation; - engine.create(prepareCreate(source(create.source(), create.sourceOffset(), create.sourceLength()).type(create.type()).id(create.id()) + engine.create(prepareCreate(source(create.source().bytes(), create.source().offset(), create.source().length()).type(create.type()).id(create.id()) .routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl())).version(create.version()) .origin(Engine.Operation.Origin.RECOVERY)); break; case SAVE: Translog.Index index = (Translog.Index) operation; - engine.index(prepareIndex(source(index.source(), index.sourceOffset(), index.sourceLength()).type(index.type()).id(index.id()) + engine.index(prepareIndex(source(index.source().bytes(), index.source().offset(), index.source().length()).type(index.type()).id(index.id()) .routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl())).version(index.version()) .origin(Engine.Operation.Origin.RECOVERY)); break; diff --git a/src/main/java/org/elasticsearch/index/translog/Translog.java b/src/main/java/org/elasticsearch/index/translog/Translog.java index 6fe28e42511..0907695ec89 100644 --- a/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -24,7 +24,6 @@ import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -219,7 +218,7 @@ public interface Translog extends IndexShardComponent { long estimateSize(); - Source readSource(BytesStreamInput in) throws IOException; + Source readSource(StreamInput in) throws IOException; } static class Source { @@ -241,9 +240,7 @@ public interface Translog extends IndexShardComponent { static class Create implements Operation { private String id; private String type; - private byte[] source; - private int sourceOffset; - private int sourceLength; + private BytesHolder source; private String routing; private String parent; private long timestamp; @@ -256,9 +253,7 @@ public interface Translog extends IndexShardComponent { public Create(Engine.Create create) { this.id = create.id(); this.type = create.type(); - this.source = create.source(); - this.sourceOffset = create.sourceOffset(); - this.sourceLength = create.sourceLength(); + this.source = new BytesHolder(create.source(), create.sourceOffset(), create.sourceLength()); this.routing = create.routing(); this.parent = create.parent(); this.timestamp = create.timestamp(); @@ -269,9 +264,7 @@ public interface Translog extends IndexShardComponent { public Create(String type, String id, byte[] source) { this.id = id; this.type = type; - this.source = source; - this.sourceOffset = 0; - this.sourceLength = source.length; + this.source = new BytesHolder(source); } @Override @@ -281,25 +274,17 @@ public interface Translog extends IndexShardComponent { @Override public long estimateSize() { - return ((id.length() + type.length()) * 2) + source.length + 12; + return ((id.length() + type.length()) * 2) + source.length() + 12; } public String id() { return this.id; } - public byte[] source() { + public BytesHolder source() { return this.source; } - public int sourceOffset() { - return this.sourceOffset; - } - - public int sourceLength() { - return this.sourceLength; - } - public String type() { return this.type; } @@ -325,34 +310,8 @@ public interface Translog extends IndexShardComponent { } @Override - public Source readSource(BytesStreamInput in) throws IOException { - int version = in.readVInt(); // version - id = in.readUTF(); - type = in.readUTF(); - - int length = in.readVInt(); - int offset = in.position(); - BytesHolder source = new BytesHolder(in.underlyingBuffer(), offset, length); - in.skip(length); - if (version >= 1) { - if (in.readBoolean()) { - routing = in.readUTF(); - } - } - if (version >= 2) { - if (in.readBoolean()) { - parent = in.readUTF(); - } - } - if (version >= 3) { - this.version = in.readLong(); - } - if (version >= 4) { - this.timestamp = in.readLong(); - } - if (version >= 5) { - this.ttl = in.readLong(); - } + public Source readSource(StreamInput in) throws IOException { + readFrom(in); return new Source(source, routing, parent, timestamp, ttl); } @@ -361,10 +320,7 @@ public interface Translog extends IndexShardComponent { int version = in.readVInt(); // version id = in.readUTF(); type = in.readUTF(); - sourceOffset = 0; - sourceLength = in.readVInt(); - source = new byte[sourceLength]; - in.readFully(source); + source = in.readBytesReference(); if (version >= 1) { if (in.readBoolean()) { routing = in.readUTF(); @@ -391,8 +347,7 @@ public interface Translog extends IndexShardComponent { out.writeVInt(5); // version out.writeUTF(id); out.writeUTF(type); - out.writeVInt(sourceLength); - out.writeBytes(source, sourceOffset, sourceLength); + out.writeBytesHolder(source); if (routing == null) { out.writeBoolean(false); } else { @@ -415,9 +370,7 @@ public interface Translog extends IndexShardComponent { private String id; private String type; private long version; - private byte[] source; - private int sourceOffset; - private int sourceLength; + private BytesHolder source; private String routing; private String parent; private long timestamp; @@ -429,9 +382,7 @@ public interface Translog extends IndexShardComponent { public Index(Engine.Index index) { this.id = index.id(); this.type = index.type(); - this.source = index.source(); - this.sourceOffset = index.sourceOffset(); - this.sourceLength = index.sourceLength(); + this.source = new BytesHolder(index.source(), index.sourceOffset(), index.sourceLength()); this.routing = index.routing(); this.parent = index.parent(); this.version = index.version(); @@ -442,9 +393,7 @@ public interface Translog extends IndexShardComponent { public Index(String type, String id, byte[] source) { this.type = type; this.id = id; - this.source = source; - this.sourceOffset = 0; - this.sourceLength = source.length; + this.source = new BytesHolder(source); } @Override @@ -454,7 +403,7 @@ public interface Translog extends IndexShardComponent { @Override public long estimateSize() { - return ((id.length() + type.length()) * 2) + source.length + 12; + return ((id.length() + type.length()) * 2) + source.length() + 12; } public String type() { @@ -481,51 +430,17 @@ public interface Translog extends IndexShardComponent { return this.ttl; } - public byte[] source() { + public BytesHolder source() { return this.source; } - public int sourceOffset() { - return this.sourceOffset; - } - - public int sourceLength() { - return this.sourceLength; - } - public long version() { return this.version; } @Override - public Source readSource(BytesStreamInput in) throws IOException { - int version = in.readVInt(); // version - id = in.readUTF(); - type = in.readUTF(); - - int length = in.readVInt(); - int offset = in.position(); - BytesHolder source = new BytesHolder(in.underlyingBuffer(), offset, length); - in.skip(length); - if (version >= 1) { - if (in.readBoolean()) { - routing = in.readUTF(); - } - } - if (version >= 2) { - if (in.readBoolean()) { - parent = in.readUTF(); - } - } - if (version >= 3) { - this.version = in.readLong(); - } - if (version >= 4) { - this.timestamp = in.readLong(); - } - if (version >= 5) { - this.ttl = in.readLong(); - } + public Source readSource(StreamInput in) throws IOException { + readFrom(in); return new Source(source, routing, parent, timestamp, ttl); } @@ -534,10 +449,7 @@ public interface Translog extends IndexShardComponent { int version = in.readVInt(); // version id = in.readUTF(); type = in.readUTF(); - sourceOffset = 0; - sourceLength = in.readVInt(); - source = new byte[sourceLength]; - in.readFully(source); + source = in.readBytesReference(); if (version >= 1) { if (in.readBoolean()) { routing = in.readUTF(); @@ -564,8 +476,7 @@ public interface Translog extends IndexShardComponent { out.writeVInt(5); // version out.writeUTF(id); out.writeUTF(type); - out.writeVInt(sourceLength); - out.writeBytes(source, sourceOffset, sourceLength); + out.writeBytesHolder(source); if (routing == null) { out.writeBoolean(false); } else { @@ -619,7 +530,7 @@ public interface Translog extends IndexShardComponent { } @Override - public Source readSource(BytesStreamInput in) throws IOException { + public Source readSource(StreamInput in) throws IOException { throw new ElasticSearchIllegalStateException("trying to read doc source from delete operation"); } @@ -683,7 +594,7 @@ public interface Translog extends IndexShardComponent { } @Override - public Source readSource(BytesStreamInput in) throws IOException { + public Source readSource(StreamInput in) throws IOException { throw new ElasticSearchIllegalStateException("trying to read doc source from delete_by_query operation"); } diff --git a/src/main/java/org/elasticsearch/index/translog/TranslogStreams.java b/src/main/java/org/elasticsearch/index/translog/TranslogStreams.java index b2aa1ae6784..230575d2a48 100644 --- a/src/main/java/org/elasticsearch/index/translog/TranslogStreams.java +++ b/src/main/java/org/elasticsearch/index/translog/TranslogStreams.java @@ -54,7 +54,7 @@ public class TranslogStreams { } public static Translog.Source readSource(byte[] data) throws IOException { - BytesStreamInput in = new BytesStreamInput(data); + BytesStreamInput in = new BytesStreamInput(data, false); in.readInt(); // the size header Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte()); Translog.Operation operation; diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java b/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java index d3312f3700c..8b545945259 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java @@ -120,7 +120,7 @@ public class FsChannelSnapshot implements Translog.Snapshot { channel.read(cacheBuffer, position); cacheBuffer.flip(); position += opSize; - lastOperationRead = TranslogStreams.readTranslogOperation(new BytesStreamInput(cacheBuffer.array(), 0, opSize)); + lastOperationRead = TranslogStreams.readTranslogOperation(new BytesStreamInput(cacheBuffer.array(), 0, opSize, true)); return true; } catch (Exception e) { return false; diff --git a/src/main/java/org/elasticsearch/rest/action/support/RestXContentBuilder.java b/src/main/java/org/elasticsearch/rest/action/support/RestXContentBuilder.java index ed262178ecd..37f60404e7a 100644 --- a/src/main/java/org/elasticsearch/rest/action/support/RestXContentBuilder.java +++ b/src/main/java/org/elasticsearch/rest/action/support/RestXContentBuilder.java @@ -68,7 +68,7 @@ public class RestXContentBuilder { public static void restDocumentSource(byte[] source, int offset, int length, XContentBuilder builder, ToXContent.Params params) throws IOException { if (LZF.isCompressed(source, offset, length)) { - BytesStreamInput siBytes = new BytesStreamInput(source, offset, length); + BytesStreamInput siBytes = new BytesStreamInput(source, offset, length, false); LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); XContentType contentType = XContentFactory.xContentType(siLzf); siLzf.resetToBufferStart(); diff --git a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 08a6daee3f2..325532164cf 100644 --- a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -195,7 +195,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem void messageReceived(byte[] data, String action, LocalTransport sourceTransport, @Nullable final Long sendRequestId) { transportServiceAdapter.received(data.length); - StreamInput stream = new BytesStreamInput(data); + StreamInput stream = new BytesStreamInput(data, false); stream = CachedStreamInput.cachedHandles(stream); try { diff --git a/src/test/java/org/elasticsearch/test/unit/cluster/serialization/ClusterSerializationTests.java b/src/test/java/org/elasticsearch/test/unit/cluster/serialization/ClusterSerializationTests.java index 8d8fda75987..a05207f418d 100644 --- a/src/test/java/org/elasticsearch/test/unit/cluster/serialization/ClusterSerializationTests.java +++ b/src/test/java/org/elasticsearch/test/unit/cluster/serialization/ClusterSerializationTests.java @@ -85,7 +85,7 @@ public class ClusterSerializationTests { BytesStreamOutput outStream = new BytesStreamOutput(); RoutingTable.Builder.writeTo(source, outStream); - BytesStreamInput inStream = new BytesStreamInput(outStream.copiedByteArray()); + BytesStreamInput inStream = new BytesStreamInput(outStream.copiedByteArray(), false); RoutingTable target = RoutingTable.Builder.readFrom(inStream); assertThat(target.prettyPrint(), equalTo(source.prettyPrint())); diff --git a/src/test/java/org/elasticsearch/test/unit/common/io/streams/BytesStreamsTests.java b/src/test/java/org/elasticsearch/test/unit/common/io/streams/BytesStreamsTests.java index 9222b328750..36419eb67e8 100644 --- a/src/test/java/org/elasticsearch/test/unit/common/io/streams/BytesStreamsTests.java +++ b/src/test/java/org/elasticsearch/test/unit/common/io/streams/BytesStreamsTests.java @@ -49,7 +49,7 @@ public class BytesStreamsTests { out.writeUTF("hello"); out.writeUTF("goodbye"); - BytesStreamInput in = new BytesStreamInput(out.copiedByteArray()); + BytesStreamInput in = new BytesStreamInput(out.copiedByteArray(), false); assertThat(in.readBoolean(), equalTo(false)); assertThat(in.readByte(), equalTo((byte) 1)); assertThat(in.readShort(), equalTo((short) -1)); diff --git a/src/test/java/org/elasticsearch/test/unit/common/io/streams/HandlesStreamsTests.java b/src/test/java/org/elasticsearch/test/unit/common/io/streams/HandlesStreamsTests.java index f7b33789936..56b2a438f86 100644 --- a/src/test/java/org/elasticsearch/test/unit/common/io/streams/HandlesStreamsTests.java +++ b/src/test/java/org/elasticsearch/test/unit/common/io/streams/HandlesStreamsTests.java @@ -47,7 +47,7 @@ public class HandlesStreamsTests { out.writeUTF(higherThresholdValue); out.writeUTF(lowerThresholdValue); - HandlesStreamInput in = new HandlesStreamInput(new BytesStreamInput(bytesOut.copiedByteArray())); + HandlesStreamInput in = new HandlesStreamInput(new BytesStreamInput(bytesOut.copiedByteArray(), false)); assertThat(in.readUTF(), equalTo(lowerThresholdValue)); assertThat(in.readUTF(), equalTo(higherThresholdValue)); assertThat(in.readInt(), equalTo(1)); diff --git a/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java b/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java index 3b6705248af..7d22fe2c8c4 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java @@ -396,7 +396,7 @@ public abstract class AbstractSimpleEngineTests { MatcherAssert.assertThat(snapshotIndexCommit1, SnapshotIndexCommitExistsMatcher.snapshotIndexCommitExists()); assertThat(translogSnapshot1.hasNext(), equalTo(true)); Translog.Create create1 = (Translog.Create) translogSnapshot1.next(); - assertThat(create1.source(), equalTo(B_1)); + assertThat(create1.source().copyBytes(), equalTo(B_1)); assertThat(translogSnapshot1.hasNext(), equalTo(false)); Future future = executorService.submit(new Callable() { @@ -429,7 +429,7 @@ public abstract class AbstractSimpleEngineTests { assertThat(snapshotIndexCommit2.getSegmentsFileName(), not(equalTo(snapshotIndexCommit1.getSegmentsFileName()))); assertThat(translogSnapshot2.hasNext(), equalTo(true)); Translog.Create create3 = (Translog.Create) translogSnapshot2.next(); - assertThat(create3.source(), equalTo(B_3)); + assertThat(create3.source().copyBytes(), equalTo(B_3)); assertThat(translogSnapshot2.hasNext(), equalTo(false)); return null; } @@ -503,7 +503,7 @@ public abstract class AbstractSimpleEngineTests { public void phase2(Translog.Snapshot snapshot) throws EngineException { assertThat(snapshot.hasNext(), equalTo(true)); Translog.Create create = (Translog.Create) snapshot.next(); - assertThat(create.source(), equalTo(B_2)); + assertThat(create.source().copyBytes(), equalTo(B_2)); assertThat(snapshot.hasNext(), equalTo(false)); } @@ -535,7 +535,7 @@ public abstract class AbstractSimpleEngineTests { assertThat(snapshot.hasNext(), equalTo(true)); Translog.Create create = (Translog.Create) snapshot.next(); assertThat(snapshot.hasNext(), equalTo(false)); - assertThat(create.source(), equalTo(B_2)); + assertThat(create.source().copyBytes(), equalTo(B_2)); // add for phase3 ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, -1, -1, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false); @@ -547,7 +547,7 @@ public abstract class AbstractSimpleEngineTests { assertThat(snapshot.hasNext(), equalTo(true)); Translog.Create create = (Translog.Create) snapshot.next(); assertThat(snapshot.hasNext(), equalTo(false)); - assertThat(create.source(), equalTo(B_3)); + assertThat(create.source().copyBytes(), equalTo(B_3)); } }); diff --git a/src/test/java/org/elasticsearch/test/unit/index/translog/AbstractSimpleTranslogTests.java b/src/test/java/org/elasticsearch/test/unit/index/translog/AbstractSimpleTranslogTests.java index e233193ca82..5bc8607b5b9 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/translog/AbstractSimpleTranslogTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/translog/AbstractSimpleTranslogTests.java @@ -140,11 +140,11 @@ public abstract class AbstractSimpleTranslogTests { assertThat(snapshot.hasNext(), equalTo(true)); Translog.Create create = (Translog.Create) snapshot.next(); - assertThat(create.source(), equalTo(new byte[]{1})); + assertThat(create.source().copyBytes(), equalTo(new byte[]{1})); assertThat(snapshot.hasNext(), equalTo(true)); Translog.Index index = (Translog.Index) snapshot.next(); - assertThat(index.source(), equalTo(new byte[]{2})); + assertThat(index.source().copyBytes(), equalTo(new byte[]{2})); assertThat(snapshot.hasNext(), equalTo(true)); Translog.Delete delete = (Translog.Delete) snapshot.next(); @@ -183,7 +183,7 @@ public abstract class AbstractSimpleTranslogTests { snapshot = translog.snapshot(); assertThat(snapshot.hasNext(), equalTo(true)); Translog.Create create = (Translog.Create) snapshot.next(); - assertThat(create.source(), equalTo(new byte[]{1})); + assertThat(create.source().copyBytes(), equalTo(new byte[]{1})); snapshot.release(); Translog.Snapshot snapshot1 = translog.snapshot(); @@ -201,7 +201,7 @@ public abstract class AbstractSimpleTranslogTests { snapshot = translog.snapshot(snapshot1); assertThat(snapshot.hasNext(), equalTo(true)); Translog.Index index = (Translog.Index) snapshot.next(); - assertThat(index.source(), equalTo(new byte[]{2})); + assertThat(index.source().copyBytes(), equalTo(new byte[]{2})); assertThat(snapshot.hasNext(), equalTo(false)); assertThat(snapshot.estimatedTotalOperations(), equalTo(2)); snapshot.release(); @@ -230,7 +230,7 @@ public abstract class AbstractSimpleTranslogTests { snapshot = translog.snapshot(actualSnapshot); assertThat(snapshot.hasNext(), equalTo(true)); Translog.Index index = (Translog.Index) snapshot.next(); - assertThat(index.source(), equalTo(new byte[]{3})); + assertThat(index.source().copyBytes(), equalTo(new byte[]{3})); assertThat(snapshot.hasNext(), equalTo(false)); actualSnapshot.release();