diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java index 6b3de941fef..310dccabad0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java @@ -95,7 +95,7 @@ public class BlockWriterTask implements Runnable { flusher.getLOG().debug("Time taken for Write Small File : {} ms", endTime - startTime); - flusher.incrementremoteIO(); + flusher.incrementRemoteIO(); } catch (IOException ex) { flusher.getLOG().error("Writing of block failed, We have attempted " + @@ -119,7 +119,7 @@ public class BlockWriterTask implements Runnable { File logDir = new File(this.dbPath); if (!logDir.exists() && !logDir.mkdirs()) { flusher.getLOG().error( - "Unable to create the log directory, Crticial error cannot continue"); + "Unable to create the log directory, Critical error cannot continue"); return; } String log = Paths.get(this.dbPath, retryFileName).toString(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java index 739346397bb..9ba63eea7ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java @@ -43,6 +43,11 @@ public class CBlockTargetMetrics { @Metric private MutableCounterLong numBlockBufferFlush; @Metric private MutableCounterLong numDirectBlockWrites; @Metric private MutableCounterLong numFailedDirectBlockWrites; + @Metric private MutableCounterLong numDirtyLogBlockRead; + @Metric private MutableCounterLong numBytesDirtyLogRead; + @Metric private MutableCounterLong numDirtyLogBlockUpdated; + @Metric private MutableCounterLong numBytesDirtyLogWritten; + @Metric private MutableCounterLong numFailedDirtyBlockFlushes; // Latency based Metrics @Metric private MutableRate dbReadLatency; @@ -94,6 +99,26 @@ public class CBlockTargetMetrics { numBlockBufferFlush.incr(); } + public void incNumDirtyLogBlockRead() { + numDirtyLogBlockRead.incr(); + } + + public void incNumBytesDirtyLogRead(int bytes) { + numBytesDirtyLogRead.incr(bytes); + } + + public void incNumDirtyLogBlockUpdated() { + numDirtyLogBlockUpdated.incr(); + } + + public void incNumBytesDirtyLogWritten(int bytes) { + numBytesDirtyLogWritten.incr(bytes); + } + + public void incNumFailedDirtyBlockFlushes() { + numFailedDirtyBlockFlushes.incr(); + } + public void updateDBReadLatency(long latency) { dbReadLatency.add(latency); } @@ -157,4 +182,29 @@ public class CBlockTargetMetrics { public long getNumBlockBufferFlush() { return numBlockBufferFlush.value(); } + + @VisibleForTesting + public long getNumDirtyLogBlockRead() { + return numDirtyLogBlockRead.value(); + } + + @VisibleForTesting + public long getNumBytesDirtyLogReads() { + return numBytesDirtyLogRead.value(); + } + + @VisibleForTesting + public long getNumDirtyLogBlockUpdated() { + return numDirtyLogBlockUpdated.value(); + } + + @VisibleForTesting + public long getNumBytesDirtyLogWritten() { + return numBytesDirtyLogWritten.value(); + } + + @VisibleForTesting + public long getNumFailedDirtyBlockFlushes() { + return numFailedDirtyBlockFlushes.value(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java index 99f9ed4feae..148734fc6b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java @@ -213,7 +213,7 @@ public class ContainerCacheFlusher implements Runnable { threadPoolExecutor.shutdown(); } - public long incrementremoteIO() { + public long incrementRemoteIO() { return remoteIO.incrementAndGet(); } @@ -352,6 +352,7 @@ public class ContainerCacheFlusher implements Runnable { // bytes per long (which is calculated by number of bits per long // divided by number of bits per byte) gives the number of blocks int blockCount = blockIDBuffer.position()/(Long.SIZE / Byte.SIZE); + getTargetMetrics().incNumBytesDirtyLogRead(bytesRead); if (finishCountMap.containsKey(message.getFileName())) { // In theory this should never happen. But if it happened, // we need to know it... @@ -369,6 +370,7 @@ public class ContainerCacheFlusher implements Runnable { LOG.debug("Remaining blocks count {} and {}", blockIDBuffer.remaining(), blockCount); while (blockIDBuffer.remaining() >= (Long.SIZE / Byte.SIZE)) { + getTargetMetrics().incNumDirtyLogBlockRead(); long blockID = blockIDBuffer.getLong(); LogicalBlock block = new DiskBlock(blockID, null, false); BlockWriterTask blockWriterTask = new BlockWriterTask(block, this, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java index 2edaf331436..9a72f51de06 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java @@ -105,6 +105,16 @@ public class AsyncBlockWriter { blockIDBuffer = ByteBuffer.allocateDirect(blockBufferSize); } + public void start() throws IOException { + File logDir = new File(parentCache.getDbPath().toString()); + if (!logDir.exists() && !logDir.mkdirs()) { + LOG.error("Unable to create the log directory, Critical error cannot " + + "continue. Log Dir : {}", logDir); + throw new IllegalStateException("Cache Directory create failed, Cannot " + + "continue. Log Dir: {}" + logDir); + } + } + /** * Return the log to write to. * @@ -169,6 +179,11 @@ public class AsyncBlockWriter { block.getBlockID(), endTime - startTime, datahash); } block.clearData(); + if (blockIDBuffer.remaining() <= (Long.SIZE / Byte.SIZE)) { + writeBlockBufferToFile(blockIDBuffer); + } + parentCache.getTargetMetrics().incNumDirtyLogBlockUpdated(); + blockIDBuffer.putLong(block.getBlockID()); } else { Pipeline pipeline = parentCache.getPipeline(block.getBlockID()); String containerName = pipeline.getContainerName(); @@ -198,49 +213,59 @@ public class AsyncBlockWriter { block.clearData(); } } - if (blockIDBuffer.remaining() <= (Long.SIZE / Byte.SIZE)) { - long startTime = Time.monotonicNow(); - blockIDBuffer.flip(); - writeBlockBufferToFile(blockIDBuffer); - blockIDBuffer.clear(); - long endTime = Time.monotonicNow(); - if (parentCache.isTraceEnabled()) { - parentCache.getTracer().info( - "Task=DirtyBlockLogWrite,Time={}", endTime - startTime); - } - parentCache.getTargetMetrics().incNumBlockBufferFlush(); - parentCache.getTargetMetrics() - .updateBlockBufferFlushLatency(endTime - startTime); - } - blockIDBuffer.putLong(block.getBlockID()); } /** * Write Block Buffer to file. * - * @param blockID - ByteBuffer + * @param blockBuffer - ByteBuffer * @throws IOException */ - private void writeBlockBufferToFile(ByteBuffer blockID) + private synchronized void writeBlockBufferToFile(ByteBuffer blockBuffer) throws IOException { + long startTime = Time.monotonicNow(); boolean append = false; + int bytesWritten = 0; + + // If there is nothing written to blockId buffer, + // then skip flushing of blockId buffer + if (blockBuffer.position() == 0) { + return; + } + + blockBuffer.flip(); String fileName = String.format("%s.%s", DIRTY_LOG_PREFIX, Time.monotonicNow()); - File logDir = new File(parentCache.getDbPath().toString()); - if (!logDir.exists() && !logDir.mkdirs()) { - LOG.error("Unable to create the log directory, Critical error cannot " + - "continue. Log Dir : {}", logDir); - throw new IllegalStateException("Cache Directory create failed, Cannot " + - "continue. Log Dir: {}" + logDir); - } String log = Paths.get(parentCache.getDbPath().toString(), fileName) .toString(); - try (FileChannel channel = new FileOutputStream(log, append).getChannel()) { - channel.write(blockID); + try { + FileChannel channel = new FileOutputStream(log, append).getChannel(); + bytesWritten = channel.write(blockBuffer); + } catch (Exception ex) { + LOG.error("Unable to sync the Block map to disk -- This might cause a " + + "data loss or corruption", ex); + parentCache.getTargetMetrics().incNumFailedDirtyBlockFlushes(); + throw ex; + } finally { + blockBuffer.clear(); } - blockID.clear(); + parentCache.processDirtyMessage(fileName); + blockIDBuffer.clear(); + long endTime = Time.monotonicNow(); + if (parentCache.isTraceEnabled()) { + parentCache.getTracer().info( + "Task=DirtyBlockLogWrite,Time={} bytesWritten={}", + endTime - startTime, bytesWritten); + } + + parentCache.getTargetMetrics().incNumBytesDirtyLogWritten(bytesWritten); + parentCache.getTargetMetrics().incNumBlockBufferFlush(); + parentCache.getTargetMetrics() + .updateBlockBufferFlushLatency(endTime - startTime); + LOG.debug("Block buffer writer bytesWritten:{} Time:{}", + bytesWritten, endTime - startTime); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java index 329e8d1eb52..576338e6048 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java @@ -199,7 +199,7 @@ public class CBlockLocalCache implements CacheModule { * @param dbPathString - Path to db * @return long bytes remaining. */ - private static long getRemaningDiskSpace(String dbPathString) { + private static long getRemainingDiskSpace(String dbPathString) { try { URI fileUri = new URI("file:///"); Path dbPath = Paths.get(fileUri).resolve(dbPathString); @@ -298,7 +298,7 @@ public class CBlockLocalCache implements CacheModule { @Override public void start() throws IOException { - // This is a No-op for us. We start when we bootup. + blockWriter.start(); } @Override @@ -457,7 +457,7 @@ public class CBlockLocalCache implements CacheModule { if (StringUtils.isBlank(dbPath)) { return cacheSize; } - long spaceRemaining = getRemaningDiskSpace(dbPath); + long spaceRemaining = getRemainingDiskSpace(dbPath); double cacheRatio = 1.0; if (spaceRemaining < volumeSize) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java index 77cbfedaa64..63ae921e956 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java @@ -157,6 +157,7 @@ public class TestLocalBlockCache { .setFlusher(flusher) .setCBlockTargetMetrics(metrics) .build(); + cache.start(); cache.put(blockID, data.getBytes(StandardCharsets.UTF_8)); Assert.assertEquals(1, metrics.getNumWriteOps()); // Please note that this read is from the local cache. @@ -202,6 +203,7 @@ public class TestLocalBlockCache { .setFlusher(flusher) .setCBlockTargetMetrics(metrics) .build(); + cache.start(); cache.put(blockID, data.getBytes(StandardCharsets.UTF_8)); GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); cache.close(); @@ -228,6 +230,7 @@ public class TestLocalBlockCache { .setFlusher(flusher) .setCBlockTargetMetrics(metrics) .build(); + cache.start(); long startTime = Time.monotonicNow(); for (long blockid = 0; blockid < totalBlocks; blockid++) { cache.put(blockid, data.getBytes(StandardCharsets.UTF_8)); @@ -265,6 +268,7 @@ public class TestLocalBlockCache { .setFlusher(flusher) .setCBlockTargetMetrics(metrics) .build(); + cache.start(); // Read a non-existent block ID. LogicalBlock block = cache.get(blockID); Assert.assertNotNull(block); @@ -298,6 +302,7 @@ public class TestLocalBlockCache { .setFlusher(flusher) .setCBlockTargetMetrics(metrics) .build(); + cache.start(); for (int x = 0; x < blockCount; x++) { String data = RandomStringUtils.random(4 * 1024); String dataHash = DigestUtils.sha256Hex(data); @@ -342,7 +347,7 @@ public class TestLocalBlockCache { .setFlusher(newflusher) .setCBlockTargetMetrics(newMetrics) .build(); - + newCache.start(); for (Map.Entry entry : blockShaMap.entrySet()) { LogicalBlock block = newCache.get(entry.getKey()); String blockSha = DigestUtils.sha256Hex(block.getData().array()); @@ -471,6 +476,7 @@ public class TestLocalBlockCache { .setFlusher(flusher) .setCBlockTargetMetrics(metrics) .build(); + cache.start(); Assert.assertFalse(cache.isShortCircuitIOEnabled()); cache.put(blockID, data.getBytes(StandardCharsets.UTF_8)); Assert.assertEquals(1, metrics.getNumDirectBlockWrites()); @@ -498,4 +504,80 @@ public class TestLocalBlockCache { GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); cache.close(); } + + /** + * This test writes some block to the cache and then shuts down the cache. + * The cache is then restarted to check that the + * correct number of blocks are read from Dirty Log + * + * @throws IOException + */ + @Test + public void testEmptyBlockBufferHandling() throws IOException, + InterruptedException, TimeoutException { + // Create a new config so that this tests write metafile to new location + OzoneConfiguration flushTestConfig = new OzoneConfiguration(); + URL p = flushTestConfig.getClass().getResource(""); + String path = p.getPath().concat( + TestOzoneContainer.class.getSimpleName() + + "/testEmptyBlockBufferHandling"); + flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); + flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + + String volumeName = "volume" + RandomStringUtils.randomNumeric(4); + String userName = "user" + RandomStringUtils.randomNumeric(4); + String data = RandomStringUtils.random(4 * KB); + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, + xceiverClientManager, metrics); + CBlockLocalCache cache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(getContainerPipeline(10)) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + cache.start(); + // Write data to the cache + cache.put(1, data.getBytes(StandardCharsets.UTF_8)); + Assert.assertEquals(0, metrics.getNumDirectBlockWrites()); + Assert.assertEquals(1, metrics.getNumWriteOps()); + cache.put(2, data.getBytes(StandardCharsets.UTF_8)); + Assert.assertEquals(0, metrics.getNumDirectBlockWrites()); + Assert.assertEquals(2, metrics.getNumWriteOps()); + + // Store the previous block buffer position + Assert.assertEquals(2, metrics.getNumDirtyLogBlockUpdated()); + // Simulate a shutdown by closing the cache + GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); + cache.close(); + Assert.assertEquals(2 * (Long.SIZE/ Byte.SIZE), + metrics.getNumBytesDirtyLogWritten()); + Assert.assertEquals(0, metrics.getNumFailedDirtyBlockFlushes()); + + // Restart cache and check that right number of entries are read + CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher newFlusher = + new ContainerCacheFlusher(flushTestConfig, + xceiverClientManager, newMetrics); + Thread fllushListenerThread = new Thread(newFlusher); + fllushListenerThread.setDaemon(true); + fllushListenerThread.start(); + + Thread.sleep(5000); + Assert.assertEquals(metrics.getNumDirtyLogBlockUpdated(), + newMetrics.getNumDirtyLogBlockRead()); + Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead() + * (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads()); + // Now shutdown again, nothing should be flushed + newFlusher.shutdown(); + Assert.assertEquals(0, newMetrics.getNumDirtyLogBlockUpdated()); + Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten()); + Assert.assertEquals(0, newMetrics.getNumFailedDirtyBlockFlushes()); + } }