Add an option to force _optimize operations.

When forced, the index will be merged even if it contains a single segment with
no deletions.

Close #5243
This commit is contained in:
Adrien Grand 2014-02-28 11:54:45 +01:00
parent b6dc7cecd5
commit 65d3b61b97
13 changed files with 142 additions and 10 deletions

View File

@ -36,6 +36,9 @@ only merge segments that have deletes. Defaults to `false`.
to `true`. Note, a merge can potentially be a very heavy operation, so
it might make sense to run the request with `wait_for_merge` set to `false`.
`force`:: Force a merge operation, even if there is a single segment in the
shard with no deletions. coming[1.1.0]
[float]
[[optimize-multi-index]]
=== Multi Index

View File

@ -44,6 +44,10 @@
"wait_for_merge": {
"type" : "boolean",
"description" : "Specify whether the request should block until the merge process is finished (default: true)"
},
"force": {
"type": "boolean",
"description": "Force a merge operation to run, even if there is a single segment in the index (default: false)"
}
}
},

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.indices.optimize;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -46,12 +47,14 @@ public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest>
public static final int MAX_NUM_SEGMENTS = -1;
public static final boolean ONLY_EXPUNGE_DELETES = false;
public static final boolean FLUSH = true;
public static final boolean FORCE = false;
}
private boolean waitForMerge = Defaults.WAIT_FOR_MERGE;
private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = Defaults.FLUSH;
private boolean force = Defaults.FORCE;
/**
* Constructs an optimization request over one or more indices.
@ -130,12 +133,31 @@ public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest>
return this;
}
/**
 * Returns whether the merge should be forced even if the shard contains a
 * single segment with no deletions (in which case an optimize would normally
 * be a no-op). Defaults to <tt>false</tt>. Note: the flag is only transmitted
 * to nodes on version 1.1.0 or later (see {@link #readFrom}).
 */
public boolean force() {
return force;
}
/**
 * Sets whether the merge should be forced even if the shard contains a single
 * segment with no deletions. See {@link #force()}.
 */
public OptimizeRequest force(boolean force) {
this.force = force;
return this;
}
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
waitForMerge = in.readBoolean();
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
// `force` was introduced in 1.1.0; nodes on older versions do not send it,
// so the field keeps its default (false) when reading from them. The order
// of reads must mirror writeTo exactly.
if (in.getVersion().onOrAfter(Version.V_1_1_0)) {
force = in.readBoolean();
}
}
public void writeTo(StreamOutput out) throws IOException {
@ -144,5 +166,8 @@ public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest>
out.writeInt(maxNumSegments);
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
if (out.getVersion().onOrAfter(Version.V_1_1_0)) {
out.writeBoolean(force);
}
}
}

View File

@ -74,6 +74,15 @@ public class OptimizeRequestBuilder extends BroadcastOperationRequestBuilder<Opt
return this;
}
/**
 * Should the merge be forced even if there is a single segment with no deletions in the shard.
 * Defaults to <tt>false</tt>.
 *
 * @param force whether to force the merge even when it would otherwise be a no-op
 * @return this builder, for method chaining
 */
public OptimizeRequestBuilder setForce(boolean force) {
request.force(force);
return this;
}
@Override
protected void doExecute(ActionListener<OptimizeResponse> listener) {
((IndicesAdminClient) client).optimize(request, listener);

View File

@ -19,6 +19,8 @@
package org.elasticsearch.action.admin.indices.optimize;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -34,6 +36,7 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest {
private int maxNumSegments = OptimizeRequest.Defaults.MAX_NUM_SEGMENTS;
private boolean onlyExpungeDeletes = OptimizeRequest.Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = OptimizeRequest.Defaults.FLUSH;
private boolean force = OptimizeRequest.Defaults.FORCE;
// No-arg constructor — presumably required for stream deserialization via
// readFrom; verify against BroadcastShardOperationRequest usage.
ShardOptimizeRequest() {
}
@ -62,6 +65,10 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest {
return flush;
}
/**
 * Whether the merge should run even if the shard contains a single segment
 * with no deletions. Mirrors {@link OptimizeRequest#force()}.
 */
public boolean force() {
return force;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -69,6 +76,9 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest {
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_1_1_0)) {
force = in.readBoolean();
}
}
@Override
@ -78,5 +88,8 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest {
out.writeInt(maxNumSegments);
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
if (out.getVersion().onOrAfter(Version.V_1_1_0)) {
out.writeBoolean(force);
}
}
}

View File

@ -117,6 +117,7 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
.maxNumSegments(request.maxNumSegments())
.onlyExpungeDeletes(request.onlyExpungeDeletes())
.flush(request.flush())
.force(request.force())
);
return new ShardOptimizeResponse(request.index(), request.shardId());
}

View File

@ -307,6 +307,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private int maxNumSegments = -1;
private boolean onlyExpungeDeletes = false;
private boolean flush = false;
private boolean force = false;
public Optimize() {
}
@ -347,9 +348,18 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this;
}
/**
 * Whether the merge should run even when the index contains a single segment
 * with no deletions. Defaults to <tt>false</tt>.
 */
public boolean force() {
return force;
}
/**
 * Sets the force flag; returns this {@link Optimize} for call chaining.
 */
public Optimize force(boolean force) {
this.force = force;
return this;
}
@Override
public String toString() {
return "waitForMerge[" + waitForMerge + "], maxNumSegments[" + maxNumSegments + "], onlyExpungeDeletes[" + onlyExpungeDeletes + "], flush[" + flush + "]";
return "waitForMerge[" + waitForMerge + "], maxNumSegments[" + maxNumSegments + "], onlyExpungeDeletes[" + onlyExpungeDeletes + "], flush[" + flush + "], force[" + force + "]";
}
}

View File

@ -57,7 +57,7 @@ import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.Merges;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.merge.policy.IndexUpgraderMergePolicy;
import org.elasticsearch.index.merge.policy.ElasticsearchMergePolicy;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery;
@ -939,9 +939,28 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
flush(new Flush().force(true).waitIfOngoing(true));
}
if (optimizeMutex.compareAndSet(false, true)) {
ElasticsearchMergePolicy elasticsearchMergePolicy = null;
rwl.readLock().lock();
try {
ensureOpen();
if (indexWriter.getConfig().getMergePolicy() instanceof ElasticsearchMergePolicy) {
elasticsearchMergePolicy = (ElasticsearchMergePolicy) indexWriter.getConfig().getMergePolicy();
}
if (optimize.force() && elasticsearchMergePolicy == null) {
throw new ElasticsearchIllegalStateException("The `force` flag can only be used if the merge policy is an instance of "
+ ElasticsearchMergePolicy.class.getSimpleName() + ", got [" + indexWriter.getConfig().getMergePolicy().getClass().getName() + "]");
}
/*
* The way we implement "forced forced merges" is a bit hackish in the sense that we set an instance variable and that this
* setting will thus apply to all forced merges that will be run until `force` is set back to false. However, since
* InternalEngine.optimize is the only place in code where we call forceMerge and since calls are protected with
* `optimizeMutex`, this has the expected behavior.
*/
if (optimize.force()) {
elasticsearchMergePolicy.setForce(true);
}
if (optimize.onlyExpungeDeletes()) {
Merges.forceMergeDeletes(indexWriter, false);
} else if (optimize.maxNumSegments() <= 0) {
@ -961,6 +980,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
} catch (Throwable e) {
throw new OptimizeFailedEngineException(shardId, e);
} finally {
if (elasticsearchMergePolicy != null) {
elasticsearchMergePolicy.setForce(false);
}
rwl.readLock().unlock();
optimizeMutex.set(false);
}
@ -1320,7 +1342,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
MergePolicy mergePolicy = mergePolicyProvider.newMergePolicy();
// Give us the opportunity to upgrade old segments while performing
// background merges
mergePolicy = new IndexUpgraderMergePolicy(mergePolicy);
mergePolicy = new ElasticsearchMergePolicy(mergePolicy);
config.setMergePolicy(mergePolicy);
config.setSimilarity(similarityService.similarity());
config.setRAMBufferSizeMB(indexingBufferSize.mbFrac());
@ -1608,4 +1630,5 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
return ongoingRecoveries;
}
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.merge.policy;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.lucene.index.*;
import org.apache.lucene.index.FieldInfo.DocValuesType;
import org.apache.lucene.index.FieldInfo.IndexOptions;
@ -39,7 +40,7 @@ import java.util.List;
import java.util.Map;
/**
* A {@link MergePolicy} that upgrades segments.
* A {@link MergePolicy} that upgrades segments and can force merges.
* <p>
* It can be useful to use the background merging process to upgrade segments,
* for example when we perform internal changes that imply different index
@ -50,12 +51,13 @@ import java.util.Map;
* For now, this {@link MergePolicy} takes care of moving versions that used to
* be stored as payloads to numeric doc values.
*/
public final class IndexUpgraderMergePolicy extends MergePolicy {
public final class ElasticsearchMergePolicy extends MergePolicy {
private final MergePolicy delegate;
private volatile boolean force;
/** @param delegate the merge policy to wrap */
public IndexUpgraderMergePolicy(MergePolicy delegate) {
public ElasticsearchMergePolicy(MergePolicy delegate) {
this.delegate = delegate;
}
@ -194,6 +196,19 @@ public final class IndexUpgraderMergePolicy extends MergePolicy {
public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge)
throws IOException {
// When force is set (see setForce), schedule a merge of every candidate
// segment even if the delegate policy would consider the index already
// optimized (e.g. a single segment with no deletions).
if (force) {
List<SegmentCommitInfo> segments = Lists.newArrayList();
// Only segments the caller flagged as eligible take part in the merge.
for (SegmentCommitInfo info : segmentInfos) {
if (segmentsToMerge.containsKey(info)) {
segments.add(info);
}
}
if (!segments.isEmpty()) {
// NOTE(review): all eligible segments are folded into ONE OneMerge here;
// maxSegmentCount appears to be ignored on this path — confirm intended.
MergeSpecification spec = new IndexUpgraderMergeSpecification();
spec.add(new OneMerge(segments));
return spec;
}
}
// Normal path: delegate, then wrap the result so merged segments are upgraded.
return upgradedMergeSpecification(delegate.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge));
}
@ -205,7 +220,7 @@ public final class IndexUpgraderMergePolicy extends MergePolicy {
@Override
public MergePolicy clone() {
return new IndexUpgraderMergePolicy(delegate.clone());
return new ElasticsearchMergePolicy(delegate.clone());
}
@Override
@ -224,6 +239,15 @@ public final class IndexUpgraderMergePolicy extends MergePolicy {
delegate.setIndexWriter(writer);
}
/**
 * When <code>force</code> is true, running a force merge will cause a merge even if there
 * is a single segment in the directory. This will apply to all calls to
 * {@link IndexWriter#forceMerge} that are handled by this {@link MergePolicy}.
 * The backing field is declared <code>volatile</code>, so the new value is
 * visible to merge threads without further synchronization.
 */
public void setForce(boolean force) {
this.force = force;
}
@Override
public String toString() {
return getClass().getSimpleName() + "(" + delegate + ")";

View File

@ -65,6 +65,7 @@ public class RestOptimizeAction extends BaseRestHandler {
optimizeRequest.maxNumSegments(request.paramAsInt("max_num_segments", optimizeRequest.maxNumSegments()));
optimizeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", optimizeRequest.onlyExpungeDeletes()));
optimizeRequest.flush(request.paramAsBoolean("flush", optimizeRequest.flush()));
optimizeRequest.force(request.paramAsBoolean("force", optimizeRequest.force()));
BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operation_threading"), BroadcastOperationThreading.THREAD_PER_SHARD);
if (operationThreading == BroadcastOperationThreading.NO_THREADS) {

View File

@ -35,7 +35,7 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
import org.elasticsearch.index.merge.Merges;
import org.elasticsearch.index.merge.policy.IndexUpgraderMergePolicy;
import org.elasticsearch.index.merge.policy.ElasticsearchMergePolicy;
import org.elasticsearch.test.ElasticsearchLuceneTestCase;
import org.hamcrest.MatcherAssert;
import org.junit.Test;
@ -226,7 +226,7 @@ public class VersionsTests extends ElasticsearchLuceneTestCase {
@Test
public void testMergingOldIndices() throws Exception {
final IndexWriterConfig iwConf = new IndexWriterConfig(Lucene.VERSION, new KeywordAnalyzer());
iwConf.setMergePolicy(new IndexUpgraderMergePolicy(iwConf.getMergePolicy()));
iwConf.setMergePolicy(new ElasticsearchMergePolicy(iwConf.getMergePolicy()));
final Directory dir = newDirectory();
final IndexWriter iw = new IndexWriter(dir, iwConf);

View File

@ -26,6 +26,7 @@ import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.util.Version;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@ -308,6 +309,10 @@ public class InternalEngineTests extends ElasticsearchTestCase {
assertThat(segments.get(2).isCompound(), equalTo(true));
}
// Reminder assertion: this trips as soon as the Lucene dependency moves past
// 4.7, signalling that LUCENE-5481 is fixed and the workaround below (pinning
// `force = true` instead of a random boolean) can be removed.
static {
assert Version.LUCENE_47.onOrAfter(Lucene.VERSION) : "LUCENE-5481 is fixed, improve test below";
}
@Test
public void testSegmentsWithMergeFlag() throws Exception {
ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool);
@ -377,6 +382,20 @@ public class InternalEngineTests extends ElasticsearchTestCase {
assertThat(segment.getMergeId(), nullValue());
}
// forcing an optimize will merge this single segment shard
// TODO: put a random boolean again once LUCENE-5481 is fixed
final boolean force = true; // randomBoolean();
waitTillMerge.set(new CountDownLatch(1));
waitForMerge.set(new CountDownLatch(1));
engine.optimize(new Engine.Optimize().flush(true).maxNumSegments(1).force(force).waitForMerge(false));
waitTillMerge.get().await();
for (Segment segment : engine.segments()) {
assertThat(segment.getMergeId(), force ? notNullValue() : nullValue());
}
waitForMerge.get().countDown();
engine.close();
}

View File

@ -730,7 +730,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
*/
protected OptimizeResponse optimize() {
waitForRelocation();
OptimizeResponse actionGet = client().admin().indices().prepareOptimize().execute().actionGet();
OptimizeResponse actionGet = client().admin().indices().prepareOptimize().setForce(randomBoolean()).execute().actionGet();
assertNoFailures(actionGet);
return actionGet;
}