User Tools

Site Tools


technology:es

Event Sourcing

In Event Sourcing wird der Zustand der Anwendung darüber gespeichert, indem die Ereignisse, die in der Domäne passiert sind, gespeichert werden. Dafür werden sinnvolle und sprechende Ereignisse definiert, und zwar in der Sprache der Domäne. Wenn man die INSERT, UPDATE und DELETE Befehle in der Datenbank loggt, hat man zwar pro Tabelle1) eine Historie über das, was passiert ist, aber man weiß nicht warum. Ein oft genanntes Beispiel ist ein Adresswechsel. Im UPDATE Befehl steht nicht drin, warum die Adresse geändert wurde. Umzug oder Korrektur? Wenn man INSERT, UPDATE und DELETE Befehle speichert, wendet man die Sprache der Datenbankdomäne an, nicht aber der Domäne, die man abbilden möchte.

Ein Adresswechsel einer Person könnte durch AddressCorrected und PersonMoved dargestellt werden. Beide Ereignisse haben zur Folge, dass die Anschrift der Person geändert wird. Man erkennt aber aus den Ereignissen nicht nur was tatsächlich passiert ist, sondern auch warum es passiert ist - zumindest so detailliert wie es die Domäne verlangt2).

Wenn ein Aggregate seinen Zustand durch einen Methodenaufruf ändert, wird die Änderung durch den Methodenaufruf ApplyChange(DomainEvent @event) fest gehalten. Das Aggregate hält seinen Zustand also nicht über eine Sicht auf die aktuellen Eigenschaften, sondern darüber, was alles passiert ist. So wird der Zustand auch gespeichert - die Ereignisse werden in ein EventStore geschrieben. Wenn das Aggregate wieder geladen wird, werden ihm alle seine historischen Ereignisse übergeben, aus denen es sich seinen aktuellen Zustand selbst errechnen kann.

Ein Ereignis hat immer einen Typ der von DomainEvent abgeleitet ist. Zudem hat es die ID des Aggregates zu dem es gehört, und einen laufenden, lückenlosen Zähler3), der die Reihenfolge darstellt4). Konflikte durch gleichzeitigen Zugriff sind einfach zu erkennen. Wenn zwei Prozesse ein Aggregate mit der aktuellen Version = N laden, und beide neue Ereignisse anfügen wollen, würden beide z.B. ein Ereignis mit der Version N+1 anfügen wollen. Die Speicheraktion des zweiten Prozesses wird fehl schlagen, weil auf die ID+Version ein eindeutiger Index eingestellt ist.

Die Wahrheit steckt also in den Ereignissen. Auch das ist ein oft genanntes Argument für Event Sourcing. Man hat nicht nur automatisch ein Log, das Log stimmt mit 100%er Sicherheit mit dem überein, was in der Domäne aktuell los ist. Das aus dem einfachen Grund, weil der aktuelle Zustand sich gerade aus diesem Log errechnet. Es gibt nicht wie üblich einen gespeicherten Zustand und ein separates Log. Da kann es schnell passieren, dass die Beiden auseinander laufen - vielleicht aus dem einfachen Grund dass eine Änderung an einer Eigenschaft nicht mit geloggt wurde. Was dann? Wer hat Recht? Das ist nicht auflösbar. Wenn es in den Ereignissen des Event Sourcing Fehler gibt, dann gibt es sie im aktuellen Zustand und im Log gleichermaßen.

Wenn es zu viele Ereignisse gibt, so dass das Laden eines Aggregates über die historischen Ereignisse zu aufwendig wird, kann man Snapshots einführen. Ein Snapshotter läuft parallel, greift die Ereignisse ab, und macht daraus einen Zustand. Wenn man z.B. die Authentifizierung mit Event Sourcing umsetzt, könnten die letzten 1.000 Loginversuche zu einem Zustand mit { ID = 555, Version = 1.025, Fehlversuche = 2, … } zusammengeführt werden. Wenn das Aggregate nun geladen wird, wird das Snapshot geladen. Dann werden dazu alle Ereignisse mit einer Version > 1.025 dazu geladen5) und dem Aggregate mitgeteilt. Sollte der Snapshotter fehlerhaft sein, muss der Fehler korrigiert werden. Die Snapshots werden einfach verworfen, und er fängt für alle Aggregates mit der Version 1 an. Das System merkt davon nichts, außer, dass die Aggregates anfangs langsamer geladen werden, weil es noch keine aktuellen Snapshots gibt. Es muss aber nichts korrigiert werden, weil die Ereignisse die einzige Quelle der Wahrheit sind. Fehler können immer passieren, das ändert aber nichts an der Tatsache, dass alles, was in den Ereignissen steht, offiziell auch so passiert ist.

Das bedeutet auch, dass das Aggregate, wenn es seinen Zustand aus den historischen Ereignissen herstellt, keinen Fehler werfen kann. Die Ereignisse sind passiert. Wenn sie aus einem Fehlerhaften Grund passiert sind, und der Code für das Aggregate so korrigiert wurde, dass dieser Fehler nicht mehr passieren kann, dann bedeutet das, dass dieser Fehler in Zukunft nicht mehr passieren kann, nicht aber, dass die vergangenen Fehler korrigiert sind. Das Aggregate muss somit weiter machen, auch wenn die historischen Ereignisse keinen Sinn ergeben. Wie, das muss der Entwickler, zusammen mit dem Domänenexperten entscheiden. Zudem kann man den Fehler kompensieren, in dem mann alle Aggregates, die einen fehlerhaften Zustand haben, über einen entsprechenden Methodenaufruf korrigiert. Das Aggregate hat dann Ereignisse, die es fehlerhaft machen, plus weitere Ereignisse, die es wieder in einen nicht-fehlerhaften Zustand versetzen.

Ein Aggregate hat auch einen aktuellen Zustand der dem Anwender angezeigt werden soll. Wenn ich eine Mitarbeiterliste aufrufe, werde ich nicht alle Ereignisse aller Mitarbeiter laden um die Liste anzuzeigen. Das funktioniert ähnlich wie die Snapshots. Es gibt Event Handler, die die veröffentlichte Ereignisse abfangen und daraus denormalisierte Listen erstellen. Man kann für jede UI Sicht eine eigene Liste bauen, d.h. man muss nicht jede Sicht aus Tabellen und JOINS bauen. Man kann die Ereignisse so weiter verarbeiten wie es einem beliebt, so dass auch die kompliziertesten Sichten als einfache, flache, schnell abrufbare Listen aufrufbar sind. Diese Listen, die durch sogenannten Denormalisierungen erzeugt werden, nennt man in der Regel Projektionen, da sie die Ereignisse, die in der Domäne passiert sind, projezieren.

Eventual Consistency

Die Denormalisierer, die die Projektionen bauen, greifen veröffentlichte und gespeicherte Ereignisse ab, und erstellen daraus Listen. Es kann beliebig viele solcher Denormalisierer geben. Manch komplexe Liste kann aufwendig zu bauen sein. Damit das Speichern von Ereignissen nicht durch die Denormalisierer verlangsamt wird, werden erst die Ereignisse gespeichert, und dann in einem anderen Prozess6) von den Event Handlern denormalisiert.

Das bedeutet nun, dass die denormalisierten Listen nicht transaktional mit den Ereignissen gespeichert werden. Wenn ich die Adresse einer Person ändere, und die Aktion erfolgreich durchgeführt wurde, kann es sein, dass die Adressliste, die ich gleich danach aufrufe, noch die alte Adresse anzeigt, weil die neue noch nicht vom Event Handler behandelt wurde. Die Änderung wird schlussendlich in der Liste landen, weil sie ja passiert ist, nur ist nicht klar, wie lange das dauert7). Das ganze läuft unter dem Namen eventual consistency8).

Verantwortungen beim Event Sourcing

Nach dem SRP9) sollte eine Klasse genau eine Aufgabe haben. Abgeleitet davon wird es oft so definiert, dass es nur genau einen Grund geben sollte, warum man die Klasse ändern müsste.

Um dem SRP nahe zu kommen, sollten Aggregates klein gehalten werden, zumindest die darin enthaltenen Klassen. Welche Aufgaben hat ein Aggregate? U.a. die folgenden:

  • Businesslogik einhalten und Domänenereignisse erzeugen

Wenn ein Aggregate von einem Repository wiederhergestellt wird, hat es zudem noch einige Infrastruktur bezogenen Aufgaben. Was aber ist mit den Ereignissen, die es erzeugt? Muss das Aggregate darauf achten, dass

  • die Ereignisse serialisierbar sind?
  • wichtige Eigenschaften in den Ereignissen sicher10) gespeichert werden sollen?

