HDFS-15340. RBF: Implement BalanceProcedureScheduler basic framework. Contributed by Jinglun.

Yiqun Lin 2020-05-20 10:39:40 +08:00
parent 0b7799bf6e
commit 1983eea62d
13 changed files with 2141 additions and 0 deletions

View File

@@ -0,0 +1,361 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.procedure;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
import java.util.List;
import java.util.ArrayList;
/**
* A Job is a state machine consisting of many procedures. The procedures are
* executed as a chain. Each procedure needs to specify the next procedure. If
* there is no next procedure then the job is finished.
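*
* For example, a two-procedure job can be built and run like this (a sketch
* using the WaitProcedure helper from this patch's tests, assuming a running
* BalanceProcedureScheduler named scheduler):
*
*   BalanceJob.Builder builder = new BalanceJob.Builder<>();
*   builder.nextProcedure(new WaitProcedure("wait0", 1000, 1000))
*       .nextProcedure(new WaitProcedure("wait1", 1000, 1000));
*   BalanceJob job = builder.build();
*   scheduler.submit(job);
*   scheduler.waitUntilDone(job);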
*/
public final class BalanceJob<T extends BalanceProcedure> implements Writable {
private String id;
private BalanceProcedureScheduler scheduler;
private volatile boolean jobDone = false;
private Exception error;
public static final Logger LOG = LoggerFactory.getLogger(BalanceJob.class);
private Map<String, T> procedureTable = new HashMap<>();
private T firstProcedure;
private T curProcedure;
private T lastProcedure;
private boolean removeAfterDone;
static final String NEXT_PROCEDURE_NONE = "NONE";
private static Set<String> reservedNames = new HashSet<>();
static {
reservedNames.add(NEXT_PROCEDURE_NONE);
}
public static class Builder<T extends BalanceProcedure> {
private List<T> procedures = new ArrayList<>();
private boolean removeAfterDone = false;
/**
* Append a procedure to the tail.
*/
public Builder nextProcedure(T procedure) {
int size = procedures.size();
if (size > 0) {
procedures.get(size - 1).setNextProcedure(procedure.name());
}
procedure.setNextProcedure(NEXT_PROCEDURE_NONE);
procedures.add(procedure);
return this;
}
/**
* Automatically remove this job from the scheduler cache when the job is
* done.
*/
public Builder removeAfterDone(boolean remove) {
removeAfterDone = remove;
return this;
}
public BalanceJob build() throws IOException {
BalanceJob job = new BalanceJob(procedures, removeAfterDone);
for (BalanceProcedure<T> p : procedures) {
p.setJob(job);
}
return job;
}
}
private BalanceJob(Iterable<T> procedures, boolean remove)
throws IOException {
for (T p : procedures) {
String taskName = p.name();
if (reservedNames.contains(taskName)) {
throw new IOException(taskName + " is reserved.");
}
procedureTable.put(p.name(), p);
if (firstProcedure == null) {
firstProcedure = p;
}
}
removeAfterDone = remove;
lastProcedure = null;
curProcedure = firstProcedure;
}
/**
* Run the state machine.
*/
public void execute() {
boolean quit = false;
try {
while (!jobDone && !quit && scheduler.isRunning()) {
if (curProcedure == null) { // Job done.
finish(null);
quit = true;
} else {
if (curProcedure == firstProcedure || lastProcedure != curProcedure) {
LOG.info("Start procedure {}, last procedure is {}",
curProcedure.name(),
lastProcedure == null ? null : lastProcedure.name());
}
if (curProcedure.execute()) {
lastProcedure = curProcedure;
curProcedure = next();
}
if (!scheduler.writeJournal(this)) {
quit = true; // Write journal failed. Simply quit because this job
// has already been added to the recoverQueue.
LOG.debug("Write journal failed. Quit and wait for recovery.");
}
}
}
} catch (BalanceProcedure.RetryException tre) {
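// The procedure requested a retry: delay the whole job. The scheduler's
// Rooster thread re-queues it after the delay elapses.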
scheduler.delay(this, curProcedure.delayMillisBeforeRetry());
} catch (Exception e) {
finish(e);
} catch (Throwable t) {
IOException err = new IOException("Got throwable error.", t);
finish(err);
}
}
private T next() {
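// Start from the first procedure if there is no current one; otherwise
// follow the chain via the current procedure's nextProcedure() name.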
if (curProcedure == null) {
return firstProcedure;
} else {
return procedureTable.get(curProcedure.nextProcedure());
}
}
/**
* Marks the job as finished, either successfully or with a failure.
* @param exception the exception that causes the job to fail. null indicates
* the job is successful.
*/
private synchronized void finish(Exception exception) {
assert !jobDone;
if (scheduler.jobDone(this)) {
jobDone = true;
error = exception;
notifyAll();
}
}
void setScheduler(BalanceProcedureScheduler scheduler) {
this.scheduler = scheduler;
}
void setId(String id) {
this.id = id;
}
/**
* Get the unique id of the job.
*/
public String getId() {
return this.id;
}
/**
* Whether this job should be removed after it's done.
*/
@VisibleForTesting
public boolean shouldRemoveAfterDone() {
return removeAfterDone;
}
@VisibleForTesting
void setLastProcedure(T lastProcedure) {
this.lastProcedure = lastProcedure;
}
@VisibleForTesting
void setCurrentProcedure(T currentProcedure) {
this.curProcedure = currentProcedure;
}
/**
* Return true if the job has finished.
*/
public boolean isJobDone() {
return jobDone;
}
/**
* Wait until the job is done.
*/
public synchronized void waitJobDone() throws InterruptedException {
while (!jobDone) {
wait();
}
}
/**
* Return the error exception during the job execution. This should be called
* after the job finishes.
*/
public Exception getError() {
return error;
}
@Override
public void write(DataOutput out) throws IOException {
if (id == null) {
throw new IOException("BalanceJob with id=null can not be serialized.");
}
Text.writeString(out, id);
int taskTableSize = procedureTable.size();
out.writeInt(taskTableSize);
for (T p : procedureTable.values()) {
Text.writeString(out, p.getClass().getName());
p.write(out);
}
if (firstProcedure != null) {
Text.writeString(out, firstProcedure.name());
} else {
Text.writeString(out, NEXT_PROCEDURE_NONE);
}
if (curProcedure != null) {
Text.writeString(out, curProcedure.name());
} else {
Text.writeString(out, NEXT_PROCEDURE_NONE);
}
if (lastProcedure != null) {
Text.writeString(out, lastProcedure.name());
} else {
Text.writeString(out, NEXT_PROCEDURE_NONE);
}
}
@Override
public void readFields(DataInput in) throws IOException {
this.id = Text.readString(in);
procedureTable = new HashMap<>();
int taskTableSize = in.readInt();
for (int i = 0; i < taskTableSize; i++) {
String className = Text.readString(in);
try {
T p = (T) ReflectionUtils.newInstance(Class.forName(className), null);
p.readFields(in);
procedureTable.put(p.name(), p);
} catch (Exception e) {
LOG.error("Failed reading Procedure.", e);
throw new IOException(e);
}
}
String firstProcedureName = Text.readString(in);
if (firstProcedureName.equals(NEXT_PROCEDURE_NONE)) {
firstProcedure = null;
} else {
firstProcedure = procedureTable.get(firstProcedureName);
}
String currentProcedureName = Text.readString(in);
if (currentProcedureName.equals(NEXT_PROCEDURE_NONE)) {
curProcedure = null;
} else {
curProcedure = procedureTable.get(currentProcedureName);
}
String lastProcedureName = Text.readString(in);
if (lastProcedureName.equals(NEXT_PROCEDURE_NONE)) {
lastProcedure = null;
} else {
lastProcedure = procedureTable.get(lastProcedureName);
}
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (obj.getClass() != getClass()) {
return false;
}
BalanceJob bj = (BalanceJob) obj;
return new EqualsBuilder()
.append(id, bj.id)
.append(procedureTable, bj.procedureTable)
.append(firstProcedure, bj.firstProcedure)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(id)
.append(procedureTable)
.toHashCode();
}
@Override
public String toString() {
return "{jobId=" + id + "}";
}
/**
* Get the detailed description of this job.
*/
public String getDetailMessage() {
StringBuilder builder = new StringBuilder();
builder.append("id=").append(id);
if (firstProcedure != null) {
builder.append(",firstProcedure=").append(firstProcedure);
}
if (curProcedure != null) {
builder.append(",currentProcedure=").append(curProcedure);
}
builder.append(",jobDone=").append(jobDone);
if (error != null) {
builder.append(",error=").append(error.getMessage());
}
return builder.toString();
}
boolean isSchedulerShutdown() {
return !scheduler.isRunning();
}
@VisibleForTesting
Map<String, T> getProcedureTable() {
return procedureTable;
}
@VisibleForTesting
T getCurProcedure() {
return curProcedure;
}
}

View File

@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.procedure;
import org.apache.hadoop.conf.Configurable;
import java.io.IOException;
/**
* The Journal of the state machine. It handles job persistence and recovery.
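*
* A typical lifecycle, as driven by the BalanceProcedureScheduler in this
* patch: saveJob() is called after every procedure step, listAllJobs() and
* recoverJob() are called when the scheduler starts with recovery enabled,
* and clear() is called once the job is done.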
*/
public interface BalanceJournal extends Configurable {
/**
* Save journal of this job.
*/
void saveJob(BalanceJob job) throws IOException;
/**
* Recover the job from journal.
*/
void recoverJob(BalanceJob job) throws IOException;
/**
* List all unfinished jobs.
*/
BalanceJob[] listAllJobs() throws IOException;
/**
* Clear all the journals of this job.
*/
void clear(BalanceJob job) throws IOException;
}

View File

@@ -0,0 +1,203 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.procedure;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.SequentialNumber;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.SCHEDULER_JOURNAL_URI;
import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.TMP_TAIL;
import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.JOB_PREFIX;
/**
* A BalanceJournal implementation based on HDFS. All the journals are stored
* in HDFS, so jobs are persisted to and recovered from HDFS.
*/
public class BalanceJournalInfoHDFS implements BalanceJournal {
public static final Logger LOG = LoggerFactory.getLogger(
BalanceJournalInfoHDFS.class);
public static class IdGenerator extends SequentialNumber {
protected IdGenerator(long initialValue) {
super(initialValue);
}
}
private URI workUri;
private Configuration conf;
private IdGenerator generator;
/**
* Save job journal to HDFS.
*
* All the journals are saved under the base directory. Each job has an
* individual directory named after the job id.
* When a job is saved, a new journal file is created. The file's name
* consists of a prefix 'JOB-' and an incremental sequential id. The file with
* the largest id is the latest journal of this job.
*
* Layout:
* base-dir/
* /job-3f1da5e5-2a60-48de-8736-418d134edbe9/
* /JOB-0
* /JOB-3
* /JOB-5
* /job-ebc19478-2324-46c2-8d1a-2f8c4391dc09/
* /JOB-1
* /JOB-2
* /JOB-4
*/
public void saveJob(BalanceJob job) throws IOException {
Path jobFile = getNewStateJobPath(job);
Path tmpJobFile = new Path(jobFile + TMP_TAIL);
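// Write to a temporary file first, then rename it into place, so a reader
// never observes a partially written journal.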
FSDataOutputStream out = null;
try {
FileSystem fs = FileSystem.get(workUri, conf);
out = fs.create(tmpJobFile);
job.write(new DataOutputStream(out));
out.close();
out = null;
fs.rename(tmpJobFile, jobFile);
} finally {
IOUtils.closeStream(out);
}
LOG.debug("Save journal of job={}", job);
}
/**
* Recover job from journal on HDFS.
*/
public void recoverJob(BalanceJob job) throws IOException {
FSDataInputStream in = null;
try {
Path logPath = getLatestStateJobPath(job);
FileSystem fs = FileSystem.get(workUri, conf);
in = fs.open(logPath);
job.readFields(in);
LOG.debug("Recover job={} from journal.", job);
} finally {
if (in != null) {
in.close();
}
}
}
@Override
public BalanceJob[] listAllJobs() throws IOException {
FileSystem fs = FileSystem.get(workUri, conf);
Path workPath = new Path(workUri.getPath());
FileStatus[] statuses;
try {
statuses = fs.listStatus(workPath);
} catch (FileNotFoundException e) {
LOG.debug("Create work path {}", workPath);
fs.mkdirs(workPath);
return new BalanceJob[0];
}
BalanceJob[] jobs = new BalanceJob[statuses.length];
StringBuilder builder = new StringBuilder();
builder.append("List all jobs from journal [");
for (int i = 0; i < statuses.length; i++) {
if (statuses[i].isDirectory()) {
jobs[i] = new BalanceJob.Builder<>().build();
jobs[i].setId(statuses[i].getPath().getName());
builder.append(jobs[i]);
if (i < statuses.length - 1) {
builder.append(", ");
}
}
}
builder.append("]");
LOG.debug(builder.toString());
return jobs;
}
@Override
public void clear(BalanceJob job) throws IOException {
Path jobBase = getJobBaseDir(job);
FileSystem fs = FileSystem.get(workUri, conf);
if (fs.exists(jobBase)) {
fs.delete(jobBase, true);
}
LOG.debug("Clear journal of job=" + job);
}
@Override
public void setConf(Configuration conf) {
try {
this.workUri = new URI(conf.get(SCHEDULER_JOURNAL_URI));
} catch (URISyntaxException e) {
throw new IllegalArgumentException("URI resolution failed.", e);
}
this.conf = conf;
this.generator = new IdGenerator(Time.monotonicNow());
}
@Override
public Configuration getConf() {
return conf;
}
private Path getJobBaseDir(BalanceJob job) {
String jobId = job.getId();
return new Path(workUri.getPath(), jobId);
}
private Path getNewStateJobPath(BalanceJob job) {
Path basePath = getJobBaseDir(job);
Path logPath = new Path(basePath, JOB_PREFIX + generator.nextValue());
return logPath;
}
private Path getLatestStateJobPath(BalanceJob job) throws IOException {
Path latestFile = null;
Path basePath = getJobBaseDir(job);
FileSystem fs = FileSystem.get(workUri, conf);
RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(basePath, false);
while (iterator.hasNext()) {
FileStatus status = iterator.next();
String fileName = status.getPath().getName();
if (fileName.startsWith(JOB_PREFIX) && !fileName.contains(TMP_TAIL)) {
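// Journal file names are compared lexically. Ids come from an IdGenerator
// seeded with Time.monotonicNow(), so in practice they share the same digit
// width and the lexically largest name is the newest journal.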
if (latestFile == null) {
latestFile = status.getPath();
} else if (latestFile.getName().compareTo(fileName) <= 0) {
latestFile = status.getPath();
}
}
}
return latestFile;
}
}

View File

@@ -0,0 +1,226 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.procedure;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import static org.apache.hadoop.hdfs.procedure.BalanceJob.NEXT_PROCEDURE_NONE;
/**
* The basic building block of a Job. Extend this class to implement custom
* job logic.
*/
public abstract class BalanceProcedure<T extends BalanceProcedure>
implements Writable {
public static final Logger LOG =
LoggerFactory.getLogger(BalanceProcedure.class);
private String nextProcedure; // the procedure after this procedure.
private String name; // the name of this procedure.
private long delayDuration; // this specifies how long will this procedure be
// delayed. The delay is triggered by throwing a
// RetryException.
private BalanceJob job;
public BalanceProcedure() {
}
/**
* The constructor of BalanceProcedure.
*
* @param name the name of the procedure.
* @param nextProcedure the name of the next procedure.
* @param delayDuration the delay duration when this procedure is delayed.
*/
public BalanceProcedure(String name, String nextProcedure,
long delayDuration) {
this();
this.name = name;
this.nextProcedure = nextProcedure;
this.delayDuration = delayDuration;
}
public BalanceProcedure(String name, long delayDuration) {
this(name, NEXT_PROCEDURE_NONE, delayDuration);
}
/**
* The main process. This is called by the ProcedureScheduler.
* Make sure the process quits fast when it's interrupted and the scheduler is
* shut down.
*
* One procedure may have many phases and all the phases share the same member
* variables. Each time this method returns, the journal is saved. User can
* serialize the current phase in write(DataOutput) so the job can continue
* with the last unfinished phase after it is recovered.
* The return value indicates whether the job should go to the next procedure.
* Return true after all the phases finish.
*
* Example:
* class ProcedureWithManyPhase extends BalanceProcedure {
*
* enum PHASE {
* P1, P2, P3
* }
* PHASE phase;
*
* public boolean execute() throws RetryException,
* IOException {
* switch (phase) {
* case P1:
* // do something.
* return false;
* case P2:
* // do something.
* return false;
* case P3:
* // do something.
* return true;
* default:
* throw new IOException("Unexpected phase " + phase);
* }
* }
*
* public void write(DataOutput out) {
* out.writeInt(phase.ordinal());
* }
*
* public void readFields(DataInput in) throws IOException {
* phase = PHASE.values()[in.readInt()];
* }
* }
*
*
* @throws RetryException if this procedure needs to be delayed for a while
* and then retried.
* @return true if the procedure is done and the job should go to the next
* procedure; otherwise false.
*/
public abstract boolean execute() throws RetryException, IOException;
/**
* The time in milliseconds the procedure should wait before retry.
*/
public long delayMillisBeforeRetry() {
return delayDuration;
}
/**
* Return whether the scheduler has been shut down.
*/
protected boolean isSchedulerShutdown() {
return job.isSchedulerShutdown();
}
protected void setNextProcedure(String nextProcedure) {
this.nextProcedure = nextProcedure;
}
void setJob(BalanceJob job) {
this.job = job;
}
/**
* Get the name of the next procedure.
*/
public String nextProcedure() {
return nextProcedure;
}
/**
* Get the procedure name.
*/
public String name() {
return name;
}
@Override
public void write(DataOutput out) throws IOException {
if (nextProcedure == null) {
Text.writeString(out, NEXT_PROCEDURE_NONE);
} else {
Text.writeString(out, nextProcedure);
}
Text.writeString(out, name);
new LongWritable(delayDuration).write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
nextProcedure = Text.readString(in);
name = Text.readString(in);
delayDuration = readLong(in);
}
private static long readLong(DataInput in) throws IOException {
LongWritable delayWritable = new LongWritable();
delayWritable.readFields(in);
return delayWritable.get();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(nextProcedure)
.append(name)
.append(delayDuration)
.toHashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (obj.getClass() != getClass()) {
return false;
}
BalanceProcedure rhs = (BalanceProcedure) obj;
return new EqualsBuilder()
.append(nextProcedure, rhs.nextProcedure)
.append(name, rhs.name)
.append(delayDuration, rhs.delayDuration)
.build();
}
@Override
public String toString() {
return name + ":" + this.getClass().getName();
}
/**
* The RetryException indicates that the current procedure should be delayed
* and then retried.
*/
public static class RetryException extends Exception {
public RetryException() {}
}
}

View File

@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.procedure;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* This class contains constants for configuration keys and default values
* used in hdfs procedure.
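*
* A configuration sketch (the journal URI below is illustrative):
*
*   Configuration conf = new Configuration();
*   conf.setInt(BalanceProcedureConfigKeys.WORK_THREAD_NUM, 5);
*   conf.set(BalanceProcedureConfigKeys.SCHEDULER_JOURNAL_URI,
*       "hdfs://namenode:8020/procedure");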
*/
@InterfaceAudience.Private
public final class BalanceProcedureConfigKeys {
/* The number of worker threads of the BalanceProcedureScheduler. */
public static final String WORK_THREAD_NUM =
"hadoop.hdfs.procedure.work.thread.num";
public static final int WORK_THREAD_NUM_DEFAULT = 10;
/* The URI of the journal. */
public static final String SCHEDULER_JOURNAL_URI =
"hadoop.hdfs.procedure.scheduler.journal.uri";
public static final String JOB_PREFIX = "JOB-";
public static final String TMP_TAIL = ".tmp";
public static final String JOURNAL_CLASS =
"hadoop.hdfs.procedure.journal.class";
private BalanceProcedureConfigKeys() {}
}

View File

@@ -0,0 +1,450 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.procedure;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.builder.CompareToBuilder;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM;
import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM_DEFAULT;
import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.JOURNAL_CLASS;
/**
* The state machine framework consists of:
* Job: The state machine. It implements the basic logic of the
* state machine.
* Procedure: The components of the job. It implements the custom
* logic.
* ProcedureScheduler: The multi-thread model responsible for running,
* recovering, handling errors and job persistence.
* Journal: It handles the job persistence and recover.
*
* Example:
* BalanceJob.Builder builder = new BalanceJob.Builder<>();
* builder.nextProcedure(new WaitProcedure("wait", 1000, 30 * 1000));
* BalanceJob job = builder.build();
*
* BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
* scheduler.init(true);
* scheduler.submit(job);
* scheduler.waitUntilDone(job);
*/
public class BalanceProcedureScheduler {
public static final Logger LOG =
LoggerFactory.getLogger(BalanceProcedureScheduler.class);
// The set containing all the jobs, including submitted and recovered ones.
private ConcurrentHashMap<BalanceJob, BalanceJob> jobSet;
// Contains jobs pending execution.
private LinkedBlockingQueue<BalanceJob> runningQueue;
// Contains delayed jobs pending wake-up.
private DelayQueue<DelayWrapper> delayQueue;
// Contains jobs pending recovery.
private LinkedBlockingQueue<BalanceJob> recoverQueue;
private Configuration conf;
private BalanceJournal journal; // handle jobs' journals.
private Thread readerThread; // consume the runningQueue and send to workers.
private ThreadPoolExecutor workersPool; // the real threads running the jobs.
private Thread roosterThread; // wake up the jobs in the delayQueue.
private Thread recoverThread; // recover the jobs in the recoverQueue.
// The running state of this scheduler.
private AtomicBoolean running = new AtomicBoolean(true);
public BalanceProcedureScheduler(Configuration conf) {
this.conf = conf;
}
/**
* Init the scheduler.
*
* @param recoverJobs whether to recover all the jobs from journal or not.
*/
public synchronized void init(boolean recoverJobs) throws IOException {
this.runningQueue = new LinkedBlockingQueue<>();
this.delayQueue = new DelayQueue<>();
this.recoverQueue = new LinkedBlockingQueue<>();
this.jobSet = new ConcurrentHashMap<>();
// start threads.
this.roosterThread = new Rooster();
this.roosterThread.setDaemon(true);
roosterThread.start();
this.recoverThread = new Recover();
this.recoverThread.setDaemon(true);
recoverThread.start();
int workerNum = conf.getInt(WORK_THREAD_NUM, WORK_THREAD_NUM_DEFAULT);
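// Note: with an unbounded work queue the pool never grows beyond workerNum
// core threads, so workerNum is the effective parallelism.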
workersPool = new ThreadPoolExecutor(workerNum, workerNum * 2, 1,
TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
this.readerThread = new Reader();
this.readerThread.start();
// init journal.
Class<BalanceJournal> clazz = (Class<BalanceJournal>) conf
.getClass(JOURNAL_CLASS, BalanceJournalInfoHDFS.class);
journal = ReflectionUtils.newInstance(clazz, conf);
if (recoverJobs) {
recoverAllJobs();
}
}
/**
* Submit the job.
*/
public synchronized void submit(BalanceJob job) throws IOException {
if (!running.get()) {
throw new IOException("Scheduler is shutdown.");
}
String jobId = allocateJobId();
job.setId(jobId);
job.setScheduler(this);
journal.saveJob(job);
jobSet.put(job, job);
runningQueue.add(job);
LOG.info("Add new job={}", job);
}
/**
* Remove the job from the scheduler if it has finished.
*/
public BalanceJob remove(BalanceJob job) {
BalanceJob inner = findJob(job);
if (inner == null) {
return null;
} else if (job.isJobDone()) {
synchronized (this) {
return jobSet.remove(inner);
}
}
return null;
}
/**
* Find job in scheduler.
*
* @return the job in scheduler. Null if the scheduler has no job with the
* same id.
*/
public BalanceJob findJob(BalanceJob job) {
BalanceJob found = null;
for (BalanceJob j : jobSet.keySet()) {
if (j.getId().equals(job.getId())) {
found = j;
break;
}
}
return found;
}
/**
* Return all jobs in the scheduler.
*/
public Collection<BalanceJob> getAllJobs() {
return jobSet.values();
}
/**
* Wait permanently until the job is done.
*/
public void waitUntilDone(BalanceJob job) {
BalanceJob found = findJob(job);
if (found == null || found.isJobDone()) {
return;
}
while (!found.isJobDone()) {
try {
found.waitJobDone();
} catch (InterruptedException e) {
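// Ignore the interrupt and keep waiting until the job is done.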
}
}
}
/**
* Delay this job.
*/
void delay(BalanceJob job, long delayInMilliseconds) {
delayQueue.add(new DelayWrapper(job, delayInMilliseconds));
LOG.info("Need delay {}ms. Add to delayQueue. job={}", delayInMilliseconds,
job);
}
boolean jobDone(BalanceJob job) {
try {
journal.clear(job);
if (job.shouldRemoveAfterDone()) {
jobSet.remove(job);
}
return true;
} catch (IOException e) {
LOG.warn("Clear journal failed, add to recoverQueue. job=" + job, e);
recoverQueue.add(job);
return false;
}
}
/**
* Save current status to journal.
*/
boolean writeJournal(BalanceJob job) {
try {
journal.saveJob(job);
return true;
} catch (Exception e) {
LOG.warn("Save procedure failed, add to recoverQueue. job=" + job, e);
recoverQueue.add(job);
return false;
}
}
/**
* The running state of the scheduler.
*/
public boolean isRunning() {
return running.get();
}
/**
* Shutdown the scheduler.
*/
public synchronized void shutDown() {
if (!running.get()) {
return;
}
running.set(false);
readerThread.interrupt();
roosterThread.interrupt();
recoverThread.interrupt();
workersPool.shutdownNow();
}
/**
* Shut down the scheduler and wait for the helper threads and the worker
* pool to finish.
* @param timeout the number of seconds to wait per round for the worker pool
* to terminate.
*/
public synchronized void shutDownAndWait(int timeout) {
shutDown();
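// Join each helper thread; interrupts are ignored so the shutdown always
// runs to completion.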
while (readerThread.isAlive()) {
try {
readerThread.join();
} catch (InterruptedException e) {
}
}
while (roosterThread.isAlive()) {
try {
roosterThread.join();
} catch (InterruptedException e) {
}
}
while (recoverThread.isAlive()) {
try {
recoverThread.join();
} catch (InterruptedException e) {
}
}
while (!workersPool.isTerminated()) {
try {
workersPool.awaitTermination(timeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
}
}
/**
* Search all jobs and add them to recoverQueue. It's called once after the
* scheduler starts.
*/
private void recoverAllJobs() throws IOException {
BalanceJob[] jobs = journal.listAllJobs();
for (BalanceJob job : jobs) {
recoverQueue.add(job);
jobSet.put(job, job);
}
}
@VisibleForTesting
static String allocateJobId() {
return "job-" + UUID.randomUUID();
}
@VisibleForTesting
public void setJournal(BalanceJournal journal) {
this.journal = journal;
}
/**
* This thread consumes the delayQueue and moves the jobs to the runningQueue.
*/
class Rooster extends Thread {
@Override
public void run() {
while (running.get()) {
try {
DelayWrapper dJob = delayQueue.take();
runningQueue.add(dJob.getJob());
LOG.info("Wake up job={}", dJob.getJob());
} catch (InterruptedException e) {
// ignore interrupt exception.
}
}
}
}
/**
* This thread consumes the runningQueue and hands the jobs to the workers.
*/
class Reader extends Thread {
@Override
public void run() {
while (running.get()) {
try {
final BalanceJob job = runningQueue.poll(500, TimeUnit.MILLISECONDS);
if (job != null) {
workersPool.submit(() -> {
LOG.info("Start job. job_msg={}", job.getDetailMessage());
job.execute();
if (!running.get()) {
return;
}
if (job.isJobDone()) {
if (job.getError() == null) {
LOG.info("Job done. job={}", job);
} else {
LOG.warn("Job failed. job=" + job, job.getError());
}
}
return;
});
}
} catch (InterruptedException e) {
// ignore interrupt exception.
}
}
}
}
/**
* This thread consumes the recoverQueue, recovers the job and then adds it
* to the runningQueue.
*/
class Recover extends Thread {
@Override
public void run() {
while (running.get()) {
BalanceJob job = null;
try {
job = recoverQueue.poll(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
// ignore interrupt exception.
}
if (job != null) {
try {
journal.recoverJob(job);
job.setScheduler(BalanceProcedureScheduler.this);
runningQueue.add(job);
LOG.info("Recover success, add to runningQueue. job={}", job);
} catch (IOException e) {
LOG.warn("Recover failed, re-add to recoverQueue. job=" + job, e);
recoverQueue.add(job);
}
}
}
}
}
/**
* Wrap the delayed BalanceJob.
*/
private static class DelayWrapper implements Delayed {
private BalanceJob job;
private long time;
DelayWrapper(BalanceJob job, long delayInMilliseconds) {
this.job = job;
this.time = Time.monotonicNow() + delayInMilliseconds;
}
BalanceJob getJob() {
return job;
}
@Override
public long getDelay(TimeUnit unit) {
long delay = time - Time.monotonicNow();
if (delay < 0) {
delay = 0;
}
return unit.convert(delay, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
DelayWrapper dw = (DelayWrapper) o;
return new CompareToBuilder()
.append(time, dw.time)
.append(job, dw.job)
.toComparison();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(time)
.append(job)
.toHashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (obj.getClass() != getClass()) {
return false;
}
DelayWrapper dw = (DelayWrapper) obj;
return new EqualsBuilder()
.append(time, dw.time)
.append(job, dw.job)
.build();
}
}
}

View File

@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Classes under this package implement a state machine used for balancing data
* across federation namespaces.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
package org.apache.hadoop.hdfs.procedure;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.procedure;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* This simulates a procedure with many phases. It is used in tests.
*/
public class MultiPhaseProcedure extends BalanceProcedure {
private int totalPhase;
private int currentPhase = 0;
private Configuration conf;
private FileSystem fs;
private Path path;
public MultiPhaseProcedure() {}
public MultiPhaseProcedure(String name, long delay, int totalPhase,
Configuration config, String spath) throws IOException {
super(name, delay);
this.totalPhase = totalPhase;
this.conf = config;
this.path = new Path(spath);
this.fs = path.getFileSystem(config);
}
@Override
public boolean execute() throws IOException {
if (currentPhase < totalPhase) {
LOG.info("Current phase {}", currentPhase);
Path phase = new Path(path, "phase-" + currentPhase);
if (!fs.exists(phase)) {
fs.mkdirs(phase);
}
currentPhase++;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
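// Ignore the interrupt; this test procedure just proceeds to the next
// phase.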
}
return false;
}
return true;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeInt(totalPhase);
out.writeInt(currentPhase);
conf.write(out);
Text.writeString(out, path.toString());
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
totalPhase = in.readInt();
currentPhase = in.readInt();
conf = new Configuration(false);
conf.readFields(in);
path = new Path(Text.readString(in));
fs = path.getFileSystem(conf);
}
}

View File

@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.procedure;
import java.util.ArrayList;
import java.util.List;
/**
* This procedure records all the finished procedures. It is used in tests.
*/
public class RecordProcedure extends BalanceProcedure<RecordProcedure> {
private static List<RecordProcedure> finish = new ArrayList<>();
public RecordProcedure() {}
public RecordProcedure(String name, long delay) {
super(name, delay);
}
@Override
public boolean execute() throws RetryException {
finish.add(this);
return true;
}
public static List<RecordProcedure> getFinishList() {
return finish;
}
}

View File

@@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.procedure;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* This simulates a procedure that needs many retries. It is used in tests.
*/
public class RetryProcedure extends BalanceProcedure {
private int retryTime = 1;
private int totalRetry = 0;
public RetryProcedure() {}
public RetryProcedure(String name, long delay, int retryTime) {
super(name, delay);
this.retryTime = retryTime;
}
@Override
public boolean execute() throws RetryException {
if (retryTime > 0) {
retryTime--;
totalRetry++;
throw new RetryException();
}
return true;
}
public int getTotalRetry() {
return totalRetry;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeInt(retryTime);
out.writeInt(totalRetry);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
retryTime = in.readInt();
totalRetry = in.readInt();
}
}

View File

@@ -0,0 +1,451 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.procedure;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.SCHEDULER_JOURNAL_URI;
import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.mockito.ArgumentMatchers.any;
/**
* Test BalanceProcedureScheduler.
*/
public class TestBalanceProcedureScheduler {
private static MiniDFSCluster cluster;
private static final Configuration CONF = new Configuration();
private static DistributedFileSystem fs;
private static final int DEFAULT_BLOCK_SIZE = 512;
@BeforeClass
public static void setup() throws IOException {
CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
true);
CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs:///");
CONF.setBoolean(DFS_NAMENODE_ACLS_ENABLED_KEY, true);
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
CONF.setInt(WORK_THREAD_NUM, 1);
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
cluster.waitClusterUp();
cluster.waitActive();
fs = cluster.getFileSystem();
String workPath =
"hdfs://" + cluster.getNameNode().getHostAndPort() + "/procedure";
CONF.set(SCHEDULER_JOURNAL_URI, workPath);
fs.mkdirs(new Path(workPath));
}
@AfterClass
public static void close() {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* Test that the scheduler can be shut down correctly.
*/
@Test(timeout = 60000)
public void testShutdownScheduler() throws Exception {
BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
scheduler.init(true);
// construct job
BalanceJob.Builder builder = new BalanceJob.Builder<>();
builder.nextProcedure(new WaitProcedure("wait", 1000, 5 * 1000));
BalanceJob job = builder.build();
scheduler.submit(job);
Thread.sleep(1000); // wait job to be scheduled.
scheduler.shutDownAndWait(30 * 1000);
BalanceJournal journal =
ReflectionUtils.newInstance(BalanceJournalInfoHDFS.class, CONF);
journal.clear(job);
}
/**
* Test a successful job.
*/
@Test(timeout = 60000)
public void testSuccessfulJob() throws Exception {
BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
scheduler.init(true);
try {
// construct job
List<RecordProcedure> procedures = new ArrayList<>();
BalanceJob.Builder builder = new BalanceJob.Builder<RecordProcedure>();
for (int i = 0; i < 5; i++) {
RecordProcedure r = new RecordProcedure("record-" + i, 1000L);
builder.nextProcedure(r);
procedures.add(r);
}
BalanceJob<RecordProcedure> job = builder.build();
scheduler.submit(job);
scheduler.waitUntilDone(job);
assertNull(job.getError());
// verify finish list.
assertEquals(5, RecordProcedure.getFinishList().size());
for (int i = 0; i < RecordProcedure.getFinishList().size(); i++) {
assertEquals(procedures.get(i), RecordProcedure.getFinishList().get(i));
}
} finally {
scheduler.shutDownAndWait(2);
}
}
/**
* Test a failed job and verify the error can be retrieved.
*/
@Test(timeout = 60000)
public void testFailedJob() throws Exception {
BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
scheduler.init(true);
try {
// Mock bad procedure.
BalanceProcedure badProcedure = Mockito.mock(BalanceProcedure.class);
Mockito.doThrow(new IOException("Job failed exception."))
.when(badProcedure).execute();
Mockito.doReturn("bad-procedure").when(badProcedure).name();
BalanceJob.Builder builder = new BalanceJob.Builder<>();
builder.nextProcedure(badProcedure);
BalanceJob job = builder.build();
scheduler.submit(job);
scheduler.waitUntilDone(job);
GenericTestUtils
.assertExceptionContains("Job failed exception", job.getError());
} finally {
scheduler.shutDownAndWait(2);
}
}
/**
* Test recover a job. After the job is recovered, the job should start from
* the last unfinished procedure, which is the first procedure without
* journal.
*/
@Test(timeout = 60000)
public void testGetJobAfterRecover() throws Exception {
BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
scheduler.init(true);
try {
// Construct job.
BalanceJob.Builder builder = new BalanceJob.Builder<>();
String firstProcedure = "wait0";
WaitProcedure[] procedures = new WaitProcedure[5];
for (int i = 0; i < 5; i++) {
WaitProcedure procedure = new WaitProcedure("wait" + i, 1000, 1000);
builder.nextProcedure(procedure).removeAfterDone(false);
procedures[i] = procedure;
}
BalanceJob job = builder.build();
scheduler.submit(job);
// Sleep a random time then shut down.
long randomSleepTime = new Random().nextInt(5) * 1000 + 1000; // 1-5 seconds.
Thread.sleep(randomSleepTime);
scheduler.shutDownAndWait(2);
// Current procedure is the last unfinished procedure. It is also the
// first procedure without journal.
WaitProcedure recoverProcedure = (WaitProcedure) job.getCurProcedure();
int recoverIndex = -1;
for (int i = 0; i < procedures.length; i++) {
if (procedures[i].name().equals(recoverProcedure.name())) {
recoverIndex = i;
break;
}
}
// Restart scheduler and recover the job.
scheduler = new BalanceProcedureScheduler(CONF);
scheduler.init(true);
scheduler.waitUntilDone(job);
// The job should be done successfully and the recoverJob should be equal
// to the original job.
BalanceJob recoverJob = scheduler.findJob(job);
assertNull(recoverJob.getError());
assertNotSame(job, recoverJob);
assertEquals(job, recoverJob);
// Verify whether the recovered job starts from the recoverProcedure.
Map<String, WaitProcedure> pTable = recoverJob.getProcedureTable();
List<WaitProcedure> recoveredProcedures =
procedureTableToList(pTable, firstProcedure);
for (int i = 0; i < recoverIndex; i++) {
// All procedures before recoverProcedure shouldn't be executed.
assertFalse(recoveredProcedures.get(i).getExecuted());
}
for (int i = recoverIndex; i < procedures.length; i++) {
// All procedures start from recoverProcedure should be executed.
assertTrue(recoveredProcedures.get(i).getExecuted());
}
} finally {
scheduler.shutDownAndWait(2);
}
}
/**
* Test RetryException is handled correctly.
*/
@Test(timeout = 60000)
public void testRetry() throws Exception {
BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
scheduler.init(true);
try {
// construct job
BalanceJob.Builder builder = new BalanceJob.Builder<>();
RetryProcedure retryProcedure = new RetryProcedure("retry", 1000, 3);
builder.nextProcedure(retryProcedure);
BalanceJob job = builder.build();
long start = Time.monotonicNow();
scheduler.submit(job);
scheduler.waitUntilDone(job);
assertNull(job.getError());
long duration = Time.monotonicNow() - start;
assertTrue(duration > 1000 * 3);
assertEquals(3, retryProcedure.getTotalRetry());
} finally {
scheduler.shutDownAndWait(2);
}
}
/**
* Test schedule an empty job.
*/
@Test(timeout = 60000)
public void testEmptyJob() throws Exception {
BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
scheduler.init(true);
try {
BalanceJob job = new BalanceJob.Builder<>().build();
scheduler.submit(job);
scheduler.waitUntilDone(job);
} finally {
scheduler.shutDownAndWait(2);
}
}
/**
* Test serialization and deserialization of Job.
*/
@Test(timeout = 60000)
public void testJobSerializeAndDeserialize() throws Exception {
BalanceJob.Builder builder = new BalanceJob.Builder<RecordProcedure>();
for (int i = 0; i < 5; i++) {
RecordProcedure r = new RecordProcedure("record-" + i, 1000L);
builder.nextProcedure(r);
}
builder.nextProcedure(new RetryProcedure("retry", 1000, 3));
BalanceJob<RecordProcedure> job = builder.build();
job.setId(BalanceProcedureScheduler.allocateJobId());
// Serialize.
ByteArrayOutputStream bao = new ByteArrayOutputStream();
job.write(new DataOutputStream(bao));
bao.flush();
ByteArrayInputStream bai = new ByteArrayInputStream(bao.toByteArray());
// Deserialize.
BalanceJob newJob = new BalanceJob.Builder<>().build();
newJob.readFields(new DataInputStream(bai));
assertEquals(job, newJob);
}
/**
* Test scheduler crashes and recovers.
*/
@Test(timeout = 60000)
public void testSchedulerDownAndRecoverJob() throws Exception {
BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
scheduler.init(true);
Path parent = new Path("/testSchedulerDownAndRecoverJob");
try {
// construct job
BalanceJob.Builder builder = new BalanceJob.Builder<>();
MultiPhaseProcedure multiPhaseProcedure =
new MultiPhaseProcedure("retry", 1000, 10, CONF, parent.toString());
builder.nextProcedure(multiPhaseProcedure);
BalanceJob job = builder.build();
scheduler.submit(job);
Thread.sleep(500); // wait procedure to be scheduled.
scheduler.shutDownAndWait(2);
assertFalse(job.isJobDone());
int len = fs.listStatus(parent).length;
assertTrue(len > 0 && len < 10);
// restart scheduler, test recovering the job.
scheduler = new BalanceProcedureScheduler(CONF);
scheduler.init(true);
scheduler.waitUntilDone(job);
assertEquals(10, fs.listStatus(parent).length);
for (int i = 0; i < 10; i++) {
assertTrue(fs.exists(new Path(parent, "phase-" + i)));
}
BalanceJob recoverJob = scheduler.findJob(job);
assertNull(recoverJob.getError());
assertNotSame(job, recoverJob);
assertEquals(job, recoverJob);
} finally {
if (fs.exists(parent)) {
fs.delete(parent, true);
}
scheduler.shutDownAndWait(2);
}
}
@Test(timeout = 60000)
public void testRecoverJobFromJournal() throws Exception {
BalanceJournal journal =
ReflectionUtils.newInstance(BalanceJournalInfoHDFS.class, CONF);
BalanceJob.Builder builder = new BalanceJob.Builder<RecordProcedure>();
BalanceProcedure wait0 = new WaitProcedure("wait0", 1000, 5000);
BalanceProcedure wait1 = new WaitProcedure("wait1", 1000, 1000);
builder.nextProcedure(wait0).nextProcedure(wait1);
BalanceJob job = builder.build();
job.setId(BalanceProcedureScheduler.allocateJobId());
job.setCurrentProcedure(wait1);
job.setLastProcedure(null);
journal.saveJob(job);
long start = Time.monotonicNow();
BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
scheduler.init(true);
try {
scheduler.waitUntilDone(job);
long duration = Time.monotonicNow() - start;
assertTrue(duration >= 1000 && duration < 5000);
} finally {
scheduler.shutDownAndWait(2);
}
}
@Test(timeout = 60000)
public void testClearJournalFail() throws Exception {
BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
scheduler.init(true);
BalanceJournal journal = Mockito.mock(BalanceJournal.class);
AtomicInteger count = new AtomicInteger(0);
Mockito.doAnswer(invocation -> {
if (count.incrementAndGet() == 1) {
throw new IOException("Mock clear failure");
}
return null;
}).when(journal).clear(any(BalanceJob.class));
scheduler.setJournal(journal);
try {
BalanceJob.Builder builder = new BalanceJob.Builder<>();
builder.nextProcedure(new WaitProcedure("wait", 1000, 1000));
BalanceJob job = builder.build();
scheduler.submit(job);
scheduler.waitUntilDone(job);
assertEquals(2, count.get());
} finally {
scheduler.shutDownAndWait(2);
}
}
/**
* Test the job will be recovered if writing journal fails.
*/
@Test(timeout = 60000)
public void testJobRecoveryWhenWriteJournalFail() throws Exception {
BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
scheduler.init(true);
try {
// construct job
AtomicBoolean recoverFlag = new AtomicBoolean(true);
BalanceJob.Builder builder = new BalanceJob.Builder<>();
builder.nextProcedure(new WaitProcedure("wait", 1000, 1000))
.nextProcedure(
new UnrecoverableProcedure("shutdown", 1000, () -> {
cluster.restartNameNode(false);
return true;
})).nextProcedure(
new UnrecoverableProcedure("recoverFlag", 1000, () -> {
recoverFlag.set(false);
return true;
})).nextProcedure(new WaitProcedure("wait", 1000, 1000));
BalanceJob job = builder.build();
scheduler.submit(job);
scheduler.waitUntilDone(job);
assertTrue(job.isJobDone());
assertNull(job.getError());
assertTrue(recoverFlag.get());
} finally {
scheduler.shutDownAndWait(2);
}
}
/**
* Transform the procedure map into an ordered list based on the relations
* specified by the map.
*/
<T extends BalanceProcedure> List<T> procedureTableToList(
Map<String, T> pTable, String first) {
List<T> procedures = new ArrayList<>();
T cur = pTable.get(first);
while (cur != null) {
procedures.add(cur);
cur = pTable.get(cur.nextProcedure());
}
return procedures;
}
}

View File

@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.procedure;
import java.io.IOException;
/**
* This simulates a Procedure that cannot be recovered. It is for tests only.
*
* If the job has not been recovered, the handler is called. Once the job is
* recovered the procedure does nothing, because the handler is lost during
* recovery. We can use this to verify whether the job has been recovered.
*/
public class UnrecoverableProcedure extends BalanceProcedure {
public interface Call {
boolean execute() throws RetryException, IOException;
}
private Call handler;
public UnrecoverableProcedure() {}
/**
* The handler will be lost if the procedure is recovered.
*/
public UnrecoverableProcedure(String name, long delay, Call handler) {
super(name, delay);
this.handler = handler;
}
@Override
public boolean execute() throws RetryException,
IOException {
if (handler != null) {
return handler.execute();
} else {
return true;
}
}
}

View File

@@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.procedure;
import org.apache.hadoop.util.Time;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* This procedure waits a specified period of time and then finishes. It
* simulates the behaviour of blocking procedures.
*/
public class WaitProcedure extends BalanceProcedure {
private long waitTime;
private boolean executed = false;
public WaitProcedure() {
}
public WaitProcedure(String name, long delay, long waitTime) {
super(name, delay);
this.waitTime = waitTime;
}
@Override
public boolean execute() throws IOException {
long startTime = Time.monotonicNow();
long timeLeft = waitTime;
while (timeLeft > 0) {
try {
Thread.sleep(timeLeft);
} catch (InterruptedException e) {
if (isSchedulerShutdown()) {
return false;
}
} finally {
timeLeft = waitTime - (Time.monotonicNow() - startTime);
}
}
executed = true;
return true;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeLong(waitTime);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
waitTime = in.readLong();
}
public boolean getExecuted() {
return executed;
}
}