Cherry pick tests and seqNo recovery hardening from #27580

Boaz Leskes 2017-11-30 15:07:06 +01:00
parent b44ae25c27
commit 1a976ea7a4
11 changed files with 381 additions and 104 deletions

View File

@ -426,11 +426,6 @@ public class InternalEngine extends Engine {
} else if (translog.isCurrent(translogGeneration) == false) {
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
refreshLastCommittedSegmentInfos();
} else if (lastCommittedSegmentInfos.getUserData().containsKey(HISTORY_UUID_KEY) == false) {
assert historyUUID != null;
// put the history uuid into the index
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
refreshLastCommittedSegmentInfos();
}
// clean up what's not needed
translog.trimUnreferencedReaders();

View File

@ -48,7 +48,6 @@ import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
@ -66,7 +65,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AsyncIOProcessor;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexModule;
@ -416,12 +414,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
logger.debug("failed to refresh due to move to cluster wide started", e);
}
if (newRouting.primary()) {
final DiscoveryNode recoverySourceNode = recoveryState.getSourceNode();
if (currentRouting.isRelocationTarget() == false || recoverySourceNode.getVersion().before(Version.V_6_0_0_alpha1)) {
// there was no primary context hand-off in < 6.0.0, need to manually activate the shard
getEngine().seqNoService().activatePrimaryMode(getEngine().seqNoService().getLocalCheckpoint());
}
if (newRouting.primary() && currentRouting.isRelocationTarget() == false) {
// there was no primary context hand-off in < 6.0.0, need to manually activate the shard
getEngine().seqNoService().activatePrimaryMode(getEngine().seqNoService().getLocalCheckpoint());
}
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
@ -485,15 +480,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* subsequently fails before the primary/replica re-sync completes successfully and we are now being
* promoted, the local checkpoint tracker here could be left in a state where it would re-issue sequence
* numbers. To ensure that this is not the case, we restore the state of the local checkpoint tracker by
* replaying the translog and marking any operations there as completed. Rolling the translog generation is
* not strictly needed here (as we will never have collisions between sequence numbers in a translog
* generation in a new primary as it takes the last known sequence number as a starting point), but it
* simplifies reasoning about the relationship between primary terms and translog generations.
* replaying the translog and marking any operations there as completed.
*/
getEngine().rollTranslogGeneration();
getEngine().restoreLocalCheckpointFromTranslog();
getEngine().fillSeqNoGaps(newPrimaryTerm);
getEngine().seqNoService().updateLocalCheckpointForShard(currentRouting.allocationId().getId(),
final Engine engine = getEngine();
engine.restoreLocalCheckpointFromTranslog();
/* Rolling the translog generation is not strictly needed here (as we will never have collisions between
* sequence numbers in a translog generation in a new primary as it takes the last known sequence number
* as a starting point), but it simplifies reasoning about the relationship between primary terms and
* translog generations.
*/
engine.rollTranslogGeneration();
engine.fillSeqNoGaps(newPrimaryTerm);
engine.seqNoService().updateLocalCheckpointForShard(currentRouting.allocationId().getId(),
getEngine().seqNoService().getLocalCheckpoint());
primaryReplicaSyncer.accept(this, new ActionListener<ResyncTask>() {
@Override
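The hunk above restores the tracker state by replaying the translog and then fills any sequence-number gaps under the new primary term. As a minimal illustration of why this stops a freshly promoted primary from re-issuing sequence numbers, here is a standalone sketch that is not part of this commit; the seq# values are invented, and it only relies on LocalCheckpointTracker methods already used elsewhere in this change (NO_OPS_PERFORMED is assumed to be the usual -1 sentinel).

import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;

public class PromotionCheckpointSketch {
    public static void main(String[] args) {
        // start from an empty tracker, as a shard would before replaying its translog
        final LocalCheckpointTracker tracker =
            new LocalCheckpointTracker(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED);
        // replaying the translog marks every operation it holds as completed; op 5 is missing here
        for (final long seqNo : new long[] {0, 1, 2, 3, 4, 6}) {
            tracker.markSeqNoAsCompleted(seqNo);
        }
        assert tracker.getCheckpoint() == 4;  // the gap at 5 holds the checkpoint back
        // fillSeqNoGaps effectively does this by indexing a no-op under the new primary term
        tracker.markSeqNoAsCompleted(5);
        assert tracker.getCheckpoint() == 6;  // nothing at or below 6 can be handed out again
    }
}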
@ -1337,6 +1335,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
active.set(true);
newEngine.recoverFromTranslog();
}
assertSequenceNumbersInCommit();
}
private boolean assertSequenceNumbersInCommit() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contain a local checkpoint";
assert userData.containsKey(SequenceNumbers.MAX_SEQ_NO) : "commit point doesn't contain a maximum sequence number";
assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contain a history uuid";
assert userData.get(Engine.HISTORY_UUID_KEY).equals(getHistoryUUID()) : "commit point history uuid ["
+ userData.get(Engine.HISTORY_UUID_KEY) + "] is different from the engine's [" + getHistoryUUID() + "]";
return true;
}
private boolean assertMaxUnsafeAutoIdInCommit() throws IOException {

View File

@ -149,12 +149,13 @@ public class RecoverySourceHandler {
final Translog translog = shard.getTranslog();
final long startingSeqNo;
final long requiredSeqNoRangeStart;
final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery();
if (isSequenceNumberBasedRecoveryPossible) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
startingSeqNo = request.startingSeqNo();
requiredSeqNoRangeStart = startingSeqNo;
} else {
final Engine.IndexCommitRef phase1Snapshot;
try {
@ -162,10 +163,12 @@ public class RecoverySourceHandler {
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
// we set this to unassigned to create a translog roughly according to the retention policy
// on the target
startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
// we set this to 0 to create a translog roughly according to the retention policy
// on the target. Note that it will still filter out legacy operations with no sequence numbers
startingSeqNo = 0;
// but we must have everything above the local checkpoint in the commit
requiredSeqNoRangeStart =
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
try {
phase1(phase1Snapshot.getIndexCommit(), translog::totalOperations);
} catch (final Exception e) {
@ -178,6 +181,9 @@ public class RecoverySourceHandler {
}
}
}
assert startingSeqNo >= 0 : "startingSeqNo must be non-negative. got: " + startingSeqNo;
assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than ["
+ startingSeqNo + "]";
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));
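For a concrete sense of how the two branches above pick their bounds when falling back to file-based recovery, here is a self-contained sketch that is not part of this commit; the checkpoint value of 57 is invented, and the map stands in for phase1Snapshot.getIndexCommit().getUserData().

import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.index.seqno.SequenceNumbers;

public class RequiredRangeSketch {
    public static void main(String[] args) {
        // stand-in for the user data of the Lucene commit captured by the phase1 snapshot
        final Map<String, String> commitUserData = new HashMap<>();
        commitUserData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, "57");

        final long startingSeqNo = 0; // ship everything the translog retention policy kept
        final long requiredSeqNoRangeStart =
            Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; // 58

        // the commit only guarantees history up to its local checkpoint (57), so every
        // operation from 58 up to the ending sequence number must be replayed on the target
        assert requiredSeqNoRangeStart >= startingSeqNo;
        System.out.println("required range starts at " + requiredSeqNoRangeStart);
    }
}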
@ -187,10 +193,19 @@ public class RecoverySourceHandler {
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
/*
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
* operations in the required range will be available for replaying from the translog of the source.
*/
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
logger.trace("snapshot translog for recovery; current size is [{}]", translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
final long targetLocalCheckpoint;
try(Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(startingSeqNo)) {
targetLocalCheckpoint = phase2(startingSeqNo, snapshot);
targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}
@ -224,7 +239,8 @@ public class RecoverySourceHandler {
/**
* Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source
* translog contains all operations between the local checkpoint on the target and the current maximum sequence number on the source.
* translog contains all operations above the local checkpoint on the target. We already know that the translog contains, or will
* contain, all ops above the source local checkpoint, so we can stop checking there.
*
* @return {@code true} if the source is ready for a sequence-number-based recovery
* @throws IOException if an I/O exception occurred reading the translog snapshot
@ -232,18 +248,10 @@ public class RecoverySourceHandler {
boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {
final long startingSeqNo = request.startingSeqNo();
assert startingSeqNo >= 0;
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, endingSeqNo);
final long localCheckpoint = shard.getLocalCheckpoint();
logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, localCheckpoint);
// the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one
if (startingSeqNo - 1 <= endingSeqNo) {
/*
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
* operations in the required range will be available for replaying from the translog of the source.
*/
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo);
if (startingSeqNo - 1 <= localCheckpoint) {
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
Translog.Operation operation;
@ -253,7 +261,7 @@ public class RecoverySourceHandler {
}
}
}
return tracker.getCheckpoint() >= endingSeqNo;
return tracker.getCheckpoint() >= localCheckpoint;
} else {
return false;
}
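The check above boils down to: mark every sequence number seen in a translog snapshot and require that the resulting checkpoint reaches the source's local checkpoint, since the checkpoint only advances over a contiguous run of completed operations. A self-contained sketch of that idea follows; coversRange is an illustrative helper name, not a method of the handler.

import java.io.IOException;

import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.translog.Translog;

final class TranslogCoverageSketch {
    // illustrative helper: does the snapshot contain every operation in [startingSeqNo, localCheckpoint]?
    static boolean coversRange(final Translog.Snapshot snapshot, final long startingSeqNo,
                               final long localCheckpoint) throws IOException {
        final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
        Translog.Operation operation;
        while ((operation = snapshot.next()) != null) {
            if (operation.seqNo() >= startingSeqNo) {
                tracker.markSeqNoAsCompleted(operation.seqNo());
            }
        }
        // a hole anywhere in the range keeps the checkpoint below localCheckpoint
        return tracker.getCheckpoint() >= localCheckpoint;
    }
}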
@ -433,13 +441,15 @@ public class RecoverySourceHandler {
* point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new
* shard.
*
* @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
* ops should be sent
* @param snapshot a snapshot of the translog
*
* @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
* ops should be sent
* @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo)
* @param endingSeqNo the highest sequence number that should be sent
* @param snapshot a snapshot of the translog
* @return the local checkpoint on the target
*/
long phase2(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot)
throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
@ -447,10 +457,11 @@ public class RecoverySourceHandler {
final StopWatch stopWatch = new StopWatch().start();
logger.trace("recovery [phase2]: sending transaction log operations");
logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " +
"required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]");
// send all the snapshot's translog operations to the target
final SendSnapshotResult result = sendSnapshot(startingSeqNo, snapshot);
final SendSnapshotResult result = sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);
stopWatch.stop();
logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime());
@ -511,18 +522,26 @@ public class RecoverySourceHandler {
* <p>
* Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit.
*
* @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent
* @param snapshot the translog snapshot to replay operations from
* @return the local checkpoint on the target and the total number of operations sent
* @param startingSeqNo only operations with a sequence number at or above this will be sent
* @param requiredSeqNoRangeStart the lower sequence number of the required range
* @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive)
* @param snapshot the translog snapshot to replay operations from
* @return the local checkpoint on the target and the total number of operations sent
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
final Translog.Snapshot snapshot) throws IOException {
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
assert startingSeqNo <= requiredSeqNoRangeStart :
"startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart;
int ops = 0;
long size = 0;
int skippedOps = 0;
int totalSentOps = 0;
final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final List<Translog.Operation> operations = new ArrayList<>();
final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1);
final int expectedTotalOps = snapshot.totalOperations();
if (expectedTotalOps == 0) {
@ -539,12 +558,9 @@ public class RecoverySourceHandler {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
/*
* If we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and
* any ops before the starting sequence number.
*/
final long seqNo = operation.seqNo();
if (startingSeqNo >= 0 && (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) {
if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
skippedOps++;
continue;
}
@ -552,6 +568,7 @@ public class RecoverySourceHandler {
ops++;
size += operation.estimateSize();
totalSentOps++;
requiredOpsTracker.markSeqNoAsCompleted(seqNo);
// check if this request is past bytes threshold, and if so, send it off
if (size >= chunkSizeInBytes) {
@ -569,8 +586,14 @@ public class RecoverySourceHandler {
}
assert expectedTotalOps == snapshot.overriddenOperations() + skippedOps + totalSentOps
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps);
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps);
if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
throw new IllegalStateException("translog replay failed to cover required sequence numbers" +
" (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is ["
+ (requiredOpsTracker.getCheckpoint() + 1) + "]");
}
logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);

View File

@ -374,15 +374,15 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
IndexShard newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
CountDownLatch recoveryStart = new CountDownLatch(1);
AtomicBoolean preparedForTranslog = new AtomicBoolean(false);
AtomicBoolean opsSent = new AtomicBoolean(false);
final Future<Void> recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> {
recoveryStart.countDown();
return new RecoveryTarget(indexShard, node, recoveryListener, l -> {
}) {
@Override
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
preparedForTranslog.set(true);
super.prepareForTranslogOperations(totalTranslogOps);
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
opsSent.set(true);
return super.indexTranslogOperations(operations, totalTranslogOps);
}
};
});
@ -392,7 +392,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
// index some more
docs += shards.indexDocs(randomInt(5));
assertFalse("recovery should wait on pending docs", preparedForTranslog.get());
assertFalse("recovery should wait on pending docs", opsSent.get());
primaryEngineFactory.releaseLatchedIndexers();
pendingDocsDone.await();

View File

@ -70,15 +70,18 @@ import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@ -88,6 +91,7 @@ import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class RecoverySourceHandlerTests extends ESTestCase {
@ -181,29 +185,68 @@ public class RecoverySourceHandlerTests extends ESTestCase {
operations.add(new Translog.Index(index, new Engine.IndexResult(1, i - initialNumberOfDocs, true)));
}
operations.add(null);
final long startingSeqNo = randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomIntBetween(0, 16);
RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, new Translog.Snapshot() {
@Override
public void close() {
final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1);
final long requiredStartingSeqNo = randomIntBetween((int) startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1);
final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1);
RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, requiredStartingSeqNo,
endingSeqNo, new Translog.Snapshot() {
@Override
public void close() {
}
}
private int counter = 0;
private int counter = 0;
@Override
public int totalOperations() {
return operations.size() - 1;
}
@Override
public int totalOperations() {
return operations.size() - 1;
}
@Override
public Translog.Operation next() throws IOException {
return operations.get(counter++);
}
});
if (startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
assertThat(result.totalOperations, equalTo(initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers));
} else {
assertThat(result.totalOperations, equalTo(Math.toIntExact(numberOfDocsWithValidSequenceNumbers - startingSeqNo)));
@Override
public Translog.Operation next() throws IOException {
return operations.get(counter++);
}
});
final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1);
assertThat(result.totalOperations, equalTo(expectedOps));
final ArgumentCaptor<List> shippedOpsCaptor = ArgumentCaptor.forClass(List.class);
verify(recoveryTarget).indexTranslogOperations(shippedOpsCaptor.capture(), ArgumentCaptor.forClass(Integer.class).capture());
List<Translog.Operation> shippedOps = shippedOpsCaptor.getAllValues().stream()
.flatMap(List::stream).map(o -> (Translog.Operation) o).collect(Collectors.toList());
shippedOps.sort(Comparator.comparing(Translog.Operation::seqNo));
assertThat(shippedOps.size(), equalTo(expectedOps));
for (int i = 0; i < shippedOps.size(); i++) {
assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs)));
}
if (endingSeqNo >= requiredStartingSeqNo + 1) {
// check that missing ops blows up
List<Translog.Operation> requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker
.filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList());
List<Translog.Operation> opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps);
expectThrows(IllegalStateException.class, () ->
handler.sendSnapshot(startingSeqNo, requiredStartingSeqNo,
endingSeqNo, new Translog.Snapshot() {
@Override
public void close() {
}
private int counter = 0;
@Override
public int totalOperations() {
return operations.size() - 1 - opsToSkip.size();
}
@Override
public Translog.Operation next() throws IOException {
Translog.Operation op;
do {
op = operations.get(counter++);
} while (op != null && opsToSkip.contains(op));
return op;
}
}));
}
}
@ -383,7 +426,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
@Override
long phase2(long startingSeqNo, Translog.Snapshot snapshot) throws IOException {
long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot) throws IOException {
phase2Called.set(true);
return SequenceNumbers.UNASSIGNED_SEQ_NO;
}

View File

@ -26,8 +26,10 @@ import org.apache.http.util.EntityUtils;
import org.elasticsearch.Version;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
@ -53,6 +55,8 @@ import java.util.regex.Pattern;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@ -585,6 +589,28 @@ public class FullClusterRestartIT extends ESRestTestCase {
assertThat(toStr(client().performRequest("GET", docLocation)), containsString(doc));
}
/**
* Tests that an index with a single empty shard is correctly recovered. Empty shards are often an edge case.
*/
public void testEmptyShard() throws IOException {
final String index = "test_empty_shard";
if (runningAgainstOldCluster) {
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
// if the node holding the replica is the first to be restarted while the replica is still recovering,
// then delayed allocation will kick in. When the node comes back, the master will search for a copy,
// but the recovering copy will be seen as invalid and the cluster health won't return to GREEN
// before timing out
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster
createIndex(index, settings.build());
}
ensureGreen(index);
}
/**
* Tests recovery of an index with or without a translog and the
* statistics we gather about that.

View File

@ -25,7 +25,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.seqno.SeqNoStats;
@ -49,14 +48,6 @@ import static org.hamcrest.Matchers.not;
public class IndexingIT extends ESRestTestCase {
private void updateIndexSetting(String name, Settings.Builder settings) throws IOException {
updateIndexSetting(name, settings.build());
}
private void updateIndexSetting(String name, Settings settings) throws IOException {
assertOK(client().performRequest("PUT", name + "/_settings", Collections.emptyMap(),
new StringEntity(Strings.toString(settings), ContentType.APPLICATION_JSON)));
}
private int indexDocs(String index, final int idStart, final int numDocs) throws IOException {
for (int i = 0; i < numDocs; i++) {
final int id = idStart + i;
@ -113,7 +104,7 @@ public class IndexingIT extends ESRestTestCase {
final int finalVersionForDoc1 = indexDocWithConcurrentUpdates(index, 1, nUpdates);
logger.info("allowing shards on all nodes");
updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name"));
ensureGreen();
ensureGreen(index);
assertOK(client().performRequest("POST", index + "/_refresh"));
List<Shard> shards = buildShards(index, nodes, newNodeClient);
Shard primary = buildShards(index, nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get();
@ -138,7 +129,7 @@ public class IndexingIT extends ESRestTestCase {
primary = shards.stream().filter(Shard::isPrimary).findFirst().get();
logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName());
updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName()));
ensureGreen();
ensureGreen(index);
nUpdates = randomIntBetween(minUpdates, maxUpdates);
logger.info("indexing docs with [{}] concurrent updates after moving primary", nUpdates);
final int finalVersionForDoc3 = indexDocWithConcurrentUpdates(index, 3, nUpdates);
@ -151,7 +142,7 @@ public class IndexingIT extends ESRestTestCase {
logger.info("setting number of replicas to 0");
updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 0));
ensureGreen();
ensureGreen(index);
nUpdates = randomIntBetween(minUpdates, maxUpdates);
logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 0", nUpdates);
final int finalVersionForDoc4 = indexDocWithConcurrentUpdates(index, 4, nUpdates);
@ -164,7 +155,7 @@ public class IndexingIT extends ESRestTestCase {
logger.info("setting number of replicas to 1");
updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1));
ensureGreen();
ensureGreen(index);
nUpdates = randomIntBetween(minUpdates, maxUpdates);
logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 1", nUpdates);
final int finalVersionForDoc5 = indexDocWithConcurrentUpdates(index, 5, nUpdates);
@ -199,7 +190,7 @@ public class IndexingIT extends ESRestTestCase {
assertSeqNoOnShards(index, nodes, nodes.getBWCVersion().major >= 6 ? numDocs : 0, newNodeClient);
logger.info("allowing shards on all nodes");
updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name"));
ensureGreen();
ensureGreen(index);
assertOK(client().performRequest("POST", index + "/_refresh"));
for (final String bwcName : bwcNamesList) {
assertCount(index, "_only_nodes:" + bwcName, numDocs);
@ -211,7 +202,7 @@ public class IndexingIT extends ESRestTestCase {
Shard primary = buildShards(index, nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get();
logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName());
updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName()));
ensureGreen();
ensureGreen(index);
int numDocsOnNewPrimary = 0;
final int numberOfDocsAfterMovingPrimary = 1 + randomInt(5);
logger.info("indexing [{}] docs after moving primary", numberOfDocsAfterMovingPrimary);
@ -230,7 +221,7 @@ public class IndexingIT extends ESRestTestCase {
numDocs += numberOfDocsAfterDroppingReplicas;
logger.info("setting number of replicas to 1");
updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1));
ensureGreen();
ensureGreen(index);
assertOK(client().performRequest("POST", index + "/_refresh"));
for (Shard shard : buildShards(index, nodes, newNodeClient)) {
@ -272,7 +263,7 @@ public class IndexingIT extends ESRestTestCase {
final String index = "test-snapshot-index";
createIndex(index, settings.build());
indexDocs(index, 0, between(50, 100));
ensureGreen();
ensureGreen(index);
assertOK(client().performRequest("POST", index + "/_refresh"));
assertOK(
@ -282,7 +273,7 @@ public class IndexingIT extends ESRestTestCase {
// Allocating shards on all nodes, taking snapshots should happen on all nodes.
updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name"));
ensureGreen();
ensureGreen(index);
assertOK(client().performRequest("POST", index + "/_refresh"));
assertOK(

View File

@ -18,16 +18,30 @@
*/
package org.elasticsearch.upgrades;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.function.Predicate;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
@ -89,7 +103,7 @@ public class RecoveryIT extends ESRestTestCase {
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms");
createIndex(index, settings.build());
} else if (clusterType == CLUSTER_TYPE.UPGRADED) {
ensureGreen();
ensureGreen(index);
Response response = client().performRequest("GET", index + "/_stats", Collections.singletonMap("level", "shards"));
assertOK(response);
ObjectPath objectPath = ObjectPath.createFromResponse(response);
@ -109,4 +123,156 @@ public class RecoveryIT extends ESRestTestCase {
}
}
private int indexDocs(String index, final int idStart, final int numDocs) throws IOException {
for (int i = 0; i < numDocs; i++) {
final int id = idStart + i;
assertOK(client().performRequest("PUT", index + "/test/" + id, emptyMap(),
new StringEntity("{\"test\": \"test_" + randomAsciiOfLength(2) + "\"}", ContentType.APPLICATION_JSON)));
}
return numDocs;
}
private Future<Void> asyncIndexDocs(String index, final int idStart, final int numDocs) throws IOException {
PlainActionFuture<Void> future = new PlainActionFuture<>();
Thread background = new Thread(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
future.onFailure(e);
}
@Override
protected void doRun() throws Exception {
indexDocs(index, idStart, numDocs);
future.onResponse(null);
}
});
background.start();
return future;
}
public void testRecoveryWithConcurrentIndexing() throws Exception {
final String index = "recovery_with_concurrent_indexing";
Response response = client().performRequest("GET", "_nodes");
ObjectPath objectPath = ObjectPath.createFromResponse(response);
final Map<String, Object> nodeMap = objectPath.evaluate("nodes");
List<String> nodes = new ArrayList<>(nodeMap.keySet());
switch (clusterType) {
case OLD:
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
// if the node holding the replica is the first to be restarted while the replica is still recovering,
// then delayed allocation will kick in. When the node comes back, the master will search for a copy,
// but the recovering copy will be seen as invalid and the cluster health won't return to GREEN
// before timing out
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster
createIndex(index, settings.build());
indexDocs(index, 0, 10);
ensureGreen(index);
// make sure that we can index while the replicas are recovering
updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries"));
break;
case MIXED:
updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String)null));
asyncIndexDocs(index, 10, 50).get();
ensureGreen(index);
assertOK(client().performRequest("POST", index + "/_refresh"));
assertCount(index, "_only_nodes:" + nodes.get(0), 60);
assertCount(index, "_only_nodes:" + nodes.get(1), 60);
// make sure that we can index while the replicas are recovering
updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries"));
break;
case UPGRADED:
updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String)null));
asyncIndexDocs(index, 60, 50).get();
ensureGreen(index);
assertOK(client().performRequest("POST", index + "/_refresh"));
assertCount(index, "_only_nodes:" + nodes.get(0), 60);
assertCount(index, "_only_nodes:" + nodes.get(1), 60);
break;
default:
throw new IllegalStateException("unknown type " + clusterType);
}
}
private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference));
assertOK(response);
final int actualCount = Integer.parseInt(ObjectPath.createFromResponse(response).evaluate("count").toString());
assertThat(actualCount, equalTo(expectedCount));
}
private String getNodeId(Predicate<Version> versionPredicate) throws IOException {
Response response = client().performRequest("GET", "_nodes");
ObjectPath objectPath = ObjectPath.createFromResponse(response);
Map<String, Object> nodesAsMap = objectPath.evaluate("nodes");
for (String id : nodesAsMap.keySet()) {
Version version = Version.fromString(objectPath.evaluate("nodes." + id + ".version"));
if (versionPredicate.test(version)) {
return id;
}
}
return null;
}
public void testRelocationWithConcurrentIndexing() throws Exception {
final String index = "relocation_with_concurrent_indexing";
switch (clusterType) {
case OLD:
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
// if the node holding the replica is the first to be restarted while the replica is still recovering,
// then delayed allocation will kick in. When the node comes back, the master will search for a copy,
// but the recovering copy will be seen as invalid and the cluster health won't return to GREEN
// before timing out
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster
createIndex(index, settings.build());
indexDocs(index, 0, 10);
ensureGreen(index);
// make sure that no shards are allocated, so that the primary stays on the old node (when one
// node stops, we lose the master too, so a replica will not be promoted)
updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none"));
break;
case MIXED:
final String newNode = getNodeId(v -> v.equals(Version.CURRENT));
final String oldNode = getNodeId(v -> v.before(Version.CURRENT));
// remove the replica now that we know that the primary is an old node
updateIndexSetting(index, Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String)null)
.put("index.routing.allocation.include._id", oldNode)
);
updateIndexSetting(index, Settings.builder().put("index.routing.allocation.include._id", newNode));
asyncIndexDocs(index, 10, 50).get();
ensureGreen(index);
assertOK(client().performRequest("POST", index + "/_refresh"));
assertCount(index, "_only_nodes:" + newNode, 60);
break;
case UPGRADED:
updateIndexSetting(index, Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
.put("index.routing.allocation.include._id", (String)null)
);
asyncIndexDocs(index, 60, 50).get();
ensureGreen(index);
assertOK(client().performRequest("POST", index + "/_refresh"));
Response response = client().performRequest("GET", "_nodes");
ObjectPath objectPath = ObjectPath.createFromResponse(response);
final Map<String, Object> nodeMap = objectPath.evaluate("nodes");
List<String> nodes = new ArrayList<>(nodeMap.keySet());
assertCount(index, "_only_nodes:" + nodes.get(0), 110);
assertCount(index, "_only_nodes:" + nodes.get(1), 110);
break;
default:
throw new IllegalStateException("unknown type " + clusterType);
}
}
}

View File

@ -15,6 +15,16 @@
# allocation will kick in, and the cluster health won't return to GREEN
# before timing out
index.unassigned.node_left.delayed_timeout: "100ms"
- do:
indices.create:
index: empty_index # index to ensure we can recover empty indices
body:
# if the node with the replica is the first to be restarted, then delayed
# allocation will kick in, and the cluster health won't return to GREEN
# before timing out
index.unassigned.node_left.delayed_timeout: "100ms"
- do:
bulk:
refresh: true

View File

@ -7,6 +7,7 @@
# wait long enough to give delayed unassigned shards a chance to stop being delayed
timeout: 70s
level: shards
index: test_index,index_with_replicas,empty_index
- do:
search:

View File

@ -392,13 +392,18 @@ public abstract class ESRestTestCase extends ESTestCase {
assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201)));
}
protected void ensureGreen() throws IOException {
/**
* Checks that the specified index is green. We force the selection of an index because the tests share a cluster and often leave
* indices in a non-green state.
* @param index the index to check
**/
protected void ensureGreen(String index) throws IOException {
Map<String, String> params = new HashMap<>();
params.put("wait_for_status", "green");
params.put("wait_for_no_relocating_shards", "true");
params.put("timeout", "70s");
params.put("level", "shards");
assertOK(client().performRequest("GET", "_cluster/health", params));
assertOK(client().performRequest("GET", "_cluster/health/" + index, params));
}
protected void createIndex(String name, Settings settings) throws IOException {
@ -411,4 +416,12 @@ public abstract class ESRestTestCase extends ESTestCase {
+ ", \"mappings\" : {" + mapping + "} }", ContentType.APPLICATION_JSON)));
}
protected void updateIndexSetting(String index, Settings.Builder settings) throws IOException {
updateIndexSetting(index, settings.build());
}
private void updateIndexSetting(String index, Settings settings) throws IOException {
assertOK(client().performRequest("PUT", index + "/_settings", Collections.emptyMap(),
new StringEntity(Strings.toString(settings), ContentType.APPLICATION_JSON)));
}
}
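A hypothetical usage of the new helpers from a test extending ESRestTestCase; the index name and settings are illustrative, not taken from this commit. The point of ensureGreen(index) is that it waits on _cluster/health/<index> rather than on the health of the whole shared cluster.

public void testDemoIndexGoesGreen() throws IOException {
    final Settings settings = Settings.builder()
        .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
        .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
        .build();
    createIndex("demo_index", settings);
    updateIndexSetting("demo_index", Settings.builder().put("index.number_of_replicas", 0));
    ensureGreen("demo_index"); // waits on the health of demo_index only
}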