diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index 21b07dc4d26..0def25ee10b 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -41,11 +41,6 @@ pools, but the important ones include: keep-alive `5m`, size `(# of available processors)/2`. -`snapshot_data`:: - For snapshot/restore operations on data files, defaults to `scaling` - with a `5m` keep-alive, - size `5`. - `warmer`:: For segment warm-up operations, defaults to `scaling` with a `5m` keep-alive. diff --git a/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index 9f94bbfbe24..bac1e1ed719 100644 --- a/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -22,6 +22,8 @@ package org.elasticsearch.common.blobstore; import com.google.common.collect.ImmutableMap; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; /** * @@ -35,22 +37,19 @@ public interface BlobContainer { boolean accept(String blobName); } - interface ReadBlobListener { - - void onPartial(byte[] data, int offset, int size) throws IOException; - - void onCompleted(); - - void onFailure(Throwable t); - } - BlobPath path(); boolean blobExists(String blobName); - void readBlob(String blobName, ReadBlobListener listener); + /** + * Creates a new {@link InputStream} for the given blob name + */ + InputStream openInput(String blobName) throws IOException; - byte[] readBlobFully(String blobName) throws IOException; + /** + * Creates a new OutputStream for the given blob name + */ + OutputStream createOutput(String blobName) throws IOException; boolean deleteBlob(String blobName) throws IOException; diff --git a/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java b/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java index 34844e212f6..eb91d484851 100644 --- a/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java +++ b/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java @@ -23,7 +23,7 @@ package org.elasticsearch.common.blobstore; */ public interface BlobStore { - ImmutableBlobContainer immutableBlobContainer(BlobPath path); + BlobContainer blobContainer(BlobPath path); void delete(BlobPath path); diff --git a/src/main/java/org/elasticsearch/common/blobstore/ImmutableBlobContainer.java b/src/main/java/org/elasticsearch/common/blobstore/ImmutableBlobContainer.java deleted file mode 100644 index f6d516ffdb0..00000000000 --- a/src/main/java/org/elasticsearch/common/blobstore/ImmutableBlobContainer.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.blobstore; - -import java.io.IOException; -import java.io.InputStream; - -/** - * - */ -public interface ImmutableBlobContainer extends BlobContainer { - - interface WriterListener { - void onCompleted(); - - void onFailure(Throwable t); - } - - void writeBlob(String blobName, InputStream is, long sizeInBytes, WriterListener listener); - - void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException; -} diff --git a/src/main/java/org/elasticsearch/common/blobstore/fs/AbstractFsBlobContainer.java b/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java similarity index 65% rename from src/main/java/org/elasticsearch/common/blobstore/fs/AbstractFsBlobContainer.java rename to src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index e93c0f35a94..0e331a55b24 100644 --- a/src/main/java/org/elasticsearch/common/blobstore/fs/AbstractFsBlobContainer.java +++ b/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -21,27 +21,26 @@ package org.elasticsearch.common.blobstore.fs; import com.google.common.collect.ImmutableMap; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.io.FileSystemUtils; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; +import java.io.*; /** * */ -public abstract class AbstractFsBlobContainer extends AbstractBlobContainer { +public class FsBlobContainer extends AbstractBlobContainer { protected final FsBlobStore blobStore; protected final File path; - public AbstractFsBlobContainer(FsBlobStore blobStore, BlobPath blobPath, File path) { + public FsBlobContainer(FsBlobStore blobStore, BlobPath blobPath, File path) { super(blobPath); this.blobStore = blobStore; this.path = path; @@ -76,30 +75,20 @@ public abstract class AbstractFsBlobContainer extends AbstractBlobContainer { } @Override - public void readBlob(final String blobName, final ReadBlobListener listener) { - blobStore.executor().execute(new Runnable() { + public InputStream openInput(String name) throws IOException { + return new BufferedInputStream(new FileInputStream(new File(path, name)), blobStore.bufferSizeInBytes()); + } + + @Override + public OutputStream createOutput(String blobName) throws IOException { + final File file = new File(path, blobName); + return new BufferedOutputStream(new FilterOutputStream(new FileOutputStream(file)) { @Override - public void run() { - byte[] buffer = new byte[blobStore.bufferSizeInBytes()]; - FileInputStream is = null; - try { - is = new FileInputStream(new File(path, blobName)); - int bytesRead; - while ((bytesRead = is.read(buffer)) != -1) { - listener.onPartial(buffer, 0, bytesRead); - } - } catch (Throwable t) { - IOUtils.closeWhileHandlingException(is); - listener.onFailure(t); - return; - } - try { - IOUtils.closeWhileHandlingException(is); - listener.onCompleted(); - } catch (Throwable t) { - listener.onFailure(t); - } + public void close() throws IOException { + super.close(); + IOUtils.fsync(file, false); + IOUtils.fsync(path, true); } - }); + }, blobStore.bufferSizeInBytes()); } } diff --git a/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java b/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java index ebdb810b27e..d63f63fbb72 100644 --- a/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java +++ b/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java @@ -19,35 +19,30 @@ package org.elasticsearch.common.blobstore.fs; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.BlobStoreException; -import org.elasticsearch.common.blobstore.ImmutableBlobContainer; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.threadpool.ThreadPool; import java.io.File; -import java.util.concurrent.Executor; /** * */ public class FsBlobStore extends AbstractComponent implements BlobStore { - private final ThreadPool threadPool; - private final File path; private final int bufferSizeInBytes; - public FsBlobStore(Settings settings, ThreadPool threadPool, File path) { + public FsBlobStore(Settings settings, File path) { super(settings); this.path = path; - this.threadPool = threadPool; if (!path.exists()) { boolean b = FileSystemUtils.mkdirs(path); if (!b) { @@ -73,13 +68,9 @@ public class FsBlobStore extends AbstractComponent implements BlobStore { return this.bufferSizeInBytes; } - public Executor executor() { - return threadPool.executor(ThreadPool.Names.SNAPSHOT_DATA); - } - @Override - public ImmutableBlobContainer immutableBlobContainer(BlobPath path) { - return new FsImmutableBlobContainer(this, path, buildAndCreate(path)); + public BlobContainer blobContainer(BlobPath path) { + return new FsBlobContainer(this, path, buildAndCreate(path)); } @Override diff --git a/src/main/java/org/elasticsearch/common/blobstore/fs/FsImmutableBlobContainer.java b/src/main/java/org/elasticsearch/common/blobstore/fs/FsImmutableBlobContainer.java deleted file mode 100644 index ae666d72cae..00000000000 --- a/src/main/java/org/elasticsearch/common/blobstore/fs/FsImmutableBlobContainer.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.blobstore.fs; - -import org.elasticsearch.ElasticsearchIllegalStateException; -import org.elasticsearch.common.blobstore.BlobPath; -import org.elasticsearch.common.blobstore.ImmutableBlobContainer; -import org.elasticsearch.common.blobstore.support.BlobStores; -import org.elasticsearch.common.io.FileSystemUtils; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.RandomAccessFile; - -/** - * - */ -public class FsImmutableBlobContainer extends AbstractFsBlobContainer implements ImmutableBlobContainer { - - public FsImmutableBlobContainer(FsBlobStore blobStore, BlobPath blobPath, File path) { - super(blobStore, blobPath, path); - } - - @Override - public void writeBlob(final String blobName, final InputStream stream, final long sizeInBytes, final WriterListener listener) { - blobStore.executor().execute(new Runnable() { - @Override - public void run() { - final File file = new File(path, blobName); - boolean success = false; - try { - try (final RandomAccessFile raf = new RandomAccessFile(file, "rw"); - final InputStream is = stream) { - // clean the file if it exists - raf.setLength(0); - long bytesWritten = 0; - final byte[] buffer = new byte[blobStore.bufferSizeInBytes()]; - int bytesRead; - while ((bytesRead = is.read(buffer)) != -1) { - raf.write(buffer, 0, bytesRead); - bytesWritten += bytesRead; - } - if (bytesWritten != sizeInBytes) { - throw new ElasticsearchIllegalStateException("[" + blobName + "]: wrote [" + bytesWritten + "], expected to write [" + sizeInBytes + "]"); - } - // fsync the FD we are done with writing - raf.getFD().sync(); - // try to fsync the directory to make sure all metadata is written to - // the storage device - NOTE: if it's a dir it will not throw any exception - FileSystemUtils.syncFile(path, true); - } - success = true; - } catch (Throwable e) { - listener.onFailure(e); - // just on the safe size, try and delete it on failure - FileSystemUtils.tryDeleteFile(file); - } finally { - if (success) { - listener.onCompleted(); - } - } - } - }); - } - - @Override - public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException { - BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes); - } -} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/common/blobstore/support/AbstractBlobContainer.java b/src/main/java/org/elasticsearch/common/blobstore/support/AbstractBlobContainer.java index 1608a58fa0f..eb076cc2dab 100644 --- a/src/main/java/org/elasticsearch/common/blobstore/support/AbstractBlobContainer.java +++ b/src/main/java/org/elasticsearch/common/blobstore/support/AbstractBlobContainer.java @@ -24,11 +24,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; +import java.io.*; /** * @@ -46,46 +42,6 @@ public abstract class AbstractBlobContainer implements BlobContainer { return this.path; } - @Override - public byte[] readBlobFully(String blobName) throws IOException { - final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference failure = new AtomicReference<>(); - final ByteArrayOutputStream bos = new ByteArrayOutputStream(); - - readBlob(blobName, new ReadBlobListener() { - @Override - public void onPartial(byte[] data, int offset, int size) { - bos.write(data, offset, size); - } - - @Override - public void onCompleted() { - latch.countDown(); - } - - @Override - public void onFailure(Throwable t) { - failure.set(t); - latch.countDown(); - } - }); - - try { - latch.await(); - } catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted while waiting to read [" + blobName + "]"); - } - - if (failure.get() != null) { - if (failure.get() instanceof IOException) { - throw (IOException) failure.get(); - } else { - throw new IOException("Failed to get [" + blobName + "]", failure.get()); - } - } - return bos.toByteArray(); - } - @Override public ImmutableMap listBlobsByPrefix(String blobNamePrefix) throws IOException { ImmutableMap allBlobs = listBlobs(); @@ -117,4 +73,5 @@ public abstract class AbstractBlobContainer implements BlobContainer { } } } + } diff --git a/src/main/java/org/elasticsearch/common/blobstore/support/BlobStores.java b/src/main/java/org/elasticsearch/common/blobstore/support/BlobStores.java deleted file mode 100644 index a8dd155d70a..00000000000 --- a/src/main/java/org/elasticsearch/common/blobstore/support/BlobStores.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.blobstore.support; - -import org.elasticsearch.common.blobstore.ImmutableBlobContainer; - -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; - -/** - * - */ -public class BlobStores { - - public static void syncWriteBlob(ImmutableBlobContainer blobContainer, String blobName, InputStream is, long sizeInBytes) throws IOException { - final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference failure = new AtomicReference<>(); - blobContainer.writeBlob(blobName, is, sizeInBytes, new ImmutableBlobContainer.WriterListener() { - @Override - public void onCompleted() { - latch.countDown(); - } - - @Override - public void onFailure(Throwable t) { - failure.set(t); - latch.countDown(); - } - }); - try { - latch.await(); - } catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted while waiting to write [" + blobName + "]"); - } - - if (failure.get() != null) { - if (failure.get() instanceof IOException) { - throw (IOException) failure.get(); - } else { - throw new IOException("Failed to get [" + blobName + "]", failure.get()); - } - } - } -} diff --git a/src/main/java/org/elasticsearch/common/blobstore/url/AbstractURLBlobContainer.java b/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java similarity index 60% rename from src/main/java/org/elasticsearch/common/blobstore/url/AbstractURLBlobContainer.java rename to src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java index 26f0ca0e10a..5dd633e9114 100644 --- a/src/main/java/org/elasticsearch/common/blobstore/url/AbstractURLBlobContainer.java +++ b/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java @@ -20,32 +20,33 @@ package org.elasticsearch.common.blobstore.url; import com.google.common.collect.ImmutableMap; -import org.apache.lucene.util.IOUtils; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; +import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.URL; /** * URL blob implementation of {@link org.elasticsearch.common.blobstore.BlobContainer} */ -public abstract class AbstractURLBlobContainer extends AbstractBlobContainer { +public class URLBlobContainer extends AbstractBlobContainer { protected final URLBlobStore blobStore; protected final URL path; /** - * Constructs new AbstractURLBlobContainer + * Constructs new URLBlobContainer * * @param blobStore blob store * @param blobPath blob path for this container * @param path URL for this container */ - public AbstractURLBlobContainer(URLBlobStore blobStore, BlobPath blobPath, URL path) { + public URLBlobContainer(URLBlobStore blobStore, BlobPath blobPath, URL path) { super(blobPath); this.blobStore = blobStore; this.path = path; @@ -61,7 +62,7 @@ public abstract class AbstractURLBlobContainer extends AbstractBlobContainer { } /** - * This operation is not supported by AbstractURLBlobContainer + * This operation is not supported by URLBlobContainer */ @Override public ImmutableMap listBlobs() throws IOException { @@ -69,7 +70,7 @@ public abstract class AbstractURLBlobContainer extends AbstractBlobContainer { } /** - * This operation is not supported by AbstractURLBlobContainer + * This operation is not supported by URLBlobContainer */ @Override public boolean deleteBlob(String blobName) throws IOException { @@ -77,41 +78,20 @@ public abstract class AbstractURLBlobContainer extends AbstractBlobContainer { } /** - * This operation is not supported by AbstractURLBlobContainer + * This operation is not supported by URLBlobContainer */ @Override public boolean blobExists(String blobName) { throw new UnsupportedOperationException("URL repository doesn't support this operation"); } - /** - * {@inheritDoc} - */ @Override - public void readBlob(final String blobName, final ReadBlobListener listener) { - blobStore.executor().execute(new Runnable() { - @Override - public void run() { - byte[] buffer = new byte[blobStore.bufferSizeInBytes()]; - InputStream is = null; - try { - is = new URL(path, blobName).openStream(); - int bytesRead; - while ((bytesRead = is.read(buffer)) != -1) { - listener.onPartial(buffer, 0, bytesRead); - } - } catch (Throwable t) { - IOUtils.closeWhileHandlingException(is); - listener.onFailure(t); - return; - } - try { - IOUtils.closeWhileHandlingException(is); - listener.onCompleted(); - } catch (Throwable t) { - listener.onFailure(t); - } - } - }); + public InputStream openInput(String name) throws IOException { + return new BufferedInputStream(new URL(path, name).openStream(), blobStore.bufferSizeInBytes()); + } + + @Override + public OutputStream createOutput(String blobName) throws IOException { + throw new UnsupportedOperationException("URL repository doesn't support this operation"); } } diff --git a/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobStore.java b/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobStore.java index 3124fd9accc..16e66d58885 100644 --- a/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobStore.java +++ b/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobStore.java @@ -19,10 +19,10 @@ package org.elasticsearch.common.blobstore.url; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.BlobStoreException; -import org.elasticsearch.common.blobstore.ImmutableBlobContainer; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -38,8 +38,6 @@ import java.util.concurrent.Executor; */ public class URLBlobStore extends AbstractComponent implements BlobStore { - private final ThreadPool threadPool; - private final URL path; private final int bufferSizeInBytes; @@ -54,14 +52,12 @@ public class URLBlobStore extends AbstractComponent implements BlobStore { * * * @param settings settings - * @param threadPool thread pool for read operations * @param path base URL */ - public URLBlobStore(Settings settings, ThreadPool threadPool, URL path) { + public URLBlobStore(Settings settings, URL path) { super(settings); this.path = path; this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes(); - this.threadPool = threadPool; } /** @@ -90,22 +86,13 @@ public class URLBlobStore extends AbstractComponent implements BlobStore { return this.bufferSizeInBytes; } - /** - * Returns executor used for read operations - * - * @return executor - */ - public Executor executor() { - return threadPool.executor(ThreadPool.Names.SNAPSHOT_DATA); - } - /** * {@inheritDoc} */ @Override - public ImmutableBlobContainer immutableBlobContainer(BlobPath path) { + public BlobContainer blobContainer(BlobPath path) { try { - return new URLImmutableBlobContainer(this, path, buildPath(path)); + return new URLBlobContainer(this, path, buildPath(path)); } catch (MalformedURLException ex) { throw new BlobStoreException("malformed URL " + path, ex); } diff --git a/src/main/java/org/elasticsearch/common/blobstore/url/URLImmutableBlobContainer.java b/src/main/java/org/elasticsearch/common/blobstore/url/URLImmutableBlobContainer.java deleted file mode 100644 index af37ac4d1fe..00000000000 --- a/src/main/java/org/elasticsearch/common/blobstore/url/URLImmutableBlobContainer.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.blobstore.url; - -import org.elasticsearch.common.blobstore.BlobPath; -import org.elasticsearch.common.blobstore.ImmutableBlobContainer; - -import java.io.IOException; -import java.io.InputStream; -import java.net.URL; - -/** - * Read-only URL-based implementation of {@link ImmutableBlobContainer} - */ -public class URLImmutableBlobContainer extends AbstractURLBlobContainer implements ImmutableBlobContainer { - - /** - * Constructs a new URLImmutableBlobContainer - * - * @param blobStore blob store - * @param blobPath blob path to this container - * @param path URL of this container - */ - public URLImmutableBlobContainer(URLBlobStore blobStore, BlobPath blobPath, URL path) { - super(blobStore, blobPath, path); - } - - /** - * This operation is not supported by URL Blob Container - */ - @Override - public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) { - throw new UnsupportedOperationException("URL repository is read only"); - } - - /** - * This operation is not supported by URL Blob Container - */ - @Override - public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException { - throw new UnsupportedOperationException("URL repository is read only"); - } -} diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java index af52e38b72a..0a39e77dd70 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java @@ -22,16 +22,15 @@ package org.elasticsearch.index.snapshots.blobstore; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.io.ByteStreams; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.store.*; import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.common.blobstore.*; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.settings.Settings; @@ -48,14 +47,12 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.RepositoryName; -import java.io.ByteArrayOutputStream; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; import static com.google.common.collect.Lists.newArrayList; @@ -64,6 +61,7 @@ import static com.google.common.collect.Lists.newArrayList; */ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository { + private static final int BUFFER_SIZE = 4096; private BlobStore blobStore; private BlobPath basePath; @@ -144,8 +142,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements */ @Override public void restore(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) { - RestoreContext snapshotContext = new RestoreContext(snapshotId, shardId, snapshotShardId, recoveryState); - + final RestoreContext snapshotContext = new RestoreContext(snapshotId, shardId, snapshotShardId, recoveryState); try { recoveryState.getIndex().startTime(System.currentTimeMillis()); snapshotContext.restore(); @@ -205,24 +202,25 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * Serializes snapshot to JSON * * @param snapshot snapshot - * @return JSON representation of the snapshot - * @throws IOException + * @param stream the stream to output the snapshot JSON represetation to + * @throws IOException if an IOException occurs */ - public static byte[] writeSnapshot(BlobStoreIndexShardSnapshot snapshot) throws IOException { - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON).prettyPrint(); + public static void writeSnapshot(BlobStoreIndexShardSnapshot snapshot, OutputStream stream) throws IOException { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream).prettyPrint(); BlobStoreIndexShardSnapshot.toXContent(snapshot, builder, ToXContent.EMPTY_PARAMS); - return builder.bytes().toBytes(); + builder.flush(); + builder.close(); } /** * Parses JSON representation of a snapshot * - * @param data JSON + * @param stream JSON * @return snapshot - * @throws IOException - */ - public static BlobStoreIndexShardSnapshot readSnapshot(byte[] data) throws IOException { - try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(data)) { + * @throws IOException if an IOException occurs + * */ + public static BlobStoreIndexShardSnapshot readSnapshot(InputStream stream) throws IOException { + try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(stream)) { parser.nextToken(); return BlobStoreIndexShardSnapshot.fromXContent(parser); } @@ -237,7 +235,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements protected final ShardId shardId; - protected final ImmutableBlobContainer blobContainer; + protected final BlobContainer blobContainer; public Context(SnapshotId snapshotId, ShardId shardId) { this(snapshotId, shardId, shardId); @@ -246,7 +244,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements public Context(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId) { this.snapshotId = snapshotId; this.shardId = shardId; - blobContainer = blobStore.immutableBlobContainer(basePath.add("indices").add(snapshotShardId.getIndex()).add(Integer.toString(snapshotShardId.getId()))); + blobContainer = blobStore.blobContainer(basePath.add("indices").add(snapshotShardId.getIndex()).add(Integer.toString(snapshotShardId.getId()))); } /** @@ -286,8 +284,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements */ public BlobStoreIndexShardSnapshot loadSnapshot() { BlobStoreIndexShardSnapshot snapshot; - try { - snapshot = readSnapshot(blobContainer.readBlobFully(snapshotBlobName(snapshotId))); + try (InputStream stream = blobContainer.openInput(snapshotBlobName(snapshotId))) { + snapshot = readSnapshot(stream); } catch (IOException ex) { throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex); } @@ -362,8 +360,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements List snapshots = Lists.newArrayList(); for (String name : blobs.keySet()) { if (name.startsWith(SNAPSHOT_PREFIX)) { - try { - snapshots.add(readSnapshot(blobContainer.readBlobFully(name))); + try (InputStream stream = blobContainer.openInput(name)) { + snapshots.add(readSnapshot(stream)); } catch (IOException e) { logger.warn("failed to read commit point [{}]", e, name); } @@ -469,28 +467,15 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.STARTED); - final CountDownLatch indexLatch = new CountDownLatch(filesToSnapshot.size()); - for (FileInfo snapshotFileInfo : filesToSnapshot) { try { - snapshotFile(snapshotFileInfo, indexLatch, failures); + snapshotFile(snapshotFileInfo); } catch (IOException e) { - failures.add(e); + throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e); } } snapshotStatus.indexVersion(snapshotIndexCommit.getGeneration()); - - try { - indexLatch.await(); - } catch (InterruptedException e) { - failures.add(e); - Thread.currentThread().interrupt(); - } - if (!failures.isEmpty()) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", failures.get(0)); - } - // now create and write the commit point snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE); @@ -501,9 +486,10 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize); //TODO: The time stored in snapshot doesn't include cleanup time. try { - byte[] snapshotData = writeSnapshot(snapshot); logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); - blobContainer.writeBlob(commitPointName, new BytesStreamInput(snapshotData, false), snapshotData.length); + try (OutputStream output = blobContainer.createOutput(commitPointName)) { + writeSnapshot(snapshot, output); + } } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); } @@ -529,98 +515,32 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * added to the {@code failures} list * * @param fileInfo file to be snapshotted - * @param latch latch that should be counted down once file is snapshoted - * @param failures thread-safe list of failures * @throws IOException */ - private void snapshotFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final CountDownLatch latch, final List failures) throws IOException { + private void snapshotFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) throws IOException { final String file = fileInfo.physicalName(); - IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata()); - writeBlob(indexInput, fileInfo, 0, latch, failures); - } - - private class BlobPartWriter implements ImmutableBlobContainer.WriterListener { - - private final int part; - - private final FileInfo fileInfo; - - private final List failures; - - private final CountDownLatch latch; - - private final IndexInput indexInput; - - private final InputStream inputStream; - - private final InputStreamIndexInput inputStreamIndexInput; - - private BlobPartWriter(IndexInput indexInput, FileInfo fileInfo, int part, CountDownLatch latch, List failures) throws IOException { - this.indexInput = indexInput; - this.part = part; - this.fileInfo = fileInfo; - this.failures = failures; - this.latch = latch; - inputStreamIndexInput = new InputStreamIndexInput(indexInput, fileInfo.partBytes()); - InputStream inputStream = inputStreamIndexInput; - if (snapshotRateLimiter != null) { - inputStream = new RateLimitingInputStream(inputStream, snapshotRateLimiter, snapshotThrottleListener); - } - inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName()); - this.inputStream = inputStream; - } - - @Override - public void onCompleted() { - int nextPart = part + 1; - if (nextPart < fileInfo.numberOfParts()) { - try { - // We have more parts to go - writeBlob(indexInput, fileInfo, nextPart, latch, failures); - } catch (Throwable t) { - onFailure(t); + try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) { + for (int i = 0; i < fileInfo.numberOfParts(); i++) { + final InputStreamIndexInput inputStreamIndexInput = new InputStreamIndexInput(indexInput, fileInfo.partBytes()); + InputStream inputStream = snapshotRateLimiter == null ? inputStreamIndexInput : new RateLimitingInputStream(inputStreamIndexInput, snapshotRateLimiter, snapshotThrottleListener); + inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName()); + try (OutputStream output = blobContainer.createOutput(fileInfo.partName(i))) { + int len; + final byte[] buffer = new byte[BUFFER_SIZE]; + while ((len = inputStream.read(buffer)) > 0) { + output.write(buffer, 0, len); + } } - } else { - // Last part - verify checksum - try { - Store.verify(indexInput); - indexInput.close(); - snapshotStatus.addProcessedFile(fileInfo.length()); - } catch (Throwable t) { - onFailure(t); - return; - } - latch.countDown(); } - } - - @Override - public void onFailure(Throwable t) { - cleanupFailedSnapshot(t, indexInput, latch, failures); - } - - public void writeBlobPart() throws IOException { - blobContainer.writeBlob(fileInfo.partName(part), inputStream, inputStreamIndexInput.actualSizeToRead(), this); - } - - } - - private void writeBlob(IndexInput indexInput, FileInfo fileInfo, int part, CountDownLatch latch, List failures) { - try { - new BlobPartWriter(indexInput, fileInfo, part, latch, failures).writeBlobPart(); + Store.verify(indexInput); + snapshotStatus.addProcessedFile(fileInfo.length()); } catch (Throwable t) { - cleanupFailedSnapshot(t, indexInput, latch, failures); + failStoreIfCorrupted(t); + snapshotStatus.addProcessedFile(0); + throw t; } } - private void cleanupFailedSnapshot(Throwable t, IndexInput indexInput, CountDownLatch latch, List failures) { - IOUtils.closeWhileHandlingException(indexInput); - failStoreIfCorrupted(t); - snapshotStatus.addProcessedFile(0); - failures.add(t); - latch.countDown(); - } - private void failStoreIfCorrupted(Throwable t) { if (t instanceof CorruptIndexException) { try { @@ -693,72 +613,42 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * The new logic for StoreFileMetaData reads the entire .si and segments.n files to strengthen the * comparison of the files on a per-segment / per-commit level. */ - private static final void maybeRecalculateMetadataHash(final ImmutableBlobContainer blobContainer, final FileInfo fileInfo, Store.MetadataSnapshot snapshot) throws Throwable { + private static final void maybeRecalculateMetadataHash(final BlobContainer blobContainer, final FileInfo fileInfo, Store.MetadataSnapshot snapshot) throws Throwable { final StoreFileMetaData metadata; if (fileInfo != null && (metadata = snapshot.get(fileInfo.physicalName())) != null) { if (metadata.hash().length > 0 && fileInfo.metadata().hash().length == 0) { // we have a hash - check if our repo has a hash too otherwise we have // to calculate it. - final ByteArrayOutputStream out = new ByteArrayOutputStream(); - final CountDownLatch latch = new CountDownLatch(1); - final CopyOnWriteArrayList failures = new CopyOnWriteArrayList<>(); // we might have multiple parts even though the file is small... make sure we read all of it. - // TODO this API should really support a stream! - blobContainer.readBlob(fileInfo.partName(0), new BlobContainer.ReadBlobListener() { - final AtomicInteger partIndex = new AtomicInteger(); - @Override - public synchronized void onPartial(byte[] data, int offset, int size) throws IOException { - out.write(data, offset, size); - } - - @Override - public synchronized void onCompleted() { - boolean countDown = true; - try { - final int part = partIndex.incrementAndGet(); - if (part < fileInfo.numberOfParts()) { - final String partName = fileInfo.partName(part); - // continue with the new part - blobContainer.readBlob(partName, this); - countDown = false; - return; - } - } finally { - if (countDown) { - latch.countDown(); - } - } - } - - @Override - public void onFailure(Throwable t) { - try { - failures.add(t); - } finally { - latch.countDown(); - } - } - }); - - try { - latch.await(); - } catch (InterruptedException e) { - Thread.interrupted(); + try (final InputStream stream = new PartSliceStream(blobContainer, fileInfo)) { + final byte[] bytes = ByteStreams.toByteArray(stream); + assert bytes != null; + assert bytes.length == fileInfo.length() : bytes.length + " != " + fileInfo.length(); + final BytesRef spare = new BytesRef(bytes); + Store.MetadataSnapshot.hashFile(fileInfo.metadata().hash(), spare); } - - if (!failures.isEmpty()) { - ExceptionsHelper.rethrowAndSuppress(failures); - } - - final byte[] bytes = out.toByteArray(); - assert bytes != null; - assert bytes.length == fileInfo.length() : bytes.length + " != " + fileInfo.length(); - final BytesRef spare = new BytesRef(bytes); - Store.MetadataSnapshot.hashFile(fileInfo.metadata().hash(), spare); } } } + private static final class PartSliceStream extends SlicedInputStream { + + private final BlobContainer container; + private final FileInfo info; + + public PartSliceStream(BlobContainer container, FileInfo info) { + super(info.numberOfParts()); + this.info = info; + this.container = container; + } + + @Override + protected InputStream openSlice(long slice) throws IOException { + return container.openInput(info.partName(slice)); + + } + } + /** * Context for restore operations */ @@ -864,25 +754,14 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements if (logger.isTraceEnabled()) { logger.trace("[{}] [{}] recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", shardId, snapshotId, numberOfFiles, new ByteSizeValue(totalSize), numberOfReusedFiles, new ByteSizeValue(reusedTotalSize)); } - - final CountDownLatch latch = new CountDownLatch(filesToRecover.size()); - final CopyOnWriteArrayList failures = new CopyOnWriteArrayList<>(); - - for (final FileInfo fileToRecover : filesToRecover) { - logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); - restoreFile(fileToRecover, latch, failures); - } - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + for (final FileInfo fileToRecover : filesToRecover) { + logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); + restoreFile(fileToRecover); + } + } catch (IOException ex) { + throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex); } - - if (!failures.isEmpty()) { - throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", failures.get(0)); - } - // read the snapshot data persisted long version = -1; try { @@ -919,107 +798,52 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * added to the {@code failures} list * * @param fileInfo file to be restored - * @param latch latch that should be counted down once file is snapshoted - * @param failures thread-safe list of failures */ - private void restoreFile(final FileInfo fileInfo, final CountDownLatch latch, final List failures) { - final IndexOutput indexOutput; - try { - // we create an output with no checksum, this is because the pure binary data of the file is not - // the checksum (because of seek). We will create the checksum file once copying is done - indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), IOContext.DEFAULT, fileInfo.metadata()); - } catch (IOException e) { - try { - failures.add(e); - } finally { - latch.countDown(); - } - return; - } - - String firstFileToRecover = fileInfo.partName(0); - final AtomicInteger partIndex = new AtomicInteger(); + private void restoreFile(final FileInfo fileInfo) throws IOException { boolean success = false; - try { - blobContainer.readBlob(firstFileToRecover, new BlobContainer.ReadBlobListener() { - @Override - public synchronized void onPartial(byte[] data, int offset, int size) throws IOException { - recoveryState.getIndex().addRecoveredByteCount(size); - RecoveryState.File file = recoveryState.getIndex().file(fileInfo.name()); + RecoveryState.File file = recoveryState.getIndex().file(fileInfo.name()); + try (InputStream stream = new PartSliceStream(blobContainer, fileInfo)) { + try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), IOContext.DEFAULT, fileInfo.metadata())) { + final byte[] buffer = new byte[BUFFER_SIZE]; + int length; + while((length=stream.read(buffer))>0){ + indexOutput.writeBytes(buffer,0,length); if (file != null) { - file.updateRecovered(size); + file.updateRecovered(length); } - indexOutput.writeBytes(data, offset, size); if (restoreRateLimiter != null) { - rateLimiterListener.onRestorePause(restoreRateLimiter.pause(size)); + rateLimiterListener.onRestorePause(restoreRateLimiter.pause(length)); } } + Store.verify(indexOutput); + indexOutput.close(); + // write the checksum + if (fileInfo.metadata().hasLegacyChecksum()) { + Store.LegacyChecksums legacyChecksums = new Store.LegacyChecksums(); + legacyChecksums.add(fileInfo.metadata()); + legacyChecksums.write(store); - @Override - public synchronized void onCompleted() { - int part = partIndex.incrementAndGet(); - if (part < fileInfo.numberOfParts()) { - String partName = fileInfo.partName(part); - // continue with the new part - blobContainer.readBlob(partName, this); - return; - } else { - // we are done... - try { - Store.verify(indexOutput); - indexOutput.close(); - // write the checksum - if (fileInfo.metadata().hasLegacyChecksum()) { - Store.LegacyChecksums legacyChecksums = new Store.LegacyChecksums(); - legacyChecksums.add(fileInfo.metadata()); - legacyChecksums.write(store); - - } - store.directory().sync(Collections.singleton(fileInfo.physicalName())); - recoveryState.getIndex().addRecoveredFileCount(1); - } catch (IOException e) { - onFailure(e); - return; - } - } - latch.countDown(); } - - @Override - public void onFailure(Throwable t) { - try { - failures.add(t); - IOUtils.closeWhileHandlingException(indexOutput); - if (t instanceof CorruptIndexException) { - try { - store.markStoreCorrupted((CorruptIndexException) t); - } catch (IOException e) { - logger.warn("store cannot be marked as corrupted", e); - } - } - store.deleteQuiet(fileInfo.physicalName()); - } finally { - latch.countDown(); - } - } - }); - success = true; - } finally { - if (!success) { + store.directory().sync(Collections.singleton(fileInfo.physicalName())); + recoveryState.getIndex().addRecoveredFileCount(1); + success = true; + } catch (CorruptIndexException ex) { try { - IOUtils.closeWhileHandlingException(indexOutput); - store.deleteQuiet(fileInfo.physicalName()); - } finally { - latch.countDown(); + store.markStoreCorrupted(ex); + } catch (IOException e) { + logger.warn("store cannot be marked as corrupted", e); + } + throw ex; + } finally { + if (success == false) { + store.deleteQuiet(fileInfo.physicalName()); } - } } - } } - + public interface RateLimiterListener { void onRestorePause(long nanos); diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/SlicedInputStream.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/SlicedInputStream.java new file mode 100644 index 00000000000..f49459a280b --- /dev/null +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/SlicedInputStream.java @@ -0,0 +1,113 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.snapshots.blobstore; + +import org.apache.lucene.util.IOUtils; + +import java.io.IOException; +import java.io.InputStream; + +/** + * A {@link SlicedInputStream} is a logical + * concatenation one or more input streams. In contrast to the JDKs + * {@link java.io.SequenceInputStream} this stream doesn't require the instantiation + * of all logical sub-streams ahead of time. Instead, {@link #openSlice(long)} is called + * if a new slice is required. Each slice is closed once it's been fully consumed or if + * close is called before. + */ +public abstract class SlicedInputStream extends InputStream { + private long slice = 0; + private InputStream currentStream; + private final long numSlices; + private boolean initialized = false; + + /** + * Creates a new SlicedInputStream + * @param numSlices the number of slices to consume + */ + protected SlicedInputStream(final long numSlices) { + this.numSlices = numSlices; + } + + private InputStream nextStream() throws IOException { + assert initialized == false || currentStream != null; + initialized = true; + IOUtils.close(currentStream); + if (slice < numSlices) { + currentStream = openSlice(slice++); + } else { + currentStream = null; + } + return currentStream; + } + + /** + * Called for each logical slice given a zero based slice ordinal. + */ + protected abstract InputStream openSlice(long slice) throws IOException; + + private InputStream currentStream() throws IOException { + if (currentStream == null) { + return initialized ? null : nextStream(); + } + return currentStream; + } + + @Override + public final int read() throws IOException { + InputStream stream = currentStream(); + if (stream == null) { + return -1; + } + final int read = stream.read(); + if (read == -1) { + nextStream(); + return read(); + } + return read; + } + + @Override + public final int read(byte[] buffer, int offset, int length) throws IOException { + final InputStream stream = currentStream(); + if (stream == null) { + return -1; + } + final int read = stream.read(buffer, offset, length); + if (read <= 0) { + nextStream(); + return read(buffer, offset, length); + } + return read; + } + + @Override + public final void close() throws IOException { + IOUtils.close(currentStream); + initialized = true; + currentStream = null; + } + + @Override + public final int available() throws IOException { + InputStream stream = currentStream(); + return stream == null ? 0 : stream.available(); + } + +} diff --git a/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index b25fb796aa2..ae3df7a096b 100644 --- a/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -22,16 +22,17 @@ package org.elasticsearch.repositories.blobstore; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.google.common.io.ByteStreams; import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.SnapshotId; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.blobstore.ImmutableBlobContainer; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.compress.CompressorFactory; @@ -52,6 +53,8 @@ import org.elasticsearch.snapshots.*; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.List; @@ -103,7 +106,7 @@ import static com.google.common.collect.Lists.newArrayList; */ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository, RateLimiterListener { - private ImmutableBlobContainer snapshotsBlobContainer; + private BlobContainer snapshotsBlobContainer; protected final String repositoryName; @@ -150,7 +153,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent snapshotIds = snapshots(); if (!snapshotIds.contains(snapshotId)) { snapshotIds = ImmutableList.builder().addAll(snapshotIds).add(snapshotId).build(); @@ -381,22 +392,24 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent indices, boolean ignoreIndexErrors) { MetaData metaData; - try { - byte[] data = snapshotsBlobContainer.readBlobFully(metaDataBlobName(snapshotId)); + try (InputStream blob = snapshotsBlobContainer.openInput(metaDataBlobName(snapshotId))){ + byte[] data = ByteStreams.toByteArray(blob); metaData = readMetaData(data); } catch (FileNotFoundException | NoSuchFileException ex) { throw new SnapshotMissingException(snapshotId, ex); @@ -420,9 +433,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent readSnapshotList() throws IOException { - byte[] data = snapshotsBlobContainer.readBlobFully(SNAPSHOTS_FILE); - ArrayList snapshots = new ArrayList<>(); - try (XContentParser parser = XContentHelper.createParser(data, 0, data.length)) { - if (parser.nextToken() == XContentParser.Token.START_OBJECT) { - if (parser.nextToken() == XContentParser.Token.FIELD_NAME) { - String currentFieldName = parser.currentName(); - if ("snapshots".equals(currentFieldName)) { - if (parser.nextToken() == XContentParser.Token.START_ARRAY) { - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - snapshots.add(new SnapshotId(repositoryName, parser.text())); + try (InputStream blob = snapshotsBlobContainer.openInput(SNAPSHOTS_FILE)){ + final byte[] data = ByteStreams.toByteArray(blob); + ArrayList snapshots = new ArrayList<>(); + try (XContentParser parser = XContentHelper.createParser(data, 0, data.length)) { + if (parser.nextToken() == XContentParser.Token.START_OBJECT) { + if (parser.nextToken() == XContentParser.Token.FIELD_NAME) { + String currentFieldName = parser.currentName(); + if ("snapshots".equals(currentFieldName)) { + if (parser.nextToken() == XContentParser.Token.START_ARRAY) { + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + snapshots.add(new SnapshotId(repositoryName, parser.text())); + } } } } } } + return ImmutableList.copyOf(snapshots); } - return ImmutableList.copyOf(snapshots); } @Override diff --git a/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java b/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java index d7fa2011b8a..184c672b014 100644 --- a/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java +++ b/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java @@ -66,7 +66,7 @@ public class FsRepository extends BlobStoreRepository { * @throws IOException */ @Inject - public FsRepository(RepositoryName name, RepositorySettings repositorySettings, ThreadPool threadPool, IndexShardRepository indexShardRepository) throws IOException { + public FsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException { super(name.getName(), repositorySettings, indexShardRepository); File locationFile; String location = repositorySettings.settings().get("location", componentSettings.get("location")); @@ -76,7 +76,7 @@ public class FsRepository extends BlobStoreRepository { } else { locationFile = new File(location); } - blobStore = new FsBlobStore(componentSettings, threadPool, locationFile); + blobStore = new FsBlobStore(componentSettings, locationFile); this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", null)); this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false)); this.basePath = BlobPath.cleanPath(); diff --git a/src/main/java/org/elasticsearch/repositories/uri/URLRepository.java b/src/main/java/org/elasticsearch/repositories/uri/URLRepository.java index 023fdb8d74e..26783f3da6c 100644 --- a/src/main/java/org/elasticsearch/repositories/uri/URLRepository.java +++ b/src/main/java/org/elasticsearch/repositories/uri/URLRepository.java @@ -63,7 +63,7 @@ public class URLRepository extends BlobStoreRepository { * @throws IOException */ @Inject - public URLRepository(RepositoryName name, RepositorySettings repositorySettings, ThreadPool threadPool, IndexShardRepository indexShardRepository) throws IOException { + public URLRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException { super(name.getName(), repositorySettings, indexShardRepository); URL url; String path = repositorySettings.settings().get("url", componentSettings.get("url")); @@ -73,7 +73,7 @@ public class URLRepository extends BlobStoreRepository { url = new URL(path); } listDirectories = repositorySettings.settings().getAsBoolean("list_directories", componentSettings.getAsBoolean("list_directories", true)); - blobStore = new URLBlobStore(componentSettings, threadPool, url); + blobStore = new URLBlobStore(componentSettings, url); basePath = BlobPath.cleanPath(); } diff --git a/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java b/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java index 6e4e48d2a24..e5a6397aff2 100644 --- a/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java +++ b/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java @@ -65,7 +65,6 @@ public class RestThreadPoolAction extends AbstractCatAction { ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT, - ThreadPool.Names.SNAPSHOT_DATA, ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER }; diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 1936dfbc06f..163e0124fe5 100644 --- a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -78,7 +78,6 @@ public class ThreadPool extends AbstractComponent { public static final String REFRESH = "refresh"; public static final String WARMER = "warmer"; public static final String SNAPSHOT = "snapshot"; - public static final String SNAPSHOT_DATA = "snapshot_data"; public static final String OPTIMIZE = "optimize"; public static final String BENCH = "bench"; } @@ -125,7 +124,6 @@ public class ThreadPool extends AbstractComponent { .put(Names.REFRESH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt10).build()) .put(Names.WARMER, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) .put(Names.SNAPSHOT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) - .put(Names.SNAPSHOT_DATA, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 5).build()) .put(Names.OPTIMIZE, settingsBuilder().put("type", "fixed").put("size", 1).build()) .put(Names.BENCH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) .build(); diff --git a/src/test/java/org/elasticsearch/common/blobstore/BlobStoreTest.java b/src/test/java/org/elasticsearch/common/blobstore/BlobStoreTest.java new file mode 100644 index 00000000000..50a9c251985 --- /dev/null +++ b/src/test/java/org/elasticsearch/common/blobstore/BlobStoreTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.common.blobstore; + +import com.carrotsearch.randomizedtesting.LifecycleScope; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.blobstore.fs.FsBlobStore; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.io.*; +import java.util.Arrays; +import java.util.Random; + +public class BlobStoreTest extends ElasticsearchTestCase { + + @Test + public void testWriteRead() throws IOException { + BlobContainer store = newBlobContainer(); + int length = randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)); + byte[] data = new byte[length]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) randomInt(); + } + try (OutputStream stream = store.createOutput("foobar")) { + stream.write(data); + } + InputStream stream = store.openInput("foobar"); + + BytesRef target = new BytesRef(); + while (target.length < data.length) { + byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length)]; + int offset = scaledRandomIntBetween(0, buffer.length - 1); + int read = stream.read(buffer, offset, buffer.length - offset); + target.append(new BytesRef(buffer, offset, read)); + } + assertEquals(data.length, target.length); + assertArrayEquals(data, Arrays.copyOfRange(target.bytes, target.offset, target.length)); + } + + protected BlobContainer newBlobContainer() { + File tempDir = newTempDir(LifecycleScope.TEST); + Settings settings = randomBoolean() ? ImmutableSettings.EMPTY : ImmutableSettings.builder().put("buffer_size", new ByteSizeValue(randomIntBetween(1, 100), ByteSizeUnit.KB)).build(); + FsBlobStore store = new FsBlobStore(settings, tempDir); + return store.blobContainer(new BlobPath()); + } +} diff --git a/src/test/java/org/elasticsearch/index/snapshots/blobstore/SlicedInputStreamTest.java b/src/test/java/org/elasticsearch/index/snapshots/blobstore/SlicedInputStreamTest.java new file mode 100644 index 00000000000..ea2b51ab971 --- /dev/null +++ b/src/test/java/org/elasticsearch/index/snapshots/blobstore/SlicedInputStreamTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.snapshots.blobstore; + +import com.carrotsearch.randomizedtesting.generators.RandomInts; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.io.*; +import java.util.Random; + +import static org.hamcrest.Matchers.equalTo; + +public class SlicedInputStreamTest extends ElasticsearchTestCase { + + @Test + public void readRandom() throws IOException { + int parts = randomIntBetween(1, 20); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + int numWriteOps = scaledRandomIntBetween(1000, 10000); + final long seed = randomLong(); + Random random = new Random(seed); + for (int i = 0; i < numWriteOps; i++) { + switch(random.nextInt(5)) { + case 1: + stream.write(random.nextInt(Byte.MAX_VALUE)); + break; + default: + stream.write(randomBytes(random)); + break; + } + } + + final CheckClosedInputStream[] streams = new CheckClosedInputStream[parts]; + byte[] bytes = stream.toByteArray(); + int slice = bytes.length / parts; + int offset = 0; + int length; + for (int i = 0; i < parts; i++) { + length = i == parts-1 ? bytes.length-offset : slice; + streams[i] = new CheckClosedInputStream(new ByteArrayInputStream(bytes, offset, length)); + offset += length; + } + + SlicedInputStream input = new SlicedInputStream(parts) { + + @Override + protected InputStream openSlice(long slice) throws IOException { + return streams[(int)slice]; + } + }; + random = new Random(seed); + assertThat(input.available(), equalTo(streams[0].available())); + for (int i = 0; i < numWriteOps; i++) { + switch(random.nextInt(5)) { + case 1: + assertThat(random.nextInt(Byte.MAX_VALUE), equalTo(input.read())); + break; + default: + byte[] b = randomBytes(random); + byte[] buffer = new byte[b.length]; + int read = readFully(input, buffer); + assertThat(b.length, equalTo(read)); + assertArrayEquals(b, buffer); + break; + } + } + + assertThat(input.available(), equalTo(0)); + for (int i =0; i < streams.length-1; i++) { + assertTrue(streams[i].closed); + } + input.close(); + + for (int i =0; i < streams.length; i++) { + assertTrue(streams[i].closed); + } + + } + + private int readFully(InputStream stream, byte[] buffer) throws IOException { + for (int i = 0; i < buffer.length;) { + int read = stream.read(buffer, i, buffer.length-i); + if (read == -1) { + if (i == 0) { + return -1; + } else { + return i; + } + } + i+= read; + } + return buffer.length; + } + + private byte[] randomBytes(Random random) { + int length = RandomInts.randomIntBetween(random, 1, 10); + byte[] data = new byte[length]; + random.nextBytes(data); + return data; + } + + private static final class CheckClosedInputStream extends FilterInputStream { + + public boolean closed = false; + + public CheckClosedInputStream(InputStream in) { + super(in); + } + + @Override + public void close() throws IOException { + closed = true; + super.close(); + } + } +} diff --git a/src/test/java/org/elasticsearch/snapshots/mockstore/ImmutableBlobContainerWrapper.java b/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java similarity index 70% rename from src/test/java/org/elasticsearch/snapshots/mockstore/ImmutableBlobContainerWrapper.java rename to src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java index d72409ecfc0..055cd3276f1 100644 --- a/src/test/java/org/elasticsearch/snapshots/mockstore/ImmutableBlobContainerWrapper.java +++ b/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java @@ -21,31 +21,22 @@ package org.elasticsearch.snapshots.mockstore; import com.google.common.collect.ImmutableMap; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; -import org.elasticsearch.common.blobstore.ImmutableBlobContainer; +import org.elasticsearch.common.blobstore.BlobContainer; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; /** * */ -public class ImmutableBlobContainerWrapper implements ImmutableBlobContainer { - private ImmutableBlobContainer delegate; +public class BlobContainerWrapper implements BlobContainer { + private BlobContainer delegate; - public ImmutableBlobContainerWrapper(ImmutableBlobContainer delegate) { + public BlobContainerWrapper(BlobContainer delegate) { this.delegate = delegate; } - @Override - public void writeBlob(String blobName, InputStream is, long sizeInBytes, WriterListener listener) { - delegate.writeBlob(blobName, is, sizeInBytes, listener); - } - - @Override - public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException { - delegate.writeBlob(blobName, is, sizeInBytes); - } - @Override public BlobPath path() { return delegate.path(); @@ -57,13 +48,13 @@ public class ImmutableBlobContainerWrapper implements ImmutableBlobContainer { } @Override - public void readBlob(String blobName, ReadBlobListener listener) { - delegate.readBlob(blobName, listener); + public InputStream openInput(String name) throws IOException { + return delegate.openInput(name); } @Override - public byte[] readBlobFully(String blobName) throws IOException { - return delegate.readBlobFully(blobName); + public OutputStream createOutput(String blobName) throws IOException { + return delegate.createOutput(blobName); } @Override diff --git a/src/test/java/org/elasticsearch/snapshots/mockstore/BlobStoreWrapper.java b/src/test/java/org/elasticsearch/snapshots/mockstore/BlobStoreWrapper.java index 3e80880cc65..e5df46eb998 100644 --- a/src/test/java/org/elasticsearch/snapshots/mockstore/BlobStoreWrapper.java +++ b/src/test/java/org/elasticsearch/snapshots/mockstore/BlobStoreWrapper.java @@ -18,9 +18,9 @@ */ package org.elasticsearch.snapshots.mockstore; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.blobstore.ImmutableBlobContainer; /** * @@ -34,8 +34,8 @@ public class BlobStoreWrapper implements BlobStore { } @Override - public ImmutableBlobContainer immutableBlobContainer(BlobPath path) { - return delegate.immutableBlobContainer(path); + public BlobContainer blobContainer(BlobPath path) { + return delegate.blobContainer(path); } @Override diff --git a/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 6347e114681..d628bf4c470 100644 --- a/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -24,16 +24,16 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.blobstore.ImmutableBlobContainer; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositorySettings; import org.elasticsearch.repositories.fs.FsRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -47,10 +47,6 @@ public class MockRepository extends FsRepository { private final AtomicLong failureCounter = new AtomicLong(); - public void resetFailureCount() { - failureCounter.set(0); - } - public long getFailureCount() { return failureCounter.get(); } @@ -72,8 +68,8 @@ public class MockRepository extends FsRepository { private volatile boolean blocked = false; @Inject - public MockRepository(RepositoryName name, ThreadPool threadPool, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException { - super(name, repositorySettings, threadPool, indexShardRepository); + public MockRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException { + super(name, repositorySettings, indexShardRepository); randomControlIOExceptionRate = repositorySettings.settings().getAsDouble("random_control_io_exception_rate", 0.0); randomDataFileIOExceptionRate = repositorySettings.settings().getAsDouble("random_data_file_io_exception_rate", 0.0); blockOnControlFiles = repositorySettings.settings().getAsBoolean("block_on_control", false); @@ -134,8 +130,8 @@ public class MockRepository extends FsRepository { } @Override - public ImmutableBlobContainer immutableBlobContainer(BlobPath path) { - return new MockImmutableBlobContainer(super.immutableBlobContainer(path)); + public BlobContainer blobContainer(BlobPath path) { + return new MockBlobContainer(super.blobContainer(path)); } public synchronized void unblockExecution() { @@ -166,7 +162,7 @@ public class MockRepository extends FsRepository { return wasBlocked; } - private class MockImmutableBlobContainer extends ImmutableBlobContainerWrapper { + private class MockBlobContainer extends BlobContainerWrapper { private MessageDigest digest; private boolean shouldFail(String blobName, double probability) { @@ -232,60 +228,20 @@ public class MockRepository extends FsRepository { } } - private boolean maybeIOExceptionOrBlock(String blobName, ImmutableBlobContainer.WriterListener listener) { - try { - maybeIOExceptionOrBlock(blobName); - return true; - } catch (IOException ex) { - listener.onFailure(ex); - return false; - } - } - private boolean maybeIOExceptionOrBlock(String blobName, ImmutableBlobContainer.ReadBlobListener listener) { - try { - maybeIOExceptionOrBlock(blobName); - return true; - } catch (IOException ex) { - listener.onFailure(ex); - return false; - } - } - - - public MockImmutableBlobContainer(ImmutableBlobContainer delegate) { + public MockBlobContainer(BlobContainer delegate) { super(delegate); } - @Override - public void writeBlob(String blobName, InputStream is, long sizeInBytes, WriterListener listener) { - if (maybeIOExceptionOrBlock(blobName, listener) ) { - super.writeBlob(blobName, is, sizeInBytes, listener); - } - } - - @Override - public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException { - maybeIOExceptionOrBlock(blobName); - super.writeBlob(blobName, is, sizeInBytes); - } - @Override public boolean blobExists(String blobName) { return super.blobExists(blobName); } @Override - public void readBlob(String blobName, ReadBlobListener listener) { - if (maybeIOExceptionOrBlock(blobName, listener)) { - super.readBlob(blobName, listener); - } - } - - @Override - public byte[] readBlobFully(String blobName) throws IOException { - maybeIOExceptionOrBlock(blobName); - return super.readBlobFully(blobName); + public InputStream openInput(String name) throws IOException { + maybeIOExceptionOrBlock(name); + return super.openInput(name); } @Override @@ -318,6 +274,11 @@ public class MockRepository extends FsRepository { return super.listBlobsByPrefix(blobNamePrefix); } + @Override + public OutputStream createOutput(String blobName) throws IOException { + maybeIOExceptionOrBlock(blobName); + return super.createOutput(blobName); + } } } } diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index b9ba9667dae..67f7dc3da48 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -366,7 +366,7 @@ public final class InternalTestCluster extends TestCluster { for (String name : Arrays.asList(ThreadPool.Names.BULK, ThreadPool.Names.FLUSH, ThreadPool.Names.GET, ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.MERGE, ThreadPool.Names.OPTIMIZE, ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT, - ThreadPool.Names.SNAPSHOT_DATA, ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) { + ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) { if (random.nextBoolean()) { final String type = RandomPicks.randomFrom(random, Arrays.asList("fixed", "cached", "scaling")); builder.put(ThreadPool.THREADPOOL_GROUP + name + ".type", type);