HDFS-11627. Block Storage: Cblock cache should register with flusher to upload blocks to containers. Contributed by Mukul Kumar Singh.

commit 50dd3a5cfa
parent eae8c2a469
Author: Chen Liang
Date:   2017-04-26 10:36:56 -07:00

8 changed files with 202 additions and 37 deletions
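
For context, the flow this commit introduces: CBlockLocalCache no longer sizes the LevelDB cache itself. It opens the DB through the flusher, which reads dfs.cblock.cache.leveldb.cache.size.mb from the config, and then registers its container pipelines with the flusher; registration also requeues any dirty logs left behind by a previous run. A minimal caller-side sketch, assuming a flusher, a dbPath, and a pipeline list already exist (the names come from the diffs below, the sequence is not a verbatim excerpt):

    // Sketch of the new call sequence introduced by this commit.
    LevelDBStore cacheDB = flusher.openDB(dbPath);   // cache size now comes from config
    Pipeline[] containerList =
        containerPipelines.toArray(new Pipeline[containerPipelines.size()]);
    flusher.register(dbPath, containerList);         // also requeues existing dirty logs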

CBlockConfigKeys.java

@@ -156,6 +156,10 @@ public final class CBlockConfigKeys {
   public static final int DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT =
       5;
 
+  // LevelDB cache file uses an off-heap cache in LevelDB of 256 MB.
+  public static final String DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY =
+      "dfs.cblock.cache.leveldb.cache.size.mb";
+  public static final int DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT = 256;
+
   private CBlockConfigKeys() {

BlockWriterTask.java

@@ -79,6 +79,7 @@ public class BlockWriterTask implements Runnable {
       incTryCount();
       Pipeline pipeline = flusher.getPipeline(this.dbPath, block.getBlockID());
       client = flusher.getXceiverClientManager().acquireClient(pipeline);
+      containerName = pipeline.getContainerName();
       byte[] keybuf = Longs.toByteArray(block.getBlockID());
       byte[] data;
       long startTime = Time.monotonicNow();
@@ -97,11 +98,16 @@ public class BlockWriterTask implements Runnable {
       flusher.incrementRemoteIO();
-    } catch (IOException ex) {
+    } catch (Exception ex) {
       flusher.getLOG().error("Writing of block failed, We have attempted " +
           "to write this block {} times to the container {}.Trace ID:{}",
           this.getTryCount(), containerName, "", ex);
       writeRetryBlock(block);
+      if (ex instanceof IOException) {
+        flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks();
+      } else {
+        flusher.getTargetMetrics().incNumWriteGenericExceptionRetryBlocks();
+      }
     } finally {
       flusher.incFinishCount(fileName);
       if(client != null) {
CBlockTargetMetrics.java

@@ -34,19 +34,26 @@ import org.apache.hadoop.metrics2.lib.MutableRate;
  * as well as the latency time of read and write ops.
  */
 public class CBlockTargetMetrics {
-  // Counter based Metrics
+  // IOPS based Metrics
   @Metric private MutableCounterLong numReadOps;
   @Metric private MutableCounterLong numWriteOps;
   @Metric private MutableCounterLong numReadCacheHits;
   @Metric private MutableCounterLong numReadCacheMiss;
-  @Metric private MutableCounterLong numReadLostBlocks;
-  @Metric private MutableCounterLong numBlockBufferFlush;
+
+  // Cblock internal Metrics
   @Metric private MutableCounterLong numDirectBlockWrites;
-  @Metric private MutableCounterLong numFailedDirectBlockWrites;
+  @Metric private MutableCounterLong numBlockBufferFlush;
   @Metric private MutableCounterLong numDirtyLogBlockRead;
-  @Metric private MutableCounterLong numBytesDirtyLogRead;
   @Metric private MutableCounterLong numDirtyLogBlockUpdated;
+  @Metric private MutableCounterLong numBytesDirtyLogRead;
   @Metric private MutableCounterLong numBytesDirtyLogWritten;
+
+  // Failure Metrics
+  @Metric private MutableCounterLong numReadLostBlocks;
+  @Metric private MutableCounterLong numFailedReadBlocks;
+  @Metric private MutableCounterLong numWriteIOExceptionRetryBlocks;
+  @Metric private MutableCounterLong numWriteGenericExceptionRetryBlocks;
+  @Metric private MutableCounterLong numFailedDirectBlockWrites;
   @Metric private MutableCounterLong numFailedDirtyBlockFlushes;
@@ -91,10 +98,22 @@
     numDirectBlockWrites.incr();
   }
 
+  public void incNumWriteIOExceptionRetryBlocks() {
+    numWriteIOExceptionRetryBlocks.incr();
+  }
+
+  public void incNumWriteGenericExceptionRetryBlocks() {
+    numWriteGenericExceptionRetryBlocks.incr();
+  }
+
   public void incNumFailedDirectBlockWrites() {
     numFailedDirectBlockWrites.incr();
   }
 
+  public void incNumFailedReadBlocks() {
+    numFailedReadBlocks.incr();
+  }
+
   public void incNumBlockBufferFlush() {
     numBlockBufferFlush.incr();
   }
@@ -178,6 +197,21 @@
     return numFailedDirectBlockWrites.value();
   }
 
+  @VisibleForTesting
+  public long getNumFailedReadBlocks() {
+    return numFailedReadBlocks.value();
+  }
+
+  @VisibleForTesting
+  public long getNumWriteIOExceptionRetryBlocks() {
+    return numWriteIOExceptionRetryBlocks.value();
+  }
+
+  @VisibleForTesting
+  public long getNumWriteGenericExceptionRetryBlocks() {
+    return numWriteGenericExceptionRetryBlocks.value();
+  }
+
   @VisibleForTesting
   public long getNumBlockBufferFlush() {
     return numBlockBufferFlush.value();
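
The @VisibleForTesting getters pair one-to-one with the incr methods, so tests can assert on the new failure counters directly. A short sketch using only the names added above:

    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
    // ... exercise the cache and flusher ...
    Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
    Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
    Assert.assertEquals(0, metrics.getNumFailedReadBlocks());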

ContainerCacheFlusher.java

@@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
 import org.apache.hadoop.cblock.jscsiHelper.cache.impl.DiskBlock;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.scm.XceiverClientManager;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.utils.LevelDBStore;
@@ -77,8 +78,10 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys
     .DFS_CBLOCK_CACHE_THREAD_PRIORITY;
 import static org.apache.hadoop.cblock.CBlockConfigKeys
     .DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT;
 
 /**
  * Class that writes to remote containers.
@@ -96,6 +99,7 @@ public class ContainerCacheFlusher implements Runnable {
   private final XceiverClientManager xceiverClientManager;
   private final CBlockTargetMetrics metrics;
   private AtomicBoolean shutdown;
+  private final long levelDBCacheSize;
 
   private final ConcurrentMap<String, FinishCounter> finishCountMap;
@@ -117,6 +121,8 @@
         DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT);
     int blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
         DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE);
+    levelDBCacheSize = config.getInt(DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY,
+        DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT) * OzoneConsts.MB;
 
     LOG.info("Cache: Core Pool Size: {}", corePoolSize);
     LOG.info("Cache: Keep Alive: {}", keepAlive);
@@ -146,17 +152,14 @@
     this.remoteIO = new AtomicLong();
 
     this.finishCountMap = new ConcurrentHashMap<>();
-    checkExisitingDirtyLog(config);
   }
 
-  private void checkExisitingDirtyLog(Configuration config) {
-    File dbPath = Paths.get(config.get(DFS_CBLOCK_DISK_CACHE_PATH_KEY,
-        DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT)).toFile();
+  private void checkExistingDirtyLog(File dbPath) {
     if (!dbPath.exists()) {
-      LOG.info("No existing dirty log found at {}", dbPath);
+      LOG.debug("No existing dirty log found at {}", dbPath);
       return;
     }
-    LOG.info("Need to check and requeue existing dirty log {}", dbPath);
+    LOG.debug("Need to check and requeue existing dirty log {}", dbPath);
     HashMap<String, ArrayList<String>> allFiles = new HashMap<>();
     traverse(dbPath, allFiles);
     for (Map.Entry<String, ArrayList<String>> entry : allFiles.entrySet()) {
@@ -237,11 +240,10 @@
    * Opens a DB if needed or returns a handle to an already open DB.
    *
    * @param dbPath -- dbPath
-   * @param cacheSize - cacheSize
    * @return the levelDB on the given path.
    * @throws IOException
    */
-  public synchronized LevelDBStore openDB(String dbPath, int cacheSize)
+  public synchronized LevelDBStore openDB(String dbPath)
       throws IOException {
     if (dbMap.containsKey(dbPath)) {
       RefCountedDB refDB = dbMap.get(dbPath);
@@ -249,7 +251,7 @@
       return refDB.db;
     } else {
       Options options = new Options();
-      options.cacheSize(cacheSize * (1024L * 1024L));
+      options.cacheSize(levelDBCacheSize);
       options.createIfMissing(true);
       LevelDBStore cacheDB = new LevelDBStore(
           new File(getDBFileName(dbPath)), options);
@@ -260,14 +262,19 @@
   }
 
   /**
-   * Updates the contianer map. This data never changes so we will update this
+   * Updates the container map. This data never changes so we will update this
    * during restarts and it should not hurt us.
    *
+   * Once a CBlockLocalCache cache is registered, requeue dirty/retry log files
+   * for the volume
+   *
   * @param dbPath - DbPath
-   * @param containerList - Contianer List.
+   * @param containerList - Container List.
    */
   public void register(String dbPath, Pipeline[] containerList) {
+    File dbFile = Paths.get(dbPath).toFile();
     pipelineMap.put(dbPath, containerList);
+    checkExistingDirtyLog(dbFile);
   }
 
   private String getDBFileName(String dbPath) {
@@ -363,7 +370,7 @@
     }
     finishCountMap.put(message.getFileName(),
         new FinishCounter(blockCount, message.getDbPath(),
-            message.getFileName()));
+            message.getFileName(), this));
     // should be flip instead of rewind, because we also need to make sure
     // the end position is correct.
     blockIDBuffer.flip();
@@ -473,14 +480,17 @@
     private final String dirtyLogPath;
     private final AtomicLong currentCount;
     private AtomicBoolean fileDeleted;
+    private final ContainerCacheFlusher flusher;
 
     FinishCounter(long expectedCount, String dbPath,
-        String dirtyLogPath) {
+        String dirtyLogPath, ContainerCacheFlusher flusher) throws IOException {
       this.expectedCount = expectedCount;
       this.dbPath = dbPath;
       this.dirtyLogPath = dirtyLogPath;
       this.currentCount = new AtomicLong(0);
       this.fileDeleted = new AtomicBoolean(false);
+      this.flusher = flusher;
+      this.flusher.openDB(dbPath);
     }
 
     public boolean isFileDeleted() {
@@ -494,6 +504,7 @@
         LOG.debug(
             "Deleting {} with count {} {}", filePath, count, expectedCount);
         try {
+          flusher.closeDB(dbPath);
           Path path = Paths.get(filePath);
           Files.delete(path);
           // the following part tries to remove the directory if it is empty
@@ -504,9 +515,8 @@
             Files.delete(parent);
           }*/
           fileDeleted.set(true);
-        } catch (IOException e) {
-          LOG.error(
-              "Error deleting dirty log file {} {}", filePath, e.toString());
+        } catch (Exception e) {
+          LOG.error("Error deleting dirty log file:" + filePath, e);
        }
      }
    }
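
The FinishCounter constructor now takes the flusher and calls openDB(dbPath), which keeps a reference on the LevelDB handle for as long as the dirty log is being drained; closeDB(dbPath) releases it just before the drained log file is deleted. Sketched lifecycle, with the steps between construction and deletion paraphrased from the diff above rather than quoted:

    // Construction pins the DB open (the constructor may now throw IOException):
    FinishCounter counter =
        new FinishCounter(blockCount, dbPath, dirtyLogPath, flusher);
    // ... each flushed block bumps currentCount via the flusher ...
    // Once currentCount reaches expectedCount:
    //   flusher.closeDB(dbPath);                // release the reference taken above
    //   Files.delete(Paths.get(dirtyLogPath));  // then remove the drained dirty log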

AsyncBlockWriter.java

@@ -179,11 +179,11 @@ public class AsyncBlockWriter {
           block.getBlockID(), endTime - startTime, datahash);
     }
     block.clearData();
-    if (blockIDBuffer.remaining() <= (Long.SIZE / Byte.SIZE)) {
-      writeBlockBufferToFile(blockIDBuffer);
-    }
+    parentCache.getTargetMetrics().incNumDirtyLogBlockUpdated();
     blockIDBuffer.putLong(block.getBlockID());
+    if (blockIDBuffer.remaining() == 0) {
+      writeBlockBufferToFile(blockIDBuffer);
+    }
   } else {
     Pipeline pipeline = parentCache.getPipeline(block.getBlockID());
     String containerName = pipeline.getContainerName();
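
The reordering above changes when the dirty-log buffer is flushed: previously the flush ran just before the buffer would have become full, so a completely full buffer could sit until the next write; now the block ID is appended first, the buffer is flushed exactly when it reaches capacity, and every update is counted. Sketch of the corrected sequence, assuming 8-byte block IDs:

    parentCache.getTargetMetrics().incNumDirtyLogBlockUpdated(); // count every dirty-log update
    blockIDBuffer.putLong(block.getBlockID());                   // append the 8-byte block ID
    if (blockIDBuffer.remaining() == 0) {                        // flush only when exactly full
      writeBlockBufferToFile(blockIDBuffer);
    }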

CBlockLocalCache.java

@@ -69,10 +69,9 @@ public class CBlockLocalCache implements CacheModule {
   private final Configuration conf;
   /**
-   * LevelDB cache file, we use an off-heap cache in LevelDB for 256 MB for now.
+   * LevelDB cache file.
    */
   private final LevelDBStore cacheDB;
-  private final int cacheSizeMb = 256;
 
   /**
    * Asyncblock writer updates the cacheDB and writes the blocks async to
@@ -158,9 +157,10 @@
       throw new IllegalArgumentException("Unable to create paths. Path: " +
           dbPath);
     }
-    cacheDB = flusher.openDB(dbPath.toString(), cacheSizeMb);
+    cacheDB = flusher.openDB(dbPath.toString());
     this.containerList = containerPipelines.toArray(new
         Pipeline[containerPipelines.size()]);
+    flusher.register(dbPath.toString(), containerList);
     this.ipAddressString = getHostIP();
     this.tracePrefix = ipAddressString + ":" + this.volumeName;
     this.volumeSize = volumeSize;

SyncBlockReader.java

@@ -136,6 +136,10 @@ public class SyncBlockReader {
           .acquireClient(parentCache.getPipeline(blockID));
       LogicalBlock block = getBlockFromContainer(blockID, client);
       return block;
+    } catch (Exception ex) {
+      parentCache.getTargetMetrics().incNumFailedReadBlocks();
+      LOG.error("read failed for BlockId: {}", blockID, ex);
+      throw ex;
     } finally {
       if (client != null) {
         parentCache.getClientManager().releaseClient(client);

TestLocalBlockCache.java

@@ -518,9 +518,7 @@ public class TestLocalBlockCache {
     // Create a new config so that this tests write metafile to new location
     OzoneConfiguration flushTestConfig = new OzoneConfiguration();
     URL p = flushTestConfig.getClass().getResource("");
-    String path = p.getPath().concat(
-        TestOzoneContainer.class.getSimpleName()
-        + "/testEmptyBlockBufferHandling");
+    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
     flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
     flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
     flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
@@ -528,6 +526,8 @@
     String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
     String userName = "user" + RandomStringUtils.randomNumeric(4);
     String data = RandomStringUtils.random(4 * KB);
+    List<Pipeline> pipelines = getContainerPipeline(10);
+
     CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
     ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
         xceiverClientManager, metrics);
@@ -535,7 +535,7 @@
         .setConfiguration(flushTestConfig)
         .setVolumeName(volumeName)
         .setUserName(userName)
-        .setPipelines(getContainerPipeline(10))
+        .setPipelines(pipelines)
         .setClientManager(xceiverClientManager)
         .setBlockSize(4 * KB)
         .setVolumeSize(50 * GB)
@@ -565,9 +565,21 @@
     ContainerCacheFlusher newFlusher =
         new ContainerCacheFlusher(flushTestConfig,
             xceiverClientManager, newMetrics);
-    Thread fllushListenerThread = new Thread(newFlusher);
-    fllushListenerThread.setDaemon(true);
-    fllushListenerThread.start();
+    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(pipelines)
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(newFlusher)
+        .setCBlockTargetMetrics(newMetrics)
+        .build();
+    newCache.start();
+    Thread flushListenerThread = new Thread(newFlusher);
+    flushListenerThread.setDaemon(true);
+    flushListenerThread.start();
 
     Thread.sleep(5000);
     Assert.assertEquals(metrics.getNumDirtyLogBlockUpdated(),
@@ -575,9 +587,104 @@
     Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead()
         * (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads());
     // Now shutdown again, nothing should be flushed
+    newCache.close();
     newFlusher.shutdown();
     Assert.assertEquals(0, newMetrics.getNumDirtyLogBlockUpdated());
     Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten());
     Assert.assertEquals(0, newMetrics.getNumFailedDirtyBlockFlushes());
   }
+
+  /**
+   * This test writes some block to the cache and then shuts down the cache
+   * The cache is then restarted with "short.circuit.io" disable to check
+   * that the blocks are read correctly from the container.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testContainerWrites() throws IOException,
+      InterruptedException, TimeoutException {
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    URL p = flushTestConfig.getClass().getResource("");
+    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+
+    XceiverClientManager xcm = new XceiverClientManager(flushTestConfig);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+
+    int numUniqueBlocks = 4;
+    String[] data = new String[numUniqueBlocks];
+    String[] dataHash = new String[numUniqueBlocks];
+    for (int i = 0; i < numUniqueBlocks; i++) {
+      data[i] = RandomStringUtils.random(4 * KB);
+      dataHash[i] = DigestUtils.sha256Hex(data[i]);
+    }
+
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xcm, metrics);
+    List<Pipeline> pipelines = getContainerPipeline(10);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(pipelines)
+        .setClientManager(xcm)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+    Thread fllushListenerThread = new Thread(flusher);
+    fllushListenerThread.setDaemon(true);
+    fllushListenerThread.start();
+    Assert.assertTrue(cache.isShortCircuitIOEnabled());
+    // Write data to the cache
+    for (int i = 0; i < 512; i++) {
+      cache.put(i, data[i % numUniqueBlocks].getBytes(StandardCharsets.UTF_8));
+    }
+    // Close the cache and flush the data to the containers
+    cache.close();
+    Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
+    Assert.assertEquals(512, metrics.getNumWriteOps());
+    Thread.sleep(5000);
+    flusher.shutdown();
+    Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
+    Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
+
+    // Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
+    CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher newFlusher =
+        new ContainerCacheFlusher(flushTestConfig, xcm, newMetrics);
+    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(pipelines)
+        .setClientManager(xcm)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(newFlusher)
+        .setCBlockTargetMetrics(newMetrics)
+        .build();
+    newCache.start();
+    Assert.assertFalse(newCache.isShortCircuitIOEnabled());
+    // this read will be from the container, also match the hash
+    for (int i = 0; i < 512; i++) {
+      LogicalBlock block = newCache.get(i);
+      String readHash = DigestUtils.sha256Hex(block.getData().array());
+      Assert.assertEquals("File content does not match, for index:"
+          + i, dataHash[i % numUniqueBlocks], readHash);
+    }
+    Assert.assertEquals(0, newMetrics.getNumReadLostBlocks());
+    Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks());
+    newFlusher.shutdown();
+    newCache.close();
+  }
 }