diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 408a6ed3a9d..3f12cec4242 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -385,6 +385,9 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7420. Delegate permission checks to FSDirectory. (wheat9)
 
+    HDFS-7331. Add Datanode network counts to datanode jmx page. (Charles Lamb
+    via atm)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index af18f4dad10..78cae9cb258 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -155,6 +155,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final float DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10.0f;
   public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES = "dfs.datanode.ram.disk.low.watermark.bytes";
   public static final long DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES_DEFAULT = DFS_BLOCK_SIZE_DEFAULT;
+  public static final String DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY = "dfs.datanode.network.counts.cache.max.size";
+  public static final int DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
 
   // This setting is for testing/internal use only.
   public static final String DFS_DATANODE_DUPLICATE_REPLICA_DELETION = "dfs.datanode.duplicate.replica.deletion";
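For context, the new dfs.datanode.network.counts.cache.max.size key bounds how many remote hosts the datanode keeps error counters for; the default of Integer.MAX_VALUE leaves the cache effectively unbounded. A sketch of how an operator might cap it in hdfs-site.xml (the value below is purely illustrative, not a recommendation from this patch):

    <property>
      <name>dfs.datanode.network.counts.cache.max.size</name>
      <value>8192</value>
    </property>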
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a53698a1793..2ff6870a40f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -38,6 +38,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
@@ -77,6 +79,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -84,6 +87,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.ObjectName;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -299,6 +305,9 @@ public class DataNode extends ReconfigurableBase
   DataNodeMetrics metrics;
   private InetSocketAddress streamingAddr;
 
+  // See the note below in incrDatanodeNetworkErrors re: concurrency.
+  private LoadingCache<String, Map<String, Long>> datanodeNetworkCounts;
+
   private String hostName;
   private DatanodeID id;
@@ -414,6 +423,20 @@ public class DataNode extends ReconfigurableBase
       shutdown();
       throw ie;
     }
+    final int dncCacheMaxSize =
+        conf.getInt(DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY,
+            DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT);
+    datanodeNetworkCounts =
+        CacheBuilder.newBuilder()
+            .maximumSize(dncCacheMaxSize)
+            .build(new CacheLoader<String, Map<String, Long>>() {
+              @Override
+              public Map<String, Long> load(String key) throws Exception {
+                final Map<String, Long> ret = new HashMap<String, Long>();
+                ret.put("networkErrors", 0L);
+                return ret;
+              }
+            });
   }
 
   @Override
@@ -1767,6 +1790,30 @@ public class DataNode extends ReconfigurableBase
   public int getXceiverCount() {
     return threadGroup == null ? 0 : threadGroup.activeCount();
   }
+
+  @Override // DataNodeMXBean
+  public Map<String, Map<String, Long>> getDatanodeNetworkCounts() {
+    return datanodeNetworkCounts.asMap();
+  }
+
+  void incrDatanodeNetworkErrors(String host) {
+    metrics.incrDatanodeNetworkErrors();
+
+    /*
+     * Synchronizing on the whole cache is a big hammer, but since it's only
+     * accumulating errors, it should be ok. If this is ever expanded to include
+     * non-error stats, then finer-grained concurrency should be applied.
+     */
+    synchronized (datanodeNetworkCounts) {
+      try {
+        final Map<String, Long> curCount = datanodeNetworkCounts.get(host);
+        curCount.put("networkErrors", curCount.get("networkErrors") + 1L);
+        datanodeNetworkCounts.put(host, curCount);
+      } catch (ExecutionException e) {
+        LOG.warn("failed to increment network error counts for " + host);
+      }
+    }
+  }
 
   int getXmitsInProgress() {
     return xmitsInProgress.get();
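The per-host tracking above leans on Guava's LoadingCache so that the first error seen from a peer lazily creates a zeroed counter map. A minimal, self-contained sketch of the same pattern, with a hypothetical class name and an illustrative size bound (neither is part of the patch):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutionException;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    public class PerHostErrorCounts {
      // Each host maps to a small map of named counters; only "networkErrors" is used here.
      private final LoadingCache<String, Map<String, Long>> counts =
          CacheBuilder.newBuilder()
              .maximumSize(8192) // illustrative bound on the number of tracked hosts
              .build(new CacheLoader<String, Map<String, Long>>() {
                @Override
                public Map<String, Long> load(String host) {
                  final Map<String, Long> zero = new HashMap<String, Long>();
                  zero.put("networkErrors", 0L);
                  return zero;
                }
              });

      // Coarse-grained locking, in the same spirit as the patch: simple and adequate
      // while the cache only accumulates error counts.
      synchronized void increment(String host) {
        try {
          final Map<String, Long> current = counts.get(host);
          current.put("networkErrors", current.get("networkErrors") + 1L);
        } catch (ExecutionException e) {
          // The loader above cannot fail, but LoadingCache.get() declares the exception.
        }
      }

      Map<String, Map<String, Long>> snapshot() {
        return counts.asMap();
      }
    }

The coarse synchronization is deliberate: as the in-code comment notes, the cache only accumulates error counts, so contention is not a concern, and finer-grained concurrency would only be needed if non-error stats were added later.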
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
index 8e80c587420..92abd886fee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.datanode;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import java.util.Map;
+
 /**
  *
  * This is the JMX management interface for data node information
@@ -76,4 +78,9 @@ public interface DataNodeMXBean {
    * actively transferring blocks.
    */
   public int getXceiverCount();
+
+  /**
+   * Gets the network error counts on a per-Datanode basis.
+   */
+  public Map<String, Map<String, Long>> getDatanodeNetworkCounts();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index a235c20cc6e..61b9c675efe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -97,6 +97,7 @@ class DataXceiver extends Receiver implements Runnable {
   private Peer peer;
   private final String remoteAddress; // address of remote side
+  private final String remoteAddressWithoutPort; // only the address, no port
   private final String localAddress;  // local address of this daemon
   private final DataNode datanode;
   private final DNConf dnConf;
@@ -129,6 +130,9 @@ class DataXceiver extends Receiver implements Runnable {
     this.dataXceiverServer = dataXceiverServer;
     this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
     remoteAddress = peer.getRemoteAddressString();
+    final int colonIdx = remoteAddress.indexOf(':');
+    remoteAddressWithoutPort =
+        (colonIdx < 0) ? remoteAddress : remoteAddress.substring(0, colonIdx);
     localAddress = peer.getLocalAddressString();
 
     if (LOG.isDebugEnabled()) {
@@ -222,7 +226,7 @@ class DataXceiver extends Receiver implements Runnable {
             LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
           }
         } else {
-          datanode.metrics.incrDatanodeNetworkErrors();
+          incrDatanodeNetworkErrors();
           throw err;
         }
         break;
@@ -521,7 +525,7 @@ class DataXceiver extends Receiver implements Runnable {
       } catch (IOException ioe) {
         LOG.debug("Error reading client status response. Will close connection.", ioe);
         IOUtils.closeStream(out);
-        datanode.metrics.incrDatanodeNetworkErrors();
+        incrDatanodeNetworkErrors();
       }
     } else {
       IOUtils.closeStream(out);
@@ -543,7 +547,7 @@ class DataXceiver extends Receiver implements Runnable {
       if (!(ioe instanceof SocketTimeoutException)) {
         LOG.warn(dnR + ":Got exception while serving " + block + " to "
           + remoteAddress, ioe);
-        datanode.metrics.incrDatanodeNetworkErrors();
+        incrDatanodeNetworkErrors();
       }
       throw ioe;
     } finally {
@@ -722,7 +726,7 @@ class DataXceiver extends Receiver implements Runnable {
           LOG.info(datanode + ":Exception transfering " + block + " to mirror "
               + mirrorNode + "- continuing without the mirror", e);
-          datanode.metrics.incrDatanodeNetworkErrors();
+          incrDatanodeNetworkErrors();
         }
       }
     }
@@ -777,7 +781,7 @@ class DataXceiver extends Receiver implements Runnable {
       }
     } catch (IOException ioe) {
       LOG.info("opWriteBlock " + block + " received exception " + ioe);
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       // close all opened streams
@@ -813,7 +817,7 @@ class DataXceiver extends Receiver implements Runnable {
       writeResponse(Status.SUCCESS, null, out);
     } catch (IOException ioe) {
       LOG.info("transferBlock " + blk + " received exception " + ioe);
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       IOUtils.closeStream(out);
@@ -908,7 +912,7 @@ class DataXceiver extends Receiver implements Runnable {
       out.flush();
     } catch (IOException ioe) {
       LOG.info("blockChecksum " + block + " received exception " + ioe);
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       IOUtils.closeStream(out);
@@ -975,7 +979,7 @@ class DataXceiver extends Receiver implements Runnable {
     } catch (IOException ioe) {
       isOpSuccess = false;
       LOG.info("opCopyBlock " + block + " received exception " + ioe);
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       dataXceiverServer.balanceThrottler.release();
@@ -1108,7 +1112,7 @@ class DataXceiver extends Receiver implements Runnable {
       LOG.info(errMsg);
       if (!IoeDuringCopyBlockOperation) {
         // Don't double count IO errors
-        datanode.metrics.incrDatanodeNetworkErrors();
+        incrDatanodeNetworkErrors();
       }
       throw ioe;
     } finally {
@@ -1128,7 +1132,7 @@ class DataXceiver extends Receiver implements Runnable {
       sendResponse(opStatus, errMsg);
     } catch (IOException ioe) {
       LOG.warn("Error writing reply back to " + peer.getRemoteAddressString());
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
     }
     IOUtils.closeStream(proxyOut);
     IOUtils.closeStream(blockReceiver);
@@ -1182,6 +1186,9 @@ class DataXceiver extends Receiver implements Runnable {
     out.flush();
   }
 
+  private void incrDatanodeNetworkErrors() {
+    datanode.incrDatanodeNetworkErrors(remoteAddressWithoutPort);
+  }
 
   private void checkAccess(OutputStream out, final boolean reply,
       final ExtendedBlock blk,
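Once registered, in-process consumers of DataNodeMXBean#getDatanodeNetworkCounts() receive a map keyed by the peer address (with the port stripped, as computed in DataXceiver above), each value being a small map of named counters. A hypothetical helper showing one way to consume it, assuming only the "networkErrors" counter introduced by this patch:

    import java.util.Map;

    public class NetworkCountsReport {
      /**
       * Returns the peer address with the highest "networkErrors" count, or null
       * if the map is empty. Purely illustrative; not part of the patch.
       */
      public static String worstPeer(Map<String, Map<String, Long>> counts) {
        String worst = null;
        long max = -1L;
        for (Map.Entry<String, Map<String, Long>> entry : counts.entrySet()) {
          final Long errors = entry.getValue().get("networkErrors");
          if (errors != null && errors > max) {
            max = errors;
            worst = entry.getKey();
          }
        }
        return worst;
      }
    }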
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index 90112af245e..0b85d35b0d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -27,7 +27,9 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
@@ -48,6 +50,9 @@ import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 public class TestDataNodeMetrics {
   private static final Log LOG = LogFactory.getLog(TestDataNodeMetrics.class);
 
@@ -217,9 +222,22 @@ public class TestDataNodeMetrics {
       out.writeBytes("old gs data\n");
       out.hflush();
 
+      /* Test the metric. */
       final MetricsRecordBuilder dnMetrics =
           getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
       assertCounter("DatanodeNetworkErrors", 1L, dnMetrics);
+
+      /* Test JMX datanode network counts. */
+      final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      final ObjectName mxbeanName =
+          new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
+      final Object dnc =
+          mbs.getAttribute(mxbeanName, "DatanodeNetworkCounts");
+      final String allDnc = dnc.toString();
+      assertTrue("expected to see loopback address",
+          allDnc.indexOf("127.0.0.1") >= 0);
+      assertTrue("expected to see networkErrors",
+          allDnc.indexOf("networkErrors") >= 0);
     } finally {
       IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
       if (cluster != null) {
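Beyond the in-process MBeanServer check in the test above, the counts are also visible on the datanode's JMX page, which is the point of the JIRA. Assuming the default datanode HTTP port for this release line (50075; adjust for your cluster), a query such as

    curl 'http://<datanode-host>:50075/jmx?qry=Hadoop:service=DataNode,name=DataNodeInfo'

should include a DatanodeNetworkCounts attribute listing each peer address with its networkErrors count, alongside the existing DataNodeInfo attributes.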