HDFS-14279. [SBN read] Fix race condition in ObserverReadProxyProvider. Contributed by Erik Krogen.

This commit is contained in:
Erik Krogen 2019-02-14 08:59:56 -08:00 committed by Chen Liang
parent 73d56b9d2a
commit 98434bed5c
1 changed files with 9 additions and 14 deletions

View File

@ -92,13 +92,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
* be accessed in synchronized methods. * be accessed in synchronized methods.
*/ */
private int currentIndex = -1; private int currentIndex = -1;
/** /**
* The proxy being used currently; this will match with currentIndex above. * The proxy being used currently. Should only be accessed in synchronized
* This field is volatile to allow reads without synchronization; updates * methods.
* should still be performed synchronously to maintain consistency between
* currentIndex and this field.
*/ */
private volatile NNProxyInfo<T> currentProxy; private NNProxyInfo<T> currentProxy;
/** The last proxy that has been used. Only used for testing. */ /** The last proxy that has been used. Only used for testing. */
private volatile ProxyInfo<T> lastProxy = null; private volatile ProxyInfo<T> lastProxy = null;
@ -191,10 +190,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
* {@link #changeProxy(NNProxyInfo)} to initialize one. * {@link #changeProxy(NNProxyInfo)} to initialize one.
*/ */
private NNProxyInfo<T> getCurrentProxy() { private NNProxyInfo<T> getCurrentProxy() {
if (currentProxy == null) { return changeProxy(null);
changeProxy(null);
}
return currentProxy;
} }
/** /**
@ -205,15 +201,13 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
* returning. * returning.
* *
* @param initial The expected current proxy * @param initial The expected current proxy
* @return The new proxy that should be used.
*/ */
private synchronized void changeProxy(NNProxyInfo<T> initial) { private synchronized NNProxyInfo<T> changeProxy(NNProxyInfo<T> initial) {
if (currentProxy != initial) { if (currentProxy != initial) {
// Must have been a concurrent modification; ignore the move request // Must have been a concurrent modification; ignore the move request
return; return currentProxy;
} }
// Attempt to force concurrent callers of getCurrentProxy to wait for the
// new proxy; best-effort by setting currentProxy to null
currentProxy = null;
currentIndex = (currentIndex + 1) % nameNodeProxies.size(); currentIndex = (currentIndex + 1) % nameNodeProxies.size();
currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex)); currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
try { try {
@ -227,6 +221,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
LOG.debug("Changed current proxy from {} to {}", LOG.debug("Changed current proxy from {} to {}",
initial == null ? "none" : initial.proxyInfo, initial == null ? "none" : initial.proxyInfo,
currentProxy.proxyInfo); currentProxy.proxyInfo);
return currentProxy;
} }
/** /**