diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java index f952325ad93..66d6edc52e7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java @@ -20,6 +20,7 @@ package org.apache.hadoop.ipc; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; /** @@ -48,4 +49,17 @@ public interface AlignmentContext { */ void receiveResponseState(RpcResponseHeaderProto header); + /** + * This is the intended client method call to pull last seen state info + * into RPC request processing. + * @param header The RPC request header builder. + */ + void updateRequestState(RpcRequestHeaderProto.Builder header); + + /** + * This is the intended server method call to implement to receive + * client state info during RPC response header processing. + * @param header The RPC request header. + */ + void receiveRequestState(RpcRequestHeaderProto header); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 1db25f4fcce..f9b2f8a51cf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -1115,7 +1115,7 @@ public class Client implements AutoCloseable { // Items '1' and '2' are prepared here. RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry, - clientId); + clientId, alignmentContext); final ResponseBuffer buf = new ResponseBuffer(); header.writeDelimitedTo(buf); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 40675669104..ddc8a13aa04 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -2408,6 +2408,11 @@ public abstract class Server { } } + if (alignmentContext != null) { + // Check incoming RPC request's state. + alignmentContext.receiveRequestState(header); + } + CallerContext callerContext = null; if (header.hasCallerContext()) { callerContext = diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java index 1a5acbab6ec..9a0b05c369b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java @@ -21,6 +21,7 @@ package org.apache.hadoop.util; import java.io.DataInput; import java.io.IOException; +import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; @@ -165,6 +166,13 @@ public abstract class ProtoUtil { public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind, RpcRequestHeaderProto.OperationProto operation, int callId, int retryCount, byte[] uuid) { + return makeRpcRequestHeader(rpcKind, operation, callId, retryCount, uuid, + null); + } + + public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind, + RpcRequestHeaderProto.OperationProto operation, int callId, + int retryCount, byte[] uuid, AlignmentContext alignmentContext) { RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder(); result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId) .setRetryCount(retryCount).setClientId(ByteString.copyFrom(uuid)); @@ -190,6 +198,11 @@ public abstract class ProtoUtil { result.setCallerContext(contextBuilder); } + // Add alignment context if it is not null + if (alignmentContext != null) { + alignmentContext.updateRequestState(result); + } + return result.build(); } } diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index bfe13017fa4..e8d8cbbfe70 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -90,6 +90,7 @@ message RpcRequestHeaderProto { // the header for the RpcRequest optional sint32 retryCount = 5 [default = -1]; optional RPCTraceInfoProto traceInfo = 6; // tracing info optional RPCCallerContextProto callerContext = 7; // call context + optional int64 stateId = 8; // The last seen Global State ID } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java index a53ada9bb7f..0f27715a302 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; /** @@ -32,15 +33,10 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; @InterfaceStability.Stable class ClientGCIContext implements AlignmentContext { - private final DFSClient dfsClient; private final AtomicLong lastSeenStateId = new AtomicLong(Long.MIN_VALUE); - /** - * Client side constructor. - * @param dfsClient client side state receiver - */ - ClientGCIContext(DFSClient dfsClient) { - this.dfsClient = dfsClient; + long getLastSeenStateId() { + return lastSeenStateId.get(); } /** @@ -53,13 +49,30 @@ class ClientGCIContext implements AlignmentContext { } /** - * Client side implementation for receiving state alignment info. + * Client side implementation for receiving state alignment info in responses. */ @Override public void receiveResponseState(RpcResponseHeaderProto header) { updateMax(header.getStateId()); } + /** + * Client side implementation for providing state alignment info in requests. + */ + @Override + public void updateRequestState(RpcRequestHeaderProto.Builder header) { + header.setStateId(lastSeenStateId.longValue()); + } + + /** + * Client side implementation only provides state alignment info in requests. + * Client does not receive RPC requests therefore this does nothing. + */ + @Override + public void receiveRequestState(RpcRequestHeaderProto header) { + // Do nothing. + } + private void updateMax(long sample) { while (true) { long curMax = lastSeenStateId.get(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 761b4c7f4ab..cf1d2b0e88c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -218,7 +218,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, final UserGroupInformation ugi; volatile boolean clientRunning = true; volatile long lastLeaseRenewal; - volatile long lastSeenStateId; private volatile FsServerDefaults serverDefaults; private volatile long serverDefaultsLastUpdate; final String clientName; @@ -240,6 +239,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL; private final int smallBufferSize; private final long serverDefaultsValidityPeriod; + private final ClientGCIContext alignmentContext; public DfsClientConf getConf() { return dfsClientConf; @@ -391,7 +391,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, this.saslClient = new SaslDataTransferClient( conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth); - Client.setAlignmentContext(new ClientGCIContext(this)); + this.alignmentContext = new ClientGCIContext(); + Client.setAlignmentContext(alignmentContext); } /** @@ -540,6 +541,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, return clientRunning; } + @VisibleForTesting + ClientGCIContext getAlignmentContext() { + return alignmentContext; + } + long getLastLeaseRenewal() { return lastLeaseRenewal; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java index 2d7d94e3a4b..f0ebf986b75 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; /** @@ -41,7 +42,7 @@ class GlobalStateIdContext implements AlignmentContext { } /** - * Server side implementation for providing state alignment info. + * Server side implementation for providing state alignment info in responses. */ @Override public void updateResponseState(RpcResponseHeaderProto.Builder header) { @@ -56,4 +57,27 @@ class GlobalStateIdContext implements AlignmentContext { public void receiveResponseState(RpcResponseHeaderProto header) { // Do nothing. } + + /** + * Server side implementation only receives state alignment info. + * It does not build RPC requests therefore this does nothing. + */ + @Override + public void updateRequestState(RpcRequestHeaderProto.Builder header) { + // Do nothing. + } + + /** + * Server side implementation for processing state alignment info in requests. + */ + @Override + public void receiveRequestState(RpcRequestHeaderProto header) { + long serverStateId = namesystem.getLastWrittenTransactionId(); + long clientStateId = header.getStateId(); + if (clientStateId > serverStateId) { + FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId + + ", but server state is: " + serverStateId); + } + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java index 590f7020655..4525d014414 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java @@ -18,20 +18,33 @@ package org.apache.hadoop.hdfs; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; import static org.junit.Assert.assertThat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + /** * Class is used to test server sending state alignment information to clients @@ -91,7 +104,7 @@ public class TestStateAlignmentContext { public void testStateTransferOnWrite() throws Exception { long preWriteState = cluster.getNamesystem().getLastWrittenTransactionId(); DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc"); - long clientState = dfs.dfs.lastSeenStateId; + long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId(); long postWriteState = cluster.getNamesystem().getLastWrittenTransactionId(); // Write(s) should have increased state. Check for greater than. assertThat(clientState > preWriteState, is(true)); @@ -109,7 +122,8 @@ public class TestStateAlignmentContext { long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId(); DFSTestUtil.readFile(dfs, new Path("/testFile2")); // Read should catch client up to last written state. - assertThat(dfs.dfs.lastSeenStateId, is(lastWrittenId)); + long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId(); + assertThat(clientState, is(lastWrittenId)); } /** @@ -122,10 +136,86 @@ public class TestStateAlignmentContext { long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId(); try (DistributedFileSystem clearDfs = (DistributedFileSystem) FileSystem.get(CONF)) { - assertThat(clearDfs.dfs.lastSeenStateId, is(0L)); + ClientGCIContext clientState = clearDfs.dfs.getAlignmentContext(); + assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE)); DFSTestUtil.readFile(clearDfs, new Path("/testFile3")); - assertThat(clearDfs.dfs.lastSeenStateId, is(lastWrittenId)); + assertThat(clientState.getLastSeenStateId(), is(lastWrittenId)); } } + /** + * This test mocks an AlignmentContext and ensures that DFSClient + * writes its lastSeenStateId into RPC requests. + */ + @Test + public void testClientSendsState() throws Exception { + AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext(); + AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext); + Client.setAlignmentContext(spiedAlignContext); + + // Collect RpcRequestHeaders for verification later. + final List collectedHeaders = + new ArrayList<>(); + Mockito.doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock a) throws Throwable { + Object[] arguments = a.getArguments(); + RpcHeaderProtos.RpcRequestHeaderProto.Builder header = + (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0]; + collectedHeaders.add(header); + return a.callRealMethod(); + } + }).when(spiedAlignContext).updateRequestState(Mockito.any()); + + DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv"); + + // Ensure first header and last header have different state. + assertThat(collectedHeaders.size() > 1, is(true)); + assertThat(collectedHeaders.get(0).getStateId(), + is(not(collectedHeaders.get(collectedHeaders.size() - 1)))); + + // Ensure collected RpcRequestHeaders are in increasing order. + long lastHeader = collectedHeaders.get(0).getStateId(); + for(RpcHeaderProtos.RpcRequestHeaderProto.Builder header : + collectedHeaders.subList(1, collectedHeaders.size())) { + long currentHeader = header.getStateId(); + assertThat(currentHeader >= lastHeader, is(true)); + lastHeader = header.getStateId(); + } + } + + /** + * This test mocks an AlignmentContext to send stateIds greater than + * server's stateId in RPC requests. + */ + @Test + public void testClientSendsGreaterState() throws Exception { + AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext(); + AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext); + Client.setAlignmentContext(spiedAlignContext); + + // Make every client call have a stateId > server's stateId. + Mockito.doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock a) throws Throwable { + Object[] arguments = a.getArguments(); + RpcHeaderProtos.RpcRequestHeaderProto.Builder header = + (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0]; + try { + return a.callRealMethod(); + } finally { + header.setStateId(Long.MAX_VALUE); + } + } + }).when(spiedAlignContext).updateRequestState(Mockito.any()); + + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG); + DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv"); + logCapturer.stopCapturing(); + + String output = logCapturer.getOutput(); + assertThat(output, containsString("A client sent stateId: ")); + } + }