Cleans up the BlobContainer interface by removing the (#19727)
writeBlob method takes a BytesReference in favor of just the writeBlob method that takes an InputStream. Closes #18528
This commit is contained in:
parent
a21dd80f1b
commit
456ea56527
|
@ -19,8 +19,6 @@
|
|||
|
||||
package org.elasticsearch.common.blobstore;
|
||||
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
|
@ -75,22 +73,6 @@ public interface BlobContainer {
|
|||
*/
|
||||
void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException;
|
||||
|
||||
/**
|
||||
* Writes the input bytes to a new blob in the container with the given name. This method assumes the
|
||||
* container does not already contain a blob of the same blobName. If a blob by the same name already
|
||||
* exists, the operation will fail and an {@link IOException} will be thrown.
|
||||
*
|
||||
* TODO: Remove this in favor of a single {@link #writeBlob(String, InputStream, long)} method.
|
||||
* See https://github.com/elastic/elasticsearch/issues/18528
|
||||
*
|
||||
* @param blobName
|
||||
* The name of the blob to write the contents of the input stream to.
|
||||
* @param bytes
|
||||
* The bytes to write to the blob.
|
||||
* @throws IOException if a blob by the same name already exists, or the target blob could not be written to.
|
||||
*/
|
||||
void writeBlob(String blobName, BytesReference bytes) throws IOException;
|
||||
|
||||
/**
|
||||
* Deletes a blob with giving name, if the blob exists. If the blob does not exist, this method throws an IOException.
|
||||
*
|
||||
|
|
|
@ -21,10 +21,6 @@ package org.elasticsearch.common.blobstore.support;
|
|||
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
/**
|
||||
* A base abstract blob container that implements higher level container methods.
|
||||
|
@ -42,10 +38,4 @@ public abstract class AbstractBlobContainer implements BlobContainer {
|
|||
return this.path;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBlob(String blobName, BytesReference bytes) throws IOException {
|
||||
try (InputStream stream = bytes.streamInput()) {
|
||||
writeBlob(blobName, stream, bytes.length());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.elasticsearch.common.blobstore.url;
|
|||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.IOException;
|
||||
|
@ -108,8 +107,4 @@ public class URLBlobContainer extends AbstractBlobContainer {
|
|||
throw new UnsupportedOperationException("URL repository doesn't support this operation");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBlob(String blobName, BytesReference data) throws IOException {
|
||||
throw new UnsupportedOperationException("URL repository doesn't support this operation");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -663,7 +663,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
byte[] testBytes = Strings.toUTF8Bytes(seed);
|
||||
BlobContainer testContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
|
||||
String blobName = "master.dat";
|
||||
testContainer.writeBlob(blobName + "-temp", new BytesArray(testBytes));
|
||||
BytesArray bytes = new BytesArray(testBytes);
|
||||
try (InputStream stream = bytes.streamInput()) {
|
||||
testContainer.writeBlob(blobName + "-temp", stream, bytes.length());
|
||||
}
|
||||
// Make sure that move is supported
|
||||
testContainer.move(blobName + "-temp", blobName);
|
||||
return seed;
|
||||
|
@ -838,7 +841,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
|
||||
private void writeAtomic(final String blobName, final BytesReference bytesRef) throws IOException {
|
||||
final String tempBlobName = "pending-" + blobName;
|
||||
snapshotsBlobContainer.writeBlob(tempBlobName, bytesRef);
|
||||
try (InputStream stream = bytesRef.streamInput()) {
|
||||
snapshotsBlobContainer.writeBlob(tempBlobName, stream, bytesRef.length());
|
||||
}
|
||||
try {
|
||||
snapshotsBlobContainer.move(tempBlobName, blobName);
|
||||
} catch (IOException ex) {
|
||||
|
@ -900,7 +905,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
BlobContainer testBlobContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
|
||||
if (testBlobContainer.blobExists("master.dat")) {
|
||||
try {
|
||||
testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", new BytesArray(seed));
|
||||
BytesArray bytes = new BytesArray(seed);
|
||||
try (InputStream stream = bytes.streamInput()) {
|
||||
testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", stream, bytes.length());
|
||||
}
|
||||
} catch (IOException exp) {
|
||||
throw new RepositoryVerificationException(metadata.name(), "store location [" + blobStore() + "] is not accessible on the node [" + localNode + "]", exp);
|
||||
}
|
||||
|
|
|
@ -179,7 +179,10 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreForm
|
|||
}
|
||||
CodecUtil.writeFooter(indexOutput);
|
||||
}
|
||||
blobContainer.writeBlob(blobName, new BytesArray(byteArrayOutputStream.toByteArray()));
|
||||
BytesArray bytesArray = new BytesArray(byteArrayOutputStream.toByteArray());
|
||||
try (InputStream stream = bytesArray.streamInput()) {
|
||||
blobContainer.writeBlob(blobName, stream, bytesArray.length());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.common.UUIDs;
|
|||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
@ -193,7 +194,9 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
|
|||
}
|
||||
bRef = bStream.bytes();
|
||||
}
|
||||
repository.blobContainer().writeBlob(BlobStoreRepository.SNAPSHOTS_FILE, bRef); // write to index file
|
||||
try (StreamInput stream = bRef.streamInput()) {
|
||||
repository.blobContainer().writeBlob(BlobStoreRepository.SNAPSHOTS_FILE, stream, bRef.length()); // write to index file
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.elasticsearch.common.bytes.BytesReference;
|
|||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.FromXContentBuilder;
|
||||
|
@ -132,7 +133,9 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase {
|
|||
|
||||
public void write(T obj, BlobContainer blobContainer, String blobName) throws IOException {
|
||||
BytesReference bytes = write(obj);
|
||||
blobContainer.writeBlob(blobName, bytes);
|
||||
try (StreamInput stream = bytes.streamInput()) {
|
||||
blobContainer.writeBlob(blobName, stream, bytes.length());
|
||||
}
|
||||
}
|
||||
|
||||
private BytesReference write(T obj) throws IOException {
|
||||
|
@ -284,7 +287,10 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase {
|
|||
buffer[location] = (byte) (buffer[location] ^ 42);
|
||||
} while (originalChecksum == checksum(buffer));
|
||||
blobContainer.deleteBlob(blobName); // delete original before writing new blob
|
||||
blobContainer.writeBlob(blobName, new BytesArray(buffer));
|
||||
BytesArray bytesArray = new BytesArray(buffer);
|
||||
try (StreamInput stream = bytesArray.streamInput()) {
|
||||
blobContainer.writeBlob(blobName, stream, bytesArray.length());
|
||||
}
|
||||
}
|
||||
|
||||
private long checksum(byte[] buffer) throws IOException {
|
||||
|
|
|
@ -21,11 +21,9 @@ package org.elasticsearch.snapshots.mockstore;
|
|||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -58,11 +56,6 @@ public class BlobContainerWrapper implements BlobContainer {
|
|||
delegate.writeBlob(blobName, inputStream, blobSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBlob(String blobName, BytesReference bytes) throws IOException {
|
||||
delegate.writeBlob(blobName, bytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteBlob(String blobName) throws IOException {
|
||||
delegate.deleteBlob(blobName);
|
||||
|
|
|
@ -40,7 +40,6 @@ import org.elasticsearch.common.blobstore.BlobContainer;
|
|||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.PathUtils;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
|
@ -318,12 +317,6 @@ public class MockRepository extends FsRepository {
|
|||
super.move(sourceBlob, targetBlob);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBlob(String blobName, BytesReference bytes) throws IOException {
|
||||
maybeIOExceptionOrBlock(blobName);
|
||||
super.writeBlob(blobName, bytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
|
||||
maybeIOExceptionOrBlock(blobName);
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
|
@ -91,14 +90,6 @@ public class AzureBlobContainer extends AbstractBlobContainer {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBlob(String blobName, BytesReference bytes) throws IOException {
|
||||
logger.trace("writeBlob({}, bytes)", blobName);
|
||||
try (OutputStream stream = createOutput(blobName)) {
|
||||
bytes.writeTo(stream);
|
||||
}
|
||||
}
|
||||
|
||||
private OutputStream createOutput(String blobName) throws IOException {
|
||||
try {
|
||||
return new AzureOutputStream(blobStore.getOutputStream(blobStore.container(), buildKey(blobName)));
|
||||
|
|
|
@ -23,16 +23,10 @@ import org.elasticsearch.common.blobstore.BlobMetaData;
|
|||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStoreException;
|
||||
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
public class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
|
||||
|
||||
|
@ -74,11 +68,6 @@ public class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
|
|||
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBlob(String blobName, BytesReference bytes) throws IOException {
|
||||
writeBlob(blobName, bytes.streamInput(), bytes.length());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteBlob(String blobName) throws IOException {
|
||||
blobStore.deleteBlob(buildKey(blobName));
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.elasticsearch.common.blobstore.BlobPath;
|
|||
import org.elasticsearch.common.blobstore.BlobStoreException;
|
||||
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
|
||||
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
|
||||
|
@ -106,13 +105,6 @@ public class S3BlobContainer extends AbstractBlobContainer {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBlob(String blobName, BytesReference bytes) throws IOException {
|
||||
try (OutputStream stream = createOutput(blobName)) {
|
||||
bytes.writeTo(stream);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteBlob(String blobName) throws IOException {
|
||||
if (!blobExists(blobName)) {
|
||||
|
|
|
@ -49,7 +49,7 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase {
|
|||
try(final BlobStore store = newBlobStore()) {
|
||||
final BlobContainer container = store.blobContainer(new BlobPath());
|
||||
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
container.writeBlob("foobar", new BytesArray(data));
|
||||
writeBlob(container, "foobar", new BytesArray(data));
|
||||
try (InputStream stream = container.readBlob("foobar")) {
|
||||
BytesRefBuilder target = new BytesRefBuilder();
|
||||
while (target.length() < data.length) {
|
||||
|
@ -120,7 +120,7 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase {
|
|||
|
||||
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
final BytesArray bytesArray = new BytesArray(data);
|
||||
container.writeBlob(blobName, bytesArray);
|
||||
writeBlob(container, blobName, bytesArray);
|
||||
container.deleteBlob(blobName); // should not raise
|
||||
|
||||
// blob deleted, so should raise again
|
||||
|
@ -135,11 +135,17 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase {
|
|||
final BlobContainer container = store.blobContainer(new BlobPath());
|
||||
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
final BytesArray bytesArray = new BytesArray(data);
|
||||
container.writeBlob(blobName, bytesArray);
|
||||
writeBlob(container, blobName, bytesArray);
|
||||
// should not be able to overwrite existing blob
|
||||
expectThrows(IOException.class, () -> container.writeBlob(blobName, bytesArray));
|
||||
expectThrows(IOException.class, () -> writeBlob(container, blobName, bytesArray));
|
||||
container.deleteBlob(blobName);
|
||||
container.writeBlob(blobName, bytesArray); // after deleting the previous blob, we should be able to write to it again
|
||||
writeBlob(container, blobName, bytesArray); // after deleting the previous blob, we should be able to write to it again
|
||||
}
|
||||
}
|
||||
|
||||
private void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray) throws IOException {
|
||||
try (InputStream stream = bytesArray.streamInput()) {
|
||||
container.writeBlob(blobName, stream, bytesArray.length());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.elasticsearch.common.blobstore.BlobPath;
|
|||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
@ -42,8 +41,8 @@ public abstract class ESBlobStoreTestCase extends ESTestCase {
|
|||
final BlobContainer containerBar = store.blobContainer(new BlobPath().add("bar"));
|
||||
byte[] data1 = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
byte[] data2 = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
containerFoo.writeBlob("test", new BytesArray(data1));
|
||||
containerBar.writeBlob("test", new BytesArray(data2));
|
||||
writeBlob(containerFoo, "test", new BytesArray(data1));
|
||||
writeBlob(containerBar, "test", new BytesArray(data2));
|
||||
|
||||
assertArrayEquals(readBlobFully(containerFoo, "test", data1.length), data1);
|
||||
assertArrayEquals(readBlobFully(containerBar, "test", data2.length), data2);
|
||||
|
@ -58,7 +57,7 @@ public abstract class ESBlobStoreTestCase extends ESTestCase {
|
|||
|
||||
public static byte[] writeRandomBlob(BlobContainer container, String name, int length) throws IOException {
|
||||
byte[] data = randomBytes(length);
|
||||
container.writeBlob(name, new BytesArray(data));
|
||||
writeBlob(container, name, new BytesArray(data));
|
||||
return data;
|
||||
}
|
||||
|
||||
|
@ -79,5 +78,11 @@ public abstract class ESBlobStoreTestCase extends ESTestCase {
|
|||
return data;
|
||||
}
|
||||
|
||||
private static void writeBlob(BlobContainer container, String blobName, BytesArray bytesArray) throws IOException {
|
||||
try (InputStream stream = bytesArray.streamInput()) {
|
||||
container.writeBlob(blobName, stream, bytesArray.length());
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract BlobStore newBlobStore() throws IOException;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue