clazz, boolean includeDates) throws IOException {
return serializer.deserialize(data, clazz);
}
+
+ /**
+ * Get the primary key for a record. If we don't want to store in folders, we
+ * need to remove / from the name.
+ *
+ * @param record Record to get the primary key for.
+ * @return Primary key for the record.
+ */
+ protected static String getPrimaryKey(BaseRecord record) {
+ String primaryKey = record.getPrimaryKey();
+ primaryKey = primaryKey.replaceAll("/", SLASH_MARK);
+ primaryKey = primaryKey.replaceAll(":", COLON_MARK);
+ return primaryKey;
+ }
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
new file mode 100644
index 00000000000..ddcd537040b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordName;
+import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link StateStoreDriver} driver implementation that uses ZooKeeper as a
+ * backend.
+ *
+ * The structure of the znodes in the ensemble is:
+ * PARENT_PATH
+ * |--- MOUNT
+ * |--- MEMBERSHIP
+ * |--- REBALANCER
+ * |--- ROUTERS
+ */
+public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StateStoreZooKeeperImpl.class);
+
+
+ /** Configuration keys. */
+ public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX =
+ DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk.";
+ public static final String FEDERATION_STORE_ZK_PARENT_PATH =
+ FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path";
+ public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT =
+ "/hdfs-federation";
+
+
+ /** Directory to store the state store data. */
+ private String baseZNode;
+
+ /** Interface to ZooKeeper. */
+ private ZKCuratorManager zkManager;
+
+
+ @Override
+ public boolean initDriver() {
+ LOG.info("Initializing ZooKeeper connection");
+
+ Configuration conf = getConf();
+ baseZNode = conf.get(
+ FEDERATION_STORE_ZK_PARENT_PATH,
+ FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
+ try {
+ this.zkManager = new ZKCuratorManager(conf);
+ this.zkManager.start();
+ } catch (IOException e) {
+ LOG.error("Cannot initialize the ZK connection", e);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean initRecordStorage(
+ String className, Class clazz) {
+ try {
+ String checkPath = getNodePath(baseZNode, className);
+ zkManager.createRootDirRecursively(checkPath);
+ return true;
+ } catch (Exception e) {
+ LOG.error("Cannot initialize ZK node for {}: {}",
+ className, e.getMessage());
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (zkManager != null) {
+ zkManager.close();
+ }
+ }
+
+ @Override
+ public boolean isDriverReady() {
+ return zkManager != null;
+ }
+
+ @Override
+ public QueryResult get(Class clazz)
+ throws IOException {
+ return get(clazz, (String)null);
+ }
+
+ @Override
+ public QueryResult get(Class clazz, String sub)
+ throws IOException {
+ verifyDriverReady();
+ List ret = new ArrayList<>();
+ String znode = getZNodeForClass(clazz);
+ try {
+ List children = zkManager.getChildren(znode);
+ for (String child : children) {
+ try {
+ String path = getNodePath(znode, child);
+ Stat stat = new Stat();
+ String data = zkManager.getStringData(path, stat);
+ boolean corrupted = false;
+ if (data == null || data.equals("")) {
+ // All records should have data, otherwise this is corrupted
+ corrupted = true;
+ } else {
+ try {
+ T record = createRecord(data, stat, clazz);
+ ret.add(record);
+ } catch (IOException e) {
+ LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
+ clazz.getSimpleName(), data, e.getMessage());
+ corrupted = true;
+ }
+ }
+
+ if (corrupted) {
+ LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
+ child, path);
+ zkManager.delete(path);
+ }
+ } catch (Exception e) {
+ LOG.error("Cannot get data for {}: {}", child, e.getMessage());
+ }
+ }
+ } catch (Exception e) {
+ String msg = "Cannot get children for \"" + znode + "\": " +
+ e.getMessage();
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+ return new QueryResult(ret, getTime());
+ }
+
+ @Override
+ public boolean putAll(
+ List records, boolean update, boolean error) throws IOException {
+ verifyDriverReady();
+ if (records.isEmpty()) {
+ return true;
+ }
+
+ // All records should be the same
+ T record0 = records.get(0);
+ Class extends BaseRecord> recordClass = record0.getClass();
+ String znode = getZNodeForClass(recordClass);
+
+ boolean status = true;
+ for (T record : records) {
+ String primaryKey = getPrimaryKey(record);
+ String recordZNode = getNodePath(znode, primaryKey);
+ byte[] data = serialize(record);
+ if (!writeNode(recordZNode, data, update, error)){
+ status = false;
+ }
+ }
+ return status;
+ }
+
+ @Override
+ public int remove(
+ Class clazz, Query query) throws IOException {
+ verifyDriverReady();
+ if (query == null) {
+ return 0;
+ }
+
+ // Read the current data
+ List records = null;
+ try {
+ QueryResult result = get(clazz);
+ records = result.getRecords();
+ } catch (IOException ex) {
+ LOG.error("Cannot get existing records", ex);
+ return 0;
+ }
+
+ // Check the records to remove
+ String znode = getZNodeForClass(clazz);
+ List recordsToRemove = filterMultiple(query, records);
+
+ // Remove the records
+ int removed = 0;
+ for (T existingRecord : recordsToRemove) {
+ LOG.info("Removing \"{}\"", existingRecord);
+ try {
+ String primaryKey = getPrimaryKey(existingRecord);
+ String path = getNodePath(znode, primaryKey);
+ if (zkManager.delete(path)) {
+ removed++;
+ } else {
+ LOG.error("Did not remove \"{}\"", existingRecord);
+ }
+ } catch (Exception e) {
+ LOG.error("Cannot remove \"{}\"", existingRecord, e);
+ }
+ }
+ return removed;
+ }
+
+ @Override
+ public boolean removeAll(Class clazz)
+ throws IOException {
+ boolean status = true;
+ String znode = getZNodeForClass(clazz);
+ LOG.info("Deleting all children under {}", znode);
+ try {
+ List children = zkManager.getChildren(znode);
+ for (String child : children) {
+ String path = getNodePath(znode, child);
+ LOG.info("Deleting {}", path);
+ zkManager.delete(path);
+ }
+ } catch (Exception e) {
+ LOG.error("Cannot remove {}: {}", znode, e.getMessage());
+ status = false;
+ }
+ return status;
+ }
+
+ private boolean writeNode(
+ String znode, byte[] bytes, boolean update, boolean error) {
+ try {
+ boolean created = zkManager.create(znode);
+ if (!update && !created && error) {
+ LOG.info("Cannot write record \"{}\", it already exists", znode);
+ return false;
+ }
+
+ // Write data
+ zkManager.setData(znode, bytes, -1);
+ return true;
+ } catch (Exception e) {
+ LOG.error("Cannot write record \"{}\": {}", znode, e.getMessage());
+ }
+ return false;
+ }
+
+ /**
+ * Get the ZNode for a class.
+ *
+ * @param clazz Record class to evaluate.
+ * @return The ZNode for the class.
+ */
+ private String getZNodeForClass(Class clazz) {
+ String className = getRecordName(clazz);
+ return getNodePath(baseZNode, className);
+ }
+
+ /**
+ * Creates a record from a string returned by ZooKeeper.
+ *
+ * @param source Object from ZooKeeper.
+ * @param clazz The data record type to create.
+ * @return The created record.
+ * @throws IOException
+ */
+ private T createRecord(
+ String data, Stat stat, Class clazz) throws IOException {
+ T record = newRecord(data, clazz, false);
+ record.setDateCreated(stat.getCtime());
+ record.setDateModified(stat.getMtime());
+ return record;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
index 8239fb125a5..65e763bf18a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
@@ -89,7 +89,7 @@ public class TestStateStoreDriverBase {
}
private String generateRandomString() {
- String randomString = "/randomString-" + RANDOM.nextInt();
+ String randomString = "randomString-" + RANDOM.nextInt();
return randomString;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java
new file mode 100644
index 00000000000..36353ff9fd6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver;
+
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the ZooKeeper implementation of the State Store driver.
+ */
+public class TestStateStoreZK extends TestStateStoreDriverBase {
+
+ private static TestingServer curatorTestingServer;
+ private static CuratorFramework curatorFramework;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ curatorTestingServer = new TestingServer();
+ curatorTestingServer.start();
+ String connectString = curatorTestingServer.getConnectString();
+ curatorFramework = CuratorFrameworkFactory.builder()
+ .connectString(connectString)
+ .retryPolicy(new RetryNTimes(100, 100))
+ .build();
+ curatorFramework.start();
+
+ // Create the ZK State Store
+ Configuration conf =
+ getStateStoreConfiguration(StateStoreZooKeeperImpl.class);
+ conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
+ // Disable auto-repair of connection
+ conf.setLong(DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS,
+ TimeUnit.HOURS.toMillis(1));
+ getStateStore(conf);
+ }
+
+ @AfterClass
+ public static void tearDownCluster() {
+ curatorFramework.close();
+ try {
+ curatorTestingServer.stop();
+ } catch (IOException e) {
+ }
+ }
+
+ @Before
+ public void startup() throws IOException {
+ removeAll(getStateStoreDriver());
+ }
+
+ @Test
+ public void testInsert()
+ throws IllegalArgumentException, IllegalAccessException, IOException {
+ testInsert(getStateStoreDriver());
+ }
+
+ @Test
+ public void testUpdate()
+ throws IllegalArgumentException, ReflectiveOperationException,
+ IOException, SecurityException {
+ testPut(getStateStoreDriver());
+ }
+
+ @Test
+ public void testDelete()
+ throws IllegalArgumentException, IllegalAccessException, IOException {
+ testRemove(getStateStoreDriver());
+ }
+
+ @Test
+ public void testFetchErrors()
+ throws IllegalArgumentException, IllegalAccessException, IOException {
+ testFetchErrors(getStateStoreDriver());
+ }
+}
\ No newline at end of file