HDFS-12665. [AliasMap] Create a version of the AliasMap that runs in memory in the Namenode (leveldb). Contributed by Ewan Higgs.

This commit is contained in:
Virajith Jalaparti 2017-11-30 10:37:28 -08:00 committed by Chris Douglas
parent cc933cba77
commit 352f994b64
32 changed files with 2020 additions and 124 deletions

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.fs.Path;
import javax.annotation.Nonnull;
import java.util.Arrays;
/**
* ProvidedStorageLocation is a location in an external storage system
* containing the data for a block (~Replica).
*/
public class ProvidedStorageLocation {
private final Path path;
private final long offset;
private final long length;
private final byte[] nonce;
public ProvidedStorageLocation(Path path, long offset, long length,
byte[] nonce) {
this.path = path;
this.offset = offset;
this.length = length;
this.nonce = Arrays.copyOf(nonce, nonce.length);
}
public @Nonnull Path getPath() {
return path;
}
public long getOffset() {
return offset;
}
public long getLength() {
return length;
}
public @Nonnull byte[] getNonce() {
// create a copy of the nonce and return it.
return Arrays.copyOf(nonce, nonce.length);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ProvidedStorageLocation that = (ProvidedStorageLocation) o;
if ((offset != that.offset) || (length != that.length)
|| !path.equals(that.path)) {
return false;
}
return Arrays.equals(nonce, that.nonce);
}
@Override
public int hashCode() {
int result = path.hashCode();
result = 31 * result + (int) (offset ^ (offset >>> 32));
result = 31 * result + (int) (length ^ (length >>> 32));
result = 31 * result + Arrays.hashCode(nonce);
return result;
}
}

View File

@ -96,6 +96,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
@ -3227,4 +3228,35 @@ public class PBHelperClient {
}
return ret;
}
public static ProvidedStorageLocation convert(
HdfsProtos.ProvidedStorageLocationProto providedStorageLocationProto) {
if (providedStorageLocationProto == null) {
return null;
}
String path = providedStorageLocationProto.getPath();
long length = providedStorageLocationProto.getLength();
long offset = providedStorageLocationProto.getOffset();
ByteString nonce = providedStorageLocationProto.getNonce();
if (path == null || length == -1 || offset == -1 || nonce == null) {
return null;
} else {
return new ProvidedStorageLocation(new Path(path), offset, length,
nonce.toByteArray());
}
}
public static HdfsProtos.ProvidedStorageLocationProto convert(
ProvidedStorageLocation providedStorageLocation) {
String path = providedStorageLocation.getPath().toString();
return HdfsProtos.ProvidedStorageLocationProto.newBuilder()
.setPath(path)
.setLength(providedStorageLocation.getLength())
.setOffset(providedStorageLocation.getOffset())
.setNonce(ByteString.copyFrom(providedStorageLocation.getNonce()))
.build();
}
}

View File

@ -45,6 +45,20 @@ message ExtendedBlockProto {
// here for historical reasons
}
/**
* ProvidedStorageLocation will contain the exact location in the provided
storage. The path, offset and length will result in ranged read. The nonce
is there to verify that you receive what you expect.
*/
message ProvidedStorageLocationProto {
required string path = 1;
required int64 offset = 2;
required int64 length = 3;
required bytes nonce = 4;
}
/**
* Identifies a Datanode
*/

View File

@ -191,7 +191,6 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
<version>1.8</version>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
@ -208,6 +207,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
@ -341,6 +345,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<include>fsimage.proto</include>
<include>FederationProtocol.proto</include>
<include>RouterProtocol.proto</include>
<include>AliasMapProtocol.proto</include>
</includes>
</source>
</configuration>

View File

@ -95,6 +95,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50105";
public static final String DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS = "dfs.provided.aliasmap.inmemory.dnrpc-address";
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT = "0.0.0.0:50200";
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR = "dfs.provided.aliasmap.inmemory.leveldb.dir";
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE = "dfs.provided.aliasmap.inmemory.batch-size";
public static final int DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE_DEFAULT = 500;
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED = "dfs.provided.aliasmap.inmemory.enabled";
public static final boolean DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED_DEFAULT = false;
public static final String DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY;
public static final long DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT =
@ -1633,4 +1641,5 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
@Deprecated
public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos;
import org.apache.hadoop.ipc.ProtocolInfo;
/**
* Protocol between the Namenode and the Datanode to read the AliasMap
* used for Provided storage.
* TODO add Kerberos support
*/
@ProtocolInfo(
protocolName =
"org.apache.hadoop.hdfs.server.aliasmap.AliasMapProtocol",
protocolVersion = 1)
@InterfaceAudience.Private
public interface AliasMapProtocolPB extends
AliasMapProtocolProtos.AliasMapProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.KeyValueProto;
import org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.ReadResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.WriteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.WriteResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*;
import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.*;
/**
* AliasMapProtocolServerSideTranslatorPB is responsible for translating RPC
* calls and forwarding them to the internal InMemoryAliasMap.
*/
public class AliasMapProtocolServerSideTranslatorPB
implements AliasMapProtocolPB {
private final InMemoryAliasMapProtocol aliasMap;
public AliasMapProtocolServerSideTranslatorPB(
InMemoryAliasMapProtocol aliasMap) {
this.aliasMap = aliasMap;
}
private static final WriteResponseProto VOID_WRITE_RESPONSE =
WriteResponseProto.newBuilder().build();
@Override
public WriteResponseProto write(RpcController controller,
WriteRequestProto request) throws ServiceException {
try {
FileRegion toWrite =
PBHelper.convert(request.getKeyValuePair());
aliasMap.write(toWrite.getBlock(), toWrite.getProvidedStorageLocation());
return VOID_WRITE_RESPONSE;
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public ReadResponseProto read(RpcController controller,
ReadRequestProto request) throws ServiceException {
try {
Block toRead = PBHelperClient.convert(request.getKey());
Optional<ProvidedStorageLocation> optionalResult =
aliasMap.read(toRead);
ReadResponseProto.Builder builder = ReadResponseProto.newBuilder();
if (optionalResult.isPresent()) {
ProvidedStorageLocation providedStorageLocation = optionalResult.get();
builder.setValue(PBHelperClient.convert(providedStorageLocation));
}
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public ListResponseProto list(RpcController controller,
ListRequestProto request) throws ServiceException {
try {
BlockProto marker = request.getMarker();
IterationResult iterationResult;
if (marker.isInitialized()) {
iterationResult =
aliasMap.list(Optional.of(PBHelperClient.convert(marker)));
} else {
iterationResult = aliasMap.list(Optional.empty());
}
ListResponseProto.Builder responseBuilder =
ListResponseProto.newBuilder();
List<FileRegion> fileRegions = iterationResult.getFileRegions();
List<KeyValueProto> keyValueProtos = fileRegions.stream()
.map(PBHelper::convert).collect(Collectors.toList());
responseBuilder.addAllFileRegions(keyValueProtos);
Optional<Block> nextMarker = iterationResult.getNextBlock();
nextMarker
.map(m -> responseBuilder.setNextMarker(PBHelperClient.convert(m)));
return responseBuilder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*;
import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.*;
/**
* This class is the client side translator to translate requests made to the
* {@link InMemoryAliasMapProtocol} interface to the RPC server implementing
* {@link AliasMapProtocolPB}.
*/
public class InMemoryAliasMapProtocolClientSideTranslatorPB
implements InMemoryAliasMapProtocol {
private static final Logger LOG =
LoggerFactory
.getLogger(InMemoryAliasMapProtocolClientSideTranslatorPB.class);
private AliasMapProtocolPB rpcProxy;
public InMemoryAliasMapProtocolClientSideTranslatorPB(Configuration conf) {
String addr = conf.getTrimmed(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT);
InetSocketAddress aliasMapAddr = NetUtils.createSocketAddr(addr);
RPC.setProtocolEngine(conf, AliasMapProtocolPB.class,
ProtobufRpcEngine.class);
LOG.info("Connecting to address: " + addr);
try {
rpcProxy = RPC.getProxy(AliasMapProtocolPB.class,
RPC.getProtocolVersion(AliasMapProtocolPB.class), aliasMapAddr, null,
conf, NetUtils.getDefaultSocketFactory(conf), 0);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public InMemoryAliasMap.IterationResult list(Optional<Block> marker)
throws IOException {
ListRequestProto.Builder builder = ListRequestProto.newBuilder();
if (marker.isPresent()) {
builder.setMarker(PBHelperClient.convert(marker.get()));
}
ListRequestProto request = builder.build();
try {
ListResponseProto response = rpcProxy.list(null, request);
List<KeyValueProto> fileRegionsList = response.getFileRegionsList();
List<FileRegion> fileRegions = fileRegionsList
.stream()
.map(kv -> new FileRegion(
PBHelperClient.convert(kv.getKey()),
PBHelperClient.convert(kv.getValue()),
null
))
.collect(Collectors.toList());
BlockProto nextMarker = response.getNextMarker();
if (nextMarker.isInitialized()) {
return new InMemoryAliasMap.IterationResult(fileRegions,
Optional.of(PBHelperClient.convert(nextMarker)));
} else {
return new InMemoryAliasMap.IterationResult(fileRegions,
Optional.empty());
}
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Nonnull
@Override
public Optional<ProvidedStorageLocation> read(@Nonnull Block block)
throws IOException {
ReadRequestProto request =
ReadRequestProto
.newBuilder()
.setKey(PBHelperClient.convert(block))
.build();
try {
ReadResponseProto response = rpcProxy.read(null, request);
ProvidedStorageLocationProto providedStorageLocation =
response.getValue();
if (providedStorageLocation.isInitialized()) {
return Optional.of(PBHelperClient.convert(providedStorageLocation));
}
return Optional.empty();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void write(@Nonnull Block block,
@Nonnull ProvidedStorageLocation providedStorageLocation)
throws IOException {
WriteRequestProto request =
WriteRequestProto
.newBuilder()
.setKeyValuePair(KeyValueProto.newBuilder()
.setKey(PBHelperClient.convert(block))
.setValue(PBHelperClient.convert(providedStorageLocation))
.build())
.build();
try {
rpcProxy.write(null, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
public void stop() {
RPC.stopProxy(rpcProxy);
}
}

View File

@ -36,6 +36,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.KeyValueProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto;
@ -56,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstr
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ProvidedStorageLocationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
@ -80,6 +83,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@ -1096,4 +1100,28 @@ public class PBHelper {
DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION,
blkECReconstructionInfos);
}
public static KeyValueProto convert(FileRegion fileRegion) {
return KeyValueProto
.newBuilder()
.setKey(PBHelperClient.convert(fileRegion.getBlock()))
.setValue(PBHelperClient.convert(
fileRegion.getProvidedStorageLocation()))
.build();
}
public static FileRegion
convert(KeyValueProto keyValueProto) {
BlockProto blockProto =
keyValueProto.getKey();
ProvidedStorageLocationProto providedStorageLocationProto =
keyValueProto.getValue();
Block block =
PBHelperClient.convert(blockProto);
ProvidedStorageLocation providedStorageLocation =
PBHelperClient.convert(providedStorageLocationProto);
return new FileRegion(block, providedStorageLocation, null);
}
}

View File

@ -0,0 +1,213 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.aliasmap;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ProvidedStorageLocationProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
/**
* InMemoryAliasMap is an implementation of the InMemoryAliasMapProtocol for
* use with LevelDB.
*/
public class InMemoryAliasMap implements InMemoryAliasMapProtocol,
Configurable {
private static final Logger LOG = LoggerFactory
.getLogger(InMemoryAliasMap.class);
private final DB levelDb;
private Configuration conf;
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return this.conf;
}
@VisibleForTesting
static String createPathErrorMessage(String directory) {
return new StringBuilder()
.append("Configured directory '")
.append(directory)
.append("' doesn't exist")
.toString();
}
public static @Nonnull InMemoryAliasMap init(Configuration conf)
throws IOException {
Options options = new Options();
options.createIfMissing(true);
String directory =
conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR);
LOG.info("Attempting to load InMemoryAliasMap from \"{}\"", directory);
File path = new File(directory);
if (!path.exists()) {
String error = createPathErrorMessage(directory);
throw new IOException(error);
}
DB levelDb = JniDBFactory.factory.open(path, options);
InMemoryAliasMap aliasMap = new InMemoryAliasMap(levelDb);
aliasMap.setConf(conf);
return aliasMap;
}
@VisibleForTesting
InMemoryAliasMap(DB levelDb) {
this.levelDb = levelDb;
}
@Override
public IterationResult list(Optional<Block> marker) throws IOException {
return withIterator((DBIterator iterator) -> {
Integer batchSize =
conf.getInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE,
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE_DEFAULT);
if (marker.isPresent()) {
iterator.seek(toProtoBufBytes(marker.get()));
} else {
iterator.seekToFirst();
}
int i = 0;
ArrayList<FileRegion> batch =
Lists.newArrayListWithExpectedSize(batchSize);
while (iterator.hasNext() && i < batchSize) {
Map.Entry<byte[], byte[]> entry = iterator.next();
Block block = fromBlockBytes(entry.getKey());
ProvidedStorageLocation providedStorageLocation =
fromProvidedStorageLocationBytes(entry.getValue());
batch.add(new FileRegion(block, providedStorageLocation, null));
++i;
}
if (iterator.hasNext()) {
Block nextMarker = fromBlockBytes(iterator.next().getKey());
return new IterationResult(batch, Optional.of(nextMarker));
} else {
return new IterationResult(batch, Optional.empty());
}
});
}
public @Nonnull Optional<ProvidedStorageLocation> read(@Nonnull Block block)
throws IOException {
byte[] extendedBlockDbFormat = toProtoBufBytes(block);
byte[] providedStorageLocationDbFormat = levelDb.get(extendedBlockDbFormat);
if (providedStorageLocationDbFormat == null) {
return Optional.empty();
} else {
ProvidedStorageLocation providedStorageLocation =
fromProvidedStorageLocationBytes(providedStorageLocationDbFormat);
return Optional.of(providedStorageLocation);
}
}
public void write(@Nonnull Block block,
@Nonnull ProvidedStorageLocation providedStorageLocation)
throws IOException {
byte[] extendedBlockDbFormat = toProtoBufBytes(block);
byte[] providedStorageLocationDbFormat =
toProtoBufBytes(providedStorageLocation);
levelDb.put(extendedBlockDbFormat, providedStorageLocationDbFormat);
}
public void close() throws IOException {
levelDb.close();
}
@Nonnull
public static ProvidedStorageLocation fromProvidedStorageLocationBytes(
@Nonnull byte[] providedStorageLocationDbFormat)
throws InvalidProtocolBufferException {
ProvidedStorageLocationProto providedStorageLocationProto =
ProvidedStorageLocationProto
.parseFrom(providedStorageLocationDbFormat);
return PBHelperClient.convert(providedStorageLocationProto);
}
@Nonnull
public static Block fromBlockBytes(@Nonnull byte[] blockDbFormat)
throws InvalidProtocolBufferException {
BlockProto blockProto = BlockProto.parseFrom(blockDbFormat);
return PBHelperClient.convert(blockProto);
}
public static byte[] toProtoBufBytes(@Nonnull ProvidedStorageLocation
providedStorageLocation) throws IOException {
ProvidedStorageLocationProto providedStorageLocationProto =
PBHelperClient.convert(providedStorageLocation);
ByteArrayOutputStream providedStorageLocationOutputStream =
new ByteArrayOutputStream();
providedStorageLocationProto.writeTo(providedStorageLocationOutputStream);
return providedStorageLocationOutputStream.toByteArray();
}
public static byte[] toProtoBufBytes(@Nonnull Block block)
throws IOException {
BlockProto blockProto =
PBHelperClient.convert(block);
ByteArrayOutputStream blockOutputStream = new ByteArrayOutputStream();
blockProto.writeTo(blockOutputStream);
return blockOutputStream.toByteArray();
}
private IterationResult withIterator(
CheckedFunction<DBIterator, IterationResult> func) throws IOException {
try (DBIterator iterator = levelDb.iterator()) {
return func.apply(iterator);
}
}
/**
* CheckedFunction is akin to {@link java.util.function.Function} but
* specifies an IOException.
* @param <T> Argument type.
* @param <R> Return type.
*/
@FunctionalInterface
public interface CheckedFunction<T, R> {
R apply(T t) throws IOException;
}
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.aliasmap;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
/**
* Protocol used by clients to read/write data about aliases of
* provided blocks for an in-memory implementation of the
* {@link org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap}.
*/
public interface InMemoryAliasMapProtocol {
/**
* The result of a read from the in-memory aliasmap. It contains the
* a list of FileRegions that are returned, along with the next block
* from which the read operation must continue.
*/
class IterationResult {
private final List<FileRegion> batch;
private final Optional<Block> nextMarker;
public IterationResult(List<FileRegion> batch, Optional<Block> nextMarker) {
this.batch = batch;
this.nextMarker = nextMarker;
}
public List<FileRegion> getFileRegions() {
return batch;
}
public Optional<Block> getNextBlock() {
return nextMarker;
}
}
/**
* List the next batch of {@link FileRegion}s in the alias map starting from
* the given {@code marker}. To retrieve all {@link FileRegion}s stored in the
* alias map, multiple calls to this function might be required.
* @param marker the next block to get fileregions from.
* @return the {@link IterationResult} with a set of
* FileRegions and the next marker.
* @throws IOException
*/
InMemoryAliasMap.IterationResult list(Optional<Block> marker)
throws IOException;
/**
* Gets the {@link ProvidedStorageLocation} associated with the
* specified block.
* @param block the block to lookup
* @return the associated {@link ProvidedStorageLocation}.
* @throws IOException
*/
@Nonnull
Optional<ProvidedStorageLocation> read(@Nonnull Block block)
throws IOException;
/**
* Stores the block and it's associated {@link ProvidedStorageLocation}
* in the alias map.
* @param block
* @param providedStorageLocation
* @throws IOException
*/
void write(@Nonnull Block block,
@Nonnull ProvidedStorageLocation providedStorageLocation)
throws IOException;
}

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.aliasmap;
import com.google.protobuf.BlockingService;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.protocolPB.AliasMapProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.AliasMapProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.RPC;
import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*;
import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.CheckedFunction;
/**
* InMemoryLevelDBAliasMapServer is the entry point from the Namenode into
* the {@link InMemoryAliasMap}.
*/
public class InMemoryLevelDBAliasMapServer implements InMemoryAliasMapProtocol,
Configurable, Closeable {
private static final Logger LOG = LoggerFactory
.getLogger(InMemoryLevelDBAliasMapServer.class);
private final CheckedFunction<Configuration, InMemoryAliasMap> initFun;
private RPC.Server aliasMapServer;
private Configuration conf;
private InMemoryAliasMap aliasMap;
public InMemoryLevelDBAliasMapServer(
CheckedFunction<Configuration, InMemoryAliasMap> initFun) {
this.initFun = initFun;
}
public void start() throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
throw new UnsupportedOperationException("Unable to start "
+ "InMemoryLevelDBAliasMapServer as security is enabled");
}
RPC.setProtocolEngine(getConf(), AliasMapProtocolPB.class,
ProtobufRpcEngine.class);
AliasMapProtocolServerSideTranslatorPB aliasMapProtocolXlator =
new AliasMapProtocolServerSideTranslatorPB(this);
BlockingService aliasMapProtocolService =
AliasMapProtocolService
.newReflectiveBlockingService(aliasMapProtocolXlator);
String rpcAddress =
conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT);
String[] split = rpcAddress.split(":");
String bindHost = split[0];
Integer port = Integer.valueOf(split[1]);
aliasMapServer = new RPC.Builder(conf)
.setProtocol(AliasMapProtocolPB.class)
.setInstance(aliasMapProtocolService)
.setBindAddress(bindHost)
.setPort(port)
.setNumHandlers(1)
.setVerbose(true)
.build();
LOG.info("Starting InMemoryLevelDBAliasMapServer on ", rpcAddress);
aliasMapServer.start();
}
@Override
public InMemoryAliasMap.IterationResult list(Optional<Block> marker)
throws IOException {
return aliasMap.list(marker);
}
@Nonnull
@Override
public Optional<ProvidedStorageLocation> read(@Nonnull Block block)
throws IOException {
return aliasMap.read(block);
}
@Override
public void write(@Nonnull Block block,
@Nonnull ProvidedStorageLocation providedStorageLocation)
throws IOException {
aliasMap.write(block, providedStorageLocation);
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
try {
this.aliasMap = initFun.apply(conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void close() {
LOG.info("Stopping InMemoryLevelDBAliasMapServer");
try {
aliasMap.close();
} catch (IOException e) {
LOG.error(e.getMessage());
}
aliasMapServer.stop();
}
}

View File

@ -17,9 +17,11 @@
*/
package org.apache.hadoop.hdfs.server.common;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
/**
* This class is used to represent provided blocks that are file regions,
@ -27,95 +29,70 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
*/
public class FileRegion implements BlockAlias {
private final Path path;
private final long offset;
private final long length;
private final long blockId;
private final Pair<Block, ProvidedStorageLocation> pair;
private final String bpid;
private final long genStamp;
public FileRegion(long blockId, Path path, long offset,
long length, String bpid, long genStamp) {
this.path = path;
this.offset = offset;
this.length = length;
this.blockId = blockId;
this.bpid = bpid;
this.genStamp = genStamp;
this(new Block(blockId, length, genStamp),
new ProvidedStorageLocation(path, offset, length, new byte[0]), bpid);
}
public FileRegion(long blockId, Path path, long offset,
long length, String bpid) {
this(blockId, path, offset, length, bpid,
HdfsConstants.GRANDFATHER_GENERATION_STAMP);
}
public FileRegion(long blockId, Path path, long offset,
long length, long genStamp) {
this(blockId, path, offset, length, null, genStamp);
}
public FileRegion(Block block,
ProvidedStorageLocation providedStorageLocation) {
this.pair = Pair.of(block, providedStorageLocation);
this.bpid = null;
}
public FileRegion(Block block,
ProvidedStorageLocation providedStorageLocation, String bpid) {
this.pair = Pair.of(block, providedStorageLocation);
this.bpid = bpid;
}
public FileRegion(long blockId, Path path, long offset, long length) {
this(blockId, path, offset, length, null);
}
@Override
public Block getBlock() {
return new Block(blockId, length, genStamp);
return pair.getKey();
}
@Override
public boolean equals(Object other) {
if (!(other instanceof FileRegion)) {
return false;
}
FileRegion o = (FileRegion) other;
return blockId == o.blockId
&& offset == o.offset
&& length == o.length
&& genStamp == o.genStamp
&& path.equals(o.path);
}
@Override
public int hashCode() {
return (int)(blockId & Integer.MIN_VALUE);
}
public Path getPath() {
return path;
}
public long getOffset() {
return offset;
}
public long getLength() {
return length;
}
public long getGenerationStamp() {
return genStamp;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{ block=\"").append(getBlock()).append("\"");
sb.append(", path=\"").append(getPath()).append("\"");
sb.append(", off=\"").append(getOffset()).append("\"");
sb.append(", len=\"").append(getBlock().getNumBytes()).append("\"");
sb.append(", genStamp=\"").append(getBlock()
.getGenerationStamp()).append("\"");
sb.append(", bpid=\"").append(bpid).append("\"");
sb.append(" }");
return sb.toString();
public ProvidedStorageLocation getProvidedStorageLocation() {
return pair.getValue();
}
public String getBlockPoolId() {
return this.bpid;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FileRegion that = (FileRegion) o;
return pair.equals(that.pair);
}
@Override
public int hashCode() {
return pair.hashCode();
}
}

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.server.common.blockaliasmap;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.BlockAlias;
@ -28,6 +30,19 @@ import org.apache.hadoop.hdfs.server.common.BlockAlias;
*/
public abstract class BlockAliasMap<T extends BlockAlias> {
/**
* ImmutableIterator is an Iterator that does not support the remove
* operation. This could inherit {@link java.util.Enumeration} but Iterator
* is supported by more APIs and Enumeration's javadoc even suggests using
* Iterator instead.
*/
public abstract class ImmutableIterator implements Iterator<T> {
public void remove() {
throw new UnsupportedOperationException(
"Remove is not supported for provided storage");
}
}
/**
* An abstract class that is used to read {@link BlockAlias}es
* for provided blocks.
@ -45,7 +60,7 @@ public abstract class BlockAliasMap<T extends BlockAlias> {
* @return BlockAlias correspoding to the provided block.
* @throws IOException
*/
public abstract U resolve(Block ident) throws IOException;
public abstract Optional<U> resolve(Block ident) throws IOException;
}
@ -85,4 +100,6 @@ public abstract class BlockAliasMap<T extends BlockAlias> {
*/
public abstract void refresh() throws IOException;
public abstract void close() throws IOException;
}

View File

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.protocolPB.InMemoryAliasMapProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
/**
* InMemoryLevelDBAliasMapClient is the client for the InMemoryAliasMapServer.
* This is used by the Datanode and fs2img to store and retrieve FileRegions
* based on the given Block.
*/
public class InMemoryLevelDBAliasMapClient extends BlockAliasMap<FileRegion>
implements Configurable {
private Configuration conf;
private InMemoryAliasMapProtocolClientSideTranslatorPB aliasMap;
@Override
public void close() {
aliasMap.stop();
}
class LevelDbReader extends BlockAliasMap.Reader<FileRegion> {
@Override
public Optional<FileRegion> resolve(Block block) throws IOException {
Optional<ProvidedStorageLocation> read = aliasMap.read(block);
return read.map(psl -> new FileRegion(block, psl, null));
}
@Override
public void close() throws IOException {
}
private class LevelDbIterator
extends BlockAliasMap<FileRegion>.ImmutableIterator {
private Iterator<FileRegion> iterator;
private Optional<Block> nextMarker;
LevelDbIterator() {
batch(Optional.empty());
}
private void batch(Optional<Block> newNextMarker) {
try {
InMemoryAliasMap.IterationResult iterationResult =
aliasMap.list(newNextMarker);
List<FileRegion> fileRegions = iterationResult.getFileRegions();
this.iterator = fileRegions.iterator();
this.nextMarker = iterationResult.getNextBlock();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean hasNext() {
return iterator.hasNext() || nextMarker.isPresent();
}
@Override
public FileRegion next() {
if (iterator.hasNext()) {
return iterator.next();
} else {
if (nextMarker.isPresent()) {
batch(nextMarker);
return next();
} else {
throw new NoSuchElementException();
}
}
}
}
@Override
public Iterator<FileRegion> iterator() {
return new LevelDbIterator();
}
}
class LevelDbWriter extends BlockAliasMap.Writer<FileRegion> {
@Override
public void store(FileRegion fileRegion) throws IOException {
aliasMap.write(fileRegion.getBlock(),
fileRegion.getProvidedStorageLocation());
}
@Override
public void close() throws IOException {
}
}
InMemoryLevelDBAliasMapClient() {
if (UserGroupInformation.isSecurityEnabled()) {
throw new UnsupportedOperationException("Unable to start "
+ "InMemoryLevelDBAliasMapClient as security is enabled");
}
}
@Override
public Reader<FileRegion> getReader(Reader.Options opts) throws IOException {
return new LevelDbReader();
}
@Override
public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException {
return new LevelDbWriter();
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.aliasMap = new InMemoryAliasMapProtocolClientSideTranslatorPB(conf);
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void refresh() throws IOException {
}
}

View File

@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.NoSuchElementException;
import java.util.Optional;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
@ -40,6 +41,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.io.MultipleIOException;
@ -160,7 +162,7 @@ public class TextFileRegionAliasMap
file = new Path(tmpfile);
delim = conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER,
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT);
LOG.info("TextFileRegionAliasMap: read path " + tmpfile.toString());
LOG.info("TextFileRegionAliasMap: read path {}", tmpfile);
}
@Override
@ -190,7 +192,7 @@ public class TextFileRegionAliasMap
private Configuration conf;
private String codec = null;
private Path file =
new Path(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT);;
new Path(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT);
private String delim =
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT;
@ -252,7 +254,7 @@ public class TextFileRegionAliasMap
Options delimiter(String delim);
}
static ReaderOptions defaults() {
public static ReaderOptions defaults() {
return new ReaderOptions();
}
@ -278,14 +280,14 @@ public class TextFileRegionAliasMap
}
@Override
public FileRegion resolve(Block ident) throws IOException {
public Optional<FileRegion> resolve(Block ident) throws IOException {
// consider layering index w/ composable format
Iterator<FileRegion> i = iterator();
try {
while (i.hasNext()) {
FileRegion f = i.next();
if (f.getBlock().equals(ident)) {
return f;
return Optional.of(f);
}
}
} finally {
@ -295,7 +297,7 @@ public class TextFileRegionAliasMap
r.close();
}
}
return null;
return Optional.empty();
}
class FRIterator implements Iterator<FileRegion> {
@ -342,8 +344,8 @@ public class TextFileRegionAliasMap
throw new IOException("Invalid line: " + line);
}
return new FileRegion(Long.parseLong(f[0]), new Path(f[1]),
Long.parseLong(f[2]), Long.parseLong(f[3]), f[5],
Long.parseLong(f[4]));
Long.parseLong(f[2]), Long.parseLong(f[3]), f[4],
Long.parseLong(f[5]));
}
public InputStream createStream() throws IOException {
@ -390,7 +392,6 @@ public class TextFileRegionAliasMap
throw MultipleIOException.createIOException(ex);
}
}
}
/**
@ -422,12 +423,16 @@ public class TextFileRegionAliasMap
@Override
public void store(FileRegion token) throws IOException {
out.append(String.valueOf(token.getBlock().getBlockId())).append(delim);
out.append(token.getPath().toString()).append(delim);
out.append(Long.toString(token.getOffset())).append(delim);
out.append(Long.toString(token.getLength())).append(delim);
out.append(Long.toString(token.getGenerationStamp())).append(delim);
out.append(token.getBlockPoolId()).append("\n");
final Block block = token.getBlock();
final ProvidedStorageLocation psl = token.getProvidedStorageLocation();
out.append(String.valueOf(block.getBlockId())).append(delim);
out.append(psl.getPath().toString()).append(delim);
out.append(Long.toString(psl.getOffset())).append(delim);
out.append(Long.toString(psl.getLength())).append(delim);
out.append(token.getBlockPoolId()).append(delim);
out.append(Long.toString(block.getGenerationStamp())).append(delim);
out.append("\n");
}
@Override
@ -443,4 +448,9 @@ public class TextFileRegionAliasMap
"Refresh not supported by " + getClass());
}
@Override
public void close() throws IOException {
//nothing to do;
}
}

View File

@ -22,6 +22,7 @@ import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
@ -38,6 +39,16 @@ public class FinalizedProvidedReplica extends ProvidedReplica {
remoteFS);
}
public FinalizedProvidedReplica(FileRegion fileRegion, FsVolumeSpi volume,
Configuration conf, FileSystem remoteFS) {
super(fileRegion.getBlock().getBlockId(),
fileRegion.getProvidedStorageLocation().getPath().toUri(),
fileRegion.getProvidedStorageLocation().getOffset(),
fileRegion.getBlock().getNumBytes(),
fileRegion.getBlock().getGenerationStamp(),
volume, conf, remoteFS);
}
public FinalizedProvidedReplica(long blockId, Path pathPrefix,
String pathSuffix, long fileOffset, long blockLen, long genStamp,
FsVolumeSpi volume, Configuration conf, FileSystem remoteFS) {

View File

@ -315,12 +315,7 @@ public class ReplicaBuilder {
offset, length, genStamp, volume, conf, remoteFS);
}
} else {
info = new FinalizedProvidedReplica(fileRegion.getBlock().getBlockId(),
fileRegion.getPath().toUri(),
fileRegion.getOffset(),
fileRegion.getBlock().getNumBytes(),
fileRegion.getBlock().getGenerationStamp(),
volume, conf, remoteFS);
info = new FinalizedProvidedReplica(fileRegion, volume, conf, remoteFS);
}
return info;
}

View File

@ -148,7 +148,7 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
this.aliasMap = blockAliasMap;
}
public void getVolumeMap(ReplicaMap volumeMap,
void fetchVolumeMap(ReplicaMap volumeMap,
RamDiskReplicaTracker ramDiskReplicaMap, FileSystem remoteFS)
throws IOException {
BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null);
@ -157,21 +157,19 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
+ "; no blocks will be populated");
return;
}
Iterator<FileRegion> iter = reader.iterator();
Path blockPrefixPath = new Path(providedVolume.getBaseURI());
while (iter.hasNext()) {
FileRegion region = iter.next();
for (FileRegion region : reader) {
if (region.getBlockPoolId() != null
&& region.getBlockPoolId().equals(bpid)
&& containsBlock(providedVolume.baseURI,
region.getPath().toUri())) {
String blockSuffix =
getSuffix(blockPrefixPath, new Path(region.getPath().toUri()));
region.getProvidedStorageLocation().getPath().toUri())) {
String blockSuffix = getSuffix(blockPrefixPath,
new Path(region.getProvidedStorageLocation().getPath().toUri()));
ReplicaInfo newReplica = new ReplicaBuilder(ReplicaState.FINALIZED)
.setBlockId(region.getBlock().getBlockId())
.setPathPrefix(blockPrefixPath)
.setPathSuffix(blockSuffix)
.setOffset(region.getOffset())
.setOffset(region.getProvidedStorageLocation().getOffset())
.setLength(region.getBlock().getNumBytes())
.setGenerationStamp(region.getBlock().getGenerationStamp())
.setFsVolume(providedVolume)
@ -216,18 +214,12 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
*/
aliasMap.refresh();
BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null);
if (reader == null) {
LOG.warn("Got null reader from BlockAliasMap " + aliasMap
+ "; no blocks will be populated in scan report");
return;
}
Iterator<FileRegion> iter = reader.iterator();
while(iter.hasNext()) {
for (FileRegion region : reader) {
reportCompiler.throttle();
FileRegion region = iter.next();
if (region.getBlockPoolId().equals(bpid)) {
report.add(new ScanInfo(region.getBlock().getBlockId(),
providedVolume, region, region.getLength()));
providedVolume, region,
region.getProvidedStorageLocation().getLength()));
}
}
}
@ -522,7 +514,7 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
throws IOException {
LOG.info("Creating volumemap for provided volume " + this);
for(ProvidedBlockPoolSlice s : bpSlices.values()) {
s.getVolumeMap(volumeMap, ramDiskReplicaMap, remoteFS);
s.fetchVolumeMap(volumeMap, ramDiskReplicaMap, remoteFS);
}
}
@ -539,7 +531,7 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
void getVolumeMap(String bpid, ReplicaMap volumeMap,
final RamDiskReplicaTracker ramDiskReplicaMap)
throws IOException {
getProvidedBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap,
getProvidedBlockPoolSlice(bpid).fetchVolumeMap(volumeMap, ramDiskReplicaMap,
remoteFS);
}
@ -689,6 +681,12 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
return !volumeURI.relativize(blockURI).equals(blockURI);
}
@VisibleForTesting
BlockAliasMap<FileRegion> getFileRegionProvider(String bpid) throws
IOException {
return getProvidedBlockPoolSlice(bpid).getBlockAliasMap();
}
@VisibleForTesting
void setFileRegionProvider(String bpid,
BlockAliasMap<FileRegion> blockAliasMap) throws IOException {

View File

@ -45,6 +45,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
@ -208,6 +210,8 @@ public class NameNode extends ReconfigurableBase implements
HdfsConfiguration.init();
}
private InMemoryLevelDBAliasMapServer levelDBAliasMapServer;
/**
* Categories of operations supported by the namenode.
*/
@ -745,6 +749,20 @@ public class NameNode extends ReconfigurableBase implements
startCommonServices(conf);
startMetricsLogger(conf);
startAliasMapServerIfNecessary(conf);
}
private void startAliasMapServerIfNecessary(Configuration conf)
throws IOException {
if (conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED,
DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT)
&& conf.getBoolean(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED,
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED_DEFAULT)) {
levelDBAliasMapServer =
new InMemoryLevelDBAliasMapServer(InMemoryAliasMap::init);
levelDBAliasMapServer.setConf(conf);
levelDBAliasMapServer.start();
}
}
private void initReconfigurableBackoffKey() {
@ -1027,6 +1045,9 @@ public class NameNode extends ReconfigurableBase implements
MBeans.unregister(nameNodeStatusBeanName);
nameNodeStatusBeanName = null;
}
if (levelDBAliasMapServer != null) {
levelDBAliasMapServer.close();
}
}
tracer.close();
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.hdfs.protocol.proto";
option java_outer_classname = "AliasMapProtocolProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.hdfs;
import "hdfs.proto";
message KeyValueProto {
optional BlockProto key = 1;
optional ProvidedStorageLocationProto value = 2;
}
message WriteRequestProto {
required KeyValueProto keyValuePair = 1;
}
message WriteResponseProto {
}
message ReadRequestProto {
required BlockProto key = 1;
}
message ReadResponseProto {
optional ProvidedStorageLocationProto value = 1;
}
message ListRequestProto {
optional BlockProto marker = 1;
}
message ListResponseProto {
repeated KeyValueProto fileRegions = 1;
optional BlockProto nextMarker = 2;
}
service AliasMapProtocolService {
rpc write(WriteRequestProto) returns(WriteResponseProto);
rpc read(ReadRequestProto) returns(ReadResponseProto);
rpc list(ListRequestProto) returns(ListResponseProto);
}

View File

@ -4652,6 +4652,40 @@
</description>
</property>
<property>
<name>dfs.provided.aliasmap.inmemory.batch-size</name>
<value>500</value>
<description>
The batch size when iterating over the database backing the aliasmap
</description>
</property>
<property>
<name>dfs.provided.aliasmap.inmemory.dnrpc-address</name>
<value>0.0.0.0:50200</value>
<description>
The address where the aliasmap server will be running
</description>
</property>
<property>
<name>dfs.provided.aliasmap.inmemory.leveldb.dir</name>
<value>/tmp</value>
<description>
The directory where the leveldb files will be kept
</description>
</property>
<property>
<name>dfs.provided.aliasmap.inmemory.enabled</name>
<value>false</value>
<description>
Don't use the aliasmap by default. Some tests will fail
because they try to start the namenode twice with the
same parameters if you turn it on.
</description>
</property>
<property>
<name>dfs.provided.aliasmap.text.delimiter</name>
<value>,</value>

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.aliasmap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Optional;
/**
* ITestInMemoryAliasMap is an integration test that writes and reads to
* an AliasMap. This is an integration test because it can't be run in parallel
* like normal unit tests since there is conflict over the port being in use.
*/
public class ITestInMemoryAliasMap {
private InMemoryAliasMap aliasMap;
private File tempDirectory;
@Before
public void setUp() throws Exception {
Configuration conf = new Configuration();
tempDirectory = Files.createTempDirectory("seagull").toFile();
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
tempDirectory.getAbsolutePath());
aliasMap = InMemoryAliasMap.init(conf);
}
@After
public void tearDown() throws Exception {
aliasMap.close();
FileUtils.deleteDirectory(tempDirectory);
}
@Test
public void readNotFoundReturnsNothing() throws IOException {
Block block = new Block(42, 43, 44);
Optional<ProvidedStorageLocation> actualProvidedStorageLocationOpt
= aliasMap.read(block);
assertFalse(actualProvidedStorageLocationOpt.isPresent());
}
@Test
public void readWrite() throws Exception {
Block block = new Block(42, 43, 44);
Path path = new Path("eagle", "mouse");
long offset = 47;
long length = 48;
int nonceSize = 4;
byte[] nonce = new byte[nonceSize];
Arrays.fill(nonce, 0, (nonceSize - 1), Byte.parseByte("0011", 2));
ProvidedStorageLocation expectedProvidedStorageLocation =
new ProvidedStorageLocation(path, offset, length, nonce);
aliasMap.write(block, expectedProvidedStorageLocation);
Optional<ProvidedStorageLocation> actualProvidedStorageLocationOpt
= aliasMap.read(block);
assertTrue(actualProvidedStorageLocationOpt.isPresent());
assertEquals(expectedProvidedStorageLocation,
actualProvidedStorageLocationOpt.get());
}
@Test
public void list() throws IOException {
Block block1 = new Block(42, 43, 44);
Block block2 = new Block(43, 44, 45);
Block block3 = new Block(44, 45, 46);
Path path = new Path("eagle", "mouse");
int nonceSize = 4;
byte[] nonce = new byte[nonceSize];
Arrays.fill(nonce, 0, (nonceSize - 1), Byte.parseByte("0011", 2));
ProvidedStorageLocation expectedProvidedStorageLocation1 =
new ProvidedStorageLocation(path, 47, 48, nonce);
ProvidedStorageLocation expectedProvidedStorageLocation2 =
new ProvidedStorageLocation(path, 48, 49, nonce);
ProvidedStorageLocation expectedProvidedStorageLocation3 =
new ProvidedStorageLocation(path, 49, 50, nonce);
aliasMap.write(block1, expectedProvidedStorageLocation1);
aliasMap.write(block2, expectedProvidedStorageLocation2);
aliasMap.write(block3, expectedProvidedStorageLocation3);
InMemoryAliasMap.IterationResult list = aliasMap.list(Optional.empty());
// we should have 3 results
assertEquals(3, list.getFileRegions().size());
// no more results expected
assertFalse(list.getNextBlock().isPresent());
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.aliasmap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.junit.Test;
import java.io.IOException;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
/**
* TestInMemoryAliasMap tests the initialization of an AliasMap. Most of the
* rest of the tests are in ITestInMemoryAliasMap since the tests are not
* thread safe (there is competition for the port).
*/
public class TestInMemoryAliasMap {
@Test
public void testInit() {
String nonExistingDirectory = "non-existing-directory";
Configuration conf = new Configuration();
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
nonExistingDirectory);
assertThatExceptionOfType(IOException.class)
.isThrownBy(() -> InMemoryAliasMap.init(conf)).withMessage(
InMemoryAliasMap.createPathErrorMessage(nonExistingDirectory));
}
}

View File

@ -28,7 +28,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.util.RwLock;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.assertNotNull;

View File

@ -0,0 +1,341 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
/**
* Tests the {@link InMemoryLevelDBAliasMapClient}.
*/
public class TestInMemoryLevelDBAliasMapClient {
private InMemoryLevelDBAliasMapServer levelDBAliasMapServer;
private InMemoryLevelDBAliasMapClient inMemoryLevelDBAliasMapClient;
private File tempDir;
private Configuration conf;
@Before
public void setUp() throws IOException {
levelDBAliasMapServer =
new InMemoryLevelDBAliasMapServer(InMemoryAliasMap::init);
conf = new Configuration();
int port = 9876;
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
"localhost:" + port);
tempDir = Files.createTempDir();
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
tempDir.getAbsolutePath());
inMemoryLevelDBAliasMapClient = new InMemoryLevelDBAliasMapClient();
}
@After
public void tearDown() throws IOException {
levelDBAliasMapServer.close();
inMemoryLevelDBAliasMapClient.close();
FileUtils.deleteDirectory(tempDir);
}
@Test
public void writeRead() throws Exception {
inMemoryLevelDBAliasMapClient.setConf(conf);
levelDBAliasMapServer.setConf(conf);
levelDBAliasMapServer.start();
Block block = new Block(42, 43, 44);
byte[] nonce = "blackbird".getBytes();
ProvidedStorageLocation providedStorageLocation
= new ProvidedStorageLocation(new Path("cuckoo"),
45, 46, nonce);
BlockAliasMap.Writer<FileRegion> writer =
inMemoryLevelDBAliasMapClient.getWriter(null);
writer.store(new FileRegion(block, providedStorageLocation));
BlockAliasMap.Reader<FileRegion> reader =
inMemoryLevelDBAliasMapClient.getReader(null);
Optional<FileRegion> fileRegion = reader.resolve(block);
assertEquals(new FileRegion(block, providedStorageLocation),
fileRegion.get());
}
@Test
public void iterateSingleBatch() throws Exception {
inMemoryLevelDBAliasMapClient.setConf(conf);
levelDBAliasMapServer.setConf(conf);
levelDBAliasMapServer.start();
Block block1 = new Block(42, 43, 44);
Block block2 = new Block(43, 44, 45);
byte[] nonce1 = "blackbird".getBytes();
byte[] nonce2 = "cuckoo".getBytes();
ProvidedStorageLocation providedStorageLocation1 =
new ProvidedStorageLocation(new Path("eagle"),
46, 47, nonce1);
ProvidedStorageLocation providedStorageLocation2 =
new ProvidedStorageLocation(new Path("falcon"),
46, 47, nonce2);
BlockAliasMap.Writer<FileRegion> writer1 =
inMemoryLevelDBAliasMapClient.getWriter(null);
writer1.store(new FileRegion(block1, providedStorageLocation1));
BlockAliasMap.Writer<FileRegion> writer2 =
inMemoryLevelDBAliasMapClient.getWriter(null);
writer2.store(new FileRegion(block2, providedStorageLocation2));
BlockAliasMap.Reader<FileRegion> reader =
inMemoryLevelDBAliasMapClient.getReader(null);
List<FileRegion> actualFileRegions =
Lists.newArrayListWithCapacity(2);
for (FileRegion fileRegion : reader) {
actualFileRegions.add(fileRegion);
}
assertArrayEquals(
new FileRegion[] {new FileRegion(block1, providedStorageLocation1),
new FileRegion(block2, providedStorageLocation2)},
actualFileRegions.toArray());
}
@Test
public void iterateThreeBatches() throws Exception {
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE, "2");
levelDBAliasMapServer.setConf(conf);
inMemoryLevelDBAliasMapClient.setConf(conf);
levelDBAliasMapServer.start();
Block block1 = new Block(42, 43, 44);
Block block2 = new Block(43, 44, 45);
Block block3 = new Block(44, 45, 46);
Block block4 = new Block(47, 48, 49);
Block block5 = new Block(50, 51, 52);
Block block6 = new Block(53, 54, 55);
byte[] nonce1 = "blackbird".getBytes();
byte[] nonce2 = "cuckoo".getBytes();
byte[] nonce3 = "sparrow".getBytes();
byte[] nonce4 = "magpie".getBytes();
byte[] nonce5 = "seagull".getBytes();
byte[] nonce6 = "finch".getBytes();
ProvidedStorageLocation providedStorageLocation1 =
new ProvidedStorageLocation(new Path("eagle"),
46, 47, nonce1);
ProvidedStorageLocation providedStorageLocation2 =
new ProvidedStorageLocation(new Path("falcon"),
48, 49, nonce2);
ProvidedStorageLocation providedStorageLocation3 =
new ProvidedStorageLocation(new Path("robin"),
50, 51, nonce3);
ProvidedStorageLocation providedStorageLocation4 =
new ProvidedStorageLocation(new Path("parakeet"),
52, 53, nonce4);
ProvidedStorageLocation providedStorageLocation5 =
new ProvidedStorageLocation(new Path("heron"),
54, 55, nonce5);
ProvidedStorageLocation providedStorageLocation6 =
new ProvidedStorageLocation(new Path("duck"),
56, 57, nonce6);
inMemoryLevelDBAliasMapClient
.getWriter(null)
.store(new FileRegion(block1, providedStorageLocation1));
inMemoryLevelDBAliasMapClient
.getWriter(null)
.store(new FileRegion(block2, providedStorageLocation2));
inMemoryLevelDBAliasMapClient
.getWriter(null)
.store(new FileRegion(block3, providedStorageLocation3));
inMemoryLevelDBAliasMapClient
.getWriter(null)
.store(new FileRegion(block4, providedStorageLocation4));
inMemoryLevelDBAliasMapClient
.getWriter(null)
.store(new FileRegion(block5, providedStorageLocation5));
inMemoryLevelDBAliasMapClient
.getWriter(null)
.store(new FileRegion(block6, providedStorageLocation6));
BlockAliasMap.Reader<FileRegion> reader =
inMemoryLevelDBAliasMapClient.getReader(null);
List<FileRegion> actualFileRegions =
Lists.newArrayListWithCapacity(6);
for (FileRegion fileRegion : reader) {
actualFileRegions.add(fileRegion);
}
FileRegion[] expectedFileRegions =
new FileRegion[] {new FileRegion(block1, providedStorageLocation1),
new FileRegion(block2, providedStorageLocation2),
new FileRegion(block3, providedStorageLocation3),
new FileRegion(block4, providedStorageLocation4),
new FileRegion(block5, providedStorageLocation5),
new FileRegion(block6, providedStorageLocation6)};
assertArrayEquals(expectedFileRegions, actualFileRegions.toArray());
}
class ReadThread implements Runnable {
private final Block block;
private final BlockAliasMap.Reader<FileRegion> reader;
private int delay;
private Optional<FileRegion> fileRegionOpt;
ReadThread(Block block, BlockAliasMap.Reader<FileRegion> reader,
int delay) {
this.block = block;
this.reader = reader;
this.delay = delay;
}
public Optional<FileRegion> getFileRegion() {
return fileRegionOpt;
}
@Override
public void run() {
try {
Thread.sleep(delay);
fileRegionOpt = reader.resolve(block);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
class WriteThread implements Runnable {
private final Block block;
private final BlockAliasMap.Writer<FileRegion> writer;
private final ProvidedStorageLocation providedStorageLocation;
private int delay;
WriteThread(Block block, ProvidedStorageLocation providedStorageLocation,
BlockAliasMap.Writer<FileRegion> writer, int delay) {
this.block = block;
this.writer = writer;
this.providedStorageLocation = providedStorageLocation;
this.delay = delay;
}
@Override
public void run() {
try {
Thread.sleep(delay);
writer.store(new FileRegion(block, providedStorageLocation));
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public FileRegion generateRandomFileRegion(int seed) {
Block block = new Block(seed, seed + 1, seed + 2);
Path path = new Path("koekoek");
byte[] nonce = new byte[0];
ProvidedStorageLocation providedStorageLocation =
new ProvidedStorageLocation(path, seed + 3, seed + 4, nonce);
return new FileRegion(block, providedStorageLocation);
}
@Test
public void multipleReads() throws IOException {
inMemoryLevelDBAliasMapClient.setConf(conf);
levelDBAliasMapServer.setConf(conf);
levelDBAliasMapServer.start();
Random r = new Random();
List<FileRegion> expectedFileRegions = r.ints(0, 200)
.limit(50)
.boxed()
.map(i -> generateRandomFileRegion(i))
.collect(Collectors.toList());
BlockAliasMap.Reader<FileRegion> reader =
inMemoryLevelDBAliasMapClient.getReader(null);
BlockAliasMap.Writer<FileRegion> writer =
inMemoryLevelDBAliasMapClient.getWriter(null);
ExecutorService executor = Executors.newCachedThreadPool();
List<ReadThread> readThreads = expectedFileRegions
.stream()
.map(fileRegion -> new ReadThread(fileRegion.getBlock(),
reader,
4000))
.collect(Collectors.toList());
List<? extends Future<?>> readFutures =
readThreads.stream()
.map(readThread -> executor.submit(readThread))
.collect(Collectors.toList());
List<? extends Future<?>> writeFutures = expectedFileRegions
.stream()
.map(fileRegion -> new WriteThread(fileRegion.getBlock(),
fileRegion.getProvidedStorageLocation(),
writer,
1000))
.map(writeThread -> executor.submit(writeThread))
.collect(Collectors.toList());
readFutures.stream()
.map(readFuture -> {
try {
return readFuture.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());
List<FileRegion> actualFileRegions = readThreads.stream()
.map(readThread -> readThread.getFileRegion().get())
.collect(Collectors.toList());
assertThat(actualFileRegions).containsExactlyInAnyOrder(
expectedFileRegions.toArray(new FileRegion[0]));
}
}

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.iq80.leveldb.DBException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
/**
* Tests the in-memory alias map with a mock level-db implementation.
*/
public class TestLevelDbMockAliasMapClient {
private InMemoryLevelDBAliasMapServer levelDBAliasMapServer;
private InMemoryLevelDBAliasMapClient inMemoryLevelDBAliasMapClient;
private File tempDir;
private Configuration conf;
private InMemoryAliasMap aliasMapMock;
@Before
public void setUp() throws IOException {
aliasMapMock = mock(InMemoryAliasMap.class);
levelDBAliasMapServer = new InMemoryLevelDBAliasMapServer(
config -> aliasMapMock);
conf = new Configuration();
int port = 9877;
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
"localhost:" + port);
tempDir = Files.createTempDir();
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
tempDir.getAbsolutePath());
inMemoryLevelDBAliasMapClient = new InMemoryLevelDBAliasMapClient();
inMemoryLevelDBAliasMapClient.setConf(conf);
levelDBAliasMapServer.setConf(conf);
levelDBAliasMapServer.start();
}
@After
public void tearDown() throws IOException {
levelDBAliasMapServer.close();
inMemoryLevelDBAliasMapClient.close();
FileUtils.deleteDirectory(tempDir);
}
@Test
public void readFailure() throws Exception {
Block block = new Block(42, 43, 44);
doThrow(new IOException())
.doThrow(new DBException())
.when(aliasMapMock)
.read(block);
assertThatExceptionOfType(IOException.class)
.isThrownBy(() ->
inMemoryLevelDBAliasMapClient.getReader(null).resolve(block));
assertThatExceptionOfType(IOException.class)
.isThrownBy(() ->
inMemoryLevelDBAliasMapClient.getReader(null).resolve(block));
}
@Test
public void writeFailure() throws IOException {
Block block = new Block(42, 43, 44);
byte[] nonce = new byte[0];
Path path = new Path("koekoek");
ProvidedStorageLocation providedStorageLocation =
new ProvidedStorageLocation(path, 45, 46, nonce);
doThrow(new IOException())
.when(aliasMapMock)
.write(block, providedStorageLocation);
assertThatExceptionOfType(IOException.class)
.isThrownBy(() ->
inMemoryLevelDBAliasMapClient.getWriter(null)
.store(new FileRegion(block, providedStorageLocation)));
assertThatExceptionOfType(IOException.class)
.isThrownBy(() ->
inMemoryLevelDBAliasMapClient.getWriter(null)
.store(new FileRegion(block, providedStorageLocation)));
}
}

View File

@ -43,6 +43,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.io.FileUtils;
@ -214,7 +215,8 @@ public class TestProvidedImpl {
}
@Override
public FileRegion resolve(Block ident) throws IOException {
public Optional<FileRegion> resolve(Block ident)
throws IOException {
return null;
}
};
@ -232,6 +234,11 @@ public class TestProvidedImpl {
public void refresh() throws IOException {
// do nothing!
}
@Override
public void close() throws IOException {
// do nothing
}
}
private static Storage.StorageDirectory createLocalStorageDirectory(

View File

@ -1336,7 +1336,6 @@
<artifactId>mssql-jdbc</artifactId>
<version>${mssql.version}</version>
</dependency>
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>
@ -1352,7 +1351,12 @@
<artifactId>snakeyaml</artifactId>
<version>${snakeyaml.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.8.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -66,6 +66,12 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.8.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.FileRegion;
@ -57,14 +58,14 @@ public class NullBlockAliasMap extends BlockAliasMap<FileRegion> {
}
@Override
public FileRegion resolve(Block ident) throws IOException {
public Optional<FileRegion> resolve(Block ident) throws IOException {
throw new UnsupportedOperationException();
}
};
}
@Override
public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException {
public Writer getWriter(Writer.Options opts) throws IOException {
return new Writer<FileRegion>() {
@Override
public void store(FileRegion token) throws IOException {
@ -83,4 +84,8 @@ public class NullBlockAliasMap extends BlockAliasMap<FileRegion> {
// do nothing
}
@Override
public void close() throws IOException {
}
}

View File

@ -27,11 +27,13 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@ -39,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
@ -48,6 +51,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
@ -56,6 +61,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.ProvidedStorageMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.InMemoryLevelDBAliasMapClient;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@ -172,16 +178,16 @@ public class TestNameNodeProvidedImplementation {
void createImage(TreeWalk t, Path out,
Class<? extends BlockResolver> blockIdsClass) throws Exception {
createImage(t, out, blockIdsClass, "");
createImage(t, out, blockIdsClass, "", TextFileRegionAliasMap.class);
}
void createImage(TreeWalk t, Path out,
Class<? extends BlockResolver> blockIdsClass, String clusterID)
throws Exception {
Class<? extends BlockResolver> blockIdsClass, String clusterID,
Class<? extends BlockAliasMap> aliasMapClass) throws Exception {
ImageWriter.Options opts = ImageWriter.defaults();
opts.setConf(conf);
opts.output(out.toString())
.blocks(TextFileRegionAliasMap.class)
.blocks(aliasMapClass)
.blockIds(blockIdsClass)
.clusterID(clusterID);
try (ImageWriter w = new ImageWriter(opts)) {
@ -389,17 +395,8 @@ public class TestNameNodeProvidedImplementation {
return ret;
}
@Test(timeout=30000)
public void testBlockRead() throws Exception {
conf.setClass(ImageWriter.Options.UGI_CLASS,
FsUGIResolver.class, UGIResolver.class);
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
FixedBlockResolver.class);
startCluster(NNDIRPATH, 3,
new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, null,
false);
private void verifyFileSystemContents() throws Exception {
FileSystem fs = cluster.getFileSystem();
Thread.sleep(2000);
int count = 0;
// read NN metadata, verify contents match
for (TreePath e : new FSTreeWalk(NAMEPATH, conf)) {
@ -683,7 +680,7 @@ public class TestNameNodeProvidedImplementation {
public void testSetClusterID() throws Exception {
String clusterID = "PROVIDED-CLUSTER";
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
FixedBlockResolver.class, clusterID);
FixedBlockResolver.class, clusterID, TextFileRegionAliasMap.class);
// 2 Datanodes, 1 PROVIDED and other DISK
startCluster(NNDIRPATH, 2, null,
new StorageType[][] {
@ -744,4 +741,42 @@ public class TestNameNodeProvidedImplementation {
verifyFileLocation(i, expectedLocations);
}
}
// This test will fail until there is a refactoring of the FileRegion
// (HDFS-12713).
@Test(expected=BlockMissingException.class)
public void testInMemoryAliasMap() throws Exception {
conf.setClass(ImageWriter.Options.UGI_CLASS,
FsUGIResolver.class, UGIResolver.class);
conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
InMemoryLevelDBAliasMapClient.class, BlockAliasMap.class);
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
"localhost:32445");
File tempDirectory =
Files.createTempDirectory("in-memory-alias-map").toFile();
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
tempDirectory.getAbsolutePath());
conf.setBoolean(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
InMemoryLevelDBAliasMapServer levelDBAliasMapServer =
new InMemoryLevelDBAliasMapServer(InMemoryAliasMap::init);
levelDBAliasMapServer.setConf(conf);
levelDBAliasMapServer.start();
createImage(new FSTreeWalk(NAMEPATH, conf),
NNDIRPATH,
FixedBlockResolver.class, "",
InMemoryLevelDBAliasMapClient.class);
levelDBAliasMapServer.close();
// start cluster with two datanodes,
// each with 1 PROVIDED volume and other DISK volume
startCluster(NNDIRPATH, 2,
new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
null, false);
verifyFileSystemContents();
FileUtils.deleteDirectory(tempDirectory);
}
}