Skip to content

Commit 21b8c2d

Browse files
authored
Merge pull request #509 from suhailhijry/main
Event replay optimizations
2 parents aae3b13 + 3fe4e80 commit 21b8c2d

File tree

4 files changed

+35
-11
lines changed

4 files changed

+35
-11
lines changed

src/Console/ReplayCommand.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Illuminate\Console\Command;
77
use Illuminate\Support\Collection;
88
use Spatie\EventSourcing\Projectionist;
9+
use Spatie\EventSourcing\EventHandlers\Projectors\Projector;
910
use Spatie\EventSourcing\StoredEvents\Repositories\StoredEventRepository;
1011

1112
class ReplayCommand extends Command
@@ -67,7 +68,8 @@ public function selectProjectors(array $projectorClassNames): ?Collection
6768
public function replay(Collection $projectors, int $startingFrom, ?string $aggregateUuid = null): void
6869
{
6970
$repository = app(StoredEventRepository::class);
70-
$replayCount = $repository->countAllStartingFrom($startingFrom, $aggregateUuid);
71+
$events = collect($projectors->toArray())->map(fn(Projector $projector) => $projector->getEventHandlingMethods()->keys())->flatten()->toArray();
72+
$replayCount = $repository->countAllStartingFrom($startingFrom, $aggregateUuid, $events);
7173

7274
if ($replayCount === 0) {
7375
$this->warn('There are no events to replay');

src/Projectionist.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ public function replay(
322322
?callable $onEventReplayed = null,
323323
?string $aggregateUuid = null
324324
): void {
325+
$events = collect($projectors->toArray())->map(fn(Projector $projector) => $projector->getEventHandlingMethods()->keys())->flatten()->toArray();
325326
$projectors = (new EventHandlerCollection($projectors))
326327
->sortBy(fn (EventHandler $eventHandler) => $eventHandler->getWeight(null));
327328

@@ -340,8 +341,7 @@ public function replay(
340341
$projectors->call('onStartingEventReplay');
341342

342343
app(StoredEventRepository::class)
343-
->retrieveAllStartingFrom($startingFromEventId, $aggregateUuid)
344-
->each(function (StoredEvent $storedEvent) use ($projectors, $onEventReplayed) {
344+
->runForAllStartingFrom($startingFromEventId, function (StoredEvent $storedEvent) use ($projectors, $onEventReplayed) {
345345
$this->applyStoredEventToProjectors(
346346
$storedEvent,
347347
$projectors->forEvent($storedEvent)
@@ -350,7 +350,7 @@ public function replay(
350350
if ($onEventReplayed) {
351351
$onEventReplayed($storedEvent);
352352
}
353-
});
353+
}, 1000, $aggregateUuid, $events);
354354

355355
$this->isReplaying = false;
356356

src/StoredEvents/Repositories/EloquentStoredEventRepository.php

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Carbon\Carbon;
66
use Illuminate\Database\Eloquent\Builder;
77
use Illuminate\Support\LazyCollection;
8+
use Illuminate\Support\Collection;
89
use ReflectionClass;
910
use ReflectionException;
1011
use Spatie\EventSourcing\AggregateRoots\Exceptions\InvalidEloquentStoredEventModel;
@@ -49,9 +50,9 @@ public function retrieveAll(?string $uuid = null): LazyCollection
4950
return $query->orderBy('id')->cursor()->map(fn (EloquentStoredEvent $storedEvent) => $storedEvent->toStoredEvent());
5051
}
5152

52-
public function retrieveAllStartingFrom(int $startingFrom, ?string $uuid = null): LazyCollection
53+
public function retrieveAllStartingFrom(int $startingFrom, ?string $uuid = null, array $events = []): LazyCollection
5354
{
54-
$query = $this->prepareEventModelQuery($startingFrom, $uuid);
55+
$query = $this->prepareEventModelQuery($startingFrom, $uuid, $events);
5556

5657
/** @var LazyCollection $lazyCollection */
5758
$lazyCollection = $query
@@ -61,9 +62,24 @@ public function retrieveAllStartingFrom(int $startingFrom, ?string $uuid = null)
6162
return $lazyCollection->map(fn (EloquentStoredEvent $storedEvent) => $storedEvent->toStoredEvent());
6263
}
6364

64-
public function countAllStartingFrom(int $startingFrom, ?string $uuid = null): int
65+
66+
public function runForAllStartingFrom(int $startingFrom, callable|\Closure $function, int $chunkSize = 1000, ?string $uuid = null, array $events = []): bool {
67+
$query = $this->prepareEventModelQuery($startingFrom, $uuid, $events);
68+
69+
$query = $query
70+
->orderBy('id');
71+
72+
return $query->chunk($chunkSize, function (Collection $events) use ($function) {
73+
foreach ($events as $event) {
74+
$storedEVent = $event->toStoredEvent();
75+
$function($storedEVent);
76+
}
77+
});
78+
}
79+
80+
public function countAllStartingFrom(int $startingFrom, ?string $uuid = null, array $events = []): int
6581
{
66-
return $this->prepareEventModelQuery($startingFrom, $uuid)->count('id');
82+
return $this->prepareEventModelQuery($startingFrom, $uuid, $events)->count('id');
6783
}
6884

6985
public function retrieveAllAfterVersion(int $aggregateVersion, string $aggregateUuid): LazyCollection
@@ -171,14 +187,18 @@ public function getLatestAggregateVersion(string $aggregateUuid): int
171187
->max('aggregate_version') ?? 0;
172188
}
173189

174-
private function prepareEventModelQuery(int $startingFrom, ?string $uuid = null): Builder
190+
private function prepareEventModelQuery(int $startingFrom, ?string $uuid = null, array $events = []): Builder
175191
{
176192
$query = $this->getQuery()->startingFrom($startingFrom);
177193

178194
if ($uuid) {
179195
$query->whereAggregateRoot($uuid);
180196
}
181197

198+
if (!empty($events)) {
199+
$query->whereEvent(...$events);
200+
}
201+
182202
return $query;
183203
}
184204

src/StoredEvents/Repositories/StoredEventRepository.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ public function find(int $id): StoredEvent;
1212

1313
public function retrieveAll(?string $uuid = null): LazyCollection;
1414

15-
public function retrieveAllStartingFrom(int $startingFrom, ?string $uuid = null): LazyCollection;
15+
public function retrieveAllStartingFrom(int $startingFrom, ?string $uuid = null, array $events = []): LazyCollection;
16+
17+
public function runForAllStartingFrom(int $startingFrom, callable|\Closure $function, int $chunkSize = 1000, ?string $uuid = null, array $events = []): bool;
1618

1719
public function retrieveAllAfterVersion(int $aggregateVersion, string $aggregateUuid): LazyCollection;
1820

19-
public function countAllStartingFrom(int $startingFrom, ?string $uuid = null): int;
21+
public function countAllStartingFrom(int $startingFrom, ?string $uuid = null, array $events = []): int;
2022

2123
public function persist(ShouldBeStored $event, ?string $uuid = null): StoredEvent;
2224

0 commit comments

Comments
 (0)