diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java
index b690b8685c0..0883ba3a3db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java
@@ -167,21 +167,18 @@ public abstract class PeriodicService extends AbstractService {
     stopPeriodic();
 
     // Create the runnable service
-    Runnable updateRunnable = new Runnable() {
-      @Override
-      public void run() {
-        LOG.debug("Running {} update task", serviceName);
-        try {
-          if (!isRunning) {
-            return;
-          }
-          periodicInvoke();
-          runCount++;
-          lastRun = Time.now();
-        } catch (Exception ex) {
-          errorCount++;
-          LOG.warn(serviceName + " service threw an exception", ex);
+    Runnable updateRunnable = () -> {
+      LOG.debug("Running {} update task", serviceName);
+      try {
+        if (!isRunning) {
+          return;
         }
+        periodicInvoke();
+        runCount++;
+        lastRun = Time.now();
+      } catch (Exception ex) {
+        errorCount++;
+        LOG.warn("{} service threw an exception", serviceName, ex);
       }
     };
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
index 1316cf71849..19d7442acb6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
@@ -63,12 +63,7 @@ public class RouterHeartbeatService extends PeriodicService {
    * Trigger the update of the Router state asynchronously.
    */
   protected void updateStateAsync() {
-    Thread thread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        updateStateStore();
-      }
-    }, "Router Heartbeat Async");
+    Thread thread = new Thread(this::updateStateStore, "Router Heartbeat Async");
     thread.setDaemon(true);
     thread.start();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 980d64a45d1..423e0ba8e48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -405,7 +405,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
             .asMap()
             .keySet()
             .parallelStream()
-            .forEach((key) -> this.dnCache.refresh(key)),
+            .forEach(this.dnCache::refresh),
         0, dnCacheExpire, TimeUnit.MILLISECONDS);
 
     initRouterFedRename();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
index e758eee4fda..107a1ba9551 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
@@ -92,8 +92,6 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.function.Supplier;
-
 /**
  * Helper utilities for testing HDFS Federation.
  */
@@ -174,26 +172,23 @@ public final class FederationTestUtils {
       final String nsId, final String nnId,
       final FederationNamenodeServiceState state) throws Exception {
 
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        try {
-          List<? extends FederationNamenodeContext> namenodes =
-              resolver.getNamenodesForNameserviceId(nsId);
-          if (namenodes != null) {
-            for (FederationNamenodeContext namenode : namenodes) {
-              // Check if this is the Namenode we are checking
-              if (namenode.getNamenodeId() == nnId ||
-                  namenode.getNamenodeId().equals(nnId)) {
-                return state == null || namenode.getState().equals(state);
-              }
+    GenericTestUtils.waitFor(() -> {
+      try {
+        List<? extends FederationNamenodeContext> namenodes =
+            resolver.getNamenodesForNameserviceId(nsId);
+        if (namenodes != null) {
+          for (FederationNamenodeContext namenode : namenodes) {
+            // Check if this is the Namenode we are checking
+            if (namenode.getNamenodeId() == nnId ||
+                namenode.getNamenodeId().equals(nnId)) {
+              return state == null || namenode.getState().equals(state);
             }
           }
-        } catch (IOException e) {
-          // Ignore
         }
-        return false;
+      } catch (IOException e) {
+        // Ignore
       }
+      return false;
     }, 1000, 60 * 1000);
   }
 
@@ -209,22 +204,19 @@ public final class FederationTestUtils {
       final ActiveNamenodeResolver resolver, final String nsId,
       final FederationNamenodeServiceState state) throws Exception {
 
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        try {
-          List<? extends FederationNamenodeContext> nns =
-              resolver.getNamenodesForNameserviceId(nsId);
-          for (FederationNamenodeContext nn : nns) {
-            if (nn.getState().equals(state)) {
-              return true;
-            }
+    GenericTestUtils.waitFor(() -> {
+      try {
+        List<? extends FederationNamenodeContext> nns =
+            resolver.getNamenodesForNameserviceId(nsId);
+        for (FederationNamenodeContext nn : nns) {
+          if (nn.getState().equals(state)) {
+            return true;
           }
-        } catch (IOException e) {
-          // Ignore
         }
-        return false;
+      } catch (IOException e) {
+        // Ignore
       }
+      return false;
     }, 1000, 20 * 1000);
   }
 
@@ -361,19 +353,16 @@ public final class FederationTestUtils {
    */
   public static void waitRouterRegistered(RouterStore stateManager,
       long routerCount, int timeout) throws Exception {
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        try {
-          List<RouterState> cachedRecords = stateManager.getCachedRecords();
-          if (cachedRecords.size() == routerCount) {
-            return true;
-          }
-        } catch (IOException e) {
-          // Ignore
+    GenericTestUtils.waitFor(() -> {
+      try {
+        List<RouterState> cachedRecords = stateManager.getCachedRecords();
+        if (cachedRecords.size() == routerCount) {
+          return true;
         }
-        return false;
+      } catch (IOException e) {
+        // Ignore
       }
+      return false;
     }, 100, timeout);
   }
 
@@ -390,13 +379,10 @@ public final class FederationTestUtils {
     ConnectionManager connectionManager =
         new ConnectionManager(server.getConfig());
     ConnectionManager spyConnectionManager = spy(connectionManager);
-    doAnswer(new Answer() {
-      @Override
-      public Object answer(InvocationOnMock invocation) throws Throwable {
-        LOG.info("Simulating connectionManager throw IOException {}",
-            invocation.getMock());
-        throw new IOException("Simulate connectionManager throw IOException");
-      }
+    doAnswer(invocation -> {
+      LOG.info("Simulating connectionManager throw IOException {}",
+          invocation.getMock());
+      throw new IOException("Simulate connectionManager throw IOException");
     }).when(spyConnectionManager).getConnection(
         any(UserGroupInformation.class), any(String.class), any(Class.class));
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
index 87b99e5d952..53247262cef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
@@ -237,12 +237,8 @@ public class MiniRouterDFSCluster {
         throws IOException, URISyntaxException, InterruptedException {
 
       LOG.info("Connecting to router at {}", fileSystemUri);
-      return user.doAs(new PrivilegedExceptionAction<DFSClient>() {
-        @Override
-        public DFSClient run() throws IOException {
-          return new DFSClient(fileSystemUri, conf);
-        }
-      });
+      return user.doAs((PrivilegedExceptionAction<DFSClient>)
+          () -> new DFSClient(fileSystemUri, conf));
     }
 
     public RouterClient getAdminClient() throws IOException {
@@ -384,12 +380,8 @@ public class MiniRouterDFSCluster {
         throws IOException, URISyntaxException, InterruptedException {
 
      LOG.info("Connecting to namenode at {}", fileSystemUri);
-      return user.doAs(new PrivilegedExceptionAction<DFSClient>() {
-        @Override
-        public DFSClient run() throws IOException {
-          return new DFSClient(fileSystemUri, conf);
-        }
-      });
+      return user.doAs((PrivilegedExceptionAction<DFSClient>)
+          () -> new DFSClient(fileSystemUri, conf));
     }
 
     public DFSClient getClient() throws IOException, URISyntaxException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
index a4755c20fca..27fcf8726b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
@@ -156,15 +156,12 @@ public class MockNamenode {
     NamespaceInfo nsInfo = new NamespaceInfo(1, this.nsId, this.nsId, 1);
     when(mockNn.versionRequest()).thenReturn(nsInfo);
 
-    when(mockNn.getServiceStatus()).thenAnswer(new Answer() {
-      @Override
-      public HAServiceStatus answer(InvocationOnMock invocation)
-          throws Throwable {
-        HAServiceStatus haStatus = new HAServiceStatus(getHAServiceState());
-        haStatus.setNotReadyToBecomeActive("");
-        return haStatus;
-      }
-    });
+    when(mockNn.getServiceStatus()).
+        thenAnswer((Answer<HAServiceStatus>) invocation -> {
+          HAServiceStatus haStatus = new HAServiceStatus(getHAServiceState());
+          haStatus.setNotReadyToBecomeActive("");
+          return haStatus;
+        });
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java
index dfda47b9a53..06520906022 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java
@@ -133,9 +133,8 @@ public class TestRouterRefreshFairnessPolicyController {
     // Spawn 100 concurrent refresh requests
     Thread[] threads = new Thread[100];
     for (int i = 0; i < 100; i++) {
-      threads[i] = new Thread(() -> {
-        client.refreshFairnessPolicyController(routerContext.getConf());
-      });
+      threads[i] = new Thread(() ->
+          client.refreshFairnessPolicyController(routerContext.getConf()));
     }
 
     for (Thread thread : threads) {
@@ -182,9 +181,8 @@ public class TestRouterRefreshFairnessPolicyController {
     final int newNs1Permits = 4;
     conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns0", newNs0Permits);
     conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", newNs1Permits);
-    Thread threadRefreshController = new Thread(() -> {
-      client.refreshFairnessPolicyController(routerContext.getConf());
-    });
+    Thread threadRefreshController = new Thread(() -> client.
+        refreshFairnessPolicyController(routerContext.getConf()));
     threadRefreshController.start();
     threadRefreshController.join();
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java
index 08e75b2d309..0625bbadf7d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegist
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 /**
@@ -78,12 +77,8 @@ public class TestLocalResolver {
     StringBuilder sb = new StringBuilder("clientX");
     LocalResolver localResolver = new LocalResolver(conf, router);
     LocalResolver spyLocalResolver = spy(localResolver);
-    doAnswer(new Answer() {
-      @Override
-      public String answer(InvocationOnMock invocation) throws Throwable {
-        return sb.toString();
-      }
-    }).when(spyLocalResolver).getClientAddr();
+    doAnswer((Answer<String>) invocation -> sb.toString()
+    ).when(spyLocalResolver).getClientAddr();
 
     // Add the mocks to the resolver
     MultipleDestinationMountTableResolver resolver =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
index e397692e9a8..acb79cb4701 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
@@ -144,7 +144,7 @@ public class TestConnectionManager {
     connectionCreator.setDaemon(true);
     connectionCreator.start();
     // Wait to make sure async thread is scheduled and picks
-    GenericTestUtils.waitFor(()->queue.isEmpty(), 50, 5000);
+    GenericTestUtils.waitFor(queue::isEmpty, 50, 5000);
     // At this point connection creation task should be definitely picked up.
     assertTrue(queue.isEmpty());
     // At this point connection thread should still be alive.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
index 04cfb5c9d90..8d776546801 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
@@ -206,33 +206,29 @@ public class TestRouterClientRejectOverload {
     for (int i = 0; i < numOps; i++) {
       // Stagger the operations a little (50ms)
       final int sleepTime = i * 50;
-      Future<?> future = exec.submit(new Runnable() {
-        @Override
-        public void run() {
-          DFSClient routerClient = null;
-          try {
-            Thread.sleep(sleepTime);
-            routerClient = new DFSClient(address, conf);
-            String clientName = routerClient.getClientName();
-            ClientProtocol routerProto = routerClient.getNamenode();
-            routerProto.renewLease(clientName, null);
-          } catch (RemoteException re) {
-            IOException ioe = re.unwrapRemoteException();
-            assertTrue("Wrong exception: " + ioe,
-                ioe instanceof StandbyException);
-            assertExceptionContains("is overloaded", ioe);
-            overloadException.incrementAndGet();
-          } catch (IOException e) {
-            fail("Unexpected exception: " + e);
-          } catch (InterruptedException e) {
-            fail("Cannot sleep: " + e);
-          } finally {
-            if (routerClient != null) {
-              try {
-                routerClient.close();
-              } catch (IOException e) {
-                LOG.error("Cannot close the client");
-              }
+      Future<?> future = exec.submit(() -> {
+        DFSClient routerClient = null;
+        try {
+          Thread.sleep(sleepTime);
+          routerClient = new DFSClient(address, conf);
+          String clientName = routerClient.getClientName();
+          ClientProtocol routerProto = routerClient.getNamenode();
+          routerProto.renewLease(clientName, null);
+        } catch (RemoteException re) {
+          IOException ioe = re.unwrapRemoteException();
+          assertTrue("Wrong exception: " + ioe, ioe instanceof StandbyException);
+          assertExceptionContains("is overloaded", ioe);
+          overloadException.incrementAndGet();
+        } catch (IOException e) {
+          fail("Unexpected exception: " + e);
+        } catch (InterruptedException e) {
+          fail("Cannot sleep: " + e);
+        } finally {
+          if (routerClient != null) {
+            try {
+              routerClient.close();
+            } catch (IOException e) {
+              LOG.error("Cannot close the client");
            }
           }
         }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java
index ef5322ba218..34d50937b2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java
@@ -168,7 +168,7 @@ public class TestRouterFaultTolerant {
     }
     namenodes.clear();
 
-    routers.forEach(router -> router.stop());
+    routers.forEach(Router::stop);
     routers.clear();
 
     if (service != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
index b69004198eb..aa3d5470561 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
@@ -74,8 +74,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.function.Supplier;
-
 /**
  * Tests quota behaviors in Router-based Federation.
  */
@@ -210,21 +208,17 @@ public class TestRouterQuota {
     routerClient.create("/ssquota/file", true).close();
     routerClient.create("/ssquota/subdir/file", true).close();
 
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-
-      @Override
-      public Boolean get() {
-        boolean isDsQuotaViolated = false;
-        try {
-          // append data to trigger NSQuotaExceededException
-          appendData("/ssquota/file", routerClient, BLOCK_SIZE);
-          appendData("/ssquota/subdir/file", routerClient, BLOCK_SIZE);
-        } catch (DSQuotaExceededException e) {
-          isDsQuotaViolated = true;
-        } catch (IOException ignored) {
-        }
-        return isDsQuotaViolated;
+    GenericTestUtils.waitFor(() -> {
+      boolean isDsQuotaViolated = false;
+      try {
+        // append data to trigger NSQuotaExceededException
+        appendData("/ssquota/file", routerClient, BLOCK_SIZE);
+        appendData("/ssquota/subdir/file", routerClient, BLOCK_SIZE);
+      } catch (DSQuotaExceededException e) {
+        isDsQuotaViolated = true;
+      } catch (IOException ignored) {
       }
+      return isDsQuotaViolated;
     }, 5000, 60000);
 
     // append data to destination path in real FileSystem should be okay
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
index 039acbb5988..b2bfb2f5121 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
@@ -54,8 +54,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
-import java.util.function.Supplier;
-
 /**
  * Test retry behavior of the Router RPC Client.
  */
@@ -237,11 +235,6 @@ public class TestRouterRPCClientRetries {
   private static void waitUpdateLiveNodes(
       final String oldValue, final NamenodeBeanMetrics metrics)
           throws Exception {
-    waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        return !oldValue.equals(metrics.getLiveNodes());
-      }
-    }, 500, 5 * 1000);
+    waitFor(() -> !oldValue.equals(metrics.getLiveNodes()), 500, 5 * 1000);
   }
 }