HDFS-11635. Block Storage: Add metrics for Container Flushes. Contributed by Mukul Kumar Singh.
This commit is contained in:
parent
a2ff806d2e
commit
f4dec00896
|
@ -126,11 +126,11 @@ public final class CBlockConfigKeys {
|
||||||
NORM_PRIORITY;
|
NORM_PRIORITY;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Block Buffer size in 1024 entries, 128 means 128 * 1024 blockIDs.
|
* Block Buffer size in terms of blockID entries, 512 means 512 blockIDs.
|
||||||
*/
|
*/
|
||||||
public static final String DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE =
|
public static final String DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE =
|
||||||
"dfs.cblock.cache.block.buffer.size";
|
"dfs.cblock.cache.block.buffer.size";
|
||||||
public static final int DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT = 128;
|
public static final int DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT = 512;
|
||||||
|
|
||||||
private CBlockConfigKeys() {
|
private CBlockConfigKeys() {
|
||||||
|
|
||||||
|
|
|
@ -34,20 +34,22 @@ import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||||
* as well as the latency time of read and write ops.
|
* as well as the latency time of read and write ops.
|
||||||
*/
|
*/
|
||||||
public class CBlockTargetMetrics {
|
public class CBlockTargetMetrics {
|
||||||
|
// Counter based Metrics
|
||||||
@Metric private MutableCounterLong numReadOps;
|
@Metric private MutableCounterLong numReadOps;
|
||||||
@Metric private MutableCounterLong numWriteOps;
|
@Metric private MutableCounterLong numWriteOps;
|
||||||
@Metric private MutableCounterLong numReadCacheHits;
|
@Metric private MutableCounterLong numReadCacheHits;
|
||||||
@Metric private MutableCounterLong numReadCacheMiss;
|
@Metric private MutableCounterLong numReadCacheMiss;
|
||||||
@Metric private MutableCounterLong numReadLostBlocks;
|
@Metric private MutableCounterLong numReadLostBlocks;
|
||||||
|
@Metric private MutableCounterLong numBlockBufferFlush;
|
||||||
@Metric private MutableCounterLong numDirectBlockWrites;
|
@Metric private MutableCounterLong numDirectBlockWrites;
|
||||||
@Metric private MutableCounterLong numFailedDirectBlockWrites;
|
@Metric private MutableCounterLong numFailedDirectBlockWrites;
|
||||||
|
|
||||||
|
// Latency based Metrics
|
||||||
@Metric private MutableRate dbReadLatency;
|
@Metric private MutableRate dbReadLatency;
|
||||||
@Metric private MutableRate containerReadLatency;
|
@Metric private MutableRate containerReadLatency;
|
||||||
|
|
||||||
@Metric private MutableRate dbWriteLatency;
|
@Metric private MutableRate dbWriteLatency;
|
||||||
@Metric private MutableRate containerWriteLatency;
|
@Metric private MutableRate containerWriteLatency;
|
||||||
|
@Metric private MutableRate blockBufferFlushLatency;
|
||||||
@Metric private MutableRate directBlockWriteLatency;
|
@Metric private MutableRate directBlockWriteLatency;
|
||||||
|
|
||||||
public CBlockTargetMetrics() {
|
public CBlockTargetMetrics() {
|
||||||
|
@ -88,6 +90,10 @@ public class CBlockTargetMetrics {
|
||||||
numFailedDirectBlockWrites.incr();
|
numFailedDirectBlockWrites.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incNumBlockBufferFlush() {
|
||||||
|
numBlockBufferFlush.incr();
|
||||||
|
}
|
||||||
|
|
||||||
public void updateDBReadLatency(long latency) {
|
public void updateDBReadLatency(long latency) {
|
||||||
dbReadLatency.add(latency);
|
dbReadLatency.add(latency);
|
||||||
}
|
}
|
||||||
|
@ -108,6 +114,10 @@ public class CBlockTargetMetrics {
|
||||||
directBlockWriteLatency.add(latency);
|
directBlockWriteLatency.add(latency);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void updateBlockBufferFlushLatency(long latency) {
|
||||||
|
blockBufferFlushLatency.add(latency);
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public long getNumReadOps() {
|
public long getNumReadOps() {
|
||||||
return numReadOps.value();
|
return numReadOps.value();
|
||||||
|
@ -142,4 +152,9 @@ public class CBlockTargetMetrics {
|
||||||
public long getNumFailedDirectBlockWrites() {
|
public long getNumFailedDirectBlockWrites() {
|
||||||
return numFailedDirectBlockWrites.value();
|
return numFailedDirectBlockWrites.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumBlockBufferFlush() {
|
||||||
|
return numBlockBufferFlush.value();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -116,7 +116,7 @@ public class ContainerCacheFlusher implements Runnable {
|
||||||
int threadPri = config.getInt(DFS_CBLOCK_CACHE_THREAD_PRIORITY,
|
int threadPri = config.getInt(DFS_CBLOCK_CACHE_THREAD_PRIORITY,
|
||||||
DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT);
|
DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT);
|
||||||
int blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
|
int blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
|
||||||
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * 1024;
|
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE);
|
||||||
|
|
||||||
LOG.info("Cache: Core Pool Size: {}", corePoolSize);
|
LOG.info("Cache: Core Pool Size: {}", corePoolSize);
|
||||||
LOG.info("Cache: Keep Alive: {}", keepAlive);
|
LOG.info("Cache: Keep Alive: {}", keepAlive);
|
||||||
|
|
|
@ -96,7 +96,7 @@ public class AsyncBlockWriter {
|
||||||
Preconditions.checkNotNull(cache.getCacheDB(), "DB cannot be null.");
|
Preconditions.checkNotNull(cache.getCacheDB(), "DB cannot be null.");
|
||||||
localIoCount = new AtomicLong();
|
localIoCount = new AtomicLong();
|
||||||
blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
|
blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
|
||||||
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * 1024;
|
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE);
|
||||||
LOG.info("Cache: Block Size: {}", blockBufferSize);
|
LOG.info("Cache: Block Size: {}", blockBufferSize);
|
||||||
lock = new ReentrantLock();
|
lock = new ReentrantLock();
|
||||||
notEmpty = lock.newCondition();
|
notEmpty = lock.newCondition();
|
||||||
|
@ -208,6 +208,9 @@ public class AsyncBlockWriter {
|
||||||
parentCache.getTracer().info(
|
parentCache.getTracer().info(
|
||||||
"Task=DirtyBlockLogWrite,Time={}", endTime - startTime);
|
"Task=DirtyBlockLogWrite,Time={}", endTime - startTime);
|
||||||
}
|
}
|
||||||
|
parentCache.getTargetMetrics().incNumBlockBufferFlush();
|
||||||
|
parentCache.getTargetMetrics()
|
||||||
|
.updateBlockBufferFlushLatency(endTime - startTime);
|
||||||
}
|
}
|
||||||
blockIDBuffer.putLong(block.getBlockID());
|
blockIDBuffer.putLong(block.getBlockID());
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,9 +55,16 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import static java.lang.Math.abs;
|
import static java.lang.Math.abs;
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_KEY;
|
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
|
DFS_CBLOCK_DISK_CACHE_PATH_KEY;
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO;
|
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||||
|
DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||||
|
DFS_CBLOCK_TRACE_IO;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests for Tests for local cache.
|
* Tests for Tests for local cache.
|
||||||
|
@ -231,6 +238,10 @@ public class TestLocalBlockCache {
|
||||||
long endTime = Time.monotonicNow();
|
long endTime = Time.monotonicNow();
|
||||||
LOG.info("Time taken for writing {} blocks is {} seconds", totalBlocks,
|
LOG.info("Time taken for writing {} blocks is {} seconds", totalBlocks,
|
||||||
TimeUnit.MILLISECONDS.toSeconds(endTime - startTime));
|
TimeUnit.MILLISECONDS.toSeconds(endTime - startTime));
|
||||||
|
long blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
|
||||||
|
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT);
|
||||||
|
Assert.assertEquals(metrics.getNumWriteOps() / blockBufferSize,
|
||||||
|
metrics.getNumBlockBufferFlush());
|
||||||
// TODO: Read this data back.
|
// TODO: Read this data back.
|
||||||
cache.close();
|
cache.close();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue