HDFS-14272. [SBN read] Make ObserverReadProxyProvider initialize its state ID against the active NN on startup. Contributed by Erik Krogen.
This commit is contained in:
parent
98684fb821
commit
35200b3671
hadoop-hdfs-project
hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha
hadoop-hdfs/src
main/java/org/apache/hadoop/hdfs/server/namenode
test/java/org/apache/hadoop/hdfs/server/namenode/ha
|
@ -87,6 +87,15 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||||
*/
|
*/
|
||||||
private boolean observerReadEnabled;
|
private boolean observerReadEnabled;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A client using an ObserverReadProxyProvider should first sync with the
|
||||||
|
* active NameNode on startup. This ensures that the client reads data which
|
||||||
|
* is consistent with the state of the world as of the time of its
|
||||||
|
* instantiation. This variable will be true after this initial sync has
|
||||||
|
* been performed.
|
||||||
|
*/
|
||||||
|
private volatile boolean msynced = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The index into the nameNodeProxies list currently being used. Should only
|
* The index into the nameNodeProxies list currently being used. Should only
|
||||||
* be accessed in synchronized methods.
|
* be accessed in synchronized methods.
|
||||||
|
@ -224,6 +233,22 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||||
return currentProxy;
|
return currentProxy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This will call {@link ClientProtocol#msync()} on the active NameNode
|
||||||
|
* (via the {@link #failoverProxy}) to initialize the state of this client.
|
||||||
|
* Calling it multiple times is a no-op; only the first will perform an
|
||||||
|
* msync.
|
||||||
|
*
|
||||||
|
* @see #msynced
|
||||||
|
*/
|
||||||
|
private synchronized void initializeMsync() throws IOException {
|
||||||
|
if (msynced) {
|
||||||
|
return; // No need for an msync
|
||||||
|
}
|
||||||
|
failoverProxy.getProxy().proxy.msync();
|
||||||
|
msynced = true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An InvocationHandler to handle incoming requests. This class's invoke
|
* An InvocationHandler to handle incoming requests. This class's invoke
|
||||||
* method contains the primary logic for redirecting to observers.
|
* method contains the primary logic for redirecting to observers.
|
||||||
|
@ -244,6 +269,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||||
Object retVal;
|
Object retVal;
|
||||||
|
|
||||||
if (observerReadEnabled && isRead(method)) {
|
if (observerReadEnabled && isRead(method)) {
|
||||||
|
if (!msynced) {
|
||||||
|
// An msync() must first be performed to ensure that this client is
|
||||||
|
// up-to-date with the active's state. This will only be done once.
|
||||||
|
initializeMsync();
|
||||||
|
}
|
||||||
|
|
||||||
int failedObserverCount = 0;
|
int failedObserverCount = 0;
|
||||||
int activeCount = 0;
|
int activeCount = 0;
|
||||||
int standbyCount = 0;
|
int standbyCount = 0;
|
||||||
|
@ -315,6 +346,9 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||||
// This exception will be handled by higher layers
|
// This exception will be handled by higher layers
|
||||||
throw e.getCause();
|
throw e.getCause();
|
||||||
}
|
}
|
||||||
|
// If this was reached, the request reached the active, so the
|
||||||
|
// state is up-to-date with active and no further msync is needed.
|
||||||
|
msynced = true;
|
||||||
lastProxy = activeProxy;
|
lastProxy = activeProxy;
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1368,7 +1368,8 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
|
|
||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
public void msync() throws IOException {
|
public void msync() throws IOException {
|
||||||
// TODO : need to be filled up if needed. May be a no-op here.
|
// Check for write access to ensure that msync only happens on active
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
|
|
|
@ -178,8 +178,12 @@ public class TestConsistentReadsObserver {
|
||||||
// Therefore, the subsequent getFileStatus call should succeed.
|
// Therefore, the subsequent getFileStatus call should succeed.
|
||||||
dfs2.getClient().msync();
|
dfs2.getClient().msync();
|
||||||
dfs2.getFileStatus(testPath);
|
dfs2.getFileStatus(testPath);
|
||||||
|
if (HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2)) {
|
||||||
readStatus.set(1);
|
readStatus.set(1);
|
||||||
} catch (IOException e) {
|
} else {
|
||||||
|
readStatus.set(-1);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
readStatus.set(-1);
|
readStatus.set(-1);
|
||||||
}
|
}
|
||||||
|
@ -196,6 +200,71 @@ public class TestConsistentReadsObserver {
|
||||||
assertEquals(1, readStatus.get());
|
assertEquals(1, readStatus.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A new client should first contact the active, before using an observer,
|
||||||
|
// to ensure that it is up-to-date with the current state
|
||||||
|
@Test
|
||||||
|
public void testCallFromNewClient() throws Exception {
|
||||||
|
// Set the order of nodes: Observer, Standby, Active
|
||||||
|
// This is to ensure that test doesn't pass trivially because the active is
|
||||||
|
// the first node contacted
|
||||||
|
dfsCluster.transitionToStandby(0);
|
||||||
|
dfsCluster.transitionToObserver(0);
|
||||||
|
dfsCluster.transitionToStandby(2);
|
||||||
|
dfsCluster.transitionToActive(2);
|
||||||
|
try {
|
||||||
|
// 0 == not completed, 1 == succeeded, -1 == failed
|
||||||
|
AtomicInteger readStatus = new AtomicInteger(0);
|
||||||
|
|
||||||
|
// Initialize the proxies for Observer Node.
|
||||||
|
dfs.getClient().getHAServiceState();
|
||||||
|
|
||||||
|
// Advance Observer's state ID so it is ahead of client's.
|
||||||
|
dfs.mkdir(new Path("/test"), FsPermission.getDefault());
|
||||||
|
dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
|
||||||
|
dfsCluster.getNameNode(0)
|
||||||
|
.getNamesystem().getEditLogTailer().doTailEdits();
|
||||||
|
|
||||||
|
dfs.mkdir(testPath, FsPermission.getDefault());
|
||||||
|
assertSentTo(2);
|
||||||
|
|
||||||
|
Configuration conf2 = new Configuration(conf);
|
||||||
|
|
||||||
|
// Disable FS cache so two different DFS clients will be used.
|
||||||
|
conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
|
||||||
|
DistributedFileSystem dfs2 =
|
||||||
|
(DistributedFileSystem) FileSystem.get(conf2);
|
||||||
|
dfs2.getClient().getHAServiceState();
|
||||||
|
|
||||||
|
Thread reader = new Thread(() -> {
|
||||||
|
try {
|
||||||
|
dfs2.getFileStatus(testPath);
|
||||||
|
readStatus.set(1);
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
readStatus.set(-1);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
reader.start();
|
||||||
|
|
||||||
|
Thread.sleep(100);
|
||||||
|
assertEquals(0, readStatus.get());
|
||||||
|
|
||||||
|
dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
|
||||||
|
dfsCluster.getNameNode(0)
|
||||||
|
.getNamesystem().getEditLogTailer().doTailEdits();
|
||||||
|
|
||||||
|
GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
|
||||||
|
assertEquals(1, readStatus.get());
|
||||||
|
} finally {
|
||||||
|
// Put the cluster back the way it was when the test started
|
||||||
|
dfsCluster.transitionToStandby(2);
|
||||||
|
dfsCluster.transitionToObserver(2);
|
||||||
|
dfsCluster.transitionToStandby(0);
|
||||||
|
dfsCluster.transitionToActive(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUncoordinatedCall() throws Exception {
|
public void testUncoordinatedCall() throws Exception {
|
||||||
// make a write call so that client will be ahead of
|
// make a write call so that client will be ahead of
|
||||||
|
|
Loading…
Reference in New Issue