From 97703bf0da329c488fbb776cc00bfe19b1d8d1db Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 21 Mar 2018 14:54:55 +0000 Subject: [PATCH] [ML] Refactor method to process model size stats (elastic/x-pack-elasticsearch#4175) Original commit: elastic/x-pack-elasticsearch@6262ff33a98c4a9fb35cae7bcffc1cfb6af5b3a4 --- .../output/AutoDetectResultProcessor.java | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index fa50b79e5d5..0ac9c4ec969 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -241,22 +241,7 @@ public class AutoDetectResultProcessor { } ModelSizeStats modelSizeStats = result.getModelSizeStats(); if (modelSizeStats != null) { - LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}", - context.jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(), - modelSizeStats.getTotalOverFieldCount(), modelSizeStats.getTotalPartitionFieldCount(), - modelSizeStats.getBucketAllocationFailuresCount(), modelSizeStats.getMemoryStatus()); - - latestModelSizeStats = modelSizeStats; - haveNewLatestModelSizeStats = true; - persister.persistModelSizeStats(modelSizeStats); - // This is a crude way to NOT refresh the index and NOT attempt to update established model memory during the first 20 buckets - // because this is when the model size stats are likely to be least stable and lots of updates will be coming through, and - // we'll NEVER consider memory usage to be established during this period - if (restoredSnapshot || bucketCount >= JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) { - // We need to make all results written up to and including these stats available for the established memory calculation - persister.commitResultWrites(context.jobId); - updateEstablishedModelMemoryOnJob(modelSizeStats.getTimestamp(), modelSizeStats); - } + processModelSizeStats(context, modelSizeStats); } ModelSnapshot modelSnapshot = result.getModelSnapshot(); if (modelSnapshot != null) { @@ -293,6 +278,25 @@ public class AutoDetectResultProcessor { } } + private void processModelSizeStats(Context context, ModelSizeStats modelSizeStats) { + LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}", + context.jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(), + modelSizeStats.getTotalOverFieldCount(), modelSizeStats.getTotalPartitionFieldCount(), + modelSizeStats.getBucketAllocationFailuresCount(), modelSizeStats.getMemoryStatus()); + + latestModelSizeStats = modelSizeStats; + haveNewLatestModelSizeStats = true; + persister.persistModelSizeStats(modelSizeStats); + // This is a crude way to NOT refresh the index and NOT attempt to update established model memory during the first 20 buckets + // because this is when the model size stats are likely to be least stable and lots of updates will be coming through, and + // we'll NEVER consider memory usage to be established during this period + if (restoredSnapshot || bucketCount >= JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) { + // We need to make all results written up to and including these stats available for the established memory calculation + persister.commitResultWrites(context.jobId); + updateEstablishedModelMemoryOnJob(modelSizeStats.getTimestamp(), modelSizeStats); + } + } + protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { JobUpdate update = new JobUpdate.Builder(jobId) .setModelSnapshotId(modelSnapshot.getSnapshotId())