Revert "Block Storage: volume creation times out while creating 3TB volume because of too many containers. Contributed by Mukul Kumar Singh." to fix commit message.

This reverts commit 087c69ba24.
This commit is contained in:
Chen Liang 2017-09-26 08:27:42 -07:00 committed by Owen O'Malley
parent d303b7f40f
commit dddded0016
9 changed files with 82 additions and 212 deletions

View File

@ -172,21 +172,6 @@ public final class CBlockConfigKeys {
public static final int DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT =
64 * 1024;
/**
* Cblock CLI configs.
*/
public static final String DFS_CBLOCK_MANAGER_POOL_SIZE =
"dfs.cblock.manager.pool.size";
public static final int DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT = 16;
/**
* Currently the largest supported volume is about 8TB, which might take
* > 20 seconds to finish creating containers, so the default RPC timeout
* is set generously to 300 seconds.
*/
public static final String DFS_CBLOCK_RPC_TIMEOUT_SECONDS =
"dfs.cblock.rpc.timeout.seconds";
public static final int DFS_CBLOCK_RPC_TIMEOUT_SECONDS_DEFAULT = 300;
private CBlockConfigKeys() {
}

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.cblock.client;
import org.apache.hadoop.cblock.CBlockConfigKeys;
import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
import org.apache.hadoop.io.retry.RetryPolicies;
@ -37,25 +36,32 @@ import java.util.concurrent.TimeUnit;
*/
public class CBlockVolumeClient {
private final CBlockServiceProtocolClientSideTranslatorPB cblockClient;
private final OzoneConfiguration conf;
public CBlockVolumeClient(OzoneConfiguration conf) throws IOException {
this(conf, null);
this.conf = conf;
long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
InetSocketAddress address = OzoneClientUtils.getCblockServiceRpcAddr(conf);
// currently the largest supported volume is about 8TB, which might take
// > 20 seconds to finish creating containers. thus set timeout to 30 sec.
cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(
RPC.getProtocolProxy(CBlockServiceProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), 30000, RetryPolicies
.retryUpToMaximumCountWithFixedSleep(300, 1, TimeUnit
.SECONDS)).getProxy());
}
public CBlockVolumeClient(OzoneConfiguration conf,
InetSocketAddress serverAddress) throws IOException {
InetSocketAddress address = serverAddress != null ? serverAddress :
OzoneClientUtils.getCblockServiceRpcAddr(conf);
this.conf = conf;
long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
int rpcTimeout =
conf.getInt(CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_SECONDS,
CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_SECONDS_DEFAULT) * 1000;
cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(
RPC.getProtocolProxy(CBlockServiceProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), rpcTimeout, RetryPolicies
.retryUpToMaximumCountWithFixedSleep(
300, 1, TimeUnit.SECONDS)).getProxy());
serverAddress, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), 30000, RetryPolicies
.retryUpToMaximumCountWithFixedSleep(300, 1, TimeUnit
.SECONDS)).getProxy());
}
public void createVolume(String userName, String volumeName,

View File

@ -77,7 +77,6 @@ public class BlockWriterTask implements Runnable {
String containerName = null;
XceiverClientSpi client = null;
LevelDBStore levelDBStore = null;
String traceID = flusher.getTraceID(new File(dbPath), block.getBlockID());
flusher.getLOG().debug(
"Writing block to remote. block ID: {}", block.getBlockID());
try {
@ -95,7 +94,8 @@ public class BlockWriterTask implements Runnable {
Preconditions.checkState(data.length > 0, "Block data is zero length");
startTime = Time.monotonicNow();
ContainerProtocolCalls.writeSmallFile(client, containerName,
Long.toString(block.getBlockID()), data, traceID);
Long.toString(block.getBlockID()), data,
flusher.getTraceID(new File(dbPath), block.getBlockID()));
endTime = Time.monotonicNow();
flusher.getTargetMetrics().updateContainerWriteLatency(
endTime - startTime);
@ -107,7 +107,7 @@ public class BlockWriterTask implements Runnable {
} catch (Exception ex) {
flusher.getLOG().error("Writing of block:{} failed, We have attempted " +
"to write this block {} times to the container {}.Trace ID:{}",
block.getBlockID(), this.getTryCount(), containerName, traceID, ex);
block.getBlockID(), this.getTryCount(), containerName, "", ex);
writeRetryBlock(block);
if (ex instanceof IOException) {
flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks();

View File

@ -151,7 +151,6 @@ public class AsyncBlockWriter {
*/
public void writeBlock(LogicalBlock block) throws IOException {
byte[] keybuf = Longs.toByteArray(block.getBlockID());
String traceID = parentCache.getTraceID(block.getBlockID());
if (parentCache.isShortCircuitIOEnabled()) {
long startTime = Time.monotonicNow();
getCacheDB().put(keybuf, block.getData().array());
@ -177,7 +176,7 @@ public class AsyncBlockWriter {
.acquireClient(parentCache.getPipeline(block.getBlockID()));
ContainerProtocolCalls.writeSmallFile(client, containerName,
Long.toString(block.getBlockID()), block.getData().array(),
traceID);
parentCache.getTraceID(block.getBlockID()));
long endTime = Time.monotonicNow();
if (parentCache.isTraceEnabled()) {
String datahash = DigestUtils.sha256Hex(block.getData().array());
@ -190,9 +189,8 @@ public class AsyncBlockWriter {
parentCache.getTargetMetrics().incNumDirectBlockWrites();
} catch (Exception ex) {
parentCache.getTargetMetrics().incNumFailedDirectBlockWrites();
LOG.error("Direct I/O writing of block:{} traceID:{} to "
+ "container {} failed", block.getBlockID(), traceID,
containerName, ex);
LOG.error("Direct I/O writing of block:{} to container {} failed",
block.getBlockID(), containerName, ex);
throw ex;
} finally {
if (client != null) {

View File

@ -29,7 +29,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* The internal representation maintained by CBlock server as the info for
@ -54,7 +53,7 @@ public class VolumeDescriptor {
private static final Logger LOG =
LoggerFactory.getLogger(VolumeDescriptor.class);
private ConcurrentHashMap<String, ContainerDescriptor> containerMap;
private HashMap<String, ContainerDescriptor> containerMap;
private String userName;
private int blockSize;
private long volumeSize;
@ -73,12 +72,13 @@ public class VolumeDescriptor {
* and set*() methods are for the same purpose also.
*/
public VolumeDescriptor() {
this(null, null, 0, 0);
containerMap = new HashMap<>();
containerIdOrdered = new ArrayList<>();
}
public VolumeDescriptor(String userName, String volumeName, long volumeSize,
int blockSize) {
this.containerMap = new ConcurrentHashMap<>();
this.containerMap = new HashMap<>();
this.userName = userName;
this.volumeName = volumeName;
this.blockSize = blockSize;

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.cblock.storage;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.cblock.CBlockConfigKeys;
import org.apache.hadoop.cblock.exception.CBlockException;
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
@ -38,27 +37,25 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* This class maintains the key space of CBlock, more specifically, the
* volume to container mapping. The core data structure
* is a map from users to their volumes info, where volume info is a handler
* to a volume, containing information for IO on that volume and a storage
* client responsible for talking to the SCM.
* to a volume, containing information for IO on that volume.
*
* and a storage client responsible for talking to the SCM
*
* TODO : all the volume operations are fully serialized, which can potentially
* be optimized.
*
* TODO : if certain operations (e.g. create) fail, the failure-handling
* logic may not be properly implemented currently.
*/
public class StorageManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(StorageManager.class);
private final ScmClient storageClient;
private final int numThreads;
private static final int MAX_THREADS =
Runtime.getRuntime().availableProcessors() * 2;
private static final int MAX_QUEUE_CAPACITY = 1024;
/**
* We will NOT have the situation where same kv pair getting
* processed, but it is possible to have multiple kv pair being
@ -81,9 +78,6 @@ public class StorageManager {
this.storageClient = storageClient;
this.user2VolumeMap = new ConcurrentHashMap<>();
this.containerSizeB = storageClient.getContainerSize(null);
this.numThreads =
ozoneConfig.getInt(CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE,
CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT);
}
/**
@ -155,127 +149,6 @@ public class StorageManager {
makeVolumeReady(userName, volumeName, volumeDescriptor);
}
private class CreateContainerTask implements Runnable {
private final VolumeDescriptor volume;
private final int containerIdx;
private final ArrayList<String> containerIds;
private final AtomicInteger numFailed;
CreateContainerTask(VolumeDescriptor volume, int containerIdx,
ArrayList<String> containerIds,
AtomicInteger numFailed) {
this.volume = volume;
this.containerIdx = containerIdx;
this.containerIds = containerIds;
this.numFailed = numFailed;
}
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
public void run() {
ContainerDescriptor container = null;
try {
Pipeline pipeline = storageClient.createContainer(
OzoneProtos.ReplicationType.STAND_ALONE,
OzoneProtos.ReplicationFactor.ONE,
KeyUtil.getContainerName(volume.getUserName(),
volume.getVolumeName(), containerIdx));
container = new ContainerDescriptor(pipeline.getContainerName());
container.setPipeline(pipeline);
container.setContainerIndex(containerIdx);
volume.addContainer(container);
containerIds.set(containerIdx, container.getContainerID());
} catch (Exception e) {
numFailed.incrementAndGet();
if (container != null) {
LOGGER.error("Error creating container Container:{}:" +
" index:{} error:{}", container.getContainerID(),
containerIdx, e);
}
}
}
}
/**
 * Creates all containers needed to back the given volume, running one
 * {@link CreateContainerTask} per container on a bounded thread pool, and
 * stores the resulting container ID list on the volume.
 *
 * @param volume descriptor of the volume whose containers are created.
 * @return true only if every container was created and the wait for the
 *         tasks was not interrupted; false otherwise.
 */
private boolean createVolumeContainers(VolumeDescriptor volume) {
  ArrayList<String> containerIds = new ArrayList<>();
  // Reserve one slot per container BEFORE any task is submitted. The tasks
  // write their slot with set(); growing the list while they run would be a
  // concurrent structural modification of an unsynchronized ArrayList.
  long allocatedSize = 0;
  int numContainers = 0;
  while (allocatedSize < volume.getVolumeSize()) {
    containerIds.add(null);
    allocatedSize += containerSizeB;
    numContainers += 1;
  }

  ThreadPoolExecutor executor = new ThreadPoolExecutor(numThreads,
      MAX_THREADS, 1, TimeUnit.SECONDS,
      new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY),
      new ThreadPoolExecutor.CallerRunsPolicy());
  AtomicInteger numFailedCreates = new AtomicInteger(0);
  for (int containerIdx = 0; containerIdx < numContainers; containerIdx++) {
    Runnable task = new CreateContainerTask(volume, containerIdx,
        containerIds, numFailedCreates);
    executor.submit(task);
  }

  // issue the command and then wait for it to finish
  executor.shutdown();
  boolean interrupted = false;
  try {
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
  } catch (InterruptedException e) {
    LOGGER.error("Error creating volume:{} error:{}",
        volume.getVolumeName(), e);
    executor.shutdownNow();
    Thread.currentThread().interrupt();
    // Pending tasks were dropped, so the volume is incomplete even if no
    // task has reported a failure.
    interrupted = true;
  }

  volume.setContainerIDs(containerIds);
  return !interrupted && numFailedCreates.get() == 0;
}
/**
 * Best-effort deletion of a single container: looks up its pipeline through
 * the storage client and asks the client to delete it. Any exception is
 * logged and not rethrown.
 *
 * @param containerID ID of the container to delete.
 * @param force passed through to the storage client's delete call.
 */
private void deleteContainer(String containerID, boolean force) {
  try {
    storageClient.deleteContainer(
        storageClient.getContainer(containerID), force);
  } catch (Exception failure) {
    LOGGER.error("Error deleting container Container:{} error:{}",
        containerID, failure);
  }
}
/**
 * Deletes the given containers on a bounded thread pool and waits for all
 * deletions to finish. Null entries in the list are skipped.
 *
 * @param containers IDs of the containers to delete; may contain nulls.
 * @param force passed through to each individual container deletion.
 * @throws CBlockException declared for callers; not thrown by this body.
 */
private void deleteVolumeContainers(List<String> containers, boolean force)
    throws CBlockException {
  ThreadPoolExecutor pool = new ThreadPoolExecutor(numThreads,
      MAX_THREADS, 1, TimeUnit.SECONDS,
      new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY),
      new ThreadPoolExecutor.CallerRunsPolicy());

  for (String containerId : containers) {
    if (containerId == null) {
      continue;
    }
    pool.submit(() -> deleteContainer(containerId, force));
  }

  // no more work is accepted; block until the queued deletions complete
  pool.shutdown();
  try {
    pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
  } catch (InterruptedException interruption) {
    LOGGER.error("Error deleting containers error:{}", interruption);
    pool.shutdownNow();
    Thread.currentThread().interrupt();
  }
}
/**
* Called by CBlock server when creating a fresh volume. The core
@ -299,13 +172,31 @@ public class StorageManager {
throw new CBlockException("Volume size smaller than block size? " +
"volume size:" + volumeSize + " block size:" + blockSize);
}
VolumeDescriptor volume
= new VolumeDescriptor(userName, volumeName, volumeSize, blockSize);
boolean success = createVolumeContainers(volume);
if (!success) {
// cleanup the containers and throw the exception
deleteVolumeContainers(volume.getContainerIDsList(), true);
throw new CBlockException("Error when creating volume:" + volumeName);
VolumeDescriptor volume;
int containerIdx = 0;
try {
volume = new VolumeDescriptor(userName, volumeName,
volumeSize, blockSize);
long allocatedSize = 0;
ArrayList<String> containerIds = new ArrayList<>();
while (allocatedSize < volumeSize) {
Pipeline pipeline = storageClient.createContainer(OzoneProtos
.ReplicationType.STAND_ALONE,
OzoneProtos.ReplicationFactor.ONE,
KeyUtil.getContainerName(userName, volumeName, containerIdx));
ContainerDescriptor container =
new ContainerDescriptor(pipeline.getContainerName());
container.setPipeline(pipeline);
container.setContainerIndex(containerIdx);
volume.addContainer(container);
containerIds.add(container.getContainerID());
allocatedSize += containerSizeB;
containerIdx += 1;
}
volume.setContainerIDs(containerIds);
} catch (IOException e) {
throw new CBlockException("Error when creating volume:" + e.getMessage());
// TODO : delete already created containers? or re-try policy
}
makeVolumeReady(userName, volumeName, volume);
}
@ -332,7 +223,16 @@ public class StorageManager {
throw new CBlockException("Deleting a non-empty volume without force!");
}
VolumeDescriptor volume = user2VolumeMap.get(userName).remove(volumeName);
deleteVolumeContainers(volume.getContainerIDsList(), force);
for (String containerID : volume.getContainerIDsList()) {
try {
Pipeline pipeline = storageClient.getContainer(containerID);
storageClient.deleteContainer(pipeline, force);
} catch (IOException e) {
LOGGER.error("Error deleting container Container:{} error:{}",
containerID, e);
throw new CBlockException(e.getMessage());
}
}
if (user2VolumeMap.get(userName).size() == 0) {
user2VolumeMap.remove(userName);
}

View File

@ -897,22 +897,6 @@
</description>
</property>
<property>
<name>dfs.cblock.manager.pool.size</name>
<value>16</value>
<description>
Number of threads that cblock manager will use for container operations.
</description>
</property>
<property>
<name>dfs.cblock.rpc.timeout.seconds</name>
<value>300</value>
<description>
RPC timeout in seconds used for cblock CLI operations.
</description>
</property>
<property>
<name>dfs.cblock.scm.ipaddress</name>
<value>127.0.0.1</value>

View File

@ -22,7 +22,7 @@ import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.HashMap;
/**
* NOTE : This class is only for testing purpose.
@ -34,8 +34,8 @@ import java.util.concurrent.ConcurrentHashMap;
* This is to allow volume creation call and perform standalone tests.
*/
public final class ContainerLookUpService {
private static ConcurrentHashMap<String, ContainerDescriptor>
containers = new ConcurrentHashMap<>();
private static HashMap<String, ContainerDescriptor>
containers = new HashMap<>();
/**
* Return an *existing* container with given Id.

View File

@ -27,7 +27,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* This class is the one that directly talks to SCM server.
@ -38,8 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*
*/
public class MockStorageClient implements ScmClient {
private static AtomicInteger currentContainerId =
new AtomicInteger(0);
private static long currentContainerId = -1;
/**
* Ask SCM to get a exclusive container.
@ -50,9 +48,9 @@ public class MockStorageClient implements ScmClient {
@Override
public Pipeline createContainer(String containerId)
throws IOException {
int contId = currentContainerId.getAndIncrement();
ContainerLookUpService.addContainer(Long.toString(contId));
return ContainerLookUpService.lookUp(Long.toString(contId))
currentContainerId += 1;
ContainerLookUpService.addContainer(Long.toString(currentContainerId));
return ContainerLookUpService.lookUp(Long.toString(currentContainerId))
.getPipeline();
}
@ -128,11 +126,10 @@ public class MockStorageClient implements ScmClient {
public Pipeline createContainer(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor replicationFactor, String containerId)
throws IOException {
int contId = currentContainerId.getAndIncrement();
ContainerLookUpService.addContainer(Long.toString(contId));
return ContainerLookUpService.lookUp(Long.toString(contId))
.getPipeline();
}
currentContainerId += 1;
ContainerLookUpService.addContainer(Long.toString(currentContainerId));
return ContainerLookUpService.lookUp(Long.toString(currentContainerId))
.getPipeline(); }
/**
* Returns a set of Nodes that meet a query criteria.