From ad818f174e019f0f87b16cc3cc3414ed60ab65a2 Mon Sep 17 00:00:00 2001 From: Chirag Dewan Date: Sun, 16 Jun 2019 00:03:53 +0530 Subject: [PATCH] BAEL-2953 A guide to Apache Mesos --- libraries-2/pom.xml | 7 ++ .../com/baeldung/mesos/HelloWorldMain.java | 42 ++++++++ .../mesos/executors/HelloWorldExecutor.java | 59 +++++++++++ .../mesos/schedulers/HelloWorldScheduler.java | 100 ++++++++++++++++++ 4 files changed, 208 insertions(+) create mode 100644 libraries-2/src/main/java/com/baeldung/mesos/HelloWorldMain.java create mode 100644 libraries-2/src/main/java/com/baeldung/mesos/executors/HelloWorldExecutor.java create mode 100644 libraries-2/src/main/java/com/baeldung/mesos/schedulers/HelloWorldScheduler.java diff --git a/libraries-2/pom.xml b/libraries-2/pom.xml index 32f3f23812..e8c667c9c7 100644 --- a/libraries-2/pom.xml +++ b/libraries-2/pom.xml @@ -99,6 +99,12 @@ ${crawler4j.version} + + org.apache.mesos + mesos + ${mesos.library.version} + + @@ -109,5 +115,6 @@ 3.17.2 4.4.0 2.1.4.RELEASE + 0.28.3 diff --git a/libraries-2/src/main/java/com/baeldung/mesos/HelloWorldMain.java b/libraries-2/src/main/java/com/baeldung/mesos/HelloWorldMain.java new file mode 100644 index 0000000000..e4bf593e7e --- /dev/null +++ b/libraries-2/src/main/java/com/baeldung/mesos/HelloWorldMain.java @@ -0,0 +1,42 @@ +package com.baeldung.mesos; + +import com.baeldung.mesos.schedulers.HelloWorldScheduler; +import org.apache.mesos.MesosSchedulerDriver; +import org.apache.mesos.Protos; +import org.apache.mesos.Protos.CommandInfo; +import org.apache.mesos.Protos.ExecutorInfo; +import org.apache.mesos.Protos.FrameworkInfo; + +public class HelloWorldMain { + + public static void main(String[] args) { + + String path = System.getProperty("user.dir") + + "/target/libraries2-1.0.0-SNAPSHOT.jar"; + + CommandInfo.URI uri = CommandInfo.URI.newBuilder().setValue(path).setExtract(false).build(); + + String helloWorldCommand = "java -cp libraries2-1.0.0-SNAPSHOT.jar com.baeldung.mesos.executors.HelloWorldExecutor"; + CommandInfo commandInfoHelloWorld = CommandInfo.newBuilder().setValue(helloWorldCommand).addUris(uri) + .build(); + + ExecutorInfo executorHelloWorld = ExecutorInfo.newBuilder() + .setExecutorId(Protos.ExecutorID.newBuilder().setValue("HelloWorldExecutor")) + .setCommand(commandInfoHelloWorld).setName("Hello World (Java)").setSource("java").build(); + + FrameworkInfo.Builder frameworkBuilder = FrameworkInfo.newBuilder().setFailoverTimeout(120000) + .setUser("") + .setName("Hello World Framework (Java)"); + + frameworkBuilder.setPrincipal("test-framework-java"); + + MesosSchedulerDriver driver = new MesosSchedulerDriver(new HelloWorldScheduler(executorHelloWorld), frameworkBuilder.build(), args[0]); + + int status = driver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1; + + // Ensure that the driver process terminates. + driver.stop(); + + System.exit(status); + } +} diff --git a/libraries-2/src/main/java/com/baeldung/mesos/executors/HelloWorldExecutor.java b/libraries-2/src/main/java/com/baeldung/mesos/executors/HelloWorldExecutor.java new file mode 100644 index 0000000000..a8620bbce3 --- /dev/null +++ b/libraries-2/src/main/java/com/baeldung/mesos/executors/HelloWorldExecutor.java @@ -0,0 +1,59 @@ +package com.baeldung.mesos.executors; + +import org.apache.mesos.Executor; +import org.apache.mesos.ExecutorDriver; +import org.apache.mesos.MesosExecutorDriver; +import org.apache.mesos.Protos; +import org.apache.mesos.Protos.TaskInfo; + +public class HelloWorldExecutor implements Executor { + @Override + public void registered(ExecutorDriver driver, Protos.ExecutorInfo executorInfo, Protos.FrameworkInfo frameworkInfo, Protos.SlaveInfo slaveInfo) { + } + + @Override + public void reregistered(ExecutorDriver driver, Protos.SlaveInfo slaveInfo) { + } + + @Override + public void disconnected(ExecutorDriver driver) { + } + + @Override + public void launchTask(ExecutorDriver driver, TaskInfo task) { + + Protos.TaskStatus status = Protos.TaskStatus.newBuilder().setTaskId(task.getTaskId()) + .setState(Protos.TaskState.TASK_RUNNING).build(); + driver.sendStatusUpdate(status); + + String myStatus = "Hello Framework"; + driver.sendFrameworkMessage(myStatus.getBytes()); + + System.out.println("Hello World!!!"); + + status = Protos.TaskStatus.newBuilder().setTaskId(task.getTaskId()) + .setState(Protos.TaskState.TASK_FINISHED).build(); + driver.sendStatusUpdate(status); + } + + @Override + public void killTask(ExecutorDriver driver, Protos.TaskID taskId) { + } + + @Override + public void frameworkMessage(ExecutorDriver driver, byte[] data) { + } + + @Override + public void shutdown(ExecutorDriver driver) { + } + + @Override + public void error(ExecutorDriver driver, String message) { + } + + public static void main(String[] args) { + MesosExecutorDriver driver = new MesosExecutorDriver(new HelloWorldExecutor()); + System.exit(driver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1); + } +} diff --git a/libraries-2/src/main/java/com/baeldung/mesos/schedulers/HelloWorldScheduler.java b/libraries-2/src/main/java/com/baeldung/mesos/schedulers/HelloWorldScheduler.java new file mode 100644 index 0000000000..68808b4dd0 --- /dev/null +++ b/libraries-2/src/main/java/com/baeldung/mesos/schedulers/HelloWorldScheduler.java @@ -0,0 +1,100 @@ +package com.baeldung.mesos.schedulers; + +import com.google.protobuf.ByteString; +import org.apache.mesos.Protos; +import org.apache.mesos.Protos.ExecutorInfo; +import org.apache.mesos.Protos.Offer; +import org.apache.mesos.Protos.OfferID; +import org.apache.mesos.Protos.TaskInfo; +import org.apache.mesos.Scheduler; +import org.apache.mesos.SchedulerDriver; + +import java.util.ArrayList; +import java.util.List; + +public class HelloWorldScheduler implements Scheduler { + + private int launchedTasks = 0; + private final ExecutorInfo helloWorldExecutor; + + public HelloWorldScheduler(ExecutorInfo helloWorldExecutor) { + this.helloWorldExecutor = helloWorldExecutor; + } + + @Override + public void registered(SchedulerDriver schedulerDriver, Protos.FrameworkID frameworkID, Protos.MasterInfo masterInfo) { + + } + + @Override + public void reregistered(SchedulerDriver schedulerDriver, Protos.MasterInfo masterInfo) { + + } + + @Override + public void resourceOffers(SchedulerDriver schedulerDriver, List list) { + + for (Offer offer : list) { + List tasks = new ArrayList(); + Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(Integer.toString(launchedTasks++)).build(); + + System.out.println("Launching printHelloWorld " + taskId.getValue() + " Hello World Java"); + TaskInfo printHelloWorld = TaskInfo + .newBuilder() + .setName("printHelloWorld " + taskId.getValue()) + .setTaskId(taskId) + .setSlaveId(offer.getSlaveId()) + .addResources( + Protos.Resource.newBuilder().setName("cpus").setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(1))) + .addResources( + Protos.Resource.newBuilder().setName("mem").setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(128))) + .setExecutor(ExecutorInfo.newBuilder(helloWorldExecutor)).build(); + + List offerIDS = new ArrayList<>(); + offerIDS.add(offer.getId()); + + tasks.add(printHelloWorld); + + schedulerDriver.declineOffer(offer.getId()); + schedulerDriver.launchTasks(offerIDS, tasks); + } + + } + + @Override + public void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerID) { + + } + + @Override + public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) { + + } + + @Override + public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, Protos.SlaveID slaveID, byte[] bytes) { + + } + + @Override + public void disconnected(SchedulerDriver schedulerDriver) { + + } + + @Override + public void slaveLost(SchedulerDriver schedulerDriver, Protos.SlaveID slaveID) { + + } + + @Override + public void executorLost(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, Protos.SlaveID slaveID, int i) { + + } + + @Override + public void error(SchedulerDriver schedulerDriver, String s) { + + } +}