HDFS-14459. ClosedChannelException silently ignored in FsVolumeList.addBlockPool(). Contributed by Stephen O'Donnell.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
parent
5840df86d7
commit
b0799148cf
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -35,6 +36,28 @@ public class AddBlockPoolException extends RuntimeException {
|
||||||
this.unhealthyDataDirs = unhealthyDataDirs;
|
this.unhealthyDataDirs = unhealthyDataDirs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public AddBlockPoolException() {
|
||||||
|
this.unhealthyDataDirs = new ConcurrentHashMap<FsVolumeSpi, IOException>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void mergeException(AddBlockPoolException e) {
|
||||||
|
if (e == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for(FsVolumeSpi v : e.unhealthyDataDirs.keySet()) {
|
||||||
|
// If there is already an exception for this volume, keep the original
|
||||||
|
// exception and discard the new one. It is likely the first
|
||||||
|
// exception caused the second or they were both due to the disk issue
|
||||||
|
if (!unhealthyDataDirs.containsKey(v)) {
|
||||||
|
unhealthyDataDirs.put(v, e.unhealthyDataDirs.get(v));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean hasExceptions() {
|
||||||
|
return !unhealthyDataDirs.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
public Map<FsVolumeSpi, IOException> getFailingVolumes() {
|
public Map<FsVolumeSpi, IOException> getFailingVolumes() {
|
||||||
return unhealthyDataDirs;
|
return unhealthyDataDirs;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2829,11 +2829,23 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
public void addBlockPool(String bpid, Configuration conf)
|
public void addBlockPool(String bpid, Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
LOG.info("Adding block pool " + bpid);
|
LOG.info("Adding block pool " + bpid);
|
||||||
|
AddBlockPoolException volumeExceptions = new AddBlockPoolException();
|
||||||
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||||
volumes.addBlockPool(bpid, conf);
|
try {
|
||||||
|
volumes.addBlockPool(bpid, conf);
|
||||||
|
} catch (AddBlockPoolException e) {
|
||||||
|
volumeExceptions.mergeException(e);
|
||||||
|
}
|
||||||
volumeMap.initBlockPool(bpid);
|
volumeMap.initBlockPool(bpid);
|
||||||
}
|
}
|
||||||
volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
|
try {
|
||||||
|
volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
|
||||||
|
} catch (AddBlockPoolException e) {
|
||||||
|
volumeExceptions.mergeException(e);
|
||||||
|
}
|
||||||
|
if (volumeExceptions.hasExceptions()) {
|
||||||
|
throw volumeExceptions;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -203,9 +203,6 @@ class FsVolumeList {
|
||||||
long timeTaken = Time.monotonicNow() - startTime;
|
long timeTaken = Time.monotonicNow() - startTime;
|
||||||
FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
|
FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
|
||||||
+ " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
|
+ " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
|
||||||
} catch (ClosedChannelException e) {
|
|
||||||
FsDatasetImpl.LOG.info("The volume " + v + " is closed while " +
|
|
||||||
"adding replicas, ignored.");
|
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
FsDatasetImpl.LOG.info("Caught exception while adding replicas " +
|
FsDatasetImpl.LOG.info("Caught exception while adding replicas " +
|
||||||
"from " + v + ". Will throw later.", ioe);
|
"from " + v + ". Will throw later.", ioe);
|
||||||
|
@ -413,8 +410,6 @@ class FsVolumeList {
|
||||||
long timeTaken = Time.monotonicNow() - startTime;
|
long timeTaken = Time.monotonicNow() - startTime;
|
||||||
FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid +
|
FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid +
|
||||||
" on " + v + ": " + timeTaken + "ms");
|
" on " + v + ": " + timeTaken + "ms");
|
||||||
} catch (ClosedChannelException e) {
|
|
||||||
// ignore.
|
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
|
FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
|
||||||
". Will throw later.", ioe);
|
". Will throw later.", ioe);
|
||||||
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests to ensure AddBlockPoolException behaves correctly when additional
|
||||||
|
* exceptions are merged, as this exception is a wrapper for multiple
|
||||||
|
* exceptions and hence carries some additional logic.
|
||||||
|
*/
|
||||||
|
public class TestAddBlockPoolException {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHasExeceptionsReturnsCorrectValue() {
|
||||||
|
AddBlockPoolException e = new AddBlockPoolException();
|
||||||
|
assertFalse(e.hasExceptions());
|
||||||
|
|
||||||
|
FsVolumeImpl fakeVol = mock(FsVolumeImpl.class);
|
||||||
|
ConcurrentHashMap<FsVolumeSpi, IOException> vols =
|
||||||
|
new ConcurrentHashMap<FsVolumeSpi, IOException>();
|
||||||
|
vols.put(fakeVol, new IOException("Error 1"));
|
||||||
|
e = new AddBlockPoolException(vols);
|
||||||
|
assertTrue(e.hasExceptions());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExceptionsCanBeMerged() {
|
||||||
|
FsVolumeImpl vol1 = mock(FsVolumeImpl.class);
|
||||||
|
FsVolumeImpl vol2 = mock(FsVolumeImpl.class);
|
||||||
|
|
||||||
|
ConcurrentHashMap<FsVolumeSpi, IOException> first =
|
||||||
|
new ConcurrentHashMap<FsVolumeSpi, IOException>();
|
||||||
|
ConcurrentHashMap<FsVolumeSpi, IOException> second =
|
||||||
|
new ConcurrentHashMap<FsVolumeSpi, IOException>();
|
||||||
|
first.put(vol1, new IOException("First Error"));
|
||||||
|
second.put(vol1, new IOException("Second Error"));
|
||||||
|
second.put(vol2, new IOException("V2 Error"));
|
||||||
|
|
||||||
|
AddBlockPoolException e = new AddBlockPoolException(first);
|
||||||
|
e.mergeException(new AddBlockPoolException(second));
|
||||||
|
|
||||||
|
// Ensure there are two exceptions in the map
|
||||||
|
assertEquals(e.getFailingVolumes().size(), 2);
|
||||||
|
// Ensure the first exception added for a volume is the one retained
|
||||||
|
// when multiple errors
|
||||||
|
assertEquals(e.getFailingVolumes().get(vol1).getMessage(), "First Error");
|
||||||
|
assertEquals(e.getFailingVolumes().get(vol2).getMessage(), "V2 Error");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEmptyExceptionsCanBeMerged() {
|
||||||
|
AddBlockPoolException e = new AddBlockPoolException();
|
||||||
|
e.mergeException(new AddBlockPoolException());
|
||||||
|
assertFalse(e.hasExceptions());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue