HDFS-3916. libwebhdfs (C client) code cleanups. Contributed by Colin Patrick McCabe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1393890 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Eli Collins 2012-10-04 02:53:28 +00:00
parent 38943644e0
commit ee13fb4f69
22 changed files with 952 additions and 3175 deletions

View File

@ -246,6 +246,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-3996. Add debug log removed in HDFS-3873 back. (eli)
HDFS-3916. libwebhdfs (C client) code cleanups.
(Colin Patrick McCabe via eli)
OPTIMIZATIONS
BUG FIXES

View File

@ -85,8 +85,8 @@ CONFIGURE_FILE(${CMAKE_SOURCE_DIR}/config.h.cmake ${CMAKE_BINARY_DIR}/config.h)
add_dual_library(hdfs
main/native/libhdfs/exception.c
main/native/libhdfs/hdfs.c
main/native/libhdfs/jni_helper.c
main/native/libhdfs/hdfs.c
)
target_link_dual_libraries(hdfs
${JAVA_JVM_LIBRARY}

View File

@ -16,28 +16,21 @@
# limitations under the License.
#
find_package(CURL)
if (CURL_FOUND)
include_directories(${CURL_INCLUDE_DIRS})
else (CURL_FOUND)
MESSAGE(STATUS "Failed to find CURL library.")
endif (CURL_FOUND)
find_package(CURL REQUIRED)
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH}
"${CMAKE_SOURCE_DIR}/contrib/libwebhdfs/resources/")
MESSAGE("CMAKE_MODULE_PATH IS: " ${CMAKE_MODULE_PATH})
find_package(Jansson)
find_package(Jansson REQUIRED)
include_directories(${JANSSON_INCLUDE_DIR})
add_dual_library(webhdfs
src/exception.c
src/hdfs_web.c
src/hdfs_jni.c
src/jni_helper.c
src/hdfs_http_client.c
src/hdfs_http_query.c
src/hdfs_json_parser.c
../../main/native/libhdfs/exception.c
../../main/native/libhdfs/jni_helper.c
)
target_link_dual_libraries(webhdfs
${JAVA_JVM_LIBRARY}
@ -55,10 +48,6 @@ add_executable(test_libwebhdfs_ops
)
target_link_libraries(test_libwebhdfs_ops
webhdfs
${CURL_LIBRARY}
${JAVA_JVM_LIBRARY}
${JANSSON_LIBRARY}
pthread
)
add_executable(test_libwebhdfs_read
@ -66,10 +55,6 @@ add_executable(test_libwebhdfs_read
)
target_link_libraries(test_libwebhdfs_read
webhdfs
${CURL_LIBRARY}
${JAVA_JVM_LIBRARY}
${JANSSON_LIBRARY}
pthread
)
add_executable(test_libwebhdfs_write
@ -77,10 +62,6 @@ add_executable(test_libwebhdfs_write
)
target_link_libraries(test_libwebhdfs_write
webhdfs
${CURL_LIBRARY}
${JAVA_JVM_LIBRARY}
${JANSSON_LIBRARY}
pthread
)
add_executable(test_libwebhdfs_threaded
@ -88,8 +69,4 @@ add_executable(test_libwebhdfs_threaded
)
target_link_libraries(test_libwebhdfs_threaded
webhdfs
${CURL_LIBRARY}
${JAVA_JVM_LIBRARY}
${JANSSON_LIBRARY}
pthread
)

View File

@ -1,237 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "exception.h"
#include "webhdfs.h"
#include "jni_helper.h"
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define EXCEPTION_INFO_LEN (sizeof(gExceptionInfo)/sizeof(gExceptionInfo[0]))
struct ExceptionInfo {
const char * const name;
int noPrintFlag;
int excErrno;
};
static const struct ExceptionInfo gExceptionInfo[] = {
{
.name = "java/io/FileNotFoundException",
.noPrintFlag = NOPRINT_EXC_FILE_NOT_FOUND,
.excErrno = ENOENT,
},
{
.name = "org/apache/hadoop/security/AccessControlException",
.noPrintFlag = NOPRINT_EXC_ACCESS_CONTROL,
.excErrno = EACCES,
},
{
.name = "org/apache/hadoop/fs/UnresolvedLinkException",
.noPrintFlag = NOPRINT_EXC_UNRESOLVED_LINK,
.excErrno = ENOLINK,
},
{
.name = "org/apache/hadoop/fs/ParentNotDirectoryException",
.noPrintFlag = NOPRINT_EXC_PARENT_NOT_DIRECTORY,
.excErrno = ENOTDIR,
},
{
.name = "java/lang/IllegalArgumentException",
.noPrintFlag = NOPRINT_EXC_ILLEGAL_ARGUMENT,
.excErrno = EINVAL,
},
{
.name = "java/lang/OutOfMemoryError",
.noPrintFlag = 0,
.excErrno = ENOMEM,
},
};
int printExceptionWebV(hdfs_exception_msg *exc, int noPrintFlags, const char *fmt, va_list ap)
{
int i, noPrint, excErrno;
if (!exc) {
fprintf(stderr, "printExceptionWebV: the hdfs_exception_msg is NULL\n");
return EINTERNAL;
}
for (i = 0; i < EXCEPTION_INFO_LEN; i++) {
if (strstr(gExceptionInfo[i].name, exc->exception)) {
break;
}
}
if (i < EXCEPTION_INFO_LEN) {
noPrint = (gExceptionInfo[i].noPrintFlag & noPrintFlags);
excErrno = gExceptionInfo[i].excErrno;
} else {
noPrint = 0;
excErrno = EINTERNAL;
}
if (!noPrint) {
vfprintf(stderr, fmt, ap);
fprintf(stderr, " error:\n");
fprintf(stderr, "Exception: %s\nJavaClassName: %s\nMessage: %s\n", exc->exception, exc->javaClassName, exc->message);
}
free(exc);
return excErrno;
}
int printExceptionWeb(hdfs_exception_msg *exc, int noPrintFlags, const char *fmt, ...)
{
va_list ap;
int ret;
va_start(ap, fmt);
ret = printExceptionWebV(exc, noPrintFlags, fmt, ap);
va_end(ap);
return ret;
}
int printExceptionAndFreeV(JNIEnv *env, jthrowable exc, int noPrintFlags,
const char *fmt, va_list ap)
{
int i, noPrint, excErrno;
char *className = NULL;
jstring jStr = NULL;
jvalue jVal;
jthrowable jthr;
jthr = classNameOfObject(exc, env, &className);
if (jthr) {
fprintf(stderr, "PrintExceptionAndFree: error determining class name "
"of exception.\n");
className = strdup("(unknown)");
destroyLocalReference(env, jthr);
}
for (i = 0; i < EXCEPTION_INFO_LEN; i++) {
if (!strcmp(gExceptionInfo[i].name, className)) {
break;
}
}
if (i < EXCEPTION_INFO_LEN) {
noPrint = (gExceptionInfo[i].noPrintFlag & noPrintFlags);
excErrno = gExceptionInfo[i].excErrno;
} else {
noPrint = 0;
excErrno = EINTERNAL;
}
if (!noPrint) {
vfprintf(stderr, fmt, ap);
fprintf(stderr, " error:\n");
// We don't want to use ExceptionDescribe here, because that requires a
// pending exception. Instead, use ExceptionUtils.
jthr = invokeMethod(env, &jVal, STATIC, NULL,
"org/apache/commons/lang/exception/ExceptionUtils",
"getStackTrace", "(Ljava/lang/Throwable;)Ljava/lang/String;", exc);
if (jthr) {
fprintf(stderr, "(unable to get stack trace for %s exception: "
"ExceptionUtils::getStackTrace error.)\n", className);
destroyLocalReference(env, jthr);
} else {
jStr = jVal.l;
const char *stackTrace = (*env)->GetStringUTFChars(env, jStr, NULL);
if (!stackTrace) {
fprintf(stderr, "(unable to get stack trace for %s exception: "
"GetStringUTFChars error.)\n", className);
} else {
fprintf(stderr, "%s", stackTrace);
(*env)->ReleaseStringUTFChars(env, jStr, stackTrace);
}
}
}
destroyLocalReference(env, jStr);
destroyLocalReference(env, exc);
free(className);
return excErrno;
}
int printExceptionAndFree(JNIEnv *env, jthrowable exc, int noPrintFlags,
const char *fmt, ...)
{
va_list ap;
int ret;
va_start(ap, fmt);
ret = printExceptionAndFreeV(env, exc, noPrintFlags, fmt, ap);
va_end(ap);
return ret;
}
int printPendingExceptionAndFree(JNIEnv *env, int noPrintFlags,
const char *fmt, ...)
{
va_list ap;
int ret;
jthrowable exc;
exc = (*env)->ExceptionOccurred(env);
if (!exc) {
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
fprintf(stderr, " error: (no exception)");
ret = 0;
} else {
(*env)->ExceptionClear(env);
va_start(ap, fmt);
ret = printExceptionAndFreeV(env, exc, noPrintFlags, fmt, ap);
va_end(ap);
}
return ret;
}
jthrowable getPendingExceptionAndClear(JNIEnv *env)
{
jthrowable jthr = (*env)->ExceptionOccurred(env);
if (!jthr)
return NULL;
(*env)->ExceptionClear(env);
return jthr;
}
jthrowable newRuntimeError(JNIEnv *env, const char *fmt, ...)
{
char buf[512];
jobject out, exc;
jstring jstr;
va_list ap;
va_start(ap, fmt);
vsnprintf(buf, sizeof(buf), fmt, ap);
va_end(ap);
jstr = (*env)->NewStringUTF(env, buf);
if (!jstr) {
// We got an out of memory exception rather than a RuntimeException.
// Too bad...
return getPendingExceptionAndClear(env);
}
exc = constructNewObjectOfClass(env, &out, "RuntimeException",
"(java/lang/String;)V", jstr);
(*env)->DeleteLocalRef(env, jstr);
// Again, we'll either get an out of memory exception or the
// RuntimeException we wanted.
return (exc) ? exc : out;
}

View File

@ -1,178 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBHDFS_EXCEPTION_H
#define LIBHDFS_EXCEPTION_H
/**
* Exception handling routines for libhdfs.
*
* The convention we follow here is to clear pending exceptions as soon as they
* are raised. Never assume that the caller of your function will clean up
* after you-- do it yourself. Unhandled exceptions can lead to memory leaks
* and other undefined behavior.
*
* If you encounter an exception, return a local reference to it. The caller is
* responsible for freeing the local reference, by calling a function like
* PrintExceptionAndFree. (You can also free exceptions directly by calling
* DeleteLocalRef. However, that would not produce an error message, so it's
* usually not what you want.)
*/
#include <jni.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <search.h>
#include <pthread.h>
#include <errno.h>
/**
* Exception noprint flags
*
* Theses flags determine which exceptions should NOT be printed to stderr by
* the exception printing routines. For example, if you expect to see
* FileNotFound, you might use NOPRINT_EXC_FILE_NOT_FOUND, to avoid filling the
* logs with messages about routine events.
*
* On the other hand, if you don't expect any failures, you might pass
* PRINT_EXC_ALL.
*
* You can OR these flags together to avoid printing multiple classes of
* exceptions.
*/
#define PRINT_EXC_ALL 0x00
#define NOPRINT_EXC_FILE_NOT_FOUND 0x01
#define NOPRINT_EXC_ACCESS_CONTROL 0x02
#define NOPRINT_EXC_UNRESOLVED_LINK 0x04
#define NOPRINT_EXC_PARENT_NOT_DIRECTORY 0x08
#define NOPRINT_EXC_ILLEGAL_ARGUMENT 0x10
/**
* Exception information after calling webhdfs operations
*/
typedef struct {
const char *exception;
const char *javaClassName;
const char *message;
} hdfs_exception_msg;
/**
* Print out exception information got after calling webhdfs operations
*
* @param exc The exception information to print and free
* @param noPrintFlags Flags which determine which exceptions we should NOT
* print.
* @param fmt Printf-style format list
* @param ap Printf-style varargs
*
* @return The POSIX error number associated with the exception
* object.
*/
int printExceptionWebV(hdfs_exception_msg *exc, int noPrintFlags, const char *fmt, va_list ap);
/**
* Print out exception information got after calling webhdfs operations
*
* @param exc The exception information to print and free
* @param noPrintFlags Flags which determine which exceptions we should NOT
* print.
* @param fmt Printf-style format list
* @param ... Printf-style varargs
*
* @return The POSIX error number associated with the exception
* object.
*/
int printExceptionWeb(hdfs_exception_msg *exc, int noPrintFlags,
const char *fmt, ...) __attribute__((format(printf, 3, 4)));
/**
* Print out information about an exception and free it.
*
* @param env The JNI environment
* @param exc The exception to print and free
* @param noPrintFlags Flags which determine which exceptions we should NOT
* print.
* @param fmt Printf-style format list
* @param ap Printf-style varargs
*
* @return The POSIX error number associated with the exception
* object.
*/
int printExceptionAndFreeV(JNIEnv *env, jthrowable exc, int noPrintFlags,
const char *fmt, va_list ap);
/**
* Print out information about an exception and free it.
*
* @param env The JNI environment
* @param exc The exception to print and free
* @param noPrintFlags Flags which determine which exceptions we should NOT
* print.
* @param fmt Printf-style format list
* @param ... Printf-style varargs
*
* @return The POSIX error number associated with the exception
* object.
*/
int printExceptionAndFree(JNIEnv *env, jthrowable exc, int noPrintFlags,
const char *fmt, ...) __attribute__((format(printf, 4, 5)));
/**
* Print out information about the pending exception and free it.
*
* @param env The JNI environment
* @param noPrintFlags Flags which determine which exceptions we should NOT
* print.
* @param fmt Printf-style format list
* @param ... Printf-style varargs
*
* @return The POSIX error number associated with the exception
* object.
*/
int printPendingExceptionAndFree(JNIEnv *env, int noPrintFlags,
const char *fmt, ...) __attribute__((format(printf, 3, 4)));
/**
* Get a local reference to the pending exception and clear it.
*
* Once it is cleared, the exception will no longer be pending. The caller will
* have to decide what to do with the exception object.
*
* @param env The JNI environment
*
* @return The exception, or NULL if there was no exception
*/
jthrowable getPendingExceptionAndClear(JNIEnv *env);
/**
* Create a new runtime error.
*
* This creates (but does not throw) a new RuntimeError.
*
* @param env The JNI environment
* @param fmt Printf-style format list
* @param ... Printf-style varargs
*
* @return A local reference to a RuntimeError
*/
jthrowable newRuntimeError(JNIEnv *env, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
#endif

View File

@ -1,101 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBHDFS_NATIVE_TESTS_EXPECT_H
#define LIBHDFS_NATIVE_TESTS_EXPECT_H
#include <stdio.h>
#define EXPECT_ZERO(x) \
do { \
int __my_ret__ = x; \
if (__my_ret__) { \
int __my_errno__ = errno; \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
"code %d (errno: %d): got nonzero from %s\n", \
__LINE__, __my_ret__, __my_errno__, #x); \
return __my_ret__; \
} \
} while (0);
#define EXPECT_NULL(x) \
do { \
void* __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__ != NULL) { \
fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \
"got non-NULL value %p from %s\n", \
__LINE__, __my_errno__, __my_ret__, #x); \
return -1; \
} \
} while (0);
#define EXPECT_NONNULL(x) \
do { \
void* __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__ == NULL) { \
fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \
"got NULL from %s\n", __LINE__, __my_errno__, #x); \
return -1; \
} \
} while (0);
#define EXPECT_NEGATIVE_ONE_WITH_ERRNO(x, e) \
do { \
int __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__ != -1) { \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
"code %d (errno: %d): expected -1 from %s\n", __LINE__, \
__my_ret__, __my_errno__, #x); \
return -1; \
} \
if (__my_errno__ != e) { \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
"code %d (errno: %d): expected errno = %d from %s\n", \
__LINE__, __my_ret__, __my_errno__, e, #x); \
return -1; \
} \
} while (0);
#define EXPECT_NONZERO(x) \
do { \
int __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__) { \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
"code %d (errno: %d): got zero from %s\n", __LINE__, \
__my_ret__, __my_errno__, #x); \
return -1; \
} \
} while (0);
#define EXPECT_NONNEGATIVE(x) \
do { \
int __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__ < 0) { \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
"code %d (errno: %d): got negative return from %s\n", \
__LINE__, __my_ret__, __my_errno__, #x); \
return __my_ret__; \
} \
} while (0);
#endif

View File

@ -21,8 +21,42 @@
#ifndef _HDFS_HTTP_CLIENT_H_
#define _HDFS_HTTP_CLIENT_H_
#include "webhdfs.h"
#include <curl/curl.h>
#include "hdfs.h" /* for tSize */
#include <pthread.h> /* for pthread_t */
#include <unistd.h> /* for size_t */
enum hdfsStreamType
{
UNINITIALIZED = 0,
INPUT = 1,
OUTPUT = 2,
};
/**
* webhdfsBuffer - used for hold the data for read/write from/to http connection
*/
typedef struct {
const char *wbuffer; // The user's buffer for uploading
size_t remaining; // Length of content
size_t offset; // offset for reading
int openFlag; // Check whether the hdfsOpenFile has been called before
int closeFlag; // Whether to close the http connection for writing
pthread_mutex_t writeMutex; // Synchronization between the curl and hdfsWrite threads
pthread_cond_t newwrite_or_close; // Transferring thread waits for this condition
// when there is no more content for transferring in the buffer
pthread_cond_t transfer_finish; // Condition used to indicate finishing transferring (one buffer)
} webhdfsBuffer;
struct webhdfsFileHandle {
char *absPath;
int bufferSize;
short replication;
tSize blockSize;
char *datanode;
webhdfsBuffer *uploadBuffer;
pthread_t connThread;
};
enum HttpHeader {
GET,

View File

@ -1,616 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <string.h>
#include "webhdfs.h"
#include "jni_helper.h"
#include "exception.h"
/* Some frequently used Java paths */
#define HADOOP_CONF "org/apache/hadoop/conf/Configuration"
#define HADOOP_PATH "org/apache/hadoop/fs/Path"
#define HADOOP_LOCALFS "org/apache/hadoop/fs/LocalFileSystem"
#define HADOOP_FS "org/apache/hadoop/fs/FileSystem"
#define HADOOP_FSSTATUS "org/apache/hadoop/fs/FsStatus"
#define HADOOP_BLK_LOC "org/apache/hadoop/fs/BlockLocation"
#define HADOOP_DFS "org/apache/hadoop/hdfs/DistributedFileSystem"
#define HADOOP_ISTRM "org/apache/hadoop/fs/FSDataInputStream"
#define HADOOP_OSTRM "org/apache/hadoop/fs/FSDataOutputStream"
#define HADOOP_STAT "org/apache/hadoop/fs/FileStatus"
#define HADOOP_FSPERM "org/apache/hadoop/fs/permission/FsPermission"
#define JAVA_NET_ISA "java/net/InetSocketAddress"
#define JAVA_NET_URI "java/net/URI"
#define JAVA_STRING "java/lang/String"
#define JAVA_VOID "V"
/* Macros for constructing method signatures */
#define JPARAM(X) "L" X ";"
#define JARRPARAM(X) "[L" X ";"
#define JMETHOD1(X, R) "(" X ")" R
#define JMETHOD2(X, Y, R) "(" X Y ")" R
#define JMETHOD3(X, Y, Z, R) "(" X Y Z")" R
#define KERBEROS_TICKET_CACHE_PATH "hadoop.security.kerberos.ticket.cache.path"
/**
* Helper function to create a org.apache.hadoop.fs.Path object.
* @param env: The JNIEnv pointer.
* @param path: The file-path for which to construct org.apache.hadoop.fs.Path
* object.
* @return Returns a jobject on success and NULL on error.
*/
static jthrowable constructNewObjectOfPath(JNIEnv *env, const char *path,
jobject *out)
{
jthrowable jthr;
jstring jPathString;
jobject jPath;
//Construct a java.lang.String object
jthr = newJavaStr(env, path, &jPathString);
if (jthr)
return jthr;
//Construct the org.apache.hadoop.fs.Path object
jthr = constructNewObjectOfClass(env, &jPath, "org/apache/hadoop/fs/Path",
"(Ljava/lang/String;)V", jPathString);
destroyLocalReference(env, jPathString);
if (jthr)
return jthr;
*out = jPath;
return NULL;
}
/**
* Set a configuration value.
*
* @param env The JNI environment
* @param jConfiguration The configuration object to modify
* @param key The key to modify
* @param value The value to set the key to
*
* @return NULL on success; exception otherwise
*/
static jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
const char *key, const char *value)
{
jthrowable jthr;
jstring jkey = NULL, jvalue = NULL;
jthr = newJavaStr(env, key, &jkey);
if (jthr)
goto done;
jthr = newJavaStr(env, value, &jvalue);
if (jthr)
goto done;
jthr = invokeMethod(env, NULL, INSTANCE, jConfiguration,
HADOOP_CONF, "set", JMETHOD2(JPARAM(JAVA_STRING),
JPARAM(JAVA_STRING), JAVA_VOID),
jkey, jvalue);
if (jthr)
goto done;
done:
destroyLocalReference(env, jkey);
destroyLocalReference(env, jvalue);
return jthr;
}
static jthrowable hadoopConfGetStr(JNIEnv *env, jobject jConfiguration,
const char *key, char **val)
{
jthrowable jthr;
jvalue jVal;
jstring jkey = NULL, jRet = NULL;
jthr = newJavaStr(env, key, &jkey);
if (jthr)
goto done;
jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
HADOOP_CONF, "get", JMETHOD1(JPARAM(JAVA_STRING),
JPARAM(JAVA_STRING)), jkey);
if (jthr)
goto done;
jRet = jVal.l;
jthr = newCStr(env, jRet, val);
done:
destroyLocalReference(env, jkey);
destroyLocalReference(env, jRet);
return jthr;
}
int hdfsConfGetStr(const char *key, char **val)
{
JNIEnv *env;
int ret;
jthrowable jthr;
jobject jConfiguration = NULL;
env = getJNIEnv();
if (env == NULL) {
ret = EINTERNAL;
goto done;
}
jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsConfGetStr(%s): new Configuration", key);
goto done;
}
jthr = hadoopConfGetStr(env, jConfiguration, key, val);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsConfGetStr(%s): hadoopConfGetStr", key);
goto done;
}
ret = 0;
done:
destroyLocalReference(env, jConfiguration);
if (ret)
errno = ret;
return ret;
}
void hdfsConfStrFree(char *val)
{
free(val);
}
static jthrowable hadoopConfGetInt(JNIEnv *env, jobject jConfiguration,
const char *key, int32_t *val)
{
jthrowable jthr = NULL;
jvalue jVal;
jstring jkey = NULL;
jthr = newJavaStr(env, key, &jkey);
if (jthr)
return jthr;
jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
HADOOP_CONF, "getInt", JMETHOD2(JPARAM(JAVA_STRING), "I", "I"),
jkey, (jint)(*val));
destroyLocalReference(env, jkey);
if (jthr)
return jthr;
*val = jVal.i;
return NULL;
}
int hdfsConfGetInt(const char *key, int32_t *val)
{
JNIEnv *env;
int ret;
jobject jConfiguration = NULL;
jthrowable jthr;
env = getJNIEnv();
if (env == NULL) {
ret = EINTERNAL;
goto done;
}
jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsConfGetInt(%s): new Configuration", key);
goto done;
}
jthr = hadoopConfGetInt(env, jConfiguration, key, val);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsConfGetInt(%s): hadoopConfGetInt", key);
goto done;
}
ret = 0;
done:
destroyLocalReference(env, jConfiguration);
if (ret)
errno = ret;
return ret;
}
/**
* Calculate the effective URI to use, given a builder configuration.
*
* If there is not already a URI scheme, we prepend 'hdfs://'.
*
* If there is not already a port specified, and a port was given to the
* builder, we suffix that port. If there is a port specified but also one in
* the URI, that is an error.
*
* @param bld The hdfs builder object
* @param uri (out param) dynamically allocated string representing the
* effective URI
*
* @return 0 on success; error code otherwise
*/
static int calcEffectiveURI(struct hdfsBuilder *bld, char ** uri)
{
const char *scheme;
char suffix[64];
const char *lastColon;
char *u;
size_t uriLen;
if (!bld->nn_jni)
return EINVAL;
scheme = (strstr(bld->nn_jni, "://")) ? "" : "hdfs://";
if (bld->port == 0) {
suffix[0] = '\0';
} else {
lastColon = rindex(bld->nn_jni, ':');
if (lastColon && (strspn(lastColon + 1, "0123456789") ==
strlen(lastColon + 1))) {
fprintf(stderr, "port %d was given, but URI '%s' already "
"contains a port!\n", bld->port, bld->nn_jni);
return EINVAL;
}
snprintf(suffix, sizeof(suffix), ":%d", bld->port);
}
uriLen = strlen(scheme) + strlen(bld->nn_jni) + strlen(suffix);
u = malloc((uriLen + 1) * (sizeof(char)));
if (!u) {
fprintf(stderr, "calcEffectiveURI: out of memory");
return ENOMEM;
}
snprintf(u, uriLen + 1, "%s%s%s", scheme, bld->nn_jni, suffix);
*uri = u;
return 0;
}
static const char *maybeNull(const char *str)
{
return str ? str : "(NULL)";
}
const char *hdfsBuilderToStr(const struct hdfsBuilder *bld,
char *buf, size_t bufLen)
{
snprintf(buf, bufLen, "forceNewInstance=%d, nn=%s, port=%d, "
"kerbTicketCachePath=%s, userName=%s, workingDir=%s\n",
bld->forceNewInstance, maybeNull(bld->nn), bld->port,
maybeNull(bld->kerbTicketCachePath),
maybeNull(bld->userName), maybeNull(bld->workingDir));
return buf;
}
/*
* The JNI version of builderConnect, return the reflection of FileSystem
*/
jobject hdfsBuilderConnect_JNI(JNIEnv *env, struct hdfsBuilder *bld)
{
jobject jConfiguration = NULL, jFS = NULL, jURI = NULL, jCachePath = NULL;
jstring jURIString = NULL, jUserString = NULL;
jvalue jVal;
jthrowable jthr = NULL;
char *cURI = 0, buf[512];
int ret;
jobject jRet = NULL;
// jConfiguration = new Configuration();
jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect_JNI(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
//Check what type of FileSystem the caller wants...
if (bld->nn_jni == NULL) {
// Get a local filesystem.
// Also handle the scenario where nn of hdfsBuilder is set to localhost.
if (bld->forceNewInstance) {
// fs = FileSytem#newInstanceLocal(conf);
jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
"newInstanceLocal", JMETHOD1(JPARAM(HADOOP_CONF),
JPARAM(HADOOP_LOCALFS)), jConfiguration);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect_JNI(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jFS = jVal.l;
} else {
// fs = FileSytem#getLocal(conf);
jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "getLocal",
JMETHOD1(JPARAM(HADOOP_CONF),
JPARAM(HADOOP_LOCALFS)),
jConfiguration);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect_JNI(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jFS = jVal.l;
}
} else {
if (!strcmp(bld->nn_jni, "default")) {
// jURI = FileSystem.getDefaultUri(conf)
jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
"getDefaultUri",
"(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;",
jConfiguration);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect_JNI(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jURI = jVal.l;
} else {
// fs = FileSystem#get(URI, conf, ugi);
ret = calcEffectiveURI(bld, &cURI);
if (ret)
goto done;
jthr = newJavaStr(env, cURI, &jURIString);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect_JNI(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jthr = invokeMethod(env, &jVal, STATIC, NULL, JAVA_NET_URI,
"create", "(Ljava/lang/String;)Ljava/net/URI;",
jURIString);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect_JNI(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jURI = jVal.l;
}
if (bld->kerbTicketCachePath) {
jthr = hadoopConfSetStr(env, jConfiguration,
KERBEROS_TICKET_CACHE_PATH, bld->kerbTicketCachePath);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect_JNI(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
}
jthr = newJavaStr(env, bld->userName, &jUserString);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect_JNI(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
if (bld->forceNewInstance) {
jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
"newInstance", JMETHOD3(JPARAM(JAVA_NET_URI),
JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING),
JPARAM(HADOOP_FS)),
jURI, jConfiguration, jUserString);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect_JNI(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jFS = jVal.l;
} else {
jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "get",
JMETHOD3(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF),
JPARAM(JAVA_STRING), JPARAM(HADOOP_FS)),
jURI, jConfiguration, jUserString);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect_JNI(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jFS = jVal.l;
}
}
jRet = (*env)->NewGlobalRef(env, jFS);
if (!jRet) {
ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsBuilderConnect_JNI(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
ret = 0;
done:
// Release unnecessary local references
destroyLocalReference(env, jConfiguration);
destroyLocalReference(env, jFS);
destroyLocalReference(env, jURI);
destroyLocalReference(env, jCachePath);
destroyLocalReference(env, jURIString);
destroyLocalReference(env, jUserString);
free(cURI);
if (ret) {
errno = ret;
return NULL;
}
return jRet;
}
int hdfsDisconnect_JNI(jobject jFS)
{
// JAVA EQUIVALENT:
// fs.close()
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
int ret;
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
//Sanity check
if (jFS == NULL) {
errno = EBADF;
return -1;
}
jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
"close", "()V");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsDisconnect: FileSystem#close");
} else {
ret = 0;
}
(*env)->DeleteGlobalRef(env, jFS);
if (ret) {
errno = ret;
return -1;
}
return 0;
}
static int hdfsCopyImpl(hdfsFS srcFS, const char* src, hdfsFS dstFS,
const char* dst, jboolean deleteSource)
{
//JAVA EQUIVALENT
// FileUtil#copy(srcFS, srcPath, dstFS, dstPath,
// deleteSource = false, conf)
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
//In libwebhdfs, the hdfsFS derived from hdfsBuilderConnect series functions
//is actually a hdfsBuilder instance containing address information of NameNode.
//Thus here we need to use JNI to get the real java FileSystem objects.
jobject jSrcFS = hdfsBuilderConnect_JNI(env, (struct hdfsBuilder *) srcFS);
jobject jDstFS = hdfsBuilderConnect_JNI(env, (struct hdfsBuilder *) dstFS);
//Parameters
jobject jConfiguration = NULL, jSrcPath = NULL, jDstPath = NULL;
jthrowable jthr;
jvalue jVal;
int ret;
jthr = constructNewObjectOfPath(env, src, &jSrcPath);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsCopyImpl(src=%s): constructNewObjectOfPath", src);
goto done;
}
jthr = constructNewObjectOfPath(env, dst, &jDstPath);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsCopyImpl(dst=%s): constructNewObjectOfPath", dst);
goto done;
}
//Create the org.apache.hadoop.conf.Configuration object
jthr = constructNewObjectOfClass(env, &jConfiguration,
HADOOP_CONF, "()V");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsCopyImpl: Configuration constructor");
goto done;
}
//FileUtil#copy
jthr = invokeMethod(env, &jVal, STATIC,
NULL, "org/apache/hadoop/fs/FileUtil", "copy",
"(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;"
"Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;"
"ZLorg/apache/hadoop/conf/Configuration;)Z",
jSrcFS, jSrcPath, jDstFS, jDstPath, deleteSource,
jConfiguration);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsCopyImpl(src=%s, dst=%s, deleteSource=%d): "
"FileUtil#copy", src, dst, deleteSource);
goto done;
}
if (!jVal.z) {
ret = EIO;
goto done;
}
ret = 0;
done:
destroyLocalReference(env, jConfiguration);
destroyLocalReference(env, jSrcPath);
destroyLocalReference(env, jDstPath);
//Disconnect src/dst FileSystem
hdfsDisconnect_JNI(jSrcFS);
hdfsDisconnect_JNI(jDstFS);
if (ret) {
errno = ret;
return -1;
}
return 0;
}
int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
{
return hdfsCopyImpl(srcFS, src, dstFS, dst, 0);
}
int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
{
return hdfsCopyImpl(srcFS, src, dstFS, dst, 1);
}
tOffset hdfsGetDefaultBlockSize(hdfsFS fs)
{
// JAVA EQUIVALENT:
// fs.getDefaultBlockSize();
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
//In libwebhdfs, the hdfsFS derived from hdfsConnect functions
//is actually a hdfsBuilder instance containing address information of NameNode.
//Thus here we need to use JNI to get the real java FileSystem objects.
jobject jFS = hdfsBuilderConnect_JNI(env, (struct hdfsBuilder *) fs);
//FileSystem#getDefaultBlockSize()
jvalue jVal;
jthrowable jthr;
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
"getDefaultBlockSize", "()J");
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsGetDefaultBlockSize: FileSystem#getDefaultBlockSize");
//Disconnect
hdfsDisconnect_JNI(jFS);
return -1;
}
//Disconnect
hdfsDisconnect_JNI(jFS);
return jVal.j;
}

View File

@ -15,14 +15,76 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "exception.h"
#include "hdfs.h" /* for hdfsFileInfo */
#include "hdfs_json_parser.h"
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <jansson.h>
#include "hdfs_json_parser.h"
#include "exception.h"
hdfsFileInfo *parseJsonGFS(json_t *jobj, hdfsFileInfo *fileStat, int *numEntries, const char *operation); //Forward Declaration
/**
* Exception information after calling JSON operations
*/
struct jsonException {
const char *exception;
const char *javaClassName;
const char *message;
};
static hdfsFileInfo *parseJsonGFS(json_t *jobj, hdfsFileInfo *fileStat,
int *numEntries, const char *operation);
static void dotsToSlashes(char *str)
{
for (; *str != '\0'; str++) {
if (*str == '.')
*str = '/';
}
}
int printJsonExceptionV(struct jsonException *exc, int noPrintFlags,
const char *fmt, va_list ap)
{
char *javaClassName = NULL;
int excErrno = EINTERNAL, shouldPrint = 0;
if (!exc) {
fprintf(stderr, "printJsonExceptionV: the jsonException is NULL\n");
return EINTERNAL;
}
javaClassName = strdup(exc->javaClassName);
if (!javaClassName) {
fprintf(stderr, "printJsonExceptionV: internal out of memory error\n");
return EINTERNAL;
}
dotsToSlashes(javaClassName);
getExceptionInfo(javaClassName, noPrintFlags, &excErrno, &shouldPrint);
free(javaClassName);
if (shouldPrint) {
vfprintf(stderr, fmt, ap);
fprintf(stderr, " error:\n");
fprintf(stderr, "Exception: %s\nJavaClassName: %s\nMessage: %s\n",
exc->exception, exc->javaClassName, exc->message);
}
free(exc);
return excErrno;
}
int printJsonException(struct jsonException *exc, int noPrintFlags,
const char *fmt, ...)
{
va_list ap;
int ret;
va_start(ap, fmt);
ret = printJsonExceptionV(exc, noPrintFlags, fmt, ap);
va_end(ap);
return ret;
}
static hdfsFileInfo *json_parse_array(json_t *jobj, char *key, hdfsFileInfo *fileStat, int *numEntries, const char *operation) {
int arraylen = json_array_size(jobj); //Getting the length of the array
@ -88,12 +150,12 @@ int parseDELETE(char *response) {
return (parseBoolean(response));
}
hdfs_exception_msg *parseJsonException(json_t *jobj) {
struct jsonException *parseJsonException(json_t *jobj) {
const char *key;
json_t *value;
hdfs_exception_msg *exception = NULL;
struct jsonException *exception = NULL;
exception = (hdfs_exception_msg *) calloc(1, sizeof(hdfs_exception_msg));
exception = calloc(1, sizeof(*exception));
if (!exception) {
return NULL;
}
@ -117,7 +179,7 @@ hdfs_exception_msg *parseJsonException(json_t *jobj) {
return exception;
}
hdfs_exception_msg *parseException(const char *content) {
struct jsonException *parseException(const char *content) {
if (!content) {
return NULL;
}
@ -145,7 +207,9 @@ hdfs_exception_msg *parseException(const char *content) {
return NULL;
}
hdfsFileInfo *parseJsonGFS(json_t *jobj, hdfsFileInfo *fileStat, int *numEntries, const char *operation) {
static hdfsFileInfo *parseJsonGFS(json_t *jobj, hdfsFileInfo *fileStat,
int *numEntries, const char *operation)
{
const char *tempstr;
const char *key;
json_t *value;
@ -196,9 +260,9 @@ hdfsFileInfo *parseJsonGFS(json_t *jobj, hdfsFileInfo *fileStat, int *numEntries
fileStat = parseJsonGFS(value, &fileStat[0], numEntries, operation);
} else if (!strcmp(key,"RemoteException")) {
//Besides returning NULL, we also need to print the exception information
hdfs_exception_msg *exception = parseJsonException(value);
struct jsonException *exception = parseJsonException(value);
if (exception) {
errno = printExceptionWeb(exception, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation);
errno = printJsonException(exception, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation);
}
if(fileStat != NULL) {
@ -234,9 +298,9 @@ int checkHeader(char *header, const char *content, const char *operation) {
return 0;
}
if(!(strstr(header, responseCode)) || !(header = strstr(header, "Content-Length"))) {
hdfs_exception_msg *exc = parseException(content);
struct jsonException *exc = parseException(content);
if (exc) {
errno = printExceptionWeb(exc, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation);
errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation);
}
return 0;
}
@ -259,14 +323,14 @@ int parseOPEN(const char *header, const char *content) {
return -1;
}
if(!(strstr(header,responseCode1) && strstr(header, responseCode2))) {
hdfs_exception_msg *exc = parseException(content);
struct jsonException *exc = parseException(content);
if (exc) {
//if the exception is an IOException and it is because the offset is out of the range
//do not print out the exception
if (!strcasecmp(exc->exception, "IOException") && strstr(exc->message, "out of the range")) {
return 0;
}
errno = printExceptionWeb(exc, PRINT_EXC_ALL, "Calling WEBHDFS (OPEN)");
errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (OPEN)");
}
return -1;
}
@ -297,9 +361,9 @@ int checkIfRedirect(const char *const headerstr, const char *content, const char
}
if(!(tempHeader = strstr(headerstr,responseCode))) {
//process possible exception information
hdfs_exception_msg *exc = parseException(content);
struct jsonException *exc = parseException(content);
if (exc) {
errno = printExceptionWeb(exc, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation);
errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation);
}
return 0;
}
@ -350,9 +414,9 @@ int parseDnWRITE(const char *header, const char *content) {
return 0;
}
if(!(strstr(header,responseCode))) {
hdfs_exception_msg *exc = parseException(content);
struct jsonException *exc = parseException(content);
if (exc) {
errno = printExceptionWeb(exc, PRINT_EXC_ALL, "Calling WEBHDFS (WRITE(DataNode))");
errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (WRITE(DataNode))");
}
return 0;
}
@ -365,9 +429,9 @@ int parseDnAPPEND(const char *header, const char *content) {
return 0;
}
if(!(strstr(header, responseCode))) {
hdfs_exception_msg *exc = parseException(content);
struct jsonException *exc = parseException(content);
if (exc) {
errno = printExceptionWeb(exc, PRINT_EXC_ALL, "Calling WEBHDFS (APPEND(DataNode))");
errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (APPEND(DataNode))");
}
return 0;
}

View File

@ -17,7 +17,23 @@
*/
#ifndef _HDFS_JSON_PARSER_H_
#define _HDFS_JSON_PARSER_H_
#include "webhdfs.h"
struct jsonException;
/**
* Print out JSON exception information.
*
* @param exc The exception information to print and free
* @param noPrintFlags Flags which determine which exceptions we should NOT
* print.
* @param fmt Printf-style format list
* @param ... Printf-style varargs
*
* @return The POSIX error number associated with the exception
* object.
*/
int printJsonException(struct jsonException *exc, int noPrintFlags,
const char *fmt, ...);
int parseMKDIR(char *response);
int parseRENAME(char *response);

View File

@ -1,609 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//#include "config.h"
#include "exception.h"
#include "jni_helper.h"
#include <stdio.h>
#include <string.h>
static pthread_mutex_t hdfsHashMutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t jvmMutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int hashTableInited = 0;
#define LOCK_HASH_TABLE() pthread_mutex_lock(&hdfsHashMutex)
#define UNLOCK_HASH_TABLE() pthread_mutex_unlock(&hdfsHashMutex)
/** The Native return types that methods could return */
#define VOID 'V'
#define JOBJECT 'L'
#define JARRAYOBJECT '['
#define JBOOLEAN 'Z'
#define JBYTE 'B'
#define JCHAR 'C'
#define JSHORT 'S'
#define JINT 'I'
#define JLONG 'J'
#define JFLOAT 'F'
#define JDOUBLE 'D'
/**
* MAX_HASH_TABLE_ELEM: The maximum no. of entries in the hashtable.
* It's set to 4096 to account for (classNames + No. of threads)
*/
#define MAX_HASH_TABLE_ELEM 4096
/** Key that allows us to retrieve thread-local storage */
static pthread_key_t gTlsKey;
/** nonzero if we succeeded in initializing gTlsKey. Protected by the jvmMutex */
static int gTlsKeyInitialized = 0;
/** Pthreads thread-local storage for each library thread. */
struct hdfsTls {
JNIEnv *env;
};
/**
* The function that is called whenever a thread with libhdfs thread local data
* is destroyed.
*
* @param v The thread-local data
*/
static void hdfsThreadDestructor(void *v)
{
struct hdfsTls *tls = v;
JavaVM *vm;
JNIEnv *env = tls->env;
jint ret;
ret = (*env)->GetJavaVM(env, &vm);
if (ret) {
fprintf(stderr, "hdfsThreadDestructor: GetJavaVM failed with "
"error %d\n", ret);
(*env)->ExceptionDescribe(env);
} else {
(*vm)->DetachCurrentThread(vm);
}
free(tls);
}
void destroyLocalReference(JNIEnv *env, jobject jObject)
{
if (jObject)
(*env)->DeleteLocalRef(env, jObject);
}
static jthrowable validateMethodType(JNIEnv *env, MethType methType)
{
if (methType != STATIC && methType != INSTANCE) {
return newRuntimeError(env, "validateMethodType(methType=%d): "
"illegal method type.\n", methType);
}
return NULL;
}
jthrowable newJavaStr(JNIEnv *env, const char *str, jstring *out)
{
jstring jstr;
if (!str) {
/* Can't pass NULL to NewStringUTF: the result would be
* implementation-defined. */
*out = NULL;
return NULL;
}
jstr = (*env)->NewStringUTF(env, str);
if (!jstr) {
/* If NewStringUTF returns NULL, an exception has been thrown,
* which we need to handle. Probaly an OOM. */
return getPendingExceptionAndClear(env);
}
*out = jstr;
return NULL;
}
jthrowable newCStr(JNIEnv *env, jstring jstr, char **out)
{
const char *tmp;
if (!jstr) {
*out = NULL;
return NULL;
}
tmp = (*env)->GetStringUTFChars(env, jstr, NULL);
if (!tmp) {
return getPendingExceptionAndClear(env);
}
*out = strdup(tmp);
(*env)->ReleaseStringUTFChars(env, jstr, tmp);
return NULL;
}
static int hashTableInit(void)
{
if (!hashTableInited) {
LOCK_HASH_TABLE();
if (!hashTableInited) {
if (hcreate(MAX_HASH_TABLE_ELEM) == 0) {
fprintf(stderr, "error creating hashtable, <%d>: %s\n",
errno, strerror(errno));
return 0;
}
hashTableInited = 1;
}
UNLOCK_HASH_TABLE();
}
return 1;
}
static int insertEntryIntoTable(const char *key, void *data)
{
ENTRY e, *ep;
if (key == NULL || data == NULL) {
return 0;
}
if (! hashTableInit()) {
return -1;
}
e.data = data;
e.key = (char*)key;
LOCK_HASH_TABLE();
ep = hsearch(e, ENTER);
UNLOCK_HASH_TABLE();
if (ep == NULL) {
fprintf(stderr, "warn adding key (%s) to hash table, <%d>: %s\n",
key, errno, strerror(errno));
}
return 0;
}
static void* searchEntryFromTable(const char *key)
{
ENTRY e,*ep;
if (key == NULL) {
return NULL;
}
hashTableInit();
e.key = (char*)key;
LOCK_HASH_TABLE();
ep = hsearch(e, FIND);
UNLOCK_HASH_TABLE();
if (ep != NULL) {
return ep->data;
}
return NULL;
}
jthrowable invokeMethod(JNIEnv *env, jvalue *retval, MethType methType,
jobject instObj, const char *className,
const char *methName, const char *methSignature, ...)
{
va_list args;
jclass cls;
jmethodID mid;
jthrowable jthr;
const char *str;
char returnType;
jthr = validateMethodType(env, methType);
if (jthr)
return jthr;
jthr = globalClassReference(className, env, &cls);
if (jthr)
return jthr;
jthr = methodIdFromClass(className, methName, methSignature,
methType, env, &mid);
if (jthr)
return jthr;
str = methSignature;
while (*str != ')') str++;
str++;
returnType = *str;
va_start(args, methSignature);
if (returnType == JOBJECT || returnType == JARRAYOBJECT) {
jobject jobj = NULL;
if (methType == STATIC) {
jobj = (*env)->CallStaticObjectMethodV(env, cls, mid, args);
}
else if (methType == INSTANCE) {
jobj = (*env)->CallObjectMethodV(env, instObj, mid, args);
}
retval->l = jobj;
}
else if (returnType == VOID) {
if (methType == STATIC) {
(*env)->CallStaticVoidMethodV(env, cls, mid, args);
}
else if (methType == INSTANCE) {
(*env)->CallVoidMethodV(env, instObj, mid, args);
}
}
else if (returnType == JBOOLEAN) {
jboolean jbool = 0;
if (methType == STATIC) {
jbool = (*env)->CallStaticBooleanMethodV(env, cls, mid, args);
}
else if (methType == INSTANCE) {
jbool = (*env)->CallBooleanMethodV(env, instObj, mid, args);
}
retval->z = jbool;
}
else if (returnType == JSHORT) {
jshort js = 0;
if (methType == STATIC) {
js = (*env)->CallStaticShortMethodV(env, cls, mid, args);
}
else if (methType == INSTANCE) {
js = (*env)->CallShortMethodV(env, instObj, mid, args);
}
retval->s = js;
}
else if (returnType == JLONG) {
jlong jl = -1;
if (methType == STATIC) {
jl = (*env)->CallStaticLongMethodV(env, cls, mid, args);
}
else if (methType == INSTANCE) {
jl = (*env)->CallLongMethodV(env, instObj, mid, args);
}
retval->j = jl;
}
else if (returnType == JINT) {
jint ji = -1;
if (methType == STATIC) {
ji = (*env)->CallStaticIntMethodV(env, cls, mid, args);
}
else if (methType == INSTANCE) {
ji = (*env)->CallIntMethodV(env, instObj, mid, args);
}
retval->i = ji;
}
va_end(args);
jthr = (*env)->ExceptionOccurred(env);
if (jthr) {
(*env)->ExceptionClear(env);
return jthr;
}
return NULL;
}
jthrowable constructNewObjectOfClass(JNIEnv *env, jobject *out, const char *className,
const char *ctorSignature, ...)
{
va_list args;
jclass cls;
jmethodID mid;
jobject jobj;
jthrowable jthr;
jthr = globalClassReference(className, env, &cls);
if (jthr)
return jthr;
jthr = methodIdFromClass(className, "<init>", ctorSignature,
INSTANCE, env, &mid);
if (jthr)
return jthr;
va_start(args, ctorSignature);
jobj = (*env)->NewObjectV(env, cls, mid, args);
va_end(args);
if (!jobj)
return getPendingExceptionAndClear(env);
*out = jobj;
return NULL;
}
jthrowable methodIdFromClass(const char *className, const char *methName,
const char *methSignature, MethType methType,
JNIEnv *env, jmethodID *out)
{
jclass cls;
jthrowable jthr;
jthr = globalClassReference(className, env, &cls);
if (jthr)
return jthr;
jmethodID mid = 0;
jthr = validateMethodType(env, methType);
if (jthr)
return jthr;
if (methType == STATIC) {
mid = (*env)->GetStaticMethodID(env, cls, methName, methSignature);
}
else if (methType == INSTANCE) {
mid = (*env)->GetMethodID(env, cls, methName, methSignature);
}
if (mid == NULL) {
fprintf(stderr, "could not find method %s from class %s with "
"signature %s\n", methName, className, methSignature);
return getPendingExceptionAndClear(env);
}
*out = mid;
return NULL;
}
jthrowable globalClassReference(const char *className, JNIEnv *env, jclass *out)
{
jclass clsLocalRef;
jclass cls = searchEntryFromTable(className);
if (cls) {
*out = cls;
return NULL;
}
clsLocalRef = (*env)->FindClass(env,className);
if (clsLocalRef == NULL) {
return getPendingExceptionAndClear(env);
}
cls = (*env)->NewGlobalRef(env, clsLocalRef);
if (cls == NULL) {
(*env)->DeleteLocalRef(env, clsLocalRef);
return getPendingExceptionAndClear(env);
}
(*env)->DeleteLocalRef(env, clsLocalRef);
insertEntryIntoTable(className, cls);
*out = cls;
return NULL;
}
jthrowable classNameOfObject(jobject jobj, JNIEnv *env, char **name)
{
jthrowable jthr;
jclass cls, clsClass = NULL;
jmethodID mid;
jstring str = NULL;
const char *cstr = NULL;
char *newstr;
cls = (*env)->GetObjectClass(env, jobj);
if (cls == NULL) {
jthr = getPendingExceptionAndClear(env);
goto done;
}
clsClass = (*env)->FindClass(env, "java/lang/Class");
if (clsClass == NULL) {
jthr = getPendingExceptionAndClear(env);
goto done;
}
mid = (*env)->GetMethodID(env, clsClass, "getName", "()Ljava/lang/String;");
if (mid == NULL) {
jthr = getPendingExceptionAndClear(env);
goto done;
}
str = (*env)->CallObjectMethod(env, cls, mid);
if (str == NULL) {
jthr = getPendingExceptionAndClear(env);
goto done;
}
cstr = (*env)->GetStringUTFChars(env, str, NULL);
if (!cstr) {
jthr = getPendingExceptionAndClear(env);
goto done;
}
newstr = strdup(cstr);
if (newstr == NULL) {
jthr = newRuntimeError(env, "classNameOfObject: out of memory");
goto done;
}
*name = newstr;
jthr = NULL;
done:
destroyLocalReference(env, cls);
destroyLocalReference(env, clsClass);
if (str) {
if (cstr)
(*env)->ReleaseStringUTFChars(env, str, cstr);
(*env)->DeleteLocalRef(env, str);
}
return jthr;
}
/**
* Get the global JNI environemnt.
*
* We only have to create the JVM once. After that, we can use it in
* every thread. You must be holding the jvmMutex when you call this
* function.
*
* @return The JNIEnv on success; error code otherwise
*/
static JNIEnv* getGlobalJNIEnv(void)
{
const jsize vmBufLength = 1;
JavaVM* vmBuf[vmBufLength];
JNIEnv *env;
jint rv = 0;
jint noVMs = 0;
jthrowable jthr;
rv = JNI_GetCreatedJavaVMs(&(vmBuf[0]), vmBufLength, &noVMs);
if (rv != 0) {
fprintf(stderr, "JNI_GetCreatedJavaVMs failed with error: %d\n", rv);
return NULL;
}
if (noVMs == 0) {
//Get the environment variables for initializing the JVM
char *hadoopClassPath = getenv("CLASSPATH");
if (hadoopClassPath == NULL) {
fprintf(stderr, "Environment variable CLASSPATH not set!\n");
return NULL;
}
char *hadoopClassPathVMArg = "-Djava.class.path=";
size_t optHadoopClassPathLen = strlen(hadoopClassPath) +
strlen(hadoopClassPathVMArg) + 1;
char *optHadoopClassPath = malloc(sizeof(char)*optHadoopClassPathLen);
snprintf(optHadoopClassPath, optHadoopClassPathLen,
"%s%s", hadoopClassPathVMArg, hadoopClassPath);
// Determine the # of LIBHDFS_OPTS args
int noArgs = 1;
char *hadoopJvmArgs = getenv("LIBHDFS_OPTS");
char jvmArgDelims[] = " ";
char *str, *token, *savePtr;
if (hadoopJvmArgs != NULL) {
hadoopJvmArgs = strdup(hadoopJvmArgs);
for (noArgs = 1, str = hadoopJvmArgs; ; noArgs++, str = NULL) {
token = strtok_r(str, jvmArgDelims, &savePtr);
if (NULL == token) {
break;
}
}
free(hadoopJvmArgs);
}
// Now that we know the # args, populate the options array
JavaVMOption options[noArgs];
options[0].optionString = optHadoopClassPath;
hadoopJvmArgs = getenv("LIBHDFS_OPTS");
if (hadoopJvmArgs != NULL) {
hadoopJvmArgs = strdup(hadoopJvmArgs);
for (noArgs = 1, str = hadoopJvmArgs; ; noArgs++, str = NULL) {
token = strtok_r(str, jvmArgDelims, &savePtr);
if (NULL == token) {
break;
}
options[noArgs].optionString = token;
}
}
//Create the VM
JavaVMInitArgs vm_args;
JavaVM *vm;
vm_args.version = JNI_VERSION_1_2;
vm_args.options = options;
vm_args.nOptions = noArgs;
vm_args.ignoreUnrecognized = 1;
rv = JNI_CreateJavaVM(&vm, (void*)&env, &vm_args);
if (hadoopJvmArgs != NULL) {
free(hadoopJvmArgs);
}
free(optHadoopClassPath);
if (rv != 0) {
fprintf(stderr, "Call to JNI_CreateJavaVM failed "
"with error: %d\n", rv);
return NULL;
}
jthr = invokeMethod(env, NULL, STATIC, NULL,
"org/apache/hadoop/fs/FileSystem",
"loadFileSystems", "()V");
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "loadFileSystems");
}
}
else {
//Attach this thread to the VM
JavaVM* vm = vmBuf[0];
rv = (*vm)->AttachCurrentThread(vm, (void*)&env, 0);
if (rv != 0) {
fprintf(stderr, "Call to AttachCurrentThread "
"failed with error: %d\n", rv);
return NULL;
}
}
return env;
}
/**
* getJNIEnv: A helper function to get the JNIEnv* for the given thread.
* If no JVM exists, then one will be created. JVM command line arguments
* are obtained from the LIBHDFS_OPTS environment variable.
*
* Implementation note: we rely on POSIX thread-local storage (tls).
* This allows us to associate a destructor function with each thread, that
* will detach the thread from the Java VM when the thread terminates. If we
* failt to do this, it will cause a memory leak.
*
* However, POSIX TLS is not the most efficient way to do things. It requires a
* key to be initialized before it can be used. Since we don't know if this key
* is initialized at the start of this function, we have to lock a mutex first
* and check. Luckily, most operating systems support the more efficient
* __thread construct, which is initialized by the linker.
*
* @param: None.
* @return The JNIEnv* corresponding to the thread.
*/
JNIEnv* getJNIEnv(void)
{
JNIEnv *env;
struct hdfsTls *tls;
int ret;
#ifdef HAVE_BETTER_TLS
static __thread struct hdfsTls *quickTls = NULL;
if (quickTls)
return quickTls->env;
#endif
pthread_mutex_lock(&jvmMutex);
if (!gTlsKeyInitialized) {
ret = pthread_key_create(&gTlsKey, hdfsThreadDestructor);
if (ret) {
pthread_mutex_unlock(&jvmMutex);
fprintf(stderr, "getJNIEnv: pthread_key_create failed with "
"error %d\n", ret);
return NULL;
}
gTlsKeyInitialized = 1;
}
tls = pthread_getspecific(gTlsKey);
if (tls) {
pthread_mutex_unlock(&jvmMutex);
return tls->env;
}
env = getGlobalJNIEnv();
pthread_mutex_unlock(&jvmMutex);
if (!env) {
fprintf(stderr, "getJNIEnv: getGlobalJNIEnv failed\n");
return NULL;
}
tls = calloc(1, sizeof(struct hdfsTls));
if (!tls) {
fprintf(stderr, "getJNIEnv: OOM allocating %zd bytes\n",
sizeof(struct hdfsTls));
return NULL;
}
tls->env = env;
ret = pthread_setspecific(gTlsKey, tls);
if (ret) {
fprintf(stderr, "getJNIEnv: pthread_setspecific failed with "
"error code %d\n", ret);
hdfsThreadDestructor(tls);
return NULL;
}
#ifdef HAVE_BETTER_TLS
quickTls = tls;
#endif
return env;
}

View File

@ -1,122 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBHDFS_JNI_HELPER_H
#define LIBHDFS_JNI_HELPER_H
#include <jni.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <search.h>
#include <pthread.h>
#include <errno.h>
#define PATH_SEPARATOR ':'
/** Denote the method we want to invoke as STATIC or INSTANCE */
typedef enum {
STATIC,
INSTANCE
} MethType;
/**
* Create a new malloc'ed C string from a Java string.
*
* @param env The JNI environment
* @param jstr The Java string
* @param out (out param) the malloc'ed C string
*
* @return NULL on success; the exception otherwise
*/
jthrowable newCStr(JNIEnv *env, jstring jstr, char **out);
/**
* Create a new Java string from a C string.
*
* @param env The JNI environment
* @param str The C string
* @param out (out param) the java string
*
* @return NULL on success; the exception otherwise
*/
jthrowable newJavaStr(JNIEnv *env, const char *str, jstring *out);
/**
* Helper function to destroy a local reference of java.lang.Object
* @param env: The JNIEnv pointer.
* @param jFile: The local reference of java.lang.Object object
* @return None.
*/
void destroyLocalReference(JNIEnv *env, jobject jObject);
/** invokeMethod: Invoke a Static or Instance method.
* className: Name of the class where the method can be found
* methName: Name of the method
* methSignature: the signature of the method "(arg-types)ret-type"
* methType: The type of the method (STATIC or INSTANCE)
* instObj: Required if the methType is INSTANCE. The object to invoke
the method on.
* env: The JNIEnv pointer
* retval: The pointer to a union type which will contain the result of the
method invocation, e.g. if the method returns an Object, retval will be
set to that, if the method returns boolean, retval will be set to the
value (JNI_TRUE or JNI_FALSE), etc.
* exc: If the methods throws any exception, this will contain the reference
* Arguments (the method arguments) must be passed after methSignature
* RETURNS: -1 on error and 0 on success. If -1 is returned, exc will have
a valid exception reference, and the result stored at retval is undefined.
*/
jthrowable invokeMethod(JNIEnv *env, jvalue *retval, MethType methType,
jobject instObj, const char *className, const char *methName,
const char *methSignature, ...);
jthrowable constructNewObjectOfClass(JNIEnv *env, jobject *out, const char *className,
const char *ctorSignature, ...);
jthrowable methodIdFromClass(const char *className, const char *methName,
const char *methSignature, MethType methType,
JNIEnv *env, jmethodID *out);
jthrowable globalClassReference(const char *className, JNIEnv *env, jclass *out);
/** classNameOfObject: Get an object's class name.
* @param jobj: The object.
* @param env: The JNIEnv pointer.
* @param name: (out param) On success, will contain a string containing the
* class name. This string must be freed by the caller.
* @return NULL on success, or the exception
*/
jthrowable classNameOfObject(jobject jobj, JNIEnv *env, char **name);
/** getJNIEnv: A helper function to get the JNIEnv* for the given thread.
* If no JVM exists, then one will be created. JVM command line arguments
* are obtained from the LIBHDFS_OPTS environment variable.
* @param: None.
* @return The JNIEnv* corresponding to the thread.
* */
JNIEnv* getJNIEnv(void);
#endif /*LIBHDFS_JNI_HELPER_H*/
/**
* vim: ts=4: sw=4: et:
*/

View File

@ -17,7 +17,7 @@
*/
#include "expect.h"
#include "webhdfs.h"
#include "hdfs.h"
#include <errno.h>
#include <semaphore.h>

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
#include "webhdfs.h"
#include "hdfs.h"
#include <inttypes.h>
#include <jni.h>

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
#include "webhdfs.h"
#include "hdfs.h"
#include <stdio.h>
#include <stdlib.h>

View File

@ -17,7 +17,7 @@
*/
#include "expect.h"
#include "webhdfs.h"
#include "hdfs.h"
#include <errno.h>
#include <semaphore.h>

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
#include "webhdfs.h"
#include "hdfs.h"
#include <limits.h>
#include <stdio.h>

View File

@ -1,8 +1,9 @@
#include "hdfs.h"
#include <time.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include "webhdfs.h"
#ifdef __MACH__
#include <mach/clock.h>

View File

@ -1,694 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIB_WEBHDFS_H
#define LIB_WEBHDFS_H
#include <errno.h> /* for EINTERNAL, etc. */
#include <fcntl.h> /* for O_RDONLY, O_WRONLY */
#include <stdint.h> /* for uint64_t, etc. */
#include <time.h> /* for time_t */
#include <pthread.h>
#ifndef O_RDONLY
#define O_RDONLY 1
#endif
#ifndef O_WRONLY
#define O_WRONLY 2
#endif
#ifndef EINTERNAL
#define EINTERNAL 255
#endif
/** All APIs set errno to meaningful values */
#ifdef __cplusplus
extern "C" {
#endif
/**
* Some utility decls used in libhdfs.
*/
typedef int32_t tSize; /// size of data for read/write io ops
typedef time_t tTime; /// time type in seconds
typedef int64_t tOffset;/// offset within the file
typedef uint16_t tPort; /// port
/**
* The information required for accessing webhdfs,
* including the network address of the namenode and the user name
*/
struct hdfsBuilder {
int forceNewInstance;
const char *nn;
const char *nn_jni;
tPort port;
const char *kerbTicketCachePath;
const char *userName;
/*
* This is a new attribute compared to libhdfs.
* We maintain a local workingDir for constructing absolute path
*/
char *workingDir;
};
typedef enum tObjectKind {
kObjectKindFile = 'F',
kObjectKindDirectory = 'D',
} tObjectKind;
/**
* For libhdfs based on JNI, this is used as
* the C reflection of org.apache.org.hadoop.FileSystem .
* In the current libwebhdfs based on webhdfs,
* this is actually hdfsBuilder which contains
* the network address of the namenode and the user name
*/
struct hdfs_internal;
typedef struct hdfs_internal* hdfsFS;
/**
* The C equivalent of org.apache.org.hadoop.FSData(Input|Output)Stream .
*/
enum hdfsStreamType
{
UNINITIALIZED = 0,
INPUT = 1,
OUTPUT = 2,
};
/**
* The 'file-handle' to a file in hdfs.
*/
struct hdfsFile_internal {
void* file;
enum hdfsStreamType type;
int flags;
tOffset offset;
};
typedef struct hdfsFile_internal* hdfsFile;
/**
* hdfsFileInfo - Information about a file/directory.
*/
typedef struct {
tObjectKind mKind; /* file or directory */
char *mName; /* the name of the file */
tTime mLastMod; /* the last modification time for the file in seconds */
tOffset mSize; /* the size of the file in bytes */
short mReplication; /* the count of replicas */
tOffset mBlockSize; /* the block size for the file */
char *mOwner; /* the owner of the file */
char *mGroup; /* the group associated with the file */
short mPermissions; /* the permissions associated with the file */
tTime mLastAccess; /* the last access time for the file in seconds */
} hdfsFileInfo;
/**
* webhdfsBuffer - used for hold the data for read/write from/to http connection
*/
typedef struct {
const char *wbuffer; /* the user's buffer for uploading */
size_t remaining; /* length of content */
size_t offset; /* offset for reading */
int openFlag; /* check whether the hdfsOpenFile has been called before */
int closeFlag; /* whether to close the http connection for writing */
pthread_mutex_t writeMutex; // used for syschronization between the curl thread and the hdfsWrite thread
pthread_cond_t newwrite_or_close; // transferring thread waits for this condition
// when there is no more content for transferring in the buffer
pthread_cond_t transfer_finish; // condition used to indicate finishing transferring (one buffer)
} webhdfsBuffer;
struct webhdfsFileHandle {
char *absPath;
int bufferSize;
short replication;
tSize blockSize;
char *datanode;
webhdfsBuffer *uploadBuffer;
pthread_t connThread;
};
// Bit fields for hdfsFile_internal flags
#define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
/**
* Determine if a file is open for read.
*
* @param file The HDFS file
* @return 1 if the file is open for read; 0 otherwise
*/
int hdfsFileIsOpenForRead(hdfsFile file);
/**
* Determine if a file is open for write.
*
* @param file The HDFS file
* @return 1 if the file is open for write; 0 otherwise
*/
int hdfsFileIsOpenForWrite(hdfsFile file);
/**
* Disable the direct read optimization for a file in libhdfs.
* This is mainly provided for unit testing purposes.
* No longer useful in libwebhdfs since libwebhdfs is based on webhdfs.
*
* @param file The HDFS file
*/
void hdfsFileDisableDirectRead(hdfsFile file);
/**
* hdfsConnectAsUser - Connect to a hdfs file system as a specific user
* Connect to the hdfs.
* @param nn The NameNode. See hdfsBuilderSetNameNode for details.
* @param port The port on which the server is listening.
* @param user the user name (this is hadoop domain user). Or NULL is equivelant to hhdfsConnect(host, port)
* @return Returns a handle to the filesystem or NULL on error.
* @deprecated Use hdfsBuilderConnect instead.
*/
hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user);
/**
* hdfsConnect - Connect to a hdfs file system.
* Connect to the hdfs.
* @param nn The NameNode. See hdfsBuilderSetNameNode for details.
* @param port The port on which the server is listening.
* @return Returns a handle to the filesystem or NULL on error.
* @deprecated Use hdfsBuilderConnect instead.
*/
hdfsFS hdfsConnect(const char* nn, tPort port);
/**
* hdfsConnect - Connect to an hdfs file system.
*
* The effect with hdfsConnectAsUser in libwebhdfs.
*
* @param nn The NameNode. See hdfsBuilderSetNameNode for details.
* @param port The port on which the server is listening.
* @param user The user name to use when connecting
* @return Returns a handle to the filesystem or NULL on error.
* @deprecated Use hdfsBuilderConnect instead.
*/
hdfsFS hdfsConnectAsUserNewInstance(const char* nn, tPort port, const char *user );
/**
* hdfsConnect - Connect to an hdfs file system.
*
* The same effect with hdfsConnect in libwebhdfs.
*
* @param nn The NameNode. See hdfsBuilderSetNameNode for details.
* @param port The port on which the server is listening.
* @return Returns a handle to the filesystem or NULL on error.
* @deprecated Use hdfsBuilderConnect instead.
*/
hdfsFS hdfsConnectNewInstance(const char* nn, tPort port);
/**
* Connect to HDFS using the parameters defined by the builder.
*
* Every successful call to hdfsBuilderConnect should be matched with a call
* to hdfsDisconnect, when the hdfsFS is no longer needed.
*
* @param bld The HDFS builder
* @return Returns a handle to the filesystem, or NULL on error.
*/
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld);
/**
* Create an HDFS builder.
*
* @return The HDFS builder, or NULL on error.
*/
struct hdfsBuilder *hdfsNewBuilder(void);
/**
* In libhdfs: force the builder to always create a new instance of the FileSystem,
* rather than possibly finding one in the cache.
*
* @param bld The HDFS builder
* @deprecated No longer usefule in libwebhdfs.
*/
void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld);
/**
* Set the HDFS NameNode to connect to.
*
* @param bld The HDFS builder
* @param nn The NameNode to use.
*
* If the string given is 'default', the default NameNode
* configuration will be used (from the XML configuration files)
*
* If NULL is given, a LocalFileSystem will be created.
*
* If the string starts with a protocol type such as file:// or
* hdfs://, this protocol type will be used. If not, the
* hdfs:// protocol type will be used.
*
* You may specify a NameNode port in the usual way by
* passing a string of the format hdfs://<hostname>:<port>.
* Alternately, you may set the port with
* hdfsBuilderSetNameNodePort. However, you must not pass the
* port in two different ways.
*/
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn);
/**
* Set the port of the HDFS NameNode to connect to.
*
* @param bld The HDFS builder
* @param port The port.
*/
void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port);
/**
* Set the username to use when connecting to the HDFS cluster.
*
* @param bld The HDFS builder
* @param userName The user name. The string will be shallow-copied.
*/
void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName);
/**
* Set the path to the Kerberos ticket cache to use when connecting to
* the HDFS cluster.
*
* @param bld The HDFS builder
* @param kerbTicketCachePath The Kerberos ticket cache path. The string
* will be shallow-copied.
*/
void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
const char *kerbTicketCachePath);
/**
* Free an HDFS builder.
*
* @param bld The HDFS builder
*/
void hdfsFreeBuilder(struct hdfsBuilder *bld);
/**
* Get a configuration string.
*
* @param key The key to find
* @param val (out param) The value. This will be set to NULL if the
* key isn't found. You must free this string with
* hdfsConfStrFree.
*
* @return 0 on success; nonzero error code otherwise.
* Failure to find the key is not an error.
*/
int hdfsConfGetStr(const char *key, char **val);
/**
* Get a configuration integer.
*
* @param key The key to find
* @param val (out param) The value. This will NOT be changed if the
* key isn't found.
*
* @return 0 on success; nonzero error code otherwise.
* Failure to find the key is not an error.
*/
int hdfsConfGetInt(const char *key, int32_t *val);
/**
* Free a configuration string found with hdfsConfGetStr.
*
* @param val A configuration string obtained from hdfsConfGetStr
*/
void hdfsConfStrFree(char *val);
/**
* hdfsDisconnect - Disconnect from the hdfs file system.
* Disconnect from hdfs.
*
* In libwebhdfs, we simply free the hdfsFS,
* so do not use it after hdfsCopy/hdfsMove/hdfsGetDefaultBlockSize which still use JNI for FileSystem connection.
*
* @param fs The configured filesystem handle.
* @return Returns 0 on success, -1 on error.
*/
int hdfsDisconnect(hdfsFS fs);
/**
* hdfsOpenFile - Open a hdfs file in given mode.
* In libwebhdfs we simply store corresponding information in a hdfsFile.
*
* @param fs The configured filesystem handle.
* @param path The full path to the file.
* @param flags - an | of bits/fcntl.h file flags - supported flags are O_RDONLY, O_WRONLY (meaning create or overwrite i.e., implies O_TRUNCAT),
* O_WRONLY|O_APPEND. Other flags are generally ignored other than (O_RDWR || (O_EXCL & O_CREAT)) which return NULL and set errno equal ENOTSUP.
* @param bufferSize Size of buffer for read/write - pass 0 if you want
* to use the default configured values.
* @param replication Block replication - pass 0 if you want to use
* the default configured values.
* @param blocksize Size of block - pass 0 if you want to use the
* default configured values.
* @return Returns the handle to the open file or NULL on error.
*/
hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
int bufferSize, short replication, tSize blocksize);
/**
* hdfsCloseFile - Close an open file.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Returns 0 on success, -1 on error.
*/
int hdfsCloseFile(hdfsFS fs, hdfsFile file);
/**
* hdfsExists - Checks if a given path exsits on the filesystem
* @param fs The configured filesystem handle.
* @param path The path to look for
* @return Returns 0 on success, -1 on error.
*/
int hdfsExists(hdfsFS fs, const char *path);
/**
* hdfsSeek - Seek to given offset in file.
* This works only for files opened in read-only mode.
* In libwebhdfs we store the offset in the local hdfsFile handle, thus
* in this function we simply set the local offset.
*
* @param fs The configured filesystem handle.
* @param file The file handle.
* @param desiredPos Offset into the file to seek into.
* @return Returns 0 on success, -1 on error.
*/
int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos);
/**
* hdfsTell - Get the current offset in the file, in bytes.
* In libwebhdfs the current offset is stored in the local hdfsFile handle,
* thus this function simply sets the local offset.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Current offset, -1 on error.
*/
tOffset hdfsTell(hdfsFS fs, hdfsFile file);
/**
* hdfsRead - Read data from an open file.
* In libwebhdfs the reading starts from the current offset which is stored in the hdfsFile handle
* @param fs The configured filesystem handle.
* @param file The file handle.
* @param buffer The buffer to copy read bytes into.
* @param length The length of the buffer.
* @return On success, a positive number indicating how many bytes
* were read.
* On end-of-file, 0.
* On error, -1. Errno will be set to the error code.
* Just like the POSIX read function, hdfsRead will return -1
* and set errno to EINTR if data is temporarily unavailable,
* but we are not yet at the end of the file.
*/
tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length);
/**
* hdfsPread - Positional read of data from an open file.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @param position Position from which to read
* @param buffer The buffer to copy read bytes into.
* @param length The length of the buffer.
* @return Returns the number of bytes actually read, possibly less than
* than length;-1 on error.
*/
tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position,
void* buffer, tSize length);
/**
* hdfsWrite - Write data into an open file.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @param buffer The data.
* @param length The no. of bytes to write.
* @return Returns the number of bytes written, -1 on error.
*/
tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer,
tSize length);
/**
* hdfsWrite - Flush the data. No use for libwebhdfs.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Returns 0 on success, -1 on error.
* @deprecated Not usefule in libwebhdfs.
*/
int hdfsFlush(hdfsFS fs, hdfsFile file);
/**
* hdfsHFlush - Flush out the data in client's user buffer. After the
* return of this call, new readers will see the data.
* @param fs configured filesystem handle
* @param file file handle
* @return 0 on success, -1 on error and sets errno
* @deprecated Not usefule in libwebhdfs.
*/
int hdfsHFlush(hdfsFS fs, hdfsFile file);
/**
* hdfsAvailable - Number of bytes that can be read from this
* input stream.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Returns available bytes; -1 on error.
*/
int hdfsAvailable(hdfsFS fs, hdfsFile file);
/**
* hdfsCopy - Copy file from one filesystem to another.
* @param srcFS The handle to source filesystem.
* @param src The path of source file.
* @param dstFS The handle to destination filesystem.
* @param dst The path of destination file.
* @return Returns 0 on success, -1 on error.
*/
int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
/**
* hdfsMove - Move file from one filesystem to another.
* @param srcFS The handle to source filesystem.
* @param src The path of source file.
* @param dstFS The handle to destination filesystem.
* @param dst The path of destination file.
* @return Returns 0 on success, -1 on error.
*/
int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
/**
* hdfsDelete - Delete file.
* @param fs The configured filesystem handle.
* @param path The path of the file.
* @param recursive if path is a directory and set to
* non-zero, the directory is deleted else throws an exception. In
* case of a file the recursive argument is irrelevant.
* @return Returns 0 on success, -1 on error.
*/
int hdfsDelete(hdfsFS fs, const char* path, int recursive);
/**
* hdfsRename - Rename file.
* @param fs The configured filesystem handle.
* @param oldPath The path of the source file.
* @param newPath The path of the destination file.
* @return Returns 0 on success, -1 on error.
*/
int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath);
/**
* hdfsGetWorkingDirectory - Get the current working directory for
* the given filesystem. In libwebhdfs it is retrieved from local hdfsFS handle.
* @param fs The configured filesystem handle.
* @param buffer The user-buffer to copy path of cwd into.
* @param bufferSize The length of user-buffer.
* @return Returns buffer, NULL on error.
*/
char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize);
/**
* hdfsSetWorkingDirectory - Set the working directory. All relative
* paths will be resolved relative to it. In libwebhdfs the local hdfsFS is modified.
* @param fs The configured filesystem handle.
* @param path The path of the new 'cwd'.
* @return Returns 0 on success, -1 on error.
*/
int hdfsSetWorkingDirectory(hdfsFS fs, const char* path);
/**
* hdfsCreateDirectory - Make the given file and all non-existent
* parents into directories.
* @param fs The configured filesystem handle.
* @param path The path of the directory.
* @return Returns 0 on success, -1 on error.
*/
int hdfsCreateDirectory(hdfsFS fs, const char* path);
/**
* hdfsSetReplication - Set the replication of the specified
* file to the supplied value
* @param fs The configured filesystem handle.
* @param path The path of the file.
* @return Returns 0 on success, -1 on error.
*/
int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication);
/**
* hdfsListDirectory - Get list of files/directories for a given
* directory-path. hdfsFreeFileInfo should be called to deallocate memory.
* @param fs The configured filesystem handle.
* @param path The path of the directory.
* @param numEntries Set to the number of files/directories in path.
* @return Returns a dynamically-allocated array of hdfsFileInfo
* objects; NULL on error.
*/
hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path,
int *numEntries);
/**
* hdfsGetPathInfo - Get information about a path as a (dynamically
* allocated) single hdfsFileInfo struct. hdfsFreeFileInfo should be
* called when the pointer is no longer needed.
* @param fs The configured filesystem handle.
* @param path The path of the file.
* @return Returns a dynamically-allocated hdfsFileInfo object;
* NULL on error.
*/
hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path);
/**
* hdfsFreeFileInfo - Free up the hdfsFileInfo array (including fields)
* @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo
* objects.
* @param numEntries The size of the array.
*/
void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries);
/**
* hdfsGetHosts - Get hostnames where a particular block (determined by
* pos & blocksize) of a file is stored. The last element in the array
* is NULL. Due to replication, a single block could be present on
* multiple hosts.
* @param fs The configured filesystem handle.
* @param path The path of the file.
* @param start The start of the block.
* @param length The length of the block.
* @return Returns a dynamically-allocated 2-d array of blocks-hosts;
* NULL on error.
*
* Not supported yet but will be supported by libwebhdfs based on webhdfs.
*/
char*** hdfsGetHosts(hdfsFS fs, const char* path,
tOffset start, tOffset length);
/**
* hdfsFreeHosts - Free up the structure returned by hdfsGetHosts
* @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo
* objects.
* @param numEntries The size of the array.
*/
void hdfsFreeHosts(char ***blockHosts);
/**
* hdfsGetDefaultBlockSize - Get the optimum blocksize.
* @param fs The configured filesystem handle.
* @return Returns the blocksize; -1 on error.
*/
tOffset hdfsGetDefaultBlockSize(hdfsFS fs);
/**
* hdfsGetCapacity - Return the raw capacity of the filesystem.
* @param fs The configured filesystem handle.
* @return Returns the raw-capacity; -1 on error.
*
* Not supported yet but will be supported by libwebhdfs based on webhdfs.
*/
tOffset hdfsGetCapacity(hdfsFS fs);
/**
* hdfsGetUsed - Return the total raw size of all files in the filesystem.
* @param fs The configured filesystem handle.
* @return Returns the total-size; -1 on error.
*
* Not supported yet but will be supported by libwebhdfs based on webhdfs.
*/
tOffset hdfsGetUsed(hdfsFS fs);
/**
* hdfsChown
* @param fs The configured filesystem handle.
* @param path the path to the file or directory
* @param owner this is a string in Hadoop land. Set to null or "" if only setting group
* @param group this is a string in Hadoop land. Set to null or "" if only setting user
* @return 0 on success else -1
*/
int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group);
/**
* hdfsChmod
* @param fs The configured filesystem handle.
* @param path the path to the file or directory
* @param mode the bitmask to set it to
* @return 0 on success else -1
*/
int hdfsChmod(hdfsFS fs, const char* path, short mode);
/**
* hdfsUtime
* @param fs The configured filesystem handle.
* @param path the path to the file or directory
* @param mtime new modification time or -1 for no change
* @param atime new access time or -1 for no change
* @return 0 on success else -1
*/
int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime);
#ifdef __cplusplus
}
#endif
#endif /*LIB_WEBHDFS_H*/

View File

@ -67,6 +67,25 @@ static const struct ExceptionInfo gExceptionInfo[] = {
};
void getExceptionInfo(const char *excName, int noPrintFlags,
int *excErrno, int *shouldPrint)
{
int i;
for (i = 0; i < EXCEPTION_INFO_LEN; i++) {
if (strstr(gExceptionInfo[i].name, excName)) {
break;
}
}
if (i < EXCEPTION_INFO_LEN) {
*shouldPrint = !(gExceptionInfo[i].noPrintFlag & noPrintFlags);
*excErrno = gExceptionInfo[i].excErrno;
} else {
*shouldPrint = 1;
*excErrno = EINTERNAL;
}
}
int printExceptionAndFreeV(JNIEnv *env, jthrowable exc, int noPrintFlags,
const char *fmt, va_list ap)
{

View File

@ -64,6 +64,21 @@
#define NOPRINT_EXC_PARENT_NOT_DIRECTORY 0x08
#define NOPRINT_EXC_ILLEGAL_ARGUMENT 0x10
/**
* Get information about an exception.
*
* @param excName The Exception name.
* This is a Java class name in JNI format.
* @param noPrintFlags Flags which determine which exceptions we should NOT
* print.
* @param excErrno (out param) The POSIX error number associated with the
* exception.
* @param shouldPrint (out param) Nonzero if we should print this exception,
* based on the noPrintFlags and its name.
*/
void getExceptionInfo(const char *excName, int noPrintFlags,
int *excErrno, int *shouldPrint);
/**
* Print out information about an exception and free it.
*