From 93687da438f879c4c2c0016216056fb570012ee2 Mon Sep 17 00:00:00 2001 From: Inigo Goiri Date: Tue, 2 May 2017 15:49:53 -0700 Subject: [PATCH] HDFS-10630. Federation State Store FS Implementation. Contributed by Jason Kace and Inigo Goiri. (cherry picked from commit c6e0bd640cdaf83a660fa050809cad6f1d4c6f4d) (cherry picked from commit 4bf877b03f0e01c4bcedc689c66689701e62b560) --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 14 + .../federation/router/PeriodicService.java | 198 +++++++ .../StateStoreConnectionMonitorService.java | 67 +++ .../federation/store/StateStoreService.java | 152 +++++- .../federation/store/StateStoreUtils.java | 51 +- .../store/driver/StateStoreDriver.java | 31 +- .../driver/StateStoreRecordOperations.java | 17 +- .../store/driver/impl/StateStoreBaseImpl.java | 31 +- .../driver/impl/StateStoreFileBaseImpl.java | 429 ++++++++++++++++ .../store/driver/impl/StateStoreFileImpl.java | 161 ++++++ .../driver/impl/StateStoreFileSystemImpl.java | 178 +++++++ .../impl/StateStoreSerializableImpl.java | 77 +++ .../federation/store/records/BaseRecord.java | 20 +- .../federation/store/records/Query.java | 66 +++ .../src/main/resources/hdfs-default.xml | 16 + .../store/FederationStateStoreTestUtils.java | 232 +++++++++ .../driver/TestStateStoreDriverBase.java | 483 ++++++++++++++++++ .../store/driver/TestStateStoreFile.java | 64 +++ .../driver/TestStateStoreFileSystem.java | 88 ++++ 19 files changed, 2329 insertions(+), 46 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index b645347c16b..1b66ead8625 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -18,12 +18,16 @@ package org.apache.hadoop.hdfs; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl; import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl; import org.apache.hadoop.http.HttpConfig; @@ -1029,6 +1033,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys { FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT = StateStoreSerializerPBImpl.class; + public static final String FEDERATION_STORE_DRIVER_CLASS = + FEDERATION_STORE_PREFIX + "driver.class"; + public static final Class + FEDERATION_STORE_DRIVER_CLASS_DEFAULT = StateStoreFileImpl.class; + + public static final String FEDERATION_STORE_CONNECTION_TEST_MS = + FEDERATION_STORE_PREFIX + "connection.test"; + public static final long FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT = + TimeUnit.MINUTES.toMillis(1); + // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java new file mode 100644 index 00000000000..5e122224728 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Service to periodically execute a runnable. + */ +public abstract class PeriodicService extends AbstractService { + + private static final Logger LOG = + LoggerFactory.getLogger(PeriodicService.class); + + /** Default interval in milliseconds for the periodic service. */ + private static final long DEFAULT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); + + + /** Interval for running the periodic service in milliseconds. */ + private long intervalMs; + /** Name of the service. */ + private final String serviceName; + + /** Scheduler for the periodic service. */ + private final ScheduledExecutorService scheduler; + + /** If the service is running. */ + private volatile boolean isRunning = false; + + /** How many times we run. */ + private long runCount; + /** How many errors we got. */ + private long errorCount; + /** When was the last time we executed this service successfully. */ + private long lastRun; + + /** + * Create a new periodic update service. + * + * @param name Name of the service. + */ + public PeriodicService(String name) { + this(name, DEFAULT_INTERVAL_MS); + } + + /** + * Create a new periodic update service. + * + * @param name Name of the service. + * @param interval Interval for the periodic service in milliseconds. + */ + public PeriodicService(String name, long interval) { + super(name); + this.serviceName = name; + this.intervalMs = interval; + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat(this.getName() + "-%d") + .build(); + this.scheduler = Executors.newScheduledThreadPool(1, threadFactory); + } + + /** + * Set the interval for the periodic service. + * + * @param interval Interval in milliseconds. + */ + protected void setIntervalMs(long interval) { + if (getServiceState() == STATE.STARTED) { + throw new ServiceStateException("Periodic service already started"); + } else { + this.intervalMs = interval; + } + } + + /** + * Get the interval for the periodic service. + * + * @return Interval in milliseconds. + */ + protected long getIntervalMs() { + return this.intervalMs; + } + + /** + * Get how many times we failed to run the periodic service. + * + * @return Times we failed to run the periodic service. + */ + protected long getErrorCount() { + return this.errorCount; + } + + /** + * Get how many times we run the periodic service. + * + * @return Times we run the periodic service. + */ + protected long getRunCount() { + return this.runCount; + } + + /** + * Get the last time the periodic service was executed. + * + * @return Last time the periodic service was executed. + */ + protected long getLastUpdate() { + return this.lastRun; + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + LOG.info("Starting periodic service {}", this.serviceName); + startPeriodic(); + } + + @Override + protected void serviceStop() throws Exception { + stopPeriodic(); + LOG.info("Stopping periodic service {}", this.serviceName); + super.serviceStop(); + } + + /** + * Stop the periodic task. + */ + protected synchronized void stopPeriodic() { + if (this.isRunning) { + LOG.info("{} is shutting down", this.serviceName); + this.isRunning = false; + this.scheduler.shutdownNow(); + } + } + + /** + * Start the periodic execution. + */ + protected synchronized void startPeriodic() { + stopPeriodic(); + + // Create the runnable service + Runnable updateRunnable = new Runnable() { + @Override + public void run() { + LOG.debug("Running {} update task", serviceName); + try { + if (!isRunning) { + return; + } + periodicInvoke(); + runCount++; + lastRun = Time.now(); + } catch (Exception ex) { + errorCount++; + LOG.warn(serviceName + " service threw an exception", ex); + } + } + }; + + // Start the execution of the periodic service + this.isRunning = true; + this.scheduler.scheduleWithFixedDelay( + updateRunnable, 0, this.intervalMs, TimeUnit.MILLISECONDS); + } + + /** + * Method that the service will run periodically. + */ + protected abstract void periodicInvoke(); +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java new file mode 100644 index 00000000000..4d279c534e0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.PeriodicService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service to periodically monitor the connection of the StateStore + * {@link StateStoreService} data store and to re-open the connection + * to the data store if required. + */ +public class StateStoreConnectionMonitorService extends PeriodicService { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreConnectionMonitorService.class); + + /** Service that maintains the State Store connection. */ + private final StateStoreService stateStore; + + + /** + * Create a new service to monitor the connectivity of the state store driver. + * + * @param store Instance of the state store to be monitored. + */ + public StateStoreConnectionMonitorService(StateStoreService store) { + super(StateStoreConnectionMonitorService.class.getSimpleName()); + this.stateStore = store; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.setIntervalMs(conf.getLong( + DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS, + DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT)); + + super.serviceInit(conf); + } + + @Override + public void periodicInvoke() { + LOG.debug("Checking state store connection"); + if (!stateStore.isDriverReady()) { + LOG.info("Attempting to open state store driver."); + stateStore.loadDriver(); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java index 866daa30a2c..df207e0715f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java @@ -15,45 +15,168 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hdfs.server.federation.store; +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; /** * A service to initialize a * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver - * StateStoreDriver} and maintain the connection to the data store. There - * are multiple state store driver connections supported: + * StateStoreDriver} and maintain the connection to the data store. There are + * multiple state store driver connections supported: * *

- * The service also supports the dynamic registration of data interfaces such as - * the following: + * The service also supports the dynamic registration of record stores like: *

*/ @InterfaceAudience.Private @InterfaceStability.Evolving public class StateStoreService extends CompositeService { + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreService.class); + + + /** State Store configuration. */ + private Configuration conf; + /** Identifier for the service. */ private String identifier; - // Stub class - public StateStoreService(String name) { - super(name); + /** Driver for the back end connection. */ + private StateStoreDriver driver; + + /** Service to maintain data store connection. */ + private StateStoreConnectionMonitorService monitorService; + + + public StateStoreService() { + super(StateStoreService.class.getName()); + } + + /** + * Initialize the State Store and the connection to the backend. + * + * @param config Configuration for the State Store. + * @throws IOException + */ + @Override + protected void serviceInit(Configuration config) throws Exception { + this.conf = config; + + // Create implementation of State Store + Class driverClass = this.conf.getClass( + DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS, + DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT, + StateStoreDriver.class); + this.driver = ReflectionUtils.newInstance(driverClass, this.conf); + + if (this.driver == null) { + throw new IOException("Cannot create driver for the State Store"); + } + + // Check the connection to the State Store periodically + this.monitorService = new StateStoreConnectionMonitorService(this); + this.addService(monitorService); + + super.serviceInit(this.conf); + } + + @Override + protected void serviceStart() throws Exception { + loadDriver(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + closeDriver(); + + super.serviceStop(); + } + + /** + * List of records supported by this State Store. + * + * @return List of supported record classes. + */ + public Collection> getSupportedRecords() { + // TODO add list of records + return new LinkedList<>(); + } + + /** + * Load the State Store driver. If successful, refresh cached data tables. + */ + public void loadDriver() { + synchronized (this.driver) { + if (!isDriverReady()) { + String driverName = this.driver.getClass().getSimpleName(); + if (this.driver.init(conf, getIdentifier(), getSupportedRecords())) { + LOG.info("Connection to the State Store driver {} is open and ready", + driverName); + } else { + LOG.error("Cannot initialize State Store driver {}", driverName); + } + } + } + } + + /** + * Check if the driver is ready to be used. + * + * @return If the driver is ready. + */ + public boolean isDriverReady() { + return this.driver.isDriverReady(); + } + + /** + * Manually shuts down the driver. + * + * @throws Exception If the driver cannot be closed. + */ + @VisibleForTesting + public void closeDriver() throws Exception { + if (this.driver != null) { + this.driver.close(); + } + } + + /** + * Get the state store driver. + * + * @return State store driver. + */ + public StateStoreDriver getDriver() { + return this.driver; } /** @@ -74,4 +197,5 @@ public class StateStoreService extends CompositeService { public void setIdentifier(String id) { this.identifier = id; } -} + +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java index 8c681df8071..0a366192882 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java @@ -17,17 +17,22 @@ */ package org.apache.hadoop.hdfs.server.federation.store; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import java.util.ArrayList; +import java.util.List; + import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Set of utility functions used to query, create, update and delete data - * records in the state store. + * Set of utility functions used to work with the State Store. */ public final class StateStoreUtils { - private static final Log LOG = LogFactory.getLog(StateStoreUtils.class); + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreUtils.class); + private StateStoreUtils() { // Utility class @@ -52,7 +57,7 @@ public final class StateStoreUtils { // Check if we went too far if (actualClazz.equals(BaseRecord.class)) { - LOG.error("We went too far (" + actualClazz + ") with " + clazz); + LOG.error("We went too far ({}) with {}", actualClazz, clazz); actualClazz = clazz; } return actualClazz; @@ -69,4 +74,36 @@ public final class StateStoreUtils { Class getRecordClass(final T record) { return getRecordClass(record.getClass()); } -} + + /** + * Get the base class name for a record. If we get an implementation of a + * record we will return the real parent record class. + * + * @param clazz Class of the data record to check. + * @return Name of the base class for the record. + */ + public static String getRecordName( + final Class clazz) { + return getRecordClass(clazz).getSimpleName(); + } + + /** + * Filters a list of records to find all records matching the query. + * + * @param query Map of field names and objects to use to filter results. + * @param records List of data records to filter. + * @return List of all records matching the query (or empty list if none + * match), null if the data set could not be filtered. + */ + public static List filterMultiple( + final Query query, final Iterable records) { + + List matchingList = new ArrayList<>(); + for (T record : records) { + if (query.matches(record)) { + matchingList.add(record); + } + } + return matchingList; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java index a1527df418f..90111bfce9b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java @@ -18,15 +18,16 @@ package org.apache.hadoop.hdfs.server.federation.store.driver; import java.net.InetAddress; -import java.util.List; +import java.util.Collection; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Driver class for an implementation of a {@link StateStoreService} @@ -35,7 +36,8 @@ import org.apache.hadoop.util.Time; */ public abstract class StateStoreDriver implements StateStoreRecordOperations { - private static final Log LOG = LogFactory.getLog(StateStoreDriver.class); + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreDriver.class); /** State Store configuration. */ @@ -47,13 +49,14 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { /** * Initialize the state store connection. + * * @param config Configuration for the driver. * @param id Identifier for the driver. * @param records Records that are supported. * @return If initialized and ready, false if failed to initialize driver. */ public boolean init(final Configuration config, final String id, - final List> records) { + final Collection> records) { this.conf = config; this.identifier = id; @@ -62,8 +65,20 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { LOG.warn("The identifier for the State Store connection is not set"); } - // TODO stub - return false; + boolean success = initDriver(); + if (!success) { + LOG.error("Cannot intialize driver for {}", getDriverName()); + return false; + } + + for (Class cls : records) { + String recordString = StateStoreUtils.getRecordName(cls); + if (!initRecordStorage(recordString, cls)) { + LOG.error("Cannot initialize record store for {}", cls.getSimpleName()); + return false; + } + } + return true; } /** @@ -169,4 +184,4 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { } return hostname; } -} +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java index 739eeba2804..e76a733cb4c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java @@ -19,11 +19,11 @@ package org.apache.hadoop.hdfs.server.federation.store.driver; import java.io.IOException; import java.util.List; -import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; import org.apache.hadoop.io.retry.AtMostOnce; import org.apache.hadoop.io.retry.Idempotent; @@ -67,14 +67,14 @@ public interface StateStoreRecordOperations { * Get a single record from the store that matches the query. * * @param clazz Class of record to fetch. - * @param query Map of field names and objects to filter results. + * @param query Query to filter results. * @return A single record matching the query. Null if there are no matching * records or more than one matching record in the store. * @throws IOException If multiple records match or if the data store cannot * be queried. */ @Idempotent - T get(Class clazz, Map query) + T get(Class clazz, Query query) throws IOException; /** @@ -83,14 +83,14 @@ public interface StateStoreRecordOperations { * supports filtering it should overwrite this method. * * @param clazz Class of record to fetch. - * @param query Map of field names and objects to filter results. + * @param query Query to filter results. * @return Records of type clazz that match the query or empty list if none * are found. * @throws IOException Throws exception if unable to query the data store. */ @Idempotent List getMultiple( - Class clazz, Map query) throws IOException; + Class clazz, Query query) throws IOException; /** * Creates a single record. Optionally updates an existing record with same @@ -152,13 +152,12 @@ public interface StateStoreRecordOperations { * Remove multiple records of a specific class that match a query. Requires * the getAll implementation to fetch fresh records on each call. * - * @param clazz Class of record to remove. - * @param filter matching filter to remove. + * @param query Query to filter what to remove. * @return The number of records removed. * @throws IOException Throws exception if unable to query the data store. */ @AtMostOnce - int remove(Class clazz, Map filter) + int remove(Class clazz, Query query) throws IOException; -} +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java index b711fa9bb09..1bd35f2556a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java @@ -17,14 +17,17 @@ */ package org.apache.hadoop.hdfs.server.federation.store.driver.impl; +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; + import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; /** * Base implementation of a State Store driver. It contains default @@ -41,7 +44,7 @@ public abstract class StateStoreBaseImpl extends StateStoreDriver { @Override public T get( - Class clazz, Map query) throws IOException { + Class clazz, Query query) throws IOException { List records = getMultiple(clazz, query); if (records.size() > 1) { throw new IOException("Found more than one object in collection"); @@ -52,18 +55,32 @@ public abstract class StateStoreBaseImpl extends StateStoreDriver { } } + @Override + public List getMultiple( + Class clazz, Query query) throws IOException { + QueryResult result = get(clazz); + List records = result.getRecords(); + List ret = filterMultiple(query, records); + if (ret == null) { + throw new IOException("Cannot fetch records from the store"); + } + return ret; + } + @Override public boolean put( T record, boolean allowUpdate, boolean errorIfExists) throws IOException { - List singletonList = new ArrayList(); + List singletonList = new ArrayList<>(); singletonList.add(record); return putAll(singletonList, allowUpdate, errorIfExists); } @Override public boolean remove(T record) throws IOException { - Map primaryKeys = record.getPrimaryKeys(); - Class clazz = StateStoreUtils.getRecordClass(record); - return remove(clazz, primaryKeys) == 1; + final Query query = new Query(record); + Class clazz = record.getClass(); + @SuppressWarnings("unchecked") + Class recordClass = (Class)StateStoreUtils.getRecordClass(clazz); + return remove(recordClass, query) == 1; } -} +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java new file mode 100644 index 00000000000..d7c00ffc1d9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java @@ -0,0 +1,429 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordClass; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link StateStoreDriver} implementation based on a local file. + */ +public abstract class StateStoreFileBaseImpl + extends StateStoreSerializableImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreFileBaseImpl.class); + + /** If it is initialized. */ + private boolean initialized = false; + + /** Name of the file containing the data. */ + private static final String DATA_FILE_NAME = "records.data"; + + + /** + * Lock reading records. + * + * @param clazz Class of the record. + */ + protected abstract void lockRecordRead(Class clazz); + + /** + * Unlock reading records. + * + * @param clazz Class of the record. + */ + protected abstract void unlockRecordRead( + Class clazz); + + /** + * Lock writing records. + * + * @param clazz Class of the record. + */ + protected abstract void lockRecordWrite( + Class clazz); + + /** + * Unlock writing records. + * + * @param clazz Class of the record. + */ + protected abstract void unlockRecordWrite( + Class clazz); + + /** + * Get the reader for the file system. + * + * @param clazz Class of the record. + */ + protected abstract BufferedReader getReader( + Class clazz, String sub); + + /** + * Get the writer for the file system. + * + * @param clazz Class of the record. + */ + protected abstract BufferedWriter getWriter( + Class clazz, String sub); + + /** + * Check if a path exists. + * + * @param path Path to check. + * @return If the path exists. + */ + protected abstract boolean exists(String path); + + /** + * Make a directory. + * + * @param path Path of the directory to create. + * @return If the directory was created. + */ + protected abstract boolean mkdir(String path); + + /** + * Get root directory. + * + * @return Root directory. + */ + protected abstract String getRootDir(); + + /** + * Set the driver as initialized. + * + * @param ini If the driver is initialized. + */ + public void setInitialized(boolean ini) { + this.initialized = ini; + } + + @Override + public boolean initDriver() { + String rootDir = getRootDir(); + try { + if (rootDir == null) { + LOG.error("Invalid root directory, unable to initialize driver."); + return false; + } + + // Check root path + if (!exists(rootDir)) { + if (!mkdir(rootDir)) { + LOG.error("Cannot create State Store root directory {}", rootDir); + return false; + } + } + } catch (Exception ex) { + LOG.error( + "Cannot initialize filesystem using root directory {}", rootDir, ex); + return false; + } + setInitialized(true); + return true; + } + + @Override + public boolean initRecordStorage( + String className, Class recordClass) { + + String dataDirPath = getRootDir() + "/" + className; + try { + // Create data directories for files + if (!exists(dataDirPath)) { + LOG.info("{} data directory doesn't exist, creating it", dataDirPath); + if (!mkdir(dataDirPath)) { + LOG.error("Cannot create data directory {}", dataDirPath); + return false; + } + String dataFilePath = dataDirPath + "/" + DATA_FILE_NAME; + if (!exists(dataFilePath)) { + // Create empty file + List emtpyList = new ArrayList<>(); + if(!writeAll(emtpyList, recordClass)) { + LOG.error("Cannot create data file {}", dataFilePath); + return false; + } + } + } + } catch (Exception ex) { + LOG.error("Cannot create data directory {}", dataDirPath, ex); + return false; + } + return true; + } + + /** + * Read all lines from a file and deserialize into the desired record type. + * + * @param reader Open handle for the file. + * @param recordClass Record class to create. + * @param includeDates True if dateModified/dateCreated are serialized. + * @return List of records. + * @throws IOException + */ + private List getAllFile( + BufferedReader reader, Class clazz, boolean includeDates) + throws IOException { + + List ret = new ArrayList(); + String line; + while ((line = reader.readLine()) != null) { + if (!line.startsWith("#") && line.length() > 0) { + try { + T record = newRecord(line, clazz, includeDates); + ret.add(record); + } catch (Exception ex) { + LOG.error("Cannot parse line in data source file: {}", line, ex); + } + } + } + return ret; + } + + @Override + public QueryResult get(Class clazz) + throws IOException { + return get(clazz, (String)null); + } + + @Override + public QueryResult get(Class clazz, String sub) + throws IOException { + verifyDriverReady(); + BufferedReader reader = null; + lockRecordRead(clazz); + try { + reader = getReader(clazz, sub); + List data = getAllFile(reader, clazz, true); + return new QueryResult(data, getTime()); + } catch (Exception ex) { + LOG.error("Cannot fetch records {}", clazz.getSimpleName()); + throw new IOException("Cannot read from data store " + ex.getMessage()); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + LOG.error("Failed closing file", e); + } + } + unlockRecordRead(clazz); + } + } + + /** + * Overwrite the existing data with a new data set. + * + * @param list List of records to write. + * @param writer BufferedWriter stream to write to. + * @return If the records were succesfully written. + */ + private boolean writeAllFile( + Collection records, BufferedWriter writer) { + + try { + for (BaseRecord record : records) { + try { + String data = serializeString(record); + writer.write(data); + writer.newLine(); + } catch (IllegalArgumentException ex) { + LOG.error("Cannot write record {} to file", record, ex); + } + } + writer.flush(); + return true; + } catch (IOException e) { + LOG.error("Cannot commit records to file", e); + return false; + } + } + + /** + * Overwrite the existing data with a new data set. Replaces all records in + * the data store for this record class. If all records in the data store are + * not successfully committed, this function must return false and leave the + * data store unchanged. + * + * @param records List of records to write. All records must be of type + * recordClass. + * @param recordClass Class of record to replace. + * @return true if all operations were successful, false otherwise. + * @throws StateStoreUnavailableException + */ + public boolean writeAll( + Collection records, Class recordClass) + throws StateStoreUnavailableException { + verifyDriverReady(); + lockRecordWrite(recordClass); + BufferedWriter writer = null; + try { + writer = getWriter(recordClass, null); + return writeAllFile(records, writer); + } catch (Exception e) { + LOG.error( + "Cannot add records to file for {}", recordClass.getSimpleName(), e); + return false; + } finally { + if (writer != null) { + try { + writer.close(); + } catch (IOException e) { + LOG.error( + "Cannot close writer for {}", recordClass.getSimpleName(), e); + } + } + unlockRecordWrite(recordClass); + } + } + + /** + * Get the data file name. + * + * @return Data file name. + */ + protected String getDataFileName() { + return DATA_FILE_NAME; + } + + @Override + public boolean isDriverReady() { + return this.initialized; + } + + @Override + public boolean putAll( + List records, boolean allowUpdate, boolean errorIfExists) + throws StateStoreUnavailableException { + verifyDriverReady(); + + if (records.isEmpty()) { + return true; + } + + @SuppressWarnings("unchecked") + Class clazz = (Class) getRecordClass(records.get(0).getClass()); + QueryResult result; + try { + result = get(clazz); + } catch (IOException e) { + return false; + } + Map writeList = new HashMap<>(); + + // Write all of the existing records + for (T existingRecord : result.getRecords()) { + String key = existingRecord.getPrimaryKey(); + writeList.put(key, existingRecord); + } + + // Add inserts and updates, overwrite any existing values + for (T updatedRecord : records) { + try { + updatedRecord.validate(); + String key = updatedRecord.getPrimaryKey(); + if (writeList.containsKey(key) && allowUpdate) { + // Update + writeList.put(key, updatedRecord); + // Update the mod time stamp. Many backends will use their + // own timestamp for the mod time. + updatedRecord.setDateModified(this.getTime()); + } else if (!writeList.containsKey(key)) { + // Insert + // Create/Mod timestamps are already initialized + writeList.put(key, updatedRecord); + } else if (errorIfExists) { + LOG.error("Attempt to insert record {} that already exists", + updatedRecord); + return false; + } + } catch (IllegalArgumentException ex) { + LOG.error("Cannot write invalid record to State Store", ex); + return false; + } + } + + // Write all + boolean status = writeAll(writeList.values(), clazz); + return status; + } + + @Override + public int remove(Class clazz, Query query) + throws StateStoreUnavailableException { + verifyDriverReady(); + + if (query == null) { + return 0; + } + + int removed = 0; + // Get the current records + try { + final QueryResult result = get(clazz); + final List existingRecords = result.getRecords(); + // Write all of the existing records except those to be removed + final List recordsToRemove = filterMultiple(query, existingRecords); + removed = recordsToRemove.size(); + final List newRecords = new LinkedList<>(); + for (T record : existingRecords) { + if (!recordsToRemove.contains(record)) { + newRecords.add(record); + } + } + if (!writeAll(newRecords, clazz)) { + throw new IOException( + "Cannot remove record " + clazz + " query " + query); + } + } catch (IOException e) { + LOG.error("Cannot remove records {} query {}", clazz, query, e); + } + + return removed; + } + + @Override + public boolean removeAll(Class clazz) + throws StateStoreUnavailableException { + verifyDriverReady(); + List emptyList = new ArrayList<>(); + boolean status = writeAll(emptyList, clazz); + return status; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java new file mode 100644 index 00000000000..24e966052fd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.io.Files; + +/** + * StateStoreDriver implementation based on a local file. + */ +public class StateStoreFileImpl extends StateStoreFileBaseImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreFileImpl.class); + + /** Configuration keys. */ + public static final String FEDERATION_STORE_FILE_DIRECTORY = + DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.file.directory"; + + /** Synchronization. */ + private static final ReadWriteLock READ_WRITE_LOCK = + new ReentrantReadWriteLock(); + + /** Root directory for the state store. */ + private String rootDirectory; + + + @Override + protected boolean exists(String path) { + File test = new File(path); + return test.exists(); + } + + @Override + protected boolean mkdir(String path) { + File dir = new File(path); + return dir.mkdirs(); + } + + @Override + protected String getRootDir() { + if (this.rootDirectory == null) { + String dir = getConf().get(FEDERATION_STORE_FILE_DIRECTORY); + if (dir == null) { + File tempDir = Files.createTempDir(); + dir = tempDir.getAbsolutePath(); + } + this.rootDirectory = dir; + } + return this.rootDirectory; + } + + @Override + protected void lockRecordWrite(Class recordClass) { + // TODO - Synchronize via FS + READ_WRITE_LOCK.writeLock().lock(); + } + + @Override + protected void unlockRecordWrite( + Class recordClass) { + // TODO - Synchronize via FS + READ_WRITE_LOCK.writeLock().unlock(); + } + + @Override + protected void lockRecordRead(Class recordClass) { + // TODO - Synchronize via FS + READ_WRITE_LOCK.readLock().lock(); + } + + @Override + protected void unlockRecordRead(Class recordClass) { + // TODO - Synchronize via FS + READ_WRITE_LOCK.readLock().unlock(); + } + + @Override + protected BufferedReader getReader( + Class clazz, String sub) { + String filename = StateStoreUtils.getRecordName(clazz); + if (sub != null && sub.length() > 0) { + filename += "/" + sub; + } + filename += "/" + getDataFileName(); + + try { + LOG.debug("Loading file: {}", filename); + File file = new File(getRootDir(), filename); + FileInputStream fis = new FileInputStream(file); + InputStreamReader isr = + new InputStreamReader(fis, StandardCharsets.UTF_8); + BufferedReader reader = new BufferedReader(isr); + return reader; + } catch (Exception ex) { + LOG.error( + "Cannot open read stream for record {}", clazz.getSimpleName(), ex); + return null; + } + } + + @Override + protected BufferedWriter getWriter( + Class clazz, String sub) { + String filename = StateStoreUtils.getRecordName(clazz); + if (sub != null && sub.length() > 0) { + filename += "/" + sub; + } + filename += "/" + getDataFileName(); + + try { + File file = new File(getRootDir(), filename); + FileOutputStream fos = new FileOutputStream(file, false); + OutputStreamWriter osw = + new OutputStreamWriter(fos, StandardCharsets.UTF_8); + BufferedWriter writer = new BufferedWriter(osw); + return writer; + } catch (IOException ex) { + LOG.error( + "Cannot open read stream for record {}", clazz.getSimpleName(), ex); + return null; + } + } + + @Override + public void close() throws Exception { + setInitialized(false); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java new file mode 100644 index 00000000000..59684215ab5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.net.URI; +import java.nio.charset.StandardCharsets; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * StateStoreDriver} implementation based on a filesystem. The most common uses + * HDFS as a backend. + */ +public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreFileSystemImpl.class); + + + /** Configuration keys. */ + public static final String FEDERATION_STORE_FS_PATH = + DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.fs.path"; + + /** File system to back the State Store. */ + private FileSystem fs; + /** Working path in the filesystem. */ + private String workPath; + + @Override + protected boolean exists(String path) { + try { + return fs.exists(new Path(path)); + } catch (IOException e) { + return false; + } + } + + @Override + protected boolean mkdir(String path) { + try { + return fs.mkdirs(new Path(path)); + } catch (IOException e) { + return false; + } + } + + @Override + protected String getRootDir() { + if (this.workPath == null) { + String rootPath = getConf().get(FEDERATION_STORE_FS_PATH); + URI workUri; + try { + workUri = new URI(rootPath); + fs = FileSystem.get(workUri, getConf()); + } catch (Exception ex) { + return null; + } + this.workPath = rootPath; + } + return this.workPath; + } + + @Override + public void close() throws Exception { + if (fs != null) { + fs.close(); + } + } + + /** + * Get the folder path for the record class' data. + * + * @param cls Data record class. + * @return Path of the folder containing the record class' data files. + */ + private Path getPathForClass(Class clazz) { + if (clazz == null) { + return null; + } + // TODO extract table name from class: entry.getTableName() + String className = StateStoreUtils.getRecordName(clazz); + return new Path(workPath, className); + } + + @Override + protected void lockRecordRead(Class clazz) { + // Not required, synced with HDFS leasing + } + + @Override + protected void unlockRecordRead(Class clazz) { + // Not required, synced with HDFS leasing + } + + @Override + protected void lockRecordWrite(Class clazz) { + // TODO -> wait for lease to be available + } + + @Override + protected void unlockRecordWrite(Class clazz) { + // TODO -> ensure lease is closed for the file + } + + @Override + protected BufferedReader getReader( + Class clazz, String sub) { + + Path path = getPathForClass(clazz); + if (sub != null && sub.length() > 0) { + path = Path.mergePaths(path, new Path("/" + sub)); + } + path = Path.mergePaths(path, new Path("/" + getDataFileName())); + + try { + FSDataInputStream fdis = fs.open(path); + InputStreamReader isr = + new InputStreamReader(fdis, StandardCharsets.UTF_8); + BufferedReader reader = new BufferedReader(isr); + return reader; + } catch (IOException ex) { + LOG.error("Cannot open write stream for {} to {}", + clazz.getSimpleName(), path); + return null; + } + } + + @Override + protected BufferedWriter getWriter( + Class clazz, String sub) { + + Path path = getPathForClass(clazz); + if (sub != null && sub.length() > 0) { + path = Path.mergePaths(path, new Path("/" + sub)); + } + path = Path.mergePaths(path, new Path("/" + getDataFileName())); + + try { + FSDataOutputStream fdos = fs.create(path, true); + OutputStreamWriter osw = + new OutputStreamWriter(fdos, StandardCharsets.UTF_8); + BufferedWriter writer = new BufferedWriter(osw); + return writer; + } catch (IOException ex) { + LOG.error("Cannot open write stream for {} to {}", + clazz.getSimpleName(), path); + return null; + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java new file mode 100644 index 00000000000..e9b3fdf1133 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; + +/** + * State Store driver that stores a serialization of the records. The serializer + * is pluggable. + */ +public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl { + + /** Default serializer for this driver. */ + private StateStoreSerializer serializer; + + + @Override + public boolean init(final Configuration config, final String id, + final Collection> records) { + boolean ret = super.init(config, id, records); + + this.serializer = StateStoreSerializer.getSerializer(config); + + return ret; + } + + /** + * Serialize a record using the serializer. + * @param record Record to serialize. + * @return Byte array with the serialization of the record. + */ + protected byte[] serialize(T record) { + return serializer.serialize(record); + } + + /** + * Serialize a record using the serializer. + * @param record Record to serialize. + * @return String with the serialization of the record. + */ + protected String serializeString(T record) { + return serializer.serializeString(record); + } + + /** + * Creates a record from an input data string. + * @param data Serialized text of the record. + * @param clazz Record class. + * @param includeDates If dateModified and dateCreated are serialized. + * @return The created record. + * @throws IOException + */ + protected T newRecord( + String data, Class clazz, boolean includeDates) throws IOException { + return serializer.deserialize(data, clazz); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java index 4192a3da353..79f99c8aaba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java @@ -122,6 +122,24 @@ public abstract class BaseRecord implements Comparable { return builder.toString(); } + /** + * Check if this record matches a partial record. + * + * @param other Partial record. + * @return If this record matches. + */ + public boolean like(BaseRecord other) { + if (other == null) { + return false; + } + Map thisKeys = this.getPrimaryKeys(); + Map otherKeys = other.getPrimaryKeys(); + if (thisKeys == null) { + return otherKeys == null; + } + return thisKeys.equals(otherKeys); + } + /** * Override equals check to use primary key(s) for comparison. */ @@ -186,4 +204,4 @@ public abstract class BaseRecord implements Comparable { public String toString() { return getPrimaryKey(); } -} +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java new file mode 100644 index 00000000000..3c59abf3d07 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.records; + +/** + * Check if a record matches a query. The query is usually a partial record. + * + * @param Type of the record to query. + */ +public class Query { + + /** Partial object to compare against. */ + private final T partial; + + + /** + * Create a query to search for a partial record. + * + * @param partial It defines the attributes to search. + */ + public Query(final T part) { + this.partial = part; + } + + /** + * Get the partial record used to query. + * + * @return The partial record used for the query. + */ + public T getPartial() { + return this.partial; + } + + /** + * Check if a record matches the primary keys or the partial record. + * + * @param other Record to check. + * @return If the record matches. Don't match if there is no partial. + */ + public boolean matches(T other) { + if (this.partial == null) { + return false; + } + return this.partial.like(other); + } + + @Override + public String toString() { + return "Checking: " + this.partial; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 215b410d726..7c23de513b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4378,4 +4378,20 @@ + + dfs.federation.router.store.driver.class + org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl + + Class to implement the State Store. By default it uses the local disk. + + + + + dfs.federation.router.store.connection.test + 60000 + + How often to check for the connection to the State Store in milliseconds. + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java new file mode 100644 index 00000000000..fc5aebd4fe8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS; +import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl.FEDERATION_STORE_FILE_DIRECTORY; +import static org.junit.Assert.assertNotNull; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.util.Time; + +/** + * Utilities to test the State Store. + */ +public final class FederationStateStoreTestUtils { + + private FederationStateStoreTestUtils() { + // Utility Class + } + + /** + * Get the default State Store driver implementation. + * + * @return Class of the default State Store driver implementation. + */ + public static Class getDefaultDriver() { + return DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT; + } + + /** + * Create a default State Store configuration. + * + * @return State Store configuration. + */ + public static Configuration getStateStoreConfiguration() { + Class clazz = getDefaultDriver(); + return getStateStoreConfiguration(clazz); + } + + /** + * Create a new State Store configuration for a particular driver. + * + * @param clazz Class of the driver to create. + * @return State Store configuration. + */ + public static Configuration getStateStoreConfiguration( + Class clazz) { + Configuration conf = new HdfsConfiguration(false); + + conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs://test"); + + conf.setClass(FEDERATION_STORE_DRIVER_CLASS, clazz, StateStoreDriver.class); + + if (clazz.isAssignableFrom(StateStoreFileBaseImpl.class)) { + setFileConfiguration(conf); + } + return conf; + } + + /** + * Create a new State Store based on a configuration. + * + * @param configuration Configuration for the State Store. + * @return New State Store service. + * @throws IOException If it cannot create the State Store. + * @throws InterruptedException If we cannot wait for the store to start. + */ + public static StateStoreService getStateStore( + Configuration configuration) throws IOException, InterruptedException { + + StateStoreService stateStore = new StateStoreService(); + assertNotNull(stateStore); + + // Set unique identifier, this is normally the router address + String identifier = UUID.randomUUID().toString(); + stateStore.setIdentifier(identifier); + + stateStore.init(configuration); + stateStore.start(); + + // Wait for state store to connect + waitStateStore(stateStore, TimeUnit.SECONDS.toMillis(10)); + + return stateStore; + } + + /** + * Wait for the State Store to initialize its driver. + * + * @param stateStore State Store. + * @param timeoutMs Time out in milliseconds. + * @throws IOException If the State Store cannot be reached. + * @throws InterruptedException If the sleep is interrupted. + */ + public static void waitStateStore(StateStoreService stateStore, + long timeoutMs) throws IOException, InterruptedException { + long startingTime = Time.monotonicNow(); + while (!stateStore.isDriverReady()) { + Thread.sleep(100); + if (Time.monotonicNow() - startingTime > timeoutMs) { + throw new IOException("Timeout waiting for State Store to connect"); + } + } + } + + /** + * Delete the default State Store. + * + * @throws IOException + */ + public static void deleteStateStore() throws IOException { + Class driverClass = getDefaultDriver(); + deleteStateStore(driverClass); + } + + /** + * Delete the State Store. + * @param driverClass Class of the State Store driver implementation. + * @throws IOException If it cannot be deleted. + */ + public static void deleteStateStore( + Class driverClass) throws IOException { + + if (StateStoreFileBaseImpl.class.isAssignableFrom(driverClass)) { + String workingDirectory = System.getProperty("user.dir"); + File dir = new File(workingDirectory + "/statestore"); + if (dir.exists()) { + FileUtils.cleanDirectory(dir); + } + } + } + + /** + * Set the default configuration for drivers based on files. + * + * @param conf Configuration to extend. + */ + public static void setFileConfiguration(Configuration conf) { + String workingPath = System.getProperty("user.dir"); + String stateStorePath = workingPath + "/statestore"; + conf.set(FEDERATION_STORE_FILE_DIRECTORY, stateStorePath); + } + + /** + * Clear all the records from the State Store. + * + * @param store State Store to remove records from. + * @return If the State Store was cleared. + * @throws IOException If it cannot clear the State Store. + */ + public static boolean clearAllRecords(StateStoreService store) + throws IOException { + Collection> allRecords = + store.getSupportedRecords(); + for (Class recordType : allRecords) { + if (!clearRecords(store, recordType)) { + return false; + } + } + return true; + } + + /** + * Clear records from a certain type from the State Store. + * + * @param store State Store to remove records from. + * @param recordClass Class of the records to remove. + * @return If the State Store was cleared. + * @throws IOException If it cannot clear the State Store. + */ + public static boolean clearRecords( + StateStoreService store, Class recordClass) throws IOException { + List emptyList = new ArrayList<>(); + if (!synchronizeRecords(store, emptyList, recordClass)) { + return false; + } + return true; + } + + /** + * Synchronize a set of records. Remove all and keep the ones specified. + * + * @param stateStore State Store service managing the driver. + * @param records Records to add. + * @param clazz Class of the record to synchronize. + * @return If the synchronization succeeded. + * @throws IOException If it cannot connect to the State Store. + */ + public static boolean synchronizeRecords( + StateStoreService stateStore, List records, Class clazz) + throws IOException { + StateStoreDriver driver = stateStore.getDriver(); + driver.verifyDriverReady(); + if (driver.removeAll(clazz)) { + if (driver.putAll(records, true, false)) { + return true; + } + } + return false; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java new file mode 100644 index 00000000000..7f0b36a23c0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java @@ -0,0 +1,483 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.junit.AfterClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base tests for the driver. The particular implementations will use this to + * test their functionality. + */ +public class TestStateStoreDriverBase { + + private static final Logger LOG = + LoggerFactory.getLogger(TestStateStoreDriverBase.class); + + private static StateStoreService stateStore; + private static Configuration conf; + + + /** + * Get the State Store driver. + * @return State Store driver. + */ + protected StateStoreDriver getStateStoreDriver() { + return stateStore.getDriver(); + } + + @AfterClass + public static void tearDownCluster() { + if (stateStore != null) { + stateStore.stop(); + } + } + + /** + * Get a new State Store using this configuration. + * + * @param config Configuration for the State Store. + * @throws Exception If we cannot get the State Store. + */ + public static void getStateStore(Configuration config) throws Exception { + conf = config; + stateStore = FederationStateStoreTestUtils.getStateStore(conf); + } + + private T generateFakeRecord(Class recordClass) + throws IllegalArgumentException, IllegalAccessException, IOException { + + // TODO add record + return null; + } + + /** + * Validate if a record is the same. + * + * @param original + * @param committed + * @param assertEquals Assert if the records are equal or just return. + * @return + * @throws IllegalArgumentException + * @throws IllegalAccessException + */ + private boolean validateRecord( + BaseRecord original, BaseRecord committed, boolean assertEquals) + throws IllegalArgumentException, IllegalAccessException { + + boolean ret = true; + + Map> fields = getFields(original); + for (String key : fields.keySet()) { + if (key.equals("dateModified") || + key.equals("dateCreated") || + key.equals("proto")) { + // Fields are updated/set on commit and fetch and may not match + // the fields that are initialized in a non-committed object. + continue; + } + Object data1 = getField(original, key); + Object data2 = getField(committed, key); + if (assertEquals) { + assertEquals("Field " + key + " does not match", data1, data2); + } else if (!data1.equals(data2)) { + ret = false; + } + } + + long now = stateStore.getDriver().getTime(); + assertTrue( + committed.getDateCreated() <= now && committed.getDateCreated() > 0); + assertTrue(committed.getDateModified() >= committed.getDateCreated()); + + return ret; + } + + public static void removeAll(StateStoreDriver driver) throws IOException { + // TODO add records to remove + } + + public void testInsert( + StateStoreDriver driver, Class recordClass) + throws IllegalArgumentException, IllegalAccessException, IOException { + + assertTrue(driver.removeAll(recordClass)); + QueryResult records = driver.get(recordClass); + assertTrue(records.getRecords().isEmpty()); + + // Insert single + BaseRecord record = generateFakeRecord(recordClass); + driver.put(record, true, false); + + // Verify + records = driver.get(recordClass); + assertEquals(1, records.getRecords().size()); + validateRecord(record, records.getRecords().get(0), true); + + // Insert multiple + List insertList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + T newRecord = generateFakeRecord(recordClass); + insertList.add(newRecord); + } + driver.putAll(insertList, true, false); + + // Verify + records = driver.get(recordClass); + assertEquals(11, records.getRecords().size()); + } + + public void testFetchErrors(StateStoreDriver driver, + Class clazz) throws IllegalAccessException, IOException { + + // Fetch empty list + driver.removeAll(clazz); + QueryResult result0 = driver.get(clazz); + assertNotNull(result0); + List records0 = result0.getRecords(); + assertEquals(records0.size(), 0); + + // Insert single + BaseRecord record = generateFakeRecord(clazz); + assertTrue(driver.put(record, true, false)); + + // Verify + QueryResult result1 = driver.get(clazz); + List records1 = result1.getRecords(); + assertEquals(1, records1.size()); + validateRecord(record, records1.get(0), true); + + // Test fetch single object with a bad query + final T fakeRecord = generateFakeRecord(clazz); + final Query query = new Query(fakeRecord); + T getRecord = driver.get(clazz, query); + assertNull(getRecord); + + // Test fetch multiple objects does not exist returns empty list + assertEquals(driver.getMultiple(clazz, query).size(), 0); + } + + public void testPut( + StateStoreDriver driver, Class clazz) + throws IllegalArgumentException, ReflectiveOperationException, + IOException, SecurityException { + + driver.removeAll(clazz); + QueryResult records = driver.get(clazz); + assertTrue(records.getRecords().isEmpty()); + + // Insert multiple + List insertList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + T newRecord = generateFakeRecord(clazz); + insertList.add(newRecord); + } + + // Verify + assertTrue(driver.putAll(insertList, false, true)); + records = driver.get(clazz); + assertEquals(records.getRecords().size(), 10); + + // Generate a new record with the same PK fields as an existing record + BaseRecord updatedRecord = generateFakeRecord(clazz); + BaseRecord existingRecord = records.getRecords().get(0); + Map primaryKeys = existingRecord.getPrimaryKeys(); + for (Entry entry : primaryKeys.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + Class fieldType = getFieldType(existingRecord, key); + Object field = fromString(value, fieldType); + assertTrue(setField(updatedRecord, key, field)); + } + + // Attempt an update of an existing entry, but it is not allowed. + assertFalse(driver.put(updatedRecord, false, true)); + + // Verify no update occurred, all original records are unchanged + QueryResult newRecords = driver.get(clazz); + assertTrue(newRecords.getRecords().size() == 10); + assertEquals("A single entry was improperly updated in the store", 10, + countMatchingEntries(records.getRecords(), newRecords.getRecords())); + + // Update the entry (allowing updates) + assertTrue(driver.put(updatedRecord, true, false)); + + // Verify that one entry no longer matches the original set + newRecords = driver.get(clazz); + assertEquals(10, newRecords.getRecords().size()); + assertEquals( + "Record of type " + clazz + " not updated in the store", 9, + countMatchingEntries(records.getRecords(), newRecords.getRecords())); + } + + private int countMatchingEntries( + Collection committedList, + Collection matchList) { + + int matchingCount = 0; + for (BaseRecord committed : committedList) { + for (BaseRecord match : matchList) { + try { + if (match.getPrimaryKey().equals(committed.getPrimaryKey())) { + if (validateRecord(match, committed, false)) { + matchingCount++; + } + break; + } + } catch (Exception ex) { + } + } + } + return matchingCount; + } + + public void testRemove( + StateStoreDriver driver, Class clazz) + throws IllegalArgumentException, IllegalAccessException, IOException { + + // Remove all + assertTrue(driver.removeAll(clazz)); + QueryResult records = driver.get(clazz); + assertTrue(records.getRecords().isEmpty()); + + // Insert multiple + List insertList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + T newRecord = generateFakeRecord(clazz); + insertList.add(newRecord); + } + + // Verify + assertTrue(driver.putAll(insertList, false, true)); + records = driver.get(clazz); + assertEquals(records.getRecords().size(), 10); + + // Remove Single + assertTrue(driver.remove(records.getRecords().get(0))); + + // Verify + records = driver.get(clazz); + assertEquals(records.getRecords().size(), 9); + + // Remove with filter + final T firstRecord = records.getRecords().get(0); + final Query query0 = new Query(firstRecord); + assertTrue(driver.remove(clazz, query0) > 0); + + final T secondRecord = records.getRecords().get(1); + final Query query1 = new Query(secondRecord); + assertTrue(driver.remove(clazz, query1) > 0); + + // Verify + records = driver.get(clazz); + assertEquals(records.getRecords().size(), 7); + + // Remove all + assertTrue(driver.removeAll(clazz)); + + // Verify + records = driver.get(clazz); + assertTrue(records.getRecords().isEmpty()); + } + + public void testInsert(StateStoreDriver driver) + throws IllegalArgumentException, IllegalAccessException, IOException { + // TODO add records + } + + public void testPut(StateStoreDriver driver) + throws IllegalArgumentException, ReflectiveOperationException, + IOException, SecurityException { + // TODO add records + } + + public void testRemove(StateStoreDriver driver) + throws IllegalArgumentException, IllegalAccessException, IOException { + // TODO add records + } + + public void testFetchErrors(StateStoreDriver driver) + throws IllegalArgumentException, IllegalAccessException, IOException { + // TODO add records + } + + /** + * Sets the value of a field on the object. + * + * @param fieldName The string name of the field. + * @param data The data to pass to the field's setter. + * + * @return True if successful, fails if failed. + */ + private static boolean setField( + BaseRecord record, String fieldName, Object data) { + + Method m = locateSetter(record, fieldName); + if (m != null) { + try { + m.invoke(record, data); + } catch (Exception e) { + LOG.error("Cannot set field " + fieldName + " on object " + + record.getClass().getName() + " to data " + data + " of type " + + data.getClass(), e); + return false; + } + } + return true; + } + + /** + * Finds the appropriate setter for a field name. + * + * @param fieldName The legacy name of the field. + * @return The matching setter or null if not found. + */ + private static Method locateSetter(BaseRecord record, String fieldName) { + for (Method m : record.getClass().getMethods()) { + if (m.getName().equalsIgnoreCase("set" + fieldName)) { + return m; + } + } + return null; + } + + /** + * Returns all serializable fields in the object. + * + * @return Map with the fields. + */ + private static Map> getFields(BaseRecord record) { + Map> getters = new HashMap<>(); + for (Method m : record.getClass().getDeclaredMethods()) { + if (m.getName().startsWith("get")) { + try { + Class type = m.getReturnType(); + char[] c = m.getName().substring(3).toCharArray(); + c[0] = Character.toLowerCase(c[0]); + String key = new String(c); + getters.put(key, type); + } catch (Exception e) { + LOG.error("Cannot execute getter " + m.getName() + + " on object " + record); + } + } + } + return getters; + } + + /** + * Get the type of a field. + * + * @param fieldName + * @return Field type + */ + private static Class getFieldType(BaseRecord record, String fieldName) { + Method m = locateGetter(record, fieldName); + return m.getReturnType(); + } + + /** + * Fetches the value for a field name. + * + * @param fieldName the legacy name of the field. + * @return The field data or null if not found. + */ + private static Object getField(BaseRecord record, String fieldName) { + Object result = null; + Method m = locateGetter(record, fieldName); + if (m != null) { + try { + result = m.invoke(record); + } catch (Exception e) { + LOG.error("Cannot get field " + fieldName + " on object " + record); + } + } + return result; + } + + /** + * Finds the appropriate getter for a field name. + * + * @param fieldName The legacy name of the field. + * @return The matching getter or null if not found. + */ + private static Method locateGetter(BaseRecord record, String fieldName) { + for (Method m : record.getClass().getMethods()) { + if (m.getName().equalsIgnoreCase("get" + fieldName)) { + return m; + } + } + return null; + } + + /** + * Expands a data object from the store into an record object. Default store + * data type is a String. Override if additional serialization is required. + * + * @param data Object containing the serialized data. Only string is + * supported. + * @param clazz Target object class to hold the deserialized data. + * @return An instance of the target data object initialized with the + * deserialized data. + */ + @Deprecated + @SuppressWarnings({ "unchecked", "rawtypes" }) + private static T fromString(String data, Class clazz) { + + if (data.equals("null")) { + return null; + } else if (clazz == String.class) { + return (T) data; + } else if (clazz == Long.class || clazz == long.class) { + return (T) Long.valueOf(data); + } else if (clazz == Integer.class || clazz == int.class) { + return (T) Integer.valueOf(data); + } else if (clazz == Double.class || clazz == double.class) { + return (T) Double.valueOf(data); + } else if (clazz == Float.class || clazz == float.class) { + return (T) Float.valueOf(data); + } else if (clazz == Boolean.class || clazz == boolean.class) { + return (T) Boolean.valueOf(data); + } else if (clazz.isEnum()) { + return (T) Enum.valueOf((Class) clazz, data); + } + return null; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java new file mode 100644 index 00000000000..920e280d8d4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test the FileSystem (e.g., HDFS) implementation of the State Store driver. + */ +public class TestStateStoreFile extends TestStateStoreDriverBase { + + @BeforeClass + public static void setupCluster() throws Exception { + Configuration conf = getStateStoreConfiguration(StateStoreFileImpl.class); + getStateStore(conf); + } + + @Before + public void startup() throws IOException { + removeAll(getStateStoreDriver()); + } + + @Test + public void testInsert() + throws IllegalArgumentException, IllegalAccessException, IOException { + testInsert(getStateStoreDriver()); + } + + @Test + public void testUpdate() + throws IllegalArgumentException, ReflectiveOperationException, + IOException, SecurityException { + testPut(getStateStoreDriver()); + } + + @Test + public void testDelete() + throws IllegalArgumentException, IllegalAccessException, IOException { + testRemove(getStateStoreDriver()); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java new file mode 100644 index 00000000000..da2e51ddd72 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test the FileSystem (e.g., HDFS) implementation of the State Store driver. + */ +public class TestStateStoreFileSystem extends TestStateStoreDriverBase { + + private static MiniDFSCluster dfsCluster; + + @BeforeClass + public static void setupCluster() throws Exception { + Configuration conf = FederationStateStoreTestUtils + .getStateStoreConfiguration(StateStoreFileSystemImpl.class); + conf.set(StateStoreFileSystemImpl.FEDERATION_STORE_FS_PATH, + "/hdfs-federation/"); + + // Create HDFS cluster to back the state tore + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); + builder.numDataNodes(1); + dfsCluster = builder.build(); + dfsCluster.waitClusterUp(); + getStateStore(conf); + } + + @AfterClass + public static void tearDownCluster() { + if (dfsCluster != null) { + dfsCluster.shutdown(); + } + } + + @Before + public void startup() throws IOException { + removeAll(getStateStoreDriver()); + } + + @Test + public void testInsert() + throws IllegalArgumentException, IllegalAccessException, IOException { + testInsert(getStateStoreDriver()); + } + + @Test + public void testUpdate() + throws IllegalArgumentException, IllegalAccessException, IOException { + testInsert(getStateStoreDriver()); + } + + @Test + public void testDelete() + throws IllegalArgumentException, IllegalAccessException, IOException { + testInsert(getStateStoreDriver()); + } + + @Test + public void testFetchErrors() + throws IllegalArgumentException, IllegalAccessException, IOException { + testFetchErrors(getStateStoreDriver()); + } +} \ No newline at end of file