HDFS-11694. Block Storage: Add Support for 2 BlockIDBuffers and also for periodic flush of BlockIDBuffer. Contributed by Mukul Kumar Singh

This commit is contained in:
Chen Liang 2017-05-15 10:35:44 -07:00
parent 055e556e67
commit e40e09540b
8 changed files with 769 additions and 200 deletions

View File

@ -132,6 +132,11 @@ public final class CBlockConfigKeys {
"dfs.cblock.cache.block.buffer.size";
public static final int DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT = 512;
public static final String DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS =
"dfs.cblock.block.buffer.flush.interval.seconds";
public static final int
DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS_DEFAULT = 60;
// jscsi server settings
public static final String DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY =
"dfs.cblock.jscsi.server.address";

View File

@ -39,14 +39,15 @@ public class CBlockTargetMetrics {
@Metric private MutableCounterLong numWriteOps;
@Metric private MutableCounterLong numReadCacheHits;
@Metric private MutableCounterLong numReadCacheMiss;
@Metric private MutableCounterLong numDirectBlockWrites;
// Cblock internal Metrics
@Metric private MutableCounterLong numDirectBlockWrites;
@Metric private MutableCounterLong numBlockBufferFlush;
@Metric private MutableCounterLong numDirtyLogBlockRead;
@Metric private MutableCounterLong numDirtyLogBlockUpdated;
@Metric private MutableCounterLong numBytesDirtyLogRead;
@Metric private MutableCounterLong numBytesDirtyLogWritten;
@Metric private MutableCounterLong numBlockBufferFlushCompleted;
@Metric private MutableCounterLong numBlockBufferFlushTriggered;
@Metric private MutableCounterLong numBlockBufferUpdates;
// Failure Metrics
@Metric private MutableCounterLong numReadLostBlocks;
@ -54,7 +55,10 @@ public class CBlockTargetMetrics {
@Metric private MutableCounterLong numWriteIOExceptionRetryBlocks;
@Metric private MutableCounterLong numWriteGenericExceptionRetryBlocks;
@Metric private MutableCounterLong numFailedDirectBlockWrites;
@Metric private MutableCounterLong numFailedDirtyBlockFlushes;
@Metric private MutableCounterLong numIllegalDirtyLogFiles;
@Metric private MutableCounterLong numFailedDirtyLogFileDeletes;
@Metric private MutableCounterLong numFailedBlockBufferFlushes;
@Metric private MutableCounterLong numInterruptedBufferWaits;
// Latency based Metrics
@Metric private MutableRate dbReadLatency;
@ -114,8 +118,12 @@ public class CBlockTargetMetrics {
numFailedReadBlocks.incr();
}
public void incNumBlockBufferFlush() {
numBlockBufferFlush.incr();
public void incNumBlockBufferFlushCompleted() {
numBlockBufferFlushCompleted.incr();
}
public void incNumBlockBufferFlushTriggered() {
numBlockBufferFlushTriggered.incr();
}
public void incNumDirtyLogBlockRead() {
@ -126,16 +134,28 @@ public class CBlockTargetMetrics {
numBytesDirtyLogRead.incr(bytes);
}
public void incNumDirtyLogBlockUpdated() {
numDirtyLogBlockUpdated.incr();
public void incNumBlockBufferUpdates() {
numBlockBufferUpdates.incr();
}
public void incNumBytesDirtyLogWritten(int bytes) {
numBytesDirtyLogWritten.incr(bytes);
}
public void incNumFailedDirtyBlockFlushes() {
numFailedDirtyBlockFlushes.incr();
public void incNumFailedBlockBufferFlushes() {
numFailedBlockBufferFlushes.incr();
}
public void incNumInterruptedBufferWaits() {
numInterruptedBufferWaits.incr();
}
public void incNumIllegalDirtyLogFiles() {
numIllegalDirtyLogFiles.incr();
}
public void incNumFailedDirtyLogFileDeletes() {
numFailedDirtyLogFileDeletes.incr();
}
public void updateDBReadLatency(long latency) {
@ -213,8 +233,13 @@ public class CBlockTargetMetrics {
}
@VisibleForTesting
public long getNumBlockBufferFlush() {
return numBlockBufferFlush.value();
public long getNumBlockBufferFlushCompleted() {
return numBlockBufferFlushCompleted.value();
}
@VisibleForTesting
public long getNumBlockBufferFlushTriggered() {
return numBlockBufferFlushTriggered.value();
}
@VisibleForTesting
@ -228,8 +253,8 @@ public class CBlockTargetMetrics {
}
@VisibleForTesting
public long getNumDirtyLogBlockUpdated() {
return numDirtyLogBlockUpdated.value();
public long getNumBlockBufferUpdates() {
return numBlockBufferUpdates.value();
}
@VisibleForTesting
@ -238,7 +263,22 @@ public class CBlockTargetMetrics {
}
@VisibleForTesting
public long getNumFailedDirtyBlockFlushes() {
return numFailedDirtyBlockFlushes.value();
public long getNumFailedBlockBufferFlushes() {
return numFailedBlockBufferFlushes.value();
}
@VisibleForTesting
public long getNumInterruptedBufferWaits() {
return numInterruptedBufferWaits.value();
}
@VisibleForTesting
public long getNumIllegalDirtyLogFiles() {
return numIllegalDirtyLogFiles.value();
}
@VisibleForTesting
public long getNumFailedDirtyLogFileDeletes() {
return numFailedDirtyLogFileDeletes.value();
}
}

View File

@ -363,6 +363,7 @@ public class ContainerCacheFlusher implements Runnable {
if (finishCountMap.containsKey(message.getFileName())) {
// In theory this should never happen. But if it happened,
// we need to know it...
getTargetMetrics().incNumIllegalDirtyLogFiles();
LOG.error("Adding DirtyLog file again {} current count {} new {}",
message.getFileName(),
finishCountMap.get(message.getFileName()).expectedCount,
@ -516,6 +517,7 @@ public class ContainerCacheFlusher implements Runnable {
}*/
fileDeleted.set(true);
} catch (Exception e) {
flusher.getTargetMetrics().incNumFailedDirtyLogFileDeletes();
LOG.error("Error deleting dirty log file:" + filePath, e);
}
}

View File

@ -32,19 +32,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
/**
* A Queue that is used to write blocks asynchronously to the container.
*/
@ -52,12 +45,6 @@ public class AsyncBlockWriter {
private static final Logger LOG =
LoggerFactory.getLogger(AsyncBlockWriter.class);
/**
* Right now we have a single buffer and we block when we write it to
* the file.
*/
private final ByteBuffer blockIDBuffer;
/**
* XceiverClientManager is used to get client connections to a set of
* machines.
@ -80,8 +67,8 @@ public class AsyncBlockWriter {
* The cache this writer is operating against.
*/
private final CBlockLocalCache parentCache;
private final int blockBufferSize;
private final static String DIRTY_LOG_PREFIX = "DirtyLog";
private final BlockBufferManager blockBufferManager;
public final static String DIRTY_LOG_PREFIX = "DirtyLog";
private AtomicLong localIoCount;
/**
@ -95,14 +82,11 @@ public class AsyncBlockWriter {
Preconditions.checkNotNull(cache, "Cache cannot be null.");
Preconditions.checkNotNull(cache.getCacheDB(), "DB cannot be null.");
localIoCount = new AtomicLong();
blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE);
LOG.info("Cache: Block Size: {}", blockBufferSize);
lock = new ReentrantLock();
notEmpty = lock.newCondition();
parentCache = cache;
xceiverClientManager = cache.getClientManager();
blockIDBuffer = ByteBuffer.allocateDirect(blockBufferSize);
blockBufferManager = new BlockBufferManager(config, parentCache);
}
public void start() throws IOException {
@ -113,6 +97,7 @@ public class AsyncBlockWriter {
throw new IllegalStateException("Cache Directory create failed, Cannot " +
"continue. Log Dir: {}" + logDir);
}
blockBufferManager.start();
}
/**
@ -179,11 +164,7 @@ public class AsyncBlockWriter {
block.getBlockID(), endTime - startTime, datahash);
}
block.clearData();
parentCache.getTargetMetrics().incNumDirtyLogBlockUpdated();
blockIDBuffer.putLong(block.getBlockID());
if (blockIDBuffer.remaining() == 0) {
writeBlockBufferToFile(blockIDBuffer);
}
blockBufferManager.addToBlockBuffer(block.getBlockID());
} else {
Pipeline pipeline = parentCache.getPipeline(block.getBlockID());
String containerName = pipeline.getContainerName();
@ -215,69 +196,11 @@ public class AsyncBlockWriter {
}
}
/**
* Write Block Buffer to file.
*
* @param blockBuffer - ByteBuffer
* @throws IOException
*/
private synchronized void writeBlockBufferToFile(ByteBuffer blockBuffer)
throws IOException {
long startTime = Time.monotonicNow();
boolean append = false;
int bytesWritten = 0;
// If there is nothing written to blockId buffer,
// then skip flushing of blockId buffer
if (blockBuffer.position() == 0) {
return;
}
blockBuffer.flip();
String fileName =
String.format("%s.%s", DIRTY_LOG_PREFIX, Time.monotonicNow());
String log = Paths.get(parentCache.getDbPath().toString(), fileName)
.toString();
try {
FileChannel channel = new FileOutputStream(log, append).getChannel();
bytesWritten = channel.write(blockBuffer);
} catch (Exception ex) {
LOG.error("Unable to sync the Block map to disk -- This might cause a " +
"data loss or corruption", ex);
parentCache.getTargetMetrics().incNumFailedDirtyBlockFlushes();
throw ex;
} finally {
blockBuffer.clear();
}
parentCache.processDirtyMessage(fileName);
blockIDBuffer.clear();
long endTime = Time.monotonicNow();
if (parentCache.isTraceEnabled()) {
parentCache.getTracer().info(
"Task=DirtyBlockLogWrite,Time={} bytesWritten={}",
endTime - startTime, bytesWritten);
}
parentCache.getTargetMetrics().incNumBytesDirtyLogWritten(bytesWritten);
parentCache.getTargetMetrics().incNumBlockBufferFlush();
parentCache.getTargetMetrics()
.updateBlockBufferFlushLatency(endTime - startTime);
LOG.debug("Block buffer writer bytesWritten:{} Time:{}",
bytesWritten, endTime - startTime);
}
/**
* Shutdown by writing any pending I/O to dirtylog buffer.
*/
public void shutdown() {
try {
writeBlockBufferToFile(this.blockIDBuffer);
} catch (IOException e) {
LOG.error("Unable to sync the Block map to disk -- This might cause a " +
"data loss or corruption");
}
blockBufferManager.shutdown();
}
/**
* Returns tracer.

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
/**
* This task is responsible for flushing the BlockIDBuffer
* to Dirty Log File. This Dirty Log file is used later by
* ContainerCacheFlusher when the data is written to container
*/
public class BlockBufferFlushTask implements Runnable {
private static final Logger LOG =
LoggerFactory.getLogger(BlockBufferFlushTask.class);
private final CBlockLocalCache parentCache;
private final BlockBufferManager bufferManager;
private final ByteBuffer blockIDBuffer;
BlockBufferFlushTask(ByteBuffer blockIDBuffer, CBlockLocalCache parentCache,
BlockBufferManager manager) {
this.parentCache = parentCache;
this.bufferManager = manager;
this.blockIDBuffer = blockIDBuffer;
}
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
try {
writeBlockBufferToFile(blockIDBuffer);
} catch (Exception e) {
parentCache.getTargetMetrics().incNumFailedBlockBufferFlushes();
LOG.error("Unable to sync the Block map to disk with "
+ (blockIDBuffer.position() / Long.SIZE) + "entries "
+ "-- NOTE: This might cause a data loss or corruption", e);
} finally {
bufferManager.releaseBuffer(blockIDBuffer);
}
}
/**
* Write Block Buffer to file.
*
* @param buffer - ByteBuffer
* @throws IOException
*/
private void writeBlockBufferToFile(ByteBuffer buffer)
throws IOException {
long startTime = Time.monotonicNow();
boolean append = false;
// If there is nothing written to blockId buffer,
// then skip flushing of blockId buffer
if (buffer.position() == 0) {
return;
}
buffer.flip();
String fileName =
String.format("%s.%s", AsyncBlockWriter.DIRTY_LOG_PREFIX,
Time.monotonicNow());
String log = Paths.get(parentCache.getDbPath().toString(), fileName)
.toString();
FileChannel channel = new FileOutputStream(log, append).getChannel();
int bytesWritten = channel.write(buffer);
channel.close();
buffer.clear();
parentCache.processDirtyMessage(fileName);
long endTime = Time.monotonicNow();
if (parentCache.isTraceEnabled()) {
parentCache.getTracer().info(
"Task=DirtyBlockLogWrite,Time={} bytesWritten={}",
endTime - startTime, bytesWritten);
}
parentCache.getTargetMetrics().incNumBlockBufferFlushCompleted();
parentCache.getTargetMetrics().incNumBytesDirtyLogWritten(bytesWritten);
parentCache.getTargetMetrics().
updateBlockBufferFlushLatency(endTime - startTime);
LOG.debug("Block buffer writer bytesWritten:{} Time:{}",
bytesWritten, endTime - startTime);
}
}

View File

@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS;
import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_CACHE_THREAD_PRIORITY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT;
/**
* This class manages the block ID buffer.
* Block ID Buffer keeps a list of blocks which are in leveldb cache
* This buffer is used later when the blocks are flushed to container
*
* Two blockIDBuffers are maintained so that write are not blocked when
* DirtyLog is being written. Once a blockIDBuffer is full, it will be
* enqueued for DirtyLog write while the other buffer accepts new write.
* Once the DirtyLog write is done, the buffer is returned back to the pool.
*
* There are three triggers for blockIDBuffer flush
* 1) BlockIDBuffer is full,
* 2) Time period defined for blockIDBuffer flush has elapsed.
* 3) Shutdown
*/
public class BlockBufferManager {
private static final Logger LOG =
LoggerFactory.getLogger(BlockBufferManager.class);
private enum FlushReason {
BUFFER_FULL,
SHUTDOWN,
TIMER
};
private final int blockBufferSize;
private final CBlockLocalCache parentCache;
private final ScheduledThreadPoolExecutor scheduledExecutor;
private final ThreadPoolExecutor threadPoolExecutor;
private final int intervalSeconds;
private final ArrayBlockingQueue<ByteBuffer> acquireQueue;
private final ArrayBlockingQueue<Runnable> workQueue;
private ByteBuffer currentBuffer;
BlockBufferManager(Configuration config, CBlockLocalCache parentCache) {
this.parentCache = parentCache;
this.scheduledExecutor = new ScheduledThreadPoolExecutor(1);
this.intervalSeconds =
config.getInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS,
DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS_DEFAULT);
long keepAlive = config.getLong(DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS,
DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT);
this.workQueue = new ArrayBlockingQueue<>(2, true);
int threadPri = config.getInt(DFS_CBLOCK_CACHE_THREAD_PRIORITY,
DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT);
ThreadFactory workerThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("Cache Block Buffer Manager Thread #%d")
.setDaemon(true)
.setPriority(threadPri)
.build();
/*
* starting a thread pool with core pool size of 1 and maximum of 2 threads
* as there are maximum of 2 buffers which can be flushed at the same time.
*/
this.threadPoolExecutor = new ThreadPoolExecutor(1, 2,
keepAlive, TimeUnit.SECONDS, workQueue, workerThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
this.blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE);
this.acquireQueue = new ArrayBlockingQueue<>(2, true);
for (int i = 0; i < 2; i++) {
acquireQueue.add(ByteBuffer.allocate(blockBufferSize));
}
// get the first buffer to be used
this.currentBuffer = acquireQueue.remove();
LOG.info("BufferManager: Buffer Size:{} FlushIntervalSeconds:{}",
blockBufferSize, intervalSeconds);
}
// triggerBlockBufferFlush enqueues current ByteBuffer for flush and returns.
// This enqueue is asynchronous and hence triggerBlockBufferFlush will
// only block when there are no available buffers in acquireQueue
// Once the DirtyLog write is done, buffer is returned back to
// BlockBufferManager using releaseBuffer
private synchronized void triggerBlockBufferFlush(FlushReason reason) {
LOG.debug("Flush triggered because: " + reason.toString() +
" Num entries in buffer: " +
currentBuffer.position() / (Long.SIZE / Byte.SIZE) +
" Acquire Queue Size: " + acquireQueue.size());
parentCache.getTargetMetrics().incNumBlockBufferFlushTriggered();
BlockBufferFlushTask flushTask =
new BlockBufferFlushTask(currentBuffer, parentCache, this);
threadPoolExecutor.submit(flushTask);
try {
currentBuffer = acquireQueue.take();
} catch (InterruptedException ex) {
currentBuffer = null;
parentCache.getTargetMetrics().incNumInterruptedBufferWaits();
LOG.error("wait on take operation on acquire queue interrupted", ex);
Thread.currentThread().interrupt();
}
}
public synchronized void addToBlockBuffer(long blockId) {
parentCache.getTargetMetrics().incNumBlockBufferUpdates();
currentBuffer.putLong(blockId);
// if no space left, flush this buffer
if (currentBuffer.remaining() == 0) {
triggerBlockBufferFlush(FlushReason.BUFFER_FULL);
}
}
public void releaseBuffer(ByteBuffer buffer) {
if (buffer.position() != 0) {
LOG.error("requeuing a non empty buffer with:{}",
"elements enqueued in the acquire queue",
buffer.position() / (Long.SIZE / Byte.SIZE));
buffer.reset();
}
// There should always be space in the queue to add an element
acquireQueue.add(buffer);
}
// Start a scheduled task to flush blockIDBuffer
public void start() {
Runnable scheduledTask = () -> triggerBlockBufferFlush(FlushReason.TIMER);
scheduledExecutor.scheduleWithFixedDelay(scheduledTask, intervalSeconds,
intervalSeconds, TimeUnit.SECONDS);
threadPoolExecutor.prestartAllCoreThreads();
}
public void shutdown() {
triggerBlockBufferFlush(FlushReason.SHUTDOWN);
scheduledExecutor.shutdown();
threadPoolExecutor.shutdown();
}
}

View File

@ -0,0 +1,394 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock;
import com.google.common.primitives.Longs;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_DISK_CACHE_PATH_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_TRACE_IO;
import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
/**
* Tests for Tests for local cache.
*/
public class TestBufferManager {
private final static long GB = 1024 * 1024 * 1024;
private final static int KB = 1024;
private static MiniOzoneCluster cluster;
private static OzoneConfiguration config;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
private static XceiverClientManager xceiverClientManager;
@BeforeClass
public static void init() throws IOException {
config = new OzoneConfiguration();
File p = GenericTestUtils.getTestDir();
String path = p.getPath().concat(
TestOzoneContainer.class.getSimpleName());
config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
cluster = new MiniOzoneCluster.Builder(config)
.numDataNodes(1).setHandlerType("distributed").build();
storageContainerLocationClient = cluster
.createStorageContainerLocationClient();
xceiverClientManager = new XceiverClientManager(config);
}
@AfterClass
public static void shutdown() throws InterruptedException {
if (cluster != null) {
cluster.shutdown();
}
IOUtils.cleanup(null, storageContainerLocationClient, cluster);
}
/**
* createContainerAndGetPipeline creates a set of containers and returns the
* Pipelines that define those containers.
*
* @param count - Number of containers to create.
* @return - List of Pipelines.
* @throws IOException
*/
private List<Pipeline> createContainerAndGetPipeline(int count)
throws IOException {
List<Pipeline> containerPipelines = new LinkedList<>();
for (int x = 0; x < count; x++) {
String traceID = "trace" + RandomStringUtils.randomNumeric(4);
String containerName = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline =
storageContainerLocationClient.allocateContainer(containerName);
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
ContainerProtocolCalls.createContainer(client, traceID);
// This step is needed since we set private data on pipelines, when we
// read the list from CBlockServer. So we mimic that action here.
pipeline.setData(Longs.toByteArray(x));
containerPipelines.add(pipeline);
}
return containerPipelines;
}
/**
* This test writes some block to the cache and then shuts down the cache.
* The cache is then restarted to check that the
* correct number of blocks are read from Dirty Log
*
* @throws IOException
*/
@Test
public void testEmptyBlockBufferHandling() throws IOException,
InterruptedException, TimeoutException {
// Create a new config so that this tests write metafile to new location
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
URL p = flushTestConfig.getClass().getResource("");
String path = p.getPath().concat(
TestOzoneContainer.class.getSimpleName() +
GenericTestUtils.getMethodName() +
RandomStringUtils.randomNumeric(4));
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
String userName = "user" + RandomStringUtils.randomNumeric(4);
String data = RandomStringUtils.random(4 * KB);
List<Pipeline> pipelines = createContainerAndGetPipeline(10);
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
xceiverClientManager, metrics);
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(pipelines)
.setClientManager(xceiverClientManager)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
// Write data to the cache
cache.put(1, data.getBytes(StandardCharsets.UTF_8));
Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
Assert.assertEquals(1, metrics.getNumWriteOps());
cache.put(2, data.getBytes(StandardCharsets.UTF_8));
Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
Assert.assertEquals(2, metrics.getNumWriteOps());
// Store the previous block buffer position
Assert.assertEquals(2, metrics.getNumBlockBufferUpdates());
// Simulate a shutdown by closing the cache
cache.close();
Thread.sleep(1000);
Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
Assert.assertEquals(2 * (Long.SIZE/ Byte.SIZE),
metrics.getNumBytesDirtyLogWritten());
Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
Assert.assertEquals(0, metrics.getNumInterruptedBufferWaits());
// Restart cache and check that right number of entries are read
CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
ContainerCacheFlusher newFlusher =
new ContainerCacheFlusher(flushTestConfig,
xceiverClientManager, newMetrics);
CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(pipelines)
.setClientManager(xceiverClientManager)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(newFlusher)
.setCBlockTargetMetrics(newMetrics)
.build();
newCache.start();
Thread fllushListenerThread = new Thread(newFlusher);
fllushListenerThread.setDaemon(true);
fllushListenerThread.start();
Thread.sleep(5000);
Assert.assertEquals(metrics.getNumBlockBufferUpdates(),
newMetrics.getNumDirtyLogBlockRead());
Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead()
* (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads());
// Now shutdown again, nothing should be flushed
newFlusher.shutdown();
Assert.assertEquals(0, newMetrics.getNumBlockBufferUpdates());
Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten());
}
@Test
public void testPeriodicFlush() throws IOException,
InterruptedException, TimeoutException{
// Create a new config so that this tests write metafile to new location
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
URL p = flushTestConfig.getClass().getResource("");
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
flushTestConfig
.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
String userName = "user" + RandomStringUtils.randomNumeric(4);
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
xceiverClientManager, metrics);
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(createContainerAndGetPipeline(10))
.setClientManager(xceiverClientManager)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
Thread.sleep(8000);
// Ticks will be at 5s, 10s and so on, so this count should be 1
Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
// Nothing pushed to cache, so nothing should be written
Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten());
Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted());
cache.close();
// After close, another trigger should happen but still no data written
Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered());
Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten());
Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted());
Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
}
@Test
public void testSingleBufferFlush() throws IOException,
InterruptedException, TimeoutException {
// Create a new config so that this tests write metafile to new location
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
URL p = flushTestConfig.getClass().getResource("");
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
String userName = "user" + RandomStringUtils.randomNumeric(4);
String data = RandomStringUtils.random(4 * KB);
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
xceiverClientManager, metrics);
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(createContainerAndGetPipeline(10))
.setClientManager(xceiverClientManager)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
for (int i = 0; i < 511; i++) {
cache.put(i, data.getBytes(StandardCharsets.UTF_8));
}
// After writing 511 block no flush should happen
Assert.assertEquals(0, metrics.getNumBlockBufferFlushTriggered());
Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted());
// After one more block it should
cache.put(512, data.getBytes(StandardCharsets.UTF_8));
Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
Thread.sleep(1000);
Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
cache.close();
Assert.assertEquals(512 * (Long.SIZE / Byte.SIZE),
metrics.getNumBytesDirtyLogWritten());
}
@Test
public void testMultipleBuffersFlush() throws IOException,
InterruptedException, TimeoutException {
// Create a new config so that this tests write metafile to new location
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
URL p = flushTestConfig.getClass().getResource("");
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
String userName = "user" + RandomStringUtils.randomNumeric(4);
String data = RandomStringUtils.random(4 * KB);
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
xceiverClientManager, metrics);
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(createContainerAndGetPipeline(10))
.setClientManager(xceiverClientManager)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 512; j++) {
cache.put(i * 512 + j, data.getBytes(StandardCharsets.UTF_8));
}
// Flush should be triggered after every 512 block write
Assert.assertEquals(i + 1, metrics.getNumBlockBufferFlushTriggered());
}
Assert.assertEquals(0, metrics.getNumIllegalDirtyLogFiles());
Assert.assertEquals(0, metrics.getNumFailedDirtyLogFileDeletes());
cache.close();
Assert.assertEquals(4 * 512 * (Long.SIZE / Byte.SIZE),
metrics.getNumBytesDirtyLogWritten());
Assert.assertEquals(5, metrics.getNumBlockBufferFlushTriggered());
Assert.assertEquals(4, metrics.getNumBlockBufferFlushCompleted());
}
@Test
public void testSingleBlockFlush() throws IOException,
InterruptedException, TimeoutException{
// Create a new config so that this tests write metafile to new location
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
URL p = flushTestConfig.getClass().getResource("");
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
flushTestConfig
.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
String userName = "user" + RandomStringUtils.randomNumeric(4);
String data = RandomStringUtils.random(4 * KB);
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
xceiverClientManager, metrics);
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(createContainerAndGetPipeline(10))
.setClientManager(xceiverClientManager)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
cache.put(0, data.getBytes(StandardCharsets.UTF_8));
Thread.sleep(8000);
// Ticks will be at 5s, 10s and so on, so this count should be 1
Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
// 1 block written to cache, which should be flushed
Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten());
Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
cache.close();
// After close, another trigger should happen but no data should be written
Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered());
Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten());
Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
}
}

View File

@ -62,10 +62,6 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_TRACE_IO;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
/**
* Tests for Tests for local cache.
@ -238,15 +234,12 @@ public class TestLocalBlockCache {
cache.put(blockid, data.getBytes(StandardCharsets.UTF_8));
}
Assert.assertEquals(totalBlocks, metrics.getNumWriteOps());
Assert.assertEquals(totalBlocks, metrics.getNumBlockBufferUpdates());
LOG.info("Wrote 50K blocks, waiting for replication to finish.");
GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
long endTime = Time.monotonicNow();
LOG.info("Time taken for writing {} blocks is {} seconds", totalBlocks,
TimeUnit.MILLISECONDS.toSeconds(endTime - startTime));
long blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT);
Assert.assertEquals(metrics.getNumWriteOps() / blockBufferSize,
metrics.getNumBlockBufferFlush());
// TODO: Read this data back.
cache.close();
}
@ -278,6 +271,7 @@ public class TestLocalBlockCache {
Assert.assertEquals(1, metrics.getNumReadOps());
Assert.assertEquals(1, metrics.getNumReadLostBlocks());
Assert.assertEquals(1, metrics.getNumReadCacheMiss());
cache.close();
}
@Test
@ -507,95 +501,6 @@ public class TestLocalBlockCache {
cache.close();
}
/**
* This test writes some block to the cache and then shuts down the cache.
* The cache is then restarted to check that the
* correct number of blocks are read from Dirty Log
*
* @throws IOException
*/
@Test
public void testEmptyBlockBufferHandling() throws IOException,
InterruptedException, TimeoutException {
// Create a new config so that this tests write metafile to new location
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
URL p = flushTestConfig.getClass().getResource("");
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
String userName = "user" + RandomStringUtils.randomNumeric(4);
String data = RandomStringUtils.random(4 * KB);
List<Pipeline> pipelines = getContainerPipeline(10);
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
xceiverClientManager, metrics);
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(pipelines)
.setClientManager(xceiverClientManager)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
// Write data to the cache
cache.put(1, data.getBytes(StandardCharsets.UTF_8));
Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
Assert.assertEquals(1, metrics.getNumWriteOps());
cache.put(2, data.getBytes(StandardCharsets.UTF_8));
Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
Assert.assertEquals(2, metrics.getNumWriteOps());
// Store the previous block buffer position
Assert.assertEquals(2, metrics.getNumDirtyLogBlockUpdated());
// Simulate a shutdown by closing the cache
GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
cache.close();
Assert.assertEquals(2 * (Long.SIZE/ Byte.SIZE),
metrics.getNumBytesDirtyLogWritten());
Assert.assertEquals(0, metrics.getNumFailedDirtyBlockFlushes());
// Restart cache and check that right number of entries are read
CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
ContainerCacheFlusher newFlusher =
new ContainerCacheFlusher(flushTestConfig,
xceiverClientManager, newMetrics);
CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(pipelines)
.setClientManager(xceiverClientManager)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(newFlusher)
.setCBlockTargetMetrics(newMetrics)
.build();
newCache.start();
Thread flushListenerThread = new Thread(newFlusher);
flushListenerThread.setDaemon(true);
flushListenerThread.start();
Thread.sleep(5000);
Assert.assertEquals(metrics.getNumDirtyLogBlockUpdated(),
newMetrics.getNumDirtyLogBlockRead());
Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead()
* (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads());
// Now shutdown again, nothing should be flushed
newCache.close();
newFlusher.shutdown();
Assert.assertEquals(0, newMetrics.getNumDirtyLogBlockUpdated());
Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten());
Assert.assertEquals(0, newMetrics.getNumFailedDirtyBlockFlushes());
}
/**
* This test writes some block to the cache and then shuts down the cache
* The cache is then restarted with "short.circuit.io" disable to check
@ -642,9 +547,9 @@ public class TestLocalBlockCache {
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
Thread fllushListenerThread = new Thread(flusher);
fllushListenerThread.setDaemon(true);
fllushListenerThread.start();
Thread flushListenerThread = new Thread(flusher);
flushListenerThread.setDaemon(true);
flushListenerThread.start();
Assert.assertTrue(cache.isShortCircuitIOEnabled());
// Write data to the cache
for (int i = 0; i < 512; i++) {
@ -686,7 +591,7 @@ public class TestLocalBlockCache {
}
Assert.assertEquals(0, newMetrics.getNumReadLostBlocks());
Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks());
newFlusher.shutdown();
newCache.close();
newFlusher.shutdown();
}
}