HDFS-11667. Block Storage:Handling flushing of incomplete block id buffers during shutdown. Contributed by Mukul Kumar Singh.
This commit is contained in:
parent
a2c6f594ba
commit
eae8c2a469
|
@ -95,7 +95,7 @@ public class BlockWriterTask implements Runnable {
|
||||||
flusher.getLOG().debug("Time taken for Write Small File : {} ms",
|
flusher.getLOG().debug("Time taken for Write Small File : {} ms",
|
||||||
endTime - startTime);
|
endTime - startTime);
|
||||||
|
|
||||||
flusher.incrementremoteIO();
|
flusher.incrementRemoteIO();
|
||||||
|
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
flusher.getLOG().error("Writing of block failed, We have attempted " +
|
flusher.getLOG().error("Writing of block failed, We have attempted " +
|
||||||
|
@ -119,7 +119,7 @@ public class BlockWriterTask implements Runnable {
|
||||||
File logDir = new File(this.dbPath);
|
File logDir = new File(this.dbPath);
|
||||||
if (!logDir.exists() && !logDir.mkdirs()) {
|
if (!logDir.exists() && !logDir.mkdirs()) {
|
||||||
flusher.getLOG().error(
|
flusher.getLOG().error(
|
||||||
"Unable to create the log directory, Crticial error cannot continue");
|
"Unable to create the log directory, Critical error cannot continue");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
String log = Paths.get(this.dbPath, retryFileName).toString();
|
String log = Paths.get(this.dbPath, retryFileName).toString();
|
||||||
|
|
|
@ -43,6 +43,11 @@ public class CBlockTargetMetrics {
|
||||||
@Metric private MutableCounterLong numBlockBufferFlush;
|
@Metric private MutableCounterLong numBlockBufferFlush;
|
||||||
@Metric private MutableCounterLong numDirectBlockWrites;
|
@Metric private MutableCounterLong numDirectBlockWrites;
|
||||||
@Metric private MutableCounterLong numFailedDirectBlockWrites;
|
@Metric private MutableCounterLong numFailedDirectBlockWrites;
|
||||||
|
@Metric private MutableCounterLong numDirtyLogBlockRead;
|
||||||
|
@Metric private MutableCounterLong numBytesDirtyLogRead;
|
||||||
|
@Metric private MutableCounterLong numDirtyLogBlockUpdated;
|
||||||
|
@Metric private MutableCounterLong numBytesDirtyLogWritten;
|
||||||
|
@Metric private MutableCounterLong numFailedDirtyBlockFlushes;
|
||||||
|
|
||||||
// Latency based Metrics
|
// Latency based Metrics
|
||||||
@Metric private MutableRate dbReadLatency;
|
@Metric private MutableRate dbReadLatency;
|
||||||
|
@ -94,6 +99,26 @@ public class CBlockTargetMetrics {
|
||||||
numBlockBufferFlush.incr();
|
numBlockBufferFlush.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incNumDirtyLogBlockRead() {
|
||||||
|
numDirtyLogBlockRead.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incNumBytesDirtyLogRead(int bytes) {
|
||||||
|
numBytesDirtyLogRead.incr(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incNumDirtyLogBlockUpdated() {
|
||||||
|
numDirtyLogBlockUpdated.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incNumBytesDirtyLogWritten(int bytes) {
|
||||||
|
numBytesDirtyLogWritten.incr(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incNumFailedDirtyBlockFlushes() {
|
||||||
|
numFailedDirtyBlockFlushes.incr();
|
||||||
|
}
|
||||||
|
|
||||||
public void updateDBReadLatency(long latency) {
|
public void updateDBReadLatency(long latency) {
|
||||||
dbReadLatency.add(latency);
|
dbReadLatency.add(latency);
|
||||||
}
|
}
|
||||||
|
@ -157,4 +182,29 @@ public class CBlockTargetMetrics {
|
||||||
public long getNumBlockBufferFlush() {
|
public long getNumBlockBufferFlush() {
|
||||||
return numBlockBufferFlush.value();
|
return numBlockBufferFlush.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumDirtyLogBlockRead() {
|
||||||
|
return numDirtyLogBlockRead.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumBytesDirtyLogReads() {
|
||||||
|
return numBytesDirtyLogRead.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumDirtyLogBlockUpdated() {
|
||||||
|
return numDirtyLogBlockUpdated.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumBytesDirtyLogWritten() {
|
||||||
|
return numBytesDirtyLogWritten.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumFailedDirtyBlockFlushes() {
|
||||||
|
return numFailedDirtyBlockFlushes.value();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -213,7 +213,7 @@ public class ContainerCacheFlusher implements Runnable {
|
||||||
threadPoolExecutor.shutdown();
|
threadPoolExecutor.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
public long incrementremoteIO() {
|
public long incrementRemoteIO() {
|
||||||
return remoteIO.incrementAndGet();
|
return remoteIO.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -352,6 +352,7 @@ public class ContainerCacheFlusher implements Runnable {
|
||||||
// bytes per long (which is calculated by number of bits per long
|
// bytes per long (which is calculated by number of bits per long
|
||||||
// divided by number of bits per byte) gives the number of blocks
|
// divided by number of bits per byte) gives the number of blocks
|
||||||
int blockCount = blockIDBuffer.position()/(Long.SIZE / Byte.SIZE);
|
int blockCount = blockIDBuffer.position()/(Long.SIZE / Byte.SIZE);
|
||||||
|
getTargetMetrics().incNumBytesDirtyLogRead(bytesRead);
|
||||||
if (finishCountMap.containsKey(message.getFileName())) {
|
if (finishCountMap.containsKey(message.getFileName())) {
|
||||||
// In theory this should never happen. But if it happened,
|
// In theory this should never happen. But if it happened,
|
||||||
// we need to know it...
|
// we need to know it...
|
||||||
|
@ -369,6 +370,7 @@ public class ContainerCacheFlusher implements Runnable {
|
||||||
LOG.debug("Remaining blocks count {} and {}", blockIDBuffer.remaining(),
|
LOG.debug("Remaining blocks count {} and {}", blockIDBuffer.remaining(),
|
||||||
blockCount);
|
blockCount);
|
||||||
while (blockIDBuffer.remaining() >= (Long.SIZE / Byte.SIZE)) {
|
while (blockIDBuffer.remaining() >= (Long.SIZE / Byte.SIZE)) {
|
||||||
|
getTargetMetrics().incNumDirtyLogBlockRead();
|
||||||
long blockID = blockIDBuffer.getLong();
|
long blockID = blockIDBuffer.getLong();
|
||||||
LogicalBlock block = new DiskBlock(blockID, null, false);
|
LogicalBlock block = new DiskBlock(blockID, null, false);
|
||||||
BlockWriterTask blockWriterTask = new BlockWriterTask(block, this,
|
BlockWriterTask blockWriterTask = new BlockWriterTask(block, this,
|
||||||
|
|
|
@ -105,6 +105,16 @@ public class AsyncBlockWriter {
|
||||||
blockIDBuffer = ByteBuffer.allocateDirect(blockBufferSize);
|
blockIDBuffer = ByteBuffer.allocateDirect(blockBufferSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void start() throws IOException {
|
||||||
|
File logDir = new File(parentCache.getDbPath().toString());
|
||||||
|
if (!logDir.exists() && !logDir.mkdirs()) {
|
||||||
|
LOG.error("Unable to create the log directory, Critical error cannot " +
|
||||||
|
"continue. Log Dir : {}", logDir);
|
||||||
|
throw new IllegalStateException("Cache Directory create failed, Cannot " +
|
||||||
|
"continue. Log Dir: {}" + logDir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the log to write to.
|
* Return the log to write to.
|
||||||
*
|
*
|
||||||
|
@ -169,6 +179,11 @@ public class AsyncBlockWriter {
|
||||||
block.getBlockID(), endTime - startTime, datahash);
|
block.getBlockID(), endTime - startTime, datahash);
|
||||||
}
|
}
|
||||||
block.clearData();
|
block.clearData();
|
||||||
|
if (blockIDBuffer.remaining() <= (Long.SIZE / Byte.SIZE)) {
|
||||||
|
writeBlockBufferToFile(blockIDBuffer);
|
||||||
|
}
|
||||||
|
parentCache.getTargetMetrics().incNumDirtyLogBlockUpdated();
|
||||||
|
blockIDBuffer.putLong(block.getBlockID());
|
||||||
} else {
|
} else {
|
||||||
Pipeline pipeline = parentCache.getPipeline(block.getBlockID());
|
Pipeline pipeline = parentCache.getPipeline(block.getBlockID());
|
||||||
String containerName = pipeline.getContainerName();
|
String containerName = pipeline.getContainerName();
|
||||||
|
@ -198,49 +213,59 @@ public class AsyncBlockWriter {
|
||||||
block.clearData();
|
block.clearData();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (blockIDBuffer.remaining() <= (Long.SIZE / Byte.SIZE)) {
|
|
||||||
long startTime = Time.monotonicNow();
|
|
||||||
blockIDBuffer.flip();
|
|
||||||
writeBlockBufferToFile(blockIDBuffer);
|
|
||||||
blockIDBuffer.clear();
|
|
||||||
long endTime = Time.monotonicNow();
|
|
||||||
if (parentCache.isTraceEnabled()) {
|
|
||||||
parentCache.getTracer().info(
|
|
||||||
"Task=DirtyBlockLogWrite,Time={}", endTime - startTime);
|
|
||||||
}
|
|
||||||
parentCache.getTargetMetrics().incNumBlockBufferFlush();
|
|
||||||
parentCache.getTargetMetrics()
|
|
||||||
.updateBlockBufferFlushLatency(endTime - startTime);
|
|
||||||
}
|
|
||||||
blockIDBuffer.putLong(block.getBlockID());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write Block Buffer to file.
|
* Write Block Buffer to file.
|
||||||
*
|
*
|
||||||
* @param blockID - ByteBuffer
|
* @param blockBuffer - ByteBuffer
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private void writeBlockBufferToFile(ByteBuffer blockID)
|
private synchronized void writeBlockBufferToFile(ByteBuffer blockBuffer)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
long startTime = Time.monotonicNow();
|
||||||
boolean append = false;
|
boolean append = false;
|
||||||
|
int bytesWritten = 0;
|
||||||
|
|
||||||
|
// If there is nothing written to blockId buffer,
|
||||||
|
// then skip flushing of blockId buffer
|
||||||
|
if (blockBuffer.position() == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockBuffer.flip();
|
||||||
String fileName =
|
String fileName =
|
||||||
String.format("%s.%s", DIRTY_LOG_PREFIX, Time.monotonicNow());
|
String.format("%s.%s", DIRTY_LOG_PREFIX, Time.monotonicNow());
|
||||||
File logDir = new File(parentCache.getDbPath().toString());
|
|
||||||
if (!logDir.exists() && !logDir.mkdirs()) {
|
|
||||||
LOG.error("Unable to create the log directory, Critical error cannot " +
|
|
||||||
"continue. Log Dir : {}", logDir);
|
|
||||||
throw new IllegalStateException("Cache Directory create failed, Cannot " +
|
|
||||||
"continue. Log Dir: {}" + logDir);
|
|
||||||
}
|
|
||||||
String log = Paths.get(parentCache.getDbPath().toString(), fileName)
|
String log = Paths.get(parentCache.getDbPath().toString(), fileName)
|
||||||
.toString();
|
.toString();
|
||||||
|
|
||||||
try (FileChannel channel = new FileOutputStream(log, append).getChannel()) {
|
try {
|
||||||
channel.write(blockID);
|
FileChannel channel = new FileOutputStream(log, append).getChannel();
|
||||||
|
bytesWritten = channel.write(blockBuffer);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.error("Unable to sync the Block map to disk -- This might cause a " +
|
||||||
|
"data loss or corruption", ex);
|
||||||
|
parentCache.getTargetMetrics().incNumFailedDirtyBlockFlushes();
|
||||||
|
throw ex;
|
||||||
|
} finally {
|
||||||
|
blockBuffer.clear();
|
||||||
}
|
}
|
||||||
blockID.clear();
|
|
||||||
parentCache.processDirtyMessage(fileName);
|
parentCache.processDirtyMessage(fileName);
|
||||||
|
blockIDBuffer.clear();
|
||||||
|
long endTime = Time.monotonicNow();
|
||||||
|
if (parentCache.isTraceEnabled()) {
|
||||||
|
parentCache.getTracer().info(
|
||||||
|
"Task=DirtyBlockLogWrite,Time={} bytesWritten={}",
|
||||||
|
endTime - startTime, bytesWritten);
|
||||||
|
}
|
||||||
|
|
||||||
|
parentCache.getTargetMetrics().incNumBytesDirtyLogWritten(bytesWritten);
|
||||||
|
parentCache.getTargetMetrics().incNumBlockBufferFlush();
|
||||||
|
parentCache.getTargetMetrics()
|
||||||
|
.updateBlockBufferFlushLatency(endTime - startTime);
|
||||||
|
LOG.debug("Block buffer writer bytesWritten:{} Time:{}",
|
||||||
|
bytesWritten, endTime - startTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -199,7 +199,7 @@ public class CBlockLocalCache implements CacheModule {
|
||||||
* @param dbPathString - Path to db
|
* @param dbPathString - Path to db
|
||||||
* @return long bytes remaining.
|
* @return long bytes remaining.
|
||||||
*/
|
*/
|
||||||
private static long getRemaningDiskSpace(String dbPathString) {
|
private static long getRemainingDiskSpace(String dbPathString) {
|
||||||
try {
|
try {
|
||||||
URI fileUri = new URI("file:///");
|
URI fileUri = new URI("file:///");
|
||||||
Path dbPath = Paths.get(fileUri).resolve(dbPathString);
|
Path dbPath = Paths.get(fileUri).resolve(dbPathString);
|
||||||
|
@ -298,7 +298,7 @@ public class CBlockLocalCache implements CacheModule {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() throws IOException {
|
public void start() throws IOException {
|
||||||
// This is a No-op for us. We start when we bootup.
|
blockWriter.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -457,7 +457,7 @@ public class CBlockLocalCache implements CacheModule {
|
||||||
if (StringUtils.isBlank(dbPath)) {
|
if (StringUtils.isBlank(dbPath)) {
|
||||||
return cacheSize;
|
return cacheSize;
|
||||||
}
|
}
|
||||||
long spaceRemaining = getRemaningDiskSpace(dbPath);
|
long spaceRemaining = getRemainingDiskSpace(dbPath);
|
||||||
double cacheRatio = 1.0;
|
double cacheRatio = 1.0;
|
||||||
|
|
||||||
if (spaceRemaining < volumeSize) {
|
if (spaceRemaining < volumeSize) {
|
||||||
|
|
|
@ -157,6 +157,7 @@ public class TestLocalBlockCache {
|
||||||
.setFlusher(flusher)
|
.setFlusher(flusher)
|
||||||
.setCBlockTargetMetrics(metrics)
|
.setCBlockTargetMetrics(metrics)
|
||||||
.build();
|
.build();
|
||||||
|
cache.start();
|
||||||
cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
|
cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
|
||||||
Assert.assertEquals(1, metrics.getNumWriteOps());
|
Assert.assertEquals(1, metrics.getNumWriteOps());
|
||||||
// Please note that this read is from the local cache.
|
// Please note that this read is from the local cache.
|
||||||
|
@ -202,6 +203,7 @@ public class TestLocalBlockCache {
|
||||||
.setFlusher(flusher)
|
.setFlusher(flusher)
|
||||||
.setCBlockTargetMetrics(metrics)
|
.setCBlockTargetMetrics(metrics)
|
||||||
.build();
|
.build();
|
||||||
|
cache.start();
|
||||||
cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
|
cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
|
||||||
GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
|
GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
|
||||||
cache.close();
|
cache.close();
|
||||||
|
@ -228,6 +230,7 @@ public class TestLocalBlockCache {
|
||||||
.setFlusher(flusher)
|
.setFlusher(flusher)
|
||||||
.setCBlockTargetMetrics(metrics)
|
.setCBlockTargetMetrics(metrics)
|
||||||
.build();
|
.build();
|
||||||
|
cache.start();
|
||||||
long startTime = Time.monotonicNow();
|
long startTime = Time.monotonicNow();
|
||||||
for (long blockid = 0; blockid < totalBlocks; blockid++) {
|
for (long blockid = 0; blockid < totalBlocks; blockid++) {
|
||||||
cache.put(blockid, data.getBytes(StandardCharsets.UTF_8));
|
cache.put(blockid, data.getBytes(StandardCharsets.UTF_8));
|
||||||
|
@ -265,6 +268,7 @@ public class TestLocalBlockCache {
|
||||||
.setFlusher(flusher)
|
.setFlusher(flusher)
|
||||||
.setCBlockTargetMetrics(metrics)
|
.setCBlockTargetMetrics(metrics)
|
||||||
.build();
|
.build();
|
||||||
|
cache.start();
|
||||||
// Read a non-existent block ID.
|
// Read a non-existent block ID.
|
||||||
LogicalBlock block = cache.get(blockID);
|
LogicalBlock block = cache.get(blockID);
|
||||||
Assert.assertNotNull(block);
|
Assert.assertNotNull(block);
|
||||||
|
@ -298,6 +302,7 @@ public class TestLocalBlockCache {
|
||||||
.setFlusher(flusher)
|
.setFlusher(flusher)
|
||||||
.setCBlockTargetMetrics(metrics)
|
.setCBlockTargetMetrics(metrics)
|
||||||
.build();
|
.build();
|
||||||
|
cache.start();
|
||||||
for (int x = 0; x < blockCount; x++) {
|
for (int x = 0; x < blockCount; x++) {
|
||||||
String data = RandomStringUtils.random(4 * 1024);
|
String data = RandomStringUtils.random(4 * 1024);
|
||||||
String dataHash = DigestUtils.sha256Hex(data);
|
String dataHash = DigestUtils.sha256Hex(data);
|
||||||
|
@ -342,7 +347,7 @@ public class TestLocalBlockCache {
|
||||||
.setFlusher(newflusher)
|
.setFlusher(newflusher)
|
||||||
.setCBlockTargetMetrics(newMetrics)
|
.setCBlockTargetMetrics(newMetrics)
|
||||||
.build();
|
.build();
|
||||||
|
newCache.start();
|
||||||
for (Map.Entry<Long, String> entry : blockShaMap.entrySet()) {
|
for (Map.Entry<Long, String> entry : blockShaMap.entrySet()) {
|
||||||
LogicalBlock block = newCache.get(entry.getKey());
|
LogicalBlock block = newCache.get(entry.getKey());
|
||||||
String blockSha = DigestUtils.sha256Hex(block.getData().array());
|
String blockSha = DigestUtils.sha256Hex(block.getData().array());
|
||||||
|
@ -471,6 +476,7 @@ public class TestLocalBlockCache {
|
||||||
.setFlusher(flusher)
|
.setFlusher(flusher)
|
||||||
.setCBlockTargetMetrics(metrics)
|
.setCBlockTargetMetrics(metrics)
|
||||||
.build();
|
.build();
|
||||||
|
cache.start();
|
||||||
Assert.assertFalse(cache.isShortCircuitIOEnabled());
|
Assert.assertFalse(cache.isShortCircuitIOEnabled());
|
||||||
cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
|
cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
|
||||||
Assert.assertEquals(1, metrics.getNumDirectBlockWrites());
|
Assert.assertEquals(1, metrics.getNumDirectBlockWrites());
|
||||||
|
@ -498,4 +504,80 @@ public class TestLocalBlockCache {
|
||||||
GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
|
GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
|
||||||
cache.close();
|
cache.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test writes some block to the cache and then shuts down the cache.
|
||||||
|
* The cache is then restarted to check that the
|
||||||
|
* correct number of blocks are read from Dirty Log
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testEmptyBlockBufferHandling() throws IOException,
|
||||||
|
InterruptedException, TimeoutException {
|
||||||
|
// Create a new config so that this tests write metafile to new location
|
||||||
|
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
|
||||||
|
URL p = flushTestConfig.getClass().getResource("");
|
||||||
|
String path = p.getPath().concat(
|
||||||
|
TestOzoneContainer.class.getSimpleName()
|
||||||
|
+ "/testEmptyBlockBufferHandling");
|
||||||
|
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
|
||||||
|
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
||||||
|
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
|
||||||
|
|
||||||
|
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
|
||||||
|
String userName = "user" + RandomStringUtils.randomNumeric(4);
|
||||||
|
String data = RandomStringUtils.random(4 * KB);
|
||||||
|
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
|
||||||
|
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
|
||||||
|
xceiverClientManager, metrics);
|
||||||
|
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
|
||||||
|
.setConfiguration(flushTestConfig)
|
||||||
|
.setVolumeName(volumeName)
|
||||||
|
.setUserName(userName)
|
||||||
|
.setPipelines(getContainerPipeline(10))
|
||||||
|
.setClientManager(xceiverClientManager)
|
||||||
|
.setBlockSize(4 * KB)
|
||||||
|
.setVolumeSize(50 * GB)
|
||||||
|
.setFlusher(flusher)
|
||||||
|
.setCBlockTargetMetrics(metrics)
|
||||||
|
.build();
|
||||||
|
cache.start();
|
||||||
|
// Write data to the cache
|
||||||
|
cache.put(1, data.getBytes(StandardCharsets.UTF_8));
|
||||||
|
Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
|
||||||
|
Assert.assertEquals(1, metrics.getNumWriteOps());
|
||||||
|
cache.put(2, data.getBytes(StandardCharsets.UTF_8));
|
||||||
|
Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
|
||||||
|
Assert.assertEquals(2, metrics.getNumWriteOps());
|
||||||
|
|
||||||
|
// Store the previous block buffer position
|
||||||
|
Assert.assertEquals(2, metrics.getNumDirtyLogBlockUpdated());
|
||||||
|
// Simulate a shutdown by closing the cache
|
||||||
|
GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
|
||||||
|
cache.close();
|
||||||
|
Assert.assertEquals(2 * (Long.SIZE/ Byte.SIZE),
|
||||||
|
metrics.getNumBytesDirtyLogWritten());
|
||||||
|
Assert.assertEquals(0, metrics.getNumFailedDirtyBlockFlushes());
|
||||||
|
|
||||||
|
// Restart cache and check that right number of entries are read
|
||||||
|
CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
|
||||||
|
ContainerCacheFlusher newFlusher =
|
||||||
|
new ContainerCacheFlusher(flushTestConfig,
|
||||||
|
xceiverClientManager, newMetrics);
|
||||||
|
Thread fllushListenerThread = new Thread(newFlusher);
|
||||||
|
fllushListenerThread.setDaemon(true);
|
||||||
|
fllushListenerThread.start();
|
||||||
|
|
||||||
|
Thread.sleep(5000);
|
||||||
|
Assert.assertEquals(metrics.getNumDirtyLogBlockUpdated(),
|
||||||
|
newMetrics.getNumDirtyLogBlockRead());
|
||||||
|
Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead()
|
||||||
|
* (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads());
|
||||||
|
// Now shutdown again, nothing should be flushed
|
||||||
|
newFlusher.shutdown();
|
||||||
|
Assert.assertEquals(0, newMetrics.getNumDirtyLogBlockUpdated());
|
||||||
|
Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten());
|
||||||
|
Assert.assertEquals(0, newMetrics.getNumFailedDirtyBlockFlushes());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue