HDFS-3920. libwebdhfs string processing and using strerror consistently to handle all errors. Contributed by Jing Zhao.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1403173 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2012-10-29 05:10:29 +00:00
parent cea7bbc630
commit 4bcf516d0e
9 changed files with 2269 additions and 1068 deletions

View File

@ -761,6 +761,9 @@ Release 2.0.2-alpha - 2012-09-07
HDFS-3907. Allow multiple users for local block readers. (eli) HDFS-3907. Allow multiple users for local block readers. (eli)
HDFS-3910. DFSTestUtil#waitReplication should timeout. (eli) HDFS-3910. DFSTestUtil#waitReplication should timeout. (eli)
HDFS-3920. libwebdhfs string processing and using strerror consistently
to handle all errors. (Jing Zhao via suresh)
OPTIMIZATIONS OPTIMIZATIONS

View File

@ -15,28 +15,43 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <curl/curl.h> #include <curl/curl.h>
#include <pthread.h>
#include "hdfs_http_client.h" #include "hdfs_http_client.h"
static pthread_mutex_t curlInitMutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t curlInitMutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int curlGlobalInited = 0; static volatile int curlGlobalInited = 0;
ResponseBuffer initResponseBuffer() { const char *hdfs_strerror(int errnoval)
ResponseBuffer info = (ResponseBuffer) calloc(1, sizeof(ResponseBufferInternal)); {
if (!info) { const char *msg = NULL;
fprintf(stderr, "Cannot allocate memory for responseInfo\n"); if (errnoval < 0 || errnoval >= sys_nerr) {
return NULL; msg = "Invalid Error Code";
} else if (sys_errlist == NULL) {
msg = "Unknown Error";
} else {
msg = sys_errlist[errnoval];
} }
info->remaining = 0; return msg;
info->offset = 0;
info->content = NULL;
return info;
} }
void freeResponseBuffer(ResponseBuffer buffer) { int initResponseBuffer(struct ResponseBuffer **buffer)
{
struct ResponseBuffer *info = NULL;
int ret = 0;
info = calloc(1, sizeof(struct ResponseBuffer));
if (!info) {
ret = ENOMEM;
}
*buffer = info;
return ret;
}
void freeResponseBuffer(struct ResponseBuffer *buffer)
{
if (buffer) { if (buffer) {
if (buffer->content) { if (buffer->content) {
free(buffer->content); free(buffer->content);
@ -46,8 +61,9 @@ void freeResponseBuffer(ResponseBuffer buffer) {
} }
} }
void freeResponse(Response resp) { void freeResponse(struct Response *resp)
if(resp) { {
if (resp) {
freeResponseBuffer(resp->body); freeResponseBuffer(resp->body);
freeResponseBuffer(resp->header); freeResponseBuffer(resp->header);
free(resp); free(resp);
@ -55,21 +71,30 @@ void freeResponse(Response resp) {
} }
} }
/* Callback for allocating local buffer and reading data to local buffer */ /**
static size_t writefunc(void *ptr, size_t size, size_t nmemb, ResponseBuffer rbuffer) { * Callback used by libcurl for allocating local buffer and
* reading data to local buffer
*/
static size_t writefunc(void *ptr, size_t size,
size_t nmemb, struct ResponseBuffer *rbuffer)
{
void *temp = NULL;
if (size * nmemb < 1) { if (size * nmemb < 1) {
return 0; return 0;
} }
if (!rbuffer) { if (!rbuffer) {
fprintf(stderr, "In writefunc, ResponseBuffer is NULL.\n"); fprintf(stderr,
return -1; "ERROR: ResponseBuffer is NULL for the callback writefunc.\n");
return 0;
} }
if (rbuffer->remaining < size * nmemb) { if (rbuffer->remaining < size * nmemb) {
rbuffer->content = realloc(rbuffer->content, rbuffer->offset + size * nmemb + 1); temp = realloc(rbuffer->content, rbuffer->offset + size * nmemb + 1);
if (rbuffer->content == NULL) { if (temp == NULL) {
return -1; fprintf(stderr, "ERROR: fail to realloc in callback writefunc.\n");
return 0;
} }
rbuffer->content = temp;
rbuffer->remaining = size * nmemb; rbuffer->remaining = size * nmemb;
} }
memcpy(rbuffer->content + rbuffer->offset, ptr, size * nmemb); memcpy(rbuffer->content + rbuffer->offset, ptr, size * nmemb);
@ -80,67 +105,84 @@ static size_t writefunc(void *ptr, size_t size, size_t nmemb, ResponseBuffer rbu
} }
/** /**
* Callback for reading data to buffer provided by user, * Callback used by libcurl for reading data into buffer provided by user,
* thus no need to reallocate buffer. * thus no need to reallocate buffer.
*/ */
static size_t writefunc_withbuffer(void *ptr, size_t size, size_t nmemb, ResponseBuffer rbuffer) { static size_t writeFuncWithUserBuffer(void *ptr, size_t size,
size_t nmemb, struct ResponseBuffer *rbuffer)
{
size_t toCopy = 0;
if (size * nmemb < 1) { if (size * nmemb < 1) {
return 0; return 0;
} }
if (!rbuffer || !rbuffer->content) { if (!rbuffer || !rbuffer->content) {
fprintf(stderr, "In writefunc_withbuffer, the buffer provided by user is NULL.\n"); fprintf(stderr,
"ERROR: buffer to read is NULL for the "
"callback writeFuncWithUserBuffer.\n");
return 0; return 0;
} }
size_t toCopy = rbuffer->remaining < (size * nmemb) ? rbuffer->remaining : (size * nmemb); toCopy = rbuffer->remaining < (size * nmemb) ?
rbuffer->remaining : (size * nmemb);
memcpy(rbuffer->content + rbuffer->offset, ptr, toCopy); memcpy(rbuffer->content + rbuffer->offset, ptr, toCopy);
rbuffer->offset += toCopy; rbuffer->offset += toCopy;
rbuffer->remaining -= toCopy; rbuffer->remaining -= toCopy;
return toCopy; return toCopy;
} }
//callback for writing data to remote peer /**
static size_t readfunc(void *ptr, size_t size, size_t nmemb, void *stream) { * Callback used by libcurl for writing data to remote peer
*/
static size_t readfunc(void *ptr, size_t size, size_t nmemb, void *stream)
{
struct webhdfsBuffer *wbuffer = NULL;
if (size * nmemb < 1) { if (size * nmemb < 1) {
fprintf(stderr, "In readfunc callback: size * nmemb == %ld\n", size * nmemb);
return 0; return 0;
} }
webhdfsBuffer *wbuffer = (webhdfsBuffer *) stream;
wbuffer = stream;
pthread_mutex_lock(&wbuffer->writeMutex); pthread_mutex_lock(&wbuffer->writeMutex);
while (wbuffer->remaining == 0) { while (wbuffer->remaining == 0) {
/* /*
* the current remainning bytes to write is 0, * The current remainning bytes to write is 0,
* check whether need to finish the transfer * check closeFlag to see whether need to finish the transfer.
* if yes, return 0; else, wait * if yes, return 0; else, wait
*/ */
if (wbuffer->closeFlag) { if (wbuffer->closeFlag) { // We can close the transfer now
//we can close the transfer now //For debug
fprintf(stderr, "CloseFlag is set, ready to close the transfer\n"); fprintf(stderr, "CloseFlag is set, ready to close the transfer\n");
pthread_mutex_unlock(&wbuffer->writeMutex); pthread_mutex_unlock(&wbuffer->writeMutex);
return 0; return 0;
} else { } else {
// len == 0 indicates that user's buffer has been transferred // remaining == 0 but closeFlag is not set
// indicates that user's buffer has been transferred
pthread_cond_signal(&wbuffer->transfer_finish); pthread_cond_signal(&wbuffer->transfer_finish);
pthread_cond_wait(&wbuffer->newwrite_or_close, &wbuffer->writeMutex); pthread_cond_wait(&wbuffer->newwrite_or_close,
&wbuffer->writeMutex);
} }
} }
if(wbuffer->remaining > 0 && !wbuffer->closeFlag) { if (wbuffer->remaining > 0 && !wbuffer->closeFlag) {
size_t copySize = wbuffer->remaining < size * nmemb ? wbuffer->remaining : size * nmemb; size_t copySize = wbuffer->remaining < size * nmemb ?
wbuffer->remaining : size * nmemb;
memcpy(ptr, wbuffer->wbuffer + wbuffer->offset, copySize); memcpy(ptr, wbuffer->wbuffer + wbuffer->offset, copySize);
wbuffer->offset += copySize; wbuffer->offset += copySize;
wbuffer->remaining -= copySize; wbuffer->remaining -= copySize;
pthread_mutex_unlock(&wbuffer->writeMutex); pthread_mutex_unlock(&wbuffer->writeMutex);
return copySize; return copySize;
} else { } else {
fprintf(stderr, "Webhdfs buffer is %ld, it should be a positive value!\n", wbuffer->remaining); fprintf(stderr, "ERROR: webhdfsBuffer's remaining is %ld, "
"it should be a positive value!\n", wbuffer->remaining);
pthread_mutex_unlock(&wbuffer->writeMutex); pthread_mutex_unlock(&wbuffer->writeMutex);
return 0; return 0;
} }
} }
static void initCurlGlobal() { /**
* Initialize the global libcurl environment
*/
static void initCurlGlobal()
{
if (!curlGlobalInited) { if (!curlGlobalInited) {
pthread_mutex_lock(&curlInitMutex); pthread_mutex_lock(&curlInitMutex);
if (!curlGlobalInited) { if (!curlGlobalInited) {
@ -151,202 +193,297 @@ static void initCurlGlobal() {
} }
} }
static Response launchCmd(char *url, enum HttpHeader method, enum Redirect followloc) { /**
CURL *curl; * Launch simple commands (commands without file I/O) and return response
CURLcode res; *
Response resp; * @param url Target URL
* @param method HTTP method (GET/PUT/POST)
* @param followloc Whether or not need to set CURLOPT_FOLLOWLOCATION
* @param response Response from remote service
* @return 0 for success and non-zero value to indicate error
*/
static int launchCmd(const char *url, enum HttpHeader method,
enum Redirect followloc, struct Response **response)
{
CURL *curl = NULL;
CURLcode curlCode;
int ret = 0;
struct Response *resp = NULL;
resp = (Response) calloc(1, sizeof(*resp)); resp = calloc(1, sizeof(struct Response));
if (!resp) { if (!resp) {
return NULL; return ENOMEM;
} }
resp->body = initResponseBuffer(); ret = initResponseBuffer(&(resp->body));
resp->header = initResponseBuffer(); if (ret) {
initCurlGlobal(); goto done;
curl = curl_easy_init(); /* get a curl handle */
if(curl) {
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
curl_easy_setopt(curl, CURLOPT_URL, url); /* specify target URL */
switch(method) {
case GET:
break;
case PUT:
curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"PUT");
break;
case POST:
curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"POST");
break;
case DELETE:
curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"DELETE");
break;
default:
fprintf(stderr, "\nHTTP method not defined\n");
exit(EXIT_FAILURE);
}
if(followloc == YES) {
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
}
res = curl_easy_perform(curl); /* Now run the curl handler */
if(res != CURLE_OK) {
fprintf(stderr, "preform the URL %s failed\n", url);
return NULL;
}
curl_easy_cleanup(curl);
} }
return resp; ret = initResponseBuffer(&(resp->header));
} if (ret) {
goto done;
static Response launchRead_internal(char *url, enum HttpHeader method, enum Redirect followloc, Response resp) {
if (!resp || !resp->body || !resp->body->content) {
fprintf(stderr, "The user provided buffer should not be NULL!\n");
return NULL;
}
CURL *curl;
CURLcode res;
initCurlGlobal();
curl = curl_easy_init(); /* get a curl handle */
if(curl) {
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc_withbuffer);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
curl_easy_setopt(curl, CURLOPT_URL, url); /* specify target URL */
if(followloc == YES) {
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
}
res = curl_easy_perform(curl); /* Now run the curl handler */
if(res != CURLE_OK && res != CURLE_PARTIAL_FILE) {
fprintf(stderr, "preform the URL %s failed\n", url);
return NULL;
}
curl_easy_cleanup(curl);
}
return resp;
}
static Response launchWrite(const char *url, enum HttpHeader method, webhdfsBuffer *uploadBuffer) {
if (!uploadBuffer) {
fprintf(stderr, "upload buffer is NULL!\n");
errno = EINVAL;
return NULL;
} }
initCurlGlobal(); initCurlGlobal();
CURLcode res; curl = curl_easy_init();
Response response = (Response) calloc(1, sizeof(*response));
if (!response) {
fprintf(stderr, "failed to allocate memory for response\n");
return NULL;
}
response->body = initResponseBuffer();
response->header = initResponseBuffer();
//connect to the datanode in order to create the lease in the namenode
CURL *curl = curl_easy_init();
if (!curl) { if (!curl) {
fprintf(stderr, "Failed to initialize the curl handle.\n"); ret = ENOMEM; // curl_easy_init does not return error code,
return NULL; // and most of its errors are caused by malloc()
fprintf(stderr, "ERROR in curl_easy_init.\n");
goto done;
}
/* Set callback function for reading data from remote service */
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
curl_easy_setopt(curl, CURLOPT_URL, url);
switch(method) {
case GET:
break;
case PUT:
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
break;
case POST:
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
break;
case DELETE:
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
break;
default:
ret = EINVAL;
fprintf(stderr, "ERROR: Invalid HTTP method\n");
goto done;
}
if (followloc == YES) {
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
}
/* Now run the curl handler */
curlCode = curl_easy_perform(curl);
if (curlCode != CURLE_OK) {
ret = EIO;
fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n",
url, curlCode, curl_easy_strerror(curlCode));
}
done:
if (curl != NULL) {
curl_easy_cleanup(curl);
}
if (ret) {
free(resp);
resp = NULL;
}
*response = resp;
return ret;
}
/**
* Launch the read request. The request is sent to the NameNode and then
* redirected to corresponding DataNode
*
* @param url The URL for the read request
* @param resp The response containing the buffer provided by user
* @return 0 for success and non-zero value to indicate error
*/
static int launchReadInternal(const char *url, struct Response* resp)
{
CURL *curl;
CURLcode curlCode;
int ret = 0;
if (!resp || !resp->body || !resp->body->content) {
fprintf(stderr,
"ERROR: invalid user-provided buffer!\n");
return EINVAL;
}
initCurlGlobal();
/* get a curl handle */
curl = curl_easy_init();
if (!curl) {
fprintf(stderr, "ERROR in curl_easy_init.\n");
return ENOMEM;
}
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeFuncWithUserBuffer);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
curlCode = curl_easy_perform(curl);
if (curlCode != CURLE_OK && curlCode != CURLE_PARTIAL_FILE) {
ret = EIO;
fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n",
url, curlCode, curl_easy_strerror(curlCode));
}
curl_easy_cleanup(curl);
return ret;
}
/**
* The function does the write operation by connecting to a DataNode.
* The function keeps the connection with the DataNode until
* the closeFlag is set. Whenever the current data has been sent out,
* the function blocks waiting for further input from user or close.
*
* @param url URL of the remote DataNode
* @param method PUT for create and POST for append
* @param uploadBuffer Buffer storing user's data to write
* @param response Response from remote service
* @return 0 for success and non-zero value to indicate error
*/
static int launchWrite(const char *url, enum HttpHeader method,
struct webhdfsBuffer *uploadBuffer,
struct Response **response)
{
CURLcode curlCode;
struct Response* resp = NULL;
struct curl_slist *chunk = NULL;
CURL *curl = NULL;
int ret = 0;
if (!uploadBuffer) {
fprintf(stderr, "ERROR: upload buffer is NULL!\n");
return EINVAL;
}
initCurlGlobal();
resp = calloc(1, sizeof(struct Response));
if (!resp) {
return ENOMEM;
}
ret = initResponseBuffer(&(resp->body));
if (ret) {
goto done;
}
ret = initResponseBuffer(&(resp->header));
if (ret) {
goto done;
}
// Connect to the datanode in order to create the lease in the namenode
curl = curl_easy_init();
if (!curl) {
fprintf(stderr, "ERROR: failed to initialize the curl handle.\n");
return ENOMEM;
} }
curl_easy_setopt(curl, CURLOPT_URL, url); curl_easy_setopt(curl, CURLOPT_URL, url);
if(curl) { curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc); curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, response->body); curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc); curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
curl_easy_setopt(curl, CURLOPT_WRITEHEADER, response->header); curl_easy_setopt(curl, CURLOPT_READFUNCTION, readfunc);
curl_easy_setopt(curl, CURLOPT_READFUNCTION, readfunc); curl_easy_setopt(curl, CURLOPT_READDATA, uploadBuffer);
curl_easy_setopt(curl, CURLOPT_READDATA, uploadBuffer); curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1); chunk = curl_slist_append(chunk, "Transfer-Encoding: chunked");
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
struct curl_slist *chunk = NULL; chunk = curl_slist_append(chunk, "Expect:");
chunk = curl_slist_append(chunk, "Transfer-Encoding: chunked"); curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
res = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
chunk = curl_slist_append(chunk, "Expect:"); switch(method) {
res = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk); case PUT:
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
switch(method) { break;
case GET: case POST:
break; curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
case PUT: break;
curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"PUT"); default:
break; ret = EINVAL;
case POST: fprintf(stderr, "ERROR: Invalid HTTP method\n");
curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"POST"); goto done;
break; }
case DELETE: curlCode = curl_easy_perform(curl);
curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"DELETE"); if (curlCode != CURLE_OK) {
break; ret = EIO;
default: fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n",
fprintf(stderr, "\nHTTP method not defined\n"); url, curlCode, curl_easy_strerror(curlCode));
exit(EXIT_FAILURE);
}
res = curl_easy_perform(curl);
curl_slist_free_all(chunk);
curl_easy_cleanup(curl);
} }
return response; done:
if (chunk != NULL) {
curl_slist_free_all(chunk);
}
if (curl != NULL) {
curl_easy_cleanup(curl);
}
if (ret) {
free(resp);
resp = NULL;
}
*response = resp;
return ret;
} }
Response launchMKDIR(char *url) { int launchMKDIR(const char *url, struct Response **resp)
return launchCmd(url, PUT, NO); {
return launchCmd(url, PUT, NO, resp);
} }
Response launchRENAME(char *url) { int launchRENAME(const char *url, struct Response **resp)
return launchCmd(url, PUT, NO); {
return launchCmd(url, PUT, NO, resp);
} }
Response launchGFS(char *url) { int launchGFS(const char *url, struct Response **resp)
return launchCmd(url, GET, NO); {
return launchCmd(url, GET, NO, resp);
} }
Response launchLS(char *url) { int launchLS(const char *url, struct Response **resp)
return launchCmd(url, GET, NO); {
return launchCmd(url, GET, NO, resp);
} }
Response launchCHMOD(char *url) { int launchCHMOD(const char *url, struct Response **resp)
return launchCmd(url, PUT, NO); {
return launchCmd(url, PUT, NO, resp);
} }
Response launchCHOWN(char *url) { int launchCHOWN(const char *url, struct Response **resp)
return launchCmd(url, PUT, NO); {
return launchCmd(url, PUT, NO, resp);
} }
Response launchDELETE(char *url) { int launchDELETE(const char *url, struct Response **resp)
return launchCmd(url, DELETE, NO); {
return launchCmd(url, DELETE, NO, resp);
} }
Response launchOPEN(char *url, Response resp) { int launchOPEN(const char *url, struct Response* resp)
return launchRead_internal(url, GET, YES, resp); {
return launchReadInternal(url, resp);
} }
Response launchUTIMES(char *url) { int launchUTIMES(const char *url, struct Response **resp)
return launchCmd(url, PUT, NO); {
return launchCmd(url, PUT, NO, resp);
} }
Response launchNnWRITE(char *url) { int launchNnWRITE(const char *url, struct Response **resp)
return launchCmd(url, PUT, NO); {
return launchCmd(url, PUT, NO, resp);
} }
Response launchNnAPPEND(char *url) { int launchNnAPPEND(const char *url, struct Response **resp)
return launchCmd(url, POST, NO); {
return launchCmd(url, POST, NO, resp);
} }
Response launchDnWRITE(const char *url, webhdfsBuffer *buffer) { int launchDnWRITE(const char *url, struct webhdfsBuffer *buffer,
return launchWrite(url, PUT, buffer); struct Response **resp)
{
return launchWrite(url, PUT, buffer, resp);
} }
Response launchDnAPPEND(const char *url, webhdfsBuffer *buffer) { int launchDnAPPEND(const char *url, struct webhdfsBuffer *buffer,
return launchWrite(url, POST, buffer); struct Response **resp)
{
return launchWrite(url, POST, buffer, resp);
} }
Response launchSETREPLICATION(char *url) { int launchSETREPLICATION(const char *url, struct Response **resp)
return launchCmd(url, PUT, NO); {
return launchCmd(url, PUT, NO, resp);
} }

