Vertx und RXJava2 und der Abschied meines Verstands
Heute hab ich mich mal wieder an mein kleines Projekt geschmissen, das mit Vertx und sowas. Ich wollte da jetzt ein paar Tests einbauen und weil das einen Vertx Httpserver beinhaltet, dachte ich mir ich mache den Client im Test auch mit Vertx.
Weil ich die Observables vor RXJava besser finde als andauernd irgendwo meine Callbacks mitzugeben, habe ich die RXJava2 von Vertx mit dabei. Ich hätte gedacht, dass es sich dann schön einfach zusammensetzen lässt, aber ich wurde eines besseren belehrt. Wenn es so gelaufen wäre, wie ich es mir gewünscht hätte, dann hätte ich im Test an jeder asynchronen Stelle einfach blockieren können. Im Grunde hätte mein Test so ausgesehen:
val response: HttpClientResponse = client.get("localhost", "/test")
.toObservable()
.blockingSingle()
// then
response.statusCode() `should be equal to` 200
val body:Buffer = response.toObservable().blockingSingle()
body.toString `should be equal to` "ok"
Aber so schön und einfach ist es leider nicht. Als ich den Test erstmals ausführte lief der endlos. Und da der Endpoint im Servercode noch eine NotImplementedException
werfen würde, konnte ich recht
sicher sein, dass der Call nicht im Servercode ankommt, also auf zur Fehlersuche. Jetzt gibt es bei der Geschichte natürlich ein bescheidenes Problem: Asynchrone Aufrufe sind beschissen zu debuggen.
Immerhin konnte ich schonmal mit dem Debugger sicherstellen, dass er tatsächlich an dem blockingSingle
fest hing. Warum da aber keine Verbindung zum Server gemacht wurde, war mir schleierhaft, weil
bei jeder mir bekannten Nutzung der Observables würde an dieser Stelle der Call gemacht.
Also habe ich mir mal angeguckt, wie der Client standardmäßig ohne RXJava funktionieren würde:
client.get("localhost", "/test")
.handler {
// do something
}
.end()
Den handler
Part will ich ja insbesondere nicht, also hatte ich den erstmal gelöscht und nur mit end
den Call ausgeführt. Meine destruktive Handlung den handler
rauszunehmen hat Vertx mir dann
auch gleich böse genommen: “java.lang.IllegalStateException: You must set an handler for the HttpClientResponse before connecting”. Ich wollte das alles ja eigentlich in einem Observable haben, dass
dann blockiert, damit ich nicht über irgendwelche Futures oder sonstwas meinen Test synchronisieren muss.
var request: HttpClientRequest = client.get("localhost", "/test")
val response: Observable<HttpClientResponse> = request.toObservable()
request.end()
response.blockingSingle()
Das war dann der nächste Versuch. In der Hoffnung, dass das toObservable
bereits als Handler gilt. Aber nein, das ist Vertx noch nicht Handler genug. Wenn ich auf das Observable ein subscribe
mache, dann funktioniert es. Aber es ist nichts gegenüber dem üblichen Handler gewonnen.
Aber auf Observables kann man sich ja mehrmals subscriben, also habe ich ein subscribe
gemacht und nach dem end
nochmal mit blockingSingle
gewartet. Echt unschön, aber wenn es funktioniert. Oder
funktionieren würde, denn dieses Mal bekam ich die nette Fehlermeldung “Request already complete”. Es gibt also erstmal keine direkte Möglichkeit, auf die Antwort zu warten. Also bleibt nur die Wahl
zwischen Futures im Handler-Block oder ein Publisher im subscribe. Beide Varianten geben sich nicht viel.
Future:
val future: CompletableFuture<HttpClientResponse> = CompletableFuture()
request
.exceptionHandler { future.completeExceptionally(it) }
.handler { future.complete(it) }
request.end()
val response: HttpClientResponse = future.get()
Publisher:
val publisher: PublishProcessor<HttpClientResponse> = PublishProcessor.create()
request.toObservable().subscribe(publisher::onNext,
publisher::onError,
publisher::onComplete)
request.end()
val response: HttpClientResponse = publisher.blockingSingle()
Aber weil ich beschlossen habe im generellen eher die RXJava Sachen zu nutzen, werde ich bei der zweiten Variante bleiben. Ich denke, ich werde das auch gleich noch als Issue bei Vertx posten, weil ich mit Sicherheit nicht der Einzige bin, den das stört.
Jetzt habe ich meinen Test ausgeführt nachdem ich parallel diesen Post geschrieben habe, das erste erwartete Ergebnis tritt ein: Der Status-Check schlägt fehl, weil ich ja im Servercode noch nichts gemacht hatte. Also denke ich mir, der Rest ist jetzt straight forward, update den Blog und mach dann weiter. Während der Blog noch baut habe ich die Stelle im Servercode erstmal notdürftig gebaut, nur um zu testen. Und der Test hängt fest.
Offensichtlich klappt das Laden des ResponseBody doch nicht so einfach mit Observables. Zum Test habe ich auch mit dem normalen Browser einmal die Seite geladen und da klappt es. Also was zur Hölle ist jetzt nicht richtig?
Es ist das gleiche Spiel. Nur muss man nach dem bodyHandler
oder subscribe
keine weitere Methode aufrufen, wie es zuvor mit dem end
war. Aber obwohl das subscribe
das Laden des Inhalts
erledigt. Oder ist es wirklich so einfach? Natürlich nicht. Ich also nochmal den Code hin und her ändern, um zu gucken, wie man das machen kann und plötzlich läuft es mit einer blocking Methode.
Stellt sich heraus, dass blockingSingle
nicht funktioniert aber blockingFirst
läuft. Für mich ergibt das nicht wirklich Sinn, aber ist nunmal so. Natürlich denke ich mir dann, dass das bei dem
obrigen Code auch klappt… Ne. Da bleibt der Umweg über den Publisher. So, jetzt bin ich aber fertig. Einmal Test schön machen und ausführen und er hängt wieder. Sobald ich vor dem Body laden den
Status-Check einbaue, hängt der beim Body laden. Ich weiß nicht, was ich dazu sagen soll.
Wieder einen neuen Publisher zu machen fand ich eine blöde Lösung, aber Observables können ja noch ein bisschen mehr. Mit flatMap
überprüfe ich jetzt erstmal den Status der Antwort und wenn der OK
ist, dann hol ich mir das Observable für den Body. Es ist ein wenig unschön, aber es klappt ohne mehrere Futures oder Publisher oder sonstige Konstrukte:
// when
val request = client.get("localhost", "/test")
// then
val publisher: PublishProcessor<Buffer> = PublishProcessor.create()
request.toObservable()
.flatMap {
it.statusCode() `should be equal to` 200
it.toObservable()
}
.subscribe(publisher::onNext,
publisher::onError,
publisher::onComplete)
request.end()
val buffer = publisher.blockingFirst()
buffer.toString `should be equal to` "ok"
Der Code ist echt nicht das Gelbe vom Ei, aber er tut jetzt, was er soll und das sollte mir reichen. Aber weil es so schön ist, gibt es doch noch einen Nachschlag!
Weil ich beim nächsten Test - in dem ich ein Json verschicken will - gemerkt habe, dass es einen Vertx HTTP Client und einen Vertx Web Client gibt und ich bisher den ersteren genutzt habe, schaute ich mir mal den zweiten an, welcher a) schöne weitere Features hat (wie zum Beispiel automatischen Json parsen) b) RXJava support eingebaut hat. Und siehe da, es funktioniert!
// when
val response = client.get("localhost", "/test")
.rxSend()
.blockingGet()
// then
response.statusCode() `should be equal to` 200
response.bodyAsString() `should be equal to` "ok"
Genau das habe ich gewollt. Damit kann man arbeiten, damit kann man saubere Tests schreiben ohne viel Hick Hack! Danke!