diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index c2952aa304b..ae0f5bbe6e0 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -98,6 +98,8 @@ New Features
   necessarily returned in the final SolrDocument by returning a list of fields
   from DocTransformer#getExtraRequestFields (ryan)
 
+* SOLR-7458: Expose HDFS Block Locality Metrics via JMX (Mike Drob via Mark Miller)
+
 Bug Fixes
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java
index 08577def536..64213552aba 100644
--- a/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java
@@ -53,16 +53,20 @@ import org.apache.solr.store.blockcache.BufferStore;
 import org.apache.solr.store.blockcache.Cache;
 import org.apache.solr.store.blockcache.Metrics;
 import org.apache.solr.store.hdfs.HdfsDirectory;
+import org.apache.solr.store.hdfs.HdfsLocalityReporter;
 import org.apache.solr.store.hdfs.HdfsLockFactory;
 import org.apache.solr.util.HdfsUtil;
+import org.apache.solr.util.plugin.SolrCoreAware;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 
-public class HdfsDirectoryFactory extends CachingDirectoryFactory {
+public class HdfsDirectoryFactory extends CachingDirectoryFactory implements SolrCoreAware {
   public static Logger LOG = LoggerFactory
       .getLogger(HdfsDirectoryFactory.class);
@@ -108,7 +112,7 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
         }
       })
       .build();
-  
+
   private final static class MetricsHolder {
     // [JCIP SE, Goetz, 16.6] Lazy initialization
     // Won't load until MetricsHolder is referenced
@@ -126,6 +130,10 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
     tmpFsCache.cleanUp();
   }
 
+  private final static class LocalityHolder {
+    public static final HdfsLocalityReporter reporter = new HdfsLocalityReporter();
+  }
+
   @Override
   public void init(NamedList args) {
     params = SolrParams.toSolrParams(args);
@@ -177,6 +185,7 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
     boolean blockCacheGlobal = getConfig(BLOCKCACHE_GLOBAL, false); // default to false for back compat
     boolean blockCacheReadEnabled = getConfig(BLOCKCACHE_READ_ENABLED, true);
 
+    final HdfsDirectory hdfsDir;
     final Directory dir;
     if (blockCacheEnabled && dirContext != DirContext.META_DATA) {
       int numberOfBlocksPerBank = getConfig(NUMBEROFBLOCKSPERBANK, 16384);
@@ -204,12 +213,15 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
           bufferSize, bufferCount, blockCacheGlobal);
       Cache cache = new BlockDirectoryCache(blockCache, path, metrics, blockCacheGlobal);
-      HdfsDirectory hdfsDirectory = new HdfsDirectory(new Path(path), lockFactory, conf);
-      dir = new BlockDirectory(path, hdfsDirectory, cache, null, blockCacheReadEnabled, false);
+      hdfsDir = new HdfsDirectory(new Path(path), lockFactory, conf);
+      dir = new BlockDirectory(path, hdfsDir, cache, null, blockCacheReadEnabled, false);
     } else {
-      dir = new HdfsDirectory(new Path(path), lockFactory, conf);
+      hdfsDir = new HdfsDirectory(new Path(path), lockFactory, conf);
+      dir = hdfsDir;
     }
+    LocalityHolder.reporter.registerDirectory(hdfsDir);
+
     boolean nrtCachingDirectory = getConfig(NRTCACHINGDIRECTORY_ENABLE, true);
     if (nrtCachingDirectory) {
       double nrtCacheMaxMergeSizeMB = getConfig(NRTCACHINGDIRECTORY_MAXMERGESIZEMB, 16);
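The new `LocalityHolder` reuses the initialization-on-demand holder idiom already used by `MetricsHolder` (the `[JCIP SE, Goetz, 16.6]` comment above): the reporter is not constructed until the holder class is first referenced, and JVM class-initialization rules make that construction thread-safe and one-shot without explicit locking. A minimal standalone sketch of the idiom, with illustrative names that are not part of the patch:

```java
// Standalone sketch of the initialization-on-demand holder idiom.
public class HolderIdiomDemo {

  static class ExpensiveReporter {
    ExpensiveReporter() {
      System.out.println("initialized"); // runs at most once, on first use
    }
  }

  // The JVM defers initializing a nested class until it is first referenced,
  // and class initialization is guaranteed thread-safe, so no lock or
  // volatile field is needed.
  private static final class Holder {
    static final ExpensiveReporter INSTANCE = new ExpensiveReporter();
  }

  public static ExpensiveReporter getReporter() {
    return Holder.INSTANCE; // first call triggers Holder's initialization
  }

  public static void main(String[] args) {
    System.out.println("before first use");
    getReporter(); // prints "initialized" only now
  }
}
```

Because the holder's field is static, every `HdfsDirectoryFactory` in the JVM shares one reporter, which is what lets a single MBean aggregate locality across all registered directories.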
@@ -443,7 +455,17 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
 
   @Override
   public Collection<SolrInfoMBean> offerMBeans() {
-    return Arrays.asList(MetricsHolder.metrics);
+    return Arrays.asList(MetricsHolder.metrics, LocalityHolder.reporter);
+  }
+
+  @Override
+  public void inform(SolrCore core) {
+    setHost(core.getCoreDescriptor().getCoreContainer().getHostName());
+  }
+
+  @VisibleForTesting
+  void setHost(String hostname) {
+    LocalityHolder.reporter.setHost(hostname);
   }
 
   @Override
diff --git a/solr/core/src/java/org/apache/solr/core/SolrResourceLoader.java b/solr/core/src/java/org/apache/solr/core/SolrResourceLoader.java
index e9f0821bc4d..60b28774d2a 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrResourceLoader.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrResourceLoader.java
@@ -738,6 +738,7 @@ public class SolrResourceLoader implements ResourceLoader,Closeable
     awareCompatibility.put(
         SolrCoreAware.class, new Class[] {
             CodecFactory.class,
+            DirectoryFactory.class,
             ManagedIndexSchemaFactory.class,
             QueryResponseWriter.class,
             SearchComponent.class,
diff --git a/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java b/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java
index 70e4d552172..4ddad89ef68 100644
--- a/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java
+++ b/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java
@@ -100,6 +100,15 @@ public class HdfsDirectory extends BaseDirectory {
   public void close() throws IOException {
     LOG.info("Closing hdfs directory {}", hdfsDirPath);
     fileSystem.close();
+    isOpen = false;
+  }
+
+  /**
+   * Check whether this directory is open or closed. This check may return stale results in the form of false negatives.
+   * @return true if the directory is definitely closed, false if the directory is open or pending closure
+   */
+  public boolean isClosed() {
+    return !isOpen;
   }
 
   @Override
@@ -241,4 +250,22 @@ public class HdfsDirectory extends BaseDirectory {
     LOG.debug("Sync called on {}", Arrays.toString(names.toArray()));
   }
 
+  @Override
+  public int hashCode() {
+    return hdfsDirPath.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof HdfsDirectory)) {
+      return false;
+    }
+    return this.hdfsDirPath.equals(((HdfsDirectory) obj).hdfsDirPath);
+  }
 }
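The `hashCode`/`equals` overrides added to `HdfsDirectory` key the object off its `hdfsDirPath`. That matters for the new reporter below, which uses `HdfsDirectory` instances as keys in a `ConcurrentHashMap`: two wrapper objects over the same HDFS path must compare equal, or re-created directories would accumulate as distinct cache entries. A small self-contained sketch of the behavior this buys, using stand-in names rather than the Solr classes:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// PathKeyed stands in for HdfsDirectory; "path" stands in for hdfsDirPath.
class PathKeyed {
  final String path;
  PathKeyed(String path) { this.path = path; }
  @Override public int hashCode() { return path.hashCode(); }
  @Override public boolean equals(Object o) {
    return o instanceof PathKeyed && path.equals(((PathKeyed) o).path);
  }
}

class PathKeyDemo {
  public static void main(String[] args) {
    ConcurrentMap<PathKeyed,String> cache = new ConcurrentHashMap<>();
    cache.put(new PathKeyed("/solr/core1/index"), "stats");
    // A second wrapper over the same path finds the existing entry; with
    // default identity-based equals/hashCode this lookup would return null.
    System.out.println(cache.get(new PathKeyed("/solr/core1/index"))); // stats
  }
}
```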
diff --git a/solr/core/src/java/org/apache/solr/store/hdfs/HdfsLocalityReporter.java b/solr/core/src/java/org/apache/solr/store/hdfs/HdfsLocalityReporter.java
new file mode 100644
index 00000000000..56d97b37a5c
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/store/hdfs/HdfsLocalityReporter.java
@@ -0,0 +1,212 @@
+package org.apache.solr.store.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.core.SolrInfoMBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HdfsLocalityReporter implements SolrInfoMBean {
+  public static final String LOCALITY_BYTES_TOTAL = "locality.bytes.total";
+  public static final String LOCALITY_BYTES_LOCAL = "locality.bytes.local";
+  public static final String LOCALITY_BYTES_RATIO = "locality.bytes.ratio";
+  public static final String LOCALITY_BLOCKS_TOTAL = "locality.blocks.total";
+  public static final String LOCALITY_BLOCKS_LOCAL = "locality.blocks.local";
+  public static final String LOCALITY_BLOCKS_RATIO = "locality.blocks.ratio";
+
+  public static final Logger logger = LoggerFactory.getLogger(HdfsLocalityReporter.class);
+
+  private String hostname;
+  private final ConcurrentMap<HdfsDirectory,Map<FileStatus,BlockLocation[]>> cache;
+
+  public HdfsLocalityReporter() {
+    cache = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Set the host name to use when determining locality.
+   * @param hostname The name of this host; should correspond to what HDFS Data Nodes think this is.
+   */
+  public void setHost(String hostname) {
+    this.hostname = hostname;
+  }
+
+  @Override
+  public String getName() {
+    return "hdfs-locality";
+  }
+
+  @Override
+  public String getVersion() {
+    return getClass().getPackage().getSpecificationVersion();
+  }
+
+  @Override
+  public String getDescription() {
+    return "Provides metrics for HDFS data locality.";
+  }
+
+  @Override
+  public Category getCategory() {
+    return Category.OTHER;
+  }
+
+  @Override
+  public String getSource() {
+    return null;
+  }
+
+  @Override
+  public URL[] getDocs() {
+    return null;
+  }
+
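+  // Worked example (illustrative): for a core whose files span four equally
+  // sized blocks, three of which list this host in BlockLocation#getHosts(),
+  // getStatistics() below reports locality.blocks.ratio = 3/4 = 0.75 and
+  // likewise locality.bytes.ratio = 0.75.
+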
+  /**
+   * Provide statistics on HDFS block locality, both in terms of bytes and block counts.
+   */
+  @Override
+  public NamedList<Object> getStatistics() {
+    long totalBytes = 0;
+    long localBytes = 0;
+    int totalCount = 0;
+    int localCount = 0;
+
+    for (Iterator<HdfsDirectory> iterator = cache.keySet().iterator(); iterator.hasNext();) {
+      HdfsDirectory hdfsDirectory = iterator.next();
+
+      if (hdfsDirectory.isClosed()) {
+        iterator.remove();
+      } else {
+        try {
+          refreshDirectory(hdfsDirectory);
+          Map<FileStatus,BlockLocation[]> blockMap = cache.get(hdfsDirectory);
+
+          // For every block in every file in this directory, count it
+          for (BlockLocation[] locations : blockMap.values()) {
+            for (BlockLocation bl : locations) {
+              totalBytes += bl.getLength();
+              totalCount++;
+
+              if (Arrays.asList(bl.getHosts()).contains(hostname)) {
+                localBytes += bl.getLength();
+                localCount++;
+              }
+            }
+          }
+        } catch (IOException e) {
+          logger.warn("Could not retrieve locality information for {} due to exception: {}",
+              hdfsDirectory.getHdfsDirPath(), e);
+        }
+      }
+    }
+
+    return createStatistics(totalBytes, localBytes, totalCount, localCount);
+  }
+
+  /**
+   * Generate a statistics object based on the given measurements for all files monitored by this reporter.
+   *
+   * @param totalBytes
+   *          The total bytes used
+   * @param localBytes
+   *          The amount of bytes found on local nodes
+   * @param totalCount
+   *          The total block count
+   * @param localCount
+   *          The amount of blocks found on local nodes
+   * @return HDFS block locality statistics
+   */
+  private NamedList<Object> createStatistics(long totalBytes, long localBytes, int totalCount, int localCount) {
+    NamedList<Object> statistics = new SimpleOrderedMap<>();
+
+    statistics.add(LOCALITY_BYTES_TOTAL, totalBytes);
+    statistics.add(LOCALITY_BYTES_LOCAL, localBytes);
+    if (localBytes == 0) {
+      statistics.add(LOCALITY_BYTES_RATIO, 0);
+    } else {
+      statistics.add(LOCALITY_BYTES_RATIO, localBytes / (double) totalBytes);
+    }
+    statistics.add(LOCALITY_BLOCKS_TOTAL, totalCount);
+    statistics.add(LOCALITY_BLOCKS_LOCAL, localCount);
+    if (localCount == 0) {
+      statistics.add(LOCALITY_BLOCKS_RATIO, 0);
+    } else {
+      statistics.add(LOCALITY_BLOCKS_RATIO, localCount / (double) totalCount);
+    }
+
+    return statistics;
+  }
+
+  /**
+   * Add a directory for block locality reporting. This directory will continue to be checked until its close method
+   * has been called.
+   *
+   * @param dir
+   *          The directory to keep metrics on.
+   */
+  public void registerDirectory(HdfsDirectory dir) {
+    logger.info("Registering directory {} for locality metrics.", dir.getHdfsDirPath().toString());
+    cache.put(dir, new ConcurrentHashMap<FileStatus,BlockLocation[]>());
+  }
+
+  /**
+   * Update the cached block locations for the given directory. This includes deleting any files that no longer exist
+   * in the file system and adding any new files that have shown up.
+   *
+   * @param dir
+   *          The directory to refresh
+   * @throws IOException
+   *           If there is a problem getting info from HDFS
+   */
+  private void refreshDirectory(HdfsDirectory dir) throws IOException {
+    Map<FileStatus,BlockLocation[]> directoryCache = cache.get(dir);
+    Set<FileStatus> cachedStatuses = directoryCache.keySet();
+
+    FileSystem fs = dir.getFileSystem();
+    FileStatus[] statuses = fs.listStatus(dir.getHdfsDirPath());
+    List<FileStatus> statusList = Arrays.asList(statuses);
+
+    logger.debug("Updating locality information for: {}", statusList);
+
+    // Keep only the files that still exist
+    cachedStatuses.retainAll(statusList);
+
+    // Fill in missing entries in the cache
+    for (FileStatus status : statusList) {
+      if (!status.isDirectory() && !directoryCache.containsKey(status)) {
+        BlockLocation[] locations = fs.getFileBlockLocations(status, 0, status.getLen());
+        directoryCache.put(status, locations);
+      }
+    }
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/core/HdfsDirectoryFactoryTest.java b/solr/core/src/test/org/apache/solr/core/HdfsDirectoryFactoryTest.java
index 979222b07c6..511a45cb344 100644
--- a/solr/core/src/test/org/apache/solr/core/HdfsDirectoryFactoryTest.java
+++ b/solr/core/src/test/org/apache/solr/core/HdfsDirectoryFactoryTest.java
@@ -17,23 +17,34 @@ package org.apache.solr.core;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import java.text.SimpleDateFormat;
+import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Locale;
+import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.LockFactory;
 import org.apache.lucene.store.NoLockFactory;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.cloud.hdfs.HdfsTestUtil;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.DirectoryFactory.DirContext;
 import org.apache.solr.handler.SnapShooter;
+import org.apache.solr.store.hdfs.HdfsLocalityReporter;
 import org.apache.solr.util.BadHdfsThreadsFilter;
 import org.apache.solr.util.MockCoreContainer.MockCoreDescriptor;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -165,5 +176,51 @@ public class HdfsDirectoryFactoryTest extends SolrTestCaseJ4 {
     assertTrue(hdfs.isDirectory(currentIndexDirPath));
     assertTrue(!hdfs.isDirectory(oldIndexDirPath));
   }
-  
+
+  @Test
+  public void testLocalityReporter() throws Exception {
+    Configuration conf = HdfsTestUtil.getClientConfiguration(dfsCluster);
+    conf.set("dfs.permissions.enabled", "false");
+
+    HdfsDirectoryFactory factory = new HdfsDirectoryFactory();
+    Map<String,String> props = new HashMap<String,String>();
+    props.put(HdfsDirectoryFactory.HDFS_HOME, HdfsTestUtil.getURI(dfsCluster) + "/solr");
+    props.put(HdfsDirectoryFactory.BLOCKCACHE_ENABLED, "false");
+    props.put(HdfsDirectoryFactory.NRTCACHINGDIRECTORY_ENABLE, "false");
+    factory.init(new NamedList<>(props));
+
+    Iterator<SolrInfoMBean> it = factory.offerMBeans().iterator();
+    it.next(); // skip
+    SolrInfoMBean localityBean = it.next(); // brittle, but it's ok
+
+    // Make sure we have the right bean.
+ assertEquals("hdfs-locality", localityBean.getName()); + + // We haven't done anything, so there should be no data + NamedList statistics = localityBean.getStatistics(); + assertEquals(0l, statistics.get(HdfsLocalityReporter.LOCALITY_BYTES_TOTAL)); + assertEquals(0, statistics.get(HdfsLocalityReporter.LOCALITY_BYTES_RATIO)); + + // create a directory and a file + String path = HdfsTestUtil.getURI(dfsCluster) + "/solr3/"; + Directory dir = factory.create(path, NoLockFactory.INSTANCE, DirContext.DEFAULT); + try(IndexOutput writer = dir.createOutput("output", null)) { + writer.writeLong(42l); + } + + final long long_bytes = Long.SIZE / Byte.SIZE; + + // no locality because hostname not set + statistics = localityBean.getStatistics(); + assertEquals(long_bytes, statistics.get(HdfsLocalityReporter.LOCALITY_BYTES_TOTAL)); + assertEquals(1, statistics.get(HdfsLocalityReporter.LOCALITY_BLOCKS_TOTAL)); + assertEquals(0, statistics.get(HdfsLocalityReporter.LOCALITY_BLOCKS_LOCAL)); + + // set hostname and check again + factory.setHost("127.0.0.1"); + statistics = localityBean.getStatistics(); + assertEquals(long_bytes, statistics.get(HdfsLocalityReporter.LOCALITY_BYTES_LOCAL)); + + factory.close(); + } }