Benthos acknowledges gcp_pubsub input messages even on error #183

Open
Tarun-19 opened this issue Feb 22, 2025 · 0 comments

Here is the scenario I'm facing: I have written a Benthos job that pulls messages from a Pub/Sub subscription, calls two APIs with that data, and sends a Slack alert for each failure. If either API call fails, the job is meant to leave the message unacknowledged.

The problem is that even after explicitly throwing an error using throw("Intentional error"), this job still acknowledges the pulled message. Ideally, it should leave the message unacknowledged so that Pub/Sub's redelivery handles the retry. Am I missing something here?

PS: I had to throw the error explicitly because the second API (event_service) sends an event reporting the status of the first API call (update_api_call).

Here is my current config, which reproduces the issue:

input:
  gcp_pubsub:
    project: project-name
    subscription: subscription-name
    sync: true

pipeline:
  threads: 0
  processors:
    - branch:
        request_map: |
          root = {
            "A": this.A.number(),
          }
        processors:
          - try:
              - resource: update_api_call
          - catch:
              - mapping: |
                  root = {
                    "text": "@here Alert message update_api_call failure"
                  }
              - resource: slack_service
              - mapping: 'root = {"error": error()}'
        result_map: |
          root.responses.update_api_call = {
            "response": if this.error == null { this } else { null },
            "error": if this.error != null { this.error } else { meta("error") }
          }

    - branch:
        request_map: |
          root = {
            "i": uuid_v4(),
            "data": {
              "event": "event_name",
              "properties": {
                "A": this.A,
                "status": if this.responses.update_api_call.error == null { true } else { false },
                "failureReason": this.responses.update_api_call.error
              }
            }
          }
        processors:
          - try:
              - resource: event_service
          - catch:
              - mapping: |
                  root = {
                    "text": "@here Alert message send event failure",
                  }
              - resource: slack_service
              - mapping: 'root = {"error": error()}'
        result_map: |
          root.responses.event_service = {
            "response": if this.error == null { this } else { null },
            "error": if this.error != null { this.error } else { meta("error") }
          }

    # New processor to fail the message if either branch recorded an error.
    - bloblang: |
        if this.responses.update_api_call.error != null || this.responses.event_service.error != null {
          throw("Intentional error") # for leaving message unacked
        }

output:
  label: responses
  stdout:
    codec: lines
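
One possible direction, offered as a sketch rather than a confirmed fix: as far as I understand Benthos error handling, a failed processor only flags the message as errored, and the message still continues to the output; the input acknowledges it once the output accepts it. Under that assumption, the stdout output above would succeed and the message would be acked even after the throw. The fragment below routes errored messages to a `reject` output so that they are nacked back to the input instead. The rejection message text is illustrative, and whether `gcp_pubsub` then redelivers as expected is an assumption to verify against the version in use.

```yaml
output:
  switch:
    retry_until_success: false
    cases:
      # Messages still flagged as failed after the pipeline are rejected,
      # which should nack them upstream rather than ack them.
      - check: errored()
        output:
          reject: "processing failed: ${! error() }"
      # Everything else goes to the original output.
      - output:
          label: responses
          stdout:
            codec: lines
```

With this shape, the final mapping that calls throw("Intentional error") stays as it is; only the output section changes.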