Initialize checkpoint tracker with allocation ID

This commit pushes the allocation ID down through to the global
checkpoint tracker at construction rather than when activated as a
primary.

Relates #26630
This commit is contained in:
Jason Tedor 2017-09-13 12:15:15 -04:00 committed by GitHub
parent 93da7720ff
commit 7be5ee5f28
9 changed files with 99 additions and 59 deletions

View File

@ -97,6 +97,7 @@ public abstract class Engine implements Closeable {
public static final String SYNC_COMMIT_ID = "sync_id";
protected final ShardId shardId;
protected final String allocationId;
protected final Logger logger;
protected final EngineConfig engineConfig;
protected final Store store;
@ -126,6 +127,7 @@ public abstract class Engine implements Closeable {
this.engineConfig = engineConfig;
this.shardId = engineConfig.getShardId();
this.allocationId = engineConfig.getAllocationId();
this.store = engineConfig.getStore();
this.logger = Loggers.getLogger(Engine.class, // we use the engine class directly here to make sure all subclasses have the same logger name
engineConfig.getIndexSettings().getSettings(), engineConfig.getShardId());

View File

@ -51,6 +51,7 @@ import java.util.List;
*/
public final class EngineConfig {
private final ShardId shardId;
private final String allocationId;
private final IndexSettings indexSettings;
private final ByteSizeValue indexingBufferSize;
private volatile boolean enableGcDeletes = true;
@ -109,7 +110,7 @@ public final class EngineConfig {
/**
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, ThreadPool threadPool,
IndexSettings indexSettings, Engine.Warmer warmer, Store store,
MergePolicy mergePolicy, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
@ -120,6 +121,7 @@ public final class EngineConfig {
throw new IllegalArgumentException("openMode must not be null");
}
this.shardId = shardId;
this.allocationId = allocationId;
this.indexSettings = indexSettings;
this.threadPool = threadPool;
this.warmer = warmer == null ? (a) -> {} : warmer;
@ -240,6 +242,15 @@ public final class EngineConfig {
*/
public ShardId getShardId() { return shardId; }
/**
* Returns the allocation ID for the shard.
*
* @return the allocation ID
*/
public String getAllocationId() {
return allocationId;
}
/**
* Returns the analyzer as the default analyzer in the engines {@link org.apache.lucene.index.IndexWriter}
*/

View File

@ -192,7 +192,7 @@ public class InternalEngine extends Engine {
throw new IllegalArgumentException(openMode.toString());
}
logger.trace("recovered [{}]", seqNoStats);
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
seqNoService = sequenceNumberService(shardId, allocationId, engineConfig.getIndexSettings(), seqNoStats);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
indexWriter = writer;
translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint());
@ -283,10 +283,12 @@ public class InternalEngine extends Engine {
private static SequenceNumbersService sequenceNumberService(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
final SeqNoStats seqNoStats) {
return new SequenceNumbersService(
shardId,
allocationId,
indexSettings,
seqNoStats.getMaxSeqNo(),
seqNoStats.getLocalCheckpoint(),

View File

@ -50,6 +50,8 @@ import java.util.stream.Collectors;
*/
public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
private final String allocationId;
/**
* The global checkpoint tracker can operate in two modes:
* - primary: this shard is in charge of collecting local checkpoint information from all shard copies and computing the global
@ -245,12 +247,18 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
* {@link SequenceNumbers#UNASSIGNED_SEQ_NO}.
*
* @param shardId the shard ID
* @param allocationId the allocation ID
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
*/
GlobalCheckpointTracker(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
GlobalCheckpointTracker(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
final long globalCheckpoint) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.allocationId = allocationId;
this.primaryMode = false;
this.handoffInProgress = false;
this.appliedClusterStateVersion = -1L;
@ -310,7 +318,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
/**
* Initializes the global checkpoint tracker in primary mode (see {@link #primaryMode}. Called on primary activation or promotion.
*/
public synchronized void activatePrimaryMode(final String allocationId, final long localCheckpoint) {
public synchronized void activatePrimaryMode(final long localCheckpoint) {
assert invariant();
assert primaryMode == false;
assert localCheckpoints.get(allocationId) != null && localCheckpoints.get(allocationId).inSync &&

View File

@ -54,13 +54,14 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
*/
public SequenceNumbersService(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
final long maxSeqNo,
final long localCheckpoint,
final long globalCheckpoint) {
super(shardId, indexSettings);
localCheckpointTracker = new LocalCheckpointTracker(indexSettings, maxSeqNo, localCheckpoint);
globalCheckpointTracker = new GlobalCheckpointTracker(shardId, indexSettings, globalCheckpoint);
globalCheckpointTracker = new GlobalCheckpointTracker(shardId, allocationId, indexSettings, globalCheckpoint);
}
/**
@ -201,7 +202,7 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
* Called on primary activation or promotion.
*/
public void activatePrimaryMode(final String allocationId, final long localCheckpoint) {
globalCheckpointTracker.activatePrimaryMode(allocationId, localCheckpoint);
globalCheckpointTracker.activatePrimaryMode(localCheckpoint);
}
/**

View File

@ -2074,7 +2074,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
Sort indexSort = indexSortSupplier.get();
return new EngineConfig(openMode, shardId,
return new EngineConfig(openMode, shardId, shardRouting.allocationId().getId(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener,
indexCache.query(), cachingPolicy, translogConfig,

View File

@ -73,6 +73,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
@ -80,6 +81,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
@ -198,6 +200,7 @@ import static org.hamcrest.Matchers.nullValue;
public class InternalEngineTests extends ESTestCase {
protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 0);
protected final AllocationId allocationId = AllocationId.newInitializing();
private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY);
protected ThreadPool threadPool;
@ -264,11 +267,11 @@ public class InternalEngineTests extends ESTestCase {
}
public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) {
return new EngineConfig(openMode, config.getShardId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(),
config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(),
config.getQueryCachingPolicy(), config.getTranslogConfig(),
config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort(), config.getTranslogRecoveryRunner());
return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(),
config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort(),
config.getTranslogRecoveryRunner());
}
@Override
@ -447,7 +450,7 @@ public class InternalEngineTests extends ESTestCase {
indexSettings.getSettings()));
final List<ReferenceManager.RefreshListener> refreshListenerList =
refreshListener == null ? emptyList() : Collections.singletonList(refreshListener);
EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store,
EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store,
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler);
@ -728,6 +731,7 @@ public class InternalEngineTests extends ESTestCase {
Store store = createStore();
InternalEngine engine = createEngine(store, createTempDir(), (config) -> new SequenceNumbersService(
config.getShardId(),
config.getAllocationId(),
config.getIndexSettings(),
maxSeqNo.get(),
localCheckpoint.get(),
@ -901,6 +905,7 @@ public class InternalEngineTests extends ESTestCase {
initialEngine = createEngine(store, createTempDir(), (config) ->
new SequenceNumbersService(
config.getShardId(),
config.getAllocationId(),
config.getIndexSettings(),
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
@ -2028,7 +2033,7 @@ public class InternalEngineTests extends ESTestCase {
try {
initialEngine = engine;
final ShardRouting primary = TestShardRouting.newShardRouting(shardId, "node1", true, ShardRoutingState.STARTED);
final ShardRouting primary = TestShardRouting.newShardRouting("test", shardId.id(), "node1", null, true, ShardRoutingState.STARTED, allocationId);
final ShardRouting replica = TestShardRouting.newShardRouting(shardId, "node2", false, ShardRoutingState.STARTED);
initialEngine.seqNoService().updateAllocationIdsFromMaster(1L, new HashSet<>(Arrays.asList(primary.allocationId().getId(),
replica.allocationId().getId())),
@ -2788,12 +2793,11 @@ public class InternalEngineTests extends ESTestCase {
TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(),
BigArrays.NON_RECYCLING_INSTANCE);
EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, threadPool,
config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(),
config.getSimilarity(), new CodecService(null, logger), config.getEventListener(),
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), config.getRefreshListeners(), null,
config.getTranslogRecoveryRunner());
EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, allocationId.getId(),
threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), config.getRefreshListeners(),
null, config.getTranslogRecoveryRunner());
try {
InternalEngine internalEngine = new InternalEngine(brokenConfig);
@ -3628,6 +3632,7 @@ public class InternalEngineTests extends ESTestCase {
final AtomicLong expectedLocalCheckpoint) {
return new SequenceNumbersService(
shardId,
allocationId.getId(),
defaultSettings,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
@ -3839,7 +3844,7 @@ public class InternalEngineTests extends ESTestCase {
final int globalCheckpoint = randomIntBetween(0, localCheckpoint);
try {
final SequenceNumbersService seqNoService =
new SequenceNumbersService(shardId, defaultSettings, maxSeqNo, localCheckpoint, globalCheckpoint) {
new SequenceNumbersService(shardId, allocationId.getId(), defaultSettings, maxSeqNo, localCheckpoint, globalCheckpoint) {
@Override
public long generateSeqNo() {
throw new UnsupportedOperationException();
@ -3986,6 +3991,7 @@ public class InternalEngineTests extends ESTestCase {
final SequenceNumbersService seqNoService =
new SequenceNumbersService(
shardId,
allocationId.getId(),
defaultSettings,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,

View File

@ -25,11 +25,13 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
@ -46,7 +48,6 @@ import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -61,25 +62,11 @@ import static org.hamcrest.Matchers.not;
public class GlobalCheckpointTrackerTests extends ESTestCase {
GlobalCheckpointTracker tracker;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
tracker =
new GlobalCheckpointTracker(
new ShardId("test", "_na_", 0),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
}
public void testEmptyShards() {
final GlobalCheckpointTracker tracker = newTracker(AllocationId.newInitializing());
assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
}
private final AtomicInteger aIdGenerator = new AtomicInteger();
private Map<AllocationId, Long> randomAllocationsWithLocalCheckpoints(int min, int max) {
Map<AllocationId, Long> allocations = new HashMap<>();
for (int i = randomIntBetween(min, max); i > 0; i--) {
@ -117,6 +104,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
// it is however nice not to assume this on this level and check we do the right thing.
final long minLocalCheckpoint = allocations.values().stream().min(Long::compare).orElse(UNASSIGNED_SEQ_NO);
final GlobalCheckpointTracker tracker = newTracker(active.iterator().next());
assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
logger.info("--> using allocations");
@ -133,7 +121,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
});
tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing), emptySet());
tracker.activatePrimaryMode(active.iterator().next().getId(), NO_OPS_PERFORMED);
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
initializing.forEach(aId -> markAllocationIdAsInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));
allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId.getId(), allocations.get(aId)));
@ -179,9 +167,10 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
final Map<AllocationId, Long> assigned = new HashMap<>();
assigned.putAll(active);
assigned.putAll(initializing);
tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet()), emptySet());
AllocationId primary = active.keySet().iterator().next();
tracker.activatePrimaryMode(primary.getId(), NO_OPS_PERFORMED);
final GlobalCheckpointTracker tracker = newTracker(primary);
tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet()), emptySet());
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
randomSubsetOf(initializing.keySet()).forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
final AllocationId missingActiveID = randomFrom(active.keySet());
assigned
@ -202,9 +191,11 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
final Map<AllocationId, Long> active = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<AllocationId, Long> initializing = randomAllocationsWithLocalCheckpoints(2, 5);
logger.info("active: {}, initializing: {}", active, initializing);
tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet()), emptySet());
AllocationId primary = active.keySet().iterator().next();
tracker.activatePrimaryMode(primary.getId(), NO_OPS_PERFORMED);
final GlobalCheckpointTracker tracker = newTracker(primary);
tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet()), emptySet());
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
randomSubsetOf(randomIntBetween(1, initializing.size() - 1),
initializing.keySet()).forEach(aId -> markAllocationIdAsInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));
@ -221,8 +212,9 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
final Map<AllocationId, Long> active = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<AllocationId, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<AllocationId, Long> nonApproved = randomAllocationsWithLocalCheckpoints(1, 5);
final GlobalCheckpointTracker tracker = newTracker(active.keySet().iterator().next());
tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet()), emptySet());
tracker.activatePrimaryMode(active.keySet().iterator().next().getId(), NO_OPS_PERFORMED);
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
nonApproved.keySet().forEach(k ->
expectThrows(IllegalStateException.class, () -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)));
@ -251,8 +243,9 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
if (randomBoolean()) {
allocations.putAll(initializingToBeRemoved);
}
final GlobalCheckpointTracker tracker = newTracker(active.iterator().next());
tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing), emptySet());
tracker.activatePrimaryMode(active.iterator().next().getId(), NO_OPS_PERFORMED);
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
if (randomBoolean()) {
initializingToStay.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
} else {
@ -286,9 +279,10 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
final AtomicBoolean complete = new AtomicBoolean();
final AllocationId inSyncAllocationId = AllocationId.newInitializing();
final AllocationId trackingAllocationId = AllocationId.newInitializing();
final GlobalCheckpointTracker tracker = newTracker(inSyncAllocationId);
tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(inSyncAllocationId.getId()),
routingTable(Collections.singleton(trackingAllocationId)), emptySet());
tracker.activatePrimaryMode(inSyncAllocationId.getId(), globalCheckpoint);
tracker.activatePrimaryMode(globalCheckpoint);
final Thread thread = new Thread(() -> {
try {
// synchronize starting with the test thread
@ -326,6 +320,14 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
thread.join();
}
private GlobalCheckpointTracker newTracker(final AllocationId allocationId) {
return new GlobalCheckpointTracker(
new ShardId("test", "_na_", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
}
public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBarrierException, InterruptedException {
final int localCheckpoint = randomIntBetween(1, 32);
final int globalCheckpoint = randomIntBetween(localCheckpoint + 1, 64);
@ -333,9 +335,10 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
final AtomicBoolean interrupted = new AtomicBoolean();
final AllocationId inSyncAllocationId = AllocationId.newInitializing();
final AllocationId trackingAllocationId = AllocationId.newInitializing();
final GlobalCheckpointTracker tracker = newTracker(inSyncAllocationId);
tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(inSyncAllocationId.getId()),
routingTable(Collections.singleton(trackingAllocationId)), emptySet());
tracker.activatePrimaryMode(inSyncAllocationId.getId(), globalCheckpoint);
tracker.activatePrimaryMode(globalCheckpoint);
final Thread thread = new Thread(() -> {
try {
// synchronize starting with the test thread
@ -380,9 +383,10 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
final Set<AllocationId> activeAllocationIds = activeAndInitializingAllocationIds.v1();
final Set<AllocationId> initializingIds = activeAndInitializingAllocationIds.v2();
IndexShardRoutingTable routingTable = routingTable(initializingIds);
tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable, emptySet());
AllocationId primaryId = activeAllocationIds.iterator().next();
tracker.activatePrimaryMode(primaryId.getId(), NO_OPS_PERFORMED);
final GlobalCheckpointTracker tracker = newTracker(primaryId);
tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable, emptySet());
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
assertThat(tracker.getReplicationGroup().getInSyncAllocationIds(), equalTo(ids(activeAllocationIds)));
assertThat(tracker.getReplicationGroup().getRoutingTable(), equalTo(routingTable));
@ -529,9 +533,10 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
final CyclicBarrier barrier = new CyclicBarrier(4);
final int activeLocalCheckpoint = randomIntBetween(0, Integer.MAX_VALUE - 1);
final GlobalCheckpointTracker tracker = newTracker(active);
tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(active.getId()),
routingTable(Collections.singleton(initializing)), emptySet());
tracker.activatePrimaryMode(active.getId(), activeLocalCheckpoint);
tracker.activatePrimaryMode(activeLocalCheckpoint);
final int nextActiveLocalCheckpoint = randomIntBetween(activeLocalCheckpoint + 1, Integer.MAX_VALUE);
final Thread activeThread = new Thread(() -> {
try {
@ -574,12 +579,15 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
}
public void testPrimaryContextHandoff() throws IOException {
GlobalCheckpointTracker oldPrimary = new GlobalCheckpointTracker(new ShardId("test", "_na_", 0),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), UNASSIGNED_SEQ_NO);
GlobalCheckpointTracker newPrimary = new GlobalCheckpointTracker(new ShardId("test", "_na_", 0),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), UNASSIGNED_SEQ_NO);
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.EMPTY);
final ShardId shardId = new ShardId("test", "_na_", 0);
FakeClusterState clusterState = initialState();
GlobalCheckpointTracker oldPrimary =
new GlobalCheckpointTracker(shardId, randomFrom(ids(clusterState.inSyncIds)), indexSettings, UNASSIGNED_SEQ_NO);
GlobalCheckpointTracker newPrimary =
new GlobalCheckpointTracker(shardId, UUIDs.randomBase64UUID(random()), indexSettings, UNASSIGNED_SEQ_NO);
clusterState.apply(oldPrimary);
clusterState.apply(newPrimary);
@ -686,9 +694,10 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
public void testIllegalStateExceptionIfUnknownAllocationId() {
final AllocationId active = AllocationId.newInitializing();
final AllocationId initializing = AllocationId.newInitializing();
final GlobalCheckpointTracker tracker = newTracker(active);
tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(active.getId()),
routingTable(Collections.singleton(initializing)), emptySet());
tracker.activatePrimaryMode(active.getId(), NO_OPS_PERFORMED);
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
expectThrows(IllegalStateException.class, () -> tracker.initiateTracking(randomAlphaOfLength(10)));
expectThrows(IllegalStateException.class, () -> tracker.markAllocationIdAsInSync(randomAlphaOfLength(10), randomNonNegativeLong()));
@ -731,7 +740,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
}
private static void activatePrimary(FakeClusterState clusterState, GlobalCheckpointTracker gcp) {
gcp.activatePrimaryMode(randomFrom(ids(clusterState.inSyncIds)), randomIntBetween(Math.toIntExact(NO_OPS_PERFORMED), 10));
gcp.activatePrimaryMode(randomIntBetween(Math.toIntExact(NO_OPS_PERFORMED), 10));
}
private static void randomLocalCheckpointUpdate(GlobalCheckpointTracker gcp) {

View File

@ -28,6 +28,7 @@ import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
@ -98,6 +99,7 @@ public class RefreshListenersTests extends ESTestCase {
threadPool = new TestThreadPool(getTestName());
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY);
ShardId shardId = new ShardId(new Index("index", "_na_"), 1);
String allocationId = UUIDs.randomBase64UUID(random());
Directory directory = newDirectory();
DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
@Override
@ -115,10 +117,9 @@ public class RefreshListenersTests extends ESTestCase {
// we don't need to notify anybody in this test
}
};
EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, threadPool, indexSettings, null,
store, newMergePolicy(), iwc.getAnalyzer(),
iwc.getSimilarity(), new CodecService(null, logger), eventListener,
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, allocationId, threadPool,
indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger),
eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null, null);
engine = new InternalEngine(config);
listeners.setTranslog(engine.getTranslog());