Add ability to specify base directory on the repository level
This change is needed to support multiple repositories per S3 bucket
This commit is contained in:
parent
95ca06cf09
commit
aafd4ddfbd
|
@ -97,8 +97,6 @@ import static com.google.common.collect.Lists.newArrayList;
|
||||||
*/
|
*/
|
||||||
public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Repository> implements Repository {
|
public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Repository> implements Repository {
|
||||||
|
|
||||||
private BlobPath basePath;
|
|
||||||
|
|
||||||
private ImmutableBlobContainer snapshotsBlobContainer;
|
private ImmutableBlobContainer snapshotsBlobContainer;
|
||||||
|
|
||||||
private final String repositoryName;
|
private final String repositoryName;
|
||||||
|
@ -124,7 +122,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
||||||
super(repositorySettings.globalSettings());
|
super(repositorySettings.globalSettings());
|
||||||
this.repositoryName = repositoryName;
|
this.repositoryName = repositoryName;
|
||||||
this.indexShardRepository = (BlobStoreIndexShardRepository) indexShardRepository;
|
this.indexShardRepository = (BlobStoreIndexShardRepository) indexShardRepository;
|
||||||
this.basePath = BlobPath.cleanPath();
|
|
||||||
Map<String, String> globalOnlyParams = Maps.newHashMap();
|
Map<String, String> globalOnlyParams = Maps.newHashMap();
|
||||||
globalOnlyParams.put(MetaData.GLOBAL_PERSISTENT_ONLY_PARAM, "true");
|
globalOnlyParams.put(MetaData.GLOBAL_PERSISTENT_ONLY_PARAM, "true");
|
||||||
globalOnlyFormatParams = new ToXContent.MapParams(globalOnlyParams);
|
globalOnlyFormatParams = new ToXContent.MapParams(globalOnlyParams);
|
||||||
|
@ -135,8 +132,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
protected void doStart() throws ElasticSearchException {
|
protected void doStart() throws ElasticSearchException {
|
||||||
this.snapshotsBlobContainer = blobStore().immutableBlobContainer(basePath);
|
this.snapshotsBlobContainer = blobStore().immutableBlobContainer(basePath());
|
||||||
indexShardRepository.initialize(blobStore(), basePath, chunkSize());
|
indexShardRepository.initialize(blobStore(), basePath(), chunkSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -167,6 +164,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
||||||
*/
|
*/
|
||||||
abstract protected BlobStore blobStore();
|
abstract protected BlobStore blobStore();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns base path of the repository
|
||||||
|
*/
|
||||||
|
abstract protected BlobPath basePath();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if metadata and snapshot files should be compressed
|
* Returns true if metadata and snapshot files should be compressed
|
||||||
*
|
*
|
||||||
|
@ -211,7 +213,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
||||||
snapshotsBlobContainer.writeBlob(metaDataBlobName(snapshotId), bStream.bytes().streamInput(), bStream.bytes().length());
|
snapshotsBlobContainer.writeBlob(metaDataBlobName(snapshotId), bStream.bytes().streamInput(), bStream.bytes().length());
|
||||||
for (String index : indices) {
|
for (String index : indices) {
|
||||||
IndexMetaData indexMetaData = metaData.index(index);
|
IndexMetaData indexMetaData = metaData.index(index);
|
||||||
BlobPath indexPath = basePath.add("indices").add(index);
|
BlobPath indexPath = basePath().add("indices").add(index);
|
||||||
ImmutableBlobContainer indexMetaDataBlobContainer = blobStore().immutableBlobContainer(indexPath);
|
ImmutableBlobContainer indexMetaDataBlobContainer = blobStore().immutableBlobContainer(indexPath);
|
||||||
bStream = new BytesStreamOutput();
|
bStream = new BytesStreamOutput();
|
||||||
StreamOutput stream = bStream;
|
StreamOutput stream = bStream;
|
||||||
|
@ -244,7 +246,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
||||||
snapshotsBlobContainer.deleteBlob(metaDataBlobName(snapshotId));
|
snapshotsBlobContainer.deleteBlob(metaDataBlobName(snapshotId));
|
||||||
// Now delete all indices
|
// Now delete all indices
|
||||||
for (String index : snapshot.indices()) {
|
for (String index : snapshot.indices()) {
|
||||||
BlobPath indexPath = basePath.add("indices").add(index);
|
BlobPath indexPath = basePath().add("indices").add(index);
|
||||||
ImmutableBlobContainer indexMetaDataBlobContainer = blobStore().immutableBlobContainer(indexPath);
|
ImmutableBlobContainer indexMetaDataBlobContainer = blobStore().immutableBlobContainer(indexPath);
|
||||||
try {
|
try {
|
||||||
indexMetaDataBlobContainer.deleteBlob(blobName);
|
indexMetaDataBlobContainer.deleteBlob(blobName);
|
||||||
|
@ -266,7 +268,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Snapshot finalizeSnapshot(SnapshotId snapshotId, String failure, int totalShards, ImmutableList<SnapshotShardFailure> shardFailures) {
|
public Snapshot finalizeSnapshot(SnapshotId snapshotId, String failure, int totalShards, ImmutableList<SnapshotShardFailure> shardFailures) {
|
||||||
BlobStoreSnapshot snapshot = readSnapshot(snapshotId);
|
BlobStoreSnapshot snapshot = (BlobStoreSnapshot)readSnapshot(snapshotId);
|
||||||
if (snapshot == null) {
|
if (snapshot == null) {
|
||||||
throw new SnapshotMissingException(snapshotId);
|
throw new SnapshotMissingException(snapshotId);
|
||||||
}
|
}
|
||||||
|
@ -338,7 +340,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
||||||
}
|
}
|
||||||
MetaData.Builder metaDataBuilder = MetaData.builder(metaData);
|
MetaData.Builder metaDataBuilder = MetaData.builder(metaData);
|
||||||
for (String index : indices) {
|
for (String index : indices) {
|
||||||
BlobPath indexPath = basePath.add("indices").add(index);
|
BlobPath indexPath = basePath().add("indices").add(index);
|
||||||
ImmutableBlobContainer indexMetaDataBlobContainer = blobStore().immutableBlobContainer(indexPath);
|
ImmutableBlobContainer indexMetaDataBlobContainer = blobStore().immutableBlobContainer(indexPath);
|
||||||
XContentParser parser = null;
|
XContentParser parser = null;
|
||||||
try {
|
try {
|
||||||
|
@ -368,7 +370,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public BlobStoreSnapshot readSnapshot(SnapshotId snapshotId) {
|
public Snapshot readSnapshot(SnapshotId snapshotId) {
|
||||||
try {
|
try {
|
||||||
String blobName = snapshotBlobName(snapshotId);
|
String blobName = snapshotBlobName(snapshotId);
|
||||||
byte[] data = snapshotsBlobContainer.readBlobFully(blobName);
|
byte[] data = snapshotsBlobContainer.readBlobFully(blobName);
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.repositories.fs;
|
package org.elasticsearch.repositories.fs;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.BlobStore;
|
import org.elasticsearch.common.blobstore.BlobStore;
|
||||||
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
|
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
@ -54,6 +55,8 @@ public class FsRepository extends BlobStoreRepository {
|
||||||
|
|
||||||
private ByteSizeValue chunkSize;
|
private ByteSizeValue chunkSize;
|
||||||
|
|
||||||
|
private final BlobPath basePath;
|
||||||
|
|
||||||
private boolean compress;
|
private boolean compress;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -80,6 +83,7 @@ public class FsRepository extends BlobStoreRepository {
|
||||||
blobStore = new FsBlobStore(componentSettings, concurrentStreamPool, locationFile);
|
blobStore = new FsBlobStore(componentSettings, concurrentStreamPool, locationFile);
|
||||||
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", null));
|
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", null));
|
||||||
this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
|
this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
|
||||||
|
this.basePath = BlobPath.cleanPath();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -106,5 +110,8 @@ public class FsRepository extends BlobStoreRepository {
|
||||||
return chunkSize;
|
return chunkSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected BlobPath basePath() {
|
||||||
|
return basePath;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.repositories.uri;
|
package org.elasticsearch.repositories.uri;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.BlobStore;
|
import org.elasticsearch.common.blobstore.BlobStore;
|
||||||
import org.elasticsearch.common.blobstore.url.URLBlobStore;
|
import org.elasticsearch.common.blobstore.url.URLBlobStore;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
@ -49,6 +50,8 @@ public class URLRepository extends BlobStoreRepository {
|
||||||
|
|
||||||
private final URLBlobStore blobStore;
|
private final URLBlobStore blobStore;
|
||||||
|
|
||||||
|
private final BlobPath basePath;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs new read-only URL-based repository
|
* Constructs new read-only URL-based repository
|
||||||
*
|
*
|
||||||
|
@ -70,6 +73,7 @@ public class URLRepository extends BlobStoreRepository {
|
||||||
int concurrentStreams = repositorySettings.settings().getAsInt("concurrent_streams", componentSettings.getAsInt("concurrent_streams", 5));
|
int concurrentStreams = repositorySettings.settings().getAsInt("concurrent_streams", componentSettings.getAsInt("concurrent_streams", 5));
|
||||||
ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[fs_stream]"));
|
ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[fs_stream]"));
|
||||||
blobStore = new URLBlobStore(componentSettings, concurrentStreamPool, url);
|
blobStore = new URLBlobStore(componentSettings, concurrentStreamPool, url);
|
||||||
|
basePath = BlobPath.cleanPath();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -79,4 +83,9 @@ public class URLRepository extends BlobStoreRepository {
|
||||||
protected BlobStore blobStore() {
|
protected BlobStore blobStore() {
|
||||||
return blobStore;
|
return blobStore;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected BlobPath basePath() {
|
||||||
|
return basePath;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue