HDFS-7496. Fix FsVolume removal race conditions on the DataNode by reference-counting the volume instances (Lei (Eddy) Xu via Colin P. McCabe)

(cherry-picked from commit a17584936c)
This commit is contained in:
Colin Patrick Mccabe 2015-01-21 11:38:08 -08:00
parent 1dc5483054
commit 5ec2b6caa9
22 changed files with 597 additions and 201 deletions

View File

@ -466,6 +466,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7641. Update archival storage user doc for list/set/get block storage
policies. (yliu)
HDFS-7496. Fix FsVolume removal race conditions on the DataNode by
reference-counting the volume instances (lei via cmccabe)
HDFS-7610. Fix removal of dynamically added DN volumes (Lei (Eddy) Xu via
Colin P. McCabe)

View File

@ -26,7 +26,6 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
@ -49,10 +48,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
@ -125,6 +122,8 @@ class BlockReceiver implements Closeable {
private boolean syncOnClose;
private long restartBudget;
/** the reference of the volume where the block receiver writes to */
private ReplicaHandler replicaHandler;
/**
* for replaceBlock response
@ -179,48 +178,50 @@ class BlockReceiver implements Closeable {
// Open local disk out
//
if (isDatanode) { //replication or move
replicaInfo = datanode.data.createTemporary(storageType, block);
replicaHandler = datanode.data.createTemporary(storageType, block);
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist);
replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist);
datanode.notifyNamenodeReceivingBlock(
block, replicaInfo.getStorageUuid());
block, replicaHandler.getReplica().getStorageUuid());
break;
case PIPELINE_SETUP_STREAMING_RECOVERY:
replicaInfo = datanode.data.recoverRbw(
replicaHandler = datanode.data.recoverRbw(
block, newGs, minBytesRcvd, maxBytesRcvd);
block.setGenerationStamp(newGs);
break;
case PIPELINE_SETUP_APPEND:
replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
replicaHandler = datanode.data.append(block, newGs, minBytesRcvd);
if (datanode.blockScanner != null) { // remove from block scanner
datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
block.getLocalBlock());
}
block.setGenerationStamp(newGs);
datanode.notifyNamenodeReceivingBlock(
block, replicaInfo.getStorageUuid());
block, replicaHandler.getReplica().getStorageUuid());
break;
case PIPELINE_SETUP_APPEND_RECOVERY:
replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
replicaHandler = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
if (datanode.blockScanner != null) { // remove from block scanner
datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
block.getLocalBlock());
}
block.setGenerationStamp(newGs);
datanode.notifyNamenodeReceivingBlock(
block, replicaInfo.getStorageUuid());
block, replicaHandler.getReplica().getStorageUuid());
break;
case TRANSFER_RBW:
case TRANSFER_FINALIZED:
// this is a transfer destination
replicaInfo = datanode.data.createTemporary(storageType, block);
replicaHandler =
datanode.data.createTemporary(storageType, block);
break;
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);
}
}
replicaInfo = replicaHandler.getReplica();
this.dropCacheBehindWrites = (cachingStrategy.getDropBehind() == null) ?
datanode.getDnConf().dropCacheBehindWrites :
cachingStrategy.getDropBehind();
@ -339,6 +340,10 @@ class BlockReceiver implements Closeable {
finally{
IOUtils.closeStream(out);
}
if (replicaHandler != null) {
IOUtils.cleanup(null, replicaHandler);
replicaHandler = null;
}
if (measuredFlushTime) {
datanode.metrics.addFlushNanos(flushTotalNanos);
}
@ -950,15 +955,12 @@ class BlockReceiver implements Closeable {
//
byte[] buf = new byte[sizePartialChunk];
byte[] crcbuf = new byte[checksumSize];
ReplicaInputStreams instr = null;
try {
instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
try (ReplicaInputStreams instr =
datanode.data.getTmpInputStreams(block, blkoff, ckoff)) {
IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk);
// open meta file and read in crc value computer earlier
IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length);
} finally {
IOUtils.closeStream(instr);
}
// compute crc of partial chunk from data read in the block file.
@ -1244,28 +1246,7 @@ class BlockReceiver implements Closeable {
if (lastPacketInBlock) {
// Finalize the block and close the block file
try {
finalizeBlock(startTime);
} catch (ReplicaNotFoundException e) {
// Verify that the exception is due to volume removal.
FsVolumeSpi volume;
synchronized (datanode.data) {
volume = datanode.data.getVolume(block);
}
if (volume == null) {
// ReplicaInfo has been removed due to the corresponding data
// volume has been removed. Don't need to check disk error.
LOG.info(myString
+ ": BlockReceiver is interrupted because the block pool "
+ block.getBlockPoolId() + " has been removed.", e);
sendAckUpstream(ack, expected, totalAckTimeNanos, 0,
Status.OOB_INTERRUPTED);
running = false;
receiverThread.interrupt();
continue;
}
throw e;
}
finalizeBlock(startTime);
}
sendAckUpstream(ack, expected, totalAckTimeNanos,

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
@ -143,6 +144,8 @@ class BlockSender implements java.io.Closeable {
/** The file descriptor of the block being sent */
private FileDescriptor blockInFd;
/** The reference to the volume where the block is located */
private FsVolumeReference volumeRef;
// Cache-management related fields
private final long readaheadLength;
@ -257,6 +260,9 @@ class BlockSender implements java.io.Closeable {
this.transferToAllowed = datanode.getDnConf().transferToAllowed &&
(!is32Bit || length <= Integer.MAX_VALUE);
// Obtain a reference before reading data
this.volumeRef = datanode.data.getVolume(block).obtainReference();
/*
* (corruptChecksumOK, meta_file_exist): operation
* True, True: will verify checksum
@ -420,6 +426,10 @@ class BlockSender implements java.io.Closeable {
blockIn = null;
blockInFd = null;
}
if (volumeRef != null) {
IOUtils.cleanup(null, volumeRef);
volumeRef = null;
}
// throw IOException if there is any
if(ioe!= null) {
throw ioe;

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import java.io.Closeable;
import java.io.IOException;
/**
* This class includes a replica being actively written and the reference to
* the fs volume where this replica is located.
*/
public class ReplicaHandler implements Closeable {
private final ReplicaInPipelineInterface replica;
private final FsVolumeReference volumeReference;
public ReplicaHandler(
ReplicaInPipelineInterface replica, FsVolumeReference reference) {
this.replica = replica;
this.volumeReference = reference;
}
@Override
public void close() throws IOException {
if (this.volumeReference != null) {
volumeReference.close();
}
}
public ReplicaInPipelineInterface getReplica() {
return replica;
}
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@ -198,7 +199,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
public ReplicaInPipelineInterface createTemporary(StorageType storageType,
public ReplicaHandler createTemporary(StorageType storageType,
ExtendedBlock b) throws IOException;
/**
@ -208,7 +209,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
public ReplicaInPipelineInterface createRbw(StorageType storageType,
public ReplicaHandler createRbw(StorageType storageType,
ExtendedBlock b, boolean allowLazyPersist) throws IOException;
/**
@ -221,7 +222,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
public ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
public ReplicaHandler recoverRbw(ExtendedBlock b,
long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException;
/**
@ -241,7 +242,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* @return the meata info of the replica which is being written to
* @throws IOException
*/
public ReplicaInPipelineInterface append(ExtendedBlock b, long newGS,
public ReplicaHandler append(ExtendedBlock b, long newGS,
long expectedBlockLen) throws IOException;
/**
@ -254,8 +255,8 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* @return the meta info of the replica which is being written to
* @throws IOException
*/
public ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, long newGS,
long expectedBlockLen) throws IOException;
public ReplicaHandler recoverAppend(
ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException;
/**
* Recover a failed pipeline close

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable;
import java.io.IOException;
/**
* This is the interface for holding reference count as AutoClosable resource.
* It increases the reference count by one in the constructor, and decreases
* the reference count by one in {@link #close()}.
*
* <pre>
* {@code
* try (FsVolumeReference ref = volume.obtainReference()) {
* // Do IOs on the volume
* volume.createRwb(...);
* ...
* }
* }
* </pre>
*/
public interface FsVolumeReference extends Closeable {
/**
* Descrese the reference count of the volume.
* @throws IOException it never throws IOException.
*/
@Override
public void close() throws IOException;
/** Returns the underlying volume object */
public FsVolumeSpi getVolume();
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import org.apache.hadoop.hdfs.StorageType;
@ -26,6 +27,15 @@ import org.apache.hadoop.hdfs.StorageType;
* This is an interface for the underlying volume.
*/
public interface FsVolumeSpi {
/**
* Obtain a reference object that had increased 1 reference count of the
* volume.
*
* It is caller's responsibility to close {@link FsVolumeReference} to decrease
* the reference count on the volume.
*/
FsVolumeReference obtainReference() throws ClosedChannelException;
/** @return the StorageUuid of the volume */
public String getStorageID();

View File

@ -30,9 +30,12 @@ import org.apache.hadoop.io.IOUtils;
public class ReplicaInputStreams implements Closeable {
private final InputStream dataIn;
private final InputStream checksumIn;
private final FsVolumeReference volumeRef;
/** Create an object with a data input stream and a checksum input stream. */
public ReplicaInputStreams(FileDescriptor dataFd, FileDescriptor checksumFd) {
public ReplicaInputStreams(FileDescriptor dataFd, FileDescriptor checksumFd,
FsVolumeReference volumeRef) {
this.volumeRef = volumeRef;
this.dataIn = new FileInputStream(dataFd);
this.checksumIn = new FileInputStream(checksumFd);
}
@ -51,5 +54,6 @@ public class ReplicaInputStreams implements Closeable {
public void close() {
IOUtils.closeStream(dataIn);
IOUtils.closeStream(checksumIn);
IOUtils.cleanup(null, volumeRef);
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
@ -31,7 +32,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
@ -200,13 +203,13 @@ class FsDatasetAsyncDiskService {
* Delete the block file and meta file from the disk asynchronously, adjust
* dfsUsed statistics accordingly.
*/
void deleteAsync(FsVolumeImpl volume, File blockFile, File metaFile,
void deleteAsync(FsVolumeReference volumeRef, File blockFile, File metaFile,
ExtendedBlock block, String trashDirectory) {
LOG.info("Scheduling " + block.getLocalBlock()
+ " file " + blockFile + " for deletion");
ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
volume, blockFile, metaFile, block, trashDirectory);
execute(volume.getCurrentDir(), deletionTask);
volumeRef, blockFile, metaFile, block, trashDirectory);
execute(((FsVolumeImpl) volumeRef.getVolume()).getCurrentDir(), deletionTask);
}
/** A task for deleting a block file and its associated meta file, as well
@ -216,15 +219,17 @@ class FsDatasetAsyncDiskService {
* files are deleted immediately.
*/
class ReplicaFileDeleteTask implements Runnable {
final FsVolumeReference volumeRef;
final FsVolumeImpl volume;
final File blockFile;
final File metaFile;
final ExtendedBlock block;
final String trashDirectory;
ReplicaFileDeleteTask(FsVolumeImpl volume, File blockFile,
ReplicaFileDeleteTask(FsVolumeReference volumeRef, File blockFile,
File metaFile, ExtendedBlock block, String trashDirectory) {
this.volume = volume;
this.volumeRef = volumeRef;
this.volume = (FsVolumeImpl) volumeRef.getVolume();
this.blockFile = blockFile;
this.metaFile = metaFile;
this.block = block;
@ -281,6 +286,7 @@ class FsDatasetAsyncDiskService {
LOG.info("Deleted " + block.getBlockPoolId() + " "
+ block.getLocalBlock() + " file " + blockFile);
}
IOUtils.cleanup(null, volumeRef);
}
}
}

View File

@ -29,6 +29,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
@ -76,6 +77,7 @@ import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
@ -83,6 +85,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
@ -138,22 +141,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public StorageReport[] getStorageReports(String bpid)
throws IOException {
StorageReport[] reports;
List<StorageReport> reports;
synchronized (statsLock) {
List<FsVolumeImpl> curVolumes = getVolumes();
reports = new StorageReport[curVolumes.size()];
int i = 0;
reports = new ArrayList<>(curVolumes.size());
for (FsVolumeImpl volume : curVolumes) {
reports[i++] = new StorageReport(volume.toDatanodeStorage(),
false,
volume.getCapacity(),
volume.getDfsUsed(),
volume.getAvailable(),
volume.getBlockPoolUsed(bpid));
try (FsVolumeReference ref = volume.obtainReference()) {
StorageReport sr = new StorageReport(volume.toDatanodeStorage(),
false,
volume.getCapacity(),
volume.getDfsUsed(),
volume.getAvailable(),
volume.getBlockPoolUsed(bpid));
reports.add(sr);
} catch (ClosedChannelException e) {
continue;
}
}
}
return reports;
return reports.toArray(new StorageReport[reports.size()]);
}
@Override
@ -625,17 +632,24 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long ckoff) throws IOException {
ReplicaInfo info = getReplicaInfo(b);
File blockFile = info.getBlockFile();
RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
if (blkOffset > 0) {
blockInFile.seek(blkOffset);
FsVolumeReference ref = info.getVolume().obtainReference();
try {
File blockFile = info.getBlockFile();
RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
if (blkOffset > 0) {
blockInFile.seek(blkOffset);
}
File metaFile = info.getMetaFile();
RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r");
if (ckoff > 0) {
metaInFile.seek(ckoff);
}
return new ReplicaInputStreams(
blockInFile.getFD(), metaInFile.getFD(), ref);
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
}
File metaFile = info.getMetaFile();
RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r");
if (ckoff > 0) {
metaInFile.seek(ckoff);
}
return new ReplicaInputStreams(blockInFile.getFD(), metaInFile.getFD());
}
static File moveBlockFiles(Block b, File srcfile, File destdir)
@ -729,26 +743,27 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
+ replicaInfo.getVolume().getStorageType());
}
FsVolumeImpl targetVolume = volumes.getNextVolume(targetStorageType,
block.getNumBytes());
File oldBlockFile = replicaInfo.getBlockFile();
File oldMetaFile = replicaInfo.getMetaFile();
try (FsVolumeReference volumeRef = volumes.getNextVolume(
targetStorageType, block.getNumBytes())) {
File oldBlockFile = replicaInfo.getBlockFile();
File oldMetaFile = replicaInfo.getMetaFile();
FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
// Copy files to temp dir first
File[] blockFiles = copyBlockFiles(block.getBlockId(),
block.getGenerationStamp(), oldMetaFile, oldBlockFile,
targetVolume.getTmpDir(block.getBlockPoolId()),
replicaInfo.isOnTransientStorage());
// Copy files to temp dir first
File[] blockFiles = copyBlockFiles(block.getBlockId(),
block.getGenerationStamp(), oldMetaFile, oldBlockFile,
targetVolume.getTmpDir(block.getBlockPoolId()),
replicaInfo.isOnTransientStorage());
ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
targetVolume, blockFiles[0].getParentFile(), 0);
newReplicaInfo.setNumBytes(blockFiles[1].length());
// Finalize the copied files
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
targetVolume, blockFiles[0].getParentFile(), 0);
newReplicaInfo.setNumBytes(blockFiles[1].length());
// Finalize the copied files
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
}
// Replace the old block if any to reschedule the scanning.
datanode.getBlockScanner().addBlock(block);
@ -867,7 +882,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public synchronized ReplicaInPipeline append(ExtendedBlock b,
public synchronized ReplicaHandler append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
// If the block was successfully finalized because all packets
// were successfully processed at the Datanode but the ack for
@ -892,8 +907,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
" expected length is " + expectedBlockLen);
}
return append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS,
b.getNumBytes());
FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
ReplicaBeingWritten replica = null;
try {
replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS,
b.getNumBytes());
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
}
return new ReplicaHandler(replica, ref);
}
/** Append to a finalized replica
@ -1014,22 +1037,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return replicaInfo;
}
@Override // FsDatasetSpi
public synchronized ReplicaInPipeline recoverAppend(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
public synchronized ReplicaHandler recoverAppend(
ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
LOG.info("Recover failed append to " + b);
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
// change the replica's state/gs etc.
if (replicaInfo.getState() == ReplicaState.FINALIZED ) {
return append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo, newGS,
b.getNumBytes());
} else { //RBW
bumpReplicaGS(replicaInfo, newGS);
return (ReplicaBeingWritten)replicaInfo;
FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
ReplicaBeingWritten replica;
try {
// change the replica's state/gs etc.
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo,
newGS, b.getNumBytes());
} else { //RBW
bumpReplicaGS(replicaInfo, newGS);
replica = (ReplicaBeingWritten) replicaInfo;
}
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
}
return new ReplicaHandler(replica, ref);
}
@Override // FsDatasetSpi
@ -1077,8 +1108,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
public synchronized ReplicaInPipeline createRbw(StorageType storageType,
ExtendedBlock b, boolean allowLazyPersist) throws IOException {
public synchronized ReplicaHandler createRbw(
StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
if (replicaInfo != null) {
@ -1087,15 +1119,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
" and thus cannot be created.");
}
// create a new block
FsVolumeImpl v;
FsVolumeReference ref;
while (true) {
try {
if (allowLazyPersist) {
// First try to place the block on a transient volume.
v = volumes.getNextTransientVolume(b.getNumBytes());
ref = volumes.getNextTransientVolume(b.getNumBytes());
datanode.getMetrics().incrRamDiskBlocksWrite();
} else {
v = volumes.getNextVolume(storageType, b.getNumBytes());
ref = volumes.getNextVolume(storageType, b.getNumBytes());
}
} catch (DiskOutOfSpaceException de) {
if (allowLazyPersist) {
@ -1107,18 +1139,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
break;
}
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
// create an rbw file to hold block in the designated volume
File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
File f;
try {
f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
}
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
return newReplicaInfo;
return new ReplicaHandler(newReplicaInfo, ref);
}
@Override // FsDatasetSpi
public synchronized ReplicaInPipeline recoverRbw(ExtendedBlock b,
long newGS, long minBytesRcvd, long maxBytesRcvd)
public synchronized ReplicaHandler recoverRbw(
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
LOG.info("Recover RBW replica " + b);
@ -1157,20 +1196,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
minBytesRcvd + ", " + maxBytesRcvd + "].");
}
// Truncate the potentially corrupt portion.
// If the source was client and the last node in the pipeline was lost,
// any corrupt data written after the acked length can go unnoticed.
if (numBytes > bytesAcked) {
final File replicafile = rbw.getBlockFile();
truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
rbw.setNumBytes(bytesAcked);
rbw.setLastChecksumAndDataLen(bytesAcked, null);
}
FsVolumeReference ref = rbw.getVolume().obtainReference();
try {
// Truncate the potentially corrupt portion.
// If the source was client and the last node in the pipeline was lost,
// any corrupt data written after the acked length can go unnoticed.
if (numBytes > bytesAcked) {
final File replicafile = rbw.getBlockFile();
truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
rbw.setNumBytes(bytesAcked);
rbw.setLastChecksumAndDataLen(bytesAcked, null);
}
// bump the replica's generation stamp to newGS
bumpReplicaGS(rbw, newGS);
return rbw;
// bump the replica's generation stamp to newGS
bumpReplicaGS(rbw, newGS);
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
}
return new ReplicaHandler(rbw, ref);
}
@Override // FsDatasetSpi
@ -1235,8 +1279,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
public synchronized ReplicaInPipeline createTemporary(StorageType storageType,
ExtendedBlock b) throws IOException {
public synchronized ReplicaHandler createTemporary(
StorageType storageType, ExtendedBlock b) throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
if (replicaInfo != null) {
if (replicaInfo.getGenerationStamp() < b.getGenerationStamp()
@ -1251,14 +1295,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
" and thus cannot be created.");
}
}
FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
FsVolumeReference ref = volumes.getNextVolume(storageType, b.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
// create a temporary file to hold block in the designated volume
File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
File f;
try {
f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
}
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
b.getGenerationStamp(), v, f.getParentFile(), 0);
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
return newReplicaInfo;
return new ReplicaHandler(newReplicaInfo, ref);
}
/**
@ -1641,10 +1693,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Delete the block asynchronously to make sure we can do it fast enough.
// It's ok to unlink the block file before the uncache operation
// finishes.
asyncDiskService.deleteAsync(v, f,
FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
new ExtendedBlock(bpid, invalidBlks[i]),
dataStorage.getTrashDirectoryForBlockFile(bpid, f));
try {
asyncDiskService.deleteAsync(v.obtainReference(), f,
FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
new ExtendedBlock(bpid, invalidBlks[i]),
dataStorage.getTrashDirectoryForBlockFile(bpid, f));
} catch (ClosedChannelException e) {
LOG.warn("Volume " + v + " is closed, ignore the deletion task for " +
"block " + invalidBlks[i]);
}
}
if (!errors.isEmpty()) {
StringBuilder b = new StringBuilder("Failed to delete ")
@ -2291,9 +2348,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
for (FsVolumeImpl volume : getVolumes()) {
long used = 0;
long free = 0;
try {
try (FsVolumeReference ref = volume.obtainReference()) {
used = volume.getDfsUsed();
free = volume.getAvailable();
} catch (ClosedChannelException e) {
continue;
} catch (IOException e) {
LOG.warn(e.getMessage());
used = 0;
@ -2325,15 +2384,23 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
List<FsVolumeImpl> curVolumes = getVolumes();
if (!force) {
for (FsVolumeImpl volume : curVolumes) {
if (!volume.isBPDirEmpty(bpid)) {
LOG.warn(bpid + " has some block files, cannot delete unless forced");
throw new IOException("Cannot delete block pool, "
+ "it contains some block files");
try (FsVolumeReference ref = volume.obtainReference()) {
if (!volume.isBPDirEmpty(bpid)) {
LOG.warn(bpid + " has some block files, cannot delete unless forced");
throw new IOException("Cannot delete block pool, "
+ "it contains some block files");
}
} catch (ClosedChannelException e) {
// ignore.
}
}
}
for (FsVolumeImpl volume : curVolumes) {
volume.deleteBPDirectories(bpid, force);
try (FsVolumeReference ref = volume.obtainReference()) {
volume.deleteBPDirectories(bpid, force);
} catch (ClosedChannelException e) {
// ignore.
}
}
}
@ -2566,6 +2633,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
private boolean saveNextReplica() {
RamDiskReplica block = null;
FsVolumeReference targetReference;
FsVolumeImpl targetVolume;
ReplicaInfo replicaInfo;
boolean succeeded = false;
@ -2583,8 +2651,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if (replicaInfo != null &&
replicaInfo.getVolume().isTransientStorage()) {
// Pick a target volume to persist the block.
targetVolume = volumes.getNextVolume(
targetReference = volumes.getNextVolume(
StorageType.DEFAULT, replicaInfo.getNumBytes());
targetVolume = (FsVolumeImpl) targetReference.getVolume();
ramDiskReplicaTracker.recordStartLazyPersist(
block.getBlockPoolId(), block.getBlockId(), targetVolume);
@ -2600,7 +2669,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
block.getBlockPoolId(), block.getBlockId(),
replicaInfo.getGenerationStamp(), block.getCreationTime(),
replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
targetVolume);
targetReference);
}
}
}
@ -2624,9 +2693,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Don't worry about fragmentation for now. We don't expect more than one
// transient volume per DN.
for (FsVolumeImpl v : getVolumes()) {
if (v.isTransientStorage()) {
capacity += v.getCapacity();
free += v.getAvailable();
try (FsVolumeReference ref = v.obtainReference()) {
if (v.isTransientStorage()) {
capacity += v.getCapacity();
free += v.getAvailable();
}
} catch (ClosedChannelException e) {
// ignore.
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@ -31,6 +32,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
@ -40,8 +43,10 @@ import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.CloseableReferenceCount;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -62,6 +67,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
private final File currentDir; // <StorageDirectory>/current
private final DF usage;
private final long reserved;
private CloseableReferenceCount reference = new CloseableReferenceCount();
// Disk space reserved for open blocks.
private AtomicLong reservedForRbw;
@ -99,6 +105,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
if (storageType.isTransient()) {
return null;
}
if (dataset.datanode == null) {
// FsVolumeImpl is used in test.
return null;
}
final int maxNumThreads = dataset.datanode.getConf().getInt(
DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY,
@ -116,7 +126,114 @@ public class FsVolumeImpl implements FsVolumeSpi {
executor.allowCoreThreadTimeOut(true);
return executor;
}
private void printReferenceTraceInfo(String op) {
StackTraceElement[] stack = Thread.currentThread().getStackTrace();
for (StackTraceElement ste : stack) {
switch (ste.getMethodName()) {
case "getDfsUsed":
case "getBlockPoolUsed":
case "getAvailable":
case "getVolumeMap":
return;
default:
break;
}
}
FsDatasetImpl.LOG.trace("Reference count: " + op + " " + this + ": " +
this.reference.getReferenceCount());
FsDatasetImpl.LOG.trace(
Joiner.on("\n").join(Thread.currentThread().getStackTrace()));
}
/**
* Increase the reference count. The caller must increase the reference count
* before issuing IOs.
*
* @throws IOException if the volume is already closed.
*/
private void reference() throws ClosedChannelException {
this.reference.reference();
if (FsDatasetImpl.LOG.isTraceEnabled()) {
printReferenceTraceInfo("incr");
}
}
/**
* Decrease the reference count.
*/
private void unreference() {
if (FsDatasetImpl.LOG.isTraceEnabled()) {
printReferenceTraceInfo("desc");
}
if (FsDatasetImpl.LOG.isDebugEnabled()) {
if (reference.getReferenceCount() <= 0) {
FsDatasetImpl.LOG.debug("Decrease reference count <= 0 on " + this +
Joiner.on("\n").join(Thread.currentThread().getStackTrace()));
}
}
checkReference();
this.reference.unreference();
}
private static class FsVolumeReferenceImpl implements FsVolumeReference {
private final FsVolumeImpl volume;
FsVolumeReferenceImpl(FsVolumeImpl volume) throws ClosedChannelException {
this.volume = volume;
volume.reference();
}
/**
* Decreases the reference count.
* @throws IOException it never throws IOException.
*/
@Override
public void close() throws IOException {
volume.unreference();
}
@Override
public FsVolumeSpi getVolume() {
return this.volume;
}
}
@Override
public FsVolumeReference obtainReference() throws ClosedChannelException {
return new FsVolumeReferenceImpl(this);
}
private void checkReference() {
Preconditions.checkState(reference.getReferenceCount() > 0);
}
/**
* Close this volume and wait all other threads to release the reference count
* on this volume.
* @throws IOException if the volume is closed or the waiting is interrupted.
*/
void closeAndWait() throws IOException {
try {
this.reference.setClosed();
} catch (ClosedChannelException e) {
throw new IOException("The volume has already closed.", e);
}
final int SLEEP_MILLIS = 500;
while (this.reference.getReferenceCount() > 0) {
if (FsDatasetImpl.LOG.isDebugEnabled()) {
FsDatasetImpl.LOG.debug(String.format(
"The reference count for %s is %d, wait to be 0.",
this, reference.getReferenceCount()));
}
try {
Thread.sleep(SLEEP_MILLIS);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
}
File getCurrentDir() {
return currentDir;
}
@ -250,6 +367,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
* the block is finalized.
*/
File createTmpFile(String bpid, Block b) throws IOException {
checkReference();
return getBlockPoolSlice(bpid).createTmpFile(b);
}
@ -282,6 +400,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
* the block is finalized.
*/
File createRbwFile(String bpid, Block b) throws IOException {
checkReference();
reserveSpaceForRbw(b.getNumBytes());
return getBlockPoolSlice(bpid).createRbwFile(b);
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@ -59,6 +61,21 @@ class FsVolumeList {
return Collections.unmodifiableList(Arrays.asList(volumes.get()));
}
private FsVolumeReference chooseVolume(List<FsVolumeImpl> list, long blockSize)
throws IOException {
while (true) {
FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize);
try {
return volume.obtainReference();
} catch (ClosedChannelException e) {
FsDatasetImpl.LOG.warn("Chosen a closed volume: " + volume);
// blockChooser.chooseVolume returns DiskOutOfSpaceException when the list
// is empty, indicating that all volumes are closed.
list.remove(volume);
}
}
}
/**
* Get next volume.
*
@ -66,7 +83,7 @@ class FsVolumeList {
* @param storageType the desired {@link StorageType}
* @return next volume to store the block in.
*/
FsVolumeImpl getNextVolume(StorageType storageType, long blockSize)
FsVolumeReference getNextVolume(StorageType storageType, long blockSize)
throws IOException {
// Get a snapshot of currently available volumes.
final FsVolumeImpl[] curVolumes = volumes.get();
@ -76,7 +93,7 @@ class FsVolumeList {
list.add(v);
}
}
return blockChooser.chooseVolume(list, blockSize);
return chooseVolume(list, blockSize);
}
/**
@ -85,7 +102,7 @@ class FsVolumeList {
* @param blockSize free space needed on the volume
* @return next volume to store the block in.
*/
FsVolumeImpl getNextTransientVolume(long blockSize) throws IOException {
FsVolumeReference getNextTransientVolume(long blockSize) throws IOException {
// Get a snapshot of currently available volumes.
final List<FsVolumeImpl> curVolumes = getVolumes();
final List<FsVolumeImpl> list = new ArrayList<>(curVolumes.size());
@ -94,13 +111,17 @@ class FsVolumeList {
list.add(v);
}
}
return blockChooser.chooseVolume(list, blockSize);
return chooseVolume(list, blockSize);
}
long getDfsUsed() throws IOException {
long dfsUsed = 0L;
for (FsVolumeImpl v : volumes.get()) {
dfsUsed += v.getDfsUsed();
try(FsVolumeReference ref = v.obtainReference()) {
dfsUsed += v.getDfsUsed();
} catch (ClosedChannelException e) {
// ignore.
}
}
return dfsUsed;
}
@ -108,7 +129,11 @@ class FsVolumeList {
long getBlockPoolUsed(String bpid) throws IOException {
long dfsUsed = 0L;
for (FsVolumeImpl v : volumes.get()) {
dfsUsed += v.getBlockPoolUsed(bpid);
try (FsVolumeReference ref = v.obtainReference()) {
dfsUsed += v.getBlockPoolUsed(bpid);
} catch (ClosedChannelException e) {
// ignore.
}
}
return dfsUsed;
}
@ -116,7 +141,11 @@ class FsVolumeList {
long getCapacity() {
long capacity = 0L;
for (FsVolumeImpl v : volumes.get()) {
capacity += v.getCapacity();
try (FsVolumeReference ref = v.obtainReference()) {
capacity += v.getCapacity();
} catch (IOException e) {
// ignore.
}
}
return capacity;
}
@ -124,7 +153,11 @@ class FsVolumeList {
long getRemaining() throws IOException {
long remaining = 0L;
for (FsVolumeSpi vol : volumes.get()) {
remaining += vol.getAvailable();
try (FsVolumeReference ref = vol.obtainReference()) {
remaining += vol.getAvailable();
} catch (ClosedChannelException e) {
// ignore
}
}
return remaining;
}
@ -140,7 +173,7 @@ class FsVolumeList {
for (final FsVolumeImpl v : volumes.get()) {
Thread t = new Thread() {
public void run() {
try {
try (FsVolumeReference ref = v.obtainReference()) {
FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
bpid + " on volume " + v + "...");
long startTime = Time.monotonicNow();
@ -148,6 +181,9 @@ class FsVolumeList {
long timeTaken = Time.monotonicNow() - startTime;
FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
+ " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
} catch (ClosedChannelException e) {
FsDatasetImpl.LOG.info("The volume " + v + " is closed while " +
"addng replicas, ignored.");
} catch (IOException ioe) {
FsDatasetImpl.LOG.info("Caught exception while adding replicas " +
"from " + v + ". Will throw later.", ioe);
@ -190,16 +226,21 @@ class FsVolumeList {
for(Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) {
final FsVolumeImpl fsv = i.next();
try {
try (FsVolumeReference ref = fsv.obtainReference()) {
fsv.checkDirs();
} catch (DiskErrorException e) {
FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ",e);
FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e);
if (removedVols == null) {
removedVols = new ArrayList<FsVolumeImpl>(1);
removedVols = new ArrayList<>(1);
}
removedVols.add(fsv);
removeVolume(fsv);
numFailedVolumes++;
} catch (ClosedChannelException e) {
FsDatasetImpl.LOG.debug("Caught exception when obtaining " +
"reference count on closed volume", e);
} catch (IOException e) {
FsDatasetImpl.LOG.error("Unexpected IOException", e);
}
}
@ -222,7 +263,6 @@ class FsVolumeList {
* @param newVolume the instance of new FsVolumeImpl.
*/
void addVolume(FsVolumeImpl newVolume) {
// Make a copy of volumes to add new volumes.
while (true) {
final FsVolumeImpl[] curVolumes = volumes.get();
final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
@ -253,6 +293,12 @@ class FsVolumeList {
if (volumeList.remove(target)) {
if (volumes.compareAndSet(curVolumes,
volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
try {
target.closeAndWait();
} catch (IOException e) {
FsDatasetImpl.LOG.warn(
"Error occurs when waiting volume to close: " + target, e);
}
target.shutdown();
FsDatasetImpl.LOG.info("Removed volume: " + target);
break;
@ -302,7 +348,7 @@ class FsVolumeList {
for (final FsVolumeImpl v : volumes.get()) {
Thread t = new Thread() {
public void run() {
try {
try (FsVolumeReference ref = v.obtainReference()) {
FsDatasetImpl.LOG.info("Scanning block pool " + bpid +
" on volume " + v + "...");
long startTime = Time.monotonicNow();
@ -310,6 +356,8 @@ class FsVolumeList {
long timeTaken = Time.monotonicNow() - startTime;
FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid +
" on " + v + ": " + timeTaken + "ms");
} catch (ClosedChannelException e) {
// ignore.
} catch (IOException ioe) {
FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
". Will throw later.", ioe);

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import java.io.File;
import java.io.IOException;
@ -175,13 +176,14 @@ class RamDiskAsyncLazyPersistService {
void submitLazyPersistTask(String bpId, long blockId,
long genStamp, long creationTime,
File metaFile, File blockFile,
FsVolumeImpl targetVolume) throws IOException {
FsVolumeReference target) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("LazyWriter schedule async task to persist RamDisk block pool id: "
+ bpId + " block id: " + blockId);
}
File lazyPersistDir = targetVolume.getLazyPersistDir(bpId);
FsVolumeImpl volume = (FsVolumeImpl)target.getVolume();
File lazyPersistDir = volume.getLazyPersistDir(bpId);
if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) {
FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir);
throw new IOException("LazyWriter fail to find or create lazy persist dir: "
@ -190,8 +192,8 @@ class RamDiskAsyncLazyPersistService {
ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask(
bpId, blockId, genStamp, creationTime, blockFile, metaFile,
targetVolume, lazyPersistDir);
execute(targetVolume.getCurrentDir(), lazyPersistTask);
target, lazyPersistDir);
execute(volume.getCurrentDir(), lazyPersistTask);
}
class ReplicaLazyPersistTask implements Runnable {
@ -201,13 +203,13 @@ class RamDiskAsyncLazyPersistService {
final long creationTime;
final File blockFile;
final File metaFile;
final FsVolumeImpl targetVolume;
final FsVolumeReference targetVolume;
final File lazyPersistDir;
ReplicaLazyPersistTask(String bpId, long blockId,
long genStamp, long creationTime,
File blockFile, File metaFile,
FsVolumeImpl targetVolume, File lazyPersistDir) {
FsVolumeReference targetVolume, File lazyPersistDir) {
this.bpId = bpId;
this.blockId = blockId;
this.genStamp = genStamp;
@ -236,7 +238,7 @@ class RamDiskAsyncLazyPersistService {
// Lock FsDataSetImpl during onCompleteLazyPersist callback
datanode.getFSDataset().onCompleteLazyPersist(bpId, blockId,
creationTime, targetFiles, targetVolume);
creationTime, targetFiles, (FsVolumeImpl)targetVolume.getVolume());
succeeded = true;
} catch (Exception e){
FsDatasetImpl.LOG.warn(

View File

@ -220,7 +220,7 @@ enum Status {
CHECKSUM_OK = 6;
ERROR_UNSUPPORTED = 7;
OOB_RESTART = 8; // Quick restart
OOB_INTERRUPTED = 9; // Interrupted
OOB_RESERVED1 = 9; // Reserved
OOB_RESERVED2 = 10; // Reserved
OOB_RESERVED3 = 11; // Reserved
IN_PROGRESS = 12;

View File

@ -97,7 +97,7 @@ public class TestWriteBlockGetsBlockLengthHint {
* correctly propagate the hint to FsDatasetSpi.
*/
@Override
public synchronized ReplicaInPipelineInterface createRbw(
public synchronized ReplicaHandler createRbw(
StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
throws IOException {
assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));

View File

@ -22,6 +22,7 @@ import java.io.FileDescriptor;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -43,8 +44,8 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
@ -147,7 +148,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
oStream = null;
}
}
@Override
public String getStorageUuid() {
return storage.getStorageUuid();
@ -431,6 +432,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
this.storage = storage;
}
@Override
public FsVolumeReference obtainReference() throws ClosedChannelException {
return null;
}
@Override
public String getStorageID() {
return storage.getStorageUuid();
@ -780,8 +786,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override // FsDatasetSpi
public synchronized ReplicaInPipelineInterface append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
public synchronized ReplicaHandler append(
ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null || !binfo.isFinalized()) {
@ -789,12 +795,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
+ " is not valid, and cannot be appended to.");
}
binfo.unfinalizeBlock();
return binfo;
return new ReplicaHandler(binfo, null);
}
@Override // FsDatasetSpi
public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
public synchronized ReplicaHandler recoverAppend(
ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) {
@ -807,7 +813,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
map.remove(b);
binfo.theBlock.setGenerationStamp(newGS);
map.put(binfo.theBlock, binfo);
return binfo;
return new ReplicaHandler(binfo, null);
}
@Override // FsDatasetSpi
@ -829,8 +835,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override // FsDatasetSpi
public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException {
public synchronized ReplicaHandler recoverRbw(
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock());
if ( binfo == null) {
@ -844,18 +851,18 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
map.remove(b);
binfo.theBlock.setGenerationStamp(newGS);
map.put(binfo.theBlock, binfo);
return binfo;
return new ReplicaHandler(binfo, null);
}
@Override // FsDatasetSpi
public synchronized ReplicaInPipelineInterface createRbw(
public synchronized ReplicaHandler createRbw(
StorageType storageType, ExtendedBlock b,
boolean allowLazyPersist) throws IOException {
return createTemporary(storageType, b);
}
@Override // FsDatasetSpi
public synchronized ReplicaInPipelineInterface createTemporary(
public synchronized ReplicaHandler createTemporary(
StorageType storageType, ExtendedBlock b) throws IOException {
if (isValidBlock(b)) {
throw new ReplicaAlreadyExistsException("Block " + b +
@ -868,7 +875,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true);
map.put(binfo.theBlock, binfo);
return binfo;
return new ReplicaHandler(binfo, null);
}
synchronized InputStream getBlockInputStream(ExtendedBlock b

View File

@ -553,7 +553,7 @@ public class TestBlockRecovery {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(
StorageType.DEFAULT, block, false);
StorageType.DEFAULT, block, false).getReplica();
ReplicaOutputStreams streams = null;
try {
streams = replicaInfo.createStreams(true,

View File

@ -56,6 +56,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
@ -568,7 +570,7 @@ public class TestDataNodeHotSwapVolumes {
@Test(timeout=180000)
public void testRemoveVolumeBeingWritten()
throws InterruptedException, TimeoutException, ReconfigurationException,
IOException {
IOException, BrokenBarrierException {
// test against removing volumes on the different DataNode on the pipeline.
for (int i = 0; i < 3; i++) {
testRemoveVolumeBeingWrittenForDatanode(i);
@ -582,7 +584,7 @@ public class TestDataNodeHotSwapVolumes {
*/
private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx)
throws IOException, ReconfigurationException, TimeoutException,
InterruptedException {
InterruptedException, BrokenBarrierException {
// Starts DFS cluster with 3 DataNodes to form a pipeline.
startDFSCluster(1, 3);
@ -599,11 +601,27 @@ public class TestDataNodeHotSwapVolumes {
out.write(writeBuf);
out.hflush();
List<String> oldDirs = getDataDirs(dn);
String newDirs = oldDirs.get(1); // Remove the first volume.
dn.reconfigurePropertyImpl(
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
final CyclicBarrier barrier = new CyclicBarrier(2);
List<String> oldDirs = getDataDirs(dn);
final String newDirs = oldDirs.get(1); // Remove the first volume.
final List<Exception> exceptions = new ArrayList<>();
Thread reconfigThread = new Thread() {
public void run() {
try {
barrier.await();
dn.reconfigurePropertyImpl(
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
} catch (ReconfigurationException |
InterruptedException |
BrokenBarrierException e) {
exceptions.add(e);
}
}
};
reconfigThread.start();
barrier.await();
rb.nextBytes(writeBuf);
out.write(writeBuf);
out.hflush();
@ -614,5 +632,10 @@ public class TestDataNodeHotSwapVolumes {
// Read the content back
byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
assertEquals(BLOCK_SIZE, content.length);
reconfigThread.join();
if (!exceptions.isEmpty()) {
throw new IOException(exceptions.get(0).getCause());
}
}
}

View File

@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.LinkedList;
import java.util.List;
@ -44,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.io.IOUtils;
@ -539,7 +541,12 @@ public class TestDirectoryScanner {
public String[] getBlockPoolList() {
return new String[0];
}
@Override
public FsVolumeReference obtainReference() throws ClosedChannelException {
return null;
}
@Override
public long getAvailable() throws IOException {
return 0;

View File

@ -67,7 +67,7 @@ public class TestSimulatedFSDataset {
// we pass expected len as zero, - fsdataset should use the sizeof actual
// data written
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(
StorageType.DEFAULT, b, false);
StorageType.DEFAULT, b, false).getReplica();
ReplicaOutputStreams out = bInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
try {

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@ -164,14 +165,16 @@ public class TestFsDatasetImpl {
assertEquals(actualVolumes, expectedVolumes);
}
@Test
@Test(timeout = 30000)
public void testRemoveVolumes() throws IOException {
// Feed FsDataset with block metadata.
final int NUM_BLOCKS = 100;
for (int i = 0; i < NUM_BLOCKS; i++) {
String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length];
ExtendedBlock eb = new ExtendedBlock(bpid, i);
dataset.createRbw(StorageType.DEFAULT, eb, false);
try (ReplicaHandler replica =
dataset.createRbw(StorageType.DEFAULT, eb, false)) {
}
}
final String[] dataDirs =
conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
@ -148,7 +149,8 @@ public class TestWriteToReplica {
};
ReplicaMap replicasMap = dataSet.volumeMap;
FsVolumeImpl vol = dataSet.volumes.getNextVolume(StorageType.DEFAULT, 0);
FsVolumeImpl vol = (FsVolumeImpl) dataSet.volumes
.getNextVolume(StorageType.DEFAULT, 0).getVolume();
ReplicaInfo replicaInfo = new FinalizedReplica(
blocks[FINALIZED].getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
replicasMap.add(bpid, replicaInfo);
@ -157,10 +159,10 @@ public class TestWriteToReplica {
replicasMap.add(bpid, new ReplicaInPipeline(
blocks[TEMPORARY].getBlockId(),
blocks[TEMPORARY].getGenerationStamp(), vol,
blocks[TEMPORARY].getGenerationStamp(), vol,
vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock()).getParentFile(), 0));
replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol,
replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol,
vol.createRbwFile(bpid, blocks[RBW].getLocalBlock()).getParentFile(), null);
replicasMap.add(bpid, replicaInfo);
replicaInfo.getBlockFile().createNewFile();
@ -489,8 +491,8 @@ public class TestWriteToReplica {
long newGenStamp = blocks[NON_EXISTENT].getGenerationStamp() * 10;
blocks[NON_EXISTENT].setGenerationStamp(newGenStamp);
try {
ReplicaInPipeline replicaInfo =
dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
ReplicaInPipelineInterface replicaInfo =
dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]).getReplica();
Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp);
Assert.assertTrue(
replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId());