diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
index bc0d68c69b0..53cb57c6bf5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
@@ -82,6 +82,11 @@ public final class CBlockConfigKeys {
public static final String DFS_CBLOCK_TRACE_IO = "dfs.cblock.trace.io";
public static final boolean DFS_CBLOCK_TRACE_IO_DEFAULT = false;
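+
+  /**
+   * If true, reads and writes are short-circuited through the local
+   * LevelDB cache. Note that the current write path requires the cache,
+   * so disabling this is not yet supported (see AsyncBlockWriter).
+   */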
+ public static final String DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO =
+ "dfs.cblock.short.circuit.io";
+ public static final boolean DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO_DEFAULT =
+ false;
+
/**
* Cache size in 1000s of entries. 256 indicates 256 * 1024.
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
new file mode 100644
index 00000000000..8c3b3ff5380
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.LevelDBStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Paths;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
+
+/**
+ * Writes blocks to the local cache and queues them for asynchronous sync
+ * to the container.
+ */
+public class AsyncBlockWriter {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AsyncBlockWriter.class);
+
+ /**
+ * Right now we have a single buffer and we block when we write it to
+ * the file.
+ */
+ private final ByteBuffer blockIDBuffer;
+
+ /**
+ * XceiverClientManager is used to get client connections to a set of
+ * machines.
+ */
+ private final XceiverClientManager xceiverClientManager;
+
+ /**
+   * This lock is used as a signal to the re-queuing thread. The re-queue
+   * thread wakes up as soon as it is signaled that some blocks are in the
+   * retry queue. We retry aggressively since a retried block automatically
+   * moves to the end of the queue.
+   *
+   * In the event a container is unavailable for a long time, we can either
+   * fail all writes or remap and let the writes succeed. The simpler
+   * semantics is to fail the volume until the container is recovered by SCM.
+ */
+ private final Lock lock;
+ private final Condition notEmpty;
+ /**
+ * The cache this writer is operating against.
+ */
+ private final CBlockLocalCache parentCache;
+ private final int blockBufferSize;
+  private static final String DIRTY_LOG_PREFIX = "DirtyLog";
+ private AtomicLong localIoCount;
+
+ /**
+ * Constructs an Async Block Writer.
+ *
+ * @param config - Config
+ * @param cache - Parent Cache for this writer
+ */
+ public AsyncBlockWriter(Configuration config, CBlockLocalCache cache) {
+
+ Preconditions.checkNotNull(cache, "Cache cannot be null.");
+ Preconditions.checkNotNull(cache.getCacheDB(), "DB cannot be null.");
+ localIoCount = new AtomicLong();
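+    // The configured buffer size is in KB; convert it to bytes.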
+ blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
+ DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * 1024;
+    LOG.info("Cache: Block buffer size: {}", blockBufferSize);
+ lock = new ReentrantLock();
+ notEmpty = lock.newCondition();
+ parentCache = cache;
+ xceiverClientManager = cache.getClientManager();
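+    // Direct buffer that accumulates 8-byte block IDs; once it fills up it
+    // is spilled to an on-disk dirty log for the flusher to process.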
+ blockIDBuffer = ByteBuffer.allocateDirect(blockBufferSize);
+ }
+
+ /**
+ * Return the log to write to.
+ *
+ * @return Logger.
+ */
+ public static Logger getLOG() {
+ return LOG;
+ }
+
+ /**
+ * Get the CacheDB.
+ *
+ * @return LevelDB Handle
+ */
+ LevelDBStore getCacheDB() {
+ return parentCache.getCacheDB();
+ }
+
+ /**
+ * Returns the client manager.
+ *
+ * @return XceiverClientManager
+ */
+ XceiverClientManager getXceiverClientManager() {
+ return xceiverClientManager;
+ }
+
+ /**
+   * Increments the count of local I/O packets that have gone into this
+   * device.
+   *
+   * @return the updated count.
+ */
+ public long incrementLocalIO() {
+ return localIoCount.incrementAndGet();
+ }
+
+ /**
+   * Returns the local I/O count for this device.
+   *
+   * @return the count of local I/O operations.
+ */
+ public long getLocalIOCount() {
+ return localIoCount.get();
+ }
+
+ /**
+ * Writes a block to LevelDB store and queues a work item for the system to
+ * sync the block to containers.
+ *
+ * @param block - Logical Block
+ */
+ public void writeBlock(LogicalBlock block) throws IOException {
+ byte[] keybuf = Longs.toByteArray(block.getBlockID());
+ if (parentCache.isShortCircuitIOEnabled()) {
+ long startTime = Time.monotonicNow();
+ getCacheDB().put(keybuf, block.getData().array());
+ incrementLocalIO();
+ long endTime = Time.monotonicNow();
+ parentCache.getTargetMetrics().updateDBWriteLatency(
+ endTime - startTime);
+ if (parentCache.isTraceEnabled()) {
+ String datahash = DigestUtils.sha256Hex(block.getData().array());
+ parentCache.getTracer().info(
+ "Task=WriterTaskDBPut,BlockID={},Time={},SHA={}",
+ block.getBlockID(), endTime - startTime, datahash);
+ }
+ block.clearData();
+ } else {
+ // TODO : Support Direct I/O
+      LOG.error("Non-cache I/O is not supported at this point in time.");
+ throw new IllegalStateException("Cache is required and cannot be " +
+ "disabled now.");
+ }
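+    // If the buffer cannot hold one more 8-byte block ID, persist it to a
+    // dirty log file before recording this block's ID.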
+ if (blockIDBuffer.remaining() <= (Long.SIZE / Byte.SIZE)) {
+ long startTime = Time.monotonicNow();
+ blockIDBuffer.flip();
+ writeBlockBufferToFile(blockIDBuffer);
+ blockIDBuffer.clear();
+ long endTime = Time.monotonicNow();
+ if (parentCache.isTraceEnabled()) {
+ parentCache.getTracer().info(
+ "Task=DirtyBlockLogWrite,Time={}", endTime - startTime);
+ }
+ }
+ blockIDBuffer.putLong(block.getBlockID());
+ }
+
+ /**
+ * Write Block Buffer to file.
+ *
+   * @param blockID - ByteBuffer of block IDs to persist.
+ * @throws IOException
+ */
+ private void writeBlockBufferToFile(ByteBuffer blockID)
+ throws IOException {
+ boolean append = false;
+ String fileName =
+ String.format("%s.%s", DIRTY_LOG_PREFIX, Time.monotonicNow());
+ File logDir = new File(parentCache.getDbPath().toString());
+ if (!logDir.exists() && !logDir.mkdirs()) {
+      LOG.error("Unable to create the log directory. Critical error, cannot " +
+          "continue. Log Dir : {}", logDir);
+      throw new IllegalStateException("Cache directory creation failed. " +
+          "Cannot continue. Log Dir: " + logDir);
+ }
+ String log = Paths.get(parentCache.getDbPath().toString(), fileName)
+ .toString();
+
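+    // Write the buffered block IDs to a new timestamped dirty log file and
+    // hand it to the flusher, which syncs those blocks to the containers.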
+ try (FileChannel channel = new FileOutputStream(log, append).getChannel()) {
+ channel.write(blockID);
+ }
+ blockID.clear();
+ parentCache.processDirtyMessage(fileName);
+ }
+
+ /**
+ * Shutdown by writing any pending I/O to dirtylog buffer.
+ */
+ public void shutdown() {
+    try {
+      // Flip so that only the block IDs recorded so far get written out.
+      blockIDBuffer.flip();
+      writeBlockBufferToFile(this.blockIDBuffer);
+ } catch (IOException e) {
+      LOG.error("Unable to sync the block map to disk. This might cause " +
+          "data loss or corruption.", e);
+ }
+  }
+
+  /**
+ * Returns tracer.
+ *
+ * @return Tracer
+ */
+ Logger getTracer() {
+ return parentCache.getTracer();
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java
index 743a35b1141..329e8d1eb52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java
@@ -17,36 +17,278 @@
*/
package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
-import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
import org.apache.hadoop.cblock.jscsiHelper.cache.CacheModule;
import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.LevelDBStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.List;
+import java.util.UUID;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+ .DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+ .DFS_CBLOCK_DISK_CACHE_PATH_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+ .DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+ .DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+ .DFS_CBLOCK_TRACE_IO_DEFAULT;
/**
* A local cache used by the CBlock ISCSI server. This class is enabled or
* disabled via config settings.
- *
- * TODO : currently, this class is a just a place holder.
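+ *
+ * A minimal usage sketch, mirroring the unit tests; the configuration,
+ * pipeline list, client manager, flusher and metrics are assumed to be
+ * constructed already:
+ * <pre>
+ *   CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+ *       .setConfiguration(conf)
+ *       .setUserName(userName)
+ *       .setVolumeName(volumeName)
+ *       .setPipelines(pipelines)
+ *       .setClientManager(clientManager)
+ *       .setBlockSize(4 * 1024)
+ *       .setVolumeSize(50L * 1024 * 1024 * 1024)
+ *       .setFlusher(flusher)
+ *       .setCBlockTargetMetrics(metrics)
+ *       .build();
+ * </pre>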
*/
-final public class CBlockLocalCache implements CacheModule {
+public class CBlockLocalCache implements CacheModule {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CBlockLocalCache.class);
+ private static final Logger TRACER =
+ LoggerFactory.getLogger("TraceIO");
- private CBlockLocalCache() {
+ private final Configuration conf;
+ /**
+   * LevelDB cache file; for now, we use a 256 MB off-heap cache in LevelDB.
+ */
+ private final LevelDBStore cacheDB;
+ private final int cacheSizeMb = 256;
+
+ /**
+   * The async block writer updates the cacheDB and writes the blocks
+   * asynchronously to remote containers.
+ */
+ private final AsyncBlockWriter blockWriter;
+
+ /**
+   * The sync block reader tries to read from the cache; on a cache miss it
+   * fetches the block from the remote location and asynchronously updates
+   * the cacheDB.
+ */
+ private final SyncBlockReader blockReader;
+ /**
+ * We create a trace ID to make it easy to debug issues.
+   * A trace ID is in the format IPAddress:VolumeName:blockID:second.
+ *
+ * This will get written down on the data node if we get any failures, so
+ * with this trace ID we can correlate cBlock failures across machines.
+ */
+ private final String userName;
+ private final String volumeName;
+ private final String ipAddressString;
+ private final String tracePrefix;
+
+ /**
+ * From a block ID we are able to get the pipeline by indexing this array.
+ */
+ private final Pipeline[] containerList;
+ private final int blockSize;
+ private XceiverClientManager clientManager;
+ /**
+   * If this flag is enabled, the cache traces all I/O; all reads and writes
+   * are visible in the log along with the SHA of the block written. This
+   * makes the system slower; use it only for debugging or for creating
+   * trace simulations.
+ */
+ private final boolean traceEnabled;
+ private final boolean enableShortCircuitIO;
+ private final long volumeSize;
+ private long currentCacheSize;
+ private File dbPath;
+ private final ContainerCacheFlusher flusher;
+ private CBlockTargetMetrics cblockTargetMetrics;
+
+ /**
+ * Get Db Path.
+ * @return the file instance of the db.
+ */
+ public File getDbPath() {
+ return dbPath;
}
+ /**
+ * Constructor for CBlockLocalCache invoked via the builder.
+ *
+ * @param conf - Configuration
+ * @param volumeName - volume Name
+ * @param userName - user name
+   * @param containerPipelines - Pipelines that make up this container
+ * @param blockSize - blockSize
+ * @param flusher - flusher to flush data to container
+ * @throws IOException
+ */
+ CBlockLocalCache(
+ Configuration conf, String volumeName,
+      String userName, List<Pipeline> containerPipelines, int blockSize,
+ long volumeSize, ContainerCacheFlusher flusher) throws IOException {
+ this.conf = conf;
+ this.userName = userName;
+ this.volumeName = volumeName;
+ this.blockSize = blockSize;
+ this.flusher = flusher;
+ this.traceEnabled = conf.getBoolean(DFS_CBLOCK_TRACE_IO,
+ DFS_CBLOCK_TRACE_IO_DEFAULT);
+ this.enableShortCircuitIO = conf.getBoolean(
+ DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO,
+ DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO_DEFAULT);
+ dbPath = Paths.get(conf.get(DFS_CBLOCK_DISK_CACHE_PATH_KEY,
+ DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT), userName, volumeName).toFile();
+
+ if (!dbPath.exists() && !dbPath.mkdirs()) {
+ LOG.error("Unable to create the cache paths. Path: {}", dbPath);
+ throw new IllegalArgumentException("Unable to create paths. Path: " +
+ dbPath);
+ }
+ cacheDB = flusher.openDB(dbPath.toString(), cacheSizeMb);
+ this.containerList = containerPipelines.toArray(new
+ Pipeline[containerPipelines.size()]);
+ this.ipAddressString = getHostIP();
+ this.tracePrefix = ipAddressString + ":" + this.volumeName;
+ this.volumeSize = volumeSize;
+
+ blockWriter = new AsyncBlockWriter(conf, this);
+ blockReader = new SyncBlockReader(conf, this);
+ if (this.traceEnabled) {
+ getTracer().info("Task=StartingCache");
+ }
+ }
+
+ private void setClientManager(XceiverClientManager manager) {
+ this.clientManager = manager;
+ }
+
+ private void setCblockTargetMetrics(CBlockTargetMetrics targetMetrics) {
+ this.cblockTargetMetrics = targetMetrics;
+ }
+
+ /**
+ * Returns new builder class that builds a CBlockLocalCache.
+ *
+ * @return Builder
+ */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public void processDirtyMessage(String fileName) {
+ flusher.processDirtyBlocks(dbPath.toString(), fileName);
+ }
+
+ /**
+ * Get usable disk space.
+ *
+ * @param dbPathString - Path to db
+ * @return long bytes remaining.
+ */
+  private static long getRemainingDiskSpace(String dbPathString) {
+ try {
+ URI fileUri = new URI("file:///");
+ Path dbPath = Paths.get(fileUri).resolve(dbPathString);
+ FileStore disk = Files.getFileStore(dbPath);
+ return disk.getUsableSpace();
+ } catch (URISyntaxException | IOException ex) {
+      LOG.error("Unable to get free space for path: " + dbPathString, ex);
+ }
+ return 0L;
+ }
+
+ /**
+   * Returns the current cache size.
+ *
+ * @return - Cache Size
+ */
+ public long getCurrentCacheSize() {
+ return currentCacheSize;
+ }
+
+ /**
+   * Sets the current cache size.
+   *
+   * @param currentCacheSize - the current cache size in bytes.
+ */
+ public void setCurrentCacheSize(long currentCacheSize) {
+ this.currentCacheSize = currentCacheSize;
+ }
+
+ /**
+ * True if block tracing is enabled.
+ *
+ * @return - bool
+ */
+ public boolean isTraceEnabled() {
+ return traceEnabled;
+ }
+
+ /**
+ * Checks if Short Circuit I/O is enabled.
+ *
+ * @return - true if it is enabled.
+ */
+ public boolean isShortCircuitIOEnabled() {
+ return enableShortCircuitIO;
+ }
+
+ /**
+ * Returns the default block size of this device.
+ *
+ * @return - int
+ */
+ public int getBlockSize() {
+ return blockSize;
+ }
+
+ /**
+ * Gets the client manager.
+ *
+ * @return XceiverClientManager
+ */
+ public XceiverClientManager getClientManager() {
+ return clientManager;
+ }
+
+ /**
+   * Checks if the block is cached; if yes, returns the cached block.
+   * Otherwise, loads it from the data source and puts it into the cache.
+   *
+   * @param blockID - Block ID
+   * @return the block associated with the blockID
+ */
@Override
public LogicalBlock get(long blockID) throws IOException {
- return null;
+ cblockTargetMetrics.incNumReadOps();
+ return blockReader.readBlock(blockID);
}
+ /**
+   * Puts the block into the cache and the remote container.
+ *
+ * @param blockID - BlockID
+ * @param data - byte[]
+ */
@Override
public void put(long blockID, byte[] data) throws IOException {
+ cblockTargetMetrics.incNumWriteOps();
+ LogicalBlock block = new DiskBlock(blockID, data, false);
+ blockWriter.writeBlock(block);
}
@Override
@@ -56,32 +298,179 @@ final public class CBlockLocalCache implements CacheModule {
@Override
public void start() throws IOException {
-
+    // This is a no-op for us. We start when we boot up.
}
@Override
public void stop() throws IOException {
-
}
@Override
public void close() throws IOException {
-
+ blockReader.shutdown();
+ blockWriter.shutdown();
+ this.flusher.closeDB(dbPath.toString());
+ if (this.traceEnabled) {
+ getTracer().info("Task=ShutdownCache");
+ }
}
+ /**
+ * Returns true if cache still has blocks pending to write.
+ *
+ * @return false if we have no pending blocks to write.
+ */
@Override
public boolean isDirtyCache() {
return false;
}
- public static Builder newBuilder() {
- return new Builder();
+
+ /**
+ * Tries to get the local host IP Address for creating trace IDs.
+ */
+ private String getHostIP() {
+ String tmp;
+ try {
+ tmp = InetAddress.getLocalHost().toString();
+ } catch (UnknownHostException ex) {
+ tmp = UUID.randomUUID().toString();
+ LOG.error("Unable to read the host address. Using a GUID for " +
+ "hostname:{} ", tmp, ex);
+ }
+ return tmp;
+ }
+
+ /**
+ * Returns the local cache DB.
+ *
+ * @return - DB
+ */
+ LevelDBStore getCacheDB() {
+ return this.cacheDB;
+ }
+
+ /**
+ * Returns the current userName.
+ *
+ * @return - UserName
+ */
+ String getUserName() {
+ return this.userName;
+ }
+
+ /**
+ * Returns the volume name.
+ *
+ * @return VolumeName.
+ */
+ String getVolumeName() {
+ return this.volumeName;
+ }
+
+ /**
+ * Returns the target metrics.
+ *
+ * @return CBlock Target Metrics.
+ */
+ CBlockTargetMetrics getTargetMetrics() {
+ return this.cblockTargetMetrics;
+ }
+
+ /**
+ * Returns the pipeline to use given a container.
+ *
+ * @param blockId - blockID
+ * @return - pipeline.
+ */
+ Pipeline getPipeline(long blockId) {
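+    // Block IDs are striped across the container list; each pipeline's
+    // private data carries the container index assigned by the cBlock
+    // server, which is validated below.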
+ int containerIdx = (int) blockId % containerList.length;
+ long cBlockIndex =
+ Longs.fromByteArray(containerList[containerIdx].getData());
+ if (cBlockIndex > 0) {
+ // This catches the case when we get a wrong container in the ordering
+ // of the containers.
+ Preconditions.checkState(containerIdx % cBlockIndex == 0,
+ "The container ID computed should match with the container index " +
+ "returned from cBlock Server.");
+ }
+ return containerList[containerIdx];
+ }
+
+ /**
+   * Returns a traceID based on the Block ID.
+ * The format is HostIP:VolumeName:BlockID:timeStamp, in case of error this
+ * will be logged on the container side.
+ *
+ * @param blockID - Block ID
+ * @return trace ID
+ */
+ String getTraceID(long blockID) {
+ // mapping to seconds to make the string smaller.
+ return this.tracePrefix + ":" + blockID + ":"
+ + Time.monotonicNow() / 1000;
+ }
+
+ /**
+ * Returns tracer.
+ *
+ * @return - Logger
+ */
+ Logger getTracer() {
+ return TRACER;
}
/**
   * Builder class for CBlockLocalCache.
*/
public static class Builder {
+ private Configuration configuration;
+ private String userName;
+ private String volumeName;
+    private List<Pipeline> pipelines;
+ private XceiverClientManager clientManager;
+ private int blockSize;
+ private long volumeSize;
+ private ContainerCacheFlusher flusher;
+ private CBlockTargetMetrics metrics;
+
+ /**
+ * Ctor.
+ */
+ Builder() {
+ }
+
+ /**
+ * Computes a cache size based on the configuration and available disk
+ * space.
+ *
+ * @param configuration - Config
+ * @param volumeSize - Size of Volume
+ * @param blockSize - Size of the block
+ * @return - cache size in bytes.
+ */
+ private static long computeCacheSize(Configuration configuration,
+ long volumeSize, int blockSize) {
+ long cacheSize = 0;
+ String dbPath = configuration.get(DFS_CBLOCK_DISK_CACHE_PATH_KEY,
+ DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT);
+ if (StringUtils.isBlank(dbPath)) {
+ return cacheSize;
+ }
+      long spaceRemaining = getRemainingDiskSpace(dbPath);
+ double cacheRatio = 1.0;
+
+ if (spaceRemaining < volumeSize) {
+ cacheRatio = (double)spaceRemaining / volumeSize;
+ }
+
+ // if cache is going to be at least 10% of the volume size it is worth
+ // doing, otherwise skip creating the cache.
+      if (cacheRatio >= 0.10) {
+        // Cast the computed size to long; Double.doubleToLongBits would
+        // return the raw IEEE-754 bit pattern, not the numeric value.
+        cacheSize = (long) (volumeSize * cacheRatio);
+      }
+ return cacheSize;
+ }
/**
* Sets the Config to be used by this cache.
@@ -90,6 +479,7 @@ final public class CBlockLocalCache implements CacheModule {
* @return Builder
*/
public Builder setConfiguration(Configuration configuration) {
+ this.configuration = configuration;
return this;
}
@@ -101,6 +491,7 @@ final public class CBlockLocalCache implements CacheModule {
* @return - Builder
*/
public Builder setUserName(String userName) {
+ this.userName = userName;
return this;
}
@@ -111,6 +502,7 @@ final public class CBlockLocalCache implements CacheModule {
* @return Builder
*/
public Builder setVolumeName(String volumeName) {
+ this.volumeName = volumeName;
return this;
}
@@ -121,6 +513,7 @@ final public class CBlockLocalCache implements CacheModule {
* @return Builder
*/
public Builder setPipelines(List pipelines) {
+ this.pipelines = pipelines;
return this;
}
@@ -131,6 +524,7 @@ final public class CBlockLocalCache implements CacheModule {
* @return - Builder
*/
public Builder setClientManager(XceiverClientManager clientManager) {
+ this.clientManager = clientManager;
return this;
}
@@ -141,6 +535,7 @@ final public class CBlockLocalCache implements CacheModule {
* @return - Builder
*/
public Builder setBlockSize(int blockSize) {
+ this.blockSize = blockSize;
return this;
}
@@ -151,6 +546,7 @@ final public class CBlockLocalCache implements CacheModule {
* @return - Builder
*/
public Builder setVolumeSize(long volumeSize) {
+ this.volumeSize = volumeSize;
return this;
}
@@ -160,6 +556,7 @@ final public class CBlockLocalCache implements CacheModule {
* @return Builder.
*/
public Builder setFlusher(ContainerCacheFlusher flusher) {
+ this.flusher = flusher;
return this;
}
@@ -170,11 +567,52 @@ final public class CBlockLocalCache implements CacheModule {
* @return - Builder
*/
public Builder setCBlockTargetMetrics(CBlockTargetMetrics targetMetrics) {
+ this.metrics = targetMetrics;
return this;
}
+ /**
+ * Constructs a CBlockLocalCache.
+ *
+ * @return the CBlockLocalCache with the preset properties.
+ * @throws IOException
+ */
public CBlockLocalCache build() throws IOException {
- return new CBlockLocalCache();
+ Preconditions.checkNotNull(this.configuration, "A valid configuration " +
+ "is needed");
+ Preconditions.checkState(StringUtils.isNotBlank(userName), "A valid " +
+ "username is needed");
+      Preconditions.checkState(StringUtils.isNotBlank(volumeName), "A " +
+          "valid volume name is needed");
+ Preconditions.checkNotNull(this.pipelines, "Pipelines cannot be null");
+ Preconditions.checkState(this.pipelines.size() > 0, "At least one " +
+ "pipeline location is needed for a volume");
+
+ for (int x = 0; x < pipelines.size(); x++) {
+ Preconditions.checkNotNull(pipelines.get(x).getData(), "cBlock " +
+ "relies on private data on the pipeline, null data found.");
+ }
+
+      Preconditions.checkNotNull(clientManager, "Client Manager cannot be " +
+          "null");
+      Preconditions.checkState(blockSize > 0, "Block size has to be a " +
+          "number greater than 0");
+
+ Preconditions.checkState(volumeSize > 0, "Volume Size cannot be less " +
+ "than 1");
+ Preconditions.checkNotNull(this.flusher, "Flusher cannot be null.");
+
+ CBlockLocalCache cache = new CBlockLocalCache(this.configuration,
+ this.volumeName, this.userName, this.pipelines, blockSize,
+ volumeSize, flusher);
+ cache.setCblockTargetMetrics(this.metrics);
+ cache.setClientManager(this.clientManager);
+
+ // TODO : Support user configurable maximum size.
+ long cacheSize = computeCacheSize(this.configuration, this.volumeSize,
+ this.blockSize);
+ cache.setCurrentCacheSize(cacheSize);
+ return cache;
}
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/SyncBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/SyncBlockReader.java
new file mode 100644
index 00000000000..19e375620a0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/SyncBlockReader.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
+
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
+import org.apache.hadoop.utils.LevelDBStore;
+import org.iq80.leveldb.DBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reads blocks from the container via the local cache.
+ */
+public class SyncBlockReader {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SyncBlockReader.class);
+
+ /**
+   * Update Queue - The reason why we have this queue is that we want to
+   * return the block as soon as we read it from the containers. This is a
+   * work queue that takes the read block and updates the cache. During
+   * testing we found that levelDB is slow during writes, hence we wanted to
+   * return the block as soon as possible and update levelDB asynchronously.
+ */
+ private final static int QUEUE_SIZE = 1024;
+ /**
+ * Config.
+ */
+ private final Configuration conf;
+ /**
+ * The parent cache this reader is operating against.
+ */
+ private final CBlockLocalCache parentCache;
+  private final BlockingQueue<Runnable> updateQueue;
+
+ /**
+   * The executor is used for running LevelDB updates. In the future, we
+   * might do read-aheads, and this pool is useful for that too. The reason
+   * why we don't share an executor for reads and writes is that the write
+   * task is a couple of orders of magnitude slower than the read task, and
+   * we don't want the DB updates to queue up behind the writes.
+ */
+ private final ThreadPoolExecutor executor;
+
+ /**
+ * Number of threads that pool starts with.
+ */
+ private final int corePoolSize = 1;
+ /**
+ * Maximum number of threads our pool will ever create.
+ */
+ private final int maxPoolSize = 10;
+ /**
+   * The idle time a thread hangs around waiting for work. If we don't find
+   * new work in 60 seconds, the worker thread is killed.
+ */
+ private final long keepAlive = 60L;
+
+ /**
+ * Constructs a SyncBlock reader.
+ *
+ * @param conf - Configuration
+ * @param cache - Cache
+ */
+ public SyncBlockReader(Configuration conf, CBlockLocalCache cache) {
+ this.conf = conf;
+ this.parentCache = cache;
+ updateQueue = new ArrayBlockingQueue<>(QUEUE_SIZE, true);
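+    // Worker threads are daemons; CallerRunsPolicy gives back-pressure by
+    // running the DB update on the calling thread when the queue is full.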
+ ThreadFactory workerThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("SyncBlockReader Thread #%d").setDaemon(true).build();
+ executor = new HadoopThreadPoolExecutor(
+ corePoolSize, maxPoolSize, keepAlive, TimeUnit.SECONDS,
+ updateQueue, workerThreadFactory,
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ }
+
+ /**
+ * Returns the cache DB.
+ *
+ * @return LevelDB
+ */
+ private LevelDBStore getCacheDB() {
+ return parentCache.getCacheDB();
+ }
+
+ /**
+ * Returns data from the local cache if found, else reads from the remote
+ * container.
+ *
+ * @param blockID - blockID
+ * @return LogicalBlock
+ */
+ LogicalBlock readBlock(long blockID) throws IOException {
+ XceiverClientSpi client = null;
+    byte[] data = getBlockFromDB(blockID);
+ if (data != null) {
+ parentCache.getTargetMetrics().incNumReadCacheHits();
+ return new DiskBlock(blockID, data, false);
+ }
+
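+    // Cache miss: acquire a client for this block's pipeline and fetch the
+    // block from the remote container.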
+ parentCache.getTargetMetrics().incNumReadCacheMiss();
+ try {
+ client = parentCache.getClientManager()
+ .acquireClient(parentCache.getPipeline(blockID));
+ LogicalBlock block = getBlockFromContainer(blockID, client);
+ return block;
+ } finally {
+ if (client != null) {
+ parentCache.getClientManager().releaseClient(client);
+ }
+ }
+ }
+
+ /**
+ * Gets data from the DB if it exists.
+ *
+ * @param blockID - block id
+ * @return data
+ */
+  private byte[] getBlockFromDB(long blockID) {
+ try {
+      if (parentCache.isShortCircuitIOEnabled()) {
+ long startTime = Time.monotonicNow();
+ byte[] data = getCacheDB().get(Longs.toByteArray(blockID));
+ long endTime = Time.monotonicNow();
+
+ if (parentCache.isTraceEnabled()) {
+ parentCache.getTracer().info(
+ "Task=ReadTaskDBRead,BlockID={},SHA={},Time={}",
+ blockID, (data != null && data.length > 0)
+ ? DigestUtils.sha256Hex(data) : null,
+ endTime - startTime);
+ }
+ parentCache.getTargetMetrics().updateDBReadLatency(
+ endTime - startTime);
+ return data;
+ }
+ } catch (DBException dbe) {
+ LOG.error("Error while reading from cacheDB.", dbe);
+ throw dbe;
+ }
+ return null;
+ }
+
+ /**
+   * Returns a block from a remote container. If the key is not found on the
+   * remote container, we just return a block initialized with zeros.
+ *
+ * @param blockID - blockID
+ * @param client - client
+ * @return LogicalBlock
+ * @throws IOException
+ */
+ private LogicalBlock getBlockFromContainer(long blockID,
+ XceiverClientSpi client) throws IOException {
+ String containerName = parentCache.getPipeline(blockID).getContainerName();
+ try {
+ long startTime = Time.monotonicNow();
+ ContainerProtos.GetSmallFileResponseProto response =
+ ContainerProtocolCalls.readSmallFile(client, containerName,
+ Long.toString(blockID), parentCache.getTraceID(blockID));
+ long endTime = Time.monotonicNow();
+ if (parentCache.isTraceEnabled()) {
+ parentCache.getTracer().info(
+ "Task=ReadTaskContainerRead,BlockID={},SHA={},Time={}",
+ blockID, response.getData().getData().toByteArray().length > 0 ?
+ DigestUtils.sha256Hex(response.getData()
+ .getData().toByteArray()) : null, endTime - startTime);
+ }
+
+ parentCache.getTargetMetrics().updateContainerReadLatency(
+ endTime - startTime);
+ DiskBlock block = new DiskBlock(blockID,
+ response.getData().getData().toByteArray(), false);
+
+      if (parentCache.isShortCircuitIOEnabled()) {
+ queueUpdateTask(block);
+ }
+
+ return block;
+ } catch (IOException ex) {
+ if (ex instanceof StorageContainerException) {
+ parentCache.getTargetMetrics().incNumReadLostBlocks();
+ StorageContainerException sce = (StorageContainerException) ex;
+ if (sce.getResult() == ContainerProtos.Result.NO_SUCH_KEY ||
+ sce.getResult() == ContainerProtos.Result.IO_EXCEPTION) {
+ return new DiskBlock(blockID, new byte[parentCache.getBlockSize()],
+ false);
+ }
+ }
+ throw ex;
+ }
+ }
+
+ /**
+ * Updates the cache with the block that we just read.
+ *
+ * @param block
+ */
+ private void queueUpdateTask(final DiskBlock block) {
+ Runnable updateTask = () -> {
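+      // Persist the freshly read block to LevelDB off the read path.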
+      if (block.getData().array().length > 0) {
+ getCacheDB().put(Longs.toByteArray(block.getBlockID()),
+ block.getData().array());
+ block.setPersisted(true);
+ } else {
+        LOG.error("Refusing to update a null block in the local cache.");
+ }
+ };
+ if (this.executor.isShutdown() || this.executor.isTerminated()) {
+ LOG.error("Thread executor is not running.");
+ } else {
+ this.executor.submit(updateTask);
+ }
+ }
+
+ /**
+   * This is a read operation; we don't care if we updated the cache with
+   * the last block we read.
+ */
+ void shutdown() {
+ this.executor.shutdownNow();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
new file mode 100644
index 00000000000..c8d2aadc116
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock;
+
+import com.google.common.primitives.Longs;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.cblock.jscsiHelper.CBlockIStorageImpl;
+import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
+import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.lang.Math.abs;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO;
+
+/**
+ * Tests for the local cache.
+ */
+public class TestLocalBlockCache {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestLocalBlockCache.class);
+ private final static long GB = 1024 * 1024 * 1024;
+ private final static int KB = 1024;
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration config;
+ private static StorageContainerLocationProtocolClientSideTranslatorPB
+ storageContainerLocationClient;
+ private static XceiverClientManager xceiverClientManager;
+
+ @BeforeClass
+ public static void init() throws IOException {
+ config = new OzoneConfiguration();
+ URL p = config.getClass().getResource("");
+ String path = p.getPath().concat(
+ TestOzoneContainer.class.getSimpleName());
+ config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+ config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+ config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+ cluster = new MiniOzoneCluster.Builder(config)
+ .numDataNodes(1).setHandlerType("distributed").build();
+ storageContainerLocationClient = cluster
+ .createStorageContainerLocationClient();
+ xceiverClientManager = new XceiverClientManager(config);
+ }
+
+ @AfterClass
+ public static void shutdown() throws InterruptedException {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ IOUtils.cleanup(null, storageContainerLocationClient, cluster);
+ }
+
+ /**
+ * getContainerPipelines creates a set of containers and returns the
+ * Pipelines that define those containers.
+ *
+ * @param count - Number of containers to create.
+ * @return - List of Pipelines.
+   * @throws IOException on failure.
+ */
+  private List<Pipeline> getContainerPipeline(int count) throws IOException {
+    List<Pipeline> containerPipelines = new LinkedList<>();
+ for (int x = 0; x < count; x++) {
+ String traceID = "trace" + RandomStringUtils.randomNumeric(4);
+ String containerName = "container" + RandomStringUtils.randomNumeric(10);
+ Pipeline pipeline =
+ storageContainerLocationClient.allocateContainer(containerName);
+ XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
+ ContainerProtocolCalls.createContainer(client, traceID);
+ // This step is needed since we set private data on pipelines, when we
+ // read the list from CBlockServer. So we mimic that action here.
+ pipeline.setData(Longs.toByteArray(x));
+ containerPipelines.add(pipeline);
+ }
+ return containerPipelines;
+ }
+
+ /**
+ * This test creates a cache and performs a simple write / read.
+ * Due to the cache, we have read-after-write consistency for cBlocks.
+ *
+ * @throws IOException on failure.
+ */
+ @Test
+ public void testCacheWriteRead() throws IOException,
+ InterruptedException, TimeoutException {
+ final long blockID = 0;
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+ String userName = "user" + RandomStringUtils.randomNumeric(4);
+ String data = RandomStringUtils.random(4 * KB);
+ String dataHash = DigestUtils.sha256Hex(data);
+ CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+ ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config,
+ xceiverClientManager, metrics);
+ CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+ .setConfiguration(this.config)
+ .setVolumeName(volumeName)
+ .setUserName(userName)
+ .setPipelines(getContainerPipeline(10))
+ .setClientManager(xceiverClientManager)
+ .setBlockSize(4 * KB)
+ .setVolumeSize(50 * GB)
+ .setFlusher(flusher)
+ .setCBlockTargetMetrics(metrics)
+ .build();
+ cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
+ Assert.assertEquals(1, metrics.getNumWriteOps());
+ // Please note that this read is from the local cache.
+ LogicalBlock block = cache.get(blockID);
+ Assert.assertEquals(1, metrics.getNumReadOps());
+ Assert.assertEquals(1, metrics.getNumReadCacheHits());
+ Assert.assertEquals(0, metrics.getNumReadCacheMiss());
+ Assert.assertEquals(0, metrics.getNumReadLostBlocks());
+
+ cache.put(blockID + 1, data.getBytes(StandardCharsets.UTF_8));
+ Assert.assertEquals(2, metrics.getNumWriteOps());
+ // Please note that this read is from the local cache.
+ block = cache.get(blockID + 1);
+ Assert.assertEquals(2, metrics.getNumReadOps());
+ Assert.assertEquals(2, metrics.getNumReadCacheHits());
+ Assert.assertEquals(0, metrics.getNumReadCacheMiss());
+ Assert.assertEquals(0, metrics.getNumReadLostBlocks());
+ String readHash = DigestUtils.sha256Hex(block.getData().array());
+ Assert.assertEquals("File content does not match.", dataHash, readHash);
+ GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
+ cache.close();
+
+ }
+
+ @Test
+ public void testCacheWriteToRemoteContainer() throws IOException,
+ InterruptedException, TimeoutException {
+ final long blockID = 0;
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+ String userName = "user" + RandomStringUtils.randomNumeric(4);
+ String data = RandomStringUtils.random(4 * KB);
+ CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+ ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config,
+ xceiverClientManager, metrics);
+ CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+ .setConfiguration(this.config)
+ .setVolumeName(volumeName)
+ .setUserName(userName)
+ .setPipelines(getContainerPipeline(10))
+ .setClientManager(xceiverClientManager)
+ .setBlockSize(4 * KB)
+ .setVolumeSize(50 * GB)
+ .setFlusher(flusher)
+ .setCBlockTargetMetrics(metrics)
+ .build();
+ cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
+ GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
+ cache.close();
+ }
+
+ @Test
+ public void testCacheWriteToRemote50KBlocks() throws IOException,
+ InterruptedException, TimeoutException {
+ final long totalBlocks = 50 * 1000;
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+ String userName = "user" + RandomStringUtils.randomNumeric(4);
+ String data = RandomStringUtils.random(4 * KB);
+ CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+ ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config,
+ xceiverClientManager, metrics);
+ CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+ .setConfiguration(this.config)
+ .setVolumeName(volumeName)
+ .setUserName(userName)
+ .setPipelines(getContainerPipeline(10))
+ .setClientManager(xceiverClientManager)
+ .setBlockSize(4 * 1024)
+ .setVolumeSize(50 * GB)
+ .setFlusher(flusher)
+ .setCBlockTargetMetrics(metrics)
+ .build();
+ long startTime = Time.monotonicNow();
+ for (long blockid = 0; blockid < totalBlocks; blockid++) {
+ cache.put(blockid, data.getBytes(StandardCharsets.UTF_8));
+ }
+ Assert.assertEquals(totalBlocks, metrics.getNumWriteOps());
+ LOG.info("Wrote 50K blocks, waiting for replication to finish.");
+ GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
+ long endTime = Time.monotonicNow();
+ LOG.info("Time taken for writing {} blocks is {} seconds", totalBlocks,
+ TimeUnit.MILLISECONDS.toSeconds(endTime - startTime));
+ // TODO: Read this data back.
+ cache.close();
+ }
+
+ @Test
+ public void testCacheInvalidBlock() throws IOException {
+ final int blockID = 1024;
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+ String userName = "user" + RandomStringUtils.randomNumeric(4);
+ CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+ ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config,
+ xceiverClientManager, metrics);
+ CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+ .setConfiguration(this.config)
+ .setVolumeName(volumeName)
+ .setUserName(userName)
+ .setPipelines(getContainerPipeline(10))
+ .setClientManager(xceiverClientManager)
+ .setBlockSize(4 * KB)
+ .setVolumeSize(50 * GB)
+ .setFlusher(flusher)
+ .setCBlockTargetMetrics(metrics)
+ .build();
+ // Read a non-existent block ID.
+ LogicalBlock block = cache.get(blockID);
+ Assert.assertNotNull(block);
+ Assert.assertEquals(4 * 1024, block.getData().array().length);
+ Assert.assertEquals(1, metrics.getNumReadOps());
+ Assert.assertEquals(1, metrics.getNumReadLostBlocks());
+ Assert.assertEquals(1, metrics.getNumReadCacheMiss());
+ }
+
+ @Test
+ public void testReadWriteCorrectness() throws IOException,
+ InterruptedException, TimeoutException {
+ Random r = new Random();
+ final int maxBlock = 12500000;
+ final int blockCount = 10 * 1000;
+    Map<Long, String> blockShaMap = new HashMap<>();
+    List<Pipeline> pipelines = getContainerPipeline(10);
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+ String userName = "user" + RandomStringUtils.randomNumeric(4);
+ CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+ ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config,
+ xceiverClientManager, metrics);
+ final CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+ .setConfiguration(this.config)
+ .setVolumeName(volumeName)
+ .setUserName(userName)
+ .setPipelines(pipelines)
+ .setClientManager(xceiverClientManager)
+ .setBlockSize(4 * KB)
+ .setVolumeSize(50 * GB)
+ .setFlusher(flusher)
+ .setCBlockTargetMetrics(metrics)
+ .build();
+ for (int x = 0; x < blockCount; x++) {
+ String data = RandomStringUtils.random(4 * 1024);
+ String dataHash = DigestUtils.sha256Hex(data);
+ long blockId = abs(r.nextInt(maxBlock));
+ blockShaMap.put(blockId, dataHash);
+ cache.put(blockId, data.getBytes(StandardCharsets.UTF_8));
+ }
+ Assert.assertEquals(blockCount, metrics.getNumWriteOps());
+ GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
+    LOG.info("Finished putting blocks; starting to read blocks back. " +
+        "Unique blocks : {}", blockShaMap.size());
+ // Test reading from local cache.
+    for (Map.Entry<Long, String> entry : blockShaMap.entrySet()) {
+ LogicalBlock block = cache.get(entry.getKey());
+ String blockSha = DigestUtils.sha256Hex(block.getData().array());
+ Assert.assertEquals("Block data is not equal", entry.getValue(),
+ blockSha);
+ }
+ Assert.assertEquals(blockShaMap.size(), metrics.getNumReadOps());
+ Assert.assertEquals(blockShaMap.size(), metrics.getNumReadCacheHits());
+ Assert.assertEquals(0, metrics.getNumReadCacheMiss());
+ Assert.assertEquals(0, metrics.getNumReadLostBlocks());
+
+ LOG.info("Finished with reading blocks, SUCCESS.");
+ // Close and discard local cache.
+ cache.close();
+    LOG.info("Closing and destroying the local cache");
+ CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
+ ContainerCacheFlusher newflusher = new ContainerCacheFlusher(this.config,
+ xceiverClientManager, newMetrics);
+ Assert.assertEquals(0, newMetrics.getNumReadCacheHits());
+ CBlockLocalCache newCache = null;
+ try {
+ newCache = CBlockLocalCache.newBuilder()
+ .setConfiguration(this.config)
+ .setVolumeName(volumeName)
+ .setUserName(userName)
+ .setPipelines(pipelines)
+ .setClientManager(xceiverClientManager)
+ .setBlockSize(4 * KB)
+ .setVolumeSize(50 * GB)
+ .setFlusher(newflusher)
+ .setCBlockTargetMetrics(newMetrics)
+ .build();
+
+      for (Map.Entry<Long, String> entry : blockShaMap.entrySet()) {
+ LogicalBlock block = newCache.get(entry.getKey());
+ String blockSha = DigestUtils.sha256Hex(block.getData().array());
+ Assert.assertEquals("Block data is not equal", entry.getValue(),
+ blockSha);
+ }
+
+ Assert.assertEquals(blockShaMap.size(), newMetrics.getNumReadOps());
+ Assert.assertEquals(blockShaMap.size(), newMetrics.getNumReadCacheHits());
+ Assert.assertEquals(0, newMetrics.getNumReadCacheMiss());
+ Assert.assertEquals(0, newMetrics.getNumReadLostBlocks());
+
+ LOG.info("Finished with reading blocks from remote cache, SUCCESS.");
+ } finally {
+ if (newCache != null) {
+ newCache.close();
+ }
+ }
+ }
+
+ @Test
+ public void testStorageImplReadWrite() throws IOException,
+ InterruptedException, TimeoutException {
+ String userName = "user" + RandomStringUtils.randomNumeric(5);
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+ long volumeSize = 50L * (1024L * 1024L * 1024L);
+ int blockSize = 4096;
+ byte[] data =
+ RandomStringUtils.randomAlphanumeric(10 * (1024 * 1024))
+ .getBytes(StandardCharsets.UTF_8);
+ String hash = DigestUtils.sha256Hex(data);
+ CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+ ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config,
+ xceiverClientManager, metrics);
+ CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder()
+ .setUserName(userName)
+ .setVolumeName(volumeName)
+ .setVolumeSize(volumeSize)
+ .setBlockSize(blockSize)
+ .setContainerList(getContainerPipeline(10))
+ .setClientManager(xceiverClientManager)
+ .setConf(this.config)
+ .setFlusher(flusher)
+ .setCBlockTargetMetrics(metrics)
+ .build();
+ ozoneStore.write(data, 0);
+
+ byte[] newData = new byte[10 * 1024 * 1024];
+ ozoneStore.read(newData, 0);
+ String newHash = DigestUtils.sha256Hex(newData);
+ Assert.assertEquals("hashes don't match.", hash, newHash);
+ GenericTestUtils.waitFor(() -> !ozoneStore.getCache().isDirtyCache(),
+ 100, 20 * 1000);
+ ozoneStore.close();
+ }
+
+ //@Test
+ // Disabling this test for time being since the bug in JSCSI
+ // forces us always to have a local cache.
+ public void testStorageImplNoLocalCache() throws IOException,
+ InterruptedException, TimeoutException {
+ OzoneConfiguration oConfig = new OzoneConfiguration();
+ oConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
+ oConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+ String userName = "user" + RandomStringUtils.randomNumeric(5);
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+ long volumeSize = 50L * (1024L * 1024L * 1024L);
+ int blockSize = 4096;
+ byte[] data =
+ RandomStringUtils.randomAlphanumeric(10 * (1024 * 1024))
+ .getBytes(StandardCharsets.UTF_8);
+ String hash = DigestUtils.sha256Hex(data);
+ CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+ ContainerCacheFlusher flusher = new ContainerCacheFlusher(oConfig,
+ xceiverClientManager, metrics);
+ CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder()
+ .setUserName(userName)
+ .setVolumeName(volumeName)
+ .setVolumeSize(volumeSize)
+ .setBlockSize(blockSize)
+ .setContainerList(getContainerPipeline(10))
+ .setClientManager(xceiverClientManager)
+ .setConf(oConfig)
+ .setFlusher(flusher)
+ .setCBlockTargetMetrics(metrics)
+ .build();
+ ozoneStore.write(data, 0);
+
+ byte[] newData = new byte[10 * 1024 * 1024];
+ ozoneStore.read(newData, 0);
+ String newHash = DigestUtils.sha256Hex(newData);
+ Assert.assertEquals("hashes don't match.", hash, newHash);
+ GenericTestUtils.waitFor(() -> !ozoneStore.getCache().isDirtyCache(),
+ 100, 20 * 1000);
+ ozoneStore.close();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestStorageImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestStorageImpl.java
deleted file mode 100644
index b440ae0078a..00000000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestStorageImpl.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.cblock;
-
-import com.google.common.primitives.Longs;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.hadoop.cblock.jscsiHelper.CBlockIStorageImpl;
-import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
-import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfiguration;
-import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
-import org.apache.hadoop.scm.XceiverClientManager;
-import org.apache.hadoop.scm.XceiverClientSpi;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_KEY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO;
-
-/**
- * This class tests the cblock storage layer.
- */
-public class TestStorageImpl {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(TestStorageImpl.class);
- private final static long GB = 1024 * 1024 * 1024;
- private final static int KB = 1024;
- private static MiniOzoneCluster cluster;
- private static OzoneConfiguration config;
- private static StorageContainerLocationProtocolClientSideTranslatorPB
- storageContainerLocationClient;
- private static XceiverClientManager xceiverClientManager;
-
- @BeforeClass
- public static void init() throws IOException {
- config = new OzoneConfiguration();
- URL p = config.getClass().getResource("");
- String path = p.getPath().concat(
- TestOzoneContainer.class.getSimpleName());
- config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
- config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
- cluster = new MiniOzoneCluster.Builder(config)
- .numDataNodes(1).setHandlerType("distributed").build();
- storageContainerLocationClient = cluster
- .createStorageContainerLocationClient();
- xceiverClientManager = new XceiverClientManager(config);
- }
-
- @AfterClass
- public static void shutdown() throws InterruptedException {
- if (cluster != null) {
- cluster.shutdown();
- }
- IOUtils.cleanup(null, storageContainerLocationClient, cluster);
- }
-
- /**
- * getContainerPipelines creates a set of containers and returns the
- * Pipelines that define those containers.
- *
- * @param count - Number of containers to create.
- * @return - List of Pipelines.
- * @throws IOException
- */
-  private List<Pipeline> getContainerPipeline(int count) throws IOException {
-    List<Pipeline> containerPipelines = new LinkedList<>();
- for (int x = 0; x < count; x++) {
- String traceID = "trace" + RandomStringUtils.randomNumeric(4);
- String containerName = "container" + RandomStringUtils.randomNumeric(10);
- Pipeline pipeline =
- storageContainerLocationClient.allocateContainer(containerName);
- XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
- ContainerProtocolCalls.createContainer(client, traceID);
- // This step is needed since we set private data on pipelines, when we
- // read the list from CBlockServer. So we mimic that action here.
- pipeline.setData(Longs.toByteArray(x));
- containerPipelines.add(pipeline);
- }
- return containerPipelines;
- }
-
- @Test
- public void testStorageImplBasicReadWrite() throws Exception {
- OzoneConfiguration oConfig = new OzoneConfiguration();
- String userName = "user" + RandomStringUtils.randomNumeric(5);
- String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
- long volumeSize = 50L * (1024L * 1024L * 1024L);
- int blockSize = 4096;
- byte[] data =
- RandomStringUtils.randomAlphanumeric(10 * (1024 * 1024))
- .getBytes(StandardCharsets.UTF_8);
- String hash = DigestUtils.sha256Hex(data);
- CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
- ContainerCacheFlusher flusher = new ContainerCacheFlusher(oConfig,
- xceiverClientManager, metrics);
- CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder()
- .setUserName(userName)
- .setVolumeName(volumeName)
- .setVolumeSize(volumeSize)
- .setBlockSize(blockSize)
- .setContainerList(getContainerPipeline(10))
- .setClientManager(xceiverClientManager)
- .setConf(oConfig)
- .setFlusher(flusher)
- .setCBlockTargetMetrics(metrics)
- .build();
- ozoneStore.write(data, 0);
-
- // Currently, local cache is a placeholder and does not actually handle
- // read and write. So the below write is guaranteed to fail. After
- // CBlockLocalCache is properly implemented, we should uncomment the
- // following lines
- // TODO uncomment the following.
-
- //byte[] newData = new byte[10 * 1024 * 1024];
- //ozoneStore.read(newData, 0);
- //String newHash = DigestUtils.sha256Hex(newData);
- //Assert.assertEquals("hashes don't match.", hash, newHash);
- GenericTestUtils.waitFor(() -> !ozoneStore.getCache().isDirtyCache(),
- 100, 20 *
- 1000);
-
- ozoneStore.close();
- }
-}