HDFS-13017. Block Storage: implement simple iscsi discovery in jscsi server. Contributed by Elek, Marton.
This commit is contained in:
parent
ba4d5a52a8
commit
4bb9ad8e8f
|
@ -246,6 +246,11 @@ public class CBlockManager implements CBlockServiceProtocol,
|
||||||
return storageManager.isVolumeValid(userName, volumeName);
|
return storageManager.isVolumeValid(userName, volumeName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<VolumeInfo> listVolumes() throws IOException {
|
||||||
|
return listVolume(null);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void createVolume(String userName, String volumeName,
|
public synchronized void createVolume(String userName, String volumeName,
|
||||||
long volumeSize, int blockSize) throws IOException {
|
long volumeSize, int blockSize) throws IOException {
|
||||||
|
|
|
@ -20,9 +20,21 @@ package org.apache.hadoop.cblock.jscsiHelper;
|
||||||
import com.google.common.primitives.Longs;
|
import com.google.common.primitives.Longs;
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
import org.apache.hadoop.cblock.exception.CBlockException;
|
import org.apache.hadoop.cblock.exception.CBlockException;
|
||||||
|
import org.apache.hadoop.cblock.meta.VolumeInfo;
|
||||||
import org.apache.hadoop.cblock.proto.CBlockClientProtocol;
|
import org.apache.hadoop.cblock.proto.CBlockClientProtocol;
|
||||||
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
|
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
|
||||||
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
|
import org.apache.hadoop.cblock.protocol.proto
|
||||||
|
.CBlockClientServerProtocolProtos.ContainerIDProto;
|
||||||
|
import org.apache.hadoop.cblock.protocol.proto
|
||||||
|
.CBlockClientServerProtocolProtos.ListVolumesRequestProto;
|
||||||
|
import org.apache.hadoop.cblock.protocol.proto
|
||||||
|
.CBlockClientServerProtocolProtos.ListVolumesResponseProto;
|
||||||
|
import org.apache.hadoop.cblock.protocol.proto
|
||||||
|
.CBlockClientServerProtocolProtos.MountVolumeRequestProto;
|
||||||
|
import org.apache.hadoop.cblock.protocol.proto
|
||||||
|
.CBlockClientServerProtocolProtos.MountVolumeResponseProto;
|
||||||
|
import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos
|
||||||
|
.VolumeInfoProto;
|
||||||
import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
|
import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
|
||||||
import org.apache.hadoop.ipc.ProtobufHelper;
|
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||||
import org.apache.hadoop.ipc.ProtocolTranslator;
|
import org.apache.hadoop.ipc.ProtocolTranslator;
|
||||||
|
@ -69,14 +81,14 @@ public class CBlockClientProtocolClientSideTranslatorPB
|
||||||
@Override
|
@Override
|
||||||
public MountVolumeResponse mountVolume(
|
public MountVolumeResponse mountVolume(
|
||||||
String userName, String volumeName) throws IOException {
|
String userName, String volumeName) throws IOException {
|
||||||
CBlockClientServerProtocolProtos.MountVolumeRequestProto.Builder
|
MountVolumeRequestProto.Builder
|
||||||
request
|
request
|
||||||
= CBlockClientServerProtocolProtos.MountVolumeRequestProto
|
= MountVolumeRequestProto
|
||||||
.newBuilder();
|
.newBuilder();
|
||||||
request.setUserName(userName);
|
request.setUserName(userName);
|
||||||
request.setVolumeName(volumeName);
|
request.setVolumeName(volumeName);
|
||||||
try {
|
try {
|
||||||
CBlockClientServerProtocolProtos.MountVolumeResponseProto resp
|
MountVolumeResponseProto resp
|
||||||
= rpcProxy.mountVolume(null, request.build());
|
= rpcProxy.mountVolume(null, request.build());
|
||||||
if (!resp.getIsValid()) {
|
if (!resp.getIsValid()) {
|
||||||
throw new CBlockException(
|
throw new CBlockException(
|
||||||
|
@ -87,7 +99,7 @@ public class CBlockClientProtocolClientSideTranslatorPB
|
||||||
if (resp.getAllContainerIDsList().size() == 0) {
|
if (resp.getAllContainerIDsList().size() == 0) {
|
||||||
throw new CBlockException("Mount volume request returned no container");
|
throw new CBlockException("Mount volume request returned no container");
|
||||||
}
|
}
|
||||||
for (CBlockClientServerProtocolProtos.ContainerIDProto containerID :
|
for (ContainerIDProto containerID :
|
||||||
resp.getAllContainerIDsList()) {
|
resp.getAllContainerIDsList()) {
|
||||||
if (containerID.hasPipeline()) {
|
if (containerID.hasPipeline()) {
|
||||||
// it should always have a pipeline only except for tests.
|
// it should always have a pipeline only except for tests.
|
||||||
|
@ -111,4 +123,25 @@ public class CBlockClientProtocolClientSideTranslatorPB
|
||||||
throw ProtobufHelper.getRemoteException(e);
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<VolumeInfo> listVolumes() throws IOException {
|
||||||
|
try {
|
||||||
|
List<VolumeInfo> result = new ArrayList<>();
|
||||||
|
ListVolumesResponseProto
|
||||||
|
listVolumesResponseProto = this.rpcProxy.listVolumes(null,
|
||||||
|
ListVolumesRequestProto.newBuilder()
|
||||||
|
.build());
|
||||||
|
for (VolumeInfoProto volumeInfoProto :
|
||||||
|
listVolumesResponseProto
|
||||||
|
.getVolumeEntryList()) {
|
||||||
|
result.add(new VolumeInfo(volumeInfoProto.getUserName(),
|
||||||
|
volumeInfoProto.getVolumeName(), volumeInfoProto.getVolumeSize(),
|
||||||
|
volumeInfoProto.getBlockSize()));
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,9 +17,11 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.cblock.jscsiHelper;
|
package org.apache.hadoop.cblock.jscsiHelper;
|
||||||
|
|
||||||
|
import org.apache.hadoop.cblock.meta.VolumeInfo;
|
||||||
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
|
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class is the handler of CBlockManager used by target server
|
* This class is the handler of CBlockManager used by target server
|
||||||
|
@ -41,4 +43,8 @@ public class CBlockManagerHandler {
|
||||||
String userName, String volumeName) throws IOException {
|
String userName, String volumeName) throws IOException {
|
||||||
return handler.mountVolume(userName, volumeName);
|
return handler.mountVolume(userName, volumeName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public List<VolumeInfo> listVolumes() throws IOException {
|
||||||
|
return handler.listVolumes();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,6 +104,23 @@ public final class CBlockTargetServer extends TargetServer {
|
||||||
return targets.containsKey(checkTargetName);
|
return targets.containsKey(checkTargetName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] getTargetNames() {
|
||||||
|
try {
|
||||||
|
if (cBlockManagerHandler != null) {
|
||||||
|
return cBlockManagerHandler.listVolumes().
|
||||||
|
stream().map(
|
||||||
|
volumeInfo -> volumeInfo.getUserName() + ":" + volumeInfo
|
||||||
|
.getVolumeName()).toArray(String[]::new);
|
||||||
|
} else {
|
||||||
|
return new String[0];
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOGGER.error("Can't list existing volumes", e);
|
||||||
|
return new String[0];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public HashMap<String, Target> getTargets() {
|
public HashMap<String, Target> getTargets() {
|
||||||
return targets;
|
return targets;
|
||||||
|
|
|
@ -17,7 +17,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.cblock.proto;
|
package org.apache.hadoop.cblock.proto;
|
||||||
|
|
||||||
|
import org.apache.hadoop.cblock.meta.VolumeInfo;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The protocol that CBlock client side uses to talk to server side. CBlock
|
* The protocol that CBlock client side uses to talk to server side. CBlock
|
||||||
|
@ -30,4 +33,6 @@ import java.io.IOException;
|
||||||
public interface CBlockClientProtocol {
|
public interface CBlockClientProtocol {
|
||||||
MountVolumeResponse mountVolume(String userName, String volumeName)
|
MountVolumeResponse mountVolume(String userName, String volumeName)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
List<VolumeInfo> listVolumes() throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,15 +19,18 @@ package org.apache.hadoop.cblock.protocolPB;
|
||||||
|
|
||||||
import com.google.protobuf.RpcController;
|
import com.google.protobuf.RpcController;
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
|
import org.apache.hadoop.cblock.meta.VolumeInfo;
|
||||||
import org.apache.hadoop.cblock.proto.CBlockClientProtocol;
|
import org.apache.hadoop.cblock.proto.CBlockClientProtocol;
|
||||||
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
|
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
|
||||||
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
|
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
|
||||||
|
import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The server side implementation of cblock client to server protocol.
|
* The server side implementation of cblock client to server protocol.
|
||||||
|
@ -83,4 +86,31 @@ public class CBlockClientServerProtocolServerSideTranslatorPB implements
|
||||||
}
|
}
|
||||||
return resp.build();
|
return resp.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CBlockClientServerProtocolProtos.ListVolumesResponseProto listVolumes(
|
||||||
|
RpcController controller,
|
||||||
|
CBlockClientServerProtocolProtos.ListVolumesRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
try {
|
||||||
|
CBlockClientServerProtocolProtos.ListVolumesResponseProto.Builder resp =
|
||||||
|
CBlockClientServerProtocolProtos.ListVolumesResponseProto
|
||||||
|
.newBuilder();
|
||||||
|
List<VolumeInfo> volumeInfos = impl.listVolumes();
|
||||||
|
List<CBlockServiceProtocolProtos.VolumeInfoProto> convertedInfos =
|
||||||
|
volumeInfos.stream().map(
|
||||||
|
volumeInfo -> CBlockServiceProtocolProtos.VolumeInfoProto
|
||||||
|
.newBuilder().setUserName(volumeInfo.getUserName())
|
||||||
|
.setBlockSize(volumeInfo.getBlockSize())
|
||||||
|
.setVolumeName(volumeInfo.getVolumeName())
|
||||||
|
.setVolumeSize(volumeInfo.getVolumeSize())
|
||||||
|
.setUsage(volumeInfo.getUsage()).build())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
resp.addAllVolumeEntry(convertedInfos);
|
||||||
|
return resp.build();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,7 @@ option java_generate_equals_and_hash = true;
|
||||||
package hadoop.cblock;
|
package hadoop.cblock;
|
||||||
|
|
||||||
import "Ozone.proto";
|
import "Ozone.proto";
|
||||||
|
import "CBlockServiceProtocol.proto";
|
||||||
/**
|
/**
|
||||||
* This message is sent from CBlock client side to CBlock server to
|
* This message is sent from CBlock client side to CBlock server to
|
||||||
* mount a volume specified by owner name and volume name.
|
* mount a volume specified by owner name and volume name.
|
||||||
|
@ -72,9 +72,22 @@ message ContainerIDProto {
|
||||||
optional hadoop.hdfs.ozone.Pipeline pipeline = 3;
|
optional hadoop.hdfs.ozone.Pipeline pipeline = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
message ListVolumesRequestProto {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
message ListVolumesResponseProto {
|
||||||
|
repeated VolumeInfoProto volumeEntry = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
service CBlockClientServerProtocolService {
|
service CBlockClientServerProtocolService {
|
||||||
/**
|
/**
|
||||||
* mount the volume.
|
* mount the volume.
|
||||||
*/
|
*/
|
||||||
rpc mountVolume(MountVolumeRequestProto) returns (MountVolumeResponseProto);
|
rpc mountVolume(MountVolumeRequestProto) returns (MountVolumeResponseProto);
|
||||||
|
|
||||||
|
rpc listVolumes(ListVolumesRequestProto) returns(ListVolumesResponseProto);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,12 +22,13 @@ import org.apache.hadoop.cblock.meta.VolumeInfo;
|
||||||
import org.apache.hadoop.scm.client.ScmClient;
|
import org.apache.hadoop.scm.client.ScmClient;
|
||||||
import org.apache.hadoop.cblock.util.MockStorageClient;
|
import org.apache.hadoop.cblock.util.MockStorageClient;
|
||||||
import org.apache.hadoop.conf.OzoneConfiguration;
|
import org.apache.hadoop.conf.OzoneConfiguration;
|
||||||
import org.junit.AfterClass;
|
import org.junit.After;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
|
.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
|
||||||
|
@ -45,8 +46,8 @@ public class TestCBlockServer {
|
||||||
private static CBlockManager cBlockManager;
|
private static CBlockManager cBlockManager;
|
||||||
private static OzoneConfiguration conf;
|
private static OzoneConfiguration conf;
|
||||||
|
|
||||||
@BeforeClass
|
@Before
|
||||||
public static void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
ScmClient storageClient = new MockStorageClient();
|
ScmClient storageClient = new MockStorageClient();
|
||||||
conf = new OzoneConfiguration();
|
conf = new OzoneConfiguration();
|
||||||
conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0");
|
conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0");
|
||||||
|
@ -55,8 +56,8 @@ public class TestCBlockServer {
|
||||||
cBlockManager.start();
|
cBlockManager.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@After
|
||||||
public static void clean() {
|
public void clean() {
|
||||||
cBlockManager.stop();
|
cBlockManager.stop();
|
||||||
cBlockManager.join();
|
cBlockManager.join();
|
||||||
cBlockManager.clean();
|
cBlockManager.clean();
|
||||||
|
@ -152,7 +153,7 @@ public class TestCBlockServer {
|
||||||
}
|
}
|
||||||
List<VolumeInfo> volumes = cBlockManager.listVolume(userName);
|
List<VolumeInfo> volumes = cBlockManager.listVolume(userName);
|
||||||
assertEquals(volumeNum, volumes.size());
|
assertEquals(volumeNum, volumes.size());
|
||||||
HashSet<String> volumeIds = new HashSet<>();
|
Set<String> volumeIds = new HashSet<>();
|
||||||
for (int i = 0; i<volumeNum; i++) {
|
for (int i = 0; i<volumeNum; i++) {
|
||||||
VolumeInfo volumeInfo = volumes.get(i);
|
VolumeInfo volumeInfo = volumes.get(i);
|
||||||
assertEquals(userName, volumeInfo.getUserName());
|
assertEquals(userName, volumeInfo.getUserName());
|
||||||
|
@ -165,4 +166,47 @@ public class TestCBlockServer {
|
||||||
assertTrue(volumeIds.contains(volumeName + i));
|
assertTrue(volumeIds.contains(volumeName + i));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test listing a number of volumes.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testListVolumes() throws Exception {
|
||||||
|
String volumeName ="volume" + RandomStringUtils.randomNumeric(5);
|
||||||
|
long volumeSize = 1L*1024*1024;
|
||||||
|
int blockSize = 4096;
|
||||||
|
int volumeNum = 100;
|
||||||
|
int userCount = 10;
|
||||||
|
|
||||||
|
assertTrue("We need at least one volume for each user",
|
||||||
|
userCount < volumeNum);
|
||||||
|
|
||||||
|
for (int i = 0; i<volumeNum; i++) {
|
||||||
|
String userName =
|
||||||
|
"user-" + (i % userCount);
|
||||||
|
cBlockManager.createVolume(userName, volumeName + i,
|
||||||
|
volumeSize, blockSize);
|
||||||
|
}
|
||||||
|
List<VolumeInfo> allVolumes = cBlockManager.listVolumes();
|
||||||
|
//check if we have the volumes from all the users.
|
||||||
|
|
||||||
|
Set<String> volumeIds = new HashSet<>();
|
||||||
|
Set<String> usernames = new HashSet<>();
|
||||||
|
for (int i = 0; i < allVolumes.size(); i++) {
|
||||||
|
VolumeInfo volumeInfo = allVolumes.get(i);
|
||||||
|
assertFalse(volumeIds.contains(volumeName + i));
|
||||||
|
usernames.add(volumeInfo.getUserName());
|
||||||
|
volumeIds.add(volumeName + i);
|
||||||
|
assertEquals(volumeSize, volumeInfo.getVolumeSize());
|
||||||
|
assertEquals(blockSize, volumeInfo.getBlockSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(volumeNum, volumeIds.size());
|
||||||
|
for (int i = 0; i<volumeNum; i++) {
|
||||||
|
assertTrue(volumeIds.contains(volumeName + i));
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(userCount, usernames.size());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue