HDFS-3110. Use directRead API to reduce the number of buffer copies in libhdfs. Contributed by Henry Robinson.

(svn merge -c 1310185 from trunk)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1348246 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-06-08 21:16:34 +00:00
parent 2ce56df137
commit e6c0425996
5 changed files with 290 additions and 158 deletions

View File

@ -73,6 +73,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream.
(Henry Robinson via todd)
HDFS-3110. Use directRead API to reduce the number of buffer copies in
libhdfs (Henry Robinson via todd)
BUG FIXES
HDFS-3385. The last block of INodeFileUnderConstruction is not

View File

@ -123,6 +123,11 @@ static int errnoFromException(jthrowable exc, JNIEnv *env,
goto done;
}
if (!strcmp(excClass, "java.lang.UnsupportedOperationException")) {
errnum = ENOTSUP;
goto done;
}
if (!strcmp(excClass, "org.apache.hadoop.security."
"AccessControlException")) {
errnum = EACCES;
@ -614,8 +619,29 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
} else {
file->file = (*env)->NewGlobalRef(env, jVal.l);
file->type = (((flags & O_WRONLY) == 0) ? INPUT : OUTPUT);
file->flags = 0;
destroyLocalReference(env, jVal.l);
if ((flags & O_WRONLY) == 0) {
// Try a test read to see if we can do direct reads
errno = 0;
char buf;
if (readDirect(fs, file, &buf, 0) == 0) {
// Success - 0-byte read should return 0
file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
} else {
if (errno != ENOTSUP) {
// Unexpected error. Clear it, don't set the direct flag.
fprintf(stderr,
"WARN: Unexpected error %d when testing "
"for direct read compatibility\n", errno);
errno = 0;
goto done;
}
}
errno = 0;
}
}
done:
@ -706,10 +732,57 @@ int hdfsExists(hdfsFS fs, const char *path)
return jVal.z ? 0 : -1;
}
// Checks input file for readiness for reading.
static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f,
jobject* jInputStream)
{
*jInputStream = (jobject)(f ? f->file : NULL);
//Sanity check
if (!f || f->type == UNINITIALIZED) {
errno = EBADF;
return -1;
}
//Error checking... make sure that this file is 'readable'
if (f->type != INPUT) {
fprintf(stderr, "Cannot read from a non-InputStream object!\n");
errno = EINVAL;
return -1;
}
return 0;
}
// Common error-handling code between read paths.
static int handleReadResult(int success, jvalue jVal, jthrowable jExc,
JNIEnv* env)
{
int noReadBytes;
if (success != 0) {
if ((*env)->ExceptionCheck(env)) {
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
"FSDataInputStream::read");
}
noReadBytes = -1;
} else {
noReadBytes = jVal.i;
if (noReadBytes < 0) {
// -1 from Java is EOF, which is 0 here
noReadBytes = 0;
}
errno = 0;
}
return noReadBytes;
}
tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
{
if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) {
return readDirect(fs, f, buffer, length);
}
// JAVA EQUIVALENT:
// byte [] bR = new byte[length];
// fis.read(bR);
@ -722,46 +795,26 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
}
//Parameters
jobject jInputStream = (jobject)(f ? f->file : NULL);
jobject jInputStream;
if (readPrepare(env, fs, f, &jInputStream) == -1) {
return -1;
}
jbyteArray jbRarray;
jint noReadBytes = 0;
jvalue jVal;
jthrowable jExc = NULL;
//Sanity check
if (!f || f->type == UNINITIALIZED) {
errno = EBADF;
return -1;
}
//Error checking... make sure that this file is 'readable'
if (f->type != INPUT) {
fprintf(stderr, "Cannot read from a non-InputStream object!\n");
errno = EINVAL;
return -1;
}
//Read the requisite bytes
jbRarray = (*env)->NewByteArray(env, length);
if (invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM,
"read", "([B)I", jbRarray) != 0) {
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
"FSDataInputStream::read");
noReadBytes = -1;
}
else {
noReadBytes = jVal.i;
if (noReadBytes > 0) {
(*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer);
} else {
//This is a valid case: there aren't any bytes left to read!
if (noReadBytes == 0 || noReadBytes < -1) {
fprintf(stderr, "WARN: FSDataInputStream.read returned invalid return code - libhdfs returning EOF, i.e., 0: %d\n", noReadBytes);
}
noReadBytes = 0;
}
errno = 0;
int success = invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM,
"read", "([B)I", jbRarray);
noReadBytes = handleReadResult(success, jVal, jExc, env);;
if (noReadBytes > 0) {
(*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer);
}
destroyLocalReference(env, jbRarray);
@ -769,6 +822,52 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
return noReadBytes;
}
// Reads using the read(ByteBuffer) API, which does fewer copies
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
{
// JAVA EQUIVALENT:
// ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer
// fis.read(bbuffer);
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jobject jInputStream;
if (readPrepare(env, fs, f, &jInputStream) == -1) {
return -1;
}
jint noReadBytes = 0;
jvalue jVal;
jthrowable jExc = NULL;
//Read the requisite bytes
jobject bb = (*env)->NewDirectByteBuffer(env, buffer, length);
if (bb == NULL) {
fprintf(stderr, "Could not allocate ByteBuffer");
if ((*env)->ExceptionCheck(env)) {
errno = errnoFromException(NULL, env, "JNIEnv::NewDirectByteBuffer");
} else {
errno = ENOMEM; // Best guess if there's no exception waiting
}
return -1;
}
int success = invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream,
HADOOP_ISTRM, "read", "(Ljava/nio/ByteBuffer;)I",
bb);
noReadBytes = handleReadResult(success, jVal, jExc, env);
destroyLocalReference(env, bb);
return noReadBytes;
}
tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,

View File

@ -81,12 +81,16 @@ extern "C" {
};
// Bit fields for hdfsFile_internal flags
#define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
/**
* The 'file-handle' to a file in hdfs.
*/
struct hdfsFile_internal {
void* file;
enum hdfsStreamType type;
uint32_t flags;
};
typedef struct hdfsFile_internal* hdfsFile;
@ -203,7 +207,6 @@ extern "C" {
*/
tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length);
/**
* hdfsPread - Positional read of data from an open file.
* @param fs The configured filesystem handle.

View File

@ -18,6 +18,8 @@
#include "hdfs.h"
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length);
void permission_disp(short permissions, char *rtr) {
rtr[9] = '\0';
int i;
@ -51,7 +53,6 @@ void permission_disp(short permissions, char *rtr) {
}
int main(int argc, char **argv) {
hdfsFS fs = hdfsConnectNewInstance("default", 0);
if(!fs) {
fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
@ -64,20 +65,25 @@ int main(int argc, char **argv) {
exit(-1);
}
const char* writePath = "/tmp/testfile.txt";
const char* writePath = "/tmp/testfile.txt";
const char* fileContents = "Hello, World!";
{
//Write tests
hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
if(!writeFile) {
fprintf(stderr, "Failed to open %s for writing!\n", writePath);
exit(-1);
}
fprintf(stderr, "Opened %s for writing successfully...\n", writePath);
char* buffer = "Hello, World!";
tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)+1);
tSize num_written_bytes =
hdfsWrite(fs, writeFile, (void*)fileContents, strlen(fileContents)+1);
if (num_written_bytes != strlen(fileContents) + 1) {
fprintf(stderr, "Failed to write correct number of bytes - expected %d, got %d\n",
(int)(strlen(fileContents) + 1), (int)num_written_bytes);
exit(-1);
}
fprintf(stderr, "Wrote %d bytes\n", num_written_bytes);
tOffset currentPos = -1;
@ -138,18 +144,86 @@ int main(int argc, char **argv) {
}
fprintf(stderr, "Current position: %ld\n", currentPos);
if ((readFile->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) == 0) {
fprintf(stderr, "Direct read support incorrectly not detected "
"for HDFS filesystem\n");
exit(-1);
}
fprintf(stderr, "Direct read support detected for HDFS\n");
// Clear flags so that we really go through slow read path
readFile->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
static char buffer[32];
tSize num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
sizeof(buffer));
fprintf(stderr, "Read following %d bytes:\n%s\n",
num_read_bytes, buffer);
memset(buffer, 0, strlen(fileContents + 1));
num_read_bytes = hdfsPread(fs, readFile, 0, (void*)buffer,
sizeof(buffer));
fprintf(stderr, "Read following %d bytes:\n%s\n",
num_read_bytes, buffer);
if (hdfsSeek(fs, readFile, 0L)) {
fprintf(stderr,
"Failed to seek to file start for direct read test!\n");
exit(-1);
}
readFile->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
memset(buffer, 0, strlen(fileContents + 1));
num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
sizeof(buffer));
if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
fprintf(stderr, "Failed to read (direct). Expected %s but got %s (%d bytes)\n",
fileContents, buffer, num_read_bytes);
exit(-1);
}
fprintf(stderr, "Read (direct) following %d bytes:\n%s\n",
num_read_bytes, buffer);
hdfsCloseFile(fs, readFile);
// Test correct behaviour for unsupported filesystems
hdfsFile localFile = hdfsOpenFile(lfs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
if(!localFile) {
fprintf(stderr, "Failed to open %s for writing!\n", writePath);
exit(-1);
}
tSize num_written_bytes = hdfsWrite(lfs, localFile,
(void*)fileContents,
strlen(fileContents) + 1);
hdfsCloseFile(lfs, localFile);
localFile = hdfsOpenFile(lfs, writePath, O_RDONLY, 0, 0, 0);
if (localFile->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) {
fprintf(stderr, "Direct read support incorrectly detected for local "
"filesystem\n");
exit(-1);
}
memset(buffer, 0, strlen(fileContents + 1));
int result = readDirect(lfs, localFile, (void*)buffer, sizeof(buffer));
if (result != -1) {
fprintf(stderr, "Expected error from local direct read not seen!\n");
exit(-1);
}
if (errno != ENOTSUP) {
fprintf(stderr, "Error code not correctly set to ENOTSUP, was %d!\n",
errno);
exit(-1);
}
fprintf(stderr, "Expected exception thrown for unsupported direct read\n");
hdfsCloseFile(lfs, localFile);
}
int totalResult = 0;
@ -446,4 +520,3 @@ int main(int argc, char **argv) {
/**
* vim: ts=4: sw=4: et:
*/

View File

@ -17,126 +17,64 @@
#
#
# Note: This script depends on 8 environment variables to function correctly:
# a) CLASSPATH
# b) HADOOP_PREFIX
# c) HADOOP_CONF_DIR
# d) HADOOP_LOG_DIR
# e) LIBHDFS_BUILD_DIR
# f) LIBHDFS_INSTALL_DIR
# g) OS_NAME
# h) CLOVER_JAR
# i} HADOOP_VERSION
# j) HADOOP_HDFS_HOME
# All these are passed by build.xml.
# Note: This script depends on 5 environment variables to function correctly:
# a) HADOOP_HOME - must be set
# b) HDFS_TEST_CONF_DIR - optional; the directory to read and write
# core-site.xml to. Defaults to /tmp
# c) LIBHDFS_BUILD_DIR - optional; the location of the hdfs_test
# executable. Defaults to the parent directory.
# d) OS_NAME - used to choose how to locate libjvm.so
# e) CLOVER_JAR - optional; the location of the Clover code coverage tool's jar.
#
if [ "x$HADOOP_HOME" == "x" ]; then
echo "HADOOP_HOME is unset!"
exit 1
fi
if [ "x$LIBHDFS_BUILD_DIR" == "x" ]; then
LIBHDFS_BUILD_DIR=`pwd`/../
fi
if [ "x$HDFS_TEST_CONF_DIR" == "x" ]; then
HDFS_TEST_CONF_DIR=/tmp
fi
# LIBHDFS_INSTALL_DIR is the directory containing libhdfs.so
LIBHDFS_INSTALL_DIR=$HADOOP_HOME/lib/native/
HDFS_TEST=hdfs_test
HADOOP_LIB_DIR=$HADOOP_PREFIX/lib
HADOOP_BIN_DIR=$HADOOP_PREFIX/bin
COMMON_BUILD_DIR=$HADOOP_PREFIX/build/ivy/lib/hadoop-hdfs/common
COMMON_JAR=$COMMON_BUILD_DIR/hadoop-common-$HADOOP_VERSION.jar
HDFS_TEST_JAR=`find $HADOOP_HOME/share/hadoop/hdfs/ \
-name "hadoop-hdfs-*-tests.jar" | head -n 1`
cat > $HADOOP_CONF_DIR/core-site.xml <<EOF
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>file:///$LIBHDFS_TEST_DIR</value>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:23000/</value>
</property>
</configuration>
EOF
cat > $HADOOP_CONF_DIR/hdfs-site.xml <<EOF
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.support.append</name>
<value>true</value>
</property>
<property>
<name>dfs.namenode.logging.level</name>
<value>DEBUG</value>
</property>
</configuration>
EOF
cat > $HADOOP_CONF_DIR/slaves <<EOF
localhost
EOF
# If we are running from the hdfs repo we need to make sure
# HADOOP_BIN_DIR contains the common scripts.
# If the bin directory does not and we've got a common jar extract its
# bin directory to HADOOP_PREFIX/bin. The bin scripts hdfs-config.sh and
# hadoop-config.sh assume the bin directory is named "bin" and that it
# is located in HADOOP_PREFIX.
unpacked_common_bin_dir=0
if [ ! -f $HADOOP_BIN_DIR/hadoop-config.sh ]; then
if [ -f $COMMON_JAR ]; then
jar xf $COMMON_JAR bin.tgz
tar xfz bin.tgz -C $HADOOP_BIN_DIR
unpacked_common_bin_dir=1
fi
if [ "x$HDFS_TEST_JAR" == "x" ]; then
echo "HDFS test jar not found! Tried looking in all subdirectories \
of $HADOOP_HOME/share/hadoop/hdfs/"
exit 1
fi
# Manipulate HADOOP_CONF_DIR too
# which is necessary to circumvent bin/hadoop
HADOOP_CONF_DIR=$HADOOP_CONF_DIR:$HADOOP_PREFIX/conf
echo "Found HDFS test jar at $HDFS_TEST_JAR"
# set pid file dir so they are not written to /tmp
export HADOOP_PID_DIR=$HADOOP_LOG_DIR
# CLASSPATH initially contains $HADOOP_CONF_DIR
CLASSPATH="${HADOOP_CONF_DIR}"
# CLASSPATH initially contains $HDFS_TEST_CONF_DIR
CLASSPATH="${HDFS_TEST_CONF_DIR}"
CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar
# for developers, add Hadoop classes to CLASSPATH
if [ -d "$HADOOP_PREFIX/build/classes" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/build/classes
fi
if [ -d "$HADOOP_PREFIX/build/web/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/build/web
fi
if [ -d "$HADOOP_PREFIX/build/test/classes" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/build/test/classes
fi
# add Clover jar file needed for code coverage runs
CLASSPATH=${CLASSPATH}:${CLOVER_JAR};
# so that filenames w/ spaces are handled correctly in loops below
IFS=
IFS=$'\n'
# add libs to CLASSPATH
for f in $HADOOP_PREFIX/lib/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
JAR_DIRS="$HADOOP_HOME/share/hadoop/common/lib/
$HADOOP_HOME/share/hadoop/common/
$HADOOP_HOME/share/hadoop/hdfs
$HADOOP_HOME/share/hadoop/hdfs/lib/"
for f in $HADOOP_PREFIX/*.jar; do
CLASSPATH=${CLASSPATH}:$f
done
for f in $HADOOP_PREFIX/lib/jsp-2.1/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
if [ -d "$COMMON_BUILD_DIR" ]; then
CLASSPATH=$CLASSPATH:$COMMON_JAR
for f in $COMMON_BUILD_DIR/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
fi
for d in $JAR_DIRS; do
for j in $d/*.jar; do
CLASSPATH=${CLASSPATH}:$j
done;
done;
# restore ordinary behaviour
unset IFS
@ -178,21 +116,37 @@ echo LIB_JVM_DIR = $LIB_JVM_DIR
echo "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++"
# Put delays to ensure hdfs is up and running and also shuts down
# after the tests are complete
cd $HADOOP_PREFIX
echo Y | $HADOOP_BIN_DIR/hdfs namenode -format &&
$HADOOP_BIN_DIR/hadoop-daemon.sh --script $HADOOP_BIN_DIR/hdfs start namenode && sleep 2
$HADOOP_BIN_DIR/hadoop-daemon.sh --script $HADOOP_BIN_DIR/hdfs start datanode && sleep 2
echo "Wait 30s for the datanode to start up..."
sleep 30
CLASSPATH=$CLASSPATH LD_PRELOAD="$LIB_JVM_DIR/libjvm.so:$LIBHDFS_INSTALL_DIR/libhdfs.so:" $LIBHDFS_BUILD_DIR/$HDFS_TEST
BUILD_STATUS=$?
sleep 3
$HADOOP_BIN_DIR/hadoop-daemon.sh --script $HADOOP_BIN_DIR/hdfs stop datanode && sleep 2
$HADOOP_BIN_DIR/hadoop-daemon.sh --script $HADOOP_BIN_DIR/hdfs stop namenode && sleep 2
rm $HDFS_TEST_CONF_DIR/core-site.xml
if [ $unpacked_common_bin_dir -eq 1 ]; then
rm -rf bin.tgz
$HADOOP_HOME/bin/hadoop jar $HDFS_TEST_JAR \
org.apache.hadoop.test.MiniDFSClusterManager \
-format -nnport 20300 -writeConfig $HDFS_TEST_CONF_DIR/core-site.xml \
> /tmp/libhdfs-test-cluster.out 2>&1 &
MINI_CLUSTER_PID=$!
for i in {1..15}; do
echo "Waiting for DFS cluster, attempt $i of 15"
[ -f $HDFS_TEST_CONF_DIR/core-site.xml ] && break;
sleep 2
done
if [ ! -f $HDFS_TEST_CONF_DIR/core-site.xml ]; then
echo "Cluster did not come up in 30s"
kill -9 $MINI_CLUSTER_PID
exit 1
fi
echo exiting with $BUILD_STATUS
echo "Cluster up, running tests"
# Disable error checking to make sure we get to cluster cleanup
set +e
CLASSPATH=$CLASSPATH \
LD_PRELOAD="$LIB_JVM_DIR/libjvm.so:$LIBHDFS_INSTALL_DIR/libhdfs.so:" \
$LIBHDFS_BUILD_DIR/$HDFS_TEST
BUILD_STATUS=$?
echo "Tearing cluster down"
kill -9 $MINI_CLUSTER_PID
echo "Exiting with $BUILD_STATUS"
exit $BUILD_STATUS