Processes
Reading
Having a backend at hand from previous chapters like:
def backend : Backend[IO, Account, Event, Rejection, Notification] = ???
Outbox
For consuming outboxed items, you can use backend.outbox
directly, or you can use the provided OutboxConsumer
like the following:
def publisher : Stream[IO, Nothing] = OutboxConsumer(backend){ item =>
// use outboxed item
???
}
OutboxConsumer
will consume outboxed items, run the provided action on each of them, marks all items as read.
It does not handle or recover action failures to simplify error handling on application side.
Note that outbox pattern is meant for atomically publishing external messages, and while it has useful data in it, it is not meant to be the medium for processing messages like a queue; and it's best practice to have exactly one consumer on each outbox
Journal
for reading all journal from the beginning you can use:
def all = backend.journal.readAll
If you are only interested in a single stream, you can use
def singleStream = backend.journal.readStream("interesting-stream")
you can also read before or after a specific sequence number
def allAfter = backend.journal.readAllAfter(100)
def singleBefore = backend.journal.readStreamBefore("interesting-stream" ,100)
Repository
You can read the entire history of a single aggregate root
def history : Stream[IO, AggregateState[Account, Event, Rejection]] = backend.repository.history("interesting-stream")
Tip
Note that when you read from repository, you getAggregateState
, which contains aggregate version on valid streams, or last valid state along with errors and the event that caused this conflict.
It's worth noting that your journal are never gonna become corrupt by conflicting decisions, as they are prevented before being written to the journal by conflict protection implemented in Edomata; however if you change the meaning of events, you can change the meaning of history, and face a conflicting stream history. It is one of the most important assumptions of event sourcing, that history won't change its meaning and is not specific to this library, so you should invest in more compatibility testing when you are doing a migration or huge change.
Or read current state of the write side projection
def current : IO[AggregateState[Account, Event, Rejection]] = backend.repository.get("interesting-stream")
Integrating
You can easily use provided streams and read functionality to implement any complex processes or business workflows.
for example a process manager would be just a possibly stateful stream that handles notifications,
or a new custom read side projections would be just another stream that handle journal events.