Add a whack of logging
This commit is contained in:
parent
c6e73817d1
commit
2a455c3e54
|
@ -80,6 +80,7 @@ class WorkChannelMessageHandler implements MessageHandler {
|
|||
return;
|
||||
}
|
||||
WorkChunk workChunk = chunkOpt.get();
|
||||
ourLog.debug("Worker picked up chunk. [chunkId={}, stepId={}, startTime={}]", chunkId, workChunk.getTargetStepId(), workChunk.getStartTime());
|
||||
|
||||
JobWorkCursor<?, ?, ?> cursor = buildCursorFromNotification(workNotification);
|
||||
|
||||
|
|
|
@ -52,6 +52,10 @@ public class JobChunkProgressAccumulator {
|
|||
return getChunkIdsWithStatus(theInstanceId, theStepId, theStatuses).size();
|
||||
}
|
||||
|
||||
int getTotalChunkCountForInstanceAndStep(String theInstanceId, String theStepId) {
|
||||
return myInstanceIdToChunkStatuses.get(theInstanceId).stream().filter(chunkCount -> chunkCount.myStepId.equals(theStepId)).collect(Collectors.toList()).size();
|
||||
}
|
||||
|
||||
public List<String> getChunkIdsWithStatus(String theInstanceId, String theStepId, StatusEnum... theStatuses) {
|
||||
return getChunkStatuses(theInstanceId).stream()
|
||||
.filter(t -> t.myStepId.equals(theStepId))
|
||||
|
@ -74,6 +78,8 @@ public class JobChunkProgressAccumulator {
|
|||
// check avoids adding it twice.
|
||||
if (myConsumedInstanceAndChunkIds.add(instanceId + " " + chunkId)) {
|
||||
myInstanceIdToChunkStatuses.put(instanceId, new ChunkStatusCountValue(chunkId, theChunk.getTargetStepId(), theChunk.getStatus()));
|
||||
} else {
|
||||
ourLog.debug("Ignoring duplicate chunk {} for instance {}", chunkId, instanceId);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -140,8 +140,10 @@ public class JobInstanceProcessor {
|
|||
|
||||
String instanceId = myInstance.getInstanceId();
|
||||
String currentStepId = jobWorkCursor.getCurrentStepId();
|
||||
int totalChunks = myProgressAccumulator.getTotalChunkCountForInstanceAndStep(instanceId, currentStepId);
|
||||
int incompleteChunks = myProgressAccumulator.countChunksWithStatus(instanceId, currentStepId, StatusEnum.getIncompleteStatuses());
|
||||
|
||||
ourLog.debug("Considering whether to advance gated execution. [totalChunks={},incompleteChunks={},instanceId={},stepId={}", totalChunks, incompleteChunks, instanceId, currentStepId);
|
||||
if (incompleteChunks == 0) {
|
||||
String nextStepId = jobWorkCursor.nextStep.getStepId();
|
||||
|
||||
|
@ -165,7 +167,7 @@ public class JobInstanceProcessor {
|
|||
JobWorkNotification workNotification = new JobWorkNotification(myInstance, nextStepId, nextChunkId);
|
||||
myBatchJobSender.sendWorkChannelMessage(workNotification);
|
||||
}
|
||||
|
||||
ourLog.debug("Submitted a batch of chunks for processing. [chunkCount={}, instanceId={}, stepId={}]", chunksForNextStep.size(), instanceId, nextStepId);
|
||||
myInstance.setCurrentGatedStepId(nextStepId);
|
||||
myJobPersistence.updateInstance(myInstance);
|
||||
}
|
||||
|
|
|
@ -29,6 +29,8 @@ import org.apache.commons.lang3.builder.ToStringBuilder;
|
|||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
class InstanceProgress {
|
||||
|
@ -36,6 +38,7 @@ class InstanceProgress {
|
|||
|
||||
private int myRecordsProcessed = 0;
|
||||
private int myIncompleteChunkCount = 0;
|
||||
private int myQueuedCount = 0;
|
||||
private int myCompleteChunkCount = 0;
|
||||
private int myErroredChunkCount = 0;
|
||||
private int myFailedChunkCount = 0;
|
||||
|
@ -44,6 +47,7 @@ class InstanceProgress {
|
|||
private Long myLatestEndTime = null;
|
||||
private String myErrormessage = null;
|
||||
private StatusEnum myNewStatus = null;
|
||||
private Map<String, Map<StatusEnum, Integer>> myStepToStatusCountMap = new HashMap<>();
|
||||
|
||||
public void addChunk(WorkChunk theChunk) {
|
||||
myErrorCountForAllStatuses += theChunk.getErrorCount();
|
||||
|
@ -55,6 +59,10 @@ class InstanceProgress {
|
|||
}
|
||||
|
||||
private void updateCompletionStatus(WorkChunk theChunk) {
|
||||
//Update the status map first.
|
||||
Map<StatusEnum, Integer> statusToCountMap = myStepToStatusCountMap.getOrDefault(theChunk.getTargetStepId(), new HashMap<>());
|
||||
statusToCountMap.put(theChunk.getStatus(), statusToCountMap.getOrDefault(theChunk.getStatus(), 0) + 1);
|
||||
|
||||
switch (theChunk.getStatus()) {
|
||||
case QUEUED:
|
||||
case IN_PROGRESS:
|
||||
|
@ -163,14 +171,19 @@ class InstanceProgress {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new ToStringBuilder(this)
|
||||
ToStringBuilder builder = new ToStringBuilder(this)
|
||||
.append("myIncompleteChunkCount", myIncompleteChunkCount)
|
||||
.append("myCompleteChunkCount", myCompleteChunkCount)
|
||||
.append("myErroredChunkCount", myErroredChunkCount)
|
||||
.append("myFailedChunkCount", myFailedChunkCount)
|
||||
.append("myErrormessage", myErrormessage)
|
||||
.append("myRecordsProcessed", myRecordsProcessed)
|
||||
.toString();
|
||||
.append("myRecordsProcessed", myRecordsProcessed);
|
||||
|
||||
builder.append("myStepToStatusCountMap", myStepToStatusCountMap);
|
||||
|
||||
return builder.toString();
|
||||
|
||||
|
||||
}
|
||||
|
||||
public StatusEnum getNewStatus() {
|
||||
|
|
|
@ -50,6 +50,7 @@ public class JobInstanceStatusUpdater {
|
|||
return false;
|
||||
}
|
||||
theJobInstance.setStatus(theNewStatus);
|
||||
ourLog.debug("Updating job instance {} of type {} from {} to {}", theJobInstance.getInstanceId(), theJobInstance.getJobDefinitionId(), origStatus, theNewStatus);
|
||||
return updateInstance(theJobInstance);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue