diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml index 723cc08e716..3316d59114a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml @@ -17,6 +17,9 @@ + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 642b473095e..27c79c0d86c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -37,6 +37,16 @@ public final class OzoneConfigKeys { public static final String DFS_STORAGE_HANDLER_TYPE_KEY = "dfs.storage.handler.type"; public static final String DFS_STORAGE_HANDLER_TYPE_DEFAULT = "distributed"; + public static final String DFS_STORAGE_RPC_ADDRESS_KEY = + "dfs.storage.rpc-address"; + public static final int DFS_STORAGE_RPC_DEFAULT_PORT = 50200; + public static final String DFS_STORAGE_RPC_ADDRESS_DEFAULT = + "0.0.0.0:" + DFS_STORAGE_RPC_DEFAULT_PORT; + public static final String DFS_STORAGE_RPC_BIND_HOST_KEY = + "dfs.storage.rpc-bind-host"; + public static final String DFS_STORAGE_HANDLER_COUNT_KEY = + "dfs.storage.handler.count"; + public static final int DFS_STORAGE_HANDLER_COUNT_DEFAULT = 10; public static final String DFS_OBJECTSTORE_TRACE_ENABLED_KEY = "dfs.objectstore.trace.enabled"; public static final boolean DFS_OBJECTSTORE_TRACE_ENABLED_DEFAULT = false; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/LocatedContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/LocatedContainer.java new file mode 100644 index 00000000000..1915caa5e83 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/LocatedContainer.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.protocol; + +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; + +/** + * Holds the nodes that currently host the container for an object key hash. + */ +@InterfaceAudience.Private +public final class LocatedContainer { + private final String key; + private final String matchedKeyPrefix; + private final String containerName; + private final Set locations; + private final DatanodeInfo leader; + + /** + * Creates a LocatedContainer. + * + * @param key object key + * @param matchedKeyPrefix prefix of key that was used to find the location + * @param containerName container name + * @param locations nodes that currently host the container + * @param leader node that currently acts as pipeline leader + */ + public LocatedContainer(String key, String matchedKeyPrefix, + String containerName, Set locations, DatanodeInfo leader) { + this.key = key; + this.matchedKeyPrefix = matchedKeyPrefix; + this.containerName = containerName; + this.locations = locations; + this.leader = leader; + } + + /** + * Returns the container name. + * + * @return container name + */ + public String getContainerName() { + return this.containerName; + } + + /** + * Returns the object key. + * + * @return object key + */ + public String getKey() { + return this.key; + } + + /** + * Returns the node that currently acts as pipeline leader. + * + * @return node that currently acts as pipeline leader + */ + public DatanodeInfo getLeader() { + return this.leader; + } + + /** + * Returns the nodes that currently host the container. + * + * @return Set nodes that currently host the container + */ + public Set getLocations() { + return this.locations; + } + + /** + * Returns the prefix of the key that was used to find the location. + * + * @return prefix of the key that was used to find the location + */ + public String getMatchedKeyPrefix() { + return this.matchedKeyPrefix; + } + + @Override + public boolean equals(Object otherObj) { + if (otherObj == null) { + return false; + } + if (!(otherObj instanceof LocatedContainer)) { + return false; + } + LocatedContainer other = (LocatedContainer)otherObj; + return this.key == null ? other.key == null : this.key.equals(other.key); + } + + @Override + public int hashCode() { + return key.hashCode(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + + "{key=" + key + + "; matchedKeyPrefix=" + matchedKeyPrefix + + "; containerName=" + containerName + + "; locations=" + locations + + "; leader=" + leader + + "}"; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerLocationProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerLocationProtocol.java new file mode 100644 index 00000000000..b3605eedea6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerLocationProtocol.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.protocol; + +import java.io.IOException; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * ContainerLocationProtocol is used by an HDFS node to find the set of nodes + * that currently host a container. + */ +@InterfaceAudience.Private +public interface StorageContainerLocationProtocol { + + /** + * Find the set of nodes that currently host the container of an object, as + * identified by the object key hash. This method supports batch lookup by + * passing multiple key hashes. + * + * @param keys batch of object keys to find + * @return located containers for each object key + * @throws IOException if there is any failure + */ + Set getStorageContainerLocations(Set keys) + throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/package-info.java new file mode 100644 index 00000000000..8bc29aab77c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.protocol; + +/** + * This package contains classes for Ozone protocol definitions. + */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java new file mode 100644 index 00000000000..6aa8190a158 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocolPB; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Set; + +import com.google.common.collect.Sets; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtocolTranslator; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ozone.protocol.LocatedContainer; +import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.LocatedContainerProto; + +/** + * This class is the client-side translator to translate the requests made on + * the {@link StorageContainerLocationProtocol} interface to the RPC server + * implementing {@link StorageContainerLocationProtocolPB}. + */ +@InterfaceAudience.Private +public final class StorageContainerLocationProtocolClientSideTranslatorPB + implements StorageContainerLocationProtocol, ProtocolTranslator, Closeable { + + /** RpcController is not used and hence is set to null. */ + private static final RpcController NULL_RPC_CONTROLLER = null; + + private final StorageContainerLocationProtocolPB rpcProxy; + + /** + * Creates a new StorageContainerLocationProtocolClientSideTranslatorPB. + * + * @param rpcProxy {@link StorageContainerLocationProtocolPB} RPC proxy + */ + public StorageContainerLocationProtocolClientSideTranslatorPB( + StorageContainerLocationProtocolPB rpcProxy) { + this.rpcProxy = rpcProxy; + } + + @Override + public Set getStorageContainerLocations(Set keys) + throws IOException { + GetStorageContainerLocationsRequestProto.Builder req = + GetStorageContainerLocationsRequestProto.newBuilder(); + for (String key: keys) { + req.addKeys(key); + } + final GetStorageContainerLocationsResponseProto resp; + try { + resp = rpcProxy.getStorageContainerLocations(NULL_RPC_CONTROLLER, + req.build()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + Set locatedContainers = + Sets.newLinkedHashSetWithExpectedSize(resp.getLocatedContainersCount()); + for (LocatedContainerProto locatedContainer: + resp.getLocatedContainersList()) { + Set locations = Sets.newLinkedHashSetWithExpectedSize( + locatedContainer.getLocationsCount()); + for (DatanodeInfoProto location: locatedContainer.getLocationsList()) { + locations.add(PBHelperClient.convert(location)); + } + locatedContainers.add(new LocatedContainer(locatedContainer.getKey(), + locatedContainer.getMatchedKeyPrefix(), + locatedContainer.getContainerName(), locations, + PBHelperClient.convert(locatedContainer.getLeader()))); + } + return locatedContainers; + } + + @Override + public Object getUnderlyingProxyObject() { + return rpcProxy; + } + + @Override + public void close() { + RPC.stopProxy(rpcProxy); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolPB.java new file mode 100644 index 00000000000..058765502b4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolPB.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocolPB; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.StorageContainerLocationProtocolService; + +/** + * Protocol used from an HDFS node to StorageContainerManager. This extends the + * Protocol Buffers service interface to add Hadoop-specific annotations. + */ +@ProtocolInfo(protocolName = + "org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol", + protocolVersion = 1) +@InterfaceAudience.Private +public interface StorageContainerLocationProtocolPB + extends StorageContainerLocationProtocolService.BlockingInterface { +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java new file mode 100644 index 00000000000..9d9707fa039 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocolPB; + +import java.io.IOException; +import java.util.Set; + +import com.google.common.collect.Sets; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.ozone.protocol.LocatedContainer; +import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.LocatedContainerProto; + +/** + * This class is the server-side translator that forwards requests received on + * {@link StorageContainerLocationProtocolPB} to the + * {@link StorageContainerLocationProtocol} server implementation. + */ +@InterfaceAudience.Private +public final class StorageContainerLocationProtocolServerSideTranslatorPB + implements StorageContainerLocationProtocolPB { + + private final StorageContainerLocationProtocol impl; + + /** + * Creates a new StorageContainerLocationProtocolServerSideTranslatorPB. + * + * @param impl {@link StorageContainerLocationProtocol} server implementation + */ + public StorageContainerLocationProtocolServerSideTranslatorPB( + StorageContainerLocationProtocol impl) { + this.impl = impl; + } + + @Override + public GetStorageContainerLocationsResponseProto getStorageContainerLocations( + RpcController unused, GetStorageContainerLocationsRequestProto req) + throws ServiceException { + Set keys = Sets.newLinkedHashSetWithExpectedSize( + req.getKeysCount()); + for (String key: req.getKeysList()) { + keys.add(key); + } + final Set locatedContainers; + try { + locatedContainers = impl.getStorageContainerLocations(keys); + } catch (IOException e) { + throw new ServiceException(e); + } + GetStorageContainerLocationsResponseProto.Builder resp = + GetStorageContainerLocationsResponseProto.newBuilder(); + for (LocatedContainer locatedContainer: locatedContainers) { + LocatedContainerProto.Builder locatedContainerProto = + LocatedContainerProto.newBuilder() + .setKey(locatedContainer.getKey()) + .setMatchedKeyPrefix(locatedContainer.getMatchedKeyPrefix()) + .setContainerName(locatedContainer.getContainerName()); + for (DatanodeInfo location: locatedContainer.getLocations()) { + locatedContainerProto.addLocations(PBHelperClient.convert(location)); + } + locatedContainerProto.setLeader( + PBHelperClient.convert(locatedContainer.getLeader())); + resp.addLocatedContainers(locatedContainerProto.build()); + } + return resp.build(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/package-info.java new file mode 100644 index 00000000000..860386d9fdc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.protocolPB; + +/** + * This package contains classes for the Protocol Buffers binding of Ozone + * protocols. + */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java new file mode 100644 index 00000000000..90e200a604d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java @@ -0,0 +1,509 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.storage; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_COUNT_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_BIND_HOST_KEY; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; +import com.google.protobuf.BlockingService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; +import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.protocol.LocatedContainer; +import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos; +import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB; +import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolServerSideTranslatorPB; +import org.apache.hadoop.util.StringUtils; + +/** + * StorageContainerManager is the main entry point for the service that provides + * information about which HDFS nodes host containers. + * + * The current implementation is a stub suitable to begin end-to-end testing of + * Ozone service interactions. DataNodes report to StorageContainerManager + * using the existing heartbeat messages. StorageContainerManager tells clients + * container locations by reporting that all registered nodes are a viable + * location. This will evolve from a stub to a full-fledged implementation + * capable of partitioning the keyspace across multiple containers, with + * appropriate distribution across nodes. + */ +@InterfaceAudience.Private +public class StorageContainerManager + implements DatanodeProtocol, StorageContainerLocationProtocol { + + private static final Logger LOG = + LoggerFactory.getLogger(StorageContainerManager.class); + + private final StorageContainerNameService ns; + private final BlockManager blockManager; + + /** The RPC server that listens to requests from DataNodes. */ + private final RPC.Server serviceRpcServer; + private final InetSocketAddress serviceRpcAddress; + + /** The RPC server that listens to requests from clients. */ + private final RPC.Server clientRpcServer; + private final InetSocketAddress clientRpcAddress; + + /** The RPC server that listens to requests from nodes to find containers. */ + private final RPC.Server storageRpcServer; + private final InetSocketAddress storageRpcAddress; + + /** + * Creates a new StorageContainerManager. Configuration will be updated with + * information on the actual listening addresses used for RPC servers. + * + * @param conf configuration + */ + public StorageContainerManager(Configuration conf) + throws IOException { + ns = new StorageContainerNameService(); + boolean haEnabled = false; + blockManager = new BlockManager(ns, haEnabled, conf); + + RPC.setProtocolEngine(conf, DatanodeProtocolPB.class, + ProtobufRpcEngine.class); + RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class, + ProtobufRpcEngine.class); + + BlockingService dnProtoPbService = + DatanodeProtocolProtos + .DatanodeProtocolService + .newReflectiveBlockingService( + new DatanodeProtocolServerSideTranslatorPB(this)); + + InetSocketAddress serviceRpcAddr = NameNode.getServiceAddress(conf, false); + serviceRpcServer = startRpcServer(conf, serviceRpcAddr, + DatanodeProtocolPB.class, dnProtoPbService, + DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY, + DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY, + DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT); + serviceRpcAddress = updateListenAddress(conf, + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, serviceRpcAddr, serviceRpcServer); + LOG.info(buildRpcServerStartMessage("Service RPC server", + serviceRpcAddress)); + + InetSocketAddress rpcAddr = DFSUtilClient.getNNAddress(conf); + clientRpcServer = startRpcServer(conf, rpcAddr, + DatanodeProtocolPB.class, dnProtoPbService, + DFS_NAMENODE_RPC_BIND_HOST_KEY, + DFS_NAMENODE_HANDLER_COUNT_KEY, + DFS_NAMENODE_HANDLER_COUNT_DEFAULT); + clientRpcAddress = updateListenAddress(conf, + DFS_NAMENODE_RPC_ADDRESS_KEY, rpcAddr, clientRpcServer); + conf.set(FS_DEFAULT_NAME_KEY, DFSUtilClient.getNNUri(clientRpcAddress) + .toString()); + LOG.info(buildRpcServerStartMessage("RPC server", clientRpcAddress)); + + BlockingService storageProtoPbService = + StorageContainerLocationProtocolProtos + .StorageContainerLocationProtocolService + .newReflectiveBlockingService( + new StorageContainerLocationProtocolServerSideTranslatorPB(this)); + + InetSocketAddress storageRpcAddr = NetUtils.createSocketAddr( + conf.getTrimmed(DFS_STORAGE_RPC_ADDRESS_KEY, + DFS_STORAGE_RPC_ADDRESS_DEFAULT), -1, DFS_STORAGE_RPC_ADDRESS_KEY); + + storageRpcServer = startRpcServer(conf, storageRpcAddr, + StorageContainerLocationProtocolPB.class, storageProtoPbService, + DFS_STORAGE_RPC_BIND_HOST_KEY, + DFS_STORAGE_HANDLER_COUNT_KEY, + DFS_STORAGE_HANDLER_COUNT_DEFAULT); + storageRpcAddress = updateListenAddress(conf, + DFS_STORAGE_RPC_ADDRESS_KEY, storageRpcAddr, storageRpcServer); + LOG.info(buildRpcServerStartMessage( + "StorageContainerLocationProtocol RPC server", storageRpcAddress)); + } + + @Override + public Set getStorageContainerLocations(Set keys) + throws IOException { + LOG.trace("getStorageContainerLocations keys = {}", keys); + List liveNodes = new ArrayList(); + blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false); + if (liveNodes.isEmpty()) { + throw new IOException("Storage container locations not found."); + } + String containerName = UUID.randomUUID().toString(); + Set locations = + Sets.newLinkedHashSet(liveNodes); + DatanodeInfo leader = liveNodes.get(0); + Set locatedContainers = + Sets.newLinkedHashSetWithExpectedSize(keys.size()); + for (String key: keys) { + locatedContainers.add(new LocatedContainer(key, key, containerName, + locations, leader)); + } + LOG.trace("getStorageContainerLocations keys = {}, locatedContainers = {}", + keys, locatedContainers); + return locatedContainers; + } + + @Override + public DatanodeRegistration registerDatanode( + DatanodeRegistration registration) throws IOException { + ns.writeLock(); + try { + blockManager.getDatanodeManager().registerDatanode(registration); + } finally { + ns.writeUnlock(); + } + return registration; + } + + @Override + public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, + StorageReport[] reports, long dnCacheCapacity, long dnCacheUsed, + int xmitsInProgress, int xceiverCount, int failedVolumes, + VolumeFailureSummary volumeFailureSummary, + boolean requestFullBlockReportLease) throws IOException { + ns.readLock(); + try { + long cacheCapacity = 0; + long cacheUsed = 0; + int maxTransfer = blockManager.getMaxReplicationStreams() + - xmitsInProgress; + DatanodeCommand[] cmds = blockManager.getDatanodeManager() + .handleHeartbeat(registration, reports, blockManager.getBlockPoolId(), + cacheCapacity, cacheUsed, xceiverCount, maxTransfer, + failedVolumes, volumeFailureSummary); + long txnId = 234; + NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat( + HAServiceProtocol.HAServiceState.ACTIVE, txnId); + RollingUpgradeInfo rollingUpgradeInfo = null; + long blockReportLeaseId = requestFullBlockReportLease ? + blockManager.requestBlockReportLeaseId(registration) : 0; + return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo, + blockReportLeaseId); + } finally { + ns.readUnlock(); + } + } + + @Override + public DatanodeCommand blockReport(DatanodeRegistration registration, + String poolId, StorageBlockReport[] reports, BlockReportContext context) + throws IOException { + for (int r = 0; r < reports.length; r++) { + final BlockListAsLongs storageContainerList = reports[r].getBlocks(); + blockManager.processReport(registration, reports[r].getStorage(), + storageContainerList, context, r == (reports.length - 1)); + } + return null; + } + + @Override + public DatanodeCommand cacheReport(DatanodeRegistration registration, + String poolId, List blockIds) throws IOException { + // Centralized Cache Management is not supported + return null; + } + + @Override + public void blockReceivedAndDeleted(DatanodeRegistration registration, + String poolId, StorageReceivedDeletedBlocks[] rcvdAndDeletedBlocks) + throws IOException { + for(StorageReceivedDeletedBlocks r : rcvdAndDeletedBlocks) { + ns.writeLock(); + try { + blockManager.processIncrementalBlockReport(registration, r); + } finally { + ns.writeUnlock(); + } + } + } + + @Override + public void errorReport(DatanodeRegistration registration, + int errorCode, String msg) throws IOException { + String dnName = + (registration == null) ? "Unknown DataNode" : registration.toString(); + if (errorCode == DatanodeProtocol.NOTIFY) { + LOG.info("Error report from " + dnName + ": " + msg); + return; + } + if (errorCode == DatanodeProtocol.DISK_ERROR) { + LOG.warn("Disk error on " + dnName + ": " + msg); + } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) { + LOG.warn("Fatal disk error on " + dnName + ": " + msg); + blockManager.getDatanodeManager().removeDatanode(registration); + } else { + LOG.info("Error report from " + dnName + ": " + msg); + } + } + + @Override + public NamespaceInfo versionRequest() throws IOException { + ns.readLock(); + try { + return new NamespaceInfo(1, "random", "random", 2, + NodeType.STORAGE_CONTAINER_SERVICE); + } finally { + ns.readUnlock(); + } + } + + @Override + public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { + ns.writeLock(); + try { + for (int i = 0; i < blocks.length; i++) { + ExtendedBlock blk = blocks[i].getBlock(); + DatanodeInfo[] nodes = blocks[i].getLocations(); + String[] storageIDs = blocks[i].getStorageIDs(); + for (int j = 0; j < nodes.length; j++) { + blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j], + storageIDs == null ? null: storageIDs[j], + "client machine reported it"); + } + } + } finally { + ns.writeUnlock(); + } + } + + @Override + public void commitBlockSynchronization(ExtendedBlock block, + long newgenerationstamp, long newlength, boolean closeFile, + boolean deleteblock, DatanodeID[] newtargets, String[] newtargetstorages) + throws IOException { + // Not needed for the purpose of object store + throw new UnsupportedOperationException(); + } + + /** + * Returns information on registered DataNodes. + * + * @param type DataNode type to report + * @return registered DataNodes matching requested type + */ + public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) { + ns.readLock(); + try { + List results = + blockManager.getDatanodeManager().getDatanodeListForReport(type); + return results.toArray(new DatanodeInfo[results.size()]); + } finally { + ns.readUnlock(); + } + } + + /** + * Returns listen address of StorageContainerLocation RPC server. + * + * @return listen address of StorageContainerLocation RPC server + */ + @VisibleForTesting + public InetSocketAddress getStorageContainerLocationRpcAddress() { + return storageRpcAddress; + } + + /** + * Start service. + */ + public void start() { + clientRpcServer.start(); + if (serviceRpcServer != null) { + serviceRpcServer.start(); + } + storageRpcServer.start(); + } + + /** + * Stop service. + */ + public void stop() { + if (clientRpcServer != null) { + clientRpcServer.stop(); + } + if (serviceRpcServer != null) { + serviceRpcServer.stop(); + } + if (storageRpcServer != null) { + storageRpcServer.stop(); + } + IOUtils.closeStream(ns); + } + + /** + * Wait until service has completed shutdown. + */ + public void join() { + try { + clientRpcServer.join(); + if (serviceRpcServer != null) { + serviceRpcServer.join(); + } + storageRpcServer.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.info("Interrupted during StorageContainerManager join."); + } + } + + /** + * Builds a message for logging startup information about an RPC server. + * + * @param description RPC server description + * @param addr RPC server listening address + * @return server startup message + */ + private static String buildRpcServerStartMessage(String description, + InetSocketAddress addr) { + return addr != null ? String.format("%s is listening at %s", + description, NetUtils.getHostPortString(addr)) : + String.format("%s not started", description); + } + + /** + * Starts an RPC server, if configured. + * + * @param conf configuration + * @param addr configured address of RPC server + * @param protocol RPC protocol provided by RPC server + * @param instance RPC protocol implementation instance + * @param bindHostKey configuration key for setting explicit bind host. If + * the property is not configured, then the bind host is taken from addr. + * @param handlerCountKey configuration key for RPC server handler count + * @param handlerCountDefault default RPC server handler count if unconfigured + * @return RPC server, or null if addr is null + * @throws IOException if there is an I/O error while creating RPC server + */ + private static RPC.Server startRpcServer(Configuration conf, + InetSocketAddress addr, Class protocol, BlockingService instance, + String bindHostKey, String handlerCountKey, int handlerCountDefault) + throws IOException { + if (addr == null) { + return null; + } + String bindHost = conf.getTrimmed(bindHostKey); + if (bindHost == null || bindHost.isEmpty()) { + bindHost = addr.getHostName(); + } + int numHandlers = conf.getInt(handlerCountKey, handlerCountDefault); + RPC.Server rpcServer = new RPC.Builder(conf) + .setProtocol(protocol) + .setInstance(instance) + .setBindAddress(bindHost) + .setPort(addr.getPort()) + .setNumHandlers(numHandlers) + .setVerbose(false) + .setSecretManager(null) + .build(); + DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer); + return rpcServer; + } + + /** + * After starting an RPC server, updates configuration with the actual + * listening address of that server. The listening address may be different + * from the configured address if, for example, the configured address uses + * port 0 to request use of an ephemeral port. + * + * @param conf configuration to update + * @param rpcAddressKey configuration key for RPC server address + * @param addr configured address + * @param rpcServer started RPC server. If null, then the server was not + * started, and this method is a no-op. + */ + private static InetSocketAddress updateListenAddress(Configuration conf, + String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) { + if (rpcServer == null) { + return null; + } + InetSocketAddress listenAddr = rpcServer.getListenerAddress(); + InetSocketAddress updatedAddr = new InetSocketAddress( + addr.getHostName(), listenAddr.getPort()); + conf.set(rpcAddressKey, NetUtils.getHostPortString(updatedAddr)); + return updatedAddr; + } + + /** + * Main entry point for starting StorageContainerManager. + * + * @param argv arguments + * @throws IOException if startup fails due to I/O error + */ + public static void main(String[] argv) throws IOException { + StringUtils.startupShutdownMessage( + StorageContainerManager.class, argv, LOG); + StorageContainerManager scm = new StorageContainerManager( + new Configuration()); + scm.start(); + scm.join(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerNameService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerNameService.java new file mode 100644 index 00000000000..ca9d0eb0962 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerNameService.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.storage; + +import java.io.Closeable; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; +import org.apache.hadoop.hdfs.server.namenode.CacheManager; +import org.apache.hadoop.hdfs.server.namenode.Namesystem; +import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; + +/** + * Namesystem implementation intended for use by StorageContainerManager. + */ +@InterfaceAudience.Private +public class StorageContainerNameService implements Namesystem, Closeable { + + private final ReentrantReadWriteLock coarseLock = + new ReentrantReadWriteLock(); + private volatile boolean serviceRunning = true; + + @Override + public boolean isRunning() { + return serviceRunning; + } + + @Override + public BlockCollection getBlockCollection(long id) { + // TBD + return null; + } + + @Override + public void startSecretManagerIfNecessary() { + // Secret manager is not supported + } + + @Override + public CacheManager getCacheManager() { + // Centralized Cache Management is not supported + return null; + } + + @Override + public HAContext getHAContext() { + // HA mode is not supported + return null; + } + + @Override + public boolean inTransitionToActive() { + // HA mode is not supported + return false; + } + + @Override + public boolean isInSnapshot(long blockCollectionID) { + // Snapshots not supported + return false; + } + + @Override + public void readLock() { + coarseLock.readLock().lock(); + } + + @Override + public void readUnlock() { + coarseLock.readLock().unlock(); + } + + @Override + public boolean hasReadLock() { + return coarseLock.getReadHoldCount() > 0 || hasWriteLock(); + } + + @Override + public void writeLock() { + coarseLock.writeLock().lock(); + } + + @Override + public void writeLockInterruptibly() throws InterruptedException { + coarseLock.writeLock().lockInterruptibly(); + } + + @Override + public void writeUnlock() { + coarseLock.writeLock().unlock(); + } + + @Override + public boolean hasWriteLock() { + return coarseLock.isWriteLockedByCurrentThread(); + } + + @Override + public boolean isInSafeMode() { + // Safe mode is not supported + return false; + } + + @Override + public boolean isInStartupSafeMode() { + // Safe mode is not supported + return false; + } + + @Override + public void close() { + serviceRunning = false; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/package-info.java new file mode 100644 index 00000000000..75e337f10ca --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.storage; + +/** + * This package contains StorageContainerManager classes. + */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerLocationProtocol.proto new file mode 100644 index 00000000000..6ed326a9a28 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerLocationProtocol.proto @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * These .proto interfaces are private and unstable. + * Please see http://wiki.apache.org/hadoop/Compatibility + * for what changes are allowed for a *unstable* .proto interface. + */ + +option java_package = "org.apache.hadoop.ozone.protocol.proto"; +option java_outer_classname = "StorageContainerLocationProtocolProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.hdfs; + +import "hdfs.proto"; + +/** + * keys - batch of object keys to find + */ +message GetStorageContainerLocationsRequestProto { + repeated string keys = 1; +} + +/** + * locatedContainers - for each requested hash, nodes that currently host the + * container for that object key hash + */ +message GetStorageContainerLocationsResponseProto { + repeated LocatedContainerProto locatedContainers = 1; +} + +/** + * Holds the nodes that currently host the container for an object key. + */ +message LocatedContainerProto { + required string key = 1; + required string matchedKeyPrefix = 2; + required string containerName = 3; + repeated DatanodeInfoProto locations = 4; + required DatanodeInfoProto leader = 5; +} + +/** + * Protocol used from an HDFS node to StorageContainerManager. See the request + * and response messages for details of the RPC calls. + */ +service StorageContainerLocationProtocolService { + /** + * Find the set of nodes that currently host the container of an object, as + * identified by the object key hash. This method supports batch lookup by + * passing multiple key hashes. + */ + rpc getStorageContainerLocations(GetStorageContainerLocationsRequestProto) + returns(GetStorageContainerLocationsResponseProto); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index acb720e544d..cda7b0f3c67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -1095,9 +1095,6 @@ public class MiniDFSCluster implements AutoCloseable { */ public static void configureNameNodes(MiniDFSNNTopology nnTopology, boolean federation, Configuration conf) throws IOException { - Preconditions.checkArgument(nnTopology.countNameNodes() > 0, - "empty NN topology: no namenodes specified!"); - if (!federation && nnTopology.countNameNodes() == 1) { NNConf onlyNN = nnTopology.getOnlyNameNode(); // we only had one NN, set DEFAULT_NAME for it. If not explicitly diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java new file mode 100644 index 00000000000..218058c4ec9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_KEY; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB; +import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.ozone.storage.StorageContainerManager; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; + +/** + * MiniOzoneCluster creates a complete in-process Ozone cluster suitable for + * running tests. The cluster consists of a StorageContainerManager and + * multiple DataNodes. This class subclasses {@link MiniDFSCluster} for + * convenient reuse of logic for starting DataNodes. Unlike MiniDFSCluster, it + * does not start a NameNode, because Ozone does not require a NameNode. + */ +@InterfaceAudience.Private +public class MiniOzoneCluster extends MiniDFSCluster implements Closeable { + + private static final Logger LOG = + LoggerFactory.getLogger(MiniOzoneCluster.class); + + private final OzoneConfiguration conf; + private final StorageContainerManager scm; + + /** + * Creates a new MiniOzoneCluster. + * + * @param builder cluster builder + * @param scm StorageContainerManager, already running + * @throws IOException if there is an I/O error + */ + private MiniOzoneCluster(Builder builder, StorageContainerManager scm) + throws IOException { + super(builder); + this.conf = builder.conf; + this.scm = scm; + } + + /** + * Builder for configuring the MiniOzoneCluster to run. + */ + public static class Builder + extends org.apache.hadoop.hdfs.MiniDFSCluster.Builder { + + private final OzoneConfiguration conf; + + /** + * Creates a new Builder. + * + * @param conf configuration + */ + public Builder(OzoneConfiguration conf) { + super(conf); + this.conf = conf; + this.nnTopology(new MiniDFSNNTopology()); // No NameNode required + } + + @Override + public Builder numDataNodes(int val) { + super.numDataNodes(val); + return this; + } + + @Override + public MiniOzoneCluster build() throws IOException { + // Even though this won't start a NameNode, some of the logic in + // MiniDFSCluster expects to find the default file system configured with + // an HDFS URI. + conf.set(FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:0"); + conf.set(DFS_STORAGE_RPC_ADDRESS_KEY, "127.0.0.1:0"); + StorageContainerManager scm = new StorageContainerManager(conf); + scm.start(); + return new MiniOzoneCluster(this, scm); + } + } + + @Override + public void close() { + shutdown(); + } + + @Override + public void shutdown() { + super.shutdown(); + LOG.info("Shutting down the Mini Ozone Cluster"); + if (scm == null) { + return; + } + LOG.info("Shutting down the StorageContainerManager"); + scm.stop(); + scm.join(); + } + + /** + * Waits for the Ozone cluster to be ready for processing requests. + */ + public void waitOzoneReady() { + long begin = Time.monotonicNow(); + while (scm.getDatanodeReport(DatanodeReportType.LIVE).length < + numDataNodes) { + if (Time.monotonicNow() - begin > 20000) { + throw new IllegalStateException( + "Timed out waiting for Ozone cluster to become ready."); + } + LOG.info("Waiting for Ozone cluster to become ready"); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException( + "Interrupted while waiting for Ozone cluster to become ready."); + } + } + } + + /** + * Creates an RPC proxy connected to this cluster's StorageContainerManager + * for accessing container location information. Callers take ownership of + * the proxy and must close it when done. + * + * @return RPC proxy for accessing container location information + * @throws IOException if there is an I/O error + */ + protected StorageContainerLocationProtocolClientSideTranslatorPB + createStorageContainerLocationClient() throws IOException { + long version = RPC.getProtocolVersion( + StorageContainerLocationProtocolPB.class); + InetSocketAddress address = scm.getStorageContainerLocationRpcAddress(); + return new StorageContainerLocationProtocolClientSideTranslatorPB( + RPC.getProxy(StorageContainerLocationProtocolPB.class, version, + address, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf))); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/storage/TestStorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/storage/TestStorageContainerManager.java new file mode 100644 index 00000000000..4ea1d6f6765 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/storage/TestStorageContainerManager.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.Set; + +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ozone.protocol.LocatedContainer; +import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; + +public class TestStorageContainerManager { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + private static StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @BeforeClass + public static void init() throws IOException { + conf = new OzoneConfiguration(); + conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true); + conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "distributed"); + conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_KEY, true); + } + + @After + public void shutdown() throws InterruptedException { + IOUtils.cleanup(null, storageContainerLocationClient, cluster); + } + + @Test + public void testLocationsForSingleKey() throws IOException { + cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitOzoneReady(); + storageContainerLocationClient = + cluster.createStorageContainerLocationClient(); + Set containers = + storageContainerLocationClient.getStorageContainerLocations( + new LinkedHashSet<>(Arrays.asList("/key1"))); + assertNotNull(containers); + assertEquals(1, containers.size()); + assertLocatedContainer(containers, "/key1", 1); + } + + @Test + public void testLocationsForMultipleKeys() throws IOException { + cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitOzoneReady(); + storageContainerLocationClient = + cluster.createStorageContainerLocationClient(); + Set containers = + storageContainerLocationClient.getStorageContainerLocations( + new LinkedHashSet<>(Arrays.asList("/key1", "/key2", "/key3"))); + assertNotNull(containers); + assertEquals(3, containers.size()); + assertLocatedContainer(containers, "/key1", 1); + assertLocatedContainer(containers, "/key2", 1); + assertLocatedContainer(containers, "/key3", 1); + } + + @Test + public void testNoDataNodes() throws IOException { + cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(0).build(); + cluster.waitOzoneReady(); + storageContainerLocationClient = + cluster.createStorageContainerLocationClient(); + exception.expect(IOException.class); + exception.expectMessage("locations not found"); + storageContainerLocationClient.getStorageContainerLocations( + new LinkedHashSet<>(Arrays.asList("/key1"))); + } + + @Test + public void testMultipleDataNodes() throws IOException { + cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(3).build(); + cluster.waitOzoneReady(); + storageContainerLocationClient = + cluster.createStorageContainerLocationClient(); + Set containers = + storageContainerLocationClient.getStorageContainerLocations( + new LinkedHashSet<>(Arrays.asList("/key1"))); + assertNotNull(containers); + assertEquals(1, containers.size()); + assertLocatedContainer(containers, "/key1", 3); + } + + private static void assertLocatedContainer(Set containers, + String key, int expectedNumLocations) { + LocatedContainer container = null; + for (LocatedContainer curContainer: containers) { + if (key.equals(curContainer.getKey())) { + container = curContainer; + break; + } + } + assertNotNull("Container for key " + key + " not found.", container); + assertEquals(key, container.getKey()); + assertNotNull(container.getMatchedKeyPrefix()); + assertFalse(container.getMatchedKeyPrefix().isEmpty()); + assertNotNull(container.getContainerName()); + assertFalse(container.getContainerName().isEmpty()); + assertNotNull(container.getLocations()); + assertEquals(expectedNumLocations, container.getLocations().size()); + assertNotNull(container.getLeader()); + } +}