HDFS-14313. Get hdfs used space from FsDatasetImpl#volumeMap#ReplicaInfo in memory instead of df/du. Contributed by Lisheng Sun.

(cherry picked from commit a5bb1e8ee8)

 Conflicts:
	hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

(cherry picked from commit c74027d9d3)

 Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
This commit is contained in:
Yiqun Lin 2019-08-07 10:18:11 +08:00 committed by Wei-Chiu Chuang
parent 2a3eb04996
commit 3aea2bcf12
11 changed files with 439 additions and 16 deletions

View File

@ -72,6 +72,25 @@ public class CommonConfigurationKeysPublic {
public static final String FS_DU_INTERVAL_KEY = "fs.du.interval";
/** Default value for FS_DU_INTERVAL_KEY */
public static final long FS_DU_INTERVAL_DEFAULT = 600000;
/**
* @see
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
* core-default.xml</a>
*/
public static final String FS_GETSPACEUSED_CLASSNAME =
"fs.getspaceused.classname";
/**
* @see
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
* core-default.xml</a>
*/
public static final String FS_GETSPACEUSED_JITTER_KEY =
"fs.getspaceused.jitterMillis";
/** Default value for FS_GETSPACEUSED_JITTER_KEY */
public static final long FS_GETSPACEUSED_JITTER_DEFAULT = 60000;
/**
* @see
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">

View File

@ -26,7 +26,6 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.TimeUnit;
public interface GetSpaceUsed {
@ -36,20 +35,16 @@ public interface GetSpaceUsed {
/**
* The builder class
*/
final class Builder {
class Builder {
static final Logger LOG = LoggerFactory.getLogger(Builder.class);
static final String CLASSNAME_KEY = "fs.getspaceused.classname";
static final String JITTER_KEY = "fs.getspaceused.jitterMillis";
static final long DEFAULT_JITTER = TimeUnit.MINUTES.toMillis(1);
private Configuration conf;
private Class<? extends GetSpaceUsed> klass = null;
private File path = null;
private Long interval = null;
private Long jitter = null;
private Long initialUsed = null;
private Constructor<? extends GetSpaceUsed> cons;
public Configuration getConf() {
return conf;
@ -89,7 +84,8 @@ public interface GetSpaceUsed {
if (conf == null) {
return result;
}
return conf.getClass(CLASSNAME_KEY, result, GetSpaceUsed.class);
return conf.getClass(CommonConfigurationKeys.FS_GETSPACEUSED_CLASSNAME,
result, GetSpaceUsed.class);
}
public Builder setKlass(Class<? extends GetSpaceUsed> klass) {
@ -124,9 +120,10 @@ public interface GetSpaceUsed {
Configuration configuration = this.conf;
if (configuration == null) {
return DEFAULT_JITTER;
return CommonConfigurationKeys.FS_GETSPACEUSED_JITTER_DEFAULT;
}
return configuration.getLong(JITTER_KEY, DEFAULT_JITTER);
return configuration.getLong(CommonConfigurationKeys.FS_GETSPACEUSED_JITTER_KEY,
CommonConfigurationKeys.FS_GETSPACEUSED_JITTER_DEFAULT);
}
return jitter;
}
@ -136,11 +133,21 @@ public interface GetSpaceUsed {
return this;
}
public Constructor<? extends GetSpaceUsed> getCons() {
return cons;
}
public void setCons(Constructor<? extends GetSpaceUsed> cons) {
this.cons = cons;
}
public GetSpaceUsed build() throws IOException {
GetSpaceUsed getSpaceUsed = null;
try {
Constructor<? extends GetSpaceUsed> cons =
getKlass().getConstructor(Builder.class);
if (cons == null) {
cons = getKlass().getConstructor(Builder.class);
}
getSpaceUsed = cons.newInstance(this);
} catch (InstantiationException e) {
LOG.warn("Error trying to create an instance of " + getKlass(), e);

View File

@ -3090,4 +3090,25 @@
address. (i.e 0.0.0.0)
</description>
</property>
<property>
<name>fs.getspaceused.classname</name>
<value></value>
<description>
The class that can tell estimate much space is used in a directory.
There are four impl classes that being supported:
org.apache.hadoop.fs.DU(default), org.apache.hadoop.fs.WindowsGetSpaceUsed
org.apache.hadoop.fs.DFCachingGetSpaceUsed and
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReplicaCachingGetSpaceUsed.
And the ReplicaCachingGetSpaceUsed impl class only used in HDFS module.
</description>
</property>
<property>
<name>fs.getspaceused.jitterMillis</name>
<value>60000</value>
<description>
fs space usage statistics refresh jitter in msec.
</description>
</property>
</configuration>

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.GetSpaceUsed;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Fast and accurate class to tell how much space HDFS is using.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class FSCachingGetSpaceUsed extends CachingGetSpaceUsed {
static final Logger LOG =
LoggerFactory.getLogger(FSCachingGetSpaceUsed.class);
public FSCachingGetSpaceUsed(Builder builder) throws IOException {
super(builder);
}
/**
* The builder class.
*/
public static class Builder extends GetSpaceUsed.Builder {
private FsVolumeImpl volume;
private String bpid;
public FsVolumeImpl getVolume() {
return volume;
}
public Builder setVolume(FsVolumeImpl fsVolume) {
this.volume = fsVolume;
return this;
}
public String getBpid() {
return bpid;
}
public Builder setBpid(String bpid) {
this.bpid = bpid;
return this;
}
@Override
public GetSpaceUsed build() throws IOException {
Class clazz = getKlass();
if (FSCachingGetSpaceUsed.class.isAssignableFrom(clazz)) {
try {
setCons(clazz.getConstructor(Builder.class));
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
}
return super.build();
}
}
}

View File

@ -660,4 +660,6 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* Acquire the lock of the data set.
*/
AutoCloseableLock acquireDatasetLock();
Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
}

View File

@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -180,10 +181,14 @@ class BlockPoolSlice {
// Use cached value initially if available. Or the following call will
// block until the initial du command completes.
this.dfsUsage = new CachingGetSpaceUsed.Builder().setPath(bpDir)
this.dfsUsage = new FSCachingGetSpaceUsed.Builder().setBpid(bpid)
.setVolume(volume)
.setPath(bpDir)
.setConf(conf)
.setInitialUsed(loadDfsUsed())
.build();
if (addReplicaThreadPool == null) {
// initialize add replica fork join pool
initializeAddReplicaPool(conf);

View File

@ -30,6 +30,7 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -197,6 +198,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
/**
* The deepCopyReplica call doesn't use the datasetock since it will lead the
* potential deadlock with the {@link FsVolumeList#addBlockPool} call.
*/
@Override
public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {
Set<? extends Replica> replicas =
new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
: volumeMap.replicas(bpid));
return Collections.unmodifiableSet(replicas);
}
/**
* This should be primarily used for testing.

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.Objects;
/**
* Fast and accurate class to tell how much space HDFS is using. This class gets
* hdfs used space from FsDatasetImpl#volumeMap#ReplicaInfos that uses an in
* memory way.
*
* Getting hdfs used space by ReplicaCachingGetSpaceUsed impl only includes
* block and meta files, but DU impl is blockpool dir based statistics that will
* include additional files, e.g. tmp dir, scanner.cursor file. Getting space
* used by DU impl will be greater than by ReplicaCachingGetSpaceUsed impl, but
* the latter is more accurate.
*
* Setting fs.getspaceused.classname to
* org.apache.hadoop.hdfs.server.datanode.fsdataset
* impl.ReplicaCachingGetSpaceUsed in your core-site.xml if we want to enable.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReplicaCachingGetSpaceUsed extends FSCachingGetSpaceUsed {
static final Logger LOG =
LoggerFactory.getLogger(ReplicaCachingGetSpaceUsed.class);
private static final long DEEP_COPY_REPLICA_THRESHOLD_MS = 50;
private static final long REPLICA_CACHING_GET_SPACE_USED_THRESHOLD_MS = 1000;
private final FsVolumeImpl volume;
private final String bpid;
public ReplicaCachingGetSpaceUsed(Builder builder) throws IOException {
super(builder);
volume = builder.getVolume();
bpid = builder.getBpid();
}
@Override
protected void refresh() {
long start = Time.monotonicNow();
long dfsUsed = 0;
long count = 0;
FsDatasetSpi fsDataset = volume.getDataset();
try {
Collection<ReplicaInfo> replicaInfos =
(Collection<ReplicaInfo>) fsDataset.deepCopyReplica(bpid);
long cost = Time.monotonicNow() - start;
if (cost > DEEP_COPY_REPLICA_THRESHOLD_MS) {
LOG.debug(
"Copy replica infos, blockPoolId: {}, replicas size: {}, "
+ "duration: {}ms",
bpid, replicaInfos.size(), Time.monotonicNow() - start);
}
if (CollectionUtils.isNotEmpty(replicaInfos)) {
for (ReplicaInfo replicaInfo : replicaInfos) {
if (Objects.equals(replicaInfo.getVolume().getStorageID(),
volume.getStorageID())) {
dfsUsed += replicaInfo.getBlockDataLength();
dfsUsed += replicaInfo.getMetadataLength();
count++;
}
}
}
this.used.set(dfsUsed);
cost = Time.monotonicNow() - start;
if (cost > REPLICA_CACHING_GET_SPACE_USED_THRESHOLD_MS) {
LOG.debug(
"Refresh dfs used, bpid: {}, replicas size: {}, dfsUsed: {} "
+ "on volume: {}, duration: {}ms",
bpid, count, used, volume.getStorageID(),
Time.monotonicNow() - start);
}
} catch (Exception e) {
LOG.error("ReplicaCachingGetSpaceUsed refresh error", e);
}
}
}

View File

@ -25,8 +25,10 @@ import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -1569,5 +1571,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public AutoCloseableLock acquireDatasetLock() {
return datasetLock.acquire();
}
@Override
public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {
Set<BInfo> replicas = new HashSet<>();
for (SimulatedStorage s : storages) {
replicas.addAll(s.getBlockMap(bpid).values());
}
return Collections.unmodifiableSet(replicas);
}
}

View File

@ -454,4 +454,10 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
public AutoCloseableLock acquireDatasetLock() {
return null;
}
@Override
public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {
return Collections.EMPTY_SET;
}
}

View File

@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
import static org.junit.Assert.assertEquals;
/**
* Unit test for ReplicaCachingGetSpaceUsed class.
*/
public class TestReplicaCachingGetSpaceUsed {
private Configuration conf = null;
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private DataNode dataNode;
@Before
public void setUp()
throws IOException, NoSuchMethodException, InterruptedException {
conf = new Configuration();
conf.setClass("fs.getspaceused.classname", ReplicaCachingGetSpaceUsed.class,
CachingGetSpaceUsed.class);
conf.setLong(FS_DU_INTERVAL_KEY, 1000);
conf.setLong("fs.getspaceused.jitterMillis", 0);
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
dataNode = cluster.getDataNodes().get(0);
fs = cluster.getFileSystem();
}
@After
public void tearDown() throws IOException {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
@Test
public void testReplicaCachingGetSpaceUsedByFINALIZEDReplica()
throws Exception {
FSDataOutputStream os = fs
.create(new Path("/testReplicaCachingGetSpaceUsedByFINALIZEDReplica"));
byte[] bytes = new byte[20480];
InputStream is = new ByteArrayInputStream(bytes);
IOUtils.copyBytes(is, os, bytes.length);
os.hsync();
os.close();
DFSInputStream dfsInputStream = fs.getClient()
.open("/testReplicaCachingGetSpaceUsedByFINALIZEDReplica");
long blockLength = 0;
long metaLength = 0;
List<LocatedBlock> locatedBlocks = dfsInputStream.getAllBlocks();
for (LocatedBlock locatedBlock : locatedBlocks) {
ExtendedBlock extendedBlock = locatedBlock.getBlock();
blockLength += extendedBlock.getLocalBlock().getNumBytes();
metaLength += dataNode.getFSDataset()
.getMetaDataInputStream(extendedBlock).getLength();
}
// Guarantee ReplicaCachingGetSpaceUsed#refresh() is called after replica
// has been written to disk.
Thread.sleep(2000);
assertEquals(blockLength + metaLength,
dataNode.getFSDataset().getDfsUsed());
fs.delete(new Path("/testReplicaCachingGetSpaceUsedByFINALIZEDReplica"),
true);
}
@Test
public void testReplicaCachingGetSpaceUsedByRBWReplica() throws Exception {
FSDataOutputStream os =
fs.create(new Path("/testReplicaCachingGetSpaceUsedByRBWReplica"));
byte[] bytes = new byte[20480];
InputStream is = new ByteArrayInputStream(bytes);
IOUtils.copyBytes(is, os, bytes.length);
os.hsync();
DFSInputStream dfsInputStream =
fs.getClient().open("/testReplicaCachingGetSpaceUsedByRBWReplica");
long blockLength = 0;
long metaLength = 0;
List<LocatedBlock> locatedBlocks = dfsInputStream.getAllBlocks();
for (LocatedBlock locatedBlock : locatedBlocks) {
ExtendedBlock extendedBlock = locatedBlock.getBlock();
blockLength += extendedBlock.getLocalBlock().getNumBytes();
metaLength += dataNode.getFSDataset()
.getMetaDataInputStream(extendedBlock).getLength();
}
// Guarantee ReplicaCachingGetSpaceUsed#refresh() is called after replica
// has been written to disk.
Thread.sleep(2000);
assertEquals(blockLength + metaLength,
dataNode.getFSDataset().getDfsUsed());
os.close();
// Guarantee ReplicaCachingGetSpaceUsed#refresh() is called, dfsspaceused is
// recalculated
Thread.sleep(2000);
// After close operation, the replica state will be transformed from RBW to
// finalized. But the space used of these replicas are all included and the
// dfsUsed value should be same.
assertEquals(blockLength + metaLength,
dataNode.getFSDataset().getDfsUsed());
fs.delete(new Path("/testReplicaCachingGetSpaceUsedByRBWReplica"), true);
}
}