diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml index 6e8033f1692..b578667bd60 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml @@ -32,7 +32,6 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> false - false @@ -140,7 +139,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> - + @@ -199,7 +198,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> - + diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/CMakeLists.txt index 0a6f383401f..d7bfb766724 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/CMakeLists.txt @@ -91,11 +91,6 @@ endfunction() add_subdirectory(main/native/libhdfs) add_subdirectory(main/native/libhdfs-tests) - -if(REQUIRE_LIBWEBHDFS) - add_subdirectory(contrib/libwebhdfs) -endif() - # Find Linux FUSE if(${CMAKE_SYSTEM_NAME} MATCHES "Linux") find_package(PkgConfig REQUIRED) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/CMakeLists.txt deleted file mode 100644 index cc2b42d44d4..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/CMakeLists.txt +++ /dev/null @@ -1,88 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -find_package(CURL REQUIRED) - -set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} - "${CMAKE_SOURCE_DIR}/contrib/libwebhdfs/resources/") - -find_package(Jansson REQUIRED) -include_directories( - ${JNI_INCLUDE_DIRS} - ${CMAKE_BINARY_DIR} - ${CMAKE_SOURCE_DIR}/main/native - ${CMAKE_SOURCE_DIR}/main/native/libhdfs - ${CMAKE_SOURCE_DIR}/main/native/libhdfs/include - ${OS_DIR} - ${JANSSON_INCLUDE_DIR} -) - -add_definitions(-DLIBHDFS_DLL_EXPORT) - -hadoop_add_dual_library(webhdfs - src/hdfs_web.c - src/hdfs_http_client.c - src/hdfs_http_query.c - src/hdfs_json_parser.c - ../../main/native/libhdfs/exception.c - ../../main/native/libhdfs/jni_helper.c - ../../main/native/libhdfs/common/htable.c - ${OS_DIR}/mutexes.c - ${OS_DIR}/thread_local_storage.c -) -hadoop_target_link_dual_libraries(webhdfs - ${JAVA_JVM_LIBRARY} - ${CURL_LIBRARY} - ${JANSSON_LIBRARY} - pthread -) -hadoop_dual_output_directory(webhdfs target) -set(LIBWEBHDFS_VERSION "0.0.0") -set_target_properties(webhdfs PROPERTIES - SOVERSION ${LIBWEBHDFS_VERSION}) - -add_executable(test_libwebhdfs_ops - src/test_libwebhdfs_ops.c -) -target_link_libraries(test_libwebhdfs_ops - webhdfs - native_mini_dfs -) - -add_executable(test_libwebhdfs_read - src/test_libwebhdfs_read.c -) -target_link_libraries(test_libwebhdfs_read - webhdfs -) - -add_executable(test_libwebhdfs_write - src/test_libwebhdfs_write.c -) -target_link_libraries(test_libwebhdfs_write - webhdfs -) - -add_executable(test_libwebhdfs_threaded - src/test_libwebhdfs_threaded.c -) -target_link_libraries(test_libwebhdfs_threaded - webhdfs - native_mini_dfs - pthread -) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/resources/FindJansson.cmake b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/resources/FindJansson.cmake deleted file mode 100644 index b8c67eae32a..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/resources/FindJansson.cmake +++ /dev/null @@ -1,43 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -# - Try to find Jansson -# Once done this will define -# JANSSON_FOUND - System has Jansson -# JANSSON_INCLUDE_DIRS - The Jansson include directories -# JANSSON_LIBRARIES - The libraries needed to use Jansson -# JANSSON_DEFINITIONS - Compiler switches required for using Jansson - -find_path(JANSSON_INCLUDE_DIR jansson.h - /usr/include - /usr/include/jansson - /usr/local/include ) - -find_library(JANSSON_LIBRARY NAMES jansson - PATHS /usr/lib /usr/local/lib ) - -set(JANSSON_LIBRARIES ${JANSSON_LIBRARY} ) -set(JANSSON_INCLUDE_DIRS ${JANSSON_INCLUDE_DIR} ) - -include(FindPackageHandleStandardArgs) -# handle the QUIETLY and REQUIRED arguments and set JANSSON_FOUND to TRUE -# if all listed variables are TRUE -find_package_handle_standard_args(Jansson DEFAULT_MSG - JANSSON_LIBRARY JANSSON_INCLUDE_DIR) - -mark_as_advanced(JANSSON_INCLUDE_DIR JANSSON_LIBRARY ) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_http_client.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_http_client.c deleted file mode 100644 index dc5ca4102de..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_http_client.c +++ /dev/null @@ -1,490 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include - -#include "hdfs_http_client.h" -#include "exception.h" - -static pthread_mutex_t curlInitMutex = PTHREAD_MUTEX_INITIALIZER; -static volatile int curlGlobalInited = 0; - -const char *hdfs_strerror(int errnoval) -{ -#if defined(__sun) -// MT-Safe under Solaris which doesn't support sys_errlist/sys_nerr - return strerror(errnoval); -#else - if ((errnoval < 0) || (errnoval >= sys_nerr)) { - return "unknown error."; - } - return sys_errlist[errnoval]; -#endif -} - -int initResponseBuffer(struct ResponseBuffer **buffer) -{ - struct ResponseBuffer *info = NULL; - int ret = 0; - info = calloc(1, sizeof(struct ResponseBuffer)); - if (!info) { - ret = ENOMEM; - } - *buffer = info; - return ret; -} - -void freeResponseBuffer(struct ResponseBuffer *buffer) -{ - if (buffer) { - if (buffer->content) { - free(buffer->content); - } - free(buffer); - buffer = NULL; - } -} - -void freeResponse(struct Response *resp) -{ - if (resp) { - freeResponseBuffer(resp->body); - freeResponseBuffer(resp->header); - free(resp); - resp = NULL; - } -} - -/** - * Callback used by libcurl for allocating local buffer and - * reading data to local buffer - */ -static size_t writefunc(void *ptr, size_t size, - size_t nmemb, struct ResponseBuffer *rbuffer) -{ - void *temp = NULL; - if (size * nmemb < 1) { - return 0; - } - if (!rbuffer) { - fprintf(stderr, - "ERROR: ResponseBuffer is NULL for the callback writefunc.\n"); - return 0; - } - - if (rbuffer->remaining < size * nmemb) { - temp = realloc(rbuffer->content, rbuffer->offset + size * nmemb + 1); - if (temp == NULL) { - fprintf(stderr, "ERROR: fail to realloc in callback writefunc.\n"); - return 0; - } - rbuffer->content = temp; - rbuffer->remaining = size * nmemb; - } - memcpy(rbuffer->content + rbuffer->offset, ptr, size * nmemb); - rbuffer->offset += size * nmemb; - (rbuffer->content)[rbuffer->offset] = '\0'; - rbuffer->remaining -= size * nmemb; - return size * nmemb; -} - -/** - * Callback used by libcurl for reading data into buffer provided by user, - * thus no need to reallocate buffer. - */ -static size_t writeFuncWithUserBuffer(void *ptr, size_t size, - size_t nmemb, struct ResponseBuffer *rbuffer) -{ - size_t toCopy = 0; - if (size * nmemb < 1) { - return 0; - } - if (!rbuffer || !rbuffer->content) { - fprintf(stderr, - "ERROR: buffer to read is NULL for the " - "callback writeFuncWithUserBuffer.\n"); - return 0; - } - - toCopy = rbuffer->remaining < (size * nmemb) ? - rbuffer->remaining : (size * nmemb); - memcpy(rbuffer->content + rbuffer->offset, ptr, toCopy); - rbuffer->offset += toCopy; - rbuffer->remaining -= toCopy; - return toCopy; -} - -/** - * Callback used by libcurl for writing data to remote peer - */ -static size_t readfunc(void *ptr, size_t size, size_t nmemb, void *stream) -{ - struct webhdfsBuffer *wbuffer = NULL; - if (size * nmemb < 1) { - return 0; - } - - wbuffer = stream; - pthread_mutex_lock(&wbuffer->writeMutex); - while (wbuffer->remaining == 0) { - /* - * The current remainning bytes to write is 0, - * check closeFlag to see whether need to finish the transfer. - * if yes, return 0; else, wait - */ - if (wbuffer->closeFlag) { // We can close the transfer now - //For debug - fprintf(stderr, "CloseFlag is set, ready to close the transfer\n"); - pthread_mutex_unlock(&wbuffer->writeMutex); - return 0; - } else { - // remaining == 0 but closeFlag is not set - // indicates that user's buffer has been transferred - pthread_cond_signal(&wbuffer->transfer_finish); - pthread_cond_wait(&wbuffer->newwrite_or_close, - &wbuffer->writeMutex); - } - } - - if (wbuffer->remaining > 0 && !wbuffer->closeFlag) { - size_t copySize = wbuffer->remaining < size * nmemb ? - wbuffer->remaining : size * nmemb; - memcpy(ptr, wbuffer->wbuffer + wbuffer->offset, copySize); - wbuffer->offset += copySize; - wbuffer->remaining -= copySize; - pthread_mutex_unlock(&wbuffer->writeMutex); - return copySize; - } else { - fprintf(stderr, "ERROR: webhdfsBuffer's remaining is %ld, " - "it should be a positive value!\n", wbuffer->remaining); - pthread_mutex_unlock(&wbuffer->writeMutex); - return 0; - } -} - -/** - * Initialize the global libcurl environment - */ -static void initCurlGlobal() -{ - if (!curlGlobalInited) { - pthread_mutex_lock(&curlInitMutex); - if (!curlGlobalInited) { - curl_global_init(CURL_GLOBAL_ALL); - curlGlobalInited = 1; - } - pthread_mutex_unlock(&curlInitMutex); - } -} - -/** - * Launch simple commands (commands without file I/O) and return response - * - * @param url Target URL - * @param method HTTP method (GET/PUT/POST) - * @param followloc Whether or not need to set CURLOPT_FOLLOWLOCATION - * @param response Response from remote service - * @return 0 for success and non-zero value to indicate error - */ -static int launchCmd(const char *url, enum HttpHeader method, - enum Redirect followloc, struct Response **response) -{ - CURL *curl = NULL; - CURLcode curlCode; - int ret = 0; - struct Response *resp = NULL; - - resp = calloc(1, sizeof(struct Response)); - if (!resp) { - return ENOMEM; - } - ret = initResponseBuffer(&(resp->body)); - if (ret) { - goto done; - } - ret = initResponseBuffer(&(resp->header)); - if (ret) { - goto done; - } - initCurlGlobal(); - curl = curl_easy_init(); - if (!curl) { - ret = ENOMEM; // curl_easy_init does not return error code, - // and most of its errors are caused by malloc() - fprintf(stderr, "ERROR in curl_easy_init.\n"); - goto done; - } - /* Set callback function for reading data from remote service */ - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body); - curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc); - curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header); - curl_easy_setopt(curl, CURLOPT_URL, url); - switch(method) { - case GET: - break; - case PUT: - curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT"); - break; - case POST: - curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST"); - break; - case DELETE: - curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE"); - break; - default: - ret = EINVAL; - fprintf(stderr, "ERROR: Invalid HTTP method\n"); - goto done; - } - if (followloc == YES) { - curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1); - } - /* Now run the curl handler */ - curlCode = curl_easy_perform(curl); - if (curlCode != CURLE_OK) { - ret = EIO; - fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n", - url, curlCode, curl_easy_strerror(curlCode)); - } -done: - if (curl != NULL) { - curl_easy_cleanup(curl); - } - if (ret) { - free(resp); - resp = NULL; - } - *response = resp; - return ret; -} - -/** - * Launch the read request. The request is sent to the NameNode and then - * redirected to corresponding DataNode - * - * @param url The URL for the read request - * @param resp The response containing the buffer provided by user - * @return 0 for success and non-zero value to indicate error - */ -static int launchReadInternal(const char *url, struct Response* resp) -{ - CURL *curl; - CURLcode curlCode; - int ret = 0; - - if (!resp || !resp->body || !resp->body->content) { - fprintf(stderr, - "ERROR: invalid user-provided buffer!\n"); - return EINVAL; - } - - initCurlGlobal(); - /* get a curl handle */ - curl = curl_easy_init(); - if (!curl) { - fprintf(stderr, "ERROR in curl_easy_init.\n"); - return ENOMEM; - } - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeFuncWithUserBuffer); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body); - curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc); - curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header); - curl_easy_setopt(curl, CURLOPT_URL, url); - curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1); - - curlCode = curl_easy_perform(curl); - if (curlCode != CURLE_OK && curlCode != CURLE_PARTIAL_FILE) { - ret = EIO; - fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n", - url, curlCode, curl_easy_strerror(curlCode)); - } - - curl_easy_cleanup(curl); - return ret; -} - -/** - * The function does the write operation by connecting to a DataNode. - * The function keeps the connection with the DataNode until - * the closeFlag is set. Whenever the current data has been sent out, - * the function blocks waiting for further input from user or close. - * - * @param url URL of the remote DataNode - * @param method PUT for create and POST for append - * @param uploadBuffer Buffer storing user's data to write - * @param response Response from remote service - * @return 0 for success and non-zero value to indicate error - */ -static int launchWrite(const char *url, enum HttpHeader method, - struct webhdfsBuffer *uploadBuffer, - struct Response **response) -{ - CURLcode curlCode; - struct Response* resp = NULL; - struct curl_slist *chunk = NULL; - CURL *curl = NULL; - int ret = 0; - - if (!uploadBuffer) { - fprintf(stderr, "ERROR: upload buffer is NULL!\n"); - return EINVAL; - } - - initCurlGlobal(); - resp = calloc(1, sizeof(struct Response)); - if (!resp) { - return ENOMEM; - } - ret = initResponseBuffer(&(resp->body)); - if (ret) { - goto done; - } - ret = initResponseBuffer(&(resp->header)); - if (ret) { - goto done; - } - - // Connect to the datanode in order to create the lease in the namenode - curl = curl_easy_init(); - if (!curl) { - fprintf(stderr, "ERROR: failed to initialize the curl handle.\n"); - return ENOMEM; - } - curl_easy_setopt(curl, CURLOPT_URL, url); - - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body); - curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc); - curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header); - curl_easy_setopt(curl, CURLOPT_READFUNCTION, readfunc); - curl_easy_setopt(curl, CURLOPT_READDATA, uploadBuffer); - curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L); - - chunk = curl_slist_append(chunk, "Transfer-Encoding: chunked"); - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk); - chunk = curl_slist_append(chunk, "Expect:"); - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk); - - switch(method) { - case PUT: - curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT"); - break; - case POST: - curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST"); - break; - default: - ret = EINVAL; - fprintf(stderr, "ERROR: Invalid HTTP method\n"); - goto done; - } - curlCode = curl_easy_perform(curl); - if (curlCode != CURLE_OK) { - ret = EIO; - fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n", - url, curlCode, curl_easy_strerror(curlCode)); - } - -done: - if (chunk != NULL) { - curl_slist_free_all(chunk); - } - if (curl != NULL) { - curl_easy_cleanup(curl); - } - if (ret) { - free(resp); - resp = NULL; - } - *response = resp; - return ret; -} - -int launchMKDIR(const char *url, struct Response **resp) -{ - return launchCmd(url, PUT, NO, resp); -} - -int launchRENAME(const char *url, struct Response **resp) -{ - return launchCmd(url, PUT, NO, resp); -} - -int launchGFS(const char *url, struct Response **resp) -{ - return launchCmd(url, GET, NO, resp); -} - -int launchLS(const char *url, struct Response **resp) -{ - return launchCmd(url, GET, NO, resp); -} - -int launchCHMOD(const char *url, struct Response **resp) -{ - return launchCmd(url, PUT, NO, resp); -} - -int launchCHOWN(const char *url, struct Response **resp) -{ - return launchCmd(url, PUT, NO, resp); -} - -int launchDELETE(const char *url, struct Response **resp) -{ - return launchCmd(url, DELETE, NO, resp); -} - -int launchOPEN(const char *url, struct Response* resp) -{ - return launchReadInternal(url, resp); -} - -int launchUTIMES(const char *url, struct Response **resp) -{ - return launchCmd(url, PUT, NO, resp); -} - -int launchNnWRITE(const char *url, struct Response **resp) -{ - return launchCmd(url, PUT, NO, resp); -} - -int launchNnAPPEND(const char *url, struct Response **resp) -{ - return launchCmd(url, POST, NO, resp); -} - -int launchDnWRITE(const char *url, struct webhdfsBuffer *buffer, - struct Response **resp) -{ - return launchWrite(url, PUT, buffer, resp); -} - -int launchDnAPPEND(const char *url, struct webhdfsBuffer *buffer, - struct Response **resp) -{ - return launchWrite(url, POST, buffer, resp); -} - -int launchSETREPLICATION(const char *url, struct Response **resp) -{ - return launchCmd(url, PUT, NO, resp); -} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_http_client.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_http_client.h deleted file mode 100644 index ab854641675..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_http_client.h +++ /dev/null @@ -1,294 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - - -#ifndef _HDFS_HTTP_CLIENT_H_ -#define _HDFS_HTTP_CLIENT_H_ - -#include "hdfs/hdfs.h" /* for tSize */ - -#include /* for pthread_t */ -#include /* for size_t */ - -/** enum indicating the type of hdfs stream */ -enum hdfsStreamType -{ - UNINITIALIZED = 0, - INPUT = 1, - OUTPUT = 2, -}; - -/** - * webhdfsBuffer - used for hold the data for read/write from/to http connection - */ -struct webhdfsBuffer { - const char *wbuffer; /* The user's buffer for uploading */ - size_t remaining; /* Length of content */ - size_t offset; /* offset for reading */ - /* Check whether the hdfsOpenFile has been called before */ - int openFlag; - /* Whether to close the http connection for writing */ - int closeFlag; - /* Synchronization between the curl and hdfsWrite threads */ - pthread_mutex_t writeMutex; - /* - * Transferring thread waits for this condition - * when there is no more content for transferring in the buffer - */ - pthread_cond_t newwrite_or_close; - /* Condition used to indicate finishing transferring (one buffer) */ - pthread_cond_t transfer_finish; -}; - -/** File handle for webhdfs */ -struct webhdfsFileHandle { - char *absPath; /* Absolute path of file */ - int bufferSize; /* Size of buffer */ - short replication; /* Number of replication */ - tSize blockSize; /* Block size */ - char *datanode; /* URL of the DataNode */ - /* webhdfsBuffer handle used to store the upload data */ - struct webhdfsBuffer *uploadBuffer; - /* The thread used for data transferring */ - pthread_t connThread; -}; - -/** Type of http header */ -enum HttpHeader { - GET, - PUT, - POST, - DELETE -}; - -/** Whether to redirect */ -enum Redirect { - YES, - NO -}; - -/** Buffer used for holding response */ -struct ResponseBuffer { - char *content; - size_t remaining; - size_t offset; -}; - -/** - * The response got through webhdfs - */ -struct Response { - struct ResponseBuffer *body; - struct ResponseBuffer *header; -}; - -/** - * Create and initialize a ResponseBuffer - * - * @param buffer Pointer pointing to new created ResponseBuffer handle - * @return 0 for success, non-zero value to indicate error - */ -int initResponseBuffer(struct ResponseBuffer **buffer) __attribute__ ((warn_unused_result)); - -/** - * Free the given ResponseBuffer - * - * @param buffer The ResponseBuffer to free - */ -void freeResponseBuffer(struct ResponseBuffer *buffer); - -/** - * Free the given Response - * - * @param resp The Response to free - */ -void freeResponse(struct Response *resp); - -/** - * Send the MKDIR request to NameNode using the given URL. - * The NameNode will execute the operation and return the result as response. - * - * @param url The URL for MKDIR operation - * @param response Response handle to store response returned from the NameNode - * @return 0 for success, non-zero value to indicate error - */ -int launchMKDIR(const char *url, - struct Response **response) __attribute__ ((warn_unused_result)); - -/** - * Send the RENAME request to NameNode using the given URL. - * The NameNode will execute the operation and return the result as response. - * - * @param url The URL for RENAME operation - * @param response Response handle to store response returned from the NameNode - * @return 0 for success, non-zero value to indicate error - */ -int launchRENAME(const char *url, - struct Response **response) __attribute__ ((warn_unused_result)); - -/** - * Send the CHMOD request to NameNode using the given URL. - * The NameNode will execute the operation and return the result as response. - * - * @param url The URL for CHMOD operation - * @param response Response handle to store response returned from the NameNode - * @return 0 for success, non-zero value to indicate error - */ -int launchCHMOD(const char *url, - struct Response **response) __attribute__ ((warn_unused_result)); - -/** - * Send the GetFileStatus request to NameNode using the given URL. - * The NameNode will execute the operation and return the result as response. - * - * @param url The URL for GetFileStatus operation - * @param response Response handle to store response returned from the NameNode, - * containing either file status or exception information - * @return 0 for success, non-zero value to indicate error - */ -int launchGFS(const char *url, - struct Response **response) __attribute__ ((warn_unused_result)); - -/** - * Send the LS (LISTSTATUS) request to NameNode using the given URL. - * The NameNode will execute the operation and return the result as response. - * - * @param url The URL for LISTSTATUS operation - * @param response Response handle to store response returned from the NameNode - * @return 0 for success, non-zero value to indicate error - */ -int launchLS(const char *url, - struct Response **response) __attribute__ ((warn_unused_result)); - -/** - * Send the DELETE request to NameNode using the given URL. - * The NameNode will execute the operation and return the result as response. - * - * @param url The URL for DELETE operation - * @param response Response handle to store response returned from the NameNode - * @return 0 for success, non-zero value to indicate error - */ -int launchDELETE(const char *url, - struct Response **response) __attribute__ ((warn_unused_result)); - -/** - * Send the CHOWN request to NameNode using the given URL. - * The NameNode will execute the operation and return the result as response. - * - * @param url The URL for CHOWN operation - * @param response Response handle to store response returned from the NameNode - * @return 0 for success, non-zero value to indicate error - */ -int launchCHOWN(const char *url, - struct Response **response) __attribute__ ((warn_unused_result)); - -/** - * Send the OPEN request to NameNode using the given URL, - * asking for reading a file (within a range). - * The NameNode first redirects the request to the datanode - * that holds the corresponding first block of the file (within a range), - * and the datanode returns the content of the file through the HTTP connection. - * - * @param url The URL for OPEN operation - * @param resp The response holding user's buffer. - The file content will be written into the buffer. - * @return 0 for success, non-zero value to indicate error - */ -int launchOPEN(const char *url, - struct Response* resp) __attribute__ ((warn_unused_result)); - -/** - * Send the SETTIMES request to NameNode using the given URL. - * The NameNode will execute the operation and return the result as response. - * - * @param url The URL for SETTIMES operation - * @param response Response handle to store response returned from the NameNode - * @return 0 for success, non-zero value to indicate error - */ -int launchUTIMES(const char *url, - struct Response **response) __attribute__ ((warn_unused_result)); - -/** - * Send the WRITE/CREATE request to NameNode using the given URL. - * The NameNode will choose the writing target datanodes - * and return the first datanode in the pipeline as response - * - * @param url The URL for WRITE/CREATE operation connecting to NameNode - * @param response Response handle to store response returned from the NameNode - * @return 0 for success, non-zero value to indicate error - */ -int launchNnWRITE(const char *url, - struct Response **response) __attribute__ ((warn_unused_result)); - -/** - * Send the WRITE request along with to-write content to - * the corresponding DataNode using the given URL. - * The DataNode will write the data and return the response. - * - * @param url The URL for WRITE operation connecting to DataNode - * @param buffer The webhdfsBuffer containing data to be written to hdfs - * @param response Response handle to store response returned from the NameNode - * @return 0 for success, non-zero value to indicate error - */ -int launchDnWRITE(const char *url, struct webhdfsBuffer *buffer, - struct Response **response) __attribute__ ((warn_unused_result)); - -/** - * Send the WRITE (APPEND) request to NameNode using the given URL. - * The NameNode determines the DataNode for appending and - * sends its URL back as response. - * - * @param url The URL for APPEND operation - * @param response Response handle to store response returned from the NameNode - * @return 0 for success, non-zero value to indicate error - */ -int launchNnAPPEND(const char *url, struct Response **response) __attribute__ ((warn_unused_result)); - -/** - * Send the SETREPLICATION request to NameNode using the given URL. - * The NameNode will execute the operation and return the result as response. - * - * @param url The URL for SETREPLICATION operation - * @param response Response handle to store response returned from the NameNode - * @return 0 for success, non-zero value to indicate error - */ -int launchSETREPLICATION(const char *url, - struct Response **response) __attribute__ ((warn_unused_result)); - -/** - * Send the APPEND request along with the content to DataNode. - * The DataNode will do the appending and return the result as response. - * - * @param url The URL for APPEND operation connecting to DataNode - * @param buffer The webhdfsBuffer containing data to be appended - * @param response Response handle to store response returned from the NameNode - * @return 0 for success, non-zero value to indicate error - */ -int launchDnAPPEND(const char *url, struct webhdfsBuffer *buffer, - struct Response **response) __attribute__ ((warn_unused_result)); - -/** - * Thread-safe strerror alternative. - * - * @param errnoval The error code value - * @return The error message string mapped to the given error code - */ -const char *hdfs_strerror(int errnoval); - -#endif //_HDFS_HTTP_CLIENT_H_ diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_http_query.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_http_query.c deleted file mode 100644 index b082c08cef5..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_http_query.c +++ /dev/null @@ -1,402 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "hdfs_http_query.h" -#include -#include -#include -#include -#include - -#define PERM_STR_LEN 4 // "644" + one byte for NUL -#define SHORT_STR_LEN 6 // 65535 + NUL -#define LONG_STR_LEN 21 // 2^64-1 = 18446744073709551615 + NUL - -/** - * Create query based on NameNode hostname, - * NameNode port, path, operation and other parameters - * - * @param host NameNode hostName - * @param nnPort Port of NameNode - * @param path Absolute path for the corresponding file - * @param op Operations - * @param paraNum Number of remaining parameters - * @param paraNames Names of remaining parameters - * @param paraValues Values of remaining parameters - * @param url Holding the created URL - * @return 0 on success and non-zero value to indicate error - */ -static int createQueryURL(const char *host, unsigned int nnPort, - const char *path, const char *op, int paraNum, - const char **paraNames, const char **paraValues, - char **queryUrl) -{ - size_t length = 0; - int i = 0, offset = 0, ret = 0; - char *url = NULL; - const char *protocol = "http://"; - const char *prefix = "/webhdfs/v1"; - - if (!paraNames || !paraValues) { - return EINVAL; - } - length = strlen(protocol) + strlen(host) + strlen(":") + - SHORT_STR_LEN + strlen(prefix) + strlen(path) + - strlen ("?op=") + strlen(op); - for (i = 0; i < paraNum; i++) { - if (paraNames[i] && paraValues[i]) { - length += 2 + strlen(paraNames[i]) + strlen(paraValues[i]); - } - } - url = malloc(length); // The '\0' has already been included - // when using SHORT_STR_LEN - if (!url) { - return ENOMEM; - } - - offset = snprintf(url, length, "%s%s:%d%s%s?op=%s", - protocol, host, nnPort, prefix, path, op); - if (offset >= length || offset < 0) { - ret = EIO; - goto done; - } - for (i = 0; i < paraNum; i++) { - if (!paraNames[i] || !paraValues[i] || paraNames[i][0] == '\0' || - paraValues[i][0] == '\0') { - continue; - } - offset += snprintf(url + offset, length - offset, - "&%s=%s", paraNames[i], paraValues[i]); - if (offset >= length || offset < 0) { - ret = EIO; - goto done; - } - } -done: - if (ret) { - free(url); - return ret; - } - *queryUrl = url; - return 0; -} - -int createUrlForMKDIR(const char *host, int nnPort, - const char *path, const char *user, char **url) -{ - const char *userPara = "user.name"; - return createQueryURL(host, nnPort, path, "MKDIRS", 1, - &userPara, &user, url); -} - -int createUrlForGetFileStatus(const char *host, int nnPort, const char *path, - const char *user, char **url) -{ - const char *userPara = "user.name"; - return createQueryURL(host, nnPort, path, "GETFILESTATUS", 1, - &userPara, &user, url); -} - -int createUrlForLS(const char *host, int nnPort, const char *path, - const char *user, char **url) -{ - const char *userPara = "user.name"; - return createQueryURL(host, nnPort, path, "LISTSTATUS", - 1, &userPara, &user, url); -} - -int createUrlForNnAPPEND(const char *host, int nnPort, const char *path, - const char *user, char **url) -{ - const char *userPara = "user.name"; - return createQueryURL(host, nnPort, path, "APPEND", - 1, &userPara, &user, url); -} - -int createUrlForMKDIRwithMode(const char *host, int nnPort, const char *path, - int mode, const char *user, char **url) -{ - int strlength; - char permission[PERM_STR_LEN]; - const char *paraNames[2], *paraValues[2]; - - paraNames[0] = "permission"; - paraNames[1] = "user.name"; - memset(permission, 0, PERM_STR_LEN); - strlength = snprintf(permission, PERM_STR_LEN, "%o", mode); - if (strlength < 0 || strlength >= PERM_STR_LEN) { - return EIO; - } - paraValues[0] = permission; - paraValues[1] = user; - - return createQueryURL(host, nnPort, path, "MKDIRS", 2, - paraNames, paraValues, url); -} - -int createUrlForRENAME(const char *host, int nnPort, const char *srcpath, - const char *destpath, const char *user, char **url) -{ - const char *paraNames[2], *paraValues[2]; - paraNames[0] = "destination"; - paraNames[1] = "user.name"; - paraValues[0] = destpath; - paraValues[1] = user; - - return createQueryURL(host, nnPort, srcpath, - "RENAME", 2, paraNames, paraValues, url); -} - -int createUrlForCHMOD(const char *host, int nnPort, const char *path, - int mode, const char *user, char **url) -{ - int strlength; - char permission[PERM_STR_LEN]; - const char *paraNames[2], *paraValues[2]; - - paraNames[0] = "permission"; - paraNames[1] = "user.name"; - memset(permission, 0, PERM_STR_LEN); - strlength = snprintf(permission, PERM_STR_LEN, "%o", mode); - if (strlength < 0 || strlength >= PERM_STR_LEN) { - return EIO; - } - paraValues[0] = permission; - paraValues[1] = user; - - return createQueryURL(host, nnPort, path, "SETPERMISSION", - 2, paraNames, paraValues, url); -} - -int createUrlForDELETE(const char *host, int nnPort, const char *path, - int recursive, const char *user, char **url) -{ - const char *paraNames[2], *paraValues[2]; - paraNames[0] = "recursive"; - paraNames[1] = "user.name"; - if (recursive) { - paraValues[0] = "true"; - } else { - paraValues[0] = "false"; - } - paraValues[1] = user; - - return createQueryURL(host, nnPort, path, "DELETE", - 2, paraNames, paraValues, url); -} - -int createUrlForCHOWN(const char *host, int nnPort, const char *path, - const char *owner, const char *group, - const char *user, char **url) -{ - const char *paraNames[3], *paraValues[3]; - paraNames[0] = "owner"; - paraNames[1] = "group"; - paraNames[2] = "user.name"; - paraValues[0] = owner; - paraValues[1] = group; - paraValues[2] = user; - - return createQueryURL(host, nnPort, path, "SETOWNER", - 3, paraNames, paraValues, url); -} - -int createUrlForOPEN(const char *host, int nnPort, const char *path, - const char *user, size_t offset, size_t length, char **url) -{ - int strlength; - char offsetStr[LONG_STR_LEN], lengthStr[LONG_STR_LEN]; - const char *paraNames[3], *paraValues[3]; - - paraNames[0] = "offset"; - paraNames[1] = "length"; - paraNames[2] = "user.name"; - memset(offsetStr, 0, LONG_STR_LEN); - memset(lengthStr, 0, LONG_STR_LEN); - strlength = snprintf(offsetStr, LONG_STR_LEN, "%lu", offset); - if (strlength < 0 || strlength >= LONG_STR_LEN) { - return EIO; - } - strlength = snprintf(lengthStr, LONG_STR_LEN, "%lu", length); - if (strlength < 0 || strlength >= LONG_STR_LEN) { - return EIO; - } - paraValues[0] = offsetStr; - paraValues[1] = lengthStr; - paraValues[2] = user; - - return createQueryURL(host, nnPort, path, "OPEN", - 3, paraNames, paraValues, url); -} - -int createUrlForUTIMES(const char *host, int nnPort, const char *path, - long unsigned mTime, long unsigned aTime, - const char *user, char **url) -{ - int strlength; - char modTime[LONG_STR_LEN], acsTime[LONG_STR_LEN]; - const char *paraNames[3], *paraValues[3]; - - memset(modTime, 0, LONG_STR_LEN); - memset(acsTime, 0, LONG_STR_LEN); - strlength = snprintf(modTime, LONG_STR_LEN, "%lu", mTime); - if (strlength < 0 || strlength >= LONG_STR_LEN) { - return EIO; - } - strlength = snprintf(acsTime, LONG_STR_LEN, "%lu", aTime); - if (strlength < 0 || strlength >= LONG_STR_LEN) { - return EIO; - } - paraNames[0] = "modificationtime"; - paraNames[1] = "accesstime"; - paraNames[2] = "user.name"; - paraValues[0] = modTime; - paraValues[1] = acsTime; - paraValues[2] = user; - - return createQueryURL(host, nnPort, path, "SETTIMES", - 3, paraNames, paraValues, url); -} - -int createUrlForNnWRITE(const char *host, int nnPort, - const char *path, const char *user, - int16_t replication, size_t blockSize, char **url) -{ - int strlength; - char repStr[SHORT_STR_LEN], blockSizeStr[LONG_STR_LEN]; - const char *paraNames[4], *paraValues[4]; - - memset(repStr, 0, SHORT_STR_LEN); - memset(blockSizeStr, 0, LONG_STR_LEN); - if (replication > 0) { - strlength = snprintf(repStr, SHORT_STR_LEN, "%u", replication); - if (strlength < 0 || strlength >= SHORT_STR_LEN) { - return EIO; - } - } - if (blockSize > 0) { - strlength = snprintf(blockSizeStr, LONG_STR_LEN, "%lu", blockSize); - if (strlength < 0 || strlength >= LONG_STR_LEN) { - return EIO; - } - } - paraNames[0] = "overwrite"; - paraNames[1] = "replication"; - paraNames[2] = "blocksize"; - paraNames[3] = "user.name"; - paraValues[0] = "true"; - paraValues[1] = repStr; - paraValues[2] = blockSizeStr; - paraValues[3] = user; - - return createQueryURL(host, nnPort, path, "CREATE", - 4, paraNames, paraValues, url); -} - -int createUrlForSETREPLICATION(const char *host, int nnPort, - const char *path, int16_t replication, - const char *user, char **url) -{ - char repStr[SHORT_STR_LEN]; - const char *paraNames[2], *paraValues[2]; - int strlength; - - memset(repStr, 0, SHORT_STR_LEN); - if (replication > 0) { - strlength = snprintf(repStr, SHORT_STR_LEN, "%u", replication); - if (strlength < 0 || strlength >= SHORT_STR_LEN) { - return EIO; - } - } - paraNames[0] = "replication"; - paraNames[1] = "user.name"; - paraValues[0] = repStr; - paraValues[1] = user; - - return createQueryURL(host, nnPort, path, "SETREPLICATION", - 2, paraNames, paraValues, url); -} - -int createUrlForGetBlockLocations(const char *host, int nnPort, - const char *path, size_t offset, - size_t length, const char *user, char **url) -{ - char offsetStr[LONG_STR_LEN], lengthStr[LONG_STR_LEN]; - const char *paraNames[3], *paraValues[3]; - int strlength; - - memset(offsetStr, 0, LONG_STR_LEN); - memset(lengthStr, 0, LONG_STR_LEN); - if (offset > 0) { - strlength = snprintf(offsetStr, LONG_STR_LEN, "%lu", offset); - if (strlength < 0 || strlength >= LONG_STR_LEN) { - return EIO; - } - } - if (length > 0) { - strlength = snprintf(lengthStr, LONG_STR_LEN, "%lu", length); - if (strlength < 0 || strlength >= LONG_STR_LEN) { - return EIO; - } - } - paraNames[0] = "offset"; - paraNames[1] = "length"; - paraNames[2] = "user.name"; - paraValues[0] = offsetStr; - paraValues[1] = lengthStr; - paraValues[2] = user; - - return createQueryURL(host, nnPort, path, "GET_BLOCK_LOCATIONS", - 3, paraNames, paraValues, url); -} - -int createUrlForReadFromDatanode(const char *dnHost, int dnPort, - const char *path, size_t offset, - size_t length, const char *user, - const char *namenodeRpcAddr, char **url) -{ - char offsetStr[LONG_STR_LEN], lengthStr[LONG_STR_LEN]; - const char *paraNames[4], *paraValues[4]; - int strlength; - - memset(offsetStr, 0, LONG_STR_LEN); - memset(lengthStr, 0, LONG_STR_LEN); - if (offset > 0) { - strlength = snprintf(offsetStr, LONG_STR_LEN, "%lu", offset); - if (strlength < 0 || strlength >= LONG_STR_LEN) { - return EIO; - } - } - if (length > 0) { - strlength = snprintf(lengthStr, LONG_STR_LEN, "%lu", length); - if (strlength < 0 || strlength >= LONG_STR_LEN) { - return EIO; - } - } - - paraNames[0] = "offset"; - paraNames[1] = "length"; - paraNames[2] = "user.name"; - paraNames[3] = "namenoderpcaddress"; - paraValues[0] = offsetStr; - paraValues[1] = lengthStr; - paraValues[2] = user; - paraValues[3] = namenodeRpcAddr; - - return createQueryURL(dnHost, dnPort, path, "OPEN", - 4, paraNames, paraValues, url); -} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_http_query.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_http_query.h deleted file mode 100644 index 432797bfb8e..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_http_query.h +++ /dev/null @@ -1,240 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#ifndef _HDFS_HTTP_QUERY_H_ -#define _HDFS_HTTP_QUERY_H_ - -#include /* for size_t */ -#include /* for int16_t */ - -/** - * Create the URL for a MKDIR request - * - * @param host The hostname of the NameNode - * @param nnPort Port of the NameNode - * @param path Path of the dir to create - * @param user User name - * @param url Holding the generated URL for MKDIR request - * @return 0 on success and non-zero value on errors - */ -int createUrlForMKDIR(const char *host, int nnPort, - const char *path, const char *user, - char **url) __attribute__ ((warn_unused_result)); - -/** - * Create the URL for a MKDIR (with mode) request - * - * @param host The hostname of the NameNode - * @param nnPort Port of the NameNode - * @param path Path of the dir to create - * @param mode Mode of MKDIR - * @param user User name - * @param url Holding the generated URL for MKDIR request - * @return 0 on success and non-zero value on errors - */ -int createUrlForMKDIRwithMode(const char *host, int nnPort, const char *path, - int mode, const char *user, - char **url) __attribute__ ((warn_unused_result)); - -/** - * Create the URL for a RENAME request - * - * @param host The hostname of the NameNode - * @param nnPort Port of the NameNode - * @param srcpath Source path - * @param dstpath Destination path - * @param user User name - * @param url Holding the generated URL for RENAME request - * @return 0 on success and non-zero value on errors - */ -int createUrlForRENAME(const char *host, int nnPort, const char *srcpath, - const char *dstpath, const char *user, - char **url) __attribute__ ((warn_unused_result)); - -/** - * Create the URL for a CHMOD request - * - * @param host The hostname of the NameNode - * @param nnPort Port of the NameNode - * @param path Target path - * @param mode New mode for the file - * @param user User name - * @param url Holding the generated URL for CHMOD request - * @return 0 on success and non-zero value on errors - */ -int createUrlForCHMOD(const char *host, int nnPort, const char *path, - int mode, const char *user, - char **url) __attribute__ ((warn_unused_result)); - -/** - * Create the URL for a GETFILESTATUS request - * - * @param host The hostname of the NameNode - * @param nnPort Port of the NameNode - * @param path Path of the target file - * @param user User name - * @param url Holding the generated URL for GETFILESTATUS request - * @return 0 on success and non-zero value on errors - */ -int createUrlForGetFileStatus(const char *host, int nnPort, - const char *path, const char *user, - char **url) __attribute__ ((warn_unused_result)); - -/** - * Create the URL for a LISTSTATUS request - * - * @param host The hostname of the NameNode - * @param nnPort Port of the NameNode - * @param path Path of the directory for listing - * @param user User name - * @param url Holding the generated URL for LISTSTATUS request - * @return 0 on success and non-zero value on errors - */ -int createUrlForLS(const char *host, int nnPort, - const char *path, const char *user, - char **url) __attribute__ ((warn_unused_result)); - -/** - * Create the URL for a DELETE request - * - * @param host The hostname of the NameNode - * @param nnPort Port of the NameNode - * @param path Path of the file to be deletected - * @param recursive Whether or not to delete in a recursive way - * @param user User name - * @param url Holding the generated URL for DELETE request - * @return 0 on success and non-zero value on errors - */ -int createUrlForDELETE(const char *host, int nnPort, const char *path, - int recursive, const char *user, - char **url) __attribute__ ((warn_unused_result)); - -/** - * Create the URL for a CHOWN request - * - * @param host The hostname of the NameNode - * @param nnPort Port of the NameNode - * @param path Path of the target - * @param owner New owner - * @param group New group - * @param user User name - * @param url Holding the generated URL for CHOWN request - * @return 0 on success and non-zero value on errors - */ -int createUrlForCHOWN(const char *host, int nnPort, const char *path, - const char *owner, const char *group, const char *user, - char **url) __attribute__ ((warn_unused_result)); - -/** - * Create the URL for a OPEN/READ request - * - * @param host The hostname of the NameNode - * @param nnPort Port of the NameNode - * @param path Path of the file to read - * @param user User name - * @param offset Offset for reading (the start position for this read) - * @param length Length of the file to read - * @param url Holding the generated URL for OPEN/READ request - * @return 0 on success and non-zero value on errors - */ -int createUrlForOPEN(const char *host, int nnPort, const char *path, - const char *user, size_t offset, size_t length, - char **url) __attribute__ ((warn_unused_result)); - -/** - * Create the URL for a UTIMES (update time) request - * - * @param host The hostname of the NameNode - * @param nnPort Port of the NameNode - * @param path Path of the file for updating time - * @param mTime Modified time to set - * @param aTime Access time to set - * @param user User name - * @param url Holding the generated URL for UTIMES request - * @return 0 on success and non-zero value on errors - */ -int createUrlForUTIMES(const char *host, int nnPort, const char *path, - long unsigned mTime, long unsigned aTime, - const char *user, - char **url) __attribute__ ((warn_unused_result)); - -/** - * Create the URL for a WRITE/CREATE request (sent to NameNode) - * - * @param host The hostname of the NameNode - * @param nnPort Port of the NameNode - * @param path Path of the dir to create - * @param user User name - * @param replication Number of replication of the file - * @param blockSize Size of the block for the file - * @param url Holding the generated URL for WRITE request - * @return 0 on success and non-zero value on errors - */ -int createUrlForNnWRITE(const char *host, int nnPort, const char *path, - const char *user, int16_t replication, size_t blockSize, - char **url) __attribute__ ((warn_unused_result)); - -/** - * Create the URL for an APPEND request (sent to NameNode) - * - * @param host The hostname of the NameNode - * @param nnPort Port of the NameNode - * @param path Path of the file for appending - * @param user User name - * @param url Holding the generated URL for APPEND request - * @return 0 on success and non-zero value on errors - */ -int createUrlForNnAPPEND(const char *host, int nnPort, - const char *path, const char *user, - char **url) __attribute__ ((warn_unused_result)); - -/** - * Create the URL for a SETREPLICATION request - * - * @param host The hostname of the NameNode - * @param nnPort Port of the NameNode - * @param path Path of the target file - * @param replication New replication number - * @param user User name - * @param url Holding the generated URL for SETREPLICATION request - * @return 0 on success and non-zero value on errors - */ -int createUrlForSETREPLICATION(const char *host, int nnPort, const char *path, - int16_t replication, const char *user, - char **url) __attribute__ ((warn_unused_result)); - -/** - * Create the URL for a GET_BLOCK_LOCATIONS request - * - * @param host The hostname of the NameNode - * @param nnPort Port of the NameNode - * @param path Path of the target file - * @param offset The offset in the file - * @param length Length of the file content - * @param user User name - * @param url Holding the generated URL for GET_BLOCK_LOCATIONS request - * @return 0 on success and non-zero value on errors - */ -int createUrlForGetBlockLocations(const char *host, int nnPort, - const char *path, size_t offset, - size_t length, const char *user, - char **url) __attribute__ ((warn_unused_result)); - - -#endif //_HDFS_HTTP_QUERY_H_ diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_json_parser.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_json_parser.c deleted file mode 100644 index f0973a61131..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_json_parser.c +++ /dev/null @@ -1,654 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "exception.h" -#include "hdfs/hdfs.h" /* for hdfsFileInfo */ -#include "hdfs_json_parser.h" - -#include -#include -#include -#include - -static const char * const temporaryRedirectCode = "307 TEMPORARY_REDIRECT"; -static const char * const twoHundredOKCode = "200 OK"; -static const char * const twoHundredOneCreatedCode = "201 Created"; -static const char * const httpHeaderString = "HTTP/1.1"; - -/** - * Exception information after calling JSON operations - */ -struct jsonException { - const char *exception; - const char *javaClassName; - const char *message; -}; - -/** Print out the JSON exception information */ -static int printJsonExceptionV(struct jsonException *exc, int noPrintFlags, - const char *fmt, va_list ap) -{ - char *javaClassName = NULL; - int excErrno = EINTERNAL, shouldPrint = 0; - if (!exc) { - fprintf(stderr, "printJsonExceptionV: the jsonException is NULL\n"); - return EINTERNAL; - } - javaClassName = strdup(exc->javaClassName); - if (!javaClassName) { - fprintf(stderr, "printJsonExceptionV: internal out of memory error\n"); - return EINTERNAL; - } - getExceptionInfo(javaClassName, noPrintFlags, &excErrno, &shouldPrint); - free(javaClassName); - - if (shouldPrint) { - vfprintf(stderr, fmt, ap); - fprintf(stderr, " error:\n"); - fprintf(stderr, "Exception: %s\nJavaClassName: %s\nMessage: %s\n", - exc->exception, exc->javaClassName, exc->message); - } - - free(exc); - return excErrno; -} - -/** - * Print out JSON exception information. - * - * @param exc The exception information to print and free - * @param noPrintFlags Flags which determine which exceptions we should NOT - * print. - * @param fmt Printf-style format list - * @param ... Printf-style varargs - * - * @return The POSIX error number associated with the exception - * object. - */ -static int printJsonException(struct jsonException *exc, int noPrintFlags, - const char *fmt, ...) -{ - va_list ap; - int ret = 0; - - va_start(ap, fmt); - ret = printJsonExceptionV(exc, noPrintFlags, fmt, ap); - va_end(ap); - return ret; -} - -/** Parse the exception information from JSON */ -static struct jsonException *parseJsonException(json_t *jobj) -{ - const char *key = NULL; - json_t *value = NULL; - struct jsonException *exception = NULL; - void *iter = NULL; - - exception = calloc(1, sizeof(*exception)); - if (!exception) { - return NULL; - } - - iter = json_object_iter(jobj); - while (iter) { - key = json_object_iter_key(iter); - value = json_object_iter_value(iter); - - if (!strcmp(key, "exception")) { - exception->exception = json_string_value(value); - } else if (!strcmp(key, "javaClassName")) { - exception->javaClassName = json_string_value(value); - } else if (!strcmp(key, "message")) { - exception->message = json_string_value(value); - } - - iter = json_object_iter_next(jobj, iter); - } - return exception; -} - -/** - * Parse the exception information which is presented in JSON - * - * @param content Exception information in JSON - * @return jsonException for printing out - */ -static struct jsonException *parseException(const char *content) -{ - json_error_t error; - size_t flags = 0; - const char *key = NULL; - json_t *value; - json_t *jobj; - struct jsonException *exception = NULL; - - if (!content) { - return NULL; - } - jobj = json_loads(content, flags, &error); - if (!jobj) { - fprintf(stderr, "JSon parsing error: on line %d: %s\n", - error.line, error.text); - return NULL; - } - void *iter = json_object_iter(jobj); - while(iter) { - key = json_object_iter_key(iter); - value = json_object_iter_value(iter); - - if (!strcmp(key, "RemoteException") && - json_typeof(value) == JSON_OBJECT) { - exception = parseJsonException(value); - break; - } - iter = json_object_iter_next(jobj, iter); - } - - json_decref(jobj); - return exception; -} - -/** - * Parse the response information which uses TRUE/FALSE - * to indicate whether the operation succeeded - * - * @param response Response information - * @return 0 to indicate success - */ -static int parseBoolean(const char *response) -{ - json_t *root, *value; - json_error_t error; - size_t flags = 0; - int result = 0; - - root = json_loads(response, flags, &error); - if (!root) { - fprintf(stderr, "JSon parsing error: on line %d: %s\n", - error.line, error.text); - return EIO; - } - void *iter = json_object_iter(root); - value = json_object_iter_value(iter); - if (json_typeof(value) == JSON_TRUE) { - result = 0; - } else { - result = EIO; // FALSE means error in remote NN/DN - } - json_decref(root); - return result; -} - -int parseMKDIR(const char *response) -{ - return parseBoolean(response); -} - -int parseRENAME(const char *response) -{ - return parseBoolean(response); -} - -int parseDELETE(const char *response) -{ - return parseBoolean(response); -} - -int parseSETREPLICATION(const char *response) -{ - return parseBoolean(response); -} - -/** - * Check the header of response to see if it's 200 OK - * - * @param header Header information for checking - * @param content Stores exception information if there are errors - * @param operation Indicate the operation for exception printing - * @return 0 for success - */ -static int checkHeader(const char *header, const char *content, - const char *operation) -{ - char *result = NULL; - const char delims[] = ":"; - char *savepter; - int ret = 0; - - if (!header || strncmp(header, "HTTP/", strlen("HTTP/"))) { - return EINVAL; - } - if (!(strstr(header, twoHundredOKCode)) || - !(result = strstr(header, "Content-Length"))) { - struct jsonException *exc = parseException(content); - if (exc) { - ret = printJsonException(exc, PRINT_EXC_ALL, - "Calling WEBHDFS (%s)", operation); - } else { - ret = EIO; - } - return ret; - } - result = strtok_r(result, delims, &savepter); - result = strtok_r(NULL, delims, &savepter); - while (isspace(*result)) { - result++; - } - // Content-Length should be equal to 0, - // and the string should be "0\r\nServer" - if (strncmp(result, "0\r\n", 3)) { - ret = EIO; - } - return ret; -} - -int parseCHMOD(const char *header, const char *content) -{ - return checkHeader(header, content, "CHMOD"); -} - -int parseCHOWN(const char *header, const char *content) -{ - return checkHeader(header, content, "CHOWN"); -} - -int parseUTIMES(const char *header, const char *content) -{ - return checkHeader(header, content, "SETTIMES"); -} - -/** - * Check if the header contains correct information - * ("307 TEMPORARY_REDIRECT" and "Location") - * - * @param header Header for parsing - * @param content Contains exception information - * if the remote operation failed - * @param operation Specify the remote operation when printing out exception - * @return 0 for success - */ -static int checkRedirect(const char *header, - const char *content, const char *operation) -{ - const char *locTag = "Location"; - int ret = 0, offset = 0; - - // The header must start with "HTTP/1.1" - if (!header || strncmp(header, httpHeaderString, - strlen(httpHeaderString))) { - return EINVAL; - } - - offset += strlen(httpHeaderString); - while (isspace(header[offset])) { - offset++; - } - // Looking for "307 TEMPORARY_REDIRECT" in header - if (strncmp(header + offset, temporaryRedirectCode, - strlen(temporaryRedirectCode))) { - // Process possible exception information - struct jsonException *exc = parseException(content); - if (exc) { - ret = printJsonException(exc, PRINT_EXC_ALL, - "Calling WEBHDFS (%s)", operation); - } else { - ret = EIO; - } - return ret; - } - // Here we just simply check if header contains "Location" tag, - // detailed processing is in parseDnLoc - if (!(strstr(header, locTag))) { - ret = EIO; - } - return ret; -} - -int parseNnWRITE(const char *header, const char *content) -{ - return checkRedirect(header, content, "Write(NameNode)"); -} - -int parseNnAPPEND(const char *header, const char *content) -{ - return checkRedirect(header, content, "Append(NameNode)"); -} - -/** 0 for success , -1 for out of range, other values for error */ -int parseOPEN(const char *header, const char *content) -{ - int ret = 0, offset = 0; - - if (!header || strncmp(header, httpHeaderString, - strlen(httpHeaderString))) { - return EINVAL; - } - - offset += strlen(httpHeaderString); - while (isspace(header[offset])) { - offset++; - } - if (strncmp(header + offset, temporaryRedirectCode, - strlen(temporaryRedirectCode)) || - !strstr(header, twoHundredOKCode)) { - struct jsonException *exc = parseException(content); - if (exc) { - // If the exception is an IOException and it is because - // the offset is out of the range, do not print out the exception - if (!strcasecmp(exc->exception, "IOException") && - strstr(exc->message, "out of the range")) { - ret = -1; - } else { - ret = printJsonException(exc, PRINT_EXC_ALL, - "Calling WEBHDFS (OPEN)"); - } - } else { - ret = EIO; - } - } - return ret; -} - -int parseDnLoc(char *content, char **dn) -{ - char *url = NULL, *dnLocation = NULL, *savepter, *tempContent; - const char *prefix = "Location: http://"; - const char *prefixToRemove = "Location: "; - const char *delims = "\r\n"; - - tempContent = strdup(content); - if (!tempContent) { - return ENOMEM; - } - - dnLocation = strtok_r(tempContent, delims, &savepter); - while (dnLocation && strncmp(dnLocation, "Location:", - strlen("Location:"))) { - dnLocation = strtok_r(NULL, delims, &savepter); - } - if (!dnLocation) { - return EIO; - } - - while (isspace(*dnLocation)) { - dnLocation++; - } - if (strncmp(dnLocation, prefix, strlen(prefix))) { - return EIO; - } - url = strdup(dnLocation + strlen(prefixToRemove)); - if (!url) { - return ENOMEM; - } - *dn = url; - return 0; -} - -int parseDnWRITE(const char *header, const char *content) -{ - int ret = 0; - if (header == NULL || header[0] == '\0' || - strncmp(header, "HTTP/", strlen("HTTP/"))) { - return EINVAL; - } - if (!(strstr(header, twoHundredOneCreatedCode))) { - struct jsonException *exc = parseException(content); - if (exc) { - ret = printJsonException(exc, PRINT_EXC_ALL, - "Calling WEBHDFS (WRITE(DataNode))"); - } else { - ret = EIO; - } - } - return ret; -} - -int parseDnAPPEND(const char *header, const char *content) -{ - int ret = 0; - - if (header == NULL || header[0] == '\0' || - strncmp(header, "HTTP/", strlen("HTTP/"))) { - return EINVAL; - } - if (!(strstr(header, twoHundredOKCode))) { - struct jsonException *exc = parseException(content); - if (exc) { - ret = printJsonException(exc, PRINT_EXC_ALL, - "Calling WEBHDFS (APPEND(DataNode))"); - } else { - ret = EIO; - } - } - return ret; -} - -/** - * Retrieve file status from the JSON object - * - * @param jobj JSON object for parsing, which contains - * file status information - * @param fileStat hdfsFileInfo handle to hold file status information - * @return 0 on success - */ -static int parseJsonForFileStatus(json_t *jobj, hdfsFileInfo *fileStat) -{ - const char *key, *tempstr; - json_t *value; - void *iter = NULL; - - iter = json_object_iter(jobj); - while (iter) { - key = json_object_iter_key(iter); - value = json_object_iter_value(iter); - - if (!strcmp(key, "accessTime")) { - // json field contains time in milliseconds, - // hdfsFileInfo is counted in seconds - fileStat->mLastAccess = json_integer_value(value) / 1000; - } else if (!strcmp(key, "blockSize")) { - fileStat->mBlockSize = json_integer_value(value); - } else if (!strcmp(key, "length")) { - fileStat->mSize = json_integer_value(value); - } else if (!strcmp(key, "modificationTime")) { - fileStat->mLastMod = json_integer_value(value) / 1000; - } else if (!strcmp(key, "replication")) { - fileStat->mReplication = json_integer_value(value); - } else if (!strcmp(key, "group")) { - fileStat->mGroup = strdup(json_string_value(value)); - if (!fileStat->mGroup) { - return ENOMEM; - } - } else if (!strcmp(key, "owner")) { - fileStat->mOwner = strdup(json_string_value(value)); - if (!fileStat->mOwner) { - return ENOMEM; - } - } else if (!strcmp(key, "pathSuffix")) { - fileStat->mName = strdup(json_string_value(value)); - if (!fileStat->mName) { - return ENOMEM; - } - } else if (!strcmp(key, "permission")) { - tempstr = json_string_value(value); - fileStat->mPermissions = (short) strtol(tempstr, NULL, 8); - } else if (!strcmp(key, "type")) { - tempstr = json_string_value(value); - if (!strcmp(tempstr, "DIRECTORY")) { - fileStat->mKind = kObjectKindDirectory; - } else { - fileStat->mKind = kObjectKindFile; - } - } - // Go to the next key-value pair in the json object - iter = json_object_iter_next(jobj, iter); - } - return 0; -} - -int parseGFS(const char *response, hdfsFileInfo *fileStat, int printError) -{ - int ret = 0, printFlag; - json_error_t error; - size_t flags = 0; - json_t *jobj, *value; - const char *key; - void *iter = NULL; - - if (!response || !fileStat) { - return EIO; - } - jobj = json_loads(response, flags, &error); - if (!jobj) { - fprintf(stderr, "error while parsing json: on line %d: %s\n", - error.line, error.text); - return EIO; - } - iter = json_object_iter(jobj); - key = json_object_iter_key(iter); - value = json_object_iter_value(iter); - if (json_typeof(value) == JSON_OBJECT) { - if (!strcmp(key, "RemoteException")) { - struct jsonException *exception = parseJsonException(value); - if (exception) { - if (printError) { - printFlag = PRINT_EXC_ALL; - } else { - printFlag = NOPRINT_EXC_FILE_NOT_FOUND | - NOPRINT_EXC_ACCESS_CONTROL | - NOPRINT_EXC_PARENT_NOT_DIRECTORY; - } - ret = printJsonException(exception, printFlag, - "Calling WEBHDFS GETFILESTATUS"); - } else { - ret = EIO; - } - } else if (!strcmp(key, "FileStatus")) { - ret = parseJsonForFileStatus(value, fileStat); - } else { - ret = EIO; - } - - } else { - ret = EIO; - } - - json_decref(jobj); - return ret; -} - -/** - * Parse the JSON array. Called to parse the result of - * the LISTSTATUS operation. Thus each element of the JSON array is - * a JSON object with the information of a file entry contained - * in the folder. - * - * @param jobj The JSON array to be parsed - * @param fileStat The hdfsFileInfo handle used to - * store a group of file information - * @param numEntries Capture the number of files in the folder - * @return 0 for success - */ -static int parseJsonArrayForFileStatuses(json_t *jobj, hdfsFileInfo **fileStat, - int *numEntries) -{ - json_t *jvalue = NULL; - int i = 0, ret = 0, arraylen = 0; - hdfsFileInfo *fileInfo = NULL; - - arraylen = (int) json_array_size(jobj); - if (arraylen > 0) { - fileInfo = calloc(arraylen, sizeof(hdfsFileInfo)); - if (!fileInfo) { - return ENOMEM; - } - } - for (i = 0; i < arraylen; i++) { - //Getting the array element at position i - jvalue = json_array_get(jobj, i); - if (json_is_object(jvalue)) { - ret = parseJsonForFileStatus(jvalue, &fileInfo[i]); - if (ret) { - goto done; - } - } else { - ret = EIO; - goto done; - } - } -done: - if (ret) { - free(fileInfo); - } else { - *numEntries = arraylen; - *fileStat = fileInfo; - } - return ret; -} - -int parseLS(const char *response, hdfsFileInfo **fileStats, int *numOfEntries) -{ - int ret = 0; - json_error_t error; - size_t flags = 0; - json_t *jobj, *value; - const char *key; - void *iter = NULL; - - if (!response || response[0] == '\0' || !fileStats) { - return EIO; - } - jobj = json_loads(response, flags, &error); - if (!jobj) { - fprintf(stderr, "error while parsing json: on line %d: %s\n", - error.line, error.text); - return EIO; - } - - iter = json_object_iter(jobj); - key = json_object_iter_key(iter); - value = json_object_iter_value(iter); - if (json_typeof(value) == JSON_OBJECT) { - if (!strcmp(key, "RemoteException")) { - struct jsonException *exception = parseJsonException(value); - if (exception) { - ret = printJsonException(exception, PRINT_EXC_ALL, - "Calling WEBHDFS GETFILESTATUS"); - } else { - ret = EIO; - } - } else if (!strcmp(key, "FileStatuses")) { - iter = json_object_iter(value); - value = json_object_iter_value(iter); - if (json_is_array(value)) { - ret = parseJsonArrayForFileStatuses(value, fileStats, - numOfEntries); - } else { - ret = EIO; - } - } else { - ret = EIO; - } - } else { - ret = EIO; - } - - json_decref(jobj); - return ret; -} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_json_parser.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_json_parser.h deleted file mode 100644 index c5f2f9cafe6..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_json_parser.h +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef _HDFS_JSON_PARSER_H_ -#define _HDFS_JSON_PARSER_H_ - -/** - * Parse the response for MKDIR request. The response uses TRUE/FALSE - * to indicate whether the operation succeeded. - * - * @param response The response information to parse. - * @return 0 for success - */ -int parseMKDIR(const char *response); - -/** - * Parse the response for RENAME request. The response uses TRUE/FALSE - * to indicate whether the operation succeeded. - * - * @param response The response information to parse. - * @return 0 for success - */ -int parseRENAME(const char *response); - -/** - * Parse the response for DELETE request. The response uses TRUE/FALSE - * to indicate whether the operation succeeded. - * - * @param response The response information to parse. - * @return 0 for success - */ -int parseDELETE(const char *response); - -/** - * Parse the response for SETREPLICATION request. The response uses TRUE/FALSE - * to indicate whether the operation succeeded. - * - * @param response The response information to parse. - * @return 0 for success - */ -int parseSETREPLICATION(const char *response); - -/** - * Parse the response for OPEN (read) request. A successful operation - * will return "200 OK". - * - * @param response The response information for parsing - * @return 0 for success , -1 for out of range, other values for error - */ -int parseOPEN(const char *header, const char *content); - -/** - * Parse the response for WRITE (from NameNode) request. - * A successful operation should return "307 TEMPORARY_REDIRECT" in its header. - * - * @param header The header of the http response - * @param content If failing, the exception message - * sent from NameNode is stored in content - * @return 0 for success - */ -int parseNnWRITE(const char *header, const char *content); - -/** - * Parse the response for WRITE (from DataNode) request. - * A successful operation should return "201 Created" in its header. - * - * @param header The header of the http response - * @param content If failing, the exception message - * sent from DataNode is stored in content - * @return 0 for success - */ -int parseDnWRITE(const char *header, const char *content); - -/** - * Parse the response for APPEND (sent from NameNode) request. - * A successful operation should return "307 TEMPORARY_REDIRECT" in its header. - * - * @param header The header of the http response - * @param content If failing, the exception message - * sent from NameNode is stored in content - * @return 0 for success - */ -int parseNnAPPEND(const char *header, const char *content); - -/** - * Parse the response for APPEND (from DataNode) request. - * A successful operation should return "200 OK" in its header. - * - * @param header The header of the http response - * @param content If failing, the exception message - * sent from DataNode is stored in content - * @return 0 for success - */ -int parseDnAPPEND(const char *header, const char *content); - -/** - * Parse the response (from NameNode) to get the location information - * of the DataNode that should be contacted for the following write operation. - * - * @param content Content of the http header - * @param dn To store the location of the DataNode for writing - * @return 0 for success - */ -int parseDnLoc(char *content, char **dn) __attribute__ ((warn_unused_result)); - -/** - * Parse the response for GETFILESTATUS operation. - * - * @param response Response to parse. Its detailed format is specified in - * "http://hadoop.apache.org/docs/stable/webhdfs.html#GETFILESTATUS" - * @param fileStat A hdfsFileInfo handle for holding file information - * @param printError Whether or not print out exception - * when file does not exist - * @return 0 for success, non-zero value to indicate error - */ -int parseGFS(const char *response, hdfsFileInfo *fileStat, int printError); - -/** - * Parse the response for LISTSTATUS operation. - * - * @param response Response to parse. Its detailed format is specified in - * "http://hadoop.apache.org/docs/r1.0.3/webhdfs.html#LISTSTATUS" - * @param fileStats Pointer pointing to a list of hdfsFileInfo handles - * holding file/dir information in the directory - * @param numEntries After parsing, the value of this parameter indicates - * the number of file entries. - * @return 0 for success, non-zero value to indicate error - */ -int parseLS(const char *response, hdfsFileInfo **fileStats, int *numOfEntries); - -/** - * Parse the response for CHOWN request. - * A successful operation should contains "200 OK" in its header, - * and the Content-Length should be 0. - * - * @param header The header of the http response - * @param content If failing, the exception message is stored in content - * @return 0 for success - */ -int parseCHOWN(const char *header, const char *content); - -/** - * Parse the response for CHMOD request. - * A successful operation should contains "200 OK" in its header, - * and the Content-Length should be 0. - * - * @param header The header of the http response - * @param content If failing, the exception message is stored in content - * @return 0 for success - */ -int parseCHMOD(const char *header, const char *content); - -/** - * Parse the response for SETTIMES request. - * A successful operation should contains "200 OK" in its header, - * and the Content-Length should be 0. - * - * @param header The header of the http response - * @param content If failing, the exception message is stored in content - * @return 0 for success - */ -int parseUTIMES(const char *header, const char *content); - -#endif //_HDFS_JSON_PARSER_H_ diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_web.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_web.c deleted file mode 100644 index a3d6575fe97..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/hdfs_web.c +++ /dev/null @@ -1,1538 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include - -#include "libhdfs/exception.h" -#include "hdfs/hdfs.h" -#include "hdfs_http_client.h" -#include "hdfs_http_query.h" -#include "hdfs_json_parser.h" -#include "jni_helper.h" - -#define HADOOP_HDFS_CONF "org/apache/hadoop/hdfs/HdfsConfiguration" -#define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode" -#define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress" - -struct hdfsBuilder { - int forceNewInstance; - const char *nn; - tPort port; - const char *kerbTicketCachePath; - const char *userName; -}; - -/** - * The information required for accessing webhdfs, - * including the network address of the namenode and the user name - * - * Unlike the string in hdfsBuilder, the strings in this structure are - * dynamically allocated. This structure will not be freed until we disconnect - * from HDFS. - */ -struct hdfs_internal { - char *nn; - tPort port; - char *userName; - - /** - * Working directory -- stored with a trailing slash. - */ - char *workingDir; -}; - -/** - * The 'file-handle' to a file in hdfs. - */ -struct hdfsFile_internal { - struct webhdfsFileHandle* file; - enum hdfsStreamType type; /* INPUT or OUTPUT */ - int flags; /* Flag indicate read/create/append etc. */ - tOffset offset; /* Current offset position in the file */ -}; - -/** - * Create, initialize and return a webhdfsBuffer - */ -static int initWebHdfsBuffer(struct webhdfsBuffer **webhdfsBuffer) -{ - int ret = 0; - struct webhdfsBuffer *buffer = calloc(1, sizeof(struct webhdfsBuffer)); - if (!buffer) { - fprintf(stderr, - "ERROR: fail to allocate memory for webhdfsBuffer.\n"); - return ENOMEM; - } - ret = pthread_mutex_init(&buffer->writeMutex, NULL); - if (ret) { - fprintf(stderr, "ERROR: fail in pthread_mutex_init for writeMutex " - "in initWebHdfsBuffer, <%d>: %s.\n", - ret, hdfs_strerror(ret)); - goto done; - } - ret = pthread_cond_init(&buffer->newwrite_or_close, NULL); - if (ret) { - fprintf(stderr, - "ERROR: fail in pthread_cond_init for newwrite_or_close " - "in initWebHdfsBuffer, <%d>: %s.\n", - ret, hdfs_strerror(ret)); - goto done; - } - ret = pthread_cond_init(&buffer->transfer_finish, NULL); - if (ret) { - fprintf(stderr, - "ERROR: fail in pthread_cond_init for transfer_finish " - "in initWebHdfsBuffer, <%d>: %s.\n", - ret, hdfs_strerror(ret)); - goto done; - } - -done: - if (ret) { - free(buffer); - return ret; - } - *webhdfsBuffer = buffer; - return 0; -} - -/** - * Reset the webhdfsBuffer. This is used in a block way - * when hdfsWrite is called with a new buffer to write. - * The writing thread in libcurl will be waken up to continue writing, - * and the caller of this function is blocked waiting for writing to finish. - * - * @param wb The handle of the webhdfsBuffer - * @param buffer The buffer provided by user to write - * @param length The length of bytes to write - * @return Updated webhdfsBuffer. - */ -static struct webhdfsBuffer *resetWebhdfsBuffer(struct webhdfsBuffer *wb, - const char *buffer, size_t length) -{ - if (buffer && length > 0) { - pthread_mutex_lock(&wb->writeMutex); - wb->wbuffer = buffer; - wb->offset = 0; - wb->remaining = length; - pthread_cond_signal(&wb->newwrite_or_close); - while (wb->remaining != 0) { - pthread_cond_wait(&wb->transfer_finish, &wb->writeMutex); - } - pthread_mutex_unlock(&wb->writeMutex); - } - return wb; -} - -/** - * Free the webhdfsBuffer and destroy its pthread conditions/mutex - * @param buffer The webhdfsBuffer to free - */ -static void freeWebhdfsBuffer(struct webhdfsBuffer *buffer) -{ - int ret = 0; - if (buffer) { - ret = pthread_cond_destroy(&buffer->newwrite_or_close); - if (ret) { - fprintf(stderr, - "WARN: fail in pthread_cond_destroy for newwrite_or_close " - "in freeWebhdfsBuffer, <%d>: %s.\n", - ret, hdfs_strerror(ret)); - errno = ret; - } - ret = pthread_cond_destroy(&buffer->transfer_finish); - if (ret) { - fprintf(stderr, - "WARN: fail in pthread_cond_destroy for transfer_finish " - "in freeWebhdfsBuffer, <%d>: %s.\n", - ret, hdfs_strerror(ret)); - errno = ret; - } - ret = pthread_mutex_destroy(&buffer->writeMutex); - if (ret) { - fprintf(stderr, - "WARN: fail in pthread_mutex_destroy for writeMutex " - "in freeWebhdfsBuffer, <%d>: %s.\n", - ret, hdfs_strerror(ret)); - errno = ret; - } - free(buffer); - buffer = NULL; - } -} - -/** - * To free the webhdfsFileHandle, which includes a webhdfsBuffer and strings - * @param handle The webhdfsFileHandle to free - */ -static void freeWebFileHandle(struct webhdfsFileHandle * handle) -{ - if (!handle) - return; - freeWebhdfsBuffer(handle->uploadBuffer); - free(handle->datanode); - free(handle->absPath); - free(handle); -} - -static const char *maybeNull(const char *str) -{ - return str ? str : "(NULL)"; -} - -/** To print a hdfsBuilder as string */ -static const char *hdfsBuilderToStr(const struct hdfsBuilder *bld, - char *buf, size_t bufLen) -{ - int strlength = snprintf(buf, bufLen, "nn=%s, port=%d, " - "kerbTicketCachePath=%s, userName=%s", - maybeNull(bld->nn), bld->port, - maybeNull(bld->kerbTicketCachePath), maybeNull(bld->userName)); - if (strlength < 0 || strlength >= bufLen) { - fprintf(stderr, "failed to print a hdfsBuilder as string.\n"); - return NULL; - } - return buf; -} - -/** - * Free a hdfs_internal handle - * @param fs The hdfs_internal handle to free - */ -static void freeWebHdfsInternal(struct hdfs_internal *fs) -{ - if (fs) { - free(fs->nn); - free(fs->userName); - free(fs->workingDir); - } -} - -struct hdfsBuilder *hdfsNewBuilder(void) -{ - struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder)); - if (!bld) { - errno = ENOMEM; - return NULL; - } - return bld; -} - -void hdfsFreeBuilder(struct hdfsBuilder *bld) -{ - free(bld); -} - -void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld) -{ - // We don't cache instances in libwebhdfs, so this is not applicable. -} - -void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn) -{ - if (bld) { - bld->nn = nn; - } -} - -void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port) -{ - if (bld) { - bld->port = port; - } -} - -void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName) -{ - if (bld) { - bld->userName = userName; - } -} - -void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld, - const char *kerbTicketCachePath) -{ - if (bld) { - bld->kerbTicketCachePath = kerbTicketCachePath; - } -} - -hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) -{ - struct hdfsBuilder* bld = hdfsNewBuilder(); - if (!bld) { - return NULL; - } - hdfsBuilderSetNameNode(bld, nn); - hdfsBuilderSetNameNodePort(bld, port); - hdfsBuilderSetUserName(bld, user); - return hdfsBuilderConnect(bld); -} - -hdfsFS hdfsConnect(const char* nn, tPort port) -{ - return hdfsConnectAsUser(nn, port, NULL); -} - -hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) -{ - return hdfsConnect(nn, port); -} - -hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, - const char *user) -{ - struct hdfsBuilder *bld = hdfsNewBuilder(); - if (!bld) - return NULL; - hdfsBuilderSetNameNode(bld, host); - hdfsBuilderSetNameNodePort(bld, port); - hdfsBuilderSetUserName(bld, user); - hdfsBuilderSetForceNewInstance(bld); - return hdfsBuilderConnect(bld); -} - -/** - * To retrieve the default configuration value for NameNode's hostName and port - * TODO: This function currently is using JNI, - * we need to do this without using JNI (HDFS-3917) - * - * @param bld The hdfsBuilder handle - * @param port Used to get the default value for NameNode's port - * @param nn Used to get the default value for NameNode's hostName - * @return 0 for success and non-zero value for failure - */ -static int retrieveDefaults(const struct hdfsBuilder *bld, tPort *port, - char **nn) -{ - JNIEnv *env = 0; - jobject jHDFSConf = NULL, jAddress = NULL; - jstring jHostName = NULL; - jvalue jVal; - jthrowable jthr = NULL; - int ret = 0; - char buf[512]; - - env = getJNIEnv(); - if (!env) { - return EINTERNAL; - } - - jthr = constructNewObjectOfClass(env, &jHDFSConf, HADOOP_HDFS_CONF, "()V"); - if (jthr) { - ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, - "hdfsBuilderConnect(%s)", - hdfsBuilderToStr(bld, buf, sizeof(buf))); - goto done; - } - - jthr = invokeMethod(env, &jVal, STATIC, NULL, - HADOOP_NAMENODE, "getHttpAddress", - "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/InetSocketAddress;", - jHDFSConf); - if (jthr) { - ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, - "hdfsBuilderConnect(%s)", - hdfsBuilderToStr(bld, buf, sizeof(buf))); - goto done; - } - jAddress = jVal.l; - - jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, - JAVA_INETSOCKETADDRESS, "getPort", "()I"); - if (jthr) { - ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, - "hdfsBuilderConnect(%s)", - hdfsBuilderToStr(bld, buf, sizeof(buf))); - goto done; - } - *port = jVal.i; - - jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, - JAVA_INETSOCKETADDRESS, - "getHostName", "()Ljava/lang/String;"); - if (jthr) { - ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, - "hdfsBuilderConnect(%s)", - hdfsBuilderToStr(bld, buf, sizeof(buf))); - goto done; - } - jHostName = jVal.l; - jthr = newCStr(env, jHostName, nn); - if (jthr) { - ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, - "hdfsBuilderConnect(%s)", - hdfsBuilderToStr(bld, buf, sizeof(buf))); - goto done; - } - -done: - destroyLocalReference(env, jHDFSConf); - destroyLocalReference(env, jAddress); - destroyLocalReference(env, jHostName); - return ret; -} - -hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) -{ - struct hdfs_internal *fs = NULL; - int ret = 0; - - if (!bld) { - ret = EINVAL; - goto done; - } - if (bld->nn == NULL) { - // In the JNI version of libhdfs this returns a LocalFileSystem. - ret = ENOTSUP; - goto done; - } - - fs = calloc(1, sizeof(*fs)); - if (!fs) { - ret = ENOMEM; - goto done; - } - // If the namenode is "default" and/or the port of namenode is 0, - // get the default namenode/port - if (bld->port == 0 || !strcasecmp("default", bld->nn)) { - ret = retrieveDefaults(bld, &fs->port, &fs->nn); - if (ret) - goto done; - } else { - fs->port = bld->port; - fs->nn = strdup(bld->nn); - if (!fs->nn) { - ret = ENOMEM; - goto done; - } - } - if (bld->userName) { - // userName may be NULL - fs->userName = strdup(bld->userName); - if (!fs->userName) { - ret = ENOMEM; - goto done; - } - } - // The working directory starts out as root. - fs->workingDir = strdup("/"); - if (!fs->workingDir) { - ret = ENOMEM; - goto done; - } - // For debug - fprintf(stderr, "namenode: %s:%d\n", bld->nn, bld->port); - -done: - free(bld); - if (ret) { - freeWebHdfsInternal(fs); - errno = ret; - return NULL; - } - return fs; -} - -int hdfsDisconnect(hdfsFS fs) -{ - if (fs == NULL) { - errno = EINVAL; - return -1; - } - freeWebHdfsInternal(fs); - return 0; -} - -/** - * Based on the working directory stored in hdfsFS, - * generate the absolute path for the given path - * - * @param fs The hdfsFS handle which stores the current working directory - * @param path The given path which may not be an absolute path - * @param absPath To hold generated absolute path for the given path - * @return 0 on success, non-zero value indicating error - */ -static int getAbsolutePath(hdfsFS fs, const char *path, char **absPath) -{ - char *tempPath = NULL; - size_t absPathLen; - int strlength; - - if (path[0] == '/') { - // Path is already absolute. - tempPath = strdup(path); - if (!tempPath) { - return ENOMEM; - } - *absPath = tempPath; - return 0; - } - // Prepend the workingDir to the path. - absPathLen = strlen(fs->workingDir) + strlen(path) + 1; - tempPath = malloc(absPathLen); - if (!tempPath) { - return ENOMEM; - } - strlength = snprintf(tempPath, absPathLen, "%s%s", fs->workingDir, path); - if (strlength < 0 || strlength >= absPathLen) { - free(tempPath); - return EIO; - } - *absPath = tempPath; - return 0; -} - -int hdfsCreateDirectory(hdfsFS fs, const char* path) -{ - char *url = NULL, *absPath = NULL; - struct Response *resp = NULL; - int ret = 0; - - if (fs == NULL || path == NULL) { - ret = EINVAL; - goto done; - } - ret = getAbsolutePath(fs, path, &absPath); - if (ret) { - goto done; - } - ret = createUrlForMKDIR(fs->nn, fs->port, absPath, fs->userName, &url); - if (ret) { - goto done; - } - ret = launchMKDIR(url, &resp); - if (ret) { - goto done; - } - ret = parseMKDIR(resp->body->content); -done: - freeResponse(resp); - free(url); - free(absPath); - if (ret) { - errno = ret; - return -1; - } - return 0; -} - -int hdfsChmod(hdfsFS fs, const char* path, short mode) -{ - char *absPath = NULL, *url = NULL; - struct Response *resp = NULL; - int ret = 0; - - if (fs == NULL || path == NULL) { - ret = EINVAL; - goto done; - } - ret = getAbsolutePath(fs, path, &absPath); - if (ret) { - goto done; - } - ret = createUrlForCHMOD(fs->nn, fs->port, absPath, (int) mode, - fs->userName, &url); - if (ret) { - goto done; - } - ret = launchCHMOD(url, &resp); - if (ret) { - goto done; - } - ret = parseCHMOD(resp->header->content, resp->body->content); -done: - freeResponse(resp); - free(absPath); - free(url); - if (ret) { - errno = ret; - return -1; - } - return 0; -} - -int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group) -{ - int ret = 0; - char *absPath = NULL, *url = NULL; - struct Response *resp = NULL; - - if (fs == NULL || path == NULL) { - ret = EINVAL; - goto done; - } - - ret = getAbsolutePath(fs, path, &absPath); - if (ret) { - goto done; - } - ret = createUrlForCHOWN(fs->nn, fs->port, absPath, - owner, group, fs->userName, &url); - if (ret) { - goto done; - } - ret = launchCHOWN(url, &resp); - if (ret) { - goto done; - } - ret = parseCHOWN(resp->header->content, resp->body->content); -done: - freeResponse(resp); - free(absPath); - free(url); - if (ret) { - errno = ret; - return -1; - } - return 0; -} - -int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) -{ - char *oldAbsPath = NULL, *newAbsPath = NULL, *url = NULL; - int ret = 0; - struct Response *resp = NULL; - - if (fs == NULL || oldPath == NULL || newPath == NULL) { - ret = EINVAL; - goto done; - } - ret = getAbsolutePath(fs, oldPath, &oldAbsPath); - if (ret) { - goto done; - } - ret = getAbsolutePath(fs, newPath, &newAbsPath); - if (ret) { - goto done; - } - ret = createUrlForRENAME(fs->nn, fs->port, oldAbsPath, - newAbsPath, fs->userName, &url); - if (ret) { - goto done; - } - ret = launchRENAME(url, &resp); - if (ret) { - goto done; - } - ret = parseRENAME(resp->body->content); -done: - freeResponse(resp); - free(oldAbsPath); - free(newAbsPath); - free(url); - if (ret) { - errno = ret; - return -1; - } - return 0; -} - -/** - * Get the file status for a given path. - * - * @param fs hdfsFS handle containing - * NameNode hostName/port information - * @param path Path for file - * @param printError Whether or not to print out error information - * (mainly remote FileNotFoundException) - * @return File information for the given path - */ -static hdfsFileInfo *hdfsGetPathInfoImpl(hdfsFS fs, const char* path, - int printError) -{ - char *absPath = NULL; - char *url=NULL; - struct Response *resp = NULL; - int ret = 0; - hdfsFileInfo *fileInfo = NULL; - - if (fs == NULL || path == NULL) { - ret = EINVAL; - goto done; - } - ret = getAbsolutePath(fs, path, &absPath); - if (ret) { - goto done; - } - fileInfo = (hdfsFileInfo *) calloc(1, sizeof(hdfsFileInfo)); - if (!fileInfo) { - ret = ENOMEM; - goto done; - } - fileInfo->mKind = kObjectKindFile; - - ret = createUrlForGetFileStatus(fs->nn, fs->port, absPath, - fs->userName, &url); - if (ret) { - goto done; - } - ret = launchGFS(url, &resp); - if (ret) { - goto done; - } - ret = parseGFS(resp->body->content, fileInfo, printError); - -done: - freeResponse(resp); - free(absPath); - free(url); - if (ret == 0) { - return fileInfo; - } else { - free(fileInfo); - errno = ret; - return NULL; - } -} - -hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) -{ - return hdfsGetPathInfoImpl(fs, path, 1); -} - -hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) -{ - char *url = NULL, *absPath = NULL; - struct Response *resp = NULL; - int ret = 0; - hdfsFileInfo *fileInfo = NULL; - - if (fs == NULL || path == NULL) { - ret = EINVAL; - goto done; - } - ret = getAbsolutePath(fs, path, &absPath); - if (ret) { - goto done; - } - fileInfo = calloc(1, sizeof(*fileInfo)); - if (!fileInfo) { - ret = ENOMEM; - goto done; - } - - ret = createUrlForLS(fs->nn, fs->port, absPath, fs->userName, &url); - if (ret) { - goto done; - } - ret = launchLS(url, &resp); - if (ret) { - goto done; - } - ret = parseLS(resp->body->content, &fileInfo, numEntries); - -done: - freeResponse(resp); - free(absPath); - free(url); - if (ret == 0) { - return fileInfo; - } else { - errno = ret; - return NULL; - } -} - -int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) -{ - char *url = NULL, *absPath = NULL; - struct Response *resp = NULL; - int ret = 0; - - if (fs == NULL || path == NULL) { - ret = EINVAL; - goto done; - } - ret = getAbsolutePath(fs, path, &absPath); - if (ret) { - goto done; - } - - ret = createUrlForSETREPLICATION(fs->nn, fs->port, absPath, - replication, fs->userName, &url); - if (ret) { - goto done; - } - ret = launchSETREPLICATION(url, &resp); - if (ret) { - goto done; - } - ret = parseSETREPLICATION(resp->body->content); -done: - freeResponse(resp); - free(absPath); - free(url); - if (ret) { - errno = ret; - return -1; - } - return 0; -} - -void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries) -{ - int i; - for (i = 0; i < numEntries; ++i) { - free(hdfsFileInfo[i].mName); - free(hdfsFileInfo[i].mOwner); - free(hdfsFileInfo[i].mGroup); - } - free(hdfsFileInfo); -} - -int hdfsDelete(hdfsFS fs, const char* path, int recursive) -{ - char *url = NULL, *absPath = NULL; - struct Response *resp = NULL; - int ret = 0; - - if (fs == NULL || path == NULL) { - ret = EINVAL; - goto done; - } - ret = getAbsolutePath(fs, path, &absPath); - if (ret) { - goto done; - } - - ret = createUrlForDELETE(fs->nn, fs->port, absPath, - recursive, fs->userName, &url); - if (ret) { - goto done; - } - ret = launchDELETE(url, &resp); - if (ret) { - goto done; - } - ret = parseDELETE(resp->body->content); -done: - freeResponse(resp); - free(absPath); - free(url); - if (ret) { - errno = ret; - return -1; - } - return 0; -} - -int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) -{ - char *url = NULL, *absPath = NULL; - struct Response *resp = NULL; - int ret = 0; - - if (fs == NULL || path == NULL) { - ret = EINVAL; - goto done; - } - ret = getAbsolutePath(fs, path, &absPath); - if (ret) { - goto done; - } - - ret = createUrlForUTIMES(fs->nn, fs->port, absPath, mtime, atime, - fs->userName, &url); - if (ret) { - goto done; - } - ret = launchUTIMES(url, &resp); - if (ret) { - goto done; - } - ret = parseUTIMES(resp->header->content, resp->body->content); -done: - freeResponse(resp); - free(absPath); - free(url); - if (ret) { - errno = ret; - return -1; - } - return 0; -} - -int hdfsExists(hdfsFS fs, const char *path) -{ - hdfsFileInfo *fileInfo = hdfsGetPathInfoImpl(fs, path, 0); - if (!fileInfo) { - // (errno will have been set by hdfsGetPathInfo) - return -1; - } - hdfsFreeFileInfo(fileInfo, 1); - return 0; -} - -/** - * The information hold by the thread which writes data to hdfs through http - */ -typedef struct { - char *url; /* the url of the target datanode for writing*/ - struct webhdfsBuffer *uploadBuffer; /* buffer storing data to write */ - int flags; /* flag indicating writing mode: create or append */ - struct Response *resp; /* response from the target datanode */ -} threadData; - -/** - * Free the threadData struct instance, - * including the response and url contained in it - * @param data The threadData instance to free - */ -static void freeThreadData(threadData *data) -{ - if (data) { - if (data->url) { - free(data->url); - } - if (data->resp) { - freeResponse(data->resp); - } - // The uploadBuffer would be freed by freeWebFileHandle() - free(data); - data = NULL; - } -} - -/** - * The action of the thread that writes data to - * the target datanode for hdfsWrite. - * The writing can be either create or append, which is specified by flag - */ -static void *writeThreadOperation(void *v) -{ - int ret = 0; - threadData *data = v; - if (data->flags & O_APPEND) { - ret = launchDnAPPEND(data->url, data->uploadBuffer, &(data->resp)); - } else { - ret = launchDnWRITE(data->url, data->uploadBuffer, &(data->resp)); - } - if (ret) { - fprintf(stderr, "Failed to write to datanode %s, <%d>: %s.\n", - data->url, ret, hdfs_strerror(ret)); - } - return data; -} - -/** - * Free the memory associated with a webHDFS file handle. - * - * No other resources will be freed. - * - * @param file The webhdfs file handle - */ -static void freeFileInternal(hdfsFile file) -{ - if (!file) - return; - freeWebFileHandle(file->file); - free(file); -} - -/** - * Helper function for opening a file for OUTPUT. - * - * As part of the open process for OUTPUT files, we have to connect to the - * NameNode and get the URL of the corresponding DataNode. - * We also create a background thread here for doing I/O. - * - * @param webhandle The webhandle being opened - * @return 0 on success; error code otherwise - */ -static int hdfsOpenOutputFileImpl(hdfsFS fs, hdfsFile file) -{ - struct webhdfsFileHandle *webhandle = file->file; - struct Response *resp = NULL; - int append, ret = 0; - char *nnUrl = NULL, *dnUrl = NULL; - threadData *data = NULL; - - ret = initWebHdfsBuffer(&webhandle->uploadBuffer); - if (ret) { - goto done; - } - append = file->flags & O_APPEND; - if (!append) { - // If we're not appending, send a create request to the NN - ret = createUrlForNnWRITE(fs->nn, fs->port, webhandle->absPath, - fs->userName, webhandle->replication, - webhandle->blockSize, &nnUrl); - } else { - ret = createUrlForNnAPPEND(fs->nn, fs->port, webhandle->absPath, - fs->userName, &nnUrl); - } - if (ret) { - fprintf(stderr, "Failed to create the url connecting to namenode " - "for file creation/appending, <%d>: %s.\n", - ret, hdfs_strerror(ret)); - goto done; - } - if (!append) { - ret = launchNnWRITE(nnUrl, &resp); - } else { - ret = launchNnAPPEND(nnUrl, &resp); - } - if (ret) { - fprintf(stderr, "fail to get the response from namenode for " - "file creation/appending, <%d>: %s.\n", - ret, hdfs_strerror(ret)); - goto done; - } - if (!append) { - ret = parseNnWRITE(resp->header->content, resp->body->content); - } else { - ret = parseNnAPPEND(resp->header->content, resp->body->content); - } - if (ret) { - fprintf(stderr, "fail to parse the response from namenode for " - "file creation/appending, <%d>: %s.\n", - ret, hdfs_strerror(ret)); - goto done; - } - ret = parseDnLoc(resp->header->content, &dnUrl); - if (ret) { - fprintf(stderr, "fail to get the datanode url from namenode " - "for file creation/appending, <%d>: %s.\n", - ret, hdfs_strerror(ret)); - goto done; - } - //store the datanode url in the file handle - webhandle->datanode = strdup(dnUrl); - if (!webhandle->datanode) { - ret = ENOMEM; - goto done; - } - //create a new thread for performing the http transferring - data = calloc(1, sizeof(*data)); - if (!data) { - ret = ENOMEM; - goto done; - } - data->url = strdup(dnUrl); - if (!data->url) { - ret = ENOMEM; - goto done; - } - data->flags = file->flags; - data->uploadBuffer = webhandle->uploadBuffer; - ret = pthread_create(&webhandle->connThread, NULL, - writeThreadOperation, data); - if (ret) { - fprintf(stderr, "ERROR: failed to create the writing thread " - "in hdfsOpenOutputFileImpl, <%d>: %s.\n", - ret, hdfs_strerror(ret)); - goto done; - } - webhandle->uploadBuffer->openFlag = 1; - -done: - freeResponse(resp); - free(nnUrl); - free(dnUrl); - if (ret) { - errno = ret; - if (data) { - free(data->url); - free(data); - } - } - return ret; -} - -hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, - int bufferSize, short replication, tSize blockSize) -{ - int ret = 0; - int accmode = flags & O_ACCMODE; - struct webhdfsFileHandle *webhandle = NULL; - hdfsFile file = NULL; - - if (fs == NULL || path == NULL) { - ret = EINVAL; - goto done; - } - if (accmode == O_RDWR) { - // TODO: the original libhdfs has very hackish support for this; should - // we do the same? It would actually be a lot easier in libwebhdfs - // since the protocol isn't connection-oriented. - fprintf(stderr, "ERROR: cannot open an hdfs file in O_RDWR mode\n"); - ret = ENOTSUP; - goto done; - } - if ((flags & O_CREAT) && (flags & O_EXCL)) { - fprintf(stderr, - "WARN: hdfs does not truly support O_CREATE && O_EXCL\n"); - } - file = calloc(1, sizeof(struct hdfsFile_internal)); - if (!file) { - ret = ENOMEM; - goto done; - } - file->flags = flags; - file->type = accmode == O_RDONLY ? INPUT : OUTPUT; - file->offset = 0; - webhandle = calloc(1, sizeof(struct webhdfsFileHandle)); - if (!webhandle) { - ret = ENOMEM; - goto done; - } - webhandle->bufferSize = bufferSize; - webhandle->replication = replication; - webhandle->blockSize = blockSize; - ret = getAbsolutePath(fs, path, &webhandle->absPath); - if (ret) { - goto done; - } - file->file = webhandle; - // If open for write/append, - // open and keep the connection with the target datanode for writing - if (file->type == OUTPUT) { - ret = hdfsOpenOutputFileImpl(fs, file); - if (ret) { - goto done; - } - } - -done: - if (ret) { - if (file) { - freeFileInternal(file); // Also frees webhandle - } else { - freeWebFileHandle(webhandle); - } - errno = ret; - return NULL; - } - return file; -} - -int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength) -{ - errno = ENOTSUP; - return -1; -} - -tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length) -{ - if (length == 0) { - return 0; - } - if (fs == NULL || file == NULL || file->type != OUTPUT || length < 0) { - errno = EBADF; - return -1; - } - - struct webhdfsFileHandle *wfile = file->file; - if (wfile->uploadBuffer && wfile->uploadBuffer->openFlag) { - resetWebhdfsBuffer(wfile->uploadBuffer, buffer, length); - return length; - } else { - fprintf(stderr, - "Error: have not opened the file %s for writing yet.\n", - wfile->absPath); - errno = EBADF; - return -1; - } -} - -int hdfsCloseFile(hdfsFS fs, hdfsFile file) -{ - void *respv = NULL; - threadData *tdata = NULL; - int ret = 0; - struct webhdfsFileHandle *wfile = NULL; - - if (file->type == OUTPUT) { - wfile = file->file; - pthread_mutex_lock(&(wfile->uploadBuffer->writeMutex)); - wfile->uploadBuffer->closeFlag = 1; - pthread_cond_signal(&wfile->uploadBuffer->newwrite_or_close); - pthread_mutex_unlock(&(wfile->uploadBuffer->writeMutex)); - - // Waiting for the writing thread to terminate - ret = pthread_join(wfile->connThread, &respv); - if (ret) { - fprintf(stderr, "Error when pthread_join in hdfsClose, <%d>: %s.\n", - ret, hdfs_strerror(ret)); - } - // Parse the response - tdata = respv; - if (!tdata || !(tdata->resp)) { - fprintf(stderr, - "ERROR: response from the writing thread is NULL.\n"); - ret = EIO; - } - if (file->flags & O_APPEND) { - ret = parseDnAPPEND(tdata->resp->header->content, - tdata->resp->body->content); - } else { - ret = parseDnWRITE(tdata->resp->header->content, - tdata->resp->body->content); - } - // Free the threaddata - freeThreadData(tdata); - } - freeFileInternal(file); - if (ret) { - errno = ret; - return -1; - } - return 0; -} - -int hdfsFileIsOpenForRead(hdfsFile file) -{ - return (file->type == INPUT); -} - -int hdfsFileGetReadStatistics(hdfsFile file, - struct hdfsReadStatistics **stats) -{ - errno = ENOTSUP; - return -1; -} - -int64_t hdfsReadStatisticsGetRemoteBytesRead( - const struct hdfsReadStatistics *stats) -{ - return stats->totalBytesRead - stats->totalLocalBytesRead; -} - -void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) -{ - free(stats); -} - -int hdfsFileIsOpenForWrite(hdfsFile file) -{ - return (file->type == OUTPUT); -} - -static int hdfsReadImpl(hdfsFS fs, hdfsFile file, void* buffer, tSize off, - tSize length, tSize *numRead) -{ - int ret = 0; - char *url = NULL; - struct Response *resp = NULL; - - if (fs == NULL || file == NULL || file->type != INPUT || buffer == NULL || - length < 0) { - ret = EINVAL; - goto done; - } - if (length == 0) { - // Special case: the user supplied a buffer of zero length, so there is - // nothing to do. - *numRead = 0; - goto done; - } - resp = calloc(1, sizeof(*resp)); // resp is actually a pointer type - if (!resp) { - ret = ENOMEM; - goto done; - } - ret = initResponseBuffer(&(resp->header)); - if (ret) { - goto done; - } - ret = initResponseBuffer(&(resp->body)); - if (ret) { - goto done; - } - memset(buffer, 0, length); - resp->body->content = buffer; - resp->body->remaining = length; - - ret = createUrlForOPEN(fs->nn, fs->port, file->file->absPath, - fs->userName, off, length, &url); - if (ret) { - goto done; - } - ret = launchOPEN(url, resp); - if (ret) { - goto done; - } - ret = parseOPEN(resp->header->content, resp->body->content); - if (ret == -1) { - // Special case: if parseOPEN returns -1, we asked for a byte range - // with outside what the file contains. In this case, hdfsRead and - // hdfsPread return 0, meaning end-of-file. - *numRead = 0; - } else if (ret == 0) { - *numRead = (tSize) resp->body->offset; - } -done: - if (resp) { - freeResponseBuffer(resp->header); - free(resp->body); - } - free(resp); - free(url); - return ret; -} - -tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length) -{ - int ret = 0; - tSize numRead = 0; - - ret = hdfsReadImpl(fs, file, buffer, (tSize) file->offset, - length, &numRead); - if (ret > 0) { // ret == -1 means end of file - errno = ret; - return -1; - } - file->offset += numRead; - return numRead; -} - -int hdfsAvailable(hdfsFS fs, hdfsFile file) -{ - /* We actually always block when reading from webhdfs, currently. So the - * number of bytes that can be read without blocking is currently 0. - */ - return 0; -} - -int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) -{ - struct webhdfsFileHandle *wf; - hdfsFileInfo *fileInfo = NULL; - int ret = 0; - - if (!fs || !file || (file->type == OUTPUT) || (desiredPos < 0)) { - ret = EINVAL; - goto done; - } - wf = file->file; - if (!wf) { - ret = EINVAL; - goto done; - } - fileInfo = hdfsGetPathInfo(fs, wf->absPath); - if (!fileInfo) { - ret = errno; - goto done; - } - if (desiredPos > fileInfo->mSize) { - fprintf(stderr, - "hdfsSeek for %s failed since the desired position %" PRId64 - " is beyond the size of the file %" PRId64 "\n", - wf->absPath, desiredPos, fileInfo->mSize); - ret = ENOTSUP; - goto done; - } - file->offset = desiredPos; - -done: - if (fileInfo) { - hdfsFreeFileInfo(fileInfo, 1); - } - if (ret) { - errno = ret; - return -1; - } - return 0; -} - -tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, - void* buffer, tSize length) -{ - int ret; - tSize numRead = 0; - - if (position < 0) { - errno = EINVAL; - return -1; - } - ret = hdfsReadImpl(fs, file, buffer, (tSize) position, length, &numRead); - if (ret > 0) { - errno = ret; - return -1; - } - return numRead; -} - -tOffset hdfsTell(hdfsFS fs, hdfsFile file) -{ - if (!file) { - errno = EINVAL; - return -1; - } - return file->offset; -} - -char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) -{ - int strlength; - if (fs == NULL || buffer == NULL || bufferSize <= 0) { - errno = EINVAL; - return NULL; - } - strlength = snprintf(buffer, bufferSize, "%s", fs->workingDir); - if (strlength >= bufferSize) { - errno = ENAMETOOLONG; - return NULL; - } else if (strlength < 0) { - errno = EIO; - return NULL; - } - return buffer; -} - -/** Replace "//" with "/" in path */ -static void normalizePath(char *path) -{ - int i = 0, j = 0, sawslash = 0; - - for (i = j = sawslash = 0; path[i] != '\0'; i++) { - if (path[i] != '/') { - sawslash = 0; - path[j++] = path[i]; - } else if (path[i] == '/' && !sawslash) { - sawslash = 1; - path[j++] = '/'; - } - } - path[j] = '\0'; -} - -int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) -{ - char *newWorkingDir = NULL; - size_t strlenPath = 0, newWorkingDirLen = 0; - int strlength; - - if (fs == NULL || path == NULL) { - errno = EINVAL; - return -1; - } - strlenPath = strlen(path); - if (strlenPath < 1) { - errno = EINVAL; - return -1; - } - // the max string length of the new working dir is - // (length of old working dir) + (length of given path) + strlen("/") + 1 - newWorkingDirLen = strlen(fs->workingDir) + strlenPath + 2; - newWorkingDir = malloc(newWorkingDirLen); - if (!newWorkingDir) { - errno = ENOMEM; - return -1; - } - strlength = snprintf(newWorkingDir, newWorkingDirLen, "%s%s%s", - (path[0] == '/') ? "" : fs->workingDir, - path, (path[strlenPath - 1] == '/') ? "" : "/"); - if (strlength < 0 || strlength >= newWorkingDirLen) { - free(newWorkingDir); - errno = EIO; - return -1; - } - - if (strstr(path, "//")) { - // normalize the path by replacing "//" with "/" - normalizePath(newWorkingDir); - } - - free(fs->workingDir); - fs->workingDir = newWorkingDir; - return 0; -} - -void hdfsFreeHosts(char ***blockHosts) -{ - int i, j; - for (i=0; blockHosts[i]; i++) { - for (j=0; blockHosts[i][j]; j++) { - free(blockHosts[i][j]); - } - free(blockHosts[i]); - } - free(blockHosts); -} - -tOffset hdfsGetDefaultBlockSize(hdfsFS fs) -{ - errno = ENOTSUP; - return -1; -} - -int hdfsFileUsesDirectRead(hdfsFile file) -{ - return 0; // webhdfs never performs direct reads. -} - -void hdfsFileDisableDirectRead(hdfsFile file) -{ - // webhdfs never performs direct reads -} - -int hdfsHFlush(hdfsFS fs, hdfsFile file) -{ - if (file->type != OUTPUT) { - errno = EINVAL; - return -1; - } - // TODO: block until our write buffer is flushed (HDFS-3952) - return 0; -} - -int hdfsFlush(hdfsFS fs, hdfsFile file) -{ - if (file->type != OUTPUT) { - errno = EINVAL; - return -1; - } - // TODO: block until our write buffer is flushed (HDFS-3952) - return 0; -} - -char*** hdfsGetHosts(hdfsFS fs, const char* path, - tOffset start, tOffset length) -{ - errno = ENOTSUP; - return NULL; -} - -tOffset hdfsGetCapacity(hdfsFS fs) -{ - errno = ENOTSUP; - return -1; -} - -tOffset hdfsGetUsed(hdfsFS fs) -{ - errno = ENOTSUP; - return -1; -} - -int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) -{ - errno = ENOTSUP; - return -1; -} - -int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) -{ - errno = ENOTSUP; - return -1; -} - diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/test_libwebhdfs_ops.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/test_libwebhdfs_ops.c deleted file mode 100644 index af748d83323..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/test_libwebhdfs_ops.c +++ /dev/null @@ -1,552 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "hdfs/hdfs.h" -#include "libhdfs-tests/native_mini_dfs.h" - -#include -#include -#include -#include -#include -#include -#include - -static struct NativeMiniDfsCluster *cluster; - -void permission_disp(short permissions, char *rtr) -{ - rtr[9] = '\0'; - int i; - short perm; - for(i = 2; i >= 0; i--) - { - perm = permissions >> (i * 3); - rtr[0] = perm & 4 ? 'r' : '-'; - rtr[1] = perm & 2 ? 'w' : '-'; - rtr[2] = perm & 1 ? 'x' : '-'; - rtr += 3; - } -} - -int main(int argc, char **argv) -{ - char buffer[32]; - tSize num_written_bytes; - const char* slashTmp = "/tmp"; - int nnPort; - char *rwTemplate, *rwTemplate2, *newDirTemplate, - *appendTemplate, *userTemplate, *rwPath = NULL; - const char* fileContents = "Hello, World!"; - const char* nnHost = NULL; - - if (argc != 2) { - fprintf(stderr, "usage: test_libwebhdfs_ops \n"); - exit(1); - } - - struct NativeMiniDfsConf conf = { - .doFormat = 1, .webhdfsEnabled = 1, .namenodeHttpPort = 50070, - }; - cluster = nmdCreate(&conf); - if (!cluster) { - fprintf(stderr, "Failed to create the NativeMiniDfsCluster.\n"); - exit(1); - } - if (nmdWaitClusterUp(cluster)) { - fprintf(stderr, "Error when waiting for cluster to be ready.\n"); - exit(1); - } - if (nmdGetNameNodeHttpAddress(cluster, &nnPort, &nnHost)) { - fprintf(stderr, "Error when retrieving namenode host address.\n"); - exit(1); - } - - hdfsFS fs = hdfsConnectAsUserNewInstance(nnHost, nnPort, argv[1]); - if(!fs) { - fprintf(stderr, "Oops! Failed to connect to hdfs!\n"); - exit(-1); - } - - { - // Write tests - rwTemplate = strdup("/tmp/helloWorldXXXXXX"); - if (!rwTemplate) { - fprintf(stderr, "Failed to create rwTemplate!\n"); - exit(1); - } - rwPath = mktemp(rwTemplate); - // hdfsOpenFile - hdfsFile writeFile = hdfsOpenFile(fs, rwPath, - O_WRONLY|O_CREAT, 0, 0, 0); - - if(!writeFile) { - fprintf(stderr, "Failed to open %s for writing!\n", rwPath); - exit(1); - } - fprintf(stderr, "Opened %s for writing successfully...\n", rwPath); - // hdfsWrite - num_written_bytes = hdfsWrite(fs, writeFile, (void*)fileContents, - (int) strlen(fileContents) + 1); - if (num_written_bytes != strlen(fileContents) + 1) { - fprintf(stderr, "Failed to write correct number of bytes - " - "expected %d, got %d\n", - (int)(strlen(fileContents) + 1), (int) num_written_bytes); - exit(1); - } - fprintf(stderr, "Wrote %d bytes\n", num_written_bytes); - - // hdfsTell - tOffset currentPos = -1; - if ((currentPos = hdfsTell(fs, writeFile)) == -1) { - fprintf(stderr, - "Failed to get current file position correctly. Got %" - PRId64 "!\n", currentPos); - exit(1); - } - fprintf(stderr, "Current position: %" PRId64 "\n", currentPos); - - hdfsCloseFile(fs, writeFile); - // Done test write - } - - sleep(1); - - { - //Read tests - int available = 0, exists = 0; - - // hdfsExists - exists = hdfsExists(fs, rwPath); - if (exists) { - fprintf(stderr, "Failed to validate existence of %s\n", rwPath); - exists = hdfsExists(fs, rwPath); - if (exists) { - fprintf(stderr, - "Still failed to validate existence of %s\n", rwPath); - exit(1); - } - } - - hdfsFile readFile = hdfsOpenFile(fs, rwPath, O_RDONLY, 0, 0, 0); - if (!readFile) { - fprintf(stderr, "Failed to open %s for reading!\n", rwPath); - exit(1); - } - if (!hdfsFileIsOpenForRead(readFile)) { - fprintf(stderr, "hdfsFileIsOpenForRead: we just opened a file " - "with O_RDONLY, and it did not show up as 'open for " - "read'\n"); - exit(1); - } - - available = hdfsAvailable(fs, readFile); - fprintf(stderr, "hdfsAvailable: %d\n", available); - - // hdfsSeek, hdfsTell - tOffset seekPos = 1; - if(hdfsSeek(fs, readFile, seekPos)) { - fprintf(stderr, "Failed to seek %s for reading!\n", rwPath); - exit(1); - } - - tOffset currentPos = -1; - if((currentPos = hdfsTell(fs, readFile)) != seekPos) { - fprintf(stderr, - "Failed to get current file position correctly! Got %" - PRId64 "!\n", currentPos); - - exit(1); - } - fprintf(stderr, "Current position: %" PRId64 "\n", currentPos); - - if(hdfsSeek(fs, readFile, 0)) { - fprintf(stderr, "Failed to seek %s for reading!\n", rwPath); - exit(1); - } - - // hdfsRead - memset(buffer, 0, sizeof(buffer)); - tSize num_read_bytes = hdfsRead(fs, readFile, buffer, sizeof(buffer)); - if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) { - fprintf(stderr, "Failed to read (direct). " - "Expected %s but got %s (%d bytes)\n", - fileContents, buffer, num_read_bytes); - exit(1); - } - fprintf(stderr, "Read following %d bytes:\n%s\n", - num_read_bytes, buffer); - - if (hdfsSeek(fs, readFile, 0L)) { - fprintf(stderr, "Failed to seek to file start!\n"); - exit(1); - } - - // hdfsPread - memset(buffer, 0, strlen(fileContents + 1)); - num_read_bytes = hdfsPread(fs, readFile, 0, buffer, sizeof(buffer)); - fprintf(stderr, "Read following %d bytes:\n%s\n", - num_read_bytes, buffer); - - hdfsCloseFile(fs, readFile); - // Done test read - } - - int totalResult = 0; - int result = 0; - { - //Generic file-system operations - char *srcPath = rwPath; - char buffer[256]; - const char *resp; - rwTemplate2 = strdup("/tmp/helloWorld2XXXXXX"); - if (!rwTemplate2) { - fprintf(stderr, "Failed to create rwTemplate2!\n"); - exit(1); - } - char *dstPath = mktemp(rwTemplate2); - newDirTemplate = strdup("/tmp/newdirXXXXXX"); - if (!newDirTemplate) { - fprintf(stderr, "Failed to create newDirTemplate!\n"); - exit(1); - } - char *newDirectory = mktemp(newDirTemplate); - - // hdfsRename - fprintf(stderr, "hdfsRename: %s\n", - ((result = hdfsRename(fs, rwPath, dstPath)) ? - "Failed!" : "Success!")); - totalResult += result; - fprintf(stderr, "hdfsRename back: %s\n", - ((result = hdfsRename(fs, dstPath, srcPath)) ? - "Failed!" : "Success!")); - totalResult += result; - - // hdfsCreateDirectory - fprintf(stderr, "hdfsCreateDirectory: %s\n", - ((result = hdfsCreateDirectory(fs, newDirectory)) ? - "Failed!" : "Success!")); - totalResult += result; - - // hdfsSetReplication - fprintf(stderr, "hdfsSetReplication: %s\n", - ((result = hdfsSetReplication(fs, srcPath, 1)) ? - "Failed!" : "Success!")); - totalResult += result; - - // hdfsGetWorkingDirectory, hdfsSetWorkingDirectory - fprintf(stderr, "hdfsGetWorkingDirectory: %s\n", - ((resp = hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer))) ? - buffer : "Failed!")); - totalResult += (resp ? 0 : 1); - - const char* path[] = {"/foo", "/foo/bar", "foobar", "//foo/bar//foobar", - "foo//bar", "foo/bar///", "/", "////"}; - int i; - for (i = 0; i < 8; i++) { - fprintf(stderr, "hdfsSetWorkingDirectory: %s, %s\n", - ((result = hdfsSetWorkingDirectory(fs, path[i])) ? - "Failed!" : "Success!"), - hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer))); - totalResult += result; - } - - fprintf(stderr, "hdfsSetWorkingDirectory: %s\n", - ((result = hdfsSetWorkingDirectory(fs, slashTmp)) ? - "Failed!" : "Success!")); - totalResult += result; - fprintf(stderr, "hdfsGetWorkingDirectory: %s\n", - ((resp = hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer))) ? - buffer : "Failed!")); - totalResult += (resp ? 0 : 1); - - // hdfsGetPathInfo - hdfsFileInfo *fileInfo = NULL; - if((fileInfo = hdfsGetPathInfo(fs, slashTmp)) != NULL) { - fprintf(stderr, "hdfsGetPathInfo - SUCCESS!\n"); - fprintf(stderr, "Name: %s, ", fileInfo->mName); - fprintf(stderr, "Type: %c, ", (char)(fileInfo->mKind)); - fprintf(stderr, "Replication: %d, ", fileInfo->mReplication); - fprintf(stderr, "BlockSize: %"PRId64", ", fileInfo->mBlockSize); - fprintf(stderr, "Size: %"PRId64", ", fileInfo->mSize); - fprintf(stderr, "LastMod: %s", ctime(&fileInfo->mLastMod)); - fprintf(stderr, "Owner: %s, ", fileInfo->mOwner); - fprintf(stderr, "Group: %s, ", fileInfo->mGroup); - char permissions[10]; - permission_disp(fileInfo->mPermissions, permissions); - fprintf(stderr, "Permissions: %d (%s)\n", - fileInfo->mPermissions, permissions); - hdfsFreeFileInfo(fileInfo, 1); - } else { - totalResult++; - fprintf(stderr, "hdfsGetPathInfo for %s - FAILED!\n", slashTmp); - } - - // hdfsListDirectory - hdfsFileInfo *fileList = 0; - int numEntries = 0; - if((fileList = hdfsListDirectory(fs, slashTmp, &numEntries)) != NULL) { - int i = 0; - for(i=0; i < numEntries; ++i) { - fprintf(stderr, "Name: %s, ", fileList[i].mName); - fprintf(stderr, "Type: %c, ", (char)fileList[i].mKind); - fprintf(stderr, "Replication: %d, ", fileList[i].mReplication); - fprintf(stderr, "BlockSize: %"PRId64", ", fileList[i].mBlockSize); - fprintf(stderr, "Size: %"PRId64", ", fileList[i].mSize); - fprintf(stderr, "LastMod: %s", ctime(&fileList[i].mLastMod)); - fprintf(stderr, "Owner: %s, ", fileList[i].mOwner); - fprintf(stderr, "Group: %s, ", fileList[i].mGroup); - char permissions[10]; - permission_disp(fileList[i].mPermissions, permissions); - fprintf(stderr, "Permissions: %d (%s)\n", - fileList[i].mPermissions, permissions); - } - hdfsFreeFileInfo(fileList, numEntries); - } else { - if (errno) { - totalResult++; - fprintf(stderr, "waah! hdfsListDirectory - FAILED!\n"); - } else { - fprintf(stderr, "Empty directory!\n"); - } - } - - char *newOwner = "root"; - // Setting tmp dir to 777 so later when connectAsUser nobody, - // we can write to it - short newPerm = 0666; - - // hdfsChown - fprintf(stderr, "hdfsChown: %s\n", - ((result = hdfsChown(fs, rwPath, NULL, "users")) ? - "Failed!" : "Success!")); - totalResult += result; - fprintf(stderr, "hdfsChown: %s\n", - ((result = hdfsChown(fs, rwPath, newOwner, NULL)) ? - "Failed!" : "Success!")); - totalResult += result; - // hdfsChmod - fprintf(stderr, "hdfsChmod: %s\n", - ((result = hdfsChmod(fs, rwPath, newPerm)) ? - "Failed!" : "Success!")); - totalResult += result; - - sleep(2); - tTime newMtime = time(NULL); - tTime newAtime = time(NULL); - - // utime write - fprintf(stderr, "hdfsUtime: %s\n", - ((result = hdfsUtime(fs, rwPath, newMtime, newAtime)) ? - "Failed!" : "Success!")); - totalResult += result; - - // chown/chmod/utime read - hdfsFileInfo *finfo = hdfsGetPathInfo(fs, rwPath); - - fprintf(stderr, "hdfsChown read: %s\n", - ((result = (strcmp(finfo->mOwner, newOwner) != 0)) ? - "Failed!" : "Success!")); - totalResult += result; - - fprintf(stderr, "hdfsChmod read: %s\n", - ((result = (finfo->mPermissions != newPerm)) ? - "Failed!" : "Success!")); - totalResult += result; - - // will later use /tmp/ as a different user so enable it - fprintf(stderr, "hdfsChmod: %s\n", - ((result = hdfsChmod(fs, slashTmp, 0777)) ? - "Failed!" : "Success!")); - totalResult += result; - - fprintf(stderr,"newMTime=%ld\n",newMtime); - fprintf(stderr,"curMTime=%ld\n",finfo->mLastMod); - - - fprintf(stderr, "hdfsUtime read (mtime): %s\n", - ((result = (finfo->mLastMod != newMtime / 1000)) ? - "Failed!" : "Success!")); - totalResult += result; - - // Clean up - hdfsFreeFileInfo(finfo, 1); - fprintf(stderr, "hdfsDelete: %s\n", - ((result = hdfsDelete(fs, newDirectory, 1)) ? - "Failed!" : "Success!")); - totalResult += result; - fprintf(stderr, "hdfsDelete: %s\n", - ((result = hdfsDelete(fs, srcPath, 1)) ? - "Failed!" : "Success!")); - totalResult += result; - fprintf(stderr, "hdfsExists: %s\n", - ((result = hdfsExists(fs, newDirectory)) ? - "Success!" : "Failed!")); - totalResult += (result ? 0 : 1); - // Done test generic operations - } - - { - // Test Appends - appendTemplate = strdup("/tmp/appendsXXXXXX"); - if (!appendTemplate) { - fprintf(stderr, "Failed to create appendTemplate!\n"); - exit(1); - } - char *appendPath = mktemp(appendTemplate); - const char* helloBuffer = "Hello,"; - hdfsFile writeFile = NULL; - - // Create - writeFile = hdfsOpenFile(fs, appendPath, O_WRONLY, 0, 0, 0); - if(!writeFile) { - fprintf(stderr, "Failed to open %s for writing!\n", appendPath); - exit(1); - } - fprintf(stderr, "Opened %s for writing successfully...\n", appendPath); - - num_written_bytes = hdfsWrite(fs, writeFile, helloBuffer, - (int) strlen(helloBuffer)); - fprintf(stderr, "Wrote %d bytes\n", num_written_bytes); - hdfsCloseFile(fs, writeFile); - - fprintf(stderr, "hdfsSetReplication: %s\n", - ((result = hdfsSetReplication(fs, appendPath, 1)) ? - "Failed!" : "Success!")); - totalResult += result; - - // Re-Open for Append - writeFile = hdfsOpenFile(fs, appendPath, O_WRONLY | O_APPEND, 0, 0, 0); - if(!writeFile) { - fprintf(stderr, "Failed to open %s for writing!\n", appendPath); - exit(1); - } - fprintf(stderr, "Opened %s for appending successfully...\n", - appendPath); - - helloBuffer = " World"; - num_written_bytes = hdfsWrite(fs, writeFile, helloBuffer, - (int)strlen(helloBuffer) + 1); - fprintf(stderr, "Wrote %d bytes\n", num_written_bytes); - - hdfsCloseFile(fs, writeFile); - - // Check size - hdfsFileInfo *finfo = hdfsGetPathInfo(fs, appendPath); - fprintf(stderr, "fileinfo->mSize: == total %s\n", - ((result = (finfo->mSize == strlen("Hello, World") + 1)) ? - "Success!" : "Failed!")); - totalResult += (result ? 0 : 1); - - // Read and check data - hdfsFile readFile = hdfsOpenFile(fs, appendPath, O_RDONLY, 0, 0, 0); - if (!readFile) { - fprintf(stderr, "Failed to open %s for reading!\n", appendPath); - exit(1); - } - - tSize num_read_bytes = hdfsRead(fs, readFile, buffer, sizeof(buffer)); - fprintf(stderr, "Read following %d bytes:\n%s\n", - num_read_bytes, buffer); - fprintf(stderr, "read == Hello, World %s\n", - (result = (strcmp(buffer, "Hello, World") == 0)) ? - "Success!" : "Failed!"); - hdfsCloseFile(fs, readFile); - - // Cleanup - fprintf(stderr, "hdfsDelete: %s\n", - ((result = hdfsDelete(fs, appendPath, 1)) ? - "Failed!" : "Success!")); - totalResult += result; - // Done test appends - } - - totalResult += (hdfsDisconnect(fs) != 0); - - { - // - // Now test as connecting as a specific user - // This only meant to test that we connected as that user, not to test - // the actual fs user capabilities. Thus just create a file and read - // the owner is correct. - const char *tuser = "nobody"; - userTemplate = strdup("/tmp/usertestXXXXXX"); - if (!userTemplate) { - fprintf(stderr, "Failed to create userTemplate!\n"); - exit(1); - } - char* userWritePath = mktemp(userTemplate); - hdfsFile writeFile = NULL; - - fs = hdfsConnectAsUserNewInstance("default", 50070, tuser); - if(!fs) { - fprintf(stderr, - "Oops! Failed to connect to hdfs as user %s!\n",tuser); - exit(1); - } - - writeFile = hdfsOpenFile(fs, userWritePath, O_WRONLY|O_CREAT, 0, 0, 0); - if(!writeFile) { - fprintf(stderr, "Failed to open %s for writing!\n", userWritePath); - exit(1); - } - fprintf(stderr, "Opened %s for writing successfully...\n", - userWritePath); - - num_written_bytes = hdfsWrite(fs, writeFile, fileContents, - (int)strlen(fileContents) + 1); - fprintf(stderr, "Wrote %d bytes\n", num_written_bytes); - hdfsCloseFile(fs, writeFile); - - hdfsFileInfo *finfo = hdfsGetPathInfo(fs, userWritePath); - if (finfo) { - fprintf(stderr, "hdfs new file user is correct: %s\n", - ((result = (strcmp(finfo->mOwner, tuser) != 0)) ? - "Failed!" : "Success!")); - } else { - fprintf(stderr, - "hdfsFileInfo returned by hdfsGetPathInfo is NULL\n"); - result = -1; - } - totalResult += result; - - // Cleanup - fprintf(stderr, "hdfsDelete: %s\n", - ((result = hdfsDelete(fs, userWritePath, 1)) ? - "Failed!" : "Success!")); - totalResult += result; - // Done test specific user - } - - totalResult += (hdfsDisconnect(fs) != 0); - - // Shutdown the native minidfscluster - nmdShutdown(cluster); - nmdFree(cluster); - - fprintf(stderr, "totalResult == %d\n", totalResult); - if (totalResult != 0) { - return -1; - } else { - return 0; - } -} - -/** - * vim: ts=4: sw=4: et: - */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/test_libwebhdfs_read.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/test_libwebhdfs_read.c deleted file mode 100644 index 61ff1139312..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/test_libwebhdfs_read.c +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "hdfs/hdfs.h" - -#include -#include - -int main(int argc, char **argv) { - - const char* rfile; - tSize fileTotalSize, bufferSize, curSize, totalReadSize; - hdfsFS fs; - hdfsFile readFile; - char *buffer = NULL; - - if (argc != 4) { - fprintf(stderr, "Usage: test_libwebhdfs_read" - " \n"); - exit(1); - } - - fs = hdfsConnect("localhost", 50070); - if (!fs) { - fprintf(stderr, "Oops! Failed to connect to hdfs!\n"); - exit(1); - } - - rfile = argv[1]; - fileTotalSize = strtoul(argv[2], NULL, 10); - bufferSize = strtoul(argv[3], NULL, 10); - - readFile = hdfsOpenFile(fs, rfile, O_RDONLY, bufferSize, 0, 0); - if (!readFile) { - fprintf(stderr, "Failed to open %s for writing!\n", rfile); - exit(1); - } - - // data to be written to the file - buffer = malloc(sizeof(char) * bufferSize); - if(buffer == NULL) { - fprintf(stderr, "Failed to allocate buffer.\n"); - exit(1); - } - - // read from the file - curSize = bufferSize; - totalReadSize = 0; - for (; (curSize = hdfsRead(fs, readFile, buffer, bufferSize)) == bufferSize; ) { - totalReadSize += curSize; - } - totalReadSize += curSize; - - fprintf(stderr, "size of the file: %d; reading size: %d\n", - fileTotalSize, totalReadSize); - - free(buffer); - hdfsCloseFile(fs, readFile); - hdfsDisconnect(fs); - - return 0; -} - diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/test_libwebhdfs_threaded.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/test_libwebhdfs_threaded.c deleted file mode 100644 index 72e333d9837..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/test_libwebhdfs_threaded.c +++ /dev/null @@ -1,247 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "libhdfs-tests/expect.h" -#include "hdfs/hdfs.h" -#include "libhdfs-tests/native_mini_dfs.h" - -#include -#include -#include -#include -#include -#include - -#define TLH_MAX_THREADS 100 - -static struct NativeMiniDfsCluster* cluster; - -static const char *user; - -struct tlhThreadInfo { - /** Thread index */ - int threadIdx; - /** 0 = thread was successful; error code otherwise */ - int success; - /** pthread identifier */ - pthread_t thread; -}; - -static int hdfsSingleNameNodeConnect(struct NativeMiniDfsCluster *cluster, - hdfsFS *fs) -{ - int nnPort; - const char *nnHost; - hdfsFS hdfs; - - if (nmdGetNameNodeHttpAddress(cluster, &nnPort, &nnHost)) { - fprintf(stderr, "Error when retrieving namenode host address.\n"); - return 1; - } - - hdfs = hdfsConnectAsUser(nnHost, nnPort, user); - if(!hdfs) { - fprintf(stderr, "Oops! Failed to connect to hdfs!\n"); - return 1; - } - - *fs = hdfs; - return 0; -} - -static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs) -{ - char prefix[256], tmp[256]; - hdfsFile file; - int ret, expected; - hdfsFileInfo *fileInfo; - - snprintf(prefix, sizeof(prefix), "/tlhData%04d", ti->threadIdx); - - if (hdfsExists(fs, prefix) == 0) { - EXPECT_ZERO(hdfsDelete(fs, prefix, 1)); - } - EXPECT_ZERO(hdfsCreateDirectory(fs, prefix)); - snprintf(tmp, sizeof(tmp), "%s/file", prefix); - - EXPECT_NONNULL(hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0)); - - file = hdfsOpenFile(fs, tmp, O_WRONLY, 0, 0, 0); - EXPECT_NONNULL(file); - - /* TODO: implement writeFully and use it here */ - expected = (int)strlen(prefix); - ret = hdfsWrite(fs, file, prefix, expected); - if (ret < 0) { - ret = errno; - fprintf(stderr, "hdfsWrite failed and set errno %d\n", ret); - return ret; - } - if (ret != expected) { - fprintf(stderr, "hdfsWrite was supposed to write %d bytes, but " - "it wrote %d\n", ret, expected); - return EIO; - } - EXPECT_ZERO(hdfsFlush(fs, file)); - EXPECT_ZERO(hdfsCloseFile(fs, file)); - - /* Let's re-open the file for reading */ - file = hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0); - EXPECT_NONNULL(file); - - /* TODO: implement readFully and use it here */ - ret = hdfsRead(fs, file, tmp, sizeof(tmp)); - if (ret < 0) { - ret = errno; - fprintf(stderr, "hdfsRead failed and set errno %d\n", ret); - return ret; - } - if (ret != expected) { - fprintf(stderr, "hdfsRead was supposed to read %d bytes, but " - "it read %d\n", ret, expected); - return EIO; - } - EXPECT_ZERO(memcmp(prefix, tmp, expected)); - EXPECT_ZERO(hdfsCloseFile(fs, file)); - - snprintf(tmp, sizeof(tmp), "%s/file", prefix); - EXPECT_NONZERO(hdfsChown(fs, tmp, NULL, NULL)); - EXPECT_ZERO(hdfsChown(fs, tmp, NULL, "doop")); - fileInfo = hdfsGetPathInfo(fs, tmp); - EXPECT_NONNULL(fileInfo); - EXPECT_ZERO(strcmp("doop", fileInfo->mGroup)); - hdfsFreeFileInfo(fileInfo, 1); - - EXPECT_ZERO(hdfsChown(fs, tmp, "ha", "doop2")); - fileInfo = hdfsGetPathInfo(fs, tmp); - EXPECT_NONNULL(fileInfo); - EXPECT_ZERO(strcmp("ha", fileInfo->mOwner)); - EXPECT_ZERO(strcmp("doop2", fileInfo->mGroup)); - hdfsFreeFileInfo(fileInfo, 1); - - EXPECT_ZERO(hdfsChown(fs, tmp, "ha2", NULL)); - fileInfo = hdfsGetPathInfo(fs, tmp); - EXPECT_NONNULL(fileInfo); - EXPECT_ZERO(strcmp("ha2", fileInfo->mOwner)); - EXPECT_ZERO(strcmp("doop2", fileInfo->mGroup)); - hdfsFreeFileInfo(fileInfo, 1); - - EXPECT_ZERO(hdfsDelete(fs, prefix, 1)); - return 0; -} - -static void *testHdfsOperations(void *v) -{ - struct tlhThreadInfo *ti = (struct tlhThreadInfo*)v; - hdfsFS fs = NULL; - int ret; - - fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n", - ti->threadIdx); - ret = hdfsSingleNameNodeConnect(cluster, &fs); - if (ret) { - fprintf(stderr, "testHdfsOperations(threadIdx=%d): " - "hdfsSingleNameNodeConnect failed with error %d.\n", - ti->threadIdx, ret); - ti->success = EIO; - return NULL; - } - ti->success = doTestHdfsOperations(ti, fs); - if (hdfsDisconnect(fs)) { - ret = errno; - fprintf(stderr, "hdfsDisconnect error %d\n", ret); - ti->success = ret; - } - return NULL; -} - -static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads) -{ - int i, threadsFailed = 0; - const char *sep = ""; - - for (i = 0; i < tlhNumThreads; i++) { - if (ti[i].success != 0) { - threadsFailed = 1; - } - } - if (!threadsFailed) { - fprintf(stderr, "testLibHdfs: all threads succeeded. SUCCESS.\n"); - return EXIT_SUCCESS; - } - fprintf(stderr, "testLibHdfs: some threads failed: ["); - for (i = 0; i < tlhNumThreads; i++) { - if (ti[i].success != 0) { - fprintf(stderr, "%s%d", sep, i); - sep = ", "; - } - } - fprintf(stderr, "]. FAILURE.\n"); - return EXIT_FAILURE; -} - -/** - * Test that we can write a file with libhdfs and then read it back - */ -int main(int argc, const char *args[]) -{ - int i, tlhNumThreads; - const char *tlhNumThreadsStr; - struct tlhThreadInfo ti[TLH_MAX_THREADS]; - - if (argc != 2) { - fprintf(stderr, "usage: test_libwebhdfs_threaded \n"); - exit(1); - } - user = args[1]; - - struct NativeMiniDfsConf conf = { - .doFormat = 1, .webhdfsEnabled = 1, .namenodeHttpPort = 50070, - }; - cluster = nmdCreate(&conf); - EXPECT_NONNULL(cluster); - EXPECT_ZERO(nmdWaitClusterUp(cluster)); - - tlhNumThreadsStr = getenv("TLH_NUM_THREADS"); - if (!tlhNumThreadsStr) { - tlhNumThreadsStr = "3"; - } - tlhNumThreads = atoi(tlhNumThreadsStr); - if ((tlhNumThreads <= 0) || (tlhNumThreads > TLH_MAX_THREADS)) { - fprintf(stderr, "testLibHdfs: must have a number of threads " - "between 1 and %d inclusive, not %d\n", - TLH_MAX_THREADS, tlhNumThreads); - return EXIT_FAILURE; - } - memset(&ti[0], 0, sizeof(ti)); - for (i = 0; i < tlhNumThreads; i++) { - ti[i].threadIdx = i; - } - - for (i = 0; i < tlhNumThreads; i++) { - EXPECT_ZERO(pthread_create(&ti[i].thread, NULL, - testHdfsOperations, &ti[i])); - } - for (i = 0; i < tlhNumThreads; i++) { - EXPECT_ZERO(pthread_join(ti[i].thread, NULL)); - } - - EXPECT_ZERO(nmdShutdown(cluster)); - nmdFree(cluster); - return checkFailures(ti, tlhNumThreads); -} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/test_libwebhdfs_write.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/test_libwebhdfs_write.c deleted file mode 100644 index 2a3310aa6ce..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/contrib/libwebhdfs/src/test_libwebhdfs_write.c +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "hdfs/hdfs.h" - -#include -#include -#include -#include -#include - -int main(int argc, char **argv) { - hdfsFS fs; - const char* writeFileName; - off_t fileTotalSize; - long long tmpBufferSize; - tSize bufferSize = 0, totalWriteSize = 0, toWrite = 0, written = 0; - hdfsFile writeFile = NULL; - int append, i = 0; - char* buffer = NULL; - - if (argc != 6) { - fprintf(stderr, "Usage: test_libwebhdfs_write " - " \n"); - exit(1); - } - - fs = hdfsConnectAsUser("default", 50070, argv[4]); - if (!fs) { - fprintf(stderr, "Oops! Failed to connect to hdfs!\n"); - exit(1); - } - - writeFileName = argv[1]; - fileTotalSize = strtoul(argv[2], NULL, 10); - tmpBufferSize = strtoul(argv[3], NULL, 10); - - // sanity check - if(fileTotalSize == ULONG_MAX && errno == ERANGE) { - fprintf(stderr, "invalid file size %s - must be <= %lu\n", - argv[2], ULONG_MAX); - exit(1); - } - - // currently libhdfs writes are of tSize which is int32 - if(tmpBufferSize > INT_MAX) { - fprintf(stderr, - "invalid buffer size libhdfs API write chunks must be <= %d\n", - INT_MAX); - exit(1); - } - - bufferSize = (tSize) tmpBufferSize; - append = atoi(argv[5]); - if (!append) { - writeFile = hdfsOpenFile(fs, writeFileName, O_WRONLY, bufferSize, 2, 0); - } else { - writeFile = hdfsOpenFile(fs, writeFileName, O_WRONLY | O_APPEND, - bufferSize, 2, 0); - } - if (!writeFile) { - fprintf(stderr, "Failed to open %s for writing!\n", writeFileName); - exit(1); - } - - // data to be written to the file - buffer = malloc(sizeof(char) * bufferSize + 1); - if(buffer == NULL) { - fprintf(stderr, "Could not allocate buffer of size %d\n", bufferSize); - exit(1); - } - for (i = 0; i < bufferSize; ++i) { - buffer[i] = 'a' + (i%26); - } - buffer[bufferSize] = '\0'; - - // write to the file - totalWriteSize = 0; - for (; totalWriteSize < fileTotalSize; ) { - toWrite = bufferSize < (fileTotalSize - totalWriteSize) ? - bufferSize : (fileTotalSize - totalWriteSize); - written = hdfsWrite(fs, writeFile, (void*)buffer, toWrite); - fprintf(stderr, "written size %d, to write size %d\n", - written, toWrite); - totalWriteSize += written; - } - - // cleanup - free(buffer); - hdfsCloseFile(fs, writeFile); - fprintf(stderr, "file total size: %" PRId64 ", total write size: %d\n", - fileTotalSize, totalWriteSize); - hdfsDisconnect(fs); - - return 0; -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index b4b99a273f6..4f569106506 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -4,6 +4,8 @@ Release 2.9.0 - UNRELEASED INCOMPATIBLE CHANGES + HDFS-9047. Retire libwebhdfs. (wheat9) + NEW FEATURES IMPROVEMENTS