improve initial read from gateway on first master startup, first master will not complete the startup sequence until meta data and all indices have been created from the gateway
This commit is contained in:
parent
a5a4b35e39
commit
962ce9a7e6
|
@ -38,6 +38,8 @@ public class DiscoveryService extends AbstractLifecycleComponent<DiscoveryServic
|
|||
|
||||
private final Discovery discovery;
|
||||
|
||||
private volatile boolean initialStateReceived;
|
||||
|
||||
@Inject public DiscoveryService(Settings settings, Discovery discovery) {
|
||||
super(settings);
|
||||
this.discovery = discovery;
|
||||
|
@ -58,7 +60,9 @@ public class DiscoveryService extends AbstractLifecycleComponent<DiscoveryServic
|
|||
logger.trace("Waiting for {} for the initial state to be set by the discovery", initialStateTimeout);
|
||||
if (latch.await(initialStateTimeout.millis(), TimeUnit.MILLISECONDS)) {
|
||||
logger.trace("Initial state set from discovery");
|
||||
initialStateReceived = true;
|
||||
} else {
|
||||
initialStateReceived = false;
|
||||
logger.warn("Waited for {} and no initial state was set by the discovery", initialStateTimeout);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -78,6 +82,14 @@ public class DiscoveryService extends AbstractLifecycleComponent<DiscoveryServic
|
|||
discovery.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns <tt>true</tt> if the initial state was received within the timeout waiting for it
|
||||
* on {@link #doStart()}.
|
||||
*/
|
||||
public boolean initialStateReceived() {
|
||||
return initialStateReceived;
|
||||
}
|
||||
|
||||
public String nodeDescription() {
|
||||
return discovery.nodeDescription();
|
||||
}
|
||||
|
|
|
@ -25,10 +25,14 @@ import org.elasticsearch.cluster.*;
|
|||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataService;
|
||||
import org.elasticsearch.discovery.DiscoveryService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.util.Nullable;
|
||||
import org.elasticsearch.util.TimeValue;
|
||||
import org.elasticsearch.util.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.util.settings.Settings;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -44,6 +48,8 @@ import static org.elasticsearch.util.concurrent.DynamicExecutors.*;
|
|||
*/
|
||||
public class GatewayService extends AbstractLifecycleComponent<GatewayService> implements ClusterStateListener {
|
||||
|
||||
private final TimeValue initialStateTimeout;
|
||||
|
||||
private final Gateway gateway;
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
@ -52,22 +58,40 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
|||
|
||||
private final ClusterService clusterService;
|
||||
|
||||
private final DiscoveryService discoveryService;
|
||||
|
||||
private final MetaDataService metaDataService;
|
||||
|
||||
private final AtomicBoolean firstMasterRead = new AtomicBoolean();
|
||||
|
||||
@Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService,
|
||||
@Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService,
|
||||
ThreadPool threadPool, MetaDataService metaDataService) {
|
||||
super(settings);
|
||||
this.gateway = gateway;
|
||||
this.clusterService = clusterService;
|
||||
this.discoveryService = discoveryService;
|
||||
this.threadPool = threadPool;
|
||||
this.metaDataService = metaDataService;
|
||||
this.initialStateTimeout = componentSettings.getAsTime("initialStateTimeout", TimeValue.timeValueSeconds(30));
|
||||
}
|
||||
|
||||
@Override protected void doStart() throws ElasticSearchException {
|
||||
gateway.start();
|
||||
this.executor = newSingleThreadExecutor(daemonThreadFactory(settings, "gateway"));
|
||||
// if we received initial state, see if we can recover within the start phase, so we hold the
|
||||
// server from starting until we recovered properly
|
||||
if (discoveryService.initialStateReceived()) {
|
||||
if (discoveryService.firstMaster()) {
|
||||
if (firstMasterRead.compareAndSet(false, true)) {
|
||||
boolean waited = readFromGateway(initialStateTimeout);
|
||||
if (!waited) {
|
||||
logger.warn("Waited for {} for indices to be created from the gateway, and not all have been created", initialStateTimeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.debug("Can't wait on start for (possibly) reading state from gateway, will do it asynchronously");
|
||||
}
|
||||
clusterService.add(this);
|
||||
}
|
||||
|
||||
|
@ -89,7 +113,11 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
|||
@Override public void clusterChanged(final ClusterChangedEvent event) {
|
||||
if (lifecycle.started() && event.localNodeMaster()) {
|
||||
if (event.firstMaster() && firstMasterRead.compareAndSet(false, true)) {
|
||||
readFromGateway();
|
||||
executor.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
readFromGateway(null);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
writeToGateway(event);
|
||||
}
|
||||
|
@ -113,44 +141,56 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
|||
});
|
||||
}
|
||||
|
||||
private void readFromGateway() {
|
||||
/**
|
||||
* Reads from the gateway. If the waitTimeout is set, will wait till all the indices
|
||||
* have been created from the meta data read from the gateway. Return value only applicable
|
||||
* when waiting, and indicates that everything was created within teh wait timeout.
|
||||
*/
|
||||
private boolean readFromGateway(@Nullable TimeValue waitTimeout) {
|
||||
// we are the first master, go ahead and read and create indices
|
||||
logger.debug("First master in the cluster, reading state from gateway");
|
||||
executor.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
MetaData metaData;
|
||||
try {
|
||||
metaData = gateway.read();
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to read from gateway", e);
|
||||
return;
|
||||
}
|
||||
if (metaData == null) {
|
||||
logger.debug("No state read from gateway");
|
||||
return;
|
||||
}
|
||||
final MetaData fMetaData = metaData;
|
||||
clusterService.submitStateUpdateTask("gateway (recovered meta-data)", new ClusterStateUpdateTask() {
|
||||
@Override public ClusterState execute(ClusterState currentState) {
|
||||
MetaData.Builder metaDataBuilder = newMetaDataBuilder()
|
||||
.metaData(currentState.metaData()).maxNumberOfShardsPerNode(fMetaData.maxNumberOfShardsPerNode());
|
||||
// go over the meta data and create indices, we don't really need to copy over
|
||||
// the meta data per index, since we create the index and it will be added automatically
|
||||
for (final IndexMetaData indexMetaData : fMetaData) {
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
try {
|
||||
metaDataService.createIndex("gateway", indexMetaData.index(), indexMetaData.settings(), indexMetaData.mappings(), timeValueMillis(10));
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to create index [" + indexMetaData.index() + "]", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
MetaData metaData;
|
||||
try {
|
||||
metaData = gateway.read();
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to read from gateway", e);
|
||||
return false;
|
||||
}
|
||||
if (metaData == null) {
|
||||
logger.debug("No state read from gateway");
|
||||
return false;
|
||||
}
|
||||
final MetaData fMetaData = metaData;
|
||||
final CountDownLatch latch = new CountDownLatch(fMetaData.indices().size());
|
||||
clusterService.submitStateUpdateTask("gateway (recovered meta-data)", new ClusterStateUpdateTask() {
|
||||
@Override public ClusterState execute(ClusterState currentState) {
|
||||
MetaData.Builder metaDataBuilder = newMetaDataBuilder()
|
||||
.metaData(currentState.metaData()).maxNumberOfShardsPerNode(fMetaData.maxNumberOfShardsPerNode());
|
||||
// go over the meta data and create indices, we don't really need to copy over
|
||||
// the meta data per index, since we create the index and it will be added automatically
|
||||
for (final IndexMetaData indexMetaData : fMetaData) {
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
try {
|
||||
metaDataService.createIndex("gateway", indexMetaData.index(), indexMetaData.settings(), indexMetaData.mappings(), timeValueMillis(initialStateTimeout.millis() - 1000));
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to create index [" + indexMetaData.index() + "]", e);
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
return newClusterStateBuilder().state(currentState).metaData(metaDataBuilder).build();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
return newClusterStateBuilder().state(currentState).metaData(metaDataBuilder).build();
|
||||
}
|
||||
});
|
||||
if (waitTimeout != null) {
|
||||
try {
|
||||
return latch.await(waitTimeout.millis(), TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -159,7 +159,6 @@ public final class InternalServer implements Server {
|
|||
}
|
||||
|
||||
injector.getInstance(IndicesService.class).start();
|
||||
injector.getInstance(GatewayService.class).start();
|
||||
injector.getInstance(ClusterService.class).start();
|
||||
injector.getInstance(RoutingService.class).start();
|
||||
injector.getInstance(SearchService.class).start();
|
||||
|
@ -167,6 +166,10 @@ public final class InternalServer implements Server {
|
|||
injector.getInstance(RestController.class).start();
|
||||
injector.getInstance(TransportService.class).start();
|
||||
DiscoveryService discoService = injector.getInstance(DiscoveryService.class).start();
|
||||
|
||||
// gateway should start after disco, so it can try and recovery from gateway on "start"
|
||||
injector.getInstance(GatewayService.class).start();
|
||||
|
||||
if (settings.getAsBoolean("http.enabled", true)) {
|
||||
injector.getInstance(HttpServer.class).start();
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package org.elasticsearch.test.integration.gateway;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
|
@ -86,10 +88,14 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractServersTes
|
|||
|
||||
logger.info("Closing the server");
|
||||
closeServer("server1");
|
||||
Thread.sleep(500);
|
||||
logger.info("Starting the server, should recover from the gateway (only translog should be populated)");
|
||||
startServer("server1");
|
||||
Thread.sleep(1000);
|
||||
|
||||
logger.info("Running Cluster Health (wait for the shards to startup)");
|
||||
ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForYellowStatus()).actionGet();
|
||||
logger.info("Done Cluster Health, status " + clusterHealth.status());
|
||||
assertThat(clusterHealth.timedOut(), equalTo(false));
|
||||
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
|
||||
|
||||
// verify that mapping is there
|
||||
clusterState = client("server1").admin().cluster().state(clusterState()).actionGet();
|
||||
|
@ -115,10 +121,14 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractServersTes
|
|||
|
||||
logger.info("Closing the server");
|
||||
closeServer("server1");
|
||||
Thread.sleep(500);
|
||||
logger.info("Starting the server, should recover from the gateway (both index and translog)");
|
||||
startServer("server1");
|
||||
Thread.sleep(1000);
|
||||
|
||||
logger.info("Running Cluster Health (wait for the shards to startup)");
|
||||
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForYellowStatus()).actionGet();
|
||||
logger.info("Done Cluster Health, status " + clusterHealth.status());
|
||||
assertThat(clusterHealth.timedOut(), equalTo(false));
|
||||
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
|
||||
|
||||
logger.info("Getting #1, should not exists");
|
||||
getResponse = client("server1").get(getRequest("test").type("type1").id("1")).actionGet();
|
||||
|
@ -140,10 +150,14 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractServersTes
|
|||
|
||||
logger.info("Closing the server");
|
||||
closeServer("server1");
|
||||
Thread.sleep(500);
|
||||
logger.info("Starting the server, should recover from the gateway (just from the index, nothing in the translog)");
|
||||
startServer("server1");
|
||||
Thread.sleep(1000);
|
||||
|
||||
logger.info("Running Cluster Health (wait for the shards to startup)");
|
||||
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForYellowStatus()).actionGet();
|
||||
logger.info("Done Cluster Health, status " + clusterHealth.status());
|
||||
assertThat(clusterHealth.timedOut(), equalTo(false));
|
||||
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
|
||||
|
||||
logger.info("Getting #1, should not exists");
|
||||
getResponse = client("server1").get(getRequest("test").type("type1").id("1")).actionGet();
|
||||
|
|
|
@ -56,10 +56,7 @@ public class FsMetaDataGatewayTests extends AbstractServersTests {
|
|||
|
||||
closeServer("server1");
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
startServer("server1");
|
||||
Thread.sleep(3000);
|
||||
try {
|
||||
client("server1").admin().indices().create(createIndexRequest("test")).actionGet();
|
||||
assert false : "index should exists";
|
||||
|
|
|
@ -22,7 +22,7 @@ package org.elasticsearch.test.integration.gateway.fs;
|
|||
import org.elasticsearch.test.integration.gateway.AbstractSimpleIndexGatewayTests;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class SimpleFsIndexGatewayTests extends AbstractSimpleIndexGatewayTests {
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ package org.elasticsearch.test.integration.gateway.fs;
|
|||
import org.elasticsearch.test.integration.gateway.AbstractSimpleIndexGatewayTests;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class SimpleFsIndexInRamIndexGatewayTests extends AbstractSimpleIndexGatewayTests {
|
||||
|
||||
|
|
Loading…
Reference in New Issue