Introduction to Akka Streams (Data Decomposition)
Streaming means taking elements from a continuous data source, performing a computation on each element in one stage, then emitting the result to the next stage or "sinking" it into a database.
In some cases, you may want to use Akka Streams to split a received piece of data into segments (data decomposition) and handle each fragment separately.
Let’s jump straight into an example.
Suppose we receive a list of measurements from a drone sensor in a single message and want to decompose that message into individual measurement fragments.
The mapConcat operator transforms each stream element into an Iterable and emits that Iterable's elements downstream one by one.
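Conceptually, mapConcat is the streaming counterpart of flatMap on ordinary Scala collections: one input element is replaced by zero or more output elements. Here is a minimal plain-Scala sketch of the same decomposition idea (no Akka required; the names and values are illustrative, not from the drone example):

```scala
object MapConcatAnalogy extends App {
  // One "message" bundling several sensor readings,
  // similar in spirit to the drone payload below
  val message = Map("power" -> 1.7, "rotor_speed" -> 3.9, "wind_speed" -> 10.1)

  // flatMap expands the single message into individual
  // (sensor, value) fragments, just as mapConcat expands
  // one stream element into many downstream elements
  val fragments = List(message).flatMap(_.toList)

  fragments.foreach(println)
}
```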
// Json data we want to split
val json = """
  |{
  |  "id": "c75cb448-df0e-4692-8e06-0321b7703992",
  |  "timestamp": 1486925114,
  |  "measurements": {
  |    "power": 1.7,
  |    "rotor_speed": 3.9,
  |    "wind_speed": 10.1
  |  }
  |}""".stripMargin

// Perform mapConcat on the data source:
// each message is decomposed into
// individual Measurement elements
Source.single(json)
  // mapConcat requires an immutable.Iterable, so convert with toList
  .mapConcat(s => parse(s).toList)
  .runWith(Sink.foreach(println))

// Output:
// Measurement(c75cb448-df0e-4692-8e06-0321b7703992,1486925114,power,1.7)
// Measurement(c75cb448-df0e-4692-8e06-0321b7703992,1486925114,rotor_speed,3.9)
// Measurement(c75cb448-df0e-4692-8e06-0321b7703992,1486925114,wind_speed,10.1)
// Define parse function
def parse: String => Iterable[Measurement] = json => {
  // Import the json parsing protocol
  import DataJsonProtocol._
  // Perform pattern matching on
  // the parsing result
  Try(json.parseJson.convertTo[Data]) match {
    // If successful, turn each key/value
    // pair into a Measurement
    case Success(data) =>
      data.measurements.map { keyVal =>
        Measurement(data.id, data.timestamp, keyVal._1, keyVal._2)
      }
    // Else, log the error and emit nothing
    case Failure(ex) =>
      println(s"error while parsing json $ex")
      ex.printStackTrace()
      Seq.empty
  }
}
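The snippet above relies on a Data case class, a Measurement case class, and a DataJsonProtocol that the post does not show, so their exact definitions are an assumption; based on the fields used, they presumably look roughly like the sketch below. It exercises the same key/value-to-Measurement mapping that parse performs, on a hand-built Data value, so it runs without any JSON library:

```scala
// Assumed shape of the incoming message (not shown in the post)
case class Data(id: String, timestamp: Long, measurements: Map[String, Double])

// One decomposed fragment: a single sensor reading
case class Measurement(id: String, timestamp: Long, sensor: String, value: Double)

object DecompositionSketch extends App {
  val data = Data(
    "c75cb448-df0e-4692-8e06-0321b7703992",
    1486925114L,
    Map("power" -> 1.7, "rotor_speed" -> 3.9, "wind_speed" -> 10.1)
  )

  // The same mapping parse applies on success:
  // one Measurement per key/value pair
  val measurements = data.measurements.map { case (sensor, value) =>
    Measurement(data.id, data.timestamp, sensor, value)
  }

  measurements.foreach(println)
}
```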
Thanks!
Thanks for taking the time to read this post.
Find me on:
Github: Github.com/VakinduPhilliam
Twitter: Twitter.com/VakinduPhilliam
LinkedIn: LinkedIn.com/in/VakinduPhilliam