Merge trunk into branch.

Branch will not build after this commit: need to implement new JournalManager
interfaces in QuorumJournalManager in a follow-up.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1371518 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-08-09 22:29:36 +00:00
commit 302f1e96ce
1540 changed files with 9052 additions and 2799 deletions

View File

@ -79,15 +79,15 @@ if $PATCH -p0 -E --dry-run < $PATCH_FILE 2>&1 > $TMP; then
if [[ -d hadoop-common-project ]]; then
echo Looks like this is being run at project root
# if all of the lines start with hadoop-common/, hadoop-hdfs/, or hadoop-mapreduce/, this is
# if all of the lines start with hadoop-common/, hadoop-hdfs/, hadoop-yarn/ or hadoop-mapreduce/, this is
# relative to the hadoop root instead of the subproject root, so we need
# to chop off another layer
elif [[ "$PREFIX_DIRS_AND_FILES" =~ ^(hadoop-common-project|hadoop-hdfs-project|hadoop-mapreduce-project)$ ]]; then
elif [[ "$PREFIX_DIRS_AND_FILES" =~ ^(hadoop-common-project|hadoop-hdfs-project|hadoop-yarn-project|hadoop-mapreduce-project)$ ]]; then
echo Looks like this is relative to project root. Increasing PLEVEL
PLEVEL=$[$PLEVEL + 1]
elif ! echo "$PREFIX_DIRS_AND_FILES" | grep -vxq 'hadoop-common-project\|hadoop-hdfs-project\|hadoop-mapreduce-project' ; then
elif ! echo "$PREFIX_DIRS_AND_FILES" | grep -vxq 'hadoop-common-project\|hadoop-hdfs-project\|hadoop-yarn-project\|hadoop-mapreduce-project' ; then
echo Looks like this is a cross-subproject patch. Try applying from the project root
cleanup 1
fi

View File

@ -23,19 +23,6 @@
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/target/native/target/usr/local/bin</directory>
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>hadoop-yarn/bin</directory>
<outputDirectory>bin</outputDirectory>
<includes>
<include>yarn</include>
</includes>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>bin</directory>
<outputDirectory>bin</outputDirectory>
@ -52,25 +39,6 @@
</includes>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>hadoop-yarn/bin</directory>
<outputDirectory>libexec</outputDirectory>
<includes>
<include>yarn-config.sh</include>
</includes>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>hadoop-yarn/bin</directory>
<outputDirectory>sbin</outputDirectory>
<includes>
<include>yarn-daemon.sh</include>
<include>yarn-daemons.sh</include>
<include>start-yarn.sh</include>
<include>stop-yarn.sh</include>
</includes>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>bin</directory>
<outputDirectory>sbin</outputDirectory>
@ -80,7 +48,7 @@
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>hadoop-yarn/conf</directory>
<directory>conf</directory>
<outputDirectory>etc/hadoop</outputDirectory>
<includes>
<include>**/*</include>
@ -153,10 +121,18 @@
<excludes>
<exclude>org.apache.hadoop:hadoop-common</exclude>
<exclude>org.apache.hadoop:hadoop-hdfs</exclude>
<!-- use slf4j from common to avoid multiple binding warnings -->
<exclude>org.slf4j:slf4j-api</exclude>
<exclude>org.slf4j:slf4j-log4j12</exclude>
<!-- use slf4j from common to avoid multiple binding warnings -->
<exclude>org.slf4j:slf4j-api</exclude>
<exclude>org.slf4j:slf4j-log4j12</exclude>
<exclude>org.hsqldb:hsqldb</exclude>
</excludes>
</dependencySet>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>/share/hadoop/${hadoop.component}/lib-examples</outputDirectory>
<includes>
<include>org.hsqldb:hsqldb</include>
</includes>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,145 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id>hadoop-yarn-dist</id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/target/native/target/usr/local/bin</directory>
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>hadoop-yarn/bin</directory>
<outputDirectory>bin</outputDirectory>
<includes>
<include>yarn</include>
</includes>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>hadoop-yarn/bin</directory>
<outputDirectory>libexec</outputDirectory>
<includes>
<include>yarn-config.sh</include>
</includes>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>hadoop-yarn/bin</directory>
<outputDirectory>sbin</outputDirectory>
<includes>
<include>yarn-daemon.sh</include>
<include>yarn-daemons.sh</include>
<include>start-yarn.sh</include>
<include>stop-yarn.sh</include>
</includes>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>hadoop-yarn/conf</directory>
<outputDirectory>etc/hadoop</outputDirectory>
<includes>
<include>**/*</include>
</includes>
</fileSet>
<fileSet>
<directory>${basedir}</directory>
<outputDirectory>/share/doc/hadoop/${hadoop.component}</outputDirectory>
<includes>
<include>*.txt</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.build.directory}/webapps</directory>
<outputDirectory>/share/hadoop/${hadoop.component}/webapps</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/src/main/conf</directory>
<outputDirectory>/share/hadoop/${hadoop.component}/templates</outputDirectory>
<includes>
<include>*-site.xml</include>
</includes>
</fileSet>
<fileSet>
<directory>${basedir}/src/main/packages/templates/conf</directory>
<outputDirectory>/share/hadoop/${hadoop.component}/templates/conf</outputDirectory>
<includes>
<include>*</include>
</includes>
</fileSet>
<fileSet>
<directory>${basedir}/dev-support/jdiff</directory>
<outputDirectory>/share/hadoop/${hadoop.component}/jdiff</outputDirectory>
</fileSet>
<fileSet>
<directory>${project.build.directory}/site/jdiff/xml</directory>
<outputDirectory>/share/hadoop/${hadoop.component}/jdiff</outputDirectory>
</fileSet>
<fileSet>
<directory>${project.build.directory}/site</directory>
<outputDirectory>/share/doc/hadoop/${hadoop.component}</outputDirectory>
</fileSet>
</fileSets>
<moduleSets>
<moduleSet>
<binaries>
<outputDirectory>share/hadoop/${hadoop.component}</outputDirectory>
<includeDependencies>false</includeDependencies>
<unpack>false</unpack>
</binaries>
</moduleSet>
<moduleSet>
<includes>
<include>org.apache.hadoop:hadoop-yarn-server-tests</include>
</includes>
<binaries>
<attachmentClassifier>tests</attachmentClassifier>
<outputDirectory>share/hadoop/${hadoop.component}</outputDirectory>
<includeDependencies>false</includeDependencies>
<unpack>false</unpack>
</binaries>
</moduleSet>
</moduleSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>/share/hadoop/${hadoop.component}/lib</outputDirectory>
<!-- Exclude hadoop artifacts. They will be found via HADOOP* env -->
<excludes>
<exclude>org.apache.hadoop:hadoop-common</exclude>
<exclude>org.apache.hadoop:hadoop-hdfs</exclude>
<!-- use slf4j from common to avoid multiple binding warnings -->
<exclude>org.slf4j:slf4j-api</exclude>
<exclude>org.slf4j:slf4j-log4j12</exclude>
<exclude>org.hsqldb:hsqldb</exclude>
</excludes>
</dependencySet>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>/share/hadoop/${hadoop.component}/lib-examples</outputDirectory>
<includes>
<include>org.hsqldb:hsqldb</include>
</includes>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -171,6 +171,7 @@ public static Class<? extends Authenticator> getDefaultAuthenticator() {
}
private Authenticator authenticator;
private ConnectionConfigurator connConfigurator;
/**
* Creates an {@link AuthenticatedURL}.
@ -186,11 +187,25 @@ public AuthenticatedURL() {
* KerberosAuthenticator} is used.
*/
public AuthenticatedURL(Authenticator authenticator) {
this(authenticator, null);
}
/**
* Creates an <code>AuthenticatedURL</code>.
*
* @param authenticator the {@link Authenticator} instance to use, if <code>null</code> a {@link
* KerberosAuthenticator} is used.
* @param connConfigurator a connection configurator.
*/
public AuthenticatedURL(Authenticator authenticator,
ConnectionConfigurator connConfigurator) {
try {
this.authenticator = (authenticator != null) ? authenticator : DEFAULT_AUTHENTICATOR.newInstance();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
this.connConfigurator = connConfigurator;
this.authenticator.setConnectionConfigurator(connConfigurator);
}
/**
@ -216,6 +231,9 @@ public HttpURLConnection openConnection(URL url, Token token) throws IOException
}
authenticator.authenticate(url, token);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
if (connConfigurator != null) {
conn = connConfigurator.configure(conn);
}
injectToken(conn, token);
return conn;
}

View File

@ -24,6 +24,14 @@
*/
public interface Authenticator {
/**
* Sets a {@link ConnectionConfigurator} instance to use for
* configuring connections.
*
* @param configurator the {@link ConnectionConfigurator} instance.
*/
public void setConnectionConfigurator(ConnectionConfigurator configurator);
/**
* Authenticates against a URL and returns a {@link AuthenticatedURL.Token} to be
* used by subsequent requests.

View File

@ -0,0 +1,36 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package org.apache.hadoop.security.authentication.client;
import java.io.IOException;
import java.net.HttpURLConnection;
/**
* Interface to configure {@link HttpURLConnection} created by
* {@link AuthenticatedURL} instances.
*/
public interface ConnectionConfigurator {
/**
* Configures the given {@link HttpURLConnection} instance.
*
* @param conn the {@link HttpURLConnection} instance to configure.
* @return the configured {@link HttpURLConnection} instance.
*
* @throws IOException if an IO error occurred.
*/
public HttpURLConnection configure(HttpURLConnection conn) throws IOException;
}
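This interface is the hook the rest of the patch threads through AuthenticatedURL, KerberosAuthenticator and PseudoAuthenticator: every HttpURLConnection they open is passed through configure() before it is used. A minimal sketch of a custom implementation, assuming only the classes shown in this patch; the class name TimeoutConfigurator and the timeout values are illustrative, not part of the change:

import java.io.IOException;
import java.net.HttpURLConnection;

import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;

// Hypothetical configurator: applies connect/read timeouts to every connection
// opened by AuthenticatedURL or by the Authenticator implementations.
public class TimeoutConfigurator implements ConnectionConfigurator {

  @Override
  public HttpURLConnection configure(HttpURLConnection conn) throws IOException {
    conn.setConnectTimeout(30 * 1000); // illustrative values, not Hadoop defaults
    conn.setReadTimeout(60 * 1000);
    return conn;
  }
}

An instance would be handed to the new two-argument constructor, new AuthenticatedURL(authenticator, new TimeoutConfigurator()), much as the TestConnectionConfigurator in the updated test case does.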

View File

@ -113,6 +113,18 @@ public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
private URL url;
private HttpURLConnection conn;
private Base64 base64;
private ConnectionConfigurator connConfigurator;
/**
* Sets a {@link ConnectionConfigurator} instance to use for
* configuring connections.
*
* @param configurator the {@link ConnectionConfigurator} instance.
*/
@Override
public void setConnectionConfigurator(ConnectionConfigurator configurator) {
connConfigurator = configurator;
}
/**
* Performs SPNEGO authentication against the specified URL.
@ -135,6 +147,9 @@ public void authenticate(URL url, AuthenticatedURL.Token token)
this.url = url;
base64 = new Base64(0);
conn = (HttpURLConnection) url.openConnection();
if (connConfigurator != null) {
conn = connConfigurator.configure(conn);
}
conn.setRequestMethod(AUTH_HTTP_METHOD);
conn.connect();
if (isNegotiate()) {
@ -244,6 +259,9 @@ public Void run() throws Exception {
private void sendToken(byte[] outToken) throws IOException, AuthenticationException {
String token = base64.encodeToString(outToken);
conn = (HttpURLConnection) url.openConnection();
if (connConfigurator != null) {
conn = connConfigurator.configure(conn);
}
conn.setRequestMethod(AUTH_HTTP_METHOD);
conn.setRequestProperty(AUTHORIZATION, NEGOTIATE + " " + token);
conn.connect();

View File

@ -32,6 +32,19 @@ public class PseudoAuthenticator implements Authenticator {
private static final String USER_NAME_EQ = USER_NAME + "=";
private ConnectionConfigurator connConfigurator;
/**
* Sets a {@link ConnectionConfigurator} instance to use for
* configuring connections.
*
* @param configurator the {@link ConnectionConfigurator} instance.
*/
@Override
public void setConnectionConfigurator(ConnectionConfigurator configurator) {
connConfigurator = configurator;
}
/**
* Performs simple authentication against the specified URL.
* <p/>
@ -56,6 +69,9 @@ public void authenticate(URL url, AuthenticatedURL.Token token) throws IOExcepti
strUrl += paramSeparator + USER_NAME_EQ + getUserName();
url = new URL(strUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
if (connConfigurator != null) {
conn = connConfigurator.configure(conn);
}
conn.setRequestMethod("OPTIONS");
conn.connect();
AuthenticatedURL.extractToken(conn, token);

View File

@ -13,8 +13,10 @@
*/
package org.apache.hadoop.security.authentication.client;
import junit.framework.Assert;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import junit.framework.TestCase;
import org.mockito.Mockito;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.FilterHolder;
@ -113,6 +115,18 @@ protected String getBaseURL() {
return "http://" + host + ":" + port + "/foo/bar";
}
private static class TestConnectionConfigurator
implements ConnectionConfigurator {
boolean invoked;
@Override
public HttpURLConnection configure(HttpURLConnection conn)
throws IOException {
invoked = true;
return conn;
}
}
private String POST = "test";
protected void _testAuthentication(Authenticator authenticator, boolean doPost) throws Exception {
@ -120,8 +134,10 @@ protected void _testAuthentication(Authenticator authenticator, boolean doPost)
try {
URL url = new URL(getBaseURL());
AuthenticatedURL.Token token = new AuthenticatedURL.Token();
AuthenticatedURL aUrl = new AuthenticatedURL(authenticator);
TestConnectionConfigurator connConf = new TestConnectionConfigurator();
AuthenticatedURL aUrl = new AuthenticatedURL(authenticator, connConf);
HttpURLConnection conn = aUrl.openConnection(url, token);
Assert.assertTrue(connConf.invoked);
String tokenStr = token.toString();
if (doPost) {
conn.setRequestMethod("POST");

View File

@ -18,6 +18,7 @@
import org.mockito.Mockito;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -114,4 +115,21 @@ public void testExtractTokenFail() throws Exception {
}
}
public void testConnectionConfigurator() throws Exception {
HttpURLConnection conn = Mockito.mock(HttpURLConnection.class);
Mockito.when(conn.getResponseCode()).
thenReturn(HttpURLConnection.HTTP_UNAUTHORIZED);
ConnectionConfigurator connConf =
Mockito.mock(ConnectionConfigurator.class);
Mockito.when(connConf.configure(Mockito.<HttpURLConnection>any())).
thenReturn(conn);
Authenticator authenticator = Mockito.mock(Authenticator.class);
AuthenticatedURL aURL = new AuthenticatedURL(authenticator, connConf);
aURL.openConnection(new URL("http://foo"), new AuthenticatedURL.Token());
Mockito.verify(connConf).configure(Mockito.<HttpURLConnection>any());
}
}

View File

@ -211,6 +211,8 @@ Branch-2 ( Unreleased changes )
HADOOP-8465. hadoop-auth should support ephemeral authentication (tucu)
HADOOP-8644. AuthenticatedURL should be able to use SSLFactory. (tucu)
IMPROVEMENTS
HADOOP-8340. SNAPSHOT build versions should compare as less than their eventual
@ -277,6 +279,9 @@ Branch-2 ( Unreleased changes )
HADOOP-8609. IPC server logs a useless message when shutting down socket.
(Jon Zuanich via atm)
HADOOP-8620. Add -Drequire.fuse and -Drequire.snappy. (Colin
Patrick McCabe via eli)
BUG FIXES
HADOOP-8372. NetUtils.normalizeHostName() incorrectly handles hostname
@ -371,6 +376,12 @@ Branch-2 ( Unreleased changes )
hadoop.security.group.mapping.ldap.search.filter.user. (Jonathan Natkins
via atm)
HADOOP-8480. The native build should honor -DskipTests.
(Colin Patrick McCabe via eli)
HADOOP-8659. Native libraries must build with soft-float ABI for Oracle JVM
on ARM. (Trevor Robinson via todd)
BREAKDOWN OF HDFS-3042 SUBTASKS
HADOOP-8220. ZKFailoverController doesn't handle failure to become active
@ -403,6 +414,8 @@ Branch-2 ( Unreleased changes )
HADOOP-8405. ZKFC tests leak ZK instances. (todd)
HADOOP-8660. TestPseudoAuthenticator failing with NPE. (tucu)
Release 2.0.0-alpha - 05-23-2012
INCOMPATIBLE CHANGES
@ -867,6 +880,15 @@ Release 0.23.3 - UNRELEASED
HADOOP-8635. Cannot cancel paths registered deleteOnExit (daryn via bobby)
HADOOP-8637. FilterFileSystem#setWriteChecksum is broken (daryn via bobby)
HADOOP-8370. Native build failure: javah: class file for
org.apache.hadoop.classification.InterfaceAudience not found (Trevor
Robinson via tgraves)
HADOOP-8633. Interrupted FsShell copies may leave tmp files (Daryn Sharp
via tgraves)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -31,9 +31,6 @@
<packaging>jar</packaging>
<properties>
<snappy.prefix>/usr/local</snappy.prefix>
<snappy.lib>${snappy.prefix}/lib</snappy.lib>
<bundle.snappy>false</bundle.snappy>
<kdc.resource.dir>src/test/resources/kdc</kdc.resource.dir>
<hadoop.component>common</hadoop.component>
<is.hadoop.component>true</is.hadoop.component>
@ -44,7 +41,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<scope>provided</scope>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
@ -533,10 +530,10 @@
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<snappy.prefix>/usr/local</snappy.prefix>
<snappy.lib>${snappy.prefix}/lib</snappy.lib>
<snappy.include>${snappy.prefix}/include</snappy.include>
<runas.home></runas.home>
<snappy.prefix></snappy.prefix>
<snappy.lib></snappy.lib>
<snappy.include></snappy.include>
<require.snappy>false</require.snappy>
</properties>
<build>
<plugins>
@ -579,9 +576,7 @@
<configuration>
<target>
<exec executable="cmake" dir="${project.build.directory}/native" failonerror="true">
<arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model}"/>
<env key="CFLAGS" value="-I${snappy.include}"/>
<env key="LDFLAGS" value="-L${snappy.lib}"/>
<arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model} -DREQUIRE_SNAPPY=${require.snappy} -DCUSTOM_SNAPPY_PREFIX=${snappy.prefix} -DCUSTOM_SNAPPY_LIB=${snappy.lib} -DCUSTOM_SNAPPY_INCLUDE=${snappy.include}"/>
</exec>
<exec executable="make" dir="${project.build.directory}/native" failonerror="true">
<arg line="VERBOSE=1"/>

View File

@ -21,18 +21,7 @@ cmake_minimum_required(VERSION 2.6 FATAL_ERROR)
# Default to release builds
set(CMAKE_BUILD_TYPE, Release)
# If JVM_ARCH_DATA_MODEL is 32, compile all binaries as 32-bit.
# This variable is set by maven.
if (JVM_ARCH_DATA_MODEL EQUAL 32)
# force 32-bit code generation on amd64/x86_64, ppc64, sparc64
if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_SYSTEM_PROCESSOR MATCHES ".*64")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -m32")
set(CMAKE_LD_FLAGS "${CMAKE_LD_FLAGS} -m32")
endif ()
if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
set(CMAKE_SYSTEM_PROCESSOR "i686")
endif ()
endif (JVM_ARCH_DATA_MODEL EQUAL 32)
include(JNIFlags.cmake NO_POLICY_SCOPE)
# Compile a library with both shared and static variants
function(add_dual_library LIBNAME)
@ -79,17 +68,26 @@ INCLUDE(CheckCSourceCompiles)
CHECK_FUNCTION_EXISTS(sync_file_range HAVE_SYNC_FILE_RANGE)
CHECK_FUNCTION_EXISTS(posix_fadvise HAVE_POSIX_FADVISE)
find_library(SNAPPY_LIBRARY NAMES snappy PATHS)
find_path(SNAPPY_INCLUDE_DIR NAMES snappy.h PATHS)
if (SNAPPY_LIBRARY)
find_library(SNAPPY_LIBRARY
NAMES snappy
PATHS ${CUSTOM_SNAPPY_PREFIX} ${CUSTOM_SNAPPY_PREFIX}/lib
${CUSTOM_SNAPPY_PREFIX}/lib64 ${CUSTOM_SNAPPY_LIB})
find_path(SNAPPY_INCLUDE_DIR
NAMES snappy.h
PATHS ${CUSTOM_SNAPPY_PREFIX} ${CUSTOM_SNAPPY_PREFIX}/include
${CUSTOM_SNAPPY_INCLUDE})
if (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR)
GET_FILENAME_COMPONENT(HADOOP_SNAPPY_LIBRARY ${SNAPPY_LIBRARY} NAME)
set(SNAPPY_SOURCE_FILES
"${D}/io/compress/snappy/SnappyCompressor.c"
"${D}/io/compress/snappy/SnappyDecompressor.c")
else (${SNAPPY_LIBRARY})
else (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR)
set(SNAPPY_INCLUDE_DIR "")
set(SNAPPY_SOURCE_FILES "")
endif (SNAPPY_LIBRARY)
IF(REQUIRE_SNAPPY)
MESSAGE(FATAL_ERROR "Required snappy library could not be found. SNAPPY_LIBRARY=${SNAPPY_LIBRARY}, SNAPPY_INCLUDE_DIR=${SNAPPY_INCLUDE_DIR}, CUSTOM_SNAPPY_INCLUDE_DIR=${CUSTOM_SNAPPY_INCLUDE_DIR}, CUSTOM_SNAPPY_PREFIX=${CUSTOM_SNAPPY_PREFIX}, CUSTOM_SNAPPY_INCLUDE=${CUSTOM_SNAPPY_INCLUDE}")
ENDIF(REQUIRE_SNAPPY)
endif (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR)
include_directories(
${GENERATED_JAVAH}

View File

@ -0,0 +1,65 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
cmake_minimum_required(VERSION 2.6 FATAL_ERROR)
find_package(JNI REQUIRED)
# If JVM_ARCH_DATA_MODEL is 32, compile all binaries as 32-bit.
# This variable is set by maven.
if (JVM_ARCH_DATA_MODEL EQUAL 32)
# Force 32-bit code generation on amd64/x86_64, ppc64, sparc64
if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_SYSTEM_PROCESSOR MATCHES ".*64")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -m32")
set(CMAKE_LD_FLAGS "${CMAKE_LD_FLAGS} -m32")
endif ()
if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
set(CMAKE_SYSTEM_PROCESSOR "i686")
endif ()
endif (JVM_ARCH_DATA_MODEL EQUAL 32)
# Determine float ABI of JVM on ARM Linux
if (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm" AND CMAKE_SYSTEM_NAME STREQUAL "Linux")
find_program(READELF readelf)
if (READELF MATCHES "NOTFOUND")
message(WARNING "readelf not found; JVM float ABI detection disabled")
else (READELF MATCHES "NOTFOUND")
execute_process(
COMMAND ${READELF} -A ${JAVA_JVM_LIBRARY}
OUTPUT_VARIABLE JVM_ELF_ARCH
ERROR_QUIET)
if (NOT JVM_ELF_ARCH MATCHES "Tag_ABI_VFP_args: VFP registers")
message("Soft-float JVM detected")
# Test compilation with -mfloat-abi=softfp using an arbitrary libc function
# (typically fails with "fatal error: bits/predefs.h: No such file or directory"
# if soft-float dev libraries are not installed)
include(CMakePushCheckState)
cmake_push_check_state()
set(CMAKE_REQUIRED_FLAGS "${CMAKE_REQUIRED_FLAGS} -mfloat-abi=softfp")
include(CheckSymbolExists)
check_symbol_exists(exit stdlib.h SOFTFP_AVAILABLE)
if (NOT SOFTFP_AVAILABLE)
message(FATAL_ERROR "Soft-float dev libraries required (e.g. 'apt-get install libc6-dev-armel' on Debian/Ubuntu)")
endif (NOT SOFTFP_AVAILABLE)
cmake_pop_check_state()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfloat-abi=softfp")
endif ()
endif (READELF MATCHES "NOTFOUND")
endif (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm" AND CMAKE_SYSTEM_NAME STREQUAL "Linux")

View File

@ -42,8 +42,8 @@ HADOOP_COMMON_LIB_JARS_DIR=${HADOOP_COMMON_LIB_JARS_DIR:-"share/hadoop/common/li
HADOOP_COMMON_LIB_NATIVE_DIR=${HADOOP_COMMON_LIB_NATIVE_DIR:-"lib/native"}
HDFS_DIR=${HDFS_DIR:-"share/hadoop/hdfs"}
HDFS_LIB_JARS_DIR=${HDFS_LIB_JARS_DIR:-"share/hadoop/hdfs/lib"}
YARN_DIR=${YARN_DIR:-"share/hadoop/mapreduce"}
YARN_LIB_JARS_DIR=${YARN_LIB_JARS_DIR:-"share/hadoop/mapreduce/lib"}
YARN_DIR=${YARN_DIR:-"share/hadoop/yarn"}
YARN_LIB_JARS_DIR=${YARN_LIB_JARS_DIR:-"share/hadoop/yarn/lib"}
MAPRED_DIR=${MAPRED_DIR:-"share/hadoop/mapreduce"}
MAPRED_LIB_JARS_DIR=${MAPRED_LIB_JARS_DIR:-"share/hadoop/mapreduce/lib"}

View File

@ -620,7 +620,8 @@ public FsServerDefaults getServerDefaults() throws IOException {
conf.getInt("io.bytes.per.checksum", 512),
64 * 1024,
getDefaultReplication(),
conf.getInt("io.file.buffer.size", 4096));
conf.getInt("io.file.buffer.size", 4096),
false);
}
/**

View File

@ -376,7 +376,7 @@ public void setVerifyChecksum(boolean verifyChecksum) {
@Override
public void setWriteChecksum(boolean writeChecksum) {
fs.setVerifyChecksum(writeChecksum);
fs.setWriteChecksum(writeChecksum);
}
@Override

View File

@ -48,17 +48,20 @@ public Writable newInstance() {
private int writePacketSize;
private short replication;
private int fileBufferSize;
private boolean encryptDataTransfer;
public FsServerDefaults() {
}
public FsServerDefaults(long blockSize, int bytesPerChecksum,
int writePacketSize, short replication, int fileBufferSize) {
int writePacketSize, short replication, int fileBufferSize,
boolean encryptDataTransfer) {
this.blockSize = blockSize;
this.bytesPerChecksum = bytesPerChecksum;
this.writePacketSize = writePacketSize;
this.replication = replication;
this.fileBufferSize = fileBufferSize;
this.encryptDataTransfer = encryptDataTransfer;
}
public long getBlockSize() {
@ -80,6 +83,10 @@ public short getReplication() {
public int getFileBufferSize() {
return fileBufferSize;
}
public boolean getEncryptDataTransfer() {
return encryptDataTransfer;
}
// /////////////////////////////////////////
// Writable
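The server-defaults record gains a sixth constructor argument carrying the new data-transfer-encryption flag; the FtpConfigKeys and LocalConfigKeys hunks further down simply pass false. A hedged illustration of the updated constructor (all values below are arbitrary placeholders, not Hadoop defaults):

import org.apache.hadoop.fs.FsServerDefaults;

public class ServerDefaultsExample {
  public static void main(String[] args) {
    // Placeholder values; real filesystems derive these from their config keys.
    FsServerDefaults defaults = new FsServerDefaults(
        64L * 1024 * 1024, // blockSize
        512,               // bytesPerChecksum
        64 * 1024,         // writePacketSize
        (short) 3,         // replication
        4096,              // fileBufferSize
        false);            // encryptDataTransfer, the field added by this change
    System.out.println("encrypt data transfer: " + defaults.getEncryptDataTransfer());
  }
}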

View File

@ -44,6 +44,7 @@ public class FtpConfigKeys extends CommonConfigurationKeys {
public static final String CLIENT_WRITE_PACKET_SIZE_KEY =
"ftp.client-write-packet-size";
public static final int CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
public static final boolean ENCRYPT_DATA_TRANSFER_DEFAULT = false;
protected static FsServerDefaults getServerDefaults() throws IOException {
return new FsServerDefaults(
@ -51,7 +52,8 @@ protected static FsServerDefaults getServerDefaults() throws IOException {
BYTES_PER_CHECKSUM_DEFAULT,
CLIENT_WRITE_PACKET_SIZE_DEFAULT,
REPLICATION_DEFAULT,
STREAM_BUFFER_SIZE_DEFAULT);
STREAM_BUFFER_SIZE_DEFAULT,
ENCRYPT_DATA_TRANSFER_DEFAULT);
}
}

View File

@ -43,6 +43,7 @@ public class LocalConfigKeys extends CommonConfigurationKeys {
public static final String CLIENT_WRITE_PACKET_SIZE_KEY =
"file.client-write-packet-size";
public static final int CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
public static final boolean ENCRYPT_DATA_TRANSFER_DEFAULT = false;
public static FsServerDefaults getServerDefaults() throws IOException {
return new FsServerDefaults(
@ -50,7 +51,8 @@ public static FsServerDefaults getServerDefaults() throws IOException {
BYTES_PER_CHECKSUM_DEFAULT,
CLIENT_WRITE_PACKET_SIZE_DEFAULT,
REPLICATION_DEFAULT,
STREAM_BUFFER_SIZE_DEFAULT);
STREAM_BUFFER_SIZE_DEFAULT,
ENCRYPT_DATA_TRANSFER_DEFAULT);
}
}

View File

@ -24,6 +24,8 @@
import java.util.LinkedList;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.shell.PathExceptions.PathExistsException;
import org.apache.hadoop.fs.shell.PathExceptions.PathIOException;
@ -232,31 +234,65 @@ protected void copyStreamToTarget(InputStream in, PathData target)
if (target.exists && (target.stat.isDirectory() || !overwrite)) {
throw new PathExistsException(target.toString());
}
target.fs.setWriteChecksum(writeChecksum);
PathData tempFile = null;
TargetFileSystem targetFs = new TargetFileSystem(target.fs);
try {
tempFile = target.createTempFile(target+"._COPYING_");
FSDataOutputStream out = target.fs.create(tempFile.path, true);
IOUtils.copyBytes(in, out, getConf(), true);
PathData tempTarget = target.suffix("._COPYING_");
targetFs.setWriteChecksum(writeChecksum);
targetFs.writeStreamToFile(in, tempTarget);
targetFs.rename(tempTarget, target);
} finally {
targetFs.close(); // last ditch effort to ensure temp file is removed
}
}
// Helper filter filesystem that registers created files as temp files to
// be deleted on exit unless successfully renamed
private static class TargetFileSystem extends FilterFileSystem {
TargetFileSystem(FileSystem fs) {
super(fs);
}
void writeStreamToFile(InputStream in, PathData target) throws IOException {
FSDataOutputStream out = null;
try {
out = create(target);
IOUtils.copyBytes(in, out, getConf(), true);
} finally {
IOUtils.closeStream(out); // just in case copyBytes didn't
}
}
// tag created files as temp files
FSDataOutputStream create(PathData item) throws IOException {
try {
return create(item.path, true);
} finally { // might have been created but stream was interrupted
deleteOnExit(item.path);
}
}
void rename(PathData src, PathData target) throws IOException {
// the rename method with an option to delete the target is deprecated
if (target.exists && !target.fs.delete(target.path, false)) {
if (target.exists && !delete(target.path, false)) {
// too bad we don't know why it failed
PathIOException e = new PathIOException(target.toString());
e.setOperation("delete");
throw e;
}
if (!tempFile.fs.rename(tempFile.path, target.path)) {
if (!rename(src.path, target.path)) {
// too bad we don't know why it failed
PathIOException e = new PathIOException(tempFile.toString());
PathIOException e = new PathIOException(src.toString());
e.setOperation("rename");
e.setTargetPath(target.toString());
throw e;
}
tempFile = null;
} finally {
if (tempFile != null) {
tempFile.fs.delete(tempFile.path, false);
}
// cancel delete on exit if rename is successful
cancelDeleteOnExit(src.path);
}
@Override
public void close() {
// purge any remaining temp files, but don't close underlying fs
processDeleteOnExit();
}
}
}
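The TargetFileSystem above captures the copy protocol this change introduces for FsShell puts: stream into a ._COPYING_ path registered for delete-on-exit, rename it over the target only after the copy succeeds, and let close() purge anything left behind. A plain-Java sketch of the same write-to-temp-then-rename idea, independent of Hadoop; the class name and the NIO calls are mine, not from the patch:

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Stream into a "._COPYING_" temp file and only move it into place once the
// copy has finished, so an interrupted copy never leaves a half-written target.
public final class AtomicCopy {

  public static void copy(InputStream in, Path target) throws IOException {
    Path tmp = target.resolveSibling(target.getFileName() + "._COPYING_");
    boolean renamed = false;
    try {
      Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
      Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
      renamed = true;
    } finally {
      if (!renamed) {
        Files.deleteIfExists(tmp); // best-effort cleanup, like deleteOnExit + close()
      }
    }
  }
}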

View File

@ -60,7 +60,7 @@ public class PathData implements Comparable<PathData> {
* @throws IOException if anything goes wrong...
*/
public PathData(String pathString, Configuration conf) throws IOException {
this(FileSystem.get(URI.create(pathString), conf), pathString);
this(FileSystem.get(stringToUri(pathString), conf), pathString);
}
/**
@ -170,16 +170,13 @@ private void checkIfExists(FileTypeRequirement typeRequirement)
}
/**
* Returns a temporary file for this PathData with the given extension.
* The file will be deleted on exit.
* @param extension for the temporary file
* Returns a new PathData with the given extension.
* @param extension for the suffix
* @return PathData
* @throws IOException shouldn't happen
*/
public PathData createTempFile(String extension) throws IOException {
PathData tmpFile = new PathData(fs, uri+"._COPYING_");
fs.deleteOnExit(tmpFile.path);
return tmpFile;
public PathData suffix(String extension) throws IOException {
return new PathData(fs, this+extension);
}
/**

View File

@ -22,6 +22,8 @@
import java.io.EOFException;
import java.io.InputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
@ -42,7 +44,7 @@
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public class SaslInputStream extends InputStream {
public class SaslInputStream extends InputStream implements ReadableByteChannel {
public static final Log LOG = LogFactory.getLog(SaslInputStream.class);
private final DataInputStream inStream;
@ -65,6 +67,8 @@ public class SaslInputStream extends InputStream {
private int ostart = 0;
// position of the last "new" byte
private int ofinish = 0;
// whether or not this stream is open
private boolean isOpen = true;
private static int unsignedBytesToInt(byte[] buf) {
if (buf.length != 4) {
@ -330,6 +334,7 @@ public void close() throws IOException {
ostart = 0;
ofinish = 0;
inStream.close();
isOpen = false;
}
/**
@ -342,4 +347,28 @@ public void close() throws IOException {
public boolean markSupported() {
return false;
}
@Override
public boolean isOpen() {
return isOpen;
}
@Override
public int read(ByteBuffer dst) throws IOException {
int bytesRead = 0;
if (dst.hasArray()) {
bytesRead = read(dst.array(), dst.arrayOffset() + dst.position(),
dst.remaining());
if (bytesRead > -1) {
dst.position(dst.position() + bytesRead);
}
} else {
byte[] buf = new byte[dst.remaining()];
bytesRead = read(buf);
if (bytesRead > -1) {
dst.put(buf, 0, bytesRead);
}
}
return bytesRead;
}
}
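With SaslInputStream now implementing ReadableByteChannel, callers can read the unwrapped SASL data straight into a ByteBuffer: heap buffers are filled in place via their backing array, while other buffers (such as direct ones) go through a temporary byte[]. A hedged usage sketch written against the plain ReadableByteChannel contract; nothing Hadoop-specific is assumed beyond the interface:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Generic consumer: counts the bytes available from any ReadableByteChannel,
// which after this change includes SaslInputStream.
public final class ChannelDrain {

  public static long drain(ReadableByteChannel channel) throws IOException {
    ByteBuffer buf = ByteBuffer.allocateDirect(8192); // exercises the non-array branch
    long total = 0;
    int n;
    while ((n = channel.read(buf)) != -1) {
      total += n;
      buf.clear(); // discard the data; a real caller would process it first
    }
    return total;
  }
}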

View File

@ -20,14 +20,17 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.util.ReflectionUtils;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLServerSocketFactory;
import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.security.GeneralSecurityException;
/**
@ -42,7 +45,7 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class SSLFactory {
public class SSLFactory implements ConnectionConfigurator {
@InterfaceAudience.Private
public static enum Mode { CLIENT, SERVER }
@ -234,4 +237,29 @@ public boolean isClientCertRequired() {
return requireClientCert;
}
/**
* If the given {@link HttpURLConnection} is an {@link HttpsURLConnection}
* configures the connection with the {@link SSLSocketFactory} and
* {@link HostnameVerifier} of this SSLFactory, otherwise does nothing.
*
* @param conn the {@link HttpURLConnection} instance to configure.
* @return the configured {@link HttpURLConnection} instance.
*
* @throws IOException if an IO error occurred.
*/
@Override
public HttpURLConnection configure(HttpURLConnection conn)
throws IOException {
if (conn instanceof HttpsURLConnection) {
HttpsURLConnection sslConn = (HttpsURLConnection) conn;
try {
sslConn.setSSLSocketFactory(createSSLSocketFactory());
} catch (GeneralSecurityException ex) {
throw new IOException(ex);
}
sslConn.setHostnameVerifier(getHostnameVerifier());
conn = sslConn;
}
return conn;
}
}
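Because SSLFactory now implements ConnectionConfigurator, it can be passed directly to the two-argument AuthenticatedURL constructor added earlier in this commit, so HTTPS connections pick up its socket factory and hostname verifier. A sketch under that assumption; the Configuration setup and the URL are placeholders, and running it would of course require real Kerberos and SSL client settings:

import java.net.HttpURLConnection;
import java.net.URL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
import org.apache.hadoop.security.ssl.SSLFactory;

public class SecureClientExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); // placeholder: ssl-client.xml settings go here
    SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
    sslFactory.init();
    try {
      AuthenticatedURL.Token token = new AuthenticatedURL.Token();
      // sslFactory doubles as the ConnectionConfigurator for every connection opened
      AuthenticatedURL aUrl = new AuthenticatedURL(new KerberosAuthenticator(), sslFactory);
      HttpURLConnection conn =
          aUrl.openConnection(new URL("https://host:14000/webhdfs/v1/"), token); // placeholder URL
      System.out.println("HTTP status: " + conn.getResponseCode());
    } finally {
      sslFactory.destroy();
    }
  }
}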

View File

@ -196,7 +196,9 @@ Deprecated Properties
*---+---+
|mapred.compress.map.output | mapreduce.map.output.compress
*---+---+
|mapred.create.symlink | mapreduce.job.cache.symlink.create
|mapred.create.symlink | NONE - symlinking is always on
*---+---+
|mapreduce.job.cache.symlink.create | NONE - symlinking is always on
*---+---+
|mapred.data.field.separator | mapreduce.fieldsel.data.field.separator
*---+---+

View File

@ -286,6 +286,30 @@ public void testInitFilterLocalFsSetsEmbedConf() throws Exception {
checkFsConf(flfs, conf, 3);
}
@Test
public void testVerifyChecksumPassthru() {
FileSystem mockFs = mock(FileSystem.class);
FileSystem fs = new FilterFileSystem(mockFs);
fs.setVerifyChecksum(false);
verify(mockFs).setVerifyChecksum(eq(false));
reset(mockFs);
fs.setVerifyChecksum(true);
verify(mockFs).setVerifyChecksum(eq(true));
}
@Test
public void testWriteChecksumPassthru() {
FileSystem mockFs = mock(FileSystem.class);
FileSystem fs = new FilterFileSystem(mockFs);
fs.setWriteChecksum(false);
verify(mockFs).setWriteChecksum(eq(false));
reset(mockFs);
fs.setWriteChecksum(true);
verify(mockFs).setWriteChecksum(eq(true));
}
private void checkInit(FilterFileSystem fs, boolean expectInit)
throws Exception {
URI uri = URI.create("filter:/");

View File

@ -0,0 +1,188 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.shell;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.shell.CopyCommands.Put;
import org.apache.hadoop.util.Progressable;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.stubbing.OngoingStubbing;
public class TestCopy {
static Configuration conf;
static Path path = new Path("mockfs:/file");
static Path tmpPath = new Path("mockfs:/file._COPYING_");
static Put cmd;
static FileSystem mockFs;
static PathData target;
static FileStatus fileStat;
@BeforeClass
public static void setup() throws IOException {
conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
mockFs = mock(FileSystem.class);
fileStat = mock(FileStatus.class);
when(fileStat.isDirectory()).thenReturn(false);
}
@Before
public void resetMock() throws IOException {
reset(mockFs);
target = new PathData(path.toString(), conf);
cmd = new CopyCommands.Put();
cmd.setConf(conf);
}
@Test
public void testCopyStreamTarget() throws Exception {
FSDataOutputStream out = mock(FSDataOutputStream.class);
whenFsCreate().thenReturn(out);
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
when(mockFs.rename(eq(tmpPath), eq(path))).thenReturn(true);
FSInputStream in = mock(FSInputStream.class);
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
tryCopyStream(in, true);
verify(mockFs, never()).delete(eq(path), anyBoolean());
verify(mockFs).rename(eq(tmpPath), eq(path));
verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
verify(mockFs, never()).close();
}
@Test
public void testCopyStreamTargetExists() throws Exception {
FSDataOutputStream out = mock(FSDataOutputStream.class);
whenFsCreate().thenReturn(out);
when(mockFs.getFileStatus(eq(path))).thenReturn(fileStat);
target.refreshStatus(); // so it's updated as existing
cmd.setOverwrite(true);
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
when(mockFs.delete(eq(path), eq(false))).thenReturn(true);
when(mockFs.rename(eq(tmpPath), eq(path))).thenReturn(true);
FSInputStream in = mock(FSInputStream.class);
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
tryCopyStream(in, true);
verify(mockFs).delete(eq(path), anyBoolean());
verify(mockFs).rename(eq(tmpPath), eq(path));
verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
verify(mockFs, never()).close();
}
@Test
public void testInterruptedCreate() throws Exception {
whenFsCreate().thenThrow(new InterruptedIOException());
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
FSDataInputStream in = mock(FSDataInputStream.class);
tryCopyStream(in, false);
verify(mockFs).delete(eq(tmpPath), anyBoolean());
verify(mockFs, never()).rename(any(Path.class), any(Path.class));
verify(mockFs, never()).delete(eq(path), anyBoolean());
verify(mockFs, never()).close();
}
@Test
public void testInterruptedCopyBytes() throws Exception {
FSDataOutputStream out = mock(FSDataOutputStream.class);
whenFsCreate().thenReturn(out);
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
FSInputStream in = mock(FSInputStream.class);
// make IOUtils.copyBytes fail
when(in.read(any(byte[].class), anyInt(), anyInt())).thenThrow(
new InterruptedIOException());
tryCopyStream(in, false);
verify(mockFs).delete(eq(tmpPath), anyBoolean());
verify(mockFs, never()).rename(any(Path.class), any(Path.class));
verify(mockFs, never()).delete(eq(path), anyBoolean());
verify(mockFs, never()).close();
}
@Test
public void testInterruptedRename() throws Exception {
FSDataOutputStream out = mock(FSDataOutputStream.class);
whenFsCreate().thenReturn(out);
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
when(mockFs.rename(eq(tmpPath), eq(path))).thenThrow(
new InterruptedIOException());
FSInputStream in = mock(FSInputStream.class);
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
tryCopyStream(in, false);
verify(mockFs).delete(eq(tmpPath), anyBoolean());
verify(mockFs).rename(eq(tmpPath), eq(path));
verify(mockFs, never()).delete(eq(path), anyBoolean());
verify(mockFs, never()).close();
}
private OngoingStubbing<FSDataOutputStream> whenFsCreate() throws IOException {
return when(mockFs.create(eq(tmpPath), any(FsPermission.class),
anyBoolean(), anyInt(), anyShort(), anyLong(),
any(Progressable.class)));
}
private void tryCopyStream(InputStream in, boolean shouldPass) {
try {
cmd.copyStreamToTarget(new FSDataInputStream(in), target);
} catch (InterruptedIOException e) {
assertFalse("copy failed", shouldPass);
} catch (Throwable e) {
assertFalse(e.getMessage(), shouldPass);
}
}
static class MockFileSystem extends FilterFileSystem {
Configuration conf;
MockFileSystem() {
super(mockFs);
}
@Override
public void initialize(URI uri, Configuration conf) {
this.conf = conf;
}
@Override
public Path makeQualified(Path path) {
return path;
}
@Override
public Configuration getConf() {
return conf;
}
}
}

View File

@ -25,6 +25,7 @@
import org.junit.BeforeClass;
import org.junit.Test;
import javax.net.ssl.HttpsURLConnection;
import java.io.File;
import java.net.URL;
import java.security.GeneralSecurityException;
@ -161,4 +162,23 @@ public void invalidHostnameVerifier() throws Exception {
}
}
@Test
public void testConnectionConfigurator() throws Exception {
Configuration conf = createConfiguration(false);
conf.set(SSLFactory.SSL_HOSTNAME_VERIFIER_KEY, "STRICT_IE6");
SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
try {
sslFactory.init();
HttpsURLConnection sslConn =
(HttpsURLConnection) new URL("https://foo").openConnection();
Assert.assertNotSame("STRICT_IE6",
sslConn.getHostnameVerifier().toString());
sslFactory.configure(sslConn);
Assert.assertEquals("STRICT_IE6",
sslConn.getHostnameVerifier().toString());
} finally {
sslFactory.destroy();
}
}
}

View File

@ -120,6 +120,7 @@
run cp -r $ROOT/hadoop-common-project/hadoop-common/target/hadoop-common-${project.version}/* .
run cp -r $ROOT/hadoop-hdfs-project/hadoop-hdfs/target/hadoop-hdfs-${project.version}/* .
run cp -r $ROOT/hadoop-hdfs-project/hadoop-hdfs-httpfs/target/hadoop-hdfs-httpfs-${project.version}/* .
run cp -r $ROOT/hadoop-yarn-project/target/hadoop-yarn-project-${project.version}/* .
run cp -r $ROOT/hadoop-mapreduce-project/target/hadoop-mapreduce-${project.version}/* .
run cp -r $ROOT/hadoop-tools/hadoop-tools-dist/target/hadoop-tools-dist-${project.version}/* .
echo

View File

@ -30,4 +30,9 @@
<Field name="authority" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.lib.service.hadoop.FileSystemAccessService" />
<Method name="closeFileSystem" />
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE" />
</Match>
</FindBugsFilter>

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.fs.http.client;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.DelegationTokenRenewer;
@ -68,6 +69,7 @@
* <p/>
* This implementation allows a user to access HDFS over HTTP via an HttpFSServer server.
*/
@InterfaceAudience.Private
public class HttpFSFileSystem extends FileSystem
implements DelegationTokenRenewer.Renewable {
@ -160,7 +162,8 @@ public static FILE_TYPE getType(FileStatus fileStatus) {
private static final String HTTP_POST = "POST";
private static final String HTTP_DELETE = "DELETE";
public enum Operation {
@InterfaceAudience.Private
public static enum Operation {
OPEN(HTTP_GET), GETFILESTATUS(HTTP_GET), LISTSTATUS(HTTP_GET),
GETHOMEDIRECTORY(HTTP_GET), GETCONTENTSUMMARY(HTTP_GET),
GETFILECHECKSUM(HTTP_GET), GETFILEBLOCKLOCATIONS(HTTP_GET),

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.http.client;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
@ -43,6 +44,7 @@
* A <code>KerberosAuthenticator</code> subclass that falls back to
* {@link HttpFSPseudoAuthenticator}.
*/
@InterfaceAudience.Private
public class HttpFSKerberosAuthenticator extends KerberosAuthenticator {
/**
@ -71,6 +73,7 @@ protected Authenticator getFallBackAuthenticator() {
/**
* DelegationToken operations.
*/
@InterfaceAudience.Private
public static enum DelegationTokenOperation {
GETDELEGATIONTOKEN(HTTP_GET, true),
GETDELEGATIONTOKENS(HTTP_GET, true),

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.http.client;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
@ -27,6 +28,7 @@
* A <code>PseudoAuthenticator</code> subclass that uses FileSystemAccess's
* <code>UserGroupInformation</code> to obtain the client user name (the UGI's login user).
*/
@InterfaceAudience.Private
public class HttpFSPseudoAuthenticator extends PseudoAuthenticator {
/**

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.fs.http.client;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
@ -35,6 +36,7 @@
/**
* Utility methods used by HttpFS classes.
*/
@InterfaceAudience.Private
public class HttpFSUtils {
public static final String SERVICE_NAME = "/webhdfs";

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.fs.http.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
import javax.servlet.Filter;
@ -37,6 +38,7 @@
* Filter that enforces the content-type to be application/octet-stream for
* POST and PUT requests.
*/
@InterfaceAudience.Private
public class CheckUploadContentTypeFilter implements Filter {
private static final Set<String> UPLOAD_OPERATIONS = new HashSet<String>();

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.fs.http.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
@ -40,6 +41,7 @@
/**
* FileSystem operation executors used by {@link HttpFSServer}.
*/
@InterfaceAudience.Private
public class FSOperations {
@SuppressWarnings({"unchecked", "deprecation"})
@ -160,6 +162,7 @@ private static JSONObject toJSON(String name, Object value) {
/**
* Executor that performs an append FileSystemAccess file system operation.
*/
@InterfaceAudience.Private
public static class FSAppend implements FileSystemAccess.FileSystemExecutor<Void> {
private InputStream is;
private Path path;
@ -198,6 +201,7 @@ public Void execute(FileSystem fs) throws IOException {
/**
* Executor that performs a content-summary FileSystemAccess file system operation.
*/
@InterfaceAudience.Private
public static class FSContentSummary implements FileSystemAccess.FileSystemExecutor<Map> {
private Path path;
@ -230,6 +234,7 @@ public Map execute(FileSystem fs) throws IOException {
/**
* Executor that performs a create FileSystemAccess file system operation.
*/
@InterfaceAudience.Private
public static class FSCreate implements FileSystemAccess.FileSystemExecutor<Void> {
private InputStream is;
private Path path;
@ -288,6 +293,7 @@ public Void execute(FileSystem fs) throws IOException {
/**
* Executor that performs a delete FileSystemAccess file system operation.
*/
@InterfaceAudience.Private
public static class FSDelete implements FileSystemAccess.FileSystemExecutor<JSONObject> {
private Path path;
private boolean recursive;
@ -324,6 +330,7 @@ public JSONObject execute(FileSystem fs) throws IOException {
/**
* Executor that performs a file-checksum FileSystemAccess file system operation.
*/
@InterfaceAudience.Private
public static class FSFileChecksum implements FileSystemAccess.FileSystemExecutor<Map> {
private Path path;
@ -356,6 +363,7 @@ public Map execute(FileSystem fs) throws IOException {
/**
* Executor that performs a file-status FileSystemAccess file system operation.
*/
@InterfaceAudience.Private
public static class FSFileStatus implements FileSystemAccess.FileSystemExecutor<Map> {
private Path path;
@ -388,6 +396,7 @@ public Map execute(FileSystem fs) throws IOException {
/**
* Executor that performs a home-dir FileSystemAccess file system operation.
*/
@InterfaceAudience.Private
public static class FSHomeDir implements FileSystemAccess.FileSystemExecutor<JSONObject> {
/**
@ -413,6 +422,7 @@ public JSONObject execute(FileSystem fs) throws IOException {
/**
* Executor that performs a list-status FileSystemAccess file system operation.
*/
@InterfaceAudience.Private
public static class FSListStatus implements FileSystemAccess.FileSystemExecutor<Map>, PathFilter {
private Path path;
private PathFilter filter;
@ -456,6 +466,7 @@ public boolean accept(Path path) {
/**
* Executor that performs a mkdirs FileSystemAccess file system operation.
*/
@InterfaceAudience.Private
public static class FSMkdirs implements FileSystemAccess.FileSystemExecutor<JSONObject> {
private Path path;
@ -494,6 +505,7 @@ public JSONObject execute(FileSystem fs) throws IOException {
/**
* Executor that performs an open FileSystemAccess file system operation.
*/
@InterfaceAudience.Private
public static class FSOpen implements FileSystemAccess.FileSystemExecutor<InputStream> {
private Path path;
@ -526,6 +538,7 @@ public InputStream execute(FileSystem fs) throws IOException {
/**
* Executor that performs a rename FileSystemAccess file system operation.
*/
@InterfaceAudience.Private
public static class FSRename implements FileSystemAccess.FileSystemExecutor<JSONObject> {
private Path path;
private Path toPath;
@ -562,6 +575,7 @@ public JSONObject execute(FileSystem fs) throws IOException {
/**
* Executor that performs a set-owner FileSystemAccess file system operation.
*/
@InterfaceAudience.Private
public static class FSSetOwner implements FileSystemAccess.FileSystemExecutor<Void> {
private Path path;
private String owner;
@ -600,6 +614,7 @@ public Void execute(FileSystem fs) throws IOException {
/**
* Executor that performs a set-permission FileSystemAccess file system operation.
*/
@InterfaceAudience.Private
public static class FSSetPermission implements FileSystemAccess.FileSystemExecutor<Void> {
private Path path;
@ -637,6 +652,7 @@ public Void execute(FileSystem fs) throws IOException {
/**
* Executor that performs a set-replication FileSystemAccess file system operation.
*/
@InterfaceAudience.Private
public static class FSSetReplication implements FileSystemAccess.FileSystemExecutor<JSONObject> {
private Path path;
private short replication;
@ -676,6 +692,7 @@ public JSONObject execute(FileSystem fs) throws IOException {
/**
* Executor that performs a set-times FileSystemAccess file system operation.
*/
@InterfaceAudience.Private
public static class FSSetTimes implements FileSystemAccess.FileSystemExecutor<Void> {
private Path path;
private long mTime;

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.fs.http.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import javax.servlet.FilterConfig;
@ -30,6 +31,7 @@
* Subclass of hadoop-auth <code>AuthenticationFilter</code> that obtains its configuration
* from HttpFSServer's server configuration.
*/
@InterfaceAudience.Private
public class HttpFSAuthenticationFilter extends AuthenticationFilter {
private static final String CONF_PREFIX = "httpfs.authentication.";

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.fs.http.server;
import com.sun.jersey.api.container.ContainerException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.lib.service.FileSystemAccessException;
import org.apache.hadoop.lib.wsrs.ExceptionProvider;
import org.slf4j.Logger;
@ -35,6 +36,7 @@
* exceptions to HTTP status codes.
*/
@Provider
@InterfaceAudience.Private
public class HttpFSExceptionProvider extends ExceptionProvider {
private static Logger AUDIT_LOG = LoggerFactory.getLogger("httpfsaudit");
private static Logger LOG = LoggerFactory.getLogger(HttpFSExceptionProvider.class);

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.fs.http.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
import org.apache.hadoop.fs.http.client.HttpFSKerberosAuthenticator;
import org.apache.hadoop.fs.http.client.HttpFSKerberosAuthenticator.DelegationTokenOperation;
@ -52,6 +53,7 @@
* If no delegation token is present in the request it delegates to the
* {@link KerberosAuthenticationHandler}
*/
@InterfaceAudience.Private
public class HttpFSKerberosAuthenticationHandler
extends KerberosAuthenticationHandler {

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.fs.http.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem.Operation;
import org.apache.hadoop.lib.wsrs.BooleanParam;
@ -38,6 +39,7 @@
* HttpFS ParametersProvider.
*/
@Provider
@InterfaceAudience.Private
public class HttpFSParametersProvider extends ParametersProvider {
private static final Map<Enum, Class<Param<?>>[]> PARAMS_DEF =
@ -85,6 +87,7 @@ public HttpFSParametersProvider() {
/**
* Class for access-time parameter.
*/
@InterfaceAudience.Private
public static class AccessTimeParam extends LongParam {
/**
@ -102,6 +105,7 @@ public AccessTimeParam() {
/**
* Class for block-size parameter.
*/
@InterfaceAudience.Private
public static class BlockSizeParam extends LongParam {
/**
@ -120,6 +124,7 @@ public BlockSizeParam() {
/**
* Class for data parameter.
*/
@InterfaceAudience.Private
public static class DataParam extends BooleanParam {
/**
@ -138,6 +143,7 @@ public DataParam() {
/**
* Class for operation parameter.
*/
@InterfaceAudience.Private
public static class OperationParam extends EnumParam<HttpFSFileSystem.Operation> {
/**
@ -156,6 +162,7 @@ public OperationParam(String operation) {
/**
* Class for delete's recursive parameter.
*/
@InterfaceAudience.Private
public static class RecursiveParam extends BooleanParam {
/**
@ -174,6 +181,7 @@ public RecursiveParam() {
/**
* Class for do-as parameter.
*/
@InterfaceAudience.Private
public static class DoAsParam extends StringParam {
/**
@ -208,6 +216,7 @@ public String parseParam(String str) {
/**
* Class for filter parameter.
*/
@InterfaceAudience.Private
public static class FilterParam extends StringParam {
/**
@ -227,6 +236,7 @@ public FilterParam() {
/**
* Class for group parameter.
*/
@InterfaceAudience.Private
public static class GroupParam extends StringParam {
/**
@ -246,6 +256,7 @@ public GroupParam() {
/**
* Class for len parameter.
*/
@InterfaceAudience.Private
public static class LenParam extends LongParam {
/**
@ -264,6 +275,7 @@ public LenParam() {
/**
* Class for modified-time parameter.
*/
@InterfaceAudience.Private
public static class ModifiedTimeParam extends LongParam {
/**
@ -282,6 +294,7 @@ public ModifiedTimeParam() {
/**
* Class for offset parameter.
*/
@InterfaceAudience.Private
public static class OffsetParam extends LongParam {
/**
@ -300,6 +313,7 @@ public OffsetParam() {
/**
* Class for overwrite parameter.
*/
@InterfaceAudience.Private
public static class OverwriteParam extends BooleanParam {
/**
@ -318,6 +332,7 @@ public OverwriteParam() {
/**
* Class for owner parameter.
*/
@InterfaceAudience.Private
public static class OwnerParam extends StringParam {
/**
@ -337,6 +352,7 @@ public OwnerParam() {
/**
* Class for permission parameter.
*/
@InterfaceAudience.Private
public static class PermissionParam extends ShortParam {
/**
@ -357,6 +373,7 @@ public PermissionParam() {
/**
* Class for replication parameter.
*/
@InterfaceAudience.Private
public static class ReplicationParam extends ShortParam {
/**
@ -375,6 +392,7 @@ public ReplicationParam() {
/**
* Class for to-path parameter.
*/
@InterfaceAudience.Private
public static class DestinationParam extends StringParam {
/**

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.http.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.lib.servlet.FileSystemReleaseFilter;
@ -25,6 +26,7 @@
* Filter that releases FileSystemAccess filesystem instances upon HTTP request
* completion.
*/
@InterfaceAudience.Private
public class HttpFSReleaseFilter extends FileSystemReleaseFilter {
/**

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.http.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
@ -82,6 +83,7 @@
* different operations.
*/
@Path(HttpFSFileSystem.SERVICE_VERSION)
@InterfaceAudience.Private
public class HttpFSServer {
private static Logger AUDIT_LOG = LoggerFactory.getLogger("httpfsaudit");

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.http.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.lib.server.ServerException;
@ -39,6 +40,7 @@
* All the configuration is loaded from configuration properties prefixed
* with <code>httpfs.</code>.
*/
@InterfaceAudience.Private
public class HttpFSServerWebApp extends ServerWebApp {
private static final Logger LOG =
LoggerFactory.getLogger(HttpFSServerWebApp.class);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.lib.lang;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.lib.util.Check;
import java.util.concurrent.Callable;
@ -26,6 +27,7 @@
* Adapter class that allows a <code>Runnable</code> and a <code>Callable</code> to
* be used interchangeably.
*/
@InterfaceAudience.Private
public class RunnableCallable implements Callable<Void>, Runnable {
private Runnable runnable;
private Callable<?> callable;
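
A minimal usage sketch of this adapter, assuming it exposes constructors that take either a Runnable or a Callable (the two fields above suggest as much); everything beyond the class name is an assumption made for illustration.

import java.util.concurrent.Callable;

import org.apache.hadoop.lib.lang.RunnableCallable;

public class RunnableCallableUsage {
  public static void main(String[] args) throws Exception {
    Callable<Void> work = new Callable<Void>() {
      @Override
      public Void call() {
        System.out.println("doing some work");
        return null;
      }
    };
    // Wrap the Callable so the same object can be handed to APIs that expect a Runnable
    // (for example a scheduler), while still being usable as a Callable<Void>.
    RunnableCallable adapter = new RunnableCallable(work);
    adapter.run();
    adapter.call();
  }
}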

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.lib.lang;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.lib.util.Check;
import java.text.MessageFormat;
@ -26,6 +27,7 @@
* Generic exception that requires error codes and uses a message
* template from the error code.
*/
@InterfaceAudience.Private
public class XException extends Exception {
/**

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.lib.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.lib.util.ConfigurationUtils;
@ -26,6 +27,7 @@
/**
* Convenience class implementing the {@link Service} interface.
*/
@InterfaceAudience.Private
public abstract class BaseService implements Service {
private String prefix;
private Server server;
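
A sketch of a custom service built on this class, following the pattern visible in FileSystemAccessService further down (the constructor passes a configuration prefix to super, init() and getInterface() are overridden). The PingService name and its body are hypothetical, and the signatures are inferred from the hunks in this commit rather than checked against the API.

import org.apache.hadoop.lib.server.BaseService;
import org.apache.hadoop.lib.server.ServiceException;

public class PingService extends BaseService {
  private static final String PREFIX = "ping";   // hypothetical configuration prefix

  public PingService() {
    super(PREFIX);
  }

  @Override
  protected void init() throws ServiceException {
    // read service settings here, e.g. via getServiceConfig(), as FileSystemAccessService does
  }

  @Override
  public Class getInterface() {
    // the type under which the Server exposes this service to server.get(...)
    return PingService.class;
  }
}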

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.lib.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.lib.util.Check;
import org.apache.hadoop.lib.util.ConfigurationUtils;
@ -76,6 +77,7 @@
* post-initialized (this enables late/conditional service bindings).
* <p/>
*/
@InterfaceAudience.Private
public class Server {
private Logger log;
@ -97,7 +99,8 @@ public class Server {
/**
* Enumeration that defines the server status.
*/
public enum Status {
@InterfaceAudience.Private
public static enum Status {
UNDEF(false, false),
BOOTING(false, true),
HALTED(true, true),

View File

@ -18,16 +18,19 @@
package org.apache.hadoop.lib.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.lib.lang.XException;
/**
* Exception thrown by the {@link Server} class.
*/
@InterfaceAudience.Private
public class ServerException extends XException {
/**
* Error codes used by the {@link Server} class.
*/
@InterfaceAudience.Private
public static enum ERROR implements XException.ERROR {
S01("Dir [{0}] does not exist"),
S02("[{0}] is not a directory"),
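
The S01/S02 entries above show the error-code pattern: the enum carries a MessageFormat template and the exception fills it in with parameters. Below is a sketch of defining such an exception; the getTemplate() accessor and the XException constructor arguments are assumed from this pattern, not verified against the API.

import org.apache.hadoop.lib.lang.XException;

public class ConfigXException extends XException {

  public static enum ERROR implements XException.ERROR {
    C01("Missing configuration property [{0}]"),
    C02("Invalid value [{1}] for configuration property [{0}]");

    private String template;

    ERROR(String template) {
      this.template = template;
    }

    @Override
    public String getTemplate() {
      return template;
    }
  }

  public ConfigXException(ERROR error, Object... params) {
    super(error, params);   // the message becomes the template filled in with params
  }
}

// usage: throw new ConfigXException(ConfigXException.ERROR.C01, "some.property");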

View File

@ -18,9 +18,12 @@
package org.apache.hadoop.lib.server;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Service interface for components to be managed by the {@link Server} class.
*/
@InterfaceAudience.Private
public interface Service {
/**

View File

@ -18,11 +18,13 @@
package org.apache.hadoop.lib.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.lib.lang.XException;
/**
* Exception thrown by {@link Service} implementations.
*/
@InterfaceAudience.Private
public class ServiceException extends ServerException {
/**

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.lib.service;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.http.client.HttpFSKerberosAuthenticator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
@ -24,6 +25,7 @@
/**
* HttpFS <code>DelegationTokenIdentifier</code> implementation.
*/
@InterfaceAudience.Private
public class DelegationTokenIdentifier
extends AbstractDelegationTokenIdentifier {

View File

@ -17,12 +17,14 @@
*/
package org.apache.hadoop.lib.service;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
/**
* Service interface to manage HttpFS delegation tokens.
*/
@InterfaceAudience.Private
public interface DelegationTokenManager {
/**

View File

@ -17,11 +17,13 @@
*/
package org.apache.hadoop.lib.service;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.lib.lang.XException;
/**
* Exception thrown by the {@link DelegationTokenManager} service implementation.
*/
@InterfaceAudience.Private
public class DelegationTokenManagerException extends XException {
public enum ERROR implements XException.ERROR {

View File

@ -18,11 +18,13 @@
package org.apache.hadoop.lib.service;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;
@InterfaceAudience.Private
public interface FileSystemAccess {
public interface FileSystemExecutor<T> {

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.lib.service;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.lib.lang.XException;
@InterfaceAudience.Private
public class FileSystemAccessException extends XException {
public enum ERROR implements XException.ERROR {

View File

@ -18,9 +18,12 @@
package org.apache.hadoop.lib.service;
import org.apache.hadoop.classification.InterfaceAudience;
import java.io.IOException;
import java.util.List;
@InterfaceAudience.Private
public interface Groups {
public List<String> getGroups(String user) throws IOException;

View File

@ -18,8 +18,11 @@
package org.apache.hadoop.lib.service;
import org.apache.hadoop.classification.InterfaceAudience;
import java.util.Map;
@InterfaceAudience.Private
public interface Instrumentation {
public interface Cron {

View File

@ -18,9 +18,12 @@
package org.apache.hadoop.lib.service;
import org.apache.hadoop.classification.InterfaceAudience;
import java.io.IOException;
import java.security.AccessControlException;
@InterfaceAudience.Private
public interface ProxyUser {
public void validate(String proxyUser, String proxyHost, String doAsUser) throws IOException, AccessControlException;

View File

@ -18,9 +18,12 @@
package org.apache.hadoop.lib.service;
import org.apache.hadoop.classification.InterfaceAudience;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@InterfaceAudience.Private
public interface Scheduler {
public abstract void schedule(Callable<?> callable, long delay, long interval, TimeUnit unit);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.lib.service.hadoop;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
@ -27,6 +28,7 @@
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.lib.service.FileSystemAccessException;
import org.apache.hadoop.lib.service.Instrumentation;
import org.apache.hadoop.lib.service.Scheduler;
import org.apache.hadoop.lib.util.Check;
import org.apache.hadoop.lib.util.ConfigurationUtils;
import org.apache.hadoop.security.UserGroupInformation;
@ -42,8 +44,11 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@InterfaceAudience.Private
public class FileSystemAccessService extends BaseService implements FileSystemAccess {
private static final Logger LOG = LoggerFactory.getLogger(FileSystemAccessService.class);
@ -54,6 +59,8 @@ public class FileSystemAccessService extends BaseService implements FileSystemAc
public static final String AUTHENTICATION_TYPE = "authentication.type";
public static final String KERBEROS_KEYTAB = "authentication.kerberos.keytab";
public static final String KERBEROS_PRINCIPAL = "authentication.kerberos.principal";
public static final String FS_CACHE_PURGE_FREQUENCY = "filesystem.cache.purge.frequency";
public static final String FS_CACHE_PURGE_TIMEOUT = "filesystem.cache.purge.timeout";
public static final String NAME_NODE_WHITELIST = "name.node.whitelist";
@ -63,6 +70,61 @@ public class FileSystemAccessService extends BaseService implements FileSystemAc
private static final String FILE_SYSTEM_SERVICE_CREATED = "FileSystemAccessService.created";
private static class CachedFileSystem {
private FileSystem fs;
private long lastUse;
private long timeout;
private int count;
public CachedFileSystem(long timeout) {
this.timeout = timeout;
lastUse = -1;
count = 0;
}
synchronized FileSystem getFileSytem(Configuration conf)
throws IOException {
if (fs == null) {
fs = FileSystem.get(conf);
}
lastUse = -1;
count++;
return fs;
}
synchronized void release() throws IOException {
count--;
if (count == 0) {
if (timeout == 0) {
fs.close();
fs = null;
lastUse = -1;
}
else {
lastUse = System.currentTimeMillis();
}
}
}
// To avoid race conditions when adding/removing entries from the cache map,
// an entry remains in the cache forever; it just closes/opens filesystems
// based on their utilization. Worst case, the penalty we'll pay is that the
// number of entries in the cache will be the total number of users in HDFS
// (which seems a reasonable overhead).
synchronized boolean purgeIfIdle() throws IOException {
boolean ret = false;
if (count == 0 && lastUse != -1 &&
(System.currentTimeMillis() - lastUse) > timeout) {
fs.close();
fs = null;
lastUse = -1;
ret = true;
}
return ret;
}
}
public FileSystemAccessService() {
super(PREFIX);
}
@ -73,6 +135,11 @@ public FileSystemAccessService() {
private AtomicInteger unmanagedFileSystems = new AtomicInteger();
private ConcurrentHashMap<String, CachedFileSystem> fsCache =
new ConcurrentHashMap<String, CachedFileSystem>();
private long purgeTimeout;
@Override
protected void init() throws ServiceException {
LOG.info("Using FileSystemAccess JARs version [{}]", VersionInfo.getVersion());
@ -157,6 +224,30 @@ public Long getValue() {
return (long) unmanagedFileSystems.get();
}
});
Scheduler scheduler = getServer().get(Scheduler.class);
int purgeInterval = getServiceConfig().getInt(FS_CACHE_PURGE_FREQUENCY, 60);
purgeTimeout = getServiceConfig().getLong(FS_CACHE_PURGE_TIMEOUT, 60);
purgeTimeout = (purgeTimeout > 0) ? purgeTimeout : 0;
if (purgeTimeout > 0) {
scheduler.schedule(new FileSystemCachePurger(),
purgeInterval, purgeInterval, TimeUnit.SECONDS);
}
}
private class FileSystemCachePurger implements Runnable {
@Override
public void run() {
int count = 0;
for (CachedFileSystem cacheFs : fsCache.values()) {
try {
count += cacheFs.purgeIfIdle() ? 1 : 0;
} catch (Throwable ex) {
LOG.warn("Error while purging filesystem, " + ex.toString(), ex);
}
}
LOG.debug("Purged [{}] filesystem instances", count);
}
}
private Set<String> toLowerCase(Collection<String> collection) {
@ -174,7 +265,7 @@ public Class getInterface() {
@Override
public Class[] getServiceDependencies() {
return new Class[]{Instrumentation.class};
return new Class[]{Instrumentation.class, Scheduler.class};
}
protected UserGroupInformation getUGI(String user) throws IOException {
@ -185,12 +276,25 @@ protected void setRequiredServiceHadoopConf(Configuration conf) {
conf.set("fs.hdfs.impl.disable.cache", "true");
}
protected FileSystem createFileSystem(Configuration namenodeConf) throws IOException {
return FileSystem.get(namenodeConf);
private static final String HTTPFS_FS_USER = "httpfs.fs.user";
protected FileSystem createFileSystem(Configuration namenodeConf)
throws IOException {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
CachedFileSystem newCachedFS = new CachedFileSystem(purgeTimeout);
CachedFileSystem cachedFS = fsCache.putIfAbsent(user, newCachedFS);
if (cachedFS == null) {
cachedFS = newCachedFS;
}
Configuration conf = new Configuration(namenodeConf);
conf.set(HTTPFS_FS_USER, user);
return cachedFS.getFileSytem(conf);
}
protected void closeFileSystem(FileSystem fs) throws IOException {
fs.close();
if (fsCache.containsKey(fs.getConf().get(HTTPFS_FS_USER))) {
fsCache.get(fs.getConf().get(HTTPFS_FS_USER)).release();
}
}
protected void validateNamenode(String namenode) throws FileSystemAccessException {
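
A self-contained sketch of the purge scheduling wired up in init() above: a periodic task closes filesystems that have stayed idle longer than a timeout, while the cache entries themselves remain in the map. The 60-second values mirror the httpfs.hadoop.filesystem.cache.purge.frequency/timeout defaults added later in this commit; the Entry class and ScheduledExecutorService are stand-ins for CachedFileSystem and the Scheduler service, not HttpFS code.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PurgeSchedulingSketch {

  static class Entry {
    volatile int users = 0;
    volatile long lastUse = System.currentTimeMillis();
  }

  public static void main(String[] args) {
    final ConcurrentHashMap<String, Entry> cache = new ConcurrentHashMap<String, Entry>();
    cache.put("alice", new Entry());

    final long purgeTimeoutMillis = TimeUnit.SECONDS.toMillis(60);  // ...cache.purge.timeout
    long purgeIntervalSeconds = 60;                                 // ...cache.purge.frequency

    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleWithFixedDelay(new Runnable() {
      @Override
      public void run() {
        int purged = 0;
        for (Entry entry : cache.values()) {
          // entries are never removed; only the idle resource they hold would be closed
          if (entry.users == 0
              && System.currentTimeMillis() - entry.lastUse > purgeTimeoutMillis) {
            purged++;  // the real purger calls purgeIfIdle(), which closes the filesystem
          }
        }
        System.out.println("Purged [" + purged + "] idle entries");
      }
    }, purgeIntervalSeconds, purgeIntervalSeconds, TimeUnit.SECONDS);
    // (a real service shuts the scheduler down on destroy())
  }
}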

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.lib.service.instrumentation;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.lib.server.BaseService;
import org.apache.hadoop.lib.server.ServiceException;
import org.apache.hadoop.lib.service.Instrumentation;
@ -39,6 +40,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@InterfaceAudience.Private
public class InstrumentationService extends BaseService implements Instrumentation {
public static final String PREFIX = "instrumentation";
public static final String CONF_TIMERS_SIZE = "timers.size";

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.lib.service.scheduler;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.lib.lang.RunnableCallable;
import org.apache.hadoop.lib.server.BaseService;
import org.apache.hadoop.lib.server.Server;
@ -35,6 +36,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@InterfaceAudience.Private
public class SchedulerService extends BaseService implements Scheduler {
private static final Logger LOG = LoggerFactory.getLogger(SchedulerService.class);

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.lib.service.security;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.lib.server.BaseService;
@ -37,6 +38,7 @@
/**
* DelegationTokenManager service implementation.
*/
@InterfaceAudience.Private
public class DelegationTokenManagerService extends BaseService
implements DelegationTokenManager {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.lib.service.security;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.lib.server.BaseService;
import org.apache.hadoop.lib.server.ServiceException;
@ -27,6 +28,7 @@
import java.io.IOException;
import java.util.List;
@InterfaceAudience.Private
public class GroupsService extends BaseService implements Groups {
private static final String PREFIX = "groups";

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.lib.service.security;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.lib.lang.XException;
import org.apache.hadoop.lib.server.BaseService;
import org.apache.hadoop.lib.server.ServiceException;
@ -38,10 +39,12 @@
import java.util.Map;
import java.util.Set;
@InterfaceAudience.Private
public class ProxyUserService extends BaseService implements ProxyUser {
private static Logger LOG = LoggerFactory.getLogger(ProxyUserService.class);
public enum ERROR implements XException.ERROR {
@InterfaceAudience.Private
public static enum ERROR implements XException.ERROR {
PRXU01("Could not normalize host name [{0}], {1}"),
PRXU02("Missing [{0}] property");

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.lib.servlet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.lib.service.FileSystemAccess;
@ -37,6 +38,7 @@
* is streaming out HDFS data and the corresponding filesystem
* instance has to be closed after the streaming completes.
*/
@InterfaceAudience.Private
public abstract class FileSystemReleaseFilter implements Filter {
private static final ThreadLocal<FileSystem> FILE_SYSTEM_TL = new ThreadLocal<FileSystem>();
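
A sketch of the ThreadLocal-and-finally pattern such a release filter relies on: downstream code parks the resource it is streaming from in a ThreadLocal, and the filter closes it once the request has fully completed. The servlet API calls are standard; the setResource() hook and the Closeable resource type are placeholders, not the HttpFS implementation.

import java.io.Closeable;
import java.io.IOException;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;

public class ReleaseOnCompletionFilter implements Filter {
  private static final ThreadLocal<Closeable> RESOURCE_TL = new ThreadLocal<Closeable>();

  // Called by downstream code (e.g. a streaming response body) to hand the filter the
  // resource that must stay open while streaming but be released with the request.
  public static void setResource(Closeable resource) {
    RESOURCE_TL.set(resource);
  }

  @Override
  public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
      throws IOException, ServletException {
    try {
      chain.doFilter(request, response);
    } finally {
      Closeable resource = RESOURCE_TL.get();
      RESOURCE_TL.remove();
      if (resource != null) {
        resource.close();   // released only after the streaming has completed
      }
    }
  }

  @Override
  public void init(FilterConfig config) {
  }

  @Override
  public void destroy() {
  }
}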

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.lib.servlet;
import org.apache.hadoop.classification.InterfaceAudience;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
@ -31,6 +33,7 @@
/**
* Filter that resolves the requester hostname.
*/
@InterfaceAudience.Private
public class HostnameFilter implements Filter {
static final ThreadLocal<String> HOSTNAME_TL = new ThreadLocal<String>();

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.lib.servlet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.slf4j.MDC;
import javax.servlet.Filter;
@ -42,6 +43,7 @@
* <li>path: the path of the request URL</li>
* </ul>
*/
@InterfaceAudience.Private
public class MDCFilter implements Filter {
/**

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.lib.servlet;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.lib.server.Server;
import org.apache.hadoop.lib.server.ServerException;
@ -34,6 +35,7 @@
* {@link Server} subclass that implements <code>ServletContextListener</code>
* and uses its lifecycle to start and stop the server.
*/
@InterfaceAudience.Private
public abstract class ServerWebApp extends Server implements ServletContextListener {
private static final String HOME_DIR = ".home.dir";

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.lib.util;
import org.apache.hadoop.classification.InterfaceAudience;
import java.text.MessageFormat;
import java.util.List;
import java.util.regex.Pattern;
@ -27,6 +29,7 @@
* <p/>
* Commonly used for method argument preconditions.
*/
@InterfaceAudience.Private
public class Check {
/**

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.lib.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.w3c.dom.DOMException;
import org.w3c.dom.Document;
@ -37,6 +38,7 @@
/**
* Configuration utilities.
*/
@InterfaceAudience.Private
public abstract class ConfigurationUtils {
/**

View File

@ -18,8 +18,11 @@
package org.apache.hadoop.lib.wsrs;
import org.apache.hadoop.classification.InterfaceAudience;
import java.text.MessageFormat;
@InterfaceAudience.Private
public abstract class BooleanParam extends Param<Boolean> {
public BooleanParam(String name, Boolean defaultValue) {

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.lib.wsrs;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public abstract class ByteParam extends Param<Byte> {
public ByteParam(String name, Byte defaultValue) {

View File

@ -18,10 +18,12 @@
package org.apache.hadoop.lib.wsrs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.util.StringUtils;
import java.util.Arrays;
@InterfaceAudience.Private
public abstract class EnumParam<E extends Enum<E>> extends Param<E> {
Class<E> klass;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.lib.wsrs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -28,6 +29,7 @@
import java.util.LinkedHashMap;
import java.util.Map;
@InterfaceAudience.Private
public class ExceptionProvider implements ExceptionMapper<Throwable> {
private static Logger LOG = LoggerFactory.getLogger(ExceptionProvider.class);
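
For reference, the JAX-RS mechanism this provider builds on: an ExceptionMapper turns uncaught exceptions into HTTP responses. The status choices and JSON shape below are assumptions for the sketch, not the mappings ExceptionProvider actually applies.

import java.io.FileNotFoundException;

import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;

@Provider
public class SimpleExceptionMapper implements ExceptionMapper<Throwable> {
  @Override
  public Response toResponse(Throwable ex) {
    // Map a well-known exception type to an HTTP status code; everything else becomes
    // a 500. A real provider would also log the failure.
    Response.Status status = (ex instanceof FileNotFoundException)
        ? Response.Status.NOT_FOUND
        : Response.Status.INTERNAL_SERVER_ERROR;
    String json = "{\"message\":\"" + ex.getMessage() + "\"}";
    return Response.status(status).type(MediaType.APPLICATION_JSON).entity(json).build();
  }
}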

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.lib.wsrs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.IOUtils;
import javax.ws.rs.core.StreamingOutput;
@ -25,6 +26,7 @@
import java.io.InputStream;
import java.io.OutputStream;
@InterfaceAudience.Private
public class InputStreamEntity implements StreamingOutput {
private InputStream is;
private long offset;

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.lib.wsrs;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public abstract class IntegerParam extends Param<Integer> {
public IntegerParam(String name, Integer defaultValue) {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.lib.wsrs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.json.simple.JSONObject;
import javax.ws.rs.Produces;
@ -36,6 +37,7 @@
@Provider
@Produces(MediaType.APPLICATION_JSON)
@InterfaceAudience.Private
public class JSONMapProvider implements MessageBodyWriter<Map> {
private static final String ENTER = System.getProperty("line.separator");

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.lib.wsrs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.json.simple.JSONStreamAware;
import javax.ws.rs.Produces;
@ -35,6 +36,7 @@
@Provider
@Produces(MediaType.APPLICATION_JSON)
@InterfaceAudience.Private
public class JSONProvider implements MessageBodyWriter<JSONStreamAware> {
private static final String ENTER = System.getProperty("line.separator");

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.lib.wsrs;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public abstract class LongParam extends Param<Long> {
public LongParam(String name, Long defaultValue) {

View File

@ -18,10 +18,11 @@
package org.apache.hadoop.lib.wsrs;
import org.apache.hadoop.lib.util.Check;
import org.apache.hadoop.classification.InterfaceAudience;
import java.text.MessageFormat;
@InterfaceAudience.Private
public abstract class Param<T> {
private String name;
protected T value;

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.lib.wsrs;
import org.apache.hadoop.classification.InterfaceAudience;
import java.util.Map;
/**
@ -24,6 +26,7 @@
* <p/>
* Instances are created by the {@link ParametersProvider} class.
*/
@InterfaceAudience.Private
public class Parameters {
private Map<String, Param<?>> params;

View File

@ -24,6 +24,7 @@
import com.sun.jersey.server.impl.inject.AbstractHttpContextInjectable;
import com.sun.jersey.spi.inject.Injectable;
import com.sun.jersey.spi.inject.InjectableProvider;
import org.apache.hadoop.classification.InterfaceAudience;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MultivaluedMap;
@ -36,6 +37,7 @@
* Jersey provider that parses the request parameters based on the
* given parameter definition.
*/
@InterfaceAudience.Private
public class ParametersProvider
extends AbstractHttpContextInjectable<Parameters>
implements InjectableProvider<Context, Type> {

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.lib.wsrs;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public abstract class ShortParam extends Param<Short> {
private int radix;

View File

@ -17,9 +17,12 @@
*/
package org.apache.hadoop.lib.wsrs;
import org.apache.hadoop.classification.InterfaceAudience;
import java.text.MessageFormat;
import java.util.regex.Pattern;
@InterfaceAudience.Private
public abstract class StringParam extends Param<String> {
private Pattern pattern;

View File

@ -24,6 +24,7 @@
import com.sun.jersey.server.impl.inject.AbstractHttpContextInjectable;
import com.sun.jersey.spi.inject.Injectable;
import com.sun.jersey.spi.inject.InjectableProvider;
import org.apache.hadoop.classification.InterfaceAudience;
import org.slf4j.MDC;
import javax.ws.rs.core.Context;
@ -33,6 +34,7 @@
import java.util.regex.Pattern;
@Provider
@InterfaceAudience.Private
public class UserProvider extends AbstractHttpContextInjectable<Principal> implements
InjectableProvider<Context, Type> {

View File

@ -210,4 +210,20 @@
</description>
</property>
<property>
<name>httpfs.hadoop.filesystem.cache.purge.frequency</name>
<value>60</value>
<description>
Frequency, in seconds, at which the idle filesystem purging daemon runs.
</description>
</property>
<property>
<name>httpfs.hadoop.filesystem.cache.purge.timeout</name>
<value>60</value>
<description>
Timeout, in seconds, for an idle filesystem to be purged.
</description>
</property>
</configuration>

View File

@ -18,19 +18,6 @@
package org.apache.hadoop.fs.http.client;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Writer;
import java.net.URI;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.ContentSummary;
@ -47,7 +34,6 @@
import org.apache.hadoop.test.TestDir;
import org.apache.hadoop.test.TestDirHelper;
import org.apache.hadoop.test.TestHdfs;
import org.apache.hadoop.test.TestHdfsHelper;
import org.apache.hadoop.test.TestJetty;
import org.apache.hadoop.test.TestJettyHelper;
import org.junit.Assert;
@ -57,8 +43,31 @@
import org.mortbay.jetty.Server;
import org.mortbay.jetty.webapp.WebAppContext;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Writer;
import java.net.URI;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
@RunWith(value = Parameterized.class)
public class TestHttpFSFileSystem extends HFSTestCase {
public abstract class BaseTestHttpFSWith extends HFSTestCase {
protected abstract Path getProxiedFSTestDir();
protected abstract String getProxiedFSURI();
protected abstract Configuration getProxiedFSConf();
protected boolean isLocalFS() {
return getProxiedFSURI().startsWith("file://");
}
private void createHttpFSServer() throws Exception {
File homeDir = TestDirHelper.getTestDir();
@ -72,8 +81,8 @@ private void createHttpFSServer() throws Exception {
w.write("secret");
w.close();
//HDFS configuration
String fsDefaultName = TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
//FileSystem being served by HttpFS
String fsDefaultName = getProxiedFSURI();
Configuration conf = new Configuration(false);
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, fsDefaultName);
File hdfsSite = new File(new File(homeDir, "conf"), "hdfs-site.xml");
@ -105,7 +114,7 @@ protected Class getFileSystemClass() {
return HttpFSFileSystem.class;
}
protected FileSystem getHttpFileSystem() throws Exception {
protected FileSystem getHttpFSFileSystem() throws Exception {
Configuration conf = new Configuration();
conf.set("fs.webhdfs.impl", getFileSystemClass().getName());
URI uri = new URI("webhdfs://" +
@ -114,7 +123,7 @@ protected FileSystem getHttpFileSystem() throws Exception {
}
protected void testGet() throws Exception {
FileSystem fs = getHttpFileSystem();
FileSystem fs = getHttpFSFileSystem();
Assert.assertNotNull(fs);
URI uri = new URI("webhdfs://" +
TestJettyHelper.getJettyURL().toURI().getAuthority());
@ -123,13 +132,13 @@ protected void testGet() throws Exception {
}
private void testOpen() throws Exception {
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
Path path = new Path(TestHdfsHelper.getHdfsTestDir(), "foo.txt");
FileSystem fs = FileSystem.get(getProxiedFSConf());
Path path = new Path(getProxiedFSTestDir(), "foo.txt");
OutputStream os = fs.create(path);
os.write(1);
os.close();
fs.close();
fs = getHttpFileSystem();
fs = getHttpFSFileSystem();
InputStream is = fs.open(new Path(path.toUri().getPath()));
Assert.assertEquals(is.read(), 1);
is.close();
@ -137,7 +146,7 @@ private void testOpen() throws Exception {
}
private void testCreate(Path path, boolean override) throws Exception {
FileSystem fs = getHttpFileSystem();
FileSystem fs = getHttpFSFileSystem();
FsPermission permission = new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE);
OutputStream os = fs.create(new Path(path.toUri().getPath()), permission, override, 1024,
(short) 2, 100 * 1024 * 1024, null);
@ -145,10 +154,12 @@ private void testCreate(Path path, boolean override) throws Exception {
os.close();
fs.close();
fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
fs = FileSystem.get(getProxiedFSConf());
FileStatus status = fs.getFileStatus(path);
Assert.assertEquals(status.getReplication(), 2);
Assert.assertEquals(status.getBlockSize(), 100 * 1024 * 1024);
if (!isLocalFS()) {
Assert.assertEquals(status.getReplication(), 2);
Assert.assertEquals(status.getBlockSize(), 100 * 1024 * 1024);
}
Assert.assertEquals(status.getPermission(), permission);
InputStream is = fs.open(path);
Assert.assertEquals(is.read(), 1);
@ -157,66 +168,70 @@ private void testCreate(Path path, boolean override) throws Exception {
}
private void testCreate() throws Exception {
Path path = new Path(TestHdfsHelper.getHdfsTestDir(), "foo.txt");
Path path = new Path(getProxiedFSTestDir(), "foo.txt");
testCreate(path, false);
testCreate(path, true);
try {
testCreate(path, false);
Assert.fail();
Assert.fail("the create should have failed because the file exists " +
"and override is FALSE");
} catch (IOException ex) {
} catch (Exception ex) {
Assert.fail();
Assert.fail(ex.toString());
}
}
private void testAppend() throws Exception {
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
Path path = new Path(TestHdfsHelper.getHdfsTestDir(), "foo.txt");
OutputStream os = fs.create(path);
os.write(1);
os.close();
fs.close();
fs = getHttpFileSystem();
os = fs.append(new Path(path.toUri().getPath()));
os.write(2);
os.close();
fs.close();
fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
InputStream is = fs.open(path);
Assert.assertEquals(is.read(), 1);
Assert.assertEquals(is.read(), 2);
Assert.assertEquals(is.read(), -1);
is.close();
fs.close();
if (!isLocalFS()) {
FileSystem fs = FileSystem.get(getProxiedFSConf());
fs.mkdirs(getProxiedFSTestDir());
Path path = new Path(getProxiedFSTestDir(), "foo.txt");
OutputStream os = fs.create(path);
os.write(1);
os.close();
fs.close();
fs = getHttpFSFileSystem();
os = fs.append(new Path(path.toUri().getPath()));
os.write(2);
os.close();
fs.close();
fs = FileSystem.get(getProxiedFSConf());
InputStream is = fs.open(path);
Assert.assertEquals(is.read(), 1);
Assert.assertEquals(is.read(), 2);
Assert.assertEquals(is.read(), -1);
is.close();
fs.close();
}
}
private void testRename() throws Exception {
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
Path path = new Path(TestHdfsHelper.getHdfsTestDir(), "foo");
FileSystem fs = FileSystem.get(getProxiedFSConf());
Path path = new Path(getProxiedFSTestDir(), "foo");
fs.mkdirs(path);
fs.close();
fs = getHttpFileSystem();
fs = getHttpFSFileSystem();
Path oldPath = new Path(path.toUri().getPath());
Path newPath = new Path(path.getParent(), "bar");
fs.rename(oldPath, newPath);
fs.close();
fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
fs = FileSystem.get(getProxiedFSConf());
Assert.assertFalse(fs.exists(oldPath));
Assert.assertTrue(fs.exists(newPath));
fs.close();
}
private void testDelete() throws Exception {
Path foo = new Path(TestHdfsHelper.getHdfsTestDir(), "foo");
Path bar = new Path(TestHdfsHelper.getHdfsTestDir(), "bar");
Path foe = new Path(TestHdfsHelper.getHdfsTestDir(), "foe");
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
Path foo = new Path(getProxiedFSTestDir(), "foo");
Path bar = new Path(getProxiedFSTestDir(), "bar");
Path foe = new Path(getProxiedFSTestDir(), "foe");
FileSystem fs = FileSystem.get(getProxiedFSConf());
fs.mkdirs(foo);
fs.mkdirs(new Path(bar, "a"));
fs.mkdirs(foe);
FileSystem hoopFs = getHttpFileSystem();
FileSystem hoopFs = getHttpFSFileSystem();
Assert.assertTrue(hoopFs.delete(new Path(foo.toUri().getPath()), false));
Assert.assertFalse(fs.exists(foo));
try {
@ -239,15 +254,15 @@ private void testDelete() throws Exception {
}
private void testListStatus() throws Exception {
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
Path path = new Path(TestHdfsHelper.getHdfsTestDir(), "foo.txt");
FileSystem fs = FileSystem.get(getProxiedFSConf());
Path path = new Path(getProxiedFSTestDir(), "foo.txt");
OutputStream os = fs.create(path);
os.write(1);
os.close();
FileStatus status1 = fs.getFileStatus(path);
fs.close();
fs = getHttpFileSystem();
fs = getHttpFSFileSystem();
FileStatus status2 = fs.getFileStatus(new Path(path.toUri().getPath()));
fs.close();
@ -267,16 +282,20 @@ private void testListStatus() throws Exception {
}
private void testWorkingdirectory() throws Exception {
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
FileSystem fs = FileSystem.get(getProxiedFSConf());
Path workingDir = fs.getWorkingDirectory();
fs.close();
fs = getHttpFileSystem();
Path hoopWorkingDir = fs.getWorkingDirectory();
fs = getHttpFSFileSystem();
if (isLocalFS()) {
fs.setWorkingDirectory(workingDir);
}
Path httpFSWorkingDir = fs.getWorkingDirectory();
fs.close();
Assert.assertEquals(hoopWorkingDir.toUri().getPath(), workingDir.toUri().getPath());
Assert.assertEquals(httpFSWorkingDir.toUri().getPath(),
workingDir.toUri().getPath());
fs = getHttpFileSystem();
fs = getHttpFSFileSystem();
fs.setWorkingDirectory(new Path("/tmp"));
workingDir = fs.getWorkingDirectory();
fs.close();
@ -284,62 +303,64 @@ private void testWorkingdirectory() throws Exception {
}
private void testMkdirs() throws Exception {
Path path = new Path(TestHdfsHelper.getHdfsTestDir(), "foo");
FileSystem fs = getHttpFileSystem();
Path path = new Path(getProxiedFSTestDir(), "foo");
FileSystem fs = getHttpFSFileSystem();
fs.mkdirs(path);
fs.close();
fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
fs = FileSystem.get(getProxiedFSConf());
Assert.assertTrue(fs.exists(path));
fs.close();
}
private void testSetTimes() throws Exception {
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
Path path = new Path(TestHdfsHelper.getHdfsTestDir(), "foo.txt");
OutputStream os = fs.create(path);
os.write(1);
os.close();
FileStatus status1 = fs.getFileStatus(path);
fs.close();
long at = status1.getAccessTime();
long mt = status1.getModificationTime();
if (!isLocalFS()) {
FileSystem fs = FileSystem.get(getProxiedFSConf());
Path path = new Path(getProxiedFSTestDir(), "foo.txt");
OutputStream os = fs.create(path);
os.write(1);
os.close();
FileStatus status1 = fs.getFileStatus(path);
fs.close();
long at = status1.getAccessTime();
long mt = status1.getModificationTime();
fs = getHttpFileSystem();
fs.setTimes(path, mt + 10, at + 20);
fs.close();
fs = getHttpFSFileSystem();
fs.setTimes(path, mt - 10, at - 20);
fs.close();
fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
status1 = fs.getFileStatus(path);
fs.close();
long atNew = status1.getAccessTime();
long mtNew = status1.getModificationTime();
Assert.assertEquals(mtNew, mt + 10);
Assert.assertEquals(atNew, at + 20);
fs = FileSystem.get(getProxiedFSConf());
status1 = fs.getFileStatus(path);
fs.close();
long atNew = status1.getAccessTime();
long mtNew = status1.getModificationTime();
Assert.assertEquals(mtNew, mt - 10);
Assert.assertEquals(atNew, at - 20);
}
}
private void testSetPermission() throws Exception {
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
Path path = new Path(TestHdfsHelper.getHdfsTestDir(), "foodir");
FileSystem fs = FileSystem.get(getProxiedFSConf());
Path path = new Path(getProxiedFSTestDir(), "foodir");
fs.mkdirs(path);
fs = getHttpFileSystem();
fs = getHttpFSFileSystem();
FsPermission permission1 = new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE);
fs.setPermission(path, permission1);
fs.close();
fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
fs = FileSystem.get(getProxiedFSConf());
FileStatus status1 = fs.getFileStatus(path);
fs.close();
FsPermission permission2 = status1.getPermission();
Assert.assertEquals(permission2, permission1);
//sticky bit
fs = getHttpFileSystem();
//sticky bit
fs = getHttpFSFileSystem();
permission1 = new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE, true);
fs.setPermission(path, permission1);
fs.close();
fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
fs = FileSystem.get(getProxiedFSConf());
status1 = fs.getFileStatus(path);
fs.close();
permission2 = status1.getPermission();
@ -348,70 +369,76 @@ private void testSetPermission() throws Exception {
}
private void testSetOwner() throws Exception {
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
Path path = new Path(TestHdfsHelper.getHdfsTestDir(), "foo.txt");
OutputStream os = fs.create(path);
os.write(1);
os.close();
fs.close();
if (!isLocalFS()) {
FileSystem fs = FileSystem.get(getProxiedFSConf());
fs.mkdirs(getProxiedFSTestDir());
Path path = new Path(getProxiedFSTestDir(), "foo.txt");
OutputStream os = fs.create(path);
os.write(1);
os.close();
fs.close();
fs = getHttpFileSystem();
String user = HadoopUsersConfTestHelper.getHadoopUsers()[1];
String group = HadoopUsersConfTestHelper.getHadoopUserGroups(user)[0];
fs.setOwner(path, user, group);
fs.close();
fs = getHttpFSFileSystem();
String user = HadoopUsersConfTestHelper.getHadoopUsers()[1];
String group = HadoopUsersConfTestHelper.getHadoopUserGroups(user)[0];
fs.setOwner(path, user, group);
fs.close();
fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
FileStatus status1 = fs.getFileStatus(path);
fs.close();
Assert.assertEquals(status1.getOwner(), user);
Assert.assertEquals(status1.getGroup(), group);
fs = FileSystem.get(getProxiedFSConf());
FileStatus status1 = fs.getFileStatus(path);
fs.close();
Assert.assertEquals(status1.getOwner(), user);
Assert.assertEquals(status1.getGroup(), group);
}
}
private void testSetReplication() throws Exception {
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
Path path = new Path(TestHdfsHelper.getHdfsTestDir(), "foo.txt");
FileSystem fs = FileSystem.get(getProxiedFSConf());
Path path = new Path(getProxiedFSTestDir(), "foo.txt");
OutputStream os = fs.create(path);
os.write(1);
os.close();
fs.close();
fs.setReplication(path, (short) 2);
fs = getHttpFileSystem();
fs = getHttpFSFileSystem();
fs.setReplication(path, (short) 1);
fs.close();
fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
fs = FileSystem.get(getProxiedFSConf());
FileStatus status1 = fs.getFileStatus(path);
fs.close();
Assert.assertEquals(status1.getReplication(), (short) 1);
}
private void testChecksum() throws Exception {
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
Path path = new Path(TestHdfsHelper.getHdfsTestDir(), "foo.txt");
OutputStream os = fs.create(path);
os.write(1);
os.close();
FileChecksum hdfsChecksum = fs.getFileChecksum(path);
fs.close();
fs = getHttpFileSystem();
FileChecksum httpChecksum = fs.getFileChecksum(path);
fs.close();
Assert.assertEquals(httpChecksum.getAlgorithmName(), hdfsChecksum.getAlgorithmName());
Assert.assertEquals(httpChecksum.getLength(), hdfsChecksum.getLength());
Assert.assertArrayEquals(httpChecksum.getBytes(), hdfsChecksum.getBytes());
if (!isLocalFS()) {
FileSystem fs = FileSystem.get(getProxiedFSConf());
fs.mkdirs(getProxiedFSTestDir());
Path path = new Path(getProxiedFSTestDir(), "foo.txt");
OutputStream os = fs.create(path);
os.write(1);
os.close();
FileChecksum hdfsChecksum = fs.getFileChecksum(path);
fs.close();
fs = getHttpFSFileSystem();
FileChecksum httpChecksum = fs.getFileChecksum(path);
fs.close();
Assert.assertEquals(httpChecksum.getAlgorithmName(), hdfsChecksum.getAlgorithmName());
Assert.assertEquals(httpChecksum.getLength(), hdfsChecksum.getLength());
Assert.assertArrayEquals(httpChecksum.getBytes(), hdfsChecksum.getBytes());
}
}
private void testContentSummary() throws Exception {
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
Path path = new Path(TestHdfsHelper.getHdfsTestDir(), "foo.txt");
FileSystem fs = FileSystem.get(getProxiedFSConf());
Path path = new Path(getProxiedFSTestDir(), "foo.txt");
OutputStream os = fs.create(path);
os.write(1);
os.close();
ContentSummary hdfsContentSummary = fs.getContentSummary(path);
fs.close();
fs = getHttpFileSystem();
fs = getHttpFSFileSystem();
ContentSummary httpContentSummary = fs.getContentSummary(path);
fs.close();
Assert.assertEquals(httpContentSummary.getDirectoryCount(), hdfsContentSummary.getDirectoryCount());
@ -484,13 +511,13 @@ public static Collection operations() {
ops[i] = new Object[]{Operation.values()[i]};
}
//To test one or a subset of operations do:
//return Arrays.asList(new Object[][]{ new Object[]{Operation.OPEN}});
//return Arrays.asList(new Object[][]{ new Object[]{Operation.APPEND}});
return Arrays.asList(ops);
}
private Operation operation;
public TestHttpFSFileSystem(Operation operation) {
public BaseTestHttpFSWith(Operation operation) {
this.operation = operation;
}

View File

@ -18,20 +18,15 @@
package org.apache.hadoop.fs.http.client;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.test.TestJettyHelper;
import org.junit.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class)
public class TestWebhdfsFileSystem extends TestHttpFSFileSystem {
public class TestHttpFSFWithWebhdfsFileSystem
extends TestHttpFSWithHttpFSFileSystem {
public TestWebhdfsFileSystem(TestHttpFSFileSystem.Operation operation) {
public TestHttpFSFWithWebhdfsFileSystem(Operation operation) {
super(operation);
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.http.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.TestDirHelper;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
@RunWith(value = Parameterized.class)
public class TestHttpFSFileSystemLocalFileSystem extends BaseTestHttpFSWith {
private static String PATH_PREFIX;
static {
new TestDirHelper();
String prefix =
System.getProperty("test.build.dir", "target/test-dir") + "/local";
File file = new File(prefix);
file.mkdirs();
PATH_PREFIX = file.getAbsolutePath();
}
public TestHttpFSFileSystemLocalFileSystem(Operation operation) {
super(operation);
}
protected Path getProxiedFSTestDir() {
return addPrefix(new Path(TestDirHelper.getTestDir().getAbsolutePath()));
}
protected String getProxiedFSURI() {
return "file:///";
}
protected Configuration getProxiedFSConf() {
Configuration conf = new Configuration(false);
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, getProxiedFSURI());
return conf;
}
protected Path addPrefix(Path path) {
URI uri = path.toUri();
try {
if (uri.getAuthority() != null) {
uri = new URI(uri.getScheme(),
uri.getAuthority(), PATH_PREFIX + uri.getPath());
}
else {
if (uri.getPath().startsWith("/")) {
uri = new URI(PATH_PREFIX + uri.getPath());
}
}
} catch (URISyntaxException ex) {
throw new RuntimeException("It should not happen: " + ex.toString(), ex);
}
return new Path(uri);
}
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.http.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.TestHdfsHelper;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class)
public class TestHttpFSWithHttpFSFileSystem extends BaseTestHttpFSWith {
public TestHttpFSWithHttpFSFileSystem(Operation operation) {
super(operation);
}
protected Class getFileSystemClass() {
return HttpFSFileSystem.class;
}
protected Path getProxiedFSTestDir() {
return TestHdfsHelper.getHdfsTestDir();
}
protected String getProxiedFSURI() {
return TestHdfsHelper.getHdfsConf().get(
CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
}
protected Configuration getProxiedFSConf() {
return TestHdfsHelper.getHdfsConf();
}
}

View File

@ -18,10 +18,6 @@
package org.apache.hadoop.lib.service.hadoop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@ -37,6 +33,7 @@
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.lib.service.FileSystemAccessException;
import org.apache.hadoop.lib.service.instrumentation.InstrumentationService;
import org.apache.hadoop.lib.service.scheduler.SchedulerService;
import org.apache.hadoop.test.HFSTestCase;
import org.apache.hadoop.test.TestDir;
import org.apache.hadoop.test.TestDirHelper;
@ -44,6 +41,7 @@
import org.apache.hadoop.test.TestHdfs;
import org.apache.hadoop.test.TestHdfsHelper;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -68,13 +66,15 @@ public void createHadoopConf() throws Exception {
@TestDir
public void simpleSecurity() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
Server server = new Server("server", dir, dir, dir, dir, conf);
server.init();
assertNotNull(server.get(FileSystemAccess.class));
Assert.assertNotNull(server.get(FileSystemAccess.class));
server.destroy();
}
@ -83,8 +83,10 @@ public void simpleSecurity() throws Exception {
@TestDir
public void noKerberosKeytabProperty() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.authentication.type", "kerberos");
@ -98,8 +100,10 @@ public void noKerberosKeytabProperty() throws Exception {
@TestDir
public void noKerberosPrincipalProperty() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.authentication.type", "kerberos");
@ -114,8 +118,10 @@ public void noKerberosPrincipalProperty() throws Exception {
@TestDir
public void kerberosInitializationFailure() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.authentication.type", "kerberos");
@ -130,8 +136,10 @@ public void kerberosInitializationFailure() throws Exception {
@TestDir
public void invalidSecurity() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.authentication.type", "foo");
@ -143,15 +151,17 @@ public void invalidSecurity() throws Exception {
@TestDir
public void serviceHadoopConf() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
Server server = new Server("server", dir, dir, dir, dir, conf);
server.init();
FileSystemAccessService fsAccess = (FileSystemAccessService) server.get(FileSystemAccess.class);
assertEquals(fsAccess.serviceHadoopConf.get("foo"), "FOO");
Assert.assertEquals(fsAccess.serviceHadoopConf.get("foo"), "FOO");
server.destroy();
}
@ -161,8 +171,10 @@ public void serviceHadoopConfCustomDir() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String hadoopConfDir = new File(dir, "confx").getAbsolutePath();
new File(hadoopConfDir).mkdirs();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.config.dir", hadoopConfDir);
@ -177,7 +189,7 @@ public void serviceHadoopConfCustomDir() throws Exception {
Server server = new Server("server", dir, dir, dir, dir, conf);
server.init();
FileSystemAccessService fsAccess = (FileSystemAccessService) server.get(FileSystemAccess.class);
assertEquals(fsAccess.serviceHadoopConf.get("foo"), "BAR");
Assert.assertEquals(fsAccess.serviceHadoopConf.get("foo"), "BAR");
server.destroy();
}
@ -185,8 +197,10 @@ public void serviceHadoopConfCustomDir() throws Exception {
@TestDir
public void inWhitelists() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
Server server = new Server("server", dir, dir, dir, dir, conf);
@ -219,8 +233,10 @@ public void inWhitelists() throws Exception {
@TestDir
public void NameNodeNotinWhitelists() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.name.node.whitelist", "NN");
@ -235,8 +251,10 @@ public void NameNodeNotinWhitelists() throws Exception {
@TestHdfs
public void createFileSystem() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration hadoopConf = new Configuration(false);
hadoopConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
@ -244,19 +262,20 @@ public void createFileSystem() throws Exception {
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.filesystem.cache.purge.timeout", "0");
Server server = new Server("server", dir, dir, dir, dir, conf);
server.init();
FileSystemAccess hadoop = server.get(FileSystemAccess.class);
FileSystem fs = hadoop.createFileSystem("u", hadoop.getFileSystemConfiguration());
assertNotNull(fs);
Assert.assertNotNull(fs);
fs.mkdirs(new Path("/tmp/foo"));
hadoop.releaseFileSystem(fs);
try {
fs.mkdirs(new Path("/tmp/foo"));
fail();
Assert.fail();
} catch (IOException ex) {
} catch (Exception ex) {
fail();
Assert.fail();
}
server.destroy();
}
@ -266,8 +285,10 @@ public void createFileSystem() throws Exception {
@TestHdfs
public void fileSystemExecutor() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration hadoopConf = new Configuration(false);
hadoopConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
@ -275,6 +296,7 @@ public void fileSystemExecutor() throws Exception {
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.filesystem.cache.purge.timeout", "0");
Server server = new Server("server", dir, dir, dir, dir, conf);
server.init();
FileSystemAccess hadoop = server.get(FileSystemAccess.class);
@ -291,10 +313,10 @@ public Void execute(FileSystem fs) throws IOException {
});
try {
fsa[0].mkdirs(new Path("/tmp/foo"));
fail();
Assert.fail();
} catch (IOException ex) {
} catch (Exception ex) {
fail();
Assert.fail();
}
server.destroy();
}
@ -305,8 +327,10 @@ public Void execute(FileSystem fs) throws IOException {
@TestHdfs
public void fileSystemExecutorNoNameNode() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration hadoopConf = new Configuration(false);
hadoopConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
createHadoopConf(hadoopConf);
@ -332,8 +356,10 @@ public Void execute(FileSystem fs) throws IOException {
@TestHdfs
public void fileSystemExecutorException() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration hadoopConf = new Configuration(false);
hadoopConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
@ -341,6 +367,7 @@ public void fileSystemExecutorException() throws Exception {
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.filesystem.cache.purge.timeout", "0");
Server server = new Server("server", dir, dir, dir, dir, conf);
server.init();
FileSystemAccess hadoop = server.get(FileSystemAccess.class);
@ -354,21 +381,86 @@ public Void execute(FileSystem fs) throws IOException {
throw new IOException();
}
});
fail();
Assert.fail();
} catch (FileSystemAccessException ex) {
assertEquals(ex.getError(), FileSystemAccessException.ERROR.H03);
Assert.assertEquals(ex.getError(), FileSystemAccessException.ERROR.H03);
} catch (Exception ex) {
fail();
Assert.fail();
}
try {
fsa[0].mkdirs(new Path("/tmp/foo"));
fail();
Assert.fail();
} catch (IOException ex) {
} catch (Exception ex) {
fail();
Assert.fail();
}
server.destroy();
}
@Test
@TestDir
@TestHdfs
public void fileSystemCache() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration hadoopConf = new Configuration(false);
hadoopConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
createHadoopConf(hadoopConf);
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.filesystem.cache.purge.frequency", "1");
conf.set("server.hadoop.filesystem.cache.purge.timeout", "1");
Server server = new Server("server", dir, dir, dir, dir, conf);
try {
server.init();
FileSystemAccess hadoop = server.get(FileSystemAccess.class);
FileSystem fs1 =
hadoop.createFileSystem("u", hadoop.getFileSystemConfiguration());
Assert.assertNotNull(fs1);
fs1.mkdirs(new Path("/tmp/foo1"));
hadoop.releaseFileSystem(fs1);
//still around because of caching
fs1.mkdirs(new Path("/tmp/foo2"));
FileSystem fs2 =
hadoop.createFileSystem("u", hadoop.getFileSystemConfiguration());
//should be the same instance because of caching
Assert.assertEquals(fs1, fs2);
Thread.sleep(4 * 1000);
//still around because the lease count is 1 (fs2 is still checked out)
fs1.mkdirs(new Path("/tmp/foo2"));
Thread.sleep(4 * 1000);
//still around because the lease count is 1 (fs2 is still checked out)
fs2.mkdirs(new Path("/tmp/foo"));
hadoop.releaseFileSystem(fs2);
Thread.sleep(4 * 1000);
//should no longer be around since the lease count is 0
try {
fs2.mkdirs(new Path("/tmp/foo"));
Assert.fail();
} catch (IOException ex) {
} catch (Exception ex) {
Assert.fail();
}
} finally {
server.destroy();
}
}
}
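For reference, the reference-counted usage pattern this cache test exercises looks roughly like the following (a minimal sketch against the FileSystemAccess API shown above; the user name and path are illustrative):
FileSystemAccess access = server.get(FileSystemAccess.class);
FileSystem fs = access.createFileSystem("u", access.getFileSystemConfiguration());
try {
  // safe while at least one lease on the cached instance is held
  fs.mkdirs(new Path("/tmp/example"));
} finally {
  // drops the lease; the purger closes the cached FileSystem once the
  // lease count reaches 0 and the purge timeout has elapsed
  access.releaseFileSystem(fs);
}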

View File

@ -77,7 +77,7 @@ private Server createJettyServer() {
server.getConnectors()[0].setPort(port);
return server;
} catch (Exception ex) {
throw new RuntimeException("Could not stop embedded servlet container, " + ex.getMessage(), ex);
throw new RuntimeException("Could not start embedded servlet container, " + ex.getMessage(), ex);
}
}

View File

@ -109,6 +109,11 @@ Trunk (unreleased changes)
HDFS-3630 Modify TestPersistBlocks to use both flush and hflush (sanjay)
HDFS-3768. Exception in TestJettyHelper is incorrect.
(Eli Reisman via jghoman)
HDFS-3695. Genericize format() to non-file JournalManagers. (todd)
OPTIMIZATIONS
BUG FIXES
@ -189,6 +194,9 @@ Branch-2 ( Unreleased changes )
HDFS-3446. HostsFileReader silently ignores bad includes/excludes
(Matthew Jacobs via todd)
HDFS-3755. Creating an already-open-for-write file with overwrite=true fails
(todd)
NEW FEATURES
HDFS-744. Support hsync in HDFS. (Lars Hofhansl via szetszwo)
@ -201,6 +209,10 @@ Branch-2 ( Unreleased changes )
HDFS-3113. httpfs does not support delegation tokens. (tucu)
HDFS-3513. HttpFS should cache filesystems. (tucu)
HDFS-3637. Add support for encrypting the DataTransferProtocol. (atm)
IMPROVEMENTS
HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG
@ -360,6 +372,14 @@ Branch-2 ( Unreleased changes )
HDFS-3650. Use MutableQuantiles to provide latency histograms for various
operations. (Andrew Wang via atm)
HDFS-3667. Add retry support to WebHdfsFileSystem. (szetszwo)
HDFS-3291. add test that covers HttpFS working w/ a non-HDFS Hadoop
filesystem (tucu)
HDFS-3634. Add self-contained, mavenized fuse_dfs test. (Colin Patrick
McCabe via atm)
OPTIMIZATIONS
HDFS-2982. Startup performance suffers when there are many edit log
@ -373,8 +393,6 @@ Branch-2 ( Unreleased changes )
HDFS-3697. Enable fadvise readahead by default. (todd)
HDFS-3667. Add retry support to WebHdfsFileSystem. (szetszwo)
BUG FIXES
HDFS-3385. The last block of INodeFileUnderConstruction is not
@ -546,6 +564,25 @@ Branch-2 ( Unreleased changes )
HDFS-3732. fuse_dfs: incorrect configuration value checked for connection
expiry timer period. (Colin Patrick McCabe via atm)
HDFS-3738. TestDFSClientRetries#testFailuresArePerOperation sets incorrect
timeout config. (atm)
HDFS-3756. DelegationTokenFetcher creates 2 HTTP connections, the second
one not properly configured. (tucu)
HDFS-3719. Re-enable append-related tests in TestFileConcurrentReader.
(Andrew Wang via atm)
HDFS-3579. libhdfs: fix exception handling. (Colin Patrick McCabe via atm)
HDFS-3754. BlockSender doesn't shutdown ReadaheadPool threads. (eli)
HDFS-3760. primitiveCreate is a write, not a read. (Andy Isaacson via atm)
HDFS-3710. libhdfs misuses O_RDONLY/WRONLY/RDWR. (Andy Isaacson via atm)
HDFS-3721. hsync support broke wire compatibility. (todd and atm)
BREAKDOWN OF HDFS-3042 SUBTASKS
HDFS-2185. HDFS portion of ZK-based FailoverController (todd)

View File

@ -34,6 +34,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<hadoop.component>hdfs</hadoop.component>
<kdc.resource.dir>../../hadoop-common-project/hadoop-common/src/test/resources/kdc</kdc.resource.dir>
<is.hadoop.component>true</is.hadoop.component>
<require.fuse>false</require.fuse>
</properties>
<dependencies>
@ -256,6 +257,9 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<configuration>
<skipTests>false</skipTests>
</configuration>
<executions>
<execution>
<id>compile-proto</id>
@ -437,7 +441,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<mkdir dir="${project.build.directory}/native"/>
<exec executable="cmake" dir="${project.build.directory}/native"
failonerror="true">
<arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model}"/>
<arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model} -DREQUIRE_FUSE=${require.fuse}"/>
</exec>
<exec executable="make" dir="${project.build.directory}/native" failonerror="true">
<arg line="VERBOSE=1"/>
@ -453,11 +457,17 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<target>
<property name="compile_classpath" refid="maven.compile.classpath"/>
<property name="test_classpath" refid="maven.test.classpath"/>
<exec executable="${project.build.directory}/native/test_libhdfs_threaded" dir="${project.build.directory}/native/" failonerror="true">
<exec executable="sh" failonerror="true" dir="${project.build.directory}/native/">
<arg value="-c"/>
<arg value="[ x$SKIPTESTS = xtrue ] || ${project.build.directory}/native/test_libhdfs_threaded"/>
<env key="CLASSPATH" value="${test_classpath}:${compile_classpath}"/>
<env key="SKIPTESTS" value="${skipTests}"/>
</exec>
<exec executable="${project.build.directory}/native/test_native_mini_dfs" dir="${project.build.directory}/native/" failonerror="true">
<exec executable="sh" failonerror="true" dir="${project.build.directory}/native/">
<arg value="-c"/>
<arg value="[ x$SKIPTESTS = xtrue ] || ${project.build.directory}/native/test_libhdfs_threaded"/>
<env key="CLASSPATH" value="${test_classpath}:${compile_classpath}"/>
<env key="SKIPTESTS" value="${skipTests}"/>
</exec>
</target>
</configuration>

View File

@ -21,18 +21,7 @@ cmake_minimum_required(VERSION 2.6 FATAL_ERROR)
# Default to release builds
set(CMAKE_BUILD_TYPE, Release)
# If JVM_ARCH_DATA_MODEL is 32, compile all binaries as 32-bit.
# This variable is set by maven.
if (JVM_ARCH_DATA_MODEL EQUAL 32)
# force 32-bit code generation on amd64/x86_64, ppc64, sparc64
if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_SYSTEM_PROCESSOR MATCHES ".*64")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -m32")
set(CMAKE_LD_FLAGS "${CMAKE_LD_FLAGS} -m32")
endif ()
if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
set(CMAKE_SYSTEM_PROCESSOR "i686")
endif ()
endif (JVM_ARCH_DATA_MODEL EQUAL 32)
include(../../../hadoop-common-project/hadoop-common/src/JNIFlags.cmake NO_POLICY_SCOPE)
# Compile a library with both shared and static variants
function(add_dual_library LIBNAME)
@ -95,6 +84,7 @@ set(_FUSE_DFS_VERSION 0.1.0)
CONFIGURE_FILE(${CMAKE_SOURCE_DIR}/config.h.cmake ${CMAKE_BINARY_DIR}/config.h)
add_dual_library(hdfs
main/native/libhdfs/exception.c
main/native/libhdfs/hdfs.c
main/native/libhdfs/jni_helper.c
)
@ -106,6 +96,10 @@ set(LIBHDFS_VERSION "0.0.0")
set_target_properties(hdfs PROPERTIES
SOVERSION ${LIBHDFS_VERSION})
add_library(posix_util
main/native/util/posix_util.c
)
add_executable(test_libhdfs_ops
main/native/libhdfs/test/test_libhdfs_ops.c
)

View File

@ -271,6 +271,23 @@ public void processResult(int rc, String path, Object ctx, String name) {
}
}
@Override
public void format(NamespaceInfo ns) {
// Currently, BKJM automatically formats itself when first accessed.
// TODO: change over to explicit formatting so that the admin can
// clear out the BK storage when reformatting a cluster.
LOG.info("Not formatting " + this + " - BKJM does not currently " +
"support reformatting. If it has not been used before, it will" +
"be formatted automatically upon first use.");
}
@Override
public boolean hasSomeData() throws IOException {
// Don't confirm format on BKJM, since format() is currently a
// no-op anyway
return false;
}
/**
* Start a new log segment in a BookKeeper ledger.
* First ensure that we have the write lock for this journal.
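A hedged, caller-side sketch of how the two new JournalManager methods are intended to fit together (the surrounding loop and the confirmFormat helper are assumptions for illustration, not code from this commit):
for (JournalManager jm : journalManagers) {
  // hasSomeData() lets the formatting code decide whether to prompt the admin;
  // BKJM returns false above because its format() is currently a no-op
  if (jm.hasSomeData() && !confirmFormat(jm)) {   // confirmFormat: assumed helper
    continue;   // admin declined to clobber this journal's existing data
  }
  jm.format(nsInfo);
}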

View File

@ -21,6 +21,7 @@
import java.net.Socket;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
/**
* A BlockReader is responsible for reading a single block
@ -71,4 +72,8 @@ public interface BlockReader extends ByteBufferReadable {
*/
boolean hasSentStatusCode();
/**
* @return a reference to the streams this block reader is using.
*/
IOStreamPair getStreams();
}
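A small usage sketch for the new accessor (illustrative only): a caller that caches connections can pick up the reader's wrapped stream pair instead of re-wrapping the socket, keeping in mind that the legacy RemoteBlockReader returns null for it (see the hunk further below):
IOStreamPair streams = reader.getStreams();
if (streams != null) {
  // reuse streams.in / streams.out for the next request on this connection
}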

View File

@ -25,7 +25,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
@ -41,12 +46,13 @@ public static BlockReader newBlockReader(
Configuration conf,
Socket sock, String file,
ExtendedBlock block, Token<BlockTokenIdentifier> blockToken,
long startOffset, long len) throws IOException {
long startOffset, long len, DataEncryptionKey encryptionKey)
throws IOException {
int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
return newBlockReader(new Conf(conf),
sock, file, block, blockToken, startOffset,
len, bufferSize, true, "");
len, bufferSize, true, "", encryptionKey, null);
}
/**
@ -73,14 +79,32 @@ public static BlockReader newBlockReader(
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
String clientName)
String clientName,
DataEncryptionKey encryptionKey,
IOStreamPair ioStreams)
throws IOException {
if (conf.useLegacyBlockReader) {
if (encryptionKey != null) {
throw new RuntimeException("Encryption is not supported with the legacy block reader.");
}
return RemoteBlockReader.newBlockReader(
sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
} else {
if (ioStreams == null) {
ioStreams = new IOStreamPair(NetUtils.getInputStream(sock),
NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT));
if (encryptionKey != null) {
IOStreamPair encryptedStreams =
DataTransferEncryptor.getEncryptedStreams(
ioStreams.out, ioStreams.in, encryptionKey);
ioStreams = encryptedStreams;
}
}
return RemoteBlockReader2.newBlockReader(
sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
sock, file, block, blockToken, startOffset, len, bufferSize,
verifyChecksum, clientName, encryptionKey, ioStreams);
}
}
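For illustration, a hedged caller-side sketch of the simpler overload with its new parameter (socket, block and token setup assumed; pass null when encryption is disabled):
BlockReader reader = BlockReaderFactory.newBlockReader(
    conf, sock, file, block, blockToken,
    0L, block.getNumBytes(), encryptionKey);   // encryptionKey may be null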

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
@ -681,4 +682,9 @@ public Socket takeSocket() {
public boolean hasSentStatusCode() {
return false;
}
@Override
public IOStreamPair getStreams() {
return null;
}
}

Some files were not shown because too many files have changed in this diff.