HDFS-14313. Get hdfs used space from FsDatasetImpl#volumeMap#ReplicaInfo in memory instead of df/du. Contributed by Lisheng Sun.
(cherry picked from commit a5bb1e8ee871df1111ff77d0f6921b13c8ffb50e) Conflicts: hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
This commit is contained in:
parent
3510a9e5e5
commit
c74027d9d3
@ -72,6 +72,25 @@ public class CommonConfigurationKeysPublic {
|
|||||||
public static final String FS_DU_INTERVAL_KEY = "fs.du.interval";
|
public static final String FS_DU_INTERVAL_KEY = "fs.du.interval";
|
||||||
/** Default value for FS_DU_INTERVAL_KEY */
|
/** Default value for FS_DU_INTERVAL_KEY */
|
||||||
public static final long FS_DU_INTERVAL_DEFAULT = 600000;
|
public static final long FS_DU_INTERVAL_DEFAULT = 600000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see
|
||||||
|
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
|
||||||
|
* core-default.xml</a>
|
||||||
|
*/
|
||||||
|
public static final String FS_GETSPACEUSED_CLASSNAME =
|
||||||
|
"fs.getspaceused.classname";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see
|
||||||
|
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
|
||||||
|
* core-default.xml</a>
|
||||||
|
*/
|
||||||
|
public static final String FS_GETSPACEUSED_JITTER_KEY =
|
||||||
|
"fs.getspaceused.jitterMillis";
|
||||||
|
/** Default value for FS_GETSPACEUSED_JITTER_KEY */
|
||||||
|
public static final long FS_GETSPACEUSED_JITTER_DEFAULT = 60000;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @see
|
* @see
|
||||||
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
|
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
|
||||||
|
@ -26,7 +26,6 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
public interface GetSpaceUsed {
|
public interface GetSpaceUsed {
|
||||||
|
|
||||||
@ -36,20 +35,16 @@ public interface GetSpaceUsed {
|
|||||||
/**
|
/**
|
||||||
* The builder class
|
* The builder class
|
||||||
*/
|
*/
|
||||||
final class Builder {
|
class Builder {
|
||||||
static final Logger LOG = LoggerFactory.getLogger(Builder.class);
|
static final Logger LOG = LoggerFactory.getLogger(Builder.class);
|
||||||
|
|
||||||
static final String CLASSNAME_KEY = "fs.getspaceused.classname";
|
|
||||||
static final String JITTER_KEY = "fs.getspaceused.jitterMillis";
|
|
||||||
static final long DEFAULT_JITTER = TimeUnit.MINUTES.toMillis(1);
|
|
||||||
|
|
||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private Class<? extends GetSpaceUsed> klass = null;
|
private Class<? extends GetSpaceUsed> klass = null;
|
||||||
private File path = null;
|
private File path = null;
|
||||||
private Long interval = null;
|
private Long interval = null;
|
||||||
private Long jitter = null;
|
private Long jitter = null;
|
||||||
private Long initialUsed = null;
|
private Long initialUsed = null;
|
||||||
|
private Constructor<? extends GetSpaceUsed> cons;
|
||||||
|
|
||||||
public Configuration getConf() {
|
public Configuration getConf() {
|
||||||
return conf;
|
return conf;
|
||||||
@ -89,7 +84,8 @@ public Class<? extends GetSpaceUsed> getKlass() {
|
|||||||
if (conf == null) {
|
if (conf == null) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
return conf.getClass(CLASSNAME_KEY, result, GetSpaceUsed.class);
|
return conf.getClass(CommonConfigurationKeys.FS_GETSPACEUSED_CLASSNAME,
|
||||||
|
result, GetSpaceUsed.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder setKlass(Class<? extends GetSpaceUsed> klass) {
|
public Builder setKlass(Class<? extends GetSpaceUsed> klass) {
|
||||||
@ -124,9 +120,10 @@ public long getJitter() {
|
|||||||
Configuration configuration = this.conf;
|
Configuration configuration = this.conf;
|
||||||
|
|
||||||
if (configuration == null) {
|
if (configuration == null) {
|
||||||
return DEFAULT_JITTER;
|
return CommonConfigurationKeys.FS_GETSPACEUSED_JITTER_DEFAULT;
|
||||||
}
|
}
|
||||||
return configuration.getLong(JITTER_KEY, DEFAULT_JITTER);
|
return configuration.getLong(CommonConfigurationKeys.FS_GETSPACEUSED_JITTER_KEY,
|
||||||
|
CommonConfigurationKeys.FS_GETSPACEUSED_JITTER_DEFAULT);
|
||||||
}
|
}
|
||||||
return jitter;
|
return jitter;
|
||||||
}
|
}
|
||||||
@ -136,11 +133,21 @@ public Builder setJitter(Long jit) {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Constructor<? extends GetSpaceUsed> getCons() {
|
||||||
|
return cons;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCons(Constructor<? extends GetSpaceUsed> cons) {
|
||||||
|
this.cons = cons;
|
||||||
|
}
|
||||||
|
|
||||||
public GetSpaceUsed build() throws IOException {
|
public GetSpaceUsed build() throws IOException {
|
||||||
GetSpaceUsed getSpaceUsed = null;
|
GetSpaceUsed getSpaceUsed = null;
|
||||||
try {
|
try {
|
||||||
Constructor<? extends GetSpaceUsed> cons =
|
if (cons == null) {
|
||||||
getKlass().getConstructor(Builder.class);
|
cons = getKlass().getConstructor(Builder.class);
|
||||||
|
}
|
||||||
|
|
||||||
getSpaceUsed = cons.newInstance(this);
|
getSpaceUsed = cons.newInstance(this);
|
||||||
} catch (InstantiationException e) {
|
} catch (InstantiationException e) {
|
||||||
LOG.warn("Error trying to create an instance of " + getKlass(), e);
|
LOG.warn("Error trying to create an instance of " + getKlass(), e);
|
||||||
|
@ -3210,4 +3210,25 @@
|
|||||||
address. (i.e 0.0.0.0)
|
address. (i.e 0.0.0.0)
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>fs.getspaceused.classname</name>
|
||||||
|
<value></value>
|
||||||
|
<description>
|
||||||
|
The class that can tell estimate much space is used in a directory.
|
||||||
|
There are four impl classes that being supported:
|
||||||
|
org.apache.hadoop.fs.DU(default), org.apache.hadoop.fs.WindowsGetSpaceUsed
|
||||||
|
org.apache.hadoop.fs.DFCachingGetSpaceUsed and
|
||||||
|
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReplicaCachingGetSpaceUsed.
|
||||||
|
And the ReplicaCachingGetSpaceUsed impl class only used in HDFS module.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>fs.getspaceused.jitterMillis</name>
|
||||||
|
<value>60000</value>
|
||||||
|
<description>
|
||||||
|
fs space usage statistics refresh jitter in msec.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
</configuration>
|
</configuration>
|
||||||
|
@ -0,0 +1,82 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.fs.CachingGetSpaceUsed;
|
||||||
|
import org.apache.hadoop.fs.GetSpaceUsed;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fast and accurate class to tell how much space HDFS is using.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public abstract class FSCachingGetSpaceUsed extends CachingGetSpaceUsed {
|
||||||
|
static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(FSCachingGetSpaceUsed.class);
|
||||||
|
|
||||||
|
public FSCachingGetSpaceUsed(Builder builder) throws IOException {
|
||||||
|
super(builder);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The builder class.
|
||||||
|
*/
|
||||||
|
public static class Builder extends GetSpaceUsed.Builder {
|
||||||
|
private FsVolumeImpl volume;
|
||||||
|
private String bpid;
|
||||||
|
|
||||||
|
public FsVolumeImpl getVolume() {
|
||||||
|
return volume;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setVolume(FsVolumeImpl fsVolume) {
|
||||||
|
this.volume = fsVolume;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getBpid() {
|
||||||
|
return bpid;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setBpid(String bpid) {
|
||||||
|
this.bpid = bpid;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetSpaceUsed build() throws IOException {
|
||||||
|
Class clazz = getKlass();
|
||||||
|
if (FSCachingGetSpaceUsed.class.isAssignableFrom(clazz)) {
|
||||||
|
try {
|
||||||
|
setCons(clazz.getConstructor(Builder.class));
|
||||||
|
} catch (NoSuchMethodException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return super.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -660,4 +660,6 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
|
|||||||
* Acquire the lock of the data set.
|
* Acquire the lock of the data set.
|
||||||
*/
|
*/
|
||||||
AutoCloseableLock acquireDatasetLock();
|
AutoCloseableLock acquireDatasetLock();
|
||||||
|
|
||||||
|
Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
|
||||||
}
|
}
|
||||||
|
@ -43,6 +43,7 @@
|
|||||||
import java.util.concurrent.RecursiveAction;
|
import java.util.concurrent.RecursiveAction;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -180,10 +181,14 @@ public int compare(File f1, File f2) {
|
|||||||
|
|
||||||
// Use cached value initially if available. Or the following call will
|
// Use cached value initially if available. Or the following call will
|
||||||
// block until the initial du command completes.
|
// block until the initial du command completes.
|
||||||
this.dfsUsage = new CachingGetSpaceUsed.Builder().setPath(bpDir)
|
this.dfsUsage = new FSCachingGetSpaceUsed.Builder().setBpid(bpid)
|
||||||
.setConf(conf)
|
.setVolume(volume)
|
||||||
.setInitialUsed(loadDfsUsed())
|
.setPath(bpDir)
|
||||||
.build();
|
.setConf(conf)
|
||||||
|
.setInitialUsed(loadDfsUsed())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
|
||||||
if (addReplicaThreadPool == null) {
|
if (addReplicaThreadPool == null) {
|
||||||
// initialize add replica fork join pool
|
// initialize add replica fork join pool
|
||||||
initializeAddReplicaPool(conf);
|
initializeAddReplicaPool(conf);
|
||||||
|
@ -30,6 +30,7 @@
|
|||||||
import java.nio.channels.FileChannel;
|
import java.nio.channels.FileChannel;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
@ -197,6 +198,18 @@ public Block getStoredBlock(String bpid, long blkid)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The deepCopyReplica call doesn't use the datasetock since it will lead the
|
||||||
|
* potential deadlock with the {@link FsVolumeList#addBlockPool} call.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Set<? extends Replica> deepCopyReplica(String bpid)
|
||||||
|
throws IOException {
|
||||||
|
Set<? extends Replica> replicas =
|
||||||
|
new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
|
||||||
|
: volumeMap.replicas(bpid));
|
||||||
|
return Collections.unmodifiableSet(replicas);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This should be primarily used for testing.
|
* This should be primarily used for testing.
|
||||||
|
@ -0,0 +1,108 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||||
|
|
||||||
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fast and accurate class to tell how much space HDFS is using. This class gets
|
||||||
|
* hdfs used space from FsDatasetImpl#volumeMap#ReplicaInfos that uses an in
|
||||||
|
* memory way.
|
||||||
|
*
|
||||||
|
* Getting hdfs used space by ReplicaCachingGetSpaceUsed impl only includes
|
||||||
|
* block and meta files, but DU impl is blockpool dir based statistics that will
|
||||||
|
* include additional files, e.g. tmp dir, scanner.cursor file. Getting space
|
||||||
|
* used by DU impl will be greater than by ReplicaCachingGetSpaceUsed impl, but
|
||||||
|
* the latter is more accurate.
|
||||||
|
*
|
||||||
|
* Setting fs.getspaceused.classname to
|
||||||
|
* org.apache.hadoop.hdfs.server.datanode.fsdataset
|
||||||
|
* impl.ReplicaCachingGetSpaceUsed in your core-site.xml if we want to enable.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class ReplicaCachingGetSpaceUsed extends FSCachingGetSpaceUsed {
|
||||||
|
static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(ReplicaCachingGetSpaceUsed.class);
|
||||||
|
|
||||||
|
private static final long DEEP_COPY_REPLICA_THRESHOLD_MS = 50;
|
||||||
|
private static final long REPLICA_CACHING_GET_SPACE_USED_THRESHOLD_MS = 1000;
|
||||||
|
private final FsVolumeImpl volume;
|
||||||
|
private final String bpid;
|
||||||
|
|
||||||
|
public ReplicaCachingGetSpaceUsed(Builder builder) throws IOException {
|
||||||
|
super(builder);
|
||||||
|
volume = builder.getVolume();
|
||||||
|
bpid = builder.getBpid();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void refresh() {
|
||||||
|
long start = Time.monotonicNow();
|
||||||
|
long dfsUsed = 0;
|
||||||
|
long count = 0;
|
||||||
|
|
||||||
|
FsDatasetSpi fsDataset = volume.getDataset();
|
||||||
|
try {
|
||||||
|
Collection<ReplicaInfo> replicaInfos =
|
||||||
|
(Collection<ReplicaInfo>) fsDataset.deepCopyReplica(bpid);
|
||||||
|
long cost = Time.monotonicNow() - start;
|
||||||
|
if (cost > DEEP_COPY_REPLICA_THRESHOLD_MS) {
|
||||||
|
LOG.debug(
|
||||||
|
"Copy replica infos, blockPoolId: {}, replicas size: {}, "
|
||||||
|
+ "duration: {}ms",
|
||||||
|
bpid, replicaInfos.size(), Time.monotonicNow() - start);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (CollectionUtils.isNotEmpty(replicaInfos)) {
|
||||||
|
for (ReplicaInfo replicaInfo : replicaInfos) {
|
||||||
|
if (Objects.equals(replicaInfo.getVolume().getStorageID(),
|
||||||
|
volume.getStorageID())) {
|
||||||
|
dfsUsed += replicaInfo.getBlockDataLength();
|
||||||
|
dfsUsed += replicaInfo.getMetadataLength();
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.used.set(dfsUsed);
|
||||||
|
cost = Time.monotonicNow() - start;
|
||||||
|
if (cost > REPLICA_CACHING_GET_SPACE_USED_THRESHOLD_MS) {
|
||||||
|
LOG.debug(
|
||||||
|
"Refresh dfs used, bpid: {}, replicas size: {}, dfsUsed: {} "
|
||||||
|
+ "on volume: {}, duration: {}ms",
|
||||||
|
bpid, count, used, volume.getStorageID(),
|
||||||
|
Time.monotonicNow() - start);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("ReplicaCachingGetSpaceUsed refresh error", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -25,8 +25,10 @@
|
|||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
@ -1569,5 +1571,15 @@ public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block,
|
|||||||
public AutoCloseableLock acquireDatasetLock() {
|
public AutoCloseableLock acquireDatasetLock() {
|
||||||
return datasetLock.acquire();
|
return datasetLock.acquire();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<? extends Replica> deepCopyReplica(String bpid)
|
||||||
|
throws IOException {
|
||||||
|
Set<BInfo> replicas = new HashSet<>();
|
||||||
|
for (SimulatedStorage s : storages) {
|
||||||
|
replicas.addAll(s.getBlockMap(bpid).values());
|
||||||
|
}
|
||||||
|
return Collections.unmodifiableSet(replicas);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -454,4 +454,10 @@ public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block,
|
|||||||
public AutoCloseableLock acquireDatasetLock() {
|
public AutoCloseableLock acquireDatasetLock() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<? extends Replica> deepCopyReplica(String bpid)
|
||||||
|
throws IOException {
|
||||||
|
return Collections.EMPTY_SET;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,148 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CachingGetSpaceUsed;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DFSInputStream;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit test for ReplicaCachingGetSpaceUsed class.
|
||||||
|
*/
|
||||||
|
public class TestReplicaCachingGetSpaceUsed {
|
||||||
|
private Configuration conf = null;
|
||||||
|
private MiniDFSCluster cluster;
|
||||||
|
private DistributedFileSystem fs;
|
||||||
|
private DataNode dataNode;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp()
|
||||||
|
throws IOException, NoSuchMethodException, InterruptedException {
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.setClass("fs.getspaceused.classname", ReplicaCachingGetSpaceUsed.class,
|
||||||
|
CachingGetSpaceUsed.class);
|
||||||
|
conf.setLong(FS_DU_INTERVAL_KEY, 1000);
|
||||||
|
conf.setLong("fs.getspaceused.jitterMillis", 0);
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
dataNode = cluster.getDataNodes().get(0);
|
||||||
|
|
||||||
|
fs = cluster.getFileSystem();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws IOException {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
cluster = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplicaCachingGetSpaceUsedByFINALIZEDReplica()
|
||||||
|
throws Exception {
|
||||||
|
FSDataOutputStream os = fs
|
||||||
|
.create(new Path("/testReplicaCachingGetSpaceUsedByFINALIZEDReplica"));
|
||||||
|
byte[] bytes = new byte[20480];
|
||||||
|
InputStream is = new ByteArrayInputStream(bytes);
|
||||||
|
IOUtils.copyBytes(is, os, bytes.length);
|
||||||
|
os.hsync();
|
||||||
|
os.close();
|
||||||
|
|
||||||
|
DFSInputStream dfsInputStream = fs.getClient()
|
||||||
|
.open("/testReplicaCachingGetSpaceUsedByFINALIZEDReplica");
|
||||||
|
long blockLength = 0;
|
||||||
|
long metaLength = 0;
|
||||||
|
List<LocatedBlock> locatedBlocks = dfsInputStream.getAllBlocks();
|
||||||
|
for (LocatedBlock locatedBlock : locatedBlocks) {
|
||||||
|
ExtendedBlock extendedBlock = locatedBlock.getBlock();
|
||||||
|
blockLength += extendedBlock.getLocalBlock().getNumBytes();
|
||||||
|
metaLength += dataNode.getFSDataset()
|
||||||
|
.getMetaDataInputStream(extendedBlock).getLength();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Guarantee ReplicaCachingGetSpaceUsed#refresh() is called after replica
|
||||||
|
// has been written to disk.
|
||||||
|
Thread.sleep(2000);
|
||||||
|
assertEquals(blockLength + metaLength,
|
||||||
|
dataNode.getFSDataset().getDfsUsed());
|
||||||
|
|
||||||
|
fs.delete(new Path("/testReplicaCachingGetSpaceUsedByFINALIZEDReplica"),
|
||||||
|
true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplicaCachingGetSpaceUsedByRBWReplica() throws Exception {
|
||||||
|
FSDataOutputStream os =
|
||||||
|
fs.create(new Path("/testReplicaCachingGetSpaceUsedByRBWReplica"));
|
||||||
|
byte[] bytes = new byte[20480];
|
||||||
|
InputStream is = new ByteArrayInputStream(bytes);
|
||||||
|
IOUtils.copyBytes(is, os, bytes.length);
|
||||||
|
os.hsync();
|
||||||
|
|
||||||
|
DFSInputStream dfsInputStream =
|
||||||
|
fs.getClient().open("/testReplicaCachingGetSpaceUsedByRBWReplica");
|
||||||
|
long blockLength = 0;
|
||||||
|
long metaLength = 0;
|
||||||
|
List<LocatedBlock> locatedBlocks = dfsInputStream.getAllBlocks();
|
||||||
|
for (LocatedBlock locatedBlock : locatedBlocks) {
|
||||||
|
ExtendedBlock extendedBlock = locatedBlock.getBlock();
|
||||||
|
blockLength += extendedBlock.getLocalBlock().getNumBytes();
|
||||||
|
metaLength += dataNode.getFSDataset()
|
||||||
|
.getMetaDataInputStream(extendedBlock).getLength();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Guarantee ReplicaCachingGetSpaceUsed#refresh() is called after replica
|
||||||
|
// has been written to disk.
|
||||||
|
Thread.sleep(2000);
|
||||||
|
assertEquals(blockLength + metaLength,
|
||||||
|
dataNode.getFSDataset().getDfsUsed());
|
||||||
|
|
||||||
|
os.close();
|
||||||
|
|
||||||
|
// Guarantee ReplicaCachingGetSpaceUsed#refresh() is called, dfsspaceused is
|
||||||
|
// recalculated
|
||||||
|
Thread.sleep(2000);
|
||||||
|
// After close operation, the replica state will be transformed from RBW to
|
||||||
|
// finalized. But the space used of these replicas are all included and the
|
||||||
|
// dfsUsed value should be same.
|
||||||
|
assertEquals(blockLength + metaLength,
|
||||||
|
dataNode.getFSDataset().getDfsUsed());
|
||||||
|
|
||||||
|
fs.delete(new Path("/testReplicaCachingGetSpaceUsedByRBWReplica"), true);
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user