From c3b8b5e49ffccb01f173e19c9daceedd06875714 Mon Sep 17 00:00:00 2001 From: Inigo Date: Wed, 29 Mar 2017 19:35:06 -0700 Subject: [PATCH] HDFS-10881. Federation State Store Driver API. Contributed by Jason Kace and Inigo Goiri. (cherry picked from commit 0f88e049156dce173afc0dbda864e29190dd2210) (cherry picked from commit 533b986633e0a9076cf3918fba3e3b591c6f65f2) --- .../store/StateStoreUnavailableException.java | 33 +++ .../federation/store/StateStoreUtils.java | 72 +++++++ .../store/driver/StateStoreDriver.java | 172 ++++++++++++++++ .../driver/StateStoreRecordOperations.java | 164 +++++++++++++++ .../store/driver/impl/StateStoreBaseImpl.java | 69 +++++++ .../store/driver/impl/package-info.java | 39 ++++ .../federation/store/driver/package-info.java | 37 ++++ .../store/protocol/package-info.java | 31 +++ .../federation/store/records/BaseRecord.java | 189 ++++++++++++++++++ .../federation/store/records/QueryResult.java | 56 ++++++ .../store/records/package-info.java | 36 ++++ 11 files changed, 898 insertions(+) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUnavailableException.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/package-info.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/QueryResult.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/package-info.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUnavailableException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUnavailableException.java new file mode 100644 index 00000000000..4e6f8c89446 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUnavailableException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import java.io.IOException; + +/** + * Thrown when the state store is not reachable or available. Cached APIs and + * queries may succeed. Client should retry again later. + */ +public class StateStoreUnavailableException extends IOException { + + private static final long serialVersionUID = 1L; + + public StateStoreUnavailableException(String msg) { + super(msg); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java new file mode 100644 index 00000000000..8c681df8071 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; + +/** + * Set of utility functions used to query, create, update and delete data + * records in the state store. + */ +public final class StateStoreUtils { + + private static final Log LOG = LogFactory.getLog(StateStoreUtils.class); + + private StateStoreUtils() { + // Utility class + } + + /** + * Get the base class for a record class. If we get an implementation of a + * record we will return the real parent record class. + * + * @param clazz Class of the data record to check. + * @return Base class for the record. + */ + @SuppressWarnings("unchecked") + public static + Class getRecordClass(final Class clazz) { + + // We ignore the Impl classes and go to the super class + Class actualClazz = clazz; + while (actualClazz.getSimpleName().endsWith("Impl")) { + actualClazz = (Class) actualClazz.getSuperclass(); + } + + // Check if we went too far + if (actualClazz.equals(BaseRecord.class)) { + LOG.error("We went too far (" + actualClazz + ") with " + clazz); + actualClazz = clazz; + } + return actualClazz; + } + + /** + * Get the base class for a record. If we get an implementation of a record we + * will return the real parent record class. 
+ * + * @param record Record to check its main class. + * @return Base class for the record. + */ + public static + Class getRecordClass(final T record) { + return getRecordClass(record.getClass()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java new file mode 100644 index 00000000000..a1527df418f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import java.net.InetAddress; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.util.Time; + +/** + * Driver class for an implementation of a {@link StateStoreService} + * provider. Driver implementations will extend this class and implement some of + * the default methods. + */ +public abstract class StateStoreDriver implements StateStoreRecordOperations { + + private static final Log LOG = LogFactory.getLog(StateStoreDriver.class); + + + /** State Store configuration. */ + private Configuration conf; + + /** Identifier for the driver. */ + private String identifier; + + + /** + * Initialize the state store connection. + * @param config Configuration for the driver. + * @param id Identifier for the driver. + * @param records Records that are supported. + * @return If initialized and ready, false if failed to initialize driver. + */ + public boolean init(final Configuration config, final String id, + final List> records) { + + this.conf = config; + this.identifier = id; + + if (this.identifier == null) { + LOG.warn("The identifier for the State Store connection is not set"); + } + + // TODO stub + return false; + } + + /** + * Get the State Store configuration. + * + * @return Configuration for the State Store. + */ + protected Configuration getConf() { + return this.conf; + } + + /** + * Gets a unique identifier for the running task/process. Typically the + * router address. + * + * @return Unique identifier for the running task. + */ + public String getIdentifier() { + return this.identifier; + } + + /** + * Prepare the driver to access data storage. 
+ * + * @return True if the driver was successfully initialized. If false is + * returned, the state store will periodically attempt to + * re-initialize the driver and the router will remain in safe mode + * until the driver is initialized. + */ + public abstract boolean initDriver(); + + /** + * Initialize storage for a single record class. + * + * @param name String reference of the record class to initialize, used to + * construct paths and file names for the record. Determined by + * configuration settings for the specific driver. + * @param clazz Record type corresponding to the provided name. + * @return True if successful, false otherwise. + */ + public abstract boolean initRecordStorage( + String className, Class clazz); + + /** + * Check if the driver is currently running and the data store connection is + * valid. + * + * @return True if the driver is initialized and the data store is ready. + */ + public abstract boolean isDriverReady(); + + /** + * Check if the driver is ready to be used and throw an exception otherwise. + * + * @throws StateStoreUnavailableException If the driver is not ready. + */ + public void verifyDriverReady() throws StateStoreUnavailableException { + if (!isDriverReady()) { + String driverName = getDriverName(); + String hostname = getHostname(); + throw new StateStoreUnavailableException("State Store driver " + + driverName + " in " + hostname + " is not ready."); + } + } + + /** + * Close the State Store driver connection. + */ + public abstract void close() throws Exception; + + /** + * Returns the current time synchronization from the underlying store. + * Override for stores that supply a current date. The data store driver is + * responsible for maintaining the official synchronization time/date for all + * distributed components. + * + * @return Current time stamp, used for all synchronization dates. + */ + public long getTime() { + return Time.now(); + } + + /** + * Get the name of the driver implementation for debugging. + * + * @return Name of the driver implementation. + */ + private String getDriverName() { + return this.getClass().getSimpleName(); + } + + /** + * Get the host name of the machine running the driver for debugging. + * + * @return Host name of the machine running the driver. + */ + private String getHostname() { + String hostname = "Unknown"; + try { + hostname = InetAddress.getLocalHost().getHostName(); + } catch (Exception e) { + LOG.error("Cannot get local address", e); + } + return hostname; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java new file mode 100644 index 00000000000..739eeba2804 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.apache.hadoop.io.retry.AtMostOnce; +import org.apache.hadoop.io.retry.Idempotent; + +/** + * Operations for a driver to manage records in the State Store. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface StateStoreRecordOperations { + + /** + * Get all records of the requested record class from the data store. To use + * the default implementations in this class, getAll must return new instances + * of the records on each call. It is recommended to override the default + * implementations for better performance. + * + * @param clazz Class of record to fetch. + * @return List of all records that match the clazz. + * @throws IOException Throws exception if unable to query the data store. + */ + @Idempotent + QueryResult get(Class clazz) throws IOException; + + /** + * Get all records of the requested record class from the data store. To use + * the default implementations in this class, getAll must return new instances + * of the records on each call. It is recommended to override the default + * implementations for better performance. + * + * @param clazz Class of record to fetch. + * @param sub Sub path. + * @return List of all records that match the clazz and the sub path. + * @throws IOException + */ + @Idempotent + QueryResult get(Class clazz, String sub) + throws IOException; + + /** + * Get a single record from the store that matches the query. + * + * @param clazz Class of record to fetch. + * @param query Map of field names and objects to filter results. + * @return A single record matching the query. Null if there are no matching + * records or more than one matching record in the store. + * @throws IOException If multiple records match or if the data store cannot + * be queried. + */ + @Idempotent + T get(Class clazz, Map query) + throws IOException; + + /** + * Get multiple records from the store that match a query. This method + * assumes the underlying driver does not support filtering. If the driver + * supports filtering it should overwrite this method. + * + * @param clazz Class of record to fetch. + * @param query Map of field names and objects to filter results. + * @return Records of type clazz that match the query or empty list if none + * are found. + * @throws IOException Throws exception if unable to query the data store. + */ + @Idempotent + List getMultiple( + Class clazz, Map query) throws IOException; + + /** + * Creates a single record. Optionally updates an existing record with same + * primary key. + * + * @param record The record to insert or update. + * @param allowUpdate True if update of exiting record is allowed. 
+ * @param errorIfExists True if an error should be returned when inserting + * an existing record. Only used if allowUpdate = false. + * @return True if the operation was successful. + * + * @throws IOException Throws exception if unable to query the data store. + */ + @AtMostOnce + boolean put( + T record, boolean allowUpdate, boolean errorIfExists) throws IOException; + + /** + * Creates multiple records. Optionally updates existing records that have + * the same primary key. + * + * @param records List of data records to update or create. All records must + * be of class clazz. + * @param clazz Record class of records. + * @param allowUpdate True if update of exiting record is allowed. + * @param errorIfExists True if an error should be returned when inserting + * an existing record. Only used if allowUpdate = false. + * @return true if all operations were successful. + * + * @throws IOException Throws exception if unable to query the data store. + */ + @AtMostOnce + boolean putAll( + List records, boolean allowUpdate, boolean errorIfExists) + throws IOException; + + /** + * Remove a single record. + * + * @param record Record to be removed. + * @return true If the record was successfully removed. False if the record + * could not be removed or not stored. + * @throws IOException Throws exception if unable to query the data store. + */ + @AtMostOnce + boolean remove(T record) throws IOException; + + /** + * Remove all records of this class from the store. + * + * @param clazz Class of records to remove. + * @return True if successful. + * @throws IOException Throws exception if unable to query the data store. + */ + @AtMostOnce + boolean removeAll(Class clazz) throws IOException; + + /** + * Remove multiple records of a specific class that match a query. Requires + * the getAll implementation to fetch fresh records on each call. + * + * @param clazz Class of record to remove. + * @param filter matching filter to remove. + * @return The number of records removed. + * @throws IOException Throws exception if unable to query the data store. + */ + @AtMostOnce + int remove(Class clazz, Map filter) + throws IOException; + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java new file mode 100644 index 00000000000..b711fa9bb09 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+
+/**
+ * Base implementation of a State Store driver. It contains default
+ * implementations for the optional functions. These implementations use an
+ * uncached read/write all algorithm for all changes. In most cases it is
+ * recommended to override the optional functions.
+ * <p>
+ * Drivers may optionally override additional routines for performance
+ * optimization, such as custom get/put/remove queries, depending on the
+ * capabilities of the data store.
+ */ +public abstract class StateStoreBaseImpl extends StateStoreDriver { + + @Override + public T get( + Class clazz, Map query) throws IOException { + List records = getMultiple(clazz, query); + if (records.size() > 1) { + throw new IOException("Found more than one object in collection"); + } else if (records.size() == 1) { + return records.get(0); + } else { + return null; + } + } + + @Override + public boolean put( + T record, boolean allowUpdate, boolean errorIfExists) throws IOException { + List singletonList = new ArrayList(); + singletonList.add(record); + return putAll(singletonList, allowUpdate, errorIfExists); + } + + @Override + public boolean remove(T record) throws IOException { + Map primaryKeys = record.getPrimaryKeys(); + Class clazz = StateStoreUtils.getRecordClass(record); + return remove(clazz, primaryKeys) == 1; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java new file mode 100644 index 00000000000..a18433e0768 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementations of state store data providers/drivers. Each driver is + * responsible for maintaining, querying, updating and deleting persistent data + * records. Data records are defined as subclasses of + * {@link org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord}. + * Each driver implements the + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver + * StateStoreDriver} interface. + *
+ * <p>
+ * Currently supported drivers:
+ * <ul>
+ * <li>{@link StateStoreFileImpl} A file-based data storage backend.
+ * <li>{@link StateStoreZooKeeperImpl} A ZooKeeper-based data storage backend.
+ * </ul>
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java new file mode 100644 index 00000000000..da998b5740c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The state store uses modular data storage classes derived from + * StateStoreDriver to handle querying, updating and deleting data records. The + * data storage driver is initialized and maintained by the StateStoreService. + * The state store supports fetching all records of a type, filtering by column + * values or fetching a single record by its primary key. + *
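+ * <p>
+ * As a purely illustrative sketch (the record type {@code MyRecord} is
+ * hypothetical, {@code driver} is any initialized driver instance, and the
+ * generic {@code <T extends BaseRecord>} signatures of
+ * {@link StateStoreRecordOperations} are assumed), a typical interaction
+ * could look like:
+ * <pre>
+ *   MyRecord record = ...; // a BaseRecord subclass
+ *
+ *   // Insert or update a record (allowUpdate=true, errorIfExists=false)
+ *   driver.put(record, true, false);
+ *
+ *   // Fetch all records of a type
+ *   QueryResult&lt;MyRecord&gt; all = driver.get(MyRecord.class);
+ *
+ *   // Filter by a field value
+ *   Map&lt;String, String&gt; query = new HashMap&lt;String, String&gt;();
+ *   query.put("fieldName", "value");
+ *   List&lt;MyRecord&gt; matches = driver.getMultiple(MyRecord.class, query);
+ *
+ *   // Remove the matching records
+ *   int removed = driver.remove(MyRecord.class, query);
+ * </pre>
+ * <p>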
+ * Each data storage backend is required to implement the methods contained in + * the StateStoreDriver interface. These methods allow the querying, updating, + * inserting and deleting of data records into the state store. + */ + +@InterfaceAudience.Private +@InterfaceStability.Evolving + +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/package-info.java new file mode 100644 index 00000000000..0249d2cbea9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/package-info.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains the abstract definitions of the API request and response objects for + * the various state store APIs. The state store supports multiple interface + * APIs and multiple data records. Each protocol object requires a serialization + * implementation, the default is protobuf. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving + +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java new file mode 100644 index 00000000000..4192a3da353 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.records;
+
+import java.util.Map;
+
+import org.apache.hadoop.util.Time;
+
+/**
+ * Abstract base of a data record in the StateStore. All StateStore records are
+ * derived from this class. Data records are persisted in the data store and
+ * are identified by their primary key. Each data record contains:
+ * <ul>
+ * <li>A primary key consisting of a combination of record data fields.
+ * <li>A modification date.
+ * <li>A creation date.
+ * </ul>
+ */ + public boolean checkExpired(long currentTime) { + long expiration = getExpirationMs(); + if (getDateModified() > 0 && expiration > 0) { + return (getDateModified() + expiration) < currentTime; + } + return false; + } + + /** + * Validates the record. Called when the record is created, populated from the + * state store, and before committing to the state store. + * @return If the record is valid. + */ + public boolean validate() { + return getDateCreated() > 0 && getDateModified() > 0; + } + + @Override + public String toString() { + return getPrimaryKey(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/QueryResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/QueryResult.java new file mode 100644 index 00000000000..64c2c71fe97 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/QueryResult.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.records; + +import java.util.Collections; +import java.util.List; + +/** + * Encapsulates a state store query result that includes a set of records and a + * time stamp for the result. + */ +public class QueryResult { + + /** Data result. */ + private final List records; + /** Time stamp of the data results. */ + private final long timestamp; + + public QueryResult(final List recs, final long time) { + this.records = recs; + this.timestamp = time; + } + + /** + * Get the result of the query. + * + * @return List of records. + */ + public List getRecords() { + return Collections.unmodifiableList(this.records); + } + + /** + * The timetamp in driver time of this query. + * + * @return Timestamp in driver time. + */ + public long getTimestamp() { + return this.timestamp; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/package-info.java new file mode 100644 index 00000000000..63b13af0911 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/package-info.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains the abstract definitions of the state store data records. The state
+ * store supports multiple data records.
+ * <p>
+ * Data records inherit from a common class
+ * {@link org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord
+ * BaseRecord}. Data records are serialized when written to the data store using
+ * a modular serialization implementation. The default is protobuf
+ * serialization. Data is stored as rows of records of the same type with each
+ * data member in a record representing a column.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+package org.apache.hadoop.hdfs.server.federation.store.records;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
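
For illustration only, and not part of the patch itself: the sketch below shows how a record type built on BaseRecord and the StateStoreRecordOperations API added above are intended to fit together. The record class MountTableEntry, its fields, and the example() method are hypothetical, and the generic <T extends BaseRecord> signatures of the interface methods are assumed; only BaseRecord and StateStoreRecordOperations come from the files in this patch.

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreRecordOperations;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;

/** Hypothetical record: one mount table entry keyed by its source path. */
public class MountTableEntry extends BaseRecord {

  private String sourcePath;
  private String targetNamenode;
  private long dateModified;
  private long dateCreated;

  public MountTableEntry(String sourcePath, String targetNamenode) {
    this.sourcePath = sourcePath;
    this.targetNamenode = targetNamenode;
    init(); // initializes the default creation/modification times
  }

  @Override
  public Map<String, String> getPrimaryKeys() {
    Map<String, String> keys = new HashMap<String, String>();
    keys.put("sourcePath", this.sourcePath);
    return keys;
  }

  @Override
  public void setDateModified(long time) {
    this.dateModified = time;
  }

  @Override
  public long getDateModified() {
    return this.dateModified;
  }

  @Override
  public void setDateCreated(long time) {
    this.dateCreated = time;
  }

  @Override
  public long getDateCreated() {
    return this.dateCreated;
  }

  @Override
  public long getExpirationMs() {
    return -1; // a negative expiration means this record type never expires
  }

  public String getTargetNamenode() {
    return this.targetNamenode;
  }

  /** Expected usage of the record operations API against any driver. */
  public static void example(StateStoreRecordOperations store)
      throws IOException {
    // Insert or update the record (allowUpdate=true, errorIfExists=false).
    MountTableEntry entry = new MountTableEntry("/data", "nn-0");
    store.put(entry, true, false);

    // Query by a primary key field.
    Map<String, String> query = new HashMap<String, String>();
    query.put("sourcePath", "/data");
    List<MountTableEntry> matches =
        store.getMultiple(MountTableEntry.class, query);

    // Remove whatever matched.
    for (MountTableEntry match : matches) {
      store.remove(match);
    }
  }
}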