HDFS-12187. Ozone : add support to DEBUG CLI for ksm.db. Contributed by Chen Liang.

This commit is contained in:
Weiwei Yang 2017-07-25 09:43:19 +08:00 committed by Owen O'Malley
parent b00830142b
commit 539842ed8b
2 changed files with 475 additions and 0 deletions

View File

@ -27,6 +27,11 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.OzoneAclInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.BucketInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Pipeline;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.util.Tool;
@ -51,6 +56,9 @@ import java.util.Set;
import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB;
import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
import static org.apache.hadoop.ozone.OzoneConsts.KSM_USER_PREFIX;
import static org.apache.hadoop.ozone.OzoneConsts.KSM_VOLUME_PREFIX;
import static org.apache.hadoop.ozone.OzoneConsts.NODEPOOL_DB;
import static org.apache.hadoop.ozone.OzoneConsts.OPEN_CONTAINERS_DB;
@ -125,6 +133,68 @@ public class SQLCLI extends Configured implements Tool {
"INSERT INTO openContainer (containerName, containerUsed) " +
"VALUES (\"%s\", \"%s\")";
// for ksm.db
private static final String CREATE_VOLUME_LIST =
"CREATE TABLE volumeList (" +
"userName TEXT NOT NULL," +
"volumeName TEXT NOT NULL," +
"PRIMARY KEY (userName, volumeName))";
private static final String INSERT_VOLUME_LIST =
"INSERT INTO volumeList (userName, volumeName) " +
"VALUES (\"%s\", \"%s\")";
private static final String CREATE_VOLUME_INFO =
"CREATE TABLE volumeInfo (" +
"adminName TEXT NOT NULL," +
"ownerName TEXT NOT NULL," +
"volumeName TEXT NOT NULL," +
"PRIMARY KEY (adminName, ownerName, volumeName))";
private static final String INSERT_VOLUME_INFO =
"INSERT INTO volumeInfo (adminName, ownerName, volumeName) " +
"VALUES (\"%s\", \"%s\", \"%s\")";
private static final String CREATE_ACL_INFO =
"CREATE TABLE aclInfo (" +
"adminName TEXT NOT NULL," +
"ownerName TEXT NOT NULL," +
"volumeName TEXT NOT NULL," +
"type TEXT NOT NULL," +
"userName TEXT NOT NULL," +
"rights TEXT NOT NULL," +
"FOREIGN KEY (adminName, ownerName, volumeName, userName, type)" +
"REFERENCES " +
"volumeInfo(adminName, ownerName, volumeName, userName, type)" +
"PRIMARY KEY (adminName, ownerName, volumeName, userName, type))";
private static final String INSERT_ACL_INFO =
"INSERT INTO aclInfo (adminName, ownerName, volumeName, type, " +
"userName, rights) " +
"VALUES (\"%s\", \"%s\", \"%s\", \"%s\", \"%s\", \"%s\")";
private static final String CREATE_BUCKET_INFO =
"CREATE TABLE bucketInfo (" +
"volumeName TEXT NOT NULL," +
"bucketName TEXT NOT NULL," +
"versionEnabled BOOLEAN NOT NULL," +
"storageType TEXT," +
"PRIMARY KEY (volumeName, bucketName))";
private static final String INSERT_BUCKET_INFO =
"INSERT INTO bucketInfo(volumeName, bucketName, " +
"versionEnabled, storageType)" +
"VALUES (\"%s\", \"%s\", \"%s\", \"%s\")";
private static final String CREATE_KEY_INFO =
"CREATE TABLE keyInfo (" +
"volumeName TEXT NOT NULL," +
"bucketName TEXT NOT NULL," +
"keyName TEXT NOT NULL," +
"dataSize INTEGER," +
"blockKey TEXT NOT NULL," +
"containerName TEXT NOT NULL," +
"PRIMARY KEY (volumeName, bucketName, keyName))";
private static final String INSERT_KEY_INFO =
"INSERT INTO keyInfo (volumeName, bucketName, keyName, dataSize, " +
"blockKey, containerName)" +
"VALUES (\"%s\", \"%s\", \"%s\", \"%s\", \"%s\", \"%s\")";
private static final Logger LOG =
LoggerFactory.getLogger(SQLCLI.class);
@ -203,6 +273,9 @@ public class SQLCLI extends Configured implements Tool {
} else if (dbName.toString().equals(OPEN_CONTAINERS_DB)) {
LOG.info("Converting open container DB");
convertOpenContainerDB(dbPath, outPath);
} else if (dbName.toString().equals(KSM_DB_NAME)) {
LOG.info("Converting ksm DB");
convertKSMDB(dbPath, outPath);
} else {
LOG.error("Unrecognized db name {}", dbName);
}
@ -222,6 +295,146 @@ public class SQLCLI extends Configured implements Tool {
}
}
/**
* Convert ksm.db to sqlite db file. With following schema.
* (* for primary key)
*
* 1. for key type USER, it contains a username and a list volumes
* volumeList
* --------------------------------
* userName* | volumeName*
* --------------------------------
*
* 2. for key type VOLUME:
*
* volumeInfo
* ----------------------------------------------
* adminName | ownerName* | volumeName* | aclID
* ----------------------------------------------
*
* aclInfo
* ----------------------------------------------
* aclEntryID* | type* | userName* | rights
* ----------------------------------------------
*
* 3. for key type BUCKET
* bucketInfo
* --------------------------------------------------------
* volumeName* | bucketName* | versionEnabled | storageType
* --------------------------------------------------------
*
* TODO : the following table will be changed when key partition is added.
* Only has the minimum entries for test purpose now.
* 4. for key type KEY
* -----------------------------------------------
* volumeName* | bucketName* | keyName* | dataSize
* -----------------------------------------------
*
*
*
* @param dbPath
* @param outPath
* @throws Exception
*/
private void convertKSMDB(Path dbPath, Path outPath) throws Exception {
LOG.info("Create tables for sql ksm db.");
File dbFile = dbPath.toFile();
try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
.setDbFile(dbFile).build();
Connection conn = connectDB(outPath.toString())) {
executeSQL(conn, CREATE_VOLUME_LIST);
executeSQL(conn, CREATE_VOLUME_INFO);
executeSQL(conn, CREATE_ACL_INFO);
executeSQL(conn, CREATE_BUCKET_INFO);
executeSQL(conn, CREATE_KEY_INFO);
dbStore.iterate(null, (key, value) -> {
String keyString = DFSUtilClient.bytes2String(key);
KeyType type = getKeyType(keyString);
try {
insertKSMDB(conn, type, keyString, value);
} catch (IOException | SQLException ex) {
LOG.error("Exception inserting key {}", keyString, ex);
}
return true;
});
}
}
private void insertKSMDB(Connection conn, KeyType type, String keyName,
byte[] value) throws IOException, SQLException {
switch (type) {
case USER:
VolumeList volumeList = VolumeList.parseFrom(value);
for (String volumeName : volumeList.getVolumeNamesList()) {
String insertVolumeList =
String.format(INSERT_VOLUME_LIST, keyName, volumeName);
executeSQL(conn, insertVolumeList);
}
break;
case VOLUME:
VolumeInfo volumeInfo = VolumeInfo.parseFrom(value);
String adminName = volumeInfo.getAdminName();
String ownerName = volumeInfo.getOwnerName();
String volumeName = volumeInfo.getVolume();
String insertVolumeInfo =
String.format(INSERT_VOLUME_INFO, adminName, ownerName, volumeName);
executeSQL(conn, insertVolumeInfo);
for (OzoneAclInfo aclInfo : volumeInfo.getVolumeAclsList()) {
String insertAclInfo =
String.format(INSERT_ACL_INFO, adminName, ownerName, volumeName,
aclInfo.getType(), aclInfo.getName(), aclInfo.getRights());
executeSQL(conn, insertAclInfo);
}
break;
case BUCKET:
BucketInfo bucketInfo = BucketInfo.parseFrom(value);
String insertBucketInfo =
String.format(INSERT_BUCKET_INFO, bucketInfo.getVolumeName(),
bucketInfo.getBucketName(), bucketInfo.getIsVersionEnabled(),
bucketInfo.getStorageType());
executeSQL(conn, insertBucketInfo);
break;
case KEY:
KeyInfo keyInfo = KeyInfo.parseFrom(value);
String insertKeyInfo =
String.format(INSERT_KEY_INFO, keyInfo.getVolumeName(),
keyInfo.getBucketName(), keyInfo.getKeyName(),
keyInfo.getDataSize(), keyInfo.getBlockKey(),
keyInfo.getContainerName());
executeSQL(conn, insertKeyInfo);
break;
default:
throw new IOException("Unknown key from ksm.db");
}
}
private KeyType getKeyType(String key) {
if (key.startsWith(KSM_USER_PREFIX)) {
return KeyType.USER;
} else {
int count = key.length() - key.replace(KSM_VOLUME_PREFIX, "").length();
// NOTE : when delimiter gets changed, will need to change this part
if (count == 1) {
return KeyType.VOLUME;
} else if (count == 2) {
return KeyType.BUCKET;
} else if (count >= 3) {
return KeyType.KEY;
} else {
return KeyType.UNKNOWN;
}
}
}
private enum KeyType {
USER,
VOLUME,
BUCKET,
KEY,
UNKNOWN
}
/**
* Convert container.db to sqlite. The schema of sql db:
* three tables, containerId, containerMachines, datanodeInfo

View File

@ -0,0 +1,262 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.ksm;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.scm.cli.SQLCLI;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* This class tests the CLI that transforms ksm.db into SQLite DB files.
*/
public class TestKSMSQLCli {
private static MiniOzoneCluster cluster = null;
private static StorageHandler storageHandler;
private static UserArgs userArgs;
private static OzoneConfiguration conf;
private static SQLCLI cli;
private static String userName = "userTest";
private static String adminName = "adminTest";
private static String volumeName0 = "volumeTest0";
private static String volumeName1 = "volumeTest1";
private static String bucketName0 = "bucketTest0";
private static String bucketName1 = "bucketTest1";
private static String bucketName2 = "bucketTest2";
private static String keyName0 = "key0";
private static String keyName1 = "key1";
private static String keyName2 = "key2";
private static String keyName3 = "key3";
@Rule
public ExpectedException exception = ExpectedException.none();
/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true and
* OZONE_HANDLER_TYPE_KEY = "distributed"
*
* @throws IOException
*/
@BeforeClass
public static void setup() throws Exception {
conf = new OzoneConfiguration();
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
null, null, null, null);
cluster.waitForHeartbeatProcessed();
VolumeArgs createVolumeArgs0 = new VolumeArgs(volumeName0, userArgs);
createVolumeArgs0.setUserName(userName);
createVolumeArgs0.setAdminName(adminName);
storageHandler.createVolume(createVolumeArgs0);
VolumeArgs createVolumeArgs1 = new VolumeArgs(volumeName1, userArgs);
createVolumeArgs1.setUserName(userName);
createVolumeArgs1.setAdminName(adminName);
storageHandler.createVolume(createVolumeArgs1);
BucketArgs bucketArgs0 = new BucketArgs(volumeName0, bucketName0, userArgs);
storageHandler.createBucket(bucketArgs0);
BucketArgs bucketArgs1 = new BucketArgs(volumeName1, bucketName1, userArgs);
storageHandler.createBucket(bucketArgs1);
BucketArgs bucketArgs2 = new BucketArgs(volumeName0, bucketName2, userArgs);
storageHandler.createBucket(bucketArgs2);
KeyArgs keyArgs0 =
new KeyArgs(volumeName0, bucketName0, keyName0, userArgs);
keyArgs0.setSize(100);
KeyArgs keyArgs1 =
new KeyArgs(volumeName1, bucketName1, keyName1, userArgs);
keyArgs1.setSize(200);
KeyArgs keyArgs2 =
new KeyArgs(volumeName0, bucketName2, keyName2, userArgs);
keyArgs2.setSize(300);
KeyArgs keyArgs3 =
new KeyArgs(volumeName0, bucketName2, keyName3, userArgs);
keyArgs3.setSize(400);
OutputStream stream = storageHandler.newKeyWriter(keyArgs0);
stream.close();
stream = storageHandler.newKeyWriter(keyArgs1);
stream.close();
stream = storageHandler.newKeyWriter(keyArgs2);
stream.close();
stream = storageHandler.newKeyWriter(keyArgs3);
stream.close();
cluster.shutdown();
}
@Before
public void init() throws Exception {
cli = new SQLCLI();
}
@Test
public void testKSMDB() throws Exception {
String dbOutPath = cluster.getDataDirectory() + "/out_sql.db";
String dbRootPath = conf.get(OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS);
String dbPath = dbRootPath + "/" + KSM_DB_NAME;
String[] args = {"-p", dbPath, "-o", dbOutPath};
cli.run(args);
Connection conn = connectDB(dbOutPath);
String sql = "SELECT * FROM volumeList";
ResultSet rs = executeQuery(conn, sql);
List<String> expectedValues =
new LinkedList<>(Arrays.asList(volumeName0, volumeName1));
while (rs.next()) {
String userNameRs = rs.getString("userName");
String volumeNameRs = rs.getString("volumeName");
assertEquals(userName, userNameRs.substring(1));
assertTrue(expectedValues.remove(volumeNameRs));
}
assertEquals(0, expectedValues.size());
sql = "SELECT * FROM volumeInfo";
rs = executeQuery(conn, sql);
expectedValues =
new LinkedList<>(Arrays.asList(volumeName0, volumeName1));
while (rs.next()) {
String adName = rs.getString("adminName");
String ownerName = rs.getString("ownerName");
String volumeName = rs.getString("volumeName");
assertEquals(adminName, adName);
assertEquals(userName, ownerName);
assertTrue(expectedValues.remove(volumeName));
}
assertEquals(0, expectedValues.size());
sql = "SELECT * FROM aclInfo";
rs = executeQuery(conn, sql);
expectedValues =
new LinkedList<>(Arrays.asList(volumeName0, volumeName1));
while (rs.next()) {
String adName = rs.getString("adminName");
String ownerName = rs.getString("ownerName");
String volumeName = rs.getString("volumeName");
String type = rs.getString("type");
String uName = rs.getString("userName");
String rights = rs.getString("rights");
assertEquals(adminName, adName);
assertEquals(userName, ownerName);
assertEquals("USER", type);
assertEquals(userName, uName);
assertEquals("READ_WRITE", rights);
assertTrue(expectedValues.remove(volumeName));
}
assertEquals(0, expectedValues.size());
sql = "SELECT * FROM bucketInfo";
rs = executeQuery(conn, sql);
HashMap<String, String> expectedMap = new HashMap<>();
expectedMap.put(bucketName0, volumeName0);
expectedMap.put(bucketName2, volumeName0);
expectedMap.put(bucketName1, volumeName1);
while (rs.next()) {
String volumeName = rs.getString("volumeName");
String bucketName = rs.getString("bucketName");
boolean versionEnabled = rs.getBoolean("versionEnabled");
String storegeType = rs.getString("storageType");
assertEquals(volumeName, expectedMap.remove(bucketName));
assertFalse(versionEnabled);
assertEquals("DISK", storegeType);
}
assertEquals(0, expectedMap.size());
sql = "SELECT * FROM keyInfo";
rs = executeQuery(conn, sql);
HashMap<String, List<String>> expectedMap2 = new HashMap<>();
expectedMap2.put(keyName0,
Arrays.asList(volumeName0, bucketName0, Integer.toString(100)));
expectedMap2.put(keyName1,
Arrays.asList(volumeName1, bucketName1, Integer.toString(200)));
expectedMap2.put(keyName2,
Arrays.asList(volumeName0, bucketName2, Integer.toString(300)));
expectedMap2.put(keyName3,
Arrays.asList(volumeName0, bucketName2, Integer.toString(400)));
while (rs.next()) {
String volumeName = rs.getString("volumeName");
String bucketName = rs.getString("bucketName");
String keyName = rs.getString("keyName");
int dataSize = rs.getInt("dataSize");
List<String> vals = expectedMap2.remove(keyName);
assertNotNull(vals);
assertEquals(vals.get(0), volumeName);
assertEquals(vals.get(1), bucketName);
assertEquals(vals.get(2), Integer.toString(dataSize));
}
assertEquals(0, expectedMap2.size());
conn.close();
Files.delete(Paths.get(dbOutPath));
}
private ResultSet executeQuery(Connection conn, String sql)
throws SQLException {
Statement stmt = conn.createStatement();
return stmt.executeQuery(sql);
}
private Connection connectDB(String dbPath) throws Exception {
Class.forName("org.sqlite.JDBC");
String connectPath =
String.format("jdbc:sqlite:%s", dbPath);
return DriverManager.getConnection(connectPath);
}
}