Avoid losing ops in file-based recovery

When a primary is relocated from an old node to a new node, it can have
ops in its translog that do not have a sequence number assigned. When a
file-based recovery is started, this can lead to skipping these ops when
replaying the translog due to a bug in the recovery logic. This commit
addresses this bug and adds a test in the BWC tests.

Relates #22945
This commit is contained in:
Jason Tedor 2017-02-03 08:11:57 -05:00 committed by GitHub
parent fb8bdbc57a
commit 6e9940283b
3 changed files with 130 additions and 11 deletions

View File

@ -497,8 +497,10 @@ public class RecoverySourceHandler {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
// we have to send older ops for which no sequence number was assigned, and any ops after the starting sequence number
if (operation.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO || operation.seqNo() < startingSeqNo) continue;
// if we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and
// any ops before the starting sequence number
final long seqNo = operation.seqNo();
if (startingSeqNo >= 0 && (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) continue;
operations.add(operation);
ops++;
size += operation.estimateSize();

View File

@ -20,6 +20,7 @@ package org.elasticsearch.indices.recovery;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CorruptIndexException;
@ -27,6 +28,7 @@ import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.BaseDirectoryWrapper;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
@ -35,13 +37,23 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
@ -60,6 +72,7 @@ import org.elasticsearch.test.IndexSettingsModule;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@ -136,6 +149,72 @@ public class RecoverySourceHandlerTests extends ESTestCase {
IOUtils.close(reader, store, targetStore);
}
public void testSendSnapshotSendsOps() throws IOException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
final int fileChunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
final long startingSeqNo = randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomIntBetween(0, 16);
final StartRecoveryRequest request = new StartRecoveryRequest(
shardId,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null,
randomBoolean(),
randomNonNegativeLong(),
randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
final IndexShard shard = mock(IndexShard.class);
when(shard.state()).thenReturn(IndexShardState.STARTED);
final RecoveryTargetHandler recoveryTarget = mock(RecoveryTargetHandler.class);
final RecoverySourceHandler handler =
new RecoverySourceHandler(shard, recoveryTarget, request, () -> 0L, e -> () -> {}, fileChunkSizeInBytes, Settings.EMPTY);
final List<Translog.Operation> operations = new ArrayList<>();
final int initialNumberOfDocs = randomIntBetween(16, 64);
for (int i = 0; i < initialNumberOfDocs; i++) {
final Engine.Index index = getIndex(Integer.toString(i));
operations.add(new Translog.Index(index, new Engine.IndexResult(1, SequenceNumbersService.UNASSIGNED_SEQ_NO, true)));
}
final int numberOfDocsWithValidSequenceNumbers = randomIntBetween(16, 64);
for (int i = initialNumberOfDocs; i < initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers; i++) {
final Engine.Index index = getIndex(Integer.toString(i));
operations.add(new Translog.Index(index, new Engine.IndexResult(1, i - initialNumberOfDocs, true)));
}
operations.add(null);
int totalOperations = handler.sendSnapshot(startingSeqNo, new Translog.Snapshot() {
private int counter = 0;
@Override
public int totalOperations() {
return operations.size() - 1;
}
@Override
public Translog.Operation next() throws IOException {
return operations.get(counter++);
}
});
if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
assertThat(totalOperations, equalTo(initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers));
} else {
assertThat(totalOperations, equalTo(Math.toIntExact(numberOfDocsWithValidSequenceNumbers - startingSeqNo)));
}
}
private Engine.Index getIndex(final String id) {
final String type = "test";
final ParseContext.Document document = new ParseContext.Document();
document.add(new TextField("test", "test", Field.Store.YES));
final Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE);
final Field versionField = new NumericDocValuesField("_version", Versions.MATCH_ANY);
final SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID();
document.add(uidField);
document.add(versionField);
document.add(seqID.seqNo);
document.add(seqID.seqNoDocValue);
document.add(seqID.primaryTerm);
final BytesReference source = new BytesArray(new byte[] { 1 });
final ParsedDocument doc = new ParsedDocument(versionField, seqID, id, type, null, Arrays.asList(document), source, null);
return new Engine.Index(new Term("_uid", doc.uid()), doc);
}
public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable {
Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
put("indices.recovery.concurrent_small_file_streams", 1).build();

View File

@ -28,7 +28,9 @@ import org.elasticsearch.client.RestClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
@ -36,6 +38,7 @@ import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
@ -95,7 +98,8 @@ public class IndexingIT extends ESRestTestCase {
Nodes nodes = buildNodeAndVersions();
assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
logger.info("cluster discovered: {}", nodes.toString());
final String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(","));
final List<String> bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList());
final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(","));
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2)
@ -111,26 +115,60 @@ public class IndexingIT extends ESRestTestCase {
createIndex(index, settings.build());
try (RestClient newNodeClient = buildClient(restClientSettings(),
nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
int numDocs = indexDocs(index, 0, randomInt(5));
int numDocs = 0;
final int numberOfInitialDocs = 1 + randomInt(5);
logger.info("indexing [{}] docs initially", numberOfInitialDocs);
numDocs += indexDocs(index, 0, numberOfInitialDocs);
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient);
logger.info("allowing shards on all nodes");
updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name"));
ensureGreen();
logger.info("indexing some more docs");
numDocs += indexDocs(index, numDocs, randomInt(5));
assertOK(client().performRequest("POST", index + "/_refresh"));
for (final String bwcName : bwcNamesList) {
assertCount(index, "_only_nodes:" + bwcName, numDocs);
}
final int numberOfDocsAfterAllowingShardsOnAllNodes = 1 + randomInt(5);
logger.info("indexing [{}] docs after allowing shards on all nodes", numberOfDocsAfterAllowingShardsOnAllNodes);
numDocs += indexDocs(index, numDocs, numberOfDocsAfterAllowingShardsOnAllNodes);
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient);
logger.info("moving primary to new node");
Shard primary = buildShards(nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get();
logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName());
updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName()));
ensureGreen();
logger.info("indexing some more docs");
int numDocsOnNewPrimary = indexDocs(index, numDocs, randomInt(5));
numDocs += numDocsOnNewPrimary;
int numDocsOnNewPrimary = 0;
final int numberOfDocsAfterMovingPrimary = 1 + randomInt(5);
logger.info("indexing [{}] docs after moving primary", numberOfDocsAfterMovingPrimary);
numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterMovingPrimary);
numDocs += numberOfDocsAfterMovingPrimary;
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient);
/*
* Dropping the number of replicas to zero, and then increasing it to one triggers a recovery thus exercising any BWC-logic in
* the recovery code.
*/
logger.info("setting number of replicas to 0");
updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 0));
final int numberOfDocsAfterDroppingReplicas = 1 + randomInt(5);
logger.info("indexing [{}] docs after setting number of replicas to 0", numberOfDocsAfterDroppingReplicas);
numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterDroppingReplicas);
numDocs += numberOfDocsAfterDroppingReplicas;
logger.info("setting number of replicas to 1");
updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1));
ensureGreen();
assertOK(client().performRequest("POST", index + "/_refresh"));
// the number of documents on the primary and on the recovered replica should match the number of indexed documents
assertCount(index, "_primary", numDocs);
assertCount(index, "_replica", numDocs);
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient);
}
}
private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference));
assertOK(response);
final int actualCount = Integer.parseInt(objectPath(response).evaluate("count").toString());
assertThat(actualCount, equalTo(expectedCount));
}
private void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) throws Exception {
assertBusy(() -> {
try {