HDFS-10881. Federation State Store Driver API. Contributed by Jason Kace and Inigo Goiri.
(cherry picked from commit 0f88e04915
)
This commit is contained in:
parent
9e3491fafa
commit
ff134679e0
|
@ -0,0 +1,33 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.store;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thrown when the state store is not reachable or available. Cached APIs and
|
||||||
|
* queries may succeed. Client should retry again later.
|
||||||
|
*/
|
||||||
|
public class StateStoreUnavailableException extends IOException {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
public StateStoreUnavailableException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,72 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.store;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set of utility functions used to query, create, update and delete data
|
||||||
|
* records in the state store.
|
||||||
|
*/
|
||||||
|
public final class StateStoreUtils {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(StateStoreUtils.class);
|
||||||
|
|
||||||
|
private StateStoreUtils() {
|
||||||
|
// Utility class
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the base class for a record class. If we get an implementation of a
|
||||||
|
* record we will return the real parent record class.
|
||||||
|
*
|
||||||
|
* @param clazz Class of the data record to check.
|
||||||
|
* @return Base class for the record.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public static <T extends BaseRecord>
|
||||||
|
Class<? extends BaseRecord> getRecordClass(final Class<T> clazz) {
|
||||||
|
|
||||||
|
// We ignore the Impl classes and go to the super class
|
||||||
|
Class<? extends BaseRecord> actualClazz = clazz;
|
||||||
|
while (actualClazz.getSimpleName().endsWith("Impl")) {
|
||||||
|
actualClazz = (Class<? extends BaseRecord>) actualClazz.getSuperclass();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we went too far
|
||||||
|
if (actualClazz.equals(BaseRecord.class)) {
|
||||||
|
LOG.error("We went too far (" + actualClazz + ") with " + clazz);
|
||||||
|
actualClazz = clazz;
|
||||||
|
}
|
||||||
|
return actualClazz;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the base class for a record. If we get an implementation of a record we
|
||||||
|
* will return the real parent record class.
|
||||||
|
*
|
||||||
|
* @param record Record to check its main class.
|
||||||
|
* @return Base class for the record.
|
||||||
|
*/
|
||||||
|
public static <T extends BaseRecord>
|
||||||
|
Class<? extends BaseRecord> getRecordClass(final T record) {
|
||||||
|
return getRecordClass(record.getClass());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,172 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.store.driver;
|
||||||
|
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Driver class for an implementation of a {@link StateStoreService}
|
||||||
|
* provider. Driver implementations will extend this class and implement some of
|
||||||
|
* the default methods.
|
||||||
|
*/
|
||||||
|
public abstract class StateStoreDriver implements StateStoreRecordOperations {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(StateStoreDriver.class);
|
||||||
|
|
||||||
|
|
||||||
|
/** State Store configuration. */
|
||||||
|
private Configuration conf;
|
||||||
|
|
||||||
|
/** Identifier for the driver. */
|
||||||
|
private String identifier;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the state store connection.
|
||||||
|
* @param config Configuration for the driver.
|
||||||
|
* @param id Identifier for the driver.
|
||||||
|
* @param records Records that are supported.
|
||||||
|
* @return If initialized and ready, false if failed to initialize driver.
|
||||||
|
*/
|
||||||
|
public boolean init(final Configuration config, final String id,
|
||||||
|
final List<Class<? extends BaseRecord>> records) {
|
||||||
|
|
||||||
|
this.conf = config;
|
||||||
|
this.identifier = id;
|
||||||
|
|
||||||
|
if (this.identifier == null) {
|
||||||
|
LOG.warn("The identifier for the State Store connection is not set");
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO stub
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the State Store configuration.
|
||||||
|
*
|
||||||
|
* @return Configuration for the State Store.
|
||||||
|
*/
|
||||||
|
protected Configuration getConf() {
|
||||||
|
return this.conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets a unique identifier for the running task/process. Typically the
|
||||||
|
* router address.
|
||||||
|
*
|
||||||
|
* @return Unique identifier for the running task.
|
||||||
|
*/
|
||||||
|
public String getIdentifier() {
|
||||||
|
return this.identifier;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prepare the driver to access data storage.
|
||||||
|
*
|
||||||
|
* @return True if the driver was successfully initialized. If false is
|
||||||
|
* returned, the state store will periodically attempt to
|
||||||
|
* re-initialize the driver and the router will remain in safe mode
|
||||||
|
* until the driver is initialized.
|
||||||
|
*/
|
||||||
|
public abstract boolean initDriver();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize storage for a single record class.
|
||||||
|
*
|
||||||
|
* @param name String reference of the record class to initialize, used to
|
||||||
|
* construct paths and file names for the record. Determined by
|
||||||
|
* configuration settings for the specific driver.
|
||||||
|
* @param clazz Record type corresponding to the provided name.
|
||||||
|
* @return True if successful, false otherwise.
|
||||||
|
*/
|
||||||
|
public abstract <T extends BaseRecord> boolean initRecordStorage(
|
||||||
|
String className, Class<T> clazz);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the driver is currently running and the data store connection is
|
||||||
|
* valid.
|
||||||
|
*
|
||||||
|
* @return True if the driver is initialized and the data store is ready.
|
||||||
|
*/
|
||||||
|
public abstract boolean isDriverReady();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the driver is ready to be used and throw an exception otherwise.
|
||||||
|
*
|
||||||
|
* @throws StateStoreUnavailableException If the driver is not ready.
|
||||||
|
*/
|
||||||
|
public void verifyDriverReady() throws StateStoreUnavailableException {
|
||||||
|
if (!isDriverReady()) {
|
||||||
|
String driverName = getDriverName();
|
||||||
|
String hostname = getHostname();
|
||||||
|
throw new StateStoreUnavailableException("State Store driver " +
|
||||||
|
driverName + " in " + hostname + " is not ready.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close the State Store driver connection.
|
||||||
|
*/
|
||||||
|
public abstract void close() throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the current time synchronization from the underlying store.
|
||||||
|
* Override for stores that supply a current date. The data store driver is
|
||||||
|
* responsible for maintaining the official synchronization time/date for all
|
||||||
|
* distributed components.
|
||||||
|
*
|
||||||
|
* @return Current time stamp, used for all synchronization dates.
|
||||||
|
*/
|
||||||
|
public long getTime() {
|
||||||
|
return Time.now();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the name of the driver implementation for debugging.
|
||||||
|
*
|
||||||
|
* @return Name of the driver implementation.
|
||||||
|
*/
|
||||||
|
private String getDriverName() {
|
||||||
|
return this.getClass().getSimpleName();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the host name of the machine running the driver for debugging.
|
||||||
|
*
|
||||||
|
* @return Host name of the machine running the driver.
|
||||||
|
*/
|
||||||
|
private String getHostname() {
|
||||||
|
String hostname = "Unknown";
|
||||||
|
try {
|
||||||
|
hostname = InetAddress.getLocalHost().getHostName();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Cannot get local address", e);
|
||||||
|
}
|
||||||
|
return hostname;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,164 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.store.driver;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
|
||||||
|
import org.apache.hadoop.io.retry.AtMostOnce;
|
||||||
|
import org.apache.hadoop.io.retry.Idempotent;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Operations for a driver to manage records in the State Store.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public interface StateStoreRecordOperations {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all records of the requested record class from the data store. To use
|
||||||
|
* the default implementations in this class, getAll must return new instances
|
||||||
|
* of the records on each call. It is recommended to override the default
|
||||||
|
* implementations for better performance.
|
||||||
|
*
|
||||||
|
* @param clazz Class of record to fetch.
|
||||||
|
* @return List of all records that match the clazz.
|
||||||
|
* @throws IOException Throws exception if unable to query the data store.
|
||||||
|
*/
|
||||||
|
@Idempotent
|
||||||
|
<T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all records of the requested record class from the data store. To use
|
||||||
|
* the default implementations in this class, getAll must return new instances
|
||||||
|
* of the records on each call. It is recommended to override the default
|
||||||
|
* implementations for better performance.
|
||||||
|
*
|
||||||
|
* @param clazz Class of record to fetch.
|
||||||
|
* @param sub Sub path.
|
||||||
|
* @return List of all records that match the clazz and the sub path.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Idempotent
|
||||||
|
<T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a single record from the store that matches the query.
|
||||||
|
*
|
||||||
|
* @param clazz Class of record to fetch.
|
||||||
|
* @param query Map of field names and objects to filter results.
|
||||||
|
* @return A single record matching the query. Null if there are no matching
|
||||||
|
* records or more than one matching record in the store.
|
||||||
|
* @throws IOException If multiple records match or if the data store cannot
|
||||||
|
* be queried.
|
||||||
|
*/
|
||||||
|
@Idempotent
|
||||||
|
<T extends BaseRecord> T get(Class<T> clazz, Map<String, String> query)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get multiple records from the store that match a query. This method
|
||||||
|
* assumes the underlying driver does not support filtering. If the driver
|
||||||
|
* supports filtering it should overwrite this method.
|
||||||
|
*
|
||||||
|
* @param clazz Class of record to fetch.
|
||||||
|
* @param query Map of field names and objects to filter results.
|
||||||
|
* @return Records of type clazz that match the query or empty list if none
|
||||||
|
* are found.
|
||||||
|
* @throws IOException Throws exception if unable to query the data store.
|
||||||
|
*/
|
||||||
|
@Idempotent
|
||||||
|
<T extends BaseRecord> List<T> getMultiple(
|
||||||
|
Class<T> clazz, Map<String, String> query) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a single record. Optionally updates an existing record with same
|
||||||
|
* primary key.
|
||||||
|
*
|
||||||
|
* @param record The record to insert or update.
|
||||||
|
* @param allowUpdate True if update of exiting record is allowed.
|
||||||
|
* @param errorIfExists True if an error should be returned when inserting
|
||||||
|
* an existing record. Only used if allowUpdate = false.
|
||||||
|
* @return True if the operation was successful.
|
||||||
|
*
|
||||||
|
* @throws IOException Throws exception if unable to query the data store.
|
||||||
|
*/
|
||||||
|
@AtMostOnce
|
||||||
|
<T extends BaseRecord> boolean put(
|
||||||
|
T record, boolean allowUpdate, boolean errorIfExists) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates multiple records. Optionally updates existing records that have
|
||||||
|
* the same primary key.
|
||||||
|
*
|
||||||
|
* @param records List of data records to update or create. All records must
|
||||||
|
* be of class clazz.
|
||||||
|
* @param clazz Record class of records.
|
||||||
|
* @param allowUpdate True if update of exiting record is allowed.
|
||||||
|
* @param errorIfExists True if an error should be returned when inserting
|
||||||
|
* an existing record. Only used if allowUpdate = false.
|
||||||
|
* @return true if all operations were successful.
|
||||||
|
*
|
||||||
|
* @throws IOException Throws exception if unable to query the data store.
|
||||||
|
*/
|
||||||
|
@AtMostOnce
|
||||||
|
<T extends BaseRecord> boolean putAll(
|
||||||
|
List<T> records, boolean allowUpdate, boolean errorIfExists)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a single record.
|
||||||
|
*
|
||||||
|
* @param record Record to be removed.
|
||||||
|
* @return true If the record was successfully removed. False if the record
|
||||||
|
* could not be removed or not stored.
|
||||||
|
* @throws IOException Throws exception if unable to query the data store.
|
||||||
|
*/
|
||||||
|
@AtMostOnce
|
||||||
|
<T extends BaseRecord> boolean remove(T record) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove all records of this class from the store.
|
||||||
|
*
|
||||||
|
* @param clazz Class of records to remove.
|
||||||
|
* @return True if successful.
|
||||||
|
* @throws IOException Throws exception if unable to query the data store.
|
||||||
|
*/
|
||||||
|
@AtMostOnce
|
||||||
|
<T extends BaseRecord> boolean removeAll(Class<T> clazz) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove multiple records of a specific class that match a query. Requires
|
||||||
|
* the getAll implementation to fetch fresh records on each call.
|
||||||
|
*
|
||||||
|
* @param clazz Class of record to remove.
|
||||||
|
* @param filter matching filter to remove.
|
||||||
|
* @return The number of records removed.
|
||||||
|
* @throws IOException Throws exception if unable to query the data store.
|
||||||
|
*/
|
||||||
|
@AtMostOnce
|
||||||
|
<T extends BaseRecord> int remove(Class<T> clazz, Map<String, String> filter)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,69 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base implementation of a State Store driver. It contains default
|
||||||
|
* implementations for the optional functions. These implementations use an
|
||||||
|
* uncached read/write all algorithm for all changes. In most cases it is
|
||||||
|
* recommended to override the optional functions.
|
||||||
|
* <p>
|
||||||
|
* Drivers may optionally override additional routines for performance
|
||||||
|
* optimization, such as custom get/put/remove queries, depending on the
|
||||||
|
* capabilities of the data store.
|
||||||
|
* <p>
|
||||||
|
*/
|
||||||
|
public abstract class StateStoreBaseImpl extends StateStoreDriver {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T extends BaseRecord> T get(
|
||||||
|
Class<T> clazz, Map<String, String> query) throws IOException {
|
||||||
|
List<T> records = getMultiple(clazz, query);
|
||||||
|
if (records.size() > 1) {
|
||||||
|
throw new IOException("Found more than one object in collection");
|
||||||
|
} else if (records.size() == 1) {
|
||||||
|
return records.get(0);
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T extends BaseRecord> boolean put(
|
||||||
|
T record, boolean allowUpdate, boolean errorIfExists) throws IOException {
|
||||||
|
List<T> singletonList = new ArrayList<T>();
|
||||||
|
singletonList.add(record);
|
||||||
|
return putAll(singletonList, allowUpdate, errorIfExists);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T extends BaseRecord> boolean remove(T record) throws IOException {
|
||||||
|
Map<String, String> primaryKeys = record.getPrimaryKeys();
|
||||||
|
Class<? extends BaseRecord> clazz = StateStoreUtils.getRecordClass(record);
|
||||||
|
return remove(clazz, primaryKeys) == 1;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,39 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementations of state store data providers/drivers. Each driver is
|
||||||
|
* responsible for maintaining, querying, updating and deleting persistent data
|
||||||
|
* records. Data records are defined as subclasses of
|
||||||
|
* {@link org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord}.
|
||||||
|
* Each driver implements the
|
||||||
|
* {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
|
||||||
|
* StateStoreDriver} interface.
|
||||||
|
* <p>
|
||||||
|
* Currently supported drivers:
|
||||||
|
* <ul>
|
||||||
|
* <li>{@link StateStoreFileImpl} A file-based data storage backend.
|
||||||
|
* <li>{@link StateStoreZooKeeperImpl} A zookeeper based data storage backend.
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -0,0 +1,37 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The state store uses modular data storage classes derived from
|
||||||
|
* StateStoreDriver to handle querying, updating and deleting data records. The
|
||||||
|
* data storage driver is initialized and maintained by the StateStoreService.
|
||||||
|
* The state store supports fetching all records of a type, filtering by column
|
||||||
|
* values or fetching a single record by its primary key.
|
||||||
|
* <p>
|
||||||
|
* Each data storage backend is required to implement the methods contained in
|
||||||
|
* the StateStoreDriver interface. These methods allow the querying, updating,
|
||||||
|
* inserting and deleting of data records into the state store.
|
||||||
|
*/
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.store.driver;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -0,0 +1,31 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Contains the abstract definitions of the API request and response objects for
|
||||||
|
* the various state store APIs. The state store supports multiple interface
|
||||||
|
* APIs and multiple data records. Each protocol object requires a serialization
|
||||||
|
* implementation, the default is protobuf.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.store.protocol;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -0,0 +1,189 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.store.records;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Abstract base of a data record in the StateStore. All StateStore records are
|
||||||
|
* derived from this class. Data records are persisted in the data store and
|
||||||
|
* are identified by their primary key. Each data record contains:
|
||||||
|
* <ul>
|
||||||
|
* <li>A primary key consisting of a combination of record data fields.
|
||||||
|
* <li>A modification date.
|
||||||
|
* <li>A creation date.
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
public abstract class BaseRecord implements Comparable<BaseRecord> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the modification time for the record.
|
||||||
|
*
|
||||||
|
* @param time Modification time of the record.
|
||||||
|
*/
|
||||||
|
public abstract void setDateModified(long time);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the modification time for the record.
|
||||||
|
*
|
||||||
|
* @return Modification time of the record.
|
||||||
|
*/
|
||||||
|
public abstract long getDateModified();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the creation time for the record.
|
||||||
|
*
|
||||||
|
* @param time Creation time of the record.
|
||||||
|
*/
|
||||||
|
public abstract void setDateCreated(long time);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the creation time for the record.
|
||||||
|
*
|
||||||
|
* @return Creation time of the record
|
||||||
|
*/
|
||||||
|
public abstract long getDateCreated();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the expiration time for the record.
|
||||||
|
*
|
||||||
|
* @return Expiration time for the record.
|
||||||
|
*/
|
||||||
|
public abstract long getExpirationMs();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Map of primary key names->values for the record. The primary key can be a
|
||||||
|
* combination of 1-n different State Store serialized values.
|
||||||
|
*
|
||||||
|
* @return Map of key/value pairs that constitute this object's primary key.
|
||||||
|
*/
|
||||||
|
public abstract Map<String, String> getPrimaryKeys();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the object.
|
||||||
|
*/
|
||||||
|
public void init() {
|
||||||
|
// Call this after the object has been constructed
|
||||||
|
initDefaultTimes();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize default times. The driver may update these timestamps on insert
|
||||||
|
* and/or update. This should only be called when initializing an object that
|
||||||
|
* is not backed by a data store.
|
||||||
|
*/
|
||||||
|
private void initDefaultTimes() {
|
||||||
|
long now = Time.now();
|
||||||
|
this.setDateCreated(now);
|
||||||
|
this.setDateModified(now);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Join the primary keys into one single primary key.
|
||||||
|
*
|
||||||
|
* @return A string that is guaranteed to be unique amongst all records of
|
||||||
|
* this type.
|
||||||
|
*/
|
||||||
|
public String getPrimaryKey() {
|
||||||
|
return generateMashupKey(getPrimaryKeys());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generates a cache key from a map of values.
|
||||||
|
*
|
||||||
|
* @param keys Map of values.
|
||||||
|
* @return String mashup of key values.
|
||||||
|
*/
|
||||||
|
protected static String generateMashupKey(final Map<String, String> keys) {
|
||||||
|
StringBuilder builder = new StringBuilder();
|
||||||
|
for (Object value : keys.values()) {
|
||||||
|
if (builder.length() > 0) {
|
||||||
|
builder.append("-");
|
||||||
|
}
|
||||||
|
builder.append(value);
|
||||||
|
}
|
||||||
|
return builder.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Override equals check to use primary key(s) for comparison.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (!(obj instanceof BaseRecord)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
BaseRecord baseObject = (BaseRecord) obj;
|
||||||
|
Map<String, String> keyset1 = this.getPrimaryKeys();
|
||||||
|
Map<String, String> keyset2 = baseObject.getPrimaryKeys();
|
||||||
|
return keyset1.equals(keyset2);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Override hash code to use primary key(s) for comparison.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
Map<String, String> keyset = this.getPrimaryKeys();
|
||||||
|
return keyset.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(BaseRecord record) {
|
||||||
|
if (record == null) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
// Descending date order
|
||||||
|
return (int) (record.getDateModified() - this.getDateModified());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when the modification time and current time is available, checks for
|
||||||
|
* expirations.
|
||||||
|
*
|
||||||
|
* @param currentTime The current timestamp in ms from the data store, to be
|
||||||
|
* compared against the modification and creation dates of the
|
||||||
|
* object.
|
||||||
|
* @return boolean True if the record has been updated and should be
|
||||||
|
* committed to the data store. Override for customized behavior.
|
||||||
|
*/
|
||||||
|
public boolean checkExpired(long currentTime) {
|
||||||
|
long expiration = getExpirationMs();
|
||||||
|
if (getDateModified() > 0 && expiration > 0) {
|
||||||
|
return (getDateModified() + expiration) < currentTime;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validates the record. Called when the record is created, populated from the
|
||||||
|
* state store, and before committing to the state store.
|
||||||
|
* @return If the record is valid.
|
||||||
|
*/
|
||||||
|
public boolean validate() {
|
||||||
|
return getDateCreated() > 0 && getDateModified() > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return getPrimaryKey();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,56 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.store.records;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Encapsulates a state store query result that includes a set of records and a
|
||||||
|
* time stamp for the result.
|
||||||
|
*/
|
||||||
|
public class QueryResult<T extends BaseRecord> {
|
||||||
|
|
||||||
|
/** Data result. */
|
||||||
|
private final List<T> records;
|
||||||
|
/** Time stamp of the data results. */
|
||||||
|
private final long timestamp;
|
||||||
|
|
||||||
|
public QueryResult(final List<T> recs, final long time) {
|
||||||
|
this.records = recs;
|
||||||
|
this.timestamp = time;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the result of the query.
|
||||||
|
*
|
||||||
|
* @return List of records.
|
||||||
|
*/
|
||||||
|
public List<T> getRecords() {
|
||||||
|
return Collections.unmodifiableList(this.records);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The timetamp in driver time of this query.
|
||||||
|
*
|
||||||
|
* @return Timestamp in driver time.
|
||||||
|
*/
|
||||||
|
public long getTimestamp() {
|
||||||
|
return this.timestamp;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,36 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Contains the abstract definitions of the state store data records. The state
|
||||||
|
* store supports multiple multiple data records.
|
||||||
|
* <p>
|
||||||
|
* Data records inherit from a common class
|
||||||
|
* {@link org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord
|
||||||
|
* BaseRecord}. Data records are serialized when written to the data store using
|
||||||
|
* a modular serialization implementation. The default is profobuf
|
||||||
|
* serialization. Data is stored as rows of records of the same type with each
|
||||||
|
* data member in a record representing a column.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.store.records;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
Loading…
Reference in New Issue