Add indexUUID to mapping-updated and mapping-refresh events and make sure they are applied to an index with same UUID.
This can go wrong if indices with the same name are repeatably created and deleted. UUIDs can not be null anymore. If UUID is not available `_na_` will be used as a value. Also - some minor clean up in ShardStateAction where shard started events could be added twice to the to-be-applied list where the second instance will be ignored. Closes #3783
This commit is contained in:
parent
8f087e802d
commit
1d7e20b712
|
@ -40,6 +40,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
|
|||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -62,7 +63,6 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -569,7 +569,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
}
|
||||
documentMapper.refreshSource();
|
||||
|
||||
mappingUpdatedAction.execute(new MappingUpdatedAction.MappingUpdatedRequest(index, type, documentMapper.mappingSource()), new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
|
||||
IndexMetaData metaData = clusterService.state().metaData().index(index);
|
||||
|
||||
final MappingUpdatedAction.MappingUpdatedRequest request = new MappingUpdatedAction.MappingUpdatedRequest(index, metaData.uuid(), type, documentMapper.mappingSource());
|
||||
mappingUpdatedAction.execute(request, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
|
||||
@Override
|
||||
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
|
||||
// all is well
|
||||
|
@ -577,11 +580,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
try {
|
||||
logger.warn("failed to update master on updated mapping for index [{}], type [{}] and source [{}]", e, index, type, documentMapper.mappingSource().string());
|
||||
} catch (IOException e1) {
|
||||
// ignore
|
||||
}
|
||||
logger.warn("failed to update master on updated mapping for {}", e, request);
|
||||
}
|
||||
});
|
||||
} catch (Throwable e) {
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
|
|||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
|
@ -48,7 +49,6 @@ import org.elasticsearch.indices.IndicesService;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -184,7 +184,8 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
|||
final IndexRequest request = shardRequest.request;
|
||||
|
||||
// validate, if routing is required, that we got routing
|
||||
MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mappingOrDefault(request.type());
|
||||
IndexMetaData indexMetaData = clusterState.metaData().index(request.index());
|
||||
MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
|
||||
if (mappingMd != null && mappingMd.routing().required()) {
|
||||
if (request.routing() == null) {
|
||||
throw new RoutingMissingException(request.index(), request.type(), request.id());
|
||||
|
@ -203,7 +204,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
|||
.versionType(request.versionType())
|
||||
.origin(Engine.Operation.Origin.PRIMARY);
|
||||
if (index.parsedDoc().mappingsModified()) {
|
||||
updateMappingOnMaster(request);
|
||||
updateMappingOnMaster(request, indexMetaData);
|
||||
}
|
||||
indexShard.index(index);
|
||||
version = index.version();
|
||||
|
@ -215,7 +216,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
|||
.versionType(request.versionType())
|
||||
.origin(Engine.Operation.Origin.PRIMARY);
|
||||
if (create.parsedDoc().mappingsModified()) {
|
||||
updateMappingOnMaster(request);
|
||||
updateMappingOnMaster(request, indexMetaData);
|
||||
}
|
||||
indexShard.create(create);
|
||||
version = create.version();
|
||||
|
@ -263,17 +264,19 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
|||
}
|
||||
}
|
||||
|
||||
private void updateMappingOnMaster(final IndexRequest request) {
|
||||
private void updateMappingOnMaster(final IndexRequest request, IndexMetaData indexMetaData) {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
try {
|
||||
MapperService mapperService = indicesService.indexServiceSafe(request.index()).mapperService();
|
||||
final MapperService mapperService = indicesService.indexServiceSafe(request.index()).mapperService();
|
||||
final DocumentMapper documentMapper = mapperService.documentMapper(request.type());
|
||||
if (documentMapper == null) { // should not happen
|
||||
return;
|
||||
}
|
||||
documentMapper.refreshSource();
|
||||
logger.trace("Sending mapping updated to master: index [{}] type [{}]", request.index(), request.type());
|
||||
mappingUpdatedAction.execute(new MappingUpdatedAction.MappingUpdatedRequest(request.index(), request.type(), documentMapper.mappingSource()), new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
|
||||
final MappingUpdatedAction.MappingUpdatedRequest mappingRequest =
|
||||
new MappingUpdatedAction.MappingUpdatedRequest(request.index(), indexMetaData.uuid(), request.type(), documentMapper.mappingSource());
|
||||
logger.trace("Sending mapping updated to master: {}", mappingRequest);
|
||||
mappingUpdatedAction.execute(mappingRequest, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
|
||||
@Override
|
||||
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
|
||||
// all is well
|
||||
|
@ -283,11 +286,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
|||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
latch.countDown();
|
||||
try {
|
||||
logger.warn("Failed to update master on updated mapping for index [" + request.index() + "], type [" + request.type() + "] and source [" + documentMapper.mappingSource().string() + "]", e);
|
||||
} catch (IOException e1) {
|
||||
// ignore
|
||||
}
|
||||
logger.warn("Failed to update master on updated mapping for {}", e, mappingRequest);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.cluster.action.index;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
|
@ -27,6 +28,7 @@ import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
|
|||
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
|
||||
import org.elasticsearch.common.compress.CompressedString;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -76,7 +78,7 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
|
|||
|
||||
@Override
|
||||
protected void masterOperation(final MappingUpdatedRequest request, final ClusterState state, final ActionListener<MappingUpdatedResponse> listener) throws ElasticSearchException {
|
||||
metaDataMappingService.updateMapping(request.index(), request.type(), request.mappingSource(), new MetaDataMappingService.Listener() {
|
||||
metaDataMappingService.updateMapping(request.index(), request.indexUUID(), request.type(), request.mappingSource(), new MetaDataMappingService.Listener() {
|
||||
@Override
|
||||
public void onResponse(MetaDataMappingService.Response response) {
|
||||
listener.onResponse(new MappingUpdatedResponse());
|
||||
|
@ -84,7 +86,7 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
|
|||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
logger.warn("failed to dynamically update the mapping in cluster_state from shard", t);
|
||||
logger.warn("[{}] update-mapping [{}] failed to dynamically update the mapping in cluster_state from shard", t, request.index(), request.type());
|
||||
listener.onFailure(t);
|
||||
}
|
||||
});
|
||||
|
@ -105,16 +107,16 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
|
|||
public static class MappingUpdatedRequest extends MasterNodeOperationRequest<MappingUpdatedRequest> {
|
||||
|
||||
private String index;
|
||||
|
||||
private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
|
||||
private String type;
|
||||
|
||||
private CompressedString mappingSource;
|
||||
|
||||
MappingUpdatedRequest() {
|
||||
}
|
||||
|
||||
public MappingUpdatedRequest(String index, String type, CompressedString mappingSource) {
|
||||
public MappingUpdatedRequest(String index, String indexUUID, String type, CompressedString mappingSource) {
|
||||
this.index = index;
|
||||
this.indexUUID = indexUUID;
|
||||
this.type = type;
|
||||
this.mappingSource = mappingSource;
|
||||
}
|
||||
|
@ -123,6 +125,10 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
|
|||
return index;
|
||||
}
|
||||
|
||||
public String indexUUID() {
|
||||
return indexUUID;
|
||||
}
|
||||
|
||||
public String type() {
|
||||
return type;
|
||||
}
|
||||
|
@ -142,6 +148,9 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
|
|||
index = in.readString();
|
||||
type = in.readString();
|
||||
mappingSource = CompressedString.readCompressedString(in);
|
||||
if (in.getVersion().onOrAfter(Version.V_0_90_6)) {
|
||||
indexUUID = in.readString();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -150,6 +159,14 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
|
|||
out.writeString(index);
|
||||
out.writeString(type);
|
||||
mappingSource.writeTo(out);
|
||||
if (out.getVersion().onOrAfter(Version.V_0_90_6)) {
|
||||
out.writeString(indexUUID);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "index [" + index + "], indexUUID [" + indexUUID + "], type [" + type + "] and source [" + mappingSource + "]";
|
||||
}
|
||||
}
|
||||
}
|
|
@ -20,7 +20,9 @@
|
|||
package org.elasticsearch.cluster.action.index;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
|
@ -60,7 +62,7 @@ public class NodeMappingRefreshAction extends AbstractComponent {
|
|||
}
|
||||
|
||||
private void innerMappingRefresh(NodeMappingRefreshRequest request) {
|
||||
metaDataMappingService.refreshMapping(request.index(), request.types());
|
||||
metaDataMappingService.refreshMapping(request.index(), request.indexUUID(), request.types());
|
||||
}
|
||||
|
||||
private class NodeMappingRefreshTransportHandler extends BaseTransportRequestHandler<NodeMappingRefreshRequest> {
|
||||
|
@ -87,14 +89,16 @@ public class NodeMappingRefreshAction extends AbstractComponent {
|
|||
public static class NodeMappingRefreshRequest extends TransportRequest {
|
||||
|
||||
private String index;
|
||||
private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
|
||||
private String[] types;
|
||||
private String nodeId;
|
||||
|
||||
NodeMappingRefreshRequest() {
|
||||
}
|
||||
|
||||
public NodeMappingRefreshRequest(String index, String[] types, String nodeId) {
|
||||
public NodeMappingRefreshRequest(String index, String indexUUID, String[] types, String nodeId) {
|
||||
this.index = index;
|
||||
this.indexUUID = indexUUID;
|
||||
this.types = types;
|
||||
this.nodeId = nodeId;
|
||||
}
|
||||
|
@ -103,6 +107,11 @@ public class NodeMappingRefreshAction extends AbstractComponent {
|
|||
return index;
|
||||
}
|
||||
|
||||
public String indexUUID() {
|
||||
return indexUUID;
|
||||
}
|
||||
|
||||
|
||||
public String[] types() {
|
||||
return types;
|
||||
}
|
||||
|
@ -117,6 +126,9 @@ public class NodeMappingRefreshAction extends AbstractComponent {
|
|||
out.writeString(index);
|
||||
out.writeStringArray(types);
|
||||
out.writeString(nodeId);
|
||||
if (out.getVersion().onOrAfter(Version.V_0_90_6)) {
|
||||
out.writeString(indexUUID);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -125,6 +137,9 @@ public class NodeMappingRefreshAction extends AbstractComponent {
|
|||
index = in.readString();
|
||||
types = in.readStringArray();
|
||||
nodeId = in.readString();
|
||||
if (in.getVersion().onOrAfter(Version.V_0_90_6)) {
|
||||
indexUUID = in.readString();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ public class ShardStateAction extends AbstractComponent {
|
|||
transportService.registerHandler(ShardFailedTransportHandler.ACTION, new ShardFailedTransportHandler());
|
||||
}
|
||||
|
||||
public void shardFailed(final ShardRouting shardRouting, String indexUUID, final String reason) throws ElasticSearchException {
|
||||
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason) throws ElasticSearchException {
|
||||
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason);
|
||||
logger.warn("{} sending failed shard for {}", shardRouting.shardId(), shardRoutingEntry);
|
||||
DiscoveryNodes nodes = clusterService.state().nodes();
|
||||
|
@ -215,21 +215,25 @@ public class ShardStateAction extends AbstractComponent {
|
|||
// with the shard still initializing, and it will try and start it again (until the verification comes)
|
||||
|
||||
IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardRouting.id());
|
||||
|
||||
boolean applyShardEvent = true;
|
||||
|
||||
for (ShardRouting entry : indexShardRoutingTable) {
|
||||
if (shardRouting.currentNodeId().equals(entry.currentNodeId())) {
|
||||
// we found the same shard that exists on the same node id
|
||||
if (entry.initializing()) {
|
||||
// shard not started, add it to the shards to be processed.
|
||||
shardRoutingToBeApplied.add(shardRouting);
|
||||
logger.debug("{} will apply shard started {}", shardRouting.shardId(), shardRoutingEntry);
|
||||
} else {
|
||||
if (!entry.initializing()) {
|
||||
// shard is in initialized state, skipping event (probable already started)
|
||||
logger.debug("{} ignoring shard started event for {}, current state: {}", shardRouting.shardId(), shardRoutingEntry, entry.state());
|
||||
applyShardEvent = false;
|
||||
}
|
||||
} else {
|
||||
}
|
||||
}
|
||||
|
||||
if (applyShardEvent) {
|
||||
shardRoutingToBeApplied.add(shardRouting);
|
||||
logger.debug("{} will apply shard started {}", shardRouting.shardId(), shardRoutingEntry);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Throwable t) {
|
||||
logger.error("{} unexpected failure while processing shard started [{}]", t, shardRouting.shardId(), shardRouting);
|
||||
}
|
||||
|
@ -299,7 +303,7 @@ public class ShardStateAction extends AbstractComponent {
|
|||
|
||||
private ShardRouting shardRouting;
|
||||
|
||||
private String indexUUID;
|
||||
private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
|
||||
|
||||
private String reason;
|
||||
|
||||
|
@ -318,7 +322,7 @@ public class ShardStateAction extends AbstractComponent {
|
|||
shardRouting = readShardRoutingEntry(in);
|
||||
reason = in.readString();
|
||||
if (in.getVersion().onOrAfter(Version.V_0_90_6)) {
|
||||
indexUUID = in.readOptionalString();
|
||||
indexUUID = in.readString();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -328,18 +332,13 @@ public class ShardStateAction extends AbstractComponent {
|
|||
shardRouting.writeTo(out);
|
||||
out.writeString(reason);
|
||||
if (out.getVersion().onOrAfter(Version.V_0_90_6)) {
|
||||
out.writeOptionalString(indexUUID);
|
||||
out.writeString(indexUUID);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder(shardRouting.toString());
|
||||
if (indexUUID != null) {
|
||||
sb.append(", indexUUID [").append(indexUUID).append("]");
|
||||
}
|
||||
sb.append(", reason [").append(reason).append("]");
|
||||
return sb.toString();
|
||||
return "" + shardRouting + ", indexUUID [" + indexUUID + "], reason [" + reason + "]";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -157,6 +157,7 @@ public class IndexMetaData {
|
|||
public static final String SETTING_BLOCKS_METADATA = "index.blocks.metadata";
|
||||
public static final String SETTING_VERSION_CREATED = "index.version.created";
|
||||
public static final String SETTING_UUID = "index.uuid";
|
||||
public static final String INDEX_UUID_NA_VALUE = "_na_";
|
||||
|
||||
private final String index;
|
||||
private final long version;
|
||||
|
@ -214,25 +215,30 @@ public class IndexMetaData {
|
|||
return index;
|
||||
}
|
||||
|
||||
public String getIndex() {
|
||||
return index();
|
||||
}
|
||||
|
||||
public String uuid() {
|
||||
return settings.get(SETTING_UUID, INDEX_UUID_NA_VALUE);
|
||||
}
|
||||
|
||||
public String getUUID() {
|
||||
return settings.get(SETTING_UUID);
|
||||
return uuid();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test whether the current index UUID is the same as the given one. Incoming nulls always return true.
|
||||
* Test whether the current index UUID is the same as the given one. Returns true if either are _na_
|
||||
*/
|
||||
public boolean isSameUUID(@Nullable String otherUUID) {
|
||||
if (otherUUID == null || getUUID() == null) {
|
||||
public boolean isSameUUID(String otherUUID) {
|
||||
assert otherUUID != null;
|
||||
assert uuid() != null;
|
||||
if (INDEX_UUID_NA_VALUE.equals(otherUUID) || INDEX_UUID_NA_VALUE.equals(uuid())) {
|
||||
return true;
|
||||
}
|
||||
return otherUUID.equals(getUUID());
|
||||
}
|
||||
|
||||
|
||||
public String getIndex() {
|
||||
return index();
|
||||
}
|
||||
|
||||
public long version() {
|
||||
return this.version;
|
||||
}
|
||||
|
|
|
@ -69,7 +69,7 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
|
||||
private final NodeMappingCreatedAction mappingCreatedAction;
|
||||
|
||||
private final BlockingQueue<Object> refreshOrUpdateQueue = ConcurrentCollections.newBlockingQueue();
|
||||
private final BlockingQueue<MappingTask> refreshOrUpdateQueue = ConcurrentCollections.newBlockingQueue();
|
||||
|
||||
@Inject
|
||||
public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeMappingCreatedAction mappingCreatedAction) {
|
||||
|
@ -79,24 +79,32 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
this.mappingCreatedAction = mappingCreatedAction;
|
||||
}
|
||||
|
||||
static class RefreshTask {
|
||||
static class MappingTask {
|
||||
final String index;
|
||||
final String indexUUID;
|
||||
|
||||
MappingTask(String index, final String indexUUID) {
|
||||
this.index = index;
|
||||
this.indexUUID = indexUUID;
|
||||
}
|
||||
}
|
||||
|
||||
static class RefreshTask extends MappingTask {
|
||||
final String[] types;
|
||||
|
||||
RefreshTask(String index, String[] types) {
|
||||
this.index = index;
|
||||
RefreshTask(String index, final String indexUUID, String[] types) {
|
||||
super(index, indexUUID);
|
||||
this.types = types;
|
||||
}
|
||||
}
|
||||
|
||||
static class UpdateTask {
|
||||
final String index;
|
||||
static class UpdateTask extends MappingTask {
|
||||
final String type;
|
||||
final CompressedString mappingSource;
|
||||
final Listener listener;
|
||||
|
||||
UpdateTask(String index, String type, CompressedString mappingSource, Listener listener) {
|
||||
this.index = index;
|
||||
UpdateTask(String index, String indexUUID, String type, CompressedString mappingSource, Listener listener) {
|
||||
super(index, indexUUID);
|
||||
this.type = type;
|
||||
this.mappingSource = mappingSource;
|
||||
this.listener = listener;
|
||||
|
@ -109,7 +117,7 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
* and generate a single cluster change event out of all of those.
|
||||
*/
|
||||
ClusterState executeRefreshOrUpdate(final ClusterState currentState) throws Exception {
|
||||
List<Object> allTasks = new ArrayList<Object>();
|
||||
List<MappingTask> allTasks = new ArrayList<MappingTask>();
|
||||
refreshOrUpdateQueue.drainTo(allTasks);
|
||||
|
||||
if (allTasks.isEmpty()) {
|
||||
|
@ -118,43 +126,45 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
|
||||
// break down to tasks per index, so we can optimize the on demand index service creation
|
||||
// to only happen for the duration of a single index processing of its respective events
|
||||
Map<String, List<Object>> tasksPerIndex = Maps.newHashMap();
|
||||
for (Object task : allTasks) {
|
||||
String index = null;
|
||||
if (task instanceof UpdateTask) {
|
||||
index = ((UpdateTask) task).index;
|
||||
} else if (task instanceof RefreshTask) {
|
||||
index = ((RefreshTask) task).index;
|
||||
} else {
|
||||
logger.warn("illegal state, got wrong mapping task type [{}]", task);
|
||||
Map<String, List<MappingTask>> tasksPerIndex = Maps.newHashMap();
|
||||
for (MappingTask task : allTasks) {
|
||||
if (task.index == null) {
|
||||
logger.debug("ignoring a mapping task of type [{}] with a null index.", task);
|
||||
}
|
||||
if (index != null) {
|
||||
List<Object> indexTasks = tasksPerIndex.get(index);
|
||||
List<MappingTask> indexTasks = tasksPerIndex.get(task.index);
|
||||
if (indexTasks == null) {
|
||||
indexTasks = new ArrayList<Object>();
|
||||
tasksPerIndex.put(index, indexTasks);
|
||||
indexTasks = new ArrayList<MappingTask>();
|
||||
tasksPerIndex.put(task.index, indexTasks);
|
||||
}
|
||||
indexTasks.add(task);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
boolean dirty = false;
|
||||
MetaData.Builder mdBuilder = newMetaDataBuilder().metaData(currentState.metaData());
|
||||
for (Map.Entry<String, List<Object>> entry : tasksPerIndex.entrySet()) {
|
||||
for (Map.Entry<String, List<MappingTask>> entry : tasksPerIndex.entrySet()) {
|
||||
String index = entry.getKey();
|
||||
List<Object> tasks = entry.getValue();
|
||||
List<MappingTask> tasks = entry.getValue();
|
||||
boolean removeIndex = false;
|
||||
// keep track of what we already refreshed, no need to refresh it again...
|
||||
Set<String> processedRefreshes = Sets.newHashSet();
|
||||
try {
|
||||
for (Object task : tasks) {
|
||||
if (task instanceof RefreshTask) {
|
||||
RefreshTask refreshTask = (RefreshTask) task;
|
||||
for (MappingTask task : tasks) {
|
||||
final IndexMetaData indexMetaData = mdBuilder.get(index);
|
||||
if (indexMetaData == null) {
|
||||
// index got delete on us, ignore...
|
||||
// index got deleted on us, ignore...
|
||||
logger.debug("[{}] ignoring task [{}] - index meta data doesn't exist", index, task);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!indexMetaData.isSameUUID(task.indexUUID)) {
|
||||
// index got deleted on us, ignore...
|
||||
logger.debug("[{}] ignoring task [{}] - index meta data doesn't match task uuid", index, task);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (task instanceof RefreshTask) {
|
||||
RefreshTask refreshTask = (RefreshTask) task;
|
||||
IndexService indexService = indicesService.indexService(index);
|
||||
if (indexService == null) {
|
||||
// we need to create the index here, and add the current mapping to it, so we can merge
|
||||
|
@ -195,13 +205,8 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
String type = updateTask.type;
|
||||
CompressedString mappingSource = updateTask.mappingSource;
|
||||
|
||||
// first, check if it really needs to be updated
|
||||
final IndexMetaData indexMetaData = mdBuilder.get(index);
|
||||
if (indexMetaData == null) {
|
||||
// index got delete on us, ignore...
|
||||
continue;
|
||||
}
|
||||
if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(mappingSource)) {
|
||||
logger.debug("[{}] update_mapping [{}] ignoring mapping update task as it's source is equal to ours", index, updateTask.type);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -221,16 +226,13 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
|
||||
// if we end up with the same mapping as the original once, ignore
|
||||
if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(updatedMapper.mappingSource())) {
|
||||
logger.debug("[{}] update_mapping [{}] ignoring mapping update task as it results in the same source as what we have", index, updateTask.type);
|
||||
continue;
|
||||
}
|
||||
|
||||
// build the updated mapping source
|
||||
if (logger.isDebugEnabled()) {
|
||||
try {
|
||||
logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource().string());
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource());
|
||||
} else if (logger.isInfoEnabled()) {
|
||||
logger.info("[{}] update_mapping [{}] (dynamic)", index, type);
|
||||
}
|
||||
|
@ -262,8 +264,8 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
/**
|
||||
* Refreshes mappings if they are not the same between original and parsed version
|
||||
*/
|
||||
public void refreshMapping(final String index, final String... types) {
|
||||
refreshOrUpdateQueue.add(new RefreshTask(index, types));
|
||||
public void refreshMapping(final String index, final String indexUUID, final String... types) {
|
||||
refreshOrUpdateQueue.add(new RefreshTask(index, indexUUID, types));
|
||||
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public void onFailure(String source, Throwable t) {
|
||||
|
@ -277,8 +279,8 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
});
|
||||
}
|
||||
|
||||
public void updateMapping(final String index, final String type, final CompressedString mappingSource, final Listener listener) {
|
||||
refreshOrUpdateQueue.add(new UpdateTask(index, type, mappingSource, listener));
|
||||
public void updateMapping(final String index, final String indexUUID, final String type, final CompressedString mappingSource, final Listener listener) {
|
||||
refreshOrUpdateQueue.add(new UpdateTask(index, indexUUID, type, mappingSource, listener));
|
||||
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", Priority.HIGH, new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public void onFailure(String source, Throwable t) {
|
||||
|
|
|
@ -295,7 +295,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
|
|||
|
||||
@Override
|
||||
public String indexUUID() {
|
||||
return indexSettings.get(IndexMetaData.SETTING_UUID);
|
||||
return indexSettings.get(IndexMetaData.SETTING_UUID, IndexMetaData.INDEX_UUID_NA_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -388,7 +388,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
}
|
||||
if (typesToRefresh != null) {
|
||||
nodeMappingRefreshAction.nodeMappingRefresh(event.state(),
|
||||
new NodeMappingRefreshAction.NodeMappingRefreshRequest(index, typesToRefresh.toArray(new String[typesToRefresh.size()]), event.state().nodes().localNodeId()));
|
||||
new NodeMappingRefreshAction.NodeMappingRefreshRequest(index, indexMetaData.uuid(),
|
||||
typesToRefresh.toArray(new String[typesToRefresh.size()]), event.state().nodes().localNodeId()));
|
||||
}
|
||||
// go over and remove mappings
|
||||
for (DocumentMapper documentMapper : mapperService) {
|
||||
|
@ -820,9 +821,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
public void onFailedEngine(final ShardId shardId, final Throwable failure) {
|
||||
ShardRouting shardRouting = null;
|
||||
final IndexService indexService = indicesService.indexService(shardId.index().name());
|
||||
String indexUUID = null;
|
||||
if (indexService != null) {
|
||||
indexUUID = indexService.indexUUID();
|
||||
IndexShard indexShard = indexService.shard(shardId.id());
|
||||
if (indexShard != null) {
|
||||
shardRouting = indexShard.routingEntry();
|
||||
|
@ -833,7 +832,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
return;
|
||||
}
|
||||
final ShardRouting fShardRouting = shardRouting;
|
||||
final String finalIndexUUID = indexUUID;
|
||||
final String indexUUID = indexService.indexUUID(); // we know indexService is not null here.
|
||||
threadPool.generic().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -849,7 +848,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
}
|
||||
try {
|
||||
failedShards.put(fShardRouting.shardId(), new FailedShard(fShardRouting.version()));
|
||||
shardStateAction.shardFailed(fShardRouting, finalIndexUUID, "engine failure, message [" + detailedMessage(failure) + "]");
|
||||
shardStateAction.shardFailed(fShardRouting, indexUUID, "engine failure, message [" + detailedMessage(failure) + "]");
|
||||
} catch (Throwable e1) {
|
||||
logger.warn("[{}][{}] failed to mark shard as failed after a failed engine", e1, indexService.index().name(), shardId.id());
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.lucene.document.StringField;
|
|||
import org.apache.lucene.index.*;
|
||||
import org.apache.lucene.store.RAMDirectory;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.HashedBytesArray;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
|
@ -390,7 +391,7 @@ public class SimpleIdCacheTests {
|
|||
|
||||
@Override
|
||||
public String indexUUID() {
|
||||
return null;
|
||||
return IndexMetaData.INDEX_UUID_NA_VALUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue