HDFS-3916. Merge 1403922 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1403925 13f79535-47bb-0310-9956-ffa450edef68
@@ -101,6 +101,8 @@ Release 2.0.3-alpha - Unreleased

    HDFS-3809. Make BKJM use protobufs for all serialization with ZK.
    (Ivan Kelly via umamhesh)

    HDFS-3916. libwebhdfs testing code cleanup. (Jing Zhao via suresh)

  OPTIMIZATIONS

  BUG FIXES
@@ -1,180 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "expect.h"
#include "hdfs.h"

#include <errno.h>
#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "hdfs_http_client.h"
#include "hdfs_http_query.h"
#include "hdfs_json_parser.h"
#include <unistd.h>
#include <curl/curl.h>

#define TLH_MAX_THREADS 100

static sem_t *tlhSem;

static const char *nn;
static const char *user;
static int port;

static const char *fileName = "/tmp/tlhData";

struct tlhThreadInfo {
    /** Thread index */
    int threadIdx;
    /** 0 = thread was successful; error code otherwise */
    int success;
    /** pthread identifier */
    pthread_t thread;
};

static int hdfsSingleNameNodeConnect(const char *nn, int port, const char *user, hdfsFS *fs)
{
    hdfsFS hdfs;
    if (port < 0) {
        fprintf(stderr, "hdfsSingleNameNodeConnect: nmdGetNameNodePort "
                "returned error %d\n", port);
        return port;
    }

    hdfs = hdfsConnectAsUserNewInstance(nn, port, user);
    if (!hdfs) {
        return -errno;
    }
    *fs = hdfs;
    return 0;
}

static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
{
    hdfsFile file;
    int ret = 0;
    char buffer[1024 * (ti->threadIdx + 1)];
    memset(buffer, 'a', sizeof(buffer));

    file = hdfsOpenFile(fs, "/tmp/thread_test.txt", O_WRONLY, 0, 0, 0);
    sleep(1);
    hdfsCloseFile(fs, file);
    return ret;
}

static void *testHdfsOperations(void *v)
{
    struct tlhThreadInfo *ti = (struct tlhThreadInfo*)v;
    hdfsFS fs = NULL;
    int ret;

    fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n",
            ti->threadIdx);
    ret = hdfsSingleNameNodeConnect(nn, port, user, &fs);
    if (ret) {
        fprintf(stderr, "testHdfsOperations(threadIdx=%d): "
                "hdfsSingleNameNodeConnect failed with error %d.\n",
                ti->threadIdx, ret);
        ti->success = EIO;
        return NULL;
    }
    ti->success = doTestHdfsOperations(ti, fs);
    if (hdfsDisconnect(fs)) {
        ret = errno;
        fprintf(stderr, "hdfsDisconnect error %d\n", ret);
        ti->success = ret;
    }
    return NULL;
}

static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads)
{
    int i, threadsFailed = 0;
    const char *sep = "";

    for (i = 0; i < tlhNumThreads; i++) {
        if (ti[i].success != 0) {
            threadsFailed = 1;
        }
    }
    if (!threadsFailed) {
        fprintf(stderr, "testLibHdfs: all threads succeeded. SUCCESS.\n");
        return EXIT_SUCCESS;
    }
    fprintf(stderr, "testLibHdfs: some threads failed: [");
    for (i = 0; i < tlhNumThreads; i++) {
        if (ti[i].success != 0) {
            fprintf(stderr, "%s%d", sep, i);
            sep = ", ";
        }
    }
    fprintf(stderr, "]. FAILURE.\n");
    return EXIT_FAILURE;
}

/**
 * Test that we can write a file with libhdfs and then read it back
 */
int main(int argc, const char *args[])
{
    if (argc != 4) {
        fprintf(stderr, "usage: test_libhdfs_threaded <namenode> <port> <username>");
        return -1;
    }

    nn = args[1];
    port = atoi(args[2]);
    user = args[3];

    int i, tlhNumThreads;
    const char *tlhNumThreadsStr;
    struct tlhThreadInfo ti[TLH_MAX_THREADS];

    tlhNumThreadsStr = getenv("TLH_NUM_THREADS");
    if (!tlhNumThreadsStr) {
        tlhNumThreadsStr = "3";
    }
    tlhNumThreads = atoi(tlhNumThreadsStr);
    if ((tlhNumThreads <= 0) || (tlhNumThreads > TLH_MAX_THREADS)) {
        fprintf(stderr, "testLibHdfs: must have a number of threads "
                "between 1 and %d inclusive, not %d\n",
                TLH_MAX_THREADS, tlhNumThreads);
        return EXIT_FAILURE;
    }
    memset(&ti[0], 0, sizeof(ti));
    for (i = 0; i < tlhNumThreads; i++) {
        ti[i].threadIdx = i;
    }

    tlhSem = sem_open("sem", O_CREAT, 0644, tlhNumThreads);

    for (i = 0; i < tlhNumThreads; i++) {
        fprintf(stderr, "\ncreating thread %d\n", i);
        EXPECT_ZERO(pthread_create(&ti[i].thread, NULL,
            testHdfsOperations, &ti[i]));
    }
    for (i = 0; i < tlhNumThreads; i++) {
        EXPECT_ZERO(pthread_join(ti[i].thread, NULL));
    }

    EXPECT_ZERO(sem_close(tlhSem));
    return checkFailures(ti, tlhNumThreads);
}
@@ -17,6 +17,7 @@
 */

#include "hdfs.h"
#include "native_mini_dfs.h"

#include <inttypes.h>
#include <jni.h>
@@ -26,228 +27,254 @@
#include <time.h>
#include <unistd.h>

void permission_disp(short permissions, char *rtr) {
static struct NativeMiniDfsCluster *cluster;

void permission_disp(short permissions, char *rtr)
{
    rtr[9] = '\0';
    int i;
    for(i=2;i>=0;i--)
    short perm;
    for(i = 2; i >= 0; i--)
    {
        short permissionsId = permissions >> (i * 3) & (short)7;
        char* perm;
        switch(permissionsId) {
        case 7:
            perm = "rwx"; break;
        case 6:
            perm = "rw-"; break;
        case 5:
            perm = "r-x"; break;
        case 4:
            perm = "r--"; break;
        case 3:
            perm = "-wx"; break;
        case 2:
            perm = "-w-"; break;
        case 1:
            perm = "--x"; break;
        case 0:
            perm = "---"; break;
        default:
            perm = "???";
        }
        strncpy(rtr, perm, 3);
        rtr+=3;
        perm = permissions >> (i * 3);
        rtr[0] = perm & 4 ? 'r' : '-';
        rtr[1] = perm & 2 ? 'w' : '-';
        rtr[2] = perm & 1 ? 'x' : '-';
        rtr += 3;
    }
}

int main(int argc, char **argv) {
    if (argc != 2) {
        fprintf(stderr, "usage: test_libwebhdfs_ops <username>\n");
        return -1;
    }

int main(int argc, char **argv)
{
    char buffer[32];
    tSize num_written_bytes;
    const char* slashTmp = "/tmp";
    int nnPort;
    char *rwTemplate, *rwTemplate2, *newDirTemplate,
        *appendTemplate, *userTemplate, *rwPath = NULL;
    const char* fileContents = "Hello, World!";
    const char* nnHost = NULL;

    hdfsFS fs = hdfsConnectAsUserNewInstance("default", 50070, argv[1]);
    if (argc != 2) {
        fprintf(stderr, "usage: test_libwebhdfs_ops <username>\n");
        exit(1);
    }

    struct NativeMiniDfsConf conf = {
        .doFormat = 1, .webhdfsEnabled = 1, .namenodeHttpPort = 50070,
    };
    cluster = nmdCreate(&conf);
    if (!cluster) {
        fprintf(stderr, "Failed to create the NativeMiniDfsCluster.\n");
        exit(1);
    }
    if (nmdWaitClusterUp(cluster)) {
        fprintf(stderr, "Error when waiting for cluster to be ready.\n");
        exit(1);
    }
    if (nmdGetNameNodeHttpAddress(cluster, &nnPort, &nnHost)) {
        fprintf(stderr, "Error when retrieving namenode host address.\n");
        exit(1);
    }

    hdfsFS fs = hdfsConnectAsUserNewInstance(nnHost, nnPort, argv[1]);
    if(!fs) {
        fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
        exit(-1);
    }

    const char* writePath = "/tmp/testfile.txt";
    const char* fileContents = "Hello, World!";

    {
        //Write tests

        hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
        if(!writeFile) {
            fprintf(stderr, "Failed to open %s for writing!\n", writePath);
            exit(-1);
        // Write tests
        rwTemplate = strdup("/tmp/helloWorldXXXXXX");
        if (!rwTemplate) {
            fprintf(stderr, "Failed to create rwTemplate!\n");
            exit(1);
        }
        fprintf(stderr, "Opened %s for writing successfully...\n", writePath);
        num_written_bytes = hdfsWrite(fs, writeFile, (void*)fileContents, strlen(fileContents) + 1);
        rwPath = mktemp(rwTemplate);
        // hdfsOpenFile
        hdfsFile writeFile = hdfsOpenFile(fs, rwPath,
                                          O_WRONLY|O_CREAT, 0, 0, 0);

        if(!writeFile) {
            fprintf(stderr, "Failed to open %s for writing!\n", rwPath);
            exit(1);
        }
        fprintf(stderr, "Opened %s for writing successfully...\n", rwPath);
        // hdfsWrite
        num_written_bytes = hdfsWrite(fs, writeFile, (void*)fileContents,
                                      (int) strlen(fileContents) + 1);
        if (num_written_bytes != strlen(fileContents) + 1) {
            fprintf(stderr, "Failed to write correct number of bytes - expected %d, got %d\n",
                    (int)(strlen(fileContents) + 1), (int)num_written_bytes);
            exit(-1);
            fprintf(stderr, "Failed to write correct number of bytes - "
                    "expected %d, got %d\n",
                    (int)(strlen(fileContents) + 1), (int) num_written_bytes);
            exit(1);
        }
        fprintf(stderr, "Wrote %d bytes\n", num_written_bytes);

        // hdfsTell
        tOffset currentPos = -1;
        if ((currentPos = hdfsTell(fs, writeFile)) == -1) {
            fprintf(stderr,
                    "Failed to get current file position correctly! Got %lld!\n",
                    currentPos);
            exit(-1);
                    "Failed to get current file position correctly. Got %"
                    PRId64 "!\n", currentPos);
            exit(1);
        }
        fprintf(stderr, "Current position: %lld\n", currentPos);

        if (hdfsFlush(fs, writeFile)) {
            fprintf(stderr, "Failed to 'flush' %s\n", writePath);
            exit(-1);
        }
        fprintf(stderr, "Flushed %s successfully!\n", writePath);

        if (hdfsHFlush(fs, writeFile)) {
            fprintf(stderr, "Failed to 'hflush' %s\n", writePath);
            exit(-1);
        }
        fprintf(stderr, "HFlushed %s successfully!\n", writePath);
        fprintf(stderr, "Current position: %" PRId64 "\n", currentPos);

        hdfsCloseFile(fs, writeFile);
        // Done test write
    }

    sleep(1);

    {
        //Read tests
        sleep(1);
        const char* readPath = "/tmp/testfile.txt";
        int exists = hdfsExists(fs, readPath);
        int available = 0, exists = 0;

        // hdfsExists
        exists = hdfsExists(fs, rwPath);
        if (exists) {
            fprintf(stderr, "Failed to validate existence of %s\n", readPath);
            exists = hdfsExists(fs, readPath);
            fprintf(stderr, "Failed to validate existence of %s\n", rwPath);
            exists = hdfsExists(fs, rwPath);
            if (exists) {
                fprintf(stderr, "Still failed to validate existence of %s\n", readPath);
                exit(-1);
                fprintf(stderr,
                        "Still failed to validate existence of %s\n", rwPath);
                exit(1);
            }
        }

        hdfsFile readFile = hdfsOpenFile(fs, readPath, O_RDONLY, 0, 0, 0);
        hdfsFile readFile = hdfsOpenFile(fs, rwPath, O_RDONLY, 0, 0, 0);
        if (!readFile) {
            fprintf(stderr, "Failed to open %s for reading!\n", readPath);
            exit(-1);
            fprintf(stderr, "Failed to open %s for reading!\n", rwPath);
            exit(1);
        }

        if (!hdfsFileIsOpenForRead(readFile)) {
            fprintf(stderr, "hdfsFileIsOpenForRead: we just opened a file "
                    "with O_RDONLY, and it did not show up as 'open for "
                    "read'\n");
            exit(-1);
            exit(1);
        }

        fprintf(stderr, "hdfsAvailable: %d\n", hdfsAvailable(fs, readFile));
        available = hdfsAvailable(fs, readFile);
        fprintf(stderr, "hdfsAvailable: %d\n", available);

        // hdfsSeek, hdfsTell
        tOffset seekPos = 1;
        if(hdfsSeek(fs, readFile, seekPos)) {
            fprintf(stderr, "Failed to seek %s for reading!\n", readPath);
            exit(-1);
            fprintf(stderr, "Failed to seek %s for reading!\n", rwPath);
            exit(1);
        }

        tOffset currentPos = -1;
        if((currentPos = hdfsTell(fs, readFile)) != seekPos) {
            fprintf(stderr,
                    "Failed to get current file position correctly! Got %lld!\n",
                    currentPos);
            exit(-1);
                    "Failed to get current file position correctly! Got %"
                    PRId64 "!\n", currentPos);

            exit(1);
        }
        fprintf(stderr, "Current position: %lld\n", currentPos);
        fprintf(stderr, "Current position: %" PRId64 "\n", currentPos);

        if (!hdfsFileUsesDirectRead(readFile)) {
            fprintf(stderr, "Direct read support incorrectly not detected "
                    "for HDFS filesystem\n");
            exit(-1);
        }

        fprintf(stderr, "Direct read support detected for HDFS\n");

        // Test the direct read path
        if(hdfsSeek(fs, readFile, 0)) {
            fprintf(stderr, "Failed to seek %s for reading!\n", readPath);
            exit(-1);
            fprintf(stderr, "Failed to seek %s for reading!\n", rwPath);
            exit(1);
        }

        // hdfsRead
        memset(buffer, 0, sizeof(buffer));
        tSize num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
                                        sizeof(buffer));
        tSize num_read_bytes = hdfsRead(fs, readFile, buffer, sizeof(buffer));
        if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
            fprintf(stderr, "Failed to read (direct). Expected %s but got %s (%d bytes)\n",
            fprintf(stderr, "Failed to read (direct). "
                    "Expected %s but got %s (%d bytes)\n",
                    fileContents, buffer, num_read_bytes);
            exit(-1);
            exit(1);
        }
        fprintf(stderr, "Read following %d bytes:\n%s\n",
                num_read_bytes, buffer);

        if (hdfsSeek(fs, readFile, 0L)) {
            fprintf(stderr, "Failed to seek to file start!\n");
            exit(-1);
            exit(1);
        }

        // Disable the direct read path so that we really go through the slow
        // read path
        hdfsFileDisableDirectRead(readFile);

        num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
                                  sizeof(buffer));
        fprintf(stderr, "Read following %d bytes:\n%s\n",
                num_read_bytes, buffer);

        // hdfsPread
        memset(buffer, 0, strlen(fileContents + 1));

        num_read_bytes = hdfsPread(fs, readFile, 0, (void*)buffer,
                                   sizeof(buffer));
        num_read_bytes = hdfsPread(fs, readFile, 0, buffer, sizeof(buffer));
        fprintf(stderr, "Read following %d bytes:\n%s\n",
                num_read_bytes, buffer);

        hdfsCloseFile(fs, readFile);
        // Done test read
    }

    int totalResult = 0;
    int result = 0;
    {
        //Generic file-system operations

        const char* srcPath = "/tmp/testfile.txt";
        const char* dstPath = "/tmp/testfile2.txt";
        const char* copyPath = "/tmp/testfile_copy.txt";
        const char* movePath = "/tmp/testfile_move.txt";

        fprintf(stderr, "hdfsCopy: %s\n", ((result = hdfsCopy(fs, srcPath, fs, copyPath)) ? "Failed!" : "Success!"));
        totalResult += result;
        fprintf(stderr, "hdfsMove: %s\n", ((result = hdfsMove(fs, copyPath, fs, movePath)) ? "Failed!" : "Success!"));
        totalResult += result;

        fprintf(stderr, "hdfsGetDefaultBlockSize: %lld\n", hdfsGetDefaultBlockSize(fs));

        fprintf(stderr, "hdfsRename: %s\n", ((result = hdfsRename(fs, srcPath, dstPath)) ? "Failed!" : "Success!"));
        totalResult += result;
        fprintf(stderr, "hdfsRename back: %s\n", ((result = hdfsRename(fs, dstPath, srcPath)) ? "Failed!" : "Success!"));
        totalResult += result;

        const char* slashTmp = "/tmp";
        const char* newDirectory = "/tmp/newdir";
        fprintf(stderr, "hdfsCreateDirectory: %s\n", ((result = hdfsCreateDirectory(fs, newDirectory)) ? "Failed!" : "Success!"));
        totalResult += result;

        fprintf(stderr, "hdfsSetReplication: %s\n", ((result = hdfsSetReplication(fs, srcPath, 1)) ? "Failed!" : "Success!"));
        totalResult += result;

        char *srcPath = rwPath;
        char buffer[256];
        const char *resp;
        fprintf(stderr, "hdfsGetWorkingDirectory: %s\n", ((resp = hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer))) ? buffer : "Failed!"));
        totalResult += (resp ? 0 : 1);
        fprintf(stderr, "hdfsSetWorkingDirectory: %s\n", ((result = hdfsSetWorkingDirectory(fs, slashTmp)) ? "Failed!" : "Success!"));
        totalResult += result;
        fprintf(stderr, "hdfsGetWorkingDirectory: %s\n", ((resp = hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer))) ? buffer : "Failed!"));
        totalResult += (resp ? 0 : 1);
        rwTemplate2 = strdup("/tmp/helloWorld2XXXXXX");
        if (!rwTemplate2) {
            fprintf(stderr, "Failed to create rwTemplate2!\n");
            exit(1);
        }
        char *dstPath = mktemp(rwTemplate2);
        newDirTemplate = strdup("/tmp/newdirXXXXXX");
        if (!newDirTemplate) {
            fprintf(stderr, "Failed to create newDirTemplate!\n");
            exit(1);
        }
        char *newDirectory = mktemp(newDirTemplate);

        // hdfsRename
        fprintf(stderr, "hdfsRename: %s\n",
                ((result = hdfsRename(fs, rwPath, dstPath)) ?
                 "Failed!" : "Success!"));
        totalResult += result;
        fprintf(stderr, "hdfsRename back: %s\n",
                ((result = hdfsRename(fs, dstPath, srcPath)) ?
                 "Failed!" : "Success!"));
        totalResult += result;

        // hdfsCreateDirectory
        fprintf(stderr, "hdfsCreateDirectory: %s\n",
                ((result = hdfsCreateDirectory(fs, newDirectory)) ?
                 "Failed!" : "Success!"));
        totalResult += result;

        // hdfsSetReplication
        fprintf(stderr, "hdfsSetReplication: %s\n",
                ((result = hdfsSetReplication(fs, srcPath, 1)) ?
                 "Failed!" : "Success!"));
        totalResult += result;

        // hdfsGetWorkingDirectory, hdfsSetWorkingDirectory
        fprintf(stderr, "hdfsGetWorkingDirectory: %s\n",
                ((resp = hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer))) ?
                 buffer : "Failed!"));
        totalResult += (resp ? 0 : 1);

        const char* path[] = {"/foo", "/foo/bar", "foobar", "//foo/bar//foobar",
                              "foo//bar", "foo/bar///", "/", "////"};
        for (int i = 0; i < 8; i++) {
            fprintf(stderr, "hdfsSetWorkingDirectory: %s, %s\n",
                    ((result = hdfsSetWorkingDirectory(fs, path[i])) ?
                     "Failed!" : "Success!"),
                    hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer)));
            totalResult += result;
        }

        fprintf(stderr, "hdfsSetWorkingDirectory: %s\n",
                ((result = hdfsSetWorkingDirectory(fs, slashTmp)) ?
                 "Failed!" : "Success!"));
        totalResult += result;
        fprintf(stderr, "hdfsGetWorkingDirectory: %s\n",
                ((resp = hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer))) ?
                 buffer : "Failed!"));
        totalResult += (resp ? 0 : 1);

        // hdfsGetPathInfo
        hdfsFileInfo *fileInfo = NULL;
        if((fileInfo = hdfsGetPathInfo(fs, slashTmp)) != NULL) {
            fprintf(stderr, "hdfsGetPathInfo - SUCCESS!\n");
@@ -261,13 +288,15 @@ int main(int argc, char **argv) {
            fprintf(stderr, "Group: %s, ", fileInfo->mGroup);
            char permissions[10];
            permission_disp(fileInfo->mPermissions, permissions);
            fprintf(stderr, "Permissions: %d (%s)\n", fileInfo->mPermissions, permissions);
            fprintf(stderr, "Permissions: %d (%s)\n",
                    fileInfo->mPermissions, permissions);
            hdfsFreeFileInfo(fileInfo, 1);
        } else {
            totalResult++;
            fprintf(stderr, "waah! hdfsGetPathInfo for %s - FAILED!\n", slashTmp);
            fprintf(stderr, "hdfsGetPathInfo for %s - FAILED!\n", slashTmp);
        }

        // hdfsListDirectory
        hdfsFileInfo *fileList = 0;
        int numEntries = 0;
        if((fileList = hdfsListDirectory(fs, slashTmp, &numEntries)) != NULL) {
@@ -283,7 +312,8 @@ int main(int argc, char **argv) {
                fprintf(stderr, "Group: %s, ", fileList[i].mGroup);
                char permissions[10];
                permission_disp(fileList[i].mPermissions, permissions);
                fprintf(stderr, "Permissions: %d (%s)\n", fileList[i].mPermissions, permissions);
                fprintf(stderr, "Permissions: %d (%s)\n",
                        fileList[i].mPermissions, permissions);
            }
            hdfsFreeFileInfo(fileList, numEntries);
        } else {
@@ -295,203 +325,220 @@ int main(int argc, char **argv) {
            }
        }

        // char*** hosts = hdfsGetHosts(fs, srcPath, 0, 1);
        // if(hosts) {
        //     fprintf(stderr, "hdfsGetHosts - SUCCESS! ... \n");
        //     int i=0;
        //     while(hosts[i]) {
        //         int j = 0;
        //         while(hosts[i][j]) {
        //             fprintf(stderr,
        //                     "\thosts[%d][%d] - %s\n", i, j, hosts[i][j]);
        //             ++j;
        //         }
        //         ++i;
        //     }
        // } else {
        //     totalResult++;
        //     fprintf(stderr, "waah! hdfsGetHosts - FAILED!\n");
        // }

        char *newOwner = "root";
        // setting tmp dir to 777 so later when connectAsUser nobody, we can write to it
        // Setting tmp dir to 777 so later when connectAsUser nobody,
        // we can write to it
        short newPerm = 0666;

        // chown write
        fprintf(stderr, "hdfsChown: %s\n", ((result = hdfsChown(fs, writePath, NULL, "users")) ? "Failed!" : "Success!"));
        // hdfsChown
        fprintf(stderr, "hdfsChown: %s\n",
                ((result = hdfsChown(fs, rwPath, NULL, "users")) ?
                 "Failed!" : "Success!"));
        totalResult += result;
        fprintf(stderr, "hdfsChown: %s\n", ((result = hdfsChown(fs, writePath, newOwner, NULL)) ? "Failed!" : "Success!"));
        fprintf(stderr, "hdfsChown: %s\n",
                ((result = hdfsChown(fs, rwPath, newOwner, NULL)) ?
                 "Failed!" : "Success!"));
        totalResult += result;
        // chmod write
        fprintf(stderr, "hdfsChmod: %s\n", ((result = hdfsChmod(fs, writePath, newPerm)) ? "Failed!" : "Success!"));
        // hdfsChmod
        fprintf(stderr, "hdfsChmod: %s\n",
                ((result = hdfsChmod(fs, rwPath, newPerm)) ?
                 "Failed!" : "Success!"));
        totalResult += result;

        sleep(2);
        tTime newMtime = time(NULL);
        tTime newAtime = time(NULL);

        // utime write
        fprintf(stderr, "hdfsUtime: %s\n", ((result = hdfsUtime(fs, writePath, newMtime, newAtime)) ? "Failed!" : "Success!"));

        fprintf(stderr, "hdfsUtime: %s\n",
                ((result = hdfsUtime(fs, rwPath, newMtime, newAtime)) ?
                 "Failed!" : "Success!"));
        totalResult += result;

        // chown/chmod/utime read
        hdfsFileInfo *finfo = hdfsGetPathInfo(fs, writePath);
        hdfsFileInfo *finfo = hdfsGetPathInfo(fs, rwPath);

        fprintf(stderr, "hdfsChown read: %s\n", ((result = (strcmp(finfo->mOwner, newOwner) != 0)) ? "Failed!" : "Success!"));
        fprintf(stderr, "hdfsChown read: %s\n",
                ((result = (strcmp(finfo->mOwner, newOwner) != 0)) ?
                 "Failed!" : "Success!"));
        totalResult += result;

        fprintf(stderr, "hdfsChmod read: %s\n", ((result = (finfo->mPermissions != newPerm)) ? "Failed!" : "Success!"));
        fprintf(stderr, "hdfsChmod read: %s\n",
                ((result = (finfo->mPermissions != newPerm)) ?
                 "Failed!" : "Success!"));
        totalResult += result;

        // will later use /tmp/ as a different user so enable it
        fprintf(stderr, "hdfsChmod: %s\n", ((result = hdfsChmod(fs, "/tmp/", 0777)) ? "Failed!" : "Success!"));
        fprintf(stderr, "hdfsChmod: %s\n",
                ((result = hdfsChmod(fs, slashTmp, 0777)) ?
                 "Failed!" : "Success!"));
        totalResult += result;

        fprintf(stderr,"newMTime=%ld\n",newMtime);
        fprintf(stderr,"curMTime=%ld\n",finfo->mLastMod);

        fprintf(stderr, "hdfsUtime read (mtime): %s\n", ((result = (finfo->mLastMod != newMtime / 1000)) ? "Failed!" : "Success!"));
        fprintf(stderr, "hdfsUtime read (mtime): %s\n",
                ((result = (finfo->mLastMod != newMtime / 1000)) ?
                 "Failed!" : "Success!"));
        totalResult += result;

        hdfsFreeFileInfo(finfo, 1);

        // Clean up
        fprintf(stderr, "hdfsDelete: %s\n", ((result = hdfsDelete(fs, newDirectory, 1)) ? "Failed!" : "Success!"));
        hdfsFreeFileInfo(finfo, 1);
        fprintf(stderr, "hdfsDelete: %s\n",
                ((result = hdfsDelete(fs, newDirectory, 1)) ?
                 "Failed!" : "Success!"));
        totalResult += result;
        fprintf(stderr, "hdfsDelete: %s\n", ((result = hdfsDelete(fs, srcPath, 1)) ? "Failed!" : "Success!"));
        fprintf(stderr, "hdfsDelete: %s\n",
                ((result = hdfsDelete(fs, srcPath, 1)) ?
                 "Failed!" : "Success!"));
        totalResult += result;
        // fprintf(stderr, "hdfsDelete: %s\n", ((result = hdfsDelete(fs, movePath, 1)) ? "Failed!" : "Success!"));
        // totalResult += result;
        fprintf(stderr, "hdfsExists: %s\n", ((result = hdfsExists(fs, newDirectory)) ? "Success!" : "Failed!"));
        fprintf(stderr, "hdfsExists: %s\n",
                ((result = hdfsExists(fs, newDirectory)) ?
                 "Success!" : "Failed!"));
        totalResult += (result ? 0 : 1);
        // Done test generic operations
    }

    {
        // TEST APPENDS
        const char *writePath = "/tmp/appends";
        // Test Appends
        appendTemplate = strdup("/tmp/appendsXXXXXX");
        if (!appendTemplate) {
            fprintf(stderr, "Failed to create appendTemplate!\n");
            exit(1);
        }
        char *appendPath = mktemp(appendTemplate);
        const char* helloBuffer = "Hello,";
        hdfsFile writeFile = NULL;

        // CREATE
        hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY, 0, 0, 0);
        // Create
        writeFile = hdfsOpenFile(fs, appendPath, O_WRONLY, 0, 0, 0);
        if(!writeFile) {
            fprintf(stderr, "Failed to open %s for writing!\n", writePath);
            exit(-1);
            fprintf(stderr, "Failed to open %s for writing!\n", appendPath);
            exit(1);
        }
        fprintf(stderr, "Opened %s for writing successfully...\n", writePath);
        fprintf(stderr, "Opened %s for writing successfully...\n", appendPath);

        const char* buffer = "Hello,";
        tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer));
        num_written_bytes = hdfsWrite(fs, writeFile, helloBuffer,
                                      (int) strlen(helloBuffer));
        fprintf(stderr, "Wrote %d bytes\n", num_written_bytes);

        if (hdfsFlush(fs, writeFile)) {
            fprintf(stderr, "Failed to 'flush' %s\n", writePath);
            exit(-1);
        }
        fprintf(stderr, "Flushed %s successfully!\n", writePath);

        hdfsCloseFile(fs, writeFile);

        fprintf(stderr, "hdfsSetReplication: %s\n", ((result = hdfsSetReplication(fs, writePath, 1)) ? "Failed!" : "Success!"));
        fprintf(stderr, "hdfsSetReplication: %s\n",
                ((result = hdfsSetReplication(fs, appendPath, 1)) ?
                 "Failed!" : "Success!"));
        totalResult += result;

        // RE-OPEN
        writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_APPEND, 0, 0, 0);
        // Re-Open for Append
        writeFile = hdfsOpenFile(fs, appendPath, O_WRONLY | O_APPEND, 0, 0, 0);
        if(!writeFile) {
            fprintf(stderr, "Failed to open %s for writing!\n", writePath);
            exit(-1);
            fprintf(stderr, "Failed to open %s for writing!\n", appendPath);
            exit(1);
        }
        fprintf(stderr, "Opened %s for appending successfully...\n", writePath);
        fprintf(stderr, "Opened %s for appending successfully...\n",
                appendPath);

        buffer = " World";
        num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer) + 1);
        helloBuffer = " World";
        num_written_bytes = hdfsWrite(fs, writeFile, helloBuffer,
                                      (int)strlen(helloBuffer) + 1);
        fprintf(stderr, "Wrote %d bytes\n", num_written_bytes);

        if (hdfsFlush(fs, writeFile)) {
            fprintf(stderr, "Failed to 'flush' %s\n", writePath);
            exit(-1);
        }
        fprintf(stderr, "Flushed %s successfully!\n", writePath);

        hdfsCloseFile(fs, writeFile);

        // CHECK size
        hdfsFileInfo *finfo = hdfsGetPathInfo(fs, writePath);
        fprintf(stderr, "fileinfo->mSize: == total %s\n", ((result = (finfo->mSize == strlen("Hello, World") + 1)) ? "Success!" : "Failed!"));
        // Check size
        hdfsFileInfo *finfo = hdfsGetPathInfo(fs, appendPath);
        fprintf(stderr, "fileinfo->mSize: == total %s\n",
                ((result = (finfo->mSize == strlen("Hello, World") + 1)) ?
                 "Success!" : "Failed!"));
        totalResult += (result ? 0 : 1);

        // READ and check data
        hdfsFile readFile = hdfsOpenFile(fs, writePath, O_RDONLY, 0, 0, 0);
        // Read and check data
        hdfsFile readFile = hdfsOpenFile(fs, appendPath, O_RDONLY, 0, 0, 0);
        if (!readFile) {
            fprintf(stderr, "Failed to open %s for reading!\n", writePath);
            exit(-1);
            fprintf(stderr, "Failed to open %s for reading!\n", appendPath);
            exit(1);
        }

        char rdbuffer[32];
        tSize num_read_bytes = hdfsRead(fs, readFile, (void*)rdbuffer, sizeof(rdbuffer));
        tSize num_read_bytes = hdfsRead(fs, readFile, buffer, sizeof(buffer));
        fprintf(stderr, "Read following %d bytes:\n%s\n",
                num_read_bytes, rdbuffer);

        fprintf(stderr, "read == Hello, World %s\n", (result = (strcmp(rdbuffer, "Hello, World") == 0)) ? "Success!" : "Failed!");

                num_read_bytes, buffer);
        fprintf(stderr, "read == Hello, World %s\n",
                (result = (strcmp(buffer, "Hello, World") == 0)) ?
                "Success!" : "Failed!");
        hdfsCloseFile(fs, readFile);

        // DONE test appends
        // Cleanup
        fprintf(stderr, "hdfsDelete: %s\n",
                ((result = hdfsDelete(fs, appendPath, 1)) ?
                 "Failed!" : "Success!"));
        totalResult += result;
        // Done test appends
    }

    totalResult += (hdfsDisconnect(fs) != 0);

    {
        //
        // Now test as connecting as a specific user
        // This is only meant to test that we connected as that user, not to test
        // This only meant to test that we connected as that user, not to test
        // the actual fs user capabilities. Thus just create a file and read
        // the owner is correct.

        const char *tuser = "nobody";
        const char* writePath = "/tmp/usertestfile.txt";
        userTemplate = strdup("/tmp/usertestXXXXXX");
        if (!userTemplate) {
            fprintf(stderr, "Failed to create userTemplate!\n");
            exit(1);
        }
        char* userWritePath = mktemp(userTemplate);
        hdfsFile writeFile = NULL;

        fs = hdfsConnectAsUserNewInstance("default", 50070, tuser);
        if(!fs) {
            fprintf(stderr, "Oops! Failed to connect to hdfs as user %s!\n",tuser);
            exit(-1);
            fprintf(stderr,
                    "Oops! Failed to connect to hdfs as user %s!\n",tuser);
            exit(1);
        }

        hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
        writeFile = hdfsOpenFile(fs, userWritePath, O_WRONLY|O_CREAT, 0, 0, 0);
        if(!writeFile) {
            fprintf(stderr, "Failed to open %s for writing!\n", writePath);
            exit(-1);
            fprintf(stderr, "Failed to open %s for writing!\n", userWritePath);
            exit(1);
        }
        fprintf(stderr, "Opened %s for writing successfully...\n", writePath);
        fprintf(stderr, "Opened %s for writing successfully...\n",
                userWritePath);

        char* buffer = "Hello, World!";
        tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)+1);
        num_written_bytes = hdfsWrite(fs, writeFile, fileContents,
                                      (int)strlen(fileContents) + 1);
        fprintf(stderr, "Wrote %d bytes\n", num_written_bytes);

        if (hdfsFlush(fs, writeFile)) {
            fprintf(stderr, "Failed to 'flush' %s\n", writePath);
            exit(-1);
        }
        fprintf(stderr, "Flushed %s successfully!\n", writePath);

        hdfsCloseFile(fs, writeFile);

        hdfsFileInfo *finfo = hdfsGetPathInfo(fs, writePath);
        hdfsFileInfo *finfo = hdfsGetPathInfo(fs, userWritePath);
        if (finfo) {
            fprintf(stderr, "hdfs new file user is correct: %s\n", ((result = (strcmp(finfo->mOwner, tuser) != 0)) ? "Failed!" : "Success!"));
            fprintf(stderr, "hdfs new file user is correct: %s\n",
                    ((result = (strcmp(finfo->mOwner, tuser) != 0)) ?
                     "Failed!" : "Success!"));
        } else {
            fprintf(stderr, "hdfsFileInfo returned by hdfsGetPathInfo is NULL\n");
            fprintf(stderr,
                    "hdfsFileInfo returned by hdfsGetPathInfo is NULL\n");
            result = -1;
        }
        totalResult += result;

        // Cleanup
        fprintf(stderr, "hdfsDelete: %s\n",
                ((result = hdfsDelete(fs, userWritePath, 1)) ?
                 "Failed!" : "Success!"));
        totalResult += result;
        // Done test specific user
    }

    totalResult += (hdfsDisconnect(fs) != 0);
    fprintf(stderr, "totalResult == %d\n", totalResult);

    // Shutdown the native minidfscluster
    nmdShutdown(cluster);
    nmdFree(cluster);

    fprintf(stderr, "totalResult == %d\n", totalResult);
    if (totalResult != 0) {
        return -1;
    } else {
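A recurring pattern in the cleaned-up ops test is deriving every HDFS path from an mktemp template instead of a fixed name such as /tmp/testfile.txt, so concurrent runs cannot collide. A minimal sketch of that pattern, using only strdup() and mktemp() as the test does (the template string is illustrative, and on some platforms mktemp() needs a feature-test macro):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    int main(void)
    {
        /* strdup() gives a writable copy; mktemp() rewrites the XXXXXX
         * suffix in place, so a string literal must never be passed. */
        char *template = strdup("/tmp/helloWorldXXXXXX"); /* illustrative path */
        if (!template) {
            fprintf(stderr, "Failed to create template!\n");
            return 1;
        }
        char *path = mktemp(template);
        if (!path || path[0] == '\0') { /* mktemp() empties the string on error */
            fprintf(stderr, "mktemp failed\n");
            free(template);
            return 1;
        }
        /* path now names a unique, not-yet-created file, suitable to hand
         * to hdfsOpenFile() with O_WRONLY|O_CREAT as the test above does. */
        fprintf(stderr, "using test path %s\n", path);
        free(template);
        return 0;
    }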
@@ -22,43 +22,52 @@
#include <stdlib.h>

int main(int argc, char **argv) {

    const char* rfile;
    tSize fileTotalSize, bufferSize, curSize, totalReadSize;
    hdfsFS fs;
    hdfsFile readFile;
    char *buffer = NULL;

    if (argc != 4) {
        fprintf(stderr, "Usage: hdfs_read <filename> <filesize> <buffersize>\n");
        exit(-1);
        fprintf(stderr, "Usage: test_libwebhdfs_read"
                " <filename> <filesize> <buffersize>\n");
        exit(1);
    }

    hdfsFS fs = hdfsConnect("0.0.0.0", 50070);
    fs = hdfsConnect("localhost", 50070);
    if (!fs) {
        fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
        exit(-1);
        exit(1);
    }

    const char* rfile = argv[1];
    tSize fileTotalSize = strtoul(argv[2], NULL, 10);
    tSize bufferSize = strtoul(argv[3], NULL, 10);
    rfile = argv[1];
    fileTotalSize = strtoul(argv[2], NULL, 10);
    bufferSize = strtoul(argv[3], NULL, 10);

    hdfsFile readFile = hdfsOpenFile(fs, rfile, O_RDONLY, bufferSize, 0, 0);
    readFile = hdfsOpenFile(fs, rfile, O_RDONLY, bufferSize, 0, 0);
    if (!readFile) {
        fprintf(stderr, "Failed to open %s for writing!\n", rfile);
        exit(-2);
        exit(1);
    }

    // data to be written to the file
    char* buffer = malloc(sizeof(char) * bufferSize);
    buffer = malloc(sizeof(char) * bufferSize);
    if(buffer == NULL) {
        return -2;
        fprintf(stderr, "Failed to allocate buffer.\n");
        exit(1);
    }

    // read from the file
    tSize curSize = bufferSize;
    tSize totalReadSize = 0;
    for (; (curSize = hdfsRead(fs, readFile, (void*)buffer, bufferSize)) == bufferSize ;) {
    curSize = bufferSize;
    totalReadSize = 0;
    for (; (curSize = hdfsRead(fs, readFile, buffer, bufferSize)) == bufferSize; ) {
        totalReadSize += curSize;
    }
    totalReadSize += curSize;

    fprintf(stderr, "size of the file: %d; reading size: %d\n", fileTotalSize, totalReadSize);
    fprintf(stderr, "size of the file: %d; reading size: %d\n",
            fileTotalSize, totalReadSize);

    free(buffer);
    hdfsCloseFile(fs, readFile);

@@ -67,7 +76,3 @@ int main(int argc, char **argv) {
    return 0;
}

/**
 * vim: ts=4: sw=4: et:
 */
@@ -18,6 +18,7 @@

#include "expect.h"
#include "hdfs.h"
#include "native_mini_dfs.h"

#include <errno.h>
#include <semaphore.h>

@@ -28,11 +29,9 @@

#define TLH_MAX_THREADS 100

static sem_t *tlhSem;
static struct NativeMiniDfsCluster* cluster;

static const char *nn;
static const char *user;
static int port;

struct tlhThreadInfo {
    /** Thread index */

@@ -43,19 +42,24 @@ struct tlhThreadInfo {
    pthread_t thread;
};

static int hdfsSingleNameNodeConnect(const char *nn, int port, const char *user, hdfsFS *fs)
static int hdfsSingleNameNodeConnect(struct NativeMiniDfsCluster *cluster,
                                     hdfsFS *fs)
{
    int nnPort;
    const char *nnHost;
    hdfsFS hdfs;
    if (port < 0) {
        fprintf(stderr, "hdfsSingleNameNodeConnect: nmdGetNameNodePort "
                "returned error %d\n", port);
        return port;

    if (nmdGetNameNodeHttpAddress(cluster, &nnPort, &nnHost)) {
        fprintf(stderr, "Error when retrieving namenode host address.\n");
        return 1;
    }

    hdfs = hdfsConnectAsUserNewInstance(nn, port, user);
    if (!hdfs) {
        return -errno;
    hdfs = hdfsConnectAsUser(nnHost, nnPort, user);
    if(!hdfs) {
        fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
        return 1;
    }

    *fs = hdfs;
    return 0;
}

@@ -65,6 +69,7 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
    char prefix[256], tmp[256];
    hdfsFile file;
    int ret, expected;
    hdfsFileInfo *fileInfo;

    snprintf(prefix, sizeof(prefix), "/tlhData%04d", ti->threadIdx);

@@ -74,18 +79,13 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
    EXPECT_ZERO(hdfsCreateDirectory(fs, prefix));
    snprintf(tmp, sizeof(tmp), "%s/file", prefix);

    /*
     * Although there should not be any file to open for reading,
     * the right now implementation only construct a local
     * information struct when opening file
     */
    EXPECT_NONNULL(hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0));

    file = hdfsOpenFile(fs, tmp, O_WRONLY, 0, 0, 0);
    EXPECT_NONNULL(file);

    /* TODO: implement writeFully and use it here */
    expected = strlen(prefix);
    expected = (int)strlen(prefix);
    ret = hdfsWrite(fs, file, prefix, expected);
    if (ret < 0) {
        ret = errno;

@@ -118,9 +118,28 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
    }
    EXPECT_ZERO(memcmp(prefix, tmp, expected));
    EXPECT_ZERO(hdfsCloseFile(fs, file));

    snprintf(tmp, sizeof(tmp), "%s/file", prefix);
    EXPECT_NONZERO(hdfsChown(fs, tmp, NULL, NULL));
    EXPECT_ZERO(hdfsChown(fs, tmp, NULL, "doop"));
    fileInfo = hdfsGetPathInfo(fs, tmp);
    EXPECT_NONNULL(fileInfo);
    EXPECT_ZERO(strcmp("doop", fileInfo->mGroup));
    hdfsFreeFileInfo(fileInfo, 1);

    // TODO: Non-recursive delete should fail?
    //EXPECT_NONZERO(hdfsDelete(fs, prefix, 0));
    EXPECT_ZERO(hdfsChown(fs, tmp, "ha", "doop2"));
    fileInfo = hdfsGetPathInfo(fs, tmp);
    EXPECT_NONNULL(fileInfo);
    EXPECT_ZERO(strcmp("ha", fileInfo->mOwner));
    EXPECT_ZERO(strcmp("doop2", fileInfo->mGroup));
    hdfsFreeFileInfo(fileInfo, 1);

    EXPECT_ZERO(hdfsChown(fs, tmp, "ha2", NULL));
    fileInfo = hdfsGetPathInfo(fs, tmp);
    EXPECT_NONNULL(fileInfo);
    EXPECT_ZERO(strcmp("ha2", fileInfo->mOwner));
    EXPECT_ZERO(strcmp("doop2", fileInfo->mGroup));
    hdfsFreeFileInfo(fileInfo, 1);

    EXPECT_ZERO(hdfsDelete(fs, prefix, 1));
    return 0;

@@ -134,7 +153,7 @@ static void *testHdfsOperations(void *v)

    fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n",
            ti->threadIdx);
    ret = hdfsSingleNameNodeConnect(nn, port, user, &fs);
    ret = hdfsSingleNameNodeConnect(cluster, &fs);
    if (ret) {
        fprintf(stderr, "testHdfsOperations(threadIdx=%d): "
                "hdfsSingleNameNodeConnect failed with error %d.\n",

@@ -181,19 +200,23 @@ static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads)
 */
int main(int argc, const char *args[])
{
    if (argc != 4) {
        fprintf(stderr, "usage: test_libhdfs_threaded <namenode> <port> <username>");
        return -1;
    }

    nn = args[1];
    port = atoi(args[2]);
    user = args[3];

    int i, tlhNumThreads;
    const char *tlhNumThreadsStr;
    struct tlhThreadInfo ti[TLH_MAX_THREADS];

    if (argc != 2) {
        fprintf(stderr, "usage: test_libwebhdfs_threaded <username>\n");
        exit(1);
    }
    user = args[1];

    struct NativeMiniDfsConf conf = {
        .doFormat = 1, .webhdfsEnabled = 1, .namenodeHttpPort = 50070,
    };
    cluster = nmdCreate(&conf);
    EXPECT_NONNULL(cluster);
    EXPECT_ZERO(nmdWaitClusterUp(cluster));

    tlhNumThreadsStr = getenv("TLH_NUM_THREADS");
    if (!tlhNumThreadsStr) {
        tlhNumThreadsStr = "3";

@@ -210,8 +233,6 @@ int main(int argc, const char *args[])
        ti[i].threadIdx = i;
    }

    // tlhSem = sem_open("sem", O_CREAT, 0644, tlhNumThreads);

    for (i = 0; i < tlhNumThreads; i++) {
        EXPECT_ZERO(pthread_create(&ti[i].thread, NULL,
            testHdfsOperations, &ti[i]));

@@ -220,6 +241,7 @@ int main(int argc, const char *args[])
        EXPECT_ZERO(pthread_join(ti[i].thread, NULL));
    }

    // EXPECT_ZERO(sem_close(tlhSem));
    EXPECT_ZERO(nmdShutdown(cluster));
    nmdFree(cluster);
    return checkFailures(ti, tlhNumThreads);
}
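The rewritten threaded test resolves the NameNode's HTTP endpoint from the embedded mini-cluster instead of taking a host and port on the command line. A hedged sketch of that connect sequence, reusing only the native_mini_dfs and libhdfs calls that appear in this diff (the user name is illustrative):

    #include <stdio.h>
    #include "hdfs.h"
    #include "native_mini_dfs.h"

    /* Connect to the mini-cluster's NameNode over webhdfs as the given user.
     * Mirrors the hdfsSingleNameNodeConnect() shape used by this change. */
    static hdfsFS connectToMiniCluster(struct NativeMiniDfsCluster *cluster,
                                       const char *user)
    {
        int nnPort;
        const char *nnHost;
        if (nmdGetNameNodeHttpAddress(cluster, &nnPort, &nnHost)) {
            fprintf(stderr, "Error when retrieving namenode host address.\n");
            return NULL;
        }
        return hdfsConnectAsUser(nnHost, nnPort, user);
    }

    int main(void)
    {
        struct NativeMiniDfsConf conf = {
            .doFormat = 1, .webhdfsEnabled = 1, .namenodeHttpPort = 50070,
        };
        struct NativeMiniDfsCluster *cluster = nmdCreate(&conf);
        if (!cluster || nmdWaitClusterUp(cluster)) {
            fprintf(stderr, "mini-cluster startup failed\n");
            return 1;
        }
        hdfsFS fs = connectToMiniCluster(cluster, "nobody"); /* illustrative user */
        if (fs) {
            hdfsDisconnect(fs);
        }
        nmdShutdown(cluster);
        nmdFree(cluster);
        return fs ? 0 : 1;
    }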
@@ -22,97 +22,90 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>

int main(int argc, char **argv) {
    hdfsFS fs;
    const char* writeFileName;
    off_t fileTotalSize;
    long long tmpBufferSize;
    tSize bufferSize = 0, totalWriteSize = 0, toWrite = 0, written = 0;
    hdfsFile writeFile = NULL;
    int append, i = 0;
    char* buffer = NULL;

    if (argc != 6) {
        fprintf(stderr, "Usage: hdfs_write <filename> <filesize> <buffersize> <username> <append>\n");
        exit(-1);
        fprintf(stderr, "Usage: test_libwebhdfs_write <filename> <filesize> "
                "<buffersize> <username> <append>\n");
        exit(1);
    }

    hdfsFS fs = hdfsConnectAsUser("0.0.0.0", 50070, argv[4]);
    fs = hdfsConnectAsUser("default", 50070, argv[4]);
    if (!fs) {
        fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
        exit(-1);
        exit(1);
    }

    const char* writeFileName = argv[1];
    off_t fileTotalSize = strtoul(argv[2], NULL, 10);
    long long tmpBufferSize = strtoul(argv[3], NULL, 10);
    writeFileName = argv[1];
    fileTotalSize = strtoul(argv[2], NULL, 10);
    tmpBufferSize = strtoul(argv[3], NULL, 10);

    // sanity check
    if(fileTotalSize == ULONG_MAX && errno == ERANGE) {
        fprintf(stderr, "invalid file size %s - must be <= %lu\n", argv[2], ULONG_MAX);
        exit(-3);
        fprintf(stderr, "invalid file size %s - must be <= %lu\n",
                argv[2], ULONG_MAX);
        exit(1);
    }

    // currently libhdfs writes are of tSize which is int32
    if(tmpBufferSize > INT_MAX) {
        fprintf(stderr, "invalid buffer size libhdfs API write chunks must be <= %d\n",INT_MAX);
        exit(-3);
        fprintf(stderr,
                "invalid buffer size libhdfs API write chunks must be <= %d\n",
                INT_MAX);
        exit(1);
    }

    tSize bufferSize = tmpBufferSize;

    hdfsFile writeFile = NULL;
    int append = atoi(argv[5]);
    bufferSize = (tSize) tmpBufferSize;
    append = atoi(argv[5]);
    if (!append) {
        writeFile = hdfsOpenFile(fs, writeFileName, O_WRONLY, bufferSize, 2, 0);
    } else {
        writeFile = hdfsOpenFile(fs, writeFileName, O_WRONLY | O_APPEND, bufferSize, 2, 0);
        writeFile = hdfsOpenFile(fs, writeFileName, O_WRONLY | O_APPEND,
                                 bufferSize, 2, 0);
    }
    if (!writeFile) {
        fprintf(stderr, "Failed to open %s for writing!\n", writeFileName);
        exit(-2);
        exit(1);
    }

    // data to be written to the file
    char* buffer = malloc(sizeof(char) * bufferSize + 1);
    buffer = malloc(sizeof(char) * bufferSize + 1);
    if(buffer == NULL) {
        fprintf(stderr, "Could not allocate buffer of size %d\n", bufferSize);
        return -2;
        exit(1);
    }
    int i = 0;
    for (i=0; i < bufferSize; ++i) {
    for (i = 0; i < bufferSize; ++i) {
        buffer[i] = 'a' + (i%26);
    }
    buffer[bufferSize] = '\0';

    size_t totalWriteSize = 0;
    // write to the file
    totalWriteSize = 0;
    for (; totalWriteSize < fileTotalSize; ) {
        tSize toWrite = bufferSize < (fileTotalSize - totalWriteSize) ? bufferSize : (fileTotalSize - totalWriteSize);
        size_t written = hdfsWrite(fs, writeFile, (void*)buffer, toWrite);
        fprintf(stderr, "written size %ld, to write size %d\n", written, toWrite);
        toWrite = bufferSize < (fileTotalSize - totalWriteSize) ?
                  bufferSize : (fileTotalSize - totalWriteSize);
        written = hdfsWrite(fs, writeFile, (void*)buffer, toWrite);
        fprintf(stderr, "written size %d, to write size %d\n",
                written, toWrite);
        totalWriteSize += written;
        //sleep(1);
    }

    // cleanup
    free(buffer);
    hdfsCloseFile(fs, writeFile);

    fprintf(stderr, "file total size: %lld, total write size: %ld\n", fileTotalSize, totalWriteSize);

    hdfsFile readFile = hdfsOpenFile(fs, writeFileName, O_RDONLY, 0, 0, 0);
    //sleep(1);
    fprintf(stderr, "hdfsAvailable: %d\n", hdfsAvailable(fs, readFile));

    hdfsFile writeFile2 = hdfsOpenFile(fs, writeFileName, O_WRONLY | O_APPEND, 0, 2, 0);
    fprintf(stderr, "Opened %s for writing successfully...\n", writeFileName);
    const char *content = "Hello, World!";
    size_t num_written_bytes = hdfsWrite(fs, writeFile2, content, strlen(content) + 1);
    if (num_written_bytes != strlen(content) + 1) {
        fprintf(stderr, "Failed to write correct number of bytes - expected %d, got %d\n",
                (int)(strlen(content) + 1), (int)num_written_bytes);
        exit(-1);
    }
    fprintf(stderr, "Wrote %zd bytes\n", num_written_bytes);

    fprintf(stderr, "file total size: %" PRId64 ", total write size: %d\n",
            fileTotalSize, totalWriteSize);
    hdfsDisconnect(fs);

    return 0;
}

/**
 * vim: ts=4: sw=4: et:
 */
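Several hunks above replace %lld format strings with the inttypes.h macros when printing 64-bit offsets, since the width of long long is not guaranteed to match the offset type on every platform. The portable form, as a tiny self-contained example:

    #include <inttypes.h>
    #include <stdio.h>

    int main(void)
    {
        int64_t currentPos = 42;  /* stands in for a tOffset from hdfsTell() */
        /* PRId64 expands to the correct conversion specifier for int64_t. */
        fprintf(stderr, "Current position: %" PRId64 "\n", currentPos);
        return 0;
    }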
@@ -1,111 +0,0 @@
#include "hdfs.h"

#include <time.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>

#ifdef __MACH__
#include <mach/clock.h>
#include <mach/mach.h>
#endif

void current_utc_time(struct timespec *ts) {
#ifdef __MACH__ // OS X does not have clock_gettime, use clock_get_time
    clock_serv_t cclock;
    mach_timespec_t mts;
    host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
    clock_get_time(cclock, &mts);
    mach_port_deallocate(mach_task_self(), cclock);
    ts->tv_sec = mts.tv_sec;
    ts->tv_nsec = mts.tv_nsec;
#else
    clock_gettime(CLOCK_REALTIME, ts);
#endif
}

long get_time() {
    struct timespec tp;
    current_utc_time(&tp);
    return (long)((tp.tv_sec * 1000000000) + tp.tv_nsec);
}

#define SIZE 512*1024*1024
#define READ_SIZE 512*1024*1024
#define DISCARD_COUNT 5

int main(int argc, char** argv) {
    if (argc != 4) {
        fprintf(stderr, "Usage: test_read_bm <namenode> <user_name> <iteration_number>\n");
        exit(0);
    }

    hdfsFS fs = hdfsConnectAsUser(argv[1], 50070, argv[2]);

    /* printf("File is null: %d\n", file == NULL ? 1 : 0); */

    char *buf = (char *) malloc(sizeof(unsigned char) * SIZE);

    printf("Read size: %d\n", READ_SIZE);

    int iterations = atoi(argv[3]);

    if (iterations <= DISCARD_COUNT) {
        printf("Iterations should be at least %d\n", DISCARD_COUNT + 1);
        exit(0);
    }

    printf("Running %d iterations\n", iterations);
    float time_total;
    float max = 0.f;
    float min = 999999999999999.f;

    printf("Start...\n");
    int i;
    for (i=0; i<iterations; ++i) {
        long start = get_time();
        hdfsFile file = hdfsOpenFile(fs, "/tmp/512_mb.txt", O_RDONLY, 0, 0, 0);
        int n = 0;

        while (n < SIZE) {
            int nread = hdfsRead(fs, file, buf + n, READ_SIZE);
            if (nread <= 0) {
                printf("EOF before finished, read %d bytes\n", n);
                hdfsDisconnect(fs);
                return 0;
            }
            n += nread;
            printf("Read %d kilobytes\n", nread / 1024);
        }

        long end = get_time();
        printf("Read %d bytes, hoping for %d.\n", n, SIZE);
        long elapsed = (end - start);
        printf("Start: %lu, end: %lu\n", start, end);
        float time = elapsed / (1000000000.0f);
        printf ("Took %2.6fs\n", time);
        printf("Throughput: %2.2fMB/s\n", SIZE * 1.0f / (1024 * 1024 * time));
        if (i >= DISCARD_COUNT) {
            time_total += time;
            if (time < min) {
                min = time;
            }
            if (time > max) {
                max = time;
            }
        }
    }
    hdfsDisconnect(fs);
    printf("------\n");
    printf("Average time: %2.2fs\n", time_total / (iterations - DISCARD_COUNT));
    printf("Max. time: %2.2f, min. time: %2.2f\n", max, min);
    float maxt = SIZE * 1.f / (1024 * 1024 * max);
    float mint = SIZE * 1.f / (1024 * 1024 * min);
    printf("Average throughput: %2.2fMB/s\n", 1.f * SIZE * (iterations - DISCARD_COUNT) / (1024 * 1024 * time_total));
    printf("Max. throughput: %2.2f, min. throughput: %2.2f\n", maxt, mint);

    // printf("File contents: %d\n", buf[0]);
    return 0;
}
@@ -24,10 +24,15 @@
#include <jni.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define MINIDFS_CLUSTER_BUILDER "org/apache/hadoop/hdfs/MiniDFSCluster$Builder"
#define MINIDFS_CLUSTER "org/apache/hadoop/hdfs/MiniDFSCluster"
#define HADOOP_CONF "org/apache/hadoop/conf/Configuration"
#define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode"
#define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress"

#define DFS_WEBHDFS_ENABLED_KEY "dfs.webhdfs.enabled"

struct NativeMiniDfsCluster {
    /**

@@ -43,6 +48,7 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
    jvalue val;
    JNIEnv *env = getJNIEnv();
    jthrowable jthr;
    jstring jconfStr;

    if (!env) {
        fprintf(stderr, "nmdCreate: unable to construct JNIEnv.\n");

@@ -59,6 +65,22 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
                              "nmdCreate: new Configuration");
        goto error_free_cl;
    }
    if (conf->webhdfsEnabled) {
        jthr = newJavaStr(env, DFS_WEBHDFS_ENABLED_KEY, &jconfStr);
        if (jthr) {
            printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                                  "nmdCreate: new String");
            goto error_dlr_cobj;
        }
        jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF,
                            "setBoolean", "(Ljava/lang/String;Z)V",
                            jconfStr, conf->webhdfsEnabled);
        if (jthr) {
            printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                                  "nmdCreate: Configuration::setBoolean");
            goto error_dlr_cobj;
        }
    }
    jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER,
                                     "(L"HADOOP_CONF";)V", cobj);
    if (jthr) {

@@ -74,6 +96,16 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
        goto error_dlr_bld;
    }
    bld2 = val.l;
    if (conf->webhdfsEnabled) {
        jthr = invokeMethod(env, &val, INSTANCE, bld2, MINIDFS_CLUSTER_BUILDER,
                            "nameNodeHttpPort", "(I)L" MINIDFS_CLUSTER_BUILDER ";",
                            conf->namenodeHttpPort);
        if (jthr) {
            printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: "
                                  "Builder::nameNodeHttpPort");
            goto error_dlr_bld2;
        }
    }
    jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
                        "build", "()L" MINIDFS_CLUSTER ";");
    if (jthr) {

@@ -91,6 +123,7 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
    (*env)->DeleteLocalRef(env, bld2);
    (*env)->DeleteLocalRef(env, bld);
    (*env)->DeleteLocalRef(env, cobj);
    (*env)->DeleteLocalRef(env, jconfStr);
    return cl;

error_dlr_val:

@@ -101,6 +134,7 @@ error_dlr_bld:
    (*env)->DeleteLocalRef(env, bld);
error_dlr_cobj:
    (*env)->DeleteLocalRef(env, cobj);
    (*env)->DeleteLocalRef(env, jconfStr);
error_free_cl:
    free(cl);
error:

@@ -177,3 +211,69 @@ int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl)
    }
    return jVal.i;
}

int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
                              int *port, const char **hostName)
{
    JNIEnv *env = getJNIEnv();
    jvalue jVal;
    jobject jNameNode, jAddress;
    jthrowable jthr;
    int ret = 0;
    const char *host;

    if (!env) {
        fprintf(stderr, "nmdHdfsConnect: getJNIEnv failed\n");
        return -EIO;
    }
    // First get the (first) NameNode of the cluster
    jthr = invokeMethod(env, &jVal, INSTANCE, cl->obj, MINIDFS_CLUSTER,
                        "getNameNode", "()L" HADOOP_NAMENODE ";");
    if (jthr) {
        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                              "nmdGetNameNodeHttpAddress: "
                              "MiniDFSCluster#getNameNode");
        return -EIO;
    }
    jNameNode = jVal.l;

    // Then get the http address (InetSocketAddress) of the NameNode
    jthr = invokeMethod(env, &jVal, INSTANCE, jNameNode, HADOOP_NAMENODE,
                        "getHttpAddress", "()L" JAVA_INETSOCKETADDRESS ";");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                                    "nmdGetNameNodeHttpAddress: "
                                    "NameNode#getHttpAddress");
        goto error_dlr_nn;
    }
    jAddress = jVal.l;

    jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
                        JAVA_INETSOCKETADDRESS, "getPort", "()I");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                                    "nmdGetNameNodeHttpAddress: "
                                    "InetSocketAddress#getPort");
        goto error_dlr_addr;
    }
    *port = jVal.i;

    jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, JAVA_INETSOCKETADDRESS,
                        "getHostName", "()Ljava/lang/String;");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                                    "nmdGetNameNodeHttpAddress: "
                                    "InetSocketAddress#getHostName");
        goto error_dlr_addr;
    }
    host = (*env)->GetStringUTFChars(env, jVal.l, NULL);
    *hostName = strdup(host);
    (*env)->ReleaseStringUTFChars(env, jVal.l, host);

error_dlr_addr:
    (*env)->DeleteLocalRef(env, jAddress);
error_dlr_nn:
    (*env)->DeleteLocalRef(env, jNameNode);

    return ret;
}
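Note that nmdGetNameNodeHttpAddress() strdup()s the hostname out of the JVM string, so the caller appears to own that memory. A small usage sketch under that assumption (the function name printHttpAddress is hypothetical):

    #include <stdio.h>
    #include <stdlib.h>
    #include "native_mini_dfs.h"

    static int printHttpAddress(struct NativeMiniDfsCluster *cluster)
    {
        int port = 0;
        const char *host = NULL;
        if (nmdGetNameNodeHttpAddress(cluster, &port, &host)) {
            fprintf(stderr, "failed to get NameNode http address\n");
            return 1;
        }
        printf("NameNode http endpoint: %s:%d\n", host, port);
        free((void *) host);  /* hostName was strdup()'d inside the call */
        return 0;
    }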
@@ -31,6 +31,14 @@ struct NativeMiniDfsConf {
     * Nonzero if the cluster should be formatted prior to startup
     */
    jboolean doFormat;
    /**
     * Whether or not to enable webhdfs in MiniDfsCluster
     */
    jboolean webhdfsEnabled;
    /**
     * The http port of the namenode in MiniDfsCluster
     */
    jint namenodeHttpPort;
};

/**

@@ -76,5 +84,21 @@ void nmdFree(struct NativeMiniDfsCluster* cl);
 *
 * @return the port, or a negative error code
 */
int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);

/**
 * Get the http address that's in use by the given (non-HA) nativeMiniDfs
 *
 * @param cl        The initialized NativeMiniDfsCluster
 * @param port      Used to capture the http port of the NameNode
 *                  of the NativeMiniDfsCluster
 * @param hostName  Used to capture the http hostname of the NameNode
 *                  of the NativeMiniDfsCluster
 *
 * @return 0 on success; a non-zero error code if failing to
 *         get the information.
 */
int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
                              int *port, const char **hostName);

#endif