Merge r1403306 through r1404284 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1404285 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-10-31 18:34:51 +00:00
commit e341184d54
55 changed files with 1139 additions and 755 deletions

View File

@ -1106,6 +1106,8 @@ Release 0.23.5 - UNRELEASED
HADOOP-8962. RawLocalFileSystem.listStatus fails when a child filename HADOOP-8962. RawLocalFileSystem.listStatus fails when a child filename
contains a colon (jlowe via bobby) contains a colon (jlowe via bobby)
HADOOP-8986. Server$Call object is never released after it is sent (bobby)
Release 0.23.4 - UNRELEASED Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -75,6 +75,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonGenerator;
@ -2002,13 +2003,16 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
continue; continue;
Element field = (Element)fieldNode; Element field = (Element)fieldNode;
if ("name".equals(field.getTagName()) && field.hasChildNodes()) if ("name".equals(field.getTagName()) && field.hasChildNodes())
attr = ((Text)field.getFirstChild()).getData().trim(); attr = StringInterner.weakIntern(
((Text)field.getFirstChild()).getData().trim());
if ("value".equals(field.getTagName()) && field.hasChildNodes()) if ("value".equals(field.getTagName()) && field.hasChildNodes())
value = ((Text)field.getFirstChild()).getData(); value = StringInterner.weakIntern(
((Text)field.getFirstChild()).getData());
if ("final".equals(field.getTagName()) && field.hasChildNodes()) if ("final".equals(field.getTagName()) && field.hasChildNodes())
finalParameter = "true".equals(((Text)field.getFirstChild()).getData()); finalParameter = "true".equals(((Text)field.getFirstChild()).getData());
if ("source".equals(field.getTagName()) && field.hasChildNodes()) if ("source".equals(field.getTagName()) && field.hasChildNodes())
source.add(((Text)field.getFirstChild()).getData()); source.add(StringInterner.weakIntern(
((Text)field.getFirstChild()).getData()));
} }
source.add(name); source.add(name);

View File

@ -974,6 +974,8 @@ public abstract class Server {
return true; return true;
} }
if (!call.rpcResponse.hasRemaining()) { if (!call.rpcResponse.hasRemaining()) {
//Clear out the response buffer so it can be collected
call.rpcResponse = null;
call.connection.decRpcCount(); call.connection.decRpcCount();
if (numElements == 1) { // last call fully processes. if (numElements == 1) { // last call fully processes.
done = true; // no more data for this channel. done = true; // no more data for this channel.

View File

@ -59,6 +59,9 @@ public class StringInterner {
* @return strong reference to interned string instance * @return strong reference to interned string instance
*/ */
public static String strongIntern(String sample) { public static String strongIntern(String sample) {
if (sample == null) {
return null;
}
return strongInterner.intern(sample); return strongInterner.intern(sample);
} }
@ -72,6 +75,9 @@ public class StringInterner {
* @return weak reference to interned string instance * @return weak reference to interned string instance
*/ */
public static String weakIntern(String sample) { public static String weakIntern(String sample) {
if (sample == null) {
return null;
}
return weakInterner.intern(sample); return weakInterner.intern(sample);
} }

View File

@ -103,18 +103,11 @@ Trunk (Unreleased)
HDFS-3510. Editlog pre-allocation is performed prior to writing edits HDFS-3510. Editlog pre-allocation is performed prior to writing edits
to avoid partial edits case disk out of space.(Colin McCabe via suresh) to avoid partial edits case disk out of space.(Colin McCabe via suresh)
HDFS-3573. Supply NamespaceInfo when instantiating JournalManagers (todd)
HDFS-3630 Modify TestPersistBlocks to use both flush and hflush (sanjay) HDFS-3630 Modify TestPersistBlocks to use both flush and hflush (sanjay)
HDFS-3768. Exception in TestJettyHelper is incorrect. HDFS-3768. Exception in TestJettyHelper is incorrect.
(Eli Reisman via jghoman) (Eli Reisman via jghoman)
HDFS-3695. Genericize format() to non-file JournalManagers. (todd)
HDFS-3789. JournalManager#format() should be able to throw IOException
(Ivan Kelly via todd)
HDFS-3723. Add support -h, -help to all the commands. (Jing Zhao via HDFS-3723. Add support -h, -help to all the commands. (Jing Zhao via
suresh) suresh)
@ -152,12 +145,12 @@ Trunk (Unreleased)
HDFS-4110. Refine a log printed in JNStorage. (Liang Xie via suresh) HDFS-4110. Refine a log printed in JNStorage. (Liang Xie via suresh)
HDFS-4122. Cleanup HDFS logs and reduce the size of logged messages.
(suresh)
HDFS-4124. Refactor INodeDirectory#getExistingPathINodes() to enable HDFS-4124. Refactor INodeDirectory#getExistingPathINodes() to enable
returningmore than INode array. (Jing Zhao via suresh) returningmore than INode array. (Jing Zhao via suresh)
HDFS-4129. Add utility methods to dump NameNode in memory tree for
testing. (szetszwo via suresh)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES
@ -356,6 +349,9 @@ Release 2.0.3-alpha - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
HDFS-4122. Cleanup HDFS logs and reduce the size of logged messages.
(suresh)
NEW FEATURES NEW FEATURES
HDFS-2656. Add libwebhdfs, a pure C client based on WebHDFS. HDFS-2656. Add libwebhdfs, a pure C client based on WebHDFS.
@ -440,6 +436,15 @@ Release 2.0.3-alpha - Unreleased
HDFS-4121. Add namespace declarations in hdfs .proto files for languages HDFS-4121. Add namespace declarations in hdfs .proto files for languages
other than java. (Binglin Chang via suresh) other than java. (Binglin Chang via suresh)
HDFS-3573. Supply NamespaceInfo when instantiating JournalManagers (todd)
HDFS-3695. Genericize format() to non-file JournalManagers. (todd)
HDFS-3789. JournalManager#format() should be able to throw IOException
(Ivan Kelly via todd)
HDFS-3916. libwebhdfs testing code cleanup. (Jing Zhao via suresh)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES
@ -522,6 +527,15 @@ Release 2.0.3-alpha - Unreleased
HDFS-3616. Fix a ConcurrentModificationException bug that BP actor threads HDFS-3616. Fix a ConcurrentModificationException bug that BP actor threads
may not be shutdown properly in DataNode. (Jing Zhao via szetszwo) may not be shutdown properly in DataNode. (Jing Zhao via szetszwo)
HDFS-4127. Log message is not correct in case of short of replica.
(Junping Du via suresh)
HADOOP-8994. TestDFSShell creates file named "noFileHere", making further
tests hard to understand (Andy Isaacson via daryn)
HDFS-3809. Make BKJM use protobufs for all serialization with ZK.
(Ivan Kelly via umamahesh)
Release 2.0.2-alpha - 2012-09-07 Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -1048,8 +1062,6 @@ Release 2.0.2-alpha - 2012-09-07
HDFS-3828. Block Scanner rescans blocks too frequently. HDFS-3828. Block Scanner rescans blocks too frequently.
(Andy Isaacson via eli) (Andy Isaacson via eli)
HDFS-3809. Make BKJM use protobufs for all serialization with ZK.(Ivan Kelly via umamahesh)
HDFS-3895. hadoop-client must include commons-cli (tucu) HDFS-3895. hadoop-client must include commons-cli (tucu)
HDFS-2757. Cannot read a local block that's being written to when HDFS-2757. Cannot read a local block that's being written to when

View File

@ -1,180 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "expect.h"
#include "hdfs.h"
#include <errno.h>
#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "hdfs_http_client.h"
#include "hdfs_http_query.h"
#include "hdfs_json_parser.h"
#include <unistd.h>
#include <curl/curl.h>
#define TLH_MAX_THREADS 100
static sem_t *tlhSem;
static const char *nn;
static const char *user;
static int port;
static const char *fileName = "/tmp/tlhData";
struct tlhThreadInfo {
/** Thread index */
int threadIdx;
/** 0 = thread was successful; error code otherwise */
int success;
/** pthread identifier */
pthread_t thread;
};
static int hdfsSingleNameNodeConnect(const char *nn, int port, const char *user, hdfsFS *fs)
{
hdfsFS hdfs;
if (port < 0) {
fprintf(stderr, "hdfsSingleNameNodeConnect: nmdGetNameNodePort "
"returned error %d\n", port);
return port;
}
hdfs = hdfsConnectAsUserNewInstance(nn, port, user);
if (!hdfs) {
return -errno;
}
*fs = hdfs;
return 0;
}
static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
{
hdfsFile file;
int ret = 0;
char buffer[1024 * (ti->threadIdx + 1)];
memset(buffer, 'a', sizeof(buffer));
file = hdfsOpenFile(fs, "/tmp/thread_test.txt", O_WRONLY, 0, 0, 0);
sleep(1);
hdfsCloseFile(fs, file);
return ret;
}
static void *testHdfsOperations(void *v)
{
struct tlhThreadInfo *ti = (struct tlhThreadInfo*)v;
hdfsFS fs = NULL;
int ret;
fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n",
ti->threadIdx);
ret = hdfsSingleNameNodeConnect(nn, port, user, &fs);
if (ret) {
fprintf(stderr, "testHdfsOperations(threadIdx=%d): "
"hdfsSingleNameNodeConnect failed with error %d.\n",
ti->threadIdx, ret);
ti->success = EIO;
return NULL;
}
ti->success = doTestHdfsOperations(ti, fs);
if (hdfsDisconnect(fs)) {
ret = errno;
fprintf(stderr, "hdfsDisconnect error %d\n", ret);
ti->success = ret;
}
return NULL;
}
static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads)
{
int i, threadsFailed = 0;
const char *sep = "";
for (i = 0; i < tlhNumThreads; i++) {
if (ti[i].success != 0) {
threadsFailed = 1;
}
}
if (!threadsFailed) {
fprintf(stderr, "testLibHdfs: all threads succeeded. SUCCESS.\n");
return EXIT_SUCCESS;
}
fprintf(stderr, "testLibHdfs: some threads failed: [");
for (i = 0; i < tlhNumThreads; i++) {
if (ti[i].success != 0) {
fprintf(stderr, "%s%d", sep, i);
sep = ", ";
}
}
fprintf(stderr, "]. FAILURE.\n");
return EXIT_FAILURE;
}
/**
* Test that we can write a file with libhdfs and then read it back
*/
int main(int argc, const char *args[])
{
if (argc != 4) {
fprintf(stderr, "usage: test_libhdfs_threaded <namenode> <port> <username>");
return -1;
}
nn = args[1];
port = atoi(args[2]);
user = args[3];
int i, tlhNumThreads;
const char *tlhNumThreadsStr;
struct tlhThreadInfo ti[TLH_MAX_THREADS];
tlhNumThreadsStr = getenv("TLH_NUM_THREADS");
if (!tlhNumThreadsStr) {
tlhNumThreadsStr = "3";
}
tlhNumThreads = atoi(tlhNumThreadsStr);
if ((tlhNumThreads <= 0) || (tlhNumThreads > TLH_MAX_THREADS)) {
fprintf(stderr, "testLibHdfs: must have a number of threads "
"between 1 and %d inclusive, not %d\n",
TLH_MAX_THREADS, tlhNumThreads);
return EXIT_FAILURE;
}
memset(&ti[0], 0, sizeof(ti));
for (i = 0; i < tlhNumThreads; i++) {
ti[i].threadIdx = i;
}
tlhSem = sem_open("sem", O_CREAT, 0644, tlhNumThreads);
for (i = 0; i < tlhNumThreads; i++) {
fprintf(stderr, "\ncreating thread %d\n", i);
EXPECT_ZERO(pthread_create(&ti[i].thread, NULL,
testHdfsOperations, &ti[i]));
}
for (i = 0; i < tlhNumThreads; i++) {
EXPECT_ZERO(pthread_join(ti[i].thread, NULL));
}
EXPECT_ZERO(sem_close(tlhSem));
return checkFailures(ti, tlhNumThreads);
}

View File

@ -17,6 +17,7 @@
*/ */
#include "hdfs.h" #include "hdfs.h"
#include "native_mini_dfs.h"
#include <inttypes.h> #include <inttypes.h>
#include <jni.h> #include <jni.h>
@ -26,228 +27,254 @@
#include <time.h> #include <time.h>
#include <unistd.h> #include <unistd.h>
void permission_disp(short permissions, char *rtr) { static struct NativeMiniDfsCluster *cluster;
void permission_disp(short permissions, char *rtr)
{
rtr[9] = '\0'; rtr[9] = '\0';
int i; int i;
short perm;
for(i = 2; i >= 0; i--) for(i = 2; i >= 0; i--)
{ {
short permissionsId = permissions >> (i * 3) & (short)7; perm = permissions >> (i * 3);
char* perm; rtr[0] = perm & 4 ? 'r' : '-';
switch(permissionsId) { rtr[1] = perm & 2 ? 'w' : '-';
case 7: rtr[2] = perm & 1 ? 'x' : '-';
perm = "rwx"; break;
case 6:
perm = "rw-"; break;
case 5:
perm = "r-x"; break;
case 4:
perm = "r--"; break;
case 3:
perm = "-wx"; break;
case 2:
perm = "-w-"; break;
case 1:
perm = "--x"; break;
case 0:
perm = "---"; break;
default:
perm = "???";
}
strncpy(rtr, perm, 3);
rtr += 3; rtr += 3;
} }
} }
int main(int argc, char **argv) { int main(int argc, char **argv)
if (argc != 2) { {
fprintf(stderr, "usage: test_libwebhdfs_ops <username>\n");
return -1;
}
char buffer[32]; char buffer[32];
tSize num_written_bytes; tSize num_written_bytes;
const char* slashTmp = "/tmp";
int nnPort;
char *rwTemplate, *rwTemplate2, *newDirTemplate,
*appendTemplate, *userTemplate, *rwPath = NULL;
const char* fileContents = "Hello, World!";
const char* nnHost = NULL;
hdfsFS fs = hdfsConnectAsUserNewInstance("default", 50070, argv[1]); if (argc != 2) {
fprintf(stderr, "usage: test_libwebhdfs_ops <username>\n");
exit(1);
}
struct NativeMiniDfsConf conf = {
.doFormat = 1, .webhdfsEnabled = 1, .namenodeHttpPort = 50070,
};
cluster = nmdCreate(&conf);
if (!cluster) {
fprintf(stderr, "Failed to create the NativeMiniDfsCluster.\n");
exit(1);
}
if (nmdWaitClusterUp(cluster)) {
fprintf(stderr, "Error when waiting for cluster to be ready.\n");
exit(1);
}
if (nmdGetNameNodeHttpAddress(cluster, &nnPort, &nnHost)) {
fprintf(stderr, "Error when retrieving namenode host address.\n");
exit(1);
}
hdfsFS fs = hdfsConnectAsUserNewInstance(nnHost, nnPort, argv[1]);
if(!fs) { if(!fs) {
fprintf(stderr, "Oops! Failed to connect to hdfs!\n"); fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
exit(-1); exit(-1);
} }
const char* writePath = "/tmp/testfile.txt";
const char* fileContents = "Hello, World!";
{ {
// Write tests // Write tests
rwTemplate = strdup("/tmp/helloWorldXXXXXX");
hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0); if (!rwTemplate) {
if(!writeFile) { fprintf(stderr, "Failed to create rwTemplate!\n");
fprintf(stderr, "Failed to open %s for writing!\n", writePath); exit(1);
exit(-1);
} }
fprintf(stderr, "Opened %s for writing successfully...\n", writePath); rwPath = mktemp(rwTemplate);
num_written_bytes = hdfsWrite(fs, writeFile, (void*)fileContents, strlen(fileContents) + 1); // hdfsOpenFile
hdfsFile writeFile = hdfsOpenFile(fs, rwPath,
O_WRONLY|O_CREAT, 0, 0, 0);
if(!writeFile) {
fprintf(stderr, "Failed to open %s for writing!\n", rwPath);
exit(1);
}
fprintf(stderr, "Opened %s for writing successfully...\n", rwPath);
// hdfsWrite
num_written_bytes = hdfsWrite(fs, writeFile, (void*)fileContents,
(int) strlen(fileContents) + 1);
if (num_written_bytes != strlen(fileContents) + 1) { if (num_written_bytes != strlen(fileContents) + 1) {
fprintf(stderr, "Failed to write correct number of bytes - expected %d, got %d\n", fprintf(stderr, "Failed to write correct number of bytes - "
"expected %d, got %d\n",
(int)(strlen(fileContents) + 1), (int) num_written_bytes); (int)(strlen(fileContents) + 1), (int) num_written_bytes);
exit(-1); exit(1);
} }
fprintf(stderr, "Wrote %d bytes\n", num_written_bytes); fprintf(stderr, "Wrote %d bytes\n", num_written_bytes);
// hdfsTell
tOffset currentPos = -1; tOffset currentPos = -1;
if ((currentPos = hdfsTell(fs, writeFile)) == -1) { if ((currentPos = hdfsTell(fs, writeFile)) == -1) {
fprintf(stderr, fprintf(stderr,
"Failed to get current file position correctly! Got %lld!\n", "Failed to get current file position correctly. Got %"
currentPos); PRId64 "!\n", currentPos);
exit(-1); exit(1);
} }
fprintf(stderr, "Current position: %lld\n", currentPos); fprintf(stderr, "Current position: %" PRId64 "\n", currentPos);
if (hdfsFlush(fs, writeFile)) {
fprintf(stderr, "Failed to 'flush' %s\n", writePath);
exit(-1);
}
fprintf(stderr, "Flushed %s successfully!\n", writePath);
if (hdfsHFlush(fs, writeFile)) {
fprintf(stderr, "Failed to 'hflush' %s\n", writePath);
exit(-1);
}
fprintf(stderr, "HFlushed %s successfully!\n", writePath);
hdfsCloseFile(fs, writeFile); hdfsCloseFile(fs, writeFile);
// Done test write
} }
sleep(1);
{ {
//Read tests //Read tests
sleep(1); int available = 0, exists = 0;
const char* readPath = "/tmp/testfile.txt";
int exists = hdfsExists(fs, readPath);
// hdfsExists
exists = hdfsExists(fs, rwPath);
if (exists) { if (exists) {
fprintf(stderr, "Failed to validate existence of %s\n", readPath); fprintf(stderr, "Failed to validate existence of %s\n", rwPath);
exists = hdfsExists(fs, readPath); exists = hdfsExists(fs, rwPath);
if (exists) { if (exists) {
fprintf(stderr, "Still failed to validate existence of %s\n", readPath); fprintf(stderr,
exit(-1); "Still failed to validate existence of %s\n", rwPath);
exit(1);
} }
} }
hdfsFile readFile = hdfsOpenFile(fs, readPath, O_RDONLY, 0, 0, 0); hdfsFile readFile = hdfsOpenFile(fs, rwPath, O_RDONLY, 0, 0, 0);
if (!readFile) { if (!readFile) {
fprintf(stderr, "Failed to open %s for reading!\n", readPath); fprintf(stderr, "Failed to open %s for reading!\n", rwPath);
exit(-1); exit(1);
} }
if (!hdfsFileIsOpenForRead(readFile)) { if (!hdfsFileIsOpenForRead(readFile)) {
fprintf(stderr, "hdfsFileIsOpenForRead: we just opened a file " fprintf(stderr, "hdfsFileIsOpenForRead: we just opened a file "
"with O_RDONLY, and it did not show up as 'open for " "with O_RDONLY, and it did not show up as 'open for "
"read'\n"); "read'\n");
exit(-1); exit(1);
} }
fprintf(stderr, "hdfsAvailable: %d\n", hdfsAvailable(fs, readFile)); available = hdfsAvailable(fs, readFile);
fprintf(stderr, "hdfsAvailable: %d\n", available);
// hdfsSeek, hdfsTell
tOffset seekPos = 1; tOffset seekPos = 1;
if(hdfsSeek(fs, readFile, seekPos)) { if(hdfsSeek(fs, readFile, seekPos)) {
fprintf(stderr, "Failed to seek %s for reading!\n", readPath); fprintf(stderr, "Failed to seek %s for reading!\n", rwPath);
exit(-1); exit(1);
} }
tOffset currentPos = -1; tOffset currentPos = -1;
if((currentPos = hdfsTell(fs, readFile)) != seekPos) { if((currentPos = hdfsTell(fs, readFile)) != seekPos) {
fprintf(stderr, fprintf(stderr,
"Failed to get current file position correctly! Got %lld!\n", "Failed to get current file position correctly! Got %"
currentPos); PRId64 "!\n", currentPos);
exit(-1);
exit(1);
} }
fprintf(stderr, "Current position: %lld\n", currentPos); fprintf(stderr, "Current position: %" PRId64 "\n", currentPos);
if (!hdfsFileUsesDirectRead(readFile)) {
fprintf(stderr, "Direct read support incorrectly not detected "
"for HDFS filesystem\n");
exit(-1);
}
fprintf(stderr, "Direct read support detected for HDFS\n");
// Test the direct read path
if(hdfsSeek(fs, readFile, 0)) { if(hdfsSeek(fs, readFile, 0)) {
fprintf(stderr, "Failed to seek %s for reading!\n", readPath); fprintf(stderr, "Failed to seek %s for reading!\n", rwPath);
exit(-1); exit(1);
} }
// hdfsRead
memset(buffer, 0, sizeof(buffer)); memset(buffer, 0, sizeof(buffer));
tSize num_read_bytes = hdfsRead(fs, readFile, (void*)buffer, tSize num_read_bytes = hdfsRead(fs, readFile, buffer, sizeof(buffer));
sizeof(buffer));
if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) { if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
fprintf(stderr, "Failed to read (direct). Expected %s but got %s (%d bytes)\n", fprintf(stderr, "Failed to read (direct). "
"Expected %s but got %s (%d bytes)\n",
fileContents, buffer, num_read_bytes); fileContents, buffer, num_read_bytes);
exit(-1); exit(1);
} }
fprintf(stderr, "Read following %d bytes:\n%s\n", fprintf(stderr, "Read following %d bytes:\n%s\n",
num_read_bytes, buffer); num_read_bytes, buffer);
if (hdfsSeek(fs, readFile, 0L)) { if (hdfsSeek(fs, readFile, 0L)) {
fprintf(stderr, "Failed to seek to file start!\n"); fprintf(stderr, "Failed to seek to file start!\n");
exit(-1); exit(1);
} }
// Disable the direct read path so that we really go through the slow // hdfsPread
// read path
hdfsFileDisableDirectRead(readFile);
num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
sizeof(buffer));
fprintf(stderr, "Read following %d bytes:\n%s\n",
num_read_bytes, buffer);
memset(buffer, 0, strlen(fileContents + 1)); memset(buffer, 0, strlen(fileContents + 1));
num_read_bytes = hdfsPread(fs, readFile, 0, buffer, sizeof(buffer));
num_read_bytes = hdfsPread(fs, readFile, 0, (void*)buffer,
sizeof(buffer));
fprintf(stderr, "Read following %d bytes:\n%s\n", fprintf(stderr, "Read following %d bytes:\n%s\n",
num_read_bytes, buffer); num_read_bytes, buffer);
hdfsCloseFile(fs, readFile); hdfsCloseFile(fs, readFile);
// Done test read
} }
int totalResult = 0; int totalResult = 0;
int result = 0; int result = 0;
{ {
//Generic file-system operations //Generic file-system operations
char *srcPath = rwPath;
const char* srcPath = "/tmp/testfile.txt";
const char* dstPath = "/tmp/testfile2.txt";
const char* copyPath = "/tmp/testfile_copy.txt";
const char* movePath = "/tmp/testfile_move.txt";
fprintf(stderr, "hdfsCopy: %s\n", ((result = hdfsCopy(fs, srcPath, fs, copyPath)) ? "Failed!" : "Success!"));
totalResult += result;
fprintf(stderr, "hdfsMove: %s\n", ((result = hdfsMove(fs, copyPath, fs, movePath)) ? "Failed!" : "Success!"));
totalResult += result;
fprintf(stderr, "hdfsGetDefaultBlockSize: %lld\n", hdfsGetDefaultBlockSize(fs));
fprintf(stderr, "hdfsRename: %s\n", ((result = hdfsRename(fs, srcPath, dstPath)) ? "Failed!" : "Success!"));
totalResult += result;
fprintf(stderr, "hdfsRename back: %s\n", ((result = hdfsRename(fs, dstPath, srcPath)) ? "Failed!" : "Success!"));
totalResult += result;
const char* slashTmp = "/tmp";
const char* newDirectory = "/tmp/newdir";
fprintf(stderr, "hdfsCreateDirectory: %s\n", ((result = hdfsCreateDirectory(fs, newDirectory)) ? "Failed!" : "Success!"));
totalResult += result;
fprintf(stderr, "hdfsSetReplication: %s\n", ((result = hdfsSetReplication(fs, srcPath, 1)) ? "Failed!" : "Success!"));
totalResult += result;
char buffer[256]; char buffer[256];
const char *resp; const char *resp;
fprintf(stderr, "hdfsGetWorkingDirectory: %s\n", ((resp = hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer))) ? buffer : "Failed!")); rwTemplate2 = strdup("/tmp/helloWorld2XXXXXX");
totalResult += (resp ? 0 : 1); if (!rwTemplate2) {
fprintf(stderr, "hdfsSetWorkingDirectory: %s\n", ((result = hdfsSetWorkingDirectory(fs, slashTmp)) ? "Failed!" : "Success!")); fprintf(stderr, "Failed to create rwTemplate2!\n");
exit(1);
}
char *dstPath = mktemp(rwTemplate2);
newDirTemplate = strdup("/tmp/newdirXXXXXX");
if (!newDirTemplate) {
fprintf(stderr, "Failed to create newDirTemplate!\n");
exit(1);
}
char *newDirectory = mktemp(newDirTemplate);
// hdfsRename
fprintf(stderr, "hdfsRename: %s\n",
((result = hdfsRename(fs, rwPath, dstPath)) ?
"Failed!" : "Success!"));
totalResult += result; totalResult += result;
fprintf(stderr, "hdfsGetWorkingDirectory: %s\n", ((resp = hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer))) ? buffer : "Failed!")); fprintf(stderr, "hdfsRename back: %s\n",
((result = hdfsRename(fs, dstPath, srcPath)) ?
"Failed!" : "Success!"));
totalResult += result;
// hdfsCreateDirectory
fprintf(stderr, "hdfsCreateDirectory: %s\n",
((result = hdfsCreateDirectory(fs, newDirectory)) ?
"Failed!" : "Success!"));
totalResult += result;
// hdfsSetReplication
fprintf(stderr, "hdfsSetReplication: %s\n",
((result = hdfsSetReplication(fs, srcPath, 1)) ?
"Failed!" : "Success!"));
totalResult += result;
// hdfsGetWorkingDirectory, hdfsSetWorkingDirectory
fprintf(stderr, "hdfsGetWorkingDirectory: %s\n",
((resp = hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer))) ?
buffer : "Failed!"));
totalResult += (resp ? 0 : 1); totalResult += (resp ? 0 : 1);
const char* path[] = {"/foo", "/foo/bar", "foobar", "//foo/bar//foobar",
"foo//bar", "foo/bar///", "/", "////"};
for (int i = 0; i < 8; i++) {
fprintf(stderr, "hdfsSetWorkingDirectory: %s, %s\n",
((result = hdfsSetWorkingDirectory(fs, path[i])) ?
"Failed!" : "Success!"),
hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer)));
totalResult += result;
}
fprintf(stderr, "hdfsSetWorkingDirectory: %s\n",
((result = hdfsSetWorkingDirectory(fs, slashTmp)) ?
"Failed!" : "Success!"));
totalResult += result;
fprintf(stderr, "hdfsGetWorkingDirectory: %s\n",
((resp = hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer))) ?
buffer : "Failed!"));
totalResult += (resp ? 0 : 1);
// hdfsGetPathInfo
hdfsFileInfo *fileInfo = NULL; hdfsFileInfo *fileInfo = NULL;
if((fileInfo = hdfsGetPathInfo(fs, slashTmp)) != NULL) { if((fileInfo = hdfsGetPathInfo(fs, slashTmp)) != NULL) {
fprintf(stderr, "hdfsGetPathInfo - SUCCESS!\n"); fprintf(stderr, "hdfsGetPathInfo - SUCCESS!\n");
@ -261,13 +288,15 @@ int main(int argc, char **argv) {
fprintf(stderr, "Group: %s, ", fileInfo->mGroup); fprintf(stderr, "Group: %s, ", fileInfo->mGroup);
char permissions[10]; char permissions[10];
permission_disp(fileInfo->mPermissions, permissions); permission_disp(fileInfo->mPermissions, permissions);
fprintf(stderr, "Permissions: %d (%s)\n", fileInfo->mPermissions, permissions); fprintf(stderr, "Permissions: %d (%s)\n",
fileInfo->mPermissions, permissions);
hdfsFreeFileInfo(fileInfo, 1); hdfsFreeFileInfo(fileInfo, 1);
} else { } else {
totalResult++; totalResult++;
fprintf(stderr, "waah! hdfsGetPathInfo for %s - FAILED!\n", slashTmp); fprintf(stderr, "hdfsGetPathInfo for %s - FAILED!\n", slashTmp);
} }
// hdfsListDirectory
hdfsFileInfo *fileList = 0; hdfsFileInfo *fileList = 0;
int numEntries = 0; int numEntries = 0;
if((fileList = hdfsListDirectory(fs, slashTmp, &numEntries)) != NULL) { if((fileList = hdfsListDirectory(fs, slashTmp, &numEntries)) != NULL) {
@ -283,7 +312,8 @@ int main(int argc, char **argv) {
fprintf(stderr, "Group: %s, ", fileList[i].mGroup); fprintf(stderr, "Group: %s, ", fileList[i].mGroup);
char permissions[10]; char permissions[10];
permission_disp(fileList[i].mPermissions, permissions); permission_disp(fileList[i].mPermissions, permissions);
fprintf(stderr, "Permissions: %d (%s)\n", fileList[i].mPermissions, permissions); fprintf(stderr, "Permissions: %d (%s)\n",
fileList[i].mPermissions, permissions);
} }
hdfsFreeFileInfo(fileList, numEntries); hdfsFreeFileInfo(fileList, numEntries);
} else { } else {
@ -295,203 +325,220 @@ int main(int argc, char **argv) {
} }
} }
// char*** hosts = hdfsGetHosts(fs, srcPath, 0, 1);
// if(hosts) {
// fprintf(stderr, "hdfsGetHosts - SUCCESS! ... \n");
// int i=0;
// while(hosts[i]) {
// int j = 0;
// while(hosts[i][j]) {
// fprintf(stderr,
// "\thosts[%d][%d] - %s\n", i, j, hosts[i][j]);
// ++j;
// }
// ++i;
// }
// } else {
// totalResult++;
// fprintf(stderr, "waah! hdfsGetHosts - FAILED!\n");
// }
char *newOwner = "root"; char *newOwner = "root";
// setting tmp dir to 777 so later when connectAsUser nobody, we can write to it // Setting tmp dir to 777 so later when connectAsUser nobody,
// we can write to it
short newPerm = 0666; short newPerm = 0666;
// chown write // hdfsChown
fprintf(stderr, "hdfsChown: %s\n", ((result = hdfsChown(fs, writePath, NULL, "users")) ? "Failed!" : "Success!")); fprintf(stderr, "hdfsChown: %s\n",
((result = hdfsChown(fs, rwPath, NULL, "users")) ?
"Failed!" : "Success!"));
totalResult += result; totalResult += result;
fprintf(stderr, "hdfsChown: %s\n", ((result = hdfsChown(fs, writePath, newOwner, NULL)) ? "Failed!" : "Success!")); fprintf(stderr, "hdfsChown: %s\n",
((result = hdfsChown(fs, rwPath, newOwner, NULL)) ?
"Failed!" : "Success!"));
totalResult += result; totalResult += result;
// chmod write // hdfsChmod
fprintf(stderr, "hdfsChmod: %s\n", ((result = hdfsChmod(fs, writePath, newPerm)) ? "Failed!" : "Success!")); fprintf(stderr, "hdfsChmod: %s\n",
((result = hdfsChmod(fs, rwPath, newPerm)) ?
"Failed!" : "Success!"));
totalResult += result; totalResult += result;
sleep(2); sleep(2);
tTime newMtime = time(NULL); tTime newMtime = time(NULL);
tTime newAtime = time(NULL); tTime newAtime = time(NULL);
// utime write // utime write
fprintf(stderr, "hdfsUtime: %s\n", ((result = hdfsUtime(fs, writePath, newMtime, newAtime)) ? "Failed!" : "Success!")); fprintf(stderr, "hdfsUtime: %s\n",
((result = hdfsUtime(fs, rwPath, newMtime, newAtime)) ?
"Failed!" : "Success!"));
totalResult += result; totalResult += result;
// chown/chmod/utime read // chown/chmod/utime read
hdfsFileInfo *finfo = hdfsGetPathInfo(fs, writePath); hdfsFileInfo *finfo = hdfsGetPathInfo(fs, rwPath);
fprintf(stderr, "hdfsChown read: %s\n", ((result = (strcmp(finfo->mOwner, newOwner) != 0)) ? "Failed!" : "Success!")); fprintf(stderr, "hdfsChown read: %s\n",
((result = (strcmp(finfo->mOwner, newOwner) != 0)) ?
"Failed!" : "Success!"));
totalResult += result; totalResult += result;
fprintf(stderr, "hdfsChmod read: %s\n", ((result = (finfo->mPermissions != newPerm)) ? "Failed!" : "Success!")); fprintf(stderr, "hdfsChmod read: %s\n",
((result = (finfo->mPermissions != newPerm)) ?
"Failed!" : "Success!"));
totalResult += result; totalResult += result;
// will later use /tmp/ as a different user so enable it // will later use /tmp/ as a different user so enable it
fprintf(stderr, "hdfsChmod: %s\n", ((result = hdfsChmod(fs, "/tmp/", 0777)) ? "Failed!" : "Success!")); fprintf(stderr, "hdfsChmod: %s\n",
((result = hdfsChmod(fs, slashTmp, 0777)) ?
"Failed!" : "Success!"));
totalResult += result; totalResult += result;
fprintf(stderr,"newMTime=%ld\n",newMtime); fprintf(stderr,"newMTime=%ld\n",newMtime);
fprintf(stderr,"curMTime=%ld\n",finfo->mLastMod); fprintf(stderr,"curMTime=%ld\n",finfo->mLastMod);
fprintf(stderr, "hdfsUtime read (mtime): %s\n", ((result = (finfo->mLastMod != newMtime / 1000)) ? "Failed!" : "Success!")); fprintf(stderr, "hdfsUtime read (mtime): %s\n",
((result = (finfo->mLastMod != newMtime / 1000)) ?
"Failed!" : "Success!"));
totalResult += result; totalResult += result;
hdfsFreeFileInfo(finfo, 1);
// Clean up // Clean up
fprintf(stderr, "hdfsDelete: %s\n", ((result = hdfsDelete(fs, newDirectory, 1)) ? "Failed!" : "Success!")); hdfsFreeFileInfo(finfo, 1);
fprintf(stderr, "hdfsDelete: %s\n",
((result = hdfsDelete(fs, newDirectory, 1)) ?
"Failed!" : "Success!"));
totalResult += result; totalResult += result;
fprintf(stderr, "hdfsDelete: %s\n", ((result = hdfsDelete(fs, srcPath, 1)) ? "Failed!" : "Success!")); fprintf(stderr, "hdfsDelete: %s\n",
((result = hdfsDelete(fs, srcPath, 1)) ?
"Failed!" : "Success!"));
totalResult += result; totalResult += result;
// fprintf(stderr, "hdfsDelete: %s\n", ((result = hdfsDelete(fs, movePath, 1)) ? "Failed!" : "Success!")); fprintf(stderr, "hdfsExists: %s\n",
// totalResult += result; ((result = hdfsExists(fs, newDirectory)) ?
fprintf(stderr, "hdfsExists: %s\n", ((result = hdfsExists(fs, newDirectory)) ? "Success!" : "Failed!")); "Success!" : "Failed!"));
totalResult += (result ? 0 : 1); totalResult += (result ? 0 : 1);
// Done test generic operations
} }
{ {
// TEST APPENDS // Test Appends
const char *writePath = "/tmp/appends"; appendTemplate = strdup("/tmp/appendsXXXXXX");
if (!appendTemplate) {
fprintf(stderr, "Failed to create appendTemplate!\n");
exit(1);
}
char *appendPath = mktemp(appendTemplate);
const char* helloBuffer = "Hello,";
hdfsFile writeFile = NULL;
// CREATE // Create
hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY, 0, 0, 0); writeFile = hdfsOpenFile(fs, appendPath, O_WRONLY, 0, 0, 0);
if(!writeFile) { if(!writeFile) {
fprintf(stderr, "Failed to open %s for writing!\n", writePath); fprintf(stderr, "Failed to open %s for writing!\n", appendPath);
exit(-1); exit(1);
} }
fprintf(stderr, "Opened %s for writing successfully...\n", writePath); fprintf(stderr, "Opened %s for writing successfully...\n", appendPath);
const char* buffer = "Hello,"; num_written_bytes = hdfsWrite(fs, writeFile, helloBuffer,
tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)); (int) strlen(helloBuffer));
fprintf(stderr, "Wrote %d bytes\n", num_written_bytes); fprintf(stderr, "Wrote %d bytes\n", num_written_bytes);
if (hdfsFlush(fs, writeFile)) {
fprintf(stderr, "Failed to 'flush' %s\n", writePath);
exit(-1);
}
fprintf(stderr, "Flushed %s successfully!\n", writePath);
hdfsCloseFile(fs, writeFile); hdfsCloseFile(fs, writeFile);
fprintf(stderr, "hdfsSetReplication: %s\n", ((result = hdfsSetReplication(fs, writePath, 1)) ? "Failed!" : "Success!")); fprintf(stderr, "hdfsSetReplication: %s\n",
((result = hdfsSetReplication(fs, appendPath, 1)) ?
"Failed!" : "Success!"));
totalResult += result; totalResult += result;
// RE-OPEN // Re-Open for Append
writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_APPEND, 0, 0, 0); writeFile = hdfsOpenFile(fs, appendPath, O_WRONLY | O_APPEND, 0, 0, 0);
if(!writeFile) { if(!writeFile) {
fprintf(stderr, "Failed to open %s for writing!\n", writePath); fprintf(stderr, "Failed to open %s for writing!\n", appendPath);
exit(-1); exit(1);
} }
fprintf(stderr, "Opened %s for appending successfully...\n", writePath); fprintf(stderr, "Opened %s for appending successfully...\n",
appendPath);
buffer = " World"; helloBuffer = " World";
num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer) + 1); num_written_bytes = hdfsWrite(fs, writeFile, helloBuffer,
(int)strlen(helloBuffer) + 1);
fprintf(stderr, "Wrote %d bytes\n", num_written_bytes); fprintf(stderr, "Wrote %d bytes\n", num_written_bytes);
if (hdfsFlush(fs, writeFile)) {
fprintf(stderr, "Failed to 'flush' %s\n", writePath);
exit(-1);
}
fprintf(stderr, "Flushed %s successfully!\n", writePath);
hdfsCloseFile(fs, writeFile); hdfsCloseFile(fs, writeFile);
// CHECK size // Check size
hdfsFileInfo *finfo = hdfsGetPathInfo(fs, writePath); hdfsFileInfo *finfo = hdfsGetPathInfo(fs, appendPath);
fprintf(stderr, "fileinfo->mSize: == total %s\n", ((result = (finfo->mSize == strlen("Hello, World") + 1)) ? "Success!" : "Failed!")); fprintf(stderr, "fileinfo->mSize: == total %s\n",
((result = (finfo->mSize == strlen("Hello, World") + 1)) ?
"Success!" : "Failed!"));
totalResult += (result ? 0 : 1); totalResult += (result ? 0 : 1);
// READ and check data // Read and check data
hdfsFile readFile = hdfsOpenFile(fs, writePath, O_RDONLY, 0, 0, 0); hdfsFile readFile = hdfsOpenFile(fs, appendPath, O_RDONLY, 0, 0, 0);
if (!readFile) { if (!readFile) {
fprintf(stderr, "Failed to open %s for reading!\n", writePath); fprintf(stderr, "Failed to open %s for reading!\n", appendPath);
exit(-1); exit(1);
} }
char rdbuffer[32]; tSize num_read_bytes = hdfsRead(fs, readFile, buffer, sizeof(buffer));
tSize num_read_bytes = hdfsRead(fs, readFile, (void*)rdbuffer, sizeof(rdbuffer));
fprintf(stderr, "Read following %d bytes:\n%s\n", fprintf(stderr, "Read following %d bytes:\n%s\n",
num_read_bytes, rdbuffer); num_read_bytes, buffer);
fprintf(stderr, "read == Hello, World %s\n",
fprintf(stderr, "read == Hello, World %s\n", (result = (strcmp(rdbuffer, "Hello, World") == 0)) ? "Success!" : "Failed!"); (result = (strcmp(buffer, "Hello, World") == 0)) ?
"Success!" : "Failed!");
hdfsCloseFile(fs, readFile); hdfsCloseFile(fs, readFile);
// DONE test appends // Cleanup
fprintf(stderr, "hdfsDelete: %s\n",
((result = hdfsDelete(fs, appendPath, 1)) ?
"Failed!" : "Success!"));
totalResult += result;
// Done test appends
} }
totalResult += (hdfsDisconnect(fs) != 0); totalResult += (hdfsDisconnect(fs) != 0);
{ {
// //
// Now test as connecting as a specific user // Now test as connecting as a specific user
// This is only meant to test that we connected as that user, not to test // This only meant to test that we connected as that user, not to test
// the actual fs user capabilities. Thus just create a file and read // the actual fs user capabilities. Thus just create a file and read
// the owner is correct. // the owner is correct.
const char *tuser = "nobody"; const char *tuser = "nobody";
const char* writePath = "/tmp/usertestfile.txt"; userTemplate = strdup("/tmp/usertestXXXXXX");
if (!userTemplate) {
fprintf(stderr, "Failed to create userTemplate!\n");
exit(1);
}
char* userWritePath = mktemp(userTemplate);
hdfsFile writeFile = NULL;
fs = hdfsConnectAsUserNewInstance("default", 50070, tuser); fs = hdfsConnectAsUserNewInstance("default", 50070, tuser);
if(!fs) { if(!fs) {
fprintf(stderr, "Oops! Failed to connect to hdfs as user %s!\n",tuser); fprintf(stderr,
exit(-1); "Oops! Failed to connect to hdfs as user %s!\n",tuser);
exit(1);
} }
hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0); writeFile = hdfsOpenFile(fs, userWritePath, O_WRONLY|O_CREAT, 0, 0, 0);
if(!writeFile) { if(!writeFile) {
fprintf(stderr, "Failed to open %s for writing!\n", writePath); fprintf(stderr, "Failed to open %s for writing!\n", userWritePath);
exit(-1); exit(1);
} }
fprintf(stderr, "Opened %s for writing successfully...\n", writePath); fprintf(stderr, "Opened %s for writing successfully...\n",
userWritePath);
char* buffer = "Hello, World!"; num_written_bytes = hdfsWrite(fs, writeFile, fileContents,
tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)+1); (int)strlen(fileContents) + 1);
fprintf(stderr, "Wrote %d bytes\n", num_written_bytes); fprintf(stderr, "Wrote %d bytes\n", num_written_bytes);
if (hdfsFlush(fs, writeFile)) {
fprintf(stderr, "Failed to 'flush' %s\n", writePath);
exit(-1);
}
fprintf(stderr, "Flushed %s successfully!\n", writePath);
hdfsCloseFile(fs, writeFile); hdfsCloseFile(fs, writeFile);
hdfsFileInfo *finfo = hdfsGetPathInfo(fs, writePath); hdfsFileInfo *finfo = hdfsGetPathInfo(fs, userWritePath);
if (finfo) { if (finfo) {
fprintf(stderr, "hdfs new file user is correct: %s\n", ((result = (strcmp(finfo->mOwner, tuser) != 0)) ? "Failed!" : "Success!")); fprintf(stderr, "hdfs new file user is correct: %s\n",
((result = (strcmp(finfo->mOwner, tuser) != 0)) ?
"Failed!" : "Success!"));
} else { } else {
fprintf(stderr, "hdfsFileInfo returned by hdfsGetPathInfo is NULL\n"); fprintf(stderr,
"hdfsFileInfo returned by hdfsGetPathInfo is NULL\n");
result = -1; result = -1;
} }
totalResult += result; totalResult += result;
// Cleanup
fprintf(stderr, "hdfsDelete: %s\n",
((result = hdfsDelete(fs, userWritePath, 1)) ?
"Failed!" : "Success!"));
totalResult += result;
// Done test specific user
} }
totalResult += (hdfsDisconnect(fs) != 0); totalResult += (hdfsDisconnect(fs) != 0);
fprintf(stderr, "totalResult == %d\n", totalResult);
// Shutdown the native minidfscluster
nmdShutdown(cluster);
nmdFree(cluster);
fprintf(stderr, "totalResult == %d\n", totalResult);
if (totalResult != 0) { if (totalResult != 0) {
return -1; return -1;
} else { } else {

View File

@ -23,42 +23,51 @@
int main(int argc, char **argv) { int main(int argc, char **argv) {
const char* rfile;
tSize fileTotalSize, bufferSize, curSize, totalReadSize;
hdfsFS fs;
hdfsFile readFile;
char *buffer = NULL;
if (argc != 4) { if (argc != 4) {
fprintf(stderr, "Usage: hdfs_read <filename> <filesize> <buffersize>\n"); fprintf(stderr, "Usage: test_libwebhdfs_read"
exit(-1); " <filename> <filesize> <buffersize>\n");
exit(1);
} }
hdfsFS fs = hdfsConnect("0.0.0.0", 50070); fs = hdfsConnect("localhost", 50070);
if (!fs) { if (!fs) {
fprintf(stderr, "Oops! Failed to connect to hdfs!\n"); fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
exit(-1); exit(1);
} }
const char* rfile = argv[1]; rfile = argv[1];
tSize fileTotalSize = strtoul(argv[2], NULL, 10); fileTotalSize = strtoul(argv[2], NULL, 10);
tSize bufferSize = strtoul(argv[3], NULL, 10); bufferSize = strtoul(argv[3], NULL, 10);
hdfsFile readFile = hdfsOpenFile(fs, rfile, O_RDONLY, bufferSize, 0, 0); readFile = hdfsOpenFile(fs, rfile, O_RDONLY, bufferSize, 0, 0);
if (!readFile) { if (!readFile) {
fprintf(stderr, "Failed to open %s for writing!\n", rfile); fprintf(stderr, "Failed to open %s for writing!\n", rfile);
exit(-2); exit(1);
} }
// data to be written to the file // data to be written to the file
char* buffer = malloc(sizeof(char) * bufferSize); buffer = malloc(sizeof(char) * bufferSize);
if(buffer == NULL) { if(buffer == NULL) {
return -2; fprintf(stderr, "Failed to allocate buffer.\n");
exit(1);
} }
// read from the file // read from the file
tSize curSize = bufferSize; curSize = bufferSize;
tSize totalReadSize = 0; totalReadSize = 0;
for (; (curSize = hdfsRead(fs, readFile, (void*)buffer, bufferSize)) == bufferSize ;) { for (; (curSize = hdfsRead(fs, readFile, buffer, bufferSize)) == bufferSize; ) {
totalReadSize += curSize; totalReadSize += curSize;
} }
totalReadSize += curSize; totalReadSize += curSize;
fprintf(stderr, "size of the file: %d; reading size: %d\n", fileTotalSize, totalReadSize); fprintf(stderr, "size of the file: %d; reading size: %d\n",
fileTotalSize, totalReadSize);
free(buffer); free(buffer);
hdfsCloseFile(fs, readFile); hdfsCloseFile(fs, readFile);
@ -67,7 +76,3 @@ int main(int argc, char **argv) {
return 0; return 0;
} }
/**
* vim: ts=4: sw=4: et:
*/

View File

@ -18,6 +18,7 @@
#include "expect.h" #include "expect.h"
#include "hdfs.h" #include "hdfs.h"
#include "native_mini_dfs.h"
#include <errno.h> #include <errno.h>
#include <semaphore.h> #include <semaphore.h>
@ -28,11 +29,9 @@
#define TLH_MAX_THREADS 100 #define TLH_MAX_THREADS 100
static sem_t *tlhSem; static struct NativeMiniDfsCluster* cluster;
static const char *nn;
static const char *user; static const char *user;
static int port;
struct tlhThreadInfo { struct tlhThreadInfo {
/** Thread index */ /** Thread index */
@ -43,19 +42,24 @@ struct tlhThreadInfo {
pthread_t thread; pthread_t thread;
}; };
static int hdfsSingleNameNodeConnect(const char *nn, int port, const char *user, hdfsFS *fs) static int hdfsSingleNameNodeConnect(struct NativeMiniDfsCluster *cluster,
hdfsFS *fs)
{ {
int nnPort;
const char *nnHost;
hdfsFS hdfs; hdfsFS hdfs;
if (port < 0) {
fprintf(stderr, "hdfsSingleNameNodeConnect: nmdGetNameNodePort " if (nmdGetNameNodeHttpAddress(cluster, &nnPort, &nnHost)) {
"returned error %d\n", port); fprintf(stderr, "Error when retrieving namenode host address.\n");
return port; return 1;
} }
hdfs = hdfsConnectAsUserNewInstance(nn, port, user); hdfs = hdfsConnectAsUser(nnHost, nnPort, user);
if(!hdfs) { if(!hdfs) {
return -errno; fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
return 1;
} }
*fs = hdfs; *fs = hdfs;
return 0; return 0;
} }
@ -65,6 +69,7 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
char prefix[256], tmp[256]; char prefix[256], tmp[256];
hdfsFile file; hdfsFile file;
int ret, expected; int ret, expected;
hdfsFileInfo *fileInfo;
snprintf(prefix, sizeof(prefix), "/tlhData%04d", ti->threadIdx); snprintf(prefix, sizeof(prefix), "/tlhData%04d", ti->threadIdx);
@ -74,18 +79,13 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
EXPECT_ZERO(hdfsCreateDirectory(fs, prefix)); EXPECT_ZERO(hdfsCreateDirectory(fs, prefix));
snprintf(tmp, sizeof(tmp), "%s/file", prefix); snprintf(tmp, sizeof(tmp), "%s/file", prefix);
/*
* Although there should not be any file to open for reading,
* the right now implementation only construct a local
* information struct when opening file
*/
EXPECT_NONNULL(hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0)); EXPECT_NONNULL(hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0));
file = hdfsOpenFile(fs, tmp, O_WRONLY, 0, 0, 0); file = hdfsOpenFile(fs, tmp, O_WRONLY, 0, 0, 0);
EXPECT_NONNULL(file); EXPECT_NONNULL(file);
/* TODO: implement writeFully and use it here */ /* TODO: implement writeFully and use it here */
expected = strlen(prefix); expected = (int)strlen(prefix);
ret = hdfsWrite(fs, file, prefix, expected); ret = hdfsWrite(fs, file, prefix, expected);
if (ret < 0) { if (ret < 0) {
ret = errno; ret = errno;
@ -119,8 +119,27 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
EXPECT_ZERO(memcmp(prefix, tmp, expected)); EXPECT_ZERO(memcmp(prefix, tmp, expected));
EXPECT_ZERO(hdfsCloseFile(fs, file)); EXPECT_ZERO(hdfsCloseFile(fs, file));
// TODO: Non-recursive delete should fail? snprintf(tmp, sizeof(tmp), "%s/file", prefix);
//EXPECT_NONZERO(hdfsDelete(fs, prefix, 0)); EXPECT_NONZERO(hdfsChown(fs, tmp, NULL, NULL));
EXPECT_ZERO(hdfsChown(fs, tmp, NULL, "doop"));
fileInfo = hdfsGetPathInfo(fs, tmp);
EXPECT_NONNULL(fileInfo);
EXPECT_ZERO(strcmp("doop", fileInfo->mGroup));
hdfsFreeFileInfo(fileInfo, 1);
EXPECT_ZERO(hdfsChown(fs, tmp, "ha", "doop2"));
fileInfo = hdfsGetPathInfo(fs, tmp);
EXPECT_NONNULL(fileInfo);
EXPECT_ZERO(strcmp("ha", fileInfo->mOwner));
EXPECT_ZERO(strcmp("doop2", fileInfo->mGroup));
hdfsFreeFileInfo(fileInfo, 1);
EXPECT_ZERO(hdfsChown(fs, tmp, "ha2", NULL));
fileInfo = hdfsGetPathInfo(fs, tmp);
EXPECT_NONNULL(fileInfo);
EXPECT_ZERO(strcmp("ha2", fileInfo->mOwner));
EXPECT_ZERO(strcmp("doop2", fileInfo->mGroup));
hdfsFreeFileInfo(fileInfo, 1);
EXPECT_ZERO(hdfsDelete(fs, prefix, 1)); EXPECT_ZERO(hdfsDelete(fs, prefix, 1));
return 0; return 0;
@ -134,7 +153,7 @@ static void *testHdfsOperations(void *v)
fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n", fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n",
ti->threadIdx); ti->threadIdx);
ret = hdfsSingleNameNodeConnect(nn, port, user, &fs); ret = hdfsSingleNameNodeConnect(cluster, &fs);
if (ret) { if (ret) {
fprintf(stderr, "testHdfsOperations(threadIdx=%d): " fprintf(stderr, "testHdfsOperations(threadIdx=%d): "
"hdfsSingleNameNodeConnect failed with error %d.\n", "hdfsSingleNameNodeConnect failed with error %d.\n",
@ -181,19 +200,23 @@ static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads)
*/ */
int main(int argc, const char *args[]) int main(int argc, const char *args[])
{ {
if (argc != 4) {
fprintf(stderr, "usage: test_libhdfs_threaded <namenode> <port> <username>");
return -1;
}
nn = args[1];
port = atoi(args[2]);
user = args[3];
int i, tlhNumThreads; int i, tlhNumThreads;
const char *tlhNumThreadsStr; const char *tlhNumThreadsStr;
struct tlhThreadInfo ti[TLH_MAX_THREADS]; struct tlhThreadInfo ti[TLH_MAX_THREADS];
if (argc != 2) {
fprintf(stderr, "usage: test_libwebhdfs_threaded <username>\n");
exit(1);
}
user = args[1];
struct NativeMiniDfsConf conf = {
.doFormat = 1, .webhdfsEnabled = 1, .namenodeHttpPort = 50070,
};
cluster = nmdCreate(&conf);
EXPECT_NONNULL(cluster);
EXPECT_ZERO(nmdWaitClusterUp(cluster));
tlhNumThreadsStr = getenv("TLH_NUM_THREADS"); tlhNumThreadsStr = getenv("TLH_NUM_THREADS");
if (!tlhNumThreadsStr) { if (!tlhNumThreadsStr) {
tlhNumThreadsStr = "3"; tlhNumThreadsStr = "3";
@ -210,8 +233,6 @@ int main(int argc, const char *args[])
ti[i].threadIdx = i; ti[i].threadIdx = i;
} }
// tlhSem = sem_open("sem", O_CREAT, 0644, tlhNumThreads);
for (i = 0; i < tlhNumThreads; i++) { for (i = 0; i < tlhNumThreads; i++) {
EXPECT_ZERO(pthread_create(&ti[i].thread, NULL, EXPECT_ZERO(pthread_create(&ti[i].thread, NULL,
testHdfsOperations, &ti[i])); testHdfsOperations, &ti[i]));
@ -220,6 +241,7 @@ int main(int argc, const char *args[])
EXPECT_ZERO(pthread_join(ti[i].thread, NULL)); EXPECT_ZERO(pthread_join(ti[i].thread, NULL));
} }
// EXPECT_ZERO(sem_close(tlhSem)); EXPECT_ZERO(nmdShutdown(cluster));
nmdFree(cluster);
return checkFailures(ti, tlhNumThreads); return checkFailures(ti, tlhNumThreads);
} }

View File

@ -22,97 +22,90 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <inttypes.h>
int main(int argc, char **argv) { int main(int argc, char **argv) {
hdfsFS fs;
const char* writeFileName;
off_t fileTotalSize;
long long tmpBufferSize;
tSize bufferSize = 0, totalWriteSize = 0, toWrite = 0, written = 0;
hdfsFile writeFile = NULL;
int append, i = 0;
char* buffer = NULL;
if (argc != 6) { if (argc != 6) {
fprintf(stderr, "Usage: hdfs_write <filename> <filesize> <buffersize> <username> <append>\n"); fprintf(stderr, "Usage: test_libwebhdfs_write <filename> <filesize> "
exit(-1); "<buffersize> <username> <append>\n");
exit(1);
} }
hdfsFS fs = hdfsConnectAsUser("0.0.0.0", 50070, argv[4]); fs = hdfsConnectAsUser("default", 50070, argv[4]);
if (!fs) { if (!fs) {
fprintf(stderr, "Oops! Failed to connect to hdfs!\n"); fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
exit(-1); exit(1);
} }
const char* writeFileName = argv[1]; writeFileName = argv[1];
off_t fileTotalSize = strtoul(argv[2], NULL, 10); fileTotalSize = strtoul(argv[2], NULL, 10);
long long tmpBufferSize = strtoul(argv[3], NULL, 10); tmpBufferSize = strtoul(argv[3], NULL, 10);
// sanity check // sanity check
if(fileTotalSize == ULONG_MAX && errno == ERANGE) { if(fileTotalSize == ULONG_MAX && errno == ERANGE) {
fprintf(stderr, "invalid file size %s - must be <= %lu\n", argv[2], ULONG_MAX); fprintf(stderr, "invalid file size %s - must be <= %lu\n",
exit(-3); argv[2], ULONG_MAX);
exit(1);
} }
// currently libhdfs writes are of tSize which is int32 // currently libhdfs writes are of tSize which is int32
if(tmpBufferSize > INT_MAX) { if(tmpBufferSize > INT_MAX) {
fprintf(stderr, "invalid buffer size libhdfs API write chunks must be <= %d\n",INT_MAX); fprintf(stderr,
exit(-3); "invalid buffer size libhdfs API write chunks must be <= %d\n",
INT_MAX);
exit(1);
} }
tSize bufferSize = tmpBufferSize; bufferSize = (tSize) tmpBufferSize;
append = atoi(argv[5]);
hdfsFile writeFile = NULL;
int append = atoi(argv[5]);
if (!append) { if (!append) {
writeFile = hdfsOpenFile(fs, writeFileName, O_WRONLY, bufferSize, 2, 0); writeFile = hdfsOpenFile(fs, writeFileName, O_WRONLY, bufferSize, 2, 0);
} else { } else {
writeFile = hdfsOpenFile(fs, writeFileName, O_WRONLY | O_APPEND, bufferSize, 2, 0); writeFile = hdfsOpenFile(fs, writeFileName, O_WRONLY | O_APPEND,
bufferSize, 2, 0);
} }
if (!writeFile) { if (!writeFile) {
fprintf(stderr, "Failed to open %s for writing!\n", writeFileName); fprintf(stderr, "Failed to open %s for writing!\n", writeFileName);
exit(-2); exit(1);
} }
// data to be written to the file // data to be written to the file
char* buffer = malloc(sizeof(char) * bufferSize + 1); buffer = malloc(sizeof(char) * bufferSize + 1);
if(buffer == NULL) { if(buffer == NULL) {
fprintf(stderr, "Could not allocate buffer of size %d\n", bufferSize); fprintf(stderr, "Could not allocate buffer of size %d\n", bufferSize);
return -2; exit(1);
} }
int i = 0;
for (i = 0; i < bufferSize; ++i) { for (i = 0; i < bufferSize; ++i) {
buffer[i] = 'a' + (i%26); buffer[i] = 'a' + (i%26);
} }
buffer[bufferSize] = '\0'; buffer[bufferSize] = '\0';
size_t totalWriteSize = 0; // write to the file
totalWriteSize = 0;
for (; totalWriteSize < fileTotalSize; ) { for (; totalWriteSize < fileTotalSize; ) {
tSize toWrite = bufferSize < (fileTotalSize - totalWriteSize) ? bufferSize : (fileTotalSize - totalWriteSize); toWrite = bufferSize < (fileTotalSize - totalWriteSize) ?
size_t written = hdfsWrite(fs, writeFile, (void*)buffer, toWrite); bufferSize : (fileTotalSize - totalWriteSize);
fprintf(stderr, "written size %ld, to write size %d\n", written, toWrite); written = hdfsWrite(fs, writeFile, (void*)buffer, toWrite);
fprintf(stderr, "written size %d, to write size %d\n",
written, toWrite);
totalWriteSize += written; totalWriteSize += written;
//sleep(1);
} }
// cleanup
free(buffer); free(buffer);
hdfsCloseFile(fs, writeFile); hdfsCloseFile(fs, writeFile);
fprintf(stderr, "file total size: %" PRId64 ", total write size: %d\n",
fprintf(stderr, "file total size: %lld, total write size: %ld\n", fileTotalSize, totalWriteSize); fileTotalSize, totalWriteSize);
hdfsFile readFile = hdfsOpenFile(fs, writeFileName, O_RDONLY, 0, 0, 0);
//sleep(1);
fprintf(stderr, "hdfsAvailable: %d\n", hdfsAvailable(fs, readFile));
hdfsFile writeFile2 = hdfsOpenFile(fs, writeFileName, O_WRONLY | O_APPEND, 0, 2, 0);
fprintf(stderr, "Opened %s for writing successfully...\n", writeFileName);
const char *content = "Hello, World!";
size_t num_written_bytes = hdfsWrite(fs, writeFile2, content, strlen(content) + 1);
if (num_written_bytes != strlen(content) + 1) {
fprintf(stderr, "Failed to write correct number of bytes - expected %d, got %d\n",
(int)(strlen(content) + 1), (int)num_written_bytes);
exit(-1);
}
fprintf(stderr, "Wrote %zd bytes\n", num_written_bytes);
hdfsDisconnect(fs); hdfsDisconnect(fs);
return 0; return 0;
} }
/**
* vim: ts=4: sw=4: et:
*/

View File

@ -1,111 +0,0 @@
#include "hdfs.h"
#include <time.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#ifdef __MACH__
#include <mach/clock.h>
#include <mach/mach.h>
#endif
void current_utc_time(struct timespec *ts) {
#ifdef __MACH__ // OS X does not have clock_gettime, use clock_get_time
clock_serv_t cclock;
mach_timespec_t mts;
host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
clock_get_time(cclock, &mts);
mach_port_deallocate(mach_task_self(), cclock);
ts->tv_sec = mts.tv_sec;
ts->tv_nsec = mts.tv_nsec;
#else
clock_gettime(CLOCK_REALTIME, ts);
#endif
}
long get_time() {
struct timespec tp;
current_utc_time(&tp);
return (long)((tp.tv_sec * 1000000000) + tp.tv_nsec);
}
#define SIZE 512*1024*1024
#define READ_SIZE 512*1024*1024
#define DISCARD_COUNT 5
int main(int argc, char** argv) {
if (argc != 4) {
fprintf(stderr, "Usage: test_read_bm <namenode> <user_name> <iteration_number>\n");
exit(0);
}
hdfsFS fs = hdfsConnectAsUser(argv[1], 50070, argv[2]);
/* printf("File is null: %d\n", file == NULL ? 1 : 0); */
char *buf = (char *) malloc(sizeof(unsigned char) * SIZE);
printf("Read size: %d\n", READ_SIZE);
int iterations = atoi(argv[3]);
if (iterations <= DISCARD_COUNT) {
printf("Iterations should be at least %d\n", DISCARD_COUNT + 1);
exit(0);
}
printf("Running %d iterations\n", iterations);
float time_total;
float max = 0.f;
float min = 999999999999999.f;
printf("Start...\n");
int i;
for (i=0; i<iterations; ++i) {
long start = get_time();
hdfsFile file = hdfsOpenFile(fs, "/tmp/512_mb.txt", O_RDONLY, 0, 0, 0);
int n = 0;
while (n < SIZE) {
int nread = hdfsRead(fs, file, buf + n, READ_SIZE);
if (nread <= 0) {
printf("EOF before finished, read %d bytes\n", n);
hdfsDisconnect(fs);
return 0;
}
n += nread;
printf("Read %d kilobytes\n", nread / 1024);
}
long end = get_time();
printf("Read %d bytes, hoping for %d.\n", n, SIZE);
long elapsed = (end - start);
printf("Start: %lu, end: %lu\n", start, end);
float time = elapsed / (1000000000.0f);
printf ("Took %2.6fs\n", time);
printf("Throughput: %2.2fMB/s\n", SIZE * 1.0f / (1024 * 1024 * time));
if (i >= DISCARD_COUNT) {
time_total += time;
if (time < min) {
min = time;
}
if (time > max) {
max = time;
}
}
}
hdfsDisconnect(fs);
printf("------\n");
printf("Average time: %2.2fs\n", time_total / (iterations - DISCARD_COUNT));
printf("Max. time: %2.2f, min. time: %2.2f\n", max, min);
float maxt = SIZE * 1.f / (1024 * 1024 * max);
float mint = SIZE * 1.f / (1024 * 1024 * min);
printf("Average throughput: %2.2fMB/s\n", 1.f * SIZE * (iterations - DISCARD_COUNT) / (1024 * 1024 * time_total));
printf("Max. throughput: %2.2f, min. throughput: %2.2f\n", maxt, mint);
// printf("File contents: %d\n", buf[0]);
return 0;
}

View File

@ -185,7 +185,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) { if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return writer; return writer;
} }
int totalReplicasExpected = numOfReplicas; int totalReplicasExpected = numOfReplicas + results.size();
int numOfResults = results.size(); int numOfResults = results.size();
boolean newBlock = (numOfResults==0); boolean newBlock = (numOfResults==0);
@ -231,7 +231,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
maxNodesPerRack, results, avoidStaleNodes); maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e) { } catch (NotEnoughReplicasException e) {
LOG.warn("Not able to place enough replicas, still in need of " LOG.warn("Not able to place enough replicas, still in need of "
+ numOfReplicas + " to reach " + totalReplicasExpected + "\n" + (totalReplicasExpected - results.size()) + " to reach "
+ totalReplicasExpected + "\n"
+ e.getMessage()); + e.getMessage());
if (avoidStaleNodes) { if (avoidStaleNodes) {
// ecxludedNodes now has - initial excludedNodes, any nodes that were // ecxludedNodes now has - initial excludedNodes, any nodes that were

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -32,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.SignedBytes; import com.google.common.primitives.SignedBytes;
/** /**
@ -225,11 +228,10 @@ public abstract class INode implements Comparable<byte[]> {
abstract DirCounts spaceConsumedInTree(DirCounts counts); abstract DirCounts spaceConsumedInTree(DirCounts counts);
/** /**
* Get local file name * @return null if the local name is null; otherwise, return the local name.
* @return local file name
*/ */
public String getLocalName() { public String getLocalName() {
return DFSUtil.bytes2String(name); return name == null? null: DFSUtil.bytes2String(name);
} }
@ -239,8 +241,8 @@ public abstract class INode implements Comparable<byte[]> {
} }
/** /**
* Get local file name * @return null if the local name is null;
* @return local file name * otherwise, return the local name byte array.
*/ */
byte[] getLocalNameBytes() { byte[] getLocalNameBytes() {
return name; return name;
@ -463,4 +465,30 @@ public abstract class INode implements Comparable<byte[]> {
return new INodeFile(permissions, blocks, replication, return new INodeFile(permissions, blocks, replication,
modificationTime, atime, preferredBlockSize); modificationTime, atime, preferredBlockSize);
} }
/**
* Dump the subtree starting from this inode.
* @return a text representation of the tree.
*/
@VisibleForTesting
public StringBuffer dumpTreeRecursively() {
final StringWriter out = new StringWriter();
dumpTreeRecursively(new PrintWriter(out, true), new StringBuilder());
return out.getBuffer();
}
/**
* Dump tree recursively.
* @param prefix The prefix string that each line should print.
*/
@VisibleForTesting
public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix) {
out.print(prefix);
out.print(" ");
out.print(getLocalName());
out.print(" (");
final String s = super.toString();
out.print(s.substring(s.lastIndexOf(getClass().getSimpleName())));
out.println(")");
}
} }

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -32,6 +33,8 @@ import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshotRoot; import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshotRoot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable; import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Directory INode class. * Directory INode class.
*/ */
@ -568,4 +571,52 @@ public class INodeDirectory extends INode {
return size; return size;
} }
} }
/*
* The following code is to dump the tree recursively for testing.
*
* \- foo (INodeDirectory@33dd2717)
* \- sub1 (INodeDirectory@442172)
* +- file1 (INodeFile@78392d4)
* +- file2 (INodeFile@78392d5)
* +- sub11 (INodeDirectory@8400cff)
* \- file3 (INodeFile@78392d6)
* \- z_file4 (INodeFile@45848712)
*/
static final String DUMPTREE_EXCEPT_LAST_ITEM = "+-";
static final String DUMPTREE_LAST_ITEM = "\\-";
@VisibleForTesting
@Override
public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix) {
super.dumpTreeRecursively(out, prefix);
if (prefix.length() >= 2) {
prefix.setLength(prefix.length() - 2);
prefix.append(" ");
}
dumpTreeRecursively(out, prefix, children);
}
/**
* Dump the given subtrees.
* @param prefix The prefix string that each line should print.
* @param subs The subtrees.
*/
@VisibleForTesting
protected static void dumpTreeRecursively(PrintWriter out,
StringBuilder prefix, List<? extends INode> subs) {
prefix.append(DUMPTREE_EXCEPT_LAST_ITEM);
if (subs != null && subs.size() != 0) {
int i = 0;
for(; i < subs.size() - 1; i++) {
subs.get(i).dumpTreeRecursively(out, prefix);
prefix.setLength(prefix.length() - 2);
prefix.append(DUMPTREE_EXCEPT_LAST_ITEM);
}
prefix.setLength(prefix.length() - 2);
prefix.append(DUMPTREE_LAST_ITEM);
subs.get(i).dumpTreeRecursively(out, prefix);
}
prefix.setLength(prefix.length() - 2);
}
} }

View File

@ -24,10 +24,15 @@
#include <jni.h> #include <jni.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h>
#define MINIDFS_CLUSTER_BUILDER "org/apache/hadoop/hdfs/MiniDFSCluster$Builder" #define MINIDFS_CLUSTER_BUILDER "org/apache/hadoop/hdfs/MiniDFSCluster$Builder"
#define MINIDFS_CLUSTER "org/apache/hadoop/hdfs/MiniDFSCluster" #define MINIDFS_CLUSTER "org/apache/hadoop/hdfs/MiniDFSCluster"
#define HADOOP_CONF "org/apache/hadoop/conf/Configuration" #define HADOOP_CONF "org/apache/hadoop/conf/Configuration"
#define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode"
#define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress"
#define DFS_WEBHDFS_ENABLED_KEY "dfs.webhdfs.enabled"
struct NativeMiniDfsCluster { struct NativeMiniDfsCluster {
/** /**
@ -43,6 +48,7 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
jvalue val; jvalue val;
JNIEnv *env = getJNIEnv(); JNIEnv *env = getJNIEnv();
jthrowable jthr; jthrowable jthr;
jstring jconfStr;
if (!env) { if (!env) {
fprintf(stderr, "nmdCreate: unable to construct JNIEnv.\n"); fprintf(stderr, "nmdCreate: unable to construct JNIEnv.\n");
@ -59,6 +65,22 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
"nmdCreate: new Configuration"); "nmdCreate: new Configuration");
goto error_free_cl; goto error_free_cl;
} }
if (conf->webhdfsEnabled) {
jthr = newJavaStr(env, DFS_WEBHDFS_ENABLED_KEY, &jconfStr);
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"nmdCreate: new String");
goto error_dlr_cobj;
}
jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF,
"setBoolean", "(Ljava/lang/String;Z)V",
jconfStr, conf->webhdfsEnabled);
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"nmdCreate: Configuration::setBoolean");
goto error_dlr_cobj;
}
}
jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER, jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER,
"(L"HADOOP_CONF";)V", cobj); "(L"HADOOP_CONF";)V", cobj);
if (jthr) { if (jthr) {
@ -74,6 +96,16 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
goto error_dlr_bld; goto error_dlr_bld;
} }
bld2 = val.l; bld2 = val.l;
if (conf->webhdfsEnabled) {
jthr = invokeMethod(env, &val, INSTANCE, bld2, MINIDFS_CLUSTER_BUILDER,
"nameNodeHttpPort", "(I)L" MINIDFS_CLUSTER_BUILDER ";",
conf->namenodeHttpPort);
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: "
"Builder::nameNodeHttpPort");
goto error_dlr_bld2;
}
}
jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER, jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
"build", "()L" MINIDFS_CLUSTER ";"); "build", "()L" MINIDFS_CLUSTER ";");
if (jthr) { if (jthr) {
@ -91,6 +123,7 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
(*env)->DeleteLocalRef(env, bld2); (*env)->DeleteLocalRef(env, bld2);
(*env)->DeleteLocalRef(env, bld); (*env)->DeleteLocalRef(env, bld);
(*env)->DeleteLocalRef(env, cobj); (*env)->DeleteLocalRef(env, cobj);
(*env)->DeleteLocalRef(env, jconfStr);
return cl; return cl;
error_dlr_val: error_dlr_val:
@ -101,6 +134,7 @@ error_dlr_bld:
(*env)->DeleteLocalRef(env, bld); (*env)->DeleteLocalRef(env, bld);
error_dlr_cobj: error_dlr_cobj:
(*env)->DeleteLocalRef(env, cobj); (*env)->DeleteLocalRef(env, cobj);
(*env)->DeleteLocalRef(env, jconfStr);
error_free_cl: error_free_cl:
free(cl); free(cl);
error: error:
@ -177,3 +211,69 @@ int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl)
} }
return jVal.i; return jVal.i;
} }
int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
int *port, const char **hostName)
{
JNIEnv *env = getJNIEnv();
jvalue jVal;
jobject jNameNode, jAddress;
jthrowable jthr;
int ret = 0;
const char *host;
if (!env) {
fprintf(stderr, "nmdHdfsConnect: getJNIEnv failed\n");
return -EIO;
}
// First get the (first) NameNode of the cluster
jthr = invokeMethod(env, &jVal, INSTANCE, cl->obj, MINIDFS_CLUSTER,
"getNameNode", "()L" HADOOP_NAMENODE ";");
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"nmdGetNameNodeHttpAddress: "
"MiniDFSCluster#getNameNode");
return -EIO;
}
jNameNode = jVal.l;
// Then get the http address (InetSocketAddress) of the NameNode
jthr = invokeMethod(env, &jVal, INSTANCE, jNameNode, HADOOP_NAMENODE,
"getHttpAddress", "()L" JAVA_INETSOCKETADDRESS ";");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"nmdGetNameNodeHttpAddress: "
"NameNode#getHttpAddress");
goto error_dlr_nn;
}
jAddress = jVal.l;
jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
JAVA_INETSOCKETADDRESS, "getPort", "()I");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"nmdGetNameNodeHttpAddress: "
"InetSocketAddress#getPort");
goto error_dlr_addr;
}
*port = jVal.i;
jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, JAVA_INETSOCKETADDRESS,
"getHostName", "()Ljava/lang/String;");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"nmdGetNameNodeHttpAddress: "
"InetSocketAddress#getHostName");
goto error_dlr_addr;
}
host = (*env)->GetStringUTFChars(env, jVal.l, NULL);
*hostName = strdup(host);
(*env)->ReleaseStringUTFChars(env, jVal.l, host);
error_dlr_addr:
(*env)->DeleteLocalRef(env, jAddress);
error_dlr_nn:
(*env)->DeleteLocalRef(env, jNameNode);
return ret;
}

View File

@ -31,6 +31,14 @@ struct NativeMiniDfsConf {
* Nonzero if the cluster should be formatted prior to startup * Nonzero if the cluster should be formatted prior to startup
*/ */
jboolean doFormat; jboolean doFormat;
/**
* Whether or not to enable webhdfs in MiniDfsCluster
*/
jboolean webhdfsEnabled;
/**
* The http port of the namenode in MiniDfsCluster
*/
jint namenodeHttpPort;
}; };
/** /**
@ -77,4 +85,20 @@ void nmdFree(struct NativeMiniDfsCluster* cl);
* @return the port, or a negative error code * @return the port, or a negative error code
*/ */
int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
/**
* Get the http address that's in use by the given (non-HA) nativeMiniDfs
*
* @param cl The initialized NativeMiniDfsCluster
* @param port Used to capture the http port of the NameNode
* of the NativeMiniDfsCluster
* @param hostName Used to capture the http hostname of the NameNode
* of the NativeMiniDfsCluster
*
* @return 0 on success; a non-zero error code if failing to
* get the information.
*/
int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
int *port, const char **hostName);
#endif #endif

View File

@ -1145,7 +1145,7 @@ public class TestDFSShell {
args = new String[2]; args = new String[2];
args[0] = "-touchz"; args[0] = "-touchz";
args[1] = "/test/mkdirs/noFileHere"; args[1] = "/test/mkdirs/isFileHere";
val = -1; val = -1;
try { try {
val = shell.run(args); val = shell.run(args);
@ -1157,7 +1157,7 @@ public class TestDFSShell {
args = new String[2]; args = new String[2];
args[0] = "-touchz"; args[0] = "-touchz";
args[1] = "/test/mkdirs/thisDirNotExists/noFileHere"; args[1] = "/test/mkdirs/thisDirNotExists/isFileHere";
val = -1; val = -1;
try { try {
val = shell.run(args); val = shell.run(args);
@ -1171,7 +1171,7 @@ public class TestDFSShell {
args = new String[3]; args = new String[3];
args[0] = "-test"; args[0] = "-test";
args[1] = "-e"; args[1] = "-e";
args[2] = "/test/mkdirs/noFileHere"; args[2] = "/test/mkdirs/isFileHere";
val = -1; val = -1;
try { try {
val = shell.run(args); val = shell.run(args);

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
@ -44,6 +45,10 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node; import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -378,6 +383,70 @@ public class TestReplicationPolicy {
assertFalse(cluster.isOnSameRack(targets[0], targets[1])); assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
} }
/**
* In this testcase, it tries to choose more targets than available nodes and
* check the result.
* @throws Exception
*/
@Test
public void testChooseTargetWithMoreThanAvaiableNodes() throws Exception {
// make data node 0 & 1 to be not qualified to choose: not enough disk space
for(int i=0; i<2; i++) {
dataNodes[i].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
}
final TestAppender appender = new TestAppender();
final Logger logger = Logger.getRootLogger();
logger.addAppender(appender);
// try to choose NUM_OF_DATANODES which is more than actually available
// nodes.
DatanodeDescriptor[] targets = replicator.chooseTarget(filename,
NUM_OF_DATANODES, dataNodes[0], new ArrayList<DatanodeDescriptor>(),
BLOCK_SIZE);
assertEquals(targets.length, NUM_OF_DATANODES - 2);
final List<LoggingEvent> log = appender.getLog();
assertNotNull(log);
assertFalse(log.size() == 0);
final LoggingEvent lastLogEntry = log.get(log.size() - 1);
assertEquals(lastLogEntry.getLevel(), Level.WARN);
// Suppose to place replicas on each node but two data nodes are not
// available for placing replica, so here we expect a short of 2
assertTrue(((String)lastLogEntry.getMessage()).contains("in need of 2"));
for(int i=0; i<2; i++) {
dataNodes[i].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
}
}
class TestAppender extends AppenderSkeleton {
private final List<LoggingEvent> log = new ArrayList<LoggingEvent>();
@Override
public boolean requiresLayout() {
return false;
}
@Override
protected void append(final LoggingEvent loggingEvent) {
log.add(loggingEvent);
}
@Override
public void close() {
}
public List<LoggingEvent> getLog() {
return new ArrayList<LoggingEvent>(log);
}
}
private boolean containsWithinRange(DatanodeDescriptor target, private boolean containsWithinRange(DatanodeDescriptor target,
DatanodeDescriptor[] nodes, int startIndex, int endIndex) { DatanodeDescriptor[] nodes, int startIndex, int endIndex) {
assert startIndex >= 0 && startIndex < nodes.length; assert startIndex >= 0 && startIndex < nodes.length;

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.BufferedReader;
import java.io.StringReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Test {@link FSDirectory}, the in-memory namespace tree.
*/
public class TestFSDirectory {
public static final Log LOG = LogFactory.getLog(TestFSDirectory.class);
private static final long seed = 0;
private static final short REPLICATION = 3;
private final Path dir = new Path("/" + getClass().getSimpleName());
private final Path sub1 = new Path(dir, "sub1");
private final Path file1 = new Path(sub1, "file1");
private final Path file2 = new Path(sub1, "file2");
private final Path sub11 = new Path(sub1, "sub11");
private final Path file3 = new Path(sub11, "file3");
private final Path file4 = new Path(sub1, "z_file4");
private final Path file5 = new Path(sub1, "z_file5");
private final Path sub2 = new Path(dir, "sub2");
private Configuration conf;
private MiniDFSCluster cluster;
private FSNamesystem fsn;
private FSDirectory fsdir;
private DistributedFileSystem hdfs;
@Before
public void setUp() throws Exception {
conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(REPLICATION)
.build();
cluster.waitActive();
fsn = cluster.getNamesystem();
fsdir = fsn.getFSDirectory();
hdfs = cluster.getFileSystem();
DFSTestUtil.createFile(hdfs, file1, 1024, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, file2, 1024, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, file3, 1024, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, file5, 1024, REPLICATION, seed);
hdfs.mkdirs(sub2);
}
@After
public void tearDown() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
/** Dump the tree, make some changes, and then dump the tree again. */
@Test
public void testDumpTree() throws Exception {
final INode root = fsdir.getINode("/");
LOG.info("Original tree");
final StringBuffer b1 = root.dumpTreeRecursively();
System.out.println("b1=" + b1);
final BufferedReader in = new BufferedReader(new StringReader(b1.toString()));
String line = in.readLine();
checkClassName(line);
for(; (line = in.readLine()) != null; ) {
line = line.trim();
Assert.assertTrue(line.startsWith(INodeDirectory.DUMPTREE_LAST_ITEM)
|| line.startsWith(INodeDirectory.DUMPTREE_EXCEPT_LAST_ITEM));
checkClassName(line);
}
LOG.info("Create a new file " + file4);
DFSTestUtil.createFile(hdfs, file4, 1024, REPLICATION, seed);
final StringBuffer b2 = root.dumpTreeRecursively();
System.out.println("b2=" + b2);
int i = 0;
int j = b1.length() - 1;
for(; b1.charAt(i) == b2.charAt(i); i++);
int k = b2.length() - 1;
for(; b1.charAt(j) == b2.charAt(k); j--, k--);
final String diff = b2.substring(i, k + 1);
System.out.println("i=" + i + ", j=" + j + ", k=" + k);
System.out.println("diff=" + diff);
Assert.assertTrue(i > j);
Assert.assertTrue(diff.contains(file4.getName()));
}
static void checkClassName(String line) {
int i = line.lastIndexOf('(');
int j = line.lastIndexOf('@');
final String classname = line.substring(i+1, j);
Assert.assertTrue(classname.equals(INodeFile.class.getSimpleName())
|| classname.equals(INodeDirectory.class.getSimpleName())
|| classname.equals(INodeDirectoryWithQuota.class.getSimpleName()));
}
}

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.FSLimitException;
import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException; import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException; import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
@ -60,17 +59,11 @@ public class TestFsLimits {
return fsn; return fsn;
} }
private static class TestFSDirectory extends FSDirectory { private static class MockFSDirectory extends FSDirectory {
public TestFSDirectory() throws IOException { public MockFSDirectory() throws IOException {
super(new FSImage(conf), getMockNamesystem(), conf); super(new FSImage(conf), getMockNamesystem(), conf);
setReady(fsIsReady); setReady(fsIsReady);
} }
@Override
public <T extends INode> void verifyFsLimits(INode[] pathComponents,
int pos, T child) throws FSLimitException {
super.verifyFsLimits(pathComponents, pos, child);
}
} }
@Before @Before
@ -157,7 +150,7 @@ public class TestFsLimits {
private void addChildWithName(String name, Class<?> expected) private void addChildWithName(String name, Class<?> expected)
throws Exception { throws Exception {
// have to create after the caller has had a chance to set conf values // have to create after the caller has had a chance to set conf values
if (fs == null) fs = new TestFSDirectory(); if (fs == null) fs = new MockFSDirectory();
INode child = new INodeDirectory(name, perms); INode child = new INodeDirectory(name, perms);
child.setLocalName(name); child.setLocalName(name);

View File

@ -192,6 +192,8 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4637. Handle TaskAttempt diagnostic updates while in the NEW and MAPREDUCE-4637. Handle TaskAttempt diagnostic updates while in the NEW and
UNASSIGNED states. (Mayank Bansal via sseth) UNASSIGNED states. (Mayank Bansal via sseth)
MAPREDUCE-1806. CombineFileInputFormat does not work with paths not on default FS. (Gera Shegalov via tucu)
Release 2.0.2-alpha - 2012-09-07 Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -579,6 +581,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol
for Job, Task and TaskAttempt. (Siddarth Seth via vinodkv) for Job, Task and TaskAttempt. (Siddarth Seth via vinodkv)
MAPREDUCE-4752. Reduce MR AM memory usage through String Interning (Robert
Evans via tgraves)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider; import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.CompositeService;
@ -280,6 +281,7 @@ public class TaskAttemptListenerImpl extends CompositeService
@Override @Override
public void reportDiagnosticInfo(TaskAttemptID taskAttemptID, String diagnosticInfo) public void reportDiagnosticInfo(TaskAttemptID taskAttemptID, String diagnosticInfo)
throws IOException { throws IOException {
diagnosticInfo = StringInterner.weakIntern(diagnosticInfo);
LOG.info("Diagnostics report from " + taskAttemptID.toString() + ": " LOG.info("Diagnostics report from " + taskAttemptID.toString() + ": "
+ diagnosticInfo); + diagnosticInfo);

View File

@ -105,6 +105,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
@ -940,7 +941,6 @@ public abstract class TaskAttemptImpl implements
Counters counters = reportedStatus.counters; Counters counters = reportedStatus.counters;
if (counters == null) { if (counters == null) {
counters = EMPTY_COUNTERS; counters = EMPTY_COUNTERS;
// counters.groups = new HashMap<String, CounterGroup>();
} }
return counters; return counters;
} finally { } finally {
@ -1262,9 +1262,10 @@ public abstract class TaskAttemptImpl implements
(TaskAttemptContainerAssignedEvent) event; (TaskAttemptContainerAssignedEvent) event;
taskAttempt.containerID = cEvent.getContainer().getId(); taskAttempt.containerID = cEvent.getContainer().getId();
taskAttempt.containerNodeId = cEvent.getContainer().getNodeId(); taskAttempt.containerNodeId = cEvent.getContainer().getNodeId();
taskAttempt.containerMgrAddress = taskAttempt.containerNodeId taskAttempt.containerMgrAddress = StringInterner.weakIntern(
.toString(); taskAttempt.containerNodeId.toString());
taskAttempt.nodeHttpAddress = cEvent.getContainer().getNodeHttpAddress(); taskAttempt.nodeHttpAddress = StringInterner.weakIntern(
cEvent.getContainer().getNodeHttpAddress());
taskAttempt.nodeRackName = RackResolver.resolve( taskAttempt.nodeRackName = RackResolver.resolve(
taskAttempt.containerNodeId.getHost()).getNetworkLocation(); taskAttempt.containerNodeId.getHost()).getNetworkLocation();
taskAttempt.containerToken = cEvent.getContainer().getContainerToken(); taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
@ -1710,7 +1711,6 @@ public abstract class TaskAttemptImpl implements
result.stateString = "NEW"; result.stateString = "NEW";
result.taskState = TaskAttemptState.NEW; result.taskState = TaskAttemptState.NEW;
Counters counters = EMPTY_COUNTERS; Counters counters = EMPTY_COUNTERS;
// counters.groups = new HashMap<String, CounterGroup>();
result.counters = counters; result.counters = counters;
} }

View File

@ -75,6 +75,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
@ -667,9 +668,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
.newRecordInstance(TaskAttemptCompletionEvent.class); .newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(-1); tce.setEventId(-1);
String scheme = (encryptedShuffle) ? "https://" : "http://"; String scheme = (encryptedShuffle) ? "https://" : "http://";
tce.setMapOutputServerAddress(scheme tce.setMapOutputServerAddress(StringInterner.weakIntern(scheme
+ attempt.getNodeHttpAddress().split(":")[0] + ":" + attempt.getNodeHttpAddress().split(":")[0] + ":"
+ attempt.getShufflePort()); + attempt.getShufflePort()));
tce.setStatus(status); tce.setStatus(status);
tce.setAttemptId(attempt.getID()); tce.setAttemptId(attempt.getID());
int runTime = 0; int runTime = 0;

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdate
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
@ -620,7 +621,7 @@ public class RMContainerAllocator extends RMContainerRequestor
eventHandler.handle(new TaskAttemptEvent(attemptID, eventHandler.handle(new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_CONTAINER_COMPLETED)); TaskAttemptEventType.TA_CONTAINER_COMPLETED));
// Send the diagnostics // Send the diagnostics
String diagnostics = cont.getDiagnostics(); String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID, eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
diagnostics)); diagnostics));
} }

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.ClusterMetrics; import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.TaskTrackerInfo; import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus; import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.util.StringInterner;
/** /**
* Status information on the current state of the Map-Reduce cluster. * Status information on the current state of the Map-Reduce cluster.
@ -141,9 +142,9 @@ public class ClusterStatus implements Writable {
@Override @Override
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
trackerName = Text.readString(in); trackerName = StringInterner.weakIntern(Text.readString(in));
reasonForBlackListing = Text.readString(in); reasonForBlackListing = StringInterner.weakIntern(Text.readString(in));
blackListReport = Text.readString(in); blackListReport = StringInterner.weakIntern(Text.readString(in));
} }
@Override @Override
@ -429,7 +430,7 @@ public class ClusterStatus implements Writable {
int numTrackerNames = in.readInt(); int numTrackerNames = in.readInt();
if (numTrackerNames > 0) { if (numTrackerNames > 0) {
for (int i = 0; i < numTrackerNames; i++) { for (int i = 0; i < numTrackerNames; i++) {
String name = Text.readString(in); String name = StringInterner.weakIntern(Text.readString(in));
activeTrackers.add(name); activeTrackers.add(name);
} }
} }

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories; import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory; import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.util.StringInterner;
/************************************************** /**************************************************
* A JobProfile is a MapReduce primitive. Tracks a job, * A JobProfile is a MapReduce primitive. Tracks a job,
@ -176,11 +177,11 @@ public class JobProfile implements Writable {
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
jobid.readFields(in); jobid.readFields(in);
this.jobFile = Text.readString(in); this.jobFile = StringInterner.weakIntern(Text.readString(in));
this.url = Text.readString(in); this.url = StringInterner.weakIntern(Text.readString(in));
this.user = Text.readString(in); this.user = StringInterner.weakIntern(Text.readString(in));
this.name = Text.readString(in); this.name = StringInterner.weakIntern(Text.readString(in));
this.queueName = Text.readString(in); this.queueName = StringInterner.weakIntern(Text.readString(in));
} }
} }

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.QuickSort; import org.apache.hadoop.util.QuickSort;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
/** A Map task. */ /** A Map task. */
@ -343,7 +344,7 @@ class MapTask extends Task {
FileSystem fs = file.getFileSystem(conf); FileSystem fs = file.getFileSystem(conf);
FSDataInputStream inFile = fs.open(file); FSDataInputStream inFile = fs.open(file);
inFile.seek(offset); inFile.seek(offset);
String className = Text.readString(inFile); String className = StringInterner.weakIntern(Text.readString(inFile));
Class<T> cls; Class<T> cls;
try { try {
cls = (Class<T>) conf.getClassByName(className); cls = (Class<T>) conf.getClassByName(className);

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
/** /**
@ -467,7 +468,7 @@ abstract public class Task implements Writable, Configurable {
} }
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
jobFile = Text.readString(in); jobFile = StringInterner.weakIntern(Text.readString(in));
taskId = TaskAttemptID.read(in); taskId = TaskAttemptID.read(in);
partition = in.readInt(); partition = in.readInt();
numSlotsRequired = in.readInt(); numSlotsRequired = in.readInt();
@ -487,7 +488,7 @@ abstract public class Task implements Writable, Configurable {
if (taskCleanup) { if (taskCleanup) {
setPhase(TaskStatus.Phase.CLEANUP); setPhase(TaskStatus.Phase.CLEANUP);
} }
user = Text.readString(in); user = StringInterner.weakIntern(Text.readString(in));
extraData.readFields(in); extraData.readFields(in);
} }

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
/************************************************** /**************************************************
* Describes the current status of a task. This is * Describes the current status of a task. This is
@ -477,8 +478,8 @@ public abstract class TaskStatus implements Writable, Cloneable {
setProgress(in.readFloat()); setProgress(in.readFloat());
this.numSlots = in.readInt(); this.numSlots = in.readInt();
this.runState = WritableUtils.readEnum(in, State.class); this.runState = WritableUtils.readEnum(in, State.class);
setDiagnosticInfo(Text.readString(in)); setDiagnosticInfo(StringInterner.weakIntern(Text.readString(in)));
setStateString(Text.readString(in)); setStateString(StringInterner.weakIntern(Text.readString(in)));
this.phase = WritableUtils.readEnum(in, Phase.class); this.phase = WritableUtils.readEnum(in, Phase.class);
this.startTime = in.readLong(); this.startTime = in.readLong();
this.finishTime = in.readLong(); this.finishTime = in.readLong();

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringInterner;
/** /**
* An {@link InputSplit} that tags another InputSplit with extra data for use * An {@link InputSplit} that tags another InputSplit with extra data for use
@ -114,7 +115,7 @@ class TaggedInputSplit implements Configurable, InputSplit {
} }
private Class<?> readClass(DataInput in) throws IOException { private Class<?> readClass(DataInput in) throws IOException {
String className = Text.readString(in); String className = StringInterner.weakIntern(Text.readString(in));
try { try {
return conf.getClassByName(className); return conf.getClassByName(className);
} catch (ClassNotFoundException e) { } catch (ClassNotFoundException e) {

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory; import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.StringInterner;
/************************************************** /**************************************************
* Describes the current status of a job. * Describes the current status of a job.
@ -456,15 +457,15 @@ public class JobStatus implements Writable, Cloneable {
this.cleanupProgress = in.readFloat(); this.cleanupProgress = in.readFloat();
this.runState = WritableUtils.readEnum(in, State.class); this.runState = WritableUtils.readEnum(in, State.class);
this.startTime = in.readLong(); this.startTime = in.readLong();
this.user = Text.readString(in); this.user = StringInterner.weakIntern(Text.readString(in));
this.priority = WritableUtils.readEnum(in, JobPriority.class); this.priority = WritableUtils.readEnum(in, JobPriority.class);
this.schedulingInfo = Text.readString(in); this.schedulingInfo = StringInterner.weakIntern(Text.readString(in));
this.finishTime = in.readLong(); this.finishTime = in.readLong();
this.isRetired = in.readBoolean(); this.isRetired = in.readBoolean();
this.historyFile = Text.readString(in); this.historyFile = StringInterner.weakIntern(Text.readString(in));
this.jobName = Text.readString(in); this.jobName = StringInterner.weakIntern(Text.readString(in));
this.trackingUrl = Text.readString(in); this.trackingUrl = StringInterner.weakIntern(Text.readString(in));
this.jobFile = Text.readString(in); this.jobFile = StringInterner.weakIntern(Text.readString(in));
this.isUber = in.readBoolean(); this.isUber = in.readBoolean();
// De-serialize the job's ACLs // De-serialize the job's ACLs

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.StringInterner;
/** /**
* Class to encapsulate Queue ACLs for a particular * Class to encapsulate Queue ACLs for a particular
@ -82,7 +83,7 @@ public class QueueAclsInfo implements Writable {
@Override @Override
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
queueName = Text.readString(in); queueName = StringInterner.weakIntern(Text.readString(in));
operations = WritableUtils.readStringArray(in); operations = WritableUtils.readStringArray(in);
} }

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.StringInterner;
/** /**
* Class that contains the information regarding the Job Queues which are * Class that contains the information regarding the Job Queues which are
@ -190,9 +191,9 @@ public class QueueInfo implements Writable {
@Override @Override
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
queueName = Text.readString(in); queueName = StringInterner.weakIntern(Text.readString(in));
queueState = WritableUtils.readEnum(in, QueueState.class); queueState = WritableUtils.readEnum(in, QueueState.class);
schedulingInfo = Text.readString(in); schedulingInfo = StringInterner.weakIntern(Text.readString(in));
int length = in.readInt(); int length = in.readInt();
stats = new JobStatus[length]; stats = new JobStatus[length];
for (int i = 0; i < length; i++) { for (int i = 0; i < length; i++) {

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.TIPStatus; import org.apache.hadoop.mapred.TIPStatus;
import org.apache.hadoop.util.StringInterner;
/** A report on the state of a task. */ /** A report on the state of a task. */
@InterfaceAudience.Private @InterfaceAudience.Private
@ -208,7 +209,7 @@ public class TaskReport implements Writable {
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
this.taskid.readFields(in); this.taskid.readFields(in);
this.progress = in.readFloat(); this.progress = in.readFloat();
this.state = Text.readString(in); this.state = StringInterner.weakIntern(Text.readString(in));
this.startTime = in.readLong(); this.startTime = in.readLong();
this.finishTime = in.readLong(); this.finishTime = in.readLong();

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.util.ResourceBundles; import org.apache.hadoop.mapreduce.util.ResourceBundles;
import org.apache.hadoop.util.StringInterner;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
@ -164,7 +165,7 @@ public abstract class AbstractCounterGroup<T extends Counter>
@Override @Override
public synchronized void readFields(DataInput in) throws IOException { public synchronized void readFields(DataInput in) throws IOException {
displayName = Text.readString(in); displayName = StringInterner.weakIntern(Text.readString(in));
counters.clear(); counters.clear();
int size = WritableUtils.readVInt(in); int size = WritableUtils.readVInt(in);
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.FileSystemCounter; import org.apache.hadoop.mapreduce.FileSystemCounter;
import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.util.StringInterner;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
@ -308,7 +309,8 @@ public abstract class AbstractCounters<C extends Counter,
int numGroups = WritableUtils.readVInt(in); int numGroups = WritableUtils.readVInt(in);
while (numGroups-- > 0) { while (numGroups-- > 0) {
limits.checkGroups(groups.size() + 1); limits.checkGroups(groups.size() + 1);
G group = groupFactory.newGenericGroup(Text.readString(in), null, limits); G group = groupFactory.newGenericGroup(
StringInterner.weakIntern(Text.readString(in)), null, limits);
group.readFields(in); group.readFields(in);
groups.put(group.getName(), group); groups.put(group.getName(), group);
} }

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.StringInterner;
/** /**
* A generic counter implementation * A generic counter implementation
@ -59,8 +60,9 @@ public class GenericCounter extends AbstractCounter {
@Override @Override
public synchronized void readFields(DataInput in) throws IOException { public synchronized void readFields(DataInput in) throws IOException {
name = Text.readString(in); name = StringInterner.weakIntern(Text.readString(in));
displayName = in.readBoolean() ? Text.readString(in) : name; displayName = in.readBoolean() ?
StringInterner.weakIntern(Text.readString(in)) : name;
value = WritableUtils.readVLong(in); value = WritableUtils.readVLong(in);
} }

View File

@ -269,7 +269,7 @@ public class JobHistoryParser {
TaskAttemptInfo attemptInfo = TaskAttemptInfo attemptInfo =
taskInfo.attemptsMap.get(event.getTaskAttemptId()); taskInfo.attemptsMap.get(event.getTaskAttemptId());
attemptInfo.finishTime = event.getFinishTime(); attemptInfo.finishTime = event.getFinishTime();
attemptInfo.error = event.getError(); attemptInfo.error = StringInterner.weakIntern(event.getError());
attemptInfo.status = StringInterner.weakIntern(event.getTaskStatus()); attemptInfo.status = StringInterner.weakIntern(event.getTaskStatus());
attemptInfo.hostname = StringInterner.weakIntern(event.getHostname()); attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
attemptInfo.port = event.getPort(); attemptInfo.port = event.getPort();
@ -326,7 +326,7 @@ public class JobHistoryParser {
TaskInfo taskInfo = info.tasksMap.get(event.getTaskId()); TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
taskInfo.status = TaskStatus.State.FAILED.toString(); taskInfo.status = TaskStatus.State.FAILED.toString();
taskInfo.finishTime = event.getFinishTime(); taskInfo.finishTime = event.getFinishTime();
taskInfo.error = event.getError(); taskInfo.error = StringInterner.weakIntern(event.getError());
taskInfo.failedDueToAttemptId = event.getFailedAttemptID(); taskInfo.failedDueToAttemptId = event.getFailedAttemptID();
info.errorInfo = "Task " + taskInfo.taskId +" failed " + info.errorInfo = "Task " + taskInfo.taskId +" failed " +
taskInfo.attemptsMap.size() + " times "; taskInfo.attemptsMap.size() + " times ";

View File

@ -215,7 +215,8 @@ public abstract class CombineFileInputFormat<K, V>
// times, one time each for each pool in the next loop. // times, one time each for each pool in the next loop.
List<Path> newpaths = new LinkedList<Path>(); List<Path> newpaths = new LinkedList<Path>();
for (int i = 0; i < paths.length; i++) { for (int i = 0; i < paths.length; i++) {
Path p = new Path(paths[i].toUri().getPath()); FileSystem fs = paths[i].getFileSystem(conf);
Path p = fs.makeQualified(paths[i]);
newpaths.add(p); newpaths.add(p);
} }
paths = null; paths = null;

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringInterner;
/** /**
* An {@link InputSplit} that tags another InputSplit with extra data for use * An {@link InputSplit} that tags another InputSplit with extra data for use
@ -128,7 +129,7 @@ class TaggedInputSplit extends InputSplit implements Configurable, Writable {
} }
private Class<?> readClass(DataInput in) throws IOException { private Class<?> readClass(DataInput in) throws IOException {
String className = Text.readString(in); String className = StringInterner.weakIntern(Text.readString(in));
try { try {
return conf.getClassByName(className); return conf.getClassByName(className);
} catch (ClassNotFoundException e) { } catch (ClassNotFoundException e) {

View File

@ -76,6 +76,8 @@ public class TestCombineFileInputFormat extends TestCase {
static final int BLOCKSIZE = 1024; static final int BLOCKSIZE = 1024;
static final byte[] databuf = new byte[BLOCKSIZE]; static final byte[] databuf = new byte[BLOCKSIZE];
private static final String DUMMY_FS_URI = "dummyfs:///";
/** Dummy class to extend CombineFileInputFormat*/ /** Dummy class to extend CombineFileInputFormat*/
private class DummyInputFormat extends CombineFileInputFormat<Text, Text> { private class DummyInputFormat extends CombineFileInputFormat<Text, Text> {
@Override @Override
@ -1145,6 +1147,38 @@ public class TestCombineFileInputFormat extends TestCase {
fileSys.delete(file.getParent(), true); fileSys.delete(file.getParent(), true);
} }
/**
* Test when input files are from non-default file systems
*/
@Test
public void testForNonDefaultFileSystem() throws Throwable {
Configuration conf = new Configuration();
// use a fake file system scheme as default
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DUMMY_FS_URI);
// default fs path
assertEquals(DUMMY_FS_URI, FileSystem.getDefaultUri(conf).toString());
// add a local file
Path localPath = new Path("testFile1");
FileSystem lfs = FileSystem.getLocal(conf);
FSDataOutputStream dos = lfs.create(localPath);
dos.writeChars("Local file for CFIF");
dos.close();
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, lfs.makeQualified(localPath));
DummyInputFormat inFormat = new DummyInputFormat();
List<InputSplit> splits = inFormat.getSplits(job);
assertTrue(splits.size() > 0);
for (InputSplit s : splits) {
CombineFileSplit cfs = (CombineFileSplit)s;
for (Path p : cfs.getPaths()) {
assertEquals(p.toUri().getScheme(), "file");
}
}
}
static class TestFilter implements PathFilter { static class TestFilter implements PathFilter {
private Path p; private Path p;
@ -1156,7 +1190,7 @@ public class TestCombineFileInputFormat extends TestCase {
// returns true if the specified path matches the prefix stored // returns true if the specified path matches the prefix stored
// in this TestFilter. // in this TestFilter.
public boolean accept(Path path) { public boolean accept(Path path) {
if (path.toString().indexOf(p.toString()) == 0) { if (path.toUri().getPath().indexOf(p.toString()) == 0) {
return true; return true;
} }
return false; return false;

View File

@ -187,6 +187,15 @@ Release 0.23.5 - UNRELEASED
YARN-139. Interrupted Exception within AsyncDispatcher leads to user YARN-139. Interrupted Exception within AsyncDispatcher leads to user
confusion. (Vinod Kumar Vavilapalli via jlowe) confusion. (Vinod Kumar Vavilapalli via jlowe)
YARN-165. RM should point tracking URL to RM web page for app when AM fails
(jlowe via bobby)
YARN-159. RM web ui applications page should be sorted to display last app
first (tgraves via bobby)
YARN-166. capacity scheduler doesn't allow capacity < 1.0 (tgraves via
bobby)
Release 0.23.4 - UNRELEASED Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.util;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -109,6 +110,7 @@ public class Apps {
} else { } else {
val = val + SYSTEM_PATH_SEPARATOR + value; val = val + SYSTEM_PATH_SEPARATOR + value;
} }
environment.put(variable, val); environment.put(StringInterner.weakIntern(variable),
StringInterner.weakIntern(val));
} }
} }

View File

@ -531,7 +531,7 @@ public class RMAppImpl implements RMApp {
RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId, RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId,
clientTokenStr, rmContext, scheduler, masterService, clientTokenStr, rmContext, scheduler, masterService,
submissionContext, YarnConfiguration.getProxyHostAndPort(conf)); submissionContext, conf);
attempts.put(appAttemptId, attempt); attempts.put(appAttemptId, attempt);
currentAttempt = attempt; currentAttempt = attempt;
handler.handle( handler.handle(

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
@ -33,6 +35,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@ -45,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -128,7 +132,7 @@ public class RMAppAttemptImpl implements RMAppAttempt {
private FinalApplicationStatus finalStatus = null; private FinalApplicationStatus finalStatus = null;
private final StringBuilder diagnostics = new StringBuilder(); private final StringBuilder diagnostics = new StringBuilder();
private final String proxy; private Configuration conf;
private static final StateMachineFactory<RMAppAttemptImpl, private static final StateMachineFactory<RMAppAttemptImpl,
RMAppAttemptState, RMAppAttemptState,
@ -285,9 +289,9 @@ public class RMAppAttemptImpl implements RMAppAttempt {
String clientToken, RMContext rmContext, YarnScheduler scheduler, String clientToken, RMContext rmContext, YarnScheduler scheduler,
ApplicationMasterService masterService, ApplicationMasterService masterService,
ApplicationSubmissionContext submissionContext, ApplicationSubmissionContext submissionContext,
String proxy) { Configuration conf) {
this.proxy = proxy; this.conf = conf;
this.applicationAttemptId = appAttemptId; this.applicationAttemptId = appAttemptId;
this.rmContext = rmContext; this.rmContext = rmContext;
this.eventHandler = rmContext.getDispatcher().getEventHandler(); this.eventHandler = rmContext.getDispatcher().getEventHandler();
@ -397,6 +401,7 @@ public class RMAppAttemptImpl implements RMAppAttempt {
try { try {
URI trackingUri = trackingUriWithoutScheme == null ? null : URI trackingUri = trackingUriWithoutScheme == null ? null :
ProxyUriUtils.getUriFromAMUrl(trackingUriWithoutScheme); ProxyUriUtils.getUriFromAMUrl(trackingUriWithoutScheme);
String proxy = YarnConfiguration.getProxyHostAndPort(conf);
URI proxyUri = ProxyUriUtils.getUriFromAMUrl(proxy); URI proxyUri = ProxyUriUtils.getUriFromAMUrl(proxy);
URI result = ProxyUriUtils.getProxyUri(trackingUri, proxyUri, URI result = ProxyUriUtils.getProxyUri(trackingUri, proxyUri,
applicationAttemptId.getApplicationId()); applicationAttemptId.getApplicationId());
@ -977,15 +982,13 @@ public class RMAppAttemptImpl implements RMAppAttempt {
" due to: " + containerStatus.getDiagnostics() + "." + " due to: " + containerStatus.getDiagnostics() + "." +
"Failing this attempt."); "Failing this attempt.");
/* // When the AM dies, the trackingUrl is left pointing to the AM's URL,
* In the case when the AM dies, the trackingUrl is left pointing to the AM's // which shows up in the scheduler UI as a broken link. Direct the
* URL, which shows up in the scheduler UI as a broken link. Setting it here // user to the app page on the RM so they can see the status and logs.
* to empty string will prevent any link from being displayed. appAttempt.origTrackingUrl = pjoin(
* NOTE: don't set trackingUrl to 'null'. That will cause null-pointer exceptions YarnConfiguration.getRMWebAppHostAndPort(appAttempt.conf),
* in the generated proto code. "cluster", "app", appAttempt.getAppAttemptId().getApplicationId());
*/ appAttempt.proxiedTrackingUrl = appAttempt.origTrackingUrl;
appAttempt.origTrackingUrl = "";
appAttempt.proxiedTrackingUrl = "";
new FinalTransition(RMAppAttemptState.FAILED).transition( new FinalTransition(RMAppAttemptState.FAILED).transition(
appAttempt, containerFinishedEvent); appAttempt, containerFinishedEvent);

View File

@ -84,7 +84,7 @@ class CSQueueUtils {
if (clusterMemory > 0) { if (clusterMemory > 0) {
queueLimit = clusterMemory * childQueue.getAbsoluteCapacity(); queueLimit = clusterMemory * childQueue.getAbsoluteCapacity();
absoluteUsedCapacity = ((float)usedMemory / (float)clusterMemory); absoluteUsedCapacity = ((float)usedMemory / (float)clusterMemory);
usedCapacity = (usedMemory / queueLimit); usedCapacity = (queueLimit == 0) ? 0 : (usedMemory / queueLimit);
} }
childQueue.setUsedCapacity(usedCapacity); childQueue.setUsedCapacity(usedCapacity);

View File

@ -91,7 +91,7 @@ public class CapacitySchedulerConfiguration extends Configuration {
public static final float UNDEFINED = -1; public static final float UNDEFINED = -1;
@Private @Private
public static final float MINIMUM_CAPACITY_VALUE = 1; public static final float MINIMUM_CAPACITY_VALUE = 0;
@Private @Private
public static final float MAXIMUM_CAPACITY_VALUE = 100; public static final float MAXIMUM_CAPACITY_VALUE = 100;

View File

@ -202,7 +202,9 @@ public class ParentQueue implements CSQueue {
childCapacities += queue.getCapacity(); childCapacities += queue.getCapacity();
} }
float delta = Math.abs(1.0f - childCapacities); // crude way to check float delta = Math.abs(1.0f - childCapacities); // crude way to check
if (delta > PRECISION) { // allow capacities being set to 0, and enforce child 0 if parent is 0
if (((capacity > 0) && (delta > PRECISION)) ||
((capacity == 0) && (childCapacities > 0))) {
throw new IllegalArgumentException("Illegal" + throw new IllegalArgumentException("Illegal" +
" capacity of " + childCapacities + " capacity of " + childCapacities +
" for children of queue " + queueName); " for children of queue " + queueName);

View File

@ -69,7 +69,7 @@ public class RmView extends TwoColumnLayout {
append("null,{sType:'title-numeric', bSearchable:false}, null]"); append("null,{sType:'title-numeric', bSearchable:false}, null]");
// Sort by id upon page load // Sort by id upon page load
init.append(", aaSorting: [[0, 'asc']]"); init.append(", aaSorting: [[0, 'desc']]");
String rows = $("rowlimit"); String rows = $("rowlimit");
int rowLimit = rows.isEmpty() ? MAX_DISPLAY_ROWS : Integer.parseInt(rows); int rowLimit = rows.isEmpty() ? MAX_DISPLAY_ROWS : Integer.parseInt(rows);

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
@ -85,6 +87,8 @@ public class TestRMAppAttemptTransitions {
LogFactory.getLog(TestRMAppAttemptTransitions.class); LogFactory.getLog(TestRMAppAttemptTransitions.class);
private static final String EMPTY_DIAGNOSTICS = ""; private static final String EMPTY_DIAGNOSTICS = "";
private static final String RM_WEBAPP_ADDR =
YarnConfiguration.getRMWebAppHostAndPort(new Configuration());
private RMContext rmContext; private RMContext rmContext;
private YarnScheduler scheduler; private YarnScheduler scheduler;
@ -203,7 +207,7 @@ public class TestRMAppAttemptTransitions {
application = mock(RMApp.class); application = mock(RMApp.class);
applicationAttempt = applicationAttempt =
new RMAppAttemptImpl(applicationAttemptId, null, rmContext, scheduler, new RMAppAttemptImpl(applicationAttemptId, null, rmContext, scheduler,
masterService, submissionContext, null); masterService, submissionContext, new Configuration());
when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt); when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
when(application.getApplicationId()).thenReturn(applicationId); when(application.getApplicationId()).thenReturn(applicationId);
@ -216,6 +220,11 @@ public class TestRMAppAttemptTransitions {
} }
private String getProxyUrl(RMAppAttempt appAttempt) {
return pjoin(RM_WEBAPP_ADDR, "proxy",
appAttempt.getAppAttemptId().getApplicationId(), "");
}
/** /**
* {@link RMAppAttemptState#NEW} * {@link RMAppAttemptState#NEW}
*/ */
@ -373,8 +382,8 @@ public class TestRMAppAttemptTransitions {
assertEquals(host, applicationAttempt.getHost()); assertEquals(host, applicationAttempt.getHost());
assertEquals(rpcPort, applicationAttempt.getRpcPort()); assertEquals(rpcPort, applicationAttempt.getRpcPort());
assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals("null/proxy/"+applicationAttempt.getAppAttemptId(). assertEquals(getProxyUrl(applicationAttempt),
getApplicationId()+"/", applicationAttempt.getTrackingUrl()); applicationAttempt.getTrackingUrl());
// TODO - need to add more checks relevant to this state // TODO - need to add more checks relevant to this state
} }
@ -390,8 +399,8 @@ public class TestRMAppAttemptTransitions {
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());
assertEquals(diagnostics, applicationAttempt.getDiagnostics()); assertEquals(diagnostics, applicationAttempt.getDiagnostics());
assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals("null/proxy/"+applicationAttempt.getAppAttemptId(). assertEquals(getProxyUrl(applicationAttempt),
getApplicationId()+"/", applicationAttempt.getTrackingUrl()); applicationAttempt.getTrackingUrl());
assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(container, applicationAttempt.getMasterContainer());
assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus()); assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
} }
@ -408,8 +417,8 @@ public class TestRMAppAttemptTransitions {
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());
assertEquals(diagnostics, applicationAttempt.getDiagnostics()); assertEquals(diagnostics, applicationAttempt.getDiagnostics());
assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals("null/proxy/"+applicationAttempt.getAppAttemptId(). assertEquals(getProxyUrl(applicationAttempt),
getApplicationId()+"/", applicationAttempt.getTrackingUrl()); applicationAttempt.getTrackingUrl());
assertEquals(finishedContainerCount, applicationAttempt assertEquals(finishedContainerCount, applicationAttempt
.getJustFinishedContainers().size()); .getJustFinishedContainers().size());
assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(container, applicationAttempt.getMasterContainer());
@ -597,6 +606,29 @@ public class TestRMAppAttemptTransitions {
testAppAttemptFailedState(amContainer, diagnostics); testAppAttemptFailedState(amContainer, diagnostics);
} }
@Test
public void testRunningToFailed() {
Container amContainer = allocateApplicationAttempt();
launchApplicationAttempt(amContainer);
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
String containerDiagMsg = "some error";
int exitCode = 123;
ContainerStatus cs = BuilderUtils.newContainerStatus(amContainer.getId(),
ContainerState.COMPLETE, containerDiagMsg, exitCode);
ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
appAttemptId, cs));
assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState());
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertEquals(amContainer, applicationAttempt.getMasterContainer());
assertEquals(0, applicationAttempt.getRanNodes().size());
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
applicationAttempt.getAppAttemptId().getApplicationId());
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
}
@Test @Test
public void testUnregisterToKilledFinishing() { public void testUnregisterToKilledFinishing() {
Container amContainer = allocateApplicationAttempt(); Container amContainer = allocateApplicationAttempt();

View File

@ -66,8 +66,8 @@ public class TestCapacityScheduler {
private static float B_CAPACITY = 89.5f; private static float B_CAPACITY = 89.5f;
private static float A1_CAPACITY = 30; private static float A1_CAPACITY = 30;
private static float A2_CAPACITY = 70; private static float A2_CAPACITY = 70;
private static float B1_CAPACITY = 50; private static float B1_CAPACITY = 79.2f;
private static float B2_CAPACITY = 30; private static float B2_CAPACITY = 0.8f;
private static float B3_CAPACITY = 20; private static float B3_CAPACITY = 20;
private ResourceManager resourceManager = null; private ResourceManager resourceManager = null;

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
@ -495,6 +496,72 @@ public class TestParentQueue {
reset(a); reset(b); reset(c); reset(a); reset(b); reset(c);
} }
@Test (expected=IllegalArgumentException.class)
public void testQueueCapacitySettingChildZero() throws Exception {
// Setup queue configs
setupMultiLevelQueues(csConf);
// set child queues capacity to 0 when parents not 0
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
csConf.setCapacity(Q_B + "." + B1, 0);
csConf.setCapacity(Q_B + "." + B2, 0);
csConf.setCapacity(Q_B + "." + B3, 0);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
CapacityScheduler.queueComparator,
CapacityScheduler.applicationComparator,
TestUtils.spyHook);
}
@Test (expected=IllegalArgumentException.class)
public void testQueueCapacitySettingParentZero() throws Exception {
// Setup queue configs
setupMultiLevelQueues(csConf);
// set parent capacity to 0 when child not 0
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
csConf.setCapacity(Q_B, 0);
final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
csConf.setCapacity(Q_A, 60);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
CapacityScheduler.queueComparator,
CapacityScheduler.applicationComparator,
TestUtils.spyHook);
}
@Test
public void testQueueCapacityZero() throws Exception {
// Setup queue configs
setupMultiLevelQueues(csConf);
// set parent and child capacity to 0
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
csConf.setCapacity(Q_B, 0);
csConf.setCapacity(Q_B + "." + B1, 0);
csConf.setCapacity(Q_B + "." + B2, 0);
csConf.setCapacity(Q_B + "." + B3, 0);
final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
csConf.setCapacity(Q_A, 60);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
try {
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
CapacityScheduler.queueComparator,
CapacityScheduler.applicationComparator,
TestUtils.spyHook);
} catch (IllegalArgumentException e) {
fail("Failed to create queues with 0 capacity: " + e);
}
assertTrue("Failed to create queues with 0 capacity", true);
}
@Test @Test
public void testOffSwitchScheduling() throws Exception { public void testOffSwitchScheduling() throws Exception {
// Setup queue configs // Setup queue configs

View File

@ -144,11 +144,11 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
final String B2 = B + ".b2"; final String B2 = B + ".b2";
final String B3 = B + ".b3"; final String B3 = B + ".b3";
conf.setQueues(B, new String[] { "b1", "b2", "b3" }); conf.setQueues(B, new String[] { "b1", "b2", "b3" });
conf.setCapacity(B1, 50); conf.setCapacity(B1, 60);
conf.setUserLimitFactor(B1, 100.0f); conf.setUserLimitFactor(B1, 100.0f);
conf.setCapacity(B2, 30); conf.setCapacity(B2, 39.5f);
conf.setUserLimitFactor(B2, 100.0f); conf.setUserLimitFactor(B2, 100.0f);
conf.setCapacity(B3, 20); conf.setCapacity(B3, 0.5f);
conf.setUserLimitFactor(B3, 100.0f); conf.setUserLimitFactor(B3, 100.0f);
conf.setQueues(A1, new String[] {"a1a", "a1b"}); conf.setQueues(A1, new String[] {"a1a", "a1b"});