From 456ea56527cc8cac951b58143a79d39633c520d7 Mon Sep 17 00:00:00 2001 From: Ali Beyad Date: Tue, 2 Aug 2016 09:21:43 -0400 Subject: [PATCH] Cleans up the BlobContainer interface by removing the (#19727) writeBlob method takes a BytesReference in favor of just the writeBlob method that takes an InputStream. Closes #18528 --- .../common/blobstore/BlobContainer.java | 18 ------------------ .../support/AbstractBlobContainer.java | 10 ---------- .../common/blobstore/url/URLBlobContainer.java | 5 ----- .../blobstore/BlobStoreRepository.java | 14 +++++++++++--- .../blobstore/ChecksumBlobStoreFormat.java | 5 ++++- .../blobstore/BlobStoreRepositoryTests.java | 5 ++++- .../snapshots/BlobStoreFormatIT.java | 10 ++++++++-- .../mockstore/BlobContainerWrapper.java | 7 ------- .../snapshots/mockstore/MockRepository.java | 7 ------- .../azure/blobstore/AzureBlobContainer.java | 9 --------- .../gcs/GoogleCloudStorageBlobContainer.java | 11 ----------- .../cloud/aws/blobstore/S3BlobContainer.java | 8 -------- .../ESBlobStoreContainerTestCase.java | 16 +++++++++++----- .../repositories/ESBlobStoreTestCase.java | 13 +++++++++---- 14 files changed, 47 insertions(+), 91 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index 4229ee954d4..50d1450bb72 100644 --- a/core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -19,8 +19,6 @@ package org.elasticsearch.common.blobstore; -import org.elasticsearch.common.bytes.BytesReference; - import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; @@ -75,22 +73,6 @@ public interface BlobContainer { */ void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException; - /** - * Writes the input bytes to a new blob in the container with the given name. This method assumes the - * container does not already contain a blob of the same blobName. If a blob by the same name already - * exists, the operation will fail and an {@link IOException} will be thrown. - * - * TODO: Remove this in favor of a single {@link #writeBlob(String, InputStream, long)} method. - * See https://github.com/elastic/elasticsearch/issues/18528 - * - * @param blobName - * The name of the blob to write the contents of the input stream to. - * @param bytes - * The bytes to write to the blob. - * @throws IOException if a blob by the same name already exists, or the target blob could not be written to. - */ - void writeBlob(String blobName, BytesReference bytes) throws IOException; - /** * Deletes a blob with giving name, if the blob exists. If the blob does not exist, this method throws an IOException. * diff --git a/core/src/main/java/org/elasticsearch/common/blobstore/support/AbstractBlobContainer.java b/core/src/main/java/org/elasticsearch/common/blobstore/support/AbstractBlobContainer.java index 1c4652c9f10..72a9a225322 100644 --- a/core/src/main/java/org/elasticsearch/common/blobstore/support/AbstractBlobContainer.java +++ b/core/src/main/java/org/elasticsearch/common/blobstore/support/AbstractBlobContainer.java @@ -21,10 +21,6 @@ package org.elasticsearch.common.blobstore.support; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; -import org.elasticsearch.common.bytes.BytesReference; - -import java.io.IOException; -import java.io.InputStream; /** * A base abstract blob container that implements higher level container methods. @@ -42,10 +38,4 @@ public abstract class AbstractBlobContainer implements BlobContainer { return this.path; } - @Override - public void writeBlob(String blobName, BytesReference bytes) throws IOException { - try (InputStream stream = bytes.streamInput()) { - writeBlob(blobName, stream, bytes.length()); - } - } } diff --git a/core/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java b/core/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java index 5bf55213aca..537031ef778 100644 --- a/core/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java +++ b/core/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java @@ -22,7 +22,6 @@ package org.elasticsearch.common.blobstore.url; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; -import org.elasticsearch.common.bytes.BytesReference; import java.io.BufferedInputStream; import java.io.IOException; @@ -108,8 +107,4 @@ public class URLBlobContainer extends AbstractBlobContainer { throw new UnsupportedOperationException("URL repository doesn't support this operation"); } - @Override - public void writeBlob(String blobName, BytesReference data) throws IOException { - throw new UnsupportedOperationException("URL repository doesn't support this operation"); - } } diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index fe11a502c42..efc3c2cfe21 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -663,7 +663,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp byte[] testBytes = Strings.toUTF8Bytes(seed); BlobContainer testContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed))); String blobName = "master.dat"; - testContainer.writeBlob(blobName + "-temp", new BytesArray(testBytes)); + BytesArray bytes = new BytesArray(testBytes); + try (InputStream stream = bytes.streamInput()) { + testContainer.writeBlob(blobName + "-temp", stream, bytes.length()); + } // Make sure that move is supported testContainer.move(blobName + "-temp", blobName); return seed; @@ -838,7 +841,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private void writeAtomic(final String blobName, final BytesReference bytesRef) throws IOException { final String tempBlobName = "pending-" + blobName; - snapshotsBlobContainer.writeBlob(tempBlobName, bytesRef); + try (InputStream stream = bytesRef.streamInput()) { + snapshotsBlobContainer.writeBlob(tempBlobName, stream, bytesRef.length()); + } try { snapshotsBlobContainer.move(tempBlobName, blobName); } catch (IOException ex) { @@ -900,7 +905,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp BlobContainer testBlobContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed))); if (testBlobContainer.blobExists("master.dat")) { try { - testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", new BytesArray(seed)); + BytesArray bytes = new BytesArray(seed); + try (InputStream stream = bytes.streamInput()) { + testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", stream, bytes.length()); + } } catch (IOException exp) { throw new RepositoryVerificationException(metadata.name(), "store location [" + blobStore() + "] is not accessible on the node [" + localNode + "]", exp); } diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 37df2ddfb90..17fad25e610 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -179,7 +179,10 @@ public class ChecksumBlobStoreFormat extends BlobStoreForm } CodecUtil.writeFooter(indexOutput); } - blobContainer.writeBlob(blobName, new BytesArray(byteArrayOutputStream.toByteArray())); + BytesArray bytesArray = new BytesArray(byteArrayOutputStream.toByteArray()); + try (InputStream stream = bytesArray.streamInput()) { + blobContainer.writeBlob(blobName, stream, bytesArray.length()); + } } } diff --git a/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index 6c4af1f7737..83e4c0e86cb 100644 --- a/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -193,7 +194,9 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { } bRef = bStream.bytes(); } - repository.blobContainer().writeBlob(BlobStoreRepository.SNAPSHOTS_FILE, bRef); // write to index file + try (StreamInput stream = bRef.streamInput()) { + repository.blobContainer().writeBlob(BlobStoreRepository.SNAPSHOTS_FILE, stream, bRef.length()); // write to index file + } } } diff --git a/core/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java b/core/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java index e1589b4cd2f..183fd17b60f 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.FromXContentBuilder; @@ -132,7 +133,9 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase { public void write(T obj, BlobContainer blobContainer, String blobName) throws IOException { BytesReference bytes = write(obj); - blobContainer.writeBlob(blobName, bytes); + try (StreamInput stream = bytes.streamInput()) { + blobContainer.writeBlob(blobName, stream, bytes.length()); + } } private BytesReference write(T obj) throws IOException { @@ -284,7 +287,10 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase { buffer[location] = (byte) (buffer[location] ^ 42); } while (originalChecksum == checksum(buffer)); blobContainer.deleteBlob(blobName); // delete original before writing new blob - blobContainer.writeBlob(blobName, new BytesArray(buffer)); + BytesArray bytesArray = new BytesArray(buffer); + try (StreamInput stream = bytesArray.streamInput()) { + blobContainer.writeBlob(blobName, stream, bytesArray.length()); + } } private long checksum(byte[] buffer) throws IOException { diff --git a/core/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java b/core/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java index 9a66100ae17..72f17039f26 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java +++ b/core/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java @@ -21,11 +21,9 @@ package org.elasticsearch.snapshots.mockstore; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; -import org.elasticsearch.common.bytes.BytesReference; import java.io.IOException; import java.io.InputStream; -import java.util.Collection; import java.util.Map; /** @@ -58,11 +56,6 @@ public class BlobContainerWrapper implements BlobContainer { delegate.writeBlob(blobName, inputStream, blobSize); } - @Override - public void writeBlob(String blobName, BytesReference bytes) throws IOException { - delegate.writeBlob(blobName, bytes); - } - @Override public void deleteBlob(String blobName) throws IOException { delegate.deleteBlob(blobName); diff --git a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 60c5e014828..06c4ec10af0 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -40,7 +40,6 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -318,12 +317,6 @@ public class MockRepository extends FsRepository { super.move(sourceBlob, targetBlob); } - @Override - public void writeBlob(String blobName, BytesReference bytes) throws IOException { - maybeIOExceptionOrBlock(blobName); - super.writeBlob(blobName, bytes); - } - @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { maybeIOExceptionOrBlock(blobName); diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureBlobContainer.java index e359aa7575e..fe23a33cebe 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureBlobContainer.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; @@ -91,14 +90,6 @@ public class AzureBlobContainer extends AbstractBlobContainer { } } - @Override - public void writeBlob(String blobName, BytesReference bytes) throws IOException { - logger.trace("writeBlob({}, bytes)", blobName); - try (OutputStream stream = createOutput(blobName)) { - bytes.writeTo(stream); - } - } - private OutputStream createOutput(String blobName) throws IOException { try { return new AzureOutputStream(blobStore.getOutputStream(blobStore.container(), buildKey(blobName))); diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/common/blobstore/gcs/GoogleCloudStorageBlobContainer.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/common/blobstore/gcs/GoogleCloudStorageBlobContainer.java index 4550a4bb1df..a6e1640dd60 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/common/blobstore/gcs/GoogleCloudStorageBlobContainer.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/common/blobstore/gcs/GoogleCloudStorageBlobContainer.java @@ -23,16 +23,10 @@ import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStoreException; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; -import org.elasticsearch.common.bytes.BytesReference; import java.io.IOException; import java.io.InputStream; -import java.util.Collection; -import java.util.HashSet; import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - public class GoogleCloudStorageBlobContainer extends AbstractBlobContainer { @@ -74,11 +68,6 @@ public class GoogleCloudStorageBlobContainer extends AbstractBlobContainer { blobStore.writeBlob(buildKey(blobName), inputStream, blobSize); } - @Override - public void writeBlob(String blobName, BytesReference bytes) throws IOException { - writeBlob(blobName, bytes.streamInput(), bytes.length()); - } - @Override public void deleteBlob(String blobName) throws IOException { blobStore.deleteBlob(buildKey(blobName)); diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java index 5659b2df1c8..f8ebafbcbb1 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java @@ -33,7 +33,6 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStoreException; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.io.Streams; @@ -106,13 +105,6 @@ public class S3BlobContainer extends AbstractBlobContainer { } } - @Override - public void writeBlob(String blobName, BytesReference bytes) throws IOException { - try (OutputStream stream = createOutput(blobName)) { - bytes.writeTo(stream); - } - } - @Override public void deleteBlob(String blobName) throws IOException { if (!blobExists(blobName)) { diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java index aedbc946d74..6345f444640 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java @@ -49,7 +49,7 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase { try(final BlobStore store = newBlobStore()) { final BlobContainer container = store.blobContainer(new BlobPath()); byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); - container.writeBlob("foobar", new BytesArray(data)); + writeBlob(container, "foobar", new BytesArray(data)); try (InputStream stream = container.readBlob("foobar")) { BytesRefBuilder target = new BytesRefBuilder(); while (target.length() < data.length) { @@ -120,7 +120,7 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase { byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); final BytesArray bytesArray = new BytesArray(data); - container.writeBlob(blobName, bytesArray); + writeBlob(container, blobName, bytesArray); container.deleteBlob(blobName); // should not raise // blob deleted, so should raise again @@ -135,11 +135,17 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase { final BlobContainer container = store.blobContainer(new BlobPath()); byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); final BytesArray bytesArray = new BytesArray(data); - container.writeBlob(blobName, bytesArray); + writeBlob(container, blobName, bytesArray); // should not be able to overwrite existing blob - expectThrows(IOException.class, () -> container.writeBlob(blobName, bytesArray)); + expectThrows(IOException.class, () -> writeBlob(container, blobName, bytesArray)); container.deleteBlob(blobName); - container.writeBlob(blobName, bytesArray); // after deleting the previous blob, we should be able to write to it again + writeBlob(container, blobName, bytesArray); // after deleting the previous blob, we should be able to write to it again + } + } + + private void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray) throws IOException { + try (InputStream stream = bytesArray.streamInput()) { + container.writeBlob(blobName, stream, bytesArray.length()); } } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreTestCase.java index be7431795b2..0cec570dbe8 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreTestCase.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.test.ESTestCase; -import org.junit.Test; import java.io.IOException; import java.io.InputStream; @@ -42,8 +41,8 @@ public abstract class ESBlobStoreTestCase extends ESTestCase { final BlobContainer containerBar = store.blobContainer(new BlobPath().add("bar")); byte[] data1 = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); byte[] data2 = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); - containerFoo.writeBlob("test", new BytesArray(data1)); - containerBar.writeBlob("test", new BytesArray(data2)); + writeBlob(containerFoo, "test", new BytesArray(data1)); + writeBlob(containerBar, "test", new BytesArray(data2)); assertArrayEquals(readBlobFully(containerFoo, "test", data1.length), data1); assertArrayEquals(readBlobFully(containerBar, "test", data2.length), data2); @@ -58,7 +57,7 @@ public abstract class ESBlobStoreTestCase extends ESTestCase { public static byte[] writeRandomBlob(BlobContainer container, String name, int length) throws IOException { byte[] data = randomBytes(length); - container.writeBlob(name, new BytesArray(data)); + writeBlob(container, name, new BytesArray(data)); return data; } @@ -79,5 +78,11 @@ public abstract class ESBlobStoreTestCase extends ESTestCase { return data; } + private static void writeBlob(BlobContainer container, String blobName, BytesArray bytesArray) throws IOException { + try (InputStream stream = bytesArray.streamInput()) { + container.writeBlob(blobName, stream, bytesArray.length()); + } + } + protected abstract BlobStore newBlobStore() throws IOException; }