HDDS-354. VolumeInfo.getScmUsed throws NPE. Contributed by Hanisha Koneru.
This commit is contained in:
parent
e6b77ad65f
commit
2a07617f85
|
@ -286,17 +286,6 @@ public final class OzoneConfigKeys {
|
|||
public static final double
|
||||
HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD_DEFAULT = 0.75;
|
||||
|
||||
public static final String
|
||||
HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY =
|
||||
"hdds.write.lock.reporting.threshold.ms";
|
||||
public static final long
|
||||
HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 5000L;
|
||||
public static final String
|
||||
HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_KEY =
|
||||
"hdds.lock.suppress.warning.interval.ms";
|
||||
public static final long
|
||||
HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_DEAFULT = 10000L;
|
||||
|
||||
public static final String OZONE_CONTAINER_COPY_WORKDIR =
|
||||
"hdds.datanode.replication.work.dir";
|
||||
|
||||
|
|
|
@ -69,31 +69,39 @@ public class VersionEndpointTask implements
|
|||
VersionResponse response = VersionResponse.getFromProtobuf(
|
||||
versionResponse);
|
||||
rpcEndPoint.setVersion(response);
|
||||
VolumeSet volumeSet = ozoneContainer.getVolumeSet();
|
||||
Map<String, HddsVolume> volumeMap = volumeSet.getVolumeMap();
|
||||
|
||||
String scmId = response.getValue(OzoneConsts.SCM_ID);
|
||||
String clusterId = response.getValue(OzoneConsts.CLUSTER_ID);
|
||||
|
||||
Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " +
|
||||
"null");
|
||||
Preconditions.checkNotNull(clusterId, "Reply from SCM: clusterId " +
|
||||
"cannot be null");
|
||||
// Check volumes
|
||||
VolumeSet volumeSet = ozoneContainer.getVolumeSet();
|
||||
volumeSet.readLock();
|
||||
try {
|
||||
Map<String, HddsVolume> volumeMap = volumeSet.getVolumeMap();
|
||||
|
||||
// If version file does not exist create version file and also set scmId
|
||||
for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
|
||||
HddsVolume hddsVolume = entry.getValue();
|
||||
boolean result = HddsVolumeUtil.checkVolume(hddsVolume, scmId,
|
||||
clusterId, LOG);
|
||||
if (!result) {
|
||||
volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath());
|
||||
Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " +
|
||||
"null");
|
||||
Preconditions.checkNotNull(clusterId, "Reply from SCM: clusterId " +
|
||||
"cannot be null");
|
||||
|
||||
// If version file does not exist create version file and also set scmId
|
||||
for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
|
||||
HddsVolume hddsVolume = entry.getValue();
|
||||
boolean result = HddsVolumeUtil.checkVolume(hddsVolume, scmId,
|
||||
clusterId, LOG);
|
||||
if (!result) {
|
||||
volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath());
|
||||
}
|
||||
}
|
||||
if (volumeSet.getVolumesList().size() == 0) {
|
||||
// All volumes are inconsistent state
|
||||
throw new DiskOutOfSpaceException("All configured Volumes are in " +
|
||||
"Inconsistent State");
|
||||
}
|
||||
} finally {
|
||||
volumeSet.readUnlock();
|
||||
}
|
||||
if (volumeSet.getVolumesList().size() == 0) {
|
||||
// All volumes are inconsistent state
|
||||
throw new DiskOutOfSpaceException("All configured Volumes are in " +
|
||||
"Inconsistent State");
|
||||
}
|
||||
|
||||
ozoneContainer.getDispatcher().setScmId(scmId);
|
||||
|
||||
EndpointStateMachine.EndPointStates nextState =
|
||||
|
|
|
@ -164,7 +164,7 @@ public final class HddsVolumeUtil {
|
|||
}
|
||||
|
||||
/**
|
||||
* Check Volume is consistent state or not.
|
||||
* Check Volume is in consistent state or not.
|
||||
* @param hddsVolume
|
||||
* @param scmId
|
||||
* @param clusterId
|
||||
|
|
|
@ -33,15 +33,11 @@ import org.apache.hadoop.hdds.protocol.proto
|
|||
.StorageContainerDatanodeProtocolProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
|
||||
import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
|
||||
import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
|
||||
import org.apache.hadoop.ozone.container.common.volume.HddsVolume.VolumeState;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||
import org.apache.hadoop.util.InstrumentedLock;
|
||||
import org.apache.hadoop.util.ShutdownHookManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -53,8 +49,7 @@ import java.util.EnumMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
/**
|
||||
* VolumeSet to manage volumes in a DataNode.
|
||||
|
@ -84,11 +79,12 @@ public class VolumeSet {
|
|||
private EnumMap<StorageType, List<HddsVolume>> volumeStateMap;
|
||||
|
||||
/**
|
||||
* Lock to synchronize changes to the VolumeSet. Any update to
|
||||
* {@link VolumeSet#volumeMap}, {@link VolumeSet#failedVolumeMap}, or
|
||||
* {@link VolumeSet#volumeStateMap} should be done after acquiring this lock.
|
||||
* A Reentrant Read Write Lock to synchronize volume operations in VolumeSet.
|
||||
* Any update to {@link VolumeSet#volumeMap},
|
||||
* {@link VolumeSet#failedVolumeMap}, or {@link VolumeSet#volumeStateMap}
|
||||
* should be done after acquiring the write lock.
|
||||
*/
|
||||
private final AutoCloseableLock volumeSetLock;
|
||||
private final ReentrantReadWriteLock volumeSetRWLock;
|
||||
|
||||
private final String datanodeUuid;
|
||||
private String clusterID;
|
||||
|
@ -105,17 +101,7 @@ public class VolumeSet {
|
|||
this.datanodeUuid = dnUuid;
|
||||
this.clusterID = clusterID;
|
||||
this.conf = conf;
|
||||
this.volumeSetLock = new AutoCloseableLock(
|
||||
new InstrumentedLock(getClass().getName(), LOG,
|
||||
new ReentrantLock(true),
|
||||
conf.getTimeDuration(
|
||||
OzoneConfigKeys.HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
|
||||
OzoneConfigKeys.HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
|
||||
TimeUnit.MILLISECONDS),
|
||||
conf.getTimeDuration(
|
||||
OzoneConfigKeys.HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_KEY,
|
||||
OzoneConfigKeys.HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_DEAFULT,
|
||||
TimeUnit.MILLISECONDS)));
|
||||
this.volumeSetRWLock = new ReentrantReadWriteLock();
|
||||
|
||||
initializeVolumeSet();
|
||||
}
|
||||
|
@ -198,14 +184,35 @@ public class VolumeSet {
|
|||
}
|
||||
}
|
||||
|
||||
public void acquireLock() {
|
||||
volumeSetLock.acquire();
|
||||
/**
|
||||
* Acquire Volume Set Read lock.
|
||||
*/
|
||||
public void readLock() {
|
||||
volumeSetRWLock.readLock().lock();
|
||||
}
|
||||
|
||||
public void releaseLock() {
|
||||
volumeSetLock.release();
|
||||
/**
|
||||
* Release Volume Set Read lock.
|
||||
*/
|
||||
public void readUnlock() {
|
||||
volumeSetRWLock.readLock().unlock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquire Volume Set Write lock.
|
||||
*/
|
||||
public void writeLock() {
|
||||
volumeSetRWLock.writeLock().lock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Release Volume Set Write lock.
|
||||
*/
|
||||
public void writeUnlock() {
|
||||
volumeSetRWLock.writeLock().unlock();
|
||||
}
|
||||
|
||||
|
||||
private HddsVolume createVolume(String locationString,
|
||||
StorageType storageType) throws IOException {
|
||||
HddsVolume.Builder volumeBuilder = new HddsVolume.Builder(locationString)
|
||||
|
@ -227,7 +234,8 @@ public class VolumeSet {
|
|||
String hddsRoot = HddsVolumeUtil.getHddsRoot(volumeRoot);
|
||||
boolean success;
|
||||
|
||||
try (AutoCloseableLock lock = volumeSetLock.acquire()) {
|
||||
this.writeLock();
|
||||
try {
|
||||
if (volumeMap.containsKey(hddsRoot)) {
|
||||
LOG.warn("Volume : {} already exists in VolumeMap", hddsRoot);
|
||||
success = false;
|
||||
|
@ -247,6 +255,8 @@ public class VolumeSet {
|
|||
} catch (IOException ex) {
|
||||
LOG.error("Failed to add volume " + volumeRoot + " to VolumeSet", ex);
|
||||
success = false;
|
||||
} finally {
|
||||
this.writeUnlock();
|
||||
}
|
||||
return success;
|
||||
}
|
||||
|
@ -255,7 +265,8 @@ public class VolumeSet {
|
|||
public void failVolume(String dataDir) {
|
||||
String hddsRoot = HddsVolumeUtil.getHddsRoot(dataDir);
|
||||
|
||||
try (AutoCloseableLock lock = volumeSetLock.acquire()) {
|
||||
this.writeLock();
|
||||
try {
|
||||
if (volumeMap.containsKey(hddsRoot)) {
|
||||
HddsVolume hddsVolume = volumeMap.get(hddsRoot);
|
||||
hddsVolume.failVolume();
|
||||
|
@ -270,6 +281,8 @@ public class VolumeSet {
|
|||
} else {
|
||||
LOG.warn("Volume : {} does not exist in VolumeSet", hddsRoot);
|
||||
}
|
||||
} finally {
|
||||
this.writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -277,7 +290,8 @@ public class VolumeSet {
|
|||
public void removeVolume(String dataDir) throws IOException {
|
||||
String hddsRoot = HddsVolumeUtil.getHddsRoot(dataDir);
|
||||
|
||||
try (AutoCloseableLock lock = volumeSetLock.acquire()) {
|
||||
this.writeLock();
|
||||
try {
|
||||
if (volumeMap.containsKey(hddsRoot)) {
|
||||
HddsVolume hddsVolume = volumeMap.get(hddsRoot);
|
||||
hddsVolume.shutdown();
|
||||
|
@ -295,14 +309,11 @@ public class VolumeSet {
|
|||
} else {
|
||||
LOG.warn("Volume : {} does not exist in VolumeSet", hddsRoot);
|
||||
}
|
||||
} finally {
|
||||
this.writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
public HddsVolume chooseVolume(long containerSize,
|
||||
VolumeChoosingPolicy choosingPolicy) throws IOException {
|
||||
return choosingPolicy.chooseVolume(getVolumesList(), containerSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method, call shutdown on each volume to shutdown volume usage
|
||||
* thread and write scmUsed on each volume.
|
||||
|
@ -352,55 +363,60 @@ public class VolumeSet {
|
|||
public StorageContainerDatanodeProtocolProtos.NodeReportProto getNodeReport()
|
||||
throws IOException {
|
||||
boolean failed;
|
||||
StorageLocationReport[] reports = new StorageLocationReport[volumeMap
|
||||
.size() + failedVolumeMap.size()];
|
||||
int counter = 0;
|
||||
HddsVolume hddsVolume;
|
||||
for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
|
||||
hddsVolume = entry.getValue();
|
||||
VolumeInfo volumeInfo = hddsVolume.getVolumeInfo();
|
||||
long scmUsed = 0;
|
||||
long remaining = 0;
|
||||
failed = false;
|
||||
try {
|
||||
scmUsed = volumeInfo.getScmUsed();
|
||||
remaining = volumeInfo.getAvailable();
|
||||
} catch (IOException ex) {
|
||||
LOG.warn("Failed to get scmUsed and remaining for container " +
|
||||
"storage location {}", volumeInfo.getRootDir());
|
||||
// reset scmUsed and remaining if df/du failed.
|
||||
scmUsed = 0;
|
||||
remaining = 0;
|
||||
failed = true;
|
||||
}
|
||||
this.readLock();
|
||||
try {
|
||||
StorageLocationReport[] reports = new StorageLocationReport[volumeMap
|
||||
.size() + failedVolumeMap.size()];
|
||||
int counter = 0;
|
||||
HddsVolume hddsVolume;
|
||||
for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
|
||||
hddsVolume = entry.getValue();
|
||||
VolumeInfo volumeInfo = hddsVolume.getVolumeInfo();
|
||||
long scmUsed = 0;
|
||||
long remaining = 0;
|
||||
failed = false;
|
||||
try {
|
||||
scmUsed = volumeInfo.getScmUsed();
|
||||
remaining = volumeInfo.getAvailable();
|
||||
} catch (IOException ex) {
|
||||
LOG.warn("Failed to get scmUsed and remaining for container " +
|
||||
"storage location {}", volumeInfo.getRootDir());
|
||||
// reset scmUsed and remaining if df/du failed.
|
||||
scmUsed = 0;
|
||||
remaining = 0;
|
||||
failed = true;
|
||||
}
|
||||
|
||||
StorageLocationReport.Builder builder =
|
||||
StorageLocationReport.newBuilder();
|
||||
builder.setStorageLocation(volumeInfo.getRootDir())
|
||||
.setId(hddsVolume.getStorageID())
|
||||
.setFailed(failed)
|
||||
.setCapacity(hddsVolume.getCapacity())
|
||||
.setRemaining(remaining)
|
||||
.setScmUsed(scmUsed)
|
||||
.setStorageType(hddsVolume.getStorageType());
|
||||
StorageLocationReport r = builder.build();
|
||||
reports[counter++] = r;
|
||||
StorageLocationReport.Builder builder =
|
||||
StorageLocationReport.newBuilder();
|
||||
builder.setStorageLocation(volumeInfo.getRootDir())
|
||||
.setId(hddsVolume.getStorageID())
|
||||
.setFailed(failed)
|
||||
.setCapacity(hddsVolume.getCapacity())
|
||||
.setRemaining(remaining)
|
||||
.setScmUsed(scmUsed)
|
||||
.setStorageType(hddsVolume.getStorageType());
|
||||
StorageLocationReport r = builder.build();
|
||||
reports[counter++] = r;
|
||||
}
|
||||
for (Map.Entry<String, HddsVolume> entry : failedVolumeMap.entrySet()) {
|
||||
hddsVolume = entry.getValue();
|
||||
StorageLocationReport.Builder builder = StorageLocationReport
|
||||
.newBuilder();
|
||||
builder.setStorageLocation(hddsVolume.getHddsRootDir()
|
||||
.getAbsolutePath()).setId(hddsVolume.getStorageID()).setFailed(true)
|
||||
.setCapacity(0).setRemaining(0).setScmUsed(0).setStorageType(
|
||||
hddsVolume.getStorageType());
|
||||
StorageLocationReport r = builder.build();
|
||||
reports[counter++] = r;
|
||||
}
|
||||
NodeReportProto.Builder nrb = NodeReportProto.newBuilder();
|
||||
for (int i = 0; i < reports.length; i++) {
|
||||
nrb.addStorageReport(reports[i].getProtoBufMessage());
|
||||
}
|
||||
return nrb.build();
|
||||
} finally {
|
||||
this.readUnlock();
|
||||
}
|
||||
for (Map.Entry<String, HddsVolume> entry : failedVolumeMap.entrySet()) {
|
||||
hddsVolume = entry.getValue();
|
||||
StorageLocationReport.Builder builder = StorageLocationReport
|
||||
.newBuilder();
|
||||
builder.setStorageLocation(hddsVolume.getHddsRootDir()
|
||||
.getAbsolutePath()).setId(hddsVolume.getStorageID()).setFailed(true)
|
||||
.setCapacity(0).setRemaining(0).setScmUsed(0).setStorageType(
|
||||
hddsVolume.getStorageType());
|
||||
StorageLocationReport r = builder.build();
|
||||
reports[counter++] = r;
|
||||
}
|
||||
NodeReportProto.Builder nrb = NodeReportProto.newBuilder();
|
||||
for (int i = 0; i < reports.length; i++) {
|
||||
nrb.addStorageReport(reports[i].getProtoBufMessage());
|
||||
}
|
||||
return nrb.build();
|
||||
}
|
||||
}
|
|
@ -108,8 +108,8 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
|
|||
Preconditions.checkNotNull(scmId, "scmId cannot be null");
|
||||
|
||||
File containerMetaDataPath = null;
|
||||
//acquiring volumeset lock and container lock
|
||||
volumeSet.acquireLock();
|
||||
//acquiring volumeset read lock
|
||||
volumeSet.readLock();
|
||||
long maxSize = containerData.getMaxSize();
|
||||
try {
|
||||
HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
|
||||
|
@ -166,7 +166,7 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
|
|||
throw new StorageContainerException("Container creation failed.", ex,
|
||||
CONTAINER_INTERNAL_ERROR);
|
||||
} finally {
|
||||
volumeSet.releaseLock();
|
||||
volumeSet.readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -271,14 +271,14 @@ public class KeyValueHandler extends Handler {
|
|||
|
||||
public void populateContainerPathFields(KeyValueContainer container,
|
||||
long maxSize) throws IOException {
|
||||
volumeSet.acquireLock();
|
||||
volumeSet.readLock();
|
||||
try {
|
||||
HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
|
||||
.getVolumesList(), maxSize);
|
||||
String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
|
||||
container.populatePathFields(scmID, containerVolume, hddsVolumeDir);
|
||||
} finally {
|
||||
volumeSet.releaseLock();
|
||||
volumeSet.readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue