Merge r1569890 through r1571553 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1571554 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2014-02-25 03:30:24 +00:00
commit 98db64dd49
9 changed files with 372 additions and 12 deletions

View File

@@ -613,6 +613,8 @@ Release 2.4.0 - UNRELEASED
HDFS-5981. PBImageXmlWriter generates malformed XML.
(Haohui Mai via cnauroth)
HDFS-5922. DN heartbeat thread can get stuck in tight loop. (Arpit Agarwal)
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@@ -1108,6 +1108,7 @@ public class DFSOutputStream extends FSOutputSummer
excluded.length > 0 ? excluded : null);
block = lb.getBlock();
block.setNumBytes(0);
bytesSent = 0;
accessToken = lb.getBlockToken();
nodes = lb.getLocations();

View File

@@ -101,7 +101,10 @@ class BPServiceActor implements Runnable {
private final Map<String, PerStoragePendingIncrementalBR>
pendingIncrementalBRperStorage = Maps.newHashMap();
private volatile int pendingReceivedRequests = 0;
// IBR = Incremental Block Report. If this flag is set then an IBR will be
// sent immediately by the actor thread without waiting for the IBR timer
// to elapse.
private volatile boolean sendImmediateIBR = false;
private volatile boolean shouldServiceRun = true;
private final DataNode dn;
private final DNConf dnConf;
@@ -275,12 +278,10 @@ class BPServiceActor implements Runnable {
if (perStorageMap.getBlockInfoCount() > 0) {
// Send newly-received and deleted blockids to namenode
ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
pendingReceivedRequests =
(pendingReceivedRequests > rdbi.length ?
(pendingReceivedRequests - rdbi.length) : 0);
reports.add(new StorageReceivedDeletedBlocks(storageUuid, rdbi));
}
}
sendImmediateIBR = false;
}
if (reports.size() == 0) {
@@ -304,8 +305,8 @@ class BPServiceActor implements Runnable {
// didn't put something newer in the meantime.
PerStoragePendingIncrementalBR perStorageMap =
pendingIncrementalBRperStorage.get(report.getStorageID());
pendingReceivedRequests +=
perStorageMap.putMissingBlockInfos(report.getBlocks());
perStorageMap.putMissingBlockInfos(report.getBlocks());
sendImmediateIBR = true;
}
}
}
@@ -363,7 +364,7 @@ class BPServiceActor implements Runnable {
ReceivedDeletedBlockInfo bInfo, String storageUuid) {
synchronized (pendingIncrementalBRperStorage) {
addPendingReplicationBlockInfo(bInfo, storageUuid);
pendingReceivedRequests++;
sendImmediateIBR = true;
pendingIncrementalBRperStorage.notifyAll();
}
}
@@ -425,6 +426,11 @@ class BPServiceActor implements Runnable {
}
}
@VisibleForTesting
boolean hasPendingIBR() {
return sendImmediateIBR;
}
/**
* Report the list of blocks to the Namenode
* @return DatanodeCommands returned by the NN. May be null.
@@ -686,8 +692,8 @@ class BPServiceActor implements Runnable {
}
}
}
if (pendingReceivedRequests > 0
|| (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
if (sendImmediateIBR ||
(startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
reportReceivedDeletedBlocks();
lastDeletedReport = startTime;
}
@@ -711,7 +717,7 @@ class BPServiceActor implements Runnable {
long waitTime = dnConf.heartBeatInterval -
(Time.now() - lastHeartbeat);
synchronized(pendingIncrementalBRperStorage) {
if (waitTime > 0 && pendingReceivedRequests == 0) {
if (waitTime > 0 && !sendImmediateIBR) {
try {
pendingIncrementalBRperStorage.wait(waitTime);
} catch (InterruptedException ie) {
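
The change above replaces the pendingReceivedRequests counter, which could drift and leave the actor thread spinning in a tight loop (HDFS-5922), with a single volatile flag plus wait/notify on the pending-IBR map. Below is a minimal, self-contained sketch of that signaling pattern; the class and method names are illustrative and not the Hadoop implementation.

public class IbrSignalingSketch {
  private final Object lock = new Object();
  // A boolean is either set or clear; unlike a counter it cannot be left
  // at a stale value when queued block entries are replaced or dropped.
  private volatile boolean sendImmediateIbr = false;

  /** Called whenever a block is received or deleted locally. */
  public void notifyBlockEvent() {
    synchronized (lock) {
      sendImmediateIbr = true;
      lock.notifyAll();             // wake the actor thread early
    }
  }

  /** Simplified actor loop: report when flagged or when the interval elapses. */
  public void actorLoop(long intervalMs) throws InterruptedException {
    long lastReport = System.currentTimeMillis();
    while (!Thread.currentThread().isInterrupted()) {
      long now = System.currentTimeMillis();
      if (sendImmediateIbr || now - lastReport > intervalMs) {
        sendImmediateIbr = false;   // clear before reporting, as in the diff
        sendIncrementalBlockReport();
        lastReport = now;
      }
      synchronized (lock) {
        long waitTime = intervalMs - (System.currentTimeMillis() - lastReport);
        if (waitTime > 0 && !sendImmediateIbr) {
          lock.wait(waitTime);      // woken early by notifyBlockEvent()
        }
      }
    }
  }

  private void sendIncrementalBlockReport() {
    // Stand-in for reportReceivedDeletedBlocks(), the RPC to the NameNode.
  }
}

The point of the flag is that a replaced or dequeued entry can no longer strand the loop with a misleading non-zero count: either an IBR is due or it is not.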

View File

@@ -0,0 +1,211 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
/**
* Verify that incremental block reports are generated in response to
* block additions/deletions.
*/
public class TestIncrementalBlockReports {
public static final Log LOG = LogFactory.getLog(TestIncrementalBlockReports.class);
private static final short DN_COUNT = 1;
private static final long DUMMY_BLOCK_ID = 5678;
private static final long DUMMY_BLOCK_LENGTH = 1024 * 1024;
private static final long DUMMY_BLOCK_GENSTAMP = 1000;
private MiniDFSCluster cluster = null;
private DistributedFileSystem fs;
private Configuration conf;
private NameNode singletonNn;
private DataNode singletonDn;
private BPOfferService bpos; // BPOS to use for block injection.
private BPServiceActor actor; // BPSA to use for block injection.
private String storageUuid; // DatanodeStorage to use for block injection.
@Before
public void startCluster() throws IOException {
conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DN_COUNT).build();
fs = cluster.getFileSystem();
singletonNn = cluster.getNameNode();
singletonDn = cluster.getDataNodes().get(0);
bpos = singletonDn.getAllBpOs()[0];
actor = bpos.getBPServiceActors().get(0);
storageUuid = singletonDn.getFSDataset().getVolumes().get(0).getStorageID();
}
private static Block getDummyBlock() {
return new Block(DUMMY_BLOCK_ID, DUMMY_BLOCK_LENGTH, DUMMY_BLOCK_GENSTAMP);
}
/**
* Inject a fake 'received' block into the BPServiceActor state.
*/
private void injectBlockReceived() {
ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
getDummyBlock(), BlockStatus.RECEIVED_BLOCK, null);
actor.notifyNamenodeBlockImmediately(rdbi, storageUuid);
}
/**
* Inject a fake 'deleted' block into the BPServiceActor state.
*/
private void injectBlockDeleted() {
ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
getDummyBlock(), BlockStatus.DELETED_BLOCK, null);
actor.notifyNamenodeDeletedBlock(rdbi, storageUuid);
}
/**
* Spy on calls from the DN to the NN.
* @return spy object that can be used for Mockito verification.
*/
DatanodeProtocolClientSideTranslatorPB spyOnDnCallsToNn() {
return DataNodeTestUtils.spyOnBposToNN(singletonDn, singletonNn);
}
/**
* Ensure that an IBR is generated immediately for a block received by
* the DN.
*
* @throws InterruptedException
* @throws IOException
*/
@Test (timeout=60000)
public void testReportBlockReceived() throws InterruptedException, IOException {
try {
DatanodeProtocolClientSideTranslatorPB nnSpy = spyOnDnCallsToNn();
injectBlockReceived();
// Sleep briefly; this is necessary since the IBR is
// generated asynchronously.
Thread.sleep(2000);
// Ensure that the received block was reported immediately.
Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted(
any(DatanodeRegistration.class),
anyString(),
any(StorageReceivedDeletedBlocks[].class));
} finally {
cluster.shutdown();
cluster = null;
}
}
/**
* Ensure that a delayed IBR is generated for a block deleted on the DN.
*
* @throws InterruptedException
* @throws IOException
*/
@Test (timeout=60000)
public void testReportBlockDeleted() throws InterruptedException, IOException {
try {
// Trigger a block report to reset the IBR timer.
DataNodeTestUtils.triggerBlockReport(singletonDn);
// Spy on calls from the DN to the NN
DatanodeProtocolClientSideTranslatorPB nnSpy = spyOnDnCallsToNn();
injectBlockDeleted();
// Sleep for a short time since the IBR is generated
// asynchronously.
Thread.sleep(2000);
// Ensure that no block report was generated immediately.
// Deleted blocks are reported when the IBR timer elapses.
Mockito.verify(nnSpy, times(0)).blockReceivedAndDeleted(
any(DatanodeRegistration.class),
anyString(),
any(StorageReceivedDeletedBlocks[].class));
// Trigger a block report; this also triggers an IBR.
DataNodeTestUtils.triggerBlockReport(singletonDn);
Thread.sleep(2000);
// Ensure that the deleted block is reported.
Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted(
any(DatanodeRegistration.class),
anyString(),
any(StorageReceivedDeletedBlocks[].class));
} finally {
cluster.shutdown();
cluster = null;
}
}
/**
* Add a received block entry and then replace it. Ensure that a single
* IBR is generated and that no pending IBR state is left behind.
* This test case covers the failure reported in HDFS-5922.
*
* @throws InterruptedException
* @throws IOException
*/
@Test (timeout=60000)
public void testReplaceReceivedBlock() throws InterruptedException, IOException {
try {
// Spy on calls from the DN to the NN
DatanodeProtocolClientSideTranslatorPB nnSpy = spyOnDnCallsToNn();
injectBlockReceived();
injectBlockReceived(); // Overwrite the existing entry.
// Sleep for a short time since the IBR is generated
// asynchronously.
Thread.sleep(2000);
// Ensure that the received block is reported.
Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted(
any(DatanodeRegistration.class),
anyString(),
any(StorageReceivedDeletedBlocks[].class));
// Ensure that no more IBRs are pending.
assertFalse(actor.hasPendingIBR());
} finally {
cluster.shutdown();
cluster = null;
}
}
}
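
As an aside, the fixed Thread.sleep(2000) calls in the tests above could in principle be replaced by Mockito's timeout-based verification, assuming the Mockito version on this branch supports it; a hedged sketch of how the check in testReportBlockReceived() might then read:

// Hypothetical variant: wait up to 10 seconds for the asynchronous IBR
// instead of sleeping for a fixed interval before verifying.
Mockito.verify(nnSpy, Mockito.timeout(10000).times(1)).blockReceivedAndDeleted(
    any(DatanodeRegistration.class),
    anyString(),
    any(StorageReceivedDeletedBlocks[].class));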

View File

@@ -173,6 +173,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5699. Allow setting tags on MR jobs (kasha)
MAPREDUCE-5761. Added a simple log message to denote when encrypted shuffle
is on in the shuffle-handler. (Jian He via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@@ -367,6 +367,7 @@ public class ShuffleHandler extends AuxiliaryService {
SHUFFLE = getShuffle(conf);
if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
LOG.info("Encrypted shuffle is enabled.");
sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
sslFactory.init();
}
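
For context, a small hedged example of flipping the switch that gates the new log line; the key names come from MRConfig, but the class and main() scaffolding are illustrative only and not part of this commit. A real deployment would also need keystore/truststore settings in ssl-server.xml for the SSLFactory created above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;

// Illustrative scaffolding: enables the branch that logs
// "Encrypted shuffle is enabled." when ShuffleHandler initializes.
public class EnableEncryptedShuffleExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, true);
    System.out.println("Encrypted shuffle enabled: "
        + conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
            MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT));
  }
}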

View File

@@ -348,6 +348,10 @@ Release 2.4.0 - UNRELEASED
re-registration after a RESYNC and thus avoid hanging. (Rohith Sharma via
vinodkv)
YARN-1734. Fixed ResourceManager to update the configurations when it
transitions from standby to active mode so as to assimilate any changes that
happened while it was in standby mode. (Xuan Gong via vinodkv)
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@@ -250,10 +250,20 @@ public class AdminService extends CompositeService implements
@Override
public synchronized void transitionToActive(
HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
// Call refreshAdminAcls before the HA state transition, in case the
// admin ACLs were updated on the previously active RM.
try {
refreshAdminAcls(false);
} catch (YarnException ex) {
throw new ServiceFailedException("Can not execute refreshAdminAcls", ex);
}
UserGroupInformation user = checkAccess("transitionToActive");
checkHaStateChange(reqInfo);
try {
rm.transitionToActive();
// Call all the refresh*() methods so the now-active RM picks up the updated configurations.
refreshAll();
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToActive", "RMHAProtocolService");
} catch (Exception e) {
@@ -268,6 +278,13 @@ public class AdminService extends CompositeService implements
@Override
public synchronized void transitionToStandby(
HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
// Call refreshAdminAcls before the HA state transition, in case the
// admin ACLs were updated on the previously active RM.
try {
refreshAdminAcls(false);
} catch (YarnException ex) {
throw new ServiceFailedException("Can not execute refreshAdminAcls", ex);
}
UserGroupInformation user = checkAccess("transitionToStandby");
checkHaStateChange(reqInfo);
try {
@@ -406,10 +423,15 @@ public class AdminService extends CompositeService implements
@Override
public RefreshAdminAclsResponse refreshAdminAcls(
RefreshAdminAclsRequest request) throws YarnException, IOException {
return refreshAdminAcls(true);
}
private RefreshAdminAclsResponse refreshAdminAcls(boolean checkRMHAState)
throws YarnException, IOException {
String argName = "refreshAdminAcls";
UserGroupInformation user = checkAcls(argName);
if (!isRMActive()) {
if (checkRMHAState && !isRMActive()) {
RMAuditLogger.logFailure(user.getShortUserName(), argName,
adminAcl.toString(), "AdminService",
"ResourceManager is not active. Can not refresh user-groups.");
@@ -521,6 +543,24 @@ public class AdminService extends CompositeService implements
return conf;
}
private void refreshAll() throws ServiceFailedException {
try {
refreshQueues(RefreshQueuesRequest.newInstance());
refreshNodes(RefreshNodesRequest.newInstance());
refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest.newInstance());
refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest.newInstance());
if (getConfig().getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
refreshServiceAcls(RefreshServiceAclsRequest.newInstance());
}
} catch (Exception ex) {
throw new ServiceFailedException(ex.getMessage());
}
}
@VisibleForTesting
public AccessControlList getAccessControlList() {
return this.adminAcl;

View File

@@ -34,12 +34,16 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
@@ -518,6 +522,94 @@ public class TestRMAdminService {
Assert.assertTrue(excludeHosts.contains("0.0.0.0:123"));
}
@Test
public void testRMHAWithFileSystemBasedConfiguration() throws IOException,
YarnException {
StateChangeRequestInfo requestInfo = new StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
int base = 100;
for (String confKey : YarnConfiguration
.getServiceAddressConfKeys(configuration)) {
configuration.set(HAUtil.addSuffix(confKey, "rm1"), "0.0.0.0:"
+ (base + 20));
configuration.set(HAUtil.addSuffix(confKey, "rm2"), "0.0.0.0:"
+ (base + 40));
base = base * 2;
}
Configuration conf1 = new Configuration(configuration);
conf1.set(YarnConfiguration.RM_HA_ID, "rm1");
Configuration conf2 = new Configuration(configuration);
conf2.set(YarnConfiguration.RM_HA_ID, "rm2");
// upload default configurations
uploadDefaultConfiguration();
MockRM rm1 = null;
MockRM rm2 = null;
try {
rm1 = new MockRM(conf1);
rm1.init(conf1);
rm1.start();
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
== HAServiceState.STANDBY);
rm2 = new MockRM(conf2);
rm2.init(conf2);
rm2.start();
Assert.assertTrue(rm2.getRMContext().getHAServiceState()
== HAServiceState.STANDBY);
rm1.adminService.transitionToActive(requestInfo);
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
== HAServiceState.ACTIVE);
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
csConf.set("yarn.scheduler.capacity.maximum-applications", "5000");
uploadConfiguration(csConf, "capacity-scheduler.xml");
rm1.adminService.refreshQueues(RefreshQueuesRequest.newInstance());
int maxApps =
((CapacityScheduler) rm1.getRMContext().getScheduler())
.getConfiguration().getMaximumSystemApplications();
Assert.assertEquals(maxApps, 5000);
// Before failover happens, maxApps on the standby RM (rm2)
// is still the default value.
int maxAppsBeforeFailOver =
((CapacityScheduler) rm2.getRMContext().getScheduler())
.getConfiguration().getMaximumSystemApplications();
Assert.assertEquals(maxAppsBeforeFailOver, 10000);
// Do the failover
rm1.adminService.transitionToStandby(requestInfo);
rm2.adminService.transitionToActive(requestInfo);
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
== HAServiceState.STANDBY);
Assert.assertTrue(rm2.getRMContext().getHAServiceState()
== HAServiceState.ACTIVE);
int maxAppsAfter =
((CapacityScheduler) rm2.getRMContext().getScheduler())
.getConfiguration().getMaximumSystemApplications();
Assert.assertEquals(maxAppsAfter, 5000);
} finally {
if (rm1 != null) {
rm1.stop();
}
if (rm2 != null) {
rm2.stop();
}
}
}
private String writeConfigurationXML(Configuration conf, String confXMLName)
throws IOException {
DataOutputStream output = null;
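
One detail of the HA test above worth spelling out: the per-RM service-address keys are derived with HAUtil.addSuffix. A hedged illustration, with example values rather than anything taken from this commit:

// HAUtil.addSuffix appends the RM id to a base configuration key, e.g.
//   "yarn.resourcemanager.address" + "rm1" -> "yarn.resourcemanager.address.rm1"
String rm1AddressKey = HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, "rm1");
configuration.set(rm1AddressKey, "0.0.0.0:120");  // i.e. base + 20 on the first pass of the loop above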