From 2e8b0464b65a2ca0d7db738637b151d586395b63 Mon Sep 17 00:00:00 2001 From: kimchy Date: Sat, 13 Mar 2010 02:42:43 +0200 Subject: [PATCH] allow to specify mappings in created index (currently, only internally), use it to create the mappings on index creation with recoverying from gateway --- .../indices/create/CreateIndexRequest.java | 45 +++++++++++++++++++ .../create/TransportCreateIndexAction.java | 2 +- .../cluster/metadata/MetaDataService.java | 9 +++- .../elasticsearch/gateway/GatewayService.java | 27 +---------- .../cluster/IndicesClusterStateService.java | 10 ++--- .../transport/TransportService.java | 24 +++++++--- .../local/SimpleLocalTransportTests.java | 4 +- .../netty/SimpleNettyTransportTests.java | 4 +- .../netty/benchmark/BenchmarkNettyClient.java | 2 +- .../netty/benchmark/BenchmarkNettyServer.java | 2 +- 10 files changed, 82 insertions(+), 47 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java index f354c19e49e..1f2101e2ab4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java @@ -19,14 +19,18 @@ package org.elasticsearch.action.admin.indices.create; +import com.google.common.collect.Maps; +import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.util.TimeValue; +import org.elasticsearch.util.json.JsonBuilder; import org.elasticsearch.util.settings.Settings; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Map; import java.util.concurrent.TimeUnit; import static org.elasticsearch.action.Actions.*; @@ -50,6 +54,8 @@ public class CreateIndexRequest extends MasterNodeOperationRequest { private Settings settings = EMPTY_SETTINGS; + private Map mappings = Maps.newHashMap(); + private TimeValue timeout = new TimeValue(10, TimeUnit.SECONDS); CreateIndexRequest() { @@ -100,6 +106,36 @@ public class CreateIndexRequest extends MasterNodeOperationRequest { return this; } + /** + * Adds mapping that will be added when the index gets created. + * + * @param type The mapping type + * @param source The mapping source + */ + public CreateIndexRequest mapping(String type, String source) { + mappings.put(type, source); + return this; + } + + /** + * Adds mapping that will be added when the index gets created. + * + * @param type The mapping type + * @param source The mapping source + */ + public CreateIndexRequest mapping(String type, JsonBuilder source) { + try { + mappings.put(type, source.string()); + } catch (IOException e) { + throw new ElasticSearchIllegalArgumentException("Failed to build json for mapping request", e); + } + return this; + } + + Map mappings() { + return this.mappings; + } + /** * Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults * to 10s. @@ -121,11 +157,20 @@ public class CreateIndexRequest extends MasterNodeOperationRequest { index = in.readUTF(); settings = readSettingsFromStream(in); timeout = readTimeValue(in); + int size = in.readInt(); + for (int i = 0; i < size; i++) { + mappings.put(in.readUTF(), in.readUTF()); + } } @Override public void writeTo(DataOutput out) throws IOException { out.writeUTF(index); writeSettingsToStream(settings, out); timeout.writeTo(out); + out.writeInt(mappings.size()); + for (Map.Entry entry : mappings.entrySet()) { + out.writeUTF(entry.getKey()); + out.writeUTF(entry.getValue()); + } } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java index 4df629a3b18..37581ad8728 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java @@ -57,7 +57,7 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi } @Override protected CreateIndexResponse masterOperation(CreateIndexRequest request) throws ElasticSearchException { - MetaDataService.CreateIndexResult createIndexResult = metaDataService.createIndex(request.index(), request.settings(), request.timeout()); + MetaDataService.CreateIndexResult createIndexResult = metaDataService.createIndex(request.index(), request.settings(), request.mappings(), request.timeout()); return new CreateIndexResponse(createIndexResult.acknowledged()); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java index acaef86a372..1a388db1be9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java @@ -91,7 +91,7 @@ public class MetaDataService extends AbstractComponent { // TODO should find nicer solution than sync here, since we block for timeout (same for other ops) - public synchronized CreateIndexResult createIndex(final String index, final Settings indexSettings, TimeValue timeout) throws IndexAlreadyExistsException { + public synchronized CreateIndexResult createIndex(final String index, final Settings indexSettings, final Map mappings, TimeValue timeout) throws IndexAlreadyExistsException { if (clusterService.state().routingTable().hasIndex(index)) { throw new IndexAlreadyExistsException(new Index(index)); } @@ -138,7 +138,12 @@ public class MetaDataService extends AbstractComponent { } Settings actualIndexSettings = indexSettingsBuilder.build(); - IndexMetaData indexMetaData = newIndexMetaDataBuilder(index).settings(actualIndexSettings).build(); + IndexMetaData.Builder indexMetaData = newIndexMetaDataBuilder(index).settings(actualIndexSettings); + if (mappings != null) { + for (Map.Entry entry : mappings.entrySet()) { + indexMetaData.putMapping(entry.getKey(), entry.getValue()); + } + } MetaData newMetaData = newMetaDataBuilder() .metaData(currentState.metaData()) .put(indexMetaData) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index 78198901c87..dffd5f5d482 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -32,8 +32,6 @@ import org.elasticsearch.util.component.LifecycleComponent; import org.elasticsearch.util.concurrent.DynamicExecutors; import org.elasticsearch.util.settings.Settings; -import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -163,34 +161,11 @@ public class GatewayService extends AbstractComponent implements ClusterStateLis for (final IndexMetaData indexMetaData : fMetaData) { threadPool.execute(new Runnable() { @Override public void run() { - final CountDownLatch latch = new CountDownLatch(1); - ClusterStateListener waitForIndex = new ClusterStateListener() { - @Override public void clusterChanged(ClusterChangedEvent event) { - if (event.state().metaData().hasIndex(indexMetaData.index())) { - latch.countDown(); - } - } - }; - clusterService.add(waitForIndex); try { - metaDataService.createIndex(indexMetaData.index(), indexMetaData.settings(), timeValueMillis(10)); + metaDataService.createIndex(indexMetaData.index(), indexMetaData.settings(), indexMetaData.mappings(), timeValueMillis(10)); } catch (Exception e) { - latch.countDown(); logger.error("Failed to create index [" + indexMetaData.index() + "]", e); } - try { - latch.await(5, TimeUnit.MINUTES); - } catch (InterruptedException e) { - logger.warn("Interrupted while waiting for index creation in gateway recovery"); - } - clusterService.remove(waitForIndex); - for (Map.Entry entry : indexMetaData.mappings().entrySet()) { - try { - metaDataService.putMapping(new String[]{indexMetaData.index()}, entry.getKey(), entry.getValue(), true, timeValueMillis(10)); - } catch (Exception e) { - logger.error("Failed to put mapping [" + entry.getKey() + "] for index [" + indexMetaData.index() + "]", e); - } - } } }); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 348ae6743c4..cd25a310415 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -147,11 +147,6 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu RoutingTable routingTable = event.state().routingTable(); - RoutingNode routingNodes = event.state().readOnlyRoutingNodes().nodesToShards().get(event.state().nodes().localNodeId()); - if (routingNodes != null) { - applyShards(routingNodes, routingTable, event.state().nodes()); - } - // go over and update mappings for (IndexMetaData indexMetaData : metaData) { if (!indicesService.hasIndex(indexMetaData.index())) { @@ -191,6 +186,11 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu } } + RoutingNode routingNodes = event.state().readOnlyRoutingNodes().nodesToShards().get(event.state().nodes().localNodeId()); + if (routingNodes != null) { + applyShards(routingNodes, routingTable, event.state().nodes()); + } + // go over and delete either all indices or specific shards for (final String index : indicesService.indices()) { if (metaData.index(index) == null) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java index c50912a7573..d01fb041430 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java @@ -22,6 +22,7 @@ package org.elasticsearch.transport; import com.google.inject.Inject; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.node.Node; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.util.component.AbstractComponent; import org.elasticsearch.util.component.Lifecycle; import org.elasticsearch.util.component.LifecycleComponent; @@ -45,6 +46,8 @@ public class TransportService extends AbstractComponent implements LifecycleComp private final Transport transport; + private final ThreadPool threadPool; + private final ConcurrentMap serverHandlers = newConcurrentMap(); private final NonBlockingHashMapLong clientHandlers = new NonBlockingHashMapLong(); @@ -53,13 +56,14 @@ public class TransportService extends AbstractComponent implements LifecycleComp private boolean throwConnectException = false; - public TransportService(Transport transport) { - this(EMPTY_SETTINGS, transport); + public TransportService(Transport transport, ThreadPool threadPool) { + this(EMPTY_SETTINGS, transport, threadPool); } - @Inject public TransportService(Settings settings, Transport transport) { + @Inject public TransportService(Settings settings, Transport transport, ThreadPool threadPool) { super(settings); this.transport = transport; + this.threadPool = threadPool; } @Override public Lifecycle.State lifecycleState() { @@ -143,13 +147,13 @@ public class TransportService extends AbstractComponent implements LifecycleComp return futureHandler; } - public void sendRequest(Node node, String action, Streamable message, - TransportResponseHandler handler) throws TransportException { + public void sendRequest(final Node node, final String action, final Streamable message, + final TransportResponseHandler handler) throws TransportException { final long requestId = newRequestId(); try { clientHandlers.put(requestId, handler); transport.sendRequest(node, requestId, action, message, handler); - } catch (Exception e) { + } catch (final Exception e) { // usually happen either because we failed to connect to the node // or because we failed serializing the message clientHandlers.remove(requestId); @@ -158,7 +162,13 @@ public class TransportService extends AbstractComponent implements LifecycleComp throw (ConnectTransportException) e; } } - handler.handleException(new SendRequestTransportException(node, action, e)); + // callback that an exception happened, but on a different thread since we don't + // want handlers to worry about stack overflows + threadPool.execute(new Runnable() { + @Override public void run() { + handler.handleException(new SendRequestTransportException(node, action, e)); + } + }); } } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java index 00806ca672c..75dd3809004 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java @@ -47,10 +47,10 @@ public class SimpleLocalTransportTests { @BeforeClass public void setUp() { threadPool = new DynamicThreadPool(); - serviceA = new TransportService(new LocalTransport(threadPool)).start(); + serviceA = new TransportService(new LocalTransport(threadPool), threadPool).start(); serviceANode = new Node("A", serviceA.boundAddress().publishAddress()); - serviceB = new TransportService(new LocalTransport(threadPool)).start(); + serviceB = new TransportService(new LocalTransport(threadPool), threadPool).start(); serviceBNode = new Node("B", serviceB.boundAddress().publishAddress()); } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java index b50f4f54118..f4da2736b45 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java @@ -47,10 +47,10 @@ public class SimpleNettyTransportTests { @BeforeClass public void setUp() { threadPool = new DynamicThreadPool(); - serviceA = new TransportService(new NettyTransport(threadPool)).start(); + serviceA = new TransportService(new NettyTransport(threadPool), threadPool).start(); serviceANode = new Node("A", serviceA.boundAddress().publishAddress()); - serviceB = new TransportService(new NettyTransport(threadPool)).start(); + serviceB = new TransportService(new NettyTransport(threadPool), threadPool).start(); serviceBNode = new Node("B", serviceB.boundAddress().publishAddress()); } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/benchmark/BenchmarkNettyClient.java b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/benchmark/BenchmarkNettyClient.java index 03dbeed69c7..ab8ad69f298 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/benchmark/BenchmarkNettyClient.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/benchmark/BenchmarkNettyClient.java @@ -58,7 +58,7 @@ public class BenchmarkNettyClient { .build(); final ThreadPool threadPool = new CachedThreadPool(); - final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool)).start(); + final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool), threadPool).start(); final Node node = new Node("server", new InetSocketTransportAddress("localhost", 9999)); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/benchmark/BenchmarkNettyServer.java b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/benchmark/BenchmarkNettyServer.java index a2159e5042b..e2e494d42f0 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/benchmark/BenchmarkNettyServer.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/benchmark/BenchmarkNettyServer.java @@ -41,7 +41,7 @@ public class BenchmarkNettyServer { .build(); final ThreadPool threadPool = new CachedThreadPool(); - final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool)).start(); + final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool), threadPool).start(); transportService.registerHandler("benchmark", new BaseTransportRequestHandler() { @Override public BenchmarkMessage newInstance() {