mirror of https://github.com/apache/lucene.git

commit 3ad2bf92c4 (parent 40f4e7de5e)

SOLR-7458: Expose HDFS Block Locality Metrics via JMX

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1684711 13f79535-47bb-0310-9956-ffa450edef68
solr/CHANGES.txt

@@ -98,6 +98,8 @@ New Features
   necessarily returned in the file SolrDocument by returning a list of fields from
   DocTransformer#getExtraRequestFields (ryan)

+* SOLR-7458: Expose HDFS Block Locality Metrics via JMX (Mike Drob via Mark Miller)
+
 Bug Fixes
 ----------------------

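Background for the entry above (not part of this commit): once offerMBeans() hands the reporter to the core, its statistics surface through Solr's normal JMX plumbing. A rough probe sketch follows; the loose ObjectName matching and the assumption that each statistics key becomes an MBean attribute are illustrative, while the bean name "hdfs-locality" and the key "locality.bytes.ratio" come from the new HdfsLocalityReporter below.

import java.lang.management.ManagementFactory;
import java.util.Set;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public class HdfsLocalityJmxProbe {
  public static void main(String[] args) throws Exception {
    // Runs inside (or attached to) the Solr JVM; the domain/key layout of Solr's
    // MBeans depends on the <jmx/> configuration, so match loosely by name.
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    Set<ObjectName> names = server.queryNames(null, null);
    for (ObjectName name : names) {
      if (name.getCanonicalName().contains("hdfs-locality")) {
        Object ratio = server.getAttribute(name, "locality.bytes.ratio"); // assumed attribute name
        System.out.println(name + " locality.bytes.ratio=" + ratio);
      }
    }
  }
}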
HdfsDirectoryFactory.java

@@ -53,16 +53,20 @@ import org.apache.solr.store.blockcache.BufferStore;
 import org.apache.solr.store.blockcache.Cache;
 import org.apache.solr.store.blockcache.Metrics;
 import org.apache.solr.store.hdfs.HdfsDirectory;
+import org.apache.solr.store.hdfs.HdfsLocalityReporter;
 import org.apache.solr.store.hdfs.HdfsLockFactory;
 import org.apache.solr.util.HdfsUtil;
+import org.apache.solr.util.plugin.SolrCoreAware;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;

-public class HdfsDirectoryFactory extends CachingDirectoryFactory {
+public class HdfsDirectoryFactory extends CachingDirectoryFactory implements SolrCoreAware {
   public static Logger LOG = LoggerFactory
       .getLogger(HdfsDirectoryFactory.class);

@@ -108,7 +112,7 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
         }
       })
       .build();

   private final static class MetricsHolder {
     // [JCIP SE, Goetz, 16.6] Lazy initialization
     // Won't load until MetricsHolder is referenced
@@ -126,6 +130,10 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
     tmpFsCache.cleanUp();
   }

+  private final static class LocalityHolder {
+    public static final HdfsLocalityReporter reporter = new HdfsLocalityReporter();
+  }
+
   @Override
   public void init(NamedList args) {
     params = SolrParams.toSolrParams(args);
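The MetricsHolder and LocalityHolder nested classes above both use the initialization-on-demand holder idiom that the comment cites (Goetz, Java Concurrency in Practice, 16.6). A minimal standalone sketch of the idiom, with illustrative names not taken from this commit:

public class LazySingleton {
  private LazySingleton() {}

  // The JVM does not initialize Holder until it is first referenced, so INSTANCE
  // is created lazily, exactly once, and safely published by class initialization
  // without any explicit locking.
  private static final class Holder {
    static final LazySingleton INSTANCE = new LazySingleton();
  }

  public static LazySingleton instance() {
    return Holder.INSTANCE; // first call here triggers Holder's initialization
  }
}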
@@ -177,6 +185,7 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
     boolean blockCacheGlobal = getConfig(BLOCKCACHE_GLOBAL, false); // default to false for back compat
     boolean blockCacheReadEnabled = getConfig(BLOCKCACHE_READ_ENABLED, true);

+    final HdfsDirectory hdfsDir;
     final Directory dir;
     if (blockCacheEnabled && dirContext != DirContext.META_DATA) {
       int numberOfBlocksPerBank = getConfig(NUMBEROFBLOCKSPERBANK, 16384);
@@ -204,12 +213,15 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
           bufferSize, bufferCount, blockCacheGlobal);

       Cache cache = new BlockDirectoryCache(blockCache, path, metrics, blockCacheGlobal);
-      HdfsDirectory hdfsDirectory = new HdfsDirectory(new Path(path), lockFactory, conf);
-      dir = new BlockDirectory(path, hdfsDirectory, cache, null, blockCacheReadEnabled, false);
+      hdfsDir = new HdfsDirectory(new Path(path), lockFactory, conf);
+      dir = new BlockDirectory(path, hdfsDir, cache, null, blockCacheReadEnabled, false);
     } else {
-      dir = new HdfsDirectory(new Path(path), lockFactory, conf);
+      hdfsDir = new HdfsDirectory(new Path(path), lockFactory, conf);
+      dir = hdfsDir;
     }
+
+    LocalityHolder.reporter.registerDirectory(hdfsDir);

     boolean nrtCachingDirectory = getConfig(NRTCACHINGDIRECTORY_ENABLE, true);
     if (nrtCachingDirectory) {
       double nrtCacheMaxMergeSizeMB = getConfig(NRTCACHINGDIRECTORY_MAXMERGESIZEMB, 16);
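For orientation, the branch above keeps a handle to the raw HdfsDirectory in hdfsDir so the locality reporter always sees the unwrapped HDFS directory, while readers get the wrapped stack. A simplified sketch of the layering, assuming the surrounding locals from this method (path, lockFactory, conf, cache, blockCacheReadEnabled); the NRTCachingDirectory sizes are illustrative defaults, not values asserted by this hunk:

// Sketch only: the raw directory does HDFS I/O, the wrappers add caching layers,
// and the reporter is registered against the raw handle either way.
HdfsDirectory raw = new HdfsDirectory(new Path(path), lockFactory, conf);
Directory stacked = new BlockDirectory(path, raw, cache, null, blockCacheReadEnabled, false);
stacked = new NRTCachingDirectory(stacked, 16, 192); // applied further down when NRT caching is enabled
LocalityHolder.reporter.registerDirectory(raw);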
@@ -443,7 +455,17 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {

   @Override
   public Collection<SolrInfoMBean> offerMBeans() {
-    return Arrays.<SolrInfoMBean>asList(MetricsHolder.metrics);
+    return Arrays.<SolrInfoMBean>asList(MetricsHolder.metrics, LocalityHolder.reporter);
   }

+  @Override
+  public void inform(SolrCore core) {
+    setHost(core.getCoreDescriptor().getCoreContainer().getHostName());
+  }
+
+  @VisibleForTesting
+  void setHost(String hostname) {
+    LocalityHolder.reporter.setHost(hostname);
+  }
+
   @Override
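offerMBeans() is the hook through which the reporter reaches SolrCore; a rough consumption sketch (simplified, assuming a SolrCore core and its DirectoryFactory directoryFactory are in scope; not code from this commit):

// The core copies the offered beans into its info registry, which backs the
// JMX view and the admin mbeans handler; "hdfs-locality" lands next to the
// existing block cache metrics bean.
for (SolrInfoMBean bean : directoryFactory.offerMBeans()) {
  core.getInfoRegistry().put(bean.getName(), bean);
}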
SolrResourceLoader.java

@@ -738,6 +738,7 @@ public class SolrResourceLoader implements ResourceLoader,Closeable
     awareCompatibility.put(
       SolrCoreAware.class, new Class[] {
         CodecFactory.class,
+        DirectoryFactory.class,
         ManagedIndexSchemaFactory.class,
         QueryResponseWriter.class,
         SearchComponent.class,
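The SolrResourceLoader change above adds DirectoryFactory to the whitelist of plugin base types allowed to implement SolrCoreAware; without it, HdfsDirectoryFactory implementing the interface would be rejected and inform(SolrCore) would never run. A conceptual sketch of how such a whitelist check works (simplified, not the loader's exact code):

// For a plugin that implements SolrCoreAware, verify it extends one of the
// whitelisted base types before the loader agrees to call inform() on it.
static void assertAwareCompatibility(Class<?>[] allowedBases, Object plugin) {
  for (Class<?> base : allowedBases) {
    if (base.isInstance(plugin)) {
      return; // compatible: DirectoryFactory is now one of the allowed bases
    }
  }
  throw new IllegalStateException(plugin.getClass() + " is not allowed to implement SolrCoreAware");
}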
HdfsDirectory.java

@@ -100,6 +100,15 @@ public class HdfsDirectory extends BaseDirectory {
   public void close() throws IOException {
     LOG.info("Closing hdfs directory {}", hdfsDirPath);
     fileSystem.close();
     isOpen = false;
   }
+
+  /**
+   * Check whether this directory is open or closed. This check may return stale results in the form of false negatives.
+   * @return true if the directory is definitely closed, false if the directory is open or is pending closure
+   */
+  public boolean isClosed() {
+    return !isOpen;
+  }
+
   @Override
@@ -241,4 +250,22 @@ public class HdfsDirectory extends BaseDirectory {
     LOG.debug("Sync called on {}", Arrays.toString(names.toArray()));
   }

+  @Override
+  public int hashCode() {
+    return hdfsDirPath.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof HdfsDirectory)) {
+      return false;
+    }
+    return this.hdfsDirPath.equals(((HdfsDirectory) obj).hdfsDirPath);
+  }
 }
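hashCode() and equals() above delegate to hdfsDirPath so that an HdfsDirectory can act as a stable key in the reporter's ConcurrentHashMap (added in the new file below). A small sketch of the resulting behavior; the path and configuration are hypothetical, and constructing an HdfsDirectory requires a reachable HDFS:

Configuration conf = new Configuration();
Path indexPath = new Path("hdfs://namenode:8020/solr/collection1/data/index"); // placeholder path
HdfsDirectory a = new HdfsDirectory(indexPath, NoLockFactory.INSTANCE, conf);
HdfsDirectory b = new HdfsDirectory(indexPath, NoLockFactory.INSTANCE, conf);
// Two handles to the same HDFS path now collapse to one map key, so re-registering
// the same index directory cannot duplicate entries in the locality cache.
assert a.equals(b) && a.hashCode() == b.hashCode();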
HdfsLocalityReporter.java (new file)

@@ -0,0 +1,212 @@
package org.apache.solr.store.hdfs;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.SolrInfoMBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HdfsLocalityReporter implements SolrInfoMBean {
  public static final String LOCALITY_BYTES_TOTAL = "locality.bytes.total";
  public static final String LOCALITY_BYTES_LOCAL = "locality.bytes.local";
  public static final String LOCALITY_BYTES_RATIO = "locality.bytes.ratio";
  public static final String LOCALITY_BLOCKS_TOTAL = "locality.blocks.total";
  public static final String LOCALITY_BLOCKS_LOCAL = "locality.blocks.local";
  public static final String LOCALITY_BLOCKS_RATIO = "locality.blocks.ratio";

  public static final Logger logger = LoggerFactory.getLogger(HdfsLocalityReporter.class);

  private String hostname;
  private final ConcurrentMap<HdfsDirectory,ConcurrentMap<FileStatus,BlockLocation[]>> cache;

  public HdfsLocalityReporter() {
    cache = new ConcurrentHashMap<>();
  }

  /**
   * Set the host name to use when determining locality
   * @param hostname The name of this host; should correspond to what HDFS Data Nodes think this is.
   */
  public void setHost(String hostname) {
    this.hostname = hostname;
  }

  @Override
  public String getName() {
    return "hdfs-locality";
  }

  @Override
  public String getVersion() {
    return getClass().getPackage().getSpecificationVersion();
  }

  @Override
  public String getDescription() {
    return "Provides metrics for HDFS data locality.";
  }

  @Override
  public Category getCategory() {
    return Category.OTHER;
  }

  @Override
  public String getSource() {
    return null;
  }

  @Override
  public URL[] getDocs() {
    return null;
  }

  /**
   * Provide statistics on HDFS block locality, both in terms of bytes and block counts.
   */
  @Override
  public NamedList getStatistics() {
    long totalBytes = 0;
    long localBytes = 0;
    int totalCount = 0;
    int localCount = 0;

    for (Iterator<HdfsDirectory> iterator = cache.keySet().iterator(); iterator.hasNext();) {
      HdfsDirectory hdfsDirectory = iterator.next();

      if (hdfsDirectory.isClosed()) {
        iterator.remove();
      } else {
        try {
          refreshDirectory(hdfsDirectory);
          Map<FileStatus,BlockLocation[]> blockMap = cache.get(hdfsDirectory);

          // For every block in every file in this directory, count it
          for (BlockLocation[] locations : blockMap.values()) {
            for (BlockLocation bl : locations) {
              totalBytes += bl.getLength();
              totalCount++;

              if (Arrays.asList(bl.getHosts()).contains(hostname)) {
                localBytes += bl.getLength();
                localCount++;
              }
            }
          }
        } catch (IOException e) {
          logger.warn("Could not retrieve locality information for {} due to exception: {}",
              hdfsDirectory.getHdfsDirPath(), e);
        }
      }
    }

    return createStatistics(totalBytes, localBytes, totalCount, localCount);
  }

  /**
   * Generate a statistics object based on the given measurements for all files monitored by this reporter.
   *
   * @param totalBytes
   *          The total bytes used
   * @param localBytes
   *          The number of bytes found on local nodes
   * @param totalCount
   *          The total block count
   * @param localCount
   *          The number of blocks found on local nodes
   * @return HDFS block locality statistics
   */
  private NamedList<Number> createStatistics(long totalBytes, long localBytes, int totalCount, int localCount) {
    NamedList<Number> statistics = new SimpleOrderedMap<Number>();

    statistics.add(LOCALITY_BYTES_TOTAL, totalBytes);
    statistics.add(LOCALITY_BYTES_LOCAL, localBytes);
    if (localBytes == 0) {
      statistics.add(LOCALITY_BYTES_RATIO, 0);
    } else {
      statistics.add(LOCALITY_BYTES_RATIO, localBytes / (double) totalBytes);
    }
    statistics.add(LOCALITY_BLOCKS_TOTAL, totalCount);
    statistics.add(LOCALITY_BLOCKS_LOCAL, localCount);
    if (localCount == 0) {
      statistics.add(LOCALITY_BLOCKS_RATIO, 0);
    } else {
      statistics.add(LOCALITY_BLOCKS_RATIO, localCount / (double) totalCount);
    }

    return statistics;
  }

  /**
   * Add a directory for block locality reporting. This directory will continue to be checked until its close method has
   * been called.
   *
   * @param dir
   *          The directory to keep metrics on.
   */
  public void registerDirectory(HdfsDirectory dir) {
    logger.info("Registering directory {} for locality metrics.", dir.getHdfsDirPath().toString());
    cache.put(dir, new ConcurrentHashMap<FileStatus, BlockLocation[]>());
  }

  /**
   * Update the cached block locations for the given directory. This includes deleting any files that no longer exist in
   * the file system and adding any new files that have shown up.
   *
   * @param dir
   *          The directory to refresh
   * @throws IOException
   *           If there is a problem getting info from HDFS
   */
  private void refreshDirectory(HdfsDirectory dir) throws IOException {
    Map<FileStatus,BlockLocation[]> directoryCache = cache.get(dir);
    Set<FileStatus> cachedStatuses = directoryCache.keySet();

    FileSystem fs = dir.getFileSystem();
    FileStatus[] statuses = fs.listStatus(dir.getHdfsDirPath());
    List<FileStatus> statusList = Arrays.asList(statuses);

    logger.debug("Updating locality information for: {}", statusList);

    // Keep only the files that still exist
    cachedStatuses.retainAll(statusList);

    // Fill in missing entries in the cache
    for (FileStatus status : statusList) {
      if (!status.isDirectory() && !directoryCache.containsKey(status)) {
        BlockLocation[] locations = fs.getFileBlockLocations(status, 0, status.getLen());
        directoryCache.put(status, locations);
      }
    }
  }
}
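A minimal sketch of driving the reporter directly; the host name and directory are placeholders, and in practice HdfsDirectoryFactory wires this up through LocalityHolder (the test below exercises the same path via the factory):

HdfsLocalityReporter reporter = new HdfsLocalityReporter();
reporter.setHost("datanode1.example.com");       // must match what the HDFS DataNodes report
reporter.registerDirectory(hdfsDirectory);        // an open HdfsDirectory (placeholder)
NamedList<?> stats = reporter.getStatistics();    // re-reads block locations from HDFS on each call
long totalBlocks = ((Number) stats.get(HdfsLocalityReporter.LOCALITY_BLOCKS_TOTAL)).longValue();
long localBlocks = ((Number) stats.get(HdfsLocalityReporter.LOCALITY_BLOCKS_LOCAL)).longValue();
System.out.printf("local blocks: %d of %d%n", localBlocks, totalBlocks);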
HdfsDirectoryFactoryTest.java

@@ -17,23 +17,34 @@

 package org.apache.solr.core;

 import java.io.IOException;
 import java.nio.file.Path;
 import java.text.SimpleDateFormat;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Locale;
 import java.util.Map;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.LockFactory;
 import org.apache.lucene.store.NoLockFactory;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.cloud.hdfs.HdfsTestUtil;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.DirectoryFactory.DirContext;
 import org.apache.solr.handler.SnapShooter;
+import org.apache.solr.store.hdfs.HdfsLocalityReporter;
 import org.apache.solr.util.BadHdfsThreadsFilter;
 import org.apache.solr.util.MockCoreContainer.MockCoreDescriptor;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;

@@ -165,5 +176,51 @@ public class HdfsDirectoryFactoryTest extends SolrTestCaseJ4 {
     assertTrue(hdfs.isDirectory(currentIndexDirPath));
     assertTrue(!hdfs.isDirectory(oldIndexDirPath));
   }

+  @Test
+  public void testLocalityReporter() throws Exception {
+    Configuration conf = HdfsTestUtil.getClientConfiguration(dfsCluster);
+    conf.set("dfs.permissions.enabled", "false");
+
+    HdfsDirectoryFactory factory = new HdfsDirectoryFactory();
+    Map<String,String> props = new HashMap<String,String>();
+    props.put(HdfsDirectoryFactory.HDFS_HOME, HdfsTestUtil.getURI(dfsCluster) + "/solr");
+    props.put(HdfsDirectoryFactory.BLOCKCACHE_ENABLED, "false");
+    props.put(HdfsDirectoryFactory.NRTCACHINGDIRECTORY_ENABLE, "false");
+    factory.init(new NamedList<>(props));
+
+    Iterator<SolrInfoMBean> it = factory.offerMBeans().iterator();
+    it.next(); // skip
+    SolrInfoMBean localityBean = it.next(); // brittle, but it's ok
+
+    // Make sure we have the right bean.
+    assertEquals("hdfs-locality", localityBean.getName());
+
+    // We haven't done anything, so there should be no data
+    NamedList<?> statistics = localityBean.getStatistics();
+    assertEquals(0l, statistics.get(HdfsLocalityReporter.LOCALITY_BYTES_TOTAL));
+    assertEquals(0, statistics.get(HdfsLocalityReporter.LOCALITY_BYTES_RATIO));
+
+    // create a directory and a file
+    String path = HdfsTestUtil.getURI(dfsCluster) + "/solr3/";
+    Directory dir = factory.create(path, NoLockFactory.INSTANCE, DirContext.DEFAULT);
+    try(IndexOutput writer = dir.createOutput("output", null)) {
+      writer.writeLong(42l);
+    }
+
+    final long long_bytes = Long.SIZE / Byte.SIZE;
+
+    // no locality because hostname not set
+    statistics = localityBean.getStatistics();
+    assertEquals(long_bytes, statistics.get(HdfsLocalityReporter.LOCALITY_BYTES_TOTAL));
+    assertEquals(1, statistics.get(HdfsLocalityReporter.LOCALITY_BLOCKS_TOTAL));
+    assertEquals(0, statistics.get(HdfsLocalityReporter.LOCALITY_BLOCKS_LOCAL));
+
+    // set hostname and check again
+    factory.setHost("127.0.0.1");
+    statistics = localityBean.getStatistics();
+    assertEquals(long_bytes, statistics.get(HdfsLocalityReporter.LOCALITY_BYTES_LOCAL));
+
+    factory.close();
+  }
 }