Introduced Global checkpoints for Sequence Numbers (#15485)

Global checkpoints are update by the primary and represent the common part of history across shard copies, as know at a given time. The primary is also in charge of periodically broadcast this information to the replicas. See #10708 for more details.
This commit is contained in:
Boaz Leskes 2016-06-06 12:53:04 +02:00
parent 6eb96e5fd8
commit 4844325921
40 changed files with 1216 additions and 158 deletions

View File

@ -33,10 +33,10 @@ import java.util.function.Supplier;
*/
public class ActionListenerResponseHandler<Response extends TransportResponse> extends BaseTransportResponseHandler<Response> {
private final ActionListener<Response> listener;
private final ActionListener<? super Response> listener;
private final Supplier<Response> responseSupplier;
public ActionListenerResponseHandler(ActionListener<Response> listener, Supplier<Response> responseSupplier) {
public ActionListenerResponseHandler(ActionListener<? super Response> listener, Supplier<Response> responseSupplier) {
this.listener = Objects.requireNonNull(listener);
this.responseSupplier = Objects.requireNonNull(responseSupplier);
}

View File

@ -35,7 +35,6 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
import java.util.ArrayList;
@ -85,7 +84,8 @@ public class ReplicationOperation<Request extends ReplicationRequest<Request>, R
void execute() throws Exception {
final String writeConsistencyFailure = checkWriteConsistency ? checkWriteConsistency() : null;
final ShardId shardId = primary.routingEntry().shardId();
final ShardRouting primaryRouting = primary.routingEntry();
final ShardId shardId = primaryRouting.shardId();
if (writeConsistencyFailure != null) {
finishAsFailed(new UnavailableShardsException(shardId,
"{} Timeout: [{}], request: [{}]", writeConsistencyFailure, request.timeout(), request));
@ -96,6 +96,7 @@ public class ReplicationOperation<Request extends ReplicationRequest<Request>, R
pendingShards.incrementAndGet(); // increase by 1 until we finish all primary coordination
Tuple<Response, ReplicaRequest> primaryResponse = primary.perform(request);
successfulShards.incrementAndGet(); // mark primary as successful
primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
finalResponse = primaryResponse.v1();
ReplicaRequest replicaRequest = primaryResponse.v2();
assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
@ -107,7 +108,7 @@ public class ReplicationOperation<Request extends ReplicationRequest<Request>, R
// to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
// If the index gets deleted after primary operation, we skip replication
List<ShardRouting> shards = getShards(shardId, clusterStateSupplier.get());
final String localNodeId = primary.routingEntry().currentNodeId();
final String localNodeId = primaryRouting.currentNodeId();
for (final ShardRouting shard : shards) {
if (executeOnReplicas == false || shard.unassigned()) {
if (shard.primary() == false) {
@ -136,10 +137,11 @@ public class ReplicationOperation<Request extends ReplicationRequest<Request>, R
totalShards.incrementAndGet();
pendingShards.incrementAndGet();
replicasProxy.performOn(shard, replicaRequest, new ActionListener<TransportResponse.Empty>() {
replicasProxy.performOn(shard, replicaRequest, new ActionListener<ReplicaResponse>() {
@Override
public void onResponse(TransportResponse.Empty empty) {
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
primary.updateLocalCheckpointForShard(response.allocationId(), response.localCheckpoint());
decPendingAndFinishIfNeeded();
}
@ -301,18 +303,30 @@ public class ReplicationOperation<Request extends ReplicationRequest<Request>, R
*/
Tuple<Response, ReplicaRequest> perform(Request request) throws Exception;
/**
* Notifies the primary of a local checkpoint for the given allocation.
*
* Note: The primary will use this information to advance the global checkpoint if possible.
*
* @param allocationId allocation ID of the shard corresponding to the supplied local checkpoint
* @param checkpoint the *local* checkpoint for the shard
*/
void updateLocalCheckpointForShard(String allocationId, long checkpoint);
/** returns the local checkpoint of the primary shard */
long localCheckpoint();
}
interface Replicas<ReplicaRequest extends ReplicationRequest<ReplicaRequest>> {
/**
* performs the the given request on the specified replica
*
* @param replica {@link ShardRouting} of the shard this request should be executed on
* @param replicaRequest operation to peform
* @param listener a callback to call once the operation has been complicated, either successfully or with an error.
*/
void performOn(ShardRouting replica, ReplicaRequest replicaRequest, ActionListener<TransportResponse.Empty> listener);
void performOn(ShardRouting replica, ReplicaRequest replicaRequest, ActionListener<ReplicaResponse> listener);
/**
* Fail the specified shard, removing it from the current set of active shards
@ -331,6 +345,18 @@ public class ReplicationOperation<Request extends ReplicationRequest<Request>, R
Consumer<Throwable> onPrimaryDemoted, Consumer<Throwable> onIgnoredFailure);
}
/**
* An interface to encapsulate the metadata needed from replica shards when they respond to operations performed on them
*/
interface ReplicaResponse {
/** the local check point for the shard. see {@link org.elasticsearch.index.seqno.SequenceNumbersService#getLocalCheckpoint()} */
long localCheckpoint();
/** the allocation id of the replica shard */
String allocationId();
}
public static class RetryOnPrimaryException extends ElasticsearchException {
public RetryOnPrimaryException(ShardId shardId, String msg) {
this(shardId, msg, null);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
@ -43,6 +44,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -425,16 +427,18 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
@Override
protected void doRun() throws Exception {
setPhase(task, "replica");
final ReplicaResponse response;
assert request.shardId() != null : "request shardId must be set";
try (Releasable ignored = acquireReplicaOperationLock(request.shardId(), request.primaryTerm())) {
try (ShardReference replica = getReplicaShardReference(request.shardId(), request.primaryTerm())) {
shardOperationOnReplica(request);
response = new ReplicaResponse(replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint());
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(),
request);
}
}
setPhase(task, "finished");
channel.sendResponse(TransportResponse.Empty.INSTANCE);
channel.sendResponse(response);
}
}
@ -705,13 +709,13 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
/**
* Acquire an operation on replicas. The lock is closed as soon as
* Get a reference to a replica shard. The reference is released as soon as
* replication is completed on the node.
*/
protected Releasable acquireReplicaOperationLock(ShardId shardId, long primaryTerm) {
protected ShardReference getReplicaShardReference(ShardId shardId, long primaryTerm) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
return indexShard.acquireReplicaOperationLock(primaryTerm);
return new ShardReference(indexShard, indexShard.acquireReplicaOperationLock(primaryTerm));
}
/**
@ -722,12 +726,12 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
return IndexMetaData.isIndexUsingShadowReplicas(settings) == false;
}
class PrimaryShardReference implements ReplicationOperation.Primary<Request, ReplicaRequest, Response>, Releasable {
class ShardReference implements Releasable {
private final IndexShard indexShard;
protected final IndexShard indexShard;
private final Releasable operationLock;
PrimaryShardReference(IndexShard indexShard, Releasable operationLock) {
ShardReference(IndexShard indexShard, Releasable operationLock) {
this.indexShard = indexShard;
this.operationLock = operationLock;
}
@ -737,6 +741,22 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
operationLock.close();
}
public long getLocalCheckpoint() {
return indexShard.getLocalCheckpoint();
}
public ShardRouting routingEntry() {
return indexShard.routingEntry();
}
}
class PrimaryShardReference extends ShardReference implements ReplicationOperation.Primary<Request, ReplicaRequest, Response> {
PrimaryShardReference(IndexShard indexShard, Releasable operationLock) {
super(indexShard, operationLock);
}
public boolean isRelocated() {
return indexShard.state() == IndexShardState.RELOCATED;
}
@ -758,15 +778,59 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
@Override
public ShardRouting routingEntry() {
return indexShard.routingEntry();
public void updateLocalCheckpointForShard(String allocationId, long checkpoint) {
indexShard.updateLocalCheckpointForShard(allocationId, checkpoint);
}
@Override
public long localCheckpoint() {
return indexShard.getLocalCheckpoint();
}
}
static class ReplicaResponse extends ActionResponse implements ReplicationOperation.ReplicaResponse {
private long localCheckpoint;
private String allocationId;
ReplicaResponse() {
}
ReplicaResponse(String allocationId, long localCheckpoint) {
this.allocationId = allocationId;
this.localCheckpoint = localCheckpoint;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
localCheckpoint = in.readZLong();
allocationId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeZLong(localCheckpoint);
out.writeString(allocationId);
}
@Override
public long localCheckpoint() {
return localCheckpoint;
}
@Override
public String allocationId() {
return allocationId;
}
}
final class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {
@Override
public void performOn(ShardRouting replica, ReplicaRequest request, ActionListener<TransportResponse.Empty> listener) {
public void performOn(ShardRouting replica, ReplicaRequest request, ActionListener<ReplicationOperation.ReplicaResponse> listener) {
String nodeId = replica.currentNodeId();
final DiscoveryNode node = clusterService.state().nodes().get(nodeId);
if (node == null) {
@ -774,7 +838,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
return;
}
transportService.sendRequest(node, transportReplicaAction, request, transportOptions,
new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE));
new ActionListenerResponseHandler<>(listener, ReplicaResponse::new));
}
@Override

View File

@ -65,7 +65,6 @@ import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndexCreationException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.script.ScriptService;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@ -299,7 +298,8 @@ public class MetaDataCreateIndexService extends AbstractComponent {
// Set up everything, now locally create the index to see that things are ok, and apply
final IndexMetaData tmpImd = IndexMetaData.builder(request.index()).settings(actualIndexSettings).build();
// create the index here (on the master) to validate it can be created, as well as adding the mapping
final IndexService indexService = indicesService.createIndex(nodeServicesProvider, tmpImd, Collections.emptyList());
final IndexService indexService = indicesService.createIndex(nodeServicesProvider, tmpImd,
Collections.emptyList(), shardId -> {});
createdIndex = indexService.index();
// now add the mappings
MapperService mapperService = indexService.mapperService();

View File

@ -104,7 +104,8 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
if (indexService == null) {
// temporarily create the index and add mappings so we can parse the filter
try {
indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.emptyList());
indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData,
Collections.emptyList(), shardId -> {});
for (ObjectCursor<MappingMetaData> cursor : indexMetaData.getMappings().values()) {
MappingMetaData mappingMetaData = cursor.value;
indexService.mapperService().merge(mappingMetaData.type(), mappingMetaData.source(), MapperService.MergeReason.MAPPING_RECOVERY, false);

View File

@ -42,7 +42,6 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.percolator.PercolatorFieldMapper;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidTypeNameException;
@ -140,7 +139,7 @@ public class MetaDataMappingService extends AbstractComponent {
IndexService indexService = indicesService.indexService(indexMetaData.getIndex());
if (indexService == null) {
// we need to create the index here, and add the current mapping to it, so we can merge
indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.emptyList());
indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.emptyList(), shardId -> {});
removeIndex = true;
for (ObjectCursor<MappingMetaData> metaData : indexMetaData.getMappings().values()) {
// don't apply the default mapping, it has been applied when the mapping was created
@ -224,7 +223,7 @@ public class MetaDataMappingService extends AbstractComponent {
// close it later once we are done with mapping update
indicesToClose.add(indexMetaData.getIndex());
IndexService indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData,
Collections.emptyList());
Collections.emptyList(), shardId -> {});
// add mappings for all types, we need them for cross-type validation
for (ObjectCursor<MappingMetaData> mapping : indexMetaData.getMappings().values()) {
indexService.mapperService().merge(mapping.value.type(), mapping.value.source(),

View File

@ -183,6 +183,15 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return this.activeShards;
}
/**
* Returns a {@link List} of all initializing shards, including target shards of relocations
*
* @return a {@link List} of shards
*/
public List<ShardRouting> getAllInitializingShards() {
return this.allInitializingShards;
}
/**
* Returns a {@link List} of active shards
*

View File

@ -37,6 +37,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.percolator.PercolatorFieldMapper;
import org.elasticsearch.index.seqno.LocalCheckpointService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.IndexStore;
@ -116,6 +117,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.QUERY_STRING_LENIENT_SETTING,
IndexSettings.ALLOW_UNMAPPED,
IndexSettings.INDEX_CHECK_ON_STARTUP,
IndexSettings.INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL,
LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE,
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,
IndexSettings.INDEX_GC_DELETES_SETTING,
IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING,

View File

@ -26,14 +26,15 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.cache.query.IndexQueryCache;
import org.elasticsearch.index.cache.query.DisabledQueryCache;
import org.elasticsearch.index.cache.query.IndexQueryCache;
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.BM25SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityService;
@ -291,7 +292,8 @@ public final class IndexModule {
public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter,
NodeServicesProvider servicesProvider, IndicesQueryCache indicesQueryCache,
MapperRegistry mapperRegistry, IndicesFieldDataCache indicesFieldDataCache) throws IOException {
MapperRegistry mapperRegistry, Consumer<ShardId> globalCheckpointSyncer,
IndicesFieldDataCache indicesFieldDataCache) throws IOException {
final IndexEventListener eventListener = freeze();
IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null
? (shard) -> null : indexSearcherWrapper.get();
@ -326,7 +328,7 @@ public final class IndexModule {
}
return new IndexService(indexSettings, environment, new SimilarityService(indexSettings, similarities), shardStoreDeleter,
analysisRegistry, engineFactory.get(), servicesProvider, queryCache, store, eventListener, searcherWrapperFactory,
mapperRegistry, indicesFieldDataCache, searchOperationListeners, indexOperationListeners);
mapperRegistry, indicesFieldDataCache, globalCheckpointSyncer, searchOperationListeners, indexOperationListeners);
}
/**

View File

@ -84,6 +84,7 @@ import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
@ -108,6 +109,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
private final SimilarityService similarityService;
private final EngineFactory engineFactory;
private final IndexWarmer warmer;
private final Consumer<ShardId> globalCheckpointSyncer;
private volatile Map<Integer, IndexShard> shards = emptyMap();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean deleted = new AtomicBoolean(false);
@ -118,6 +120,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
private volatile AsyncTranslogFSync fsyncTask;
private final ThreadPool threadPool;
private final BigArrays bigArrays;
private final AsyncGlobalCheckpointTask globalCheckpointTask;
public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
SimilarityService similarityService,
@ -131,10 +134,12 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
IndexModule.IndexSearcherWrapperFactory wrapperFactory,
MapperRegistry mapperRegistry,
IndicesFieldDataCache indicesFieldDataCache,
Consumer<ShardId> globalCheckpointSyncer,
List<SearchOperationListener> searchOperationListeners,
List<IndexingOperationListener> indexingOperationListeners) throws IOException {
super(indexSettings);
this.indexSettings = indexSettings;
this.globalCheckpointSyncer = globalCheckpointSyncer;
this.analysisService = registry.build(indexSettings);
this.similarityService = similarityService;
this.mapperService = new MapperService(indexSettings, analysisService, similarityService, mapperRegistry,
@ -156,6 +161,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
this.engineFactory = engineFactory;
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.searcherWrapper = wrapperFactory.newWrapper(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
// kick off async ops for the first shard in this index
@ -236,7 +242,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
}
}
} finally {
IOUtils.close(bitsetFilterCache, indexCache, indexFieldData, analysisService, refreshTask, fsyncTask);
IOUtils.close(bitsetFilterCache, indexCache, indexFieldData, analysisService, refreshTask, fsyncTask, globalCheckpointTask);
}
}
}
@ -339,6 +345,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
} else {
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexCache, mapperService, similarityService,
indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
() -> globalCheckpointSyncer.accept(shardId),
searchOperationListeners, indexingOperationListeners);
}
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
@ -718,6 +725,31 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
}
}
private void maybeUpdateGlobalCheckpoints() {
for (IndexShard shard : this.shards.values()) {
if (shard.routingEntry().primary()) {
switch (shard.state()) {
case CREATED:
case RECOVERING:
case CLOSED:
case RELOCATED:
continue;
case POST_RECOVERY:
case STARTED:
try {
shard.updateGlobalCheckpointOnPrimary();
} catch (EngineClosedException | AlreadyClosedException ex) {
// fine - continue, the shard was concurrently closed on us.
}
continue;
default:
throw new IllegalStateException("unknown state: " + shard.state());
}
}
}
}
static abstract class BaseAsyncTask implements Runnable, Closeable {
protected final IndexService indexService;
protected final ThreadPool threadPool;
@ -825,6 +857,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
protected String getThreadPool() {
return ThreadPool.Names.FLUSH;
}
@Override
protected void runInternal() {
indexService.maybeFSyncTranslogs();
@ -858,6 +891,23 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
}
}
final class AsyncGlobalCheckpointTask extends BaseAsyncTask {
AsyncGlobalCheckpointTask(IndexService indexService) {
super(indexService, indexService.getIndexSettings().getGlobalCheckpointInterval());
}
@Override
protected void runInternal() {
indexService.maybeUpdateGlobalCheckpoints();
}
@Override
public String toString() {
return "global_checkpoint";
}
}
AsyncRefreshTask getRefreshTask() { // for tests
return refreshTask;
}
@ -865,4 +915,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
AsyncTranslogFSync getFsyncTask() { // for tests
return fsyncTask;
}
AsyncGlobalCheckpointTask getGlobalCheckpointTask() { // for tests
return globalCheckpointTask;
}
}

View File

@ -106,6 +106,9 @@ public final class IndexSettings {
Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic,
Property.IndexScope);
public static final Setting<TimeValue> INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL =
Setting.timeSetting("index.seq_no.checkpoint_sync_interval", new TimeValue(30, TimeUnit.SECONDS),
new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope);
/**
* Index setting to enable / disable deletes garbage collection.
@ -136,6 +139,7 @@ public final class IndexSettings {
private volatile Translog.Durability durability;
private final TimeValue syncInterval;
private volatile TimeValue refreshInterval;
private final TimeValue globalCheckpointInterval;
private volatile ByteSizeValue flushThresholdSize;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final MergePolicyConfig mergePolicyConfig;
@ -222,6 +226,7 @@ public final class IndexSettings {
this.durability = scopedSettings.get(INDEX_TRANSLOG_DURABILITY_SETTING);
syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
globalCheckpointInterval = scopedSettings.get(INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL);
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
@ -437,6 +442,13 @@ public final class IndexSettings {
return refreshInterval;
}
/**
* Returns this interval in which the primary shards of this index should check and advance the global checkpoint
*/
public TimeValue getGlobalCheckpointInterval() {
return globalCheckpointInterval;
}
/**
* Returns the transaction log threshold size when to forcefully flush the index and clear the transaction log.
*/

View File

@ -28,15 +28,12 @@ import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.join.BitSetProducer;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
@ -64,10 +61,8 @@ import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
@ -76,7 +71,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@ -393,8 +387,8 @@ public abstract class Engine implements Closeable {
return new CommitStats(getLastCommittedSegmentInfos());
}
/** get sequence number related stats */
public abstract SeqNoStats seqNoStats();
/** get the sequence number service */
public abstract SequenceNumbersService seqNoService();
/**
* Read the last segments info from the commit pointed to by the searcher manager

View File

@ -56,7 +56,6 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
@ -1149,7 +1148,7 @@ public class InternalEngine extends Engine {
}
@Override
public SeqNoStats seqNoStats() {
return seqNoService.stats();
public SequenceNumbersService seqNoService() {
return seqNoService;
}
}

View File

@ -30,7 +30,7 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
@ -250,7 +250,7 @@ public class ShadowEngine extends Engine {
}
@Override
public SeqNoStats seqNoStats() {
public SequenceNumbersService seqNoService() {
throw new UnsupportedOperationException("ShadowEngine doesn't track sequence numbers");
}

View File

@ -0,0 +1,244 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import java.util.Set;
/**
* A shard component that is responsible of tracking the global checkpoint. The global checkpoint
* is the highest seq_no for which all lower (or equal) seq_no have been processed on all shards that
* are currently active. Since shards count as "active" when the master starts them, and before this primary shard
* has been notified of this fact, we also include shards in that are in the
* {@link org.elasticsearch.index.shard.IndexShardState#POST_RECOVERY} state when checking for global checkpoint advancement.
* We call these shards "in sync" with all operations on the primary (see {@link #inSyncLocalCheckpoints}.
*
* <p>
* The global checkpoint is maintained by the primary shard and is replicated to all the replicas
* (via {@link GlobalCheckpointSyncAction}).
*/
public class GlobalCheckpointService extends AbstractIndexShardComponent {
/**
* This map holds the last known local checkpoint for every shard copy that's active.
* All shard copies in this map participate in determining the global checkpoint
* keyed by allocation ids
*/
final private ObjectLongMap<String> activeLocalCheckpoints;
/**
* This map holds the last known local checkpoint for every initializing shard copy that's has been brought up
* to speed through recovery. These shards are treated as valid copies and participate in determining the global
* checkpoint.
* <p>
* Keyed by allocation ids.
*/
final private ObjectLongMap<String> inSyncLocalCheckpoints; // keyed by allocation ids
/**
* This map holds the last known local checkpoint for every initializing shard copy that is still undergoing recovery.
* These shards <strong>do not</strong> participate in determining the global checkpoint. This map is needed to make sure that when
* shards are promoted to {@link #inSyncLocalCheckpoints} we use the highest known checkpoint, even if we index concurrently
* while recovering the shard.
* Keyed by allocation ids
*/
final private ObjectLongMap<String> trackingLocalCheckpoint;
private long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
public GlobalCheckpointService(ShardId shardId, IndexSettings indexSettings) {
super(shardId, indexSettings);
activeLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas());
inSyncLocalCheckpoints = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas());
trackingLocalCheckpoint = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas());
}
/**
* notifies the service of a local checkpoint. if the checkpoint is lower than the currently known one,
* this is a noop. Last, if the allocation id is not yet known, it is ignored. This to prevent late
* arrivals from shards that are removed to be re-added.
*/
synchronized public void updateLocalCheckpoint(String allocationId, long localCheckpoint) {
if (updateLocalCheckpointInMap(allocationId, localCheckpoint, activeLocalCheckpoints, "active")) {
return;
}
if (updateLocalCheckpointInMap(allocationId, localCheckpoint, inSyncLocalCheckpoints, "inSync")) {
return;
}
if (updateLocalCheckpointInMap(allocationId, localCheckpoint, trackingLocalCheckpoint, "tracking")) {
return;
}
logger.trace("local checkpoint of [{}] ([{}]) wasn't found in any map. ignoring.", allocationId, localCheckpoint);
}
private boolean updateLocalCheckpointInMap(String allocationId, long localCheckpoint,
ObjectLongMap<String> checkpointsMap, String name) {
assert Thread.holdsLock(this);
int indexOfKey = checkpointsMap.indexOf(allocationId);
if (indexOfKey < 0) {
return false;
}
long current = checkpointsMap.indexGet(indexOfKey);
// nocommit: this can change when we introduces rollback/resync
if (current < localCheckpoint) {
checkpointsMap.indexReplace(indexOfKey, localCheckpoint);
if (logger.isTraceEnabled()) {
logger.trace("updated local checkpoint of [{}] to [{}] (type [{}])", allocationId, localCheckpoint,
name);
}
} else {
logger.trace("skipping update local checkpoint [{}], current check point is higher " +
"(current [{}], incoming [{}], type [{}])",
allocationId, current, localCheckpoint, allocationId);
}
return true;
}
/**
* Scans through the currently known local checkpoints and updates the global checkpoint accordingly.
*
* @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints
* of one of the active allocations is not known.
*/
synchronized public boolean updateCheckpointOnPrimary() {
long minCheckpoint = Long.MAX_VALUE;
if (activeLocalCheckpoints.isEmpty() && inSyncLocalCheckpoints.isEmpty()) {
return false;
}
for (ObjectLongCursor<String> cp : activeLocalCheckpoints) {
if (cp.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
logger.trace("unknown local checkpoint for active allocationId [{}], requesting a sync", cp.key);
return true;
}
minCheckpoint = Math.min(cp.value, minCheckpoint);
}
for (ObjectLongCursor<String> cp : inSyncLocalCheckpoints) {
assert cp.value != SequenceNumbersService.UNASSIGNED_SEQ_NO :
"in sync allocation ids can not have an unknown checkpoint (aId [" + cp.key + "])";
minCheckpoint = Math.min(cp.value, minCheckpoint);
}
if (minCheckpoint < globalCheckpoint) {
// nocommit: if this happens - do you we fail the shard?
throw new IllegalStateException(shardId + " new global checkpoint [" + minCheckpoint
+ "] is lower than previous one [" + globalCheckpoint + "]");
}
if (globalCheckpoint != minCheckpoint) {
logger.trace("global checkpoint updated to [{}]", minCheckpoint);
globalCheckpoint = minCheckpoint;
return true;
}
return false;
}
/**
* gets the current global checkpoint. See java docs for {@link GlobalCheckpointService} for more details
*/
synchronized public long getCheckpoint() {
return globalCheckpoint;
}
/**
* updates the global checkpoint on a replica shard (after it has been updated by the primary).
*/
synchronized public void updateCheckpointOnReplica(long globalCheckpoint) {
if (this.globalCheckpoint <= globalCheckpoint) {
this.globalCheckpoint = globalCheckpoint;
logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint);
} else {
// nocommit: fail the shard?
throw new IllegalArgumentException("global checkpoint from primary should never decrease. current [" +
this.globalCheckpoint + "], got [" + globalCheckpoint + "]");
}
}
/**
* Notifies the service of the current allocation ids in the cluster state. This method trims any shards that
* have been removed and adds/promotes any active allocations to the {@link #activeLocalCheckpoints}.
*
* @param activeAllocationIds the allocation ids of the currently active shard copies
* @param initializingAllocationIds the allocation ids of the currently initializing shard copies
*/
synchronized public void updateAllocationIdsFromMaster(Set<String> activeAllocationIds,
Set<String> initializingAllocationIds) {
activeLocalCheckpoints.removeAll(key -> activeAllocationIds.contains(key) == false);
for (String activeId : activeAllocationIds) {
if (activeLocalCheckpoints.containsKey(activeId) == false) {
long knownCheckpoint = trackingLocalCheckpoint.getOrDefault(activeId, SequenceNumbersService.UNASSIGNED_SEQ_NO);
knownCheckpoint = inSyncLocalCheckpoints.getOrDefault(activeId, knownCheckpoint);
activeLocalCheckpoints.put(activeId, knownCheckpoint);
logger.trace("marking [{}] as active. known checkpoint [{}]", activeId, knownCheckpoint);
}
}
inSyncLocalCheckpoints.removeAll(key -> initializingAllocationIds.contains(key) == false);
trackingLocalCheckpoint.removeAll(key -> initializingAllocationIds.contains(key) == false);
// add initializing shards to tracking
for (String initID : initializingAllocationIds) {
if (inSyncLocalCheckpoints.containsKey(initID)) {
continue;
}
if (trackingLocalCheckpoint.containsKey(initID)) {
continue;
}
trackingLocalCheckpoint.put(initID, SequenceNumbersService.UNASSIGNED_SEQ_NO);
logger.trace("added [{}] to the tracking map due to a CS update", initID);
}
}
/**
* marks the allocationId as "in sync" with the primary shard. This should be called at the end of recovery
* where the primary knows all operation bellow the global checkpoint have been completed on this shard.
*
* @param allocationId allocationId of the recovering shard
* @param localCheckpoint the local checkpoint of the shard in question
*/
synchronized public void markAllocationIdAsInSync(String allocationId, long localCheckpoint) {
if (trackingLocalCheckpoint.containsKey(allocationId) == false) {
// master have change its mind and removed this allocation, ignore.
return;
}
long current = trackingLocalCheckpoint.remove(allocationId);
localCheckpoint = Math.max(current, localCheckpoint);
logger.trace("marked [{}] as in sync with a local checkpoint of [{}]", allocationId, localCheckpoint);
inSyncLocalCheckpoints.put(allocationId, localCheckpoint);
}
// for testing
synchronized long getLocalCheckpointForAllocation(String allocationId) {
if (activeLocalCheckpoints.containsKey(allocationId)) {
return activeLocalCheckpoints.get(allocationId);
}
if (inSyncLocalCheckpoints.containsKey(allocationId)) {
return inSyncLocalCheckpoints.get(allocationId);
}
if (trackingLocalCheckpoint.containsKey(allocationId)) {
return trackingLocalCheckpoint.get(allocationId);
}
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}

View File

@ -0,0 +1,132 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
public class GlobalCheckpointSyncAction extends TransportReplicationAction<GlobalCheckpointSyncAction.PrimaryRequest,
GlobalCheckpointSyncAction.ReplicaRequest, ReplicationResponse> {
public static String ACTION_NAME = "indices:admin/seq_no/global_checkpoint_sync";
@Inject
public GlobalCheckpointSyncAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, PrimaryRequest::new, ReplicaRequest::new,
ThreadPool.Names.SAME);
}
@Override
protected ReplicationResponse newResponseInstance() {
return new ReplicationResponse();
}
@Override
protected Tuple<ReplicationResponse, ReplicaRequest> shardOperationOnPrimary(PrimaryRequest request) {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().id());
long checkpoint = indexShard.getGlobalCheckpoint();
return new Tuple<>(new ReplicationResponse(), new ReplicaRequest(request, checkpoint));
}
@Override
protected void shardOperationOnReplica(ReplicaRequest request) {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().id());
indexShard.updateGlobalCheckpointOnReplica(request.checkpoint);
}
public void updateCheckpointForShard(ShardId shardId) {
execute(new PrimaryRequest(shardId), new ActionListener<ReplicationResponse>() {
@Override
public void onResponse(ReplicationResponse replicationResponse) {
if (logger.isTraceEnabled()) {
logger.trace("{} global checkpoint successfully updated (shard info [{}])", shardId,
replicationResponse.getShardInfo());
}
}
@Override
public void onFailure(Throwable e) {
logger.debug("{} failed to update global checkpoint", e, shardId);
}
});
}
final static class PrimaryRequest extends ReplicationRequest<PrimaryRequest> {
private PrimaryRequest() {
super();
}
PrimaryRequest(ShardId shardId) {
super(shardId);
}
}
final static class ReplicaRequest extends ReplicationRequest<GlobalCheckpointSyncAction.ReplicaRequest> {
public long checkpoint;
private ReplicaRequest() {
}
ReplicaRequest(PrimaryRequest primaryRequest, long checkpoint) {
super(primaryRequest.shardId());
this.checkpoint = checkpoint;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
checkpoint = in.readZLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeZLong(checkpoint);
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.seqno;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
@ -35,10 +36,8 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
* we keep a bit for each seq No that is still pending. to optimize allocation, we do so in multiple arrays
* allocating them on demand and cleaning up while completed. This setting controls the size of the arrays
*/
public static String SETTINGS_BIT_ARRAYS_SIZE = "index.seq_no.checkpoint.bit_arrays_size";
/** default value for {@link #SETTINGS_BIT_ARRAYS_SIZE} */
final static int DEFAULT_BIT_ARRAYS_SIZE = 1024;
public static Setting<Integer> SETTINGS_BIT_ARRAYS_SIZE = Setting.intSetting("index.seq_no.checkpoint.bit_arrays_size", 1024,
4, Setting.Property.IndexScope);
/**
@ -50,7 +49,7 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
long firstProcessedSeqNo = 0;
/** the current local checkpoint, i.e., all seqNo lower (&lt;=) than this number have been completed */
volatile long checkpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
volatile long checkpoint = SequenceNumbersService.NO_OPS_PERFORMED;
/** the next available seqNo - used for seqNo generation */
volatile long nextSeqNo = 0;
@ -58,10 +57,7 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
public LocalCheckpointService(ShardId shardId, IndexSettings indexSettings) {
super(shardId, indexSettings);
bitArraysSize = indexSettings.getSettings().getAsInt(SETTINGS_BIT_ARRAYS_SIZE, DEFAULT_BIT_ARRAYS_SIZE);
if (bitArraysSize <= 0) {
throw new IllegalArgumentException("[" + SETTINGS_BIT_ARRAYS_SIZE + "] must be positive. got [" + bitArraysSize + "]");
}
bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings());
processedSeqNo = new LinkedList<>();
}

View File

@ -31,19 +31,22 @@ public class SeqNoStats implements ToXContent, Writeable {
private static final String SEQ_NO = "seq_no";
private static final String MAX_SEQ_NO = "max";
private static final String LOCAL_CHECKPOINT = "local_checkpoint";
private static final String GLOBAL_CHECKPOINT = "global_checkpoint";
public static final SeqNoStats PROTO = new SeqNoStats(0,0);
public static final SeqNoStats PROTO = new SeqNoStats(0, 0, 0);
final long maxSeqNo;
final long localCheckpoint;
private final long maxSeqNo;
private final long localCheckpoint;
private final long globalCheckpoint;
public SeqNoStats(long maxSeqNo, long localCheckpoint) {
public SeqNoStats(long maxSeqNo, long localCheckpoint, long globalCheckpoint) {
this.maxSeqNo = maxSeqNo;
this.localCheckpoint = localCheckpoint;
this.globalCheckpoint = globalCheckpoint;
}
public SeqNoStats(StreamInput in) throws IOException {
this(in.readZLong(), in.readZLong());
this(in.readZLong(), in.readZLong(), in.readZLong());
}
/** the maximum sequence number seen so far */
@ -56,10 +59,15 @@ public class SeqNoStats implements ToXContent, Writeable {
return localCheckpoint;
}
public long getGlobalCheckpoint() {
return globalCheckpoint;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeZLong(maxSeqNo);
out.writeZLong(localCheckpoint);
out.writeZLong(globalCheckpoint);
}
@Override
@ -67,8 +75,8 @@ public class SeqNoStats implements ToXContent, Writeable {
builder.startObject(SEQ_NO);
builder.field(MAX_SEQ_NO, maxSeqNo);
builder.field(LOCAL_CHECKPOINT, localCheckpoint);
builder.field(GLOBAL_CHECKPOINT, globalCheckpoint);
builder.endObject();
return builder;
}
}

View File

@ -22,15 +22,22 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
/** a very light weight implementation. will be replaced with proper machinery later */
import java.util.Set;
/**
* a very light weight implementation. will be replaced with proper machinery later
*/
public class SequenceNumbersService extends AbstractIndexShardComponent {
public final static long UNASSIGNED_SEQ_NO = -1L;
public final static long UNASSIGNED_SEQ_NO = -2L;
public final static long NO_OPS_PERFORMED = -1L;
final LocalCheckpointService localCheckpointService;
final GlobalCheckpointService globalCheckpointService;
public SequenceNumbersService(ShardId shardId, IndexSettings indexSettings) {
super(shardId, indexSettings);
localCheckpointService = new LocalCheckpointService(shardId, indexSettings);
globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings);
}
/**
@ -54,6 +61,62 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
* Gets sequence number related stats
*/
public SeqNoStats stats() {
return new SeqNoStats(localCheckpointService.getMaxSeqNo(), localCheckpointService.getCheckpoint());
return new SeqNoStats(localCheckpointService.getMaxSeqNo(), localCheckpointService.getCheckpoint(),
globalCheckpointService.getCheckpoint());
}
/**
* notifies the service of a local checkpoint.
* see {@link GlobalCheckpointService#updateLocalCheckpoint(String, long)} for details.
*/
public void updateLocalCheckpointForShard(String allocationId, long checkpoint) {
globalCheckpointService.updateLocalCheckpoint(allocationId, checkpoint);
}
/**
* marks the allocationId as "in sync" with the primary shard.
* see {@link GlobalCheckpointService#markAllocationIdAsInSync(String, long)} for details.
*
* @param allocationId allocationId of the recovering shard
* @param localCheckpoint the local checkpoint of the shard in question
*/
public void markAllocationIdAsInSync(String allocationId, long localCheckpoint) {
globalCheckpointService.markAllocationIdAsInSync(allocationId, localCheckpoint);
}
public long getLocalCheckpoint() {
return localCheckpointService.getCheckpoint();
}
public long getGlobalCheckpoint() {
return globalCheckpointService.getCheckpoint();
}
/**
* updates the global checkpoint on a replica shard (after it has been updated by the primary).
*/
public void updateGlobalCheckpointOnReplica(long checkpoint) {
globalCheckpointService.updateCheckpointOnReplica(checkpoint);
}
/**
* Notifies the service of the current allocation ids in the cluster state.
* see {@link GlobalCheckpointService#updateAllocationIdsFromMaster(Set, Set)} for details.
*
* @param activeAllocationIds the allocation ids of the currently active shard copies
* @param initializingAllocationIds the allocation ids of the currently initializing shard copies
*/
public void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<String> initializingAllocationIds) {
globalCheckpointService.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
}
/**
* Scans through the currently known local checkpoint and updates the global checkpoint accordingly.
*
* @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints
* of one of the active allocations is not known.
*/
public boolean updateGlobalCheckpointOnPrimary() {
return globalCheckpointService.updateCheckpointOnPrimary();
}
}

View File

@ -90,6 +90,7 @@ import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.seqno.GlobalCheckpointService;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.similarity.SimilarityService;
@ -122,6 +123,7 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -193,6 +195,9 @@ public class IndexShard extends AbstractIndexShardComponent {
private static final EnumSet<IndexShardState> writeAllowedStatesForReplica = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);
private final IndexSearcherWrapper searcherWrapper;
private final Runnable globalCheckpointSyncer;
/**
* True if this shard is still indexing (recently) and false if we've been idle for long enough (as periodically checked by {@link
* IndexingMemoryController}).
@ -203,7 +208,8 @@ public class IndexShard extends AbstractIndexShardComponent {
MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
@Nullable EngineFactory engineFactory,
IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, ThreadPool threadPool, BigArrays bigArrays,
Engine.Warmer warmer, List<SearchOperationListener> searchOperationListener, List<IndexingOperationListener> listeners) throws IOException {
Engine.Warmer warmer, Runnable globalCheckpointSyncer,
List<SearchOperationListener> searchOperationListener, List<IndexingOperationListener> listeners) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
this.shardRouting = shardRouting;
@ -226,6 +232,7 @@ public class IndexShard extends AbstractIndexShardComponent {
final List<SearchOperationListener> searchListenersList = new ArrayList<>(searchOperationListener);
searchListenersList.add(searchStats);
this.searchOperationListener = new SearchOperationListener.CompositeListener(searchListenersList, logger);
this.globalCheckpointSyncer = globalCheckpointSyncer;
this.getService = new ShardGetService(indexSettings, this, mapperService);
this.shardWarmerService = new ShardIndexWarmerService(shardId, indexSettings);
this.shardQueryCache = new ShardRequestCache();
@ -531,9 +538,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
public Engine.Delete prepareDeleteOnReplica(String type, String id, long seqNo, long version, VersionType versionType) {
if (shardRouting.primary() && shardRouting.isRelocationTarget() == false) {
throw new IllegalIndexShardStateException(shardId, state, "shard is not a replica");
}
verifyReplicationTarget();
final DocumentMapper documentMapper = docMapper(type).getDocumentMapper();
final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType();
final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null);
@ -641,7 +646,7 @@ public class IndexShard extends AbstractIndexShardComponent {
@Nullable
public SeqNoStats seqNoStats() {
Engine engine = getEngineOrNull();
return engine == null ? null : engine.seqNoStats();
return engine == null ? null : engine.seqNoService().stats();
}
public IndexingStats indexingStats(String... types) {
@ -1253,6 +1258,69 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
/**
* notifies the service of a local checkpoint. see {@link GlobalCheckpointService#updateLocalCheckpoint(String, long)} for details.
*/
public void updateLocalCheckpointForShard(String allocationId, long checkpoint) {
verifyPrimary();
getEngine().seqNoService().updateLocalCheckpointForShard(allocationId, checkpoint);
}
/**
* marks the allocationId as "in sync" with the primary shard. see {@link GlobalCheckpointService#markAllocationIdAsInSync(String, long)} for details.
*
* @param allocationId allocationId of the recovering shard
* @param localCheckpoint the local checkpoint of the shard in question
*/
public void markAllocationIdAsInSync(String allocationId, long localCheckpoint) {
verifyPrimary();
getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint);
}
public long getLocalCheckpoint() {
return getEngine().seqNoService().getLocalCheckpoint();
}
public long getGlobalCheckpoint() {
return getEngine().seqNoService().getGlobalCheckpoint();
}
/**
* checks whether the global checkpoint can be updated based on current knowledge of local checkpoints on the different
* shard copies. The checkpoint is updated or more information is required from the replica, a globack checkpoint sync
* is initiated.
*/
public void updateGlobalCheckpointOnPrimary() {
verifyPrimary();
if (getEngine().seqNoService().updateGlobalCheckpointOnPrimary()) {
globalCheckpointSyncer.run();
}
}
/**
* updates the global checkpoint on a replica shard (after it has been updated by the primary).
*/
public void updateGlobalCheckpointOnReplica(long checkpoint) {
verifyReplicationTarget();
getEngine().seqNoService().updateGlobalCheckpointOnReplica(checkpoint);
}
/**
* Notifies the service of the current allocation ids in the cluster state.
* see {@link GlobalCheckpointService#updateAllocationIdsFromMaster(Set, Set)} for details.
*
* @param activeAllocationIds the allocation ids of the currently active shard copies
* @param initializingAllocationIds the allocation ids of the currently initializing shard copies
*/
public void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<String> initializingAllocationIds) {
verifyPrimary();
Engine engine = getEngineOrNull();
// if engine is not yet started, we are not ready yet and can just ignore this
if (engine != null) {
engine.seqNoService().updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
}
}
/**
* Should be called for each no-op update operation to increment relevant statistics.
*
@ -1644,4 +1712,9 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
// for tests
Runnable getGlobalCheckpointSyncer() {
return globalCheckpointSyncer;
}
}

View File

@ -53,7 +53,8 @@ public final class ShadowIndexShard extends IndexShard {
ThreadPool threadPool, BigArrays bigArrays, Engine.Warmer engineWarmer,
List<SearchOperationListener> searchOperationListeners) throws IOException {
super(shardRouting, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory,
indexEventListener, wrapper, threadPool, bigArrays, engineWarmer, searchOperationListeners, Collections.emptyList());
indexEventListener, wrapper, threadPool, bigArrays, engineWarmer, () -> {
}, searchOperationListeners, Collections.emptyList());
}
/**
@ -102,4 +103,22 @@ public final class ShadowIndexShard extends IndexShard {
public TranslogStats translogStats() {
return null; // shadow engine has no translog
}
@Override
public void updateGlobalCheckpointOnReplica(long checkpoint) {
// nocommit: think shadow replicas through
}
@Override
public long getLocalCheckpoint() {
// nocommit: think shadow replicas through
return -1;
}
@Override
public long getGlobalCheckpoint() {
// nocommit: think shadow replicas through
return -1;
}
}

View File

@ -31,11 +31,10 @@ import org.elasticsearch.index.mapper.core.BooleanFieldMapper;
import org.elasticsearch.index.mapper.core.CompletionFieldMapper;
import org.elasticsearch.index.mapper.core.DateFieldMapper;
import org.elasticsearch.index.mapper.core.KeywordFieldMapper;
import org.elasticsearch.index.mapper.core.NumberFieldMapper;
import org.elasticsearch.index.mapper.core.StringFieldMapper;
import org.elasticsearch.index.mapper.core.TextFieldMapper;
import org.elasticsearch.index.mapper.core.TokenCountFieldMapper;
import org.elasticsearch.index.mapper.core.LegacyTokenCountFieldMapper;
import org.elasticsearch.index.mapper.core.NumberFieldMapper;
import org.elasticsearch.index.mapper.geo.GeoPointFieldMapper;
import org.elasticsearch.index.mapper.geo.GeoShapeFieldMapper;
import org.elasticsearch.index.mapper.internal.AllFieldMapper;
@ -54,6 +53,7 @@ import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
import org.elasticsearch.index.mapper.ip.IpFieldMapper;
import org.elasticsearch.index.mapper.object.ObjectMapper;
import org.elasticsearch.index.percolator.PercolatorFieldMapper;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.flush.SyncedFlushService;
import org.elasticsearch.indices.mapper.MapperRegistry;
@ -162,6 +162,7 @@ public class IndicesModule extends AbstractModule {
bind(UpdateHelper.class).asEagerSingleton();
bind(MetaDataIndexUpgradeService.class).asEagerSingleton();
bind(NodeServicesProvider.class).asEagerSingleton();
bind(GlobalCheckpointSyncAction.class).asEagerSingleton();
}
// public for testing

View File

@ -112,6 +112,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -348,8 +349,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
* @param builtInListeners a list of built-in lifecycle {@link IndexEventListener} that should should be used along side with the per-index listeners
* @throws IndexAlreadyExistsException if the index already exists.
*/
public synchronized IndexService createIndex(final NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, List<IndexEventListener> builtInListeners) throws IOException {
public synchronized IndexService createIndex(final NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, List<IndexEventListener> builtInListeners, Consumer<ShardId> globalCheckpointSyncer) throws IOException {
if (!lifecycle.started()) {
throw new IllegalStateException("Can't create an index [" + indexMetaData.getIndex() + "], node is closed");
}
@ -369,7 +369,8 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
};
finalListeners.add(onStoreClose);
finalListeners.add(oldShardsStats);
final IndexService indexService = createIndexService("create index", nodeServicesProvider, indexMetaData, indicesQueryCache, indicesFieldDataCache, finalListeners, indexingMemoryController);
final IndexService indexService = createIndexService("create index", nodeServicesProvider, indexMetaData, indicesQueryCache,
indicesFieldDataCache, finalListeners, globalCheckpointSyncer, indexingMemoryController);
boolean success = false;
try {
indexService.getIndexEventListener().afterIndexCreated(indexService);
@ -386,7 +387,12 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
/**
* This creates a new IndexService without registering it
*/
private synchronized IndexService createIndexService(final String reason, final NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, IndicesQueryCache indicesQueryCache, IndicesFieldDataCache indicesFieldDataCache, List<IndexEventListener> builtInListeners, IndexingOperationListener... indexingOperationListeners) throws IOException {
private synchronized IndexService createIndexService(final String reason, final NodeServicesProvider nodeServicesProvider,
IndexMetaData indexMetaData, IndicesQueryCache indicesQueryCache,
IndicesFieldDataCache indicesFieldDataCache,
List<IndexEventListener> builtInListeners,
Consumer<ShardId> globalCheckpointSyncer,
IndexingOperationListener... indexingOperationListeners) throws IOException {
final Index index = indexMetaData.getIndex();
final Predicate<String> indexNameMatcher = (indexExpression) -> indexNameExpressionResolver.matchesIndex(index.getName(), indexExpression, clusterService.state());
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexNameMatcher, indexScopeSetting);
@ -404,7 +410,8 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener);
}
return indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, indicesFieldDataCache);
return indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, globalCheckpointSyncer,
indicesFieldDataCache);
}
/**
@ -420,7 +427,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
closeables.add(indicesQueryCache);
// this will also fail if some plugin fails etc. which is nice since we can verify that early
final IndexService service = createIndexService("metadata verification", nodeServicesProvider,
metaData, indicesQueryCache, indicesFieldDataCache, Collections.emptyList());
metaData, indicesQueryCache, indicesFieldDataCache, Collections.emptyList(), s -> {});
for (ObjectCursor<MappingMetaData> typeMapping : metaData.getMappings().values()) {
// don't apply the default mapping, it has been applied when the mapping was created
service.mapperService().merge(typeMapping.value.type(), typeMapping.value.source(),

View File

@ -52,6 +52,7 @@ import org.elasticsearch.index.IndexShardAlreadyExistsException;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
@ -75,6 +76,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
*
@ -89,6 +91,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private final NodeIndexDeletedAction nodeIndexDeletedAction;
private final NodeMappingRefreshAction nodeMappingRefreshAction;
private final NodeServicesProvider nodeServicesProvider;
private final GlobalCheckpointSyncAction globalCheckpointSyncAction;
private static final ShardStateAction.Listener SHARD_STATE_ACTION_LISTENER = new ShardStateAction.Listener() {
};
@ -113,9 +116,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
NodeMappingRefreshAction nodeMappingRefreshAction,
RepositoriesService repositoriesService, RestoreService restoreService,
SearchService searchService, SyncedFlushService syncedFlushService,
RecoverySource recoverySource, NodeServicesProvider nodeServicesProvider) {
RecoverySource recoverySource, NodeServicesProvider nodeServicesProvider,
GlobalCheckpointSyncAction globalCheckpointSyncAction) {
super(settings);
this.buildInIndexListener = Arrays.asList(recoverySource, recoveryTargetService, searchService, syncedFlushService);
this.globalCheckpointSyncAction = globalCheckpointSyncAction;
this.indicesService = indicesService;
this.clusterService = clusterService;
this.threadPool = threadPool;
@ -379,7 +384,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
logger.debug("[{}] creating index", indexMetaData.getIndex());
}
try {
indicesService.createIndex(nodeServicesProvider, indexMetaData, buildInIndexListener);
indicesService.createIndex(nodeServicesProvider, indexMetaData, buildInIndexListener,
globalCheckpointSyncAction::updateCheckpointForShard);
} catch (Throwable e) {
sendFailShard(shard, "failed to create index", e);
}
@ -527,6 +533,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (shardHasBeenRemoved == false) {
try {
indexShard.updateRoutingEntry(shardRouting, event.state().blocks().disableStatePersistence() == false);
if (shardRouting.primary()) {
final IndexShardRoutingTable shardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId());
Set<String> activeIds = shardRoutingTable.activeShards().stream().map(sr -> sr.allocationId().getId()).collect(Collectors.toSet());
Set<String> initializingIds = shardRoutingTable.getAllInitializingShards().stream().map(sr -> sr.allocationId().getId()).collect(Collectors.toSet());
indexShard.updateAllocationIdsFromMaster(activeIds, initializingIds);
}
} catch (Throwable e) {
failAndRemoveShard(shardRouting, indexService, true, "failed updating shard routing entry", e);
}

View File

@ -362,7 +362,10 @@ public class RecoverySourceHandler {
logger.trace("[{}][{}] finalizing recovery to {}", indexName, shardId, request.targetNode());
cancellableThreads.execute(recoveryTarget::finalizeRecovery);
cancellableThreads.execute(() -> {
RecoveryTarget.FinalizeResponse response = recoveryTarget.finalizeRecovery();
shard.markAllocationIdAsInSync(response.getAllocationId(), response.getLocalCheckpoint());
});
if (isPrimaryRelocation()) {
/**

View File

@ -298,8 +298,10 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
}
@Override
public void finalizeRecovery() {
indexShard().finalizeRecovery();
public FinalizeResponse finalizeRecovery() {
final IndexShard indexShard = indexShard();
indexShard.finalizeRecovery();
return new FinalizeResponse(indexShard.routingEntry().allocationId().getId(), indexShard.getLocalCheckpoint());
}
@Override

View File

@ -19,9 +19,12 @@
package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
import java.util.List;
@ -41,7 +44,7 @@ public interface RecoveryTargetHandler {
* new segments are available, and enables garbage collection of
* tombstone files. The shard is also moved to the POST_RECOVERY phase during this time
**/
void finalizeRecovery();
FinalizeResponse finalizeRecovery();
/**
* Index a set of translog operations on the target
@ -71,4 +74,41 @@ public interface RecoveryTargetHandler {
void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
boolean lastChunk, int totalTranslogOps) throws IOException;
class FinalizeResponse extends TransportResponse {
private long localCheckpoint;
private String allocationId;
public FinalizeResponse(String allocationId, long localCheckpoint) {
this.localCheckpoint = localCheckpoint;
this.allocationId = allocationId;
}
FinalizeResponse() {
}
public long getLocalCheckpoint() {
return localCheckpoint;
}
public String getAllocationId() {
return allocationId;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeZLong(localCheckpoint);
out.writeString(allocationId);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
localCheckpoint = in.readZLong();
allocationId = in.readString();
}
}
}

View File

@ -58,7 +58,6 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
@ -300,11 +299,12 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve
@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
final RecoveryTargetHandler.FinalizeResponse response;
try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.status().finalizeRecovery();
response = recoveryRef.status().finalizeRecovery();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
channel.sendResponse(response);
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
@ -82,11 +83,16 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
}
@Override
public void finalizeRecovery() {
transportService.submitRequest(targetNode, RecoveryTargetService.Actions.FINALIZE,
public FinalizeResponse finalizeRecovery() {
return transportService.submitRequest(targetNode, RecoveryTargetService.Actions.FINALIZE,
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
new FutureTransportResponseHandler<FinalizeResponse>() {
@Override
public FinalizeResponse newInstance() {
return new FinalizeResponse();
}
}).txGet();
}
@Override

View File

@ -39,7 +39,6 @@ import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
import java.util.ArrayList;
@ -105,8 +104,9 @@ public class ReplicationOperationTests extends ESTestCase {
PlainActionFuture<Response> listener = new PlainActionFuture<>();
final ClusterState finalState = state;
final TestReplicaProxy replicasProxy = new TestReplicaProxy(expectedFailures);
final TestPrimary primary = new TestPrimary(primaryShard, primaryTerm);
final TestReplicationOperation op = new TestReplicationOperation(request,
new TestPrimary(primaryShard, primaryTerm), listener, replicasProxy, () -> finalState);
primary, listener, replicasProxy, () -> finalState);
op.execute();
assertThat(request.primaryTerm(), equalTo(primaryTerm));
@ -122,6 +122,9 @@ public class ReplicationOperationTests extends ESTestCase {
indexShardRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED);
final int totalShards = 1 + expectedReplicas.size() + unassignedShards.size();
assertThat(shardInfo.getTotal(), equalTo(totalShards));
assertThat(primary.knownLocalCheckpoints.remove(primaryShard.allocationId().getId()), equalTo(primary.localCheckpoint));
assertThat(primary.knownLocalCheckpoints, equalTo(replicasProxy.generatedLocalCheckpoints));
}
@ -368,10 +371,13 @@ public class ReplicationOperationTests extends ESTestCase {
static class TestPrimary implements ReplicationOperation.Primary<Request, Request, Response> {
final ShardRouting routing;
final long term;
final long localCheckpoint;
final Map<String, Long> knownLocalCheckpoints = new HashMap<>();
TestPrimary(ShardRouting routing, long term) {
this.routing = routing;
this.term = term;
this.localCheckpoint = random().nextLong();
}
@Override
@ -392,6 +398,36 @@ public class ReplicationOperationTests extends ESTestCase {
request.primaryTerm(term);
return new Tuple<>(new Response(), request);
}
@Override
public void updateLocalCheckpointForShard(String allocationId, long checkpoint) {
knownLocalCheckpoints.put(allocationId, checkpoint);
}
@Override
public long localCheckpoint() {
return localCheckpoint;
}
}
static class ReplicaResponse implements ReplicationOperation.ReplicaResponse {
final String allocationId;
final long localCheckpoint;
ReplicaResponse(String allocationId, long localCheckpoint) {
this.allocationId = allocationId;
this.localCheckpoint = localCheckpoint;
}
@Override
public long localCheckpoint() {
return localCheckpoint;
}
@Override
public String allocationId() {
return allocationId;
}
}
static class TestReplicaProxy implements ReplicationOperation.Replicas<Request> {
@ -400,6 +436,8 @@ public class ReplicationOperationTests extends ESTestCase {
final Set<ShardRouting> failedReplicas = ConcurrentCollections.newConcurrentSet();
final Map<String, Long> generatedLocalCheckpoints = ConcurrentCollections.newConcurrentMap();
TestReplicaProxy() {
this(Collections.emptyMap());
}
@ -409,12 +447,16 @@ public class ReplicationOperationTests extends ESTestCase {
}
@Override
public void performOn(ShardRouting replica, Request request, ActionListener<TransportResponse.Empty> listener) {
public void performOn(ShardRouting replica, Request request, ActionListener<ReplicationOperation.ReplicaResponse> listener) {
assertTrue("replica request processed twice on [" + replica + "]", request.processedOnReplicas.add(replica));
if (opFailures.containsKey(replica)) {
listener.onFailure(opFailures.get(replica));
} else {
listener.onResponse(TransportResponse.Empty.INSTANCE);
final long checkpoint = random().nextLong();
final String allocationId = replica.allocationId().getId();
Long existing = generatedLocalCheckpoints.put(allocationId, checkpoint);
assertNull(existing);
listener.onResponse(new ReplicaResponse(allocationId, checkpoint));
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationOperation.ReplicaResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
@ -43,7 +44,6 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -502,7 +502,7 @@ public class TransportReplicationActionTests extends ESTestCase {
setState(clusterService, state);
// check that at unknown node fails
PlainActionFuture<TransportResponse.Empty> listener = new PlainActionFuture<>();
PlainActionFuture<ReplicaResponse> listener = new PlainActionFuture<>();
proxy.performOn(
TestShardRouting.newShardRouting(shardId, "NOT THERE", false, randomFrom(ShardRoutingState.values())),
new Request(), listener);
@ -519,9 +519,11 @@ public class TransportReplicationActionTests extends ESTestCase {
CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear();
assertThat(captures, arrayWithSize(1));
if (randomBoolean()) {
transport.handleResponse(captures[0].requestId, TransportResponse.Empty.INSTANCE);
final TransportReplicationAction.ReplicaResponse response =
new TransportReplicationAction.ReplicaResponse(randomAsciiOfLength(10), randomLong());
transport.handleResponse(captures[0].requestId, response);
assertTrue(listener.isDone());
listener.get();
assertThat(listener.get(), equalTo(response));
} else if (randomBoolean()) {
transport.handleRemoteError(captures[0].requestId, new ElasticsearchException("simulated"));
assertTrue(listener.isDone());

View File

@ -45,9 +45,9 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.cache.query.IndexQueryCache;
import org.elasticsearch.index.cache.query.DisabledQueryCache;
import org.elasticsearch.index.cache.query.IndexQueryCache;
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
@ -61,9 +61,9 @@ import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
@ -88,8 +88,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import static java.util.Collections.emptyMap;
public class IndexModuleTests extends ESTestCase {
private Index index;
private Settings settings;
@ -154,7 +152,8 @@ public class IndexModuleTests extends ESTestCase {
IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment));
module.setSearcherWrapper((s) -> new Wrapper());
module.engineFactory.set(new MockEngineFactory(AssertingDirectoryReader.class));
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener));
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache,
mapperRegistry, shardId -> {} ,new IndicesFieldDataCache(settings, listener));
assertTrue(indexService.getSearcherWrapper() instanceof Wrapper);
assertSame(indexService.getEngineFactory(), module.engineFactory.get());
indexService.close("simon says", false);
@ -177,7 +176,8 @@ public class IndexModuleTests extends ESTestCase {
} catch (IllegalArgumentException ex) {
// fine
}
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener));
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache,
mapperRegistry, shardId -> {}, new IndicesFieldDataCache(settings, listener));
assertTrue(indexService.getIndexStore() instanceof FooStore);
indexService.close("simon says", false);
@ -196,7 +196,7 @@ public class IndexModuleTests extends ESTestCase {
Consumer<Settings> listener = (s) -> {};
module.addIndexEventListener(eventListener);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
new IndicesFieldDataCache(settings, this.listener));
shardId -> {}, new IndicesFieldDataCache(settings, this.listener));
IndexSettings x = indexService.getIndexSettings();
assertEquals(x.getSettings().getAsMap(), indexSettings.getSettings().getAsMap());
assertEquals(x.getIndex(), index);
@ -221,7 +221,7 @@ public class IndexModuleTests extends ESTestCase {
}
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
new IndicesFieldDataCache(settings, listener));
shardId -> {}, new IndicesFieldDataCache(settings, listener));
assertSame(booleanSetting, indexService.getIndexSettings().getScopedSettings().get(booleanSetting.getKey()));
indexService.close("simon says", false);
@ -244,7 +244,7 @@ public class IndexModuleTests extends ESTestCase {
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
new IndicesFieldDataCache(settings, this.listener));
shardId -> {}, new IndicesFieldDataCache(settings, this.listener));
assertEquals(2, indexService.getIndexOperationListeners().size());
assertEquals(IndexingSlowLog.class, indexService.getIndexOperationListeners().get(0).getClass());
assertSame(listener, indexService.getIndexOperationListeners().get(1));
@ -274,7 +274,7 @@ public class IndexModuleTests extends ESTestCase {
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
new IndicesFieldDataCache(settings, this.listener));
shardId -> {}, new IndicesFieldDataCache(settings, this.listener));
assertEquals(2, indexService.getSearchOperationListener().size());
assertEquals(SearchSlowLog.class, indexService.getSearchOperationListener().get(0).getClass());
assertSame(listener, indexService.getSearchOperationListener().get(1));
@ -307,7 +307,7 @@ public class IndexModuleTests extends ESTestCase {
});
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
new IndicesFieldDataCache(settings, listener));
shardId -> {}, new IndicesFieldDataCache(settings, listener));
SimilarityService similarityService = indexService.similarityService();
assertNotNull(similarityService.getSimilarity("my_similarity"));
assertTrue(similarityService.getSimilarity("my_similarity").get() instanceof TestSimilarity);
@ -338,7 +338,7 @@ public class IndexModuleTests extends ESTestCase {
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment));
try {
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
new IndicesFieldDataCache(settings, listener));
shardId -> {}, new IndicesFieldDataCache(settings, listener));
} catch (IllegalArgumentException ex) {
assertEquals("Unknown Similarity type [test_similarity] for [my_similarity]", ex.getMessage());
}
@ -353,7 +353,7 @@ public class IndexModuleTests extends ESTestCase {
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment));
try {
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
new IndicesFieldDataCache(settings, listener));
shardId -> {}, new IndicesFieldDataCache(settings, listener));
} catch (IllegalArgumentException ex) {
assertEquals("Similarity [my_similarity] must have an associated type", ex.getMessage());
}
@ -367,7 +367,7 @@ public class IndexModuleTests extends ESTestCase {
module.forceQueryCacheProvider((a, b) -> new CustomQueryCache());
expectThrows(AlreadySetException.class, () -> module.forceQueryCacheProvider((a, b) -> new CustomQueryCache()));
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
new IndicesFieldDataCache(settings, listener));
shardId -> {}, new IndicesFieldDataCache(settings, listener));
assertTrue(indexService.cache().query() instanceof CustomQueryCache);
indexService.close("simon says", false);
}
@ -378,7 +378,7 @@ public class IndexModuleTests extends ESTestCase {
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment));
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
new IndicesFieldDataCache(settings, listener));
shardId -> {}, new IndicesFieldDataCache(settings, listener));
assertTrue(indexService.cache().query() instanceof IndexQueryCache);
indexService.close("simon says", false);
}
@ -391,7 +391,7 @@ public class IndexModuleTests extends ESTestCase {
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment));
module.forceQueryCacheProvider((a, b) -> new CustomQueryCache());
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
new IndicesFieldDataCache(settings, listener));
shardId -> {}, new IndicesFieldDataCache(settings, listener));
assertTrue(indexService.cache().query() instanceof DisabledQueryCache);
indexService.close("simon says", false);
}

View File

@ -280,6 +280,20 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
assertNull(indexService.getFsyncTask());
}
public void testGlobalCheckpointTaskIsRunning() throws IOException {
IndexService indexService = createIndex("test", Settings.EMPTY);
IndexService.AsyncGlobalCheckpointTask task = indexService.getGlobalCheckpointTask();
assertNotNull(task);
assertEquals(IndexSettings.INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL.getDefault(Settings.EMPTY), task.getInterval());
assertTrue(task.mustReschedule());
assertTrue(task.isScheduled());
indexService.close("simon says", false);
assertFalse(task.isScheduled());
assertTrue(task.isClosed());
}
public void testRefreshActuallyWorks() throws Exception {
IndexService indexService = createIndex("test", Settings.EMPTY);
ensureGreen("test");

View File

@ -20,7 +20,6 @@
package org.elasticsearch.index.engine;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
@ -1295,13 +1294,13 @@ public class InternalEngineTests extends ESTestCase {
public void testVersioningCreateExistsExceptionWithFlush() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index create = new Engine.Index(newUid("1"), doc, -1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
Engine.Index create = new Engine.Index(newUid("1"), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
engine.index(create);
assertThat(create.version(), equalTo(1L));
engine.flush();
create = new Engine.Index(newUid("1"), doc, -1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
create = new Engine.Index(newUid("1"), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
try {
engine.index(create);
fail();
@ -1520,10 +1519,10 @@ public class InternalEngineTests extends ESTestCase {
}
}
}
assertThat(engine.seqNoStats().getMaxSeqNo(), equalTo(seqNoCount));
assertThat(engine.seqNoStats().getLocalCheckpoint(), equalTo(seqNoCount));
assertThat(replicaEngine.seqNoStats().getMaxSeqNo(), equalTo(seqNoCount));
assertThat(replicaEngine.seqNoStats().getLocalCheckpoint(), equalTo(seqNoCount));
assertThat(engine.seqNoService().stats().getMaxSeqNo(), equalTo(seqNoCount));
assertThat(engine.seqNoService().stats().getLocalCheckpoint(), equalTo(seqNoCount));
assertThat(replicaEngine.seqNoService().stats().getMaxSeqNo(), equalTo(seqNoCount));
assertThat(replicaEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(seqNoCount));
}
// #8603: make sure we can separately log IFD's messages

View File

@ -0,0 +1,79 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.hamcrest.Matcher;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
@TestLogging("index.shard:TRACE,index.seqno:TRACE")
public class CheckpointsIT extends ESIntegTestCase {
public void testCheckpointsAdvance() throws Exception {
prepareCreate("test").setSettings(
"index.seq_no.checkpoint_sync_interval", "100ms", // update global point frequently
"index.number_of_shards", "1" // simplify things so we know how many ops goes to the shards
).get();
final List<IndexRequestBuilder> builders = new ArrayList<>();
final int numDocs = scaledRandomIntBetween(0, 100);
logger.info("--> will index [{}] docs", numDocs);
for (int i = 0; i < numDocs; i++) {
builders.add(client().prepareIndex("test", "type", "id_" + i).setSource("{}"));
}
indexRandom(randomBoolean(), false, builders);
assertBusy(() -> {
IndicesStatsResponse stats = client().admin().indices().prepareStats("test").clear().get();
for (ShardStats shardStats : stats.getShards()) {
if (shardStats.getSeqNoStats() == null) {
assertFalse("no seq_no stats for primary " + shardStats.getShardRouting(), shardStats.getShardRouting().primary());
continue;
}
logger.debug("seq_no stats for {}: {}", shardStats.getShardRouting(),
XContentHelper.toString(shardStats.getSeqNoStats(),
new ToXContent.MapParams(Collections.singletonMap("pretty", "false"))));
final Matcher<Long> localCheckpointRule;
if (shardStats.getShardRouting().primary()) {
localCheckpointRule = equalTo(numDocs - 1L);
} else {
// nocommit: recovery doesn't transfer local checkpoints yet (we don't persist them in lucene).
localCheckpointRule = anyOf(equalTo(numDocs - 1L), equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
}
assertThat(shardStats.getShardRouting() + " local checkpoint mismatch",
shardStats.getSeqNoStats().getLocalCheckpoint(), localCheckpointRule);
assertThat(shardStats.getShardRouting() + " global checkpoint mismatch",
shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(numDocs - 1L));
assertThat(shardStats.getShardRouting() + " max seq no mismatch",
shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(numDocs - 1L));
}
});
}
}

View File

@ -0,0 +1,143 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.junit.Before;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
public class GlobalCheckpointTests extends ESTestCase {
GlobalCheckpointService checkpointService;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
checkpointService = new GlobalCheckpointService(new ShardId("test", "_na_", 0),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY));
}
public void testEmptyShards() {
assertFalse("checkpoint shouldn't be updated when the are no active shards", checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
}
public void testGlobalCheckpointUpdate() {
Map<String, Long> allocations = new HashMap<>();
Set<String> active = new HashSet<>();
Set<String> insync = new HashSet<>();
Set<String> tracking = new HashSet<>();
long maxLocalCheckpoint = Long.MAX_VALUE;
for (int i = randomIntBetween(3, 10); i > 0; i--) {
String id = "id_" + i + "_" + randomAsciiOfLength(5);
long localCheckpoint = randomInt(200);
switch (randomInt(2)) {
case 0:
active.add(id);
maxLocalCheckpoint = Math.min(maxLocalCheckpoint, localCheckpoint);
break;
case 1:
insync.add(id);
maxLocalCheckpoint = Math.min(maxLocalCheckpoint, localCheckpoint);
break;
case 2:
tracking.add(id);
break;
default:
throw new IllegalStateException("you messed up your numbers, didn't you?");
}
allocations.put(id, localCheckpoint);
}
if (maxLocalCheckpoint == Long.MAX_VALUE) {
// note: this state can not happen in practice as we always have at least one primary shard active/in sync
// it is however nice not to assume this on this level and check we do the right thing.
maxLocalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
logger.info("--> using allocations");
allocations.keySet().stream().forEach(aId -> {
final String type;
if (active.contains(aId)) {
type = "active";
} else if (insync.contains(aId)) {
type = "insync";
} else if (tracking.contains(aId)) {
type = "tracked";
} else {
throw new IllegalStateException(aId + " not found in any map");
}
logger.info(" - [{}], local checkpoint [{}], [{}]", aId, allocations.get(aId), type);
});
Set<String> initializing = new HashSet<>(insync);
initializing.addAll(tracking);
checkpointService.updateAllocationIdsFromMaster(active, initializing);
allocations.keySet().stream().forEach(aId -> checkpointService.updateLocalCheckpoint(aId, allocations.get(aId)));
// make sure insync allocation count
insync.stream().forEach(aId -> checkpointService.markAllocationIdAsInSync(aId, randomBoolean() ? 0 : allocations.get(aId)));
assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
assertThat(checkpointService.updateCheckpointOnPrimary(), equalTo(maxLocalCheckpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO));
assertThat(checkpointService.getCheckpoint(), equalTo(maxLocalCheckpoint));
// increment checkpoints
active.stream().forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4)));
insync.stream().forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4)));
allocations.keySet().stream().forEach(aId -> checkpointService.updateLocalCheckpoint(aId, allocations.get(aId)));
// now insert an unknown active/insync id , the checkpoint shouldn't change but a refresh should be requested.
final String extraId = "extra_" + randomAsciiOfLength(5);
// first check that adding it without the master blessing doesn't change anything.
checkpointService.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4));
assertThat(checkpointService.getLocalCheckpointForAllocation(extraId), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
Set<String> newActive = new HashSet<>(active);
newActive.add(extraId);
checkpointService.updateAllocationIdsFromMaster(newActive, initializing);
// we should ask for a refresh , but not update the checkpoint
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), equalTo(maxLocalCheckpoint));
// now notify for the new id
checkpointService.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4));
// now it should be incremented
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), greaterThan(maxLocalCheckpoint));
}
}

View File

@ -40,9 +40,9 @@ import static org.hamcrest.Matchers.isOneOf;
public class LocalCheckpointServiceTests extends ESTestCase {
LocalCheckpointService checkpointService;
private LocalCheckpointService checkpointService;
final int SMALL_CHUNK_SIZE = 4;
private final int SMALL_CHUNK_SIZE = 4;
@Override
@Before
@ -51,19 +51,19 @@ public class LocalCheckpointServiceTests extends ESTestCase {
checkpointService = getCheckpointService();
}
protected LocalCheckpointService getCheckpointService() {
private LocalCheckpointService getCheckpointService() {
return new LocalCheckpointService(
new ShardId("test", "_na_", 0),
IndexSettingsModule.newIndexSettings("test",
Settings.builder()
.put(LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE, SMALL_CHUNK_SIZE)
.put(LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE)
.build()
));
}
public void testSimplePrimary() {
long seqNo1, seqNo2;
assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
seqNo1 = checkpointService.generateSeqNo();
assertThat(seqNo1, equalTo(0L));
checkpointService.markSeqNoAsCompleted(seqNo1);
@ -79,7 +79,7 @@ public class LocalCheckpointServiceTests extends ESTestCase {
}
public void testSimpleReplica() {
assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
checkpointService.markSeqNoAsCompleted(0L);
assertThat(checkpointService.getCheckpoint(), equalTo(0L));
checkpointService.markSeqNoAsCompleted(2L);

View File

@ -1414,7 +1414,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(),
shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(),
indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper,
indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners)
indexService.getThreadPool(), indexService.getBigArrays(), null, shard.getGlobalCheckpointSyncer(),
Collections.emptyList(), Arrays.asList(listeners)
);
return newShard;
}

View File

@ -95,7 +95,7 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas
indicesService.deleteIndex(idx, "simon says");
try {
NodeServicesProvider nodeServicesProvider = getInstanceFromNode(NodeServicesProvider.class);
IndexService index = indicesService.createIndex(nodeServicesProvider, metaData, Arrays.asList(countingListener));
IndexService index = indicesService.createIndex(nodeServicesProvider, metaData, Arrays.asList(countingListener), s -> {});
idx = index.index();
ShardRouting newRouting = shardRouting;
String nodeId = newRouting.currentNodeId();

View File

@ -80,7 +80,6 @@ import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
@ -162,7 +161,9 @@ public final class InternalTestCluster extends TestCluster {
private static final int JVM_ORDINAL = Integer.parseInt(System.getProperty(SysGlobals.CHILDVM_SYSPROP_JVM_ID, "0"));
/** a per-JVM unique offset to be used for calculating unique port ranges. */
/**
* a per-JVM unique offset to be used for calculating unique port ranges.
*/
public static final int JVM_BASE_PORT_OFFSET = PORTS_PER_JVM * (JVM_ORDINAL + 1);
private static final AtomicInteger clusterOrdinal = new AtomicInteger();
@ -1028,7 +1029,14 @@ public final class InternalTestCluster extends TestCluster {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
// we assert busy as we can have background global checkpoint activity
try {
assertBusy(() -> {
assertThat("index shard counter on shard " + indexShard.shardId() + " on node " + nodeAndClient.name + " not 0", indexShard.getActiveOperationsCount(), equalTo(0));
});
} catch (Exception e) {
throw new RuntimeException("unexpected error while checking for shard counters", e);
}
}
}
}
@ -1889,6 +1897,7 @@ public final class InternalTestCluster extends TestCluster {
/**
* Simple interface that allows to wait for an async operation to finish
*
* @param <T> the result of the async execution
*/
public interface Async<T> {