Recovery: flush immediately after a remote recovery finishes (unless there are ongoing ones)

To properly replicate, we currently stop flushing during recovery so we can repay the translog once copying files are done. Once recovery is done, the translog will be flushed by a background thread that, by default, kicks in every 5s. In case of a recovery failure and a quick re-assignment of a new shard copy, we may fail to flush before starting a new recovery, causing it to deal with potentially even longer translog. This commit makes sure we flush immediately when the ongoing recovery count goes to 0.

I also added a simple recovery benchmark.

Closes #9439
This commit is contained in:
Boaz Leskes 2015-01-26 23:03:16 +01:00
parent 13ef7d73b9
commit 22a576d5ba
5 changed files with 283 additions and 22 deletions

View File

@ -204,7 +204,10 @@ public class XContentHelper {
*/ */
public static String toString(ToXContent toXContent, Params params) { public static String toString(ToXContent toXContent, Params params) {
try { try {
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); XContentBuilder builder = XContentFactory.jsonBuilder();
if (params.paramAsBoolean("pretty", true)) {
builder.prettyPrint();
}
builder.startObject(); builder.startObject();
toXContent.toXContent(builder, params); toXContent.toXContent(builder, params);
builder.endObject(); builder.endObject();

View File

@ -20,13 +20,15 @@
package org.elasticsearch.index.engine.internal; package org.elasticsearch.index.engine.internal;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.lucene.index.*; import org.apache.lucene.index.*;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer; import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.search.*; import org.apache.lucene.search.*;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.*; import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
@ -55,6 +57,7 @@ import org.elasticsearch.index.merge.policy.ElasticsearchMergePolicy;
import org.elasticsearch.index.merge.policy.MergePolicyProvider; import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider; import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery; import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
@ -1524,8 +1527,19 @@ public class InternalEngine implements Engine {
void endRecovery() throws ElasticsearchException { void endRecovery() throws ElasticsearchException {
store.decRef(); store.decRef();
onGoingRecoveries.decrementAndGet(); int left = onGoingRecoveries.decrementAndGet();
assert onGoingRecoveries.get() >= 0 : "ongoingRecoveries must be >= 0 but was: " + onGoingRecoveries.get(); assert onGoingRecoveries.get() >= 0 : "ongoingRecoveries must be >= 0 but was: " + onGoingRecoveries.get();
if (left == 0) {
try {
flush(FlushType.COMMIT_TRANSLOG, false, false);
} catch (IllegalIndexShardStateException e) {
// we are being closed, or in created state, ignore
} catch (FlushNotAllowedEngineException e) {
// ignore this exception, we are not allowed to perform flush
} catch (Throwable e) {
logger.warn("failed to flush shard post recovery", e);
}
}
} }
@Override @Override

View File

@ -0,0 +1,194 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.recovery;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.common.jna.Natives;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.transport.TransportModule;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
/**
*
*/
public class ReplicaRecoveryBenchmark {
private static final String INDEX_NAME = "index";
private static final String TYPE_NAME = "type";
static int DOC_COUNT = (int) SizeValue.parseSizeValue("40k").singles();
static int CONCURRENT_INDEXERS = 2;
public static void main(String[] args) throws Exception {
System.setProperty("es.logger.prefix", "");
Natives.tryMlockall();
Settings settings = settingsBuilder()
.put("gateway.type", "local")
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, "false")
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(TransportModule.TRANSPORT_TYPE_KEY, "local")
.build();
String clusterName = ReplicaRecoveryBenchmark.class.getSimpleName();
Node node1 = nodeBuilder().clusterName(clusterName)
.settings(settingsBuilder().put(settings))
.node();
final ESLogger logger = ESLoggerFactory.getLogger("benchmark");
final Client client1 = node1.client();
client1.admin().cluster().prepareUpdateSettings().setPersistentSettings("logger.indices.recovery: TRACE").get();
final BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, TYPE_NAME, client1, 0, CONCURRENT_INDEXERS, false, new Random());
indexer.setMinFieldSize(10);
indexer.setMaxFieldSize(150);
try {
client1.admin().indices().prepareDelete(INDEX_NAME).get();
} catch (IndexMissingException e) {
}
client1.admin().indices().prepareCreate(INDEX_NAME).get();
indexer.start(DOC_COUNT / 2);
while (indexer.totalIndexedDocs() < DOC_COUNT / 2) {
Thread.sleep(5000);
logger.info("--> indexed {} of {}", indexer.totalIndexedDocs(), DOC_COUNT);
}
client1.admin().indices().prepareFlush().get();
indexer.continueIndexing(DOC_COUNT / 2);
while (indexer.totalIndexedDocs() < DOC_COUNT) {
Thread.sleep(5000);
logger.info("--> indexed {} of {}", indexer.totalIndexedDocs(), DOC_COUNT);
}
logger.info("--> starting another node and allocating a shard on it");
Node node2 = nodeBuilder().clusterName(clusterName)
.settings(settingsBuilder().put(settings))
.node();
client1.admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(IndexMetaData.SETTING_NUMBER_OF_REPLICAS + ": 1").get();
final AtomicBoolean end = new AtomicBoolean(false);
final Thread backgroundLogger = new Thread(new Runnable() {
long lastTime = System.currentTimeMillis();
long lastDocs = indexer.totalIndexedDocs();
long lastBytes = 0;
long lastTranslogOps = 0;
@Override
public void run() {
while (true) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
if (end.get()) {
return;
}
long currentTime = System.currentTimeMillis();
long currentDocs = indexer.totalIndexedDocs();
RecoveryResponse recoveryResponse = client1.admin().indices().prepareRecoveries(INDEX_NAME).setActiveOnly(true).get();
List<ShardRecoveryResponse> indexRecoveries = recoveryResponse.shardResponses().get(INDEX_NAME);
long translogOps;
long bytes;
if (indexRecoveries.size() > 0) {
translogOps = indexRecoveries.get(0).recoveryState().getTranslog().currentTranslogOperations();
bytes = recoveryResponse.shardResponses().get(INDEX_NAME).get(0).recoveryState().getIndex().recoveredByteCount();
} else {
bytes = lastBytes = 0;
translogOps = lastTranslogOps = 0;
}
float seconds = (currentTime - lastTime) / 1000.0F;
logger.info("--> indexed [{}];[{}] doc/s, recovered [{}] MB/s , translog ops [{}]/s ",
currentDocs, (currentDocs - lastDocs) / seconds,
(bytes - lastBytes) / 1024.0F / 1024F / seconds, (translogOps - lastTranslogOps) / seconds);
lastBytes = bytes;
lastTranslogOps = translogOps;
lastTime = currentTime;
lastDocs = currentDocs;
}
}
});
backgroundLogger.start();
client1.admin().cluster().prepareHealth().setWaitForGreenStatus().get();
logger.info("--> green. starting relocation cycles");
long startDocIndexed = indexer.totalIndexedDocs();
indexer.continueIndexing(DOC_COUNT * 50);
long totalRecoveryTime = 0;
long startTime = System.currentTimeMillis();
for (int iteration = 0; iteration < 3; iteration++) {
logger.info("--> removing replicas");
client1.admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(IndexMetaData.SETTING_NUMBER_OF_REPLICAS + ": 0").get();
logger.info("--> adding replica again");
long recoveryStart = System.currentTimeMillis();
client1.admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(IndexMetaData.SETTING_NUMBER_OF_REPLICAS + ": 1").get();
client1.admin().cluster().prepareHealth(INDEX_NAME).setWaitForGreenStatus().setTimeout("15m").get();
long recoveryTime = System.currentTimeMillis() - recoveryStart;
totalRecoveryTime += recoveryTime;
logger.info("--> recovery done in [{}]", new TimeValue(recoveryTime));
// sleep some to let things clean up
Thread.sleep(10000);
}
long endDocIndexed = indexer.totalIndexedDocs();
long totalTime = System.currentTimeMillis() - startTime;
indexer.stop();
end.set(true);
backgroundLogger.interrupt();
backgroundLogger.join();
logger.info("average doc/s [{}], average relocation time [{}]", (endDocIndexed - startDocIndexed) * 1000.0 / totalTime, new TimeValue(totalRecoveryTime / 3));
client1.close();
node1.close();
node2.close();
}
}

View File

@ -52,13 +52,10 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.*; import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService; import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentMapperParser;
import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper; import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.object.RootObjectMapper;
import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider; import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
import org.elasticsearch.index.merge.policy.MergePolicyProvider; import org.elasticsearch.index.merge.policy.MergePolicyProvider;
@ -89,13 +86,16 @@ import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static com.carrotsearch.randomizedtesting.RandomizedTest.*; import static com.carrotsearch.randomizedtesting.RandomizedTest.between;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomBoolean;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomDouble;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomIntBetween;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLong;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.test.ElasticsearchTestCase.awaitBusy; import static org.elasticsearch.test.ElasticsearchTestCase.*;
import static org.elasticsearch.test.ElasticsearchTestCase.randomFrom; import static org.elasticsearch.test.ElasticsearchTestCase.randomFrom;
import static org.elasticsearch.test.ElasticsearchTestCase.terminate;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
public class InternalEngineTests extends ElasticsearchLuceneTestCase { public class InternalEngineTests extends ElasticsearchLuceneTestCase {
@ -108,6 +108,9 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
private Store store; private Store store;
private Store storeReplica; private Store storeReplica;
protected Translog translog;
protected Translog replicaTranslog;
protected Engine engine; protected Engine engine;
protected Engine replicaEngine; protected Engine replicaEngine;
@ -131,12 +134,14 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
storeReplica = createStore(); storeReplica = createStore();
storeReplica.deleteContent(); storeReplica.deleteContent();
engineSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); engineSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
engine = createEngine(engineSettingsService, store, createTranslog()); translog = createTranslog();
engine = createEngine(engineSettingsService, store, translog);
if (randomBoolean()) { if (randomBoolean()) {
((InternalEngine)engine).config().setEnableGcDeletes(false); ((InternalEngine)engine).config().setEnableGcDeletes(false);
} }
replicaSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); replicaSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
replicaEngine = createEngine(replicaSettingsService, storeReplica, createTranslogReplica()); replicaTranslog = createTranslogReplica();
replicaEngine = createEngine(replicaSettingsService, storeReplica, replicaTranslog);
if (randomBoolean()) { if (randomBoolean()) {
((InternalEngine)engine).config().setEnableGcDeletes(false); ((InternalEngine)engine).config().setEnableGcDeletes(false);
@ -742,7 +747,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
@Test @Test
public void testSimpleRecover() throws Exception { public void testSimpleRecover() throws Exception {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false); final ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false);
engine.create(new Engine.Create(null, analyzer, newUid("1"), doc)); engine.create(new Engine.Create(null, analyzer, newUid("1"), doc));
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false); engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
@ -751,7 +756,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
public void phase1(SnapshotIndexCommit snapshot) throws EngineException { public void phase1(SnapshotIndexCommit snapshot) throws EngineException {
try { try {
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false); engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
assertThat("flush is not allowed in phase 3", false, equalTo(true)); assertThat("flush is not allowed in phase 1", false, equalTo(true));
} catch (FlushNotAllowedEngineException e) { } catch (FlushNotAllowedEngineException e) {
// all is well // all is well
} }
@ -762,15 +767,18 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(0)); MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(0));
try { try {
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false); engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
assertThat("flush is not allowed in phase 3", false, equalTo(true)); assertThat("flush is not allowed in phase 2", false, equalTo(true));
} catch (FlushNotAllowedEngineException e) { } catch (FlushNotAllowedEngineException e) {
// all is well // all is well
} }
// but we can index
engine.index(new Engine.Index(null, analyzer, newUid("1"), doc));
} }
@Override @Override
public void phase3(Translog.Snapshot snapshot) throws EngineException { public void phase3(Translog.Snapshot snapshot) throws EngineException {
MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(0)); MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(1));
try { try {
// we can do this here since we are on the same thread // we can do this here since we are on the same thread
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false); engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
@ -780,6 +788,8 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
} }
} }
}); });
// post recovery should flush the translog
MatcherAssert.assertThat(translog.snapshot(), TranslogSizeMatcher.translogSize(0));
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false); engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.close(); engine.close();

View File

@ -18,6 +18,8 @@ package org.elasticsearch.test;/*
*/ */
import com.carrotsearch.randomizedtesting.RandomizedTest; import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.generators.RandomInts;
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
@ -25,8 +27,12 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.junit.Assert; import org.junit.Assert;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
@ -51,6 +57,9 @@ public class BackgroundIndexer implements AutoCloseable {
final AtomicBoolean hasBudget = new AtomicBoolean(false); // when set to true, writers will acquire writes from a semaphore final AtomicBoolean hasBudget = new AtomicBoolean(false); // when set to true, writers will acquire writes from a semaphore
final Semaphore availableBudget = new Semaphore(0); final Semaphore availableBudget = new Semaphore(0);
volatile int minFieldSize = 10;
volatile int maxFieldSize = 140;
/** /**
* Start indexing in the background using a random number of threads. * Start indexing in the background using a random number of threads.
* *
@ -86,7 +95,7 @@ public class BackgroundIndexer implements AutoCloseable {
* @param writerCount number of indexing threads to use * @param writerCount number of indexing threads to use
*/ */
public BackgroundIndexer(String index, String type, Client client, int numOfDocs, final int writerCount) { public BackgroundIndexer(String index, String type, Client client, int numOfDocs, final int writerCount) {
this(index, type, client, numOfDocs, writerCount, true); this(index, type, client, numOfDocs, writerCount, true, null);
} }
/** /**
@ -99,16 +108,22 @@ public class BackgroundIndexer implements AutoCloseable {
* @param numOfDocs number of document to index before pausing. Set to -1 to have no limit. * @param numOfDocs number of document to index before pausing. Set to -1 to have no limit.
* @param writerCount number of indexing threads to use * @param writerCount number of indexing threads to use
* @param autoStart set to true to start indexing as soon as all threads have been created. * @param autoStart set to true to start indexing as soon as all threads have been created.
* @param random random instance to use
*/ */
public BackgroundIndexer(final String index, final String type, final Client client, final int numOfDocs, final int writerCount, boolean autoStart) { public BackgroundIndexer(final String index, final String type, final Client client, final int numOfDocs, final int writerCount,
boolean autoStart, Random random) {
if (random == null) {
random = RandomizedTest.getRandom();
}
failures = new CopyOnWriteArrayList<>(); failures = new CopyOnWriteArrayList<>();
writers = new Thread[writerCount]; writers = new Thread[writerCount];
stopLatch = new CountDownLatch(writers.length); stopLatch = new CountDownLatch(writers.length);
logger.info("--> creating {} indexing threads (auto start: [{}], numOfDocs: [{}])", writerCount, autoStart, numOfDocs); logger.info("--> creating {} indexing threads (auto start: [{}], numOfDocs: [{}])", writerCount, autoStart, numOfDocs);
for (int i = 0; i < writers.length; i++) { for (int i = 0; i < writers.length; i++) {
final int indexerId = i; final int indexerId = i;
final boolean batch = RandomizedTest.getRandom().nextBoolean(); final boolean batch = random.nextBoolean();
final Random threadRandom = new Random(random.nextLong());
writers[i] = new Thread() { writers[i] = new Thread() {
@Override @Override
public void run() { public void run() {
@ -118,7 +133,7 @@ public class BackgroundIndexer implements AutoCloseable {
logger.info("**** starting indexing thread {}", indexerId); logger.info("**** starting indexing thread {}", indexerId);
while (!stop.get()) { while (!stop.get()) {
if (batch) { if (batch) {
int batchSize = RandomizedTest.getRandom().nextInt(20) + 1; int batchSize = threadRandom.nextInt(20) + 1;
if (hasBudget.get()) { if (hasBudget.get()) {
batchSize = Math.max(Math.min(batchSize, availableBudget.availablePermits()), 1);// always try to get at least one batchSize = Math.max(Math.min(batchSize, availableBudget.availablePermits()), 1);// always try to get at least one
if (!availableBudget.tryAcquire(batchSize, 250, TimeUnit.MILLISECONDS)) { if (!availableBudget.tryAcquire(batchSize, 250, TimeUnit.MILLISECONDS)) {
@ -130,7 +145,7 @@ public class BackgroundIndexer implements AutoCloseable {
BulkRequestBuilder bulkRequest = client.prepareBulk(); BulkRequestBuilder bulkRequest = client.prepareBulk();
for (int i = 0; i < batchSize; i++) { for (int i = 0; i < batchSize; i++) {
id = idGenerator.incrementAndGet(); id = idGenerator.incrementAndGet();
bulkRequest.add(client.prepareIndex(index, type, Long.toString(id)).setSource("test", "value" + id)); bulkRequest.add(client.prepareIndex(index, type, Long.toString(id)).setSource(generateSource(id, threadRandom)));
} }
BulkResponse bulkResponse = bulkRequest.get(); BulkResponse bulkResponse = bulkRequest.get();
for (BulkItemResponse bulkItemResponse : bulkResponse) { for (BulkItemResponse bulkItemResponse : bulkResponse) {
@ -149,7 +164,7 @@ public class BackgroundIndexer implements AutoCloseable {
continue; continue;
} }
id = idGenerator.incrementAndGet(); id = idGenerator.incrementAndGet();
client.prepareIndex(index, type, Long.toString(id) + "-" + indexerId).setSource("test", "value" + id).get(); client.prepareIndex(index, type, Long.toString(id) + "-" + indexerId).setSource(generateSource(id, threadRandom)).get();
indexCounter.incrementAndGet(); indexCounter.incrementAndGet();
} }
} }
@ -170,6 +185,21 @@ public class BackgroundIndexer implements AutoCloseable {
} }
} }
private XContentBuilder generateSource(long id, Random random) throws IOException {
int contentLength = RandomInts.randomIntBetween(random, minFieldSize, maxFieldSize);
StringBuilder text = new StringBuilder(contentLength);
while (text.length() < contentLength) {
int tokenLength = RandomInts.randomIntBetween(random, 1, Math.min(contentLength - text.length(), 10));
text.append(" ").append(RandomStrings.randomRealisticUnicodeOfCodepointLength(random, tokenLength));
}
XContentBuilder builder = XContentFactory.smileBuilder();
builder.startObject().field("test", "value" + id)
.field("text", text.toString())
.endObject();
return builder;
}
private void setBudget(int numOfDocs) { private void setBudget(int numOfDocs) {
logger.debug("updating budget to [{}]", numOfDocs); logger.debug("updating budget to [{}]", numOfDocs);
if (numOfDocs >= 0) { if (numOfDocs >= 0) {
@ -239,6 +269,16 @@ public class BackgroundIndexer implements AutoCloseable {
Assert.assertThat(failures, emptyIterable()); Assert.assertThat(failures, emptyIterable());
} }
/** the minimum size in code points of a payload field in the indexed documents */
public void setMinFieldSize(int fieldSize) {
minFieldSize = fieldSize;
}
/** the minimum size in code points of a payload field in the indexed documents */
public void setMaxFieldSize(int fieldSize) {
maxFieldSize = fieldSize;
}
@Override @Override
public void close() throws Exception { public void close() throws Exception {
stop(); stop();