HDFS-9449. DiskBalancer: Add connectors. Contributed by Anu Engineer

Authored by Tsz-Wo Nicholas Sze on 2015-12-01 14:43:06 -08:00; committed by Arpit Agarwal
parent 91a5c48143
commit 30c6ebd699
6 changed files with 386 additions and 1 deletion


@@ -3,3 +3,5 @@ HDFS-1312 Change Log
NEW FEATURES
HDFS-9420. Add DataModels for DiskBalancer. (Anu Engineer via szetszwo)
HDFS-9449. DiskBalancer: Add connectors. (Anu Engineer via szetszwo)


@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* Connector factory creates appropriate connector based on the URL.
*/
public final class ConnectorFactory {
static final Log LOG = LogFactory.getLog(ConnectorFactory.class);
/**
* Constructs an appropriate connector based on the URL.
* @param clusterURI - URL
* @return ClusterConnector
*/
public static ClusterConnector getCluster(URI clusterURI, Configuration
conf) throws IOException, URISyntaxException {
LOG.info("Cluster URI : " + clusterURI);
LOG.info("scheme : " + clusterURI.getScheme());
if (clusterURI.getScheme().startsWith("file")) {
LOG.info("Creating a JsonNodeConnector");
return new JsonNodeConnector(clusterURI.toURL());
} else {
LOG.info("Creating NameNode connector");
return new DBNameNodeConnector(clusterURI, conf);
}
}
private ConnectorFactory() {
// never constructed
}
}
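
Below is a minimal usage sketch of the factory above; it is not part of this patch. The class name, URIs, and paths are illustrative, and only getCluster(), getConnectorInfo(), and getNodes() from this change are assumed. The URI scheme decides which connector is returned.

import java.net.URI;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;

/** Illustrative sketch, not part of this patch. */
public class ConnectorFactoryUsage {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();

    // A file: URI dispatches to JsonNodeConnector; the path is hypothetical.
    ClusterConnector fromJson =
        ConnectorFactory.getCluster(new URI("file:///tmp/cluster.json"), conf);

    // Any other scheme, e.g. hdfs://, dispatches to DBNameNodeConnector;
    // the authority is hypothetical.
    ClusterConnector fromNameNode =
        ConnectorFactory.getCluster(new URI("hdfs://namenode:8020"), conf);

    System.out.println(fromNameNode.getConnectorInfo());
    List<DiskBalancerDataNode> nodes = fromNameNode.getNodes();
    System.out.println("Live datanodes reported: " + nodes.size());
  }
}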


@@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.LinkedList;
import java.util.List;
/**
* DBNameNodeConnector connects to Namenode and extracts information from a
* given cluster.
*/
class DBNameNodeConnector implements ClusterConnector {
static final Log LOG = LogFactory.getLog(DBNameNodeConnector.class);
static final Path DISKBALANCER_ID_PATH = new Path("/system/diskbalancer.id");
private final URI clusterURI;
private final NameNodeConnector connector;
/**
* Constructs a DBNameNodeConnector.
*
* @param clusterURI - URL to connect to.
*/
public DBNameNodeConnector(URI clusterURI, Configuration conf) throws
IOException, URISyntaxException {
// we don't care how many instances of disk balancers run.
// The admission is controlled at the data node, where we will
// execute only one plan at a given time.
NameNodeConnector.setWrite2IdFile(false);
try {
connector = new NameNodeConnector("DiskBalancer",
clusterURI, DISKBALANCER_ID_PATH, null, conf, 1);
} catch (IOException ex) {
LOG.error("Unable to connect to NameNode " + ex.toString());
throw ex;
}
this.clusterURI = clusterURI;
}
/**
* getNodes function returns a list of DiskBalancerDataNodes.
*
* @return Array of DiskBalancerDataNodes
*/
@Override
public List<DiskBalancerDataNode> getNodes() throws Exception {
Preconditions.checkNotNull(this.connector);
List<DiskBalancerDataNode> nodeList = new LinkedList<>();
DatanodeStorageReport[] reports = this.connector
.getLiveDatanodeStorageReport();
for (DatanodeStorageReport report : reports) {
DiskBalancerDataNode datanode = getBalancerNodeFromDataNode(
report.getDatanodeInfo());
getVolumeInfoFromStorageReports(datanode, report.getStorageReports());
nodeList.add(datanode);
}
return nodeList;
}
/**
* Returns info about the connector.
*
* @return String.
*/
@Override
public String getConnectorInfo() {
return "Name Node Connector : " + clusterURI.toString();
}
/**
* This function maps the required fields from DataNodeInfo to disk
* BalancerDataNode.
*
* @param nodeInfo
* @return DiskBalancerDataNode
*/
private DiskBalancerDataNode
getBalancerNodeFromDataNode(DatanodeInfo nodeInfo) {
Preconditions.checkNotNull(nodeInfo);
DiskBalancerDataNode dbDataNode = new DiskBalancerDataNode(nodeInfo
.getDatanodeUuid());
dbDataNode.setDataNodeIP(nodeInfo.getIpAddr());
dbDataNode.setDataNodeName(nodeInfo.getHostName());
dbDataNode.setDataNodePort(nodeInfo.getIpcPort());
return dbDataNode;
}
/**
* Reads the relevant fields from each storage volume and populate the
* DiskBalancer Node.
*
* @param node - Disk Balancer Node
* @param reports - Array of StorageReport
*/
private void getVolumeInfoFromStorageReports(DiskBalancerDataNode node,
StorageReport[] reports)
throws Exception {
Preconditions.checkNotNull(node);
Preconditions.checkNotNull(reports);
for (StorageReport report : reports) {
DatanodeStorage storage = report.getStorage();
DiskBalancerVolume volume = new DiskBalancerVolume();
volume.setCapacity(report.getCapacity());
volume.setFailed(report.isFailed());
volume.setUsed(report.getDfsUsed());
// TODO : Should we do BlockPool level balancing at all ?
// Does it make sense ? Balancer does do that. Right now
// we only deal with volumes and not blockPools
volume.setUuid(storage.getStorageID());
// we will skip this volume for disk balancer if
// it is read-only since we will not be able to delete
// or if it is already failed.
volume.setSkip((storage.getState() == DatanodeStorage.State
.READ_ONLY_SHARED) || report.isFailed());
volume.setStorageType(storage.getStorageType().name());
volume.setIsTransient(storage.getStorageType().isTransient());
//volume.setPath(storage.getVolumePath());
node.addVolume(volume);
}
}
}
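
Below is a hedged sketch of how this connector is meant to be consumed, mirroring the TestConnectors test added later in this patch: the connector is handed to a DiskBalancerCluster (a data model class from HDFS-9420), and readClusterInfo() presumably drives getNodes() above. The class name and the hdfs:// authority are illustrative.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;

/** Illustrative sketch, not part of this patch. */
public class NameNodeConnectorUsage {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();

    // hdfs:// routes through the factory to DBNameNodeConnector;
    // the authority is hypothetical.
    ClusterConnector connector =
        ConnectorFactory.getCluster(new URI("hdfs://namenode:8020"), conf);

    DiskBalancerCluster cluster = new DiskBalancerCluster(connector);
    cluster.readClusterInfo();

    for (DiskBalancerDataNode node : cluster.getNodes()) {
      // getVolumeCount() is the same accessor the new test relies on.
      System.out.println("Volumes on node: " + node.getVolumeCount());
    }
  }
}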


@@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
.DiskBalancerDataNode;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.File;
import java.net.URL;
import java.util.List;
/**
* A connector that understands JSON data cluster models.
*/
public class JsonNodeConnector implements ClusterConnector {
static final Log LOG = LogFactory.getLog(JsonNodeConnector.class);
private final URL clusterURI;
/**
* Constructs a JsonNodeConnector.
* @param clusterURI - A file URL that contains cluster information.
*/
public JsonNodeConnector(URL clusterURI) {
this.clusterURI = clusterURI;
}
/**
* getNodes function connects to a cluster definition file
* and returns nodes defined in that file.
*
* @return Array of DiskBalancerDataNodes
*/
@Override
public List<DiskBalancerDataNode> getNodes() throws Exception {
Preconditions.checkNotNull(this.clusterURI);
String dataFilePath = this.clusterURI.getPath();
LOG.info("Reading cluster info from file : " + dataFilePath);
ObjectMapper mapper = new ObjectMapper();
DiskBalancerCluster cluster =
mapper.readValue(new File(dataFilePath), DiskBalancerCluster.class);
String message = String.format("Found %d node(s)",
cluster.getNodes().size());
LOG.info(message);
return cluster.getNodes();
}
/**
* Returns info about the connector.
*
* @return String.
*/
@Override
public String getConnectorInfo() {
return "Json Cluster Connector : Connects to a JSON file that describes a" +
" cluster : " + clusterURI.toString();
}
}
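
Below is a hedged sketch of one way to produce a JSON model that this connector can read; it is not part of this patch. A DiskBalancerCluster snapshot is serialized with toJson() (the same method the new test exercises), written to a local file, and the file: URI is handed back to ConnectorFactory, which routes it to JsonNodeConnector. The class name and path are illustrative.

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;

/** Illustrative sketch, not part of this patch. */
public class JsonModelRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();

    // Snapshot a live cluster; the hdfs:// authority is hypothetical.
    ClusterConnector live =
        ConnectorFactory.getCluster(new URI("hdfs://namenode:8020"), conf);
    DiskBalancerCluster model = new DiskBalancerCluster(live);
    model.readClusterInfo();

    // Persist the model as JSON; the path is hypothetical.
    Files.write(Paths.get("/tmp/cluster.json"),
        model.toJson().getBytes(StandardCharsets.UTF_8));

    // A file: URI now comes back as a JsonNodeConnector.
    ClusterConnector fromFile =
        ConnectorFactory.getCluster(new URI("file:///tmp/cluster.json"), conf);
    System.out.println(fromFile.getConnectorInfo());
  }
}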


@@ -21,7 +21,15 @@ package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
* Connectors package is a set of logical connectors that connect
* to various data sources to read the hadoop cluster information.
*
* We currently have 1 connector in this package. it is
* We currently have 3 connectors in this package. They are
*
* DBNameNodeConnector - This uses the connector from the original
* balancer package to connect to a real hadoop cluster.
*
* JsonNodeConnector - This connects to a file and reads the data about a
* cluster. We can generate a cluster json from a real cluster using
* the diskBalancer tool or hand-craft it. There are some sample Json files
* checked in under test/resources/diskBalancer directory.
*
* NullConnector - This is an in-memory connector that is useful in testing.
* we can create dataNodes on the fly and attach to this connector and


@@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdfs.server.diskbalancer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class TestConnectors {
private MiniDFSCluster cluster;
final int numDatanodes = 3;
final int volumeCount = 2; // default volumes in MiniDFSCluster.
Configuration conf;
@Before
public void setup() throws IOException {
conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes).build();
}
@After
public void teardown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void TestNameNodeConnector() throws Exception {
cluster.waitActive();
ClusterConnector nameNodeConnector =
ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster
(nameNodeConnector);
diskBalancerCluster.readClusterInfo();
Assert.assertEquals("Expected number of Datanodes not found.",
numDatanodes, diskBalancerCluster.getNodes().size());
Assert.assertEquals("Expected number of volumes not found.",
volumeCount, diskBalancerCluster.getNodes().get(0).getVolumeCount());
}
@Test
public void TestJsonConnector() throws Exception {
cluster.waitActive();
ClusterConnector nameNodeConnector =
ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster
(nameNodeConnector);
diskBalancerCluster.readClusterInfo();
String diskBalancerJson = diskBalancerCluster.toJson();
DiskBalancerCluster serializedCluster = DiskBalancerCluster.parseJson
(diskBalancerJson);
Assert.assertEquals("Parsed cluster is not equal to persisted info.",
diskBalancerCluster.getNodes().size(), serializedCluster.getNodes()
.size());
}
}