From 2c740a684a23663962119726bf0e7ecef173f6f1 Mon Sep 17 00:00:00 2001 From: Inigo Date: Thu, 6 Apr 2017 19:18:52 -0700 Subject: [PATCH] HDFS-10882. Federation State Store Interface API. Contributed by Jason Kace and Inigo Goiri. (cherry picked from commit 6d94c90ece1c1d23d4c97e72c54e9991f5dbc481) --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 11 ++ .../server/federation/store/RecordStore.java | 100 +++++++++++++++ .../store/driver/StateStoreSerializer.java | 119 ++++++++++++++++++ .../impl/StateStoreSerializerPBImpl.java | 115 +++++++++++++++++ .../store/records/impl/pb/PBRecord.java | 47 +++++++ .../store/records/impl/pb/package-info.java | 29 +++++ .../src/main/resources/hdfs-default.xml | 8 ++ 7 files changed, 429 insertions(+) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/PBRecord.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/package-info.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 92cb0ecd742..c485ea692a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import 
org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl; import org.apache.hadoop.http.HttpConfig; /** @@ -1129,6 +1130,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT = "org.apache.hadoop.hdfs.server.federation.MockResolver"; + // HDFS Router-based federation State Store + public static final String FEDERATION_STORE_PREFIX = + FEDERATION_ROUTER_PREFIX + "store."; + + public static final String FEDERATION_STORE_SERIALIZER_CLASS = + DFSConfigKeys.FEDERATION_STORE_PREFIX + "serializer"; + public static final Class + FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT = + StateStoreSerializerPBImpl.class; + // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java new file mode 100644 index 00000000000..524f4329686 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import java.lang.reflect.Constructor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; + +/** + * Store records in the State Store. Subclasses provide interfaces to operate on + * those records. + * + * @param Record to store by this interface. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class RecordStore { + + private static final Log LOG = LogFactory.getLog(RecordStore.class); + + + /** Class of the record stored in this State Store. */ + private final Class recordClass; + + /** State store driver backed by persistent storage. */ + private final StateStoreDriver driver; + + + /** + * Create a new store for records. + * + * @param clazz Class of the record to store. + * @param stateStoreDriver Driver for the State Store. + */ + protected RecordStore(Class clazz, StateStoreDriver stateStoreDriver) { + this.recordClass = clazz; + this.driver = stateStoreDriver; + } + + /** + * Report a required record to the data store. The data store uses this to + * create/maintain storage for the record. + * + * @return The class of the required record or null if no record is required + * for this interface.
+ */ + public Class getRecordClass() { + return this.recordClass; + } + + /** + * Get the State Store driver. + * + * @return State Store driver. + */ + protected StateStoreDriver getDriver() { + return this.driver; + } + + /** + * Build a state store API implementation interface. + * + * @param clazz The specific interface implementation to create + * @param driver The {@link StateStoreDriver} implementation in use. + * @return An initialized instance of the specified state store API + * implementation, or null if it cannot be instantiated. + */ + public static > T newInstance( + final Class clazz, final StateStoreDriver driver) { + + try { + Constructor constructor = clazz.getConstructor(StateStoreDriver.class); + T recordStore = constructor.newInstance(driver); + return recordStore; + } catch (Exception e) { + LOG.error("Cannot create new instance for " + clazz, e); + return null; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java new file mode 100644 index 00000000000..85404054aaa --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * Serializer to store and retrieve data in the State Store. + */ +public abstract class StateStoreSerializer { + + /** Singleton for the serializer instance. */ + private static StateStoreSerializer defaultSerializer; + + /** + * Get the default serializer. + * @return Singleton serializer. + */ + public static StateStoreSerializer getSerializer() { + return getSerializer(null); + } + + /** + * Get a serializer based on the provided configuration. + * @param conf Configuration. Default if null. + * @return Singleton serializer if conf is null; a new serializer otherwise. + */ + public static StateStoreSerializer getSerializer(Configuration conf) { + if (conf == null) { + synchronized (StateStoreSerializer.class) { + if (defaultSerializer == null) { + conf = new Configuration(); + defaultSerializer = newSerializer(conf); + } + } + return defaultSerializer; + } else { + return newSerializer(conf); + } + } + + private static StateStoreSerializer newSerializer(final Configuration conf) { + Class serializerName = conf.getClass( + DFSConfigKeys.FEDERATION_STORE_SERIALIZER_CLASS, + DFSConfigKeys.FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT, + StateStoreSerializer.class); + return ReflectionUtils.newInstance(serializerName, conf); + } + + /** + * Create a new record using the default serializer.
+ * @param clazz Class of the new record. + * @return New record. + */ + public static T newRecord(Class clazz) { + return getSerializer(null).newRecordInstance(clazz); + } + + /** + * Create a new record. + * @param clazz Class of the new record. + * @return New record. + */ + public abstract T newRecordInstance(Class clazz); + + /** + * Serialize a record into a byte array. + * @param record Record to serialize. + * @return Byte array with the serialized record. + */ + public abstract byte[] serialize(BaseRecord record); + + /** + * Serialize a record into a string. + * @param record Record to serialize. + * @return String with the serialized record. + */ + public abstract String serializeString(BaseRecord record); + + /** + * Deserialize a byte array into a record. + * @param byteArray Byte array to deserialize. + * @param clazz Class of the record. + * @return New record. + * @throws IOException If it cannot deserialize the record. + */ + public abstract T deserialize( + byte[] byteArray, Class clazz) throws IOException; + + /** + * Deserialize a string into a record. + * @param data String with the data to deserialize. + * @param clazz Class of the record. + * @return New record. + * @throws IOException If it cannot deserialize the record.
+ */ + public abstract T deserialize( + String data, Class clazz) throws IOException; +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java new file mode 100644 index 00000000000..45c5dd6e7c0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import java.io.IOException; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.codec.binary.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; +import org.apache.hadoop.util.ReflectionUtils; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the State Store serializer with Base64 encoding. + */ +public final class StateStoreSerializerPBImpl extends StateStoreSerializer { + + private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb"; + private static final String PB_IMPL_CLASS_SUFFIX = "PBImpl"; + + private Configuration localConf = new Configuration(); + + + private StateStoreSerializerPBImpl() { + } + + @Override + @SuppressWarnings("unchecked") + public T newRecordInstance(Class clazz) { + try { + String clazzPBImpl = getPBImplClassName(clazz); + Class pbClazz = localConf.getClassByName(clazzPBImpl); + Object retObject = ReflectionUtils.newInstance(pbClazz, localConf); + return (T)retObject; + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + private String getPBImplClassName(Class clazz) { + String srcPackagePart = getPackageName(clazz); + String srcClassName = getClassName(clazz); + String destPackagePart = srcPackagePart + "." + PB_IMPL_PACKAGE_SUFFIX; + String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX; + return destPackagePart + "."
+ destClassPart; + } + + private String getClassName(Class clazz) { + String fqName = clazz.getName(); + return (fqName.substring(fqName.lastIndexOf(".") + 1, fqName.length())); + } + + private String getPackageName(Class clazz) { + return clazz.getPackage().getName(); + } + + @Override + public byte[] serialize(BaseRecord record) { + byte[] byteArray64 = null; + if (record instanceof PBRecord) { + PBRecord recordPB = (PBRecord) record; + Message msg = recordPB.getProto(); + byte[] byteArray = msg.toByteArray(); + byteArray64 = Base64.encodeBase64(byteArray, false); + } + return byteArray64; + } + + @Override + public String serializeString(BaseRecord record) { + byte[] byteArray64 = serialize(record); + String base64Encoded = StringUtils.newStringUtf8(byteArray64); + return base64Encoded; + } + + @Override + public T deserialize( + byte[] byteArray, Class clazz) throws IOException { + + T record = newRecord(clazz); + if (record instanceof PBRecord) { + PBRecord pbRecord = (PBRecord) record; + byte[] byteArray64 = Base64.encodeBase64(byteArray, false); + String base64Encoded = StringUtils.newStringUtf8(byteArray64); + pbRecord.readInstance(base64Encoded); + } + return record; + } + + @Override + public T deserialize(String data, Class clazz) + throws IOException { + byte[] byteArray64 = Base64.decodeBase64(data); + return deserialize(byteArray64, clazz); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/PBRecord.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/PBRecord.java new file mode 100644 index 00000000000..c369275b092 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/PBRecord.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.records.impl.pb; + +import java.io.IOException; + +import com.google.protobuf.Message; + +/** + * A record implementation using Protobuf. + */ +public interface PBRecord { + + /** + * Get the protobuf message for the record. + * @return The protobuf message for this record. + */ + Message getProto(); + + /** + * Set the protobuf message for the record. + * @param proto Protobuf message for this record. + */ + void setProto(Message proto); + + /** + * Populate this record with serialized data. + * @param base64String Serialized data in base64. + * @throws IOException If it cannot read the data. + */ + void readInstance(String base64String) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/package-info.java new file mode 100644 index 00000000000..b329732e28c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/package-info.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The protobuf implementations of state store data records defined in the + * org.apache.hadoop.hdfs.server.federation.store.records package. Each + * implementation wraps an associated protobuf proto definition. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +package org.apache.hadoop.hdfs.server.federation.store.records.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 36a9b2e0c03..67595d1538b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4666,4 +4666,12 @@ + + dfs.federation.router.store.serializer + org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl + + Class to serialize State Store records. + + +