From e2337bfc576edddcb702e262df53e85e1f8cba45 Mon Sep 17 00:00:00 2001 From: Chen Liang Date: Mon, 13 Aug 2018 10:30:06 -0700 Subject: [PATCH] HDFS-13767. Add msync server implementation. Contributed by Chen Liang. --- .../apache/hadoop/ipc/AlignmentContext.java | 9 ++- .../java/org/apache/hadoop/ipc/Server.java | 37 +++++++++-- .../apache/hadoop/hdfs/ClientGSIContext.java | 6 +- .../ClientNamenodeProtocolTranslatorPB.java | 1 - .../server/namenode/GlobalStateIdContext.java | 20 ++++-- .../hdfs/TestStateAlignmentContextWithHA.java | 36 ----------- .../server/namenode/ha/TestObserverNode.java | 63 ++++++++++++++++--- 7 files changed, 114 insertions(+), 58 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java index 66d6edc52e7..0e8b960ecd3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java @@ -60,6 +60,13 @@ public interface AlignmentContext { * This is the intended server method call to implement to receive * client state info during RPC response header processing. * @param header The RPC request header. + * @return state id of in the request header. */ - void receiveRequestState(RpcRequestHeaderProto header); + long receiveRequestState(RpcRequestHeaderProto header); + + /** + * Returns the last seen state id of the alignment context instance. + * @return the value of the last seen state id. + */ + long getLastSeenStateId(); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 32de3d3fe84..747a0844889 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -705,6 +705,7 @@ public abstract class Server { private boolean deferredResponse = false; private int priorityLevel; // the priority level assigned by scheduler, 0 by default + private long clientStateId; Call() { this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, @@ -735,6 +736,7 @@ public abstract class Server { this.clientId = clientId; this.traceScope = traceScope; this.callerContext = callerContext; + this.clientStateId = Long.MIN_VALUE; } @Override @@ -812,6 +814,14 @@ public abstract class Server { this.priorityLevel = priorityLevel; } + public long getClientStateId() { + return this.clientStateId; + } + + public void setClientStateId(long stateId) { + this.clientStateId = stateId; + } + @InterfaceStability.Unstable public void deferResponse() { this.deferredResponse = true; @@ -2500,11 +2510,6 @@ public abstract class Server { } } - if (alignmentContext != null) { - // Check incoming RPC request's state. - alignmentContext.receiveRequestState(header); - } - CallerContext callerContext = null; if (header.hasCallerContext()) { callerContext = @@ -2521,6 +2526,10 @@ public abstract class Server { // Save the priority level assignment by the scheduler call.setPriorityLevel(callQueue.getPriorityLevel(call)); + if(alignmentContext != null) { + long stateId = alignmentContext.receiveRequestState(header); + call.setClientStateId(stateId); + } try { internalQueueCall(call); @@ -2702,6 +2711,24 @@ public abstract class Server { TraceScope traceScope = null; try { final Call call = callQueue.take(); // pop the queue; maybe blocked here + if (alignmentContext != null && call.getClientStateId() > + alignmentContext.getLastSeenStateId()) { + /* + * The call processing should be postponed until the client call's + * state id is aligned (>=) with the server state id. + + * NOTE: + * Inserting the call back to the queue can change the order of call + * execution comparing to their original placement into the queue. + * This is not a problem, because Hadoop RPC does not have any + * constraints on ordering the incoming rpc requests. + * In case of Observer, it handles only reads, which are + * commutative. + */ + //Re-queue the call and continue + internalQueueCall(call); + continue; + } if (LOG.isDebugEnabled()) { LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java index 241ec059c21..10fa0e15e4a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java @@ -39,7 +39,8 @@ public class ClientGSIContext implements AlignmentContext { private final LongAccumulator lastSeenStateId = new LongAccumulator(Math::max, Long.MIN_VALUE); - long getLastSeenStateId() { + @Override + public long getLastSeenStateId() { return lastSeenStateId.get(); } @@ -73,7 +74,8 @@ public class ClientGSIContext implements AlignmentContext { * Client does not receive RPC requests therefore this does nothing. */ @Override - public void receiveRequestState(RpcRequestHeaderProto header) { + public long receiveRequestState(RpcRequestHeaderProto header) { // Do nothing. + return 0; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index f5aa1749b19..13c9568ddc5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -159,7 +159,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Mkdirs import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MsyncRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MsyncResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java index f0ebf986b75..00166929338 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; @@ -46,7 +47,11 @@ class GlobalStateIdContext implements AlignmentContext { */ @Override public void updateResponseState(RpcResponseHeaderProto.Builder header) { - header.setStateId(namesystem.getLastWrittenTransactionId()); + // Using getCorrectLastAppliedOrWrittenTxId will acquire the lock on + // FSEditLog. This is needed so that ANN will return the correct state id + // it currently has. But this may not be necessary for Observer, may want + // revisit for optimization. Same goes to receiveRequestState. + header.setStateId(getLastSeenStateId()); } /** @@ -71,13 +76,20 @@ class GlobalStateIdContext implements AlignmentContext { * Server side implementation for processing state alignment info in requests. */ @Override - public void receiveRequestState(RpcRequestHeaderProto header) { - long serverStateId = namesystem.getLastWrittenTransactionId(); + public long receiveRequestState(RpcRequestHeaderProto header) { + long serverStateId = + namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId(); long clientStateId = header.getStateId(); - if (clientStateId > serverStateId) { + if (clientStateId > serverStateId && + HAServiceProtocol.HAServiceState.ACTIVE.equals(namesystem.getState())) { FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId + ", but server state is: " + serverStateId); } + return clientStateId; } + @Override + public long getLastSeenStateId() { + return namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java index 4fcfd8c5df1..ae828814f7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java @@ -280,42 +280,6 @@ public class TestStateAlignmentContextWithHA { } } - /** - * This test mocks an AlignmentContext to send stateIds greater than - * server's stateId in RPC requests. - */ - @Test - public void testClientSendsGreaterState() throws Exception { - ClientGSIContext alignmentContext = new ClientGSIContext(); - ClientGSIContext spiedAlignContext = Mockito.spy(alignmentContext); - spy = spiedAlignContext; - - try (DistributedFileSystem clearDfs = - (DistributedFileSystem) FileSystem.get(CONF)) { - - // Make every client call have a stateId > server's stateId. - Mockito.doAnswer(a -> { - Object[] arguments = a.getArguments(); - RpcHeaderProtos.RpcRequestHeaderProto.Builder header = - (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0]; - try { - return a.callRealMethod(); - } finally { - header.setStateId(Long.MAX_VALUE); - } - }).when(spiedAlignContext).updateRequestState(Mockito.any()); - - GenericTestUtils.LogCapturer logCapturer = - GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG); - - DFSTestUtil.writeFile(clearDfs, new Path("/testFile4"), "shv"); - logCapturer.stopCapturing(); - - String output = logCapturer.getOutput(); - assertThat(output, containsString("A client sent stateId: ")); - } - } - /** * This test checks if after a client writes we can see the state id in * updated via the response. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index 98ffefd1050..de34454e38d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -32,15 +32,21 @@ import org.junit.Before; import org.junit.Test; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Proxy; import java.net.URI; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; + // Main unit tests for ObserverNode public class TestObserverNode { private Configuration conf; @@ -58,7 +64,9 @@ public class TestObserverNode { @Before public void setUp() throws Exception { conf = new Configuration(); - setUpCluster(1); + conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, true); + conf.setTimeDuration( + DFS_HA_TAILEDITS_PERIOD_KEY, 100, TimeUnit.MILLISECONDS); testPath = new Path("/test"); testPath2 = new Path("/test2"); @@ -74,18 +82,12 @@ public class TestObserverNode { @Test public void testSimpleRead() throws Exception { + setUpCluster(1); setObserverRead(true); dfs.mkdir(testPath, FsPermission.getDefault()); assertSentTo(0); - try { - dfs.getFileStatus(testPath); - fail("Should throw FileNotFoundException"); - } catch (FileNotFoundException e) { - // Pass - } - rollEditLogAndTail(0); dfs.getFileStatus(testPath); assertSentTo(2); @@ -96,6 +98,7 @@ public class TestObserverNode { @Test public void testFailover() throws Exception { + setUpCluster(1); setObserverRead(false); dfs.mkdir(testPath, FsPermission.getDefault()); @@ -115,6 +118,7 @@ public class TestObserverNode { @Test public void testDoubleFailover() throws Exception { + setUpCluster(1); setObserverRead(true); dfs.mkdir(testPath, FsPermission.getDefault()); @@ -180,6 +184,7 @@ public class TestObserverNode { @Test public void testObserverShutdown() throws Exception { + setUpCluster(1); setObserverRead(true); dfs.mkdir(testPath, FsPermission.getDefault()); @@ -201,6 +206,7 @@ public class TestObserverNode { @Test public void testObserverFailOverAndShutdown() throws Exception { + setUpCluster(1); // Test the case when there is a failover before ONN shutdown setObserverRead(true); @@ -273,6 +279,7 @@ public class TestObserverNode { @Test public void testBootstrap() throws Exception { + setUpCluster(1); for (URI u : dfsCluster.getNameDirs(2)) { File dir = new File(u.getPath()); assertTrue(FileUtil.fullyDelete(dir)); @@ -284,6 +291,44 @@ public class TestObserverNode { assertEquals(0, rc); } + @Test + public void testMsyncSimple() throws Exception { + // disable fast path here because this test's assertions are based on the + // timing of explicitly called rollEditLogAndTail. Although this means this + // test takes some time to run + // TODO: revisit if there is a better way. + conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false); + conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 60, TimeUnit.SECONDS); + conf.setTimeDuration( + DFS_HA_TAILEDITS_PERIOD_KEY, 30, TimeUnit.SECONDS); + setUpCluster(1); + setObserverRead(true); + + AtomicBoolean readSucceed = new AtomicBoolean(false); + + dfs.mkdir(testPath, FsPermission.getDefault()); + assertSentTo(0); + + Thread reader = new Thread(() -> { + try { + // this read will block until roll and tail edits happen. + dfs.getFileStatus(testPath); + readSucceed.set(true); + } catch (IOException e) { + e.printStackTrace(); + } + }); + + reader.start(); + // the reader is still blocking, not succeeded yet. + assertFalse(readSucceed.get()); + rollEditLogAndTail(0); + // wait a while for all the change to be done + Thread.sleep(100); + // the reader should have succeed. + assertTrue(readSucceed.get()); + } + private void setUpCluster(int numObservers) throws Exception { qjmhaCluster = new MiniQJMHACluster.Builder(conf) .setNumNameNodes(2 + numObservers)