Make IndexShard operation be more explicit about whether they are expected to run on a primary or replica

This commit cherry picks some infrastructure changes from the `feature/seq_no` branch to make merging from master easier.

More explicitly, IndexShard current have  prepareIndex and prepareDelete methods that are called both on the primary as the replica, giving it a different origin parameter. Instead, this commits creates two explicit prepare*OnPrimary and prepare*OnReplica methods. This has the extra added value of not expecting the caller to use an Engine enum.

Also, the commit adds some code reuse between TransportIndexAction and TransportDeleteAction and their TransportShardBulkAction counter parts.

Closes #15282
This commit is contained in:
Boaz Leskes 2015-12-07 16:36:31 +01:00
parent 47bcb33006
commit 82b502c21f
7 changed files with 191 additions and 151 deletions

View File

@ -25,8 +25,10 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.action.update.UpdateHelper;
@ -49,8 +51,6 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
@ -164,7 +164,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
try {
// add the response
final WriteResult<DeleteResponse> writeResult = shardDeleteOperation(request, deleteRequest, indexShard);
final WriteResult<DeleteResponse> writeResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard);
DeleteResponse deleteResponse = writeResult.response();
location = locationToSync(location, writeResult.location);
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE, deleteResponse));
@ -304,7 +304,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
assert preVersionTypes[requestIndex] != null;
}
processAfter(request.refresh(), indexShard, location);
processAfterWrite(request.refresh(), indexShard, location);
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
BulkItemRequest[] items = request.items();
for (int i = 0; i < items.length; i++) {
@ -320,7 +320,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
}
}
private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState,
private WriteResult<IndexResponse> shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState,
IndexShard indexShard, boolean processed) throws Throwable {
// validate, if routing is required, that we got routing
@ -334,21 +334,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
if (!processed) {
indexRequest.process(clusterState.metaData(), mappingMd, allowIdGeneration, request.index());
}
return executeIndexRequestOnPrimary(indexRequest, indexShard);
}
private WriteResult<DeleteResponse> shardDeleteOperation(BulkShardRequest request, DeleteRequest deleteRequest, IndexShard indexShard) {
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version(), deleteRequest.versionType(), Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete);
// update the request with the version so it will go to the replicas
deleteRequest.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(delete.version());
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
DeleteResponse deleteResponse = new DeleteResponse(request.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.found());
return new WriteResult(deleteResponse, delete.getTranslogLocation());
return TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction);
}
static class UpdateResult {
@ -424,7 +410,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
case DELETE:
DeleteRequest deleteRequest = translate.action();
try {
WriteResult result = shardDeleteOperation(bulkShardRequest, deleteRequest, indexShard);
WriteResult<DeleteResponse> result = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard);
return new UpdateResult(translate, deleteRequest, result);
} catch (Throwable t) {
t = ExceptionsHelper.unwrapCause(t);
@ -457,15 +443,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
if (item.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) item.request();
try {
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, indexRequest.source()).index(shardId.getIndex()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
final Engine.Index operation = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.index(operation);
Engine.Index operation = TransportIndexAction.executeIndexRequestOnReplica(indexRequest, indexShard);
location = locationToSync(location, operation.getTranslogLocation());
} catch (Throwable e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure
@ -477,7 +455,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
} else if (item.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) item.request();
try {
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version(), deleteRequest.versionType(), Engine.Operation.Origin.REPLICA);
Engine.Delete delete = TransportDeleteAction.executeDeleteRequestOnReplica(deleteRequest, indexShard);
indexShard.delete(delete);
location = locationToSync(location, delete.getTranslogLocation());
} catch (Throwable e) {
@ -492,7 +470,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
}
}
processAfter(request.refresh(), indexShard, location);
processAfterWrite(request.refresh(), indexShard, location);
}
private void applyVersion(BulkItemRequest item, long version, VersionType versionType) {

View File

@ -130,26 +130,36 @@ public class TransportDeleteAction extends TransportReplicationAction<DeleteRequ
protected Tuple<DeleteResponse, DeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).getShard(shardRequest.shardId.id());
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
final WriteResult<DeleteResponse> result = executeDeleteRequestOnPrimary(request, indexShard);
processAfterWrite(request.refresh(), indexShard, result.location);
return new Tuple<>(result.response, shardRequest.request);
}
public static WriteResult<DeleteResponse> executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard indexShard) {
Engine.Delete delete = indexShard.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
indexShard.delete(delete);
// update the request with the version so it will go to the replicas
request.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
request.version(delete.version());
assert request.versionType().validateVersionForWrites(request.version());
processAfter(request.refresh(), indexShard, delete.getTranslogLocation());
DeleteResponse response = new DeleteResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), delete.version(), delete.found());
return new Tuple<>(response, shardRequest.request);
return new WriteResult<>(
new DeleteResponse(indexShard.shardId().getIndex(), request.type(), request.id(), delete.version(), delete.found()),
delete.getTranslogLocation());
}
public static Engine.Delete executeDeleteRequestOnReplica(DeleteRequest request, IndexShard indexShard) {
Engine.Delete delete = indexShard.prepareDeleteOnReplica(request.type(), request.id(), request.version(), request.versionType());
indexShard.delete(delete);
return delete;
}
@Override
protected void shardOperationOnReplica(ShardId shardId, DeleteRequest request) {
IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
indexShard.delete(delete);
processAfter(request.refresh(), indexShard, delete.getTranslogLocation());
Engine.Delete delete = executeDeleteRequestOnReplica(request, indexShard);
processAfterWrite(request.refresh(), indexShard, delete.getTranslogLocation());
}
@Override

View File

@ -166,11 +166,11 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardRequest.shardId.id());
final WriteResult<IndexResponse> result = executeIndexRequestOnPrimary(request, indexShard);
final WriteResult<IndexResponse> result = executeIndexRequestOnPrimary(request, indexShard, mappingUpdatedAction);
final IndexResponse response = result.response;
final Translog.Location location = result.location;
processAfter(request.refresh(), indexShard, location);
processAfterWrite(request.refresh(), indexShard, location);
return new Tuple<>(response, shardRequest.request);
}
@ -178,16 +178,64 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
protected void shardOperationOnReplica(ShardId shardId, IndexRequest request) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
final Engine.Index operation = executeIndexRequestOnReplica(request, indexShard);
processAfterWrite(request.refresh(), indexShard, operation.getTranslogLocation());
}
/**
* Execute the given {@link IndexRequest} on a replica shard, throwing a
* {@link RetryOnReplicaException} if the operation needs to be re-tried.
*/
public static Engine.Index executeIndexRequestOnReplica(IndexRequest request, IndexShard indexShard) {
final ShardId shardId = indexShard.shardId();
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).index(shardId.getIndex()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
final Engine.Index operation = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
final Engine.Index operation = indexShard.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType());
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.index(operation);
processAfter(request.refresh(), indexShard, operation.getTranslogLocation());
return operation;
}
/** Utility method to prepare an index operation on primary shards */
public static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard indexShard) {
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).index(request.index()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
return indexShard.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType());
}
/**
* Execute the given {@link IndexRequest} on a primary shard, throwing a
* {@link RetryOnPrimaryException} if the operation needs to be re-tried.
*/
public static WriteResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard, MappingUpdatedAction mappingUpdatedAction) throws Throwable {
Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = indexShard.shardId();
if (update != null) {
final String indexName = shardId.getIndex();
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
operation = prepareIndexOperationOnPrimary(request, indexShard);
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnPrimaryException(shardId,
"Dynamics mappings are not available on the node that holds the primary yet");
}
}
final boolean created = indexShard.index(operation);
// update the version on request so it will happen on the replicas
final long version = operation.version();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
return new WriteResult<>(new IndexResponse(shardId.getIndex(), request.type(), request.id(), request.version(), created), operation.getTranslogLocation());
}
}

View File

@ -25,9 +25,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequest.OpType;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
@ -55,10 +52,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
@ -1071,43 +1065,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
}
/** Utility method to create either an index or a create operation depending
* on the {@link OpType} of the request. */
private Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard indexShard) {
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).index(request.index()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
return indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
}
/** Execute the given {@link IndexRequest} on a primary shard, throwing a
* {@link RetryOnPrimaryException} if the operation needs to be re-tried. */
protected final WriteResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard) throws Throwable {
Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = indexShard.shardId();
if (update != null) {
final String indexName = shardId.getIndex();
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
operation = prepareIndexOperationOnPrimary(request, indexShard);
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnPrimaryException(shardId,
"Dynamics mappings are not available on the node that holds the primary yet");
}
}
final boolean created = indexShard.index(operation);
// update the version on request so it will happen on the replicas
final long version = operation.version();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
return new WriteResult(new IndexResponse(shardId.getIndex(), request.type(), request.id(), request.version(), created), operation.getTranslogLocation());
}
protected final void processAfter(boolean refresh, IndexShard indexShard, Translog.Location location) {
protected final void processAfterWrite(boolean refresh, IndexShard indexShard, Translog.Location location) {
if (refresh) {
try {
indexShard.refresh("refresh_flag_index");

View File

@ -215,7 +215,7 @@ public final class ShardRouting implements Streamable, ToXContent {
public ShardRouting buildTargetRelocatingShard() {
assert relocating();
return new ShardRouting(index, shardId, relocatingNodeId, currentNodeId, restoreSource, primary, ShardRoutingState.INITIALIZING, version, unassignedInfo,
AllocationId.newTargetRelocation(allocationId), true, expectedShardSize);
AllocationId.newTargetRelocation(allocationId), true, expectedShardSize);
}
/**
@ -538,29 +538,36 @@ public final class ShardRouting implements Streamable, ToXContent {
return b;
}
/**
* Returns <code>true</code> if this shard is a relocation target for another shard (i.e., was created with {@link #buildTargetRelocatingShard()}
*/
public boolean isRelocationTarget() {
return state == ShardRoutingState.INITIALIZING && relocatingNodeId != null;
}
/** returns true if the routing is the relocation target of the given routing */
public boolean isRelocationTargetOf(ShardRouting other) {
boolean b = this.allocationId != null && other.allocationId != null && this.state == ShardRoutingState.INITIALIZING &&
this.allocationId.getId().equals(other.allocationId.getRelocationId());
this.allocationId.getId().equals(other.allocationId.getRelocationId());
assert b == false || other.state == ShardRoutingState.RELOCATING :
"ShardRouting is a relocation target but the source shard state isn't relocating. This [" + this + "], other [" + other + "]";
"ShardRouting is a relocation target but the source shard state isn't relocating. This [" + this + "], other [" + other + "]";
assert b == false || other.allocationId.getId().equals(this.allocationId.getRelocationId()) :
"ShardRouting is a relocation target but the source id isn't equal to source's allocationId.getRelocationId. This [" + this + "], other [" + other + "]";
"ShardRouting is a relocation target but the source id isn't equal to source's allocationId.getRelocationId. This [" + this + "], other [" + other + "]";
assert b == false || other.currentNodeId().equals(this.relocatingNodeId) :
"ShardRouting is a relocation target but source current node id isn't equal to target relocating node. This [" + this + "], other [" + other + "]";
"ShardRouting is a relocation target but source current node id isn't equal to target relocating node. This [" + this + "], other [" + other + "]";
assert b == false || this.currentNodeId().equals(other.relocatingNodeId) :
"ShardRouting is a relocation target but current node id isn't equal to source relocating node. This [" + this + "], other [" + other + "]";
"ShardRouting is a relocation target but current node id isn't equal to source relocating node. This [" + this + "], other [" + other + "]";
assert b == false || isSameShard(other) :
"ShardRouting is a relocation target but both routings are not of the same shard. This [" + this + "], other [" + other + "]";
"ShardRouting is a relocation target but both routings are not of the same shard. This [" + this + "], other [" + other + "]";
assert b == false || this.primary == other.primary :
"ShardRouting is a relocation target but primary flag is different. This [" + this + "], target [" + other + "]";
"ShardRouting is a relocation target but primary flag is different. This [" + this + "], target [" + other + "]";
return b;
}
@ -568,26 +575,26 @@ public final class ShardRouting implements Streamable, ToXContent {
/** returns true if the routing is the relocation source for the given routing */
public boolean isRelocationSourceOf(ShardRouting other) {
boolean b = this.allocationId != null && other.allocationId != null && other.state == ShardRoutingState.INITIALIZING &&
other.allocationId.getId().equals(this.allocationId.getRelocationId());
other.allocationId.getId().equals(this.allocationId.getRelocationId());
assert b == false || this.state == ShardRoutingState.RELOCATING :
"ShardRouting is a relocation source but shard state isn't relocating. This [" + this + "], other [" + other + "]";
"ShardRouting is a relocation source but shard state isn't relocating. This [" + this + "], other [" + other + "]";
assert b == false || this.allocationId.getId().equals(other.allocationId.getRelocationId()) :
"ShardRouting is a relocation source but the allocation id isn't equal to other.allocationId.getRelocationId. This [" + this + "], other [" + other + "]";
"ShardRouting is a relocation source but the allocation id isn't equal to other.allocationId.getRelocationId. This [" + this + "], other [" + other + "]";
assert b == false || this.currentNodeId().equals(other.relocatingNodeId) :
"ShardRouting is a relocation source but current node isn't equal to other's relocating node. This [" + this + "], other [" + other + "]";
"ShardRouting is a relocation source but current node isn't equal to other's relocating node. This [" + this + "], other [" + other + "]";
assert b == false || other.currentNodeId().equals(this.relocatingNodeId) :
"ShardRouting is a relocation source but relocating node isn't equal to other's current node. This [" + this + "], other [" + other + "]";
"ShardRouting is a relocation source but relocating node isn't equal to other's current node. This [" + this + "], other [" + other + "]";
assert b == false || isSameShard(other) :
"ShardRouting is a relocation source but both routings are not of the same shard. This [" + this + "], target [" + other + "]";
"ShardRouting is a relocation source but both routings are not of the same shard. This [" + this + "], target [" + other + "]";
assert b == false || this.primary == other.primary :
"ShardRouting is a relocation source but primary flag is different. This [" + this + "], target [" + other + "]";
"ShardRouting is a relocation source but primary flag is different. This [" + this + "], target [" + other + "]";
return b;
}
@ -701,14 +708,14 @@ public final class ShardRouting implements Streamable, ToXContent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
.field("state", state())
.field("primary", primary())
.field("node", currentNodeId())
.field("relocating_node", relocatingNodeId())
.field("shard", shardId().id())
.field("index", shardId().index().name())
.field("version", version);
if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE){
.field("state", state())
.field("primary", primary())
.field("node", currentNodeId())
.field("relocating_node", relocatingNodeId())
.field("shard", shardId().id())
.field("index", shardId().index().name())
.field("version", version);
if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE) {
builder.field("expected_shard_size_in_bytes", expectedShardSize);
}
if (restoreSource() != null) {

View File

@ -19,10 +19,7 @@
package org.elasticsearch.index.shard;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.*;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
import org.apache.lucene.store.AlreadyClosedException;
@ -194,8 +191,10 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexSearcherWrapper searcherWrapper;
private final TimeValue inactiveTime;
/** True if this shard is still indexing (recently) and false if we've been idle for long enough (as periodically checked by {@link
* IndexingMemoryController}). */
/**
* True if this shard is still indexing (recently) and false if we've been idle for long enough (as periodically checked by {@link
* IndexingMemoryController}).
*/
private final AtomicBoolean active = new AtomicBoolean();
public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
@ -220,10 +219,10 @@ public class IndexShard extends AbstractIndexShardComponent {
this.indexCache = indexCache;
this.indexingService = new ShardIndexingService(shardId, indexSettings);
this.getService = new ShardGetService(indexSettings, this, mapperService);
this.termVectorsService = provider.getTermVectorsService();
this.termVectorsService = provider.getTermVectorsService();
this.searchService = new ShardSearchStats(settings);
this.shardWarmerService = new ShardIndexWarmerService(shardId, indexSettings);
this.indicesQueryCache = provider.getIndicesQueryCache();
this.indicesQueryCache = provider.getIndicesQueryCache();
this.shardQueryCache = new ShardRequestCache(shardId, indexSettings);
this.shardFieldData = new ShardFieldData();
this.indexFieldDataService = indexFieldDataService;
@ -238,7 +237,7 @@ public class IndexShard extends AbstractIndexShardComponent {
this.checkIndexOnStartup = settings.get("index.shard.check_on_startup", "false");
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, getFromSettings(logger, settings, Translog.Durabilty.REQUEST),
provider.getBigArrays(), threadPool);
provider.getBigArrays(), threadPool);
final QueryCachingPolicy cachingPolicy;
// the query cache is a node-level thing, however we want the most popular filters
// to be computed on a per-shard basis
@ -394,7 +393,7 @@ public class IndexShard extends AbstractIndexShardComponent {
* Marks the shard as recovering based on a recovery state, fails with exception is recovering is not allowed to be set.
*/
public IndexShardState markAsRecovering(String reason, RecoveryState recoveryState) throws IndexShardStartedException,
IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException {
IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
@ -445,9 +444,21 @@ public class IndexShard extends AbstractIndexShardComponent {
return previousState;
}
public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) {
public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType) {
try {
return prepareIndex(docMapper(source.type()), source, version, versionType, origin);
if (shardRouting.primary() == false) {
throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary");
}
return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.PRIMARY);
} catch (Throwable t) {
verifyNotClosed(t);
throw t;
}
}
public Engine.Index prepareIndexOnReplica(SourceToParse source, long version, VersionType versionType) {
try {
return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.REPLICA);
} catch (Throwable t) {
verifyNotClosed(t);
throw t;
@ -486,12 +497,28 @@ public class IndexShard extends AbstractIndexShardComponent {
return created;
}
public Engine.Delete prepareDelete(String type, String id, long version, VersionType versionType, Engine.Operation.Origin origin) {
long startTime = System.nanoTime();
public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) {
if (shardRouting.primary() == false) {
throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary");
}
final DocumentMapper documentMapper = docMapper(type).getDocumentMapper();
return new Engine.Delete(type, id, documentMapper.uidMapper().term(Uid.createUid(type, id)), version, versionType, origin, startTime, false);
return prepareDelete(type, id, documentMapper.uidMapper().term(Uid.createUid(type, id)), version, versionType, Engine.Operation.Origin.PRIMARY);
}
public Engine.Delete prepareDeleteOnReplica(String type, String id, long version, VersionType versionType) {
if (shardRouting.primary() && shardRouting.isRelocationTarget() == false) {
throw new IllegalIndexShardStateException(shardId, state, "shard is not a replica");
}
final DocumentMapper documentMapper = docMapper(type).getDocumentMapper();
return prepareDelete(type, id, documentMapper.uidMapper().term(Uid.createUid(type, id)), version, versionType, Engine.Operation.Origin.REPLICA);
}
static Engine.Delete prepareDelete(String type, String id, Term uid, long version, VersionType versionType, Engine.Operation.Origin origin) {
long startTime = System.nanoTime();
return new Engine.Delete(type, id, uid, version, versionType, origin, startTime, false);
}
public void delete(Engine.Delete delete) {
ensureWriteAllowed(delete);
markLastWrite();
@ -533,11 +560,8 @@ public class IndexShard extends AbstractIndexShardComponent {
}
public DocsStats docStats() {
final Engine.Searcher searcher = acquireSearcher("doc_stats");
try {
try (Engine.Searcher searcher = acquireSearcher("doc_stats")) {
return new DocsStats(searcher.reader().numDocs(), searcher.reader().numDeletedDocs());
} finally {
searcher.close();
}
}
@ -652,7 +676,7 @@ public class IndexShard extends AbstractIndexShardComponent {
logger.trace("force merge with {}", forceMerge);
}
getEngine().forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(),
forceMerge.onlyExpungeDeletes(), false, false);
forceMerge.onlyExpungeDeletes(), false, false);
}
/**
@ -666,8 +690,8 @@ public class IndexShard extends AbstractIndexShardComponent {
org.apache.lucene.util.Version previousVersion = minimumCompatibleVersion();
// we just want to upgrade the segments, not actually forge merge to a single segment
getEngine().forceMerge(true, // we need to flush at the end to make sure the upgrade is durable
Integer.MAX_VALUE, // we just want to upgrade the segments, not actually optimize to a single segment
false, true, upgrade.upgradeOnlyAncientSegments());
Integer.MAX_VALUE, // we just want to upgrade the segments, not actually optimize to a single segment
false, true, upgrade.upgradeOnlyAncientSegments());
org.apache.lucene.util.Version version = minimumCompatibleVersion();
if (logger.isTraceEnabled()) {
logger.trace("upgraded segment {} from version {} to version {}", previousVersion, version);
@ -897,7 +921,7 @@ public class IndexShard extends AbstractIndexShardComponent {
public boolean ignoreRecoveryAttempt() {
IndexShardState state = state(); // one time volatile read
return state == IndexShardState.POST_RECOVERY || state == IndexShardState.RECOVERING || state == IndexShardState.STARTED ||
state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED;
state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED;
}
public void readAllowed() throws IllegalIndexShardStateException {
@ -977,8 +1001,10 @@ public class IndexShard extends AbstractIndexShardComponent {
this.shardEventListener.delegates.add(onShardFailure);
}
/** Change the indexing and translog buffer sizes. If {@code IndexWriter} is currently using more than
* the new buffering indexing size then we do a refresh to free up the heap. */
/**
* Change the indexing and translog buffer sizes. If {@code IndexWriter} is currently using more than
* the new buffering indexing size then we do a refresh to free up the heap.
*/
public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
final EngineConfig config = engineConfig;
@ -1000,7 +1026,7 @@ public class IndexShard extends AbstractIndexShardComponent {
long iwBytesUsed = engine.indexWriterRAMBytesUsed();
String message = LoggerMessageFormat.format("updating index_buffer_size from [{}] to [{}]; IndexWriter now using [{}] bytes",
preValue, shardIndexingBufferSize, iwBytesUsed);
preValue, shardIndexingBufferSize, iwBytesUsed);
if (iwBytesUsed > shardIndexingBufferSize.bytes()) {
// our allowed buffer was changed to less than we are currently using; we ask IW to refresh
@ -1021,9 +1047,11 @@ public class IndexShard extends AbstractIndexShardComponent {
engine.getTranslog().updateBuffer(shardTranslogBufferSize);
}
/** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
* indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. This returns true
* if the shard is inactive. */
/**
* Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
* indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. This returns true
* if the shard is inactive.
*/
public boolean checkIdle() {
return checkIdle(inactiveTime.nanos());
}
@ -1042,8 +1070,10 @@ public class IndexShard extends AbstractIndexShardComponent {
return active.get() == false;
}
/** Returns {@code true} if this shard is active (has seen indexing ops in the last {@link
* IndexShard#INDEX_SHARD_INACTIVE_TIME_SETTING} (default 5 minutes), else {@code false}. */
/**
* Returns {@code true} if this shard is active (has seen indexing ops in the last {@link
* IndexShard#INDEX_SHARD_INACTIVE_TIME_SETTING} (default 5 minutes), else {@code false}.
*/
public boolean getActive() {
return active.get();
}
@ -1077,10 +1107,10 @@ public class IndexShard extends AbstractIndexShardComponent {
return storeRecovery.recoverFromStore(this, shouldExist, localNode);
}
public boolean restoreFromRepository(IndexShardRepository repository, DiscoveryNode locaNode) {
public boolean restoreFromRepository(IndexShardRepository repository, DiscoveryNode localNode) {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromRepository(this, repository, locaNode);
return storeRecovery.recoverFromRepository(this, repository, localNode);
}
/**
@ -1369,8 +1399,10 @@ public class IndexShard extends AbstractIndexShardComponent {
return engine;
}
/** NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is
* closed. */
/**
* NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is
* closed.
*/
protected Engine getEngineOrNull() {
return this.currentEngineReference.get();
}
@ -1427,9 +1459,9 @@ public class IndexShard extends AbstractIndexShardComponent {
writeReason = "routing changed from " + currentRouting + " to " + newRouting;
} else {
logger.trace("skip writing shard state, has been written before; previous version: [" +
currentRouting.version() + "] current version [" + newRouting.version() + "]");
currentRouting.version() + "] current version [" + newRouting.version() + "]");
assert currentRouting.version() <= newRouting.version() : "version should not go backwards for shardID: " + shardId +
" previous version: [" + currentRouting.version() + "] current version [" + newRouting.version() + "]";
" previous version: [" + currentRouting.version() + "] current version [" + newRouting.version() + "]";
return;
}
final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.version(), newRouting.primary(), getIndexUUID(), newRouting.allocationId());
@ -1461,8 +1493,8 @@ public class IndexShard extends AbstractIndexShardComponent {
};
final Engine.Warmer engineWarmer = (searcher, toLevel) -> warmer.warm(searcher, this, idxSettings, toLevel);
return new EngineConfig(shardId,
threadPool, indexingService, indexSettings, engineWarmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, inactiveTime);
threadPool, indexingService, indexSettings, engineWarmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, inactiveTime);
}
private static class IndexShardOperationCounter extends AbstractRefCounted {
@ -1578,6 +1610,7 @@ public class IndexShard extends AbstractIndexShardComponent {
/**
* Simple struct encapsulating a shard failure
*
* @see IndexShard#addShardFailureCallback(Callback)
*/
public static final class ShardFailure {
@ -1604,7 +1637,7 @@ public class IndexShard extends AbstractIndexShardComponent {
};
private QueryShardContext newQueryShardContext() {
return new QueryShardContext(idxSettings, provider.getClient(), indexCache.bitsetFilterCache(), indexFieldDataService, mapperService, similarityService, provider.getScriptService(), provider.getIndicesQueriesRegistry());
return new QueryShardContext(idxSettings, provider.getClient(), indexCache.bitsetFilterCache(), indexFieldDataService, mapperService, similarityService, provider.getScriptService(), provider.getIndicesQueriesRegistry());
}
/**

View File

@ -99,12 +99,18 @@ public class ShardRoutingTests extends ESTestCase {
ShardRouting initializingShard0 = TestShardRouting.newShardRouting("test", 0, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1);
ShardRouting initializingShard1 = TestShardRouting.newShardRouting("test", 1, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1);
ShardRouting startedShard0 = new ShardRouting(initializingShard0);
assertFalse(startedShard0.isRelocationTarget());
startedShard0.moveToStarted();
assertFalse(startedShard0.isRelocationTarget());
ShardRouting startedShard1 = new ShardRouting(initializingShard1);
assertFalse(startedShard1.isRelocationTarget());
startedShard1.moveToStarted();
assertFalse(startedShard1.isRelocationTarget());
ShardRouting sourceShard0a = new ShardRouting(startedShard0);
sourceShard0a.relocate("node2", -1);
assertFalse(sourceShard0a.isRelocationTarget());
ShardRouting targetShard0a = sourceShard0a.buildTargetRelocatingShard();
assertTrue(targetShard0a.isRelocationTarget());
ShardRouting sourceShard0b = new ShardRouting(startedShard0);
sourceShard0b.relocate("node2", -1);
ShardRouting sourceShard1 = new ShardRouting(startedShard1);