Record Force Merges in Live Commit Data (#52694) (#53372)

* Record Force Merges in live commit data

Prerequisite of #52182. Record force merges in the live commit data
so that two shard states with the same sequence number, differing only in
whether or not they have been force merged, can be distinguished when creating snapshots.
Armin Braun, 2020-03-11 06:30:36 +01:00, committed by GitHub
commit 7189c57b6c (parent 2ab502afc4)
10 changed files with 191 additions and 37 deletions
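
In short: every ForceMergeRequest now carries a freshly generated UUID, the engine records that UUID when it executes the merge, and the next Lucene commit persists it in the commit's user data, where snapshot logic can compare it across shard copies. As a rough sketch of the read side in plain Lucene terms (the helper below is illustrative and not part of this change; only the key literal mirrors Engine.FORCE_MERGE_UUID_KEY):

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;

// Illustrative helper: read the force-merge UUID (if any) recorded in the
// newest commit point's user data.
static String readForceMergeUUID(Directory directory) throws IOException {
    List<IndexCommit> commits = DirectoryReader.listCommits(directory); // sorted by generation
    Map<String, String> userData = commits.get(commits.size() - 1).getUserData();
    return userData.get("force_merge_uuid"); // same key as Engine.FORCE_MERGE_UUID_KEY
}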

File: EvilInternalEngineTests.java

@@ -22,6 +22,7 @@ package org.elasticsearch.index.engine;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
+import org.elasticsearch.common.UUIDs;
import org.elasticsearch.index.mapper.ParsedDocument;
import java.io.IOException;
@@ -89,7 +90,7 @@ public class EvilInternalEngineTests extends EngineTestCase {
StreamSupport.stream(e.getLastCommittedSegmentInfos().spliterator(), false).collect(Collectors.toList());
segmentsReference.set(segments);
// trigger a background merge that will be managed by the concurrent merge scheduler
-e.forceMerge(randomBoolean(), 0, false, false, false);
+e.forceMerge(randomBoolean(), 0, false, false, false, UUIDs.randomBase64UUID());
/*
* Merging happens in the background on a merge thread, and the maybeDie handler is invoked on yet another thread; we have
* to wait for these events to finish.

File: ForceMergeRequest.java

@@ -19,7 +19,10 @@
package org.elasticsearch.action.admin.indices.forcemerge;
+import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -50,6 +53,15 @@ public class ForceMergeRequest extends BroadcastRequest<ForceMergeRequest> {
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = Defaults.FLUSH;
+private static final Version FORCE_MERGE_UUID_VERSION = Version.V_7_7_0;
+/**
+* Force merge UUID to store in the live commit data of a shard under
+* {@link org.elasticsearch.index.engine.Engine#FORCE_MERGE_UUID_KEY} after force merging it.
+*/
+@Nullable
+private final String forceMergeUUID;
/**
* Constructs a merge request over one or more indices.
*
@@ -57,6 +69,7 @@ public class ForceMergeRequest extends BroadcastRequest<ForceMergeRequest> {
*/
public ForceMergeRequest(String... indices) {
super(indices);
+forceMergeUUID = UUIDs.randomBase64UUID();
}
public ForceMergeRequest(StreamInput in) throws IOException {
@@ -64,6 +77,11 @@ public class ForceMergeRequest extends BroadcastRequest<ForceMergeRequest> {
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
+if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
+forceMergeUUID = in.readOptionalString();
+} else {
+forceMergeUUID = null;
+}
}
/**
@@ -100,6 +118,15 @@ public class ForceMergeRequest extends BroadcastRequest<ForceMergeRequest> {
return this;
}
+/**
+* Force merge UUID to use when force merging, or {@code null} if no UUID is used, i.e. in a mixed-version cluster containing nodes older than
+* {@link #FORCE_MERGE_UUID_VERSION}.
+*/
+@Nullable
+public String forceMergeUUID() {
+return forceMergeUUID;
+}
/**
* Should flush be performed after the merge. Defaults to {@code true}.
*/
@@ -129,6 +156,9 @@ public class ForceMergeRequest extends BroadcastRequest<ForceMergeRequest> {
out.writeInt(maxNumSegments);
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
+if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
+out.writeOptionalString(forceMergeUUID);
+}
}
@Override

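Taken together, the read and write paths above mean the UUID simply drops off the wire when a pre-7.7 node is involved. A hedged sketch of that round-trip, using the stream utilities commonly seen in Elasticsearch tests (the version constant and the assertion are illustrative, not part of this change):

import java.io.IOException;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;

// Serialize against a pre-7.7 wire version: writeTo skips the UUID and the
// receiving constructor falls back to null.
static void assertUuidDroppedOnOldWire() throws IOException {
    BytesStreamOutput out = new BytesStreamOutput();
    out.setVersion(Version.V_7_6_0);
    new ForceMergeRequest("test-index").writeTo(out);
    StreamInput in = out.bytes().streamInput();
    in.setVersion(Version.V_7_6_0);
    ForceMergeRequest read = new ForceMergeRequest(in);
    assert read.forceMergeUUID() == null; // pre-7.7 nodes never see the field
}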
File: Engine.java

@@ -109,6 +109,7 @@ public abstract class Engine implements Closeable {
public static final String SYNC_COMMIT_ID = "sync_id";
public static final String HISTORY_UUID_KEY = "history_uuid";
+public static final String FORCE_MERGE_UUID_KEY = "force_merge_uuid";
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
@@ -1073,18 +1074,12 @@
*/
public abstract void rollTranslogGeneration() throws EngineException;
-/**
-* Force merges to 1 segment
-*/
-public void forceMerge(boolean flush) throws IOException {
-forceMerge(flush, 1, false, false, false);
-}
/**
* Triggers a forced merge on this engine
*/
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
-boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException, IOException;
+boolean upgrade, boolean upgradeOnlyAncientSegments,
+@Nullable String forceMergeUUID) throws EngineException, IOException;
/**
* Snapshots the most recent index and returns a handle to it. If needed will try and "commit" the

File: InternalEngine.java

@@ -190,6 +190,12 @@ public class InternalEngine extends Engine {
@Nullable
private final String historyUUID;
+/**
+* UUID value that is updated every time the engine is force merged.
+*/
+@Nullable
+private volatile String forceMergeUUID;
public InternalEngine(EngineConfig engineConfig) {
this(engineConfig, LocalCheckpointTracker::new);
}
@@ -236,7 +242,9 @@
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
-historyUUID = loadHistoryUUID(writer);
+final Map<String, String> commitData = commitDataAsMap(writer);
+historyUUID = loadHistoryUUID(commitData);
+forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY);
indexWriter = writer;
} catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
@@ -604,6 +612,12 @@
return historyUUID;
}
+/** returns the force merge uuid for the engine */
+@Nullable
+public String getForceMergeUUID() {
+return forceMergeUUID;
+}
/** Returns how many bytes we are currently moving from indexing buffer to segments on disk */
@Override
public long getWritingBytes() {
@@ -613,8 +627,8 @@
/**
* Reads the current stored history ID from the IW commit data.
*/
-private String loadHistoryUUID(final IndexWriter writer) {
-final String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY);
+private String loadHistoryUUID(Map<String, String> commitData) {
+final String uuid = commitData.get(HISTORY_UUID_KEY);
if (uuid == null) {
throw new IllegalStateException("commit doesn't contain history uuid");
}
@@ -1945,7 +1959,8 @@
@Override
public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
-final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, IOException {
+final boolean upgrade, final boolean upgradeOnlyAncientSegments,
+final String forceMergeUUID) throws EngineException, IOException {
/*
* We do NOT acquire the readlock here since we are waiting on the merges to finish
* that's fine since the IW.rollback should stop all the threads and trigger an IOException
@@ -1977,6 +1992,7 @@
indexWriter.maybeMerge();
} else {
indexWriter.forceMerge(maxNumSegments, true /* blocks and waits for merges */);
+this.forceMergeUUID = forceMergeUUID;
}
if (flush) {
if (tryRenewSyncCommit() == false) {
@@ -2448,6 +2464,10 @@
if (softDeleteEnabled) {
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
}
+final String currentForceMergeUUID = forceMergeUUID;
+if (currentForceMergeUUID != null) {
+commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
+}
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});
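
The block above runs inside the engine's commit-data callback; as a standalone illustration of the underlying Lucene facility (a hypothetical helper, not the engine code), anything set via setLiveCommitData is persisted as user data by the next commit:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.lucene.index.IndexWriter;

// Hypothetical helper: stamp a force-merge UUID into the writer's live commit
// data; the next commit writes it into the commit point's user data.
static void stampForceMergeUUID(IndexWriter writer, String uuid) throws IOException {
    Map<String, String> liveCommitData = new HashMap<>();
    liveCommitData.put("force_merge_uuid", uuid); // Engine.FORCE_MERGE_UUID_KEY
    writer.setLiveCommitData(liveCommitData.entrySet());
    writer.commit();
}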

File: ReadOnlyEngine.java

@@ -395,7 +395,7 @@
@Override
public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
-boolean upgrade, boolean upgradeOnlyAncientSegments) {
+boolean upgrade, boolean upgradeOnlyAncientSegments, String forceMergeUUID) {
}
@Override

File: IndexShard.java

@@ -1117,7 +1117,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
Engine engine = getEngine();
engine.forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(),
-forceMerge.onlyExpungeDeletes(), false, false);
+forceMerge.onlyExpungeDeletes(), false, false, forceMerge.forceMergeUUID());
}
/**
@@ -1133,7 +1133,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
final Engine engine = getEngine();
engine.forceMerge(true, // we need to flush at the end to make sure the upgrade is durable
Integer.MAX_VALUE, // we just want to upgrade the segments, not actually optimize to a single segment
-false, true, upgrade.upgradeOnlyAncientSegments());
+false, true, upgrade.upgradeOnlyAncientSegments(), null);
org.apache.lucene.util.Version version = minimumCompatibleVersion();
if (logger.isTraceEnabled()) {
logger.trace("upgraded segments for {} from version {} to version {}", shardId, previousVersion, version);

File: StoreRecovery.java

@@ -38,6 +38,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
+import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@@ -139,7 +140,7 @@ final class StoreRecovery {
// just trigger a merge to do housekeeping on the
// copied segments - we will also see them in stats etc.
indexShard.getEngine().forceMerge(false, -1, false,
-false, false);
+false, false, UUIDs.randomBase64UUID());
return true;
} catch (IOException ex) {
throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex);

File: ForceMergeIT.java (new file)

@@ -0,0 +1,89 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.forcemerge;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;
import java.io.IOException;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class ForceMergeIT extends ESIntegTestCase {
public void testForceMergeUUIDConsistent() throws IOException {
internalCluster().ensureAtLeastNumDataNodes(2);
final String index = "test-index";
createIndex(index,
Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureGreen(index);
final ClusterState state = clusterService().state();
final IndexRoutingTable indexShardRoutingTables = state.routingTable().getIndicesRouting().get(index);
final IndexShardRoutingTable shardRouting = indexShardRoutingTables.getShards().get(0);
final String primaryNodeId = shardRouting.primaryShard().currentNodeId();
final String replicaNodeId = shardRouting.replicaShards().get(0).currentNodeId();
final Index idx = shardRouting.primaryShard().index();
final IndicesService primaryIndicesService =
internalCluster().getInstance(IndicesService.class, state.nodes().get(primaryNodeId).getName());
final IndicesService replicaIndicesService = internalCluster().getInstance(IndicesService.class,
state.nodes().get(replicaNodeId).getName());
final IndexShard primary = primaryIndicesService.indexService(idx).getShard(0);
final IndexShard replica = replicaIndicesService.indexService(idx).getShard(0);
assertThat(getForceMergeUUID(primary), nullValue());
assertThat(getForceMergeUUID(replica), nullValue());
final ForceMergeResponse forceMergeResponse =
client().admin().indices().prepareForceMerge(index).setMaxNumSegments(1).get();
assertThat(forceMergeResponse.getFailedShards(), is(0));
assertThat(forceMergeResponse.getSuccessfulShards(), is(2));
// Force flush to force a new commit that contains the force merge UUID
final FlushResponse flushResponse = client().admin().indices().prepareFlush(index).setForce(true).get();
assertThat(flushResponse.getFailedShards(), is(0));
assertThat(flushResponse.getSuccessfulShards(), is(2));
final String primaryForceMergeUUID = getForceMergeUUID(primary);
assertThat(primaryForceMergeUUID, notNullValue());
final String replicaForceMergeUUID = getForceMergeUUID(replica);
assertThat(replicaForceMergeUUID, notNullValue());
assertThat(primaryForceMergeUUID, is(replicaForceMergeUUID));
}
private static String getForceMergeUUID(IndexShard indexShard) throws IOException {
try (Engine.IndexCommitRef indexCommitRef = indexShard.acquireLastIndexCommit(true)) {
return indexCommitRef.getIndexCommit().getUserData().get(Engine.FORCE_MERGE_UUID_KEY);
}
}
}

File: InternalEngineTests.java

@@ -536,7 +536,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.flush();
final long gen1 = store.readLastCommittedSegmentsInfo().getGeneration();
// now, optimize and wait for merges, see that we have no merge flag
-engine.forceMerge(true);
+engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
for (Segment segment : engine.segments(false)) {
assertThat(segment.getMergeId(), nullValue());
@@ -546,7 +546,7 @@
final boolean flush = randomBoolean();
final long gen2 = store.readLastCommittedSegmentsInfo().getGeneration();
-engine.forceMerge(flush);
+engine.forceMerge(flush, 1, false, false, false, UUIDs.randomBase64UUID());
for (Segment segment : engine.segments(false)) {
assertThat(segment.getMergeId(), nullValue());
}
@@ -1239,7 +1239,7 @@
assertEquals(3, engine.segments(false).size());
engine.forceMerge(forceMergeFlushes, 1, false,
-false, false);
+false, false, UUIDs.randomBase64UUID());
if (forceMergeFlushes == false) {
engine.refresh("make all segments visible");
assertEquals(4, engine.segments(false).size());
@@ -1476,7 +1476,7 @@
try (Engine.Searcher test = engine.acquireSearcher("test")) {
assertEquals(numDocs, test.getIndexReader().numDocs());
}
-engine.forceMerge(true, 1, false, false, false);
+engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
engine.refresh("test");
assertEquals(engine.segments(true).size(), 1);
@@ -1484,7 +1484,7 @@
Engine.Index index = indexForDoc(doc);
engine.delete(new Engine.Delete(index.type(), index.id(), index.uid(), primaryTerm.get()));
//expunge deletes
-engine.forceMerge(true, 10, true, false, false);
+engine.forceMerge(true, 10, true, false, false, UUIDs.randomBase64UUID());
engine.refresh("test");
assertEquals(engine.segments(true).size(), 1);
@@ -1497,7 +1497,7 @@
index = indexForDoc(doc);
engine.delete(new Engine.Delete(index.type(), index.id(), index.uid(), primaryTerm.get()));
//expunge deletes
-engine.forceMerge(true, 10, false, false, false);
+engine.forceMerge(true, 10, false, false, false, UUIDs.randomBase64UUID());
engine.refresh("test");
assertEquals(engine.segments(true).size(), 1);
try (Engine.Searcher test = engine.acquireSearcher("test")) {
@@ -1605,7 +1605,7 @@
try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
safeCommitCheckpoint = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
}
-engine.forceMerge(true, 1, false, false, false);
+engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine, mapperService)
.stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
@@ -1630,7 +1630,7 @@
globalCheckpoint.set(localCheckpoint);
engine.syncTranslog();
-engine.forceMerge(true, 1, false, false, false);
+engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocs.size()));
}
@@ -1695,7 +1695,7 @@
safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitLocalCheckpoint + 1);
}
-engine.forceMerge(true, 1, false, false, false);
+engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine, mapperService)
.stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
@@ -1739,7 +1739,7 @@
globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
engine.syncTranslog();
}
-engine.forceMerge(true, 1, false, false, false);
+engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocsWithSource.size()));
}
@@ -1776,7 +1776,7 @@
indexed.countDown();
try {
engine.forceMerge(randomBoolean(), 1, false, randomBoolean(),
-randomBoolean());
+randomBoolean(), UUIDs.randomBase64UUID());
} catch (IOException e) {
return;
}
@@ -1793,7 +1793,7 @@
startGun.countDown();
int someIters = randomIntBetween(1, 10);
for (int i = 0; i < someIters; i++) {
-engine.forceMerge(randomBoolean(), 1, false, randomBoolean(), randomBoolean());
+engine.forceMerge(randomBoolean(), 1, false, randomBoolean(), randomBoolean(), UUIDs.randomBase64UUID());
}
indexed.await();
IOUtils.close(engine);
@@ -3192,7 +3192,7 @@
switch (operation) {
case "optimize": {
engine.forceMerge(true, 1, false, false,
-false);
+false, UUIDs.randomBase64UUID());
break;
}
case "refresh": {
@@ -4435,7 +4435,7 @@
engine.flush();
}
if (randomBoolean()) {
-engine.forceMerge(randomBoolean(), between(1, 10), randomBoolean(), false, false);
+engine.forceMerge(randomBoolean(), between(1, 10), randomBoolean(), false, false, UUIDs.randomBase64UUID());
}
}
if (engine.engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
@@ -5135,7 +5135,7 @@
engine.index(indexForDoc(doc));
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(2));
engine.refresh("test");
-engine.forceMerge(false, 1, false, false, false);
+engine.forceMerge(false, 1, false, false, false, UUIDs.randomBase64UUID());
assertBusy(() -> {
// the merge listener runs concurrently after the force merge returned
assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
@@ -5384,7 +5384,7 @@
engine.flush();
}
if (rarely()) {
-engine.forceMerge(true);
+engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
}
}
MapperService mapperService = createMapperService("test");
@@ -5465,7 +5465,7 @@
equalTo(engine.getMinRetainedSeqNo()));
}
if (rarely()) {
-engine.forceMerge(randomBoolean());
+engine.forceMerge(randomBoolean(), 1, false, false, false, UUIDs.randomBase64UUID());
}
try (Closeable ignored = engine.acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) {
long minRetainSeqNos = engine.getMinRetainedSeqNo();
@@ -5955,7 +5955,7 @@
for (int i = 0; i < numDocs; i++) {
index(engine, i);
}
-engine.forceMerge(true);
+engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
engine.delete(new Engine.Delete("_doc", "0", newUid("0"), primaryTerm.get()));
engine.refresh("test");
// now we have 2 segments since we now added a tombstone plus the old segment with the delete
@@ -5974,7 +5974,7 @@
}
// lets force merge the tombstone and the original segment and make sure the doc is still there but the ID term is gone
-engine.forceMerge(true);
+engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
IndexReader reader = searcher.getIndexReader();
@@ -6018,7 +6018,7 @@
engine.flush();
}
if (randomInt(100) < 5) {
-engine.forceMerge(randomBoolean(), 1, false, false, false);
+engine.forceMerge(randomBoolean(), 1, false, false, false, UUIDs.randomBase64UUID());
}
}
if (randomBoolean()) {

File: IndexShardTests.java

@@ -4183,4 +4183,22 @@ public class IndexShardTests extends IndexShardTestCase {
recoveryThread.join();
shard.store().close();
}
+public void testRecordsForceMerges() throws IOException {
+IndexShard shard = newStartedShard(true);
+final String initialForceMergeUUID = ((InternalEngine) shard.getEngine()).getForceMergeUUID();
+assertThat(initialForceMergeUUID, nullValue());
+final ForceMergeRequest firstForceMergeRequest = new ForceMergeRequest().maxNumSegments(1);
+shard.forceMerge(firstForceMergeRequest);
+final String secondForceMergeUUID = ((InternalEngine) shard.getEngine()).getForceMergeUUID();
+assertThat(secondForceMergeUUID, notNullValue());
+assertThat(secondForceMergeUUID, equalTo(firstForceMergeRequest.forceMergeUUID()));
+final ForceMergeRequest secondForceMergeRequest = new ForceMergeRequest().maxNumSegments(1);
+shard.forceMerge(secondForceMergeRequest);
+final String thirdForceMergeUUID = ((InternalEngine) shard.getEngine()).getForceMergeUUID();
+assertThat(thirdForceMergeUUID, notNullValue());
+assertThat(thirdForceMergeUUID, not(equalTo(secondForceMergeUUID)));
+assertThat(thirdForceMergeUUID, equalTo(secondForceMergeRequest.forceMergeUUID()));
+closeShards(shard);
+}
}