Merge r1555021 through r1557038 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1557039 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
commit
1c7aa44d62
|
@ -412,6 +412,12 @@ Release 2.4.0 - UNRELEASED
|
||||||
HADOOP-10198. DomainSocket: add support for socketpair.
|
HADOOP-10198. DomainSocket: add support for socketpair.
|
||||||
(Colin Patrick McCabe via wang)
|
(Colin Patrick McCabe via wang)
|
||||||
|
|
||||||
|
HADOOP-10208. Remove duplicate initialization in StringUtils.getStringCollection.
|
||||||
|
(Benoy Antony via jing9)
|
||||||
|
|
||||||
|
HADOOP-9420. Add percentile or max metric for rpcQueueTime, processing time.
|
||||||
|
(Liang Xie via wang)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
|
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
|
||||||
|
@ -500,6 +506,11 @@ Release 2.4.0 - UNRELEASED
|
||||||
HADOOP-10147 HDFS-5678 Upgrade to commons-logging 1.1.3 to avoid potential
|
HADOOP-10147 HDFS-5678 Upgrade to commons-logging 1.1.3 to avoid potential
|
||||||
deadlock in MiniDFSCluster (stevel)
|
deadlock in MiniDFSCluster (stevel)
|
||||||
|
|
||||||
|
HADOOP-10207. TestUserGroupInformation#testLogin is flaky (jxiang via cmccabe)
|
||||||
|
|
||||||
|
HADOOP-10214. Fix multithreaded correctness warnings in ActiveStandbyElector
|
||||||
|
(Liang Xie via kasha)
|
||||||
|
|
||||||
Release 2.3.0 - UNRELEASED
|
Release 2.3.0 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -242,4 +242,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
|
||||||
|
|
||||||
public static final String HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS =
|
public static final String HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS =
|
||||||
"hadoop.user.group.metrics.percentiles.intervals";
|
"hadoop.user.group.metrics.percentiles.intervals";
|
||||||
|
|
||||||
|
public static final String RPC_METRICS_QUANTILE_ENABLE =
|
||||||
|
"rpc.metrics.quantile.enable";
|
||||||
|
public static final String RPC_METRICS_PERCENTILES_INTERVALS_KEY =
|
||||||
|
"rpc.metrics.percentiles.intervals";
|
||||||
}
|
}
|
||||||
|
|
|
@ -768,7 +768,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
||||||
}
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public void terminateConnection() {
|
public synchronized void terminateConnection() {
|
||||||
if (zkClient == null) {
|
if (zkClient == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2193,7 +2193,7 @@ public abstract class Server {
|
||||||
listener = new Listener();
|
listener = new Listener();
|
||||||
this.port = listener.getAddress().getPort();
|
this.port = listener.getAddress().getPort();
|
||||||
connectionManager = new ConnectionManager();
|
connectionManager = new ConnectionManager();
|
||||||
this.rpcMetrics = RpcMetrics.create(this);
|
this.rpcMetrics = RpcMetrics.create(this, conf);
|
||||||
this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
|
this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
|
||||||
this.tcpNoDelay = conf.getBoolean(
|
this.tcpNoDelay = conf.getBoolean(
|
||||||
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
|
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
|
||||||
|
|
|
@ -19,14 +19,17 @@ package org.apache.hadoop.ipc.metrics;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -41,26 +44,48 @@ public class RpcMetrics {
|
||||||
final Server server;
|
final Server server;
|
||||||
final MetricsRegistry registry;
|
final MetricsRegistry registry;
|
||||||
final String name;
|
final String name;
|
||||||
|
final boolean rpcQuantileEnable;
|
||||||
|
|
||||||
RpcMetrics(Server server) {
|
RpcMetrics(Server server, Configuration conf) {
|
||||||
String port = String.valueOf(server.getListenerAddress().getPort());
|
String port = String.valueOf(server.getListenerAddress().getPort());
|
||||||
name = "RpcActivityForPort"+ port;
|
name = "RpcActivityForPort" + port;
|
||||||
this.server = server;
|
this.server = server;
|
||||||
registry = new MetricsRegistry("rpc").tag("port", "RPC port", port);
|
registry = new MetricsRegistry("rpc").tag("port", "RPC port", port);
|
||||||
LOG.debug("Initialized "+ registry);
|
int[] intervals = conf.getInts(
|
||||||
|
CommonConfigurationKeys.RPC_METRICS_PERCENTILES_INTERVALS_KEY);
|
||||||
|
rpcQuantileEnable = (intervals.length > 0) && conf.getBoolean(
|
||||||
|
CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE, false);
|
||||||
|
if (rpcQuantileEnable) {
|
||||||
|
rpcQueueTimeMillisQuantiles =
|
||||||
|
new MutableQuantiles[intervals.length];
|
||||||
|
rpcProcessingTimeMillisQuantiles =
|
||||||
|
new MutableQuantiles[intervals.length];
|
||||||
|
for (int i = 0; i < intervals.length; i++) {
|
||||||
|
int interval = intervals[i];
|
||||||
|
rpcQueueTimeMillisQuantiles[i] = registry.newQuantiles("rpcQueueTime"
|
||||||
|
+ interval + "s", "rpc queue time in milli second", "ops",
|
||||||
|
"latency", interval);
|
||||||
|
rpcProcessingTimeMillisQuantiles[i] = registry.newQuantiles(
|
||||||
|
"rpcProcessingTime" + interval + "s",
|
||||||
|
"rpc processing time in milli second", "ops", "latency", interval);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.debug("Initialized " + registry);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String name() { return name; }
|
public String name() { return name; }
|
||||||
|
|
||||||
public static RpcMetrics create(Server server) {
|
public static RpcMetrics create(Server server, Configuration conf) {
|
||||||
RpcMetrics m = new RpcMetrics(server);
|
RpcMetrics m = new RpcMetrics(server, conf);
|
||||||
return DefaultMetricsSystem.instance().register(m.name, null, m);
|
return DefaultMetricsSystem.instance().register(m.name, null, m);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Metric("Number of received bytes") MutableCounterLong receivedBytes;
|
@Metric("Number of received bytes") MutableCounterLong receivedBytes;
|
||||||
@Metric("Number of sent bytes") MutableCounterLong sentBytes;
|
@Metric("Number of sent bytes") MutableCounterLong sentBytes;
|
||||||
@Metric("Queue time") MutableRate rpcQueueTime;
|
@Metric("Queue time") MutableRate rpcQueueTime;
|
||||||
|
MutableQuantiles[] rpcQueueTimeMillisQuantiles;
|
||||||
@Metric("Processsing time") MutableRate rpcProcessingTime;
|
@Metric("Processsing time") MutableRate rpcProcessingTime;
|
||||||
|
MutableQuantiles[] rpcProcessingTimeMillisQuantiles;
|
||||||
@Metric("Number of authentication failures")
|
@Metric("Number of authentication failures")
|
||||||
MutableCounterInt rpcAuthenticationFailures;
|
MutableCounterInt rpcAuthenticationFailures;
|
||||||
@Metric("Number of authentication successes")
|
@Metric("Number of authentication successes")
|
||||||
|
@ -146,6 +171,11 @@ public class RpcMetrics {
|
||||||
//@Override
|
//@Override
|
||||||
public void addRpcQueueTime(int qTime) {
|
public void addRpcQueueTime(int qTime) {
|
||||||
rpcQueueTime.add(qTime);
|
rpcQueueTime.add(qTime);
|
||||||
|
if (rpcQuantileEnable) {
|
||||||
|
for (MutableQuantiles q : rpcQueueTimeMillisQuantiles) {
|
||||||
|
q.add(qTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -155,5 +185,10 @@ public class RpcMetrics {
|
||||||
//@Override
|
//@Override
|
||||||
public void addRpcProcessingTime(int processingTime) {
|
public void addRpcProcessingTime(int processingTime) {
|
||||||
rpcProcessingTime.add(processingTime);
|
rpcProcessingTime.add(processingTime);
|
||||||
|
if (rpcQuantileEnable) {
|
||||||
|
for (MutableQuantiles q : rpcProcessingTimeMillisQuantiles) {
|
||||||
|
q.add(processingTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -343,7 +343,6 @@ public class StringUtils {
|
||||||
if (str == null)
|
if (str == null)
|
||||||
return values;
|
return values;
|
||||||
StringTokenizer tokenizer = new StringTokenizer(str, delim);
|
StringTokenizer tokenizer = new StringTokenizer(str, delim);
|
||||||
values = new ArrayList<String>();
|
|
||||||
while (tokenizer.hasMoreTokens()) {
|
while (tokenizer.hasMoreTokens()) {
|
||||||
values.add(tokenizer.nextToken());
|
values.add(tokenizer.nextToken());
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
|
||||||
|
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotSame;
|
import static org.junit.Assert.assertNotSame;
|
||||||
|
@ -67,6 +68,7 @@ import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||||
import org.apache.hadoop.security.authorize.Service;
|
import org.apache.hadoop.security.authorize.Service;
|
||||||
import org.apache.hadoop.security.token.SecretManager;
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
import org.apache.hadoop.test.MetricsAsserts;
|
||||||
import org.apache.hadoop.test.MockitoUtil;
|
import org.apache.hadoop.test.MockitoUtil;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -961,6 +963,44 @@ public class TestRPC {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRpcMetrics() throws Exception {
|
||||||
|
Configuration configuration = new Configuration();
|
||||||
|
final int interval = 1;
|
||||||
|
configuration.setBoolean(CommonConfigurationKeys.
|
||||||
|
RPC_METRICS_QUANTILE_ENABLE, true);
|
||||||
|
configuration.set(CommonConfigurationKeys.
|
||||||
|
RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
|
||||||
|
final Server server = new RPC.Builder(configuration)
|
||||||
|
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
|
||||||
|
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
|
||||||
|
.build();
|
||||||
|
server.start();
|
||||||
|
final TestProtocol proxy = RPC.getProxy(TestProtocol.class,
|
||||||
|
TestProtocol.versionID, server.getListenerAddress(), configuration);
|
||||||
|
try {
|
||||||
|
for (int i=0; i<1000; i++) {
|
||||||
|
proxy.ping();
|
||||||
|
proxy.echo("" + i);
|
||||||
|
}
|
||||||
|
MetricsRecordBuilder rpcMetrics =
|
||||||
|
getMetrics(server.getRpcMetrics().name());
|
||||||
|
assertTrue("Expected non-zero rpc queue time",
|
||||||
|
getLongCounter("RpcQueueTimeNumOps", rpcMetrics) > 0);
|
||||||
|
assertTrue("Expected non-zero rpc processing time",
|
||||||
|
getLongCounter("RpcProcessingTimeNumOps", rpcMetrics) > 0);
|
||||||
|
MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s",
|
||||||
|
rpcMetrics);
|
||||||
|
MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
|
||||||
|
rpcMetrics);
|
||||||
|
} finally {
|
||||||
|
if (proxy != null) {
|
||||||
|
RPC.stopProxy(proxy);
|
||||||
|
}
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws IOException {
|
public static void main(String[] args) throws IOException {
|
||||||
new TestRPC().testCallsInternal(conf);
|
new TestRPC().testCallsInternal(conf);
|
||||||
|
|
||||||
|
|
|
@ -738,7 +738,7 @@ public class TestUserGroupInformation {
|
||||||
long groups) throws InterruptedException {
|
long groups) throws InterruptedException {
|
||||||
MetricsRecordBuilder rb = getMetrics("UgiMetrics");
|
MetricsRecordBuilder rb = getMetrics("UgiMetrics");
|
||||||
if (groups > 0) {
|
if (groups > 0) {
|
||||||
assertCounter("GetGroupsNumOps", groups, rb);
|
assertCounterGt("GetGroupsNumOps", groups-1, rb);
|
||||||
double avg = getDoubleGauge("GetGroupsAvgTime", rb);
|
double avg = getDoubleGauge("GetGroupsAvgTime", rb);
|
||||||
assertTrue(avg >= 0.0);
|
assertTrue(avg >= 0.0);
|
||||||
|
|
||||||
|
|
|
@ -245,6 +245,9 @@ Trunk (Unreleased)
|
||||||
HDFS-5715. Use Snapshot ID to indicate the corresponding Snapshot for a
|
HDFS-5715. Use Snapshot ID to indicate the corresponding Snapshot for a
|
||||||
FileDiff/DirectoryDiff. (jing9)
|
FileDiff/DirectoryDiff. (jing9)
|
||||||
|
|
||||||
|
HDFS-5721. sharedEditsImage in Namenode#initializeSharedEdits() should be
|
||||||
|
closed before method returns. (Ted Yu via junping_du)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
|
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
|
||||||
|
@ -736,6 +739,8 @@ Release 2.4.0 - UNRELEASED
|
||||||
HDFS-5690. DataNode fails to start in secure mode when dfs.http.policy equals to
|
HDFS-5690. DataNode fails to start in secure mode when dfs.http.policy equals to
|
||||||
HTTP_ONLY. (Haohui Mai via jing9)
|
HTTP_ONLY. (Haohui Mai via jing9)
|
||||||
|
|
||||||
|
HDFS-5449. WebHdfs compatibility broken between 2.2 and 1.x / 23.x (kihwal)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS
|
BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS
|
||||||
|
|
||||||
HDFS-4985. Add storage type to the protocol and expose it in block report
|
HDFS-4985. Add storage type to the protocol and expose it in block report
|
||||||
|
|
|
@ -622,8 +622,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
|
|
||||||
long loadStart = now();
|
long loadStart = now();
|
||||||
String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
|
String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
|
||||||
namesystem.loadFSImage(startOpt, fsImage,
|
try {
|
||||||
HAUtil.isHAEnabled(conf, nameserviceId));
|
namesystem.loadFSImage(startOpt, fsImage,
|
||||||
|
HAUtil.isHAEnabled(conf, nameserviceId));
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.warn("Encountered exception loading fsimage", ioe);
|
||||||
|
fsImage.close();
|
||||||
|
throw ioe;
|
||||||
|
}
|
||||||
long timeTakenToLoadFSImage = now() - loadStart;
|
long timeTakenToLoadFSImage = now() - loadStart;
|
||||||
LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
|
LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
|
||||||
NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
|
NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
|
||||||
|
|
|
@ -816,14 +816,20 @@ public class NameNode implements NameNodeStatusMXBean {
|
||||||
System.out.println("Formatting using clusterid: " + clusterId);
|
System.out.println("Formatting using clusterid: " + clusterId);
|
||||||
|
|
||||||
FSImage fsImage = new FSImage(conf, nameDirsToFormat, editDirsToFormat);
|
FSImage fsImage = new FSImage(conf, nameDirsToFormat, editDirsToFormat);
|
||||||
FSNamesystem fsn = new FSNamesystem(conf, fsImage);
|
try {
|
||||||
fsImage.getEditLog().initJournalsForWrite();
|
FSNamesystem fsn = new FSNamesystem(conf, fsImage);
|
||||||
|
fsImage.getEditLog().initJournalsForWrite();
|
||||||
if (!fsImage.confirmFormat(force, isInteractive)) {
|
|
||||||
return true; // aborted
|
if (!fsImage.confirmFormat(force, isInteractive)) {
|
||||||
|
return true; // aborted
|
||||||
|
}
|
||||||
|
|
||||||
|
fsImage.format(fsn, clusterId);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.warn("Encountered exception during format: ", ioe);
|
||||||
|
fsImage.close();
|
||||||
|
throw ioe;
|
||||||
}
|
}
|
||||||
|
|
||||||
fsImage.format(fsn, clusterId);
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -897,6 +903,7 @@ public class NameNode implements NameNodeStatusMXBean {
|
||||||
}
|
}
|
||||||
|
|
||||||
NNStorage existingStorage = null;
|
NNStorage existingStorage = null;
|
||||||
|
FSImage sharedEditsImage = null;
|
||||||
try {
|
try {
|
||||||
FSNamesystem fsns =
|
FSNamesystem fsns =
|
||||||
FSNamesystem.loadFromDisk(getConfigurationWithoutSharedEdits(conf));
|
FSNamesystem.loadFromDisk(getConfigurationWithoutSharedEdits(conf));
|
||||||
|
@ -906,7 +913,7 @@ public class NameNode implements NameNodeStatusMXBean {
|
||||||
|
|
||||||
List<URI> sharedEditsDirs = FSNamesystem.getSharedEditsDirs(conf);
|
List<URI> sharedEditsDirs = FSNamesystem.getSharedEditsDirs(conf);
|
||||||
|
|
||||||
FSImage sharedEditsImage = new FSImage(conf,
|
sharedEditsImage = new FSImage(conf,
|
||||||
Lists.<URI>newArrayList(),
|
Lists.<URI>newArrayList(),
|
||||||
sharedEditsDirs);
|
sharedEditsDirs);
|
||||||
sharedEditsImage.getEditLog().initJournalsForWrite();
|
sharedEditsImage.getEditLog().initJournalsForWrite();
|
||||||
|
@ -934,6 +941,13 @@ public class NameNode implements NameNodeStatusMXBean {
|
||||||
LOG.error("Could not initialize shared edits dir", ioe);
|
LOG.error("Could not initialize shared edits dir", ioe);
|
||||||
return true; // aborted
|
return true; // aborted
|
||||||
} finally {
|
} finally {
|
||||||
|
if (sharedEditsImage != null) {
|
||||||
|
try {
|
||||||
|
sharedEditsImage.close();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.warn("Could not close sharedEditsImage", ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
// Have to unlock storage explicitly for the case when we're running in a
|
// Have to unlock storage explicitly for the case when we're running in a
|
||||||
// unit test, which runs in the same JVM as NNs.
|
// unit test, which runs in the same JVM as NNs.
|
||||||
if (existingStorage != null) {
|
if (existingStorage != null) {
|
||||||
|
|
|
@ -190,24 +190,29 @@ public class BootstrapStandby implements Tool, Configurable {
|
||||||
// Load the newly formatted image, using all of the directories (including shared
|
// Load the newly formatted image, using all of the directories (including shared
|
||||||
// edits)
|
// edits)
|
||||||
FSImage image = new FSImage(conf);
|
FSImage image = new FSImage(conf);
|
||||||
image.getStorage().setStorageInfo(storage);
|
try {
|
||||||
image.initEditLog();
|
image.getStorage().setStorageInfo(storage);
|
||||||
assert image.getEditLog().isOpenForRead() :
|
image.initEditLog();
|
||||||
|
assert image.getEditLog().isOpenForRead() :
|
||||||
"Expected edit log to be open for read";
|
"Expected edit log to be open for read";
|
||||||
|
|
||||||
// Ensure that we have enough edits already in the shared directory to
|
|
||||||
// start up from the last checkpoint on the active.
|
|
||||||
if (!checkLogsAvailableForRead(image, imageTxId, curTxId)) {
|
|
||||||
return ERR_CODE_LOGS_UNAVAILABLE;
|
|
||||||
}
|
|
||||||
|
|
||||||
image.getStorage().writeTransactionIdFileToStorage(curTxId);
|
|
||||||
|
|
||||||
// Download that checkpoint into our storage directories.
|
// Ensure that we have enough edits already in the shared directory to
|
||||||
MD5Hash hash = TransferFsImage.downloadImageToStorage(
|
// start up from the last checkpoint on the active.
|
||||||
|
if (!checkLogsAvailableForRead(image, imageTxId, curTxId)) {
|
||||||
|
return ERR_CODE_LOGS_UNAVAILABLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
image.getStorage().writeTransactionIdFileToStorage(curTxId);
|
||||||
|
|
||||||
|
// Download that checkpoint into our storage directories.
|
||||||
|
MD5Hash hash = TransferFsImage.downloadImageToStorage(
|
||||||
otherHttpAddr, imageTxId,
|
otherHttpAddr, imageTxId,
|
||||||
storage, true);
|
storage, true);
|
||||||
image.saveDigestAndRenameCheckpointImage(imageTxId, hash);
|
image.saveDigestAndRenameCheckpointImage(imageTxId, hash);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
image.close();
|
||||||
|
throw ioe;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -271,7 +271,7 @@ public class JsonUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert a DatanodeInfo to a Json map. */
|
/** Convert a DatanodeInfo to a Json map. */
|
||||||
private static Map<String, Object> toJsonMap(final DatanodeInfo datanodeinfo) {
|
static Map<String, Object> toJsonMap(final DatanodeInfo datanodeinfo) {
|
||||||
if (datanodeinfo == null) {
|
if (datanodeinfo == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -279,6 +279,9 @@ public class JsonUtil {
|
||||||
// TODO: Fix storageID
|
// TODO: Fix storageID
|
||||||
final Map<String, Object> m = new TreeMap<String, Object>();
|
final Map<String, Object> m = new TreeMap<String, Object>();
|
||||||
m.put("ipAddr", datanodeinfo.getIpAddr());
|
m.put("ipAddr", datanodeinfo.getIpAddr());
|
||||||
|
// 'name' is equivalent to ipAddr:xferPort. Older clients (1.x, 0.23.x)
|
||||||
|
// expects this instead of the two fields.
|
||||||
|
m.put("name", datanodeinfo.getXferAddr());
|
||||||
m.put("hostName", datanodeinfo.getHostName());
|
m.put("hostName", datanodeinfo.getHostName());
|
||||||
m.put("storageID", datanodeinfo.getDatanodeUuid());
|
m.put("storageID", datanodeinfo.getDatanodeUuid());
|
||||||
m.put("xferPort", datanodeinfo.getXferPort());
|
m.put("xferPort", datanodeinfo.getXferPort());
|
||||||
|
@ -325,17 +328,49 @@ public class JsonUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert a Json map to an DatanodeInfo object. */
|
/** Convert a Json map to an DatanodeInfo object. */
|
||||||
static DatanodeInfo toDatanodeInfo(final Map<?, ?> m) {
|
static DatanodeInfo toDatanodeInfo(final Map<?, ?> m)
|
||||||
|
throws IOException {
|
||||||
if (m == null) {
|
if (m == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ipAddr and xferPort are the critical fields for accessing data.
|
||||||
|
// If any one of the two is missing, an exception needs to be thrown.
|
||||||
|
|
||||||
|
// Handle the case of old servers (1.x, 0.23.x) sending 'name' instead
|
||||||
|
// of ipAddr and xferPort.
|
||||||
|
String ipAddr = getString(m, "ipAddr", null);
|
||||||
|
int xferPort = getInt(m, "xferPort", -1);
|
||||||
|
if (ipAddr == null) {
|
||||||
|
String name = getString(m, "name", null);
|
||||||
|
if (name != null) {
|
||||||
|
int colonIdx = name.indexOf(':');
|
||||||
|
if (colonIdx > 0) {
|
||||||
|
ipAddr = name.substring(0, colonIdx);
|
||||||
|
xferPort = Integer.parseInt(name.substring(colonIdx +1));
|
||||||
|
} else {
|
||||||
|
throw new IOException(
|
||||||
|
"Invalid value in server response: name=[" + name + "]");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new IOException(
|
||||||
|
"Missing both 'ipAddr' and 'name' in server response.");
|
||||||
|
}
|
||||||
|
// ipAddr is non-null & non-empty string at this point.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the validity of xferPort.
|
||||||
|
if (xferPort == -1) {
|
||||||
|
throw new IOException(
|
||||||
|
"Invalid or missing 'xferPort' in server response.");
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: Fix storageID
|
// TODO: Fix storageID
|
||||||
return new DatanodeInfo(
|
return new DatanodeInfo(
|
||||||
(String)m.get("ipAddr"),
|
ipAddr,
|
||||||
(String)m.get("hostName"),
|
(String)m.get("hostName"),
|
||||||
(String)m.get("storageID"),
|
(String)m.get("storageID"),
|
||||||
(int)(long)(Long)m.get("xferPort"),
|
xferPort,
|
||||||
(int)(long)(Long)m.get("infoPort"),
|
(int)(long)(Long)m.get("infoPort"),
|
||||||
getInt(m, "infoSecurePort", 0),
|
getInt(m, "infoSecurePort", 0),
|
||||||
(int)(long)(Long)m.get("ipcPort"),
|
(int)(long)(Long)m.get("ipcPort"),
|
||||||
|
@ -368,7 +403,8 @@ public class JsonUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert an Object[] to a DatanodeInfo[]. */
|
/** Convert an Object[] to a DatanodeInfo[]. */
|
||||||
private static DatanodeInfo[] toDatanodeInfoArray(final Object[] objects) {
|
private static DatanodeInfo[] toDatanodeInfoArray(final Object[] objects)
|
||||||
|
throws IOException {
|
||||||
if (objects == null) {
|
if (objects == null) {
|
||||||
return null;
|
return null;
|
||||||
} else if (objects.length == 0) {
|
} else if (objects.length == 0) {
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INodeId;
|
import org.apache.hadoop.hdfs.server.namenode.INodeId;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
@ -61,7 +62,7 @@ public class TestJsonUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testToDatanodeInfoWithoutSecurePort() {
|
public void testToDatanodeInfoWithoutSecurePort() throws Exception {
|
||||||
Map<String, Object> response = new HashMap<String, Object>();
|
Map<String, Object> response = new HashMap<String, Object>();
|
||||||
|
|
||||||
response.put("ipAddr", "127.0.0.1");
|
response.put("ipAddr", "127.0.0.1");
|
||||||
|
@ -84,4 +85,63 @@ public class TestJsonUtil {
|
||||||
|
|
||||||
JsonUtil.toDatanodeInfo(response);
|
JsonUtil.toDatanodeInfo(response);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testToDatanodeInfoWithName() throws Exception {
|
||||||
|
Map<String, Object> response = new HashMap<String, Object>();
|
||||||
|
|
||||||
|
// Older servers (1.x, 0.23, etc.) sends 'name' instead of ipAddr
|
||||||
|
// and xferPort.
|
||||||
|
String name = "127.0.0.1:1004";
|
||||||
|
response.put("name", name);
|
||||||
|
response.put("hostName", "localhost");
|
||||||
|
response.put("storageID", "fake-id");
|
||||||
|
response.put("infoPort", 1338l);
|
||||||
|
response.put("ipcPort", 1339l);
|
||||||
|
response.put("capacity", 1024l);
|
||||||
|
response.put("dfsUsed", 512l);
|
||||||
|
response.put("remaining", 512l);
|
||||||
|
response.put("blockPoolUsed", 512l);
|
||||||
|
response.put("lastUpdate", 0l);
|
||||||
|
response.put("xceiverCount", 4096l);
|
||||||
|
response.put("networkLocation", "foo.bar.baz");
|
||||||
|
response.put("adminState", "NORMAL");
|
||||||
|
response.put("cacheCapacity", 123l);
|
||||||
|
response.put("cacheUsed", 321l);
|
||||||
|
|
||||||
|
DatanodeInfo di = JsonUtil.toDatanodeInfo(response);
|
||||||
|
Assert.assertEquals(name, di.getXferAddr());
|
||||||
|
|
||||||
|
// The encoded result should contain name, ipAddr and xferPort.
|
||||||
|
Map<String, Object> r = JsonUtil.toJsonMap(di);
|
||||||
|
Assert.assertEquals(name, (String)r.get("name"));
|
||||||
|
Assert.assertEquals("127.0.0.1", (String)r.get("ipAddr"));
|
||||||
|
// In this test, it is Integer instead of Long since json was not actually
|
||||||
|
// involved in constructing the map.
|
||||||
|
Assert.assertEquals(1004, (int)(Integer)r.get("xferPort"));
|
||||||
|
|
||||||
|
// Invalid names
|
||||||
|
String[] badNames = {"127.0.0.1", "127.0.0.1:", ":", "127.0.0.1:sweet", ":123"};
|
||||||
|
for (String badName : badNames) {
|
||||||
|
response.put("name", badName);
|
||||||
|
checkDecodeFailure(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Missing both name and ipAddr
|
||||||
|
response.remove("name");
|
||||||
|
checkDecodeFailure(response);
|
||||||
|
|
||||||
|
// Only missing xferPort
|
||||||
|
response.put("ipAddr", "127.0.0.1");
|
||||||
|
checkDecodeFailure(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkDecodeFailure(Map<String, Object> map) {
|
||||||
|
try {
|
||||||
|
JsonUtil.toDatanodeInfo(map);
|
||||||
|
Assert.fail("Exception not thrown against bad input.");
|
||||||
|
} catch (Exception e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -182,9 +182,10 @@ public class TestMRApps {
|
||||||
MRApps.setClasspath(environment, job.getConfiguration());
|
MRApps.setClasspath(environment, job.getConfiguration());
|
||||||
assertTrue(environment.get("CLASSPATH").startsWith(
|
assertTrue(environment.get("CLASSPATH").startsWith(
|
||||||
ApplicationConstants.Environment.PWD.$() + File.pathSeparator));
|
ApplicationConstants.Environment.PWD.$() + File.pathSeparator));
|
||||||
String yarnAppClasspath =
|
String yarnAppClasspath = job.getConfiguration().get(
|
||||||
job.getConfiguration().get(
|
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
|
||||||
YarnConfiguration.YARN_APPLICATION_CLASSPATH);
|
StringUtils.join(",",
|
||||||
|
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH));
|
||||||
if (yarnAppClasspath != null) {
|
if (yarnAppClasspath != null) {
|
||||||
yarnAppClasspath = yarnAppClasspath.replaceAll(",\\s*", File.pathSeparator)
|
yarnAppClasspath = yarnAppClasspath.replaceAll(",\\s*", File.pathSeparator)
|
||||||
.trim();
|
.trim();
|
||||||
|
@ -217,7 +218,10 @@ public class TestMRApps {
|
||||||
MRApps.setClasspath(environment, conf);
|
MRApps.setClasspath(environment, conf);
|
||||||
assertTrue(environment.get("CLASSPATH").startsWith(
|
assertTrue(environment.get("CLASSPATH").startsWith(
|
||||||
ApplicationConstants.Environment.PWD.$() + File.pathSeparator));
|
ApplicationConstants.Environment.PWD.$() + File.pathSeparator));
|
||||||
String confClasspath = job.getConfiguration().get(YarnConfiguration.YARN_APPLICATION_CLASSPATH);
|
String confClasspath = job.getConfiguration().get(
|
||||||
|
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
|
||||||
|
StringUtils.join(",",
|
||||||
|
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH));
|
||||||
if (confClasspath != null) {
|
if (confClasspath != null) {
|
||||||
confClasspath = confClasspath.replaceAll(",\\s*", File.pathSeparator)
|
confClasspath = confClasspath.replaceAll(",\\s*", File.pathSeparator)
|
||||||
.trim();
|
.trim();
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.RunningJob;
|
||||||
|
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
@ -86,8 +87,10 @@ public class TestEncryptedShuffle {
|
||||||
conf.set("dfs.block.access.token.enable", "false");
|
conf.set("dfs.block.access.token.enable", "false");
|
||||||
conf.set("dfs.permissions", "true");
|
conf.set("dfs.permissions", "true");
|
||||||
conf.set("hadoop.security.authentication", "simple");
|
conf.set("hadoop.security.authentication", "simple");
|
||||||
String cp = conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH) +
|
String cp = conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
|
||||||
File.pathSeparator + classpathDir;
|
StringUtils.join(",",
|
||||||
|
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH))
|
||||||
|
+ File.pathSeparator + classpathDir;
|
||||||
conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, cp);
|
conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, cp);
|
||||||
dfsCluster = new MiniDFSCluster(conf, 1, true, null);
|
dfsCluster = new MiniDFSCluster(conf, 1, true, null);
|
||||||
FileSystem fileSystem = dfsCluster.getFileSystem();
|
FileSystem fileSystem = dfsCluster.getFileSystem();
|
||||||
|
|
|
@ -205,6 +205,8 @@ Release 2.4.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-1568. Rename clusterid to clusterId in ActiveRMInfoProto (kasha)
|
YARN-1568. Rename clusterid to clusterId in ActiveRMInfoProto (kasha)
|
||||||
|
|
||||||
|
YARN-1579. ActiveRMInfoProto fields should be optional (kasha)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -395,6 +397,9 @@ Release 2.3.0 - UNRELEASED
|
||||||
YARN-1438. Ensure container diagnostics includes exception from container
|
YARN-1438. Ensure container diagnostics includes exception from container
|
||||||
launch. (stevel via acmurthy)
|
launch. (stevel via acmurthy)
|
||||||
|
|
||||||
|
YARN-1138. yarn.application.classpath is set to point to $HADOOP_CONF_DIR
|
||||||
|
etc., which does not work on Windows. (Chuan Liu via cnauroth)
|
||||||
|
|
||||||
Release 2.2.0 - 2013-10-13
|
Release 2.2.0 - 2013-10-13
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -138,6 +138,6 @@ message RMStateVersionProto {
|
||||||
///////////// RM Failover related records ////////////////////////
|
///////////// RM Failover related records ////////////////////////
|
||||||
//////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////
|
||||||
message ActiveRMInfoProto {
|
message ActiveRMInfoProto {
|
||||||
required string clusterId = 1;
|
optional string clusterId = 1;
|
||||||
required string rmId = 2;
|
optional string rmId = 2;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1016,10 +1016,29 @@
|
||||||
<!-- Applications' Configuration-->
|
<!-- Applications' Configuration-->
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>CLASSPATH for YARN applications. A comma-separated list
|
<description>
|
||||||
of CLASSPATH entries</description>
|
CLASSPATH for YARN applications. A comma-separated list
|
||||||
<name>yarn.application.classpath</name>
|
of CLASSPATH entries. When this value is empty, the following default
|
||||||
<value>$HADOOP_CONF_DIR,$HADOOP_COMMON_HOME/share/hadoop/common/*,$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,$HADOOP_YARN_HOME/share/hadoop/yarn/*,$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*</value>
|
CLASSPATH for YARN applications would be used.
|
||||||
|
For Linux:
|
||||||
|
$HADOOP_CONF_DIR,
|
||||||
|
$HADOOP_COMMON_HOME/share/hadoop/common/*,
|
||||||
|
$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,
|
||||||
|
$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,
|
||||||
|
$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,
|
||||||
|
$HADOOP_YARN_HOME/share/hadoop/yarn/*,
|
||||||
|
$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*
|
||||||
|
For Windows:
|
||||||
|
%HADOOP_CONF_DIR%,
|
||||||
|
%HADOOP_COMMON_HOME%/share/hadoop/common/*,
|
||||||
|
%HADOOP_COMMON_HOME%/share/hadoop/common/lib/*,
|
||||||
|
%HADOOP_HDFS_HOME%/share/hadoop/hdfs/*,
|
||||||
|
%HADOOP_HDFS_HOME%/share/hadoop/hdfs/lib/*,
|
||||||
|
%HADOOP_YARN_HOME%/share/hadoop/yarn/*,
|
||||||
|
%HADOOP_YARN_HOME%/share/hadoop/yarn/lib/*
|
||||||
|
</description>
|
||||||
|
<name>yarn.application.classpath</name>
|
||||||
|
<value></value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<!-- Other configuration -->
|
<!-- Other configuration -->
|
||||||
|
|
Loading…
Reference in New Issue