refactor gateway service to allow for more custom gateway implementations
parent 6a79a16e5b
commit 163b7be639
@@ -19,7 +19,6 @@

package org.elasticsearch.gateway;

import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;

@@ -30,11 +29,15 @@ public interface Gateway extends LifecycleComponent<Gateway> {

    String type();

    void write(MetaData metaData) throws GatewayException;

    MetaData read() throws GatewayException;
    void performStateRecovery(GatewayStateRecoveredListener listener) throws GatewayException;

    Class<? extends Module> suggestIndexGateway();

    void reset() throws Exception;

    interface GatewayStateRecoveredListener {
        void onSuccess();

        void onFailure(Throwable t);
    }
}

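Review note: the hunk above swaps the synchronous write/read contract for a listener-driven performStateRecovery(...), which is the hook that makes alternative gateway implementations pluggable. As a rough illustration only (this class is not part of the commit, and it assumes the methods left on the interface are exactly the ones shown in the hunk), a minimal custom gateway could look roughly like this:

// Hypothetical sketch, not part of this commit: a gateway that keeps no external
// state and simply reports recovery completion through the new listener contract.
package org.elasticsearch.gateway.custom; // invented package, for illustration only

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.GatewayException;

public class InMemoryGateway extends AbstractLifecycleComponent<Gateway> implements Gateway {

    private volatile MetaData lastSeen; // stand-in for whatever this gateway would persist

    @Inject public InMemoryGateway(Settings settings) {
        super(settings);
    }

    @Override public String type() {
        return "in-memory"; // illustrative type name
    }

    @Override public void performStateRecovery(GatewayStateRecoveredListener listener) throws GatewayException {
        try {
            // a real implementation would rebuild MetaData from its own storage here
            listener.onSuccess();
        } catch (Exception e) {
            listener.onFailure(e);
        }
    }

    @Override public Class<? extends Module> suggestIndexGateway() {
        return null; // a real gateway would point at its matching index gateway module
    }

    @Override public void reset() {
        lastSeen = null;
    }

    @Override protected void doStart() throws ElasticSearchException {
    }

    @Override protected void doStop() throws ElasticSearchException {
    }

    @Override protected void doClose() throws ElasticSearchException {
    }
}
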
@@ -20,14 +20,12 @@

package org.elasticsearch.gateway;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchInterruptedException;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@@ -36,17 +34,12 @@ import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.threadpool.ThreadPool;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.concurrent.Executors.*;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.common.unit.TimeValue.*;
import static org.elasticsearch.common.util.concurrent.EsExecutors.*;

/**
 * @author kimchy (shay.banon)
@@ -59,31 +52,23 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i

    private final ThreadPool threadPool;

    private volatile ExecutorService executor;

    private final ClusterService clusterService;

    private final DiscoveryService discoveryService;

    private final MetaDataCreateIndexService createIndexService;

    private final TimeValue initialStateTimeout;

    private final TimeValue recoverAfterTime;
    private final int recoverAfterNodes;

    private final AtomicBoolean readFromGateway = new AtomicBoolean();
    private final AtomicBoolean performedStateRecovery = new AtomicBoolean();

    @Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService,
                                  ThreadPool threadPool, MetaDataCreateIndexService createIndexService) {
    @Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService, ThreadPool threadPool) {
        super(settings);
        this.gateway = gateway;
        this.clusterService = clusterService;
        this.discoveryService = discoveryService;
        this.threadPool = threadPool;
        this.createIndexService = createIndexService;
        this.initialStateTimeout = componentSettings.getAsTime("initial_state_timeout", TimeValue.timeValueSeconds(30));
        // allow to control a delay of when indices will get created
        this.recoverAfterTime = componentSettings.getAsTime("recover_after_time", null);
@@ -92,7 +77,6 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i

    @Override protected void doStart() throws ElasticSearchException {
        gateway.start();
        this.executor = newSingleThreadExecutor(daemonThreadFactory(settings, "gateway"));
        // if we received initial state, see if we can recover within the start phase, so we hold the
        // node from starting until we recovered properly
        if (discoveryService.initialStateReceived()) {
@@ -101,12 +85,12 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
            if (recoverAfterNodes != -1 && clusterState.nodes().dataNodes().size() < recoverAfterNodes) {
                updateClusterStateBlockedOnNotRecovered();
                logger.debug("not recovering from gateway, data_nodes_size [" + clusterState.nodes().dataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]");
            } else if (recoverAfterTime != null) {
                updateClusterStateBlockedOnNotRecovered();
                logger.debug("not recovering from gateway, recover_after_time [{}]", recoverAfterTime);
            } else {
                if (readFromGateway.compareAndSet(false, true)) {
                    Boolean waited = readFromGateway(initialStateTimeout);
                    if (waited != null && !waited) {
                        logger.warn("waited for {} for indices to be created from the gateway, and not all have been created", initialStateTimeout);
                    }
                if (performedStateRecovery.compareAndSet(false, true)) {
                    performStateRecovery(initialStateTimeout);
                }
            }
        }
@@ -118,12 +102,6 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i

    @Override protected void doStop() throws ElasticSearchException {
        clusterService.remove(this);
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // ignore
        }
        gateway.stop();
    }

@@ -141,86 +119,51 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
            if (recoverAfterNodes != -1 && clusterState.nodes().dataNodes().size() < recoverAfterNodes) {
                logger.debug("not recovering from gateway, data_nodes_size [" + clusterState.nodes().dataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]");
            } else {
                if (readFromGateway.compareAndSet(false, true)) {
                    executor.execute(new Runnable() {
                if (performedStateRecovery.compareAndSet(false, true)) {
                    threadPool.cached().execute(new Runnable() {
                        @Override public void run() {
                            readFromGateway(null);
                            performStateRecovery(null);
                        }
                    });
                }
            }
        } else {
            writeToGateway(event);
        }
    }

    private void writeToGateway(final ClusterChangedEvent event) {
        if (!event.metaDataChanged()) {
            return;
        }
        executor.execute(new Runnable() {
            @Override public void run() {
                logger.debug("writing to gateway {} ...", gateway);
                StopWatch stopWatch = new StopWatch().start();
                try {
                    gateway.write(event.state().metaData());
                    logger.debug("wrote to gateway {}, took {}", gateway, stopWatch.stop().totalTime());
                    // TODO, we need to remember that we failed, maybe add a retry scheduler?
                } catch (Exception e) {
                    logger.error("failed to write to gateway", e);
                }
            }
        });
    private void performStateRecovery(@Nullable TimeValue timeout) {
        final CountDownLatch latch = new CountDownLatch(1);
        final Gateway.GatewayStateRecoveredListener recoveryListener = new Gateway.GatewayStateRecoveredListener() {
            @Override public void onSuccess() {
                markMetaDataAsReadFromGateway("success");
                latch.countDown();
            }

    /**
     * Reads from the gateway. If the waitTimeout is set, will wait till all the indices
     * have been created from the meta data read from the gateway. Return value only applicable
     * when waiting, and indicates that everything was created within teh wait timeout.
     */
    private Boolean readFromGateway(@Nullable TimeValue waitTimeout) {
        logger.debug("reading state from gateway {} ...", gateway);
        StopWatch stopWatch = new StopWatch().start();
        MetaData metaData;
        try {
            metaData = gateway.read();
            logger.debug("read state from gateway {}, took {}", gateway, stopWatch.stop().totalTime());
        } catch (Exception e) {
            logger.error("failed to read from gateway", e);
            markMetaDataAsReadFromGateway("failure");
            return false;
            @Override public void onFailure(Throwable t) {
                markMetaDataAsReadFromGateway("failure [" + t.getMessage() + "]");
                latch.countDown();
            }
        if (metaData == null) {
            logger.debug("no state read from gateway");
            markMetaDataAsReadFromGateway("no state");
            return true;
        }
        final MetaData fMetaData = metaData;
        final CountDownLatch latch = new CountDownLatch(fMetaData.indices().size());
        };

        if (recoverAfterTime != null) {
            updateClusterStateBlockedOnNotRecovered();
            logger.debug("delaying initial state index creation for [{}]", recoverAfterTime);
            logger.debug("delaying initial state recovery for [{}]", recoverAfterTime);
            threadPool.schedule(new Runnable() {
                @Override public void run() {
                    updateClusterStateFromGateway(fMetaData, latch);
                    gateway.performStateRecovery(recoveryListener);
                }
            }, recoverAfterTime);
        } else {
            updateClusterStateFromGateway(fMetaData, latch);
            gateway.performStateRecovery(recoveryListener);
        }
        // if we delay indices creation, then waiting for them does not make sense
        if (recoverAfterTime != null) {
            return null;
        }
        if (waitTimeout != null) {

        if (timeout != null) {
            try {
                return latch.await(waitTimeout.millis(), TimeUnit.MILLISECONDS);
                latch.await(timeout.millis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // ignore
                throw new ElasticSearchInterruptedException(e.getMessage(), e);
            }
        }
        return null;
    }

    private void markMetaDataAsReadFromGateway(String reason) {
@@ -239,44 +182,6 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
            });
        }

    private void updateClusterStateFromGateway(final MetaData fMetaData, final CountDownLatch latch) {
        clusterService.submitStateUpdateTask("gateway (recovered meta-data)", new ProcessedClusterStateUpdateTask() {
            @Override public ClusterState execute(ClusterState currentState) {
                MetaData.Builder metaDataBuilder = newMetaDataBuilder()
                        .metaData(currentState.metaData());
                // mark the metadata as read from gateway
                metaDataBuilder.markAsRecoveredFromGateway();
                return newClusterStateBuilder().state(currentState).metaData(metaDataBuilder).build();
            }

            @Override public void clusterStateProcessed(ClusterState clusterState) {
                // go over the meta data and create indices, we don't really need to copy over
                // the meta data per index, since we create the index and it will be added automatically
                for (final IndexMetaData indexMetaData : fMetaData) {
                    try {
                        createIndexService.createIndex(new MetaDataCreateIndexService.Request("gateway", indexMetaData.index()).settings(indexMetaData.settings()).mappingsCompressed(indexMetaData.mappings()).timeout(timeValueSeconds(30)), new MetaDataCreateIndexService.Listener() {
                            @Override public void onResponse(MetaDataCreateIndexService.Response response) {
                                latch.countDown();
                            }

                            @Override public void onFailure(Throwable t) {
                                logger.error("failed to create index [{}]", indexMetaData.index(), t);
                            }
                        });
                    } catch (IOException e) {
                        logger.error("failed to create index [{}]", indexMetaData.index(), e);
                    }
                }
                clusterService.submitStateUpdateTask("gateway (remove block)", new ClusterStateUpdateTask() {
                    @Override public ClusterState execute(ClusterState currentState) {
                        ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(NOT_RECOVERED_FROM_GATEWAY_BLOCK);
                        return newClusterStateBuilder().state(currentState).blocks(blocks).build();
                    }
                });
            }
        });
    }

    private void updateClusterStateBlockedOnNotRecovered() {
        clusterService.submitStateUpdateTask("gateway (block: not recovered from gateway)", new ClusterStateUpdateTask() {
            @Override public ClusterState execute(ClusterState currentState) {

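Review note: the rewritten performStateRecovery(@Nullable TimeValue timeout) above bridges the asynchronous listener callback back into the optionally blocking start-up path with a single-count CountDownLatch. A small self-contained sketch of that adapter pattern, using illustrative names (StateRecoveredListener, recoverAsync) rather than the Elasticsearch types:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the latch-over-callback pattern used above; everything here is
// illustrative, not the Elasticsearch API.
class RecoveryBridge {

    interface StateRecoveredListener {
        void onSuccess();

        void onFailure(Throwable t);
    }

    // Pretend asynchronous recovery source, standing in for gateway.performStateRecovery(listener).
    static void recoverAsync(final StateRecoveredListener listener) {
        new Thread(new Runnable() {
            @Override public void run() {
                listener.onSuccess(); // report completion from another thread
            }
        }).start();
    }

    // Mirrors the shape of GatewayService.performStateRecovery(@Nullable TimeValue timeout):
    // kick off asynchronous recovery, then optionally block until the listener fires.
    static void performStateRecovery(Long timeoutMillis) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        StateRecoveredListener listener = new StateRecoveredListener() {
            @Override public void onSuccess() {
                latch.countDown();
            }

            @Override public void onFailure(Throwable t) {
                latch.countDown();
            }
        };
        recoverAsync(listener);
        if (timeoutMillis != null) {
            // the start phase passes initial_state_timeout; the cluster-changed path passes null
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        performStateRecovery(30000L); // e.g. the 30s initial_state_timeout default
        System.out.println("state recovery finished (or timed out)");
    }
}
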
@@ -19,12 +19,12 @@

package org.elasticsearch.gateway.blobstore;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.blobstore.*;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;

@@ -32,8 +32,8 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.builder.BinaryXContentBuilder;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.GatewayException;
import org.elasticsearch.gateway.shared.SharedStorageGateway;

import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;

@@ -42,7 +42,7 @@ import java.io.IOException;
/**
 * @author kimchy (shay.banon)
 */
public abstract class BlobStoreGateway extends AbstractLifecycleComponent<Gateway> implements Gateway {
public abstract class BlobStoreGateway extends SharedStorageGateway {

    private BlobStore blobStore;

@@ -54,8 +54,8 @@ public abstract class BlobStoreGateway extends AbstractLifecycleComponent<Gatewa

    private volatile int currentIndex;

    protected BlobStoreGateway(Settings settings) throws IOException {
        super(settings);
    protected BlobStoreGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService) {
        super(settings, clusterService, createIndexService);
    }

    protected void initialize(BlobStore blobStore, ClusterName clusterName, @Nullable ByteSizeValue defaultChunkSize) throws IOException {

@@ -87,15 +87,6 @@ public abstract class BlobStoreGateway extends AbstractLifecycleComponent<Gatewa
        blobStore.delete(BlobPath.cleanPath());
    }

    @Override protected void doStart() throws ElasticSearchException {
    }

    @Override protected void doStop() throws ElasticSearchException {
    }

    @Override protected void doClose() throws ElasticSearchException {
    }

    @Override public MetaData read() throws GatewayException {
        try {
            this.currentIndex = findLatestIndex();

@@ -20,6 +20,8 @@
package org.elasticsearch.gateway.fs;

import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;

@@ -37,8 +39,9 @@ import java.io.IOException;
 */
public class FsGateway extends BlobStoreGateway {

    @Inject public FsGateway(Settings settings, Environment environment, ClusterName clusterName, ThreadPool threadPool) throws IOException {
        super(settings);
    @Inject public FsGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
                             Environment environment, ClusterName clusterName, ThreadPool threadPool) throws IOException {
        super(settings, clusterService, createIndexService);

        File gatewayFile;
        String location = componentSettings.get("location");

@@ -20,7 +20,6 @@
package org.elasticsearch.gateway.none;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;

@@ -57,12 +56,8 @@ public class NoneGateway extends AbstractLifecycleComponent<Gateway> implements
    @Override protected void doClose() throws ElasticSearchException {
    }

    @Override public void write(MetaData metaData) throws GatewayException {

    }

    @Override public MetaData read() throws GatewayException {
        return null;
    @Override public void performStateRecovery(GatewayStateRecoveredListener listener) throws GatewayException {
        listener.onSuccess();
    }

    @Override public Class<? extends Module> suggestIndexGateway() {

@@ -0,0 +1,170 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.gateway.shared;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.GatewayException;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.concurrent.Executors.*;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.common.unit.TimeValue.*;
import static org.elasticsearch.common.util.concurrent.EsExecutors.*;

/**
 * @author kimchy (shay.banon)
 */
public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Gateway> implements Gateway, ClusterStateListener {

    private final ClusterService clusterService;

    private final MetaDataCreateIndexService createIndexService;

    private volatile boolean performedStateRecovery = false;

    private volatile ExecutorService executor;

    public SharedStorageGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService) {
        super(settings);
        this.clusterService = clusterService;
        this.createIndexService = createIndexService;
    }

    @Override protected void doStart() throws ElasticSearchException {
        this.executor = newSingleThreadExecutor(daemonThreadFactory(settings, "gateway"));
        clusterService.add(this);
    }

    @Override protected void doStop() throws ElasticSearchException {
        clusterService.remove(this);
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // ignore
        }
    }

    @Override protected void doClose() throws ElasticSearchException {
    }

    @Override public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException {
        performedStateRecovery = true;
        executor.execute(new Runnable() {
            @Override public void run() {
                logger.debug("reading state from gateway {} ...", this);
                StopWatch stopWatch = new StopWatch().start();
                MetaData metaData;
                try {
                    metaData = read();
                    logger.debug("read state from gateway {}, took {}", this, stopWatch.stop().totalTime());
                    if (metaData == null) {
                        logger.debug("no state read from gateway");
                        listener.onSuccess();
                    } else {
                        updateClusterStateFromGateway(metaData, listener);
                    }
                } catch (Exception e) {
                    logger.error("failed to read from gateway", e);
                    listener.onFailure(e);
                }
            }
        });
    }

    @Override public void clusterChanged(final ClusterChangedEvent event) {
        if (!lifecycle.started()) {
            return;
        }
        if (!performedStateRecovery) {
            return;
        }
        if (event.localNodeMaster()) {
            if (!event.metaDataChanged()) {
                return;
            }
            executor.execute(new Runnable() {
                @Override public void run() {
                    logger.debug("writing to gateway {} ...", this);
                    StopWatch stopWatch = new StopWatch().start();
                    try {
                        write(event.state().metaData());
                        logger.debug("wrote to gateway {}, took {}", this, stopWatch.stop().totalTime());
                        // TODO, we need to remember that we failed, maybe add a retry scheduler?
                    } catch (Exception e) {
                        logger.error("failed to write to gateway", e);
                    }
                }
            });
        }
    }

    private void updateClusterStateFromGateway(final MetaData fMetaData, final GatewayStateRecoveredListener listener) {
        final AtomicInteger indicesCounter = new AtomicInteger(fMetaData.indices().size());
        clusterService.submitStateUpdateTask("gateway (recovered meta-data)", new ProcessedClusterStateUpdateTask() {
            @Override public ClusterState execute(ClusterState currentState) {
                MetaData.Builder metaDataBuilder = newMetaDataBuilder()
                        .metaData(currentState.metaData());
                // mark the metadata as read from gateway
                metaDataBuilder.markAsRecoveredFromGateway();
                return newClusterStateBuilder().state(currentState).metaData(metaDataBuilder).build();
            }

            @Override public void clusterStateProcessed(ClusterState clusterState) {
                // go over the meta data and create indices, we don't really need to copy over
                // the meta data per index, since we create the index and it will be added automatically
                for (final IndexMetaData indexMetaData : fMetaData) {
                    try {
                        createIndexService.createIndex(new MetaDataCreateIndexService.Request("gateway", indexMetaData.index()).settings(indexMetaData.settings()).mappingsCompressed(indexMetaData.mappings()).timeout(timeValueSeconds(30)), new MetaDataCreateIndexService.Listener() {
                            @Override public void onResponse(MetaDataCreateIndexService.Response response) {
                                if (indicesCounter.decrementAndGet() == 0) {
                                    listener.onSuccess();
                                }
                            }

                            @Override public void onFailure(Throwable t) {
                                logger.error("failed to create index [{}]", indexMetaData.index(), t);
                            }
                        });
                    } catch (IOException e) {
                        logger.error("failed to create index [{}]", indexMetaData.index(), e);
                    }
                }
            }
        });
    }

    protected abstract MetaData read() throws ElasticSearchException;

    protected abstract void write(MetaData metaData) throws ElasticSearchException;
}

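Review note: the new SharedStorageGateway pulls the read/write/recovery plumbing out of GatewayService, so a concrete gateway now only supplies blocking read() and write(MetaData) against its storage, plus the usual Gateway identity methods (assumed here to be type(), suggestIndexGateway(), and reset()). A hypothetical subclass, not part of this commit, could look roughly like this; the map-backed store, package, and class name are invented for illustration, and the constructor follows the new (Settings, ClusterService, MetaDataCreateIndexService) shape shown above:

// Illustrative subclass only, not part of this commit.
package org.elasticsearch.gateway.custom; // invented package, for illustration only

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.shared.SharedStorageGateway;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class MapBackedGateway extends SharedStorageGateway {

    // stand-in for whatever shared storage a custom gateway would actually talk to
    private final ConcurrentMap<String, MetaData> store = new ConcurrentHashMap<String, MetaData>();

    @Inject public MapBackedGateway(Settings settings, ClusterService clusterService,
                                    MetaDataCreateIndexService createIndexService) {
        super(settings, clusterService, createIndexService);
    }

    @Override public String type() {
        return "map-backed"; // illustrative type name
    }

    @Override protected MetaData read() throws ElasticSearchException {
        // SharedStorageGateway.performStateRecovery(...) calls this on its executor
        return store.get("meta-data");
    }

    @Override protected void write(MetaData metaData) throws ElasticSearchException {
        // SharedStorageGateway.clusterChanged(...) calls this when meta data changes on the master
        store.put("meta-data", metaData);
    }

    @Override public Class<? extends Module> suggestIndexGateway() {
        return null; // a real gateway would point at its matching index gateway module
    }

    @Override public void reset() {
        store.clear();
    }
}
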
@@ -24,6 +24,8 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cloud.aws.blobstore.S3BlobStore;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;

@@ -40,8 +42,9 @@ import java.io.IOException;
 */
public class S3Gateway extends BlobStoreGateway {

    @Inject public S3Gateway(Settings settings, ClusterName clusterName, ThreadPool threadPool, AwsS3Service s3Service) throws IOException {
        super(settings);
    @Inject public S3Gateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
                             ClusterName clusterName, ThreadPool threadPool, AwsS3Service s3Service) throws IOException {
        super(settings, clusterService, createIndexService);

        String bucket = componentSettings.get("bucket");
        if (bucket == null) {

@@ -25,6 +25,8 @@ import org.apache.hadoop.fs.Path;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.blobstore.hdfs.HdfsBlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;

@@ -45,8 +47,9 @@ public class HdfsGateway extends BlobStoreGateway {

    private final FileSystem fileSystem;

    @Inject public HdfsGateway(Settings settings, ClusterName clusterName, ThreadPool threadPool) throws IOException {
        super(settings);
    @Inject public HdfsGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
                               ClusterName clusterName, ThreadPool threadPool) throws IOException {
        super(settings, clusterService, createIndexService);

        this.closeFileSystem = componentSettings.getAsBoolean("close_fs", true);
        String uri = componentSettings.get("uri");

@@ -22,6 +22,7 @@ package org.elasticsearch.hadoop.gateway;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Requests;

@@ -78,7 +79,8 @@ public class HdfsGatewayTests {

    @Test public void testHdfsGateway() {
        // first, test meta data
        node.client().admin().indices().create(createIndexRequest("test")).actionGet();
        CreateIndexResponse createIndexResponse = node.client().admin().indices().create(createIndexRequest("test")).actionGet();
        assertThat(createIndexResponse.acknowledged(), equalTo(true));
        node.close();
        node = buildNode().start();
        try {