BlobContainer interface changed in elasticsearch 1.4.0

AWS plugin needs an update because of this change https://github.com/elasticsearch/elasticsearch/pull/7551

Closes #37.
This commit is contained in:
David Pilato 2014-10-22 15:29:21 +02:00
parent 29aae071c4
commit 6d5ac76eee
11 changed files with 107 additions and 172 deletions

View File

@ -378,7 +378,6 @@ The Azure repository supports following settings:
* `container`: Container name. Defaults to `elasticsearch-snapshots`
* `base_path`: Specifies the path within container to repository data. Defaults to empty (root directory).
* `concurrent_streams`: Throttles the number of streams (per node) preforming snapshot operation. Defaults to `5`.
* `chunk_size`: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified
in bytes or by using size value notation, i.e. `1g`, `10m`, `5k`. Defaults to `64m` (64m max)
* `compress`: When set to `true` metadata files are stored in compressed format. This setting doesn't affect index
@ -398,7 +397,6 @@ $ curl -XPUT 'http://localhost:9200/_snapshot/my_backup2' -d '{
"settings": {
"container": "backup_container",
"base_path": "backups",
"concurrent_streams": 2,
"chunk_size": "32m",
"compress": true
}

View File

@ -44,7 +44,7 @@ governing permissions and limitations under the License. -->
<properties>
<elasticsearch.version>2.0.0-SNAPSHOT</elasticsearch.version>
<lucene.version>4.9.0</lucene.version>
<lucene.version>4.10.1</lucene.version>
<tests.output>onerror</tests.output>
<tests.shuffle>true</tests.shuffle>
<tests.output>onerror</tests.output>

View File

@ -24,8 +24,8 @@ import com.microsoft.windowsazure.services.core.storage.StorageException;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException;
/**
@ -38,7 +38,6 @@ public interface AzureStorageService {
public static final String KEY = "storage_key";
public static final String CONTAINER = "container";
public static final String BASE_PATH = "base_path";
public static final String CONCURRENT_STREAMS = "concurrent_streams";
public static final String CHUNK_SIZE = "chunk_size";
public static final String COMPRESS = "compress";
}
@ -57,9 +56,7 @@ public interface AzureStorageService {
InputStream getInputStream(String container, String blob) throws ServiceException;
OutputStream getOutputStream(String container, String blob) throws URISyntaxException, StorageException;
ImmutableMap<String,BlobMetaData> listBlobsByPrefix(String container, String keyPath, String prefix) throws URISyntaxException, StorageException, ServiceException;
void putObject(String container, String blob, InputStream is, long length) throws URISyntaxException, StorageException, IOException;
}

View File

@ -43,8 +43,8 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
@ -197,6 +197,11 @@ public class AzureStorageServiceImpl extends AbstractLifecycleComponent<AzureSto
return blobResult.getContentStream();
}
@Override
public OutputStream getOutputStream(String container, String blob) throws URISyntaxException, StorageException {
return client.getContainerReference(container).getBlockBlobReference(blob).openOutputStream();
}
@Override
public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String container, String keyPath, String prefix) throws URISyntaxException, StorageException, ServiceException {
logger.debug("listBlobsByPrefix container [{}], keyPath [{}], prefix [{}]", container, keyPath, prefix);
@ -223,16 +228,6 @@ public class AzureStorageServiceImpl extends AbstractLifecycleComponent<AzureSto
return blobsBuilder.build();
}
@Override
public void putObject(String container, String blobname, InputStream is, long length) throws URISyntaxException, StorageException, IOException {
if (logger.isTraceEnabled()) {
logger.trace("creating blob in container [{}], blob [{}], length [{}]",
container, blobname, length);
}
CloudBlockBlob blob = client.getContainerReference(container).getBlockBlobReference(blobname);
blob.upload(is, length);
}
@Override
protected void doStart() throws ElasticsearchException {
logger.debug("starting azure storage client instance");

View File

@ -21,7 +21,6 @@ package org.elasticsearch.cloud.azure.blobstore;
import com.microsoft.windowsazure.services.core.ServiceException;
import com.microsoft.windowsazure.services.core.storage.StorageException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
@ -33,20 +32,21 @@ import org.elasticsearch.common.logging.ESLoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
/**
*
*/
public class AbstractAzureBlobContainer extends AbstractBlobContainer {
public class AzureBlobContainer extends AbstractBlobContainer {
protected final ESLogger logger = ESLoggerFactory.getLogger(AbstractAzureBlobContainer.class.getName());
protected final ESLogger logger = ESLoggerFactory.getLogger(AzureBlobContainer.class.getName());
protected final AzureBlobStore blobStore;
protected final String keyPath;
public AbstractAzureBlobContainer(BlobPath path, AzureBlobStore blobStore) {
public AzureBlobContainer(BlobPath path, AzureBlobStore blobStore) {
super(path);
this.blobStore = blobStore;
String keyPath = path.buildAsString("/");
@ -68,6 +68,32 @@ public class AbstractAzureBlobContainer extends AbstractBlobContainer {
return false;
}
@Override
public InputStream openInput(String blobName) throws IOException {
try {
return blobStore.client().getInputStream(blobStore.container(), buildKey(blobName));
} catch (ServiceException e) {
if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(e.getMessage());
}
throw new IOException(e);
}
}
@Override
public OutputStream createOutput(String blobName) throws IOException {
try {
return new AzureOutputStream(blobStore.client().getOutputStream(blobStore.container(), buildKey(blobName)));
} catch (StorageException e) {
if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(e.getMessage());
}
throw new IOException(e);
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
@Override
public boolean deleteBlob(String blobName) throws IOException {
try {
@ -82,35 +108,6 @@ public class AbstractAzureBlobContainer extends AbstractBlobContainer {
}
}
@Override
public void readBlob(final String blobName, final ReadBlobListener listener) {
blobStore.executor().execute(new Runnable() {
@Override
public void run() {
InputStream is = null;
try {
is = blobStore.client().getInputStream(blobStore.container(), buildKey(blobName));
byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) {
listener.onPartial(buffer, 0, bytesRead);
}
is.close();
listener.onCompleted();
} catch (ServiceException e) {
if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
listener.onFailure(new FileNotFoundException(e.getMessage()));
} else {
listener.onFailure(e);
}
} catch (Throwable e) {
IOUtils.closeWhileHandlingException(is);
listener.onFailure(e);
}
}
});
}
@Override
public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(@Nullable String prefix) throws IOException {

View File

@ -22,16 +22,13 @@ package org.elasticsearch.cloud.azure.blobstore;
import com.microsoft.windowsazure.services.core.ServiceException;
import com.microsoft.windowsazure.services.core.storage.StorageException;
import org.elasticsearch.cloud.azure.AzureStorageService;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.net.URISyntaxException;
import java.util.concurrent.Executor;
/**
*
@ -42,17 +39,10 @@ public class AzureBlobStore extends AbstractComponent implements BlobStore {
private final String container;
private final Executor executor;
private final int bufferSizeInBytes;
public AzureBlobStore(Settings settings, AzureStorageService client, String container, Executor executor) throws URISyntaxException, StorageException {
public AzureBlobStore(Settings settings, AzureStorageService client, String container) throws URISyntaxException, StorageException {
super(settings);
this.client = client;
this.container = container;
this.executor = executor;
this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
if (!client.doesContainerExist(container)) {
client.createContainer(container);
@ -72,17 +62,9 @@ public class AzureBlobStore extends AbstractComponent implements BlobStore {
return container;
}
public Executor executor() {
return executor;
}
public int bufferSizeInBytes() {
return bufferSizeInBytes;
}
@Override
public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
return new AzureImmutableBlobContainer(path, this);
public BlobContainer blobContainer(BlobPath path) {
return new AzureBlobContainer(path, this);
}
@Override

View File

@ -1,57 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.azure.blobstore;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.blobstore.support.BlobStores;
import java.io.IOException;
import java.io.InputStream;
/**
*
*/
public class AzureImmutableBlobContainer extends AbstractAzureBlobContainer implements ImmutableBlobContainer {
public AzureImmutableBlobContainer(BlobPath path, AzureBlobStore blobStore) {
super(path, blobStore);
}
@Override
public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) {
blobStore.executor().execute(new Runnable() {
@Override
public void run() {
try {
blobStore.client().putObject(blobStore.container(), buildKey(blobName), is, sizeInBytes);
listener.onCompleted();
} catch (Throwable e) {
listener.onFailure(e);
}
}
});
}
@Override
public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException {
BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes);
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.azure.blobstore;
import java.io.IOException;
import java.io.OutputStream;
public class AzureOutputStream extends OutputStream {
private final OutputStream blobOutputStream;
public AzureOutputStream(OutputStream blobOutputStream) {
this.blobOutputStream = blobOutputStream;
}
@Override
public void write(int b) throws IOException {
blobOutputStream.write(b);
}
@Override
public void close() throws IOException {
try {
blobOutputStream.close();
} catch (IOException e) {
// Azure is sending a "java.io.IOException: Stream is already closed."
}
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.discovery.azure;
import org.elasticsearch.Version;
import org.elasticsearch.cloud.azure.AzureComputeService;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
@ -51,7 +50,7 @@ public class AzureDiscovery extends ZenDiscovery {
DiscoveryNodeService discoveryNodeService, AzureComputeService azureService, NetworkService networkService,
DiscoverySettings discoverySettings, ElectMasterService electMasterService) {
super(settings, clusterName, threadPool, transportService, clusterService, nodeSettingsService,
discoveryNodeService, pingService, electMasterService, Version.CURRENT, discoverySettings);
discoveryNodeService, pingService, electMasterService, discoverySettings);
if (settings.getAsBoolean("cloud.enabled", true)) {
ImmutableList<? extends ZenPing> zenPings = pingService.zenPings();
UnicastZenPing unicastZenPing = null;

View File

@ -28,7 +28,6 @@ import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
@ -36,8 +35,6 @@ import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Azure file system implementation of the BlobStoreRepository
@ -46,7 +43,6 @@ import java.util.concurrent.TimeUnit;
* <dl>
* <dt>{@code container}</dt><dd>Azure container name. Defaults to elasticsearch-snapshots</dd>
* <dt>{@code base_path}</dt><dd>Specifies the path within bucket to repository data. Defaults to root directory.</dd>
* <dt>{@code concurrent_streams}</dt><dd>Number of concurrent read/write stream (per repository on each node). Defaults to 5.</dd>
* <dt>{@code chunk_size}</dt><dd>Large file can be divided into chunks. This parameter specifies the chunk size. Defaults to 64mb.</dd>
* <dt>{@code compress}</dt><dd>If set to true metadata files will be stored compressed. Defaults to false.</dd>
* </dl>
@ -80,12 +76,7 @@ public class AzureRepository extends BlobStoreRepository {
String container = repositorySettings.settings().get(AzureStorageService.Fields.CONTAINER,
componentSettings.get(AzureStorageService.Fields.CONTAINER, CONTAINER_DEFAULT));
int concurrentStreams = repositorySettings.settings().getAsInt(AzureStorageService.Fields.CONCURRENT_STREAMS,
componentSettings.getAsInt(AzureStorageService.Fields.CONCURRENT_STREAMS, 5));
ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 5, TimeUnit.SECONDS,
EsExecutors.daemonThreadFactory(settings, "[azure_stream]"));
this.blobStore = new AzureBlobStore(settings, azureStorageService, container, concurrentStreamPool);
this.blobStore = new AzureBlobStore(settings, azureStorageService, container);
this.chunkSize = repositorySettings.settings().getAsBytesSize(AzureStorageService.Fields.CHUNK_SIZE,
componentSettings.getAsBytesSize(AzureStorageService.Fields.CHUNK_SIZE, new ByteSizeValue(64, ByteSizeUnit.MB)));
@ -109,8 +100,8 @@ public class AzureRepository extends BlobStoreRepository {
} else {
this.basePath = BlobPath.cleanPath();
}
logger.debug("using container [{}], chunk_size [{}], concurrent_streams [{}], compress [{}], base_path [{}]",
container, chunkSize, concurrentStreams, compress, basePath);
logger.debug("using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
container, chunkSize, compress, basePath);
}
/**

View File

@ -19,6 +19,7 @@
package org.elasticsearch.repositories.azure;
import com.microsoft.windowsazure.services.core.storage.StorageException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cloud.azure.AzureStorageService;
import org.elasticsearch.common.blobstore.BlobMetaData;
@ -28,10 +29,8 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.*;
import java.net.URISyntaxException;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -42,7 +41,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class AzureStorageServiceMock extends AbstractLifecycleComponent<AzureStorageServiceMock>
implements AzureStorageService {
protected Map<String, byte[]> blobs = new ConcurrentHashMap<String, byte[]>();
protected Map<String, ByteArrayOutputStream> blobs = new ConcurrentHashMap<String, ByteArrayOutputStream>();
@Inject
protected AzureStorageServiceMock(Settings settings) {
@ -78,7 +77,14 @@ public class AzureStorageServiceMock extends AbstractLifecycleComponent<AzureSto
@Override
public InputStream getInputStream(String container, String blob) {
return new ByteArrayInputStream(blobs.get(blob));
return new ByteArrayInputStream(blobs.get(blob).toByteArray());
}
@Override
public OutputStream getOutputStream(String container, String blob) throws URISyntaxException, StorageException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
blobs.put(blob, outputStream);
return outputStream;
}
@Override
@ -86,32 +92,13 @@ public class AzureStorageServiceMock extends AbstractLifecycleComponent<AzureSto
ImmutableMap.Builder<String, BlobMetaData> blobsBuilder = ImmutableMap.builder();
for (String blobName : blobs.keySet()) {
if (startsWithIgnoreCase(blobName, prefix)) {
blobsBuilder.put(blobName, new PlainBlobMetaData(blobName, blobs.get(blobName).length));
blobsBuilder.put(blobName, new PlainBlobMetaData(blobName, blobs.get(blobName).size()));
}
}
ImmutableMap<String, BlobMetaData> map = blobsBuilder.build();
return map;
}
@Override
public void putObject(String container, String blob, InputStream is, long length) {
try {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int nRead;
byte[] data = new byte[65535];
while ((nRead = is.read(data, 0, data.length)) != -1) {
buffer.write(data, 0, nRead);
}
buffer.flush();
blobs.put(blob, buffer.toByteArray());
} catch (IOException e) {
}
}
@Override
protected void doStart() throws ElasticsearchException {
}