Create Index: Allow to provide index warmers when creating an index, closes #1917.
This commit is contained in:
parent
ca2dc1801c
commit
f0007fd4ae
|
@ -25,6 +25,7 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException;
|
|||
import org.elasticsearch.ElasticSearchParseException;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -65,6 +66,8 @@ public class CreateIndexRequest extends MasterNodeOperationRequest {
|
|||
|
||||
private Map<String, String> mappings = newHashMap();
|
||||
|
||||
private Map<String, IndexMetaData.Custom> customs = newHashMap();
|
||||
|
||||
private TimeValue timeout = new TimeValue(10, TimeUnit.SECONDS);
|
||||
|
||||
CreateIndexRequest() {
|
||||
|
@ -271,15 +274,28 @@ public class CreateIndexRequest extends MasterNodeOperationRequest {
|
|||
*/
|
||||
public CreateIndexRequest source(Map<String, Object> source) {
|
||||
boolean found = false;
|
||||
if (source.containsKey("settings")) {
|
||||
settings((Map<String, Object>) source.get("settings"));
|
||||
found = true;
|
||||
}
|
||||
if (source.containsKey("mappings")) {
|
||||
found = true;
|
||||
Map<String, Object> mappings = (Map<String, Object>) source.get("mappings");
|
||||
for (Map.Entry<String, Object> entry : mappings.entrySet()) {
|
||||
mapping(entry.getKey(), (Map<String, Object>) entry.getValue());
|
||||
for (Map.Entry<String, Object> entry : source.entrySet()) {
|
||||
String name = entry.getKey();
|
||||
if (name.equals("settings")) {
|
||||
found = true;
|
||||
settings((Map<String, Object>) entry.getValue());
|
||||
} else if (name.equals("mappings")) {
|
||||
found = true;
|
||||
Map<String, Object> mappings = (Map<String, Object>) entry.getValue();
|
||||
for (Map.Entry<String, Object> entry1 : mappings.entrySet()) {
|
||||
mapping(entry1.getKey(), (Map<String, Object>) entry1.getValue());
|
||||
}
|
||||
} else {
|
||||
// maybe custom?
|
||||
IndexMetaData.Custom.Factory factory = IndexMetaData.lookupFactory(name);
|
||||
if (factory != null) {
|
||||
found = true;
|
||||
try {
|
||||
customs.put(name, factory.fromMap((Map<String, Object>) entry.getValue()));
|
||||
} catch (IOException e) {
|
||||
throw new ElasticSearchParseException("failed to parse custom metadata for [" + name + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
|
@ -293,6 +309,15 @@ public class CreateIndexRequest extends MasterNodeOperationRequest {
|
|||
return this.mappings;
|
||||
}
|
||||
|
||||
public CreateIndexRequest custom(IndexMetaData.Custom custom) {
|
||||
customs.put(custom.type(), custom);
|
||||
return this;
|
||||
}
|
||||
|
||||
Map<String, IndexMetaData.Custom> customs() {
|
||||
return this.customs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults
|
||||
* to <tt>10s</tt>.
|
||||
|
@ -338,6 +363,12 @@ public class CreateIndexRequest extends MasterNodeOperationRequest {
|
|||
for (int i = 0; i < size; i++) {
|
||||
mappings.put(in.readUTF(), in.readUTF());
|
||||
}
|
||||
int customSize = in.readVInt();
|
||||
for (int i = 0; i < customSize; i++) {
|
||||
String type = in.readUTF();
|
||||
IndexMetaData.Custom customIndexMetaData = IndexMetaData.lookupFactorySafe(type).readFrom(in);
|
||||
customs.put(type, customIndexMetaData);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -352,5 +383,10 @@ public class CreateIndexRequest extends MasterNodeOperationRequest {
|
|||
out.writeUTF(entry.getKey());
|
||||
out.writeUTF(entry.getValue());
|
||||
}
|
||||
out.writeVInt(customs.size());
|
||||
for (Map.Entry<String, IndexMetaData.Custom> entry : customs.entrySet()) {
|
||||
out.writeUTF(entry.getKey());
|
||||
IndexMetaData.lookupFactorySafe(entry.getKey()).writeTo(entry.getValue(), out);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.create;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.support.BaseIndicesRequestBuilder;
|
||||
import org.elasticsearch.client.IndicesAdminClient;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
@ -159,6 +160,11 @@ public class CreateIndexRequestBuilder extends BaseIndicesRequestBuilder<CreateI
|
|||
return this;
|
||||
}
|
||||
|
||||
public CreateIndexRequestBuilder setCustom(IndexMetaData.Custom custom) {
|
||||
request.custom(custom);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the settings and mappings as a single source.
|
||||
*/
|
||||
|
|
|
@ -83,19 +83,23 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi
|
|||
final AtomicReference<CreateIndexResponse> responseRef = new AtomicReference<CreateIndexResponse>();
|
||||
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
createIndexService.createIndex(new MetaDataCreateIndexService.Request(cause, request.index()).settings(request.settings()).mappings(request.mappings()).timeout(request.timeout()), new MetaDataCreateIndexService.Listener() {
|
||||
@Override
|
||||
public void onResponse(MetaDataCreateIndexService.Response response) {
|
||||
responseRef.set(new CreateIndexResponse(response.acknowledged()));
|
||||
latch.countDown();
|
||||
}
|
||||
createIndexService.createIndex(new MetaDataCreateIndexService.Request(cause, request.index()).settings(request.settings())
|
||||
.mappings(request.mappings())
|
||||
.customs(request.customs())
|
||||
.timeout(request.timeout()),
|
||||
new MetaDataCreateIndexService.Listener() {
|
||||
@Override
|
||||
public void onResponse(MetaDataCreateIndexService.Response response) {
|
||||
responseRef.set(new CreateIndexResponse(response.acknowledged()));
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
failureRef.set(t);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
failureRef.set(t);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
latch.await();
|
||||
|
|
|
@ -68,6 +68,7 @@ import java.util.concurrent.ScheduledFuture;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static com.google.common.collect.Maps.newHashMap;
|
||||
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
|
||||
import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder;
|
||||
|
@ -154,7 +155,9 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
mappings.put(entry.getKey(), parseMapping(entry.getValue()));
|
||||
}
|
||||
|
||||
// TODO: request should be able to add custom metadata
|
||||
for (Map.Entry<String, Custom> entry : request.customs.entrySet()) {
|
||||
customs.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
// apply templates, merging the mappings into the request mapping if exists
|
||||
for (IndexTemplateMetaData template : templates) {
|
||||
|
@ -503,6 +506,9 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
|
||||
Map<String, String> mappings = Maps.newHashMap();
|
||||
|
||||
Map<String, IndexMetaData.Custom> customs = newHashMap();
|
||||
|
||||
|
||||
TimeValue timeout = TimeValue.timeValueSeconds(5);
|
||||
|
||||
Set<ClusterBlock> blocks = Sets.newHashSet();
|
||||
|
@ -536,6 +542,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Request customs(Map<String, Custom> customs) {
|
||||
this.customs.putAll(customs);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Request blocks(Set<ClusterBlock> blocks) {
|
||||
this.blocks.addAll(blocks);
|
||||
return this;
|
||||
|
|
|
@ -111,4 +111,35 @@ public class SimpleIndicesWarmerTests extends AbstractNodesTests {
|
|||
client.prepareIndex("test", "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet();
|
||||
client.prepareIndex("test", "type1", "2").setSource("field", "value2").setRefresh(true).execute().actionGet();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createIndexWarmer() {
|
||||
client.admin().indices().prepareDelete().execute().actionGet();
|
||||
|
||||
client.admin().indices().prepareCreate("test")
|
||||
.setSource("{\n" +
|
||||
" \"settings\" : {\n" +
|
||||
" \"index.number_of_shards\" : 1\n" +
|
||||
" },\n" +
|
||||
" \"warmers\" : {\n" +
|
||||
" \"warmer_1\" : {\n" +
|
||||
" \"types\" : [],\n" +
|
||||
" \"source\" : {\n" +
|
||||
" \"query\" : {\n" +
|
||||
" \"match_all\" : {}\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
"}")
|
||||
.execute().actionGet();
|
||||
|
||||
ClusterState clusterState = client.admin().cluster().prepareState().execute().actionGet().state();
|
||||
IndexWarmersMetaData warmersMetaData = clusterState.metaData().index("test").custom(IndexWarmersMetaData.TYPE);
|
||||
assertThat(warmersMetaData, Matchers.notNullValue());
|
||||
assertThat(warmersMetaData.entries().size(), equalTo(1));
|
||||
|
||||
client.prepareIndex("test", "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet();
|
||||
client.prepareIndex("test", "type1", "2").setSource("field", "value2").setRefresh(true).execute().actionGet();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue