HDFS-8210. Ozone: Implement storage container manager. Contributed by Chris Nauroth.

This commit is contained in:
Anu Engineer 2016-03-30 14:49:55 -07:00 committed by Owen O'Malley
parent e758f90f93
commit 0744d0a947
16 changed files with 1497 additions and 3 deletions

View File

@ -17,6 +17,9 @@
<Match> <Match>
<Package name="org.apache.hadoop.hdfs.qjournal.protocol" /> <Package name="org.apache.hadoop.hdfs.qjournal.protocol" />
</Match> </Match>
<Match>
<Package name="org.apache.hadoop.ozone.protocol.proto" />
</Match>
<Match> <Match>
<Bug pattern="EI_EXPOSE_REP" /> <Bug pattern="EI_EXPOSE_REP" />
</Match> </Match>

View File

@ -37,6 +37,16 @@ public final class OzoneConfigKeys {
public static final String DFS_STORAGE_HANDLER_TYPE_KEY = public static final String DFS_STORAGE_HANDLER_TYPE_KEY =
"dfs.storage.handler.type"; "dfs.storage.handler.type";
public static final String DFS_STORAGE_HANDLER_TYPE_DEFAULT = "distributed"; public static final String DFS_STORAGE_HANDLER_TYPE_DEFAULT = "distributed";
public static final String DFS_STORAGE_RPC_ADDRESS_KEY =
"dfs.storage.rpc-address";
public static final int DFS_STORAGE_RPC_DEFAULT_PORT = 50200;
public static final String DFS_STORAGE_RPC_ADDRESS_DEFAULT =
"0.0.0.0:" + DFS_STORAGE_RPC_DEFAULT_PORT;
public static final String DFS_STORAGE_RPC_BIND_HOST_KEY =
"dfs.storage.rpc-bind-host";
public static final String DFS_STORAGE_HANDLER_COUNT_KEY =
"dfs.storage.handler.count";
public static final int DFS_STORAGE_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_OBJECTSTORE_TRACE_ENABLED_KEY = public static final String DFS_OBJECTSTORE_TRACE_ENABLED_KEY =
"dfs.objectstore.trace.enabled"; "dfs.objectstore.trace.enabled";
public static final boolean DFS_OBJECTSTORE_TRACE_ENABLED_DEFAULT = false; public static final boolean DFS_OBJECTSTORE_TRACE_ENABLED_DEFAULT = false;

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
/**
* Holds the nodes that currently host the container for an object key hash.
*/
@InterfaceAudience.Private
public final class LocatedContainer {
private final String key;
private final String matchedKeyPrefix;
private final String containerName;
private final Set<DatanodeInfo> locations;
private final DatanodeInfo leader;
/**
* Creates a LocatedContainer.
*
* @param key object key
* @param matchedKeyPrefix prefix of key that was used to find the location
* @param containerName container name
* @param locations nodes that currently host the container
* @param leader node that currently acts as pipeline leader
*/
public LocatedContainer(String key, String matchedKeyPrefix,
String containerName, Set<DatanodeInfo> locations, DatanodeInfo leader) {
this.key = key;
this.matchedKeyPrefix = matchedKeyPrefix;
this.containerName = containerName;
this.locations = locations;
this.leader = leader;
}
/**
* Returns the container name.
*
* @return container name
*/
public String getContainerName() {
return this.containerName;
}
/**
* Returns the object key.
*
* @return object key
*/
public String getKey() {
return this.key;
}
/**
* Returns the node that currently acts as pipeline leader.
*
* @return node that currently acts as pipeline leader
*/
public DatanodeInfo getLeader() {
return this.leader;
}
/**
* Returns the nodes that currently host the container.
*
* @return Set<DatanodeInfo> nodes that currently host the container
*/
public Set<DatanodeInfo> getLocations() {
return this.locations;
}
/**
* Returns the prefix of the key that was used to find the location.
*
* @return prefix of the key that was used to find the location
*/
public String getMatchedKeyPrefix() {
return this.matchedKeyPrefix;
}
@Override
public boolean equals(Object otherObj) {
if (otherObj == null) {
return false;
}
if (!(otherObj instanceof LocatedContainer)) {
return false;
}
LocatedContainer other = (LocatedContainer)otherObj;
return this.key == null ? other.key == null : this.key.equals(other.key);
}
@Override
public int hashCode() {
return key.hashCode();
}
@Override
public String toString() {
return getClass().getSimpleName()
+ "{key=" + key
+ "; matchedKeyPrefix=" + matchedKeyPrefix
+ "; containerName=" + containerName
+ "; locations=" + locations
+ "; leader=" + leader
+ "}";
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol;
import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* ContainerLocationProtocol is used by an HDFS node to find the set of nodes
* that currently host a container.
*/
@InterfaceAudience.Private
public interface StorageContainerLocationProtocol {
/**
* Find the set of nodes that currently host the container of an object, as
* identified by the object key hash. This method supports batch lookup by
* passing multiple key hashes.
*
* @param keys batch of object keys to find
* @return located containers for each object key
* @throws IOException if there is any failure
*/
Set<LocatedContainer> getStorageContainerLocations(Set<String> keys)
throws IOException;
}

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol;
/**
* This package contains classes for Ozone protocol definitions.
*/

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocolPB;
import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
import com.google.common.collect.Sets;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.protocol.LocatedContainer;
import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.LocatedContainerProto;
/**
* This class is the client-side translator to translate the requests made on
* the {@link StorageContainerLocationProtocol} interface to the RPC server
* implementing {@link StorageContainerLocationProtocolPB}.
*/
@InterfaceAudience.Private
public final class StorageContainerLocationProtocolClientSideTranslatorPB
implements StorageContainerLocationProtocol, ProtocolTranslator, Closeable {
/** RpcController is not used and hence is set to null. */
private static final RpcController NULL_RPC_CONTROLLER = null;
private final StorageContainerLocationProtocolPB rpcProxy;
/**
* Creates a new StorageContainerLocationProtocolClientSideTranslatorPB.
*
* @param rpcProxy {@link StorageContainerLocationProtocolPB} RPC proxy
*/
public StorageContainerLocationProtocolClientSideTranslatorPB(
StorageContainerLocationProtocolPB rpcProxy) {
this.rpcProxy = rpcProxy;
}
@Override
public Set<LocatedContainer> getStorageContainerLocations(Set<String> keys)
throws IOException {
GetStorageContainerLocationsRequestProto.Builder req =
GetStorageContainerLocationsRequestProto.newBuilder();
for (String key: keys) {
req.addKeys(key);
}
final GetStorageContainerLocationsResponseProto resp;
try {
resp = rpcProxy.getStorageContainerLocations(NULL_RPC_CONTROLLER,
req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
Set<LocatedContainer> locatedContainers =
Sets.newLinkedHashSetWithExpectedSize(resp.getLocatedContainersCount());
for (LocatedContainerProto locatedContainer:
resp.getLocatedContainersList()) {
Set<DatanodeInfo> locations = Sets.newLinkedHashSetWithExpectedSize(
locatedContainer.getLocationsCount());
for (DatanodeInfoProto location: locatedContainer.getLocationsList()) {
locations.add(PBHelperClient.convert(location));
}
locatedContainers.add(new LocatedContainer(locatedContainer.getKey(),
locatedContainer.getMatchedKeyPrefix(),
locatedContainer.getContainerName(), locations,
PBHelperClient.convert(locatedContainer.getLeader())));
}
return locatedContainers;
}
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
}
@Override
public void close() {
RPC.stopProxy(rpcProxy);
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocolPB;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.StorageContainerLocationProtocolService;
/**
* Protocol used from an HDFS node to StorageContainerManager. This extends the
* Protocol Buffers service interface to add Hadoop-specific annotations.
*/
@ProtocolInfo(protocolName =
"org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol",
protocolVersion = 1)
@InterfaceAudience.Private
public interface StorageContainerLocationProtocolPB
extends StorageContainerLocationProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocolPB;
import java.io.IOException;
import java.util.Set;
import com.google.common.collect.Sets;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.ozone.protocol.LocatedContainer;
import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.LocatedContainerProto;
/**
* This class is the server-side translator that forwards requests received on
* {@link StorageContainerLocationProtocolPB} to the
* {@link StorageContainerLocationProtocol} server implementation.
*/
@InterfaceAudience.Private
public final class StorageContainerLocationProtocolServerSideTranslatorPB
implements StorageContainerLocationProtocolPB {
private final StorageContainerLocationProtocol impl;
/**
* Creates a new StorageContainerLocationProtocolServerSideTranslatorPB.
*
* @param impl {@link StorageContainerLocationProtocol} server implementation
*/
public StorageContainerLocationProtocolServerSideTranslatorPB(
StorageContainerLocationProtocol impl) {
this.impl = impl;
}
@Override
public GetStorageContainerLocationsResponseProto getStorageContainerLocations(
RpcController unused, GetStorageContainerLocationsRequestProto req)
throws ServiceException {
Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
req.getKeysCount());
for (String key: req.getKeysList()) {
keys.add(key);
}
final Set<LocatedContainer> locatedContainers;
try {
locatedContainers = impl.getStorageContainerLocations(keys);
} catch (IOException e) {
throw new ServiceException(e);
}
GetStorageContainerLocationsResponseProto.Builder resp =
GetStorageContainerLocationsResponseProto.newBuilder();
for (LocatedContainer locatedContainer: locatedContainers) {
LocatedContainerProto.Builder locatedContainerProto =
LocatedContainerProto.newBuilder()
.setKey(locatedContainer.getKey())
.setMatchedKeyPrefix(locatedContainer.getMatchedKeyPrefix())
.setContainerName(locatedContainer.getContainerName());
for (DatanodeInfo location: locatedContainer.getLocations()) {
locatedContainerProto.addLocations(PBHelperClient.convert(location));
}
locatedContainerProto.setLeader(
PBHelperClient.convert(locatedContainer.getLeader()));
resp.addLocatedContainers(locatedContainerProto.build());
}
return resp.build();
}
}

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocolPB;
/**
* This package contains classes for the Protocol Buffers binding of Ozone
* protocols.
*/

View File

@ -0,0 +1,509 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.storage;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_BIND_HOST_KEY;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import com.google.protobuf.BlockingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.protocol.LocatedContainer;
import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.util.StringUtils;
/**
* StorageContainerManager is the main entry point for the service that provides
* information about which HDFS nodes host containers.
*
* The current implementation is a stub suitable to begin end-to-end testing of
* Ozone service interactions. DataNodes report to StorageContainerManager
* using the existing heartbeat messages. StorageContainerManager tells clients
* container locations by reporting that all registered nodes are a viable
* location. This will evolve from a stub to a full-fledged implementation
* capable of partitioning the keyspace across multiple containers, with
* appropriate distribution across nodes.
*/
@InterfaceAudience.Private
public class StorageContainerManager
implements DatanodeProtocol, StorageContainerLocationProtocol {
private static final Logger LOG =
LoggerFactory.getLogger(StorageContainerManager.class);
private final StorageContainerNameService ns;
private final BlockManager blockManager;
/** The RPC server that listens to requests from DataNodes. */
private final RPC.Server serviceRpcServer;
private final InetSocketAddress serviceRpcAddress;
/** The RPC server that listens to requests from clients. */
private final RPC.Server clientRpcServer;
private final InetSocketAddress clientRpcAddress;
/** The RPC server that listens to requests from nodes to find containers. */
private final RPC.Server storageRpcServer;
private final InetSocketAddress storageRpcAddress;
/**
* Creates a new StorageContainerManager. Configuration will be updated with
* information on the actual listening addresses used for RPC servers.
*
* @param conf configuration
*/
public StorageContainerManager(Configuration conf)
throws IOException {
ns = new StorageContainerNameService();
boolean haEnabled = false;
blockManager = new BlockManager(ns, haEnabled, conf);
RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
ProtobufRpcEngine.class);
BlockingService dnProtoPbService =
DatanodeProtocolProtos
.DatanodeProtocolService
.newReflectiveBlockingService(
new DatanodeProtocolServerSideTranslatorPB(this));
InetSocketAddress serviceRpcAddr = NameNode.getServiceAddress(conf, false);
serviceRpcServer = startRpcServer(conf, serviceRpcAddr,
DatanodeProtocolPB.class, dnProtoPbService,
DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY,
DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
serviceRpcAddress = updateListenAddress(conf,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, serviceRpcAddr, serviceRpcServer);
LOG.info(buildRpcServerStartMessage("Service RPC server",
serviceRpcAddress));
InetSocketAddress rpcAddr = DFSUtilClient.getNNAddress(conf);
clientRpcServer = startRpcServer(conf, rpcAddr,
DatanodeProtocolPB.class, dnProtoPbService,
DFS_NAMENODE_RPC_BIND_HOST_KEY,
DFS_NAMENODE_HANDLER_COUNT_KEY,
DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
clientRpcAddress = updateListenAddress(conf,
DFS_NAMENODE_RPC_ADDRESS_KEY, rpcAddr, clientRpcServer);
conf.set(FS_DEFAULT_NAME_KEY, DFSUtilClient.getNNUri(clientRpcAddress)
.toString());
LOG.info(buildRpcServerStartMessage("RPC server", clientRpcAddress));
BlockingService storageProtoPbService =
StorageContainerLocationProtocolProtos
.StorageContainerLocationProtocolService
.newReflectiveBlockingService(
new StorageContainerLocationProtocolServerSideTranslatorPB(this));
InetSocketAddress storageRpcAddr = NetUtils.createSocketAddr(
conf.getTrimmed(DFS_STORAGE_RPC_ADDRESS_KEY,
DFS_STORAGE_RPC_ADDRESS_DEFAULT), -1, DFS_STORAGE_RPC_ADDRESS_KEY);
storageRpcServer = startRpcServer(conf, storageRpcAddr,
StorageContainerLocationProtocolPB.class, storageProtoPbService,
DFS_STORAGE_RPC_BIND_HOST_KEY,
DFS_STORAGE_HANDLER_COUNT_KEY,
DFS_STORAGE_HANDLER_COUNT_DEFAULT);
storageRpcAddress = updateListenAddress(conf,
DFS_STORAGE_RPC_ADDRESS_KEY, storageRpcAddr, storageRpcServer);
LOG.info(buildRpcServerStartMessage(
"StorageContainerLocationProtocol RPC server", storageRpcAddress));
}
@Override
public Set<LocatedContainer> getStorageContainerLocations(Set<String> keys)
throws IOException {
LOG.trace("getStorageContainerLocations keys = {}", keys);
List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();
blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false);
if (liveNodes.isEmpty()) {
throw new IOException("Storage container locations not found.");
}
String containerName = UUID.randomUUID().toString();
Set<DatanodeInfo> locations =
Sets.<DatanodeInfo>newLinkedHashSet(liveNodes);
DatanodeInfo leader = liveNodes.get(0);
Set<LocatedContainer> locatedContainers =
Sets.newLinkedHashSetWithExpectedSize(keys.size());
for (String key: keys) {
locatedContainers.add(new LocatedContainer(key, key, containerName,
locations, leader));
}
LOG.trace("getStorageContainerLocations keys = {}, locatedContainers = {}",
keys, locatedContainers);
return locatedContainers;
}
@Override
public DatanodeRegistration registerDatanode(
DatanodeRegistration registration) throws IOException {
ns.writeLock();
try {
blockManager.getDatanodeManager().registerDatanode(registration);
} finally {
ns.writeUnlock();
}
return registration;
}
@Override
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
StorageReport[] reports, long dnCacheCapacity, long dnCacheUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease) throws IOException {
ns.readLock();
try {
long cacheCapacity = 0;
long cacheUsed = 0;
int maxTransfer = blockManager.getMaxReplicationStreams()
- xmitsInProgress;
DatanodeCommand[] cmds = blockManager.getDatanodeManager()
.handleHeartbeat(registration, reports, blockManager.getBlockPoolId(),
cacheCapacity, cacheUsed, xceiverCount, maxTransfer,
failedVolumes, volumeFailureSummary);
long txnId = 234;
NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
HAServiceProtocol.HAServiceState.ACTIVE, txnId);
RollingUpgradeInfo rollingUpgradeInfo = null;
long blockReportLeaseId = requestFullBlockReportLease ?
blockManager.requestBlockReportLeaseId(registration) : 0;
return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
blockReportLeaseId);
} finally {
ns.readUnlock();
}
}
@Override
public DatanodeCommand blockReport(DatanodeRegistration registration,
String poolId, StorageBlockReport[] reports, BlockReportContext context)
throws IOException {
for (int r = 0; r < reports.length; r++) {
final BlockListAsLongs storageContainerList = reports[r].getBlocks();
blockManager.processReport(registration, reports[r].getStorage(),
storageContainerList, context, r == (reports.length - 1));
}
return null;
}
@Override
public DatanodeCommand cacheReport(DatanodeRegistration registration,
String poolId, List<Long> blockIds) throws IOException {
// Centralized Cache Management is not supported
return null;
}
@Override
public void blockReceivedAndDeleted(DatanodeRegistration registration,
String poolId, StorageReceivedDeletedBlocks[] rcvdAndDeletedBlocks)
throws IOException {
for(StorageReceivedDeletedBlocks r : rcvdAndDeletedBlocks) {
ns.writeLock();
try {
blockManager.processIncrementalBlockReport(registration, r);
} finally {
ns.writeUnlock();
}
}
}
@Override
public void errorReport(DatanodeRegistration registration,
int errorCode, String msg) throws IOException {
String dnName =
(registration == null) ? "Unknown DataNode" : registration.toString();
if (errorCode == DatanodeProtocol.NOTIFY) {
LOG.info("Error report from " + dnName + ": " + msg);
return;
}
if (errorCode == DatanodeProtocol.DISK_ERROR) {
LOG.warn("Disk error on " + dnName + ": " + msg);
} else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
LOG.warn("Fatal disk error on " + dnName + ": " + msg);
blockManager.getDatanodeManager().removeDatanode(registration);
} else {
LOG.info("Error report from " + dnName + ": " + msg);
}
}
@Override
public NamespaceInfo versionRequest() throws IOException {
ns.readLock();
try {
return new NamespaceInfo(1, "random", "random", 2,
NodeType.STORAGE_CONTAINER_SERVICE);
} finally {
ns.readUnlock();
}
}
@Override
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
ns.writeLock();
try {
for (int i = 0; i < blocks.length; i++) {
ExtendedBlock blk = blocks[i].getBlock();
DatanodeInfo[] nodes = blocks[i].getLocations();
String[] storageIDs = blocks[i].getStorageIDs();
for (int j = 0; j < nodes.length; j++) {
blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j],
storageIDs == null ? null: storageIDs[j],
"client machine reported it");
}
}
} finally {
ns.writeUnlock();
}
}
@Override
public void commitBlockSynchronization(ExtendedBlock block,
long newgenerationstamp, long newlength, boolean closeFile,
boolean deleteblock, DatanodeID[] newtargets, String[] newtargetstorages)
throws IOException {
// Not needed for the purpose of object store
throw new UnsupportedOperationException();
}
/**
* Returns information on registered DataNodes.
*
* @param type DataNode type to report
* @return registered DataNodes matching requested type
*/
public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) {
ns.readLock();
try {
List<DatanodeDescriptor> results =
blockManager.getDatanodeManager().getDatanodeListForReport(type);
return results.toArray(new DatanodeInfo[results.size()]);
} finally {
ns.readUnlock();
}
}
/**
* Returns listen address of StorageContainerLocation RPC server.
*
* @return listen address of StorageContainerLocation RPC server
*/
@VisibleForTesting
public InetSocketAddress getStorageContainerLocationRpcAddress() {
return storageRpcAddress;
}
/**
* Start service.
*/
public void start() {
clientRpcServer.start();
if (serviceRpcServer != null) {
serviceRpcServer.start();
}
storageRpcServer.start();
}
/**
* Stop service.
*/
public void stop() {
if (clientRpcServer != null) {
clientRpcServer.stop();
}
if (serviceRpcServer != null) {
serviceRpcServer.stop();
}
if (storageRpcServer != null) {
storageRpcServer.stop();
}
IOUtils.closeStream(ns);
}
/**
* Wait until service has completed shutdown.
*/
public void join() {
try {
clientRpcServer.join();
if (serviceRpcServer != null) {
serviceRpcServer.join();
}
storageRpcServer.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info("Interrupted during StorageContainerManager join.");
}
}
/**
* Builds a message for logging startup information about an RPC server.
*
* @param description RPC server description
* @param addr RPC server listening address
* @return server startup message
*/
private static String buildRpcServerStartMessage(String description,
InetSocketAddress addr) {
return addr != null ? String.format("%s is listening at %s",
description, NetUtils.getHostPortString(addr)) :
String.format("%s not started", description);
}
/**
* Starts an RPC server, if configured.
*
* @param conf configuration
* @param addr configured address of RPC server
* @param protocol RPC protocol provided by RPC server
* @param instance RPC protocol implementation instance
* @param bindHostKey configuration key for setting explicit bind host. If
* the property is not configured, then the bind host is taken from addr.
* @param handlerCountKey configuration key for RPC server handler count
* @param handlerCountDefault default RPC server handler count if unconfigured
* @return RPC server, or null if addr is null
* @throws IOException if there is an I/O error while creating RPC server
*/
private static RPC.Server startRpcServer(Configuration conf,
InetSocketAddress addr, Class<?> protocol, BlockingService instance,
String bindHostKey, String handlerCountKey, int handlerCountDefault)
throws IOException {
if (addr == null) {
return null;
}
String bindHost = conf.getTrimmed(bindHostKey);
if (bindHost == null || bindHost.isEmpty()) {
bindHost = addr.getHostName();
}
int numHandlers = conf.getInt(handlerCountKey, handlerCountDefault);
RPC.Server rpcServer = new RPC.Builder(conf)
.setProtocol(protocol)
.setInstance(instance)
.setBindAddress(bindHost)
.setPort(addr.getPort())
.setNumHandlers(numHandlers)
.setVerbose(false)
.setSecretManager(null)
.build();
DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
return rpcServer;
}
/**
* After starting an RPC server, updates configuration with the actual
* listening address of that server. The listening address may be different
* from the configured address if, for example, the configured address uses
* port 0 to request use of an ephemeral port.
*
* @param conf configuration to update
* @param rpcAddressKey configuration key for RPC server address
* @param addr configured address
* @param rpcServer started RPC server. If null, then the server was not
* started, and this method is a no-op.
*/
private static InetSocketAddress updateListenAddress(Configuration conf,
String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) {
if (rpcServer == null) {
return null;
}
InetSocketAddress listenAddr = rpcServer.getListenerAddress();
InetSocketAddress updatedAddr = new InetSocketAddress(
addr.getHostName(), listenAddr.getPort());
conf.set(rpcAddressKey, NetUtils.getHostPortString(updatedAddr));
return updatedAddr;
}
/**
* Main entry point for starting StorageContainerManager.
*
* @param argv arguments
* @throws IOException if startup fails due to I/O error
*/
public static void main(String[] argv) throws IOException {
StringUtils.startupShutdownMessage(
StorageContainerManager.class, argv, LOG);
StorageContainerManager scm = new StorageContainerManager(
new Configuration());
scm.start();
scm.join();
}
}

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.storage;
import java.io.Closeable;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.namenode.CacheManager;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
/**
* Namesystem implementation intended for use by StorageContainerManager.
*/
@InterfaceAudience.Private
public class StorageContainerNameService implements Namesystem, Closeable {
private final ReentrantReadWriteLock coarseLock =
new ReentrantReadWriteLock();
private volatile boolean serviceRunning = true;
@Override
public boolean isRunning() {
return serviceRunning;
}
@Override
public BlockCollection getBlockCollection(long id) {
// TBD
return null;
}
@Override
public void startSecretManagerIfNecessary() {
// Secret manager is not supported
}
@Override
public CacheManager getCacheManager() {
// Centralized Cache Management is not supported
return null;
}
@Override
public HAContext getHAContext() {
// HA mode is not supported
return null;
}
@Override
public boolean inTransitionToActive() {
// HA mode is not supported
return false;
}
@Override
public boolean isInSnapshot(long blockCollectionID) {
// Snapshots not supported
return false;
}
@Override
public void readLock() {
coarseLock.readLock().lock();
}
@Override
public void readUnlock() {
coarseLock.readLock().unlock();
}
@Override
public boolean hasReadLock() {
return coarseLock.getReadHoldCount() > 0 || hasWriteLock();
}
@Override
public void writeLock() {
coarseLock.writeLock().lock();
}
@Override
public void writeLockInterruptibly() throws InterruptedException {
coarseLock.writeLock().lockInterruptibly();
}
@Override
public void writeUnlock() {
coarseLock.writeLock().unlock();
}
@Override
public boolean hasWriteLock() {
return coarseLock.isWriteLockedByCurrentThread();
}
@Override
public boolean isInSafeMode() {
// Safe mode is not supported
return false;
}
@Override
public boolean isInStartupSafeMode() {
// Safe mode is not supported
return false;
}
@Override
public void close() {
serviceRunning = false;
}
}

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.storage;
/**
* This package contains StorageContainerManager classes.
*/

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* These .proto interfaces are private and unstable.
* Please see http://wiki.apache.org/hadoop/Compatibility
* for what changes are allowed for a *unstable* .proto interface.
*/
option java_package = "org.apache.hadoop.ozone.protocol.proto";
option java_outer_classname = "StorageContainerLocationProtocolProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.hdfs;
import "hdfs.proto";
/**
* keys - batch of object keys to find
*/
message GetStorageContainerLocationsRequestProto {
repeated string keys = 1;
}
/**
* locatedContainers - for each requested hash, nodes that currently host the
* container for that object key hash
*/
message GetStorageContainerLocationsResponseProto {
repeated LocatedContainerProto locatedContainers = 1;
}
/**
* Holds the nodes that currently host the container for an object key.
*/
message LocatedContainerProto {
required string key = 1;
required string matchedKeyPrefix = 2;
required string containerName = 3;
repeated DatanodeInfoProto locations = 4;
required DatanodeInfoProto leader = 5;
}
/**
* Protocol used from an HDFS node to StorageContainerManager. See the request
* and response messages for details of the RPC calls.
*/
service StorageContainerLocationProtocolService {
/**
* Find the set of nodes that currently host the container of an object, as
* identified by the object key hash. This method supports batch lookup by
* passing multiple key hashes.
*/
rpc getStorageContainerLocations(GetStorageContainerLocationsRequestProto)
returns(GetStorageContainerLocationsResponseProto);
}

View File

@ -1095,9 +1095,6 @@ public class MiniDFSCluster implements AutoCloseable {
*/ */
public static void configureNameNodes(MiniDFSNNTopology nnTopology, boolean federation, public static void configureNameNodes(MiniDFSNNTopology nnTopology, boolean federation,
Configuration conf) throws IOException { Configuration conf) throws IOException {
Preconditions.checkArgument(nnTopology.countNameNodes() > 0,
"empty NN topology: no namenodes specified!");
if (!federation && nnTopology.countNameNodes() == 1) { if (!federation && nnTopology.countNameNodes() == 1) {
NNConf onlyNN = nnTopology.getOnlyNameNode(); NNConf onlyNN = nnTopology.getOnlyNameNode();
// we only had one NN, set DEFAULT_NAME for it. If not explicitly // we only had one NN, set DEFAULT_NAME for it. If not explicitly

View File

@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_KEY;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.storage.StorageContainerManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
/**
* MiniOzoneCluster creates a complete in-process Ozone cluster suitable for
* running tests. The cluster consists of a StorageContainerManager and
* multiple DataNodes. This class subclasses {@link MiniDFSCluster} for
* convenient reuse of logic for starting DataNodes. Unlike MiniDFSCluster, it
* does not start a NameNode, because Ozone does not require a NameNode.
*/
@InterfaceAudience.Private
public class MiniOzoneCluster extends MiniDFSCluster implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneCluster.class);
private final OzoneConfiguration conf;
private final StorageContainerManager scm;
/**
* Creates a new MiniOzoneCluster.
*
* @param builder cluster builder
* @param scm StorageContainerManager, already running
* @throws IOException if there is an I/O error
*/
private MiniOzoneCluster(Builder builder, StorageContainerManager scm)
throws IOException {
super(builder);
this.conf = builder.conf;
this.scm = scm;
}
/**
* Builder for configuring the MiniOzoneCluster to run.
*/
public static class Builder
extends org.apache.hadoop.hdfs.MiniDFSCluster.Builder {
private final OzoneConfiguration conf;
/**
* Creates a new Builder.
*
* @param conf configuration
*/
public Builder(OzoneConfiguration conf) {
super(conf);
this.conf = conf;
this.nnTopology(new MiniDFSNNTopology()); // No NameNode required
}
@Override
public Builder numDataNodes(int val) {
super.numDataNodes(val);
return this;
}
@Override
public MiniOzoneCluster build() throws IOException {
// Even though this won't start a NameNode, some of the logic in
// MiniDFSCluster expects to find the default file system configured with
// an HDFS URI.
conf.set(FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:0");
conf.set(DFS_STORAGE_RPC_ADDRESS_KEY, "127.0.0.1:0");
StorageContainerManager scm = new StorageContainerManager(conf);
scm.start();
return new MiniOzoneCluster(this, scm);
}
}
@Override
public void close() {
shutdown();
}
@Override
public void shutdown() {
super.shutdown();
LOG.info("Shutting down the Mini Ozone Cluster");
if (scm == null) {
return;
}
LOG.info("Shutting down the StorageContainerManager");
scm.stop();
scm.join();
}
/**
* Waits for the Ozone cluster to be ready for processing requests.
*/
public void waitOzoneReady() {
long begin = Time.monotonicNow();
while (scm.getDatanodeReport(DatanodeReportType.LIVE).length <
numDataNodes) {
if (Time.monotonicNow() - begin > 20000) {
throw new IllegalStateException(
"Timed out waiting for Ozone cluster to become ready.");
}
LOG.info("Waiting for Ozone cluster to become ready");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(
"Interrupted while waiting for Ozone cluster to become ready.");
}
}
}
/**
* Creates an RPC proxy connected to this cluster's StorageContainerManager
* for accessing container location information. Callers take ownership of
* the proxy and must close it when done.
*
* @return RPC proxy for accessing container location information
* @throws IOException if there is an I/O error
*/
protected StorageContainerLocationProtocolClientSideTranslatorPB
createStorageContainerLocationClient() throws IOException {
long version = RPC.getProtocolVersion(
StorageContainerLocationProtocolPB.class);
InetSocketAddress address = scm.getStorageContainerLocationRpcAddress();
return new StorageContainerLocationProtocolClientSideTranslatorPB(
RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf)));
}
}

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.protocol.LocatedContainer;
import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
public class TestStorageContainerManager {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
@Rule
public ExpectedException exception = ExpectedException.none();
@BeforeClass
public static void init() throws IOException {
conf = new OzoneConfiguration();
conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true);
conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "distributed");
conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_KEY, true);
}
@After
public void shutdown() throws InterruptedException {
IOUtils.cleanup(null, storageContainerLocationClient, cluster);
}
@Test
public void testLocationsForSingleKey() throws IOException {
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1).build();
cluster.waitOzoneReady();
storageContainerLocationClient =
cluster.createStorageContainerLocationClient();
Set<LocatedContainer> containers =
storageContainerLocationClient.getStorageContainerLocations(
new LinkedHashSet<>(Arrays.asList("/key1")));
assertNotNull(containers);
assertEquals(1, containers.size());
assertLocatedContainer(containers, "/key1", 1);
}
@Test
public void testLocationsForMultipleKeys() throws IOException {
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1).build();
cluster.waitOzoneReady();
storageContainerLocationClient =
cluster.createStorageContainerLocationClient();
Set<LocatedContainer> containers =
storageContainerLocationClient.getStorageContainerLocations(
new LinkedHashSet<>(Arrays.asList("/key1", "/key2", "/key3")));
assertNotNull(containers);
assertEquals(3, containers.size());
assertLocatedContainer(containers, "/key1", 1);
assertLocatedContainer(containers, "/key2", 1);
assertLocatedContainer(containers, "/key3", 1);
}
@Test
public void testNoDataNodes() throws IOException {
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(0).build();
cluster.waitOzoneReady();
storageContainerLocationClient =
cluster.createStorageContainerLocationClient();
exception.expect(IOException.class);
exception.expectMessage("locations not found");
storageContainerLocationClient.getStorageContainerLocations(
new LinkedHashSet<>(Arrays.asList("/key1")));
}
@Test
public void testMultipleDataNodes() throws IOException {
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(3).build();
cluster.waitOzoneReady();
storageContainerLocationClient =
cluster.createStorageContainerLocationClient();
Set<LocatedContainer> containers =
storageContainerLocationClient.getStorageContainerLocations(
new LinkedHashSet<>(Arrays.asList("/key1")));
assertNotNull(containers);
assertEquals(1, containers.size());
assertLocatedContainer(containers, "/key1", 3);
}
private static void assertLocatedContainer(Set<LocatedContainer> containers,
String key, int expectedNumLocations) {
LocatedContainer container = null;
for (LocatedContainer curContainer: containers) {
if (key.equals(curContainer.getKey())) {
container = curContainer;
break;
}
}
assertNotNull("Container for key " + key + " not found.", container);
assertEquals(key, container.getKey());
assertNotNull(container.getMatchedKeyPrefix());
assertFalse(container.getMatchedKeyPrefix().isEmpty());
assertNotNull(container.getContainerName());
assertFalse(container.getContainerName().isEmpty());
assertNotNull(container.getLocations());
assertEquals(expectedNumLocations, container.getLocations().size());
assertNotNull(container.getLeader());
}
}