BAEL-2953 A guide to Apache Mesos

This commit is contained in:
Chirag Dewan 2019-06-16 00:03:53 +05:30
parent 69f947e3cd
commit ad818f174e
4 changed files with 208 additions and 0 deletions

View File

@ -99,6 +99,12 @@
<version>${crawler4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.mesos</groupId>
<artifactId>mesos</artifactId>
<version>${mesos.library.version}</version>
</dependency>
</dependencies>
<properties>
@ -109,5 +115,6 @@
<chronicle.map.version>3.17.2</chronicle.map.version>
<crawler4j.version>4.4.0</crawler4j.version>
<spring-boot-starter.version>2.1.4.RELEASE</spring-boot-starter.version>
<mesos.library.version>0.28.3</mesos.library.version>
</properties>
</project>

View File

@ -0,0 +1,42 @@
package com.baeldung.mesos;
import com.baeldung.mesos.schedulers.HelloWorldScheduler;
import org.apache.mesos.MesosSchedulerDriver;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.CommandInfo;
import org.apache.mesos.Protos.ExecutorInfo;
import org.apache.mesos.Protos.FrameworkInfo;
public class HelloWorldMain {
public static void main(String[] args) {
String path = System.getProperty("user.dir")
+ "/target/libraries2-1.0.0-SNAPSHOT.jar";
CommandInfo.URI uri = CommandInfo.URI.newBuilder().setValue(path).setExtract(false).build();
String helloWorldCommand = "java -cp libraries2-1.0.0-SNAPSHOT.jar com.baeldung.mesos.executors.HelloWorldExecutor";
CommandInfo commandInfoHelloWorld = CommandInfo.newBuilder().setValue(helloWorldCommand).addUris(uri)
.build();
ExecutorInfo executorHelloWorld = ExecutorInfo.newBuilder()
.setExecutorId(Protos.ExecutorID.newBuilder().setValue("HelloWorldExecutor"))
.setCommand(commandInfoHelloWorld).setName("Hello World (Java)").setSource("java").build();
FrameworkInfo.Builder frameworkBuilder = FrameworkInfo.newBuilder().setFailoverTimeout(120000)
.setUser("")
.setName("Hello World Framework (Java)");
frameworkBuilder.setPrincipal("test-framework-java");
MesosSchedulerDriver driver = new MesosSchedulerDriver(new HelloWorldScheduler(executorHelloWorld), frameworkBuilder.build(), args[0]);
int status = driver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1;
// Ensure that the driver process terminates.
driver.stop();
System.exit(status);
}
}

View File

@ -0,0 +1,59 @@
package com.baeldung.mesos.executors;
import org.apache.mesos.Executor;
import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.MesosExecutorDriver;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.TaskInfo;
public class HelloWorldExecutor implements Executor {
@Override
public void registered(ExecutorDriver driver, Protos.ExecutorInfo executorInfo, Protos.FrameworkInfo frameworkInfo, Protos.SlaveInfo slaveInfo) {
}
@Override
public void reregistered(ExecutorDriver driver, Protos.SlaveInfo slaveInfo) {
}
@Override
public void disconnected(ExecutorDriver driver) {
}
@Override
public void launchTask(ExecutorDriver driver, TaskInfo task) {
Protos.TaskStatus status = Protos.TaskStatus.newBuilder().setTaskId(task.getTaskId())
.setState(Protos.TaskState.TASK_RUNNING).build();
driver.sendStatusUpdate(status);
String myStatus = "Hello Framework";
driver.sendFrameworkMessage(myStatus.getBytes());
System.out.println("Hello World!!!");
status = Protos.TaskStatus.newBuilder().setTaskId(task.getTaskId())
.setState(Protos.TaskState.TASK_FINISHED).build();
driver.sendStatusUpdate(status);
}
@Override
public void killTask(ExecutorDriver driver, Protos.TaskID taskId) {
}
@Override
public void frameworkMessage(ExecutorDriver driver, byte[] data) {
}
@Override
public void shutdown(ExecutorDriver driver) {
}
@Override
public void error(ExecutorDriver driver, String message) {
}
public static void main(String[] args) {
MesosExecutorDriver driver = new MesosExecutorDriver(new HelloWorldExecutor());
System.exit(driver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1);
}
}

View File

@ -0,0 +1,100 @@
package com.baeldung.mesos.schedulers;
import com.google.protobuf.ByteString;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.ExecutorInfo;
import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.TaskInfo;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
import java.util.ArrayList;
import java.util.List;
public class HelloWorldScheduler implements Scheduler {
private int launchedTasks = 0;
private final ExecutorInfo helloWorldExecutor;
public HelloWorldScheduler(ExecutorInfo helloWorldExecutor) {
this.helloWorldExecutor = helloWorldExecutor;
}
@Override
public void registered(SchedulerDriver schedulerDriver, Protos.FrameworkID frameworkID, Protos.MasterInfo masterInfo) {
}
@Override
public void reregistered(SchedulerDriver schedulerDriver, Protos.MasterInfo masterInfo) {
}
@Override
public void resourceOffers(SchedulerDriver schedulerDriver, List<Offer> list) {
for (Offer offer : list) {
List<TaskInfo> tasks = new ArrayList<TaskInfo>();
Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(Integer.toString(launchedTasks++)).build();
System.out.println("Launching printHelloWorld " + taskId.getValue() + " Hello World Java");
TaskInfo printHelloWorld = TaskInfo
.newBuilder()
.setName("printHelloWorld " + taskId.getValue())
.setTaskId(taskId)
.setSlaveId(offer.getSlaveId())
.addResources(
Protos.Resource.newBuilder().setName("cpus").setType(Protos.Value.Type.SCALAR)
.setScalar(Protos.Value.Scalar.newBuilder().setValue(1)))
.addResources(
Protos.Resource.newBuilder().setName("mem").setType(Protos.Value.Type.SCALAR)
.setScalar(Protos.Value.Scalar.newBuilder().setValue(128)))
.setExecutor(ExecutorInfo.newBuilder(helloWorldExecutor)).build();
List<OfferID> offerIDS = new ArrayList<>();
offerIDS.add(offer.getId());
tasks.add(printHelloWorld);
schedulerDriver.declineOffer(offer.getId());
schedulerDriver.launchTasks(offerIDS, tasks);
}
}
@Override
public void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerID) {
}
@Override
public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {
}
@Override
public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, Protos.SlaveID slaveID, byte[] bytes) {
}
@Override
public void disconnected(SchedulerDriver schedulerDriver) {
}
@Override
public void slaveLost(SchedulerDriver schedulerDriver, Protos.SlaveID slaveID) {
}
@Override
public void executorLost(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, Protos.SlaveID slaveID, int i) {
}
@Override
public void error(SchedulerDriver schedulerDriver, String s) {
}
}