allow to specify mappings in created index (currently, only internally), use it to create the mappings on index creation with recoverying from gateway

This commit is contained in:
kimchy 2010-03-13 02:42:43 +02:00
parent 024cdb4312
commit 2e8b0464b6
10 changed files with 82 additions and 47 deletions

View File

@ -19,14 +19,18 @@
package org.elasticsearch.action.admin.indices.create;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.json.JsonBuilder;
import org.elasticsearch.util.settings.Settings;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.action.Actions.*;
@ -50,6 +54,8 @@ public class CreateIndexRequest extends MasterNodeOperationRequest {
private Settings settings = EMPTY_SETTINGS;
private Map<String, String> mappings = Maps.newHashMap();
private TimeValue timeout = new TimeValue(10, TimeUnit.SECONDS);
CreateIndexRequest() {
@ -100,6 +106,36 @@ public class CreateIndexRequest extends MasterNodeOperationRequest {
return this;
}
/**
* Adds mapping that will be added when the index gets created.
*
* @param type The mapping type
* @param source The mapping source
*/
public CreateIndexRequest mapping(String type, String source) {
mappings.put(type, source);
return this;
}
/**
* Adds mapping that will be added when the index gets created.
*
* @param type The mapping type
* @param source The mapping source
*/
public CreateIndexRequest mapping(String type, JsonBuilder source) {
try {
mappings.put(type, source.string());
} catch (IOException e) {
throw new ElasticSearchIllegalArgumentException("Failed to build json for mapping request", e);
}
return this;
}
Map<String, String> mappings() {
return this.mappings;
}
/**
* Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
@ -121,11 +157,20 @@ public class CreateIndexRequest extends MasterNodeOperationRequest {
index = in.readUTF();
settings = readSettingsFromStream(in);
timeout = readTimeValue(in);
int size = in.readInt();
for (int i = 0; i < size; i++) {
mappings.put(in.readUTF(), in.readUTF());
}
}
@Override public void writeTo(DataOutput out) throws IOException {
out.writeUTF(index);
writeSettingsToStream(settings, out);
timeout.writeTo(out);
out.writeInt(mappings.size());
for (Map.Entry<String, String> entry : mappings.entrySet()) {
out.writeUTF(entry.getKey());
out.writeUTF(entry.getValue());
}
}
}

View File

@ -57,7 +57,7 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi
}
@Override protected CreateIndexResponse masterOperation(CreateIndexRequest request) throws ElasticSearchException {
MetaDataService.CreateIndexResult createIndexResult = metaDataService.createIndex(request.index(), request.settings(), request.timeout());
MetaDataService.CreateIndexResult createIndexResult = metaDataService.createIndex(request.index(), request.settings(), request.mappings(), request.timeout());
return new CreateIndexResponse(createIndexResult.acknowledged());
}
}

View File

@ -91,7 +91,7 @@ public class MetaDataService extends AbstractComponent {
// TODO should find nicer solution than sync here, since we block for timeout (same for other ops)
public synchronized CreateIndexResult createIndex(final String index, final Settings indexSettings, TimeValue timeout) throws IndexAlreadyExistsException {
public synchronized CreateIndexResult createIndex(final String index, final Settings indexSettings, final Map<String, String> mappings, TimeValue timeout) throws IndexAlreadyExistsException {
if (clusterService.state().routingTable().hasIndex(index)) {
throw new IndexAlreadyExistsException(new Index(index));
}
@ -138,7 +138,12 @@ public class MetaDataService extends AbstractComponent {
}
Settings actualIndexSettings = indexSettingsBuilder.build();
IndexMetaData indexMetaData = newIndexMetaDataBuilder(index).settings(actualIndexSettings).build();
IndexMetaData.Builder indexMetaData = newIndexMetaDataBuilder(index).settings(actualIndexSettings);
if (mappings != null) {
for (Map.Entry<String, String> entry : mappings.entrySet()) {
indexMetaData.putMapping(entry.getKey(), entry.getValue());
}
}
MetaData newMetaData = newMetaDataBuilder()
.metaData(currentState.metaData())
.put(indexMetaData)

View File

@ -32,8 +32,6 @@ import org.elasticsearch.util.component.LifecycleComponent;
import org.elasticsearch.util.concurrent.DynamicExecutors;
import org.elasticsearch.util.settings.Settings;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -163,34 +161,11 @@ public class GatewayService extends AbstractComponent implements ClusterStateLis
for (final IndexMetaData indexMetaData : fMetaData) {
threadPool.execute(new Runnable() {
@Override public void run() {
final CountDownLatch latch = new CountDownLatch(1);
ClusterStateListener waitForIndex = new ClusterStateListener() {
@Override public void clusterChanged(ClusterChangedEvent event) {
if (event.state().metaData().hasIndex(indexMetaData.index())) {
latch.countDown();
}
}
};
clusterService.add(waitForIndex);
try {
metaDataService.createIndex(indexMetaData.index(), indexMetaData.settings(), timeValueMillis(10));
metaDataService.createIndex(indexMetaData.index(), indexMetaData.settings(), indexMetaData.mappings(), timeValueMillis(10));
} catch (Exception e) {
latch.countDown();
logger.error("Failed to create index [" + indexMetaData.index() + "]", e);
}
try {
latch.await(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
logger.warn("Interrupted while waiting for index creation in gateway recovery");
}
clusterService.remove(waitForIndex);
for (Map.Entry<String, String> entry : indexMetaData.mappings().entrySet()) {
try {
metaDataService.putMapping(new String[]{indexMetaData.index()}, entry.getKey(), entry.getValue(), true, timeValueMillis(10));
} catch (Exception e) {
logger.error("Failed to put mapping [" + entry.getKey() + "] for index [" + indexMetaData.index() + "]", e);
}
}
}
});
}

View File

@ -147,11 +147,6 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu
RoutingTable routingTable = event.state().routingTable();
RoutingNode routingNodes = event.state().readOnlyRoutingNodes().nodesToShards().get(event.state().nodes().localNodeId());
if (routingNodes != null) {
applyShards(routingNodes, routingTable, event.state().nodes());
}
// go over and update mappings
for (IndexMetaData indexMetaData : metaData) {
if (!indicesService.hasIndex(indexMetaData.index())) {
@ -191,6 +186,11 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu
}
}
RoutingNode routingNodes = event.state().readOnlyRoutingNodes().nodesToShards().get(event.state().nodes().localNodeId());
if (routingNodes != null) {
applyShards(routingNodes, routingTable, event.state().nodes());
}
// go over and delete either all indices or specific shards
for (final String index : indicesService.indices()) {
if (metaData.index(index) == null) {

View File

@ -22,6 +22,7 @@ package org.elasticsearch.transport;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.node.Node;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.component.Lifecycle;
import org.elasticsearch.util.component.LifecycleComponent;
@ -45,6 +46,8 @@ public class TransportService extends AbstractComponent implements LifecycleComp
private final Transport transport;
private final ThreadPool threadPool;
private final ConcurrentMap<String, TransportRequestHandler> serverHandlers = newConcurrentMap();
private final NonBlockingHashMapLong<TransportResponseHandler> clientHandlers = new NonBlockingHashMapLong<TransportResponseHandler>();
@ -53,13 +56,14 @@ public class TransportService extends AbstractComponent implements LifecycleComp
private boolean throwConnectException = false;
public TransportService(Transport transport) {
this(EMPTY_SETTINGS, transport);
public TransportService(Transport transport, ThreadPool threadPool) {
this(EMPTY_SETTINGS, transport, threadPool);
}
@Inject public TransportService(Settings settings, Transport transport) {
@Inject public TransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings);
this.transport = transport;
this.threadPool = threadPool;
}
@Override public Lifecycle.State lifecycleState() {
@ -143,13 +147,13 @@ public class TransportService extends AbstractComponent implements LifecycleComp
return futureHandler;
}
public <T extends Streamable> void sendRequest(Node node, String action, Streamable message,
TransportResponseHandler<T> handler) throws TransportException {
public <T extends Streamable> void sendRequest(final Node node, final String action, final Streamable message,
final TransportResponseHandler<T> handler) throws TransportException {
final long requestId = newRequestId();
try {
clientHandlers.put(requestId, handler);
transport.sendRequest(node, requestId, action, message, handler);
} catch (Exception e) {
} catch (final Exception e) {
// usually happen either because we failed to connect to the node
// or because we failed serializing the message
clientHandlers.remove(requestId);
@ -158,7 +162,13 @@ public class TransportService extends AbstractComponent implements LifecycleComp
throw (ConnectTransportException) e;
}
}
handler.handleException(new SendRequestTransportException(node, action, e));
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
threadPool.execute(new Runnable() {
@Override public void run() {
handler.handleException(new SendRequestTransportException(node, action, e));
}
});
}
}

View File

@ -47,10 +47,10 @@ public class SimpleLocalTransportTests {
@BeforeClass public void setUp() {
threadPool = new DynamicThreadPool();
serviceA = new TransportService(new LocalTransport(threadPool)).start();
serviceA = new TransportService(new LocalTransport(threadPool), threadPool).start();
serviceANode = new Node("A", serviceA.boundAddress().publishAddress());
serviceB = new TransportService(new LocalTransport(threadPool)).start();
serviceB = new TransportService(new LocalTransport(threadPool), threadPool).start();
serviceBNode = new Node("B", serviceB.boundAddress().publishAddress());
}

View File

@ -47,10 +47,10 @@ public class SimpleNettyTransportTests {
@BeforeClass public void setUp() {
threadPool = new DynamicThreadPool();
serviceA = new TransportService(new NettyTransport(threadPool)).start();
serviceA = new TransportService(new NettyTransport(threadPool), threadPool).start();
serviceANode = new Node("A", serviceA.boundAddress().publishAddress());
serviceB = new TransportService(new NettyTransport(threadPool)).start();
serviceB = new TransportService(new NettyTransport(threadPool), threadPool).start();
serviceBNode = new Node("B", serviceB.boundAddress().publishAddress());
}

View File

@ -58,7 +58,7 @@ public class BenchmarkNettyClient {
.build();
final ThreadPool threadPool = new CachedThreadPool();
final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool)).start();
final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool), threadPool).start();
final Node node = new Node("server", new InetSocketTransportAddress("localhost", 9999));

View File

@ -41,7 +41,7 @@ public class BenchmarkNettyServer {
.build();
final ThreadPool threadPool = new CachedThreadPool();
final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool)).start();
final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool), threadPool).start();
transportService.registerHandler("benchmark", new BaseTransportRequestHandler<BenchmarkMessage>() {
@Override public BenchmarkMessage newInstance() {