store the mapping definition in compressed form internally to save memory
This commit is contained in:
parent
a2d10d490d
commit
c40935ae14
|
@ -35,6 +35,7 @@ import org.elasticsearch.cluster.routing.ShardsIterator;
|
|||
import org.elasticsearch.common.UUID;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.indices.IndexAlreadyExistsException;
|
||||
|
@ -42,6 +43,8 @@ import org.elasticsearch.indices.IndicesService;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Performs the index operation.
|
||||
*
|
||||
|
@ -149,14 +152,20 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
|||
private void updateMappingOnMaster(final IndexRequest request) {
|
||||
try {
|
||||
MapperService mapperService = indicesService.indexServiceSafe(request.index()).mapperService();
|
||||
final String updatedSource = mapperService.documentMapper(request.type()).buildSource();
|
||||
mappingUpdatedAction.execute(new MappingUpdatedAction.MappingUpdatedRequest(request.index(), request.type(), updatedSource), new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
|
||||
final DocumentMapper documentMapper = mapperService.documentMapper(request.type());
|
||||
documentMapper.refreshSource();
|
||||
|
||||
mappingUpdatedAction.execute(new MappingUpdatedAction.MappingUpdatedRequest(request.index(), request.type(), documentMapper.mappingSource()), new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
|
||||
@Override public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
|
||||
// all is well
|
||||
}
|
||||
|
||||
@Override public void onFailure(Throwable e) {
|
||||
logger.warn("Failed to update master on updated mapping for index [" + request.index() + "], type [" + request.type() + "] and source [" + updatedSource + "]", e);
|
||||
try {
|
||||
logger.warn("Failed to update master on updated mapping for index [" + request.index() + "], type [" + request.type() + "] and source [" + documentMapper.mappingSource().string() + "]", e);
|
||||
} catch (IOException e1) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.cluster.action.index;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.ElasticSearchParseException;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
|
||||
|
@ -27,6 +28,7 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio
|
|||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
|
||||
import org.elasticsearch.common.compress.CompressedString;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -65,7 +67,11 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
|
|||
}
|
||||
|
||||
@Override protected MappingUpdatedResponse masterOperation(MappingUpdatedRequest request, ClusterState state) throws ElasticSearchException {
|
||||
metaDataMappingService.updateMapping(request.index(), request.type(), request.mappingSource());
|
||||
try {
|
||||
metaDataMappingService.updateMapping(request.index(), request.type(), request.mappingSource());
|
||||
} catch (IOException e) {
|
||||
throw new ElasticSearchParseException("failed to parse mapping form compressed string", e);
|
||||
}
|
||||
return new MappingUpdatedResponse();
|
||||
}
|
||||
|
||||
|
@ -83,12 +89,12 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
|
|||
|
||||
private String type;
|
||||
|
||||
private String mappingSource;
|
||||
private CompressedString mappingSource;
|
||||
|
||||
MappingUpdatedRequest() {
|
||||
}
|
||||
|
||||
public MappingUpdatedRequest(String index, String type, String mappingSource) {
|
||||
public MappingUpdatedRequest(String index, String type, CompressedString mappingSource) {
|
||||
this.index = index;
|
||||
this.type = type;
|
||||
this.mappingSource = mappingSource;
|
||||
|
@ -102,7 +108,7 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
|
|||
return type;
|
||||
}
|
||||
|
||||
public String mappingSource() {
|
||||
public CompressedString mappingSource() {
|
||||
return mappingSource;
|
||||
}
|
||||
|
||||
|
@ -114,14 +120,14 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
|
|||
super.readFrom(in);
|
||||
index = in.readUTF();
|
||||
type = in.readUTF();
|
||||
mappingSource = in.readUTF();
|
||||
mappingSource = CompressedString.readCompressedString(in);
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeUTF(index);
|
||||
out.writeUTF(type);
|
||||
out.writeUTF(mappingSource);
|
||||
mappingSource.writeTo(out);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.common.Preconditions;
|
|||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.collect.ImmutableSet;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.compress.CompressedString;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
|
@ -55,11 +56,11 @@ public class IndexMetaData {
|
|||
|
||||
private final Settings settings;
|
||||
|
||||
private final ImmutableMap<String, String> mappings;
|
||||
private final ImmutableMap<String, CompressedString> mappings;
|
||||
|
||||
private transient final int totalNumberOfShards;
|
||||
|
||||
private IndexMetaData(String index, Settings settings, ImmutableMap<String, String> mappings) {
|
||||
private IndexMetaData(String index, Settings settings, ImmutableMap<String, CompressedString> mappings) {
|
||||
Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1) != -1, "must specify numberOfShards for index [" + index + "]");
|
||||
Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, -1) != -1, "must specify numberOfReplicas for index [" + index + "]");
|
||||
this.index = index;
|
||||
|
@ -118,15 +119,15 @@ public class IndexMetaData {
|
|||
return aliases();
|
||||
}
|
||||
|
||||
public ImmutableMap<String, String> mappings() {
|
||||
public ImmutableMap<String, CompressedString> mappings() {
|
||||
return mappings;
|
||||
}
|
||||
|
||||
public ImmutableMap<String, String> getMappings() {
|
||||
public ImmutableMap<String, CompressedString> getMappings() {
|
||||
return mappings();
|
||||
}
|
||||
|
||||
public String mapping(String mappingType) {
|
||||
public CompressedString mapping(String mappingType) {
|
||||
return mappings.get(mappingType);
|
||||
}
|
||||
|
||||
|
@ -144,7 +145,7 @@ public class IndexMetaData {
|
|||
|
||||
private Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS;
|
||||
|
||||
private MapBuilder<String, String> mappings = MapBuilder.newMapBuilder();
|
||||
private MapBuilder<String, CompressedString> mappings = MapBuilder.newMapBuilder();
|
||||
|
||||
public Builder(String index) {
|
||||
this.index = index;
|
||||
|
@ -193,11 +194,16 @@ public class IndexMetaData {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder putMapping(String mappingType, String mappingSource) {
|
||||
public Builder putMapping(String mappingType, CompressedString mappingSource) {
|
||||
mappings.put(mappingType, mappingSource);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder putMapping(String mappingType, String mappingSource) throws IOException {
|
||||
mappings.put(mappingType, new CompressedString(mappingSource));
|
||||
return this;
|
||||
}
|
||||
|
||||
public IndexMetaData build() {
|
||||
return new IndexMetaData(index, settings, mappings.immutableMap());
|
||||
}
|
||||
|
@ -212,8 +218,9 @@ public class IndexMetaData {
|
|||
builder.endObject();
|
||||
|
||||
builder.startArray("mappings");
|
||||
for (Map.Entry<String, String> entry : indexMetaData.mappings().entrySet()) {
|
||||
XContentParser parser = XContentFactory.xContent(entry.getValue()).createParser(entry.getValue());
|
||||
for (Map.Entry<String, CompressedString> entry : indexMetaData.mappings().entrySet()) {
|
||||
byte[] data = entry.getValue().uncompressed();
|
||||
XContentParser parser = XContentFactory.xContent(data).createParser(data);
|
||||
Map<String, Object> mapping = parser.map();
|
||||
parser.close();
|
||||
builder.map(mapping);
|
||||
|
@ -250,7 +257,7 @@ public class IndexMetaData {
|
|||
if (mappingSource == null) {
|
||||
// crap, no mapping source, warn?
|
||||
} else {
|
||||
builder.putMapping(mapping.keySet().iterator().next(), mappingSource);
|
||||
builder.putMapping(mapping.keySet().iterator().next(), new CompressedString(mappingSource));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -265,7 +272,7 @@ public class IndexMetaData {
|
|||
builder.settings(readSettingsFromStream(in, globalSettings));
|
||||
int mappingsSize = in.readVInt();
|
||||
for (int i = 0; i < mappingsSize; i++) {
|
||||
builder.putMapping(in.readUTF(), in.readUTF());
|
||||
builder.putMapping(in.readUTF(), CompressedString.readCompressedString(in));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
@ -274,9 +281,9 @@ public class IndexMetaData {
|
|||
out.writeUTF(indexMetaData.index());
|
||||
writeSettingsToStream(indexMetaData.settings(), out);
|
||||
out.writeVInt(indexMetaData.mappings().size());
|
||||
for (Map.Entry<String, String> entry : indexMetaData.mappings().entrySet()) {
|
||||
for (Map.Entry<String, CompressedString> entry : indexMetaData.mappings().entrySet()) {
|
||||
out.writeUTF(entry.getKey());
|
||||
out.writeUTF(entry.getValue());
|
||||
entry.getValue().writeTo(out);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy;
|
|||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.Maps;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.compress.CompressedString;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
|
@ -131,7 +132,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
}
|
||||
|
||||
// add to the mappings files that exists within the config/mappings location
|
||||
Map<String, String> mappings = Maps.newHashMap();
|
||||
Map<String, CompressedString> mappings = Maps.newHashMap();
|
||||
File mappingsDir = new File(environment.configFile(), "mappings");
|
||||
if (mappingsDir.exists() && mappingsDir.isDirectory()) {
|
||||
File defaultMappingsDir = new File(mappingsDir, "_default");
|
||||
|
@ -145,7 +146,9 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
}
|
||||
|
||||
// put this last so index level mappings can override default mappings
|
||||
mappings.putAll(request.mappings);
|
||||
for (Map.Entry<String, String> entry : request.mappings.entrySet()) {
|
||||
mappings.put(entry.getKey(), new CompressedString(entry.getValue()));
|
||||
}
|
||||
|
||||
ImmutableSettings.Builder indexSettingsBuilder = settingsBuilder().put(request.settings);
|
||||
if (request.settings.get(SETTING_NUMBER_OF_SHARDS) == null) {
|
||||
|
@ -161,9 +164,9 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
// now add the mappings
|
||||
IndexService indexService = indicesService.indexServiceSafe(request.index);
|
||||
MapperService mapperService = indexService.mapperService();
|
||||
for (Map.Entry<String, String> entry : mappings.entrySet()) {
|
||||
for (Map.Entry<String, CompressedString> entry : mappings.entrySet()) {
|
||||
try {
|
||||
mapperService.add(entry.getKey(), entry.getValue());
|
||||
mapperService.add(entry.getKey(), entry.getValue().string());
|
||||
} catch (Exception e) {
|
||||
indicesService.deleteIndex(request.index);
|
||||
throw new MapperParsingException("mapping [" + entry.getKey() + "]", e);
|
||||
|
@ -176,7 +179,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
}
|
||||
|
||||
IndexMetaData.Builder indexMetaData = newIndexMetaDataBuilder(request.index).settings(actualIndexSettings);
|
||||
for (Map.Entry<String, String> entry : mappings.entrySet()) {
|
||||
for (Map.Entry<String, CompressedString> entry : mappings.entrySet()) {
|
||||
indexMetaData.putMapping(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
|
@ -223,7 +226,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
});
|
||||
}
|
||||
|
||||
private void addMappings(Map<String, String> mappings, File mappingsDir) {
|
||||
private void addMappings(Map<String, CompressedString> mappings, File mappingsDir) {
|
||||
File[] mappingsFiles = mappingsDir.listFiles();
|
||||
for (File mappingFile : mappingsFiles) {
|
||||
String fileNameNoSuffix = mappingFile.getName().substring(0, mappingFile.getName().lastIndexOf('.'));
|
||||
|
@ -232,7 +235,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
continue;
|
||||
}
|
||||
try {
|
||||
mappings.put(fileNameNoSuffix, Streams.copyToString(new FileReader(mappingFile)));
|
||||
mappings.put(fileNameNoSuffix, new CompressedString(Streams.copyToString(new FileReader(mappingFile))));
|
||||
} catch (IOException e) {
|
||||
logger.warn("failed to read mapping [" + fileNameNoSuffix + "] from location [" + mappingFile + "], ignoring...", e);
|
||||
}
|
||||
|
@ -324,6 +327,13 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Request mappingsCompressed(Map<String, CompressedString> mappings) throws IOException {
|
||||
for (Map.Entry<String, CompressedString> entry : mappings.entrySet()) {
|
||||
this.mappings.put(entry.getKey(), entry.getValue().string());
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public Request timeout(TimeValue timeout) {
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
|||
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.compress.CompressedString;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.timer.Timeout;
|
||||
|
@ -40,6 +41,7 @@ import org.elasticsearch.indices.IndexMissingException;
|
|||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.timer.TimerService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -74,6 +76,10 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
this.nodeMappingCreatedAction = nodeMappingCreatedAction;
|
||||
}
|
||||
|
||||
public void updateMapping(final String index, final String type, final CompressedString mappingSource) throws IOException {
|
||||
updateMapping(index, type, mappingSource.string());
|
||||
}
|
||||
|
||||
public void updateMapping(final String index, final String type, final String mappingSource) {
|
||||
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", new ClusterStateUpdateTask() {
|
||||
@Override public ClusterState execute(ClusterState currentState) {
|
||||
|
@ -89,16 +95,19 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
existingMapper.merge(updatedMapper, mergeFlags().simulate(false));
|
||||
}
|
||||
// build the updated mapping source
|
||||
final String updatedMappingSource = existingMapper.buildSource();
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMappingSource);
|
||||
try {
|
||||
logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, existingMapper.mappingSource().string());
|
||||
} catch (IOException e) {
|
||||
// ignore
|
||||
}
|
||||
} else if (logger.isInfoEnabled()) {
|
||||
logger.info("[{}] update_mapping [{}] (dynamic)", index, type);
|
||||
}
|
||||
|
||||
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
|
||||
IndexMetaData indexMetaData = currentState.metaData().index(index);
|
||||
builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(type, updatedMappingSource));
|
||||
builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(type, existingMapper.mappingSource()));
|
||||
return newClusterStateBuilder().state(currentState).metaData(builder).build();
|
||||
}
|
||||
});
|
||||
|
@ -151,7 +160,7 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
throw new InvalidTypeNameException("Document mapping type name can't start with '_'");
|
||||
}
|
||||
|
||||
final Map<String, Tuple<String, String>> mappings = newHashMap();
|
||||
final Map<String, Tuple<String, CompressedString>> mappings = newHashMap();
|
||||
int expectedReplies = 0;
|
||||
for (Map.Entry<String, DocumentMapper> entry : newMappers.entrySet()) {
|
||||
String index = entry.getKey();
|
||||
|
@ -160,15 +169,17 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
if (existingMappers.containsKey(entry.getKey())) {
|
||||
// we have an existing mapping, do the merge here (on the master), it will automatically update the mapping source
|
||||
DocumentMapper existingMapper = existingMappers.get(entry.getKey());
|
||||
String existingSource = existingMapper.mappingSource();
|
||||
CompressedString existingSource = existingMapper.mappingSource();
|
||||
|
||||
existingMapper.merge(newMapper, mergeFlags().simulate(false));
|
||||
String updatedSource = existingMapper.buildSource();
|
||||
|
||||
CompressedString updatedSource = existingMapper.mappingSource();
|
||||
if (existingSource.equals(updatedSource)) {
|
||||
// same source, no changes, ignore it
|
||||
} else {
|
||||
expectedReplies += (currentState.nodes().size() - 1); // for this index, on update, don't include the master, since we update it already
|
||||
// use the merged mapping source
|
||||
mappings.put(index, new Tuple<String, String>(existingMapper.type(), updatedSource));
|
||||
mappings.put(index, new Tuple<String, CompressedString>(existingMapper.type(), updatedSource));
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("[{}] update_mapping [{}] with source [{}]", index, existingMapper.type(), updatedSource);
|
||||
} else if (logger.isInfoEnabled()) {
|
||||
|
@ -177,8 +188,8 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
}
|
||||
} else {
|
||||
expectedReplies += currentState.nodes().size();
|
||||
String newSource = newMapper.buildSource();
|
||||
mappings.put(index, new Tuple<String, String>(newMapper.type(), newSource));
|
||||
CompressedString newSource = newMapper.mappingSource();
|
||||
mappings.put(index, new Tuple<String, CompressedString>(newMapper.type(), newSource));
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("[{}] create_mapping [{}] with source [{}]", index, newMapper.type(), newSource);
|
||||
} else if (logger.isInfoEnabled()) {
|
||||
|
@ -199,7 +210,7 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
if (indexMetaData == null) {
|
||||
throw new IndexMissingException(new Index(indexName));
|
||||
}
|
||||
Tuple<String, String> mapping = mappings.get(indexName);
|
||||
Tuple<String, CompressedString> mapping = mappings.get(indexName);
|
||||
if (mapping != null) {
|
||||
builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(mapping.v1(), mapping.v2()));
|
||||
}
|
||||
|
|
|
@ -37,19 +37,32 @@ public class CompressedString implements Streamable {
|
|||
|
||||
private byte[] bytes;
|
||||
|
||||
CompressedString() {
|
||||
}
|
||||
|
||||
public CompressedString(String str) throws IOException {
|
||||
UnicodeUtil.UTF8Result result = Unicode.unsafeFromStringAsUtf8(str);
|
||||
this.bytes = LZFEncoder.encodeWithCache(result.result, result.length);
|
||||
}
|
||||
|
||||
public byte[] bytes() {
|
||||
public byte[] compressed() {
|
||||
return this.bytes;
|
||||
}
|
||||
|
||||
public byte[] uncompressed() throws IOException {
|
||||
return LZFDecoder.decode(bytes);
|
||||
}
|
||||
|
||||
public String string() throws IOException {
|
||||
return Unicode.fromBytes(LZFDecoder.decode(bytes));
|
||||
}
|
||||
|
||||
public static CompressedString readCompressedString(StreamInput in) throws IOException {
|
||||
CompressedString compressedString = new CompressedString();
|
||||
compressedString.readFrom(in);
|
||||
return compressedString;
|
||||
}
|
||||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
bytes = new byte[in.readVInt()];
|
||||
in.readBytes(bytes, 0, bytes.length);
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.elasticsearch.discovery.DiscoveryService;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -252,15 +253,19 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
|||
// go over the meta data and create indices, we don't really need to copy over
|
||||
// the meta data per index, since we create the index and it will be added automatically
|
||||
for (final IndexMetaData indexMetaData : fMetaData) {
|
||||
createIndexService.createIndex(new MetaDataCreateIndexService.Request("gateway", indexMetaData.index()).settings(indexMetaData.settings()).mappings(indexMetaData.mappings()).timeout(timeValueSeconds(30)), new MetaDataCreateIndexService.Listener() {
|
||||
@Override public void onResponse(MetaDataCreateIndexService.Response response) {
|
||||
latch.countDown();
|
||||
}
|
||||
try {
|
||||
createIndexService.createIndex(new MetaDataCreateIndexService.Request("gateway", indexMetaData.index()).settings(indexMetaData.settings()).mappingsCompressed(indexMetaData.mappings()).timeout(timeValueSeconds(30)), new MetaDataCreateIndexService.Listener() {
|
||||
@Override public void onResponse(MetaDataCreateIndexService.Response response) {
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override public void onFailure(Throwable t) {
|
||||
logger.error("failed to create index [{}]", indexMetaData.index(), t);
|
||||
}
|
||||
});
|
||||
@Override public void onFailure(Throwable t) {
|
||||
logger.error("failed to create index [{}]", indexMetaData.index(), t);
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
logger.error("failed to create index [{}]", indexMetaData.index(), e);
|
||||
}
|
||||
}
|
||||
clusterService.submitStateUpdateTask("gateway (remove block)", new ClusterStateUpdateTask() {
|
||||
@Override public ClusterState execute(ClusterState currentState) {
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.lucene.analysis.Analyzer;
|
|||
import org.apache.lucene.document.Fieldable;
|
||||
import org.apache.lucene.search.Filter;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.compress.CompressedString;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadSafe;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -39,7 +40,7 @@ public interface DocumentMapper {
|
|||
* When constructed by parsing a mapping definition, will return it. Otherwise,
|
||||
* returns <tt>null</tt>.
|
||||
*/
|
||||
String mappingSource();
|
||||
CompressedString mappingSource();
|
||||
|
||||
/**
|
||||
* Attributes of this type mappings.
|
||||
|
@ -49,7 +50,7 @@ public interface DocumentMapper {
|
|||
/**
|
||||
* Generates the source of the mapper based on the current mappings.
|
||||
*/
|
||||
String buildSource() throws FailedToGenerateSourceMapperException;
|
||||
void refreshSource() throws FailedToGenerateSourceMapperException;
|
||||
|
||||
UidFieldMapper uidMapper();
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.lucene.document.Document;
|
|||
import org.apache.lucene.search.Filter;
|
||||
import org.elasticsearch.common.Preconditions;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.compress.CompressedString;
|
||||
import org.elasticsearch.common.lucene.search.TermFilter;
|
||||
import org.elasticsearch.common.thread.ThreadLocals;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
|
@ -69,8 +70,6 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
|
|||
|
||||
private final XContentObjectMapper rootObjectMapper;
|
||||
|
||||
private String mappingSource;
|
||||
|
||||
private ImmutableMap<String, Object> attributes = ImmutableMap.of();
|
||||
|
||||
private XContentMapper.BuilderContext builderContext = new XContentMapper.BuilderContext(new ContentPath(1));
|
||||
|
@ -120,11 +119,6 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder mappingSource(String mappingSource) {
|
||||
this.mappingSource = mappingSource;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder indexAnalyzer(NamedAnalyzer indexAnalyzer) {
|
||||
this.indexAnalyzer = indexAnalyzer;
|
||||
return this;
|
||||
|
@ -146,7 +140,7 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
|
|||
public XContentDocumentMapper build() {
|
||||
Preconditions.checkNotNull(rootObjectMapper, "Mapper builder must have the root object mapper set");
|
||||
return new XContentDocumentMapper(index, rootObjectMapper, attributes, uidFieldMapper, idFieldMapper, typeFieldMapper, indexFieldMapper,
|
||||
sourceFieldMapper, allFieldMapper, indexAnalyzer, searchAnalyzer, boostFieldMapper, mappingSource);
|
||||
sourceFieldMapper, allFieldMapper, indexAnalyzer, searchAnalyzer, boostFieldMapper);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -163,7 +157,7 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
|
|||
|
||||
private volatile ImmutableMap<String, Object> attributes;
|
||||
|
||||
private volatile String mappingSource;
|
||||
private volatile CompressedString mappingSource;
|
||||
|
||||
private final XContentUidFieldMapper uidFieldMapper;
|
||||
|
||||
|
@ -202,12 +196,10 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
|
|||
XContentSourceFieldMapper sourceFieldMapper,
|
||||
XContentAllFieldMapper allFieldMapper,
|
||||
Analyzer indexAnalyzer, Analyzer searchAnalyzer,
|
||||
@Nullable XContentBoostFieldMapper boostFieldMapper,
|
||||
@Nullable String mappingSource) {
|
||||
@Nullable XContentBoostFieldMapper boostFieldMapper) {
|
||||
this.index = index;
|
||||
this.type = rootObjectMapper.name();
|
||||
this.attributes = attributes;
|
||||
this.mappingSource = mappingSource;
|
||||
this.rootObjectMapper = rootObjectMapper;
|
||||
this.uidFieldMapper = uidFieldMapper;
|
||||
this.idFieldMapper = idFieldMapper;
|
||||
|
@ -252,6 +244,8 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
|
|||
});
|
||||
|
||||
this.fieldMappers = new DocumentFieldMappers(this, tempFieldMappers);
|
||||
|
||||
refreshSource();
|
||||
}
|
||||
|
||||
@Override public String type() {
|
||||
|
@ -262,14 +256,10 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
|
|||
return this.attributes;
|
||||
}
|
||||
|
||||
@Override public String mappingSource() {
|
||||
@Override public CompressedString mappingSource() {
|
||||
return this.mappingSource;
|
||||
}
|
||||
|
||||
void mappingSource(String mappingSource) {
|
||||
this.mappingSource = mappingSource;
|
||||
}
|
||||
|
||||
@Override public UidFieldMapper uidMapper() {
|
||||
return this.uidFieldMapper;
|
||||
}
|
||||
|
@ -425,18 +415,18 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
|
|||
// let the merge with attributes to override the attributes
|
||||
attributes = mergeWith.attributes();
|
||||
// update the source of the merged one
|
||||
mappingSource = buildSource();
|
||||
refreshSource();
|
||||
}
|
||||
return new MergeResult(mergeContext.buildConflicts());
|
||||
}
|
||||
|
||||
@Override public String buildSource() throws FailedToGenerateSourceMapperException {
|
||||
@Override public void refreshSource() throws FailedToGenerateSourceMapperException {
|
||||
try {
|
||||
XContentBuilder builder = XContentFactory.contentTextBuilder(XContentType.JSON);
|
||||
builder.startObject();
|
||||
toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
builder.endObject();
|
||||
return builder.string();
|
||||
this.mappingSource = new CompressedString(builder.string());
|
||||
} catch (Exception e) {
|
||||
throw new FailedToGenerateSourceMapperException(e.getMessage(), e);
|
||||
}
|
||||
|
|
|
@ -167,7 +167,7 @@ public class XContentDocumentMapperParser extends AbstractIndexComponent impleme
|
|||
|
||||
XContentDocumentMapper documentMapper = docBuilder.build();
|
||||
// update the source with the generated one
|
||||
documentMapper.mappingSource(documentMapper.buildSource());
|
||||
documentMapper.refreshSource();
|
||||
return documentMapper;
|
||||
}
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
|
|||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.compress.CompressedString;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
@ -193,27 +194,27 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
String index = indexMetaData.index();
|
||||
IndexService indexService = indicesService.indexServiceSafe(index);
|
||||
MapperService mapperService = indexService.mapperService();
|
||||
ImmutableMap<String, String> mappings = indexMetaData.mappings();
|
||||
ImmutableMap<String, CompressedString> mappings = indexMetaData.mappings();
|
||||
// we don't support removing mappings for now ...
|
||||
for (Map.Entry<String, String> entry : mappings.entrySet()) {
|
||||
for (Map.Entry<String, CompressedString> entry : mappings.entrySet()) {
|
||||
String mappingType = entry.getKey();
|
||||
String mappingSource = entry.getValue();
|
||||
CompressedString mappingSource = entry.getValue();
|
||||
|
||||
try {
|
||||
if (!mapperService.hasMapping(mappingType)) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("[{}] adding mapping [{}], source [{}]", index, mappingType, mappingSource);
|
||||
logger.debug("[{}] adding mapping [{}], source [{}]", index, mappingType, mappingSource.string());
|
||||
}
|
||||
mapperService.add(mappingType, mappingSource);
|
||||
mapperService.add(mappingType, mappingSource.string());
|
||||
nodeMappingCreatedAction.nodeMappingCreated(new NodeMappingCreatedAction.NodeMappingCreatedResponse(index, mappingType, event.state().nodes().localNodeId()));
|
||||
} else {
|
||||
DocumentMapper existingMapper = mapperService.documentMapper(mappingType);
|
||||
if (!mappingSource.equals(existingMapper.mappingSource())) {
|
||||
// mapping changed, update it
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("[{}] updating mapping [{}], source [{}]", index, mappingType, mappingSource);
|
||||
logger.debug("[{}] updating mapping [{}], source [{}]", index, mappingType, mappingSource.string());
|
||||
}
|
||||
mapperService.add(mappingType, mappingSource);
|
||||
mapperService.add(mappingType, mappingSource.string());
|
||||
nodeMappingCreatedAction.nodeMappingCreated(new NodeMappingCreatedAction.NodeMappingCreatedResponse(index, mappingType, event.state().nodes().localNodeId()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
|||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.collect.ImmutableSet;
|
||||
import org.elasticsearch.common.compress.CompressedString;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsFilter;
|
||||
|
@ -136,8 +137,9 @@ public class RestClusterStateAction extends BaseRestHandler {
|
|||
builder.endObject();
|
||||
|
||||
builder.startObject("mappings");
|
||||
for (Map.Entry<String, String> entry : indexMetaData.mappings().entrySet()) {
|
||||
XContentParser parser = XContentFactory.xContent(entry.getValue()).createParser(entry.getValue());
|
||||
for (Map.Entry<String, CompressedString> entry : indexMetaData.mappings().entrySet()) {
|
||||
byte[] mappingSource = entry.getValue().uncompressed();
|
||||
XContentParser parser = XContentFactory.xContent(mappingSource).createParser(mappingSource);
|
||||
Map<String, Object> mapping = parser.map();
|
||||
if (mapping.size() == 1 && mapping.containsKey(entry.getKey())) {
|
||||
// the type name is the root value, reduce it
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.client.Requests;
|
|||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.collect.ImmutableSet;
|
||||
import org.elasticsearch.common.compress.CompressedString;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
|
@ -74,12 +75,13 @@ public class RestGetMappingAction extends BaseRestHandler {
|
|||
for (IndexMetaData indexMetaData : metaData) {
|
||||
builder.startObject(indexMetaData.index());
|
||||
|
||||
for (Map.Entry<String, String> entry : indexMetaData.mappings().entrySet()) {
|
||||
for (Map.Entry<String, CompressedString> entry : indexMetaData.mappings().entrySet()) {
|
||||
if (!types.isEmpty() && !types.contains(entry.getKey())) {
|
||||
// filter this type out...
|
||||
continue;
|
||||
}
|
||||
XContentParser parser = XContentFactory.xContent(entry.getValue()).createParser(entry.getValue());
|
||||
byte[] mappingSource = entry.getValue().uncompressed();
|
||||
XContentParser parser = XContentFactory.xContent(mappingSource).createParser(mappingSource);
|
||||
Map<String, Object> mapping = parser.map();
|
||||
if (mapping.size() == 1 && mapping.containsKey(entry.getKey())) {
|
||||
// the type name is the root value, reduce it
|
||||
|
|
|
@ -83,7 +83,7 @@ public class ToAndFromJsonMetaDataTests {
|
|||
assertThat(indexMetaData.numberOfReplicas(), equalTo(2));
|
||||
assertThat(indexMetaData.settings().getAsMap().size(), equalTo(2));
|
||||
assertThat(indexMetaData.mappings().size(), equalTo(1));
|
||||
assertThat(indexMetaData.mappings().get("mapping1"), equalTo(MAPPING_SOURCE1));
|
||||
assertThat(indexMetaData.mappings().get("mapping1").string(), equalTo(MAPPING_SOURCE1));
|
||||
|
||||
indexMetaData = parsedMetaData.index("test4");
|
||||
assertThat(indexMetaData.numberOfShards(), equalTo(1));
|
||||
|
@ -92,8 +92,8 @@ public class ToAndFromJsonMetaDataTests {
|
|||
assertThat(indexMetaData.settings().get("setting1"), equalTo("value1"));
|
||||
assertThat(indexMetaData.settings().get("setting2"), equalTo("value2"));
|
||||
assertThat(indexMetaData.mappings().size(), equalTo(2));
|
||||
assertThat(indexMetaData.mappings().get("mapping1"), equalTo(MAPPING_SOURCE1));
|
||||
assertThat(indexMetaData.mappings().get("mapping2"), equalTo(MAPPING_SOURCE2));
|
||||
assertThat(indexMetaData.mappings().get("mapping1").string(), equalTo(MAPPING_SOURCE1));
|
||||
assertThat(indexMetaData.mappings().get("mapping2").string(), equalTo(MAPPING_SOURCE2));
|
||||
}
|
||||
|
||||
private static final String MAPPING_SOURCE1 = "{\"mapping1\":{\"text1\":{\"type\":\"string\"}}}";
|
||||
|
|
|
@ -52,7 +52,7 @@ public class SimpleAllMapperTests {
|
|||
@Test public void testSimpleAllMappersWithReparse() throws Exception {
|
||||
String mapping = copyToStringFromClasspath("/org/elasticsearch/index/mapper/xcontent/all/mapping.json");
|
||||
XContentDocumentMapper docMapper = XContentMapperTests.newParser().parse(mapping);
|
||||
String builtMapping = docMapper.buildSource();
|
||||
String builtMapping = docMapper.mappingSource().string();
|
||||
// System.out.println(builtMapping);
|
||||
// reparse it
|
||||
XContentDocumentMapper builtDocMapper = XContentMapperTests.newParser().parse(builtMapping);
|
||||
|
@ -84,7 +84,7 @@ public class SimpleAllMapperTests {
|
|||
@Test public void testSimpleAllMappersWithReparseWithStore() throws Exception {
|
||||
String mapping = copyToStringFromClasspath("/org/elasticsearch/index/mapper/xcontent/all/store-mapping.json");
|
||||
XContentDocumentMapper docMapper = XContentMapperTests.newParser().parse(mapping);
|
||||
String builtMapping = docMapper.buildSource();
|
||||
String builtMapping = docMapper.mappingSource().string();
|
||||
System.out.println(builtMapping);
|
||||
// reparse it
|
||||
XContentDocumentMapper builtDocMapper = XContentMapperTests.newParser().parse(builtMapping);
|
||||
|
|
|
@ -75,8 +75,9 @@ public class XContentMultiFieldTests {
|
|||
.add(stringField("indexed").index(Field.Index.ANALYZED))
|
||||
.add(stringField("not_indexed").index(Field.Index.NO).store(Field.Store.YES))
|
||||
)).build();
|
||||
builderDocMapper.refreshSource();
|
||||
|
||||
String builtMapping = builderDocMapper.buildSource();
|
||||
String builtMapping = builderDocMapper.mappingSource().string();
|
||||
// System.out.println(builtMapping);
|
||||
// reparse it
|
||||
XContentDocumentMapper docMapper = XContentMapperTests.newParser().parse(builtMapping);
|
||||
|
|
|
@ -59,7 +59,7 @@ public class SimpleXContentMapperTests {
|
|||
@Test public void testParseToJsonAndParse() throws Exception {
|
||||
String mapping = copyToStringFromClasspath("/org/elasticsearch/index/mapper/xcontent/simple/test-mapping.json");
|
||||
XContentDocumentMapper docMapper = XContentMapperTests.newParser().parse(mapping);
|
||||
String builtMapping = docMapper.buildSource();
|
||||
String builtMapping = docMapper.mappingSource().string();
|
||||
// System.out.println(builtMapping);
|
||||
// reparse it
|
||||
XContentDocumentMapper builtDocMapper = XContentMapperTests.newParser().parse(builtMapping);
|
||||
|
|
|
@ -54,7 +54,7 @@ public class SimpleAttachmentMapperTests {
|
|||
assertThat(doc.get(docMapper.mappers().smartName("file").mapper().names().indexName()), containsString("This document tests the ability of Apache Tika to extract content"));
|
||||
|
||||
// re-parse it
|
||||
String builtMapping = docMapper.buildSource();
|
||||
String builtMapping = docMapper.mappingSource().string();
|
||||
docMapper = mapperParser.parse(builtMapping);
|
||||
|
||||
json = jsonBuilder().startObject().field("_id", 1).field("file", copyToBytesFromClasspath("/org/elasticsearch/index/mapper/xcontent/testXHTML.html")).endObject().copiedBytes();
|
||||
|
|
Loading…
Reference in New Issue