From 0753e094d707e6ac8add4f1e5f1ab8c73a222520 Mon Sep 17 00:00:00 2001
From: Chen Liang
Date: Tue, 23 May 2017 10:55:17 -0700
Subject: [PATCH] HDFS-11727. Block Storage: Retry Blocks should be requeued
 when cblock is restarted. Contributed by Mukul Kumar Singh.

---
 .../hadoop/cblock/CBlockConfigKeys.java       | 15 +++
 .../cblock/jscsiHelper/BlockWriterTask.java   | 30 ++++--
 .../jscsiHelper/CBlockTargetMetrics.java      | 50 ++++++++++
 .../jscsiHelper/ContainerCacheFlusher.java    | 65 ++++++++++---
 .../cache/impl/AsyncBlockWriter.java          |  1 +
 .../cache/impl/CBlockLocalCache.java          |  8 +-
 .../hadoop/cblock/TestLocalBlockCache.java    | 97 ++++++++++++++++++-
 7 files changed, 236 insertions(+), 30 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
index c7222ca534d..e49c6de1f98 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
@@ -166,6 +166,21 @@ public final class CBlockConfigKeys {
       "dfs.cblock.cache.leveldb.cache.size.mb";
   public static final int DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT = 256;
 
+  /**
+   * The cache makes a best-effort attempt to write a block to a container.
+   * At some point we will need to handle the case where we have tried 64K
+   * times and are still not able to write to the container.
+   *
+   * TODO: We will need the cBlock server to allow us to remap the block
+   * location in case of failures; at that point we should reduce the retry
+   * count to a more normal number. The default of 64 * 1024 retries works
+   * out to roughly 18 hours, assuming about one attempt per second.
+   */
+  public static final String DFS_CBLOCK_CACHE_MAX_RETRY_KEY =
+      "dfs.cblock.cache.max.retry";
+  public static final int DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT =
+      64 * 1024;
+
   private CBlockConfigKeys() {
 
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
index 6b5416becad..68bd866fc32 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.cblock.jscsiHelper;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
 import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.AsyncBlockWriter;
 import org.apache.hadoop.scm.XceiverClientSpi;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.LevelDBStore;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -41,7 +43,7 @@ public class BlockWriterTask implements Runnable {
   private final ContainerCacheFlusher flusher;
   private final String dbPath;
   private final String fileName;
-  private static final String RETRY_LOG_PREFIX = "RetryLog";
+  private final int maxRetryCount;
 
   /**
    * Constructs a BlockWriterTask.
@@ -50,12 +52,13 @@
    * @param flusher - ContainerCacheFlusher.
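+   * @param dbPath - path to the volume's local LevelDB cache.
+   * @param tryCount - number of write attempts already made for this block.
+   * @param fileName - name of the dirty/retry log file the block came from.
+   * @param maxRetryCount - number of attempts after which a block is counted
+   *                        against the max-retry metric.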
    */
   public BlockWriterTask(LogicalBlock block, ContainerCacheFlusher flusher,
-      String dbPath, String fileName) {
+      String dbPath, int tryCount, String fileName, int maxRetryCount) {
     this.block = block;
     this.flusher = flusher;
     this.dbPath = dbPath;
-    tryCount = 0;
+    this.tryCount = tryCount;
     this.fileName = fileName;
+    this.maxRetryCount = maxRetryCount;
   }
 
   /**
@@ -73,6 +76,7 @@
   public void run() {
     String containerName = null;
     XceiverClientSpi client = null;
+    LevelDBStore levelDBStore = null;
     flusher.getLOG().debug(
         "Writing block to remote. block ID: {}", block.getBlockID());
     try {
@@ -83,7 +87,9 @@
       byte[] keybuf = Longs.toByteArray(block.getBlockID());
       byte[] data;
       long startTime = Time.monotonicNow();
-      data = flusher.getCacheDB(this.dbPath).get(keybuf);
+      levelDBStore = flusher.getCacheDB(this.dbPath);
+      data = levelDBStore.get(keybuf);
+      Preconditions.checkNotNull(data);
       long endTime = Time.monotonicNow();
       Preconditions.checkState(data.length > 0, "Block data is zero length");
       startTime = Time.monotonicNow();
@@ -99,17 +105,23 @@
       flusher.incrementRemoteIO();
 
     } catch (Exception ex) {
-      flusher.getLOG().error("Writing of block failed, We have attempted " +
+      flusher.getLOG().error("Writing of block:{} failed. We have attempted " +
          "to write this block {} times to the container {}.Trace ID:{}",
-          this.getTryCount(), containerName, "", ex);
+          block.getBlockID(), this.getTryCount(), containerName, "", ex);
       writeRetryBlock(block);
       if (ex instanceof IOException) {
         flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks();
       } else {
         flusher.getTargetMetrics().incNumWriteGenericExceptionRetryBlocks();
       }
+      if (this.getTryCount() >= maxRetryCount) {
+        flusher.getTargetMetrics().incNumWriteMaxRetryBlocks();
+      }
     } finally {
       flusher.incFinishCount(fileName);
+      if (levelDBStore != null) {
+        flusher.releaseCacheDB(dbPath);
+      }
       if(client != null) {
         flusher.getXceiverClientManager().releaseClient(client);
       }
@@ -120,8 +132,8 @@
   private void writeRetryBlock(LogicalBlock currentBlock) {
     boolean append = false;
     String retryFileName =
-        String.format("%s.%d.%s", RETRY_LOG_PREFIX, currentBlock.getBlockID(),
-            Time.monotonicNow());
+        String.format("%s.%d.%s.%s", AsyncBlockWriter.RETRY_LOG_PREFIX,
+            currentBlock.getBlockID(), Time.monotonicNow(), tryCount);
     File logDir = new File(this.dbPath);
     if (!logDir.exists() && !logDir.mkdirs()) {
       flusher.getLOG().error(
@@ -131,12 +143,14 @@
     String log = Paths.get(this.dbPath, retryFileName).toString();
     ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE);
     buffer.putLong(currentBlock.getBlockID());
+    buffer.flip();
     try {
       FileChannel channel = new FileOutputStream(log, append).getChannel();
       channel.write(buffer);
       channel.close();
       flusher.processDirtyBlocks(this.dbPath, retryFileName);
     } catch (IOException e) {
+      flusher.getTargetMetrics().incNumFailedRetryLogFileWrites();
       flusher.getLOG().error("Unable to write the retry block. Block ID: {}",
Block ID: {}", currentBlock.getBlockID(), e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java index 5efb4612651..e7df0cf0b27 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java @@ -48,6 +48,8 @@ public class CBlockTargetMetrics { @Metric private MutableCounterLong numBlockBufferFlushCompleted; @Metric private MutableCounterLong numBlockBufferFlushTriggered; @Metric private MutableCounterLong numBlockBufferUpdates; + @Metric private MutableCounterLong numRetryLogBlockRead; + @Metric private MutableCounterLong numBytesRetryLogRead; // Failure Metrics @Metric private MutableCounterLong numReadLostBlocks; @@ -59,6 +61,9 @@ public class CBlockTargetMetrics { @Metric private MutableCounterLong numFailedDirtyLogFileDeletes; @Metric private MutableCounterLong numFailedBlockBufferFlushes; @Metric private MutableCounterLong numInterruptedBufferWaits; + @Metric private MutableCounterLong numFailedRetryLogFileWrites; + @Metric private MutableCounterLong numWriteMaxRetryBlocks; + @Metric private MutableCounterLong numFailedReleaseLevelDB; // Latency based Metrics @Metric private MutableRate dbReadLatency; @@ -138,6 +143,14 @@ public class CBlockTargetMetrics { numBlockBufferUpdates.incr(); } + public void incNumRetryLogBlockRead() { + numRetryLogBlockRead.incr(); + } + + public void incNumBytesRetryLogRead(int bytes) { + numBytesRetryLogRead.incr(bytes); + } + public void incNumBytesDirtyLogWritten(int bytes) { numBytesDirtyLogWritten.incr(bytes); } @@ -158,6 +171,18 @@ public class CBlockTargetMetrics { numFailedDirtyLogFileDeletes.incr(); } + public void incNumFailedRetryLogFileWrites() { + numFailedRetryLogFileWrites.incr(); + } + + public void incNumWriteMaxRetryBlocks() { + numWriteMaxRetryBlocks.incr(); + } + + public void incNumFailedReleaseLevelDB() { + numFailedReleaseLevelDB.incr(); + } + public void updateDBReadLatency(long latency) { dbReadLatency.add(latency); } @@ -257,6 +282,16 @@ public class CBlockTargetMetrics { return numBlockBufferUpdates.value(); } + @VisibleForTesting + public long getNumRetryLogBlockRead() { + return numRetryLogBlockRead.value(); + } + + @VisibleForTesting + public long getNumBytesRetryLogReads() { + return numBytesRetryLogRead.value(); + } + @VisibleForTesting public long getNumBytesDirtyLogWritten() { return numBytesDirtyLogWritten.value(); @@ -281,4 +316,19 @@ public class CBlockTargetMetrics { public long getNumFailedDirtyLogFileDeletes() { return numFailedDirtyLogFileDeletes.value(); } + + @VisibleForTesting + public long getNumFailedRetryLogFileWrites() { + return numFailedRetryLogFileWrites.value(); + } + + @VisibleForTesting + public long getNumWriteMaxRetryBlocks() { + return numWriteMaxRetryBlocks.value(); + } + + @VisibleForTesting + public long getNumFailedReleaseLevelDB() { + return numFailedReleaseLevelDB.value(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java index c796f734f1a..19372f49eaa 100644 --- 
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.cblock.jscsiHelper;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.cblock.CBlockConfigKeys;
 import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.AsyncBlockWriter;
 import org.apache.hadoop.cblock.jscsiHelper.cache.impl.DiskBlock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -100,6 +102,7 @@ public class ContainerCacheFlusher implements Runnable {
   private final CBlockTargetMetrics metrics;
   private AtomicBoolean shutdown;
   private final long levelDBCacheSize;
+  private final int maxRetryCount;
 
   private final ConcurrentMap<String, FinishCounter> finishCountMap;
 
@@ -152,28 +155,33 @@
     this.remoteIO = new AtomicLong();
 
     this.finishCountMap = new ConcurrentHashMap<>();
+    this.maxRetryCount =
+        config.getInt(CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_KEY,
+            CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT);
   }
 
-  private void checkExistingDirtyLog(File dbPath) {
+  private void checkExistingLog(String prefixFileName, File dbPath) {
     if (!dbPath.exists()) {
       LOG.debug("No existing dirty log found at {}", dbPath);
       return;
     }
     LOG.debug("Need to check and requeue existing dirty log {}", dbPath);
     HashMap<String, ArrayList<String>> allFiles = new HashMap<>();
-    traverse(dbPath, allFiles);
+    traverse(prefixFileName, dbPath, allFiles);
     for (Map.Entry<String, ArrayList<String>> entry : allFiles.entrySet()) {
       String parentPath = entry.getKey();
       for (String fileName : entry.getValue()) {
-        LOG.info("found this {} with {}", parentPath, fileName);
+        LOG.info("found {} {} with prefix {}",
+            parentPath, fileName, prefixFileName);
         processDirtyBlocks(parentPath, fileName);
       }
     }
   }
 
-  private void traverse(File path, HashMap<String, ArrayList<String>> files) {
+  private void traverse(String prefixFileName, File path,
+      HashMap<String, ArrayList<String>> files) {
     if (path.isFile()) {
-      if (path.getName().startsWith("DirtyLog")) {
+      if (path.getName().startsWith(prefixFileName)) {
         LOG.debug("found this {} with {}", path.getParent(), path.getName());
         if (!files.containsKey(path.getParent())) {
           files.put(path.getParent(), new ArrayList<>());
@@ -184,7 +192,7 @@
       File[] listFiles = path.listFiles();
       if (listFiles != null) {
         for (File subPath : listFiles) {
-          traverse(subPath, files);
+          traverse(prefixFileName, subPath, files);
         }
       }
     }
@@ -274,19 +282,28 @@
   public void register(String dbPath, Pipeline[] containerList) {
     File dbFile = Paths.get(dbPath).toFile();
     pipelineMap.put(dbPath, containerList);
-    checkExistingDirtyLog(dbFile);
+    checkExistingLog(AsyncBlockWriter.DIRTY_LOG_PREFIX, dbFile);
+    checkExistingLog(AsyncBlockWriter.RETRY_LOG_PREFIX, dbFile);
   }
 
   private String getDBFileName(String dbPath) {
     return dbPath + ".db";
   }
 
-  LevelDBStore getCacheDB(String dbPath) {
-    return dbMap.get(dbPath).db;
+  public LevelDBStore getCacheDB(String dbPath) throws IOException {
+    return openDB(dbPath);
   }
 
+  public void releaseCacheDB(String dbPath) {
+    try {
+      closeDB(dbPath);
+    } catch (Exception e) {
+      metrics.incNumFailedReleaseLevelDB();
+      LOG.error("LevelDB close failed, dbPath:" + dbPath,
e); + } + } /** - * Close the DB if we don't have any outstanding refrences. + * Close the DB if we don't have any outstanding references. * * @param dbPath - dbPath * @throws IOException @@ -348,18 +365,28 @@ public class ContainerCacheFlusher implements Runnable { message.getDbPath(), message.getFileName()); String fullPath = Paths.get(message.getDbPath(), message.getFileName()).toString(); + String[] fileNameParts = message.getFileName().split("\\."); + Preconditions.checkState(fileNameParts.length > 1); + String fileType = fileNameParts[0]; + boolean isDirtyLogFile = + fileType.equalsIgnoreCase(AsyncBlockWriter.DIRTY_LOG_PREFIX); ReadableByteChannel fileChannel = new FileInputStream(fullPath) .getChannel(); // TODO: We can batch and unique the IOs here. First getting the code // to work, we will add those later. int bytesRead = fileChannel.read(blockIDBuffer); + fileChannel.close(); LOG.debug("Read blockID log of size: {} position {} remaining {}", bytesRead, blockIDBuffer.position(), blockIDBuffer.remaining()); // current position of in the buffer in bytes, divided by number of // bytes per long (which is calculated by number of bits per long // divided by number of bits per byte) gives the number of blocks int blockCount = blockIDBuffer.position()/(Long.SIZE / Byte.SIZE); - getTargetMetrics().incNumBytesDirtyLogRead(bytesRead); + if (isDirtyLogFile) { + getTargetMetrics().incNumBytesDirtyLogRead(bytesRead); + } else { + getTargetMetrics().incNumBytesRetryLogRead(bytesRead); + } if (finishCountMap.containsKey(message.getFileName())) { // In theory this should never happen. But if it happened, // we need to know it... @@ -375,14 +402,22 @@ public class ContainerCacheFlusher implements Runnable { // should be flip instead of rewind, because we also need to make sure // the end position is correct. 
       blockIDBuffer.flip();
-      LOG.debug("Remaining blocks count {} and {}", blockIDBuffer.remaining(),
+      LOG.info("Remaining blocks count {} and {}", blockIDBuffer.remaining(),
           blockCount);
       while (blockIDBuffer.remaining() >= (Long.SIZE / Byte.SIZE)) {
-        getTargetMetrics().incNumDirtyLogBlockRead();
         long blockID = blockIDBuffer.getLong();
+        int retryCount = 0;
+        if (isDirtyLogFile) {
+          getTargetMetrics().incNumDirtyLogBlockRead();
+        } else {
+          getTargetMetrics().incNumRetryLogBlockRead();
+          Preconditions.checkState(fileNameParts.length == 4);
+          retryCount = Integer.parseInt(fileNameParts[3]);
+        }
         LogicalBlock block = new DiskBlock(blockID, null, false);
         BlockWriterTask blockWriterTask = new BlockWriterTask(block, this,
-            message.getDbPath(), message.getFileName());
+            message.getDbPath(), retryCount, message.getFileName(),
+            maxRetryCount);
         threadPoolExecutor.submit(blockWriterTask);
       }
       blockIDBuffer.clear();
@@ -491,7 +526,6 @@
       this.currentCount = new AtomicLong(0);
       this.fileDeleted = new AtomicBoolean(false);
       this.flusher = flusher;
-      this.flusher.openDB(dbPath);
     }
 
     public boolean isFileDeleted() {
@@ -505,7 +539,6 @@
       LOG.debug(
           "Deleting {} with count {} {}", filePath, count, expectedCount);
       try {
-        flusher.closeDB(dbPath);
         Path path = Paths.get(filePath);
         Files.delete(path);
         // the following part tries to remove the directory if it is empty
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
index d8e98391fbc..73e17ddd159 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
@@ -69,6 +69,7 @@
   private final CBlockLocalCache parentCache;
   private final BlockBufferManager blockBufferManager;
   public final static String DIRTY_LOG_PREFIX = "DirtyLog";
+  public static final String RETRY_LOG_PREFIX = "RetryLog";
   private AtomicLong localIoCount;
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java
index bd034c32e37..1ab4d5a5237 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java
@@ -157,10 +157,9 @@
       throw new IllegalArgumentException("Unable to create paths. Path: " +
Path: " + dbPath); } - cacheDB = flusher.openDB(dbPath.toString()); + cacheDB = flusher.getCacheDB(dbPath.toString()); this.containerList = containerPipelines.toArray(new Pipeline[containerPipelines.size()]); - flusher.register(dbPath.toString(), containerList); this.ipAddressString = getHostIP(); this.tracePrefix = ipAddressString + ":" + this.volumeName; this.volumeSize = volumeSize; @@ -298,6 +297,7 @@ public class CBlockLocalCache implements CacheModule { @Override public void start() throws IOException { + flusher.register(getDbPath().getPath(), containerList); blockWriter.start(); } @@ -309,7 +309,7 @@ public class CBlockLocalCache implements CacheModule { public void close() throws IOException { blockReader.shutdown(); blockWriter.shutdown(); - this.flusher.closeDB(dbPath.toString()); + this.flusher.releaseCacheDB(dbPath.toString()); if (this.traceEnabled) { getTracer().info("Task=ShutdownCache"); } @@ -593,7 +593,7 @@ public class CBlockLocalCache implements CacheModule { "relies on private data on the pipeline, null data found."); } - Preconditions.checkNotNull(clientManager, "Client Manager canoot be " + + Preconditions.checkNotNull(clientManager, "Client Manager cannot be " + "null"); Preconditions.checkState(blockSize > 0, " Block size has to be a " + "number greater than 0"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java index 2dd72b3b181..b03fb22ddc5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java @@ -62,6 +62,10 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys. DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO; import static org.apache.hadoop.cblock.CBlockConfigKeys. DFS_CBLOCK_TRACE_IO; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE; /** * Tests for Tests for local cache. 
@@ -518,7 +522,7 @@
     flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
     flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
     flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
-
+    flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
     XceiverClientManager xcm = new XceiverClientManager(flushTestConfig);
     String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
     String userName = "user" + RandomStringUtils.randomNumeric(4);
@@ -561,9 +565,11 @@
     Assert.assertEquals(512, metrics.getNumWriteOps());
     Thread.sleep(5000);
     flusher.shutdown();
+    Assert.assertTrue(metrics.getNumBlockBufferFlushTriggered() > 1);
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
     Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
     Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
-
+    Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
     // Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache
     flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
     CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
@@ -594,4 +600,91 @@
     newCache.close();
     newFlusher.shutdown();
   }
+
+  @Test
+  public void testRetryLog() throws IOException,
+      InterruptedException, TimeoutException {
+    // Create a new config so that this test writes its metafile to a new
+    // location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    URL p = flushTestConfig.getClass().getResource("");
+    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
+
+    int numblocks = 10;
+    flushTestConfig.setInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, numblocks);
+
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+
+    List<Pipeline> fakeContainerPipelines = new LinkedList<>();
+    Pipeline fakePipeline = new Pipeline("fake");
+    fakePipeline.setData(Longs.toByteArray(1));
+    fakeContainerPipelines.add(fakePipeline);
+
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(fakeContainerPipelines)
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+    Thread flushListenerThread = new Thread(flusher);
+    flushListenerThread.setDaemon(true);
+    flushListenerThread.start();
+
+    for (int i = 0; i < numblocks; i++) {
+      cache.put(i, data.getBytes(StandardCharsets.UTF_8));
+    }
+    Assert.assertEquals(numblocks, metrics.getNumWriteOps());
+    Thread.sleep(3000);
+
+    // all the writes to the container will fail because of fake pipelines
+    Assert.assertEquals(numblocks, metrics.getNumDirtyLogBlockRead());
+    Assert.assertTrue(
+        metrics.getNumWriteGenericExceptionRetryBlocks() >= numblocks);
+    Assert.assertEquals(0,
+        metrics.getNumWriteIOExceptionRetryBlocks());
+    Assert.assertEquals(0, metrics.getNumFailedRetryLogFileWrites());
+    Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
+    cache.close();
+    flusher.shutdown();
+
+    // restart cache with correct pipelines, now blocks should be uploaded
+    // correctly
+    CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher newFlusher =
+        new ContainerCacheFlusher(flushTestConfig,
+            xceiverClientManager, newMetrics);
+    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(getContainerPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(newFlusher)
+        .setCBlockTargetMetrics(newMetrics)
+        .build();
+    newCache.start();
+    Thread newFlushListenerThread = new Thread(newFlusher);
+    newFlushListenerThread.setDaemon(true);
+    newFlushListenerThread.start();
+    Thread.sleep(3000);
+    Assert.assertTrue(newMetrics.getNumRetryLogBlockRead() >= numblocks);
+    Assert.assertEquals(0, newMetrics.getNumWriteGenericExceptionRetryBlocks());
+    Assert.assertEquals(0, newMetrics.getNumWriteIOExceptionRetryBlocks());
+    Assert.assertEquals(0, newMetrics.getNumFailedReleaseLevelDB());
+  }
 }
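
Reviewer addendum: the crash-safe retry accounting in this patch rests on one small contract. BlockWriterTask#writeRetryBlock encodes the attempt count as the fourth dot-separated field of the retry log name (RetryLog.<blockID>.<timestamp>.<tryCount>), and ContainerCacheFlusher#processDirtyBlocks parses it back when a restarted cblock target requeues those files. The sketch below is a minimal, self-contained illustration of that naming round trip; the class RetryLogNameCheck and its helper methods are hypothetical and only mirror the String.format/split calls visible in the diff.

  import java.util.concurrent.TimeUnit;

  /** Hypothetical helper mirroring the patch's retry-log file-name contract. */
  public final class RetryLogNameCheck {

    // Mirrors AsyncBlockWriter.RETRY_LOG_PREFIX introduced by this patch.
    private static final String RETRY_LOG_PREFIX = "RetryLog";

    // Same shape as BlockWriterTask#writeRetryBlock:
    // RetryLog.<blockID>.<timestamp>.<tryCount>
    static String format(long blockID, long timestamp, int tryCount) {
      return String.format("%s.%d.%s.%s", RETRY_LOG_PREFIX, blockID,
          timestamp, tryCount);
    }

    // Same parsing as ContainerCacheFlusher#processDirtyBlocks: the first
    // dot-separated field selects dirty vs. retry handling, and the fourth
    // field carries the attempt count for requeued retry blocks.
    static int parseTryCount(String fileName) {
      String[] parts = fileName.split("\\.");
      if (parts.length != 4 || !parts[0].equalsIgnoreCase(RETRY_LOG_PREFIX)) {
        throw new IllegalArgumentException("not a retry log: " + fileName);
      }
      return Integer.parseInt(parts[3]);
    }

    public static void main(String[] args) {
      String name = format(42L, System.currentTimeMillis(), 7);
      // A block requeued from this file resumes at tryCount 7, not 0.
      System.out.println(name + " -> resumes at tryCount " + parseTryCount(name));
      // Sanity check on the config-key comment: 64 * 1024 attempts at about
      // one per second is roughly 18 hours (65536 s / 3600 s per hour).
      System.out.println(TimeUnit.SECONDS.toHours(64 * 1024) + " hours");
    }
  }

Because the attempt count travels in the file name rather than in memory, a restarted flusher resumes counting where the previous process stopped, and the dfs.cblock.cache.max.retry budget (default 64 * 1024) is enforced across restarts instead of resetting to zero.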