Als Aggregate möchte ich z.B. das Ereignis PasswordChanged(“meinpasswort”) erzeugen. Es ist nicht meine Aufgabe, dass Passwort zu verschlüsseln. Oder ich möchte mitteilen, dass meine Authentifizierungsstrategie über AuthenticationStrategySet(new KerberosAuthentication(“rta”, “meinpasswort”)) gesetzt wurde. Wie die KerberosAuthentication Strategie gespeichert wird, und dass das Kennwort dazu verschlüsselt wird, ist nicht die Aufgabe des Aggregates, sondern des Speichermechanismus.

Ich sehe hier zwei unterschiedliche Verantwortlichkeiten. Wenn der Ereignisstrom11) verschlüsselt werden soll, dann kann das vor dem speichern des JSON serialisierten Ereignisses werden sollen, und nach dem laden können die Anwendungen mit dem entsprechenden Schlüssel die Ereignisse wieder entschlüsseln. So kann alles verschlüsselt gespeichert werden12).

Zudem müssen die Daten in den Ereignissen, die vom Aggregate erzeugt werden, serialisierbar sein, und einige sind schützenswert, unabhängig davon, ob die Ereignisse komplett verschlüsselt werden oder nicht. Korrekt wäre es also, dass die Domäne Ereignisse erzeugt, diese dann von einem Adapter in andere, serialisierbare Ereignisse gewandelt werden, die dann gespeichert werden. Ein weiterer Adapter ändert die Ereignisse in solche, die auch serialisierbar und veröffentlichbar sind.

Es wurde von einigen berichtet, dass eine Trennung in Domain Events und Published Events versucht wurde, das Ganze aber ziemlich gegen die Wand gelaufen ist. In der Klassischen Datenbankanwendung gibt es diese Trennung ja auch nicht. Trotzdem sehe ich keinen anderen Weg als eine Trennung in DomainEvents und, nennen wir sie mal für jetzt AggregateEvents. Die Domäne erzeugt und konsumiert AggregateEvents. Nach Außen13) werden diese aber in DomainEvents gewandelt. Wenn das Aggregate nach seinen Ereignissen gefragt wird, werden diese in DomainEvents gemappt14). Dabei können Passworte verschlüsselt werden, Strategien in StrategyDTO Objekte15) gewandelt werden, usw. Alle Handler dieser Ereignisse können damit was anfangen, sie sind ja extra dafür gebaut.

Und wenn das Repository die DomainEvents lädt und zurück an das Aggregate übergibt, werden diese vorher zurück in AggregateEvents gewandelt. Klar ist das mehr Arbeit, aber mit AutoMapper und copy/paste ist der zusätzliche Tippaufwand vernachlässigbar im Vergleich zu der Zeit, die man für ein vernünftiges Design und den Bau der Geschäftslogik aufwendet. Zudem kann die Domäne nun völlig befreit von irgendwelchen Speicheraufgaben bleiben, muss nicht darauf achten, serialisierbare Ereignisse zu erzeugen, kann völlig Objektorientiert Strategien als Eigenschaften von Ereignissen setzen, usw. Vor allem sind die Strategien16) über diesen Weg erweiterbar, das Open-Closed Prinzip wird dadurch untermauert.

Teilt man die Ereignisse nicht derart, wird man an diversen Stellen Hacks bauen müssen, damit es klappt. Das mag anfangs pragmatisch wirken, wird aber schnell dazu führen, dass die Summe der Hacks aufwendiger wird, als einfach Ereignisse in zwei Formen zu bauen. Zudem wird die Nachvollziehbarkeit erschwert, weil man jeden Hack verstehen muss. Die Zweiteilung der Ereignisse muss man nur einmal verstehen.

Aggregate Events

Angenommen es gibt für ein Konto eine Authentifizierungsstrategie. Diese ist erweiterbar. Für die DomainEvents braucht man für die Strategie eine Repräsentation

[DataContract]
public abstract class AuthenticationStrategyRepresentation { }

Eine Erweiterung kann nun eine konkrete Repräsentation hinzufügen17)

[DataContract]
public abstract class UserNameAuthenticationStrategyRepresentation: AuthenticationStrategyRepresentation { }

Das entsprechende DomainEvent, über das eine Änderung der Strategie mitgeteilt wird, sieht so aus

[DataContract]
public class AuthenticationStrategySet: DomainEvent
{
    [DataMember]
    public AuthenticationStrategyRepresentation Strategy { get; private set; }
 
    public AuthenticationStrategySet(AuthenticationStrategyRepresentation strategy)
    {
        Strategy = strategy;
    }
}

In der Domäne sieht die Entsprechnung wie folgt aus

public abstract class AuthenticationStrategy
{
    public Boolean IsAuthenticated { get; private set; }
    public abstract void Authenticate(Credentials credentials); // hier wird u.a. IsAuthenticated gesetzt
 
    public AuthenticationStrategy()
    {
      IsAuthenticated = false;
    }
}

Eine Erweiterung kann nun eine konkrete Repräsentation hinzufügen

Ahhh, klappt so nicht. wie soll die Strategy vom Command Handler erzeugt werden und an das Konto übergeben werden??????? So ein Mist

public abstract class UserNameAuthenticationStrategy: AuthenticationStrategy
{
    public Account Owner { get; private set; }
    public string UserName { get; private set; }
    public string Password { get; private set; }
 
    public UserNameAuthenticationStrategy(Account owner, string userName, string password)
      : base()
    {
        Owner = owner; // hierüber kann z.B. Owner.ApplyChange(AggregateEvent @event) aufgerufen werden, um Ereignisse zum Aggregate zu erzeugen
        Owner.ApplyChange(new UserNameAuthenticationStrategyCredentialsSet(userName, password); // immer daran denken, dass jede Änderung des Zustands über Ereignisse gemacht werden muss!
    }
    public override Authenticate(Credentials credentials)
    {        
        var c = credentials as UserNameCredentials;
        // prüfen ob gesperrt, inaktiv, usw.
        if (c == null) throw new IllegalArgumentException();
        IsAuthenticated = ((c.UserName == UserName) && (c.Password == Password));
        if (!IsAuthenticated) Owner.ApplyChange(new AccountCredentialsRejected()); // + Zähler hochsetzen und Konto eventuell sperren
        else (Owner.ApplyChange(new AccountCredentialsAccepted()); // + Zähler auf 0 setzen
    }
}

Das entsprechende AggregateEvent, über das eine Änderung der Strategie mitgeteilt wird, sieht so aus

public class AuthenticationStrategySet: AggregateEvent
{
    public AuthenticationStrategy Strategy { get; private set; }
 
    public AuthenticationStrategySet(AuthenticationStrategy strategy)
    {
        Strategy = strategy;
    }
}

Im Konto passiert nun beim Setzen der Strategie folgendes

public abstract class Account: AggregateRoot
{
    public AuthenticationStrategy Strategy { get; private set; }
 
    public void SetAuthenticationStrategy(AuthenticationStrategy strategy)
    {
        ApplyChange(new AuthenticationStrategySet(strategy));
    }
}
1) die in etwa einem Business Objekt entspricht
2) Warum die Person umgezogen ist könnte z.B. auch noch in das PersonMoved Ereignis mit aufgenommen werden, falls das für die Domäne relevant sein sollte
3) die Version des Aggregates
4) die Reihenfolge gilt immer nur pro Aggregate
5) der Snapshotter läuft in einem eigenen Prozess, d.h. wenn ein Aggregate geladen wird, hat der Snaphshotter eventuell noch nicht die neuesten Ereignisse abgearbeitet
6) oder Thread
7) i.d.R. ein paar Millisekunden, wenn der Denormalisierer aber gerade nicht am Start ist, vielleicht deutlich länger
8) schlussendliche Konsistenz
9) Single Responsibility Principal
10) verschlüsselt
11) Event Stream
12) auch denormalisierte Sichten könnten verschlüsselt werden, wobei die Abfragen darauf schwierig werden
13) Speicherung und veröffentlichung an die Handler
14) Hierfür ist nun AutoMapper super, weil in 99,9% der Fälle das eine 1-zu-1 Zuordnung sein wird
15) besser StrategyRepresentation
16) und auch Rollen, Status, und andere erweiterbare Aspekte
17) Ursprünglich dachte ich an folgendes - aber das geht nicht, weil die Strategie erst nach allen Aktionen in die Repräsentation gewandelt wird, und der UserName und das Passwort inzwischen verändert werden können. D.h., dass Objekt Repräsentationen Parameterlos gespeichert werden müssen, also auch Parameterlose Konstruktore haben müssen. Folgendes geht also nicht:
[DataContract]
public abstract class UserNameAuthenticationStrategyRepresentation: AuthenticationStrategyRepresentation
{
   [DataMember]
    public string UserName { get; private set; }
    [DataMember]
    public string CryptedPassword { get; private set; }
    [DataMember]
    public string IV { get; private set; }
 
    public UserNameAuthenticationStrategyRepresentation(string userName, string cryptedPassword, string iv)
    {
        UserName = userName;
        CryptedPassword = cryptedPassword;
        IV = iv;
    }
}
technology/es.txt · Last modified: 2013/01/15 15:09 by rtavassoli