HDFS-10630. Federation State Store FS Implementation. Contributed by Jason Kace and Inigo Goiri.
(cherry picked from commit c6e0bd640c
)
This commit is contained in:
parent
05afc382d9
commit
9b09f73006
|
@ -18,6 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
|
@ -25,6 +27,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
|
||||
|
@ -1137,6 +1141,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT =
|
||||
StateStoreSerializerPBImpl.class;
|
||||
|
||||
public static final String FEDERATION_STORE_DRIVER_CLASS =
|
||||
FEDERATION_STORE_PREFIX + "driver.class";
|
||||
public static final Class<? extends StateStoreDriver>
|
||||
FEDERATION_STORE_DRIVER_CLASS_DEFAULT = StateStoreFileImpl.class;
|
||||
|
||||
public static final String FEDERATION_STORE_CONNECTION_TEST_MS =
|
||||
FEDERATION_STORE_PREFIX + "connection.test";
|
||||
public static final long FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT =
|
||||
TimeUnit.MINUTES.toMillis(1);
|
||||
|
||||
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
|
||||
@Deprecated
|
||||
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
|
||||
|
|
|
@ -0,0 +1,198 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.service.ServiceStateException;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* Service to periodically execute a runnable.
|
||||
*/
|
||||
public abstract class PeriodicService extends AbstractService {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(PeriodicService.class);
|
||||
|
||||
/** Default interval in milliseconds for the periodic service. */
|
||||
private static final long DEFAULT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1);
|
||||
|
||||
|
||||
/** Interval for running the periodic service in milliseconds. */
|
||||
private long intervalMs;
|
||||
/** Name of the service. */
|
||||
private final String serviceName;
|
||||
|
||||
/** Scheduler for the periodic service. */
|
||||
private final ScheduledExecutorService scheduler;
|
||||
|
||||
/** If the service is running. */
|
||||
private volatile boolean isRunning = false;
|
||||
|
||||
/** How many times we run. */
|
||||
private long runCount;
|
||||
/** How many errors we got. */
|
||||
private long errorCount;
|
||||
/** When was the last time we executed this service successfully. */
|
||||
private long lastRun;
|
||||
|
||||
/**
|
||||
* Create a new periodic update service.
|
||||
*
|
||||
* @param name Name of the service.
|
||||
*/
|
||||
public PeriodicService(String name) {
|
||||
this(name, DEFAULT_INTERVAL_MS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new periodic update service.
|
||||
*
|
||||
* @param name Name of the service.
|
||||
* @param interval Interval for the periodic service in milliseconds.
|
||||
*/
|
||||
public PeriodicService(String name, long interval) {
|
||||
super(name);
|
||||
this.serviceName = name;
|
||||
this.intervalMs = interval;
|
||||
|
||||
ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
||||
.setNameFormat(this.getName() + "-%d")
|
||||
.build();
|
||||
this.scheduler = Executors.newScheduledThreadPool(1, threadFactory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the interval for the periodic service.
|
||||
*
|
||||
* @param interval Interval in milliseconds.
|
||||
*/
|
||||
protected void setIntervalMs(long interval) {
|
||||
if (getServiceState() == STATE.STARTED) {
|
||||
throw new ServiceStateException("Periodic service already started");
|
||||
} else {
|
||||
this.intervalMs = interval;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the interval for the periodic service.
|
||||
*
|
||||
* @return Interval in milliseconds.
|
||||
*/
|
||||
protected long getIntervalMs() {
|
||||
return this.intervalMs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get how many times we failed to run the periodic service.
|
||||
*
|
||||
* @return Times we failed to run the periodic service.
|
||||
*/
|
||||
protected long getErrorCount() {
|
||||
return this.errorCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get how many times we run the periodic service.
|
||||
*
|
||||
* @return Times we run the periodic service.
|
||||
*/
|
||||
protected long getRunCount() {
|
||||
return this.runCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the last time the periodic service was executed.
|
||||
*
|
||||
* @return Last time the periodic service was executed.
|
||||
*/
|
||||
protected long getLastUpdate() {
|
||||
return this.lastRun;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
super.serviceStart();
|
||||
LOG.info("Starting periodic service {}", this.serviceName);
|
||||
startPeriodic();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
stopPeriodic();
|
||||
LOG.info("Stopping periodic service {}", this.serviceName);
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the periodic task.
|
||||
*/
|
||||
protected synchronized void stopPeriodic() {
|
||||
if (this.isRunning) {
|
||||
LOG.info("{} is shutting down", this.serviceName);
|
||||
this.isRunning = false;
|
||||
this.scheduler.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the periodic execution.
|
||||
*/
|
||||
protected synchronized void startPeriodic() {
|
||||
stopPeriodic();
|
||||
|
||||
// Create the runnable service
|
||||
Runnable updateRunnable = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.debug("Running {} update task", serviceName);
|
||||
try {
|
||||
if (!isRunning) {
|
||||
return;
|
||||
}
|
||||
periodicInvoke();
|
||||
runCount++;
|
||||
lastRun = Time.now();
|
||||
} catch (Exception ex) {
|
||||
errorCount++;
|
||||
LOG.warn(serviceName + " service threw an exception", ex);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Start the execution of the periodic service
|
||||
this.isRunning = true;
|
||||
this.scheduler.scheduleWithFixedDelay(
|
||||
updateRunnable, 0, this.intervalMs, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Method that the service will run periodically.
|
||||
*/
|
||||
protected abstract void periodicInvoke();
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.federation.router.PeriodicService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Service to periodically monitor the connection of the StateStore
|
||||
* {@link StateStoreService} data store and to re-open the connection
|
||||
* to the data store if required.
|
||||
*/
|
||||
public class StateStoreConnectionMonitorService extends PeriodicService {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(StateStoreConnectionMonitorService.class);
|
||||
|
||||
/** Service that maintains the State Store connection. */
|
||||
private final StateStoreService stateStore;
|
||||
|
||||
|
||||
/**
|
||||
* Create a new service to monitor the connectivity of the state store driver.
|
||||
*
|
||||
* @param store Instance of the state store to be monitored.
|
||||
*/
|
||||
public StateStoreConnectionMonitorService(StateStoreService store) {
|
||||
super(StateStoreConnectionMonitorService.class.getSimpleName());
|
||||
this.stateStore = store;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
this.setIntervalMs(conf.getLong(
|
||||
DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS,
|
||||
DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT));
|
||||
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void periodicInvoke() {
|
||||
LOG.debug("Checking state store connection");
|
||||
if (!stateStore.isDriverReady()) {
|
||||
LOG.info("Attempting to open state store driver.");
|
||||
stateStore.loadDriver();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -15,45 +15,168 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.federation.store;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedList;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||
import org.apache.hadoop.service.CompositeService;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* A service to initialize a
|
||||
* {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
|
||||
* StateStoreDriver} and maintain the connection to the data store. There
|
||||
* are multiple state store driver connections supported:
|
||||
* StateStoreDriver} and maintain the connection to the data store. There are
|
||||
* multiple state store driver connections supported:
|
||||
* <ul>
|
||||
* <li>File {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl.
|
||||
* <li>File
|
||||
* {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl.
|
||||
* StateStoreFileImpl StateStoreFileImpl}
|
||||
* <li>ZooKeeper {@link org.apache.hadoop.hdfs.server.federation.store.driver.
|
||||
* impl.StateStoreZooKeeperImpl StateStoreZooKeeperImpl}
|
||||
* <li>ZooKeeper
|
||||
* {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl.
|
||||
* StateStoreZooKeeperImpl StateStoreZooKeeperImpl}
|
||||
* </ul>
|
||||
* <p>
|
||||
* The service also supports the dynamic registration of data interfaces such as
|
||||
* the following:
|
||||
* The service also supports the dynamic registration of record stores like:
|
||||
* <ul>
|
||||
* <li>{@link MembershipStateStore}: state of the Namenodes in the
|
||||
* <li>{@link MembershipStore}: state of the Namenodes in the
|
||||
* federation.
|
||||
* <li>{@link MountTableStore}: Mount table between to subclusters.
|
||||
* See {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}.
|
||||
* <li>{@link RouterStateStore}: State of the routers in the federation.
|
||||
* </ul>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class StateStoreService extends CompositeService {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(StateStoreService.class);
|
||||
|
||||
|
||||
/** State Store configuration. */
|
||||
private Configuration conf;
|
||||
|
||||
/** Identifier for the service. */
|
||||
private String identifier;
|
||||
|
||||
// Stub class
|
||||
public StateStoreService(String name) {
|
||||
super(name);
|
||||
/** Driver for the back end connection. */
|
||||
private StateStoreDriver driver;
|
||||
|
||||
/** Service to maintain data store connection. */
|
||||
private StateStoreConnectionMonitorService monitorService;
|
||||
|
||||
|
||||
public StateStoreService() {
|
||||
super(StateStoreService.class.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the State Store and the connection to the backend.
|
||||
*
|
||||
* @param config Configuration for the State Store.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
protected void serviceInit(Configuration config) throws Exception {
|
||||
this.conf = config;
|
||||
|
||||
// Create implementation of State Store
|
||||
Class<? extends StateStoreDriver> driverClass = this.conf.getClass(
|
||||
DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS,
|
||||
DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT,
|
||||
StateStoreDriver.class);
|
||||
this.driver = ReflectionUtils.newInstance(driverClass, this.conf);
|
||||
|
||||
if (this.driver == null) {
|
||||
throw new IOException("Cannot create driver for the State Store");
|
||||
}
|
||||
|
||||
// Check the connection to the State Store periodically
|
||||
this.monitorService = new StateStoreConnectionMonitorService(this);
|
||||
this.addService(monitorService);
|
||||
|
||||
super.serviceInit(this.conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
loadDriver();
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
closeDriver();
|
||||
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
/**
|
||||
* List of records supported by this State Store.
|
||||
*
|
||||
* @return List of supported record classes.
|
||||
*/
|
||||
public Collection<Class<? extends BaseRecord>> getSupportedRecords() {
|
||||
// TODO add list of records
|
||||
return new LinkedList<>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the State Store driver. If successful, refresh cached data tables.
|
||||
*/
|
||||
public void loadDriver() {
|
||||
synchronized (this.driver) {
|
||||
if (!isDriverReady()) {
|
||||
String driverName = this.driver.getClass().getSimpleName();
|
||||
if (this.driver.init(conf, getIdentifier(), getSupportedRecords())) {
|
||||
LOG.info("Connection to the State Store driver {} is open and ready",
|
||||
driverName);
|
||||
} else {
|
||||
LOG.error("Cannot initialize State Store driver {}", driverName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the driver is ready to be used.
|
||||
*
|
||||
* @return If the driver is ready.
|
||||
*/
|
||||
public boolean isDriverReady() {
|
||||
return this.driver.isDriverReady();
|
||||
}
|
||||
|
||||
/**
|
||||
* Manually shuts down the driver.
|
||||
*
|
||||
* @throws Exception If the driver cannot be closed.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public void closeDriver() throws Exception {
|
||||
if (this.driver != null) {
|
||||
this.driver.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the state store driver.
|
||||
*
|
||||
* @return State store driver.
|
||||
*/
|
||||
public StateStoreDriver getDriver() {
|
||||
return this.driver;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -74,4 +197,5 @@ public class StateStoreService extends CompositeService {
|
|||
public void setIdentifier(String id) {
|
||||
this.identifier = id;
|
||||
}
|
||||
|
||||
}
|
|
@ -17,17 +17,22 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Set of utility functions used to query, create, update and delete data
|
||||
* records in the state store.
|
||||
* Set of utility functions used to work with the State Store.
|
||||
*/
|
||||
public final class StateStoreUtils {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(StateStoreUtils.class);
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(StateStoreUtils.class);
|
||||
|
||||
|
||||
private StateStoreUtils() {
|
||||
// Utility class
|
||||
|
@ -52,7 +57,7 @@ public final class StateStoreUtils {
|
|||
|
||||
// Check if we went too far
|
||||
if (actualClazz.equals(BaseRecord.class)) {
|
||||
LOG.error("We went too far (" + actualClazz + ") with " + clazz);
|
||||
LOG.error("We went too far ({}) with {}", actualClazz, clazz);
|
||||
actualClazz = clazz;
|
||||
}
|
||||
return actualClazz;
|
||||
|
@ -69,4 +74,36 @@ public final class StateStoreUtils {
|
|||
Class<? extends BaseRecord> getRecordClass(final T record) {
|
||||
return getRecordClass(record.getClass());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the base class name for a record. If we get an implementation of a
|
||||
* record we will return the real parent record class.
|
||||
*
|
||||
* @param clazz Class of the data record to check.
|
||||
* @return Name of the base class for the record.
|
||||
*/
|
||||
public static <T extends BaseRecord> String getRecordName(
|
||||
final Class<T> clazz) {
|
||||
return getRecordClass(clazz).getSimpleName();
|
||||
}
|
||||
|
||||
/**
|
||||
* Filters a list of records to find all records matching the query.
|
||||
*
|
||||
* @param query Map of field names and objects to use to filter results.
|
||||
* @param records List of data records to filter.
|
||||
* @return List of all records matching the query (or empty list if none
|
||||
* match), null if the data set could not be filtered.
|
||||
*/
|
||||
public static <T extends BaseRecord> List<T> filterMultiple(
|
||||
final Query<T> query, final Iterable<T> records) {
|
||||
|
||||
List<T> matchingList = new ArrayList<>();
|
||||
for (T record : records) {
|
||||
if (query.matches(record)) {
|
||||
matchingList.add(record);
|
||||
}
|
||||
}
|
||||
return matchingList;
|
||||
}
|
||||
}
|
|
@ -18,15 +18,16 @@
|
|||
package org.apache.hadoop.hdfs.server.federation.store.driver;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.util.List;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Driver class for an implementation of a {@link StateStoreService}
|
||||
|
@ -35,7 +36,8 @@ import org.apache.hadoop.util.Time;
|
|||
*/
|
||||
public abstract class StateStoreDriver implements StateStoreRecordOperations {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(StateStoreDriver.class);
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(StateStoreDriver.class);
|
||||
|
||||
|
||||
/** State Store configuration. */
|
||||
|
@ -47,13 +49,14 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
|
|||
|
||||
/**
|
||||
* Initialize the state store connection.
|
||||
*
|
||||
* @param config Configuration for the driver.
|
||||
* @param id Identifier for the driver.
|
||||
* @param records Records that are supported.
|
||||
* @return If initialized and ready, false if failed to initialize driver.
|
||||
*/
|
||||
public boolean init(final Configuration config, final String id,
|
||||
final List<Class<? extends BaseRecord>> records) {
|
||||
final Collection<Class<? extends BaseRecord>> records) {
|
||||
|
||||
this.conf = config;
|
||||
this.identifier = id;
|
||||
|
@ -62,10 +65,22 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
|
|||
LOG.warn("The identifier for the State Store connection is not set");
|
||||
}
|
||||
|
||||
// TODO stub
|
||||
boolean success = initDriver();
|
||||
if (!success) {
|
||||
LOG.error("Cannot intialize driver for {}", getDriverName());
|
||||
return false;
|
||||
}
|
||||
|
||||
for (Class<? extends BaseRecord> cls : records) {
|
||||
String recordString = StateStoreUtils.getRecordName(cls);
|
||||
if (!initRecordStorage(recordString, cls)) {
|
||||
LOG.error("Cannot initialize record store for {}", cls.getSimpleName());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the State Store configuration.
|
||||
*
|
||||
|
|
|
@ -19,11 +19,11 @@ package org.apache.hadoop.hdfs.server.federation.store.driver;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
|
||||
import org.apache.hadoop.io.retry.AtMostOnce;
|
||||
import org.apache.hadoop.io.retry.Idempotent;
|
||||
|
@ -67,14 +67,14 @@ public interface StateStoreRecordOperations {
|
|||
* Get a single record from the store that matches the query.
|
||||
*
|
||||
* @param clazz Class of record to fetch.
|
||||
* @param query Map of field names and objects to filter results.
|
||||
* @param query Query to filter results.
|
||||
* @return A single record matching the query. Null if there are no matching
|
||||
* records or more than one matching record in the store.
|
||||
* @throws IOException If multiple records match or if the data store cannot
|
||||
* be queried.
|
||||
*/
|
||||
@Idempotent
|
||||
<T extends BaseRecord> T get(Class<T> clazz, Map<String, String> query)
|
||||
<T extends BaseRecord> T get(Class<T> clazz, Query<T> query)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
|
@ -83,14 +83,14 @@ public interface StateStoreRecordOperations {
|
|||
* supports filtering it should overwrite this method.
|
||||
*
|
||||
* @param clazz Class of record to fetch.
|
||||
* @param query Map of field names and objects to filter results.
|
||||
* @param query Query to filter results.
|
||||
* @return Records of type clazz that match the query or empty list if none
|
||||
* are found.
|
||||
* @throws IOException Throws exception if unable to query the data store.
|
||||
*/
|
||||
@Idempotent
|
||||
<T extends BaseRecord> List<T> getMultiple(
|
||||
Class<T> clazz, Map<String, String> query) throws IOException;
|
||||
Class<T> clazz, Query<T> query) throws IOException;
|
||||
|
||||
/**
|
||||
* Creates a single record. Optionally updates an existing record with same
|
||||
|
@ -152,13 +152,12 @@ public interface StateStoreRecordOperations {
|
|||
* Remove multiple records of a specific class that match a query. Requires
|
||||
* the getAll implementation to fetch fresh records on each call.
|
||||
*
|
||||
* @param clazz Class of record to remove.
|
||||
* @param filter matching filter to remove.
|
||||
* @param query Query to filter what to remove.
|
||||
* @return The number of records removed.
|
||||
* @throws IOException Throws exception if unable to query the data store.
|
||||
*/
|
||||
@AtMostOnce
|
||||
<T extends BaseRecord> int remove(Class<T> clazz, Map<String, String> filter)
|
||||
<T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
|
||||
throws IOException;
|
||||
|
||||
}
|
|
@ -17,14 +17,17 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
|
||||
|
||||
/**
|
||||
* Base implementation of a State Store driver. It contains default
|
||||
|
@ -41,7 +44,7 @@ public abstract class StateStoreBaseImpl extends StateStoreDriver {
|
|||
|
||||
@Override
|
||||
public <T extends BaseRecord> T get(
|
||||
Class<T> clazz, Map<String, String> query) throws IOException {
|
||||
Class<T> clazz, Query<T> query) throws IOException {
|
||||
List<T> records = getMultiple(clazz, query);
|
||||
if (records.size() > 1) {
|
||||
throw new IOException("Found more than one object in collection");
|
||||
|
@ -52,18 +55,32 @@ public abstract class StateStoreBaseImpl extends StateStoreDriver {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends BaseRecord> List<T> getMultiple(
|
||||
Class<T> clazz, Query<T> query) throws IOException {
|
||||
QueryResult<T> result = get(clazz);
|
||||
List<T> records = result.getRecords();
|
||||
List<T> ret = filterMultiple(query, records);
|
||||
if (ret == null) {
|
||||
throw new IOException("Cannot fetch records from the store");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends BaseRecord> boolean put(
|
||||
T record, boolean allowUpdate, boolean errorIfExists) throws IOException {
|
||||
List<T> singletonList = new ArrayList<T>();
|
||||
List<T> singletonList = new ArrayList<>();
|
||||
singletonList.add(record);
|
||||
return putAll(singletonList, allowUpdate, errorIfExists);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends BaseRecord> boolean remove(T record) throws IOException {
|
||||
Map<String, String> primaryKeys = record.getPrimaryKeys();
|
||||
Class<? extends BaseRecord> clazz = StateStoreUtils.getRecordClass(record);
|
||||
return remove(clazz, primaryKeys) == 1;
|
||||
final Query<T> query = new Query<T>(record);
|
||||
Class<? extends BaseRecord> clazz = record.getClass();
|
||||
@SuppressWarnings("unchecked")
|
||||
Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz);
|
||||
return remove(recordClass, query) == 1;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,429 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
|
||||
import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordClass;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* {@link StateStoreDriver} implementation based on a local file.
|
||||
*/
|
||||
public abstract class StateStoreFileBaseImpl
|
||||
extends StateStoreSerializableImpl {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(StateStoreFileBaseImpl.class);
|
||||
|
||||
/** If it is initialized. */
|
||||
private boolean initialized = false;
|
||||
|
||||
/** Name of the file containing the data. */
|
||||
private static final String DATA_FILE_NAME = "records.data";
|
||||
|
||||
|
||||
/**
|
||||
* Lock reading records.
|
||||
*
|
||||
* @param clazz Class of the record.
|
||||
*/
|
||||
protected abstract <T extends BaseRecord> void lockRecordRead(Class<T> clazz);
|
||||
|
||||
/**
|
||||
* Unlock reading records.
|
||||
*
|
||||
* @param clazz Class of the record.
|
||||
*/
|
||||
protected abstract <T extends BaseRecord> void unlockRecordRead(
|
||||
Class<T> clazz);
|
||||
|
||||
/**
|
||||
* Lock writing records.
|
||||
*
|
||||
* @param clazz Class of the record.
|
||||
*/
|
||||
protected abstract <T extends BaseRecord> void lockRecordWrite(
|
||||
Class<T> clazz);
|
||||
|
||||
/**
|
||||
* Unlock writing records.
|
||||
*
|
||||
* @param clazz Class of the record.
|
||||
*/
|
||||
protected abstract <T extends BaseRecord> void unlockRecordWrite(
|
||||
Class<T> clazz);
|
||||
|
||||
/**
|
||||
* Get the reader for the file system.
|
||||
*
|
||||
* @param clazz Class of the record.
|
||||
*/
|
||||
protected abstract <T extends BaseRecord> BufferedReader getReader(
|
||||
Class<T> clazz, String sub);
|
||||
|
||||
/**
|
||||
* Get the writer for the file system.
|
||||
*
|
||||
* @param clazz Class of the record.
|
||||
*/
|
||||
protected abstract <T extends BaseRecord> BufferedWriter getWriter(
|
||||
Class<T> clazz, String sub);
|
||||
|
||||
/**
|
||||
* Check if a path exists.
|
||||
*
|
||||
* @param path Path to check.
|
||||
* @return If the path exists.
|
||||
*/
|
||||
protected abstract boolean exists(String path);
|
||||
|
||||
/**
|
||||
* Make a directory.
|
||||
*
|
||||
* @param path Path of the directory to create.
|
||||
* @return If the directory was created.
|
||||
*/
|
||||
protected abstract boolean mkdir(String path);
|
||||
|
||||
/**
|
||||
* Get root directory.
|
||||
*
|
||||
* @return Root directory.
|
||||
*/
|
||||
protected abstract String getRootDir();
|
||||
|
||||
/**
|
||||
* Set the driver as initialized.
|
||||
*
|
||||
* @param ini If the driver is initialized.
|
||||
*/
|
||||
public void setInitialized(boolean ini) {
|
||||
this.initialized = ini;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean initDriver() {
|
||||
String rootDir = getRootDir();
|
||||
try {
|
||||
if (rootDir == null) {
|
||||
LOG.error("Invalid root directory, unable to initialize driver.");
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check root path
|
||||
if (!exists(rootDir)) {
|
||||
if (!mkdir(rootDir)) {
|
||||
LOG.error("Cannot create State Store root directory {}", rootDir);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.error(
|
||||
"Cannot initialize filesystem using root directory {}", rootDir, ex);
|
||||
return false;
|
||||
}
|
||||
setInitialized(true);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends BaseRecord> boolean initRecordStorage(
|
||||
String className, Class<T> recordClass) {
|
||||
|
||||
String dataDirPath = getRootDir() + "/" + className;
|
||||
try {
|
||||
// Create data directories for files
|
||||
if (!exists(dataDirPath)) {
|
||||
LOG.info("{} data directory doesn't exist, creating it", dataDirPath);
|
||||
if (!mkdir(dataDirPath)) {
|
||||
LOG.error("Cannot create data directory {}", dataDirPath);
|
||||
return false;
|
||||
}
|
||||
String dataFilePath = dataDirPath + "/" + DATA_FILE_NAME;
|
||||
if (!exists(dataFilePath)) {
|
||||
// Create empty file
|
||||
List<T> emtpyList = new ArrayList<>();
|
||||
if(!writeAll(emtpyList, recordClass)) {
|
||||
LOG.error("Cannot create data file {}", dataFilePath);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Cannot create data directory {}", dataDirPath, ex);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read all lines from a file and deserialize into the desired record type.
|
||||
*
|
||||
* @param reader Open handle for the file.
|
||||
* @param recordClass Record class to create.
|
||||
* @param includeDates True if dateModified/dateCreated are serialized.
|
||||
* @return List of records.
|
||||
* @throws IOException
|
||||
*/
|
||||
private <T extends BaseRecord> List<T> getAllFile(
|
||||
BufferedReader reader, Class<T> clazz, boolean includeDates)
|
||||
throws IOException {
|
||||
|
||||
List<T> ret = new ArrayList<T>();
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
if (!line.startsWith("#") && line.length() > 0) {
|
||||
try {
|
||||
T record = newRecord(line, clazz, includeDates);
|
||||
ret.add(record);
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Cannot parse line in data source file: {}", line, ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
|
||||
throws IOException {
|
||||
return get(clazz, (String)null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
|
||||
throws IOException {
|
||||
verifyDriverReady();
|
||||
BufferedReader reader = null;
|
||||
lockRecordRead(clazz);
|
||||
try {
|
||||
reader = getReader(clazz, sub);
|
||||
List<T> data = getAllFile(reader, clazz, true);
|
||||
return new QueryResult<T>(data, getTime());
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Cannot fetch records {}", clazz.getSimpleName());
|
||||
throw new IOException("Cannot read from data store " + ex.getMessage());
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
try {
|
||||
reader.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed closing file", e);
|
||||
}
|
||||
}
|
||||
unlockRecordRead(clazz);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Overwrite the existing data with a new data set.
|
||||
*
|
||||
* @param list List of records to write.
|
||||
* @param writer BufferedWriter stream to write to.
|
||||
* @return If the records were succesfully written.
|
||||
*/
|
||||
private <T extends BaseRecord> boolean writeAllFile(
|
||||
Collection<T> records, BufferedWriter writer) {
|
||||
|
||||
try {
|
||||
for (BaseRecord record : records) {
|
||||
try {
|
||||
String data = serializeString(record);
|
||||
writer.write(data);
|
||||
writer.newLine();
|
||||
} catch (IllegalArgumentException ex) {
|
||||
LOG.error("Cannot write record {} to file", record, ex);
|
||||
}
|
||||
}
|
||||
writer.flush();
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot commit records to file", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Overwrite the existing data with a new data set. Replaces all records in
|
||||
* the data store for this record class. If all records in the data store are
|
||||
* not successfully committed, this function must return false and leave the
|
||||
* data store unchanged.
|
||||
*
|
||||
* @param records List of records to write. All records must be of type
|
||||
* recordClass.
|
||||
* @param recordClass Class of record to replace.
|
||||
* @return true if all operations were successful, false otherwise.
|
||||
* @throws StateStoreUnavailableException
|
||||
*/
|
||||
public <T extends BaseRecord> boolean writeAll(
|
||||
Collection<T> records, Class<T> recordClass)
|
||||
throws StateStoreUnavailableException {
|
||||
verifyDriverReady();
|
||||
lockRecordWrite(recordClass);
|
||||
BufferedWriter writer = null;
|
||||
try {
|
||||
writer = getWriter(recordClass, null);
|
||||
return writeAllFile(records, writer);
|
||||
} catch (Exception e) {
|
||||
LOG.error(
|
||||
"Cannot add records to file for {}", recordClass.getSimpleName(), e);
|
||||
return false;
|
||||
} finally {
|
||||
if (writer != null) {
|
||||
try {
|
||||
writer.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error(
|
||||
"Cannot close writer for {}", recordClass.getSimpleName(), e);
|
||||
}
|
||||
}
|
||||
unlockRecordWrite(recordClass);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the data file name.
|
||||
*
|
||||
* @return Data file name.
|
||||
*/
|
||||
protected String getDataFileName() {
|
||||
return DATA_FILE_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDriverReady() {
|
||||
return this.initialized;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends BaseRecord> boolean putAll(
|
||||
List<T> records, boolean allowUpdate, boolean errorIfExists)
|
||||
throws StateStoreUnavailableException {
|
||||
verifyDriverReady();
|
||||
|
||||
if (records.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
Class<T> clazz = (Class<T>) getRecordClass(records.get(0).getClass());
|
||||
QueryResult<T> result;
|
||||
try {
|
||||
result = get(clazz);
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
Map<Object, T> writeList = new HashMap<>();
|
||||
|
||||
// Write all of the existing records
|
||||
for (T existingRecord : result.getRecords()) {
|
||||
String key = existingRecord.getPrimaryKey();
|
||||
writeList.put(key, existingRecord);
|
||||
}
|
||||
|
||||
// Add inserts and updates, overwrite any existing values
|
||||
for (T updatedRecord : records) {
|
||||
try {
|
||||
updatedRecord.validate();
|
||||
String key = updatedRecord.getPrimaryKey();
|
||||
if (writeList.containsKey(key) && allowUpdate) {
|
||||
// Update
|
||||
writeList.put(key, updatedRecord);
|
||||
// Update the mod time stamp. Many backends will use their
|
||||
// own timestamp for the mod time.
|
||||
updatedRecord.setDateModified(this.getTime());
|
||||
} else if (!writeList.containsKey(key)) {
|
||||
// Insert
|
||||
// Create/Mod timestamps are already initialized
|
||||
writeList.put(key, updatedRecord);
|
||||
} else if (errorIfExists) {
|
||||
LOG.error("Attempt to insert record {} that already exists",
|
||||
updatedRecord);
|
||||
return false;
|
||||
}
|
||||
} catch (IllegalArgumentException ex) {
|
||||
LOG.error("Cannot write invalid record to State Store", ex);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Write all
|
||||
boolean status = writeAll(writeList.values(), clazz);
|
||||
return status;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
|
||||
throws StateStoreUnavailableException {
|
||||
verifyDriverReady();
|
||||
|
||||
if (query == null) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int removed = 0;
|
||||
// Get the current records
|
||||
try {
|
||||
final QueryResult<T> result = get(clazz);
|
||||
final List<T> existingRecords = result.getRecords();
|
||||
// Write all of the existing records except those to be removed
|
||||
final List<T> recordsToRemove = filterMultiple(query, existingRecords);
|
||||
removed = recordsToRemove.size();
|
||||
final List<T> newRecords = new LinkedList<>();
|
||||
for (T record : existingRecords) {
|
||||
if (!recordsToRemove.contains(record)) {
|
||||
newRecords.add(record);
|
||||
}
|
||||
}
|
||||
if (!writeAll(newRecords, clazz)) {
|
||||
throw new IOException(
|
||||
"Cannot remove record " + clazz + " query " + query);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot remove records {} query {}", clazz, query, e);
|
||||
}
|
||||
|
||||
return removed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
|
||||
throws StateStoreUnavailableException {
|
||||
verifyDriverReady();
|
||||
List<T> emptyList = new ArrayList<>();
|
||||
boolean status = writeAll(emptyList, clazz);
|
||||
return status;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,161 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.io.Files;
|
||||
|
||||
/**
|
||||
* StateStoreDriver implementation based on a local file.
|
||||
*/
|
||||
public class StateStoreFileImpl extends StateStoreFileBaseImpl {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(StateStoreFileImpl.class);
|
||||
|
||||
/** Configuration keys. */
|
||||
public static final String FEDERATION_STORE_FILE_DIRECTORY =
|
||||
DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.file.directory";
|
||||
|
||||
/** Synchronization. */
|
||||
private static final ReadWriteLock READ_WRITE_LOCK =
|
||||
new ReentrantReadWriteLock();
|
||||
|
||||
/** Root directory for the state store. */
|
||||
private String rootDirectory;
|
||||
|
||||
|
||||
@Override
|
||||
protected boolean exists(String path) {
|
||||
File test = new File(path);
|
||||
return test.exists();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean mkdir(String path) {
|
||||
File dir = new File(path);
|
||||
return dir.mkdirs();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getRootDir() {
|
||||
if (this.rootDirectory == null) {
|
||||
String dir = getConf().get(FEDERATION_STORE_FILE_DIRECTORY);
|
||||
if (dir == null) {
|
||||
File tempDir = Files.createTempDir();
|
||||
dir = tempDir.getAbsolutePath();
|
||||
}
|
||||
this.rootDirectory = dir;
|
||||
}
|
||||
return this.rootDirectory;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> void lockRecordWrite(Class<T> recordClass) {
|
||||
// TODO - Synchronize via FS
|
||||
READ_WRITE_LOCK.writeLock().lock();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> void unlockRecordWrite(
|
||||
Class<T> recordClass) {
|
||||
// TODO - Synchronize via FS
|
||||
READ_WRITE_LOCK.writeLock().unlock();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> void lockRecordRead(Class<T> recordClass) {
|
||||
// TODO - Synchronize via FS
|
||||
READ_WRITE_LOCK.readLock().lock();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> void unlockRecordRead(Class<T> recordClass) {
|
||||
// TODO - Synchronize via FS
|
||||
READ_WRITE_LOCK.readLock().unlock();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> BufferedReader getReader(
|
||||
Class<T> clazz, String sub) {
|
||||
String filename = StateStoreUtils.getRecordName(clazz);
|
||||
if (sub != null && sub.length() > 0) {
|
||||
filename += "/" + sub;
|
||||
}
|
||||
filename += "/" + getDataFileName();
|
||||
|
||||
try {
|
||||
LOG.debug("Loading file: {}", filename);
|
||||
File file = new File(getRootDir(), filename);
|
||||
FileInputStream fis = new FileInputStream(file);
|
||||
InputStreamReader isr =
|
||||
new InputStreamReader(fis, StandardCharsets.UTF_8);
|
||||
BufferedReader reader = new BufferedReader(isr);
|
||||
return reader;
|
||||
} catch (Exception ex) {
|
||||
LOG.error(
|
||||
"Cannot open read stream for record {}", clazz.getSimpleName(), ex);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> BufferedWriter getWriter(
|
||||
Class<T> clazz, String sub) {
|
||||
String filename = StateStoreUtils.getRecordName(clazz);
|
||||
if (sub != null && sub.length() > 0) {
|
||||
filename += "/" + sub;
|
||||
}
|
||||
filename += "/" + getDataFileName();
|
||||
|
||||
try {
|
||||
File file = new File(getRootDir(), filename);
|
||||
FileOutputStream fos = new FileOutputStream(file, false);
|
||||
OutputStreamWriter osw =
|
||||
new OutputStreamWriter(fos, StandardCharsets.UTF_8);
|
||||
BufferedWriter writer = new BufferedWriter(osw);
|
||||
return writer;
|
||||
} catch (IOException ex) {
|
||||
LOG.error(
|
||||
"Cannot open read stream for record {}", clazz.getSimpleName(), ex);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
setInitialized(false);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,178 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* StateStoreDriver} implementation based on a filesystem. The most common uses
|
||||
* HDFS as a backend.
|
||||
*/
|
||||
public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(StateStoreFileSystemImpl.class);
|
||||
|
||||
|
||||
/** Configuration keys. */
|
||||
public static final String FEDERATION_STORE_FS_PATH =
|
||||
DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.fs.path";
|
||||
|
||||
/** File system to back the State Store. */
|
||||
private FileSystem fs;
|
||||
/** Working path in the filesystem. */
|
||||
private String workPath;
|
||||
|
||||
@Override
|
||||
protected boolean exists(String path) {
|
||||
try {
|
||||
return fs.exists(new Path(path));
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean mkdir(String path) {
|
||||
try {
|
||||
return fs.mkdirs(new Path(path));
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getRootDir() {
|
||||
if (this.workPath == null) {
|
||||
String rootPath = getConf().get(FEDERATION_STORE_FS_PATH);
|
||||
URI workUri;
|
||||
try {
|
||||
workUri = new URI(rootPath);
|
||||
fs = FileSystem.get(workUri, getConf());
|
||||
} catch (Exception ex) {
|
||||
return null;
|
||||
}
|
||||
this.workPath = rootPath;
|
||||
}
|
||||
return this.workPath;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
if (fs != null) {
|
||||
fs.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the folder path for the record class' data.
|
||||
*
|
||||
* @param cls Data record class.
|
||||
* @return Path of the folder containing the record class' data files.
|
||||
*/
|
||||
private Path getPathForClass(Class<? extends BaseRecord> clazz) {
|
||||
if (clazz == null) {
|
||||
return null;
|
||||
}
|
||||
// TODO extract table name from class: entry.getTableName()
|
||||
String className = StateStoreUtils.getRecordName(clazz);
|
||||
return new Path(workPath, className);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> void lockRecordRead(Class<T> clazz) {
|
||||
// Not required, synced with HDFS leasing
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> void unlockRecordRead(Class<T> clazz) {
|
||||
// Not required, synced with HDFS leasing
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> void lockRecordWrite(Class<T> clazz) {
|
||||
// TODO -> wait for lease to be available
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> void unlockRecordWrite(Class<T> clazz) {
|
||||
// TODO -> ensure lease is closed for the file
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> BufferedReader getReader(
|
||||
Class<T> clazz, String sub) {
|
||||
|
||||
Path path = getPathForClass(clazz);
|
||||
if (sub != null && sub.length() > 0) {
|
||||
path = Path.mergePaths(path, new Path("/" + sub));
|
||||
}
|
||||
path = Path.mergePaths(path, new Path("/" + getDataFileName()));
|
||||
|
||||
try {
|
||||
FSDataInputStream fdis = fs.open(path);
|
||||
InputStreamReader isr =
|
||||
new InputStreamReader(fdis, StandardCharsets.UTF_8);
|
||||
BufferedReader reader = new BufferedReader(isr);
|
||||
return reader;
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Cannot open write stream for {} to {}",
|
||||
clazz.getSimpleName(), path);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T extends BaseRecord> BufferedWriter getWriter(
|
||||
Class<T> clazz, String sub) {
|
||||
|
||||
Path path = getPathForClass(clazz);
|
||||
if (sub != null && sub.length() > 0) {
|
||||
path = Path.mergePaths(path, new Path("/" + sub));
|
||||
}
|
||||
path = Path.mergePaths(path, new Path("/" + getDataFileName()));
|
||||
|
||||
try {
|
||||
FSDataOutputStream fdos = fs.create(path, true);
|
||||
OutputStreamWriter osw =
|
||||
new OutputStreamWriter(fdos, StandardCharsets.UTF_8);
|
||||
BufferedWriter writer = new BufferedWriter(osw);
|
||||
return writer;
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Cannot open write stream for {} to {}",
|
||||
clazz.getSimpleName(), path);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,77 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||
|
||||
/**
|
||||
* State Store driver that stores a serialization of the records. The serializer
|
||||
* is pluggable.
|
||||
*/
|
||||
public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl {
|
||||
|
||||
/** Default serializer for this driver. */
|
||||
private StateStoreSerializer serializer;
|
||||
|
||||
|
||||
@Override
|
||||
public boolean init(final Configuration config, final String id,
|
||||
final Collection<Class<? extends BaseRecord>> records) {
|
||||
boolean ret = super.init(config, id, records);
|
||||
|
||||
this.serializer = StateStoreSerializer.getSerializer(config);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Serialize a record using the serializer.
|
||||
* @param record Record to serialize.
|
||||
* @return Byte array with the serialization of the record.
|
||||
*/
|
||||
protected <T extends BaseRecord> byte[] serialize(T record) {
|
||||
return serializer.serialize(record);
|
||||
}
|
||||
|
||||
/**
|
||||
* Serialize a record using the serializer.
|
||||
* @param record Record to serialize.
|
||||
* @return String with the serialization of the record.
|
||||
*/
|
||||
protected <T extends BaseRecord> String serializeString(T record) {
|
||||
return serializer.serializeString(record);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a record from an input data string.
|
||||
* @param data Serialized text of the record.
|
||||
* @param clazz Record class.
|
||||
* @param includeDates If dateModified and dateCreated are serialized.
|
||||
* @return The created record.
|
||||
* @throws IOException
|
||||
*/
|
||||
protected <T extends BaseRecord> T newRecord(
|
||||
String data, Class<T> clazz, boolean includeDates) throws IOException {
|
||||
return serializer.deserialize(data, clazz);
|
||||
}
|
||||
}
|
|
@ -122,6 +122,24 @@ public abstract class BaseRecord implements Comparable<BaseRecord> {
|
|||
return builder.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if this record matches a partial record.
|
||||
*
|
||||
* @param other Partial record.
|
||||
* @return If this record matches.
|
||||
*/
|
||||
public boolean like(BaseRecord other) {
|
||||
if (other == null) {
|
||||
return false;
|
||||
}
|
||||
Map<String, String> thisKeys = this.getPrimaryKeys();
|
||||
Map<String, String> otherKeys = other.getPrimaryKeys();
|
||||
if (thisKeys == null) {
|
||||
return otherKeys == null;
|
||||
}
|
||||
return thisKeys.equals(otherKeys);
|
||||
}
|
||||
|
||||
/**
|
||||
* Override equals check to use primary key(s) for comparison.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store.records;
|
||||
|
||||
/**
|
||||
* Check if a record matches a query. The query is usually a partial record.
|
||||
*
|
||||
* @param <T> Type of the record to query.
|
||||
*/
|
||||
public class Query<T extends BaseRecord> {
|
||||
|
||||
/** Partial object to compare against. */
|
||||
private final T partial;
|
||||
|
||||
|
||||
/**
|
||||
* Create a query to search for a partial record.
|
||||
*
|
||||
* @param partial It defines the attributes to search.
|
||||
*/
|
||||
public Query(final T part) {
|
||||
this.partial = part;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the partial record used to query.
|
||||
*
|
||||
* @return The partial record used for the query.
|
||||
*/
|
||||
public T getPartial() {
|
||||
return this.partial;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a record matches the primary keys or the partial record.
|
||||
*
|
||||
* @param other Record to check.
|
||||
* @return If the record matches. Don't match if there is no partial.
|
||||
*/
|
||||
public boolean matches(T other) {
|
||||
if (this.partial == null) {
|
||||
return false;
|
||||
}
|
||||
return this.partial.like(other);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Checking: " + this.partial;
|
||||
}
|
||||
}
|
|
@ -4674,4 +4674,20 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.federation.router.store.driver.class</name>
|
||||
<value>org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl</value>
|
||||
<description>
|
||||
Class to implement the State Store. By default it uses the local disk.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.federation.router.store.connection.test</name>
|
||||
<value>60000</value>
|
||||
<description>
|
||||
How often to check for the connection to the State Store in milliseconds.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
|
|
@ -0,0 +1,232 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS;
|
||||
import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl.FEDERATION_STORE_FILE_DIRECTORY;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
/**
|
||||
* Utilities to test the State Store.
|
||||
*/
|
||||
public final class FederationStateStoreTestUtils {
|
||||
|
||||
private FederationStateStoreTestUtils() {
|
||||
// Utility Class
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the default State Store driver implementation.
|
||||
*
|
||||
* @return Class of the default State Store driver implementation.
|
||||
*/
|
||||
public static Class<? extends StateStoreDriver> getDefaultDriver() {
|
||||
return DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a default State Store configuration.
|
||||
*
|
||||
* @return State Store configuration.
|
||||
*/
|
||||
public static Configuration getStateStoreConfiguration() {
|
||||
Class<? extends StateStoreDriver> clazz = getDefaultDriver();
|
||||
return getStateStoreConfiguration(clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new State Store configuration for a particular driver.
|
||||
*
|
||||
* @param clazz Class of the driver to create.
|
||||
* @return State Store configuration.
|
||||
*/
|
||||
public static Configuration getStateStoreConfiguration(
|
||||
Class<? extends StateStoreDriver> clazz) {
|
||||
Configuration conf = new HdfsConfiguration(false);
|
||||
|
||||
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
|
||||
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs://test");
|
||||
|
||||
conf.setClass(FEDERATION_STORE_DRIVER_CLASS, clazz, StateStoreDriver.class);
|
||||
|
||||
if (clazz.isAssignableFrom(StateStoreFileBaseImpl.class)) {
|
||||
setFileConfiguration(conf);
|
||||
}
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new State Store based on a configuration.
|
||||
*
|
||||
* @param configuration Configuration for the State Store.
|
||||
* @return New State Store service.
|
||||
* @throws IOException If it cannot create the State Store.
|
||||
* @throws InterruptedException If we cannot wait for the store to start.
|
||||
*/
|
||||
public static StateStoreService getStateStore(
|
||||
Configuration configuration) throws IOException, InterruptedException {
|
||||
|
||||
StateStoreService stateStore = new StateStoreService();
|
||||
assertNotNull(stateStore);
|
||||
|
||||
// Set unique identifier, this is normally the router address
|
||||
String identifier = UUID.randomUUID().toString();
|
||||
stateStore.setIdentifier(identifier);
|
||||
|
||||
stateStore.init(configuration);
|
||||
stateStore.start();
|
||||
|
||||
// Wait for state store to connect
|
||||
waitStateStore(stateStore, TimeUnit.SECONDS.toMillis(10));
|
||||
|
||||
return stateStore;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the State Store to initialize its driver.
|
||||
*
|
||||
* @param stateStore State Store.
|
||||
* @param timeoutMs Time out in milliseconds.
|
||||
* @throws IOException If the State Store cannot be reached.
|
||||
* @throws InterruptedException If the sleep is interrupted.
|
||||
*/
|
||||
public static void waitStateStore(StateStoreService stateStore,
|
||||
long timeoutMs) throws IOException, InterruptedException {
|
||||
long startingTime = Time.monotonicNow();
|
||||
while (!stateStore.isDriverReady()) {
|
||||
Thread.sleep(100);
|
||||
if (Time.monotonicNow() - startingTime > timeoutMs) {
|
||||
throw new IOException("Timeout waiting for State Store to connect");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the default State Store.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void deleteStateStore() throws IOException {
|
||||
Class<? extends StateStoreDriver> driverClass = getDefaultDriver();
|
||||
deleteStateStore(driverClass);
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the State Store.
|
||||
* @param driverClass Class of the State Store driver implementation.
|
||||
* @throws IOException If it cannot be deleted.
|
||||
*/
|
||||
public static void deleteStateStore(
|
||||
Class<? extends StateStoreDriver> driverClass) throws IOException {
|
||||
|
||||
if (StateStoreFileBaseImpl.class.isAssignableFrom(driverClass)) {
|
||||
String workingDirectory = System.getProperty("user.dir");
|
||||
File dir = new File(workingDirectory + "/statestore");
|
||||
if (dir.exists()) {
|
||||
FileUtils.cleanDirectory(dir);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the default configuration for drivers based on files.
|
||||
*
|
||||
* @param conf Configuration to extend.
|
||||
*/
|
||||
public static void setFileConfiguration(Configuration conf) {
|
||||
String workingPath = System.getProperty("user.dir");
|
||||
String stateStorePath = workingPath + "/statestore";
|
||||
conf.set(FEDERATION_STORE_FILE_DIRECTORY, stateStorePath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all the records from the State Store.
|
||||
*
|
||||
* @param store State Store to remove records from.
|
||||
* @return If the State Store was cleared.
|
||||
* @throws IOException If it cannot clear the State Store.
|
||||
*/
|
||||
public static boolean clearAllRecords(StateStoreService store)
|
||||
throws IOException {
|
||||
Collection<Class<? extends BaseRecord>> allRecords =
|
||||
store.getSupportedRecords();
|
||||
for (Class<? extends BaseRecord> recordType : allRecords) {
|
||||
if (!clearRecords(store, recordType)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear records from a certain type from the State Store.
|
||||
*
|
||||
* @param store State Store to remove records from.
|
||||
* @param recordClass Class of the records to remove.
|
||||
* @return If the State Store was cleared.
|
||||
* @throws IOException If it cannot clear the State Store.
|
||||
*/
|
||||
public static <T extends BaseRecord> boolean clearRecords(
|
||||
StateStoreService store, Class<T> recordClass) throws IOException {
|
||||
List<T> emptyList = new ArrayList<>();
|
||||
if (!synchronizeRecords(store, emptyList, recordClass)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Synchronize a set of records. Remove all and keep the ones specified.
|
||||
*
|
||||
* @param stateStore State Store service managing the driver.
|
||||
* @param records Records to add.
|
||||
* @param clazz Class of the record to synchronize.
|
||||
* @return If the synchronization succeeded.
|
||||
* @throws IOException If it cannot connect to the State Store.
|
||||
*/
|
||||
public static <T extends BaseRecord> boolean synchronizeRecords(
|
||||
StateStoreService stateStore, List<T> records, Class<T> clazz)
|
||||
throws IOException {
|
||||
StateStoreDriver driver = stateStore.getDriver();
|
||||
driver.verifyDriverReady();
|
||||
if (driver.removeAll(clazz)) {
|
||||
if (driver.putAll(records, true, false)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,483 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store.driver;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
|
||||
import org.junit.AfterClass;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Base tests for the driver. The particular implementations will use this to
|
||||
* test their functionality.
|
||||
*/
|
||||
public class TestStateStoreDriverBase {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestStateStoreDriverBase.class);
|
||||
|
||||
private static StateStoreService stateStore;
|
||||
private static Configuration conf;
|
||||
|
||||
|
||||
/**
|
||||
* Get the State Store driver.
|
||||
* @return State Store driver.
|
||||
*/
|
||||
protected StateStoreDriver getStateStoreDriver() {
|
||||
return stateStore.getDriver();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownCluster() {
|
||||
if (stateStore != null) {
|
||||
stateStore.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a new State Store using this configuration.
|
||||
*
|
||||
* @param config Configuration for the State Store.
|
||||
* @throws Exception If we cannot get the State Store.
|
||||
*/
|
||||
public static void getStateStore(Configuration config) throws Exception {
|
||||
conf = config;
|
||||
stateStore = FederationStateStoreTestUtils.getStateStore(conf);
|
||||
}
|
||||
|
||||
private <T extends BaseRecord> T generateFakeRecord(Class<T> recordClass)
|
||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
|
||||
// TODO add record
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate if a record is the same.
|
||||
*
|
||||
* @param original
|
||||
* @param committed
|
||||
* @param assertEquals Assert if the records are equal or just return.
|
||||
* @return
|
||||
* @throws IllegalArgumentException
|
||||
* @throws IllegalAccessException
|
||||
*/
|
||||
private boolean validateRecord(
|
||||
BaseRecord original, BaseRecord committed, boolean assertEquals)
|
||||
throws IllegalArgumentException, IllegalAccessException {
|
||||
|
||||
boolean ret = true;
|
||||
|
||||
Map<String, Class<?>> fields = getFields(original);
|
||||
for (String key : fields.keySet()) {
|
||||
if (key.equals("dateModified") ||
|
||||
key.equals("dateCreated") ||
|
||||
key.equals("proto")) {
|
||||
// Fields are updated/set on commit and fetch and may not match
|
||||
// the fields that are initialized in a non-committed object.
|
||||
continue;
|
||||
}
|
||||
Object data1 = getField(original, key);
|
||||
Object data2 = getField(committed, key);
|
||||
if (assertEquals) {
|
||||
assertEquals("Field " + key + " does not match", data1, data2);
|
||||
} else if (!data1.equals(data2)) {
|
||||
ret = false;
|
||||
}
|
||||
}
|
||||
|
||||
long now = stateStore.getDriver().getTime();
|
||||
assertTrue(
|
||||
committed.getDateCreated() <= now && committed.getDateCreated() > 0);
|
||||
assertTrue(committed.getDateModified() >= committed.getDateCreated());
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
public static void removeAll(StateStoreDriver driver) throws IOException {
|
||||
// TODO add records to remove
|
||||
}
|
||||
|
||||
public <T extends BaseRecord> void testInsert(
|
||||
StateStoreDriver driver, Class<T> recordClass)
|
||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
|
||||
assertTrue(driver.removeAll(recordClass));
|
||||
QueryResult<T> records = driver.get(recordClass);
|
||||
assertTrue(records.getRecords().isEmpty());
|
||||
|
||||
// Insert single
|
||||
BaseRecord record = generateFakeRecord(recordClass);
|
||||
driver.put(record, true, false);
|
||||
|
||||
// Verify
|
||||
records = driver.get(recordClass);
|
||||
assertEquals(1, records.getRecords().size());
|
||||
validateRecord(record, records.getRecords().get(0), true);
|
||||
|
||||
// Insert multiple
|
||||
List<T> insertList = new ArrayList<>();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
T newRecord = generateFakeRecord(recordClass);
|
||||
insertList.add(newRecord);
|
||||
}
|
||||
driver.putAll(insertList, true, false);
|
||||
|
||||
// Verify
|
||||
records = driver.get(recordClass);
|
||||
assertEquals(11, records.getRecords().size());
|
||||
}
|
||||
|
||||
public <T extends BaseRecord> void testFetchErrors(StateStoreDriver driver,
|
||||
Class<T> clazz) throws IllegalAccessException, IOException {
|
||||
|
||||
// Fetch empty list
|
||||
driver.removeAll(clazz);
|
||||
QueryResult<T> result0 = driver.get(clazz);
|
||||
assertNotNull(result0);
|
||||
List<T> records0 = result0.getRecords();
|
||||
assertEquals(records0.size(), 0);
|
||||
|
||||
// Insert single
|
||||
BaseRecord record = generateFakeRecord(clazz);
|
||||
assertTrue(driver.put(record, true, false));
|
||||
|
||||
// Verify
|
||||
QueryResult<T> result1 = driver.get(clazz);
|
||||
List<T> records1 = result1.getRecords();
|
||||
assertEquals(1, records1.size());
|
||||
validateRecord(record, records1.get(0), true);
|
||||
|
||||
// Test fetch single object with a bad query
|
||||
final T fakeRecord = generateFakeRecord(clazz);
|
||||
final Query<T> query = new Query<T>(fakeRecord);
|
||||
T getRecord = driver.get(clazz, query);
|
||||
assertNull(getRecord);
|
||||
|
||||
// Test fetch multiple objects does not exist returns empty list
|
||||
assertEquals(driver.getMultiple(clazz, query).size(), 0);
|
||||
}
|
||||
|
||||
public <T extends BaseRecord> void testPut(
|
||||
StateStoreDriver driver, Class<T> clazz)
|
||||
throws IllegalArgumentException, ReflectiveOperationException,
|
||||
IOException, SecurityException {
|
||||
|
||||
driver.removeAll(clazz);
|
||||
QueryResult<T> records = driver.get(clazz);
|
||||
assertTrue(records.getRecords().isEmpty());
|
||||
|
||||
// Insert multiple
|
||||
List<T> insertList = new ArrayList<>();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
T newRecord = generateFakeRecord(clazz);
|
||||
insertList.add(newRecord);
|
||||
}
|
||||
|
||||
// Verify
|
||||
assertTrue(driver.putAll(insertList, false, true));
|
||||
records = driver.get(clazz);
|
||||
assertEquals(records.getRecords().size(), 10);
|
||||
|
||||
// Generate a new record with the same PK fields as an existing record
|
||||
BaseRecord updatedRecord = generateFakeRecord(clazz);
|
||||
BaseRecord existingRecord = records.getRecords().get(0);
|
||||
Map<String, String> primaryKeys = existingRecord.getPrimaryKeys();
|
||||
for (Entry<String, String> entry : primaryKeys.entrySet()) {
|
||||
String key = entry.getKey();
|
||||
String value = entry.getValue();
|
||||
Class<?> fieldType = getFieldType(existingRecord, key);
|
||||
Object field = fromString(value, fieldType);
|
||||
assertTrue(setField(updatedRecord, key, field));
|
||||
}
|
||||
|
||||
// Attempt an update of an existing entry, but it is not allowed.
|
||||
assertFalse(driver.put(updatedRecord, false, true));
|
||||
|
||||
// Verify no update occurred, all original records are unchanged
|
||||
QueryResult<T> newRecords = driver.get(clazz);
|
||||
assertTrue(newRecords.getRecords().size() == 10);
|
||||
assertEquals("A single entry was improperly updated in the store", 10,
|
||||
countMatchingEntries(records.getRecords(), newRecords.getRecords()));
|
||||
|
||||
// Update the entry (allowing updates)
|
||||
assertTrue(driver.put(updatedRecord, true, false));
|
||||
|
||||
// Verify that one entry no longer matches the original set
|
||||
newRecords = driver.get(clazz);
|
||||
assertEquals(10, newRecords.getRecords().size());
|
||||
assertEquals(
|
||||
"Record of type " + clazz + " not updated in the store", 9,
|
||||
countMatchingEntries(records.getRecords(), newRecords.getRecords()));
|
||||
}
|
||||
|
||||
private int countMatchingEntries(
|
||||
Collection<? extends BaseRecord> committedList,
|
||||
Collection<? extends BaseRecord> matchList) {
|
||||
|
||||
int matchingCount = 0;
|
||||
for (BaseRecord committed : committedList) {
|
||||
for (BaseRecord match : matchList) {
|
||||
try {
|
||||
if (match.getPrimaryKey().equals(committed.getPrimaryKey())) {
|
||||
if (validateRecord(match, committed, false)) {
|
||||
matchingCount++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
}
|
||||
}
|
||||
}
|
||||
return matchingCount;
|
||||
}
|
||||
|
||||
public <T extends BaseRecord> void testRemove(
|
||||
StateStoreDriver driver, Class<T> clazz)
|
||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
|
||||
// Remove all
|
||||
assertTrue(driver.removeAll(clazz));
|
||||
QueryResult<T> records = driver.get(clazz);
|
||||
assertTrue(records.getRecords().isEmpty());
|
||||
|
||||
// Insert multiple
|
||||
List<T> insertList = new ArrayList<>();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
T newRecord = generateFakeRecord(clazz);
|
||||
insertList.add(newRecord);
|
||||
}
|
||||
|
||||
// Verify
|
||||
assertTrue(driver.putAll(insertList, false, true));
|
||||
records = driver.get(clazz);
|
||||
assertEquals(records.getRecords().size(), 10);
|
||||
|
||||
// Remove Single
|
||||
assertTrue(driver.remove(records.getRecords().get(0)));
|
||||
|
||||
// Verify
|
||||
records = driver.get(clazz);
|
||||
assertEquals(records.getRecords().size(), 9);
|
||||
|
||||
// Remove with filter
|
||||
final T firstRecord = records.getRecords().get(0);
|
||||
final Query<T> query0 = new Query<T>(firstRecord);
|
||||
assertTrue(driver.remove(clazz, query0) > 0);
|
||||
|
||||
final T secondRecord = records.getRecords().get(1);
|
||||
final Query<T> query1 = new Query<T>(secondRecord);
|
||||
assertTrue(driver.remove(clazz, query1) > 0);
|
||||
|
||||
// Verify
|
||||
records = driver.get(clazz);
|
||||
assertEquals(records.getRecords().size(), 7);
|
||||
|
||||
// Remove all
|
||||
assertTrue(driver.removeAll(clazz));
|
||||
|
||||
// Verify
|
||||
records = driver.get(clazz);
|
||||
assertTrue(records.getRecords().isEmpty());
|
||||
}
|
||||
|
||||
public void testInsert(StateStoreDriver driver)
|
||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
// TODO add records
|
||||
}
|
||||
|
||||
public void testPut(StateStoreDriver driver)
|
||||
throws IllegalArgumentException, ReflectiveOperationException,
|
||||
IOException, SecurityException {
|
||||
// TODO add records
|
||||
}
|
||||
|
||||
public void testRemove(StateStoreDriver driver)
|
||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
// TODO add records
|
||||
}
|
||||
|
||||
public void testFetchErrors(StateStoreDriver driver)
|
||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
// TODO add records
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the value of a field on the object.
|
||||
*
|
||||
* @param fieldName The string name of the field.
|
||||
* @param data The data to pass to the field's setter.
|
||||
*
|
||||
* @return True if successful, fails if failed.
|
||||
*/
|
||||
private static boolean setField(
|
||||
BaseRecord record, String fieldName, Object data) {
|
||||
|
||||
Method m = locateSetter(record, fieldName);
|
||||
if (m != null) {
|
||||
try {
|
||||
m.invoke(record, data);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot set field " + fieldName + " on object "
|
||||
+ record.getClass().getName() + " to data " + data + " of type "
|
||||
+ data.getClass(), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds the appropriate setter for a field name.
|
||||
*
|
||||
* @param fieldName The legacy name of the field.
|
||||
* @return The matching setter or null if not found.
|
||||
*/
|
||||
private static Method locateSetter(BaseRecord record, String fieldName) {
|
||||
for (Method m : record.getClass().getMethods()) {
|
||||
if (m.getName().equalsIgnoreCase("set" + fieldName)) {
|
||||
return m;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all serializable fields in the object.
|
||||
*
|
||||
* @return Map with the fields.
|
||||
*/
|
||||
private static Map<String, Class<?>> getFields(BaseRecord record) {
|
||||
Map<String, Class<?>> getters = new HashMap<>();
|
||||
for (Method m : record.getClass().getDeclaredMethods()) {
|
||||
if (m.getName().startsWith("get")) {
|
||||
try {
|
||||
Class<?> type = m.getReturnType();
|
||||
char[] c = m.getName().substring(3).toCharArray();
|
||||
c[0] = Character.toLowerCase(c[0]);
|
||||
String key = new String(c);
|
||||
getters.put(key, type);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot execute getter " + m.getName()
|
||||
+ " on object " + record);
|
||||
}
|
||||
}
|
||||
}
|
||||
return getters;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the type of a field.
|
||||
*
|
||||
* @param fieldName
|
||||
* @return Field type
|
||||
*/
|
||||
private static Class<?> getFieldType(BaseRecord record, String fieldName) {
|
||||
Method m = locateGetter(record, fieldName);
|
||||
return m.getReturnType();
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches the value for a field name.
|
||||
*
|
||||
* @param fieldName the legacy name of the field.
|
||||
* @return The field data or null if not found.
|
||||
*/
|
||||
private static Object getField(BaseRecord record, String fieldName) {
|
||||
Object result = null;
|
||||
Method m = locateGetter(record, fieldName);
|
||||
if (m != null) {
|
||||
try {
|
||||
result = m.invoke(record);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot get field " + fieldName + " on object " + record);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds the appropriate getter for a field name.
|
||||
*
|
||||
* @param fieldName The legacy name of the field.
|
||||
* @return The matching getter or null if not found.
|
||||
*/
|
||||
private static Method locateGetter(BaseRecord record, String fieldName) {
|
||||
for (Method m : record.getClass().getMethods()) {
|
||||
if (m.getName().equalsIgnoreCase("get" + fieldName)) {
|
||||
return m;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Expands a data object from the store into an record object. Default store
|
||||
* data type is a String. Override if additional serialization is required.
|
||||
*
|
||||
* @param data Object containing the serialized data. Only string is
|
||||
* supported.
|
||||
* @param clazz Target object class to hold the deserialized data.
|
||||
* @return An instance of the target data object initialized with the
|
||||
* deserialized data.
|
||||
*/
|
||||
@Deprecated
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
private static <T> T fromString(String data, Class<T> clazz) {
|
||||
|
||||
if (data.equals("null")) {
|
||||
return null;
|
||||
} else if (clazz == String.class) {
|
||||
return (T) data;
|
||||
} else if (clazz == Long.class || clazz == long.class) {
|
||||
return (T) Long.valueOf(data);
|
||||
} else if (clazz == Integer.class || clazz == int.class) {
|
||||
return (T) Integer.valueOf(data);
|
||||
} else if (clazz == Double.class || clazz == double.class) {
|
||||
return (T) Double.valueOf(data);
|
||||
} else if (clazz == Float.class || clazz == float.class) {
|
||||
return (T) Float.valueOf(data);
|
||||
} else if (clazz == Boolean.class || clazz == boolean.class) {
|
||||
return (T) Boolean.valueOf(data);
|
||||
} else if (clazz.isEnum()) {
|
||||
return (T) Enum.valueOf((Class<Enum>) clazz, data);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,64 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store.driver;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test the FileSystem (e.g., HDFS) implementation of the State Store driver.
|
||||
*/
|
||||
public class TestStateStoreFile extends TestStateStoreDriverBase {
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
Configuration conf = getStateStoreConfiguration(StateStoreFileImpl.class);
|
||||
getStateStore(conf);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void startup() throws IOException {
|
||||
removeAll(getStateStoreDriver());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsert()
|
||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
testInsert(getStateStoreDriver());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdate()
|
||||
throws IllegalArgumentException, ReflectiveOperationException,
|
||||
IOException, SecurityException {
|
||||
testPut(getStateStoreDriver());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelete()
|
||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
testRemove(getStateStoreDriver());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.store.driver;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test the FileSystem (e.g., HDFS) implementation of the State Store driver.
|
||||
*/
|
||||
public class TestStateStoreFileSystem extends TestStateStoreDriverBase {
|
||||
|
||||
private static MiniDFSCluster dfsCluster;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
Configuration conf = FederationStateStoreTestUtils
|
||||
.getStateStoreConfiguration(StateStoreFileSystemImpl.class);
|
||||
conf.set(StateStoreFileSystemImpl.FEDERATION_STORE_FS_PATH,
|
||||
"/hdfs-federation/");
|
||||
|
||||
// Create HDFS cluster to back the state tore
|
||||
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
|
||||
builder.numDataNodes(1);
|
||||
dfsCluster = builder.build();
|
||||
dfsCluster.waitClusterUp();
|
||||
getStateStore(conf);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownCluster() {
|
||||
if (dfsCluster != null) {
|
||||
dfsCluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void startup() throws IOException {
|
||||
removeAll(getStateStoreDriver());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsert()
|
||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
testInsert(getStateStoreDriver());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdate()
|
||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
testInsert(getStateStoreDriver());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelete()
|
||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
testInsert(getStateStoreDriver());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchErrors()
|
||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||
testFetchErrors(getStateStoreDriver());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue