HDFS-15207. VolumeScanner skip to scan blocks accessed during recent scan period. Contributed by Yang Yun.
(cherry picked from commit 50caba1a92)
commit 032ccba67c
parent acae31aa28
DFSConfigKeys.java:

@@ -846,6 +846,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 21 * 24; // 3 weeks.
   public static final String DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second";
   public static final long DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT = 1048576L;
+  public static final String DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED =
+      "dfs.block.scanner.skip.recent.accessed";
+  public static final boolean DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED_DEFAULT =
+      false;
   public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
   public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
   public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";
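The new key pair defines an off-by-default switch. As a minimal sketch (not part of the patch; the class name is hypothetical), enabling it is an ordinary Hadoop Configuration override, which in a real deployment would normally be set in hdfs-site.xml:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class EnableSkipRecentAccessed {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Turn on the new scanner behavior; the shipped default is false.
        conf.setBoolean(DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED, true);
        System.out.println(conf.getBoolean(
            DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED,
            DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED_DEFAULT)); // true
      }
    }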
BlockScanner.java:

@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
@@ -112,6 +114,7 @@ public class BlockScanner {
     final long maxStalenessMs;
     final long scanPeriodMs;
     final long cursorSaveMs;
+    final boolean skipRecentAccessed;
     final Class<? extends ScanResultHandler> resultHandler;
 
     private static long getUnitTestLong(Configuration conf, String key,
@@ -163,6 +166,9 @@ public class BlockScanner {
       this.cursorSaveMs = Math.max(0L, getUnitTestLong(conf,
           INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS,
           INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS_DEFAULT));
+      this.skipRecentAccessed = conf.getBoolean(
+          DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED,
+          DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED_DEFAULT);
       if (allowUnitTestSettings) {
         this.resultHandler = (Class<? extends ScanResultHandler>)
             conf.getClass(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
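Note that the skip window reuses the existing scanPeriodMs rather than introducing a new interval; with the default dfs.datanode.scan.period.hours of 21 * 24, the window is three weeks. A quick standalone sanity check of that arithmetic:

    import java.util.concurrent.TimeUnit;

    public class ScanPeriodMath {
      public static void main(String[] args) {
        // DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 21 * 24 (three weeks).
        long defaultHours = 21 * 24;                       // 504 hours
        long scanPeriodMs = TimeUnit.HOURS.toMillis(defaultHours);
        System.out.println(scanPeriodMs);                  // 1814400000 ms
      }
    }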
VolumeScanner.java:

@@ -19,8 +19,11 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -32,6 +35,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
@@ -540,6 +544,24 @@ public class VolumeScanner extends Thread {
             this, curBlockIter.getBlockPoolId());
         saveBlockIterator(curBlockIter);
         return 0;
+      } else if (conf.skipRecentAccessed) {
+        // Check the access time of the block file to avoid scanning
+        // recently accessed blocks, reducing disk IO.
+        try {
+          BlockLocalPathInfo blockLocalPathInfo =
+              volume.getDataset().getBlockLocalPathInfo(block);
+          BasicFileAttributes attr = Files.readAttributes(
+              new File(blockLocalPathInfo.getBlockPath()).toPath(),
+              BasicFileAttributes.class);
+          if (System.currentTimeMillis() - attr.lastAccessTime().
+              to(TimeUnit.MILLISECONDS) < conf.scanPeriodMs) {
+            return 0;
+          }
+
+        } catch (IOException ioe) {
+          LOG.debug("Failed to get access time of block {}",
+              block, ioe);
+        }
       }
     }
     if (curBlockIter != null) {
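The heart of the change is the atime comparison above. The same check, pulled out into a self-contained sketch against a plain local file (the path and helper name are illustrative, not from the patch):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.attribute.BasicFileAttributes;
    import java.util.concurrent.TimeUnit;

    public class AtimeSkipCheck {
      // Mirrors the VolumeScanner condition: true if the file's last access
      // time falls inside the window, meaning the scan can be skipped.
      static boolean accessedWithin(Path file, long windowMs) throws IOException {
        BasicFileAttributes attr =
            Files.readAttributes(file, BasicFileAttributes.class);
        long atimeMs = attr.lastAccessTime().to(TimeUnit.MILLISECONDS);
        return System.currentTimeMillis() - atimeMs < windowMs;
      }

      public static void main(String[] args) throws IOException {
        Path p = Paths.get(args.length > 0 ? args[0] : "/tmp/blk_example");
        long threeWeeksMs = TimeUnit.HOURS.toMillis(21 * 24);
        System.out.println("skip scan: " + accessedWithin(p, threeWeeksMs));
      }
    }

Note that in the patch a failure to read the attributes only logs at debug level and falls through to a normal scan, erring on the side of scanning rather than silently skipping.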
hdfs-default.xml:

@@ -1586,6 +1586,16 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.block.scanner.skip.recent.accessed</name>
+  <value>false</value>
+  <description>
+    If this is true, the scanner checks the access time of each block file to
+    avoid scanning blocks accessed during the recent scan period, reducing
+    disk IO. This feature will not work on volumes mounted with noatime.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.readahead.bytes</name>
   <value>4194304</value>
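Because the skip relies on atime, the noatime caveat matters. One hedged way to probe whether a volume updates access times at all is to read a file and compare its atime before and after (the path is illustrative; relatime can also suppress updates inside its window, so treat the result as a hint):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.attribute.BasicFileAttributes;

    public class AtimeProbe {
      static long atimeMs(Path p) throws IOException {
        return Files.readAttributes(p, BasicFileAttributes.class)
            .lastAccessTime().toMillis();
      }

      public static void main(String[] args)
          throws IOException, InterruptedException {
        Path p = Paths.get(args.length > 0 ? args[0] : "/tmp/atime-probe");
        if (!Files.exists(p)) {
          Files.write(p, "probe".getBytes(StandardCharsets.UTF_8));
        }
        long before = atimeMs(p);
        Thread.sleep(1500);       // step past coarse atime granularity
        Files.readAllBytes(p);    // trigger an access
        long after = atimeMs(p);
        // Under noatime (or relatime inside its update window) atime stays
        // put, and the scanner cannot see recent reads.
        System.out.println(after > before ? "atime updates" : "atime unchanged");
      }
    }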
TestBlockScanner.java:

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
 import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS;
@@ -25,6 +26,7 @@ import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 import java.io.Closeable;
 import java.io.File;
@@ -974,4 +976,40 @@ public class TestBlockScanner {
       info.blocksScanned = 0;
     }
   }
+
+  @Test
+  public void testSkipRecentAccessFile() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED, true);
+    conf.setLong(INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS, 2000L);
+    conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+        TestScanResultHandler.class.getName());
+    final TestContext ctx = new TestContext(conf, 1);
+    final int totalBlocks = 5;
+    ctx.createFiles(0, totalBlocks, 4096);
+
+    final TestScanResultHandler.Info info =
+        TestScanResultHandler.getInfo(ctx.volumes.get(0));
+    synchronized (info) {
+      info.shouldRun = true;
+      info.notify();
+    }
+    try {
+      GenericTestUtils.waitFor(() -> {
+        synchronized (info) {
+          return info.blocksScanned > 0;
+        }
+      }, 10, 500);
+      fail("Should scan nothing; all blocks were accessed within the last scan period.");
+    } catch (TimeoutException e) {
+      LOG.debug("Timed out as expected; all blocks were accessed within the last scan period.");
+    }
+    synchronized (info) {
+      info.shouldRun = false;
+      info.notify();
+    }
+    assertEquals("Should not scan a block accessed within the last scan period",
+        0, info.blocksScanned);
+    ctx.close();
+  }
 }