HDFS-11788. Ozone : add DEBUG CLI support for nodepool db file. Contributed by Chen Liang

This commit is contained in:
Chen Liang 2017-05-10 15:15:22 -07:00
parent 5df9a1d2ca
commit 1b1b74a096
2 changed files with 160 additions and 51 deletions

View File

@ -24,8 +24,10 @@ import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Pipeline;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@ -50,9 +52,15 @@ import java.util.Set;
import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB;
import static org.apache.hadoop.ozone.OzoneConsts.NODEPOOL_DB;
/**
* This is the CLI that can be use to convert a levelDB into a sqlite DB file.
*
* NOTE: user should use this CLI in an offline fashion. Namely, this should not
* be used to convert a levelDB that is currently being used by Ozone. Instead,
* this should be used to debug and diagnosis closed levelDB instances.
*
*/
public class SQLCLI extends Configured implements Tool {
@ -65,10 +73,10 @@ public class SQLCLI extends Configured implements Tool {
"CREATE TABLE containerInfo (" +
"containerName TEXT PRIMARY KEY NOT NULL, " +
"leaderUUID TEXT NOT NULL)";
private static final String CREATE_CONTAINER_MACHINE =
private static final String CREATE_CONTAINER_MEMBERS =
"CREATE TABLE containerMembers (" +
"containerName INTEGER NOT NULL, " +
"datanodeUUID INTEGER NOT NULL," +
"containerName TEXT NOT NULL, " +
"datanodeUUID TEXT NOT NULL," +
"PRIMARY KEY(containerName, datanodeUUID));";
private static final String CREATE_DATANODE_INFO =
"CREATE TABLE datanodeInfo (" +
@ -98,6 +106,16 @@ public class SQLCLI extends Configured implements Tool {
private static final String INSERT_BLOCK_CONTAINER =
"INSERT INTO blockContainer (blockKey, containerName) " +
"VALUES (\"%s\", \"%s\")";
// for nodepool.db
private static final String CREATE_NODE_POOL =
"CREATE TABLE nodePool (" +
"datanodeUUID TEXT NOT NULL," +
"poolName TEXT NOT NULL," +
"PRIMARY KEY(datanodeUUID, poolName))";
private static final String INSERT_NODE_POOL =
"INSERT INTO nodePool (datanodeUUID, poolName) " +
"VALUES (\"%s\", \"%s\")";
// and reuse CREATE_DATANODE_INFO and INSERT_DATANODE_INFO
private static final Logger LOG =
@ -170,6 +188,9 @@ public class SQLCLI extends Configured implements Tool {
} else if (dbName.toString().equals(BLOCK_DB)) {
LOG.info("Converting block DB");
convertBlockDB(dbPath, outPath);
} else if (dbName.toString().equals(NODEPOOL_DB)) {
LOG.info("Converting node pool DB");
convertNodePoolDB(dbPath, outPath);
} else {
LOG.error("Unrecognized db name {}", dbName);
}
@ -183,10 +204,6 @@ public class SQLCLI extends Configured implements Tool {
return DriverManager.getConnection(connectPath);
}
private void closeDB(Connection conn) throws SQLException {
conn.close();
}
private void executeSQL(Connection conn, String sql) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.executeUpdate(sql);
@ -226,24 +243,22 @@ public class SQLCLI extends Configured implements Tool {
LOG.info("Create tables for sql container db.");
File dbFile = dbPath.toFile();
org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
Connection conn = connectDB(outPath.toString())) {
executeSQL(conn, CREATE_CONTAINER_INFO);
executeSQL(conn, CREATE_CONTAINER_MEMBERS);
executeSQL(conn, CREATE_DATANODE_INFO);
Connection conn = connectDB(outPath.toString());
executeSQL(conn, CREATE_CONTAINER_INFO);
executeSQL(conn, CREATE_CONTAINER_MACHINE);
executeSQL(conn, CREATE_DATANODE_INFO);
DBIterator iter = dbStore.getIterator();
iter.seekToFirst();
HashSet<String> uuidChecked = new HashSet<>();
while(iter.hasNext()) {
Map.Entry<byte[], byte[]> entry = iter.next();
String containerName = new String(entry.getKey(), encoding);
Pipeline pipeline = Pipeline.parseFrom(entry.getValue());
insertContainerDB(conn, containerName, pipeline, uuidChecked);
DBIterator iter = dbStore.getIterator();
iter.seekToFirst();
HashSet<String> uuidChecked = new HashSet<>();
while (iter.hasNext()) {
Map.Entry<byte[], byte[]> entry = iter.next();
String containerName = new String(entry.getKey(), encoding);
Pipeline pipeline = Pipeline.parseFrom(entry.getValue());
insertContainerDB(conn, containerName, pipeline, uuidChecked);
}
}
closeDB(conn);
dbStore.close();
}
/**
@ -304,23 +319,79 @@ public class SQLCLI extends Configured implements Tool {
LOG.info("Create tables for sql block db.");
File dbFile = dbPath.toFile();
org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
Connection conn = connectDB(outPath.toString())) {
executeSQL(conn, CREATE_BLOCK_CONTAINER);
Connection conn = connectDB(outPath.toString());
executeSQL(conn, CREATE_BLOCK_CONTAINER);
DBIterator iter = dbStore.getIterator();
iter.seekToFirst();
while (iter.hasNext()) {
Map.Entry<byte[], byte[]> entry = iter.next();
String blockKey = DFSUtilClient.bytes2String(entry.getKey());
String containerName = DFSUtilClient.bytes2String(entry.getValue());
String insertBlockContainer = String.format(
INSERT_BLOCK_CONTAINER, blockKey, containerName);
executeSQL(conn, insertBlockContainer);
DBIterator iter = dbStore.getIterator();
iter.seekToFirst();
while (iter.hasNext()) {
Map.Entry<byte[], byte[]> entry = iter.next();
String blockKey = DFSUtilClient.bytes2String(entry.getKey());
String containerName = DFSUtilClient.bytes2String(entry.getValue());
String insertBlockContainer = String.format(
INSERT_BLOCK_CONTAINER, blockKey, containerName);
executeSQL(conn, insertBlockContainer);
}
}
closeDB(conn);
dbStore.close();
}
/**
* Converts nodePool.db to sqlite. The schema of sql db:
* two tables, nodePool and datanodeInfo (the same datanode Info as for
* container.db).
*
* nodePool
* ---------------------------------------------------------
* datanodeUUID* | poolName*
* ---------------------------------------------------------
*
* datanodeInfo:
* ---------------------------------------------------------
* hostname | datanodeUUid* | xferPort | infoPort | ipcPort
* ---------------------------------------------------------
*
* --------------------------------
* | infoSecurePort | containerPort
* --------------------------------
*
* @param dbPath path to container db.
* @param outPath path to output sqlite
* @throws IOException throws exception.
*/
private void convertNodePoolDB(Path dbPath, Path outPath) throws Exception {
LOG.info("Create table for sql node pool db.");
File dbFile = dbPath.toFile();
org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
Connection conn = connectDB(outPath.toString())) {
executeSQL(conn, CREATE_NODE_POOL);
executeSQL(conn, CREATE_DATANODE_INFO);
DBIterator iter = dbStore.getIterator();
iter.seekToFirst();
while (iter.hasNext()) {
Map.Entry<byte[], byte[]> entry = iter.next();
DatanodeID nodeId = DatanodeID.getFromProtoBuf(
HdfsProtos.DatanodeIDProto.PARSER.parseFrom(entry.getKey()));
String blockPool = DFSUtil.bytes2String(entry.getValue());
insertNodePoolDB(conn, blockPool, nodeId);
}
}
}
private void insertNodePoolDB(Connection conn, String blockPool,
DatanodeID datanodeID) throws SQLException {
String insertNodePool = String.format(INSERT_NODE_POOL,
datanodeID.getDatanodeUuid(), blockPool);
executeSQL(conn, insertNodePool);
String insertDatanodeID = String.format(INSERT_DATANODE_INFO,
datanodeID.getHostName(), datanodeID.getDatanodeUuid(),
datanodeID.getIpAddr(), datanodeID.getXferPort(),
datanodeID.getInfoPort(), datanodeID.getIpcPort(),
datanodeID.getInfoSecurePort(), datanodeID.getContainerPort());
executeSQL(conn, insertDatanodeID);
}
private CommandLine parseArgs(String[] argv)

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.scm;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@ -25,9 +26,12 @@ import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.scm.block.BlockManagerImpl;
import org.apache.hadoop.ozone.scm.cli.SQLCLI;
import org.apache.hadoop.ozone.scm.container.ContainerMapping;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -46,6 +50,7 @@ import java.util.HashMap;
import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB;
import static org.apache.hadoop.ozone.OzoneConsts.KB;
import static org.apache.hadoop.ozone.OzoneConsts.NODEPOOL_DB;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -64,8 +69,8 @@ public class TestContainerSQLCli {
private static NodeManager nodeManager;
private static BlockManagerImpl blockManager;
private static String pipelineName1;
private static String pipelineName2;
private static Pipeline pipeline1;
private static Pipeline pipeline2;
private static HashMap<String, String> blockContainerMap;
@ -78,7 +83,10 @@ public class TestContainerSQLCli {
conf = new OzoneConfiguration();
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 2);
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1)
conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(2
)
.storageCapacities(new long[] {datanodeCapacities, datanodeCapacities})
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageContainerLocationClient =
@ -96,8 +104,8 @@ public class TestContainerSQLCli {
// so the first allocateBlock() will create two containers. A random one
// is assigned for the block.
AllocatedBlock ab1 = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE);
pipelineName1 = ab1.getPipeline().getContainerName();
blockContainerMap.put(ab1.getKey(), pipelineName1);
pipeline1 = ab1.getPipeline();
blockContainerMap.put(ab1.getKey(), pipeline1.getContainerName());
AllocatedBlock ab2;
// we want the two blocks on the two provisioned containers respectively,
@ -108,9 +116,9 @@ public class TestContainerSQLCli {
// the size of blockContainerMap will vary each time the test is run.
while (true) {
ab2 = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE);
pipelineName2 = ab2.getPipeline().getContainerName();
blockContainerMap.put(ab2.getKey(), pipelineName2);
if (!pipelineName2.equals(pipelineName1)) {
pipeline2 = ab2.getPipeline();
blockContainerMap.put(ab2.getKey(), pipeline2.getContainerName());
if (!pipeline1.getContainerName().equals(pipeline2.getContainerName())) {
break;
}
}
@ -149,6 +157,33 @@ public class TestContainerSQLCli {
Files.delete(Paths.get(dbOutPath));
}
@Test
public void testConvertNodepoolDB() throws Exception {
String dbOutPath = cluster.getDataDirectory() + "/out_sql.db";
String dbRootPath = conf.get(OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS);
String dbPath = dbRootPath + "/" + NODEPOOL_DB;
String[] args = {"-p", dbPath, "-o", dbOutPath};
cli.run(args);
// verify the sqlite db
HashMap<String, String> expectedPool = new HashMap<>();
for (DatanodeID dnid : nodeManager.getAllNodes()) {
expectedPool.put(dnid.getDatanodeUuid(), "DefaultNodePool");
}
Connection conn = connectDB(dbOutPath);
String sql = "SELECT * FROM nodePool";
ResultSet rs = executeQuery(conn, sql);
while(rs.next()) {
String datanodeUUID = rs.getString("datanodeUUID");
String poolName = rs.getString("poolName");
assertTrue(expectedPool.remove(datanodeUUID).equals(poolName));
}
assertEquals(0, expectedPool.size());
Files.delete(Paths.get(dbOutPath));
}
@Test
public void testConvertContainerDB() throws Exception {
String dbOutPath = cluster.getDataDirectory() + "/out_sql.db";
@ -175,8 +210,8 @@ public class TestContainerSQLCli {
//assertEquals(dnUUID, rs.getString("leaderUUID"));
}
assertTrue(containerNames.size() == 2 &&
containerNames.contains(pipelineName1) &&
containerNames.contains(pipelineName2));
containerNames.contains(pipeline1.getContainerName()) &&
containerNames.contains(pipeline2.getContainerName()));
sql = "SELECT * FROM containerMembers";
rs = executeQuery(conn, sql);
@ -186,8 +221,8 @@ public class TestContainerSQLCli {
//assertEquals(dnUUID, rs.getString("datanodeUUID"));
}
assertTrue(containerNames.size() == 2 &&
containerNames.contains(pipelineName1) &&
containerNames.contains(pipelineName2));
containerNames.contains(pipeline1.getContainerName()) &&
containerNames.contains(pipeline2.getContainerName()));
sql = "SELECT * FROM datanodeInfo";
rs = executeQuery(conn, sql);
@ -197,7 +232,10 @@ public class TestContainerSQLCli {
//assertEquals(dnUUID, rs.getString("datanodeUUID"));
count += 1;
}
assertEquals(1, count);
// the two containers maybe on the same datanode, maybe not.
int expected = pipeline1.getLeader().getDatanodeUuid().equals(
pipeline2.getLeader().getDatanodeUuid())? 1 : 2;
assertEquals(expected, count);
Files.delete(Paths.get(dbOutPath));
}