HDFS-10675. Datanode support to read from external stores.

This commit is contained in:
Virajith Jalaparti 2017-03-29 14:29:28 -07:00 committed by Chris Douglas
parent 44825f0960
commit b668eb9155
45 changed files with 2873 additions and 85 deletions

View File

@ -37,7 +37,8 @@ public enum StorageType {
RAM_DISK(true),
SSD(false),
DISK(false),
ARCHIVE(false);
ARCHIVE(false),
PROVIDED(false);
private final boolean isTransient;

View File

@ -285,7 +285,7 @@ public class TestCount {
// <----13---> <-------17------> <----13-----> <------17------->
" SSD_QUOTA REM_SSD_QUOTA DISK_QUOTA REM_DISK_QUOTA " +
// <----13---> <-------17------>
"ARCHIVE_QUOTA REM_ARCHIVE_QUOTA " +
"ARCHIVE_QUOTA REM_ARCHIVE_QUOTA PROVIDED_QUOTA REM_PROVIDED_QUOTA " +
"PATHNAME";
verify(out).println(withStorageTypeHeader);
verifyNoMoreInteractions(out);
@ -340,6 +340,7 @@ public class TestCount {
" SSD_QUOTA REM_SSD_QUOTA " +
" DISK_QUOTA REM_DISK_QUOTA " +
"ARCHIVE_QUOTA REM_ARCHIVE_QUOTA " +
"PROVIDED_QUOTA REM_PROVIDED_QUOTA " +
"PATHNAME";
verify(out).println(withStorageTypeHeader);
verifyNoMoreInteractions(out);

View File

@ -47,6 +47,10 @@ public final class HdfsConstants {
public static final String WARM_STORAGE_POLICY_NAME = "WARM";
public static final byte COLD_STORAGE_POLICY_ID = 2;
public static final String COLD_STORAGE_POLICY_NAME = "COLD";
// branch HDFS-9806 XXX temporary until HDFS-7076
public static final byte PROVIDED_STORAGE_POLICY_ID = 1;
public static final String PROVIDED_STORAGE_POLICY_NAME = "PROVIDED";
public static final int DEFAULT_DATA_SOCKET_SIZE = 0;

View File

@ -403,6 +403,8 @@ public class PBHelperClient {
return StorageTypeProto.ARCHIVE;
case RAM_DISK:
return StorageTypeProto.RAM_DISK;
case PROVIDED:
return StorageTypeProto.PROVIDED;
default:
throw new IllegalStateException(
"BUG: StorageType not found, type=" + type);
@ -419,6 +421,8 @@ public class PBHelperClient {
return StorageType.ARCHIVE;
case RAM_DISK:
return StorageType.RAM_DISK;
case PROVIDED:
return StorageType.PROVIDED;
default:
throw new IllegalStateException(
"BUG: StorageTypeProto not found, type=" + type);

View File

@ -205,6 +205,7 @@ enum StorageTypeProto {
SSD = 2;
ARCHIVE = 3;
RAM_DISK = 4;
PROVIDED = 5;
}
/**

View File

@ -328,6 +328,21 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.edits.asynclogging";
public static final boolean DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT = true;
public static final String DFS_PROVIDER_CLASS = "dfs.provider.class";
public static final String DFS_PROVIDER_DF_CLASS = "dfs.provided.df.class";
public static final String DFS_PROVIDER_STORAGEUUID = "dfs.provided.storage.id";
public static final String DFS_PROVIDER_STORAGEUUID_DEFAULT = "DS-PROVIDED";
public static final String DFS_PROVIDER_BLK_FORMAT_CLASS = "dfs.provided.blockformat.class";
public static final String DFS_PROVIDED_BLOCK_MAP_DELIMITER = "dfs.provided.textprovider.delimiter";
public static final String DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT = ",";
public static final String DFS_PROVIDED_BLOCK_MAP_READ_PATH = "dfs.provided.textprovider.read.path";
public static final String DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT = "file:///tmp/blocks.csv";
public static final String DFS_PROVIDED_BLOCK_MAP_CODEC = "dfs.provided.textprovider.read.codec";
public static final String DFS_PROVIDED_BLOCK_MAP_WRITE_PATH = "dfs.provided.textprovider.write.path";
public static final String DFS_LIST_LIMIT = "dfs.ls.limit";
public static final int DFS_LIST_LIMIT_DEFAULT = 1000;
public static final String DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit";

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
import org.apache.hadoop.hdfs.protocol.Block;
/**
* Interface used to load provided blocks.
*/
public interface BlockAlias {
Block getBlock();
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.Block;
/**
* An abstract class used to read and write block maps for provided blocks.
*/
public abstract class BlockFormat<T extends BlockAlias> {
/**
* An abstract class that is used to read {@link BlockAlias}es
* for provided blocks.
*/
public static abstract class Reader<U extends BlockAlias>
implements Iterable<U>, Closeable {
/**
* reader options.
*/
public interface Options { }
public abstract U resolve(Block ident) throws IOException;
}
/**
* Returns the reader for the provided block map.
* @param opts reader options
* @return {@link Reader} to the block map.
* @throws IOException
*/
public abstract Reader<T> getReader(Reader.Options opts) throws IOException;
/**
* An abstract class used as a writer for the provided block map.
*/
public static abstract class Writer<U extends BlockAlias>
implements Closeable {
/**
* writer options.
*/
public interface Options { }
public abstract void store(U token) throws IOException;
}
/**
* Returns the writer for the provided block map.
* @param opts writer options.
* @return {@link Writer} to the block map.
* @throws IOException
*/
public abstract Writer<T> getWriter(Writer.Options opts) throws IOException;
/**
* Refresh based on the underlying block map.
* @throws IOException
*/
public abstract void refresh() throws IOException;
}

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
/**
* This class is used to represent provided blocks that are file regions,
* i.e., can be described using (path, offset, length).
*/
public class FileRegion implements BlockAlias {
private final Path path;
private final long offset;
private final long length;
private final long blockId;
private final String bpid;
private final long genStamp;
public FileRegion(long blockId, Path path, long offset,
long length, String bpid, long genStamp) {
this.path = path;
this.offset = offset;
this.length = length;
this.blockId = blockId;
this.bpid = bpid;
this.genStamp = genStamp;
}
public FileRegion(long blockId, Path path, long offset,
long length, String bpid) {
this(blockId, path, offset, length, bpid,
HdfsConstants.GRANDFATHER_GENERATION_STAMP);
}
public FileRegion(long blockId, Path path, long offset,
long length, long genStamp) {
this(blockId, path, offset, length, null, genStamp);
}
public FileRegion(long blockId, Path path, long offset, long length) {
this(blockId, path, offset, length, null);
}
@Override
public Block getBlock() {
return new Block(blockId, length, genStamp);
}
@Override
public boolean equals(Object other) {
if (!(other instanceof FileRegion)) {
return false;
}
FileRegion o = (FileRegion) other;
return blockId == o.blockId
&& offset == o.offset
&& length == o.length
&& genStamp == o.genStamp
&& path.equals(o.path);
}
@Override
public int hashCode() {
return (int)(blockId & Integer.MIN_VALUE);
}
public Path getPath() {
return path;
}
public long getOffset() {
return offset;
}
public long getLength() {
return length;
}
public long getGenerationStamp() {
return genStamp;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{ block=\"").append(getBlock()).append("\"");
sb.append(", path=\"").append(getPath()).append("\"");
sb.append(", off=\"").append(getOffset()).append("\"");
sb.append(", len=\"").append(getBlock().getNumBytes()).append("\"");
sb.append(", genStamp=\"").append(getBlock()
.getGenerationStamp()).append("\"");
sb.append(", bpid=\"").append(bpid).append("\"");
sb.append(" }");
return sb.toString();
}
public String getBlockPoolId() {
return this.bpid;
}
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
/**
* This class is a stub for reading file regions from the block map.
*/
public class FileRegionProvider implements Iterable<FileRegion> {
@Override
public Iterator<FileRegion> iterator() {
return Collections.emptyListIterator();
}
public void refresh() throws IOException {
return;
}
}

View File

@ -40,6 +40,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@ -196,7 +197,10 @@ public abstract class Storage extends StorageInfo {
Iterator<StorageDirectory> it =
(dirType == null) ? dirIterator() : dirIterator(dirType);
for ( ;it.hasNext(); ) {
list.add(new File(it.next().getCurrentDir(), fileName));
File currentDir = it.next().getCurrentDir();
if (currentDir != null) {
list.add(new File(currentDir, fileName));
}
}
return list;
}
@ -328,10 +332,20 @@ public abstract class Storage extends StorageInfo {
*/
public StorageDirectory(String bpid, StorageDirType dirType,
boolean isShared, StorageLocation location) {
this(new File(location.getBpURI(bpid, STORAGE_DIR_CURRENT)), dirType,
this(getBlockPoolCurrentDir(bpid, location), dirType,
isShared, location);
}
private static File getBlockPoolCurrentDir(String bpid,
StorageLocation location) {
if (location == null ||
location.getStorageType() == StorageType.PROVIDED) {
return null;
} else {
return new File(location.getBpURI(bpid, STORAGE_DIR_CURRENT));
}
}
private StorageDirectory(File dir, StorageDirType dirType,
boolean isShared, StorageLocation location) {
this.root = dir;
@ -347,7 +361,8 @@ public abstract class Storage extends StorageInfo {
}
private static File getStorageLocationFile(StorageLocation location) {
if (location == null) {
if (location == null ||
location.getStorageType() == StorageType.PROVIDED) {
return null;
}
try {
@ -406,6 +421,10 @@ public abstract class Storage extends StorageInfo {
*/
public void clearDirectory() throws IOException {
File curDir = this.getCurrentDir();
if (curDir == null) {
//if the directory is null, there is nothing to do.
return;
}
if (curDir.exists()) {
File[] files = FileUtil.listFiles(curDir);
LOG.info("Will remove files: " + Arrays.toString(files));
@ -423,6 +442,9 @@ public abstract class Storage extends StorageInfo {
* @return the directory path
*/
public File getCurrentDir() {
if (root == null) {
return null;
}
return new File(root, STORAGE_DIR_CURRENT);
}
@ -443,6 +465,9 @@ public abstract class Storage extends StorageInfo {
* @return the version file path
*/
public File getVersionFile() {
if (root == null) {
return null;
}
return new File(new File(root, STORAGE_DIR_CURRENT), STORAGE_FILE_VERSION);
}
@ -452,6 +477,9 @@ public abstract class Storage extends StorageInfo {
* @return the previous version file path
*/
public File getPreviousVersionFile() {
if (root == null) {
return null;
}
return new File(new File(root, STORAGE_DIR_PREVIOUS), STORAGE_FILE_VERSION);
}
@ -462,6 +490,9 @@ public abstract class Storage extends StorageInfo {
* @return the directory path
*/
public File getPreviousDir() {
if (root == null) {
return null;
}
return new File(root, STORAGE_DIR_PREVIOUS);
}
@ -476,6 +507,9 @@ public abstract class Storage extends StorageInfo {
* @return the directory path
*/
public File getPreviousTmp() {
if (root == null) {
return null;
}
return new File(root, STORAGE_TMP_PREVIOUS);
}
@ -490,6 +524,9 @@ public abstract class Storage extends StorageInfo {
* @return the directory path
*/
public File getRemovedTmp() {
if (root == null) {
return null;
}
return new File(root, STORAGE_TMP_REMOVED);
}
@ -503,6 +540,9 @@ public abstract class Storage extends StorageInfo {
* @return the directory path
*/
public File getFinalizedTmp() {
if (root == null) {
return null;
}
return new File(root, STORAGE_TMP_FINALIZED);
}
@ -517,6 +557,9 @@ public abstract class Storage extends StorageInfo {
* @return the directory path
*/
public File getLastCheckpointTmp() {
if (root == null) {
return null;
}
return new File(root, STORAGE_TMP_LAST_CKPT);
}
@ -530,6 +573,9 @@ public abstract class Storage extends StorageInfo {
* @return the directory path
*/
public File getPreviousCheckpoint() {
if (root == null) {
return null;
}
return new File(root, STORAGE_PREVIOUS_CKPT);
}
@ -543,7 +589,7 @@ public abstract class Storage extends StorageInfo {
private void checkEmptyCurrent() throws InconsistentFSStateException,
IOException {
File currentDir = getCurrentDir();
if(!currentDir.exists()) {
if(currentDir == null || !currentDir.exists()) {
// if current/ does not exist, it's safe to format it.
return;
}
@ -589,6 +635,13 @@ public abstract class Storage extends StorageInfo {
public StorageState analyzeStorage(StartupOption startOpt, Storage storage,
boolean checkCurrentIsEmpty)
throws IOException {
if (location != null &&
location.getStorageType() == StorageType.PROVIDED) {
//currently we assume that PROVIDED storages are always NORMAL
return StorageState.NORMAL;
}
assert root != null : "root is null";
boolean hadMkdirs = false;
String rootPath = root.getCanonicalPath();
@ -710,6 +763,10 @@ public abstract class Storage extends StorageInfo {
*/
public void doRecover(StorageState curState) throws IOException {
File curDir = getCurrentDir();
if (curDir == null || root == null) {
//at this point, we do not support recovery on PROVIDED storages
return;
}
String rootPath = root.getCanonicalPath();
switch(curState) {
case COMPLETE_UPGRADE: // mv previous.tmp -> previous
@ -883,7 +940,8 @@ public abstract class Storage extends StorageInfo {
@Override
public String toString() {
return "Storage Directory " + this.root;
return "Storage Directory root= " + this.root +
"; location= " + this.location;
}
/**
@ -1153,6 +1211,9 @@ public abstract class Storage extends StorageInfo {
}
public void writeProperties(File to, StorageDirectory sd) throws IOException {
if (to == null) {
return;
}
Properties props = new Properties();
setPropertiesFromFields(props, sd);
writeProperties(to, props);

View File

@ -152,6 +152,9 @@ public class StorageInfo {
*/
protected void setFieldsFromProperties(
Properties props, StorageDirectory sd) throws IOException {
if (props == null) {
return;
}
setLayoutVersion(props, sd);
setNamespaceID(props, sd);
setcTime(props, sd);
@ -241,6 +244,9 @@ public class StorageInfo {
}
public static Properties readPropertiesFile(File from) throws IOException {
if (from == null) {
return null;
}
RandomAccessFile file = new RandomAccessFile(from, "rws");
FileInputStream in = null;
Properties props = new Properties();

View File

@ -0,0 +1,442 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
import java.io.File;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.NoSuchElementException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* This class is used for block maps stored as text files,
* with a specified delimiter.
*/
public class TextFileRegionFormat
extends BlockFormat<FileRegion> implements Configurable {
private Configuration conf;
private ReaderOptions readerOpts = TextReader.defaults();
private WriterOptions writerOpts = TextWriter.defaults();
public static final Logger LOG =
LoggerFactory.getLogger(TextFileRegionFormat.class);
@Override
public void setConf(Configuration conf) {
readerOpts.setConf(conf);
writerOpts.setConf(conf);
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public Reader<FileRegion> getReader(Reader.Options opts)
throws IOException {
if (null == opts) {
opts = readerOpts;
}
if (!(opts instanceof ReaderOptions)) {
throw new IllegalArgumentException("Invalid options " + opts.getClass());
}
ReaderOptions o = (ReaderOptions) opts;
Configuration readerConf = (null == o.getConf())
? new Configuration()
: o.getConf();
return createReader(o.file, o.delim, readerConf);
}
@VisibleForTesting
TextReader createReader(Path file, String delim, Configuration cfg)
throws IOException {
FileSystem fs = file.getFileSystem(cfg);
if (fs instanceof LocalFileSystem) {
fs = ((LocalFileSystem)fs).getRaw();
}
CompressionCodecFactory factory = new CompressionCodecFactory(cfg);
CompressionCodec codec = factory.getCodec(file);
return new TextReader(fs, file, codec, delim);
}
@Override
public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException {
if (null == opts) {
opts = writerOpts;
}
if (!(opts instanceof WriterOptions)) {
throw new IllegalArgumentException("Invalid options " + opts.getClass());
}
WriterOptions o = (WriterOptions) opts;
Configuration cfg = (null == o.getConf())
? new Configuration()
: o.getConf();
if (o.codec != null) {
CompressionCodecFactory factory = new CompressionCodecFactory(cfg);
CompressionCodec codec = factory.getCodecByName(o.codec);
String name = o.file.getName() + codec.getDefaultExtension();
o.filename(new Path(o.file.getParent(), name));
return createWriter(o.file, codec, o.delim, cfg);
}
return createWriter(o.file, null, o.delim, conf);
}
@VisibleForTesting
TextWriter createWriter(Path file, CompressionCodec codec, String delim,
Configuration cfg) throws IOException {
FileSystem fs = file.getFileSystem(cfg);
if (fs instanceof LocalFileSystem) {
fs = ((LocalFileSystem)fs).getRaw();
}
OutputStream tmp = fs.create(file);
java.io.Writer out = new BufferedWriter(new OutputStreamWriter(
(null == codec) ? tmp : codec.createOutputStream(tmp), "UTF-8"));
return new TextWriter(out, delim);
}
/**
* Class specifying reader options for the {@link TextFileRegionFormat}.
*/
public static class ReaderOptions
implements TextReader.Options, Configurable {
private Configuration conf;
private String delim =
DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT;
private Path file = new Path(
new File(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT)
.toURI().toString());
@Override
public void setConf(Configuration conf) {
this.conf = conf;
String tmpfile = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_READ_PATH,
DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
file = new Path(tmpfile);
delim = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER,
DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT);
LOG.info("TextFileRegionFormat: read path " + tmpfile.toString());
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public ReaderOptions filename(Path file) {
this.file = file;
return this;
}
@Override
public ReaderOptions delimiter(String delim) {
this.delim = delim;
return this;
}
}
/**
* Class specifying writer options for the {@link TextFileRegionFormat}.
*/
public static class WriterOptions
implements TextWriter.Options, Configurable {
private Configuration conf;
private String codec = null;
private Path file =
new Path(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
private String delim =
DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT;
@Override
public void setConf(Configuration conf) {
this.conf = conf;
String tmpfile = conf.get(
DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_WRITE_PATH, file.toString());
file = new Path(tmpfile);
codec = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_CODEC);
delim = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER,
DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT);
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public WriterOptions filename(Path file) {
this.file = file;
return this;
}
public String getCodec() {
return codec;
}
public Path getFile() {
return file;
}
@Override
public WriterOptions codec(String codec) {
this.codec = codec;
return this;
}
@Override
public WriterOptions delimiter(String delim) {
this.delim = delim;
return this;
}
}
/**
* This class is used as a reader for block maps which
* are stored as delimited text files.
*/
public static class TextReader extends Reader<FileRegion> {
/**
* Options for {@link TextReader}.
*/
public interface Options extends Reader.Options {
Options filename(Path file);
Options delimiter(String delim);
}
static ReaderOptions defaults() {
return new ReaderOptions();
}
private final Path file;
private final String delim;
private final FileSystem fs;
private final CompressionCodec codec;
private final Map<FRIterator, BufferedReader> iterators;
protected TextReader(FileSystem fs, Path file, CompressionCodec codec,
String delim) {
this(fs, file, codec, delim,
new IdentityHashMap<FRIterator, BufferedReader>());
}
TextReader(FileSystem fs, Path file, CompressionCodec codec, String delim,
Map<FRIterator, BufferedReader> iterators) {
this.fs = fs;
this.file = file;
this.codec = codec;
this.delim = delim;
this.iterators = Collections.synchronizedMap(iterators);
}
@Override
public FileRegion resolve(Block ident) throws IOException {
// consider layering index w/ composable format
Iterator<FileRegion> i = iterator();
try {
while (i.hasNext()) {
FileRegion f = i.next();
if (f.getBlock().equals(ident)) {
return f;
}
}
} finally {
BufferedReader r = iterators.remove(i);
if (r != null) {
// null on last element
r.close();
}
}
return null;
}
class FRIterator implements Iterator<FileRegion> {
private FileRegion pending;
@Override
public boolean hasNext() {
return pending != null;
}
@Override
public FileRegion next() {
if (null == pending) {
throw new NoSuchElementException();
}
FileRegion ret = pending;
try {
pending = nextInternal(this);
} catch (IOException e) {
throw new RuntimeException(e);
}
return ret;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
private FileRegion nextInternal(Iterator<FileRegion> i) throws IOException {
BufferedReader r = iterators.get(i);
if (null == r) {
throw new IllegalStateException();
}
String line = r.readLine();
if (null == line) {
iterators.remove(i);
return null;
}
String[] f = line.split(delim);
if (f.length != 6) {
throw new IOException("Invalid line: " + line);
}
return new FileRegion(Long.parseLong(f[0]), new Path(f[1]),
Long.parseLong(f[2]), Long.parseLong(f[3]), f[5],
Long.parseLong(f[4]));
}
public InputStream createStream() throws IOException {
InputStream i = fs.open(file);
if (codec != null) {
i = codec.createInputStream(i);
}
return i;
}
@Override
public Iterator<FileRegion> iterator() {
FRIterator i = new FRIterator();
try {
BufferedReader r =
new BufferedReader(new InputStreamReader(createStream(), "UTF-8"));
iterators.put(i, r);
i.pending = nextInternal(i);
} catch (IOException e) {
iterators.remove(i);
throw new RuntimeException(e);
}
return i;
}
@Override
public void close() throws IOException {
ArrayList<IOException> ex = new ArrayList<>();
synchronized (iterators) {
for (Iterator<BufferedReader> i = iterators.values().iterator();
i.hasNext();) {
try {
BufferedReader r = i.next();
r.close();
} catch (IOException e) {
ex.add(e);
} finally {
i.remove();
}
}
iterators.clear();
}
if (!ex.isEmpty()) {
throw MultipleIOException.createIOException(ex);
}
}
}
/**
* This class is used as a writer for block maps which
* are stored as delimited text files.
*/
public static class TextWriter extends Writer<FileRegion> {
/**
* Interface for Writer options.
*/
public interface Options extends Writer.Options {
Options codec(String codec);
Options filename(Path file);
Options delimiter(String delim);
}
public static WriterOptions defaults() {
return new WriterOptions();
}
private final String delim;
private final java.io.Writer out;
public TextWriter(java.io.Writer out, String delim) {
this.out = out;
this.delim = delim;
}
@Override
public void store(FileRegion token) throws IOException {
out.append(String.valueOf(token.getBlock().getBlockId())).append(delim);
out.append(token.getPath().toString()).append(delim);
out.append(Long.toString(token.getOffset())).append(delim);
out.append(Long.toString(token.getLength())).append(delim);
out.append(Long.toString(token.getGenerationStamp())).append(delim);
out.append(token.getBlockPoolId()).append("\n");
}
@Override
public void close() throws IOException {
out.close();
}
}
@Override
public void refresh() throws IOException {
//nothing to do;
}
}

View File

@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.ReflectionUtils;
/**
* This class is used to read file regions from block maps
* specified using delimited text.
*/
public class TextFileRegionProvider
extends FileRegionProvider implements Configurable {
private Configuration conf;
private BlockFormat<FileRegion> fmt;
@SuppressWarnings("unchecked")
@Override
public void setConf(Configuration conf) {
fmt = ReflectionUtils.newInstance(
conf.getClass(DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
TextFileRegionFormat.class,
BlockFormat.class),
conf);
((Configurable)fmt).setConf(conf); //redundant?
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public Iterator<FileRegion> iterator() {
try {
final BlockFormat.Reader<FileRegion> r = fmt.getReader(null);
return new Iterator<FileRegion>() {
private final Iterator<FileRegion> inner = r.iterator();
@Override
public boolean hasNext() {
return inner.hasNext();
}
@Override
public FileRegion next() {
return inner.next();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
} catch (IOException e) {
throw new RuntimeException("Failed to read provided blocks", e);
}
}
@Override
public void refresh() throws IOException {
fmt.refresh();
}
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
@ -360,6 +361,9 @@ public class BlockPoolSliceStorage extends Storage {
private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
StartupOption startOpt, List<Callable<StorageDirectory>> callables,
Configuration conf) throws IOException {
if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
return false; // regular startup for PROVIDED storage directories
}
if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
Preconditions.checkState(!getTrashRootDir(sd).exists(),
sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
@ -439,6 +443,10 @@ public class BlockPoolSliceStorage extends Storage {
LayoutVersion.Feature.FEDERATION, layoutVersion)) {
return;
}
//no upgrades for storage directories that are PROVIDED
if (bpSd.getRoot() == null) {
return;
}
final int oldLV = getLayoutVersion();
LOG.info("Upgrading block pool storage directory " + bpSd.getRoot()
+ ".\n old LV = " + oldLV
@ -589,8 +597,9 @@ public class BlockPoolSliceStorage extends Storage {
throws IOException {
File prevDir = bpSd.getPreviousDir();
// regular startup if previous dir does not exist
if (!prevDir.exists())
if (prevDir == null || !prevDir.exists()) {
return;
}
// read attributes out of the VERSION file of previous directory
BlockPoolSliceStorage prevInfo = new BlockPoolSliceStorage();
prevInfo.readPreviousVersionProperties(bpSd);
@ -631,6 +640,10 @@ public class BlockPoolSliceStorage extends Storage {
* that holds the snapshot.
*/
void doFinalize(File dnCurDir) throws IOException {
LOG.info("doFinalize: " + dnCurDir);
if (dnCurDir == null) {
return; //we do nothing if the directory is null
}
File bpRoot = getBpRoot(blockpoolID, dnCurDir);
StorageDirectory bpSd = new StorageDirectory(bpRoot);
// block pool level previous directory
@ -841,6 +854,9 @@ public class BlockPoolSliceStorage extends Storage {
public void setRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs)
throws IOException {
for (StorageDirectory sd : dnStorageDirs) {
if (sd.getCurrentDir() == null) {
return;
}
File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir());
File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE);
if (!storagesWithRollingUpgradeMarker.contains(bpRoot.toString())) {
@ -863,6 +879,9 @@ public class BlockPoolSliceStorage extends Storage {
public void clearRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs)
throws IOException {
for (StorageDirectory sd : dnStorageDirs) {
if (sd.getCurrentDir() == null) {
continue;
}
File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir());
File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE);
if (!storagesWithoutRollingUpgradeMarker.contains(bpRoot.toString())) {

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.Block;
@ -129,22 +130,31 @@ public class DataStorage extends Storage {
this.datanodeUuid = newDatanodeUuid;
}
private static boolean createStorageID(StorageDirectory sd, int lv) {
private static boolean createStorageID(StorageDirectory sd, int lv,
Configuration conf) {
// Clusters previously upgraded from layout versions earlier than
// ADD_DATANODE_AND_STORAGE_UUIDS failed to correctly generate a
// new storage ID. We check for that and fix it now.
final boolean haveValidStorageId = DataNodeLayoutVersion.supports(
LayoutVersion.Feature.ADD_DATANODE_AND_STORAGE_UUIDS, lv)
&& DatanodeStorage.isValidStorageId(sd.getStorageUuid());
return createStorageID(sd, !haveValidStorageId);
return createStorageID(sd, !haveValidStorageId, conf);
}
/** Create an ID for this storage.
* @return true if a new storage ID was generated.
* */
public static boolean createStorageID(
StorageDirectory sd, boolean regenerateStorageIds) {
StorageDirectory sd, boolean regenerateStorageIds, Configuration conf) {
final String oldStorageID = sd.getStorageUuid();
if (sd.getStorageLocation() != null &&
sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
// We only support one provided storage per datanode for now.
// TODO support multiple provided storage ids per datanode.
sd.setStorageUuid(conf.get(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT));
return false;
}
if (oldStorageID == null || regenerateStorageIds) {
sd.setStorageUuid(DatanodeStorage.generateUuid());
LOG.info("Generated new storageID " + sd.getStorageUuid() +
@ -273,7 +283,7 @@ public class DataStorage extends Storage {
LOG.info("Storage directory with location " + location
+ " is not formatted for namespace " + nsInfo.getNamespaceID()
+ ". Formatting...");
format(sd, nsInfo, datanode.getDatanodeUuid());
format(sd, nsInfo, datanode.getDatanodeUuid(), datanode.getConf());
break;
default: // recovery part is common
sd.doRecover(curState);
@ -547,15 +557,15 @@ public class DataStorage extends Storage {
}
void format(StorageDirectory sd, NamespaceInfo nsInfo,
String datanodeUuid) throws IOException {
String newDatanodeUuid, Configuration conf) throws IOException {
sd.clearDirectory(); // create directory
this.layoutVersion = HdfsServerConstants.DATANODE_LAYOUT_VERSION;
this.clusterID = nsInfo.getClusterID();
this.namespaceID = nsInfo.getNamespaceID();
this.cTime = 0;
setDatanodeUuid(datanodeUuid);
setDatanodeUuid(newDatanodeUuid);
createStorageID(sd, false);
createStorageID(sd, false, conf);
writeProperties(sd);
}
@ -600,6 +610,9 @@ public class DataStorage extends Storage {
private void setFieldsFromProperties(Properties props, StorageDirectory sd,
boolean overrideLayoutVersion, int toLayoutVersion) throws IOException {
if (props == null) {
return;
}
if (overrideLayoutVersion) {
this.layoutVersion = toLayoutVersion;
} else {
@ -694,6 +707,10 @@ public class DataStorage extends Storage {
private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
StartupOption startOpt, List<Callable<StorageDirectory>> callables,
Configuration conf) throws IOException {
if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
createStorageID(sd, layoutVersion, conf);
return false; // regular start up for PROVIDED storage directories
}
if (startOpt == StartupOption.ROLLBACK) {
doRollback(sd, nsInfo); // rollback if applicable
}
@ -724,7 +741,7 @@ public class DataStorage extends Storage {
// regular start up.
if (this.layoutVersion == HdfsServerConstants.DATANODE_LAYOUT_VERSION) {
createStorageID(sd, layoutVersion);
createStorageID(sd, layoutVersion, conf);
return false; // need to write properties
}
@ -733,7 +750,7 @@ public class DataStorage extends Storage {
if (federationSupported) {
// If the existing on-disk layout version supports federation,
// simply update the properties.
upgradeProperties(sd);
upgradeProperties(sd, conf);
} else {
doUpgradePreFederation(sd, nsInfo, callables, conf);
}
@ -829,15 +846,16 @@ public class DataStorage extends Storage {
// 4. Write version file under <SD>/current
clusterID = nsInfo.getClusterID();
upgradeProperties(sd);
upgradeProperties(sd, conf);
// 5. Rename <SD>/previous.tmp to <SD>/previous
rename(tmpDir, prevDir);
LOG.info("Upgrade of " + sd.getRoot()+ " is complete");
}
void upgradeProperties(StorageDirectory sd) throws IOException {
createStorageID(sd, layoutVersion);
void upgradeProperties(StorageDirectory sd, Configuration conf)
throws IOException {
createStorageID(sd, layoutVersion, conf);
LOG.info("Updating layout version from " + layoutVersion
+ " to " + HdfsServerConstants.DATANODE_LAYOUT_VERSION
+ " for storage " + sd.getRoot());
@ -989,7 +1007,7 @@ public class DataStorage extends Storage {
// then finalize it. Else finalize the corresponding BP.
for (StorageDirectory sd : getStorageDirs()) {
File prevDir = sd.getPreviousDir();
if (prevDir.exists()) {
if (prevDir != null && prevDir.exists()) {
// data node level storage finalize
doFinalize(sd);
} else {

View File

@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@ -105,7 +106,7 @@ public class DirectoryScanner implements Runnable {
* @param b whether to retain diffs
*/
@VisibleForTesting
void setRetainDiffs(boolean b) {
public void setRetainDiffs(boolean b) {
retainDiffs = b;
}
@ -215,7 +216,8 @@ public class DirectoryScanner implements Runnable {
* @param dataset the dataset to scan
* @param conf the Configuration object
*/
DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) {
public DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset,
Configuration conf) {
this.datanode = datanode;
this.dataset = dataset;
int interval = (int) conf.getTimeDuration(
@ -369,15 +371,14 @@ public class DirectoryScanner implements Runnable {
* Reconcile differences between disk and in-memory blocks
*/
@VisibleForTesting
void reconcile() throws IOException {
public void reconcile() throws IOException {
scan();
for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
String bpid = entry.getKey();
LinkedList<ScanInfo> diff = entry.getValue();
for (ScanInfo info : diff) {
dataset.checkAndUpdate(bpid, info.getBlockId(), info.getBlockFile(),
info.getMetaFile(), info.getVolume());
dataset.checkAndUpdate(bpid, info);
}
}
if (!retainDiffs) clear();
@ -429,11 +430,12 @@ public class DirectoryScanner implements Runnable {
}
// Block file and/or metadata file exists on the disk
// Block exists in memory
if (info.getBlockFile() == null) {
if (info.getVolume().getStorageType() != StorageType.PROVIDED &&
info.getBlockFile() == null) {
// Block metadata file exits and block file is missing
addDifference(diffRecord, statsRecord, info);
} else if (info.getGenStamp() != memBlock.getGenerationStamp()
|| info.getBlockFileLength() != memBlock.getNumBytes()) {
|| info.getBlockLength() != memBlock.getNumBytes()) {
// Block metadata file is missing or has wrong generation stamp,
// or block file length is different than expected
statsRecord.mismatchBlocks++;
@ -611,6 +613,9 @@ public class DirectoryScanner implements Runnable {
for (String bpid : bpList) {
LinkedList<ScanInfo> report = new LinkedList<>();
perfTimer.reset().start();
throttleTimer.reset().start();
try {
result.put(bpid, volume.compileReport(bpid, report, this));
} catch (InterruptedException ex) {

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
/**
* This class is used for provided replicas that are finalized.
*/
public class FinalizedProvidedReplica extends ProvidedReplica {
public FinalizedProvidedReplica(long blockId, URI fileURI,
long fileOffset, long blockLen, long genStamp,
FsVolumeSpi volume, Configuration conf) {
super(blockId, fileURI, fileOffset, blockLen, genStamp, volume, conf);
}
@Override
public ReplicaState getState() {
return ReplicaState.FINALIZED;
}
@Override
public long getBytesOnDisk() {
return getNumBytes();
}
@Override
public long getVisibleLength() {
return getNumBytes(); //all bytes are visible
}
@Override // Object
public boolean equals(Object o) {
return super.equals(o);
}
@Override // Object
public int hashCode() {
return super.hashCode();
}
@Override
public String toString() {
return super.toString();
}
@Override
public ReplicaInfo getOriginalReplica() {
throw new UnsupportedOperationException("Replica of type " + getState() +
" does not support getOriginalReplica");
}
@Override
public long getRecoveryID() {
throw new UnsupportedOperationException("Replica of type " + getState() +
" does not support getRecoveryID");
}
@Override
public void setRecoveryID(long recoveryId) {
throw new UnsupportedOperationException("Replica of type " + getState() +
" does not support setRecoveryID");
}
@Override
public ReplicaRecoveryInfo createInfo() {
throw new UnsupportedOperationException("Replica of type " + getState() +
" does not support createInfo");
}
}

View File

@ -0,0 +1,248 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This abstract class is used as a base class for provided replicas.
*/
public abstract class ProvidedReplica extends ReplicaInfo {
public static final Logger LOG =
LoggerFactory.getLogger(ProvidedReplica.class);
// Null checksum information for provided replicas.
// Shared across all replicas.
static final byte[] NULL_CHECKSUM_ARRAY =
FsDatasetUtil.createNullChecksumByteArray();
private URI fileURI;
private long fileOffset;
private Configuration conf;
private FileSystem remoteFS;
/**
* Constructor.
* @param blockId block id
* @param fileURI remote URI this block is to be read from
* @param fileOffset the offset in the remote URI
* @param blockLen the length of the block
* @param genStamp the generation stamp of the block
* @param volume the volume this block belongs to
*/
public ProvidedReplica(long blockId, URI fileURI, long fileOffset,
long blockLen, long genStamp, FsVolumeSpi volume, Configuration conf) {
super(volume, blockId, blockLen, genStamp);
this.fileURI = fileURI;
this.fileOffset = fileOffset;
this.conf = conf;
try {
this.remoteFS = FileSystem.get(fileURI, this.conf);
} catch (IOException e) {
LOG.warn("Failed to obtain filesystem for " + fileURI);
this.remoteFS = null;
}
}
public ProvidedReplica(ProvidedReplica r) {
super(r);
this.fileURI = r.fileURI;
this.fileOffset = r.fileOffset;
this.conf = r.conf;
try {
this.remoteFS = FileSystem.newInstance(fileURI, this.conf);
} catch (IOException e) {
this.remoteFS = null;
}
}
@Override
public URI getBlockURI() {
return this.fileURI;
}
@Override
public InputStream getDataInputStream(long seekOffset) throws IOException {
if (remoteFS != null) {
FSDataInputStream ins = remoteFS.open(new Path(fileURI));
ins.seek(fileOffset + seekOffset);
return new FSDataInputStream(ins);
} else {
throw new IOException("Remote filesystem for provided replica " + this +
" does not exist");
}
}
@Override
public OutputStream getDataOutputStream(boolean append) throws IOException {
throw new UnsupportedOperationException(
"OutputDataStream is not implemented for ProvidedReplica");
}
@Override
public URI getMetadataURI() {
return null;
}
@Override
public OutputStream getMetadataOutputStream(boolean append)
throws IOException {
return null;
}
@Override
public boolean blockDataExists() {
if(remoteFS != null) {
try {
return remoteFS.exists(new Path(fileURI));
} catch (IOException e) {
return false;
}
} else {
return false;
}
}
@Override
public boolean deleteBlockData() {
throw new UnsupportedOperationException(
"ProvidedReplica does not support deleting block data");
}
@Override
public long getBlockDataLength() {
return this.getNumBytes();
}
@Override
public LengthInputStream getMetadataInputStream(long offset)
throws IOException {
return new LengthInputStream(new ByteArrayInputStream(NULL_CHECKSUM_ARRAY),
NULL_CHECKSUM_ARRAY.length);
}
@Override
public boolean metadataExists() {
return NULL_CHECKSUM_ARRAY == null ? false : true;
}
@Override
public boolean deleteMetadata() {
throw new UnsupportedOperationException(
"ProvidedReplica does not support deleting metadata");
}
@Override
public long getMetadataLength() {
return NULL_CHECKSUM_ARRAY == null ? 0 : NULL_CHECKSUM_ARRAY.length;
}
@Override
public boolean renameMeta(URI destURI) throws IOException {
throw new UnsupportedOperationException(
"ProvidedReplica does not support renaming metadata");
}
@Override
public boolean renameData(URI destURI) throws IOException {
throw new UnsupportedOperationException(
"ProvidedReplica does not support renaming data");
}
@Override
public boolean getPinning(LocalFileSystem localFS) throws IOException {
return false;
}
@Override
public void setPinning(LocalFileSystem localFS) throws IOException {
throw new UnsupportedOperationException(
"ProvidedReplica does not support pinning");
}
@Override
public void bumpReplicaGS(long newGS) throws IOException {
throw new UnsupportedOperationException(
"ProvidedReplica does not yet support writes");
}
@Override
public boolean breakHardLinksIfNeeded() throws IOException {
return false;
}
@Override
public ReplicaRecoveryInfo createInfo()
throws UnsupportedOperationException {
throw new UnsupportedOperationException(
"ProvidedReplica does not yet support writes");
}
@Override
public int compareWith(ScanInfo info) {
//local scanning cannot find any provided blocks.
if (info.getFileRegion().equals(
new FileRegion(this.getBlockId(), new Path(fileURI),
fileOffset, this.getNumBytes(), this.getGenerationStamp()))) {
return 0;
} else {
return (int) (info.getBlockLength() - getNumBytes());
}
}
@Override
public void truncateBlock(long newLength) throws IOException {
throw new UnsupportedOperationException(
"ProvidedReplica does not yet support truncate");
}
@Override
public void updateWithReplica(StorageLocation replicaLocation) {
throw new UnsupportedOperationException(
"ProvidedReplica does not yet support update");
}
@Override
public void copyMetadata(URI destination) throws IOException {
throw new UnsupportedOperationException(
"ProvidedReplica does not yet support copy metadata");
}
@Override
public void copyBlockdata(URI destination) throws IOException {
throw new UnsupportedOperationException(
"ProvidedReplica does not yet support copy data");
}
}

View File

@ -18,9 +18,13 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.net.URI;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
/**
@ -42,11 +46,20 @@ public class ReplicaBuilder {
private ReplicaInfo fromReplica;
private URI uri;
private long offset;
private Configuration conf;
private FileRegion fileRegion;
public ReplicaBuilder(ReplicaState state) {
volume = null;
writer = null;
block = null;
length = -1;
fileRegion = null;
conf = null;
fromReplica = null;
uri = null;
this.state = state;
}
@ -105,6 +118,26 @@ public class ReplicaBuilder {
return this;
}
public ReplicaBuilder setURI(URI uri) {
this.uri = uri;
return this;
}
public ReplicaBuilder setConf(Configuration conf) {
this.conf = conf;
return this;
}
public ReplicaBuilder setOffset(long offset) {
this.offset = offset;
return this;
}
public ReplicaBuilder setFileRegion(FileRegion fileRegion) {
this.fileRegion = fileRegion;
return this;
}
public LocalReplicaInPipeline buildLocalReplicaInPipeline()
throws IllegalArgumentException {
LocalReplicaInPipeline info = null;
@ -176,7 +209,7 @@ public class ReplicaBuilder {
}
}
private ReplicaInfo buildFinalizedReplica() throws IllegalArgumentException {
private LocalReplica buildFinalizedReplica() throws IllegalArgumentException {
if (null != fromReplica &&
fromReplica.getState() == ReplicaState.FINALIZED) {
return new FinalizedReplica((FinalizedReplica)fromReplica);
@ -193,7 +226,7 @@ public class ReplicaBuilder {
}
}
private ReplicaInfo buildRWR() throws IllegalArgumentException {
private LocalReplica buildRWR() throws IllegalArgumentException {
if (null != fromReplica && fromReplica.getState() == ReplicaState.RWR) {
return new ReplicaWaitingToBeRecovered(
@ -211,7 +244,7 @@ public class ReplicaBuilder {
}
}
private ReplicaInfo buildRUR() throws IllegalArgumentException {
private LocalReplica buildRUR() throws IllegalArgumentException {
if (null == fromReplica) {
throw new IllegalArgumentException(
"Missing a valid replica to recover from");
@ -228,8 +261,53 @@ public class ReplicaBuilder {
}
}
public ReplicaInfo build() throws IllegalArgumentException {
ReplicaInfo info = null;
private ProvidedReplica buildProvidedFinalizedReplica()
throws IllegalArgumentException {
ProvidedReplica info = null;
if (fromReplica != null) {
throw new IllegalArgumentException("Finalized PROVIDED replica " +
"cannot be constructed from another replica");
}
if (fileRegion == null && uri == null) {
throw new IllegalArgumentException(
"Trying to construct a provided replica on " + volume +
" without enough information");
}
if (fileRegion == null) {
info = new FinalizedProvidedReplica(blockId, uri, offset,
length, genStamp, volume, conf);
} else {
info = new FinalizedProvidedReplica(fileRegion.getBlock().getBlockId(),
fileRegion.getPath().toUri(),
fileRegion.getOffset(),
fileRegion.getBlock().getNumBytes(),
fileRegion.getBlock().getGenerationStamp(),
volume, conf);
}
return info;
}
private ProvidedReplica buildProvidedReplica()
throws IllegalArgumentException {
ProvidedReplica info = null;
switch(this.state) {
case FINALIZED:
info = buildProvidedFinalizedReplica();
break;
case RWR:
case RUR:
case RBW:
case TEMPORARY:
default:
throw new IllegalArgumentException("Unknown replica state " +
state + " for PROVIDED replica");
}
return info;
}
private LocalReplica buildLocalReplica()
throws IllegalArgumentException {
LocalReplica info = null;
switch(this.state) {
case FINALIZED:
info = buildFinalizedReplica();
@ -249,4 +327,16 @@ public class ReplicaBuilder {
}
return info;
}
public ReplicaInfo build() throws IllegalArgumentException {
ReplicaInfo info = null;
if(volume != null && volume.getStorageType() == StorageType.PROVIDED) {
info = buildProvidedReplica();
} else {
info = buildLocalReplica();
}
return info;
}
}

View File

@ -49,6 +49,17 @@ abstract public class ReplicaInfo extends Block
private static final FileIoProvider DEFAULT_FILE_IO_PROVIDER =
new FileIoProvider(null, null);
/**
* Constructor.
* @param block a block
* @param vol volume where replica is located
* @param dir directory path where block and meta files are located
*/
ReplicaInfo(Block block, FsVolumeSpi vol) {
this(vol, block.getBlockId(), block.getNumBytes(),
block.getGenerationStamp());
}
/**
* Constructor
* @param vol volume where replica is located
@ -62,7 +73,14 @@ abstract public class ReplicaInfo extends Block
}
/**
* Get the volume where this replica is located on disk.
* Copy constructor.
* @param from where to copy from
*/
ReplicaInfo(ReplicaInfo from) {
this(from, from.getVolume());
}
/**
* @return the volume where this replica is located on disk
*/
public FsVolumeSpi getVolume() {

View File

@ -98,6 +98,16 @@ public class StorageLocation
public boolean matchesStorageDirectory(StorageDirectory sd,
String bpid) throws IOException {
if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED &&
storageType == StorageType.PROVIDED) {
return matchesStorageDirectory(sd);
}
if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED ||
storageType == StorageType.PROVIDED) {
//only one of these is PROVIDED; so it cannot be a match!
return false;
}
//both storage directories are local
return this.getBpURI(bpid, Storage.STORAGE_DIR_CURRENT).normalize()
.equals(sd.getRoot().toURI().normalize());
}
@ -197,6 +207,10 @@ public class StorageLocation
if (conf == null) {
conf = new HdfsConfiguration();
}
if (storageType == StorageType.PROVIDED) {
//skip creation if the storage type is PROVIDED
return;
}
LocalFileSystem localFS = FileSystem.getLocal(conf);
FsPermission permission = new FsPermission(conf.get(
@ -213,10 +227,14 @@ public class StorageLocation
@Override // Checkable
public VolumeCheckResult check(CheckContext context) throws IOException {
DiskChecker.checkDir(
context.localFileSystem,
new Path(baseURI),
context.expectedPermission);
//we assume provided storage locations are always healthy,
//and check only for local storages.
if (storageType != StorageType.PROVIDED) {
DiskChecker.checkDir(
context.localFileSystem,
new Path(baseURI),
context.expectedPermission);
}
return VolumeCheckResult.HEALTHY;
}

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@ -252,8 +253,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* and, in case that they are not matched, update the record or mark it
* as corrupted.
*/
void checkAndUpdate(String bpid, long blockId, File diskFile,
File diskMetaFile, FsVolumeSpi vol) throws IOException;
void checkAndUpdate(String bpid, ScanInfo info) throws IOException;
/**
* @param b - the block

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
@ -241,10 +242,11 @@ public interface FsVolumeSpi
private final FsVolumeSpi volume;
private final FileRegion fileRegion;
/**
* Get the file's length in async block scan
*/
private final long blockFileLength;
private final long blockLength;
private final static Pattern CONDENSED_PATH_REGEX =
Pattern.compile("(?<!^)(\\\\|/){2,}");
@ -294,13 +296,30 @@ public interface FsVolumeSpi
*/
public ScanInfo(long blockId, File blockFile, File metaFile,
FsVolumeSpi vol) {
this(blockId, blockFile, metaFile, vol, null,
(blockFile != null) ? blockFile.length() : 0);
}
/**
* Create a ScanInfo object for a block. This constructor will examine
* the block data and meta-data files.
*
* @param blockId the block ID
* @param blockFile the path to the block data file
* @param metaFile the path to the block meta-data file
* @param vol the volume that contains the block
* @param fileRegion the file region (for provided blocks)
* @param length the length of the block data
*/
public ScanInfo(long blockId, File blockFile, File metaFile,
FsVolumeSpi vol, FileRegion fileRegion, long length) {
this.blockId = blockId;
String condensedVolPath =
(vol == null || vol.getBaseURI() == null) ? null :
getCondensedPath(new File(vol.getBaseURI()).getAbsolutePath());
this.blockSuffix = blockFile == null ? null :
getSuffix(blockFile, condensedVolPath);
this.blockFileLength = (blockFile != null) ? blockFile.length() : 0;
this.blockLength = length;
if (metaFile == null) {
this.metaSuffix = null;
} else if (blockFile == null) {
@ -310,6 +329,7 @@ public interface FsVolumeSpi
condensedVolPath + blockSuffix);
}
this.volume = vol;
this.fileRegion = fileRegion;
}
/**
@ -328,8 +348,8 @@ public interface FsVolumeSpi
*
* @return the length of the data block
*/
public long getBlockFileLength() {
return blockFileLength;
public long getBlockLength() {
return blockLength;
}
/**
@ -399,6 +419,10 @@ public interface FsVolumeSpi
getMetaFile().getName()) :
HdfsConstants.GRANDFATHER_GENERATION_STAMP;
}
public FileRegion getFileRegion() {
return fileRegion;
}
}
/**

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
/**
* The default usage statistics for a provided volume.
*/
public class DefaultProvidedVolumeDF
implements ProvidedVolumeDF, Configurable {
@Override
public void setConf(Configuration conf) {
}
@Override
public Configuration getConf() {
return null;
}
@Override
public long getCapacity() {
return Long.MAX_VALUE;
}
@Override
public long getSpaceUsed() {
return 0;
}
@Override
public long getBlockPoolUsed(String bpid) {
return 0;
}
@Override
public long getAvailable() {
return Long.MAX_VALUE;
}
}

View File

@ -86,6 +86,7 @@ import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@ -1744,6 +1745,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
Set<String> missingVolumesReported = new HashSet<>();
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
//skip blocks in PROVIDED storage
if (b.getVolume().getStorageType() == StorageType.PROVIDED) {
continue;
}
String volStorageID = b.getVolume().getStorageID();
if (!builders.containsKey(volStorageID)) {
if (!missingVolumesReported.contains(volStorageID)) {
@ -1879,7 +1884,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
try (AutoCloseableLock lock = datasetLock.acquire()) {
r = volumeMap.get(bpid, blockId);
}
if (r != null) {
if (r.blockDataExists()) {
return r;
@ -2232,13 +2236,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* @param vol Volume of the block file
*/
@Override
public void checkAndUpdate(String bpid, long blockId, File diskFile,
File diskMetaFile, FsVolumeSpi vol) throws IOException {
public void checkAndUpdate(String bpid, ScanInfo scanInfo)
throws IOException {
long blockId = scanInfo.getBlockId();
File diskFile = scanInfo.getBlockFile();
File diskMetaFile = scanInfo.getMetaFile();
FsVolumeSpi vol = scanInfo.getVolume();
Block corruptBlock = null;
ReplicaInfo memBlockInfo;
try (AutoCloseableLock lock = datasetLock.acquire()) {
memBlockInfo = volumeMap.get(bpid, blockId);
if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
if (memBlockInfo != null &&
memBlockInfo.getState() != ReplicaState.FINALIZED) {
// Block is not finalized - ignore the difference
return;
}
@ -2253,6 +2264,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
Block.getGenerationStamp(diskMetaFile.getName()) :
HdfsConstants.GRANDFATHER_GENERATION_STAMP;
if (vol.getStorageType() == StorageType.PROVIDED) {
if (memBlockInfo == null) {
//replica exists on provided store but not in memory
ReplicaInfo diskBlockInfo =
new ReplicaBuilder(ReplicaState.FINALIZED)
.setFileRegion(scanInfo.getFileRegion())
.setFsVolume(vol)
.setConf(conf)
.build();
volumeMap.add(bpid, diskBlockInfo);
LOG.warn("Added missing block to memory " + diskBlockInfo);
} else {
//replica exists in memory but not in the provided store
volumeMap.remove(bpid, blockId);
LOG.warn("Deleting missing provided block " + memBlockInfo);
}
return;
}
if (!diskFileExists) {
if (memBlockInfo == null) {
// Block file does not exist and block does not exist in memory
@ -3028,7 +3059,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
newReplicaInfo =
replicaState.getLazyPersistVolume().activateSavedReplica(bpid,
replicaInfo, replicaState);
// Update the volumeMap entry.
volumeMap.add(bpid, newReplicaInfo);

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
@ -32,10 +34,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
/** Utility methods. */
@InterfaceAudience.Private
@ -44,6 +48,22 @@ public class FsDatasetUtil {
return f.getName().endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX);
}
public static byte[] createNullChecksumByteArray() {
DataChecksum csum =
DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(out);
try {
BlockMetadataHeader.writeHeader(dataOut, csum);
dataOut.close();
} catch (IOException e) {
FsVolumeImpl.LOG.error(
"Exception in creating null checksum stream: " + e);
return null;
}
return out.toByteArray();
}
static File getOrigFile(File unlinkTmpFile) {
final String name = unlinkTmpFile.getName();
if (!name.endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX)) {
@ -135,8 +155,9 @@ public class FsDatasetUtil {
* Compute the checksum for a block file that does not already have
* its checksum computed, and save it to dstMeta file.
*/
public static void computeChecksum(File srcMeta, File dstMeta, File blockFile,
int smallBufferSize, Configuration conf) throws IOException {
public static void computeChecksum(File srcMeta, File dstMeta,
File blockFile, int smallBufferSize, Configuration conf)
throws IOException {
Preconditions.checkNotNull(srcMeta);
Preconditions.checkNotNull(dstMeta);
Preconditions.checkNotNull(blockFile);

View File

@ -154,18 +154,24 @@ public class FsVolumeImpl implements FsVolumeSpi {
this.reservedForReplicas = new AtomicLong(0L);
this.storageLocation = sd.getStorageLocation();
this.currentDir = sd.getCurrentDir();
File parent = currentDir.getParentFile();
this.usage = new DF(parent, conf);
this.storageType = storageLocation.getStorageType();
this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY
+ "." + StringUtils.toLowerCase(storageType.toString()), conf.getLong(
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT));
this.configuredCapacity = -1;
if (currentDir != null) {
File parent = currentDir.getParentFile();
this.usage = new DF(parent, conf);
cacheExecutor = initializeCacheExecutor(parent);
this.metrics = DataNodeVolumeMetrics.create(conf, parent.getPath());
} else {
this.usage = null;
cacheExecutor = null;
this.metrics = null;
}
this.conf = conf;
this.fileIoProvider = fileIoProvider;
cacheExecutor = initializeCacheExecutor(parent);
this.metrics = DataNodeVolumeMetrics.create(conf, getBaseURI().getPath());
}
protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
@ -440,7 +446,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
/**
* Unplanned Non-DFS usage, i.e. Extra usage beyond reserved.
*
* @return
* @return Disk usage excluding space used by HDFS and excluding space
* reserved for blocks open for write.
* @throws IOException
*/
public long getNonDfsUsed() throws IOException {
@ -518,7 +525,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
public String[] getBlockPoolList() {
return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]);
}
/**
* Temporary files. They get moved to the finalized block directory when
* the block is finalized.

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
@ -67,6 +68,11 @@ public class FsVolumeImplBuilder {
}
FsVolumeImpl build() throws IOException {
if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
return new ProvidedVolumeImpl(dataset, storageID, sd,
fileIoProvider != null ? fileIoProvider :
new FileIoProvider(null, null), conf);
}
return new FsVolumeImpl(
dataset, storageID, sd,
fileIoProvider != null ? fileIoProvider :

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
/**
* This interface is used to define the usage statistics
* of the provided storage.
*/
public interface ProvidedVolumeDF {
long getCapacity();
long getSpaceUsed();
long getBlockPoolUsed(String bpid);
long getAvailable();
}

View File

@ -0,0 +1,526 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.FileRegionProvider;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.TextFileRegionProvider;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.util.Timer;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.AutoCloseableLock;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
import org.codehaus.jackson.map.ObjectWriter;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
/**
* This class is used to create provided volumes.
*/
public class ProvidedVolumeImpl extends FsVolumeImpl {
static class ProvidedBlockPoolSlice {
private FsVolumeImpl providedVolume;
private FileRegionProvider provider;
private Configuration conf;
private String bpid;
private ReplicaMap bpVolumeMap;
ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume,
Configuration conf) {
this.providedVolume = volume;
bpVolumeMap = new ReplicaMap(new AutoCloseableLock());
Class<? extends FileRegionProvider> fmt =
conf.getClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
TextFileRegionProvider.class, FileRegionProvider.class);
provider = ReflectionUtils.newInstance(fmt, conf);
this.conf = conf;
this.bpid = bpid;
bpVolumeMap.initBlockPool(bpid);
LOG.info("Created provider: " + provider.getClass());
}
FileRegionProvider getFileRegionProvider() {
return provider;
}
public void getVolumeMap(ReplicaMap volumeMap,
RamDiskReplicaTracker ramDiskReplicaMap) throws IOException {
Iterator<FileRegion> iter = provider.iterator();
while(iter.hasNext()) {
FileRegion region = iter.next();
if (region.getBlockPoolId() != null &&
region.getBlockPoolId().equals(bpid)) {
ReplicaInfo newReplica = new ReplicaBuilder(ReplicaState.FINALIZED)
.setBlockId(region.getBlock().getBlockId())
.setURI(region.getPath().toUri())
.setOffset(region.getOffset())
.setLength(region.getBlock().getNumBytes())
.setGenerationStamp(region.getBlock().getGenerationStamp())
.setFsVolume(providedVolume)
.setConf(conf).build();
ReplicaInfo oldReplica =
volumeMap.get(bpid, newReplica.getBlockId());
if (oldReplica == null) {
volumeMap.add(bpid, newReplica);
bpVolumeMap.add(bpid, newReplica);
} else {
throw new IOException(
"A block with id " + newReplica.getBlockId() +
" already exists in the volumeMap");
}
}
}
}
public boolean isEmpty() {
return bpVolumeMap.replicas(bpid).size() == 0;
}
public void shutdown(BlockListAsLongs blocksListsAsLongs) {
//nothing to do!
}
public void compileReport(LinkedList<ScanInfo> report,
ReportCompiler reportCompiler)
throws IOException, InterruptedException {
/* refresh the provider and return the list of blocks found.
* the assumption here is that the block ids in the external
* block map, after the refresh, are consistent with those
* from before the refresh, i.e., for blocks which did not change,
* the ids remain the same.
*/
provider.refresh();
Iterator<FileRegion> iter = provider.iterator();
while(iter.hasNext()) {
reportCompiler.throttle();
FileRegion region = iter.next();
if (region.getBlockPoolId().equals(bpid)) {
LOG.info("Adding ScanInfo for blkid " +
region.getBlock().getBlockId());
report.add(new ScanInfo(region.getBlock().getBlockId(), null, null,
providedVolume, region, region.getLength()));
}
}
}
}
private URI baseURI;
private final Map<String, ProvidedBlockPoolSlice> bpSlices =
new ConcurrentHashMap<String, ProvidedBlockPoolSlice>();
private ProvidedVolumeDF df;
ProvidedVolumeImpl(FsDatasetImpl dataset, String storageID,
StorageDirectory sd, FileIoProvider fileIoProvider,
Configuration conf) throws IOException {
super(dataset, storageID, sd, fileIoProvider, conf);
assert getStorageLocation().getStorageType() == StorageType.PROVIDED:
"Only provided storages must use ProvidedVolume";
baseURI = getStorageLocation().getUri();
Class<? extends ProvidedVolumeDF> dfClass =
conf.getClass(DFSConfigKeys.DFS_PROVIDER_DF_CLASS,
DefaultProvidedVolumeDF.class, ProvidedVolumeDF.class);
df = ReflectionUtils.newInstance(dfClass, conf);
}
@Override
public String[] getBlockPoolList() {
return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]);
}
@Override
public long getCapacity() {
if (configuredCapacity < 0) {
return df.getCapacity();
}
return configuredCapacity;
}
@Override
public long getDfsUsed() throws IOException {
return df.getSpaceUsed();
}
@Override
long getBlockPoolUsed(String bpid) throws IOException {
return df.getBlockPoolUsed(bpid);
}
@Override
public long getAvailable() throws IOException {
return df.getAvailable();
}
@Override
long getActualNonDfsUsed() throws IOException {
return df.getSpaceUsed();
}
@Override
public long getNonDfsUsed() throws IOException {
return 0L;
}
@Override
public URI getBaseURI() {
return baseURI;
}
@Override
public File getFinalizedDir(String bpid) throws IOException {
return null;
}
@Override
public void reserveSpaceForReplica(long bytesToReserve) {
throw new UnsupportedOperationException(
"ProvidedVolume does not yet support writes");
}
@Override
public void releaseReservedSpace(long bytesToRelease) {
throw new UnsupportedOperationException(
"ProvidedVolume does not yet support writes");
}
private static final ObjectWriter WRITER =
new ObjectMapper().writerWithDefaultPrettyPrinter();
private static final ObjectReader READER =
new ObjectMapper().reader(ProvidedBlockIteratorState.class);
private static class ProvidedBlockIteratorState {
ProvidedBlockIteratorState() {
iterStartMs = Time.now();
lastSavedMs = iterStartMs;
atEnd = false;
lastBlockId = -1;
}
// The wall-clock ms since the epoch at which this iterator was last saved.
@JsonProperty
private long lastSavedMs;
// The wall-clock ms since the epoch at which this iterator was created.
@JsonProperty
private long iterStartMs;
@JsonProperty
private boolean atEnd;
//The id of the last block read when the state of the iterator is saved.
//This implementation assumes that provided blocks are returned
//in sorted order of the block ids.
@JsonProperty
private long lastBlockId;
}
private class ProviderBlockIteratorImpl
implements FsVolumeSpi.BlockIterator {
private String bpid;
private String name;
private FileRegionProvider provider;
private Iterator<FileRegion> blockIterator;
private ProvidedBlockIteratorState state;
ProviderBlockIteratorImpl(String bpid, String name,
FileRegionProvider provider) {
this.bpid = bpid;
this.name = name;
this.provider = provider;
rewind();
}
@Override
public void close() throws IOException {
//No action needed
}
@Override
public ExtendedBlock nextBlock() throws IOException {
if (null == blockIterator || !blockIterator.hasNext()) {
return null;
}
FileRegion nextRegion = null;
while (null == nextRegion && blockIterator.hasNext()) {
FileRegion temp = blockIterator.next();
if (temp.getBlock().getBlockId() < state.lastBlockId) {
continue;
}
if (temp.getBlockPoolId().equals(bpid)) {
nextRegion = temp;
}
}
if (null == nextRegion) {
return null;
}
state.lastBlockId = nextRegion.getBlock().getBlockId();
return new ExtendedBlock(bpid, nextRegion.getBlock());
}
@Override
public boolean atEnd() {
return blockIterator != null ? !blockIterator.hasNext(): true;
}
@Override
public void rewind() {
blockIterator = provider.iterator();
state = new ProvidedBlockIteratorState();
}
@Override
public void save() throws IOException {
//We do not persist the state of this iterator anywhere, locally.
//We just re-scan provided volumes as necessary.
state.lastSavedMs = Time.now();
}
@Override
public void setMaxStalenessMs(long maxStalenessMs) {
//do not use max staleness
}
@Override
public long getIterStartMs() {
return state.iterStartMs;
}
@Override
public long getLastSavedMs() {
return state.lastSavedMs;
}
@Override
public String getBlockPoolId() {
return bpid;
}
public void load() throws IOException {
//on load, we just rewind the iterator for provided volumes.
rewind();
LOG.trace("load({}, {}): loaded iterator {}: {}", getStorageID(),
bpid, name, WRITER.writeValueAsString(state));
}
}
@Override
public BlockIterator newBlockIterator(String bpid, String name) {
return new ProviderBlockIteratorImpl(bpid, name,
bpSlices.get(bpid).getFileRegionProvider());
}
@Override
public BlockIterator loadBlockIterator(String bpid, String name)
throws IOException {
ProviderBlockIteratorImpl iter = new ProviderBlockIteratorImpl(bpid, name,
bpSlices.get(bpid).getFileRegionProvider());
iter.load();
return iter;
}
@Override
ReplicaInfo addFinalizedBlock(String bpid, Block b,
ReplicaInfo replicaInfo, long bytesReserved) throws IOException {
throw new UnsupportedOperationException(
"ProvidedVolume does not yet support writes");
}
@Override
public VolumeCheckResult check(VolumeCheckContext ignored)
throws DiskErrorException {
return VolumeCheckResult.HEALTHY;
}
@Override
void getVolumeMap(ReplicaMap volumeMap,
final RamDiskReplicaTracker ramDiskReplicaMap)
throws IOException {
LOG.info("Creating volumemap for provided volume " + this);
for(ProvidedBlockPoolSlice s : bpSlices.values()) {
s.getVolumeMap(volumeMap, ramDiskReplicaMap);
}
}
private ProvidedBlockPoolSlice getProvidedBlockPoolSlice(String bpid)
throws IOException {
ProvidedBlockPoolSlice bp = bpSlices.get(bpid);
if (bp == null) {
throw new IOException("block pool " + bpid + " is not found");
}
return bp;
}
@Override
void getVolumeMap(String bpid, ReplicaMap volumeMap,
final RamDiskReplicaTracker ramDiskReplicaMap)
throws IOException {
getProvidedBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
}
@VisibleForTesting
FileRegionProvider getFileRegionProvider(String bpid) throws IOException {
return getProvidedBlockPoolSlice(bpid).getFileRegionProvider();
}
@Override
public String toString() {
return this.baseURI.toString();
}
@Override
void addBlockPool(String bpid, Configuration conf) throws IOException {
addBlockPool(bpid, conf, null);
}
@Override
void addBlockPool(String bpid, Configuration conf, Timer timer)
throws IOException {
LOG.info("Adding block pool " + bpid +
" to volume with id " + getStorageID());
ProvidedBlockPoolSlice bp;
bp = new ProvidedBlockPoolSlice(bpid, this, conf);
bpSlices.put(bpid, bp);
}
void shutdown() {
if (cacheExecutor != null) {
cacheExecutor.shutdown();
}
Set<Entry<String, ProvidedBlockPoolSlice>> set = bpSlices.entrySet();
for (Entry<String, ProvidedBlockPoolSlice> entry : set) {
entry.getValue().shutdown(null);
}
}
@Override
void shutdownBlockPool(String bpid, BlockListAsLongs blocksListsAsLongs) {
ProvidedBlockPoolSlice bp = bpSlices.get(bpid);
if (bp != null) {
bp.shutdown(blocksListsAsLongs);
}
bpSlices.remove(bpid);
}
@Override
boolean isBPDirEmpty(String bpid) throws IOException {
return getProvidedBlockPoolSlice(bpid).isEmpty();
}
@Override
void deleteBPDirectories(String bpid, boolean force) throws IOException {
throw new UnsupportedOperationException(
"ProvidedVolume does not yet support writes");
}
@Override
public LinkedList<ScanInfo> compileReport(String bpid,
LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
throws InterruptedException, IOException {
LOG.info("Compiling report for volume: " + this + " bpid " + bpid);
//get the report from the appropriate block pool.
if(bpSlices.containsKey(bpid)) {
bpSlices.get(bpid).compileReport(report, reportCompiler);
}
return report;
}
@Override
public ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo,
long newGS, long estimateBlockLen) throws IOException {
throw new UnsupportedOperationException(
"ProvidedVolume does not yet support writes");
}
@Override
public ReplicaInPipeline createRbw(ExtendedBlock b) throws IOException {
throw new UnsupportedOperationException(
"ProvidedVolume does not yet support writes");
}
@Override
public ReplicaInPipeline convertTemporaryToRbw(ExtendedBlock b,
ReplicaInfo temp) throws IOException {
throw new UnsupportedOperationException(
"ProvidedVolume does not yet support writes");
}
@Override
public ReplicaInPipeline createTemporary(ExtendedBlock b)
throws IOException {
throw new UnsupportedOperationException(
"ProvidedVolume does not yet support writes");
}
@Override
public ReplicaInPipeline updateRURCopyOnTruncate(ReplicaInfo rur,
String bpid, long newBlockId, long recoveryId, long newlength)
throws IOException {
throw new UnsupportedOperationException(
"ProvidedVolume does not yet support writes");
}
@Override
public ReplicaInfo moveBlockToTmpLocation(ExtendedBlock block,
ReplicaInfo replicaInfo, int smallBufferSize,
Configuration conf) throws IOException {
throw new UnsupportedOperationException(
"ProvidedVolume does not yet support writes");
}
@Override
public File[] copyBlockToLazyPersistLocation(String bpId, long blockId,
long genStamp, ReplicaInfo replicaInfo, int smallBufferSize,
Configuration conf) throws IOException {
throw new UnsupportedOperationException(
"ProvidedVolume does not yet support writes");
}
}

View File

@ -686,7 +686,7 @@ public class Mover {
}
}
static class Cli extends Configured implements Tool {
public static class Cli extends Configured implements Tool {
private static final String USAGE = "Usage: hdfs mover "
+ "[-p <files/dirs> | -f <local file>]"
+ "\n\t-p <files/dirs>\ta space separated list of HDFS files/dirs to migrate."

View File

@ -39,7 +39,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
class FSImageCompression {
public class FSImageCompression {
/** Codec to use to save or load image, or null if the image is not compressed */
private CompressionCodec imageCodec;

View File

@ -658,6 +658,10 @@ public class NNStorage extends Storage implements Closeable,
void readProperties(StorageDirectory sd, StartupOption startupOption)
throws IOException {
Properties props = readPropertiesFile(sd.getVersionFile());
if (props == null) {
throw new IOException(
"Properties not found for storage directory " + sd);
}
if (HdfsServerConstants.RollingUpgradeStartupOption.ROLLBACK
.matches(startupOption)) {
int lv = Integer.parseInt(getProperty(props, sd, "layoutVersion"));
@ -975,7 +979,11 @@ public class NNStorage extends Storage implements Closeable,
StorageDirectory sd = sdit.next();
try {
Properties props = readPropertiesFile(sd.getVersionFile());
cid = props.getProperty("clusterID");
if (props == null) {
cid = null;
} else {
cid = props.getProperty("clusterID");
}
LOG.info("current cluster id for sd="+sd.getCurrentDir() +
";lv=" + layoutVersion + ";cid=" + cid);

View File

@ -4621,6 +4621,84 @@
</description>
</property>
<property>
<name>dfs.provider.class</name>
<value>org.apache.hadoop.hdfs.server.common.TextFileRegionProvider</value>
<description>
The class that is used to load information about blocks stored in
provided storages.
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TextFileRegionProvider
is used as the default, which expects the blocks to be specified
using a delimited text file.
</description>
</property>
<property>
<name>dfs.provided.df.class</name>
<value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.DefaultProvidedVolumeDF</value>
<description>
The class that is used to measure usage statistics of provided stores.
</description>
</property>
<property>
<name>dfs.provided.storage.id</name>
<value>DS-PROVIDED</value>
<description>
The storage ID used for provided stores.
</description>
</property>
<property>
<name>dfs.provided.blockformat.class</name>
<value>org.apache.hadoop.hdfs.server.common.TextFileRegionFormat</value>
<description>
The class that is used to specify the input format of the blocks on
provided storages. The default is
org.apache.hadoop.hdfs.server.common.TextFileRegionFormat which uses
file regions to describe blocks. The file regions are specified as a
delimited text file. Each file region is a 6-tuple containing the
block id, remote file path, offset into file, length of block, the
block pool id containing the block, and the generation stamp of the
block.
</description>
</property>
<property>
<name>dfs.provided.textprovider.delimiter</name>
<value>,</value>
<description>
The delimiter used when the provided block map is specified as
a text file.
</description>
</property>
<property>
<name>dfs.provided.textprovider.read.path</name>
<value></value>
<description>
The path specifying the provided block map as a text file, specified as
a URI.
</description>
</property>
<property>
<name>dfs.provided.textprovider.read.codec</name>
<value></value>
<description>
The codec used to de-compress the provided block map.
</description>
</property>
<property>
<name>dfs.provided.textprovider.write.path</name>
<value></value>
<description>
The path to which the provided block map should be written as a text
file, specified as a URI.
</description>
</property>
<property>
<name>dfs.lock.suppress.warning.interval</name>
<value>10s</value>

View File

@ -208,7 +208,7 @@ public class TestDFSRollback {
UpgradeUtilities.createDataNodeVersionFile(
dataCurrentDirs,
storageInfo,
UpgradeUtilities.getCurrentBlockPoolID(cluster));
UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
cluster.startDataNodes(conf, 1, false, StartupOption.ROLLBACK, null);
assertTrue(cluster.isDataNodeUp());
@ -256,7 +256,7 @@ public class TestDFSRollback {
NodeType.DATA_NODE);
UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
UpgradeUtilities.getCurrentBlockPoolID(cluster));
UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
startBlockPoolShouldFail(StartupOption.ROLLBACK,
cluster.getNamesystem().getBlockPoolId());
@ -283,7 +283,7 @@ public class TestDFSRollback {
NodeType.DATA_NODE);
UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
UpgradeUtilities.getCurrentBlockPoolID(cluster));
UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
startBlockPoolShouldFail(StartupOption.ROLLBACK,
cluster.getNamesystem().getBlockPoolId());

View File

@ -265,7 +265,7 @@ public class TestDFSStartupVersions {
conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY), "current");
log("DataNode version info", DATA_NODE, i, versions[i]);
UpgradeUtilities.createDataNodeVersionFile(storage,
versions[i].storageInfo, bpid, versions[i].blockPoolId);
versions[i].storageInfo, bpid, versions[i].blockPoolId, conf);
try {
cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
} catch (Exception ignore) {

View File

@ -290,7 +290,7 @@ public class TestDFSUpgrade {
UpgradeUtilities.getCurrentFsscTime(cluster), NodeType.DATA_NODE);
UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
UpgradeUtilities.getCurrentBlockPoolID(cluster));
UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
startBlockPoolShouldFail(StartupOption.REGULAR, UpgradeUtilities
.getCurrentBlockPoolID(null));
@ -308,7 +308,7 @@ public class TestDFSUpgrade {
NodeType.DATA_NODE);
UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
UpgradeUtilities.getCurrentBlockPoolID(cluster));
UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
// Ensure corresponding block pool failed to initialized
startBlockPoolShouldFail(StartupOption.REGULAR, UpgradeUtilities
.getCurrentBlockPoolID(null));

View File

@ -384,8 +384,10 @@ public class UpgradeUtilities {
new File(datanodeStorage.toString()));
sd.setStorageUuid(DatanodeStorage.generateUuid());
Properties properties = Storage.readPropertiesFile(sd.getVersionFile());
properties.setProperty("storageID", sd.getStorageUuid());
Storage.writeProperties(sd.getVersionFile(), properties);
if (properties != null) {
properties.setProperty("storageID", sd.getStorageUuid());
Storage.writeProperties(sd.getVersionFile(), properties);
}
retVal[i] = newDir;
}
@ -461,8 +463,9 @@ public class UpgradeUtilities {
* @param bpid Block pool Id
*/
public static void createDataNodeVersionFile(File[] parent,
StorageInfo version, String bpid) throws IOException {
createDataNodeVersionFile(parent, version, bpid, bpid);
StorageInfo version, String bpid, Configuration conf)
throws IOException {
createDataNodeVersionFile(parent, version, bpid, bpid, conf);
}
/**
@ -477,7 +480,8 @@ public class UpgradeUtilities {
* @param bpidToWrite Block pool Id to write into the version file
*/
public static void createDataNodeVersionFile(File[] parent,
StorageInfo version, String bpid, String bpidToWrite) throws IOException {
StorageInfo version, String bpid, String bpidToWrite, Configuration conf)
throws IOException {
DataStorage storage = new DataStorage(version);
storage.setDatanodeUuid("FixedDatanodeUuid");
@ -485,7 +489,7 @@ public class UpgradeUtilities {
for (int i = 0; i < parent.length; i++) {
File versionFile = new File(parent[i], "VERSION");
StorageDirectory sd = new StorageDirectory(parent[i].getParentFile());
DataStorage.createStorageID(sd, false);
DataStorage.createStorageID(sd, false, conf);
storage.writeProperties(versionFile, sd);
versionFiles[i] = versionFile;
File bpDir = BlockPoolSliceStorage.getBpRoot(bpid, parent[i]);

View File

@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat.*;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* Test for the text based block format for provided block maps.
*/
public class TestTextBlockFormat {
static final Path OUTFILE = new Path("hdfs://dummyServer:0000/dummyFile.txt");
void check(TextWriter.Options opts, final Path vp,
final Class<? extends CompressionCodec> vc) throws IOException {
TextFileRegionFormat mFmt = new TextFileRegionFormat() {
@Override
public TextWriter createWriter(Path file, CompressionCodec codec,
String delim, Configuration conf) throws IOException {
assertEquals(vp, file);
if (null == vc) {
assertNull(codec);
} else {
assertEquals(vc, codec.getClass());
}
return null; // ignored
}
};
mFmt.getWriter(opts);
}
@Test
public void testWriterOptions() throws Exception {
TextWriter.Options opts = TextWriter.defaults();
assertTrue(opts instanceof WriterOptions);
WriterOptions wopts = (WriterOptions) opts;
Path def = new Path(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
assertEquals(def, wopts.getFile());
assertNull(wopts.getCodec());
opts.filename(OUTFILE);
check(opts, OUTFILE, null);
opts.filename(OUTFILE);
opts.codec("gzip");
Path cp = new Path(OUTFILE.getParent(), OUTFILE.getName() + ".gz");
check(opts, cp, org.apache.hadoop.io.compress.GzipCodec.class);
}
@Test
public void testCSVReadWrite() throws Exception {
final DataOutputBuffer out = new DataOutputBuffer();
FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024);
FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024);
FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512);
try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), ",")) {
csv.store(r1);
csv.store(r2);
csv.store(r3);
}
Iterator<FileRegion> i3;
try (TextReader csv = new TextReader(null, null, null, ",") {
@Override
public InputStream createStream() {
DataInputBuffer in = new DataInputBuffer();
in.reset(out.getData(), 0, out.getLength());
return in;
}}) {
Iterator<FileRegion> i1 = csv.iterator();
assertEquals(r1, i1.next());
Iterator<FileRegion> i2 = csv.iterator();
assertEquals(r1, i2.next());
assertEquals(r2, i2.next());
assertEquals(r3, i2.next());
assertEquals(r2, i1.next());
assertEquals(r3, i1.next());
assertFalse(i1.hasNext());
assertFalse(i2.hasNext());
i3 = csv.iterator();
}
try {
i3.next();
} catch (IllegalStateException e) {
return;
}
fail("Invalid iterator");
}
@Test
public void testCSVReadWriteTsv() throws Exception {
final DataOutputBuffer out = new DataOutputBuffer();
FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024);
FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024);
FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512);
try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), "\t")) {
csv.store(r1);
csv.store(r2);
csv.store(r3);
}
Iterator<FileRegion> i3;
try (TextReader csv = new TextReader(null, null, null, "\t") {
@Override
public InputStream createStream() {
DataInputBuffer in = new DataInputBuffer();
in.reset(out.getData(), 0, out.getLength());
return in;
}}) {
Iterator<FileRegion> i1 = csv.iterator();
assertEquals(r1, i1.next());
Iterator<FileRegion> i2 = csv.iterator();
assertEquals(r1, i2.next());
assertEquals(r2, i2.next());
assertEquals(r3, i2.next());
assertEquals(r2, i1.next());
assertEquals(r3, i1.next());
assertFalse(i1.hasNext());
assertFalse(i2.hasNext());
i3 = csv.iterator();
}
try {
i3.next();
} catch (IllegalStateException e) {
return;
}
fail("Invalid iterator");
}
}

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@ -616,7 +617,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
this.datanode = datanode;
if (storage != null) {
for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
DataStorage.createStorageID(storage.getStorageDir(i), false);
DataStorage.createStorageID(storage.getStorageDir(i), false, conf);
}
this.datanodeUuid = storage.getDatanodeUuid();
} else {
@ -1352,8 +1353,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
public void checkAndUpdate(String bpid, long blockId, File diskFile,
File diskMetaFile, FsVolumeSpi vol) throws IOException {
public void checkAndUpdate(String bpid, ScanInfo info) throws IOException {
throw new UnsupportedOperationException();
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@ -94,8 +95,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
public void checkAndUpdate(String bpid, long blockId, File diskFile,
File diskMetaFile, FsVolumeSpi vol) {
public void checkAndUpdate(String bpid, ScanInfo info) {
return;
}
@Override

View File

@ -119,11 +119,12 @@ public class TestFsDatasetImpl {
private final static String BLOCKPOOL = "BP-TEST";
private static Storage.StorageDirectory createStorageDirectory(File root)
private static Storage.StorageDirectory createStorageDirectory(File root,
Configuration conf)
throws SecurityException, IOException {
Storage.StorageDirectory sd = new Storage.StorageDirectory(
StorageLocation.parse(root.toURI().toString()));
DataStorage.createStorageID(sd, false);
DataStorage.createStorageID(sd, false, conf);
return sd;
}
@ -137,7 +138,7 @@ public class TestFsDatasetImpl {
File loc = new File(BASE_DIR + "/data" + i);
dirStrings.add(new Path(loc.toString()).toUri().toString());
loc.mkdirs();
dirs.add(createStorageDirectory(loc));
dirs.add(createStorageDirectory(loc, conf));
when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
}
@ -197,7 +198,8 @@ public class TestFsDatasetImpl {
String pathUri = new Path(path).toUri().toString();
expectedVolumes.add(new File(pathUri).getAbsolutePath());
StorageLocation loc = StorageLocation.parse(pathUri);
Storage.StorageDirectory sd = createStorageDirectory(new File(path));
Storage.StorageDirectory sd = createStorageDirectory(
new File(path), conf);
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode), eq(loc),
@ -315,7 +317,8 @@ public class TestFsDatasetImpl {
String newVolumePath = BASE_DIR + "/newVolumeToRemoveLater";
StorageLocation loc = StorageLocation.parse(newVolumePath);
Storage.StorageDirectory sd = createStorageDirectory(new File(newVolumePath));
Storage.StorageDirectory sd = createStorageDirectory(
new File(newVolumePath), conf);
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode), eq(loc),
@ -348,7 +351,7 @@ public class TestFsDatasetImpl {
any(ReplicaMap.class),
any(RamDiskReplicaLruTracker.class));
Storage.StorageDirectory sd = createStorageDirectory(badDir);
Storage.StorageDirectory sd = createStorageDirectory(badDir, conf);
sd.lock();
DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode),
@ -492,7 +495,7 @@ public class TestFsDatasetImpl {
String path = BASE_DIR + "/newData0";
String pathUri = new Path(path).toUri().toString();
StorageLocation loc = StorageLocation.parse(pathUri);
Storage.StorageDirectory sd = createStorageDirectory(new File(path));
Storage.StorageDirectory sd = createStorageDirectory(new File(path), conf);
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
when(

View File

@ -0,0 +1,426 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.FileRegionProvider;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Basic test cases for provided implementation.
*/
public class TestProvidedImpl {
private static final Logger LOG =
LoggerFactory.getLogger(TestFsDatasetImpl.class);
private static final String BASE_DIR =
new FileSystemTestHelper().getTestRootDir();
private static final int NUM_LOCAL_INIT_VOLUMES = 1;
private static final int NUM_PROVIDED_INIT_VOLUMES = 1;
private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"};
private static final int NUM_PROVIDED_BLKS = 10;
private static final long BLK_LEN = 128 * 1024;
private static final int MIN_BLK_ID = 0;
private static final int CHOSEN_BP_ID = 0;
private static String providedBasePath = BASE_DIR;
private Configuration conf;
private DataNode datanode;
private DataStorage storage;
private FsDatasetImpl dataset;
private static Map<Long, String> blkToPathMap;
private static List<FsVolumeImpl> providedVolumes;
/**
* A simple FileRegion iterator for tests.
*/
public static class TestFileRegionIterator implements Iterator<FileRegion> {
private int numBlocks;
private int currentCount;
private String basePath;
public TestFileRegionIterator(String basePath, int minID, int numBlocks) {
this.currentCount = minID;
this.numBlocks = numBlocks;
this.basePath = basePath;
}
@Override
public boolean hasNext() {
return currentCount < numBlocks;
}
@Override
public FileRegion next() {
FileRegion region = null;
if (hasNext()) {
File newFile = new File(basePath, "file" + currentCount);
if(!newFile.exists()) {
try {
LOG.info("Creating file for blkid " + currentCount);
blkToPathMap.put((long) currentCount, newFile.getAbsolutePath());
LOG.info("Block id " + currentCount + " corresponds to file " +
newFile.getAbsolutePath());
newFile.createNewFile();
Writer writer = new OutputStreamWriter(
new FileOutputStream(newFile.getAbsolutePath()), "utf-8");
for(int i=0; i< BLK_LEN/(Integer.SIZE/8); i++) {
writer.write(currentCount);
}
writer.flush();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
region = new FileRegion(currentCount, new Path(newFile.toString()),
0, BLK_LEN, BLOCK_POOL_IDS[CHOSEN_BP_ID]);
currentCount++;
}
return region;
}
@Override
public void remove() {
//do nothing.
}
public void resetMinBlockId(int minId) {
currentCount = minId;
}
public void resetBlockCount(int numBlocks) {
this.numBlocks = numBlocks;
}
}
/**
* A simple FileRegion provider for tests.
*/
public static class TestFileRegionProvider
extends FileRegionProvider implements Configurable {
private Configuration conf;
private int minId;
private int numBlocks;
TestFileRegionProvider() {
minId = MIN_BLK_ID;
numBlocks = NUM_PROVIDED_BLKS;
}
@Override
public Iterator<FileRegion> iterator() {
return new TestFileRegionIterator(providedBasePath, minId, numBlocks);
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void refresh() {
//do nothing!
}
public void setMinBlkId(int minId) {
this.minId = minId;
}
public void setBlockCount(int numBlocks) {
this.numBlocks = numBlocks;
}
}
private static Storage.StorageDirectory createLocalStorageDirectory(
File root, Configuration conf)
throws SecurityException, IOException {
Storage.StorageDirectory sd =
new Storage.StorageDirectory(
StorageLocation.parse(root.toURI().toString()));
DataStorage.createStorageID(sd, false, conf);
return sd;
}
private static Storage.StorageDirectory createProvidedStorageDirectory(
String confString, Configuration conf)
throws SecurityException, IOException {
Storage.StorageDirectory sd =
new Storage.StorageDirectory(StorageLocation.parse(confString));
DataStorage.createStorageID(sd, false, conf);
return sd;
}
private static void createStorageDirs(DataStorage storage,
Configuration conf, int numDirs, int numProvidedDirs)
throws IOException {
List<Storage.StorageDirectory> dirs =
new ArrayList<Storage.StorageDirectory>();
List<String> dirStrings = new ArrayList<String>();
FileUtils.deleteDirectory(new File(BASE_DIR));
for (int i = 0; i < numDirs; i++) {
File loc = new File(BASE_DIR, "data" + i);
dirStrings.add(new Path(loc.toString()).toUri().toString());
loc.mkdirs();
dirs.add(createLocalStorageDirectory(loc, conf));
when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
}
for (int i = numDirs; i < numDirs + numProvidedDirs; i++) {
File loc = new File(BASE_DIR, "data" + i);
providedBasePath = loc.getAbsolutePath();
loc.mkdirs();
String dirString = "[PROVIDED]" +
new Path(loc.toString()).toUri().toString();
dirStrings.add(dirString);
dirs.add(createProvidedStorageDirectory(dirString, conf));
when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
}
String dataDir = StringUtils.join(",", dirStrings);
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir);
when(storage.dirIterator()).thenReturn(dirs.iterator());
when(storage.getNumStorageDirs()).thenReturn(numDirs + numProvidedDirs);
}
private int getNumVolumes() {
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
return volumes.size();
} catch (IOException e) {
return 0;
}
}
private void compareBlkFile(InputStream ins, String filepath)
throws FileNotFoundException, IOException {
try (ReadableByteChannel i = Channels.newChannel(
new FileInputStream(new File(filepath)))) {
try (ReadableByteChannel j = Channels.newChannel(ins)) {
ByteBuffer ib = ByteBuffer.allocate(4096);
ByteBuffer jb = ByteBuffer.allocate(4096);
while (true) {
int il = i.read(ib);
int jl = j.read(jb);
if (il < 0 || jl < 0) {
assertEquals(il, jl);
break;
}
ib.flip();
jb.flip();
int cmp = Math.min(ib.remaining(), jb.remaining());
for (int k = 0; k < cmp; ++k) {
assertEquals(ib.get(), jb.get());
}
ib.compact();
jb.compact();
}
}
}
}
@Before
public void setUp() throws IOException {
datanode = mock(DataNode.class);
storage = mock(DataStorage.class);
this.conf = new Configuration();
this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
when(datanode.getConf()).thenReturn(conf);
final DNConf dnConf = new DNConf(datanode);
when(datanode.getDnConf()).thenReturn(dnConf);
final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf);
when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
final ShortCircuitRegistry shortCircuitRegistry =
new ShortCircuitRegistry(conf);
when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);
this.conf.setClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
TestFileRegionProvider.class, FileRegionProvider.class);
blkToPathMap = new HashMap<Long, String>();
providedVolumes = new LinkedList<FsVolumeImpl>();
createStorageDirs(
storage, conf, NUM_LOCAL_INIT_VOLUMES, NUM_PROVIDED_INIT_VOLUMES);
dataset = new FsDatasetImpl(datanode, storage, conf);
FsVolumeReferences volumes = dataset.getFsVolumeReferences();
for (int i = 0; i < volumes.size(); i++) {
FsVolumeSpi vol = volumes.get(i);
if (vol.getStorageType() == StorageType.PROVIDED) {
providedVolumes.add((FsVolumeImpl) vol);
}
}
for (String bpid : BLOCK_POOL_IDS) {
dataset.addBlockPool(bpid, conf);
}
assertEquals(NUM_LOCAL_INIT_VOLUMES + NUM_PROVIDED_INIT_VOLUMES,
getNumVolumes());
assertEquals(0, dataset.getNumFailedVolumes());
}
@Test
public void testProvidedStorageID() throws IOException {
for (int i = 0; i < providedVolumes.size(); i++) {
assertEquals(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT,
providedVolumes.get(i).getStorageID());
}
}
@Test
public void testBlockLoad() throws IOException {
for (int i = 0; i < providedVolumes.size(); i++) {
FsVolumeImpl vol = providedVolumes.get(i);
ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
vol.getVolumeMap(volumeMap, null);
assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length);
for (int j = 0; j < BLOCK_POOL_IDS.length; j++) {
if (j != CHOSEN_BP_ID) {
//this block pool should not have any blocks
assertEquals(null, volumeMap.replicas(BLOCK_POOL_IDS[j]));
}
}
assertEquals(NUM_PROVIDED_BLKS,
volumeMap.replicas(BLOCK_POOL_IDS[CHOSEN_BP_ID]).size());
}
}
@Test
public void testProvidedBlockRead() throws IOException {
for (int id = 0; id < NUM_PROVIDED_BLKS; id++) {
ExtendedBlock eb = new ExtendedBlock(
BLOCK_POOL_IDS[CHOSEN_BP_ID], id, BLK_LEN,
HdfsConstants.GRANDFATHER_GENERATION_STAMP);
InputStream ins = dataset.getBlockInputStream(eb, 0);
String filepath = blkToPathMap.get((long) id);
compareBlkFile(ins, filepath);
}
}
@Test
public void testProvidedBlockIterator() throws IOException {
for (int i = 0; i < providedVolumes.size(); i++) {
FsVolumeImpl vol = providedVolumes.get(i);
BlockIterator iter =
vol.newBlockIterator(BLOCK_POOL_IDS[CHOSEN_BP_ID], "temp");
Set<Long> blockIdsUsed = new HashSet<Long>();
while(!iter.atEnd()) {
ExtendedBlock eb = iter.nextBlock();
long blkId = eb.getBlockId();
assertTrue(blkId >= MIN_BLK_ID && blkId < NUM_PROVIDED_BLKS);
//all block ids must be unique!
assertTrue(!blockIdsUsed.contains(blkId));
blockIdsUsed.add(blkId);
}
assertEquals(NUM_PROVIDED_BLKS, blockIdsUsed.size());
}
}
@Test
public void testRefresh() throws IOException {
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
for (int i = 0; i < providedVolumes.size(); i++) {
ProvidedVolumeImpl vol = (ProvidedVolumeImpl) providedVolumes.get(i);
TestFileRegionProvider provider = (TestFileRegionProvider)
vol.getFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
//equivalent to two new blocks appearing
provider.setBlockCount(NUM_PROVIDED_BLKS + 2);
//equivalent to deleting the first block
provider.setMinBlkId(MIN_BLK_ID + 1);
DirectoryScanner scanner = new DirectoryScanner(datanode, dataset, conf);
scanner.reconcile();
ReplicaInfo info = dataset.getBlockReplica(
BLOCK_POOL_IDS[CHOSEN_BP_ID], NUM_PROVIDED_BLKS + 1);
//new replica should be added to the dataset
assertTrue(info != null);
try {
info = dataset.getBlockReplica(BLOCK_POOL_IDS[CHOSEN_BP_ID], 0);
} catch(Exception ex) {
LOG.info("Exception expected: " + ex);
}
}
}
}

View File

@ -68,7 +68,10 @@ public class TestClusterId {
fsImage.getStorage().dirIterator(NNStorage.NameNodeDirType.IMAGE);
StorageDirectory sd = sdit.next();
Properties props = Storage.readPropertiesFile(sd.getVersionFile());
String cid = props.getProperty("clusterID");
String cid = null;
if (props != null) {
cid = props.getProperty("clusterID");
}
LOG.info("successfully formated : sd="+sd.getCurrentDir() + ";cid="+cid);
return cid;
}