Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1234087 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-01-20 19:30:53 +00:00
commit 2abe5c818d
21 changed files with 133 additions and 38 deletions

View File

@ -279,6 +279,9 @@ Release 0.23.1 - Unreleased
HADOOP-7971. Adding back job/pipes/queue commands to bin/hadoop for HADOOP-7971. Adding back job/pipes/queue commands to bin/hadoop for
backward compatibility. (Prashath Sharma via acmurthy) backward compatibility. (Prashath Sharma via acmurthy)
HADOOP-7982. UserGroupInformation fails to login if thread's context
classloader can't load HadoopLoginModule. (todd)
Release 0.23.0 - 2011-11-01 Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -416,9 +416,19 @@ public class UserGroupInformation {
private static LoginContext private static LoginContext
newLoginContext(String appName, Subject subject) throws LoginException { newLoginContext(String appName, Subject subject) throws LoginException {
return new LoginContext(appName, subject, null, new HadoopConfiguration()); // Temporarily switch the thread's ContextClassLoader to match this
// class's classloader, so that we can properly load HadoopLoginModule
// from the JAAS libraries.
Thread t = Thread.currentThread();
ClassLoader oldCCL = t.getContextClassLoader();
t.setContextClassLoader(HadoopLoginModule.class.getClassLoader());
try {
return new LoginContext(appName, subject, null, new HadoopConfiguration());
} finally {
t.setContextClassLoader(oldCCL);
}
} }
private LoginContext getLogin() { private LoginContext getLogin() {
return user.getLogin(); return user.getLogin();
} }

View File

@ -169,6 +169,9 @@ Trunk (unreleased changes)
HDFS-2776. Missing interface annotation on JournalSet. HDFS-2776. Missing interface annotation on JournalSet.
(Brandon Li via jitendra) (Brandon Li via jitendra)
HDFS-2768. BackupNode stop can not close proxy connections because
it is not a proxy instance. (Uma Maheswara Rao G via eli)
Release 0.23.1 - UNRELEASED Release 0.23.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -335,6 +338,11 @@ Release 0.23.1 - UNRELEASED
HDFS-2790. FSNamesystem.setTimes throws exception with wrong HDFS-2790. FSNamesystem.setTimes throws exception with wrong
configuration name in the message. (Arpit Gupta via eli) configuration name in the message. (Arpit Gupta via eli)
HDFS-2810. Leases not getting renewed properly by clients (todd)
HDFS-2751. Datanode may incorrectly drop OS cache behind reads
even for short reads. (todd)
Release 0.23.0 - 2011-11-01 Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -404,11 +404,17 @@ public class DFSClient implements java.io.Closeable {
return clientRunning; return clientRunning;
} }
/** Renew leases */ /**
void renewLease() throws IOException { * Renew leases.
* @return true if lease was renewed. May return false if this
* client has been closed or has no files open.
**/
boolean renewLease() throws IOException {
if (clientRunning && !isFilesBeingWrittenEmpty()) { if (clientRunning && !isFilesBeingWrittenEmpty()) {
namenode.renewLease(clientName); namenode.renewLease(clientName);
return true;
} }
return false;
} }
/** /**

View File

@ -67,7 +67,7 @@ import org.apache.hadoop.util.StringUtils;
* </p> * </p>
*/ */
class LeaseRenewer { class LeaseRenewer {
private static final Log LOG = LogFactory.getLog(LeaseRenewer.class); static final Log LOG = LogFactory.getLog(LeaseRenewer.class);
static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L; static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L; static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
@ -407,7 +407,13 @@ class LeaseRenewer {
final DFSClient c = copies.get(i); final DFSClient c = copies.get(i);
//skip if current client name is the same as the previous name. //skip if current client name is the same as the previous name.
if (!c.getClientName().equals(previousName)) { if (!c.getClientName().equals(previousName)) {
c.renewLease(); if (!c.renewLease()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Did not renew lease for client " +
c);
}
continue;
}
previousName = c.getClientName(); previousName = c.getClientName();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Lease renewed for client " + previousName); LOG.debug("Lease renewed for client " + previousName);

View File

@ -315,7 +315,7 @@ class BlockSender implements java.io.Closeable {
* close opened files. * close opened files.
*/ */
public void close() throws IOException { public void close() throws IOException {
if (blockInFd != null && shouldDropCacheBehindRead) { if (blockInFd != null && shouldDropCacheBehindRead && isLongRead()) {
// drop the last few MB of the file from cache // drop the last few MB of the file from cache
try { try {
NativeIO.posixFadviseIfPossible( NativeIO.posixFadviseIfPossible(

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -70,7 +71,7 @@ public class BackupNode extends NameNode {
private static final String BN_SERVICE_RPC_ADDRESS_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY; private static final String BN_SERVICE_RPC_ADDRESS_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY;
/** Name-node proxy */ /** Name-node proxy */
NamenodeProtocol namenode; NamenodeProtocolTranslatorPB namenode;
/** Name-node RPC address */ /** Name-node RPC address */
String nnRpcAddress; String nnRpcAddress;
/** Name-node HTTP address */ /** Name-node HTTP address */
@ -191,7 +192,7 @@ public class BackupNode extends NameNode {
} }
// Stop the RPC client // Stop the RPC client
if (namenode != null) { if (namenode != null) {
RPC.stopProxy(namenode); IOUtils.cleanup(LOG, namenode);
} }
namenode = null; namenode = null;
// Stop the checkpoint manager // Stop the checkpoint manager

View File

@ -24,10 +24,9 @@ import java.util.Arrays;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
/** /**
@ -41,7 +40,7 @@ import org.apache.hadoop.net.NetUtils;
class EditLogBackupOutputStream extends EditLogOutputStream { class EditLogBackupOutputStream extends EditLogOutputStream {
static int DEFAULT_BUFFER_SIZE = 256; static int DEFAULT_BUFFER_SIZE = 256;
private JournalProtocol backupNode; // RPC proxy to backup node private JournalProtocolTranslatorPB backupNode; // RPC proxy to backup node
private NamenodeRegistration bnRegistration; // backup node registration private NamenodeRegistration bnRegistration; // backup node registration
private NamenodeRegistration nnRegistration; // active node registration private NamenodeRegistration nnRegistration; // active node registration
private EditsDoubleBuffer doubleBuf; private EditsDoubleBuffer doubleBuf;
@ -94,14 +93,14 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
throw new IOException("BackupEditStream has " + size + throw new IOException("BackupEditStream has " + size +
" records still to be flushed and cannot be closed."); " records still to be flushed and cannot be closed.");
} }
RPC.stopProxy(backupNode); // stop the RPC threads IOUtils.cleanup(Storage.LOG, backupNode); // stop the RPC threads
doubleBuf.close(); doubleBuf.close();
doubleBuf = null; doubleBuf = null;
} }
@Override @Override
public void abort() throws IOException { public void abort() throws IOException {
RPC.stopProxy(backupNode); IOUtils.cleanup(Storage.LOG, backupNode);
doubleBuf = null; doubleBuf = null;
} }

View File

@ -17,11 +17,14 @@
*/ */
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import static org.junit.Assert.*;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -29,6 +32,8 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import com.google.common.base.Supplier;
public class TestLeaseRenewer { public class TestLeaseRenewer {
private String FAKE_AUTHORITY="hdfs://nn1/"; private String FAKE_AUTHORITY="hdfs://nn1/";
private UserGroupInformation FAKE_UGI_A = private UserGroupInformation FAKE_UGI_A =
@ -46,19 +51,24 @@ public class TestLeaseRenewer {
@Before @Before
public void setupMocksAndRenewer() throws IOException { public void setupMocksAndRenewer() throws IOException {
MOCK_DFSCLIENT = Mockito.mock(DFSClient.class); MOCK_DFSCLIENT = createMockClient();
Mockito.doReturn(true)
.when(MOCK_DFSCLIENT).isClientRunning();
Mockito.doReturn((int)FAST_GRACE_PERIOD)
.when(MOCK_DFSCLIENT).getHdfsTimeout();
Mockito.doReturn("myclient")
.when(MOCK_DFSCLIENT).getClientName();
renewer = LeaseRenewer.getInstance( renewer = LeaseRenewer.getInstance(
FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT); FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD); renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
} }
private DFSClient createMockClient() {
DFSClient mock = Mockito.mock(DFSClient.class);
Mockito.doReturn(true)
.when(mock).isClientRunning();
Mockito.doReturn((int)FAST_GRACE_PERIOD)
.when(mock).getHdfsTimeout();
Mockito.doReturn("myclient")
.when(mock).getClientName();
return mock;
}
@Test @Test
public void testInstanceSharing() throws IOException { public void testInstanceSharing() throws IOException {
// Two lease renewers with the same UGI should return // Two lease renewers with the same UGI should return
@ -93,11 +103,11 @@ public class TestLeaseRenewer {
public void testRenewal() throws Exception { public void testRenewal() throws Exception {
// Keep track of how many times the lease gets renewed // Keep track of how many times the lease gets renewed
final AtomicInteger leaseRenewalCount = new AtomicInteger(); final AtomicInteger leaseRenewalCount = new AtomicInteger();
Mockito.doAnswer(new Answer<Void>() { Mockito.doAnswer(new Answer<Boolean>() {
@Override @Override
public Void answer(InvocationOnMock invocation) throws Throwable { public Boolean answer(InvocationOnMock invocation) throws Throwable {
leaseRenewalCount.incrementAndGet(); leaseRenewalCount.incrementAndGet();
return null; return true;
} }
}).when(MOCK_DFSCLIENT).renewLease(); }).when(MOCK_DFSCLIENT).renewLease();
@ -120,6 +130,57 @@ public class TestLeaseRenewer {
renewer.closeFile(filePath, MOCK_DFSCLIENT); renewer.closeFile(filePath, MOCK_DFSCLIENT);
} }
/**
* Regression test for HDFS-2810. In this bug, the LeaseRenewer has handles
* to several DFSClients with the same name, the first of which has no files
* open. Previously, this was causing the lease to not get renewed.
*/
@Test
public void testManyDfsClientsWhereSomeNotOpen() throws Exception {
// First DFSClient has no files open so doesn't renew leases.
final DFSClient mockClient1 = createMockClient();
Mockito.doReturn(false).when(mockClient1).renewLease();
assertSame(renewer, LeaseRenewer.getInstance(
FAKE_AUTHORITY, FAKE_UGI_A, mockClient1));
// Set up a file so that we start renewing our lease.
DFSOutputStream mockStream1 = Mockito.mock(DFSOutputStream.class);
String filePath = "/foo";
renewer.put(filePath, mockStream1, mockClient1);
// Second DFSClient does renew lease
final DFSClient mockClient2 = createMockClient();
Mockito.doReturn(true).when(mockClient2).renewLease();
assertSame(renewer, LeaseRenewer.getInstance(
FAKE_AUTHORITY, FAKE_UGI_A, mockClient2));
// Set up a file so that we start renewing our lease.
DFSOutputStream mockStream2 = Mockito.mock(DFSOutputStream.class);
renewer.put(filePath, mockStream2, mockClient2);
// Wait for lease to get renewed
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
Mockito.verify(mockClient1, Mockito.atLeastOnce()).renewLease();
Mockito.verify(mockClient2, Mockito.atLeastOnce()).renewLease();
return true;
} catch (AssertionError err) {
LeaseRenewer.LOG.warn("Not yet satisfied", err);
return false;
} catch (IOException e) {
// should not throw!
throw new RuntimeException(e);
}
}
}, 100, 10000);
renewer.closeFile(filePath, mockClient1);
renewer.closeFile(filePath, mockClient2);
}
@Test @Test
public void testThreadName() throws Exception { public void testThreadName() throws Exception {
DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class); DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);

View File

@ -187,6 +187,8 @@ Release 0.23.1 - Unreleased
assign only one off-switch container in a single scheduling assign only one off-switch container in a single scheduling
iteration. (Arun C Murthy via vinodkv) iteration. (Arun C Murthy via vinodkv)
MAPREDUCE-3692. yarn-resourcemanager out and log files can get big. (eli)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar

View File

@ -632,7 +632,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
* The only entry point to change the Job. * The only entry point to change the Job.
*/ */
public void handle(JobEvent event) { public void handle(JobEvent event) {
LOG.info("Processing " + event.getJobId() + " of type " + event.getType()); LOG.debug("Processing " + event.getJobId() + " of type " + event.getType());
try { try {
writeLock.lock(); writeLock.lock();
JobState oldState = getState(); JobState oldState = getState();

View File

@ -537,7 +537,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
@Override @Override
public void handle(TaskEvent event) { public void handle(TaskEvent event) {
LOG.info("Processing " + event.getTaskID() + " of type " + event.getType()); LOG.debug("Processing " + event.getTaskID() + " of type " + event.getType());
try { try {
writeLock.lock(); writeLock.lock();
TaskState oldState = getState(); TaskState oldState = getState();

View File

@ -315,11 +315,10 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
ProtoSpecificRequestWritable request = (ProtoSpecificRequestWritable) writableRequest; ProtoSpecificRequestWritable request = (ProtoSpecificRequestWritable) writableRequest;
ProtoSpecificRpcRequest rpcRequest = request.message; ProtoSpecificRpcRequest rpcRequest = request.message;
String methodName = rpcRequest.getMethodName(); String methodName = rpcRequest.getMethodName();
System.out.println("Call: protocol=" + protocol + ", method=" if (verbose) {
+ methodName);
if (verbose)
log("Call: protocol=" + protocol + ", method=" log("Call: protocol=" + protocol + ", method="
+ methodName); + methodName);
}
MethodDescriptor methodDescriptor = service.getDescriptorForType() MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName); .findMethodByName(methodName);
if (methodDescriptor == null) { if (methodDescriptor == null) {

View File

@ -373,7 +373,7 @@ public class ApplicationImpl implements Application {
try { try {
ApplicationId applicationID = event.getApplicationID(); ApplicationId applicationID = event.getApplicationID();
LOG.info("Processing " + applicationID + " of type " + event.getType()); LOG.debug("Processing " + applicationID + " of type " + event.getType());
ApplicationState oldState = stateMachine.getCurrentState(); ApplicationState oldState = stateMachine.getCurrentState();
ApplicationState newState = null; ApplicationState newState = null;

View File

@ -811,7 +811,7 @@ public class ContainerImpl implements Container {
this.writeLock.lock(); this.writeLock.lock();
ContainerId containerID = event.getContainerID(); ContainerId containerID = event.getContainerID();
LOG.info("Processing " + containerID + " of type " + event.getType()); LOG.debug("Processing " + containerID + " of type " + event.getType());
ContainerState oldState = stateMachine.getCurrentState(); ContainerState oldState = stateMachine.getCurrentState();
ContainerState newState = null; ContainerState newState = null;

View File

@ -181,7 +181,7 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
this.writeLock.lock(); this.writeLock.lock();
Path resourcePath = event.getLocalResourceRequest().getPath(); Path resourcePath = event.getLocalResourceRequest().getPath();
LOG.info("Processing " + resourcePath + " of type " + event.getType()); LOG.debug("Processing " + resourcePath + " of type " + event.getType());
ResourceState oldState = this.stateMachine.getCurrentState(); ResourceState oldState = this.stateMachine.getCurrentState();
ResourceState newState = null; ResourceState newState = null;

View File

@ -413,7 +413,7 @@ public class RMAppImpl implements RMApp {
try { try {
ApplicationId appID = event.getApplicationId(); ApplicationId appID = event.getApplicationId();
LOG.info("Processing event for " + appID + " of type " LOG.debug("Processing event for " + appID + " of type "
+ event.getType()); + event.getType());
final RMAppState oldState = getState(); final RMAppState oldState = getState();
try { try {

View File

@ -468,7 +468,7 @@ public class RMAppAttemptImpl implements RMAppAttempt {
try { try {
ApplicationAttemptId appAttemptID = event.getApplicationAttemptId(); ApplicationAttemptId appAttemptID = event.getApplicationAttemptId();
LOG.info("Processing event for " + appAttemptID + " of type " LOG.debug("Processing event for " + appAttemptID + " of type "
+ event.getType()); + event.getType());
final RMAppAttemptState oldState = getAppAttemptState(); final RMAppAttemptState oldState = getAppAttemptState();
try { try {

View File

@ -192,7 +192,7 @@ public class RMContainerImpl implements RMContainer {
@Override @Override
public void handle(RMContainerEvent event) { public void handle(RMContainerEvent event) {
LOG.info("Processing " + event.getContainerId() + " of type " + event.getType()); LOG.debug("Processing " + event.getContainerId() + " of type " + event.getType());
try { try {
writeLock.lock(); writeLock.lock();
RMContainerState oldState = getState(); RMContainerState oldState = getState();

View File

@ -283,7 +283,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
} }
public void handle(RMNodeEvent event) { public void handle(RMNodeEvent event) {
LOG.info("Processing " + event.getNodeId() + " of type " + event.getType()); LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType());
try { try {
writeLock.lock(); writeLock.lock();
RMNodeState oldState = getState(); RMNodeState oldState = getState();

View File

@ -575,12 +575,12 @@ public class FifoScheduler implements ResourceScheduler {
if (Resources.greaterThanOrEqual(node.getAvailableResource(), if (Resources.greaterThanOrEqual(node.getAvailableResource(),
minimumAllocation)) { minimumAllocation)) {
LOG.info("Node heartbeat " + rmNode.getNodeID() + LOG.debug("Node heartbeat " + rmNode.getNodeID() +
" available resource = " + node.getAvailableResource()); " available resource = " + node.getAvailableResource());
assignContainers(node); assignContainers(node);
LOG.info("Node after allocation " + rmNode.getNodeID() + " resource = " LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
+ node.getAvailableResource()); + node.getAvailableResource());
} }