HDFS-7830. DataNode does not release the volume lock when adding a volume fails. (Lei Xu via Colin P. McCabe)

(cherry picked from commit 5c1036d598)
This commit is contained in:
Colin Patrick Mccabe 2015-03-10 18:20:25 -07:00
parent f1b32d1436
commit eefca23e8c
6 changed files with 108 additions and 33 deletions

View File

@ -827,6 +827,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7818. OffsetParam should return the default value instead of throwing
NPE when the value is unspecified. (Eric Payne via wheat9)
HDFS-7830. DataNode does not release the volume lock when adding a volume
fails. (Lei Xu via Colin P. Mccabe)
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

View File

@ -672,7 +672,7 @@ public abstract class Storage extends StorageInfo {
*/
public void lock() throws IOException {
if (isShared()) {
LOG.info("Locking is disabled");
LOG.info("Locking is disabled for " + this.root);
return;
}
FileLock newLock = tryLock();

View File

@ -47,6 +47,7 @@ import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -377,6 +378,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
}
@VisibleForTesting
public FsVolumeImpl createFsVolume(String storageUuid, File currentDir,
StorageType storageType) throws IOException {
return new FsVolumeImpl(this, storageUuid, currentDir, conf, storageType);
}
@Override
public void addVolume(final StorageLocation location,
final List<NamespaceInfo> nsInfos)
@ -396,8 +403,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final Storage.StorageDirectory sd = builder.getStorageDirectory();
StorageType storageType = location.getStorageType();
final FsVolumeImpl fsVolume = new FsVolumeImpl(
this, sd.getStorageUuid(), sd.getCurrentDir(), this.conf, storageType);
final FsVolumeImpl fsVolume =
createFsVolume(sd.getStorageUuid(), sd.getCurrentDir(), storageType);
final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume);
ArrayList<IOException> exceptions = Lists.newArrayList();
@ -413,6 +420,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
if (!exceptions.isEmpty()) {
try {
sd.unlock();
} catch (IOException e) {
exceptions.add(e);
}
throw MultipleIOException.createIOException(exceptions);
}

View File

@ -37,18 +37,14 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -70,7 +66,6 @@ import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -538,31 +533,10 @@ public class TestDataNodeHotSwapVolumes {
private static void assertFileLocksReleased(Collection<String> dirs)
throws IOException {
for (String dir: dirs) {
StorageLocation sl = StorageLocation.parse(dir);
File lockFile = new File(sl.getFile(), Storage.STORAGE_FILE_LOCK);
RandomAccessFile raf = null;
FileChannel channel = null;
FileLock lock = null;
try {
raf = new RandomAccessFile(lockFile, "rws");
channel = raf.getChannel();
lock = channel.tryLock();
assertNotNull(String.format(
"Lock file at %s appears to be held by a different process.",
lockFile.getAbsolutePath()), lock);
} catch (OverlappingFileLockException e) {
fail(String.format("Must release lock file at %s.",
lockFile.getAbsolutePath()));
} finally {
if (lock != null) {
try {
lock.release();
} catch (IOException e) {
LOG.warn(String.format("I/O error releasing file lock %s.",
lockFile.getAbsolutePath()), e);
}
}
IOUtils.cleanup(null, channel, raf);
FsDatasetTestUtil.assertFileLockReleased(dir);
} catch (IOException e) {
LOG.warn(e);
}
}
}

View File

@ -19,13 +19,24 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.Collection;
import java.util.Random;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.io.IOUtils;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
public class FsDatasetTestUtil {
@ -72,4 +83,36 @@ public class FsDatasetTestUtil {
FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
}
/**
* Asserts that the storage lock file in the given directory has been
* released. This method works by trying to acquire the lock file itself. If
* locking fails here, then the main code must have failed to release it.
*
* @param dir the storage directory to check
* @throws IOException if there is an unexpected I/O error
*/
public static void assertFileLockReleased(String dir) throws IOException {
StorageLocation sl = StorageLocation.parse(dir);
File lockFile = new File(sl.getFile(), Storage.STORAGE_FILE_LOCK);
try (RandomAccessFile raf = new RandomAccessFile(lockFile, "rws");
FileChannel channel = raf.getChannel()) {
FileLock lock = channel.tryLock();
assertNotNull(String.format(
"Lock file at %s appears to be held by a different process.",
lockFile.getAbsolutePath()), lock);
if (lock != null) {
try {
lock.release();
} catch (IOException e) {
FsDatasetImpl.LOG.warn(String.format("I/O error releasing file lock %s.",
lockFile.getAbsolutePath()), e);
throw e;
}
}
} catch (OverlappingFileLockException e) {
fail(String.format("Must release lock file at %s.",
lockFile.getAbsolutePath()));
}
}
}

View File

@ -35,11 +35,13 @@ import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -57,12 +59,17 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -291,4 +298,40 @@ public class TestFsDatasetImpl {
assertFalse(volumeList.getVolumes().contains(brokenVolume));
assertEquals(NUM_VOLUMES - 1, volumeList.getVolumes().size());
}
@Test
public void testAddVolumeFailureReleasesInUseLock() throws IOException {
FsDatasetImpl spyDataset = spy(dataset);
FsVolumeImpl mockVolume = mock(FsVolumeImpl.class);
File badDir = new File(BASE_DIR, "bad");
badDir.mkdirs();
doReturn(mockVolume).when(spyDataset)
.createFsVolume(anyString(), any(File.class), any(StorageType.class));
doThrow(new IOException("Failed to getVolumeMap()"))
.when(mockVolume).getVolumeMap(
anyString(),
any(ReplicaMap.class),
any(RamDiskReplicaLruTracker.class));
Storage.StorageDirectory sd = createStorageDirectory(badDir);
sd.lock();
DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode), eq(badDir),
Matchers.<List<NamespaceInfo>>any()))
.thenReturn(builder);
StorageLocation location = StorageLocation.parse(badDir.toString());
List<NamespaceInfo> nsInfos = Lists.newArrayList();
for (String bpid : BLOCK_POOL_IDS) {
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
}
try {
spyDataset.addVolume(location, nsInfos);
fail("Expect to throw MultipleIOException");
} catch (MultipleIOException e) {
}
FsDatasetTestUtil.assertFileLockReleased(badDir.toString());
}
}