NIFI-2669: This closes #949. Ensure that if Exception is thrown during Transaction initialization that the underlying client is closed/cleaned up. Also ensure that we generate bulletins when logging error/warn level log messages

This commit is contained in:
Mark Payne 2016-08-25 13:25:24 -04:00 committed by joewitt
parent a84d3c9873
commit f908ae3c3b
7 changed files with 69 additions and 37 deletions

View File

@ -16,15 +16,6 @@
*/
package org.apache.nifi.remote.client;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
@ -34,9 +25,16 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SiteInfoProvider {
import javax.net.ssl.SSLContext;
private static final Logger logger = LoggerFactory.getLogger(SiteInfoProvider.class);
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.PortDTO;
public class SiteInfoProvider {
private static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
@ -60,7 +58,7 @@ public class SiteInfoProvider {
private ControllerDTO refreshRemoteInfo() throws IOException {
final ControllerDTO controller;
try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, proxy)) {
try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, proxy, EventReporter.NO_OP)) {
apiClient.resolveBaseUrl(clusterUrl);
apiClient.setConnectTimeoutMillis(connectTimeoutMillis);
apiClient.setReadTimeoutMillis(readTimeoutMillis);

View File

@ -157,7 +157,7 @@ public interface SiteToSiteClient extends Closeable {
private String truststoreFilename;
private String truststorePass;
private KeystoreType truststoreType;
private EventReporter eventReporter;
private EventReporter eventReporter = EventReporter.NO_OP;
private File peerPersistenceFile;
private boolean useCompression;
private String portName;

View File

@ -105,7 +105,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
@Override
public Set<PeerStatus> fetchRemotePeerStatuses(PeerDescription peerDescription) throws IOException {
// Each node should has the same URL structure and network reach-ability with the proxy configuration.
try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy())) {
try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy(), config.getEventReporter())) {
final String scheme = peerDescription.isSecure() ? "https" : "http";
final String clusterApiUrl = apiClient.resolveBaseUrl(scheme, peerDescription.getHostname(), peerDescription.getPort());
@ -148,7 +148,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
}
}
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy());
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy(), config.getEventReporter());
apiClient.setBaseUrl(peer.getUrl());
apiClient.setConnectTimeoutMillis(timeoutMillis);
@ -192,7 +192,12 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
}
};
transaction.initialize(apiClient, transactionUrl);
try {
transaction.initialize(apiClient, transactionUrl);
} catch (final Exception e) {
transaction.error();
throw e;
}
activeTransactions.add(transaction);
return transaction;

View File

@ -55,6 +55,7 @@ import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpCoreContext;
import org.apache.http.util.EntityUtils;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
@ -68,6 +69,7 @@ import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.remote.protocol.http.HttpHeaders;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.security.util.CertificateUtils;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
@ -128,6 +130,9 @@ import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTE
public class SiteToSiteRestApiClient implements Closeable {
private static final String EVENT_CATEGORY = "Site-to-Site";
private static final int DATA_PACKET_CHANNEL_READ_BUFFER_SIZE = 16384;
private static final int RESPONSE_CODE_OK = 200;
private static final int RESPONSE_CODE_CREATED = 201;
private static final int RESPONSE_CODE_ACCEPTED = 202;
@ -140,6 +145,7 @@ public class SiteToSiteRestApiClient implements Closeable {
protected final SSLContext sslContext;
protected final HttpProxy proxy;
private final AtomicBoolean proxyAuthRequiresResend = new AtomicBoolean(false);
private final EventReporter eventReporter;
private RequestConfig requestConfig;
private CredentialsProvider credentialsProvider;
@ -156,16 +162,21 @@ public class SiteToSiteRestApiClient implements Closeable {
private String trustedPeerDn;
private final ScheduledExecutorService ttlExtendTaskExecutor;
private ScheduledFuture<?> ttlExtendingThread;
private ScheduledFuture<?> ttlExtendingFuture;
private SiteToSiteRestApiClient extendingApiClient;
private int connectTimeoutMillis;
private int readTimeoutMillis;
private static final Pattern HTTP_ABS_URL = Pattern.compile("^https?://.+$");
public SiteToSiteRestApiClient(final SSLContext sslContext, final HttpProxy proxy) {
private Future<HttpResponse> postResult;
private CountDownLatch transferDataLatch = new CountDownLatch(1);
public SiteToSiteRestApiClient(final SSLContext sslContext, final HttpProxy proxy, final EventReporter eventReporter) {
this.sslContext = sslContext;
this.proxy = proxy;
this.eventReporter = eventReporter;
ttlExtendTaskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
@ -295,6 +306,7 @@ public class SiteToSiteRestApiClient implements Closeable {
} catch (final CertificateException e) {
final String msg = "Could not extract subject DN from SSL session peer certificate";
logger.warn(msg);
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, msg);
throw new SSLPeerUnverifiedException(msg);
}
}
@ -379,6 +391,7 @@ public class SiteToSiteRestApiClient implements Closeable {
throw handleErrResponse(responseCode, content);
}
}
logger.debug("initiateTransaction handshaking finished, transactionUrl={}", transactionUrl);
return transactionUrl;
@ -487,7 +500,9 @@ public class SiteToSiteRestApiClient implements Closeable {
@Override
public void failed(Exception ex) {
logger.error("Create transaction for {} has failed", post.getURI(), ex);
final String msg = String.format("Failed to create transactino for %s", post.getURI());
logger.error(msg, ex);
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, msg);
}
@Override
@ -620,9 +635,6 @@ public class SiteToSiteRestApiClient implements Closeable {
}
}
private final int DATA_PACKET_CHANNEL_READ_BUFFER_SIZE = 16384;
private Future<HttpResponse> postResult;
private CountDownLatch transferDataLatch = new CountDownLatch(1);
public void openConnectionForSend(final String transactionUrl, final Peer peer) throws IOException {
@ -717,6 +729,7 @@ public class SiteToSiteRestApiClient implements Closeable {
final long totalWritten = commSession.getOutput().getBytesWritten();
logger.debug("sending data to {} has reached to its end. produced {} bytes by reading {} bytes from channel. {} bytes written in this transaction.",
flowFilesPath, totalProduced, totalRead, totalWritten);
if (totalRead != totalWritten || totalProduced != totalWritten) {
final String msg = "Sending data to %s has reached to its end, but produced : read : wrote byte sizes (%d : %d : %d) were not equal. Something went wrong.";
throw new RuntimeException(String.format(msg, flowFilesPath, totalProduced, totalRead, totalWritten));
@ -751,7 +764,9 @@ public class SiteToSiteRestApiClient implements Closeable {
@Override
public void failed(final Exception ex) {
logger.error("Sending data to {} has failed", flowFilesPath, ex);
final String msg = String.format("Failed to send data to %s due to %s", flowFilesPath, ex.toString());
logger.error(msg, ex);
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, msg);
}
@Override
@ -842,21 +857,25 @@ public class SiteToSiteRestApiClient implements Closeable {
}
private void startExtendingTtl(final String transactionUrl, final Closeable stream, final CloseableHttpResponse response) {
if (ttlExtendingThread != null) {
if (ttlExtendingFuture != null) {
// Already started.
return;
}
logger.debug("Starting extending TTL thread...");
extendingApiClient = new SiteToSiteRestApiClient(sslContext, proxy);
extendingApiClient = new SiteToSiteRestApiClient(sslContext, proxy, EventReporter.NO_OP);
extendingApiClient.transportProtocolVersionNegotiator = this.transportProtocolVersionNegotiator;
extendingApiClient.connectTimeoutMillis = this.connectTimeoutMillis;
extendingApiClient.readTimeoutMillis = this.readTimeoutMillis;
final int extendFrequency = serverTransactionTtl / 2;
ttlExtendingThread = ttlExtendTaskExecutor.scheduleWithFixedDelay(() -> {
ttlExtendingFuture = ttlExtendTaskExecutor.scheduleWithFixedDelay(() -> {
try {
extendingApiClient.extendTransaction(transactionUrl);
} catch (final Exception e) {
logger.warn("Failed to extend transaction ttl", e);
try {
// Without disconnecting, Site-to-Site client keep reading data packet,
// while server has already rollback.
@ -874,7 +893,7 @@ public class SiteToSiteRestApiClient implements Closeable {
closeable.close();
}
} catch (final IOException e) {
logger.warn("Got an exception during closing {}: {}", closeable, e.getMessage());
logger.warn("Got an exception when closing {}: {}", closeable, e.getMessage());
if (logger.isDebugEnabled()) {
logger.warn("", e);
}
@ -912,9 +931,9 @@ public class SiteToSiteRestApiClient implements Closeable {
ttlExtendTaskExecutor.shutdown();
}
if (ttlExtendingThread != null && !ttlExtendingThread.isCancelled()) {
if (ttlExtendingFuture != null && !ttlExtendingFuture.isCancelled()) {
logger.debug("Cancelling extending ttl...");
ttlExtendingThread.cancel(true);
ttlExtendingFuture.cancel(true);
}
closeSilently(extendingApiClient);
@ -1099,6 +1118,7 @@ public class SiteToSiteRestApiClient implements Closeable {
final ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
try {
return mapper.readValue(responseMessage, entityClass);
} catch (JsonParseException e) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.remote.util;
import org.apache.nifi.events.EventReporter;
import org.junit.Assert;
import org.junit.Test;
@ -24,7 +25,7 @@ public class TestSiteToSiteRestApiClient {
@Test
public void testResolveBaseUrlHttp() throws Exception{
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null);
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
final String baseUrl = apiClient.resolveBaseUrl("http://nifi.example.com/nifi");
Assert.assertEquals("http://nifi.example.com/nifi-api", baseUrl);
@ -33,7 +34,7 @@ public class TestSiteToSiteRestApiClient {
@Test
public void testResolveBaseUrlHttpSub() throws Exception{
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null);
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
final String baseUrl = apiClient.resolveBaseUrl("http://nifi.example.com/foo/bar/baz/nifi");
Assert.assertEquals("http://nifi.example.com/foo/bar/baz/nifi-api", baseUrl);
@ -41,7 +42,7 @@ public class TestSiteToSiteRestApiClient {
@Test
public void testResolveBaseUrlHttpPort() {
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null);
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
final String baseUrl = apiClient.resolveBaseUrl("http://nifi.example.com:8080/nifi");
Assert.assertEquals("http://nifi.example.com:8080/nifi-api", baseUrl);
@ -50,7 +51,7 @@ public class TestSiteToSiteRestApiClient {
@Test
public void testResolveBaseUrlHttps() throws Exception{
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null);
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
final String baseUrl = apiClient.resolveBaseUrl("https://nifi.example.com/nifi");
Assert.assertEquals("https://nifi.example.com/nifi-api", baseUrl);
@ -58,7 +59,7 @@ public class TestSiteToSiteRestApiClient {
@Test
public void testResolveBaseUrlHttpsPort() {
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null);
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
final String baseUrl = apiClient.resolveBaseUrl("https://nifi.example.com:8443/nifi");
Assert.assertEquals("https://nifi.example.com:8443/nifi-api", baseUrl);

View File

@ -24,6 +24,15 @@ import org.apache.nifi.reporting.Severity;
* Implementations MUST be thread-safe
*/
public interface EventReporter extends Serializable {
/**
* An Event Reporter that performs no action and ignores all given input
*/
public static final EventReporter NO_OP = new EventReporter() {
@Override
public void reportEvent(Severity severity, String category, String message) {
}
};
void reportEvent(Severity severity, String category, String message);
}

View File

@ -835,8 +835,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
try {
// perform the request
final ControllerDTO dto;
try (
final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) {
try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) {
dto = apiClient.getController();
} catch (IOException e) {
writeLock.lock();
@ -928,7 +927,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
private SiteToSiteRestApiClient getSiteToSiteRestApiClient() {
SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, new HttpProxy(proxyHost, proxyPort, proxyUser, proxyPassword));
SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, new HttpProxy(proxyHost, proxyPort, proxyUser, proxyPassword), getEventReporter());
apiClient.setBaseUrl(getApiUri());
apiClient.setConnectTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));
apiClient.setReadTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));