From cde1f3af21b84a65fb27a5822b308413ea6296c5 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Mon, 12 Sep 2022 22:28:16 +0800 Subject: [PATCH] HADOOP-18302. Remove WhiteBox in hadoop-common module. (#4457) Signed-off-by: Akira Ajisaka --- .../apache/hadoop/fs/RawLocalFileSystem.java | 5 + .../org/apache/hadoop/http/HttpServer2.java | 4 + .../java/org/apache/hadoop/ipc/Server.java | 14 + .../apache/hadoop/ipc/metrics/RpcMetrics.java | 5 + .../metrics2/impl/MetricsRecordImpl.java | 4 +- .../hadoop/metrics2/sink/GraphiteSink.java | 283 +++++++++--------- .../hadoop/metrics2/sink/StatsDSink.java | 6 +- .../apache/hadoop/fs/TestLocalFileSystem.java | 4 +- .../apache/hadoop/http/TestHttpServer.java | 7 +- .../java/org/apache/hadoop/ipc/TestIPC.java | 7 +- .../java/org/apache/hadoop/ipc/TestRPC.java | 24 +- .../metrics2/impl/TestGraphiteMetrics.java | 215 ------------- .../metrics2/sink/TestGraphiteMetrics.java | 219 ++++++++++++++ .../{impl => sink}/TestStatsDMetrics.java | 14 +- .../hadoop/portmap/RpcProgramPortmap.java | 7 +- .../apache/hadoop/portmap/TestPortmap.java | 7 +- 16 files changed, 427 insertions(+), 398 deletions(-) delete mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestGraphiteMetrics.java rename hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/{impl => sink}/TestStatsDMetrics.java (91%) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index d9ceab9a054..2f4f93099b5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -1326,4 +1326,9 @@ public boolean hasPathCapability(final Path path, final String capability) return super.hasPathCapability(path, capability); } } + + @VisibleForTesting + static void setUseDeprecatedFileStatus(boolean useDeprecatedFileStatus) { + RawLocalFileSystem.useDeprecatedFileStatus = useDeprecatedFileStatus; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java index 1db8c750cef..934848b826a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java @@ -1967,4 +1967,8 @@ HttpServer2Metrics getMetrics() { return metrics; } + @VisibleForTesting + List getListeners() { + return listeners; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index f5753efe7e5..db34af6ee65 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -4111,4 +4111,18 @@ public synchronized void run() { } } + @VisibleForTesting + CallQueueManager getCallQueue() { + return callQueue; + } + + @VisibleForTesting + void setCallQueue(CallQueueManager callQueue) { + this.callQueue = callQueue; + } + + @VisibleForTesting + void setRpcRequestClass(Class rpcRequestClass) { + this.rpcRequestClass = rpcRequestClass; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index bf21e3865fa..f01cd5bcfda 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -364,4 +364,9 @@ public double getDeferredRpcProcessingStdDev() { public MetricsTag getTag(String tagName) { return registry.getTag(tagName); } + + @VisibleForTesting + public MutableCounterLong getRpcAuthorizationSuccesses() { + return rpcAuthorizationSuccesses; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java index 9ffceaaa0dd..b11f775a73d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java @@ -22,12 +22,14 @@ import static org.apache.hadoop.util.Preconditions.*; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.AbstractMetric; import org.apache.hadoop.metrics2.MetricsTag; import static org.apache.hadoop.metrics2.util.Contracts.*; -class MetricsRecordImpl extends AbstractMetricsRecord { +@VisibleForTesting +public class MetricsRecordImpl extends AbstractMetricsRecord { protected static final String DEFAULT_CONTEXT = "default"; private final long timestamp; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java index ea1bde3a75e..e07260c9993 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java @@ -21,6 +21,7 @@ import org.apache.commons.configuration2.SubsetConfiguration; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.metrics2.AbstractMetric; import org.apache.hadoop.metrics2.MetricsException; import org.apache.hadoop.metrics2.MetricsRecord; @@ -37,171 +38,173 @@ import java.nio.charset.StandardCharsets; /** - * A metrics sink that writes to a Graphite server + * A metrics sink that writes to a Graphite server. */ @InterfaceAudience.Public @InterfaceStability.Evolving public class GraphiteSink implements MetricsSink, Closeable { - private static final Logger LOG = - LoggerFactory.getLogger(GraphiteSink.class); - private static final String SERVER_HOST_KEY = "server_host"; - private static final String SERVER_PORT_KEY = "server_port"; - private static final String METRICS_PREFIX = "metrics_prefix"; - private String metricsPrefix = null; - private Graphite graphite = null; + private static final Logger LOG = + LoggerFactory.getLogger(GraphiteSink.class); + private static final String SERVER_HOST_KEY = "server_host"; + private static final String SERVER_PORT_KEY = "server_port"; + private static final String METRICS_PREFIX = "metrics_prefix"; + private String metricsPrefix = null; + private Graphite graphite = null; - @Override - public void init(SubsetConfiguration conf) { - // Get Graphite host configurations. - final String serverHost = conf.getString(SERVER_HOST_KEY); - final int serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY)); + @Override + public void init(SubsetConfiguration conf) { + // Get Graphite host configurations. + final String serverHost = conf.getString(SERVER_HOST_KEY); + final int serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY)); - // Get Graphite metrics graph prefix. - metricsPrefix = conf.getString(METRICS_PREFIX); - if (metricsPrefix == null) - metricsPrefix = ""; - - graphite = new Graphite(serverHost, serverPort); - graphite.connect(); + // Get Graphite metrics graph prefix. + metricsPrefix = conf.getString(METRICS_PREFIX); + if (metricsPrefix == null) { + metricsPrefix = ""; } - @Override - public void putMetrics(MetricsRecord record) { - StringBuilder lines = new StringBuilder(); - StringBuilder metricsPathPrefix = new StringBuilder(); + graphite = new Graphite(serverHost, serverPort); + graphite.connect(); + } - // Configure the hierarchical place to display the graph. - metricsPathPrefix.append(metricsPrefix).append(".") - .append(record.context()).append(".").append(record.name()); + @Override + public void putMetrics(MetricsRecord record) { + StringBuilder lines = new StringBuilder(); + StringBuilder metricsPathPrefix = new StringBuilder(); - for (MetricsTag tag : record.tags()) { - if (tag.value() != null) { - metricsPathPrefix.append(".") - .append(tag.name()) - .append("=") - .append(tag.value()); - } - } + // Configure the hierarchical place to display the graph. + metricsPathPrefix.append(metricsPrefix).append(".") + .append(record.context()).append(".").append(record.name()); - // The record timestamp is in milliseconds while Graphite expects an epoc time in seconds. - long timestamp = record.timestamp() / 1000L; - - // Collect datapoints. - for (AbstractMetric metric : record.metrics()) { - lines.append( - metricsPathPrefix.toString() + "." - + metric.name().replace(' ', '.')).append(" ") - .append(metric.value()).append(" ").append(timestamp) - .append("\n"); - } - - try { - graphite.write(lines.toString()); - } catch (Exception e) { - LOG.warn("Error sending metrics to Graphite", e); - try { - graphite.close(); - } catch (Exception e1) { - throw new MetricsException("Error closing connection to Graphite", e1); - } - } + for (MetricsTag tag : record.tags()) { + if (tag.value() != null) { + metricsPathPrefix.append(".") + .append(tag.name()) + .append("=") + .append(tag.value()); + } } - @Override - public void flush() { + // The record timestamp is in milliseconds while Graphite expects an epoc time in seconds. + long timestamp = record.timestamp() / 1000L; + + // Collect datapoints. + for (AbstractMetric metric : record.metrics()) { + lines.append(metricsPathPrefix + "." + metric.name().replace(' ', '.')).append(" ") + .append(metric.value()).append(" ").append(timestamp) + .append("\n"); + } + + try { + graphite.write(lines.toString()); + } catch (Exception e) { + LOG.warn("Error sending metrics to Graphite.", e); try { - graphite.flush(); - } catch (Exception e) { - LOG.warn("Error flushing metrics to Graphite", e); - try { - graphite.close(); - } catch (Exception e1) { - throw new MetricsException("Error closing connection to Graphite", e1); - } + graphite.close(); + } catch (Exception e1) { + throw new MetricsException("Error closing connection to Graphite", e1); } } + } - @Override - public void close() throws IOException { - graphite.close(); + @Override + public void flush() { + try { + graphite.flush(); + } catch (Exception e) { + LOG.warn("Error flushing metrics to Graphite.", e); + try { + graphite.close(); + } catch (Exception e1) { + throw new MetricsException("Error closing connection to Graphite.", e1); + } + } + } + + @Override + public void close() throws IOException { + graphite.close(); + } + + public static class Graphite { + private final static int MAX_CONNECTION_FAILURES = 5; + + private String serverHost; + private int serverPort; + private Writer writer = null; + private Socket socket = null; + private int connectionFailures = 0; + + public Graphite(String serverHost, int serverPort) { + this.serverHost = serverHost; + this.serverPort = serverPort; } - public static class Graphite { - private final static int MAX_CONNECTION_FAILURES = 5; - - private String serverHost; - private int serverPort; - private Writer writer = null; - private Socket socket = null; - private int connectionFailures = 0; - - public Graphite(String serverHost, int serverPort) { - this.serverHost = serverHost; - this.serverPort = serverPort; + public void connect() { + if (isConnected()) { + throw new MetricsException("Already connected to Graphite"); } - - public void connect() { - if (isConnected()) { - throw new MetricsException("Already connected to Graphite"); - } - if (tooManyConnectionFailures()) { - // return silently (there was ERROR in logs when we reached limit for the first time) - return; - } - try { + if (tooManyConnectionFailures()) { + // return silently (there was ERROR in logs when we reached limit for the first time) + return; + } + try { // Open a connection to Graphite server. - socket = new Socket(serverHost, serverPort); + socket = new Socket(serverHost, serverPort); writer = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8); - } catch (Exception e) { - connectionFailures++; - if (tooManyConnectionFailures()) { - // first time when connection limit reached, report to logs - LOG.error("Too many connection failures, would not try to connect again."); - } - throw new MetricsException("Error creating connection, " - + serverHost + ":" + serverPort, e); + } catch (Exception e) { + connectionFailures++; + if (tooManyConnectionFailures()) { + // first time when connection limit reached, report to logs + LOG.error("Too many connection failures, would not try to connect again."); } + throw new MetricsException("Error creating connection, " + + serverHost + ":" + serverPort, e); } - - public void write(String msg) throws IOException { - if (!isConnected()) { - connect(); - } - if (isConnected()) { - writer.write(msg); - } - } - - public void flush() throws IOException { - if (isConnected()) { - writer.flush(); - } - } - - public boolean isConnected() { - return socket != null && socket.isConnected() && !socket.isClosed(); - } - - public void close() throws IOException { - try { - if (writer != null) { - writer.close(); - } - } catch (IOException ex) { - if (socket != null) { - socket.close(); - } - } finally { - socket = null; - writer = null; - } - } - - private boolean tooManyConnectionFailures() { - return connectionFailures > MAX_CONNECTION_FAILURES; - } - } + public void write(String msg) throws IOException { + if (!isConnected()) { + connect(); + } + if (isConnected()) { + writer.write(msg); + } + } + + public void flush() throws IOException { + if (isConnected()) { + writer.flush(); + } + } + + public boolean isConnected() { + return socket != null && socket.isConnected() && !socket.isClosed(); + } + + public void close() throws IOException { + try { + if (writer != null) { + writer.close(); + } + } catch (IOException ex) { + if (socket != null) { + socket.close(); + } + } finally { + socket = null; + writer = null; + } + } + + private boolean tooManyConnectionFailures() { + return connectionFailures > MAX_CONNECTION_FAILURES; + } + } + + @VisibleForTesting + void setGraphite(Graphite graphite) { + this.graphite = graphite; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/StatsDSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/StatsDSink.java index d1ec47fdecb..4f41c0b0057 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/StatsDSink.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/StatsDSink.java @@ -28,6 +28,7 @@ import org.apache.commons.configuration2.SubsetConfiguration; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.metrics2.AbstractMetric; import org.apache.hadoop.metrics2.MetricType; import org.apache.hadoop.metrics2.MetricsException; @@ -214,5 +215,8 @@ public void close() throws IOException { } } - + @VisibleForTesting + void setStatsd(StatsD statsd) { + this.statsd = statsd; + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java index 29ef6ca6c7a..38e16221a45 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java @@ -24,7 +24,6 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.LambdaTestUtils; -import org.apache.hadoop.test.Whitebox; import org.apache.hadoop.util.StringUtils; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; @@ -650,7 +649,8 @@ public void testFileStatusPipeFile() throws Exception { RawLocalFileSystem fs = spy(origFs); Configuration conf = mock(Configuration.class); fs.setConf(conf); - Whitebox.setInternalState(fs, "useDeprecatedFileStatus", false); + + RawLocalFileSystem.setUseDeprecatedFileStatus(false); Path path = new Path("/foo"); File pipe = mock(File.class); when(pipe.isFile()).thenReturn(false); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/http/TestHttpServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/http/TestHttpServer.java index b1255d19d90..062033d4d56 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/http/TestHttpServer.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/http/TestHttpServer.java @@ -29,7 +29,6 @@ import org.apache.hadoop.security.ShellBasedUnixGroupsMapping; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; -import org.apache.hadoop.test.Whitebox; import org.assertj.core.api.Assertions; import org.eclipse.jetty.server.ServerConnector; @@ -663,8 +662,7 @@ private HttpServer2 checkBindAddress(String host, int port, boolean findPort) HttpServer2 server = createServer(host, port); try { // not bound, ephemeral should return requested port (0 for ephemeral) - List listeners = (List) Whitebox.getInternalState(server, - "listeners"); + List listeners = server.getListeners(); ServerConnector listener = (ServerConnector)listeners.get(0); assertEquals(port, listener.getPort()); @@ -740,8 +738,7 @@ public void testBacklogSize() throws Exception Configuration conf = new Configuration(); conf.setInt(HttpServer2.HTTP_SOCKET_BACKLOG_SIZE_KEY, backlogSize); HttpServer2 srv = createServer("test", conf); - List listeners = (List) Whitebox.getInternalState(srv, - "listeners"); + List listeners = srv.getListeners(); ServerConnector listener = (ServerConnector)listeners.get(0); assertEquals(backlogSize, listener.getAcceptQueueSize()); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 1e780793a6d..ffa17224b03 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -92,7 +92,6 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.LambdaTestUtils; -import org.apache.hadoop.test.Whitebox; import org.apache.hadoop.util.StringUtils; import org.assertj.core.api.Condition; import org.junit.Assert; @@ -938,7 +937,6 @@ public void testIpcWithReaderQueuing() throws Exception { // goal is to jam a handler with a connection, fill the callq with // connections, in turn jamming the readers - then flood the server and // ensure that the listener blocks when the reader connection queues fill - @SuppressWarnings("unchecked") private void checkBlocking(int readers, int readerQ, int callQ) throws Exception { int handlers = 1; // makes it easier @@ -958,9 +956,8 @@ private void checkBlocking(int readers, int readerQ, int callQ) throws Exception // start server final TestServerQueue server = new TestServerQueue(clients, readers, callQ, handlers, conf); - CallQueueManager spy = spy( - (CallQueueManager)Whitebox.getInternalState(server, "callQueue")); - Whitebox.setInternalState(server, "callQueue", spy); + CallQueueManager spy = spy(server.getCallQueue()); + server.setCallQueue(spy); final InetSocketAddress addr = NetUtils.getConnectAddress(server); server.start(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 7201b28ebab..101750d72c8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -52,7 +52,6 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.MetricsAsserts; import org.apache.hadoop.test.MockitoUtil; -import org.apache.hadoop.test.Whitebox; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -309,7 +308,7 @@ public ProtocolProxy getProxy( throws IOException { T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[] { protocol }, new StoppedInvocationHandler()); - return new ProtocolProxy(protocol, proxy, false); + return new ProtocolProxy<>(protocol, proxy, false); } @Override @@ -1219,10 +1218,8 @@ public void testClientBackOff() throws Exception { .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true); server = setupTestServer(builder); - @SuppressWarnings("unchecked") - CallQueueManager spy = spy((CallQueueManager) Whitebox - .getInternalState(server, "callQueue")); - Whitebox.setInternalState(server, "callQueue", spy); + CallQueueManager spy = spy(server.getCallQueue()); + server.setCallQueue(spy); Exception lastException = null; proxy = getClient(addr, conf); @@ -1274,7 +1271,7 @@ public void testClientBackOffByResponseTime() throws Exception { GenericTestUtils.setLogLevel(DecayRpcScheduler.LOG, Level.DEBUG); GenericTestUtils.setLogLevel(RPC.LOG, Level.DEBUG); - final List> res = new ArrayList>(); + final List> res = new ArrayList<>(); final ExecutorService executorService = Executors.newFixedThreadPool(numClients); conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); @@ -1282,10 +1279,8 @@ public void testClientBackOffByResponseTime() throws Exception { final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0"; Server server = setupDecayRpcSchedulerandTestServer(ns + "."); - @SuppressWarnings("unchecked") - CallQueueManager spy = spy((CallQueueManager) Whitebox - .getInternalState(server, "callQueue")); - Whitebox.setInternalState(server, "callQueue", spy); + CallQueueManager spy = spy(server.getCallQueue()); + server.setCallQueue(spy); Exception lastException = null; proxy = getClient(addr, conf); @@ -1624,11 +1619,8 @@ public RpcStatusProto getRpcStatusProto() { RPC.Builder builder = newServerBuilder(conf) .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true); server = setupTestServer(builder); - Whitebox.setInternalState( - server, "rpcRequestClass", FakeRequestClass.class); - MutableCounterLong authMetric = - (MutableCounterLong)Whitebox.getInternalState( - server.getRpcMetrics(), "rpcAuthorizationSuccesses"); + server.setRpcRequestClass(FakeRequestClass.class); + MutableCounterLong authMetric = server.getRpcMetrics().getRpcAuthorizationSuccesses(); proxy = getClient(addr, conf); boolean isDisconnected = true; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java deleted file mode 100644 index 743080acd7a..00000000000 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java +++ /dev/null @@ -1,215 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.metrics2.impl; - -import org.apache.hadoop.metrics2.AbstractMetric; -import org.apache.hadoop.metrics2.MetricsRecord; -import org.apache.hadoop.metrics2.MetricsTag; -import org.apache.hadoop.metrics2.sink.GraphiteSink; -import org.apache.hadoop.test.Whitebox; -import org.junit.Test; -import org.mockito.ArgumentCaptor; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.Collections; - -import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.reset; - - -public class TestGraphiteMetrics { - private AbstractMetric makeMetric(String name, Number value) { - AbstractMetric metric = mock(AbstractMetric.class); - when(metric.name()).thenReturn(name); - when(metric.value()).thenReturn(value); - return metric; - } - - private GraphiteSink.Graphite makeGraphite() { - GraphiteSink.Graphite mockGraphite = mock(GraphiteSink.Graphite.class); - when(mockGraphite.isConnected()).thenReturn(true); - return mockGraphite; - } - - @Test - public void testPutMetrics() { - GraphiteSink sink = new GraphiteSink(); - List tags = new ArrayList(); - tags.add(new MetricsTag(MsInfo.Context, "all")); - tags.add(new MetricsTag(MsInfo.Hostname, "host")); - Set metrics = new HashSet(); - metrics.add(makeMetric("foo1", 1.25)); - metrics.add(makeMetric("foo2", 2.25)); - MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics); - - ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); - final GraphiteSink.Graphite mockGraphite = makeGraphite(); - Whitebox.setInternalState(sink, "graphite", mockGraphite); - sink.putMetrics(record); - - try { - verify(mockGraphite).write(argument.capture()); - } catch (IOException e) { - e.printStackTrace(); - } - - String result = argument.getValue(); - - assertEquals(true, - result.equals("null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n" + - "null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n") || - result.equals("null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n" + - "null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n")); - } - - @Test - public void testPutMetrics2() { - GraphiteSink sink = new GraphiteSink(); - List tags = new ArrayList(); - tags.add(new MetricsTag(MsInfo.Context, "all")); - tags.add(new MetricsTag(MsInfo.Hostname, null)); - Set metrics = new HashSet(); - metrics.add(makeMetric("foo1", 1)); - metrics.add(makeMetric("foo2", 2)); - MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics); - - - ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); - final GraphiteSink.Graphite mockGraphite = makeGraphite(); - Whitebox.setInternalState(sink, "graphite", mockGraphite); - sink.putMetrics(record); - - try { - verify(mockGraphite).write(argument.capture()); - } catch (IOException e) { - e.printStackTrace(); - } - - String result = argument.getValue(); - - assertEquals(true, - result.equals("null.all.Context.Context=all.foo1 1 10\n" + - "null.all.Context.Context=all.foo2 2 10\n") || - result.equals("null.all.Context.Context=all.foo2 2 10\n" + - "null.all.Context.Context=all.foo1 1 10\n")); - } - - /** - * Assert that timestamps are converted correctly, ticket HADOOP-11182 - */ - @Test - public void testPutMetrics3() { - - // setup GraphiteSink - GraphiteSink sink = new GraphiteSink(); - final GraphiteSink.Graphite mockGraphite = makeGraphite(); - Whitebox.setInternalState(sink, "graphite", mockGraphite); - - // given two metrics records with timestamps 1000 milliseconds apart. - List tags = Collections.emptyList(); - Set metrics = new HashSet(); - metrics.add(makeMetric("foo1", 1)); - MetricsRecord record1 = new MetricsRecordImpl(MsInfo.Context, 1000000000000L, tags, metrics); - MetricsRecord record2 = new MetricsRecordImpl(MsInfo.Context, 1000000001000L, tags, metrics); - - sink.putMetrics(record1); - sink.putMetrics(record2); - - sink.flush(); - try { - sink.close(); - } catch(IOException e) { - e.printStackTrace(); - } - - // then the timestamps in the graphite stream should differ by one second. - try { - verify(mockGraphite).write(eq("null.default.Context.foo1 1 1000000000\n")); - verify(mockGraphite).write(eq("null.default.Context.foo1 1 1000000001\n")); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Test - public void testFailureAndPutMetrics() throws IOException { - GraphiteSink sink = new GraphiteSink(); - List tags = new ArrayList(); - tags.add(new MetricsTag(MsInfo.Context, "all")); - tags.add(new MetricsTag(MsInfo.Hostname, "host")); - Set metrics = new HashSet(); - metrics.add(makeMetric("foo1", 1.25)); - metrics.add(makeMetric("foo2", 2.25)); - MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics); - - final GraphiteSink.Graphite mockGraphite = makeGraphite(); - Whitebox.setInternalState(sink, "graphite", mockGraphite); - - // throw exception when first try - doThrow(new IOException("IO exception")).when(mockGraphite).write(anyString()); - - sink.putMetrics(record); - verify(mockGraphite).write(anyString()); - verify(mockGraphite).close(); - - // reset mock and try again - reset(mockGraphite); - when(mockGraphite.isConnected()).thenReturn(false); - - ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); - sink.putMetrics(record); - - verify(mockGraphite).write(argument.capture()); - String result = argument.getValue(); - - assertEquals(true, - result.equals("null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n" + - "null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n") || - result.equals("null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n" + - "null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n")); - } - - @Test - public void testClose(){ - GraphiteSink sink = new GraphiteSink(); - final GraphiteSink.Graphite mockGraphite = makeGraphite(); - Whitebox.setInternalState(sink, "graphite", mockGraphite); - try { - sink.close(); - } catch (IOException ioe) { - ioe.printStackTrace(); - } - - try { - verify(mockGraphite).close(); - } catch (IOException ioe) { - ioe.printStackTrace(); - } - } -} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestGraphiteMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestGraphiteMetrics.java new file mode 100644 index 00000000000..9ea81c6e4c6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestGraphiteMetrics.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.sink; + +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.MetricsTag; +import org.apache.hadoop.metrics2.impl.MetricsRecordImpl; +import org.apache.hadoop.metrics2.impl.MsInfo; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.reset; + + +public class TestGraphiteMetrics { + private AbstractMetric makeMetric(String name, Number value) { + AbstractMetric metric = mock(AbstractMetric.class); + when(metric.name()).thenReturn(name); + when(metric.value()).thenReturn(value); + return metric; + } + + private GraphiteSink.Graphite makeGraphite() { + GraphiteSink.Graphite mockGraphite = mock(GraphiteSink.Graphite.class); + when(mockGraphite.isConnected()).thenReturn(true); + return mockGraphite; + } + + @Test + public void testPutMetrics() { + GraphiteSink sink = new GraphiteSink(); + List tags = new ArrayList<>(); + tags.add(new MetricsTag(MsInfo.Context, "all")); + tags.add(new MetricsTag(MsInfo.Hostname, "host")); + Set metrics = new HashSet<>(); + metrics.add(makeMetric("foo1", 1.25)); + metrics.add(makeMetric("foo2", 2.25)); + MetricsRecord record = + new MetricsRecordImpl(MsInfo.Context, 10000, tags, metrics); + + ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); + final GraphiteSink.Graphite mockGraphite = makeGraphite(); + sink.setGraphite(mockGraphite); + sink.putMetrics(record); + + try { + verify(mockGraphite).write(argument.capture()); + } catch (IOException e) { + e.printStackTrace(); + } + + String result = argument.getValue(); + + assertEquals(true, + result.equals("null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n" + + "null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n") || + result.equals("null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n" + + "null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n")); + } + + @Test + public void testPutMetrics2() throws IllegalAccessException { + GraphiteSink sink = new GraphiteSink(); + List tags = new ArrayList<>(); + tags.add(new MetricsTag(MsInfo.Context, "all")); + tags.add(new MetricsTag(MsInfo.Hostname, null)); + Set metrics = new HashSet<>(); + metrics.add(makeMetric("foo1", 1)); + metrics.add(makeMetric("foo2", 2)); + MetricsRecord record = + new MetricsRecordImpl(MsInfo.Context, 10000, tags, metrics); + + ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); + final GraphiteSink.Graphite mockGraphite = makeGraphite(); + sink.setGraphite(mockGraphite); + sink.putMetrics(record); + + try { + verify(mockGraphite).write(argument.capture()); + } catch (IOException e) { + e.printStackTrace(); + } + + String result = argument.getValue(); + + assertEquals(true, + result.equals("null.all.Context.Context=all.foo1 1 10\n" + + "null.all.Context.Context=all.foo2 2 10\n") || + result.equals("null.all.Context.Context=all.foo2 2 10\n" + + "null.all.Context.Context=all.foo1 1 10\n")); + } + + /** + * Assert that timestamps are converted correctly, ticket HADOOP-11182. + */ + @Test + public void testPutMetrics3() throws IllegalAccessException { + + // setup GraphiteSink + GraphiteSink sink = new GraphiteSink(); + final GraphiteSink.Graphite mockGraphite = makeGraphite(); + sink.setGraphite(mockGraphite); + + // given two metrics records with timestamps 1000 milliseconds apart. + List tags = Collections.emptyList(); + Set metrics = new HashSet<>(); + metrics.add(makeMetric("foo1", 1)); + MetricsRecord record1 = + new MetricsRecordImpl(MsInfo.Context, 1000000000000L, tags, metrics); + MetricsRecord record2 = + new MetricsRecordImpl(MsInfo.Context, 1000000001000L, tags, metrics); + + sink.putMetrics(record1); + sink.putMetrics(record2); + + sink.flush(); + try { + sink.close(); + } catch(IOException e) { + e.printStackTrace(); + } + + // then the timestamps in the graphite stream should differ by one second. + try { + verify(mockGraphite).write(eq("null.default.Context.foo1 1 1000000000\n")); + verify(mockGraphite).write(eq("null.default.Context.foo1 1 1000000001\n")); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Test + public void testFailureAndPutMetrics() throws IOException, IllegalAccessException { + GraphiteSink sink = new GraphiteSink(); + List tags = new ArrayList<>(); + tags.add(new MetricsTag(MsInfo.Context, "all")); + tags.add(new MetricsTag(MsInfo.Hostname, "host")); + Set metrics = new HashSet<>(); + metrics.add(makeMetric("foo1", 1.25)); + metrics.add(makeMetric("foo2", 2.25)); + MetricsRecord record = + new MetricsRecordImpl(MsInfo.Context, 10000, tags, metrics); + + final GraphiteSink.Graphite mockGraphite = makeGraphite(); + sink.setGraphite(mockGraphite); + + // throw exception when first try + doThrow(new IOException("IO exception")).when(mockGraphite).write(anyString()); + + sink.putMetrics(record); + verify(mockGraphite).write(anyString()); + verify(mockGraphite).close(); + + // reset mock and try again + reset(mockGraphite); + when(mockGraphite.isConnected()).thenReturn(false); + + ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); + sink.putMetrics(record); + + verify(mockGraphite).write(argument.capture()); + String result = argument.getValue(); + + assertEquals(true, + result.equals("null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n" + + "null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n") || + result.equals("null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n" + + "null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n")); + } + + @Test + public void testClose() throws IllegalAccessException { + GraphiteSink sink = new GraphiteSink(); + final GraphiteSink.Graphite mockGraphite = makeGraphite(); + sink.setGraphite(mockGraphite); + try { + sink.close(); + } catch (IOException ioe) { + ioe.printStackTrace(); + } + + try { + verify(mockGraphite).close(); + } catch (IOException ioe) { + ioe.printStackTrace(); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestStatsDMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestStatsDMetrics.java similarity index 91% rename from hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestStatsDMetrics.java rename to hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestStatsDMetrics.java index 4cf4894ff83..99a75787ad8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestStatsDMetrics.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestStatsDMetrics.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.metrics2.impl; +package org.apache.hadoop.metrics2.sink; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -35,9 +35,9 @@ import org.apache.hadoop.metrics2.MetricType; import org.apache.hadoop.metrics2.MetricsRecord; import org.apache.hadoop.metrics2.MetricsTag; -import org.apache.hadoop.metrics2.sink.StatsDSink; +import org.apache.hadoop.metrics2.impl.MetricsRecordImpl; +import org.apache.hadoop.metrics2.impl.MsInfo; import org.apache.hadoop.metrics2.sink.StatsDSink.StatsD; -import org.apache.hadoop.test.Whitebox; import org.junit.Test; public class TestStatsDMetrics { @@ -52,7 +52,7 @@ private AbstractMetric makeMetric(String name, Number value, } @Test(timeout=3000) - public void testPutMetrics() throws IOException, InterruptedException { + public void testPutMetrics() throws IOException, IllegalAccessException { final StatsDSink sink = new StatsDSink(); List tags = new ArrayList(); tags.add(new MetricsTag(MsInfo.Hostname, "host")); @@ -69,7 +69,7 @@ public void testPutMetrics() throws IOException, InterruptedException { final StatsDSink.StatsD mockStatsD = new StatsD(sock.getLocalAddress().getHostName(), sock.getLocalPort()); - Whitebox.setInternalState(sink, "statsd", mockStatsD); + sink.setStatsd(mockStatsD); final DatagramPacket p = new DatagramPacket(new byte[8192], 8192); sink.putMetrics(record); sock.receive(p); @@ -87,7 +87,7 @@ public void testPutMetrics() throws IOException, InterruptedException { } @Test(timeout=3000) - public void testPutMetrics2() throws IOException { + public void testPutMetrics2() throws IOException, IllegalAccessException { StatsDSink sink = new StatsDSink(); List tags = new ArrayList(); tags.add(new MetricsTag(MsInfo.Hostname, null)); @@ -104,7 +104,7 @@ public void testPutMetrics2() throws IOException { final StatsDSink.StatsD mockStatsD = new StatsD(sock.getLocalAddress().getHostName(), sock.getLocalPort()); - Whitebox.setInternalState(sink, "statsd", mockStatsD); + sink.setStatsd(mockStatsD); final DatagramPacket p = new DatagramPacket(new byte[8192], 8192); sink.putMetrics(record); sock.receive(p); diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java index 7b33a644fbe..a585dbc6b20 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java @@ -18,6 +18,7 @@ package org.apache.hadoop.portmap; import java.util.concurrent.ConcurrentHashMap; +import java.util.Map; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -54,7 +55,7 @@ final class RpcProgramPortmap extends IdleStateHandler { private static final Logger LOG = LoggerFactory.getLogger(RpcProgramPortmap.class); - private final ConcurrentHashMap map = new ConcurrentHashMap(); + private final ConcurrentHashMap map = new ConcurrentHashMap<>(); /** ChannelGroup that remembers all active channels for gracefully shutdown. */ private final ChannelGroup allChannels; @@ -208,4 +209,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) { LOG.warn("Encountered ", t); ctx.channel().close(); } + + public Map getMap() { + return map; + } } diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java index 8ebf9d03c6c..84fa71a269d 100644 --- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java @@ -31,7 +31,6 @@ import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.CredentialsNone; import org.apache.hadoop.oncrpc.security.VerifierNone; -import org.apache.hadoop.test.Whitebox; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -76,7 +75,7 @@ public void testIdle() throws InterruptedException, IOException { } @Test(timeout = 10000) - public void testRegistration() throws IOException, InterruptedException { + public void testRegistration() throws IOException, InterruptedException, IllegalAccessException { XDR req = new XDR(); RpcCall.getInstance(++xid, RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION, @@ -100,9 +99,7 @@ public void testRegistration() throws IOException, InterruptedException { // Give the server a chance to process the request Thread.sleep(100); boolean found = false; - @SuppressWarnings("unchecked") - Map map = (Map) Whitebox - .getInternalState(pm.getHandler(), "map"); + Map map = pm.getHandler().getMap(); for (PortmapMapping m : map.values()) { if (m.getPort() == sent.getPort()