Moved the updateMappingOnMaster logic into a single place.

Closes #5798
This commit is contained in:
Martijn van Groningen 2014-04-17 17:18:43 +07:00
parent d6a676724a
commit a808fe9d46
4 changed files with 73 additions and 127 deletions

View File

@ -24,7 +24,6 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ElasticsearchWrapperException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.delete.DeleteRequest;
@ -41,9 +40,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
@ -56,8 +53,6 @@ import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
@ -185,7 +180,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
}
for (Tuple<String, String> mappingToUpdate : mappingsToUpdate) {
updateMappingOnMaster(mappingToUpdate.v1(), mappingToUpdate.v2());
mappingUpdatedAction.updateMappingOnMaster(mappingToUpdate.v1(), mappingToUpdate.v2(), true);
}
throw (ElasticsearchException) e;
}
@ -345,7 +340,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
for (Tuple<String, String> mappingToUpdate : mappingsToUpdate) {
updateMappingOnMaster(mappingToUpdate.v1(), mappingToUpdate.v2());
mappingUpdatedAction.updateMappingOnMaster(mappingToUpdate.v1(), mappingToUpdate.v2(), true);
}
if (request.refresh()) {
@ -600,41 +595,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
}
private void updateMappingOnMaster(final String index, final String type) {
try {
MapperService mapperService = indicesService.indexServiceSafe(index).mapperService();
final DocumentMapper documentMapper = mapperService.documentMapper(type);
if (documentMapper == null) { // should not happen
return;
}
IndexMetaData metaData = clusterService.state().metaData().index(index);
if (metaData == null) {
return;
}
// we generate the order id before we get the mapping to send and refresh the source, so
// if 2 happen concurrently, we know that the later order will include the previous one
long orderId = mappingUpdatedAction.generateNextMappingUpdateOrder();
documentMapper.refreshSource();
DiscoveryNode node = clusterService.localNode();
final MappingUpdatedAction.MappingUpdatedRequest request = new MappingUpdatedAction.MappingUpdatedRequest(index, metaData.uuid(), type, documentMapper.mappingSource(), orderId, node != null ? node.id() : null);
mappingUpdatedAction.execute(request, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
@Override
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
// all is well
}
@Override
public void onFailure(Throwable e) {
logger.warn("failed to update master on updated mapping for {}", e, request);
}
});
} catch (Throwable e) {
logger.warn("failed to update master on updated mapping for index [{}], type [{}]", e, index, type);
}
}
private void applyVersion(BulkItemRequest item, long version, VersionType versionType) {
if (item.request() instanceof IndexRequest) {
((IndexRequest) item.request()).version(version).versionType(versionType);

View File

@ -36,13 +36,10 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndexAlreadyExistsException;
@ -50,9 +47,6 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Performs the index operation.
* <p/>
@ -73,8 +67,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
private final MappingUpdatedAction mappingUpdatedAction;
private final boolean waitForMappingChange;
@Inject
public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
@ -84,7 +76,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
this.mappingUpdatedAction = mappingUpdatedAction;
this.autoCreateIndex = new AutoCreateIndex(settings);
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
this.waitForMappingChange = settings.getAsBoolean("action.wait_on_mapping_change", false);
}
@Override
@ -205,7 +196,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
.versionType(request.versionType())
.origin(Engine.Operation.Origin.PRIMARY);
if (index.parsedDoc().mappingsModified()) {
updateMappingOnMaster(request, indexMetaData);
mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false);
}
indexShard.index(index);
version = index.version();
@ -217,7 +208,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
.versionType(request.versionType())
.origin(Engine.Operation.Origin.PRIMARY);
if (create.parsedDoc().mappingsModified()) {
updateMappingOnMaster(request, indexMetaData);
mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false);
}
indexShard.create(create);
version = create.version();
@ -267,47 +258,4 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
}
}
private void updateMappingOnMaster(final IndexRequest request, IndexMetaData indexMetaData) {
final CountDownLatch latch = new CountDownLatch(1);
try {
final MapperService mapperService = indicesService.indexServiceSafe(request.index()).mapperService();
final DocumentMapper documentMapper = mapperService.documentMapper(request.type());
if (documentMapper == null) { // should not happen
return;
}
// we generate the order id before we get the mapping to send and refresh the source, so
// if 2 happen concurrently, we know that the later order will include the previous one
long orderId = mappingUpdatedAction.generateNextMappingUpdateOrder();
documentMapper.refreshSource();
DiscoveryNode node = clusterService.localNode();
final MappingUpdatedAction.MappingUpdatedRequest mappingRequest =
new MappingUpdatedAction.MappingUpdatedRequest(request.index(), indexMetaData.uuid(), request.type(), documentMapper.mappingSource(), orderId, node != null ? node.id() : null);
logger.trace("Sending mapping updated to master: {}", mappingRequest);
mappingUpdatedAction.execute(mappingRequest, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
@Override
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
// all is well
latch.countDown();
}
@Override
public void onFailure(Throwable e) {
latch.countDown();
logger.warn("Failed to update master on updated mapping for {}", e, mappingRequest);
}
});
} catch (Exception e) {
latch.countDown();
logger.warn("Failed to update master on updated mapping for index [" + request.index() + "], type [" + request.type() + "]", e);
}
if (waitForMappingChange) {
try {
latch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore
}
}
}
}

View File

@ -31,15 +31,21 @@ import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -50,16 +56,73 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
private final AtomicLong mappingUpdateOrderGen = new AtomicLong();
private final MetaDataMappingService metaDataMappingService;
private final IndicesService indicesService;
private final boolean waitForMappingChange;
@Inject
public MappingUpdatedAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
MetaDataMappingService metaDataMappingService) {
MetaDataMappingService metaDataMappingService, IndicesService indicesService) {
super(settings, transportService, clusterService, threadPool);
this.metaDataMappingService = metaDataMappingService;
this.indicesService = indicesService;
this.waitForMappingChange = settings.getAsBoolean("action.wait_on_mapping_change", false);
}
public long generateNextMappingUpdateOrder() {
return mappingUpdateOrderGen.incrementAndGet();
public void updateMappingOnMaster(String index, String type, boolean neverWaitForMappingChange) {
IndexMetaData metaData = clusterService.state().metaData().index(index);
if (metaData != null) {
updateMappingOnMaster(index, type, metaData.getUUID(), neverWaitForMappingChange);
}
}
public void updateMappingOnMaster(String index, String type, String indexUUID, boolean neverWaitForMappingChange) {
final MapperService mapperService = indicesService.indexServiceSafe(index).mapperService();
final DocumentMapper documentMapper = mapperService.documentMapper(type);
if (documentMapper != null) { // should not happen
updateMappingOnMaster(documentMapper, index, type, indexUUID, neverWaitForMappingChange);
}
}
public void updateMappingOnMaster(DocumentMapper documentMapper, String index, String type, String indexUUID, boolean neverWaitForMappingChange) {
final CountDownLatch latch = new CountDownLatch(1);
final MappingUpdatedAction.MappingUpdatedRequest mappingRequest;
try {
// we generate the order id before we get the mapping to send and refresh the source, so
// if 2 happen concurrently, we know that the later order will include the previous one
long orderId = mappingUpdateOrderGen.incrementAndGet();
documentMapper.refreshSource();
DiscoveryNode node = clusterService.localNode();
mappingRequest = new MappingUpdatedAction.MappingUpdatedRequest(
index, indexUUID, type, documentMapper.mappingSource(), orderId, node != null ? node.id() : null
);
} catch (Throwable t) {
logger.warn("Failed to update master on updated mapping for index [" + index + "], type [" + type + "]", t);
latch.countDown();
throw t;
}
logger.trace("Sending mapping updated to master: {}", mappingRequest);
execute(mappingRequest, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
@Override
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
// all is well
latch.countDown();
logger.debug("Successfully updated master with mapping update: {}", mappingRequest);
}
@Override
public void onFailure(Throwable e) {
latch.countDown();
logger.warn("Failed to update master on updated mapping for {}", e, mappingRequest);
}
});
if (waitForMappingChange && !neverWaitForMappingChange) {
try {
latch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
@Override

View File

@ -31,7 +31,6 @@ import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.percolate.PercolateShardRequest;
import org.elasticsearch.action.percolate.PercolateShardResponse;
@ -39,7 +38,6 @@ import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
@ -275,7 +273,9 @@ public class PercolatorService extends AbstractComponent {
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(request.documentType());
doc = docMapper.parse(source(parser).type(request.documentType()).flyweight(true));
if (doc.mappingsModified()) {
updateMappingOnMaster(docMapper, request, documentIndexService.indexUUID());
mappingUpdatedAction.updateMappingOnMaster(
docMapper, request.index(), request.documentType(), documentIndexService.indexUUID(), true
);
}
// the document parsing exists the "doc" object, so we need to set the new current field.
currentFieldName = parser.currentName();
@ -835,31 +835,6 @@ public class PercolatorService extends AbstractComponent {
}
}
// TODO: maybe move this logic into MappingUpdatedAction? There is similar logic for the index and bulk api now.
private void updateMappingOnMaster(DocumentMapper documentMapper, final PercolateShardRequest request, String indexUUID) {
// we generate the order id before we get the mapping to send and refresh the source, so
// if 2 happen concurrently, we know that the later order will include the previous one
long orderId = mappingUpdatedAction.generateNextMappingUpdateOrder();
documentMapper.refreshSource();
DiscoveryNode node = clusterService.localNode();
final MappingUpdatedAction.MappingUpdatedRequest mappingRequest = new MappingUpdatedAction.MappingUpdatedRequest(
request.index(), indexUUID, request.documentType(), documentMapper.mappingSource(), orderId, node != null ? node.id() : null
);
logger.trace("Sending mapping updated to master: {}", mappingRequest);
mappingUpdatedAction.execute(mappingRequest, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
@Override
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
// all is well
logger.debug("Successfully updated master with mapping update: {}", mappingRequest);
}
@Override
public void onFailure(Throwable e) {
logger.warn("Failed to update master on updated mapping for {}", e, mappingRequest);
}
});
}
private InternalFacets reduceFacets(List<PercolateShardResponse> shardResults) {
if (shardResults.get(0).facets() == null) {
return null;