HDFS-11582. Block Storage : add SCSI target access daemon. Contributed by Chen Liang.

This commit is contained in:
Anu Engineer 2017-04-13 13:25:53 -07:00 committed by Owen O'Malley
parent 7d4a2d4104
commit 783b7c27b1
10 changed files with 412 additions and 7 deletions

View File

@ -50,8 +50,11 @@ public final class ScmConfigKeys {
public static final int OZONE_SCM_CHUNK_SIZE_DEFAULT = 1 * 1024 * 1024;
public static final int OZONE_SCM_CHUNK_MAX_SIZE = 1 * 1024 * 1024;
public static final String OZONE_SCM_CLIENT_PORT_KEY =
"ozone.scm.client.port";
public static final int OZONE_SCM_CLIENT_PORT_DEFAULT = 9860;
public static final String OZONE_SCM_DATANODE_PORT_KEY =
"ozone.scm.datanode.port";
public static final int OZONE_SCM_DATANODE_PORT_DEFAULT = 9861;
public static final String OZONE_SCM_CLIENT_ADDRESS_KEY =

View File

@ -132,6 +132,20 @@ public final class CBlockConfigKeys {
"dfs.cblock.cache.block.buffer.size";
public static final int DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT = 512;
// jscsi server settings
public static final String DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY =
"dfs.cblock.jscsi.server.address";
public static final String DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT =
"127.0.0.1";
public static final String DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_KEY =
"dfs.cblock.jscsi.cblock.server.address";
public static final String DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_DEFAULT =
"127.0.0.1";
public static final String DFS_CBLOCK_CONTAINER_SIZE_GB_KEY =
"dfs.cblock.container.size.gb";
public static final int DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT = 5;
private CBlockConfigKeys() {
}

View File

@ -21,7 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.proto.CBlockClientServerProtocol;
import org.apache.hadoop.cblock.proto.CBlockClientProtocol;
import org.apache.hadoop.cblock.proto.CBlockServiceProtocol;
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
@ -74,7 +74,7 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVEL
*
*/
public class CBlockManager implements CBlockServiceProtocol,
CBlockClientServerProtocol {
CBlockClientProtocol {
private static final Logger LOG =
LoggerFactory.getLogger(CBlockManager.class);

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.jscsiHelper;
import com.google.common.primitives.Longs;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.cblock.exception.CBlockException;
import org.apache.hadoop.cblock.proto.CBlockClientProtocol;
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* The client side of CBlockClientProtocol.
*
* CBlockClientProtocol is the protocol used between cblock client side
* and cblock manager (cblock client side is just the node where jscsi daemon
* process runs. a machines talks to jscsi daemon for mounting a volume).
*
* Right now, the only communication carried by this protocol is for client side
* to request mounting a volume.
*/
public class CBlockClientProtocolClientSideTranslatorPB
implements CBlockClientProtocol, ProtocolTranslator, Closeable {
private final CBlockClientServerProtocolPB rpcProxy;
public CBlockClientProtocolClientSideTranslatorPB(
CBlockClientServerProtocolPB rpcProxy) {
this.rpcProxy = rpcProxy;
}
@Override
public void close() throws IOException {
RPC.stopProxy(rpcProxy);
}
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
}
@Override
public MountVolumeResponse mountVolume(
String userName, String volumeName) throws IOException {
CBlockClientServerProtocolProtos.MountVolumeRequestProto.Builder
request
= CBlockClientServerProtocolProtos.MountVolumeRequestProto
.newBuilder();
request.setUserName(userName);
request.setVolumeName(volumeName);
try {
CBlockClientServerProtocolProtos.MountVolumeResponseProto resp
= rpcProxy.mountVolume(null, request.build());
if (!resp.getIsValid()) {
throw new CBlockException(
"Not a valid volume:" + userName + ":" + volumeName);
}
List<Pipeline> containerIDs = new ArrayList<>();
HashMap<String, Pipeline> containerPipelines = new HashMap<>();
if (resp.getAllContainerIDsList().size() == 0) {
throw new CBlockException("Mount volume request returned no container");
}
for (CBlockClientServerProtocolProtos.ContainerIDProto containerID :
resp.getAllContainerIDsList()) {
if (containerID.hasPipeline()) {
// it should always have a pipeline only except for tests.
Pipeline p = Pipeline.getFromProtoBuf(containerID.getPipeline());
p.setData(Longs.toByteArray(containerID.getIndex()));
containerIDs.add(p);
containerPipelines.put(containerID.getContainerID(), p);
} else {
throw new CBlockException("ContainerID does not have pipeline!");
}
}
return new MountVolumeResponse(
resp.getIsValid(),
resp.getUserName(),
resp.getVolumeName(),
resp.getVolumeSize(),
resp.getBlockSize(),
containerIDs,
containerPipelines);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.jscsiHelper;
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
import java.io.IOException;
/**
* This class is the handler of CBlockManager used by target server
* to communicate with CBlockManager.
*
* More specifically, this class will expose local methods to target
* server, and make RPC calls to CBlockManager accordingly
*/
public class CBlockManagerHandler {
private final CBlockClientProtocolClientSideTranslatorPB handler;
public CBlockManagerHandler(
CBlockClientProtocolClientSideTranslatorPB handler) {
this.handler = handler;
}
public MountVolumeResponse mountVolume(
String userName, String volumeName) throws IOException {
return handler.mountVolume(userName, volumeName);
}
}

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.jscsiHelper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
import org.apache.hadoop.cblock.util.KeyUtil;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.scm.XceiverClientManager;
import org.jscsi.target.Configuration;
import org.jscsi.target.Target;
import org.jscsi.target.TargetServer;
import java.io.IOException;
import java.util.HashMap;
/**
* This class extends JSCSI target server, which is a ISCSI target that can be
* recognized by a remote machine with ISCSI installed.
*/
public final class CBlockTargetServer extends TargetServer {
private final OzoneConfiguration conf;
private final CBlockManagerHandler cBlockManagerHandler;
private final XceiverClientManager xceiverClientManager;
private final ContainerCacheFlusher containerCacheFlusher;
private final CBlockTargetMetrics metrics;
public CBlockTargetServer(OzoneConfiguration ozoneConfig,
Configuration jscsiConf,
CBlockManagerHandler cBlockManagerHandler,
CBlockTargetMetrics metrics)
throws IOException {
super(jscsiConf);
this.cBlockManagerHandler = cBlockManagerHandler;
this.xceiverClientManager = new XceiverClientManager(ozoneConfig);
this.conf = ozoneConfig;
this.containerCacheFlusher = new ContainerCacheFlusher(this.conf,
xceiverClientManager, metrics);
this.metrics = metrics;
LOGGER.info("Starting flusher thread.");
Thread flushListenerThread = new Thread(containerCacheFlusher);
flushListenerThread.setDaemon(true);
flushListenerThread.start();
}
public static void main(String[] args) throws Exception {
}
@Override
public boolean isValidTargetName(String checkTargetName) {
if (!KeyUtil.isValidVolumeKey(checkTargetName)) {
return false;
}
String userName = KeyUtil.getUserNameFromVolumeKey(checkTargetName);
String volumeName = KeyUtil.getVolumeFromVolumeKey(checkTargetName);
if (userName == null || volumeName == null) {
return false;
}
try {
MountVolumeResponse result =
cBlockManagerHandler.mountVolume(userName, volumeName);
if (!result.getIsValid()) {
LOGGER.error("Not a valid volume:" + checkTargetName);
return false;
}
String volumeKey = KeyUtil.getVolumeKey(result.getUserName(),
result.getVolumeName());
if (!targets.containsKey(volumeKey)) {
LOGGER.info("Mounting Volume. username: {} volume:{}",
userName, volumeName);
CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder()
.setUserName(userName)
.setVolumeName(volumeName)
.setVolumeSize(result.getVolumeSize())
.setBlockSize(result.getBlockSize())
.setContainerList(result.getContainerList())
.setClientManager(xceiverClientManager)
.setConf(this.conf)
.setFlusher(containerCacheFlusher)
.setCBlockTargetMetrics(metrics)
.build();
Target target = new Target(volumeKey, volumeKey, ozoneStore);
targets.put(volumeKey, target);
}
} catch (IOException e) {
LOGGER.error("Can not connect to server when validating target!"
+ e.getMessage());
}
return targets.containsKey(checkTargetName);
}
@VisibleForTesting
public HashMap<String, Target> getTargets() {
return targets;
}
}

View File

@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.jscsiHelper;
import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.client.ContainerOperationClient;
import org.apache.hadoop.security.UserGroupInformation;
import org.jscsi.target.Configuration;
import java.net.InetSocketAddress;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_PORT_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
/**
* This class runs the target server process.
*/
public final class SCSITargetDaemon {
public static void main(String[] args) throws Exception {
OzoneConfiguration ozoneConf = new OzoneConfiguration();
RPC.setProtocolEngine(ozoneConf, CBlockClientServerProtocolPB.class,
ProtobufRpcEngine.class);
long containerSizeGB = ozoneConf.getInt(DFS_CBLOCK_CONTAINER_SIZE_GB_KEY,
DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT);
ContainerOperationClient.setContainerSizeB(
containerSizeGB * OzoneConsts.GB);
String jscsiServerAddress = ozoneConf.get(
DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY,
DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT);
String cbmIPAddress = ozoneConf.get(
DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_KEY,
DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_DEFAULT
);
String scmAddress = ozoneConf.get(OZONE_SCM_CLIENT_BIND_HOST_KEY,
OZONE_SCM_CLIENT_BIND_HOST_DEFAULT);
int scmClientPort = ozoneConf.getInt(OZONE_SCM_CLIENT_PORT_KEY,
OZONE_SCM_CLIENT_PORT_DEFAULT);
int scmDatanodePort = ozoneConf.getInt(OZONE_SCM_DATANODE_PORT_KEY,
OZONE_SCM_DATANODE_PORT_DEFAULT);
String scmClientAddress = scmAddress + ":" + scmClientPort;
String scmDataodeAddress = scmAddress + ":" + scmDatanodePort;
ozoneConf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, scmClientAddress);
ozoneConf.set(OZONE_SCM_DATANODE_ADDRESS_KEY, scmDataodeAddress);
InetSocketAddress cbmAddress = new InetSocketAddress(
cbmIPAddress, DFS_CBLOCK_JSCSI_PORT_DEFAULT);
long version = RPC.getProtocolVersion(
CBlockServiceProtocolPB.class);
CBlockClientProtocolClientSideTranslatorPB cbmClient =
new CBlockClientProtocolClientSideTranslatorPB(
RPC.getProxy(CBlockClientServerProtocolPB.class, version,
cbmAddress, UserGroupInformation.getCurrentUser(), ozoneConf,
NetUtils.getDefaultSocketFactory(ozoneConf), 5000)
);
CBlockManagerHandler cbmHandler = new CBlockManagerHandler(cbmClient);
Configuration jscsiConfig = new Configuration(jscsiServerAddress);
DefaultMetricsSystem.initialize("CBlockMetrics");
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
CBlockTargetServer targetServer = new CBlockTargetServer(
ozoneConf, jscsiConfig, cbmHandler, metrics);
targetServer.call();
}
private SCSITargetDaemon() {
}
}

View File

@ -27,7 +27,7 @@ import java.io.IOException;
* When users mount a volume on CBlock client, CBlock client side uses this
* protocol to send mount request to CBlock server.
*/
public interface CBlockClientServerProtocol {
public interface CBlockClientProtocol {
MountVolumeResponse mountVolume(String userName, String volumeName)
throws IOException;
}

View File

@ -19,7 +19,7 @@ package org.apache.hadoop.cblock.protocolPB;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.cblock.proto.CBlockClientServerProtocol;
import org.apache.hadoop.cblock.proto.CBlockClientProtocol;
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
import org.apache.hadoop.classification.InterfaceAudience;
@ -36,10 +36,10 @@ import java.util.List;
public class CBlockClientServerProtocolServerSideTranslatorPB implements
CBlockClientServerProtocolPB {
private final CBlockClientServerProtocol impl;
private final CBlockClientProtocol impl;
public CBlockClientServerProtocolServerSideTranslatorPB(
CBlockClientServerProtocol impl) {
CBlockClientProtocol impl) {
this.impl = impl;
}

View File

@ -31,6 +31,18 @@ public final class KeyUtil {
return getVolumeKey(userName, volumeName) + "#" + containerID;
}
public static String getUserNameFromVolumeKey(String key) {
return key.split(":")[0];
}
public static String getVolumeFromVolumeKey(String key) {
return key.split(":")[1];
}
public static boolean isValidVolumeKey(String key) {
return key.contains(":");
}
private KeyUtil() {
}