Simplify the BlobContainer blob writing interface

Instead of asking the blob store to create an output stream that callers push blob content into, this change hands the blob's content directly to the blob store for writing. This significantly simplifies the interface for the S3 and Azure plugins.
Igor Motov 2015-09-09 14:24:13 -04:00
parent 7b0f086946
commit 39ca45050a
15 changed files with 162 additions and 81 deletions
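
In practical terms, callers that previously pulled an OutputStream out of the container and pushed bytes into it now hand the finished content to the container in one call, either as a BytesReference or as an InputStream plus its length. A minimal before/after sketch against the interface in this commit (the class, method, and blob names are illustrative, not part of the change):

import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.bytes.BytesArray;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;

class WriteBlobExample {
    // Before this commit: the caller drives an OutputStream obtained from the container.
    static void writeOld(BlobContainer container, byte[] data) throws IOException {
        try (OutputStream out = container.createOutput("example.dat")) {
            out.write(data);
        }
    }

    // After this commit: the container receives the complete content and decides how
    // to store it, which lets S3/Azure turn the write into a single upload operation.
    static void writeNew(BlobContainer container, byte[] data) throws IOException {
        container.writeBlob("example.dat", new BytesArray(data));
        // Streaming variant for content of known size:
        container.writeBlob("example-stream.dat", new ByteArrayInputStream(data), data.length);
    }
}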

View File: BlobContainer.java

@@ -19,11 +19,10 @@
package org.elasticsearch.common.blobstore;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.bytes.BytesReference;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Map;
@@ -37,14 +36,19 @@ public interface BlobContainer {
boolean blobExists(String blobName);
/**
* Creates a new {@link InputStream} for the given blob name
* Creates a new InputStream for the given blob name
*/
InputStream openInput(String blobName) throws IOException;
InputStream readBlob(String blobName) throws IOException;
/**
* Creates a new OutputStream for the given blob name
* Reads blob content from the input stream and writes it to the blob store
*/
OutputStream createOutput(String blobName) throws IOException;
void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException;
/**
* Writes bytes to the blob
*/
void writeBlob(String blobName, BytesReference bytes) throws IOException;
/**
* Deletes a blob with the given name.

View File: FsBlobContainer.java

@@ -25,7 +25,9 @@ import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.Streams;
import java.io.*;
import java.nio.file.DirectoryStream;
@@ -83,25 +85,28 @@ public class FsBlobContainer extends AbstractBlobContainer {
}
@Override
public InputStream openInput(String name) throws IOException {
public InputStream readBlob(String name) throws IOException {
return new BufferedInputStream(Files.newInputStream(path.resolve(name)), blobStore.bufferSizeInBytes());
}
@Override
public OutputStream createOutput(String blobName) throws IOException {
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
final Path file = path.resolve(blobName);
return new BufferedOutputStream(new FilterOutputStream(Files.newOutputStream(file)) {
try (OutputStream outputStream = Files.newOutputStream(file)) {
Streams.copy(inputStream, outputStream, new byte[blobStore.bufferSizeInBytes()]);
}
IOUtils.fsync(file, false);
IOUtils.fsync(path, true);
}
@Override // FilterOutputStream#write(byte[] b, int off, int len) is trappy: it writes every single byte
public void write(byte[] b, int off, int len) throws IOException { out.write(b, off, len);}
@Override
public void close() throws IOException {
super.close();
IOUtils.fsync(file, false);
IOUtils.fsync(path, true);
}
}, blobStore.bufferSizeInBytes());
@Override
public void writeBlob(String blobName, BytesReference data) throws IOException {
final Path file = path.resolve(blobName);
try (OutputStream outputStream = Files.newOutputStream(file)) {
data.writeTo(outputStream);
}
IOUtils.fsync(file, false);
IOUtils.fsync(path, true);
}
@Override

View File: AbstractBlobContainer.java

@@ -19,7 +19,6 @@
package org.elasticsearch.common.blobstore.support;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;

View File: AbstractLegacyBlobContainer.java

@@ -0,0 +1,78 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.blobstore.support;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* Temporary compatibility layer.
*
* This class should be removed after the S3 and Azure containers migrate to the new model.
*/
@Deprecated
public abstract class AbstractLegacyBlobContainer extends AbstractBlobContainer {
protected AbstractLegacyBlobContainer(BlobPath path) {
super(path);
}
/**
* Creates a new {@link InputStream} for the given blob name
* <p/>
* This method is deprecated and is used only for compatibility with older blob containers.
* New blob containers should use the readBlob/writeBlob methods instead.
*/
@Deprecated
protected abstract InputStream openInput(String blobName) throws IOException;
/**
* Creates a new OutputStream for the given blob name
* <p/>
* This method is deprecated and is used only for compatibility with older blob containers.
* New blob containers should override the readBlob/writeBlob methods instead.
*/
@Deprecated
protected abstract OutputStream createOutput(String blobName) throws IOException;
@Override
public InputStream readBlob(String blobName) throws IOException {
return openInput(blobName);
}
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
try (OutputStream stream = createOutput(blobName)) {
Streams.copy(inputStream, stream);
}
}
@Override
public void writeBlob(String blobName, BytesReference data) throws IOException {
try (OutputStream stream = createOutput(blobName)) {
data.writeTo(stream);
}
}
}
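
To keep the stream-based plugins compiling while they migrate, a legacy container only has to extend this shim and keep providing its two stream methods. A hypothetical minimal subclass, for illustration only (it stays abstract because the remaining BlobContainer methods such as blobExists and deleteBlob are omitted here):

import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractLegacyBlobContainer;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

abstract class ExampleLegacyBlobContainer extends AbstractLegacyBlobContainer {
    private final Path dir; // illustrative backing directory

    protected ExampleLegacyBlobContainer(BlobPath path, Path dir) {
        super(path);
        this.dir = dir;
    }

    @Override
    protected InputStream openInput(String blobName) throws IOException {
        // Old-style contract: just expose a stream; the shim's readBlob delegates here.
        return Files.newInputStream(dir.resolve(blobName));
    }

    @Override
    protected OutputStream createOutput(String blobName) throws IOException {
        // Old-style contract: the shim's writeBlob copies the content into this stream.
        return Files.newOutputStream(dir.resolve(blobName));
    }
}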

View File: URLBlobContainer.java

@@ -23,11 +23,11 @@ import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.bytes.BytesReference;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
/**
@@ -99,12 +99,17 @@ public class URLBlobContainer extends AbstractBlobContainer {
}
@Override
public InputStream openInput(String name) throws IOException {
public InputStream readBlob(String name) throws IOException {
return new BufferedInputStream(new URL(path, name).openStream(), blobStore.bufferSizeInBytes());
}
@Override
public OutputStream createOutput(String blobName) throws IOException {
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}
@Override
public void writeBlob(String blobName, BytesReference data) throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}
}

View File: BlobStoreIndexShardRepository.java

@@ -36,11 +36,11 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@@ -71,7 +71,6 @@ import org.elasticsearch.repositories.blobstore.LegacyBlobStoreFormat;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -230,8 +229,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
BlobContainer testBlobContainer = blobStore.blobContainer(basePath.add(testBlobPrefix(seed)));
DiscoveryNode localNode = clusterService.localNode();
if (testBlobContainer.blobExists("master.dat")) {
try (OutputStream outputStream = testBlobContainer.createOutput("data-" + localNode.getId() + ".dat")) {
outputStream.write(Strings.toUTF8Bytes(seed));
try {
testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", new BytesArray(seed));
} catch (IOException exp) {
throw new RepositoryVerificationException(repositoryName, "store location [" + blobStore + "] is not accessible on the node [" + localNode + "]", exp);
}
@@ -647,12 +646,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
final InputStreamIndexInput inputStreamIndexInput = new InputStreamIndexInput(indexInput, fileInfo.partBytes());
InputStream inputStream = snapshotRateLimiter == null ? inputStreamIndexInput : new RateLimitingInputStream(inputStreamIndexInput, snapshotRateLimiter, snapshotThrottleListener);
inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName());
try (OutputStream output = blobContainer.createOutput(fileInfo.partName(i))) {
int len;
while ((len = inputStream.read(buffer)) > 0) {
output.write(buffer, 0, len);
}
}
blobContainer.writeBlob(fileInfo.partName(i), inputStream, fileInfo.partBytes());
}
Store.verify(indexInput);
snapshotStatus.addProcessedFile(fileInfo.length());
@@ -768,8 +762,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
@Override
protected InputStream openSlice(long slice) throws IOException {
return container.openInput(info.partName(slice));
return container.readBlob(info.partName(slice));
}
}

View File: BlobStoreRepository.java

@@ -577,9 +577,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
if (snapshotsBlobContainer.blobExists(SNAPSHOTS_FILE)) {
snapshotsBlobContainer.deleteBlob(SNAPSHOTS_FILE);
}
try (OutputStream output = snapshotsBlobContainer.createOutput(SNAPSHOTS_FILE)) {
bRef.writeTo(output);
}
snapshotsBlobContainer.writeBlob(SNAPSHOTS_FILE, bRef);
}
/**
@@ -591,7 +589,7 @@
* @throws IOException I/O errors
*/
protected List<SnapshotId> readSnapshotList() throws IOException {
try (InputStream blob = snapshotsBlobContainer.openInput(SNAPSHOTS_FILE)) {
try (InputStream blob = snapshotsBlobContainer.readBlob(SNAPSHOTS_FILE)) {
final byte[] data = ByteStreams.toByteArray(blob);
ArrayList<SnapshotId> snapshots = new ArrayList<>();
try (XContentParser parser = XContentHelper.createParser(new BytesArray(data))) {
@@ -643,9 +641,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
byte[] testBytes = Strings.toUTF8Bytes(seed);
BlobContainer testContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
String blobName = "master.dat";
try (OutputStream outputStream = testContainer.createOutput(blobName + "-temp")) {
outputStream.write(testBytes);
}
testContainer.writeBlob(blobName + "-temp", new BytesArray(testBytes));
// Make sure that move is supported
testContainer.move(blobName + "-temp", blobName);
return seed;

View File: ChecksumBlobStoreFormat.java

@@ -36,9 +36,7 @@ import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.gateway.CorruptStateException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.*;
import java.util.Locale;
/**
@@ -94,7 +92,7 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreForm
* @throws IOException
*/
public T readBlob(BlobContainer blobContainer, String blobName) throws IOException {
try (InputStream inputStream = blobContainer.openInput(blobName)) {
try (InputStream inputStream = blobContainer.readBlob(blobName)) {
byte[] bytes = ByteStreams.toByteArray(inputStream);
final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
try (ByteArrayIndexInput indexInput = new ByteArrayIndexInput(resourceDesc, bytes)) {
@@ -163,9 +161,9 @@
*/
protected void writeBlob(T obj, BlobContainer blobContainer, String blobName) throws IOException {
BytesReference bytes = write(obj);
try (OutputStream outputStream = blobContainer.createOutput(blobName)) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
final String resourceDesc = "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")";
try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, outputStream, BUFFER_SIZE)) {
try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, byteArrayOutputStream, BUFFER_SIZE)) {
CodecUtil.writeHeader(indexOutput, codec, VERSION);
try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
@Override
@@ -177,6 +175,7 @@
}
CodecUtil.writeFooter(indexOutput);
}
blobContainer.writeBlob(blobName, new BytesArray(byteArrayOutputStream.toByteArray()));
}
}
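
Because writeBlob needs the complete content up front, the checksum format above now renders the whole blob into an in-memory buffer and hands it over in one call; presumably this is acceptable because these are small metadata blobs. A condensed sketch of that buffer-then-write pattern, with a stand-in for the codec header/body/footer serialization:

import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.bytes.BytesArray;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

class BufferThenWriteExample {
    static void write(BlobContainer container, String blobName, byte[] payload) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Stand-in for writing the codec header, the serialized object, and the footer.
        buffer.write(payload, 0, payload.length);
        container.writeBlob(blobName, new BytesArray(buffer.toByteArray()));
    }
}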

View File: LegacyBlobStoreFormat.java

@@ -52,7 +52,7 @@ public class LegacyBlobStoreFormat<T extends ToXContent> extends BlobStoreFormat
* @throws IOException
*/
public T readBlob(BlobContainer blobContainer, String blobName) throws IOException {
try (InputStream inputStream = blobContainer.openInput(blobName)) {
try (InputStream inputStream = blobContainer.readBlob(blobName)) {
return read(new BytesArray(ByteStreams.toByteArray(inputStream)));
}
}

View File: BlobStoreTests.java

@@ -22,6 +22,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -30,7 +31,6 @@ import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
@@ -47,10 +47,8 @@ public class BlobStoreTests extends ESTestCase {
final BlobStore store = newBlobStore();
final BlobContainer container = store.blobContainer(new BlobPath());
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
try (OutputStream stream = container.createOutput("foobar")) {
stream.write(data);
}
try (InputStream stream = container.openInput("foobar")) {
container.writeBlob("foobar", new BytesArray(data));
try (InputStream stream = container.readBlob("foobar")) {
BytesRefBuilder target = new BytesRefBuilder();
while (target.length() < data.length) {
byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())];
@@ -115,15 +113,13 @@
protected byte[] createRandomBlob(BlobContainer container, String name, int length) throws IOException {
byte[] data = randomBytes(length);
try (OutputStream stream = container.createOutput(name)) {
stream.write(data);
}
container.writeBlob(name, new BytesArray(data));
return data;
}
protected byte[] readBlobFully(BlobContainer container, String name, int length) throws IOException {
byte[] data = new byte[length];
try (InputStream inputStream = container.openInput(name)) {
try (InputStream inputStream = container.readBlob(name)) {
assertThat(inputStream.read(data), equalTo(length));
assertThat(inputStream.read(), equalTo(-1));
}

View File: BlobStoreFormatIT.java

@@ -27,6 +27,7 @@ import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.Streams;
@@ -42,7 +43,6 @@ import org.junit.Test;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.*;
@@ -122,9 +122,7 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase {
public void write(T obj, BlobContainer blobContainer, String blobName) throws IOException {
BytesReference bytes = write(obj);
try (OutputStream outputStream = blobContainer.createOutput(blobName)) {
bytes.writeTo(outputStream);
}
blobContainer.writeBlob(blobName, bytes);
}
private BytesReference write(T obj) throws IOException {
@@ -272,16 +270,14 @@
protected void randomCorruption(BlobContainer blobContainer, String blobName) throws IOException {
byte[] buffer = new byte[(int) blobContainer.listBlobsByPrefix(blobName).get(blobName).length()];
long originalChecksum = checksum(buffer);
try (InputStream inputStream = blobContainer.openInput(blobName)) {
try (InputStream inputStream = blobContainer.readBlob(blobName)) {
Streams.readFully(inputStream, buffer);
}
do {
int location = randomIntBetween(0, buffer.length - 1);
buffer[location] = (byte) (buffer[location] ^ 42);
} while (originalChecksum == checksum(buffer));
try (OutputStream outputStream = blobContainer.createOutput(blobName)) {
Streams.copy(buffer, outputStream);
}
blobContainer.writeBlob(blobName, new BytesArray(buffer));
}
private long checksum(byte[] buffer) throws IOException {

View File: BlobContainerWrapper.java

@@ -18,14 +18,13 @@
*/
package org.elasticsearch.snapshots.mockstore;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.bytes.BytesReference;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Map;
@@ -50,13 +49,18 @@ public class BlobContainerWrapper implements BlobContainer {
}
@Override
public InputStream openInput(String name) throws IOException {
return delegate.openInput(name);
public InputStream readBlob(String name) throws IOException {
return delegate.readBlob(name);
}
@Override
public OutputStream createOutput(String blobName) throws IOException {
return delegate.createOutput(blobName);
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
delegate.writeBlob(blobName, inputStream, blobSize);
}
@Override
public void writeBlob(String blobName, BytesReference bytes) throws IOException {
delegate.writeBlob(blobName, bytes);
}
@Override

View File: MockRepository.java

@@ -27,6 +27,7 @@ import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
@@ -43,7 +44,6 @@ import org.elasticsearch.repositories.fs.FsRepository;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.file.Path;
import java.security.MessageDigest;
@@ -317,9 +317,9 @@
}
@Override
public InputStream openInput(String name) throws IOException {
public InputStream readBlob(String name) throws IOException {
maybeIOExceptionOrBlock(name);
return super.openInput(name);
return super.readBlob(name);
}
@Override
@@ -353,9 +353,15 @@
}
@Override
public OutputStream createOutput(String blobName) throws IOException {
public void writeBlob(String blobName, BytesReference bytes) throws IOException {
maybeIOExceptionOrBlock(blobName);
return super.createOutput(blobName);
super.writeBlob(blobName, bytes);
}
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
maybeIOExceptionOrBlock(blobName);
super.writeBlob(blobName, inputStream, blobSize);
}
}
}

View File: AzureBlobContainer.java

@@ -23,7 +23,7 @@ import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.AbstractLegacyBlobContainer;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.repositories.RepositoryException;
@@ -39,7 +39,7 @@ import java.util.Map;
/**
*
*/
public class AzureBlobContainer extends AbstractBlobContainer {
public class AzureBlobContainer extends AbstractLegacyBlobContainer {
protected final ESLogger logger = Loggers.getLogger(AzureBlobContainer.class);
protected final AzureBlobStore blobStore;

View File: S3BlobContainer.java

@@ -25,7 +25,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.AbstractLegacyBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
@@ -38,7 +38,7 @@ import java.util.Map;
/**
*
*/
public class S3BlobContainer extends AbstractBlobContainer {
public class S3BlobContainer extends AbstractLegacyBlobContainer {
protected final S3BlobStore blobStore;