diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java index 356600f22f8..fe867c516f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java @@ -87,6 +87,15 @@ public class ObserverReadProxyProvider */ private boolean observerReadEnabled; + /** + * A client using an ObserverReadProxyProvider should first sync with the + * active NameNode on startup. This ensures that the client reads data which + * is consistent with the state of the world as of the time of its + * instantiation. This variable will be true after this initial sync has + * been performed. + */ + private volatile boolean msynced = false; + /** * The index into the nameNodeProxies list currently being used. Should only * be accessed in synchronized methods. @@ -224,6 +233,22 @@ public class ObserverReadProxyProvider return currentProxy; } + /** + * This will call {@link ClientProtocol#msync()} on the active NameNode + * (via the {@link #failoverProxy}) to initialize the state of this client. + * Calling it multiple times is a no-op; only the first will perform an + * msync. + * + * @see #msynced + */ + private synchronized void initializeMsync() throws IOException { + if (msynced) { + return; // No need for an msync + } + failoverProxy.getProxy().proxy.msync(); + msynced = true; + } + /** * An InvocationHandler to handle incoming requests. This class's invoke * method contains the primary logic for redirecting to observers. @@ -244,6 +269,12 @@ public class ObserverReadProxyProvider Object retVal; if (observerReadEnabled && isRead(method)) { + if (!msynced) { + // An msync() must first be performed to ensure that this client is + // up-to-date with the active's state. This will only be done once. + initializeMsync(); + } + int failedObserverCount = 0; int activeCount = 0; int standbyCount = 0; @@ -315,6 +346,9 @@ public class ObserverReadProxyProvider // This exception will be handled by higher layers throw e.getCause(); } + // If this was reached, the request reached the active, so the + // state is up-to-date with active and no further msync is needed. + msynced = true; lastProxy = activeProxy; return retVal; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 57aa3340b95..9a15144e017 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1368,7 +1368,8 @@ public class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public void msync() throws IOException { - // TODO : need to be filled up if needed. May be a no-op here. + // Check for write access to ensure that msync only happens on active + namesystem.checkOperation(OperationCategory.WRITE); } @Override // ClientProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java index 2845670d44f..2bed37ca8c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java @@ -178,8 +178,12 @@ public class TestConsistentReadsObserver { // Therefore, the subsequent getFileStatus call should succeed. dfs2.getClient().msync(); dfs2.getFileStatus(testPath); - readStatus.set(1); - } catch (IOException e) { + if (HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2)) { + readStatus.set(1); + } else { + readStatus.set(-1); + } + } catch (Exception e) { e.printStackTrace(); readStatus.set(-1); } @@ -196,6 +200,71 @@ public class TestConsistentReadsObserver { assertEquals(1, readStatus.get()); } + // A new client should first contact the active, before using an observer, + // to ensure that it is up-to-date with the current state + @Test + public void testCallFromNewClient() throws Exception { + // Set the order of nodes: Observer, Standby, Active + // This is to ensure that test doesn't pass trivially because the active is + // the first node contacted + dfsCluster.transitionToStandby(0); + dfsCluster.transitionToObserver(0); + dfsCluster.transitionToStandby(2); + dfsCluster.transitionToActive(2); + try { + // 0 == not completed, 1 == succeeded, -1 == failed + AtomicInteger readStatus = new AtomicInteger(0); + + // Initialize the proxies for Observer Node. + dfs.getClient().getHAServiceState(); + + // Advance Observer's state ID so it is ahead of client's. + dfs.mkdir(new Path("/test"), FsPermission.getDefault()); + dfsCluster.getNameNode(2).getRpcServer().rollEditLog(); + dfsCluster.getNameNode(0) + .getNamesystem().getEditLogTailer().doTailEdits(); + + dfs.mkdir(testPath, FsPermission.getDefault()); + assertSentTo(2); + + Configuration conf2 = new Configuration(conf); + + // Disable FS cache so two different DFS clients will be used. + conf2.setBoolean("fs.hdfs.impl.disable.cache", true); + DistributedFileSystem dfs2 = + (DistributedFileSystem) FileSystem.get(conf2); + dfs2.getClient().getHAServiceState(); + + Thread reader = new Thread(() -> { + try { + dfs2.getFileStatus(testPath); + readStatus.set(1); + } catch (Exception e) { + e.printStackTrace(); + readStatus.set(-1); + } + }); + + reader.start(); + + Thread.sleep(100); + assertEquals(0, readStatus.get()); + + dfsCluster.getNameNode(2).getRpcServer().rollEditLog(); + dfsCluster.getNameNode(0) + .getNamesystem().getEditLogTailer().doTailEdits(); + + GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000); + assertEquals(1, readStatus.get()); + } finally { + // Put the cluster back the way it was when the test started + dfsCluster.transitionToStandby(2); + dfsCluster.transitionToObserver(2); + dfsCluster.transitionToStandby(0); + dfsCluster.transitionToActive(0); + } + } + @Test public void testUncoordinatedCall() throws Exception { // make a write call so that client will be ahead of