From f347c348d8dd092181b7ce135968f4469f685841 Mon Sep 17 00:00:00 2001 From: Hrishikesh Gadre Date: Thu, 11 Jul 2019 15:08:56 -0700 Subject: [PATCH] Add support for checksum verification in data scrubber Signed-off-by: Anu Engineer --- .../apache/hadoop/hdds/HddsConfigKeys.java | 5 +- .../hadoop/ozone/common/ChecksumData.java | 7 +- .../common/interfaces/Container.java | 26 ++- .../container/keyvalue/KeyValueContainer.java | 59 +++---- .../keyvalue/KeyValueContainerCheck.java | 99 +++++++---- .../keyvalue/impl/ChunkManagerFactory.java | 6 +- .../ozoneimpl/ContainerDataScanner.java | 108 ++++++++++++ .../ozoneimpl/ContainerMetadataScanner.java | 110 ++++++++++++ .../ozoneimpl/ContainerScrubber.java | 158 ------------------ .../ContainerScrubberConfiguration.java | 74 ++++++++ .../container/ozoneimpl/OzoneContainer.java | 41 +++-- .../keyvalue/TestKeyValueContainerCheck.java | 101 ++++++++++- .../ozone/TestOzoneConfigurationFields.java | 4 +- .../ozone/dn/scrubber/TestDataScrubber.java | 6 +- .../TestDataValidateWithDummyContainers.java | 5 +- 15 files changed, 543 insertions(+), 266 deletions(-) create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerDataScanner.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerMetadataScanner.java delete mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubberConfiguration.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java index 9e757c1ee17..c541f9b8289 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java @@ -65,15 +65,12 @@ public final class HddsConfigKeys { public static final float HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f; public static final String HDDS_SCM_SAFEMODE_ENABLED = "hdds.scm.safemode.enabled"; - public static final String HDDS_CONTAINERSCRUB_ENABLED = - "hdds.containerscrub.enabled"; - public static final boolean HDDS_CONTAINERSCRUB_ENABLED_DEFAULT = false; + public static final boolean HDDS_SCM_SAFEMODE_ENABLED_DEFAULT = true; public static final String HDDS_SCM_SAFEMODE_MIN_DATANODE = "hdds.scm.safemode.min.datanode"; public static final int HDDS_SCM_SAFEMODE_MIN_DATANODE_DEFAULT = 1; - public static final String HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT = "hdds.scm.wait.time.after.safemode.exit"; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java index c0799bb25ee..4a927fbae6c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java @@ -40,9 +40,14 @@ public class ChecksumData { private List checksums; public ChecksumData(ChecksumType checksumType, int bytesPerChecksum) { + this(checksumType, bytesPerChecksum, Lists.newArrayList()); + } + + public ChecksumData(ChecksumType checksumType, int bytesPerChecksum, + List checksums) { this.type = checksumType; this.bytesPerChecksum = bytesPerChecksum; - this.checksums = Lists.newArrayList(); 
+ this.checksums = checksums; } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java index 10fec6059cf..05ff93fb9fc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java @@ -30,6 +30,8 @@ import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; +import org.apache.hadoop.hdfs.util.Canceler; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.hdfs.util.RwLock; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; @@ -153,9 +155,27 @@ public interface Container extends RwLock { void updateBlockCommitSequenceId(long blockCommitSequenceId); /** - * check and report the structural integrity of the container. - * @return true if the integrity checks pass + * Scan the container metadata to detect corruption. + */ + boolean scanMetaData(); + + /** + * Return if the container data should be checksum verified to detect + * corruption. The result depends upon the current state of the container + * (e.g. if a container is accepting writes, it may not be a good idea to + * perform checksum verification to avoid concurrency issues). + */ + boolean shouldScanData(); + + /** + * Perform checksum verification for the container data. + * + * @param throttler A reference of {@link DataTransferThrottler} used to + * perform I/O bandwidth throttling + * @param canceler A reference of {@link Canceler} used to cancel the + * I/O bandwidth throttling (e.g. for shutdown purpose). + * @return true if the checksum verification succeeds * false otherwise */ - boolean check(); + boolean scanData(DataTransferThrottler throttler, Canceler canceler); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index c57e92d21a1..53065cc6a6e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -39,6 +39,8 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; +import org.apache.hadoop.hdfs.util.Canceler; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; @@ -671,52 +673,33 @@ public class KeyValueContainer implements Container { .getContainerID() + OzoneConsts.DN_CONTAINER_DB); } - /** - * run integrity checks on the Container metadata. 
- */ - public boolean check() { - ContainerCheckLevel level = ContainerCheckLevel.NO_CHECK; + public boolean scanMetaData() { long containerId = containerData.getContainerID(); + KeyValueContainerCheck checker = + new KeyValueContainerCheck(containerData.getMetadataPath(), config, + containerId); + return checker.fastCheck(); + } - switch (containerData.getState()) { - case OPEN: - level = ContainerCheckLevel.FAST_CHECK; - LOG.info("Doing Fast integrity checks for Container ID : {}," - + " because it is OPEN", containerId); - break; - case CLOSING: - level = ContainerCheckLevel.FAST_CHECK; - LOG.info("Doing Fast integrity checks for Container ID : {}," - + " because it is CLOSING", containerId); - break; - case CLOSED: - case QUASI_CLOSED: - level = ContainerCheckLevel.FULL_CHECK; - LOG.debug("Doing Full integrity checks for Container ID : {}," - + " because it is in {} state", containerId, - containerData.getState()); - break; - default: - break; - } - - if (level == ContainerCheckLevel.NO_CHECK) { - LOG.debug("Skipping integrity checks for Container Id : {}", containerId); - return true; + @Override + public boolean shouldScanData() { + return containerData.getState() == ContainerDataProto.State.CLOSED + || containerData.getState() == ContainerDataProto.State.QUASI_CLOSED; + } + + public boolean scanData(DataTransferThrottler throttler, Canceler canceler) { + if (!shouldScanData()) { + throw new IllegalStateException("The checksum verification can not be" + + " done for container in state " + + containerData.getState()); } + long containerId = containerData.getContainerID(); KeyValueContainerCheck checker = new KeyValueContainerCheck(containerData.getMetadataPath(), config, containerId); - switch (level) { - case FAST_CHECK: - return checker.fastCheck(); - case FULL_CHECK: - return checker.fullCheck(); - default: - return true; - } + return checker.fullCheck(throttler, canceler); } private enum ContainerCheckLevel { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java index 3e252bf0b36..d2b26f9bd8a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java @@ -22,6 +22,11 @@ import com.google.common.base.Preconditions; import com.google.common.primitives.Longs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdfs.util.Canceler; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChecksumData; +import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; @@ -30,12 +35,15 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils; import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil; +import 
org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; -import java.util.List; +import java.io.InputStream; +import java.util.Arrays; -import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,13 +109,13 @@ public class KeyValueContainerCheck { * * @return true : integrity checks pass, false : otherwise. */ - public boolean fullCheck() { + public boolean fullCheck(DataTransferThrottler throttler, Canceler canceler) { boolean valid = false; try { valid = fastCheck(); if (valid) { - checkBlockDB(); + scanData(throttler, canceler); } } catch (IOException e) { handleCorruption(e); @@ -194,7 +202,8 @@ public class KeyValueContainerCheck { } } - private void checkBlockDB() throws IOException { + private void scanData(DataTransferThrottler throttler, Canceler canceler) + throws IOException { /** * Check the integrity of the DB inside each container. * In Scope: @@ -220,43 +229,67 @@ public class KeyValueContainerCheck { throw new IOException(dbFileErrorMsg); } - onDiskContainerData.setDbFile(dbFile); try(ReferenceCountedDB db = - BlockUtils.getDB(onDiskContainerData, checkConfig)) { - iterateBlockDB(db); - } - } + BlockUtils.getDB(onDiskContainerData, checkConfig); + KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID, + new File(onDiskContainerData.getContainerPath()))) { - private void iterateBlockDB(ReferenceCountedDB db) - throws IOException { - Preconditions.checkState(db != null); - - // get "normal" keys from the Block DB - try(KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID, - new File(onDiskContainerData.getContainerPath()))) { - - // ensure there is a chunk file for each key in the DB - while (kvIter.hasNext()) { + while(kvIter.hasNext()) { BlockData block = kvIter.nextBlock(); - - List chunkInfoList = block.getChunks(); - for (ContainerProtos.ChunkInfo chunk : chunkInfoList) { - File chunkFile; - chunkFile = ChunkUtils.getChunkFile(onDiskContainerData, + for(ContainerProtos.ChunkInfo chunk : block.getChunks()) { + File chunkFile = ChunkUtils.getChunkFile(onDiskContainerData, ChunkInfo.getFromProtoBuf(chunk)); - if (!chunkFile.exists()) { // concurrent mutation in Block DB? lookup the block again. 
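            // If the chunk file is missing, the block may simply have been
            // deleted after the iterator returned it; re-reading the block key
            // below distinguishes a benign concurrent delete (key gone) from
            // real corruption (key still present, chunk file missing).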
byte[] bdata = db.getStore().get( Longs.toByteArray(block.getBlockID().getLocalID())); - if (bdata == null) { - LOG.trace("concurrency with delete, ignoring deleted block"); - break; // skip to next block from kvIter - } else { - String errorStr = "Missing chunk file " - + chunkFile.getAbsolutePath(); - throw new IOException(errorStr); + if (bdata != null) { + throw new IOException("Missing chunk file " + + chunkFile.getAbsolutePath()); + } + } else if (chunk.getChecksumData().getType() + != ContainerProtos.ChecksumType.NONE){ + int length = chunk.getChecksumData().getChecksumsList().size(); + ChecksumData cData = new ChecksumData( + chunk.getChecksumData().getType(), + chunk.getChecksumData().getBytesPerChecksum(), + chunk.getChecksumData().getChecksumsList()); + long bytesRead = 0; + byte[] buffer = new byte[cData.getBytesPerChecksum()]; + try (InputStream fs = new FileInputStream(chunkFile)) { + int i = 0, v = 0; + for (; i < length; i++) { + v = fs.read(buffer); + if (v == -1) { + break; + } + bytesRead += v; + throttler.throttle(v, canceler); + Checksum cal = new Checksum(cData.getChecksumType(), + cData.getBytesPerChecksum()); + ByteString expected = cData.getChecksums().get(i); + ByteString actual = cal.computeChecksum(buffer) + .getChecksums().get(0); + if (!Arrays.equals(expected.toByteArray(), + actual.toByteArray())) { + throw new OzoneChecksumException(String + .format("Inconsistent read for chunk=%s len=%d expected" + + " checksum %s actual checksum %s for block %s", + chunk.getChunkName(), chunk.getLen(), + Arrays.toString(expected.toByteArray()), + Arrays.toString(actual.toByteArray()), + block.getBlockID())); + } + + } + if (v == -1 && i < length) { + throw new OzoneChecksumException(String + .format("Inconsistent read for chunk=%s expected length=%d" + + " actual length=%d for block %s", + chunk.getChunkName(), + chunk.getLen(), bytesRead, block.getBlockID())); + } } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerFactory.java index 8e2687b65bd..046bfddfaee 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerFactory.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerFactory.java @@ -65,12 +65,12 @@ public final class ChunkManagerFactory { if (!persist) { boolean scrubber = config.getBoolean( - HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED, - HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED_DEFAULT); + "hdds.containerscrub.enabled", + false); if (scrubber) { // Data Scrubber needs to be disabled for non-persistent chunks. LOG.warn("Failed to set " + HDDS_CONTAINER_PERSISTDATA + " to false." 
- + " Please set " + HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED + + " Please set hdds.containerscrub.enabled" + " also to false to enable non-persistent containers."); persist = true; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerDataScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerDataScanner.java new file mode 100644 index 00000000000..2b0f3f33776 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerDataScanner.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.ozoneimpl; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.hdfs.util.Canceler; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * VolumeScanner scans a single volume. Each VolumeScanner has its own thread. + *

They are all managed by the DataNode's BlockScanner. + */ +public class ContainerDataScanner extends Thread { + public static final Logger LOG = + LoggerFactory.getLogger(ContainerDataScanner.class); + + /** + * The volume that we're scanning. + */ + private final HddsVolume volume; + private final ContainerController controller; + private final DataTransferThrottler throttler; + private final Canceler canceler; + + /** + * True if the thread is stopping.

+ * Protected by this object's lock. + */ + private volatile boolean stopping = false; + + + public ContainerDataScanner(ContainerController controller, + HddsVolume volume, long bytesPerSec) { + this.controller = controller; + this.volume = volume; + this.throttler = new DataTransferThrottler(bytesPerSec); + this.canceler = new Canceler(); + setName("ContainerDataScanner(" + volume + ")"); + setDaemon(true); + } + + @Override + public void run() { + LOG.trace("{}: thread starting.", this); + try { + while (!stopping) { + Iterator itr = controller.getContainers(volume); + while (!stopping && itr.hasNext()) { + Container c = itr.next(); + try { + if (c.shouldScanData()) { + if(!c.scanData(throttler, canceler)) { + controller.markContainerUnhealthy( + c.getContainerData().getContainerID()); + } + } + } catch (IOException ex) { + long containerId = c.getContainerData().getContainerID(); + LOG.warn("Unexpected exception while scanning container " + + containerId, ex); + } + } + } + LOG.info("{} exiting.", this); + } catch (Throwable e) { + LOG.error("{} exiting because of exception ", this, e); + } + } + + public synchronized void shutdown() { + this.stopping = true; + this.canceler.cancel("ContainerDataScanner("+volume+") is shutting down"); + this.interrupt(); + try { + this.join(); + } catch (InterruptedException ex) { + LOG.warn("Unexpected exception while stopping data scanner for volume " + + volume, ex); + } + } + + @Override + public String toString() { + return "ContainerDataScanner(" + volume + + ", " + volume.getStorageID() + ")"; + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerMetadataScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerMetadataScanner.java new file mode 100644 index 00000000000..8ff11409306 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerMetadataScanner.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.ozoneimpl; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; + +/** + * This class is responsible to perform metadata verification of the + * containers. 
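+ * It iterates over all containers and invokes Container#scanMetaData() on
+ * each one, marking any container that fails the check as unhealthy.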
+ */
+public class ContainerMetadataScanner extends Thread {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ContainerMetadataScanner.class);
+
+  private final ContainerController controller;
+  private final long metadataScanInterval;
+  /**
+   * True if the thread is stopping.

+ * Protected by this object's lock. + */ + private boolean stopping = false; + + public ContainerMetadataScanner(ContainerController controller, + long metadataScanInterval) { + this.controller = controller; + this.metadataScanInterval = metadataScanInterval; + setName("ContainerMetadataScanner"); + setDaemon(true); + } + + @Override + public void run() { + /** + * the outer daemon loop exits on down() + */ + LOG.info("Background ContainerMetadataScanner starting up"); + while (!stopping) { + long start = System.nanoTime(); + scrub(); + long interval = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()-start); + // ensure to delay next metadata scan with respect to user config. + if (!stopping && interval < metadataScanInterval) { + try { + Thread.sleep(metadataScanInterval - interval); + } catch (InterruptedException e) { + LOG.info("Background ContainerMetadataScanner interrupted." + + " Going to exit"); + } + } + } + } + + private void scrub() { + Iterator containerIt = controller.getContainers(); + long count = 0; + + while (!stopping && containerIt.hasNext()) { + Container container = containerIt.next(); + try { + scrub(container); + } catch (IOException e) { + LOG.info("Unexpected error while scrubbing container {}", + container.getContainerData().getContainerID()); + } + count++; + } + + LOG.debug("iterator ran integrity checks on {} containers", count); + } + + @VisibleForTesting + public void scrub(Container container) throws IOException { + if (!container.scanMetaData()) { + controller.markContainerUnhealthy( + container.getContainerData().getContainerID()); + } + } + + public synchronized void shutdown() { + this.stopping = true; + this.interrupt(); + try { + this.join(); + } catch (InterruptedException ex) { + LOG.warn("Unexpected exception while stopping metadata scanner.", ex); + } + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java deleted file mode 100644 index ac473a419c4..00000000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.ozone.container.ozoneimpl; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.net.ntp.TimeStamp; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Iterator; - -/** - * Background Metadata scrubbing for Ozone Containers. - * Future scope : data(chunks) checksum verification. - */ -public class ContainerScrubber implements Runnable { - private static final Logger LOG = - LoggerFactory.getLogger(ContainerScrubber.class); - private final OzoneConfiguration config; - private final long timePerContainer = 10000; // 10 sec in millis - private boolean halt; - private Thread scrubThread; - private ContainerController controller; - - - public ContainerScrubber(OzoneConfiguration conf, - ContainerController controller) { - this.config = conf; - this.halt = false; - this.scrubThread = null; - this.controller = controller; - } - - @Override public void run() { - /** - * the outer daemon loop exits on down() - */ - LOG.info("Background ContainerScrubber starting up"); - while (true) { - - scrub(); - - if (this.halt) { - break; // stop and exit if requested - } - - try { - Thread.sleep(300000); /* 5 min between scans */ - } catch (InterruptedException e) { - LOG.info("Background ContainerScrubber interrupted. Going to exit"); - } - } - } - - /** - * Start the scrub scanner thread. - */ - public void up() { - - this.halt = false; - if (this.scrubThread == null) { - this.scrubThread = new Thread(this); - scrubThread.start(); - } else { - LOG.info("Scrubber up called multiple times. Scrub thread already up."); - } - } - - /** - * Stop the scrub scanner thread. Wait for thread to exit - */ - public void down() { - - this.halt = true; - if (scrubThread == null) { - LOG.info("Scrubber down invoked, but scrub thread is not running"); - return; - } - - this.scrubThread.interrupt(); - try { - this.scrubThread.join(); - } catch (Exception e) { - LOG.warn("Exception when waiting for Container Scrubber thread ", e); - } finally { - this.scrubThread = null; - } - } - - /** - * Current implementation : fixed rate scrub, no feedback loop. 
- * Dynamic throttling based on system load monitoring to be - * implemented later as jira [XXX] - * - * @param startTime - */ - private void throttleScrubber(TimeStamp startTime) { - TimeStamp endTime = new TimeStamp(System.currentTimeMillis()); - long timeTaken = endTime.getTime() - startTime.getTime(); - - if (timeTaken < timePerContainer) { - try { - Thread.sleep(timePerContainer - timeTaken); - } catch (InterruptedException e) { - LOG.debug("Ignoring interrupted sleep inside throttle"); - } - } - } - - private void scrub() { - Iterator containerIt = controller.getContainers(); - long count = 0; - - while (containerIt.hasNext() && !halt) { - TimeStamp startTime = new TimeStamp(System.currentTimeMillis()); - Container container = containerIt.next(); - try { - scrub(container); - } catch (IOException e) { - LOG.info("Unexpected error while scrubbing container {}", - container.getContainerData().getContainerID()); - } - - count++; - - throttleScrubber(startTime); - } - - LOG.debug("iterator ran integrity checks on {} containers", count); - } - - @VisibleForTesting - public void scrub(Container container) throws IOException { - if (!container.check()) { - controller.markContainerUnhealthy( - container.getContainerData().getContainerID()); - } - } -} \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubberConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubberConfiguration.java new file mode 100644 index 00000000000..bc830b6efc5 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubberConfiguration.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.ozoneimpl; + +import org.apache.hadoop.hdds.conf.Config; +import org.apache.hadoop.hdds.conf.ConfigGroup; +import org.apache.hadoop.hdds.conf.ConfigTag; +import org.apache.hadoop.hdds.conf.ConfigType; + +/** + * This class defines configuration parameters for container scrubber. 
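+ * With the @ConfigGroup prefix below, the individual @Config keys resolve
+ * to hdds.containerscrub.enabled, hdds.containerscrub.metadata.scan.interval
+ * and hdds.containerscrub.volume.bytes.per.second.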
+ **/ +@ConfigGroup(prefix = "hdds.containerscrub") +public class ContainerScrubberConfiguration { + private boolean enabled; + private long metadataScanInterval; + private long bandwidthPerVolume; + + @Config(key = "enabled", + type = ConfigType.BOOLEAN, + defaultValue = "false", + tags = {ConfigTag.STORAGE}, + description = "Config parameter to enable container scrubber.") + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public boolean isEnabled() { + return enabled; + } + + @Config(key = "metadata.scan.interval", + type = ConfigType.TIME, + defaultValue = "3h", + tags = {ConfigTag.STORAGE}, + description = "Config parameter define time interval in milliseconds" + + " between two metadata scans by container scrubber.") + public void setMetadataScanInterval(long metadataScanInterval) { + this.metadataScanInterval = metadataScanInterval; + } + + public long getMetadataScanInterval() { + return metadataScanInterval; + } + + @Config(key = "volume.bytes.per.second", + type = ConfigType.LONG, + defaultValue = "1048576", + tags = {ConfigTag.STORAGE}, + description = "Config parameter to throttle I/O bandwidth used" + + " by scrubber per volume.") + public void setBandwidthPerVolume(long bandwidthPerVolume) { + this.bandwidthPerVolume = bandwidthPerVolume; + } + + public long getBandwidthPerVolume() { + return bandwidthPerVolume; + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index d6e4588a8a9..8b9dc5713e7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.ozoneimpl; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; -import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto @@ -53,6 +52,7 @@ import org.slf4j.LoggerFactory; import java.io.*; import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -75,7 +75,8 @@ public class OzoneContainer { private final XceiverServerSpi writeChannel; private final XceiverServerSpi readChannel; private final ContainerController controller; - private ContainerScrubber scrubber; + private ContainerMetadataScanner metadataScanner; + private List dataScanners; private final BlockDeletingService blockDeletingService; /** @@ -92,7 +93,7 @@ public class OzoneContainer { this.config = conf; this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf); this.containerSet = new ContainerSet(); - this.scrubber = null; + this.metadataScanner = null; buildContainerSet(); final ContainerMetrics metrics = ContainerMetrics.create(conf); @@ -167,18 +168,28 @@ public class OzoneContainer { * Start background daemon thread for performing container integrity checks. 
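   * One ContainerMetadataScanner thread is started for the datanode, and
   * one ContainerDataScanner thread is started per volume.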
*/ private void startContainerScrub() { - boolean enabled = config.getBoolean( - HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED, - HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED_DEFAULT); + ContainerScrubberConfiguration c = config.getObject( + ContainerScrubberConfiguration.class); + boolean enabled = c.isEnabled(); + long metadataScanInterval = c.getMetadataScanInterval(); + long bytesPerSec = c.getBandwidthPerVolume(); if (!enabled) { - LOG.info("Background container scrubber has been disabled by {}", - HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED); + LOG.info("Background container scanner has been disabled."); } else { - if (this.scrubber == null) { - this.scrubber = new ContainerScrubber(config, controller); + if (this.metadataScanner == null) { + this.metadataScanner = new ContainerMetadataScanner(controller, + metadataScanInterval); + } + this.metadataScanner.start(); + + dataScanners = new ArrayList<>(); + for (HddsVolume v : volumeSet.getVolumesList()) { + ContainerDataScanner s = new ContainerDataScanner(controller, + v, bytesPerSec); + s.start(); + dataScanners.add(s); } - scrubber.up(); } } @@ -186,10 +197,14 @@ public class OzoneContainer { * Stop the scanner thread and wait for thread to die. */ private void stopContainerScrub() { - if (scrubber == null) { + if (metadataScanner == null) { return; } - scrubber.down(); + metadataScanner.shutdown(); + metadataScanner = null; + for (ContainerDataScanner s : dataScanners) { + s.shutdown(); + } } /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java index 5dccca6e374..eeeb364d6d5 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java @@ -19,21 +19,26 @@ package org.apache.hadoop.ozone.container.keyvalue; import com.google.common.primitives.Longs; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; +import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils; +import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil; import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl; import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScrubberConfiguration; import org.apache.hadoop.test.GenericTestUtils; import 
org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB; import org.junit.After; @@ -42,7 +47,9 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.RandomAccessFile; import java.util.Arrays; import java.util.ArrayList; import java.nio.ByteBuffer; @@ -55,7 +62,10 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + /** * Basic sanity test for the KeyValueContainerCheck class. @@ -66,7 +76,7 @@ import static org.junit.Assert.assertTrue; private KeyValueContainerData containerData; private ChunkManagerImpl chunkManager; private VolumeSet volumeSet; - private Configuration conf; + private OzoneConfiguration conf; private File testRoot; public TestKeyValueContainerCheck(String metadataImpl) { @@ -95,12 +105,15 @@ import static org.junit.Assert.assertTrue; * Sanity test, when there are no corruptions induced. * @throws Exception */ - @Test public void testKeyValueContainerCheckNoCorruption() throws Exception { + @Test + public void testKeyValueContainerCheckNoCorruption() throws Exception { long containerID = 101; int deletedBlocks = 1; int normalBlocks = 3; int chunksPerBlock = 4; boolean valid = false; + ContainerScrubberConfiguration c = conf.getObject( + ContainerScrubberConfiguration.class); // test Closed Container createContainerWithBlocks(containerID, normalBlocks, deletedBlocks, 65536, @@ -120,10 +133,70 @@ import static org.junit.Assert.assertTrue; container.close(); // next run checks on a Closed Container - valid = kvCheck.fullCheck(); + valid = kvCheck.fullCheck(new DataTransferThrottler( + c.getBandwidthPerVolume()), null); assertTrue(valid); } + /** + * Sanity test, when there are corruptions induced. 
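+ * A chunk file is truncated in place, so the metadata-only fastCheck()
+ * should still pass while fullCheck() must detect the checksum mismatch.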
+ * @throws Exception + */ + @Test + public void testKeyValueContainerCheckCorruption() throws Exception { + long containerID = 102; + int deletedBlocks = 1; + int normalBlocks = 3; + int chunksPerBlock = 4; + boolean valid = false; + ContainerScrubberConfiguration sc = conf.getObject( + ContainerScrubberConfiguration.class); + + // test Closed Container + createContainerWithBlocks(containerID, normalBlocks, deletedBlocks, 65536, + chunksPerBlock); + File chunksPath = new File(containerData.getChunksPath()); + assertTrue(chunksPath.listFiles().length + == (deletedBlocks + normalBlocks) * chunksPerBlock); + + container.close(); + + KeyValueContainerCheck kvCheck = + new KeyValueContainerCheck(containerData.getMetadataPath(), conf, + containerID); + + File metaDir = new File(containerData.getMetadataPath()); + File dbFile = KeyValueContainerLocationUtil + .getContainerDBFile(metaDir, containerID); + containerData.setDbFile(dbFile); + try(ReferenceCountedDB db = + BlockUtils.getDB(containerData, conf); + KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID, + new File(containerData.getContainerPath()))) { + BlockData block = kvIter.nextBlock(); + assertTrue(!block.getChunks().isEmpty()); + ContainerProtos.ChunkInfo c = block.getChunks().get(0); + File chunkFile = ChunkUtils.getChunkFile(containerData, + ChunkInfo.getFromProtoBuf(c)); + long length = chunkFile.length(); + assertTrue(length > 0); + // forcefully truncate the file to induce failure. + try (RandomAccessFile file = new RandomAccessFile(chunkFile, "rws")) { + file.setLength(length / 2); + } + assertEquals(length/2, chunkFile.length()); + } + + // metadata check should pass. + valid = kvCheck.fastCheck(); + assertTrue(valid); + + // checksum validation should fail. + valid = kvCheck.fullCheck(new DataTransferThrottler( + sc.getBandwidthPerVolume()), null); + assertFalse(valid); + } + /** * Creates a container with normal and deleted blocks. 
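   * Each chunk is written with deterministic data and a SHA-256 checksum
   * stored in its metadata, giving the scanner checksums to verify.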
* First it will insert normal blocks, and then it will insert @@ -134,12 +207,15 @@ import static org.junit.Assert.assertTrue; * @throws Exception */ private void createContainerWithBlocks(long containerId, int normalBlocks, - int deletedBlocks, long chunkLen, int chunksPerBlock) throws Exception { + int deletedBlocks, int chunkLen, int chunksPerBlock) throws Exception { long chunkCount; String strBlock = "block"; String strChunk = "-chunkFile"; - byte[] chunkData = new byte[(int) chunkLen]; long totalBlks = normalBlocks + deletedBlocks; + Checksum checksum = new Checksum(ContainerProtos.ChecksumType.SHA256, + chunkLen); + byte[] chunkData = generateRandomData(chunkLen); + ChecksumData checksumData = checksum.computeChecksum(chunkData); containerData = new KeyValueContainerData(containerId, (long) StorageUnit.BYTES.toBytes( @@ -166,8 +242,8 @@ import static org.junit.Assert.assertTrue; chunkList.clear(); for (chunkCount = 0; chunkCount < chunksPerBlock; chunkCount++) { String chunkName = strBlock + i + strChunk + chunkCount; - long offset = chunkCount * chunkLen; - ChunkInfo info = new ChunkInfo(chunkName, offset, chunkLen); + ChunkInfo info = new ChunkInfo(chunkName, 0, chunkLen); + info.setChecksumData(checksumData); chunkList.add(info.getProtoBufMessage()); chunkManager .writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData), @@ -195,4 +271,13 @@ import static org.junit.Assert.assertTrue; } } } + + private static byte[] generateRandomData(int length) { + assertTrue(length % 2 == 0); + ByteArrayOutputStream os = new ByteArrayOutputStream(length); + for (int i = 0; i < length; i++) { + os.write(i % 10); + } + return os.toByteArray(); + } } \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java index 0afd6b93f45..cbd6a0bd268 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java @@ -38,11 +38,13 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase { new Class[] {OzoneConfigKeys.class, ScmConfigKeys.class, OMConfigKeys.class, HddsConfigKeys.class, ReconServerConfigKeys.class, - S3GatewayConfigKeys.class}; + S3GatewayConfigKeys.class + }; errorIfMissingConfigProps = true; errorIfMissingXmlProps = true; xmlPropsToSkipCompare.add("hadoop.tags.custom"); xmlPropsToSkipCompare.add("ozone.om.nodes.EXAMPLEOMSERVICEID"); + xmlPropsToSkipCompare.add("hdds.containerscrub.enabled"); addPropertiesNotInXml(); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java index 0f35e50a174..e4d52621698 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java @@ -45,7 +45,7 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScrubber; 
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerMetadataScanner; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; @@ -159,8 +159,8 @@ public class TestDataScrubber { deleteDirectory(chunksDir); Assert.assertFalse(chunksDir.exists()); - ContainerScrubber sb = new ContainerScrubber(ozoneConfig, - oc.getController()); + ContainerMetadataScanner sb = new ContainerMetadataScanner( + oc.getController(), 0); sb.scrub(c); // wait for the incremental container report to propagate to SCM diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidateWithDummyContainers.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidateWithDummyContainers.java index d8c98c26ebc..b0683bd9156 100644 --- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidateWithDummyContainers.java +++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidateWithDummyContainers.java @@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.freon; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScrubberConfiguration; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -44,7 +45,9 @@ public class TestDataValidateWithDummyContainers @BeforeClass public static void init() throws Exception { OzoneConfiguration conf = new OzoneConfiguration(); - conf.setBoolean(HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED, false); + ContainerScrubberConfiguration sc = + conf.getObject(ContainerScrubberConfiguration.class); + sc.setEnabled(false); conf.setBoolean(HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA, false); conf.setBoolean(OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED, false);
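
The per-chunk verification loop added to KeyValueContainerCheck#scanData above reads each chunk file in bytesPerChecksum-sized slices, recomputes the checksum of every slice, and compares it against the checksum stored in the chunk metadata; a short read while expected checksums remain signals a truncated chunk. Below is a minimal standalone sketch of that strategy, using only the JDK's MessageDigest rather than the Ozone Checksum API; the class name ChunkChecksumSketch, the verifyChunk helper, and its inputs are hypothetical stand-ins, not part of this patch.

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

// Standalone sketch (hypothetical names); mirrors the scanData() slice loop.
public final class ChunkChecksumSketch {

  /** Verifies one chunk file against its stored per-slice checksums. */
  static void verifyChunk(String chunkFile, int bytesPerChecksum,
      List<byte[]> expectedChecksums)
      throws IOException, NoSuchAlgorithmException {
    byte[] buffer = new byte[bytesPerChecksum];
    long bytesRead = 0;
    try (InputStream in = new FileInputStream(chunkFile)) {
      int slice = 0;
      int read = 0;
      for (; slice < expectedChecksums.size(); slice++) {
        read = in.read(buffer);
        if (read == -1) {
          break;                          // file ended before the checksum list
        }
        bytesRead += read;
        MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
        sha256.update(buffer, 0, read);   // hash only the bytes actually read
        if (!Arrays.equals(expectedChecksums.get(slice), sha256.digest())) {
          throw new IOException("Checksum mismatch in slice " + slice
              + " of " + chunkFile);
        }
      }
      if (read == -1 && slice < expectedChecksums.size()) {
        throw new IOException("Truncated chunk file " + chunkFile
            + ": only " + bytesRead + " bytes present");
      }
    }
  }
}

One deliberate difference from the loop in the patch: the sketch hashes only the read bytes of a slice (sha256.update(buffer, 0, read)), since hashing the whole buffer would also cover stale bytes whenever the final slice of a chunk is shorter than bytesPerChecksum.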