[ML] Before selecting an executor node for running job task, make sure that all required indices exist and are accessible.

Original commit: elastic/x-pack-elasticsearch@5459a0d5f2
This commit is contained in:
Martijn van Groningen 2017-02-21 21:05:57 +01:00
parent 43a09a2fa7
commit b40d5dde9b
4 changed files with 258 additions and 39 deletions

View File

@ -18,6 +18,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
@ -43,7 +44,10 @@ import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.JobStateObserver;
import org.elasticsearch.xpack.persistent.PersistentActionRegistry;
@ -277,6 +281,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
// If we already know that we can't find an ml node because all ml nodes are running at capacity or
// simply because there are no ml nodes in the cluster then we fail quickly here:
ClusterState clusterState = clusterService.state();
validate(request, clusterState);
if (selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, logger) == null) {
throw new ElasticsearchStatusException("no nodes available to open job [" + request.getJobId() + "]",
RestStatus.TOO_MANY_REQUESTS);
@ -374,6 +379,10 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
static DiscoveryNode selectLeastLoadedMlNode(String jobId, ClusterState clusterState, int maxConcurrentJobAllocations,
Logger logger) {
if (verifyIndicesExistAndPrimaryShardsAreActive(logger, jobId, clusterState) == false) {
return null;
}
long maxAvailable = Long.MIN_VALUE;
List<String> reasons = new LinkedList<>();
DiscoveryNode minLoadedNode = null;
@ -433,8 +442,28 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
if (minLoadedNode != null) {
logger.info("selected node [{}] for job [{}]", minLoadedNode, jobId);
} else {
logger.info("no node selected for job [{}], reasons [{}]", jobId, String.join(",\n", reasons));
logger.warn("no node selected for job [{}], reasons [{}]", jobId, String.join(",", reasons));
}
return minLoadedNode;
}
static boolean verifyIndicesExistAndPrimaryShardsAreActive(Logger logger, String jobId, ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
Job job = mlMetadata.getJobs().get(jobId);
String jobResultIndex = AnomalyDetectorsIndex.jobResultsIndexName(job.getResultsIndexName());
String[] indices = new String[]{AnomalyDetectorsIndex.jobStateIndexName(), jobResultIndex, JobProvider.ML_META_INDEX,
Auditor.NOTIFICATIONS_INDEX};
for (String index : indices) {
if (clusterState.metaData().hasIndex(index) == false) {
logger.warn("Not opening job [{}], because [{}] index is missing.", jobId, index);
return false;
}
IndexRoutingTable routingTable = clusterState.getRoutingTable().index(index);
if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
logger.warn("Not opening job [{}], because not all primary shards are active for the [{}] index.", jobId, index);
return false;
}
}
return true;
}
}

View File

@ -10,20 +10,36 @@ import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
@ -115,8 +131,13 @@ public class OpenJobActionTests extends ESTestCase {
PersistentTasksInProgress tasks = new PersistentTasksInProgress(3L, taskMap);
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4");
cs.nodes(nodes);
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
metaData.putCustom(PersistentTasksInProgress.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, logger);
assertEquals("_node_id3", result.getId());
}
@ -142,8 +163,13 @@ public class OpenJobActionTests extends ESTestCase {
PersistentTasksInProgress tasks = new PersistentTasksInProgress(numNodes * maxRunningJobsPerNode, taskMap);
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, "job_id1", "job_id2");
cs.nodes(nodes);
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
metaData.putCustom(PersistentTasksInProgress.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger);
assertNull(result);
}
@ -161,8 +187,13 @@ public class OpenJobActionTests extends ESTestCase {
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, "job_id1", "job_id2");
cs.nodes(nodes);
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
metaData.putCustom(PersistentTasksInProgress.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger);
assertNull(result);
}
@ -187,41 +218,81 @@ public class OpenJobActionTests extends ESTestCase {
taskMap.put(4L, createJobTask(4L, "job_id5", "_node_id3", JobState.OPENING));
PersistentTasksInProgress tasks = new PersistentTasksInProgress(5L, taskMap);
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs.build(), 2, logger);
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
csBuilder.nodes(nodes);
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4", "job_id5", "job_id6", "job_id7");
csBuilder.routingTable(routingTable.build());
metaData.putCustom(PersistentTasksInProgress.TYPE, tasks);
csBuilder.metaData(metaData);
ClusterState cs = csBuilder.build();
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, logger);
assertEquals("_node_id3", result.getId());
PersistentTaskInProgress<OpenJobAction.Request> lastTask = createJobTask(5L, "job_id6", "_node_id3", JobState.OPENING);
taskMap.put(5L, lastTask);
tasks = new PersistentTasksInProgress(6L, taskMap);
cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs.build(), 2, logger);
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger);
assertNull("no node selected, because OPENING state", result);
taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, false, "_node_id3"));
tasks = new PersistentTasksInProgress(6L, taskMap);
cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs.build(), 2, logger);
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger);
assertNull("no node selected, because stale task", result);
taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, null));
tasks = new PersistentTasksInProgress(6L, taskMap);
cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs.build(), 2, logger);
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger);
assertNull("no node selected, because null state", result);
}
public void testVerifyIndicesExistAndPrimaryShardsAreActive() {
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, "job_id");
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
csBuilder.routingTable(routingTable.build());
csBuilder.metaData(metaData);
ClusterState cs = csBuilder.build();
assertTrue(OpenJobAction.verifyIndicesExistAndPrimaryShardsAreActive(logger, "job_id", cs));
metaData = new MetaData.Builder(cs.metaData());
routingTable = new RoutingTable.Builder(cs.routingTable());
String indexToRemove = randomFrom(cs.metaData().getConcreteAllIndices());
if (randomBoolean()) {
routingTable.remove(indexToRemove);
} else if (randomBoolean()) {
Index index = new Index(indexToRemove, "_uuid");
ShardId shardId = new ShardId(index, 0);
ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
shardRouting = shardRouting.initialize("node_id", null, 0L);
routingTable.add(IndexRoutingTable.builder(index)
.addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build()));
} else {
metaData.remove(indexToRemove);
}
csBuilder.routingTable(routingTable.build());
csBuilder.metaData(metaData);
assertFalse(OpenJobAction.verifyIndicesExistAndPrimaryShardsAreActive(logger, "job_id", csBuilder.build()));
}
public static PersistentTaskInProgress<OpenJobAction.Request> createJobTask(long id, String jobId, String nodeId, JobState jobState) {
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), false, true, nodeId);
@ -229,4 +300,38 @@ public class OpenJobActionTests extends ESTestCase {
return task;
}
private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, String... jobIds) {
List<String> indices = new ArrayList<>();
indices.add(AnomalyDetectorsIndex.jobStateIndexName());
indices.add(JobProvider.ML_META_INDEX);
indices.add(Auditor.NOTIFICATIONS_INDEX);
for (String jobId : jobIds) {
indices.add(AnomalyDetectorsIndex.jobResultsIndexName(jobId));
}
for (String indexName : indices) {
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName);
indexMetaData.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
);
metaData.put(indexMetaData);
Index index = new Index(indexName, "_uuid");
ShardId shardId = new ShardId(index, 0);
ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
shardRouting = shardRouting.initialize("node_id", null, 0L);
shardRouting = shardRouting.moveToStarted();
routingTable.add(IndexRoutingTable.builder(index)
.addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build()));
}
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
for (String jobId : jobIds) {
Job job = BaseMlIntegTestCase.createFareQuoteJob(jobId).build();
mlMetadata.putJob(job, false);
}
metaData.putCustom(MlMetadata.TYPE, mlMetadata.build());
}
}

View File

@ -5,14 +5,18 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
@ -172,14 +176,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
internalCluster().stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ML_ENABLED.getKey(), true));
ensureStableCluster(2);
assertBusy(() -> {
// job should get and remain in a failed state:
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
assertNull(task.getExecutorNode());
// The status remains to be opened as from ml we didn't had the chance to set the status to failed:
assertEquals(JobState.OPENED, task.getStatus());
// job should get and remain in a failed state and
// the status remains to be opened as from ml we didn't had the chance to set the status to failed:
assertJobTask("job_id", JobState.OPENED, false);
});
logger.info("start ml node");
@ -187,16 +186,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
ensureStableCluster(3);
assertBusy(() -> {
// job should be re-opened:
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
assertNotNull(task.getExecutorNode());
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
Map<String, String> expectedNodeAttr = new HashMap<>();
expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
assertEquals(expectedNodeAttr, node.getAttributes());
assertEquals(JobState.OPENED, task.getStatus());
assertJobTask("job_id", JobState.OPENED, true);
});
}
@ -305,5 +295,80 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertEquals("Expected no violations, but got [" + violations + "]", 0, violations.size());
}
public void testMlIndicesNotAvailable() throws Exception {
internalCluster().ensureAtMostNumDataNodes(0);
// start non ml node, but that will hold the indices
logger.info("Start non ml node:");
internalCluster().startNode(Settings.builder()
.put("node.data", true)
.put(MachineLearning.ML_ENABLED.getKey(), false));
ensureStableCluster(1);
logger.info("Starting ml node");
internalCluster().startNode(Settings.builder()
.put("node.data", false)
.put(MachineLearning.ML_ENABLED.getKey(), true));
ensureStableCluster(2);
Job.Builder job = createFareQuoteJob("job_id");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build());
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).get();
PostDataAction.Request postDataRequest = new PostDataAction.Request("job_id");
postDataRequest.setContent(new BytesArray(
"{\"airline\":\"AAL\",\"responsetime\":\"132.2046\",\"sourcetype\":\"farequote\",\"time\":\"1403481600\"}\n" +
"{\"airline\":\"JZA\",\"responsetime\":\"990.4628\",\"sourcetype\":\"farequote\",\"time\":\"1403481700\"}"
));
PostDataAction.Response response = client().execute(PostDataAction.INSTANCE, postDataRequest).actionGet();
assertEquals(2, response.getDataCounts().getProcessedRecordCount());
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request("job_id");
client().execute(CloseJobAction.INSTANCE, closeJobRequest);
assertBusy(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
assertEquals(0, tasks.taskMap().size());
});
logger.info("Stop data node");
internalCluster().stopRandomNode(settings -> settings.getAsBoolean("node.data", true));
ensureStableCluster(1);
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet());
assertEquals("no nodes available to open job [job_id]", e.getMessage());
logger.info("Start data node");
internalCluster().startNode(Settings.builder()
.put("node.data", true)
.put(MachineLearning.ML_ENABLED.getKey(), false));
ensureStableCluster(2);
client().execute(OpenJobAction.INSTANCE, openJobRequest).get();
assertBusy(() -> assertJobTask("job_id", JobState.OPENED, true));
}
private void assertJobTask(String jobId, JobState expectedState, boolean hasExecutorNode) {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
assertEquals(1, tasks.taskMap().size());
PersistentTaskInProgress<?> task = tasks.findTasks(OpenJobAction.NAME, p -> {
return p.getRequest() instanceof OpenJobAction.Request &&
jobId.equals(((OpenJobAction.Request) p.getRequest()).getJobId());
}).iterator().next();
assertNotNull(task);
if (hasExecutorNode) {
assertNotNull(task.getExecutorNode());
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
Map<String, String> expectedNodeAttr = new HashMap<>();
expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
assertEquals(expectedNodeAttr, node.getAttributes());
} else {
assertNull(task.getExecutorNode());
}
assertEquals(expectedState, task.getStatus());
}
}

View File

@ -93,6 +93,26 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase {
return builder;
}
public static Job.Builder createFareQuoteJob(String id) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.JSON);
dataDescription.setTimeFormat(DataDescription.EPOCH);
dataDescription.setTimeField("time");
Detector.Builder d = new Detector.Builder("metric", "responsetime");
d.setByFieldName("by_field_name");
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
analysisConfig.setBucketSpan(3600L);
Job.Builder builder = new Job.Builder();
builder.setId(id);
builder.setCreateTime(new Date());
builder.setAnalysisConfig(analysisConfig);
builder.setDataDescription(dataDescription);
return builder;
}
public static Job.Builder createScheduledJob(String jobId) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.JSON);