Mapping Overhaul - More user friendly, cluster aware, closes #34.

This commit is contained in:
kimchy 2010-02-21 22:57:39 +02:00
parent 78e73259a0
commit 6d52065db3
26 changed files with 563 additions and 181 deletions

View File

@ -45,6 +45,8 @@ public class PutMappingRequest extends MasterNodeOperationRequest {
private TimeValue timeout = new TimeValue(10, TimeUnit.SECONDS);
private boolean ignoreDuplicates = true;
PutMappingRequest() {
}
@ -109,6 +111,15 @@ public class PutMappingRequest extends MasterNodeOperationRequest {
return this;
}
/**
 * Returns whether mappers that already exist in the current mapping are ignored when the
 * provided mapping is merged into it, rather than failing the merge. Defaults to <tt>true</tt>.
 */
public boolean ignoreDuplicates() {
return ignoreDuplicates;
}
/**
 * Sets whether mappers that already exist in the current mapping should be ignored when the
 * provided mapping is merged into it, rather than failing the merge.
 *
 * @param ignoreDuplicates <tt>true</tt> to ignore duplicates, <tt>false</tt> to fail on them
 * @return this request, to allow chaining
 */
public PutMappingRequest ignoreDuplicates(boolean ignoreDuplicates) {
this.ignoreDuplicates = ignoreDuplicates;
return this;
}
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
super.readFrom(in);
indices = new String[in.readInt()];
@ -120,6 +131,7 @@ public class PutMappingRequest extends MasterNodeOperationRequest {
}
mappingSource = in.readUTF();
timeout = readTimeValue(in);
ignoreDuplicates = in.readBoolean();
}
@Override public void writeTo(DataOutput out) throws IOException {
@ -140,5 +152,6 @@ public class PutMappingRequest extends MasterNodeOperationRequest {
}
out.writeUTF(mappingSource);
timeout.writeTo(out);
out.writeBoolean(ignoreDuplicates);
}
}

View File

@ -33,32 +33,23 @@ public class PutMappingResponse implements ActionResponse, Streamable {
private boolean acknowledged;
private String parsedSource;
PutMappingResponse() {
}
public PutMappingResponse(boolean acknowledged, String parsedSource) {
public PutMappingResponse(boolean acknowledged) {
this.acknowledged = acknowledged;
this.parsedSource = parsedSource;
}
public boolean acknowledged() {
return acknowledged;
}
public String parsedSource() {
return parsedSource;
}
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
acknowledged = in.readBoolean();
parsedSource = in.readUTF();
}
@Override public void writeTo(DataOutput out) throws IOException {
out.writeBoolean(acknowledged);
out.writeUTF(parsedSource);
}
}

View File

@ -74,8 +74,8 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio
@Override protected PutMappingResponse masterOperation(PutMappingRequest request) throws ElasticSearchException {
final String[] indices = processIndices(clusterService.state(), request.indices());
MetaDataService.PutMappingResult result = metaDataService.putMapping(indices, request.type(), request.mappingSource(), request.timeout());
return new PutMappingResponse(result.acknowledged(), result.parsedSource());
MetaDataService.PutMappingResult result = metaDataService.putMapping(indices, request.type(), request.mappingSource(), request.ignoreDuplicates(), request.timeout());
return new PutMappingResponse(result.acknowledged());
}
@Override protected void doExecute(final PutMappingRequest request, final ActionListener<PutMappingResponse> listener) {

View File

@ -28,8 +28,11 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
@ -48,11 +51,14 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
private final TransportCreateIndexAction createIndexAction;
private final MappingUpdatedAction mappingUpdatedAction;
@Inject public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
TransportCreateIndexAction createIndexAction) {
TransportCreateIndexAction createIndexAction, MappingUpdatedAction mappingUpdatedAction) {
super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction);
this.createIndexAction = createIndexAction;
this.mappingUpdatedAction = mappingUpdatedAction;
this.autoCreateIndex = settings.getAsBoolean("action.autoCreateIndex", true);
this.allowIdGeneration = componentSettings.getAsBoolean("allowIdGeneration", true);
}
@ -105,11 +111,15 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
@Override protected IndexResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
IndexRequest request = shardRequest.request;
final IndexRequest request = shardRequest.request;
ParsedDocument doc;
if (request.opType() == IndexRequest.OpType.INDEX) {
indexShard(shardRequest).index(request.type(), request.id(), request.source());
doc = indexShard(shardRequest).index(request.type(), request.id(), request.source());
} else {
indexShard(shardRequest).create(request.type(), request.id(), request.source());
doc = indexShard(shardRequest).create(request.type(), request.id(), request.source());
}
if (doc.mappersAdded()) {
updateMappingOnMaster(request);
}
return new IndexResponse(request.index(), request.type(), request.id());
}
@ -122,4 +132,22 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
indexShard(shardRequest).create(request.type(), request.id(), request.source());
}
}
/**
 * Sends the current mapping source of the request's type to the master through the
 * {@link MappingUpdatedAction}. This is best effort: any failure, whether raised while
 * building the request or reported back asynchronously, is only logged as a warning.
 *
 * @param request the index request whose index and type identify the mapping to send
 */
private void updateMappingOnMaster(final IndexRequest request) {
    try {
        // regenerate the mapping source from the document mapper registered for the type
        final String updatedSource = indicesService.indexServiceSafe(request.index())
                .mapperService()
                .documentMapper(request.type())
                .buildSource();

        MappingUpdatedAction.MappingUpdatedRequest mappingRequest =
                new MappingUpdatedAction.MappingUpdatedRequest(request.index(), request.type(), updatedSource);

        ActionListener<MappingUpdatedAction.MappingUpdatedResponse> listener = new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
            @Override public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
                // all is well
            }

            @Override public void onFailure(Throwable e) {
                logger.warn("Failed to update master on updated mapping for index [" + request.index() + "], type [" + request.type() + "] and source [" + updatedSource + "]", e);
            }
        };

        mappingUpdatedAction.execute(mappingRequest, listener);
    } catch (Exception e) {
        logger.warn("Failed to update master on updated mapping for index [" + request.index() + "], type [" + request.type() + "]", e);
    }
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster;
import com.google.inject.AbstractModule;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
@ -55,5 +56,6 @@ public class ClusterModule extends AbstractModule {
bind(NodeIndexCreatedAction.class).asEagerSingleton();
bind(NodeIndexDeletedAction.class).asEagerSingleton();
bind(NodeMappingCreatedAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,126 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.action.index;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.MetaDataService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.settings.Settings;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Master node action that shards call once their mapping was dynamically updated, so that the
 * change is applied to the cluster state meta data (and broadcast to all members).
 *
 * @author kimchy (Shay Banon)
 */
public class MappingUpdatedAction extends TransportMasterNodeOperationAction<MappingUpdatedAction.MappingUpdatedRequest, MappingUpdatedAction.MappingUpdatedResponse> {

    private final MetaDataService metaDataService;

    @Inject public MappingUpdatedAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
                                        MetaDataService metaDataService) {
        super(settings, transportService, clusterService, threadPool);
        this.metaDataService = metaDataService;
    }

    /**
     * The name this action is registered under in the transport layer.
     */
    @Override protected String transportAction() {
        return "cluster/mappingUpdated";
    }

    @Override protected MappingUpdatedRequest newRequest() {
        return new MappingUpdatedRequest();
    }

    @Override protected MappingUpdatedResponse newResponse() {
        return new MappingUpdatedResponse();
    }

    /**
     * Hands the updated mapping over to the {@link MetaDataService} and answers with an empty response.
     */
    @Override protected MappingUpdatedResponse masterOperation(MappingUpdatedRequest request) throws ElasticSearchException {
        final String index = request.index();
        final String type = request.type();
        final String source = request.mappingSource();
        metaDataService.updateMapping(index, type, source);
        return new MappingUpdatedResponse();
    }

    /**
     * The request sent to the master: the index, the mapping type and the updated mapping source.
     */
    public static class MappingUpdatedRequest extends MasterNodeOperationRequest {

        private String index;

        private String type;

        private String mappingSource;

        /**
         * Empty constructor, fields are populated later through {@link #readFrom(java.io.DataInput)}.
         */
        MappingUpdatedRequest() {
        }

        public MappingUpdatedRequest(String index, String type, String mappingSource) {
            this.index = index;
            this.type = type;
            this.mappingSource = mappingSource;
        }

        public String index() {
            return this.index;
        }

        public String type() {
            return this.type;
        }

        public String mappingSource() {
            return this.mappingSource;
        }

        /**
         * No validation is performed on this request.
         */
        @Override public ActionRequestValidationException validate() {
            return null;
        }

        // the fields are read in the exact order they are written in writeTo
        @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
            super.readFrom(in);
            this.index = in.readUTF();
            this.type = in.readUTF();
            this.mappingSource = in.readUTF();
        }

        @Override public void writeTo(DataOutput out) throws IOException {
            super.writeTo(out);
            out.writeUTF(this.index);
            out.writeUTF(this.type);
            out.writeUTF(this.mappingSource);
        }
    }

    /**
     * The response of the action. It carries no data, so nothing is read or written.
     */
    public static class MappingUpdatedResponse implements ActionResponse {

        @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
            // nothing to read
        }

        @Override public void writeTo(DataOutput out) throws IOException {
            // nothing to write
        }
    }
}

View File

@ -142,7 +142,7 @@ public class IndexMetaData {
return this;
}
public Builder addMapping(String mappingType, String mappingSource) {
public Builder putMapping(String mappingType, String mappingSource) {
mappings.put(mappingType, mappingSource);
return this;
}
@ -156,7 +156,7 @@ public class IndexMetaData {
builder.settings(readSettingsFromStream(in, globalSettings));
int mappingsSize = in.readInt();
for (int i = 0; i < mappingsSize; i++) {
builder.addMapping(in.readUTF(), in.readUTF());
builder.putMapping(in.readUTF(), in.readUTF());
}
return builder.build();
}

View File

@ -35,24 +35,28 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.InvalidTypeNameException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.util.Strings;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.Tuple;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.settings.ImmutableSettings;
import org.elasticsearch.util.settings.Settings;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static com.google.common.collect.Maps.*;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.index.mapper.DocumentMapper.MergeFlags.*;
import static org.elasticsearch.util.settings.ImmutableSettings.*;
/**
@ -84,6 +88,8 @@ public class MetaDataService extends AbstractComponent {
this.nodeMappingCreatedAction = nodeMappingCreatedAction;
}
// TODO should find nicer solution than sync here, since we block for timeout (same for other ops)
public synchronized boolean createIndex(final String index, final Settings indexSettings, TimeValue timeout) throws IndexAlreadyExistsException {
if (clusterService.state().routingTable().hasIndex(index)) {
throw new IndexAlreadyExistsException(new Index(index));
@ -200,7 +206,33 @@ public class MetaDataService extends AbstractComponent {
}
}
public PutMappingResult putMapping(final String[] indices, String mappingType, final String mappingSource, TimeValue timeout) throws ElasticSearchException {
/**
 * Applies a dynamically updated mapping (as reported by a shard) and publishes the resulting
 * mapping source in the cluster state meta data of the index.
 *
 * <p>The provided source is parsed and, when a mapper for the type already exists, merged into
 * it with duplicates ignored. When no mapper exists for the type, the parsed one is used as is.
 *
 * @param index         the index the mapping belongs to
 * @param type          the mapping type
 * @param mappingSource the updated mapping source
 */
public synchronized void updateMapping(final String index, final String type, final String mappingSource) {
    MapperService mapperService = indicesService.indexServiceSafe(index).mapperService();

    DocumentMapper existingMapper = mapperService.documentMapper(type);
    // parse the updated one
    DocumentMapper updatedMapper = mapperService.parse(type, mappingSource);
    if (existingMapper == null) {
        existingMapper = updatedMapper;
    } else {
        // merge from the updated into the existing, ignore duplicates (we know we have them, we just want the new ones)
        existingMapper.merge(updatedMapper, mergeFlags().simulate(false).ignoreDuplicates(true));
    }
    // build the updated mapping source
    final String updatedMappingSource = existingMapper.buildSource();
    logger.info("Index [" + index + "]: Update mapping [" + type + "] (dynamic) with source [" + updatedMappingSource + "]");
    // publish the new mapping
    clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", new ClusterStateUpdateTask() {
        @Override public ClusterState execute(ClusterState currentState) {
            IndexMetaData indexMetaData = currentState.metaData().index(index);
            if (indexMetaData == null) {
                // the index is no longer part of the cluster state by the time the task runs,
                // fail the same way the put mapping state update task does
                throw new IndexMissingException(new Index(index));
            }
            MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
            builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(type, updatedMappingSource));
            return newClusterStateBuilder().state(currentState).metaData(builder).build();
        }
    });
}
public synchronized PutMappingResult putMapping(final String[] indices, String mappingType, final String mappingSource, boolean ignoreDuplicates, TimeValue timeout) throws ElasticSearchException {
ClusterState clusterState = clusterService.state();
for (String index : indices) {
IndexRoutingTable indexTable = clusterState.routingTable().indicesRouting().get(index);
@ -209,29 +241,52 @@ public class MetaDataService extends AbstractComponent {
}
}
DocumentMapper documentMapper = null;
Map<String, DocumentMapper> newMappers = newHashMap();
Map<String, DocumentMapper> existingMappers = newHashMap();
for (String index : indices) {
IndexService indexService = indicesService.indexService(index);
if (indexService != null) {
// try and parse it (no need to add it here) so we can bail early in case of parsing exception
documentMapper = indexService.mapperService().parse(mappingType, mappingSource);
DocumentMapper newMapper = indexService.mapperService().parse(mappingType, mappingSource);
newMappers.put(index, newMapper);
DocumentMapper existingMapper = indexService.mapperService().documentMapper(mappingType);
if (existingMapper != null) {
    // first simulate and throw an exception if something goes wrong
    existingMapper.merge(newMapper, mergeFlags().simulate(true).ignoreDuplicates(ignoreDuplicates));
    // remember the mapper that is currently registered for the type (not the freshly parsed one):
    // it is the one the actual, non simulated, merge has to be applied into
    existingMappers.put(index, existingMapper);
}
} else {
throw new IndexMissingException(new Index(index));
}
}
String parsedSource = documentMapper.buildSource();
if (mappingType == null) {
mappingType = documentMapper.type();
} else if (!mappingType.equals(documentMapper.type())) {
mappingType = newMappers.values().iterator().next().type();
} else if (!mappingType.equals(newMappers.values().iterator().next().type())) {
throw new InvalidTypeNameException("Type name provided does not match type name within mapping definition");
}
if (mappingType.charAt(0) == '_') {
throw new InvalidTypeNameException("Document mapping type name can't start with '_'");
}
logger.info("Indices [" + Arrays.toString(indices) + "]: Put mapping [" + mappingType + "] with source [" + mappingSource + "]");
final Map<String, Tuple<String, String>> mappings = newHashMap();
for (Map.Entry<String, DocumentMapper> entry : newMappers.entrySet()) {
Tuple<String, String> mapping;
String index = entry.getKey();
// do the actual merge here on the master, and update the mapping source
DocumentMapper newMapper = entry.getValue();
if (existingMappers.containsKey(entry.getKey())) {
// we have an existing mapping, do the merge here (on the master), it will automatically update the mapping source
DocumentMapper existingMapper = existingMappers.get(entry.getKey());
existingMapper.merge(newMapper, mergeFlags().simulate(false).ignoreDuplicates(ignoreDuplicates));
// use the merged mapping source
mapping = new Tuple<String, String>(existingMapper.type(), existingMapper.buildSource());
} else {
mapping = new Tuple<String, String>(newMapper.type(), newMapper.buildSource());
}
mappings.put(index, mapping);
logger.info("Index [" + index + "]: Put mapping [" + mapping.v1() + "] with source [" + mapping.v2() + "]");
}
final CountDownLatch latch = new CountDownLatch(clusterService.state().nodes().size() * indices.length);
final Set<String> indicesSet = Sets.newHashSet(indices);
@ -245,8 +300,7 @@ public class MetaDataService extends AbstractComponent {
};
nodeMappingCreatedAction.add(listener);
final String mappingTypeP = mappingType;
clusterService.submitStateUpdateTask("create-mapping [" + mappingTypeP + "]", new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("put-mapping [" + mappingType + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
for (String indexName : indices) {
@ -254,7 +308,8 @@ public class MetaDataService extends AbstractComponent {
if (indexMetaData == null) {
throw new IndexMissingException(new Index(indexName));
}
builder.put(newIndexMetaDataBuilder(indexMetaData).addMapping(mappingTypeP, mappingSource));
Tuple<String, String> mapping = mappings.get(indexName);
builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(mapping.v1(), mapping.v2()));
}
return newClusterStateBuilder().state(currentState).metaData(builder).build();
}
@ -269,26 +324,19 @@ public class MetaDataService extends AbstractComponent {
nodeMappingCreatedAction.remove(listener);
}
return new PutMappingResult(acknowledged, parsedSource);
return new PutMappingResult(acknowledged);
}
public static class PutMappingResult {
private final boolean acknowledged;
private final String parsedSource;
public PutMappingResult(boolean acknowledged, String parsedSource) {
public PutMappingResult(boolean acknowledged) {
this.acknowledged = acknowledged;
this.parsedSource = parsedSource;
}
public boolean acknowledged() {
return acknowledged;
}
public String parsedSource() {
return parsedSource;
}
}
}

View File

@ -25,6 +25,6 @@ package org.elasticsearch.index.mapper;
*
* @author kimchy (Shay Banon)
*/
public interface BoostFieldMapper extends FieldMapper<Float> {
public interface BoostFieldMapper extends FieldMapper<Float>, InternalMapper {
}

View File

@ -42,6 +42,11 @@ public interface DocumentMapper {
*/
String buildSource() throws FailedToGenerateSourceMapperException;
/**
* Merges this document mapper with the provided document mapper.
*/
void merge(DocumentMapper mergeWith, MergeFlags mergeFlags) throws MergeMappingException;
UidFieldMapper uidMapper();
IdFieldMapper idMapper();
@ -79,5 +84,40 @@ public interface DocumentMapper {
*/
ParsedDocument parse(String source) throws MapperParsingException;
/**
* Adds a field mapper listener.
*/
void addFieldMapperListener(FieldMapperListener fieldMapperListener, boolean includeExisting);
/**
 * Flags controlling a {@link #merge(DocumentMapper, MergeFlags)} call. By default a merge is
 * only simulated and duplicates are not ignored. Setters return this instance for chaining.
 */
public static class MergeFlags {

    /**
     * Creates a new set of flags holding the defaults.
     */
    public static MergeFlags mergeFlags() {
        return new MergeFlags();
    }

    private boolean simulate;

    private boolean ignoreDuplicates;

    public MergeFlags() {
        this.simulate = true;
        this.ignoreDuplicates = false;
    }

    /**
     * Is the merge only simulated. Defaults to <tt>true</tt>.
     */
    public boolean simulate() {
        return this.simulate;
    }

    public MergeFlags simulate(boolean simulate) {
        this.simulate = simulate;
        return this;
    }

    /**
     * Are duplicate mappers ignored. Defaults to <tt>false</tt>.
     */
    public boolean ignoreDuplicates() {
        return this.ignoreDuplicates;
    }

    public MergeFlags ignoreDuplicates(boolean ignoreDuplicates) {
        this.ignoreDuplicates = ignoreDuplicates;
        return this;
    }
}
}

View File

@ -26,7 +26,7 @@ public interface DocumentMapperParser {
/**
* Parses the source mapping definition into a document mapper with the specified
* type (overrding the one defined in the source mapping).
* type (overriding the one defined in the source mapping).
*/
DocumentMapper parse(String type, String mappingSource) throws MapperParsingException;

View File

@ -24,7 +24,7 @@ import org.apache.lucene.document.Document;
/**
* @author kimchy (Shay Banon)
*/
public interface IdFieldMapper extends FieldMapper<String> {
public interface IdFieldMapper extends FieldMapper<String>, InternalMapper {
String value(Document document);
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.mapper;
/**
* A marker interface for internal mappings, for example the <tt>_uid</tt>, <tt>_type</tt>
* and <tt>_source</tt> field mappers. It declares no methods and is only meant to be
* checked for with <tt>instanceof</tt>.
*
* @author kimchy (shay.banon)
*/
public interface InternalMapper {
}

View File

@ -218,8 +218,7 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
/**
* Returns the {@link FieldMappers} of all the {@link FieldMapper}s that are
* registered under the give fullName ({@link FieldMapper#fullName()} across
* all the different {@link DocumentMapper} types.
* registered under the given fullName across all the different {@link DocumentMapper} types.
*
* @param fullName The full name
* @return All the {@link FieldMappers} across all the {@link DocumentMapper}s for the given fullName.

View File

@ -0,0 +1,34 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.mapper;
/**
* Thrown when a mapping can't be merged into an existing one.
*
* @author kimchy (shay.banon)
*/
public class MergeMappingException extends MapperException {
/**
* @param message a description of why the merge failed
*/
public MergeMappingException(String message) {
super(message);
}
/**
* @param message a description of why the merge failed
* @param cause   the underlying failure
*/
public MergeMappingException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -36,12 +36,15 @@ public class ParsedDocument {
private final String source;
public ParsedDocument(String uid, String id, String type, Document document, String source) {
private boolean mappersAdded;
public ParsedDocument(String uid, String id, String type, Document document, String source, boolean mappersAdded) {
this.uid = uid;
this.id = id;
this.type = type;
this.document = document;
this.source = source;
this.mappersAdded = mappersAdded;
}
public String uid() {
@ -64,6 +67,13 @@ public class ParsedDocument {
return this.source;
}
/**
* Returns <tt>true</tt> if parsing this document caused new mappers to be added.
*/
public boolean mappersAdded() {
return mappersAdded;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Document ").append("uid[").append(uid).append("] doc [").append(document).append("]");

View File

@ -30,7 +30,7 @@ import org.elasticsearch.util.concurrent.ThreadSafe;
* @author kimchy (Shay Banon)
*/
@ThreadSafe
public interface SourceFieldMapper extends FieldMapper<String> {
public interface SourceFieldMapper extends FieldMapper<String>, InternalMapper {
public final String NAME = StringHelper.intern("_source");

View File

@ -30,7 +30,7 @@ import org.elasticsearch.util.concurrent.ThreadSafe;
* @author kimchy (Shay Banon)
*/
@ThreadSafe
public interface TypeFieldMapper extends FieldMapper<String> {
public interface TypeFieldMapper extends FieldMapper<String>, InternalMapper {
public static final String NAME = StringHelper.intern("_type");

View File

@ -27,7 +27,7 @@ import org.elasticsearch.util.concurrent.ThreadSafe;
* @author kimchy (Shay Banon)
*/
@ThreadSafe
public interface UidFieldMapper extends FieldMapper<Uid> {
public interface UidFieldMapper extends FieldMapper<Uid>, InternalMapper {
public static final String NAME = StringHelper.intern("_uid");

View File

@ -136,7 +136,7 @@ public class JsonDocumentMapper implements DocumentMapper, ToJson {
private final String type;
private final String mappingSource;
private volatile String mappingSource;
private final JsonUidFieldMapper uidFieldMapper;
@ -211,6 +211,10 @@ public class JsonDocumentMapper implements DocumentMapper, ToJson {
return this.mappingSource;
}
/**
 * Replaces the mapping source held by this document mapper.
 *
 * @param mappingSource the new mapping source
 */
void mappingSource(String mappingSource) {
this.mappingSource = mappingSource;
}
@Override public UidFieldMapper uidMapper() {
return this.uidFieldMapper;
}
@ -314,7 +318,7 @@ public class JsonDocumentMapper implements DocumentMapper, ToJson {
}
}
}
return new ParsedDocument(jsonContext.uid(), jsonContext.id(), jsonContext.type(), jsonContext.doc(), source);
return new ParsedDocument(jsonContext.uid(), jsonContext.id(), jsonContext.type(), jsonContext.doc(), source, jsonContext.mappersAdded());
}
void addFieldMapper(FieldMapper fieldMapper) {
@ -339,6 +343,14 @@ public class JsonDocumentMapper implements DocumentMapper, ToJson {
}
}
/**
 * Merges the root object mapper of the provided document mapper into the root object mapper
 * of this one. Unless the merge is only simulated, the mapping source of this mapper is
 * regenerated afterwards.
 *
 * @param mergeWith  the document mapper to merge from, must be a {@link JsonDocumentMapper}
 * @param mergeFlags the flags controlling the merge
 * @throws MergeMappingException if the mappings can't be merged
 */
@Override public synchronized void merge(DocumentMapper mergeWith, MergeFlags mergeFlags) throws MergeMappingException {
    rootObjectMapper.mergeMapping(((JsonDocumentMapper) mergeWith).rootObjectMapper, mergeFlags);
    if (mergeFlags.simulate()) {
        // only a simulation, the source is left untouched
        return;
    }
    // update the source to the merged one
    mappingSource = buildSource();
}
@Override public String buildSource() throws FailedToGenerateSourceMapperException {
try {
@ -353,8 +365,6 @@ public class JsonDocumentMapper implements DocumentMapper, ToJson {
}
@Override public void toJson(JsonBuilder builder, Params params) throws IOException {
builder.startObject(type);
rootObjectMapper.toJson(builder, params);
builder.endObject();
}
}

View File

@ -124,7 +124,10 @@ public class JsonDocumentMapperParser implements DocumentMapperParser {
docBuilder.mappingSource(source);
return docBuilder.build();
JsonDocumentMapper documentMapper = docBuilder.build();
// update the source with the generated one
documentMapper.mappingSource(documentMapper.buildSource());
return documentMapper;
}
private JsonUidFieldMapper.Builder parseUidField(ObjectNode uidNode) {

View File

@ -23,8 +23,7 @@ import com.google.common.collect.ImmutableMap;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.FieldMapperListener;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.util.concurrent.ThreadSafe;
import org.elasticsearch.util.joda.FormatDateTimeFormatter;
import org.elasticsearch.util.json.JsonBuilder;
@ -253,6 +252,7 @@ public class JsonObjectMapper implements JsonMapper {
.dynamic(dynamic).pathType(pathType).dateTimeFormatter(dateTimeFormatters).build(builderContext);
putMapper(objectMapper);
objectMapper.parse(jsonContext);
jsonContext.addedMapper();
}
} else {
// not dynamic, read everything up to end object
@ -333,6 +333,44 @@ public class JsonObjectMapper implements JsonMapper {
jsonContext.docMapper().addFieldMapper((FieldMapper) mapper);
mapper.parse(jsonContext);
jsonContext.addedMapper();
}
}
/**
 * Merges the mappers of the provided object mapper into this object mapper.
 *
 * <p>Object mappers that exist on both sides are merged recursively. Mappers that only exist
 * in <tt>mergeWith</tt> are added, unless the merge is simulated. A non object mapper that
 * already exists here fails the merge, unless the existing one is an {@link InternalMapper}
 * or the flags ask to ignore duplicates.
 *
 * @param mergeWith  the object mapper to merge from
 * @param mergeFlags the flags controlling the merge
 * @throws MergeMappingException if an object mapping would be merged into a non object mapper,
 *                               or a duplicate mapper is found and duplicates are not ignored
 */
public void mergeMapping(JsonObjectMapper mergeWith, DocumentMapper.MergeFlags mergeFlags) throws MergeMappingException {
    synchronized (mutex) {
        for (JsonMapper incoming : mergeWith.mappers.values()) {
            if (!(incoming instanceof JsonObjectMapper)) {
                // not an object mapper, bail if we have it, otherwise, add
                // we might get fancy later on and allow per field mapper merge
                if (!mappers.containsKey(incoming.name())) {
                    if (!mergeFlags.simulate()) {
                        putMapper(incoming);
                    }
                    continue;
                }
                // duplicates of internal mappings are simply ignored
                boolean internal = mappers.get(incoming.name()) instanceof InternalMapper;
                if (!internal && !mergeFlags.ignoreDuplicates()) {
                    throw new MergeMappingException("Mapper [" + incoming.name() + "] exists, can't merge");
                }
                continue;
            }

            JsonObjectMapper incomingObject = (JsonObjectMapper) incoming;
            JsonMapper current = mappers.get(incomingObject.name());
            if (current == null) {
                // nothing registered under that name yet, add it (when not simulating)
                if (!mergeFlags.simulate()) {
                    putMapper(incomingObject);
                }
                continue;
            }
            if (!(current instanceof JsonObjectMapper)) {
                throw new MergeMappingException("Can't merge an object mapping [" + incomingObject.name() + "] into a non object mapper");
            }
            // both sides are object mappers, merge their inner mappers
            ((JsonObjectMapper) current).mergeMapping(incomingObject, mergeFlags);
        }
    }
}

View File

@ -45,10 +45,12 @@ public class JsonParseContext {
private String uid;
private StringBuilder stringBuiler = new StringBuilder();
private StringBuilder stringBuilder = new StringBuilder();
private ParsedIdState parsedIdState;
private boolean mappersAdded = false;
public JsonParseContext(JsonDocumentMapper docMapper, JsonPath path) {
this.docMapper = docMapper;
this.path = path;
@ -61,6 +63,15 @@ public class JsonParseContext {
this.source = source;
this.path.reset();
this.parsedIdState = ParsedIdState.NO;
this.mappersAdded = false;
}
/**
 * Returns <tt>true</tt> if {@link #addedMapper()} has been called since this flag was last reset.
 */
public boolean mappersAdded() {
return this.mappersAdded;
}
/**
 * Flags that a mapper was added, so that {@link #mappersAdded()} returns <tt>true</tt>.
 */
public void addedMapper() {
this.mappersAdded = true;
}
public String type() {
@ -122,8 +133,8 @@ public class JsonParseContext {
* Its better to reuse the.
*/
public StringBuilder stringBuilder() {
stringBuiler.setLength(0);
return this.stringBuiler;
stringBuilder.setLength(0);
return this.stringBuilder;
}
public static enum ParsedIdState {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.util.Nullable;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.concurrent.ThreadSafe;
@ -44,9 +45,9 @@ public interface IndexShard extends IndexShardComponent {
*/
SizeValue estimateFlushableMemorySize() throws ElasticSearchException;
void create(String type, String id, String source) throws ElasticSearchException;
ParsedDocument create(String type, String id, String source) throws ElasticSearchException;
void index(String type, String id, String source) throws ElasticSearchException;
ParsedDocument index(String type, String id, String source) throws ElasticSearchException;
void delete(String type, String id);

View File

@ -21,10 +21,7 @@ package org.elasticsearch.index.shard;
import com.google.inject.Inject;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermEnum;
import org.apache.lucene.search.*;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
@ -34,7 +31,10 @@ import org.elasticsearch.index.cache.filter.FilterCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.ScheduledRefreshableEngine;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentMapperNotFoundException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.query.IndexQueryParser;
import org.elasticsearch.index.query.IndexQueryParserMissingException;
import org.elasticsearch.index.query.IndexQueryParserService;
@ -53,11 +53,8 @@ import org.elasticsearch.util.lucene.search.TermFilter;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import static com.google.common.collect.Lists.*;
/**
* @author kimchy (Shay Banon)
*/
@ -207,12 +204,12 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return engine.estimateFlushableMemorySize();
}
public void create(String type, String id, String source) throws ElasticSearchException {
public ParsedDocument create(String type, String id, String source) throws ElasticSearchException {
writeAllowed();
innerCreate(type, id, source);
return innerCreate(type, id, source);
}
private void innerCreate(String type, String id, String source) {
private ParsedDocument innerCreate(String type, String id, String source) {
DocumentMapper docMapper = mapperService.type(type);
if (docMapper == null) {
throw new DocumentMapperNotFoundException("No mapper found for type [" + type + "]");
@ -222,14 +219,15 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
logger.trace("Indexing {}", doc);
}
engine.create(new Engine.Create(doc.doc(), docMapper.mappers().indexAnalyzer(), docMapper.type(), doc.id(), doc.source()));
return doc;
}
public void index(String type, String id, String source) throws ElasticSearchException {
public ParsedDocument index(String type, String id, String source) throws ElasticSearchException {
writeAllowed();
innerIndex(type, id, source);
return innerIndex(type, id, source);
}
private void innerIndex(String type, String id, String source) {
private ParsedDocument innerIndex(String type, String id, String source) {
DocumentMapper docMapper = mapperService.type(type);
if (docMapper == null) {
throw new DocumentMapperNotFoundException("No mapper found for type [" + type + "]");
@ -239,6 +237,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
logger.trace("Indexing {}", doc);
}
engine.index(new Engine.Index(docMapper.uidMapper().term(doc.uid()), doc.doc(), docMapper.mappers().indexAnalyzer(), docMapper.type(), doc.id(), doc.source()));
return doc;
}
public void delete(String type, String id) {
@ -400,7 +399,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
synchronized (mutex) {
state = IndexShardState.STARTED;
}
threadPool.execute(new ShardMappingSniffer());
scheduleRefresherIfNeeded();
}
@ -417,7 +415,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
synchronized (mutex) {
state = IndexShardState.STARTED;
}
threadPool.execute(new ShardMappingSniffer());
scheduleRefresherIfNeeded();
}
}
@ -518,109 +515,113 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
}
/**
 * The mapping sniffer reads docs from the index and introduces them into the mapping service. This is
 * because of dynamic fields and we want to reintroduce them.
 *
 * <p>Note, this is done on the shard level, we might have other dynamic fields in other shards, but
 * this will be taken care of in another component.
 *
 * <p>The sniffer first walks the terms of the type field to collect the type names present in the
 * shard, then submits one task per type to the thread pool. Each task re-parses the source of a
 * sample of that type's documents through the mapper registered for the type.
 */
private class ShardMappingSniffer implements Runnable {

    @Override public void run() {
        engine.refresh(new Engine.Refresh(true));

        // I wrote all this code, and now there is no need for it since dynamic mappings are automatically
        // broadcast to all the cluster when updated, so we won't be in a state when the mappings are not up to
        // date, in any case, lets leave it here for now
        TermEnum termEnum = null;
        Engine.Searcher searcher = searcher();
        try {
            List<String> typeNames = newArrayList();
            termEnum = searcher.reader().terms(new Term(TypeFieldMapper.NAME, ""));
            while (true) {
                Term term = termEnum.term();
                // stop when the enumeration is exhausted or has moved past the type field
                if (term == null || !term.field().equals(TypeFieldMapper.NAME)) {
                    break;
                }
                typeNames.add(term.text());
                termEnum.next();
            }

            logger.debug("Sniffing mapping for [{}]", typeNames);

            for (final String type : typeNames) {
                threadPool.execute(new Runnable() {
                    @Override public void run() {
                        sniffType(type);
                    }
                });
            }
        } catch (IOException e) {
            if (isClosedSignal(e)) {
                // ignore, we got closed
            } else {
                logger.warn("Failed to sniff mapping", e);
            }
        } finally {
            if (termEnum != null) {
                try {
                    termEnum.close();
                } catch (IOException e) {
                    // ignore
                }
            }
            searcher.release();
        }
    }

    /**
     * Re-parses a sample of the documents of the given type so their fields reach the mapper.
     *
     * @param type the type name whose documents should be sampled
     */
    private void sniffType(final String type) {
        Engine.Searcher typeSearcher = searcher();
        try {
            Query query = new ConstantScoreQuery(filterCache.cache(new TermFilter(new Term(TypeFieldMapper.NAME, type))));
            // count with the searcher we already hold; acquiring another one here would never be released
            long typeCount = Lucene.count(typeSearcher.searcher(), query, -1);

            // parse every marker-th hit (at least every hit), i.e. roughly mappingSnifferDocs docs per type
            int marker = (int) (typeCount / mappingSnifferDocs);
            if (marker == 0) {
                marker = 1;
            }
            final int fMarker = marker;
            typeSearcher.searcher().search(query, new Collector() {

                private final FieldSelector fieldSelector = new UidAndSourceFieldSelector();
                private int counter = 0;
                private IndexReader reader;

                @Override public void setScorer(Scorer scorer) throws IOException {
                }

                @Override public void collect(int doc) throws IOException {
                    if (state == IndexShardState.CLOSED) {
                        // abort the search; recognised by isClosedSignal in the enclosing catch
                        throw new IOException("CLOSED");
                    }
                    if (++counter == fMarker) {
                        counter = 0;

                        Document document = reader.document(doc, fieldSelector);
                        Uid uid = Uid.createUid(document.get(UidFieldMapper.NAME));
                        String source = document.get(SourceFieldMapper.NAME);

                        // the type lookup may return null; skip the doc instead of failing the task
                        DocumentMapper docMapper = mapperService.type(uid.type());
                        if (docMapper != null) {
                            docMapper.parse(uid.type(), uid.id(), source);
                        }
                    }
                }

                @Override public void setNextReader(IndexReader reader, int docBase) throws IOException {
                    this.reader = reader;
                }

                @Override public boolean acceptsDocsOutOfOrder() {
                    return true;
                }
            });
        } catch (IOException e) {
            if (isClosedSignal(e)) {
                // ignore, we got closed
            } else {
                logger.warn("Failed to sniff mapping for type [" + type + "]", e);
            }
        } finally {
            typeSearcher.release();
        }
    }

    /**
     * Tells whether the exception is the "CLOSED" marker thrown by the collector when the shard is
     * closed. Null safe, since an {@link IOException} may carry no message.
     */
    private boolean isClosedSignal(IOException e) {
        return "CLOSED".equals(e.getMessage());
    }
}
// /**
// * The mapping sniffer reads docs from the index and introduces them into the mapping service. This is
// * because of dynamic fields and we want to reintroduce them.
// *
// * <p>Note, this is done on the shard level, we might have other dynamic fields in other shards, but
// * this will be taken care of in another component.
// */
// private class ShardMappingSniffer implements Runnable {
// @Override public void run() {
// engine.refresh(new Engine.Refresh(true));
//
// TermEnum termEnum = null;
// Engine.Searcher searcher = searcher();
// try {
// List<String> typeNames = newArrayList();
// termEnum = searcher.reader().terms(new Term(TypeFieldMapper.NAME, ""));
// while (true) {
// Term term = termEnum.term();
// if (term == null) {
// break;
// }
// if (!term.field().equals(TypeFieldMapper.NAME)) {
// break;
// }
// typeNames.add(term.text());
// termEnum.next();
// }
//
// logger.debug("Sniffing mapping for [{}]", typeNames);
//
// for (final String type : typeNames) {
// threadPool.execute(new Runnable() {
// @Override public void run() {
// Engine.Searcher searcher = searcher();
// try {
// Query query = new ConstantScoreQuery(filterCache.cache(new TermFilter(new Term(TypeFieldMapper.NAME, type))));
// long typeCount = Lucene.count(searcher().searcher(), query, -1);
//
// int marker = (int) (typeCount / mappingSnifferDocs);
// if (marker == 0) {
// marker = 1;
// }
// final int fMarker = marker;
// searcher.searcher().search(query, new Collector() {
//
// private final FieldSelector fieldSelector = new UidAndSourceFieldSelector();
// private int counter = 0;
// private IndexReader reader;
//
// @Override public void setScorer(Scorer scorer) throws IOException {
// }
//
// @Override public void collect(int doc) throws IOException {
// if (state == IndexShardState.CLOSED) {
// throw new IOException("CLOSED");
// }
// if (++counter == fMarker) {
// counter = 0;
//
// Document document = reader.document(doc, fieldSelector);
// Uid uid = Uid.createUid(document.get(UidFieldMapper.NAME));
// String source = document.get(SourceFieldMapper.NAME);
//
// mapperService.type(uid.type()).parse(uid.type(), uid.id(), source);
// }
// }
//
// @Override public void setNextReader(IndexReader reader, int docBase) throws IOException {
// this.reader = reader;
// }
//
// @Override public boolean acceptsDocsOutOfOrder() {
// return true;
// }
// });
// } catch (IOException e) {
// if (e.getMessage().equals("CLOSED")) {
// // ignore, we got closed
// } else {
// logger.warn("Failed to sniff mapping for type [" + type + "]", e);
// }
// } finally {
// searcher.release();
// }
// }
// });
// }
// } catch (IOException e) {
// if (e.getMessage().equals("CLOSED")) {
// // ignore, we got closed
// } else {
// logger.warn("Failed to sniff mapping", e);
// }
// } finally {
// if (termEnum != null) {
// try {
// termEnum.close();
// } catch (IOException e) {
// // ignore
// }
// }
// searcher.release();
// }
// }
// }
}

View File

@ -56,6 +56,7 @@ public class RestPutMappingAction extends BaseRestHandler {
putMappingRequest.type(request.param("type"));
putMappingRequest.mappingSource(request.contentAsString());
putMappingRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
putMappingRequest.ignoreDuplicates(request.paramAsBoolean("ignoreDuplicates", putMappingRequest.ignoreDuplicates()));
client.admin().indices().execPutMapping(putMappingRequest, new ActionListener<PutMappingResponse>() {
@Override public void onResponse(PutMappingResponse response) {
try {
@ -63,8 +64,6 @@ public class RestPutMappingAction extends BaseRestHandler {
builder.startObject()
.field("ok", true)
.field("acknowledged", response.acknowledged());
builder.raw(", \"parsedSource\" : ");
builder.raw(response.parsedSource());
builder.endObject();
channel.sendResponse(new JsonRestResponse(request, OK, builder));
} catch (IOException e) {