Hadoop Plugin: Use HDFS as gateway storage, closes #189.

This commit is contained in:
kimchy 2010-05-23 17:02:06 +03:00
parent 116cfce6f2
commit 28fa384b32
3 changed files with 49 additions and 21 deletions

View File

@@ -38,6 +38,7 @@ import org.elasticsearch.util.xcontent.XContentType;
import org.elasticsearch.util.xcontent.builder.BinaryXContentBuilder;
import java.io.IOException;
+import java.net.URI;
import java.util.Map;
/**
@@ -45,8 +46,12 @@ import java.util.Map;
*/
public class HdfsGateway extends AbstractLifecycleComponent<Gateway> implements Gateway {
+private final boolean closeFileSystem;
private final FileSystem fileSystem;
+private final String uri;
private final Path path;
private final Path metaDataPath;
@@ -56,12 +61,19 @@ public class HdfsGateway extends AbstractLifecycleComponent<Gateway> implements
@Inject public HdfsGateway(Settings settings, ClusterName clusterName) throws IOException {
super(settings);
+this.closeFileSystem = componentSettings.getAsBoolean("close_fs", true);
+this.uri = componentSettings.get("uri");
+if (uri == null) {
+throw new ElasticSearchIllegalArgumentException("hdfs gateway requires the 'uri' setting to be set");
+}
String path = componentSettings.get("path");
if (path == null) {
throw new ElasticSearchIllegalArgumentException("hdfs gateway requires the 'path' path setting to be set");
}
this.path = new Path(new Path(path), clusterName.value());
logger.debug("Using uri [{}], path [{}]", this.uri, this.path);
this.metaDataPath = new Path(this.path, "metadata");
Configuration conf = new Configuration();
@@ -70,7 +82,7 @@ public class HdfsGateway extends AbstractLifecycleComponent<Gateway> implements
conf.set(entry.getKey(), entry.getValue());
}
-fileSystem = FileSystem.get(conf);
+fileSystem = FileSystem.get(URI.create(uri), conf);
fileSystem.mkdirs(metaDataPath);
@@ -93,10 +105,12 @@ public class HdfsGateway extends AbstractLifecycleComponent<Gateway> implements
}
@Override protected void doClose() throws ElasticSearchException {
-try {
-fileSystem.close();
-} catch (IOException e) {
-logger.warn("Failed to close file system {}", fileSystem);
+if (closeFileSystem) {
+try {
+fileSystem.close();
+} catch (IOException e) {
+logger.warn("Failed to close file system {}", fileSystem);
+}
}
}
@@ -124,8 +138,10 @@ public class HdfsGateway extends AbstractLifecycleComponent<Gateway> implements
}
});
-for (FileStatus oldFile : oldFiles) {
-fileSystem.delete(oldFile.getPath(), false);
+if (oldFiles != null) {
+for (FileStatus oldFile : oldFiles) {
+fileSystem.delete(oldFile.getPath(), false);
+}
}
} catch (IOException e) {
@@ -161,6 +177,9 @@ public class HdfsGateway extends AbstractLifecycleComponent<Gateway> implements
return path.getName().startsWith("metadata-");
}
});
+if (files == null || files.length == 0) {
+return -1;
+}
int index = -1;
for (FileStatus file : files) {
@@ -174,7 +193,7 @@ public class HdfsGateway extends AbstractLifecycleComponent<Gateway> implements
try {
readMetaData(file.getPath());
index = fileIndex;
-} catch (IOException e) {
+} catch (Exception e) {
logger.warn("[findLatestMetadata]: Failed to read metadata from [" + file + "], ignoring...", e);
}
}

View File

@@ -290,16 +290,18 @@ public class HdfsIndexShardGateway extends AbstractIndexShardComponent implement
if (indexDirty) {
try {
FileStatus[] existingFiles = fileSystem.listStatus(indexPath);
-for (FileStatus existingFile : existingFiles) {
-boolean found = false;
-for (final String fileName : snapshotIndexCommit.getFiles()) {
-if (existingFile.getPath().getName().equals(fileName)) {
-found = true;
-break;
+if (existingFiles != null) {
+for (FileStatus existingFile : existingFiles) {
+boolean found = false;
+for (final String fileName : snapshotIndexCommit.getFiles()) {
+if (existingFile.getPath().getName().equals(fileName)) {
+found = true;
+break;
+}
+}
+if (!found) {
+fileSystem.delete(existingFile.getPath(), false);
+}
+}
-if (!found) {
-fileSystem.delete(existingFile.getPath(), false);
-}
}
} catch (Exception e) {
@@ -320,6 +322,10 @@ public class HdfsIndexShardGateway extends AbstractIndexShardComponent implement
} catch (IOException e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to list files", e);
}
+if (files == null || files.length == 0) {
+return new RecoveryStatus.Index(-1, 0, new SizeValue(0, SizeUnit.BYTES), TimeValue.timeValueMillis(0));
+}
final CountDownLatch latch = new CountDownLatch(files.length);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
final AtomicLong throttlingWaitTime = new AtomicLong();
@@ -334,7 +340,6 @@ public class HdfsIndexShardGateway extends AbstractIndexShardComponent implement
throttlingWaitTime.addAndGet(System.currentTimeMillis() - throttlingStartTime);
FSDataInputStream fileStream = fileSystem.open(file.getPath());
Directories.copyToDirectory(fileStream, store.directory(), file.getPath().getName());
-fileSystem.close();
} catch (Exception e) {
logger.debug("Failed to read [" + file + "] into [" + store + "]", e);
lastException.set(e);
@@ -397,7 +402,7 @@ public class HdfsIndexShardGateway extends AbstractIndexShardComponent implement
} finally {
if (fileStream != null) {
try {
-fileSystem.close();
+fileStream.close();
} catch (IOException e) {
// ignore
}

View File

@@ -52,15 +52,19 @@ public class HdfsGatewayTests {
private Node node;
@BeforeMethod void setUpNodes() throws Exception {
// start the node and reset the gateway
node = buildNode();
((InternalNode) node).injector().getInstance(Gateway.class).reset();
node.start();
node.close();
// now start the node clean
node = buildNode().start();
}
private Node buildNode() {
Settings settings = settingsBuilder()
// .put("hdfs.conf.fs.default.name", "file://work")
.put("gateway.type", "hdfs")
// .put("gateway.hdfs.uri", "hdfs://training-vm.local:8022")
.put("gateway.hdfs.uri", "file:///")
.put("gateway.hdfs.path", "work/hdfs/gateway")
.build();
return nodeBuilder().settings(settingsBuilder().put(settings).put("node.name", "node1")).build();