HDFS-11424. Block Storage: add container representation to meta data. Contributed Chen Liang.
This commit is contained in:
parent
3e1317de02
commit
9c57a61f68
|
@ -18,11 +18,11 @@
|
||||||
|
|
||||||
package org.apache.hadoop.scm.protocol;
|
package org.apache.hadoop.scm.protocol;
|
||||||
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Holds the nodes that currently host the container for an object key hash.
|
* Holds the nodes that currently host the container for an object key hash.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -30,12 +30,13 @@ import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
|
||||||
import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolServerSideTranslatorPB;
|
import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
|
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
|
||||||
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolServerSideTranslatorPB;
|
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.cblock.storage.IStorageClient;
|
import org.apache.hadoop.scm.client.ScmClient;
|
||||||
import org.apache.hadoop.cblock.storage.StorageManager;
|
import org.apache.hadoop.cblock.storage.StorageManager;
|
||||||
import org.apache.hadoop.cblock.util.KeyUtil;
|
import org.apache.hadoop.cblock.util.KeyUtil;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
import org.apache.hadoop.utils.LevelDBStore;
|
import org.apache.hadoop.utils.LevelDBStore;
|
||||||
import org.iq80.leveldb.DBIterator;
|
import org.iq80.leveldb.DBIterator;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -88,9 +89,9 @@ public class CBlockManager implements CBlockServiceProtocol,
|
||||||
|
|
||||||
private Charset encoding = Charset.forName("UTF-8");
|
private Charset encoding = Charset.forName("UTF-8");
|
||||||
|
|
||||||
public CBlockManager(CBlockConfiguration conf, IStorageClient storageClient
|
public CBlockManager(OzoneConfiguration conf,
|
||||||
) throws IOException {
|
ScmClient storageClient) throws IOException {
|
||||||
storageManager = new StorageManager(storageClient);
|
storageManager = new StorageManager(storageClient, conf);
|
||||||
|
|
||||||
dbPath = conf.getTrimmed(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY,
|
dbPath = conf.getTrimmed(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY,
|
||||||
DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT);
|
DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT);
|
||||||
|
@ -177,7 +178,7 @@ public class CBlockManager implements CBlockServiceProtocol,
|
||||||
* @return RPC server, or null if addr is null
|
* @return RPC server, or null if addr is null
|
||||||
* @throws IOException if there is an I/O error while creating RPC server
|
* @throws IOException if there is an I/O error while creating RPC server
|
||||||
*/
|
*/
|
||||||
private static RPC.Server startRpcServer(CBlockConfiguration conf,
|
private static RPC.Server startRpcServer(OzoneConfiguration conf,
|
||||||
Class<?> protocol, BlockingService instance,
|
Class<?> protocol, BlockingService instance,
|
||||||
InetSocketAddress addr, String bindHostKey,
|
InetSocketAddress addr, String bindHostKey,
|
||||||
String handlerCountKey, int handlerCountDefault) throws IOException {
|
String handlerCountKey, int handlerCountDefault) throws IOException {
|
||||||
|
@ -281,6 +282,10 @@ public class CBlockManager implements CBlockServiceProtocol,
|
||||||
return storageManager.getAllVolume(null);
|
return storageManager.getAllVolume(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized List<VolumeDescriptor> getAllVolumes(String userName) {
|
||||||
|
return storageManager.getAllVolume(userName);
|
||||||
|
}
|
||||||
|
|
||||||
public synchronized void close() {
|
public synchronized void close() {
|
||||||
try {
|
try {
|
||||||
levelDBStore.close();
|
levelDBStore.close();
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.cblock.meta;
|
||||||
|
|
||||||
import com.google.protobuf.InvalidProtocolBufferException;
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
|
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -35,6 +36,7 @@ public class ContainerDescriptor {
|
||||||
// on creation, there may be no way to know the index of the container
|
// on creation, there may be no way to know the index of the container
|
||||||
// as it is a volume specific information
|
// as it is a volume specific information
|
||||||
private int containerIndex;
|
private int containerIndex;
|
||||||
|
private Pipeline pipeline;
|
||||||
|
|
||||||
public ContainerDescriptor(String containerID) {
|
public ContainerDescriptor(String containerID) {
|
||||||
this.containerID = containerID;
|
this.containerID = containerID;
|
||||||
|
@ -53,6 +55,14 @@ public class ContainerDescriptor {
|
||||||
return containerID;
|
return containerID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setPipeline(Pipeline pipeline) {
|
||||||
|
this.pipeline = pipeline;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Pipeline getPipeline() {
|
||||||
|
return pipeline;
|
||||||
|
}
|
||||||
|
|
||||||
public int getContainerIndex() {
|
public int getContainerIndex() {
|
||||||
return containerIndex;
|
return containerIndex;
|
||||||
}
|
}
|
||||||
|
@ -66,6 +76,9 @@ public class ContainerDescriptor {
|
||||||
CBlockClientServerProtocolProtos.ContainerIDProto.newBuilder();
|
CBlockClientServerProtocolProtos.ContainerIDProto.newBuilder();
|
||||||
builder.setContainerID(containerID);
|
builder.setContainerID(containerID);
|
||||||
builder.setIndex(containerIndex);
|
builder.setIndex(containerIndex);
|
||||||
|
if (pipeline != null) {
|
||||||
|
builder.setPipeline(pipeline.getProtobufMessage());
|
||||||
|
}
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.cblock.meta;
|
||||||
|
|
||||||
import com.google.protobuf.InvalidProtocolBufferException;
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
|
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -126,6 +127,16 @@ public class VolumeDescriptor {
|
||||||
containerDescriptor);
|
containerDescriptor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public HashMap<String, Pipeline> getPipelines() {
|
||||||
|
HashMap<String, Pipeline> pipelines = new HashMap<>();
|
||||||
|
for (Map.Entry<String, ContainerDescriptor> entry :
|
||||||
|
containerMap.entrySet()) {
|
||||||
|
pipelines.put(entry.getKey(), entry.getValue().getPipeline());
|
||||||
|
}
|
||||||
|
return pipelines;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isEmpty() {
|
public boolean isEmpty() {
|
||||||
VolumeInfo info = getInfo();
|
VolumeInfo info = getInfo();
|
||||||
return info.getUsage() == 0;
|
return info.getUsage() == 0;
|
||||||
|
@ -154,6 +165,15 @@ public class VolumeDescriptor {
|
||||||
return new ArrayList<>(containerIdOrdered);
|
return new ArrayList<>(containerIdOrdered);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public List<Pipeline> getContainerPipelines() {
|
||||||
|
Map<String, Pipeline> tmp = getPipelines();
|
||||||
|
List<Pipeline> pipelineList = new LinkedList<>();
|
||||||
|
for (String containerIDString : containerIdOrdered) {
|
||||||
|
pipelineList.add(tmp.get(containerIDString));
|
||||||
|
}
|
||||||
|
return pipelineList;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
String string = "";
|
String string = "";
|
||||||
|
@ -204,6 +224,10 @@ public class VolumeDescriptor {
|
||||||
ContainerDescriptor containerDescriptor = new ContainerDescriptor(
|
ContainerDescriptor containerDescriptor = new ContainerDescriptor(
|
||||||
containerProto.getContainerID(),
|
containerProto.getContainerID(),
|
||||||
(int)containerProto.getIndex());
|
(int)containerProto.getIndex());
|
||||||
|
if(containerProto.hasPipeline()) {
|
||||||
|
containerDescriptor.setPipeline(
|
||||||
|
Pipeline.getFromProtoBuf(containerProto.getPipeline()));
|
||||||
|
}
|
||||||
volumeDescriptor.addContainer(containerDescriptor);
|
volumeDescriptor.addContainer(containerDescriptor);
|
||||||
containerOrdering[containerDescriptor.getContainerIndex()] =
|
containerOrdering[containerDescriptor.getContainerIndex()] =
|
||||||
containerDescriptor.getContainerID();
|
containerDescriptor.getContainerID();
|
||||||
|
|
|
@ -17,6 +17,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.cblock.proto;
|
package org.apache.hadoop.cblock.proto;
|
||||||
|
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -30,17 +33,20 @@ public class MountVolumeResponse {
|
||||||
private final String volumeName;
|
private final String volumeName;
|
||||||
private final long volumeSize;
|
private final long volumeSize;
|
||||||
private final int blockSize;
|
private final int blockSize;
|
||||||
private List<String> containerList;
|
private List<Pipeline> containerList;
|
||||||
|
private HashMap<String, Pipeline> pipelineMap;
|
||||||
|
|
||||||
public MountVolumeResponse(boolean isValid, String userName,
|
public MountVolumeResponse(boolean isValid, String userName,
|
||||||
String volumeName, long volumeSize, int blockSize,
|
String volumeName, long volumeSize, int blockSize,
|
||||||
List<String> containerList) {
|
List<Pipeline> containerList,
|
||||||
|
HashMap<String, Pipeline> pipelineMap) {
|
||||||
this.isValid = isValid;
|
this.isValid = isValid;
|
||||||
this.userName = userName;
|
this.userName = userName;
|
||||||
this.volumeName = volumeName;
|
this.volumeName = volumeName;
|
||||||
this.volumeSize = volumeSize;
|
this.volumeSize = volumeSize;
|
||||||
this.blockSize = blockSize;
|
this.blockSize = blockSize;
|
||||||
this.containerList = containerList;
|
this.containerList = containerList;
|
||||||
|
this.pipelineMap = pipelineMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean getIsValid() {
|
public boolean getIsValid() {
|
||||||
|
@ -63,7 +69,11 @@ public class MountVolumeResponse {
|
||||||
return blockSize;
|
return blockSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<String> getContainerList() {
|
public List<Pipeline> getContainerList() {
|
||||||
return containerList;
|
return containerList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public HashMap<String, Pipeline> getPipelineMap() {
|
||||||
|
return pipelineMap;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,8 +23,10 @@ import org.apache.hadoop.cblock.proto.CBlockClientServerProtocol;
|
||||||
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
|
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
|
||||||
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
|
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -61,12 +63,18 @@ public class CBlockClientServerProtocolServerSideTranslatorPB implements
|
||||||
resp.setVolumeName(result.getVolumeName());
|
resp.setVolumeName(result.getVolumeName());
|
||||||
resp.setVolumeSize(result.getVolumeSize());
|
resp.setVolumeSize(result.getVolumeSize());
|
||||||
resp.setBlockSize(result.getBlockSize());
|
resp.setBlockSize(result.getBlockSize());
|
||||||
List<String> containers = result.getContainerList();
|
List<Pipeline> containers = result.getContainerList();
|
||||||
|
HashMap<String, Pipeline> pipelineMap = result.getPipelineMap();
|
||||||
|
|
||||||
for (int i=0; i<containers.size(); i++) {
|
for (int i=0; i<containers.size(); i++) {
|
||||||
CBlockClientServerProtocolProtos.ContainerIDProto.Builder id =
|
CBlockClientServerProtocolProtos.ContainerIDProto.Builder id =
|
||||||
CBlockClientServerProtocolProtos.ContainerIDProto.newBuilder();
|
CBlockClientServerProtocolProtos.ContainerIDProto.newBuilder();
|
||||||
id.setContainerID(containers.get(i));
|
String containerName = containers.get(i).getContainerName();
|
||||||
|
id.setContainerID(containerName);
|
||||||
id.setIndex(i);
|
id.setIndex(i);
|
||||||
|
if (pipelineMap.containsKey(containerName)) {
|
||||||
|
id.setPipeline(pipelineMap.get(containerName).getProtobufMessage());
|
||||||
|
}
|
||||||
resp.addAllContainerIDs(id.build());
|
resp.addAllContainerIDs(id.build());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,44 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.cblock.storage;
|
|
||||||
|
|
||||||
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
|
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The interface to call into underlying container layer.
|
|
||||||
*
|
|
||||||
* Written as interface to allow easy testing: implement a mock container layer
|
|
||||||
* for standalone testing of CBlock API without actually calling into remote
|
|
||||||
* containers. Actual container layer can simply re-implement this.
|
|
||||||
*
|
|
||||||
* NOTE this is temporarily needed class. When SCM containers are full-fledged,
|
|
||||||
* this interface will likely be removed.
|
|
||||||
*/
|
|
||||||
@InterfaceStability.Unstable
|
|
||||||
public interface IStorageClient {
|
|
||||||
ContainerDescriptor createContainer() throws IOException;
|
|
||||||
|
|
||||||
void deleteContainer(String containerId) throws IOException;
|
|
||||||
|
|
||||||
ContainerDescriptor getContainer(String containerId) throws IOException;
|
|
||||||
|
|
||||||
long getContainerSize() throws IOException;
|
|
||||||
}
|
|
|
@ -23,6 +23,10 @@ import org.apache.hadoop.cblock.meta.ContainerDescriptor;
|
||||||
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
|
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
|
||||||
import org.apache.hadoop.cblock.meta.VolumeInfo;
|
import org.apache.hadoop.cblock.meta.VolumeInfo;
|
||||||
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
|
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
|
||||||
|
import org.apache.hadoop.cblock.util.KeyUtil;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
|
import org.apache.hadoop.scm.client.ScmClient;
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -50,7 +54,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
public class StorageManager {
|
public class StorageManager {
|
||||||
private static final Logger LOGGER =
|
private static final Logger LOGGER =
|
||||||
LoggerFactory.getLogger(StorageManager.class);
|
LoggerFactory.getLogger(StorageManager.class);
|
||||||
private final IStorageClient storageClient;
|
private final ScmClient storageClient;
|
||||||
/**
|
/**
|
||||||
* We will NOT have the situation where same kv pair getting
|
* We will NOT have the situation where same kv pair getting
|
||||||
* processed, but it is possible to have multiple kv pair being
|
* processed, but it is possible to have multiple kv pair being
|
||||||
|
@ -68,10 +72,11 @@ public class StorageManager {
|
||||||
// TODO : assuming all containers are of the same size
|
// TODO : assuming all containers are of the same size
|
||||||
private long containerSizeB;
|
private long containerSizeB;
|
||||||
|
|
||||||
public StorageManager(IStorageClient storageClient) throws IOException {
|
public StorageManager(ScmClient storageClient,
|
||||||
|
OzoneConfiguration ozoneConfig) throws IOException {
|
||||||
this.storageClient = storageClient;
|
this.storageClient = storageClient;
|
||||||
this.user2VolumeMap = new ConcurrentHashMap<>();
|
this.user2VolumeMap = new ConcurrentHashMap<>();
|
||||||
this.containerSizeB = storageClient.getContainerSize();
|
this.containerSizeB = storageClient.getContainerSize(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -128,8 +133,10 @@ public class StorageManager {
|
||||||
|
|
||||||
for (String containerId : containerIds) {
|
for (String containerId : containerIds) {
|
||||||
try {
|
try {
|
||||||
|
Pipeline pipeline = storageClient.getContainer(containerId);
|
||||||
ContainerDescriptor containerDescriptor =
|
ContainerDescriptor containerDescriptor =
|
||||||
storageClient.getContainer(containerId);
|
new ContainerDescriptor(containerId);
|
||||||
|
containerDescriptor.setPipeline(pipeline);
|
||||||
volumeDescriptor.addContainer(containerDescriptor);
|
volumeDescriptor.addContainer(containerDescriptor);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOGGER.error("Getting container failed! Container:{} error:{}",
|
LOGGER.error("Getting container failed! Container:{} error:{}",
|
||||||
|
@ -172,7 +179,11 @@ public class StorageManager {
|
||||||
long allocatedSize = 0;
|
long allocatedSize = 0;
|
||||||
ArrayList<String> containerIds = new ArrayList<>();
|
ArrayList<String> containerIds = new ArrayList<>();
|
||||||
while (allocatedSize < volumeSize) {
|
while (allocatedSize < volumeSize) {
|
||||||
ContainerDescriptor container = storageClient.createContainer();
|
Pipeline pipeline = storageClient.createContainer(
|
||||||
|
KeyUtil.getContainerName(userName, volumeName, containerIdx));
|
||||||
|
ContainerDescriptor container =
|
||||||
|
new ContainerDescriptor(pipeline.getContainerName());
|
||||||
|
container.setPipeline(pipeline);
|
||||||
container.setContainerIndex(containerIdx);
|
container.setContainerIndex(containerIdx);
|
||||||
volume.addContainer(container);
|
volume.addContainer(container);
|
||||||
containerIds.add(container.getContainerID());
|
containerIds.add(container.getContainerID());
|
||||||
|
@ -211,7 +222,8 @@ public class StorageManager {
|
||||||
VolumeDescriptor volume = user2VolumeMap.get(userName).remove(volumeName);
|
VolumeDescriptor volume = user2VolumeMap.get(userName).remove(volumeName);
|
||||||
for (String containerID : volume.getContainerIDsList()) {
|
for (String containerID : volume.getContainerIDsList()) {
|
||||||
try {
|
try {
|
||||||
storageClient.deleteContainer(containerID);
|
Pipeline pipeline = storageClient.getContainer(containerID);
|
||||||
|
storageClient.deleteContainer(pipeline);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOGGER.error("Error deleting container Container:{} error:{}",
|
LOGGER.error("Error deleting container Container:{} error:{}",
|
||||||
containerID, e);
|
containerID, e);
|
||||||
|
@ -261,12 +273,12 @@ public class StorageManager {
|
||||||
|| !user2VolumeMap.get(userName).containsKey(volumeName)) {
|
|| !user2VolumeMap.get(userName).containsKey(volumeName)) {
|
||||||
// in the case of invalid volume, no need to set any value other than
|
// in the case of invalid volume, no need to set any value other than
|
||||||
// isValid flag.
|
// isValid flag.
|
||||||
return new MountVolumeResponse(false, null, null, 0, 0, null);
|
return new MountVolumeResponse(false, null, null, 0, 0, null, null);
|
||||||
}
|
}
|
||||||
VolumeDescriptor volume = user2VolumeMap.get(userName).get(volumeName);
|
VolumeDescriptor volume = user2VolumeMap.get(userName).get(volumeName);
|
||||||
return new MountVolumeResponse(true, userName,
|
return new MountVolumeResponse(true, userName,
|
||||||
volumeName, volume.getVolumeSize(), volume.getBlockSize(),
|
volumeName, volume.getVolumeSize(), volume.getBlockSize(),
|
||||||
volume.getContainerIDsList());
|
volume.getContainerPipelines(), volume.getPipelines());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -26,6 +26,11 @@ public final class KeyUtil {
|
||||||
return userName + ":" + volumeName;
|
return userName + ":" + volumeName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static String getContainerName(String userName, String volumeName,
|
||||||
|
int containerID) {
|
||||||
|
return getVolumeKey(userName, volumeName) + "#" + containerID;
|
||||||
|
}
|
||||||
|
|
||||||
private KeyUtil() {
|
private KeyUtil() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,8 @@ option java_generic_services = true;
|
||||||
option java_generate_equals_and_hash = true;
|
option java_generate_equals_and_hash = true;
|
||||||
package hadoop.cblock;
|
package hadoop.cblock;
|
||||||
|
|
||||||
|
import "DatanodeContainerProtocol.proto";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This message is sent from CBlock client side to CBlock server to
|
* This message is sent from CBlock client side to CBlock server to
|
||||||
* mount a volume specified by owner name and volume name.
|
* mount a volume specified by owner name and volume name.
|
||||||
|
@ -66,6 +68,8 @@ message MountVolumeResponseProto {
|
||||||
message ContainerIDProto {
|
message ContainerIDProto {
|
||||||
required string containerID = 1;
|
required string containerID = 1;
|
||||||
required uint64 index = 2;
|
required uint64 index = 2;
|
||||||
|
// making pipeline optional to be compatible with exisiting tests
|
||||||
|
optional hadoop.hdfs.ozone.Pipeline pipeline = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
service CBlockClientServerProtocolService {
|
service CBlockClientServerProtocolService {
|
||||||
|
|
|
@ -17,9 +17,11 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.cblock;
|
package org.apache.hadoop.cblock;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.RandomStringUtils;
|
||||||
import org.apache.hadoop.cblock.meta.VolumeInfo;
|
import org.apache.hadoop.cblock.meta.VolumeInfo;
|
||||||
import org.apache.hadoop.cblock.storage.IStorageClient;
|
import org.apache.hadoop.scm.client.ScmClient;
|
||||||
import org.apache.hadoop.cblock.util.MockStorageClient;
|
import org.apache.hadoop.cblock.util.MockStorageClient;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -36,12 +38,12 @@ import static org.junit.Assert.assertTrue;
|
||||||
*/
|
*/
|
||||||
public class TestCBlockServer {
|
public class TestCBlockServer {
|
||||||
private static CBlockManager cBlockManager;
|
private static CBlockManager cBlockManager;
|
||||||
private static CBlockConfiguration conf;
|
private static OzoneConfiguration conf;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setup() throws Exception {
|
public static void setup() throws Exception {
|
||||||
IStorageClient storageClient = new MockStorageClient();
|
ScmClient storageClient = new MockStorageClient();
|
||||||
conf = new CBlockConfiguration();
|
conf = new OzoneConfiguration();
|
||||||
cBlockManager = new CBlockManager(conf, storageClient);
|
cBlockManager = new CBlockManager(conf, storageClient);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,10 +53,10 @@ public class TestCBlockServer {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testCreateVolume() throws Exception {
|
public void testCreateVolume() throws Exception {
|
||||||
String userName1 = "testCreateUser1";
|
String userName1 = "user" + RandomStringUtils.randomNumeric(5);
|
||||||
String userName2 = "testCreateUser2";
|
String userName2 = "user" + RandomStringUtils.randomNumeric(5);
|
||||||
String volumeName1 = "testVolume1";
|
String volumeName1 = "volume" + RandomStringUtils.randomNumeric(5);
|
||||||
String volumeName2 = "testVolume2";
|
String volumeName2 = "volume" + RandomStringUtils.randomNumeric(5);
|
||||||
long volumeSize = 1L*1024*1024;
|
long volumeSize = 1L*1024*1024;
|
||||||
int blockSize = 4096;
|
int blockSize = 4096;
|
||||||
cBlockManager.createVolume(userName1, volumeName1, volumeSize, blockSize);
|
cBlockManager.createVolume(userName1, volumeName1, volumeSize, blockSize);
|
||||||
|
@ -80,9 +82,9 @@ public class TestCBlockServer {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testDeleteVolume() throws Exception {
|
public void testDeleteVolume() throws Exception {
|
||||||
String userName = "testDeleteUser";
|
String userName = "user" + RandomStringUtils.randomNumeric(5);
|
||||||
String volumeName1 = "testVolume1";
|
String volumeName1 = "volume" + RandomStringUtils.randomNumeric(5);
|
||||||
String volumeName2 = "testVolume2";
|
String volumeName2 = "volume" + RandomStringUtils.randomNumeric(5);
|
||||||
long volumeSize = 1L*1024*1024;
|
long volumeSize = 1L*1024*1024;
|
||||||
int blockSize = 4096;
|
int blockSize = 4096;
|
||||||
cBlockManager.createVolume(userName, volumeName1, volumeSize, blockSize);
|
cBlockManager.createVolume(userName, volumeName1, volumeSize, blockSize);
|
||||||
|
@ -106,8 +108,8 @@ public class TestCBlockServer {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testInfoVolume() throws Exception {
|
public void testInfoVolume() throws Exception {
|
||||||
String userName = "testInfoUser";
|
String userName = "user" + RandomStringUtils.randomNumeric(5);
|
||||||
String volumeName = "testVolume";
|
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
|
||||||
long volumeSize = 1L*1024*1024;
|
long volumeSize = 1L*1024*1024;
|
||||||
int blockSize = 4096;
|
int blockSize = 4096;
|
||||||
cBlockManager.createVolume(userName, volumeName, volumeSize, blockSize);
|
cBlockManager.createVolume(userName, volumeName, volumeSize, blockSize);
|
||||||
|
@ -124,8 +126,8 @@ public class TestCBlockServer {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testListVolume() throws Exception {
|
public void testListVolume() throws Exception {
|
||||||
String userName = "testListUser";
|
String userName = "user" + RandomStringUtils.randomNumeric(5);
|
||||||
String volumeName = "testVolume";
|
String volumeName ="volume" + RandomStringUtils.randomNumeric(5);
|
||||||
long volumeSize = 1L*1024*1024;
|
long volumeSize = 1L*1024*1024;
|
||||||
int blockSize = 4096;
|
int blockSize = 4096;
|
||||||
int volumeNum = 100;
|
int volumeNum = 100;
|
||||||
|
|
|
@ -18,10 +18,15 @@
|
||||||
package org.apache.hadoop.cblock;
|
package org.apache.hadoop.cblock;
|
||||||
|
|
||||||
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
|
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
|
||||||
import org.apache.hadoop.cblock.storage.IStorageClient;
|
import org.apache.hadoop.scm.client.ScmClient;
|
||||||
import org.apache.hadoop.cblock.util.MockStorageClient;
|
import org.apache.hadoop.cblock.util.MockStorageClient;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
|
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URL;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
|
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
|
||||||
|
@ -40,6 +45,7 @@ public class TestCBlockServerPersistence {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testWriteToPersistentStore() throws Exception {
|
public void testWriteToPersistentStore() throws Exception {
|
||||||
|
OzoneConfiguration conf = new OzoneConfiguration();
|
||||||
String userName = "testWriteToPersistentStore";
|
String userName = "testWriteToPersistentStore";
|
||||||
String volumeName1 = "testVolume1";
|
String volumeName1 = "testVolume1";
|
||||||
String volumeName2 = "testVolume2";
|
String volumeName2 = "testVolume2";
|
||||||
|
@ -48,11 +54,17 @@ public class TestCBlockServerPersistence {
|
||||||
int blockSize = 4096;
|
int blockSize = 4096;
|
||||||
CBlockManager cBlockManager = null;
|
CBlockManager cBlockManager = null;
|
||||||
CBlockManager cBlockManager1 = null;
|
CBlockManager cBlockManager1 = null;
|
||||||
String dbPath = "/tmp/testCblockPersistence.dat";
|
URL p = conf.getClass().getResource("");
|
||||||
|
String path = p.getPath().concat(
|
||||||
|
TestOzoneContainer.class.getSimpleName());
|
||||||
|
File filePath = new File(path);
|
||||||
|
if(!filePath.exists() && !filePath.mkdirs()) {
|
||||||
|
throw new IOException("Unable to create test DB dir");
|
||||||
|
}
|
||||||
|
conf.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, path.concat(
|
||||||
|
"/testCblockPersistence.dat"));
|
||||||
try {
|
try {
|
||||||
IStorageClient storageClient = new MockStorageClient();
|
ScmClient storageClient = new MockStorageClient();
|
||||||
CBlockConfiguration conf = new CBlockConfiguration();
|
|
||||||
conf.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, dbPath);
|
|
||||||
cBlockManager = new CBlockManager(conf, storageClient);
|
cBlockManager = new CBlockManager(conf, storageClient);
|
||||||
cBlockManager.createVolume(userName, volumeName1, volumeSize1, blockSize);
|
cBlockManager.createVolume(userName, volumeName1, volumeSize1, blockSize);
|
||||||
cBlockManager.createVolume(userName, volumeName2, volumeSize2, blockSize);
|
cBlockManager.createVolume(userName, volumeName2, volumeSize2, blockSize);
|
||||||
|
@ -69,9 +81,10 @@ public class TestCBlockServerPersistence {
|
||||||
|
|
||||||
// create a new cblock server instance. This is just the
|
// create a new cblock server instance. This is just the
|
||||||
// same as restarting cblock server.
|
// same as restarting cblock server.
|
||||||
IStorageClient storageClient1 = new MockStorageClient();
|
ScmClient storageClient1 = new MockStorageClient();
|
||||||
CBlockConfiguration conf1 = new CBlockConfiguration();
|
OzoneConfiguration conf1 = new OzoneConfiguration();
|
||||||
conf1.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, dbPath);
|
conf1.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, path.concat(
|
||||||
|
"/testCblockPersistence.dat"));
|
||||||
cBlockManager1 = new CBlockManager(conf1, storageClient1);
|
cBlockManager1 = new CBlockManager(conf1, storageClient1);
|
||||||
List<VolumeDescriptor> allVolumes1 = cBlockManager1.getAllVolumes();
|
List<VolumeDescriptor> allVolumes1 = cBlockManager1.getAllVolumes();
|
||||||
assertEquals(2, allVolumes1.size());
|
assertEquals(2, allVolumes1.size());
|
||||||
|
@ -85,11 +98,15 @@ public class TestCBlockServerPersistence {
|
||||||
// here.
|
// here.
|
||||||
if (volumeDescriptor1.getVolumeName().equals(
|
if (volumeDescriptor1.getVolumeName().equals(
|
||||||
newvolumeDescriptor1.getVolumeName())) {
|
newvolumeDescriptor1.getVolumeName())) {
|
||||||
assertEquals(volumeDescriptor1, newvolumeDescriptor1);
|
assertEquals(volumeDescriptor1.toString(),
|
||||||
assertEquals(volumeDescriptor2, newvolumeDescriptor2);
|
newvolumeDescriptor1.toString());
|
||||||
|
assertEquals(volumeDescriptor2.toString(),
|
||||||
|
newvolumeDescriptor2.toString());
|
||||||
} else {
|
} else {
|
||||||
assertEquals(volumeDescriptor1, newvolumeDescriptor2);
|
assertEquals(volumeDescriptor1.toString(),
|
||||||
assertEquals(volumeDescriptor2, newvolumeDescriptor1);
|
newvolumeDescriptor2.toString());
|
||||||
|
assertEquals(volumeDescriptor2.toString(),
|
||||||
|
newvolumeDescriptor1.toString());
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (cBlockManager != null) {
|
if (cBlockManager != null) {
|
||||||
|
|
|
@ -18,7 +18,10 @@
|
||||||
package org.apache.hadoop.cblock.util;
|
package org.apache.hadoop.cblock.util;
|
||||||
|
|
||||||
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
|
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
|
||||||
|
import org.apache.hadoop.ozone.container.ContainerTestHelper;
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -42,19 +45,26 @@ public final class ContainerLookUpService {
|
||||||
*
|
*
|
||||||
* found
|
* found
|
||||||
* @param containerID
|
* @param containerID
|
||||||
* @return
|
* @return the corresponding pipeline instance (create if not exist)
|
||||||
*/
|
*/
|
||||||
public static ContainerDescriptor lookUp(String containerID) {
|
public static ContainerDescriptor lookUp(String containerID)
|
||||||
|
throws IOException {
|
||||||
if (!containers.containsKey(containerID)) {
|
if (!containers.containsKey(containerID)) {
|
||||||
System.err.println("A container id never seen, return a new one " +
|
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline(
|
||||||
"for testing purpose:" + containerID);
|
containerID);
|
||||||
containers.put(containerID, new ContainerDescriptor(containerID));
|
ContainerDescriptor cd = new ContainerDescriptor(containerID);
|
||||||
|
cd.setPipeline(pipeline);
|
||||||
|
containers.put(containerID, cd);
|
||||||
}
|
}
|
||||||
return containers.get(containerID);
|
return containers.get(containerID);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void addContainer(String containerID) {
|
public static void addContainer(String containerID) throws IOException {
|
||||||
containers.put(containerID, new ContainerDescriptor(containerID));
|
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline(
|
||||||
|
containerID);
|
||||||
|
ContainerDescriptor cd = new ContainerDescriptor(containerID);
|
||||||
|
cd.setPipeline(pipeline);
|
||||||
|
containers.put(containerID, cd);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ContainerLookUpService() {
|
private ContainerLookUpService() {
|
||||||
|
|
|
@ -17,8 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.cblock.util;
|
package org.apache.hadoop.cblock.util;
|
||||||
|
|
||||||
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
|
import org.apache.hadoop.scm.client.ScmClient;
|
||||||
import org.apache.hadoop.cblock.storage.IStorageClient;
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -30,7 +30,7 @@ import java.io.IOException;
|
||||||
* replaced with actual container look up calls.
|
* replaced with actual container look up calls.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class MockStorageClient implements IStorageClient {
|
public class MockStorageClient implements ScmClient {
|
||||||
private static long currentContainerId = -1;
|
private static long currentContainerId = -1;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -39,20 +39,22 @@ public class MockStorageClient implements IStorageClient {
|
||||||
* @return A container descriptor object to locate this container
|
* @return A container descriptor object to locate this container
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
public ContainerDescriptor createContainer() throws IOException {
|
@Override
|
||||||
|
public Pipeline createContainer(String containerId)
|
||||||
|
throws IOException {
|
||||||
currentContainerId += 1;
|
currentContainerId += 1;
|
||||||
ContainerLookUpService.addContainer(Long.toString(currentContainerId));
|
ContainerLookUpService.addContainer(Long.toString(currentContainerId));
|
||||||
return ContainerLookUpService.lookUp(Long.toString(currentContainerId));
|
return ContainerLookUpService.lookUp(Long.toString(currentContainerId))
|
||||||
|
.getPipeline();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* As this is only a testing class, with all "container" maintained in
|
* As this is only a testing class, with all "container" maintained in
|
||||||
* memory, no need to really delete anything for now.
|
* memory, no need to really delete anything for now.
|
||||||
* @param containerId
|
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void deleteContainer(String containerId) throws IOException {
|
public void deleteContainer(Pipeline pipeline) throws IOException {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,13 +65,13 @@ public class MockStorageClient implements IStorageClient {
|
||||||
* @return
|
* @return
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public ContainerDescriptor getContainer(String containerId)
|
public Pipeline getContainer(String containerId)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return ContainerLookUpService.lookUp(containerId);
|
return ContainerLookUpService.lookUp(containerId).getPipeline();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getContainerSize() throws IOException {
|
public long getContainerSize(Pipeline pipeline) throws IOException {
|
||||||
// just return a constant value for now
|
// just return a constant value for now
|
||||||
return 5L*1024*1024*1024; // 5GB
|
return 5L*1024*1024*1024; // 5GB
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue