use imutable object for commit id

This commit is contained in:
Britta Weber 2015-05-17 14:24:15 +02:00
parent 7a8d08a4a3
commit 81e3d5cdcb
7 changed files with 102 additions and 48 deletions

View File

@ -37,8 +37,12 @@ import org.apache.lucene.search.join.BitDocIdSetFilter;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
@ -227,7 +231,7 @@ public abstract class Engine implements Closeable {
* @param expectedCommitId the expected value of
* @return true if the sync commit was made, false o.w.
*/
public abstract SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) throws EngineException;
public abstract SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, CommitId expectedCommitId) throws EngineException;
public enum SyncedFlushResult {
SUCCESS,
@ -458,7 +462,7 @@ public abstract class Engine implements Closeable {
* Otherwise this call will return without blocking.
* @return the commit Id for the resulting commit
*/
public abstract byte[] flush(boolean force, boolean waitIfOngoing) throws EngineException;
public abstract CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException;
/**
* Flushes the state of the engine including the transaction log, clearing memory and persisting
@ -468,7 +472,7 @@ public abstract class Engine implements Closeable {
*
* @return the commit Id for the resulting commit
*/
public abstract byte[] flush() throws EngineException;
public abstract CommitId flush() throws EngineException;
/**
* Optimizes to 1 segment
@ -1141,4 +1145,54 @@ public abstract class Engine implements Closeable {
* @return
*/
public abstract boolean hasUncommittedChanges();
public static class CommitId implements Writeable {
private byte[] id;
public CommitId(byte[] id) {
assert id != null;
this.id = Arrays.copyOf(id, id.length);
}
public CommitId(StreamInput in) throws IOException {
assert in != null;
this.id = in.readByteArray();
}
@Override
public String toString() {
return Base64.encodeBytes(id);
}
@Override
public CommitId readFrom(StreamInput in) throws IOException {
byte[] bytes = in.readByteArray();
return new CommitId(bytes);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeByteArray(id);
}
public boolean idsEqual(byte[] id) {
return Arrays.equals(id, this.id);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CommitId commitId = (CommitId) o;
if (!Arrays.equals(id, commitId.id)) return false;
return true;
}
@Override
public int hashCode() {
return Arrays.hashCode(id);
}
}
}

View File

@ -665,14 +665,14 @@ public class InternalEngine extends Engine {
}
@Override
public SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) throws EngineException {
public SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, CommitId expectedCommitId) throws EngineException {
// best effort attempt before we acquire locks
ensureOpen();
if (indexWriter.hasUncommittedChanges()) {
logger.trace("can't sync commit [{}]. have pending changes", syncId);
return SyncedFlushResult.FAILED_PENDING_OPERATIONS;
}
if (Arrays.equals(expectedCommitId, lastCommittedSegmentInfos.getId()) == false) {
if (expectedCommitId.idsEqual(lastCommittedSegmentInfos.getId()) == false) {
logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId);
return SyncedFlushResult.FAILED_COMMIT_MISMATCH;
}
@ -682,7 +682,7 @@ public class InternalEngine extends Engine {
logger.trace("can't sync commit [{}]. have pending changes", syncId);
return SyncedFlushResult.FAILED_PENDING_OPERATIONS;
}
if (Arrays.equals(expectedCommitId, lastCommittedSegmentInfos.getId()) == false) {
if (expectedCommitId.idsEqual(lastCommittedSegmentInfos.getId()) == false) {
logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId);
return SyncedFlushResult.FAILED_COMMIT_MISMATCH;
}
@ -699,16 +699,16 @@ public class InternalEngine extends Engine {
}
@Override
public byte[] flush() throws EngineException {
public CommitId flush() throws EngineException {
return flush(true, false, false);
}
@Override
public byte[] flush(boolean force, boolean waitIfOngoing) throws EngineException {
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
return flush(true, force, waitIfOngoing);
}
private byte[] flush(boolean commitTranslog, boolean force, boolean waitIfOngoing) throws EngineException {
private CommitId flush(boolean commitTranslog, boolean force, boolean waitIfOngoing) throws EngineException {
ensureOpen();
final byte[] newCommitId;
/*
@ -799,7 +799,7 @@ public class InternalEngine extends Engine {
if (engineConfig.isEnableGcDeletes()) {
pruneDeletedTombstones();
}
return newCommitId;
return new CommitId(newCommitId);
}
private void pruneDeletedTombstones() {

View File

@ -125,17 +125,17 @@ public class ShadowEngine extends Engine {
}
@Override
public SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) {
public SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, CommitId expectedCommitId) {
throw new UnsupportedOperationException(shardId + " sync commit operation not allowed on shadow engine");
}
@Override
public byte[] flush() throws EngineException {
public CommitId flush() throws EngineException {
return flush(false, false);
}
@Override
public byte[] flush(boolean force, boolean waitIfOngoing) throws EngineException {
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
logger.trace("skipping FLUSH on shadow engine");
// reread the last committed segment infos
refresh("flush");
@ -159,7 +159,7 @@ public class ShadowEngine extends Engine {
} finally {
store.decRef();
}
return lastCommittedSegmentInfos.getId();
return new CommitId(lastCommittedSegmentInfos.getId());
}
@Override

View File

@ -115,8 +115,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
@ -693,13 +691,13 @@ public class IndexShard extends AbstractIndexShardComponent {
return completionStats;
}
public Engine.SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) {
public Engine.SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, Engine.CommitId expectedCommitId) {
verifyStartedOrRecovering();
logger.trace("trying to sync flush. sync id [{}]. expected commit id [{}]]", syncId, expectedCommitId);
return engine().syncFlushIfNoPendingChanges(syncId, expectedCommitId);
}
public byte[] flush(FlushRequest request) throws ElasticsearchException {
public Engine.CommitId flush(FlushRequest request) throws ElasticsearchException {
boolean waitIfOngoing = request.waitIfOngoing();
boolean force = request.force();
if (logger.isTraceEnabled()) {
@ -711,7 +709,7 @@ public class IndexShard extends AbstractIndexShardComponent {
verifyStartedOrRecovering();
long time = System.nanoTime();
byte[] commitId = engine().flush(force, waitIfOngoing);
Engine.CommitId commitId = engine().flush(force, waitIfOngoing);
flushMetric.inc(System.nanoTime() - time);
return commitId;
@ -1385,4 +1383,5 @@ public class IndexShard extends AbstractIndexShardComponent {
public Translog.Durabilty getTranslogDurability() {
return engine().getTranslog().getDurabilty();
}
}

View File

@ -49,7 +49,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@ -137,7 +136,7 @@ public class SyncedFlushService extends AbstractComponent {
final ClusterState state = clusterService.state();
final IndexShardRoutingTable shardRoutingTable = getActiveShardRoutings(shardId, state);
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
Map<String, byte[]> commitIds = sendPreSyncRequests(activeShards, state, shardId);
Map<String, Engine.CommitId> commitIds = sendPreSyncRequests(activeShards, state, shardId);
if (commitIds.isEmpty()) {
actionListener.onResponse(new SyncedFlushResult(shardId, "all shards failed to commit on pre-sync"));
@ -221,7 +220,7 @@ public class SyncedFlushService extends AbstractComponent {
}
void sendSyncRequests(final String syncId, List<ShardRouting> shards, ClusterState state, Map<String, byte[]> expectedCommitIds, final ShardId shardId, final ActionListener<SyncedFlushResult> listener) {
void sendSyncRequests(final String syncId, List<ShardRouting> shards, ClusterState state, Map<String, Engine.CommitId> expectedCommitIds, final ShardId shardId, final ActionListener<SyncedFlushResult> listener) {
final CountDown countDownLatch = new CountDown(shards.size());
final Map<ShardRouting, SyncedFlushResponse> results = ConcurrentCollections.newConcurrentMap();
for (final ShardRouting shard : shards) {
@ -234,7 +233,7 @@ public class SyncedFlushService extends AbstractComponent {
}
continue;
}
final byte[] expectedCommitId = expectedCommitIds.get(shard.currentNodeId());
final Engine.CommitId expectedCommitId = expectedCommitIds.get(shard.currentNodeId());
if (expectedCommitId == null) {
logger.trace("{} can't resolve expected commit id for {}, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
results.put(shard, new SyncedFlushResponse("no commit id from pre-sync flush"));
@ -282,9 +281,9 @@ public class SyncedFlushService extends AbstractComponent {
/**
* send presync requests to all started copies of the given shard
*/
Map<String, byte[]> sendPreSyncRequests(final List<ShardRouting> shards, final ClusterState state, final ShardId shardId) {
Map<String, Engine.CommitId> sendPreSyncRequests(final List<ShardRouting> shards, final ClusterState state, final ShardId shardId) {
final CountDownLatch countDownLatch = new CountDownLatch(shards.size());
final Map<String, byte[]> commitIds = ConcurrentCollections.newConcurrentMap();
final Map<String, Engine.CommitId> commitIds = ConcurrentCollections.newConcurrentMap();
for (final ShardRouting shard : shards) {
logger.trace("{} sending pre-synced flush request to {}", shardId, shard);
final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
@ -301,7 +300,7 @@ public class SyncedFlushService extends AbstractComponent {
@Override
public void handleResponse(PreSyncedFlushResponse response) {
byte[] existing = commitIds.put(node.id(), response.commitId());
Engine.CommitId existing = commitIds.put(node.id(), response.commitId());
assert existing == null : "got two answers for node [" + node + "]";
// count after the assert so we won't decrement twice in handleException
countDownLatch.countDown();
@ -334,9 +333,9 @@ public class SyncedFlushService extends AbstractComponent {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true);
logger.trace("{} performing pre sync flush", request.shardId());
byte[] id = indexShard.flush(flushRequest);
logger.trace("{} pre sync flush done. commit id {}", request.shardId(), id);
return new PreSyncedFlushResponse(id);
Engine.CommitId commitId = indexShard.flush(flushRequest);
logger.trace("{} pre sync flush done. commit id {}", request.shardId(), commitId);
return new PreSyncedFlushResponse(commitId);
}
private SyncedFlushResponse performSyncedFlush(SyncedFlushRequest request) {
@ -526,42 +525,42 @@ public class SyncedFlushService extends AbstractComponent {
*/
final static class PreSyncedFlushResponse extends TransportResponse {
private byte[] commitId;
Engine.CommitId commitId;
PreSyncedFlushResponse() {
}
PreSyncedFlushResponse(byte[] commitId) {
PreSyncedFlushResponse(Engine.CommitId commitId) {
this.commitId = commitId;
}
public byte[] commitId() {
public Engine.CommitId commitId() {
return commitId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
commitId = in.readByteArray();
commitId = new Engine.CommitId(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByteArray(commitId);
commitId.writeTo(out);
}
}
static final class SyncedFlushRequest extends TransportRequest {
private String syncId;
private byte[] expectedCommitId;
private Engine.CommitId expectedCommitId;
private ShardId shardId;
public SyncedFlushRequest() {
}
public SyncedFlushRequest(ShardId shardId, String syncId, byte[] expectedCommitId) {
public SyncedFlushRequest(ShardId shardId, String syncId, Engine.CommitId expectedCommitId) {
this.expectedCommitId = expectedCommitId;
this.shardId = shardId;
this.syncId = syncId;
@ -571,7 +570,7 @@ public class SyncedFlushService extends AbstractComponent {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
expectedCommitId = in.readByteArray();
expectedCommitId = new Engine.CommitId(in);
syncId = in.readString();
}
@ -579,7 +578,7 @@ public class SyncedFlushService extends AbstractComponent {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
out.writeByteArray(expectedCommitId);
expectedCommitId.writeTo(out);
out.writeString(syncId);
}
@ -591,7 +590,7 @@ public class SyncedFlushService extends AbstractComponent {
return syncId;
}
public byte[] expectedCommitId() {
public Engine.CommitId expectedCommitId() {
return expectedCommitId;
}

View File

@ -41,6 +41,7 @@ import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase.SuppressFileSystems;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@ -690,11 +691,12 @@ public class InternalEngineTests extends ElasticsearchTestCase {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.create(new Engine.Create(null, newUid("1"), doc));
byte[] commitID = engine.flush();
assertThat(commitID, equalTo(store.readLastCommittedSegmentsInfo().getId()));
byte[] fakeId = commitID.clone();
fakeId[0] = (byte) ~fakeId[0];
assertThat("should fail to sync flush with wrong id (but no docs)", engine.syncFlushIfNoPendingChanges(syncId + "1", fakeId),
Engine.CommitId commitID = engine.flush();
assertThat(commitID, equalTo(new Engine.CommitId(store.readLastCommittedSegmentsInfo().getId())));
byte[] wrongBytes = Base64.decode(commitID.toString());
wrongBytes[0] = (byte) ~wrongBytes[0];
Engine.CommitId wrongId = new Engine.CommitId(wrongBytes);
assertThat("should fail to sync flush with wrong id (but no docs)", engine.syncFlushIfNoPendingChanges(syncId + "1", wrongId),
equalTo(Engine.SyncedFlushResult.FAILED_COMMIT_MISMATCH));
engine.create(new Engine.Create(null, newUid("2"), doc));
assertThat("should fail to sync flush with right id but pending doc", engine.syncFlushIfNoPendingChanges(syncId + "2", commitID),
@ -1797,5 +1799,4 @@ public class InternalEngineTests extends ElasticsearchTestCase {
recoveredOps.incrementAndGet();
}
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
@ -49,7 +50,7 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state);
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
assertEquals("exactly one active shard", 1, activeShards.size());
Map<String, byte[]> commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId);
Map<String, Engine.CommitId> commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId);
assertEquals("exactly one commit id", 1, commitIds.size());
client().prepareIndex("test", "test", "2").setSource("{}").get();
String syncId = Strings.base64UUID();
@ -171,7 +172,7 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state);
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
assertEquals("exactly one active shard", 1, activeShards.size());
Map<String, byte[]> commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId);
Map<String, Engine.CommitId> commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId);
assertEquals("exactly one commit id", 1, commitIds.size());
if (randomBoolean()) {
client().prepareIndex("test", "test", "2").setSource("{}").get();
@ -205,7 +206,7 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state);
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
assertEquals("exactly one active shard", 1, activeShards.size());
Map<String, byte[]> commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId);
Map<String, Engine.CommitId> commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId);
assertEquals("exactly one commit id", 1, commitIds.size());
commitIds.clear(); // wipe it...
String syncId = Strings.base64UUID();