YARN-708. Moved RecordFactory classes to hadoop-yarn-api, and put some miscellaneous fixes to the interfaces. Contributed by Siddharth Seth.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1485478 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8c62c46046
commit
1fac1ac98c
|
@ -260,6 +260,9 @@ Release 2.0.5-beta - UNRELEASED
|
|||
also in unsecure mode to prevent AMs from faking resource requirements in
|
||||
unsecure mode. (Omkar Vinit Joshi via vinodkv)
|
||||
|
||||
YARN-708. Moved RecordFactory classes to hadoop-yarn-api, and put some
|
||||
miscellaneous fixes to the interfaces. (Siddharth Seth via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -56,26 +56,23 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String IPC_PREFIX = YARN_PREFIX + "ipc.";
|
||||
|
||||
/** Factory to create client IPC classes.*/
|
||||
public static final String IPC_CLIENT_FACTORY =
|
||||
public static final String IPC_CLIENT_FACTORY_CLASS =
|
||||
IPC_PREFIX + "client.factory.class";
|
||||
|
||||
/** Type of serialization to use.*/
|
||||
public static final String IPC_SERIALIZER_TYPE =
|
||||
IPC_PREFIX + "serializer.type";
|
||||
public static final String DEFAULT_IPC_SERIALIZER_TYPE = "protocolbuffers";
|
||||
|
||||
public static final String DEFAULT_IPC_CLIENT_FACTORY_CLASS =
|
||||
"org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl";
|
||||
|
||||
/** Factory to create server IPC classes.*/
|
||||
public static final String IPC_SERVER_FACTORY =
|
||||
public static final String IPC_SERVER_FACTORY_CLASS =
|
||||
IPC_PREFIX + "server.factory.class";
|
||||
|
||||
/** Factory to create IPC exceptions.*/
|
||||
public static final String IPC_EXCEPTION_FACTORY =
|
||||
IPC_PREFIX + "exception.factory.class";
|
||||
|
||||
public static final String DEFAULT_IPC_SERVER_FACTORY_CLASS =
|
||||
"org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl";
|
||||
|
||||
/** Factory to create serializeable records.*/
|
||||
public static final String IPC_RECORD_FACTORY =
|
||||
public static final String IPC_RECORD_FACTORY_CLASS =
|
||||
IPC_PREFIX + "record.factory.class";
|
||||
|
||||
public static final String DEFAULT_IPC_RECORD_FACTORY_CLASS =
|
||||
"org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl";
|
||||
|
||||
/** RPC class implementation*/
|
||||
public static final String IPC_RPC_IMPL =
|
||||
IPC_PREFIX + "rpc.class";
|
|
@ -18,9 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.yarn.factories;
|
||||
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
|
||||
|
||||
public interface RecordFactory {
|
||||
public <T> T newRecordInstance(Class<T> clazz) throws YarnException;
|
||||
public <T> T newRecordInstance(Class<T> clazz);
|
||||
}
|
|
@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
|
||||
|
||||
public class RecordFactoryProvider {
|
||||
private static Configuration defaultConf;
|
||||
|
@ -43,17 +42,10 @@ public class RecordFactoryProvider {
|
|||
//Users can specify a particular factory by providing a configuration.
|
||||
conf = defaultConf;
|
||||
}
|
||||
String recordFactoryClassName = conf.get(YarnConfiguration.IPC_RECORD_FACTORY);
|
||||
if (recordFactoryClassName == null) {
|
||||
String serializer = conf.get(YarnConfiguration.IPC_SERIALIZER_TYPE, YarnConfiguration.DEFAULT_IPC_SERIALIZER_TYPE);
|
||||
if (serializer.equals(YarnConfiguration.DEFAULT_IPC_SERIALIZER_TYPE)) {
|
||||
return RecordFactoryPBImpl.get();
|
||||
} else {
|
||||
throw new YarnException("Unknown serializer: [" + conf.get(YarnConfiguration.IPC_SERIALIZER_TYPE) + "]. Use keys: [" + YarnConfiguration.IPC_RECORD_FACTORY + "] to specify Record factory");
|
||||
}
|
||||
} else {
|
||||
return (RecordFactory) getFactoryClassInstance(recordFactoryClassName);
|
||||
}
|
||||
String recordFactoryClassName = conf.get(
|
||||
YarnConfiguration.IPC_RECORD_FACTORY_CLASS,
|
||||
YarnConfiguration.DEFAULT_IPC_RECORD_FACTORY_CLASS);
|
||||
return (RecordFactory) getFactoryClassInstance(recordFactoryClassName);
|
||||
}
|
||||
|
||||
private static Object getFactoryClassInstance(String factoryClassName) {
|
|
@ -21,12 +21,11 @@ package org.apache.hadoop.yarn.factories;
|
|||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
|
||||
public interface RpcClientFactory {
|
||||
|
||||
public Object getClient(Class<?> protocol, long clientVersion,
|
||||
InetSocketAddress addr, Configuration conf) throws YarnException;
|
||||
InetSocketAddress addr, Configuration conf);
|
||||
|
||||
public void stopClient(Object proxy);
|
||||
|
||||
|
|
|
@ -24,13 +24,11 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
|
||||
public interface RpcServerFactory {
|
||||
|
||||
public Server getServer(Class<?> protocol, Object instance,
|
||||
InetSocketAddress addr, Configuration conf,
|
||||
SecretManager<? extends TokenIdentifier> secretManager,
|
||||
int numHandlers, String portRangeConfig)
|
||||
throws YarnException;
|
||||
int numHandlers, String portRangeConfig);
|
||||
}
|
|
@ -45,7 +45,7 @@ public class RecordFactoryPBImpl implements RecordFactory {
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <T> T newRecordInstance(Class<T> clazz) throws YarnException {
|
||||
public <T> T newRecordInstance(Class<T> clazz) {
|
||||
|
||||
Constructor<?> constructor = cache.get(clazz);
|
||||
if (constructor == null) {
|
||||
|
|
|
@ -50,7 +50,8 @@ public class RpcClientFactoryPBImpl implements RpcClientFactory {
|
|||
private RpcClientFactoryPBImpl() {
|
||||
}
|
||||
|
||||
public Object getClient(Class<?> protocol, long clientVersion, InetSocketAddress addr, Configuration conf) throws YarnException {
|
||||
public Object getClient(Class<?> protocol, long clientVersion,
|
||||
InetSocketAddress addr, Configuration conf) {
|
||||
|
||||
Constructor<?> constructor = cache.get(protocol);
|
||||
if (constructor == null) {
|
||||
|
|
|
@ -63,8 +63,7 @@ public class RpcServerFactoryPBImpl implements RpcServerFactory {
|
|||
|
||||
public Server getServer(Class<?> protocol, Object instance,
|
||||
InetSocketAddress addr, Configuration conf,
|
||||
SecretManager<? extends TokenIdentifier> secretManager, int numHandlers)
|
||||
throws YarnException {
|
||||
SecretManager<? extends TokenIdentifier> secretManager, int numHandlers) {
|
||||
return getServer(protocol, instance, addr, conf, secretManager, numHandlers,
|
||||
null);
|
||||
}
|
||||
|
@ -73,8 +72,7 @@ public class RpcServerFactoryPBImpl implements RpcServerFactory {
|
|||
public Server getServer(Class<?> protocol, Object instance,
|
||||
InetSocketAddress addr, Configuration conf,
|
||||
SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
|
||||
String portRangeConfig)
|
||||
throws YarnException {
|
||||
String portRangeConfig) {
|
||||
|
||||
Constructor<?> constructor = serviceCache.get(protocol);
|
||||
if (constructor == null) {
|
||||
|
|
|
@ -21,21 +21,16 @@ package org.apache.hadoop.yarn.factory.providers;
|
|||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.factories.RpcClientFactory;
|
||||
import org.apache.hadoop.yarn.factories.RpcServerFactory;
|
||||
import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
|
||||
import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
|
||||
|
||||
/**
|
||||
* A public static get() method must be present in the Client/Server Factory implementation.
|
||||
*/
|
||||
public class RpcFactoryProvider {
|
||||
private static final Log LOG = LogFactory.getLog(RpcFactoryProvider.class);
|
||||
|
||||
private RpcFactoryProvider() {
|
||||
|
||||
|
@ -46,29 +41,17 @@ public class RpcFactoryProvider {
|
|||
if (conf == null) {
|
||||
conf = new Configuration();
|
||||
}
|
||||
String serverFactoryClassName = conf.get(YarnConfiguration.IPC_SERVER_FACTORY);
|
||||
if (serverFactoryClassName == null) {
|
||||
if (conf.get(YarnConfiguration.IPC_SERIALIZER_TYPE, YarnConfiguration.DEFAULT_IPC_SERIALIZER_TYPE).equals(YarnConfiguration.DEFAULT_IPC_SERIALIZER_TYPE)) {
|
||||
return RpcServerFactoryPBImpl.get();
|
||||
} else {
|
||||
throw new YarnException("Unknown serializer: [" + conf.get(YarnConfiguration.IPC_SERIALIZER_TYPE) + "]. Use keys: [" + YarnConfiguration.IPC_CLIENT_FACTORY + "][" + YarnConfiguration.IPC_SERVER_FACTORY + "] to specify factories");
|
||||
}
|
||||
} else {
|
||||
return (RpcServerFactory) getFactoryClassInstance(serverFactoryClassName);
|
||||
}
|
||||
String serverFactoryClassName = conf.get(
|
||||
YarnConfiguration.IPC_SERVER_FACTORY_CLASS,
|
||||
YarnConfiguration.DEFAULT_IPC_SERVER_FACTORY_CLASS);
|
||||
return (RpcServerFactory) getFactoryClassInstance(serverFactoryClassName);
|
||||
}
|
||||
|
||||
|
||||
public static RpcClientFactory getClientFactory(Configuration conf) {
|
||||
String clientFactoryClassName = conf.get(YarnConfiguration.IPC_CLIENT_FACTORY);
|
||||
if (clientFactoryClassName == null) {
|
||||
if (conf.get(YarnConfiguration.IPC_SERIALIZER_TYPE, YarnConfiguration.DEFAULT_IPC_SERIALIZER_TYPE).equals(YarnConfiguration.DEFAULT_IPC_SERIALIZER_TYPE)) {
|
||||
return RpcClientFactoryPBImpl.get();
|
||||
} else {
|
||||
throw new YarnException("Unknown serializer: [" + conf.get(YarnConfiguration.IPC_SERIALIZER_TYPE) + "]. Use keys: [" + YarnConfiguration.IPC_CLIENT_FACTORY + "][" + YarnConfiguration.IPC_SERVER_FACTORY + "] to specify factories");
|
||||
}
|
||||
} else {
|
||||
return(RpcClientFactory) getFactoryClassInstance(clientFactoryClassName);
|
||||
}
|
||||
String clientFactoryClassName = conf.get(
|
||||
YarnConfiguration.IPC_CLIENT_FACTORY_CLASS,
|
||||
YarnConfiguration.DEFAULT_IPC_CLIENT_FACTORY_CLASS);
|
||||
return (RpcClientFactory) getFactoryClassInstance(clientFactoryClassName);
|
||||
}
|
||||
|
||||
private static Object getFactoryClassInstance(String factoryClassName) {
|
||||
|
|
|
@ -42,8 +42,11 @@ public class TestRpcFactoryProvider {
|
|||
serverFactory = RpcFactoryProvider.getServerFactory(conf);
|
||||
Assert.assertEquals(RpcClientFactoryPBImpl.class, clientFactory.getClass());
|
||||
Assert.assertEquals(RpcServerFactoryPBImpl.class, serverFactory.getClass());
|
||||
|
||||
conf.set(YarnConfiguration.IPC_SERIALIZER_TYPE, "writable");
|
||||
|
||||
conf.set(YarnConfiguration.IPC_CLIENT_FACTORY_CLASS, "unknown");
|
||||
conf.set(YarnConfiguration.IPC_SERVER_FACTORY_CLASS, "unknown");
|
||||
conf.set(YarnConfiguration.IPC_RECORD_FACTORY_CLASS, "unknown");
|
||||
|
||||
try {
|
||||
clientFactory = RpcFactoryProvider.getClientFactory(conf);
|
||||
Assert.fail("Expected an exception - unknown serializer");
|
||||
|
@ -56,8 +59,8 @@ public class TestRpcFactoryProvider {
|
|||
}
|
||||
|
||||
conf = new Configuration();
|
||||
conf.set(YarnConfiguration.IPC_CLIENT_FACTORY, "NonExistantClass");
|
||||
conf.set(YarnConfiguration.IPC_SERVER_FACTORY, RpcServerFactoryPBImpl.class.getName());
|
||||
conf.set(YarnConfiguration.IPC_CLIENT_FACTORY_CLASS, "NonExistantClass");
|
||||
conf.set(YarnConfiguration.IPC_SERVER_FACTORY_CLASS, RpcServerFactoryPBImpl.class.getName());
|
||||
|
||||
try {
|
||||
clientFactory = RpcFactoryProvider.getClientFactory(conf);
|
||||
|
|
|
@ -18,11 +18,13 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
|
@ -30,9 +32,7 @@ import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
|
||||
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestPBLocalizerRPC {
|
||||
|
||||
|
@ -40,7 +40,6 @@ public class TestPBLocalizerRPC {
|
|||
|
||||
static RecordFactory createPBRecordFactory() {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.IPC_SERIALIZER_TYPE, "protocolbuffers");
|
||||
return RecordFactoryProvider.getRecordFactory(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.hadoop.io.DataOutputBuffer;
|
|||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto;
|
||||
|
@ -53,7 +52,6 @@ public class TestPBRecordImpl {
|
|||
|
||||
static RecordFactory createPBRecordFactory() {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.IPC_SERIALIZER_TYPE, "protocolbuffers");
|
||||
return RecordFactoryProvider.getRecordFactory(conf);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue