HDFS-14272. [SBN read] Make ObserverReadProxyProvider initialize its state ID against the active NN on startup. Contributed by Erik Krogen.

This commit is contained in:
Erik Krogen 2019-03-01 12:58:55 -08:00 committed by Akira Ajisaka
parent 9e53088493
commit fc17ba172b
No known key found for this signature in database
GPG Key ID: C1EDBB9CA400FD50
3 changed files with 107 additions and 3 deletions

View File

@ -87,6 +87,15 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
*/
private boolean observerReadEnabled;
/**
* A client using an ObserverReadProxyProvider should first sync with the
* active NameNode on startup. This ensures that the client reads data which
* is consistent with the state of the world as of the time of its
* instantiation. This variable will be true after this initial sync has
* been performed.
*/
private volatile boolean msynced = false;
/**
* The index into the nameNodeProxies list currently being used. Should only
* be accessed in synchronized methods.
@ -224,6 +233,22 @@ private synchronized NNProxyInfo<T> changeProxy(NNProxyInfo<T> initial) {
return currentProxy;
}
/**
* This will call {@link ClientProtocol#msync()} on the active NameNode
* (via the {@link #failoverProxy}) to initialize the state of this client.
* Calling it multiple times is a no-op; only the first will perform an
* msync.
*
* @see #msynced
*/
private synchronized void initializeMsync() throws IOException {
if (msynced) {
return; // No need for an msync
}
failoverProxy.getProxy().proxy.msync();
msynced = true;
}
/**
* An InvocationHandler to handle incoming requests. This class's invoke
* method contains the primary logic for redirecting to observers.
@ -244,6 +269,12 @@ public Object invoke(Object proxy, final Method method, final Object[] args)
Object retVal;
if (observerReadEnabled && isRead(method)) {
if (!msynced) {
// An msync() must first be performed to ensure that this client is
// up-to-date with the active's state. This will only be done once.
initializeMsync();
}
int failedObserverCount = 0;
int activeCount = 0;
int standbyCount = 0;
@ -315,6 +346,9 @@ public Object invoke(Object proxy, final Method method, final Object[] args)
// This exception will be handled by higher layers
throw e.getCause();
}
// If this was reached, the request reached the active, so the
// state is up-to-date with active and no further msync is needed.
msynced = true;
lastProxy = activeProxy;
return retVal;
}

View File

@ -1409,7 +1409,8 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
@Override // ClientProtocol
public void msync() throws IOException {
// TODO : need to be filled up if needed. May be a no-op here.
// Check for write access to ensure that msync only happens on active
namesystem.checkOperation(OperationCategory.WRITE);
}
@Override // ClientProtocol

View File

@ -178,8 +178,12 @@ public void testMsync() throws Exception {
// Therefore, the subsequent getFileStatus call should succeed.
dfs2.getClient().msync();
dfs2.getFileStatus(testPath);
readStatus.set(1);
} catch (IOException e) {
if (HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2)) {
readStatus.set(1);
} else {
readStatus.set(-1);
}
} catch (Exception e) {
e.printStackTrace();
readStatus.set(-1);
}
@ -196,6 +200,71 @@ public void testMsync() throws Exception {
assertEquals(1, readStatus.get());
}
// A new client should first contact the active, before using an observer,
// to ensure that it is up-to-date with the current state
@Test
public void testCallFromNewClient() throws Exception {
// Set the order of nodes: Observer, Standby, Active
// This is to ensure that test doesn't pass trivially because the active is
// the first node contacted
dfsCluster.transitionToStandby(0);
dfsCluster.transitionToObserver(0);
dfsCluster.transitionToStandby(2);
dfsCluster.transitionToActive(2);
try {
// 0 == not completed, 1 == succeeded, -1 == failed
AtomicInteger readStatus = new AtomicInteger(0);
// Initialize the proxies for Observer Node.
dfs.getClient().getHAServiceState();
// Advance Observer's state ID so it is ahead of client's.
dfs.mkdir(new Path("/test"), FsPermission.getDefault());
dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
dfsCluster.getNameNode(0)
.getNamesystem().getEditLogTailer().doTailEdits();
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(2);
Configuration conf2 = new Configuration(conf);
// Disable FS cache so two different DFS clients will be used.
conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
DistributedFileSystem dfs2 =
(DistributedFileSystem) FileSystem.get(conf2);
dfs2.getClient().getHAServiceState();
Thread reader = new Thread(() -> {
try {
dfs2.getFileStatus(testPath);
readStatus.set(1);
} catch (Exception e) {
e.printStackTrace();
readStatus.set(-1);
}
});
reader.start();
Thread.sleep(100);
assertEquals(0, readStatus.get());
dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
dfsCluster.getNameNode(0)
.getNamesystem().getEditLogTailer().doTailEdits();
GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
assertEquals(1, readStatus.get());
} finally {
// Put the cluster back the way it was when the test started
dfsCluster.transitionToStandby(2);
dfsCluster.transitionToObserver(2);
dfsCluster.transitionToStandby(0);
dfsCluster.transitionToActive(0);
}
}
@Test
public void testUncoordinatedCall() throws Exception {
// make a write call so that client will be ahead of