NIFI-743 Moving OnShutdown to OnStopped for GetSolr and GetHttp

Adding OnRemoved methods to GetSolr and GetHttp

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Bryan Bende 2015-07-03 10:52:35 -04:00 committed by Mark Payne
parent 8da7327188
commit 63a9008527
4 changed files with 56 additions and 13 deletions

View File

@ -21,7 +21,8 @@ package org.apache.nifi.processors.solr;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
@ -157,11 +158,19 @@ public class GetSolr extends SolrProcessor {
lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
}
@OnShutdown
public void onShutdown() {
@OnStopped
public void onStopped() {
writeLastEndDate();
}
@OnRemoved
public void onRemoved() {
final File lastEndDateCache = new File(FILE_PREFIX + getIdentifier());
if (lastEndDateCache.exists()) {
lastEndDateCache.delete();
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final ProcessorLog logger = getLogger();

View File

@ -104,14 +104,15 @@ public class TestGetSolr {
File confDir = new File("conf");
assertTrue(confDir.exists());
File[] files = confDir.listFiles();
assertTrue(files.length > 0);
for (File file : files) {
assertTrue("Failed to delete " + file.getName(), file.delete());
if (files.length > 0) {
for (File file : files) {
assertTrue("Failed to delete " + file.getName(), file.delete());
}
}
assertTrue(confDir.delete());
try {
solrClient.shutdown();
solrClient.close();
} catch (Exception e) {
}
}
@ -184,6 +185,26 @@ public class TestGetSolr {
runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 0);
}
@Test
public void testOnRemovedRemovesState() throws IOException, SolrServerException {
final TestableProcessor proc = new TestableProcessor(solrClient);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(GetSolr.SOLR_QUERY, "last:smith");
runner.setProperty(GetSolr.RETURN_FIELDS, "created");
runner.setProperty(GetSolr.SORT_CLAUSE, "created desc");
runner.setProperty(GetSolr.DATE_FIELD, "created");
runner.setProperty(GetSolr.BATCH_SIZE, "10");
runner.run();
File lastEndDateCache = new File(GetSolr.FILE_PREFIX + proc.getIdentifier());
Assert.assertTrue("State file should exist, but doesn't", lastEndDateCache.exists());
proc.onRemoved();
Assert.assertFalse("State file should have been removed, but wasn't", lastEndDateCache.exists());
}
// Override createSolrClient and return the passed in SolrClient
private class TestableProcessor extends GetSolr {

View File

@ -73,7 +73,8 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@ -267,8 +268,8 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
lastModifiedRef.set(UNINITIALIZED_LAST_MODIFIED_VALUE);
}
@OnShutdown
public void onShutdown() {
@OnStopped
public void onStopped() {
final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
try (FileOutputStream fos = new FileOutputStream(httpCache)) {
final Properties props = new Properties();
@ -280,6 +281,14 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
}
@OnRemoved
public void onRemoved() {
final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
if (httpCache.exists()) {
httpCache.delete();
}
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final Collection<ValidationResult> results = new ArrayList<>();

View File

@ -65,9 +65,10 @@ public class TestGetHTTP {
File confDir = new File("conf");
assertTrue(confDir.exists());
File[] files = confDir.listFiles();
assertTrue(files.length > 0);
for (File file : files) {
assertTrue("Failed to delete " + file.getName(), file.delete());
if (files.length > 0) {
for (File file : files) {
assertTrue("Failed to delete " + file.getName(), file.delete());
}
}
assertTrue(confDir.delete());
}
@ -235,6 +236,9 @@ public class TestGetHTTP {
assertEquals(etag, props.getProperty(GetHTTP.ETAG));
assertEquals(lastMod, props.getProperty(GetHTTP.LAST_MODIFIED));
getHTTPProcessor.onRemoved();
assertFalse(file.exists());
// shutdown web service
} finally {
server.shutdownServer();