translog actions to use bytes ref serialization, and have the option to mark BytesStreamInput as unsafe

This commit is contained in:
Shay Banon 2012-01-08 17:23:37 +02:00
parent c02dc8f4f8
commit 858195351b
19 changed files with 59 additions and 142 deletions

View File

@ -233,7 +233,7 @@ public class ClusterState {
}
public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throws IOException {
return readFrom(new BytesStreamInput(data), localNode);
return readFrom(new BytesStreamInput(data, false), localNode);
}
public static void writeTo(ClusterState state, StreamOutput out) throws IOException {

View File

@ -35,18 +35,24 @@ public class BytesStreamInput extends StreamInput {
protected int count;
public BytesStreamInput(byte buf[]) {
this(buf, 0, buf.length);
private final boolean unsafe;
public BytesStreamInput(byte buf[], boolean unsafe) {
this(buf, 0, buf.length, unsafe);
}
public BytesStreamInput(byte buf[], int offset, int length) {
public BytesStreamInput(byte buf[], int offset, int length, boolean unsafe) {
this.buf = buf;
this.pos = offset;
this.count = Math.min(offset + length, buf.length);
this.unsafe = unsafe;
}
@Override
public BytesHolder readBytesReference() throws IOException {
if (unsafe) {
return readBytesHolder();
}
int size = readVInt();
BytesHolder bytes = new BytesHolder(buf, pos, size);
pos += size;

View File

@ -40,7 +40,7 @@ public class XContentHelper {
public static XContentParser createParser(byte[] data, int offset, int length) throws IOException {
if (LZF.isCompressed(data, offset, length)) {
BytesStreamInput siBytes = new BytesStreamInput(data, offset, length);
BytesStreamInput siBytes = new BytesStreamInput(data, offset, length, false);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
XContentType contentType = XContentFactory.xContentType(siLzf);
siLzf.resetToBufferStart();
@ -55,7 +55,7 @@ public class XContentHelper {
XContentParser parser;
XContentType contentType;
if (LZF.isCompressed(data, offset, length)) {
BytesStreamInput siBytes = new BytesStreamInput(data, offset, length);
BytesStreamInput siBytes = new BytesStreamInput(data, offset, length, false);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
contentType = XContentFactory.xContentType(siLzf);
siLzf.resetToBufferStart();

View File

@ -396,7 +396,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
}
}
if (internal) {
StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset() + INTERNAL_HEADER.length, datagramPacketReceive.getLength()));
StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset() + INTERNAL_HEADER.length, datagramPacketReceive.getLength(), true));
Version version = Version.readVersion(input);
id = input.readInt();
clusterName = ClusterName.readClusterName(input);

View File

@ -95,7 +95,7 @@ public class PublishClusterStateAction extends AbstractComponent {
}
}
private class PublishClusterStateRequest implements Streamable {
class PublishClusterStateRequest implements Streamable {
private byte[] clusterStateInBytes;
@ -130,7 +130,7 @@ public class PublishClusterStateAction extends AbstractComponent {
@Override
public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception {
StreamInput in = CachedStreamInput.cachedHandlesLzf(new BytesStreamInput(request.clusterStateInBytes));
StreamInput in = CachedStreamInput.cachedHandlesLzf(new BytesStreamInput(request.clusterStateInBytes, false));
ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
listener.onNewClusterState(clusterState);
channel.sendResponse(VoidStreamable.INSTANCE);

View File

@ -207,7 +207,7 @@ public abstract class BlobStoreGateway extends SharedStorageGateway {
XContentParser parser = null;
try {
if (LZF.isCompressed(data)) {
BytesStreamInput siBytes = new BytesStreamInput(data);
BytesStreamInput siBytes = new BytesStreamInput(data, false);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf);
} else {

View File

@ -374,7 +374,7 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
XContentParser parser = null;
try {
if (LZF.isCompressed(data)) {
BytesStreamInput siBytes = new BytesStreamInput(data);
BytesStreamInput siBytes = new BytesStreamInput(data, false);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf);
} else {
@ -392,7 +392,7 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
XContentParser parser = null;
try {
if (LZF.isCompressed(data)) {
BytesStreamInput siBytes = new BytesStreamInput(data);
BytesStreamInput siBytes = new BytesStreamInput(data, false);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf);
} else {

View File

@ -462,7 +462,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
if (bos.size() < 4) {
return;
}
BytesStreamInput si = new BytesStreamInput(bos.underlyingBytes(), 0, bos.size());
BytesStreamInput si = new BytesStreamInput(bos.underlyingBytes(), 0, bos.size(), false);
int position;
while (true) {
try {

View File

@ -596,13 +596,13 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
switch (operation.opType()) {
case CREATE:
Translog.Create create = (Translog.Create) operation;
engine.create(prepareCreate(source(create.source(), create.sourceOffset(), create.sourceLength()).type(create.type()).id(create.id())
engine.create(prepareCreate(source(create.source().bytes(), create.source().offset(), create.source().length()).type(create.type()).id(create.id())
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl())).version(create.version())
.origin(Engine.Operation.Origin.RECOVERY));
break;
case SAVE:
Translog.Index index = (Translog.Index) operation;
engine.index(prepareIndex(source(index.source(), index.sourceOffset(), index.sourceLength()).type(index.type()).id(index.id())
engine.index(prepareIndex(source(index.source().bytes(), index.source().offset(), index.source().length()).type(index.type()).id(index.id())
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl())).version(index.version())
.origin(Engine.Operation.Origin.RECOVERY));
break;

View File

@ -24,7 +24,6 @@ import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -219,7 +218,7 @@ public interface Translog extends IndexShardComponent {
long estimateSize();
Source readSource(BytesStreamInput in) throws IOException;
Source readSource(StreamInput in) throws IOException;
}
static class Source {
@ -241,9 +240,7 @@ public interface Translog extends IndexShardComponent {
static class Create implements Operation {
private String id;
private String type;
private byte[] source;
private int sourceOffset;
private int sourceLength;
private BytesHolder source;
private String routing;
private String parent;
private long timestamp;
@ -256,9 +253,7 @@ public interface Translog extends IndexShardComponent {
public Create(Engine.Create create) {
this.id = create.id();
this.type = create.type();
this.source = create.source();
this.sourceOffset = create.sourceOffset();
this.sourceLength = create.sourceLength();
this.source = new BytesHolder(create.source(), create.sourceOffset(), create.sourceLength());
this.routing = create.routing();
this.parent = create.parent();
this.timestamp = create.timestamp();
@ -269,9 +264,7 @@ public interface Translog extends IndexShardComponent {
public Create(String type, String id, byte[] source) {
this.id = id;
this.type = type;
this.source = source;
this.sourceOffset = 0;
this.sourceLength = source.length;
this.source = new BytesHolder(source);
}
@Override
@ -281,25 +274,17 @@ public interface Translog extends IndexShardComponent {
@Override
public long estimateSize() {
return ((id.length() + type.length()) * 2) + source.length + 12;
return ((id.length() + type.length()) * 2) + source.length() + 12;
}
public String id() {
return this.id;
}
public byte[] source() {
public BytesHolder source() {
return this.source;
}
public int sourceOffset() {
return this.sourceOffset;
}
public int sourceLength() {
return this.sourceLength;
}
public String type() {
return this.type;
}
@ -325,34 +310,8 @@ public interface Translog extends IndexShardComponent {
}
@Override
public Source readSource(BytesStreamInput in) throws IOException {
int version = in.readVInt(); // version
id = in.readUTF();
type = in.readUTF();
int length = in.readVInt();
int offset = in.position();
BytesHolder source = new BytesHolder(in.underlyingBuffer(), offset, length);
in.skip(length);
if (version >= 1) {
if (in.readBoolean()) {
routing = in.readUTF();
}
}
if (version >= 2) {
if (in.readBoolean()) {
parent = in.readUTF();
}
}
if (version >= 3) {
this.version = in.readLong();
}
if (version >= 4) {
this.timestamp = in.readLong();
}
if (version >= 5) {
this.ttl = in.readLong();
}
public Source readSource(StreamInput in) throws IOException {
readFrom(in);
return new Source(source, routing, parent, timestamp, ttl);
}
@ -361,10 +320,7 @@ public interface Translog extends IndexShardComponent {
int version = in.readVInt(); // version
id = in.readUTF();
type = in.readUTF();
sourceOffset = 0;
sourceLength = in.readVInt();
source = new byte[sourceLength];
in.readFully(source);
source = in.readBytesReference();
if (version >= 1) {
if (in.readBoolean()) {
routing = in.readUTF();
@ -391,8 +347,7 @@ public interface Translog extends IndexShardComponent {
out.writeVInt(5); // version
out.writeUTF(id);
out.writeUTF(type);
out.writeVInt(sourceLength);
out.writeBytes(source, sourceOffset, sourceLength);
out.writeBytesHolder(source);
if (routing == null) {
out.writeBoolean(false);
} else {
@ -415,9 +370,7 @@ public interface Translog extends IndexShardComponent {
private String id;
private String type;
private long version;
private byte[] source;
private int sourceOffset;
private int sourceLength;
private BytesHolder source;
private String routing;
private String parent;
private long timestamp;
@ -429,9 +382,7 @@ public interface Translog extends IndexShardComponent {
public Index(Engine.Index index) {
this.id = index.id();
this.type = index.type();
this.source = index.source();
this.sourceOffset = index.sourceOffset();
this.sourceLength = index.sourceLength();
this.source = new BytesHolder(index.source(), index.sourceOffset(), index.sourceLength());
this.routing = index.routing();
this.parent = index.parent();
this.version = index.version();
@ -442,9 +393,7 @@ public interface Translog extends IndexShardComponent {
public Index(String type, String id, byte[] source) {
this.type = type;
this.id = id;
this.source = source;
this.sourceOffset = 0;
this.sourceLength = source.length;
this.source = new BytesHolder(source);
}
@Override
@ -454,7 +403,7 @@ public interface Translog extends IndexShardComponent {
@Override
public long estimateSize() {
return ((id.length() + type.length()) * 2) + source.length + 12;
return ((id.length() + type.length()) * 2) + source.length() + 12;
}
public String type() {
@ -481,51 +430,17 @@ public interface Translog extends IndexShardComponent {
return this.ttl;
}
public byte[] source() {
public BytesHolder source() {
return this.source;
}
public int sourceOffset() {
return this.sourceOffset;
}
public int sourceLength() {
return this.sourceLength;
}
public long version() {
return this.version;
}
@Override
public Source readSource(BytesStreamInput in) throws IOException {
int version = in.readVInt(); // version
id = in.readUTF();
type = in.readUTF();
int length = in.readVInt();
int offset = in.position();
BytesHolder source = new BytesHolder(in.underlyingBuffer(), offset, length);
in.skip(length);
if (version >= 1) {
if (in.readBoolean()) {
routing = in.readUTF();
}
}
if (version >= 2) {
if (in.readBoolean()) {
parent = in.readUTF();
}
}
if (version >= 3) {
this.version = in.readLong();
}
if (version >= 4) {
this.timestamp = in.readLong();
}
if (version >= 5) {
this.ttl = in.readLong();
}
public Source readSource(StreamInput in) throws IOException {
readFrom(in);
return new Source(source, routing, parent, timestamp, ttl);
}
@ -534,10 +449,7 @@ public interface Translog extends IndexShardComponent {
int version = in.readVInt(); // version
id = in.readUTF();
type = in.readUTF();
sourceOffset = 0;
sourceLength = in.readVInt();
source = new byte[sourceLength];
in.readFully(source);
source = in.readBytesReference();
if (version >= 1) {
if (in.readBoolean()) {
routing = in.readUTF();
@ -564,8 +476,7 @@ public interface Translog extends IndexShardComponent {
out.writeVInt(5); // version
out.writeUTF(id);
out.writeUTF(type);
out.writeVInt(sourceLength);
out.writeBytes(source, sourceOffset, sourceLength);
out.writeBytesHolder(source);
if (routing == null) {
out.writeBoolean(false);
} else {
@ -619,7 +530,7 @@ public interface Translog extends IndexShardComponent {
}
@Override
public Source readSource(BytesStreamInput in) throws IOException {
public Source readSource(StreamInput in) throws IOException {
throw new ElasticSearchIllegalStateException("trying to read doc source from delete operation");
}
@ -683,7 +594,7 @@ public interface Translog extends IndexShardComponent {
}
@Override
public Source readSource(BytesStreamInput in) throws IOException {
public Source readSource(StreamInput in) throws IOException {
throw new ElasticSearchIllegalStateException("trying to read doc source from delete_by_query operation");
}

View File

@ -54,7 +54,7 @@ public class TranslogStreams {
}
public static Translog.Source readSource(byte[] data) throws IOException {
BytesStreamInput in = new BytesStreamInput(data);
BytesStreamInput in = new BytesStreamInput(data, false);
in.readInt(); // the size header
Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte());
Translog.Operation operation;

View File

@ -120,7 +120,7 @@ public class FsChannelSnapshot implements Translog.Snapshot {
channel.read(cacheBuffer, position);
cacheBuffer.flip();
position += opSize;
lastOperationRead = TranslogStreams.readTranslogOperation(new BytesStreamInput(cacheBuffer.array(), 0, opSize));
lastOperationRead = TranslogStreams.readTranslogOperation(new BytesStreamInput(cacheBuffer.array(), 0, opSize, true));
return true;
} catch (Exception e) {
return false;

View File

@ -68,7 +68,7 @@ public class RestXContentBuilder {
public static void restDocumentSource(byte[] source, int offset, int length, XContentBuilder builder, ToXContent.Params params) throws IOException {
if (LZF.isCompressed(source, offset, length)) {
BytesStreamInput siBytes = new BytesStreamInput(source, offset, length);
BytesStreamInput siBytes = new BytesStreamInput(source, offset, length, false);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
XContentType contentType = XContentFactory.xContentType(siLzf);
siLzf.resetToBufferStart();

View File

@ -195,7 +195,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
void messageReceived(byte[] data, String action, LocalTransport sourceTransport, @Nullable final Long sendRequestId) {
transportServiceAdapter.received(data.length);
StreamInput stream = new BytesStreamInput(data);
StreamInput stream = new BytesStreamInput(data, false);
stream = CachedStreamInput.cachedHandles(stream);
try {

View File

@ -85,7 +85,7 @@ public class ClusterSerializationTests {
BytesStreamOutput outStream = new BytesStreamOutput();
RoutingTable.Builder.writeTo(source, outStream);
BytesStreamInput inStream = new BytesStreamInput(outStream.copiedByteArray());
BytesStreamInput inStream = new BytesStreamInput(outStream.copiedByteArray(), false);
RoutingTable target = RoutingTable.Builder.readFrom(inStream);
assertThat(target.prettyPrint(), equalTo(source.prettyPrint()));

View File

@ -49,7 +49,7 @@ public class BytesStreamsTests {
out.writeUTF("hello");
out.writeUTF("goodbye");
BytesStreamInput in = new BytesStreamInput(out.copiedByteArray());
BytesStreamInput in = new BytesStreamInput(out.copiedByteArray(), false);
assertThat(in.readBoolean(), equalTo(false));
assertThat(in.readByte(), equalTo((byte) 1));
assertThat(in.readShort(), equalTo((short) -1));

View File

@ -47,7 +47,7 @@ public class HandlesStreamsTests {
out.writeUTF(higherThresholdValue);
out.writeUTF(lowerThresholdValue);
HandlesStreamInput in = new HandlesStreamInput(new BytesStreamInput(bytesOut.copiedByteArray()));
HandlesStreamInput in = new HandlesStreamInput(new BytesStreamInput(bytesOut.copiedByteArray(), false));
assertThat(in.readUTF(), equalTo(lowerThresholdValue));
assertThat(in.readUTF(), equalTo(higherThresholdValue));
assertThat(in.readInt(), equalTo(1));

View File

@ -396,7 +396,7 @@ public abstract class AbstractSimpleEngineTests {
MatcherAssert.assertThat(snapshotIndexCommit1, SnapshotIndexCommitExistsMatcher.snapshotIndexCommitExists());
assertThat(translogSnapshot1.hasNext(), equalTo(true));
Translog.Create create1 = (Translog.Create) translogSnapshot1.next();
assertThat(create1.source(), equalTo(B_1));
assertThat(create1.source().copyBytes(), equalTo(B_1));
assertThat(translogSnapshot1.hasNext(), equalTo(false));
Future<Object> future = executorService.submit(new Callable<Object>() {
@ -429,7 +429,7 @@ public abstract class AbstractSimpleEngineTests {
assertThat(snapshotIndexCommit2.getSegmentsFileName(), not(equalTo(snapshotIndexCommit1.getSegmentsFileName())));
assertThat(translogSnapshot2.hasNext(), equalTo(true));
Translog.Create create3 = (Translog.Create) translogSnapshot2.next();
assertThat(create3.source(), equalTo(B_3));
assertThat(create3.source().copyBytes(), equalTo(B_3));
assertThat(translogSnapshot2.hasNext(), equalTo(false));
return null;
}
@ -503,7 +503,7 @@ public abstract class AbstractSimpleEngineTests {
public void phase2(Translog.Snapshot snapshot) throws EngineException {
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Create create = (Translog.Create) snapshot.next();
assertThat(create.source(), equalTo(B_2));
assertThat(create.source().copyBytes(), equalTo(B_2));
assertThat(snapshot.hasNext(), equalTo(false));
}
@ -535,7 +535,7 @@ public abstract class AbstractSimpleEngineTests {
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Create create = (Translog.Create) snapshot.next();
assertThat(snapshot.hasNext(), equalTo(false));
assertThat(create.source(), equalTo(B_2));
assertThat(create.source().copyBytes(), equalTo(B_2));
// add for phase3
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, -1, -1, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
@ -547,7 +547,7 @@ public abstract class AbstractSimpleEngineTests {
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Create create = (Translog.Create) snapshot.next();
assertThat(snapshot.hasNext(), equalTo(false));
assertThat(create.source(), equalTo(B_3));
assertThat(create.source().copyBytes(), equalTo(B_3));
}
});

View File

@ -140,11 +140,11 @@ public abstract class AbstractSimpleTranslogTests {
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Create create = (Translog.Create) snapshot.next();
assertThat(create.source(), equalTo(new byte[]{1}));
assertThat(create.source().copyBytes(), equalTo(new byte[]{1}));
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Index index = (Translog.Index) snapshot.next();
assertThat(index.source(), equalTo(new byte[]{2}));
assertThat(index.source().copyBytes(), equalTo(new byte[]{2}));
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Delete delete = (Translog.Delete) snapshot.next();
@ -183,7 +183,7 @@ public abstract class AbstractSimpleTranslogTests {
snapshot = translog.snapshot();
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Create create = (Translog.Create) snapshot.next();
assertThat(create.source(), equalTo(new byte[]{1}));
assertThat(create.source().copyBytes(), equalTo(new byte[]{1}));
snapshot.release();
Translog.Snapshot snapshot1 = translog.snapshot();
@ -201,7 +201,7 @@ public abstract class AbstractSimpleTranslogTests {
snapshot = translog.snapshot(snapshot1);
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Index index = (Translog.Index) snapshot.next();
assertThat(index.source(), equalTo(new byte[]{2}));
assertThat(index.source().copyBytes(), equalTo(new byte[]{2}));
assertThat(snapshot.hasNext(), equalTo(false));
assertThat(snapshot.estimatedTotalOperations(), equalTo(2));
snapshot.release();
@ -230,7 +230,7 @@ public abstract class AbstractSimpleTranslogTests {
snapshot = translog.snapshot(actualSnapshot);
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Index index = (Translog.Index) snapshot.next();
assertThat(index.source(), equalTo(new byte[]{3}));
assertThat(index.source().copyBytes(), equalTo(new byte[]{3}));
assertThat(snapshot.hasNext(), equalTo(false));
actualSnapshot.release();