diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java index f952325ad93..66d6edc52e7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java @@ -20,6 +20,7 @@ package org.apache.hadoop.ipc; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; /** @@ -48,4 +49,17 @@ public interface AlignmentContext { */ void receiveResponseState(RpcResponseHeaderProto header); + /** + * This is the intended client method call to pull last seen state info + * into RPC request processing. + * @param header The RPC request header builder. + */ + void updateRequestState(RpcRequestHeaderProto.Builder header); + + /** + * This is the intended server method call to implement to receive + * client state info during RPC response header processing. + * @param header The RPC request header. + */ + void receiveRequestState(RpcRequestHeaderProto header); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 096c164ccd8..91a698f6879 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -1114,7 +1114,7 @@ public class Client implements AutoCloseable { // Items '1' and '2' are prepared here. RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry, - clientId); + clientId, alignmentContext); final ResponseBuffer buf = new ResponseBuffer(); header.writeDelimitedTo(buf); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 2064ea74685..6b5435244df 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -2486,6 +2486,11 @@ public abstract class Server { } } + if (alignmentContext != null) { + // Check incoming RPC request's state. + alignmentContext.receiveRequestState(header); + } + CallerContext callerContext = null; if (header.hasCallerContext()) { callerContext = diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java index 1a5acbab6ec..9a0b05c369b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java @@ -21,6 +21,7 @@ package org.apache.hadoop.util; import java.io.DataInput; import java.io.IOException; +import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; @@ -165,6 +166,13 @@ public abstract class ProtoUtil { public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind, RpcRequestHeaderProto.OperationProto operation, int callId, int retryCount, byte[] uuid) { + return makeRpcRequestHeader(rpcKind, operation, callId, retryCount, uuid, + null); + } + + public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind, + RpcRequestHeaderProto.OperationProto operation, int callId, + int retryCount, byte[] uuid, AlignmentContext alignmentContext) { RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder(); result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId) .setRetryCount(retryCount).setClientId(ByteString.copyFrom(uuid)); @@ -190,6 +198,11 @@ public abstract class ProtoUtil { result.setCallerContext(contextBuilder); } + // Add alignment context if it is not null + if (alignmentContext != null) { + alignmentContext.updateRequestState(result); + } + return result.build(); } } diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index bfe13017fa4..e8d8cbbfe70 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -90,6 +90,7 @@ message RpcRequestHeaderProto { // the header for the RpcRequest optional sint32 retryCount = 5 [default = -1]; optional RPCTraceInfoProto traceInfo = 6; // tracing info optional RPCCallerContextProto callerContext = 7; // call context + optional int64 stateId = 8; // The last seen Global State ID } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java index 3d722f8496c..0d0bd251425 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import java.util.concurrent.atomic.LongAccumulator; @@ -33,16 +34,11 @@ import java.util.concurrent.atomic.LongAccumulator; @InterfaceStability.Stable class ClientGCIContext implements AlignmentContext { - private final DFSClient dfsClient; private final LongAccumulator lastSeenStateId = new LongAccumulator(Math::max, Long.MIN_VALUE); - /** - * Client side constructor. - * @param dfsClient client side state receiver - */ - ClientGCIContext(DFSClient dfsClient) { - this.dfsClient = dfsClient; + long getLastSeenStateId() { + return lastSeenStateId.get(); } /** @@ -55,11 +51,27 @@ class ClientGCIContext implements AlignmentContext { } /** - * Client side implementation for receiving state alignment info. + * Client side implementation for receiving state alignment info in responses. */ @Override public void receiveResponseState(RpcResponseHeaderProto header) { lastSeenStateId.accumulate(header.getStateId()); - dfsClient.lastSeenStateId = lastSeenStateId.get(); + } + + /** + * Client side implementation for providing state alignment info in requests. + */ + @Override + public void updateRequestState(RpcRequestHeaderProto.Builder header) { + header.setStateId(lastSeenStateId.longValue()); + } + + /** + * Client side implementation only provides state alignment info in requests. + * Client does not receive RPC requests therefore this does nothing. + */ + @Override + public void receiveRequestState(RpcRequestHeaderProto header) { + // Do nothing. } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 3c5a40ecc0c..0f4d5c1ee13 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -220,7 +220,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, final UserGroupInformation ugi; volatile boolean clientRunning = true; volatile long lastLeaseRenewal; - volatile long lastSeenStateId; private volatile FsServerDefaults serverDefaults; private volatile long serverDefaultsLastUpdate; final String clientName; @@ -243,6 +242,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL; private final int smallBufferSize; private final long serverDefaultsValidityPeriod; + private final ClientGCIContext alignmentContext; public DfsClientConf getConf() { return dfsClientConf; @@ -398,7 +398,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, this.saslClient = new SaslDataTransferClient( conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth); - Client.setAlignmentContext(new ClientGCIContext(this)); + this.alignmentContext = new ClientGCIContext(); + Client.setAlignmentContext(alignmentContext); } /** @@ -547,6 +548,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, return clientRunning; } + @VisibleForTesting + ClientGCIContext getAlignmentContext() { + return alignmentContext; + } + long getLastLeaseRenewal() { return lastLeaseRenewal; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java index 2d7d94e3a4b..f0ebf986b75 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; /** @@ -41,7 +42,7 @@ class GlobalStateIdContext implements AlignmentContext { } /** - * Server side implementation for providing state alignment info. + * Server side implementation for providing state alignment info in responses. */ @Override public void updateResponseState(RpcResponseHeaderProto.Builder header) { @@ -56,4 +57,27 @@ class GlobalStateIdContext implements AlignmentContext { public void receiveResponseState(RpcResponseHeaderProto header) { // Do nothing. } + + /** + * Server side implementation only receives state alignment info. + * It does not build RPC requests therefore this does nothing. + */ + @Override + public void updateRequestState(RpcRequestHeaderProto.Builder header) { + // Do nothing. + } + + /** + * Server side implementation for processing state alignment info in requests. + */ + @Override + public void receiveRequestState(RpcRequestHeaderProto header) { + long serverStateId = namesystem.getLastWrittenTransactionId(); + long clientStateId = header.getStateId(); + if (clientStateId > serverStateId) { + FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId + + ", but server state is: " + serverStateId); + } + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java index 590f7020655..ce4639f8efa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java @@ -18,20 +18,30 @@ package org.apache.hadoop.hdfs; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; import static org.junit.Assert.assertThat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** * Class is used to test server sending state alignment information to clients @@ -91,7 +101,7 @@ public class TestStateAlignmentContext { public void testStateTransferOnWrite() throws Exception { long preWriteState = cluster.getNamesystem().getLastWrittenTransactionId(); DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc"); - long clientState = dfs.dfs.lastSeenStateId; + long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId(); long postWriteState = cluster.getNamesystem().getLastWrittenTransactionId(); // Write(s) should have increased state. Check for greater than. assertThat(clientState > preWriteState, is(true)); @@ -109,7 +119,8 @@ public class TestStateAlignmentContext { long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId(); DFSTestUtil.readFile(dfs, new Path("/testFile2")); // Read should catch client up to last written state. - assertThat(dfs.dfs.lastSeenStateId, is(lastWrittenId)); + long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId(); + assertThat(clientState, is(lastWrittenId)); } /** @@ -122,10 +133,80 @@ public class TestStateAlignmentContext { long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId(); try (DistributedFileSystem clearDfs = (DistributedFileSystem) FileSystem.get(CONF)) { - assertThat(clearDfs.dfs.lastSeenStateId, is(0L)); + ClientGCIContext clientState = clearDfs.dfs.getAlignmentContext(); + assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE)); DFSTestUtil.readFile(clearDfs, new Path("/testFile3")); - assertThat(clearDfs.dfs.lastSeenStateId, is(lastWrittenId)); + assertThat(clientState.getLastSeenStateId(), is(lastWrittenId)); } } + /** + * This test mocks an AlignmentContext and ensures that DFSClient + * writes its lastSeenStateId into RPC requests. + */ + @Test + public void testClientSendsState() throws Exception { + AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext(); + AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext); + Client.setAlignmentContext(spiedAlignContext); + + // Collect RpcRequestHeaders for verification later. + final List collectedHeaders = + new ArrayList<>(); + Mockito.doAnswer(a -> { + Object[] arguments = a.getArguments(); + RpcHeaderProtos.RpcRequestHeaderProto.Builder header = + (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0]; + collectedHeaders.add(header); + return a.callRealMethod(); + }).when(spiedAlignContext).updateRequestState(Mockito.any()); + + DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv"); + + // Ensure first header and last header have different state. + assertThat(collectedHeaders.size() > 1, is(true)); + assertThat(collectedHeaders.get(0).getStateId(), + is(not(collectedHeaders.get(collectedHeaders.size() - 1)))); + + // Ensure collected RpcRequestHeaders are in increasing order. + long lastHeader = collectedHeaders.get(0).getStateId(); + for(RpcHeaderProtos.RpcRequestHeaderProto.Builder header : + collectedHeaders.subList(1, collectedHeaders.size())) { + long currentHeader = header.getStateId(); + assertThat(currentHeader >= lastHeader, is(true)); + lastHeader = header.getStateId(); + } + } + + /** + * This test mocks an AlignmentContext to send stateIds greater than + * server's stateId in RPC requests. + */ + @Test + public void testClientSendsGreaterState() throws Exception { + AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext(); + AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext); + Client.setAlignmentContext(spiedAlignContext); + + // Make every client call have a stateId > server's stateId. + Mockito.doAnswer(a -> { + Object[] arguments = a.getArguments(); + RpcHeaderProtos.RpcRequestHeaderProto.Builder header = + (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0]; + try { + return a.callRealMethod(); + } finally { + header.setStateId(Long.MAX_VALUE); + } + }).when(spiedAlignContext).updateRequestState(Mockito.any()); + + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG); + DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv"); + logCapturer.stopCapturing(); + + String output = logCapturer.getOutput(); + assertThat(output, containsString("A client sent stateId: ")); + } + }