Fix file reading in ccr restore service (#38117)

Currently we use the raw byte array length when calling the IndexInput
read call to determine how many bytes we want to read. However, due to
how BigArrays works, the array length might be longer than the reference
length. This commit fixes the issue and uses the BytesRef length when
calling read. Additionally, it expands the index follow test to index
many more documents. These documents should potentially lead to large
enough segment files to trigger scenarios where this fix matters.
This commit is contained in:
Tim Brooks 2019-01-31 18:02:24 -07:00 committed by GitHub
parent c67a9663af
commit 291c4e7a0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 34 deletions

View File

@ -235,8 +235,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
BytesRefIterator refIterator = reference.iterator();
BytesRef ref;
while ((ref = refIterator.next()) != null) {
byte[] refBytes = ref.bytes;
indexInput.readBytes(refBytes, 0, refBytes.length);
indexInput.readBytes(ref.bytes, ref.offset, ref.length);
}
long offsetAfterRead = indexInput.getFilePointer();

View File

@ -70,7 +70,6 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.MockNioTransportPlugin;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
@ -551,27 +550,6 @@ public abstract class CcrIntegTestCase extends ESTestCase {
});
}
protected void assertTotalNumberOfOptimizedIndexing(Index followerIndex, int numberOfShards, long expectedTotal) throws Exception {
assertBusy(() -> {
long[] numOfOptimizedOps = new long[numberOfShards];
for (int shardId = 0; shardId < numberOfShards; shardId++) {
for (String node : getFollowerCluster().nodesInclude(followerIndex.getName())) {
IndicesService indicesService = getFollowerCluster().getInstance(IndicesService.class, node);
IndexShard shard = indicesService.getShardOrNull(new ShardId(followerIndex, shardId));
if (shard != null && shard.routingEntry().primary()) {
try {
FollowingEngine engine = ((FollowingEngine) IndexShardTestCase.getEngine(shard));
numOfOptimizedOps[shardId] = engine.getNumberOfOptimizedIndexing();
} catch (AlreadyClosedException e) {
throw new AssertionError(e); // causes assertBusy to retry
}
}
}
}
assertThat(Arrays.stream(numOfOptimizedOps).sum(), equalTo(expectedTotal));
});
}
static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() {

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
@ -101,9 +102,30 @@ public class IndexFollowingIT extends CcrIntegTestCase {
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderYellow("index1");
final int firstBatchNumDocs = randomIntBetween(2, 64);
final int firstBatchNumDocs;
// Sometimes we want to index a lot of documents to ensure that the recovery works with larger files
if (rarely()) {
firstBatchNumDocs = randomIntBetween(1800, 2000);
} else {
firstBatchNumDocs = randomIntBetween(10, 64);
}
final int flushPoint = (int) (firstBatchNumDocs * 0.75);
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
for (int i = 0; i < firstBatchNumDocs; i++) {
BulkRequestBuilder bulkRequestBuilder = leaderClient().prepareBulk();
for (int i = 0; i < flushPoint; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
IndexRequest indexRequest = new IndexRequest("index1", "doc", Integer.toString(i))
.source(source, XContentType.JSON)
.timeout(TimeValue.timeValueSeconds(1));
bulkRequestBuilder.add(indexRequest);
}
bulkRequestBuilder.get();
leaderClient().admin().indices().prepareFlush("index1").setWaitIfOngoing(true).get();
// Index some docs after the flush that might be recovered in the normal index following operations
for (int i = flushPoint; i < firstBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}
@ -147,7 +169,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
for (int i = 0; i < firstBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs);
pauseFollow("index2");
followerClient().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get();
final int secondBatchNumDocs = randomIntBetween(2, 64);
@ -172,8 +194,6 @@ public class IndexFollowingIT extends CcrIntegTestCase {
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfPrimaryShards,
firstBatchNumDocs + secondBatchNumDocs);
pauseFollow("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards);
}
@ -287,7 +307,6 @@ public class IndexFollowingIT extends CcrIntegTestCase {
for (int i = 0; i < firstBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs);
pauseFollow("index2");
}
@ -432,8 +451,6 @@ public class IndexFollowingIT extends CcrIntegTestCase {
assertIndexFullyReplicatedToFollower("index1", "index2");
pauseFollow("index2");
leaderClient().admin().indices().prepareRefresh("index1").get();
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfShards,
leaderClient().prepareSearch("index1").get().getHits().getTotalHits().value);
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfShards);
}
@ -475,7 +492,6 @@ public class IndexFollowingIT extends CcrIntegTestCase {
}
pauseFollow("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), 1);
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), 1, numDocs);
}
public void testUnfollowNonExistingIndex() {
@ -538,7 +554,6 @@ public class IndexFollowingIT extends CcrIntegTestCase {
}
pauseFollow("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), 1);
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), 1, numDocs);
}
public void testAttemptToChangeCcrFollowingIndexSetting() throws Exception {