AMQ-9166: Add destination to scheduler job

This commit is contained in:
Shikhar Gupta 2022-12-11 03:14:05 -05:00 committed by Jean-Baptiste Onofré
parent ff360abf43
commit f8bbeeee22
6 changed files with 64 additions and 0 deletions

View File

@ -30,6 +30,7 @@ import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.scheduler.Job;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.ByteSequence;
@ -54,12 +55,22 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
@Override
public TabularData getAllJobs() throws Exception {
return getAllJobs(false);
}
public TabularData getAllJobs(boolean includeDestinationName) throws Exception {
OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
CompositeType ct = factory.getCompositeType();
TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs", ct, new String[] { "jobId" });
TabularDataSupport rc = new TabularDataSupport(tt);
List<Job> jobs = this.jobScheduler.getAllJobs();
OpenWireFormat wireFormat = new OpenWireFormat();
for (Job job : jobs) {
if (includeDestinationName) {
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(job.getPayload()));
ActiveMQDestination destination = (ActiveMQDestination) msg.getJMSDestination();
job.setDestinationName(destination.getPhysicalName());
}
rc.put(new CompositeDataSupport(ct, factory.getFields(job)));
}
return rc;
@ -67,6 +78,10 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
@Override
public TabularData getAllJobs(String startTime, String finishTime) throws Exception {
return getAllJobs(startTime, finishTime, false);
}
public TabularData getAllJobs(String startTime, String finishTime, boolean includeDestinationName) throws Exception {
OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
CompositeType ct = factory.getCompositeType();
TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs", ct, new String[] { "jobId" });
@ -74,7 +89,13 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
long start = JobSupport.getDataTime(startTime);
long finish = JobSupport.getDataTime(finishTime);
List<Job> jobs = this.jobScheduler.getAllJobs(start, finish);
OpenWireFormat wireFormat = new OpenWireFormat();
for (Job job : jobs) {
if (includeDestinationName) {
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(job.getPayload()));
ActiveMQDestination destination = (ActiveMQDestination) msg.getJMSDestination();
job.setDestinationName(destination.getPhysicalName());
}
rc.put(new CompositeDataSupport(ct, factory.getFields(job)));
}
return rc;
@ -100,12 +121,22 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
@Override
public TabularData getNextScheduleJobs() throws Exception {
return getNextScheduleJobs(false);
}
public TabularData getNextScheduleJobs(boolean includeDestinationName) throws Exception {
OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
CompositeType ct = factory.getCompositeType();
TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs", ct, new String[] { "jobId" });
TabularDataSupport rc = new TabularDataSupport(tt);
List<Job> jobs = this.jobScheduler.getNextScheduleJobs();
OpenWireFormat wireFormat = new OpenWireFormat();
for (Job job : jobs) {
if (includeDestinationName) {
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(job.getPayload()));
ActiveMQDestination destination = (ActiveMQDestination) msg.getJMSDestination();
job.setDestinationName(destination.getPhysicalName());
}
rc.put(new CompositeDataSupport(ct, factory.getFields(job)));
}
return rc;

View File

@ -67,6 +67,7 @@ public interface JobSchedulerViewMBean {
@MBeanInfo("remove all scheduled jobs between time ranges ")
public abstract void removeAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish) throws Exception;
/**
* Get the next time jobs will be fired from this scheduler store.
*

View File

@ -420,6 +420,7 @@ public final class OpenTypeSupport {
addItem("next", "next time", SimpleType.STRING);
addItem("period", "period between jobs", SimpleType.LONG);
addItem("repeat", "number of times to repeat", SimpleType.INTEGER);
addItem("destinationName", "destination name", SimpleType.STRING);
}
@Override
@ -433,6 +434,7 @@ public final class OpenTypeSupport {
rc.put("next", job.getNextExecutionTime());
rc.put("period", job.getPeriod());
rc.put("repeat", job.getRepeat());
rc.put("destinationName", job.getDestinationName());
return rc;
}
}

View File

@ -78,4 +78,11 @@ public interface Job {
*/
public int getExecutionCount();
/**
*
* @return name of destination
*/
public String getDestinationName();
public void setDestinationName(String destinationName);
}

View File

@ -35,6 +35,7 @@ public class InMemoryJob implements Job {
private int executionCount;
private byte[] payload;
private String destinationName;
public InMemoryJob(String jobId) {
this.jobId = jobId;
@ -122,6 +123,16 @@ public class InMemoryJob implements Job {
return executionCount;
}
@Override
public String getDestinationName() {
return destinationName;
}
@Override
public void setDestinationName(String destinationName) {
this.destinationName = destinationName;
}
public void incrementExecutionCount() {
this.executionCount++;
}

View File

@ -25,6 +25,8 @@ public class JobImpl implements Job {
private final JobLocation jobLocation;
private final byte[] payload;
private String destinationName;
protected JobImpl(JobLocation location, ByteSequence bs) {
this.jobLocation = location;
this.payload = new byte[bs.getLength()];
@ -85,4 +87,14 @@ public class JobImpl implements Job {
public String toString() {
return "Job: " + getJobId();
}
@Override
public String getDestinationName() {
return destinationName;
}
@Override
public void setDestinationName(String destinationName) {
this.destinationName = destinationName;
}
}