HDFS-13022. Block Storage: Kubernetes dynamic persistent volume provisioner. Contributed by Elek, Marton.

This commit is contained in:
Mukul Kumar Singh 2018-02-11 21:29:29 +05:30 committed by Owen O'Malley
parent ee5495456e
commit 377b31ffa1
11 changed files with 607 additions and 4 deletions

View File

@ -195,6 +195,26 @@ public final class CBlockConfigKeys {
public static final int DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT = 3260;
public static final String
DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED
= "dfs.cblock.kubernetes.dynamic-provisioner.enabled";
public static final boolean
DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED_DEFAULT = false;
public static final String
DFS_CBLOCK_KUBERNETES_CBLOCK_USER =
"dfs.cblock.kubernetes.cblock-user";
public static final String
DFS_CBLOCK_KUBERNETES_CBLOCK_USER_DEFAULT =
"iqn.2001-04.org.apache.hadoop";
public static final String
DFS_CBLOCK_KUBERNETES_CONFIG_FILE_KEY =
"dfs.cblock.kubernetes.configfile";
private CBlockConfigKeys() {
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.cblock;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import org.apache.hadoop.cblock.kubernetes.DynamicProvisioner;
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.proto.CBlockClientProtocol;
@ -62,7 +63,6 @@ import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT;
@ -92,6 +92,11 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED_DEFAULT;
/**
* The main entry point of CBlock operations, ALL the CBlock operations
@ -119,6 +124,8 @@ public class CBlockManager implements CBlockServiceProtocol,
private final LevelDBStore levelDBStore;
private final String dbPath;
private final DynamicProvisioner kubernetesDynamicProvisioner;
private Charset encoding = Charset.forName("UTF-8");
public CBlockManager(OzoneConfiguration conf,
@ -179,17 +186,34 @@ public class CBlockManager implements CBlockServiceProtocol,
DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, serverRpcAddr, cblockServer);
LOG.info("CBlock server listening for client commands on: {}",
cblockServerRpcAddress);
if (conf.getBoolean(DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED,
DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED_DEFAULT)) {
kubernetesDynamicProvisioner =
new DynamicProvisioner(conf, storageManager);
kubernetesDynamicProvisioner.init();
} else {
kubernetesDynamicProvisioner = null;
}
}
public void start() {
cblockService.start();
cblockServer.start();
if (kubernetesDynamicProvisioner != null) {
kubernetesDynamicProvisioner.start();
}
LOG.info("CBlock manager started!");
}
public void stop() {
cblockService.stop();
cblockServer.stop();
if (kubernetesDynamicProvisioner != null) {
kubernetesDynamicProvisioner.stop();
}
}
public void join() {

View File

@ -208,7 +208,7 @@ public class CBlockCli extends Configured implements Tool {
System.exit(res);
}
private long parseSize(String volumeSizeArgs) throws IOException {
public static long parseSize(String volumeSizeArgs) throws IOException {
long multiplier = 1;
Pattern p = Pattern.compile("([0-9]+)([a-zA-Z]+)");
@ -221,9 +221,14 @@ public class CBlockCli extends Configured implements Tool {
int size = Integer.parseInt(m.group(1));
String s = m.group(2);
if (s.equalsIgnoreCase("GB")) {
if (s.equalsIgnoreCase("MB") ||
s.equalsIgnoreCase("Mi")) {
multiplier = 1024L * 1024;
} else if (s.equalsIgnoreCase("GB") ||
s.equalsIgnoreCase("Gi")) {
multiplier = 1024L * 1024 * 1024;
} else if (s.equalsIgnoreCase("TB")) {
} else if (s.equalsIgnoreCase("TB") ||
s.equalsIgnoreCase("Ti")) {
multiplier = 1024L * 1024 * 1024 * 1024;
} else {
throw new IOException("Invalid volume size args " + volumeSizeArgs);

View File

@ -0,0 +1,330 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.kubernetes;
import com.google.gson.reflect.TypeToken;
import com.squareup.okhttp.RequestBody;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.Configuration;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1ISCSIVolumeSource;
import io.kubernetes.client.models.V1ObjectMeta;
import io.kubernetes.client.models.V1ObjectReference;
import io.kubernetes.client.models.V1PersistentVolume;
import io.kubernetes.client.models.V1PersistentVolumeClaim;
import io.kubernetes.client.models.V1PersistentVolumeSpec;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Watch;
import okio.Buffer;
import org.apache.hadoop.cblock.cli.CBlockCli;
import org.apache.hadoop.cblock.exception.CBlockException;
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
import org.apache.hadoop.cblock.storage.StorageManager;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_ISCSI_ADVERTISED_IP;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_ISCSI_ADVERTISED_PORT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_KUBERNETES_CBLOCK_USER;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_KUBERNETES_CBLOCK_USER_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_KUBERNETES_CONFIG_FILE_KEY;
/**
* Kubernetes Dynamic Persistent Volume provisioner.
*
* Listens on the kubernetes feed and creates the appropriate cblock AND
* kubernetes PersistentVolume according to the created PersistentVolumeClaims.
*/
public class DynamicProvisioner implements Runnable{
protected static final Logger LOGGER =
LoggerFactory.getLogger(DynamicProvisioner.class);
private static final String STORAGE_CLASS = "cblock";
private static final String PROVISIONER_ID = "hadoop.apache.org/cblock";
private static final String KUBERNETES_PROVISIONER_KEY =
"volume.beta.kubernetes.io/storage-provisioner";
private static final String KUBERNETES_BIND_COMPLETED_KEY =
"pv.kubernetes.io/bind-completed";
private boolean running = true;
private final StorageManager storageManager;
private String kubernetesConfigFile;
private String externalIp;
private int externalPort;
private String cblockUser;
private CoreV1Api api;
private ApiClient client;
private Thread watcherThread;
public DynamicProvisioner(OzoneConfiguration ozoneConf,
StorageManager storageManager) throws IOException {
this.storageManager = storageManager;
kubernetesConfigFile = ozoneConf
.getTrimmed(DFS_CBLOCK_KUBERNETES_CONFIG_FILE_KEY);
String jscsiServerAddress = ozoneConf
.get(DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY,
DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT);
externalIp = ozoneConf.
getTrimmed(DFS_CBLOCK_ISCSI_ADVERTISED_IP, jscsiServerAddress);
externalPort = ozoneConf.
getInt(DFS_CBLOCK_ISCSI_ADVERTISED_PORT,
DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT);
cblockUser = ozoneConf.getTrimmed(DFS_CBLOCK_KUBERNETES_CBLOCK_USER,
DFS_CBLOCK_KUBERNETES_CBLOCK_USER_DEFAULT);
}
public void init() throws IOException {
if (kubernetesConfigFile != null) {
client = Config.fromConfig(kubernetesConfigFile);
} else {
client = Config.fromCluster();
}
client.getHttpClient().setReadTimeout(60, TimeUnit.SECONDS);
Configuration.setDefaultApiClient(client);
api = new CoreV1Api();
watcherThread = new Thread(this);
watcherThread.setName("DynamicProvisioner");
watcherThread.setDaemon(true);
}
@Override
public void run() {
LOGGER.info("Starting kubernetes dynamic provisioner.");
while (running) {
String resourceVersion = null;
try {
Watch<V1PersistentVolumeClaim> watch = Watch.createWatch(client,
api.listPersistentVolumeClaimForAllNamespacesCall(null,
null,
false,
null,
null,
null,
resourceVersion,
null,
true,
null,
null),
new TypeToken<Watch.Response<V1PersistentVolumeClaim>>() {
}.getType());
//check the new pvc resources, and create cblock + pv if needed
for (Watch.Response<V1PersistentVolumeClaim> item : watch) {
V1PersistentVolumeClaim claim = item.object;
if (isPvMissingForPvc(claim)) {
LOGGER.info("Provisioning volumes for PVC {}/{}",
claim.getMetadata().getNamespace(),
claim.getMetadata().getName());
if (LOGGER.isDebugEnabled()) {
RequestBody request =
api.getApiClient().serialize(claim, "application/json");
final Buffer buffer = new Buffer();
request.writeTo(buffer);
LOGGER.debug("New PVC is detected: " + buffer.readUtf8());
}
String volumeName = createVolumeName(claim);
long size = CBlockCli.parseSize(
claim.getSpec().getResources().getRequests().get("storage"));
createCBlock(volumeName, size);
createPersistentVolumeFromPVC(item.object, volumeName);
}
}
} catch (Exception ex) {
if (ex.getCause() != null && ex
.getCause() instanceof SocketTimeoutException) {
//This is normal. We are connection to the kubernetes server and the
//connection should be reopened time to time...
LOGGER.debug("Time exception occured", ex);
} else {
LOGGER.error("Error on provisioning persistent volumes.", ex);
try {
//we can try again in the main loop
Thread.sleep(1000);
} catch (InterruptedException e) {
LOGGER.error("Error on sleeping after an error.", e);
}
}
}
}
}
private boolean isPvMissingForPvc(V1PersistentVolumeClaim claim) {
Map<String, String> annotations = claim.getMetadata().getAnnotations();
return claim.getStatus().getPhase().equals("Pending") && STORAGE_CLASS
.equals(claim.getSpec().getStorageClassName()) && PROVISIONER_ID
.equals(annotations.get(KUBERNETES_PROVISIONER_KEY)) && !"yes"
.equals(annotations.get(KUBERNETES_BIND_COMPLETED_KEY));
}
@VisibleForTesting
protected String createVolumeName(V1PersistentVolumeClaim claim) {
return claim.getMetadata().getName() + "-" + claim.getMetadata()
.getUid();
}
public void stop() {
running = false;
try {
watcherThread.join(60000);
} catch (InterruptedException e) {
LOGGER.error("Kubernetes watcher thread can't stopped gracefully.", e);
}
}
private void createCBlock(String volumeName, long size)
throws CBlockException {
MountVolumeResponse mountVolumeResponse =
storageManager.isVolumeValid(cblockUser, volumeName);
if (!mountVolumeResponse.getIsValid()) {
storageManager
.createVolume(cblockUser, volumeName, size, 4 * 1024);
}
}
private void createPersistentVolumeFromPVC(V1PersistentVolumeClaim claim,
String volumeName) throws ApiException, IOException {
V1PersistentVolume v1PersistentVolume =
persitenceVolumeBuilder(claim, volumeName);
if (LOGGER.isDebugEnabled()) {
RequestBody request =
api.getApiClient().serialize(v1PersistentVolume, "application/json");
final Buffer buffer = new Buffer();
request.writeTo(buffer);
LOGGER.debug("Creating new PV: " + buffer.readUtf8());
}
api.createPersistentVolume(v1PersistentVolume, null);
}
protected V1PersistentVolume persitenceVolumeBuilder(
V1PersistentVolumeClaim claim,
String volumeName) {
V1PersistentVolume v1PersistentVolume = new V1PersistentVolume();
v1PersistentVolume.setKind("PersistentVolume");
v1PersistentVolume.setApiVersion("v1");
V1ObjectMeta metadata = new V1ObjectMeta();
metadata.setName(volumeName);
metadata.setNamespace(claim.getMetadata().getNamespace());
metadata.setAnnotations(new HashMap<>());
metadata.getAnnotations()
.put("pv.kubernetes.io/provisioned-by", PROVISIONER_ID);
metadata.getAnnotations()
.put("volume.beta.kubernetes.io/storage-class", STORAGE_CLASS);
v1PersistentVolume.setMetadata(metadata);
V1PersistentVolumeSpec spec = new V1PersistentVolumeSpec();
spec.setCapacity(new HashMap<>());
spec.getCapacity().put("storage",
claim.getSpec().getResources().getRequests().get("storage"));
spec.setAccessModes(new ArrayList<>());
spec.getAccessModes().add("ReadWriteOnce");
V1ObjectReference claimRef = new V1ObjectReference();
claimRef.setName(claim.getMetadata().getName());
claimRef.setNamespace(claim.getMetadata().getNamespace());
claimRef.setKind(claim.getKind());
claimRef.setApiVersion(claim.getApiVersion());
claimRef.setUid(claim.getMetadata().getUid());
spec.setClaimRef(claimRef);
spec.persistentVolumeReclaimPolicy("Delete");
V1ISCSIVolumeSource iscsi = new V1ISCSIVolumeSource();
iscsi.setIqn(cblockUser + ":" + volumeName);
iscsi.setLun(0);
iscsi.setFsType("ext4");
String portal = externalIp + ":" + externalPort;
iscsi.setTargetPortal(portal);
iscsi.setPortals(new ArrayList<>());
iscsi.getPortals().add(portal);
spec.iscsi(iscsi);
v1PersistentVolume.setSpec(spec);
return v1PersistentVolume;
}
@VisibleForTesting
protected CoreV1Api getApi() {
return api;
}
public void start() {
watcherThread.start();
}
}

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* This package contains helper classes to run hadoop cluster in kubernetes
* environment.
*/
package org.apache.hadoop.cblock.kubernetes;

View File

@ -204,6 +204,8 @@ public class StorageManager {
LOGGER.error("Error creating container Container:{}:" +
" index:{} error:{}", container.getContainerID(),
containerIdx, e);
} else {
LOGGER.error("Error creating container.", e);
}
}
}

View File

@ -294,6 +294,51 @@
TCP port returned during the iscsi discovery.
</description>
</property>
<property>
<name>dfs.cblock.kubernetes.dynamic-provisioner.enabled</name>
<value>false</value>
<tag>CBLOCK, KUBERNETES</tag>
<description>Flag to enable automatic creation of cblocks and
kubernetes PersitentVolumes in kubernetes environment.</description>
</property>
<property>
<name>dfs.cblock.kubernetes.cblock-user</name>
<value>iqn.2001-04.org.apache.hadoop</value>
<tag>CBLOCK, KUBERNETES</tag>
<description>CBlock user to use for the dynamic provisioner.
This user will own all of the auto-created cblocks.</description>
</property>
<property>
<name>dfs.cblock.kubernetes.configfile</name>
<value></value>
<tag>CBLOCK, KUBERNETES</tag>
<description>Location of the kubernetes configuration file
to access the kubernetes cluster. Not required inside a pod
as the default service account will be if this value is
empty.</description>
</property>
<property>
<name>dfs.cblock.iscsi.advertised.ip</name>
<value></value>
<tag>CBLOCK, KUBERNETES</tag>
<description>IP where the cblock target server is available
from the kubernetes nodes. Usually it's a cluster ip address
which is defined by a deployed Service.</description>
</property>
<property>
<name>dfs.cblock.iscsi.advertised.port</name>
<value>3260</value>
<tag>CBLOCK, KUBERNETES</tag>
<description>Port where the cblock target server is available
from the kubernetes nodes. Could be different from the
listening port if jscsi is behind a Service.</description>
</property>
<!--Container Settings used by Datanode-->
<property>
<name>ozone.container.cache.size</name>

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock.kubernetes;
import io.kubernetes.client.JSON;
import io.kubernetes.client.models.V1PersistentVolume;
import io.kubernetes.client.models.V1PersistentVolumeClaim;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_ISCSI_ADVERTISED_IP;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.junit.Assert;
import org.junit.Test;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
* Test the resource generation of Dynamic Provisioner.
*/
public class TestDynamicProvisioner {
@Test
public void persitenceVolumeBuilder() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setStrings(DFS_CBLOCK_ISCSI_ADVERTISED_IP, "1.2.3.4");
DynamicProvisioner provisioner =
new DynamicProvisioner(conf, null);
String pvc = new String(Files.readAllBytes(
Paths.get(getClass().getResource(
"/dynamicprovisioner/input1-pvc.json").toURI())));
String pv = new String(Files.readAllBytes(
Paths.get(getClass().getResource(
"/dynamicprovisioner/expected1-pv.json").toURI())));
JSON json = new io.kubernetes.client.JSON();
V1PersistentVolumeClaim claim =
json.getGson().fromJson(pvc, V1PersistentVolumeClaim.class);
String volumeName = provisioner.createVolumeName(claim);
V1PersistentVolume volume =
provisioner.persitenceVolumeBuilder(claim, volumeName);
//remove the data which should not been compared
V1PersistentVolume expectedVolume =
json.getGson().fromJson(pv, V1PersistentVolume.class);
Assert.assertEquals(expectedVolume, volume);
}
}

View File

@ -0,0 +1,37 @@
{
"apiVersion": "v1",
"kind": "PersistentVolume",
"metadata": {
"annotations": {
"volume.beta.kubernetes.io/storage-class": "cblock",
"pv.kubernetes.io/provisioned-by": "hadoop.apache.org/cblock"
},
"name": "volume1-b65d053d-f92e-11e7-be3b-84b261c34638",
"namespace": "ns"
},
"spec": {
"accessModes": [
"ReadWriteOnce"
],
"capacity": {
"storage": "1Gi"
},
"claimRef": {
"apiVersion": "v1",
"kind": "PersistentVolumeClaim",
"name": "volume1",
"namespace": "ns",
"uid": "b65d053d-f92e-11e7-be3b-84b261c34638"
},
"iscsi": {
"fsType": "ext4",
"iqn": "iqn.2001-04.org.apache.hadoop:volume1-b65d053d-f92e-11e7-be3b-84b261c34638",
"lun": 0,
"portals": [
"1.2.3.4:3260"
],
"targetPortal": "1.2.3.4:3260"
},
"persistentVolumeReclaimPolicy": "Delete"
}
}

View File

@ -0,0 +1,38 @@
{
"apiVersion": "v1",
"kind": "PersistentVolumeClaim",
"metadata": {
"annotations": {
"pv.kubernetes.io/bind-completed": "yes",
"pv.kubernetes.io/bound-by-controller": "yes",
"volume.beta.kubernetes.io/storage-provisioner": "hadoop.apache.org/cblock"
},
"creationTimestamp": "2018-01-14T13:27:48Z",
"name": "volume1",
"namespace": "ns",
"resourceVersion": "5532691",
"selfLink": "/api/v1/namespaces/demo1/persistentvolumeclaims/persistent",
"uid": "b65d053d-f92e-11e7-be3b-84b261c34638"
},
"spec": {
"accessModes": [
"ReadWriteOnce"
],
"resources": {
"requests": {
"storage": "1Gi"
}
},
"storageClassName": "cblock",
"volumeName": "persistent-b65d053d-f92e-11e7-be3b-84b261c34638"
},
"status": {
"accessModes": [
"ReadWriteOnce"
],
"capacity": {
"storage": "1Gi"
},
"phase": "Bound"
}
}

View File

@ -67,6 +67,12 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>