diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b645347c16b..1b66ead8625 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -18,12 +18,16 @@
package org.apache.hadoop.hdfs;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl;
import org.apache.hadoop.http.HttpConfig;
@@ -1029,6 +1033,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT =
StateStoreSerializerPBImpl.class;
+ public static final String FEDERATION_STORE_DRIVER_CLASS =
+ FEDERATION_STORE_PREFIX + "driver.class";
+ public static final Class extends StateStoreDriver>
+ FEDERATION_STORE_DRIVER_CLASS_DEFAULT = StateStoreFileImpl.class;
+
+ public static final String FEDERATION_STORE_CONNECTION_TEST_MS =
+ FEDERATION_STORE_PREFIX + "connection.test";
+ public static final long FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT =
+ TimeUnit.MINUTES.toMillis(1);
+
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java
new file mode 100644
index 00000000000..5e122224728
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Service to periodically execute a runnable.
+ */
+public abstract class PeriodicService extends AbstractService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PeriodicService.class);
+
+ /** Default interval in milliseconds for the periodic service. */
+ private static final long DEFAULT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1);
+
+
+ /** Interval for running the periodic service in milliseconds. */
+ private long intervalMs;
+ /** Name of the service. */
+ private final String serviceName;
+
+ /** Scheduler for the periodic service. */
+ private final ScheduledExecutorService scheduler;
+
+ /** If the service is running. */
+ private volatile boolean isRunning = false;
+
+ /** How many times we run. */
+ private long runCount;
+ /** How many errors we got. */
+ private long errorCount;
+ /** When was the last time we executed this service successfully. */
+ private long lastRun;
+
+ /**
+ * Create a new periodic update service.
+ *
+ * @param name Name of the service.
+ */
+ public PeriodicService(String name) {
+ this(name, DEFAULT_INTERVAL_MS);
+ }
+
+ /**
+ * Create a new periodic update service.
+ *
+ * @param name Name of the service.
+ * @param interval Interval for the periodic service in milliseconds.
+ */
+ public PeriodicService(String name, long interval) {
+ super(name);
+ this.serviceName = name;
+ this.intervalMs = interval;
+
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat(this.getName() + "-%d")
+ .build();
+ this.scheduler = Executors.newScheduledThreadPool(1, threadFactory);
+ }
+
+ /**
+ * Set the interval for the periodic service.
+ *
+ * @param interval Interval in milliseconds.
+ */
+ protected void setIntervalMs(long interval) {
+ if (getServiceState() == STATE.STARTED) {
+ throw new ServiceStateException("Periodic service already started");
+ } else {
+ this.intervalMs = interval;
+ }
+ }
+
+ /**
+ * Get the interval for the periodic service.
+ *
+ * @return Interval in milliseconds.
+ */
+ protected long getIntervalMs() {
+ return this.intervalMs;
+ }
+
+ /**
+ * Get how many times we failed to run the periodic service.
+ *
+ * @return Times we failed to run the periodic service.
+ */
+ protected long getErrorCount() {
+ return this.errorCount;
+ }
+
+ /**
+ * Get how many times we run the periodic service.
+ *
+ * @return Times we run the periodic service.
+ */
+ protected long getRunCount() {
+ return this.runCount;
+ }
+
+ /**
+ * Get the last time the periodic service was executed.
+ *
+ * @return Last time the periodic service was executed.
+ */
+ protected long getLastUpdate() {
+ return this.lastRun;
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ LOG.info("Starting periodic service {}", this.serviceName);
+ startPeriodic();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ stopPeriodic();
+ LOG.info("Stopping periodic service {}", this.serviceName);
+ super.serviceStop();
+ }
+
+ /**
+ * Stop the periodic task.
+ */
+ protected synchronized void stopPeriodic() {
+ if (this.isRunning) {
+ LOG.info("{} is shutting down", this.serviceName);
+ this.isRunning = false;
+ this.scheduler.shutdownNow();
+ }
+ }
+
+ /**
+ * Start the periodic execution.
+ */
+ protected synchronized void startPeriodic() {
+ stopPeriodic();
+
+ // Create the runnable service
+ Runnable updateRunnable = new Runnable() {
+ @Override
+ public void run() {
+ LOG.debug("Running {} update task", serviceName);
+ try {
+ if (!isRunning) {
+ return;
+ }
+ periodicInvoke();
+ runCount++;
+ lastRun = Time.now();
+ } catch (Exception ex) {
+ errorCount++;
+ LOG.warn(serviceName + " service threw an exception", ex);
+ }
+ }
+ };
+
+ // Start the execution of the periodic service
+ this.isRunning = true;
+ this.scheduler.scheduleWithFixedDelay(
+ updateRunnable, 0, this.intervalMs, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Method that the service will run periodically.
+ */
+ protected abstract void periodicInvoke();
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java
new file mode 100644
index 00000000000..4d279c534e0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.PeriodicService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to periodically monitor the connection of the StateStore
+ * {@link StateStoreService} data store and to re-open the connection
+ * to the data store if required.
+ */
+public class StateStoreConnectionMonitorService extends PeriodicService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StateStoreConnectionMonitorService.class);
+
+ /** Service that maintains the State Store connection. */
+ private final StateStoreService stateStore;
+
+
+ /**
+ * Create a new service to monitor the connectivity of the state store driver.
+ *
+ * @param store Instance of the state store to be monitored.
+ */
+ public StateStoreConnectionMonitorService(StateStoreService store) {
+ super(StateStoreConnectionMonitorService.class.getSimpleName());
+ this.stateStore = store;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ this.setIntervalMs(conf.getLong(
+ DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS,
+ DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT));
+
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void periodicInvoke() {
+ LOG.debug("Checking state store connection");
+ if (!stateStore.isDriverReady()) {
+ LOG.info("Attempting to open state store driver.");
+ stateStore.loadDriver();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
index 866daa30a2c..df207e0715f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
@@ -15,45 +15,168 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hdfs.server.federation.store;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* A service to initialize a
* {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
- * StateStoreDriver} and maintain the connection to the data store. There
- * are multiple state store driver connections supported:
+ * StateStoreDriver} and maintain the connection to the data store. There are
+ * multiple state store driver connections supported:
*
- * The service also supports the dynamic registration of data interfaces such as
- * the following:
+ * The service also supports the dynamic registration of record stores like:
*
- *
{@link MembershipStateStore}: state of the Namenodes in the
+ *
{@link MembershipStore}: state of the Namenodes in the
* federation.
*
{@link MountTableStore}: Mount table between to subclusters.
* See {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}.
- *
{@link RouterStateStore}: State of the routers in the federation.
*
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class StateStoreService extends CompositeService {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StateStoreService.class);
+
+
+ /** State Store configuration. */
+ private Configuration conf;
+
/** Identifier for the service. */
private String identifier;
- // Stub class
- public StateStoreService(String name) {
- super(name);
+ /** Driver for the back end connection. */
+ private StateStoreDriver driver;
+
+ /** Service to maintain data store connection. */
+ private StateStoreConnectionMonitorService monitorService;
+
+
+ public StateStoreService() {
+ super(StateStoreService.class.getName());
+ }
+
+ /**
+ * Initialize the State Store and the connection to the backend.
+ *
+ * @param config Configuration for the State Store.
+ * @throws IOException
+ */
+ @Override
+ protected void serviceInit(Configuration config) throws Exception {
+ this.conf = config;
+
+ // Create implementation of State Store
+ Class extends StateStoreDriver> driverClass = this.conf.getClass(
+ DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS,
+ DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT,
+ StateStoreDriver.class);
+ this.driver = ReflectionUtils.newInstance(driverClass, this.conf);
+
+ if (this.driver == null) {
+ throw new IOException("Cannot create driver for the State Store");
+ }
+
+ // Check the connection to the State Store periodically
+ this.monitorService = new StateStoreConnectionMonitorService(this);
+ this.addService(monitorService);
+
+ super.serviceInit(this.conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ loadDriver();
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ closeDriver();
+
+ super.serviceStop();
+ }
+
+ /**
+ * List of records supported by this State Store.
+ *
+ * @return List of supported record classes.
+ */
+ public Collection> getSupportedRecords() {
+ // TODO add list of records
+ return new LinkedList<>();
+ }
+
+ /**
+ * Load the State Store driver. If successful, refresh cached data tables.
+ */
+ public void loadDriver() {
+ synchronized (this.driver) {
+ if (!isDriverReady()) {
+ String driverName = this.driver.getClass().getSimpleName();
+ if (this.driver.init(conf, getIdentifier(), getSupportedRecords())) {
+ LOG.info("Connection to the State Store driver {} is open and ready",
+ driverName);
+ } else {
+ LOG.error("Cannot initialize State Store driver {}", driverName);
+ }
+ }
+ }
+ }
+
+ /**
+ * Check if the driver is ready to be used.
+ *
+ * @return If the driver is ready.
+ */
+ public boolean isDriverReady() {
+ return this.driver.isDriverReady();
+ }
+
+ /**
+ * Manually shuts down the driver.
+ *
+ * @throws Exception If the driver cannot be closed.
+ */
+ @VisibleForTesting
+ public void closeDriver() throws Exception {
+ if (this.driver != null) {
+ this.driver.close();
+ }
+ }
+
+ /**
+ * Get the state store driver.
+ *
+ * @return State store driver.
+ */
+ public StateStoreDriver getDriver() {
+ return this.driver;
}
/**
@@ -74,4 +197,5 @@ public class StateStoreService extends CompositeService {
public void setIdentifier(String id) {
this.identifier = id;
}
-}
+
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
index 8c681df8071..0a366192882 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
@@ -17,17 +17,22 @@
*/
package org.apache.hadoop.hdfs.server.federation.store;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Set of utility functions used to query, create, update and delete data
- * records in the state store.
+ * Set of utility functions used to work with the State Store.
*/
public final class StateStoreUtils {
- private static final Log LOG = LogFactory.getLog(StateStoreUtils.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StateStoreUtils.class);
+
private StateStoreUtils() {
// Utility class
@@ -52,7 +57,7 @@ public final class StateStoreUtils {
// Check if we went too far
if (actualClazz.equals(BaseRecord.class)) {
- LOG.error("We went too far (" + actualClazz + ") with " + clazz);
+ LOG.error("We went too far ({}) with {}", actualClazz, clazz);
actualClazz = clazz;
}
return actualClazz;
@@ -69,4 +74,36 @@ public final class StateStoreUtils {
Class extends BaseRecord> getRecordClass(final T record) {
return getRecordClass(record.getClass());
}
-}
+
+ /**
+ * Get the base class name for a record. If we get an implementation of a
+ * record we will return the real parent record class.
+ *
+ * @param clazz Class of the data record to check.
+ * @return Name of the base class for the record.
+ */
+ public static String getRecordName(
+ final Class clazz) {
+ return getRecordClass(clazz).getSimpleName();
+ }
+
+ /**
+ * Filters a list of records to find all records matching the query.
+ *
+ * @param query Map of field names and objects to use to filter results.
+ * @param records List of data records to filter.
+ * @return List of all records matching the query (or empty list if none
+ * match), null if the data set could not be filtered.
+ */
+ public static List filterMultiple(
+ final Query query, final Iterable records) {
+
+ List matchingList = new ArrayList<>();
+ for (T record : records) {
+ if (query.matches(record)) {
+ matchingList.add(record);
+ }
+ }
+ return matchingList;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
index a1527df418f..90111bfce9b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
@@ -18,15 +18,16 @@
package org.apache.hadoop.hdfs.server.federation.store.driver;
import java.net.InetAddress;
-import java.util.List;
+import java.util.Collection;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Driver class for an implementation of a {@link StateStoreService}
@@ -35,7 +36,8 @@ import org.apache.hadoop.util.Time;
*/
public abstract class StateStoreDriver implements StateStoreRecordOperations {
- private static final Log LOG = LogFactory.getLog(StateStoreDriver.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StateStoreDriver.class);
/** State Store configuration. */
@@ -47,13 +49,14 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
/**
* Initialize the state store connection.
+ *
* @param config Configuration for the driver.
* @param id Identifier for the driver.
* @param records Records that are supported.
* @return If initialized and ready, false if failed to initialize driver.
*/
public boolean init(final Configuration config, final String id,
- final List> records) {
+ final Collection> records) {
this.conf = config;
this.identifier = id;
@@ -62,8 +65,20 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
LOG.warn("The identifier for the State Store connection is not set");
}
- // TODO stub
- return false;
+ boolean success = initDriver();
+ if (!success) {
+ LOG.error("Cannot intialize driver for {}", getDriverName());
+ return false;
+ }
+
+ for (Class extends BaseRecord> cls : records) {
+ String recordString = StateStoreUtils.getRecordName(cls);
+ if (!initRecordStorage(recordString, cls)) {
+ LOG.error("Cannot initialize record store for {}", cls.getSimpleName());
+ return false;
+ }
+ }
+ return true;
}
/**
@@ -169,4 +184,4 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
}
return hostname;
}
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
index 739eeba2804..e76a733cb4c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
@@ -19,11 +19,11 @@ package org.apache.hadoop.hdfs.server.federation.store.driver;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.io.retry.Idempotent;
@@ -67,14 +67,14 @@ public interface StateStoreRecordOperations {
* Get a single record from the store that matches the query.
*
* @param clazz Class of record to fetch.
- * @param query Map of field names and objects to filter results.
+ * @param query Query to filter results.
* @return A single record matching the query. Null if there are no matching
* records or more than one matching record in the store.
* @throws IOException If multiple records match or if the data store cannot
* be queried.
*/
@Idempotent
- T get(Class clazz, Map query)
+ T get(Class clazz, Query query)
throws IOException;
/**
@@ -83,14 +83,14 @@ public interface StateStoreRecordOperations {
* supports filtering it should overwrite this method.
*
* @param clazz Class of record to fetch.
- * @param query Map of field names and objects to filter results.
+ * @param query Query to filter results.
* @return Records of type clazz that match the query or empty list if none
* are found.
* @throws IOException Throws exception if unable to query the data store.
*/
@Idempotent
List getMultiple(
- Class clazz, Map query) throws IOException;
+ Class clazz, Query query) throws IOException;
/**
* Creates a single record. Optionally updates an existing record with same
@@ -152,13 +152,12 @@ public interface StateStoreRecordOperations {
* Remove multiple records of a specific class that match a query. Requires
* the getAll implementation to fetch fresh records on each call.
*
- * @param clazz Class of record to remove.
- * @param filter matching filter to remove.
+ * @param query Query to filter what to remove.
* @return The number of records removed.
* @throws IOException Throws exception if unable to query the data store.
*/
@AtMostOnce
- int remove(Class clazz, Map filter)
+ int remove(Class clazz, Query query)
throws IOException;
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
index b711fa9bb09..1bd35f2556a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
@@ -17,14 +17,17 @@
*/
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
/**
* Base implementation of a State Store driver. It contains default
@@ -41,7 +44,7 @@ public abstract class StateStoreBaseImpl extends StateStoreDriver {
@Override
public T get(
- Class clazz, Map query) throws IOException {
+ Class clazz, Query query) throws IOException {
List records = getMultiple(clazz, query);
if (records.size() > 1) {
throw new IOException("Found more than one object in collection");
@@ -52,18 +55,32 @@ public abstract class StateStoreBaseImpl extends StateStoreDriver {
}
}
+ @Override
+ public List getMultiple(
+ Class clazz, Query query) throws IOException {
+ QueryResult result = get(clazz);
+ List records = result.getRecords();
+ List ret = filterMultiple(query, records);
+ if (ret == null) {
+ throw new IOException("Cannot fetch records from the store");
+ }
+ return ret;
+ }
+
@Override
public boolean put(
T record, boolean allowUpdate, boolean errorIfExists) throws IOException {
- List singletonList = new ArrayList();
+ List singletonList = new ArrayList<>();
singletonList.add(record);
return putAll(singletonList, allowUpdate, errorIfExists);
}
@Override
public boolean remove(T record) throws IOException {
- Map primaryKeys = record.getPrimaryKeys();
- Class extends BaseRecord> clazz = StateStoreUtils.getRecordClass(record);
- return remove(clazz, primaryKeys) == 1;
+ final Query query = new Query(record);
+ Class extends BaseRecord> clazz = record.getClass();
+ @SuppressWarnings("unchecked")
+ Class recordClass = (Class)StateStoreUtils.getRecordClass(clazz);
+ return remove(recordClass, query) == 1;
}
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
new file mode 100644
index 00000000000..d7c00ffc1d9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
@@ -0,0 +1,429 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordClass;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link StateStoreDriver} implementation based on a local file.
+ */
+public abstract class StateStoreFileBaseImpl
+ extends StateStoreSerializableImpl {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StateStoreFileBaseImpl.class);
+
+ /** If it is initialized. */
+ private boolean initialized = false;
+
+ /** Name of the file containing the data. */
+ private static final String DATA_FILE_NAME = "records.data";
+
+
+ /**
+ * Lock reading records.
+ *
+ * @param clazz Class of the record.
+ */
+ protected abstract void lockRecordRead(Class clazz);
+
+ /**
+ * Unlock reading records.
+ *
+ * @param clazz Class of the record.
+ */
+ protected abstract void unlockRecordRead(
+ Class clazz);
+
+ /**
+ * Lock writing records.
+ *
+ * @param clazz Class of the record.
+ */
+ protected abstract void lockRecordWrite(
+ Class clazz);
+
+ /**
+ * Unlock writing records.
+ *
+ * @param clazz Class of the record.
+ */
+ protected abstract void unlockRecordWrite(
+ Class clazz);
+
+ /**
+ * Get the reader for the file system.
+ *
+ * @param clazz Class of the record.
+ */
+ protected abstract BufferedReader getReader(
+ Class clazz, String sub);
+
+ /**
+ * Get the writer for the file system.
+ *
+ * @param clazz Class of the record.
+ */
+ protected abstract BufferedWriter getWriter(
+ Class clazz, String sub);
+
+ /**
+ * Check if a path exists.
+ *
+ * @param path Path to check.
+ * @return If the path exists.
+ */
+ protected abstract boolean exists(String path);
+
+ /**
+ * Make a directory.
+ *
+ * @param path Path of the directory to create.
+ * @return If the directory was created.
+ */
+ protected abstract boolean mkdir(String path);
+
+ /**
+ * Get root directory.
+ *
+ * @return Root directory.
+ */
+ protected abstract String getRootDir();
+
+ /**
+ * Set the driver as initialized.
+ *
+ * @param ini If the driver is initialized.
+ */
+ public void setInitialized(boolean ini) {
+ this.initialized = ini;
+ }
+
+ @Override
+ public boolean initDriver() {
+ String rootDir = getRootDir();
+ try {
+ if (rootDir == null) {
+ LOG.error("Invalid root directory, unable to initialize driver.");
+ return false;
+ }
+
+ // Check root path
+ if (!exists(rootDir)) {
+ if (!mkdir(rootDir)) {
+ LOG.error("Cannot create State Store root directory {}", rootDir);
+ return false;
+ }
+ }
+ } catch (Exception ex) {
+ LOG.error(
+ "Cannot initialize filesystem using root directory {}", rootDir, ex);
+ return false;
+ }
+ setInitialized(true);
+ return true;
+ }
+
+ @Override
+ public boolean initRecordStorage(
+ String className, Class recordClass) {
+
+ String dataDirPath = getRootDir() + "/" + className;
+ try {
+ // Create data directories for files
+ if (!exists(dataDirPath)) {
+ LOG.info("{} data directory doesn't exist, creating it", dataDirPath);
+ if (!mkdir(dataDirPath)) {
+ LOG.error("Cannot create data directory {}", dataDirPath);
+ return false;
+ }
+ String dataFilePath = dataDirPath + "/" + DATA_FILE_NAME;
+ if (!exists(dataFilePath)) {
+ // Create empty file
+ List emtpyList = new ArrayList<>();
+ if(!writeAll(emtpyList, recordClass)) {
+ LOG.error("Cannot create data file {}", dataFilePath);
+ return false;
+ }
+ }
+ }
+ } catch (Exception ex) {
+ LOG.error("Cannot create data directory {}", dataDirPath, ex);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Read all lines from a file and deserialize into the desired record type.
+ *
+ * @param reader Open handle for the file.
+ * @param recordClass Record class to create.
+ * @param includeDates True if dateModified/dateCreated are serialized.
+ * @return List of records.
+ * @throws IOException
+ */
+ private List getAllFile(
+ BufferedReader reader, Class clazz, boolean includeDates)
+ throws IOException {
+
+ List ret = new ArrayList();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ if (!line.startsWith("#") && line.length() > 0) {
+ try {
+ T record = newRecord(line, clazz, includeDates);
+ ret.add(record);
+ } catch (Exception ex) {
+ LOG.error("Cannot parse line in data source file: {}", line, ex);
+ }
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ public QueryResult get(Class clazz)
+ throws IOException {
+ return get(clazz, (String)null);
+ }
+
+ @Override
+ public QueryResult get(Class clazz, String sub)
+ throws IOException {
+ verifyDriverReady();
+ BufferedReader reader = null;
+ lockRecordRead(clazz);
+ try {
+ reader = getReader(clazz, sub);
+ List data = getAllFile(reader, clazz, true);
+ return new QueryResult(data, getTime());
+ } catch (Exception ex) {
+ LOG.error("Cannot fetch records {}", clazz.getSimpleName());
+ throw new IOException("Cannot read from data store " + ex.getMessage());
+ } finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ LOG.error("Failed closing file", e);
+ }
+ }
+ unlockRecordRead(clazz);
+ }
+ }
+
+ /**
+ * Overwrite the existing data with a new data set.
+ *
+ * @param list List of records to write.
+ * @param writer BufferedWriter stream to write to.
+ * @return If the records were succesfully written.
+ */
+ private boolean writeAllFile(
+ Collection records, BufferedWriter writer) {
+
+ try {
+ for (BaseRecord record : records) {
+ try {
+ String data = serializeString(record);
+ writer.write(data);
+ writer.newLine();
+ } catch (IllegalArgumentException ex) {
+ LOG.error("Cannot write record {} to file", record, ex);
+ }
+ }
+ writer.flush();
+ return true;
+ } catch (IOException e) {
+ LOG.error("Cannot commit records to file", e);
+ return false;
+ }
+ }
+
+ /**
+ * Overwrite the existing data with a new data set. Replaces all records in
+ * the data store for this record class. If all records in the data store are
+ * not successfully committed, this function must return false and leave the
+ * data store unchanged.
+ *
+ * @param records List of records to write. All records must be of type
+ * recordClass.
+ * @param recordClass Class of record to replace.
+ * @return true if all operations were successful, false otherwise.
+ * @throws StateStoreUnavailableException
+ */
+ public boolean writeAll(
+ Collection records, Class recordClass)
+ throws StateStoreUnavailableException {
+ verifyDriverReady();
+ lockRecordWrite(recordClass);
+ BufferedWriter writer = null;
+ try {
+ writer = getWriter(recordClass, null);
+ return writeAllFile(records, writer);
+ } catch (Exception e) {
+ LOG.error(
+ "Cannot add records to file for {}", recordClass.getSimpleName(), e);
+ return false;
+ } finally {
+ if (writer != null) {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ LOG.error(
+ "Cannot close writer for {}", recordClass.getSimpleName(), e);
+ }
+ }
+ unlockRecordWrite(recordClass);
+ }
+ }
+
+ /**
+ * Get the data file name.
+ *
+ * @return Data file name.
+ */
+ protected String getDataFileName() {
+ return DATA_FILE_NAME;
+ }
+
+ @Override
+ public boolean isDriverReady() {
+ return this.initialized;
+ }
+
+ @Override
+ public boolean putAll(
+ List records, boolean allowUpdate, boolean errorIfExists)
+ throws StateStoreUnavailableException {
+ verifyDriverReady();
+
+ if (records.isEmpty()) {
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ Class clazz = (Class) getRecordClass(records.get(0).getClass());
+ QueryResult result;
+ try {
+ result = get(clazz);
+ } catch (IOException e) {
+ return false;
+ }
+ Map