Merge pull request #19324 from rjernst/repository_deguice2

Add RepositoryPlugin interface for registering snapshot repositories
Ryan Ernst 2016-07-08 14:38:07 -07:00 committed by GitHub
commit dea00a0b16
42 changed files with 526 additions and 1110 deletions

View File

@@ -97,6 +97,7 @@ import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.rest.RestController;
@@ -291,7 +292,7 @@ public class Node implements Closeable {
clusterModule.getIndexNameExpressionResolver(), settingsModule.getClusterSettings(),
pluginsService.filterPlugins(ActionPlugin.class)));
modules.add(new GatewayModule());
modules.add(new RepositoriesModule());
modules.add(new RepositoriesModule(environment, pluginsService.filterPlugins(RepositoryPlugin.class)));
pluginsService.processModules(modules);
CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
settingsModule.getClusterSettings());
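For context, pluginsService.filterPlugins(RepositoryPlugin.class) selects the installed plugins that implement the new interface, so RepositoriesModule receives a plain List<RepositoryPlugin> instead of being mutated through onModule callbacks. A simplified sketch of that filtering pattern (not the actual PluginsService code):

import java.util.List;
import java.util.stream.Collectors;

class PluginFilterSketch {
    // Keep only the plugins that implement the requested extension interface.
    static <T> List<T> filterPlugins(List<Object> plugins, Class<T> type) {
        return plugins.stream()
                .filter(type::isInstance)
                .map(type::cast)
                .collect(Collectors.toList());
    }
}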

View File

@@ -17,30 +17,28 @@
* under the License.
*/
package org.elasticsearch.repositories;
package org.elasticsearch.plugins;
import org.elasticsearch.common.settings.Settings;
import java.util.Collections;
import java.util.Map;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.Repository;
/**
* Combines repository-specific settings with global settings
* An extension point for {@link Plugin} implementations to add custom snapshot repositories.
*/
public interface RepositoryPlugin {
public class RepositorySettings {
private final Settings globalSettings;
private final Settings settings;
public RepositorySettings(Settings globalSettings, Settings settings) {
this.globalSettings = globalSettings;
this.settings = settings;
}
public Settings globalSettings() {
return globalSettings;
}
public Settings settings() {
return settings;
/**
* Returns repository types added by this plugin.
*
* @param env The environment for the local node, which may be used for the local settings and path.repo
*
* The key of the returned {@link Map} is the type name of the repository and
* the value is a factory to construct the {@link Repository} interface.
*/
default Map<String, Repository.Factory> getRepositories(Environment env) {
return Collections.emptyMap();
}
}
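A minimal sketch of a plugin built on this extension point; MyRepository (including its constructor) and the "my-type" key are hypothetical:

import java.util.Collections;
import java.util.Map;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;

public class MyRepositoryPlugin extends Plugin implements RepositoryPlugin {
    @Override
    public Map<String, Repository.Factory> getRepositories(Environment env) {
        // The map key becomes the repository "type" users specify when registering
        // a repository; the factory builds the Repository from its metadata.
        return Collections.singletonMap("my-type", (metadata) -> new MyRepository(metadata, env));
    }
}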

View File

@@ -19,8 +19,17 @@
package org.elasticsearch.repositories;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.binder.LinkedBindingBuilder;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.repositories.uri.URLRepository;
import org.elasticsearch.snapshots.RestoreService;
@@ -29,21 +38,25 @@ import org.elasticsearch.snapshots.SnapshotsService;
/**
* Sets up classes for Snapshot/Restore.
*
* Plugins can add custom repository types by calling {@link #registerRepository(String, Class)}.
*/
public class RepositoriesModule extends AbstractModule {
private final RepositoryTypesRegistry repositoryTypes = new RepositoryTypesRegistry();
private final Map<String, Repository.Factory> repositoryTypes;
public RepositoriesModule() {
registerRepository(FsRepository.TYPE, FsRepository.class);
registerRepository(URLRepository.TYPE, URLRepository.class);
}
public RepositoriesModule(Environment env, List<RepositoryPlugin> repoPlugins) {
Map<String, Repository.Factory> factories = new HashMap<>();
factories.put(FsRepository.TYPE, (metadata) -> new FsRepository(metadata, env));
factories.put(URLRepository.TYPE, (metadata) -> new URLRepository(metadata, env));
/** Registers a custom repository type to the given {@link Repository}. */
public void registerRepository(String type, Class<? extends Repository> repositoryType) {
repositoryTypes.registerRepository(type, repositoryType);
for (RepositoryPlugin repoPlugin : repoPlugins) {
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getRepositories(env);
for (Map.Entry<String, Repository.Factory> entry : newRepoTypes.entrySet()) {
if (factories.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Repository type [" + entry.getKey() + "] is already registered");
}
}
}
repositoryTypes = Collections.unmodifiableMap(factories);
}
@Override
@@ -53,6 +66,7 @@ public class RepositoriesModule extends AbstractModule {
bind(SnapshotShardsService.class).asEagerSingleton();
bind(TransportNodesSnapshotsStatus.class).asEagerSingleton();
bind(RestoreService.class).asEagerSingleton();
bind(RepositoryTypesRegistry.class).toInstance(repositoryTypes);
MapBinder<String, Repository.Factory> typesBinder = MapBinder.newMapBinder(binder(), String.class, Repository.Factory.class);
repositoryTypes.forEach((k, v) -> typesBinder.addBinding(k).toInstance(v));
}
}
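The duplicate-type check above relies on Map.put returning the previously mapped value for an existing key; a standalone sketch of the same idiom:

import java.util.HashMap;
import java.util.Map;

class DuplicateCheckSketch {
    static void register(Map<String, String> factories, String type, String factory) {
        // put returns the old value when the key was already present
        if (factories.put(type, factory) != null) {
            throw new IllegalArgumentException("Repository type [" + type + "] is already registered");
        }
    }
    public static void main(String[] args) {
        Map<String, String> factories = new HashMap<>();
        register(factories, "fs", "fs-factory");
        register(factories, "fs", "other-factory"); // throws IllegalArgumentException
    }
}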

View File

@@ -33,8 +33,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.snapshots.RestoreService;
@@ -44,35 +42,30 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS;
/**
* Service responsible for maintaining and providing access to snapshot repositories on nodes.
*/
public class RepositoriesService extends AbstractComponent implements ClusterStateListener {
private final RepositoryTypesRegistry typesRegistry;
private final Injector injector;
private final Map<String, Repository.Factory> typesRegistry;
private final ClusterService clusterService;
private final VerifyNodeRepositoryAction verifyAction;
private volatile Map<String, RepositoryHolder> repositories = emptyMap();
private volatile Map<String, Repository> repositories = Collections.emptyMap();
@Inject
public RepositoriesService(Settings settings, ClusterService clusterService, TransportService transportService, RepositoryTypesRegistry typesRegistry, Injector injector) {
public RepositoriesService(Settings settings, ClusterService clusterService, TransportService transportService,
Map<String, Repository.Factory> typesRegistry) {
super(settings);
this.typesRegistry = typesRegistry;
this.injector = injector;
this.clusterService = clusterService;
// Doesn't make sense to maintain repositories on non-master and non-data nodes
// Nothing happens there anyway
@@ -270,49 +263,53 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
logger.trace("processing new index repositories for state version [{}]", event.state().version());
Map<String, RepositoryHolder> survivors = new HashMap<>();
Map<String, Repository> survivors = new HashMap<>();
// First, remove repositories that are no longer there
for (Map.Entry<String, RepositoryHolder> entry : repositories.entrySet()) {
for (Map.Entry<String, Repository> entry : repositories.entrySet()) {
if (newMetaData == null || newMetaData.repository(entry.getKey()) == null) {
logger.debug("unregistering repository [{}]", entry.getKey());
closeRepository(entry.getKey(), entry.getValue());
closeRepository(entry.getValue());
} else {
survivors.put(entry.getKey(), entry.getValue());
}
}
Map<String, RepositoryHolder> builder = new HashMap<>();
Map<String, Repository> builder = new HashMap<>();
if (newMetaData != null) {
// Now go through all repositories and update existing or create missing
for (RepositoryMetaData repositoryMetaData : newMetaData.repositories()) {
RepositoryHolder holder = survivors.get(repositoryMetaData.name());
if (holder != null) {
Repository repository = survivors.get(repositoryMetaData.name());
if (repository != null) {
// Found previous version of this repository
if (!holder.type.equals(repositoryMetaData.type()) || !holder.settings.equals(repositoryMetaData.settings())) {
RepositoryMetaData previousMetadata = repository.getMetadata();
if (previousMetadata.type().equals(repositoryMetaData.type()) == false
|| previousMetadata.settings().equals(repositoryMetaData.settings()) == false) {
// Previous version is different from the version in settings
logger.debug("updating repository [{}]", repositoryMetaData.name());
closeRepository(repositoryMetaData.name(), holder);
holder = null;
closeRepository(repository);
repository = null;
try {
holder = createRepositoryHolder(repositoryMetaData);
repository = createRepository(repositoryMetaData);
} catch (RepositoryException ex) {
// TODO: this catch is bogus, it means the old repo is already closed,
// but we have nothing to replace it
logger.warn("failed to change repository [{}]", ex, repositoryMetaData.name());
}
}
} else {
try {
holder = createRepositoryHolder(repositoryMetaData);
repository = createRepository(repositoryMetaData);
} catch (RepositoryException ex) {
logger.warn("failed to create repository [{}]", ex, repositoryMetaData.name());
}
}
if (holder != null) {
if (repository != null) {
logger.debug("registering repository [{}]", repositoryMetaData.name());
builder.put(repositoryMetaData.name(), holder);
builder.put(repositoryMetaData.name(), repository);
}
}
}
repositories = unmodifiableMap(builder);
repositories = Collections.unmodifiableMap(builder);
} catch (Exception ex) {
logger.warn("failure updating cluster state ", ex);
}
@@ -323,16 +320,16 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
* <p>
* This method is called only on the master node
*
* @param repository repository name
* @param repositoryName repository name
* @return registered repository
* @throws RepositoryMissingException if repository with such name isn't registered
*/
public Repository repository(String repository) {
RepositoryHolder holder = repositories.get(repository);
if (holder != null) {
return holder.repository;
public Repository repository(String repositoryName) {
Repository repository = repositories.get(repositoryName);
if (repository != null) {
return repository;
}
throw new RepositoryMissingException(repository);
throw new RepositoryMissingException(repositoryName);
}
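A hedged usage sketch of the lookup above; the service instance and the "my_backup" repository name are assumed:

import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryMissingException;

class RepositoryLookupSketch {
    static RepositoryMetaData describe(RepositoriesService service) {
        try {
            Repository repository = service.repository("my_backup");
            // type and settings now come from the repository itself, not a holder
            return repository.getMetadata();
        } catch (RepositoryMissingException e) {
            return null; // nothing registered under that name
        }
    }
}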
/**
@@ -346,55 +343,44 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
* @return {@code true} if new repository was added or {@code false} if it was ignored
*/
private boolean registerRepository(RepositoryMetaData repositoryMetaData) throws IOException {
RepositoryHolder previous = repositories.get(repositoryMetaData.name());
Repository previous = repositories.get(repositoryMetaData.name());
if (previous != null) {
if (!previous.type.equals(repositoryMetaData.type()) && previous.settings.equals(repositoryMetaData.settings())) {
RepositoryMetaData previousMetadata = previous.getMetadata();
if (!previousMetadata.type().equals(repositoryMetaData.type()) && previousMetadata.settings().equals(repositoryMetaData.settings())) {
// Previous version is the same as this one - ignore it
return false;
}
}
RepositoryHolder holder = createRepositoryHolder(repositoryMetaData);
Repository newRepo = createRepository(repositoryMetaData);
if (previous != null) {
// Closing previous version
closeRepository(repositoryMetaData.name(), previous);
closeRepository(previous);
}
Map<String, RepositoryHolder> newRepositories = new HashMap<>(repositories);
newRepositories.put(repositoryMetaData.name(), holder);
Map<String, Repository> newRepositories = new HashMap<>(repositories);
newRepositories.put(repositoryMetaData.name(), newRepo);
repositories = newRepositories;
return true;
}
/**
* Closes the repository
*
* @param name repository name
* @param holder repository holder
*/
private void closeRepository(String name, RepositoryHolder holder) throws IOException {
logger.debug("closing repository [{}][{}]", holder.type, name);
if (holder.repository != null) {
holder.repository.close();
}
/** Closes the given repository. */
private void closeRepository(Repository repository) throws IOException {
logger.debug("closing repository [{}][{}]", repository.getMetadata().type(), repository.getMetadata().name());
repository.close();
}
/**
* Creates repository holder
*/
private RepositoryHolder createRepositoryHolder(RepositoryMetaData repositoryMetaData) {
private Repository createRepository(RepositoryMetaData repositoryMetaData) {
logger.debug("creating repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name());
Injector repositoryInjector = null;
Repository.Factory factory = typesRegistry.get(repositoryMetaData.type());
if (factory == null) {
throw new RepositoryException(repositoryMetaData.name(),
"repository type [" + repositoryMetaData.type() + "] does not exist");
}
try {
ModulesBuilder modules = new ModulesBuilder();
RepositoryName name = new RepositoryName(repositoryMetaData.type(), repositoryMetaData.name());
modules.add(b -> {
b.bind(RepositoryName.class).toInstance(name);
typesRegistry.bindType(b, repositoryMetaData.type());
b.bind(RepositorySettings.class).toInstance(new RepositorySettings(settings, repositoryMetaData.settings()));
});
repositoryInjector = modules.createChildInjector(injector);
Repository repository = repositoryInjector.getInstance(Repository.class);
Repository repository = factory.create(repositoryMetaData);
repository.start();
return new RepositoryHolder(repositoryMetaData.type(), repositoryMetaData.settings(), repository);
return repository;
} catch (Exception e) {
logger.warn("failed to create repository [{}][{}]", e, repositoryMetaData.type(), repositoryMetaData.name());
throw new RepositoryException(repositoryMetaData.name(), "failed to create repository", e);
@@ -448,22 +434,6 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
}
}
/**
* Internal data structure for holding repository with its configuration information and injector
*/
private static class RepositoryHolder {
private final String type;
private final Settings settings;
private final Repository repository;
public RepositoryHolder(String type, Settings settings, Repository repository) {
this.type = type;
this.settings = settings;
this.repository = repository;
}
}
/**
* Register repository request
*/
@@ -477,7 +447,7 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
final boolean verify;
Settings settings = EMPTY_SETTINGS;
Settings settings = Settings.EMPTY;
/**
* Constructs new register repository request

View File

@@ -21,7 +21,10 @@ package org.elasticsearch.repositories;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.snapshots.SnapshotId;
@@ -37,7 +40,7 @@ import java.util.List;
/**
* An interface for interacting with a repository in snapshot and restore.
* <p>
* Implementations are responsible for reading and writing both metadata and actual shard data to and from
* Implementations are responsible for reading and writing both metadata and shard data to and from
* a repository backend.
* <p>
* To perform a snapshot:
@@ -51,6 +54,23 @@ import java.util.List;
*/
public interface Repository extends LifecycleComponent {
/**
* A factory interface for constructing repositories.
* See {@link org.elasticsearch.plugins.RepositoryPlugin}.
*/
interface Factory {
/**
* Constructs a repository.
* @param metadata metadata for the repository including name and settings
*/
Repository create(RepositoryMetaData metadata) throws Exception;
}
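Because Factory declares a single method, implementations can be lambdas, as the new RepositoriesModule does. A sketch of a factory that validates a hypothetical "location" setting before construction:

// Fragment (inside some method); RepositoryException is from org.elasticsearch.repositories,
// and MyRepository is a hypothetical Repository implementation.
Repository.Factory factory = (metadata) -> {
    if (metadata.settings().get("location") == null) {
        throw new RepositoryException(metadata.name(), "missing location");
    }
    return new MyRepository(metadata);
};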
/**
* Returns metadata about this repository.
*/
RepositoryMetaData getMetadata();
/**
* Reads snapshot description from repository.
*

View File

@@ -1,71 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
/**
* Combines together the name and type of the repository
*/
public class RepositoryName {
private final String type;
private final String name;
public RepositoryName(String type, String name) {
this.type = type;
this.name = name;
}
public String type() {
return this.type;
}
public String getType() {
return type();
}
public String name() {
return this.name;
}
public String getName() {
return name();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RepositoryName that = (RepositoryName) o;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
if (type != null ? !type.equals(that.type) : that.type != null) return false;
return true;
}
@Override
public int hashCode() {
int result = type != null ? type.hashCode() : 0;
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
}

View File

@@ -1,47 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.common.inject.Binder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.ExtensionPoint;
/**
* A mapping from type name to implementations of {@link Repository}.
*/
public class RepositoryTypesRegistry {
// invariant: repositories and shardRepositories have the same keyset
private final ExtensionPoint.SelectedType<Repository> repositoryTypes =
new ExtensionPoint.SelectedType<>("repository", Repository.class);
/** Adds a new repository type to the registry, bound to the given implementation classes. */
public void registerRepository(String name, Class<? extends Repository> repositoryType) {
repositoryTypes.registerExtension(name, repositoryType);
}
/**
* Looks up the given type and binds the implementation into the given binder.
* Throws an {@link IllegalArgumentException} if the given type does not exist.
*/
public void bindType(Binder binder, String type) {
Settings settings = Settings.builder().put("type", type).build();
repositoryTypes.bindType(binder, settings, "type", null);
}
}

View File

@@ -37,11 +37,13 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
@@ -83,7 +85,6 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.SnapshotCreationException;
import org.elasticsearch.snapshots.SnapshotException;
@@ -154,7 +155,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private BlobContainer snapshotsBlobContainer;
protected final String repositoryName;
protected final RepositoryMetaData metadata;
private static final int BUFFER_SIZE = 4096;
@@ -225,16 +226,16 @@
/**
* Constructs new BlobStoreRepository
*
* @param repositoryName repository name
* @param repositorySettings repository settings
* @param metadata The metadata for this repository including name and settings
* @param globalSettings Settings for the node this repository object is created on
*/
protected BlobStoreRepository(String repositoryName, RepositorySettings repositorySettings) {
super(repositorySettings.globalSettings());
this.repositoryName = repositoryName;
protected BlobStoreRepository(RepositoryMetaData metadata, Settings globalSettings) {
super(globalSettings);
this.metadata = metadata;
parseFieldMatcher = new ParseFieldMatcher(settings);
snapshotRateLimiter = getRateLimiter(repositorySettings, "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
restoreRateLimiter = getRateLimiter(repositorySettings, "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
readOnly = repositorySettings.settings().getAsBoolean("readonly", false);
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
readOnly = metadata.settings().getAsBoolean("readonly", false);
indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress());
indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher);
indexShardSnapshotsFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_INDEX_CODEC, SNAPSHOT_INDEX_NAME_FORMAT, BlobStoreIndexShardSnapshots.PROTO, parseFieldMatcher, isCompress());
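The new constructor contract hands an implementation one RepositoryMetaData carrying name, type, and repository-scoped settings, plus the node's global settings. A standalone sketch (names and values illustrative):

import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.settings.Settings;

class MetadataContractSketch {
    public static void main(String[] args) {
        RepositoryMetaData metadata = new RepositoryMetaData("my_backup", "fs",
                Settings.builder().put("readonly", true).build());
        boolean readOnly = metadata.settings().getAsBoolean("readonly", false); // true
        String name = metadata.name(); // "my_backup", as used in the error messages above
    }
}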
@@ -299,37 +300,42 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
@Override
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData metaData) {
public RepositoryMetaData getMetadata() {
return metadata;
}
@Override
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData clusterMetadata) {
if (isReadOnly()) {
throw new RepositoryException(this.repositoryName, "cannot create snapshot in a readonly repository");
throw new RepositoryException(metadata.name(), "cannot create snapshot in a readonly repository");
}
try {
final String snapshotName = snapshotId.getName();
// check if the snapshot name already exists in the repository
if (getSnapshots().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
throw new SnapshotCreationException(repositoryName, snapshotId, "snapshot with the same name already exists");
throw new SnapshotCreationException(metadata.name(), snapshotId, "snapshot with the same name already exists");
}
if (snapshotFormat.exists(snapshotsBlobContainer, blobId(snapshotId)) ||
snapshotLegacyFormat.exists(snapshotsBlobContainer, snapshotName)) {
throw new SnapshotCreationException(repositoryName, snapshotId, "snapshot with such name already exists");
throw new SnapshotCreationException(metadata.name(), snapshotId, "snapshot with such name already exists");
}
// Write Global MetaData
globalMetaDataFormat.write(metaData, snapshotsBlobContainer, snapshotName);
globalMetaDataFormat.write(clusterMetadata, snapshotsBlobContainer, snapshotName);
for (String index : indices) {
final IndexMetaData indexMetaData = metaData.index(index);
final IndexMetaData indexMetaData = clusterMetadata.index(index);
final BlobPath indexPath = basePath().add("indices").add(index);
final BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
indexMetaDataFormat.write(indexMetaData, indexMetaDataBlobContainer, snapshotName);
}
} catch (IOException ex) {
throw new SnapshotCreationException(repositoryName, snapshotId, ex);
throw new SnapshotCreationException(metadata.name(), snapshotId, ex);
}
}
@Override
public void deleteSnapshot(SnapshotId snapshotId) {
if (isReadOnly()) {
throw new RepositoryException(this.repositoryName, "cannot delete snapshot from a readonly repository");
throw new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository");
}
List<String> indices = Collections.emptyList();
SnapshotInfo snapshot = null;
@@ -391,7 +397,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
} catch (IOException ex) {
throw new RepositoryException(this.repositoryName, "failed to update snapshot in repository", ex);
throw new RepositoryException(metadata.name(), "failed to update snapshot in repository", ex);
}
}
@@ -420,7 +426,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
return blobStoreSnapshot;
} catch (IOException ex) {
throw new RepositoryException(this.repositoryName, "failed to update snapshot in repository", ex);
throw new RepositoryException(metadata.name(), "failed to update snapshot in repository", ex);
}
}
@@ -432,7 +438,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// it's a fresh repository, no index file exists, so return an empty list
return Collections.emptyList();
} catch (IOException ioe) {
throw new RepositoryException(repositoryName, "failed to list snapshots in repository", ioe);
throw new RepositoryException(metadata.name(), "failed to list snapshots in repository", ioe);
}
}
@@ -450,12 +456,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
try {
return snapshotLegacyFormat.read(snapshotsBlobContainer, snapshotId.getName());
} catch (FileNotFoundException | NoSuchFileException ex1) {
throw new SnapshotMissingException(repositoryName, snapshotId, ex);
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException | NotXContentException ex1) {
throw new SnapshotException(repositoryName, snapshotId, "failed to get snapshots", ex1);
throw new SnapshotException(metadata.name(), snapshotId, "failed to get snapshots", ex1);
}
} catch (IOException | NotXContentException ex) {
throw new SnapshotException(repositoryName, snapshotId, "failed to get snapshots", ex);
throw new SnapshotException(metadata.name(), snapshotId, "failed to get snapshots", ex);
}
}
@@ -468,17 +474,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
if (globalMetaDataFormat.exists(snapshotsBlobContainer, snapshotId.getName())) {
snapshotVersion = Version.CURRENT;
} else if (globalMetaDataLegacyFormat.exists(snapshotsBlobContainer, snapshotId.getName())) {
throw new SnapshotException(repositoryName, snapshotId, "snapshot is too old");
throw new SnapshotException(metadata.name(), snapshotId, "snapshot is too old");
} else {
throw new SnapshotMissingException(repositoryName, snapshotId);
throw new SnapshotMissingException(metadata.name(), snapshotId);
}
}
try {
metaData = globalMetaDataFormat(snapshotVersion).read(snapshotsBlobContainer, snapshotId.getName());
} catch (FileNotFoundException | NoSuchFileException ex) {
throw new SnapshotMissingException(repositoryName, snapshotId, ex);
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException ex) {
throw new SnapshotException(repositoryName, snapshotId, "failed to get snapshots", ex);
throw new SnapshotException(metadata.name(), snapshotId, "failed to get snapshots", ex);
}
MetaData.Builder metaDataBuilder = MetaData.builder(metaData);
for (String index : indices) {
@@ -505,8 +511,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
* @param defaultRate default limiting rate
* @return rate limiter or null if no throttling is needed
*/
private RateLimiter getRateLimiter(RepositorySettings repositorySettings, String setting, ByteSizeValue defaultRate) {
ByteSizeValue maxSnapshotBytesPerSec = repositorySettings.settings().getAsBytesSize(setting,
private RateLimiter getRateLimiter(Settings repositorySettings, String setting, ByteSizeValue defaultRate) {
ByteSizeValue maxSnapshotBytesPerSec = repositorySettings.getAsBytesSize(setting,
settings.getAsBytesSize(setting, defaultRate));
if (maxSnapshotBytesPerSec.bytes() <= 0) {
return null;
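The lookup above is a two-level fallback: a per-repository value wins, then the node-level value, then the default. A standalone sketch of the same resolution (values illustrative):

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;

class RateLimitFallbackSketch {
    public static void main(String[] args) {
        Settings nodeSettings = Settings.builder().put("max_snapshot_bytes_per_sec", "20mb").build();
        Settings repositorySettings = Settings.EMPTY; // no per-repository override
        ByteSizeValue rate = repositorySettings.getAsBytesSize("max_snapshot_bytes_per_sec",
                nodeSettings.getAsBytesSize("max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)));
        // rate resolves to 20mb: the repository sets nothing, so the node-level value applies
    }
}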
@@ -587,7 +593,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
return seed;
}
} catch (IOException exp) {
throw new RepositoryVerificationException(repositoryName, "path " + basePath() + " is not accessible on master node", exp);
throw new RepositoryVerificationException(metadata.name(), "path " + basePath() + " is not accessible on master node", exp);
}
}
@@ -599,7 +605,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
try {
blobStore().delete(basePath().add(testBlobPrefix(seed)));
} catch (IOException exp) {
throw new RepositoryVerificationException(repositoryName, "cannot delete test data at " + basePath(), exp);
throw new RepositoryVerificationException(metadata.name(), "cannot delete test data at " + basePath(), exp);
}
}
@@ -778,7 +784,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
} catch (NumberFormatException nfe) {
// the index- blob wasn't of the format index-N where N is a number,
// no idea what this blob is but it doesn't belong in the repository!
logger.debug("[{}] Unknown blob in the repository: {}", repositoryName, blobName);
logger.debug("[{}] Unknown blob in the repository: {}", metadata.name(), blobName);
}
}
return latest;
@@ -848,10 +854,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
try {
testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", new BytesArray(seed));
} catch (IOException exp) {
throw new RepositoryVerificationException(repositoryName, "store location [" + blobStore() + "] is not accessible on the node [" + localNode + "]", exp);
throw new RepositoryVerificationException(metadata.name(), "store location [" + blobStore() + "] is not accessible on the node [" + localNode + "]", exp);
}
} else {
throw new RepositoryVerificationException(repositoryName, "a file written by master to the store [" + blobStore() + "] cannot be accessed on the node [" + localNode + "]. "
throw new RepositoryVerificationException(metadata.name(), "a file written by master to the store [" + blobStore() + "] cannot be accessed on the node [" + localNode + "]. "
+ "This might indicate that the store [" + blobStore() + "] is not shared between this node and the master node or "
+ "that permissions on the store don't allow reading files written by the master node");
}
@@ -871,7 +877,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public String toString() {
return "BlobStoreRepository[" +
"[" + repositoryName +
"[" + metadata.name() +
"], [" + blobStore() + ']' +
']';
}
@@ -1121,7 +1127,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
* @param snapshotIndexCommit snapshot commit point
*/
public void snapshot(IndexCommit snapshotIndexCommit) {
logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, repositoryName);
logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name());
store.incRef();
try {
final Map<String, BlobMetaData> blobs;
@@ -1409,7 +1415,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
public void restore() throws IOException {
store.incRef();
try {
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, metadata.name(), shardId);
BlobStoreIndexShardSnapshot snapshot = loadSnapshot();
if (snapshot.indexFiles().size() == 1

View File

@@ -19,17 +19,15 @@
package org.elasticsearch.repositories.fs;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import java.io.IOException;
@@ -72,62 +70,48 @@ public class FsRepository extends BlobStoreRepository {
private boolean compress;
/**
* Constructs new shared file system repository
*
* @param name repository name
* @param repositorySettings repository settings
* Constructs a shared file system repository.
*/
@Inject
public FsRepository(RepositoryName name, RepositorySettings repositorySettings, Environment environment) throws IOException {
super(name.getName(), repositorySettings);
Path locationFile;
String location = REPOSITORIES_LOCATION_SETTING.get(repositorySettings.settings());
public FsRepository(RepositoryMetaData metadata, Environment environment) throws IOException {
super(metadata, environment.settings());
String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
if (location.isEmpty()) {
logger.warn("the repository location is missing, it should point to a shared file system location that is available on all master and data nodes");
throw new RepositoryException(name.name(), "missing location");
throw new RepositoryException(metadata.name(), "missing location");
}
locationFile = environment.resolveRepoFile(location);
Path locationFile = environment.resolveRepoFile(location);
if (locationFile == null) {
if (environment.repoFiles().length > 0) {
logger.warn("The specified location [{}] doesn't start with any repository paths specified by the path.repo setting: [{}] ", location, environment.repoFiles());
throw new RepositoryException(name.name(), "location [" + location + "] doesn't match any of the locations specified by path.repo");
throw new RepositoryException(metadata.name(), "location [" + location + "] doesn't match any of the locations specified by path.repo");
} else {
logger.warn("The specified location [{}] should start with a repository path specified by the path.repo setting, but the path.repo setting was not set on this node", location);
throw new RepositoryException(name.name(), "location [" + location + "] doesn't match any of the locations specified by path.repo because this setting is empty");
throw new RepositoryException(metadata.name(), "location [" + location + "] doesn't match any of the locations specified by path.repo because this setting is empty");
}
}
blobStore = new FsBlobStore(settings, locationFile);
if (CHUNK_SIZE_SETTING.exists(repositorySettings.settings())) {
this.chunkSize = CHUNK_SIZE_SETTING.get(repositorySettings.settings());
if (CHUNK_SIZE_SETTING.exists(metadata.settings())) {
this.chunkSize = CHUNK_SIZE_SETTING.get(metadata.settings());
} else if (REPOSITORIES_CHUNK_SIZE_SETTING.exists(settings)) {
this.chunkSize = REPOSITORIES_CHUNK_SIZE_SETTING.get(settings);
} else {
this.chunkSize = null;
}
this.compress = COMPRESS_SETTING.exists(repositorySettings.settings()) ? COMPRESS_SETTING.get(repositorySettings.settings()) : REPOSITORIES_COMPRESS_SETTING.get(settings);
this.compress = COMPRESS_SETTING.exists(metadata.settings()) ? COMPRESS_SETTING.get(metadata.settings()) : REPOSITORIES_COMPRESS_SETTING.get(settings);
this.basePath = BlobPath.cleanPath();
}
/**
* {@inheritDoc}
*/
@Override
protected BlobStore blobStore() {
return blobStore;
}
/**
* {@inheritDoc}
*/
@Override
protected boolean isCompress() {
return compress;
}
/**
* {@inheritDoc}
*/
@Override
protected ByteSizeValue chunkSize() {
return chunkSize;
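A hedged sketch of the metadata an FsRepository now consumes; the location path is hypothetical and must resolve under one of the node's path.repo directories, or the constructor above throws RepositoryException:

import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.settings.Settings;

class FsRepositorySettingsSketch {
    public static void main(String[] args) {
        RepositoryMetaData metadata = new RepositoryMetaData("my_backup", "fs",
                Settings.builder()
                        .put("location", "/mnt/backups/my_backup") // hypothetical path under path.repo
                        .put("compress", true)
                        .build());
        // new FsRepository(metadata, environment) validates "location" against path.repo
        // and reads "chunk_size" and "compress" the same way
    }
}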

View File

@@ -19,17 +19,15 @@
package org.elasticsearch.repositories.uri;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.url.URLBlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.util.URIPattern;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import java.io.IOException;
@@ -77,23 +75,19 @@ public class URLRepository extends BlobStoreRepository {
private final BlobPath basePath;
/**
* Constructs new read-only URL-based repository
*
* @param name repository name
* @param repositorySettings repository settings
* Constructs a read-only URL-based repository
*/
@Inject
public URLRepository(RepositoryName name, RepositorySettings repositorySettings, Environment environment) throws IOException {
super(name.getName(), repositorySettings);
public URLRepository(RepositoryMetaData metadata, Environment environment) throws IOException {
super(metadata, environment.settings());
if (URL_SETTING.exists(repositorySettings.settings()) == false && REPOSITORIES_URL_SETTING.exists(settings) == false) {
throw new RepositoryException(name.name(), "missing url");
if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(settings) == false) {
throw new RepositoryException(metadata.name(), "missing url");
}
supportedProtocols = SUPPORTED_PROTOCOLS_SETTING.get(settings);
urlWhiteList = ALLOWED_URLS_SETTING.get(settings).toArray(new URIPattern[]{});
this.environment = environment;
URL url = URL_SETTING.exists(repositorySettings.settings()) ? URL_SETTING.get(repositorySettings.settings()) : REPOSITORIES_URL_SETTING.get(settings);
URL url = URL_SETTING.exists(metadata.settings()) ? URL_SETTING.get(metadata.settings()) : REPOSITORIES_URL_SETTING.get(settings);
URL normalizedURL = checkURL(url);
blobStore = new URLBlobStore(settings, normalizedURL);
basePath = BlobPath.cleanPath();
@@ -115,7 +109,7 @@ public class URLRepository extends BlobStoreRepository {
private URL checkURL(URL url) {
String protocol = url.getProtocol();
if (protocol == null) {
throw new RepositoryException(repositoryName, "unknown url protocol from URL [" + url + "]");
throw new RepositoryException(getMetadata().name(), "unknown url protocol from URL [" + url + "]");
}
for (String supportedProtocol : supportedProtocols) {
if (supportedProtocol.equals(protocol)) {
@@ -126,18 +120,18 @@
}
} catch (URISyntaxException ex) {
logger.warn("cannot parse the specified url [{}]", url);
throw new RepositoryException(repositoryName, "cannot parse the specified url [" + url + "]");
throw new RepositoryException(getMetadata().name(), "cannot parse the specified url [" + url + "]");
}
// We didn't match white list - try to resolve against path.repo
URL normalizedUrl = environment.resolveRepoURL(url);
if (normalizedUrl == null) {
logger.warn("The specified url [{}] doesn't start with any repository paths specified by the path.repo setting or by {} setting: [{}] ", url, ALLOWED_URLS_SETTING.getKey(), environment.repoFiles());
throw new RepositoryException(repositoryName, "file url [" + url + "] doesn't match any of the locations specified by path.repo or " + ALLOWED_URLS_SETTING.getKey());
throw new RepositoryException(getMetadata().name(), "file url [" + url + "] doesn't match any of the locations specified by path.repo or " + ALLOWED_URLS_SETTING.getKey());
}
return normalizedUrl;
}
}
throw new RepositoryException(repositoryName, "unsupported url protocol [" + protocol + "] from URL [" + url + "]");
throw new RepositoryException(getMetadata().name(), "unsupported url protocol [" + protocol + "] from URL [" + url + "]");
}
@Override

View File

@@ -72,6 +72,7 @@ import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.RestoreSource;
@@ -1653,6 +1654,10 @@ public class IndexShardTests extends ESSingleNodeTestCase {
@Override
protected void doClose() {}
@Override
public RepositoryMetaData getMetadata() {
return null;
}
@Override
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
return null;
}

View File

@@ -270,7 +270,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
final TransportService transportService = new TransportService(Settings.EMPTY, null, threadPool);
final ClusterService clusterService = mock(ClusterService.class);
final RepositoriesService repositoriesService = new RepositoriesService(Settings.EMPTY, clusterService,
transportService, null, null);
transportService, null);
final RecoveryTargetService recoveryTargetService = new RecoveryTargetService(Settings.EMPTY, threadPool,
transportService, null, clusterService);
final ShardStateAction shardStateAction = mock(ShardStateAction.class);

View File

@@ -26,6 +26,7 @@ import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,34 +35,34 @@ import java.util.concurrent.atomic.AtomicLong;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotId;
public class MockRepository extends FsRepository {
public static class Plugin extends org.elasticsearch.plugins.Plugin {
public static class Plugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin {
public static final Setting<String> USERNAME_SETTING = Setting.simpleString("secret.mock.username", Property.NodeScope);
public static final Setting<String> PASSWORD_SETTING =
Setting.simpleString("secret.mock.password", Property.NodeScope, Property.Filtered);
public void onModule(RepositoriesModule repositoriesModule) {
repositoriesModule.registerRepository("mock", MockRepository.class);
@Override
public Map<String, Repository.Factory> getRepositories(Environment env) {
return Collections.singletonMap("mock", (metadata) -> new MockRepository(metadata, env));
}
@Override
@@ -96,45 +97,41 @@ public class MockRepository extends FsRepository {
private volatile boolean blocked = false;
@Inject
public MockRepository(RepositoryName name, RepositorySettings repositorySettings, ClusterService clusterService, Environment environment) throws IOException {
super(name, overrideSettings(repositorySettings, clusterService), environment);
randomControlIOExceptionRate = repositorySettings.settings().getAsDouble("random_control_io_exception_rate", 0.0);
randomDataFileIOExceptionRate = repositorySettings.settings().getAsDouble("random_data_file_io_exception_rate", 0.0);
maximumNumberOfFailures = repositorySettings.settings().getAsLong("max_failure_number", 100L);
blockOnControlFiles = repositorySettings.settings().getAsBoolean("block_on_control", false);
blockOnDataFiles = repositorySettings.settings().getAsBoolean("block_on_data", false);
blockOnInitialization = repositorySettings.settings().getAsBoolean("block_on_init", false);
randomPrefix = repositorySettings.settings().get("random", "default");
waitAfterUnblock = repositorySettings.settings().getAsLong("wait_after_unblock", 0L);
public MockRepository(RepositoryMetaData metadata, Environment environment) throws IOException {
super(overrideSettings(metadata, environment), environment);
randomControlIOExceptionRate = metadata.settings().getAsDouble("random_control_io_exception_rate", 0.0);
randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0);
maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L);
blockOnControlFiles = metadata.settings().getAsBoolean("block_on_control", false);
blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false);
blockOnInitialization = metadata.settings().getAsBoolean("block_on_init", false);
randomPrefix = metadata.settings().get("random", "default");
waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L);
logger.info("starting mock repository with random prefix {}", randomPrefix);
mockBlobStore = new MockBlobStore(super.blobStore());
}
@Override
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData metaData) {
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData clusterMetadata) {
if (blockOnInitialization ) {
blockExecution();
}
super.initializeSnapshot(snapshotId, indices, metaData);
super.initializeSnapshot(snapshotId, indices, clusterMetadata);
}
private static RepositorySettings overrideSettings(RepositorySettings repositorySettings, ClusterService clusterService) {
if (repositorySettings.settings().getAsBoolean("localize_location", false)) {
return new RepositorySettings(
repositorySettings.globalSettings(),
localizeLocation(repositorySettings.settings(), clusterService));
private static RepositoryMetaData overrideSettings(RepositoryMetaData metadata, Environment environment) {
// TODO: use another method of testing not being able to read the test file written by the master...
// this is super duper hacky
if (metadata.settings().getAsBoolean("localize_location", false)) {
Path location = PathUtils.get(metadata.settings().get("location"));
location = location.resolve(Integer.toString(environment.hashCode()));
return new RepositoryMetaData(metadata.name(), metadata.type(),
Settings.builder().put(metadata.settings()).put("location", location.toAbsolutePath()).build());
} else {
return repositorySettings;
return metadata;
}
}
private static Settings localizeLocation(Settings settings, ClusterService clusterService) {
Path location = PathUtils.get(settings.get("location"));
location = location.resolve(clusterService.localNode().getId());
return Settings.builder().put(settings).put("location", location.toAbsolutePath()).build();
}
private long incrementAndGetFailureCount() {
return failureCounter.incrementAndGet();
}

View File

@@ -1,57 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.azure;
import org.elasticsearch.cloud.azure.storage.AzureStorageService;
import org.elasticsearch.cloud.azure.storage.AzureStorageServiceImpl;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
/**
* Azure Module
*
* <ul>
* <li>If needed this module will bind azure repository service by default
* to AzureStorageServiceImpl.</li>
* </ul>
*
* @see org.elasticsearch.cloud.azure.storage.AzureStorageServiceImpl
*/
public class AzureRepositoryModule extends AbstractModule {
protected final ESLogger logger;
// pkg private so it is settable by tests
static Class<? extends AzureStorageService> storageServiceImpl = AzureStorageServiceImpl.class;
@Inject
public AzureRepositoryModule(Settings settings) {
this.logger = Loggers.getLogger(getClass(), settings);
}
@Override
protected void configure() {
// If we have settings for azure repository, let's start the azure storage service
logger.debug("starting azure repository service");
bind(AzureStorageService.class).to(storageServiceImpl).asEagerSingleton();
}
}

View File

@@ -23,16 +23,14 @@ import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.cloud.azure.storage.AzureStorageService;
import org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import java.io.IOException;
import java.io.InputStream;
@@ -53,17 +51,15 @@ public class AzureBlobStore extends AbstractComponent implements BlobStore {
private final String container;
private final String repositoryName;
@Inject
public AzureBlobStore(RepositoryName name, Settings settings, RepositorySettings repositorySettings,
public AzureBlobStore(RepositoryMetaData metadata, Settings settings,
AzureStorageService client) throws URISyntaxException, StorageException {
super(settings);
this.client = client;
client.start();
this.container = getValue(repositorySettings, Repository.CONTAINER_SETTING, Storage.CONTAINER_SETTING);
this.repositoryName = name.getName();
this.accountName = getValue(repositorySettings, Repository.ACCOUNT_SETTING, Storage.ACCOUNT_SETTING);
this.container = getValue(metadata.settings(), settings, Repository.CONTAINER_SETTING, Storage.CONTAINER_SETTING);
this.repositoryName = metadata.name();
this.accountName = getValue(metadata.settings(), settings, Repository.ACCOUNT_SETTING, Storage.ACCOUNT_SETTING);
String modeStr = getValue(repositorySettings, Repository.LOCATION_MODE_SETTING, Storage.LOCATION_MODE_SETTING);
String modeStr = getValue(metadata.settings(), settings, Repository.LOCATION_MODE_SETTING, Storage.LOCATION_MODE_SETTING);
if (Strings.hasLength(modeStr)) {
this.locMode = LocationMode.valueOf(modeStr.toUpperCase(Locale.ROOT));
} else {

View File

@@ -86,6 +86,4 @@ public interface AzureStorageService {
void moveBlob(String account, LocationMode mode, String container, String sourceBlob, String targetBlob)
throws URISyntaxException, StorageException;
void start();
}

View File

@@ -33,6 +33,7 @@ import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@@ -45,15 +46,13 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
public class AzureStorageServiceImpl extends AbstractLifecycleComponent
implements AzureStorageService {
public class AzureStorageServiceImpl extends AbstractComponent implements AzureStorageService {
final AzureStorageSettings primaryStorageSettings;
final Map<String, AzureStorageSettings> secondariesStorageSettings;
final Map<String, CloudBlobClient> clients;
@Inject
public AzureStorageServiceImpl(Settings settings) {
super(settings);
@@ -62,6 +61,20 @@ public class AzureStorageServiceImpl extends AbstractLifecycleComponent
this.secondariesStorageSettings = storageSettings.v2();
this.clients = new HashMap<>();
logger.debug("starting azure storage client instance");
// We register the primary client if any
if (primaryStorageSettings != null) {
logger.debug("registering primary client for account [{}]", primaryStorageSettings.getAccount());
createClient(primaryStorageSettings);
}
// We register all secondary clients
for (Map.Entry<String, AzureStorageSettings> azureStorageSettingsEntry : secondariesStorageSettings.entrySet()) {
logger.debug("registering secondary client for account [{}]", azureStorageSettingsEntry.getKey());
createClient(azureStorageSettingsEntry.getValue());
}
}
void createClient(AzureStorageSettings azureStorageSettings) {
@@ -302,32 +315,4 @@
logger.debug("moveBlob container [{}], sourceBlob [{}], targetBlob [{}] -> done", container, sourceBlob, targetBlob);
}
}
@Override
protected void doStart() throws ElasticsearchException {
logger.debug("starting azure storage client instance");
// We register the primary client if any
if (primaryStorageSettings != null) {
logger.debug("registering primary client for account [{}]", primaryStorageSettings.getAccount());
createClient(primaryStorageSettings);
}
// We register all secondary clients
for (Map.Entry<String, AzureStorageSettings> azureStorageSettingsEntry : secondariesStorageSettings.entrySet()) {
logger.debug("registering secondary client for account [{}]", azureStorageSettingsEntry.getKey());
createClient(azureStorageSettingsEntry.getValue());
}
}
@Override
protected void doStop() throws ElasticsearchException {
logger.debug("stopping azure storage client instance");
// We should stop all clients, but it doesn't sound like CloudBlobClient has
// any shutdown method...
}
@Override
protected void doClose() throws ElasticsearchException {
}
}

View File

@ -25,7 +25,6 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.repositories.RepositorySettings;
import java.util.ArrayList;
import java.util.Collections;
@ -172,20 +171,21 @@ public final class AzureStorageSettings {
return Collections.unmodifiableMap(secondaries);
}
public static <T> T getValue(RepositorySettings repositorySettings,
public static <T> T getValue(Settings repositorySettings,
Settings globalSettings,
Setting<T> repositorySetting,
Setting<T> repositoriesSetting) {
if (repositorySetting.exists(repositorySettings.settings())) {
return repositorySetting.get(repositorySettings.settings());
if (repositorySetting.exists(repositorySettings)) {
return repositorySetting.get(repositorySettings);
} else {
return repositoriesSetting.get(repositorySettings.globalSettings());
return repositoriesSetting.get(globalSettings);
}
}
public static <T> Setting<T> getEffectiveSetting(RepositorySettings repositorySettings,
public static <T> Setting<T> getEffectiveSetting(Settings repositorySettings,
Setting<T> repositorySetting,
Setting<T> repositoriesSetting) {
if (repositorySetting.exists(repositorySettings.settings())) {
if (repositorySetting.exists(repositorySettings)) {
return repositorySetting;
} else {
return repositoriesSetting;

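A quick sketch of the fallback semantics the new signature preserves; the concrete setting keys below ("container" and "repositories.azure.container") are assumed, not taken from this diff:

Settings repositorySettings = Settings.builder()
    .put("container", "my-container")                          // defined on the repository itself
    .build();
Settings globalSettings = Settings.builder()
    .put("repositories.azure.container", "default-container")  // node-level fallback
    .build();
// The per-repository value wins when present:
String container = AzureStorageSettings.getValue(repositorySettings, globalSettings,
    Repository.CONTAINER_SETTING, Storage.CONTAINER_SETTING);  // "my-container"
// Without it, the repositories-level setting is consulted instead:
String fallback = AzureStorageSettings.getValue(Settings.EMPTY, globalSettings,
    Repository.CONTAINER_SETTING, Storage.CONTAINER_SETTING);  // "default-container"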
View File

@ -20,42 +20,36 @@
package org.elasticsearch.plugin.repository.azure;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.elasticsearch.cloud.azure.AzureRepositoryModule;
import org.elasticsearch.cloud.azure.storage.AzureStorageService;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.cloud.azure.storage.AzureStorageServiceImpl;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.azure.AzureRepository;
/**
*
* A plugin to add a repository type that writes to and from the Azure cloud storage service.
*/
public class AzureRepositoryPlugin extends Plugin {
public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin {
private final Settings settings;
protected final ESLogger logger = Loggers.getLogger(AzureRepositoryPlugin.class);
public AzureRepositoryPlugin(Settings settings) {
this.settings = settings;
logger.trace("starting azure repository plugin...");
// overridable for tests
protected AzureStorageService createStorageService(Settings settings) {
return new AzureStorageServiceImpl(settings);
}
@Override
public Collection<Module> nodeModules() {
return Collections.singletonList((Module) new AzureRepositoryModule(settings));
}
public void onModule(RepositoriesModule module) {
logger.debug("registering repository type [{}]", AzureRepository.TYPE);
module.registerRepository(AzureRepository.TYPE, AzureRepository.class);
public Map<String, Repository.Factory> getRepositories(Environment env) {
return Collections.singletonMap(AzureRepository.TYPE,
(metadata) -> new AzureRepository(metadata, env, createStorageService(env.settings())));
}
@Override

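Every converted plugin in this commit ends up with the same shape; a minimal sketch of the new contract, where MyRepositoryPlugin and MyRepository are hypothetical names standing in for a plugin and its Repository implementation:

import java.util.Collections;
import java.util.Map;

import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;

public class MyRepositoryPlugin extends Plugin implements RepositoryPlugin {
    @Override
    public Map<String, Repository.Factory> getRepositories(Environment env) {
        // The factory receives the RepositoryMetaData of the repository being
        // created; no Guice module, @Inject constructor, or onModule hook is needed.
        return Collections.singletonMap("my-type",
            (metadata) -> new MyRepository(metadata, env));
    }
}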
View File

@ -19,33 +19,33 @@
package org.elasticsearch.repositories.azure;
import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.cloud.azure.blobstore.AzureBlobStore;
import org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotCreationException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;
import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.cloud.azure.blobstore.AzureBlobStore;
import org.elasticsearch.cloud.azure.storage.AzureStorageService;
import org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotCreationException;
import org.elasticsearch.snapshots.SnapshotId;
import static org.elasticsearch.cloud.azure.storage.AzureStorageSettings.getEffectiveSetting;
import static org.elasticsearch.cloud.azure.storage.AzureStorageSettings.getValue;
@ -83,24 +83,22 @@ public class AzureRepository extends BlobStoreRepository {
private final boolean compress;
private final boolean readonly;
@Inject
public AzureRepository(RepositoryName name, RepositorySettings repositorySettings,
AzureBlobStore azureBlobStore) throws IOException, URISyntaxException, StorageException {
super(name.getName(), repositorySettings);
public AzureRepository(RepositoryMetaData metadata, Environment environment, AzureStorageService storageService)
throws IOException, URISyntaxException, StorageException {
super(metadata, environment.settings());
String container = getValue(repositorySettings, Repository.CONTAINER_SETTING, Storage.CONTAINER_SETTING);
this.blobStore = azureBlobStore;
ByteSizeValue configuredChunkSize = getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Storage.CHUNK_SIZE_SETTING);
blobStore = new AzureBlobStore(metadata, environment.settings(), storageService);
String container = getValue(metadata.settings(), settings, Repository.CONTAINER_SETTING, Storage.CONTAINER_SETTING);
ByteSizeValue configuredChunkSize = getValue(metadata.settings(), settings, Repository.CHUNK_SIZE_SETTING, Storage.CHUNK_SIZE_SETTING);
if (configuredChunkSize.getMb() > MAX_CHUNK_SIZE.getMb()) {
Setting<ByteSizeValue> setting = getEffectiveSetting(repositorySettings, Repository.CHUNK_SIZE_SETTING, Storage.CHUNK_SIZE_SETTING);
Setting<ByteSizeValue> setting = getEffectiveSetting(metadata.settings(), Repository.CHUNK_SIZE_SETTING, Storage.CHUNK_SIZE_SETTING);
throw new SettingsException("[" + setting.getKey() + "] must not exceed [" + MAX_CHUNK_SIZE + "] but is set to [" + configuredChunkSize + "].");
} else {
this.chunkSize = configuredChunkSize;
}
this.compress = getValue(repositorySettings, Repository.COMPRESS_SETTING, Storage.COMPRESS_SETTING);
String modeStr = getValue(repositorySettings, Repository.LOCATION_MODE_SETTING, Storage.LOCATION_MODE_SETTING);
this.compress = getValue(metadata.settings(), settings, Repository.COMPRESS_SETTING, Storage.COMPRESS_SETTING);
String modeStr = getValue(metadata.settings(), settings, Repository.LOCATION_MODE_SETTING, Storage.LOCATION_MODE_SETTING);
if (Strings.hasLength(modeStr)) {
LocationMode locationMode = LocationMode.valueOf(modeStr.toUpperCase(Locale.ROOT));
readonly = locationMode == LocationMode.SECONDARY_ONLY;
@ -108,7 +106,7 @@ public class AzureRepository extends BlobStoreRepository {
readonly = false;
}
String basePath = getValue(repositorySettings, Repository.BASE_PATH_SETTING, Storage.BASE_PATH_SETTING);
String basePath = getValue(metadata.settings(), settings, Repository.BASE_PATH_SETTING, Storage.BASE_PATH_SETTING);
if (Strings.hasLength(basePath)) {
// Remove starting / if any
@ -155,16 +153,16 @@ public class AzureRepository extends BlobStoreRepository {
}
@Override
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData metaData) {
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData clusterMetadata) {
try {
if (!blobStore.doesContainerExist(blobStore.container())) {
logger.debug("container [{}] does not exist. Creating...", blobStore.container());
blobStore.createContainer(blobStore.container());
}
super.initializeSnapshot(snapshotId, indices, metaData);
super.initializeSnapshot(snapshotId, indices, clusterMetadata);
} catch (StorageException | URISyntaxException e) {
logger.warn("can not initialize container [{}]: [{}]", blobStore.container(), e.getMessage());
throw new SnapshotCreationException(repositoryName, snapshotId, e);
throw new SnapshotCreationException(getMetadata().name(), snapshotId, e);
}
}
@ -178,7 +176,7 @@ public class AzureRepository extends BlobStoreRepository {
}
} catch (StorageException | URISyntaxException e) {
logger.warn("can not initialize container [{}]: [{}]", blobStore.container(), e.getMessage());
throw new RepositoryVerificationException(repositoryName, "can not initialize container " + blobStore.container(), e);
throw new RepositoryVerificationException(getMetadata().name(), "can not initialize container " + blobStore.container(), e);
}
}
return super.startVerification();

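For reference, repository settings that would trip the chunk-size guard in the constructor above; the exact MAX_CHUNK_SIZE limit is not shown in this diff, so the 128mb value is only assumed to exceed it:

Settings repoSettings = Settings.builder()
    .put("container", "my-container")
    .put("chunk_size", "128mb")   // assumed to exceed Azure's MAX_CHUNK_SIZE
    .build();
RepositoryMetaData metadata = new RepositoryMetaData("my_azure_repo", "azure", repoSettings);
// new AzureRepository(metadata, environment, storageService) would throw
// SettingsException: [chunk_size] must not exceed [...] but is set to [128mb].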
View File

@ -40,9 +40,12 @@ import java.util.Collection;
public abstract class AbstractAzureRepositoryServiceIntegTestCase extends AbstractAzureIntegTestCase {
public static class TestPlugin extends Plugin {
public void onModule(AzureRepositoryModule azureRepositoryModule) {
AzureRepositoryModule.storageServiceImpl = AzureStorageServiceMock.class;
private static final AzureStorageService storageService = new AzureStorageServiceMock();
public static class TestPlugin extends AzureRepositoryPlugin {
@Override
protected AzureStorageService createStorageService(Settings settings) {
return storageService;
}
}
@ -78,7 +81,7 @@ public abstract class AbstractAzureRepositoryServiceIntegTestCase extends Abstra
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return pluginList(AzureRepositoryPlugin.class, TestPlugin.class, MockFSIndexStore.TestPlugin.class);
return pluginList(TestPlugin.class, MockFSIndexStore.TestPlugin.class);
}
@Override
@ -104,7 +107,6 @@ public abstract class AbstractAzureRepositoryServiceIntegTestCase extends Abstra
public void cleanRepositoryFiles(String path) throws StorageException, URISyntaxException {
String container = internalCluster().getInstance(Settings.class).get("repositories.azure.container");
logger.info("--> remove blobs in container [{}]", container);
AzureStorageService client = internalCluster().getInstance(AzureStorageService.class);
client.deleteFiles(null, LocationMode.PRIMARY_ONLY, container, path);
storageService.deleteFiles(null, LocationMode.PRIMARY_ONLY, container, path);
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -43,14 +44,12 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* In memory storage for unit tests
*/
public class AzureStorageServiceMock extends AbstractLifecycleComponent
implements AzureStorageService {
public class AzureStorageServiceMock extends AbstractComponent implements AzureStorageService {
protected Map<String, ByteArrayOutputStream> blobs = new ConcurrentHashMap<>();
@Inject
public AzureStorageServiceMock(Settings settings) {
super(settings);
public AzureStorageServiceMock() {
super(Settings.EMPTY);
}
@Override
@ -124,18 +123,6 @@ public class AzureStorageServiceMock extends AbstractLifecycleComponent
}
}
@Override
protected void doStart() throws ElasticsearchException {
}
@Override
protected void doStop() throws ElasticsearchException {
}
@Override
protected void doClose() throws ElasticsearchException {
}
/**
* Test if the given String starts with the specified prefix,
* ignoring upper/lower case.

View File

@ -45,7 +45,6 @@ public class AzureStorageServiceTests extends ESTestCase {
public void testGetSelectedClientWithNoPrimaryAndSecondary() {
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMock(Settings.EMPTY);
azureStorageService.doStart();
try {
azureStorageService.getSelectedClient("whatever", LocationMode.PRIMARY_ONLY);
fail("we should have raised an IllegalArgumentException");
@ -59,7 +58,6 @@ public class AzureStorageServiceTests extends ESTestCase {
.put("cloud.azure.storage.azure1.account", "myaccount1")
.put("cloud.azure.storage.azure1.key", "mykey1")
.build());
azureStorageService.doStart();
CloudBlobClient client = azureStorageService.getSelectedClient("azure1", LocationMode.PRIMARY_ONLY);
assertThat(client.getEndpoint(), is(URI.create("https://azure1")));
}
@ -69,42 +67,36 @@ public class AzureStorageServiceTests extends ESTestCase {
.put("cloud.azure.storage.azure1.account", "myaccount1")
.put("cloud.azure.storage.azure1.key", "mykey1")
.build());
azureStorageService.doStart();
CloudBlobClient client = azureStorageService.getSelectedClient(null, LocationMode.PRIMARY_ONLY);
assertThat(client.getEndpoint(), is(URI.create("https://azure1")));
}
public void testGetSelectedClientPrimary() {
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMock(settings);
azureStorageService.doStart();
CloudBlobClient client = azureStorageService.getSelectedClient("azure1", LocationMode.PRIMARY_ONLY);
assertThat(client.getEndpoint(), is(URI.create("https://azure1")));
}
public void testGetSelectedClientSecondary1() {
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMock(settings);
azureStorageService.doStart();
CloudBlobClient client = azureStorageService.getSelectedClient("azure2", LocationMode.PRIMARY_ONLY);
assertThat(client.getEndpoint(), is(URI.create("https://azure2")));
}
public void testGetSelectedClientSecondary2() {
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMock(settings);
azureStorageService.doStart();
CloudBlobClient client = azureStorageService.getSelectedClient("azure3", LocationMode.PRIMARY_ONLY);
assertThat(client.getEndpoint(), is(URI.create("https://azure3")));
}
public void testGetDefaultClientWithPrimaryAndSecondaries() {
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMock(settings);
azureStorageService.doStart();
CloudBlobClient client = azureStorageService.getSelectedClient(null, LocationMode.PRIMARY_ONLY);
assertThat(client.getEndpoint(), is(URI.create("https://azure1")));
}
public void testGetSelectedClientNonExisting() {
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMock(settings);
azureStorageService.doStart();
try {
azureStorageService.getSelectedClient("azure4", LocationMode.PRIMARY_ONLY);
fail("we should have raised an IllegalArgumentException");
@ -115,7 +107,6 @@ public class AzureStorageServiceTests extends ESTestCase {
public void testGetSelectedClientDefault() {
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMock(settings);
azureStorageService.doStart();
CloudBlobClient client = azureStorageService.getSelectedClient(null, LocationMode.PRIMARY_ONLY);
assertThat(client.getEndpoint(), is(URI.create("https://azure1")));
}
@ -127,7 +118,6 @@ public class AzureStorageServiceTests extends ESTestCase {
.build();
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMock(timeoutSettings);
azureStorageService.doStart();
CloudBlobClient client1 = azureStorageService.getSelectedClient("azure1", LocationMode.PRIMARY_ONLY);
assertThat(client1.getDefaultRequestOptions().getTimeoutIntervalInMs(), is(10 * 1000));
CloudBlobClient client3 = azureStorageService.getSelectedClient("azure3", LocationMode.PRIMARY_ONLY);
@ -136,7 +126,6 @@ public class AzureStorageServiceTests extends ESTestCase {
public void testGetSelectedClientDefaultTimeout() {
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMock(settings);
azureStorageService.doStart();
CloudBlobClient client1 = azureStorageService.getSelectedClient("azure1", LocationMode.PRIMARY_ONLY);
assertThat(client1.getDefaultRequestOptions().getTimeoutIntervalInMs(), nullValue());
CloudBlobClient client3 = azureStorageService.getSelectedClient("azure3", LocationMode.PRIMARY_ONLY);
@ -150,7 +139,6 @@ public class AzureStorageServiceTests extends ESTestCase {
.build();
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMock(timeoutSettings);
azureStorageService.doStart();
CloudBlobClient client1 = azureStorageService.getSelectedClient("azure", LocationMode.PRIMARY_ONLY);
assertThat(client1.getDefaultRequestOptions().getTimeoutIntervalInMs(), is(nullValue()));
}

View File

@ -46,7 +46,7 @@ public class AzureStorageSettingsFilterTests extends ESTestCase {
.build();
public void testSettingsFiltering() throws IOException {
AzureRepositoryPlugin p = new AzureRepositoryPlugin(Settings.EMPTY);
AzureRepositoryPlugin p = new AzureRepositoryPlugin();
SettingsModule module = new SettingsModule(Settings.EMPTY, p.getSettings(), p.getSettingsFilter());
SettingsFilter settingsFilter = ModuleTestCase.bindAndGetInstance(module, SettingsFilter.class);

View File

@ -23,11 +23,10 @@ import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.cloud.azure.blobstore.AzureBlobStore;
import org.elasticsearch.cloud.azure.storage.AzureStorageService;
import org.elasticsearch.cloud.azure.storage.AzureStorageServiceImpl;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.ESBlobStoreTestCase;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.test.ESIntegTestCase;
import java.io.IOException;
@ -44,11 +43,10 @@ public class AzureBlobStoreTests extends ESBlobStoreTestCase {
@Override
protected BlobStore newBlobStore() throws IOException {
try {
RepositoryName repositoryName = new RepositoryName("azure", "ittest");
Settings settings = readSettingsFromFile();
RepositorySettings repositorySettings = new RepositorySettings(settings, Settings.builder().build());
RepositoryMetaData metadata = new RepositoryMetaData("ittest", "azure", Settings.EMPTY);
AzureStorageService storageService = new AzureStorageServiceImpl(settings);
AzureBlobStore blobStore = new AzureBlobStore(repositoryName, settings, repositorySettings, storageService);
AzureBlobStore blobStore = new AzureBlobStore(metadata, settings, storageService);
blobStore.createContainer(blobStore.container());
return blobStore;
} catch (URISyntaxException | StorageException e) {

View File

@ -483,7 +483,7 @@ public class AzureSnapshotRestoreTests extends AbstractAzureWithThirdPartyIntegT
*/
public void testRemoveAndCreateContainer() throws Exception {
final String container = getContainerName().concat("-testremove");
final AzureStorageService storageService = internalCluster().getInstance(AzureStorageService.class);
final AzureStorageService storageService = new AzureStorageServiceImpl(internalCluster().getDefaultSettings());
// It could happen that we run this test really close to a previous one
// so we might need some time to be able to create the container

View File

@ -1,31 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.repository.gcs;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.repositories.gcs.GoogleCloudStorageService;
public class GoogleCloudStorageModule extends AbstractModule {
@Override
protected void configure() {
bind(GoogleCloudStorageService.class).to(GoogleCloudStorageService.InternalGoogleCloudStorageService.class).asEagerSingleton();
}
}

View File

@ -23,6 +23,7 @@ import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import com.google.api.client.auth.oauth2.TokenRequest;
import com.google.api.client.auth.oauth2.TokenResponse;
@ -40,11 +41,16 @@ import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository;
import org.elasticsearch.repositories.gcs.GoogleCloudStorageService;
public class GoogleCloudStoragePlugin extends Plugin {
public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin {
public static final String NAME = "repository-gcs";
@ -108,12 +114,14 @@ public class GoogleCloudStoragePlugin extends Plugin {
});
}
@Override
public Collection<Module> nodeModules() {
return Collections.singletonList(new GoogleCloudStorageModule());
// overridable for tests
protected GoogleCloudStorageService createStorageService(Environment environment) {
return new GoogleCloudStorageService.InternalGoogleCloudStorageService(environment);
}
public void onModule(RepositoriesModule repositoriesModule) {
repositoriesModule.registerRepository(GoogleCloudStorageRepository.TYPE, GoogleCloudStorageRepository.class);
@Override
public Map<String, Repository.Factory> getRepositories(Environment env) {
return Collections.singletonMap(GoogleCloudStorageRepository.TYPE,
(metadata) -> new GoogleCloudStorageRepository(metadata, env, createStorageService(env)));
}
}

View File

@ -20,19 +20,18 @@
package org.elasticsearch.repositories.gcs;
import com.google.api.services.storage.Storage;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.gcs.GoogleCloudStorageBlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugin.repository.gcs.GoogleCloudStoragePlugin;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import java.util.function.Function;
@ -72,16 +71,15 @@ public class GoogleCloudStorageRepository extends BlobStoreRepository {
private final BlobPath basePath;
private final GoogleCloudStorageBlobStore blobStore;
@Inject
public GoogleCloudStorageRepository(RepositoryName repositoryName, RepositorySettings repositorySettings,
public GoogleCloudStorageRepository(RepositoryMetaData metadata, Environment environment,
GoogleCloudStorageService storageService) throws Exception {
super(repositoryName.getName(), repositorySettings);
super(metadata, environment.settings());
String bucket = get(BUCKET, repositoryName, repositorySettings);
String application = get(APPLICATION_NAME, repositoryName, repositorySettings);
String serviceAccount = get(SERVICE_ACCOUNT, repositoryName, repositorySettings);
String bucket = get(BUCKET, metadata);
String application = get(APPLICATION_NAME, metadata);
String serviceAccount = get(SERVICE_ACCOUNT, metadata);
String basePath = BASE_PATH.get(repositorySettings.settings());
String basePath = BASE_PATH.get(metadata.settings());
if (Strings.hasLength(basePath)) {
BlobPath path = new BlobPath();
for (String elem : basePath.split("/")) {
@ -95,18 +93,18 @@ public class GoogleCloudStorageRepository extends BlobStoreRepository {
TimeValue connectTimeout = null;
TimeValue readTimeout = null;
TimeValue timeout = HTTP_CONNECT_TIMEOUT.get(repositorySettings.settings());
TimeValue timeout = HTTP_CONNECT_TIMEOUT.get(metadata.settings());
if ((timeout != null) && (timeout.millis() != NO_TIMEOUT.millis())) {
connectTimeout = timeout;
}
timeout = HTTP_READ_TIMEOUT.get(repositorySettings.settings());
timeout = HTTP_READ_TIMEOUT.get(metadata.settings());
if ((timeout != null) && (timeout.millis() != NO_TIMEOUT.millis())) {
readTimeout = timeout;
}
this.compress = get(COMPRESS, repositoryName, repositorySettings);
this.chunkSize = get(CHUNK_SIZE, repositoryName, repositorySettings);
this.compress = get(COMPRESS, metadata);
this.chunkSize = get(CHUNK_SIZE, metadata);
logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}], application [{}]",
bucket, basePath, chunkSize, compress, application);
@ -139,13 +137,13 @@ public class GoogleCloudStorageRepository extends BlobStoreRepository {
/**
* Get a given setting from the repository settings, throwing a {@link RepositoryException} if the setting does not exist or is empty.
*/
static <T> T get(Setting<T> setting, RepositoryName name, RepositorySettings repositorySettings) {
T value = setting.get(repositorySettings.settings());
static <T> T get(Setting<T> setting, RepositoryMetaData metadata) {
T value = setting.get(metadata.settings());
if (value == null) {
throw new RepositoryException(name.getName(), "Setting [" + setting.getKey() + "] is not defined for repository");
throw new RepositoryException(metadata.name(), "Setting [" + setting.getKey() + "] is not defined for repository");
}
if ((value instanceof String) && (Strings.hasText((String) value)) == false) {
throw new RepositoryException(name.getName(), "Setting [" + setting.getKey() + "] is empty for repository");
throw new RepositoryException(metadata.name(), "Setting [" + setting.getKey() + "] is empty for repository");
}
return value;
}

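The helper turns a missing required setting into a hard failure at repository creation time; a sketch of that path, assuming BUCKET's key is "bucket" and calling the package-private helper from the same package:

RepositoryMetaData metadata = new RepositoryMetaData("my_gcs_repo", "gcs", Settings.EMPTY);
try {
    String bucket = get(BUCKET, metadata);   // no "bucket" defined on the repository
} catch (RepositoryException e) {
    // "[my_gcs_repo] Setting [bucket] is not defined for repository"
}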
View File

@ -67,9 +67,8 @@ public interface GoogleCloudStorageService {
private final Environment environment;
@Inject
public InternalGoogleCloudStorageService(Settings settings, Environment environment) {
super(settings);
public InternalGoogleCloudStorageService(Environment environment) {
super(environment.settings());
this.environment = environment;
}

View File

@ -21,18 +21,16 @@ package org.elasticsearch.repositories.gcs;
import com.google.api.services.storage.Storage;
import org.elasticsearch.common.blobstore.gcs.MockHttpTransport;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugin.repository.gcs.GoogleCloudStorageModule;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugin.repository.gcs.GoogleCloudStoragePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.ESBlobStoreRepositoryIntegTestCase;
import org.junit.BeforeClass;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -68,25 +66,13 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
}
public static class MockGoogleCloudStoragePlugin extends GoogleCloudStoragePlugin {
public MockGoogleCloudStoragePlugin() {
}
@Override
public Collection<Module> nodeModules() {
return Collections.singletonList(new MockGoogleCloudStorageModule());
}
}
public static class MockGoogleCloudStorageModule extends GoogleCloudStorageModule {
@Override
protected void configure() {
bind(GoogleCloudStorageService.class).to(MockGoogleCloudStorageService.class).asEagerSingleton();
protected GoogleCloudStorageService createStorageService(Environment environment) {
return new MockGoogleCloudStorageService();
}
}
public static class MockGoogleCloudStorageService implements GoogleCloudStorageService {
@Override
public Storage createClient(String serviceAccount, String application, TimeValue connectTimeout, TimeValue readTimeout) throws
Exception {

View File

@ -23,14 +23,18 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.Map;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.Repository;
public final class HdfsPlugin extends Plugin {
public final class HdfsPlugin extends Plugin implements RepositoryPlugin {
// initialize some problematic classes with elevated privileges
static {
@ -83,7 +87,8 @@ public final class HdfsPlugin extends Plugin {
return null;
}
public void onModule(RepositoriesModule repositoriesModule) {
repositoriesModule.registerRepository("hdfs", HdfsRepository.class);
@Override
public Map<String, Repository.Factory> getRepositories(Environment env) {
return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env));
}
}

View File

@ -37,21 +37,20 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
public final class HdfsRepository extends BlobStoreRepository {
private final BlobPath basePath = BlobPath.cleanPath();
private final RepositorySettings repositorySettings;
private final ByteSizeValue chunkSize;
private final boolean compress;
@ -61,18 +60,16 @@ public final class HdfsRepository extends BlobStoreRepository {
// TODO: why 100KB?
private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB);
@Inject
public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings) throws IOException {
super(name.getName(), repositorySettings);
this.repositorySettings = repositorySettings;
public HdfsRepository(RepositoryMetaData metadata, Environment environment) throws IOException {
super(metadata, environment.settings());
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", null);
this.compress = repositorySettings.settings().getAsBoolean("compress", false);
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);
this.compress = metadata.settings().getAsBoolean("compress", false);
}
@Override
protected void doStart() {
String uriSetting = repositorySettings.settings().get("uri");
String uriSetting = getMetadata().settings().get("uri");
if (Strings.hasText(uriSetting) == false) {
throw new IllegalArgumentException("No 'uri' defined for hdfs snapshot/restore");
}
@ -86,13 +83,13 @@ public final class HdfsRepository extends BlobStoreRepository {
"Use 'path' option to specify a path [%s], not the uri [%s] for hdfs snapshot/restore", uri.getPath(), uriSetting));
}
String pathSetting = repositorySettings.settings().get("path");
String pathSetting = getMetadata().settings().get("path");
// get configuration
if (pathSetting == null) {
throw new IllegalArgumentException("No 'path' defined for hdfs snapshot/restore");
}
int bufferSize = repositorySettings.settings().getAsBytesSize("buffer_size", DEFAULT_BUFFER_SIZE).bytesAsInt();
int bufferSize = getMetadata().settings().getAsBytesSize("buffer_size", DEFAULT_BUFFER_SIZE).bytesAsInt();
try {
// initialize our filecontext
@ -103,7 +100,7 @@ public final class HdfsRepository extends BlobStoreRepository {
FileContext fileContext = AccessController.doPrivileged(new PrivilegedAction<FileContext>() {
@Override
public FileContext run() {
return createContext(uri, repositorySettings);
return createContext(uri, getMetadata().settings());
}
});
blobStore = new HdfsBlobStore(fileContext, pathSetting, bufferSize);
@ -116,12 +113,12 @@ public final class HdfsRepository extends BlobStoreRepository {
// create hadoop filecontext
@SuppressForbidden(reason = "lesser of two evils (the other being a bunch of JNI/classloader nightmares)")
private static FileContext createContext(URI uri, RepositorySettings repositorySettings) {
Configuration cfg = new Configuration(repositorySettings.settings().getAsBoolean("load_defaults", true));
private static FileContext createContext(URI uri, Settings repositorySettings) {
Configuration cfg = new Configuration(repositorySettings.getAsBoolean("load_defaults", true));
cfg.setClassLoader(HdfsRepository.class.getClassLoader());
cfg.reloadConfiguration();
Map<String, String> map = repositorySettings.settings().getByPrefix("conf.").getAsMap();
Map<String, String> map = repositorySettings.getByPrefix("conf.").getAsMap();
for (Entry<String, String> entry : map.entrySet()) {
cfg.set(entry.getKey(), entry.getValue());
}

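The "conf." passthrough above means arbitrary Hadoop client options can ride along in the repository settings; a sketch with an illustrative Hadoop key (the option name itself is an assumption):

Settings repositorySettings = Settings.builder()
    .put("uri", "hdfs://namenode:8020")                  // required; must carry no path component
    .put("path", "/elasticsearch/snapshots")             // required
    .put("conf.dfs.client.read.shortcircuit", "true")    // forwarded to the Hadoop Configuration
    .build();
// createContext strips the prefix via getByPrefix("conf.") and calls
// cfg.set("dfs.client.read.shortcircuit", "true") before opening the FileContext.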
View File

@ -51,7 +51,6 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent implements
*/
private Map<Tuple<String, String>, AmazonS3Client> clients = new HashMap<>();
@Inject
public InternalAwsS3Service(Settings settings) {
super(settings);
}

View File

@ -1,38 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import org.elasticsearch.common.inject.AbstractModule;
public class S3Module extends AbstractModule {
// pkg private so it is settable by tests
static Class<? extends AwsS3Service> s3ServiceImpl = InternalAwsS3Service.class;
public static Class<? extends AwsS3Service> getS3ServiceImpl() {
return s3ServiceImpl;
}
@Override
protected void configure() {
bind(AwsS3Service.class).to(s3ServiceImpl).asEagerSingleton();
}
}

View File

@ -21,26 +21,26 @@ package org.elasticsearch.plugin.repository.s3;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cloud.aws.S3Module;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.cloud.aws.InternalAwsS3Service;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.s3.S3Repository;
/**
*
* A plugin to add a repository type that writes to and from the AWS S3.
*/
public class S3RepositoryPlugin extends Plugin {
public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin {
// ClientConfiguration clinit has some classloader problems
// TODO: fix that
@ -62,21 +62,15 @@ public class S3RepositoryPlugin extends Plugin {
});
}
@Override
public Collection<Module> nodeModules() {
Collection<Module> modules = new ArrayList<>();
modules.add(new S3Module());
return modules;
// overridable for tests
protected AwsS3Service createStorageService(Settings settings) {
return new InternalAwsS3Service(settings);
}
@Override
@SuppressWarnings("rawtypes") // Supertype declaration has raw types
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
return Collections.<Class<? extends LifecycleComponent>>singleton(S3Module.getS3ServiceImpl());
}
public void onModule(RepositoriesModule repositoriesModule) {
repositoriesModule.registerRepository(S3Repository.TYPE, S3Repository.class);
public Map<String, Repository.Factory> getRepositories(Environment env) {
return Collections.singletonMap(S3Repository.TYPE,
(metadata) -> new S3Repository(metadata, env.settings(), createStorageService(env.settings())));
}
@Override

View File

@ -24,17 +24,16 @@ import com.amazonaws.Protocol;
import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cloud.aws.AwsS3Service.CLOUD_S3;
import org.elasticsearch.cloud.aws.blobstore.S3BlobStore;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import java.io.IOException;
@ -245,59 +244,54 @@ public class S3Repository extends BlobStoreRepository {
private boolean compress;
/**
* Constructs new shared file system repository
*
* @param name repository name
* @param repositorySettings repository settings
* @param s3Service S3 service
* Constructs an s3 backed repository
*/
@Inject
public S3Repository(RepositoryName name, RepositorySettings repositorySettings, AwsS3Service s3Service) throws IOException {
super(name.getName(), repositorySettings);
public S3Repository(RepositoryMetaData metadata, Settings settings, AwsS3Service s3Service) throws IOException {
super(metadata, settings);
String bucket = getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING);
String bucket = getValue(metadata.settings(), settings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING);
if (bucket == null) {
throw new RepositoryException(name.name(), "No bucket defined for s3 gateway");
throw new RepositoryException(metadata.name(), "No bucket defined for s3 gateway");
}
String endpoint = getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING);
Protocol protocol = getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING);
String region = getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING);
String endpoint = getValue(metadata.settings(), settings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING);
Protocol protocol = getValue(metadata.settings(), settings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING);
String region = getValue(metadata.settings(), settings, Repository.REGION_SETTING, Repositories.REGION_SETTING);
// If no region is defined either in region, repositories.s3.region, cloud.aws.s3.region or cloud.aws.region
// we fall back to the default bucket: null
if (Strings.isEmpty(region)) {
region = null;
}
boolean serverSideEncryption = getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING);
ByteSizeValue bufferSize = getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING);
Integer maxRetries = getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING);
boolean useThrottleRetries = getValue(repositorySettings, Repository.USE_THROTTLE_RETRIES_SETTING, Repositories.USE_THROTTLE_RETRIES_SETTING);
this.chunkSize = getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING);
this.compress = getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING);
boolean serverSideEncryption = getValue(metadata.settings(), settings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING);
ByteSizeValue bufferSize = getValue(metadata.settings(), settings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING);
Integer maxRetries = getValue(metadata.settings(), settings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING);
boolean useThrottleRetries = getValue(metadata.settings(), settings, Repository.USE_THROTTLE_RETRIES_SETTING, Repositories.USE_THROTTLE_RETRIES_SETTING);
this.chunkSize = getValue(metadata.settings(), settings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING);
this.compress = getValue(metadata.settings(), settings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING);
// We make sure that chunkSize is greater than or equal to bufferSize
if (this.chunkSize.getBytes() < bufferSize.getBytes()) {
throw new RepositoryException(name.name(), Repository.CHUNK_SIZE_SETTING.getKey() + " (" + this.chunkSize +
throw new RepositoryException(metadata.name(), Repository.CHUNK_SIZE_SETTING.getKey() + " (" + this.chunkSize +
") can't be lower than " + Repository.BUFFER_SIZE_SETTING.getKey() + " (" + bufferSize + ").");
}
// Parse and validate the user's S3 Storage Class setting
String storageClass = getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING);
String cannedACL = getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING);
String storageClass = getValue(metadata.settings(), settings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING);
String cannedACL = getValue(metadata.settings(), settings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING);
logger.debug("using bucket [{}], region [{}], endpoint [{}], protocol [{}], chunk_size [{}], server_side_encryption [{}], " +
"buffer_size [{}], max_retries [{}], use_throttle_retries [{}], cannedACL [{}], storageClass [{}]",
bucket, region, endpoint, protocol, chunkSize, serverSideEncryption, bufferSize, maxRetries, useThrottleRetries, cannedACL,
storageClass);
String key = getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING);
String secret = getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING);
String key = getValue(metadata.settings(), settings, Repository.KEY_SETTING, Repositories.KEY_SETTING);
String secret = getValue(metadata.settings(), settings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING);
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, key, secret, maxRetries, useThrottleRetries),
bucket, region, serverSideEncryption, bufferSize, maxRetries, cannedACL, storageClass);
String basePath = getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING);
String basePath = getValue(metadata.settings(), settings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING);
if (Strings.hasLength(basePath)) {
BlobPath path = new BlobPath();
for(String elem : basePath.split("/")) {
@ -309,9 +303,6 @@ public class S3Repository extends BlobStoreRepository {
}
}
/**
* {@inheritDoc}
*/
@Override
protected BlobStore blobStore() {
return blobStore;
@ -322,29 +313,24 @@ public class S3Repository extends BlobStoreRepository {
return basePath;
}
/**
* {@inheritDoc}
*/
@Override
protected boolean isCompress() {
return compress;
}
/**
* {@inheritDoc}
*/
@Override
protected ByteSizeValue chunkSize() {
return chunkSize;
}
public static <T> T getValue(RepositorySettings repositorySettings,
public static <T> T getValue(Settings repositorySettings,
Settings globalSettings,
Setting<T> repositorySetting,
Setting<T> repositoriesSetting) {
if (repositorySetting.exists(repositorySettings.settings())) {
return repositorySetting.get(repositorySettings.settings());
if (repositorySetting.exists(repositorySettings)) {
return repositorySetting.get(repositorySettings);
} else {
return repositoriesSetting.get(repositorySettings.globalSettings());
return repositoriesSetting.get(globalSettings);
}
}
}

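Per the chunk/buffer validation above (and the 100mb default buffer size exercised in the tests below), a repository with a small explicit chunk_size fails fast; a sketch with assumed setting keys, where s3Service stands in for any AwsS3Service instance:

Settings repoSettings = Settings.builder()
    .put("bucket", "my-bucket")
    .put("chunk_size", "5mb")     // smaller than the default 100mb buffer_size
    .build();
RepositoryMetaData metadata = new RepositoryMetaData("my_s3_repo", "s3", repoSettings);
// new S3Repository(metadata, Settings.EMPTY, s3Service) would throw
// RepositoryException: chunk_size (5mb) can't be lower than buffer_size (100mb).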
View File

@ -69,6 +69,6 @@ public abstract class AbstractAwsTestCase extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return pluginList(S3RepositoryPlugin.class);
return pluginList(TestAwsS3Service.TestPlugin.class);
}
}

View File

@ -1,353 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import com.amazonaws.Protocol;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.s3.S3Repository;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.repositories.s3.S3Repository.Repositories;
import static org.elasticsearch.repositories.s3.S3Repository.Repository;
import static org.elasticsearch.repositories.s3.S3Repository.getValue;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isEmptyString;
public class RepositoryS3SettingsTests extends ESTestCase {
private static final Settings AWS = Settings.builder()
.put(AwsS3Service.KEY_SETTING.getKey(), "global-key")
.put(AwsS3Service.SECRET_SETTING.getKey(), "global-secret")
.put(AwsS3Service.PROTOCOL_SETTING.getKey(), "https")
.put(AwsS3Service.PROXY_HOST_SETTING.getKey(), "global-proxy-host")
.put(AwsS3Service.PROXY_PORT_SETTING.getKey(), 10000)
.put(AwsS3Service.PROXY_USERNAME_SETTING.getKey(), "global-proxy-username")
.put(AwsS3Service.PROXY_PASSWORD_SETTING.getKey(), "global-proxy-password")
.put(AwsS3Service.SIGNER_SETTING.getKey(), "global-signer")
.put(AwsS3Service.REGION_SETTING.getKey(), "global-region")
.build();
private static final Settings S3 = Settings.builder()
.put(AwsS3Service.CLOUD_S3.KEY_SETTING.getKey(), "s3-key")
.put(AwsS3Service.CLOUD_S3.SECRET_SETTING.getKey(), "s3-secret")
.put(AwsS3Service.CLOUD_S3.PROTOCOL_SETTING.getKey(), "http")
.put(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.getKey(), "s3-proxy-host")
.put(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.getKey(), 20000)
.put(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.getKey(), "s3-proxy-username")
.put(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.getKey(), "s3-proxy-password")
.put(AwsS3Service.CLOUD_S3.SIGNER_SETTING.getKey(), "s3-signer")
.put(AwsS3Service.CLOUD_S3.REGION_SETTING.getKey(), "s3-region")
.put(AwsS3Service.CLOUD_S3.ENDPOINT_SETTING.getKey(), "s3-endpoint")
.build();
private static final Settings REPOSITORIES = Settings.builder()
.put(Repositories.KEY_SETTING.getKey(), "repositories-key")
.put(Repositories.SECRET_SETTING.getKey(), "repositories-secret")
.put(Repositories.BUCKET_SETTING.getKey(), "repositories-bucket")
.put(Repositories.PROTOCOL_SETTING.getKey(), "https")
.put(Repositories.REGION_SETTING.getKey(), "repositories-region")
.put(Repositories.ENDPOINT_SETTING.getKey(), "repositories-endpoint")
.put(Repositories.SERVER_SIDE_ENCRYPTION_SETTING.getKey(), true)
.put(Repositories.BUFFER_SIZE_SETTING.getKey(), "6mb")
.put(Repositories.MAX_RETRIES_SETTING.getKey(), 4)
.put(Repositories.CHUNK_SIZE_SETTING.getKey(), "110mb")
.put(Repositories.COMPRESS_SETTING.getKey(), true)
.put(Repositories.STORAGE_CLASS_SETTING.getKey(), "repositories-class")
.put(Repositories.CANNED_ACL_SETTING.getKey(), "repositories-acl")
.put(Repositories.BASE_PATH_SETTING.getKey(), "repositories-basepath")
.build();
private static final Settings REPOSITORY = Settings.builder()
.put(Repository.KEY_SETTING.getKey(), "repository-key")
.put(Repository.SECRET_SETTING.getKey(), "repository-secret")
.put(Repository.BUCKET_SETTING.getKey(), "repository-bucket")
.put(Repository.PROTOCOL_SETTING.getKey(), "https")
.put(Repository.REGION_SETTING.getKey(), "repository-region")
.put(Repository.ENDPOINT_SETTING.getKey(), "repository-endpoint")
.put(Repository.SERVER_SIDE_ENCRYPTION_SETTING.getKey(), false)
.put(Repository.BUFFER_SIZE_SETTING.getKey(), "7mb")
.put(Repository.MAX_RETRIES_SETTING.getKey(), 5)
.put(Repository.CHUNK_SIZE_SETTING.getKey(), "120mb")
.put(Repository.COMPRESS_SETTING.getKey(), false)
.put(Repository.STORAGE_CLASS_SETTING.getKey(), "repository-class")
.put(Repository.CANNED_ACL_SETTING.getKey(), "repository-acl")
.put(Repository.BASE_PATH_SETTING.getKey(), "repository-basepath")
.build();
/**
* We test when only cloud.aws settings are set
*/
public void testRepositorySettingsGlobalOnly() {
Settings nodeSettings = buildSettings(AWS);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, Settings.EMPTY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("global-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("global-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("global-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), isEmptyString());
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("global-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(10000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("global-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("global-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("global-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(false));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(100L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(3));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getGb(), is(1L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), isEmptyString());
}
/**
* We test when cloud.aws settings are overloaded by cloud.aws.s3 settings
*/
public void testRepositorySettingsGlobalOverloadedByS3() {
Settings nodeSettings = buildSettings(AWS, S3);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, Settings.EMPTY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("s3-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("s3-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTP));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("s3-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("s3-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("s3-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("s3-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("s3-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(false));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(100L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(3));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getGb(), is(1L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), isEmptyString());
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), isEmptyString());
}
/**
* Tests that cloud.aws settings are overridden by repositories.s3 settings.
*/
public void testRepositorySettingsGlobalOverloadedByRepositories() {
Settings nodeSettings = buildSettings(AWS, REPOSITORIES);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, Settings.EMPTY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repositories-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repositories-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repositories-bucket"));
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repositories-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repositories-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("global-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(10000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("global-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("global-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("global-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(true));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(6L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(4));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(110L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(true));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
is("repositories-class"));
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repositories-acl"));
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repositories-basepath"));
}
/**
* Tests that cloud.aws.s3 settings are overridden by repositories.s3 settings.
*/
public void testRepositorySettingsS3OverloadedByRepositories() {
Settings nodeSettings = buildSettings(AWS, S3, REPOSITORIES);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, Settings.EMPTY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repositories-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repositories-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repositories-bucket"));
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repositories-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repositories-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("s3-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("s3-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("s3-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(true));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(6L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(4));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(110L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(true));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
is("repositories-class"));
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repositories-acl"));
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repositories-basepath"));
}
/**
* Tests that cloud.aws settings are overridden by the settings of a single repository.
*/
public void testRepositorySettingsGlobalOverloadedByRepository() {
Settings nodeSettings = buildSettings(AWS);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, REPOSITORY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repository-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repository-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repository-bucket"));
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repository-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repository-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("global-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(10000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("global-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("global-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("global-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(false));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(7L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(5));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(120L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
is("repository-class"));
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repository-acl"));
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repository-basepath"));
}
/**
* Tests that cloud.aws.s3 settings are overridden by the settings of a single repository.
*/
public void testRepositorySettingsS3OverloadedByRepository() {
Settings nodeSettings = buildSettings(AWS, S3);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, REPOSITORY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repository-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repository-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repository-bucket"));
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repository-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repository-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("s3-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("s3-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("s3-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(false));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(7L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(5));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(120L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
is("repository-class"));
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repository-acl"));
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repository-basepath"));
}
/**
* Tests that repositories.s3 settings are overridden by the settings of a single repository.
*/
public void testRepositorySettingsRepositoriesOverloadedByRepository() {
Settings nodeSettings = buildSettings(AWS, S3, REPOSITORIES);
RepositorySettings repositorySettings = new RepositorySettings(nodeSettings, REPOSITORY);
assertThat(getValue(repositorySettings, Repository.KEY_SETTING, Repositories.KEY_SETTING), is("repository-key"));
assertThat(getValue(repositorySettings, Repository.SECRET_SETTING, Repositories.SECRET_SETTING), is("repository-secret"));
assertThat(getValue(repositorySettings, Repository.BUCKET_SETTING, Repositories.BUCKET_SETTING), is("repository-bucket"));
assertThat(getValue(repositorySettings, Repository.PROTOCOL_SETTING, Repositories.PROTOCOL_SETTING), is(Protocol.HTTPS));
assertThat(getValue(repositorySettings, Repository.REGION_SETTING, Repositories.REGION_SETTING), is("repository-region"));
assertThat(getValue(repositorySettings, Repository.ENDPOINT_SETTING, Repositories.ENDPOINT_SETTING), is("repository-endpoint"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_HOST_SETTING.get(nodeSettings), is("s3-proxy-host"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PORT_SETTING.get(nodeSettings), is(20000));
assertThat(AwsS3Service.CLOUD_S3.PROXY_USERNAME_SETTING.get(nodeSettings), is("s3-proxy-username"));
assertThat(AwsS3Service.CLOUD_S3.PROXY_PASSWORD_SETTING.get(nodeSettings), is("s3-proxy-password"));
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
is(false));
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(7L));
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(5));
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(120L));
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING),
is("repository-class"));
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), is("repository-acl"));
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repository-basepath"));
}
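// Taken together, the five tests above establish the precedence chain, most
// specific wins:
//   per-repository settings > repositories.s3.* > cloud.aws.s3.* > cloud.aws.*
// A hypothetical layering of one credential key across the levels; the key
// names are illustrative, not copied from the plugin:
//   Settings node = Settings.builder()
//       .put("cloud.aws.access_key", "global-key")              // lowest precedence
//       .put("cloud.aws.s3.access_key", "s3-key")
//       .put("repositories.s3.access_key", "repositories-key")
//       .build();
//   // an "access_key" in the repository definition itself would win over all three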
/**
* Tests invalid chunk_size and buffer_size settings.
*/
public void testInvalidChunkBufferSizeRepositorySettings() throws IOException {
// chunk < buffer should fail
internalTestInvalidChunkBufferSizeSettings(new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.MB),
"chunk_size (5mb) can't be lower than buffer_size (10mb).");
// chunk > buffer should pass
internalTestInvalidChunkBufferSizeSettings(new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.MB), null);
// chunk = buffer should pass
internalTestInvalidChunkBufferSizeSettings(new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.MB), null);
// buffer < 5mb should fail
internalTestInvalidChunkBufferSizeSettings(new ByteSizeValue(4, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.MB),
"Failed to parse value [4mb] for setting [buffer_size] must be >= 5mb");
// chunk > 5tb should fail
internalTestInvalidChunkBufferSizeSettings(new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(6, ByteSizeUnit.TB),
"Failed to parse value [6tb] for setting [chunk_size] must be <= 5tb");
}
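// The "Failed to parse value" messages asserted above come from the byte-size
// setting parsers themselves (buffer_size is bounded below by 5mb, chunk_size
// above by 5tb), so the only cross-field check left to the repository is
// chunk_size >= buffer_size. A hypothetical sketch of that guard, not the
// actual S3Repository code:
private static void ensureChunkNotSmallerThanBuffer(String repositoryName, ByteSizeValue buffer, ByteSizeValue chunk) {
    if (chunk.getBytes() < buffer.getBytes()) {
        throw new RepositoryException(repositoryName,
            "chunk_size (" + chunk + ") can't be lower than buffer_size (" + buffer + ").");
    }
}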
private Settings buildSettings(Settings... global) {
Settings.Builder builder = Settings.builder();
for (Settings settings : global) {
builder.put(settings);
}
return builder.build();
}
private void internalTestInvalidChunkBufferSizeSettings(ByteSizeValue buffer, ByteSizeValue chunk, String expectedMessage)
throws IOException {
Settings nodeSettings = buildSettings(AWS, S3, REPOSITORIES);
RepositorySettings s3RepositorySettings = new RepositorySettings(nodeSettings, Settings.builder()
.put(Repository.BUFFER_SIZE_SETTING.getKey(), buffer)
.put(Repository.CHUNK_SIZE_SETTING.getKey(), chunk)
.build());
try {
new S3Repository(new RepositoryName("s3", "s3repo"), s3RepositorySettings, null);
fail("We should either raise a NPE or a RepositoryException or a IllegalArgumentException");
} catch (RepositoryException e) {
assertThat(e.getDetailedMessage(), containsString(expectedMessage));
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString(expectedMessage));
} catch (NullPointerException e) {
// Because we passed a null AwsS3Service to the constructor, we get an NPE, which is
// expected in the context of this test
if (expectedMessage != null) {
fail("Expected a RepositoryException or an IllegalArgumentException before hitting the NPE");
}
}
}
}

View File

@ -23,20 +23,21 @@ import com.amazonaws.services.s3.AmazonS3;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.repository.s3.S3RepositoryPlugin;
import org.elasticsearch.plugins.Plugin;
import java.util.IdentityHashMap;
public class TestAwsS3Service extends InternalAwsS3Service {
public static class TestPlugin extends Plugin {
public void onModule(S3Module s3Module) {
S3Module.s3ServiceImpl = TestAwsS3Service.class;
public static class TestPlugin extends S3RepositoryPlugin {
@Override
protected AwsS3Service createStorageService(Settings settings) {
return new TestAwsS3Service(settings);
}
}
IdentityHashMap<AmazonS3, TestAmazonS3> clients = new IdentityHashMap<AmazonS3, TestAmazonS3>();
@Inject
public TestAwsS3Service(Settings settings) {
super(settings);
}
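// A hypothetical way such a test service reaches the node under test, assuming
// the standard ESIntegTestCase plugin hook of this era; not part of the diff:
//   @Override
//   protected Collection<Class<? extends Plugin>> nodePlugins() {
//       return Collections.singletonList(TestAwsS3Service.TestPlugin.class);
//   }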

View File

@ -49,10 +49,8 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
/**
*/
@ClusterScope(scope = Scope.SUITE, numDataNodes = 2, numClientNodes = 0, transportClientRatio = 0.0)
public abstract class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase {
@Override
public Settings nodeSettings(int nodeOrdinal) {

View File

@ -0,0 +1,107 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.s3;
import java.io.IOException;
import com.amazonaws.Protocol;
import com.amazonaws.services.s3.AbstractAmazonS3;
import com.amazonaws.services.s3.AmazonS3;
import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.test.ESTestCase;
import static org.elasticsearch.repositories.s3.S3Repository.Repositories;
import static org.elasticsearch.repositories.s3.S3Repository.Repository;
import static org.elasticsearch.repositories.s3.S3Repository.getValue;
import static org.hamcrest.Matchers.containsString;
public class S3RepositoryTests extends ESTestCase {
private static class DummyS3Client extends AbstractAmazonS3 {
@Override
public boolean doesBucketExist(String bucketName) {
return true;
}
}
private static class DummyS3Service extends AbstractLifecycleComponent implements AwsS3Service {
public DummyS3Service() {
super(Settings.EMPTY);
}
@Override
protected void doStart() {}
@Override
protected void doStop() {}
@Override
protected void doClose() {}
@Override
public AmazonS3 client(String endpoint, Protocol protocol, String region, String account, String key,
Integer maxRetries, boolean useThrottleRetries) {
return new DummyS3Client();
}
}
public void testSettingsResolution() throws Exception {
Settings localSettings = Settings.builder().put(Repository.KEY_SETTING.getKey(), "key1").build();
Settings globalSettings = Settings.builder().put(Repositories.KEY_SETTING.getKey(), "key2").build();
assertEquals("key1", getValue(localSettings, globalSettings, Repository.KEY_SETTING, Repositories.KEY_SETTING));
assertEquals("key1", getValue(localSettings, Settings.EMPTY, Repository.KEY_SETTING, Repositories.KEY_SETTING));
assertEquals("key2", getValue(Settings.EMPTY, globalSettings, Repository.KEY_SETTING, Repositories.KEY_SETTING));
assertEquals("", getValue(Settings.EMPTY, Settings.EMPTY, Repository.KEY_SETTING, Repositories.KEY_SETTING));
}
public void testInvalidChunkBufferSizeSettings() throws IOException {
// chunk < buffer should fail
assertInvalidBuffer(10, 5, RepositoryException.class, "chunk_size (5mb) can't be lower than buffer_size (10mb).");
// chunk > buffer should pass
assertValidBuffer(5, 10);
// chunk = buffer should pass
assertValidBuffer(5, 5);
// buffer < 5mb should fail
assertInvalidBuffer(4, 10, IllegalArgumentException.class,
"Failed to parse value [4mb] for setting [buffer_size] must be >= 5mb");
// chunk > 5tb should fail
assertInvalidBuffer(5, 6000000, IllegalArgumentException.class,
"Failed to parse value [5.7tb] for setting [chunk_size] must be <= 5tb");
}
private void assertValidBuffer(long bufferMB, long chunkMB) throws IOException {
RepositoryMetaData metadata = new RepositoryMetaData("dummy-repo", "mock", Settings.builder()
.put(Repository.BUFFER_SIZE_SETTING.getKey(), new ByteSizeValue(bufferMB, ByteSizeUnit.MB))
.put(Repository.CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkMB, ByteSizeUnit.MB)).build());
new S3Repository(metadata, Settings.EMPTY, new DummyS3Service());
}
private void assertInvalidBuffer(int bufferMB, int chunkMB, Class<? extends Exception> clazz, String msg) throws IOException {
RepositoryMetaData metadata = new RepositoryMetaData("dummy-repo", "mock", Settings.builder()
.put(Repository.BUFFER_SIZE_SETTING.getKey(), new ByteSizeValue(bufferMB, ByteSizeUnit.MB))
.put(Repository.CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkMB, ByteSizeUnit.MB)).build());
Exception e = expectThrows(clazz, () -> new S3Repository(metadata, Settings.EMPTY, new DummyS3Service()));
assertThat(e.getMessage(), containsString(msg));
}
}
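The testSettingsResolution case above pins down the lookup order that getValue implements: a repository-scoped value wins over the repositories-scoped one, and when neither is set the setting's default (here the empty string) is returned. A minimal sketch of such a helper, assuming Setting#exists and Setting#get from org.elasticsearch.common.settings; the actual helper in S3Repository may be implemented differently:

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

public class SettingsResolutionSketch {
    // Prefer the per-repository setting; otherwise fall back to the global
    // (repositories.s3.*) setting, whose default applies when it is unset.
    public static <T> T getValue(Settings repositorySettings, Settings globalSettings,
                                 Setting<T> repositorySetting, Setting<T> repositoriesSetting) {
        if (repositorySetting.exists(repositorySettings)) {
            return repositorySetting.get(repositorySettings);
        }
        return repositoriesSetting.get(globalSettings);
    }
}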