Remove DataStreamer and DataStreamerThread (elastic/elasticsearch#353)

These classes are no longer needed. They used to be
necessary in order to handle compressed data and sending
data to multiple jobs. Compressed data is now handled
by elasticsearch and multi-job upload is no longer supported.

Original commit: elastic/x-pack-elasticsearch@08a9f29855
This commit is contained in:
Dimitris Athanasiou 2016-11-21 16:30:10 +00:00 committed by GitHub
parent be0b8575c1
commit 63e40d9c72
4 changed files with 0 additions and 321 deletions

View File

@ -1,66 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.data;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipException;
/**
 * Streams a (possibly gzip-compressed) data upload to a {@link DataProcessor}.
 * If the content encoding is {@code "gzip"} the input is transparently wrapped
 * in a {@link GZIPInputStream} before being handed off.
 */
public class DataStreamer {
    private static final Logger LOGGER = Loggers.getLogger(DataStreamer.class);

    // Delegate that performs the actual processing of the decoded stream.
    // (Field was previously misspelled "dataProccesor".)
    private final DataProcessor dataProcessor;

    /**
     * @param dataProcessor processor the streamed data is forwarded to; must not be {@code null}
     * @throws NullPointerException if {@code dataProcessor} is {@code null}
     */
    public DataStreamer(DataProcessor dataProcessor) {
        this.dataProcessor = Objects.requireNonNull(dataProcessor);
    }

    /**
     * Stream the data to the native process.
     *
     * @param contentEncoding HTTP Content-Encoding of the payload; {@code "gzip"} triggers decompression
     * @param jobId           id of the job the data belongs to
     * @param input           the raw (possibly compressed) data stream
     * @param params          parameters controlling how the data is loaded
     * @return Count of records, fields, bytes, etc written
     * @throws IOException              if wrapping the stream for decompression fails
     * @throws IllegalArgumentException if gzip encoding was declared but the data is not in gzip format
     */
    public DataCounts streamData(String contentEncoding, String jobId, InputStream input, DataLoadParams params) throws IOException {
        LOGGER.trace("Handle Post data to job {} ", jobId);
        input = tryDecompressingInputStream(contentEncoding, jobId, input);
        DataCounts stats = handleStream(jobId, input, params);
        LOGGER.debug("Data uploaded to job {}", jobId);
        return stats;
    }

    /**
     * Wraps {@code input} in a {@link GZIPInputStream} when the client declared
     * gzip encoding; otherwise returns the stream unchanged.
     *
     * @throws IllegalArgumentException if the data does not start with a valid gzip header
     */
    private InputStream tryDecompressingInputStream(String contentEncoding, String jobId, InputStream input) throws IOException {
        if ("gzip".equals(contentEncoding)) {
            LOGGER.debug("Decompressing post data in job {}", jobId);
            try {
                return new GZIPInputStream(input);
            } catch (ZipException ze) {
                LOGGER.error("Failed to decompress data file", ze);
                throw new IllegalArgumentException(Messages.getMessage(Messages.REST_GZIP_ERROR), ze);
            }
        }
        return input;
    }

    /**
     * Pass the data stream to the native process.
     *
     * @return Count of records, fields, bytes, etc written
     */
    private DataCounts handleStream(String jobId, InputStream input, DataLoadParams params) {
        return dataProcessor.processData(jobId, input, params);
    }
}

View File

@ -1,87 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.data;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams;
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;
// NORELEASE - Use ES ThreadPool
// NORELEASE - Use ES ThreadPool
/**
 * Runs a {@link DataStreamer} upload on its own thread, capturing the result
 * ({@link DataCounts}) or any exception thrown so the caller can retrieve them
 * after {@link Thread#join() joining}. The input stream is always closed when
 * the run finishes, whether it succeeded or failed.
 */
public final class DataStreamerThread extends Thread {
    private static final Logger LOGGER = Loggers.getLogger(DataStreamerThread.class);

    // Result of the upload; null until the thread completes successfully.
    private DataCounts stats;
    private final String jobId;
    private final String contentEncoding;
    private final DataLoadParams params;
    private final InputStream input;
    private final DataStreamer dataStreamer;
    // Captured exceptions, exposed via the Optional getters below.
    // (Field was previously named "iOException", violating camelCase convention.)
    private ElasticsearchException jobException;
    private IOException ioException;

    /**
     * @param dataStreamer    streamer that performs the actual upload
     * @param jobId           id of the job receiving the data; also used in the thread name
     * @param contentEncoding HTTP Content-Encoding of the payload
     * @param params          parameters controlling how the data is loaded
     * @param input           stream to upload; closed by this thread when the run ends
     */
    public DataStreamerThread(DataStreamer dataStreamer, String jobId, String contentEncoding,
            DataLoadParams params, InputStream input) {
        super("DataStreamer-" + jobId);

        this.dataStreamer = dataStreamer;
        this.jobId = jobId;
        this.contentEncoding = contentEncoding;
        this.params = params;
        this.input = input;
    }

    @Override
    public void run() {
        try {
            stats = dataStreamer.streamData(contentEncoding, jobId, input, params);
        } catch (ElasticsearchException e) {
            jobException = e;
        } catch (IOException e) {
            ioException = e;
        } finally {
            // Always release the stream, even when streaming failed.
            try {
                input.close();
            } catch (IOException e) {
                LOGGER.warn("Exception closing the data input stream", e);
            }
        }
    }

    /**
     * This method should only be called <b>after</b> the thread
     * has joined otherwise the result could be <code>null</code>
     * (best case) or undefined.
     */
    public DataCounts getDataCounts() {
        return stats;
    }

    /**
     * If a Job exception was thrown during the run of this thread it
     * is accessed here. Only call this method after the thread has joined.
     */
    public Optional<ElasticsearchException> getJobException() {
        return Optional.ofNullable(jobException);
    }

    /**
     * If an IOException was thrown during the run of this thread it
     * is accessed here. Only call this method after the thread has joined.
     */
    public Optional<IOException> getIOException() {
        return Optional.ofNullable(ioException);
    }

    /** @return the id of the job this thread is streaming data to */
    public String getJobId() {
        return jobId;
    }
}

View File

@ -1,89 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.data;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
 * Unit tests for {@link DataStreamer}: null-argument validation, pass-through
 * of uncompressed uploads, and gzip handling (both the valid and invalid case).
 */
public class DataStreamerTests extends ESTestCase {

    public void testConstructor_GivenNullDataProcessor() {
        // A null processor must be rejected eagerly by the constructor.
        ESTestCase.expectThrows(NullPointerException.class, () -> new DataStreamer(null));
    }

    public void testStreamData_GivenNoContentEncodingAndNoPersistBaseDir() throws IOException {
        DataProcessor processor = mock(DataProcessor.class);
        InputStream stream = mock(InputStream.class);
        DataLoadParams loadParams = mock(DataLoadParams.class);
        when(processor.processData("foo", stream, loadParams)).thenReturn(new DataCounts("foo"));

        DataStreamer streamer = new DataStreamer(processor);
        streamer.streamData("", "foo", stream, loadParams);

        // With no encoding the stream must be forwarded untouched, exactly once.
        verify(processor).processData("foo", stream, loadParams);
        Mockito.verifyNoMoreInteractions(processor);
    }

    public void testStreamData_ExpectsGzipButNotCompressed() throws IOException {
        DataProcessor processor = mock(DataProcessor.class);
        InputStream stream = mock(InputStream.class);
        DataLoadParams loadParams = mock(DataLoadParams.class);
        DataStreamer streamer = new DataStreamer(processor);

        // Declaring gzip while sending plain bytes must be rejected.
        try {
            streamer.streamData("gzip", "foo", stream, loadParams);
            fail("content encoding : gzip with uncompressed data should throw");
        } catch (IllegalArgumentException e) {
            assertEquals("Content-Encoding = gzip but the data is not in gzip format", e.getMessage());
        }
    }

    public void testStreamData_ExpectsGzipUsesGZipStream() throws IOException {
        PipedInputStream uploadedData = new PipedInputStream();
        PipedOutputStream producer = new PipedOutputStream(uploadedData);

        try (GZIPOutputStream gzip = new GZIPOutputStream(producer)) {
            gzip.write("Hello World compressed".getBytes(StandardCharsets.UTF_8));

            DataProcessor processor = mock(DataProcessor.class);
            DataLoadParams loadParams = mock(DataLoadParams.class);
            when(processor.processData(Mockito.anyString(),
                    Mockito.any(InputStream.class),
                    Mockito.any(DataLoadParams.class)))
                    .thenReturn(new DataCounts("foo"));

            new DataStreamer(processor).streamData("gzip", "foo", uploadedData, loadParams);

            // The processor must receive a decompressing (GZIP) stream, not the raw pipe.
            ArgumentCaptor<InputStream> captured = ArgumentCaptor.forClass(InputStream.class);
            verify(processor).processData(Mockito.anyString(),
                    captured.capture(),
                    Mockito.any(DataLoadParams.class));
            assertTrue(captured.getValue() instanceof GZIPInputStream);
        }
    }
}

View File

@ -1,79 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.data;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import org.junit.After;
import org.junit.Before;
import org.mockito.Mockito;
import java.io.IOException;
import java.io.InputStream;
import java.util.Date;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
 * Unit tests for {@link DataStreamerThread}: verifies the success path and
 * both exception-capture paths, and that the input stream is always closed.
 */
public class DataStreamerThreadTests extends ESTestCase {

    private static final String JOB_ID = "foo";
    private static final String CONTENT_ENCODING = "application/json";

    private DataStreamer streamer;
    private DataLoadParams loadParams;
    private InputStream stream;
    private DataStreamerThread thread;

    @Before
    public void setUpMocks() {
        streamer = Mockito.mock(DataStreamer.class);
        loadParams = Mockito.mock(DataLoadParams.class);
        stream = Mockito.mock(InputStream.class);
        thread = new DataStreamerThread(streamer, JOB_ID, CONTENT_ENCODING, loadParams, stream);
    }

    @After
    public void verifyInputStreamClosed() throws IOException {
        // Regardless of outcome, the thread must close the stream exactly once.
        verify(stream).close();
    }

    public void testRun() throws Exception {
        DataCounts counts = new DataCounts("foo", 42L, 0L, 0L, 0L, 0L, 0L, 0L, new Date(), new Date());
        when(streamer.streamData(CONTENT_ENCODING, JOB_ID, stream, loadParams)).thenReturn(counts);

        thread.run();

        assertEquals(JOB_ID, thread.getJobId());
        assertEquals(counts, thread.getDataCounts());
        assertFalse(thread.getIOException().isPresent());
        assertFalse(thread.getJobException().isPresent());
    }

    public void testRun_GivenIOException() throws Exception {
        when(streamer.streamData(CONTENT_ENCODING, JOB_ID, stream, loadParams)).thenThrow(new IOException("prelert"));

        thread.run();

        assertEquals(JOB_ID, thread.getJobId());
        assertNull(thread.getDataCounts());
        assertEquals("prelert", thread.getIOException().get().getMessage());
        assertFalse(thread.getJobException().isPresent());
    }

    public void testRun_GivenJobException() throws Exception {
        when(streamer.streamData(CONTENT_ENCODING, JOB_ID, stream, loadParams)).thenThrow(ExceptionsHelper.serverError("job failed"));

        thread.run();

        assertEquals(JOB_ID, thread.getJobId());
        assertNull(thread.getDataCounts());
        assertFalse(thread.getIOException().isPresent());
        assertEquals("job failed", thread.getJobException().get().getMessage());
    }
}