Skip to content

Commit

Permalink
[FLINK-36404][Connectors/Prometheus] Httpclient exceptions always fat…
Browse files Browse the repository at this point in the history
…al. Force httpclient to fail when an exception is thrown by the callback. Upgrade httpclient version.
  • Loading branch information
nicusX committed Sep 29, 2024
1 parent ef568f2 commit 37c341b
Show file tree
Hide file tree
Showing 14 changed files with 193 additions and 209 deletions.
7 changes: 3 additions & 4 deletions flink-connector-prometheus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ PrometheusSink sink = PrometheusSink.builder()
.onPrometheusNonRetriableError(OnErrorBehavior.DISCARD_AND_CONTINUE)
// Default is FAIL for other error types
.onMaxRetryExceeded(OnErrorBehavior.FAIL)
.onHttpClientIOFail(OnErrorBehavior.FAIL)
.build())
.setMetricGroupName("Prometheus") // Customizable metric-group suffix (default: "Prometheus")
.build();
Expand Down Expand Up @@ -182,11 +181,10 @@ The possible behaviors are:
* `FAIL`: throw a `PrometheusSinkWriteException`, causing the job to fail.
* `DISCARD_AND_CONTINUE`: log the reason of the error, discard the offending request, and continue.

There are 3 error conditions:
There are two error conditions:

1. Prometheus returns a non-retriable error response (i.e. any `4xx` status code except `429`). Default: `DISCARD_AND_CONTINUE`.
2. Prometheus returns a retriable error response (i.e. `5xx` or `429`) but the max retry limit is exceeded. Default: `FAIL`.
3. The http client fails to complete the request, for an I/O error. Default: `FAIL`.

The error handling behaviors can be configured when creating the instance of the sink, as shown in this snippet:

Expand All @@ -196,7 +194,6 @@ PrometheusSink sink = PrometheusSink.builder()
.setErrorHandlingBehaviourConfiguration(SinkWriterErrorHandlingBehaviorConfiguration.builder()
.onPrometheusNonRetriableError(OnErrorBehavior.DISCARD_AND_CONTINUE)
.onMaxRetryExceeded(OnErrorBehavior.DISCARD_AND_CONTINUE)
.onHttpClientIOFail(OnErrorBehavior.DISCARD_AND_CONTINUE)
.build())
.build();
```
Expand Down Expand Up @@ -224,6 +221,8 @@ Prometheus does not return sufficient information to automatically handle partia
Remote-write endpoint responses 403 (Forbidden) and 404 (Not found) are always considered fatal, regardless of the error
handling configuration.

Any I/O error during the communication with the endpoint is also fatal.

### Metrics

The sink exposes custom metrics, counting the samples and write-requests (batches) successfully written or discarded.
Expand Down
3 changes: 0 additions & 3 deletions flink-connector-prometheus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,16 @@ under the License.
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.10.4</version>
</dependency>

<!-- http client -->
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>${apache.httpclient.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5</artifactId>
<version>${apache.httpclient.version}</version>
</dependency>

<!-- test -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,27 +107,29 @@ public void completed(SimpleHttpResponse response) {

// Prometheus's response is a fatal error, regardless of configured behaviour
if (RemoteWriteResponseClassifier.isFatalErrorResponse(response)) {
throw new PrometheusSinkWriteException(
"Fatal error response from Prometheus",
statusCode,
reasonPhrase,
timeSeriesCount,
sampleCount,
responseBody);
logErrorAndThrow(
new PrometheusSinkWriteException(
"Fatal error response from Prometheus",
statusCode,
reasonPhrase,
timeSeriesCount,
sampleCount,
responseBody));
}

// Prometheus's response is a non-retriable error.
// Depending on the configured behaviour, log and discard or throw an exception
if (RemoteWriteResponseClassifier.isNonRetriableErrorResponse(response)) {
if (errorHandlingBehaviorConfig.getOnPrometheusNonRetriableError()
== PrometheusSinkConfiguration.OnErrorBehavior.FAIL) {
throw new PrometheusSinkWriteException(
"Non-retriable error response from Prometheus",
statusCode,
reasonPhrase,
timeSeriesCount,
sampleCount,
responseBody);
logErrorAndThrow(
new PrometheusSinkWriteException(
"Non-retriable error response from Prometheus",
statusCode,
reasonPhrase,
timeSeriesCount,
sampleCount,
responseBody));
}

LOG.warn(
Expand All @@ -146,13 +148,14 @@ public void completed(SimpleHttpResponse response) {
if (RemoteWriteResponseClassifier.isRetriableErrorResponse(response)) {
if (errorHandlingBehaviorConfig.getOnMaxRetryExceeded()
== PrometheusSinkConfiguration.OnErrorBehavior.FAIL) {
throw new PrometheusSinkWriteException(
"Max retry limit exceeded on retriable error",
statusCode,
reasonPhrase,
timeSeriesCount,
sampleCount,
responseBody);
logErrorAndThrow(
new PrometheusSinkWriteException(
"Max retry limit exceeded on retriable error",
statusCode,
reasonPhrase,
timeSeriesCount,
sampleCount,
responseBody));
}

LOG.warn(
Expand All @@ -167,35 +170,35 @@ public void completed(SimpleHttpResponse response) {
}

// Unexpected/unhandled response outcome
throw new PrometheusSinkWriteException(
"Unexpected status code returned from the remote-write endpoint",
statusCode,
reasonPhrase,
timeSeriesCount,
sampleCount,
responseBody);
logErrorAndThrow(
new PrometheusSinkWriteException(
"Unexpected status code returned from the remote-write endpoint",
statusCode,
reasonPhrase,
timeSeriesCount,
sampleCount,
responseBody));
}

@Override
public void failed(Exception ex) {
// General I/O failure reported by http client
// Depending on the configured behavior, throw an exception or log and discard
if (errorHandlingBehaviorConfig.getOnHttpClientIOFail()
== PrometheusSinkConfiguration.OnErrorBehavior.FAIL) {
throw new PrometheusSinkWriteException("Http client failure", ex);
} else {
LOG.warn(
"Exception executing the remote-write (discarded {} time-series containing {} samples)",
timeSeriesCount,
sampleCount,
ex);
metricsCallback.onFailedWriteRequestForHttpClientIoFail(sampleCount);
}
// Always fail
throw new PrometheusSinkWriteException("Http client failure", ex);
}

@Override
public void cancelled() {
    // A cancelled async http request is never tolerated: the sink always fails with an
    // exception, there is no configurable behaviour for this case.
    final PrometheusSinkWriteException cancellation =
            new PrometheusSinkWriteException("Write request execution cancelled");
    throw cancellation;
}

/**
 * Logs the given exception at ERROR level and then throws it.
 *
 * <p>Once thrown, the exception is expected to be intercepted up the client stack by the
 * {@link org.apache.flink.connector.prometheus.sink.http.RethrowingIOSessionListener}, which
 * rethrows it.
 *
 * @param ex the write exception describing the error condition detected by the callback
 * @throws PrometheusSinkWriteException always; the very same instance passed as {@code ex}
 */
private void logErrorAndThrow(PrometheusSinkWriteException ex) {
    LOG.error("Error condition detected by the http response callback (on complete)", ex);
    throw ex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> build() {
+ "\n\t\tmaxTimeInBufferMs={}\n\t\tmaxInFlightRequests={}\n\t\tmaxBufferedRequests={}"
+ "\n\t\tRetryConfiguration: initialRetryDelayMs={}, maxRetryDelayMs={}, maxRetryCount={}"
+ "\n\t\tsocketTimeoutMs={}\n\t\thttpUserAgent={}"
+ "\n\t\tErrorHandlingBehaviour: onMaxRetryExceeded={}, onHttpClientIOFailure={}, onNonRetriableError={}",
+ "\n\t\tErrorHandlingBehaviour: onMaxRetryExceeded={}, onNonRetriableError={}",
actualMaxBatchSizeInSamples,
actualMaxRecordSizeInSamples,
actualMaxTimeInBufferMS,
Expand All @@ -114,7 +114,6 @@ public AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> build() {
socketTimeoutMs,
actualHttpUserAgent,
actualErrorHandlingBehaviorConfig.getOnMaxRetryExceeded(),
actualErrorHandlingBehaviorConfig.getOnHttpClientIOFail(),
actualErrorHandlingBehaviorConfig.getOnPrometheusNonRetriableError());

return new PrometheusSink(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,17 @@ public enum OnErrorBehavior {
public static class SinkWriterErrorHandlingBehaviorConfiguration implements Serializable {

public static final OnErrorBehavior ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR = FAIL;
public static final OnErrorBehavior ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR = FAIL;
public static final OnErrorBehavior ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR =
DISCARD_AND_CONTINUE;

/** Behaviour when the max retries is exceeded on Prometheus retriable errors. */
private final OnErrorBehavior onMaxRetryExceeded;

/** Behaviour when the HTTP client fails, for an I/O problem. */
private final OnErrorBehavior onHttpClientIOFail;

/** Behaviour when Prometheus Remote-Write respond with a non-retriable error. */
private final OnErrorBehavior onPrometheusNonRetriableError;

public SinkWriterErrorHandlingBehaviorConfiguration(
OnErrorBehavior onMaxRetryExceeded,
OnErrorBehavior onHttpClientIOFail,
OnErrorBehavior onPrometheusNonRetriableError) {
OnErrorBehavior onMaxRetryExceeded, OnErrorBehavior onPrometheusNonRetriableError) {
// onPrometheusNonRetriableError cannot be set to FAIL, because it makes impossible for
// the job to restart from checkpoint (see FLINK-36319).
// We are retaining the possibility of configuring the behavior on this type of error to
Expand All @@ -71,26 +65,20 @@ public SinkWriterErrorHandlingBehaviorConfiguration(
onPrometheusNonRetriableError == DISCARD_AND_CONTINUE,
"Only DISCARD_AND_CONTINUE is currently supported for onPrometheusNonRetriableError");
this.onMaxRetryExceeded = onMaxRetryExceeded;
this.onHttpClientIOFail = onHttpClientIOFail;
this.onPrometheusNonRetriableError = onPrometheusNonRetriableError;
}

public OnErrorBehavior getOnMaxRetryExceeded() {
return onMaxRetryExceeded;
}

public OnErrorBehavior getOnHttpClientIOFail() {
return onHttpClientIOFail;
}

public OnErrorBehavior getOnPrometheusNonRetriableError() {
return onPrometheusNonRetriableError;
}

/** Builder for PrometheusSinkWriterErrorHandlingConfiguration. */
public static class Builder {
private OnErrorBehavior onMaxRetryExceeded = null;
private OnErrorBehavior onHttpClientIOFail = null;
private OnErrorBehavior onPrometheusNonRetriableError = null;

public Builder() {}
Expand All @@ -100,11 +88,6 @@ public Builder onMaxRetryExceeded(OnErrorBehavior onErrorBehavior) {
return this;
}

public Builder onHttpClientIOFail(OnErrorBehavior onErrorBehavior) {
this.onHttpClientIOFail = onErrorBehavior;
return this;
}

public Builder onPrometheusNonRetriableError(OnErrorBehavior onErrorBehavior) {
this.onPrometheusNonRetriableError = onErrorBehavior;
return this;
Expand All @@ -114,8 +97,6 @@ public SinkWriterErrorHandlingBehaviorConfiguration build() {
return new SinkWriterErrorHandlingBehaviorConfiguration(
Optional.ofNullable(onMaxRetryExceeded)
.orElse(ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR),
Optional.ofNullable(onHttpClientIOFail)
.orElse(ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR),
Optional.ofNullable(onPrometheusNonRetriableError)
.orElse(ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public CloseableHttpAsyncClient buildAndStartClient(SinkMetricsCallback metricsC
HttpAsyncClients.custom()
.setConnectionManager(connectionManager)
.setIOReactorConfig(ioReactorConfig)
.setIOSessionListener(new RethrowingIOSessionListener())
.setRetryStrategy(
new RemoteWriteRetryStrategy(retryConfiguration, metricsCallback))
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.prometheus.sink.http;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException;

import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.IOSessionListener;

/**
 * {@link IOSessionListener} whose only active behaviour is to propagate a {@link
 * PrometheusSinkWriteException} reported on an I/O session, so that the httpclient fails.
 * Without this listener the exception would be swallowed by the IOReactor.
 *
 * <p>Any other exception type, and every other session event, is ignored.
 */
@Internal
public class RethrowingIOSessionListener implements IOSessionListener {

    @Override
    public void connected(IOSession ioSession) {
        // Intentionally empty
    }

    @Override
    public void startTls(IOSession ioSession) {
        // Intentionally empty
    }

    @Override
    public void inputReady(IOSession ioSession) {
        // Intentionally empty
    }

    @Override
    public void outputReady(IOSession ioSession) {
        // Intentionally empty
    }

    @Override
    public void timeout(IOSession ioSession) {
        // Intentionally empty
    }

    /**
     * Rethrows the reported exception when, and only when, it is a {@link
     * PrometheusSinkWriteException}; every other exception is left untouched.
     */
    @Override
    public void exception(IOSession ioSession, Exception e) {
        final boolean isSinkWriteFailure = e instanceof PrometheusSinkWriteException;
        if (!isSinkWriteFailure) {
            // Not raised by the sink: nothing to propagate
            return;
        }
        throw (PrometheusSinkWriteException) e;
    }

    @Override
    public void disconnected(IOSession ioSession) {
        // Intentionally empty
    }
}
Loading

0 comments on commit 37c341b

Please sign in to comment.