Thursday, 20 October 2016

Consuming a Scala Play Enumerator/Stream


I was working on a startup's Scala Play app and was unit testing the response from a Play middleware, which returns a Future[SimpleResult] whose body is an Enumerator[Array[Byte]].
If the response has a JSON body, the Enumerator won't give me the body as a flat string, but rather as a sequence of chunks. So an Enumerator is simply an asynchronous collection, aka a stream, which needs to be consumed by someone else (an Iteratee).
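
For the unit-testing case above, a minimal sketch of turning a chunked byte body back into one string could look like this. The `body` value is a hypothetical stand-in for a real response body, and UTF-8 is an assumed charset; blocking with Await is only meant for tests.

```scala
import play.api.libs.iteratee.{Enumerator, Iteratee}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for a response body: one JSON document split into two byte chunks.
val body: Enumerator[Array[Byte]] =
  Enumerator("{\"song\":".getBytes("UTF-8"), "\"demo\"}".getBytes("UTF-8"))

// Iteratee.consume concatenates every chunk; |>>> applies the enumerator and runs the iteratee.
val bodyBytes: Future[Array[Byte]] = body |>>> Iteratee.consume[Array[Byte]]()

// Decode only after all bytes have arrived (charset assumed to be UTF-8).
val bodyText: String = new String(Await.result(bodyBytes, 5.seconds), "UTF-8")
```

In a test, `bodyText` can then be compared against the expected JSON string or handed to a JSON parser.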

Stream source


For example, a song is a collection of buffered chunks of audio.

SBT_SCALA_VERSION=2.10.4 play console

scala> import play.api.libs.iteratee.{Enumerator, Iteratee}
import play.api.libs.iteratee.{Enumerator, Iteratee}

scala> val songStream: Enumerator[String] = Enumerator("first 5 minutes ", "second 5 minutes ", "last 5 minutes")
songStream: play.a.l.i.Enumerator[String] = play.api.libs.i.Enumerator$$anon$19@24142c37

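 // An Enumerator can also be created by pushing chunks into a channel.
 // Concurrent.unicast needs an implicit ExecutionContext, and the channel has to be
 // closed with eofAndEnd(), otherwise a consumer of the stream never completes.
scala> import play.api.libs.iteratee.Concurrent
scala> import scala.concurrent.ExecutionContext.Implicits.global

scala> val songStream: Enumerator[String] = Concurrent.unicast[String](onStart = stream => {
  stream.push("first 5 minutes ")
  stream.push("second 5 minutes ")
  stream.push("last 5 minutes")
  stream.eofAndEnd()
})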
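
As a further sketch of stream sources, an Enumerator can also be built from an existing collection and concatenated with another one. The names `intro`, `outro` and `fullSong` are made up for illustration; this assumes a Play version where Enumerator.enumerate takes an implicit ExecutionContext.

```scala
import play.api.libs.iteratee.{Enumerator, Iteratee}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Build a stream from an in-memory collection (hypothetical chunk names).
val intro: Enumerator[String] = Enumerator.enumerate(List("intro ", "verse "))
val outro: Enumerator[String] = Enumerator("outro")

// andThen emits all chunks of the first enumerator, then all chunks of the second.
val fullSong: Enumerator[String] = intro.andThen(outro)

// Consume the combined stream; Await is used only to inspect the result in the console.
val played: String = Await.result(fullSong |>>> Iteratee.consume[String](), 5.seconds)
```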


Stream Consumer - I

To get the song, we first apply the Enumerator to a consuming Iteratee, and then flatMap the asynchronous result.
scala> val consumeSong = songStream(Iteratee.consume[String]())
consumeSong: s.c.Future[p.a.l.i.Iteratee[String,String]] = s.c.i.Promise$DefaultPromise@6854ee8b

Applying the Enumerator returns a Future[Iteratee[String, String]], so flatMap it and run the Iteratee to get the whole song as a Future[String].

scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global

scala> import scala.concurrent.Future
import scala.concurrent.Future

scala> val songThatCanBeListened: Future[String] = consumeSong.flatMap(iteratee => iteratee.run)
songThatCanBeListened: s.c.Future[String] = s.c.i.Promise$DefaultPromise@2a7fc3c5

scala> songThatCanBeListened.onSuccess {case song => println(s"playing a song : $song")}

scala> playing a song : first 5 minutes second 5 minutes last 5 minutes
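
The same two steps (apply the Enumerator, then run the resulting Iteratee) work with any Iteratee, not only Iteratee.consume. As a rough sketch, here is a fold that counts chunks instead of concatenating them; `song`, `countChunks` and `total` are illustrative names.

```scala
import play.api.libs.iteratee.{Enumerator, Iteratee}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val song: Enumerator[String] =
  Enumerator("first 5 minutes ", "second 5 minutes ", "last 5 minutes")

// Fold over the chunks as they arrive, keeping only a running count.
val countChunks: Iteratee[String, Int] =
  Iteratee.fold[String, Int](0)((count, _) => count + 1)

// Step 1: apply the enumerator to the iteratee. Step 2: run the resulting iteratee.
val chunkCount: Future[Int] = song(countChunks).flatMap(_.run)

// Await is used here only to make the result visible in a console or test.
val total: Int = Await.result(chunkCount, 5.seconds)
```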


Stream Consumer - II

The other way of consuming the stream (Enumerator) is with the |>> operator, which is an alias for applying the Enumerator to an Iteratee:
scala> val consumeSongOtherWay = Iteratee.flatten(songStream |>> Iteratee.consume[String]()).run
consumeSongOtherWay: s.c.Future[String] = s.c.i.Promise$DefaultPromise@6c9f484a

scala> consumeSongOtherWay.onSuccess{case wholeSong => println(wholeSong)}
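
Play's Enumerator also has a `|>>>` operator that applies the Enumerator and runs the Iteratee in one go, so the flatten/run pair can be dropped. A small sketch, with `wholeSong` and `listened` as illustrative names:

```scala
import play.api.libs.iteratee.{Enumerator, Iteratee}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

val songStream: Enumerator[String] =
  Enumerator("first 5 minutes ", "second 5 minutes ", "last 5 minutes")

// |>>> feeds every chunk to the iteratee, signals end of input, and returns the final value.
val wholeSong: Future[String] = songStream |>>> Iteratee.consume[String]()

// Blocking is only for demonstration; in application code keep working with the Future.
val listened: String = Await.result(wholeSong, 5.seconds)
```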
