HDFS-15108. RBF: MembershipNamenodeResolver should invalidate cache incase of active namenode update. Contributed by Ayush Saxena.
This commit is contained in:
parent
26a969ec73
commit
7b62409ace
|
@ -161,6 +161,11 @@ public class MembershipNamenodeResolver
|
||||||
UpdateNamenodeRegistrationRequest.newInstance(
|
UpdateNamenodeRegistrationRequest.newInstance(
|
||||||
record.getNameserviceId(), record.getNamenodeId(), ACTIVE);
|
record.getNameserviceId(), record.getNamenodeId(), ACTIVE);
|
||||||
membership.updateNamenodeRegistration(updateRequest);
|
membership.updateNamenodeRegistration(updateRequest);
|
||||||
|
|
||||||
|
cacheNS.remove(nsId);
|
||||||
|
// Invalidating the full cacheBp since getting the blockpool id from
|
||||||
|
// namespace id is quite costly.
|
||||||
|
cacheBP.clear();
|
||||||
}
|
}
|
||||||
} catch (StateStoreUnavailableException e) {
|
} catch (StateStoreUnavailableException e) {
|
||||||
LOG.error("Cannot update {} as active, State Store unavailable", address);
|
LOG.error("Cannot update {} as active, State Store unavailable", address);
|
||||||
|
|
|
@ -33,6 +33,7 @@ import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@ -281,4 +282,40 @@ public class TestNamenodeResolver {
|
||||||
verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 3,
|
verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 3,
|
||||||
FederationNamenodeServiceState.STANDBY);
|
FederationNamenodeServiceState.STANDBY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCacheUpdateOnNamenodeStateUpdate() throws IOException {
|
||||||
|
// Create a namenode initially registering in standby state.
|
||||||
|
assertTrue(namenodeResolver.registerNamenode(
|
||||||
|
createNamenodeReport(NAMESERVICES[0], NAMENODES[0],
|
||||||
|
HAServiceState.STANDBY)));
|
||||||
|
stateStore.refreshCaches(true);
|
||||||
|
// Check whether the namenpde state is reported correct as standby.
|
||||||
|
FederationNamenodeContext namenode =
|
||||||
|
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0]).get(0);
|
||||||
|
assertEquals(FederationNamenodeServiceState.STANDBY, namenode.getState());
|
||||||
|
String rpcAddr = namenode.getRpcAddress();
|
||||||
|
InetSocketAddress inetAddr = getInetSocketAddress(rpcAddr);
|
||||||
|
|
||||||
|
// If the namenode state changes and it serves request,
|
||||||
|
// RouterRpcClient calls updateActiveNamenode to update the state to active,
|
||||||
|
// Check whether correct updated state is returned post update.
|
||||||
|
namenodeResolver.updateActiveNamenode(NAMESERVICES[0], inetAddr);
|
||||||
|
FederationNamenodeContext namenode1 =
|
||||||
|
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0]).get(0);
|
||||||
|
assertEquals("The namenode state should be ACTIVE post update.",
|
||||||
|
FederationNamenodeServiceState.ACTIVE, namenode1.getState());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates InetSocketAddress from the given RPC address.
|
||||||
|
* @param rpcAddr RPC address (host:port).
|
||||||
|
* @return InetSocketAddress corresponding to the specified RPC address.
|
||||||
|
*/
|
||||||
|
private static InetSocketAddress getInetSocketAddress(String rpcAddr) {
|
||||||
|
String[] rpcAddrArr = rpcAddr.split(":");
|
||||||
|
int port = Integer.parseInt(rpcAddrArr[1]);
|
||||||
|
String hostname = rpcAddrArr[0];
|
||||||
|
return new InetSocketAddress(hostname, port);
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue