How to consume a multipart response with Play Framework 2 (Scala)

In our current project we have to consume a REST web service that provides data as a multipart document, e.g. a list of videos (or video metadata) where each video is a single part. While it’s common to submit or handle multipart requests (e.g. multipart/form-data), the multipart content type is not widely used for HTTP responses. As a consequence, HTTP clients support multipart responses less well than multipart requests. The Play Framework’s WS client, for example, does not directly support responses of type multipart/*.

With this post I want to show how multipart responses can be processed using Play’s WS client. Because we may be dealing with hundreds of megabytes, this will be done incrementally in a non-blocking, asynchronous manner. Each part (video metadata) should be sent to an actor for further processing (which is out of scope of this blog post).

The multipart response

First, let’s examine the response we have to handle. The HTTP response headers specify the content type multipart/package, which also includes the “boundary” parameter:

200 Ok
Content-Type: multipart/package; boundary="_----------=_1399621717135"
Transfer-Encoding: chunked
Cache-Control: max-age=864000
Date: Fri 09 May 2014 07:48:37 +0000
Last-Modified: Fri 09 May 2014 07:48:37 UTC
Etag: 201405071528572365
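
The boundary parameter is what a client needs in order to find the parts: each part starts with a line consisting of two dashes followed by the boundary, and the document ends with the same line plus two trailing dashes. A minimal sketch of extracting it from the header value, using only the Scala standard library (the names BoundaryExample, boundaryOf, delimiter and closeDelimiter are made up for this illustration, and the parsing is simplified):

```scala
object BoundaryExample {
  // Extracts the "boundary" parameter from a Content-Type header value.
  // Simplified assumption: no parameter value contains a quoted semicolon.
  def boundaryOf(contentType: String): Option[String] = {
    val prefix = "boundary="
    contentType
      .split(";")
      .iterator
      .drop(1) // the first segment is the media type itself
      .map(_.trim)
      .collectFirst {
        case param if param.toLowerCase.startsWith(prefix) =>
          // Drop the parameter name and optional surrounding quotes.
          param.substring(prefix.length).trim.stripPrefix("\"").stripSuffix("\"")
      }
  }

  // Delimiter line that starts each part, and the one that closes the document.
  def delimiter(boundary: String): String = "--" + boundary
  def closeDelimiter(boundary: String): String = "--" + boundary + "--"
}
```

For the header above, boundaryOf yields the value _----------=_1399621717135 without the quotes.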

The response body delivers the multipart document like this:

--_----------=_1399621717135
Content-Disposition: inline
Content-Type: application/vnd.vidzapp.videometa+json
Link: </videos/1000010298>;rel="self"
Content-Length: 2388

{ "name": "video title 1", ... }

--_----------=_1399621717135
Content-Disposition: inline
Content-Type: application/vnd.vidzapp.videometa+json
Link: </videos/1000010299>;rel="self"
Content-Length: 2731

{ "name": "video title 2", ... }

--_----------=_1399621717135--

Parsing this should not be too hard, but remember that we want to process the parts as a stream, because the whole document might not fit into the available memory.
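
To make the format and the streaming requirement a bit more concrete, here is a rough sketch of an incremental splitter that is fed chunks and emits each part as soon as its closing delimiter has arrived. It is not the approach used later in this post: it works on strings instead of bytes, buffers one whole part at a time, and trims whitespace around each part. PartSplitter, feed and parsePart are hypothetical names chosen for this illustration.

```scala
import scala.collection.mutable.ListBuffer

object PartSplitterExample {
  // Buffers incoming chunks and emits every part that is known to be complete.
  final class PartSplitter(boundary: String) {
    private val delimiter = "--" + boundary
    private val buffer = new java.lang.StringBuilder
    private var started = false // becomes true once the first delimiter was seen

    def feed(chunk: String): List[String] = {
      buffer.append(chunk)
      val parts = ListBuffer.empty[String]
      var idx = buffer.indexOf(delimiter)
      while (idx >= 0) {
        // Text before the very first delimiter is preamble and gets dropped.
        if (started) parts += buffer.substring(0, idx).trim
        started = true
        buffer.delete(0, idx + delimiter.length)
        idx = buffer.indexOf(delimiter)
      }
      parts.toList
    }
  }

  // Splits one raw part into its headers and its payload (text after the first blank line).
  def parsePart(raw: String): (Map[String, String], String) = {
    val normalized = raw.replace("\r\n", "\n")
    val (head, body) = normalized.indexOf("\n\n") match {
      case -1 => (normalized, "")
      case i  => (normalized.substring(0, i), normalized.substring(i + 2))
    }
    val headers = head
      .split("\n")
      .iterator
      .map(_.split(":", 2)) // split at the first colon only
      .collect { case Array(name, value) => name.trim -> value.trim }
      .toMap
    (headers, body)
  }
}
```

Because the buffer is kept between calls, a delimiter that is cut in half by a chunk boundary is still found once the next chunk arrives.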

Processing large responses with WS

Because we’re accessing an HTTP service, the first choice in Play is the WS client, which sits on top of the AsyncHttpClient (future versions of Play will offer alternative underlying implementations). Fortunately, the WS client allows us to consume a response in a streaming fashion by accepting a consumer (this is the signature in the WS client that comes with Play 2.3.0-RC1):

def get[A](consumer: WSResponseHeaders => Iteratee[Array[Byte], A])
  (implicit ec: ExecutionContext): Future[Iteratee[Array[Byte], A]]

So we need to provide a function from response headers to an iteratee that consumes bytes and produces some value (of type A) when finished.
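
If iteratees are new to you, the shape of such a consumer can be illustrated with a toy model that needs no Play dependency. This is only a sketch of the idea: Step, Done, Cont, Headers, countBytes and run are made-up stand-ins, the model is synchronous, and Play’s real Iteratee has more states and runs asynchronously.

```scala
object ToyIterateeExample {
  // Toy model only, not Play's API: a step is either finished or waiting for input.
  sealed trait Step[A]
  final case class Done[A](result: A) extends Step[A]
  // None models the end of the input (Input.EOF in Play).
  final case class Cont[A](next: Option[Array[Byte]] => Step[A]) extends Step[A]

  final case class Headers(status: Int) // hypothetical stand-in for WSResponseHeaders

  // A consumer in the sense described above: headers in, byte-consuming step out.
  def countBytes(headers: Headers): Step[Long] = {
    def loop(total: Long): Step[Long] = Cont[Long] {
      case Some(chunk) => loop(total + chunk.length)
      case None        => Done(total)
    }
    // For a non-200 response the body is not consumed at all.
    if (headers.status == 200) loop(0L) else Done(0L)
  }

  // Drives a step with the given chunks, then signals the end of the input.
  def run[A](step: Step[A], chunks: List[Array[Byte]]): A = (step, chunks) match {
    case (Done(a), _)            => a
    case (Cont(k), head :: tail) => run(k(Some(head)), tail)
    case (Cont(k), Nil)          => run(k(None), Nil)
  }
}
```

The point to take away is that the consumer decides, based on the headers, what to do with the bytes before any of them arrive.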

To get an idea of what this could look like, the documentation provides an example of how to write the response body to a file (check out the docs for more explanation):

But how do we parse the multipart response so that we can send each part to an actor for processing?

Play provides a multipart BodyParser

The Play Framework already provides a body parser that can process a multipart request as a stream using iteratees. The implementation provides all we need; unfortunately, its interface is not suitable for our use case. Here’s an excerpt of the multipartParser (complete source on GitHub):

The issue is that this is tied to RequestHeader (request) and BodyParser, so we need to adapt this implementation to fit the requirements of the WS client and our needs as well.

Integration of the multipartParser with WS

Let’s first concentrate on the API.

  • Because the future result of our request should provide the returned response headers, they should be part of the iteratee result.
  • For testing, it might also be interesting how many parts have been processed.
  • Additionally, from the future result of the WS call we want to know if the response was processed or not (e.g. because the status code was not OK).

Thus, the result the iteratee eventually produces should be of type

Either[WSResponseHeaders, (WSResponseHeaders, Int)]

In the case of invalid response headers (such as a wrong status code), only the response headers should be returned (as Left). If the response headers were OK and the response was processed, the response headers and the number of processed parts should be returned (as Right).
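
From the caller’s point of view, such a result is handled with a plain pattern match. In the following sketch, Headers is a hypothetical stand-in for WSResponseHeaders so that the snippet runs without Play; MultipartResult and describe are also made up for illustration.

```scala
object ResultTypeExample {
  // Hypothetical stand-in for WSResponseHeaders.
  final case class Headers(status: Int, headers: Map[String, Seq[String]])

  // Left: response rejected because of its headers. Right: processed, with part count.
  type MultipartResult = Either[Headers, (Headers, Int)]

  def describe(result: MultipartResult): String = result match {
    case Left(h)           => s"not processed, status ${h.status}"
    case Right((h, parts)) => s"processed $parts parts, status ${h.status}"
  }
}
```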

Therefore, the signature of the consume function passed to WS.get looks like this:

def consumeMultipart
  (partHandler: Map[String, String] => Iteratee[Array[Byte], Unit])
  (headers: WSResponseHeaders):
  Iteratee[Array[Byte], Either[WSResponseHeaders, (WSResponseHeaders, Int)]]

The partHandler is a function that receives the part headers and returns an iteratee that consumes the bytes of the part payload and returns Unit, because we’re just sending the data to an actor for further processing. The partHandler function will be invoked for each part, and for each part a new iteratee should be returned to collect the data for that part (when the part is complete, the iteratee receives Input.EOF).

The headers: WSResponseHeaders are defined as a separate parameter list so that we can create a concrete multipart consumer with a specific partHandler, and pass this to WS.get.
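
The effect of the separate parameter list can be tried out without Play. In this sketch, Headers, consume and get are hypothetical stand-ins for WSResponseHeaders, consumeMultipart and WS.get; the part handler returns Unit instead of an iteratee, and the "response body" is a fixed list of part headers.

```scala
object CurriedConsumerExample {
  final case class Headers(status: Int) // hypothetical stand-in for WSResponseHeaders
  type Result = Either[Headers, (Headers, Int)]

  // Stand-in for consumeMultipart: the part handler comes first, the headers second.
  def consume(partHandler: Map[String, String] => Unit)(headers: Headers): Result =
    if (headers.status != 200) Left(headers)
    else {
      val partHeaders = List(
        Map("Link" -> "</videos/1000010298>;rel=\"self\""),
        Map("Link" -> "</videos/1000010299>;rel=\"self\""))
      partHeaders.foreach(partHandler) // invoked once per part
      Right((headers, partHeaders.size))
    }

  // Stand-in for WS.get: it only knows how to call a function of the headers.
  def get[A](status: Int)(consumer: Headers => A): A = consumer(Headers(status))

  def demo(): Result = {
    // Fixing the first parameter list leaves a function Headers => Result.
    val printLinks: Headers => Result = consume(h => println(h("Link")))
    get(200)(printLinks)
  }
}
```

The caller of get never sees the part handler; it is baked into the function that is handed over.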

Now that we have the API of our consumer function, here’s the full source code:

As already said, most of this and especially the multipartConsumer implementation is just copied from the original multipartParser.

Usage

To show how to use the consumeMultipart function, especially what the partHandler might look like, I’ve created the following test (using ScalaTest + Play):

Note: this solution is based on Play 2.3.0 (RC1), because there was an issue in WS when running the iteratee in flatMap while the iteratee was already in state Done (e.g. because of invalid response headers).

One thing that’s not shown here is how to handle back-pressure (when data is received much faster than it can be processed). This might be part of a separate blog post.
