From 81e3d5cdcb44a90c9ab197c192ee8d1c84892db3 Mon Sep 17 00:00:00 2001 From: Britta Weber Date: Sun, 17 May 2015 14:24:15 +0200 Subject: [PATCH] use imutable object for commit id --- .../elasticsearch/index/engine/Engine.java | 60 ++++++++++++++++++- .../index/engine/InternalEngine.java | 14 ++--- .../index/engine/ShadowEngine.java | 8 +-- .../elasticsearch/index/shard/IndexShard.java | 9 ++- .../indices/SyncedFlushService.java | 39 ++++++------ .../index/engine/InternalEngineTests.java | 13 ++-- .../indices/SycnedFlushSingleNodeTest.java | 7 ++- 7 files changed, 102 insertions(+), 48 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/engine/Engine.java b/src/main/java/org/elasticsearch/index/engine/Engine.java index 09a388ec532..ed2b49f303f 100644 --- a/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -37,8 +37,12 @@ import org.apache.lucene.search.join.BitDocIdSetFilter; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountables; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.Base64; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; @@ -227,7 +231,7 @@ public abstract class Engine implements Closeable { * @param expectedCommitId the expected value of * @return true if the sync commit was made, false o.w. */ - public abstract SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) throws EngineException; + public abstract SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, CommitId expectedCommitId) throws EngineException; public enum SyncedFlushResult { SUCCESS, @@ -458,7 +462,7 @@ public abstract class Engine implements Closeable { * Otherwise this call will return without blocking. * @return the commit Id for the resulting commit */ - public abstract byte[] flush(boolean force, boolean waitIfOngoing) throws EngineException; + public abstract CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException; /** * Flushes the state of the engine including the transaction log, clearing memory and persisting @@ -468,7 +472,7 @@ public abstract class Engine implements Closeable { * * @return the commit Id for the resulting commit */ - public abstract byte[] flush() throws EngineException; + public abstract CommitId flush() throws EngineException; /** * Optimizes to 1 segment @@ -1141,4 +1145,54 @@ public abstract class Engine implements Closeable { * @return */ public abstract boolean hasUncommittedChanges(); + + public static class CommitId implements Writeable { + private byte[] id; + public CommitId(byte[] id) { + assert id != null; + this.id = Arrays.copyOf(id, id.length); + } + + public CommitId(StreamInput in) throws IOException { + assert in != null; + this.id = in.readByteArray(); + } + + @Override + public String toString() { + return Base64.encodeBytes(id); + } + + @Override + public CommitId readFrom(StreamInput in) throws IOException { + byte[] bytes = in.readByteArray(); + return new CommitId(bytes); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeByteArray(id); + } + + public boolean idsEqual(byte[] id) { + return Arrays.equals(id, this.id); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + CommitId commitId = (CommitId) o; + + if (!Arrays.equals(id, commitId.id)) return false; + + return true; + } + + @Override + public int hashCode() { + return Arrays.hashCode(id); + } + } } diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 70e31700c7f..3cb8999dc28 100644 --- a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -665,14 +665,14 @@ public class InternalEngine extends Engine { } @Override - public SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) throws EngineException { + public SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, CommitId expectedCommitId) throws EngineException { // best effort attempt before we acquire locks ensureOpen(); if (indexWriter.hasUncommittedChanges()) { logger.trace("can't sync commit [{}]. have pending changes", syncId); return SyncedFlushResult.FAILED_PENDING_OPERATIONS; } - if (Arrays.equals(expectedCommitId, lastCommittedSegmentInfos.getId()) == false) { + if (expectedCommitId.idsEqual(lastCommittedSegmentInfos.getId()) == false) { logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId); return SyncedFlushResult.FAILED_COMMIT_MISMATCH; } @@ -682,7 +682,7 @@ public class InternalEngine extends Engine { logger.trace("can't sync commit [{}]. have pending changes", syncId); return SyncedFlushResult.FAILED_PENDING_OPERATIONS; } - if (Arrays.equals(expectedCommitId, lastCommittedSegmentInfos.getId()) == false) { + if (expectedCommitId.idsEqual(lastCommittedSegmentInfos.getId()) == false) { logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId); return SyncedFlushResult.FAILED_COMMIT_MISMATCH; } @@ -699,16 +699,16 @@ public class InternalEngine extends Engine { } @Override - public byte[] flush() throws EngineException { + public CommitId flush() throws EngineException { return flush(true, false, false); } @Override - public byte[] flush(boolean force, boolean waitIfOngoing) throws EngineException { + public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException { return flush(true, force, waitIfOngoing); } - private byte[] flush(boolean commitTranslog, boolean force, boolean waitIfOngoing) throws EngineException { + private CommitId flush(boolean commitTranslog, boolean force, boolean waitIfOngoing) throws EngineException { ensureOpen(); final byte[] newCommitId; /* @@ -799,7 +799,7 @@ public class InternalEngine extends Engine { if (engineConfig.isEnableGcDeletes()) { pruneDeletedTombstones(); } - return newCommitId; + return new CommitId(newCommitId); } private void pruneDeletedTombstones() { diff --git a/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java index 303426d16b3..9b295e7e60f 100644 --- a/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -125,17 +125,17 @@ public class ShadowEngine extends Engine { } @Override - public SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) { + public SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, CommitId expectedCommitId) { throw new UnsupportedOperationException(shardId + " sync commit operation not allowed on shadow engine"); } @Override - public byte[] flush() throws EngineException { + public CommitId flush() throws EngineException { return flush(false, false); } @Override - public byte[] flush(boolean force, boolean waitIfOngoing) throws EngineException { + public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException { logger.trace("skipping FLUSH on shadow engine"); // reread the last committed segment infos refresh("flush"); @@ -159,7 +159,7 @@ public class ShadowEngine extends Engine { } finally { store.decRef(); } - return lastCommittedSegmentInfos.getId(); + return new CommitId(lastCommittedSegmentInfos.getId()); } @Override diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 873f93140ac..fa0a8daccc2 100644 --- a/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -115,8 +115,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.PrintStream; import java.nio.channels.ClosedByInterruptException; -import java.util.Arrays; -import java.util.Locale; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledFuture; @@ -693,13 +691,13 @@ public class IndexShard extends AbstractIndexShardComponent { return completionStats; } - public Engine.SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) { + public Engine.SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, Engine.CommitId expectedCommitId) { verifyStartedOrRecovering(); logger.trace("trying to sync flush. sync id [{}]. expected commit id [{}]]", syncId, expectedCommitId); return engine().syncFlushIfNoPendingChanges(syncId, expectedCommitId); } - public byte[] flush(FlushRequest request) throws ElasticsearchException { + public Engine.CommitId flush(FlushRequest request) throws ElasticsearchException { boolean waitIfOngoing = request.waitIfOngoing(); boolean force = request.force(); if (logger.isTraceEnabled()) { @@ -711,7 +709,7 @@ public class IndexShard extends AbstractIndexShardComponent { verifyStartedOrRecovering(); long time = System.nanoTime(); - byte[] commitId = engine().flush(force, waitIfOngoing); + Engine.CommitId commitId = engine().flush(force, waitIfOngoing); flushMetric.inc(System.nanoTime() - time); return commitId; @@ -1385,4 +1383,5 @@ public class IndexShard extends AbstractIndexShardComponent { public Translog.Durabilty getTranslogDurability() { return engine().getTranslog().getDurabilty(); } + } diff --git a/src/main/java/org/elasticsearch/indices/SyncedFlushService.java b/src/main/java/org/elasticsearch/indices/SyncedFlushService.java index 2974b55ae72..da7d7282555 100644 --- a/src/main/java/org/elasticsearch/indices/SyncedFlushService.java +++ b/src/main/java/org/elasticsearch/indices/SyncedFlushService.java @@ -49,7 +49,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -137,7 +136,7 @@ public class SyncedFlushService extends AbstractComponent { final ClusterState state = clusterService.state(); final IndexShardRoutingTable shardRoutingTable = getActiveShardRoutings(shardId, state); final List activeShards = shardRoutingTable.activeShards(); - Map commitIds = sendPreSyncRequests(activeShards, state, shardId); + Map commitIds = sendPreSyncRequests(activeShards, state, shardId); if (commitIds.isEmpty()) { actionListener.onResponse(new SyncedFlushResult(shardId, "all shards failed to commit on pre-sync")); @@ -221,7 +220,7 @@ public class SyncedFlushService extends AbstractComponent { } - void sendSyncRequests(final String syncId, List shards, ClusterState state, Map expectedCommitIds, final ShardId shardId, final ActionListener listener) { + void sendSyncRequests(final String syncId, List shards, ClusterState state, Map expectedCommitIds, final ShardId shardId, final ActionListener listener) { final CountDown countDownLatch = new CountDown(shards.size()); final Map results = ConcurrentCollections.newConcurrentMap(); for (final ShardRouting shard : shards) { @@ -234,7 +233,7 @@ public class SyncedFlushService extends AbstractComponent { } continue; } - final byte[] expectedCommitId = expectedCommitIds.get(shard.currentNodeId()); + final Engine.CommitId expectedCommitId = expectedCommitIds.get(shard.currentNodeId()); if (expectedCommitId == null) { logger.trace("{} can't resolve expected commit id for {}, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard); results.put(shard, new SyncedFlushResponse("no commit id from pre-sync flush")); @@ -282,9 +281,9 @@ public class SyncedFlushService extends AbstractComponent { /** * send presync requests to all started copies of the given shard */ - Map sendPreSyncRequests(final List shards, final ClusterState state, final ShardId shardId) { + Map sendPreSyncRequests(final List shards, final ClusterState state, final ShardId shardId) { final CountDownLatch countDownLatch = new CountDownLatch(shards.size()); - final Map commitIds = ConcurrentCollections.newConcurrentMap(); + final Map commitIds = ConcurrentCollections.newConcurrentMap(); for (final ShardRouting shard : shards) { logger.trace("{} sending pre-synced flush request to {}", shardId, shard); final DiscoveryNode node = state.nodes().get(shard.currentNodeId()); @@ -301,7 +300,7 @@ public class SyncedFlushService extends AbstractComponent { @Override public void handleResponse(PreSyncedFlushResponse response) { - byte[] existing = commitIds.put(node.id(), response.commitId()); + Engine.CommitId existing = commitIds.put(node.id(), response.commitId()); assert existing == null : "got two answers for node [" + node + "]"; // count after the assert so we won't decrement twice in handleException countDownLatch.countDown(); @@ -334,9 +333,9 @@ public class SyncedFlushService extends AbstractComponent { IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id()); FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true); logger.trace("{} performing pre sync flush", request.shardId()); - byte[] id = indexShard.flush(flushRequest); - logger.trace("{} pre sync flush done. commit id {}", request.shardId(), id); - return new PreSyncedFlushResponse(id); + Engine.CommitId commitId = indexShard.flush(flushRequest); + logger.trace("{} pre sync flush done. commit id {}", request.shardId(), commitId); + return new PreSyncedFlushResponse(commitId); } private SyncedFlushResponse performSyncedFlush(SyncedFlushRequest request) { @@ -526,42 +525,42 @@ public class SyncedFlushService extends AbstractComponent { */ final static class PreSyncedFlushResponse extends TransportResponse { - private byte[] commitId; + Engine.CommitId commitId; PreSyncedFlushResponse() { } - PreSyncedFlushResponse(byte[] commitId) { + PreSyncedFlushResponse(Engine.CommitId commitId) { this.commitId = commitId; } - public byte[] commitId() { + public Engine.CommitId commitId() { return commitId; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - commitId = in.readByteArray(); + commitId = new Engine.CommitId(in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeByteArray(commitId); + commitId.writeTo(out); } } static final class SyncedFlushRequest extends TransportRequest { private String syncId; - private byte[] expectedCommitId; + private Engine.CommitId expectedCommitId; private ShardId shardId; public SyncedFlushRequest() { } - public SyncedFlushRequest(ShardId shardId, String syncId, byte[] expectedCommitId) { + public SyncedFlushRequest(ShardId shardId, String syncId, Engine.CommitId expectedCommitId) { this.expectedCommitId = expectedCommitId; this.shardId = shardId; this.syncId = syncId; @@ -571,7 +570,7 @@ public class SyncedFlushService extends AbstractComponent { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); shardId = ShardId.readShardId(in); - expectedCommitId = in.readByteArray(); + expectedCommitId = new Engine.CommitId(in); syncId = in.readString(); } @@ -579,7 +578,7 @@ public class SyncedFlushService extends AbstractComponent { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); shardId.writeTo(out); - out.writeByteArray(expectedCommitId); + expectedCommitId.writeTo(out); out.writeString(syncId); } @@ -591,7 +590,7 @@ public class SyncedFlushService extends AbstractComponent { return syncId; } - public byte[] expectedCommitId() { + public Engine.CommitId expectedCommitId() { return expectedCommitId; } diff --git a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index fabbfb49f8c..9788e23b4d9 100644 --- a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -41,6 +41,7 @@ import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.LuceneTestCase.SuppressFileSystems; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Base64; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -690,11 +691,12 @@ public class InternalEngineTests extends ElasticsearchTestCase { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); engine.create(new Engine.Create(null, newUid("1"), doc)); - byte[] commitID = engine.flush(); - assertThat(commitID, equalTo(store.readLastCommittedSegmentsInfo().getId())); - byte[] fakeId = commitID.clone(); - fakeId[0] = (byte) ~fakeId[0]; - assertThat("should fail to sync flush with wrong id (but no docs)", engine.syncFlushIfNoPendingChanges(syncId + "1", fakeId), + Engine.CommitId commitID = engine.flush(); + assertThat(commitID, equalTo(new Engine.CommitId(store.readLastCommittedSegmentsInfo().getId()))); + byte[] wrongBytes = Base64.decode(commitID.toString()); + wrongBytes[0] = (byte) ~wrongBytes[0]; + Engine.CommitId wrongId = new Engine.CommitId(wrongBytes); + assertThat("should fail to sync flush with wrong id (but no docs)", engine.syncFlushIfNoPendingChanges(syncId + "1", wrongId), equalTo(Engine.SyncedFlushResult.FAILED_COMMIT_MISMATCH)); engine.create(new Engine.Create(null, newUid("2"), doc)); assertThat("should fail to sync flush with right id but pending doc", engine.syncFlushIfNoPendingChanges(syncId + "2", commitID), @@ -1797,5 +1799,4 @@ public class InternalEngineTests extends ElasticsearchTestCase { recoveredOps.incrementAndGet(); } } - } diff --git a/src/test/java/org/elasticsearch/indices/SycnedFlushSingleNodeTest.java b/src/test/java/org/elasticsearch/indices/SycnedFlushSingleNodeTest.java index ea9d77b0acc..beb55f6ff4e 100644 --- a/src/test/java/org/elasticsearch/indices/SycnedFlushSingleNodeTest.java +++ b/src/test/java/org/elasticsearch/indices/SycnedFlushSingleNodeTest.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ElasticsearchSingleNodeTest; @@ -49,7 +50,7 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state); final List activeShards = shardRoutingTable.activeShards(); assertEquals("exactly one active shard", 1, activeShards.size()); - Map commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId); + Map commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId); assertEquals("exactly one commit id", 1, commitIds.size()); client().prepareIndex("test", "test", "2").setSource("{}").get(); String syncId = Strings.base64UUID(); @@ -171,7 +172,7 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state); final List activeShards = shardRoutingTable.activeShards(); assertEquals("exactly one active shard", 1, activeShards.size()); - Map commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId); + Map commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId); assertEquals("exactly one commit id", 1, commitIds.size()); if (randomBoolean()) { client().prepareIndex("test", "test", "2").setSource("{}").get(); @@ -205,7 +206,7 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state); final List activeShards = shardRoutingTable.activeShards(); assertEquals("exactly one active shard", 1, activeShards.size()); - Map commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId); + Map commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId); assertEquals("exactly one commit id", 1, commitIds.size()); commitIds.clear(); // wipe it... String syncId = Strings.base64UUID();