Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1293279 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-02-24 14:49:38 +00:00
commit 328702891e
9 changed files with 116 additions and 21 deletions

View File

@ -89,6 +89,12 @@ Trunk (unreleased changes)
HADOOP-8084. Updates ProtoBufRpc engine to not do an unnecessary copy
for RPC request/response. (ddas)
HADOOP-8085. Add RPC metrics to ProtobufRpcEngine. (Hari Mankude via
suresh)
HADOOP-8108. Move method getHostPortString() from NameNode to NetUtils.
(Brandon Li via jitendra)
BUG FIXES
HADOOP-8018. Hudson auto test for HDFS has started throwing javadoc

View File

@ -379,6 +379,24 @@ public class ProtobufRpcEngine implements RpcEngine {
* Protobuf invoker for {@link RpcInvoker}
*/
static class ProtoBufRpcInvoker implements RpcInvoker {
private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server,
String protoName, long version) throws IOException {
ProtoNameVer pv = new ProtoNameVer(protoName, version);
ProtoClassProtoImpl impl =
server.getProtocolImplMap(RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
if (impl == null) { // no match for Protocol AND Version
VerProtocolImpl highest =
server.getHighestSupportedProtocol(RpcKind.RPC_PROTOCOL_BUFFER,
protoName);
if (highest == null) {
throw new IOException("Unknown protocol: " + protoName);
}
// protocol supported but not the version that client wants
throw new RPC.VersionMismatch(protoName, version,
highest.version);
}
return impl;
}
@Override
/**
@ -409,21 +427,8 @@ public class ProtobufRpcEngine implements RpcEngine {
if (server.verbose)
LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);
ProtoClassProtoImpl protocolImpl =
server.getProtocolImplMap(RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
if (protocolImpl == null) { // no match for Protocol AND Version
VerProtocolImpl highest =
server.getHighestSupportedProtocol(RpcKind.RPC_PROTOCOL_BUFFER,
protoName);
if (highest == null) {
throw new IOException("Unknown protocol: " + protoName);
}
// protocol supported but not the version that client wants
throw new RPC.VersionMismatch(protoName, clientVersion,
highest.version);
}
ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,
clientVersion);
BlockingService service = (BlockingService) protocolImpl.protocolImpl;
MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName);
@ -438,7 +443,19 @@ public class ProtobufRpcEngine implements RpcEngine {
.mergeFrom(rpcRequest.getRequest()).build();
Message result;
try {
long startTime = System.currentTimeMillis();
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
result = service.callBlockingMethod(methodDescriptor, null, param);
int processingTime = (int) (System.currentTimeMillis() - startTime);
int qTime = (int) (startTime - receiveTime);
if (LOG.isDebugEnabled()) {
LOG.info("Served: " + methodName + " queueTime= " + qTime +
" procesingTime= " + processingTime);
}
server.rpcMetrics.addRpcQueueTime(qTime);
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(methodName,
processingTime);
} catch (ServiceException e) {
Throwable cause = e.getCause();
return handleException(cause != null ? cause : e);

View File

@ -317,10 +317,15 @@ public abstract class Server {
* Returns a handle to the rpcMetrics (required in tests)
* @return rpc metrics
*/
@VisibleForTesting
public RpcMetrics getRpcMetrics() {
return rpcMetrics;
}
@VisibleForTesting
public RpcDetailedMetrics getRpcDetailedMetrics() {
return rpcDetailedMetrics;
}
@VisibleForTesting
Iterable<? extends Thread> getHandlers() {

View File

@ -606,6 +606,13 @@ public class NetUtils {
catch(UnknownHostException uhe) {return "" + uhe;}
}
/**
* Compose a "host:port" string from the address.
*/
public static String getHostPortString(InetSocketAddress addr) {
return addr.getHostName() + ":" + addr.getPort();
}
/**
* Checks if {@code host} is a local host name and return {@link InetAddress}
* corresponding to that address.

View File

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.ipc;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -28,6 +31,7 @@ import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetUtils;
import org.junit.Assert;
import org.junit.Test;
@ -187,5 +191,14 @@ public class TestProtoBufRpc {
.setMessage("hello").build();
EchoResponseProto echoResponse = client.echo2(null, echoRequest);
Assert.assertEquals(echoResponse.getMessage(), "hello");
// Ensure RPC metrics are updated
MetricsRecordBuilder rpcMetrics = getMetrics(server.getRpcMetrics().name());
assertCounterGt("RpcQueueTimeNumOps", 0L, rpcMetrics);
assertCounterGt("RpcProcessingTimeNumOps", 0L, rpcMetrics);
MetricsRecordBuilder rpcDetailedMetrics =
getMetrics(server.getRpcDetailedMetrics().name());
assertCounterGt("Echo2NumOps", 0L, rpcDetailedMetrics);
}
}

View File

@ -137,6 +137,12 @@ Release 0.23.2 - UNRELEASED
MAPREDUCE-3884. PWD should be first in the classpath of MR tasks (tucu)
MAPREDUCE-3878. Null user on filtered jobhistory job page (Jonathon Eagles
via tgraves)
MAPREDUCE-3738. MM can hang during shutdown if AppLogAggregatorImpl thread
dies unexpectedly (Jason Lowe via sseth)
Release 0.23.1 - 2012-02-17
INCOMPATIBLE CHANGES

View File

@ -343,9 +343,15 @@ public class AppController extends Controller implements AMParams {
* @return True if the requesting user has permission to view the job
*/
boolean checkAccess(Job job) {
UserGroupInformation callerUgi = UserGroupInformation.createRemoteUser(
request().getRemoteUser());
return job.checkAccess(callerUgi, JobACL.VIEW_JOB);
String remoteUser = request().getRemoteUser();
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
if (callerUGI != null && !job.checkAccess(callerUGI, JobACL.VIEW_JOB)) {
return false;
}
return true;
}
/**

View File

@ -133,8 +133,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
}
@Override
@SuppressWarnings("unchecked")
public void run() {
try {
doAppLogAggregation();
} finally {
this.appAggregationFinished.set(true);
}
}
@SuppressWarnings("unchecked")
private void doAppLogAggregation() {
ContainerId containerId;
while (!this.appFinishing.get()) {
@ -189,8 +197,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
this.appAggregationFinished.set(true);
}
private Path getRemoteNodeTmpLogFileForApp() {

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
@ -69,6 +70,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
@ -536,4 +538,31 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
appAcls.put(ApplicationAccessType.VIEW_APP, "*");
return appAcls;
}
@Test(timeout=20000)
@SuppressWarnings("unchecked")
public void testStopAfterError() throws Exception {
DeletionService delSrvc = mock(DeletionService.class);
// get the AppLogAggregationImpl thread to crash
LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
when(mockedDirSvc.getLogDirs()).thenThrow(new RuntimeException());
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, this.context, delSrvc,
mockedDirSvc);
logAggregationService.init(this.conf);
logAggregationService.start();
ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
logAggregationService.handle(new LogHandlerAppStartedEvent(
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
logAggregationService.stop();
}
}