diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/Scheduler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/Scheduler.java index 1171dbff64b..71eaf3397fc 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/Scheduler.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/Scheduler.java @@ -87,14 +87,17 @@ public class Scheduler { * yet executed are also cancelled. For the executing tasks the scheduler * waits 60 seconds for completion. */ - public void close() { + public synchronized void close() { isClosed = true; - scheduler.shutdownNow(); - try { - scheduler.awaitTermination(60, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOG.info(threadName + " interrupted while waiting for task completion {}", - e); + if (scheduler != null) { + scheduler.shutdownNow(); + try { + scheduler.awaitTermination(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.info( + threadName + " interrupted while waiting for task completion {}", + e); + } } scheduler = null; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java index 849a65695ca..1c7c881bf5e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; -import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventQueue; import org.slf4j.Logger; @@ -147,8 +146,7 @@ public class SCMChillModeManager { emitChillModeStatus(); // TODO: #CLUTIL if we reenter chill mode the fixed interval pipeline // creation job needs to stop - RatisPipelineUtils - .scheduleFixedIntervalPipelineCreator(pipelineManager, config); + pipelineManager.startPipelineCreator(); } public boolean getInChillMode() { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java index 38b3fe02640..1dc924b2575 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hdds.scm.node; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; -import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -41,7 +40,6 @@ public class NewNodeHandler implements EventHandler { @Override public void onMessage(DatanodeDetails datanodeDetails, EventPublisher publisher) { - RatisPipelineUtils - .triggerPipelineCreation(pipelineManager, conf, 0); + pipelineManager.triggerPipelineCreation(); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java index 2c9b685ce1a..5976c17a607 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java @@ -21,7 +21,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; -import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -43,6 +42,6 @@ public class NonHealthyToHealthyNodeHandler @Override public void onMessage(DatanodeDetails datanodeDetails, EventPublisher publisher) { - RatisPipelineUtils.triggerPipelineCreation(pipelineManager, conf, 0); + pipelineManager.triggerPipelineCreation(); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java index 93630f04200..26e8f5fb279 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; -import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.slf4j.Logger; @@ -61,8 +60,7 @@ public class StaleNodeHandler implements EventHandler { for (PipelineID pipelineID : pipelineIds) { try { Pipeline pipeline = pipelineManager.getPipeline(pipelineID); - RatisPipelineUtils - .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, true); + pipelineManager.finalizeAndDestroyPipeline(pipeline, true); } catch (IOException e) { LOG.info("Could not finalize pipeline={} for dn={}", pipelineID, datanodeDetails); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java new file mode 100644 index 00000000000..26e11b80ff7 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.utils.Scheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Implements api for running background pipeline creation jobs. + */ +class BackgroundPipelineCreator { + + private static final Logger LOG = + LoggerFactory.getLogger(BackgroundPipelineCreator.class); + + private final Scheduler scheduler; + private final AtomicBoolean isPipelineCreatorRunning; + private final PipelineManager pipelineManager; + private final Configuration conf; + + BackgroundPipelineCreator(PipelineManager pipelineManager, + Scheduler scheduler, Configuration conf) { + this.pipelineManager = pipelineManager; + this.conf = conf; + this.scheduler = scheduler; + isPipelineCreatorRunning = new AtomicBoolean(false); + } + + private boolean shouldSchedulePipelineCreator() { + return isPipelineCreatorRunning.compareAndSet(false, true); + } + + /** + * Schedules a fixed interval job to create pipelines. + */ + void startFixedIntervalPipelineCreator() { + long intervalInMillis = conf + .getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, + ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + // TODO: #CLUTIL We can start the job asap + scheduler.scheduleWithFixedDelay(() -> { + if (!shouldSchedulePipelineCreator()) { + return; + } + createPipelines(); + }, 0, intervalInMillis, TimeUnit.MILLISECONDS); + } + + /** + * Triggers pipeline creation via background thread. + */ + void triggerPipelineCreation() { + // TODO: #CLUTIL introduce a better mechanism to not have more than one + // job of a particular type running, probably via ratis. 
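// The guard below (shouldSchedulePipelineCreator) is the only concurrency
// control in this class: AtomicBoolean.compareAndSet(false, true) admits
// exactly one caller, and createPipelines() resets the flag when it finishes,
// so at most one creation job is ever queued or running. A minimal
// self-contained sketch of the same single-flight pattern follows; the
// SingleFlightJob/trigger names are illustrative, not part of this patch.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class SingleFlightJob {
  private final AtomicBoolean running = new AtomicBoolean(false);
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();

  void trigger(Runnable job) {
    // Only the caller that flips the flag gets to enqueue a run.
    if (!running.compareAndSet(false, true)) {
      return;
    }
    executor.schedule(() -> {
      try {
        job.run();
      } finally {
        running.set(false); // allow the next trigger
      }
    }, 0, TimeUnit.MILLISECONDS);
  }
}

// Unlike the finally block in this sketch, BackgroundPipelineCreator resets
// the flag at the end of createPipelines() itself, on the scheduler thread.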
+ if (!shouldSchedulePipelineCreator()) { + return; + } + scheduler.schedule(this::createPipelines, 0, TimeUnit.MILLISECONDS); + } + + private void createPipelines() { + // TODO: #CLUTIL Different replication factor may need to be supported + HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf( + conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE, + OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT)); + + for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor + .values()) { + while (true) { + try { + if (scheduler.isClosed()) { + break; + } + pipelineManager.createPipeline(type, factor); + } catch (IOException ioe) { + break; + } catch (Throwable t) { + LOG.error("Error while creating pipelines {}", t); + break; + } + } + } + isPipelineCreatorRunning.set(false); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java index 94f757b5f0a..da704d24af4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java @@ -59,9 +59,7 @@ public class PipelineActionHandler Pipeline pipeline = pipelineManager.getPipeline(pipelineID); LOG.info("Received pipeline action {} for {} from datanode [}", action.getAction(), pipeline, report.getDatanodeDetails()); - RatisPipelineUtils - .finalizeAndDestroyPipeline(pipelineManager, pipeline, ozoneConf, - true); + pipelineManager.finalizeAndDestroyPipeline(pipeline, true); } catch (IOException ioe) { LOG.error("Could not execute pipeline action={} pipeline={} {}", action, pipelineID, ioe); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java index 9a846adaf4b..89349761bfc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java @@ -61,10 +61,4 @@ public final class PipelineFactory { List nodes) { return providers.get(type).create(factor, nodes); } - - public void close() { - for (PipelineProvider p : providers.values()) { - p.close(); - } - } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 11ba2c3b0af..2793647b7f4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -67,9 +67,12 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean { int getNumberOfContainers(PipelineID pipelineID) throws IOException; - void finalizePipeline(PipelineID pipelineID) throws IOException; - void openPipeline(PipelineID pipelineId) throws IOException; - void removePipeline(PipelineID pipelineID) throws IOException; + void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout) + throws IOException; + + void startPipelineCreator(); + + void triggerPipelineCreation(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java index 610e78a690d..bb165337511 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java @@ -33,5 +33,4 @@ public interface PipelineProvider { Pipeline create(ReplicationFactor factor, List nodes); - void close(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java index 9c914b077b2..330ad8bffc1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java @@ -108,15 +108,6 @@ public class PipelineReportHandler implements // if all the dns have reported, pipeline can be moved to OPEN state pipelineManager.openPipeline(pipelineID); } - } else if (pipeline.isClosed()) { - int numContainers = pipelineManager.getNumberOfContainers(pipelineID); - if (numContainers == 0) { - // since all the containers have been closed the pipeline can be - // destroyed - LOGGER.info("Destroying pipeline {} as all containers are closed", - pipeline); - RatisPipelineUtils.destroyPipeline(pipelineManager, pipeline, conf); - } } else { // In OPEN state case just report the datanode pipeline.reportDatanode(dn); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index b73f63d9702..695220029a8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacem import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState; -import org.apache.hadoop.utils.Scheduler; import java.io.IOException; import java.lang.reflect.Constructor; @@ -38,8 +37,6 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; - /** * Implements Api for creating ratis pipelines. */ @@ -48,20 +45,12 @@ public class RatisPipelineProvider implements PipelineProvider { private final NodeManager nodeManager; private final PipelineStateManager stateManager; private final Configuration conf; - private static Scheduler scheduler; - //TODO static Scheduler should be removed!!!! 
HDDS-1128 - @SuppressFBWarnings("ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD") RatisPipelineProvider(NodeManager nodeManager, PipelineStateManager stateManager, Configuration conf) { this.nodeManager = nodeManager; this.stateManager = stateManager; this.conf = conf; - scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1); - } - - static Scheduler getScheduler() { - return scheduler; } /** @@ -98,7 +87,6 @@ public class RatisPipelineProvider implements PipelineProvider { } } - @Override public Pipeline create(ReplicationFactor factor) throws IOException { // Get set of datanodes already used for ratis pipeline @@ -146,9 +134,4 @@ public class RatisPipelineProvider implements PipelineProvider { protected void initializePipeline(Pipeline pipeline) throws IOException { RatisPipelineUtils.createPipeline(pipeline, conf); } - - @Override - public void close() { - scheduler.close(); - } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java index 89dfc0e8599..3b36add0ed9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java @@ -19,12 +19,10 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.io.MultipleIOException; -import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.ratis.RatisHelper; import org.apache.ratis.client.RaftClient; import org.apache.ratis.grpc.GrpcTlsConfig; @@ -42,17 +40,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; /** * Utility class for Ratis pipelines. Contains methods to create and destroy * ratis pipelines. */ -public final class RatisPipelineUtils { - - private static AtomicBoolean isPipelineCreatorRunning = - new AtomicBoolean(false); +final class RatisPipelineUtils { private static final Logger LOG = LoggerFactory.getLogger(RatisPipelineUtils.class); @@ -87,13 +80,11 @@ public final class RatisPipelineUtils { * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all * the datanodes. * - * @param pipelineManager - SCM pipeline manager * @param pipeline - Pipeline to be destroyed * @param ozoneConf - Ozone configuration * @throws IOException */ - public static void destroyPipeline(PipelineManager pipelineManager, - Pipeline pipeline, Configuration ozoneConf) throws IOException { + static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf) { final RaftGroup group = RatisHelper.newRaftGroup(pipeline); LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group); for (DatanodeDetails dn : pipeline.getNodes()) { @@ -104,42 +95,6 @@ public final class RatisPipelineUtils { pipeline.getId(), dn); } } - // remove the pipeline from the pipeline manager - pipelineManager.removePipeline(pipeline.getId()); - triggerPipelineCreation(pipelineManager, ozoneConf, 0); - } - - /** - * Finalizes pipeline in the SCM. 
Removes pipeline and sends ratis command to - * destroy pipeline on the datanodes immediately or after timeout based on the - * value of onTimeout parameter. - * - * @param pipelineManager - SCM pipeline manager - * @param pipeline - Pipeline to be destroyed - * @param ozoneConf - Ozone Configuration - * @param onTimeout - if true pipeline is removed and destroyed on - * datanodes after timeout - * @throws IOException - */ - public static void finalizeAndDestroyPipeline(PipelineManager pipelineManager, - Pipeline pipeline, Configuration ozoneConf, boolean onTimeout) - throws IOException { - final RaftGroup group = RatisHelper.newRaftGroup(pipeline); - LOG.info("destroying pipeline:{} with {}", pipeline.getId(), group); - pipelineManager.finalizePipeline(pipeline.getId()); - if (onTimeout) { - long pipelineDestroyTimeoutInMillis = ozoneConf - .getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, - ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - RatisPipelineProvider.getScheduler() - .schedule(() -> destroyPipeline(pipelineManager, pipeline, ozoneConf), - pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG, String - .format("Destroy pipeline failed for pipeline:%s with %s", - pipeline.getId(), group)); - } else { - destroyPipeline(pipelineManager, pipeline, ozoneConf); - } } /** @@ -194,80 +149,14 @@ public final class RatisPipelineUtils { retryPolicy, maxOutstandingRequests, tlsConfig)) { rpc.accept(client, p); } catch (IOException ioe) { - exceptions.add( - new IOException("Failed invoke Ratis rpc " + rpc + " for " + - d.getUuid(), ioe)); + String errMsg = + "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid(); + LOG.error(errMsg, ioe); + exceptions.add(new IOException(errMsg, ioe)); } }); if (!exceptions.isEmpty()) { throw MultipleIOException.createIOException(exceptions); } } - - /** - * Schedules a fixed interval job to create pipelines. - * - * @param pipelineManager - Pipeline manager - * @param conf - Configuration - */ - public static void scheduleFixedIntervalPipelineCreator( - PipelineManager pipelineManager, Configuration conf) { - long intervalInMillis = conf - .getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, - ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS); - // TODO: #CLUTIL We can start the job asap - RatisPipelineProvider.getScheduler().scheduleWithFixedDelay(() -> { - if (!isPipelineCreatorRunning.compareAndSet(false, true)) { - return; - } - createPipelines(pipelineManager, conf); - }, intervalInMillis, intervalInMillis, TimeUnit.MILLISECONDS); - } - - /** - * Triggers pipeline creation after the specified time. - * - * @param pipelineManager - Pipeline manager - * @param conf - Configuration - * @param afterMillis - Time after which pipeline creation needs to be - * triggered - */ - public static void triggerPipelineCreation(PipelineManager pipelineManager, - Configuration conf, long afterMillis) { - // TODO: #CLUTIL introduce a better mechanism to not have more than one - // job of a particular type running, probably via ratis. 
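// Both time knobs in this patch -- the pipeline creation interval and the
// pipeline destroy timeout -- are read through Configuration.getTimeDuration,
// which parses suffixed values such as "60s" or "5m" and converts them to the
// requested unit. A small sketch of that read, assuming only that a Hadoop
// Configuration is in scope; the key constants are the ones the patch already
// uses, while the PipelineIntervals wrapper is illustrative.

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;

final class PipelineIntervals {
  private PipelineIntervals() { }

  static long creationIntervalMillis(Configuration conf) {
    // Falls back to OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT when the
    // key is unset; the result is normalized to milliseconds.
    return conf.getTimeDuration(
        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
        TimeUnit.MILLISECONDS);
  }
}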
- if (!isPipelineCreatorRunning.compareAndSet(false, true)) { - return; - } - RatisPipelineProvider.getScheduler() - .schedule(() -> createPipelines(pipelineManager, conf), afterMillis, - TimeUnit.MILLISECONDS); - } - - private static void createPipelines(PipelineManager pipelineManager, - Configuration conf) { - // TODO: #CLUTIL Different replication factor may need to be supported - HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf( - conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE, - OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT)); - - for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor - .values()) { - while (true) { - try { - if (RatisPipelineProvider.getScheduler().isClosed()) { - break; - } - pipelineManager.createPipeline(type, factor); - } catch (IOException ioe) { - break; - } catch (Throwable t) { - LOG.error("Error while creating pipelines {}", t); - break; - } - } - } - isPipelineCreatorRunning.set(false); - } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index 1bb00991ef3..f274829df37 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.NodeManager; @@ -34,6 +35,7 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.utils.MetadataKeyFilters; import org.apache.hadoop.utils.MetadataStore; import org.apache.hadoop.utils.MetadataStoreBuilder; +import org.apache.hadoop.utils.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +48,7 @@ import java.util.Map; import java.util.NavigableSet; import java.util.Set; import java.util.Collection; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -68,19 +71,27 @@ public class SCMPipelineManager implements PipelineManager { private final ReadWriteLock lock; private final PipelineFactory pipelineFactory; private final PipelineStateManager stateManager; - private final MetadataStore pipelineStore; + private final BackgroundPipelineCreator backgroundPipelineCreator; + private Scheduler scheduler; + private MetadataStore pipelineStore; private final EventPublisher eventPublisher; private final NodeManager nodeManager; private final SCMPipelineMetrics metrics; + private final Configuration conf; // Pipeline Manager MXBean private ObjectName pmInfoBean; public SCMPipelineManager(Configuration conf, NodeManager nodeManager, EventPublisher eventPublisher) throws IOException { this.lock = new ReentrantReadWriteLock(); + this.conf = conf; this.stateManager = new PipelineStateManager(conf); this.pipelineFactory = new PipelineFactory(nodeManager, stateManager, conf); + // TODO: See if thread priority needs to be set for these threads + scheduler = new Scheduler("RatisPipelineUtilsThread", 
false, 1); + this.backgroundPipelineCreator = + new BackgroundPipelineCreator(this, scheduler, conf); int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, OZONE_SCM_DB_CACHE_SIZE_DEFAULT); final File metaDir = ServerUtils.getScmDbDir(conf); @@ -267,20 +278,6 @@ public class SCMPipelineManager implements PipelineManager { return stateManager.getNumberOfContainers(pipelineID); } - @Override - public void finalizePipeline(PipelineID pipelineId) throws IOException { - lock.writeLock().lock(); - try { - stateManager.finalizePipeline(pipelineId); - Set containerIDs = stateManager.getContainers(pipelineId); - for (ContainerID containerID : containerIDs) { - eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID); - } - } finally { - lock.writeLock().unlock(); - } - } - @Override public void openPipeline(PipelineID pipelineId) throws IOException { lock.writeLock().lock(); @@ -291,19 +288,31 @@ public class SCMPipelineManager implements PipelineManager { } } + /** + * Finalizes pipeline in the SCM. Removes pipeline and makes rpc call to + * destroy pipeline on the datanodes immediately or after timeout based on the + * value of onTimeout parameter. + * + * @param pipeline - Pipeline to be destroyed + * @param onTimeout - if true pipeline is removed and destroyed on + * datanodes after timeout + * @throws IOException + */ @Override - public void removePipeline(PipelineID pipelineID) throws IOException { - lock.writeLock().lock(); - try { - pipelineStore.delete(pipelineID.getProtobuf().toByteArray()); - Pipeline pipeline = stateManager.removePipeline(pipelineID); - nodeManager.removePipeline(pipeline); - metrics.incNumPipelineDestroyed(); - } catch (IOException ex) { - metrics.incNumPipelineDestroyFailed(); - throw ex; - } finally { - lock.writeLock().unlock(); + public void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout) + throws IOException { + LOG.info("destroying pipeline:{}", pipeline); + finalizePipeline(pipeline.getId()); + if (onTimeout) { + long pipelineDestroyTimeoutInMillis = + conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, + ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + scheduler.schedule(() -> destroyPipeline(pipeline), + pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG, + String.format("Destroy pipeline failed for pipeline:%s", pipeline)); + } else { + destroyPipeline(pipeline); } } @@ -319,14 +328,87 @@ public class SCMPipelineManager implements PipelineManager { return pipelineInfo; } + /** + * Schedules a fixed interval job to create pipelines. + */ + @Override + public void startPipelineCreator() { + backgroundPipelineCreator.startFixedIntervalPipelineCreator(); + } + + /** + * Triggers pipeline creation after the specified time. + */ + @Override + public void triggerPipelineCreation() { + backgroundPipelineCreator.triggerPipelineCreation(); + } + + /** + * Moves the pipeline to CLOSED state and sends close container command for + * all the containers in the pipeline. + * + * @param pipelineId - ID of the pipeline to be moved to CLOSED state. 
+ * @throws IOException + */ + private void finalizePipeline(PipelineID pipelineId) throws IOException { + lock.writeLock().lock(); + try { + stateManager.finalizePipeline(pipelineId); + Set containerIDs = stateManager.getContainers(pipelineId); + for (ContainerID containerID : containerIDs) { + eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID); + } + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all + * the datanodes for ratis pipelines. + * + * @param pipeline - Pipeline to be destroyed + * @throws IOException + */ + private void destroyPipeline(Pipeline pipeline) throws IOException { + RatisPipelineUtils.destroyPipeline(pipeline, conf); + // remove the pipeline from the pipeline manager + removePipeline(pipeline.getId()); + triggerPipelineCreation(); + } + + /** + * Removes the pipeline from the db and pipeline state map. + * + * @param pipelineId - ID of the pipeline to be removed + * @throws IOException + */ + private void removePipeline(PipelineID pipelineId) throws IOException { + lock.writeLock().lock(); + try { + pipelineStore.delete(pipelineId.getProtobuf().toByteArray()); + Pipeline pipeline = stateManager.removePipeline(pipelineId); + nodeManager.removePipeline(pipeline); + metrics.incNumPipelineDestroyed(); + } catch (IOException ex) { + metrics.incNumPipelineDestroyFailed(); + throw ex; + } finally { + lock.writeLock().unlock(); + } + } + @Override public void close() throws IOException { - if (pipelineFactory != null) { - pipelineFactory.close(); + if (scheduler != null) { + scheduler.close(); + scheduler = null; } if (pipelineStore != null) { pipelineStore.close(); + pipelineStore = null; } if(pmInfoBean != null) { MBeans.unregister(this.pmInfoBean); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java index b92f17e5525..3e42df33268 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java @@ -72,9 +72,4 @@ public class SimplePipelineProvider implements PipelineProvider { .setNodes(nodes) .build(); } - - @Override - public void close() { - // Nothing to do in here. 
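// finalizeAndDestroyPipeline above either tears the pipeline down immediately
// or defers destruction through the scheduler, giving datanodes
// OZONE_SCM_PIPELINE_DESTROY_TIMEOUT to close their containers first. The
// control flow reduced to a skeleton, with types and names simplified for
// illustration (this is not the patch's code):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class TeardownFlow {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private final long destroyTimeoutMillis;

  TeardownFlow(long destroyTimeoutMillis) {
    // In the patch this value comes from OZONE_SCM_PIPELINE_DESTROY_TIMEOUT.
    this.destroyTimeoutMillis = destroyTimeoutMillis;
  }

  void finalizeAndDestroy(Runnable finalizeStep, Runnable destroyStep,
      boolean onTimeout) {
    // finalizeStep: move the pipeline to CLOSED and fire CLOSE_CONTAINER
    // events, under the write lock.
    finalizeStep.run();
    if (onTimeout) {
      scheduler.schedule(destroyStep, destroyTimeoutMillis,
          TimeUnit.MILLISECONDS);
    } else {
      // destroyStep: ratis destroy on the datanodes, then removePipeline()
      // and a fresh triggerPipelineCreation().
      destroyStep.run();
    }
  }
}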
- } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 46460d20e17..e85da540fd4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; -import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.io.IOUtils; @@ -411,8 +410,7 @@ public class SCMClientProtocolServer implements PipelineManager pipelineManager = scm.getPipelineManager(); Pipeline pipeline = pipelineManager.getPipeline(PipelineID.getFromProtobuf(pipelineID)); - RatisPipelineUtils - .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false); + pipelineManager.finalizeAndDestroyPipeline(pipeline, false); AUDIT.logWriteSuccess( buildAuditMessageForSuccess(SCMAction.CLOSE_PIPELINE, null) ); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java index 88d9f9b3de7..2e0af7bcf3b 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; -import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils; import org.apache.hadoop.hdds.scm.server.SCMConfigurator; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.server.events.EventHandler; @@ -274,8 +273,7 @@ public class TestBlockManager implements EventHandler { .waitFor(() -> !blockManager.isScmInChillMode(), 10, 1000 * 5); for (Pipeline pipeline : pipelineManager.getPipelines()) { - RatisPipelineUtils - .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false); + pipelineManager.finalizeAndDestroyPipeline(pipeline, false); } Assert.assertEquals(0, pipelineManager.getPipelines(type, factor).size()); Assert.assertNotNull(blockManager diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java index 55dca160b34..01b78f27a20 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java @@ -35,19 +35,16 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import 
org.apache.hadoop.hdds.scm.container.MockNodeManager; -import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider; +import org.apache.hadoop.hdds.scm.pipeline.*; import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider; -import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.test.GenericTestUtils; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; +import org.mockito.Mockito; /** Test class for SCMChillModeManager. */ @@ -128,12 +125,13 @@ public class TestSCMChillModeManager { } @Test - @Ignore("TODO:HDDS-1140") public void testDisableChillMode() { OzoneConfiguration conf = new OzoneConfiguration(config); conf.setBoolean(HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED, false); - scmChillModeManager = new SCMChillModeManager( - conf, containers, null, queue); + PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); + Mockito.doNothing().when(pipelineManager).startPipelineCreator(); + scmChillModeManager = + new SCMChillModeManager(conf, containers, pipelineManager, queue); assertFalse(scmChillModeManager.getInChillMode()); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java index db70a7a3df3..c7470a3cb60 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java @@ -112,8 +112,8 @@ public class TestNode2PipelineMap { ratisContainer.getPipeline().getId()); Assert.assertEquals(0, set2.size()); - pipelineManager.finalizePipeline(ratisContainer.getPipeline().getId()); - pipelineManager.removePipeline(ratisContainer.getPipeline().getId()); + pipelineManager + .finalizeAndDestroyPipeline(ratisContainer.getPipeline(), false); pipelines = scm.getScmNodeManager() .getPipelines(dns.get(0)); Assert diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java index e855d2ce167..eb4dba52809 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -24,24 +24,23 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -121,12 +120,8 @@ public class TestPipelineClose { .getContainersInPipeline(ratisContainer.getPipeline().getId()); Assert.assertEquals(0, setClosed.size()); - pipelineManager.finalizePipeline(ratisContainer.getPipeline().getId()); - Pipeline pipeline1 = pipelineManager - .getPipeline(ratisContainer.getPipeline().getId()); - Assert.assertEquals(Pipeline.PipelineState.CLOSED, - pipeline1.getPipelineState()); - pipelineManager.removePipeline(pipeline1.getId()); + pipelineManager + .finalizeAndDestroyPipeline(ratisContainer.getPipeline(), false); for (DatanodeDetails dn : ratisContainer.getPipeline().getNodes()) { // Assert that the pipeline has been removed from Node2PipelineMap as well Assert.assertFalse(scm.getScmNodeManager().getPipelines(dn) @@ -135,21 +130,23 @@ public class TestPipelineClose { } @Test - public void testPipelineCloseWithOpenContainer() throws IOException, - TimeoutException, InterruptedException { + public void testPipelineCloseWithOpenContainer() + throws IOException, TimeoutException, InterruptedException { Set setOpen = pipelineManager.getContainersInPipeline( ratisContainer.getPipeline().getId()); Assert.assertEquals(1, setOpen.size()); - ContainerID cId2 = ratisContainer.getContainerInfo().containerID(); - pipelineManager.finalizePipeline(ratisContainer.getPipeline().getId()); - Assert.assertEquals(Pipeline.PipelineState.CLOSED, - pipelineManager.getPipeline( - ratisContainer.getPipeline().getId()).getPipelineState()); - Pipeline pipeline2 = pipelineManager - .getPipeline(ratisContainer.getPipeline().getId()); - Assert.assertEquals(Pipeline.PipelineState.CLOSED, - pipeline2.getPipelineState()); + pipelineManager + .finalizeAndDestroyPipeline(ratisContainer.getPipeline(), false); + GenericTestUtils.waitFor(() -> { + try { + return containerManager + .getContainer(ratisContainer.getContainerInfo().containerID()) + .getState() == HddsProtos.LifeCycleState.CLOSING; + } catch (ContainerNotFoundException e) { + return false; + } + }, 100, 10000); } @Test @@ -183,39 +180,4 @@ public class TestPipelineClose { } catch (PipelineNotFoundException e) { } } - - @Test - public void testPipelineCloseWithPipelineReport() throws IOException { - Pipeline pipeline = ratisContainer.getPipeline(); - pipelineManager.finalizePipeline(pipeline.getId()); - // remove pipeline from SCM - pipelineManager.removePipeline(pipeline.getId()); - - for (DatanodeDetails dn : pipeline.getNodes()) { - PipelineReportFromDatanode pipelineReport = - TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId()); - EventQueue eventQueue = new EventQueue(); - SCMChillModeManager scmChillModeManager = - new SCMChillModeManager(new OzoneConfiguration(), - new ArrayList<>(), pipelineManager, eventQueue); - PipelineReportHandler pipelineReportHandler = - new PipelineReportHandler(scmChillModeManager, 
pipelineManager, conf); - // on receiving pipeline report for the pipeline, pipeline report handler - // should destroy the pipeline for the dn - pipelineReportHandler.onMessage(pipelineReport, eventQueue); - } - - OzoneContainer ozoneContainer = - cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() - .getContainer(); - List pipelineReports = - ozoneContainer.getPipelineReport().getPipelineReportList(); - for (PipelineReport pipelineReport : pipelineReports) { - // pipeline should not be reported by any dn - Assert.assertNotEquals( - PipelineID.getFromProtobuf(pipelineReport.getPipelineID()), - ratisContainer.getPipeline().getId()); - } - } - -} \ No newline at end of file +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java index 2180834cd4f..b653e7a2b92 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdds.scm.pipeline; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; @@ -46,6 +47,8 @@ public class TestRatisPipelineUtils { private static PipelineManager pipelineManager; public void init(int numDatanodes) throws Exception { + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, + GenericTestUtils.getRandomizedTempPath()); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(numDatanodes) .setHbInterval(1000) @@ -71,8 +74,7 @@ public class TestRatisPipelineUtils { .getPipelines(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN); for (Pipeline pipeline : pipelines) { - RatisPipelineUtils - .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false); + pipelineManager.finalizeAndDestroyPipeline(pipeline, false); } // make sure two pipelines are created waitForPipelines(2); @@ -108,7 +110,13 @@ public class TestRatisPipelineUtils { for (HddsDatanodeService dn : dns) { cluster.restartHddsDatanode(dn.getDatanodeDetails(), false); } + + // destroy the existing pipelines + for (Pipeline pipeline : pipelines) { + pipelineManager.finalizeAndDestroyPipeline(pipeline, false); + } // make sure pipelines is created after node start + pipelineManager.triggerPipelineCreation(); waitForPipelines(1); } @@ -117,6 +125,6 @@ public class TestRatisPipelineUtils { GenericTestUtils.waitFor(() -> pipelineManager .getPipelines(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN) - .size() == numPipelines, 100, 20000); + .size() == numPipelines, 100, 40000); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index 4d8e3af656d..990d73ab90f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -104,8 +104,7 @@ public class TestSCMPipelineManager { // clean up for 
(Pipeline pipeline : pipelines) { - pipelineManager.finalizePipeline(pipeline.getId()); - pipelineManager.removePipeline(pipeline.getId()); + pipelineManager.finalizeAndDestroyPipeline(pipeline, false); } pipelineManager.close(); } @@ -126,10 +125,7 @@ public class TestSCMPipelineManager { pipelineManager.openPipeline(pipeline.getId()); pipelineManager .addContainerToPipeline(pipeline.getId(), ContainerID.valueof(1)); - pipelineManager.finalizePipeline(pipeline.getId()); - pipelineManager - .removeContainerFromPipeline(pipeline.getId(), ContainerID.valueof(1)); - pipelineManager.removePipeline(pipeline.getId()); + pipelineManager.finalizeAndDestroyPipeline(pipeline, false); pipelineManager.close(); // new pipeline manager should not be able to load removed pipelines @@ -192,13 +188,12 @@ public class TestSCMPipelineManager { .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen()); // close the pipeline - pipelineManager.finalizePipeline(pipeline.getId()); + pipelineManager.finalizeAndDestroyPipeline(pipeline, false); for (DatanodeDetails dn: pipeline.getNodes()) { PipelineReportFromDatanode pipelineReportFromDatanode = TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId()); - // pipeline report for a closed pipeline should destroy the pipeline - // and remove it from the pipeline manager + // pipeline report for destroyed pipeline should be ignored pipelineReportHandler .onMessage(pipelineReportFromDatanode, new EventQueue()); } diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java index 280cf90211d..13ecab60226 100644 --- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java +++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; -import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; import org.junit.AfterClass; @@ -103,7 +102,6 @@ public class TestFreonWithPipelineDestroy { PipelineManager pipelineManager = cluster.getStorageContainerManager().getPipelineManager(); Pipeline pipeline = pipelineManager.getPipeline(id); - RatisPipelineUtils - .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false); + pipelineManager.finalizeAndDestroyPipeline(pipeline, false); } }
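Because finalizeAndDestroyPipeline(pipeline, true) and triggerPipelineCreation() now complete asynchronously on the scheduler thread, the updated tests poll for the expected state rather than asserting it immediately: TestPipelineClose waits for the container to reach CLOSING, and TestRatisPipelineUtils waits (with the timeout raised to 40 seconds) for pipelines to come back OPEN. The polling idiom, roughly as those tests use it; the PipelineWait wrapper is illustrative:

import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.test.GenericTestUtils;

final class PipelineWait {
  private PipelineWait() { }

  // Poll every 100 ms, fail after 40 s, until the OPEN three-node Ratis
  // pipeline count reaches the expected value.
  static void waitForOpenPipelines(PipelineManager pipelineManager,
      int numPipelines) throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(() -> pipelineManager
        .getPipelines(HddsProtos.ReplicationType.RATIS,
            HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
        .size() == numPipelines, 100, 40000);
  }
}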
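The two close() paths this patch touches share one defensive shape: Scheduler.close() is now synchronized and null-checks the executor before shutting it down, and SCMPipelineManager.close() nulls out the scheduler and the pipeline store after closing them, so a repeated close() degrades to a no-op. A minimal sketch of that idiom with illustrative names; note that, unlike Scheduler.close(), which only logs the InterruptedException, the sketch also restores the interrupt status:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class IdempotentCloser {
  private ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();

  synchronized void close() {
    if (executor == null) {
      return; // already closed
    }
    executor.shutdownNow(); // cancel queued tasks, interrupt running ones
    try {
      // Bounded wait, mirroring the 60-second wait in Scheduler.close().
      executor.awaitTermination(60, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
    }
    executor = null; // make any later close() a no-op
  }
}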