HDFS-13399. [SBN read] Make Client field AlignmentContext non-static. Contributed by Plamen Jeliazkov.

This commit is contained in:
Plamen Jeliazkov 2018-06-04 14:58:47 -07:00 committed by Chen Liang
parent dbf777e4a5
commit a3521c53fe
13 changed files with 620 additions and 253 deletions

View File

@ -103,12 +103,6 @@ public class Client implements AutoCloseable {
return false;
}
};
private static AlignmentContext alignmentContext;
/** Set alignment context to use to fetch state alignment info from RPC. */
public static void setAlignmentContext(AlignmentContext ac) {
alignmentContext = ac;
}
@SuppressWarnings("unchecked")
@Unstable
@ -345,6 +339,7 @@ public class Client implements AutoCloseable {
final RPC.RpcKind rpcKind; // Rpc EngineKind
boolean done; // true when call is done
private final Object externalHandler;
private AlignmentContext alignmentContext;
private Call(RPC.RpcKind rpcKind, Writable param) {
this.rpcKind = rpcKind;
@ -386,6 +381,15 @@ public class Client implements AutoCloseable {
}
}
/**
* Set an AlignmentContext for the call to update when call is done.
*
* @param ac alignment context to update.
*/
public synchronized void setAlignmentContext(AlignmentContext ac) {
this.alignmentContext = ac;
}
/** Set the exception when there is an error.
* Notify the caller the call is done.
*
@ -1114,7 +1118,7 @@ public class Client implements AutoCloseable {
// Items '1' and '2' are prepared here.
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
clientId, alignmentContext);
clientId, call.alignmentContext);
final ResponseBuffer buf = new ResponseBuffer();
header.writeDelimitedTo(buf);
@ -1191,9 +1195,9 @@ public class Client implements AutoCloseable {
Writable value = packet.newInstance(valueClass, conf);
final Call call = calls.remove(callId);
call.setRpcResponse(value);
}
if (alignmentContext != null) {
alignmentContext.receiveResponseState(header);
if (call.alignmentContext != null) {
call.alignmentContext.receiveResponseState(header);
}
}
// verify that packet length was correct
if (packet.remaining() > 0) {
@ -1374,7 +1378,15 @@ public class Client implements AutoCloseable {
ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,
fallbackToSimpleAuth);
fallbackToSimpleAuth, null);
}
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext)
throws IOException {
return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,
fallbackToSimpleAuth, alignmentContext);
}
private void checkAsyncCall() throws IOException {
@ -1391,6 +1403,14 @@ public class Client implements AutoCloseable {
}
}
Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass,
AtomicBoolean fallbackToSimpleAuth)
throws IOException {
return call(rpcKind, rpcRequest, remoteId, serviceClass,
fallbackToSimpleAuth, null);
}
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc response.
@ -1401,14 +1421,17 @@ public class Client implements AutoCloseable {
* @param serviceClass - service class for RPC
* @param fallbackToSimpleAuth - set to true or false during this method to
* indicate if a secure client falls back to simple auth
* @returns the rpc response
* @param alignmentContext - state alignment context
* @return the rpc response
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
*/
Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
throws IOException {
final Call call = createCall(rpcKind, rpcRequest);
call.setAlignmentContext(alignmentContext);
final Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth);

View File

@ -86,7 +86,7 @@ public class ProtobufRpcEngine implements RpcEngine {
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
) throws IOException {
return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, null);
rpcTimeout, connectionRetryPolicy, null, null);
}
@Override
@ -94,10 +94,12 @@ public class ProtobufRpcEngine implements RpcEngine {
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
throws IOException {
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth,
alignmentContext);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
}
@ -122,15 +124,18 @@ public class ProtobufRpcEngine implements RpcEngine {
private final long clientProtocolVersion;
private final String protocolName;
private AtomicBoolean fallbackToSimpleAuth;
private AlignmentContext alignmentContext;
private Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
throws IOException {
this(protocol, Client.ConnectionId.getConnectionId(
addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
conf, factory);
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
this.alignmentContext = alignmentContext;
}
/**
@ -227,7 +232,7 @@ public class ProtobufRpcEngine implements RpcEngine {
try {
val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcProtobufRequest(rpcRequestHeader, theRequest), remoteId,
fallbackToSimpleAuth);
fallbackToSimpleAuth, alignmentContext);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) {

View File

@ -586,7 +586,44 @@ public class RPC {
}
return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
fallbackToSimpleAuth);
fallbackToSimpleAuth, null);
}
/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server
*
* @param protocol protocol
* @param clientVersion client's version
* @param addr server address
* @param ticket security ticket
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout max time for each rpc; 0 means no timeout
* @param connectionRetryPolicy retry policy
* @param fallbackToSimpleAuth set to true or false during calls to indicate
* if a secure client falls back to simple auth
* @param alignmentContext state alignment context
* @return the proxy
* @throws IOException if any error occurs
*/
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory,
int rpcTimeout,
RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext)
throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
fallbackToSimpleAuth, alignmentContext);
}
/**

View File

@ -50,7 +50,8 @@ public interface RpcEngine {
UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout,
RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth) throws IOException;
AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext) throws IOException;
/**
* Construct a server for a protocol implementation instance.

View File

@ -835,10 +835,15 @@ public abstract class Server {
final Writable rpcRequest; // Serialized Rpc request from client
ByteBuffer rpcResponse; // the response for this call
private RpcResponseHeaderProto bufferedHeader; // the response header
private Writable bufferedRv; // the byte response
RpcCall(RpcCall call) {
super(call);
this.connection = call.connection;
this.rpcRequest = call.rpcRequest;
this.bufferedRv = call.bufferedRv;
this.bufferedHeader = call.bufferedHeader;
}
RpcCall(Connection connection, int id) {
@ -859,6 +864,14 @@ public abstract class Server {
this.rpcRequest = param;
}
public void setBufferedHeader(RpcResponseHeaderProto header) {
this.bufferedHeader = header;
}
public void setBufferedRv(Writable rv) {
this.bufferedRv = rv;
}
@Override
public String getProtocol() {
return "rpc";
@ -947,6 +960,13 @@ public abstract class Server {
setupResponse(call,
RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
null, t.getClass().getName(), StringUtils.stringifyException(t));
} else if (alignmentContext != null) {
// rebuild response with state context in header
RpcResponseHeaderProto.Builder responseHeader =
call.bufferedHeader.toBuilder();
alignmentContext.updateResponseState(responseHeader);
RpcResponseHeaderProto builtHeader = responseHeader.build();
setupResponse(call, builtHeader, call.bufferedRv);
}
connection.sendResponse(call);
}
@ -2936,9 +2956,6 @@ public abstract class Server {
headerBuilder.setRetryCount(call.retryCount);
headerBuilder.setStatus(status);
headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
if(alignmentContext != null) {
alignmentContext.updateResponseState(headerBuilder);
}
if (status == RpcStatusProto.SUCCESS) {
RpcResponseHeaderProto header = headerBuilder.build();
@ -2965,6 +2982,12 @@ public abstract class Server {
private void setupResponse(RpcCall call,
RpcResponseHeaderProto header, Writable rv) throws IOException {
if (alignmentContext != null && call.bufferedHeader == null
&& call.bufferedRv == null) {
call.setBufferedHeader(header);
call.setBufferedRv(rv);
}
final byte[] response;
if (rv == null || (rv instanceof RpcWritable.ProtobufWrapper)) {
response = setupResponseForProtobuf(header, rv);

View File

@ -214,16 +214,19 @@ public class WritableRpcEngine implements RpcEngine {
private Client client;
private boolean isClosed = false;
private final AtomicBoolean fallbackToSimpleAuth;
private final AlignmentContext alignmentContext;
public Invoker(Class<?> protocol,
InetSocketAddress address, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout, AtomicBoolean fallbackToSimpleAuth)
int rpcTimeout, AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext)
throws IOException {
this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
ticket, rpcTimeout, null, conf);
this.client = CLIENTS.getClient(conf, factory);
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
this.alignmentContext = alignmentContext;
}
@Override
@ -246,7 +249,7 @@ public class WritableRpcEngine implements RpcEngine {
try {
value = (ObjectWritable)
client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
remoteId, fallbackToSimpleAuth);
remoteId, fallbackToSimpleAuth, alignmentContext);
} finally {
if (traceScope != null) traceScope.close();
}
@ -289,7 +292,7 @@ public class WritableRpcEngine implements RpcEngine {
int rpcTimeout, RetryPolicy connectionRetryPolicy)
throws IOException {
return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, null);
rpcTimeout, connectionRetryPolicy, null, null);
}
/** Construct a client-side proxy object that implements the named protocol,
@ -301,7 +304,8 @@ public class WritableRpcEngine implements RpcEngine {
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth)
AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext)
throws IOException {
if (connectionRetryPolicy != null) {
@ -311,7 +315,7 @@ public class WritableRpcEngine implements RpcEngine {
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
factory, rpcTimeout, fallbackToSimpleAuth));
factory, rpcTimeout, fallbackToSimpleAuth, alignmentContext));
return new ProtocolProxy<T>(protocol, proxy, true);
}

View File

@ -279,7 +279,7 @@ public class TestRPC extends TestRpcBase {
SocketFactory factory, int rpcTimeout,
RetryPolicy connectionRetryPolicy) throws IOException {
return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, null);
rpcTimeout, connectionRetryPolicy, null, null);
}
@SuppressWarnings("unchecked")
@ -288,7 +288,8 @@ public class TestRPC extends TestRpcBase {
Class<T> protocol, long clientVersion, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
throws IOException {
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new StoppedInvocationHandler());
return new ProtocolProxy<T>(protocol, proxy, false);

View File

@ -166,7 +166,6 @@ import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException;
@ -242,7 +241,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
private final int smallBufferSize;
private final long serverDefaultsValidityPeriod;
private final ClientGCIContext alignmentContext;
public DfsClientConf getConf() {
return dfsClientConf;
@ -398,8 +396,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
this.saslClient = new SaslDataTransferClient(
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
this.alignmentContext = new ClientGCIContext();
Client.setAlignmentContext(alignmentContext);
}
/**
@ -548,11 +544,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
return clientRunning;
}
@VisibleForTesting
ClientGCIContext getAlignmentContext() {
return alignmentContext;
}
long getLastLeaseRenewal() {
return lastLeaseRenewal;
}

View File

@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
import org.apache.hadoop.ipc.AlignmentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -337,6 +338,15 @@ public class NameNodeProxiesClient {
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
return createProxyWithAlignmentContext(address, conf, ugi, withRetries,
fallbackToSimpleAuth, null);
}
public static ClientProtocol createProxyWithAlignmentContext(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext)
throws IOException {
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine.class);
@ -354,7 +364,7 @@ public class NameNodeProxiesClient {
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf),
org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
fallbackToSimpleAuth).getProxy();
fallbackToSimpleAuth, alignmentContext).getProxy();
if (withRetries) { // create the proxy with retries
Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<>();

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -106,7 +107,11 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
return fallbackToSimpleAuth;
}
/**
public synchronized AlignmentContext getAlignmentContext() {
return null; // by default the context is null
}
/**
* ProxyInfo to a NameNode. Includes its address.
*/
public static class NNProxyInfo<T> extends ProxyInfo<T> {

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
@ -26,11 +27,22 @@ import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
public class ClientHAProxyFactory<T> implements HAProxyFactory<T> {
private AlignmentContext alignmentContext;
public void setAlignmentContext(AlignmentContext alignmentContext) {
this.alignmentContext = alignmentContext;
}
@Override
@SuppressWarnings("unchecked")
public T createProxy(Configuration conf, InetSocketAddress nnAddr,
Class<T> xface, UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
if (alignmentContext != null) {
return (T) NameNodeProxiesClient.createProxyWithAlignmentContext(
nnAddr, conf, ugi, false, fallbackToSimpleAuth, alignmentContext);
}
return (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol(
nnAddr, conf, ugi, false, fallbackToSimpleAuth);
}

View File

@ -1,212 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Class is used to test server sending state alignment information to clients
* via RPC and likewise clients receiving and updating their last known
* state alignment info.
* These tests check that after a single RPC call a client will have caught up
* to the most recent alignment state of the server.
*/
public class TestStateAlignmentContext {
static final long BLOCK_SIZE = 64 * 1024;
private static final int NUMDATANODES = 3;
private static final Configuration CONF = new HdfsConfiguration();
private static MiniDFSCluster cluster;
private static DistributedFileSystem dfs;
@BeforeClass
public static void startUpCluster() throws IOException {
// disable block scanner
CONF.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
// Set short retry timeouts so this test runs faster
CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
CONF.setBoolean("fs.hdfs.impl.disable.cache", true);
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES)
.build();
cluster.waitActive();
}
@Before
public void before() throws IOException {
dfs = cluster.getFileSystem();
}
@AfterClass
public static void shutDownCluster() throws IOException {
if (dfs != null) {
dfs.close();
dfs = null;
}
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
@After
public void after() throws IOException {
dfs.close();
}
/**
* This test checks if after a client writes we can see the state id in
* updated via the response.
*/
@Test
public void testStateTransferOnWrite() throws Exception {
long preWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc");
long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId();
long postWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
// Write(s) should have increased state. Check for greater than.
assertThat(clientState > preWriteState, is(true));
// Client and server state should be equal.
assertThat(clientState, is(postWriteState));
}
/**
* This test checks if after a client reads we can see the state id in
* updated via the response.
*/
@Test
public void testStateTransferOnRead() throws Exception {
DFSTestUtil.writeFile(dfs, new Path("/testFile2"), "123");
long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
DFSTestUtil.readFile(dfs, new Path("/testFile2"));
// Read should catch client up to last written state.
long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId();
assertThat(clientState, is(lastWrittenId));
}
/**
* This test checks that a fresh client starts with no state and becomes
* updated of state from RPC call.
*/
@Test
public void testStateTransferOnFreshClient() throws Exception {
DFSTestUtil.writeFile(dfs, new Path("/testFile3"), "ezpz");
long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
try (DistributedFileSystem clearDfs =
(DistributedFileSystem) FileSystem.get(CONF)) {
ClientGCIContext clientState = clearDfs.dfs.getAlignmentContext();
assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
DFSTestUtil.readFile(clearDfs, new Path("/testFile3"));
assertThat(clientState.getLastSeenStateId(), is(lastWrittenId));
}
}
/**
* This test mocks an AlignmentContext and ensures that DFSClient
* writes its lastSeenStateId into RPC requests.
*/
@Test
public void testClientSendsState() throws Exception {
AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
Client.setAlignmentContext(spiedAlignContext);
// Collect RpcRequestHeaders for verification later.
final List<RpcHeaderProtos.RpcRequestHeaderProto.Builder> collectedHeaders =
new ArrayList<>();
Mockito.doAnswer(a -> {
Object[] arguments = a.getArguments();
RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
(RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
collectedHeaders.add(header);
return a.callRealMethod();
}).when(spiedAlignContext).updateRequestState(Mockito.any());
DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");
// Ensure first header and last header have different state.
assertThat(collectedHeaders.size() > 1, is(true));
assertThat(collectedHeaders.get(0).getStateId(),
is(not(collectedHeaders.get(collectedHeaders.size() - 1))));
// Ensure collected RpcRequestHeaders are in increasing order.
long lastHeader = collectedHeaders.get(0).getStateId();
for(RpcHeaderProtos.RpcRequestHeaderProto.Builder header :
collectedHeaders.subList(1, collectedHeaders.size())) {
long currentHeader = header.getStateId();
assertThat(currentHeader >= lastHeader, is(true));
lastHeader = header.getStateId();
}
}
/**
* This test mocks an AlignmentContext to send stateIds greater than
* server's stateId in RPC requests.
*/
@Test
public void testClientSendsGreaterState() throws Exception {
AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
Client.setAlignmentContext(spiedAlignContext);
// Make every client call have a stateId > server's stateId.
Mockito.doAnswer(a -> {
Object[] arguments = a.getArguments();
RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
(RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
try {
return a.callRealMethod();
} finally {
header.setStateId(Long.MAX_VALUE);
}
}).when(spiedAlignContext).updateRequestState(Mockito.any());
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG);
DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");
logCapturer.stopCapturing();
String output = logCapturer.getOutput();
assertThat(output, containsString("A client sent stateId: "));
}
}

View File

@ -0,0 +1,467 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* Class is used to test server sending state alignment information to clients
* via RPC and likewise clients receiving and updating their last known
* state alignment info.
* These tests check that after a single RPC call a client will have caught up
* to the most recent alignment state of the server.
*/
public class TestStateAlignmentContextWithHA {
private static final int NUMDATANODES = 1;
private static final int NUMCLIENTS = 10;
private static final int NUMFILES = 300;
private static final Configuration CONF = new HdfsConfiguration();
private static final String NAMESERVICE = "nameservice";
private static final List<ClientGCIContext> AC_LIST = new ArrayList<>();
private static MiniDFSCluster cluster;
private static List<Worker> clients;
private static ClientGCIContext spy;
private DistributedFileSystem dfs;
private int active = 0;
private int standby = 1;
static class AlignmentContextProxyProvider<T>
extends ConfiguredFailoverProxyProvider<T> {
private ClientGCIContext alignmentContext;
public AlignmentContextProxyProvider(
Configuration conf, URI uri, Class<T> xface,
HAProxyFactory<T> factory) throws IOException {
super(conf, uri, xface, factory);
// Create and set AlignmentContext in HAProxyFactory.
// All proxies by factory will now have AlignmentContext assigned.
this.alignmentContext = (spy != null ? spy : new ClientGCIContext());
((ClientHAProxyFactory) factory).setAlignmentContext(alignmentContext);
AC_LIST.add(alignmentContext);
}
@Override // AbstractNNFailoverProxyProvider
public synchronized ClientGCIContext getAlignmentContext() {
return this.alignmentContext;
}
}
static class SpyConfiguredContextProxyProvider<T>
extends ConfiguredFailoverProxyProvider<T> {
private ClientGCIContext alignmentContext;
public SpyConfiguredContextProxyProvider(
Configuration conf, URI uri, Class<T> xface,
HAProxyFactory<T> factory) throws IOException {
super(conf, uri, xface, factory);
// Create but DON'T set in HAProxyFactory.
this.alignmentContext = (spy != null ? spy : new ClientGCIContext());
AC_LIST.add(alignmentContext);
}
}
@BeforeClass
public static void startUpCluster() throws IOException {
// disable block scanner
CONF.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
// Set short retry timeouts so this test runs faster
CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
CONF.setBoolean("fs.hdfs.impl.disable.cache", true);
MiniDFSNNTopology.NSConf nsConf = new MiniDFSNNTopology.NSConf(NAMESERVICE);
nsConf.addNN(new MiniDFSNNTopology.NNConf("nn1"));
nsConf.addNN(new MiniDFSNNTopology.NNConf("nn2"));
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES)
.nnTopology(MiniDFSNNTopology.simpleHATopology().addNameservice(nsConf))
.build();
cluster.waitActive();
cluster.transitionToActive(0);
}
@Before
public void before() throws IOException, URISyntaxException {
killWorkers();
HATestUtil.setFailoverConfigurations(cluster, CONF, NAMESERVICE, 0);
CONF.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
"." + NAMESERVICE, AlignmentContextProxyProvider.class.getName());
dfs = (DistributedFileSystem) FileSystem.get(CONF);
}
@AfterClass
public static void shutDownCluster() throws IOException {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
@After
public void after() throws IOException {
cluster.transitionToStandby(1);
cluster.transitionToActive(0);
active = 0;
standby = 1;
if (dfs != null) {
dfs.close();
dfs = null;
}
AC_LIST.clear();
spy = null;
}
/**
* This test checks if after a client writes we can see the state id in
* updated via the response.
*/
@Test
public void testNoStateOnConfiguredProxyProvider() throws Exception {
Configuration confCopy = new Configuration(CONF);
confCopy.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
"." + NAMESERVICE, SpyConfiguredContextProxyProvider.class.getName());
try (DistributedFileSystem clearDfs =
(DistributedFileSystem) FileSystem.get(confCopy)) {
ClientGCIContext clientState = getContext(1);
assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
DFSTestUtil.writeFile(clearDfs, new Path("/testFileNoState"), "no_state");
assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
}
}
/**
* This test checks if after a client writes we can see the state id in
* updated via the response.
*/
@Test
public void testStateTransferOnWrite() throws Exception {
long preWriteState =
cluster.getNamesystem(active).getLastWrittenTransactionId();
DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc");
long clientState = getContext(0).getLastSeenStateId();
long postWriteState =
cluster.getNamesystem(active).getLastWrittenTransactionId();
// Write(s) should have increased state. Check for greater than.
assertThat(clientState > preWriteState, is(true));
// Client and server state should be equal.
assertThat(clientState, is(postWriteState));
}
/**
* This test checks if after a client reads we can see the state id in
* updated via the response.
*/
@Test
public void testStateTransferOnRead() throws Exception {
DFSTestUtil.writeFile(dfs, new Path("/testFile2"), "123");
long lastWrittenId =
cluster.getNamesystem(active).getLastWrittenTransactionId();
DFSTestUtil.readFile(dfs, new Path("/testFile2"));
// Read should catch client up to last written state.
long clientState = getContext(0).getLastSeenStateId();
assertThat(clientState, is(lastWrittenId));
}
/**
* This test checks that a fresh client starts with no state and becomes
* updated of state from RPC call.
*/
@Test
public void testStateTransferOnFreshClient() throws Exception {
DFSTestUtil.writeFile(dfs, new Path("/testFile3"), "ezpz");
long lastWrittenId =
cluster.getNamesystem(active).getLastWrittenTransactionId();
try (DistributedFileSystem clearDfs =
(DistributedFileSystem) FileSystem.get(CONF)) {
ClientGCIContext clientState = getContext(1);
assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
DFSTestUtil.readFile(clearDfs, new Path("/testFile3"));
assertThat(clientState.getLastSeenStateId(), is(lastWrittenId));
}
}
/**
* This test mocks an AlignmentContext and ensures that DFSClient
* writes its lastSeenStateId into RPC requests.
*/
@Test
public void testClientSendsState() throws Exception {
ClientGCIContext alignmentContext = new ClientGCIContext();
ClientGCIContext spiedAlignContext = Mockito.spy(alignmentContext);
spy = spiedAlignContext;
try (DistributedFileSystem clearDfs =
(DistributedFileSystem) FileSystem.get(CONF)) {
// Collect RpcRequestHeaders for verification later.
final List<RpcHeaderProtos.RpcRequestHeaderProto.Builder> headers =
new ArrayList<>();
Mockito.doAnswer(a -> {
Object[] arguments = a.getArguments();
RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
(RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
headers.add(header);
return a.callRealMethod();
}).when(spiedAlignContext).updateRequestState(Mockito.any());
DFSTestUtil.writeFile(clearDfs, new Path("/testFile4"), "shv");
// Ensure first header and last header have different state.
assertThat(headers.size() > 1, is(true));
assertThat(headers.get(0).getStateId(),
is(not(headers.get(headers.size() - 1))));
// Ensure collected RpcRequestHeaders are in increasing order.
long lastHeader = headers.get(0).getStateId();
for (RpcHeaderProtos.RpcRequestHeaderProto.Builder header :
headers.subList(1, headers.size())) {
long currentHeader = header.getStateId();
assertThat(currentHeader >= lastHeader, is(true));
lastHeader = header.getStateId();
}
}
}
/**
* This test mocks an AlignmentContext to send stateIds greater than
* server's stateId in RPC requests.
*/
@Test
public void testClientSendsGreaterState() throws Exception {
ClientGCIContext alignmentContext = new ClientGCIContext();
ClientGCIContext spiedAlignContext = Mockito.spy(alignmentContext);
spy = spiedAlignContext;
try (DistributedFileSystem clearDfs =
(DistributedFileSystem) FileSystem.get(CONF)) {
// Make every client call have a stateId > server's stateId.
Mockito.doAnswer(a -> {
Object[] arguments = a.getArguments();
RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
(RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
try {
return a.callRealMethod();
} finally {
header.setStateId(Long.MAX_VALUE);
}
}).when(spiedAlignContext).updateRequestState(Mockito.any());
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG);
DFSTestUtil.writeFile(clearDfs, new Path("/testFile4"), "shv");
logCapturer.stopCapturing();
String output = logCapturer.getOutput();
assertThat(output, containsString("A client sent stateId: "));
}
}
/**
* This test checks if after a client writes we can see the state id in
* updated via the response.
*/
@Test
public void testStateTransferOnWriteWithFailover() throws Exception {
long preWriteState =
cluster.getNamesystem(active).getLastWrittenTransactionId();
// Write using HA client.
DFSTestUtil.writeFile(dfs, new Path("/testFile1FO"), "123");
long clientState = getContext(0).getLastSeenStateId();
long postWriteState =
cluster.getNamesystem(active).getLastWrittenTransactionId();
// Write(s) should have increased state. Check for greater than.
assertThat(clientState > preWriteState, is(true));
// Client and server state should be equal.
assertThat(clientState, is(postWriteState));
// Failover NameNode.
failOver();
// Write using HA client.
DFSTestUtil.writeFile(dfs, new Path("/testFile2FO"), "456");
long clientStateFO = getContext(0).getLastSeenStateId();
long writeStateFO =
cluster.getNamesystem(active).getLastWrittenTransactionId();
// Write(s) should have increased state. Check for greater than.
assertThat(clientStateFO > postWriteState, is(true));
// Client and server state should be equal.
assertThat(clientStateFO, is(writeStateFO));
}
@Test(timeout=300000)
public void testMultiClientStatesWithRandomFailovers() throws Exception {
// We want threads to run during failovers; assuming at minimum 4 cores,
// would like to see 2 clients competing against 2 NameNodes.
ExecutorService execService = Executors.newFixedThreadPool(2);
clients = new ArrayList<>(NUMCLIENTS);
for (int i = 1; i <= NUMCLIENTS; i++) {
DistributedFileSystem haClient =
(DistributedFileSystem) FileSystem.get(CONF);
clients.add(new Worker(haClient, NUMFILES, "/testFile3FO_", i));
}
// Execute workers in threadpool with random failovers.
List<Future<STATE>> futures = submitAll(execService, clients);
execService.shutdown();
boolean finished = false;
while (!finished) {
failOver();
finished = execService.awaitTermination(1L, TimeUnit.SECONDS);
}
// Validation.
for (Future<STATE> future : futures) {
assertThat(future.get(), is(STATE.SUCCESS));
}
}
private ClientGCIContext getContext(int clientCreationIndex) {
return AC_LIST.get(clientCreationIndex);
}
private void failOver() throws IOException {
cluster.transitionToStandby(active);
cluster.transitionToActive(standby);
int tempActive = active;
active = standby;
standby = tempActive;
}
/* Executor.invokeAll() is blocking so utilizing submit instead. */
private static List<Future<STATE>> submitAll(ExecutorService executor,
Collection<Worker> calls) {
List<Future<STATE>> futures = new ArrayList<>(calls.size());
for (Worker call : calls) {
Future<STATE> future = executor.submit(call);
futures.add(future);
}
return futures;
}
private void killWorkers() throws IOException {
if (clients != null) {
for(Worker worker : clients) {
worker.kill();
}
clients = null;
}
}
private enum STATE { SUCCESS, FAIL, ERROR }
private class Worker implements Callable<STATE> {
private final DistributedFileSystem client;
private final int filesToMake;
private String filePath;
private final int nonce;
Worker(DistributedFileSystem client,
int filesToMake,
String filePath,
int nonce) {
this.client = client;
this.filesToMake = filesToMake;
this.filePath = filePath;
this.nonce = nonce;
}
@Override
public STATE call() {
try {
for (int i = 0; i < filesToMake; i++) {
long preClientStateFO =
getContext(nonce).getLastSeenStateId();
// Write using HA client.
Path path = new Path(filePath + nonce + i);
DFSTestUtil.writeFile(client, path, "erk");
long postClientStateFO =
getContext(nonce).getLastSeenStateId();
// Write(s) should have increased state. Check for greater than.
if (postClientStateFO <= preClientStateFO) {
System.out.println("FAIL: Worker started with: " +
preClientStateFO + ", but finished with: " + postClientStateFO);
return STATE.FAIL;
}
}
client.close();
return STATE.SUCCESS;
} catch (IOException e) {
System.out.println("ERROR: Worker failed with: " + e);
return STATE.ERROR;
}
}
public void kill() throws IOException {
client.dfs.closeAllFilesBeingWritten(true);
client.dfs.closeOutputStreams(true);
client.dfs.closeConnectionToNamenode();
client.dfs.close();
client.close();
}
}
}