HDFS-10631. Federation State Store ZooKeeper implementation. Contributed by Jason Kace and Inigo Goiri.

(cherry picked from commit 23c4ddee11)
This commit is contained in:
Inigo Goiri 2017-08-21 11:40:41 -07:00
parent 346c9fce43
commit 7cb6bdf09e
5 changed files with 432 additions and 1 deletions

View File

@ -203,6 +203,15 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -30,6 +30,11 @@ import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
*/
public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl {
/** Mark for slashes in path names. */
protected static final String SLASH_MARK = "0SLASH0";
/** Mark for colon in path names. */
protected static final String COLON_MARK = "_";
/** Default serializer for this driver. */
private StateStoreSerializer serializer;
@ -74,4 +79,18 @@ public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl {
String data, Class<T> clazz, boolean includeDates) throws IOException {
return serializer.deserialize(data, clazz);
}
/**
* Get the primary key for a record. If we don't want to store in folders, we
* need to remove / from the name.
*
* @param record Record to get the primary key for.
* @return Primary key for the record.
*/
protected static String getPrimaryKey(BaseRecord record) {
String primaryKey = record.getPrimaryKey();
primaryKey = primaryKey.replaceAll("/", SLASH_MARK);
primaryKey = primaryKey.replaceAll(":", COLON_MARK);
return primaryKey;
}
}

View File

@ -0,0 +1,298 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordName;
import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@link StateStoreDriver} driver implementation that uses ZooKeeper as a
* backend.
* <p>
* The structure of the znodes in the ensemble is:
* PARENT_PATH
* |--- MOUNT
* |--- MEMBERSHIP
* |--- REBALANCER
* |--- ROUTERS
*/
public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
private static final Logger LOG =
LoggerFactory.getLogger(StateStoreZooKeeperImpl.class);
/** Configuration keys. */
public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX =
DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk.";
public static final String FEDERATION_STORE_ZK_PARENT_PATH =
FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path";
public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT =
"/hdfs-federation";
/** Directory to store the state store data. */
private String baseZNode;
/** Interface to ZooKeeper. */
private ZKCuratorManager zkManager;
@Override
public boolean initDriver() {
LOG.info("Initializing ZooKeeper connection");
Configuration conf = getConf();
baseZNode = conf.get(
FEDERATION_STORE_ZK_PARENT_PATH,
FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
try {
this.zkManager = new ZKCuratorManager(conf);
this.zkManager.start();
} catch (IOException e) {
LOG.error("Cannot initialize the ZK connection", e);
return false;
}
return true;
}
@Override
public <T extends BaseRecord> boolean initRecordStorage(
String className, Class<T> clazz) {
try {
String checkPath = getNodePath(baseZNode, className);
zkManager.createRootDirRecursively(checkPath);
return true;
} catch (Exception e) {
LOG.error("Cannot initialize ZK node for {}: {}",
className, e.getMessage());
return false;
}
}
@Override
public void close() throws Exception {
if (zkManager != null) {
zkManager.close();
}
}
@Override
public boolean isDriverReady() {
return zkManager != null;
}
@Override
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
throws IOException {
return get(clazz, (String)null);
}
@Override
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
throws IOException {
verifyDriverReady();
List<T> ret = new ArrayList<>();
String znode = getZNodeForClass(clazz);
try {
List<String> children = zkManager.getChildren(znode);
for (String child : children) {
try {
String path = getNodePath(znode, child);
Stat stat = new Stat();
String data = zkManager.getStringData(path, stat);
boolean corrupted = false;
if (data == null || data.equals("")) {
// All records should have data, otherwise this is corrupted
corrupted = true;
} else {
try {
T record = createRecord(data, stat, clazz);
ret.add(record);
} catch (IOException e) {
LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
clazz.getSimpleName(), data, e.getMessage());
corrupted = true;
}
}
if (corrupted) {
LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
child, path);
zkManager.delete(path);
}
} catch (Exception e) {
LOG.error("Cannot get data for {}: {}", child, e.getMessage());
}
}
} catch (Exception e) {
String msg = "Cannot get children for \"" + znode + "\": " +
e.getMessage();
LOG.error(msg);
throw new IOException(msg);
}
return new QueryResult<T>(ret, getTime());
}
@Override
public <T extends BaseRecord> boolean putAll(
List<T> records, boolean update, boolean error) throws IOException {
verifyDriverReady();
if (records.isEmpty()) {
return true;
}
// All records should be the same
T record0 = records.get(0);
Class<? extends BaseRecord> recordClass = record0.getClass();
String znode = getZNodeForClass(recordClass);
boolean status = true;
for (T record : records) {
String primaryKey = getPrimaryKey(record);
String recordZNode = getNodePath(znode, primaryKey);
byte[] data = serialize(record);
if (!writeNode(recordZNode, data, update, error)){
status = false;
}
}
return status;
}
@Override
public <T extends BaseRecord> int remove(
Class<T> clazz, Query<T> query) throws IOException {
verifyDriverReady();
if (query == null) {
return 0;
}
// Read the current data
List<T> records = null;
try {
QueryResult<T> result = get(clazz);
records = result.getRecords();
} catch (IOException ex) {
LOG.error("Cannot get existing records", ex);
return 0;
}
// Check the records to remove
String znode = getZNodeForClass(clazz);
List<T> recordsToRemove = filterMultiple(query, records);
// Remove the records
int removed = 0;
for (T existingRecord : recordsToRemove) {
LOG.info("Removing \"{}\"", existingRecord);
try {
String primaryKey = getPrimaryKey(existingRecord);
String path = getNodePath(znode, primaryKey);
if (zkManager.delete(path)) {
removed++;
} else {
LOG.error("Did not remove \"{}\"", existingRecord);
}
} catch (Exception e) {
LOG.error("Cannot remove \"{}\"", existingRecord, e);
}
}
return removed;
}
@Override
public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
throws IOException {
boolean status = true;
String znode = getZNodeForClass(clazz);
LOG.info("Deleting all children under {}", znode);
try {
List<String> children = zkManager.getChildren(znode);
for (String child : children) {
String path = getNodePath(znode, child);
LOG.info("Deleting {}", path);
zkManager.delete(path);
}
} catch (Exception e) {
LOG.error("Cannot remove {}: {}", znode, e.getMessage());
status = false;
}
return status;
}
private boolean writeNode(
String znode, byte[] bytes, boolean update, boolean error) {
try {
boolean created = zkManager.create(znode);
if (!update && !created && error) {
LOG.info("Cannot write record \"{}\", it already exists", znode);
return false;
}
// Write data
zkManager.setData(znode, bytes, -1);
return true;
} catch (Exception e) {
LOG.error("Cannot write record \"{}\": {}", znode, e.getMessage());
}
return false;
}
/**
* Get the ZNode for a class.
*
* @param clazz Record class to evaluate.
* @return The ZNode for the class.
*/
private <T extends BaseRecord> String getZNodeForClass(Class<T> clazz) {
String className = getRecordName(clazz);
return getNodePath(baseZNode, className);
}
/**
* Creates a record from a string returned by ZooKeeper.
*
* @param source Object from ZooKeeper.
* @param clazz The data record type to create.
* @return The created record.
* @throws IOException
*/
private <T extends BaseRecord> T createRecord(
String data, Stat stat, Class<T> clazz) throws IOException {
T record = newRecord(data, clazz, false);
record.setDateCreated(stat.getCtime());
record.setDateModified(stat.getMtime());
return record;
}
}

View File

@ -89,7 +89,7 @@ public class TestStateStoreDriverBase {
}
private String generateRandomString() {
String randomString = "/randomString-" + RANDOM.nextInt();
String randomString = "randomString-" + RANDOM.nextInt();
return randomString;
}

View File

@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.driver;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Test the ZooKeeper implementation of the State Store driver.
*/
public class TestStateStoreZK extends TestStateStoreDriverBase {
private static TestingServer curatorTestingServer;
private static CuratorFramework curatorFramework;
@BeforeClass
public static void setupCluster() throws Exception {
curatorTestingServer = new TestingServer();
curatorTestingServer.start();
String connectString = curatorTestingServer.getConnectString();
curatorFramework = CuratorFrameworkFactory.builder()
.connectString(connectString)
.retryPolicy(new RetryNTimes(100, 100))
.build();
curatorFramework.start();
// Create the ZK State Store
Configuration conf =
getStateStoreConfiguration(StateStoreZooKeeperImpl.class);
conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
// Disable auto-repair of connection
conf.setLong(DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS,
TimeUnit.HOURS.toMillis(1));
getStateStore(conf);
}
@AfterClass
public static void tearDownCluster() {
curatorFramework.close();
try {
curatorTestingServer.stop();
} catch (IOException e) {
}
}
@Before
public void startup() throws IOException {
removeAll(getStateStoreDriver());
}
@Test
public void testInsert()
throws IllegalArgumentException, IllegalAccessException, IOException {
testInsert(getStateStoreDriver());
}
@Test
public void testUpdate()
throws IllegalArgumentException, ReflectiveOperationException,
IOException, SecurityException {
testPut(getStateStoreDriver());
}
@Test
public void testDelete()
throws IllegalArgumentException, IllegalAccessException, IOException {
testRemove(getStateStoreDriver());
}
@Test
public void testFetchErrors()
throws IllegalArgumentException, IllegalAccessException, IOException {
testFetchErrors(getStateStoreDriver());
}
}