From e9588c64220c66b78617402a17b94a5a2ae4b564 Mon Sep 17 00:00:00 2001 From: Chen Liang Date: Wed, 26 Apr 2017 10:36:56 -0700 Subject: [PATCH] HDFS-11627. Block Storage: Cblock cache should register with flusher to upload blocks to containers. Contributed by Mukul Kumar Singh. --- .../hadoop/cblock/CBlockConfigKeys.java | 4 + .../cblock/jscsiHelper/BlockWriterTask.java | 8 +- .../jscsiHelper/CBlockTargetMetrics.java | 44 ++++++- .../jscsiHelper/ContainerCacheFlusher.java | 46 ++++--- .../cache/impl/AsyncBlockWriter.java | 6 +- .../cache/impl/CBlockLocalCache.java | 6 +- .../cache/impl/SyncBlockReader.java | 4 + .../hadoop/cblock/TestLocalBlockCache.java | 121 +++++++++++++++++- 8 files changed, 202 insertions(+), 37 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java index b1fba41f934..74f5dc69f2c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java @@ -156,6 +156,10 @@ public final class CBlockConfigKeys { public static final int DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT = 5; + // LevelDB cache file uses an off-heap cache in LevelDB of 256 MB. + public static final String DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY = + "dfs.cblock.cache.leveldb.cache.size.mb"; + public static final int DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT = 256; private CBlockConfigKeys() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java index 310dccabad0..6b5416becad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java @@ -79,6 +79,7 @@ public class BlockWriterTask implements Runnable { incTryCount(); Pipeline pipeline = flusher.getPipeline(this.dbPath, block.getBlockID()); client = flusher.getXceiverClientManager().acquireClient(pipeline); + containerName = pipeline.getContainerName(); byte[] keybuf = Longs.toByteArray(block.getBlockID()); byte[] data; long startTime = Time.monotonicNow(); @@ -97,11 +98,16 @@ public class BlockWriterTask implements Runnable { flusher.incrementRemoteIO(); - } catch (IOException ex) { + } catch (Exception ex) { flusher.getLOG().error("Writing of block failed, We have attempted " + "to write this block {} times to the container {}.Trace ID:{}", this.getTryCount(), containerName, "", ex); writeRetryBlock(block); + if (ex instanceof IOException) { + flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks(); + } else { + flusher.getTargetMetrics().incNumWriteGenericExceptionRetryBlocks(); + } } finally { flusher.incFinishCount(fileName); if(client != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java index 9ba63eea7ad..1174c332df6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java @@ -34,19 +34,26 @@ import org.apache.hadoop.metrics2.lib.MutableRate; * as well as the latency time of read and write ops. */ public class CBlockTargetMetrics { - // Counter based Metrics + // IOPS based Metrics @Metric private MutableCounterLong numReadOps; @Metric private MutableCounterLong numWriteOps; @Metric private MutableCounterLong numReadCacheHits; @Metric private MutableCounterLong numReadCacheMiss; - @Metric private MutableCounterLong numReadLostBlocks; - @Metric private MutableCounterLong numBlockBufferFlush; + + // Cblock internal Metrics @Metric private MutableCounterLong numDirectBlockWrites; - @Metric private MutableCounterLong numFailedDirectBlockWrites; + @Metric private MutableCounterLong numBlockBufferFlush; @Metric private MutableCounterLong numDirtyLogBlockRead; - @Metric private MutableCounterLong numBytesDirtyLogRead; @Metric private MutableCounterLong numDirtyLogBlockUpdated; + @Metric private MutableCounterLong numBytesDirtyLogRead; @Metric private MutableCounterLong numBytesDirtyLogWritten; + + // Failure Metrics + @Metric private MutableCounterLong numReadLostBlocks; + @Metric private MutableCounterLong numFailedReadBlocks; + @Metric private MutableCounterLong numWriteIOExceptionRetryBlocks; + @Metric private MutableCounterLong numWriteGenericExceptionRetryBlocks; + @Metric private MutableCounterLong numFailedDirectBlockWrites; @Metric private MutableCounterLong numFailedDirtyBlockFlushes; // Latency based Metrics @@ -91,10 +98,22 @@ public class CBlockTargetMetrics { numDirectBlockWrites.incr(); } + public void incNumWriteIOExceptionRetryBlocks() { + numWriteIOExceptionRetryBlocks.incr(); + } + + public void incNumWriteGenericExceptionRetryBlocks() { + numWriteGenericExceptionRetryBlocks.incr(); + } + public void incNumFailedDirectBlockWrites() { numFailedDirectBlockWrites.incr(); } + public void incNumFailedReadBlocks() { + numFailedReadBlocks.incr(); + } + public void incNumBlockBufferFlush() { numBlockBufferFlush.incr(); } @@ -178,6 +197,21 @@ public class CBlockTargetMetrics { return numFailedDirectBlockWrites.value(); } + @VisibleForTesting + public long getNumFailedReadBlocks() { + return numFailedReadBlocks.value(); + } + + @VisibleForTesting + public long getNumWriteIOExceptionRetryBlocks() { + return numWriteIOExceptionRetryBlocks.value(); + } + + @VisibleForTesting + public long getNumWriteGenericExceptionRetryBlocks() { + return numWriteGenericExceptionRetryBlocks.value(); + } + @VisibleForTesting public long getNumBlockBufferFlush() { return numBlockBufferFlush.value(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java index 148734fc6b8..905d9bade3d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; import org.apache.hadoop.cblock.jscsiHelper.cache.impl.DiskBlock; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.scm.XceiverClientManager; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.utils.LevelDBStore; @@ -77,8 +78,10 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys .DFS_CBLOCK_CACHE_THREAD_PRIORITY; import static org.apache.hadoop.cblock.CBlockConfigKeys .DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT; /** * Class that writes to remote containers. @@ -96,6 +99,7 @@ public class ContainerCacheFlusher implements Runnable { private final XceiverClientManager xceiverClientManager; private final CBlockTargetMetrics metrics; private AtomicBoolean shutdown; + private final long levelDBCacheSize; private final ConcurrentMap finishCountMap; @@ -117,6 +121,8 @@ public class ContainerCacheFlusher implements Runnable { DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT); int blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE); + levelDBCacheSize = config.getInt(DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY, + DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT) * OzoneConsts.MB; LOG.info("Cache: Core Pool Size: {}", corePoolSize); LOG.info("Cache: Keep Alive: {}", keepAlive); @@ -146,17 +152,14 @@ public class ContainerCacheFlusher implements Runnable { this.remoteIO = new AtomicLong(); this.finishCountMap = new ConcurrentHashMap<>(); - checkExisitingDirtyLog(config); } - private void checkExisitingDirtyLog(Configuration config) { - File dbPath = Paths.get(config.get(DFS_CBLOCK_DISK_CACHE_PATH_KEY, - DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT)).toFile(); + private void checkExistingDirtyLog(File dbPath) { if (!dbPath.exists()) { - LOG.info("No existing dirty log found at {}", dbPath); + LOG.debug("No existing dirty log found at {}", dbPath); return; } - LOG.info("Need to check and requeue existing dirty log {}", dbPath); + LOG.debug("Need to check and requeue existing dirty log {}", dbPath); HashMap> allFiles = new HashMap<>(); traverse(dbPath, allFiles); for (Map.Entry> entry : allFiles.entrySet()) { @@ -237,11 +240,10 @@ public class ContainerCacheFlusher implements Runnable { * Opens a DB if needed or returns a handle to an already open DB. * * @param dbPath -- dbPath - * @param cacheSize - cacheSize * @return the levelDB on the given path. * @throws IOException */ - public synchronized LevelDBStore openDB(String dbPath, int cacheSize) + public synchronized LevelDBStore openDB(String dbPath) throws IOException { if (dbMap.containsKey(dbPath)) { RefCountedDB refDB = dbMap.get(dbPath); @@ -249,7 +251,7 @@ public class ContainerCacheFlusher implements Runnable { return refDB.db; } else { Options options = new Options(); - options.cacheSize(cacheSize * (1024L * 1024L)); + options.cacheSize(levelDBCacheSize); options.createIfMissing(true); LevelDBStore cacheDB = new LevelDBStore( new File(getDBFileName(dbPath)), options); @@ -260,14 +262,19 @@ public class ContainerCacheFlusher implements Runnable { } /** - * Updates the contianer map. This data never changes so we will update this + * Updates the container map. This data never changes so we will update this * during restarts and it should not hurt us. * + * Once a CBlockLocalCache cache is registered, requeue dirty/retry log files + * for the volume + * * @param dbPath - DbPath - * @param containerList - Contianer List. + * @param containerList - Container List. */ public void register(String dbPath, Pipeline[] containerList) { + File dbFile = Paths.get(dbPath).toFile(); pipelineMap.put(dbPath, containerList); + checkExistingDirtyLog(dbFile); } private String getDBFileName(String dbPath) { @@ -363,7 +370,7 @@ public class ContainerCacheFlusher implements Runnable { } finishCountMap.put(message.getFileName(), new FinishCounter(blockCount, message.getDbPath(), - message.getFileName())); + message.getFileName(), this)); // should be flip instead of rewind, because we also need to make sure // the end position is correct. blockIDBuffer.flip(); @@ -473,14 +480,17 @@ public class ContainerCacheFlusher implements Runnable { private final String dirtyLogPath; private final AtomicLong currentCount; private AtomicBoolean fileDeleted; + private final ContainerCacheFlusher flusher; FinishCounter(long expectedCount, String dbPath, - String dirtyLogPath) { + String dirtyLogPath, ContainerCacheFlusher flusher) throws IOException { this.expectedCount = expectedCount; this.dbPath = dbPath; this.dirtyLogPath = dirtyLogPath; this.currentCount = new AtomicLong(0); this.fileDeleted = new AtomicBoolean(false); + this.flusher = flusher; + this.flusher.openDB(dbPath); } public boolean isFileDeleted() { @@ -494,6 +504,7 @@ public class ContainerCacheFlusher implements Runnable { LOG.debug( "Deleting {} with count {} {}", filePath, count, expectedCount); try { + flusher.closeDB(dbPath); Path path = Paths.get(filePath); Files.delete(path); // the following part tries to remove the directory if it is empty @@ -504,9 +515,8 @@ public class ContainerCacheFlusher implements Runnable { Files.delete(parent); }*/ fileDeleted.set(true); - } catch (IOException e) { - LOG.error( - "Error deleting dirty log file {} {}", filePath, e.toString()); + } catch (Exception e) { + LOG.error("Error deleting dirty log file:" + filePath, e); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java index 9a72f51de06..1273cd286c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java @@ -179,11 +179,11 @@ public class AsyncBlockWriter { block.getBlockID(), endTime - startTime, datahash); } block.clearData(); - if (blockIDBuffer.remaining() <= (Long.SIZE / Byte.SIZE)) { - writeBlockBufferToFile(blockIDBuffer); - } parentCache.getTargetMetrics().incNumDirtyLogBlockUpdated(); blockIDBuffer.putLong(block.getBlockID()); + if (blockIDBuffer.remaining() == 0) { + writeBlockBufferToFile(blockIDBuffer); + } } else { Pipeline pipeline = parentCache.getPipeline(block.getBlockID()); String containerName = pipeline.getContainerName(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java index 576338e6048..bd034c32e37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java @@ -69,10 +69,9 @@ public class CBlockLocalCache implements CacheModule { private final Configuration conf; /** - * LevelDB cache file, we use an off-heap cache in LevelDB for 256 MB for now. + * LevelDB cache file. */ private final LevelDBStore cacheDB; - private final int cacheSizeMb = 256; /** * Asyncblock writer updates the cacheDB and writes the blocks async to @@ -158,9 +157,10 @@ public class CBlockLocalCache implements CacheModule { throw new IllegalArgumentException("Unable to create paths. Path: " + dbPath); } - cacheDB = flusher.openDB(dbPath.toString(), cacheSizeMb); + cacheDB = flusher.openDB(dbPath.toString()); this.containerList = containerPipelines.toArray(new Pipeline[containerPipelines.size()]); + flusher.register(dbPath.toString(), containerList); this.ipAddressString = getHostIP(); this.tracePrefix = ipAddressString + ":" + this.volumeName; this.volumeSize = volumeSize; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/SyncBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/SyncBlockReader.java index 19e375620a0..533e91927ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/SyncBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/SyncBlockReader.java @@ -136,6 +136,10 @@ public class SyncBlockReader { .acquireClient(parentCache.getPipeline(blockID)); LogicalBlock block = getBlockFromContainer(blockID, client); return block; + } catch (Exception ex) { + parentCache.getTargetMetrics().incNumFailedReadBlocks(); + LOG.error("read failed for BlockId: {}", blockID, ex); + throw ex; } finally { if (client != null) { parentCache.getClientManager().releaseClient(client); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java index 63ae921e956..e578b6eb6b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java @@ -518,9 +518,7 @@ public class TestLocalBlockCache { // Create a new config so that this tests write metafile to new location OzoneConfiguration flushTestConfig = new OzoneConfiguration(); URL p = flushTestConfig.getClass().getResource(""); - String path = p.getPath().concat( - TestOzoneContainer.class.getSimpleName() - + "/testEmptyBlockBufferHandling"); + String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName()); flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); @@ -528,6 +526,8 @@ public class TestLocalBlockCache { String volumeName = "volume" + RandomStringUtils.randomNumeric(4); String userName = "user" + RandomStringUtils.randomNumeric(4); String data = RandomStringUtils.random(4 * KB); + List pipelines = getContainerPipeline(10); + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, xceiverClientManager, metrics); @@ -535,7 +535,7 @@ public class TestLocalBlockCache { .setConfiguration(flushTestConfig) .setVolumeName(volumeName) .setUserName(userName) - .setPipelines(getContainerPipeline(10)) + .setPipelines(pipelines) .setClientManager(xceiverClientManager) .setBlockSize(4 * KB) .setVolumeSize(50 * GB) @@ -565,9 +565,21 @@ public class TestLocalBlockCache { ContainerCacheFlusher newFlusher = new ContainerCacheFlusher(flushTestConfig, xceiverClientManager, newMetrics); - Thread fllushListenerThread = new Thread(newFlusher); - fllushListenerThread.setDaemon(true); - fllushListenerThread.start(); + CBlockLocalCache newCache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(pipelines) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(newFlusher) + .setCBlockTargetMetrics(newMetrics) + .build(); + newCache.start(); + Thread flushListenerThread = new Thread(newFlusher); + flushListenerThread.setDaemon(true); + flushListenerThread.start(); Thread.sleep(5000); Assert.assertEquals(metrics.getNumDirtyLogBlockUpdated(), @@ -575,9 +587,104 @@ public class TestLocalBlockCache { Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead() * (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads()); // Now shutdown again, nothing should be flushed + newCache.close(); newFlusher.shutdown(); Assert.assertEquals(0, newMetrics.getNumDirtyLogBlockUpdated()); Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten()); Assert.assertEquals(0, newMetrics.getNumFailedDirtyBlockFlushes()); } + + /** + * This test writes some block to the cache and then shuts down the cache + * The cache is then restarted with "short.circuit.io" disable to check + * that the blocks are read correctly from the container. + * + * @throws IOException + */ + @Test + public void testContainerWrites() throws IOException, + InterruptedException, TimeoutException { + // Create a new config so that this tests write metafile to new location + OzoneConfiguration flushTestConfig = new OzoneConfiguration(); + URL p = flushTestConfig.getClass().getResource(""); + String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName()); + flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); + flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + + XceiverClientManager xcm = new XceiverClientManager(flushTestConfig); + String volumeName = "volume" + RandomStringUtils.randomNumeric(4); + String userName = "user" + RandomStringUtils.randomNumeric(4); + + int numUniqueBlocks = 4; + String[] data = new String[numUniqueBlocks]; + String[] dataHash = new String[numUniqueBlocks]; + for (int i = 0; i < numUniqueBlocks; i++) { + data[i] = RandomStringUtils.random(4 * KB); + dataHash[i] = DigestUtils.sha256Hex(data[i]); + } + + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, + xcm, metrics); + List pipelines = getContainerPipeline(10); + CBlockLocalCache cache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(pipelines) + .setClientManager(xcm) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + cache.start(); + Thread fllushListenerThread = new Thread(flusher); + fllushListenerThread.setDaemon(true); + fllushListenerThread.start(); + Assert.assertTrue(cache.isShortCircuitIOEnabled()); + // Write data to the cache + for (int i = 0; i < 512; i++) { + cache.put(i, data[i % numUniqueBlocks].getBytes(StandardCharsets.UTF_8)); + } + // Close the cache and flush the data to the containers + cache.close(); + Assert.assertEquals(0, metrics.getNumDirectBlockWrites()); + Assert.assertEquals(512, metrics.getNumWriteOps()); + Thread.sleep(5000); + flusher.shutdown(); + Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks()); + Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks()); + + // Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache + flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false); + CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher newFlusher = + new ContainerCacheFlusher(flushTestConfig, xcm, newMetrics); + CBlockLocalCache newCache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(pipelines) + .setClientManager(xcm) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(newFlusher) + .setCBlockTargetMetrics(newMetrics) + .build(); + newCache.start(); + Assert.assertFalse(newCache.isShortCircuitIOEnabled()); + // this read will be from the container, also match the hash + for (int i = 0; i < 512; i++) { + LogicalBlock block = newCache.get(i); + String readHash = DigestUtils.sha256Hex(block.getData().array()); + Assert.assertEquals("File content does not match, for index:" + + i, dataHash[i % numUniqueBlocks], readHash); + } + Assert.assertEquals(0, newMetrics.getNumReadLostBlocks()); + Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks()); + newFlusher.shutdown(); + newCache.close(); + } }