diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index a4565cf4cfc..e51a1b938d8 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -25,8 +25,10 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.delete.TransportDeleteAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.index.TransportIndexAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.action.update.UpdateHelper; @@ -49,8 +51,6 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; -import org.elasticsearch.index.mapper.Mapping; -import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; @@ -164,7 +164,7 @@ public class TransportShardBulkAction extends TransportReplicationAction writeResult = shardDeleteOperation(request, deleteRequest, indexShard); + final WriteResult writeResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard); DeleteResponse deleteResponse = writeResult.response(); location = locationToSync(location, writeResult.location); setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE, deleteResponse)); @@ -304,7 +304,7 @@ public class TransportShardBulkAction extends TransportReplicationAction shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState, IndexShard indexShard, boolean processed) throws Throwable { // validate, if routing is required, that we got routing @@ -334,21 +334,7 @@ public class TransportShardBulkAction extends TransportReplicationAction shardDeleteOperation(BulkShardRequest request, DeleteRequest deleteRequest, IndexShard indexShard) { - Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version(), deleteRequest.versionType(), Engine.Operation.Origin.PRIMARY); - indexShard.delete(delete); - // update the request with the version so it will go to the replicas - deleteRequest.versionType(delete.versionType().versionTypeForReplicationAndRecovery()); - deleteRequest.version(delete.version()); - - assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version()); - - DeleteResponse deleteResponse = new DeleteResponse(request.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.found()); - return new WriteResult(deleteResponse, delete.getTranslogLocation()); + return TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction); } static class UpdateResult { @@ -424,7 +410,7 @@ public class TransportShardBulkAction extends TransportReplicationAction result = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard); return new UpdateResult(translate, deleteRequest, result); } catch (Throwable t) { t = ExceptionsHelper.unwrapCause(t); @@ -457,15 +443,7 @@ public class TransportShardBulkAction extends TransportReplicationAction shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { DeleteRequest request = shardRequest.request; IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).getShard(shardRequest.shardId.id()); - Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY); + final WriteResult result = executeDeleteRequestOnPrimary(request, indexShard); + processAfterWrite(request.refresh(), indexShard, result.location); + return new Tuple<>(result.response, shardRequest.request); + } + + public static WriteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard indexShard) { + Engine.Delete delete = indexShard.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType()); indexShard.delete(delete); // update the request with the version so it will go to the replicas request.versionType(delete.versionType().versionTypeForReplicationAndRecovery()); request.version(delete.version()); assert request.versionType().validateVersionForWrites(request.version()); - processAfter(request.refresh(), indexShard, delete.getTranslogLocation()); - - DeleteResponse response = new DeleteResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), delete.version(), delete.found()); - return new Tuple<>(response, shardRequest.request); + return new WriteResult<>( + new DeleteResponse(indexShard.shardId().getIndex(), request.type(), request.id(), delete.version(), delete.found()), + delete.getTranslogLocation()); } + public static Engine.Delete executeDeleteRequestOnReplica(DeleteRequest request, IndexShard indexShard) { + Engine.Delete delete = indexShard.prepareDeleteOnReplica(request.type(), request.id(), request.version(), request.versionType()); + indexShard.delete(delete); + return delete; + } + + @Override protected void shardOperationOnReplica(ShardId shardId, DeleteRequest request) { IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id()); - Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.REPLICA); - - indexShard.delete(delete); - processAfter(request.refresh(), indexShard, delete.getTranslogLocation()); + Engine.Delete delete = executeDeleteRequestOnReplica(request, indexShard); + processAfterWrite(request.refresh(), indexShard, delete.getTranslogLocation()); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 3ffb5765e8c..06a417240d4 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -166,11 +166,11 @@ public class TransportIndexAction extends TransportReplicationAction result = executeIndexRequestOnPrimary(request, indexShard); + final WriteResult result = executeIndexRequestOnPrimary(request, indexShard, mappingUpdatedAction); final IndexResponse response = result.response; final Translog.Location location = result.location; - processAfter(request.refresh(), indexShard, location); + processAfterWrite(request.refresh(), indexShard, location); return new Tuple<>(response, shardRequest.request); } @@ -178,16 +178,64 @@ public class TransportIndexAction extends TransportReplicationAction executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard, MappingUpdatedAction mappingUpdatedAction) throws Throwable { + Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard); + Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); + final ShardId shardId = indexShard.shardId(); + if (update != null) { + final String indexName = shardId.getIndex(); + mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update); + operation = prepareIndexOperationOnPrimary(request, indexShard); + update = operation.parsedDoc().dynamicMappingsUpdate(); + if (update != null) { + throw new RetryOnPrimaryException(shardId, + "Dynamics mappings are not available on the node that holds the primary yet"); + } + } + final boolean created = indexShard.index(operation); + + // update the version on request so it will happen on the replicas + final long version = operation.version(); + request.version(version); + request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); + + assert request.versionType().validateVersionForWrites(request.version()); + + return new WriteResult<>(new IndexResponse(shardId.getIndex(), request.type(), request.id(), request.version(), created), operation.getTranslogLocation()); } } + diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 62802910cdb..3b4d860f31e 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -25,9 +25,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.WriteConsistencyLevel; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexRequest.OpType; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportActions; @@ -55,10 +52,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; -import org.elasticsearch.index.mapper.Mapping; -import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; @@ -1071,43 +1065,7 @@ public abstract class TransportReplicationAction executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard) throws Throwable { - Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard); - Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); - final ShardId shardId = indexShard.shardId(); - if (update != null) { - final String indexName = shardId.getIndex(); - mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update); - operation = prepareIndexOperationOnPrimary(request, indexShard); - update = operation.parsedDoc().dynamicMappingsUpdate(); - if (update != null) { - throw new RetryOnPrimaryException(shardId, - "Dynamics mappings are not available on the node that holds the primary yet"); - } - } - final boolean created = indexShard.index(operation); - - // update the version on request so it will happen on the replicas - final long version = operation.version(); - request.version(version); - request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); - - assert request.versionType().validateVersionForWrites(request.version()); - - return new WriteResult(new IndexResponse(shardId.getIndex(), request.type(), request.id(), request.version(), created), operation.getTranslogLocation()); - } - - protected final void processAfter(boolean refresh, IndexShard indexShard, Translog.Location location) { + protected final void processAfterWrite(boolean refresh, IndexShard indexShard, Translog.Location location) { if (refresh) { try { indexShard.refresh("refresh_flag_index"); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index 7a9effac9b8..8dd71e3fba5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -215,7 +215,7 @@ public final class ShardRouting implements Streamable, ToXContent { public ShardRouting buildTargetRelocatingShard() { assert relocating(); return new ShardRouting(index, shardId, relocatingNodeId, currentNodeId, restoreSource, primary, ShardRoutingState.INITIALIZING, version, unassignedInfo, - AllocationId.newTargetRelocation(allocationId), true, expectedShardSize); + AllocationId.newTargetRelocation(allocationId), true, expectedShardSize); } /** @@ -538,29 +538,36 @@ public final class ShardRouting implements Streamable, ToXContent { return b; } + /** + * Returns true if this shard is a relocation target for another shard (i.e., was created with {@link #buildTargetRelocatingShard()} + */ + public boolean isRelocationTarget() { + return state == ShardRoutingState.INITIALIZING && relocatingNodeId != null; + } + /** returns true if the routing is the relocation target of the given routing */ public boolean isRelocationTargetOf(ShardRouting other) { boolean b = this.allocationId != null && other.allocationId != null && this.state == ShardRoutingState.INITIALIZING && - this.allocationId.getId().equals(other.allocationId.getRelocationId()); + this.allocationId.getId().equals(other.allocationId.getRelocationId()); assert b == false || other.state == ShardRoutingState.RELOCATING : - "ShardRouting is a relocation target but the source shard state isn't relocating. This [" + this + "], other [" + other + "]"; + "ShardRouting is a relocation target but the source shard state isn't relocating. This [" + this + "], other [" + other + "]"; assert b == false || other.allocationId.getId().equals(this.allocationId.getRelocationId()) : - "ShardRouting is a relocation target but the source id isn't equal to source's allocationId.getRelocationId. This [" + this + "], other [" + other + "]"; + "ShardRouting is a relocation target but the source id isn't equal to source's allocationId.getRelocationId. This [" + this + "], other [" + other + "]"; assert b == false || other.currentNodeId().equals(this.relocatingNodeId) : - "ShardRouting is a relocation target but source current node id isn't equal to target relocating node. This [" + this + "], other [" + other + "]"; + "ShardRouting is a relocation target but source current node id isn't equal to target relocating node. This [" + this + "], other [" + other + "]"; assert b == false || this.currentNodeId().equals(other.relocatingNodeId) : - "ShardRouting is a relocation target but current node id isn't equal to source relocating node. This [" + this + "], other [" + other + "]"; + "ShardRouting is a relocation target but current node id isn't equal to source relocating node. This [" + this + "], other [" + other + "]"; assert b == false || isSameShard(other) : - "ShardRouting is a relocation target but both routings are not of the same shard. This [" + this + "], other [" + other + "]"; + "ShardRouting is a relocation target but both routings are not of the same shard. This [" + this + "], other [" + other + "]"; assert b == false || this.primary == other.primary : - "ShardRouting is a relocation target but primary flag is different. This [" + this + "], target [" + other + "]"; + "ShardRouting is a relocation target but primary flag is different. This [" + this + "], target [" + other + "]"; return b; } @@ -568,26 +575,26 @@ public final class ShardRouting implements Streamable, ToXContent { /** returns true if the routing is the relocation source for the given routing */ public boolean isRelocationSourceOf(ShardRouting other) { boolean b = this.allocationId != null && other.allocationId != null && other.state == ShardRoutingState.INITIALIZING && - other.allocationId.getId().equals(this.allocationId.getRelocationId()); + other.allocationId.getId().equals(this.allocationId.getRelocationId()); assert b == false || this.state == ShardRoutingState.RELOCATING : - "ShardRouting is a relocation source but shard state isn't relocating. This [" + this + "], other [" + other + "]"; + "ShardRouting is a relocation source but shard state isn't relocating. This [" + this + "], other [" + other + "]"; assert b == false || this.allocationId.getId().equals(other.allocationId.getRelocationId()) : - "ShardRouting is a relocation source but the allocation id isn't equal to other.allocationId.getRelocationId. This [" + this + "], other [" + other + "]"; + "ShardRouting is a relocation source but the allocation id isn't equal to other.allocationId.getRelocationId. This [" + this + "], other [" + other + "]"; assert b == false || this.currentNodeId().equals(other.relocatingNodeId) : - "ShardRouting is a relocation source but current node isn't equal to other's relocating node. This [" + this + "], other [" + other + "]"; + "ShardRouting is a relocation source but current node isn't equal to other's relocating node. This [" + this + "], other [" + other + "]"; assert b == false || other.currentNodeId().equals(this.relocatingNodeId) : - "ShardRouting is a relocation source but relocating node isn't equal to other's current node. This [" + this + "], other [" + other + "]"; + "ShardRouting is a relocation source but relocating node isn't equal to other's current node. This [" + this + "], other [" + other + "]"; assert b == false || isSameShard(other) : - "ShardRouting is a relocation source but both routings are not of the same shard. This [" + this + "], target [" + other + "]"; + "ShardRouting is a relocation source but both routings are not of the same shard. This [" + this + "], target [" + other + "]"; assert b == false || this.primary == other.primary : - "ShardRouting is a relocation source but primary flag is different. This [" + this + "], target [" + other + "]"; + "ShardRouting is a relocation source but primary flag is different. This [" + this + "], target [" + other + "]"; return b; } @@ -701,14 +708,14 @@ public final class ShardRouting implements Streamable, ToXContent { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject() - .field("state", state()) - .field("primary", primary()) - .field("node", currentNodeId()) - .field("relocating_node", relocatingNodeId()) - .field("shard", shardId().id()) - .field("index", shardId().index().name()) - .field("version", version); - if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE){ + .field("state", state()) + .field("primary", primary()) + .field("node", currentNodeId()) + .field("relocating_node", relocatingNodeId()) + .field("shard", shardId().id()) + .field("index", shardId().index().name()) + .field("version", version); + if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE) { builder.field("expected_shard_size_in_bytes", expectedShardSize); } if (restoreSource() != null) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7804b50c390..d7e4294ee10 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -19,10 +19,7 @@ package org.elasticsearch.index.shard; -import org.apache.lucene.index.CheckIndex; -import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; -import org.apache.lucene.index.SnapshotDeletionPolicy; +import org.apache.lucene.index.*; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.UsageTrackingQueryCachingPolicy; import org.apache.lucene.store.AlreadyClosedException; @@ -194,8 +191,10 @@ public class IndexShard extends AbstractIndexShardComponent { private final IndexSearcherWrapper searcherWrapper; private final TimeValue inactiveTime; - /** True if this shard is still indexing (recently) and false if we've been idle for long enough (as periodically checked by {@link - * IndexingMemoryController}). */ + /** + * True if this shard is still indexing (recently) and false if we've been idle for long enough (as periodically checked by {@link + * IndexingMemoryController}). + */ private final AtomicBoolean active = new AtomicBoolean(); public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, @@ -220,10 +219,10 @@ public class IndexShard extends AbstractIndexShardComponent { this.indexCache = indexCache; this.indexingService = new ShardIndexingService(shardId, indexSettings); this.getService = new ShardGetService(indexSettings, this, mapperService); - this.termVectorsService = provider.getTermVectorsService(); + this.termVectorsService = provider.getTermVectorsService(); this.searchService = new ShardSearchStats(settings); this.shardWarmerService = new ShardIndexWarmerService(shardId, indexSettings); - this.indicesQueryCache = provider.getIndicesQueryCache(); + this.indicesQueryCache = provider.getIndicesQueryCache(); this.shardQueryCache = new ShardRequestCache(shardId, indexSettings); this.shardFieldData = new ShardFieldData(); this.indexFieldDataService = indexFieldDataService; @@ -238,7 +237,7 @@ public class IndexShard extends AbstractIndexShardComponent { this.checkIndexOnStartup = settings.get("index.shard.check_on_startup", "false"); this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, getFromSettings(logger, settings, Translog.Durabilty.REQUEST), - provider.getBigArrays(), threadPool); + provider.getBigArrays(), threadPool); final QueryCachingPolicy cachingPolicy; // the query cache is a node-level thing, however we want the most popular filters // to be computed on a per-shard basis @@ -394,7 +393,7 @@ public class IndexShard extends AbstractIndexShardComponent { * Marks the shard as recovering based on a recovery state, fails with exception is recovering is not allowed to be set. */ public IndexShardState markAsRecovering(String reason, RecoveryState recoveryState) throws IndexShardStartedException, - IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException { + IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException { synchronized (mutex) { if (state == IndexShardState.CLOSED) { throw new IndexShardClosedException(shardId); @@ -445,9 +444,21 @@ public class IndexShard extends AbstractIndexShardComponent { return previousState; } - public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) { + public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType) { try { - return prepareIndex(docMapper(source.type()), source, version, versionType, origin); + if (shardRouting.primary() == false) { + throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary"); + } + return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.PRIMARY); + } catch (Throwable t) { + verifyNotClosed(t); + throw t; + } + } + + public Engine.Index prepareIndexOnReplica(SourceToParse source, long version, VersionType versionType) { + try { + return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.REPLICA); } catch (Throwable t) { verifyNotClosed(t); throw t; @@ -486,12 +497,28 @@ public class IndexShard extends AbstractIndexShardComponent { return created; } - public Engine.Delete prepareDelete(String type, String id, long version, VersionType versionType, Engine.Operation.Origin origin) { - long startTime = System.nanoTime(); + public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) { + if (shardRouting.primary() == false) { + throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary"); + } final DocumentMapper documentMapper = docMapper(type).getDocumentMapper(); - return new Engine.Delete(type, id, documentMapper.uidMapper().term(Uid.createUid(type, id)), version, versionType, origin, startTime, false); + return prepareDelete(type, id, documentMapper.uidMapper().term(Uid.createUid(type, id)), version, versionType, Engine.Operation.Origin.PRIMARY); } + public Engine.Delete prepareDeleteOnReplica(String type, String id, long version, VersionType versionType) { + if (shardRouting.primary() && shardRouting.isRelocationTarget() == false) { + throw new IllegalIndexShardStateException(shardId, state, "shard is not a replica"); + } + final DocumentMapper documentMapper = docMapper(type).getDocumentMapper(); + return prepareDelete(type, id, documentMapper.uidMapper().term(Uid.createUid(type, id)), version, versionType, Engine.Operation.Origin.REPLICA); + } + + static Engine.Delete prepareDelete(String type, String id, Term uid, long version, VersionType versionType, Engine.Operation.Origin origin) { + long startTime = System.nanoTime(); + return new Engine.Delete(type, id, uid, version, versionType, origin, startTime, false); + } + + public void delete(Engine.Delete delete) { ensureWriteAllowed(delete); markLastWrite(); @@ -533,11 +560,8 @@ public class IndexShard extends AbstractIndexShardComponent { } public DocsStats docStats() { - final Engine.Searcher searcher = acquireSearcher("doc_stats"); - try { + try (Engine.Searcher searcher = acquireSearcher("doc_stats")) { return new DocsStats(searcher.reader().numDocs(), searcher.reader().numDeletedDocs()); - } finally { - searcher.close(); } } @@ -652,7 +676,7 @@ public class IndexShard extends AbstractIndexShardComponent { logger.trace("force merge with {}", forceMerge); } getEngine().forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(), - forceMerge.onlyExpungeDeletes(), false, false); + forceMerge.onlyExpungeDeletes(), false, false); } /** @@ -666,8 +690,8 @@ public class IndexShard extends AbstractIndexShardComponent { org.apache.lucene.util.Version previousVersion = minimumCompatibleVersion(); // we just want to upgrade the segments, not actually forge merge to a single segment getEngine().forceMerge(true, // we need to flush at the end to make sure the upgrade is durable - Integer.MAX_VALUE, // we just want to upgrade the segments, not actually optimize to a single segment - false, true, upgrade.upgradeOnlyAncientSegments()); + Integer.MAX_VALUE, // we just want to upgrade the segments, not actually optimize to a single segment + false, true, upgrade.upgradeOnlyAncientSegments()); org.apache.lucene.util.Version version = minimumCompatibleVersion(); if (logger.isTraceEnabled()) { logger.trace("upgraded segment {} from version {} to version {}", previousVersion, version); @@ -897,7 +921,7 @@ public class IndexShard extends AbstractIndexShardComponent { public boolean ignoreRecoveryAttempt() { IndexShardState state = state(); // one time volatile read return state == IndexShardState.POST_RECOVERY || state == IndexShardState.RECOVERING || state == IndexShardState.STARTED || - state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED; + state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED; } public void readAllowed() throws IllegalIndexShardStateException { @@ -977,8 +1001,10 @@ public class IndexShard extends AbstractIndexShardComponent { this.shardEventListener.delegates.add(onShardFailure); } - /** Change the indexing and translog buffer sizes. If {@code IndexWriter} is currently using more than - * the new buffering indexing size then we do a refresh to free up the heap. */ + /** + * Change the indexing and translog buffer sizes. If {@code IndexWriter} is currently using more than + * the new buffering indexing size then we do a refresh to free up the heap. + */ public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { final EngineConfig config = engineConfig; @@ -1000,7 +1026,7 @@ public class IndexShard extends AbstractIndexShardComponent { long iwBytesUsed = engine.indexWriterRAMBytesUsed(); String message = LoggerMessageFormat.format("updating index_buffer_size from [{}] to [{}]; IndexWriter now using [{}] bytes", - preValue, shardIndexingBufferSize, iwBytesUsed); + preValue, shardIndexingBufferSize, iwBytesUsed); if (iwBytesUsed > shardIndexingBufferSize.bytes()) { // our allowed buffer was changed to less than we are currently using; we ask IW to refresh @@ -1021,9 +1047,11 @@ public class IndexShard extends AbstractIndexShardComponent { engine.getTranslog().updateBuffer(shardTranslogBufferSize); } - /** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last - * indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. This returns true - * if the shard is inactive. */ + /** + * Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last + * indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. This returns true + * if the shard is inactive. + */ public boolean checkIdle() { return checkIdle(inactiveTime.nanos()); } @@ -1042,8 +1070,10 @@ public class IndexShard extends AbstractIndexShardComponent { return active.get() == false; } - /** Returns {@code true} if this shard is active (has seen indexing ops in the last {@link - * IndexShard#INDEX_SHARD_INACTIVE_TIME_SETTING} (default 5 minutes), else {@code false}. */ + /** + * Returns {@code true} if this shard is active (has seen indexing ops in the last {@link + * IndexShard#INDEX_SHARD_INACTIVE_TIME_SETTING} (default 5 minutes), else {@code false}. + */ public boolean getActive() { return active.get(); } @@ -1077,10 +1107,10 @@ public class IndexShard extends AbstractIndexShardComponent { return storeRecovery.recoverFromStore(this, shouldExist, localNode); } - public boolean restoreFromRepository(IndexShardRepository repository, DiscoveryNode locaNode) { + public boolean restoreFromRepository(IndexShardRepository repository, DiscoveryNode localNode) { assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - return storeRecovery.recoverFromRepository(this, repository, locaNode); + return storeRecovery.recoverFromRepository(this, repository, localNode); } /** @@ -1369,8 +1399,10 @@ public class IndexShard extends AbstractIndexShardComponent { return engine; } - /** NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is - * closed. */ + /** + * NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is + * closed. + */ protected Engine getEngineOrNull() { return this.currentEngineReference.get(); } @@ -1427,9 +1459,9 @@ public class IndexShard extends AbstractIndexShardComponent { writeReason = "routing changed from " + currentRouting + " to " + newRouting; } else { logger.trace("skip writing shard state, has been written before; previous version: [" + - currentRouting.version() + "] current version [" + newRouting.version() + "]"); + currentRouting.version() + "] current version [" + newRouting.version() + "]"); assert currentRouting.version() <= newRouting.version() : "version should not go backwards for shardID: " + shardId + - " previous version: [" + currentRouting.version() + "] current version [" + newRouting.version() + "]"; + " previous version: [" + currentRouting.version() + "] current version [" + newRouting.version() + "]"; return; } final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.version(), newRouting.primary(), getIndexUUID(), newRouting.allocationId()); @@ -1461,8 +1493,8 @@ public class IndexShard extends AbstractIndexShardComponent { }; final Engine.Warmer engineWarmer = (searcher, toLevel) -> warmer.warm(searcher, this, idxSettings, toLevel); return new EngineConfig(shardId, - threadPool, indexingService, indexSettings, engineWarmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig, - mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, inactiveTime); + threadPool, indexingService, indexSettings, engineWarmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig, + mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, inactiveTime); } private static class IndexShardOperationCounter extends AbstractRefCounted { @@ -1578,6 +1610,7 @@ public class IndexShard extends AbstractIndexShardComponent { /** * Simple struct encapsulating a shard failure + * * @see IndexShard#addShardFailureCallback(Callback) */ public static final class ShardFailure { @@ -1604,7 +1637,7 @@ public class IndexShard extends AbstractIndexShardComponent { }; private QueryShardContext newQueryShardContext() { - return new QueryShardContext(idxSettings, provider.getClient(), indexCache.bitsetFilterCache(), indexFieldDataService, mapperService, similarityService, provider.getScriptService(), provider.getIndicesQueriesRegistry()); + return new QueryShardContext(idxSettings, provider.getClient(), indexCache.bitsetFilterCache(), indexFieldDataService, mapperService, similarityService, provider.getScriptService(), provider.getIndicesQueriesRegistry()); } /** diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java index 146e80c7665..54e39cc227d 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java @@ -99,12 +99,18 @@ public class ShardRoutingTests extends ESTestCase { ShardRouting initializingShard0 = TestShardRouting.newShardRouting("test", 0, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1); ShardRouting initializingShard1 = TestShardRouting.newShardRouting("test", 1, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1); ShardRouting startedShard0 = new ShardRouting(initializingShard0); + assertFalse(startedShard0.isRelocationTarget()); startedShard0.moveToStarted(); + assertFalse(startedShard0.isRelocationTarget()); ShardRouting startedShard1 = new ShardRouting(initializingShard1); + assertFalse(startedShard1.isRelocationTarget()); startedShard1.moveToStarted(); + assertFalse(startedShard1.isRelocationTarget()); ShardRouting sourceShard0a = new ShardRouting(startedShard0); sourceShard0a.relocate("node2", -1); + assertFalse(sourceShard0a.isRelocationTarget()); ShardRouting targetShard0a = sourceShard0a.buildTargetRelocatingShard(); + assertTrue(targetShard0a.isRelocationTarget()); ShardRouting sourceShard0b = new ShardRouting(startedShard0); sourceShard0b.relocate("node2", -1); ShardRouting sourceShard1 = new ShardRouting(startedShard1);