HDFS-11138. Block Storage: add block storage server. Contributed by Chen Liang
This commit is contained in:
parent
cb6b6b47b6
commit
334113adab
|
@ -0,0 +1,63 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock;
|
||||
|
||||
/**
|
||||
* This class contains constants for configuration keys used in CBlock.
|
||||
*/
|
||||
public final class CBlockConfigKeys {
|
||||
public static final String DFS_CBLOCK_ENABLED_KEY =
|
||||
"dfs.cblock.enabled";
|
||||
public static final String DFS_CBLOCK_SERVICERPC_ADDRESS_KEY =
|
||||
"dfs.cblock.servicerpc-address";
|
||||
public static final int DFS_CBLOCK_RPCSERVICE_PORT_DEFAULT =
|
||||
9810;
|
||||
public static final String DFS_CBLOCK_RPCSERVICE_IP_DEFAULT =
|
||||
"0.0.0.0";
|
||||
public static final String DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT =
|
||||
DFS_CBLOCK_RPCSERVICE_IP_DEFAULT
|
||||
+ ":" + DFS_CBLOCK_RPCSERVICE_PORT_DEFAULT;
|
||||
|
||||
public static final String DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY =
|
||||
"dfs.cblock.jscsi-address";
|
||||
|
||||
//The port on CBlockManager node for jSCSI to ask
|
||||
public static final int DFS_CBLOCK_JSCSI_PORT_DEFAULT =
|
||||
9811;
|
||||
public static final String DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT =
|
||||
DFS_CBLOCK_RPCSERVICE_IP_DEFAULT
|
||||
+ ":" + DFS_CBLOCK_JSCSI_PORT_DEFAULT;
|
||||
|
||||
|
||||
public static final String DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY =
|
||||
"dfs.cblock.service.rpc-bind-host";
|
||||
public static final String DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY =
|
||||
"dfs.cblock.jscsi.rpc-bind-host";
|
||||
|
||||
// default block size is 4KB
|
||||
public static final int DFS_CBLOCK_SERVICE_BLOCK_SIZE_DEFAULT =
|
||||
4096;
|
||||
|
||||
public static final String DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY =
|
||||
"dfs.storage.service.handler.count";
|
||||
public static final int DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT = 10;
|
||||
|
||||
private CBlockConfigKeys() {
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
/**
|
||||
* This class specifies the CBlock configuration resources.
|
||||
*/
|
||||
public class CBlockConfiguration extends Configuration {
|
||||
static {
|
||||
// adds the default resources
|
||||
Configuration.addDefaultResource("hdfs-default.xml");
|
||||
Configuration.addDefaultResource("hdfs-site.xml");
|
||||
Configuration.addDefaultResource("cblock-default.xml");
|
||||
Configuration.addDefaultResource("cblock-site.xml");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,238 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock;
|
||||
|
||||
import com.google.protobuf.BlockingService;
|
||||
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
|
||||
import org.apache.hadoop.cblock.meta.VolumeInfo;
|
||||
import org.apache.hadoop.cblock.proto.CBlockClientServerProtocol;
|
||||
import org.apache.hadoop.cblock.proto.CBlockServiceProtocol;
|
||||
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
|
||||
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
|
||||
import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos;
|
||||
import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
|
||||
import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
|
||||
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.cblock.storage.IStorageClient;
|
||||
import org.apache.hadoop.cblock.storage.StorageManager;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY;
|
||||
|
||||
/**
|
||||
* The main entry point of CBlock operations, ALL the CBlock operations
|
||||
* will go through this class. But NOTE that:
|
||||
*
|
||||
* volume operations (create/
|
||||
* delete/info) are:
|
||||
* client -> CBlockManager -> StorageManager -> CBlock client
|
||||
*
|
||||
* IO operations (put/get block) are;
|
||||
* client -> CBlock client -> container
|
||||
*
|
||||
*/
|
||||
public class CBlockManager implements CBlockServiceProtocol,
|
||||
CBlockClientServerProtocol {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(CBlockManager.class);
|
||||
|
||||
private final RPC.Server cblockService;
|
||||
private final RPC.Server cblockServer;
|
||||
|
||||
private final StorageManager storageManager;
|
||||
|
||||
public CBlockManager(CBlockConfiguration conf, IStorageClient storageClient
|
||||
) throws IOException {
|
||||
storageManager = new StorageManager(storageClient);
|
||||
|
||||
RPC.setProtocolEngine(conf, CBlockServiceProtocolPB.class,
|
||||
ProtobufRpcEngine.class);
|
||||
RPC.setProtocolEngine(conf, CBlockClientServerProtocolPB.class,
|
||||
ProtobufRpcEngine.class);
|
||||
// start service for client command-to-cblock server service
|
||||
InetSocketAddress serviceRpcAddr = NetUtils.createSocketAddr(
|
||||
conf.getTrimmed(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY,
|
||||
DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT), -1,
|
||||
DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
|
||||
BlockingService cblockProto =
|
||||
CBlockServiceProtocolProtos
|
||||
.CBlockServiceProtocolService
|
||||
.newReflectiveBlockingService(
|
||||
new CBlockServiceProtocolServerSideTranslatorPB(this)
|
||||
);
|
||||
cblockService = startRpcServer(conf, CBlockServiceProtocolPB.class,
|
||||
cblockProto, serviceRpcAddr,
|
||||
DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY,
|
||||
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY,
|
||||
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT);
|
||||
|
||||
LOG.info("CBlock manager listening for client commands on: {}",
|
||||
serviceRpcAddr);
|
||||
// now start service for cblock client-to-cblock server communication
|
||||
InetSocketAddress serverRpcAddr = NetUtils.createSocketAddr(
|
||||
conf.get(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY,
|
||||
DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT), -1,
|
||||
DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
|
||||
BlockingService serverProto =
|
||||
CBlockClientServerProtocolProtos
|
||||
.CBlockClientServerProtocolService
|
||||
.newReflectiveBlockingService(
|
||||
new CBlockClientServerProtocolServerSideTranslatorPB(this)
|
||||
);
|
||||
cblockServer = startRpcServer(
|
||||
conf, CBlockClientServerProtocolPB.class,
|
||||
serverProto, serverRpcAddr,
|
||||
DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY,
|
||||
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY,
|
||||
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT);
|
||||
LOG.info("CBlock server listening for client commands on: {}",
|
||||
serverRpcAddr);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
cblockService.start();
|
||||
cblockServer.start();
|
||||
LOG.info("CBlock manager started!");
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
cblockService.stop();
|
||||
cblockServer.stop();
|
||||
}
|
||||
|
||||
public void join() {
|
||||
try {
|
||||
cblockService.join();
|
||||
cblockServer.join();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("Interrupted during join");
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts an RPC server, if configured.
|
||||
*
|
||||
* @param conf configuration
|
||||
* @param protocol RPC protocol provided by RPC server
|
||||
* @param instance RPC protocol implementation instance
|
||||
* @param addr configured address of RPC server
|
||||
* @param bindHostKey configuration key for setting explicit bind host. If
|
||||
* the property is not configured, then the bind host is taken from addr.
|
||||
* @param handlerCountKey configuration key for RPC server handler count
|
||||
* @param handlerCountDefault default RPC server handler count if unconfigured
|
||||
* @return RPC server, or null if addr is null
|
||||
* @throws IOException if there is an I/O error while creating RPC server
|
||||
*/
|
||||
private static RPC.Server startRpcServer(CBlockConfiguration conf,
|
||||
Class<?> protocol, BlockingService instance,
|
||||
InetSocketAddress addr, String bindHostKey,
|
||||
String handlerCountKey, int handlerCountDefault) throws IOException {
|
||||
if (addr == null) {
|
||||
return null;
|
||||
}
|
||||
String bindHost = conf.getTrimmed(bindHostKey);
|
||||
if (bindHost == null || bindHost.isEmpty()) {
|
||||
bindHost = addr.getHostName();
|
||||
}
|
||||
int numHandlers = conf.getInt(handlerCountKey, handlerCountDefault);
|
||||
RPC.Server rpcServer = new RPC.Builder(conf)
|
||||
.setProtocol(protocol)
|
||||
.setInstance(instance)
|
||||
.setBindAddress(bindHost)
|
||||
.setPort(addr.getPort())
|
||||
.setNumHandlers(numHandlers)
|
||||
.setVerbose(false)
|
||||
.setSecretManager(null)
|
||||
.build();
|
||||
return rpcServer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MountVolumeResponse mountVolume(
|
||||
String userName, String volumeName) throws IOException {
|
||||
return storageManager.isVolumeValid(userName, volumeName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createVolume(String userName, String volumeName,
|
||||
long volumeSize, int blockSize) throws IOException {
|
||||
LOG.info("Create volume received: userName: {} volumeName: {} " +
|
||||
"volumeSize: {} blockSize: {}", userName, volumeName,
|
||||
volumeSize, blockSize);
|
||||
// It is important to create in-memory representation of the
|
||||
// volume first, then writes to persistent storage (levelDB)
|
||||
// such that it is guaranteed that when there is an entry in
|
||||
// levelDB, the volume is allocated. (more like a UNDO log fashion)
|
||||
// TODO: what if creation failed? we allocated containers but lost
|
||||
// the reference to the volume and all it's containers. How to release
|
||||
// the containers?
|
||||
storageManager.createVolume(userName, volumeName, volumeSize, blockSize);
|
||||
VolumeDescriptor volume = storageManager.getVolume(userName, volumeName);
|
||||
if (volume == null) {
|
||||
throw new IOException("Volume creation failed!");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteVolume(String userName,
|
||||
String volumeName, boolean force) throws IOException {
|
||||
LOG.info("Delete volume received: volume:" + volumeName
|
||||
+ " force?:" + force);
|
||||
storageManager.deleteVolume(userName, volumeName, force);
|
||||
}
|
||||
|
||||
@Override
|
||||
public VolumeInfo infoVolume(String userName, String volumeName
|
||||
) throws IOException {
|
||||
LOG.info("Info volume received: volume: {}", volumeName);
|
||||
return storageManager.infoVolume(userName, volumeName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<VolumeInfo> listVolume(String userName) throws IOException {
|
||||
ArrayList<VolumeInfo> response = new ArrayList<>();
|
||||
List<VolumeDescriptor> allVolumes =
|
||||
storageManager.getAllVolume(userName);
|
||||
for (VolumeDescriptor volume : allVolumes) {
|
||||
VolumeInfo info =
|
||||
new VolumeInfo(volume.getUserName(), volume.getVolumeName(),
|
||||
volume.getVolumeSize(), volume.getBlockSize());
|
||||
response.add(info);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.exception;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* The exception class used in CBlock.
|
||||
*/
|
||||
public class CBlockException extends IOException {
|
||||
public CBlockException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.exception;
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.meta;
|
||||
|
||||
/**
|
||||
*
|
||||
* The internal representation of a container maintained by CBlock server.
|
||||
* Include enough information to exactly identify a container for read/write
|
||||
* operation.
|
||||
*
|
||||
* NOTE that this class is work-in-progress. Depends on HDFS-7240 container
|
||||
* implementation. Currently only to allow testing.
|
||||
*/
|
||||
public class ContainerDescriptor {
|
||||
private final String containerID;
|
||||
// the index of this container with in a volume
|
||||
// on creation, there is no way to know the index of the container
|
||||
// as it is a volume specific information
|
||||
private int containerIndex;
|
||||
|
||||
public ContainerDescriptor(String containerID) {
|
||||
this.containerID = containerID;
|
||||
}
|
||||
|
||||
public void setContainerIndex(int idx) {
|
||||
this.containerIndex = idx;
|
||||
}
|
||||
|
||||
public String getContainerID() {
|
||||
return containerID;
|
||||
}
|
||||
|
||||
public int getContainerIndex() {
|
||||
return containerIndex;
|
||||
}
|
||||
|
||||
public long getUtilization() {
|
||||
return 0;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,165 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.meta;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The internal representation maintained by CBlock server as the info for
|
||||
* a volume. Contains the list of containers belonging to this volume.
|
||||
*
|
||||
* Many methods of this class is made such that the volume information (
|
||||
* including container list) can be easily transformed into a Json string
|
||||
* that can be stored/parsed from a persistent store for cblock server
|
||||
* persistence.
|
||||
*
|
||||
* This class is still work-in-progress.
|
||||
*/
|
||||
public class VolumeDescriptor {
|
||||
// The main data structure is the container location map
|
||||
// other thing are mainly just information
|
||||
|
||||
// since only one operation at a time is allowed, no
|
||||
// need to consider concurrency control here
|
||||
|
||||
// key is container id
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(VolumeDescriptor.class);
|
||||
|
||||
private HashMap<String, ContainerDescriptor> containerMap;
|
||||
private String userName;
|
||||
private int blockSize;
|
||||
private long volumeSize;
|
||||
private String volumeName;
|
||||
// this is essentially the ordered keys of containerMap
|
||||
// which is kind of redundant information. But since we
|
||||
// are likely to access it frequently based on ordering.
|
||||
// keeping this copy to avoid having to sort the key every
|
||||
// time
|
||||
private List<String> containerIdOrdered;
|
||||
|
||||
/**
|
||||
* This is not being called explicitly, but this is necessary as
|
||||
* it will be called by the parse method implicitly when
|
||||
* reconstructing the object from json string. The get*() methods
|
||||
* and set*() methods are for the same purpose also.
|
||||
*/
|
||||
public VolumeDescriptor() {
|
||||
containerMap = new HashMap<>();
|
||||
containerIdOrdered = new ArrayList<>();
|
||||
}
|
||||
|
||||
public VolumeDescriptor(String userName, String volumeName, long volumeSize,
|
||||
int blockSize) {
|
||||
this.containerMap = new HashMap<>();
|
||||
this.userName = userName;
|
||||
this.volumeName = volumeName;
|
||||
this.blockSize = blockSize;
|
||||
this.volumeSize = volumeSize;
|
||||
this.containerIdOrdered = new LinkedList<>();
|
||||
}
|
||||
|
||||
public String getUserName() {
|
||||
return userName;
|
||||
}
|
||||
|
||||
public void setUserName(String userName) {
|
||||
this.userName = userName;
|
||||
}
|
||||
|
||||
public String getVolumeName() {
|
||||
return volumeName;
|
||||
}
|
||||
|
||||
public void setVolumeName(String volumeName) {
|
||||
this.volumeName = volumeName;
|
||||
}
|
||||
|
||||
public long getVolumeSize() {
|
||||
return volumeSize;
|
||||
}
|
||||
|
||||
public void setVolumeSize(long volumeSize) {
|
||||
this.volumeSize = volumeSize;
|
||||
}
|
||||
|
||||
public int getBlockSize() {
|
||||
return blockSize;
|
||||
}
|
||||
|
||||
public void setBlockSize(int blockSize) {
|
||||
this.blockSize = blockSize;
|
||||
}
|
||||
|
||||
public void setContainerIDs(ArrayList<String> containerIDs) {
|
||||
containerIdOrdered.addAll(containerIDs);
|
||||
}
|
||||
|
||||
public void addContainer(ContainerDescriptor containerDescriptor) {
|
||||
containerMap.put(containerDescriptor.getContainerID(),
|
||||
containerDescriptor);
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
VolumeInfo info = getInfo();
|
||||
return info.getUsage() == 0;
|
||||
}
|
||||
|
||||
public VolumeInfo getInfo() {
|
||||
// TODO : need to actually go through all containers of this volume and
|
||||
// ask for their utilization.
|
||||
long utilization = 0;
|
||||
for (Map.Entry<String, ContainerDescriptor> entry :
|
||||
containerMap.entrySet()) {
|
||||
utilization += entry.getValue().getUtilization();
|
||||
}
|
||||
return new VolumeInfo(this.userName, this.volumeName,
|
||||
this.volumeSize, this.blockSize,
|
||||
utilization * blockSize);
|
||||
}
|
||||
|
||||
public String[] getContainerIDs() {
|
||||
//ArrayList<Long> ids = new ArrayList(containerMap.keySet());
|
||||
//return ids.toArray(new Long[ids.size()]);
|
||||
return containerIdOrdered.toArray(new String[containerIdOrdered.size()]);
|
||||
}
|
||||
|
||||
public List<String> getContainerIDsList() {
|
||||
return new ArrayList<>(containerIdOrdered);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
String string = "";
|
||||
string += "Username:" + userName + "\n";
|
||||
string += "VolumeName:" + volumeName + "\n";
|
||||
string += "VolumeSize:" + volumeSize + "\n";
|
||||
string += "blockSize:" + blockSize + "\n";
|
||||
string += "containerIds:" + containerIdOrdered + "\n";
|
||||
string += "containerIdsWithObject:" + containerMap.keySet();
|
||||
return string;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.meta;
|
||||
|
||||
/**
|
||||
* A wrapper class that represents the information about a volume. Used in
|
||||
* communication between CBlock client and CBlock server only.
|
||||
*/
|
||||
public class VolumeInfo {
|
||||
private final String userName;
|
||||
private final String volumeName;
|
||||
private final long volumeSize;
|
||||
private final long blockSize;
|
||||
private final long usage;
|
||||
|
||||
public VolumeInfo(String userName, String volumeName, long volumeSize,
|
||||
long blockSize, long usage) {
|
||||
this.userName = userName;
|
||||
this.volumeName = volumeName;
|
||||
this.volumeSize = volumeSize;
|
||||
this.blockSize = blockSize;
|
||||
this.usage = usage;
|
||||
}
|
||||
|
||||
// When listing volume, the usage will not be set.
|
||||
public VolumeInfo(String userName, String volumeName, long volumeSize,
|
||||
long blockSize) {
|
||||
this.userName = userName;
|
||||
this.volumeName = volumeName;
|
||||
this.volumeSize = volumeSize;
|
||||
this.blockSize = blockSize;
|
||||
this.usage = -1;
|
||||
}
|
||||
|
||||
public long getVolumeSize() {
|
||||
return volumeSize;
|
||||
}
|
||||
|
||||
public long getBlockSize() {
|
||||
return blockSize;
|
||||
}
|
||||
|
||||
public long getUsage() {
|
||||
return usage;
|
||||
}
|
||||
|
||||
public String getUserName() {
|
||||
return userName;
|
||||
}
|
||||
|
||||
public String getVolumeName() {
|
||||
return volumeName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return " userName:" + userName +
|
||||
" volumeName:" + volumeName +
|
||||
" volumeSize:" + volumeSize +
|
||||
" blockSize:" + blockSize +
|
||||
" (sizeInBlocks:" + volumeSize/blockSize + ")" +
|
||||
" usageInBlocks:" + usage;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.meta;
|
|
@ -0,0 +1,18 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock;
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.proto;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* The protocol that CBlock client side uses to talk to server side. CBlock
|
||||
* client is the point where a volume is mounted. All the actual volume IO
|
||||
* operations will go through CBlock client after the volume is mounted.
|
||||
*
|
||||
* When users mount a volume on CBlock client, CBlock client side uses this
|
||||
* protocol to send mount request to CBlock server.
|
||||
*/
|
||||
public interface CBlockClientServerProtocol {
|
||||
MountVolumeResponse mountVolume(String userName, String volumeName)
|
||||
throws IOException;
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.proto;
|
||||
|
||||
import org.apache.hadoop.cblock.meta.VolumeInfo;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* CBlock uses a separate command line tool to send volume management
|
||||
* operations to CBlock server, including create/delete/info/list volumes. This
|
||||
* is the protocol used by the command line tool to send these requests and get
|
||||
* responses from CBlock server.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface CBlockServiceProtocol {
|
||||
|
||||
void createVolume(String userName, String volumeName,
|
||||
long volumeSize, int blockSize) throws IOException;
|
||||
|
||||
void deleteVolume(String userName, String volumeName,
|
||||
boolean force) throws IOException;
|
||||
|
||||
VolumeInfo infoVolume(String userName,
|
||||
String volumeName) throws IOException;
|
||||
|
||||
List<VolumeInfo> listVolume(String userName) throws IOException;
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.proto;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* The response message of mounting a volume. Including enough information
|
||||
* for the client to communicate (perform IO) with the volume containers
|
||||
* directly.
|
||||
*/
|
||||
public class MountVolumeResponse {
|
||||
private final boolean isValid;
|
||||
private final String userName;
|
||||
private final String volumeName;
|
||||
private final long volumeSize;
|
||||
private final int blockSize;
|
||||
private List<String> containerList;
|
||||
|
||||
public MountVolumeResponse(boolean isValid, String userName,
|
||||
String volumeName, long volumeSize, int blockSize,
|
||||
List<String> containerList) {
|
||||
this.isValid = isValid;
|
||||
this.userName = userName;
|
||||
this.volumeName = volumeName;
|
||||
this.volumeSize = volumeSize;
|
||||
this.blockSize = blockSize;
|
||||
this.containerList = containerList;
|
||||
}
|
||||
|
||||
public boolean getIsValid() {
|
||||
return isValid;
|
||||
}
|
||||
|
||||
public String getUserName() {
|
||||
return userName;
|
||||
}
|
||||
|
||||
public String getVolumeName() {
|
||||
return volumeName;
|
||||
}
|
||||
|
||||
public long getVolumeSize() {
|
||||
return volumeSize;
|
||||
}
|
||||
|
||||
public int getBlockSize() {
|
||||
return blockSize;
|
||||
}
|
||||
|
||||
public List<String> getContainerList() {
|
||||
return containerList;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.proto;
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.protocolPB;
|
||||
|
||||
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.ipc.ProtocolInfo;
|
||||
|
||||
/**
|
||||
* This is the protocol CBlock client uses to talk to CBlock server.
|
||||
* CBlock client is the mounting point of a volume. When a user mounts a
|
||||
* volume, the cBlock client running on the local node will use this protocol
|
||||
* to talk to CBlock server to mount the volume.
|
||||
*/
|
||||
@ProtocolInfo(protocolName =
|
||||
"org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocol",
|
||||
protocolVersion = 1)
|
||||
@InterfaceAudience.Private
|
||||
public interface CBlockClientServerProtocolPB extends
|
||||
CBlockClientServerProtocolProtos
|
||||
.CBlockClientServerProtocolService.BlockingInterface {
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.protocolPB;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.cblock.proto.CBlockClientServerProtocol;
|
||||
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
|
||||
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* The server side implementation of cblock client to server protocol.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class CBlockClientServerProtocolServerSideTranslatorPB implements
|
||||
CBlockClientServerProtocolPB {
|
||||
|
||||
private final CBlockClientServerProtocol impl;
|
||||
|
||||
public CBlockClientServerProtocolServerSideTranslatorPB(
|
||||
CBlockClientServerProtocol impl) {
|
||||
this.impl = impl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CBlockClientServerProtocolProtos.MountVolumeResponseProto mountVolume(
|
||||
RpcController controller,
|
||||
CBlockClientServerProtocolProtos.MountVolumeRequestProto request)
|
||||
throws ServiceException {
|
||||
String userName = request.getUserName();
|
||||
String volumeName = request.getVolumeName();
|
||||
CBlockClientServerProtocolProtos.MountVolumeResponseProto.Builder
|
||||
resp =
|
||||
CBlockClientServerProtocolProtos
|
||||
.MountVolumeResponseProto.newBuilder();
|
||||
try {
|
||||
MountVolumeResponse result = impl.mountVolume(userName, volumeName);
|
||||
boolean isValid = result.getIsValid();
|
||||
resp.setIsValid(isValid);
|
||||
if (isValid) {
|
||||
resp.setUserName(result.getUserName());
|
||||
resp.setVolumeName(result.getVolumeName());
|
||||
resp.setVolumeSize(result.getVolumeSize());
|
||||
resp.setBlockSize(result.getBlockSize());
|
||||
List<String> containers = result.getContainerList();
|
||||
for (int i=0; i<containers.size(); i++) {
|
||||
CBlockClientServerProtocolProtos.ContainerIDProto.Builder id =
|
||||
CBlockClientServerProtocolProtos.ContainerIDProto.newBuilder();
|
||||
id.setContainerID(containers.get(i));
|
||||
id.setIndex(i);
|
||||
resp.addAllContainerIDs(id.build());
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
return resp.build();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.protocolPB;
|
||||
|
||||
import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.ipc.ProtocolInfo;
|
||||
|
||||
/**
|
||||
* Users use a independent command line tool to talk to CBlock server for
|
||||
* volume operations (create/delete/info/list). This is the protocol used by
|
||||
* the the command line tool to send these requests to CBlock server.
|
||||
*/
|
||||
@ProtocolInfo(protocolName =
|
||||
"org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocol",
|
||||
protocolVersion = 1)
|
||||
@InterfaceAudience.Private
|
||||
public interface CBlockServiceProtocolPB extends
|
||||
CBlockServiceProtocolProtos.CBlockServiceProtocolService.BlockingInterface {
|
||||
}
|
|
@ -0,0 +1,159 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.protocolPB;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.cblock.meta.VolumeInfo;
|
||||
import org.apache.hadoop.cblock.proto.CBlockServiceProtocol;
|
||||
import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_BLOCK_SIZE_DEFAULT;
|
||||
|
||||
/**
|
||||
* Server side implementation of the protobuf service.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class CBlockServiceProtocolServerSideTranslatorPB
|
||||
implements CBlockServiceProtocolPB {
|
||||
|
||||
private final CBlockServiceProtocol impl;
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(
|
||||
CBlockServiceProtocolServerSideTranslatorPB.class);
|
||||
|
||||
@Override
|
||||
public CBlockServiceProtocolProtos.CreateVolumeResponseProto createVolume(
|
||||
RpcController controller,
|
||||
CBlockServiceProtocolProtos.CreateVolumeRequestProto request)
|
||||
throws ServiceException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("createVolume called! volume size: " + request.getVolumeSize()
|
||||
+ " block size: " + request.getBlockSize());
|
||||
}
|
||||
try {
|
||||
if (request.hasBlockSize()) {
|
||||
impl.createVolume(request.getUserName(), request.getVolumeName(),
|
||||
request.getVolumeSize(), request.getBlockSize());
|
||||
} else{
|
||||
impl.createVolume(request.getUserName(), request.getVolumeName(),
|
||||
request.getVolumeSize(), DFS_CBLOCK_SERVICE_BLOCK_SIZE_DEFAULT);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
return CBlockServiceProtocolProtos.CreateVolumeResponseProto
|
||||
.newBuilder().build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CBlockServiceProtocolProtos.DeleteVolumeResponseProto deleteVolume(
|
||||
RpcController controller,
|
||||
CBlockServiceProtocolProtos.DeleteVolumeRequestProto request)
|
||||
throws ServiceException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("deleteVolume called! volume name: " + request.getVolumeName()
|
||||
+ " force:" + request.getForce());
|
||||
}
|
||||
try {
|
||||
if (request.hasForce()) {
|
||||
impl.deleteVolume(request.getUserName(), request.getVolumeName(),
|
||||
request.getForce());
|
||||
} else {
|
||||
impl.deleteVolume(request.getUserName(), request.getVolumeName(),
|
||||
false);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
return CBlockServiceProtocolProtos.DeleteVolumeResponseProto
|
||||
.newBuilder().build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CBlockServiceProtocolProtos.InfoVolumeResponseProto infoVolume(
|
||||
RpcController controller,
|
||||
CBlockServiceProtocolProtos.InfoVolumeRequestProto request
|
||||
) throws ServiceException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("infoVolume called! volume name: " + request.getVolumeName());
|
||||
}
|
||||
CBlockServiceProtocolProtos.InfoVolumeResponseProto.Builder resp =
|
||||
CBlockServiceProtocolProtos.InfoVolumeResponseProto.newBuilder();
|
||||
CBlockServiceProtocolProtos.VolumeInfoProto.Builder volumeInfoProto =
|
||||
CBlockServiceProtocolProtos.VolumeInfoProto.newBuilder();
|
||||
VolumeInfo volumeInfo;
|
||||
try {
|
||||
volumeInfo = impl.infoVolume(request.getUserName(),
|
||||
request.getVolumeName());
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
|
||||
volumeInfoProto.setVolumeSize(volumeInfo.getVolumeSize());
|
||||
volumeInfoProto.setBlockSize(volumeInfo.getBlockSize());
|
||||
volumeInfoProto.setUsage(volumeInfo.getUsage());
|
||||
volumeInfoProto.setUserName(volumeInfo.getUserName());
|
||||
volumeInfoProto.setVolumeName(volumeInfo.getVolumeName());
|
||||
resp.setVolumeInfo(volumeInfoProto);
|
||||
return resp.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CBlockServiceProtocolProtos.ListVolumeResponseProto listVolume(
|
||||
RpcController controller,
|
||||
CBlockServiceProtocolProtos.ListVolumeRequestProto request
|
||||
) throws ServiceException {
|
||||
CBlockServiceProtocolProtos.ListVolumeResponseProto.Builder resp =
|
||||
CBlockServiceProtocolProtos.ListVolumeResponseProto.newBuilder();
|
||||
String userName = null;
|
||||
if (request.hasUserName()) {
|
||||
userName = request.getUserName();
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("list volume received for :" + userName);
|
||||
}
|
||||
List<VolumeInfo> volumes;
|
||||
try {
|
||||
volumes = impl.listVolume(userName);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
for (VolumeInfo volume : volumes) {
|
||||
CBlockServiceProtocolProtos.VolumeInfoProto.Builder volumeEntryProto
|
||||
= CBlockServiceProtocolProtos.VolumeInfoProto.newBuilder();
|
||||
volumeEntryProto.setUserName(volume.getUserName());
|
||||
volumeEntryProto.setVolumeName(volume.getVolumeName());
|
||||
volumeEntryProto.setVolumeSize(volume.getVolumeSize());
|
||||
volumeEntryProto.setBlockSize(volume.getBlockSize());
|
||||
resp.addVolumeEntry(volumeEntryProto.build());
|
||||
}
|
||||
return resp.build();
|
||||
}
|
||||
|
||||
public CBlockServiceProtocolServerSideTranslatorPB(
|
||||
CBlockServiceProtocol impl) {
|
||||
this.impl = impl;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.protocolPB;
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.storage;
|
||||
|
||||
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* The interface to call into underlying container layer.
|
||||
*
|
||||
* Written as interface to allow easy testing: implement a mock container layer
|
||||
* for standalone testing of CBlock API without actually calling into remote
|
||||
* containers. Actual container layer can simply re-implement this.
|
||||
*
|
||||
* NOTE this is temporarily needed class. When SCM containers are full-fledged,
|
||||
* this interface will likely be removed.
|
||||
*/
|
||||
@InterfaceStability.Unstable
|
||||
public interface IStorageClient {
|
||||
ContainerDescriptor createContainer() throws IOException;
|
||||
|
||||
void deleteContainer(String containerId) throws IOException;
|
||||
|
||||
ContainerDescriptor getContainer(String containerId) throws IOException;
|
||||
|
||||
long getContainerSize() throws IOException;
|
||||
}
|
|
@ -0,0 +1,306 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.storage;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.cblock.exception.CBlockException;
|
||||
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
|
||||
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
|
||||
import org.apache.hadoop.cblock.meta.VolumeInfo;
|
||||
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* This class maintains the key space of CBlock, more specifically, the
|
||||
* volume to container mapping. The core data structure
|
||||
* is a map from users to their volumes info, where volume info is a handler
|
||||
* to a volume, containing information for IO on that volume.
|
||||
*
|
||||
* and a storage client responsible for talking to the SCM
|
||||
*
|
||||
* TODO : all the volume operations are fully serialized, which can potentially
|
||||
* be optimized.
|
||||
*
|
||||
* TODO : if the certain operations (e.g. create) failed, the failure-handling
|
||||
* logic may not be properly implemented currently.
|
||||
*/
|
||||
public class StorageManager {
|
||||
private static final Logger LOGGER =
|
||||
LoggerFactory.getLogger(StorageManager.class);
|
||||
private final IStorageClient storageClient;
|
||||
/**
|
||||
* We will NOT have the situation where same kv pair getting
|
||||
* processed, but it is possible to have multiple kv pair being
|
||||
* processed at same time.
|
||||
*
|
||||
* So using just ConcurrentHashMap should be sufficient
|
||||
*
|
||||
* Again since currently same user accessing from multiple places
|
||||
* is not allowed, no need to consider concurrency of volume map
|
||||
* within one user
|
||||
*/
|
||||
private ConcurrentHashMap<String, HashMap<String, VolumeDescriptor>>
|
||||
user2VolumeMap;
|
||||
// size of an underlying container.
|
||||
// TODO : assuming all containers are of the same size
|
||||
private long containerSizeB;
|
||||
|
||||
public StorageManager(IStorageClient storageClient) throws IOException {
|
||||
this.storageClient = storageClient;
|
||||
this.user2VolumeMap = new ConcurrentHashMap<>();
|
||||
this.containerSizeB = storageClient.getContainerSize();
|
||||
}
|
||||
|
||||
/**
|
||||
* This call will put the volume into in-memory map.
|
||||
*
|
||||
* more specifically, make the volume discoverable on jSCSI server
|
||||
* and keep it's reference in-memory for look up.
|
||||
* @param userName the user name of the volume.
|
||||
* @param volumeName the name of the volume,
|
||||
* @param volume a {@link VolumeDescriptor} object encapsulating the
|
||||
* information about the volume.
|
||||
*/
|
||||
private void makeVolumeReady(String userName, String volumeName,
|
||||
VolumeDescriptor volume) {
|
||||
HashMap<String, VolumeDescriptor> userVolumes;
|
||||
if (user2VolumeMap.containsKey(userName)) {
|
||||
userVolumes = user2VolumeMap.get(userName);
|
||||
} else {
|
||||
userVolumes = new HashMap<>();
|
||||
user2VolumeMap.put(userName, userVolumes);
|
||||
}
|
||||
userVolumes.put(volumeName, volume);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by CBlockManager to add volumes read from persistent store into
|
||||
* memory, need to contact SCM to setup the reference to the containers given
|
||||
* their id.
|
||||
*
|
||||
* Only for failover process where container meta info is read from
|
||||
* persistent store, and containers themselves are alive.
|
||||
*
|
||||
* TODO : Currently, this method is not being called as failover process
|
||||
* is not implemented yet.
|
||||
*
|
||||
* @param volumeDescriptor a {@link VolumeDescriptor} object encapsulating
|
||||
* the information about a volume.
|
||||
* @throws IOException when adding the volume failed. e.g. volume already
|
||||
* exist, or no more container available.
|
||||
*/
|
||||
public synchronized void addVolume(VolumeDescriptor volumeDescriptor)
|
||||
throws IOException{
|
||||
String userName = volumeDescriptor.getUserName();
|
||||
String volumeName = volumeDescriptor.getVolumeName();
|
||||
LOGGER.info("addVolume:" + userName + ":" + volumeName);
|
||||
if (user2VolumeMap.containsKey(userName)
|
||||
&& user2VolumeMap.get(userName).containsKey(volumeName)) {
|
||||
throw new CBlockException("Volume already exist for "
|
||||
+ userName + ":" + volumeName);
|
||||
}
|
||||
// the container ids are read from levelDB, setting up the
|
||||
// container handlers here.
|
||||
String[] containerIds = volumeDescriptor.getContainerIDs();
|
||||
|
||||
for (String containerId : containerIds) {
|
||||
try {
|
||||
ContainerDescriptor containerDescriptor =
|
||||
storageClient.getContainer(containerId);
|
||||
volumeDescriptor.addContainer(containerDescriptor);
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Getting container failed! Container:{} error:{}",
|
||||
containerId, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
// now ready to put into in-memory map.
|
||||
makeVolumeReady(userName, volumeName, volumeDescriptor);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Called by CBlock server when creating a fresh volume. The core
|
||||
* logic is adding needed information into in-memory meta data.
|
||||
*
|
||||
* @param userName the user name of the volume.
|
||||
* @param volumeName the name of the volume.
|
||||
* @param volumeSize the size of the volume.
|
||||
* @param blockSize the block size of the volume.
|
||||
* @throws CBlockException when the volume can not be created.
|
||||
*/
|
||||
public synchronized void createVolume(String userName, String volumeName,
|
||||
long volumeSize, int blockSize) throws CBlockException {
|
||||
LOGGER.debug("createVolume:" + userName + ":" + volumeName);
|
||||
if (user2VolumeMap.containsKey(userName)
|
||||
&& user2VolumeMap.get(userName).containsKey(volumeName)) {
|
||||
throw new CBlockException("Volume already exist for "
|
||||
+ userName + ":" + volumeName);
|
||||
}
|
||||
if (volumeSize < blockSize) {
|
||||
throw new CBlockException("Volume size smaller than block size? " +
|
||||
"volume size:" + volumeSize + " block size:" + blockSize);
|
||||
}
|
||||
VolumeDescriptor volume;
|
||||
int containerIdx = 0;
|
||||
try {
|
||||
volume = new VolumeDescriptor(userName, volumeName,
|
||||
volumeSize, blockSize);
|
||||
long allocatedSize = 0;
|
||||
ArrayList<String> containerIds = new ArrayList<>();
|
||||
while (allocatedSize < volumeSize) {
|
||||
ContainerDescriptor container = storageClient.createContainer();
|
||||
container.setContainerIndex(containerIdx);
|
||||
volume.addContainer(container);
|
||||
containerIds.add(container.getContainerID());
|
||||
allocatedSize += containerSizeB;
|
||||
containerIdx += 1;
|
||||
}
|
||||
volume.setContainerIDs(containerIds);
|
||||
} catch (IOException e) {
|
||||
throw new CBlockException("Error when creating volume:" + e.getMessage());
|
||||
// TODO : delete already created containers? or re-try policy
|
||||
}
|
||||
makeVolumeReady(userName, volumeName, volume);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by CBlock server to delete a specific volume. Mainly
|
||||
* to check whether it can be deleted, and remove it from in-memory meta
|
||||
* data.
|
||||
*
|
||||
* @param userName the user name of the volume.
|
||||
* @param volumeName the name of the volume.
|
||||
* @param force if set to false, only delete volume it is empty, otherwise
|
||||
* throw exception. if set to true, delete regardless.
|
||||
* @throws CBlockException when the volume can not be deleted.
|
||||
*/
|
||||
public synchronized void deleteVolume(String userName, String volumeName,
|
||||
boolean force) throws CBlockException {
|
||||
if (!user2VolumeMap.containsKey(userName)
|
||||
|| !user2VolumeMap.get(userName).containsKey(volumeName)) {
|
||||
throw new CBlockException("Deleting non-exist volume "
|
||||
+ userName + ":" + volumeName);
|
||||
}
|
||||
if (!force && !user2VolumeMap.get(userName).get(volumeName).isEmpty()) {
|
||||
throw new CBlockException("Deleting a non-empty volume without force!");
|
||||
}
|
||||
VolumeDescriptor volume = user2VolumeMap.get(userName).remove(volumeName);
|
||||
for (String containerID : volume.getContainerIDsList()) {
|
||||
try {
|
||||
storageClient.deleteContainer(containerID);
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Error deleting container Container:{} error:{}",
|
||||
containerID, e);
|
||||
throw new CBlockException(e.getMessage());
|
||||
}
|
||||
}
|
||||
if (user2VolumeMap.get(userName).size() == 0) {
|
||||
user2VolumeMap.remove(userName);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by CBlock server to get information of a specific volume.
|
||||
*
|
||||
* @param userName the user name of the volume.
|
||||
* @param volumeName the name of the volume.
|
||||
* @return a {@link VolumeInfo} object encapsulating the information of the
|
||||
* volume.
|
||||
* @throws CBlockException when the information can not be retrieved.
|
||||
*/
|
||||
public synchronized VolumeInfo infoVolume(String userName, String volumeName)
|
||||
throws CBlockException {
|
||||
if (!user2VolumeMap.containsKey(userName)
|
||||
|| !user2VolumeMap.get(userName).containsKey(volumeName)) {
|
||||
throw new CBlockException("Getting info for non-exist volume "
|
||||
+ userName + ":" + volumeName);
|
||||
}
|
||||
return user2VolumeMap.get(userName).get(volumeName).getInfo();
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by CBlock server to check whether the given volume can be
|
||||
* mounted, i.e. whether it can be found in the meta data.
|
||||
*
|
||||
* return a {@link MountVolumeResponse} with isValid flag to indicate
|
||||
* whether the volume can be mounted or not.
|
||||
*
|
||||
* @param userName the user name of the volume.
|
||||
* @param volumeName the name of the volume
|
||||
* @return a {@link MountVolumeResponse} object encapsulating whether the
|
||||
* volume is valid, and if yes, the requried information for client to
|
||||
* read/write the volume.
|
||||
*/
|
||||
public synchronized MountVolumeResponse isVolumeValid(
|
||||
String userName, String volumeName) {
|
||||
if (!user2VolumeMap.containsKey(userName)
|
||||
|| !user2VolumeMap.get(userName).containsKey(volumeName)) {
|
||||
// in the case of invalid volume, no need to set any value other than
|
||||
// isValid flag.
|
||||
return new MountVolumeResponse(false, null, null, 0, 0, null);
|
||||
}
|
||||
VolumeDescriptor volume = user2VolumeMap.get(userName).get(volumeName);
|
||||
return new MountVolumeResponse(true, userName,
|
||||
volumeName, volume.getVolumeSize(), volume.getBlockSize(),
|
||||
volume.getContainerIDsList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by CBlock manager to list all volumes.
|
||||
*
|
||||
* @param userName the userName whose volume to be listed, if set to null,
|
||||
* all volumes will be listed.
|
||||
* @return a list of {@link VolumeDescriptor} representing all volumes
|
||||
* requested.
|
||||
*/
|
||||
public synchronized List<VolumeDescriptor> getAllVolume(String userName) {
|
||||
ArrayList<VolumeDescriptor> allVolumes = new ArrayList<>();
|
||||
if (userName == null) {
|
||||
for (Map.Entry<String, HashMap<String, VolumeDescriptor>> entry
|
||||
: user2VolumeMap.entrySet()) {
|
||||
allVolumes.addAll(entry.getValue().values());
|
||||
}
|
||||
} else {
|
||||
if (user2VolumeMap.containsKey(userName)) {
|
||||
allVolumes.addAll(user2VolumeMap.get(userName).values());
|
||||
}
|
||||
}
|
||||
return allVolumes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Only for testing the behavior of create/delete volumes.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public VolumeDescriptor getVolume(String userName, String volumeName) {
|
||||
if (!user2VolumeMap.containsKey(userName)
|
||||
|| !user2VolumeMap.get(userName).containsKey(volumeName)) {
|
||||
return null;
|
||||
}
|
||||
return user2VolumeMap.get(userName).get(volumeName);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.storage;
|
|
@ -0,0 +1,151 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock;
|
||||
|
||||
import org.apache.hadoop.cblock.meta.VolumeInfo;
|
||||
import org.apache.hadoop.cblock.storage.IStorageClient;
|
||||
import org.apache.hadoop.cblock.util.MockStorageClient;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* This class tests the basics of CBlock server. Mainly about the four
|
||||
* operations on volumes: create, delete, info and list.
|
||||
*/
|
||||
public class TestCBlockServer {
|
||||
private static CBlockManager cBlockManager;
|
||||
private static CBlockConfiguration conf;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
IStorageClient storageClient = new MockStorageClient();
|
||||
conf = new CBlockConfiguration();
|
||||
cBlockManager = new CBlockManager(conf, storageClient);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test create volume for different users.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testCreateVolume() throws Exception {
|
||||
String userName1 = "testCreateUser1";
|
||||
String userName2 = "testCreateUser2";
|
||||
String volumeName1 = "testVolume1";
|
||||
String volumeName2 = "testVolume2";
|
||||
long volumeSize = 1L*1024*1024;
|
||||
int blockSize = 4096;
|
||||
cBlockManager.createVolume(userName1, volumeName1, volumeSize, blockSize);
|
||||
List<VolumeInfo> volumes = cBlockManager.listVolume(userName1);
|
||||
assertEquals(1, volumes.size());
|
||||
VolumeInfo existingVolume = volumes.get(0);
|
||||
assertEquals(userName1, existingVolume.getUserName());
|
||||
assertEquals(volumeName1, existingVolume.getVolumeName());
|
||||
assertEquals(volumeSize, existingVolume.getVolumeSize());
|
||||
assertEquals(blockSize, existingVolume.getBlockSize());
|
||||
|
||||
cBlockManager.createVolume(userName1, volumeName2, volumeSize, blockSize);
|
||||
cBlockManager.createVolume(userName2, volumeName1, volumeSize, blockSize);
|
||||
volumes = cBlockManager.listVolume(userName1);
|
||||
assertEquals(2, volumes.size());
|
||||
volumes = cBlockManager.listVolume(userName2);
|
||||
assertEquals(1, volumes.size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test delete volume.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testDeleteVolume() throws Exception {
|
||||
String userName = "testDeleteUser";
|
||||
String volumeName1 = "testVolume1";
|
||||
String volumeName2 = "testVolume2";
|
||||
long volumeSize = 1L*1024*1024;
|
||||
int blockSize = 4096;
|
||||
cBlockManager.createVolume(userName, volumeName1, volumeSize, blockSize);
|
||||
cBlockManager.createVolume(userName, volumeName2, volumeSize, blockSize);
|
||||
cBlockManager.deleteVolume(userName, volumeName1, true);
|
||||
List<VolumeInfo> volumes = cBlockManager.listVolume(userName);
|
||||
assertEquals(1, volumes.size());
|
||||
|
||||
VolumeInfo existingVolume = volumes.get(0);
|
||||
assertEquals(userName, existingVolume.getUserName());
|
||||
assertEquals(volumeName2, existingVolume.getVolumeName());
|
||||
assertEquals(volumeSize, existingVolume.getVolumeSize());
|
||||
assertEquals(blockSize, existingVolume.getBlockSize());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test info volume.
|
||||
*
|
||||
* TODO : usage field is not being tested (as it is not implemented yet)
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testInfoVolume() throws Exception {
|
||||
String userName = "testInfoUser";
|
||||
String volumeName = "testVolume";
|
||||
long volumeSize = 1L*1024*1024;
|
||||
int blockSize = 4096;
|
||||
cBlockManager.createVolume(userName, volumeName, volumeSize, blockSize);
|
||||
VolumeInfo info = cBlockManager.infoVolume(userName, volumeName);
|
||||
assertEquals(userName, info.getUserName());
|
||||
assertEquals(volumeName, info.getVolumeName());
|
||||
assertEquals(volumeSize, info.getVolumeSize());
|
||||
assertEquals(blockSize, info.getBlockSize());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test listing a number of volumes.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testListVolume() throws Exception {
|
||||
String userName = "testListUser";
|
||||
String volumeName = "testVolume";
|
||||
long volumeSize = 1L*1024*1024;
|
||||
int blockSize = 4096;
|
||||
int volumeNum = 100;
|
||||
for (int i = 0; i<volumeNum; i++) {
|
||||
cBlockManager.createVolume(userName, volumeName + i,
|
||||
volumeSize, blockSize);
|
||||
}
|
||||
List<VolumeInfo> volumes = cBlockManager.listVolume(userName);
|
||||
assertEquals(volumeNum, volumes.size());
|
||||
HashSet<String> volumeIds = new HashSet<>();
|
||||
for (int i = 0; i<volumeNum; i++) {
|
||||
VolumeInfo volumeInfo = volumes.get(i);
|
||||
assertEquals(userName, volumeInfo.getUserName());
|
||||
assertFalse(volumeIds.contains(volumeName + i));
|
||||
volumeIds.add(volumeName + i);
|
||||
assertEquals(volumeSize, volumeInfo.getVolumeSize());
|
||||
assertEquals(blockSize, volumeInfo.getBlockSize());
|
||||
}
|
||||
for (int i = 0; i<volumeNum; i++) {
|
||||
assertTrue(volumeIds.contains(volumeName + i));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.util;
|
||||
|
||||
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
||||
/**
|
||||
* NOTE : This class is only for testing purpose.
|
||||
*
|
||||
* Mock an underlying container storage layer, expose to CBlock to perform
|
||||
* IO. While in this mock implementation, a container is nothing more than
|
||||
* a in memory hashmap.
|
||||
*
|
||||
* This is to allow volume creation call and perform standalone tests.
|
||||
*/
|
||||
public final class ContainerLookUpService {
|
||||
private static HashMap<String, ContainerDescriptor>
|
||||
containers = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Return an *existing* container with given Id.
|
||||
*
|
||||
* TODO : for testing purpose, return a new container if the given Id
|
||||
* is not found
|
||||
*
|
||||
* found
|
||||
* @param containerID
|
||||
* @return
|
||||
*/
|
||||
public static ContainerDescriptor lookUp(String containerID) {
|
||||
if (!containers.containsKey(containerID)) {
|
||||
System.err.println("A container id never seen, return a new one " +
|
||||
"for testing purpose:" + containerID);
|
||||
containers.put(containerID, new ContainerDescriptor(containerID));
|
||||
}
|
||||
return containers.get(containerID);
|
||||
}
|
||||
|
||||
public static void addContainer(String containerID) {
|
||||
containers.put(containerID, new ContainerDescriptor(containerID));
|
||||
}
|
||||
|
||||
private ContainerLookUpService() {
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock.util;
|
||||
|
||||
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
|
||||
import org.apache.hadoop.cblock.storage.IStorageClient;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* This class is the one that directly talks to SCM server.
|
||||
*
|
||||
* NOTE : this is only a mock class, only to allow testing volume
|
||||
* creation without actually creating containers. In real world, need to be
|
||||
* replaced with actual container look up calls.
|
||||
*
|
||||
*/
|
||||
public class MockStorageClient implements IStorageClient {
|
||||
private static long currentContainerId = -1;
|
||||
|
||||
/**
|
||||
* Ask SCM to get a exclusive container.
|
||||
*
|
||||
* @return A container descriptor object to locate this container
|
||||
* @throws Exception
|
||||
*/
|
||||
public ContainerDescriptor createContainer() throws IOException {
|
||||
currentContainerId += 1;
|
||||
ContainerLookUpService.addContainer(Long.toString(currentContainerId));
|
||||
return ContainerLookUpService.lookUp(Long.toString(currentContainerId));
|
||||
}
|
||||
|
||||
/**
|
||||
* As this is only a testing class, with all "container" maintained in
|
||||
* memory, no need to really delete anything for now.
|
||||
* @param containerId
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void deleteContainer(String containerId) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Return reference to an *existing* container with given ID.
|
||||
*
|
||||
* @param containerId
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public ContainerDescriptor getContainer(String containerId)
|
||||
throws IOException {
|
||||
return ContainerLookUpService.lookUp(containerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getContainerSize() throws IOException {
|
||||
// just return a constant value for now
|
||||
return 5L*1024*1024*1024; // 5GB
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue