diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
index 74f5dc69f2c..c7222ca534d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
@@ -132,6 +132,11 @@ public final class CBlockConfigKeys {
"dfs.cblock.cache.block.buffer.size";
public static final int DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT = 512;
+ public static final String DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS =
+ "dfs.cblock.block.buffer.flush.interval.seconds";
+ public static final int
+ DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS_DEFAULT = 60;
+
// jscsi server settings
public static final String DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY =
"dfs.cblock.jscsi.server.address";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
index 1174c332df6..5efb4612651 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
@@ -39,14 +39,15 @@ public class CBlockTargetMetrics {
@Metric private MutableCounterLong numWriteOps;
@Metric private MutableCounterLong numReadCacheHits;
@Metric private MutableCounterLong numReadCacheMiss;
+ @Metric private MutableCounterLong numDirectBlockWrites;
// Cblock internal Metrics
- @Metric private MutableCounterLong numDirectBlockWrites;
- @Metric private MutableCounterLong numBlockBufferFlush;
@Metric private MutableCounterLong numDirtyLogBlockRead;
- @Metric private MutableCounterLong numDirtyLogBlockUpdated;
@Metric private MutableCounterLong numBytesDirtyLogRead;
@Metric private MutableCounterLong numBytesDirtyLogWritten;
+ @Metric private MutableCounterLong numBlockBufferFlushCompleted;
+ @Metric private MutableCounterLong numBlockBufferFlushTriggered;
+ @Metric private MutableCounterLong numBlockBufferUpdates;
// Failure Metrics
@Metric private MutableCounterLong numReadLostBlocks;
@@ -54,7 +55,10 @@ public class CBlockTargetMetrics {
@Metric private MutableCounterLong numWriteIOExceptionRetryBlocks;
@Metric private MutableCounterLong numWriteGenericExceptionRetryBlocks;
@Metric private MutableCounterLong numFailedDirectBlockWrites;
- @Metric private MutableCounterLong numFailedDirtyBlockFlushes;
+ @Metric private MutableCounterLong numIllegalDirtyLogFiles;
+ @Metric private MutableCounterLong numFailedDirtyLogFileDeletes;
+ @Metric private MutableCounterLong numFailedBlockBufferFlushes;
+ @Metric private MutableCounterLong numInterruptedBufferWaits;
// Latency based Metrics
@Metric private MutableRate dbReadLatency;
@@ -114,8 +118,12 @@ public class CBlockTargetMetrics {
numFailedReadBlocks.incr();
}
- public void incNumBlockBufferFlush() {
- numBlockBufferFlush.incr();
+ public void incNumBlockBufferFlushCompleted() {
+ numBlockBufferFlushCompleted.incr();
+ }
+
+ public void incNumBlockBufferFlushTriggered() {
+ numBlockBufferFlushTriggered.incr();
}
public void incNumDirtyLogBlockRead() {
@@ -126,16 +134,28 @@ public class CBlockTargetMetrics {
numBytesDirtyLogRead.incr(bytes);
}
- public void incNumDirtyLogBlockUpdated() {
- numDirtyLogBlockUpdated.incr();
+ public void incNumBlockBufferUpdates() {
+ numBlockBufferUpdates.incr();
}
public void incNumBytesDirtyLogWritten(int bytes) {
numBytesDirtyLogWritten.incr(bytes);
}
- public void incNumFailedDirtyBlockFlushes() {
- numFailedDirtyBlockFlushes.incr();
+ public void incNumFailedBlockBufferFlushes() {
+ numFailedBlockBufferFlushes.incr();
+ }
+
+ public void incNumInterruptedBufferWaits() {
+ numInterruptedBufferWaits.incr();
+ }
+
+ public void incNumIllegalDirtyLogFiles() {
+ numIllegalDirtyLogFiles.incr();
+ }
+
+ public void incNumFailedDirtyLogFileDeletes() {
+ numFailedDirtyLogFileDeletes.incr();
}
public void updateDBReadLatency(long latency) {
@@ -213,8 +233,13 @@ public class CBlockTargetMetrics {
}
@VisibleForTesting
- public long getNumBlockBufferFlush() {
- return numBlockBufferFlush.value();
+ public long getNumBlockBufferFlushCompleted() {
+ return numBlockBufferFlushCompleted.value();
+ }
+
+ @VisibleForTesting
+ public long getNumBlockBufferFlushTriggered() {
+ return numBlockBufferFlushTriggered.value();
}
@VisibleForTesting
@@ -228,8 +253,8 @@ public class CBlockTargetMetrics {
}
@VisibleForTesting
- public long getNumDirtyLogBlockUpdated() {
- return numDirtyLogBlockUpdated.value();
+ public long getNumBlockBufferUpdates() {
+ return numBlockBufferUpdates.value();
}
@VisibleForTesting
@@ -238,7 +263,22 @@ public class CBlockTargetMetrics {
}
@VisibleForTesting
- public long getNumFailedDirtyBlockFlushes() {
- return numFailedDirtyBlockFlushes.value();
+ public long getNumFailedBlockBufferFlushes() {
+ return numFailedBlockBufferFlushes.value();
+ }
+
+ @VisibleForTesting
+ public long getNumInterruptedBufferWaits() {
+ return numInterruptedBufferWaits.value();
+ }
+
+ @VisibleForTesting
+ public long getNumIllegalDirtyLogFiles() {
+ return numIllegalDirtyLogFiles.value();
+ }
+
+ @VisibleForTesting
+ public long getNumFailedDirtyLogFileDeletes() {
+ return numFailedDirtyLogFileDeletes.value();
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
index 905d9bade3d..c796f734f1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
@@ -363,6 +363,7 @@ public class ContainerCacheFlusher implements Runnable {
if (finishCountMap.containsKey(message.getFileName())) {
// In theory this should never happen. But if it happened,
// we need to know it...
+ getTargetMetrics().incNumIllegalDirtyLogFiles();
LOG.error("Adding DirtyLog file again {} current count {} new {}",
message.getFileName(),
finishCountMap.get(message.getFileName()).expectedCount,
@@ -516,6 +517,7 @@ public class ContainerCacheFlusher implements Runnable {
}*/
fileDeleted.set(true);
} catch (Exception e) {
+ flusher.getTargetMetrics().incNumFailedDirtyLogFileDeletes();
LOG.error("Error deleting dirty log file:" + filePath, e);
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
index 1273cd286c2..d8e98391fbc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
@@ -32,19 +32,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
-
/**
* A Queue that is used to write blocks asynchronously to the container.
*/
@@ -52,12 +45,6 @@ public class AsyncBlockWriter {
private static final Logger LOG =
LoggerFactory.getLogger(AsyncBlockWriter.class);
- /**
- * Right now we have a single buffer and we block when we write it to
- * the file.
- */
- private final ByteBuffer blockIDBuffer;
-
/**
* XceiverClientManager is used to get client connections to a set of
* machines.
@@ -80,8 +67,8 @@ public class AsyncBlockWriter {
* The cache this writer is operating against.
*/
private final CBlockLocalCache parentCache;
- private final int blockBufferSize;
- private final static String DIRTY_LOG_PREFIX = "DirtyLog";
+ private final BlockBufferManager blockBufferManager;
+ public final static String DIRTY_LOG_PREFIX = "DirtyLog";
private AtomicLong localIoCount;
/**
@@ -95,14 +82,11 @@ public class AsyncBlockWriter {
Preconditions.checkNotNull(cache, "Cache cannot be null.");
Preconditions.checkNotNull(cache.getCacheDB(), "DB cannot be null.");
localIoCount = new AtomicLong();
- blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
- DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE);
- LOG.info("Cache: Block Size: {}", blockBufferSize);
lock = new ReentrantLock();
notEmpty = lock.newCondition();
parentCache = cache;
xceiverClientManager = cache.getClientManager();
- blockIDBuffer = ByteBuffer.allocateDirect(blockBufferSize);
+ blockBufferManager = new BlockBufferManager(config, parentCache);
}
public void start() throws IOException {
@@ -113,6 +97,7 @@ public class AsyncBlockWriter {
throw new IllegalStateException("Cache Directory create failed, Cannot " +
"continue. Log Dir: {}" + logDir);
}
+ blockBufferManager.start();
}
/**
@@ -179,11 +164,7 @@ public class AsyncBlockWriter {
block.getBlockID(), endTime - startTime, datahash);
}
block.clearData();
- parentCache.getTargetMetrics().incNumDirtyLogBlockUpdated();
- blockIDBuffer.putLong(block.getBlockID());
- if (blockIDBuffer.remaining() == 0) {
- writeBlockBufferToFile(blockIDBuffer);
- }
+ blockBufferManager.addToBlockBuffer(block.getBlockID());
} else {
Pipeline pipeline = parentCache.getPipeline(block.getBlockID());
String containerName = pipeline.getContainerName();
@@ -215,69 +196,11 @@ public class AsyncBlockWriter {
}
}
- /**
- * Write Block Buffer to file.
- *
- * @param blockBuffer - ByteBuffer
- * @throws IOException
- */
- private synchronized void writeBlockBufferToFile(ByteBuffer blockBuffer)
- throws IOException {
- long startTime = Time.monotonicNow();
- boolean append = false;
- int bytesWritten = 0;
-
- // If there is nothing written to blockId buffer,
- // then skip flushing of blockId buffer
- if (blockBuffer.position() == 0) {
- return;
- }
-
- blockBuffer.flip();
- String fileName =
- String.format("%s.%s", DIRTY_LOG_PREFIX, Time.monotonicNow());
- String log = Paths.get(parentCache.getDbPath().toString(), fileName)
- .toString();
-
- try {
- FileChannel channel = new FileOutputStream(log, append).getChannel();
- bytesWritten = channel.write(blockBuffer);
- } catch (Exception ex) {
- LOG.error("Unable to sync the Block map to disk -- This might cause a " +
- "data loss or corruption", ex);
- parentCache.getTargetMetrics().incNumFailedDirtyBlockFlushes();
- throw ex;
- } finally {
- blockBuffer.clear();
- }
-
- parentCache.processDirtyMessage(fileName);
- blockIDBuffer.clear();
- long endTime = Time.monotonicNow();
- if (parentCache.isTraceEnabled()) {
- parentCache.getTracer().info(
- "Task=DirtyBlockLogWrite,Time={} bytesWritten={}",
- endTime - startTime, bytesWritten);
- }
-
- parentCache.getTargetMetrics().incNumBytesDirtyLogWritten(bytesWritten);
- parentCache.getTargetMetrics().incNumBlockBufferFlush();
- parentCache.getTargetMetrics()
- .updateBlockBufferFlushLatency(endTime - startTime);
- LOG.debug("Block buffer writer bytesWritten:{} Time:{}",
- bytesWritten, endTime - startTime);
- }
-
/**
* Shutdown by writing any pending I/O to dirtylog buffer.
*/
public void shutdown() {
- try {
- writeBlockBufferToFile(this.blockIDBuffer);
- } catch (IOException e) {
- LOG.error("Unable to sync the Block map to disk -- This might cause a " +
- "data loss or corruption");
- }
+ blockBufferManager.shutdown();
}
/**
* Returns tracer.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java
new file mode 100644
index 00000000000..c61a7a4c23d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
+
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Paths;
+
+/**
+ * This task is responsible for flushing the BlockIDBuffer
+ * to Dirty Log File. This Dirty Log file is used later by
+ * ContainerCacheFlusher when the data is written to container
+ */
+public class BlockBufferFlushTask implements Runnable {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BlockBufferFlushTask.class);
+ private final CBlockLocalCache parentCache;
+ private final BlockBufferManager bufferManager;
+ private final ByteBuffer blockIDBuffer;
+
+ BlockBufferFlushTask(ByteBuffer blockIDBuffer, CBlockLocalCache parentCache,
+ BlockBufferManager manager) {
+ this.parentCache = parentCache;
+ this.bufferManager = manager;
+ this.blockIDBuffer = blockIDBuffer;
+ }
+
+ /**
+ * When an object implementing interface Runnable
is used
+ * to create a thread, starting the thread causes the object's
+ * run
method to be called in that separately executing
+ * thread.
+ *
+ * The general contract of the method run
is that it may
+ * take any action whatsoever.
+ *
+ * @see Thread#run()
+ */
+ @Override
+ public void run() {
+ try {
+ writeBlockBufferToFile(blockIDBuffer);
+ } catch (Exception e) {
+ parentCache.getTargetMetrics().incNumFailedBlockBufferFlushes();
+ LOG.error("Unable to sync the Block map to disk with "
+ + (blockIDBuffer.position() / Long.SIZE) + "entries "
+ + "-- NOTE: This might cause a data loss or corruption", e);
+ } finally {
+ bufferManager.releaseBuffer(blockIDBuffer);
+ }
+ }
+
+ /**
+ * Write Block Buffer to file.
+ *
+ * @param buffer - ByteBuffer
+ * @throws IOException
+ */
+ private void writeBlockBufferToFile(ByteBuffer buffer)
+ throws IOException {
+ long startTime = Time.monotonicNow();
+ boolean append = false;
+
+ // If there is nothing written to blockId buffer,
+ // then skip flushing of blockId buffer
+ if (buffer.position() == 0) {
+ return;
+ }
+
+ buffer.flip();
+ String fileName =
+ String.format("%s.%s", AsyncBlockWriter.DIRTY_LOG_PREFIX,
+ Time.monotonicNow());
+ String log = Paths.get(parentCache.getDbPath().toString(), fileName)
+ .toString();
+
+ FileChannel channel = new FileOutputStream(log, append).getChannel();
+ int bytesWritten = channel.write(buffer);
+ channel.close();
+ buffer.clear();
+ parentCache.processDirtyMessage(fileName);
+ long endTime = Time.monotonicNow();
+ if (parentCache.isTraceEnabled()) {
+ parentCache.getTracer().info(
+ "Task=DirtyBlockLogWrite,Time={} bytesWritten={}",
+ endTime - startTime, bytesWritten);
+ }
+
+ parentCache.getTargetMetrics().incNumBlockBufferFlushCompleted();
+ parentCache.getTargetMetrics().incNumBytesDirtyLogWritten(bytesWritten);
+ parentCache.getTargetMetrics().
+ updateBlockBufferFlushLatency(endTime - startTime);
+ LOG.debug("Block buffer writer bytesWritten:{} Time:{}",
+ bytesWritten, endTime - startTime);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java
new file mode 100644
index 00000000000..7e3aed1f730
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+ DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+ DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+ DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+ DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+ DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+ DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+ DFS_CBLOCK_CACHE_THREAD_PRIORITY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+ DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT;
+
+/**
+ * This class manages the block ID buffer.
+ * Block ID Buffer keeps a list of blocks which are in leveldb cache
+ * This buffer is used later when the blocks are flushed to container
+ *
+ * Two blockIDBuffers are maintained so that write are not blocked when
+ * DirtyLog is being written. Once a blockIDBuffer is full, it will be
+ * enqueued for DirtyLog write while the other buffer accepts new write.
+ * Once the DirtyLog write is done, the buffer is returned back to the pool.
+ *
+ * There are three triggers for blockIDBuffer flush
+ * 1) BlockIDBuffer is full,
+ * 2) Time period defined for blockIDBuffer flush has elapsed.
+ * 3) Shutdown
+ */
+public class BlockBufferManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BlockBufferManager.class);
+
+ private enum FlushReason {
+ BUFFER_FULL,
+ SHUTDOWN,
+ TIMER
+ };
+
+ private final int blockBufferSize;
+ private final CBlockLocalCache parentCache;
+ private final ScheduledThreadPoolExecutor scheduledExecutor;
+ private final ThreadPoolExecutor threadPoolExecutor;
+ private final int intervalSeconds;
+ private final ArrayBlockingQueue acquireQueue;
+ private final ArrayBlockingQueue workQueue;
+ private ByteBuffer currentBuffer;
+
+ BlockBufferManager(Configuration config, CBlockLocalCache parentCache) {
+ this.parentCache = parentCache;
+ this.scheduledExecutor = new ScheduledThreadPoolExecutor(1);
+
+ this.intervalSeconds =
+ config.getInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS,
+ DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS_DEFAULT);
+
+ long keepAlive = config.getLong(DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS,
+ DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT);
+ this.workQueue = new ArrayBlockingQueue<>(2, true);
+ int threadPri = config.getInt(DFS_CBLOCK_CACHE_THREAD_PRIORITY,
+ DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT);
+ ThreadFactory workerThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("Cache Block Buffer Manager Thread #%d")
+ .setDaemon(true)
+ .setPriority(threadPri)
+ .build();
+ /*
+ * starting a thread pool with core pool size of 1 and maximum of 2 threads
+ * as there are maximum of 2 buffers which can be flushed at the same time.
+ */
+ this.threadPoolExecutor = new ThreadPoolExecutor(1, 2,
+ keepAlive, TimeUnit.SECONDS, workQueue, workerThreadFactory,
+ new ThreadPoolExecutor.AbortPolicy());
+
+ this.blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
+ DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE);
+ this.acquireQueue = new ArrayBlockingQueue<>(2, true);
+
+ for (int i = 0; i < 2; i++) {
+ acquireQueue.add(ByteBuffer.allocate(blockBufferSize));
+ }
+ // get the first buffer to be used
+ this.currentBuffer = acquireQueue.remove();
+
+ LOG.info("BufferManager: Buffer Size:{} FlushIntervalSeconds:{}",
+ blockBufferSize, intervalSeconds);
+ }
+
+ // triggerBlockBufferFlush enqueues current ByteBuffer for flush and returns.
+ // This enqueue is asynchronous and hence triggerBlockBufferFlush will
+ // only block when there are no available buffers in acquireQueue
+ // Once the DirtyLog write is done, buffer is returned back to
+ // BlockBufferManager using releaseBuffer
+ private synchronized void triggerBlockBufferFlush(FlushReason reason) {
+ LOG.debug("Flush triggered because: " + reason.toString() +
+ " Num entries in buffer: " +
+ currentBuffer.position() / (Long.SIZE / Byte.SIZE) +
+ " Acquire Queue Size: " + acquireQueue.size());
+
+ parentCache.getTargetMetrics().incNumBlockBufferFlushTriggered();
+ BlockBufferFlushTask flushTask =
+ new BlockBufferFlushTask(currentBuffer, parentCache, this);
+ threadPoolExecutor.submit(flushTask);
+ try {
+ currentBuffer = acquireQueue.take();
+ } catch (InterruptedException ex) {
+ currentBuffer = null;
+ parentCache.getTargetMetrics().incNumInterruptedBufferWaits();
+ LOG.error("wait on take operation on acquire queue interrupted", ex);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public synchronized void addToBlockBuffer(long blockId) {
+ parentCache.getTargetMetrics().incNumBlockBufferUpdates();
+ currentBuffer.putLong(blockId);
+ // if no space left, flush this buffer
+ if (currentBuffer.remaining() == 0) {
+ triggerBlockBufferFlush(FlushReason.BUFFER_FULL);
+ }
+ }
+
+ public void releaseBuffer(ByteBuffer buffer) {
+ if (buffer.position() != 0) {
+ LOG.error("requeuing a non empty buffer with:{}",
+ "elements enqueued in the acquire queue",
+ buffer.position() / (Long.SIZE / Byte.SIZE));
+ buffer.reset();
+ }
+ // There should always be space in the queue to add an element
+ acquireQueue.add(buffer);
+ }
+
+ // Start a scheduled task to flush blockIDBuffer
+ public void start() {
+ Runnable scheduledTask = () -> triggerBlockBufferFlush(FlushReason.TIMER);
+ scheduledExecutor.scheduleWithFixedDelay(scheduledTask, intervalSeconds,
+ intervalSeconds, TimeUnit.SECONDS);
+ threadPoolExecutor.prestartAllCoreThreads();
+ }
+
+ public void shutdown() {
+ triggerBlockBufferFlush(FlushReason.SHUTDOWN);
+ scheduledExecutor.shutdown();
+ threadPoolExecutor.shutdown();
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
new file mode 100644
index 00000000000..b488cbd70d0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock;
+
+import com.google.common.primitives.Longs;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
+import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.TimeoutException;
+
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+ DFS_CBLOCK_DISK_CACHE_PATH_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+ DFS_CBLOCK_TRACE_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+ DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+ DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
+
+/**
+ * Tests for Tests for local cache.
+ */
+public class TestBufferManager {
+ private final static long GB = 1024 * 1024 * 1024;
+ private final static int KB = 1024;
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration config;
+ private static StorageContainerLocationProtocolClientSideTranslatorPB
+ storageContainerLocationClient;
+ private static XceiverClientManager xceiverClientManager;
+
+ @BeforeClass
+ public static void init() throws IOException {
+ config = new OzoneConfiguration();
+ File p = GenericTestUtils.getTestDir();
+ String path = p.getPath().concat(
+ TestOzoneContainer.class.getSimpleName());
+ config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+ config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+ config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+ cluster = new MiniOzoneCluster.Builder(config)
+ .numDataNodes(1).setHandlerType("distributed").build();
+ storageContainerLocationClient = cluster
+ .createStorageContainerLocationClient();
+ xceiverClientManager = new XceiverClientManager(config);
+ }
+
+ @AfterClass
+ public static void shutdown() throws InterruptedException {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ IOUtils.cleanup(null, storageContainerLocationClient, cluster);
+ }
+
+ /**
+ * createContainerAndGetPipeline creates a set of containers and returns the
+ * Pipelines that define those containers.
+ *
+ * @param count - Number of containers to create.
+ * @return - List of Pipelines.
+ * @throws IOException
+ */
+ private List createContainerAndGetPipeline(int count)
+ throws IOException {
+ List containerPipelines = new LinkedList<>();
+ for (int x = 0; x < count; x++) {
+ String traceID = "trace" + RandomStringUtils.randomNumeric(4);
+ String containerName = "container" + RandomStringUtils.randomNumeric(10);
+ Pipeline pipeline =
+ storageContainerLocationClient.allocateContainer(containerName);
+ XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
+ ContainerProtocolCalls.createContainer(client, traceID);
+ // This step is needed since we set private data on pipelines, when we
+ // read the list from CBlockServer. So we mimic that action here.
+ pipeline.setData(Longs.toByteArray(x));
+ containerPipelines.add(pipeline);
+ }
+ return containerPipelines;
+ }
+
+ /**
+ * This test writes some block to the cache and then shuts down the cache.
+ * The cache is then restarted to check that the
+ * correct number of blocks are read from Dirty Log
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testEmptyBlockBufferHandling() throws IOException,
+ InterruptedException, TimeoutException {
+ // Create a new config so that this tests write metafile to new location
+ OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+ URL p = flushTestConfig.getClass().getResource("");
+ String path = p.getPath().concat(
+ TestOzoneContainer.class.getSimpleName() +
+ GenericTestUtils.getMethodName() +
+ RandomStringUtils.randomNumeric(4));
+ flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+ flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+ flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+ String userName = "user" + RandomStringUtils.randomNumeric(4);
+ String data = RandomStringUtils.random(4 * KB);
+ List pipelines = createContainerAndGetPipeline(10);
+ CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+ ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+ xceiverClientManager, metrics);
+ CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+ .setConfiguration(flushTestConfig)
+ .setVolumeName(volumeName)
+ .setUserName(userName)
+ .setPipelines(pipelines)
+ .setClientManager(xceiverClientManager)
+ .setBlockSize(4 * KB)
+ .setVolumeSize(50 * GB)
+ .setFlusher(flusher)
+ .setCBlockTargetMetrics(metrics)
+ .build();
+ cache.start();
+ // Write data to the cache
+ cache.put(1, data.getBytes(StandardCharsets.UTF_8));
+ Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
+ Assert.assertEquals(1, metrics.getNumWriteOps());
+ cache.put(2, data.getBytes(StandardCharsets.UTF_8));
+ Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
+ Assert.assertEquals(2, metrics.getNumWriteOps());
+
+ // Store the previous block buffer position
+ Assert.assertEquals(2, metrics.getNumBlockBufferUpdates());
+ // Simulate a shutdown by closing the cache
+ cache.close();
+ Thread.sleep(1000);
+ Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
+ Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
+ Assert.assertEquals(2 * (Long.SIZE/ Byte.SIZE),
+ metrics.getNumBytesDirtyLogWritten());
+ Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
+ Assert.assertEquals(0, metrics.getNumInterruptedBufferWaits());
+
+ // Restart cache and check that right number of entries are read
+ CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
+ ContainerCacheFlusher newFlusher =
+ new ContainerCacheFlusher(flushTestConfig,
+ xceiverClientManager, newMetrics);
+ CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
+ .setConfiguration(flushTestConfig)
+ .setVolumeName(volumeName)
+ .setUserName(userName)
+ .setPipelines(pipelines)
+ .setClientManager(xceiverClientManager)
+ .setBlockSize(4 * KB)
+ .setVolumeSize(50 * GB)
+ .setFlusher(newFlusher)
+ .setCBlockTargetMetrics(newMetrics)
+ .build();
+ newCache.start();
+ Thread fllushListenerThread = new Thread(newFlusher);
+ fllushListenerThread.setDaemon(true);
+ fllushListenerThread.start();
+
+ Thread.sleep(5000);
+ Assert.assertEquals(metrics.getNumBlockBufferUpdates(),
+ newMetrics.getNumDirtyLogBlockRead());
+ Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead()
+ * (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads());
+ // Now shutdown again, nothing should be flushed
+ newFlusher.shutdown();
+ Assert.assertEquals(0, newMetrics.getNumBlockBufferUpdates());
+ Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten());
+ }
+
+ @Test
+ public void testPeriodicFlush() throws IOException,
+ InterruptedException, TimeoutException{
+ // Create a new config so that this tests write metafile to new location
+ OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+ URL p = flushTestConfig.getClass().getResource("");
+ String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
+ flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+ flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+ flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+ flushTestConfig
+ .setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 5);
+
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+ String userName = "user" + RandomStringUtils.randomNumeric(4);
+ CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+ ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+ xceiverClientManager, metrics);
+ CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+ .setConfiguration(flushTestConfig)
+ .setVolumeName(volumeName)
+ .setUserName(userName)
+ .setPipelines(createContainerAndGetPipeline(10))
+ .setClientManager(xceiverClientManager)
+ .setBlockSize(4 * KB)
+ .setVolumeSize(50 * GB)
+ .setFlusher(flusher)
+ .setCBlockTargetMetrics(metrics)
+ .build();
+ cache.start();
+ Thread.sleep(8000);
+ // Ticks will be at 5s, 10s and so on, so this count should be 1
+ Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
+ // Nothing pushed to cache, so nothing should be written
+ Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten());
+ Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted());
+ cache.close();
+ // After close, another trigger should happen but still no data written
+ Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered());
+ Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten());
+ Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted());
+ Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
+ }
+
+ @Test
+ public void testSingleBufferFlush() throws IOException,
+ InterruptedException, TimeoutException {
+ // Create a new config so that this tests write metafile to new location
+ OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+ URL p = flushTestConfig.getClass().getResource("");
+ String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
+ flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+ flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+ flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+ String userName = "user" + RandomStringUtils.randomNumeric(4);
+ String data = RandomStringUtils.random(4 * KB);
+ CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+ ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+ xceiverClientManager, metrics);
+ CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+ .setConfiguration(flushTestConfig)
+ .setVolumeName(volumeName)
+ .setUserName(userName)
+ .setPipelines(createContainerAndGetPipeline(10))
+ .setClientManager(xceiverClientManager)
+ .setBlockSize(4 * KB)
+ .setVolumeSize(50 * GB)
+ .setFlusher(flusher)
+ .setCBlockTargetMetrics(metrics)
+ .build();
+ cache.start();
+
+ for (int i = 0; i < 511; i++) {
+ cache.put(i, data.getBytes(StandardCharsets.UTF_8));
+ }
+ // After writing 511 block no flush should happen
+ Assert.assertEquals(0, metrics.getNumBlockBufferFlushTriggered());
+ Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted());
+
+
+ // After one more block it should
+ cache.put(512, data.getBytes(StandardCharsets.UTF_8));
+ Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
+ Thread.sleep(1000);
+ Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
+ cache.close();
+ Assert.assertEquals(512 * (Long.SIZE / Byte.SIZE),
+ metrics.getNumBytesDirtyLogWritten());
+ }
+
+ @Test
+ public void testMultipleBuffersFlush() throws IOException,
+ InterruptedException, TimeoutException {
+ // Create a new config so that this tests write metafile to new location
+ OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+ URL p = flushTestConfig.getClass().getResource("");
+ String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
+ flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+ flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+ flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+ String userName = "user" + RandomStringUtils.randomNumeric(4);
+ String data = RandomStringUtils.random(4 * KB);
+ CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+ ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+ xceiverClientManager, metrics);
+ CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+ .setConfiguration(flushTestConfig)
+ .setVolumeName(volumeName)
+ .setUserName(userName)
+ .setPipelines(createContainerAndGetPipeline(10))
+ .setClientManager(xceiverClientManager)
+ .setBlockSize(4 * KB)
+ .setVolumeSize(50 * GB)
+ .setFlusher(flusher)
+ .setCBlockTargetMetrics(metrics)
+ .build();
+ cache.start();
+
+ for (int i = 0; i < 4; i++) {
+ for (int j = 0; j < 512; j++) {
+ cache.put(i * 512 + j, data.getBytes(StandardCharsets.UTF_8));
+ }
+ // Flush should be triggered after every 512 block write
+ Assert.assertEquals(i + 1, metrics.getNumBlockBufferFlushTriggered());
+ }
+ Assert.assertEquals(0, metrics.getNumIllegalDirtyLogFiles());
+ Assert.assertEquals(0, metrics.getNumFailedDirtyLogFileDeletes());
+ cache.close();
+ Assert.assertEquals(4 * 512 * (Long.SIZE / Byte.SIZE),
+ metrics.getNumBytesDirtyLogWritten());
+ Assert.assertEquals(5, metrics.getNumBlockBufferFlushTriggered());
+ Assert.assertEquals(4, metrics.getNumBlockBufferFlushCompleted());
+ }
+
+ @Test
+ public void testSingleBlockFlush() throws IOException,
+ InterruptedException, TimeoutException{
+ // Create a new config so that this tests write metafile to new location
+ OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+ URL p = flushTestConfig.getClass().getResource("");
+ String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
+ flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+ flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+ flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+ flushTestConfig
+ .setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 5);
+
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+ String userName = "user" + RandomStringUtils.randomNumeric(4);
+ String data = RandomStringUtils.random(4 * KB);
+ CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+ ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+ xceiverClientManager, metrics);
+ CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+ .setConfiguration(flushTestConfig)
+ .setVolumeName(volumeName)
+ .setUserName(userName)
+ .setPipelines(createContainerAndGetPipeline(10))
+ .setClientManager(xceiverClientManager)
+ .setBlockSize(4 * KB)
+ .setVolumeSize(50 * GB)
+ .setFlusher(flusher)
+ .setCBlockTargetMetrics(metrics)
+ .build();
+ cache.start();
+ cache.put(0, data.getBytes(StandardCharsets.UTF_8));
+ Thread.sleep(8000);
+ // Ticks will be at 5s, 10s and so on, so this count should be 1
+ Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
+ // 1 block written to cache, which should be flushed
+ Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten());
+ Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
+ cache.close();
+ // After close, another trigger should happen but no data should be written
+ Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered());
+ Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten());
+ Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
+ Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
index f15a5ed3fcc..2dd72b3b181 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
@@ -62,10 +62,6 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_TRACE_IO;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
- .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
- .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
/**
* Tests for Tests for local cache.
@@ -238,15 +234,12 @@ public class TestLocalBlockCache {
cache.put(blockid, data.getBytes(StandardCharsets.UTF_8));
}
Assert.assertEquals(totalBlocks, metrics.getNumWriteOps());
+ Assert.assertEquals(totalBlocks, metrics.getNumBlockBufferUpdates());
LOG.info("Wrote 50K blocks, waiting for replication to finish.");
GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
long endTime = Time.monotonicNow();
LOG.info("Time taken for writing {} blocks is {} seconds", totalBlocks,
TimeUnit.MILLISECONDS.toSeconds(endTime - startTime));
- long blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
- DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT);
- Assert.assertEquals(metrics.getNumWriteOps() / blockBufferSize,
- metrics.getNumBlockBufferFlush());
// TODO: Read this data back.
cache.close();
}
@@ -278,6 +271,7 @@ public class TestLocalBlockCache {
Assert.assertEquals(1, metrics.getNumReadOps());
Assert.assertEquals(1, metrics.getNumReadLostBlocks());
Assert.assertEquals(1, metrics.getNumReadCacheMiss());
+ cache.close();
}
@Test
@@ -507,95 +501,6 @@ public class TestLocalBlockCache {
cache.close();
}
- /**
- * This test writes some block to the cache and then shuts down the cache.
- * The cache is then restarted to check that the
- * correct number of blocks are read from Dirty Log
- *
- * @throws IOException
- */
- @Test
- public void testEmptyBlockBufferHandling() throws IOException,
- InterruptedException, TimeoutException {
- // Create a new config so that this tests write metafile to new location
- OzoneConfiguration flushTestConfig = new OzoneConfiguration();
- URL p = flushTestConfig.getClass().getResource("");
- String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
- flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
- flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
- flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
-
- String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
- String userName = "user" + RandomStringUtils.randomNumeric(4);
- String data = RandomStringUtils.random(4 * KB);
- List pipelines = getContainerPipeline(10);
-
- CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
- ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
- xceiverClientManager, metrics);
- CBlockLocalCache cache = CBlockLocalCache.newBuilder()
- .setConfiguration(flushTestConfig)
- .setVolumeName(volumeName)
- .setUserName(userName)
- .setPipelines(pipelines)
- .setClientManager(xceiverClientManager)
- .setBlockSize(4 * KB)
- .setVolumeSize(50 * GB)
- .setFlusher(flusher)
- .setCBlockTargetMetrics(metrics)
- .build();
- cache.start();
- // Write data to the cache
- cache.put(1, data.getBytes(StandardCharsets.UTF_8));
- Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
- Assert.assertEquals(1, metrics.getNumWriteOps());
- cache.put(2, data.getBytes(StandardCharsets.UTF_8));
- Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
- Assert.assertEquals(2, metrics.getNumWriteOps());
-
- // Store the previous block buffer position
- Assert.assertEquals(2, metrics.getNumDirtyLogBlockUpdated());
- // Simulate a shutdown by closing the cache
- GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
- cache.close();
- Assert.assertEquals(2 * (Long.SIZE/ Byte.SIZE),
- metrics.getNumBytesDirtyLogWritten());
- Assert.assertEquals(0, metrics.getNumFailedDirtyBlockFlushes());
-
- // Restart cache and check that right number of entries are read
- CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
- ContainerCacheFlusher newFlusher =
- new ContainerCacheFlusher(flushTestConfig,
- xceiverClientManager, newMetrics);
- CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
- .setConfiguration(flushTestConfig)
- .setVolumeName(volumeName)
- .setUserName(userName)
- .setPipelines(pipelines)
- .setClientManager(xceiverClientManager)
- .setBlockSize(4 * KB)
- .setVolumeSize(50 * GB)
- .setFlusher(newFlusher)
- .setCBlockTargetMetrics(newMetrics)
- .build();
- newCache.start();
- Thread flushListenerThread = new Thread(newFlusher);
- flushListenerThread.setDaemon(true);
- flushListenerThread.start();
-
- Thread.sleep(5000);
- Assert.assertEquals(metrics.getNumDirtyLogBlockUpdated(),
- newMetrics.getNumDirtyLogBlockRead());
- Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead()
- * (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads());
- // Now shutdown again, nothing should be flushed
- newCache.close();
- newFlusher.shutdown();
- Assert.assertEquals(0, newMetrics.getNumDirtyLogBlockUpdated());
- Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten());
- Assert.assertEquals(0, newMetrics.getNumFailedDirtyBlockFlushes());
- }
-
/**
* This test writes some block to the cache and then shuts down the cache
* The cache is then restarted with "short.circuit.io" disable to check
@@ -642,9 +547,9 @@ public class TestLocalBlockCache {
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
- Thread fllushListenerThread = new Thread(flusher);
- fllushListenerThread.setDaemon(true);
- fllushListenerThread.start();
+ Thread flushListenerThread = new Thread(flusher);
+ flushListenerThread.setDaemon(true);
+ flushListenerThread.start();
Assert.assertTrue(cache.isShortCircuitIOEnabled());
// Write data to the cache
for (int i = 0; i < 512; i++) {
@@ -686,7 +591,7 @@ public class TestLocalBlockCache {
}
Assert.assertEquals(0, newMetrics.getNumReadLostBlocks());
Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks());
- newFlusher.shutdown();
newCache.close();
+ newFlusher.shutdown();
}
}