HDFS-7830. DataNode does not release the volume lock when adding a volume fails. (Lei Xu via Colin P. McCabe)
(cherry picked from commit5c1036d598
) (cherry picked from commiteefca23e8c
)
This commit is contained in:
parent
9bd865f06d
commit
8241228db7
|
@ -818,6 +818,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
HDFS-7818. OffsetParam should return the default value instead of throwing
|
HDFS-7818. OffsetParam should return the default value instead of throwing
|
||||||
NPE when the value is unspecified. (Eric Payne via wheat9)
|
NPE when the value is unspecified. (Eric Payne via wheat9)
|
||||||
|
|
||||||
|
HDFS-7830. DataNode does not release the volume lock when adding a volume
|
||||||
|
fails. (Lei Xu via Colin P. Mccabe)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
|
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
|
||||||
|
|
||||||
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
|
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
|
||||||
|
|
|
@ -672,7 +672,7 @@ public abstract class Storage extends StorageInfo {
|
||||||
*/
|
*/
|
||||||
public void lock() throws IOException {
|
public void lock() throws IOException {
|
||||||
if (isShared()) {
|
if (isShared()) {
|
||||||
LOG.info("Locking is disabled");
|
LOG.info("Locking is disabled for " + this.root);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
FileLock newLock = tryLock();
|
FileLock newLock = tryLock();
|
||||||
|
|
|
@ -47,6 +47,7 @@ import javax.management.NotCompliantMBeanException;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
import javax.management.StandardMBean;
|
import javax.management.StandardMBean;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -377,6 +378,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
|
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public FsVolumeImpl createFsVolume(String storageUuid, File currentDir,
|
||||||
|
StorageType storageType) throws IOException {
|
||||||
|
return new FsVolumeImpl(this, storageUuid, currentDir, conf, storageType);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addVolume(final StorageLocation location,
|
public void addVolume(final StorageLocation location,
|
||||||
final List<NamespaceInfo> nsInfos)
|
final List<NamespaceInfo> nsInfos)
|
||||||
|
@ -396,8 +403,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
final Storage.StorageDirectory sd = builder.getStorageDirectory();
|
final Storage.StorageDirectory sd = builder.getStorageDirectory();
|
||||||
|
|
||||||
StorageType storageType = location.getStorageType();
|
StorageType storageType = location.getStorageType();
|
||||||
final FsVolumeImpl fsVolume = new FsVolumeImpl(
|
final FsVolumeImpl fsVolume =
|
||||||
this, sd.getStorageUuid(), sd.getCurrentDir(), this.conf, storageType);
|
createFsVolume(sd.getStorageUuid(), sd.getCurrentDir(), storageType);
|
||||||
final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume);
|
final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume);
|
||||||
ArrayList<IOException> exceptions = Lists.newArrayList();
|
ArrayList<IOException> exceptions = Lists.newArrayList();
|
||||||
|
|
||||||
|
@ -413,6 +420,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!exceptions.isEmpty()) {
|
if (!exceptions.isEmpty()) {
|
||||||
|
try {
|
||||||
|
sd.unlock();
|
||||||
|
} catch (IOException e) {
|
||||||
|
exceptions.add(e);
|
||||||
|
}
|
||||||
throw MultipleIOException.createIOException(exceptions);
|
throw MultipleIOException.createIOException(exceptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,18 +37,14 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.RandomAccessFile;
|
|
||||||
import java.nio.channels.FileChannel;
|
|
||||||
import java.nio.channels.FileLock;
|
|
||||||
import java.nio.channels.OverlappingFileLockException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -70,7 +66,6 @@ import static org.hamcrest.CoreMatchers.not;
|
||||||
import static org.hamcrest.core.Is.is;
|
import static org.hamcrest.core.Is.is;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
@ -538,31 +533,10 @@ public class TestDataNodeHotSwapVolumes {
|
||||||
private static void assertFileLocksReleased(Collection<String> dirs)
|
private static void assertFileLocksReleased(Collection<String> dirs)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
for (String dir: dirs) {
|
for (String dir: dirs) {
|
||||||
StorageLocation sl = StorageLocation.parse(dir);
|
|
||||||
File lockFile = new File(sl.getFile(), Storage.STORAGE_FILE_LOCK);
|
|
||||||
RandomAccessFile raf = null;
|
|
||||||
FileChannel channel = null;
|
|
||||||
FileLock lock = null;
|
|
||||||
try {
|
try {
|
||||||
raf = new RandomAccessFile(lockFile, "rws");
|
FsDatasetTestUtil.assertFileLockReleased(dir);
|
||||||
channel = raf.getChannel();
|
|
||||||
lock = channel.tryLock();
|
|
||||||
assertNotNull(String.format(
|
|
||||||
"Lock file at %s appears to be held by a different process.",
|
|
||||||
lockFile.getAbsolutePath()), lock);
|
|
||||||
} catch (OverlappingFileLockException e) {
|
|
||||||
fail(String.format("Must release lock file at %s.",
|
|
||||||
lockFile.getAbsolutePath()));
|
|
||||||
} finally {
|
|
||||||
if (lock != null) {
|
|
||||||
try {
|
|
||||||
lock.release();
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn(String.format("I/O error releasing file lock %s.",
|
LOG.warn(e);
|
||||||
lockFile.getAbsolutePath()), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
IOUtils.cleanup(null, channel, raf);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,13 +19,24 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.RandomAccessFile;
|
||||||
|
import java.nio.channels.FileChannel;
|
||||||
|
import java.nio.channels.FileLock;
|
||||||
|
import java.nio.channels.OverlappingFileLockException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
public class FsDatasetTestUtil {
|
public class FsDatasetTestUtil {
|
||||||
|
|
||||||
|
@ -72,4 +83,36 @@ public class FsDatasetTestUtil {
|
||||||
FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
|
FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
|
||||||
((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
|
((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Asserts that the storage lock file in the given directory has been
|
||||||
|
* released. This method works by trying to acquire the lock file itself. If
|
||||||
|
* locking fails here, then the main code must have failed to release it.
|
||||||
|
*
|
||||||
|
* @param dir the storage directory to check
|
||||||
|
* @throws IOException if there is an unexpected I/O error
|
||||||
|
*/
|
||||||
|
public static void assertFileLockReleased(String dir) throws IOException {
|
||||||
|
StorageLocation sl = StorageLocation.parse(dir);
|
||||||
|
File lockFile = new File(sl.getFile(), Storage.STORAGE_FILE_LOCK);
|
||||||
|
try (RandomAccessFile raf = new RandomAccessFile(lockFile, "rws");
|
||||||
|
FileChannel channel = raf.getChannel()) {
|
||||||
|
FileLock lock = channel.tryLock();
|
||||||
|
assertNotNull(String.format(
|
||||||
|
"Lock file at %s appears to be held by a different process.",
|
||||||
|
lockFile.getAbsolutePath()), lock);
|
||||||
|
if (lock != null) {
|
||||||
|
try {
|
||||||
|
lock.release();
|
||||||
|
} catch (IOException e) {
|
||||||
|
FsDatasetImpl.LOG.warn(String.format("I/O error releasing file lock %s.",
|
||||||
|
lockFile.getAbsolutePath()), e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (OverlappingFileLockException e) {
|
||||||
|
fail(String.format("Must release lock file at %s.",
|
||||||
|
lockFile.getAbsolutePath()));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,11 +35,13 @@ import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
|
import org.apache.hadoop.io.MultipleIOException;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.DiskChecker;
|
import org.apache.hadoop.util.DiskChecker;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Matchers;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
@ -57,12 +59,17 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.anyList;
|
||||||
import static org.mockito.Matchers.anyListOf;
|
import static org.mockito.Matchers.anyListOf;
|
||||||
|
import static org.mockito.Matchers.anyString;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
import static org.mockito.Mockito.doThrow;
|
import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ -291,4 +298,40 @@ public class TestFsDatasetImpl {
|
||||||
assertFalse(volumeList.getVolumes().contains(brokenVolume));
|
assertFalse(volumeList.getVolumes().contains(brokenVolume));
|
||||||
assertEquals(NUM_VOLUMES - 1, volumeList.getVolumes().size());
|
assertEquals(NUM_VOLUMES - 1, volumeList.getVolumes().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddVolumeFailureReleasesInUseLock() throws IOException {
|
||||||
|
FsDatasetImpl spyDataset = spy(dataset);
|
||||||
|
FsVolumeImpl mockVolume = mock(FsVolumeImpl.class);
|
||||||
|
File badDir = new File(BASE_DIR, "bad");
|
||||||
|
badDir.mkdirs();
|
||||||
|
doReturn(mockVolume).when(spyDataset)
|
||||||
|
.createFsVolume(anyString(), any(File.class), any(StorageType.class));
|
||||||
|
doThrow(new IOException("Failed to getVolumeMap()"))
|
||||||
|
.when(mockVolume).getVolumeMap(
|
||||||
|
anyString(),
|
||||||
|
any(ReplicaMap.class),
|
||||||
|
any(RamDiskReplicaLruTracker.class));
|
||||||
|
|
||||||
|
Storage.StorageDirectory sd = createStorageDirectory(badDir);
|
||||||
|
sd.lock();
|
||||||
|
DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd);
|
||||||
|
when(storage.prepareVolume(eq(datanode), eq(badDir),
|
||||||
|
Matchers.<List<NamespaceInfo>>any()))
|
||||||
|
.thenReturn(builder);
|
||||||
|
|
||||||
|
StorageLocation location = StorageLocation.parse(badDir.toString());
|
||||||
|
List<NamespaceInfo> nsInfos = Lists.newArrayList();
|
||||||
|
for (String bpid : BLOCK_POOL_IDS) {
|
||||||
|
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
spyDataset.addVolume(location, nsInfos);
|
||||||
|
fail("Expect to throw MultipleIOException");
|
||||||
|
} catch (MultipleIOException e) {
|
||||||
|
}
|
||||||
|
|
||||||
|
FsDatasetTestUtil.assertFileLockReleased(badDir.toString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue