diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJob.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJob.java new file mode 100644 index 00000000000..847092a2aa2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJob.java @@ -0,0 +1,361 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.procedure; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.HashSet; +import java.util.List; +import java.util.ArrayList; + +/** + * A Job is a state machine consists of many procedures. The procedures are + * executed as a chain. Each procedure needs to specify the next procedure. If + * there is no next procedure then the job is finished. + */ +public final class BalanceJob implements Writable { + private String id; + private BalanceProcedureScheduler scheduler; + private volatile boolean jobDone = false; + private Exception error; + public static final Logger LOG = LoggerFactory.getLogger(BalanceJob.class); + private Map procedureTable = new HashMap<>(); + private T firstProcedure; + private T curProcedure; + private T lastProcedure; + private boolean removeAfterDone; + + static final String NEXT_PROCEDURE_NONE = "NONE"; + private static Set reservedNames = new HashSet<>(); + + static { + reservedNames.add(NEXT_PROCEDURE_NONE); + } + + public static class Builder { + + private List procedures = new ArrayList<>(); + private boolean removeAfterDone = false; + + /** + * Append a procedure to the tail. + */ + public Builder nextProcedure(T procedure) { + int size = procedures.size(); + if (size > 0) { + procedures.get(size - 1).setNextProcedure(procedure.name()); + } + procedure.setNextProcedure(NEXT_PROCEDURE_NONE); + procedures.add(procedure); + return this; + } + + /** + * Automatically remove this job from the scheduler cache when the job is + * done. + */ + public Builder removeAfterDone(boolean remove) { + removeAfterDone = remove; + return this; + } + + public BalanceJob build() throws IOException { + BalanceJob job = new BalanceJob(procedures, removeAfterDone); + for (BalanceProcedure p : procedures) { + p.setJob(job); + } + return job; + } + } + + private BalanceJob(Iterable procedures, boolean remove) + throws IOException { + for (T p : procedures) { + String taskName = p.name(); + if (reservedNames.contains(taskName)) { + throw new IOException(taskName + " is reserved."); + } + procedureTable.put(p.name(), p); + if (firstProcedure == null) { + firstProcedure = p; + } + } + removeAfterDone = remove; + lastProcedure = null; + curProcedure = firstProcedure; + } + + /** + * Run the state machine. + */ + public void execute() { + boolean quit = false; + try { + while (!jobDone && !quit && scheduler.isRunning()) { + if (curProcedure == null) { // Job done. + finish(null); + quit = true; + } else { + if (curProcedure == firstProcedure || lastProcedure != curProcedure) { + LOG.info("Start procedure {}, last procedure is {}", + curProcedure.name(), + lastProcedure == null ? null : lastProcedure.name()); + } + if (curProcedure.execute()) { + lastProcedure = curProcedure; + curProcedure = next(); + } + if (!scheduler.writeJournal(this)) { + quit = true; // Write journal failed. Simply quit because this job + // has already been added to the recoverQueue. + LOG.debug("Write journal failed. Quit and wait for recovery."); + } + } + } + } catch (BalanceProcedure.RetryException tre) { + scheduler.delay(this, curProcedure.delayMillisBeforeRetry()); + } catch (Exception e) { + finish(e); + } catch (Throwable t) { + IOException err = new IOException("Got throwable error.", t); + finish(err); + } + } + + private T next() { + if (curProcedure == null) { + return firstProcedure; + } else { + return procedureTable.get(curProcedure.nextProcedure()); + } + } + + /** + * Job finishes. It could be either success or failure. + * @param exception the exception that causes the job to fail. null indicates + * the job is successful. + */ + private synchronized void finish(Exception exception) { + assert !jobDone; + if (scheduler.jobDone(this)) { + jobDone = true; + error = exception; + notifyAll(); + } + } + + void setScheduler(BalanceProcedureScheduler scheduler) { + this.scheduler = scheduler; + } + + void setId(String id) { + this.id = id; + } + + /** + * Get the uid of the job. + */ + public String getId() { + return this.id; + } + + /** + * Whether this job should be removed after it's done. + */ + @VisibleForTesting + public boolean shouldRemoveAfterDone() { + return removeAfterDone; + } + + @VisibleForTesting + void setLastProcedure(T lastProcedure) { + this.lastProcedure = lastProcedure; + } + + @VisibleForTesting + void setCurrentProcedure(T currentProcedure) { + this.curProcedure = currentProcedure; + } + + /** + * Return true if the job has finished. + */ + public boolean isJobDone() { + return jobDone; + } + + /** + * Wait until the job is done. + */ + public synchronized void waitJobDone() throws InterruptedException { + while (!jobDone) { + wait(); + } + } + + /** + * Return the error exception during the job execution. This should be called + * after the job finishes. + */ + public Exception getError() { + return error; + } + + @Override + public void write(DataOutput out) throws IOException { + if (id == null) { + throw new IOException("BalanceJob with id=null can not be serialized."); + } + Text.writeString(out, id); + int taskTableSize = procedureTable.size(); + out.writeInt(taskTableSize); + for (T p : procedureTable.values()) { + Text.writeString(out, p.getClass().getName()); + p.write(out); + } + if (firstProcedure != null) { + Text.writeString(out, firstProcedure.name()); + } else { + Text.writeString(out, NEXT_PROCEDURE_NONE); + } + if (curProcedure != null) { + Text.writeString(out, curProcedure.name()); + } else { + Text.writeString(out, NEXT_PROCEDURE_NONE); + } + if (lastProcedure != null) { + Text.writeString(out, lastProcedure.name()); + } else { + Text.writeString(out, NEXT_PROCEDURE_NONE); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + this.id = Text.readString(in); + procedureTable = new HashMap<>(); + int taskTableSize = in.readInt(); + for (int i = 0; i < taskTableSize; i++) { + String className = Text.readString(in); + try { + T p = (T) ReflectionUtils.newInstance(Class.forName(className), null); + p.readFields(in); + procedureTable.put(p.name(), p); + } catch (Exception e) { + LOG.error("Failed reading Procedure.", e); + throw new IOException(e); + } + } + String firstProcedureName = Text.readString(in); + if (firstProcedureName.equals(NEXT_PROCEDURE_NONE)) { + firstProcedure = null; + } else { + firstProcedure = procedureTable.get(firstProcedureName); + } + String currentProcedureName = Text.readString(in); + if (currentProcedureName.equals(NEXT_PROCEDURE_NONE)) { + curProcedure = null; + } else { + curProcedure = procedureTable.get(currentProcedureName); + } + String lastProcedureName = Text.readString(in); + if (lastProcedureName.equals(NEXT_PROCEDURE_NONE)) { + lastProcedure = null; + } else { + lastProcedure = procedureTable.get(lastProcedureName); + } + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (obj.getClass() != getClass()) { + return false; + } + BalanceJob bj = (BalanceJob) obj; + return new EqualsBuilder() + .append(id, bj.id) + .append(procedureTable, bj.procedureTable) + .append(firstProcedure, bj.firstProcedure) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(id) + .append(procedureTable) + .toHashCode(); + } + + @Override + public String toString() { + return "{jobId=" + id + "}"; + } + + /** + * Get the detail description of this job. + */ + public String getDetailMessage() { + StringBuilder builder = new StringBuilder(); + builder.append("id=").append(id); + if (firstProcedure != null) { + builder.append(",firstProcedure=").append(firstProcedure); + } + if (curProcedure != null) { + builder.append(",currentProcedure=").append(curProcedure); + } + builder.append(",jobDone=").append(jobDone); + if (error != null) { + builder.append(",error=").append(error.getMessage()); + } + return builder.toString(); + } + + boolean isSchedulerShutdown() { + return !scheduler.isRunning(); + } + + @VisibleForTesting + Map getProcedureTable() { + return procedureTable; + } + + @VisibleForTesting + T getCurProcedure() { + return curProcedure; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournal.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournal.java new file mode 100644 index 00000000000..011ae857bc1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournal.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.procedure; + +import org.apache.hadoop.conf.Configurable; + +import java.io.IOException; + +/** + * The Journal of the state machine. It handles the job persistence and recover. + */ +public interface BalanceJournal extends Configurable { + + /** + * Save journal of this job. + */ + void saveJob(BalanceJob job) throws IOException; + + /** + * Recover the job from journal. + */ + void recoverJob(BalanceJob job) throws IOException; + + /** + * List all unfinished jobs. + */ + BalanceJob[] listAllJobs() throws IOException; + + /** + * Clear all the journals of this job. + */ + void clear(BalanceJob job) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournalInfoHDFS.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournalInfoHDFS.java new file mode 100644 index 00000000000..4e759d8d7f3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournalInfoHDFS.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.procedure; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.SequentialNumber; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.SCHEDULER_JOURNAL_URI; +import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.TMP_TAIL; +import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.JOB_PREFIX; + +/** + * BalanceJournal based on HDFS. This class stores all the journals in the HDFS. + * The jobs are persisted into the HDFS and recovered from the HDFS. + */ +public class BalanceJournalInfoHDFS implements BalanceJournal { + + public static final Logger LOG = LoggerFactory.getLogger( + BalanceJournalInfoHDFS.class); + + public static class IdGenerator extends SequentialNumber { + protected IdGenerator(long initialValue) { + super(initialValue); + } + } + + private URI workUri; + private Configuration conf; + private IdGenerator generator; + + /** + * Save job journal to HDFS. + * + * All the journals are saved in the path base-dir. Each job has an individual + * directory named after the job id. + * When a job is saved, a new journal file is created. The file's name + * consists of a prefix 'JOB-' and an incremental sequential id. The file with + * the largest id is the latest journal of this job. + * + * Layout: + * base-dir/ + * /job-3f1da5e5-2a60-48de-8736-418d134edbe9/ + * /JOB-0 + * /JOB-3 + * /JOB-5 + * /job-ebc19478-2324-46c2-8d1a-2f8c4391dc09/ + * /JOB-1 + * /JOB-2 + * /JOB-4 + */ + public void saveJob(BalanceJob job) throws IOException { + Path jobFile = getNewStateJobPath(job); + Path tmpJobFile = new Path(jobFile + TMP_TAIL); + FSDataOutputStream out = null; + try { + FileSystem fs = FileSystem.get(workUri, conf); + out = fs.create(tmpJobFile); + job.write(new DataOutputStream(out)); + out.close(); + out = null; + fs.rename(tmpJobFile, jobFile); + } finally { + IOUtils.closeStream(out); + } + LOG.debug("Save journal of job={}", job); + } + + /** + * Recover job from journal on HDFS. + */ + public void recoverJob(BalanceJob job) throws IOException { + FSDataInputStream in = null; + try { + Path logPath = getLatestStateJobPath(job); + FileSystem fs = FileSystem.get(workUri, conf); + in = fs.open(logPath); + job.readFields(in); + LOG.debug("Recover job={} from journal.", job); + } finally { + if (in != null) { + in.close(); + } + } + } + + @Override + public BalanceJob[] listAllJobs() throws IOException { + FileSystem fs = FileSystem.get(workUri, conf); + Path workPath = new Path(workUri.getPath()); + FileStatus[] statuses; + try { + statuses = fs.listStatus(workPath); + } catch (FileNotFoundException e) { + LOG.debug("Create work path {}", workPath); + fs.mkdirs(workPath); + return new BalanceJob[0]; + } + BalanceJob[] jobs = new BalanceJob[statuses.length]; + StringBuilder builder = new StringBuilder(); + builder.append("List all jobs from journal ["); + for (int i = 0; i < statuses.length; i++) { + if (statuses[i].isDirectory()) { + jobs[i] = new BalanceJob.Builder<>().build(); + jobs[i].setId(statuses[i].getPath().getName()); + builder.append(jobs[i]); + if (i < statuses.length -1) { + builder.append(", "); + } + } + } + builder.append("]"); + LOG.debug(builder.toString()); + return jobs; + } + + @Override + public void clear(BalanceJob job) throws IOException { + Path jobBase = getJobBaseDir(job); + FileSystem fs = FileSystem.get(workUri, conf); + if (fs.exists(jobBase)) { + fs.delete(jobBase, true); + } + LOG.debug("Clear journal of job=" + job); + } + + @Override + public void setConf(Configuration conf) { + try { + this.workUri = new URI(conf.get(SCHEDULER_JOURNAL_URI)); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("URI resolution failed.", e); + } + this.conf = conf; + this.generator = new IdGenerator(Time.monotonicNow()); + } + + @Override + public Configuration getConf() { + return conf; + } + + private Path getJobBaseDir(BalanceJob job) { + String jobId = job.getId(); + return new Path(workUri.getPath(), jobId); + } + + private Path getNewStateJobPath(BalanceJob job) { + Path basePath = getJobBaseDir(job); + Path logPath = new Path(basePath, JOB_PREFIX + generator.nextValue()); + return logPath; + } + + private Path getLatestStateJobPath(BalanceJob job) throws IOException { + Path latestFile = null; + Path basePath = getJobBaseDir(job); + FileSystem fs = FileSystem.get(workUri, conf); + RemoteIterator iterator = fs.listFiles(basePath, false); + while (iterator.hasNext()) { + FileStatus status = iterator.next(); + String fileName = status.getPath().getName(); + if (fileName.startsWith(JOB_PREFIX) && !fileName.contains(TMP_TAIL)) { + if (latestFile == null) { + latestFile = status.getPath(); + } else if (latestFile.getName().compareTo(fileName) <= 0) { + latestFile = status.getPath(); + } + } + } + return latestFile; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedure.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedure.java new file mode 100644 index 00000000000..6320e8fe994 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedure.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.procedure; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import static org.apache.hadoop.hdfs.procedure.BalanceJob.NEXT_PROCEDURE_NONE; + +/** + * The basic components of the Job. Extend this class to implement different + * job logic. + */ +public abstract class BalanceProcedure + implements Writable { + + public static final Logger LOG = + LoggerFactory.getLogger(BalanceProcedure.class); + private String nextProcedure; // the procedure after this procedure. + private String name; // the name of this procedure. + private long delayDuration; // this specifies how long will this procedure be + // delayed. The delay is triggered by throwing a + // RetryException. + private BalanceJob job; + + public BalanceProcedure() { + } + + /** + * The constructor of BalanceProcedure. + * + * @param name the name of the procedure. + * @param nextProcedure the name of the next procedure. + * @param delayDuration the delay duration when this procedure is delayed. + */ + public BalanceProcedure(String name, String nextProcedure, + long delayDuration) { + this(); + this.name = name; + this.nextProcedure = nextProcedure; + this.delayDuration = delayDuration; + } + + public BalanceProcedure(String name, long delayDuration) { + this(name, NEXT_PROCEDURE_NONE, delayDuration); + } + + /** + * The main process. This is called by the ProcedureScheduler. + + * Make sure the process quits fast when it's interrupted and the scheduler is + * shut down. + * + * One procedure may have many phases and all the phases share the same member + * variables. Each time this method returns, the journal is saved. User can + * serialize the current phase in write(DataOutput) so the job can continue + * with the last unfinished phase after it is recovered. + * The return value indicates whether the job should go to the next procedure. + * Return true after all the phases finish. + * + * Example: + * class ProcedureWithManyPhase extends BalanceProcedure { + * + * enum PHASE { + * P1, P2, P3 + * } + * PHASE phase; + * + * public boolean execute(T lastProcedure) throws RetryException, + * IOException { + * switch (phase) { + * case P1: + * // do something. + * return false; + * case P2: + * // do something. + * return false; + * case P3: + * // do something. + * return true; + * default: + * throw new IOException("Unexpected phase " + phase); + * } + * } + * + * public void write(DataOutput out) { + * out.writeInt(phase.ordinal()); + * } + * + * public void readFields(DataInput in) throws IOException { + * stage = Stage.values()[in.readInt()]; + * } + * } + * + * + * @throws RetryException if this procedure needs delay a while then retry. + * @return true if the procedure has done and the job will go to the next + * procedure, otherwise false. + */ + public abstract boolean execute() throws RetryException, IOException; + + /** + * The time in milliseconds the procedure should wait before retry. + */ + public long delayMillisBeforeRetry() { + return delayDuration; + } + + /** + * The active flag. + */ + protected boolean isSchedulerShutdown() { + return job.isSchedulerShutdown(); + } + + protected void setNextProcedure(String nextProcedure) { + this.nextProcedure = nextProcedure; + } + + void setJob(BalanceJob job) { + this.job = job; + } + + /** + * Get the next procedure. + */ + public String nextProcedure() { + return nextProcedure; + } + + /** + * Get the procedure name. + */ + public String name() { + return name; + } + + @Override + public void write(DataOutput out) throws IOException { + if (nextProcedure == null) { + Text.writeString(out, NEXT_PROCEDURE_NONE); + } else { + Text.writeString(out, nextProcedure); + } + Text.writeString(out, name); + new LongWritable(delayDuration).write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + nextProcedure = Text.readString(in); + name = Text.readString(in); + delayDuration = readLong(in); + } + + private static long readLong(DataInput in) throws IOException { + LongWritable delayWritable = new LongWritable(); + delayWritable.readFields(in); + return delayWritable.get(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(nextProcedure) + .append(name) + .append(delayDuration) + .toHashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (obj.getClass() != getClass()) { + return false; + } + BalanceProcedure rhs = (BalanceProcedure) obj; + return new EqualsBuilder() + .append(nextProcedure, rhs.nextProcedure) + .append(name, rhs.name) + .append(delayDuration, rhs.delayDuration) + .build(); + } + + @Override + public String toString() { + return name + ":" + this.getClass().getName(); + } + + /** + * The RetryException represents the current procedure should be delayed then + * retried. + */ + public static class RetryException extends Exception { + public RetryException() {} + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureConfigKeys.java new file mode 100644 index 00000000000..f8690351960 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureConfigKeys.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.procedure; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * This class contains constants for configuration keys and default values + * used in hdfs procedure. + */ +@InterfaceAudience.Private +public final class BalanceProcedureConfigKeys { + /* The worker threads number of the BalanceProcedureScheduler */ + public static final String WORK_THREAD_NUM = + "hadoop.hdfs.procedure.work.thread.num"; + public static final int WORK_THREAD_NUM_DEFAULT = 10; + /* The uri of the journal */ + public static final String SCHEDULER_JOURNAL_URI = + "hadoop.hdfs.procedure.scheduler.journal.uri"; + public static final String JOB_PREFIX = "JOB-"; + public static final String TMP_TAIL = ".tmp"; + public static final String JOURNAL_CLASS = + "hadoop.hdfs.procedure.journal.class"; + + private BalanceProcedureConfigKeys() {} +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureScheduler.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureScheduler.java new file mode 100644 index 00000000000..74606c5580e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureScheduler.java @@ -0,0 +1,450 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.procedure; + +import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.lang3.builder.CompareToBuilder; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM; +import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM_DEFAULT; +import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.JOURNAL_CLASS; +/** + * The state machine framework consist of: + * Job: The state machine. It implements the basic logic of the + * state machine. + * Procedure: The components of the job. It implements the custom + * logic. + * ProcedureScheduler: The multi-thread model responsible for running, + * recovering, handling errors and job persistence. + * Journal: It handles the job persistence and recover. + * + * Example: + * Job.Builder builder = new Job.Builder<>(); + * builder.nextProcedure(new WaitProcedure("wait", 1000, 30 * 1000)); + * Job job = builder.build(); + * + * ProcedureScheduler scheduler = new ProcedureScheduler(CONF); + * scheduler.init(); + * scheduler.submit(job); + * scheduler.waitUntilDone(job); + */ +public class BalanceProcedureScheduler { + public static final Logger LOG = + LoggerFactory.getLogger(BalanceProcedureScheduler.class); + // The set containing all the jobs, including submitted and recovered ones. + private ConcurrentHashMap jobSet; + // Containing jobs pending for running. + private LinkedBlockingQueue runningQueue; + // Containing jobs pending for wake up. + private DelayQueue delayQueue; + // Containing jobs pending for recovery. + private LinkedBlockingQueue recoverQueue; + private Configuration conf; + private BalanceJournal journal; // handle jobs' journals. + + private Thread readerThread; // consume the runningQueue and send to workers. + private ThreadPoolExecutor workersPool; // the real threads running the jobs. + private Thread roosterThread; // wake up the jobs in the delayQueue. + private Thread recoverThread; // recover the jobs in the recoverQueue. + // The running state of this scheduler. + private AtomicBoolean running = new AtomicBoolean(true); + + public BalanceProcedureScheduler(Configuration conf) { + this.conf = conf; + } + + /** + * Init the scheduler. + * + * @param recoverJobs whether to recover all the jobs from journal or not. + */ + public synchronized void init(boolean recoverJobs) throws IOException { + this.runningQueue = new LinkedBlockingQueue<>(); + this.delayQueue = new DelayQueue<>(); + this.recoverQueue = new LinkedBlockingQueue<>(); + this.jobSet = new ConcurrentHashMap<>(); + + // start threads. + this.roosterThread = new Rooster(); + this.roosterThread.setDaemon(true); + roosterThread.start(); + this.recoverThread = new Recover(); + this.recoverThread.setDaemon(true); + recoverThread.start(); + int workerNum = conf.getInt(WORK_THREAD_NUM, WORK_THREAD_NUM_DEFAULT); + workersPool = new ThreadPoolExecutor(workerNum, workerNum * 2, 1, + TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>()); + this.readerThread = new Reader(); + this.readerThread.start(); + + // init journal. + Class clazz = (Class) conf + .getClass(JOURNAL_CLASS, BalanceJournalInfoHDFS.class); + journal = ReflectionUtils.newInstance(clazz, conf); + + if (recoverJobs) { + recoverAllJobs(); + } + } + + /** + * Submit the job. + */ + public synchronized void submit(BalanceJob job) throws IOException { + if (!running.get()) { + throw new IOException("Scheduler is shutdown."); + } + String jobId = allocateJobId(); + job.setId(jobId); + job.setScheduler(this); + journal.saveJob(job); + jobSet.put(job, job); + runningQueue.add(job); + LOG.info("Add new job={}", job); + } + + /** + * Remove the job from scheduler if it finishes. + */ + public BalanceJob remove(BalanceJob job) { + BalanceJob inner = findJob(job); + if (inner == null) { + return null; + } else if (job.isJobDone()) { + synchronized (this) { + return jobSet.remove(inner); + } + } + return null; + } + + /** + * Find job in scheduler. + * + * @return the job in scheduler. Null if the schedule has no job with the + * same id. + */ + public BalanceJob findJob(BalanceJob job) { + BalanceJob found = null; + for (BalanceJob j : jobSet.keySet()) { + if (j.getId().equals(job.getId())) { + found = j; + break; + } + } + return found; + } + + /** + * Return all jobs in the scheduler. + */ + public Collection getAllJobs() { + return jobSet.values(); + } + + /** + * Wait permanently until the job is done. + */ + public void waitUntilDone(BalanceJob job) { + BalanceJob found = findJob(job); + if (found == null || found.isJobDone()) { + return; + } + while (!found.isJobDone()) { + try { + found.waitJobDone(); + } catch (InterruptedException e) { + } + } + } + + /** + * Delay this job. + */ + void delay(BalanceJob job, long delayInMilliseconds) { + delayQueue.add(new DelayWrapper(job, delayInMilliseconds)); + LOG.info("Need delay {}ms. Add to delayQueue. job={}", delayInMilliseconds, + job); + } + + boolean jobDone(BalanceJob job) { + try { + journal.clear(job); + if (job.shouldRemoveAfterDone()) { + jobSet.remove(job); + } + return true; + } catch (IOException e) { + LOG.warn("Clear journal failed, add to recoverQueue. job=" + job, e); + recoverQueue.add(job); + return false; + } + } + + /** + * Save current status to journal. + */ + boolean writeJournal(BalanceJob job) { + try { + journal.saveJob(job); + return true; + } catch (Exception e) { + LOG.warn("Save procedure failed, add to recoverQueue. job=" + job, e); + recoverQueue.add(job); + return false; + } + } + + /** + * The running state of the scheduler. + */ + public boolean isRunning() { + return running.get(); + } + + /** + * Shutdown the scheduler. + */ + public synchronized void shutDown() { + if (!running.get()) { + return; + } + running.set(false); + readerThread.interrupt(); + roosterThread.interrupt(); + recoverThread.interrupt(); + workersPool.shutdownNow(); + } + + /** + * Shutdown scheduler and wait at most timeout seconds for procedures to + * finish. + * @param timeout Wait at most timeout seconds for procedures to finish. + */ + public synchronized void shutDownAndWait(int timeout) { + shutDown(); + while (readerThread.isAlive()) { + try { + readerThread.join(); + } catch (InterruptedException e) { + } + } + while (roosterThread.isAlive()) { + try { + roosterThread.join(); + } catch (InterruptedException e) { + } + } + while (recoverThread.isAlive()) { + try { + recoverThread.join(); + } catch (InterruptedException e) { + } + } + while (!workersPool.isTerminated()) { + try { + workersPool.awaitTermination(timeout, TimeUnit.SECONDS); + } catch (InterruptedException e) { + } + } + } + + /** + * Search all jobs and add them to recoverQueue. It's called once after the + * scheduler starts. + */ + private void recoverAllJobs() throws IOException { + BalanceJob[] jobs = journal.listAllJobs(); + for (BalanceJob job : jobs) { + recoverQueue.add(job); + jobSet.put(job, job); + } + } + + @VisibleForTesting + static String allocateJobId() { + return "job-" + UUID.randomUUID(); + } + + @VisibleForTesting + public void setJournal(BalanceJournal journal) { + this.journal = journal; + } + + /** + * This thread consumes the delayQueue and move the jobs to the runningQueue. + */ + class Rooster extends Thread { + @Override + public void run() { + while (running.get()) { + try { + DelayWrapper dJob = delayQueue.take(); + runningQueue.add(dJob.getJob()); + LOG.info("Wake up job={}", dJob.getJob()); + } catch (InterruptedException e) { + // ignore interrupt exception. + } + } + } + } + + /** + * This thread consumes the runningQueue and give the job to the workers. + */ + class Reader extends Thread { + @Override + public void run() { + while (running.get()) { + try { + final BalanceJob job = runningQueue.poll(500, TimeUnit.MILLISECONDS); + if (job != null) { + workersPool.submit(() -> { + LOG.info("Start job. job_msg={}", job.getDetailMessage()); + job.execute(); + if (!running.get()) { + return; + } + if (job.isJobDone()) { + if (job.getError() == null) { + LOG.info("Job done. job={}", job); + } else { + LOG.warn("Job failed. job=" + job, job.getError()); + } + } + return; + }); + } + } catch (InterruptedException e) { + // ignore interrupt exception. + } + } + } + } + + /** + * This thread consumes the recoverQueue, recovers the job the adds it to the + * runningQueue. + */ + class Recover extends Thread { + @Override + public void run() { + while (running.get()) { + BalanceJob job = null; + try { + job = recoverQueue.poll(500, TimeUnit.MILLISECONDS); + } catch (InterruptedException ie) { + // ignore interrupt exception. + } + if (job != null) { + try { + journal.recoverJob(job); + job.setScheduler(BalanceProcedureScheduler.this); + runningQueue.add(job); + LOG.info("Recover success, add to runningQueue. job={}", job); + } catch (IOException e) { + LOG.warn("Recover failed, re-add to recoverQueue. job=" + job, e); + recoverQueue.add(job); + } + } + } + } + } + + /** + * Wrap the delayed BalanceJob. + */ + private static class DelayWrapper implements Delayed { + private BalanceJob job; + private long time; + + DelayWrapper(BalanceJob job, long delayInMilliseconds) { + this.job = job; + this.time = Time.monotonicNow() + delayInMilliseconds; + } + + BalanceJob getJob() { + return job; + } + + @Override + public long getDelay(TimeUnit unit) { + long delay = time - Time.monotonicNow(); + if (delay < 0) { + delay = 0; + } + return unit.convert(delay, TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(Delayed o) { + DelayWrapper dw = (DelayWrapper) o; + return new CompareToBuilder() + .append(time, dw.time) + .append(job, dw.job) + .toComparison(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(time) + .append(job) + .toHashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (obj.getClass() != getClass()) { + return false; + } + DelayWrapper dw = (DelayWrapper) obj; + return new EqualsBuilder() + .appendSuper(super.equals(obj)) + .append(time, dw.time) + .append(job, dw.job) + .build(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java new file mode 100644 index 00000000000..626d3b3727c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Classes under this package implement a state machine used for balancing data + * across federation namespaces. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving + +package org.apache.hadoop.hdfs.procedure; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/MultiPhaseProcedure.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/MultiPhaseProcedure.java new file mode 100644 index 00000000000..27cfebd3a34 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/MultiPhaseProcedure.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.procedure; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * This simulates a procedure with many phases. This is used for test. + */ +public class MultiPhaseProcedure extends BalanceProcedure { + + private int totalPhase; + private int currentPhase = 0; + private Configuration conf; + private FileSystem fs; + private Path path; + + public MultiPhaseProcedure() {} + + public MultiPhaseProcedure(String name, long delay, int totalPhase, + Configuration config, String spath) throws IOException { + super(name, delay); + this.totalPhase = totalPhase; + this.conf = config; + this.path = new Path(spath); + this.fs = path.getFileSystem(config); + } + + @Override + public boolean execute() throws IOException { + if (currentPhase < totalPhase) { + LOG.info("Current phase {}", currentPhase); + Path phase = new Path(path, "phase-" + currentPhase); + if (!fs.exists(phase)) { + fs.mkdirs(phase); + } + currentPhase++; + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + return false; + } + return true; + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeInt(totalPhase); + out.writeInt(currentPhase); + conf.write(out); + Text.writeString(out, path.toString()); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + totalPhase = in.readInt(); + currentPhase = in.readInt(); + conf = new Configuration(false); + conf.readFields(in); + path = new Path(Text.readString(in)); + fs = path.getFileSystem(conf); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RecordProcedure.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RecordProcedure.java new file mode 100644 index 00000000000..706d4a1bcec --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RecordProcedure.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.procedure; + +import java.util.ArrayList; +import java.util.List; + +/** + * This procedure records all the finished procedures. This is used for test. + */ +public class RecordProcedure extends BalanceProcedure { + + private static List finish = new ArrayList<>(); + + public RecordProcedure() {} + + public RecordProcedure(String name, long delay) { + super(name, delay); + } + + @Override + public boolean execute() throws RetryException { + finish.add(this); + return true; + } + + public static List getFinishList() { + return finish; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RetryProcedure.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RetryProcedure.java new file mode 100644 index 00000000000..336873e6a85 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RetryProcedure.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.procedure; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * This simulates a procedure needs many retries. This is used for test. + */ +public class RetryProcedure extends BalanceProcedure { + + private int retryTime = 1; + private int totalRetry = 0; + + public RetryProcedure() {} + + public RetryProcedure(String name, long delay, int retryTime) { + super(name, delay); + this.retryTime = retryTime; + } + + @Override + public boolean execute() throws RetryException { + if (retryTime > 0) { + retryTime--; + totalRetry++; + throw new RetryException(); + } + return true; + } + + public int getTotalRetry() { + return totalRetry; + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeInt(retryTime); + out.writeInt(totalRetry); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + retryTime = in.readInt(); + totalRetry = in.readInt(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/TestBalanceProcedureScheduler.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/TestBalanceProcedureScheduler.java new file mode 100644 index 00000000000..39e000b644d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/TestBalanceProcedureScheduler.java @@ -0,0 +1,451 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.procedure; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Time; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.SCHEDULER_JOURNAL_URI; +import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.mockito.ArgumentMatchers.any; + +/** + * Test BalanceProcedureScheduler. + */ +public class TestBalanceProcedureScheduler { + + private static MiniDFSCluster cluster; + private static final Configuration CONF = new Configuration(); + private static DistributedFileSystem fs; + private static final int DEFAULT_BLOCK_SIZE = 512; + + @BeforeClass + public static void setup() throws IOException { + CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, + true); + CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs:///"); + CONF.setBoolean(DFS_NAMENODE_ACLS_ENABLED_KEY, true); + CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); + CONF.setInt(WORK_THREAD_NUM, 1); + + cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build(); + cluster.waitClusterUp(); + cluster.waitActive(); + + fs = cluster.getFileSystem(); + String workPath = + "hdfs://" + cluster.getNameNode().getHostAndPort() + "/procedure"; + CONF.set(SCHEDULER_JOURNAL_URI, workPath); + fs.mkdirs(new Path(workPath)); + } + + @AfterClass + public static void close() { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Test the scheduler could be shutdown correctly. + */ + @Test(timeout = 60000) + public void testShutdownScheduler() throws Exception { + BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF); + scheduler.init(true); + // construct job + BalanceJob.Builder builder = new BalanceJob.Builder<>(); + builder.nextProcedure(new WaitProcedure("wait", 1000, 5 * 1000)); + BalanceJob job = builder.build(); + + scheduler.submit(job); + Thread.sleep(1000); // wait job to be scheduled. + scheduler.shutDownAndWait(30 * 1000); + + BalanceJournal journal = + ReflectionUtils.newInstance(BalanceJournalInfoHDFS.class, CONF); + journal.clear(job); + } + + /** + * Test a successful job. + */ + @Test(timeout = 60000) + public void testSuccessfulJob() throws Exception { + BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF); + scheduler.init(true); + try { + // construct job + List procedures = new ArrayList<>(); + BalanceJob.Builder builder = new BalanceJob.Builder(); + for (int i = 0; i < 5; i++) { + RecordProcedure r = new RecordProcedure("record-" + i, 1000L); + builder.nextProcedure(r); + procedures.add(r); + } + BalanceJob job = builder.build(); + + scheduler.submit(job); + scheduler.waitUntilDone(job); + assertNull(job.getError()); + // verify finish list. + assertEquals(5, RecordProcedure.getFinishList().size()); + for (int i = 0; i < RecordProcedure.getFinishList().size(); i++) { + assertEquals(procedures.get(i), RecordProcedure.getFinishList().get(i)); + } + } finally { + scheduler.shutDownAndWait(2); + } + } + + /** + * Test a job fails and the error can be got. + */ + @Test(timeout = 60000) + public void testFailedJob() throws Exception { + BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF); + scheduler.init(true); + try { + // Mock bad procedure. + BalanceProcedure badProcedure = Mockito.mock(BalanceProcedure.class); + Mockito.doThrow(new IOException("Job failed exception.")) + .when(badProcedure).execute(); + Mockito.doReturn("bad-procedure").when(badProcedure).name(); + + BalanceJob.Builder builder = new BalanceJob.Builder<>(); + builder.nextProcedure(badProcedure); + BalanceJob job = builder.build(); + scheduler.submit(job); + scheduler.waitUntilDone(job); + GenericTestUtils + .assertExceptionContains("Job failed exception", job.getError()); + } finally { + scheduler.shutDownAndWait(2); + } + } + + /** + * Test recover a job. After the job is recovered, the job should start from + * the last unfinished procedure, which is the first procedure without + * journal. + */ + @Test(timeout = 60000) + public void testGetJobAfterRecover() throws Exception { + BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF); + scheduler.init(true); + try { + // Construct job. + BalanceJob.Builder builder = new BalanceJob.Builder<>(); + String firstProcedure = "wait0"; + WaitProcedure[] procedures = new WaitProcedure[5]; + for (int i = 0; i < 5; i++) { + WaitProcedure procedure = new WaitProcedure("wait" + i, 1000, 1000); + builder.nextProcedure(procedure).removeAfterDone(false); + procedures[i] = procedure; + } + BalanceJob job = builder.build(); + scheduler.submit(job); + + // Sleep a random time then shut down. + long randomSleepTime = Math.abs(new Random().nextInt()) % 5 * 1000 + 1000; + Thread.sleep(randomSleepTime); + scheduler.shutDownAndWait(2); + + // Current procedure is the last unfinished procedure. It is also the + // first procedure without journal. + WaitProcedure recoverProcedure = (WaitProcedure) job.getCurProcedure(); + int recoverIndex = -1; + for (int i = 0; i < procedures.length; i++) { + if (procedures[i].name().equals(recoverProcedure.name())) { + recoverIndex = i; + break; + } + } + + // Restart scheduler and recover the job. + scheduler = new BalanceProcedureScheduler(CONF); + scheduler.init(true); + scheduler.waitUntilDone(job); + + // The job should be done successfully and the recoverJob should be equal + // to the original job. + BalanceJob recoverJob = scheduler.findJob(job); + assertNull(recoverJob.getError()); + assertNotSame(job, recoverJob); + assertEquals(job, recoverJob); + // Verify whether the recovered job starts from the recoverProcedure. + Map pTable = recoverJob.getProcedureTable(); + List recoveredProcedures = + procedureTableToList(pTable, firstProcedure); + for (int i = 0; i < recoverIndex; i++) { + // All procedures before recoverProcedure shouldn't be executed. + assertFalse(recoveredProcedures.get(i).getExecuted()); + } + for (int i = recoverIndex; i < procedures.length; i++) { + // All procedures start from recoverProcedure should be executed. + assertTrue(recoveredProcedures.get(i).getExecuted()); + } + } finally { + scheduler.shutDownAndWait(2); + } + } + + /** + * Test RetryException is handled correctly. + */ + @Test(timeout = 60000) + public void testRetry() throws Exception { + BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF); + scheduler.init(true); + try { + // construct job + BalanceJob.Builder builder = new BalanceJob.Builder<>(); + RetryProcedure retryProcedure = new RetryProcedure("retry", 1000, 3); + builder.nextProcedure(retryProcedure); + BalanceJob job = builder.build(); + + long start = Time.monotonicNow(); + scheduler.submit(job); + scheduler.waitUntilDone(job); + assertNull(job.getError()); + + long duration = Time.monotonicNow() - start; + assertEquals(true, duration > 1000 * 3); + assertEquals(3, retryProcedure.getTotalRetry()); + } finally { + scheduler.shutDownAndWait(2); + } + } + + /** + * Test schedule an empty job. + */ + @Test(timeout = 60000) + public void testEmptyJob() throws Exception { + BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF); + scheduler.init(true); + try { + BalanceJob job = new BalanceJob.Builder<>().build(); + scheduler.submit(job); + scheduler.waitUntilDone(job); + } finally { + scheduler.shutDownAndWait(2); + } + } + + /** + * Test serialization and deserialization of Job. + */ + @Test(timeout = 60000) + public void testJobSerializeAndDeserialize() throws Exception { + BalanceJob.Builder builder = new BalanceJob.Builder(); + for (int i = 0; i < 5; i++) { + RecordProcedure r = new RecordProcedure("record-" + i, 1000L); + builder.nextProcedure(r); + } + builder.nextProcedure(new RetryProcedure("retry", 1000, 3)); + BalanceJob job = builder.build(); + job.setId(BalanceProcedureScheduler.allocateJobId()); + // Serialize. + ByteArrayOutputStream bao = new ByteArrayOutputStream(); + job.write(new DataOutputStream(bao)); + bao.flush(); + ByteArrayInputStream bai = new ByteArrayInputStream(bao.toByteArray()); + // Deserialize. + BalanceJob newJob = new BalanceJob.Builder<>().build(); + newJob.readFields(new DataInputStream(bai)); + assertEquals(job, newJob); + } + + /** + * Test scheduler crashes and recovers. + */ + @Test(timeout = 60000) + public void testSchedulerDownAndRecoverJob() throws Exception { + BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF); + scheduler.init(true); + Path parent = new Path("/testSchedulerDownAndRecoverJob"); + try { + // construct job + BalanceJob.Builder builder = new BalanceJob.Builder<>(); + MultiPhaseProcedure multiPhaseProcedure = + new MultiPhaseProcedure("retry", 1000, 10, CONF, parent.toString()); + builder.nextProcedure(multiPhaseProcedure); + BalanceJob job = builder.build(); + + scheduler.submit(job); + Thread.sleep(500); // wait procedure to be scheduled. + scheduler.shutDownAndWait(2); + + assertFalse(job.isJobDone()); + int len = fs.listStatus(parent).length; + assertTrue(len > 0 && len < 10); + // restart scheduler, test recovering the job. + scheduler = new BalanceProcedureScheduler(CONF); + scheduler.init(true); + scheduler.waitUntilDone(job); + + assertEquals(10, fs.listStatus(parent).length); + for (int i = 0; i < 10; i++) { + assertTrue(fs.exists(new Path(parent, "phase-" + i))); + } + + BalanceJob recoverJob = scheduler.findJob(job); + assertNull(recoverJob.getError()); + assertNotSame(job, recoverJob); + assertEquals(job, recoverJob); + } finally { + if (fs.exists(parent)) { + fs.delete(parent, true); + } + scheduler.shutDownAndWait(2); + } + } + + @Test(timeout = 60000) + public void testRecoverJobFromJournal() throws Exception { + BalanceJournal journal = + ReflectionUtils.newInstance(BalanceJournalInfoHDFS.class, CONF); + BalanceJob.Builder builder = new BalanceJob.Builder(); + BalanceProcedure wait0 = new WaitProcedure("wait0", 1000, 5000); + BalanceProcedure wait1 = new WaitProcedure("wait1", 1000, 1000); + builder.nextProcedure(wait0).nextProcedure(wait1); + + BalanceJob job = builder.build(); + job.setId(BalanceProcedureScheduler.allocateJobId()); + job.setCurrentProcedure(wait1); + job.setLastProcedure(null); + journal.saveJob(job); + + long start = Time.monotonicNow(); + BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF); + scheduler.init(true); + try { + scheduler.waitUntilDone(job); + long duration = Time.monotonicNow() - start; + assertTrue(duration >= 1000 && duration < 5000); + } finally { + scheduler.shutDownAndWait(2); + } + } + + @Test(timeout = 60000) + public void testClearJournalFail() throws Exception { + BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF); + scheduler.init(true); + + BalanceJournal journal = Mockito.mock(BalanceJournal.class); + AtomicInteger count = new AtomicInteger(0); + Mockito.doAnswer(invocation -> { + if (count.incrementAndGet() == 1) { + throw new IOException("Mock clear failure"); + } + return null; + }).when(journal).clear(any(BalanceJob.class)); + scheduler.setJournal(journal); + + try { + BalanceJob.Builder builder = new BalanceJob.Builder<>(); + builder.nextProcedure(new WaitProcedure("wait", 1000, 1000)); + BalanceJob job = builder.build(); + scheduler.submit(job); + scheduler.waitUntilDone(job); + assertEquals(2, count.get()); + } finally { + scheduler.shutDownAndWait(2); + } + } + + /** + * Test the job will be recovered if writing journal fails. + */ + @Test(timeout = 60000) + public void testJobRecoveryWhenWriteJournalFail() throws Exception { + BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF); + scheduler.init(true); + + try { + // construct job + AtomicBoolean recoverFlag = new AtomicBoolean(true); + BalanceJob.Builder builder = new BalanceJob.Builder<>(); + builder.nextProcedure(new WaitProcedure("wait", 1000, 1000)) + .nextProcedure( + new UnrecoverableProcedure("shutdown", 1000, () -> { + cluster.restartNameNode(false); + return true; + })).nextProcedure( + new UnrecoverableProcedure("recoverFlag", 1000, () -> { + recoverFlag.set(false); + return true; + })).nextProcedure(new WaitProcedure("wait", 1000, 1000)); + + BalanceJob job = builder.build(); + scheduler.submit(job); + scheduler.waitUntilDone(job); + assertTrue(job.isJobDone()); + assertNull(job.getError()); + assertTrue(recoverFlag.get()); + } finally { + scheduler.shutDownAndWait(2); + } + } + + /** + * Transform the procedure map into an ordered list based on the relations + * specified by the map. + */ + List procedureTableToList( + Map pTable, String first) { + List procedures = new ArrayList<>(); + T cur = pTable.get(first); + while (cur != null) { + procedures.add(cur); + cur = pTable.get(cur.nextProcedure()); + } + return procedures; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/UnrecoverableProcedure.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/UnrecoverableProcedure.java new file mode 100644 index 00000000000..941d0a0ae7e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/UnrecoverableProcedure.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.procedure; + +import java.io.IOException; + +/** + * This simulates a Procedure can not be recovered. This is for test only. + * + * If the job is not recovered, the handler is called. Once the job is recovered + * the procedure does nothing. We can use this to verify whether the job has + * been recovered. + */ +public class UnrecoverableProcedure extends BalanceProcedure { + + public interface Call { + boolean execute() throws RetryException, IOException; + } + + private Call handler; + + public UnrecoverableProcedure() {} + + /** + * The handler will be lost if the procedure is recovered. + */ + public UnrecoverableProcedure(String name, long delay, Call handler) { + super(name, delay); + this.handler = handler; + } + + @Override + public boolean execute() throws RetryException, + IOException { + if (handler != null) { + return handler.execute(); + } else { + return true; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/WaitProcedure.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/WaitProcedure.java new file mode 100644 index 00000000000..8666caf2f60 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/WaitProcedure.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.procedure; + +import org.apache.hadoop.util.Time; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * This procedure waits specified period of time then finish. It simulates the + * behaviour of blocking procedures. + */ +public class WaitProcedure extends BalanceProcedure { + + private long waitTime; + private boolean executed = false; + + public WaitProcedure() { + } + + public WaitProcedure(String name, long delay, long waitTime) { + super(name, delay); + this.waitTime = waitTime; + } + + @Override + public boolean execute() throws IOException { + long startTime = Time.monotonicNow(); + long timeLeft = waitTime; + while (timeLeft > 0) { + try { + Thread.sleep(timeLeft); + } catch (InterruptedException e) { + if (isSchedulerShutdown()) { + return false; + } + } finally { + timeLeft = waitTime - (Time.monotonicNow() - startTime); + } + } + executed = true; + return true; + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeLong(waitTime); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + waitTime = in.readLong(); + } + + public boolean getExecuted() { + return executed; + } +}