feat: add in-memory event store implementation#36
Conversation
WalkthroughThis change introduces in-memory implementations of the Changes
Uplevel your code reviews with CodeRabbit ProCodeRabbit ProIf you like this project, please support us by purchasing the Pro version. The Pro version has advanced context, superior noise reduction and several proprietary improvements compared to the open source version. Moreover, CodeRabbit Pro is free for open source projects. |
There was a problem hiding this comment.
Files selected (4)
- src/EventStoreFactory.php (1)
- src/Internal/EventStoreAsyncInMemory.php (1)
- src/Internal/EventStoreInMemory.php (1)
- tests/Unit/EventStoreInMemoryTest.php (1)
Review comments generated (15)
- Review: 11
- LGTM: 4
Tips
Chat with
CodeRabbit Bot (@coderabbitai)
- Reply on review comments left by this bot to ask follow-up questions. A review comment is a comment on a diff or a file.
- Invite the bot into a review comment chain by tagging
@coderabbitaiin a reply.
Code suggestions
- The bot may make code suggestions, but please review them carefully before committing since the line number ranges may be misaligned.
- You can edit the comment made by the bot and manually tweak the suggestion if it is slightly off.
Pausing incremental reviews
- Add
@coderabbitai: ignoreanywhere in the PR description to pause further reviews from the bot.
| public function persistEvent(Event $event, int $version): void { | ||
| if ($event->isCreated()) { | ||
| throw new \RuntimeException('event is created'); | ||
| } | ||
|
|
||
| $aggregateId = $event->getAggregateId()->asString(); | ||
|
|
||
| if ($this->snapshots[$aggregateId]->getVersion() !== $version) { | ||
| throw new OptimisticLockException( | ||
| 'Transaction write was canceled due to conditional check failure' | ||
| ); | ||
| } | ||
|
|
||
| $newVersion = $this->snapshots[$aggregateId]->getVersion() + 1; | ||
| $this->events[$aggregateId][] = $event; | ||
|
|
||
| $snapshot = $this->snapshots[$aggregateId]; | ||
| $snapshot = $snapshot->withVersion($newVersion); | ||
| $this->snapshots[$aggregateId] = $snapshot; | ||
| } |
There was a problem hiding this comment.
There is a potential issue with the persistEvent method. It assumes that an event for a given aggregate ID already exists in the $snapshots array. If it doesn't, this will result in a PHP Notice: Undefined index. To avoid this, you should check if the aggregate ID exists in the $snapshots array before trying to access it.
- if ($this->snapshots[$aggregateId]->getVersion() !== $version) {
+ if (!isset($this->snapshots[$aggregateId]) || $this->snapshots[$aggregateId]->getVersion() !== $version) {
throw new OptimisticLockException(
'Transaction write was canceled due to conditional check failure'
);
}| public function persistEventAndSnapshot(Event $event, Aggregate $aggregate): void { | ||
| $aggregateId = $event->getAggregateId()->asString(); | ||
| $newVersion = self::INITIAL_VERSION; | ||
|
|
||
| if (!$event->isCreated()) { | ||
| $version = $this->snapshots[$aggregateId]->getVersion(); | ||
| if ($version !== $aggregate->getVersion()) { | ||
| throw new OptimisticLockException( | ||
| 'Transaction write was canceled due to conditional check failure' | ||
| ); | ||
| } | ||
| $newVersion = $this->snapshots[$aggregateId]->getVersion() + 1; | ||
| } | ||
|
|
||
| $this->events[$aggregateId][] = $event; | ||
| $this->snapshots[$aggregateId] = $aggregate->withVersion($newVersion); | ||
| } |
There was a problem hiding this comment.
Similar to the previous comment, the persistEventAndSnapshot method also assumes that an event for a given aggregate ID already exists in the $snapshots array. You should add a check to ensure the aggregate ID exists in the $snapshots array before trying to access it.
- if (!$event->isCreated()) {
+ if (!$event->isCreated() && isset($this->snapshots[$aggregateId])) {
$version = $this->snapshots[$aggregateId]->getVersion();
if ($version !== $aggregate->getVersion()) {
throw new OptimisticLockException(
'Transaction write was canceled due to conditional check failure'
);
}
$newVersion = $this->snapshots[$aggregateId]->getVersion() + 1;
}| if ($this->snapshots[$aggregateId]->getVersion() !== $version) { | ||
| throw new OptimisticLockException( | ||
| 'Transaction write was canceled due to conditional check failure' | ||
| ); | ||
| } |
There was a problem hiding this comment.
There is no check if the snapshot for a given aggregateId exists before accessing it. This could lead to an undefined index error. Consider adding a check before accessing the array.
- if ($this->snapshots[$aggregateId]->getVersion() !== $version) {
+ if (!isset($this->snapshots[$aggregateId]) || $this->snapshots[$aggregateId]->getVersion() !== $version) {| $version = $this->snapshots[$aggregateId]->getVersion(); | ||
| if ($version !== $aggregate->getVersion()) { | ||
| throw new OptimisticLockException( | ||
| 'Transaction write was canceled due to conditional check failure' | ||
| ); | ||
| } | ||
| $newVersion = $this->snapshots[$aggregateId]->getVersion() + 1; | ||
| } |
There was a problem hiding this comment.
Similar to the previous comment, there is no check if the snapshot for a given aggregateId exists before accessing it. This could lead to an undefined index error. Consider adding a check before accessing the array.
- $version = $this->snapshots[$aggregateId]->getVersion();
+ $version = isset($this->snapshots[$aggregateId]) ? $this->snapshots[$aggregateId]->getVersion() : null;| $promise = new FulfilledPromise($aggregateId); | ||
| $promise->then(function (AggregateId $aggregateId) { | ||
| $id = $aggregateId->asString(); | ||
| if (isset($this->snapshots[$id])) { | ||
| return $this->snapshots[$id]; | ||
| } | ||
| return null; | ||
| }); |
There was a problem hiding this comment.
In the getLatestSnapshotById method, the promise is fulfilled but the result of the then function is not returned. This will cause the promise to always resolve with the original value ($aggregateId) instead of the expected snapshot. You should return the result of the then function.
- $promise->then(function (AggregateId $aggregateId) {
+ return $promise->then(function (AggregateId $aggregateId) {| $promise = new FulfilledPromise([$aggregateId, $sequenceNumber]); | ||
| $promise->then(function ($arg) { | ||
| /** @var AggregateId $aggregateId */ | ||
| /** @var int $sequenceNumber */ | ||
| [$aggregateId, $sequenceNumber] = $arg; | ||
| $result = []; | ||
| $id = $aggregateId->asString(); | ||
|
|
||
| if (!isset($this->events[$id])) { | ||
| return $result; | ||
| } | ||
|
|
||
| foreach ($this->events[$id] as $event) { | ||
| if ($event->getSequenceNumber() >= $sequenceNumber) { | ||
| $result[] = $event; | ||
| } | ||
| } | ||
| return $result; | ||
| }); |
There was a problem hiding this comment.
In the getEventsByIdSinceSequenceNumber method, the promise is fulfilled but the result of the then function is not returned. This will cause the promise to always resolve with the original value ([$aggregateId, $sequenceNumber]) instead of the expected events. You should return the result of the then function.
- $promise->then(function ($arg) {
+ return $promise->then(function ($arg) {Overall, this class seems to be a good start for an in-memory event store implementation. However, it lacks error handling and could benefit from more robustness. Consider adding checks for undefined indices and ensuring promises are correctly resolved.
| * @throws \Exception | ||
| */ | ||
| public function testPersist(): void { | ||
| $eventStore = EventStoreFactory::createInMemory()->withKeepSnapshot(true); |
There was a problem hiding this comment.
The withKeepSnapshot(true) method is called on the EventStore instance, but it's not clear if this method exists in the EventStore interface. If it doesn't exist, it could lead to a fatal error. Please ensure that the withKeepSnapshot method is defined in the EventStore interface or in the classes that implement this interface.
- $eventStore = EventStoreFactory::createInMemory()->withKeepSnapshot(true);
+ $eventStore = EventStoreFactory::createInMemory();
+ if (method_exists($eventStore, 'withKeepSnapshot')) {
+ $eventStore->withKeepSnapshot(true);
+ }| $userAccountId = new UserAccountId(); | ||
| [$userAccount, $event] = UserAccount::create($userAccountId, "test"); | ||
| $eventStore->persistEventAndSnapshot($event, $userAccount); |
There was a problem hiding this comment.
There's no error handling for the UserAccount::create() method. If it fails, it might return null which would cause a type error when trying to persist the event and snapshot. Consider adding error handling here.
- $userAccountId = new UserAccountID();
- [$userAccount, $event] = UserAccount::create($userAccountId, "test");
- $eventStore->persistEventAndSnapshot($event, $userAccount);
+ $userAccountId = new UserAccountID();
+ [$userAccount, $event] = UserAccount::create($userAccountId, "test");
+ if ($userAccount !== null && $event !== null) {
+ $eventStore->persistEventAndSnapshot($event, $userAccount);
+ } else {
+ $this->fail("UserAccount creation failed.");
+ }| $result = $eventStore->getLatestSnapshotById($userAccountId); | ||
| $userAccountResult = null; | ||
| if ($result instanceof UserAccount) { | ||
| $userAccountResult = $result; | ||
| } | ||
|
|
||
| if ($userAccount instanceof UserAccount && $userAccountResult instanceof UserAccount) { | ||
| $this->assertTrue($userAccount->getId()->equals($userAccountResult->getId())); | ||
| $this->assertTrue($userAccount->equals($userAccountResult), "object"); | ||
| } else { | ||
| $this->fail(); | ||
| } |
There was a problem hiding this comment.
The code checks if $result and $userAccount are instances of UserAccount, but there's no handling for the case where they're not. This could lead to unexpected behavior. Consider adding an else clause to handle this case.
- $result = $eventStore->getLatestSnapshotById($userAccountId);
- $userAccountResult = null;
- if ($result instanceof UserAccount) {
- $userAccountResult = $result;
- }
-
- if ($userAccount instanceof UserAccount && $userAccountResult instanceof UserAccount) {
- $this->assertTrue($userAccount->getId()->equals($userAccountResult->getId()));
- $this->assertTrue($userAccount->equals($userAccountResult), "object");
- } else {
- $this->fail();
- }
+ $result = $eventStore->getLatestSnapshotById($userAccountId);
+ $userAccountResult = null;
+ if ($result instanceof UserAccount) {
+ $userAccountResult = $result;
+ } else {
+ $this->fail("Latest snapshot is not an instance of UserAccount.");
+ }
+
+ if ($userAccount instanceof UserAccount && $userAccountResult instanceof UserAccount) {
+ $this->assertTrue($userAccount->getId()->equals($userAccountResult->getId()));
+ $this->assertTrue($userAccount->equals($userAccountResult), "object");
+ } else {
+ $this->fail("UserAccount or UserAccountResult is not an instance of UserAccount.");
+ }| [$userAccount2, $event2] = $userAccountResult->rename("test-2"); | ||
|
|
||
| $eventStore->persistEvent($event2, $userAccount2->getVersion()); | ||
|
|
||
| $snapshot2 = $eventStore->getLatestSnapshotById($userAccountId); | ||
| if ($snapshot2 instanceof UserAccount) { | ||
| /** @var array<UserAccountEvent> $events */ | ||
| $events = $eventStore->getEventsByIdSinceSequenceNumber($userAccountId, $snapshot2->getSequenceNumber()); | ||
| $aggregate2 = UserAccount::replay($events, $snapshot2); | ||
| $this->assertTrue($aggregate2->getName() === "test-2"); | ||
| } |
There was a problem hiding this comment.
There's no error handling for the rename method and the replay method. If they fail, it might return null which would cause a type error when trying to persist the event and get the name of the aggregate. Consider adding error handling here.
- [$userAccount2, $event2] = $userAccountResult->rename("test-2");
- $eventStore->persistEvent($event2, $userAccount2->getVersion());
- $snapshot2 = $eventStore->getLatestSnapshotById($userAccountId);
- if ($snapshot2 instanceof UserAccount) {
- /** @var array<UserAccountEvent> $events */
- $events = $eventStore->getEventsByIdSinceSequenceNumber($userAccountId, $snapshot2->getSequenceNumber());
- $aggregate2 = UserAccount::replay($events, $snapshot2);
- $this->assertTrue($aggregate2->getName() === "test-2");
- }
+ [$userAccount2, $event2] = $userAccountResult->rename("test-2");
+ if ($userAccount2 !== null && $event2 !== null) {
+ $eventStore->persistEvent($event2, $userAccount2->getVersion());
+ $snapshot2 = $eventStore->getLatestSnapshotById($userAccountId);
+ if ($snapshot2 instanceof UserAccount) {
+ /** @var array<UserAccountEvent> $events */
+ $events = $eventStore->getEventsByIdSinceSequenceNumber($userAccountId, $snapshot2->getSequenceNumber());
+ $aggregate2 = UserAccount::replay($events, $snapshot2);
+ if ($aggregate2 !== null) {
+ $this->assertTrue($aggregate2->getName() === "test-2");
+ } else {
+ $this->fail("Replay returned null.");
+ }
+ } else {
+ $this->fail("Latest snapshot is not an instance of UserAccount.");
+ }
+ } else {
+ $this->fail("Rename operation failed.");
+ }There was a problem hiding this comment.
Files selected (2)
- src/Internal/EventStoreAsyncInMemory.php (1)
- src/Internal/EventStoreInMemory.php (1)
Review comments generated (10)
- Review: 10
- LGTM: 0
Tips
Chat with
CodeRabbit Bot (@coderabbitai)
- Reply on review comments left by this bot to ask follow-up questions. A review comment is a comment on a diff or a file.
- Invite the bot into a review comment chain by tagging
@coderabbitaiin a reply.
Code suggestions
- The bot may make code suggestions, but please review them carefully before committing since the line number ranges may be misaligned.
- You can edit the comment made by the bot and manually tweak the suggestion if it is slightly off.
Pausing incremental reviews
- Add
@coderabbitai: ignoreanywhere in the PR description to pause further reviews from the bot.
| public function withKeepSnapshot(bool $keepSnapshot): EventStore { | ||
| return $this; | ||
| } | ||
|
|
||
| public function withDeleteTtl(int $deleteTtlInMillSec): EventStore { | ||
| return $this; | ||
| } | ||
|
|
||
| public function withKeepSnapshotCount(int $keepSnapshotCount): EventStore { | ||
| return $this; | ||
| } | ||
|
|
||
| public function withKeyResolver(KeyResolver $keyResolver): EventStore { | ||
| return $this; | ||
| } | ||
|
|
||
| public function withEventSerializer(EventSerializer $eventSerializer): EventStore { | ||
| return $this; | ||
| } | ||
|
|
||
| public function withSnapshotSerializer(SnapshotSerializer $snapshotSerializer): EventStore { | ||
| return $this; | ||
| } |
There was a problem hiding this comment.
The methods withKeepSnapshot, withDeleteTtl, withKeepSnapshotCount, withKeyResolver, withEventSerializer, and withSnapshotSerializer are not implemented. They just return $this without doing anything. If these methods are not needed, consider removing them. If they are needed, implement the necessary logic.
- public function withKeepSnapshot(bool $keepSnapshot): EventStore {
- return $this;
- }
-
- public function withDeleteTtl(int $deleteTtlInMillSec): EventStore {
- return $this;
- }
-
- public function withKeepSnapshotCount(int $keepSnapshotCount): EventStore {
- return $this;
- }
-
- public function withKeyResolver(KeyResolver $keyResolver): EventStore {
- return $this;
- }
-
- public function withEventSerializer(EventSerializer $eventSerializer): EventStore {
- return $this;
- }
-
- public function withSnapshotSerializer(SnapshotSerializer $snapshotSerializer): EventStore {
- return $this;
- }| public function persistEvent(Event $event, int $version): void { | ||
| if ($event->isCreated()) { | ||
| throw new \RuntimeException('event is created'); | ||
| } | ||
|
|
||
| $aggregateId = $event->getAggregateId()->asString(); | ||
|
|
||
| if (!isset($this->snapshots[$aggregateId]) || $this->snapshots[$aggregateId]->getVersion() !== $version) { | ||
| throw new OptimisticLockException( | ||
| 'Transaction write was canceled due to conditional check failure' | ||
| ); | ||
| } | ||
|
|
||
| $newVersion = $this->snapshots[$aggregateId]->getVersion() + 1; | ||
| $this->events[$aggregateId][] = $event; | ||
|
|
||
| $snapshot = $this->snapshots[$aggregateId]; | ||
| $snapshot = $snapshot->withVersion($newVersion); | ||
| $this->snapshots[$aggregateId] = $snapshot; | ||
| } |
There was a problem hiding this comment.
The method persistEvent is not thread-safe. If two threads call this method at the same time, it could lead to a race condition where both threads read the same version of the snapshot, increment it, and then write back their own versions. This would result in one of the events being lost. Consider using a locking mechanism to ensure that only one thread can execute this section of code at a time.
| public function persistEventAndSnapshot(Event $event, Aggregate $aggregate): void { | ||
| $aggregateId = $event->getAggregateId()->asString(); | ||
| $newVersion = self::INITIAL_VERSION; | ||
|
|
||
| if (!$event->isCreated() && isset($this->snapshots[$aggregateId])) { | ||
| if ($this->snapshots[$aggregateId]->getVersion() !== $aggregate->getVersion()) { | ||
| throw new OptimisticLockException( | ||
| 'Transaction write was canceled due to conditional check failure' | ||
| ); | ||
| } | ||
| $newVersion = $this->snapshots[$aggregateId]->getVersion() + 1; | ||
| } | ||
|
|
||
| $this->events[$aggregateId][] = $event; | ||
| $this->snapshots[$aggregateId] = $aggregate->withVersion($newVersion); | ||
| } |
| public function getEventsByIdSinceSequenceNumber(AggregateId $aggregateId, int $sequenceNumber): array { | ||
| $result = []; | ||
| $id = $aggregateId->asString(); | ||
|
|
||
| if (!isset($this->events[$id])) { | ||
| return $result; | ||
| } | ||
|
|
||
| foreach ($this->events[$id] as $event) { | ||
| if ($event->getSequenceNumber() >= $sequenceNumber) { | ||
| $result[] = $event; | ||
| } | ||
| } | ||
| return $result; | ||
| } |
There was a problem hiding this comment.
The method getEventsByIdSinceSequenceNumber could be optimized. Currently, it iterates over all events for a given ID, even if the sequence number is very high and most of the events do not satisfy the condition. Consider storing the events in a data structure that allows efficient range queries, such as a balanced search tree or a skip list.
- public function getEventsByIdSinceSequenceNumber(AggregateId $aggregateId, int $sequenceNumber): array {
- $result = [];
- $id = $aggregateId->asString();
-
- if (!isset($this->events[$id])) {
- return $result;
- }
-
- foreach ($this->events[$id] as $event) {
- if ($event->getSequenceNumber() >= $sequenceNumber) {
- $result[] = $event;
- }
- }
- return $result;
- }| <?php | ||
|
|
||
| namespace J5ik2o\EventStoreAdapterPhp\Internal; | ||
|
|
||
| use GuzzleHttp\Promise\FulfilledPromise; | ||
| use GuzzleHttp\Promise\PromiseInterface; | ||
| use J5ik2o\EventStoreAdapterPhp\Aggregate; | ||
| use J5ik2o\EventStoreAdapterPhp\AggregateId; | ||
| use J5ik2o\EventStoreAdapterPhp\Event; | ||
| use J5ik2o\EventStoreAdapterPhp\EventSerializer; | ||
| use J5ik2o\EventStoreAdapterPhp\EventStoreAsync; | ||
| use J5ik2o\EventStoreAdapterPhp\KeyResolver; | ||
| use J5ik2o\EventStoreAdapterPhp\SnapshotSerializer; | ||
| use J5ik2o\EventStoreAdapterPhp\OptimisticLockException; | ||
|
|
||
| class EventStoreAsyncInMemory implements EventStoreAsync { | ||
| private const INITIAL_VERSION = 1; | ||
| /** @var array<string, array<Event>> */ | ||
| private array $events; | ||
|
|
||
| /** @var array<string, Aggregate> */ | ||
| private array $snapshots; | ||
|
|
||
| public function __construct() { | ||
| $this->events = []; | ||
| $this->snapshots = []; | ||
| } | ||
|
|
||
| public function withKeepSnapshot(bool $keepSnapshot): EventStoreAsync { | ||
| return $this; | ||
| } | ||
|
|
||
| public function withDeleteTtl(int $deleteTtlInMillSec): EventStoreAsync { | ||
| return $this; | ||
| } | ||
|
|
||
| public function withKeepSnapshotCount(int $keepSnapshotCount): EventStoreAsync { | ||
| return $this; | ||
| } | ||
|
|
||
| public function withKeyResolver(KeyResolver $keyResolver): EventStoreAsync { | ||
| return $this; | ||
| } | ||
|
|
||
| public function withEventSerializer(EventSerializer $eventSerializer): EventStoreAsync { | ||
| return $this; | ||
| } | ||
|
|
||
| public function withSnapshotSerializer(SnapshotSerializer $snapshotSerializer): EventStoreAsync { | ||
| return $this; | ||
| } | ||
|
|
||
| public function persistEvent(Event $event, int $version): PromiseInterface { | ||
| $promise = new FulfilledPromise([$event, $version]); | ||
| return $promise->then(function ($arg) { | ||
| /** @var Event $event */ | ||
| /** @var int $version */ | ||
| [$event, $version] = $arg; | ||
| if ($event->isCreated()) { | ||
| throw new \RuntimeException('event is created'); | ||
| } | ||
|
|
||
| $aggregateId = $event->getAggregateId()->asString(); | ||
|
|
||
| if (!isset($this->snapshots[$aggregateId]) || $this->snapshots[$aggregateId]->getVersion() !== $version) { | ||
| throw new OptimisticLockException( | ||
| 'Transaction write was canceled due to conditional check failure' | ||
| ); | ||
| } | ||
|
|
||
| $newVersion = $this->snapshots[$aggregateId]->getVersion() + 1; | ||
| $this->events[$aggregateId][] = $event; | ||
|
|
||
| $snapshot = $this->snapshots[$aggregateId]; | ||
| $snapshot = $snapshot->withVersion($newVersion); | ||
| $this->snapshots[$aggregateId] = $snapshot; | ||
| }); | ||
| } | ||
|
|
||
| public function persistEventAndSnapshot(Event $event, Aggregate $aggregate): PromiseInterface { | ||
| $promise = new FulfilledPromise([$event, $aggregate]); | ||
| return $promise->then(function ($arg) { | ||
| /** @var Event $event */ | ||
| /** @var Aggregate $aggregate */ | ||
| [$event, $aggregate] = $arg; | ||
| $aggregateId = $event->getAggregateId()->asString(); | ||
| $newVersion = self::INITIAL_VERSION; | ||
|
|
||
| if (!$event->isCreated() && isset($this->snapshots[$aggregateId])) { | ||
| if ($this->snapshots[$aggregateId]->getVersion() !== $aggregate->getVersion()) { | ||
| throw new OptimisticLockException( | ||
| 'Transaction write was canceled due to conditional check failure' | ||
| ); | ||
| } | ||
| $newVersion = $this->snapshots[$aggregateId]->getVersion() + 1; | ||
| } | ||
|
|
||
| $this->events[$aggregateId][] = $event; | ||
| $this->snapshots[$aggregateId] = $aggregate->withVersion($newVersion); | ||
| return null; | ||
| }); | ||
| } | ||
|
|
||
| public function getLatestSnapshotById(AggregateId $aggregateId): PromiseInterface { | ||
| $promise = new FulfilledPromise($aggregateId); | ||
| return $promise->then(function (AggregateId $aggregateId) { | ||
| $id = $aggregateId->asString(); | ||
| if (isset($this->snapshots[$id])) { | ||
| return $this->snapshots[$id]; | ||
| } | ||
| return null; | ||
| }); | ||
| } | ||
|
|
||
| public function getEventsByIdSinceSequenceNumber(AggregateId $aggregateId, int $sequenceNumber): PromiseInterface { | ||
| $promise = new FulfilledPromise([$aggregateId, $sequenceNumber]); | ||
| return $promise->then(function ($arg) { | ||
| /** @var AggregateId $aggregateId */ | ||
| /** @var int $sequenceNumber */ | ||
| [$aggregateId, $sequenceNumber] = $arg; | ||
| $result = []; | ||
| $id = $aggregateId->asString(); | ||
|
|
||
| if (!isset($this->events[$id])) { | ||
| return $result; | ||
| } | ||
|
|
||
| foreach ($this->events[$id] as $event) { | ||
| if ($event->getSequenceNumber() >= $sequenceNumber) { | ||
| $result[] = $event; | ||
| } | ||
| } | ||
| return $result; | ||
| }); | ||
| } | ||
| } |
| public function withKeepSnapshot(bool $keepSnapshot): EventStoreAsync { | ||
| return $this; | ||
| } | ||
|
|
||
| public function withDeleteTtl(int $deleteTtlInMillSec): EventStoreAsync { | ||
| return $this; | ||
| } | ||
|
|
||
| public function withKeepSnapshotCount(int $keepSnapshotCount): EventStoreAsync { | ||
| return $this; | ||
| } | ||
|
|
||
| public function withKeyResolver(KeyResolver $keyResolver): EventStoreAsync { | ||
| return $this; | ||
| } | ||
|
|
||
| public function withEventSerializer(EventSerializer $eventSerializer): EventStoreAsync { | ||
| return $this; | ||
| } | ||
|
|
||
| public function withSnapshotSerializer(SnapshotSerializer $snapshotSerializer): EventStoreAsync { | ||
| return $this; | ||
| } |
There was a problem hiding this comment.
The methods withKeepSnapshot, withDeleteTtl, withKeepSnapshotCount, withKeyResolver, withEventSerializer, withSnapshotSerializer do not modify any state of the object and simply return $this. This could be misleading to users of these methods as they might expect some changes to occur in the object state. If these methods are not intended to change the state, consider removing them or implementing their functionality if needed.
- public function withKeepSnapshot(bool $keepSnapshot): EventStoreAsync {
- return $this;
- }
-
- public function withDeleteTtl(int $deleteTtlInMillSec): EventStoreAsync {
- return $this;
- }
-
- public function withKeepSnapshotCount(int $keepSnapshotCount): EventStoreAsync {
- return $this;
- }
-
- public function withKeyResolver(KeyResolver $keyResolver): EventStoreAsync {
- return $this;
- }
-
- public function withEventSerializer(EventSerializer $eventSerializer): EventStoreAsync {
- return $this;
- }
-
- public function withSnapshotSerializer(SnapshotSerializer $snapshotSerializer): EventStoreAsync {
- return $this;
- }| public function persistEvent(Event $event, int $version): PromiseInterface { | ||
| $promise = new FulfilledPromise([$event, $version]); | ||
| return $promise->then(function ($arg) { | ||
| /** @var Event $event */ | ||
| /** @var int $version */ | ||
| [$event, $version] = $arg; | ||
| if ($event->isCreated()) { | ||
| throw new \RuntimeException('event is created'); | ||
| } | ||
|
|
||
| $aggregateId = $event->getAggregateId()->asString(); | ||
|
|
||
| if (!isset($this->snapshots[$aggregateId]) || $this->snapshots[$aggregateId]->getVersion() !== $version) { | ||
| throw new OptimisticLockException( | ||
| 'Transaction write was canceled due to conditional check failure' | ||
| ); | ||
| } | ||
|
|
||
| $newVersion = $this->snapshots[$aggregateId]->getVersion() + 1; | ||
| $this->events[$aggregateId][] = $event; | ||
|
|
||
| $snapshot = $this->snapshots[$aggregateId]; | ||
| $snapshot = $snapshot->withVersion($newVersion); | ||
| $this->snapshots[$aggregateId] = $snapshot; | ||
| }); | ||
| } |
There was a problem hiding this comment.
The persistEvent method uses a FulfilledPromise to wrap its logic. However, since all operations inside the promise are synchronous, there is no need to use a promise here. The same applies to the persistEventAndSnapshot, getLatestSnapshotById, and getEventsByIdSinceSequenceNumber methods.
- public function persistEvent(Event $event, int $version): PromiseInterface {
- $promise = new FulfilledPromise([$event, $version]);
- return $promise->then(function ($arg) {
- /** @var Event $event */
- /** @var int $version */
- [$event, $version] = $arg;
- if ($event->isCreated()) {
- throw new \RuntimeException('event is created');
- }
-
- $aggregateId = $event->getAggregateId()->asString();
-
- if (!isset($this->snapshots[$aggregateId]) || $this->snapshots[$aggregateId]->getVersion() !== $version) {
- throw new OptimisticLockException(
- 'Transaction write was canceled due to conditional check failure'
- );
- }
-
- $newVersion = $this->snapshots[$aggregateId]->getVersion() + 1;
- $this->events[$aggregateId][] = $event;
-
- $snapshot = $this->snapshots[$aggregateId];
- $snapshot = $snapshot->withVersion($newVersion);
- $this->snapshots[$aggregateId] = $snapshot;
- });
- }| public function persistEventAndSnapshot(Event $event, Aggregate $aggregate): PromiseInterface { | ||
| $promise = new FulfilledPromise([$event, $aggregate]); | ||
| return $promise->then(function ($arg) { | ||
| /** @var Event $event */ | ||
| /** @var Aggregate $aggregate */ | ||
| [$event, $aggregate] = $arg; | ||
| $aggregateId = $event->getAggregateId()->asString(); | ||
| $newVersion = self::INITIAL_VERSION; | ||
|
|
||
| if (!$event->isCreated() && isset($this->snapshots[$aggregateId])) { | ||
| if ($this->snapshots[$aggregateId]->getVersion() !== $aggregate->getVersion()) { | ||
| throw new OptimisticLockException( | ||
| 'Transaction write was canceled due to conditional check failure' | ||
| ); | ||
| } | ||
| $newVersion = $this->snapshots[$aggregateId]->getVersion() + 1; | ||
| } | ||
|
|
||
| $this->events[$aggregateId][] = $event; | ||
| $this->snapshots[$aggregateId] = $aggregate->withVersion($newVersion); | ||
| return null; | ||
| }); | ||
| } |
There was a problem hiding this comment.
The persistEventAndSnapshot method has a similar issue with unnecessary promise usage. Also, it returns null at the end which is not necessary for promise-based functions. If no value needs to be returned, you can simply omit the return statement.
- public function persistEventAndSnapshot(Event $event, Aggregate $aggregate): PromiseInterface {
- $promise = new FulfilledPromise([$event, $aggregate]);
- return $promise->then(function ($arg) {
- /** @var Event $event */
- /** @var Aggregate $aggregate */
- [$event, $aggregate] = $arg;
- $aggregateId = $event->getAggregateId()->asString();
- $newVersion = self::INITIAL_VERSION;
-
- if (!$event->isCreated() && isset($this->snapshots[$aggregateId])) {
- if ($this->snapshots[$aggregateId]->getVersion() !== $aggregate->getVersion()) {
- throw new OptimisticLockException(
- 'Transaction write was canceled due to conditional check failure'
- );
- }
- $newVersion = $this->snapshots[$aggregateId]->getVersion() + 1;
- }
-
- $this->events[$aggregateId][] = $event;
- $this->snapshots[$aggregateId] = $aggregate->withVersion($newVersion);
- return null;
- });
- }| public function getLatestSnapshotById(AggregateId $aggregateId): PromiseInterface { | ||
| $promise = new FulfilledPromise($aggregateId); | ||
| return $promise->then(function (AggregateId $aggregateId) { | ||
| $id = $aggregateId->asString(); | ||
| if (isset($this->snapshots[$id])) { | ||
| return $this->snapshots[$id]; | ||
| } | ||
| return null; | ||
| }); |
There was a problem hiding this comment.
The getLatestSnapshotById method also uses a promise unnecessarily.
- public function getLatestSnapshotById(AggregateId $aggregateId): PromiseInterface {
- $promise = new FulfilledPromise($aggregateId);
- return $promise->then(function (AggregateId $aggregateId) {
- $id = $aggregateId->asString();
- if (isset($this->snapshots[$id])) {
- return $this->snapshots[$id];
- }
- return null;
- });
- }| public function getEventsByIdSinceSequenceNumber(AggregateId $aggregateId, int $sequenceNumber): PromiseInterface { | ||
| $promise = new FulfilledPromise([$aggregateId, $sequenceNumber]); | ||
| return $promise->then(function ($arg) { | ||
| /** @var AggregateId $aggregateId */ | ||
| /** @var int $sequenceNumber */ | ||
| [$aggregateId, $sequenceNumber] = $arg; | ||
| $result = []; | ||
| $id = $aggregateId->asString(); | ||
|
|
||
| if (!isset($this->events[$id])) { | ||
| return $result; | ||
| } | ||
|
|
||
| foreach ($this->events[$id] as $event) { | ||
| if ($event->getSequenceNumber() >= $sequenceNumber) { | ||
| $result[] = $event; | ||
| } | ||
| } | ||
| return $result; | ||
| }); |
There was a problem hiding this comment.
The getEventsByIdSinceSequenceNumber method has the same issue with unnecessary promise usage.
- public function getEventsByIdSinceSequenceNumber(AggregateId $aggregateId, int $sequenceNumber): PromiseInterface {
- $promise = new FulfilledPromise([$aggregateId, $sequenceNumber]);
- return $promise->then(function ($arg) {
- /** @var AggregateId $aggregateId */
- /** @var int $sequenceNumber */
- [$aggregateId, $sequenceNumber] = $arg;
- $result = [];
- $id = $aggregateId->asString();
-
- if (!isset($this->events[$id])) {
- return $result;
- }
-
- foreach ($this->events[$id] as $event) {
- if ($event->getSequenceNumber() >= $sequenceNumber) {
- $result[] = $event;
- }
- }
- return $result;
- });
- }There was a problem hiding this comment.
Files selected (2)
- src/Internal/EventStoreAsyncInMemory.php (1)
- src/Internal/EventStoreInMemory.php (1)
Review comments generated (14)
- Review: 8
- LGTM: 6
Tips
Chat with
CodeRabbit Bot (@coderabbitai)
- Reply on review comments left by this bot to ask follow-up questions. A review comment is a comment on a diff or a file.
- Invite the bot into a review comment chain by tagging
@coderabbitaiin a reply.
Code suggestions
- The bot may make code suggestions, but please review them carefully before committing since the line number ranges may be misaligned.
- You can edit the comment made by the bot and manually tweak the suggestion if it is slightly off.
Pausing incremental reviews
- Add
@coderabbitai: ignoreanywhere in the PR description to pause further reviews from the bot.
|
|
||
| public function persistEvent(Event $event, int $version): PromiseInterface { | ||
| $promise = new FulfilledPromise([$event, $version]); | ||
| return $promise->then(function ($arg) { | ||
| /** @var Event $event */ | ||
| /** @var int $version */ | ||
| [$event, $version] = $arg; | ||
| if ($event->isCreated()) { | ||
| throw new \RuntimeException('event is created'); | ||
| } | ||
|
|
||
| $aggregateId = $event->getAggregateId()->asString(); | ||
|
|
||
| if (!isset($this->snapshots[$aggregateId]) || $this->snapshots[$aggregateId]->getVersion() !== $version) { | ||
| throw new OptimisticLockException( | ||
| 'Transaction write was canceled due to conditional check failure' | ||
| ); | ||
| } | ||
|
|
||
| $newVersion = $this->snapshots[$aggregateId]->getVersion() + 1; | ||
| $this->events[$aggregateId][] = $event; | ||
|
|
||
| $snapshot = $this->snapshots[$aggregateId]; | ||
| $snapshot = $snapshot->withVersion($newVersion); | ||
| $this->snapshots[$aggregateId] = $snapshot; | ||
| }); | ||
| } |
There was a problem hiding this comment.
The persistEvent method is not handling the case when an event is being persisted for a new aggregate (i.e., when there is no snapshot yet for the given aggregate ID). This could lead to an Undefined index error at line 65. You should add a check to handle this scenario.
$aggregateId = $event->getAggregateId()->asString();
- if (!isset($this->snapshots[$aggregateId]) || $this->snapshots[$aggregateId]->getVersion() !== $version) {
+ if (!isset($this->snapshots[$aggregateId])) {
+ throw new \RuntimeException('No snapshot found for the given aggregate ID');
+ } elseif ($this->snapshots[$aggregateId]->getVersion() !== $version) {
throw new OptimisticLockException(
'Transaction write was canceled due to conditional check failure'
);
}|
|
||
| public function persistEventAndSnapshot(Event $event, Aggregate $aggregate): PromiseInterface { | ||
| $promise = new FulfilledPromise([$event, $aggregate]); | ||
| return $promise->then(function ($arg) { | ||
| /** @var Event $event */ | ||
| /** @var Aggregate $aggregate */ | ||
| [$event, $aggregate] = $arg; | ||
| $aggregateId = $event->getAggregateId()->asString(); | ||
| $newVersion = self::INITIAL_VERSION; | ||
|
|
||
| if (!$event->isCreated() && isset($this->snapshots[$aggregateId])) { | ||
| if ($this->snapshots[$aggregateId]->getVersion() !== $aggregate->getVersion()) { | ||
| throw new OptimisticLockException( | ||
| 'Transaction write was canceled due to conditional check failure' | ||
| ); | ||
| } | ||
| $newVersion = $this->snapshots[$aggregateId]->getVersion() + 1; | ||
| } | ||
|
|
||
| $this->events[$aggregateId][] = $event; | ||
| $this->snapshots[$aggregateId] = $aggregate->withVersion($newVersion); | ||
| return null; | ||
| }); |
There was a problem hiding this comment.
The persistEventAndSnapshot method has the same issue as the persistEvent method. It does not handle the case when an event is being persisted for a new aggregate. Add a check to handle this scenario.
$aggregateId = $event->getAggregateId()->asString();
$newVersion = self::INITIAL_VERSION;
- if (!$event->isCreated() && isset($this->snapshots[$aggregateId])) {
+ if (!$event->isCreated()) {
+ if (!isset($this->snapshots[$aggregateId])) {
+ throw new \RuntimeException('No snapshot found for the given aggregate ID');
+ } elseif ($this->snapshots[$aggregateId]->getVersion() !== $aggregate->getVersion()) {
throw new OptimisticLockException(
'Transaction write was canceled due to conditional check failure'
);
}
$newVersion = $this->snapshots[$aggregateId]->getVersion() + 1;
}| public function withKeepSnapshot(bool $keepSnapshot): EventStore { | ||
| return $this; | ||
| } |
There was a problem hiding this comment.
The withKeepSnapshot method is a no-op and does not change any state. This could lead to confusion for users of this class who expect it to have an effect. If the in-memory event store does not support this feature, consider throwing an exception or logging a warning message.
- public function withKeepSnapshot(bool $keepSnapshot): EventStore {
- return $this;
- }
+ public function withKeepSnapshot(bool $keepSnapshot): EventStore {
+ throw new \RuntimeException('In-memory event store does not support keeping snapshots');
+ }| public function withDeleteTtl(int $deleteTtlInMillSec): EventStore { | ||
| return $this; | ||
| } |
There was a problem hiding this comment.
Similar to the previous comment, the withDeleteTtl method is a no-op and does not change any state. Consider throwing an exception or logging a warning message.
- public function withDeleteTtl(int $deleteTtlInMillSec): EventStore {
- return $this;
- }
+ public function withDeleteTtl(int $deleteTtlInMillSec): EventStore {
+ throw new \RuntimeException('In-memory event store does not support setting TTL');
+ }| public function withKeepSnapshotCount(int $keepSnapshotCount): EventStore { | ||
| return $this; | ||
| } |
There was a problem hiding this comment.
Again, the withKeepSnapshotCount method is a no-op and does not change any state. Consider throwing an exception or logging a warning message.
- public function withKeepSnapshotCount(int $keepSnapshotCount): EventStore {
- return $this;
- }
+ public function withKeepSnapshotCount(int $keepSnapshotCount): EventStore {
+ throw new \RuntimeException('In-memory event store does not support keeping snapshot count');
+ }| public function withKeyResolver(KeyResolver $keyResolver): EventStore { | ||
| return $this; | ||
| } |
There was a problem hiding this comment.
The withKeyResolver method is a no-op and does not change any state. Consider throwing an exception or logging a warning message.
- public function withKeyResolver(KeyResolver $keyResolver): EventStore {
- return $this;
- }
+ public function withKeyResolver(KeyResolver $keyResolver): EventStore {
+ throw new \RuntimeException('In-memory event store does not support key resolver');
+ }| public function withEventSerializer(EventSerializer $eventSerializer): EventStore { | ||
| return $this; | ||
| } |
There was a problem hiding this comment.
The withEventSerializer method is a no-op and does not change any state. Consider throwing an exception or logging a warning message.
- public function withEventSerializer(EventSerializer $eventSerializer): EventStore {
- return $this;
- }
+ public function withEventSerializer(EventSerializer $eventSerializer): EventStore {
+ throw new \RuntimeException('In-memory event store does not support event serializer');
+ }| public function withSnapshotSerializer(SnapshotSerializer $snapshotSerializer): EventStore { | ||
| return $this; | ||
| } |
There was a problem hiding this comment.
The withSnapshotSerializer method is a no-op and does not change any state. Consider throwing an exception or logging a warning message.
- public function withSnapshotSerializer(SnapshotSerializer $snapshotSerializer): EventStore {
- return $this;
- }
+ public function withSnapshotSerializer(SnapshotSerializer $snapshotSerializer): EventStore {
+ throw new \RuntimeException('In-memory event store does not support snapshot serializer');
+ }
Summary by CodeRabbit
EventStoreFactoryclass,createInMemory()andcreateInMemoryAsync(), allowing for the creation of in-memory instances ofEventStoreandEventStoreAsync.EventStoreandEventStoreAsyncinterfaces, providing methods for persisting events and snapshots, retrieving the latest snapshot by ID, and getting events by ID since a given sequence number.EventStoreInMemoryTestto verify the persistence of events and snapshots in an in-memory event store.