HDFS-8180. AbstractFileSystem Implementation for WebHdfs. Contributed by Santhosh G Nayak.

Jakob Homan 2015-07-28 21:03:31 -07:00
parent 69b095730b
commit 0712a8103f
8 changed files with 411 additions and 2 deletions


@@ -595,6 +595,18 @@ for ldap providers in the same way as above does.
   <description>The FileSystem for Ftp: uris.</description>
 </property>
+<property>
+  <name>fs.AbstractFileSystem.webhdfs.impl</name>
+  <value>org.apache.hadoop.fs.WebHdfs</value>
+  <description>The FileSystem for webhdfs: uris.</description>
+</property>
+<property>
+  <name>fs.AbstractFileSystem.swebhdfs.impl</name>
+  <value>org.apache.hadoop.fs.SWebHdfs</value>
+  <description>The FileSystem for swebhdfs: uris.</description>
+</property>
 <property>
   <name>fs.ftp.host</name>
   <value>0.0.0.0</value>

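The two properties above are what let FileContext resolve webhdfs: and swebhdfs: URIs to the new AbstractFileSystem implementations. A minimal client-side sketch of the binding in action, assuming a NameNode web endpoint at localhost:50070 (hypothetical address):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class WebHdfsFileContextExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // "webhdfs" resolves to org.apache.hadoop.fs.WebHdfs through the
    // fs.AbstractFileSystem.webhdfs.impl key added above.
    URI uri = new URI("webhdfs://localhost:50070"); // hypothetical endpoint
    FileContext fc = FileContext.getFileContext(uri, conf);
    FileStatus status = fc.getFileStatus(new Path("/"));
    System.out.println("Root owner: " + status.getOwner());
  }
}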

@@ -1249,7 +1249,7 @@ public abstract class FileContextMainOperationsBaseTest {
     byte[] bb = new byte[(int)len];
     FSDataInputStream fsdis = fc.open(path);
     try {
-      fsdis.read(bb);
+      fsdis.readFully(bb);
     } finally {
       fsdis.close();
     }
@@ -1310,7 +1310,7 @@ public abstract class FileContextMainOperationsBaseTest {
     byte[] bb = new byte[data.length];
     FSDataInputStream fsdis = fc.open(path);
     try {
-      fsdis.read(bb);
+      fsdis.readFully(bb);
     } finally {
       fsdis.close();
     }

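The read-to-readFully change in the base test fixes a real bug: a single read(byte[]) call may legally return after filling only part of the buffer (short reads are especially likely when the bytes arrive over HTTP, as with WebHDFS), so the later assertion on the buffer contents could fail spuriously. readFully instead loops until the buffer is full or throws EOFException. A sketch of the loop it performs, as a hypothetical stand-alone helper rather than the actual DataInputStream implementation:

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public final class ReadFullySketch {
  private ReadFullySketch() {
  }

  // Keep reading until the buffer is completely filled, or fail if the
  // stream ends first. A single read() may return fewer bytes than asked.
  public static void readFully(InputStream in, byte[] buf) throws IOException {
    int off = 0;
    while (off < buf.length) {
      int n = in.read(buf, off, buf.length - off);
      if (n < 0) {
        throw new EOFException("stream ended after " + off + " bytes");
      }
      off += n;
    }
  }
}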

@@ -755,6 +755,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-7858. Improve HA Namenode Failover detection on the client. (asuresh)
+
+    HDFS-8180. AbstractFileSystem Implementation for WebHdfs. (snayak via jghoman)

   OPTIMIZATIONS

     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than


@@ -0,0 +1,51 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * AbstractFileSystem implementation for HDFS over the web (secure).
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class SWebHdfs extends DelegateToFileSystem {

  public static final String SCHEME = "swebhdfs";

  /**
   * This constructor has the signature needed by
   * {@link AbstractFileSystem#createFileSystem(URI, Configuration)}.
   *
   * @param theUri which must be that of swebhdfs
   * @param conf configuration
   * @throws IOException if the delegate file system cannot be initialized
   */
  SWebHdfs(URI theUri, Configuration conf)
      throws IOException, URISyntaxException {
    super(theUri, new SWebHdfsFileSystem(), conf, SCHEME, false);
  }
}
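SWebHdfs (and WebHdfs below) follow the DelegateToFileSystem pattern: the existing FileSystem-based client (SWebHdfsFileSystem) does the actual work, while this thin subclass adapts it to the AbstractFileSystem interface used by FileContext. The package-private (URI, Configuration) constructor is what AbstractFileSystem.createFileSystem invokes reflectively when a URI with this scheme is resolved. A minimal sketch of that resolution, assuming a reachable HTTPS endpoint at localhost:50470 (hypothetical address):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;

import java.net.URI;

public class SchemeResolutionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // fs.AbstractFileSystem.swebhdfs.impl names org.apache.hadoop.fs.SWebHdfs;
    // the lookup ends in the (URI, Configuration) constructor above.
    AbstractFileSystem afs =
        AbstractFileSystem.get(new URI("swebhdfs://localhost:50470"), conf);
    System.out.println(afs.getUri());
  }
}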


@@ -0,0 +1,51 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * AbstractFileSystem implementation for HDFS over the web.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class WebHdfs extends DelegateToFileSystem {

  public static final String SCHEME = "webhdfs";

  /**
   * This constructor has the signature needed by
   * {@link AbstractFileSystem#createFileSystem(URI, Configuration)}.
   *
   * @param theUri which must be that of webhdfs
   * @param conf configuration
   * @throws IOException if the delegate file system cannot be initialized
   */
  WebHdfs(URI theUri, Configuration conf)
      throws IOException, URISyntaxException {
    super(theUri, new WebHdfsFileSystem(), conf, SCHEME, false);
  }
}


@@ -0,0 +1,26 @@
<html>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<body>
<p>Implementations of {@link org.apache.hadoop.fs.AbstractFileSystem} for HDFS
over RPC and HDFS over the web.</p>
</body>
</html>


@@ -0,0 +1,110 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import javax.security.auth.login.LoginException;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import static org.apache.hadoop.fs.FileContextTestHelper.getDefaultBlockSize;
import static org.apache.hadoop.fs.FileContextTestHelper.getFileData;

/**
 * Test of FileContext APIs on SWebHdfs.
 */
public class TestSWebHdfsFileContextMainOperations
    extends TestWebHdfsFileContextMainOperations {

  private static MiniDFSCluster cluster;
  private static Path defaultWorkingDirectory;
  private static String keystoresDir;
  private static String sslConfDir;
  protected static URI webhdfsUrl;

  private static final HdfsConfiguration CONF = new HdfsConfiguration();
  private static final String BASEDIR =
      System.getProperty("test.build.dir", "target/test-dir") + "/"
          + TestSWebHdfsFileContextMainOperations.class.getSimpleName();
  protected static int numBlocks = 2;
  protected static final byte[] data = getFileData(numBlocks,
      getDefaultBlockSize());
  private static Configuration sslConf;

  @BeforeClass
  public static void clusterSetupAtBeginning()
      throws IOException, LoginException, URISyntaxException {
    File base = new File(BASEDIR);
    FileUtil.fullyDelete(base);
    base.mkdirs();
    keystoresDir = new File(BASEDIR).getAbsolutePath();
    sslConf = new Configuration();

    try {
      sslConfDir = KeyStoreTestUtil
          .getClasspathDir(TestSWebHdfsFileContextMainOperations.class);
      KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, sslConf, false);
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }

    CONF.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, "HTTPS_ONLY");
    CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
    CONF.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
    CONF.set(SSLFactory.SSL_HOSTNAME_VERIFIER_KEY, "DEFAULT_AND_LOCALHOST");

    cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build();
    cluster.waitClusterUp();
    webhdfsUrl = new URI(SWebHdfs.SCHEME + "://" + cluster.getConfiguration(0)
        .get(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY));
    fc = FileContext.getFileContext(webhdfsUrl, CONF);
    defaultWorkingDirectory = fc.makeQualified(new Path(
        "/user/" + UserGroupInformation.getCurrentUser().getShortUserName()));
    fc.mkdir(defaultWorkingDirectory, FileContext.DEFAULT_PERM, true);
  }

  @Override
  public URI getWebhdfsUrl() {
    return webhdfsUrl;
  }

  @AfterClass
  public static void clusterShutdownAtEnd() throws Exception {
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
    FileUtil.fullyDelete(new File(BASEDIR));
    KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir);
  }
}


@@ -0,0 +1,157 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import javax.security.auth.login.LoginException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.EnumSet;

import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.FileContextTestHelper.getDefaultBlockSize;
import static org.apache.hadoop.fs.FileContextTestHelper.getFileData;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue;

/**
 * Test of FileContext APIs on WebHdfs.
 */
public class TestWebHdfsFileContextMainOperations
    extends FileContextMainOperationsBaseTest {

  protected static MiniDFSCluster cluster;
  private static Path defaultWorkingDirectory;
  protected static URI webhdfsUrl;

  protected static int numBlocks = 2;
  protected static final byte[] data = getFileData(numBlocks,
      getDefaultBlockSize());
  protected static final HdfsConfiguration CONF = new HdfsConfiguration();

  @Override
  public Path getDefaultWorkingDirectory() {
    return defaultWorkingDirectory;
  }

  public URI getWebhdfsUrl() {
    return webhdfsUrl;
  }

  @BeforeClass
  public static void clusterSetupAtBeginning()
      throws IOException, LoginException, URISyntaxException {
    cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build();
    cluster.waitClusterUp();
    webhdfsUrl = new URI(WebHdfs.SCHEME + "://" + cluster.getConfiguration(0)
        .get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY));
    fc = FileContext.getFileContext(webhdfsUrl, CONF);
    defaultWorkingDirectory = fc.makeQualified(new Path(
        "/user/" + UserGroupInformation.getCurrentUser().getShortUserName()));
    fc.mkdir(defaultWorkingDirectory, FileContext.DEFAULT_PERM, true);
  }

  @Before
  public void setUp() throws Exception {
    URI webhdfsUrlReal = getWebhdfsUrl();
    Path testBuildData = new Path(
        webhdfsUrlReal + "/build/test/data/" + RandomStringUtils
            .randomAlphanumeric(10));
    Path rootPath = new Path(testBuildData, "root-uri");
    localFsRootPath = rootPath.makeQualified(webhdfsUrlReal, null);
    fc.mkdir(getTestRootPath(fc, "test"), FileContext.DEFAULT_PERM, true);
  }

  private Path getTestRootPath(FileContext fc, String path) {
    return fileContextTestHelper.getTestRootPath(fc, path);
  }

  @Override
  protected boolean listCorruptedBlocksSupported() {
    return false;
  }

  /**
   * Test FileContext APIs when symlinks are not supported.
   * TODO: open a separate JIRA for full symlink support in webhdfs.
   */
  @Test
  public void testUnsupportedSymlink() throws IOException {
    /*
     * The WebHdfs client supports symlinks only partially: creating a
     * symlink works, but the getLinkTargetPath() API is not supported yet.
     * Implement this test case once full support is available.
     */
  }

  /**
   * TODO: open a JIRA for the idiosyncrasies between hdfs and webhdfs.
   */
  public void testSetVerifyChecksum() throws IOException {
    final Path rootPath = getTestRootPath(fc, "test");
    final Path path = new Path(rootPath, "zoo");
    FSDataOutputStream out = fc
        .create(path, EnumSet.of(CREATE), Options.CreateOpts.createParent());
    try {
      out.write(data, 0, data.length);
    } finally {
      out.close();
    }
    // With the webhdfs scheme, fc.setVerifyChecksum() can be called only
    // after the first few bytes have been written; with the hdfs scheme it
    // can be called immediately after the create call.
    fc.setVerifyChecksum(true, path);

    FileStatus fileStatus = fc.getFileStatus(path);
    final long len = fileStatus.getLen();
    assertTrue(len == data.length);
    byte[] bb = new byte[(int) len];
    FSDataInputStream fsdis = fc.open(path);
    try {
      fsdis.readFully(bb);
    } finally {
      fsdis.close();
    }
    assertArrayEquals(data, bb);
  }

  @AfterClass
  public static void clusterShutdownAtEnd() throws Exception {
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
  }
}