From 3d84af2a4045c7c8cf84dbf20de2a7c452868e54 Mon Sep 17 00:00:00 2001
From: kimchy
Date: Tue, 22 Jun 2010 09:10:00 +0300
Subject: [PATCH] refactor hdfs gateway to use the new common blobstore

---
 config/logging.yml                            |   1 +
 .../blobstore/AppendableBlobContainer.java    |   7 +-
 .../fs/FsAppendableBlobContainer.java         |  15 +-
 .../fs/FsImmutableBlobContainer.java          |   2 +-
 .../blobstore/BlobStoreIndexShardGateway.java |  22 +-
 .../index/gateway/fs/FsIndexGateway.java      |   3 +-
 .../hdfs/AbstractHdfsBlobContainer.java       | 111 +++++
 .../hdfs/HdfsAppendableBlobContainer.java     |  85 ++++
 .../common/blobstore/hdfs/HdfsBlobStore.java  | 111 +++++
 .../hdfs/HdfsImmutableBlobContainer.java      |  90 ++++
 .../gateway/hdfs/HdfsGateway.java             | 164 +------
 .../index/gateway/hdfs/HdfsIndexGateway.java  |  52 +--
 .../gateway/hdfs/HdfsIndexShardGateway.java   | 399 +-----------------
 13 files changed, 454 insertions(+), 608 deletions(-)
 create mode 100644 plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/AbstractHdfsBlobContainer.java
 create mode 100644 plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsAppendableBlobContainer.java
 create mode 100644 plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsBlobStore.java
 create mode 100644 plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsImmutableBlobContainer.java

diff --git a/config/logging.yml b/config/logging.yml
index 127f21998bd..61e4e3a55a7 100644
--- a/config/logging.yml
+++ b/config/logging.yml
@@ -2,6 +2,7 @@ rootLogger: INFO, console, file
 logger:
   # log action execution errors for easier debugging
   action : DEBUG
+  index.gateway : DEBUG
 
 appender:
   console:
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/AppendableBlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/AppendableBlobContainer.java
index b22c2f88d15..31e594a259a 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/AppendableBlobContainer.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/AppendableBlobContainer.java
@@ -43,5 +43,10 @@ public interface AppendableBlobContainer extends BlobContainer {
         void close();
     }
 
-    AppendableBlob appendBlob(String blobName, boolean append) throws IOException;
+    /**
+     * Returns <tt>true</tt> if an appendable blob can be opened on an existing blob.
+     */
+    boolean canAppendToExistingBlob();
+
+    AppendableBlob appendBlob(String blobName) throws IOException;
 }
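Note: the interface change above inverts the old appendBlob(blobName, append) flag. Callers now always call appendBlob(blobName) and ask the container up front, via canAppendToExistingBlob(), whether a previously written blob can be reopened for append (true for the fs store, false for the hdfs store added below). A minimal caller sketch, using only the API visible in this patch; the helper class and its parameter names are illustrative, not part of the commit:

    import java.io.IOException;

    import org.elasticsearch.common.blobstore.AppendableBlobContainer;
    import org.elasticsearch.common.blobstore.AppendableBlobContainer.AppendableBlob;

    class TranslogBlobOpenSketch {

        // reuse the recovered translog blob only when the store can reopen it
        // for append; otherwise fall back to a fresh id so a new blob is created
        static AppendableBlob open(AppendableBlobContainer container, long recoveredId, long freshId) throws IOException {
            long id = container.canAppendToExistingBlob() ? recoveredId : freshId;
            return container.appendBlob("translog-" + id);
        }
    }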
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsAppendableBlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsAppendableBlobContainer.java
index 300eb3aa2d8..20f063598df 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsAppendableBlobContainer.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsAppendableBlobContainer.java
@@ -37,21 +37,20 @@ public class FsAppendableBlobContainer extends AbstractFsBlobContainer implement
         super(blobStore, blobPath, path);
     }
 
-    @Override public AppendableBlob appendBlob(String blobName, boolean append) throws IOException {
-        return new FsAppendableBlob(new File(path, blobName), append);
+    @Override public AppendableBlob appendBlob(String blobName) throws IOException {
+        return new FsAppendableBlob(new File(path, blobName));
+    }
+
+    @Override public boolean canAppendToExistingBlob() {
+        return true;
     }
 
     private class FsAppendableBlob implements AppendableBlob {
 
         private final File file;
 
-        public FsAppendableBlob(File file, boolean append) throws IOException {
+        public FsAppendableBlob(File file) throws IOException {
             this.file = file;
-            if (!append) {
-                RandomAccessFile raf = new RandomAccessFile(file, "rw");
-                raf.setLength(0);
-                raf.close();
-            }
         }
 
         @Override public void append(final AppendBlobListener listener) {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsImmutableBlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsImmutableBlobContainer.java
index 623790f7957..7037540d2ea 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsImmutableBlobContainer.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsImmutableBlobContainer.java
@@ -39,7 +39,7 @@ public class FsImmutableBlobContainer extends AbstractFsBlobContainer implements
         blobStore.executorService().execute(new Runnable() {
             @Override public void run() {
                 File file = new File(path, blobName);
-                RandomAccessFile raf = null;
+                RandomAccessFile raf;
                 try {
                     raf = new RandomAccessFile(file, "rw");
                 } catch (FileNotFoundException e) {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java
index b1433543fe7..79823376d09 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java
@@ -220,7 +220,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
 
         if (translogBlob == null) {
             try {
-                translogBlob = translogContainer.appendBlob("translog-" + translogSnapshot.translogId(), !snapshot.newTranslogCreated());
+                translogBlob = translogContainer.appendBlob("translog-" + translogSnapshot.translogId());
             } catch (IOException e) {
                 throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to create translog", e);
             }
@@ -382,7 +382,25 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
             }
         }
         indexShard.performRecovery(operations);
-        return new RecoveryStatus.Translog(indexShard.translog().currentId(), operations.size(), new ByteSizeValue(translogData.length, ByteSizeUnit.BYTES));
+
+        // clean all the other translogs
+        for (Long translogIdToDelete : translogIds) {
+            if (!translogId.equals(translogIdToDelete)) {
+                try {
+                    translogContainer.deleteBlob("translog-" + translogIdToDelete);
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+        }
+
+        // only if we can append to an existing translog should we reuse the current id and keep appending to it
+        long lastTranslogId = indexShard.translog().currentId();
+        if (!translogContainer.canAppendToExistingBlob()) {
+            lastTranslogId = -1;
+        }
+
+        return new RecoveryStatus.Translog(lastTranslogId, operations.size(), new ByteSizeValue(translogData.length, ByteSizeUnit.BYTES));
     } catch (Exception e) {
         lastException = e;
         logger.debug("Failed to read translog, will try the next one", e);
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexGateway.java
index f978e60a9bd..7f4b3474c50 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexGateway.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexGateway.java
@@ -21,7 +21,6 @@ package org.elasticsearch.index.gateway.fs;
 
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.env.Environment;
 import org.elasticsearch.gateway.Gateway;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.gateway.IndexShardGateway;
@@ -33,7 +32,7 @@ import org.elasticsearch.index.settings.IndexSettings;
  */
 public class FsIndexGateway extends BlobStoreIndexGateway {
 
-    @Inject public FsIndexGateway(Index index, @IndexSettings Settings indexSettings, Environment environment, Gateway gateway) {
+    @Inject public FsIndexGateway(Index index, @IndexSettings Settings indexSettings, Gateway gateway) {
         super(index, indexSettings, gateway);
     }
 
diff --git a/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/AbstractHdfsBlobContainer.java b/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/AbstractHdfsBlobContainer.java
new file mode 100644
index 00000000000..77301299e7e
--- /dev/null
+++ b/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/AbstractHdfsBlobContainer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.blobstore.hdfs;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.elasticsearch.common.blobstore.BlobMetaData;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
+import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
+import org.elasticsearch.common.collect.ImmutableMap;
+
+import java.io.IOException;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public abstract class AbstractHdfsBlobContainer extends AbstractBlobContainer {
+
+    protected final HdfsBlobStore blobStore;
+
+    protected final Path path;
+
+    public AbstractHdfsBlobContainer(HdfsBlobStore blobStore, BlobPath blobPath, Path path) {
+        super(blobPath);
+        this.blobStore = blobStore;
+        this.path = path;
+    }
+
+    public ImmutableMap<String, BlobMetaData> listBlobs() throws IOException {
+        FileStatus[] files = blobStore.fileSystem().listStatus(path);
+        if (files == null || files.length == 0) {
+            return ImmutableMap.of();
+        }
+        ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder();
+        for (FileStatus file : files) {
+            builder.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
+        }
+        return builder.build();
+    }
+
+    @Override public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(final String blobNamePrefix) throws IOException {
+        FileStatus[] files = blobStore.fileSystem().listStatus(path, new PathFilter() {
+            @Override public boolean accept(Path path) {
+                return path.getName().startsWith(blobNamePrefix);
+            }
+        });
+        if (files == null || files.length == 0) {
+            return ImmutableMap.of();
+        }
+        ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder();
+        for (FileStatus file : files) {
+            builder.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
+        }
+        return builder.build();
+    }
+
+    public boolean deleteBlob(String blobName) throws IOException {
+        return blobStore.fileSystem().delete(new Path(path, blobName), true);
+    }
+
+    @Override public void readBlob(final String blobName, final ReadBlobListener listener) {
+        blobStore.executorService().execute(new Runnable() {
+            @Override public void run() {
+                byte[] buffer = new byte[1024 * 16];
+
+                FSDataInputStream fileStream;
+                try {
+                    fileStream = blobStore.fileSystem().open(new Path(path, blobName));
+                } catch (IOException e) {
+                    listener.onFailure(e);
+                    return;
+                }
+                try {
+                    int bytesRead;
+                    while ((bytesRead = fileStream.read(buffer)) != -1) {
+                        listener.onPartial(buffer, 0, bytesRead);
+                    }
+                    fileStream.close();
+                    listener.onCompleted();
+                } catch (Exception e) {
+                    try {
+                        fileStream.close();
+                    } catch (IOException e1) {
+                        // ignore
+                    }
+                    listener.onFailure(e);
+                }
+            }
+        });
+    }
+}
\ No newline at end of file
diff --git a/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsAppendableBlobContainer.java b/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsAppendableBlobContainer.java
new file mode 100644
index 00000000000..8d051795191
--- /dev/null
+++ b/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsAppendableBlobContainer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.blobstore.hdfs;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.elasticsearch.common.blobstore.AppendableBlobContainer;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.io.stream.DataOutputStreamOutput;
+
+import java.io.IOException;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class HdfsAppendableBlobContainer extends AbstractHdfsBlobContainer implements AppendableBlobContainer {
+
+    public HdfsAppendableBlobContainer(HdfsBlobStore blobStore, BlobPath blobPath, Path path) {
+        super(blobStore, blobPath, path);
+    }
+
+    @Override public AppendableBlob appendBlob(String blobName) throws IOException {
+        return new HdfsAppendableBlob(new Path(path, blobName));
+    }
+
+    @Override public boolean canAppendToExistingBlob() {
+        return false;
+    }
+
+    private class HdfsAppendableBlob implements AppendableBlob {
+
+        private final Path file;
+
+        private final FSDataOutputStream fsDataStream;
+
+        private final DataOutputStreamOutput out;
+
+        public HdfsAppendableBlob(Path file) throws IOException {
+            this.file = file;
+            this.fsDataStream = blobStore.fileSystem().create(file, true);
+            this.out = new DataOutputStreamOutput(fsDataStream);
+        }
+
+        @Override public void append(final AppendBlobListener listener) {
+            blobStore.executorService().execute(new Runnable() {
+                @Override public void run() {
+                    try {
+                        listener.withStream(out);
+                        out.flush();
+                        fsDataStream.flush();
+                        fsDataStream.sync();
+                        listener.onCompleted();
+                    } catch (IOException e) {
+                        listener.onFailure(e);
+                    }
+                }
+            });
+        }
+
+        @Override public void close() {
+            try {
+                fsDataStream.close();
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsBlobStore.java b/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsBlobStore.java
new file mode 100644
index 00000000000..948b0b78d29
--- /dev/null
+++ b/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsBlobStore.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.blobstore.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.elasticsearch.common.blobstore.AppendableBlobContainer;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
+import org.elasticsearch.common.settings.Settings;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.elasticsearch.common.util.concurrent.DynamicExecutors.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class HdfsBlobStore implements BlobStore {
+
+    private final FileSystem fileSystem;
+
+    private final Path path;
+
+    private final ExecutorService executorService;
+
+    public HdfsBlobStore(Settings settings, FileSystem fileSystem, Path path) throws IOException {
+        this.fileSystem = fileSystem;
+        this.path = path;
+
+        if (!fileSystem.exists(path)) {
+            fileSystem.mkdirs(path);
+        }
+
+        executorService = Executors.newCachedThreadPool(daemonThreadFactory(settings, "hdfs_blobstore"));
+    }
+
+    @Override public String toString() {
+        return path.toString();
+    }
+
+    public FileSystem fileSystem() {
+        return fileSystem;
+    }
+
+    public Path path() {
+        return path;
+    }
+
+    public ExecutorService executorService() {
+        return executorService;
+    }
+
+    @Override public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
+        return new HdfsImmutableBlobContainer(this, path, buildAndCreate(path));
+    }
+
+    @Override public AppendableBlobContainer appendableBlobContainer(BlobPath path) {
+        return new HdfsAppendableBlobContainer(this, path, buildAndCreate(path));
+    }
+
+    @Override public void delete(BlobPath path) {
+        try {
+            fileSystem.delete(buildPath(path), true);
+        } catch (IOException e) {
+            // ignore
+        }
+    }
+
+    @Override public void close() {
+        executorService.shutdown();
+    }
+
+    private Path buildAndCreate(BlobPath blobPath) {
+        Path path = buildPath(blobPath);
+        try {
+            fileSystem.mkdirs(path);
+        } catch (IOException e) {
+            // ignore
+        }
+        return path;
+    }
+
+    private Path buildPath(BlobPath blobPath) {
+        Path path = this.path;
+        for (String p : blobPath) {
+            path = new Path(path, p);
+        }
+        return path;
+    }
+}
diff --git a/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsImmutableBlobContainer.java b/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsImmutableBlobContainer.java
new file mode 100644
index 00000000000..d3855b9cdbb
--- /dev/null
+++ b/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsImmutableBlobContainer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.blobstore.hdfs;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
+import org.elasticsearch.common.blobstore.support.BlobStores;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class HdfsImmutableBlobContainer extends AbstractHdfsBlobContainer implements ImmutableBlobContainer {
+
+    public HdfsImmutableBlobContainer(HdfsBlobStore blobStore, BlobPath blobPath, Path path) {
+        super(blobStore, blobPath, path);
+    }
+
+    @Override public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) {
+        blobStore.executorService().execute(new Runnable() {
+            @Override public void run() {
+                Path file = new Path(path, blobName);
+
+                FSDataOutputStream fileStream;
+                try {
+                    fileStream = blobStore.fileSystem().create(file, true);
+                } catch (IOException e) {
+                    listener.onFailure(e);
+                    return;
+                }
+                try {
+                    try {
+                        byte[] buffer = new byte[16 * 1024];
+                        int bytesRead;
+                        while ((bytesRead = is.read(buffer)) != -1) {
+                            fileStream.write(buffer, 0, bytesRead);
+                        }
+                    } finally {
+                        try {
+                            is.close();
+                        } catch (IOException ex) {
+                            // do nothing
+                        }
+                        try {
+                            fileStream.close();
+                        } catch (IOException ex) {
+                            // do nothing
+                        }
+                    }
+                    listener.onCompleted();
+                } catch (Exception e) {
+                    // just on the safe side, try and delete it on failure
+                    try {
+                        if (blobStore.fileSystem().exists(file)) {
+                            blobStore.fileSystem().delete(file, true);
+                        }
+                    } catch (Exception e1) {
+                        // ignore
+                    }
+                    listener.onFailure(e);
+                }
+            }
+        });
+    }
+
+    @Override public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException {
+        BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes);
+    }
+}
\ No newline at end of file
diff --git a/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java b/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java
index 8fded8214cd..1305d0e221f 100644
--- a/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java
+++ b/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java
@@ -20,22 +20,16 @@
 package org.elasticsearch.gateway.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.elasticsearch.ElasticSearchException;
 import org.elasticsearch.ElasticSearchIllegalArgumentException;
 import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.blobstore.hdfs.HdfsBlobStore;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Module;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.ToXContent;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.common.xcontent.builder.BinaryXContentBuilder;
-import org.elasticsearch.gateway.Gateway;
-import org.elasticsearch.gateway.GatewayException;
+import 
org.elasticsearch.gateway.blobstore.BlobStoreGateway; import java.io.IOException; import java.net.URI; @@ -44,25 +38,17 @@ import java.util.Map; /** * @author kimchy (shay.banon) */ -public class HdfsGateway extends AbstractLifecycleComponent implements Gateway { +public class HdfsGateway extends BlobStoreGateway { private final boolean closeFileSystem; private final FileSystem fileSystem; - private final String uri; - - private final Path path; - - private final Path metaDataPath; - - private volatile int currentIndex; - @Inject public HdfsGateway(Settings settings, ClusterName clusterName) throws IOException { super(settings); this.closeFileSystem = componentSettings.getAsBoolean("close_fs", true); - this.uri = componentSettings.get("uri"); + String uri = componentSettings.get("uri"); if (uri == null) { throw new ElasticSearchIllegalArgumentException("hdfs gateway requires the 'uri' setting to be set"); } @@ -70,11 +56,9 @@ public class HdfsGateway extends AbstractLifecycleComponent implements if (path == null) { throw new ElasticSearchIllegalArgumentException("hdfs gateway requires the 'path' path setting to be set"); } - this.path = new Path(new Path(path), clusterName.value()); + Path hPath = new Path(new Path(path), clusterName.value()); - logger.debug("Using uri [{}], path [{}]", this.uri, this.path); - - this.metaDataPath = new Path(this.path, "metadata"); + logger.debug("Using uri [{}], path [{}]", uri, hPath); Configuration conf = new Configuration(); Settings hdfsSettings = settings.getByPrefix("hdfs.conf."); @@ -84,141 +68,23 @@ public class HdfsGateway extends AbstractLifecycleComponent implements fileSystem = FileSystem.get(URI.create(uri), conf); - fileSystem.mkdirs(metaDataPath); - - this.currentIndex = findLatestIndex(); - logger.debug("Latest metadata found at index [" + currentIndex + "]"); + initialize(new HdfsBlobStore(settings, fileSystem, hPath), clusterName); } @Override public String type() { - return "fs"; - } - - public FileSystem fileSystem() { - return this.fileSystem; - } - - public Path path() { - return this.path; - } - - @Override protected void doStart() throws ElasticSearchException { - } - - @Override protected void doStop() throws ElasticSearchException { - } - - @Override protected void doClose() throws ElasticSearchException { - if (closeFileSystem) { - try { - fileSystem.close(); - } catch (IOException e) { - logger.warn("Failed to close file system {}", fileSystem); - } - } - } - - @Override public void write(MetaData metaData) throws GatewayException { - try { - final Path file = new Path(metaDataPath, "metadata-" + (currentIndex + 1)); - - BinaryXContentBuilder builder = XContentFactory.contentBinaryBuilder(XContentType.JSON); - builder.prettyPrint(); - builder.startObject(); - MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS); - builder.endObject(); - - FSDataOutputStream fileStream = fileSystem.create(file, true); - fileStream.write(builder.unsafeBytes(), 0, builder.unsafeBytesLength()); - fileStream.flush(); - fileStream.sync(); - fileStream.close(); - - currentIndex++; - - FileStatus[] oldFiles = fileSystem.listStatus(metaDataPath, new PathFilter() { - @Override public boolean accept(Path path) { - return path.getName().startsWith("metadata-") && !path.getName().equals(file.getName()); - } - }); - - if (oldFiles != null) { - for (FileStatus oldFile : oldFiles) { - fileSystem.delete(oldFile.getPath(), false); - } - } - - } catch (IOException e) { - throw new GatewayException("can't write new metadata file into the 
gateway", e); - } - } - - @Override public MetaData read() throws GatewayException { - try { - if (currentIndex == -1) - return null; - - Path file = new Path(metaDataPath, "metadata-" + currentIndex); - return readMetaData(file); - } catch (GatewayException e) { - throw e; - } catch (Exception e) { - throw new GatewayException("can't read metadata file from the gateway", e); - } + return "hdfs"; } @Override public Class suggestIndexGateway() { return HdfsIndexGatewayModule.class; } - @Override public void reset() throws IOException { - fileSystem.delete(path, true); - } - - private int findLatestIndex() throws IOException { - FileStatus[] files = fileSystem.listStatus(metaDataPath, new PathFilter() { - @Override public boolean accept(Path path) { - return path.getName().startsWith("metadata-"); - } - }); - if (files == null || files.length == 0) { - return -1; - } - - int index = -1; - for (FileStatus file : files) { - if (logger.isTraceEnabled()) { - logger.trace("[findLatestMetadata]: Processing file [" + file + "]"); - } - String name = file.getPath().getName(); - int fileIndex = Integer.parseInt(name.substring(name.indexOf('-') + 1)); - if (fileIndex >= index) { - // try and read the meta data - try { - readMetaData(file.getPath()); - index = fileIndex; - } catch (Exception e) { - logger.warn("[findLatestMetadata]: Failed to read metadata from [" + file + "], ignoring...", e); - } - } - } - - return index; - } - - private MetaData readMetaData(Path file) throws IOException { - FSDataInputStream fileStream = fileSystem.open(file); - XContentParser parser = null; - try { - parser = XContentFactory.xContent(XContentType.JSON).createParser(fileStream); - return MetaData.Builder.fromXContent(parser, settings); - } finally { - if (parser != null) { - parser.close(); - } + @Override protected void doClose() throws ElasticSearchException { + super.doClose(); + if (closeFileSystem) { try { - fileStream.close(); - } catch (Exception e) { + fileSystem.close(); + } catch (IOException e) { // ignore } } diff --git a/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexGateway.java b/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexGateway.java index be07d090bef..1ef7041dfec 100644 --- a/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexGateway.java +++ b/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexGateway.java @@ -19,74 +19,28 @@ package org.elasticsearch.index.gateway.hdfs; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.Gateway; -import org.elasticsearch.gateway.hdfs.HdfsGateway; -import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.Index; -import org.elasticsearch.index.gateway.IndexGateway; import org.elasticsearch.index.gateway.IndexShardGateway; +import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway; import org.elasticsearch.index.settings.IndexSettings; -import java.io.IOException; - /** * @author kimchy (shay.banon) */ -public class HdfsIndexGateway extends AbstractIndexComponent implements IndexGateway { - - private final FileSystem fileSystem; - - private final Path indexPath; +public class HdfsIndexGateway extends BlobStoreIndexGateway { @Inject public 
HdfsIndexGateway(Index index, @IndexSettings Settings indexSettings, Gateway gateway) { - super(index, indexSettings); - - Path path = null; - String pathSetting = componentSettings.get("path"); - if (pathSetting != null) { - path = new Path(pathSetting); - } - if (gateway instanceof HdfsGateway) { - HdfsGateway hdfsGateway = (HdfsGateway) gateway; - fileSystem = hdfsGateway.fileSystem(); - if (path == null) { - path = hdfsGateway.path(); - } - } else { - throw new ElasticSearchIllegalArgumentException("Must configure an hdfs gateway to use index hdfs gateway"); - } - this.indexPath = new Path(new Path(path, "indices"), index.name()); + super(index, indexSettings, gateway); } @Override public String type() { return "hdfs"; } - public FileSystem fileSystem() { - return this.fileSystem; - } - - public Path indexPath() { - return this.indexPath; - } - @Override public Class shardGatewayClass() { return HdfsIndexShardGateway.class; } - - @Override public void close(boolean delete) throws ElasticSearchException { - if (delete) { - try { - fileSystem.delete(indexPath, true); - } catch (IOException e) { - logger.warn("Failed to delete [{}]", e, indexPath); - } - } - } } diff --git a/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexShardGateway.java b/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexShardGateway.java index 063cd8ad121..b7c34d513cd 100644 --- a/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexShardGateway.java +++ b/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexShardGateway.java @@ -19,421 +19,28 @@ package org.elasticsearch.index.gateway.hdfs; -import org.apache.hadoop.fs.*; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.store.IndexInput; -import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.DataInputStreamInput; -import org.elasticsearch.common.io.stream.DataOutputStreamOutput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.lucene.Directories; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.gateway.IndexGateway; -import org.elasticsearch.index.gateway.IndexShardGateway; -import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; -import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException; +import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexShardGateway; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; -import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler; import org.elasticsearch.threadpool.ThreadPool; -import java.io.EOFException; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicLong; -import 
java.util.concurrent.atomic.AtomicReference; - -import static org.elasticsearch.common.lucene.Directories.*; -import static org.elasticsearch.index.translog.TranslogStreams.*; - /** * @author kimchy (shay.banon) */ -public class HdfsIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway { - - private final InternalIndexShard indexShard; - - private final ThreadPool threadPool; - - private final RecoveryThrottler recoveryThrottler; - - private final Store store; - - private final FileSystem fileSystem; - - private final Path path; - - private final Path indexPath; - - private final Path translogPath; - - private volatile FSDataOutputStream currentTranslogStream = null; +public class HdfsIndexShardGateway extends BlobStoreIndexShardGateway { @Inject public HdfsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway hdfsIndexGateway, IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) { - super(shardId, indexSettings); - this.indexShard = (InternalIndexShard) indexShard; - this.threadPool = threadPool; - this.recoveryThrottler = recoveryThrottler; - this.store = store; - - this.fileSystem = ((HdfsIndexGateway) hdfsIndexGateway).fileSystem(); - this.path = new Path(((HdfsIndexGateway) hdfsIndexGateway).indexPath(), Integer.toString(shardId.id())); - - this.indexPath = new Path(path, "index"); - this.translogPath = new Path(path, "translog"); - } - - @Override public void close(boolean delete) throws ElasticSearchException { - if (currentTranslogStream != null) { - try { - currentTranslogStream.close(); - } catch (IOException e) { - // ignore - } - } - if (delete) { - try { - fileSystem.delete(path, true); - } catch (IOException e) { - logger.warn("Failed to delete [{}]", e, path); - } - } + super(shardId, indexSettings, threadPool, hdfsIndexGateway, indexShard, store, recoveryThrottler); } @Override public String type() { return "hdfs"; } - - @Override public boolean requiresSnapshotScheduling() { - return true; - } - - @Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException { - RecoveryStatus.Index recoveryStatusIndex = recoverIndex(); - RecoveryStatus.Translog recoveryStatusTranslog = recoverTranslog(); - return new RecoveryStatus(recoveryStatusIndex, recoveryStatusTranslog); - } - - @Override public SnapshotStatus snapshot(Snapshot snapshot) { - long totalTimeStart = System.currentTimeMillis(); - boolean indexDirty = false; - boolean translogDirty = false; - - final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit(); - final Translog.Snapshot translogSnapshot = snapshot.translogSnapshot(); - - int indexNumberOfFiles = 0; - long indexTotalFilesSize = 0; - long indexTime = 0; - if (snapshot.indexChanged()) { - long time = System.currentTimeMillis(); - indexDirty = true; - // snapshot into the index - final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length); - final AtomicReference lastException = new AtomicReference(); - for (final String fileName : snapshotIndexCommit.getFiles()) { - // don't copy over the segments file, it will be copied over later on as part of the - // final snapshot phase - if (fileName.equals(snapshotIndexCommit.getSegmentsFileName())) { - latch.countDown(); - continue; - } - IndexInput indexInput = null; - try { - indexInput = snapshotIndexCommit.getDirectory().openInput(fileName); - FileStatus fileStatus = fileSystem.getFileStatus(new Path(indexPath, fileName)); - if (fileStatus.getLen() == 
indexInput.length()) { - // we assume its the same one, no need to copy - latch.countDown(); - continue; - } - } catch (FileNotFoundException e) { - // that's fine! - } catch (Exception e) { - logger.debug("Failed to verify file equality based on length, copying...", e); - } finally { - if (indexInput != null) { - try { - indexInput.close(); - } catch (IOException e) { - // ignore - } - } - } - indexNumberOfFiles++; - try { - indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(fileName); - } catch (IOException e) { - // ignore... - } - threadPool.execute(new Runnable() { - @Override public void run() { - Path copyTo = new Path(indexPath, fileName); - FSDataOutputStream fileStream; - try { - fileStream = fileSystem.create(copyTo, true); - copyFromDirectory(snapshotIndexCommit.getDirectory(), fileName, fileStream); - } catch (Exception e) { - lastException.set(new IndexShardGatewaySnapshotFailedException(shardId, "Failed to copy to [" + copyTo + "], from dir [" + snapshotIndexCommit.getDirectory() + "] and file [" + fileName + "]", e)); - } finally { - latch.countDown(); - } - } - }); - } - try { - latch.await(); - } catch (InterruptedException e) { - lastException.set(e); - } - if (lastException.get() != null) { - throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", lastException.get()); - } - indexTime = System.currentTimeMillis() - time; - } - - int translogNumberOfOperations = 0; - long translogTime = 0; - if (snapshot.newTranslogCreated() || currentTranslogStream == null) { - translogDirty = true; - long time = System.currentTimeMillis(); - // a new translog, close the current stream - if (currentTranslogStream != null) { - try { - currentTranslogStream.close(); - } catch (IOException e) { - // ignore - } - } - Path currentTranslogPath = new Path(translogPath, "translog-" + translogSnapshot.translogId()); - try { - currentTranslogStream = fileSystem.create(currentTranslogPath, true); - StreamOutput out = new DataOutputStreamOutput(currentTranslogStream); - for (Translog.Operation operation : translogSnapshot) { - translogNumberOfOperations++; - writeTranslogOperation(out, operation); - } - currentTranslogStream.flush(); - currentTranslogStream.sync(); - } catch (Exception e) { - currentTranslogPath = null; - if (currentTranslogStream != null) { - try { - currentTranslogStream.close(); - } catch (IOException e1) { - // ignore - } finally { - currentTranslogStream = null; - } - } - throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog into [" + currentTranslogPath + "]", e); - } - translogTime = System.currentTimeMillis() - time; - } else if (snapshot.sameTranslogNewOperations()) { - translogDirty = true; - long time = System.currentTimeMillis(); - try { - StreamOutput out = new DataOutputStreamOutput(currentTranslogStream); - for (Translog.Operation operation : translogSnapshot.skipTo(snapshot.lastTranslogSize())) { - translogNumberOfOperations++; - writeTranslogOperation(out, operation); - } - } catch (Exception e) { - try { - currentTranslogStream.close(); - } catch (IOException e1) { - // ignore - } finally { - currentTranslogStream = null; - } - } - translogTime = System.currentTimeMillis() - time; - } - - - // now write the segments file and update the translog header - if (indexDirty) { - Path segmentsPath = new Path(indexPath, snapshotIndexCommit.getSegmentsFileName()); - try { - indexNumberOfFiles++; - indexTotalFilesSize += 
snapshotIndexCommit.getDirectory().fileLength(snapshotIndexCommit.getSegmentsFileName()); - long time = System.currentTimeMillis(); - FSDataOutputStream fileStream; - fileStream = fileSystem.create(segmentsPath, true); - copyFromDirectory(snapshotIndexCommit.getDirectory(), snapshotIndexCommit.getSegmentsFileName(), fileStream); - indexTime += (System.currentTimeMillis() - time); - } catch (Exception e) { - throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize index snapshot into [" + segmentsPath + "]", e); - } - } - - // delete the old translog - if (snapshot.newTranslogCreated()) { - try { - fileSystem.delete(new Path(translogPath, "translog-" + snapshot.lastTranslogId()), false); - } catch (IOException e) { - // ignore - } - } - - // delete files that no longer exists in the index - if (indexDirty) { - try { - FileStatus[] existingFiles = fileSystem.listStatus(indexPath); - if (existingFiles != null) { - for (FileStatus existingFile : existingFiles) { - boolean found = false; - for (final String fileName : snapshotIndexCommit.getFiles()) { - if (existingFile.getPath().getName().equals(fileName)) { - found = true; - break; - } - } - if (!found) { - fileSystem.delete(existingFile.getPath(), false); - } - } - } - } catch (Exception e) { - // no worries, failed to clean old ones, will clean them later - } - } - - - return new SnapshotStatus(new TimeValue(System.currentTimeMillis() - totalTimeStart), - new SnapshotStatus.Index(indexNumberOfFiles, new ByteSizeValue(indexTotalFilesSize), new TimeValue(indexTime)), - new SnapshotStatus.Translog(translogNumberOfOperations, new TimeValue(translogTime))); - } - - private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException { - FileStatus[] files; - try { - files = fileSystem.listStatus(indexPath); - } catch (IOException e) { - throw new IndexShardGatewayRecoveryException(shardId(), "Failed to list files", e); - } - if (files == null || files.length == 0) { - return new RecoveryStatus.Index(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES), TimeValue.timeValueMillis(0)); - } - - final CountDownLatch latch = new CountDownLatch(files.length); - final AtomicReference lastException = new AtomicReference(); - final AtomicLong throttlingWaitTime = new AtomicLong(); - for (final FileStatus file : files) { - threadPool.execute(new Runnable() { - @Override public void run() { - try { - long throttlingStartTime = System.currentTimeMillis(); - while (!recoveryThrottler.tryStream(shardId, file.getPath().getName())) { - Thread.sleep(recoveryThrottler.throttleInterval().millis()); - } - throttlingWaitTime.addAndGet(System.currentTimeMillis() - throttlingStartTime); - FSDataInputStream fileStream = fileSystem.open(file.getPath()); - Directories.copyToDirectory(fileStream, store.directory(), file.getPath().getName()); - } catch (Exception e) { - logger.debug("Failed to read [" + file + "] into [" + store + "]", e); - lastException.set(e); - } finally { - recoveryThrottler.streamDone(shardId, file.getPath().getName()); - latch.countDown(); - } - } - }); - } - try { - latch.await(); - } catch (InterruptedException e) { - lastException.set(e); - } - if (lastException.get() != null) { - throw new IndexShardGatewayRecoveryException(shardId(), "Failed to recover index files", lastException.get()); - } - long totalSize = 0; - for (FileStatus file : files) { - totalSize += file.getLen(); - } - - long version = -1; - try { - if (IndexReader.indexExists(store.directory())) { - version = 
IndexReader.getCurrentVersion(store.directory()); - } - } catch (IOException e) { - throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e); - } - - return new RecoveryStatus.Index(version, files.length, new ByteSizeValue(totalSize, ByteSizeUnit.BYTES), TimeValue.timeValueMillis(throttlingWaitTime.get())); - } - - private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException { - FSDataInputStream fileStream = null; - try { - long recoveryTranslogId = findLatestTranslogId(); - if (recoveryTranslogId == -1) { - // no recovery file found, start the shard and bail - indexShard.start(); - return new RecoveryStatus.Translog(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES)); - } - FileStatus status = fileSystem.getFileStatus(new Path(translogPath, "translog-" + recoveryTranslogId)); - fileStream = fileSystem.open(status.getPath()); - ArrayList operations = Lists.newArrayList(); - for (; ;) { - try { - operations.add(readTranslogOperation(new DataInputStreamInput(fileStream))); - } catch (EOFException e) { - // reached end of stream - break; - } - } - indexShard.performRecovery(operations); - return new RecoveryStatus.Translog(recoveryTranslogId, operations.size(), new ByteSizeValue(status.getLen(), ByteSizeUnit.BYTES)); - } catch (Exception e) { - throw new IndexShardGatewayRecoveryException(shardId(), "Failed to perform recovery of translog", e); - } finally { - if (fileStream != null) { - try { - fileStream.close(); - } catch (IOException e) { - // ignore - } - } - } - } - - - private long findLatestTranslogId() throws IOException { - FileStatus[] files = fileSystem.listStatus(translogPath, new PathFilter() { - @Override public boolean accept(Path path) { - return path.getName().startsWith("translog-"); - } - }); - if (files == null) { - return -1; - } - - long index = -1; - for (FileStatus file : files) { - String name = file.getPath().getName(); - long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1)); - if (fileIndex >= index) { - index = fileIndex; - } - } - - return index; - } }
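Note on the recovery change in BlobStoreIndexShardGateway: HDFS at this point could not reopen an existing file for append, which is why HdfsAppendableBlobContainer.canAppendToExistingBlob() returns false and HdfsAppendableBlob recreates its file with fileSystem().create(file, true), syncing after every append. The contract the recovery code relies on boils down to the following sketch (hypothetical helper, using only API from this patch):

    import org.elasticsearch.common.blobstore.AppendableBlobContainer;

    class RecoveredTranslogIdSketch {

        // fs: the recovered blob can be reopened, so the current translog id is
        // reused and appending continues; hdfs: -1 forces the next snapshot to
        // write a complete new translog blob
        static long recoveredTranslogId(AppendableBlobContainer translogContainer, long currentId) {
            return translogContainer.canAppendToExistingBlob() ? currentId : -1;
        }
    }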
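Note on path layout: HdfsBlobStore.buildPath() maps each BlobPath element to one HDFS path segment under the store root, so shard data lands under paths such as <root>/indices/<index>/<shard>/translog. A small sketch of the same resolution using only the Hadoop Path API (the root and element values are examples, not mandated by the patch):

    import org.apache.hadoop.fs.Path;

    class BlobPathMappingSketch {

        // mirrors HdfsBlobStore.buildPath(): fold each element into the path
        static Path resolve(Path root, String... elements) {
            Path path = root;
            for (String element : elements) {
                path = new Path(path, element);
            }
            return path;
        }
    }

For example, resolve(new Path("/es/mycluster"), "indices", "twitter", "0", "translog") yields /es/mycluster/indices/twitter/0/translog.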
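Note on immutable blobs: index files go through ImmutableBlobContainer, and the synchronous writeBlob(name, stream, length) overload is implemented in HdfsImmutableBlobContainer by delegating to BlobStores.syncWriteBlob, which wraps the asynchronous listener variant shown above. A minimal sketch of storing one blob this way (class and method names are illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;

    import org.elasticsearch.common.blobstore.BlobPath;
    import org.elasticsearch.common.blobstore.BlobStore;
    import org.elasticsearch.common.blobstore.ImmutableBlobContainer;

    class ImmutableBlobWriteSketch {

        // write a single immutable blob under the given container path
        static void store(BlobStore blobStore, BlobPath path, String name, byte[] data) throws IOException {
            ImmutableBlobContainer container = blobStore.immutableBlobContainer(path);
            container.writeBlob(name, new ByteArrayInputStream(data), data.length);
        }
    }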
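Note on construction: HdfsGateway now only resolves the FileSystem and hands a rooted HdfsBlobStore to BlobStoreGateway.initialize(), instead of reading and writing gateway metadata itself. A standalone construction sketch mirroring that wiring (empty settings used for brevity; the argument values are examples):

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.elasticsearch.common.blobstore.hdfs.HdfsBlobStore;
    import org.elasticsearch.common.settings.ImmutableSettings;

    class HdfsBlobStoreSketch {

        // resolve the FileSystem from the uri, then root the store at
        // <path>/<clusterName>, as HdfsGateway does before calling initialize()
        static HdfsBlobStore open(String uri, String path, String clusterName) throws IOException {
            FileSystem fileSystem = FileSystem.get(URI.create(uri), new Configuration());
            return new HdfsBlobStore(ImmutableSettings.settingsBuilder().build(), fileSystem, new Path(new Path(path), clusterName));
        }
    }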
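Note on configuration: HdfsGateway reads 'uri' and 'path' from its component settings, copies every key under the literal prefix 'hdfs.conf.' into the Hadoop Configuration, and honors 'close_fs' (default true). A hedged sketch of matching node settings follows; the 'gateway.hdfs.' component prefix and the 'gateway.type' key follow the usual gateway conventions but are not spelled out in this patch, so treat them as assumptions:

    import org.elasticsearch.common.settings.ImmutableSettings;
    import org.elasticsearch.common.settings.Settings;

    class HdfsGatewaySettingsSketch {

        static Settings build() {
            return ImmutableSettings.settingsBuilder()
                    .put("gateway.type", "hdfs")                     // select this gateway (assumed key)
                    .put("gateway.hdfs.uri", "hdfs://namenode:9000") // required
                    .put("gateway.hdfs.path", "/es-gateway")         // required
                    .put("hdfs.conf.dfs.replication", "2")           // copied into the Hadoop Configuration
                    .build();
        }
    }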