HDFS-16605. Improve Code With Lambda in hadoop-hdfs-rbf moudle. (#4375)
This commit is contained in:
parent
08a940d5dd
commit
2f6916a313
|
@ -167,21 +167,18 @@ public abstract class PeriodicService extends AbstractService {
|
||||||
stopPeriodic();
|
stopPeriodic();
|
||||||
|
|
||||||
// Create the runnable service
|
// Create the runnable service
|
||||||
Runnable updateRunnable = new Runnable() {
|
Runnable updateRunnable = () -> {
|
||||||
@Override
|
LOG.debug("Running {} update task", serviceName);
|
||||||
public void run() {
|
try {
|
||||||
LOG.debug("Running {} update task", serviceName);
|
if (!isRunning) {
|
||||||
try {
|
return;
|
||||||
if (!isRunning) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
periodicInvoke();
|
|
||||||
runCount++;
|
|
||||||
lastRun = Time.now();
|
|
||||||
} catch (Exception ex) {
|
|
||||||
errorCount++;
|
|
||||||
LOG.warn(serviceName + " service threw an exception", ex);
|
|
||||||
}
|
}
|
||||||
|
periodicInvoke();
|
||||||
|
runCount++;
|
||||||
|
lastRun = Time.now();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
errorCount++;
|
||||||
|
LOG.warn("{} service threw an exception", serviceName, ex);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -63,12 +63,7 @@ public class RouterHeartbeatService extends PeriodicService {
|
||||||
* Trigger the update of the Router state asynchronously.
|
* Trigger the update of the Router state asynchronously.
|
||||||
*/
|
*/
|
||||||
protected void updateStateAsync() {
|
protected void updateStateAsync() {
|
||||||
Thread thread = new Thread(new Runnable() {
|
Thread thread = new Thread(this::updateStateStore, "Router Heartbeat Async");
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
updateStateStore();
|
|
||||||
}
|
|
||||||
}, "Router Heartbeat Async");
|
|
||||||
thread.setDaemon(true);
|
thread.setDaemon(true);
|
||||||
thread.start();
|
thread.start();
|
||||||
}
|
}
|
||||||
|
|
|
@ -405,7 +405,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
|
||||||
.asMap()
|
.asMap()
|
||||||
.keySet()
|
.keySet()
|
||||||
.parallelStream()
|
.parallelStream()
|
||||||
.forEach((key) -> this.dnCache.refresh(key)),
|
.forEach(this.dnCache::refresh),
|
||||||
0,
|
0,
|
||||||
dnCacheExpire, TimeUnit.MILLISECONDS);
|
dnCacheExpire, TimeUnit.MILLISECONDS);
|
||||||
initRouterFedRename();
|
initRouterFedRename();
|
||||||
|
|
|
@ -92,8 +92,6 @@ import org.mockito.stubbing.Answer;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.function.Supplier;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper utilities for testing HDFS Federation.
|
* Helper utilities for testing HDFS Federation.
|
||||||
*/
|
*/
|
||||||
|
@ -174,26 +172,23 @@ public final class FederationTestUtils {
|
||||||
final String nsId, final String nnId,
|
final String nsId, final String nnId,
|
||||||
final FederationNamenodeServiceState state) throws Exception {
|
final FederationNamenodeServiceState state) throws Exception {
|
||||||
|
|
||||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
GenericTestUtils.waitFor(() -> {
|
||||||
@Override
|
try {
|
||||||
public Boolean get() {
|
List<? extends FederationNamenodeContext> namenodes =
|
||||||
try {
|
resolver.getNamenodesForNameserviceId(nsId);
|
||||||
List<? extends FederationNamenodeContext> namenodes =
|
if (namenodes != null) {
|
||||||
resolver.getNamenodesForNameserviceId(nsId);
|
for (FederationNamenodeContext namenode : namenodes) {
|
||||||
if (namenodes != null) {
|
// Check if this is the Namenode we are checking
|
||||||
for (FederationNamenodeContext namenode : namenodes) {
|
if (namenode.getNamenodeId() == nnId ||
|
||||||
// Check if this is the Namenode we are checking
|
namenode.getNamenodeId().equals(nnId)) {
|
||||||
if (namenode.getNamenodeId() == nnId ||
|
return state == null || namenode.getState().equals(state);
|
||||||
namenode.getNamenodeId().equals(nnId)) {
|
|
||||||
return state == null || namenode.getState().equals(state);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
|
||||||
// Ignore
|
|
||||||
}
|
}
|
||||||
return false;
|
} catch (IOException e) {
|
||||||
|
// Ignore
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}, 1000, 60 * 1000);
|
}, 1000, 60 * 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -209,22 +204,19 @@ public final class FederationTestUtils {
|
||||||
final ActiveNamenodeResolver resolver, final String nsId,
|
final ActiveNamenodeResolver resolver, final String nsId,
|
||||||
final FederationNamenodeServiceState state) throws Exception {
|
final FederationNamenodeServiceState state) throws Exception {
|
||||||
|
|
||||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
GenericTestUtils.waitFor(() -> {
|
||||||
@Override
|
try {
|
||||||
public Boolean get() {
|
List<? extends FederationNamenodeContext> nns =
|
||||||
try {
|
resolver.getNamenodesForNameserviceId(nsId);
|
||||||
List<? extends FederationNamenodeContext> nns =
|
for (FederationNamenodeContext nn : nns) {
|
||||||
resolver.getNamenodesForNameserviceId(nsId);
|
if (nn.getState().equals(state)) {
|
||||||
for (FederationNamenodeContext nn : nns) {
|
return true;
|
||||||
if (nn.getState().equals(state)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
|
||||||
// Ignore
|
|
||||||
}
|
}
|
||||||
return false;
|
} catch (IOException e) {
|
||||||
|
// Ignore
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}, 1000, 20 * 1000);
|
}, 1000, 20 * 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -361,19 +353,16 @@ public final class FederationTestUtils {
|
||||||
*/
|
*/
|
||||||
public static void waitRouterRegistered(RouterStore stateManager,
|
public static void waitRouterRegistered(RouterStore stateManager,
|
||||||
long routerCount, int timeout) throws Exception {
|
long routerCount, int timeout) throws Exception {
|
||||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
GenericTestUtils.waitFor(() -> {
|
||||||
@Override
|
try {
|
||||||
public Boolean get() {
|
List<RouterState> cachedRecords = stateManager.getCachedRecords();
|
||||||
try {
|
if (cachedRecords.size() == routerCount) {
|
||||||
List<RouterState> cachedRecords = stateManager.getCachedRecords();
|
return true;
|
||||||
if (cachedRecords.size() == routerCount) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
// Ignore
|
|
||||||
}
|
}
|
||||||
return false;
|
} catch (IOException e) {
|
||||||
|
// Ignore
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}, 100, timeout);
|
}, 100, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -390,13 +379,10 @@ public final class FederationTestUtils {
|
||||||
ConnectionManager connectionManager =
|
ConnectionManager connectionManager =
|
||||||
new ConnectionManager(server.getConfig());
|
new ConnectionManager(server.getConfig());
|
||||||
ConnectionManager spyConnectionManager = spy(connectionManager);
|
ConnectionManager spyConnectionManager = spy(connectionManager);
|
||||||
doAnswer(new Answer() {
|
doAnswer(invocation -> {
|
||||||
@Override
|
LOG.info("Simulating connectionManager throw IOException {}",
|
||||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
invocation.getMock());
|
||||||
LOG.info("Simulating connectionManager throw IOException {}",
|
throw new IOException("Simulate connectionManager throw IOException");
|
||||||
invocation.getMock());
|
|
||||||
throw new IOException("Simulate connectionManager throw IOException");
|
|
||||||
}
|
|
||||||
}).when(spyConnectionManager).getConnection(
|
}).when(spyConnectionManager).getConnection(
|
||||||
any(UserGroupInformation.class), any(String.class), any(Class.class));
|
any(UserGroupInformation.class), any(String.class), any(Class.class));
|
||||||
|
|
||||||
|
|
|
@ -237,12 +237,8 @@ public class MiniRouterDFSCluster {
|
||||||
throws IOException, URISyntaxException, InterruptedException {
|
throws IOException, URISyntaxException, InterruptedException {
|
||||||
|
|
||||||
LOG.info("Connecting to router at {}", fileSystemUri);
|
LOG.info("Connecting to router at {}", fileSystemUri);
|
||||||
return user.doAs(new PrivilegedExceptionAction<DFSClient>() {
|
return user.doAs((PrivilegedExceptionAction<DFSClient>)
|
||||||
@Override
|
() -> new DFSClient(fileSystemUri, conf));
|
||||||
public DFSClient run() throws IOException {
|
|
||||||
return new DFSClient(fileSystemUri, conf);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public RouterClient getAdminClient() throws IOException {
|
public RouterClient getAdminClient() throws IOException {
|
||||||
|
@ -384,12 +380,8 @@ public class MiniRouterDFSCluster {
|
||||||
throws IOException, URISyntaxException, InterruptedException {
|
throws IOException, URISyntaxException, InterruptedException {
|
||||||
|
|
||||||
LOG.info("Connecting to namenode at {}", fileSystemUri);
|
LOG.info("Connecting to namenode at {}", fileSystemUri);
|
||||||
return user.doAs(new PrivilegedExceptionAction<DFSClient>() {
|
return user.doAs((PrivilegedExceptionAction<DFSClient>)
|
||||||
@Override
|
() -> new DFSClient(fileSystemUri, conf));
|
||||||
public DFSClient run() throws IOException {
|
|
||||||
return new DFSClient(fileSystemUri, conf);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public DFSClient getClient() throws IOException, URISyntaxException {
|
public DFSClient getClient() throws IOException, URISyntaxException {
|
||||||
|
|
|
@ -156,15 +156,12 @@ public class MockNamenode {
|
||||||
NamespaceInfo nsInfo = new NamespaceInfo(1, this.nsId, this.nsId, 1);
|
NamespaceInfo nsInfo = new NamespaceInfo(1, this.nsId, this.nsId, 1);
|
||||||
when(mockNn.versionRequest()).thenReturn(nsInfo);
|
when(mockNn.versionRequest()).thenReturn(nsInfo);
|
||||||
|
|
||||||
when(mockNn.getServiceStatus()).thenAnswer(new Answer<HAServiceStatus>() {
|
when(mockNn.getServiceStatus()).
|
||||||
@Override
|
thenAnswer((Answer<HAServiceStatus>) invocation -> {
|
||||||
public HAServiceStatus answer(InvocationOnMock invocation)
|
HAServiceStatus haStatus = new HAServiceStatus(getHAServiceState());
|
||||||
throws Throwable {
|
haStatus.setNotReadyToBecomeActive("");
|
||||||
HAServiceStatus haStatus = new HAServiceStatus(getHAServiceState());
|
return haStatus;
|
||||||
haStatus.setNotReadyToBecomeActive("");
|
});
|
||||||
return haStatus;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -133,9 +133,8 @@ public class TestRouterRefreshFairnessPolicyController {
|
||||||
// Spawn 100 concurrent refresh requests
|
// Spawn 100 concurrent refresh requests
|
||||||
Thread[] threads = new Thread[100];
|
Thread[] threads = new Thread[100];
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
threads[i] = new Thread(() -> {
|
threads[i] = new Thread(() ->
|
||||||
client.refreshFairnessPolicyController(routerContext.getConf());
|
client.refreshFairnessPolicyController(routerContext.getConf()));
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (Thread thread : threads) {
|
for (Thread thread : threads) {
|
||||||
|
@ -182,9 +181,8 @@ public class TestRouterRefreshFairnessPolicyController {
|
||||||
final int newNs1Permits = 4;
|
final int newNs1Permits = 4;
|
||||||
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns0", newNs0Permits);
|
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns0", newNs0Permits);
|
||||||
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", newNs1Permits);
|
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", newNs1Permits);
|
||||||
Thread threadRefreshController = new Thread(() -> {
|
Thread threadRefreshController = new Thread(() -> client.
|
||||||
client.refreshFairnessPolicyController(routerContext.getConf());
|
refreshFairnessPolicyController(routerContext.getConf()));
|
||||||
});
|
|
||||||
threadRefreshController.start();
|
threadRefreshController.start();
|
||||||
threadRefreshController.join();
|
threadRefreshController.join();
|
||||||
|
|
||||||
|
|
|
@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegist
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
|
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -78,12 +77,8 @@ public class TestLocalResolver {
|
||||||
StringBuilder sb = new StringBuilder("clientX");
|
StringBuilder sb = new StringBuilder("clientX");
|
||||||
LocalResolver localResolver = new LocalResolver(conf, router);
|
LocalResolver localResolver = new LocalResolver(conf, router);
|
||||||
LocalResolver spyLocalResolver = spy(localResolver);
|
LocalResolver spyLocalResolver = spy(localResolver);
|
||||||
doAnswer(new Answer<String>() {
|
doAnswer((Answer<String>) invocation -> sb.toString()
|
||||||
@Override
|
).when(spyLocalResolver).getClientAddr();
|
||||||
public String answer(InvocationOnMock invocation) throws Throwable {
|
|
||||||
return sb.toString();
|
|
||||||
}
|
|
||||||
}).when(spyLocalResolver).getClientAddr();
|
|
||||||
|
|
||||||
// Add the mocks to the resolver
|
// Add the mocks to the resolver
|
||||||
MultipleDestinationMountTableResolver resolver =
|
MultipleDestinationMountTableResolver resolver =
|
||||||
|
|
|
@ -144,7 +144,7 @@ public class TestConnectionManager {
|
||||||
connectionCreator.setDaemon(true);
|
connectionCreator.setDaemon(true);
|
||||||
connectionCreator.start();
|
connectionCreator.start();
|
||||||
// Wait to make sure async thread is scheduled and picks
|
// Wait to make sure async thread is scheduled and picks
|
||||||
GenericTestUtils.waitFor(()->queue.isEmpty(), 50, 5000);
|
GenericTestUtils.waitFor(queue::isEmpty, 50, 5000);
|
||||||
// At this point connection creation task should be definitely picked up.
|
// At this point connection creation task should be definitely picked up.
|
||||||
assertTrue(queue.isEmpty());
|
assertTrue(queue.isEmpty());
|
||||||
// At this point connection thread should still be alive.
|
// At this point connection thread should still be alive.
|
||||||
|
|
|
@ -206,33 +206,29 @@ public class TestRouterClientRejectOverload {
|
||||||
for (int i = 0; i < numOps; i++) {
|
for (int i = 0; i < numOps; i++) {
|
||||||
// Stagger the operations a little (50ms)
|
// Stagger the operations a little (50ms)
|
||||||
final int sleepTime = i * 50;
|
final int sleepTime = i * 50;
|
||||||
Future<?> future = exec.submit(new Runnable() {
|
Future<?> future = exec.submit(() -> {
|
||||||
@Override
|
DFSClient routerClient = null;
|
||||||
public void run() {
|
try {
|
||||||
DFSClient routerClient = null;
|
Thread.sleep(sleepTime);
|
||||||
try {
|
routerClient = new DFSClient(address, conf);
|
||||||
Thread.sleep(sleepTime);
|
String clientName = routerClient.getClientName();
|
||||||
routerClient = new DFSClient(address, conf);
|
ClientProtocol routerProto = routerClient.getNamenode();
|
||||||
String clientName = routerClient.getClientName();
|
routerProto.renewLease(clientName, null);
|
||||||
ClientProtocol routerProto = routerClient.getNamenode();
|
} catch (RemoteException re) {
|
||||||
routerProto.renewLease(clientName, null);
|
IOException ioe = re.unwrapRemoteException();
|
||||||
} catch (RemoteException re) {
|
assertTrue("Wrong exception: " + ioe, ioe instanceof StandbyException);
|
||||||
IOException ioe = re.unwrapRemoteException();
|
assertExceptionContains("is overloaded", ioe);
|
||||||
assertTrue("Wrong exception: " + ioe,
|
overloadException.incrementAndGet();
|
||||||
ioe instanceof StandbyException);
|
} catch (IOException e) {
|
||||||
assertExceptionContains("is overloaded", ioe);
|
fail("Unexpected exception: " + e);
|
||||||
overloadException.incrementAndGet();
|
} catch (InterruptedException e) {
|
||||||
} catch (IOException e) {
|
fail("Cannot sleep: " + e);
|
||||||
fail("Unexpected exception: " + e);
|
} finally {
|
||||||
} catch (InterruptedException e) {
|
if (routerClient != null) {
|
||||||
fail("Cannot sleep: " + e);
|
try {
|
||||||
} finally {
|
routerClient.close();
|
||||||
if (routerClient != null) {
|
} catch (IOException e) {
|
||||||
try {
|
LOG.error("Cannot close the client");
|
||||||
routerClient.close();
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Cannot close the client");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -168,7 +168,7 @@ public class TestRouterFaultTolerant {
|
||||||
}
|
}
|
||||||
namenodes.clear();
|
namenodes.clear();
|
||||||
|
|
||||||
routers.forEach(router -> router.stop());
|
routers.forEach(Router::stop);
|
||||||
routers.clear();
|
routers.clear();
|
||||||
|
|
||||||
if (service != null) {
|
if (service != null) {
|
||||||
|
|
|
@ -74,8 +74,6 @@ import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.function.Supplier;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests quota behaviors in Router-based Federation.
|
* Tests quota behaviors in Router-based Federation.
|
||||||
*/
|
*/
|
||||||
|
@ -210,21 +208,17 @@ public class TestRouterQuota {
|
||||||
routerClient.create("/ssquota/file", true).close();
|
routerClient.create("/ssquota/file", true).close();
|
||||||
routerClient.create("/ssquota/subdir/file", true).close();
|
routerClient.create("/ssquota/subdir/file", true).close();
|
||||||
|
|
||||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
boolean isDsQuotaViolated = false;
|
||||||
@Override
|
try {
|
||||||
public Boolean get() {
|
// append data to trigger NSQuotaExceededException
|
||||||
boolean isDsQuotaViolated = false;
|
appendData("/ssquota/file", routerClient, BLOCK_SIZE);
|
||||||
try {
|
appendData("/ssquota/subdir/file", routerClient, BLOCK_SIZE);
|
||||||
// append data to trigger NSQuotaExceededException
|
} catch (DSQuotaExceededException e) {
|
||||||
appendData("/ssquota/file", routerClient, BLOCK_SIZE);
|
isDsQuotaViolated = true;
|
||||||
appendData("/ssquota/subdir/file", routerClient, BLOCK_SIZE);
|
} catch (IOException ignored) {
|
||||||
} catch (DSQuotaExceededException e) {
|
|
||||||
isDsQuotaViolated = true;
|
|
||||||
} catch (IOException ignored) {
|
|
||||||
}
|
|
||||||
return isDsQuotaViolated;
|
|
||||||
}
|
}
|
||||||
|
return isDsQuotaViolated;
|
||||||
}, 5000, 60000);
|
}, 5000, 60000);
|
||||||
|
|
||||||
// append data to destination path in real FileSystem should be okay
|
// append data to destination path in real FileSystem should be okay
|
||||||
|
|
|
@ -54,8 +54,6 @@ import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.Timeout;
|
import org.junit.rules.Timeout;
|
||||||
|
|
||||||
import java.util.function.Supplier;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test retry behavior of the Router RPC Client.
|
* Test retry behavior of the Router RPC Client.
|
||||||
*/
|
*/
|
||||||
|
@ -237,11 +235,6 @@ public class TestRouterRPCClientRetries {
|
||||||
private static void waitUpdateLiveNodes(
|
private static void waitUpdateLiveNodes(
|
||||||
final String oldValue, final NamenodeBeanMetrics metrics)
|
final String oldValue, final NamenodeBeanMetrics metrics)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
waitFor(new Supplier<Boolean>() {
|
waitFor(() -> !oldValue.equals(metrics.getLiveNodes()), 500, 5 * 1000);
|
||||||
@Override
|
|
||||||
public Boolean get() {
|
|
||||||
return !oldValue.equals(metrics.getLiveNodes());
|
|
||||||
}
|
|
||||||
}, 500, 5 * 1000);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue