HDFS-13399. [SBN read] Make Client field AlignmentContext non-static. Contributed by Plamen Jeliazkov.

This commit is contained in:
Plamen Jeliazkov 2018-06-04 14:58:47 -07:00 committed by Konstantin V Shvachko
parent a109f2b32f
commit e880660a20
13 changed files with 619 additions and 252 deletions

View File

@ -103,12 +103,6 @@ protected Boolean initialValue() {
return false; return false;
} }
}; };
private static AlignmentContext alignmentContext;
/**
 * Install the alignment context shared by every {@code Client}.
 * <p>
 * The reference is kept in a static field, so the most recent caller wins.
 *
 * @param ac alignment context used to fetch state alignment info from RPC
 */
public static void setAlignmentContext(AlignmentContext ac) {
  alignmentContext = ac;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Unstable @Unstable
@ -345,6 +339,7 @@ static class Call {
final RPC.RpcKind rpcKind; // Rpc EngineKind final RPC.RpcKind rpcKind; // Rpc EngineKind
boolean done; // true when call is done boolean done; // true when call is done
private final Object externalHandler; private final Object externalHandler;
private AlignmentContext alignmentContext;
private Call(RPC.RpcKind rpcKind, Writable param) { private Call(RPC.RpcKind rpcKind, Writable param) {
this.rpcKind = rpcKind; this.rpcKind = rpcKind;
@ -386,6 +381,15 @@ protected synchronized void callComplete() {
} }
} }
/**
 * Attach the {@link AlignmentContext} that belongs to this call.
 * <p>
 * The stored reference is the one handed to the RPC request header builder
 * when the call is sent, and the one notified with the response header once
 * the call's response has been received.
 *
 * @param ac alignment context to update; may be {@code null}, in which case
 *           the response-side update is skipped
 */
public synchronized void setAlignmentContext(AlignmentContext ac) {
  alignmentContext = ac;
}
/** Set the exception when there is an error. /** Set the exception when there is an error.
* Notify the caller the call is done. * Notify the caller the call is done.
* *
@ -1114,7 +1118,7 @@ public void sendRpcRequest(final Call call)
// Items '1' and '2' are prepared here. // Items '1' and '2' are prepared here.
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry, call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
clientId, alignmentContext); clientId, call.alignmentContext);
final ResponseBuffer buf = new ResponseBuffer(); final ResponseBuffer buf = new ResponseBuffer();
header.writeDelimitedTo(buf); header.writeDelimitedTo(buf);
@ -1191,9 +1195,9 @@ private void receiveRpcResponse() {
Writable value = packet.newInstance(valueClass, conf); Writable value = packet.newInstance(valueClass, conf);
final Call call = calls.remove(callId); final Call call = calls.remove(callId);
call.setRpcResponse(value); call.setRpcResponse(value);
} if (call.alignmentContext != null) {
if (alignmentContext != null) { call.alignmentContext.receiveResponseState(header);
alignmentContext.receiveResponseState(header); }
} }
// verify that packet length was correct // verify that packet length was correct
if (packet.remaining() > 0) { if (packet.remaining() > 0) {
@ -1374,7 +1378,15 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth) ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth)
throws IOException { throws IOException {
return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT, return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,
fallbackToSimpleAuth); fallbackToSimpleAuth, null);
}
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext)
throws IOException {
return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,
fallbackToSimpleAuth, alignmentContext);
} }
private void checkAsyncCall() throws IOException { private void checkAsyncCall() throws IOException {
@ -1391,6 +1403,14 @@ private void checkAsyncCall() throws IOException {
} }
} }
/**
 * Make a call that carries no state alignment context.
 * <p>
 * Convenience overload: forwards every argument to the variant that also
 * accepts an {@link AlignmentContext}, supplying {@code null} for it.
 *
 * @param rpcKind - rpc kind
 * @param rpcRequest - contains serialized method and method parameters
 * @param remoteId - the target rpc server
 * @param serviceClass - service class for RPC
 * @param fallbackToSimpleAuth - set to true or false during this method to
 *   indicate if a secure client falls back to simple auth
 * @return the rpc response
 * @throws IOException propagated unchanged from the delegated call
 */
Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
    ConnectionId remoteId, int serviceClass,
    AtomicBoolean fallbackToSimpleAuth)
    throws IOException {
  final AlignmentContext noAlignmentContext = null;
  return call(rpcKind, rpcRequest, remoteId, serviceClass,
      fallbackToSimpleAuth, noAlignmentContext);
}
/** /**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc response. * <code>remoteId</code>, returning the rpc response.
@ -1401,14 +1421,17 @@ private void checkAsyncCall() throws IOException {
* @param serviceClass - service class for RPC * @param serviceClass - service class for RPC
* @param fallbackToSimpleAuth - set to true or false during this method to * @param fallbackToSimpleAuth - set to true or false during this method to
* indicate if a secure client falls back to simple auth * indicate if a secure client falls back to simple auth
* @param alignmentContext - state alignment context
* @return the rpc response * @return the rpc response
* Throws exceptions if there are network problems or if the remote code * Throws exceptions if there are network problems or if the remote code
* threw an exception. * threw an exception.
*/ */
Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass, ConnectionId remoteId, int serviceClass,
AtomicBoolean fallbackToSimpleAuth) throws IOException { AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
throws IOException {
final Call call = createCall(rpcKind, rpcRequest); final Call call = createCall(rpcKind, rpcRequest);
call.setAlignmentContext(alignmentContext);
final Connection connection = getConnection(remoteId, call, serviceClass, final Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth); fallbackToSimpleAuth);

View File

@ -86,7 +86,7 @@ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
) throws IOException { ) throws IOException {
return getProxy(protocol, clientVersion, addr, ticket, conf, factory, return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, null); rpcTimeout, connectionRetryPolicy, null, null);
} }
@Override @Override
@ -94,10 +94,12 @@ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth) throws IOException { AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
throws IOException {
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth); rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth,
alignmentContext);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance( return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker), false); protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
} }
@ -122,15 +124,18 @@ private static class Invoker implements RpcInvocationHandler {
private final long clientProtocolVersion; private final long clientProtocolVersion;
private final String protocolName; private final String protocolName;
private AtomicBoolean fallbackToSimpleAuth; private AtomicBoolean fallbackToSimpleAuth;
private AlignmentContext alignmentContext;
private Invoker(Class<?> protocol, InetSocketAddress addr, private Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory, UserGroupInformation ticket, Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy, int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth) throws IOException { AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
throws IOException {
this(protocol, Client.ConnectionId.getConnectionId( this(protocol, Client.ConnectionId.getConnectionId(
addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf), addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
conf, factory); conf, factory);
this.fallbackToSimpleAuth = fallbackToSimpleAuth; this.fallbackToSimpleAuth = fallbackToSimpleAuth;
this.alignmentContext = alignmentContext;
} }
/** /**
@ -227,7 +232,7 @@ public Message invoke(Object proxy, final Method method, Object[] args)
try { try {
val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER, val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcProtobufRequest(rpcRequestHeader, theRequest), remoteId, new RpcProtobufRequest(rpcRequestHeader, theRequest), remoteId,
fallbackToSimpleAuth); fallbackToSimpleAuth, alignmentContext);
} catch (Throwable e) { } catch (Throwable e) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {

View File

@ -586,7 +586,44 @@ public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
} }
return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion, return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
fallbackToSimpleAuth); fallbackToSimpleAuth, null);
}
/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server
*
* @param protocol protocol
* @param clientVersion client's version
* @param addr server address
* @param ticket security ticket
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout max time for each rpc; 0 means no timeout
* @param connectionRetryPolicy retry policy
* @param fallbackToSimpleAuth set to true or false during calls to indicate
* if a secure client falls back to simple auth
* @param alignmentContext state alignment context
* @return the proxy
* @throws IOException if any error occurs
*/
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory,
int rpcTimeout,
RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext)
throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
fallbackToSimpleAuth, alignmentContext);
} }
/** /**

View File

@ -50,7 +50,8 @@ <T> ProtocolProxy<T> getProxy(Class<T> protocol,
UserGroupInformation ticket, Configuration conf, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout, SocketFactory factory, int rpcTimeout,
RetryPolicy connectionRetryPolicy, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth) throws IOException; AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext) throws IOException;
/** /**
* Construct a server for a protocol implementation instance. * Construct a server for a protocol implementation instance.

View File

@ -856,10 +856,15 @@ private class RpcCall extends Call {
final Writable rpcRequest; // Serialized Rpc request from client final Writable rpcRequest; // Serialized Rpc request from client
ByteBuffer rpcResponse; // the response for this call ByteBuffer rpcResponse; // the response for this call
private RpcResponseHeaderProto bufferedHeader; // the response header
private Writable bufferedRv; // the byte response
RpcCall(RpcCall call) { RpcCall(RpcCall call) {
super(call); super(call);
this.connection = call.connection; this.connection = call.connection;
this.rpcRequest = call.rpcRequest; this.rpcRequest = call.rpcRequest;
this.bufferedRv = call.bufferedRv;
this.bufferedHeader = call.bufferedHeader;
} }
RpcCall(Connection connection, int id) { RpcCall(Connection connection, int id) {
@ -880,6 +885,14 @@ private class RpcCall extends Call {
this.rpcRequest = param; this.rpcRequest = param;
} }
/**
 * Remember the response header built for this call.
 * <p>
 * The buffered header is later converted back to a builder so the response
 * can be rebuilt with state alignment information added.
 *
 * @param header response header to keep for this call
 */
public void setBufferedHeader(RpcResponseHeaderProto header) {
  bufferedHeader = header;
}
/**
 * Remember the return value of this call.
 * <p>
 * The buffered value is reused when the response is rebuilt together with
 * the buffered header.
 *
 * @param rv response payload to keep for this call
 */
public void setBufferedRv(Writable rv) {
  bufferedRv = rv;
}
@Override @Override
public String getProtocol() { public String getProtocol() {
return "rpc"; return "rpc";
@ -968,6 +981,13 @@ void doResponse(Throwable t) throws IOException {
setupResponse(call, setupResponse(call,
RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER, RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
null, t.getClass().getName(), StringUtils.stringifyException(t)); null, t.getClass().getName(), StringUtils.stringifyException(t));
} else if (alignmentContext != null) {
// rebuild response with state context in header
RpcResponseHeaderProto.Builder responseHeader =
call.bufferedHeader.toBuilder();
alignmentContext.updateResponseState(responseHeader);
RpcResponseHeaderProto builtHeader = responseHeader.build();
setupResponse(call, builtHeader, call.bufferedRv);
} }
connection.sendResponse(call); connection.sendResponse(call);
} }
@ -2992,9 +3012,6 @@ private void setupResponse(
headerBuilder.setRetryCount(call.retryCount); headerBuilder.setRetryCount(call.retryCount);
headerBuilder.setStatus(status); headerBuilder.setStatus(status);
headerBuilder.setServerIpcVersionNum(CURRENT_VERSION); headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
if(alignmentContext != null) {
alignmentContext.updateResponseState(headerBuilder);
}
if (status == RpcStatusProto.SUCCESS) { if (status == RpcStatusProto.SUCCESS) {
RpcResponseHeaderProto header = headerBuilder.build(); RpcResponseHeaderProto header = headerBuilder.build();
@ -3021,6 +3038,12 @@ private void setupResponse(
private void setupResponse(RpcCall call, private void setupResponse(RpcCall call,
RpcResponseHeaderProto header, Writable rv) throws IOException { RpcResponseHeaderProto header, Writable rv) throws IOException {
if (alignmentContext != null && call.bufferedHeader == null
&& call.bufferedRv == null) {
call.setBufferedHeader(header);
call.setBufferedRv(rv);
}
final byte[] response; final byte[] response;
if (rv == null || (rv instanceof RpcWritable.ProtobufWrapper)) { if (rv == null || (rv instanceof RpcWritable.ProtobufWrapper)) {
response = setupResponseForProtobuf(header, rv); response = setupResponseForProtobuf(header, rv);

View File

@ -214,16 +214,19 @@ private static class Invoker implements RpcInvocationHandler {
private Client client; private Client client;
private boolean isClosed = false; private boolean isClosed = false;
private final AtomicBoolean fallbackToSimpleAuth; private final AtomicBoolean fallbackToSimpleAuth;
private final AlignmentContext alignmentContext;
public Invoker(Class<?> protocol, public Invoker(Class<?> protocol,
InetSocketAddress address, UserGroupInformation ticket, InetSocketAddress address, UserGroupInformation ticket,
Configuration conf, SocketFactory factory, Configuration conf, SocketFactory factory,
int rpcTimeout, AtomicBoolean fallbackToSimpleAuth) int rpcTimeout, AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext)
throws IOException { throws IOException {
this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
ticket, rpcTimeout, null, conf); ticket, rpcTimeout, null, conf);
this.client = CLIENTS.getClient(conf, factory); this.client = CLIENTS.getClient(conf, factory);
this.fallbackToSimpleAuth = fallbackToSimpleAuth; this.fallbackToSimpleAuth = fallbackToSimpleAuth;
this.alignmentContext = alignmentContext;
} }
@Override @Override
@ -246,7 +249,7 @@ public Object invoke(Object proxy, Method method, Object[] args)
try { try {
value = (ObjectWritable) value = (ObjectWritable)
client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
remoteId, fallbackToSimpleAuth); remoteId, fallbackToSimpleAuth, alignmentContext);
} finally { } finally {
if (traceScope != null) traceScope.close(); if (traceScope != null) traceScope.close();
} }
@ -289,7 +292,7 @@ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
int rpcTimeout, RetryPolicy connectionRetryPolicy) int rpcTimeout, RetryPolicy connectionRetryPolicy)
throws IOException { throws IOException {
return getProxy(protocol, clientVersion, addr, ticket, conf, factory, return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, null); rpcTimeout, connectionRetryPolicy, null, null);
} }
/** Construct a client-side proxy object that implements the named protocol, /** Construct a client-side proxy object that implements the named protocol,
@ -301,7 +304,8 @@ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory, Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy, int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth) AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext)
throws IOException { throws IOException {
if (connectionRetryPolicy != null) { if (connectionRetryPolicy != null) {
@ -311,7 +315,7 @@ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
factory, rpcTimeout, fallbackToSimpleAuth)); factory, rpcTimeout, fallbackToSimpleAuth, alignmentContext));
return new ProtocolProxy<T>(protocol, proxy, true); return new ProtocolProxy<T>(protocol, proxy, true);
} }

View File

@ -278,7 +278,7 @@ public <T> ProtocolProxy<T> getProxy(
SocketFactory factory, int rpcTimeout, SocketFactory factory, int rpcTimeout,
RetryPolicy connectionRetryPolicy) throws IOException { RetryPolicy connectionRetryPolicy) throws IOException {
return getProxy(protocol, clientVersion, addr, ticket, conf, factory, return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, null); rpcTimeout, connectionRetryPolicy, null, null);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -287,7 +287,8 @@ public <T> ProtocolProxy<T> getProxy(
Class<T> protocol, long clientVersion, InetSocketAddress addr, Class<T> protocol, long clientVersion, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory, UserGroupInformation ticket, Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy, int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth) throws IOException { AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
throws IOException {
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new StoppedInvocationHandler()); new Class[] { protocol }, new StoppedInvocationHandler());
return new ProtocolProxy<T>(protocol, proxy, false); return new ProtocolProxy<T>(protocol, proxy, false);

View File

@ -166,7 +166,6 @@
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.RetriableException;
@ -242,7 +241,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL; private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
private final int smallBufferSize; private final int smallBufferSize;
private final long serverDefaultsValidityPeriod; private final long serverDefaultsValidityPeriod;
private final ClientGCIContext alignmentContext;
public DfsClientConf getConf() { public DfsClientConf getConf() {
return dfsClientConf; return dfsClientConf;
@ -398,8 +396,6 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
this.saslClient = new SaslDataTransferClient( this.saslClient = new SaslDataTransferClient(
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth); TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
this.alignmentContext = new ClientGCIContext();
Client.setAlignmentContext(alignmentContext);
} }
/** /**
@ -548,11 +544,6 @@ public boolean isClientRunning() {
return clientRunning; return clientRunning;
} }
@VisibleForTesting
ClientGCIContext getAlignmentContext() {
return alignmentContext;
}
long getLastLeaseRenewal() { long getLastLeaseRenewal() {
return lastLeaseRenewal; return lastLeaseRenewal;
} }

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory; import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory; import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
import org.apache.hadoop.ipc.AlignmentContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -337,6 +338,15 @@ public static ClientProtocol createNonHAProxyWithClientProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi, InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries, AtomicBoolean fallbackToSimpleAuth) boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
throws IOException { throws IOException {
return createProxyWithAlignmentContext(address, conf, ugi, withRetries,
fallbackToSimpleAuth, null);
}
public static ClientProtocol createProxyWithAlignmentContext(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext)
throws IOException {
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine.class); ProtobufRpcEngine.class);
@ -354,7 +364,7 @@ public static ClientProtocol createNonHAProxyWithClientProtocol(
ClientNamenodeProtocolPB.class, version, address, ugi, conf, ClientNamenodeProtocolPB.class, version, address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf), NetUtils.getDefaultSocketFactory(conf),
org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy, org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
fallbackToSimpleAuth).getProxy(); fallbackToSimpleAuth, alignmentContext).getProxy();
if (withRetries) { // create the proxy with retries if (withRetries) { // create the proxy with retries
Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<>(); Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<>();

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.hdfs.HAUtilClient; import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.io.retry.FailoverProxyProvider; import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -106,7 +107,11 @@ public synchronized AtomicBoolean getFallbackToSimpleAuth() {
return fallbackToSimpleAuth; return fallbackToSimpleAuth;
} }
/** public synchronized AlignmentContext getAlignmentContext() {
return null; // by default the context is null
}
/**
* ProxyInfo to a NameNode. Includes its address. * ProxyInfo to a NameNode. Includes its address.
*/ */
public static class NNProxyInfo<T> extends ProxyInfo<T> { public static class NNProxyInfo<T> extends ProxyInfo<T> {

View File

@ -19,6 +19,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient; import org.apache.hadoop.hdfs.NameNodeProxiesClient;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException; import java.io.IOException;
@ -26,11 +27,22 @@
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
public class ClientHAProxyFactory<T> implements HAProxyFactory<T> { public class ClientHAProxyFactory<T> implements HAProxyFactory<T> {
/**
 * Alignment context handed to the proxies this factory creates.
 * While it is {@code null}, createProxy builds a plain proxy without one.
 */
private AlignmentContext alignmentContext;
/**
 * Set the alignment context to pass along when creating proxies.
 *
 * @param alignmentContext context to use for subsequently created proxies;
 *                         {@code null} selects the plain, context-free proxy
 */
public void setAlignmentContext(AlignmentContext alignmentContext) {
this.alignmentContext = alignmentContext;
}
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public T createProxy(Configuration conf, InetSocketAddress nnAddr, public T createProxy(Configuration conf, InetSocketAddress nnAddr,
Class<T> xface, UserGroupInformation ugi, boolean withRetries, Class<T> xface, UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException { AtomicBoolean fallbackToSimpleAuth) throws IOException {
if (alignmentContext != null) {
return (T) NameNodeProxiesClient.createProxyWithAlignmentContext(
nnAddr, conf, ugi, false, fallbackToSimpleAuth, alignmentContext);
}
return (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol( return (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol(
nnAddr, conf, ugi, false, fallbackToSimpleAuth); nnAddr, conf, ugi, false, fallbackToSimpleAuth);
} }

View File

@ -1,212 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Tests that the server sends state alignment information to clients via
 * RPC, and that clients receive it and update their last known state
 * alignment info.
 * These tests check that after a single RPC call a client has caught up
 * to the most recent alignment state of the server.
*/
public class TestStateAlignmentContext {
static final long BLOCK_SIZE = 64 * 1024;
private static final int NUMDATANODES = 3;
private static final Configuration CONF = new HdfsConfiguration();
private static MiniDFSCluster cluster;
private static DistributedFileSystem dfs;
/**
 * Configure the shared {@code CONF} and bring up the mini cluster used by
 * every test in this class, waiting until it reports active.
 *
 * @throws IOException if the cluster cannot be built or does not become active
 */
@BeforeClass
public static void startUpCluster() throws IOException {
  // A scan period of -1 disables the block scanner.
  CONF.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
  // Short retry timeouts keep the test run fast.
  CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
  // NOTE(review): presumably disables FileSystem caching so each lookup
  // yields a fresh client -- confirm against FileSystem.get().
  CONF.setBoolean("fs.hdfs.impl.disable.cache", true);

  final MiniDFSCluster.Builder clusterBuilder =
      new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES);
  cluster = clusterBuilder.build();
  cluster.waitActive();
}
/**
 * Obtain the file system from the running cluster before each test and
 * publish it through the shared {@code dfs} field.
 *
 * @throws IOException if the cluster cannot provide a file system
 */
@Before
public void before() throws IOException {
  final DistributedFileSystem clusterFs = cluster.getFileSystem();
  dfs = clusterFs;
}
/**
 * Release the shared file system and stop the mini cluster once all tests
 * in this class have run.
 * <p>
 * The cluster is shut down in a {@code finally} block so that a failure
 * while closing the file system cannot leave the cluster running.
 *
 * @throws IOException if closing the file system fails
 */
@AfterClass
public static void shutDownCluster() throws IOException {
  try {
    if (dfs != null) {
      dfs.close();
    }
  } finally {
    dfs = null;
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
  }
}
/**
 * Close the file system used by the test that just finished.
 * <p>
 * JUnit runs {@code @After} methods even when a {@code @Before} method
 * threw, so {@code dfs} may never have been assigned; the guard keeps a
 * NullPointerException here from masking the original setup failure.
 *
 * @throws IOException if closing the file system fails
 */
@After
public void after() throws IOException {
  if (dfs != null) {
    dfs.close();
  }
}
/**
 * Checks that after a client write the client's last seen state id has been
 * updated from the RPC response.
 */
@Test
public void testStateTransferOnWrite() throws Exception {
  final long stateBeforeWrite =
      cluster.getNamesystem().getLastWrittenTransactionId();
  final Path target = new Path("/testFile1");
  DFSTestUtil.writeFile(dfs, target, "abc");
  final long seenByClient =
      dfs.dfs.getAlignmentContext().getLastSeenStateId();
  final long stateAfterWrite =
      cluster.getNamesystem().getLastWrittenTransactionId();

  // The write(s) must have moved the state forward.
  final boolean advanced = seenByClient > stateBeforeWrite;
  assertThat(advanced, is(true));
  // The client must now agree with the server.
  assertThat(seenByClient, is(stateAfterWrite));
}
/**
 * Checks that after a client read the client's last seen state id has been
 * updated from the RPC response.
 */
@Test
public void testStateTransferOnRead() throws Exception {
  final Path target = new Path("/testFile2");
  DFSTestUtil.writeFile(dfs, target, "123");
  final long serverLastWritten =
      cluster.getNamesystem().getLastWrittenTransactionId();
  DFSTestUtil.readFile(dfs, target);

  // The read should have caught the client up to the last written state.
  final long seenByClient =
      dfs.dfs.getAlignmentContext().getLastSeenStateId();
  assertThat(seenByClient, is(serverLastWritten));
}
/**
 * Checks that a fresh client starts with no state
 * ({@code Long.MIN_VALUE}) and picks up the server's state from its first
 * RPC call.
 */
@Test
public void testStateTransferOnFreshClient() throws Exception {
  final Path target = new Path("/testFile3");
  DFSTestUtil.writeFile(dfs, target, "ezpz");
  final long serverLastWritten =
      cluster.getNamesystem().getLastWrittenTransactionId();

  try (DistributedFileSystem freshDfs =
           (DistributedFileSystem) FileSystem.get(CONF)) {
    final ClientGCIContext freshContext = freshDfs.dfs.getAlignmentContext();
    // Nothing has been seen yet on a brand-new client.
    assertThat(freshContext.getLastSeenStateId(), is(Long.MIN_VALUE));
    DFSTestUtil.readFile(freshDfs, target);
    // One read is enough to catch up with the server.
    assertThat(freshContext.getLastSeenStateId(), is(serverLastWritten));
  }
}
/**
 * This test spies on the client's AlignmentContext and ensures that
 * DFSClient writes its lastSeenStateId into RPC requests.
 * <p>
 * Every request header builder passed to {@code updateRequestState} is
 * collected; after a write the state ids they carry must differ between
 * the first and last request and must never decrease.
 */
@Test
public void testClientSendsState() throws Exception {
  AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
  AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
  Client.setAlignmentContext(spiedAlignContext);

  // Collect RpcRequestHeaders for verification later.
  final List<RpcHeaderProtos.RpcRequestHeaderProto.Builder> collectedHeaders =
      new ArrayList<>();
  Mockito.doAnswer(a -> {
    Object[] arguments = a.getArguments();
    RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
        (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
    collectedHeaders.add(header);
    return a.callRealMethod();
  }).when(spiedAlignContext).updateRequestState(Mockito.any());

  DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");

  // Ensure first header and last header have different state.
  // Compare state id with state id: comparing the id against the builder
  // object itself can never be equal and would make this check vacuous.
  assertThat(collectedHeaders.size() > 1, is(true));
  final long firstStateId = collectedHeaders.get(0).getStateId();
  final long lastStateId =
      collectedHeaders.get(collectedHeaders.size() - 1).getStateId();
  assertThat(firstStateId, is(not(lastStateId)));

  // Ensure collected RpcRequestHeaders are in increasing order.
  long lastHeader = firstStateId;
  for (RpcHeaderProtos.RpcRequestHeaderProto.Builder header :
      collectedHeaders.subList(1, collectedHeaders.size())) {
    long currentHeader = header.getStateId();
    assertThat(currentHeader >= lastHeader, is(true));
    lastHeader = currentHeader;
  }
}
/**
 * Spies on the client's AlignmentContext so that every RPC request header
 * carries Long.MAX_VALUE as its state id, then checks that the text
 * "A client sent stateId: " shows up in the output captured from
 * FSNamesystem.LOG.
 */
@Test
public void testClientSendsGreaterState() throws Exception {
  AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
  AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
  // Installed through the static Client setter.
  Client.setAlignmentContext(spiedAlignContext);

  // Make every client call have a stateId > server's stateId.
  Mockito.doAnswer(a -> {
    Object[] arguments = a.getArguments();
    RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
        (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
    try {
      return a.callRealMethod();
    } finally {
      // Overwrite whatever the real method put into the header.
      header.setStateId(Long.MAX_VALUE);
    }
  }).when(spiedAlignContext).updateRequestState(Mockito.any());

  GenericTestUtils.LogCapturer logCapturer =
      GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG);
  try {
    DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");
  } finally {
    // Always stop capturing, even when the write fails, so the capture is
    // not left attached to FSNamesystem.LOG.
    logCapturer.stopCapturing();
  }

  String output = logCapturer.getOutput();
  assertThat(output, containsString("A client sent stateId: "));
}
}

View File

@ -0,0 +1,467 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* Class is used to test server sending state alignment information to clients
* via RPC and likewise clients receiving and updating their last known
* state alignment info.
* These tests check that after a single RPC call a client will have caught up
* to the most recent alignment state of the server.
*/
public class TestStateAlignmentContextWithHA {
// Number of DataNodes started in the mini cluster.
private static final int NUMDATANODES = 1;
// Number of Worker clients created by the multi-client failover test.
private static final int NUMCLIENTS = 10;
// Number of files each Worker writes.
private static final int NUMFILES = 300;
// Configuration shared by the mini cluster and every client file system.
private static final Configuration CONF = new HdfsConfiguration();
// Nameservice id used for the added nameservice and the proxy provider key.
private static final String NAMESERVICE = "nameservice";
// Alignment contexts recorded by the test proxy providers below, in the
// order the providers were constructed. Cleared after every test.
private static final List<ClientGCIContext> AC_LIST = new ArrayList<>();
// Mini cluster shared by all tests; started once and shut down at the end.
private static MiniDFSCluster cluster;
// Workers of the multi-client test; killed at the start of the next setup.
private static List<Worker> clients;
// When non-null, the test proxy providers use this context instead of
// creating a new ClientGCIContext. Reset to null after every test.
private static ClientGCIContext spy;
// File system client created in before() and closed in after().
private DistributedFileSystem dfs;
// NameNode indices the test currently treats as active and standby;
// swapped by failOver().
private int active = 0;
private int standby = 1;
/**
 * Failover proxy provider used by the tests: it hands a ClientGCIContext to
 * the HA proxy factory and records that context in AC_LIST so tests can
 * inspect it later.
 */
static class AlignmentContextProxyProvider<T>
    extends ConfiguredFailoverProxyProvider<T> {

  private ClientGCIContext alignmentContext;

  public AlignmentContextProxyProvider(
      Configuration conf, URI uri, Class<T> xface,
      HAProxyFactory<T> factory) throws IOException {
    super(conf, uri, xface, factory);

    // Use the spy installed by the running test if there is one,
    // otherwise start from a brand new context.
    ClientGCIContext context = spy;
    if (context == null) {
      context = new ClientGCIContext();
    }
    this.alignmentContext = context;

    // Set the context in the HAProxyFactory so that the proxies it
    // creates have the AlignmentContext assigned.
    ((ClientHAProxyFactory) factory).setAlignmentContext(context);
    AC_LIST.add(context);
  }

  @Override // AbstractNNFailoverProxyProvider
  public synchronized ClientGCIContext getAlignmentContext() {
    return alignmentContext;
  }
}
/**
 * Failover proxy provider used by the tests: it records a ClientGCIContext
 * in AC_LIST but never passes it to the HA proxy factory.
 */
static class SpyConfiguredContextProxyProvider<T>
    extends ConfiguredFailoverProxyProvider<T> {

  private ClientGCIContext alignmentContext;

  public SpyConfiguredContextProxyProvider(
      Configuration conf, URI uri, Class<T> xface,
      HAProxyFactory<T> factory) throws IOException {
    super(conf, uri, xface, factory);

    // Create but DON'T set in HAProxyFactory.
    ClientGCIContext context = spy;
    if (context == null) {
      context = new ClientGCIContext();
    }
    this.alignmentContext = context;
    AC_LIST.add(context);
  }
}
/**
 * Configures CONF, builds the mini cluster with the simple HA topology plus
 * an extra two-NameNode nameservice, waits for it to become active and
 * transitions NameNode 0 to active.
 */
@BeforeClass
public static void startUpCluster() throws IOException {
  // disable block scanner
  CONF.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
  // Set short retry timeouts so this test runs faster
  CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
  CONF.setBoolean("fs.hdfs.impl.disable.cache", true);

  MiniDFSNNTopology.NSConf haNameservice =
      new MiniDFSNNTopology.NSConf(NAMESERVICE);
  haNameservice.addNN(new MiniDFSNNTopology.NNConf("nn1"));
  haNameservice.addNN(new MiniDFSNNTopology.NNConf("nn2"));

  cluster = new MiniDFSCluster.Builder(CONF)
      .numDataNodes(NUMDATANODES)
      .nnTopology(
          MiniDFSNNTopology.simpleHATopology().addNameservice(haNameservice))
      .build();
  cluster.waitActive();
  cluster.transitionToActive(0);
}
/**
 * Per-test setup: kills any Workers left from a previous test, applies the
 * failover configuration, selects AlignmentContextProxyProvider as the
 * proxy provider for NAMESERVICE and creates the client stored in dfs.
 */
@Before
public void before() throws IOException, URISyntaxException {
  killWorkers();
  HATestUtil.setFailoverConfigurations(cluster, CONF, NAMESERVICE, 0);

  final String providerKey =
      HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
          + "." + NAMESERVICE;
  CONF.set(providerKey, AlignmentContextProxyProvider.class.getName());

  dfs = (DistributedFileSystem) FileSystem.get(CONF);
}
/**
 * Shuts the mini cluster down, if one was started, and drops the reference.
 */
@AfterClass
public static void shutDownCluster() throws IOException {
  if (cluster == null) {
    return;
  }
  cluster.shutdown();
  cluster = null;
}
/**
 * Per-test teardown: puts NameNode 1 into standby and NameNode 0 into
 * active, resets the active/standby indices, closes the client and clears
 * the recorded alignment contexts and the spy.
 *
 * The cleanup of dfs, AC_LIST and spy runs even when a transition or the
 * close fails. Otherwise a failure here would leave stale contexts in the
 * static AC_LIST and shift the indices later tests read via getContext.
 *
 * @throws IOException if a transition or closing the client fails
 */
@After
public void after() throws IOException {
  try {
    cluster.transitionToStandby(1);
    cluster.transitionToActive(0);
    active = 0;
    standby = 1;
  } finally {
    try {
      if (dfs != null) {
        dfs.close();
        dfs = null;
      }
    } finally {
      // Static state shared between tests: always reset it.
      AC_LIST.clear();
      spy = null;
    }
  }
}
/**
 * Verifies that a client configured with SpyConfiguredContextProxyProvider,
 * which never hands its context to the proxy factory, receives no state:
 * the context recorded for it stays at Long.MIN_VALUE across a write.
 */
@Test
public void testNoStateOnConfiguredProxyProvider() throws Exception {
  final String providerKey =
      HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
          + "." + NAMESERVICE;
  Configuration noContextConf = new Configuration(CONF);
  noContextConf.set(providerKey,
      SpyConfiguredContextProxyProvider.class.getName());

  try (DistributedFileSystem noContextDfs =
           (DistributedFileSystem) FileSystem.get(noContextConf)) {
    // Second context recorded in this test.
    final ClientGCIContext recorded = getContext(1);
    assertThat(recorded.getLastSeenStateId(), is(Long.MIN_VALUE));

    DFSTestUtil.writeFile(
        noContextDfs, new Path("/testFileNoState"), "no_state");

    assertThat(recorded.getLastSeenStateId(), is(Long.MIN_VALUE));
  }
}
/**
 * Verifies that a write moves the client's last seen state id past the
 * transaction id the active NameNode had before the write, and that the
 * client ends up exactly at that NameNode's last written transaction id.
 */
@Test
public void testStateTransferOnWrite() throws Exception {
  final long txIdBeforeWrite =
      cluster.getNamesystem(active).getLastWrittenTransactionId();

  DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc");

  final long seenByClient = getContext(0).getLastSeenStateId();
  final long txIdAfterWrite =
      cluster.getNamesystem(active).getLastWrittenTransactionId();

  // The write must have advanced the client beyond the pre-write state.
  assertThat(seenByClient > txIdBeforeWrite, is(true));
  // The client must have caught up with the server exactly.
  assertThat(seenByClient, is(txIdAfterWrite));
}
/**
 * Verifies that a read brings the client's last seen state id up to the
 * active NameNode's last written transaction id.
 */
@Test
public void testStateTransferOnRead() throws Exception {
  DFSTestUtil.writeFile(dfs, new Path("/testFile2"), "123");
  final long expectedStateId =
      cluster.getNamesystem(active).getLastWrittenTransactionId();

  DFSTestUtil.readFile(dfs, new Path("/testFile2"));

  // Read should catch client up to last written state.
  final long seenByClient = getContext(0).getLastSeenStateId();
  assertThat(seenByClient, is(expectedStateId));
}
/**
 * Verifies that a newly created client reports Long.MIN_VALUE as its last
 * seen state id, and that one read moves it to the active NameNode's last
 * written transaction id.
 */
@Test
public void testStateTransferOnFreshClient() throws Exception {
  DFSTestUtil.writeFile(dfs, new Path("/testFile3"), "ezpz");
  final long expectedStateId =
      cluster.getNamesystem(active).getLastWrittenTransactionId();

  try (DistributedFileSystem freshDfs =
           (DistributedFileSystem) FileSystem.get(CONF)) {
    // Second context recorded in this test.
    final ClientGCIContext freshContext = getContext(1);
    assertThat(freshContext.getLastSeenStateId(), is(Long.MIN_VALUE));

    DFSTestUtil.readFile(freshDfs, new Path("/testFile3"));

    assertThat(freshContext.getLastSeenStateId(), is(expectedStateId));
  }
}
/**
 * Installs a spied ClientGCIContext for a new client and checks the state
 * ids it writes into RPC request headers while a file is written: more than
 * one header must be produced, the first and last state ids must differ,
 * and the state ids must never decrease from one header to the next.
 */
@Test
public void testClientSendsState() throws Exception {
  ClientGCIContext alignmentContext = new ClientGCIContext();
  ClientGCIContext spiedAlignContext = Mockito.spy(alignmentContext);
  // Picked up by the proxy provider constructed for the client below.
  spy = spiedAlignContext;

  try (DistributedFileSystem clearDfs =
           (DistributedFileSystem) FileSystem.get(CONF)) {

    // Collect RpcRequestHeaders for verification later.
    final List<RpcHeaderProtos.RpcRequestHeaderProto.Builder> headers =
        new ArrayList<>();
    Mockito.doAnswer(a -> {
      Object[] arguments = a.getArguments();
      RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
          (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
      headers.add(header);
      return a.callRealMethod();
    }).when(spiedAlignContext).updateRequestState(Mockito.any());

    DFSTestUtil.writeFile(clearDfs, new Path("/testFile4"), "shv");

    // Ensure first header and last header have different state.
    assertThat(headers.size() > 1, is(true));
    long firstStateId = headers.get(0).getStateId();
    long finalStateId = headers.get(headers.size() - 1).getStateId();
    // Compare state id with state id. Comparing against the Builder object
    // itself can never be equal and would make this assertion always pass.
    assertThat(firstStateId, is(not(finalStateId)));

    // Ensure collected RpcRequestHeaders are in increasing order.
    long lastHeader = firstStateId;
    for (RpcHeaderProtos.RpcRequestHeaderProto.Builder header :
        headers.subList(1, headers.size())) {
      long currentHeader = header.getStateId();
      assertThat(currentHeader >= lastHeader, is(true));
      lastHeader = currentHeader;
    }
  }
}
/**
 * Installs a spied ClientGCIContext for a new client so that every RPC
 * request header carries Long.MAX_VALUE as its state id, then checks that
 * the text "A client sent stateId: " shows up in the output captured from
 * FSNamesystem.LOG.
 */
@Test
public void testClientSendsGreaterState() throws Exception {
  ClientGCIContext alignmentContext = new ClientGCIContext();
  ClientGCIContext spiedAlignContext = Mockito.spy(alignmentContext);
  // Picked up by the proxy provider constructed for the client below.
  spy = spiedAlignContext;

  try (DistributedFileSystem clearDfs =
           (DistributedFileSystem) FileSystem.get(CONF)) {

    // Make every client call have a stateId > server's stateId.
    Mockito.doAnswer(a -> {
      Object[] arguments = a.getArguments();
      RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
          (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
      try {
        return a.callRealMethod();
      } finally {
        // Overwrite whatever the real method put into the header.
        header.setStateId(Long.MAX_VALUE);
      }
    }).when(spiedAlignContext).updateRequestState(Mockito.any());

    GenericTestUtils.LogCapturer logCapturer =
        GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG);
    try {
      DFSTestUtil.writeFile(clearDfs, new Path("/testFile4"), "shv");
    } finally {
      // Always stop capturing, even when the write fails, so the capture
      // is not left attached to FSNamesystem.LOG.
      logCapturer.stopCapturing();
    }

    String output = logCapturer.getOutput();
    assertThat(output, containsString("A client sent stateId: "));
  }
}
/**
 * Verifies that the client's last seen state id follows the active
 * NameNode's last written transaction id both before and after a failover:
 * each write must advance the client's state id and leave it equal to the
 * transaction id of whichever NameNode is active at that point.
 */
@Test
public void testStateTransferOnWriteWithFailover() throws Exception {
  final long txIdBeforeWrite =
      cluster.getNamesystem(active).getLastWrittenTransactionId();

  // Write using HA client.
  DFSTestUtil.writeFile(dfs, new Path("/testFile1FO"), "123");
  final long seenBeforeFailover = getContext(0).getLastSeenStateId();
  final long txIdAfterWrite =
      cluster.getNamesystem(active).getLastWrittenTransactionId();

  // Write(s) should have increased state. Check for greater than.
  assertThat(seenBeforeFailover > txIdBeforeWrite, is(true));
  // Client and server state should be equal.
  assertThat(seenBeforeFailover, is(txIdAfterWrite));

  // Failover NameNode.
  failOver();

  // Write using HA client.
  DFSTestUtil.writeFile(dfs, new Path("/testFile2FO"), "456");
  final long seenAfterFailover = getContext(0).getLastSeenStateId();
  final long txIdAfterFailoverWrite =
      cluster.getNamesystem(active).getLastWrittenTransactionId();

  // Write(s) should have increased state. Check for greater than.
  assertThat(seenAfterFailover > txIdAfterWrite, is(true));
  // Client and server state should be equal.
  assertThat(seenAfterFailover, is(txIdAfterFailoverWrite));
}
/**
 * Runs NUMCLIENTS Workers on a two-thread pool while the NameNodes are
 * failed over repeatedly, and requires every Worker to finish with
 * STATE.SUCCESS.
 *
 * The pool is always stopped on exit. If a failover throws, the test is
 * interrupted by its timeout, or client creation fails, the non-daemon pool
 * threads would otherwise be left alive.
 */
@Test(timeout=300000)
public void testMultiClientStatesWithRandomFailovers() throws Exception {
  // We want threads to run during failovers; assuming at minimum 4 cores,
  // would like to see 2 clients competing against 2 NameNodes.
  ExecutorService execService = Executors.newFixedThreadPool(2);
  try {
    clients = new ArrayList<>(NUMCLIENTS);
    // The index doubles as the Worker's nonce, which the Worker passes to
    // getContext(...) to find its own alignment context.
    for (int i = 1; i <= NUMCLIENTS; i++) {
      DistributedFileSystem haClient =
          (DistributedFileSystem) FileSystem.get(CONF);
      clients.add(new Worker(haClient, NUMFILES, "/testFile3FO_", i));
    }

    // Execute workers in threadpool with random failovers.
    List<Future<STATE>> futures = submitAll(execService, clients);
    execService.shutdown();

    boolean finished = false;
    while (!finished) {
      failOver();
      finished = execService.awaitTermination(1L, TimeUnit.SECONDS);
    }

    // Validation.
    for (Future<STATE> future : futures) {
      assertThat(future.get(), is(STATE.SUCCESS));
    }
  } finally {
    // No effect after a normal termination; otherwise interrupts the
    // Workers that are still running.
    execService.shutdownNow();
  }
}
/**
 * Returns the alignment context recorded at the given position of AC_LIST.
 *
 * @param clientCreationIndex index into AC_LIST
 * @return the context stored at that index
 */
private ClientGCIContext getContext(int clientCreationIndex) {
  final ClientGCIContext recorded = AC_LIST.get(clientCreationIndex);
  return recorded;
}
/**
 * Transitions the current active NameNode to standby and the current
 * standby to active, then swaps the active and standby indices.
 *
 * @throws IOException if either transition fails; the indices are then
 *     left unchanged
 */
private void failOver() throws IOException {
  final int previousActive = active;
  final int nextActive = standby;

  cluster.transitionToStandby(previousActive);
  cluster.transitionToActive(nextActive);

  // Only record the new roles once both transitions have gone through.
  active = nextActive;
  standby = previousActive;
}
/**
 * Submits every Worker to the executor and returns their futures in
 * iteration order. Executor.invokeAll() is blocking, so submit is used
 * instead.
 */
private static List<Future<STATE>> submitAll(ExecutorService executor,
    Collection<Worker> calls) {
  final List<Future<STATE>> pending = new ArrayList<>(calls.size());
  for (Worker worker : calls) {
    pending.add(executor.submit(worker));
  }
  return pending;
}
/**
 * Kills every Worker in clients, if any, and then drops the list.
 *
 * @throws IOException if killing a Worker fails; the list is then kept
 */
private void killWorkers() throws IOException {
  if (clients == null) {
    return;
  }
  for (Worker leftover : clients) {
    leftover.kill();
  }
  clients = null;
}
// Outcome of a Worker run: SUCCESS when every write advanced the client's
// last seen state id, FAIL when a write did not advance it, ERROR when an
// IOException was caught.
private enum STATE { SUCCESS, FAIL, ERROR }
/**
 * Task that writes a series of files through one client and checks after
 * every write that the client's last seen state id has grown. The nonce is
 * both part of the file names and the index used to look up the client's
 * alignment context via getContext.
 */
private class Worker implements Callable<STATE> {
  private final DistributedFileSystem client;
  private final int filesToMake;
  private String filePath;
  private final int nonce;

  Worker(DistributedFileSystem client,
         int filesToMake,
         String filePath,
         int nonce) {
    this.client = client;
    this.filesToMake = filesToMake;
    this.filePath = filePath;
    this.nonce = nonce;
  }

  /**
   * Writes filesToMake files and closes the client.
   *
   * @return SUCCESS if every write advanced the state id, FAIL at the first
   *     write that did not, ERROR if an IOException was caught
   */
  @Override
  public STATE call() {
    try {
      int written = 0;
      while (written < filesToMake) {
        final long stateBefore = getContext(nonce).getLastSeenStateId();

        // Write using HA client.
        final Path target = new Path(filePath + nonce + written);
        DFSTestUtil.writeFile(client, target, "erk");

        final long stateAfter = getContext(nonce).getLastSeenStateId();

        // Write(s) should have increased state. Check for greater than.
        if (stateAfter <= stateBefore) {
          System.out.println("FAIL: Worker started with: " +
              stateBefore + ", but finished with: " + stateAfter);
          return STATE.FAIL;
        }
        written++;
      }
      client.close();
      return STATE.SUCCESS;
    } catch (IOException e) {
      System.out.println("ERROR: Worker failed with: " + e);
      return STATE.ERROR;
    }
  }

  /**
   * Tears the client down: closes the files being written and the output
   * streams, closes the connection to the NameNode, then closes the inner
   * client and the file system itself.
   */
  public void kill() throws IOException {
    client.dfs.closeAllFilesBeingWritten(true);
    client.dfs.closeOutputStreams(true);
    client.dfs.closeConnectionToNamenode();
    client.dfs.close();
    client.close();
  }
}
}