diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java new file mode 100644 index 00000000000..f952325ad93 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; + +/** + * This interface intends to align the state between client and server + * via RPC communication. + * + * This should be implemented separately on the client side and server side + * and can be used to pass state information on RPC responses from server + * to client. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public interface AlignmentContext { + + /** + * This is the intended server method call to implement to pass state info + * during RPC response header construction. + * @param header The RPC response header builder. + */ + void updateResponseState(RpcResponseHeaderProto.Builder header); + + /** + * This is the intended client method call to implement to recieve state info + * during RPC response processing. + * @param header The RPC response header. + */ + void receiveResponseState(RpcResponseHeaderProto header); + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index c6ac7328066..096c164ccd8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -103,6 +103,12 @@ public class Client implements AutoCloseable { return false; } }; + private static AlignmentContext alignmentContext; + + /** Set alignment context to use to fetch state alignment info from RPC. */ + public static void setAlignmentContext(AlignmentContext ac) { + alignmentContext = ac; + } @SuppressWarnings("unchecked") @Unstable @@ -1186,6 +1192,9 @@ public class Client implements AutoCloseable { final Call call = calls.remove(callId); call.setRpcResponse(value); } + if (alignmentContext != null) { + alignmentContext.receiveResponseState(header); + } // verify that packet length was correct if (packet.remaining() > 0) { throw new RpcClientException("RPC response length mismatch"); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 639bbadffbd..4b03990c151 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -337,11 +337,11 @@ public class ProtobufRpcEngine implements RpcEngine { String bindAddress, int port, int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, SecretManager secretManager, - String portRangeConfig) + String portRangeConfig, AlignmentContext alignmentContext) throws IOException { return new Server(protocol, protocolImpl, conf, bindAddress, port, numHandlers, numReaders, queueSizePerHandler, verbose, secretManager, - portRangeConfig); + portRangeConfig, alignmentContext); } public static class Server extends RPC.Server { @@ -410,17 +410,18 @@ public class ProtobufRpcEngine implements RpcEngine { * @param numHandlers the number of method handler threads to run * @param verbose whether each call should be logged * @param portRangeConfig A config parameter that can be used to restrict - * the range of ports used when port is 0 (an ephemeral port) + * @param alignmentContext provides server state info on client responses */ public Server(Class protocolClass, Object protocolImpl, Configuration conf, String bindAddress, int port, int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, SecretManager secretManager, - String portRangeConfig) + String portRangeConfig, AlignmentContext alignmentContext) throws IOException { super(bindAddress, port, null, numHandlers, numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl .getClass().getName()), secretManager, portRangeConfig); + setAlignmentContext(alignmentContext); this.verbose = verbose; registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass, protocolImpl); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 8f8eda6ded7..5e3275e3b86 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -717,6 +717,7 @@ public class RPC { private final Configuration conf; private SecretManager secretManager = null; private String portRangeConfig = null; + private AlignmentContext alignmentContext = null; public Builder(Configuration conf) { this.conf = conf; @@ -783,6 +784,12 @@ public class RPC { return this; } + /** Default: null */ + public Builder setAlignmentContext(AlignmentContext alignmentContext) { + this.alignmentContext = alignmentContext; + return this; + } + /** * Build the RPC Server. * @throws IOException on error @@ -802,7 +809,8 @@ public class RPC { return getProtocolEngine(this.protocol, this.conf).getServer( this.protocol, this.instance, this.bindAddress, this.port, this.numHandlers, this.numReaders, this.queueSizePerHandler, - this.verbose, this.conf, this.secretManager, this.portRangeConfig); + this.verbose, this.conf, this.secretManager, this.portRangeConfig, + this.alignmentContext); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java index 047722e649e..8a431726938 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java @@ -67,6 +67,7 @@ public interface RpcEngine { * @param secretManager The secret manager to use to validate incoming requests. * @param portRangeConfig A config parameter that can be used to restrict * the range of ports used when port is 0 (an ephemeral port) + * @param alignmentContext provides server state info on client responses * @return The Server instance * @throws IOException on any error */ @@ -75,8 +76,8 @@ public interface RpcEngine { int queueSizePerHandler, boolean verbose, Configuration conf, SecretManager secretManager, - String portRangeConfig - ) throws IOException; + String portRangeConfig, + AlignmentContext alignmentContext) throws IOException; /** * Returns a proxy for ProtocolMetaInfoPB, which uses the given connection diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 196b3d1db5f..28d97012764 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -140,7 +140,13 @@ public abstract class Server { private RpcSaslProto negotiateResponse; private ExceptionsHandler exceptionsHandler = new ExceptionsHandler(); private Tracer tracer; - + + private AlignmentContext alignmentContext; + /** + * Logical name of the server used in metrics and monitor. + */ + private final String serverName; + /** * Add exception classes for which server won't log stack traces. * @@ -159,6 +165,15 @@ public abstract class Server { exceptionsHandler.addSuppressedLoggingExceptions(exceptionClass); } + /** + * Set alignment context to pass state info thru RPC. + * + * @param alignmentContext alignment state context + */ + public void setAlignmentContext(AlignmentContext alignmentContext) { + this.alignmentContext = alignmentContext; + } + /** * ExceptionsHandler manages Exception groups for special handling * e.g., terse exception group for concise logging messages @@ -2775,6 +2790,7 @@ public abstract class Server { this.rpcRequestClass = rpcRequestClass; this.handlerCount = handlerCount; this.socketSendBufferSize = 0; + this.serverName = serverName; this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT); if (queueSizePerHandler != -1) { @@ -2916,6 +2932,9 @@ public abstract class Server { headerBuilder.setRetryCount(call.retryCount); headerBuilder.setStatus(status); headerBuilder.setServerIpcVersionNum(CURRENT_VERSION); + if(alignmentContext != null) { + alignmentContext.updateResponseState(headerBuilder); + } if (status == RpcStatusProto.SUCCESS) { RpcResponseHeaderProto header = headerBuilder.build(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index f2b5862372a..ebeff94f71d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -323,11 +323,11 @@ public class WritableRpcEngine implements RpcEngine { int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, SecretManager secretManager, - String portRangeConfig) + String portRangeConfig, AlignmentContext alignmentContext) throws IOException { return new Server(protocolClass, protocolImpl, conf, bindAddress, port, numHandlers, numReaders, queueSizePerHandler, verbose, secretManager, - portRangeConfig); + portRangeConfig, alignmentContext); } @@ -397,18 +397,45 @@ public class WritableRpcEngine implements RpcEngine { * @param port the port to listen for connections on * @param numHandlers the number of method handler threads to run * @param verbose whether each call should be logged + * + * @deprecated use Server#Server(Class, Object, + * Configuration, String, int, int, int, int, boolean, SecretManager) */ + @Deprecated public Server(Class protocolClass, Object protocolImpl, Configuration conf, String bindAddress, int port, int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, SecretManager secretManager, String portRangeConfig) throws IOException { + this(null, protocolImpl, conf, bindAddress, port, + numHandlers, numReaders, queueSizePerHandler, verbose, + secretManager, null, null); + } + + /** + * Construct an RPC server. + * @param protocolClass - the protocol being registered + * can be null for compatibility with old usage (see below for details) + * @param protocolImpl the protocol impl that will be called + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + * @param numHandlers the number of method handler threads to run + * @param verbose whether each call should be logged + * @param alignmentContext provides server state info on client responses + */ + public Server(Class protocolClass, Object protocolImpl, + Configuration conf, String bindAddress, int port, + int numHandlers, int numReaders, int queueSizePerHandler, + boolean verbose, SecretManager secretManager, + String portRangeConfig, AlignmentContext alignmentContext) + throws IOException { super(bindAddress, port, null, numHandlers, numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl.getClass().getName()), secretManager, portRangeConfig); - + setAlignmentContext(alignmentContext); this.verbose = verbose; diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index aa146162896..bfe13017fa4 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -155,6 +155,7 @@ message RpcResponseHeaderProto { optional RpcErrorCodeProto errorDetail = 6; // in case of error optional bytes clientId = 7; // Globally unique client ID optional sint32 retryCount = 8 [default = -1]; + optional int64 stateId = 9; // The last written Global State ID } message RpcSaslProto { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index f3bc625d2a9..0f97dbd18f0 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -301,7 +301,8 @@ public class TestRPC extends TestRpcBase { int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, SecretManager secretManager, - String portRangeConfig) throws IOException { + String portRangeConfig, AlignmentContext alignmentContext) + throws IOException { return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java new file mode 100644 index 00000000000..3d722f8496c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; + +import java.util.concurrent.atomic.LongAccumulator; + +/** + * This is the client side implementation responsible for receiving + * state alignment info from server(s). + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +class ClientGCIContext implements AlignmentContext { + + private final DFSClient dfsClient; + private final LongAccumulator lastSeenStateId = + new LongAccumulator(Math::max, Long.MIN_VALUE); + + /** + * Client side constructor. + * @param dfsClient client side state receiver + */ + ClientGCIContext(DFSClient dfsClient) { + this.dfsClient = dfsClient; + } + + /** + * Client side implementation only receives state alignment info. + * It does not provide state alignment info therefore this does nothing. + */ + @Override + public void updateResponseState(RpcResponseHeaderProto.Builder header) { + // Do nothing. + } + + /** + * Client side implementation for receiving state alignment info. + */ + @Override + public void receiveResponseState(RpcResponseHeaderProto header) { + lastSeenStateId.accumulate(header.getStateId()); + dfsClient.lastSeenStateId = lastSeenStateId.get(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index e2263ef069c..bc88b9de39d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -165,6 +165,7 @@ import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; +import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RetriableException; @@ -218,6 +219,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, final UserGroupInformation ugi; volatile boolean clientRunning = true; volatile long lastLeaseRenewal; + volatile long lastSeenStateId; private volatile FsServerDefaults serverDefaults; private volatile long serverDefaultsLastUpdate; final String clientName; @@ -395,6 +397,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, this.saslClient = new SaslDataTransferClient( conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth); + Client.setAlignmentContext(new ClientGCIContext(this)); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java new file mode 100644 index 00000000000..2d7d94e3a4b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; + +/** + * This is the server side implementation responsible for passing + * state alignment info to clients. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +class GlobalStateIdContext implements AlignmentContext { + private final FSNamesystem namesystem; + + /** + * Server side constructor. + * @param namesystem server side state provider + */ + GlobalStateIdContext(FSNamesystem namesystem) { + this.namesystem = namesystem; + } + + /** + * Server side implementation for providing state alignment info. + */ + @Override + public void updateResponseState(RpcResponseHeaderProto.Builder header) { + header.setStateId(namesystem.getLastWrittenTransactionId()); + } + + /** + * Server side implementation only provides state alignment info. + * It does not receive state alignment info therefore this does nothing. + */ + @Override + public void receiveResponseState(RpcResponseHeaderProto header) { + // Do nothing. + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 4fafd42cb18..e035c062cf8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -455,6 +455,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { .setNumHandlers(handlerCount) .setVerbose(false) .setSecretManager(namesystem.getDelegationTokenSecretManager()) + .setAlignmentContext(new GlobalStateIdContext(namesystem)) .build(); // Add all the RPC protocols that the namenode implements diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java new file mode 100644 index 00000000000..590f7020655 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; + +/** + * Class is used to test server sending state alignment information to clients + * via RPC and likewise clients receiving and updating their last known + * state alignment info. + * These tests check that after a single RPC call a client will have caught up + * to the most recent alignment state of the server. + */ +public class TestStateAlignmentContext { + + static final long BLOCK_SIZE = 64 * 1024; + private static final int NUMDATANODES = 3; + private static final Configuration CONF = new HdfsConfiguration(); + + private static MiniDFSCluster cluster; + private static DistributedFileSystem dfs; + + @BeforeClass + public static void startUpCluster() throws IOException { + // disable block scanner + CONF.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); + // Set short retry timeouts so this test runs faster + CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10); + CONF.setBoolean("fs.hdfs.impl.disable.cache", true); + cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES) + .build(); + cluster.waitActive(); + } + + @Before + public void before() throws IOException { + dfs = cluster.getFileSystem(); + } + + @AfterClass + public static void shutDownCluster() throws IOException { + if (dfs != null) { + dfs.close(); + dfs = null; + } + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + @After + public void after() throws IOException { + dfs.close(); + } + + /** + * This test checks if after a client writes we can see the state id in + * updated via the response. + */ + @Test + public void testStateTransferOnWrite() throws Exception { + long preWriteState = cluster.getNamesystem().getLastWrittenTransactionId(); + DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc"); + long clientState = dfs.dfs.lastSeenStateId; + long postWriteState = cluster.getNamesystem().getLastWrittenTransactionId(); + // Write(s) should have increased state. Check for greater than. + assertThat(clientState > preWriteState, is(true)); + // Client and server state should be equal. + assertThat(clientState, is(postWriteState)); + } + + /** + * This test checks if after a client reads we can see the state id in + * updated via the response. + */ + @Test + public void testStateTransferOnRead() throws Exception { + DFSTestUtil.writeFile(dfs, new Path("/testFile2"), "123"); + long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId(); + DFSTestUtil.readFile(dfs, new Path("/testFile2")); + // Read should catch client up to last written state. + assertThat(dfs.dfs.lastSeenStateId, is(lastWrittenId)); + } + + /** + * This test checks that a fresh client starts with no state and becomes + * updated of state from RPC call. + */ + @Test + public void testStateTransferOnFreshClient() throws Exception { + DFSTestUtil.writeFile(dfs, new Path("/testFile3"), "ezpz"); + long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId(); + try (DistributedFileSystem clearDfs = + (DistributedFileSystem) FileSystem.get(CONF)) { + assertThat(clearDfs.dfs.lastSeenStateId, is(0L)); + DFSTestUtil.readFile(clearDfs, new Path("/testFile3")); + assertThat(clearDfs.dfs.lastSeenStateId, is(lastWrittenId)); + } + } + +}