-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRelayMessagesThroughConsumer.php
69 lines (58 loc) · 2.03 KB
/
RelayMessagesThroughConsumer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
<?php
namespace EventSauce\MessageOutbox;
use EventSauce\BackOff\BackOffStrategy;
use EventSauce\BackOff\ExponentialBackOffStrategy;
use EventSauce\EventSourcing\Message;
use EventSauce\EventSourcing\MessageConsumer;
use Throwable;
use function count;
class RelayMessagesThroughConsumer implements RelayMessages
{
private BackOffStrategy $backOff;
private RelayCommitStrategy $commitStrategy;
public function __construct(
private OutboxRepository $repository,
private MessageConsumer $consumer,
BackOffStrategy $backOff = null,
RelayCommitStrategy $commitStrategy = null
) {
$this->backOff = $backOff ?: new ExponentialBackOffStrategy(100000, 25);
$this->commitStrategy = $commitStrategy ?: new MarkMessagesConsumedOnCommit();
}
public function publishBatch(int $batchSize, int $commitSize = 1): int
{
/** @var Message $messages */
$messages = $this->repository->retrieveBatch($batchSize);
$numberPublished = 0;
/** @var Message $publishedMessages */
$publishedMessages = [];
foreach ($messages as $message) {
$tries = 0;
start_relay:
try {
$tries++;
$this->consumer->handle($message);
$publishedMessages[] = $message;
if (($numberPublished + 1) % $commitSize === 0) {
$this->commitMessages($publishedMessages);
$publishedMessages = [];
}
} catch (Throwable $throwable) {
$this->backOff->backOff($tries, $throwable);
goto start_relay;
}
$numberPublished++;
}
if (count($publishedMessages) > 0) {
$this->commitMessages($publishedMessages);
}
return $numberPublished;
}
/**
* @param Message[] $messages
*/
private function commitMessages(array $messages): void
{
$this->commitStrategy->commitMessages($this->repository, ...$messages);
}
}