Synced-flush should not seal index of out of sync replicas (#28464)

Today the correctness of synced-flush is guaranteed by ensuring that
there are no ongoing indexing operations on the primary. Unfortunately, a
replica might fall out of sync with the primary even when that condition is
met. Moreover, if synced-flush mistakenly issues a sync_id for an
out-of-sync replica, that replica will not be able to recover from the
primary: ES prevents the peer-recovery because it detects that the
primary and replica indexes were sealed with the same sync_id but
have different content. This commit modifies synced-flush so that it does
not issue a sync_id to out-of-sync replicas. This change reports the
divergence to users earlier and prevents replicas from getting
into the "unrecoverable" state.

Relates #10032
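
In essence, the pre-sync flush now reports each copy's document count alongside its commit id, and the sync step withholds the sync_id from any copy whose count disagrees with the primary's. A minimal standalone sketch of that guard, with illustrative names rather than the actual ES types (the real change lives in SyncedFlushService below):

// Hypothetical stand-ins for SyncedFlushService.PreSyncedFlushResponse and its guard.
final class PreSyncResult {
    static final int UNKNOWN_NUM_DOCS = -1; // copy did not report a count (e.g. an older node)
    final int numDocs;                      // docs in the commit created by the pre-sync flush

    PreSyncResult(int numDocs) {
        this.numDocs = numDocs;
    }

    // A copy may be sealed with the sync_id only if its commit holds the same
    // number of docs as the primary's; unknown counts fall back to the old behavior.
    static boolean mayIssueSyncId(PreSyncResult copy, int numDocsOnPrimary) {
        return copy.numDocs == numDocsOnPrimary
            || copy.numDocs == UNKNOWN_NUM_DOCS
            || numDocsOnPrimary == UNKNOWN_NUM_DOCS;
    }
}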
Nhat Nguyen 2018-02-02 11:20:38 -05:00 committed by GitHub
parent 075fdc579f
commit 5f2121960e
6 changed files with 145 additions and 40 deletions

CommitStats.java

@@ -69,6 +69,13 @@ public final class CommitStats implements Streamable, ToXContentFragment {
         return id;
     }
 
+    /**
+     * A raw version of the commit id (see {@link SegmentInfos#getId()})
+     */
+    public Engine.CommitId getRawCommitId() {
+        return new Engine.CommitId(Base64.getDecoder().decode(id));
+    }
+
     /**
      * Returns the number of documents in this commit
      */

SyncedFlushService.java

@@ -21,6 +21,7 @@ package org.elasticsearch.indices.flush;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.util.Supplier;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
@@ -44,6 +45,7 @@ import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
@@ -199,10 +201,10 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
                 return;
             }
-            final ActionListener<Map<String, Engine.CommitId>> commitIdsListener = new ActionListener<Map<String, Engine.CommitId>>() {
+            final ActionListener<Map<String, PreSyncedFlushResponse>> presyncListener = new ActionListener<Map<String, PreSyncedFlushResponse>>() {
                 @Override
-                public void onResponse(final Map<String, Engine.CommitId> commitIds) {
-                    if (commitIds.isEmpty()) {
+                public void onResponse(final Map<String, PreSyncedFlushResponse> presyncResponses) {
+                    if (presyncResponses.isEmpty()) {
                         actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "all shards failed to commit on pre-sync"));
                         return;
                     }
@@ -216,7 +218,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
                     } else {
                         // 3. now send the sync request to all the shards
                         String syncId = UUIDs.randomBase64UUID();
-                        sendSyncRequests(syncId, activeShards, state, commitIds, shardId, totalShards, actionListener);
+                        sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener);
                     }
                 }
@@ -236,7 +238,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
             };
             // 1. send pre-sync flushes to all replicas
-            sendPreSyncRequests(activeShards, state, shardId, commitIdsListener);
+            sendPreSyncRequests(activeShards, state, shardId, presyncListener);
         } catch (Exception e) {
             actionListener.onFailure(e);
         }
@@ -299,28 +301,49 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
         }
     }
 
+    private int numDocsOnPrimary(List<ShardRouting> shards, Map<String, PreSyncedFlushResponse> preSyncResponses) {
+        for (ShardRouting shard : shards) {
+            if (shard.primary()) {
+                final PreSyncedFlushResponse resp = preSyncResponses.get(shard.currentNodeId());
+                if (resp != null) {
+                    return resp.numDocs;
+                }
+            }
+        }
+        return PreSyncedFlushResponse.UNKNOWN_NUM_DOCS;
+    }
+
-    void sendSyncRequests(final String syncId, final List<ShardRouting> shards, ClusterState state, Map<String, Engine.CommitId> expectedCommitIds,
+    void sendSyncRequests(final String syncId, final List<ShardRouting> shards, ClusterState state, Map<String, PreSyncedFlushResponse> preSyncResponses,
                           final ShardId shardId, final int totalShards, final ActionListener<ShardsSyncedFlushResult> listener) {
         final CountDown countDown = new CountDown(shards.size());
         final Map<ShardRouting, ShardSyncedFlushResponse> results = ConcurrentCollections.newConcurrentMap();
+        final int numDocsOnPrimary = numDocsOnPrimary(shards, preSyncResponses);
         for (final ShardRouting shard : shards) {
             final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
             if (node == null) {
                 logger.trace("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
                 results.put(shard, new ShardSyncedFlushResponse("unknown node"));
-                contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
+                countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                 continue;
             }
-            final Engine.CommitId expectedCommitId = expectedCommitIds.get(shard.currentNodeId());
-            if (expectedCommitId == null) {
+            final PreSyncedFlushResponse preSyncedResponse = preSyncResponses.get(shard.currentNodeId());
+            if (preSyncedResponse == null) {
                 logger.trace("{} can't resolve expected commit id for current node, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
                 results.put(shard, new ShardSyncedFlushResponse("no commit id from pre-sync flush"));
-                contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
+                countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                 continue;
             }
+            if (preSyncedResponse.numDocs != numDocsOnPrimary
+                && preSyncedResponse.numDocs != PreSyncedFlushResponse.UNKNOWN_NUM_DOCS && numDocsOnPrimary != PreSyncedFlushResponse.UNKNOWN_NUM_DOCS) {
+                logger.warn("{} can't issue sync id [{}] for out of sync replica [{}] with num docs [{}]; num docs on primary [{}]",
+                    shardId, syncId, shard, preSyncedResponse.numDocs, numDocsOnPrimary);
+                results.put(shard, new ShardSyncedFlushResponse("out of sync replica; " +
+                    "num docs on replica [" + preSyncedResponse.numDocs + "]; num docs on primary [" + numDocsOnPrimary + "]"));
+                countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
+                continue;
+            }
             logger.trace("{} sending synced flush request to {}. sync id [{}].", shardId, shard, syncId);
-            transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new ShardSyncedFlushRequest(shard.shardId(), syncId, expectedCommitId),
+            transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new ShardSyncedFlushRequest(shard.shardId(), syncId, preSyncedResponse.commitId),
                 new TransportResponseHandler<ShardSyncedFlushResponse>() {
                     @Override
                     public ShardSyncedFlushResponse newInstance() {
@@ -332,14 +355,14 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
                         ShardSyncedFlushResponse existing = results.put(shard, response);
                         assert existing == null : "got two answers for node [" + node + "]";
                         // count after the assert so we won't decrement twice in handleException
-                        contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
+                        countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                     }
 
                     @Override
                     public void handleException(TransportException exp) {
                         logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} error while performing synced flush on [{}], skipping", shardId, shard), exp);
                         results.put(shard, new ShardSyncedFlushResponse(exp.getMessage()));
-                        contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
+                        countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                     }
 
                     @Override
@@ -351,7 +374,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
     }
 
-    private void contDownAndSendResponseIfDone(String syncId, List<ShardRouting> shards, ShardId shardId, int totalShards,
+    private void countDownAndSendResponseIfDone(String syncId, List<ShardRouting> shards, ShardId shardId, int totalShards,
                                                ActionListener<ShardsSyncedFlushResult> listener, CountDown countDown, Map<ShardRouting, ShardSyncedFlushResponse> results) {
         if (countDown.countDown()) {
             assert results.size() == shards.size();
@@ -362,16 +385,16 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
     /**
      * send presync requests to all started copies of the given shard
      */
-    void sendPreSyncRequests(final List<ShardRouting> shards, final ClusterState state, final ShardId shardId, final ActionListener<Map<String, Engine.CommitId>> listener) {
+    void sendPreSyncRequests(final List<ShardRouting> shards, final ClusterState state, final ShardId shardId, final ActionListener<Map<String, PreSyncedFlushResponse>> listener) {
         final CountDown countDown = new CountDown(shards.size());
-        final ConcurrentMap<String, Engine.CommitId> commitIds = ConcurrentCollections.newConcurrentMap();
+        final ConcurrentMap<String, PreSyncedFlushResponse> presyncResponses = ConcurrentCollections.newConcurrentMap();
         for (final ShardRouting shard : shards) {
             logger.trace("{} sending pre-synced flush request to {}", shardId, shard);
             final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
             if (node == null) {
                 logger.trace("{} shard routing {} refers to an unknown node. skipping.", shardId, shard);
                 if (countDown.countDown()) {
-                    listener.onResponse(commitIds);
+                    listener.onResponse(presyncResponses);
                 }
                 continue;
             }
@@ -383,11 +406,11 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
                 @Override
                 public void handleResponse(PreSyncedFlushResponse response) {
-                    Engine.CommitId existing = commitIds.putIfAbsent(node.getId(), response.commitId());
+                    PreSyncedFlushResponse existing = presyncResponses.putIfAbsent(node.getId(), response);
                     assert existing == null : "got two answers for node [" + node + "]";
                     // count after the assert so we won't decrement twice in handleException
                     if (countDown.countDown()) {
-                        listener.onResponse(commitIds);
+                        listener.onResponse(presyncResponses);
                     }
                 }
@@ -395,7 +418,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
                 public void handleException(TransportException exp) {
                     logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} error while performing pre synced flush on [{}], skipping", shardId, shard), exp);
                     if (countDown.countDown()) {
-                        listener.onResponse(commitIds);
+                        listener.onResponse(presyncResponses);
                     }
                 }
@@ -411,9 +434,11 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
         IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
         FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true);
         logger.trace("{} performing pre sync flush", request.shardId());
-        Engine.CommitId commitId = indexShard.flush(flushRequest);
-        logger.trace("{} pre sync flush done. commit id {}", request.shardId(), commitId);
-        return new PreSyncedFlushResponse(commitId);
+        indexShard.flush(flushRequest);
+        final CommitStats commitStats = indexShard.commitStats();
+        final Engine.CommitId commitId = commitStats.getRawCommitId();
+        logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs());
+        return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs());
     }
 
     private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {
@@ -483,30 +508,45 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
      * Response for first step of synced flush (flush) for one shard copy
      */
     static final class PreSyncedFlushResponse extends TransportResponse {
+        static final int UNKNOWN_NUM_DOCS = -1;
 
         Engine.CommitId commitId;
+        int numDocs;
 
         PreSyncedFlushResponse() {
         }
 
-        PreSyncedFlushResponse(Engine.CommitId commitId) {
+        PreSyncedFlushResponse(Engine.CommitId commitId, int numDocs) {
             this.commitId = commitId;
+            this.numDocs = numDocs;
         }
 
-        public Engine.CommitId commitId() {
+        Engine.CommitId commitId() {
             return commitId;
         }
 
+        int numDocs() {
+            return numDocs;
+        }
+
         @Override
         public void readFrom(StreamInput in) throws IOException {
             super.readFrom(in);
             commitId = new Engine.CommitId(in);
+            if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+                numDocs = in.readInt();
+            } else {
+                numDocs = UNKNOWN_NUM_DOCS;
+            }
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
             commitId.writeTo(out);
+            if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+                out.writeInt(numDocs);
+            }
         }
     }
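
The version gate above is what keeps the check safe in a mixed-version cluster: a pre-7.0.0-alpha1 node never writes the numDocs field, so the reader substitutes UNKNOWN_NUM_DOCS and sendSyncRequests skips the comparison instead of treating the missing count as a divergence. A minimal sketch of the pattern, using a hypothetical VersionedInput in place of Elasticsearch's StreamInput:

// Hypothetical sketch of the backward-compatible wire read; VersionedInput and
// V_WITH_NUM_DOCS stand in for StreamInput and Version.V_7_0_0_alpha1.
interface VersionedInput {
    int senderVersion(); // version of the node that wrote the bytes
    int readInt();
}

final class PreSyncPayload {
    static final int UNKNOWN_NUM_DOCS = -1;
    static final int V_WITH_NUM_DOCS = 7_000_099; // illustrative version id only

    int numDocs;

    void read(VersionedInput in) {
        // Older senders never wrote the field; mark the count unknown so the
        // out-of-sync check is skipped rather than comparing against garbage.
        numDocs = in.senderVersion() >= V_WITH_NUM_DOCS ? in.readInt() : UNKNOWN_NUM_DOCS;
    }
}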

InternalEngineTests.java

@@ -602,9 +602,10 @@ public class InternalEngineTests extends EngineTestCase {
             globalCheckpoint.set(rarely() || localCheckpoint.get() == SequenceNumbers.NO_OPS_PERFORMED ?
                 SequenceNumbers.UNASSIGNED_SEQ_NO : randomIntBetween(0, (int) localCheckpoint.get()));
-            engine.flush(true, true);
+            final Engine.CommitId commitId = engine.flush(true, true);
             CommitStats stats2 = engine.commitStats();
+            assertThat(stats2.getRawCommitId(), equalTo(commitId));
             assertThat(stats2.getGeneration(), greaterThan(stats1.getGeneration()));
             assertThat(stats2.getId(), notNullValue());
             assertThat(stats2.getId(), not(equalTo(stats1.getId())));

FlushIT.java

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.indices.flush;
 
+import org.apache.lucene.index.Term;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushResponse;
 import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
@@ -35,7 +36,13 @@ import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.InternalEngineTests;
+import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.mapper.Uid;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.test.ESIntegTestCase;
 
 import java.io.IOException;
@@ -47,9 +54,12 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
 
 public class FlushIT extends ESIntegTestCase {
     public void testWaitIfOngoing() throws InterruptedException {
@@ -224,4 +234,50 @@ public class FlushIT extends ESIntegTestCase {
         assertThat(shardsResult.size(), equalTo(numShards));
         assertThat(shardsResult.get(0).failureReason(), equalTo("no active shards"));
     }
+
+    private void indexDoc(Engine engine, String id) throws IOException {
+        final ParsedDocument doc = InternalEngineTests.createParsedDoc(id, null);
+        final Engine.IndexResult indexResult = engine.index(new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), doc));
+        assertThat(indexResult.getFailure(), nullValue());
+    }
+
+    public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
+        internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
+        final int numberOfReplicas = internalCluster().numDataNodes() - 1;
+        assertAcked(
+            prepareCreate("test").setSettings(Settings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)).get()
+        );
+        ensureGreen();
+        final Index index = clusterService().state().metaData().index("test").getIndex();
+        final ShardId shardId = new ShardId(index, 0);
+        final int numDocs = between(1, 10);
+        for (int i = 0; i < numDocs; i++) {
+            index("test", "doc", Integer.toString(i));
+        }
+        final List<IndexShard> indexShards = internalCluster().nodesInclude("test").stream()
+            .map(node -> internalCluster().getInstance(IndicesService.class, node).getShardOrNull(shardId))
+            .collect(Collectors.toList());
+        // Index extra documents to one replica - synced-flush should fail on that replica.
+        final IndexShard outOfSyncReplica = randomValueOtherThanMany(s -> s.routingEntry().primary(), () -> randomFrom(indexShards));
+        final int extraDocs = between(1, 10);
+        for (int i = 0; i < extraDocs; i++) {
+            indexDoc(IndexShardTestCase.getEngine(outOfSyncReplica), "extra_" + i);
+        }
+        final ShardsSyncedFlushResult partialResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
+        assertThat(partialResult.totalShards(), equalTo(numberOfReplicas + 1));
+        assertThat(partialResult.successfulShards(), equalTo(numberOfReplicas));
+        assertThat(partialResult.shardResponses().get(outOfSyncReplica.routingEntry()).failureReason, equalTo(
+            "out of sync replica; num docs on replica [" + (numDocs + extraDocs) + "]; num docs on primary [" + numDocs + "]"));
+        // Index extra documents to all shards - synced-flush should be ok.
+        for (IndexShard indexShard : indexShards) {
+            for (int i = 0; i < extraDocs; i++) {
+                indexDoc(IndexShardTestCase.getEngine(indexShard), "extra_" + i);
+            }
+        }
+        final ShardsSyncedFlushResult fullResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
+        assertThat(fullResult.totalShards(), equalTo(numberOfReplicas + 1));
+        assertThat(fullResult.successfulShards(), equalTo(numberOfReplicas + 1));
+    }
 }

SyncedFlushSingleNodeTests.java

@@ -38,6 +38,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
@@ -53,12 +54,12 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
         final IndexShardRoutingTable shardRoutingTable = flushService.getShardRoutingTable(shardId, state);
         final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
         assertEquals("exactly one active shard", 1, activeShards.size());
-        Map<String, Engine.CommitId> commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
-        assertEquals("exactly one commit id", 1, commitIds.size());
+        Map<String, SyncedFlushService.PreSyncedFlushResponse> preSyncedResponses = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
+        assertEquals("exactly one commit id", 1, preSyncedResponses.size());
         client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON).get();
         String syncId = UUIDs.randomBase64UUID();
         SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener<>();
-        flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener);
+        flushService.sendSyncRequests(syncId, activeShards, state, preSyncedResponses, shardId, shardRoutingTable.size(), listener);
         listener.latch.await();
         assertNull(listener.error);
         ShardsSyncedFlushResult syncedFlushResult = listener.result;
@@ -72,7 +73,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
         SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); // pull another commit and make sure we can't sync-flush with the old one
         listener = new SyncedFlushUtil.LatchedListener();
-        flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener);
+        flushService.sendSyncRequests(syncId, activeShards, state, preSyncedResponses, shardId, shardRoutingTable.size(), listener);
         listener.latch.await();
         assertNull(listener.error);
         syncedFlushResult = listener.result;
@@ -172,15 +173,15 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
         final IndexShardRoutingTable shardRoutingTable = flushService.getShardRoutingTable(shardId, state);
         final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
         assertEquals("exactly one active shard", 1, activeShards.size());
-        Map<String, Engine.CommitId> commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
-        assertEquals("exactly one commit id", 1, commitIds.size());
+        Map<String, SyncedFlushService.PreSyncedFlushResponse> preSyncedResponses = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
+        assertEquals("exactly one commit id", 1, preSyncedResponses.size());
         if (randomBoolean()) {
             client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON).get();
         }
         client().admin().indices().prepareFlush("test").setForce(true).get();
         String syncId = UUIDs.randomBase64UUID();
         final SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener();
-        flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener);
+        flushService.sendSyncRequests(syncId, activeShards, state, preSyncedResponses, shardId, shardRoutingTable.size(), listener);
         listener.latch.await();
         assertNull(listener.error);
         ShardsSyncedFlushResult syncedFlushResult = listener.result;
@@ -205,12 +206,12 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
         final IndexShardRoutingTable shardRoutingTable = flushService.getShardRoutingTable(shardId, state);
         final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
         assertEquals("exactly one active shard", 1, activeShards.size());
-        Map<String, Engine.CommitId> commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
-        assertEquals("exactly one commit id", 1, commitIds.size());
-        commitIds.clear(); // wipe it...
+        Map<String, SyncedFlushService.PreSyncedFlushResponse> preSyncedResponses = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
+        assertEquals("exactly one commit id", 1, preSyncedResponses.size());
+        preSyncedResponses.clear(); // wipe it...
         String syncId = UUIDs.randomBase64UUID();
         SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener();
-        flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener);
+        flushService.sendSyncRequests(syncId, activeShards, state, preSyncedResponses, shardId, shardRoutingTable.size(), listener);
         listener.latch.await();
         assertNull(listener.error);
         ShardsSyncedFlushResult syncedFlushResult = listener.result;

SyncedFlushUtil.java

@@ -76,8 +76,8 @@ public class SyncedFlushUtil {
     /**
      * Blocking version of {@link SyncedFlushService#sendPreSyncRequests(List, ClusterState, ShardId, ActionListener)}
      */
-    public static Map<String, Engine.CommitId> sendPreSyncRequests(SyncedFlushService service, List<ShardRouting> activeShards, ClusterState state, ShardId shardId) {
-        LatchedListener<Map<String, Engine.CommitId>> listener = new LatchedListener<>();
+    public static Map<String, SyncedFlushService.PreSyncedFlushResponse> sendPreSyncRequests(SyncedFlushService service, List<ShardRouting> activeShards, ClusterState state, ShardId shardId) {
+        LatchedListener<Map<String, SyncedFlushService.PreSyncedFlushResponse>> listener = new LatchedListener<>();
         service.sendPreSyncRequests(activeShards, state, shardId, listener);
         try {
             listener.latch.await();