Writing an online chat service using event sourcing with Scala and Akka typed actors

Introduction

During a recent duo of team Hackathons at work, I had the opportunity to re-acquaint myself with Akka — a JVM implementation of the actor model which I had briefly dipped my toes in a few years back in a previous job. I hadn’t really managed to come to grips with it beyond thinking “wow, this actor thing is really cool!”. After the hackathon, I had the same feeling, so I decided to give it another go. The domain I chose to implement was an “anonymous chat service”. A basic “conversation” between 2 participants is a pretty simple bounded context to model in a chat service and seems to fit naturally with the message driven nature of the actor model.

Project structure

Something that is surprisingly hard to find solid advice on online is how to structure the files and folders in your project. SBT for Scala will dictate the structure to a certain extent, and Microsoft docs has some good advice on it too. Here’s the structure I went with:

The REST API

Akka ships a HTTP library with it’s own routing DSL that allows you to expose a REST API for your service. A REST service might sound odd — how could a GET request be handled with asynchronous message passing? Not everything in Akka has to be totally asynchronous. Some operations like GET requests are inherently synchronous, and request-response messaging patterns (“ask”) are ideal for this. The underlying operations should still run asynchronously, and you’d need to choose a timeout based on what you consider an acceptable response time.

JSON serialization/deserialization

A nice feature you can leverage in your APIs is implicit JSON serialization and deserialization. Your API will automatically serialize and deserialize your case classes to and from JSON if you provide Spray’s implicit formatting methods within scope of the API routes. For example:

It’s convenient to mix a trait with your formatters into the class containing your routes to bring the formatters into scope:

Setting up a route to handle a POST request is shown below. Because our implicit serializer/deserializer are both in scope, the request is deserialized to a case class, and the response is serialized to JSON format without us having to do anything:

When handling a POST request, we probably don’t need to wait very long to respond to a client. We would probably want to confirm that we have persisted the message somewhere, and then return 200.

The Session Handler actor

First, we ask for a session from the SessionHandler actor, which is started when the application starts:

The SessionHandler could do some validation (such as verifying that the client passed up a valid JWT), and assuming no problems, it returns a message containing a reference to a session actor. This actor is what an authenticated client will use to interact with any conversation features. There is no other way that code can access this actor by mistake, because it is a private class that can only be instantiated by a SessionHandler. Because we can’t do anything unless we have a valid session, we should wait for the Future wrapping the granted session to complete.

You might have noticed that if this postMessage fails, we still return 200. We could use ask again to ensure we don’t respond unless the message is persisted, but we cover that later for the GET API, so in this case we’ll just assume the message will be successfully persisted and use the ! (“tell”) syntax to send a message to the session actors mailbox:

You could imagine that the session behavior contains some important business logic. For our use case, it doesn’t do much other than forward the message to the event store for further handling.

In the case of a GET API endpoint, we ask the session and wait for the returned future to complete. The session will do the same — it will ask the persistence layer for the data it requires, Await completion of the returned future, and forward the data on to the API layer which returns the data to the client.

Timed actors

Using a timer behavior with our session allows us to cache each session in a session handler, so that we won’t need to create a new session each time we receive new requests in quick succession from the same clients.

It’s not really necessary here, because the session handler doesn’t contain state other than the cached sessions (arguably, it should not be an actor at all for that reason. There doesn’t seem to be much point in an actor if it doesn’t hold state. It might just as easily be a collection of methods). It’s more to illustrate a pretty powerful feature. Using a timer, we can (for example) have the session actor send a timeout message to itself after not receiving any message for a fixed interval. The session then sends a message to the session handler to have itself removed from the session cache, frees up any memory allocated for it and then terminates.

Which the session handler actor duly obliges:

The conversation aggregate root

We implement our aggregate root, a conversation, by extending an AbstractBehavior abstract class. Use of behaviors seems to be the primary differentiator between classic and “typed” actors. Actors are spawned with a behavior, and the behavior dictates the messaging protocol that the actor understands. An actor should probably serve a specific role, which is determined by what messages it can process. We can achieve that by creating a sealed trait from which all messages a given actor can handle inherit. We can get around that by wrapping arbitrary messages in a message that a behavior can handle, but we are still being very explicit about the purpose of that actor. Extending AbstractBehavior allows us to package a piece of business logic and supporting methods into a cohesive unit.

To extend an AbstractBehavior, you need to specify the type of message the behavior can handle and override the onMessage method, which takes that type (or some subclass of that type) as a parameter and returns the next Behavior (again, of the same type) after handling some message. So, you can have the actor change its behavior after receiving a certain message type. The simplest example of that is to have an actor stop (using Behaviors.stopped after handling a message).

After processing a message, we return the idle() function which returns this. That means after processing a message, return the behaviour defined in this class, and restart the timer (described earlier — used for timing out sessions). In the case of PostMessage, the actor fires a message to the persistent event sourced actor. In the case of GetMessage, we make a request to the same actor, but there we wait for a response using Future.onComplete.

Event sourcing with Akka

If you’re not familiar with event sourcing, this documentation is a good place to start. I think you’ll have a pretty good idea of what it’s all about after reading it. Akka supports event sourcing using “Akka Persistence”. Using an EventSourcedBehavior actor behavior, you can specify how to react to commands and how to emit domain events upon handling a command. For quick iteration, you can use the levelDB plugin which handles event persistence for you. Interestingly, there are Cassandra, Couchbase and JDBC plugins available. The levelDB plugin will automatically serialize your events and write them to an on-disk journal file.

In your command handler, you’ll decide how to handle events — for example, for write events, you’ll probably want to persist those. That makes sense — you can use those persisted events to recreate state if you need to recover from a failure:

The result

Let’s run the application and see the result Let’s call the API in a loop:

The response:

{“message”:”Hello user 1 from user 0!”}{“message”:”Hello user 0 from user 1!”}{“message”:”Hello user 1 from user 0!”}{“message”:”Hello user 0 from user 1!”}{“message”:”Hello user 1 from user 0!”}

Logging shows us the sequence of operations:

2021–06–24 22:13:37,556 INFO - user 1 posted to conversation 123: Hello user 1 from user 0!
2021–06–24 22:13:37,556 INFO - Granted user 1 a session.
2021–06–24 22:13:37,557 INFO - Received a command: PostMessage(conversationID=123, sender=1, message=Hello user 1 from user 0!)
2021–06–24 22:13:37,557 INFO - Publishing domain event: MessagePosted(conversationID=123, sender=1, message=Hello user 1 from user 0!)
2021–06–24 22:18:37,570 INFO - Session timed out for conversation 123

The more verbose file logging allows us to follow the flow of messages between the actors. The interesting part is that after you restart the application, you’ll see A lot of (pretty verbose) logs containing messages like “Replaying events: from: 1, to: 9223372036854775807” and “Recovery successful, recovered until sequenceNr: [5]”. The event sourced actor recreated it’s state automatically. Making a GET history request will return the full history from the event-sourced actor’s in-memory ConversationInMemoryState for every subsequent request. It’s not hard to imagine some useful applications for that behavior.

Full source code: https://github.com/amencke/AnonChat

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store