From a808fe9d465d33de22303eb4bf713a4cfeeeb749 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 17 Apr 2014 17:18:43 +0700 Subject: [PATCH] Moved the updateMappingOnMaster logic into a single place. Closes #5798 --- .../action/bulk/TransportShardBulkAction.java | 44 +----------- .../action/index/TransportIndexAction.java | 56 +-------------- .../action/index/MappingUpdatedAction.java | 69 ++++++++++++++++++- .../percolator/PercolatorService.java | 31 +-------- 4 files changed, 73 insertions(+), 127 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index bff9e24d0e9..6024cac5efd 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchWrapperException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.delete.DeleteRequest; @@ -41,9 +40,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; @@ -56,8 +53,6 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; -import org.elasticsearch.index.mapper.DocumentMapper; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; @@ -185,7 +180,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]); } for (Tuple mappingToUpdate : mappingsToUpdate) { - updateMappingOnMaster(mappingToUpdate.v1(), mappingToUpdate.v2()); + mappingUpdatedAction.updateMappingOnMaster(mappingToUpdate.v1(), mappingToUpdate.v2(), true); } throw (ElasticsearchException) e; } @@ -345,7 +340,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } for (Tuple mappingToUpdate : mappingsToUpdate) { - updateMappingOnMaster(mappingToUpdate.v1(), mappingToUpdate.v2()); + mappingUpdatedAction.updateMappingOnMaster(mappingToUpdate.v1(), mappingToUpdate.v2(), true); } if (request.refresh()) { @@ -600,41 +595,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } } - private void updateMappingOnMaster(final String index, final String type) { - try { - MapperService mapperService = indicesService.indexServiceSafe(index).mapperService(); - final DocumentMapper documentMapper = mapperService.documentMapper(type); - if (documentMapper == null) { // should not happen - return; - } - IndexMetaData metaData = clusterService.state().metaData().index(index); - if (metaData == null) { - return; - } - - // we generate the order id before we get the mapping to send and refresh the source, so - // if 2 happen concurrently, we know that the later order will include the previous one - long orderId = mappingUpdatedAction.generateNextMappingUpdateOrder(); - documentMapper.refreshSource(); - - DiscoveryNode node = clusterService.localNode(); - final MappingUpdatedAction.MappingUpdatedRequest request = new MappingUpdatedAction.MappingUpdatedRequest(index, metaData.uuid(), type, documentMapper.mappingSource(), orderId, node != null ? node.id() : null); - mappingUpdatedAction.execute(request, new ActionListener() { - @Override - public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) { - // all is well - } - - @Override - public void onFailure(Throwable e) { - logger.warn("failed to update master on updated mapping for {}", e, request); - } - }); - } catch (Throwable e) { - logger.warn("failed to update master on updated mapping for index [{}], type [{}]", e, index, type); - } - } - private void applyVersion(BulkItemRequest item, long version, VersionType versionType) { if (item.request() instanceof IndexRequest) { ((IndexRequest) item.request()).version(version).versionType(versionType); diff --git a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index b612844fbd2..7436a43b551 100644 --- a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -36,13 +36,10 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.mapper.DocumentMapper; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndexAlreadyExistsException; @@ -50,9 +47,6 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - /** * Performs the index operation. *

@@ -73,8 +67,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi private final MappingUpdatedAction mappingUpdatedAction; - private final boolean waitForMappingChange; - @Inject public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, @@ -84,7 +76,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi this.mappingUpdatedAction = mappingUpdatedAction; this.autoCreateIndex = new AutoCreateIndex(settings); this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true); - this.waitForMappingChange = settings.getAsBoolean("action.wait_on_mapping_change", false); } @Override @@ -205,7 +196,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi .versionType(request.versionType()) .origin(Engine.Operation.Origin.PRIMARY); if (index.parsedDoc().mappingsModified()) { - updateMappingOnMaster(request, indexMetaData); + mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false); } indexShard.index(index); version = index.version(); @@ -217,7 +208,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi .versionType(request.versionType()) .origin(Engine.Operation.Origin.PRIMARY); if (create.parsedDoc().mappingsModified()) { - updateMappingOnMaster(request, indexMetaData); + mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false); } indexShard.create(create); version = create.version(); @@ -267,47 +258,4 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi } } } - - private void updateMappingOnMaster(final IndexRequest request, IndexMetaData indexMetaData) { - final CountDownLatch latch = new CountDownLatch(1); - try { - final MapperService mapperService = indicesService.indexServiceSafe(request.index()).mapperService(); - final DocumentMapper documentMapper = mapperService.documentMapper(request.type()); - if (documentMapper == null) { // should not happen - return; - } - // we generate the order id before we get the mapping to send and refresh the source, so - // if 2 happen concurrently, we know that the later order will include the previous one - long orderId = mappingUpdatedAction.generateNextMappingUpdateOrder(); - documentMapper.refreshSource(); - DiscoveryNode node = clusterService.localNode(); - final MappingUpdatedAction.MappingUpdatedRequest mappingRequest = - new MappingUpdatedAction.MappingUpdatedRequest(request.index(), indexMetaData.uuid(), request.type(), documentMapper.mappingSource(), orderId, node != null ? node.id() : null); - logger.trace("Sending mapping updated to master: {}", mappingRequest); - mappingUpdatedAction.execute(mappingRequest, new ActionListener() { - @Override - public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) { - // all is well - latch.countDown(); - } - - @Override - public void onFailure(Throwable e) { - latch.countDown(); - logger.warn("Failed to update master on updated mapping for {}", e, mappingRequest); - } - }); - } catch (Exception e) { - latch.countDown(); - logger.warn("Failed to update master on updated mapping for index [" + request.index() + "], type [" + request.type() + "]", e); - } - - if (waitForMappingChange) { - try { - latch.await(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // ignore - } - } - } } diff --git a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index a7df8e995a7..e702a3a04c1 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -31,15 +31,21 @@ import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaDataMappingService; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** @@ -50,16 +56,73 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction() { + @Override + public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) { + // all is well + latch.countDown(); + logger.debug("Successfully updated master with mapping update: {}", mappingRequest); + } + + @Override + public void onFailure(Throwable e) { + latch.countDown(); + logger.warn("Failed to update master on updated mapping for {}", e, mappingRequest); + } + }); + if (waitForMappingChange && !neverWaitForMappingChange) { + try { + latch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } @Override diff --git a/src/main/java/org/elasticsearch/percolator/PercolatorService.java b/src/main/java/org/elasticsearch/percolator/PercolatorService.java index 40b4c8ec473..d832379b05c 100644 --- a/src/main/java/org/elasticsearch/percolator/PercolatorService.java +++ b/src/main/java/org/elasticsearch/percolator/PercolatorService.java @@ -31,7 +31,6 @@ import org.apache.lucene.util.CloseableThreadLocal; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.percolate.PercolateResponse; import org.elasticsearch.action.percolate.PercolateShardRequest; import org.elasticsearch.action.percolate.PercolateShardResponse; @@ -39,7 +38,6 @@ import org.elasticsearch.cache.recycler.CacheRecycler; import org.elasticsearch.cache.recycler.PageCacheRecycler; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; @@ -275,7 +273,9 @@ public class PercolatorService extends AbstractComponent { DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(request.documentType()); doc = docMapper.parse(source(parser).type(request.documentType()).flyweight(true)); if (doc.mappingsModified()) { - updateMappingOnMaster(docMapper, request, documentIndexService.indexUUID()); + mappingUpdatedAction.updateMappingOnMaster( + docMapper, request.index(), request.documentType(), documentIndexService.indexUUID(), true + ); } // the document parsing exists the "doc" object, so we need to set the new current field. currentFieldName = parser.currentName(); @@ -835,31 +835,6 @@ public class PercolatorService extends AbstractComponent { } } - // TODO: maybe move this logic into MappingUpdatedAction? There is similar logic for the index and bulk api now. - private void updateMappingOnMaster(DocumentMapper documentMapper, final PercolateShardRequest request, String indexUUID) { - // we generate the order id before we get the mapping to send and refresh the source, so - // if 2 happen concurrently, we know that the later order will include the previous one - long orderId = mappingUpdatedAction.generateNextMappingUpdateOrder(); - documentMapper.refreshSource(); - DiscoveryNode node = clusterService.localNode(); - final MappingUpdatedAction.MappingUpdatedRequest mappingRequest = new MappingUpdatedAction.MappingUpdatedRequest( - request.index(), indexUUID, request.documentType(), documentMapper.mappingSource(), orderId, node != null ? node.id() : null - ); - logger.trace("Sending mapping updated to master: {}", mappingRequest); - mappingUpdatedAction.execute(mappingRequest, new ActionListener() { - @Override - public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) { - // all is well - logger.debug("Successfully updated master with mapping update: {}", mappingRequest); - } - - @Override - public void onFailure(Throwable e) { - logger.warn("Failed to update master on updated mapping for {}", e, mappingRequest); - } - }); - } - private InternalFacets reduceFacets(List shardResults) { if (shardResults.get(0).facets() == null) { return null;