HDFS-11627. Block Storage: Cblock cache should register with flusher to upload blocks to containers. Contributed by Mukul Kumar Singh.

This commit is contained in:
Chen Liang 2017-04-26 10:36:56 -07:00 committed by Owen O'Malley
parent 668b056984
commit e9588c6422
8 changed files with 202 additions and 37 deletions

View File

@ -156,6 +156,10 @@ public final class CBlockConfigKeys {
public static final int DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT = public static final int DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT =
5; 5;
// LevelDB cache file uses an off-heap cache in LevelDB of 256 MB.
public static final String DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY =
"dfs.cblock.cache.leveldb.cache.size.mb";
public static final int DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT = 256;
private CBlockConfigKeys() { private CBlockConfigKeys() {

View File

@ -79,6 +79,7 @@ public class BlockWriterTask implements Runnable {
incTryCount(); incTryCount();
Pipeline pipeline = flusher.getPipeline(this.dbPath, block.getBlockID()); Pipeline pipeline = flusher.getPipeline(this.dbPath, block.getBlockID());
client = flusher.getXceiverClientManager().acquireClient(pipeline); client = flusher.getXceiverClientManager().acquireClient(pipeline);
containerName = pipeline.getContainerName();
byte[] keybuf = Longs.toByteArray(block.getBlockID()); byte[] keybuf = Longs.toByteArray(block.getBlockID());
byte[] data; byte[] data;
long startTime = Time.monotonicNow(); long startTime = Time.monotonicNow();
@ -97,11 +98,16 @@ public class BlockWriterTask implements Runnable {
flusher.incrementRemoteIO(); flusher.incrementRemoteIO();
} catch (IOException ex) { } catch (Exception ex) {
flusher.getLOG().error("Writing of block failed, We have attempted " + flusher.getLOG().error("Writing of block failed, We have attempted " +
"to write this block {} times to the container {}.Trace ID:{}", "to write this block {} times to the container {}.Trace ID:{}",
this.getTryCount(), containerName, "", ex); this.getTryCount(), containerName, "", ex);
writeRetryBlock(block); writeRetryBlock(block);
if (ex instanceof IOException) {
flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks();
} else {
flusher.getTargetMetrics().incNumWriteGenericExceptionRetryBlocks();
}
} finally { } finally {
flusher.incFinishCount(fileName); flusher.incFinishCount(fileName);
if(client != null) { if(client != null) {

View File

@ -34,19 +34,26 @@ import org.apache.hadoop.metrics2.lib.MutableRate;
* as well as the latency time of read and write ops. * as well as the latency time of read and write ops.
*/ */
public class CBlockTargetMetrics { public class CBlockTargetMetrics {
// Counter based Metrics // IOPS based Metrics
@Metric private MutableCounterLong numReadOps; @Metric private MutableCounterLong numReadOps;
@Metric private MutableCounterLong numWriteOps; @Metric private MutableCounterLong numWriteOps;
@Metric private MutableCounterLong numReadCacheHits; @Metric private MutableCounterLong numReadCacheHits;
@Metric private MutableCounterLong numReadCacheMiss; @Metric private MutableCounterLong numReadCacheMiss;
@Metric private MutableCounterLong numReadLostBlocks;
@Metric private MutableCounterLong numBlockBufferFlush; // Cblock internal Metrics
@Metric private MutableCounterLong numDirectBlockWrites; @Metric private MutableCounterLong numDirectBlockWrites;
@Metric private MutableCounterLong numFailedDirectBlockWrites; @Metric private MutableCounterLong numBlockBufferFlush;
@Metric private MutableCounterLong numDirtyLogBlockRead; @Metric private MutableCounterLong numDirtyLogBlockRead;
@Metric private MutableCounterLong numBytesDirtyLogRead;
@Metric private MutableCounterLong numDirtyLogBlockUpdated; @Metric private MutableCounterLong numDirtyLogBlockUpdated;
@Metric private MutableCounterLong numBytesDirtyLogRead;
@Metric private MutableCounterLong numBytesDirtyLogWritten; @Metric private MutableCounterLong numBytesDirtyLogWritten;
// Failure Metrics
@Metric private MutableCounterLong numReadLostBlocks;
@Metric private MutableCounterLong numFailedReadBlocks;
@Metric private MutableCounterLong numWriteIOExceptionRetryBlocks;
@Metric private MutableCounterLong numWriteGenericExceptionRetryBlocks;
@Metric private MutableCounterLong numFailedDirectBlockWrites;
@Metric private MutableCounterLong numFailedDirtyBlockFlushes; @Metric private MutableCounterLong numFailedDirtyBlockFlushes;
// Latency based Metrics // Latency based Metrics
@ -91,10 +98,22 @@ public class CBlockTargetMetrics {
numDirectBlockWrites.incr(); numDirectBlockWrites.incr();
} }
public void incNumWriteIOExceptionRetryBlocks() {
numWriteIOExceptionRetryBlocks.incr();
}
public void incNumWriteGenericExceptionRetryBlocks() {
numWriteGenericExceptionRetryBlocks.incr();
}
public void incNumFailedDirectBlockWrites() { public void incNumFailedDirectBlockWrites() {
numFailedDirectBlockWrites.incr(); numFailedDirectBlockWrites.incr();
} }
public void incNumFailedReadBlocks() {
numFailedReadBlocks.incr();
}
public void incNumBlockBufferFlush() { public void incNumBlockBufferFlush() {
numBlockBufferFlush.incr(); numBlockBufferFlush.incr();
} }
@ -178,6 +197,21 @@ public class CBlockTargetMetrics {
return numFailedDirectBlockWrites.value(); return numFailedDirectBlockWrites.value();
} }
@VisibleForTesting
public long getNumFailedReadBlocks() {
return numFailedReadBlocks.value();
}
@VisibleForTesting
public long getNumWriteIOExceptionRetryBlocks() {
return numWriteIOExceptionRetryBlocks.value();
}
@VisibleForTesting
public long getNumWriteGenericExceptionRetryBlocks() {
return numWriteGenericExceptionRetryBlocks.value();
}
@VisibleForTesting @VisibleForTesting
public long getNumBlockBufferFlush() { public long getNumBlockBufferFlush() {
return numBlockBufferFlush.value(); return numBlockBufferFlush.value();

View File

@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
import org.apache.hadoop.cblock.jscsiHelper.cache.impl.DiskBlock; import org.apache.hadoop.cblock.jscsiHelper.cache.impl.DiskBlock;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.XceiverClientManager; import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.utils.LevelDBStore; import org.apache.hadoop.utils.LevelDBStore;
@ -77,8 +78,10 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_CACHE_THREAD_PRIORITY; .DFS_CBLOCK_CACHE_THREAD_PRIORITY;
import static org.apache.hadoop.cblock.CBlockConfigKeys import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT; .DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT; import static org.apache.hadoop.cblock.CBlockConfigKeys
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_KEY; .DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT;
/** /**
* Class that writes to remote containers. * Class that writes to remote containers.
@ -96,6 +99,7 @@ public class ContainerCacheFlusher implements Runnable {
private final XceiverClientManager xceiverClientManager; private final XceiverClientManager xceiverClientManager;
private final CBlockTargetMetrics metrics; private final CBlockTargetMetrics metrics;
private AtomicBoolean shutdown; private AtomicBoolean shutdown;
private final long levelDBCacheSize;
private final ConcurrentMap<String, FinishCounter> finishCountMap; private final ConcurrentMap<String, FinishCounter> finishCountMap;
@ -117,6 +121,8 @@ public class ContainerCacheFlusher implements Runnable {
DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT); DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT);
int blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, int blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE); DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE);
levelDBCacheSize = config.getInt(DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY,
DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT) * OzoneConsts.MB;
LOG.info("Cache: Core Pool Size: {}", corePoolSize); LOG.info("Cache: Core Pool Size: {}", corePoolSize);
LOG.info("Cache: Keep Alive: {}", keepAlive); LOG.info("Cache: Keep Alive: {}", keepAlive);
@ -146,17 +152,14 @@ public class ContainerCacheFlusher implements Runnable {
this.remoteIO = new AtomicLong(); this.remoteIO = new AtomicLong();
this.finishCountMap = new ConcurrentHashMap<>(); this.finishCountMap = new ConcurrentHashMap<>();
checkExisitingDirtyLog(config);
} }
private void checkExisitingDirtyLog(Configuration config) { private void checkExistingDirtyLog(File dbPath) {
File dbPath = Paths.get(config.get(DFS_CBLOCK_DISK_CACHE_PATH_KEY,
DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT)).toFile();
if (!dbPath.exists()) { if (!dbPath.exists()) {
LOG.info("No existing dirty log found at {}", dbPath); LOG.debug("No existing dirty log found at {}", dbPath);
return; return;
} }
LOG.info("Need to check and requeue existing dirty log {}", dbPath); LOG.debug("Need to check and requeue existing dirty log {}", dbPath);
HashMap<String, ArrayList<String>> allFiles = new HashMap<>(); HashMap<String, ArrayList<String>> allFiles = new HashMap<>();
traverse(dbPath, allFiles); traverse(dbPath, allFiles);
for (Map.Entry<String, ArrayList<String>> entry : allFiles.entrySet()) { for (Map.Entry<String, ArrayList<String>> entry : allFiles.entrySet()) {
@ -237,11 +240,10 @@ public class ContainerCacheFlusher implements Runnable {
* Opens a DB if needed or returns a handle to an already open DB. * Opens a DB if needed or returns a handle to an already open DB.
* *
* @param dbPath -- dbPath * @param dbPath -- dbPath
* @param cacheSize - cacheSize
* @return the levelDB on the given path. * @return the levelDB on the given path.
* @throws IOException * @throws IOException
*/ */
public synchronized LevelDBStore openDB(String dbPath, int cacheSize) public synchronized LevelDBStore openDB(String dbPath)
throws IOException { throws IOException {
if (dbMap.containsKey(dbPath)) { if (dbMap.containsKey(dbPath)) {
RefCountedDB refDB = dbMap.get(dbPath); RefCountedDB refDB = dbMap.get(dbPath);
@ -249,7 +251,7 @@ public class ContainerCacheFlusher implements Runnable {
return refDB.db; return refDB.db;
} else { } else {
Options options = new Options(); Options options = new Options();
options.cacheSize(cacheSize * (1024L * 1024L)); options.cacheSize(levelDBCacheSize);
options.createIfMissing(true); options.createIfMissing(true);
LevelDBStore cacheDB = new LevelDBStore( LevelDBStore cacheDB = new LevelDBStore(
new File(getDBFileName(dbPath)), options); new File(getDBFileName(dbPath)), options);
@ -260,14 +262,19 @@ public class ContainerCacheFlusher implements Runnable {
} }
/** /**
* Updates the contianer map. This data never changes so we will update this * Updates the container map. This data never changes so we will update this
* during restarts and it should not hurt us. * during restarts and it should not hurt us.
* *
* Once a CBlockLocalCache cache is registered, requeue dirty/retry log files
* for the volume
*
* @param dbPath - DbPath * @param dbPath - DbPath
* @param containerList - Contianer List. * @param containerList - Container List.
*/ */
public void register(String dbPath, Pipeline[] containerList) { public void register(String dbPath, Pipeline[] containerList) {
File dbFile = Paths.get(dbPath).toFile();
pipelineMap.put(dbPath, containerList); pipelineMap.put(dbPath, containerList);
checkExistingDirtyLog(dbFile);
} }
private String getDBFileName(String dbPath) { private String getDBFileName(String dbPath) {
@ -363,7 +370,7 @@ public class ContainerCacheFlusher implements Runnable {
} }
finishCountMap.put(message.getFileName(), finishCountMap.put(message.getFileName(),
new FinishCounter(blockCount, message.getDbPath(), new FinishCounter(blockCount, message.getDbPath(),
message.getFileName())); message.getFileName(), this));
// should be flip instead of rewind, because we also need to make sure // should be flip instead of rewind, because we also need to make sure
// the end position is correct. // the end position is correct.
blockIDBuffer.flip(); blockIDBuffer.flip();
@ -473,14 +480,17 @@ public class ContainerCacheFlusher implements Runnable {
private final String dirtyLogPath; private final String dirtyLogPath;
private final AtomicLong currentCount; private final AtomicLong currentCount;
private AtomicBoolean fileDeleted; private AtomicBoolean fileDeleted;
private final ContainerCacheFlusher flusher;
FinishCounter(long expectedCount, String dbPath, FinishCounter(long expectedCount, String dbPath,
String dirtyLogPath) { String dirtyLogPath, ContainerCacheFlusher flusher) throws IOException {
this.expectedCount = expectedCount; this.expectedCount = expectedCount;
this.dbPath = dbPath; this.dbPath = dbPath;
this.dirtyLogPath = dirtyLogPath; this.dirtyLogPath = dirtyLogPath;
this.currentCount = new AtomicLong(0); this.currentCount = new AtomicLong(0);
this.fileDeleted = new AtomicBoolean(false); this.fileDeleted = new AtomicBoolean(false);
this.flusher = flusher;
this.flusher.openDB(dbPath);
} }
public boolean isFileDeleted() { public boolean isFileDeleted() {
@ -494,6 +504,7 @@ public class ContainerCacheFlusher implements Runnable {
LOG.debug( LOG.debug(
"Deleting {} with count {} {}", filePath, count, expectedCount); "Deleting {} with count {} {}", filePath, count, expectedCount);
try { try {
flusher.closeDB(dbPath);
Path path = Paths.get(filePath); Path path = Paths.get(filePath);
Files.delete(path); Files.delete(path);
// the following part tries to remove the directory if it is empty // the following part tries to remove the directory if it is empty
@ -504,9 +515,8 @@ public class ContainerCacheFlusher implements Runnable {
Files.delete(parent); Files.delete(parent);
}*/ }*/
fileDeleted.set(true); fileDeleted.set(true);
} catch (IOException e) { } catch (Exception e) {
LOG.error( LOG.error("Error deleting dirty log file:" + filePath, e);
"Error deleting dirty log file {} {}", filePath, e.toString());
} }
} }
} }

View File

@ -179,11 +179,11 @@ public class AsyncBlockWriter {
block.getBlockID(), endTime - startTime, datahash); block.getBlockID(), endTime - startTime, datahash);
} }
block.clearData(); block.clearData();
if (blockIDBuffer.remaining() <= (Long.SIZE / Byte.SIZE)) {
writeBlockBufferToFile(blockIDBuffer);
}
parentCache.getTargetMetrics().incNumDirtyLogBlockUpdated(); parentCache.getTargetMetrics().incNumDirtyLogBlockUpdated();
blockIDBuffer.putLong(block.getBlockID()); blockIDBuffer.putLong(block.getBlockID());
if (blockIDBuffer.remaining() == 0) {
writeBlockBufferToFile(blockIDBuffer);
}
} else { } else {
Pipeline pipeline = parentCache.getPipeline(block.getBlockID()); Pipeline pipeline = parentCache.getPipeline(block.getBlockID());
String containerName = pipeline.getContainerName(); String containerName = pipeline.getContainerName();

View File

@ -69,10 +69,9 @@ public class CBlockLocalCache implements CacheModule {
private final Configuration conf; private final Configuration conf;
/** /**
* LevelDB cache file, we use an off-heap cache in LevelDB for 256 MB for now. * LevelDB cache file.
*/ */
private final LevelDBStore cacheDB; private final LevelDBStore cacheDB;
private final int cacheSizeMb = 256;
/** /**
* Asyncblock writer updates the cacheDB and writes the blocks async to * Asyncblock writer updates the cacheDB and writes the blocks async to
@ -158,9 +157,10 @@ public class CBlockLocalCache implements CacheModule {
throw new IllegalArgumentException("Unable to create paths. Path: " + throw new IllegalArgumentException("Unable to create paths. Path: " +
dbPath); dbPath);
} }
cacheDB = flusher.openDB(dbPath.toString(), cacheSizeMb); cacheDB = flusher.openDB(dbPath.toString());
this.containerList = containerPipelines.toArray(new this.containerList = containerPipelines.toArray(new
Pipeline[containerPipelines.size()]); Pipeline[containerPipelines.size()]);
flusher.register(dbPath.toString(), containerList);
this.ipAddressString = getHostIP(); this.ipAddressString = getHostIP();
this.tracePrefix = ipAddressString + ":" + this.volumeName; this.tracePrefix = ipAddressString + ":" + this.volumeName;
this.volumeSize = volumeSize; this.volumeSize = volumeSize;

View File

@ -136,6 +136,10 @@ public class SyncBlockReader {
.acquireClient(parentCache.getPipeline(blockID)); .acquireClient(parentCache.getPipeline(blockID));
LogicalBlock block = getBlockFromContainer(blockID, client); LogicalBlock block = getBlockFromContainer(blockID, client);
return block; return block;
} catch (Exception ex) {
parentCache.getTargetMetrics().incNumFailedReadBlocks();
LOG.error("read failed for BlockId: {}", blockID, ex);
throw ex;
} finally { } finally {
if (client != null) { if (client != null) {
parentCache.getClientManager().releaseClient(client); parentCache.getClientManager().releaseClient(client);

View File

@ -518,9 +518,7 @@ public class TestLocalBlockCache {
// Create a new config so that this tests write metafile to new location // Create a new config so that this tests write metafile to new location
OzoneConfiguration flushTestConfig = new OzoneConfiguration(); OzoneConfiguration flushTestConfig = new OzoneConfiguration();
URL p = flushTestConfig.getClass().getResource(""); URL p = flushTestConfig.getClass().getResource("");
String path = p.getPath().concat( String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
TestOzoneContainer.class.getSimpleName()
+ "/testEmptyBlockBufferHandling");
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
@ -528,6 +526,8 @@ public class TestLocalBlockCache {
String volumeName = "volume" + RandomStringUtils.randomNumeric(4); String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
String userName = "user" + RandomStringUtils.randomNumeric(4); String userName = "user" + RandomStringUtils.randomNumeric(4);
String data = RandomStringUtils.random(4 * KB); String data = RandomStringUtils.random(4 * KB);
List<Pipeline> pipelines = getContainerPipeline(10);
CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
xceiverClientManager, metrics); xceiverClientManager, metrics);
@ -535,7 +535,7 @@ public class TestLocalBlockCache {
.setConfiguration(flushTestConfig) .setConfiguration(flushTestConfig)
.setVolumeName(volumeName) .setVolumeName(volumeName)
.setUserName(userName) .setUserName(userName)
.setPipelines(getContainerPipeline(10)) .setPipelines(pipelines)
.setClientManager(xceiverClientManager) .setClientManager(xceiverClientManager)
.setBlockSize(4 * KB) .setBlockSize(4 * KB)
.setVolumeSize(50 * GB) .setVolumeSize(50 * GB)
@ -565,9 +565,21 @@ public class TestLocalBlockCache {
ContainerCacheFlusher newFlusher = ContainerCacheFlusher newFlusher =
new ContainerCacheFlusher(flushTestConfig, new ContainerCacheFlusher(flushTestConfig,
xceiverClientManager, newMetrics); xceiverClientManager, newMetrics);
Thread fllushListenerThread = new Thread(newFlusher); CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
fllushListenerThread.setDaemon(true); .setConfiguration(flushTestConfig)
fllushListenerThread.start(); .setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(pipelines)
.setClientManager(xceiverClientManager)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(newFlusher)
.setCBlockTargetMetrics(newMetrics)
.build();
newCache.start();
Thread flushListenerThread = new Thread(newFlusher);
flushListenerThread.setDaemon(true);
flushListenerThread.start();
Thread.sleep(5000); Thread.sleep(5000);
Assert.assertEquals(metrics.getNumDirtyLogBlockUpdated(), Assert.assertEquals(metrics.getNumDirtyLogBlockUpdated(),
@ -575,9 +587,104 @@ public class TestLocalBlockCache {
Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead() Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead()
* (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads()); * (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads());
// Now shutdown again, nothing should be flushed // Now shutdown again, nothing should be flushed
newCache.close();
newFlusher.shutdown(); newFlusher.shutdown();
Assert.assertEquals(0, newMetrics.getNumDirtyLogBlockUpdated()); Assert.assertEquals(0, newMetrics.getNumDirtyLogBlockUpdated());
Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten()); Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten());
Assert.assertEquals(0, newMetrics.getNumFailedDirtyBlockFlushes()); Assert.assertEquals(0, newMetrics.getNumFailedDirtyBlockFlushes());
} }
/**
* This test writes some block to the cache and then shuts down the cache
* The cache is then restarted with "short.circuit.io" disable to check
* that the blocks are read correctly from the container.
*
* @throws IOException
*/
@Test
public void testContainerWrites() throws IOException,
InterruptedException, TimeoutException {
// Create a new config so that this tests write metafile to new location
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
URL p = flushTestConfig.getClass().getResource("");
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
XceiverClientManager xcm = new XceiverClientManager(flushTestConfig);
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
String userName = "user" + RandomStringUtils.randomNumeric(4);
int numUniqueBlocks = 4;
String[] data = new String[numUniqueBlocks];
String[] dataHash = new String[numUniqueBlocks];
for (int i = 0; i < numUniqueBlocks; i++) {
data[i] = RandomStringUtils.random(4 * KB);
dataHash[i] = DigestUtils.sha256Hex(data[i]);
}
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
xcm, metrics);
List<Pipeline> pipelines = getContainerPipeline(10);
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(pipelines)
.setClientManager(xcm)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
Thread fllushListenerThread = new Thread(flusher);
fllushListenerThread.setDaemon(true);
fllushListenerThread.start();
Assert.assertTrue(cache.isShortCircuitIOEnabled());
// Write data to the cache
for (int i = 0; i < 512; i++) {
cache.put(i, data[i % numUniqueBlocks].getBytes(StandardCharsets.UTF_8));
}
// Close the cache and flush the data to the containers
cache.close();
Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
Assert.assertEquals(512, metrics.getNumWriteOps());
Thread.sleep(5000);
flusher.shutdown();
Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
// Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
ContainerCacheFlusher newFlusher =
new ContainerCacheFlusher(flushTestConfig, xcm, newMetrics);
CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(pipelines)
.setClientManager(xcm)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(newFlusher)
.setCBlockTargetMetrics(newMetrics)
.build();
newCache.start();
Assert.assertFalse(newCache.isShortCircuitIOEnabled());
// this read will be from the container, also match the hash
for (int i = 0; i < 512; i++) {
LogicalBlock block = newCache.get(i);
String readHash = DigestUtils.sha256Hex(block.getData().array());
Assert.assertEquals("File content does not match, for index:"
+ i, dataHash[i % numUniqueBlocks], readHash);
}
Assert.assertEquals(0, newMetrics.getNumReadLostBlocks());
Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks());
newFlusher.shutdown();
newCache.close();
}
} }