diff --git a/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java b/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java
index dec0fc8f082..79d0c7a7063 100644
--- a/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java
+++ b/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java
@@ -204,7 +204,10 @@ public class XContentHelper {
      */
     public static String toString(ToXContent toXContent, Params params) {
         try {
-            XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
+            XContentBuilder builder = XContentFactory.jsonBuilder();
+            if (params.paramAsBoolean("pretty", true)) {
+                builder.prettyPrint();
+            }
             builder.startObject();
             toXContent.toXContent(builder, params);
             builder.endObject();
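With this change `XContentHelper.toString` only pretty-prints when the `pretty` param allows it, defaulting to the old behavior. A minimal caller-side sketch (not part of the patch; `doc` stands in for any `ToXContent` implementation):

```java
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentHelper;

class ToStringUsageSketch {
    // Opt out of pretty printing by passing pretty=false via the params.
    static String compact(ToXContent doc) {
        Map<String, String> map = new HashMap<>();
        map.put("pretty", "false");
        return XContentHelper.toString(doc, new ToXContent.MapParams(map));
    }

    // No param (or pretty=true) keeps the previous pretty-printed output.
    static String pretty(ToXContent doc) {
        return XContentHelper.toString(doc, ToXContent.EMPTY_PARAMS);
    }
}
```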
diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
index 02860063115..29821df4ebd 100644
--- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
+++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
@@ -20,13 +20,15 @@ package org.elasticsearch.index.engine.internal;
 
 import com.google.common.collect.Lists;
-
 import org.apache.lucene.index.*;
 import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
 import org.apache.lucene.search.*;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.LockObtainFailedException;
-import org.apache.lucene.util.*;
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.Accountables;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchIllegalStateException;
 import org.elasticsearch.ExceptionsHelper;
@@ -55,6 +57,7 @@ import org.elasticsearch.index.merge.policy.ElasticsearchMergePolicy;
 import org.elasticsearch.index.merge.policy.MergePolicyProvider;
 import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
 import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery;
+import org.elasticsearch.index.shard.IllegalIndexShardStateException;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
@@ -1524,8 +1527,19 @@ public class InternalEngine implements Engine {
 
         void endRecovery() throws ElasticsearchException {
             store.decRef();
-            onGoingRecoveries.decrementAndGet();
+            int left = onGoingRecoveries.decrementAndGet();
             assert onGoingRecoveries.get() >= 0 : "ongoingRecoveries must be >= 0 but was: " + onGoingRecoveries.get();
+            if (left == 0) {
+                try {
+                    flush(FlushType.COMMIT_TRANSLOG, false, false);
+                } catch (IllegalIndexShardStateException e) {
+                    // we are being closed, or in created state, ignore
+                } catch (FlushNotAllowedEngineException e) {
+                    // ignore this exception, we are not allowed to perform flush
+                } catch (Throwable e) {
+                    logger.warn("failed to flush shard post recovery", e);
+                }
+            }
         }
 
         @Override
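The interesting part of the hunk above is that `endRecovery` now flushes once the last concurrent recovery finishes. A self-contained sketch of that counting pattern, as a simplification (the real code lives inside `InternalEngine` and calls its own `flush`):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Several recoveries may run concurrently against one engine; only the thread
// that drops the counter to zero performs the post-recovery flush, so the
// flush happens exactly once and only after all peer recoveries are done.
class RecoveryTracker {
    private final AtomicInteger onGoingRecoveries = new AtomicInteger();

    void beginRecovery() {
        onGoingRecoveries.incrementAndGet();
    }

    void endRecovery() {
        int left = onGoingRecoveries.decrementAndGet();
        assert left >= 0 : "ongoing recoveries must be >= 0 but was: " + left;
        if (left == 0) {
            // benign failures (engine closing, flush not allowed) are swallowed
            flushPostRecovery();
        }
    }

    void flushPostRecovery() {
        // engine-specific flush would go here
    }
}
```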
diff --git a/src/test/java/org/elasticsearch/benchmark/recovery/ReplicaRecoveryBenchmark.java b/src/test/java/org/elasticsearch/benchmark/recovery/ReplicaRecoveryBenchmark.java
new file mode 100644
index 00000000000..8b78e999fd8
--- /dev/null
+++ b/src/test/java/org/elasticsearch/benchmark/recovery/ReplicaRecoveryBenchmark.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.benchmark.recovery;
+
+import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
+import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
+import org.elasticsearch.common.jna.Natives;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.ESLoggerFactory;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.SizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.indices.IndexMissingException;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.test.BackgroundIndexer;
+import org.elasticsearch.transport.TransportModule;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
+import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+/**
+ *
+ */
+public class ReplicaRecoveryBenchmark {
+
+    private static final String INDEX_NAME = "index";
+    private static final String TYPE_NAME = "type";
+
+
+    static int DOC_COUNT = (int) SizeValue.parseSizeValue("40k").singles();
+    static int CONCURRENT_INDEXERS = 2;
+
+    public static void main(String[] args) throws Exception {
+        System.setProperty("es.logger.prefix", "");
+        Natives.tryMlockall();
+
+        Settings settings = settingsBuilder()
+                .put("gateway.type", "local")
+                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, "false")
+                .put(SETTING_NUMBER_OF_SHARDS, 1)
+                .put(SETTING_NUMBER_OF_REPLICAS, 0)
+                .put(TransportModule.TRANSPORT_TYPE_KEY, "local")
+                .build();
+
+        String clusterName = ReplicaRecoveryBenchmark.class.getSimpleName();
+        Node node1 = nodeBuilder().clusterName(clusterName)
+                .settings(settingsBuilder().put(settings))
+                .node();
+
+        final ESLogger logger = ESLoggerFactory.getLogger("benchmark");
+
+        final Client client1 = node1.client();
+        client1.admin().cluster().prepareUpdateSettings().setPersistentSettings("logger.indices.recovery: TRACE").get();
+        final BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, TYPE_NAME, client1, 0, CONCURRENT_INDEXERS, false, new Random());
+        indexer.setMinFieldSize(10);
+        indexer.setMaxFieldSize(150);
+        try {
+            client1.admin().indices().prepareDelete(INDEX_NAME).get();
+        } catch (IndexMissingException e) {
+        }
+        client1.admin().indices().prepareCreate(INDEX_NAME).get();
+        indexer.start(DOC_COUNT / 2);
+        while (indexer.totalIndexedDocs() < DOC_COUNT / 2) {
+            Thread.sleep(5000);
+            logger.info("--> indexed {} of {}", indexer.totalIndexedDocs(), DOC_COUNT);
+        }
+        client1.admin().indices().prepareFlush().get();
+        indexer.continueIndexing(DOC_COUNT / 2);
+        while (indexer.totalIndexedDocs() < DOC_COUNT) {
+            Thread.sleep(5000);
+            logger.info("--> indexed {} of {}", indexer.totalIndexedDocs(), DOC_COUNT);
+        }
+
+
+        logger.info("--> starting another node and allocating a shard on it");
+
+        Node node2 = nodeBuilder().clusterName(clusterName)
+                .settings(settingsBuilder().put(settings))
+                .node();
+
+        client1.admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(IndexMetaData.SETTING_NUMBER_OF_REPLICAS + ": 1").get();
+
+        final AtomicBoolean end = new AtomicBoolean(false);
+
+        final Thread backgroundLogger = new Thread(new Runnable() {
+
+            long lastTime = System.currentTimeMillis();
+            long lastDocs = indexer.totalIndexedDocs();
+            long lastBytes = 0;
+            long lastTranslogOps = 0;
+
+            @Override
+            public void run() {
+                while (true) {
+                    try {
+                        Thread.sleep(5000);
+                    } catch (InterruptedException e) {
+
+                    }
+                    if (end.get()) {
+                        return;
+                    }
+                    long currentTime = System.currentTimeMillis();
+                    long currentDocs = indexer.totalIndexedDocs();
+                    RecoveryResponse recoveryResponse = client1.admin().indices().prepareRecoveries(INDEX_NAME).setActiveOnly(true).get();
+                    List<ShardRecoveryResponse> indexRecoveries = recoveryResponse.shardResponses().get(INDEX_NAME);
+                    long translogOps;
+                    long bytes;
+                    if (indexRecoveries.size() > 0) {
+                        translogOps = indexRecoveries.get(0).recoveryState().getTranslog().currentTranslogOperations();
+                        bytes = recoveryResponse.shardResponses().get(INDEX_NAME).get(0).recoveryState().getIndex().recoveredByteCount();
+                    } else {
+                        bytes = lastBytes = 0;
+                        translogOps = lastTranslogOps = 0;
+                    }
+                    float seconds = (currentTime - lastTime) / 1000.0F;
+                    logger.info("--> indexed [{}];[{}] doc/s, recovered [{}] MB/s , translog ops [{}]/s ",
+                            currentDocs, (currentDocs - lastDocs) / seconds,
+                            (bytes - lastBytes) / 1024.0F / 1024F / seconds, (translogOps - lastTranslogOps) / seconds);
+                    lastBytes = bytes;
+                    lastTranslogOps = translogOps;
+                    lastTime = currentTime;
+                    lastDocs = currentDocs;
+                }
+            }
+        });
+
+        backgroundLogger.start();
+
+        client1.admin().cluster().prepareHealth().setWaitForGreenStatus().get();
+
+        logger.info("--> green. starting relocation cycles");
+
+        long startDocIndexed = indexer.totalIndexedDocs();
+        indexer.continueIndexing(DOC_COUNT * 50);
+
+        long totalRecoveryTime = 0;
+        long startTime = System.currentTimeMillis();
+        for (int iteration = 0; iteration < 3; iteration++) {
+            logger.info("--> removing replicas");
+            client1.admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(IndexMetaData.SETTING_NUMBER_OF_REPLICAS + ": 0").get();
+            logger.info("--> adding replica again");
+            long recoveryStart = System.currentTimeMillis();
+            client1.admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(IndexMetaData.SETTING_NUMBER_OF_REPLICAS + ": 1").get();
+            client1.admin().cluster().prepareHealth(INDEX_NAME).setWaitForGreenStatus().setTimeout("15m").get();
+            long recoveryTime = System.currentTimeMillis() - recoveryStart;
+            totalRecoveryTime += recoveryTime;
+            logger.info("--> recovery done in [{}]", new TimeValue(recoveryTime));
+            // sleep some to let things clean up
+            Thread.sleep(10000);
+        }
+
+        long endDocIndexed = indexer.totalIndexedDocs();
+        long totalTime = System.currentTimeMillis() - startTime;
+        indexer.stop();
+
+        end.set(true);
+
+        backgroundLogger.interrupt();
+
+        backgroundLogger.join();
+
+        logger.info("average doc/s [{}], average relocation time [{}]", (endDocIndexed - startDocIndexed) * 1000.0 / totalTime, new TimeValue(totalRecoveryTime / 3));
+
+        client1.close();
+        node1.close();
+        node2.close();
+    }
+}
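For reference, the MB/s figure printed by the background logger is a simple delta over the 5 second sampling interval. A worked example with made-up numbers:

```java
// Hypothetical sample values; the benchmark reads them from the recovery API.
long bytes = 52_428_800L;      // recovered bytes reported at this sample
long lastBytes = 10_485_760L;  // reported at the previous sample, 5s earlier
float seconds = 5.0F;
float mbPerSec = (bytes - lastBytes) / 1024.0F / 1024F / seconds; // = 8.0 MB/s
```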
starting relocation cycles"); + + long startDocIndexed = indexer.totalIndexedDocs(); + indexer.continueIndexing(DOC_COUNT * 50); + + long totalRecoveryTime = 0; + long startTime = System.currentTimeMillis(); + for (int iteration = 0; iteration < 3; iteration++) { + logger.info("--> removing replicas"); + client1.admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(IndexMetaData.SETTING_NUMBER_OF_REPLICAS + ": 0").get(); + logger.info("--> adding replica again"); + long recoveryStart = System.currentTimeMillis(); + client1.admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(IndexMetaData.SETTING_NUMBER_OF_REPLICAS + ": 1").get(); + client1.admin().cluster().prepareHealth(INDEX_NAME).setWaitForGreenStatus().setTimeout("15m").get(); + long recoveryTime = System.currentTimeMillis() - recoveryStart; + totalRecoveryTime += recoveryTime; + logger.info("--> recovery done in [{}]", new TimeValue(recoveryTime)); + // sleep some to let things clean up + Thread.sleep(10000); + } + + long endDocIndexed = indexer.totalIndexedDocs(); + long totalTime = System.currentTimeMillis() - startTime; + indexer.stop(); + + end.set(true); + + backgroundLogger.interrupt(); + + backgroundLogger.join(); + + logger.info("average doc/s [{}], average relocation time [{}]", (endDocIndexed - startDocIndexed) * 1000.0 / totalTime, new TimeValue(totalRecoveryTime / 3)); + + client1.close(); + node1.close(); + node2.close(); + } +} diff --git a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java index 266794cf4c7..a1a73ce8b98 100644 --- a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java @@ -52,13 +52,10 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.engine.*; import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService; -import org.elasticsearch.index.mapper.DocumentMapper; -import org.elasticsearch.index.mapper.DocumentMapperParser; import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.internal.SourceFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; -import org.elasticsearch.index.mapper.object.RootObjectMapper; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider; import org.elasticsearch.index.merge.policy.MergePolicyProvider; @@ -89,13 +86,16 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; -import static com.carrotsearch.randomizedtesting.RandomizedTest.*; +import static com.carrotsearch.randomizedtesting.RandomizedTest.between; +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomBoolean; +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomDouble; +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomIntBetween; +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLong; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; 
@@ -742,7 +747,7 @@
 
     @Test
     public void testSimpleRecover() throws Exception {
-        ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false);
+        final ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false);
         engine.create(new Engine.Create(null, analyzer, newUid("1"), doc));
         engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
 
@@ -751,7 +756,7 @@
             public void phase1(SnapshotIndexCommit snapshot) throws EngineException {
                 try {
                     engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
-                    assertThat("flush is not allowed in phase 3", false, equalTo(true));
+                    assertThat("flush is not allowed in phase 1", false, equalTo(true));
                 } catch (FlushNotAllowedEngineException e) {
                     // all is well
                 }
@@ -762,15 +767,18 @@
                 MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(0));
                 try {
                     engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
-                    assertThat("flush is not allowed in phase 3", false, equalTo(true));
+                    assertThat("flush is not allowed in phase 2", false, equalTo(true));
                 } catch (FlushNotAllowedEngineException e) {
                     // all is well
                 }
+
+                // but we can index
+                engine.index(new Engine.Index(null, analyzer, newUid("1"), doc));
             }
 
             @Override
             public void phase3(Translog.Snapshot snapshot) throws EngineException {
-                MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(0));
+                MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(1));
                 try {
                     // we can do this here since we are on the same thread
                     engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
@@ -780,6 +788,8 @@
                 }
             }
         });
+        // post recovery should flush the translog
+        MatcherAssert.assertThat(translog.snapshot(), TranslogSizeMatcher.translogSize(0));
 
         engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
         engine.close();
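The test drives recovery through a handler with three phases; a condensed no-op skeleton of that shape, sketched from the API as this test uses it (the interface name `Engine.RecoveryHandler` is assumed from the surrounding code):

```java
engine.recover(new Engine.RecoveryHandler() {
    @Override
    public void phase1(SnapshotIndexCommit snapshot) throws EngineException {
        // file-based copy of a Lucene commit; flushes are rejected while recovering
    }

    @Override
    public void phase2(Translog.Snapshot snapshot) throws EngineException {
        // translog replay; flushes are still rejected, but indexing may continue
    }

    @Override
    public void phase3(Translog.Snapshot snapshot) throws EngineException {
        // final replay under lock; sees operations indexed during phase2,
        // which is why the updated assertion expects a translog size of 1
    }
});
```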
diff --git a/src/test/java/org/elasticsearch/test/BackgroundIndexer.java b/src/test/java/org/elasticsearch/test/BackgroundIndexer.java
index 2cafcef5d9f..02441008f69 100644
--- a/src/test/java/org/elasticsearch/test/BackgroundIndexer.java
+++ b/src/test/java/org/elasticsearch/test/BackgroundIndexer.java
@@ -18,6 +18,8 @@ package org.elasticsearch.test;/*
  */
 
 import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.generators.RandomInts;
+import com.carrotsearch.randomizedtesting.generators.RandomStrings;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -25,8 +27,12 @@
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
 import org.junit.Assert;
+import java.io.IOException;
+import java.util.Random;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
@@ -51,6 +57,9 @@
     final AtomicBoolean hasBudget = new AtomicBoolean(false); // when set to true, writers will acquire writes from a semaphore
     final Semaphore availableBudget = new Semaphore(0);
 
+    volatile int minFieldSize = 10;
+    volatile int maxFieldSize = 140;
+
     /**
      * Start indexing in the background using a random number of threads.
      *
@@ -86,7 +95,7 @@
      * @param writerCount number of indexing threads to use
      */
    public BackgroundIndexer(String index, String type, Client client, int numOfDocs, final int writerCount) {
-        this(index, type, client, numOfDocs, writerCount, true);
+        this(index, type, client, numOfDocs, writerCount, true, null);
     }
 
     /**
@@ -99,16 +108,22 @@
      * @param numOfDocs   number of document to index before pausing. Set to -1 to have no limit.
      * @param writerCount number of indexing threads to use
      * @param autoStart   set to true to start indexing as soon as all threads have been created.
+     * @param random      random instance to use
      */
-    public BackgroundIndexer(final String index, final String type, final Client client, final int numOfDocs, final int writerCount, boolean autoStart) {
+    public BackgroundIndexer(final String index, final String type, final Client client, final int numOfDocs, final int writerCount,
+                             boolean autoStart, Random random) {
 
+        if (random == null) {
+            random = RandomizedTest.getRandom();
+        }
         failures = new CopyOnWriteArrayList<>();
         writers = new Thread[writerCount];
         stopLatch = new CountDownLatch(writers.length);
         logger.info("--> creating {} indexing threads (auto start: [{}], numOfDocs: [{}])", writerCount, autoStart, numOfDocs);
         for (int i = 0; i < writers.length; i++) {
             final int indexerId = i;
-            final boolean batch = RandomizedTest.getRandom().nextBoolean();
+            final boolean batch = random.nextBoolean();
+            final Random threadRandom = new Random(random.nextLong());
             writers[i] = new Thread() {
                 @Override
                 public void run() {
@@ -118,7 +133,7 @@
                     logger.info("**** starting indexing thread {}", indexerId);
                     while (!stop.get()) {
                         if (batch) {
-                            int batchSize = RandomizedTest.getRandom().nextInt(20) + 1;
+                            int batchSize = threadRandom.nextInt(20) + 1;
                             if (hasBudget.get()) {
                                 batchSize = Math.max(Math.min(batchSize, availableBudget.availablePermits()), 1);// always try to get at least one
                                 if (!availableBudget.tryAcquire(batchSize, 250, TimeUnit.MILLISECONDS)) {
@@ -130,7 +145,7 @@
                             BulkRequestBuilder bulkRequest = client.prepareBulk();
                             for (int i = 0; i < batchSize; i++) {
                                 id = idGenerator.incrementAndGet();
-                                bulkRequest.add(client.prepareIndex(index, type, Long.toString(id)).setSource("test", "value" + id));
+                                bulkRequest.add(client.prepareIndex(index, type, Long.toString(id)).setSource(generateSource(id, threadRandom)));
                             }
                             BulkResponse bulkResponse = bulkRequest.get();
                             for (BulkItemResponse bulkItemResponse : bulkResponse) {
@@ -149,7 +164,7 @@
                                 continue;
                             }
                             id = idGenerator.incrementAndGet();
-                            client.prepareIndex(index, type, Long.toString(id) + "-" + indexerId).setSource("test", "value" + id).get();
+                            client.prepareIndex(index, type, Long.toString(id) + "-" + indexerId).setSource(generateSource(id, threadRandom)).get();
                             indexCounter.incrementAndGet();
                         }
                     }
@@ -170,6 +185,21 @@
         }
     }
 
+    private XContentBuilder generateSource(long id, Random random) throws IOException {
+        int contentLength = RandomInts.randomIntBetween(random, minFieldSize, maxFieldSize);
+        StringBuilder text = new StringBuilder(contentLength);
+        while (text.length() < contentLength) {
+            int tokenLength = RandomInts.randomIntBetween(random, 1, Math.min(contentLength - text.length(), 10));
+            text.append(" ").append(RandomStrings.randomRealisticUnicodeOfCodepointLength(random, tokenLength));
+        }
+        XContentBuilder builder = XContentFactory.smileBuilder();
+        builder.startObject().field("test", "value" + id)
+                .field("text", text.toString())
+                .endObject();
+        return builder;
+
+    }
+
     private void setBudget(int numOfDocs) {
         logger.debug("updating budget to [{}]", numOfDocs);
         if (numOfDocs >= 0) {
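`generateSource` draws its sizes from a per-thread `Random` seeded off the caller-supplied one, so a run is reproducible from a single master seed without threads sharing generator state. The seeding scheme in isolation (illustrative values only):

```java
import java.util.Random;

class SeedingSketch {
    static Random[] childRandoms(long masterSeed, int writerCount) {
        Random root = new Random(masterSeed);       // e.g. the test's master seed
        Random[] perThread = new Random[writerCount];
        for (int i = 0; i < writerCount; i++) {
            // each writer gets its own generator derived from the master seed
            perThread[i] = new Random(root.nextLong());
        }
        return perThread;
    }
}
```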
@@ -239,6 +269,16 @@
         Assert.assertThat(failures, emptyIterable());
     }
 
+    /** the minimum size in code points of a payload field in the indexed documents */
+    public void setMinFieldSize(int fieldSize) {
+        minFieldSize = fieldSize;
+    }
+
+    /** the maximum size in code points of a payload field in the indexed documents */
+    public void setMaxFieldSize(int fieldSize) {
+        maxFieldSize = fieldSize;
+    }
+
     @Override
     public void close() throws Exception {
         stop();
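Putting the new constructor and setters together, usage mirrors `ReplicaRecoveryBenchmark` (sketch; `client` is assumed to be an initialized `Client`):

```java
BackgroundIndexer indexer = new BackgroundIndexer("index", "type", client, 0, 2, false, new Random());
indexer.setMinFieldSize(10);
indexer.setMaxFieldSize(150);
indexer.start(20000);        // index 20k docs in the background
// ... exercise recovery/relocation while indexing continues ...
indexer.stop();
```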