wait till the index gets created to create the mappings when recovering from the gateway
This commit is contained in:
parent
4e1a1f3437
commit
024cdb4312
|
@ -33,6 +33,7 @@ import org.elasticsearch.util.concurrent.DynamicExecutors;
|
||||||
import org.elasticsearch.util.settings.Settings;
|
import org.elasticsearch.util.settings.Settings;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -162,11 +163,27 @@ public class GatewayService extends AbstractComponent implements ClusterStateLis
|
||||||
for (final IndexMetaData indexMetaData : fMetaData) {
|
for (final IndexMetaData indexMetaData : fMetaData) {
|
||||||
threadPool.execute(new Runnable() {
|
threadPool.execute(new Runnable() {
|
||||||
@Override public void run() {
|
@Override public void run() {
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
ClusterStateListener waitForIndex = new ClusterStateListener() {
|
||||||
|
@Override public void clusterChanged(ClusterChangedEvent event) {
|
||||||
|
if (event.state().metaData().hasIndex(indexMetaData.index())) {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
clusterService.add(waitForIndex);
|
||||||
try {
|
try {
|
||||||
metaDataService.createIndex(indexMetaData.index(), indexMetaData.settings(), timeValueMillis(500));
|
metaDataService.createIndex(indexMetaData.index(), indexMetaData.settings(), timeValueMillis(10));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
latch.countDown();
|
||||||
logger.error("Failed to create index [" + indexMetaData.index() + "]", e);
|
logger.error("Failed to create index [" + indexMetaData.index() + "]", e);
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
|
latch.await(5, TimeUnit.MINUTES);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
logger.warn("Interrupted while waiting for index creation in gateway recovery");
|
||||||
|
}
|
||||||
|
clusterService.remove(waitForIndex);
|
||||||
for (Map.Entry<String, String> entry : indexMetaData.mappings().entrySet()) {
|
for (Map.Entry<String, String> entry : indexMetaData.mappings().entrySet()) {
|
||||||
try {
|
try {
|
||||||
metaDataService.putMapping(new String[]{indexMetaData.index()}, entry.getKey(), entry.getValue(), true, timeValueMillis(10));
|
metaDataService.putMapping(new String[]{indexMetaData.index()}, entry.getKey(), entry.getValue(), true, timeValueMillis(10));
|
||||||
|
|
Loading…
Reference in New Issue