HDFS-11727. Block Storage: Retry Blocks should be requeued when cblock is restarted. Contributed by Mukul Kumar Singh.

Chen Liang 2017-05-23 10:55:17 -07:00 committed by Owen O'Malley
parent 37642c12df
commit 0753e094d7
7 changed files with 236 additions and 30 deletions

View File: CBlockConfigKeys.java

@@ -166,6 +166,21 @@ public final class CBlockConfigKeys {
       "dfs.cblock.cache.leveldb.cache.size.mb";
   public static final int DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT = 256;
+
+  /**
+   * The cache makes a best-effort attempt to write a block to a container.
+   * At some point we will need to handle the case where we have tried 64K
+   * times and are still not able to write to the container.
+   *
+   * TODO: We will need the cBlock server to allow us to remap the block
+   * location in case of failures; at that point we should reduce the retry
+   * count to a more typical number. 64K retries is approximately 18 hours
+   * of retrying.
+   */
+  public static final String DFS_CBLOCK_CACHE_MAX_RETRY_KEY =
+      "dfs.cblock.cache.max.retry";
+  public static final int DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT =
+      64 * 1024;
   private CBlockConfigKeys() {
   }
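
A minimal usage sketch (not part of the patch) of how the new key is read; the getInt() call mirrors the ContainerCacheFlusher change further down, and the comment spells out where the "approximately 18 hours" figure comes from, assuming roughly one attempt per second:

    // Sketch: read the retry limit the way ContainerCacheFlusher does below.
    Configuration config = new Configuration();
    int maxRetryCount = config.getInt(
        CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_KEY,       // "dfs.cblock.cache.max.retry"
        CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT);  // 64 * 1024
    // 64 * 1024 = 65,536 attempts; at ~1 attempt per second that is
    // 65,536 / 3,600 ≈ 18.2 hours, i.e. the "approximately 18 hours" above.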

View File: BlockWriterTask.java

@@ -20,10 +20,12 @@ package org.apache.hadoop.cblock.jscsiHelper;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
 import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.AsyncBlockWriter;
 import org.apache.hadoop.scm.XceiverClientSpi;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.LevelDBStore;

 import java.io.File;
 import java.io.FileOutputStream;
@@ -41,7 +43,7 @@ public class BlockWriterTask implements Runnable {
   private final ContainerCacheFlusher flusher;
   private final String dbPath;
   private final String fileName;
-  private static final String RETRY_LOG_PREFIX = "RetryLog";
+  private final int maxRetryCount;

   /**
    * Constructs a BlockWriterTask.
@@ -50,12 +52,13 @@ public class BlockWriterTask implements Runnable {
    * @param flusher - ContainerCacheFlusher.
    */
   public BlockWriterTask(LogicalBlock block, ContainerCacheFlusher flusher,
-      String dbPath, String fileName) {
+      String dbPath, int tryCount, String fileName, int maxRetryCount) {
     this.block = block;
     this.flusher = flusher;
     this.dbPath = dbPath;
-    tryCount = 0;
+    this.tryCount = tryCount;
     this.fileName = fileName;
+    this.maxRetryCount = maxRetryCount;
   }

   /**
@@ -73,6 +76,7 @@ public class BlockWriterTask implements Runnable {
   public void run() {
     String containerName = null;
     XceiverClientSpi client = null;
+    LevelDBStore levelDBStore = null;
     flusher.getLOG().debug(
         "Writing block to remote. block ID: {}", block.getBlockID());
     try {
@@ -83,7 +87,9 @@ public class BlockWriterTask implements Runnable {
       byte[] keybuf = Longs.toByteArray(block.getBlockID());
       byte[] data;
       long startTime = Time.monotonicNow();
-      data = flusher.getCacheDB(this.dbPath).get(keybuf);
+      levelDBStore = flusher.getCacheDB(this.dbPath);
+      data = levelDBStore.get(keybuf);
+      Preconditions.checkNotNull(data);
       long endTime = Time.monotonicNow();
       Preconditions.checkState(data.length > 0, "Block data is zero length");
       startTime = Time.monotonicNow();
@@ -99,17 +105,23 @@ public class BlockWriterTask implements Runnable {
       flusher.incrementRemoteIO();
     } catch (Exception ex) {
-      flusher.getLOG().error("Writing of block failed, We have attempted " +
+      flusher.getLOG().error("Writing of block:{} failed, We have attempted " +
           "to write this block {} times to the container {}.Trace ID:{}",
-          this.getTryCount(), containerName, "", ex);
+          block.getBlockID(), this.getTryCount(), containerName, "", ex);
       writeRetryBlock(block);
       if (ex instanceof IOException) {
         flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks();
       } else {
         flusher.getTargetMetrics().incNumWriteGenericExceptionRetryBlocks();
       }
+      if (this.getTryCount() >= maxRetryCount) {
+        flusher.getTargetMetrics().incNumWriteMaxRetryBlocks();
+      }
     } finally {
       flusher.incFinishCount(fileName);
+      if (levelDBStore != null) {
+        flusher.releaseCacheDB(dbPath);
+      }
       if(client != null) {
         flusher.getXceiverClientManager().releaseClient(client);
       }
@@ -120,8 +132,8 @@ public class BlockWriterTask implements Runnable {
   private void writeRetryBlock(LogicalBlock currentBlock) {
     boolean append = false;
     String retryFileName =
-        String.format("%s.%d.%s", RETRY_LOG_PREFIX, currentBlock.getBlockID(),
-            Time.monotonicNow());
+        String.format("%s.%d.%s.%s", AsyncBlockWriter.RETRY_LOG_PREFIX,
+            currentBlock.getBlockID(), Time.monotonicNow(), tryCount);
     File logDir = new File(this.dbPath);
     if (!logDir.exists() && !logDir.mkdirs()) {
       flusher.getLOG().error(
@@ -131,12 +143,14 @@ public class BlockWriterTask implements Runnable {
     String log = Paths.get(this.dbPath, retryFileName).toString();
     ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE);
     buffer.putLong(currentBlock.getBlockID());
+    buffer.flip();
     try {
       FileChannel channel = new FileOutputStream(log, append).getChannel();
       channel.write(buffer);
       channel.close();
       flusher.processDirtyBlocks(this.dbPath, retryFileName);
     } catch (IOException e) {
+      flusher.getTargetMetrics().incNumFailedRetryLogFileWrites();
       flusher.getLOG().error("Unable to write the retry block. Block ID: {}",
           currentBlock.getBlockID(), e);
     }
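
Worth highlighting: the added buffer.flip() in writeRetryBlock() is a key part of the fix. After putLong() the buffer's position sits at its limit, so the subsequent channel.write(buffer) would see zero remaining bytes and produce an empty retry log; flipping resets the position so the eight-byte block ID is actually written. A sketch of the ByteBuffer semantics:

    ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE); // 8 bytes
    buffer.putLong(blockID);  // position == 8, limit == 8, remaining() == 0
    buffer.flip();            // position == 0, limit == 8, remaining() == 8
    channel.write(buffer);    // now writes all 8 bytes of the block ID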

View File: CBlockTargetMetrics.java

@@ -48,6 +48,8 @@ public class CBlockTargetMetrics {
   @Metric private MutableCounterLong numBlockBufferFlushCompleted;
   @Metric private MutableCounterLong numBlockBufferFlushTriggered;
   @Metric private MutableCounterLong numBlockBufferUpdates;
+  @Metric private MutableCounterLong numRetryLogBlockRead;
+  @Metric private MutableCounterLong numBytesRetryLogRead;

   // Failure Metrics
   @Metric private MutableCounterLong numReadLostBlocks;
@@ -59,6 +61,9 @@ public class CBlockTargetMetrics {
   @Metric private MutableCounterLong numFailedDirtyLogFileDeletes;
   @Metric private MutableCounterLong numFailedBlockBufferFlushes;
   @Metric private MutableCounterLong numInterruptedBufferWaits;
+  @Metric private MutableCounterLong numFailedRetryLogFileWrites;
+  @Metric private MutableCounterLong numWriteMaxRetryBlocks;
+  @Metric private MutableCounterLong numFailedReleaseLevelDB;

   // Latency based Metrics
   @Metric private MutableRate dbReadLatency;
@@ -138,6 +143,14 @@ public class CBlockTargetMetrics {
     numBlockBufferUpdates.incr();
   }

+  public void incNumRetryLogBlockRead() {
+    numRetryLogBlockRead.incr();
+  }
+
+  public void incNumBytesRetryLogRead(int bytes) {
+    numBytesRetryLogRead.incr(bytes);
+  }
+
   public void incNumBytesDirtyLogWritten(int bytes) {
     numBytesDirtyLogWritten.incr(bytes);
   }
@@ -158,6 +171,18 @@ public class CBlockTargetMetrics {
     numFailedDirtyLogFileDeletes.incr();
   }

+  public void incNumFailedRetryLogFileWrites() {
+    numFailedRetryLogFileWrites.incr();
+  }
+
+  public void incNumWriteMaxRetryBlocks() {
+    numWriteMaxRetryBlocks.incr();
+  }
+
+  public void incNumFailedReleaseLevelDB() {
+    numFailedReleaseLevelDB.incr();
+  }
+
   public void updateDBReadLatency(long latency) {
     dbReadLatency.add(latency);
   }
@@ -257,6 +282,16 @@ public class CBlockTargetMetrics {
     return numBlockBufferUpdates.value();
   }

+  @VisibleForTesting
+  public long getNumRetryLogBlockRead() {
+    return numRetryLogBlockRead.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBytesRetryLogReads() {
+    return numBytesRetryLogRead.value();
+  }
+
   @VisibleForTesting
   public long getNumBytesDirtyLogWritten() {
     return numBytesDirtyLogWritten.value();
@@ -281,4 +316,19 @@ public class CBlockTargetMetrics {
   public long getNumFailedDirtyLogFileDeletes() {
     return numFailedDirtyLogFileDeletes.value();
   }
+
+  @VisibleForTesting
+  public long getNumFailedRetryLogFileWrites() {
+    return numFailedRetryLogFileWrites.value();
+  }
+
+  @VisibleForTesting
+  public long getNumWriteMaxRetryBlocks() {
+    return numWriteMaxRetryBlocks.value();
+  }
+
+  @VisibleForTesting
+  public long getNumFailedReleaseLevelDB() {
+    return numFailedReleaseLevelDB.value();
+  }
 }

View File: ContainerCacheFlusher.java

@@ -20,7 +20,9 @@ package org.apache.hadoop.cblock.jscsiHelper;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.cblock.CBlockConfigKeys;
 import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.AsyncBlockWriter;
 import org.apache.hadoop.cblock.jscsiHelper.cache.impl.DiskBlock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -100,6 +102,7 @@ public class ContainerCacheFlusher implements Runnable {
   private final CBlockTargetMetrics metrics;
   private AtomicBoolean shutdown;
   private final long levelDBCacheSize;
+  private final int maxRetryCount;

   private final ConcurrentMap<String, FinishCounter> finishCountMap;
@@ -152,28 +155,33 @@ public class ContainerCacheFlusher implements Runnable {
     this.remoteIO = new AtomicLong();

     this.finishCountMap = new ConcurrentHashMap<>();
+    this.maxRetryCount =
+        config.getInt(CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_KEY,
+            CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT);
   }

-  private void checkExistingDirtyLog(File dbPath) {
+  private void checkExistingLog(String prefixFileName, File dbPath) {
     if (!dbPath.exists()) {
       LOG.debug("No existing dirty log found at {}", dbPath);
       return;
     }
     LOG.debug("Need to check and requeue existing dirty log {}", dbPath);
     HashMap<String, ArrayList<String>> allFiles = new HashMap<>();
-    traverse(dbPath, allFiles);
+    traverse(prefixFileName, dbPath, allFiles);
     for (Map.Entry<String, ArrayList<String>> entry : allFiles.entrySet()) {
       String parentPath = entry.getKey();
       for (String fileName : entry.getValue()) {
-        LOG.info("found this {} with {}", parentPath, fileName);
+        LOG.info("found {} {} with prefix {}",
+            parentPath, fileName, prefixFileName);
         processDirtyBlocks(parentPath, fileName);
       }
     }
   }

-  private void traverse(File path, HashMap<String, ArrayList<String>> files) {
+  private void traverse(String prefixFileName, File path,
+      HashMap<String, ArrayList<String>> files) {
     if (path.isFile()) {
-      if (path.getName().startsWith("DirtyLog")) {
+      if (path.getName().startsWith(prefixFileName)) {
         LOG.debug("found this {} with {}", path.getParent(), path.getName());
         if (!files.containsKey(path.getParent())) {
           files.put(path.getParent(), new ArrayList<>());
@@ -184,7 +192,7 @@ public class ContainerCacheFlusher implements Runnable {
       File[] listFiles = path.listFiles();
       if (listFiles != null) {
         for (File subPath : listFiles) {
-          traverse(subPath, files);
+          traverse(prefixFileName, subPath, files);
         }
       }
     }
@@ -274,19 +282,28 @@ public class ContainerCacheFlusher implements Runnable {
   public void register(String dbPath, Pipeline[] containerList) {
     File dbFile = Paths.get(dbPath).toFile();
     pipelineMap.put(dbPath, containerList);
-    checkExistingDirtyLog(dbFile);
+    checkExistingLog(AsyncBlockWriter.DIRTY_LOG_PREFIX, dbFile);
+    checkExistingLog(AsyncBlockWriter.RETRY_LOG_PREFIX, dbFile);
   }

   private String getDBFileName(String dbPath) {
     return dbPath + ".db";
   }

-  LevelDBStore getCacheDB(String dbPath) {
-    return dbMap.get(dbPath).db;
+  public LevelDBStore getCacheDB(String dbPath) throws IOException {
+    return openDB(dbPath);
   }

+  public void releaseCacheDB(String dbPath) {
+    try {
+      closeDB(dbPath);
+    } catch (Exception e) {
+      metrics.incNumFailedReleaseLevelDB();
+      LOG.error("LevelDB close failed, dbPath:" + dbPath, e);
+    }
+  }
+
   /**
-   * Close the DB if we don't have any outstanding refrences.
+   * Close the DB if we don't have any outstanding references.
    *
    * @param dbPath - dbPath
    * @throws IOException
@@ -348,18 +365,28 @@ public class ContainerCacheFlusher implements Runnable {
         message.getDbPath(), message.getFileName());
     String fullPath = Paths.get(message.getDbPath(),
         message.getFileName()).toString();
+    String[] fileNameParts = message.getFileName().split("\\.");
+    Preconditions.checkState(fileNameParts.length > 1);
+    String fileType = fileNameParts[0];
+    boolean isDirtyLogFile =
+        fileType.equalsIgnoreCase(AsyncBlockWriter.DIRTY_LOG_PREFIX);
     ReadableByteChannel fileChannel = new FileInputStream(fullPath)
         .getChannel();
     // TODO: We can batch and unique the IOs here. First getting the code
     // to work, we will add those later.
     int bytesRead = fileChannel.read(blockIDBuffer);
+    fileChannel.close();
     LOG.debug("Read blockID log of size: {} position {} remaining {}",
         bytesRead, blockIDBuffer.position(), blockIDBuffer.remaining());
     // current position in the buffer in bytes, divided by number of
     // bytes per long (which is calculated by number of bits per long
     // divided by number of bits per byte) gives the number of blocks
     int blockCount = blockIDBuffer.position()/(Long.SIZE / Byte.SIZE);
+    if (isDirtyLogFile) {
       getTargetMetrics().incNumBytesDirtyLogRead(bytesRead);
+    } else {
+      getTargetMetrics().incNumBytesRetryLogRead(bytesRead);
+    }
     if (finishCountMap.containsKey(message.getFileName())) {
       // In theory this should never happen. But if it happened,
       // we need to know it...
@@ -375,14 +402,22 @@ public class ContainerCacheFlusher implements Runnable {
     // should be flip instead of rewind, because we also need to make sure
     // the end position is correct.
     blockIDBuffer.flip();
-    LOG.debug("Remaining blocks count {} and {}", blockIDBuffer.remaining(),
+    LOG.info("Remaining blocks count {} and {}", blockIDBuffer.remaining(),
         blockCount);
     while (blockIDBuffer.remaining() >= (Long.SIZE / Byte.SIZE)) {
-      getTargetMetrics().incNumDirtyLogBlockRead();
       long blockID = blockIDBuffer.getLong();
+      int retryCount = 0;
+      if (isDirtyLogFile) {
+        getTargetMetrics().incNumDirtyLogBlockRead();
+      } else {
+        getTargetMetrics().incNumRetryLogBlockRead();
+        Preconditions.checkState(fileNameParts.length == 4);
+        retryCount = Integer.parseInt(fileNameParts[3]);
+      }
       LogicalBlock block = new DiskBlock(blockID, null, false);
       BlockWriterTask blockWriterTask = new BlockWriterTask(block, this,
-          message.getDbPath(), message.getFileName());
+          message.getDbPath(), retryCount, message.getFileName(),
+          maxRetryCount);
       threadPoolExecutor.submit(blockWriterTask);
     }
     blockIDBuffer.clear();
@@ -491,7 +526,6 @@ public class ContainerCacheFlusher implements Runnable {
       this.currentCount = new AtomicLong(0);
       this.fileDeleted = new AtomicBoolean(false);
       this.flusher = flusher;
-      this.flusher.openDB(dbPath);
     }

     public boolean isFileDeleted() {
@@ -505,7 +539,6 @@ public class ContainerCacheFlusher implements Runnable {
       LOG.debug(
           "Deleting {} with count {} {}", filePath, count, expectedCount);
       try {
-        flusher.closeDB(dbPath);
         Path path = Paths.get(filePath);
         Files.delete(path);
         // the following part tries to remove the directory if it is empty
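
The flusher now tells the two requeue logs apart purely by file name: retry logs carry the attempt count as a fourth dot-separated token, RetryLog.&lt;blockID&gt;.&lt;timestamp&gt;.&lt;tryCount&gt;, so the count survives a cblock restart and is recovered from fileNameParts[3] in processMessage(). A sketch of the convention, using the prefixes introduced by this patch (the example values are hypothetical):

    // e.g. "RetryLog.42.1495558517000.3" would be the third retry of block 42.
    String[] fileNameParts = fileName.split("\\.");
    boolean isDirtyLogFile =
        fileNameParts[0].equalsIgnoreCase(AsyncBlockWriter.DIRTY_LOG_PREFIX);
    // Dirty-log blocks start at tryCount 0; retry logs resume their count.
    int retryCount = isDirtyLogFile ? 0 : Integer.parseInt(fileNameParts[3]);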

View File: AsyncBlockWriter.java

@@ -69,6 +69,7 @@ public class AsyncBlockWriter {
   private final CBlockLocalCache parentCache;
   private final BlockBufferManager blockBufferManager;
   public final static String DIRTY_LOG_PREFIX = "DirtyLog";
+  public static final String RETRY_LOG_PREFIX = "RetryLog";
   private AtomicLong localIoCount;

   /**

View File: CBlockLocalCache.java

@@ -157,10 +157,9 @@ public class CBlockLocalCache implements CacheModule {
       throw new IllegalArgumentException("Unable to create paths. Path: " +
           dbPath);
     }
-    cacheDB = flusher.openDB(dbPath.toString());
+    cacheDB = flusher.getCacheDB(dbPath.toString());
     this.containerList = containerPipelines.toArray(new
         Pipeline[containerPipelines.size()]);
-    flusher.register(dbPath.toString(), containerList);
     this.ipAddressString = getHostIP();
     this.tracePrefix = ipAddressString + ":" + this.volumeName;
     this.volumeSize = volumeSize;
@@ -298,6 +297,7 @@ public class CBlockLocalCache implements CacheModule {
   @Override
   public void start() throws IOException {
+    flusher.register(getDbPath().getPath(), containerList);
     blockWriter.start();
   }
@@ -309,7 +309,7 @@ public class CBlockLocalCache implements CacheModule {
   public void close() throws IOException {
     blockReader.shutdown();
     blockWriter.shutdown();
-    this.flusher.closeDB(dbPath.toString());
+    this.flusher.releaseCacheDB(dbPath.toString());
     if (this.traceEnabled) {
       getTracer().info("Task=ShutdownCache");
     }
@@ -593,7 +593,7 @@ public class CBlockLocalCache implements CacheModule {
           "relies on private data on the pipeline, null data found.");
     }
-    Preconditions.checkNotNull(clientManager, "Client Manager canoot be " +
+    Preconditions.checkNotNull(clientManager, "Client Manager cannot be " +
         "null");
     Preconditions.checkState(blockSize > 0, " Block size has to be a " +
         "number greater than 0");

View File: TestLocalBlockCache.java

@@ -62,6 +62,10 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys.
     DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
 import static org.apache.hadoop.cblock.CBlockConfigKeys.
     DFS_CBLOCK_TRACE_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;

 /**
  * Tests for local cache.
@@ -518,7 +522,7 @@ public class TestLocalBlockCache {
     flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
     flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
     flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
     XceiverClientManager xcm = new XceiverClientManager(flushTestConfig);
     String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
     String userName = "user" + RandomStringUtils.randomNumeric(4);
@@ -561,9 +565,11 @@ public class TestLocalBlockCache {
     Assert.assertEquals(512, metrics.getNumWriteOps());
     Thread.sleep(5000);
     flusher.shutdown();
+    Assert.assertTrue(metrics.getNumBlockBufferFlushTriggered() > 1);
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
     Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
     Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
+    Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());

     // Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache
     flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
     CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
@@ -594,4 +600,91 @@ public class TestLocalBlockCache {
     newCache.close();
     newFlusher.shutdown();
   }
+
+  @Test
+  public void testRetryLog() throws IOException,
+      InterruptedException, TimeoutException {
+    // Create a new config so that this test writes its metafiles to a new
+    // location.
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    URL p = flushTestConfig.getClass().getResource("");
+    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
+
+    int numblocks = 10;
+    flushTestConfig.setInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, numblocks);
+
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+
+    List<Pipeline> fakeContainerPipelines = new LinkedList<>();
+    Pipeline fakePipeline = new Pipeline("fake");
+    fakePipeline.setData(Longs.toByteArray(1));
+    fakeContainerPipelines.add(fakePipeline);
+
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(fakeContainerPipelines)
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+    Thread flushListenerThread = new Thread(flusher);
+    flushListenerThread.setDaemon(true);
+    flushListenerThread.start();
+
+    for (int i = 0; i < numblocks; i++) {
+      cache.put(i, data.getBytes(StandardCharsets.UTF_8));
+    }
+    Assert.assertEquals(numblocks, metrics.getNumWriteOps());
+    Thread.sleep(3000);
+
+    // All the writes to the container will fail because of the fake
+    // pipelines.
+    Assert.assertEquals(numblocks, metrics.getNumDirtyLogBlockRead());
+    Assert.assertTrue(
+        metrics.getNumWriteGenericExceptionRetryBlocks() >= numblocks);
+    Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
+    Assert.assertEquals(0, metrics.getNumFailedRetryLogFileWrites());
+    Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
+    cache.close();
+    flusher.shutdown();
+
+    // Restart the cache with correct pipelines; now the blocks should be
+    // uploaded correctly.
+    CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher newFlusher =
+        new ContainerCacheFlusher(flushTestConfig,
+            xceiverClientManager, newMetrics);
+    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(getContainerPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(newFlusher)
+        .setCBlockTargetMetrics(newMetrics)
+        .build();
+    newCache.start();
+    Thread newFlushListenerThread = new Thread(newFlusher);
+    newFlushListenerThread.setDaemon(true);
+    newFlushListenerThread.start();
+    Thread.sleep(3000);
+    Assert.assertTrue(newMetrics.getNumRetryLogBlockRead() >= numblocks);
+    Assert.assertEquals(0, newMetrics.getNumWriteGenericExceptionRetryBlocks());
+    Assert.assertEquals(0, newMetrics.getNumWriteIOExceptionRetryBlocks());
+    Assert.assertEquals(0, newMetrics.getNumFailedReleaseLevelDB());
+  }
 }