HDFS-799. libhdfs must call DetachCurrentThread when a thread is destroyed. Contributed by Colin Patrick McCabe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1361009 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Eli Collins 2012-07-13 00:10:26 +00:00
parent c1c388bbb6
commit 1a76841335
4 changed files with 133 additions and 22 deletions

View File

@ -131,6 +131,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3633. libhdfs: hdfsDelete should pass JNI_FALSE or JNI_TRUE.
(Colin Patrick McCabe via eli)
HDFS-799. libhdfs must call DetachCurrentThread when a thread is destroyed.
(Colin Patrick McCabe via eli)
OPTIMIZATIONS
HDFS-2982. Startup performance suffers when there are many edit log

View File

@ -67,6 +67,12 @@ function(FLATTEN_LIST INPUT SEPARATOR OUTPUT)
set (${OUTPUT} "${_TMPS}" PARENT_SCOPE)
endfunction()
# Check to see if our compiler and linker support the __thread attribute.
# On Linux and some other operating systems, this is a more efficient
# alternative to POSIX thread local storage.
INCLUDE(CheckCSourceCompiles)
CHECK_C_SOURCE_COMPILES("int main(void) { static __thread int i = 0; return 0; }" HAVE_BETTER_TLS)
find_package(JNI REQUIRED)
if (NOT GENERATED_JAVAH)
# Must identify where the generated headers have been placed

View File

@ -3,4 +3,6 @@
#cmakedefine _FUSE_DFS_VERSION "@_FUSE_DFS_VERSION@"
#cmakedefine HAVE_BETTER_TLS
#endif

View File

@ -15,17 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <string.h>
#include "config.h"
#include "hdfsJniHelper.h"
#include <stdio.h>
#include <string.h>
static pthread_mutex_t hdfsHashMutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t jvmMutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int hashTableInited = 0;
#define LOCK_HASH_TABLE() pthread_mutex_lock(&hdfsHashMutex)
#define UNLOCK_HASH_TABLE() pthread_mutex_unlock(&hdfsHashMutex)
#define LOCK_JVM_MUTEX() pthread_mutex_lock(&jvmMutex)
#define UNLOCK_JVM_MUTEX() pthread_mutex_unlock(&jvmMutex)
/** The Native return types that methods could return */
@ -48,6 +50,43 @@ static volatile int hashTableInited = 0;
*/
#define MAX_HASH_TABLE_ELEM 4096
/** Key that allows us to retrieve thread-local storage */
static pthread_key_t gTlsKey;
/** nonzero if we succeeded in initializing gTlsKey. Protected by the jvmMutex */
static int gTlsKeyInitialized = 0;
/** Pthreads thread-local storage for each library thread. */
struct hdfsTls {
JNIEnv *env;
};
/**
* The function that is called whenever a thread with libhdfs thread local data
* is destroyed.
*
* @param v The thread-local data
*/
static void hdfsThreadDestructor(void *v)
{
struct hdfsTls *tls = v;
JavaVM *vm;
JNIEnv *env = tls->env;
jint ret;
if (!tls)
return;
ret = (*env)->GetJavaVM(env, &vm);
if (ret) {
fprintf(stderr, "hdfsThreadDestructor: GetJavaVM failed with "
"error %d\n", ret);
(*env)->ExceptionDescribe(env);
} else {
(*vm)->DetachCurrentThread(vm);
}
free(tls);
}
static int validateMethodType(MethType methType)
{
@ -375,34 +414,26 @@ char *classNameOfObject(jobject jobj, JNIEnv *env) {
return newstr;
}
/**
* getJNIEnv: A helper function to get the JNIEnv* for the given thread.
* If no JVM exists, then one will be created. JVM command line arguments
* are obtained from the LIBHDFS_OPTS environment variable.
* Get the global JNI environemnt.
*
* @param: None.
* @return The JNIEnv* corresponding to the thread.
* We only have to create the JVM once. After that, we can use it in
* every thread. You must be holding the jvmMutex when you call this
* function.
*
* @return The JNIEnv on success; error code otherwise
*/
JNIEnv* getJNIEnv(void)
static JNIEnv* getGlobalJNIEnv(void)
{
const jsize vmBufLength = 1;
JavaVM* vmBuf[vmBufLength];
JNIEnv *env;
jint rv = 0;
jint noVMs = 0;
// Only the first thread should create the JVM. The other trheads should
// just use the JVM created by the first thread.
LOCK_JVM_MUTEX();
rv = JNI_GetCreatedJavaVMs(&(vmBuf[0]), vmBufLength, &noVMs);
if (rv != 0) {
fprintf(stderr, "JNI_GetCreatedJavaVMs failed with error: %d\n", rv);
UNLOCK_JVM_MUTEX();
return NULL;
}
@ -411,7 +442,6 @@ JNIEnv* getJNIEnv(void)
char *hadoopClassPath = getenv("CLASSPATH");
if (hadoopClassPath == NULL) {
fprintf(stderr, "Environment variable CLASSPATH not set!\n");
UNLOCK_JVM_MUTEX();
return NULL;
}
char *hadoopClassPathVMArg = "-Djava.class.path=";
@ -470,7 +500,6 @@ JNIEnv* getJNIEnv(void)
if (rv != 0) {
fprintf(stderr, "Call to JNI_CreateJavaVM failed "
"with error: %d\n", rv);
UNLOCK_JVM_MUTEX();
return NULL;
}
}
@ -481,11 +510,82 @@ JNIEnv* getJNIEnv(void)
if (rv != 0) {
fprintf(stderr, "Call to AttachCurrentThread "
"failed with error: %d\n", rv);
UNLOCK_JVM_MUTEX();
return NULL;
}
}
UNLOCK_JVM_MUTEX();
return env;
}
/**
* getJNIEnv: A helper function to get the JNIEnv* for the given thread.
* If no JVM exists, then one will be created. JVM command line arguments
* are obtained from the LIBHDFS_OPTS environment variable.
*
* Implementation note: we rely on POSIX thread-local storage (tls).
* This allows us to associate a destructor function with each thread, that
* will detach the thread from the Java VM when the thread terminates. If we
* failt to do this, it will cause a memory leak.
*
* However, POSIX TLS is not the most efficient way to do things. It requires a
* key to be initialized before it can be used. Since we don't know if this key
* is initialized at the start of this function, we have to lock a mutex first
* and check. Luckily, most operating systems support the more efficient
* __thread construct, which is initialized by the linker.
*
* @param: None.
* @return The JNIEnv* corresponding to the thread.
*/
JNIEnv* getJNIEnv(void)
{
JNIEnv *env;
struct hdfsTls *tls;
int ret;
#ifdef HAVE_BETTER_TLS
static __thread struct hdfsTls *quickTls = NULL;
if (quickTls)
return quickTls->env;
#endif
pthread_mutex_lock(&jvmMutex);
if (!gTlsKeyInitialized) {
ret = pthread_key_create(&gTlsKey, hdfsThreadDestructor);
if (ret) {
pthread_mutex_unlock(&jvmMutex);
fprintf("pthread_key_create failed with error %d\n", ret);
return NULL;
}
gTlsKeyInitialized = 1;
}
tls = pthread_getspecific(gTlsKey);
if (tls) {
pthread_mutex_unlock(&jvmMutex);
return tls->env;
}
env = getGlobalJNIEnv();
pthread_mutex_unlock(&jvmMutex);
if (!env) {
fprintf(stderr, "getJNIEnv: getGlobalJNIEnv failed\n");
return NULL;
}
tls = calloc(1, sizeof(struct hdfsTls));
if (!tls) {
fprintf(stderr, "getJNIEnv: OOM allocating %d bytes\n",
sizeof(struct hdfsTls));
return NULL;
}
tls->env = env;
ret = pthread_setspecific(gTlsKey, tls);
if (ret) {
fprintf(stderr, "getJNIEnv: pthread_setspecific failed with "
"error code %d\n", ret);
hdfsThreadDestructor(tls);
return NULL;
}
#ifdef HAVE_BETTER_TLS
quickTls = tls;
#endif
return env;
}