From 65d3b61b975121c19d79843b24b6c6567a7f468c Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Fri, 28 Feb 2014 11:54:45 +0100 Subject: [PATCH] Add an option to force _optimize operations. When forced, the index will be merged even if it contains a single segment with no deletions. Close #5243 --- docs/reference/indices/optimize.asciidoc | 3 ++ rest-api-spec/api/indices.optimize.json | 4 +++ .../indices/optimize/OptimizeRequest.java | 25 +++++++++++++++ .../optimize/OptimizeRequestBuilder.java | 9 ++++++ .../optimize/ShardOptimizeRequest.java | 13 ++++++++ .../optimize/TransportOptimizeAction.java | 1 + .../elasticsearch/index/engine/Engine.java | 12 ++++++- .../index/engine/internal/InternalEngine.java | 27 ++++++++++++++-- ...icy.java => ElasticsearchMergePolicy.java} | 32 ++++++++++++++++--- .../indices/optimize/RestOptimizeAction.java | 1 + .../common/lucene/uid/VersionsTests.java | 4 +-- .../engine/internal/InternalEngineTests.java | 19 +++++++++++ .../test/ElasticsearchIntegrationTest.java | 2 +- 13 files changed, 142 insertions(+), 10 deletions(-) rename src/main/java/org/elasticsearch/index/merge/policy/{IndexUpgraderMergePolicy.java => ElasticsearchMergePolicy.java} (88%) diff --git a/docs/reference/indices/optimize.asciidoc b/docs/reference/indices/optimize.asciidoc index 39daf351d11..12babffcabf 100644 --- a/docs/reference/indices/optimize.asciidoc +++ b/docs/reference/indices/optimize.asciidoc @@ -36,6 +36,9 @@ only merge segments that have deletes. Defaults to `false`. to `true`. Note, a merge can potentially be a very heavy operation, so it might make sense to run it set to `false`. +`force`:: Force a merge operation, even if there is a single segment in the +shard with no deletions. 
coming[1.1.0] + [float] [[optimize-multi-index]] === Multi Index diff --git a/rest-api-spec/api/indices.optimize.json b/rest-api-spec/api/indices.optimize.json index 6e0292ec942..d92b8d8ab89 100644 --- a/rest-api-spec/api/indices.optimize.json +++ b/rest-api-spec/api/indices.optimize.json @@ -44,6 +44,10 @@ "wait_for_merge": { "type" : "boolean", "description" : "Specify whether the request should block until the merge process is finished (default: true)" + }, + "force": { + "type": "boolean", + "description": "Force a merge operation to run, even if there is a single segment with no deletions in the shard (default: false)" } } }, diff --git a/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java index eda0aa49753..8403c1b0f97 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.optimize; +import org.elasticsearch.Version; import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -46,12 +47,14 @@ public class OptimizeRequest extends BroadcastOperationRequest public static final int MAX_NUM_SEGMENTS = -1; public static final boolean ONLY_EXPUNGE_DELETES = false; public static final boolean FLUSH = true; + public static final boolean FORCE = false; } private boolean waitForMerge = Defaults.WAIT_FOR_MERGE; private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS; private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES; private boolean flush = Defaults.FLUSH; + private boolean force = Defaults.FORCE; /** * Constructs an optimization request over one or more indices. 
@@ -130,12 +133,31 @@ public class OptimizeRequest extends BroadcastOperationRequest return this; } + /** + * Should the merge be forced even if there is a single segment with no deletions in the shard. + * Defaults to false. + */ + public boolean force() { + return force; + } + + /** + * See #force(). + */ + public OptimizeRequest force(boolean force) { + this.force = force; + return this; + } + public void readFrom(StreamInput in) throws IOException { super.readFrom(in); waitForMerge = in.readBoolean(); maxNumSegments = in.readInt(); onlyExpungeDeletes = in.readBoolean(); flush = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_1_1_0)) { + force = in.readBoolean(); + } } public void writeTo(StreamOutput out) throws IOException { @@ -144,5 +166,8 @@ public class OptimizeRequest extends BroadcastOperationRequest out.writeInt(maxNumSegments); out.writeBoolean(onlyExpungeDeletes); out.writeBoolean(flush); + if (out.getVersion().onOrAfter(Version.V_1_1_0)) { + out.writeBoolean(force); + } } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequestBuilder.java b/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequestBuilder.java index ac99dd41317..1a317d600ee 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequestBuilder.java @@ -74,6 +74,15 @@ public class OptimizeRequestBuilder extends BroadcastOperationRequestBuilderfalse. 
+ */ + public OptimizeRequestBuilder setForce(boolean force) { + request.force(force); + return this; + } + @Override protected void doExecute(ActionListener listener) { ((IndicesAdminClient) client).optimize(request, listener); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/optimize/ShardOptimizeRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/optimize/ShardOptimizeRequest.java index 4cd66305604..8cdf46fba9f 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/optimize/ShardOptimizeRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/optimize/ShardOptimizeRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.action.admin.indices.optimize; + +import org.elasticsearch.Version; import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -34,6 +36,7 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest { private int maxNumSegments = OptimizeRequest.Defaults.MAX_NUM_SEGMENTS; private boolean onlyExpungeDeletes = OptimizeRequest.Defaults.ONLY_EXPUNGE_DELETES; private boolean flush = OptimizeRequest.Defaults.FLUSH; + private boolean force = OptimizeRequest.Defaults.FORCE; ShardOptimizeRequest() { } @@ -62,6 +65,10 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest { return flush; } + public boolean force() { + return force; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -69,6 +76,9 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest { maxNumSegments = in.readInt(); onlyExpungeDeletes = in.readBoolean(); flush = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_1_1_0)) { + force = in.readBoolean(); + } } @Override @@ -78,5 +88,8 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest { out.writeInt(maxNumSegments); 
out.writeBoolean(onlyExpungeDeletes); out.writeBoolean(flush); + if (out.getVersion().onOrAfter(Version.V_1_1_0)) { + out.writeBoolean(force); + } } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java b/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java index ad879112de1..5ec23c8cd9c 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java @@ -117,6 +117,7 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction * It can be useful to use the background merging process to upgrade segments, * for example when we perform internal changes that imply different index @@ -50,12 +51,13 @@ import java.util.Map; * For now, this {@link MergePolicy} takes care of moving versions that used to * be stored as payloads to numeric doc values. */ -public final class IndexUpgraderMergePolicy extends MergePolicy { +public final class ElasticsearchMergePolicy extends MergePolicy { private final MergePolicy delegate; + private volatile boolean force; /** @param delegate the merge policy to wrap */ - public IndexUpgraderMergePolicy(MergePolicy delegate) { + public ElasticsearchMergePolicy(MergePolicy delegate) { this.delegate = delegate; } @@ -194,6 +196,19 @@ public final class IndexUpgraderMergePolicy extends MergePolicy { public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, int maxSegmentCount, Map segmentsToMerge) throws IOException { + if (force) { + List segments = Lists.newArrayList(); + for (SegmentCommitInfo info : segmentInfos) { + if (segmentsToMerge.containsKey(info)) { + segments.add(info); + } + } + if (!segments.isEmpty()) { + MergeSpecification spec = new IndexUpgraderMergeSpecification(); + spec.add(new OneMerge(segments)); + return spec; + } + } return 
upgradedMergeSpecification(delegate.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge)); } @@ -205,7 +220,7 @@ public final class IndexUpgraderMergePolicy extends MergePolicy { @Override public MergePolicy clone() { - return new IndexUpgraderMergePolicy(delegate.clone()); + return new ElasticsearchMergePolicy(delegate.clone()); } @Override @@ -224,6 +239,15 @@ public final class IndexUpgraderMergePolicy extends MergePolicy { delegate.setIndexWriter(writer); } + /** + * When force is true, running a force merge will cause a merge even if there + * is a single segment in the directory. This will apply to all calls to + * {@link IndexWriter#forceMerge} that are handled by this {@link MergePolicy}. + */ + public void setForce(boolean force) { + this.force = force; + } + @Override public String toString() { return getClass().getSimpleName() + "(" + delegate + ")"; diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/optimize/RestOptimizeAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/optimize/RestOptimizeAction.java index d4897ba2dd2..44f044c2291 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/optimize/RestOptimizeAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/optimize/RestOptimizeAction.java @@ -65,6 +65,7 @@ public class RestOptimizeAction extends BaseRestHandler { optimizeRequest.maxNumSegments(request.paramAsInt("max_num_segments", optimizeRequest.maxNumSegments())); optimizeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", optimizeRequest.onlyExpungeDeletes())); optimizeRequest.flush(request.paramAsBoolean("flush", optimizeRequest.flush())); + optimizeRequest.force(request.paramAsBoolean("force", optimizeRequest.force())); BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operation_threading"), BroadcastOperationThreading.THREAD_PER_SHARD); if (operationThreading == 
BroadcastOperationThreading.NO_THREADS) { diff --git a/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java b/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java index 855605866a4..5bffe748e93 100644 --- a/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java +++ b/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java @@ -35,7 +35,7 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.internal.VersionFieldMapper; import org.elasticsearch.index.merge.Merges; -import org.elasticsearch.index.merge.policy.IndexUpgraderMergePolicy; +import org.elasticsearch.index.merge.policy.ElasticsearchMergePolicy; import org.elasticsearch.test.ElasticsearchLuceneTestCase; import org.hamcrest.MatcherAssert; import org.junit.Test; @@ -226,7 +226,7 @@ public class VersionsTests extends ElasticsearchLuceneTestCase { @Test public void testMergingOldIndices() throws Exception { final IndexWriterConfig iwConf = new IndexWriterConfig(Lucene.VERSION, new KeywordAnalyzer()); - iwConf.setMergePolicy(new IndexUpgraderMergePolicy(iwConf.getMergePolicy())); + iwConf.setMergePolicy(new ElasticsearchMergePolicy(iwConf.getMergePolicy())); final Directory dir = newDirectory(); final IndexWriter iw = new IndexWriter(dir, iwConf); diff --git a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java index 0801b57d51b..081f44eddfe 100644 --- a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java @@ -26,6 +26,7 @@ import org.apache.lucene.document.TextField; import org.apache.lucene.index.IndexDeletionPolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.TermQuery; +import org.apache.lucene.util.Version; import 
org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -308,6 +309,10 @@ public class InternalEngineTests extends ElasticsearchTestCase { assertThat(segments.get(2).isCompound(), equalTo(true)); } + static { + assert Version.LUCENE_47.onOrAfter(Lucene.VERSION) : "LUCENE-5481 is fixed, improve test below"; + } + @Test public void testSegmentsWithMergeFlag() throws Exception { ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool); @@ -377,6 +382,20 @@ public class InternalEngineTests extends ElasticsearchTestCase { assertThat(segment.getMergeId(), nullValue()); } + // forcing an optimize will merge this single segment shard + // TODO: put a random boolean again once LUCENE-5481 is fixed + final boolean force = true; // randomBoolean(); + waitTillMerge.set(new CountDownLatch(1)); + waitForMerge.set(new CountDownLatch(1)); + engine.optimize(new Engine.Optimize().flush(true).maxNumSegments(1).force(force).waitForMerge(false)); + waitTillMerge.get().await(); + + for (Segment segment : engine.segments()) { + assertThat(segment.getMergeId(), force ? 
notNullValue() : nullValue()); + } + + waitForMerge.get().countDown(); + engine.close(); } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index a08c72fc7c1..846ada3a573 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -730,7 +730,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase */ protected OptimizeResponse optimize() { waitForRelocation(); - OptimizeResponse actionGet = client().admin().indices().prepareOptimize().execute().actionGet(); + OptimizeResponse actionGet = client().admin().indices().prepareOptimize().setForce(randomBoolean()).execute().actionGet(); assertNoFailures(actionGet); return actionGet; }