HDFS-12966. Ozone: owner name should be set properly when the container allocation happens. Contributed by Shashikant Banerjee.

This commit is contained in:
Xiaoyu Yao 2018-01-10 11:50:07 -08:00 committed by Owen O'Malley
parent 5e31b920f0
commit 98973cd0c4
39 changed files with 206 additions and 180 deletions

View File

@ -82,14 +82,14 @@ public static void setContainerSizeB(long size) {
* @inheritDoc
*/
@Override
public Pipeline createContainer(String containerId)
public Pipeline createContainer(String containerId, String owner)
throws IOException {
XceiverClientSpi client = null;
try {
Pipeline pipeline =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerId);
xceiverClientManager.getFactor(), containerId, owner);
client = xceiverClientManager.acquireClient(pipeline);
// Allocated State means that SCM has allocated this pipeline in its
@ -187,13 +187,13 @@ private void createPipeline(XceiverClientSpi client, Pipeline pipeline)
@Override
public Pipeline createContainer(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor factor,
String containerId) throws IOException {
String containerId, String owner) throws IOException {
XceiverClientSpi client = null;
try {
// allocate container on SCM.
Pipeline pipeline =
storageContainerLocationClient.allocateContainer(type, factor,
containerId);
containerId, owner);
client = xceiverClientManager.acquireClient(pipeline);
// Allocated State means that SCM has allocated this pipeline in its

View File

@ -45,7 +45,7 @@ public interface ScmClient {
* @return Pipeline
* @throws IOException
*/
Pipeline createContainer(String containerId) throws IOException;
Pipeline createContainer(String containerId, String owner) throws IOException;
/**
* Gets a container by Name -- Throws if the container does not exist.
@ -112,8 +112,8 @@ List<ContainerInfo> listContainer(String startName, String prefixName,
* @throws IOException - in case of error.
*/
Pipeline createContainer(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor replicationFactor, String containerId)
throws IOException;
OzoneProtos.ReplicationFactor replicationFactor, String containerId,
String owner) throws IOException;
/**
* Returns a set of Nodes that meet a query criteria.

View File

@ -38,7 +38,7 @@ public class ContainerInfo
private long lastUsed;
// The wall-clock ms since the epoch at which the current state enters.
private long stateEnterTime;
private OzoneProtos.Owner owner;
private String owner;
private String containerName;
ContainerInfo(
@ -49,7 +49,7 @@ public class ContainerInfo
long usedBytes,
long numberOfKeys,
long stateEnterTime,
OzoneProtos.Owner owner) {
String owner) {
this.containerName = containerName;
this.pipeline = pipeline;
this.allocatedBytes = allocatedBytes;
@ -143,11 +143,11 @@ public OzoneProtos.SCMContainerInfo getProtobuf() {
return builder.build();
}
public OzoneProtos.Owner getOwner() {
public String getOwner() {
return owner;
}
public void setOwner(OzoneProtos.Owner owner) {
public void setOwner(String owner) {
this.owner = owner;
}
@ -241,7 +241,7 @@ public static class Builder {
private long used;
private long keys;
private long stateEnterTime;
private OzoneProtos.Owner owner;
private String owner;
private String containerName;
public Builder setState(OzoneProtos.LifeCycleState lifeCycleState) {
@ -274,7 +274,7 @@ public Builder setStateEnterTime(long time) {
return this;
}
public Builder setOwner(OzoneProtos.Owner containerOwner) {
public Builder setOwner(String containerOwner) {
this.owner = containerOwner;
return this;
}

View File

@ -53,7 +53,7 @@ public interface ScmBlockLocationProtocol {
* @throws IOException
*/
AllocatedBlock allocateBlock(long size, ReplicationType type,
ReplicationFactor factor) throws IOException;
ReplicationFactor factor, String owner) throws IOException;
/**
* Delete blocks for a set of object keys.

View File

@ -38,7 +38,7 @@ public interface StorageContainerLocationProtocol {
*
*/
Pipeline allocateContainer(OzoneProtos.ReplicationType replicationType,
OzoneProtos.ReplicationFactor factor, String containerName)
OzoneProtos.ReplicationFactor factor, String containerName, String owner)
throws IOException;
/**

View File

@ -116,16 +116,14 @@ public Set<AllocatedBlock> getBlockLocations(Set<String> keys)
* @throws IOException
*/
@Override
public AllocatedBlock allocateBlock(long size, OzoneProtos.ReplicationType
type, OzoneProtos.ReplicationFactor factor) throws IOException {
Preconditions.checkArgument(size > 0,
"block size must be greater than 0");
public AllocatedBlock allocateBlock(long size,
OzoneProtos.ReplicationType type, OzoneProtos.ReplicationFactor factor,
String owner) throws IOException {
Preconditions.checkArgument(size > 0, "block size must be greater than 0");
AllocateScmBlockRequestProto request = AllocateScmBlockRequestProto
.newBuilder()
.setSize(size).setType(type)
.setFactor(factor)
.build();
AllocateScmBlockRequestProto request =
AllocateScmBlockRequestProto.newBuilder().setSize(size).setType(type)
.setFactor(factor).setOwner(owner).build();
final AllocateScmBlockResponseProto response;
try {
response = rpcProxy.allocateScmBlock(NULL_RPC_CONTROLLER, request);

View File

@ -88,7 +88,7 @@ public StorageContainerLocationProtocolClientSideTranslatorPB(
@Override
public Pipeline allocateContainer(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor factor, String
containerName) throws IOException {
containerName, String owner) throws IOException {
Preconditions.checkNotNull(containerName, "Container Name cannot be Null");
Preconditions.checkState(!containerName.isEmpty(), "Container name cannot" +
@ -97,6 +97,7 @@ public Pipeline allocateContainer(OzoneProtos.ReplicationType type,
.setContainerName(containerName)
.setReplicationFactor(factor)
.setReplicationType(type)
.setOwner(owner)
.build();
final ContainerResponseProto response;

View File

@ -132,7 +132,7 @@ message SCMContainerInfo {
required uint64 usedBytes = 5;
required uint64 numberOfKeys = 6;
optional int64 stateEnterTime = 7;
optional Owner owner = 8 [default = OZONE];
required string owner = 8;
}
message GetScmInfoRequestProto {
@ -154,10 +154,3 @@ enum ReplicationFactor {
ONE = 1;
THREE = 3;
}
enum Owner {
OZONE = 1;
CBLOCK = 2;
// In future --
//HDFS = 3;
}

View File

@ -63,6 +63,7 @@ message AllocateScmBlockRequestProto {
required uint64 size = 1;
required hadoop.hdfs.ozone.ReplicationType type = 2;
required hadoop.hdfs.ozone.ReplicationFactor factor = 3;
required string owner = 4;
}

View File

@ -39,7 +39,7 @@ message ContainerRequestProto {
// Ozone only support replciation of either 1 or 3.
required hadoop.hdfs.ozone.ReplicationFactor replicationFactor = 2;
required hadoop.hdfs.ozone.ReplicationType replicationType = 3;
optional hadoop.hdfs.ozone.Owner owner = 4 [default = OZONE];
required string owner = 4;
}

View File

@ -62,6 +62,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT;
@ -122,7 +123,11 @@ public class CBlockManager implements CBlockServiceProtocol,
public CBlockManager(OzoneConfiguration conf,
ScmClient storageClient) throws IOException {
storageManager = new StorageManager(storageClient, conf);
// Fix the cBlockManagerId generattion code here. Should support
// cBlockManager --init command which will generate a cBlockManagerId and
// persist it locally.
storageManager =
new StorageManager(storageClient, conf, "CBLOCK");
dbPath = conf.getTrimmed(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY,
DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT);

View File

@ -58,6 +58,7 @@ public class StorageManager {
private static final int MAX_THREADS =
Runtime.getRuntime().availableProcessors() * 2;
private static final int MAX_QUEUE_CAPACITY = 1024;
private final String cblockId;
/**
* We will NOT have the situation where same kv pair getting
@ -77,13 +78,14 @@ public class StorageManager {
private long containerSizeB;
public StorageManager(ScmClient storageClient,
OzoneConfiguration ozoneConfig) throws IOException {
OzoneConfiguration ozoneConfig, String cblockId) throws IOException {
this.storageClient = storageClient;
this.user2VolumeMap = new ConcurrentHashMap<>();
this.containerSizeB = storageClient.getContainerSize(null);
this.numThreads =
ozoneConfig.getInt(CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE,
CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT);
this.cblockId = cblockId;
}
/**
@ -188,7 +190,7 @@ public void run() {
OzoneProtos.ReplicationType.STAND_ALONE,
OzoneProtos.ReplicationFactor.ONE,
KeyUtil.getContainerName(volume.getUserName(),
volume.getVolumeName(), containerIdx));
volume.getVolumeName(), containerIdx), cblockId);
container = new ContainerDescriptor(pipeline.getContainerName());

View File

@ -95,9 +95,11 @@ public class KeyManagerImpl implements KeyManager {
private final long preallocateMax;
private final Random random;
private final String ksmId;
public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
KSMMetadataManager metadataManager, OzoneConfiguration conf) {
KSMMetadataManager metadataManager, OzoneConfiguration conf,
String ksmId) {
this.scmBlockClient = scmBlockClient;
this.metadataManager = metadataManager;
this.scmBlockSize = conf.getLong(OZONE_SCM_BLOCK_SIZE_IN_MB,
@ -123,6 +125,7 @@ public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
openKeyCleanupService = new OpenKeyCleanupService(
scmBlockClient, this, openkeyCheckInterval, serviceTimeout);
random = new Random();
this.ksmId = ksmId;
}
@Override
@ -190,7 +193,7 @@ public KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
KSMException.ResultCodes.FAILED_KEY_NOT_FOUND);
}
AllocatedBlock allocatedBlock =
scmBlockClient.allocateBlock(scmBlockSize, type, factor);
scmBlockClient.allocateBlock(scmBlockSize, type, factor, ksmId);
KsmKeyInfo keyInfo =
KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(keyData));
KsmKeyLocationInfo info = new KsmKeyLocationInfo.Builder()
@ -245,7 +248,7 @@ public OpenKeySession openKey(KsmKeyArgs args) throws IOException {
while (requestedSize > 0) {
long allocateSize = Math.min(scmBlockSize, requestedSize);
AllocatedBlock allocatedBlock =
scmBlockClient.allocateBlock(allocateSize, type, factor);
scmBlockClient.allocateBlock(allocateSize, type, factor, ksmId);
KsmKeyLocationInfo subKeyInfo = new KsmKeyLocationInfo.Builder()
.setContainerName(allocatedBlock.getPipeline().getContainerName())
.setBlockID(allocatedBlock.getKey())

View File

@ -182,8 +182,9 @@ private KeySpaceManager(OzoneConfiguration conf) throws IOException {
volumeManager = new VolumeManagerImpl(metadataManager, configuration);
bucketManager = new BucketManagerImpl(metadataManager);
metrics = KSMMetrics.create();
keyManager = new KeyManagerImpl(scmBlockClient, metadataManager,
configuration);
keyManager =
new KeyManagerImpl(scmBlockClient, metadataManager, configuration,
ksmStorage.getKsmId());
httpServer = new KeySpaceManagerHttpServer(configuration, this);
}
@ -704,7 +705,7 @@ public void commitKey(KsmKeyArgs args, int clientID)
}
@Override
public KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
public KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
throws IOException {
try {
metrics.incNumBlockAllocateCalls();

View File

@ -107,7 +107,7 @@ public AllocateScmBlockResponseProto allocateScmBlock(
try {
AllocatedBlock allocatedBlock =
impl.allocateBlock(request.getSize(), request.getType(),
request.getFactor());
request.getFactor(), request.getOwner());
if (allocatedBlock != null) {
return
AllocateScmBlockResponseProto.newBuilder()

View File

@ -76,7 +76,8 @@ public ContainerResponseProto allocateContainer(RpcController unused,
ContainerRequestProto request) throws ServiceException {
try {
Pipeline pipeline = impl.allocateContainer(request.getReplicationType(),
request.getReplicationFactor(), request.getContainerName());
request.getReplicationFactor(), request.getContainerName(),
request.getOwner());
return ContainerResponseProto.newBuilder()
.setPipeline(pipeline.getProtobufMessage())
.setErrorCode(ContainerResponseProto.Error.success)

View File

@ -788,16 +788,13 @@ private Set<DatanodeID> queryNodeState(NodeState nodeState) {
*/
@Override
public Pipeline allocateContainer(OzoneProtos.ReplicationType replicationType,
OzoneProtos.ReplicationFactor replicationFactor, String containerName)
throws IOException {
OzoneProtos.ReplicationFactor replicationFactor, String containerName,
String owner) throws IOException {
//TODO : FIX ME : Pass the owner argument to this function.
// This causes a lot of test change and cblock change to filing
// another JIRA to fix it.
final OzoneProtos.Owner owner = OzoneProtos.Owner.OZONE;
checkAdminAccess();
return scmContainerManager.allocateContainer(replicationType,
replicationFactor, containerName, owner).getPipeline();
return scmContainerManager
.allocateContainer(replicationType, replicationFactor, containerName,
owner).getPipeline();
}
/**
@ -1115,9 +1112,10 @@ public Set<AllocatedBlock> getBlockLocations(final Set<String> keys)
* @throws IOException
*/
@Override
public AllocatedBlock allocateBlock(long size, OzoneProtos.ReplicationType
type, OzoneProtos.ReplicationFactor factor) throws IOException {
return scmBlockManager.allocateBlock(size, type, factor);
public AllocatedBlock allocateBlock(long size,
OzoneProtos.ReplicationType type, OzoneProtos.ReplicationFactor factor,
String owner) throws IOException {
return scmBlockManager.allocateBlock(size, type, factor, owner);
}
/**

View File

@ -40,7 +40,7 @@ public interface BlockManager extends Closeable {
* @throws IOException
*/
AllocatedBlock allocateBlock(long size, OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor factor) throws IOException;
OzoneProtos.ReplicationFactor factor, String owner) throws IOException;
/**
* Give the key to the block, get the pipeline info.

View File

@ -22,7 +22,6 @@
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Owner;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.scm.container.Mapping;
@ -76,7 +75,6 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
// TODO : FIX ME : Hard coding the owner.
// Currently only user of the block service is Ozone, CBlock manages blocks
// by itself and does not rely on the Block service offered by SCM.
private final Owner owner = Owner.OZONE;
private final NodeManager nodeManager;
private final Mapping containerManager;
@ -178,7 +176,7 @@ public void stop() throws IOException {
* @throws IOException
*/
private void preAllocateContainers(int count, ReplicationType type,
ReplicationFactor factor)
ReplicationFactor factor, String owner)
throws IOException {
lock.lock();
try {
@ -214,8 +212,8 @@ private void preAllocateContainers(int count, ReplicationType type,
* @throws IOException on failure.
*/
@Override
public AllocatedBlock allocateBlock(
final long size, ReplicationType type, ReplicationFactor factor)
public AllocatedBlock allocateBlock(final long size,
ReplicationType type, ReplicationFactor factor, String owner)
throws IOException {
LOG.trace("Size;{} , type : {}, factor : {} ", size, type, factor);
@ -289,7 +287,7 @@ public AllocatedBlock allocateBlock(
// that most of our containers are full or we have not allocated
// containers of the type and replication factor. So let us go and
// allocate some.
preAllocateContainers(containerProvisionBatchSize, type, factor);
preAllocateContainers(containerProvisionBatchSize, type, factor, owner);
// Since we just allocated a set of containers this should work
containerInfo =

View File

@ -36,6 +36,7 @@ public class CreateContainerHandler extends OzoneCommandHandler {
public static final String CONTAINER_CREATE = "create";
public static final String OPT_CONTAINER_NAME = "c";
public static final String containerOwner = "OZONE";
// TODO Support an optional -p <pipelineID> option to create
// container on given datanodes.
@ -59,7 +60,7 @@ public void execute(CommandLine cmd) throws IOException {
String containerName = cmd.getOptionValue(OPT_CONTAINER_NAME);
logOut("Creating container : %s.", containerName);
getScmClient().createContainer(containerName);
getScmClient().createContainer(containerName, containerOwner);
logOut("Container created.");
}

View File

@ -26,7 +26,6 @@
import org.apache.hadoop.ozone.lease.LeaseException;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Owner;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
@ -198,7 +197,7 @@ public ContainerInfo allocateContainer(
ReplicationType type,
ReplicationFactor replicationFactor,
final String containerName,
Owner owner)
String owner)
throws IOException {
Preconditions.checkNotNull(containerName);
Preconditions.checkState(!containerName.isEmpty());

View File

@ -54,7 +54,6 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Owner;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
@ -167,7 +166,6 @@ public ContainerStateManager(Configuration configuration,
lock = new ReentrantReadWriteLock();
containers = new HashMap<>();
initializeContainerMaps();
loadExistingContainers(containerMapping);
containerCloseQueue = new ConcurrentLinkedQueue<>();
}
@ -185,17 +183,15 @@ public ContainerStateManager(Configuration configuration,
* of these {ALLOCATED, CREATING, OPEN, CLOSED, DELETING, DELETED} container
* states
*/
private void initializeContainerMaps() {
private void initializeContainerMaps(String owner) {
// Called only from Ctor path, hence no lock is held.
Preconditions.checkNotNull(containers);
for (OzoneProtos.Owner owner : OzoneProtos.Owner.values()) {
for (ReplicationType type : ReplicationType.values()) {
for (ReplicationFactor factor : ReplicationFactor.values()) {
for (LifeCycleState state : LifeCycleState.values()) {
ContainerKey key = new ContainerKey(owner, type, factor, state);
PriorityQueue<ContainerInfo> queue = new PriorityQueue<>();
containers.put(key, queue);
}
for (ReplicationType type : ReplicationType.values()) {
for (ReplicationFactor factor : ReplicationFactor.values()) {
for (LifeCycleState state : LifeCycleState.values()) {
ContainerKey key = new ContainerKey(owner, type, factor, state);
PriorityQueue<ContainerInfo> queue = new PriorityQueue<>();
containers.put(key, queue);
}
}
}
@ -208,12 +204,18 @@ private void initializeContainerMaps() {
*/
private void loadExistingContainers(Mapping containerMapping) {
try {
List<String> ownerList = new ArrayList<>();
List<ContainerInfo> containerList =
containerMapping.listContainer(null, null, Integer.MAX_VALUE);
for (ContainerInfo container : containerList) {
ContainerKey key = new ContainerKey(container.getOwner(),
container.getPipeline().getType(),
container.getPipeline().getFactor(), container.getState());
String owner = container.getOwner();
if (ownerList.isEmpty() || !ownerList.contains(owner)) {
ownerList.add(owner);
initializeContainerMaps(owner);
}
ContainerKey key =
new ContainerKey(owner, container.getPipeline().getType(),
container.getPipeline().getFactor(), container.getState());
containers.get(key).add(container);
}
} catch (IOException e) {
@ -317,7 +319,7 @@ private void initializeStateMachine() {
*/
public ContainerInfo allocateContainer(PipelineSelector selector, OzoneProtos
.ReplicationType type, OzoneProtos.ReplicationFactor replicationFactor,
final String containerName, OzoneProtos.Owner owner) throws
final String containerName, String owner) throws
IOException {
Pipeline pipeline = selector.getReplicationPipeline(type,
@ -340,7 +342,10 @@ public ContainerInfo allocateContainer(PipelineSelector selector, OzoneProtos
ContainerKey key = new ContainerKey(owner, type, replicationFactor,
containerInfo.getState());
PriorityQueue<ContainerInfo> queue = containers.get(key);
Preconditions.checkNotNull(queue);
if (queue == null) {
initializeContainerMaps(owner);
queue = containers.get(key);
}
queue.add(containerInfo);
LOG.trace("New container allocated: {}", containerInfo);
} finally {
@ -431,12 +436,16 @@ public ContainerInfo updateContainerState(ContainerInfo
* @return ContainerInfo
*/
public ContainerInfo getMatchingContainer(final long size,
Owner owner, ReplicationType type, ReplicationFactor factor,
String owner, ReplicationType type, ReplicationFactor factor,
LifeCycleState state) {
ContainerKey key = new ContainerKey(owner, type, factor, state);
lock.writeLock().lock();
try {
PriorityQueue<ContainerInfo> queue = containers.get(key);
if (queue == null) {
initializeContainerMaps(owner);
queue = containers.get(key);
}
if (queue.size() == 0) {
// We don't have any Containers of this type.
return null;
@ -466,13 +475,17 @@ public ContainerInfo getMatchingContainer(final long size,
}
@VisibleForTesting
public List<ContainerInfo> getMatchingContainers(Owner owner,
public List<ContainerInfo> getMatchingContainers(String owner,
ReplicationType type, ReplicationFactor factor, LifeCycleState state) {
ContainerKey key = new ContainerKey(owner, type, factor, state);
lock.readLock().lock();
try {
return Arrays.asList((ContainerInfo[]) containers.get(key)
.toArray(new ContainerInfo[0]));
if (containers.get(key) == null) {
return null;
} else {
return Arrays.asList((ContainerInfo[]) containers.get(key)
.toArray(new ContainerInfo[0]));
}
} catch (Exception e) {
LOG.error("Could not get matching containers", e);
} finally {
@ -492,7 +505,7 @@ public void close() throws IOException {
private static class ContainerKey {
private final LifeCycleState state;
private final ReplicationType type;
private final OzoneProtos.Owner owner;
private final String owner;
private final ReplicationFactor replicationFactor;
/**
@ -503,7 +516,7 @@ private static class ContainerKey {
* @param factor - Replication Factors
* @param state - LifeCycle State
*/
ContainerKey(Owner owner, ReplicationType type,
ContainerKey(String owner, ReplicationType type,
ReplicationFactor factor, LifeCycleState state) {
this.state = state;
this.type = type;

View File

@ -73,7 +73,7 @@ List<ContainerInfo> listContainer(String startName, String prefixName,
*/
ContainerInfo allocateContainer(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor replicationFactor,
String containerName, OzoneProtos.Owner owner) throws IOException;
String containerName, String owner) throws IOException;
/**
* Deletes a container from SCM.

View File

@ -105,7 +105,7 @@ private List<Pipeline> createContainerAndGetPipeline(int count)
Pipeline pipeline =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerName);
xceiverClientManager.getFactor(), containerName, "CBLOCK");
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
ContainerProtocolCalls.createContainer(client, traceID);
// This step is needed since we set private data on pipelines, when we

View File

@ -111,7 +111,7 @@ private List<Pipeline> getContainerPipeline(int count) throws IOException {
Pipeline pipeline =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerName);
xceiverClientManager.getFactor(), containerName, "CBLOCK");
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
ContainerProtocolCalls.createContainer(client, traceID);
// This step is needed since we set private data on pipelines, when we

View File

@ -116,7 +116,7 @@ private List<Pipeline> getContainerPipeline(int count) throws IOException {
Pipeline pipeline =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerName);
xceiverClientManager.getFactor(), containerName, "CBLOCK");
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
ContainerProtocolCalls.createContainer(client, traceID);
// This step is needed since we set private data on pipelines, when we

View File

@ -50,7 +50,7 @@ public class MockStorageClient implements ScmClient {
* @throws Exception
*/
@Override
public Pipeline createContainer(String containerId)
public Pipeline createContainer(String containerId, String owner)
throws IOException {
int contId = currentContainerId.getAndIncrement();
ContainerLookUpService.addContainer(Long.toString(contId));
@ -135,8 +135,8 @@ public long getContainerSize(Pipeline pipeline) throws IOException {
@Override
public Pipeline createContainer(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor replicationFactor, String containerId)
throws IOException {
OzoneProtos.ReplicationFactor replicationFactor, String containerId,
String owner) throws IOException {
int contId = currentContainerId.getAndIncrement();
ContainerLookUpService.addContainer(Long.toString(contId));
return ContainerLookUpService.lookUp(Long.toString(contId))

View File

@ -82,7 +82,7 @@ public static void cleanup() throws Exception {
public void testCreate() throws Exception {
Pipeline pipeline0 = storageClient.createContainer(OzoneProtos
.ReplicationType.STAND_ALONE, OzoneProtos.ReplicationFactor
.ONE, "container0");
.ONE, "container0", "OZONE");
assertEquals("container0", pipeline0.getContainerName());
}

View File

@ -132,7 +132,7 @@ private void testRpcPermissionWithConf(
try {
Pipeline pipeLine2 = mockScm.allocateContainer(
xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, "container2");
OzoneProtos.ReplicationFactor.ONE, "container2", "OZONE");
if (expectPermissionDenied) {
fail("Operation should fail, expecting an IOException here.");
} else {
@ -145,7 +145,7 @@ private void testRpcPermissionWithConf(
try {
Pipeline pipeLine3 = mockScm.allocateContainer(
xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, "container3");
OzoneProtos.ReplicationFactor.ONE, "container3", "OZONE");
if (expectPermissionDenied) {
fail("Operation should fail, expecting an IOException here.");

View File

@ -45,6 +45,7 @@ public class TestAllocateContainer {
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
private static XceiverClientManager xceiverClientManager;
private static String containerOwner = "OZONE";
@Rule
public ExpectedException thrown = ExpectedException.none();
@ -74,7 +75,7 @@ public void testAllocate() throws Exception {
Pipeline pipeline = storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
"container0");
"container0", containerOwner);
Assert.assertNotNull(pipeline);
Assert.assertNotNull(pipeline.getLeader());
@ -85,7 +86,7 @@ public void testAllocateNull() throws Exception {
thrown.expect(NullPointerException.class);
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), null);
xceiverClientManager.getFactor(), null, containerOwner);
}
@Test
@ -95,9 +96,9 @@ public void testAllocateDuplicate() throws Exception {
thrown.expectMessage("Specified container already exists");
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerName);
xceiverClientManager.getFactor(), containerName, containerOwner);
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerName);
xceiverClientManager.getFactor(), containerName, containerOwner);
}
}

View File

@ -2,7 +2,7 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* regarding copyright containerOwnership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
@ -99,6 +99,7 @@ public TestContainerSQLCli(String type) {
private final static long DEFAULT_BLOCK_SIZE = 4 * KB;
private static OzoneProtos.ReplicationFactor factor;
private static OzoneProtos.ReplicationType type;
private static final String containerOwner = "OZONE";
@Before
@ -145,7 +146,7 @@ public void setup() throws Exception {
}
assertEquals(2, nodeManager.getAllNodes().size());
AllocatedBlock ab1 = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE, type,
factor);
factor, containerOwner);
pipeline1 = ab1.getPipeline();
blockContainerMap.put(ab1.getKey(), pipeline1.getContainerName());
@ -157,7 +158,8 @@ public void setup() throws Exception {
// although each retry will create a block and assign to a container. So
// the size of blockContainerMap will vary each time the test is run.
while (true) {
ab2 = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE, type, factor);
ab2 = blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner);
pipeline2 = ab2.getPipeline();
blockContainerMap.put(ab2.getKey(), pipeline2.getContainerName());
if (!pipeline1.getContainerName().equals(pipeline2.getContainerName())) {

View File

@ -55,6 +55,7 @@ public class TestContainerSmallFile {
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
private static XceiverClientManager xceiverClientManager;
private static String containerOwner = "OZONE";
@BeforeClass
public static void init() throws Exception {
@ -86,7 +87,7 @@ public void testAllocateWrite() throws Exception {
Pipeline pipeline =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, containerName);
OzoneProtos.ReplicationFactor.ONE, containerName, containerOwner);
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
ContainerProtocolCalls.createContainer(client, traceID);
@ -107,7 +108,7 @@ public void testInvalidKeyRead() throws Exception {
Pipeline pipeline =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, containerName);
OzoneProtos.ReplicationFactor.ONE, containerName, containerOwner);
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
ContainerProtocolCalls.createContainer(client, traceID);
@ -129,7 +130,7 @@ public void testInvalidContainerRead() throws Exception {
Pipeline pipeline =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, containerName);
OzoneProtos.ReplicationFactor.ONE, containerName, containerOwner);
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
ContainerProtocolCalls.createContainer(client, traceID);
ContainerProtocolCalls.writeSmallFile(client, containerName,

View File

@ -71,6 +71,7 @@ public class TestSCMCli {
private static ByteArrayOutputStream errContent;
private static PrintStream errStream;
private static XceiverClientManager xceiverClientManager;
private static String containerOwner = "OZONE";
@Rule
public Timeout globalTimeout = new Timeout(30000);
@ -158,7 +159,7 @@ public void testDeleteContainer() throws Exception {
containerName = "non-empty-container";
pipeline = containerOperationClient
.createContainer(xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, containerName);
OzoneProtos.ReplicationFactor.ONE, containerName, containerOwner);
ContainerData cdata = ContainerData
.getFromProtBuf(containerOperationClient.readContainer(pipeline), conf);
@ -200,7 +201,7 @@ public void testDeleteContainer() throws Exception {
containerName = "empty-container";
pipeline = containerOperationClient
.createContainer(xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, containerName);
OzoneProtos.ReplicationFactor.ONE, containerName, containerOwner);
containerOperationClient.closeContainer(pipeline);
Assert.assertTrue(containerExist(containerName));
@ -213,7 +214,7 @@ public void testDeleteContainer() throws Exception {
// After the container is deleted,
// a same name container can now be recreated.
containerOperationClient.createContainer(xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, containerName);
OzoneProtos.ReplicationFactor.ONE, containerName, containerOwner);
Assert.assertTrue(containerExist(containerName));
// ****************************************
@ -262,7 +263,7 @@ public void testInfoContainer() throws Exception {
cname = "ContainerTestInfo1";
Pipeline pipeline = containerOperationClient
.createContainer(xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, cname);
OzoneProtos.ReplicationFactor.ONE, cname, containerOwner);
ContainerData data = ContainerData
.getFromProtBuf(containerOperationClient.readContainer(pipeline), conf);
@ -284,7 +285,7 @@ public void testInfoContainer() throws Exception {
cname = "ContainerTestInfo2";
pipeline = containerOperationClient
.createContainer(xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, cname);
OzoneProtos.ReplicationFactor.ONE, cname, containerOwner);
data = ContainerData
.getFromProtBuf(containerOperationClient.readContainer(pipeline), conf);
KeyUtils.getDB(data, conf).put(cname.getBytes(), "someKey".getBytes());
@ -343,7 +344,7 @@ public void testListContainerCommand() throws Exception {
for (int index = 0; index < 20; index++) {
String containerName = String.format("%s%02d", prefix, index);
containerOperationClient.createContainer(xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, containerName);
OzoneProtos.ReplicationFactor.ONE, containerName, containerOwner);
}
ByteArrayOutputStream out = new ByteArrayOutputStream();

View File

@ -51,6 +51,7 @@ public class TestXceiverClientManager {
private static MiniOzoneCluster cluster;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
private static String containerOwner = "OZONE";
@Rule
public ExpectedException exception = ExpectedException.none();
@ -76,18 +77,18 @@ public void testCaching() throws IOException {
XceiverClientManager clientManager = new XceiverClientManager(conf);
String containerName1 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline1 =
storageContainerLocationClient.allocateContainer(
clientManager.getType(), clientManager.getFactor(), containerName1);
Pipeline pipeline1 = storageContainerLocationClient
.allocateContainer(clientManager.getType(), clientManager.getFactor(),
containerName1, containerOwner);
XceiverClientSpi client1 = clientManager.acquireClient(pipeline1);
Assert.assertEquals(1, client1.getRefcount());
Assert.assertEquals(containerName1,
client1.getPipeline().getContainerName());
String containerName2 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline2 =
storageContainerLocationClient.allocateContainer(
clientManager.getType(), clientManager.getFactor(), containerName2);
Pipeline pipeline2 = storageContainerLocationClient
.allocateContainer(clientManager.getType(), clientManager.getFactor(),
containerName2, containerOwner);
XceiverClientSpi client2 = clientManager.acquireClient(pipeline2);
Assert.assertEquals(1, client2.getRefcount());
Assert.assertEquals(containerName2,
@ -116,7 +117,7 @@ public void testFreeByReference() throws IOException {
Pipeline pipeline1 =
storageContainerLocationClient.allocateContainer(
clientManager.getType(), OzoneProtos.ReplicationFactor.ONE,
containerName1);
containerName1, containerOwner);
XceiverClientSpi client1 = clientManager.acquireClient(pipeline1);
Assert.assertEquals(1, client1.getRefcount());
Assert.assertEquals(containerName1,
@ -126,7 +127,7 @@ public void testFreeByReference() throws IOException {
Pipeline pipeline2 =
storageContainerLocationClient.allocateContainer(
clientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, containerName2);
OzoneProtos.ReplicationFactor.ONE, containerName2, containerOwner);
XceiverClientSpi client2 = clientManager.acquireClient(pipeline2);
Assert.assertEquals(1, client2.getRefcount());
Assert.assertEquals(containerName2,
@ -161,7 +162,7 @@ public void testFreeByEviction() throws IOException {
Pipeline pipeline1 =
storageContainerLocationClient.allocateContainer(
clientManager.getType(),
clientManager.getFactor(), containerName1);
clientManager.getFactor(), containerName1, containerOwner);
XceiverClientSpi client1 = clientManager.acquireClient(pipeline1);
Assert.assertEquals(1, client1.getRefcount());
Assert.assertEquals(containerName1,
@ -171,9 +172,9 @@ public void testFreeByEviction() throws IOException {
Assert.assertEquals(0, client1.getRefcount());
String containerName2 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline2 =
storageContainerLocationClient.allocateContainer(
clientManager.getType(), clientManager.getFactor(), containerName2);
Pipeline pipeline2 = storageContainerLocationClient
.allocateContainer(clientManager.getType(), clientManager.getFactor(),
containerName2, containerOwner);
XceiverClientSpi client2 = clientManager.acquireClient(pipeline2);
Assert.assertEquals(1, client2.getRefcount());
Assert.assertEquals(containerName2,

View File

@ -60,6 +60,7 @@ public class TestXceiverClientMetrics {
private static MiniOzoneCluster cluster;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
private static String containerOwner = "OZONE";
@BeforeClass
public static void init() throws IOException {
@ -82,8 +83,9 @@ public void testMetrics() throws Exception {
XceiverClientManager clientManager = new XceiverClientManager(conf);
String containerName = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline = storageContainerLocationClient.allocateContainer(
clientManager.getType(), clientManager.getFactor(), containerName);
Pipeline pipeline = storageContainerLocationClient
.allocateContainer(clientManager.getType(), clientManager.getFactor(),
containerName, containerOwner);
XceiverClientSpi client = clientManager.acquireClient(pipeline);
ContainerCommandRequestProto request = ContainerTestHelper

View File

@ -57,6 +57,7 @@ public class TestBlockManager {
private final static long DEFAULT_BLOCK_SIZE = 128 * MB;
private static OzoneProtos.ReplicationFactor factor;
private static OzoneProtos.ReplicationType type;
private static String containerOwner = "OZONE";
@Rule
public ExpectedException thrown = ExpectedException.none();
@ -101,14 +102,14 @@ public void clearChillMode() {
@Test
public void testAllocateBlock() throws Exception {
AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
type, factor);
type, factor, containerOwner);
Assert.assertNotNull(block);
}
@Test
public void testGetAllocatedBlock() throws IOException {
AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
type, factor);
type, factor, containerOwner);
Assert.assertNotNull(block);
Pipeline pipeline = blockManager.getBlock(block.getKey());
Assert.assertEquals(pipeline.getLeader().getDatanodeUuid(),
@ -118,7 +119,7 @@ public void testGetAllocatedBlock() throws IOException {
@Test
public void testDeleteBlock() throws Exception {
AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
type, factor);
type, factor, containerOwner);
Assert.assertNotNull(block);
blockManager.deleteBlocks(Collections.singletonList(block.getKey()));
@ -139,7 +140,7 @@ public void testAllocateOversizedBlock() throws IOException {
long size = 6 * GB;
thrown.expectMessage("Unsupported block size");
AllocatedBlock block = blockManager.allocateBlock(size,
type, factor);
type, factor, containerOwner);
}
@Test
@ -154,6 +155,6 @@ public void testChillModeAllocateBlockFails() throws IOException {
nodeManager.setChillmode(true);
thrown.expectMessage("Unable to create block while in chill mode");
blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
type, factor);
type, factor, containerOwner);
}
}

View File

@ -57,6 +57,7 @@ public class TestContainerMapping {
private static MockNodeManager nodeManager;
private static File testDir;
private static XceiverClientManager xceiverClientManager;
private static String containerOwner = "OZONE";
private static final long TIMEOUT = 10000;
@ -101,7 +102,7 @@ public void testallocateContainer() throws Exception {
ContainerInfo containerInfo = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
UUID.randomUUID().toString(), OzoneProtos.Owner.OZONE);
UUID.randomUUID().toString(), containerOwner);
Assert.assertNotNull(containerInfo);
}
@ -118,7 +119,7 @@ public void testallocateContainerDistributesAllocation() throws Exception {
ContainerInfo containerInfo = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
UUID.randomUUID().toString(), OzoneProtos.Owner.OZONE);
UUID.randomUUID().toString(), containerOwner);
Assert.assertNotNull(containerInfo);
Assert.assertNotNull(containerInfo.getPipeline());
@ -134,7 +135,7 @@ public void testGetContainer() throws IOException {
Pipeline pipeline = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerName,
OzoneProtos.Owner.OZONE).getPipeline();
containerOwner).getPipeline();
Assert.assertNotNull(pipeline);
Pipeline newPipeline = mapping.getContainer(containerName).getPipeline();
Assert.assertEquals(pipeline.getLeader().getDatanodeUuid(),
@ -147,12 +148,12 @@ public void testDuplicateAllocateContainerFails() throws IOException {
Pipeline pipeline = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerName,
OzoneProtos.Owner.OZONE).getPipeline();
containerOwner).getPipeline();
Assert.assertNotNull(pipeline);
thrown.expectMessage("Specified container already exists.");
mapping.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerName,
OzoneProtos.Owner.OZONE);
containerOwner);
}
@Test
@ -169,7 +170,7 @@ public void testChillModeAllocateContainerFails() throws IOException {
thrown.expectMessage("Unable to create container while in chill mode");
mapping.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerName,
OzoneProtos.Owner.OZONE);
containerOwner);
}
@Test
@ -181,7 +182,7 @@ public void testContainerCreationLeaseTimeout() throws IOException,
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
containerName,
OzoneProtos.Owner.OZONE);
containerOwner);
mapping.updateContainerState(containerInfo.getContainerName(),
OzoneProtos.LifeCycleEvent.CREATE);
Thread.sleep(TIMEOUT + 1000);
@ -265,7 +266,7 @@ public void testContainerCloseWithContainerReport() throws IOException {
Assert.assertEquals(5368705120L, updatedContainer.getUsedBytes());
List<ContainerInfo> pendingCloseContainers = mapping.getStateManager()
.getMatchingContainers(
OzoneProtos.Owner.OZONE,
containerOwner,
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CLOSING);
@ -282,7 +283,7 @@ public void testCloseContainer() throws IOException {
OzoneProtos.LifeCycleEvent.FINALIZE);
List<ContainerInfo> pendingCloseContainers = mapping.getStateManager()
.getMatchingContainers(
OzoneProtos.Owner.OZONE,
containerOwner,
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CLOSING);
@ -293,7 +294,7 @@ public void testCloseContainer() throws IOException {
OzoneProtos.LifeCycleEvent.CLOSE);
List<ContainerInfo> closeContainers = mapping.getStateManager()
.getMatchingContainers(
OzoneProtos.Owner.OZONE,
containerOwner,
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CLOSED);
@ -314,7 +315,7 @@ private void createContainer(String containerName) throws IOException {
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
containerName,
OzoneProtos.Owner.OZONE);
containerOwner);
mapping.updateContainerState(containerInfo.getContainerName(),
OzoneProtos.LifeCycleEvent.CREATE);
mapping.updateContainerState(containerInfo.getContainerName(),

View File

@ -33,6 +33,7 @@
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.List;
import java.util.Random;
/**
@ -46,6 +47,7 @@ public class TestContainerStateManager {
private StorageContainerManager scm;
private Mapping scmContainerMapping;
private ContainerStateManager stateManager;
private String containerOwner = "OZONE";
@Rule
public ExpectedException thrown = ExpectedException.none();
@ -74,14 +76,14 @@ public void testAllocateContainer() throws IOException {
// Allocate a container and verify the container info
String container1 = "container" + RandomStringUtils.randomNumeric(5);
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container1);
xceiverClientManager.getFactor(), container1, containerOwner);
ContainerInfo info = stateManager
.getMatchingContainer(OzoneConsts.GB * 3, OzoneProtos.Owner.OZONE,
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(container1, info.getContainerName());
Assert.assertEquals(OzoneConsts.GB * 3, info.getAllocatedBytes());
Assert.assertEquals(OzoneProtos.Owner.OZONE, info.getOwner());
Assert.assertEquals(containerOwner, info.getOwner());
Assert.assertEquals(xceiverClientManager.getType(),
info.getPipeline().getType());
Assert.assertEquals(xceiverClientManager.getFactor(),
@ -91,9 +93,9 @@ public void testAllocateContainer() throws IOException {
// Check there are two containers in ALLOCATED state after allocation
String container2 = "container" + RandomStringUtils.randomNumeric(5);
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container2);
xceiverClientManager.getFactor(), container2, containerOwner);
int numContainers = stateManager
.getMatchingContainers(OzoneProtos.Owner.OZONE,
.getMatchingContainers(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.ALLOCATED).size();
Assert.assertEquals(2, numContainers);
@ -105,10 +107,10 @@ public void testContainerStateManagerRestart() throws IOException {
String cname = "container" + RandomStringUtils.randomNumeric(5);
for (int i = 0; i < 10; i++) {
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), cname + i);
xceiverClientManager.getFactor(), cname + i, containerOwner);
if (i >= 5) {
scm.getScmContainerManager().updateContainerState(cname + i,
OzoneProtos.LifeCycleEvent.CREATE);
scm.getScmContainerManager()
.updateContainerState(cname + i, OzoneProtos.LifeCycleEvent.CREATE);
}
}
@ -118,11 +120,11 @@ public void testContainerStateManagerRestart() throws IOException {
new ContainerStateManager(conf, scmContainerMapping,
128 * OzoneConsts.MB);
int containers = stateManager
.getMatchingContainers(OzoneProtos.Owner.OZONE,
.getMatchingContainers(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.ALLOCATED).size();
Assert.assertEquals(5, containers);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
containers = stateManager.getMatchingContainers(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CREATING).size();
Assert.assertEquals(5, containers);
@ -132,7 +134,7 @@ public void testContainerStateManagerRestart() throws IOException {
public void testGetMatchingContainer() throws IOException {
String container1 = "container" + RandomStringUtils.randomNumeric(5);
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container1);
xceiverClientManager.getFactor(), container1, containerOwner);
scmContainerMapping.updateContainerState(container1,
OzoneProtos.LifeCycleEvent.CREATE);
scmContainerMapping.updateContainerState(container1,
@ -140,22 +142,22 @@ public void testGetMatchingContainer() throws IOException {
String container2 = "container" + RandomStringUtils.randomNumeric(5);
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container2);
xceiverClientManager.getFactor(), container2, containerOwner);
ContainerInfo info = stateManager
.getMatchingContainer(OzoneConsts.GB * 3, OzoneProtos.Owner.OZONE,
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.OPEN);
Assert.assertEquals(container1, info.getContainerName());
info = stateManager
.getMatchingContainer(OzoneConsts.GB * 3, OzoneProtos.Owner.OZONE,
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.OPEN);
Assert.assertEquals(null, info);
info = stateManager
.getMatchingContainer(OzoneConsts.GB * 3, OzoneProtos.Owner.OZONE,
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(container2, info.getContainerName());
@ -165,7 +167,7 @@ public void testGetMatchingContainer() throws IOException {
scmContainerMapping.updateContainerState(container2,
OzoneProtos.LifeCycleEvent.CREATED);
info = stateManager
.getMatchingContainer(OzoneConsts.GB * 3, OzoneProtos.Owner.OZONE,
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.OPEN);
Assert.assertEquals(container2, info.getContainerName());
@ -173,60 +175,61 @@ public void testGetMatchingContainer() throws IOException {
@Test
public void testUpdateContainerState() throws IOException {
int containers = stateManager
.getMatchingContainers(OzoneProtos.Owner.OZONE,
List<ContainerInfo> containerList = stateManager
.getMatchingContainers(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.ALLOCATED).size();
OzoneProtos.LifeCycleState.ALLOCATED);
int containers = containerList == null ? 0 : containerList.size();
Assert.assertEquals(0, containers);
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// OPEN -> CLOSING -> CLOSED -> DELETING -> DELETED
String container1 = "container" + RandomStringUtils.randomNumeric(5);
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container1);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
xceiverClientManager.getFactor(), container1, containerOwner);
containers = stateManager.getMatchingContainers(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.ALLOCATED).size();
Assert.assertEquals(1, containers);
scmContainerMapping.updateContainerState(container1,
OzoneProtos.LifeCycleEvent.CREATE);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
containers = stateManager.getMatchingContainers(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CREATING).size();
Assert.assertEquals(1, containers);
scmContainerMapping.updateContainerState(container1,
OzoneProtos.LifeCycleEvent.CREATED);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
containers = stateManager.getMatchingContainers(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.OPEN).size();
Assert.assertEquals(1, containers);
scmContainerMapping
.updateContainerState(container1, OzoneProtos.LifeCycleEvent.FINALIZE);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
containers = stateManager.getMatchingContainers(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CLOSING).size();
Assert.assertEquals(1, containers);
scmContainerMapping
.updateContainerState(container1, OzoneProtos.LifeCycleEvent.CLOSE);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
containers = stateManager.getMatchingContainers(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CLOSED).size();
Assert.assertEquals(1, containers);
scmContainerMapping
.updateContainerState(container1, OzoneProtos.LifeCycleEvent.DELETE);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
containers = stateManager.getMatchingContainers(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.DELETING).size();
Assert.assertEquals(1, containers);
scmContainerMapping
.updateContainerState(container1, OzoneProtos.LifeCycleEvent.CLEANUP);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
containers = stateManager.getMatchingContainers(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.DELETED).size();
Assert.assertEquals(1, containers);
@ -235,12 +238,12 @@ public void testUpdateContainerState() throws IOException {
// DELETING
String container2 = "container" + RandomStringUtils.randomNumeric(5);
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container2);
xceiverClientManager.getFactor(), container2, containerOwner);
scmContainerMapping.updateContainerState(container2,
OzoneProtos.LifeCycleEvent.CREATE);
scmContainerMapping
.updateContainerState(container2, OzoneProtos.LifeCycleEvent.TIMEOUT);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
containers = stateManager.getMatchingContainers(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.DELETING).size();
Assert.assertEquals(1, containers);
@ -249,7 +252,7 @@ public void testUpdateContainerState() throws IOException {
// OPEN -> CLOSING -> CLOSED
String container3 = "container" + RandomStringUtils.randomNumeric(5);
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container3);
xceiverClientManager.getFactor(), container3, containerOwner);
scmContainerMapping.updateContainerState(container3,
OzoneProtos.LifeCycleEvent.CREATE);
scmContainerMapping.updateContainerState(container3,
@ -258,7 +261,7 @@ public void testUpdateContainerState() throws IOException {
OzoneProtos.LifeCycleEvent.FINALIZE);
scmContainerMapping
.updateContainerState(container3, OzoneProtos.LifeCycleEvent.CLOSE);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
containers = stateManager.getMatchingContainers(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CLOSED).size();
Assert.assertEquals(1, containers);
@ -268,7 +271,7 @@ public void testUpdateContainerState() throws IOException {
public void testUpdatingAllocatedBytes() throws Exception {
String container1 = "container" + RandomStringUtils.randomNumeric(5);
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container1);
xceiverClientManager.getFactor(), container1, containerOwner);
scmContainerMapping.updateContainerState(container1,
OzoneProtos.LifeCycleEvent.CREATE);
scmContainerMapping.updateContainerState(container1,
@ -281,7 +284,7 @@ public void testUpdatingAllocatedBytes() throws Exception {
allocatedSize += size;
// trigger allocating bytes by calling getMatchingContainer
ContainerInfo info = stateManager
.getMatchingContainer(size, OzoneProtos.Owner.OZONE,
.getMatchingContainer(size, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.OPEN);
Assert.assertEquals(container1, info.getContainerName());

View File

@ -26,7 +26,6 @@
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto
@ -161,7 +160,7 @@ public void testContainerPlacementCapacity() throws IOException,
String container1 = UUID.randomUUID().toString();
Pipeline pipeline1 = containerManager.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container1, OzoneProtos.Owner.OZONE)
xceiverClientManager.getFactor(), container1, "OZONE")
.getPipeline();
assertEquals(xceiverClientManager.getFactor().getNumber(),
pipeline1.getMachines().size());
@ -190,7 +189,7 @@ public void testContainerPlacementCapacity() throws IOException,
String container2 = UUID.randomUUID().toString();
containerManager.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container2,
OzoneProtos.Owner.OZONE);
"OZONE");
} finally {
IOUtils.closeQuietly(containerManager);
IOUtils.closeQuietly(nodeManager);