refactor hdfs gateway to use the new common blobstore

kimchy 2010-06-22 09:10:00 +03:00
parent 21627bca10
commit 3d84af2a40
13 changed files with 455 additions and 608 deletions

View File

@@ -2,6 +2,7 @@ rootLogger: INFO, console, file
logger:
# log action execution errors for easier debugging
action : DEBUG
index.gateway : DEBUG
appender:
console:

View File

@@ -43,5 +43,10 @@ public interface AppendableBlobContainer extends BlobContainer {
void close();
}
AppendableBlob appendBlob(String blobName, boolean append) throws IOException;
/**
* Returns true if an append blob can be opened on an existing blob.
*/
boolean canAppendToExistingBlob();
AppendableBlob appendBlob(String blobName) throws IOException;
}
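
The interface change above replaces the explicit append flag with a capability query: each store now advertises via canAppendToExistingBlob() whether a blob can be reopened for appending. A minimal caller sketch (the helper class is illustrative, not part of this commit):

import org.elasticsearch.common.blobstore.AppendableBlobContainer;

public class TranslogIdHelper {
    // Mirrors the recovery logic in BlobStoreIndexShardGateway below: stores that
    // cannot reopen an existing blob for append report -1 so a fresh translog
    // blob is created instead of continuing the old one.
    public static long recoveredTranslogId(AppendableBlobContainer container, long currentId) {
        return container.canAppendToExistingBlob() ? currentId : -1;
    }
}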

View File

@@ -37,21 +37,20 @@ public class FsAppendableBlobContainer extends AbstractFsBlobContainer implement
super(blobStore, blobPath, path);
}
@Override public AppendableBlob appendBlob(String blobName, boolean append) throws IOException {
return new FsAppendableBlob(new File(path, blobName), append);
@Override public AppendableBlob appendBlob(String blobName) throws IOException {
return new FsAppendableBlob(new File(path, blobName));
}
@Override public boolean canAppendToExistingBlob() {
return true;
}
private class FsAppendableBlob implements AppendableBlob {
private final File file;
public FsAppendableBlob(File file, boolean append) throws IOException {
public FsAppendableBlob(File file) throws IOException {
this.file = file;
if (!append) {
RandomAccessFile raf = new RandomAccessFile(file, "rw");
raf.setLength(0);
raf.close();
}
}
@Override public void append(final AppendBlobListener listener) {

View File

@@ -39,7 +39,7 @@ public class FsImmutableBlobContainer extends AbstractFsBlobContainer implements
blobStore.executorService().execute(new Runnable() {
@Override public void run() {
File file = new File(path, blobName);
RandomAccessFile raf = null;
RandomAccessFile raf;
try {
raf = new RandomAccessFile(file, "rw");
} catch (FileNotFoundException e) {

View File

@@ -220,7 +220,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
if (translogBlob == null) {
try {
translogBlob = translogContainer.appendBlob("translog-" + translogSnapshot.translogId(), !snapshot.newTranslogCreated());
translogBlob = translogContainer.appendBlob("translog-" + translogSnapshot.translogId());
} catch (IOException e) {
throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to create translog", e);
}
@@ -382,7 +382,25 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
}
indexShard.performRecovery(operations);
return new RecoveryStatus.Translog(indexShard.translog().currentId(), operations.size(), new ByteSizeValue(translogData.length, ByteSizeUnit.BYTES));
// clean all the other translogs
for (Long translogIdToDelete : translogIds) {
if (!translogId.equals(translogIdToDelete)) {
try {
translogContainer.deleteBlob("translog-" + translogIdToDelete);
} catch (Exception e) {
// ignore
}
}
}
// only if we can append to an existing translog should we use the current id and continue appending to it
long lastTranslogId = indexShard.translog().currentId();
if (!translogContainer.canAppendToExistingBlob()) {
lastTranslogId = -1;
}
return new RecoveryStatus.Translog(lastTranslogId, operations.size(), new ByteSizeValue(translogData.length, ByteSizeUnit.BYTES));
} catch (Exception e) {
lastException = e;
logger.debug("Failed to read translog, will try the next one", e);

View File

@@ -21,7 +21,6 @@ package org.elasticsearch.index.gateway.fs;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.gateway.IndexShardGateway;
@@ -33,7 +32,7 @@ import org.elasticsearch.index.settings.IndexSettings;
*/
public class FsIndexGateway extends BlobStoreIndexGateway {
@Inject public FsIndexGateway(Index index, @IndexSettings Settings indexSettings, Environment environment, Gateway gateway) {
@Inject public FsIndexGateway(Index index, @IndexSettings Settings indexSettings, Gateway gateway) {
super(index, indexSettings, gateway);
}

View File

@@ -0,0 +1,111 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.blobstore.hdfs;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public abstract class AbstractHdfsBlobContainer extends AbstractBlobContainer {
protected final HdfsBlobStore blobStore;
protected final Path path;
public AbstractHdfsBlobContainer(HdfsBlobStore blobStore, BlobPath blobPath, Path path) {
super(blobPath);
this.blobStore = blobStore;
this.path = path;
}
public ImmutableMap<String, BlobMetaData> listBlobs() throws IOException {
FileStatus[] files = blobStore.fileSystem().listStatus(path);
if (files == null || files.length == 0) {
return ImmutableMap.of();
}
ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder();
for (FileStatus file : files) {
builder.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
}
return builder.build();
}
@Override public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(final String blobNamePrefix) throws IOException {
FileStatus[] files = blobStore.fileSystem().listStatus(path, new PathFilter() {
@Override public boolean accept(Path path) {
return path.getName().startsWith(blobNamePrefix);
}
});
if (files == null || files.length == 0) {
return ImmutableMap.of();
}
ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder();
for (FileStatus file : files) {
builder.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
}
return builder.build();
}
public boolean deleteBlob(String blobName) throws IOException {
return blobStore.fileSystem().delete(new Path(path, blobName), true);
}
@Override public void readBlob(final String blobName, final ReadBlobListener listener) {
blobStore.executorService().execute(new Runnable() {
@Override public void run() {
byte[] buffer = new byte[1024 * 16];
FSDataInputStream fileStream;
try {
fileStream = blobStore.fileSystem().open(new Path(path, blobName));
} catch (IOException e) {
listener.onFailure(e);
return;
}
try {
int bytesRead;
while ((bytesRead = fileStream.read(buffer)) != -1) {
listener.onPartial(buffer, 0, bytesRead);
}
listener.onCompleted();
} catch (Exception e) {
try {
fileStream.close();
} catch (IOException e1) {
// ignore
}
listener.onFailure(e);
}
}
});
}
}
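
readBlob above streams the blob on the store's executor and reports chunks to a listener. A hedged usage sketch (the nested-interface location and the onFailure(Throwable) signature are assumptions based on how the listener is invoked above):

import org.elasticsearch.common.blobstore.BlobContainer;
import java.io.ByteArrayOutputStream;
import java.util.concurrent.CountDownLatch;

public class ReadBlobExample {
    // Buffer an entire blob into memory via the streaming callbacks.
    public static void dump(BlobContainer container, String blobName) throws InterruptedException {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        final CountDownLatch latch = new CountDownLatch(1);
        container.readBlob(blobName, new BlobContainer.ReadBlobListener() {
            @Override public void onPartial(byte[] data, int offset, int size) {
                bytes.write(data, offset, size); // collect each streamed chunk
            }
            @Override public void onCompleted() {
                latch.countDown(); // blob fully streamed
            }
            @Override public void onFailure(Throwable t) {
                latch.countDown(); // t carries the root cause
            }
        });
        latch.await(); // readBlob runs on the blob store's executor service
        System.out.println(blobName + ": " + bytes.size() + " bytes");
    }
}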

View File

@@ -0,0 +1,87 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.blobstore.hdfs;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.elasticsearch.common.blobstore.AppendableBlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.io.stream.DataOutputStreamOutput;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class HdfsAppendableBlobContainer extends AbstractHdfsBlobContainer implements AppendableBlobContainer {
public HdfsAppendableBlobContainer(HdfsBlobStore blobStore, BlobPath blobPath, Path path) {
super(blobStore, blobPath, path);
}
@Override public AppendableBlob appendBlob(String blobName) throws IOException {
return new HdfsAppendableBlob(new Path(path, blobName));
}
@Override public boolean canAppendToExistingBlob() {
return false;
}
private class HdfsAppendableBlob implements AppendableBlob {
private final Path file;
private final FSDataOutputStream fsDataStream;
private final DataOutputStreamOutput out;
public HdfsAppendableBlob(Path file) throws IOException {
this.file = file;
this.fsDataStream = blobStore.fileSystem().create(file, true);
this.out = new DataOutputStreamOutput(fsDataStream);
}
@Override public void append(final AppendBlobListener listener) {
blobStore.executorService().execute(new Runnable() {
@Override public void run() {
try {
listener.withStream(out);
out.flush();
fsDataStream.flush();
fsDataStream.sync();
listener.onCompleted();
} catch (IOException e) {
listener.onFailure(e);
}
}
});
}
@Override public void close() {
try {
fsDataStream.close();
} catch (IOException e) {
// ignore
}
}
}
}
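
Note that canAppendToExistingBlob() returns false here: fileSystem().create(file, true) always overwrites, so an existing translog blob cannot be reopened, which is exactly why the recovery code in BlobStoreIndexShardGateway resets lastTranslogId to -1 for this store. A hedged usage sketch of the append API (callback names come from the implementation above; the nested-type locations and withStream's StreamOutput parameter are assumptions):

import org.elasticsearch.common.blobstore.AppendableBlobContainer;
import org.elasticsearch.common.blobstore.AppendableBlobContainer.AppendableBlob;
import org.elasticsearch.common.blobstore.AppendableBlobContainer.AppendBlobListener;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;

public class AppendBlobExample {
    public static void appendOnce(AppendableBlobContainer container) throws IOException {
        AppendableBlob blob = container.appendBlob("translog-1");
        blob.append(new AppendBlobListener() {
            @Override public void withStream(StreamOutput out) throws IOException {
                out.writeVInt(1); // a translog operation would be serialized here
            }
            @Override public void onCompleted() {
                // data has been flushed and synced (see fsDataStream.sync() above)
            }
            @Override public void onFailure(Throwable t) {
                // append failed; the snapshot would be retried
            }
        });
        // in real use, close() only after all appends have completed
        blob.close();
    }
}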

View File

@@ -0,0 +1,110 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.blobstore.hdfs;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.elasticsearch.common.blobstore.AppendableBlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.settings.Settings;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.elasticsearch.common.util.concurrent.DynamicExecutors.*;
/**
* @author kimchy (shay.banon)
*/
public class HdfsBlobStore implements BlobStore {
private final FileSystem fileSystem;
private final Path path;
private final ExecutorService executorService;
public HdfsBlobStore(Settings settings, FileSystem fileSystem, Path path) throws IOException {
this.fileSystem = fileSystem;
this.path = path;
if (!fileSystem.exists(path)) {
fileSystem.mkdirs(path);
}
executorService = Executors.newCachedThreadPool(daemonThreadFactory(settings, "hdfs_blobstore"));
}
@Override public String toString() {
return path.toString();
}
public FileSystem fileSystem() {
return fileSystem;
}
public Path path() {
return path;
}
public ExecutorService executorService() {
return executorService;
}
@Override public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
return new HdfsImmutableBlobContainer(this, path, buildAndCreate(path));
}
@Override public AppendableBlobContainer appendableBlobContainer(BlobPath path) {
return new HdfsAppendableBlobContainer(this, path, buildAndCreate(path));
}
@Override public void delete(BlobPath path) {
try {
fileSystem.delete(buildPath(path), true);
} catch (IOException e) {
// ignore
}
}
@Override public void close() {
}
private Path buildAndCreate(BlobPath blobPath) {
Path path = buildPath(blobPath);
try {
fileSystem.mkdirs(path);
} catch (IOException e) {
// ignore
}
return path;
}
private Path buildPath(BlobPath blobPath) {
Path path = this.path;
for (String p : blobPath) {
path = new Path(path, p);
}
return path;
}
}
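
Wiring the store up takes a Hadoop FileSystem plus a root path; HdfsGateway below does exactly this. A hedged sketch (the empty-settings builder call is an assumption, everything else mirrors the gateway code):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.elasticsearch.common.blobstore.hdfs.HdfsBlobStore;
import org.elasticsearch.common.settings.ImmutableSettings;
import java.io.IOException;
import java.net.URI;

public class HdfsBlobStoreExample {
    public static HdfsBlobStore open(String uri, String rootPath) throws IOException {
        Configuration conf = new Configuration();
        // same call HdfsGateway uses to obtain the file system
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        return new HdfsBlobStore(ImmutableSettings.settingsBuilder().build(), fs, new Path(rootPath));
    }
}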

View File

@@ -0,0 +1,90 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.blobstore.hdfs;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.blobstore.support.BlobStores;
import java.io.IOException;
import java.io.InputStream;
/**
* @author kimchy (shay.banon)
*/
public class HdfsImmutableBlobContainer extends AbstractHdfsBlobContainer implements ImmutableBlobContainer {
public HdfsImmutableBlobContainer(HdfsBlobStore blobStore, BlobPath blobPath, Path path) {
super(blobStore, blobPath, path);
}
@Override public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) {
blobStore.executorService().execute(new Runnable() {
@Override public void run() {
Path file = new Path(path, blobName);
FSDataOutputStream fileStream;
try {
fileStream = blobStore.fileSystem().create(file, true);
} catch (IOException e) {
listener.onFailure(e);
return;
}
try {
try {
byte[] buffer = new byte[16 * 1024];
int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) {
fileStream.write(buffer, 0, bytesRead);
}
} finally {
try {
is.close();
} catch (IOException ex) {
// do nothing
}
try {
fileStream.close();
} catch (IOException ex) {
// do nothing
}
}
listener.onCompleted();
} catch (Exception e) {
// just to be on the safe side, try and delete it on failure
try {
if (blobStore.fileSystem().exists(file)) {
blobStore.fileSystem().delete(file, true);
}
} catch (Exception e1) {
// ignore
}
listener.onFailure(e);
}
}
});
}
@Override public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException {
BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes);
}
}
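
The second writeBlob overload above is the synchronous path: BlobStores.syncWriteBlob blocks until the executor finishes the asynchronous write. A minimal hedged usage sketch:

import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import java.io.ByteArrayInputStream;
import java.io.IOException;

public class WriteBlobExample {
    // Write a small payload and only return once it is fully stored in HDFS.
    public static void writeBytes(ImmutableBlobContainer container, String name, byte[] data) throws IOException {
        container.writeBlob(name, new ByteArrayInputStream(data), data.length);
    }
}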

View File

@@ -20,22 +20,16 @@
package org.elasticsearch.gateway.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.blobstore.hdfs.HdfsBlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.builder.BinaryXContentBuilder;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.GatewayException;
import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
import java.io.IOException;
import java.net.URI;
@@ -44,25 +38,17 @@ import java.util.Map;
/**
* @author kimchy (shay.banon)
*/
public class HdfsGateway extends AbstractLifecycleComponent<Gateway> implements Gateway {
public class HdfsGateway extends BlobStoreGateway {
private final boolean closeFileSystem;
private final FileSystem fileSystem;
private final String uri;
private final Path path;
private final Path metaDataPath;
private volatile int currentIndex;
@Inject public HdfsGateway(Settings settings, ClusterName clusterName) throws IOException {
super(settings);
this.closeFileSystem = componentSettings.getAsBoolean("close_fs", true);
this.uri = componentSettings.get("uri");
String uri = componentSettings.get("uri");
if (uri == null) {
throw new ElasticSearchIllegalArgumentException("hdfs gateway requires the 'uri' setting to be set");
}
@@ -70,11 +56,9 @@ public class HdfsGateway extends AbstractLifecycleComponent<Gateway> implements
if (path == null) {
throw new ElasticSearchIllegalArgumentException("hdfs gateway requires the 'path' path setting to be set");
}
this.path = new Path(new Path(path), clusterName.value());
Path hPath = new Path(new Path(path), clusterName.value());
logger.debug("Using uri [{}], path [{}]", this.uri, this.path);
this.metaDataPath = new Path(this.path, "metadata");
logger.debug("Using uri [{}], path [{}]", uri, hPath);
Configuration conf = new Configuration();
Settings hdfsSettings = settings.getByPrefix("hdfs.conf.");
@@ -84,141 +68,23 @@ public class HdfsGateway extends AbstractLifecycleComponent<Gateway> implements
fileSystem = FileSystem.get(URI.create(uri), conf);
fileSystem.mkdirs(metaDataPath);
this.currentIndex = findLatestIndex();
logger.debug("Latest metadata found at index [" + currentIndex + "]");
initialize(new HdfsBlobStore(settings, fileSystem, hPath), clusterName);
}
@Override public String type() {
return "fs";
}
public FileSystem fileSystem() {
return this.fileSystem;
}
public Path path() {
return this.path;
}
@Override protected void doStart() throws ElasticSearchException {
}
@Override protected void doStop() throws ElasticSearchException {
}
@Override protected void doClose() throws ElasticSearchException {
if (closeFileSystem) {
try {
fileSystem.close();
} catch (IOException e) {
logger.warn("Failed to close file system {}", fileSystem);
}
}
}
@Override public void write(MetaData metaData) throws GatewayException {
try {
final Path file = new Path(metaDataPath, "metadata-" + (currentIndex + 1));
BinaryXContentBuilder builder = XContentFactory.contentBinaryBuilder(XContentType.JSON);
builder.prettyPrint();
builder.startObject();
MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
FSDataOutputStream fileStream = fileSystem.create(file, true);
fileStream.write(builder.unsafeBytes(), 0, builder.unsafeBytesLength());
fileStream.flush();
fileStream.sync();
fileStream.close();
currentIndex++;
FileStatus[] oldFiles = fileSystem.listStatus(metaDataPath, new PathFilter() {
@Override public boolean accept(Path path) {
return path.getName().startsWith("metadata-") && !path.getName().equals(file.getName());
}
});
if (oldFiles != null) {
for (FileStatus oldFile : oldFiles) {
fileSystem.delete(oldFile.getPath(), false);
}
}
} catch (IOException e) {
throw new GatewayException("can't write new metadata file into the gateway", e);
}
}
@Override public MetaData read() throws GatewayException {
try {
if (currentIndex == -1)
return null;
Path file = new Path(metaDataPath, "metadata-" + currentIndex);
return readMetaData(file);
} catch (GatewayException e) {
throw e;
} catch (Exception e) {
throw new GatewayException("can't read metadata file from the gateway", e);
}
return "hdfs";
}
@Override public Class<? extends Module> suggestIndexGateway() {
return HdfsIndexGatewayModule.class;
}
@Override public void reset() throws IOException {
fileSystem.delete(path, true);
}
private int findLatestIndex() throws IOException {
FileStatus[] files = fileSystem.listStatus(metaDataPath, new PathFilter() {
@Override public boolean accept(Path path) {
return path.getName().startsWith("metadata-");
}
});
if (files == null || files.length == 0) {
return -1;
}
int index = -1;
for (FileStatus file : files) {
if (logger.isTraceEnabled()) {
logger.trace("[findLatestMetadata]: Processing file [" + file + "]");
}
String name = file.getPath().getName();
int fileIndex = Integer.parseInt(name.substring(name.indexOf('-') + 1));
if (fileIndex >= index) {
// try and read the meta data
try {
readMetaData(file.getPath());
index = fileIndex;
} catch (Exception e) {
logger.warn("[findLatestMetadata]: Failed to read metadata from [" + file + "], ignoring...", e);
}
}
}
return index;
}
private MetaData readMetaData(Path file) throws IOException {
FSDataInputStream fileStream = fileSystem.open(file);
XContentParser parser = null;
try {
parser = XContentFactory.xContent(XContentType.JSON).createParser(fileStream);
return MetaData.Builder.fromXContent(parser, settings);
} finally {
if (parser != null) {
parser.close();
}
@Override protected void doClose() throws ElasticSearchException {
super.doClose();
if (closeFileSystem) {
try {
fileStream.close();
} catch (Exception e) {
fileSystem.close();
} catch (IOException e) {
// ignore
}
}

View File

@@ -19,74 +19,28 @@
package org.elasticsearch.index.gateway.hdfs;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.hdfs.HdfsGateway;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway;
import org.elasticsearch.index.settings.IndexSettings;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class HdfsIndexGateway extends AbstractIndexComponent implements IndexGateway {
private final FileSystem fileSystem;
private final Path indexPath;
public class HdfsIndexGateway extends BlobStoreIndexGateway {
@Inject public HdfsIndexGateway(Index index, @IndexSettings Settings indexSettings, Gateway gateway) {
super(index, indexSettings);
Path path = null;
String pathSetting = componentSettings.get("path");
if (pathSetting != null) {
path = new Path(pathSetting);
}
if (gateway instanceof HdfsGateway) {
HdfsGateway hdfsGateway = (HdfsGateway) gateway;
fileSystem = hdfsGateway.fileSystem();
if (path == null) {
path = hdfsGateway.path();
}
} else {
throw new ElasticSearchIllegalArgumentException("Must configure an hdfs gateway to use index hdfs gateway");
}
this.indexPath = new Path(new Path(path, "indices"), index.name());
super(index, indexSettings, gateway);
}
@Override public String type() {
return "hdfs";
}
public FileSystem fileSystem() {
return this.fileSystem;
}
public Path indexPath() {
return this.indexPath;
}
@Override public Class<? extends IndexShardGateway> shardGatewayClass() {
return HdfsIndexShardGateway.class;
}
@Override public void close(boolean delete) throws ElasticSearchException {
if (delete) {
try {
fileSystem.delete(indexPath, true);
} catch (IOException e) {
logger.warn("Failed to delete [{}]", e, indexPath);
}
}
}
}

View File

@@ -19,421 +19,28 @@
package org.elasticsearch.index.gateway.hdfs;
import org.apache.hadoop.fs.*;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.DataInputStreamInput;
import org.elasticsearch.common.io.stream.DataOutputStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Directories;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException;
import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexShardGateway;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.lucene.Directories.*;
import static org.elasticsearch.index.translog.TranslogStreams.*;
/**
* @author kimchy (shay.banon)
*/
public class HdfsIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway {
private final InternalIndexShard indexShard;
private final ThreadPool threadPool;
private final RecoveryThrottler recoveryThrottler;
private final Store store;
private final FileSystem fileSystem;
private final Path path;
private final Path indexPath;
private final Path translogPath;
private volatile FSDataOutputStream currentTranslogStream = null;
public class HdfsIndexShardGateway extends BlobStoreIndexShardGateway {
@Inject public HdfsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway hdfsIndexGateway,
IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) {
super(shardId, indexSettings);
this.indexShard = (InternalIndexShard) indexShard;
this.threadPool = threadPool;
this.recoveryThrottler = recoveryThrottler;
this.store = store;
this.fileSystem = ((HdfsIndexGateway) hdfsIndexGateway).fileSystem();
this.path = new Path(((HdfsIndexGateway) hdfsIndexGateway).indexPath(), Integer.toString(shardId.id()));
this.indexPath = new Path(path, "index");
this.translogPath = new Path(path, "translog");
}
@Override public void close(boolean delete) throws ElasticSearchException {
if (currentTranslogStream != null) {
try {
currentTranslogStream.close();
} catch (IOException e) {
// ignore
}
}
if (delete) {
try {
fileSystem.delete(path, true);
} catch (IOException e) {
logger.warn("Failed to delete [{}]", e, path);
}
}
super(shardId, indexSettings, threadPool, hdfsIndexGateway, indexShard, store, recoveryThrottler);
}
@Override public String type() {
return "hdfs";
}
@Override public boolean requiresSnapshotScheduling() {
return true;
}
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
RecoveryStatus.Index recoveryStatusIndex = recoverIndex();
RecoveryStatus.Translog recoveryStatusTranslog = recoverTranslog();
return new RecoveryStatus(recoveryStatusIndex, recoveryStatusTranslog);
}
@Override public SnapshotStatus snapshot(Snapshot snapshot) {
long totalTimeStart = System.currentTimeMillis();
boolean indexDirty = false;
boolean translogDirty = false;
final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit();
final Translog.Snapshot translogSnapshot = snapshot.translogSnapshot();
int indexNumberOfFiles = 0;
long indexTotalFilesSize = 0;
long indexTime = 0;
if (snapshot.indexChanged()) {
long time = System.currentTimeMillis();
indexDirty = true;
// snapshot into the index
final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
for (final String fileName : snapshotIndexCommit.getFiles()) {
// don't copy over the segments file, it will be copied over later on as part of the
// final snapshot phase
if (fileName.equals(snapshotIndexCommit.getSegmentsFileName())) {
latch.countDown();
continue;
}
IndexInput indexInput = null;
try {
indexInput = snapshotIndexCommit.getDirectory().openInput(fileName);
FileStatus fileStatus = fileSystem.getFileStatus(new Path(indexPath, fileName));
if (fileStatus.getLen() == indexInput.length()) {
// we assume its the same one, no need to copy
latch.countDown();
continue;
}
} catch (FileNotFoundException e) {
// that's fine!
} catch (Exception e) {
logger.debug("Failed to verify file equality based on length, copying...", e);
} finally {
if (indexInput != null) {
try {
indexInput.close();
} catch (IOException e) {
// ignore
}
}
}
indexNumberOfFiles++;
try {
indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(fileName);
} catch (IOException e) {
// ignore...
}
threadPool.execute(new Runnable() {
@Override public void run() {
Path copyTo = new Path(indexPath, fileName);
FSDataOutputStream fileStream;
try {
fileStream = fileSystem.create(copyTo, true);
copyFromDirectory(snapshotIndexCommit.getDirectory(), fileName, fileStream);
} catch (Exception e) {
lastException.set(new IndexShardGatewaySnapshotFailedException(shardId, "Failed to copy to [" + copyTo + "], from dir [" + snapshotIndexCommit.getDirectory() + "] and file [" + fileName + "]", e));
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
lastException.set(e);
}
if (lastException.get() != null) {
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", lastException.get());
}
indexTime = System.currentTimeMillis() - time;
}
int translogNumberOfOperations = 0;
long translogTime = 0;
if (snapshot.newTranslogCreated() || currentTranslogStream == null) {
translogDirty = true;
long time = System.currentTimeMillis();
// a new translog, close the current stream
if (currentTranslogStream != null) {
try {
currentTranslogStream.close();
} catch (IOException e) {
// ignore
}
}
Path currentTranslogPath = new Path(translogPath, "translog-" + translogSnapshot.translogId());
try {
currentTranslogStream = fileSystem.create(currentTranslogPath, true);
StreamOutput out = new DataOutputStreamOutput(currentTranslogStream);
for (Translog.Operation operation : translogSnapshot) {
translogNumberOfOperations++;
writeTranslogOperation(out, operation);
}
currentTranslogStream.flush();
currentTranslogStream.sync();
} catch (Exception e) {
currentTranslogPath = null;
if (currentTranslogStream != null) {
try {
currentTranslogStream.close();
} catch (IOException e1) {
// ignore
} finally {
currentTranslogStream = null;
}
}
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog into [" + currentTranslogPath + "]", e);
}
translogTime = System.currentTimeMillis() - time;
} else if (snapshot.sameTranslogNewOperations()) {
translogDirty = true;
long time = System.currentTimeMillis();
try {
StreamOutput out = new DataOutputStreamOutput(currentTranslogStream);
for (Translog.Operation operation : translogSnapshot.skipTo(snapshot.lastTranslogSize())) {
translogNumberOfOperations++;
writeTranslogOperation(out, operation);
}
} catch (Exception e) {
try {
currentTranslogStream.close();
} catch (IOException e1) {
// ignore
} finally {
currentTranslogStream = null;
}
}
translogTime = System.currentTimeMillis() - time;
}
// now write the segments file and update the translog header
if (indexDirty) {
Path segmentsPath = new Path(indexPath, snapshotIndexCommit.getSegmentsFileName());
try {
indexNumberOfFiles++;
indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(snapshotIndexCommit.getSegmentsFileName());
long time = System.currentTimeMillis();
FSDataOutputStream fileStream;
fileStream = fileSystem.create(segmentsPath, true);
copyFromDirectory(snapshotIndexCommit.getDirectory(), snapshotIndexCommit.getSegmentsFileName(), fileStream);
indexTime += (System.currentTimeMillis() - time);
} catch (Exception e) {
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize index snapshot into [" + segmentsPath + "]", e);
}
}
// delete the old translog
if (snapshot.newTranslogCreated()) {
try {
fileSystem.delete(new Path(translogPath, "translog-" + snapshot.lastTranslogId()), false);
} catch (IOException e) {
// ignore
}
}
// delete files that no longer exists in the index
if (indexDirty) {
try {
FileStatus[] existingFiles = fileSystem.listStatus(indexPath);
if (existingFiles != null) {
for (FileStatus existingFile : existingFiles) {
boolean found = false;
for (final String fileName : snapshotIndexCommit.getFiles()) {
if (existingFile.getPath().getName().equals(fileName)) {
found = true;
break;
}
}
if (!found) {
fileSystem.delete(existingFile.getPath(), false);
}
}
}
} catch (Exception e) {
// no worries, failed to clean old ones, will clean them later
}
}
return new SnapshotStatus(new TimeValue(System.currentTimeMillis() - totalTimeStart),
new SnapshotStatus.Index(indexNumberOfFiles, new ByteSizeValue(indexTotalFilesSize), new TimeValue(indexTime)),
new SnapshotStatus.Translog(translogNumberOfOperations, new TimeValue(translogTime)));
}
private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException {
FileStatus[] files;
try {
files = fileSystem.listStatus(indexPath);
} catch (IOException e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to list files", e);
}
if (files == null || files.length == 0) {
return new RecoveryStatus.Index(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES), TimeValue.timeValueMillis(0));
}
final CountDownLatch latch = new CountDownLatch(files.length);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
final AtomicLong throttlingWaitTime = new AtomicLong();
for (final FileStatus file : files) {
threadPool.execute(new Runnable() {
@Override public void run() {
try {
long throttlingStartTime = System.currentTimeMillis();
while (!recoveryThrottler.tryStream(shardId, file.getPath().getName())) {
Thread.sleep(recoveryThrottler.throttleInterval().millis());
}
throttlingWaitTime.addAndGet(System.currentTimeMillis() - throttlingStartTime);
FSDataInputStream fileStream = fileSystem.open(file.getPath());
Directories.copyToDirectory(fileStream, store.directory(), file.getPath().getName());
} catch (Exception e) {
logger.debug("Failed to read [" + file + "] into [" + store + "]", e);
lastException.set(e);
} finally {
recoveryThrottler.streamDone(shardId, file.getPath().getName());
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
lastException.set(e);
}
if (lastException.get() != null) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to recover index files", lastException.get());
}
long totalSize = 0;
for (FileStatus file : files) {
totalSize += file.getLen();
}
long version = -1;
try {
if (IndexReader.indexExists(store.directory())) {
version = IndexReader.getCurrentVersion(store.directory());
}
} catch (IOException e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e);
}
return new RecoveryStatus.Index(version, files.length, new ByteSizeValue(totalSize, ByteSizeUnit.BYTES), TimeValue.timeValueMillis(throttlingWaitTime.get()));
}
private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException {
FSDataInputStream fileStream = null;
try {
long recoveryTranslogId = findLatestTranslogId();
if (recoveryTranslogId == -1) {
// no recovery file found, start the shard and bail
indexShard.start();
return new RecoveryStatus.Translog(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES));
}
FileStatus status = fileSystem.getFileStatus(new Path(translogPath, "translog-" + recoveryTranslogId));
fileStream = fileSystem.open(status.getPath());
ArrayList<Translog.Operation> operations = Lists.newArrayList();
for (; ;) {
try {
operations.add(readTranslogOperation(new DataInputStreamInput(fileStream)));
} catch (EOFException e) {
// reached end of stream
break;
}
}
indexShard.performRecovery(operations);
return new RecoveryStatus.Translog(recoveryTranslogId, operations.size(), new ByteSizeValue(status.getLen(), ByteSizeUnit.BYTES));
} catch (Exception e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to perform recovery of translog", e);
} finally {
if (fileStream != null) {
try {
fileStream.close();
} catch (IOException e) {
// ignore
}
}
}
}
private long findLatestTranslogId() throws IOException {
FileStatus[] files = fileSystem.listStatus(translogPath, new PathFilter() {
@Override public boolean accept(Path path) {
return path.getName().startsWith("translog-");
}
});
if (files == null) {
return -1;
}
long index = -1;
for (FileStatus file : files) {
String name = file.getPath().getName();
long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1));
if (fileIndex >= index) {
index = fileIndex;
}
}
return index;
}
}
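
All of the translog-id bookkeeping removed above (findLatestTranslogId and the manual recovery loops) now lives in the shared BlobStoreIndexShardGateway, but the id is still encoded in the blob name. For reference, a hedged one-method sketch of the parsing the removed code used:

public class TranslogNames {
    // "translog-<id>" -> <id>, exactly as the removed findLatestTranslogId parsed it
    public static long translogId(String blobName) {
        return Long.parseLong(blobName.substring(blobName.indexOf('-') + 1));
    }
}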