YARN-708. Moved RecordFactory classes to hadoop-yarn-api, and put some miscellaneous fixes to the interfaces. Contributed by Siddharth Seth.

svn merge --ignore-ancestry -c 1485478 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1485479 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-22 22:51:51 +00:00
parent e30e5becc2
commit 356be82fc5
15 changed files with 47 additions and 77 deletions

View File

@ -182,6 +182,9 @@ Release 2.0.5-beta - UNRELEASED
also in unsecure mode to prevent AMs from faking resource requirements in also in unsecure mode to prevent AMs from faking resource requirements in
unsecure mode. (Omkar Vinit Joshi via vinodkv) unsecure mode. (Omkar Vinit Joshi via vinodkv)
YARN-708. Moved RecordFactory classes to hadoop-yarn-api, and put some
miscellaneous fixes to the interfaces. (Siddharth Seth via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -56,25 +56,22 @@ public class YarnConfiguration extends Configuration {
public static final String IPC_PREFIX = YARN_PREFIX + "ipc."; public static final String IPC_PREFIX = YARN_PREFIX + "ipc.";
/** Factory to create client IPC classes.*/ /** Factory to create client IPC classes.*/
public static final String IPC_CLIENT_FACTORY = public static final String IPC_CLIENT_FACTORY_CLASS =
IPC_PREFIX + "client.factory.class"; IPC_PREFIX + "client.factory.class";
public static final String DEFAULT_IPC_CLIENT_FACTORY_CLASS =
/** Type of serialization to use.*/ "org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl";
public static final String IPC_SERIALIZER_TYPE =
IPC_PREFIX + "serializer.type";
public static final String DEFAULT_IPC_SERIALIZER_TYPE = "protocolbuffers";
/** Factory to create server IPC classes.*/ /** Factory to create server IPC classes.*/
public static final String IPC_SERVER_FACTORY = public static final String IPC_SERVER_FACTORY_CLASS =
IPC_PREFIX + "server.factory.class"; IPC_PREFIX + "server.factory.class";
public static final String DEFAULT_IPC_SERVER_FACTORY_CLASS =
/** Factory to create IPC exceptions.*/ "org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl";
public static final String IPC_EXCEPTION_FACTORY =
IPC_PREFIX + "exception.factory.class";
/** Factory to create serializeable records.*/ /** Factory to create serializeable records.*/
public static final String IPC_RECORD_FACTORY = public static final String IPC_RECORD_FACTORY_CLASS =
IPC_PREFIX + "record.factory.class"; IPC_PREFIX + "record.factory.class";
public static final String DEFAULT_IPC_RECORD_FACTORY_CLASS =
"org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl";
/** RPC class implementation*/ /** RPC class implementation*/
public static final String IPC_RPC_IMPL = public static final String IPC_RPC_IMPL =

View File

@ -18,9 +18,8 @@
package org.apache.hadoop.yarn.factories; package org.apache.hadoop.yarn.factories;
import org.apache.hadoop.yarn.YarnException;
public interface RecordFactory { public interface RecordFactory {
public <T> T newRecordInstance(Class<T> clazz) throws YarnException; public <T> T newRecordInstance(Class<T> clazz);
} }

View File

@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
public class RecordFactoryProvider { public class RecordFactoryProvider {
private static Configuration defaultConf; private static Configuration defaultConf;
@ -43,18 +42,11 @@ public class RecordFactoryProvider {
//Users can specify a particular factory by providing a configuration. //Users can specify a particular factory by providing a configuration.
conf = defaultConf; conf = defaultConf;
} }
String recordFactoryClassName = conf.get(YarnConfiguration.IPC_RECORD_FACTORY); String recordFactoryClassName = conf.get(
if (recordFactoryClassName == null) { YarnConfiguration.IPC_RECORD_FACTORY_CLASS,
String serializer = conf.get(YarnConfiguration.IPC_SERIALIZER_TYPE, YarnConfiguration.DEFAULT_IPC_SERIALIZER_TYPE); YarnConfiguration.DEFAULT_IPC_RECORD_FACTORY_CLASS);
if (serializer.equals(YarnConfiguration.DEFAULT_IPC_SERIALIZER_TYPE)) {
return RecordFactoryPBImpl.get();
} else {
throw new YarnException("Unknown serializer: [" + conf.get(YarnConfiguration.IPC_SERIALIZER_TYPE) + "]. Use keys: [" + YarnConfiguration.IPC_RECORD_FACTORY + "] to specify Record factory");
}
} else {
return (RecordFactory) getFactoryClassInstance(recordFactoryClassName); return (RecordFactory) getFactoryClassInstance(recordFactoryClassName);
} }
}
private static Object getFactoryClassInstance(String factoryClassName) { private static Object getFactoryClassInstance(String factoryClassName) {
try { try {

View File

@ -21,12 +21,11 @@ package org.apache.hadoop.yarn.factories;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.YarnException;
public interface RpcClientFactory { public interface RpcClientFactory {
public Object getClient(Class<?> protocol, long clientVersion, public Object getClient(Class<?> protocol, long clientVersion,
InetSocketAddress addr, Configuration conf) throws YarnException; InetSocketAddress addr, Configuration conf);
public void stopClient(Object proxy); public void stopClient(Object proxy);

View File

@ -24,13 +24,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException;
public interface RpcServerFactory { public interface RpcServerFactory {
public Server getServer(Class<?> protocol, Object instance, public Server getServer(Class<?> protocol, Object instance,
InetSocketAddress addr, Configuration conf, InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, SecretManager<? extends TokenIdentifier> secretManager,
int numHandlers, String portRangeConfig) int numHandlers, String portRangeConfig);
throws YarnException;
} }

View File

@ -45,7 +45,7 @@ public class RecordFactoryPBImpl implements RecordFactory {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public <T> T newRecordInstance(Class<T> clazz) throws YarnException { public <T> T newRecordInstance(Class<T> clazz) {
Constructor<?> constructor = cache.get(clazz); Constructor<?> constructor = cache.get(clazz);
if (constructor == null) { if (constructor == null) {

View File

@ -50,7 +50,8 @@ public class RpcClientFactoryPBImpl implements RpcClientFactory {
private RpcClientFactoryPBImpl() { private RpcClientFactoryPBImpl() {
} }
public Object getClient(Class<?> protocol, long clientVersion, InetSocketAddress addr, Configuration conf) throws YarnException { public Object getClient(Class<?> protocol, long clientVersion,
InetSocketAddress addr, Configuration conf) {
Constructor<?> constructor = cache.get(protocol); Constructor<?> constructor = cache.get(protocol);
if (constructor == null) { if (constructor == null) {

View File

@ -63,8 +63,7 @@ public class RpcServerFactoryPBImpl implements RpcServerFactory {
public Server getServer(Class<?> protocol, Object instance, public Server getServer(Class<?> protocol, Object instance,
InetSocketAddress addr, Configuration conf, InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, int numHandlers) SecretManager<? extends TokenIdentifier> secretManager, int numHandlers) {
throws YarnException {
return getServer(protocol, instance, addr, conf, secretManager, numHandlers, return getServer(protocol, instance, addr, conf, secretManager, numHandlers,
null); null);
} }
@ -73,8 +72,7 @@ public class RpcServerFactoryPBImpl implements RpcServerFactory {
public Server getServer(Class<?> protocol, Object instance, public Server getServer(Class<?> protocol, Object instance,
InetSocketAddress addr, Configuration conf, InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, int numHandlers, SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
String portRangeConfig) String portRangeConfig) {
throws YarnException {
Constructor<?> constructor = serviceCache.get(protocol); Constructor<?> constructor = serviceCache.get(protocol);
if (constructor == null) { if (constructor == null) {

View File

@ -21,21 +21,16 @@ package org.apache.hadoop.yarn.factory.providers;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RpcClientFactory; import org.apache.hadoop.yarn.factories.RpcClientFactory;
import org.apache.hadoop.yarn.factories.RpcServerFactory; import org.apache.hadoop.yarn.factories.RpcServerFactory;
import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
/** /**
* A public static get() method must be present in the Client/Server Factory implementation. * A public static get() method must be present in the Client/Server Factory implementation.
*/ */
public class RpcFactoryProvider { public class RpcFactoryProvider {
private static final Log LOG = LogFactory.getLog(RpcFactoryProvider.class);
private RpcFactoryProvider() { private RpcFactoryProvider() {
@ -46,29 +41,17 @@ public class RpcFactoryProvider {
if (conf == null) { if (conf == null) {
conf = new Configuration(); conf = new Configuration();
} }
String serverFactoryClassName = conf.get(YarnConfiguration.IPC_SERVER_FACTORY); String serverFactoryClassName = conf.get(
if (serverFactoryClassName == null) { YarnConfiguration.IPC_SERVER_FACTORY_CLASS,
if (conf.get(YarnConfiguration.IPC_SERIALIZER_TYPE, YarnConfiguration.DEFAULT_IPC_SERIALIZER_TYPE).equals(YarnConfiguration.DEFAULT_IPC_SERIALIZER_TYPE)) { YarnConfiguration.DEFAULT_IPC_SERVER_FACTORY_CLASS);
return RpcServerFactoryPBImpl.get();
} else {
throw new YarnException("Unknown serializer: [" + conf.get(YarnConfiguration.IPC_SERIALIZER_TYPE) + "]. Use keys: [" + YarnConfiguration.IPC_CLIENT_FACTORY + "][" + YarnConfiguration.IPC_SERVER_FACTORY + "] to specify factories");
}
} else {
return (RpcServerFactory) getFactoryClassInstance(serverFactoryClassName); return (RpcServerFactory) getFactoryClassInstance(serverFactoryClassName);
} }
}
public static RpcClientFactory getClientFactory(Configuration conf) { public static RpcClientFactory getClientFactory(Configuration conf) {
String clientFactoryClassName = conf.get(YarnConfiguration.IPC_CLIENT_FACTORY); String clientFactoryClassName = conf.get(
if (clientFactoryClassName == null) { YarnConfiguration.IPC_CLIENT_FACTORY_CLASS,
if (conf.get(YarnConfiguration.IPC_SERIALIZER_TYPE, YarnConfiguration.DEFAULT_IPC_SERIALIZER_TYPE).equals(YarnConfiguration.DEFAULT_IPC_SERIALIZER_TYPE)) { YarnConfiguration.DEFAULT_IPC_CLIENT_FACTORY_CLASS);
return RpcClientFactoryPBImpl.get(); return (RpcClientFactory) getFactoryClassInstance(clientFactoryClassName);
} else {
throw new YarnException("Unknown serializer: [" + conf.get(YarnConfiguration.IPC_SERIALIZER_TYPE) + "]. Use keys: [" + YarnConfiguration.IPC_CLIENT_FACTORY + "][" + YarnConfiguration.IPC_SERVER_FACTORY + "] to specify factories");
}
} else {
return(RpcClientFactory) getFactoryClassInstance(clientFactoryClassName);
}
} }
private static Object getFactoryClassInstance(String factoryClassName) { private static Object getFactoryClassInstance(String factoryClassName) {

View File

@ -43,7 +43,10 @@ public class TestRpcFactoryProvider {
Assert.assertEquals(RpcClientFactoryPBImpl.class, clientFactory.getClass()); Assert.assertEquals(RpcClientFactoryPBImpl.class, clientFactory.getClass());
Assert.assertEquals(RpcServerFactoryPBImpl.class, serverFactory.getClass()); Assert.assertEquals(RpcServerFactoryPBImpl.class, serverFactory.getClass());
conf.set(YarnConfiguration.IPC_SERIALIZER_TYPE, "writable"); conf.set(YarnConfiguration.IPC_CLIENT_FACTORY_CLASS, "unknown");
conf.set(YarnConfiguration.IPC_SERVER_FACTORY_CLASS, "unknown");
conf.set(YarnConfiguration.IPC_RECORD_FACTORY_CLASS, "unknown");
try { try {
clientFactory = RpcFactoryProvider.getClientFactory(conf); clientFactory = RpcFactoryProvider.getClientFactory(conf);
Assert.fail("Expected an exception - unknown serializer"); Assert.fail("Expected an exception - unknown serializer");
@ -56,8 +59,8 @@ public class TestRpcFactoryProvider {
} }
conf = new Configuration(); conf = new Configuration();
conf.set(YarnConfiguration.IPC_CLIENT_FACTORY, "NonExistantClass"); conf.set(YarnConfiguration.IPC_CLIENT_FACTORY_CLASS, "NonExistantClass");
conf.set(YarnConfiguration.IPC_SERVER_FACTORY, RpcServerFactoryPBImpl.class.getName()); conf.set(YarnConfiguration.IPC_SERVER_FACTORY_CLASS, RpcServerFactoryPBImpl.class.getName());
try { try {
clientFactory = RpcFactoryProvider.getClientFactory(conf); clientFactory = RpcFactoryProvider.getClientFactory(conf);

View File

@ -18,11 +18,13 @@
package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb; package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
@ -30,9 +32,7 @@ import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*;
public class TestPBLocalizerRPC { public class TestPBLocalizerRPC {
@ -40,7 +40,6 @@ public class TestPBLocalizerRPC {
static RecordFactory createPBRecordFactory() { static RecordFactory createPBRecordFactory() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set(YarnConfiguration.IPC_SERIALIZER_TYPE, "protocolbuffers");
return RecordFactoryProvider.getRecordFactory(conf); return RecordFactoryProvider.getRecordFactory(conf);
} }

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto;
@ -53,7 +52,6 @@ public class TestPBRecordImpl {
static RecordFactory createPBRecordFactory() { static RecordFactory createPBRecordFactory() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set(YarnConfiguration.IPC_SERIALIZER_TYPE, "protocolbuffers");
return RecordFactoryProvider.getRecordFactory(conf); return RecordFactoryProvider.getRecordFactory(conf);
} }