From 31f5b28a5b72b692f6f6dfcbd45b9215c3a792c4 Mon Sep 17 00:00:00 2001 From: Bartosz Drozd Date: Tue, 28 Jan 2025 16:07:11 +0100 Subject: [PATCH] docs: update readme & small var renaming --- README.md | 13 +++++++++---- packages/core/lib/queues/AbstractQueueService.ts | 4 ++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index f8b57d73..6d558afb 100644 --- a/README.md +++ b/README.md @@ -400,8 +400,9 @@ See [@message-queue-toolkit/metrics](packages/metrics/README.md) for concrete im Publisher-level store-based message deduplication is a mechanism that prevents the same message from being sent to the queue multiple times. It is useful when you want to ensure that a message is published only once, regardless of how many times it is sent. -The mechanism relies on a deduplication store, which is used to store deduplication keys for a certain period of time. -Before message is published, a deduplication key is generated based on the message content and checked against the store. +The mechanism relies on: +1. a deduplication store, which is used to store deduplication keys for a certain period of time +2. a deduplication key, which must be generated and passed as the message property (`messageDeduplicationIdField` property of publisher configuration lets you specify the field in the message that contains the deduplication id). A good deduplication key should be unique for each message and should not change between retries. It should also be relatively short to avoid unnecessary storage costs. Note that in case of some queuing systems, such as standard SQS, publisher-level deduplication is not sufficient to guarantee that a message is **processed** only once. This is because standard SQS has an at-least-once delivery guarantee, which means that a message can be delivered more than once. @@ -459,12 +460,16 @@ Instead, you should either enable content-based deduplication on the queue or pa Consumer-level store-based message deduplication is a mechanism that prevents the same message from being processed multiple times. It is useful when you want to be sure that message is processed only once, regardless of how many times it is received. -The mechanism relies on a deduplication store, which is used to store deduplication keys for a certain period of time. -Upon processing a message, a deduplication key is generated based on the message content and checked against the store. +The mechanism relies on: +1. a deduplication store, which is used to store deduplication keys for a certain period of time. +2. a deduplication key, which should be stored as part of the message before publishing it (`messageDeduplicationIdField` property of `newPublisherOptions` lets you specify the field in the message that contains the deduplication id) + In case the key doesn't exist in the store, key is stored with status `PROCESSING` in the store for a certain period of time (i.e. `maximumProcessingTimeSeconds`) and the message is processed. Upon successful processing, deduplication key TTL is updated to `deduplicationWindowSeconds` to prevent processing the same message again. The value is updated to `PROCESSED`. In case of errors handled gracefully by a consumer, deduplication key is removed from the store to allow instant reprocessing of the message. In case of unexpected errors, the message will be retried again after deduplication key TTL expires. +In case of another consumer trying to process the same message while its status is `PROCESSING`, the message will be retried again according to `queueMessageForRetry` method which can be override by the consumer. +In case of another consumer trying to process the same message while its status is `PROCESSED`, the message will be considered as a duplicate and will be ignored. In case you would like to use SQS FIFO deduplication feature, this feature won't handle it for you. Instead, you should either enable content-based deduplication on the queue or pass `MessageDeduplicationId` within message options when publishing a message. diff --git a/packages/core/lib/queues/AbstractQueueService.ts b/packages/core/lib/queues/AbstractQueueService.ts index 4ad9444d..3c73ec73 100644 --- a/packages/core/lib/queues/AbstractQueueService.ts +++ b/packages/core/lib/queues/AbstractQueueService.ts @@ -635,14 +635,14 @@ export abstract class AbstractQueueService< .consumerMessageDeduplicationConfig as ConsumerMessageDeduplicationConfig try { - const result = await consumerDeduplicationConfig.deduplicationStore.setIfNotExists( + const wasLockAcquired = await consumerDeduplicationConfig.deduplicationStore.setIfNotExists( deduplicationId, ConsumerMessageDeduplicationKeyStatus.PROCESSING, messageDeduplicationConfig.maximumProcessingTimeSeconds, ) // Deduplication key was just created meaning the lock was acquired and message can be processed - if (result) { + if (wasLockAcquired) { return true }