Segments API: Support merge id on segments (groups segments being merged)

Return a merge_id element in each segment of the segments API, making it possible to group the segments that belong to a single merge and to see which segments are being merged right now.
closes #3904
Shay Banon 2013-10-14 11:04:48 +02:00
parent 148a772ea0
commit c093e90d51
10 changed files with 324 additions and 9 deletions

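As an illustration of the new field, a hedged sketch of reading merge_id through the Java admin client (not part of this commit; the accessor chain follows the 0.90-era segments API, so treat the exact method names as assumptions):

import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.engine.Segment;

public class MergeIdExample {
    // Prints every segment of the given index that is currently part of a merge.
    public static void printOngoingMerges(Client client, String index) {
        IndicesSegmentResponse response = client.admin().indices().prepareSegments(index).execute().actionGet();
        for (IndexShardSegments shard : response.getIndices().get(index)) {
            for (ShardSegments shardSegments : shard.getShards()) {
                for (Segment segment : shardSegments.getSegments()) {
                    // segments sharing a merge_id are being merged together right now
                    if (segment.getMergeId() != null) {
                        System.out.println(segment.getName() + " -> merge " + segment.getMergeId());
                    }
                }
            }
        }
    }
}
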
View File: TrackingConcurrentMergeScheduler.java

@@ -24,8 +24,12 @@ import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.merge.OnGoingMerge;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
/**
* An extension to the {@link ConcurrentMergeScheduler} that provides tracking on merge times, total
@@ -42,6 +46,9 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
private final CounterMetric currentMergesNumDocs = new CounterMetric();
private final CounterMetric currentMergesSizeInBytes = new CounterMetric();
private final Set<OnGoingMerge> onGoingMerges = ConcurrentCollections.newConcurrentSet();
private final Set<OnGoingMerge> readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges);
public TrackingConcurrentMergeScheduler(ESLogger logger) {
super();
this.logger = logger;
@@ -75,6 +82,10 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
return currentMergesSizeInBytes.count();
}
public Set<OnGoingMerge> onGoingMerges() {
return readOnlyOnGoingMerges;
}
@Override
protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
int totalNumDocs = merge.totalNumDocs();
@@ -84,14 +95,22 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
currentMerges.inc();
currentMergesNumDocs.inc(totalNumDocs);
currentMergesSizeInBytes.inc(totalSizeInBytes);
OnGoingMerge onGoingMerge = new OnGoingMerge(merge);
onGoingMerges.add(onGoingMerge);
if (logger.isTraceEnabled()) {
logger.trace("merge [{}] starting..., merging [{}] segments, [{}] docs, [{}] size, into [{}] estimated_size", merge.info == null ? "_na_" : merge.info.info.name, merge.segments.size(), totalNumDocs, new ByteSizeValue(totalSizeInBytes), new ByteSizeValue(merge.estimatedMergeBytes));
}
try {
beforeMerge(onGoingMerge);
super.doMerge(merge);
} finally {
long took = System.currentTimeMillis() - time;
onGoingMerges.remove(onGoingMerge);
afterMerge(onGoingMerge);
currentMerges.dec();
currentMergesNumDocs.dec(totalNumDocs);
currentMergesSizeInBytes.dec(totalSizeInBytes);
@@ -106,7 +125,21 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
}
}
}
/**
* A callback allowing for custom logic before an actual merge starts.
*/
protected void beforeMerge(OnGoingMerge merge) {
}
/**
* A callback allowing for custom logic after an actual merge is done.
*/
protected void afterMerge(OnGoingMerge merge) {
}
@Override
public MergeScheduler clone() {
// Lucene IW makes a clone internally but since we hold on to this instance

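The two new hooks bracket each merge on the merge thread. As a hedged illustration (not in this commit), a subclass could use them to time individual merges; the timing map keyed by the OnGoingMerge instance is an assumption of this sketch:

import java.util.concurrent.ConcurrentMap;

import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.merge.OnGoingMerge;

public class TimingMergeScheduler extends TrackingConcurrentMergeScheduler {

    private final ESLogger log;
    private final ConcurrentMap<OnGoingMerge, Long> startTimes = ConcurrentCollections.newConcurrentMap();

    public TimingMergeScheduler(ESLogger logger) {
        super(logger);
        this.log = logger;
    }

    @Override
    protected void beforeMerge(OnGoingMerge merge) {
        // called on the merge thread just before the merge executes
        startTimes.put(merge, System.currentTimeMillis());
    }

    @Override
    protected void afterMerge(OnGoingMerge merge) {
        // called on the merge thread from the finally block, even when the merge fails
        Long start = startTimes.remove(merge);
        if (start != null) {
            log.info("merge [{}] took [{}ms]", merge.getId(), System.currentTimeMillis() - start);
        }
    }
}
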
View File: TrackingSerialMergeScheduler.java

@@ -24,8 +24,12 @@ import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.merge.OnGoingMerge;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
// LUCENE MONITOR - Copied from SerialMergeScheduler
public class TrackingSerialMergeScheduler extends MergeScheduler {
@@ -39,6 +43,9 @@ public class TrackingSerialMergeScheduler extends MergeScheduler {
private final CounterMetric currentMergesNumDocs = new CounterMetric();
private final CounterMetric currentMergesSizeInBytes = new CounterMetric();
private final Set<OnGoingMerge> onGoingMerges = ConcurrentCollections.newConcurrentSet();
private final Set<OnGoingMerge> readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges);
public TrackingSerialMergeScheduler(ESLogger logger) {
this.logger = logger;
}
@@ -71,6 +78,10 @@ public class TrackingSerialMergeScheduler extends MergeScheduler {
return currentMergesSizeInBytes.count();
}
public Set<OnGoingMerge> onGoingMerges() {
return readOnlyOnGoingMerges;
}
/**
* Just do the merges in sequence. We do this
* "synchronized" so that even if the application is using
@@ -95,15 +106,22 @@ public class TrackingSerialMergeScheduler extends MergeScheduler {
currentMergesNumDocs.inc(totalNumDocs);
currentMergesSizeInBytes.inc(totalSizeInBytes);
OnGoingMerge onGoingMerge = new OnGoingMerge(merge);
onGoingMerges.add(onGoingMerge);
// sadly, segment name is not available since mergeInit is called from merge itself...
if (logger.isTraceEnabled()) {
logger.trace("merge [{}] starting..., merging [{}] segments, [{}] docs, [{}] size, into [{}] estimated_size", merge.info == null ? "_na_" : merge.info.info.name, merge.segments.size(), totalNumDocs, new ByteSizeValue(totalSizeInBytes), new ByteSizeValue(merge.estimatedMergeBytes));
}
try {
beforeMerge(onGoingMerge);
writer.merge(merge);
} finally {
long took = System.currentTimeMillis() - time;
onGoingMerges.remove(onGoingMerge);
afterMerge(onGoingMerge);
currentMerges.dec();
currentMergesNumDocs.dec(totalNumDocs);
currentMergesSizeInBytes.dec(totalSizeInBytes);
@@ -120,6 +138,20 @@ public class TrackingSerialMergeScheduler extends MergeScheduler {
}
}
/**
* A callback allowing for custom logic before an actual merge starts.
*/
protected void beforeMerge(OnGoingMerge merge) {
}
/**
* A callback allowing for custom logic after an actual merge is done.
*/
protected void afterMerge(OnGoingMerge merge) {
}
@Override
public void close() {
}

View File: IndicesSegmentResponse.java

@@ -135,6 +135,9 @@ public class IndicesSegmentResponse extends BroadcastOperationResponse implement
if (segment.isCompound() != null) {
builder.field(Fields.COMPOUND, segment.isCompound());
}
if (segment.getMergeId() != null) {
builder.field(Fields.MERGE_ID, segment.getMergeId());
}
builder.endObject();
}
builder.endObject();
@@ -173,5 +176,6 @@ public class IndicesSegmentResponse extends BroadcastOperationResponse implement
static final XContentBuilderString SEARCH = new XContentBuilderString("search");
static final XContentBuilderString VERSION = new XContentBuilderString("version");
static final XContentBuilderString COMPOUND = new XContentBuilderString("compound");
static final XContentBuilderString MERGE_ID = new XContentBuilderString("merge_id");
}
}

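Rendered through the builder above, a segment that is part of a running merge carries the new field; a rough sketch of a fragment of the segments API response (sibling fields abbreviated, id value illustrative):

"_0": {
    ...
    "compound": true,
    "merge_id": "1745815489"
}
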
View File: Segment.java

@@ -19,6 +19,7 @@
package org.elasticsearch.index.engine;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -38,6 +39,7 @@ public class Segment implements Streamable {
public int delDocCount = -1;
public String version = null;
public Boolean compound = null;
public String mergeId;
Segment() {
}
@@ -88,6 +90,15 @@ public class Segment implements Streamable {
return compound;
}
/**
* If set, the segment is taking part in an ongoing merge; the value identifies that merge, and
* all segments sharing the same value are being merged together.
*/
@Nullable
public String getMergeId() {
return this.mergeId;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -122,6 +133,9 @@
sizeInBytes = in.readLong();
version = in.readOptionalString();
compound = in.readOptionalBoolean();
if (in.getVersion().onOrAfter(Version.V_0_90_6)) {
mergeId = in.readOptionalString();
}
}
@Override
@@ -134,5 +148,8 @@
out.writeLong(sizeInBytes);
out.writeOptionalString(version);
out.writeOptionalBoolean(compound);
if (out.getVersion().onOrAfter(Version.V_0_90_6)) {
out.writeOptionalString(mergeId);
}
}
}

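The Version.V_0_90_6 guards implement the usual wire back-compat pattern: the new optional field is only read or written when the remote node is on a version that knows about it, so mixed-version clusters keep working during a rolling upgrade. A minimal sketch of the same pattern on an illustrative class (not from this commit):

import java.io.IOException;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;

public class ExampleMessage implements Streamable {

    public String existingField;
    public String addedField; // introduced in 0.90.6, like mergeId above

    @Override
    public void readFrom(StreamInput in) throws IOException {
        existingField = in.readOptionalString();
        // only consume the extra bytes if the sender actually wrote them
        if (in.getVersion().onOrAfter(Version.V_0_90_6)) {
            addedField = in.readOptionalString();
        }
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeOptionalString(existingField);
        // never send bytes an older receiver would not expect
        if (out.getVersion().onOrAfter(Version.V_0_90_6)) {
            out.writeOptionalString(addedField);
        }
    }
}
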
View File: RobinEngine.java

@@ -55,6 +55,7 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.merge.policy.IndexUpgraderMergePolicy;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
@@ -900,7 +901,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} finally {
rwl.readLock().unlock();
}
} finally {
flushLock.unlock();
flushing.decrementAndGet();
@@ -1181,6 +1182,19 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
});
// fill in the merges flag
Set<OnGoingMerge> onGoingMerges = mergeScheduler.onGoingMerges();
for (OnGoingMerge onGoingMerge : onGoingMerges) {
for (SegmentInfoPerCommit segmentInfoPerCommit : onGoingMerge.getMergedSegments()) {
for (Segment segment : segmentsArr) {
if (segment.getName().equals(segmentInfoPerCommit.info.name)) {
segment.mergeId = onGoingMerge.getId();
break;
}
}
}
}
return Arrays.asList(segmentsArr);
} finally {
rwl.readLock().unlock();
@@ -1284,7 +1298,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
}
/** Returns whether a leaf reader comes from a merge (versus flush or addIndexes). */
/**
* Returns whether a leaf reader comes from a merge (versus flush or addIndexes).
*/
private static boolean isMergedSegment(AtomicReader reader) {
// We expect leaves to be segment readers
final Map<String, String> diagnostics = ((SegmentReader) reader).getSegmentInfo().info.getDiagnostics();
@ -1332,7 +1348,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
assert isMergedSegment(reader);
final Engine.Searcher searcher = new SimpleSearcher("warmer", new IndexSearcher(reader));
final IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, searcher);
warmer.warm(context);
if (warmer != null) warmer.warm(context);
} catch (Throwable t) {
// Don't fail a merge if the warm-up failed
if (!closed) {

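The merge_id fill-in in the segments() hunk above scans the whole segments array once per merged segment. A hedged, equivalent formulation (illustrative only, reusing the surrounding locals segmentsArr and mergeScheduler) that indexes the reported segments by name first via a java.util.HashMap:

Map<String, Segment> byName = new HashMap<String, Segment>();
for (Segment segment : segmentsArr) {
    byName.put(segment.getName(), segment);
}
for (OnGoingMerge onGoingMerge : mergeScheduler.onGoingMerges()) {
    for (SegmentInfoPerCommit info : onGoingMerge.getMergedSegments()) {
        Segment segment = byName.get(info.info.name);
        if (segment != null) {
            segment.mergeId = onGoingMerge.getId();
        }
    }
}
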
View File: OnGoingMerge.java (new file)

@@ -0,0 +1,53 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.merge;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentInfoPerCommit;
import java.util.List;
/**
* Represents a single ongoing merge within an index.
*/
public class OnGoingMerge {
private final String id;
private final List<SegmentInfoPerCommit> mergedSegments;
public OnGoingMerge(MergePolicy.OneMerge merge) {
this.id = Integer.toString(System.identityHashCode(merge));
this.mergedSegments = merge.segments;
}
/**
* A unique id for the merge.
*/
public String getId() {
return id;
}
/**
* The list of segments that are being merged.
*/
public List<SegmentInfoPerCommit> getMergedSegments() {
return mergedSegments;
}
}

View File: ConcurrentMergeSchedulerProvider.java

@@ -19,6 +19,7 @@
package org.elasticsearch.index.merge.scheduler;
import com.google.common.collect.ImmutableSet;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
@@ -28,6 +29,7 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
@@ -74,6 +76,14 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
return mergeStats;
}
@Override
public Set<OnGoingMerge> onGoingMerges() {
for (CustomConcurrentMergeScheduler scheduler : schedulers) {
return scheduler.onGoingMerges();
}
return ImmutableSet.of();
}
public static class CustomConcurrentMergeScheduler extends TrackingConcurrentMergeScheduler {
private final ShardId shardId;
@@ -105,5 +115,17 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
super.close();
provider.schedulers.remove(this);
}
@Override
protected void beforeMerge(OnGoingMerge merge) {
super.beforeMerge(merge);
provider.beforeMerge(merge);
}
@Override
protected void afterMerge(OnGoingMerge merge) {
super.afterMerge(merge);
provider.afterMerge(merge);
}
}
}

View File: MergeSchedulerProvider.java

@@ -23,12 +23,14 @@ import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@@ -40,8 +42,27 @@ public abstract class MergeSchedulerProvider<T extends MergeScheduler> extends A
void onFailedMerge(MergePolicy.MergeException e);
}
/**
* Listener for events before/after single merges. Called on the merge thread.
*/
public static interface Listener {
/**
* A callback before a merge starts executing. Note, any logic here will block the merge
* until it is done.
*/
void beforeMerge(OnGoingMerge merge);
/**
* A callback after a merge has finished executing. Note, any logic here will block the merge
* thread.
*/
void afterMerge(OnGoingMerge merge);
}
private final ThreadPool threadPool;
private final CopyOnWriteArrayList<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();
private final boolean notifyOnMergeFailure;
@@ -55,6 +76,14 @@ public abstract class MergeSchedulerProvider<T extends MergeScheduler> extends A
failureListeners.add(listener);
}
public void addListener(Listener listener) {
listeners.add(listener);
}
public void removeListener(Listener listener) {
listeners.remove(listener);
}
protected void failedMerge(final MergePolicy.MergeException e) {
if (!notifyOnMergeFailure) {
return;
@@ -69,7 +98,21 @@ public abstract class MergeSchedulerProvider<T extends MergeScheduler> extends A
}
}
protected void beforeMerge(OnGoingMerge merge) {
for (Listener listener : listeners) {
listener.beforeMerge(merge);
}
}
protected void afterMerge(OnGoingMerge merge) {
for (Listener listener : listeners) {
listener.afterMerge(merge);
}
}
public abstract T newMergeScheduler();
public abstract MergeStats stats();
public abstract Set<OnGoingMerge> onGoingMerges();
}

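A hedged usage sketch of the new listener API (the observer class is illustrative, not from this commit); note that both callbacks run on the merge thread, so slow work here delays the merge itself:

import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;

public class MergeObserver {
    public static void install(MergeSchedulerProvider<?> provider) {
        provider.addListener(new MergeSchedulerProvider.Listener() {
            @Override
            public void beforeMerge(OnGoingMerge merge) {
                // about to run; getMergedSegments() lists the inputs of this merge
                System.out.println("merge " + merge.getId() + " starting, segments: " + merge.getMergedSegments().size());
            }

            @Override
            public void afterMerge(OnGoingMerge merge) {
                System.out.println("merge " + merge.getId() + " finished");
            }
        });
    }
}
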
View File: SerialMergeSchedulerProvider.java

@@ -19,11 +19,13 @@
package org.elasticsearch.index.merge.scheduler;
import com.google.common.collect.ImmutableSet;
import org.apache.lucene.index.*;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
@@ -62,6 +64,14 @@ public class SerialMergeSchedulerProvider extends MergeSchedulerProvider {
return mergeStats;
}
@Override
public Set<OnGoingMerge> onGoingMerges() {
for (CustomSerialMergeScheduler scheduler : schedulers) {
return scheduler.onGoingMerges();
}
return ImmutableSet.of();
}
public static class CustomSerialMergeScheduler extends TrackingSerialMergeScheduler {
private final SerialMergeSchedulerProvider provider;
@@ -87,5 +97,17 @@ public class SerialMergeSchedulerProvider extends MergeSchedulerProvider {
super.close();
provider.schedulers.remove(this);
}
@Override
protected void beforeMerge(OnGoingMerge merge) {
super.beforeMerge(merge);
provider.beforeMerge(merge);
}
@Override
protected void afterMerge(OnGoingMerge merge) {
super.afterMerge(merge);
provider.afterMerge(merge);
}
}
}

View File: RobinEngineTests.java

@@ -27,6 +27,7 @@ import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
@@ -46,8 +47,10 @@ import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.SerialMergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexSettingsService;
@@ -71,10 +74,8 @@ import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
@@ -189,7 +190,11 @@ public class RobinEngineTests extends ElasticsearchTestCase {
}
protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog) {
return new RobinEngine(shardId, defaultSettings, threadPool, indexSettingsService, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
return createEngine(indexSettingsService, store, translog, createMergeScheduler());
}
protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) {
return new RobinEngine(shardId, defaultSettings, threadPool, indexSettingsService, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), mergeSchedulerProvider,
new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new CodecService(shardId.index()));
}
@@ -297,6 +302,74 @@ public class RobinEngineTests extends ElasticsearchTestCase {
assertThat(segments.get(2).isCompound(), equalTo(true));
}
@Test
public void testSegmentsWithMergeFlag() throws Exception {
MergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool);
final AtomicReference<CountDownLatch> waitTillMerge = new AtomicReference<CountDownLatch>();
final AtomicReference<CountDownLatch> waitForMerge = new AtomicReference<CountDownLatch>();
mergeSchedulerProvider.addListener(new MergeSchedulerProvider.Listener() {
@Override
public void beforeMerge(OnGoingMerge merge) {
try {
if (waitTillMerge.get() != null) {
waitTillMerge.get().countDown();
}
if (waitForMerge.get() != null) {
waitForMerge.get().await();
}
} catch (InterruptedException e) {
throw ExceptionsHelper.convertToRuntime(e);
}
}
@Override
public void afterMerge(OnGoingMerge merge) {
}
});
Engine engine = createEngine(engineSettingsService, store, createTranslog(), mergeSchedulerProvider);
engine.start();
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
engine.flush(new Engine.Flush());
assertThat(engine.segments().size(), equalTo(1));
index = new Engine.Index(null, newUid("2"), doc);
engine.index(index);
engine.flush(new Engine.Flush());
assertThat(engine.segments().size(), equalTo(2));
for (Segment segment : engine.segments()) {
assertThat(segment.getMergeId(), nullValue());
}
index = new Engine.Index(null, newUid("3"), doc);
engine.index(index);
engine.flush(new Engine.Flush());
assertThat(engine.segments().size(), equalTo(3));
for (Segment segment : engine.segments()) {
assertThat(segment.getMergeId(), nullValue());
}
waitTillMerge.set(new CountDownLatch(1));
waitForMerge.set(new CountDownLatch(1));
engine.optimize(new Engine.Optimize().maxNumSegments(1).waitForMerge(false));
waitTillMerge.get().await();
for (Segment segment : engine.segments()) {
assertThat(segment.getMergeId(), notNullValue());
}
waitForMerge.get().countDown();
// now, optimize and wait for merges, see that we have no merge flag
engine.optimize(new Engine.Optimize().maxNumSegments(1).waitForMerge(true));
for (Segment segment : engine.segments()) {
assertThat(segment.getMergeId(), nullValue());
}
engine.close();
}
@Test
public void testSimpleOperations() throws Exception {
Engine.Searcher searchResult = engine.acquireSearcher("test");