View File

@ -26,6 +26,7 @@
#include <pthread.h> /* for pthread_t */ #include <pthread.h> /* for pthread_t */
#include <unistd.h> /* for size_t */ #include <unistd.h> /* for size_t */
/** enum indicating the type of hdfs stream */
enum hdfsStreamType enum hdfsStreamType
{ {
UNINITIALIZED = 0, UNINITIALIZED = 0,
@ -36,28 +37,39 @@ enum hdfsStreamType
/** /**
* webhdfsBuffer - used for hold the data for read/write from/to http connection * webhdfsBuffer - used for hold the data for read/write from/to http connection
*/ */
typedef struct { struct webhdfsBuffer {
const char *wbuffer; // The user's buffer for uploading const char *wbuffer; /* The user's buffer for uploading */
size_t remaining; // Length of content size_t remaining; /* Length of content */
size_t offset; // offset for reading size_t offset; /* offset for reading */
int openFlag; // Check whether the hdfsOpenFile has been called before /* Check whether the hdfsOpenFile has been called before */
int closeFlag; // Whether to close the http connection for writing int openFlag;
pthread_mutex_t writeMutex; // Synchronization between the curl and hdfsWrite threads /* Whether to close the http connection for writing */
pthread_cond_t newwrite_or_close; // Transferring thread waits for this condition int closeFlag;
// when there is no more content for transferring in the buffer /* Synchronization between the curl and hdfsWrite threads */
pthread_cond_t transfer_finish; // Condition used to indicate finishing transferring (one buffer) pthread_mutex_t writeMutex;
} webhdfsBuffer; /*
* Transferring thread waits for this condition
* when there is no more content for transferring in the buffer
*/
pthread_cond_t newwrite_or_close;
/* Condition used to indicate finishing transferring (one buffer) */
pthread_cond_t transfer_finish;
};
/** File handle for webhdfs */
struct webhdfsFileHandle { struct webhdfsFileHandle {
char *absPath; char *absPath; /* Absolute path of file */
int bufferSize; int bufferSize; /* Size of buffer */
short replication; short replication; /* Number of replication */
tSize blockSize; tSize blockSize; /* Block size */
char *datanode; char *datanode; /* URL of the DataNode */
webhdfsBuffer *uploadBuffer; /* webhdfsBuffer handle used to store the upload data */
struct webhdfsBuffer *uploadBuffer;
/* The thread used for data transferring */
pthread_t connThread; pthread_t connThread;
}; };
/** Type of http header */
enum HttpHeader { enum HttpHeader {
GET, GET,
PUT, PUT,
@ -65,44 +77,218 @@ enum HttpHeader {
DELETE DELETE
}; };
/** Whether to redirect */
enum Redirect { enum Redirect {
YES, YES,
NO NO
}; };
typedef struct { /** Buffer used for holding response */
struct ResponseBuffer {
char *content; char *content;
size_t remaining; size_t remaining;
size_t offset; size_t offset;
} ResponseBufferInternal; };
typedef ResponseBufferInternal *ResponseBuffer;
/** /**
* The response got through webhdfs * The response got through webhdfs
*/ */
typedef struct { struct Response {
ResponseBuffer body; struct ResponseBuffer *body;
ResponseBuffer header; struct ResponseBuffer *header;
}* Response; };
ResponseBuffer initResponseBuffer(); /**
void freeResponseBuffer(ResponseBuffer buffer); * Create and initialize a ResponseBuffer
void freeResponse(Response resp); *
* @param buffer Pointer pointing to new created ResponseBuffer handle
* @return 0 for success, non-zero value to indicate error
*/
int initResponseBuffer(struct ResponseBuffer **buffer) __attribute__ ((warn_unused_result));
Response launchMKDIR(char *url); /**
Response launchRENAME(char *url); * Free the given ResponseBuffer
Response launchCHMOD(char *url); *
Response launchGFS(char *url); * @param buffer The ResponseBuffer to free
Response launchLS(char *url); */
Response launchDELETE(char *url); void freeResponseBuffer(struct ResponseBuffer *buffer);
Response launchCHOWN(char *url);
Response launchOPEN(char *url, Response resp);
Response launchUTIMES(char *url);
Response launchNnWRITE(char *url);
Response launchDnWRITE(const char *url, webhdfsBuffer *buffer); /**
Response launchNnAPPEND(char *url); * Free the given Response
Response launchSETREPLICATION(char *url); *
Response launchDnAPPEND(const char *url, webhdfsBuffer *buffer); * @param resp The Response to free
*/
void freeResponse(struct Response *resp);
/**
* Send the MKDIR request to NameNode using the given URL.
* The NameNode will execute the operation and return the result as response.
*
* @param url The URL for MKDIR operation
* @param response Response handle to store response returned from the NameNode
* @return 0 for success, non-zero value to indicate error
*/
int launchMKDIR(const char *url,
struct Response **response) __attribute__ ((warn_unused_result));
/**
* Send the RENAME request to NameNode using the given URL.
* The NameNode will execute the operation and return the result as response.
*
* @param url The URL for RENAME operation
* @param response Response handle to store response returned from the NameNode
* @return 0 for success, non-zero value to indicate error
*/
int launchRENAME(const char *url,
struct Response **response) __attribute__ ((warn_unused_result));
/**
* Send the CHMOD request to NameNode using the given URL.
* The NameNode will execute the operation and return the result as response.
*
* @param url The URL for CHMOD operation
* @param response Response handle to store response returned from the NameNode
* @return 0 for success, non-zero value to indicate error
*/
int launchCHMOD(const char *url,
struct Response **response) __attribute__ ((warn_unused_result));
/**
* Send the GetFileStatus request to NameNode using the given URL.
* The NameNode will execute the operation and return the result as response.
*
* @param url The URL for GetFileStatus operation
* @param response Response handle to store response returned from the NameNode,
* containing either file status or exception information
* @return 0 for success, non-zero value to indicate error
*/
int launchGFS(const char *url,
struct Response **response) __attribute__ ((warn_unused_result));
/**
* Send the LS (LISTSTATUS) request to NameNode using the given URL.
* The NameNode will execute the operation and return the result as response.
*
* @param url The URL for LISTSTATUS operation
* @param response Response handle to store response returned from the NameNode
* @return 0 for success, non-zero value to indicate error
*/
int launchLS(const char *url,
struct Response **response) __attribute__ ((warn_unused_result));
/**
* Send the DELETE request to NameNode using the given URL.
* The NameNode will execute the operation and return the result as response.
*
* @param url The URL for DELETE operation
* @param response Response handle to store response returned from the NameNode
* @return 0 for success, non-zero value to indicate error
*/
int launchDELETE(const char *url,
struct Response **response) __attribute__ ((warn_unused_result));
/**
* Send the CHOWN request to NameNode using the given URL.
* The NameNode will execute the operation and return the result as response.
*
* @param url The URL for CHOWN operation
* @param response Response handle to store response returned from the NameNode
* @return 0 for success, non-zero value to indicate error
*/
int launchCHOWN(const char *url,
struct Response **response) __attribute__ ((warn_unused_result));
/**
* Send the OPEN request to NameNode using the given URL,
* asking for reading a file (within a range).
* The NameNode first redirects the request to the datanode
* that holds the corresponding first block of the file (within a range),
* and the datanode returns the content of the file through the HTTP connection.
*
* @param url The URL for OPEN operation
* @param resp The response holding user's buffer.
The file content will be written into the buffer.
* @return 0 for success, non-zero value to indicate error
*/
int launchOPEN(const char *url,
struct Response* resp) __attribute__ ((warn_unused_result));
/**
* Send the SETTIMES request to NameNode using the given URL.
* The NameNode will execute the operation and return the result as response.
*
* @param url The URL for SETTIMES operation
* @param response Response handle to store response returned from the NameNode
* @return 0 for success, non-zero value to indicate error
*/
int launchUTIMES(const char *url,
struct Response **response) __attribute__ ((warn_unused_result));
/**
* Send the WRITE/CREATE request to NameNode using the given URL.
* The NameNode will choose the writing target datanodes
* and return the first datanode in the pipeline as response
*
* @param url The URL for WRITE/CREATE operation connecting to NameNode
* @param response Response handle to store response returned from the NameNode
* @return 0 for success, non-zero value to indicate error
*/
int launchNnWRITE(const char *url,
struct Response **response) __attribute__ ((warn_unused_result));
/**
* Send the WRITE request along with to-write content to
* the corresponding DataNode using the given URL.
* The DataNode will write the data and return the response.
*
* @param url The URL for WRITE operation connecting to DataNode
* @param buffer The webhdfsBuffer containing data to be written to hdfs
* @param response Response handle to store response returned from the NameNode
* @return 0 for success, non-zero value to indicate error
*/
int launchDnWRITE(const char *url, struct webhdfsBuffer *buffer,
struct Response **response) __attribute__ ((warn_unused_result));
/**
* Send the WRITE (APPEND) request to NameNode using the given URL.
* The NameNode determines the DataNode for appending and
* sends its URL back as response.
*
* @param url The URL for APPEND operation
* @param response Response handle to store response returned from the NameNode
* @return 0 for success, non-zero value to indicate error
*/
int launchNnAPPEND(const char *url, struct Response **response) __attribute__ ((warn_unused_result));
/**
* Send the SETREPLICATION request to NameNode using the given URL.
* The NameNode will execute the operation and return the result as response.
*
* @param url The URL for SETREPLICATION operation
* @param response Response handle to store response returned from the NameNode
* @return 0 for success, non-zero value to indicate error
*/
int launchSETREPLICATION(const char *url,
struct Response **response) __attribute__ ((warn_unused_result));
/**
* Send the APPEND request along with the content to DataNode.
* The DataNode will do the appending and return the result as response.
*
* @param url The URL for APPEND operation connecting to DataNode
* @param buffer The webhdfsBuffer containing data to be appended
* @param response Response handle to store response returned from the NameNode
* @return 0 for success, non-zero value to indicate error
*/
int launchDnAPPEND(const char *url, struct webhdfsBuffer *buffer,
struct Response **response) __attribute__ ((warn_unused_result));
/**
* Call sys_errlist to get the error message string for the given error code
*
* @param errnoval The error code value
* @return The error message string mapped to the given error code
*/
const char *hdfs_strerror(int errnoval);
#endif //_HDFS_HTTP_CLIENT_H_ #endif //_HDFS_HTTP_CLIENT_H_

View File

@ -22,233 +22,381 @@
#include <stdio.h> #include <stdio.h>
#include <errno.h> #include <errno.h>
#define NUM_OF_PERMISSION_BITS 4 #define PERM_STR_LEN 4 // "644" + one byte for NUL
#define NUM_OF_PORT_BITS 6 #define SHORT_STR_LEN 6 // 65535 + NUL
#define NUM_OF_REPLICATION_BITS 6 #define LONG_STR_LEN 21 // 2^64-1 = 18446744073709551615 + NUL
static char *prepareQUERY(const char *host, int nnPort, const char *srcpath, const char *OP, const char *user) { /**
size_t length; * Create query based on NameNode hostname,
char *url; * NameNode port, path, operation and other parameters
const char *const protocol = "http://"; *
const char *const prefix = "/webhdfs/v1"; * @param host NameNode hostName
char *temp; * @param nnPort Port of NameNode
char *port; * @param path Absolute path for the corresponding file
port= (char*) malloc(NUM_OF_PORT_BITS); * @param op Operations
if (!port) { * @param paraNum Number of remaining parameters
return NULL; * @param paraNames Names of remaining parameters
} * @param paraValues Values of remaining parameters
sprintf(port,"%d",nnPort); * @param url Holding the created URL
if (user != NULL) { * @return 0 on success and non-zero value to indicate error
length = strlen(protocol) + strlen(host) + strlen(":") + strlen(port) + strlen(prefix) + strlen(srcpath) + strlen ("?op=") + strlen(OP) + strlen("&user.name=") + strlen(user); */
} else { static int createQueryURL(const char *host, unsigned int nnPort,
length = strlen(protocol) + strlen(host) + strlen(":") + strlen(port) + strlen(prefix) + strlen(srcpath) + strlen ("?op=") + strlen(OP); const char *path, const char *op, int paraNum,
} const char **paraNames, const char **paraValues,
char **queryUrl)
{
size_t length = 0;
int i = 0, offset = 0, ret = 0;
char *url = NULL;
const char *protocol = "http://";
const char *prefix = "/webhdfs/v1";
temp = (char*) malloc(length + 1); if (!paraNames || !paraValues) {
if (!temp) { return EINVAL;
return NULL;
} }
strcpy(temp,protocol); length = strlen(protocol) + strlen(host) + strlen(":") +
temp = strcat(temp,host); SHORT_STR_LEN + strlen(prefix) + strlen(path) +
temp = strcat(temp,":"); strlen ("?op=") + strlen(op);
temp = strcat(temp,port); for (i = 0; i < paraNum; i++) {
temp = strcat(temp,prefix); if (paraNames[i] && paraValues[i]) {
temp = strcat(temp,srcpath); length += 2 + strlen(paraNames[i]) + strlen(paraValues[i]);
temp = strcat(temp,"?op="); }
temp = strcat(temp,OP); }
if (user) { url = malloc(length); // The '\0' has already been included
temp = strcat(temp,"&user.name="); // when using SHORT_STR_LEN
temp = strcat(temp,user); if (!url) {
} return ENOMEM;
url = temp; }
return url;
} offset = snprintf(url, length, "%s%s:%d%s%s?op=%s",
protocol, host, nnPort, prefix, path, op);
if (offset >= length || offset < 0) {
static int decToOctal(int decNo) { ret = EIO;
int octNo=0; goto done;
int expo =0; }
while (decNo != 0) { for (i = 0; i < paraNum; i++) {
octNo = ((decNo % 8) * pow(10,expo)) + octNo; if (!paraNames[i] || !paraValues[i] || paraNames[i][0] == '\0' ||
decNo = decNo / 8; paraValues[i][0] == '\0') {
expo++; continue;
} }
return octNo; offset += snprintf(url + offset, length - offset,
} "&%s=%s", paraNames[i], paraValues[i]);
if (offset >= length || offset < 0) {
ret = EIO;
char *prepareMKDIR(const char *host, int nnPort, const char *dirsubpath, const char *user) { goto done;
return prepareQUERY(host, nnPort, dirsubpath, "MKDIRS", user); }
} }
done:
if (ret) {
char *prepareMKDIRwithMode(const char *host, int nnPort, const char *dirsubpath, int mode, const char *user) { free(url);
char *url; return ret;
char *permission; }
permission = (char*) malloc(NUM_OF_PERMISSION_BITS); *queryUrl = url;
if (!permission) { return 0;
return NULL; }
}
mode = decToOctal(mode); int createUrlForMKDIR(const char *host, int nnPort,
sprintf(permission,"%d",mode); const char *path, const char *user, char **url)
url = prepareMKDIR(host, nnPort, dirsubpath, user); {
url = realloc(url,(strlen(url) + strlen("&permission=") + strlen(permission) + 1)); const char *userPara = "user.name";
if (!url) { return createQueryURL(host, nnPort, path, "MKDIRS", 1,
return NULL; &userPara, &user, url);
} }
url = strcat(url,"&permission=");
url = strcat(url,permission); int createUrlForGetFileStatus(const char *host, int nnPort, const char *path,
return url; const char *user, char **url)
} {
const char *userPara = "user.name";
return createQueryURL(host, nnPort, path, "GETFILESTATUS", 1,
char *prepareRENAME(const char *host, int nnPort, const char *srcpath, const char *destpath, const char *user) { &userPara, &user, url);
char *url; }
url = prepareQUERY(host, nnPort, srcpath, "RENAME", user);
url = realloc(url,(strlen(url) + strlen("&destination=") + strlen(destpath) + 1)); int createUrlForLS(const char *host, int nnPort, const char *path,
if (!url) { const char *user, char **url)
return NULL; {
} const char *userPara = "user.name";
url = strcat(url,"&destination="); return createQueryURL(host, nnPort, path, "LISTSTATUS",
url = strcat(url,destpath); 1, &userPara, &user, url);
return url; }
}
int createUrlForNnAPPEND(const char *host, int nnPort, const char *path,
char *prepareGFS(const char *host, int nnPort, const char *dirsubpath, const char *user) { const char *user, char **url)
return (prepareQUERY(host, nnPort, dirsubpath, "GETFILESTATUS", user)); {
} const char *userPara = "user.name";
return createQueryURL(host, nnPort, path, "APPEND",
char *prepareLS(const char *host, int nnPort, const char *dirsubpath, const char *user) { 1, &userPara, &user, url);
return (prepareQUERY(host, nnPort, dirsubpath, "LISTSTATUS", user)); }
}
int createUrlForMKDIRwithMode(const char *host, int nnPort, const char *path,
char *prepareCHMOD(const char *host, int nnPort, const char *dirsubpath, int mode, const char *user) { int mode, const char *user, char **url)
char *url; {
char *permission; int strlength;
permission = (char*) malloc(NUM_OF_PERMISSION_BITS); char permission[PERM_STR_LEN];
if (!permission) { const char *paraNames[2], *paraValues[2];
return NULL;
} paraNames[0] = "permission";
mode &= 0x3FFF; paraNames[1] = "user.name";
mode = decToOctal(mode); memset(permission, 0, PERM_STR_LEN);
sprintf(permission,"%d",mode); strlength = snprintf(permission, PERM_STR_LEN, "%o", mode);
url = prepareQUERY(host, nnPort, dirsubpath, "SETPERMISSION", user); if (strlength < 0 || strlength >= PERM_STR_LEN) {
url = realloc(url,(strlen(url) + strlen("&permission=") + strlen(permission) + 1)); return EIO;
if (!url) { }
return NULL; paraValues[0] = permission;
} paraValues[1] = user;
url = strcat(url,"&permission=");
url = strcat(url,permission); return createQueryURL(host, nnPort, path, "MKDIRS", 2,
return url; paraNames, paraValues, url);
} }
char *prepareDELETE(const char *host, int nnPort, const char *dirsubpath, int recursive, const char *user) { int createUrlForRENAME(const char *host, int nnPort, const char *srcpath,
char *url = (prepareQUERY(host, nnPort, dirsubpath, "DELETE", user)); const char *destpath, const char *user, char **url)
char *recursiveFlag = (char *)malloc(6); {
if (!recursive) { const char *paraNames[2], *paraValues[2];
strcpy(recursiveFlag, "false"); paraNames[0] = "destination";
} else { paraNames[1] = "user.name";
strcpy(recursiveFlag, "true"); paraValues[0] = destpath;
} paraValues[1] = user;
url = (char *) realloc(url, strlen(url) + strlen("&recursive=") + strlen(recursiveFlag) + 1);
if (!url) { return createQueryURL(host, nnPort, srcpath,
return NULL; "RENAME", 2, paraNames, paraValues, url);
} }
strcat(url, "&recursive="); int createUrlForCHMOD(const char *host, int nnPort, const char *path,
strcat(url, recursiveFlag); int mode, const char *user, char **url)
return url; {
} int strlength;
char permission[PERM_STR_LEN];
char *prepareCHOWN(const char *host, int nnPort, const char *dirsubpath, const char *owner, const char *group, const char *user) { const char *paraNames[2], *paraValues[2];
char *url;
url = prepareQUERY(host, nnPort, dirsubpath, "SETOWNER", user); paraNames[0] = "permission";
if (!url) { paraNames[1] = "user.name";
return NULL; memset(permission, 0, PERM_STR_LEN);
} strlength = snprintf(permission, PERM_STR_LEN, "%o", mode);
if(owner != NULL) { if (strlength < 0 || strlength >= PERM_STR_LEN) {
url = realloc(url,(strlen(url) + strlen("&owner=") + strlen(owner) + 1)); return EIO;
url = strcat(url,"&owner="); }
url = strcat(url,owner); paraValues[0] = permission;
} paraValues[1] = user;
if (group != NULL) {
url = realloc(url,(strlen(url) + strlen("&group=") + strlen(group) + 1)); return createQueryURL(host, nnPort, path, "SETPERMISSION",
url = strcat(url,"&group="); 2, paraNames, paraValues, url);
url = strcat(url,group); }
}
return url; int createUrlForDELETE(const char *host, int nnPort, const char *path,
} int recursive, const char *user, char **url)
{
char *prepareOPEN(const char *host, int nnPort, const char *dirsubpath, const char *user, size_t offset, size_t length) { const char *paraNames[2], *paraValues[2];
char *base_url = prepareQUERY(host, nnPort, dirsubpath, "OPEN", user); paraNames[0] = "recursive";
char *url = (char *) malloc(strlen(base_url) + strlen("&offset=") + 15 + strlen("&length=") + 15); paraNames[1] = "user.name";
if (!url) { if (recursive) {
return NULL; paraValues[0] = "true";
} } else {
sprintf(url, "%s&offset=%ld&length=%ld", base_url, offset, length); paraValues[0] = "false";
return url; }
} paraValues[1] = user;
char *prepareUTIMES(const char *host, int nnPort, const char *dirsubpath, long unsigned mTime, long unsigned aTime, const char *user) { return createQueryURL(host, nnPort, path, "DELETE",
char *url; 2, paraNames, paraValues, url);
char *modTime; }
char *acsTime;
modTime = (char*) malloc(12); int createUrlForCHOWN(const char *host, int nnPort, const char *path,
acsTime = (char*) malloc(12); const char *owner, const char *group,
url = prepareQUERY(host, nnPort, dirsubpath, "SETTIMES", user); const char *user, char **url)
sprintf(modTime,"%lu",mTime); {
sprintf(acsTime,"%lu",aTime); const char *paraNames[3], *paraValues[3];
url = realloc(url,(strlen(url) + strlen("&modificationtime=") + strlen(modTime) + strlen("&accesstime=") + strlen(acsTime) + 1)); paraNames[0] = "owner";
if (!url) { paraNames[1] = "group";
return NULL; paraNames[2] = "user.name";
} paraValues[0] = owner;
url = strcat(url, "&modificationtime="); paraValues[1] = group;
url = strcat(url, modTime); paraValues[2] = user;
url = strcat(url,"&accesstime=");
url = strcat(url, acsTime); return createQueryURL(host, nnPort, path, "SETOWNER",
return url; 3, paraNames, paraValues, url);
} }
char *prepareNnWRITE(const char *host, int nnPort, const char *dirsubpath, const char *user, int16_t replication, size_t blockSize) { int createUrlForOPEN(const char *host, int nnPort, const char *path,
char *url; const char *user, size_t offset, size_t length, char **url)
url = prepareQUERY(host, nnPort, dirsubpath, "CREATE", user); {
url = realloc(url, (strlen(url) + strlen("&overwrite=true") + 1)); int strlength;
if (!url) { char offsetStr[LONG_STR_LEN], lengthStr[LONG_STR_LEN];
return NULL; const char *paraNames[3], *paraValues[3];
}
url = strcat(url, "&overwrite=true"); paraNames[0] = "offset";
if (replication > 0) { paraNames[1] = "length";
url = realloc(url, (strlen(url) + strlen("&replication=") + 6)); paraNames[2] = "user.name";
if (!url) { memset(offsetStr, 0, LONG_STR_LEN);
return NULL; memset(lengthStr, 0, LONG_STR_LEN);
strlength = snprintf(offsetStr, LONG_STR_LEN, "%lu", offset);
if (strlength < 0 || strlength >= LONG_STR_LEN) {
return EIO;
}
strlength = snprintf(lengthStr, LONG_STR_LEN, "%lu", length);
if (strlength < 0 || strlength >= LONG_STR_LEN) {
return EIO;
}
paraValues[0] = offsetStr;
paraValues[1] = lengthStr;
paraValues[2] = user;
return createQueryURL(host, nnPort, path, "OPEN",
3, paraNames, paraValues, url);
}
int createUrlForUTIMES(const char *host, int nnPort, const char *path,
long unsigned mTime, long unsigned aTime,
const char *user, char **url)
{
int strlength;
char modTime[LONG_STR_LEN], acsTime[LONG_STR_LEN];
const char *paraNames[3], *paraValues[3];
memset(modTime, 0, LONG_STR_LEN);
memset(acsTime, 0, LONG_STR_LEN);
strlength = snprintf(modTime, LONG_STR_LEN, "%lu", mTime);
if (strlength < 0 || strlength >= LONG_STR_LEN) {
return EIO;
}
strlength = snprintf(acsTime, LONG_STR_LEN, "%lu", aTime);
if (strlength < 0 || strlength >= LONG_STR_LEN) {
return EIO;
}
paraNames[0] = "modificationtime";
paraNames[1] = "accesstime";
paraNames[2] = "user.name";
paraValues[0] = modTime;
paraValues[1] = acsTime;
paraValues[2] = user;
return createQueryURL(host, nnPort, path, "SETTIMES",
3, paraNames, paraValues, url);
}
int createUrlForNnWRITE(const char *host, int nnPort,
const char *path, const char *user,
int16_t replication, size_t blockSize, char **url)
{
int strlength;
char repStr[SHORT_STR_LEN], blockSizeStr[LONG_STR_LEN];
const char *paraNames[4], *paraValues[4];
memset(repStr, 0, SHORT_STR_LEN);
memset(blockSizeStr, 0, LONG_STR_LEN);
if (replication > 0) {
strlength = snprintf(repStr, SHORT_STR_LEN, "%u", replication);
if (strlength < 0 || strlength >= SHORT_STR_LEN) {
return EIO;
} }
sprintf(url, "%s&replication=%d", url, replication);
} }
if (blockSize > 0) { if (blockSize > 0) {
url = realloc(url, (strlen(url) + strlen("&blocksize=") + 16)); strlength = snprintf(blockSizeStr, LONG_STR_LEN, "%lu", blockSize);
if (!url) { if (strlength < 0 || strlength >= LONG_STR_LEN) {
return NULL; return EIO;
} }
sprintf(url, "%s&blocksize=%ld", url, blockSize);
} }
return url; paraNames[0] = "overwrite";
paraNames[1] = "replication";
paraNames[2] = "blocksize";
paraNames[3] = "user.name";
paraValues[0] = "true";
paraValues[1] = repStr;
paraValues[2] = blockSizeStr;
paraValues[3] = user;
return createQueryURL(host, nnPort, path, "CREATE",
4, paraNames, paraValues, url);
} }
char *prepareNnAPPEND(const char *host, int nnPort, const char *dirsubpath, const char *user) { int createUrlForSETREPLICATION(const char *host, int nnPort,
return (prepareQUERY(host, nnPort, dirsubpath, "APPEND", user)); const char *path, int16_t replication,
} const char *user, char **url)
char *prepareSETREPLICATION(const char *host, int nnPort, const char *path, int16_t replication, const char *user)
{ {
char *url = prepareQUERY(host, nnPort, path, "SETREPLICATION", user); char repStr[SHORT_STR_LEN];
char *replicationNum = (char *) malloc(NUM_OF_REPLICATION_BITS); const char *paraNames[2], *paraValues[2];
sprintf(replicationNum, "%u", replication); int strlength;
url = realloc(url, strlen(url) + strlen("&replication=") + strlen(replicationNum)+ 1);
if (!url) { memset(repStr, 0, SHORT_STR_LEN);
return NULL; if (replication > 0) {
strlength = snprintf(repStr, SHORT_STR_LEN, "%u", replication);
if (strlength < 0 || strlength >= SHORT_STR_LEN) {
return EIO;
}
}
paraNames[0] = "replication";
paraNames[1] = "user.name";
paraValues[0] = repStr;
paraValues[1] = user;
return createQueryURL(host, nnPort, path, "SETREPLICATION",
2, paraNames, paraValues, url);
}
int createUrlForGetBlockLocations(const char *host, int nnPort,
const char *path, size_t offset,
size_t length, const char *user, char **url)
{
char offsetStr[LONG_STR_LEN], lengthStr[LONG_STR_LEN];
const char *paraNames[3], *paraValues[3];
int strlength;
memset(offsetStr, 0, LONG_STR_LEN);
memset(lengthStr, 0, LONG_STR_LEN);
if (offset > 0) {
strlength = snprintf(offsetStr, LONG_STR_LEN, "%lu", offset);
if (strlength < 0 || strlength >= LONG_STR_LEN) {
return EIO;
}
}
if (length > 0) {
strlength = snprintf(lengthStr, LONG_STR_LEN, "%lu", length);
if (strlength < 0 || strlength >= LONG_STR_LEN) {
return EIO;
}
}
paraNames[0] = "offset";
paraNames[1] = "length";
paraNames[2] = "user.name";
paraValues[0] = offsetStr;
paraValues[1] = lengthStr;
paraValues[2] = user;
return createQueryURL(host, nnPort, path, "GET_BLOCK_LOCATIONS",
3, paraNames, paraValues, url);
}
int createUrlForReadFromDatanode(const char *dnHost, int dnPort,
const char *path, size_t offset,
size_t length, const char *user,
const char *namenodeRpcAddr, char **url)
{
char offsetStr[LONG_STR_LEN], lengthStr[LONG_STR_LEN];
const char *paraNames[4], *paraValues[4];
int strlength;
memset(offsetStr, 0, LONG_STR_LEN);
memset(lengthStr, 0, LONG_STR_LEN);
if (offset > 0) {
strlength = snprintf(offsetStr, LONG_STR_LEN, "%lu", offset);
if (strlength < 0 || strlength >= LONG_STR_LEN) {
return EIO;
}
}
if (length > 0) {
strlength = snprintf(lengthStr, LONG_STR_LEN, "%lu", length);
if (strlength < 0 || strlength >= LONG_STR_LEN) {
return EIO;
}
} }
url = strcat(url, "&replication="); paraNames[0] = "offset";
url = strcat(url, replicationNum); paraNames[1] = "length";
return url; paraNames[2] = "user.name";
paraNames[3] = "namenoderpcaddress";
paraValues[0] = offsetStr;
paraValues[1] = lengthStr;
paraValues[2] = user;
paraValues[3] = namenodeRpcAddr;
return createQueryURL(dnHost, dnPort, path, "OPEN",
4, paraNames, paraValues, url);
} }

View File

@ -20,22 +20,221 @@
#ifndef _HDFS_HTTP_QUERY_H_ #ifndef _HDFS_HTTP_QUERY_H_
#define _HDFS_HTTP_QUERY_H_ #define _HDFS_HTTP_QUERY_H_
#include <stdint.h> #include <unistd.h> /* for size_t */
#include <stdio.h> #include <inttypes.h> /* for int16_t */
char *prepareMKDIR(const char *host, int nnPort, const char *dirsubpath, const char *user); /**
char *prepareMKDIRwithMode(const char *host, int nnPort, const char *dirsubpath, int mode, const char *user); * Create the URL for a MKDIR request
char *prepareRENAME(const char *host, int nnPort, const char *srcpath, const char *destpath, const char *user); *
char *prepareCHMOD(const char *host, int nnPort, const char *dirsubpath, int mode, const char *user); * @param host The hostname of the NameNode
char *prepareGFS(const char *host, int nnPort, const char *dirsubpath, const char *user); * @param nnPort Port of the NameNode
char *prepareLS(const char *host, int nnPort, const char *dirsubpath, const char *user); * @param path Path of the dir to create
char *prepareDELETE(const char *host, int nnPort, const char *dirsubpath, int recursive, const char *user); * @param user User name
char *prepareCHOWN(const char *host, int nnPort, const char *dirsubpath, const char *owner, const char *group, const char *user); * @param url Holding the generated URL for MKDIR request
char *prepareOPEN(const char *host, int nnPort, const char *dirsubpath, const char *user, size_t offset, size_t length); * @return 0 on success and non-zero value on errors
char *prepareUTIMES(const char *host, int nnPort, const char *dirsubpath, long unsigned mTime, long unsigned aTime, const char *user); */
char *prepareNnWRITE(const char *host, int nnPort, const char *dirsubpath, const char *user, int16_t replication, size_t blockSize); int createUrlForMKDIR(const char *host, int nnPort,
char *prepareNnAPPEND(const char *host, int nnPort, const char *dirsubpath, const char *user); const char *path, const char *user,
char *prepareSETREPLICATION(const char *host, int nnPort, const char *path, int16_t replication, const char *user); char **url) __attribute__ ((warn_unused_result));
/**
* Create the URL for a MKDIR (with mode) request
*
* @param host The hostname of the NameNode
* @param nnPort Port of the NameNode
* @param path Path of the dir to create
* @param mode Mode of MKDIR
* @param user User name
* @param url Holding the generated URL for MKDIR request
* @return 0 on success and non-zero value on errors
*/
int createUrlForMKDIRwithMode(const char *host, int nnPort, const char *path,
int mode, const char *user,
char **url) __attribute__ ((warn_unused_result));
/**
* Create the URL for a RENAME request
*
* @param host The hostname of the NameNode
* @param nnPort Port of the NameNode
* @param srcpath Source path
* @param dstpath Destination path
* @param user User name
* @param url Holding the generated URL for RENAME request
* @return 0 on success and non-zero value on errors
*/
int createUrlForRENAME(const char *host, int nnPort, const char *srcpath,
const char *dstpath, const char *user,
char **url) __attribute__ ((warn_unused_result));
/**
* Create the URL for a CHMOD request
*
* @param host The hostname of the NameNode
* @param nnPort Port of the NameNode
* @param path Target path
* @param mode New mode for the file
* @param user User name
* @param url Holding the generated URL for CHMOD request
* @return 0 on success and non-zero value on errors
*/
int createUrlForCHMOD(const char *host, int nnPort, const char *path,
int mode, const char *user,
char **url) __attribute__ ((warn_unused_result));
/**
* Create the URL for a GETFILESTATUS request
*
* @param host The hostname of the NameNode
* @param nnPort Port of the NameNode
* @param path Path of the target file
* @param user User name
* @param url Holding the generated URL for GETFILESTATUS request
* @return 0 on success and non-zero value on errors
*/
int createUrlForGetFileStatus(const char *host, int nnPort,
const char *path, const char *user,
char **url) __attribute__ ((warn_unused_result));
/**
* Create the URL for a LISTSTATUS request
*
* @param host The hostname of the NameNode
* @param nnPort Port of the NameNode
* @param path Path of the directory for listing
* @param user User name
* @param url Holding the generated URL for LISTSTATUS request
* @return 0 on success and non-zero value on errors
*/
int createUrlForLS(const char *host, int nnPort,
const char *path, const char *user,
char **url) __attribute__ ((warn_unused_result));
/**
* Create the URL for a DELETE request
*
* @param host The hostname of the NameNode
* @param nnPort Port of the NameNode
* @param path Path of the file to be deletected
* @param recursive Whether or not to delete in a recursive way
* @param user User name
* @param url Holding the generated URL for DELETE request
* @return 0 on success and non-zero value on errors
*/
int createUrlForDELETE(const char *host, int nnPort, const char *path,
int recursive, const char *user,
char **url) __attribute__ ((warn_unused_result));
/**
* Create the URL for a CHOWN request
*
* @param host The hostname of the NameNode
* @param nnPort Port of the NameNode
* @param path Path of the target
* @param owner New owner
* @param group New group
* @param user User name
* @param url Holding the generated URL for CHOWN request
* @return 0 on success and non-zero value on errors
*/
int createUrlForCHOWN(const char *host, int nnPort, const char *path,
const char *owner, const char *group, const char *user,
char **url) __attribute__ ((warn_unused_result));
/**
* Create the URL for a OPEN/READ request
*
* @param host The hostname of the NameNode
* @param nnPort Port of the NameNode
* @param path Path of the file to read
* @param user User name
* @param offset Offset for reading (the start position for this read)
* @param length Length of the file to read
* @param url Holding the generated URL for OPEN/READ request
* @return 0 on success and non-zero value on errors
*/
int createUrlForOPEN(const char *host, int nnPort, const char *path,
const char *user, size_t offset, size_t length,
char **url) __attribute__ ((warn_unused_result));
/**
* Create the URL for a UTIMES (update time) request
*
* @param host The hostname of the NameNode
* @param nnPort Port of the NameNode
* @param path Path of the file for updating time
* @param mTime Modified time to set
* @param aTime Access time to set
* @param user User name
* @param url Holding the generated URL for UTIMES request
* @return 0 on success and non-zero value on errors
*/
int createUrlForUTIMES(const char *host, int nnPort, const char *path,
long unsigned mTime, long unsigned aTime,
const char *user,
char **url) __attribute__ ((warn_unused_result));
/**
* Create the URL for a WRITE/CREATE request (sent to NameNode)
*
* @param host The hostname of the NameNode
* @param nnPort Port of the NameNode
* @param path Path of the dir to create
* @param user User name
* @param replication Number of replication of the file
* @param blockSize Size of the block for the file
* @param url Holding the generated URL for WRITE request
* @return 0 on success and non-zero value on errors
*/
int createUrlForNnWRITE(const char *host, int nnPort, const char *path,
const char *user, int16_t replication, size_t blockSize,
char **url) __attribute__ ((warn_unused_result));
/**
* Create the URL for an APPEND request (sent to NameNode)
*
* @param host The hostname of the NameNode
* @param nnPort Port of the NameNode
* @param path Path of the file for appending
* @param user User name
* @param url Holding the generated URL for APPEND request
* @return 0 on success and non-zero value on errors
*/
int createUrlForNnAPPEND(const char *host, int nnPort,
const char *path, const char *user,
char **url) __attribute__ ((warn_unused_result));
/**
* Create the URL for a SETREPLICATION request
*
* @param host The hostname of the NameNode
* @param nnPort Port of the NameNode
* @param path Path of the target file
* @param replication New replication number
* @param user User name
* @param url Holding the generated URL for SETREPLICATION request
* @return 0 on success and non-zero value on errors
*/
int createUrlForSETREPLICATION(const char *host, int nnPort, const char *path,
int16_t replication, const char *user,
char **url) __attribute__ ((warn_unused_result));
/**
* Create the URL for a GET_BLOCK_LOCATIONS request
*
* @param host The hostname of the NameNode
* @param nnPort Port of the NameNode
* @param path Path of the target file
* @param offset The offset in the file
* @param length Length of the file content
* @param user User name
* @param url Holding the generated URL for GET_BLOCK_LOCATIONS request
* @return 0 on success and non-zero value on errors
*/
int createUrlForGetBlockLocations(const char *host, int nnPort,
const char *path, size_t offset,
size_t length, const char *user,
char **url) __attribute__ ((warn_unused_result));
#endif //_HDFS_HTTP_QUERY_H_ #endif //_HDFS_HTTP_QUERY_H_

View File

@ -25,6 +25,11 @@
#include <ctype.h> #include <ctype.h>
#include <jansson.h> #include <jansson.h>
static const char * const temporaryRedirectCode = "307 TEMPORARY_REDIRECT";
static const char * const twoHundredOKCode = "200 OK";
static const char * const twoHundredOneCreatedCode = "201 Created";
static const char * const httpHeaderString = "HTTP/1.1";
/** /**
* Exception information after calling JSON operations * Exception information after calling JSON operations
*/ */
@ -34,9 +39,6 @@ struct jsonException {
const char *message; const char *message;
}; };
static hdfsFileInfo *parseJsonGFS(json_t *jobj, hdfsFileInfo *fileStat,
int *numEntries, const char *operation);
static void dotsToSlashes(char *str) static void dotsToSlashes(char *str)
{ {
for (; *str != '\0'; str++) { for (; *str != '\0'; str++) {
@ -45,8 +47,9 @@ static void dotsToSlashes(char *str)
} }
} }
int printJsonExceptionV(struct jsonException *exc, int noPrintFlags, /** Print out the JSON exception information */
const char *fmt, va_list ap) static int printJsonExceptionV(struct jsonException *exc, int noPrintFlags,
const char *fmt, va_list ap)
{ {
char *javaClassName = NULL; char *javaClassName = NULL;
int excErrno = EINTERNAL, shouldPrint = 0; int excErrno = EINTERNAL, shouldPrint = 0;
@ -74,11 +77,23 @@ int printJsonExceptionV(struct jsonException *exc, int noPrintFlags,
return excErrno; return excErrno;
} }
int printJsonException(struct jsonException *exc, int noPrintFlags, /**
const char *fmt, ...) * Print out JSON exception information.
*
* @param exc The exception information to print and free
* @param noPrintFlags Flags which determine which exceptions we should NOT
* print.
* @param fmt Printf-style format list
* @param ... Printf-style varargs
*
* @return The POSIX error number associated with the exception
* object.
*/
static int printJsonException(struct jsonException *exc, int noPrintFlags,
const char *fmt, ...)
{ {
va_list ap; va_list ap;
int ret; int ret = 0;
va_start(ap, fmt); va_start(ap, fmt);
ret = printJsonExceptionV(exc, noPrintFlags, fmt, ap); ret = printJsonExceptionV(exc, noPrintFlags, fmt, ap);
@ -86,81 +101,20 @@ int printJsonException(struct jsonException *exc, int noPrintFlags,
return ret; return ret;
} }
static hdfsFileInfo *json_parse_array(json_t *jobj, char *key, hdfsFileInfo *fileStat, int *numEntries, const char *operation) { /** Parse the exception information from JSON */
int arraylen = json_array_size(jobj); //Getting the length of the array static struct jsonException *parseJsonException(json_t *jobj)
*numEntries = arraylen; {
if (!key) { const char *key = NULL;
return NULL; json_t *value = NULL;
}
if(arraylen > 0) {
fileStat = (hdfsFileInfo *)realloc(fileStat,sizeof(hdfsFileInfo)*arraylen);
}
json_t *jvalue;
int i;
for (i=0; i< arraylen; i++) {
jvalue = json_array_get(jobj, i); //Getting the array element at position i
if (json_is_array(jvalue)) { // array within an array - program should never come here for now
json_parse_array(jvalue, NULL, &fileStat[i], numEntries, operation);
}
else if (json_is_object(jvalue)) { // program will definitely come over here
parseJsonGFS(jvalue, &fileStat[i], numEntries, operation);
}
else {
return NULL; // program will never come over here for now
}
}
*numEntries = arraylen;
return fileStat;
}
int parseBoolean(char *response) {
json_t *root;
json_error_t error;
size_t flags = 0;
int result = 0;
const char *key;
json_t *value;
root = json_loads(response, flags, &error);
void *iter = json_object_iter(root);
while(iter) {
key = json_object_iter_key(iter);
value = json_object_iter_value(iter);
switch (json_typeof(value)) {
case JSON_TRUE:
result = 1;
break;
default:
result = 0;
break;
}
iter = json_object_iter_next(root, iter);
}
return result;
}
int parseMKDIR(char *response) {
return (parseBoolean(response));
}
int parseRENAME(char *response) {
return (parseBoolean(response));
}
int parseDELETE(char *response) {
return (parseBoolean(response));
}
struct jsonException *parseJsonException(json_t *jobj) {
const char *key;
json_t *value;
struct jsonException *exception = NULL; struct jsonException *exception = NULL;
void *iter = NULL;
exception = calloc(1, sizeof(*exception)); exception = calloc(1, sizeof(*exception));
if (!exception) { if (!exception) {
return NULL; return NULL;
} }
void *iter = json_object_iter(jobj); iter = json_object_iter(jobj);
while (iter) { while (iter) {
key = json_object_iter_key(iter); key = json_object_iter_key(iter);
value = json_object_iter_value(iter); value = json_object_iter_value(iter);
@ -175,23 +129,31 @@ struct jsonException *parseJsonException(json_t *jobj) {
iter = json_object_iter_next(jobj, iter); iter = json_object_iter_next(jobj, iter);
} }
return exception; return exception;
} }
struct jsonException *parseException(const char *content) { /**
* Parse the exception information which is presented in JSON
*
* @param content Exception information in JSON
* @return jsonException for printing out
*/
static struct jsonException *parseException(const char *content)
{
json_error_t error;
size_t flags = 0;
const char *key = NULL;
json_t *value;
json_t *jobj;
struct jsonException *exception = NULL;
if (!content) { if (!content) {
return NULL; return NULL;
} }
jobj = json_loads(content, flags, &error);
json_error_t error;
size_t flags = 0;
const char *key;
json_t *value;
json_t *jobj = json_loads(content, flags, &error);
if (!jobj) { if (!jobj) {
fprintf(stderr, "JSon parsing failed\n"); fprintf(stderr, "JSon parsing error: on line %d: %s\n",
error.line, error.text);
return NULL; return NULL;
} }
void *iter = json_object_iter(jobj); void *iter = json_object_iter(jobj);
@ -199,254 +161,503 @@ struct jsonException *parseException(const char *content) {
key = json_object_iter_key(iter); key = json_object_iter_key(iter);
value = json_object_iter_value(iter); value = json_object_iter_value(iter);
if (!strcmp(key, "RemoteException") && json_typeof(value) == JSON_OBJECT) { if (!strcmp(key, "RemoteException") &&
return parseJsonException(value); json_typeof(value) == JSON_OBJECT) {
exception = parseJsonException(value);
break;
} }
iter = json_object_iter_next(jobj, iter); iter = json_object_iter_next(jobj, iter);
} }
return NULL;
json_decref(jobj);
return exception;
} }
static hdfsFileInfo *parseJsonGFS(json_t *jobj, hdfsFileInfo *fileStat, /**
int *numEntries, const char *operation) * Parse the response information which uses TRUE/FALSE
* to indicate whether the operation succeeded
*
* @param response Response information
* @return 0 to indicate success
*/
static int parseBoolean(const char *response)
{ {
const char *tempstr; json_t *root, *value;
const char *key; json_error_t error;
json_t *value; size_t flags = 0;
void *iter = json_object_iter(jobj); int result = 0;
while(iter) {
key = json_object_iter_key(iter); root = json_loads(response, flags, &error);
value = json_object_iter_value(iter); if (!root) {
fprintf(stderr, "JSon parsing error: on line %d: %s\n",
switch (json_typeof(value)) { error.line, error.text);
case JSON_INTEGER: return EIO;
if(!strcmp(key,"accessTime")) {
fileStat->mLastAccess = (time_t)(json_integer_value(value)/1000);
} else if (!strcmp(key,"blockSize")) {
fileStat->mBlockSize = (tOffset)json_integer_value(value);
} else if (!strcmp(key,"length")) {
fileStat->mSize = (tOffset)json_integer_value(value);
} else if(!strcmp(key,"modificationTime")) {
fileStat->mLastMod = (time_t)(json_integer_value(value)/1000);
} else if (!strcmp(key,"replication")) {
fileStat->mReplication = (short)json_integer_value(value);
}
break;
case JSON_STRING:
if(!strcmp(key,"group")) {
fileStat->mGroup=(char *)json_string_value(value);
} else if (!strcmp(key,"owner")) {
fileStat->mOwner=(char *)json_string_value(value);
} else if (!strcmp(key,"pathSuffix")) {
fileStat->mName=(char *)json_string_value(value);
} else if (!strcmp(key,"permission")) {
tempstr=(char *)json_string_value(value);
fileStat->mPermissions = (short)strtol(tempstr,(char **)NULL,8);
} else if (!strcmp(key,"type")) {
char *cvalue = (char *)json_string_value(value);
if (!strcmp(cvalue, "DIRECTORY")) {
fileStat->mKind = kObjectKindDirectory;
} else {
fileStat->mKind = kObjectKindFile;
}
}
break;
case JSON_OBJECT:
if(!strcmp(key,"FileStatus")) {
parseJsonGFS(value, fileStat, numEntries, operation);
} else if (!strcmp(key,"FileStatuses")) {
fileStat = parseJsonGFS(value, &fileStat[0], numEntries, operation);
} else if (!strcmp(key,"RemoteException")) {
//Besides returning NULL, we also need to print the exception information
struct jsonException *exception = parseJsonException(value);
if (exception) {
errno = printJsonException(exception, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation);
}
if(fileStat != NULL) {
free(fileStat);
fileStat = NULL;
}
}
break;
case JSON_ARRAY:
if (!strcmp(key,"FileStatus")) {
fileStat = json_parse_array(value,(char *) key,fileStat,numEntries, operation);
}
break;
default:
if(fileStat != NULL) {
free(fileStat);
fileStat = NULL;
}
}
iter = json_object_iter_next(jobj, iter);
} }
return fileStat; void *iter = json_object_iter(root);
value = json_object_iter_value(iter);
if (json_typeof(value) == JSON_TRUE) {
result = 0;
} else {
result = EIO; // FALSE means error in remote NN/DN
}
json_decref(root);
return result;
} }
int parseMKDIR(const char *response)
{
return parseBoolean(response);
}
int checkHeader(char *header, const char *content, const char *operation) { int parseRENAME(const char *response)
{
return parseBoolean(response);
}
int parseDELETE(const char *response)
{
return parseBoolean(response);
}
int parseSETREPLICATION(const char *response)
{
return parseBoolean(response);
}
/**
* Check the header of response to see if it's 200 OK
*
* @param header Header information for checking
* @param content Stores exception information if there are errors
* @param operation Indicate the operation for exception printing
* @return 0 for success
*/
static int checkHeader(const char *header, const char *content,
const char *operation)
{
char *result = NULL; char *result = NULL;
char delims[] = ":"; const char delims[] = ":";
char *responseCode= "200 OK"; char *savepter;
if(header == '\0' || strncmp(header, "HTTP/", strlen("HTTP/"))) { int ret = 0;
return 0;
if (!header || strncmp(header, "HTTP/", strlen("HTTP/"))) {
return EINVAL;
} }
if(!(strstr(header, responseCode)) || !(header = strstr(header, "Content-Length"))) { if (!(strstr(header, twoHundredOKCode)) ||
!(result = strstr(header, "Content-Length"))) {
struct jsonException *exc = parseException(content); struct jsonException *exc = parseException(content);
if (exc) { if (exc) {
errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation); ret = printJsonException(exc, PRINT_EXC_ALL,
"Calling WEBHDFS (%s)", operation);
} else {
ret = EIO;
} }
return 0; return ret;
} }
result = strtok(header, delims); result = strtok_r(result, delims, &savepter);
result = strtok(NULL,delims); result = strtok_r(NULL, delims, &savepter);
while (isspace(*result)) { while (isspace(*result)) {
result++; result++;
} }
if(strcmp(result,"0")) { //Content-Length should be equal to 0 // Content-Length should be equal to 0,
return 1; // and the string should be "0\r\nServer"
} else { if (strncmp(result, "0\r\n", 3)) {
return 0; ret = EIO;
} }
return ret;
} }
int parseOPEN(const char *header, const char *content) { int parseCHMOD(const char *header, const char *content)
const char *responseCode1 = "307 TEMPORARY_REDIRECT"; {
const char *responseCode2 = "200 OK";
if(header == '\0' || strncmp(header,"HTTP/",strlen("HTTP/"))) {
return -1;
}
if(!(strstr(header,responseCode1) && strstr(header, responseCode2))) {
struct jsonException *exc = parseException(content);
if (exc) {
//if the exception is an IOException and it is because the offset is out of the range
//do not print out the exception
if (!strcasecmp(exc->exception, "IOException") && strstr(exc->message, "out of the range")) {
return 0;
}
errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (OPEN)");
}
return -1;
}
return 1;
}
int parseCHMOD(char *header, const char *content) {
return checkHeader(header, content, "CHMOD"); return checkHeader(header, content, "CHMOD");
} }
int parseCHOWN(const char *header, const char *content)
int parseCHOWN(char *header, const char *content) { {
return checkHeader(header, content, "CHOWN"); return checkHeader(header, content, "CHOWN");
} }
int parseUTIMES(char *header, const char *content) { int parseUTIMES(const char *header, const char *content)
return checkHeader(header, content, "UTIMES"); {
return checkHeader(header, content, "SETTIMES");
} }
/**
int checkIfRedirect(const char *const headerstr, const char *content, const char *operation) { * Check if the header contains correct information
char *responseCode = "307 TEMPORARY_REDIRECT"; * ("307 TEMPORARY_REDIRECT" and "Location")
char * locTag = "Location"; *
char * tempHeader; * @param header Header for parsing
if(headerstr == '\0' || strncmp(headerstr,"HTTP/", 5)) { * @param content Contains exception information
return 0; * if the remote operation failed
* @param operation Specify the remote operation when printing out exception
* @return 0 for success
*/
static int checkRedirect(const char *header,
const char *content, const char *operation)
{
const char *locTag = "Location";
int ret = 0, offset = 0;
// The header must start with "HTTP/1.1"
if (!header || strncmp(header, httpHeaderString,
strlen(httpHeaderString))) {
return EINVAL;
} }
if(!(tempHeader = strstr(headerstr,responseCode))) {
//process possible exception information offset += strlen(httpHeaderString);
while (isspace(header[offset])) {
offset++;
}
// Looking for "307 TEMPORARY_REDIRECT" in header
if (strncmp(header + offset, temporaryRedirectCode,
strlen(temporaryRedirectCode))) {
// Process possible exception information
struct jsonException *exc = parseException(content); struct jsonException *exc = parseException(content);
if (exc) { if (exc) {
errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation); ret = printJsonException(exc, PRINT_EXC_ALL,
"Calling WEBHDFS (%s)", operation);
} else {
ret = EIO;
} }
return 0; return ret;
} }
if(!(strstr(tempHeader,locTag))) { // Here we just simply check if header contains "Location" tag,
return 0; // detailed processing is in parseDnLoc
if (!(strstr(header, locTag))) {
ret = EIO;
} }
return 1; return ret;
} }
int parseNnWRITE(const char *header, const char *content)
int parseNnWRITE(const char *header, const char *content) { {
return checkIfRedirect(header, content, "Write(NameNode)"); return checkRedirect(header, content, "Write(NameNode)");
} }
int parseNnAPPEND(const char *header, const char *content)
int parseNnAPPEND(const char *header, const char *content) { {
return checkIfRedirect(header, content, "Append(NameNode)"); return checkRedirect(header, content, "Append(NameNode)");
} }
char *parseDnLoc(char *content) { /** 0 for success , -1 for out of range, other values for error */
char delims[] = "\r\n"; int parseOPEN(const char *header, const char *content)
char *url = NULL; {
char *DnLocation = NULL; int ret = 0, offset = 0;
char *savepter;
DnLocation = strtok_r(content, delims, &savepter); if (!header || strncmp(header, httpHeaderString,
while (DnLocation && strncmp(DnLocation, "Location:", strlen("Location:"))) { strlen(httpHeaderString))) {
DnLocation = strtok_r(NULL, delims, &savepter); return EINVAL;
} }
if (!DnLocation) {
return NULL; offset += strlen(httpHeaderString);
while (isspace(header[offset])) {
offset++;
} }
DnLocation = strstr(DnLocation, "http"); if (strncmp(header + offset, temporaryRedirectCode,
if (!DnLocation) { strlen(temporaryRedirectCode)) ||
return NULL; !strstr(header, twoHundredOKCode)) {
struct jsonException *exc = parseException(content);
if (exc) {
// If the exception is an IOException and it is because
// the offset is out of the range, do not print out the exception
if (!strcasecmp(exc->exception, "IOException") &&
strstr(exc->message, "out of the range")) {
ret = -1;
} else {
ret = printJsonException(exc, PRINT_EXC_ALL,
"Calling WEBHDFS (OPEN)");
}
} else {
ret = EIO;
}
} }
url = malloc(strlen(DnLocation) + 1); return ret;
}
int parseDnLoc(char *content, char **dn)
{
char *url = NULL, *dnLocation = NULL, *savepter, *tempContent;
const char *prefix = "Location: http://";
const char *prefixToRemove = "Location: ";
const char *delims = "\r\n";
tempContent = strdup(content);
if (!tempContent) {
return ENOMEM;
}
dnLocation = strtok_r(tempContent, delims, &savepter);
while (dnLocation && strncmp(dnLocation, "Location:",
strlen("Location:"))) {
dnLocation = strtok_r(NULL, delims, &savepter);
}
if (!dnLocation) {
return EIO;
}
while (isspace(*dnLocation)) {
dnLocation++;
}
if (strncmp(dnLocation, prefix, strlen(prefix))) {
return EIO;
}
url = strdup(dnLocation + strlen(prefixToRemove));
if (!url) { if (!url) {
return NULL; return ENOMEM;
} }
strcpy(url, DnLocation); *dn = url;
return url; return 0;
} }
int parseDnWRITE(const char *header, const char *content) { int parseDnWRITE(const char *header, const char *content)
char *responseCode = "201 Created"; {
fprintf(stderr, "\nheaderstr is: %s\n", header); int ret = 0;
if(header == '\0' || strncmp(header,"HTTP/",strlen("HTTP/"))) { if (header == NULL || header[0] == '\0' ||
return 0; strncmp(header, "HTTP/", strlen("HTTP/"))) {
return EINVAL;
} }
if(!(strstr(header,responseCode))) { if (!(strstr(header, twoHundredOneCreatedCode))) {
struct jsonException *exc = parseException(content); struct jsonException *exc = parseException(content);
if (exc) { if (exc) {
errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (WRITE(DataNode))"); ret = printJsonException(exc, PRINT_EXC_ALL,
"Calling WEBHDFS (WRITE(DataNode))");
} else {
ret = EIO;
} }
return 0;
} }
return 1; return ret;
} }
int parseDnAPPEND(const char *header, const char *content) { int parseDnAPPEND(const char *header, const char *content)
char *responseCode = "200 OK"; {
if(header == '\0' || strncmp(header, "HTTP/", strlen("HTTP/"))) { int ret = 0;
return 0;
if (header == NULL || header[0] == '\0' ||
strncmp(header, "HTTP/", strlen("HTTP/"))) {
return EINVAL;
} }
if(!(strstr(header, responseCode))) { if (!(strstr(header, twoHundredOKCode))) {
struct jsonException *exc = parseException(content); struct jsonException *exc = parseException(content);
if (exc) { if (exc) {
errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (APPEND(DataNode))"); ret = printJsonException(exc, PRINT_EXC_ALL,
"Calling WEBHDFS (APPEND(DataNode))");
} else {
ret = EIO;
} }
return 0;
} }
return 1; return ret;
} }
hdfsFileInfo *parseGFS(char *str, hdfsFileInfo *fileStat, int *numEntries) { /**
* Retrieve file status from the JSON object
*
* @param jobj JSON object for parsing, which contains
* file status information
* @param fileStat hdfsFileInfo handle to hold file status information
* @return 0 on success
*/
static int parseJsonForFileStatus(json_t *jobj, hdfsFileInfo *fileStat)
{
const char *key, *tempstr;
json_t *value;
void *iter = NULL;
iter = json_object_iter(jobj);
while (iter) {
key = json_object_iter_key(iter);
value = json_object_iter_value(iter);
if (!strcmp(key, "accessTime")) {
// json field contains time in milliseconds,
// hdfsFileInfo is counted in seconds
fileStat->mLastAccess = json_integer_value(value) / 1000;
} else if (!strcmp(key, "blockSize")) {
fileStat->mBlockSize = json_integer_value(value);
} else if (!strcmp(key, "length")) {
fileStat->mSize = json_integer_value(value);
} else if (!strcmp(key, "modificationTime")) {
fileStat->mLastMod = json_integer_value(value) / 1000;
} else if (!strcmp(key, "replication")) {
fileStat->mReplication = json_integer_value(value);
} else if (!strcmp(key, "group")) {
fileStat->mGroup = strdup(json_string_value(value));
if (!fileStat->mGroup) {
return ENOMEM;
}
} else if (!strcmp(key, "owner")) {
fileStat->mOwner = strdup(json_string_value(value));
if (!fileStat->mOwner) {
return ENOMEM;
}
} else if (!strcmp(key, "pathSuffix")) {
fileStat->mName = strdup(json_string_value(value));
if (!fileStat->mName) {
return ENOMEM;
}
} else if (!strcmp(key, "permission")) {
tempstr = json_string_value(value);
fileStat->mPermissions = (short) strtol(tempstr, NULL, 8);
} else if (!strcmp(key, "type")) {
tempstr = json_string_value(value);
if (!strcmp(tempstr, "DIRECTORY")) {
fileStat->mKind = kObjectKindDirectory;
} else {
fileStat->mKind = kObjectKindFile;
}
}
// Go to the next key-value pair in the json object
iter = json_object_iter_next(jobj, iter);
}
return 0;
}
int parseGFS(const char *response, hdfsFileInfo *fileStat, int printError)
{
int ret = 0, printFlag;
json_error_t error; json_error_t error;
size_t flags = 0; size_t flags = 0;
json_t *jobj = json_loads(str, flags, &error); json_t *jobj, *value;
fileStat = parseJsonGFS(jobj, fileStat, numEntries, "GETPATHSTATUS/LISTSTATUS"); const char *key;
return fileStat; void *iter = NULL;
if (!response || !fileStat) {
return EIO;
}
jobj = json_loads(response, flags, &error);
if (!jobj) {
fprintf(stderr, "error while parsing json: on line %d: %s\n",
error.line, error.text);
return EIO;
}
iter = json_object_iter(jobj);
key = json_object_iter_key(iter);
value = json_object_iter_value(iter);
if (json_typeof(value) == JSON_OBJECT) {
if (!strcmp(key, "RemoteException")) {
struct jsonException *exception = parseJsonException(value);
if (exception) {
if (printError) {
printFlag = PRINT_EXC_ALL;
} else {
printFlag = NOPRINT_EXC_FILE_NOT_FOUND |
NOPRINT_EXC_ACCESS_CONTROL |
NOPRINT_EXC_PARENT_NOT_DIRECTORY;
}
ret = printJsonException(exception, printFlag,
"Calling WEBHDFS GETFILESTATUS");
} else {
ret = EIO;
}
} else if (!strcmp(key, "FileStatus")) {
ret = parseJsonForFileStatus(value, fileStat);
} else {
ret = EIO;
}
} else {
ret = EIO;
}
json_decref(jobj);
return ret;
} }
int parseSETREPLICATION(char *response) { /**
return (parseBoolean(response)); * Parse the JSON array. Called to parse the result of
* the LISTSTATUS operation. Thus each element of the JSON array is
* a JSON object with the information of a file entry contained
* in the folder.
*
* @param jobj The JSON array to be parsed
* @param fileStat The hdfsFileInfo handle used to
* store a group of file information
* @param numEntries Capture the number of files in the folder
* @return 0 for success
*/
static int parseJsonArrayForFileStatuses(json_t *jobj, hdfsFileInfo **fileStat,
int *numEntries)
{
json_t *jvalue = NULL;
int i = 0, ret = 0, arraylen = 0;
hdfsFileInfo *fileInfo = NULL;
arraylen = (int) json_array_size(jobj);
if (arraylen > 0) {
fileInfo = calloc(arraylen, sizeof(hdfsFileInfo));
if (!fileInfo) {
return ENOMEM;
}
}
for (i = 0; i < arraylen; i++) {
//Getting the array element at position i
jvalue = json_array_get(jobj, i);
if (json_is_object(jvalue)) {
ret = parseJsonForFileStatus(jvalue, &fileInfo[i]);
if (ret) {
goto done;
}
} else {
ret = EIO;
goto done;
}
}
done:
if (ret) {
free(fileInfo);
} else {
*numEntries = arraylen;
*fileStat = fileInfo;
}
return ret;
} }
int parseLS(const char *response, hdfsFileInfo **fileStats, int *numOfEntries)
{
int ret = 0;
json_error_t error;
size_t flags = 0;
json_t *jobj, *value;
const char *key;
void *iter = NULL;
if (!response || response[0] == '\0' || !fileStats) {
return EIO;
}
jobj = json_loads(response, flags, &error);
if (!jobj) {
fprintf(stderr, "error while parsing json: on line %d: %s\n",
error.line, error.text);
return EIO;
}
iter = json_object_iter(jobj);
key = json_object_iter_key(iter);
value = json_object_iter_value(iter);
if (json_typeof(value) == JSON_OBJECT) {
if (!strcmp(key, "RemoteException")) {
struct jsonException *exception = parseJsonException(value);
if (exception) {
ret = printJsonException(exception, PRINT_EXC_ALL,
"Calling WEBHDFS GETFILESTATUS");
} else {
ret = EIO;
}
} else if (!strcmp(key, "FileStatuses")) {
iter = json_object_iter(value);
value = json_object_iter_value(iter);
if (json_is_array(value)) {
ret = parseJsonArrayForFileStatuses(value, fileStats,
numOfEntries);
} else {
ret = EIO;
}
} else {
ret = EIO;
}
} else {
ret = EIO;
}
json_decref(jobj);
return ret;
}

View File

@ -18,41 +18,161 @@
#ifndef _HDFS_JSON_PARSER_H_ #ifndef _HDFS_JSON_PARSER_H_
#define _HDFS_JSON_PARSER_H_ #define _HDFS_JSON_PARSER_H_
struct jsonException; /**
* Parse the response for MKDIR request. The response uses TRUE/FALSE
* to indicate whether the operation succeeded.
*
* @param response The response information to parse.
* @return 0 for success
*/
int parseMKDIR(const char *response);
/** /**
* Print out JSON exception information. * Parse the response for RENAME request. The response uses TRUE/FALSE
* to indicate whether the operation succeeded.
* *
* @param exc The exception information to print and free * @param response The response information to parse.
* @param noPrintFlags Flags which determine which exceptions we should NOT * @return 0 for success
* print.
* @param fmt Printf-style format list
* @param ... Printf-style varargs
*
* @return The POSIX error number associated with the exception
* object.
*/ */
int printJsonException(struct jsonException *exc, int noPrintFlags, int parseRENAME(const char *response);
const char *fmt, ...);
int parseMKDIR(char *response); /**
int parseRENAME(char *response); * Parse the response for DELETE request. The response uses TRUE/FALSE
int parseDELETE (char *response); * to indicate whether the operation succeeded.
int parseSETREPLICATION(char *response); *
* @param response The response information to parse.
* @return 0 for success
*/
int parseDELETE(const char *response);
/**
* Parse the response for SETREPLICATION request. The response uses TRUE/FALSE
* to indicate whether the operation succeeded.
*
* @param response The response information to parse.
* @return 0 for success
*/
int parseSETREPLICATION(const char *response);
/**
* Parse the response for OPEN (read) request. A successful operation
* will return "200 OK".
*
* @param response The response information for parsing
* @return 0 for success , -1 for out of range, other values for error
*/
int parseOPEN(const char *header, const char *content); int parseOPEN(const char *header, const char *content);
/**
* Parse the response for WRITE (from NameNode) request.
* A successful operation should return "307 TEMPORARY_REDIRECT" in its header.
*
* @param header The header of the http response
* @param content If failing, the exception message
* sent from NameNode is stored in content
* @return 0 for success
*/
int parseNnWRITE(const char *header, const char *content); int parseNnWRITE(const char *header, const char *content);
/**
* Parse the response for WRITE (from DataNode) request.
* A successful operation should return "201 Created" in its header.
*
* @param header The header of the http response
* @param content If failing, the exception message
* sent from DataNode is stored in content
* @return 0 for success
*/
int parseDnWRITE(const char *header, const char *content); int parseDnWRITE(const char *header, const char *content);
/**
* Parse the response for APPEND (sent from NameNode) request.
* A successful operation should return "307 TEMPORARY_REDIRECT" in its header.
*
* @param header The header of the http response
* @param content If failing, the exception message
* sent from NameNode is stored in content
* @return 0 for success
*/
int parseNnAPPEND(const char *header, const char *content); int parseNnAPPEND(const char *header, const char *content);
/**
* Parse the response for APPEND (from DataNode) request.
* A successful operation should return "200 OK" in its header.
*
* @param header The header of the http response
* @param content If failing, the exception message
* sent from DataNode is stored in content
* @return 0 for success
*/
int parseDnAPPEND(const char *header, const char *content); int parseDnAPPEND(const char *header, const char *content);
char* parseDnLoc(char *content); /**
* Parse the response (from NameNode) to get the location information
* of the DataNode that should be contacted for the following write operation.
*
* @param content Content of the http header
* @param dn To store the location of the DataNode for writing
* @return 0 for success
*/
int parseDnLoc(char *content, char **dn) __attribute__ ((warn_unused_result));
hdfsFileInfo *parseGFS(char *response, hdfsFileInfo *fileStat, int *numEntries); /**
* Parse the response for GETFILESTATUS operation.
*
* @param response Response to parse. Its detailed format is specified in
* "http://hadoop.apache.org/docs/stable/webhdfs.html#GETFILESTATUS"
* @param fileStat A hdfsFileInfo handle for holding file information
* @param printError Whether or not print out exception
* when file does not exist
* @return 0 for success, non-zero value to indicate error
*/
int parseGFS(const char *response, hdfsFileInfo *fileStat, int printError);
int parseCHOWN (char *header, const char *content); /**
int parseCHMOD (char *header, const char *content); * Parse the response for LISTSTATUS operation.
int parseUTIMES(char *header, const char *content); *
* @param response Response to parse. Its detailed format is specified in
* "http://hadoop.apache.org/docs/r1.0.3/webhdfs.html#LISTSTATUS"
* @param fileStats Pointer pointing to a list of hdfsFileInfo handles
* holding file/dir information in the directory
* @param numEntries After parsing, the value of this parameter indicates
* the number of file entries.
* @return 0 for success, non-zero value to indicate error
*/
int parseLS(const char *response, hdfsFileInfo **fileStats, int *numOfEntries);
#endif //_FUSE_JSON_PARSER_H /**
* Parse the response for CHOWN request.
* A successful operation should contains "200 OK" in its header,
* and the Content-Length should be 0.
*
* @param header The header of the http response
* @param content If failing, the exception message is stored in content
* @return 0 for success
*/
int parseCHOWN(const char *header, const char *content);
/**
* Parse the response for CHMOD request.
* A successful operation should contains "200 OK" in its header,
* and the Content-Length should be 0.
*
* @param header The header of the http response
* @param content If failing, the exception message is stored in content
* @return 0 for success
*/
int parseCHMOD(const char *header, const char *content);
/**
* Parse the response for SETTIMES request.
* A successful operation should contains "200 OK" in its header,
* and the Content-Length should be 0.
*
* @param header The header of the http response
* @param content If failing, the exception message is stored in content
* @return 0 for success
*/
int parseUTIMES(const char *header, const char *content);
#endif //_HDFS_JSON_PARSER_H_

View File

@ -146,6 +146,7 @@ static int hashTableInit(void)
if (hcreate(MAX_HASH_TABLE_ELEM) == 0) { if (hcreate(MAX_HASH_TABLE_ELEM) == 0) {
fprintf(stderr, "error creating hashtable, <%d>: %s\n", fprintf(stderr, "error creating hashtable, <%d>: %s\n",
errno, strerror(errno)); errno, strerror(errno));
UNLOCK_HASH_TABLE();
return 0; return 0;
} }
hashTableInited = 1; hashTableInited = 1;