HADOOP-12973. Make DU pluggable. (Elliott Clark via cmccabe)

(cherry picked from commit 35f0770555)
This commit is contained in:
Colin Patrick Mccabe 2016-04-12 16:20:30 -07:00
parent 6e37e40dd2
commit 2b0b332e2f
7 changed files with 619 additions and 258 deletions

View File

@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Interface for class that can tell estimate much space
* is used in a directory.
* <p>
* The implementor is fee to cache space used. As such there
* are methods to update the cached value with any known changes.
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
static final Logger LOG = LoggerFactory.getLogger(CachingGetSpaceUsed.class);
protected final AtomicLong used = new AtomicLong();
private final AtomicBoolean running = new AtomicBoolean(true);
private final long refreshInterval;
private final String dirPath;
private Thread refreshUsed;
/**
* This is the constructor used by the builder.
* All overriding classes should implement this.
*/
public CachingGetSpaceUsed(CachingGetSpaceUsed.Builder builder)
throws IOException {
this(builder.getPath(), builder.getInterval(), builder.getInitialUsed());
}
/**
* Keeps track of disk usage.
*
* @param path the path to check disk usage in
* @param interval refresh the disk usage at this interval
* @param initialUsed use this value until next refresh
* @throws IOException if we fail to refresh the disk usage
*/
CachingGetSpaceUsed(File path,
long interval,
long initialUsed) throws IOException {
dirPath = path.getCanonicalPath();
refreshInterval = interval;
used.set(initialUsed);
}
void init() {
if (used.get() < 0) {
used.set(0);
refresh();
}
if (refreshInterval > 0) {
refreshUsed = new Thread(new RefreshThread(this),
"refreshUsed-" + dirPath);
refreshUsed.setDaemon(true);
refreshUsed.start();
} else {
running.set(false);
refreshUsed = null;
}
}
protected abstract void refresh();
/**
* @return an estimate of space used in the directory path.
*/
@Override public long getUsed() throws IOException {
return Math.max(used.get(), 0);
}
/**
* @return The directory path being monitored.
*/
public String getDirPath() {
return dirPath;
}
/**
* Increment the cached value of used space.
*/
public void incDfsUsed(long value) {
used.addAndGet(value);
}
/**
* Is the background thread running.
*/
boolean running() {
return running.get();
}
/**
* How long in between runs of the background refresh.
*/
long getRefreshInterval() {
return refreshInterval;
}
/**
* Reset the current used data amount. This should be called
* when the cached value is re-computed.
*
* @param usedValue new value that should be the disk usage.
*/
protected void setUsed(long usedValue) {
this.used.set(usedValue);
}
@Override
public void close() throws IOException {
running.set(false);
if (refreshUsed != null) {
refreshUsed.interrupt();
}
}
private static final class RefreshThread implements Runnable {
final CachingGetSpaceUsed spaceUsed;
RefreshThread(CachingGetSpaceUsed spaceUsed) {
this.spaceUsed = spaceUsed;
}
@Override
public void run() {
while (spaceUsed.running()) {
try {
Thread.sleep(spaceUsed.getRefreshInterval());
// update the used variable
spaceUsed.refresh();
} catch (InterruptedException e) {
LOG.warn("Thread Interrupted waiting to refresh disk information", e);
Thread.currentThread().interrupt();
}
}
}
}
}

View File

@ -17,227 +17,73 @@
*/
package org.apache.hadoop.fs;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.util.Shell;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
/** Filesystem disk space usage statistics. Uses the unix 'du' program*/
/** Filesystem disk space usage statistics. Uses the unix 'du' program */
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public class DU extends Shell {
private String dirPath;
public class DU extends CachingGetSpaceUsed {
private DUShell duShell;
private AtomicLong used = new AtomicLong();
private volatile boolean shouldRun = true;
private Thread refreshUsed;
private IOException duException = null;
private long refreshInterval;
/**
* Keeps track of disk usage.
* @param path the path to check disk usage in
* @param interval refresh the disk usage at this interval
* @throws IOException if we fail to refresh the disk usage
*/
public DU(File path, long interval) throws IOException {
this(path, interval, -1L);
@VisibleForTesting
public DU(File path, long interval, long initialUsed) throws IOException {
super(path, interval, initialUsed);
}
/**
* Keeps track of disk usage.
* @param path the path to check disk usage in
* @param interval refresh the disk usage at this interval
* @param initialUsed use this value until next refresh
* @throws IOException if we fail to refresh the disk usage
*/
public DU(File path, long interval, long initialUsed) throws IOException {
super(0);
//we set the Shell interval to 0 so it will always run our command
//and use this one to set the thread sleep interval
this.refreshInterval = interval;
this.dirPath = path.getCanonicalPath();
public DU(CachingGetSpaceUsed.Builder builder) throws IOException {
this(builder.getPath(), builder.getInterval(), builder.getInitialUsed());
}
//populate the used variable if the initial value is not specified.
if (initialUsed < 0) {
run();
} else {
this.used.set(initialUsed);
@Override
protected synchronized void refresh() {
if (duShell == null) {
duShell = new DUShell();
}
try {
duShell.startRefresh();
} catch (IOException ioe) {
LOG.warn("Could not get disk usage information", ioe);
}
}
/**
* Keeps track of disk usage.
* @param path the path to check disk usage in
* @param conf configuration object
* @throws IOException if we fail to refresh the disk usage
*/
public DU(File path, Configuration conf) throws IOException {
this(path, conf, -1L);
}
/**
* Keeps track of disk usage.
* @param path the path to check disk usage in
* @param conf configuration object
* @param initialUsed use it until the next refresh.
* @throws IOException if we fail to refresh the disk usage
*/
public DU(File path, Configuration conf, long initialUsed)
throws IOException {
this(path, conf.getLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY,
CommonConfigurationKeys.FS_DU_INTERVAL_DEFAULT), initialUsed);
}
/**
* This thread refreshes the "used" variable.
*
* Future improvements could be to not permanently
* run this thread, instead run when getUsed is called.
**/
class DURefreshThread implements Runnable {
private final class DUShell extends Shell {
void startRefresh() throws IOException {
run();
}
@Override
public void run() {
while(shouldRun) {
public String toString() {
return
"du -sk " + getDirPath() + "\n" + used.get() + "\t" + getDirPath();
}
try {
Thread.sleep(refreshInterval);
try {
//update the used variable
DU.this.run();
} catch (IOException e) {
synchronized (DU.this) {
//save the latest exception so we can return it in getUsed()
duException = e;
}
LOG.warn("Could not get disk usage information", e);
}
} catch (InterruptedException e) {
}
@Override
protected String[] getExecString() {
return new String[]{"du", "-sk", getDirPath()};
}
@Override
protected void parseExecResult(BufferedReader lines) throws IOException {
String line = lines.readLine();
if (line == null) {
throw new IOException("Expecting a line not the end of stream");
}
}
}
/**
* Decrease how much disk space we use.
* @param value decrease by this value
*/
public void decDfsUsed(long value) {
used.addAndGet(-value);
}
/**
* Increase how much disk space we use.
* @param value increase by this value
*/
public void incDfsUsed(long value) {
used.addAndGet(value);
}
/**
* @return disk space used
* @throws IOException if the shell command fails
*/
public long getUsed() throws IOException {
//if the updating thread isn't started, update on demand
if(refreshUsed == null) {
run();
} else {
synchronized (DU.this) {
//if an exception was thrown in the last run, rethrow
if(duException != null) {
IOException tmp = duException;
duException = null;
throw tmp;
}
String[] tokens = line.split("\t");
if (tokens.length == 0) {
throw new IOException("Illegal du output");
}
setUsed(Long.parseLong(tokens[0]) * 1024);
}
return Math.max(used.longValue(), 0L);
}
/**
* @return the path of which we're keeping track of disk usage
*/
public String getDirPath() {
return dirPath;
}
/**
* Override to hook in DUHelper class. Maybe this can be used more
* generally as well on Unix/Linux based systems
*/
@Override
protected void run() throws IOException {
if (WINDOWS) {
used.set(DUHelper.getFolderUsage(dirPath));
return;
}
super.run();
}
/**
* Start the disk usage checking thread.
*/
public void start() {
//only start the thread if the interval is sane
if(refreshInterval > 0) {
refreshUsed = new Thread(new DURefreshThread(),
"refreshUsed-"+dirPath);
refreshUsed.setDaemon(true);
refreshUsed.start();
}
}
/**
* Shut down the refreshing thread.
*/
public void shutdown() {
this.shouldRun = false;
if(this.refreshUsed != null) {
this.refreshUsed.interrupt();
}
}
@Override
public String toString() {
return
"du -sk " + dirPath +"\n" +
used + "\t" + dirPath;
}
@Override
protected String[] getExecString() {
return new String[] {"du", "-sk", dirPath};
}
@Override
protected void parseExecResult(BufferedReader lines) throws IOException {
String line = lines.readLine();
if (line == null) {
throw new IOException("Expecting a line not the end of stream");
}
String[] tokens = line.split("\t");
if(tokens.length == 0) {
throw new IOException("Illegal du output");
}
this.used.set(Long.parseLong(tokens[0])*1024);
}
public static void main(String[] args) throws Exception {
String path = ".";
@ -245,6 +91,10 @@ public class DU extends Shell {
path = args[0];
}
System.out.println(new DU(new File(path), new Configuration()).toString());
GetSpaceUsed du = new GetSpaceUsed.Builder().setPath(new File(path))
.setConf(new Configuration())
.build();
String duResult = du.toString();
System.out.println(duResult);
}
}

View File

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
public interface GetSpaceUsed {
long getUsed() throws IOException;
/**
* The builder class
*/
final class Builder {
static final Logger LOG = LoggerFactory.getLogger(Builder.class);
static final String CLASSNAME_KEY = "fs.getspaceused.classname";
private Configuration conf;
private Class<? extends GetSpaceUsed> klass = null;
private File path = null;
private Long interval = null;
private Long initialUsed = null;
public Configuration getConf() {
return conf;
}
public Builder setConf(Configuration conf) {
this.conf = conf;
return this;
}
public long getInterval() {
if (interval != null) {
return interval;
}
long result = CommonConfigurationKeys.FS_DU_INTERVAL_DEFAULT;
if (conf == null) {
return result;
}
return conf.getLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, result);
}
public Builder setInterval(long interval) {
this.interval = interval;
return this;
}
public Class<? extends GetSpaceUsed> getKlass() {
if (klass != null) {
return klass;
}
Class<? extends GetSpaceUsed> result = null;
if (Shell.WINDOWS) {
result = WindowsGetSpaceUsed.class;
} else {
result = DU.class;
}
if (conf == null) {
return result;
}
return conf.getClass(CLASSNAME_KEY, result, GetSpaceUsed.class);
}
public Builder setKlass(Class<? extends GetSpaceUsed> klass) {
this.klass = klass;
return this;
}
public File getPath() {
return path;
}
public Builder setPath(File path) {
this.path = path;
return this;
}
public long getInitialUsed() {
if (initialUsed == null) {
return -1;
}
return initialUsed;
}
public Builder setInitialUsed(long initialUsed) {
this.initialUsed = initialUsed;
return this;
}
public GetSpaceUsed build() throws IOException {
GetSpaceUsed getSpaceUsed = null;
try {
Constructor<? extends GetSpaceUsed> cons =
getKlass().getConstructor(Builder.class);
getSpaceUsed = cons.newInstance(this);
} catch (InstantiationException e) {
LOG.warn("Error trying to create an instance of " + getKlass(), e);
} catch (IllegalAccessException e) {
LOG.warn("Error trying to create " + getKlass(), e);
} catch (InvocationTargetException e) {
LOG.warn("Error trying to create " + getKlass(), e);
} catch (NoSuchMethodException e) {
LOG.warn("Doesn't look like the class " + getKlass() +
" have the needed constructor", e);
}
// If there were any exceptions then du will be null.
// Construct our best guess fallback.
if (getSpaceUsed == null) {
if (Shell.WINDOWS) {
getSpaceUsed = new WindowsGetSpaceUsed(this);
} else {
getSpaceUsed = new DU(this);
}
}
// Call init after classes constructors have finished.
if (getSpaceUsed instanceof CachingGetSpaceUsed) {
((CachingGetSpaceUsed) getSpaceUsed).init();
}
return getSpaceUsed;
}
}
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.IOException;
/**
* Class to tell the size of a path on windows.
* Rather than shelling out, on windows this uses DUHelper.getFolderUsage
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public class WindowsGetSpaceUsed extends CachingGetSpaceUsed {
WindowsGetSpaceUsed(CachingGetSpaceUsed.Builder builder) throws IOException {
super(builder.getPath(), builder.getInterval(), builder.getInitialUsed());
}
/**
* Override to hook in DUHelper class.
*/
@Override
protected void refresh() {
used.set(DUHelper.getFolderUsage(getDirPath()));
}
}

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.test.GenericTestUtils;
/** This test makes sure that "DU" does not get to run on each call to getUsed */
/** This test makes sure that "DU" does not get to run on each call to getUsed */
public class TestDU extends TestCase {
final static private File DU_DIR = GenericTestUtils.getTestDir("dutmp");
@ -42,7 +42,7 @@ public class TestDU extends TestCase {
public void tearDown() throws IOException {
FileUtil.fullyDelete(DU_DIR);
}
private void createFile(File newFile, int size) throws IOException {
// write random data so that filesystems with compression enabled (e.g., ZFS)
// can't compress the file
@ -54,18 +54,18 @@ public class TestDU extends TestCase {
RandomAccessFile file = new RandomAccessFile(newFile, "rws");
file.write(data);
file.getFD().sync();
file.close();
}
/**
* Verify that du returns expected used space for a file.
* We assume here that if a file system crates a file of size
* We assume here that if a file system crates a file of size
* that is a multiple of the block size in this file system,
* then the used size for the file will be exactly that size.
* This is true for most file systems.
*
*
* @throws IOException
* @throws InterruptedException
*/
@ -78,28 +78,29 @@ public class TestDU extends TestCase {
createFile(file, writtenSize);
Thread.sleep(5000); // let the metadata updater catch up
DU du = new DU(file, 10000);
du.start();
DU du = new DU(file, 10000, -1);
du.init();
long duSize = du.getUsed();
du.shutdown();
du.close();
assertTrue("Invalid on-disk size",
duSize >= writtenSize &&
writtenSize <= (duSize + slack));
//test with 0 interval, will not launch thread
du = new DU(file, 0);
du.start();
//test with 0 interval, will not launch thread
du = new DU(file, 0, -1);
du.init();
duSize = du.getUsed();
du.shutdown();
du.close();
assertTrue("Invalid on-disk size",
duSize >= writtenSize &&
writtenSize <= (duSize + slack));
//test without launching thread
du = new DU(file, 10000);
//test without launching thread
du = new DU(file, 10000, -1);
du.init();
duSize = du.getUsed();
assertTrue("Invalid on-disk size",
@ -111,8 +112,8 @@ public class TestDU extends TestCase {
assertTrue(file.createNewFile());
Configuration conf = new Configuration();
conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, 10000L);
DU du = new DU(file, conf);
du.decDfsUsed(Long.MAX_VALUE);
DU du = new DU(file, 10000L, -1);
du.incDfsUsed(-Long.MAX_VALUE);
long duSize = du.getUsed();
assertTrue(String.valueOf(duSize), duSize >= 0L);
}
@ -121,7 +122,7 @@ public class TestDU extends TestCase {
File file = new File(DU_DIR, "dataX");
createFile(file, 8192);
DU du = new DU(file, 3000, 1024);
du.start();
du.init();
assertTrue("Initial usage setting not honored", du.getUsed() == 1024);
// wait until the first du runs.
@ -131,4 +132,7 @@ public class TestDU extends TestCase {
assertTrue("Usage didn't get updated", du.getUsed() == 8192);
}
}

View File

@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import static org.junit.Assert.*;
public class TestGetSpaceUsed {
final static private File DIR = new File(
System.getProperty("test.build.data", "/tmp"), "TestGetSpaceUsed");
@Before
public void setUp() {
FileUtil.fullyDelete(DIR);
assertTrue(DIR.mkdirs());
}
@After
public void tearDown() throws IOException {
FileUtil.fullyDelete(DIR);
}
/**
* Test that the builder can create a class specified through the class.
*/
@Test
public void testBuilderConf() throws Exception {
File file = new File(DIR, "testBuilderConf");
assertTrue(file.createNewFile());
Configuration conf = new Configuration();
conf.set("fs.getspaceused.classname", DummyDU.class.getName());
CachingGetSpaceUsed instance =
(CachingGetSpaceUsed) new CachingGetSpaceUsed.Builder()
.setPath(file)
.setInterval(0)
.setConf(conf)
.build();
assertNotNull(instance);
assertTrue(instance instanceof DummyDU);
assertFalse(instance.running());
instance.close();
}
@Test
public void testBuildInitial() throws Exception {
File file = new File(DIR, "testBuildInitial");
assertTrue(file.createNewFile());
CachingGetSpaceUsed instance =
(CachingGetSpaceUsed) new CachingGetSpaceUsed.Builder()
.setPath(file)
.setInitialUsed(90210)
.setKlass(DummyDU.class)
.build();
assertEquals(90210, instance.getUsed());
instance.close();
}
@Test
public void testBuildInterval() throws Exception {
File file = new File(DIR, "testBuildInitial");
assertTrue(file.createNewFile());
CachingGetSpaceUsed instance =
(CachingGetSpaceUsed) new CachingGetSpaceUsed.Builder()
.setPath(file)
.setInitialUsed(90210)
.setInterval(50060)
.setKlass(DummyDU.class)
.build();
assertEquals(50060, instance.getRefreshInterval());
instance.close();
}
@Test
public void testBuildNonCaching() throws Exception {
File file = new File(DIR, "testBuildNonCaching");
assertTrue(file.createNewFile());
GetSpaceUsed instance = new CachingGetSpaceUsed.Builder()
.setPath(file)
.setInitialUsed(90210)
.setInterval(50060)
.setKlass(DummyGetSpaceUsed.class)
.build();
assertEquals(300, instance.getUsed());
assertTrue(instance instanceof DummyGetSpaceUsed);
}
private static class DummyDU extends CachingGetSpaceUsed {
public DummyDU(Builder builder) throws IOException {
// Push to the base class.
// Most times that's all that will need to be done.
super(builder);
}
@Override
protected void refresh() {
// This is a test so don't du anything.
}
}
private static class DummyGetSpaceUsed implements GetSpaceUsed {
public DummyGetSpaceUsed(GetSpaceUsed.Builder builder) {
}
@Override public long getUsed() throws IOException {
return 300;
}
}
}

View File

@ -36,8 +36,9 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.GetSpaceUsed;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.Block;
@ -62,10 +63,10 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
/**
* A block pool slice represents a portion of a block pool stored on a volume.
* Taken together, all BlockPoolSlices sharing a block pool ID across a
* A block pool slice represents a portion of a block pool stored on a volume.
* Taken together, all BlockPoolSlices sharing a block pool ID across a
* cluster represent a single block pool.
*
*
* This class is synchronized by {@link FsVolumeImpl}.
*/
class BlockPoolSlice {
@ -92,10 +93,10 @@ class BlockPoolSlice {
private final Timer timer;
// TODO:FEDERATION scalability issue - a thread per DU is needed
private final DU dfsUsage;
private final GetSpaceUsed dfsUsage;
/**
* Create a blook pool slice
* Create a blook pool slice
* @param bpid Block pool Id
* @param volume {@link FsVolumeImpl} to which this BlockPool belongs to
* @param bpDir directory corresponding to the BlockPool
@ -107,7 +108,7 @@ class BlockPoolSlice {
Configuration conf, Timer timer) throws IOException {
this.bpid = bpid;
this.volume = volume;
this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
this.finalizedDir = new File(
currentDir, DataStorage.STORAGE_DIR_FINALIZED);
this.lazypersistDir = new File(currentDir, DataStorage.STORAGE_DIR_LAZY_PERSIST);
@ -157,8 +158,10 @@ class BlockPoolSlice {
}
// Use cached value initially if available. Or the following call will
// block until the initial du command completes.
this.dfsUsage = new DU(bpDir, conf, loadDfsUsed());
this.dfsUsage.start();
this.dfsUsage = new CachingGetSpaceUsed.Builder().setPath(bpDir)
.setConf(conf)
.setInitialUsed(loadDfsUsed())
.build();
// Make the dfs usage to be saved during shutdown.
ShutdownHookManager.get().addShutdownHook(
@ -179,7 +182,7 @@ class BlockPoolSlice {
File getFinalizedDir() {
return finalizedDir;
}
File getLazypersistDir() {
return lazypersistDir;
}
@ -194,17 +197,21 @@ class BlockPoolSlice {
/** Run DU on local drives. It must be synchronized from caller. */
void decDfsUsed(long value) {
dfsUsage.decDfsUsed(value);
if (dfsUsage instanceof CachingGetSpaceUsed) {
((CachingGetSpaceUsed)dfsUsage).incDfsUsed(-value);
}
}
long getDfsUsed() throws IOException {
return dfsUsage.getUsed();
}
void incDfsUsed(long value) {
dfsUsage.incDfsUsed(value);
if (dfsUsage instanceof CachingGetSpaceUsed) {
((CachingGetSpaceUsed)dfsUsage).incDfsUsed(value);
}
}
/**
* Read in the cached DU value and return it if it is less than
* cachedDfsUsedCheckTime which is set by
@ -310,7 +317,10 @@ class BlockPoolSlice {
}
File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
if (dfsUsage instanceof CachingGetSpaceUsed) {
((CachingGetSpaceUsed) dfsUsage).incDfsUsed(
b.getNumBytes() + metaFile.length());
}
return blockFile;
}
@ -337,7 +347,7 @@ class BlockPoolSlice {
}
void getVolumeMap(ReplicaMap volumeMap,
final RamDiskReplicaTracker lazyWriteReplicaMap)
throws IOException {
@ -348,7 +358,7 @@ class BlockPoolSlice {
FsDatasetImpl.LOG.info(
"Recovered " + numRecovered + " replicas from " + lazypersistDir);
}
boolean success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap);
if (!success) {
// add finalized replicas
@ -442,7 +452,7 @@ class BlockPoolSlice {
FileUtil.fullyDelete(source);
return numRecovered;
}
private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap,
final RamDiskReplicaTracker lazyWriteReplicaMap,boolean isFinalized)
throws IOException {
@ -450,7 +460,7 @@ class BlockPoolSlice {
long blockId = block.getBlockId();
long genStamp = block.getGenerationStamp();
if (isFinalized) {
newReplica = new FinalizedReplica(blockId,
newReplica = new FinalizedReplica(blockId,
block.getNumBytes(), genStamp, volume, DatanodeUtil
.idToBlockDir(finalizedDir, blockId));
} else {
@ -467,7 +477,7 @@ class BlockPoolSlice {
// We don't know the expected block length, so just use 0
// and don't reserve any more space for writes.
newReplica = new ReplicaBeingWritten(blockId,
validateIntegrityAndSetLength(file, genStamp),
validateIntegrityAndSetLength(file, genStamp),
genStamp, volume, file.getParentFile(), null, 0);
loadRwr = false;
}
@ -513,7 +523,7 @@ class BlockPoolSlice {
incrNumBlocks();
}
}
/**
* Add replicas under the given directory to the volume map
@ -543,12 +553,12 @@ class BlockPoolSlice {
}
if (!Block.isBlockFilename(file))
continue;
long genStamp = FsDatasetUtil.getGenerationStampFromFile(
files, file);
long blockId = Block.filename2id(file.getName());
Block block = new Block(blockId, file.length(), genStamp);
addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap,
Block block = new Block(blockId, file.length(), genStamp);
addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap,
isFinalized);
}
}
@ -642,11 +652,11 @@ class BlockPoolSlice {
/**
* Find out the number of bytes in the block that match its crc.
*
* This algorithm assumes that data corruption caused by unexpected
*
* This algorithm assumes that data corruption caused by unexpected
* datanode shutdown occurs only in the last crc chunk. So it checks
* only the last chunk.
*
*
* @param blockFile the block file
* @param genStamp generation stamp of the block
* @return the number of valid bytes
@ -673,7 +683,7 @@ class BlockPoolSlice {
int bytesPerChecksum = checksum.getBytesPerChecksum();
int checksumSize = checksum.getChecksumSize();
long numChunks = Math.min(
(blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum,
(blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum,
(metaFileLen - crcHeaderLen)/checksumSize);
if (numChunks == 0) {
return 0;
@ -716,17 +726,20 @@ class BlockPoolSlice {
IOUtils.closeStream(blockIn);
}
}
@Override
public String toString() {
return currentDir.getAbsolutePath();
}
void shutdown(BlockListAsLongs blocksListToPersist) {
saveReplicas(blocksListToPersist);
saveDfsUsed();
dfsUsedSaved = true;
dfsUsage.shutdown();
if (dfsUsage instanceof CachingGetSpaceUsed) {
IOUtils.cleanup(LOG, ((CachingGetSpaceUsed) dfsUsage));
}
}
private boolean readReplicasFromCache(ReplicaMap volumeMap,
@ -735,17 +748,17 @@ class BlockPoolSlice {
File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
// Check whether the file exists or not.
if (!replicaFile.exists()) {
LOG.info("Replica Cache file: "+ replicaFile.getPath() +
LOG.info("Replica Cache file: "+ replicaFile.getPath() +
" doesn't exist ");
return false;
}
long fileLastModifiedTime = replicaFile.lastModified();
if (System.currentTimeMillis() > fileLastModifiedTime + replicaCacheExpiry) {
LOG.info("Replica Cache file: " + replicaFile.getPath() +
LOG.info("Replica Cache file: " + replicaFile.getPath() +
" has gone stale");
// Just to make findbugs happy
if (!replicaFile.delete()) {
LOG.info("Replica Cache file: " + replicaFile.getPath() +
LOG.info("Replica Cache file: " + replicaFile.getPath() +
" cannot be deleted");
}
return false;
@ -782,7 +795,7 @@ class BlockPoolSlice {
iter.remove();
volumeMap.add(bpid, info);
}
LOG.info("Successfully read replica from cache file : "
LOG.info("Successfully read replica from cache file : "
+ replicaFile.getPath());
return true;
} catch (Exception e) {
@ -800,10 +813,10 @@ class BlockPoolSlice {
// close the inputStream
IOUtils.closeStream(inputStream);
}
}
}
private void saveReplicas(BlockListAsLongs blocksListToPersist) {
if (blocksListToPersist == null ||
if (blocksListToPersist == null ||
blocksListToPersist.getNumberOfBlocks()== 0) {
return;
}
@ -819,7 +832,7 @@ class BlockPoolSlice {
replicaCacheFile.getPath());
return;
}
FileOutputStream out = null;
try {
out = new FileOutputStream(tmpFile);
@ -833,7 +846,7 @@ class BlockPoolSlice {
// and continue.
LOG.warn("Failed to write replicas to cache ", e);
if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
LOG.warn("Failed to delete replicas file: " +
LOG.warn("Failed to delete replicas file: " +
replicaCacheFile.getPath());
}
} finally {