Add BlobContainer.writeBlobAtomic() (#30902)
This commit adds a new writeBlobAtomic() method to the BlobContainer interface that can be implemented by repository implementations which support atomic write operations. When a BlobContainer implementation does not provide its own implementation of writeBlobAtomic(), the default method falls back to writeBlob().

Related to #30680
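For illustration only (not part of the diff below), a minimal caller-side sketch of the new API. The helper class and blob name are assumptions; the point is that callers always invoke writeBlobAtomic(), and the write is atomic only when the concrete BlobContainer overrides it, otherwise the interface's default method delegates to writeBlob().

import org.elasticsearch.common.blobstore.BlobContainer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper, not part of this commit.
final class WriteBlobAtomicSketch {
    static void writeMetadataBlob(BlobContainer container, String blobName, byte[] data) throws IOException {
        try (InputStream stream = new ByteArrayInputStream(data)) {
            // Atomic if the container overrides writeBlobAtomic(); otherwise the
            // default method in BlobContainer falls back to a plain writeBlob().
            container.writeBlobAtomic(blobName, stream, data.length);
        }
    }
}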
parent: f2892f1bed
commit: 9531b7bbcb
@@ -505,8 +505,6 @@
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]settings[/\\]ClusterSettingsIT.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]shards[/\\]ClusterSearchShardsIT.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]structure[/\\]RoutingIteratorTests.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]blobstore[/\\]FsBlobStoreContainerTests.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]blobstore[/\\]FsBlobStoreTests.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]breaker[/\\]MemoryCircuitBreakerTests.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]geo[/\\]ShapeBuilderTests.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]hash[/\\]MessageDigestsTests.java" checks="LineLength" />
@@ -74,6 +74,29 @@ public interface BlobContainer {
      */
     void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException;
 
+    /**
+     * Reads blob content from the input stream and writes it to the container in a new blob with the given name,
+     * using an atomic write operation if the implementation supports it. When the BlobContainer implementation
+     * does not provide a specific implementation of writeBlobAtomic(String, InputStream, long), then
+     * the {@link #writeBlob(String, InputStream, long)} method is used.
+     *
+     * This method assumes the container does not already contain a blob of the same blobName. If a blob by the
+     * same name already exists, the operation will fail and an {@link IOException} will be thrown.
+     *
+     * @param   blobName
+     *          The name of the blob to write the contents of the input stream to.
+     * @param   inputStream
+     *          The input stream from which to retrieve the bytes to write to the blob.
+     * @param   blobSize
+     *          The size of the blob to be written, in bytes. It is implementation dependent whether
+     *          this value is used in writing the blob to the repository.
+     * @throws  FileAlreadyExistsException if a blob by the same name already exists
+     * @throws  IOException if the input stream could not be read, or the target blob could not be written to.
+     */
+    default void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
+        writeBlob(blobName, inputStream, blobSize);
+    }
+
     /**
      * Deletes a blob with giving name, if the blob exists. If the blob does not exist,
      * this method throws a NoSuchFileException.
@@ -19,11 +19,12 @@
 
 package org.elasticsearch.common.blobstore.fs;
 
-import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.core.internal.io.Streams;
 
 import java.io.BufferedInputStream;

@@ -56,8 +57,9 @@ import static java.util.Collections.unmodifiableMap;
  */
 public class FsBlobContainer extends AbstractBlobContainer {
 
-    protected final FsBlobStore blobStore;
+    private static final String TEMP_FILE_PREFIX = "pending-";
 
+    protected final FsBlobStore blobStore;
     protected final Path path;
 
     public FsBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path) {

@@ -131,6 +133,48 @@ public class FsBlobContainer extends AbstractBlobContainer {
         IOUtils.fsync(path, true);
     }
 
+    @Override
+    public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
+        final String tempBlob = tempBlobName(blobName);
+        final Path tempBlobPath = path.resolve(tempBlob);
+        try {
+            try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
+                Streams.copy(inputStream, outputStream);
+            }
+            IOUtils.fsync(tempBlobPath, false);
+
+            final Path blobPath = path.resolve(blobName);
+            // If the target file exists then Files.move() behaviour is implementation specific
+            // the existing file might be replaced or this method fails by throwing an IOException.
+            if (Files.exists(blobPath)) {
+                throw new FileAlreadyExistsException("blob [" + blobPath + "] already exists, cannot overwrite");
+            }
+            Files.move(tempBlobPath, blobPath, StandardCopyOption.ATOMIC_MOVE);
+        } catch (IOException ex) {
+            try {
+                deleteBlobIgnoringIfNotExists(tempBlob);
+            } catch (IOException e) {
+                ex.addSuppressed(e);
+            }
+            throw ex;
+        } finally {
+            IOUtils.fsync(path, true);
+        }
+    }
+
+    public static String tempBlobName(final String blobName) {
+        return "pending-" + blobName + "-" + UUIDs.randomBase64UUID();
+    }
+
+    /**
+     * Returns true if the blob is a leftover temporary blob.
+     *
+     * The temporary blobs might be left after failed atomic write operation.
+     */
+    public static boolean isTempBlobName(final String blobName) {
+        return blobName.startsWith(TEMP_FILE_PREFIX);
+    }
+
     @Override
     public void move(String source, String target) throws IOException {
         Path sourcePath = path.resolve(source);
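As a side note, here is a standalone sketch (plain java.nio, independent of the Elasticsearch classes above) of the same write-to-temp-then-atomic-rename pattern that FsBlobContainer.writeBlobAtomic() applies. The "pending-" prefix, the method names, and the omission of the surrounding-directory fsync are simplifications for illustration, not part of the change:

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

final class AtomicFileWriteSketch {
    static void writeAtomically(Path dir, String name, InputStream in) throws IOException {
        final Path tempPath = dir.resolve("pending-" + name);
        // Fails if a temp file of the same name already exists, mirroring CREATE_NEW above.
        Files.copy(in, tempPath);
        try (FileChannel channel = FileChannel.open(tempPath, StandardOpenOption.WRITE)) {
            channel.force(true); // roughly what IOUtils.fsync(tempBlobPath, false) does above
        }
        final Path target = dir.resolve(name);
        if (Files.exists(target)) {
            throw new FileAlreadyExistsException(target.toString());
        }
        // Readers either see the old state or the fully written file, never a partial write.
        Files.move(tempPath, target, StandardCopyOption.ATOMIC_MOVE);
    }
}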
@@ -50,6 +50,7 @@ import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;

@@ -555,10 +556,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 String blobName = "master.dat";
                 BytesArray bytes = new BytesArray(testBytes);
                 try (InputStream stream = bytes.streamInput()) {
-                    testContainer.writeBlob(blobName + "-temp", stream, bytes.length());
+                    testContainer.writeBlobAtomic(blobName, stream, bytes.length());
                 }
-                // Make sure that move is supported
-                testContainer.move(blobName + "-temp", blobName);
                 return seed;
             }
         } catch (IOException exp) {

@@ -774,18 +773,8 @@
     }
 
     private void writeAtomic(final String blobName, final BytesReference bytesRef) throws IOException {
-        final String tempBlobName = "pending-" + blobName + "-" + UUIDs.randomBase64UUID();
         try (InputStream stream = bytesRef.streamInput()) {
-            snapshotsBlobContainer.writeBlob(tempBlobName, stream, bytesRef.length());
-            snapshotsBlobContainer.move(tempBlobName, blobName);
-        } catch (IOException ex) {
-            // temporary blob creation or move failed - try cleaning up
-            try {
-                snapshotsBlobContainer.deleteBlobIgnoringIfNotExists(tempBlobName);
-            } catch (IOException e) {
-                ex.addSuppressed(e);
-            }
-            throw ex;
+            snapshotsBlobContainer.writeBlobAtomic(blobName, stream, bytesRef.length());
         }
     }
 

@@ -955,7 +944,7 @@
         // Delete temporary index files first, as we might otherwise fail in the next step creating the new index file if an earlier
         // attempt to write an index file with this generation failed mid-way after creating the temporary file.
         for (final String blobName : blobs.keySet()) {
-            if (indexShardSnapshotsFormat.isTempBlobName(blobName)) {
+            if (FsBlobContainer.isTempBlobName(blobName)) {
                 try {
                     blobContainer.deleteBlobIgnoringIfNotExists(blobName);
                 } catch (IOException e) {
@@ -23,6 +23,7 @@ import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.IndexFormatTooNewException;
 import org.apache.lucene.index.IndexFormatTooOldException;
 import org.apache.lucene.store.OutputStreamIndexOutput;
+import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.CheckedFunction;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.bytes.BytesArray;

@@ -52,8 +53,6 @@ import java.util.Locale;
  */
 public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreFormat<T> {
 
-    private static final String TEMP_FILE_PREFIX = "pending-";
-
     private static final XContentType DEFAULT_X_CONTENT_TYPE = XContentType.SMILE;
 
     // The format version

@@ -120,7 +119,7 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreForm
     }
 
     /**
-     * Writes blob in atomic manner with resolving the blob name using {@link #blobName} and {@link #tempBlobName} methods.
+     * Writes blob in atomic manner with resolving the blob name using {@link #blobName} method.
      * <p>
      * The blob will be compressed and checksum will be written if required.
     *

@@ -131,20 +130,12 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreForm
      * @param name          blob name
      */
     public void writeAtomic(T obj, BlobContainer blobContainer, String name) throws IOException {
-        String blobName = blobName(name);
-        String tempBlobName = tempBlobName(name);
-        writeBlob(obj, blobContainer, tempBlobName);
-        try {
-            blobContainer.move(tempBlobName, blobName);
-        } catch (IOException ex) {
-            // Move failed - try cleaning up
-            try {
-                blobContainer.deleteBlob(tempBlobName);
-            } catch (Exception e) {
-                ex.addSuppressed(e);
+        final String blobName = blobName(name);
+        writeTo(obj, blobName, bytesArray -> {
+            try (InputStream stream = bytesArray.streamInput()) {
+                blobContainer.writeBlobAtomic(blobName, stream, bytesArray.length());
             }
-            throw ex;
-        }
+        });
     }
 
     /**

@@ -157,51 +148,35 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreForm
      * @param name          blob name
      */
     public void write(T obj, BlobContainer blobContainer, String name) throws IOException {
-        String blobName = blobName(name);
-        writeBlob(obj, blobContainer, blobName);
+        final String blobName = blobName(name);
+        writeTo(obj, blobName, bytesArray -> {
+            try (InputStream stream = bytesArray.streamInput()) {
+                blobContainer.writeBlob(blobName, stream, bytesArray.length());
+            }
+        });
     }
 
-    /**
-     * Writes blob in atomic manner without resolving the blobName using using {@link #blobName} method.
-     * <p>
-     * The blob will be compressed and checksum will be written if required.
-     *
-     * @param obj           object to be serialized
-     * @param blobContainer blob container
-     * @param blobName      blob name
-     */
-    protected void writeBlob(T obj, BlobContainer blobContainer, String blobName) throws IOException {
-        BytesReference bytes = write(obj);
-        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
+    private void writeTo(final T obj, final String blobName, final CheckedConsumer<BytesArray, IOException> consumer) throws IOException {
+        final BytesReference bytes = write(obj);
+        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
             final String resourceDesc = "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")";
-            try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, byteArrayOutputStream, BUFFER_SIZE)) {
+            try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, outputStream, BUFFER_SIZE)) {
                 CodecUtil.writeHeader(indexOutput, codec, VERSION);
                 try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
                     @Override
                     public void close() throws IOException {
                         // this is important since some of the XContentBuilders write bytes on close.
                         // in order to write the footer we need to prevent closing the actual index input.
-                } }) {
+                    }
+                }) {
                     bytes.writeTo(indexOutputOutputStream);
                 }
                 CodecUtil.writeFooter(indexOutput);
             }
-            BytesArray bytesArray = new BytesArray(byteArrayOutputStream.toByteArray());
-            try (InputStream stream = bytesArray.streamInput()) {
-                blobContainer.writeBlob(blobName, stream, bytesArray.length());
-            }
+            consumer.accept(new BytesArray(outputStream.toByteArray()));
         }
     }
 
-    /**
-     * Returns true if the blob is a leftover temporary blob.
-     *
-     * The temporary blobs might be left after failed atomic write operation.
-     */
-    public boolean isTempBlobName(String blobName) {
-        return blobName.startsWith(ChecksumBlobStoreFormat.TEMP_FILE_PREFIX);
-    }
-
     protected BytesReference write(T obj) throws IOException {
         try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
             if (compress) {

@@ -222,10 +197,4 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreForm
             builder.endObject();
         }
     }
-
-
-    protected String tempBlobName(String name) {
-        return TEMP_FILE_PREFIX + String.format(Locale.ROOT, blobNameFormat, name);
-    }
-
 }
@@ -0,0 +1,40 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.common.blobstore.fs;
+
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.startsWith;
+
+public class FsBlobContainerTests extends ESTestCase {
+
+    public void testTempBlobName() {
+        final String blobName = randomAlphaOfLengthBetween(1, 20);
+        final String tempBlobName = FsBlobContainer.tempBlobName(blobName);
+        assertThat(tempBlobName, startsWith("pending-"));
+        assertThat(tempBlobName, containsString(blobName));
+    }
+
+    public void testIsTempBlobName() {
+        final String tempBlobName = FsBlobContainer.tempBlobName(randomAlphaOfLengthBetween(1, 20));
+        assertThat(FsBlobContainer.isTempBlobName(tempBlobName), is(true));
+    }
+}
@@ -16,23 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.elasticsearch.common.blobstore;
+package org.elasticsearch.common.blobstore.fs;
 
 import org.apache.lucene.util.LuceneTestCase;
-import org.elasticsearch.common.blobstore.fs.FsBlobStore;
+import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
 
 import java.io.IOException;
-import java.nio.file.Path;
 
 @LuceneTestCase.SuppressFileSystems("ExtrasFS")
 public class FsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
 
     protected BlobStore newBlobStore() throws IOException {
-        Path tempDir = createTempDir();
-        Settings settings = randomBoolean() ? Settings.EMPTY : Settings.builder().put("buffer_size", new ByteSizeValue(randomIntBetween(1, 100), ByteSizeUnit.KB)).build();
-        return new FsBlobStore(settings, tempDir);
+        final Settings settings;
+        if (randomBoolean()) {
+            settings = Settings.builder().put("buffer_size", new ByteSizeValue(randomIntBetween(1, 100), ByteSizeUnit.KB)).build();
+        } else {
+            settings = Settings.EMPTY;
+        }
+        return new FsBlobStore(settings, createTempDir());
     }
 }
@@ -16,10 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.elasticsearch.common.blobstore;
+package org.elasticsearch.common.blobstore.fs;
 
 import org.apache.lucene.util.LuceneTestCase;
-import org.elasticsearch.common.blobstore.fs.FsBlobStore;
+import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;

@@ -32,10 +34,15 @@ import java.nio.file.Path;
 
 @LuceneTestCase.SuppressFileSystems("ExtrasFS")
 public class FsBlobStoreTests extends ESBlobStoreTestCase {
 
     protected BlobStore newBlobStore() throws IOException {
-        Path tempDir = createTempDir();
-        Settings settings = randomBoolean() ? Settings.EMPTY : Settings.builder().put("buffer_size", new ByteSizeValue(randomIntBetween(1, 100), ByteSizeUnit.KB)).build();
-        return new FsBlobStore(settings, tempDir);
+        final Settings settings;
+        if (randomBoolean()) {
+            settings = Settings.builder().put("buffer_size", new ByteSizeValue(randomIntBetween(1, 100), ByteSizeUnit.KB)).build();
+        } else {
+            settings = Settings.EMPTY;
+        }
+        return new FsBlobStore(settings, createTempDir());
     }
 
     public void testReadOnly() throws Exception {
@@ -94,7 +94,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
             }
             Thread.sleep(100);
         }
-        fail("Timeout!!!");
+        fail("Timeout waiting for node [" + node + "] to be blocked");
     }
 
     public SnapshotInfo waitForCompletion(String repository, String snapshotName, TimeValue timeout) throws InterruptedException {
@@ -224,52 +224,16 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase {
             IOException writeBlobException = expectThrows(IOException.class, () -> {
                 BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
                     @Override
-                    public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
-                        throw new IOException("Exception thrown in writeBlob() for " + blobName);
+                    public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize) throws IOException {
+                        throw new IOException("Exception thrown in writeBlobAtomic() for " + blobName);
                     }
                 };
                 checksumFormat.writeAtomic(blobObj, wrapper, name);
             });
 
-            assertEquals("Exception thrown in writeBlob() for pending-" + name, writeBlobException.getMessage());
+            assertEquals("Exception thrown in writeBlobAtomic() for " + name, writeBlobException.getMessage());
             assertEquals(0, writeBlobException.getSuppressed().length);
         }
-        {
-            IOException moveException = expectThrows(IOException.class, () -> {
-                BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
-                    @Override
-                    public void move(String sourceBlobName, String targetBlobName) throws IOException {
-                        throw new IOException("Exception thrown in move() for " + sourceBlobName);
-                    }
-                };
-                checksumFormat.writeAtomic(blobObj, wrapper, name);
-            });
-            assertEquals("Exception thrown in move() for pending-" + name, moveException.getMessage());
-            assertEquals(0, moveException.getSuppressed().length);
-        }
-        {
-            IOException moveThenDeleteException = expectThrows(IOException.class, () -> {
-                BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
-                    @Override
-                    public void move(String sourceBlobName, String targetBlobName) throws IOException {
-                        throw new IOException("Exception thrown in move() for " + sourceBlobName);
-                    }
-
-                    @Override
-                    public void deleteBlob(String blobName) throws IOException {
-                        throw new IOException("Exception thrown in deleteBlob() for " + blobName);
-                    }
-                };
-                checksumFormat.writeAtomic(blobObj, wrapper, name);
-            });
-
-            assertEquals("Exception thrown in move() for pending-" + name, moveThenDeleteException.getMessage());
-            assertEquals(1, moveThenDeleteException.getSuppressed().length);
-
-            final Throwable suppressedThrowable = moveThenDeleteException.getSuppressed()[0];
-            assertTrue(suppressedThrowable instanceof IOException);
-            assertEquals("Exception thrown in deleteBlob() for pending-" + name, suppressedThrowable.getMessage());
-        }
     }
 
     protected BlobStore createTestBlobStore() throws IOException {
@@ -53,11 +53,21 @@ public class BlobContainerWrapper implements BlobContainer {
         delegate.writeBlob(blobName, inputStream, blobSize);
     }
 
+    @Override
+    public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
+        delegate.writeBlobAtomic(blobName, inputStream, blobSize);
+    }
+
     @Override
     public void deleteBlob(String blobName) throws IOException {
         delegate.deleteBlob(blobName);
     }
 
+    @Override
+    public void deleteBlobIgnoringIfNotExists(final String blobName) throws IOException {
+        delegate.deleteBlobIgnoringIfNotExists(blobName);
+    }
+
     @Override
     public Map<String, BlobMetaData> listBlobs() throws IOException {
         return delegate.listBlobs();
@@ -19,6 +19,28 @@
 
 package org.elasticsearch.snapshots.mockstore;
 
+import com.carrotsearch.randomizedtesting.RandomizedContext;
+import org.apache.lucene.index.CorruptIndexException;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.metadata.RepositoryMetaData;
+import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.blobstore.BlobMetaData;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
+import org.elasticsearch.common.io.PathUtils;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Setting.Property;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.plugins.RepositoryPlugin;
+import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.repositories.fs.FsRepository;
+import org.elasticsearch.snapshots.SnapshotId;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;

@@ -29,31 +51,11 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.carrotsearch.randomizedtesting.RandomizedContext;
-import org.apache.lucene.index.CorruptIndexException;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.cluster.metadata.RepositoryMetaData;
-import org.elasticsearch.common.blobstore.BlobContainer;
-import org.elasticsearch.common.blobstore.BlobMetaData;
-import org.elasticsearch.common.blobstore.BlobPath;
-import org.elasticsearch.common.blobstore.BlobStore;
-import org.elasticsearch.common.io.PathUtils;
-import org.elasticsearch.common.settings.Setting;
-import org.elasticsearch.common.settings.Setting.Property;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.env.Environment;
-import org.elasticsearch.plugins.RepositoryPlugin;
-import org.elasticsearch.repositories.Repository;
-import org.elasticsearch.repositories.IndexId;
-import org.elasticsearch.repositories.fs.FsRepository;
-import org.elasticsearch.snapshots.SnapshotId;
-
 public class MockRepository extends FsRepository {
 
     public static class Plugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin {
@@ -325,6 +327,12 @@ public class MockRepository extends FsRepository {
                 super.deleteBlob(blobName);
             }
 
+            @Override
+            public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException {
+                maybeIOExceptionOrBlock(blobName);
+                super.deleteBlobIgnoringIfNotExists(blobName);
+            }
+
             @Override
             public Map<String, BlobMetaData> listBlobs() throws IOException {
                 maybeIOExceptionOrBlock("");
@@ -365,6 +373,31 @@
                     maybeIOExceptionOrBlock(blobName);
                 }
             }
+
+            @Override
+            public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
+                final Random random = RandomizedContext.current().getRandom();
+                if (random.nextBoolean()) {
+                    if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) {
+                        // Simulate a failure between the write and move operation in FsBlobContainer
+                        final String tempBlobName = FsBlobContainer.tempBlobName(blobName);
+                        super.writeBlob(tempBlobName, inputStream, blobSize);
+                        maybeIOExceptionOrBlock(blobName);
+                        final FsBlobContainer fsBlobContainer = (FsBlobContainer) delegate();
+                        fsBlobContainer.move(tempBlobName, blobName);
+                    } else {
+                        // Atomic write since it is potentially supported
+                        // by the delegating blob container
+                        maybeIOExceptionOrBlock(blobName);
+                        super.writeBlobAtomic(blobName, inputStream, blobSize);
+                    }
+                } else {
+                    // Simulate a non-atomic write since many blob container
+                    // implementations does not support atomic write
+                    maybeIOExceptionOrBlock(blobName);
+                    super.writeBlob(blobName, inputStream, blobSize);
+                }
+            }
         }
     }
 }
@@ -158,7 +158,11 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase {
 
     protected void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray) throws IOException {
         try (InputStream stream = bytesArray.streamInput()) {
-            container.writeBlob(blobName, stream, bytesArray.length());
+            if (randomBoolean()) {
+                container.writeBlob(blobName, stream, bytesArray.length());
+            } else {
+                container.writeBlobAtomic(blobName, stream, bytesArray.length());
+            }
         }
     }
 