HDFS-11138. Block Storage: add block storage server. Contributed by Chen Liang

This commit is contained in:
Anu Engineer 2016-11-18 15:49:01 -08:00 committed by Owen O'Malley
parent 48db56adea
commit 9997d36eb6
25 changed files with 1866 additions and 0 deletions

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock;
/**
* This class contains constants for configuration keys used in CBlock.
*/
public final class CBlockConfigKeys {
public static final String DFS_CBLOCK_ENABLED_KEY =
"dfs.cblock.enabled";
public static final String DFS_CBLOCK_SERVICERPC_ADDRESS_KEY =
"dfs.cblock.servicerpc-address";
public static final int DFS_CBLOCK_RPCSERVICE_PORT_DEFAULT =
9810;
public static final String DFS_CBLOCK_RPCSERVICE_IP_DEFAULT =
"0.0.0.0";
public static final String DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT =
DFS_CBLOCK_RPCSERVICE_IP_DEFAULT
+ ":" + DFS_CBLOCK_RPCSERVICE_PORT_DEFAULT;
public static final String DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY =
"dfs.cblock.jscsi-address";
//The port on CBlockManager node for jSCSI to ask
public static final int DFS_CBLOCK_JSCSI_PORT_DEFAULT =
9811;
public static final String DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT =
DFS_CBLOCK_RPCSERVICE_IP_DEFAULT
+ ":" + DFS_CBLOCK_JSCSI_PORT_DEFAULT;
public static final String DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY =
"dfs.cblock.service.rpc-bind-host";
public static final String DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY =
"dfs.cblock.jscsi.rpc-bind-host";
// default block size is 4KB
public static final int DFS_CBLOCK_SERVICE_BLOCK_SIZE_DEFAULT =
4096;
public static final String DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY =
"dfs.storage.service.handler.count";
public static final int DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT = 10;
private CBlockConfigKeys() {
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock;
import org.apache.hadoop.conf.Configuration;
/**
* This class specifies the CBlock configuration resources.
*/
public class CBlockConfiguration extends Configuration {
static {
// adds the default resources
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
Configuration.addDefaultResource("cblock-default.xml");
Configuration.addDefaultResource("cblock-site.xml");
}
}

View File

@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock;
import com.google.protobuf.BlockingService;
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.proto.CBlockClientServerProtocol;
import org.apache.hadoop.cblock.proto.CBlockServiceProtocol;
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos;
import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolServerSideTranslatorPB;
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.cblock.storage.IStorageClient;
import org.apache.hadoop.cblock.storage.StorageManager;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY;
/**
* The main entry point of CBlock operations, ALL the CBlock operations
* will go through this class. But NOTE that:
*
* volume operations (create/
* delete/info) are:
* client -> CBlockManager -> StorageManager -> CBlock client
*
* IO operations (put/get block) are;
* client -> CBlock client -> container
*
*/
public class CBlockManager implements CBlockServiceProtocol,
CBlockClientServerProtocol {
private static final Logger LOG =
LoggerFactory.getLogger(CBlockManager.class);
private final RPC.Server cblockService;
private final RPC.Server cblockServer;
private final StorageManager storageManager;
public CBlockManager(CBlockConfiguration conf, IStorageClient storageClient
) throws IOException {
storageManager = new StorageManager(storageClient);
RPC.setProtocolEngine(conf, CBlockServiceProtocolPB.class,
ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, CBlockClientServerProtocolPB.class,
ProtobufRpcEngine.class);
// start service for client command-to-cblock server service
InetSocketAddress serviceRpcAddr = NetUtils.createSocketAddr(
conf.getTrimmed(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY,
DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT), -1,
DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
BlockingService cblockProto =
CBlockServiceProtocolProtos
.CBlockServiceProtocolService
.newReflectiveBlockingService(
new CBlockServiceProtocolServerSideTranslatorPB(this)
);
cblockService = startRpcServer(conf, CBlockServiceProtocolPB.class,
cblockProto, serviceRpcAddr,
DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY,
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY,
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT);
LOG.info("CBlock manager listening for client commands on: {}",
serviceRpcAddr);
// now start service for cblock client-to-cblock server communication
InetSocketAddress serverRpcAddr = NetUtils.createSocketAddr(
conf.get(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY,
DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT), -1,
DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
BlockingService serverProto =
CBlockClientServerProtocolProtos
.CBlockClientServerProtocolService
.newReflectiveBlockingService(
new CBlockClientServerProtocolServerSideTranslatorPB(this)
);
cblockServer = startRpcServer(
conf, CBlockClientServerProtocolPB.class,
serverProto, serverRpcAddr,
DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY,
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY,
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT);
LOG.info("CBlock server listening for client commands on: {}",
serverRpcAddr);
}
public void start() {
cblockService.start();
cblockServer.start();
LOG.info("CBlock manager started!");
}
public void stop() {
cblockService.stop();
cblockServer.stop();
}
public void join() {
try {
cblockService.join();
cblockServer.join();
} catch (InterruptedException e) {
LOG.error("Interrupted during join");
Thread.currentThread().interrupt();
}
}
/**
* Starts an RPC server, if configured.
*
* @param conf configuration
* @param protocol RPC protocol provided by RPC server
* @param instance RPC protocol implementation instance
* @param addr configured address of RPC server
* @param bindHostKey configuration key for setting explicit bind host. If
* the property is not configured, then the bind host is taken from addr.
* @param handlerCountKey configuration key for RPC server handler count
* @param handlerCountDefault default RPC server handler count if unconfigured
* @return RPC server, or null if addr is null
* @throws IOException if there is an I/O error while creating RPC server
*/
private static RPC.Server startRpcServer(CBlockConfiguration conf,
Class<?> protocol, BlockingService instance,
InetSocketAddress addr, String bindHostKey,
String handlerCountKey, int handlerCountDefault) throws IOException {
if (addr == null) {
return null;
}
String bindHost = conf.getTrimmed(bindHostKey);
if (bindHost == null || bindHost.isEmpty()) {
bindHost = addr.getHostName();
}
int numHandlers = conf.getInt(handlerCountKey, handlerCountDefault);
RPC.Server rpcServer = new RPC.Builder(conf)
.setProtocol(protocol)
.setInstance(instance)
.setBindAddress(bindHost)
.setPort(addr.getPort())
.setNumHandlers(numHandlers)
.setVerbose(false)
.setSecretManager(null)
.build();
return rpcServer;
}
@Override
public MountVolumeResponse mountVolume(
String userName, String volumeName) throws IOException {
return storageManager.isVolumeValid(userName, volumeName);
}
@Override
public void createVolume(String userName, String volumeName,
long volumeSize, int blockSize) throws IOException {
LOG.info("Create volume received: userName: {} volumeName: {} " +
"volumeSize: {} blockSize: {}", userName, volumeName,
volumeSize, blockSize);
// It is important to create in-memory representation of the
// volume first, then writes to persistent storage (levelDB)
// such that it is guaranteed that when there is an entry in
// levelDB, the volume is allocated. (more like a UNDO log fashion)
// TODO: what if creation failed? we allocated containers but lost
// the reference to the volume and all it's containers. How to release
// the containers?
storageManager.createVolume(userName, volumeName, volumeSize, blockSize);
VolumeDescriptor volume = storageManager.getVolume(userName, volumeName);
if (volume == null) {
throw new IOException("Volume creation failed!");
}
}
@Override
public void deleteVolume(String userName,
String volumeName, boolean force) throws IOException {
LOG.info("Delete volume received: volume:" + volumeName
+ " force?:" + force);
storageManager.deleteVolume(userName, volumeName, force);
}
@Override
public VolumeInfo infoVolume(String userName, String volumeName
) throws IOException {
LOG.info("Info volume received: volume: {}", volumeName);
return storageManager.infoVolume(userName, volumeName);
}
@Override
public List<VolumeInfo> listVolume(String userName) throws IOException {
ArrayList<VolumeInfo> response = new ArrayList<>();
List<VolumeDescriptor> allVolumes =
storageManager.getAllVolume(userName);
for (VolumeDescriptor volume : allVolumes) {
VolumeInfo info =
new VolumeInfo(volume.getUserName(), volume.getVolumeName(),
volume.getVolumeSize(), volume.getBlockSize());
response.add(info);
}
return response;
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.exception;
import java.io.IOException;
/**
* The exception class used in CBlock.
*/
public class CBlockException extends IOException {
public CBlockException(String message) {
super(message);
}
}

View File

@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.exception;

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.meta;
/**
*
* The internal representation of a container maintained by CBlock server.
* Include enough information to exactly identify a container for read/write
* operation.
*
* NOTE that this class is work-in-progress. Depends on HDFS-7240 container
* implementation. Currently only to allow testing.
*/
public class ContainerDescriptor {
private final String containerID;
// the index of this container with in a volume
// on creation, there is no way to know the index of the container
// as it is a volume specific information
private int containerIndex;
public ContainerDescriptor(String containerID) {
this.containerID = containerID;
}
public void setContainerIndex(int idx) {
this.containerIndex = idx;
}
public String getContainerID() {
return containerID;
}
public int getContainerIndex() {
return containerIndex;
}
public long getUtilization() {
return 0;
}
}

View File

@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.meta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* The internal representation maintained by CBlock server as the info for
* a volume. Contains the list of containers belonging to this volume.
*
* Many methods of this class is made such that the volume information (
* including container list) can be easily transformed into a Json string
* that can be stored/parsed from a persistent store for cblock server
* persistence.
*
* This class is still work-in-progress.
*/
public class VolumeDescriptor {
// The main data structure is the container location map
// other thing are mainly just information
// since only one operation at a time is allowed, no
// need to consider concurrency control here
// key is container id
private static final Logger LOG =
LoggerFactory.getLogger(VolumeDescriptor.class);
private HashMap<String, ContainerDescriptor> containerMap;
private String userName;
private int blockSize;
private long volumeSize;
private String volumeName;
// this is essentially the ordered keys of containerMap
// which is kind of redundant information. But since we
// are likely to access it frequently based on ordering.
// keeping this copy to avoid having to sort the key every
// time
private List<String> containerIdOrdered;
/**
* This is not being called explicitly, but this is necessary as
* it will be called by the parse method implicitly when
* reconstructing the object from json string. The get*() methods
* and set*() methods are for the same purpose also.
*/
public VolumeDescriptor() {
containerMap = new HashMap<>();
containerIdOrdered = new ArrayList<>();
}
public VolumeDescriptor(String userName, String volumeName, long volumeSize,
int blockSize) {
this.containerMap = new HashMap<>();
this.userName = userName;
this.volumeName = volumeName;
this.blockSize = blockSize;
this.volumeSize = volumeSize;
this.containerIdOrdered = new LinkedList<>();
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getVolumeName() {
return volumeName;
}
public void setVolumeName(String volumeName) {
this.volumeName = volumeName;
}
public long getVolumeSize() {
return volumeSize;
}
public void setVolumeSize(long volumeSize) {
this.volumeSize = volumeSize;
}
public int getBlockSize() {
return blockSize;
}
public void setBlockSize(int blockSize) {
this.blockSize = blockSize;
}
public void setContainerIDs(ArrayList<String> containerIDs) {
containerIdOrdered.addAll(containerIDs);
}
public void addContainer(ContainerDescriptor containerDescriptor) {
containerMap.put(containerDescriptor.getContainerID(),
containerDescriptor);
}
public boolean isEmpty() {
VolumeInfo info = getInfo();
return info.getUsage() == 0;
}
public VolumeInfo getInfo() {
// TODO : need to actually go through all containers of this volume and
// ask for their utilization.
long utilization = 0;
for (Map.Entry<String, ContainerDescriptor> entry :
containerMap.entrySet()) {
utilization += entry.getValue().getUtilization();
}
return new VolumeInfo(this.userName, this.volumeName,
this.volumeSize, this.blockSize,
utilization * blockSize);
}
public String[] getContainerIDs() {
//ArrayList<Long> ids = new ArrayList(containerMap.keySet());
//return ids.toArray(new Long[ids.size()]);
return containerIdOrdered.toArray(new String[containerIdOrdered.size()]);
}
public List<String> getContainerIDsList() {
return new ArrayList<>(containerIdOrdered);
}
@Override
public String toString() {
String string = "";
string += "Username:" + userName + "\n";
string += "VolumeName:" + volumeName + "\n";
string += "VolumeSize:" + volumeSize + "\n";
string += "blockSize:" + blockSize + "\n";
string += "containerIds:" + containerIdOrdered + "\n";
string += "containerIdsWithObject:" + containerMap.keySet();
return string;
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.meta;
/**
* A wrapper class that represents the information about a volume. Used in
* communication between CBlock client and CBlock server only.
*/
public class VolumeInfo {
private final String userName;
private final String volumeName;
private final long volumeSize;
private final long blockSize;
private final long usage;
public VolumeInfo(String userName, String volumeName, long volumeSize,
long blockSize, long usage) {
this.userName = userName;
this.volumeName = volumeName;
this.volumeSize = volumeSize;
this.blockSize = blockSize;
this.usage = usage;
}
// When listing volume, the usage will not be set.
public VolumeInfo(String userName, String volumeName, long volumeSize,
long blockSize) {
this.userName = userName;
this.volumeName = volumeName;
this.volumeSize = volumeSize;
this.blockSize = blockSize;
this.usage = -1;
}
public long getVolumeSize() {
return volumeSize;
}
public long getBlockSize() {
return blockSize;
}
public long getUsage() {
return usage;
}
public String getUserName() {
return userName;
}
public String getVolumeName() {
return volumeName;
}
@Override
public String toString() {
return " userName:" + userName +
" volumeName:" + volumeName +
" volumeSize:" + volumeSize +
" blockSize:" + blockSize +
" (sizeInBlocks:" + volumeSize/blockSize + ")" +
" usageInBlocks:" + usage;
}
}

View File

@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.meta;

View File

@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock;

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.proto;
import java.io.IOException;
/**
* The protocol that CBlock client side uses to talk to server side. CBlock
* client is the point where a volume is mounted. All the actual volume IO
* operations will go through CBlock client after the volume is mounted.
*
* When users mount a volume on CBlock client, CBlock client side uses this
* protocol to send mount request to CBlock server.
*/
public interface CBlockClientServerProtocol {
MountVolumeResponse mountVolume(String userName, String volumeName)
throws IOException;
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.proto;
import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.classification.InterfaceAudience;
import java.io.IOException;
import java.util.List;
/**
* CBlock uses a separate command line tool to send volume management
* operations to CBlock server, including create/delete/info/list volumes. This
* is the protocol used by the command line tool to send these requests and get
* responses from CBlock server.
*/
@InterfaceAudience.Private
public interface CBlockServiceProtocol {
void createVolume(String userName, String volumeName,
long volumeSize, int blockSize) throws IOException;
void deleteVolume(String userName, String volumeName,
boolean force) throws IOException;
VolumeInfo infoVolume(String userName,
String volumeName) throws IOException;
List<VolumeInfo> listVolume(String userName) throws IOException;
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.proto;
import java.util.List;
/**
* The response message of mounting a volume. Including enough information
* for the client to communicate (perform IO) with the volume containers
* directly.
*/
public class MountVolumeResponse {
private final boolean isValid;
private final String userName;
private final String volumeName;
private final long volumeSize;
private final int blockSize;
private List<String> containerList;
public MountVolumeResponse(boolean isValid, String userName,
String volumeName, long volumeSize, int blockSize,
List<String> containerList) {
this.isValid = isValid;
this.userName = userName;
this.volumeName = volumeName;
this.volumeSize = volumeSize;
this.blockSize = blockSize;
this.containerList = containerList;
}
public boolean getIsValid() {
return isValid;
}
public String getUserName() {
return userName;
}
public String getVolumeName() {
return volumeName;
}
public long getVolumeSize() {
return volumeSize;
}
public int getBlockSize() {
return blockSize;
}
public List<String> getContainerList() {
return containerList;
}
}

View File

@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.proto;

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.protocolPB;
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.ProtocolInfo;
/**
* This is the protocol CBlock client uses to talk to CBlock server.
* CBlock client is the mounting point of a volume. When a user mounts a
* volume, the cBlock client running on the local node will use this protocol
* to talk to CBlock server to mount the volume.
*/
@ProtocolInfo(protocolName =
"org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocol",
protocolVersion = 1)
@InterfaceAudience.Private
public interface CBlockClientServerProtocolPB extends
CBlockClientServerProtocolProtos
.CBlockClientServerProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.protocolPB;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.cblock.proto.CBlockClientServerProtocol;
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
import org.apache.hadoop.classification.InterfaceAudience;
import java.io.IOException;
import java.util.List;
/**
* The server side implementation of cblock client to server protocol.
*/
@InterfaceAudience.Private
public class CBlockClientServerProtocolServerSideTranslatorPB implements
CBlockClientServerProtocolPB {
private final CBlockClientServerProtocol impl;
public CBlockClientServerProtocolServerSideTranslatorPB(
CBlockClientServerProtocol impl) {
this.impl = impl;
}
@Override
public CBlockClientServerProtocolProtos.MountVolumeResponseProto mountVolume(
RpcController controller,
CBlockClientServerProtocolProtos.MountVolumeRequestProto request)
throws ServiceException {
String userName = request.getUserName();
String volumeName = request.getVolumeName();
CBlockClientServerProtocolProtos.MountVolumeResponseProto.Builder
resp =
CBlockClientServerProtocolProtos
.MountVolumeResponseProto.newBuilder();
try {
MountVolumeResponse result = impl.mountVolume(userName, volumeName);
boolean isValid = result.getIsValid();
resp.setIsValid(isValid);
if (isValid) {
resp.setUserName(result.getUserName());
resp.setVolumeName(result.getVolumeName());
resp.setVolumeSize(result.getVolumeSize());
resp.setBlockSize(result.getBlockSize());
List<String> containers = result.getContainerList();
for (int i=0; i<containers.size(); i++) {
CBlockClientServerProtocolProtos.ContainerIDProto.Builder id =
CBlockClientServerProtocolProtos.ContainerIDProto.newBuilder();
id.setContainerID(containers.get(i));
id.setIndex(i);
resp.addAllContainerIDs(id.build());
}
}
} catch (IOException e) {
throw new ServiceException(e);
}
return resp.build();
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.protocolPB;
import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.ProtocolInfo;
/**
* Users use a independent command line tool to talk to CBlock server for
* volume operations (create/delete/info/list). This is the protocol used by
* the the command line tool to send these requests to CBlock server.
*/
@ProtocolInfo(protocolName =
"org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocol",
protocolVersion = 1)
@InterfaceAudience.Private
public interface CBlockServiceProtocolPB extends
CBlockServiceProtocolProtos.CBlockServiceProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.protocolPB;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.proto.CBlockServiceProtocol;
import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos;
import org.apache.hadoop.classification.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_BLOCK_SIZE_DEFAULT;
/**
* Server side implementation of the protobuf service.
*/
@InterfaceAudience.Private
public class CBlockServiceProtocolServerSideTranslatorPB
implements CBlockServiceProtocolPB {
private final CBlockServiceProtocol impl;
private static final Logger LOG =
LoggerFactory.getLogger(
CBlockServiceProtocolServerSideTranslatorPB.class);
@Override
public CBlockServiceProtocolProtos.CreateVolumeResponseProto createVolume(
RpcController controller,
CBlockServiceProtocolProtos.CreateVolumeRequestProto request)
throws ServiceException {
if (LOG.isDebugEnabled()) {
LOG.debug("createVolume called! volume size: " + request.getVolumeSize()
+ " block size: " + request.getBlockSize());
}
try {
if (request.hasBlockSize()) {
impl.createVolume(request.getUserName(), request.getVolumeName(),
request.getVolumeSize(), request.getBlockSize());
} else{
impl.createVolume(request.getUserName(), request.getVolumeName(),
request.getVolumeSize(), DFS_CBLOCK_SERVICE_BLOCK_SIZE_DEFAULT);
}
} catch (IOException e) {
throw new ServiceException(e);
}
return CBlockServiceProtocolProtos.CreateVolumeResponseProto
.newBuilder().build();
}
@Override
public CBlockServiceProtocolProtos.DeleteVolumeResponseProto deleteVolume(
RpcController controller,
CBlockServiceProtocolProtos.DeleteVolumeRequestProto request)
throws ServiceException {
if (LOG.isDebugEnabled()) {
LOG.debug("deleteVolume called! volume name: " + request.getVolumeName()
+ " force:" + request.getForce());
}
try {
if (request.hasForce()) {
impl.deleteVolume(request.getUserName(), request.getVolumeName(),
request.getForce());
} else {
impl.deleteVolume(request.getUserName(), request.getVolumeName(),
false);
}
} catch (IOException e) {
throw new ServiceException(e);
}
return CBlockServiceProtocolProtos.DeleteVolumeResponseProto
.newBuilder().build();
}
@Override
public CBlockServiceProtocolProtos.InfoVolumeResponseProto infoVolume(
RpcController controller,
CBlockServiceProtocolProtos.InfoVolumeRequestProto request
) throws ServiceException {
if (LOG.isDebugEnabled()) {
LOG.debug("infoVolume called! volume name: " + request.getVolumeName());
}
CBlockServiceProtocolProtos.InfoVolumeResponseProto.Builder resp =
CBlockServiceProtocolProtos.InfoVolumeResponseProto.newBuilder();
CBlockServiceProtocolProtos.VolumeInfoProto.Builder volumeInfoProto =
CBlockServiceProtocolProtos.VolumeInfoProto.newBuilder();
VolumeInfo volumeInfo;
try {
volumeInfo = impl.infoVolume(request.getUserName(),
request.getVolumeName());
} catch (IOException e) {
throw new ServiceException(e);
}
volumeInfoProto.setVolumeSize(volumeInfo.getVolumeSize());
volumeInfoProto.setBlockSize(volumeInfo.getBlockSize());
volumeInfoProto.setUsage(volumeInfo.getUsage());
volumeInfoProto.setUserName(volumeInfo.getUserName());
volumeInfoProto.setVolumeName(volumeInfo.getVolumeName());
resp.setVolumeInfo(volumeInfoProto);
return resp.build();
}
@Override
public CBlockServiceProtocolProtos.ListVolumeResponseProto listVolume(
RpcController controller,
CBlockServiceProtocolProtos.ListVolumeRequestProto request
) throws ServiceException {
CBlockServiceProtocolProtos.ListVolumeResponseProto.Builder resp =
CBlockServiceProtocolProtos.ListVolumeResponseProto.newBuilder();
String userName = null;
if (request.hasUserName()) {
userName = request.getUserName();
}
if (LOG.isDebugEnabled()) {
LOG.debug("list volume received for :" + userName);
}
List<VolumeInfo> volumes;
try {
volumes = impl.listVolume(userName);
} catch (IOException e) {
throw new ServiceException(e);
}
for (VolumeInfo volume : volumes) {
CBlockServiceProtocolProtos.VolumeInfoProto.Builder volumeEntryProto
= CBlockServiceProtocolProtos.VolumeInfoProto.newBuilder();
volumeEntryProto.setUserName(volume.getUserName());
volumeEntryProto.setVolumeName(volume.getVolumeName());
volumeEntryProto.setVolumeSize(volume.getVolumeSize());
volumeEntryProto.setBlockSize(volume.getBlockSize());
resp.addVolumeEntry(volumeEntryProto.build());
}
return resp.build();
}
public CBlockServiceProtocolServerSideTranslatorPB(
CBlockServiceProtocol impl) {
this.impl = impl;
}
}

View File

@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.protocolPB;

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.storage;
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.IOException;
/**
* The interface to call into underlying container layer.
*
* Written as interface to allow easy testing: implement a mock container layer
* for standalone testing of CBlock API without actually calling into remote
* containers. Actual container layer can simply re-implement this.
*
* NOTE this is temporarily needed class. When SCM containers are full-fledged,
* this interface will likely be removed.
*/
@InterfaceStability.Unstable
public interface IStorageClient {
ContainerDescriptor createContainer() throws IOException;
void deleteContainer(String containerId) throws IOException;
ContainerDescriptor getContainer(String containerId) throws IOException;
long getContainerSize() throws IOException;
}

View File

@ -0,0 +1,306 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.storage;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.cblock.exception.CBlockException;
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* This class maintains the key space of CBlock, more specifically, the
* volume to container mapping. The core data structure
* is a map from users to their volumes info, where volume info is a handler
* to a volume, containing information for IO on that volume.
*
* and a storage client responsible for talking to the SCM
*
* TODO : all the volume operations are fully serialized, which can potentially
* be optimized.
*
* TODO : if the certain operations (e.g. create) failed, the failure-handling
* logic may not be properly implemented currently.
*/
public class StorageManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(StorageManager.class);
private final IStorageClient storageClient;
/**
* We will NOT have the situation where same kv pair getting
* processed, but it is possible to have multiple kv pair being
* processed at same time.
*
* So using just ConcurrentHashMap should be sufficient
*
* Again since currently same user accessing from multiple places
* is not allowed, no need to consider concurrency of volume map
* within one user
*/
private ConcurrentHashMap<String, HashMap<String, VolumeDescriptor>>
user2VolumeMap;
// size of an underlying container.
// TODO : assuming all containers are of the same size
private long containerSizeB;
public StorageManager(IStorageClient storageClient) throws IOException {
this.storageClient = storageClient;
this.user2VolumeMap = new ConcurrentHashMap<>();
this.containerSizeB = storageClient.getContainerSize();
}
/**
* This call will put the volume into in-memory map.
*
* more specifically, make the volume discoverable on jSCSI server
* and keep it's reference in-memory for look up.
* @param userName the user name of the volume.
* @param volumeName the name of the volume,
* @param volume a {@link VolumeDescriptor} object encapsulating the
* information about the volume.
*/
private void makeVolumeReady(String userName, String volumeName,
VolumeDescriptor volume) {
HashMap<String, VolumeDescriptor> userVolumes;
if (user2VolumeMap.containsKey(userName)) {
userVolumes = user2VolumeMap.get(userName);
} else {
userVolumes = new HashMap<>();
user2VolumeMap.put(userName, userVolumes);
}
userVolumes.put(volumeName, volume);
}
/**
* Called by CBlockManager to add volumes read from persistent store into
* memory, need to contact SCM to setup the reference to the containers given
* their id.
*
* Only for failover process where container meta info is read from
* persistent store, and containers themselves are alive.
*
* TODO : Currently, this method is not being called as failover process
* is not implemented yet.
*
* @param volumeDescriptor a {@link VolumeDescriptor} object encapsulating
* the information about a volume.
* @throws IOException when adding the volume failed. e.g. volume already
* exist, or no more container available.
*/
public synchronized void addVolume(VolumeDescriptor volumeDescriptor)
throws IOException{
String userName = volumeDescriptor.getUserName();
String volumeName = volumeDescriptor.getVolumeName();
LOGGER.info("addVolume:" + userName + ":" + volumeName);
if (user2VolumeMap.containsKey(userName)
&& user2VolumeMap.get(userName).containsKey(volumeName)) {
throw new CBlockException("Volume already exist for "
+ userName + ":" + volumeName);
}
// the container ids are read from levelDB, setting up the
// container handlers here.
String[] containerIds = volumeDescriptor.getContainerIDs();
for (String containerId : containerIds) {
try {
ContainerDescriptor containerDescriptor =
storageClient.getContainer(containerId);
volumeDescriptor.addContainer(containerDescriptor);
} catch (IOException e) {
LOGGER.error("Getting container failed! Container:{} error:{}",
containerId, e);
throw e;
}
}
// now ready to put into in-memory map.
makeVolumeReady(userName, volumeName, volumeDescriptor);
}
/**
* Called by CBlock server when creating a fresh volume. The core
* logic is adding needed information into in-memory meta data.
*
* @param userName the user name of the volume.
* @param volumeName the name of the volume.
* @param volumeSize the size of the volume.
* @param blockSize the block size of the volume.
* @throws CBlockException when the volume can not be created.
*/
public synchronized void createVolume(String userName, String volumeName,
long volumeSize, int blockSize) throws CBlockException {
LOGGER.debug("createVolume:" + userName + ":" + volumeName);
if (user2VolumeMap.containsKey(userName)
&& user2VolumeMap.get(userName).containsKey(volumeName)) {
throw new CBlockException("Volume already exist for "
+ userName + ":" + volumeName);
}
if (volumeSize < blockSize) {
throw new CBlockException("Volume size smaller than block size? " +
"volume size:" + volumeSize + " block size:" + blockSize);
}
VolumeDescriptor volume;
int containerIdx = 0;
try {
volume = new VolumeDescriptor(userName, volumeName,
volumeSize, blockSize);
long allocatedSize = 0;
ArrayList<String> containerIds = new ArrayList<>();
while (allocatedSize < volumeSize) {
ContainerDescriptor container = storageClient.createContainer();
container.setContainerIndex(containerIdx);
volume.addContainer(container);
containerIds.add(container.getContainerID());
allocatedSize += containerSizeB;
containerIdx += 1;
}
volume.setContainerIDs(containerIds);
} catch (IOException e) {
throw new CBlockException("Error when creating volume:" + e.getMessage());
// TODO : delete already created containers? or re-try policy
}
makeVolumeReady(userName, volumeName, volume);
}
/**
* Called by CBlock server to delete a specific volume. Mainly
* to check whether it can be deleted, and remove it from in-memory meta
* data.
*
* @param userName the user name of the volume.
* @param volumeName the name of the volume.
* @param force if set to false, only delete volume it is empty, otherwise
* throw exception. if set to true, delete regardless.
* @throws CBlockException when the volume can not be deleted.
*/
public synchronized void deleteVolume(String userName, String volumeName,
boolean force) throws CBlockException {
if (!user2VolumeMap.containsKey(userName)
|| !user2VolumeMap.get(userName).containsKey(volumeName)) {
throw new CBlockException("Deleting non-exist volume "
+ userName + ":" + volumeName);
}
if (!force && !user2VolumeMap.get(userName).get(volumeName).isEmpty()) {
throw new CBlockException("Deleting a non-empty volume without force!");
}
VolumeDescriptor volume = user2VolumeMap.get(userName).remove(volumeName);
for (String containerID : volume.getContainerIDsList()) {
try {
storageClient.deleteContainer(containerID);
} catch (IOException e) {
LOGGER.error("Error deleting container Container:{} error:{}",
containerID, e);
throw new CBlockException(e.getMessage());
}
}
if (user2VolumeMap.get(userName).size() == 0) {
user2VolumeMap.remove(userName);
}
}
/**
* Called by CBlock server to get information of a specific volume.
*
* @param userName the user name of the volume.
* @param volumeName the name of the volume.
* @return a {@link VolumeInfo} object encapsulating the information of the
* volume.
* @throws CBlockException when the information can not be retrieved.
*/
public synchronized VolumeInfo infoVolume(String userName, String volumeName)
throws CBlockException {
if (!user2VolumeMap.containsKey(userName)
|| !user2VolumeMap.get(userName).containsKey(volumeName)) {
throw new CBlockException("Getting info for non-exist volume "
+ userName + ":" + volumeName);
}
return user2VolumeMap.get(userName).get(volumeName).getInfo();
}
/**
* Called by CBlock server to check whether the given volume can be
* mounted, i.e. whether it can be found in the meta data.
*
* return a {@link MountVolumeResponse} with isValid flag to indicate
* whether the volume can be mounted or not.
*
* @param userName the user name of the volume.
* @param volumeName the name of the volume
* @return a {@link MountVolumeResponse} object encapsulating whether the
* volume is valid, and if yes, the requried information for client to
* read/write the volume.
*/
public synchronized MountVolumeResponse isVolumeValid(
String userName, String volumeName) {
if (!user2VolumeMap.containsKey(userName)
|| !user2VolumeMap.get(userName).containsKey(volumeName)) {
// in the case of invalid volume, no need to set any value other than
// isValid flag.
return new MountVolumeResponse(false, null, null, 0, 0, null);
}
VolumeDescriptor volume = user2VolumeMap.get(userName).get(volumeName);
return new MountVolumeResponse(true, userName,
volumeName, volume.getVolumeSize(), volume.getBlockSize(),
volume.getContainerIDsList());
}
/**
* Called by CBlock manager to list all volumes.
*
* @param userName the userName whose volume to be listed, if set to null,
* all volumes will be listed.
* @return a list of {@link VolumeDescriptor} representing all volumes
* requested.
*/
public synchronized List<VolumeDescriptor> getAllVolume(String userName) {
ArrayList<VolumeDescriptor> allVolumes = new ArrayList<>();
if (userName == null) {
for (Map.Entry<String, HashMap<String, VolumeDescriptor>> entry
: user2VolumeMap.entrySet()) {
allVolumes.addAll(entry.getValue().values());
}
} else {
if (user2VolumeMap.containsKey(userName)) {
allVolumes.addAll(user2VolumeMap.get(userName).values());
}
}
return allVolumes;
}
/**
* Only for testing the behavior of create/delete volumes.
*/
@VisibleForTesting
public VolumeDescriptor getVolume(String userName, String volumeName) {
if (!user2VolumeMap.containsKey(userName)
|| !user2VolumeMap.get(userName).containsKey(volumeName)) {
return null;
}
return user2VolumeMap.get(userName).get(volumeName);
}
}

View File

@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.storage;

View File

@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock;
import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.storage.IStorageClient;
import org.apache.hadoop.cblock.util.MockStorageClient;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.HashSet;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* This class tests the basics of CBlock server. Mainly about the four
* operations on volumes: create, delete, info and list.
*/
public class TestCBlockServer {
private static CBlockManager cBlockManager;
private static CBlockConfiguration conf;
@BeforeClass
public static void setup() throws Exception {
IStorageClient storageClient = new MockStorageClient();
conf = new CBlockConfiguration();
cBlockManager = new CBlockManager(conf, storageClient);
}
/**
* Test create volume for different users.
* @throws Exception
*/
@Test
public void testCreateVolume() throws Exception {
String userName1 = "testCreateUser1";
String userName2 = "testCreateUser2";
String volumeName1 = "testVolume1";
String volumeName2 = "testVolume2";
long volumeSize = 1L*1024*1024;
int blockSize = 4096;
cBlockManager.createVolume(userName1, volumeName1, volumeSize, blockSize);
List<VolumeInfo> volumes = cBlockManager.listVolume(userName1);
assertEquals(1, volumes.size());
VolumeInfo existingVolume = volumes.get(0);
assertEquals(userName1, existingVolume.getUserName());
assertEquals(volumeName1, existingVolume.getVolumeName());
assertEquals(volumeSize, existingVolume.getVolumeSize());
assertEquals(blockSize, existingVolume.getBlockSize());
cBlockManager.createVolume(userName1, volumeName2, volumeSize, blockSize);
cBlockManager.createVolume(userName2, volumeName1, volumeSize, blockSize);
volumes = cBlockManager.listVolume(userName1);
assertEquals(2, volumes.size());
volumes = cBlockManager.listVolume(userName2);
assertEquals(1, volumes.size());
}
/**
* Test delete volume.
* @throws Exception
*/
@Test
public void testDeleteVolume() throws Exception {
String userName = "testDeleteUser";
String volumeName1 = "testVolume1";
String volumeName2 = "testVolume2";
long volumeSize = 1L*1024*1024;
int blockSize = 4096;
cBlockManager.createVolume(userName, volumeName1, volumeSize, blockSize);
cBlockManager.createVolume(userName, volumeName2, volumeSize, blockSize);
cBlockManager.deleteVolume(userName, volumeName1, true);
List<VolumeInfo> volumes = cBlockManager.listVolume(userName);
assertEquals(1, volumes.size());
VolumeInfo existingVolume = volumes.get(0);
assertEquals(userName, existingVolume.getUserName());
assertEquals(volumeName2, existingVolume.getVolumeName());
assertEquals(volumeSize, existingVolume.getVolumeSize());
assertEquals(blockSize, existingVolume.getBlockSize());
}
/**
* Test info volume.
*
* TODO : usage field is not being tested (as it is not implemented yet)
* @throws Exception
*/
@Test
public void testInfoVolume() throws Exception {
String userName = "testInfoUser";
String volumeName = "testVolume";
long volumeSize = 1L*1024*1024;
int blockSize = 4096;
cBlockManager.createVolume(userName, volumeName, volumeSize, blockSize);
VolumeInfo info = cBlockManager.infoVolume(userName, volumeName);
assertEquals(userName, info.getUserName());
assertEquals(volumeName, info.getVolumeName());
assertEquals(volumeSize, info.getVolumeSize());
assertEquals(blockSize, info.getBlockSize());
}
/**
* Test listing a number of volumes.
* @throws Exception
*/
@Test
public void testListVolume() throws Exception {
String userName = "testListUser";
String volumeName = "testVolume";
long volumeSize = 1L*1024*1024;
int blockSize = 4096;
int volumeNum = 100;
for (int i = 0; i<volumeNum; i++) {
cBlockManager.createVolume(userName, volumeName + i,
volumeSize, blockSize);
}
List<VolumeInfo> volumes = cBlockManager.listVolume(userName);
assertEquals(volumeNum, volumes.size());
HashSet<String> volumeIds = new HashSet<>();
for (int i = 0; i<volumeNum; i++) {
VolumeInfo volumeInfo = volumes.get(i);
assertEquals(userName, volumeInfo.getUserName());
assertFalse(volumeIds.contains(volumeName + i));
volumeIds.add(volumeName + i);
assertEquals(volumeSize, volumeInfo.getVolumeSize());
assertEquals(blockSize, volumeInfo.getBlockSize());
}
for (int i = 0; i<volumeNum; i++) {
assertTrue(volumeIds.contains(volumeName + i));
}
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.util;
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
import java.util.HashMap;
/**
* NOTE : This class is only for testing purpose.
*
* Mock an underlying container storage layer, expose to CBlock to perform
* IO. While in this mock implementation, a container is nothing more than
* a in memory hashmap.
*
* This is to allow volume creation call and perform standalone tests.
*/
public final class ContainerLookUpService {
private static HashMap<String, ContainerDescriptor>
containers = new HashMap<>();
/**
* Return an *existing* container with given Id.
*
* TODO : for testing purpose, return a new container if the given Id
* is not found
*
* found
* @param containerID
* @return
*/
public static ContainerDescriptor lookUp(String containerID) {
if (!containers.containsKey(containerID)) {
System.err.println("A container id never seen, return a new one " +
"for testing purpose:" + containerID);
containers.put(containerID, new ContainerDescriptor(containerID));
}
return containers.get(containerID);
}
public static void addContainer(String containerID) {
containers.put(containerID, new ContainerDescriptor(containerID));
}
private ContainerLookUpService() {
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.util;
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
import org.apache.hadoop.cblock.storage.IStorageClient;
import java.io.IOException;
/**
* This class is the one that directly talks to SCM server.
*
* NOTE : this is only a mock class, only to allow testing volume
* creation without actually creating containers. In real world, need to be
* replaced with actual container look up calls.
*
*/
public class MockStorageClient implements IStorageClient {
private static long currentContainerId = -1;
/**
* Ask SCM to get a exclusive container.
*
* @return A container descriptor object to locate this container
* @throws Exception
*/
public ContainerDescriptor createContainer() throws IOException {
currentContainerId += 1;
ContainerLookUpService.addContainer(Long.toString(currentContainerId));
return ContainerLookUpService.lookUp(Long.toString(currentContainerId));
}
/**
* As this is only a testing class, with all "container" maintained in
* memory, no need to really delete anything for now.
* @param containerId
* @throws IOException
*/
@Override
public void deleteContainer(String containerId) throws IOException {
}
/**
* Return reference to an *existing* container with given ID.
*
* @param containerId
* @return
* @throws IOException
*/
public ContainerDescriptor getContainer(String containerId)
throws IOException {
return ContainerLookUpService.lookUp(containerId);
}
@Override
public long getContainerSize() throws IOException {
// just return a constant value for now
return 5L*1024*1024*1024; // 5GB
}
}