[STORE] Simplify reading / writing from and to BlobContainer

BlobContainer used to provide async APIs which are not used
internally. The implementation of these APIs are also not async
by nature and neither is any of the pluggable BlobContainers. This
commit simplifies the API to a simple input / output stream and
reduces the hierarchy of BlobContainer dramatically.
NOTE: This is a breaking change!

Closes #7551
This commit is contained in:
Simon Willnauer 2014-09-03 10:15:16 +02:00
parent 6a0a7afea6
commit 7f32e8c707
25 changed files with 562 additions and 812 deletions

View File

@ -41,11 +41,6 @@ pools, but the important ones include:
keep-alive `5m`,
size `(# of available processors)/2`.
`snapshot_data`::
For snapshot/restore operations on data files, defaults to `scaling`
with a `5m` keep-alive,
size `5`.
`warmer`::
For segment warm-up operations, defaults to `scaling`
with a `5m` keep-alive.

View File

@ -22,6 +22,8 @@ package org.elasticsearch.common.blobstore;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
*
@ -35,22 +37,19 @@ public interface BlobContainer {
boolean accept(String blobName);
}
interface ReadBlobListener {
void onPartial(byte[] data, int offset, int size) throws IOException;
void onCompleted();
void onFailure(Throwable t);
}
BlobPath path();
boolean blobExists(String blobName);
void readBlob(String blobName, ReadBlobListener listener);
/**
* Creates a new {@link InputStream} for the given blob name
*/
InputStream openInput(String blobName) throws IOException;
byte[] readBlobFully(String blobName) throws IOException;
/**
* Creates a new OutputStream for the given blob name
*/
OutputStream createOutput(String blobName) throws IOException;
boolean deleteBlob(String blobName) throws IOException;

View File

@ -23,7 +23,7 @@ package org.elasticsearch.common.blobstore;
*/
public interface BlobStore {
ImmutableBlobContainer immutableBlobContainer(BlobPath path);
BlobContainer blobContainer(BlobPath path);
void delete(BlobPath path);

View File

@ -1,39 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.blobstore;
import java.io.IOException;
import java.io.InputStream;
/**
*
*/
public interface ImmutableBlobContainer extends BlobContainer {
interface WriterListener {
void onCompleted();
void onFailure(Throwable t);
}
void writeBlob(String blobName, InputStream is, long sizeInBytes, WriterListener listener);
void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException;
}

View File

@ -21,27 +21,26 @@ package org.elasticsearch.common.blobstore.fs;
import com.google.common.collect.ImmutableMap;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.FileSystemUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.*;
/**
*
*/
public abstract class AbstractFsBlobContainer extends AbstractBlobContainer {
public class FsBlobContainer extends AbstractBlobContainer {
protected final FsBlobStore blobStore;
protected final File path;
public AbstractFsBlobContainer(FsBlobStore blobStore, BlobPath blobPath, File path) {
public FsBlobContainer(FsBlobStore blobStore, BlobPath blobPath, File path) {
super(blobPath);
this.blobStore = blobStore;
this.path = path;
@ -76,30 +75,20 @@ public abstract class AbstractFsBlobContainer extends AbstractBlobContainer {
}
@Override
public void readBlob(final String blobName, final ReadBlobListener listener) {
blobStore.executor().execute(new Runnable() {
public InputStream openInput(String name) throws IOException {
return new BufferedInputStream(new FileInputStream(new File(path, name)), blobStore.bufferSizeInBytes());
}
@Override
public OutputStream createOutput(String blobName) throws IOException {
final File file = new File(path, blobName);
return new BufferedOutputStream(new FilterOutputStream(new FileOutputStream(file)) {
@Override
public void run() {
byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
FileInputStream is = null;
try {
is = new FileInputStream(new File(path, blobName));
int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) {
listener.onPartial(buffer, 0, bytesRead);
}
} catch (Throwable t) {
IOUtils.closeWhileHandlingException(is);
listener.onFailure(t);
return;
}
try {
IOUtils.closeWhileHandlingException(is);
listener.onCompleted();
} catch (Throwable t) {
listener.onFailure(t);
}
public void close() throws IOException {
super.close();
IOUtils.fsync(file, false);
IOUtils.fsync(path, true);
}
});
}, blobStore.bufferSizeInBytes());
}
}

View File

@ -19,35 +19,30 @@
package org.elasticsearch.common.blobstore.fs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.File;
import java.util.concurrent.Executor;
/**
*
*/
public class FsBlobStore extends AbstractComponent implements BlobStore {
private final ThreadPool threadPool;
private final File path;
private final int bufferSizeInBytes;
public FsBlobStore(Settings settings, ThreadPool threadPool, File path) {
public FsBlobStore(Settings settings, File path) {
super(settings);
this.path = path;
this.threadPool = threadPool;
if (!path.exists()) {
boolean b = FileSystemUtils.mkdirs(path);
if (!b) {
@ -73,13 +68,9 @@ public class FsBlobStore extends AbstractComponent implements BlobStore {
return this.bufferSizeInBytes;
}
public Executor executor() {
return threadPool.executor(ThreadPool.Names.SNAPSHOT_DATA);
}
@Override
public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
return new FsImmutableBlobContainer(this, path, buildAndCreate(path));
public BlobContainer blobContainer(BlobPath path) {
return new FsBlobContainer(this, path, buildAndCreate(path));
}
@Override

View File

@ -1,88 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.blobstore.fs;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.blobstore.support.BlobStores;
import org.elasticsearch.common.io.FileSystemUtils;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
/**
*
*/
public class FsImmutableBlobContainer extends AbstractFsBlobContainer implements ImmutableBlobContainer {
public FsImmutableBlobContainer(FsBlobStore blobStore, BlobPath blobPath, File path) {
super(blobStore, blobPath, path);
}
@Override
public void writeBlob(final String blobName, final InputStream stream, final long sizeInBytes, final WriterListener listener) {
blobStore.executor().execute(new Runnable() {
@Override
public void run() {
final File file = new File(path, blobName);
boolean success = false;
try {
try (final RandomAccessFile raf = new RandomAccessFile(file, "rw");
final InputStream is = stream) {
// clean the file if it exists
raf.setLength(0);
long bytesWritten = 0;
final byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) {
raf.write(buffer, 0, bytesRead);
bytesWritten += bytesRead;
}
if (bytesWritten != sizeInBytes) {
throw new ElasticsearchIllegalStateException("[" + blobName + "]: wrote [" + bytesWritten + "], expected to write [" + sizeInBytes + "]");
}
// fsync the FD we are done with writing
raf.getFD().sync();
// try to fsync the directory to make sure all metadata is written to
// the storage device - NOTE: if it's a dir it will not throw any exception
FileSystemUtils.syncFile(path, true);
}
success = true;
} catch (Throwable e) {
listener.onFailure(e);
// just on the safe size, try and delete it on failure
FileSystemUtils.tryDeleteFile(file);
} finally {
if (success) {
listener.onCompleted();
}
}
}
});
}
@Override
public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException {
BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes);
}
}

View File

@ -24,11 +24,7 @@ import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.io.*;
/**
*
@ -46,46 +42,6 @@ public abstract class AbstractBlobContainer implements BlobContainer {
return this.path;
}
@Override
public byte[] readBlobFully(String blobName) throws IOException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> failure = new AtomicReference<>();
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
readBlob(blobName, new ReadBlobListener() {
@Override
public void onPartial(byte[] data, int offset, int size) {
bos.write(data, offset, size);
}
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
failure.set(t);
latch.countDown();
}
});
try {
latch.await();
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted while waiting to read [" + blobName + "]");
}
if (failure.get() != null) {
if (failure.get() instanceof IOException) {
throw (IOException) failure.get();
} else {
throw new IOException("Failed to get [" + blobName + "]", failure.get());
}
}
return bos.toByteArray();
}
@Override
public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
ImmutableMap<String, BlobMetaData> allBlobs = listBlobs();
@ -117,4 +73,5 @@ public abstract class AbstractBlobContainer implements BlobContainer {
}
}
}
}

View File

@ -1,64 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.blobstore.support;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
/**
*
*/
public class BlobStores {
public static void syncWriteBlob(ImmutableBlobContainer blobContainer, String blobName, InputStream is, long sizeInBytes) throws IOException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> failure = new AtomicReference<>();
blobContainer.writeBlob(blobName, is, sizeInBytes, new ImmutableBlobContainer.WriterListener() {
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
failure.set(t);
latch.countDown();
}
});
try {
latch.await();
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted while waiting to write [" + blobName + "]");
}
if (failure.get() != null) {
if (failure.get() instanceof IOException) {
throw (IOException) failure.get();
} else {
throw new IOException("Failed to get [" + blobName + "]", failure.get());
}
}
}
}

View File

@ -20,32 +20,33 @@
package org.elasticsearch.common.blobstore.url;
import com.google.common.collect.ImmutableMap;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
/**
* URL blob implementation of {@link org.elasticsearch.common.blobstore.BlobContainer}
*/
public abstract class AbstractURLBlobContainer extends AbstractBlobContainer {
public class URLBlobContainer extends AbstractBlobContainer {
protected final URLBlobStore blobStore;
protected final URL path;
/**
* Constructs new AbstractURLBlobContainer
* Constructs new URLBlobContainer
*
* @param blobStore blob store
* @param blobPath blob path for this container
* @param path URL for this container
*/
public AbstractURLBlobContainer(URLBlobStore blobStore, BlobPath blobPath, URL path) {
public URLBlobContainer(URLBlobStore blobStore, BlobPath blobPath, URL path) {
super(blobPath);
this.blobStore = blobStore;
this.path = path;
@ -61,7 +62,7 @@ public abstract class AbstractURLBlobContainer extends AbstractBlobContainer {
}
/**
* This operation is not supported by AbstractURLBlobContainer
* This operation is not supported by URLBlobContainer
*/
@Override
public ImmutableMap<String, BlobMetaData> listBlobs() throws IOException {
@ -69,7 +70,7 @@ public abstract class AbstractURLBlobContainer extends AbstractBlobContainer {
}
/**
* This operation is not supported by AbstractURLBlobContainer
* This operation is not supported by URLBlobContainer
*/
@Override
public boolean deleteBlob(String blobName) throws IOException {
@ -77,41 +78,20 @@ public abstract class AbstractURLBlobContainer extends AbstractBlobContainer {
}
/**
* This operation is not supported by AbstractURLBlobContainer
* This operation is not supported by URLBlobContainer
*/
@Override
public boolean blobExists(String blobName) {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}
/**
* {@inheritDoc}
*/
@Override
public void readBlob(final String blobName, final ReadBlobListener listener) {
blobStore.executor().execute(new Runnable() {
@Override
public void run() {
byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
InputStream is = null;
try {
is = new URL(path, blobName).openStream();
int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) {
listener.onPartial(buffer, 0, bytesRead);
}
} catch (Throwable t) {
IOUtils.closeWhileHandlingException(is);
listener.onFailure(t);
return;
}
try {
IOUtils.closeWhileHandlingException(is);
listener.onCompleted();
} catch (Throwable t) {
listener.onFailure(t);
}
}
});
public InputStream openInput(String name) throws IOException {
return new BufferedInputStream(new URL(path, name).openStream(), blobStore.bufferSizeInBytes());
}
@Override
public OutputStream createOutput(String blobName) throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}
}

View File

@ -19,10 +19,10 @@
package org.elasticsearch.common.blobstore.url;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@ -38,8 +38,6 @@ import java.util.concurrent.Executor;
*/
public class URLBlobStore extends AbstractComponent implements BlobStore {
private final ThreadPool threadPool;
private final URL path;
private final int bufferSizeInBytes;
@ -54,14 +52,12 @@ public class URLBlobStore extends AbstractComponent implements BlobStore {
* </dl>
*
* @param settings settings
* @param threadPool thread pool for read operations
* @param path base URL
*/
public URLBlobStore(Settings settings, ThreadPool threadPool, URL path) {
public URLBlobStore(Settings settings, URL path) {
super(settings);
this.path = path;
this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
this.threadPool = threadPool;
}
/**
@ -90,22 +86,13 @@ public class URLBlobStore extends AbstractComponent implements BlobStore {
return this.bufferSizeInBytes;
}
/**
* Returns executor used for read operations
*
* @return executor
*/
public Executor executor() {
return threadPool.executor(ThreadPool.Names.SNAPSHOT_DATA);
}
/**
* {@inheritDoc}
*/
@Override
public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
public BlobContainer blobContainer(BlobPath path) {
try {
return new URLImmutableBlobContainer(this, path, buildPath(path));
return new URLBlobContainer(this, path, buildPath(path));
} catch (MalformedURLException ex) {
throw new BlobStoreException("malformed URL " + path, ex);
}

View File

@ -1,60 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.blobstore.url;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
/**
* Read-only URL-based implementation of {@link ImmutableBlobContainer}
*/
public class URLImmutableBlobContainer extends AbstractURLBlobContainer implements ImmutableBlobContainer {
/**
* Constructs a new URLImmutableBlobContainer
*
* @param blobStore blob store
* @param blobPath blob path to this container
* @param path URL of this container
*/
public URLImmutableBlobContainer(URLBlobStore blobStore, BlobPath blobPath, URL path) {
super(blobStore, blobPath, path);
}
/**
* This operation is not supported by URL Blob Container
*/
@Override
public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) {
throw new UnsupportedOperationException("URL repository is read only");
}
/**
* This operation is not supported by URL Blob Container
*/
@Override
public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException {
throw new UnsupportedOperationException("URL repository is read only");
}
}

View File

@ -22,16 +22,15 @@ package org.elasticsearch.index.snapshots.blobstore;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.*;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.common.blobstore.*;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
@ -48,14 +47,12 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.RepositoryName;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.collect.Lists.newArrayList;
@ -64,6 +61,7 @@ import static com.google.common.collect.Lists.newArrayList;
*/
public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository {
private static final int BUFFER_SIZE = 4096;
private BlobStore blobStore;
private BlobPath basePath;
@ -144,8 +142,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
*/
@Override
public void restore(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
RestoreContext snapshotContext = new RestoreContext(snapshotId, shardId, snapshotShardId, recoveryState);
final RestoreContext snapshotContext = new RestoreContext(snapshotId, shardId, snapshotShardId, recoveryState);
try {
recoveryState.getIndex().startTime(System.currentTimeMillis());
snapshotContext.restore();
@ -205,24 +202,25 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* Serializes snapshot to JSON
*
* @param snapshot snapshot
* @return JSON representation of the snapshot
* @throws IOException
* @param stream the stream to output the snapshot JSON represetation to
* @throws IOException if an IOException occurs
*/
public static byte[] writeSnapshot(BlobStoreIndexShardSnapshot snapshot) throws IOException {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON).prettyPrint();
public static void writeSnapshot(BlobStoreIndexShardSnapshot snapshot, OutputStream stream) throws IOException {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream).prettyPrint();
BlobStoreIndexShardSnapshot.toXContent(snapshot, builder, ToXContent.EMPTY_PARAMS);
return builder.bytes().toBytes();
builder.flush();
builder.close();
}
/**
* Parses JSON representation of a snapshot
*
* @param data JSON
* @param stream JSON
* @return snapshot
* @throws IOException
*/
public static BlobStoreIndexShardSnapshot readSnapshot(byte[] data) throws IOException {
try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(data)) {
* @throws IOException if an IOException occurs
* */
public static BlobStoreIndexShardSnapshot readSnapshot(InputStream stream) throws IOException {
try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(stream)) {
parser.nextToken();
return BlobStoreIndexShardSnapshot.fromXContent(parser);
}
@ -237,7 +235,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
protected final ShardId shardId;
protected final ImmutableBlobContainer blobContainer;
protected final BlobContainer blobContainer;
public Context(SnapshotId snapshotId, ShardId shardId) {
this(snapshotId, shardId, shardId);
@ -246,7 +244,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
public Context(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId) {
this.snapshotId = snapshotId;
this.shardId = shardId;
blobContainer = blobStore.immutableBlobContainer(basePath.add("indices").add(snapshotShardId.getIndex()).add(Integer.toString(snapshotShardId.getId())));
blobContainer = blobStore.blobContainer(basePath.add("indices").add(snapshotShardId.getIndex()).add(Integer.toString(snapshotShardId.getId())));
}
/**
@ -286,8 +284,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
*/
public BlobStoreIndexShardSnapshot loadSnapshot() {
BlobStoreIndexShardSnapshot snapshot;
try {
snapshot = readSnapshot(blobContainer.readBlobFully(snapshotBlobName(snapshotId)));
try (InputStream stream = blobContainer.openInput(snapshotBlobName(snapshotId))) {
snapshot = readSnapshot(stream);
} catch (IOException ex) {
throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex);
}
@ -362,8 +360,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
List<BlobStoreIndexShardSnapshot> snapshots = Lists.newArrayList();
for (String name : blobs.keySet()) {
if (name.startsWith(SNAPSHOT_PREFIX)) {
try {
snapshots.add(readSnapshot(blobContainer.readBlobFully(name)));
try (InputStream stream = blobContainer.openInput(name)) {
snapshots.add(readSnapshot(stream));
} catch (IOException e) {
logger.warn("failed to read commit point [{}]", e, name);
}
@ -469,28 +467,15 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.STARTED);
final CountDownLatch indexLatch = new CountDownLatch(filesToSnapshot.size());
for (FileInfo snapshotFileInfo : filesToSnapshot) {
try {
snapshotFile(snapshotFileInfo, indexLatch, failures);
snapshotFile(snapshotFileInfo);
} catch (IOException e) {
failures.add(e);
throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e);
}
}
snapshotStatus.indexVersion(snapshotIndexCommit.getGeneration());
try {
indexLatch.await();
} catch (InterruptedException e) {
failures.add(e);
Thread.currentThread().interrupt();
}
if (!failures.isEmpty()) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", failures.get(0));
}
// now create and write the commit point
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE);
@ -501,9 +486,10 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize);
//TODO: The time stored in snapshot doesn't include cleanup time.
try {
byte[] snapshotData = writeSnapshot(snapshot);
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
blobContainer.writeBlob(commitPointName, new BytesStreamInput(snapshotData, false), snapshotData.length);
try (OutputStream output = blobContainer.createOutput(commitPointName)) {
writeSnapshot(snapshot, output);
}
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
}
@ -529,98 +515,32 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* added to the {@code failures} list
*
* @param fileInfo file to be snapshotted
* @param latch latch that should be counted down once file is snapshoted
* @param failures thread-safe list of failures
* @throws IOException
*/
private void snapshotFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final CountDownLatch latch, final List<Throwable> failures) throws IOException {
private void snapshotFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) throws IOException {
final String file = fileInfo.physicalName();
IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata());
writeBlob(indexInput, fileInfo, 0, latch, failures);
}
private class BlobPartWriter implements ImmutableBlobContainer.WriterListener {
private final int part;
private final FileInfo fileInfo;
private final List<Throwable> failures;
private final CountDownLatch latch;
private final IndexInput indexInput;
private final InputStream inputStream;
private final InputStreamIndexInput inputStreamIndexInput;
private BlobPartWriter(IndexInput indexInput, FileInfo fileInfo, int part, CountDownLatch latch, List<Throwable> failures) throws IOException {
this.indexInput = indexInput;
this.part = part;
this.fileInfo = fileInfo;
this.failures = failures;
this.latch = latch;
inputStreamIndexInput = new InputStreamIndexInput(indexInput, fileInfo.partBytes());
InputStream inputStream = inputStreamIndexInput;
if (snapshotRateLimiter != null) {
inputStream = new RateLimitingInputStream(inputStream, snapshotRateLimiter, snapshotThrottleListener);
}
inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName());
this.inputStream = inputStream;
}
@Override
public void onCompleted() {
int nextPart = part + 1;
if (nextPart < fileInfo.numberOfParts()) {
try {
// We have more parts to go
writeBlob(indexInput, fileInfo, nextPart, latch, failures);
} catch (Throwable t) {
onFailure(t);
try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) {
for (int i = 0; i < fileInfo.numberOfParts(); i++) {
final InputStreamIndexInput inputStreamIndexInput = new InputStreamIndexInput(indexInput, fileInfo.partBytes());
InputStream inputStream = snapshotRateLimiter == null ? inputStreamIndexInput : new RateLimitingInputStream(inputStreamIndexInput, snapshotRateLimiter, snapshotThrottleListener);
inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName());
try (OutputStream output = blobContainer.createOutput(fileInfo.partName(i))) {
int len;
final byte[] buffer = new byte[BUFFER_SIZE];
while ((len = inputStream.read(buffer)) > 0) {
output.write(buffer, 0, len);
}
}
} else {
// Last part - verify checksum
try {
Store.verify(indexInput);
indexInput.close();
snapshotStatus.addProcessedFile(fileInfo.length());
} catch (Throwable t) {
onFailure(t);
return;
}
latch.countDown();
}
}
@Override
public void onFailure(Throwable t) {
cleanupFailedSnapshot(t, indexInput, latch, failures);
}
public void writeBlobPart() throws IOException {
blobContainer.writeBlob(fileInfo.partName(part), inputStream, inputStreamIndexInput.actualSizeToRead(), this);
}
}
private void writeBlob(IndexInput indexInput, FileInfo fileInfo, int part, CountDownLatch latch, List<Throwable> failures) {
try {
new BlobPartWriter(indexInput, fileInfo, part, latch, failures).writeBlobPart();
Store.verify(indexInput);
snapshotStatus.addProcessedFile(fileInfo.length());
} catch (Throwable t) {
cleanupFailedSnapshot(t, indexInput, latch, failures);
failStoreIfCorrupted(t);
snapshotStatus.addProcessedFile(0);
throw t;
}
}
private void cleanupFailedSnapshot(Throwable t, IndexInput indexInput, CountDownLatch latch, List<Throwable> failures) {
IOUtils.closeWhileHandlingException(indexInput);
failStoreIfCorrupted(t);
snapshotStatus.addProcessedFile(0);
failures.add(t);
latch.countDown();
}
private void failStoreIfCorrupted(Throwable t) {
if (t instanceof CorruptIndexException) {
try {
@ -693,72 +613,42 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* The new logic for StoreFileMetaData reads the entire <tt>.si</tt> and <tt>segments.n</tt> files to strengthen the
* comparison of the files on a per-segment / per-commit level.
*/
private static final void maybeRecalculateMetadataHash(final ImmutableBlobContainer blobContainer, final FileInfo fileInfo, Store.MetadataSnapshot snapshot) throws Throwable {
private static final void maybeRecalculateMetadataHash(final BlobContainer blobContainer, final FileInfo fileInfo, Store.MetadataSnapshot snapshot) throws Throwable {
final StoreFileMetaData metadata;
if (fileInfo != null && (metadata = snapshot.get(fileInfo.physicalName())) != null) {
if (metadata.hash().length > 0 && fileInfo.metadata().hash().length == 0) {
// we have a hash - check if our repo has a hash too otherwise we have
// to calculate it.
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final CountDownLatch latch = new CountDownLatch(1);
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<>();
// we might have multiple parts even though the file is small... make sure we read all of it.
// TODO this API should really support a stream!
blobContainer.readBlob(fileInfo.partName(0), new BlobContainer.ReadBlobListener() {
final AtomicInteger partIndex = new AtomicInteger();
@Override
public synchronized void onPartial(byte[] data, int offset, int size) throws IOException {
out.write(data, offset, size);
}
@Override
public synchronized void onCompleted() {
boolean countDown = true;
try {
final int part = partIndex.incrementAndGet();
if (part < fileInfo.numberOfParts()) {
final String partName = fileInfo.partName(part);
// continue with the new part
blobContainer.readBlob(partName, this);
countDown = false;
return;
}
} finally {
if (countDown) {
latch.countDown();
}
}
}
@Override
public void onFailure(Throwable t) {
try {
failures.add(t);
} finally {
latch.countDown();
}
}
});
try {
latch.await();
} catch (InterruptedException e) {
Thread.interrupted();
try (final InputStream stream = new PartSliceStream(blobContainer, fileInfo)) {
final byte[] bytes = ByteStreams.toByteArray(stream);
assert bytes != null;
assert bytes.length == fileInfo.length() : bytes.length + " != " + fileInfo.length();
final BytesRef spare = new BytesRef(bytes);
Store.MetadataSnapshot.hashFile(fileInfo.metadata().hash(), spare);
}
if (!failures.isEmpty()) {
ExceptionsHelper.rethrowAndSuppress(failures);
}
final byte[] bytes = out.toByteArray();
assert bytes != null;
assert bytes.length == fileInfo.length() : bytes.length + " != " + fileInfo.length();
final BytesRef spare = new BytesRef(bytes);
Store.MetadataSnapshot.hashFile(fileInfo.metadata().hash(), spare);
}
}
}
private static final class PartSliceStream extends SlicedInputStream {
private final BlobContainer container;
private final FileInfo info;
public PartSliceStream(BlobContainer container, FileInfo info) {
super(info.numberOfParts());
this.info = info;
this.container = container;
}
@Override
protected InputStream openSlice(long slice) throws IOException {
return container.openInput(info.partName(slice));
}
}
/**
* Context for restore operations
*/
@ -864,25 +754,14 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
if (logger.isTraceEnabled()) {
logger.trace("[{}] [{}] recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", shardId, snapshotId, numberOfFiles, new ByteSizeValue(totalSize), numberOfReusedFiles, new ByteSizeValue(reusedTotalSize));
}
final CountDownLatch latch = new CountDownLatch(filesToRecover.size());
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<>();
for (final FileInfo fileToRecover : filesToRecover) {
logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
restoreFile(fileToRecover, latch, failures);
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
for (final FileInfo fileToRecover : filesToRecover) {
logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
restoreFile(fileToRecover);
}
} catch (IOException ex) {
throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex);
}
if (!failures.isEmpty()) {
throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", failures.get(0));
}
// read the snapshot data persisted
long version = -1;
try {
@ -919,107 +798,52 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* added to the {@code failures} list
*
* @param fileInfo file to be restored
* @param latch latch that should be counted down once file is snapshoted
* @param failures thread-safe list of failures
*/
private void restoreFile(final FileInfo fileInfo, final CountDownLatch latch, final List<Throwable> failures) {
final IndexOutput indexOutput;
try {
// we create an output with no checksum, this is because the pure binary data of the file is not
// the checksum (because of seek). We will create the checksum file once copying is done
indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), IOContext.DEFAULT, fileInfo.metadata());
} catch (IOException e) {
try {
failures.add(e);
} finally {
latch.countDown();
}
return;
}
String firstFileToRecover = fileInfo.partName(0);
final AtomicInteger partIndex = new AtomicInteger();
private void restoreFile(final FileInfo fileInfo) throws IOException {
boolean success = false;
try {
blobContainer.readBlob(firstFileToRecover, new BlobContainer.ReadBlobListener() {
@Override
public synchronized void onPartial(byte[] data, int offset, int size) throws IOException {
recoveryState.getIndex().addRecoveredByteCount(size);
RecoveryState.File file = recoveryState.getIndex().file(fileInfo.name());
RecoveryState.File file = recoveryState.getIndex().file(fileInfo.name());
try (InputStream stream = new PartSliceStream(blobContainer, fileInfo)) {
try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), IOContext.DEFAULT, fileInfo.metadata())) {
final byte[] buffer = new byte[BUFFER_SIZE];
int length;
while((length=stream.read(buffer))>0){
indexOutput.writeBytes(buffer,0,length);
if (file != null) {
file.updateRecovered(size);
file.updateRecovered(length);
}
indexOutput.writeBytes(data, offset, size);
if (restoreRateLimiter != null) {
rateLimiterListener.onRestorePause(restoreRateLimiter.pause(size));
rateLimiterListener.onRestorePause(restoreRateLimiter.pause(length));
}
}
Store.verify(indexOutput);
indexOutput.close();
// write the checksum
if (fileInfo.metadata().hasLegacyChecksum()) {
Store.LegacyChecksums legacyChecksums = new Store.LegacyChecksums();
legacyChecksums.add(fileInfo.metadata());
legacyChecksums.write(store);
@Override
public synchronized void onCompleted() {
int part = partIndex.incrementAndGet();
if (part < fileInfo.numberOfParts()) {
String partName = fileInfo.partName(part);
// continue with the new part
blobContainer.readBlob(partName, this);
return;
} else {
// we are done...
try {
Store.verify(indexOutput);
indexOutput.close();
// write the checksum
if (fileInfo.metadata().hasLegacyChecksum()) {
Store.LegacyChecksums legacyChecksums = new Store.LegacyChecksums();
legacyChecksums.add(fileInfo.metadata());
legacyChecksums.write(store);
}
store.directory().sync(Collections.singleton(fileInfo.physicalName()));
recoveryState.getIndex().addRecoveredFileCount(1);
} catch (IOException e) {
onFailure(e);
return;
}
}
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
try {
failures.add(t);
IOUtils.closeWhileHandlingException(indexOutput);
if (t instanceof CorruptIndexException) {
try {
store.markStoreCorrupted((CorruptIndexException) t);
} catch (IOException e) {
logger.warn("store cannot be marked as corrupted", e);
}
}
store.deleteQuiet(fileInfo.physicalName());
} finally {
latch.countDown();
}
}
});
success = true;
} finally {
if (!success) {
store.directory().sync(Collections.singleton(fileInfo.physicalName()));
recoveryState.getIndex().addRecoveredFileCount(1);
success = true;
} catch (CorruptIndexException ex) {
try {
IOUtils.closeWhileHandlingException(indexOutput);
store.deleteQuiet(fileInfo.physicalName());
} finally {
latch.countDown();
store.markStoreCorrupted(ex);
} catch (IOException e) {
logger.warn("store cannot be marked as corrupted", e);
}
throw ex;
} finally {
if (success == false) {
store.deleteQuiet(fileInfo.physicalName());
}
}
}
}
}
public interface RateLimiterListener {
void onRestorePause(long nanos);

View File

@ -0,0 +1,113 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.snapshots.blobstore;
import org.apache.lucene.util.IOUtils;
import java.io.IOException;
import java.io.InputStream;
/**
* A {@link SlicedInputStream} is a logical
* concatenation one or more input streams. In contrast to the JDKs
* {@link java.io.SequenceInputStream} this stream doesn't require the instantiation
* of all logical sub-streams ahead of time. Instead, {@link #openSlice(long)} is called
* if a new slice is required. Each slice is closed once it's been fully consumed or if
* close is called before.
*/
public abstract class SlicedInputStream extends InputStream {
private long slice = 0;
private InputStream currentStream;
private final long numSlices;
private boolean initialized = false;
/**
* Creates a new SlicedInputStream
* @param numSlices the number of slices to consume
*/
protected SlicedInputStream(final long numSlices) {
this.numSlices = numSlices;
}
private InputStream nextStream() throws IOException {
assert initialized == false || currentStream != null;
initialized = true;
IOUtils.close(currentStream);
if (slice < numSlices) {
currentStream = openSlice(slice++);
} else {
currentStream = null;
}
return currentStream;
}
/**
* Called for each logical slice given a zero based slice ordinal.
*/
protected abstract InputStream openSlice(long slice) throws IOException;
private InputStream currentStream() throws IOException {
if (currentStream == null) {
return initialized ? null : nextStream();
}
return currentStream;
}
@Override
public final int read() throws IOException {
InputStream stream = currentStream();
if (stream == null) {
return -1;
}
final int read = stream.read();
if (read == -1) {
nextStream();
return read();
}
return read;
}
@Override
public final int read(byte[] buffer, int offset, int length) throws IOException {
final InputStream stream = currentStream();
if (stream == null) {
return -1;
}
final int read = stream.read(buffer, offset, length);
if (read <= 0) {
nextStream();
return read(buffer, offset, length);
}
return read;
}
@Override
public final void close() throws IOException {
IOUtils.close(currentStream);
initialized = true;
currentStream = null;
}
@Override
public final int available() throws IOException {
InputStream stream = currentStream();
return stream == null ? 0 : stream.available();
}
}

View File

@ -22,16 +22,17 @@ package org.elasticsearch.repositories.blobstore;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory;
@ -52,6 +53,8 @@ import org.elasticsearch.snapshots.*;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;
@ -103,7 +106,7 @@ import static com.google.common.collect.Lists.newArrayList;
*/
public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Repository> implements Repository, RateLimiterListener {
private ImmutableBlobContainer snapshotsBlobContainer;
private BlobContainer snapshotsBlobContainer;
protected final String repositoryName;
@ -150,7 +153,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
*/
@Override
protected void doStart() throws ElasticsearchException {
this.snapshotsBlobContainer = blobStore().immutableBlobContainer(basePath());
this.snapshotsBlobContainer = blobStore().blobContainer(basePath());
indexShardRepository.initialize(blobStore(), basePath(), chunkSize(), snapshotRateLimiter, restoreRateLimiter, this);
}
@ -225,16 +228,20 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
throw new InvalidSnapshotNameException(snapshotId, "snapshot with such name already exists");
}
BytesReference bRef = bStream.bytes();
snapshotsBlobContainer.writeBlob(snapshotBlobName, bRef.streamInput(), bRef.length());
try (OutputStream output = snapshotsBlobContainer.createOutput(snapshotBlobName)) {
bRef.writeTo(output);
}
// Write Global MetaData
// TODO: Check if metadata needs to be written
bStream = writeGlobalMetaData(metaData);
bRef = bStream.bytes();
snapshotsBlobContainer.writeBlob(metaDataBlobName(snapshotId), bRef.streamInput(), bRef.length());
try (OutputStream output = snapshotsBlobContainer.createOutput(metaDataBlobName(snapshotId))) {
bRef.writeTo(output);
}
for (String index : indices) {
IndexMetaData indexMetaData = metaData.index(index);
BlobPath indexPath = basePath().add("indices").add(index);
ImmutableBlobContainer indexMetaDataBlobContainer = blobStore().immutableBlobContainer(indexPath);
BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
bStream = new BytesStreamOutput();
StreamOutput stream = bStream;
if (isCompress()) {
@ -246,7 +253,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
builder.endObject();
builder.close();
bRef = bStream.bytes();
indexMetaDataBlobContainer.writeBlob(snapshotBlobName(snapshotId), bRef.streamInput(), bRef.length());
try (OutputStream output = indexMetaDataBlobContainer.createOutput(snapshotBlobName(snapshotId))) {
bRef.writeTo(output);
}
}
} catch (IOException ex) {
throw new SnapshotCreationException(snapshotId, ex);
@ -280,7 +289,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
// Now delete all indices
for (String index : snapshot.indices()) {
BlobPath indexPath = basePath().add("indices").add(index);
ImmutableBlobContainer indexMetaDataBlobContainer = blobStore().immutableBlobContainer(indexPath);
BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
try {
indexMetaDataBlobContainer.deleteBlob(blobName);
} catch (IOException ex) {
@ -327,7 +336,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
snapshot = updatedSnapshot.build();
BytesStreamOutput bStream = writeSnapshot(snapshot);
BytesReference bRef = bStream.bytes();
snapshotsBlobContainer.writeBlob(blobName, bRef.streamInput(), bRef.length());
try (OutputStream output = snapshotsBlobContainer.createOutput(blobName)) {
bRef.writeTo(output);
}
ImmutableList<SnapshotId> snapshotIds = snapshots();
if (!snapshotIds.contains(snapshotId)) {
snapshotIds = ImmutableList.<SnapshotId>builder().addAll(snapshotIds).add(snapshotId).build();
@ -381,22 +392,24 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
String blobName = snapshotBlobName(snapshotId);
int retryCount = 0;
while (true) {
byte[] data = snapshotsBlobContainer.readBlobFully(blobName);
// Because we are overriding snapshot during finalization, it's possible that
// we can get an empty or incomplete snapshot for a brief moment
// retrying after some what can resolve the issue
// TODO: switch to atomic update after non-local gateways are removed and we switch to java 1.7
try {
return readSnapshot(data);
} catch (ElasticsearchParseException ex) {
if (retryCount++ < 3) {
try {
Thread.sleep(50);
} catch (InterruptedException ex1) {
Thread.currentThread().interrupt();
try (InputStream blob = snapshotsBlobContainer.openInput(blobName)){
byte[] data = ByteStreams.toByteArray(blob);
// Because we are overriding snapshot during finalization, it's possible that
// we can get an empty or incomplete snapshot for a brief moment
// retrying after some what can resolve the issue
// TODO: switch to atomic update after non-local gateways are removed and we switch to java 1.7
try {
return readSnapshot(data);
} catch (ElasticsearchParseException ex) {
if (retryCount++ < 3) {
try {
Thread.sleep(50);
} catch (InterruptedException ex1) {
Thread.currentThread().interrupt();
}
} else {
throw ex;
}
} else {
throw ex;
}
}
}
@ -409,8 +422,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
private MetaData readSnapshotMetaData(SnapshotId snapshotId, ImmutableList<String> indices, boolean ignoreIndexErrors) {
MetaData metaData;
try {
byte[] data = snapshotsBlobContainer.readBlobFully(metaDataBlobName(snapshotId));
try (InputStream blob = snapshotsBlobContainer.openInput(metaDataBlobName(snapshotId))){
byte[] data = ByteStreams.toByteArray(blob);
metaData = readMetaData(data);
} catch (FileNotFoundException | NoSuchFileException ex) {
throw new SnapshotMissingException(snapshotId, ex);
@ -420,9 +433,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
MetaData.Builder metaDataBuilder = MetaData.builder(metaData);
for (String index : indices) {
BlobPath indexPath = basePath().add("indices").add(index);
ImmutableBlobContainer indexMetaDataBlobContainer = blobStore().immutableBlobContainer(indexPath);
try {
byte[] data = indexMetaDataBlobContainer.readBlobFully(snapshotBlobName(snapshotId));
BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
try (InputStream blob = indexMetaDataBlobContainer.openInput(snapshotBlobName(snapshotId))) {
byte[] data = ByteStreams.toByteArray(blob);
try (XContentParser parser = XContentHelper.createParser(data, 0, data.length)) {
XContentParser.Token token;
if ((token = parser.nextToken()) == XContentParser.Token.START_OBJECT) {
@ -599,7 +612,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
builder.endObject();
builder.close();
BytesReference bRef = bStream.bytes();
snapshotsBlobContainer.writeBlob(SNAPSHOTS_FILE, bRef.streamInput(), bRef.length());
try (OutputStream output = snapshotsBlobContainer.createOutput(SNAPSHOTS_FILE)) {
bRef.writeTo(output);
}
}
/**
@ -611,23 +626,25 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
* @throws IOException I/O errors
*/
protected ImmutableList<SnapshotId> readSnapshotList() throws IOException {
byte[] data = snapshotsBlobContainer.readBlobFully(SNAPSHOTS_FILE);
ArrayList<SnapshotId> snapshots = new ArrayList<>();
try (XContentParser parser = XContentHelper.createParser(data, 0, data.length)) {
if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
if (parser.nextToken() == XContentParser.Token.FIELD_NAME) {
String currentFieldName = parser.currentName();
if ("snapshots".equals(currentFieldName)) {
if (parser.nextToken() == XContentParser.Token.START_ARRAY) {
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
snapshots.add(new SnapshotId(repositoryName, parser.text()));
try (InputStream blob = snapshotsBlobContainer.openInput(SNAPSHOTS_FILE)){
final byte[] data = ByteStreams.toByteArray(blob);
ArrayList<SnapshotId> snapshots = new ArrayList<>();
try (XContentParser parser = XContentHelper.createParser(data, 0, data.length)) {
if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
if (parser.nextToken() == XContentParser.Token.FIELD_NAME) {
String currentFieldName = parser.currentName();
if ("snapshots".equals(currentFieldName)) {
if (parser.nextToken() == XContentParser.Token.START_ARRAY) {
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
snapshots.add(new SnapshotId(repositoryName, parser.text()));
}
}
}
}
}
}
return ImmutableList.copyOf(snapshots);
}
return ImmutableList.copyOf(snapshots);
}
@Override

View File

@ -66,7 +66,7 @@ public class FsRepository extends BlobStoreRepository {
* @throws IOException
*/
@Inject
public FsRepository(RepositoryName name, RepositorySettings repositorySettings, ThreadPool threadPool, IndexShardRepository indexShardRepository) throws IOException {
public FsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);
File locationFile;
String location = repositorySettings.settings().get("location", componentSettings.get("location"));
@ -76,7 +76,7 @@ public class FsRepository extends BlobStoreRepository {
} else {
locationFile = new File(location);
}
blobStore = new FsBlobStore(componentSettings, threadPool, locationFile);
blobStore = new FsBlobStore(componentSettings, locationFile);
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", null));
this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
this.basePath = BlobPath.cleanPath();

View File

@ -63,7 +63,7 @@ public class URLRepository extends BlobStoreRepository {
* @throws IOException
*/
@Inject
public URLRepository(RepositoryName name, RepositorySettings repositorySettings, ThreadPool threadPool, IndexShardRepository indexShardRepository) throws IOException {
public URLRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);
URL url;
String path = repositorySettings.settings().get("url", componentSettings.get("url"));
@ -73,7 +73,7 @@ public class URLRepository extends BlobStoreRepository {
url = new URL(path);
}
listDirectories = repositorySettings.settings().getAsBoolean("list_directories", componentSettings.getAsBoolean("list_directories", true));
blobStore = new URLBlobStore(componentSettings, threadPool, url);
blobStore = new URLBlobStore(componentSettings, url);
basePath = BlobPath.cleanPath();
}

View File

@ -65,7 +65,6 @@ public class RestThreadPoolAction extends AbstractCatAction {
ThreadPool.Names.REFRESH,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SNAPSHOT,
ThreadPool.Names.SNAPSHOT_DATA,
ThreadPool.Names.SUGGEST,
ThreadPool.Names.WARMER
};

View File

@ -78,7 +78,6 @@ public class ThreadPool extends AbstractComponent {
public static final String REFRESH = "refresh";
public static final String WARMER = "warmer";
public static final String SNAPSHOT = "snapshot";
public static final String SNAPSHOT_DATA = "snapshot_data";
public static final String OPTIMIZE = "optimize";
public static final String BENCH = "bench";
}
@ -125,7 +124,6 @@ public class ThreadPool extends AbstractComponent {
.put(Names.REFRESH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt10).build())
.put(Names.WARMER, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
.put(Names.SNAPSHOT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
.put(Names.SNAPSHOT_DATA, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 5).build())
.put(Names.OPTIMIZE, settingsBuilder().put("type", "fixed").put("size", 1).build())
.put(Names.BENCH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
.build();

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.blobstore;
import com.carrotsearch.randomizedtesting.LifecycleScope;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.*;
import java.util.Arrays;
import java.util.Random;
public class BlobStoreTest extends ElasticsearchTestCase {
@Test
public void testWriteRead() throws IOException {
BlobContainer store = newBlobContainer();
int length = randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16));
byte[] data = new byte[length];
for (int i = 0; i < data.length; i++) {
data[i] = (byte) randomInt();
}
try (OutputStream stream = store.createOutput("foobar")) {
stream.write(data);
}
InputStream stream = store.openInput("foobar");
BytesRef target = new BytesRef();
while (target.length < data.length) {
byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length)];
int offset = scaledRandomIntBetween(0, buffer.length - 1);
int read = stream.read(buffer, offset, buffer.length - offset);
target.append(new BytesRef(buffer, offset, read));
}
assertEquals(data.length, target.length);
assertArrayEquals(data, Arrays.copyOfRange(target.bytes, target.offset, target.length));
}
protected BlobContainer newBlobContainer() {
File tempDir = newTempDir(LifecycleScope.TEST);
Settings settings = randomBoolean() ? ImmutableSettings.EMPTY : ImmutableSettings.builder().put("buffer_size", new ByteSizeValue(randomIntBetween(1, 100), ByteSizeUnit.KB)).build();
FsBlobStore store = new FsBlobStore(settings, tempDir);
return store.blobContainer(new BlobPath());
}
}

View File

@ -0,0 +1,133 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.snapshots.blobstore;
import com.carrotsearch.randomizedtesting.generators.RandomInts;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.*;
import java.util.Random;
import static org.hamcrest.Matchers.equalTo;
public class SlicedInputStreamTest extends ElasticsearchTestCase {
@Test
public void readRandom() throws IOException {
int parts = randomIntBetween(1, 20);
ByteArrayOutputStream stream = new ByteArrayOutputStream();
int numWriteOps = scaledRandomIntBetween(1000, 10000);
final long seed = randomLong();
Random random = new Random(seed);
for (int i = 0; i < numWriteOps; i++) {
switch(random.nextInt(5)) {
case 1:
stream.write(random.nextInt(Byte.MAX_VALUE));
break;
default:
stream.write(randomBytes(random));
break;
}
}
final CheckClosedInputStream[] streams = new CheckClosedInputStream[parts];
byte[] bytes = stream.toByteArray();
int slice = bytes.length / parts;
int offset = 0;
int length;
for (int i = 0; i < parts; i++) {
length = i == parts-1 ? bytes.length-offset : slice;
streams[i] = new CheckClosedInputStream(new ByteArrayInputStream(bytes, offset, length));
offset += length;
}
SlicedInputStream input = new SlicedInputStream(parts) {
@Override
protected InputStream openSlice(long slice) throws IOException {
return streams[(int)slice];
}
};
random = new Random(seed);
assertThat(input.available(), equalTo(streams[0].available()));
for (int i = 0; i < numWriteOps; i++) {
switch(random.nextInt(5)) {
case 1:
assertThat(random.nextInt(Byte.MAX_VALUE), equalTo(input.read()));
break;
default:
byte[] b = randomBytes(random);
byte[] buffer = new byte[b.length];
int read = readFully(input, buffer);
assertThat(b.length, equalTo(read));
assertArrayEquals(b, buffer);
break;
}
}
assertThat(input.available(), equalTo(0));
for (int i =0; i < streams.length-1; i++) {
assertTrue(streams[i].closed);
}
input.close();
for (int i =0; i < streams.length; i++) {
assertTrue(streams[i].closed);
}
}
private int readFully(InputStream stream, byte[] buffer) throws IOException {
for (int i = 0; i < buffer.length;) {
int read = stream.read(buffer, i, buffer.length-i);
if (read == -1) {
if (i == 0) {
return -1;
} else {
return i;
}
}
i+= read;
}
return buffer.length;
}
private byte[] randomBytes(Random random) {
int length = RandomInts.randomIntBetween(random, 1, 10);
byte[] data = new byte[length];
random.nextBytes(data);
return data;
}
private static final class CheckClosedInputStream extends FilterInputStream {
public boolean closed = false;
public CheckClosedInputStream(InputStream in) {
super(in);
}
@Override
public void close() throws IOException {
closed = true;
super.close();
}
}
}

View File

@ -21,31 +21,22 @@ package org.elasticsearch.snapshots.mockstore;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.blobstore.BlobContainer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
*
*/
public class ImmutableBlobContainerWrapper implements ImmutableBlobContainer {
private ImmutableBlobContainer delegate;
public class BlobContainerWrapper implements BlobContainer {
private BlobContainer delegate;
public ImmutableBlobContainerWrapper(ImmutableBlobContainer delegate) {
public BlobContainerWrapper(BlobContainer delegate) {
this.delegate = delegate;
}
@Override
public void writeBlob(String blobName, InputStream is, long sizeInBytes, WriterListener listener) {
delegate.writeBlob(blobName, is, sizeInBytes, listener);
}
@Override
public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException {
delegate.writeBlob(blobName, is, sizeInBytes);
}
@Override
public BlobPath path() {
return delegate.path();
@ -57,13 +48,13 @@ public class ImmutableBlobContainerWrapper implements ImmutableBlobContainer {
}
@Override
public void readBlob(String blobName, ReadBlobListener listener) {
delegate.readBlob(blobName, listener);
public InputStream openInput(String name) throws IOException {
return delegate.openInput(name);
}
@Override
public byte[] readBlobFully(String blobName) throws IOException {
return delegate.readBlobFully(blobName);
public OutputStream createOutput(String blobName) throws IOException {
return delegate.createOutput(blobName);
}
@Override

View File

@ -18,9 +18,9 @@
*/
package org.elasticsearch.snapshots.mockstore;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
/**
*
@ -34,8 +34,8 @@ public class BlobStoreWrapper implements BlobStore {
}
@Override
public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
return delegate.immutableBlobContainer(path);
public BlobContainer blobContainer(BlobPath path) {
return delegate.blobContainer(path);
}
@Override

View File

@ -24,16 +24,16 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@ -47,10 +47,6 @@ public class MockRepository extends FsRepository {
private final AtomicLong failureCounter = new AtomicLong();
public void resetFailureCount() {
failureCounter.set(0);
}
public long getFailureCount() {
return failureCounter.get();
}
@ -72,8 +68,8 @@ public class MockRepository extends FsRepository {
private volatile boolean blocked = false;
@Inject
public MockRepository(RepositoryName name, ThreadPool threadPool, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
super(name, repositorySettings, threadPool, indexShardRepository);
public MockRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
super(name, repositorySettings, indexShardRepository);
randomControlIOExceptionRate = repositorySettings.settings().getAsDouble("random_control_io_exception_rate", 0.0);
randomDataFileIOExceptionRate = repositorySettings.settings().getAsDouble("random_data_file_io_exception_rate", 0.0);
blockOnControlFiles = repositorySettings.settings().getAsBoolean("block_on_control", false);
@ -134,8 +130,8 @@ public class MockRepository extends FsRepository {
}
@Override
public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
return new MockImmutableBlobContainer(super.immutableBlobContainer(path));
public BlobContainer blobContainer(BlobPath path) {
return new MockBlobContainer(super.blobContainer(path));
}
public synchronized void unblockExecution() {
@ -166,7 +162,7 @@ public class MockRepository extends FsRepository {
return wasBlocked;
}
private class MockImmutableBlobContainer extends ImmutableBlobContainerWrapper {
private class MockBlobContainer extends BlobContainerWrapper {
private MessageDigest digest;
private boolean shouldFail(String blobName, double probability) {
@ -232,60 +228,20 @@ public class MockRepository extends FsRepository {
}
}
private boolean maybeIOExceptionOrBlock(String blobName, ImmutableBlobContainer.WriterListener listener) {
try {
maybeIOExceptionOrBlock(blobName);
return true;
} catch (IOException ex) {
listener.onFailure(ex);
return false;
}
}
private boolean maybeIOExceptionOrBlock(String blobName, ImmutableBlobContainer.ReadBlobListener listener) {
try {
maybeIOExceptionOrBlock(blobName);
return true;
} catch (IOException ex) {
listener.onFailure(ex);
return false;
}
}
public MockImmutableBlobContainer(ImmutableBlobContainer delegate) {
public MockBlobContainer(BlobContainer delegate) {
super(delegate);
}
@Override
public void writeBlob(String blobName, InputStream is, long sizeInBytes, WriterListener listener) {
if (maybeIOExceptionOrBlock(blobName, listener) ) {
super.writeBlob(blobName, is, sizeInBytes, listener);
}
}
@Override
public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException {
maybeIOExceptionOrBlock(blobName);
super.writeBlob(blobName, is, sizeInBytes);
}
@Override
public boolean blobExists(String blobName) {
return super.blobExists(blobName);
}
@Override
public void readBlob(String blobName, ReadBlobListener listener) {
if (maybeIOExceptionOrBlock(blobName, listener)) {
super.readBlob(blobName, listener);
}
}
@Override
public byte[] readBlobFully(String blobName) throws IOException {
maybeIOExceptionOrBlock(blobName);
return super.readBlobFully(blobName);
public InputStream openInput(String name) throws IOException {
maybeIOExceptionOrBlock(name);
return super.openInput(name);
}
@Override
@ -318,6 +274,11 @@ public class MockRepository extends FsRepository {
return super.listBlobsByPrefix(blobNamePrefix);
}
@Override
public OutputStream createOutput(String blobName) throws IOException {
maybeIOExceptionOrBlock(blobName);
return super.createOutput(blobName);
}
}
}
}

View File

@ -366,7 +366,7 @@ public final class InternalTestCluster extends TestCluster {
for (String name : Arrays.asList(ThreadPool.Names.BULK, ThreadPool.Names.FLUSH, ThreadPool.Names.GET,
ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.MERGE, ThreadPool.Names.OPTIMIZE,
ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT,
ThreadPool.Names.SNAPSHOT_DATA, ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) {
ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) {
if (random.nextBoolean()) {
final String type = RandomPicks.randomFrom(random, Arrays.asList("fixed", "cached", "scaling"));
builder.put(ThreadPool.THREADPOOL_GROUP + name + ".type", type);