Hi all,
I have pushed a new version of the prototype of OpenRefine running on
Spark. I unfortunately had to force-push this branch
("spark-prototype"), so you might want to delete the branch on your side
if you have previously checked it out.
Packaged distributions are here:
- http://pintoch.ulminfo.fr/a0c60c72ee/openrefine-win-spark.zip
- http://pintoch.ulminfo.fr/5b42e2f945/openrefine-linux-spark.tar.gz
- http://pintoch.ulminfo.fr/cf0b0e601c/openrefine-spark-mac-spark.dmg
Here is an overview of the main things that changed:
1. Spark import tab
I added a "Spark" import tab, where you can input a Hadoop URI to be
imported by Spark (it currently only accepts a single file at a time but
it could support more, like the other tabs). The idea behind this is to
let the user import a dataset directly via Hadoop's file system. The
existing file-based import tab cannot be used for this since it involves
uploading the dataset via the user's browser to the server: this is not
workable for large datasets even if they are stored locally. The import
tab for URL downloads is also not workable for large datasets, since it
downloads the entire file locally before importing it. That being said,
because the two import tabs are conceptually similar, we could consider
exposing them as a single tab to the user, and figuring out based on the
URL whether it should be downloaded locally or input through Spark.
I have only tested this tab with locally stored files for now. It still
makes sense to use it for large local files, since it avoids sending the
data from the file system through the web browser and the server, only to
land back on the file system.
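To make this more concrete, here is roughly what reading such a URI with
Spark's Java API boils down to (a minimal sketch, not the actual importer
code in the branch; the application name, master and path are
placeholders):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SparkImportSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("openrefine-spark-import")
                    .setMaster("local[*]");   // or the URL of a real cluster

            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                // Any URI understood by Hadoop's FileSystem API works here:
                // "hdfs://...", "file:///...", "s3a://..." (given the right connectors).
                JavaRDD<String> lines = sc.textFile("hdfs://namenode:8020/data/dataset.csv");

                // Nothing is actually read until an action is triggered.
                System.out.println("first line: " + lines.first());
            }
        }
    }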
In terms of user experience, the slight downside of this tab is that we
cannot use a standard "Browse" button, since it is not possible to
retrieve the full path of a selected file in JavaScript (for obvious
security reasons). So it sadly has to be a plain text input even for
local files. I don't think we can work around that as long as OpenRefine
remains a web app (something like Electron might help), but then again
this tab should mostly be used with HDFS paths anyway.
Currently, projects imported through this Spark tab are still replicated
in the workspace (so, on the local file system). This means that although
you could use this tab to work on an S3 dataset, the full dataset will
actually be retrieved locally as soon as the project is saved. It is
possible to lift this restriction: we could provide an option to work
fully remotely on the dataset, in which case the saved project would only
contain a pointer to the original source. If implemented, this should
obviously be controllable by the user. For datasets stored far away this
will incur noticeable performance costs, but I believe it could be useful
when connecting OpenRefine to a local Spark cluster, where both the
executors and the data live.
2. Spark-based importers
I have added a new importer for fixed-width files. It uses a new
importer architecture which reads datasets natively with Spark, instead
of loading the entire dataset into the JVM and then sending it to Spark.
This new architecture is necessary to get the most out of the Spark
import tab described above.
The CSV/TSV importer still uses the previous architecture, which is much
less efficient. Spark provides a CSV importer which we could use: this
would give the same performance as the fixed-width importer. However,
the devil is in the details: Spark's CSV/TSV importer does not support
the same set of options as the ones we currently offer, so we might have
to keep both. I don't think we want the user to choose between two CSV
importers, so we should pick the appropriate one based on the import
options and input method (always use Spark's importer when using the
Spark import tab, for instance).
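For reference, here is roughly what delegating to Spark's CSV reader
could look like (just a sketch: the options passed to Spark are real
Spark options, but the correspondence with our current import options in
the comments is approximate):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class SparkCsvSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("openrefine-csv-import")
                    .master("local[*]")
                    .getOrCreate();

            Dataset<Row> df = spark.read()
                    .option("header", "true")              // roughly our "headerLines"
                    .option("sep", ",")                    // roughly our "separator"
                    .option("quote", "\"")                 // quoting is configurable...
                    .option("ignoreLeadingWhiteSpace", "true")
                    // ...but options such as ignoring leading lines, skipping data
                    // rows or storing blank cells as nulls have no direct equivalent.
                    .csv("hdfs://namenode:8020/data/dataset.csv");

            df.show(5);
            spark.stop();
        }
    }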
3. More efficient serialization
The first prototype used plain Java serialization to write project data,
which was very inefficient. I have changed it to use something pretty
similar to what OR 3.x uses: rows serialized as JSON, one per line, in a
gzip-compressed text file. This has the big advantage of being
completely independent of Spark, so project data can be read by other
code bases. It is also more robust to version changes, since we
completely control the JSON format of our model (which has been covered
by tests for a few releases now).
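To give an idea of the format, here is a toy sketch of what writing and
reading such a grid through Spark could look like (the Row class and its
JSON mapping below are heavily simplified placeholders, not our actual
data model):

    import java.io.Serializable;
    import java.util.List;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class GridSerializationSketch {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        // Heavily simplified stand-in for the real row model.
        public static class Row implements Serializable {
            public List<String> cells;   // the real model stores Cell objects, etc.
            public Row() {}
        }

        // One JSON object per row, one row per line, gzip-compressed.
        public static void saveGrid(JavaRDD<Row> rows, String projectDir) {
            rows.map(row -> MAPPER.writeValueAsString(row))
                .saveAsTextFile(projectDir + "/grid", GzipCodec.class);
        }

        // Reading back is symmetric; Spark decompresses the .gz part files
        // transparently.
        public static JavaRDD<Row> loadGrid(JavaSparkContext sc, String projectDir) {
            return sc.textFile(projectDir + "/grid")
                     .map(line -> MAPPER.readValue(line, Row.class));
        }
    }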
This format also makes it easier (or at least, more realistic) to provide
a migration route for projects saved with OR 3.x, since it would be
entirely feasible to serialize projects in a similar format from OR 3.x
(such that they are forward-compatible). The Spark-based OR could also
conceivably read existing OR 3.x projects (those serialized right now
with OR 3.3), albeit without being able to move in the history.
4. Records mode
My first prototype did not support the records mode; I have added that
back. Supporting the records mode is a bit challenging since, as far as
I am aware, there is no equivalent to it in most database systems. To
support it, I had to define custom RDDs to transform an RDD of rows into
an RDD of records in an efficient way (see RecordsRDD in the
spark-prototype branch). The main difficulty is that records can be
spread across partition boundaries, so the transformation cannot be a
pure map of partitions: a small Spark job is required to stitch back
records split at partition boundaries. This seems suitably efficient so
far. This also required ensuring that the sorting and indexing
information is retained in the process: we want to be able to quickly
fetch records in the relevant partitions based on their indices (by
default, irrelevant partitions would be scanned as well). Although RDDs
are order-aware, Spark puts surprisingly little emphasis on actually
preserving this order, so it is easy to lose this precious information
(perhaps because a lot of the use cases for Spark are around collections
where order does not matter). This involved working around some known
issues in Spark and submitting a PR to Spark
(https://github.com/apache/spark/pull/28293), which has been accepted
but will sadly not be released any time soon as far as I can tell;
thankfully the issue can be fixed on our side without patching Spark.
Note that in this prototype, the index of a record is the index of its
first row, unlike in 3.x and before, where it is simply the number of
records before it plus one. It is entirely feasible to adjust this to
match the existing behaviour efficiently.
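To give a better idea of the stitching described above, here is a very
stripped-down version of the approach (not the actual RecordsRDD code:
it assumes every partition contains at least one record start, and
ignores empty partitions, leading rows in the first partition and all
the index bookkeeping):

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function2;

    public class RecordsSketch {

        public static class Row implements Serializable {
            public String recordKey;   // null or empty: continuation of the previous record
            public boolean startsRecord() { return recordKey != null && !recordKey.isEmpty(); }
        }

        public static class Record implements Serializable {
            public final List<Row> rows = new ArrayList<>();
        }

        public static JavaRDD<Record> toRecords(JavaRDD<Row> rows) {
            // Pass 1 (the small Spark job): for each partition, collect the rows
            // that precede its first record start; they belong to a record begun
            // in the previous partition. collect() returns one list per partition,
            // in partition order.
            Function2<Integer, Iterator<Row>, Iterator<List<Row>>> extractHead = (idx, it) -> {
                List<Row> head = new ArrayList<>();
                while (it.hasNext()) {
                    Row row = it.next();
                    if (row.startsRecord()) {
                        break;
                    }
                    head.add(row);
                }
                List<List<Row>> result = new ArrayList<>();
                result.add(head);
                return result.iterator();
            };
            List<List<Row>> heads = rows.mapPartitionsWithIndex(extractHead, false).collect();

            // Pass 2: build records within each partition, and append the next
            // partition's head rows to the last record of this partition. (A
            // broadcast variable would be preferable with many partitions.)
            int numPartitions = rows.getNumPartitions();
            Function2<Integer, Iterator<Row>, Iterator<Record>> stitch = (idx, it) -> {
                List<Record> records = new ArrayList<>();
                Record current = null;
                while (it.hasNext()) {
                    Row row = it.next();
                    if (row.startsRecord()) {
                        current = new Record();
                        records.add(current);
                    }
                    if (current != null) {   // rows before the first start were handled in pass 1
                        current.rows.add(row);
                    }
                }
                if (current != null && idx + 1 < numPartitions) {
                    current.rows.addAll(heads.get(idx + 1));
                }
                return records.iterator();
            };
            return rows.mapPartitionsWithIndex(stitch, true);
        }
    }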
5. Small fixes
I tweaked various things, such as adding native Windows Hadoop binaries
to the distribution to fix #2313.
6. My conclusions so far
The records mode was one of the main things I was worried about in this
project, and I am quite happy that it fits pretty neatly into Spark in
the end. This removes a pretty big roadblock.
The fact that we have to interface with Spark at a pretty low API level
means that there is definitely no hope of refactoring this to go through
Beam, since I am relying on data model aspects that are really specific
to Spark. That being said, Tom's suggestion to make the data backend
pluggable is increasingly appealing to me. It would mean developing our
own interface, which operations and changes would interact with to
implement their transformations, without being tied to a specific
platform. We would have a Spark implementation, and potentially a fully
in-memory implementation which could be faster on small datasets by
avoiding the task scheduling and serialization costs. It is definitely a
big chunk of work, but it could well be the only option we have if we
realize later on that Spark introduces latency that is not acceptable
for workflows on small datasets.
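To illustrate what I have in mind, the pluggable backend could boil down
to something along these lines (all names are made up for the sake of the
example, this is not a concrete API proposal):

    import java.io.File;
    import java.io.Serializable;
    import java.util.List;
    import java.util.function.Function;

    // Placeholder for the actual row model.
    class Row implements Serializable { }

    public interface GridState {
        long rowCount();

        // Fetch a contiguous range of rows, e.g. to render one page of the grid.
        List<Row> getRows(long start, int limit);

        // Derive a new grid by transforming every row; operations and changes
        // would be expressed in terms of calls like this one. (For a Spark
        // implementation the mapper would have to be serializable.)
        GridState mapRows(Function<Row, Row> mapper);

        // Materialize the grid into the workspace, e.g. in the JSON-lines
        // format described in section 3.
        void saveToFile(File directory);
    }

    // Possible implementations:
    //  - SparkGridState, backed by a JavaRDD<Row>, for large datasets and clusters;
    //  - LocalGridState, backed by a plain List<Row>, avoiding Spark's task
    //    scheduling and serialization overhead on small datasets.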
As always, I am keen to get your feedback on any of this.
Cheers,
Antonin