From 2a07617f852ceddcf6b38ddcefd912fd953823d9 Mon Sep 17 00:00:00 2001
From: Bharat Viswanadham
Date: Thu, 4 Oct 2018 21:39:29 -0700
Subject: [PATCH] HDDS-354. VolumeInfo.getScmUsed throws NPE. Contributed by Hanisha Koneru.

---
 .../apache/hadoop/ozone/OzoneConfigKeys.java   |  11 --
 .../states/endpoint/VersionEndpointTask.java   |  44 +++--
 .../container/common/utils/HddsVolumeUtil.java |   2 +-
 .../container/common/volume/VolumeSet.java     | 178 ++++++++++--------
 .../container/keyvalue/KeyValueContainer.java  |   6 +-
 .../container/keyvalue/KeyValueHandler.java    |   4 +-
 6 files changed, 129 insertions(+), 116 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 599b4e80bf2..e8aa22c2ac0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -286,17 +286,6 @@ public final class OzoneConfigKeys {
   public static final double
       HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD_DEFAULT = 0.75;
 
-  public static final String
-      HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY =
-      "hdds.write.lock.reporting.threshold.ms";
-  public static final long
-      HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 5000L;
-  public static final String
-      HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_KEY =
-      "hdds.lock.suppress.warning.interval.ms";
-  public static final long
-      HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_DEAFULT = 10000L;
-
   public static final String OZONE_CONTAINER_COPY_WORKDIR =
       "hdds.datanode.replication.work.dir";
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
index 64e078d2967..2d0467706ec 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
@@ -69,31 +69,39 @@ public class VersionEndpointTask implements
       VersionResponse response = VersionResponse.getFromProtobuf(
           versionResponse);
       rpcEndPoint.setVersion(response);
-      VolumeSet volumeSet = ozoneContainer.getVolumeSet();
-      Map<String, HddsVolume> volumeMap = volumeSet.getVolumeMap();
 
       String scmId = response.getValue(OzoneConsts.SCM_ID);
       String clusterId = response.getValue(OzoneConsts.CLUSTER_ID);
-      Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " +
-          "null");
-      Preconditions.checkNotNull(clusterId, "Reply from SCM: clusterId " +
-          "cannot be null");
+      // Check volumes
+      VolumeSet volumeSet = ozoneContainer.getVolumeSet();
+      volumeSet.readLock();
+      try {
+        Map<String, HddsVolume> volumeMap = volumeSet.getVolumeMap();
 
-      // If version file does not exist create version file and also set scmId
-      for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
-        HddsVolume hddsVolume = entry.getValue();
-        boolean result = HddsVolumeUtil.checkVolume(hddsVolume, scmId,
-            clusterId, LOG);
-        if (!result) {
-          volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath());
+        Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " +
+            "null");
+        Preconditions.checkNotNull(clusterId, "Reply from SCM: clusterId " +
+            "cannot be null");
+
+        // If version file does not exist create version file and also set scmId
+        for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
+          HddsVolume hddsVolume = entry.getValue();
+          boolean result = HddsVolumeUtil.checkVolume(hddsVolume, scmId,
+              clusterId, LOG);
+          if (!result) {
+            volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath());
+          }
         }
+        if (volumeSet.getVolumesList().size() == 0) {
+          // All volumes are inconsistent state
+          throw new DiskOutOfSpaceException("All configured Volumes are in " +
+              "Inconsistent State");
+        }
+      } finally {
+        volumeSet.readUnlock();
       }
-      if (volumeSet.getVolumesList().size() == 0) {
-        // All volumes are inconsistent state
-        throw new DiskOutOfSpaceException("All configured Volumes are in " +
-            "Inconsistent State");
-      }
+
       ozoneContainer.getDispatcher().setScmId(scmId);
 
       EndpointStateMachine.EndPointStates nextState =
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
index bc0bd056b1d..cb356dadeb2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
@@ -164,7 +164,7 @@ public final class HddsVolumeUtil {
   }
 
   /**
-   * Check Volume is consistent state or not.
+   * Check Volume is in consistent state or not.
    * @param hddsVolume
    * @param scmId
    * @param clusterId
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
index 06f48fc2936..5b6b823c9c6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
@@ -33,15 +33,11 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
 import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume.VolumeState;
-import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.util.InstrumentedLock;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,8 +49,7 @@ import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * VolumeSet to manage volumes in a DataNode.
@@ -84,11 +79,12 @@ public class VolumeSet {
   private EnumMap<VolumeState, List<HddsVolume>> volumeStateMap;
 
   /**
-   * Lock to synchronize changes to the VolumeSet. Any update to
-   * {@link VolumeSet#volumeMap}, {@link VolumeSet#failedVolumeMap}, or
-   * {@link VolumeSet#volumeStateMap} should be done after acquiring this lock.
+   * A Reentrant Read Write Lock to synchronize volume operations in VolumeSet.
+   * Any update to {@link VolumeSet#volumeMap},
+   * {@link VolumeSet#failedVolumeMap}, or {@link VolumeSet#volumeStateMap}
+   * should be done after acquiring the write lock.
    */
-  private final AutoCloseableLock volumeSetLock;
+  private final ReentrantReadWriteLock volumeSetRWLock;
 
   private final String datanodeUuid;
   private String clusterID;
@@ -105,17 +101,7 @@ public class VolumeSet {
     this.datanodeUuid = dnUuid;
     this.clusterID = clusterID;
     this.conf = conf;
-    this.volumeSetLock = new AutoCloseableLock(
-        new InstrumentedLock(getClass().getName(), LOG,
-            new ReentrantLock(true),
-            conf.getTimeDuration(
-                OzoneConfigKeys.HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
-                OzoneConfigKeys.HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
-                TimeUnit.MILLISECONDS),
-            conf.getTimeDuration(
-                OzoneConfigKeys.HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_KEY,
-                OzoneConfigKeys.HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_DEAFULT,
-                TimeUnit.MILLISECONDS)));
+    this.volumeSetRWLock = new ReentrantReadWriteLock();
 
     initializeVolumeSet();
   }
@@ -198,14 +184,35 @@ public class VolumeSet {
     }
   }
 
-  public void acquireLock() {
-    volumeSetLock.acquire();
+  /**
+   * Acquire Volume Set Read lock.
+   */
+  public void readLock() {
+    volumeSetRWLock.readLock().lock();
   }
 
-  public void releaseLock() {
-    volumeSetLock.release();
+  /**
+   * Release Volume Set Read lock.
+   */
+  public void readUnlock() {
+    volumeSetRWLock.readLock().unlock();
   }
 
+  /**
+   * Acquire Volume Set Write lock.
+   */
+  public void writeLock() {
+    volumeSetRWLock.writeLock().lock();
+  }
+
+  /**
+   * Release Volume Set Write lock.
+   */
+  public void writeUnlock() {
+    volumeSetRWLock.writeLock().unlock();
+  }
+
+
   private HddsVolume createVolume(String locationString,
       StorageType storageType) throws IOException {
     HddsVolume.Builder volumeBuilder = new HddsVolume.Builder(locationString)
@@ -227,7 +234,8 @@ public class VolumeSet {
     String hddsRoot = HddsVolumeUtil.getHddsRoot(volumeRoot);
     boolean success;
 
-    try (AutoCloseableLock lock = volumeSetLock.acquire()) {
+    this.writeLock();
+    try {
       if (volumeMap.containsKey(hddsRoot)) {
         LOG.warn("Volume : {} already exists in VolumeMap", hddsRoot);
         success = false;
@@ -247,6 +255,8 @@ public class VolumeSet {
     } catch (IOException ex) {
       LOG.error("Failed to add volume " + volumeRoot + " to VolumeSet", ex);
       success = false;
+    } finally {
+      this.writeUnlock();
     }
     return success;
   }
@@ -255,7 +265,8 @@ public class VolumeSet {
   public void failVolume(String dataDir) {
     String hddsRoot = HddsVolumeUtil.getHddsRoot(dataDir);
 
-    try (AutoCloseableLock lock = volumeSetLock.acquire()) {
+    this.writeLock();
+    try {
       if (volumeMap.containsKey(hddsRoot)) {
         HddsVolume hddsVolume = volumeMap.get(hddsRoot);
         hddsVolume.failVolume();
@@ -270,6 +281,8 @@ public class VolumeSet {
       } else {
         LOG.warn("Volume : {} does not exist in VolumeSet", hddsRoot);
       }
+    } finally {
+      this.writeUnlock();
     }
   }
 
@@ -277,7 +290,8 @@ public class VolumeSet {
   public void removeVolume(String dataDir) throws IOException {
     String hddsRoot = HddsVolumeUtil.getHddsRoot(dataDir);
 
-    try (AutoCloseableLock lock = volumeSetLock.acquire()) {
+    this.writeLock();
+    try {
       if (volumeMap.containsKey(hddsRoot)) {
         HddsVolume hddsVolume = volumeMap.get(hddsRoot);
         hddsVolume.shutdown();
@@ -295,14 +309,11 @@ public class VolumeSet {
       } else {
         LOG.warn("Volume : {} does not exist in VolumeSet", hddsRoot);
       }
+    } finally {
+      this.writeUnlock();
     }
   }
 
-  public HddsVolume chooseVolume(long containerSize,
-      VolumeChoosingPolicy choosingPolicy) throws IOException {
-    return choosingPolicy.chooseVolume(getVolumesList(), containerSize);
-  }
-
   /**
    * This method, call shutdown on each volume to shutdown volume usage
    * thread and write scmUsed on each volume.
@@ -352,55 +363,60 @@ public class VolumeSet {
   public StorageContainerDatanodeProtocolProtos.NodeReportProto
       getNodeReport() throws IOException {
     boolean failed;
-    StorageLocationReport[] reports = new StorageLocationReport[volumeMap
-        .size() + failedVolumeMap.size()];
-    int counter = 0;
-    HddsVolume hddsVolume;
-    for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
-      hddsVolume = entry.getValue();
-      VolumeInfo volumeInfo = hddsVolume.getVolumeInfo();
-      long scmUsed = 0;
-      long remaining = 0;
-      failed = false;
-      try {
-        scmUsed = volumeInfo.getScmUsed();
-        remaining = volumeInfo.getAvailable();
-      } catch (IOException ex) {
-        LOG.warn("Failed to get scmUsed and remaining for container " +
-            "storage location {}", volumeInfo.getRootDir());
-        // reset scmUsed and remaining if df/du failed.
-        scmUsed = 0;
-        remaining = 0;
-        failed = true;
-      }
+    this.readLock();
+    try {
+      StorageLocationReport[] reports = new StorageLocationReport[volumeMap
+          .size() + failedVolumeMap.size()];
+      int counter = 0;
+      HddsVolume hddsVolume;
+      for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
+        hddsVolume = entry.getValue();
+        VolumeInfo volumeInfo = hddsVolume.getVolumeInfo();
+        long scmUsed = 0;
+        long remaining = 0;
+        failed = false;
+        try {
+          scmUsed = volumeInfo.getScmUsed();
+          remaining = volumeInfo.getAvailable();
+        } catch (IOException ex) {
+          LOG.warn("Failed to get scmUsed and remaining for container " +
+              "storage location {}", volumeInfo.getRootDir());
+          // reset scmUsed and remaining if df/du failed.
+          scmUsed = 0;
+          remaining = 0;
+          failed = true;
+        }
 
-      StorageLocationReport.Builder builder =
-          StorageLocationReport.newBuilder();
-      builder.setStorageLocation(volumeInfo.getRootDir())
-          .setId(hddsVolume.getStorageID())
-          .setFailed(failed)
-          .setCapacity(hddsVolume.getCapacity())
-          .setRemaining(remaining)
-          .setScmUsed(scmUsed)
-          .setStorageType(hddsVolume.getStorageType());
-      StorageLocationReport r = builder.build();
-      reports[counter++] = r;
+        StorageLocationReport.Builder builder =
+            StorageLocationReport.newBuilder();
+        builder.setStorageLocation(volumeInfo.getRootDir())
+            .setId(hddsVolume.getStorageID())
+            .setFailed(failed)
+            .setCapacity(hddsVolume.getCapacity())
+            .setRemaining(remaining)
+            .setScmUsed(scmUsed)
+            .setStorageType(hddsVolume.getStorageType());
+        StorageLocationReport r = builder.build();
+        reports[counter++] = r;
+      }
+      for (Map.Entry<String, HddsVolume> entry : failedVolumeMap.entrySet()) {
+        hddsVolume = entry.getValue();
+        StorageLocationReport.Builder builder = StorageLocationReport
+            .newBuilder();
+        builder.setStorageLocation(hddsVolume.getHddsRootDir()
+            .getAbsolutePath()).setId(hddsVolume.getStorageID()).setFailed(true)
+            .setCapacity(0).setRemaining(0).setScmUsed(0).setStorageType(
+                hddsVolume.getStorageType());
+        StorageLocationReport r = builder.build();
+        reports[counter++] = r;
+      }
+      NodeReportProto.Builder nrb = NodeReportProto.newBuilder();
+      for (int i = 0; i < reports.length; i++) {
+        nrb.addStorageReport(reports[i].getProtoBufMessage());
+      }
+      return nrb.build();
+    } finally {
+      this.readUnlock();
     }
-    for (Map.Entry<String, HddsVolume> entry : failedVolumeMap.entrySet()) {
-      hddsVolume = entry.getValue();
-      StorageLocationReport.Builder builder = StorageLocationReport
-          .newBuilder();
-      builder.setStorageLocation(hddsVolume.getHddsRootDir()
-          .getAbsolutePath()).setId(hddsVolume.getStorageID()).setFailed(true)
-          .setCapacity(0).setRemaining(0).setScmUsed(0).setStorageType(
-              hddsVolume.getStorageType());
-      StorageLocationReport r = builder.build();
-      reports[counter++] = r;
-    }
-    NodeReportProto.Builder nrb = NodeReportProto.newBuilder();
-    for (int i = 0; i < reports.length; i++) {
-      nrb.addStorageReport(reports[i].getProtoBufMessage());
-    }
-    return nrb.build();
   }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 09d40546f0f..e5b344de483 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -108,8 +108,8 @@ public class KeyValueContainer implements Container {
     Preconditions.checkNotNull(scmId, "scmId cannot be null");
     File containerMetaDataPath = null;
 
-    //acquiring volumeset lock and container lock
-    volumeSet.acquireLock();
+    //acquiring volumeset read lock
+    volumeSet.readLock();
     long maxSize = containerData.getMaxSize();
     try {
       HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
@@ -166,7 +166,7 @@ public class KeyValueContainer implements Container {
       throw new StorageContainerException("Container creation failed.", ex,
           CONTAINER_INTERNAL_ERROR);
     } finally {
-      volumeSet.releaseLock();
+      volumeSet.readUnlock();
     }
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 5be6e2849b7..922db2ad888 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -271,14 +271,14 @@ public class KeyValueHandler extends Handler {
 
   public void populateContainerPathFields(KeyValueContainer container,
       long maxSize) throws IOException {
-    volumeSet.acquireLock();
+    volumeSet.readLock();
    try {
       HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
           .getVolumesList(), maxSize);
       String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
       container.populatePathFields(scmID, containerVolume, hddsVolumeDir);
     } finally {
-      volumeSet.releaseLock();
+      volumeSet.readUnlock();
     }
   }
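
Note on the locking convention above (reviewer's summary, not part of the patch): the patch replaces VolumeSet's single exclusive AutoCloseableLock/InstrumentedLock with a java.util.concurrent.locks.ReentrantReadWriteLock. Mutators (addVolume, failVolume, removeVolume) now take the write lock, while readers (getNodeReport, the version check in VersionEndpointTask, and the volume-choosing paths in KeyValueContainer and KeyValueHandler) take the read lock, so a volume can no longer be failed or shut down while a node report is reading VolumeInfo.getScmUsed, which appears to be the race behind the NPE in the subject line. The sketch below is a minimal, self-contained illustration of that convention; VolumeRegistry and its members are invented for the example, and only ReentrantReadWriteLock and its methods are real JDK APIs.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative stand-in for VolumeSet; not Hadoop code.
    final class VolumeRegistry {
      private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
      private final List<String> volumes = new ArrayList<>();

      // Mutation path: takes the write lock, mirroring VolumeSet#failVolume.
      void failVolume(String root) {
        rwLock.writeLock().lock();
        try {
          volumes.remove(root);
        } finally {
          rwLock.writeLock().unlock(); // always release in finally
        }
      }

      // Report path: takes the read lock, mirroring VolumeSet#getNodeReport.
      // Many readers may hold it concurrently; writers wait until they drain.
      int reportVolumeCount() {
        rwLock.readLock().lock();
        try {
          return volumes.size();
        } finally {
          rwLock.readLock().unlock();
        }
      }
    }

One trade-off worth noting: the old AutoCloseableLock released itself via try-with-resources, whereas ReentrantReadWriteLock needs the manual try/finally pairing used throughout the patch; a missed unlock on any exit path would block later writers indefinitely.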