diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java new file mode 100644 index 00000000000..3d9b11b30b4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock; + +/** + * This class contains constants for configuration keys used in CBlock. + */ +public final class CBlockConfigKeys { + public static final String DFS_CBLOCK_ENABLED_KEY = + "dfs.cblock.enabled"; + public static final String DFS_CBLOCK_SERVICERPC_ADDRESS_KEY = + "dfs.cblock.servicerpc-address"; + public static final int DFS_CBLOCK_RPCSERVICE_PORT_DEFAULT = + 9810; + public static final String DFS_CBLOCK_RPCSERVICE_IP_DEFAULT = + "0.0.0.0"; + public static final String DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT = + DFS_CBLOCK_RPCSERVICE_IP_DEFAULT + + ":" + DFS_CBLOCK_RPCSERVICE_PORT_DEFAULT; + + public static final String DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY = + "dfs.cblock.jscsi-address"; + + //The port on CBlockManager node for jSCSI to ask + public static final int DFS_CBLOCK_JSCSI_PORT_DEFAULT = + 9811; + public static final String DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT = + DFS_CBLOCK_RPCSERVICE_IP_DEFAULT + + ":" + DFS_CBLOCK_JSCSI_PORT_DEFAULT; + + + public static final String DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY = + "dfs.cblock.service.rpc-bind-host"; + public static final String DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY = + "dfs.cblock.jscsi.rpc-bind-host"; + + // default block size is 4KB + public static final int DFS_CBLOCK_SERVICE_BLOCK_SIZE_DEFAULT = + 4096; + + public static final String DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY = + "dfs.storage.service.handler.count"; + public static final int DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT = 10; + + private CBlockConfigKeys() { + + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfiguration.java new file mode 100644 index 00000000000..d44f4246301 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfiguration.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock; + +import org.apache.hadoop.conf.Configuration; + +/** + * This class specifies the CBlock configuration resources. + */ +public class CBlockConfiguration extends Configuration { + static { + // adds the default resources + Configuration.addDefaultResource("hdfs-default.xml"); + Configuration.addDefaultResource("hdfs-site.xml"); + Configuration.addDefaultResource("cblock-default.xml"); + Configuration.addDefaultResource("cblock-site.xml"); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java new file mode 100644 index 00000000000..28b33500e01 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock; + +import com.google.protobuf.BlockingService; +import org.apache.hadoop.cblock.meta.VolumeDescriptor; +import org.apache.hadoop.cblock.meta.VolumeInfo; +import org.apache.hadoop.cblock.proto.CBlockClientServerProtocol; +import org.apache.hadoop.cblock.proto.CBlockServiceProtocol; +import org.apache.hadoop.cblock.proto.MountVolumeResponse; +import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos; +import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos; +import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB; +import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolServerSideTranslatorPB; +import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB; +import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolServerSideTranslatorPB; +import org.apache.hadoop.cblock.storage.IStorageClient; +import org.apache.hadoop.cblock.storage.StorageManager; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY; + +/** + * The main entry point of CBlock operations, ALL the CBlock operations + * will go through this class. But NOTE that: + * + * volume operations (create/ + * delete/info) are: + * client -> CBlockManager -> StorageManager -> CBlock client + * + * IO operations (put/get block) are; + * client -> CBlock client -> container + * + */ +public class CBlockManager implements CBlockServiceProtocol, + CBlockClientServerProtocol { + + private static final Logger LOG = + LoggerFactory.getLogger(CBlockManager.class); + + private final RPC.Server cblockService; + private final RPC.Server cblockServer; + + private final StorageManager storageManager; + + public CBlockManager(CBlockConfiguration conf, IStorageClient storageClient + ) throws IOException { + storageManager = new StorageManager(storageClient); + + RPC.setProtocolEngine(conf, CBlockServiceProtocolPB.class, + ProtobufRpcEngine.class); + RPC.setProtocolEngine(conf, CBlockClientServerProtocolPB.class, + ProtobufRpcEngine.class); + // start service for client command-to-cblock server service + InetSocketAddress serviceRpcAddr = NetUtils.createSocketAddr( + conf.getTrimmed(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, + DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT), -1, + DFS_CBLOCK_SERVICERPC_ADDRESS_KEY); + BlockingService cblockProto = + CBlockServiceProtocolProtos + .CBlockServiceProtocolService + .newReflectiveBlockingService( + new CBlockServiceProtocolServerSideTranslatorPB(this) + ); + cblockService = startRpcServer(conf, CBlockServiceProtocolPB.class, + cblockProto, serviceRpcAddr, + DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY, + DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY, + DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT); + + LOG.info("CBlock manager listening for client commands on: {}", + serviceRpcAddr); + // now start service for cblock client-to-cblock server communication + InetSocketAddress serverRpcAddr = NetUtils.createSocketAddr( + conf.get(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, + DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT), -1, + DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY); + BlockingService serverProto = + CBlockClientServerProtocolProtos + .CBlockClientServerProtocolService + .newReflectiveBlockingService( + new CBlockClientServerProtocolServerSideTranslatorPB(this) + ); + cblockServer = startRpcServer( + conf, CBlockClientServerProtocolPB.class, + serverProto, serverRpcAddr, + DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY, + DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY, + DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT); + LOG.info("CBlock server listening for client commands on: {}", + serverRpcAddr); + } + + public void start() { + cblockService.start(); + cblockServer.start(); + LOG.info("CBlock manager started!"); + } + + public void stop() { + cblockService.stop(); + cblockServer.stop(); + } + + public void join() { + try { + cblockService.join(); + cblockServer.join(); + } catch (InterruptedException e) { + LOG.error("Interrupted during join"); + Thread.currentThread().interrupt(); + } + } + + /** + * Starts an RPC server, if configured. + * + * @param conf configuration + * @param protocol RPC protocol provided by RPC server + * @param instance RPC protocol implementation instance + * @param addr configured address of RPC server + * @param bindHostKey configuration key for setting explicit bind host. If + * the property is not configured, then the bind host is taken from addr. + * @param handlerCountKey configuration key for RPC server handler count + * @param handlerCountDefault default RPC server handler count if unconfigured + * @return RPC server, or null if addr is null + * @throws IOException if there is an I/O error while creating RPC server + */ + private static RPC.Server startRpcServer(CBlockConfiguration conf, + Class protocol, BlockingService instance, + InetSocketAddress addr, String bindHostKey, + String handlerCountKey, int handlerCountDefault) throws IOException { + if (addr == null) { + return null; + } + String bindHost = conf.getTrimmed(bindHostKey); + if (bindHost == null || bindHost.isEmpty()) { + bindHost = addr.getHostName(); + } + int numHandlers = conf.getInt(handlerCountKey, handlerCountDefault); + RPC.Server rpcServer = new RPC.Builder(conf) + .setProtocol(protocol) + .setInstance(instance) + .setBindAddress(bindHost) + .setPort(addr.getPort()) + .setNumHandlers(numHandlers) + .setVerbose(false) + .setSecretManager(null) + .build(); + return rpcServer; + } + + @Override + public MountVolumeResponse mountVolume( + String userName, String volumeName) throws IOException { + return storageManager.isVolumeValid(userName, volumeName); + } + + @Override + public void createVolume(String userName, String volumeName, + long volumeSize, int blockSize) throws IOException { + LOG.info("Create volume received: userName: {} volumeName: {} " + + "volumeSize: {} blockSize: {}", userName, volumeName, + volumeSize, blockSize); + // It is important to create in-memory representation of the + // volume first, then writes to persistent storage (levelDB) + // such that it is guaranteed that when there is an entry in + // levelDB, the volume is allocated. (more like a UNDO log fashion) + // TODO: what if creation failed? we allocated containers but lost + // the reference to the volume and all it's containers. How to release + // the containers? + storageManager.createVolume(userName, volumeName, volumeSize, blockSize); + VolumeDescriptor volume = storageManager.getVolume(userName, volumeName); + if (volume == null) { + throw new IOException("Volume creation failed!"); + } + } + + @Override + public void deleteVolume(String userName, + String volumeName, boolean force) throws IOException { + LOG.info("Delete volume received: volume:" + volumeName + + " force?:" + force); + storageManager.deleteVolume(userName, volumeName, force); + } + + @Override + public VolumeInfo infoVolume(String userName, String volumeName + ) throws IOException { + LOG.info("Info volume received: volume: {}", volumeName); + return storageManager.infoVolume(userName, volumeName); + } + + @Override + public List listVolume(String userName) throws IOException { + ArrayList response = new ArrayList<>(); + List allVolumes = + storageManager.getAllVolume(userName); + for (VolumeDescriptor volume : allVolumes) { + VolumeInfo info = + new VolumeInfo(volume.getUserName(), volume.getVolumeName(), + volume.getVolumeSize(), volume.getBlockSize()); + response.add(info); + } + return response; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java new file mode 100644 index 00000000000..8f6b82b75a5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.exception; + +import java.io.IOException; + +/** + * The exception class used in CBlock. + */ +public class CBlockException extends IOException { + public CBlockException(String message) { + super(message); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/exception/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/exception/package-info.java new file mode 100644 index 00000000000..268b8cb7f3a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/exception/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.exception; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/ContainerDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/ContainerDescriptor.java new file mode 100644 index 00000000000..298b7bcb3e3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/ContainerDescriptor.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.meta; + +/** + * + * The internal representation of a container maintained by CBlock server. + * Include enough information to exactly identify a container for read/write + * operation. + * + * NOTE that this class is work-in-progress. Depends on HDFS-7240 container + * implementation. Currently only to allow testing. + */ +public class ContainerDescriptor { + private final String containerID; + // the index of this container with in a volume + // on creation, there is no way to know the index of the container + // as it is a volume specific information + private int containerIndex; + + public ContainerDescriptor(String containerID) { + this.containerID = containerID; + } + + public void setContainerIndex(int idx) { + this.containerIndex = idx; + } + + public String getContainerID() { + return containerID; + } + + public int getContainerIndex() { + return containerIndex; + } + + public long getUtilization() { + return 0; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java new file mode 100644 index 00000000000..4c791b3cfb1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.meta; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * The internal representation maintained by CBlock server as the info for + * a volume. Contains the list of containers belonging to this volume. + * + * Many methods of this class is made such that the volume information ( + * including container list) can be easily transformed into a Json string + * that can be stored/parsed from a persistent store for cblock server + * persistence. + * + * This class is still work-in-progress. + */ +public class VolumeDescriptor { + // The main data structure is the container location map + // other thing are mainly just information + + // since only one operation at a time is allowed, no + // need to consider concurrency control here + + // key is container id + + private static final Logger LOG = + LoggerFactory.getLogger(VolumeDescriptor.class); + + private HashMap containerMap; + private String userName; + private int blockSize; + private long volumeSize; + private String volumeName; + // this is essentially the ordered keys of containerMap + // which is kind of redundant information. But since we + // are likely to access it frequently based on ordering. + // keeping this copy to avoid having to sort the key every + // time + private List containerIdOrdered; + + /** + * This is not being called explicitly, but this is necessary as + * it will be called by the parse method implicitly when + * reconstructing the object from json string. The get*() methods + * and set*() methods are for the same purpose also. + */ + public VolumeDescriptor() { + containerMap = new HashMap<>(); + containerIdOrdered = new ArrayList<>(); + } + + public VolumeDescriptor(String userName, String volumeName, long volumeSize, + int blockSize) { + this.containerMap = new HashMap<>(); + this.userName = userName; + this.volumeName = volumeName; + this.blockSize = blockSize; + this.volumeSize = volumeSize; + this.containerIdOrdered = new LinkedList<>(); + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getVolumeName() { + return volumeName; + } + + public void setVolumeName(String volumeName) { + this.volumeName = volumeName; + } + + public long getVolumeSize() { + return volumeSize; + } + + public void setVolumeSize(long volumeSize) { + this.volumeSize = volumeSize; + } + + public int getBlockSize() { + return blockSize; + } + + public void setBlockSize(int blockSize) { + this.blockSize = blockSize; + } + + public void setContainerIDs(ArrayList containerIDs) { + containerIdOrdered.addAll(containerIDs); + } + + public void addContainer(ContainerDescriptor containerDescriptor) { + containerMap.put(containerDescriptor.getContainerID(), + containerDescriptor); + } + + public boolean isEmpty() { + VolumeInfo info = getInfo(); + return info.getUsage() == 0; + } + + public VolumeInfo getInfo() { + // TODO : need to actually go through all containers of this volume and + // ask for their utilization. + long utilization = 0; + for (Map.Entry entry : + containerMap.entrySet()) { + utilization += entry.getValue().getUtilization(); + } + return new VolumeInfo(this.userName, this.volumeName, + this.volumeSize, this.blockSize, + utilization * blockSize); + } + + public String[] getContainerIDs() { + //ArrayList ids = new ArrayList(containerMap.keySet()); + //return ids.toArray(new Long[ids.size()]); + return containerIdOrdered.toArray(new String[containerIdOrdered.size()]); + } + + public List getContainerIDsList() { + return new ArrayList<>(containerIdOrdered); + } + + @Override + public String toString() { + String string = ""; + string += "Username:" + userName + "\n"; + string += "VolumeName:" + volumeName + "\n"; + string += "VolumeSize:" + volumeSize + "\n"; + string += "blockSize:" + blockSize + "\n"; + string += "containerIds:" + containerIdOrdered + "\n"; + string += "containerIdsWithObject:" + containerMap.keySet(); + return string; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeInfo.java new file mode 100644 index 00000000000..7f50c41d0c9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeInfo.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.meta; + +/** + * A wrapper class that represents the information about a volume. Used in + * communication between CBlock client and CBlock server only. + */ +public class VolumeInfo { + private final String userName; + private final String volumeName; + private final long volumeSize; + private final long blockSize; + private final long usage; + + public VolumeInfo(String userName, String volumeName, long volumeSize, + long blockSize, long usage) { + this.userName = userName; + this.volumeName = volumeName; + this.volumeSize = volumeSize; + this.blockSize = blockSize; + this.usage = usage; + } + + // When listing volume, the usage will not be set. + public VolumeInfo(String userName, String volumeName, long volumeSize, + long blockSize) { + this.userName = userName; + this.volumeName = volumeName; + this.volumeSize = volumeSize; + this.blockSize = blockSize; + this.usage = -1; + } + + public long getVolumeSize() { + return volumeSize; + } + + public long getBlockSize() { + return blockSize; + } + + public long getUsage() { + return usage; + } + + public String getUserName() { + return userName; + } + + public String getVolumeName() { + return volumeName; + } + + @Override + public String toString() { + return " userName:" + userName + + " volumeName:" + volumeName + + " volumeSize:" + volumeSize + + " blockSize:" + blockSize + + " (sizeInBlocks:" + volumeSize/blockSize + ")" + + " usageInBlocks:" + usage; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/package-info.java new file mode 100644 index 00000000000..a331d7a831a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.meta; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/package-info.java new file mode 100644 index 00000000000..a7d5d8b4e60 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/proto/CBlockClientServerProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/proto/CBlockClientServerProtocol.java new file mode 100644 index 00000000000..4f15b0ab9b5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/proto/CBlockClientServerProtocol.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.proto; + +import java.io.IOException; + +/** + * The protocol that CBlock client side uses to talk to server side. CBlock + * client is the point where a volume is mounted. All the actual volume IO + * operations will go through CBlock client after the volume is mounted. + * + * When users mount a volume on CBlock client, CBlock client side uses this + * protocol to send mount request to CBlock server. + */ +public interface CBlockClientServerProtocol { + MountVolumeResponse mountVolume(String userName, String volumeName) + throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/proto/CBlockServiceProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/proto/CBlockServiceProtocol.java new file mode 100644 index 00000000000..bf00bc075c9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/proto/CBlockServiceProtocol.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.proto; + +import org.apache.hadoop.cblock.meta.VolumeInfo; +import org.apache.hadoop.classification.InterfaceAudience; + +import java.io.IOException; +import java.util.List; + +/** + * CBlock uses a separate command line tool to send volume management + * operations to CBlock server, including create/delete/info/list volumes. This + * is the protocol used by the command line tool to send these requests and get + * responses from CBlock server. + */ +@InterfaceAudience.Private +public interface CBlockServiceProtocol { + + void createVolume(String userName, String volumeName, + long volumeSize, int blockSize) throws IOException; + + void deleteVolume(String userName, String volumeName, + boolean force) throws IOException; + + VolumeInfo infoVolume(String userName, + String volumeName) throws IOException; + + List listVolume(String userName) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/proto/MountVolumeResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/proto/MountVolumeResponse.java new file mode 100644 index 00000000000..3efc071fb55 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/proto/MountVolumeResponse.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.proto; + +import java.util.List; + +/** + * The response message of mounting a volume. Including enough information + * for the client to communicate (perform IO) with the volume containers + * directly. + */ +public class MountVolumeResponse { + private final boolean isValid; + private final String userName; + private final String volumeName; + private final long volumeSize; + private final int blockSize; + private List containerList; + + public MountVolumeResponse(boolean isValid, String userName, + String volumeName, long volumeSize, int blockSize, + List containerList) { + this.isValid = isValid; + this.userName = userName; + this.volumeName = volumeName; + this.volumeSize = volumeSize; + this.blockSize = blockSize; + this.containerList = containerList; + } + + public boolean getIsValid() { + return isValid; + } + + public String getUserName() { + return userName; + } + + public String getVolumeName() { + return volumeName; + } + + public long getVolumeSize() { + return volumeSize; + } + + public int getBlockSize() { + return blockSize; + } + + public List getContainerList() { + return containerList; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/proto/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/proto/package-info.java new file mode 100644 index 00000000000..33438ec58a7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/proto/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.proto; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockClientServerProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockClientServerProtocolPB.java new file mode 100644 index 00000000000..99f31107985 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockClientServerProtocolPB.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.protocolPB; + +import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.ipc.ProtocolInfo; + +/** + * This is the protocol CBlock client uses to talk to CBlock server. + * CBlock client is the mounting point of a volume. When a user mounts a + * volume, the cBlock client running on the local node will use this protocol + * to talk to CBlock server to mount the volume. + */ +@ProtocolInfo(protocolName = + "org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocol", + protocolVersion = 1) +@InterfaceAudience.Private +public interface CBlockClientServerProtocolPB extends + CBlockClientServerProtocolProtos + .CBlockClientServerProtocolService.BlockingInterface { +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockClientServerProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockClientServerProtocolServerSideTranslatorPB.java new file mode 100644 index 00000000000..7295473b185 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockClientServerProtocolServerSideTranslatorPB.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.protocolPB; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.cblock.proto.CBlockClientServerProtocol; +import org.apache.hadoop.cblock.proto.MountVolumeResponse; +import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos; +import org.apache.hadoop.classification.InterfaceAudience; + +import java.io.IOException; +import java.util.List; + +/** + * The server side implementation of cblock client to server protocol. + */ +@InterfaceAudience.Private +public class CBlockClientServerProtocolServerSideTranslatorPB implements + CBlockClientServerProtocolPB { + + private final CBlockClientServerProtocol impl; + + public CBlockClientServerProtocolServerSideTranslatorPB( + CBlockClientServerProtocol impl) { + this.impl = impl; + } + + @Override + public CBlockClientServerProtocolProtos.MountVolumeResponseProto mountVolume( + RpcController controller, + CBlockClientServerProtocolProtos.MountVolumeRequestProto request) + throws ServiceException { + String userName = request.getUserName(); + String volumeName = request.getVolumeName(); + CBlockClientServerProtocolProtos.MountVolumeResponseProto.Builder + resp = + CBlockClientServerProtocolProtos + .MountVolumeResponseProto.newBuilder(); + try { + MountVolumeResponse result = impl.mountVolume(userName, volumeName); + boolean isValid = result.getIsValid(); + resp.setIsValid(isValid); + if (isValid) { + resp.setUserName(result.getUserName()); + resp.setVolumeName(result.getVolumeName()); + resp.setVolumeSize(result.getVolumeSize()); + resp.setBlockSize(result.getBlockSize()); + List containers = result.getContainerList(); + for (int i=0; i volumes; + try { + volumes = impl.listVolume(userName); + } catch (IOException e) { + throw new ServiceException(e); + } + for (VolumeInfo volume : volumes) { + CBlockServiceProtocolProtos.VolumeInfoProto.Builder volumeEntryProto + = CBlockServiceProtocolProtos.VolumeInfoProto.newBuilder(); + volumeEntryProto.setUserName(volume.getUserName()); + volumeEntryProto.setVolumeName(volume.getVolumeName()); + volumeEntryProto.setVolumeSize(volume.getVolumeSize()); + volumeEntryProto.setBlockSize(volume.getBlockSize()); + resp.addVolumeEntry(volumeEntryProto.build()); + } + return resp.build(); + } + + public CBlockServiceProtocolServerSideTranslatorPB( + CBlockServiceProtocol impl) { + this.impl = impl; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/protocolPB/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/protocolPB/package-info.java new file mode 100644 index 00000000000..5e03a92da26 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/protocolPB/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.protocolPB; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/IStorageClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/IStorageClient.java new file mode 100644 index 00000000000..1a58cbddc25 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/IStorageClient.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.storage; + +import org.apache.hadoop.cblock.meta.ContainerDescriptor; +import org.apache.hadoop.classification.InterfaceStability; + +import java.io.IOException; + +/** + * The interface to call into underlying container layer. + * + * Written as interface to allow easy testing: implement a mock container layer + * for standalone testing of CBlock API without actually calling into remote + * containers. Actual container layer can simply re-implement this. + * + * NOTE this is temporarily needed class. When SCM containers are full-fledged, + * this interface will likely be removed. + */ +@InterfaceStability.Unstable +public interface IStorageClient { + ContainerDescriptor createContainer() throws IOException; + + void deleteContainer(String containerId) throws IOException; + + ContainerDescriptor getContainer(String containerId) throws IOException; + + long getContainerSize() throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java new file mode 100644 index 00000000000..044dd248286 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.storage; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.cblock.exception.CBlockException; +import org.apache.hadoop.cblock.meta.ContainerDescriptor; +import org.apache.hadoop.cblock.meta.VolumeDescriptor; +import org.apache.hadoop.cblock.meta.VolumeInfo; +import org.apache.hadoop.cblock.proto.MountVolumeResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class maintains the key space of CBlock, more specifically, the + * volume to container mapping. The core data structure + * is a map from users to their volumes info, where volume info is a handler + * to a volume, containing information for IO on that volume. + * + * and a storage client responsible for talking to the SCM + * + * TODO : all the volume operations are fully serialized, which can potentially + * be optimized. + * + * TODO : if the certain operations (e.g. create) failed, the failure-handling + * logic may not be properly implemented currently. + */ +public class StorageManager { + private static final Logger LOGGER = + LoggerFactory.getLogger(StorageManager.class); + private final IStorageClient storageClient; + /** + * We will NOT have the situation where same kv pair getting + * processed, but it is possible to have multiple kv pair being + * processed at same time. + * + * So using just ConcurrentHashMap should be sufficient + * + * Again since currently same user accessing from multiple places + * is not allowed, no need to consider concurrency of volume map + * within one user + */ + private ConcurrentHashMap> + user2VolumeMap; + // size of an underlying container. + // TODO : assuming all containers are of the same size + private long containerSizeB; + + public StorageManager(IStorageClient storageClient) throws IOException { + this.storageClient = storageClient; + this.user2VolumeMap = new ConcurrentHashMap<>(); + this.containerSizeB = storageClient.getContainerSize(); + } + + /** + * This call will put the volume into in-memory map. + * + * more specifically, make the volume discoverable on jSCSI server + * and keep it's reference in-memory for look up. + * @param userName the user name of the volume. + * @param volumeName the name of the volume, + * @param volume a {@link VolumeDescriptor} object encapsulating the + * information about the volume. + */ + private void makeVolumeReady(String userName, String volumeName, + VolumeDescriptor volume) { + HashMap userVolumes; + if (user2VolumeMap.containsKey(userName)) { + userVolumes = user2VolumeMap.get(userName); + } else { + userVolumes = new HashMap<>(); + user2VolumeMap.put(userName, userVolumes); + } + userVolumes.put(volumeName, volume); + } + + /** + * Called by CBlockManager to add volumes read from persistent store into + * memory, need to contact SCM to setup the reference to the containers given + * their id. + * + * Only for failover process where container meta info is read from + * persistent store, and containers themselves are alive. + * + * TODO : Currently, this method is not being called as failover process + * is not implemented yet. + * + * @param volumeDescriptor a {@link VolumeDescriptor} object encapsulating + * the information about a volume. + * @throws IOException when adding the volume failed. e.g. volume already + * exist, or no more container available. + */ + public synchronized void addVolume(VolumeDescriptor volumeDescriptor) + throws IOException{ + String userName = volumeDescriptor.getUserName(); + String volumeName = volumeDescriptor.getVolumeName(); + LOGGER.info("addVolume:" + userName + ":" + volumeName); + if (user2VolumeMap.containsKey(userName) + && user2VolumeMap.get(userName).containsKey(volumeName)) { + throw new CBlockException("Volume already exist for " + + userName + ":" + volumeName); + } + // the container ids are read from levelDB, setting up the + // container handlers here. + String[] containerIds = volumeDescriptor.getContainerIDs(); + + for (String containerId : containerIds) { + try { + ContainerDescriptor containerDescriptor = + storageClient.getContainer(containerId); + volumeDescriptor.addContainer(containerDescriptor); + } catch (IOException e) { + LOGGER.error("Getting container failed! Container:{} error:{}", + containerId, e); + throw e; + } + } + // now ready to put into in-memory map. + makeVolumeReady(userName, volumeName, volumeDescriptor); + } + + + /** + * Called by CBlock server when creating a fresh volume. The core + * logic is adding needed information into in-memory meta data. + * + * @param userName the user name of the volume. + * @param volumeName the name of the volume. + * @param volumeSize the size of the volume. + * @param blockSize the block size of the volume. + * @throws CBlockException when the volume can not be created. + */ + public synchronized void createVolume(String userName, String volumeName, + long volumeSize, int blockSize) throws CBlockException { + LOGGER.debug("createVolume:" + userName + ":" + volumeName); + if (user2VolumeMap.containsKey(userName) + && user2VolumeMap.get(userName).containsKey(volumeName)) { + throw new CBlockException("Volume already exist for " + + userName + ":" + volumeName); + } + if (volumeSize < blockSize) { + throw new CBlockException("Volume size smaller than block size? " + + "volume size:" + volumeSize + " block size:" + blockSize); + } + VolumeDescriptor volume; + int containerIdx = 0; + try { + volume = new VolumeDescriptor(userName, volumeName, + volumeSize, blockSize); + long allocatedSize = 0; + ArrayList containerIds = new ArrayList<>(); + while (allocatedSize < volumeSize) { + ContainerDescriptor container = storageClient.createContainer(); + container.setContainerIndex(containerIdx); + volume.addContainer(container); + containerIds.add(container.getContainerID()); + allocatedSize += containerSizeB; + containerIdx += 1; + } + volume.setContainerIDs(containerIds); + } catch (IOException e) { + throw new CBlockException("Error when creating volume:" + e.getMessage()); + // TODO : delete already created containers? or re-try policy + } + makeVolumeReady(userName, volumeName, volume); + } + + /** + * Called by CBlock server to delete a specific volume. Mainly + * to check whether it can be deleted, and remove it from in-memory meta + * data. + * + * @param userName the user name of the volume. + * @param volumeName the name of the volume. + * @param force if set to false, only delete volume it is empty, otherwise + * throw exception. if set to true, delete regardless. + * @throws CBlockException when the volume can not be deleted. + */ + public synchronized void deleteVolume(String userName, String volumeName, + boolean force) throws CBlockException { + if (!user2VolumeMap.containsKey(userName) + || !user2VolumeMap.get(userName).containsKey(volumeName)) { + throw new CBlockException("Deleting non-exist volume " + + userName + ":" + volumeName); + } + if (!force && !user2VolumeMap.get(userName).get(volumeName).isEmpty()) { + throw new CBlockException("Deleting a non-empty volume without force!"); + } + VolumeDescriptor volume = user2VolumeMap.get(userName).remove(volumeName); + for (String containerID : volume.getContainerIDsList()) { + try { + storageClient.deleteContainer(containerID); + } catch (IOException e) { + LOGGER.error("Error deleting container Container:{} error:{}", + containerID, e); + throw new CBlockException(e.getMessage()); + } + } + if (user2VolumeMap.get(userName).size() == 0) { + user2VolumeMap.remove(userName); + } + } + + /** + * Called by CBlock server to get information of a specific volume. + * + * @param userName the user name of the volume. + * @param volumeName the name of the volume. + * @return a {@link VolumeInfo} object encapsulating the information of the + * volume. + * @throws CBlockException when the information can not be retrieved. + */ + public synchronized VolumeInfo infoVolume(String userName, String volumeName) + throws CBlockException { + if (!user2VolumeMap.containsKey(userName) + || !user2VolumeMap.get(userName).containsKey(volumeName)) { + throw new CBlockException("Getting info for non-exist volume " + + userName + ":" + volumeName); + } + return user2VolumeMap.get(userName).get(volumeName).getInfo(); + } + + /** + * Called by CBlock server to check whether the given volume can be + * mounted, i.e. whether it can be found in the meta data. + * + * return a {@link MountVolumeResponse} with isValid flag to indicate + * whether the volume can be mounted or not. + * + * @param userName the user name of the volume. + * @param volumeName the name of the volume + * @return a {@link MountVolumeResponse} object encapsulating whether the + * volume is valid, and if yes, the requried information for client to + * read/write the volume. + */ + public synchronized MountVolumeResponse isVolumeValid( + String userName, String volumeName) { + if (!user2VolumeMap.containsKey(userName) + || !user2VolumeMap.get(userName).containsKey(volumeName)) { + // in the case of invalid volume, no need to set any value other than + // isValid flag. + return new MountVolumeResponse(false, null, null, 0, 0, null); + } + VolumeDescriptor volume = user2VolumeMap.get(userName).get(volumeName); + return new MountVolumeResponse(true, userName, + volumeName, volume.getVolumeSize(), volume.getBlockSize(), + volume.getContainerIDsList()); + } + + /** + * Called by CBlock manager to list all volumes. + * + * @param userName the userName whose volume to be listed, if set to null, + * all volumes will be listed. + * @return a list of {@link VolumeDescriptor} representing all volumes + * requested. + */ + public synchronized List getAllVolume(String userName) { + ArrayList allVolumes = new ArrayList<>(); + if (userName == null) { + for (Map.Entry> entry + : user2VolumeMap.entrySet()) { + allVolumes.addAll(entry.getValue().values()); + } + } else { + if (user2VolumeMap.containsKey(userName)) { + allVolumes.addAll(user2VolumeMap.get(userName).values()); + } + } + return allVolumes; + } + + /** + * Only for testing the behavior of create/delete volumes. + */ + @VisibleForTesting + public VolumeDescriptor getVolume(String userName, String volumeName) { + if (!user2VolumeMap.containsKey(userName) + || !user2VolumeMap.get(userName).containsKey(volumeName)) { + return null; + } + return user2VolumeMap.get(userName).get(volumeName); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/package-info.java new file mode 100644 index 00000000000..4426e6d8ad3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.storage; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java new file mode 100644 index 00000000000..dfaa3d23807 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock; + +import org.apache.hadoop.cblock.meta.VolumeInfo; +import org.apache.hadoop.cblock.storage.IStorageClient; +import org.apache.hadoop.cblock.util.MockStorageClient; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.HashSet; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * This class tests the basics of CBlock server. Mainly about the four + * operations on volumes: create, delete, info and list. + */ +public class TestCBlockServer { + private static CBlockManager cBlockManager; + private static CBlockConfiguration conf; + + @BeforeClass + public static void setup() throws Exception { + IStorageClient storageClient = new MockStorageClient(); + conf = new CBlockConfiguration(); + cBlockManager = new CBlockManager(conf, storageClient); + } + + /** + * Test create volume for different users. + * @throws Exception + */ + @Test + public void testCreateVolume() throws Exception { + String userName1 = "testCreateUser1"; + String userName2 = "testCreateUser2"; + String volumeName1 = "testVolume1"; + String volumeName2 = "testVolume2"; + long volumeSize = 1L*1024*1024; + int blockSize = 4096; + cBlockManager.createVolume(userName1, volumeName1, volumeSize, blockSize); + List volumes = cBlockManager.listVolume(userName1); + assertEquals(1, volumes.size()); + VolumeInfo existingVolume = volumes.get(0); + assertEquals(userName1, existingVolume.getUserName()); + assertEquals(volumeName1, existingVolume.getVolumeName()); + assertEquals(volumeSize, existingVolume.getVolumeSize()); + assertEquals(blockSize, existingVolume.getBlockSize()); + + cBlockManager.createVolume(userName1, volumeName2, volumeSize, blockSize); + cBlockManager.createVolume(userName2, volumeName1, volumeSize, blockSize); + volumes = cBlockManager.listVolume(userName1); + assertEquals(2, volumes.size()); + volumes = cBlockManager.listVolume(userName2); + assertEquals(1, volumes.size()); + } + + /** + * Test delete volume. + * @throws Exception + */ + @Test + public void testDeleteVolume() throws Exception { + String userName = "testDeleteUser"; + String volumeName1 = "testVolume1"; + String volumeName2 = "testVolume2"; + long volumeSize = 1L*1024*1024; + int blockSize = 4096; + cBlockManager.createVolume(userName, volumeName1, volumeSize, blockSize); + cBlockManager.createVolume(userName, volumeName2, volumeSize, blockSize); + cBlockManager.deleteVolume(userName, volumeName1, true); + List volumes = cBlockManager.listVolume(userName); + assertEquals(1, volumes.size()); + + VolumeInfo existingVolume = volumes.get(0); + assertEquals(userName, existingVolume.getUserName()); + assertEquals(volumeName2, existingVolume.getVolumeName()); + assertEquals(volumeSize, existingVolume.getVolumeSize()); + assertEquals(blockSize, existingVolume.getBlockSize()); + } + + /** + * Test info volume. + * + * TODO : usage field is not being tested (as it is not implemented yet) + * @throws Exception + */ + @Test + public void testInfoVolume() throws Exception { + String userName = "testInfoUser"; + String volumeName = "testVolume"; + long volumeSize = 1L*1024*1024; + int blockSize = 4096; + cBlockManager.createVolume(userName, volumeName, volumeSize, blockSize); + VolumeInfo info = cBlockManager.infoVolume(userName, volumeName); + assertEquals(userName, info.getUserName()); + assertEquals(volumeName, info.getVolumeName()); + assertEquals(volumeSize, info.getVolumeSize()); + assertEquals(blockSize, info.getBlockSize()); + } + + /** + * Test listing a number of volumes. + * @throws Exception + */ + @Test + public void testListVolume() throws Exception { + String userName = "testListUser"; + String volumeName = "testVolume"; + long volumeSize = 1L*1024*1024; + int blockSize = 4096; + int volumeNum = 100; + for (int i = 0; i volumes = cBlockManager.listVolume(userName); + assertEquals(volumeNum, volumes.size()); + HashSet volumeIds = new HashSet<>(); + for (int i = 0; i + containers = new HashMap<>(); + + /** + * Return an *existing* container with given Id. + * + * TODO : for testing purpose, return a new container if the given Id + * is not found + * + * found + * @param containerID + * @return + */ + public static ContainerDescriptor lookUp(String containerID) { + if (!containers.containsKey(containerID)) { + System.err.println("A container id never seen, return a new one " + + "for testing purpose:" + containerID); + containers.put(containerID, new ContainerDescriptor(containerID)); + } + return containers.get(containerID); + } + + public static void addContainer(String containerID) { + containers.put(containerID, new ContainerDescriptor(containerID)); + } + + private ContainerLookUpService() { + + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java new file mode 100644 index 00000000000..0fce6a1fae5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.util; + +import org.apache.hadoop.cblock.meta.ContainerDescriptor; +import org.apache.hadoop.cblock.storage.IStorageClient; + +import java.io.IOException; + +/** + * This class is the one that directly talks to SCM server. + * + * NOTE : this is only a mock class, only to allow testing volume + * creation without actually creating containers. In real world, need to be + * replaced with actual container look up calls. + * + */ +public class MockStorageClient implements IStorageClient { + private static long currentContainerId = -1; + + /** + * Ask SCM to get a exclusive container. + * + * @return A container descriptor object to locate this container + * @throws Exception + */ + public ContainerDescriptor createContainer() throws IOException { + currentContainerId += 1; + ContainerLookUpService.addContainer(Long.toString(currentContainerId)); + return ContainerLookUpService.lookUp(Long.toString(currentContainerId)); + } + + /** + * As this is only a testing class, with all "container" maintained in + * memory, no need to really delete anything for now. + * @param containerId + * @throws IOException + */ + @Override + public void deleteContainer(String containerId) throws IOException { + + } + + /** + * Return reference to an *existing* container with given ID. + * + * @param containerId + * @return + * @throws IOException + */ + public ContainerDescriptor getContainer(String containerId) + throws IOException { + return ContainerLookUpService.lookUp(containerId); + } + + @Override + public long getContainerSize() throws IOException { + // just return a constant value for now + return 5L*1024*1024*1024; // 5GB + } +}