From f8bbeeee22aac7ffae1be67240214788a3bdcf29 Mon Sep 17 00:00:00 2001 From: Shikhar Gupta Date: Sun, 11 Dec 2022 03:14:05 -0500 Subject: [PATCH] AMQ-9166: Add destination to scheduler job --- .../activemq/broker/jmx/JobSchedulerView.java | 31 +++++++++++++++++++ .../broker/jmx/JobSchedulerViewMBean.java | 1 + .../activemq/broker/jmx/OpenTypeSupport.java | 2 ++ .../apache/activemq/broker/scheduler/Job.java | 7 +++++ .../broker/scheduler/memory/InMemoryJob.java | 11 +++++++ .../store/kahadb/scheduler/JobImpl.java | 12 +++++++ 6 files changed, 64 insertions(+) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java index 92dfe9277b..b0695ada2b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java @@ -30,6 +30,7 @@ import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; import org.apache.activemq.broker.scheduler.Job; import org.apache.activemq.broker.scheduler.JobScheduler; import org.apache.activemq.broker.scheduler.JobSupport; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.util.ByteSequence; @@ -54,12 +55,22 @@ public class JobSchedulerView implements JobSchedulerViewMBean { @Override public TabularData getAllJobs() throws Exception { + return getAllJobs(false); + } + + public TabularData getAllJobs(boolean includeDestinationName) throws Exception { OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class); CompositeType ct = factory.getCompositeType(); TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs", ct, new String[] { "jobId" }); TabularDataSupport rc = new TabularDataSupport(tt); List jobs = this.jobScheduler.getAllJobs(); + OpenWireFormat wireFormat = new OpenWireFormat(); for (Job job : jobs) { + if (includeDestinationName) { + Message msg = (Message) wireFormat.unmarshal(new ByteSequence(job.getPayload())); + ActiveMQDestination destination = (ActiveMQDestination) msg.getJMSDestination(); + job.setDestinationName(destination.getPhysicalName()); + } rc.put(new CompositeDataSupport(ct, factory.getFields(job))); } return rc; @@ -67,6 +78,10 @@ public class JobSchedulerView implements JobSchedulerViewMBean { @Override public TabularData getAllJobs(String startTime, String finishTime) throws Exception { + return getAllJobs(startTime, finishTime, false); + } + + public TabularData getAllJobs(String startTime, String finishTime, boolean includeDestinationName) throws Exception { OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class); CompositeType ct = factory.getCompositeType(); TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs", ct, new String[] { "jobId" }); @@ -74,7 +89,13 @@ public class JobSchedulerView implements JobSchedulerViewMBean { long start = JobSupport.getDataTime(startTime); long finish = JobSupport.getDataTime(finishTime); List jobs = this.jobScheduler.getAllJobs(start, finish); + OpenWireFormat wireFormat = new OpenWireFormat(); for (Job job : jobs) { + if (includeDestinationName) { + Message msg = (Message) wireFormat.unmarshal(new ByteSequence(job.getPayload())); + ActiveMQDestination destination = (ActiveMQDestination) msg.getJMSDestination(); + job.setDestinationName(destination.getPhysicalName()); + } rc.put(new CompositeDataSupport(ct, factory.getFields(job))); } return rc; @@ -100,12 +121,22 @@ public class JobSchedulerView implements JobSchedulerViewMBean { @Override public TabularData getNextScheduleJobs() throws Exception { + return getNextScheduleJobs(false); + } + + public TabularData getNextScheduleJobs(boolean includeDestinationName) throws Exception { OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class); CompositeType ct = factory.getCompositeType(); TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs", ct, new String[] { "jobId" }); TabularDataSupport rc = new TabularDataSupport(tt); List jobs = this.jobScheduler.getNextScheduleJobs(); + OpenWireFormat wireFormat = new OpenWireFormat(); for (Job job : jobs) { + if (includeDestinationName) { + Message msg = (Message) wireFormat.unmarshal(new ByteSequence(job.getPayload())); + ActiveMQDestination destination = (ActiveMQDestination) msg.getJMSDestination(); + job.setDestinationName(destination.getPhysicalName()); + } rc.put(new CompositeDataSupport(ct, factory.getFields(job))); } return rc; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java index 9aedbefc72..8649ca5b26 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java @@ -67,6 +67,7 @@ public interface JobSchedulerViewMBean { @MBeanInfo("remove all scheduled jobs between time ranges ") public abstract void removeAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish) throws Exception; + /** * Get the next time jobs will be fired from this scheduler store. * diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java index dafbdf148f..cf0dc4d917 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java @@ -420,6 +420,7 @@ public final class OpenTypeSupport { addItem("next", "next time", SimpleType.STRING); addItem("period", "period between jobs", SimpleType.LONG); addItem("repeat", "number of times to repeat", SimpleType.INTEGER); + addItem("destinationName", "destination name", SimpleType.STRING); } @Override @@ -433,6 +434,7 @@ public final class OpenTypeSupport { rc.put("next", job.getNextExecutionTime()); rc.put("period", job.getPeriod()); rc.put("repeat", job.getRepeat()); + rc.put("destinationName", job.getDestinationName()); return rc; } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java index 047fe239e9..1845213951 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java @@ -78,4 +78,11 @@ public interface Job { */ public int getExecutionCount(); + /** + * + * @return name of destination + */ + public String getDestinationName(); + + public void setDestinationName(String destinationName); } \ No newline at end of file diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJob.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJob.java index 9a4b012661..7d470664f3 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJob.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJob.java @@ -35,6 +35,7 @@ public class InMemoryJob implements Job { private int executionCount; private byte[] payload; + private String destinationName; public InMemoryJob(String jobId) { this.jobId = jobId; @@ -122,6 +123,16 @@ public class InMemoryJob implements Job { return executionCount; } + @Override + public String getDestinationName() { + return destinationName; + } + + @Override + public void setDestinationName(String destinationName) { + this.destinationName = destinationName; + } + public void incrementExecutionCount() { this.executionCount++; } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java index 217bc1fb2a..56b49fa1f3 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java @@ -25,6 +25,8 @@ public class JobImpl implements Job { private final JobLocation jobLocation; private final byte[] payload; + private String destinationName; + protected JobImpl(JobLocation location, ByteSequence bs) { this.jobLocation = location; this.payload = new byte[bs.getLength()]; @@ -85,4 +87,14 @@ public class JobImpl implements Job { public String toString() { return "Job: " + getJobId(); } + + @Override + public String getDestinationName() { + return destinationName; + } + + @Override + public void setDestinationName(String destinationName) { + this.destinationName = destinationName; + } }