From 4bcf516d0e4220f876fdf83f05f0b1ffbd3af243 Mon Sep 17 00:00:00 2001 From: Suresh Srinivas Date: Mon, 29 Oct 2012 05:10:29 +0000 Subject: [PATCH] HDFS-3920. libwebdhfs string processing and using strerror consistently to handle all errors. Contributed by Jing Zhao. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1403173 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../contrib/libwebhdfs/src/hdfs_http_client.c | 529 +++++++----- .../contrib/libwebhdfs/src/hdfs_http_client.h | 268 +++++- .../contrib/libwebhdfs/src/hdfs_http_query.c | 580 ++++++++----- .../contrib/libwebhdfs/src/hdfs_http_query.h | 229 ++++- .../contrib/libwebhdfs/src/hdfs_json_parser.c | 747 ++++++++++------ .../contrib/libwebhdfs/src/hdfs_json_parser.h | 164 +++- .../src/contrib/libwebhdfs/src/hdfs_web.c | 816 +++++++++++------- .../src/main/native/libhdfs/jni_helper.c | 1 + 9 files changed, 2269 insertions(+), 1068 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6d34dd7958b..30a2f69f6fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -761,6 +761,9 @@ Release 2.0.2-alpha - 2012-09-07 HDFS-3907. Allow multiple users for local block readers. (eli) HDFS-3910. DFSTestUtil#waitReplication should timeout. (eli) + + HDFS-3920. libwebdhfs string processing and using strerror consistently + to handle all errors. (Jing Zhao via suresh) OPTIMIZATIONS diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.c b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.c index 65b0629d428..e41f950828d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.c @@ -15,28 +15,43 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #include #include #include -#include + #include "hdfs_http_client.h" static pthread_mutex_t curlInitMutex = PTHREAD_MUTEX_INITIALIZER; static volatile int curlGlobalInited = 0; -ResponseBuffer initResponseBuffer() { - ResponseBuffer info = (ResponseBuffer) calloc(1, sizeof(ResponseBufferInternal)); - if (!info) { - fprintf(stderr, "Cannot allocate memory for responseInfo\n"); - return NULL; +const char *hdfs_strerror(int errnoval) +{ + const char *msg = NULL; + if (errnoval < 0 || errnoval >= sys_nerr) { + msg = "Invalid Error Code"; + } else if (sys_errlist == NULL) { + msg = "Unknown Error"; + } else { + msg = sys_errlist[errnoval]; } - info->remaining = 0; - info->offset = 0; - info->content = NULL; - return info; + return msg; } -void freeResponseBuffer(ResponseBuffer buffer) { +int initResponseBuffer(struct ResponseBuffer **buffer) +{ + struct ResponseBuffer *info = NULL; + int ret = 0; + info = calloc(1, sizeof(struct ResponseBuffer)); + if (!info) { + ret = ENOMEM; + } + *buffer = info; + return ret; +} + +void freeResponseBuffer(struct ResponseBuffer *buffer) +{ if (buffer) { if (buffer->content) { free(buffer->content); @@ -46,8 +61,9 @@ void freeResponseBuffer(ResponseBuffer buffer) { } } -void freeResponse(Response resp) { - if(resp) { +void freeResponse(struct Response *resp) +{ + if (resp) { freeResponseBuffer(resp->body); freeResponseBuffer(resp->header); free(resp); @@ -55,21 +71,30 @@ void freeResponse(Response resp) { } } -/* Callback for allocating local buffer and reading data to local buffer */ -static size_t writefunc(void *ptr, size_t size, size_t nmemb, ResponseBuffer rbuffer) { +/** + * Callback used by libcurl for allocating local buffer and + * reading data to local buffer + */ +static size_t writefunc(void *ptr, size_t size, + size_t nmemb, struct ResponseBuffer *rbuffer) +{ + void *temp = NULL; if (size * nmemb < 1) { return 0; } if (!rbuffer) { - fprintf(stderr, "In writefunc, ResponseBuffer is NULL.\n"); - return -1; + fprintf(stderr, + "ERROR: ResponseBuffer is NULL for the callback writefunc.\n"); + return 0; } if (rbuffer->remaining < size * nmemb) { - rbuffer->content = realloc(rbuffer->content, rbuffer->offset + size * nmemb + 1); - if (rbuffer->content == NULL) { - return -1; + temp = realloc(rbuffer->content, rbuffer->offset + size * nmemb + 1); + if (temp == NULL) { + fprintf(stderr, "ERROR: fail to realloc in callback writefunc.\n"); + return 0; } + rbuffer->content = temp; rbuffer->remaining = size * nmemb; } memcpy(rbuffer->content + rbuffer->offset, ptr, size * nmemb); @@ -80,67 +105,84 @@ static size_t writefunc(void *ptr, size_t size, size_t nmemb, ResponseBuffer rbu } /** - * Callback for reading data to buffer provided by user, + * Callback used by libcurl for reading data into buffer provided by user, * thus no need to reallocate buffer. */ -static size_t writefunc_withbuffer(void *ptr, size_t size, size_t nmemb, ResponseBuffer rbuffer) { +static size_t writeFuncWithUserBuffer(void *ptr, size_t size, + size_t nmemb, struct ResponseBuffer *rbuffer) +{ + size_t toCopy = 0; if (size * nmemb < 1) { return 0; } if (!rbuffer || !rbuffer->content) { - fprintf(stderr, "In writefunc_withbuffer, the buffer provided by user is NULL.\n"); + fprintf(stderr, + "ERROR: buffer to read is NULL for the " + "callback writeFuncWithUserBuffer.\n"); return 0; } - size_t toCopy = rbuffer->remaining < (size * nmemb) ? rbuffer->remaining : (size * nmemb); + toCopy = rbuffer->remaining < (size * nmemb) ? + rbuffer->remaining : (size * nmemb); memcpy(rbuffer->content + rbuffer->offset, ptr, toCopy); rbuffer->offset += toCopy; rbuffer->remaining -= toCopy; return toCopy; } -//callback for writing data to remote peer -static size_t readfunc(void *ptr, size_t size, size_t nmemb, void *stream) { +/** + * Callback used by libcurl for writing data to remote peer + */ +static size_t readfunc(void *ptr, size_t size, size_t nmemb, void *stream) +{ + struct webhdfsBuffer *wbuffer = NULL; if (size * nmemb < 1) { - fprintf(stderr, "In readfunc callback: size * nmemb == %ld\n", size * nmemb); return 0; } - webhdfsBuffer *wbuffer = (webhdfsBuffer *) stream; + wbuffer = stream; pthread_mutex_lock(&wbuffer->writeMutex); while (wbuffer->remaining == 0) { /* - * the current remainning bytes to write is 0, - * check whether need to finish the transfer + * The current remainning bytes to write is 0, + * check closeFlag to see whether need to finish the transfer. * if yes, return 0; else, wait */ - if (wbuffer->closeFlag) { - //we can close the transfer now + if (wbuffer->closeFlag) { // We can close the transfer now + //For debug fprintf(stderr, "CloseFlag is set, ready to close the transfer\n"); pthread_mutex_unlock(&wbuffer->writeMutex); return 0; } else { - // len == 0 indicates that user's buffer has been transferred + // remaining == 0 but closeFlag is not set + // indicates that user's buffer has been transferred pthread_cond_signal(&wbuffer->transfer_finish); - pthread_cond_wait(&wbuffer->newwrite_or_close, &wbuffer->writeMutex); + pthread_cond_wait(&wbuffer->newwrite_or_close, + &wbuffer->writeMutex); } } - if(wbuffer->remaining > 0 && !wbuffer->closeFlag) { - size_t copySize = wbuffer->remaining < size * nmemb ? wbuffer->remaining : size * nmemb; + if (wbuffer->remaining > 0 && !wbuffer->closeFlag) { + size_t copySize = wbuffer->remaining < size * nmemb ? + wbuffer->remaining : size * nmemb; memcpy(ptr, wbuffer->wbuffer + wbuffer->offset, copySize); wbuffer->offset += copySize; wbuffer->remaining -= copySize; pthread_mutex_unlock(&wbuffer->writeMutex); return copySize; } else { - fprintf(stderr, "Webhdfs buffer is %ld, it should be a positive value!\n", wbuffer->remaining); + fprintf(stderr, "ERROR: webhdfsBuffer's remaining is %ld, " + "it should be a positive value!\n", wbuffer->remaining); pthread_mutex_unlock(&wbuffer->writeMutex); return 0; } } -static void initCurlGlobal() { +/** + * Initialize the global libcurl environment + */ +static void initCurlGlobal() +{ if (!curlGlobalInited) { pthread_mutex_lock(&curlInitMutex); if (!curlGlobalInited) { @@ -151,202 +193,297 @@ static void initCurlGlobal() { } } -static Response launchCmd(char *url, enum HttpHeader method, enum Redirect followloc) { - CURL *curl; - CURLcode res; - Response resp; +/** + * Launch simple commands (commands without file I/O) and return response + * + * @param url Target URL + * @param method HTTP method (GET/PUT/POST) + * @param followloc Whether or not need to set CURLOPT_FOLLOWLOCATION + * @param response Response from remote service + * @return 0 for success and non-zero value to indicate error + */ +static int launchCmd(const char *url, enum HttpHeader method, + enum Redirect followloc, struct Response **response) +{ + CURL *curl = NULL; + CURLcode curlCode; + int ret = 0; + struct Response *resp = NULL; - resp = (Response) calloc(1, sizeof(*resp)); + resp = calloc(1, sizeof(struct Response)); if (!resp) { - return NULL; + return ENOMEM; } - resp->body = initResponseBuffer(); - resp->header = initResponseBuffer(); - initCurlGlobal(); - curl = curl_easy_init(); /* get a curl handle */ - if(curl) { - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body); - curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc); - curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header); - curl_easy_setopt(curl, CURLOPT_URL, url); /* specify target URL */ - switch(method) { - case GET: - break; - case PUT: - curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"PUT"); - break; - case POST: - curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"POST"); - break; - case DELETE: - curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"DELETE"); - break; - default: - fprintf(stderr, "\nHTTP method not defined\n"); - exit(EXIT_FAILURE); - } - if(followloc == YES) { - curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1); - } - - res = curl_easy_perform(curl); /* Now run the curl handler */ - if(res != CURLE_OK) { - fprintf(stderr, "preform the URL %s failed\n", url); - return NULL; - } - curl_easy_cleanup(curl); + ret = initResponseBuffer(&(resp->body)); + if (ret) { + goto done; } - return resp; -} - -static Response launchRead_internal(char *url, enum HttpHeader method, enum Redirect followloc, Response resp) { - if (!resp || !resp->body || !resp->body->content) { - fprintf(stderr, "The user provided buffer should not be NULL!\n"); - return NULL; - } - - CURL *curl; - CURLcode res; - initCurlGlobal(); - curl = curl_easy_init(); /* get a curl handle */ - if(curl) { - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc_withbuffer); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body); - curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc); - curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header); - curl_easy_setopt(curl, CURLOPT_URL, url); /* specify target URL */ - if(followloc == YES) { - curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1); - } - - res = curl_easy_perform(curl); /* Now run the curl handler */ - if(res != CURLE_OK && res != CURLE_PARTIAL_FILE) { - fprintf(stderr, "preform the URL %s failed\n", url); - return NULL; - } - curl_easy_cleanup(curl); - } - return resp; - -} - -static Response launchWrite(const char *url, enum HttpHeader method, webhdfsBuffer *uploadBuffer) { - if (!uploadBuffer) { - fprintf(stderr, "upload buffer is NULL!\n"); - errno = EINVAL; - return NULL; + ret = initResponseBuffer(&(resp->header)); + if (ret) { + goto done; } initCurlGlobal(); - CURLcode res; - Response response = (Response) calloc(1, sizeof(*response)); - if (!response) { - fprintf(stderr, "failed to allocate memory for response\n"); - return NULL; - } - response->body = initResponseBuffer(); - response->header = initResponseBuffer(); - - //connect to the datanode in order to create the lease in the namenode - CURL *curl = curl_easy_init(); + curl = curl_easy_init(); if (!curl) { - fprintf(stderr, "Failed to initialize the curl handle.\n"); - return NULL; + ret = ENOMEM; // curl_easy_init does not return error code, + // and most of its errors are caused by malloc() + fprintf(stderr, "ERROR in curl_easy_init.\n"); + goto done; + } + /* Set callback function for reading data from remote service */ + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body); + curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc); + curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header); + curl_easy_setopt(curl, CURLOPT_URL, url); + switch(method) { + case GET: + break; + case PUT: + curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT"); + break; + case POST: + curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST"); + break; + case DELETE: + curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE"); + break; + default: + ret = EINVAL; + fprintf(stderr, "ERROR: Invalid HTTP method\n"); + goto done; + } + if (followloc == YES) { + curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1); + } + /* Now run the curl handler */ + curlCode = curl_easy_perform(curl); + if (curlCode != CURLE_OK) { + ret = EIO; + fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n", + url, curlCode, curl_easy_strerror(curlCode)); + } +done: + if (curl != NULL) { + curl_easy_cleanup(curl); + } + if (ret) { + free(resp); + resp = NULL; + } + *response = resp; + return ret; +} + +/** + * Launch the read request. The request is sent to the NameNode and then + * redirected to corresponding DataNode + * + * @param url The URL for the read request + * @param resp The response containing the buffer provided by user + * @return 0 for success and non-zero value to indicate error + */ +static int launchReadInternal(const char *url, struct Response* resp) +{ + CURL *curl; + CURLcode curlCode; + int ret = 0; + + if (!resp || !resp->body || !resp->body->content) { + fprintf(stderr, + "ERROR: invalid user-provided buffer!\n"); + return EINVAL; + } + + initCurlGlobal(); + /* get a curl handle */ + curl = curl_easy_init(); + if (!curl) { + fprintf(stderr, "ERROR in curl_easy_init.\n"); + return ENOMEM; + } + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeFuncWithUserBuffer); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body); + curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc); + curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header); + curl_easy_setopt(curl, CURLOPT_URL, url); + curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1); + + curlCode = curl_easy_perform(curl); + if (curlCode != CURLE_OK && curlCode != CURLE_PARTIAL_FILE) { + ret = EIO; + fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n", + url, curlCode, curl_easy_strerror(curlCode)); + } + + curl_easy_cleanup(curl); + return ret; +} + +/** + * The function does the write operation by connecting to a DataNode. + * The function keeps the connection with the DataNode until + * the closeFlag is set. Whenever the current data has been sent out, + * the function blocks waiting for further input from user or close. + * + * @param url URL of the remote DataNode + * @param method PUT for create and POST for append + * @param uploadBuffer Buffer storing user's data to write + * @param response Response from remote service + * @return 0 for success and non-zero value to indicate error + */ +static int launchWrite(const char *url, enum HttpHeader method, + struct webhdfsBuffer *uploadBuffer, + struct Response **response) +{ + CURLcode curlCode; + struct Response* resp = NULL; + struct curl_slist *chunk = NULL; + CURL *curl = NULL; + int ret = 0; + + if (!uploadBuffer) { + fprintf(stderr, "ERROR: upload buffer is NULL!\n"); + return EINVAL; + } + + initCurlGlobal(); + resp = calloc(1, sizeof(struct Response)); + if (!resp) { + return ENOMEM; + } + ret = initResponseBuffer(&(resp->body)); + if (ret) { + goto done; + } + ret = initResponseBuffer(&(resp->header)); + if (ret) { + goto done; + } + + // Connect to the datanode in order to create the lease in the namenode + curl = curl_easy_init(); + if (!curl) { + fprintf(stderr, "ERROR: failed to initialize the curl handle.\n"); + return ENOMEM; } curl_easy_setopt(curl, CURLOPT_URL, url); - if(curl) { - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, response->body); - curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc); - curl_easy_setopt(curl, CURLOPT_WRITEHEADER, response->header); - curl_easy_setopt(curl, CURLOPT_READFUNCTION, readfunc); - curl_easy_setopt(curl, CURLOPT_READDATA, uploadBuffer); - curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(curl, CURLOPT_VERBOSE, 1); - - struct curl_slist *chunk = NULL; - chunk = curl_slist_append(chunk, "Transfer-Encoding: chunked"); - res = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk); - chunk = curl_slist_append(chunk, "Expect:"); - res = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk); - - switch(method) { - case GET: - break; - case PUT: - curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"PUT"); - break; - case POST: - curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"POST"); - break; - case DELETE: - curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"DELETE"); - break; - default: - fprintf(stderr, "\nHTTP method not defined\n"); - exit(EXIT_FAILURE); - } - res = curl_easy_perform(curl); - curl_slist_free_all(chunk); - curl_easy_cleanup(curl); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body); + curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc); + curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header); + curl_easy_setopt(curl, CURLOPT_READFUNCTION, readfunc); + curl_easy_setopt(curl, CURLOPT_READDATA, uploadBuffer); + curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L); + + chunk = curl_slist_append(chunk, "Transfer-Encoding: chunked"); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk); + chunk = curl_slist_append(chunk, "Expect:"); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk); + + switch(method) { + case PUT: + curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT"); + break; + case POST: + curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST"); + break; + default: + ret = EINVAL; + fprintf(stderr, "ERROR: Invalid HTTP method\n"); + goto done; + } + curlCode = curl_easy_perform(curl); + if (curlCode != CURLE_OK) { + ret = EIO; + fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n", + url, curlCode, curl_easy_strerror(curlCode)); } - return response; +done: + if (chunk != NULL) { + curl_slist_free_all(chunk); + } + if (curl != NULL) { + curl_easy_cleanup(curl); + } + if (ret) { + free(resp); + resp = NULL; + } + *response = resp; + return ret; } -Response launchMKDIR(char *url) { - return launchCmd(url, PUT, NO); +int launchMKDIR(const char *url, struct Response **resp) +{ + return launchCmd(url, PUT, NO, resp); } -Response launchRENAME(char *url) { - return launchCmd(url, PUT, NO); +int launchRENAME(const char *url, struct Response **resp) +{ + return launchCmd(url, PUT, NO, resp); } -Response launchGFS(char *url) { - return launchCmd(url, GET, NO); +int launchGFS(const char *url, struct Response **resp) +{ + return launchCmd(url, GET, NO, resp); } -Response launchLS(char *url) { - return launchCmd(url, GET, NO); +int launchLS(const char *url, struct Response **resp) +{ + return launchCmd(url, GET, NO, resp); } -Response launchCHMOD(char *url) { - return launchCmd(url, PUT, NO); +int launchCHMOD(const char *url, struct Response **resp) +{ + return launchCmd(url, PUT, NO, resp); } -Response launchCHOWN(char *url) { - return launchCmd(url, PUT, NO); +int launchCHOWN(const char *url, struct Response **resp) +{ + return launchCmd(url, PUT, NO, resp); } -Response launchDELETE(char *url) { - return launchCmd(url, DELETE, NO); +int launchDELETE(const char *url, struct Response **resp) +{ + return launchCmd(url, DELETE, NO, resp); } -Response launchOPEN(char *url, Response resp) { - return launchRead_internal(url, GET, YES, resp); +int launchOPEN(const char *url, struct Response* resp) +{ + return launchReadInternal(url, resp); } -Response launchUTIMES(char *url) { - return launchCmd(url, PUT, NO); +int launchUTIMES(const char *url, struct Response **resp) +{ + return launchCmd(url, PUT, NO, resp); } -Response launchNnWRITE(char *url) { - return launchCmd(url, PUT, NO); +int launchNnWRITE(const char *url, struct Response **resp) +{ + return launchCmd(url, PUT, NO, resp); } -Response launchNnAPPEND(char *url) { - return launchCmd(url, POST, NO); +int launchNnAPPEND(const char *url, struct Response **resp) +{ + return launchCmd(url, POST, NO, resp); } -Response launchDnWRITE(const char *url, webhdfsBuffer *buffer) { - return launchWrite(url, PUT, buffer); +int launchDnWRITE(const char *url, struct webhdfsBuffer *buffer, + struct Response **resp) +{ + return launchWrite(url, PUT, buffer, resp); } -Response launchDnAPPEND(const char *url, webhdfsBuffer *buffer) { - return launchWrite(url, POST, buffer); +int launchDnAPPEND(const char *url, struct webhdfsBuffer *buffer, + struct Response **resp) +{ + return launchWrite(url, POST, buffer, resp); } -Response launchSETREPLICATION(char *url) { - return launchCmd(url, PUT, NO); +int launchSETREPLICATION(const char *url, struct Response **resp) +{ + return launchCmd(url, PUT, NO, resp); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.h index 654436440ab..0e70decac57 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.h @@ -26,6 +26,7 @@ #include /* for pthread_t */ #include /* for size_t */ +/** enum indicating the type of hdfs stream */ enum hdfsStreamType { UNINITIALIZED = 0, @@ -36,28 +37,39 @@ enum hdfsStreamType /** * webhdfsBuffer - used for hold the data for read/write from/to http connection */ -typedef struct { - const char *wbuffer; // The user's buffer for uploading - size_t remaining; // Length of content - size_t offset; // offset for reading - int openFlag; // Check whether the hdfsOpenFile has been called before - int closeFlag; // Whether to close the http connection for writing - pthread_mutex_t writeMutex; // Synchronization between the curl and hdfsWrite threads - pthread_cond_t newwrite_or_close; // Transferring thread waits for this condition - // when there is no more content for transferring in the buffer - pthread_cond_t transfer_finish; // Condition used to indicate finishing transferring (one buffer) -} webhdfsBuffer; +struct webhdfsBuffer { + const char *wbuffer; /* The user's buffer for uploading */ + size_t remaining; /* Length of content */ + size_t offset; /* offset for reading */ + /* Check whether the hdfsOpenFile has been called before */ + int openFlag; + /* Whether to close the http connection for writing */ + int closeFlag; + /* Synchronization between the curl and hdfsWrite threads */ + pthread_mutex_t writeMutex; + /* + * Transferring thread waits for this condition + * when there is no more content for transferring in the buffer + */ + pthread_cond_t newwrite_or_close; + /* Condition used to indicate finishing transferring (one buffer) */ + pthread_cond_t transfer_finish; +}; +/** File handle for webhdfs */ struct webhdfsFileHandle { - char *absPath; - int bufferSize; - short replication; - tSize blockSize; - char *datanode; - webhdfsBuffer *uploadBuffer; + char *absPath; /* Absolute path of file */ + int bufferSize; /* Size of buffer */ + short replication; /* Number of replication */ + tSize blockSize; /* Block size */ + char *datanode; /* URL of the DataNode */ + /* webhdfsBuffer handle used to store the upload data */ + struct webhdfsBuffer *uploadBuffer; + /* The thread used for data transferring */ pthread_t connThread; }; +/** Type of http header */ enum HttpHeader { GET, PUT, @@ -65,44 +77,218 @@ enum HttpHeader { DELETE }; +/** Whether to redirect */ enum Redirect { YES, NO }; -typedef struct { +/** Buffer used for holding response */ +struct ResponseBuffer { char *content; size_t remaining; size_t offset; -} ResponseBufferInternal; -typedef ResponseBufferInternal *ResponseBuffer; +}; /** * The response got through webhdfs */ -typedef struct { - ResponseBuffer body; - ResponseBuffer header; -}* Response; +struct Response { + struct ResponseBuffer *body; + struct ResponseBuffer *header; +}; -ResponseBuffer initResponseBuffer(); -void freeResponseBuffer(ResponseBuffer buffer); -void freeResponse(Response resp); +/** + * Create and initialize a ResponseBuffer + * + * @param buffer Pointer pointing to new created ResponseBuffer handle + * @return 0 for success, non-zero value to indicate error + */ +int initResponseBuffer(struct ResponseBuffer **buffer) __attribute__ ((warn_unused_result)); -Response launchMKDIR(char *url); -Response launchRENAME(char *url); -Response launchCHMOD(char *url); -Response launchGFS(char *url); -Response launchLS(char *url); -Response launchDELETE(char *url); -Response launchCHOWN(char *url); -Response launchOPEN(char *url, Response resp); -Response launchUTIMES(char *url); -Response launchNnWRITE(char *url); +/** + * Free the given ResponseBuffer + * + * @param buffer The ResponseBuffer to free + */ +void freeResponseBuffer(struct ResponseBuffer *buffer); -Response launchDnWRITE(const char *url, webhdfsBuffer *buffer); -Response launchNnAPPEND(char *url); -Response launchSETREPLICATION(char *url); -Response launchDnAPPEND(const char *url, webhdfsBuffer *buffer); +/** + * Free the given Response + * + * @param resp The Response to free + */ +void freeResponse(struct Response *resp); + +/** + * Send the MKDIR request to NameNode using the given URL. + * The NameNode will execute the operation and return the result as response. + * + * @param url The URL for MKDIR operation + * @param response Response handle to store response returned from the NameNode + * @return 0 for success, non-zero value to indicate error + */ +int launchMKDIR(const char *url, + struct Response **response) __attribute__ ((warn_unused_result)); + +/** + * Send the RENAME request to NameNode using the given URL. + * The NameNode will execute the operation and return the result as response. + * + * @param url The URL for RENAME operation + * @param response Response handle to store response returned from the NameNode + * @return 0 for success, non-zero value to indicate error + */ +int launchRENAME(const char *url, + struct Response **response) __attribute__ ((warn_unused_result)); + +/** + * Send the CHMOD request to NameNode using the given URL. + * The NameNode will execute the operation and return the result as response. + * + * @param url The URL for CHMOD operation + * @param response Response handle to store response returned from the NameNode + * @return 0 for success, non-zero value to indicate error + */ +int launchCHMOD(const char *url, + struct Response **response) __attribute__ ((warn_unused_result)); + +/** + * Send the GetFileStatus request to NameNode using the given URL. + * The NameNode will execute the operation and return the result as response. + * + * @param url The URL for GetFileStatus operation + * @param response Response handle to store response returned from the NameNode, + * containing either file status or exception information + * @return 0 for success, non-zero value to indicate error + */ +int launchGFS(const char *url, + struct Response **response) __attribute__ ((warn_unused_result)); + +/** + * Send the LS (LISTSTATUS) request to NameNode using the given URL. + * The NameNode will execute the operation and return the result as response. + * + * @param url The URL for LISTSTATUS operation + * @param response Response handle to store response returned from the NameNode + * @return 0 for success, non-zero value to indicate error + */ +int launchLS(const char *url, + struct Response **response) __attribute__ ((warn_unused_result)); + +/** + * Send the DELETE request to NameNode using the given URL. + * The NameNode will execute the operation and return the result as response. + * + * @param url The URL for DELETE operation + * @param response Response handle to store response returned from the NameNode + * @return 0 for success, non-zero value to indicate error + */ +int launchDELETE(const char *url, + struct Response **response) __attribute__ ((warn_unused_result)); + +/** + * Send the CHOWN request to NameNode using the given URL. + * The NameNode will execute the operation and return the result as response. + * + * @param url The URL for CHOWN operation + * @param response Response handle to store response returned from the NameNode + * @return 0 for success, non-zero value to indicate error + */ +int launchCHOWN(const char *url, + struct Response **response) __attribute__ ((warn_unused_result)); + +/** + * Send the OPEN request to NameNode using the given URL, + * asking for reading a file (within a range). + * The NameNode first redirects the request to the datanode + * that holds the corresponding first block of the file (within a range), + * and the datanode returns the content of the file through the HTTP connection. + * + * @param url The URL for OPEN operation + * @param resp The response holding user's buffer. + The file content will be written into the buffer. + * @return 0 for success, non-zero value to indicate error + */ +int launchOPEN(const char *url, + struct Response* resp) __attribute__ ((warn_unused_result)); + +/** + * Send the SETTIMES request to NameNode using the given URL. + * The NameNode will execute the operation and return the result as response. + * + * @param url The URL for SETTIMES operation + * @param response Response handle to store response returned from the NameNode + * @return 0 for success, non-zero value to indicate error + */ +int launchUTIMES(const char *url, + struct Response **response) __attribute__ ((warn_unused_result)); + +/** + * Send the WRITE/CREATE request to NameNode using the given URL. + * The NameNode will choose the writing target datanodes + * and return the first datanode in the pipeline as response + * + * @param url The URL for WRITE/CREATE operation connecting to NameNode + * @param response Response handle to store response returned from the NameNode + * @return 0 for success, non-zero value to indicate error + */ +int launchNnWRITE(const char *url, + struct Response **response) __attribute__ ((warn_unused_result)); + +/** + * Send the WRITE request along with to-write content to + * the corresponding DataNode using the given URL. + * The DataNode will write the data and return the response. + * + * @param url The URL for WRITE operation connecting to DataNode + * @param buffer The webhdfsBuffer containing data to be written to hdfs + * @param response Response handle to store response returned from the NameNode + * @return 0 for success, non-zero value to indicate error + */ +int launchDnWRITE(const char *url, struct webhdfsBuffer *buffer, + struct Response **response) __attribute__ ((warn_unused_result)); + +/** + * Send the WRITE (APPEND) request to NameNode using the given URL. + * The NameNode determines the DataNode for appending and + * sends its URL back as response. + * + * @param url The URL for APPEND operation + * @param response Response handle to store response returned from the NameNode + * @return 0 for success, non-zero value to indicate error + */ +int launchNnAPPEND(const char *url, struct Response **response) __attribute__ ((warn_unused_result)); + +/** + * Send the SETREPLICATION request to NameNode using the given URL. + * The NameNode will execute the operation and return the result as response. + * + * @param url The URL for SETREPLICATION operation + * @param response Response handle to store response returned from the NameNode + * @return 0 for success, non-zero value to indicate error + */ +int launchSETREPLICATION(const char *url, + struct Response **response) __attribute__ ((warn_unused_result)); + +/** + * Send the APPEND request along with the content to DataNode. + * The DataNode will do the appending and return the result as response. + * + * @param url The URL for APPEND operation connecting to DataNode + * @param buffer The webhdfsBuffer containing data to be appended + * @param response Response handle to store response returned from the NameNode + * @return 0 for success, non-zero value to indicate error + */ +int launchDnAPPEND(const char *url, struct webhdfsBuffer *buffer, + struct Response **response) __attribute__ ((warn_unused_result)); + +/** + * Call sys_errlist to get the error message string for the given error code + * + * @param errnoval The error code value + * @return The error message string mapped to the given error code + */ +const char *hdfs_strerror(int errnoval); #endif //_HDFS_HTTP_CLIENT_H_ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.c b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.c index 93c095cd7db..b082c08cef5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.c @@ -22,233 +22,381 @@ #include #include -#define NUM_OF_PERMISSION_BITS 4 -#define NUM_OF_PORT_BITS 6 -#define NUM_OF_REPLICATION_BITS 6 +#define PERM_STR_LEN 4 // "644" + one byte for NUL +#define SHORT_STR_LEN 6 // 65535 + NUL +#define LONG_STR_LEN 21 // 2^64-1 = 18446744073709551615 + NUL -static char *prepareQUERY(const char *host, int nnPort, const char *srcpath, const char *OP, const char *user) { - size_t length; - char *url; - const char *const protocol = "http://"; - const char *const prefix = "/webhdfs/v1"; - char *temp; - char *port; - port= (char*) malloc(NUM_OF_PORT_BITS); - if (!port) { - return NULL; - } - sprintf(port,"%d",nnPort); - if (user != NULL) { - length = strlen(protocol) + strlen(host) + strlen(":") + strlen(port) + strlen(prefix) + strlen(srcpath) + strlen ("?op=") + strlen(OP) + strlen("&user.name=") + strlen(user); - } else { - length = strlen(protocol) + strlen(host) + strlen(":") + strlen(port) + strlen(prefix) + strlen(srcpath) + strlen ("?op=") + strlen(OP); - } +/** + * Create query based on NameNode hostname, + * NameNode port, path, operation and other parameters + * + * @param host NameNode hostName + * @param nnPort Port of NameNode + * @param path Absolute path for the corresponding file + * @param op Operations + * @param paraNum Number of remaining parameters + * @param paraNames Names of remaining parameters + * @param paraValues Values of remaining parameters + * @param url Holding the created URL + * @return 0 on success and non-zero value to indicate error + */ +static int createQueryURL(const char *host, unsigned int nnPort, + const char *path, const char *op, int paraNum, + const char **paraNames, const char **paraValues, + char **queryUrl) +{ + size_t length = 0; + int i = 0, offset = 0, ret = 0; + char *url = NULL; + const char *protocol = "http://"; + const char *prefix = "/webhdfs/v1"; - temp = (char*) malloc(length + 1); - if (!temp) { - return NULL; + if (!paraNames || !paraValues) { + return EINVAL; } - strcpy(temp,protocol); - temp = strcat(temp,host); - temp = strcat(temp,":"); - temp = strcat(temp,port); - temp = strcat(temp,prefix); - temp = strcat(temp,srcpath); - temp = strcat(temp,"?op="); - temp = strcat(temp,OP); - if (user) { - temp = strcat(temp,"&user.name="); - temp = strcat(temp,user); - } - url = temp; - return url; -} - - -static int decToOctal(int decNo) { - int octNo=0; - int expo =0; - while (decNo != 0) { - octNo = ((decNo % 8) * pow(10,expo)) + octNo; - decNo = decNo / 8; - expo++; - } - return octNo; -} - - -char *prepareMKDIR(const char *host, int nnPort, const char *dirsubpath, const char *user) { - return prepareQUERY(host, nnPort, dirsubpath, "MKDIRS", user); -} - - -char *prepareMKDIRwithMode(const char *host, int nnPort, const char *dirsubpath, int mode, const char *user) { - char *url; - char *permission; - permission = (char*) malloc(NUM_OF_PERMISSION_BITS); - if (!permission) { - return NULL; - } - mode = decToOctal(mode); - sprintf(permission,"%d",mode); - url = prepareMKDIR(host, nnPort, dirsubpath, user); - url = realloc(url,(strlen(url) + strlen("&permission=") + strlen(permission) + 1)); - if (!url) { - return NULL; - } - url = strcat(url,"&permission="); - url = strcat(url,permission); - return url; -} - - -char *prepareRENAME(const char *host, int nnPort, const char *srcpath, const char *destpath, const char *user) { - char *url; - url = prepareQUERY(host, nnPort, srcpath, "RENAME", user); - url = realloc(url,(strlen(url) + strlen("&destination=") + strlen(destpath) + 1)); - if (!url) { - return NULL; - } - url = strcat(url,"&destination="); - url = strcat(url,destpath); - return url; -} - -char *prepareGFS(const char *host, int nnPort, const char *dirsubpath, const char *user) { - return (prepareQUERY(host, nnPort, dirsubpath, "GETFILESTATUS", user)); -} - -char *prepareLS(const char *host, int nnPort, const char *dirsubpath, const char *user) { - return (prepareQUERY(host, nnPort, dirsubpath, "LISTSTATUS", user)); -} - -char *prepareCHMOD(const char *host, int nnPort, const char *dirsubpath, int mode, const char *user) { - char *url; - char *permission; - permission = (char*) malloc(NUM_OF_PERMISSION_BITS); - if (!permission) { - return NULL; - } - mode &= 0x3FFF; - mode = decToOctal(mode); - sprintf(permission,"%d",mode); - url = prepareQUERY(host, nnPort, dirsubpath, "SETPERMISSION", user); - url = realloc(url,(strlen(url) + strlen("&permission=") + strlen(permission) + 1)); - if (!url) { - return NULL; - } - url = strcat(url,"&permission="); - url = strcat(url,permission); - return url; -} - -char *prepareDELETE(const char *host, int nnPort, const char *dirsubpath, int recursive, const char *user) { - char *url = (prepareQUERY(host, nnPort, dirsubpath, "DELETE", user)); - char *recursiveFlag = (char *)malloc(6); - if (!recursive) { - strcpy(recursiveFlag, "false"); - } else { - strcpy(recursiveFlag, "true"); - } - url = (char *) realloc(url, strlen(url) + strlen("&recursive=") + strlen(recursiveFlag) + 1); - if (!url) { - return NULL; - } - - strcat(url, "&recursive="); - strcat(url, recursiveFlag); - return url; -} - -char *prepareCHOWN(const char *host, int nnPort, const char *dirsubpath, const char *owner, const char *group, const char *user) { - char *url; - url = prepareQUERY(host, nnPort, dirsubpath, "SETOWNER", user); - if (!url) { - return NULL; - } - if(owner != NULL) { - url = realloc(url,(strlen(url) + strlen("&owner=") + strlen(owner) + 1)); - url = strcat(url,"&owner="); - url = strcat(url,owner); - } - if (group != NULL) { - url = realloc(url,(strlen(url) + strlen("&group=") + strlen(group) + 1)); - url = strcat(url,"&group="); - url = strcat(url,group); - } - return url; -} - -char *prepareOPEN(const char *host, int nnPort, const char *dirsubpath, const char *user, size_t offset, size_t length) { - char *base_url = prepareQUERY(host, nnPort, dirsubpath, "OPEN", user); - char *url = (char *) malloc(strlen(base_url) + strlen("&offset=") + 15 + strlen("&length=") + 15); - if (!url) { - return NULL; - } - sprintf(url, "%s&offset=%ld&length=%ld", base_url, offset, length); - return url; -} - -char *prepareUTIMES(const char *host, int nnPort, const char *dirsubpath, long unsigned mTime, long unsigned aTime, const char *user) { - char *url; - char *modTime; - char *acsTime; - modTime = (char*) malloc(12); - acsTime = (char*) malloc(12); - url = prepareQUERY(host, nnPort, dirsubpath, "SETTIMES", user); - sprintf(modTime,"%lu",mTime); - sprintf(acsTime,"%lu",aTime); - url = realloc(url,(strlen(url) + strlen("&modificationtime=") + strlen(modTime) + strlen("&accesstime=") + strlen(acsTime) + 1)); - if (!url) { - return NULL; - } - url = strcat(url, "&modificationtime="); - url = strcat(url, modTime); - url = strcat(url,"&accesstime="); - url = strcat(url, acsTime); - return url; -} - -char *prepareNnWRITE(const char *host, int nnPort, const char *dirsubpath, const char *user, int16_t replication, size_t blockSize) { - char *url; - url = prepareQUERY(host, nnPort, dirsubpath, "CREATE", user); - url = realloc(url, (strlen(url) + strlen("&overwrite=true") + 1)); - if (!url) { - return NULL; - } - url = strcat(url, "&overwrite=true"); - if (replication > 0) { - url = realloc(url, (strlen(url) + strlen("&replication=") + 6)); - if (!url) { - return NULL; + length = strlen(protocol) + strlen(host) + strlen(":") + + SHORT_STR_LEN + strlen(prefix) + strlen(path) + + strlen ("?op=") + strlen(op); + for (i = 0; i < paraNum; i++) { + if (paraNames[i] && paraValues[i]) { + length += 2 + strlen(paraNames[i]) + strlen(paraValues[i]); + } + } + url = malloc(length); // The '\0' has already been included + // when using SHORT_STR_LEN + if (!url) { + return ENOMEM; + } + + offset = snprintf(url, length, "%s%s:%d%s%s?op=%s", + protocol, host, nnPort, prefix, path, op); + if (offset >= length || offset < 0) { + ret = EIO; + goto done; + } + for (i = 0; i < paraNum; i++) { + if (!paraNames[i] || !paraValues[i] || paraNames[i][0] == '\0' || + paraValues[i][0] == '\0') { + continue; + } + offset += snprintf(url + offset, length - offset, + "&%s=%s", paraNames[i], paraValues[i]); + if (offset >= length || offset < 0) { + ret = EIO; + goto done; + } + } +done: + if (ret) { + free(url); + return ret; + } + *queryUrl = url; + return 0; +} + +int createUrlForMKDIR(const char *host, int nnPort, + const char *path, const char *user, char **url) +{ + const char *userPara = "user.name"; + return createQueryURL(host, nnPort, path, "MKDIRS", 1, + &userPara, &user, url); +} + +int createUrlForGetFileStatus(const char *host, int nnPort, const char *path, + const char *user, char **url) +{ + const char *userPara = "user.name"; + return createQueryURL(host, nnPort, path, "GETFILESTATUS", 1, + &userPara, &user, url); +} + +int createUrlForLS(const char *host, int nnPort, const char *path, + const char *user, char **url) +{ + const char *userPara = "user.name"; + return createQueryURL(host, nnPort, path, "LISTSTATUS", + 1, &userPara, &user, url); +} + +int createUrlForNnAPPEND(const char *host, int nnPort, const char *path, + const char *user, char **url) +{ + const char *userPara = "user.name"; + return createQueryURL(host, nnPort, path, "APPEND", + 1, &userPara, &user, url); +} + +int createUrlForMKDIRwithMode(const char *host, int nnPort, const char *path, + int mode, const char *user, char **url) +{ + int strlength; + char permission[PERM_STR_LEN]; + const char *paraNames[2], *paraValues[2]; + + paraNames[0] = "permission"; + paraNames[1] = "user.name"; + memset(permission, 0, PERM_STR_LEN); + strlength = snprintf(permission, PERM_STR_LEN, "%o", mode); + if (strlength < 0 || strlength >= PERM_STR_LEN) { + return EIO; + } + paraValues[0] = permission; + paraValues[1] = user; + + return createQueryURL(host, nnPort, path, "MKDIRS", 2, + paraNames, paraValues, url); +} + +int createUrlForRENAME(const char *host, int nnPort, const char *srcpath, + const char *destpath, const char *user, char **url) +{ + const char *paraNames[2], *paraValues[2]; + paraNames[0] = "destination"; + paraNames[1] = "user.name"; + paraValues[0] = destpath; + paraValues[1] = user; + + return createQueryURL(host, nnPort, srcpath, + "RENAME", 2, paraNames, paraValues, url); +} + +int createUrlForCHMOD(const char *host, int nnPort, const char *path, + int mode, const char *user, char **url) +{ + int strlength; + char permission[PERM_STR_LEN]; + const char *paraNames[2], *paraValues[2]; + + paraNames[0] = "permission"; + paraNames[1] = "user.name"; + memset(permission, 0, PERM_STR_LEN); + strlength = snprintf(permission, PERM_STR_LEN, "%o", mode); + if (strlength < 0 || strlength >= PERM_STR_LEN) { + return EIO; + } + paraValues[0] = permission; + paraValues[1] = user; + + return createQueryURL(host, nnPort, path, "SETPERMISSION", + 2, paraNames, paraValues, url); +} + +int createUrlForDELETE(const char *host, int nnPort, const char *path, + int recursive, const char *user, char **url) +{ + const char *paraNames[2], *paraValues[2]; + paraNames[0] = "recursive"; + paraNames[1] = "user.name"; + if (recursive) { + paraValues[0] = "true"; + } else { + paraValues[0] = "false"; + } + paraValues[1] = user; + + return createQueryURL(host, nnPort, path, "DELETE", + 2, paraNames, paraValues, url); +} + +int createUrlForCHOWN(const char *host, int nnPort, const char *path, + const char *owner, const char *group, + const char *user, char **url) +{ + const char *paraNames[3], *paraValues[3]; + paraNames[0] = "owner"; + paraNames[1] = "group"; + paraNames[2] = "user.name"; + paraValues[0] = owner; + paraValues[1] = group; + paraValues[2] = user; + + return createQueryURL(host, nnPort, path, "SETOWNER", + 3, paraNames, paraValues, url); +} + +int createUrlForOPEN(const char *host, int nnPort, const char *path, + const char *user, size_t offset, size_t length, char **url) +{ + int strlength; + char offsetStr[LONG_STR_LEN], lengthStr[LONG_STR_LEN]; + const char *paraNames[3], *paraValues[3]; + + paraNames[0] = "offset"; + paraNames[1] = "length"; + paraNames[2] = "user.name"; + memset(offsetStr, 0, LONG_STR_LEN); + memset(lengthStr, 0, LONG_STR_LEN); + strlength = snprintf(offsetStr, LONG_STR_LEN, "%lu", offset); + if (strlength < 0 || strlength >= LONG_STR_LEN) { + return EIO; + } + strlength = snprintf(lengthStr, LONG_STR_LEN, "%lu", length); + if (strlength < 0 || strlength >= LONG_STR_LEN) { + return EIO; + } + paraValues[0] = offsetStr; + paraValues[1] = lengthStr; + paraValues[2] = user; + + return createQueryURL(host, nnPort, path, "OPEN", + 3, paraNames, paraValues, url); +} + +int createUrlForUTIMES(const char *host, int nnPort, const char *path, + long unsigned mTime, long unsigned aTime, + const char *user, char **url) +{ + int strlength; + char modTime[LONG_STR_LEN], acsTime[LONG_STR_LEN]; + const char *paraNames[3], *paraValues[3]; + + memset(modTime, 0, LONG_STR_LEN); + memset(acsTime, 0, LONG_STR_LEN); + strlength = snprintf(modTime, LONG_STR_LEN, "%lu", mTime); + if (strlength < 0 || strlength >= LONG_STR_LEN) { + return EIO; + } + strlength = snprintf(acsTime, LONG_STR_LEN, "%lu", aTime); + if (strlength < 0 || strlength >= LONG_STR_LEN) { + return EIO; + } + paraNames[0] = "modificationtime"; + paraNames[1] = "accesstime"; + paraNames[2] = "user.name"; + paraValues[0] = modTime; + paraValues[1] = acsTime; + paraValues[2] = user; + + return createQueryURL(host, nnPort, path, "SETTIMES", + 3, paraNames, paraValues, url); +} + +int createUrlForNnWRITE(const char *host, int nnPort, + const char *path, const char *user, + int16_t replication, size_t blockSize, char **url) +{ + int strlength; + char repStr[SHORT_STR_LEN], blockSizeStr[LONG_STR_LEN]; + const char *paraNames[4], *paraValues[4]; + + memset(repStr, 0, SHORT_STR_LEN); + memset(blockSizeStr, 0, LONG_STR_LEN); + if (replication > 0) { + strlength = snprintf(repStr, SHORT_STR_LEN, "%u", replication); + if (strlength < 0 || strlength >= SHORT_STR_LEN) { + return EIO; } - sprintf(url, "%s&replication=%d", url, replication); } if (blockSize > 0) { - url = realloc(url, (strlen(url) + strlen("&blocksize=") + 16)); - if (!url) { - return NULL; + strlength = snprintf(blockSizeStr, LONG_STR_LEN, "%lu", blockSize); + if (strlength < 0 || strlength >= LONG_STR_LEN) { + return EIO; } - sprintf(url, "%s&blocksize=%ld", url, blockSize); } - return url; + paraNames[0] = "overwrite"; + paraNames[1] = "replication"; + paraNames[2] = "blocksize"; + paraNames[3] = "user.name"; + paraValues[0] = "true"; + paraValues[1] = repStr; + paraValues[2] = blockSizeStr; + paraValues[3] = user; + + return createQueryURL(host, nnPort, path, "CREATE", + 4, paraNames, paraValues, url); } -char *prepareNnAPPEND(const char *host, int nnPort, const char *dirsubpath, const char *user) { - return (prepareQUERY(host, nnPort, dirsubpath, "APPEND", user)); -} - -char *prepareSETREPLICATION(const char *host, int nnPort, const char *path, int16_t replication, const char *user) +int createUrlForSETREPLICATION(const char *host, int nnPort, + const char *path, int16_t replication, + const char *user, char **url) { - char *url = prepareQUERY(host, nnPort, path, "SETREPLICATION", user); - char *replicationNum = (char *) malloc(NUM_OF_REPLICATION_BITS); - sprintf(replicationNum, "%u", replication); - url = realloc(url, strlen(url) + strlen("&replication=") + strlen(replicationNum)+ 1); - if (!url) { - return NULL; + char repStr[SHORT_STR_LEN]; + const char *paraNames[2], *paraValues[2]; + int strlength; + + memset(repStr, 0, SHORT_STR_LEN); + if (replication > 0) { + strlength = snprintf(repStr, SHORT_STR_LEN, "%u", replication); + if (strlength < 0 || strlength >= SHORT_STR_LEN) { + return EIO; + } + } + paraNames[0] = "replication"; + paraNames[1] = "user.name"; + paraValues[0] = repStr; + paraValues[1] = user; + + return createQueryURL(host, nnPort, path, "SETREPLICATION", + 2, paraNames, paraValues, url); +} + +int createUrlForGetBlockLocations(const char *host, int nnPort, + const char *path, size_t offset, + size_t length, const char *user, char **url) +{ + char offsetStr[LONG_STR_LEN], lengthStr[LONG_STR_LEN]; + const char *paraNames[3], *paraValues[3]; + int strlength; + + memset(offsetStr, 0, LONG_STR_LEN); + memset(lengthStr, 0, LONG_STR_LEN); + if (offset > 0) { + strlength = snprintf(offsetStr, LONG_STR_LEN, "%lu", offset); + if (strlength < 0 || strlength >= LONG_STR_LEN) { + return EIO; + } + } + if (length > 0) { + strlength = snprintf(lengthStr, LONG_STR_LEN, "%lu", length); + if (strlength < 0 || strlength >= LONG_STR_LEN) { + return EIO; + } + } + paraNames[0] = "offset"; + paraNames[1] = "length"; + paraNames[2] = "user.name"; + paraValues[0] = offsetStr; + paraValues[1] = lengthStr; + paraValues[2] = user; + + return createQueryURL(host, nnPort, path, "GET_BLOCK_LOCATIONS", + 3, paraNames, paraValues, url); +} + +int createUrlForReadFromDatanode(const char *dnHost, int dnPort, + const char *path, size_t offset, + size_t length, const char *user, + const char *namenodeRpcAddr, char **url) +{ + char offsetStr[LONG_STR_LEN], lengthStr[LONG_STR_LEN]; + const char *paraNames[4], *paraValues[4]; + int strlength; + + memset(offsetStr, 0, LONG_STR_LEN); + memset(lengthStr, 0, LONG_STR_LEN); + if (offset > 0) { + strlength = snprintf(offsetStr, LONG_STR_LEN, "%lu", offset); + if (strlength < 0 || strlength >= LONG_STR_LEN) { + return EIO; + } + } + if (length > 0) { + strlength = snprintf(lengthStr, LONG_STR_LEN, "%lu", length); + if (strlength < 0 || strlength >= LONG_STR_LEN) { + return EIO; + } } - url = strcat(url, "&replication="); - url = strcat(url, replicationNum); - return url; + paraNames[0] = "offset"; + paraNames[1] = "length"; + paraNames[2] = "user.name"; + paraNames[3] = "namenoderpcaddress"; + paraValues[0] = offsetStr; + paraValues[1] = lengthStr; + paraValues[2] = user; + paraValues[3] = namenodeRpcAddr; + + return createQueryURL(dnHost, dnPort, path, "OPEN", + 4, paraNames, paraValues, url); } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.h index 2377ea15e63..432797bfb8e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.h @@ -20,22 +20,221 @@ #ifndef _HDFS_HTTP_QUERY_H_ #define _HDFS_HTTP_QUERY_H_ -#include -#include +#include /* for size_t */ +#include /* for int16_t */ -char *prepareMKDIR(const char *host, int nnPort, const char *dirsubpath, const char *user); -char *prepareMKDIRwithMode(const char *host, int nnPort, const char *dirsubpath, int mode, const char *user); -char *prepareRENAME(const char *host, int nnPort, const char *srcpath, const char *destpath, const char *user); -char *prepareCHMOD(const char *host, int nnPort, const char *dirsubpath, int mode, const char *user); -char *prepareGFS(const char *host, int nnPort, const char *dirsubpath, const char *user); -char *prepareLS(const char *host, int nnPort, const char *dirsubpath, const char *user); -char *prepareDELETE(const char *host, int nnPort, const char *dirsubpath, int recursive, const char *user); -char *prepareCHOWN(const char *host, int nnPort, const char *dirsubpath, const char *owner, const char *group, const char *user); -char *prepareOPEN(const char *host, int nnPort, const char *dirsubpath, const char *user, size_t offset, size_t length); -char *prepareUTIMES(const char *host, int nnPort, const char *dirsubpath, long unsigned mTime, long unsigned aTime, const char *user); -char *prepareNnWRITE(const char *host, int nnPort, const char *dirsubpath, const char *user, int16_t replication, size_t blockSize); -char *prepareNnAPPEND(const char *host, int nnPort, const char *dirsubpath, const char *user); -char *prepareSETREPLICATION(const char *host, int nnPort, const char *path, int16_t replication, const char *user); +/** + * Create the URL for a MKDIR request + * + * @param host The hostname of the NameNode + * @param nnPort Port of the NameNode + * @param path Path of the dir to create + * @param user User name + * @param url Holding the generated URL for MKDIR request + * @return 0 on success and non-zero value on errors + */ +int createUrlForMKDIR(const char *host, int nnPort, + const char *path, const char *user, + char **url) __attribute__ ((warn_unused_result)); + +/** + * Create the URL for a MKDIR (with mode) request + * + * @param host The hostname of the NameNode + * @param nnPort Port of the NameNode + * @param path Path of the dir to create + * @param mode Mode of MKDIR + * @param user User name + * @param url Holding the generated URL for MKDIR request + * @return 0 on success and non-zero value on errors + */ +int createUrlForMKDIRwithMode(const char *host, int nnPort, const char *path, + int mode, const char *user, + char **url) __attribute__ ((warn_unused_result)); + +/** + * Create the URL for a RENAME request + * + * @param host The hostname of the NameNode + * @param nnPort Port of the NameNode + * @param srcpath Source path + * @param dstpath Destination path + * @param user User name + * @param url Holding the generated URL for RENAME request + * @return 0 on success and non-zero value on errors + */ +int createUrlForRENAME(const char *host, int nnPort, const char *srcpath, + const char *dstpath, const char *user, + char **url) __attribute__ ((warn_unused_result)); + +/** + * Create the URL for a CHMOD request + * + * @param host The hostname of the NameNode + * @param nnPort Port of the NameNode + * @param path Target path + * @param mode New mode for the file + * @param user User name + * @param url Holding the generated URL for CHMOD request + * @return 0 on success and non-zero value on errors + */ +int createUrlForCHMOD(const char *host, int nnPort, const char *path, + int mode, const char *user, + char **url) __attribute__ ((warn_unused_result)); + +/** + * Create the URL for a GETFILESTATUS request + * + * @param host The hostname of the NameNode + * @param nnPort Port of the NameNode + * @param path Path of the target file + * @param user User name + * @param url Holding the generated URL for GETFILESTATUS request + * @return 0 on success and non-zero value on errors + */ +int createUrlForGetFileStatus(const char *host, int nnPort, + const char *path, const char *user, + char **url) __attribute__ ((warn_unused_result)); + +/** + * Create the URL for a LISTSTATUS request + * + * @param host The hostname of the NameNode + * @param nnPort Port of the NameNode + * @param path Path of the directory for listing + * @param user User name + * @param url Holding the generated URL for LISTSTATUS request + * @return 0 on success and non-zero value on errors + */ +int createUrlForLS(const char *host, int nnPort, + const char *path, const char *user, + char **url) __attribute__ ((warn_unused_result)); + +/** + * Create the URL for a DELETE request + * + * @param host The hostname of the NameNode + * @param nnPort Port of the NameNode + * @param path Path of the file to be deletected + * @param recursive Whether or not to delete in a recursive way + * @param user User name + * @param url Holding the generated URL for DELETE request + * @return 0 on success and non-zero value on errors + */ +int createUrlForDELETE(const char *host, int nnPort, const char *path, + int recursive, const char *user, + char **url) __attribute__ ((warn_unused_result)); + +/** + * Create the URL for a CHOWN request + * + * @param host The hostname of the NameNode + * @param nnPort Port of the NameNode + * @param path Path of the target + * @param owner New owner + * @param group New group + * @param user User name + * @param url Holding the generated URL for CHOWN request + * @return 0 on success and non-zero value on errors + */ +int createUrlForCHOWN(const char *host, int nnPort, const char *path, + const char *owner, const char *group, const char *user, + char **url) __attribute__ ((warn_unused_result)); + +/** + * Create the URL for a OPEN/READ request + * + * @param host The hostname of the NameNode + * @param nnPort Port of the NameNode + * @param path Path of the file to read + * @param user User name + * @param offset Offset for reading (the start position for this read) + * @param length Length of the file to read + * @param url Holding the generated URL for OPEN/READ request + * @return 0 on success and non-zero value on errors + */ +int createUrlForOPEN(const char *host, int nnPort, const char *path, + const char *user, size_t offset, size_t length, + char **url) __attribute__ ((warn_unused_result)); + +/** + * Create the URL for a UTIMES (update time) request + * + * @param host The hostname of the NameNode + * @param nnPort Port of the NameNode + * @param path Path of the file for updating time + * @param mTime Modified time to set + * @param aTime Access time to set + * @param user User name + * @param url Holding the generated URL for UTIMES request + * @return 0 on success and non-zero value on errors + */ +int createUrlForUTIMES(const char *host, int nnPort, const char *path, + long unsigned mTime, long unsigned aTime, + const char *user, + char **url) __attribute__ ((warn_unused_result)); + +/** + * Create the URL for a WRITE/CREATE request (sent to NameNode) + * + * @param host The hostname of the NameNode + * @param nnPort Port of the NameNode + * @param path Path of the dir to create + * @param user User name + * @param replication Number of replication of the file + * @param blockSize Size of the block for the file + * @param url Holding the generated URL for WRITE request + * @return 0 on success and non-zero value on errors + */ +int createUrlForNnWRITE(const char *host, int nnPort, const char *path, + const char *user, int16_t replication, size_t blockSize, + char **url) __attribute__ ((warn_unused_result)); + +/** + * Create the URL for an APPEND request (sent to NameNode) + * + * @param host The hostname of the NameNode + * @param nnPort Port of the NameNode + * @param path Path of the file for appending + * @param user User name + * @param url Holding the generated URL for APPEND request + * @return 0 on success and non-zero value on errors + */ +int createUrlForNnAPPEND(const char *host, int nnPort, + const char *path, const char *user, + char **url) __attribute__ ((warn_unused_result)); + +/** + * Create the URL for a SETREPLICATION request + * + * @param host The hostname of the NameNode + * @param nnPort Port of the NameNode + * @param path Path of the target file + * @param replication New replication number + * @param user User name + * @param url Holding the generated URL for SETREPLICATION request + * @return 0 on success and non-zero value on errors + */ +int createUrlForSETREPLICATION(const char *host, int nnPort, const char *path, + int16_t replication, const char *user, + char **url) __attribute__ ((warn_unused_result)); + +/** + * Create the URL for a GET_BLOCK_LOCATIONS request + * + * @param host The hostname of the NameNode + * @param nnPort Port of the NameNode + * @param path Path of the target file + * @param offset The offset in the file + * @param length Length of the file content + * @param user User name + * @param url Holding the generated URL for GET_BLOCK_LOCATIONS request + * @return 0 on success and non-zero value on errors + */ +int createUrlForGetBlockLocations(const char *host, int nnPort, + const char *path, size_t offset, + size_t length, const char *user, + char **url) __attribute__ ((warn_unused_result)); #endif //_HDFS_HTTP_QUERY_H_ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c index 5973fa5638c..a883f06f7c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c @@ -25,6 +25,11 @@ #include #include +static const char * const temporaryRedirectCode = "307 TEMPORARY_REDIRECT"; +static const char * const twoHundredOKCode = "200 OK"; +static const char * const twoHundredOneCreatedCode = "201 Created"; +static const char * const httpHeaderString = "HTTP/1.1"; + /** * Exception information after calling JSON operations */ @@ -34,9 +39,6 @@ struct jsonException { const char *message; }; -static hdfsFileInfo *parseJsonGFS(json_t *jobj, hdfsFileInfo *fileStat, - int *numEntries, const char *operation); - static void dotsToSlashes(char *str) { for (; *str != '\0'; str++) { @@ -45,8 +47,9 @@ static void dotsToSlashes(char *str) } } -int printJsonExceptionV(struct jsonException *exc, int noPrintFlags, - const char *fmt, va_list ap) +/** Print out the JSON exception information */ +static int printJsonExceptionV(struct jsonException *exc, int noPrintFlags, + const char *fmt, va_list ap) { char *javaClassName = NULL; int excErrno = EINTERNAL, shouldPrint = 0; @@ -74,11 +77,23 @@ int printJsonExceptionV(struct jsonException *exc, int noPrintFlags, return excErrno; } -int printJsonException(struct jsonException *exc, int noPrintFlags, - const char *fmt, ...) +/** + * Print out JSON exception information. + * + * @param exc The exception information to print and free + * @param noPrintFlags Flags which determine which exceptions we should NOT + * print. + * @param fmt Printf-style format list + * @param ... Printf-style varargs + * + * @return The POSIX error number associated with the exception + * object. + */ +static int printJsonException(struct jsonException *exc, int noPrintFlags, + const char *fmt, ...) { va_list ap; - int ret; + int ret = 0; va_start(ap, fmt); ret = printJsonExceptionV(exc, noPrintFlags, fmt, ap); @@ -86,81 +101,20 @@ int printJsonException(struct jsonException *exc, int noPrintFlags, return ret; } -static hdfsFileInfo *json_parse_array(json_t *jobj, char *key, hdfsFileInfo *fileStat, int *numEntries, const char *operation) { - int arraylen = json_array_size(jobj); //Getting the length of the array - *numEntries = arraylen; - if (!key) { - return NULL; - } - if(arraylen > 0) { - fileStat = (hdfsFileInfo *)realloc(fileStat,sizeof(hdfsFileInfo)*arraylen); - } - json_t *jvalue; - int i; - for (i=0; i< arraylen; i++) { - jvalue = json_array_get(jobj, i); //Getting the array element at position i - if (json_is_array(jvalue)) { // array within an array - program should never come here for now - json_parse_array(jvalue, NULL, &fileStat[i], numEntries, operation); - } - else if (json_is_object(jvalue)) { // program will definitely come over here - parseJsonGFS(jvalue, &fileStat[i], numEntries, operation); - } - else { - return NULL; // program will never come over here for now - } - } - *numEntries = arraylen; - return fileStat; -} - -int parseBoolean(char *response) { - json_t *root; - json_error_t error; - size_t flags = 0; - int result = 0; - const char *key; - json_t *value; - root = json_loads(response, flags, &error); - void *iter = json_object_iter(root); - while(iter) { - key = json_object_iter_key(iter); - value = json_object_iter_value(iter); - switch (json_typeof(value)) { - case JSON_TRUE: - result = 1; - break; - default: - result = 0; - break; - } - iter = json_object_iter_next(root, iter); - } - return result; -} - -int parseMKDIR(char *response) { - return (parseBoolean(response)); -} - -int parseRENAME(char *response) { - return (parseBoolean(response)); -} - -int parseDELETE(char *response) { - return (parseBoolean(response)); -} - -struct jsonException *parseJsonException(json_t *jobj) { - const char *key; - json_t *value; +/** Parse the exception information from JSON */ +static struct jsonException *parseJsonException(json_t *jobj) +{ + const char *key = NULL; + json_t *value = NULL; struct jsonException *exception = NULL; + void *iter = NULL; exception = calloc(1, sizeof(*exception)); if (!exception) { return NULL; } - void *iter = json_object_iter(jobj); + iter = json_object_iter(jobj); while (iter) { key = json_object_iter_key(iter); value = json_object_iter_value(iter); @@ -175,23 +129,31 @@ struct jsonException *parseJsonException(json_t *jobj) { iter = json_object_iter_next(jobj, iter); } - return exception; } -struct jsonException *parseException(const char *content) { +/** + * Parse the exception information which is presented in JSON + * + * @param content Exception information in JSON + * @return jsonException for printing out + */ +static struct jsonException *parseException(const char *content) +{ + json_error_t error; + size_t flags = 0; + const char *key = NULL; + json_t *value; + json_t *jobj; + struct jsonException *exception = NULL; + if (!content) { return NULL; } - - json_error_t error; - size_t flags = 0; - const char *key; - json_t *value; - json_t *jobj = json_loads(content, flags, &error); - + jobj = json_loads(content, flags, &error); if (!jobj) { - fprintf(stderr, "JSon parsing failed\n"); + fprintf(stderr, "JSon parsing error: on line %d: %s\n", + error.line, error.text); return NULL; } void *iter = json_object_iter(jobj); @@ -199,254 +161,503 @@ struct jsonException *parseException(const char *content) { key = json_object_iter_key(iter); value = json_object_iter_value(iter); - if (!strcmp(key, "RemoteException") && json_typeof(value) == JSON_OBJECT) { - return parseJsonException(value); + if (!strcmp(key, "RemoteException") && + json_typeof(value) == JSON_OBJECT) { + exception = parseJsonException(value); + break; } iter = json_object_iter_next(jobj, iter); } - return NULL; + + json_decref(jobj); + return exception; } -static hdfsFileInfo *parseJsonGFS(json_t *jobj, hdfsFileInfo *fileStat, - int *numEntries, const char *operation) +/** + * Parse the response information which uses TRUE/FALSE + * to indicate whether the operation succeeded + * + * @param response Response information + * @return 0 to indicate success + */ +static int parseBoolean(const char *response) { - const char *tempstr; - const char *key; - json_t *value; - void *iter = json_object_iter(jobj); - while(iter) { - key = json_object_iter_key(iter); - value = json_object_iter_value(iter); - - switch (json_typeof(value)) { - case JSON_INTEGER: - if(!strcmp(key,"accessTime")) { - fileStat->mLastAccess = (time_t)(json_integer_value(value)/1000); - } else if (!strcmp(key,"blockSize")) { - fileStat->mBlockSize = (tOffset)json_integer_value(value); - } else if (!strcmp(key,"length")) { - fileStat->mSize = (tOffset)json_integer_value(value); - } else if(!strcmp(key,"modificationTime")) { - fileStat->mLastMod = (time_t)(json_integer_value(value)/1000); - } else if (!strcmp(key,"replication")) { - fileStat->mReplication = (short)json_integer_value(value); - } - break; - - case JSON_STRING: - if(!strcmp(key,"group")) { - fileStat->mGroup=(char *)json_string_value(value); - } else if (!strcmp(key,"owner")) { - fileStat->mOwner=(char *)json_string_value(value); - } else if (!strcmp(key,"pathSuffix")) { - fileStat->mName=(char *)json_string_value(value); - } else if (!strcmp(key,"permission")) { - tempstr=(char *)json_string_value(value); - fileStat->mPermissions = (short)strtol(tempstr,(char **)NULL,8); - } else if (!strcmp(key,"type")) { - char *cvalue = (char *)json_string_value(value); - if (!strcmp(cvalue, "DIRECTORY")) { - fileStat->mKind = kObjectKindDirectory; - } else { - fileStat->mKind = kObjectKindFile; - } - } - break; - - case JSON_OBJECT: - if(!strcmp(key,"FileStatus")) { - parseJsonGFS(value, fileStat, numEntries, operation); - } else if (!strcmp(key,"FileStatuses")) { - fileStat = parseJsonGFS(value, &fileStat[0], numEntries, operation); - } else if (!strcmp(key,"RemoteException")) { - //Besides returning NULL, we also need to print the exception information - struct jsonException *exception = parseJsonException(value); - if (exception) { - errno = printJsonException(exception, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation); - } - - if(fileStat != NULL) { - free(fileStat); - fileStat = NULL; - } - } - break; - - case JSON_ARRAY: - if (!strcmp(key,"FileStatus")) { - fileStat = json_parse_array(value,(char *) key,fileStat,numEntries, operation); - } - break; - - default: - if(fileStat != NULL) { - free(fileStat); - fileStat = NULL; - } - } - iter = json_object_iter_next(jobj, iter); + json_t *root, *value; + json_error_t error; + size_t flags = 0; + int result = 0; + + root = json_loads(response, flags, &error); + if (!root) { + fprintf(stderr, "JSon parsing error: on line %d: %s\n", + error.line, error.text); + return EIO; } - return fileStat; + void *iter = json_object_iter(root); + value = json_object_iter_value(iter); + if (json_typeof(value) == JSON_TRUE) { + result = 0; + } else { + result = EIO; // FALSE means error in remote NN/DN + } + json_decref(root); + return result; } +int parseMKDIR(const char *response) +{ + return parseBoolean(response); +} -int checkHeader(char *header, const char *content, const char *operation) { +int parseRENAME(const char *response) +{ + return parseBoolean(response); +} + +int parseDELETE(const char *response) +{ + return parseBoolean(response); +} + +int parseSETREPLICATION(const char *response) +{ + return parseBoolean(response); +} + +/** + * Check the header of response to see if it's 200 OK + * + * @param header Header information for checking + * @param content Stores exception information if there are errors + * @param operation Indicate the operation for exception printing + * @return 0 for success + */ +static int checkHeader(const char *header, const char *content, + const char *operation) +{ char *result = NULL; - char delims[] = ":"; - char *responseCode= "200 OK"; - if(header == '\0' || strncmp(header, "HTTP/", strlen("HTTP/"))) { - return 0; + const char delims[] = ":"; + char *savepter; + int ret = 0; + + if (!header || strncmp(header, "HTTP/", strlen("HTTP/"))) { + return EINVAL; } - if(!(strstr(header, responseCode)) || !(header = strstr(header, "Content-Length"))) { + if (!(strstr(header, twoHundredOKCode)) || + !(result = strstr(header, "Content-Length"))) { struct jsonException *exc = parseException(content); if (exc) { - errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation); + ret = printJsonException(exc, PRINT_EXC_ALL, + "Calling WEBHDFS (%s)", operation); + } else { + ret = EIO; } - return 0; + return ret; } - result = strtok(header, delims); - result = strtok(NULL,delims); + result = strtok_r(result, delims, &savepter); + result = strtok_r(NULL, delims, &savepter); while (isspace(*result)) { result++; } - if(strcmp(result,"0")) { //Content-Length should be equal to 0 - return 1; - } else { - return 0; + // Content-Length should be equal to 0, + // and the string should be "0\r\nServer" + if (strncmp(result, "0\r\n", 3)) { + ret = EIO; } + return ret; } -int parseOPEN(const char *header, const char *content) { - const char *responseCode1 = "307 TEMPORARY_REDIRECT"; - const char *responseCode2 = "200 OK"; - if(header == '\0' || strncmp(header,"HTTP/",strlen("HTTP/"))) { - return -1; - } - if(!(strstr(header,responseCode1) && strstr(header, responseCode2))) { - struct jsonException *exc = parseException(content); - if (exc) { - //if the exception is an IOException and it is because the offset is out of the range - //do not print out the exception - if (!strcasecmp(exc->exception, "IOException") && strstr(exc->message, "out of the range")) { - return 0; - } - errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (OPEN)"); - } - return -1; - } - - return 1; -} - -int parseCHMOD(char *header, const char *content) { +int parseCHMOD(const char *header, const char *content) +{ return checkHeader(header, content, "CHMOD"); } - -int parseCHOWN(char *header, const char *content) { +int parseCHOWN(const char *header, const char *content) +{ return checkHeader(header, content, "CHOWN"); } -int parseUTIMES(char *header, const char *content) { - return checkHeader(header, content, "UTIMES"); +int parseUTIMES(const char *header, const char *content) +{ + return checkHeader(header, content, "SETTIMES"); } - -int checkIfRedirect(const char *const headerstr, const char *content, const char *operation) { - char *responseCode = "307 TEMPORARY_REDIRECT"; - char * locTag = "Location"; - char * tempHeader; - if(headerstr == '\0' || strncmp(headerstr,"HTTP/", 5)) { - return 0; +/** + * Check if the header contains correct information + * ("307 TEMPORARY_REDIRECT" and "Location") + * + * @param header Header for parsing + * @param content Contains exception information + * if the remote operation failed + * @param operation Specify the remote operation when printing out exception + * @return 0 for success + */ +static int checkRedirect(const char *header, + const char *content, const char *operation) +{ + const char *locTag = "Location"; + int ret = 0, offset = 0; + + // The header must start with "HTTP/1.1" + if (!header || strncmp(header, httpHeaderString, + strlen(httpHeaderString))) { + return EINVAL; } - if(!(tempHeader = strstr(headerstr,responseCode))) { - //process possible exception information + + offset += strlen(httpHeaderString); + while (isspace(header[offset])) { + offset++; + } + // Looking for "307 TEMPORARY_REDIRECT" in header + if (strncmp(header + offset, temporaryRedirectCode, + strlen(temporaryRedirectCode))) { + // Process possible exception information struct jsonException *exc = parseException(content); if (exc) { - errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation); + ret = printJsonException(exc, PRINT_EXC_ALL, + "Calling WEBHDFS (%s)", operation); + } else { + ret = EIO; } - return 0; + return ret; } - if(!(strstr(tempHeader,locTag))) { - return 0; + // Here we just simply check if header contains "Location" tag, + // detailed processing is in parseDnLoc + if (!(strstr(header, locTag))) { + ret = EIO; } - return 1; + return ret; } - -int parseNnWRITE(const char *header, const char *content) { - return checkIfRedirect(header, content, "Write(NameNode)"); +int parseNnWRITE(const char *header, const char *content) +{ + return checkRedirect(header, content, "Write(NameNode)"); } - -int parseNnAPPEND(const char *header, const char *content) { - return checkIfRedirect(header, content, "Append(NameNode)"); +int parseNnAPPEND(const char *header, const char *content) +{ + return checkRedirect(header, content, "Append(NameNode)"); } -char *parseDnLoc(char *content) { - char delims[] = "\r\n"; - char *url = NULL; - char *DnLocation = NULL; - char *savepter; - DnLocation = strtok_r(content, delims, &savepter); - while (DnLocation && strncmp(DnLocation, "Location:", strlen("Location:"))) { - DnLocation = strtok_r(NULL, delims, &savepter); +/** 0 for success , -1 for out of range, other values for error */ +int parseOPEN(const char *header, const char *content) +{ + int ret = 0, offset = 0; + + if (!header || strncmp(header, httpHeaderString, + strlen(httpHeaderString))) { + return EINVAL; } - if (!DnLocation) { - return NULL; + + offset += strlen(httpHeaderString); + while (isspace(header[offset])) { + offset++; } - DnLocation = strstr(DnLocation, "http"); - if (!DnLocation) { - return NULL; + if (strncmp(header + offset, temporaryRedirectCode, + strlen(temporaryRedirectCode)) || + !strstr(header, twoHundredOKCode)) { + struct jsonException *exc = parseException(content); + if (exc) { + // If the exception is an IOException and it is because + // the offset is out of the range, do not print out the exception + if (!strcasecmp(exc->exception, "IOException") && + strstr(exc->message, "out of the range")) { + ret = -1; + } else { + ret = printJsonException(exc, PRINT_EXC_ALL, + "Calling WEBHDFS (OPEN)"); + } + } else { + ret = EIO; + } } - url = malloc(strlen(DnLocation) + 1); + return ret; +} + +int parseDnLoc(char *content, char **dn) +{ + char *url = NULL, *dnLocation = NULL, *savepter, *tempContent; + const char *prefix = "Location: http://"; + const char *prefixToRemove = "Location: "; + const char *delims = "\r\n"; + + tempContent = strdup(content); + if (!tempContent) { + return ENOMEM; + } + + dnLocation = strtok_r(tempContent, delims, &savepter); + while (dnLocation && strncmp(dnLocation, "Location:", + strlen("Location:"))) { + dnLocation = strtok_r(NULL, delims, &savepter); + } + if (!dnLocation) { + return EIO; + } + + while (isspace(*dnLocation)) { + dnLocation++; + } + if (strncmp(dnLocation, prefix, strlen(prefix))) { + return EIO; + } + url = strdup(dnLocation + strlen(prefixToRemove)); if (!url) { - return NULL; + return ENOMEM; } - strcpy(url, DnLocation); - return url; + *dn = url; + return 0; } -int parseDnWRITE(const char *header, const char *content) { - char *responseCode = "201 Created"; - fprintf(stderr, "\nheaderstr is: %s\n", header); - if(header == '\0' || strncmp(header,"HTTP/",strlen("HTTP/"))) { - return 0; +int parseDnWRITE(const char *header, const char *content) +{ + int ret = 0; + if (header == NULL || header[0] == '\0' || + strncmp(header, "HTTP/", strlen("HTTP/"))) { + return EINVAL; } - if(!(strstr(header,responseCode))) { + if (!(strstr(header, twoHundredOneCreatedCode))) { struct jsonException *exc = parseException(content); if (exc) { - errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (WRITE(DataNode))"); + ret = printJsonException(exc, PRINT_EXC_ALL, + "Calling WEBHDFS (WRITE(DataNode))"); + } else { + ret = EIO; } - return 0; } - return 1; + return ret; } -int parseDnAPPEND(const char *header, const char *content) { - char *responseCode = "200 OK"; - if(header == '\0' || strncmp(header, "HTTP/", strlen("HTTP/"))) { - return 0; +int parseDnAPPEND(const char *header, const char *content) +{ + int ret = 0; + + if (header == NULL || header[0] == '\0' || + strncmp(header, "HTTP/", strlen("HTTP/"))) { + return EINVAL; } - if(!(strstr(header, responseCode))) { + if (!(strstr(header, twoHundredOKCode))) { struct jsonException *exc = parseException(content); if (exc) { - errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (APPEND(DataNode))"); + ret = printJsonException(exc, PRINT_EXC_ALL, + "Calling WEBHDFS (APPEND(DataNode))"); + } else { + ret = EIO; } - return 0; } - return 1; + return ret; } -hdfsFileInfo *parseGFS(char *str, hdfsFileInfo *fileStat, int *numEntries) { +/** + * Retrieve file status from the JSON object + * + * @param jobj JSON object for parsing, which contains + * file status information + * @param fileStat hdfsFileInfo handle to hold file status information + * @return 0 on success + */ +static int parseJsonForFileStatus(json_t *jobj, hdfsFileInfo *fileStat) +{ + const char *key, *tempstr; + json_t *value; + void *iter = NULL; + + iter = json_object_iter(jobj); + while (iter) { + key = json_object_iter_key(iter); + value = json_object_iter_value(iter); + + if (!strcmp(key, "accessTime")) { + // json field contains time in milliseconds, + // hdfsFileInfo is counted in seconds + fileStat->mLastAccess = json_integer_value(value) / 1000; + } else if (!strcmp(key, "blockSize")) { + fileStat->mBlockSize = json_integer_value(value); + } else if (!strcmp(key, "length")) { + fileStat->mSize = json_integer_value(value); + } else if (!strcmp(key, "modificationTime")) { + fileStat->mLastMod = json_integer_value(value) / 1000; + } else if (!strcmp(key, "replication")) { + fileStat->mReplication = json_integer_value(value); + } else if (!strcmp(key, "group")) { + fileStat->mGroup = strdup(json_string_value(value)); + if (!fileStat->mGroup) { + return ENOMEM; + } + } else if (!strcmp(key, "owner")) { + fileStat->mOwner = strdup(json_string_value(value)); + if (!fileStat->mOwner) { + return ENOMEM; + } + } else if (!strcmp(key, "pathSuffix")) { + fileStat->mName = strdup(json_string_value(value)); + if (!fileStat->mName) { + return ENOMEM; + } + } else if (!strcmp(key, "permission")) { + tempstr = json_string_value(value); + fileStat->mPermissions = (short) strtol(tempstr, NULL, 8); + } else if (!strcmp(key, "type")) { + tempstr = json_string_value(value); + if (!strcmp(tempstr, "DIRECTORY")) { + fileStat->mKind = kObjectKindDirectory; + } else { + fileStat->mKind = kObjectKindFile; + } + } + // Go to the next key-value pair in the json object + iter = json_object_iter_next(jobj, iter); + } + return 0; +} + +int parseGFS(const char *response, hdfsFileInfo *fileStat, int printError) +{ + int ret = 0, printFlag; json_error_t error; size_t flags = 0; - json_t *jobj = json_loads(str, flags, &error); - fileStat = parseJsonGFS(jobj, fileStat, numEntries, "GETPATHSTATUS/LISTSTATUS"); - return fileStat; + json_t *jobj, *value; + const char *key; + void *iter = NULL; + + if (!response || !fileStat) { + return EIO; + } + jobj = json_loads(response, flags, &error); + if (!jobj) { + fprintf(stderr, "error while parsing json: on line %d: %s\n", + error.line, error.text); + return EIO; + } + iter = json_object_iter(jobj); + key = json_object_iter_key(iter); + value = json_object_iter_value(iter); + if (json_typeof(value) == JSON_OBJECT) { + if (!strcmp(key, "RemoteException")) { + struct jsonException *exception = parseJsonException(value); + if (exception) { + if (printError) { + printFlag = PRINT_EXC_ALL; + } else { + printFlag = NOPRINT_EXC_FILE_NOT_FOUND | + NOPRINT_EXC_ACCESS_CONTROL | + NOPRINT_EXC_PARENT_NOT_DIRECTORY; + } + ret = printJsonException(exception, printFlag, + "Calling WEBHDFS GETFILESTATUS"); + } else { + ret = EIO; + } + } else if (!strcmp(key, "FileStatus")) { + ret = parseJsonForFileStatus(value, fileStat); + } else { + ret = EIO; + } + + } else { + ret = EIO; + } + + json_decref(jobj); + return ret; } -int parseSETREPLICATION(char *response) { - return (parseBoolean(response)); +/** + * Parse the JSON array. Called to parse the result of + * the LISTSTATUS operation. Thus each element of the JSON array is + * a JSON object with the information of a file entry contained + * in the folder. + * + * @param jobj The JSON array to be parsed + * @param fileStat The hdfsFileInfo handle used to + * store a group of file information + * @param numEntries Capture the number of files in the folder + * @return 0 for success + */ +static int parseJsonArrayForFileStatuses(json_t *jobj, hdfsFileInfo **fileStat, + int *numEntries) +{ + json_t *jvalue = NULL; + int i = 0, ret = 0, arraylen = 0; + hdfsFileInfo *fileInfo = NULL; + + arraylen = (int) json_array_size(jobj); + if (arraylen > 0) { + fileInfo = calloc(arraylen, sizeof(hdfsFileInfo)); + if (!fileInfo) { + return ENOMEM; + } + } + for (i = 0; i < arraylen; i++) { + //Getting the array element at position i + jvalue = json_array_get(jobj, i); + if (json_is_object(jvalue)) { + ret = parseJsonForFileStatus(jvalue, &fileInfo[i]); + if (ret) { + goto done; + } + } else { + ret = EIO; + goto done; + } + } +done: + if (ret) { + free(fileInfo); + } else { + *numEntries = arraylen; + *fileStat = fileInfo; + } + return ret; } +int parseLS(const char *response, hdfsFileInfo **fileStats, int *numOfEntries) +{ + int ret = 0; + json_error_t error; + size_t flags = 0; + json_t *jobj, *value; + const char *key; + void *iter = NULL; + + if (!response || response[0] == '\0' || !fileStats) { + return EIO; + } + jobj = json_loads(response, flags, &error); + if (!jobj) { + fprintf(stderr, "error while parsing json: on line %d: %s\n", + error.line, error.text); + return EIO; + } + + iter = json_object_iter(jobj); + key = json_object_iter_key(iter); + value = json_object_iter_value(iter); + if (json_typeof(value) == JSON_OBJECT) { + if (!strcmp(key, "RemoteException")) { + struct jsonException *exception = parseJsonException(value); + if (exception) { + ret = printJsonException(exception, PRINT_EXC_ALL, + "Calling WEBHDFS GETFILESTATUS"); + } else { + ret = EIO; + } + } else if (!strcmp(key, "FileStatuses")) { + iter = json_object_iter(value); + value = json_object_iter_value(iter); + if (json_is_array(value)) { + ret = parseJsonArrayForFileStatuses(value, fileStats, + numOfEntries); + } else { + ret = EIO; + } + } else { + ret = EIO; + } + } else { + ret = EIO; + } + + json_decref(jobj); + return ret; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h index 2fbcb9dcc4d..c5f2f9cafe6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h @@ -18,41 +18,161 @@ #ifndef _HDFS_JSON_PARSER_H_ #define _HDFS_JSON_PARSER_H_ -struct jsonException; +/** + * Parse the response for MKDIR request. The response uses TRUE/FALSE + * to indicate whether the operation succeeded. + * + * @param response The response information to parse. + * @return 0 for success + */ +int parseMKDIR(const char *response); /** - * Print out JSON exception information. + * Parse the response for RENAME request. The response uses TRUE/FALSE + * to indicate whether the operation succeeded. * - * @param exc The exception information to print and free - * @param noPrintFlags Flags which determine which exceptions we should NOT - * print. - * @param fmt Printf-style format list - * @param ... Printf-style varargs - * - * @return The POSIX error number associated with the exception - * object. + * @param response The response information to parse. + * @return 0 for success */ -int printJsonException(struct jsonException *exc, int noPrintFlags, - const char *fmt, ...); +int parseRENAME(const char *response); -int parseMKDIR(char *response); -int parseRENAME(char *response); -int parseDELETE (char *response); -int parseSETREPLICATION(char *response); +/** + * Parse the response for DELETE request. The response uses TRUE/FALSE + * to indicate whether the operation succeeded. + * + * @param response The response information to parse. + * @return 0 for success + */ +int parseDELETE(const char *response); +/** + * Parse the response for SETREPLICATION request. The response uses TRUE/FALSE + * to indicate whether the operation succeeded. + * + * @param response The response information to parse. + * @return 0 for success + */ +int parseSETREPLICATION(const char *response); + +/** + * Parse the response for OPEN (read) request. A successful operation + * will return "200 OK". + * + * @param response The response information for parsing + * @return 0 for success , -1 for out of range, other values for error + */ int parseOPEN(const char *header, const char *content); +/** + * Parse the response for WRITE (from NameNode) request. + * A successful operation should return "307 TEMPORARY_REDIRECT" in its header. + * + * @param header The header of the http response + * @param content If failing, the exception message + * sent from NameNode is stored in content + * @return 0 for success + */ int parseNnWRITE(const char *header, const char *content); + +/** + * Parse the response for WRITE (from DataNode) request. + * A successful operation should return "201 Created" in its header. + * + * @param header The header of the http response + * @param content If failing, the exception message + * sent from DataNode is stored in content + * @return 0 for success + */ int parseDnWRITE(const char *header, const char *content); + +/** + * Parse the response for APPEND (sent from NameNode) request. + * A successful operation should return "307 TEMPORARY_REDIRECT" in its header. + * + * @param header The header of the http response + * @param content If failing, the exception message + * sent from NameNode is stored in content + * @return 0 for success + */ int parseNnAPPEND(const char *header, const char *content); + +/** + * Parse the response for APPEND (from DataNode) request. + * A successful operation should return "200 OK" in its header. + * + * @param header The header of the http response + * @param content If failing, the exception message + * sent from DataNode is stored in content + * @return 0 for success + */ int parseDnAPPEND(const char *header, const char *content); -char* parseDnLoc(char *content); +/** + * Parse the response (from NameNode) to get the location information + * of the DataNode that should be contacted for the following write operation. + * + * @param content Content of the http header + * @param dn To store the location of the DataNode for writing + * @return 0 for success + */ +int parseDnLoc(char *content, char **dn) __attribute__ ((warn_unused_result)); -hdfsFileInfo *parseGFS(char *response, hdfsFileInfo *fileStat, int *numEntries); +/** + * Parse the response for GETFILESTATUS operation. + * + * @param response Response to parse. Its detailed format is specified in + * "http://hadoop.apache.org/docs/stable/webhdfs.html#GETFILESTATUS" + * @param fileStat A hdfsFileInfo handle for holding file information + * @param printError Whether or not print out exception + * when file does not exist + * @return 0 for success, non-zero value to indicate error + */ +int parseGFS(const char *response, hdfsFileInfo *fileStat, int printError); -int parseCHOWN (char *header, const char *content); -int parseCHMOD (char *header, const char *content); -int parseUTIMES(char *header, const char *content); +/** + * Parse the response for LISTSTATUS operation. + * + * @param response Response to parse. Its detailed format is specified in + * "http://hadoop.apache.org/docs/r1.0.3/webhdfs.html#LISTSTATUS" + * @param fileStats Pointer pointing to a list of hdfsFileInfo handles + * holding file/dir information in the directory + * @param numEntries After parsing, the value of this parameter indicates + * the number of file entries. + * @return 0 for success, non-zero value to indicate error + */ +int parseLS(const char *response, hdfsFileInfo **fileStats, int *numOfEntries); -#endif //_FUSE_JSON_PARSER_H +/** + * Parse the response for CHOWN request. + * A successful operation should contains "200 OK" in its header, + * and the Content-Length should be 0. + * + * @param header The header of the http response + * @param content If failing, the exception message is stored in content + * @return 0 for success + */ +int parseCHOWN(const char *header, const char *content); + +/** + * Parse the response for CHMOD request. + * A successful operation should contains "200 OK" in its header, + * and the Content-Length should be 0. + * + * @param header The header of the http response + * @param content If failing, the exception message is stored in content + * @return 0 for success + */ +int parseCHMOD(const char *header, const char *content); + +/** + * Parse the response for SETTIMES request. + * A successful operation should contains "200 OK" in its header, + * and the Content-Length should be 0. + * + * @param header The header of the http response + * @param content If failing, the exception message is stored in content + * @return 0 for success + */ +int parseUTIMES(const char *header, const char *content); + +#endif //_HDFS_JSON_PARSER_H_ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c index 5a80449eff5..aa3db3d4af4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c @@ -16,6 +16,10 @@ * limitations under the License. */ +#include +#include +#include + #include "exception.h" #include "hdfs.h" #include "hdfs_http_client.h" @@ -23,15 +27,9 @@ #include "hdfs_json_parser.h" #include "jni_helper.h" -#include -#include -#include -#include -#include - -#define HADOOP_HDFS_CONF "org/apache/hadoop/hdfs/HdfsConfiguration" -#define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode" -#define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress" +#define HADOOP_HDFS_CONF "org/apache/hadoop/hdfs/HdfsConfiguration" +#define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode" +#define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress" struct hdfsBuilder { int forceNewInstance; @@ -65,30 +63,70 @@ struct hdfs_internal { */ struct hdfsFile_internal { struct webhdfsFileHandle* file; - enum hdfsStreamType type; - int flags; - tOffset offset; + enum hdfsStreamType type; /* INPUT or OUTPUT */ + int flags; /* Flag indicate read/create/append etc. */ + tOffset offset; /* Current offset position in the file */ }; -static webhdfsBuffer *initWebHdfsBuffer(void) +/** + * Create, initialize and return a webhdfsBuffer + */ +static int initWebHdfsBuffer(struct webhdfsBuffer **webhdfsBuffer) { - webhdfsBuffer *buffer = calloc(1, sizeof(*buffer)); + int ret = 0; + struct webhdfsBuffer *buffer = calloc(1, sizeof(struct webhdfsBuffer)); if (!buffer) { - fprintf(stderr, "Fail to allocate memory for webhdfsBuffer.\n"); - return NULL; + fprintf(stderr, + "ERROR: fail to allocate memory for webhdfsBuffer.\n"); + return ENOMEM; } - buffer->remaining = 0; - buffer->offset = 0; - buffer->wbuffer = NULL; - buffer->closeFlag = 0; - buffer->openFlag = 0; - pthread_mutex_init(&buffer->writeMutex, NULL); - pthread_cond_init(&buffer->newwrite_or_close, NULL); - pthread_cond_init(&buffer->transfer_finish, NULL); - return buffer; + ret = pthread_mutex_init(&buffer->writeMutex, NULL); + if (ret) { + fprintf(stderr, "ERROR: fail in pthread_mutex_init for writeMutex " + "in initWebHdfsBuffer, <%d>: %s.\n", + ret, hdfs_strerror(ret)); + goto done; + } + ret = pthread_cond_init(&buffer->newwrite_or_close, NULL); + if (ret) { + fprintf(stderr, + "ERROR: fail in pthread_cond_init for newwrite_or_close " + "in initWebHdfsBuffer, <%d>: %s.\n", + ret, hdfs_strerror(ret)); + goto done; + } + ret = pthread_cond_init(&buffer->transfer_finish, NULL); + if (ret) { + fprintf(stderr, + "ERROR: fail in pthread_cond_init for transfer_finish " + "in initWebHdfsBuffer, <%d>: %s.\n", + ret, hdfs_strerror(ret)); + goto done; + } + +done: + if (ret) { + free(buffer); + return ret; + } + *webhdfsBuffer = buffer; + return 0; } -static webhdfsBuffer *resetWebhdfsBuffer(webhdfsBuffer *wb, const char *buffer, size_t length) { +/** + * Reset the webhdfsBuffer. This is used in a block way + * when hdfsWrite is called with a new buffer to write. + * The writing thread in libcurl will be waken up to continue writing, + * and the caller of this function is blocked waiting for writing to finish. + * + * @param wb The handle of the webhdfsBuffer + * @param buffer The buffer provided by user to write + * @param length The length of bytes to write + * @return Updated webhdfsBuffer. + */ +static struct webhdfsBuffer *resetWebhdfsBuffer(struct webhdfsBuffer *wb, + const char *buffer, size_t length) +{ if (buffer && length > 0) { pthread_mutex_lock(&wb->writeMutex); wb->wbuffer = buffer; @@ -103,35 +141,49 @@ static webhdfsBuffer *resetWebhdfsBuffer(webhdfsBuffer *wb, const char *buffer, return wb; } -static void freeWebhdfsBuffer(webhdfsBuffer *buffer) { +/** + * Free the webhdfsBuffer and destroy its pthread conditions/mutex + * @param buffer The webhdfsBuffer to free + */ +static void freeWebhdfsBuffer(struct webhdfsBuffer *buffer) +{ + int ret = 0; if (buffer) { - int des = pthread_cond_destroy(&buffer->newwrite_or_close); - if (des == EBUSY) { - fprintf(stderr, "The condition newwrite_or_close is still referenced!\n"); - } else if (des == EINVAL) { - fprintf(stderr, "The condition newwrite_or_close is invalid!\n"); + ret = pthread_cond_destroy(&buffer->newwrite_or_close); + if (ret) { + fprintf(stderr, + "WARN: fail in pthread_cond_destroy for newwrite_or_close " + "in freeWebhdfsBuffer, <%d>: %s.\n", + ret, hdfs_strerror(ret)); + errno = ret; } - des = pthread_cond_destroy(&buffer->transfer_finish); - if (des == EBUSY) { - fprintf(stderr, "The condition transfer_finish is still referenced!\n"); - } else if (des == EINVAL) { - fprintf(stderr, "The condition transfer_finish is invalid!\n"); + ret = pthread_cond_destroy(&buffer->transfer_finish); + if (ret) { + fprintf(stderr, + "WARN: fail in pthread_cond_destroy for transfer_finish " + "in freeWebhdfsBuffer, <%d>: %s.\n", + ret, hdfs_strerror(ret)); + errno = ret; } - if (des == EBUSY) { - fprintf(stderr, "The condition close_clean is still referenced!\n"); - } else if (des == EINVAL) { - fprintf(stderr, "The condition close_clean is invalid!\n"); - } - des = pthread_mutex_destroy(&buffer->writeMutex); - if (des == EBUSY) { - fprintf(stderr, "The mutex is still locked or referenced!\n"); + ret = pthread_mutex_destroy(&buffer->writeMutex); + if (ret) { + fprintf(stderr, + "WARN: fail in pthread_mutex_destroy for writeMutex " + "in freeWebhdfsBuffer, <%d>: %s.\n", + ret, hdfs_strerror(ret)); + errno = ret; } free(buffer); buffer = NULL; } } -static void freeWebFileHandle(struct webhdfsFileHandle * handle) { +/** + * To free the webhdfsFileHandle, which includes a webhdfsBuffer and strings + * @param handle The webhdfsFileHandle to free + */ +static void freeWebFileHandle(struct webhdfsFileHandle * handle) +{ if (!handle) return; freeWebhdfsBuffer(handle->uploadBuffer); @@ -140,11 +192,46 @@ static void freeWebFileHandle(struct webhdfsFileHandle * handle) { free(handle); } +static const char *maybeNull(const char *str) +{ + return str ? str : "(NULL)"; +} + +/** To print a hdfsBuilder as string */ +static const char *hdfsBuilderToStr(const struct hdfsBuilder *bld, + char *buf, size_t bufLen) +{ + int strlength = snprintf(buf, bufLen, "nn=%s, port=%d, " + "kerbTicketCachePath=%s, userName=%s", + maybeNull(bld->nn), bld->port, + maybeNull(bld->kerbTicketCachePath), maybeNull(bld->userName)); + if (strlength < 0 || strlength >= bufLen) { + fprintf(stderr, "failed to print a hdfsBuilder as string.\n"); + return NULL; + } + return buf; +} + +/** + * Free a hdfs_internal handle + * @param fs The hdfs_internal handle to free + */ +static void freeWebHdfsInternal(struct hdfs_internal *fs) +{ + if (fs) { + free(fs->nn); + free(fs->userName); + free(fs->workingDir); + } +} + struct hdfsBuilder *hdfsNewBuilder(void) { struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder)); - if (!bld) + if (!bld) { + errno = ENOMEM; return NULL; + } return bld; } @@ -206,12 +293,7 @@ hdfsFS hdfsConnect(const char* nn, tPort port) hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) { - struct hdfsBuilder* bld = (struct hdfsBuilder *) hdfsConnect(nn, port); - if (!bld) { - return NULL; - } - hdfsBuilderSetForceNewInstance(bld); - return hdfsBuilderConnect(bld); + return hdfsConnect(nn, port); } hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, @@ -227,30 +309,16 @@ hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, return hdfsBuilderConnect(bld); } -static const char *maybeNull(const char *str) -{ - return str ? str : "(NULL)"; -} - -static const char *hdfsBuilderToStr(const struct hdfsBuilder *bld, - char *buf, size_t bufLen) -{ - snprintf(buf, bufLen, "nn=%s, port=%d, " - "kerbTicketCachePath=%s, userName=%s", - maybeNull(bld->nn), bld->port, - maybeNull(bld->kerbTicketCachePath), maybeNull(bld->userName)); - return buf; -} - -static void freeWebHdfsInternal(struct hdfs_internal *fs) -{ - if (fs) { - free(fs->nn); - free(fs->userName); - free(fs->workingDir); - } -} - +/** + * To retrieve the default configuration value for NameNode's hostName and port + * TODO: This function currently is using JNI, + * we need to do this without using JNI (HDFS-3917) + * + * @param bld The hdfsBuilder handle + * @param port Used to get the default value for NameNode's port + * @param nn Used to get the default value for NameNode's hostName + * @return 0 for success and non-zero value for failure + */ static int retrieveDefaults(const struct hdfsBuilder *bld, tPort *port, char **nn) { @@ -262,13 +330,11 @@ static int retrieveDefaults(const struct hdfsBuilder *bld, tPort *port, int ret = 0; char buf[512]; - // TODO: can we do this without using JNI? See HDFS-3917 env = getJNIEnv(); if (!env) { return EINTERNAL; } - // jHDFSConf = new HDFSConfiguration(); jthr = constructNewObjectOfClass(env, &jHDFSConf, HADOOP_HDFS_CONF, "()V"); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, @@ -277,12 +343,14 @@ static int retrieveDefaults(const struct hdfsBuilder *bld, tPort *port, goto done; } - jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_NAMENODE, "getHttpAddress", - "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/InetSocketAddress;", - jHDFSConf); + jthr = invokeMethod(env, &jVal, STATIC, NULL, + HADOOP_NAMENODE, "getHttpAddress", + "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/InetSocketAddress;", + jHDFSConf); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, - "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf))); + "hdfsBuilderConnect(%s)", + hdfsBuilderToStr(bld, buf, sizeof(buf))); goto done; } jAddress = jVal.l; @@ -298,7 +366,8 @@ static int retrieveDefaults(const struct hdfsBuilder *bld, tPort *port, *port = jVal.i; jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, - JAVA_INETSOCKETADDRESS, "getHostName", "()Ljava/lang/String;"); + JAVA_INETSOCKETADDRESS, + "getHostName", "()Ljava/lang/String;"); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsBuilderConnect(%s)", @@ -324,7 +393,7 @@ done: hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) { struct hdfs_internal *fs = NULL; - int ret; + int ret = 0; if (!bld) { ret = EINVAL; @@ -341,8 +410,8 @@ hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) ret = ENOMEM; goto done; } - /* If the namenode is "default" and/or the port of namenode is 0, get the - * default namenode/port */ + // If the namenode is "default" and/or the port of namenode is 0, + // get the default namenode/port if (bld->port == 0 || !strcasecmp("default", bld->nn)) { ret = retrieveDefaults(bld, &fs->port, &fs->nn); if (ret) @@ -369,7 +438,7 @@ hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) ret = ENOMEM; goto done; } - //for debug + // For debug fprintf(stderr, "namenode: %s:%d\n", bld->nn, bld->port); done: @@ -392,47 +461,68 @@ int hdfsDisconnect(hdfsFS fs) return 0; } -static char *getAbsolutePath(hdfsFS fs, const char *path) +/** + * Based on the working directory stored in hdfsFS, + * generate the absolute path for the given path + * + * @param fs The hdfsFS handle which stores the current working directory + * @param path The given path which may not be an absolute path + * @param absPath To hold generated absolute path for the given path + * @return 0 on success, non-zero value indicating error + */ +static int getAbsolutePath(hdfsFS fs, const char *path, char **absPath) { - char *absPath = NULL; + char *tempPath = NULL; size_t absPathLen; + int strlength; if (path[0] == '/') { - // path is already absolute. - return strdup(path); + // Path is already absolute. + tempPath = strdup(path); + if (!tempPath) { + return ENOMEM; + } + *absPath = tempPath; + return 0; } - // prepend the workingDir to the path. - absPathLen = strlen(fs->workingDir) + strlen(path); - absPath = malloc(absPathLen + 1); - if (!absPath) { - return NULL; + // Prepend the workingDir to the path. + absPathLen = strlen(fs->workingDir) + strlen(path) + 1; + tempPath = malloc(absPathLen); + if (!tempPath) { + return ENOMEM; } - snprintf(absPath, absPathLen + 1, "%s%s", fs->workingDir, path); - return absPath; + strlength = snprintf(tempPath, absPathLen, "%s%s", fs->workingDir, path); + if (strlength < 0 || strlength >= absPathLen) { + free(tempPath); + return EIO; + } + *absPath = tempPath; + return 0; } int hdfsCreateDirectory(hdfsFS fs, const char* path) { char *url = NULL, *absPath = NULL; - Response resp = NULL; + struct Response *resp = NULL; int ret = 0; if (fs == NULL || path == NULL) { ret = EINVAL; goto done; } - absPath = getAbsolutePath(fs, path); - if (!absPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, path, &absPath); + if (ret) { goto done; } - if(!((url = prepareMKDIR(fs->nn, fs->port, absPath, fs->userName)) - && (resp = launchMKDIR(url)) - && (parseMKDIR(resp->body->content)))) { - ret = EIO; + ret = createUrlForMKDIR(fs->nn, fs->port, absPath, fs->userName, &url); + if (ret) { goto done; } - + ret = launchMKDIR(url, &resp); + if (ret) { + goto done; + } + ret = parseMKDIR(resp->body->content); done: freeResponse(resp); free(url); @@ -447,24 +537,27 @@ done: int hdfsChmod(hdfsFS fs, const char* path, short mode) { char *absPath = NULL, *url = NULL; - Response resp = NULL; + struct Response *resp = NULL; int ret = 0; if (fs == NULL || path == NULL) { ret = EINVAL; goto done; } - absPath = getAbsolutePath(fs, path); - if (!absPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, path, &absPath); + if (ret) { goto done; } - if(!((url = prepareCHMOD(fs->nn, fs->port, absPath, (int)mode, fs->userName)) - && (resp = launchCHMOD(url)) - && (parseCHMOD(resp->header->content, resp->body->content)))) { - ret = EIO; + ret = createUrlForCHMOD(fs->nn, fs->port, absPath, (int) mode, + fs->userName, &url); + if (ret) { goto done; } + ret = launchCHMOD(url, &resp); + if (ret) { + goto done; + } + ret = parseCHMOD(resp->header->content, resp->body->content); done: freeResponse(resp); free(absPath); @@ -480,26 +573,27 @@ int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group) { int ret = 0; char *absPath = NULL, *url = NULL; - Response resp = NULL; + struct Response *resp = NULL; if (fs == NULL || path == NULL) { ret = EINVAL; goto done; } - absPath = getAbsolutePath(fs, path); - if (!absPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, path, &absPath); + if (ret) { goto done; } - - if(!((url = prepareCHOWN(fs->nn, fs->port, absPath, owner, group, fs->userName)) - && (resp = launchCHOWN(url)) - && (parseCHOWN(resp->header->content, resp->body->content)))) { - ret = EIO; + ret = createUrlForCHOWN(fs->nn, fs->port, absPath, + owner, group, fs->userName, &url); + if (ret) { goto done; } - + ret = launchCHOWN(url, &resp); + if (ret) { + goto done; + } + ret = parseCHOWN(resp->header->content, resp->body->content); done: freeResponse(resp); free(absPath); @@ -515,27 +609,30 @@ int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) { char *oldAbsPath = NULL, *newAbsPath = NULL, *url = NULL; int ret = 0; - Response resp = NULL; + struct Response *resp = NULL; if (fs == NULL || oldPath == NULL || newPath == NULL) { ret = EINVAL; goto done; } - oldAbsPath = getAbsolutePath(fs, oldPath); - if (!oldAbsPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, oldPath, &oldAbsPath); + if (ret) { goto done; } - newAbsPath = getAbsolutePath(fs, newPath); - if (!newAbsPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, newPath, &newAbsPath); + if (ret) { goto done; } - if(!((url = prepareRENAME(fs->nn, fs->port, oldAbsPath, newAbsPath, fs->userName)) - && (resp = launchRENAME(url)) - && (parseRENAME(resp->body->content)))) { - ret = -1; + ret = createUrlForRENAME(fs->nn, fs->port, oldAbsPath, + newAbsPath, fs->userName, &url); + if (ret) { + goto done; } + ret = launchRENAME(url, &resp); + if (ret) { + goto done; + } + ret = parseRENAME(resp->body->content); done: freeResponse(resp); free(oldAbsPath); @@ -548,12 +645,22 @@ done: return 0; } -hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) +/** + * Get the file status for a given path. + * + * @param fs hdfsFS handle containing + * NameNode hostName/port information + * @param path Path for file + * @param printError Whether or not to print out error information + * (mainly remote FileNotFoundException) + * @return File information for the given path + */ +static hdfsFileInfo *hdfsGetPathInfoImpl(hdfsFS fs, const char* path, + int printError) { char *absPath = NULL; char *url=NULL; - Response resp = NULL; - int numEntries = 0; + struct Response *resp = NULL; int ret = 0; hdfsFileInfo *fileInfo = NULL; @@ -561,9 +668,8 @@ hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) ret = EINVAL; goto done; } - absPath = getAbsolutePath(fs, path); - if (!absPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, path, &absPath); + if (ret) { goto done; } fileInfo = (hdfsFileInfo *) calloc(1, sizeof(hdfsFileInfo)); @@ -573,18 +679,21 @@ hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) } fileInfo->mKind = kObjectKindFile; - if(!((url = prepareGFS(fs->nn, fs->port, absPath, fs->userName)) - && (resp = launchGFS(url)) - && (fileInfo = parseGFS(resp->body->content, fileInfo, &numEntries)))) { - ret = EIO; + ret = createUrlForGetFileStatus(fs->nn, fs->port, absPath, + fs->userName, &url); + if (ret) { goto done; } + ret = launchGFS(url, &resp); + if (ret) { + goto done; + } + ret = parseGFS(resp->body->content, fileInfo, printError); done: freeResponse(resp); free(absPath); free(url); - if (ret == 0) { return fileInfo; } else { @@ -594,10 +703,15 @@ done: } } +hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) +{ + return hdfsGetPathInfoImpl(fs, path, 1); +} + hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) { char *url = NULL, *absPath = NULL; - Response resp = NULL; + struct Response *resp = NULL; int ret = 0; hdfsFileInfo *fileInfo = NULL; @@ -605,9 +719,8 @@ hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) ret = EINVAL; goto done; } - absPath = getAbsolutePath(fs, path); - if (!absPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, path, &absPath); + if (ret) { goto done; } fileInfo = calloc(1, sizeof(*fileInfo)); @@ -615,21 +728,24 @@ hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) ret = ENOMEM; goto done; } - if(!((url = prepareLS(fs->nn, fs->port, absPath, fs->userName)) - && (resp = launchLS(url)) - && (fileInfo = parseGFS(resp->body->content, fileInfo, numEntries)))) { - ret = EIO; + + ret = createUrlForLS(fs->nn, fs->port, absPath, fs->userName, &url); + if (ret) { goto done; } + ret = launchLS(url, &resp); + if (ret) { + goto done; + } + ret = parseLS(resp->body->content, &fileInfo, numEntries); + done: freeResponse(resp); free(absPath); free(url); - if (ret == 0) { return fileInfo; } else { - hdfsFreeFileInfo(fileInfo, 1); errno = ret; return NULL; } @@ -638,24 +754,28 @@ done: int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) { char *url = NULL, *absPath = NULL; - Response resp = NULL; + struct Response *resp = NULL; int ret = 0; if (fs == NULL || path == NULL) { ret = EINVAL; goto done; } - absPath = getAbsolutePath(fs, path); - if (!absPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, path, &absPath); + if (ret) { goto done; } - if(!((url = prepareSETREPLICATION(fs->nn, fs->port, absPath, replication, fs->userName)) - && (resp = launchSETREPLICATION(url)) - && (parseSETREPLICATION(resp->body->content)))) { - ret = EIO; + + ret = createUrlForSETREPLICATION(fs->nn, fs->port, absPath, + replication, fs->userName, &url); + if (ret) { goto done; } + ret = launchSETREPLICATION(url, &resp); + if (ret) { + goto done; + } + ret = parseSETREPLICATION(resp->body->content); done: freeResponse(resp); free(absPath); @@ -670,8 +790,7 @@ done: void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries) { int i; - - for (i=0; i < numEntries; ++i) { + for (i = 0; i < numEntries; ++i) { free(hdfsFileInfo[i].mName); free(hdfsFileInfo[i].mOwner); free(hdfsFileInfo[i].mGroup); @@ -682,25 +801,28 @@ void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries) int hdfsDelete(hdfsFS fs, const char* path, int recursive) { char *url = NULL, *absPath = NULL; - Response resp = NULL; + struct Response *resp = NULL; int ret = 0; if (fs == NULL || path == NULL) { ret = EINVAL; goto done; } - absPath = getAbsolutePath(fs, path); - if (!absPath) { - ret = ENOMEM; - goto done; - } - if(!((url = prepareDELETE(fs->nn, fs->port, absPath, recursive, fs->userName)) - && (resp = launchDELETE(url)) - && (parseDELETE(resp->body->content)))) { - ret = EIO; + ret = getAbsolutePath(fs, path, &absPath); + if (ret) { goto done; } + ret = createUrlForDELETE(fs->nn, fs->port, absPath, + recursive, fs->userName, &url); + if (ret) { + goto done; + } + ret = launchDELETE(url, &resp); + if (ret) { + goto done; + } + ret = parseDELETE(resp->body->content); done: freeResponse(resp); free(absPath); @@ -715,26 +837,28 @@ done: int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { char *url = NULL, *absPath = NULL; - Response resp = NULL; + struct Response *resp = NULL; int ret = 0; if (fs == NULL || path == NULL) { ret = EINVAL; goto done; } - absPath = getAbsolutePath(fs, path); - if (!absPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, path, &absPath); + if (ret) { goto done; } - if(!((url = prepareUTIMES(fs->nn, fs->port, absPath, mtime, atime, - fs->userName)) - && (resp = launchUTIMES(url)) - && (parseUTIMES(resp->header->content, resp->body->content)))) { - ret = EIO; + + ret = createUrlForUTIMES(fs->nn, fs->port, absPath, mtime, atime, + fs->userName, &url); + if (ret) { goto done; } - + ret = launchUTIMES(url, &resp); + if (ret) { + goto done; + } + ret = parseUTIMES(resp->header->content, resp->body->content); done: freeResponse(resp); free(absPath); @@ -748,7 +872,7 @@ done: int hdfsExists(hdfsFS fs, const char *path) { - hdfsFileInfo *fileInfo = hdfsGetPathInfo(fs, path); + hdfsFileInfo *fileInfo = hdfsGetPathInfoImpl(fs, path, 0); if (!fileInfo) { // (errno will have been set by hdfsGetPathInfo) return -1; @@ -757,14 +881,23 @@ int hdfsExists(hdfsFS fs, const char *path) return 0; } +/** + * The information hold by the thread which writes data to hdfs through http + */ typedef struct { - char *url; - webhdfsBuffer *uploadBuffer; - int flags; - Response resp; + char *url; /* the url of the target datanode for writing*/ + struct webhdfsBuffer *uploadBuffer; /* buffer storing data to write */ + int flags; /* flag indicating writing mode: create or append */ + struct Response *resp; /* response from the target datanode */ } threadData; -static void freeThreadData(threadData *data) { +/** + * Free the threadData struct instance, + * including the response and url contained in it + * @param data The threadData instance to free + */ +static void freeThreadData(threadData *data) +{ if (data) { if (data->url) { free(data->url); @@ -772,18 +905,29 @@ static void freeThreadData(threadData *data) { if (data->resp) { freeResponse(data->resp); } - //the uploadBuffer would be freed by freeWebFileHandle() + // The uploadBuffer would be freed by freeWebFileHandle() free(data); data = NULL; } } -static void *writeThreadOperation(void *v) { - threadData *data = (threadData *) v; +/** + * The action of the thread that writes data to + * the target datanode for hdfsWrite. + * The writing can be either create or append, which is specified by flag + */ +static void *writeThreadOperation(void *v) +{ + int ret = 0; + threadData *data = v; if (data->flags & O_APPEND) { - data->resp = launchDnAPPEND(data->url, data->uploadBuffer); + ret = launchDnAPPEND(data->url, data->uploadBuffer, &(data->resp)); } else { - data->resp = launchDnWRITE(data->url, data->uploadBuffer); + ret = launchDnWRITE(data->url, data->uploadBuffer, &(data->resp)); + } + if (ret) { + fprintf(stderr, "Failed to write to datanode %s, <%d>: %s.\n", + data->url, ret, hdfs_strerror(ret)); } return data; } @@ -816,58 +960,58 @@ static void freeFileInternal(hdfsFile file) static int hdfsOpenOutputFileImpl(hdfsFS fs, hdfsFile file) { struct webhdfsFileHandle *webhandle = file->file; - Response resp = NULL; - int parseRet, append, ret = 0; - char *prepareUrl = NULL, *dnUrl = NULL; + struct Response *resp = NULL; + int append, ret = 0; + char *nnUrl = NULL, *dnUrl = NULL; threadData *data = NULL; - webhandle->uploadBuffer = initWebHdfsBuffer(); - if (!webhandle->uploadBuffer) { - ret = ENOMEM; + ret = initWebHdfsBuffer(&webhandle->uploadBuffer); + if (ret) { goto done; } append = file->flags & O_APPEND; if (!append) { // If we're not appending, send a create request to the NN - prepareUrl = prepareNnWRITE(fs->nn, fs->port, webhandle->absPath, - fs->userName, webhandle->replication, webhandle->blockSize); + ret = createUrlForNnWRITE(fs->nn, fs->port, webhandle->absPath, + fs->userName, webhandle->replication, + webhandle->blockSize, &nnUrl); } else { - prepareUrl = prepareNnAPPEND(fs->nn, fs->port, webhandle->absPath, - fs->userName); + ret = createUrlForNnAPPEND(fs->nn, fs->port, webhandle->absPath, + fs->userName, &nnUrl); } - if (!prepareUrl) { - fprintf(stderr, "fail to create the url connecting to namenode " - "for file creation/appending\n"); - ret = EIO; + if (ret) { + fprintf(stderr, "Failed to create the url connecting to namenode " + "for file creation/appending, <%d>: %s.\n", + ret, hdfs_strerror(ret)); goto done; } if (!append) { - resp = launchNnWRITE(prepareUrl); + ret = launchNnWRITE(nnUrl, &resp); } else { - resp = launchNnAPPEND(prepareUrl); + ret = launchNnAPPEND(nnUrl, &resp); } - if (!resp) { + if (ret) { fprintf(stderr, "fail to get the response from namenode for " - "file creation/appending\n"); - ret = EIO; + "file creation/appending, <%d>: %s.\n", + ret, hdfs_strerror(ret)); goto done; } if (!append) { - parseRet = parseNnWRITE(resp->header->content, resp->body->content); + ret = parseNnWRITE(resp->header->content, resp->body->content); } else { - parseRet = parseNnAPPEND(resp->header->content, resp->body->content); + ret = parseNnAPPEND(resp->header->content, resp->body->content); } - if (!parseRet) { + if (ret) { fprintf(stderr, "fail to parse the response from namenode for " - "file creation/appending\n"); - ret = EIO; + "file creation/appending, <%d>: %s.\n", + ret, hdfs_strerror(ret)); goto done; } - dnUrl = parseDnLoc(resp->header->content); - if (!dnUrl) { + ret = parseDnLoc(resp->header->content, &dnUrl); + if (ret) { fprintf(stderr, "fail to get the datanode url from namenode " - "for file creation/appending\n"); - ret = EIO; + "for file creation/appending, <%d>: %s.\n", + ret, hdfs_strerror(ret)); goto done; } //store the datanode url in the file handle @@ -892,18 +1036,23 @@ static int hdfsOpenOutputFileImpl(hdfsFS fs, hdfsFile file) ret = pthread_create(&webhandle->connThread, NULL, writeThreadOperation, data); if (ret) { - fprintf(stderr, "Failed to create the writing thread.\n"); + fprintf(stderr, "ERROR: failed to create the writing thread " + "in hdfsOpenOutputFileImpl, <%d>: %s.\n", + ret, hdfs_strerror(ret)); goto done; } webhandle->uploadBuffer->openFlag = 1; done: freeResponse(resp); - free(prepareUrl); + free(nnUrl); free(dnUrl); if (ret) { - free(data->url); - free(data); + errno = ret; + if (data) { + free(data->url); + free(data); + } } return ret; } @@ -929,7 +1078,8 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, goto done; } if ((flags & O_CREAT) && (flags & O_EXCL)) { - fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n"); + fprintf(stderr, + "WARN: hdfs does not truly support O_CREATE && O_EXCL\n"); } file = calloc(1, sizeof(struct hdfsFile_internal)); if (!file) { @@ -947,12 +1097,13 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, webhandle->bufferSize = bufferSize; webhandle->replication = replication; webhandle->blockSize = blockSize; - webhandle->absPath = getAbsolutePath(fs, path); - if (!webhandle->absPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, path, &webhandle->absPath); + if (ret) { goto done; } file->file = webhandle; + // If open for write/append, + // open and keep the connection with the target datanode for writing if (file->type == OUTPUT) { ret = hdfsOpenOutputFileImpl(fs, file); if (ret) { @@ -988,7 +1139,9 @@ tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length) resetWebhdfsBuffer(wfile->uploadBuffer, buffer, length); return length; } else { - fprintf(stderr, "Error: have not opened the file %s for writing yet.\n", wfile->absPath); + fprintf(stderr, + "Error: have not opened the file %s for writing yet.\n", + wfile->absPath); errno = EBADF; return -1; } @@ -996,42 +1149,47 @@ tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length) int hdfsCloseFile(hdfsFS fs, hdfsFile file) { + void *respv = NULL; + threadData *tdata = NULL; int ret = 0; - fprintf(stderr, "to close file...\n"); + struct webhdfsFileHandle *wfile = NULL; + if (file->type == OUTPUT) { - void *respv; - threadData *tdata; - struct webhdfsFileHandle *wfile = file->file; + wfile = file->file; pthread_mutex_lock(&(wfile->uploadBuffer->writeMutex)); wfile->uploadBuffer->closeFlag = 1; pthread_cond_signal(&wfile->uploadBuffer->newwrite_or_close); pthread_mutex_unlock(&(wfile->uploadBuffer->writeMutex)); - //waiting for the writing thread to terminate + // Waiting for the writing thread to terminate ret = pthread_join(wfile->connThread, &respv); if (ret) { - fprintf(stderr, "Error (code %d) when pthread_join.\n", ret); + fprintf(stderr, "Error when pthread_join in hdfsClose, <%d>: %s.\n", + ret, hdfs_strerror(ret)); } - //parse the response - tdata = (threadData *) respv; - if (!tdata) { - fprintf(stderr, "Response from the writing thread is NULL.\n"); - ret = -1; + // Parse the response + tdata = respv; + if (!tdata || !(tdata->resp)) { + fprintf(stderr, + "ERROR: response from the writing thread is NULL.\n"); + ret = EIO; } if (file->flags & O_APPEND) { - parseDnAPPEND(tdata->resp->header->content, tdata->resp->body->content); + ret = parseDnAPPEND(tdata->resp->header->content, + tdata->resp->body->content); } else { - parseDnWRITE(tdata->resp->header->content, tdata->resp->body->content); + ret = parseDnWRITE(tdata->resp->header->content, + tdata->resp->body->content); } - //free the threaddata + // Free the threaddata freeThreadData(tdata); } freeFileInternal(file); - fprintf(stderr, "Closed the webfilehandle...\n"); if (ret) { - errno = EIO; + errno = ret; + return -1; } - return ret; + return 0; } int hdfsFileIsOpenForRead(hdfsFile file) @@ -1049,8 +1207,7 @@ static int hdfsReadImpl(hdfsFS fs, hdfsFile file, void* buffer, tSize off, { int ret = 0; char *url = NULL; - Response resp = NULL; - int openResult = -1; + struct Response *resp = NULL; if (fs == NULL || file == NULL || file->type != INPUT || buffer == NULL || length < 0) { @@ -1068,30 +1225,41 @@ static int hdfsReadImpl(hdfsFS fs, hdfsFile file, void* buffer, tSize off, ret = ENOMEM; goto done; } - resp->header = initResponseBuffer(); - resp->body = initResponseBuffer(); + ret = initResponseBuffer(&(resp->header)); + if (ret) { + goto done; + } + ret = initResponseBuffer(&(resp->body)); + if (ret) { + goto done; + } + memset(buffer, 0, length); resp->body->content = buffer; resp->body->remaining = length; - if (!((url = prepareOPEN(fs->nn, fs->port, file->file->absPath, - fs->userName, off, length)) - && (resp = launchOPEN(url, resp)) - && ((openResult = parseOPEN(resp->header->content, resp->body->content)) > 0))) { - if (openResult == 0) { - // Special case: if parseOPEN returns 0, we asked for a byte range - // with outside what the file contains. In this case, hdfsRead and - // hdfsPread return 0, meaning end-of-file. - *numRead = 0; - goto done; - } - ret = EIO; + ret = createUrlForOPEN(fs->nn, fs->port, file->file->absPath, + fs->userName, off, length, &url); + if (ret) { goto done; } - *numRead = resp->body->offset; - + ret = launchOPEN(url, resp); + if (ret) { + goto done; + } + ret = parseOPEN(resp->header->content, resp->body->content); + if (ret == -1) { + // Special case: if parseOPEN returns -1, we asked for a byte range + // with outside what the file contains. In this case, hdfsRead and + // hdfsPread return 0, meaning end-of-file. + *numRead = 0; + } else if (ret == 0) { + *numRead = (tSize) resp->body->offset; + } done: - freeResponseBuffer(resp->header); - free(resp->body); + if (resp) { + freeResponseBuffer(resp->header); + free(resp->body); + } free(resp); free(url); return ret; @@ -1099,11 +1267,12 @@ done: tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length) { - int ret; + int ret = 0; tSize numRead = 0; - ret = hdfsReadImpl(fs, file, buffer, file->offset, length, &numRead); - if (ret) { + ret = hdfsReadImpl(fs, file, buffer, (tSize) file->offset, + length, &numRead); + if (ret > 0) { // ret == -1 means end of file errno = ret; return -1; } @@ -1119,18 +1288,6 @@ int hdfsAvailable(hdfsFS fs, hdfsFile file) return 0; } -int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) -{ - errno = ENOTSUP; - return -1; -} - -int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) -{ - errno = ENOTSUP; - return -1; -} - int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) { struct webhdfsFileHandle *wf; @@ -1172,7 +1329,8 @@ done: return 0; } -tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length) +tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, + void* buffer, tSize length) { int ret; tSize numRead = 0; @@ -1181,8 +1339,8 @@ tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize errno = EINVAL; return -1; } - ret = hdfsReadImpl(fs, file, buffer, position, length, &numRead); - if (ret) { + ret = hdfsReadImpl(fs, file, buffer, (tSize) position, length, &numRead); + if (ret > 0) { errno = ret; return -1; } @@ -1200,21 +1358,44 @@ tOffset hdfsTell(hdfsFS fs, hdfsFile file) char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) { + int strlength; if (fs == NULL || buffer == NULL || bufferSize <= 0) { errno = EINVAL; return NULL; } - if (snprintf(buffer, bufferSize, "%s", fs->workingDir) >= bufferSize) { + strlength = snprintf(buffer, bufferSize, "%s", fs->workingDir); + if (strlength >= bufferSize) { errno = ENAMETOOLONG; return NULL; + } else if (strlength < 0) { + errno = EIO; + return NULL; } return buffer; } +/** Replace "//" with "/" in path */ +static void normalizePath(char *path) +{ + int i = 0, j = 0, sawslash = 0; + + for (i = j = sawslash = 0; path[i] != '\0'; i++) { + if (path[i] != '/') { + sawslash = 0; + path[j++] = path[i]; + } else if (path[i] == '/' && !sawslash) { + sawslash = 1; + path[j++] = '/'; + } + } + path[j] = '\0'; +} + int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) { - char *newWorkingDir; - size_t strlenPath, newWorkingDirLen; + char *newWorkingDir = NULL; + size_t strlenPath = 0, newWorkingDirLen = 0; + int strlength; if (fs == NULL || path == NULL) { errno = EINVAL; @@ -1225,25 +1406,28 @@ int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) errno = EINVAL; return -1; } - if (path[0] != '/') { - // TODO: support non-absolute paths. They should be interpreted - // relative to the current path. - errno = ENOTSUP; - return -1; - } - if (strstr(path, "//")) { - // TODO: support non-normalized paths (by normalizing them.) - errno = ENOTSUP; - return -1; - } - newWorkingDirLen = strlenPath + 2; + // the max string length of the new working dir is + // (length of old working dir) + (length of given path) + strlen("/") + 1 + newWorkingDirLen = strlen(fs->workingDir) + strlenPath + 2; newWorkingDir = malloc(newWorkingDirLen); if (!newWorkingDir) { errno = ENOMEM; return -1; } - snprintf(newWorkingDir, newWorkingDirLen, "%s%s", - path, (path[strlenPath - 1] == '/') ? "" : "/"); + strlength = snprintf(newWorkingDir, newWorkingDirLen, "%s%s%s", + (path[0] == '/') ? "" : fs->workingDir, + path, (path[strlenPath - 1] == '/') ? "" : "/"); + if (strlength < 0 || strlength >= newWorkingDirLen) { + free(newWorkingDir); + errno = EIO; + return -1; + } + + if (strstr(path, "//")) { + // normalize the path by replacing "//" with "/" + normalizePath(newWorkingDir); + } + free(fs->workingDir); fs->workingDir = newWorkingDir; return 0; @@ -1283,7 +1467,7 @@ int hdfsHFlush(hdfsFS fs, hdfsFile file) errno = EINVAL; return -1; } - // TODO: block until our write buffer is flushed + // TODO: block until our write buffer is flushed (HDFS-3952) return 0; } @@ -1293,7 +1477,7 @@ int hdfsFlush(hdfsFS fs, hdfsFile file) errno = EINVAL; return -1; } - // TODO: block until our write buffer is flushed + // TODO: block until our write buffer is flushed (HDFS-3952) return 0; } @@ -1316,3 +1500,15 @@ tOffset hdfsGetUsed(hdfsFS fs) return -1; } +int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) +{ + errno = ENOTSUP; + return -1; +} + +int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) +{ + errno = ENOTSUP; + return -1; +} + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c index bcd22882208..c768c9c1d04 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c @@ -146,6 +146,7 @@ static int hashTableInit(void) if (hcreate(MAX_HASH_TABLE_ELEM) == 0) { fprintf(stderr, "error creating hashtable, <%d>: %s\n", errno, strerror(errno)); + UNLOCK_HASH_TABLE(); return 0; } hashTableInited = 1;