-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathInMemoryOutboxRepository.php
104 lines (84 loc) · 2.71 KB
/
InMemoryOutboxRepository.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
<?php
namespace EventSauce\MessageOutbox;
use EventSauce\EventSourcing\Message;
use Traversable;
use function array_reduce;
use function array_slice;
use function count;
/**
 * OutboxRepository implementation that keeps every message in a PHP array.
 *
 * Messages are stored under an auto-incrementing integer id. The id is
 * attached to the stored message through the MESSAGE_ID_HEADER header, and
 * consumption is tracked by setting the IS_CONSUMED_HEADER header to 1.
 * Nothing is kept beyond the lifetime of the repository object.
 */
class InMemoryOutboxRepository implements OutboxRepository
{
    /** Header that carries the id under which a message is stored. */
    public const MESSAGE_ID_HEADER = '__in-memory.message-id';

    /** Header that is set to 1 once a message has been marked as consumed. */
    public const IS_CONSUMED_HEADER = '__in-memory.message-is-consumed';

    /** Last id handed out by persist(); incremented before each use. */
    private int $idCounter = 0;

    /** @var array<int, Message> stored messages, keyed by their id */
    private array $messages = [];

    /**
     * Stores the given messages, assigning each one a fresh id.
     *
     * The stored copy carries the id in the MESSAGE_ID_HEADER header.
     */
    public function persist(Message ...$messages): void
    {
        foreach ($messages as $message) {
            $id = ++$this->idCounter;
            $this->messages[$id] = $message->withHeader(self::MESSAGE_ID_HEADER, $id);
        }
    }

    /**
     * Yields up to $batchSize messages that are not marked as consumed, in
     * storage order.
     *
     * Consumed messages are skipped *before* counting towards the batch, so
     * consumed messages that have not been cleaned up yet can no longer
     * shrink or empty the batch. A batch size of zero or less yields nothing.
     *
     * @return Traversable<int, Message>
     */
    public function retrieveBatch(int $batchSize): Traversable
    {
        $remaining = $batchSize;

        // foreach by value iterates over a snapshot of the array, so callers
        // may mark or delete messages while consuming the generator.
        foreach ($this->messages as $message) {
            if ($remaining <= 0) {
                break;
            }

            if ($this->isConsumed($message)) {
                continue;
            }

            $remaining--;

            yield $message;
        }
    }

    /**
     * Stores each given message, flagged as consumed, under the id found in
     * its MESSAGE_ID_HEADER header.
     */
    public function markConsumed(Message ...$messages): void
    {
        foreach ($messages as $message) {
            $this->messages[$this->idFromMessage($message)] = $message->withHeader(self::IS_CONSUMED_HEADER, 1);
        }
    }

    /**
     * Reads the storage id from the message's MESSAGE_ID_HEADER header and
     * casts it to int.
     */
    private function idFromMessage(Message $message): int
    {
        /** @var int|string $id */
        $id = $message->header(self::MESSAGE_ID_HEADER);

        return (int) $id;
    }

    /**
     * Tells whether the message carries the consumed marker (header value
     * strictly equal to the integer 1).
     */
    private function isConsumed(Message $message): bool
    {
        return $message->header(self::IS_CONSUMED_HEADER) === 1;
    }

    /**
     * Removes the given messages from the store, consumed or not.
     *
     * Ids that are not present in the store are ignored.
     */
    public function deleteMessages(Message ...$messages): void
    {
        foreach ($messages as $message) {
            unset($this->messages[$this->idFromMessage($message)]);
        }
    }

    /**
     * Deletes at most $amount consumed messages, in storage order.
     *
     * The limit is checked before a message is deleted, so an amount of zero
     * or less deletes nothing.
     *
     * @return int the number of messages that were deleted
     */
    public function cleanupConsumedMessages(int $amount): int
    {
        $deleted = 0;

        foreach ($this->messages as $id => $message) {
            if ($deleted >= $amount) {
                break;
            }

            if ($this->isConsumed($message)) {
                unset($this->messages[$id]);
                $deleted++;
            }
        }

        return $deleted;
    }

    /**
     * Returns the number of stored messages, consumed and pending alike.
     */
    public function numberOfMessages(): int
    {
        return count($this->messages);
    }

    /**
     * Returns the number of stored messages that are marked as consumed.
     */
    public function numberOfConsumedMessages(): int
    {
        return array_reduce(
            $this->messages,
            fn (int $count, Message $message): int => $this->isConsumed($message) ? $count + 1 : $count,
            0
        );
    }

    /**
     * Returns the number of stored messages that are not marked as consumed.
     */
    public function numberOfPendingMessages(): int
    {
        return array_reduce(
            $this->messages,
            fn (int $count, Message $message): int => $this->isConsumed($message) ? $count : $count + 1,
            0
        );
    }
}