New Spark-based prototype


Antonin Delpeuch (lists)

Apr 26, 2020, 1:58:36 PM
to openref...@googlegroups.com
Hi all,

I have pushed a new version of the prototype of OpenRefine running on
Spark. I unfortunately have to force-push this branch
("spark-prototype"), so you might want to delete the branch on your side
if you have previously checked it out.

Packaged distributions are here:

- http://pintoch.ulminfo.fr/a0c60c72ee/openrefine-win-spark.zip

- http://pintoch.ulminfo.fr/5b42e2f945/openrefine-linux-spark.tar.gz

- http://pintoch.ulminfo.fr/cf0b0e601c/openrefine-spark-mac-spark.dmg

Here is an overview of the main things that changed:

1. Spark import tab

I added a "Spark" import tab, where you can input a Hadoop URI to be
imported by Spark (it currently only accepts a single file at a time but
it could support more, like the other tabs). The idea behind this is to
let the user import a dataset directly via Hadoop's file system. The
existing file-based import tab cannot be used for this since it involves
uploading the dataset via the user's browser to the server: this is not
workable for large datasets even if they are stored locally. The import
tab for URL downloads is also not workable for large datasets, since it
downloads the entire file locally before importing it. That being said,
because the two import tabs are conceptually similar, we could consider
exposing them as a single tab to the user, and figuring out based on the
URL whether it should be downloaded locally or input through Spark.

I have only tested this tab with locally stored files for now. It still
makes sense to use this tab for large locally stored files since it
avoids one round-trip through the web browser, server, and back to the
file system.

In terms of user experience, the slight downside with this tab is that
we cannot use a standard "Browse" button - since it is not possible to
retrieve the full path of a selected file in Javascript (for obvious
security reasons). So it sadly has to be a plain text input even for
local files. I don't think we can work around that as long as OpenRefine
remains a web app (perhaps with something like Electron?), but then
again this should mostly be used with HDFS paths anyway.

Currently, projects imported through this Spark tab will still be
replicated in the workspace (so, on the local file system). This means
that although you could use this tab to work on an S3 dataset, you will
actually retrieve the full dataset locally as soon as the project is
saved. It is possible to lift this restriction: we could provide an
option to work fully remotely on the dataset. This means that the saved
project would only contain a pointer to the original source. If
implemented this should obviously be controllable by the user. If the
dataset is stored far away, this will obviously incur noticeable
performance costs, but I believe this could be useful when connecting
OpenRefine to a local Spark cluster, where both the executors and the
data live.
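
For illustration, the import through this tab essentially boils down to handing the URI to the Spark context, along these lines (a minimal, hypothetical sketch; the HDFS path is made up, and any Hadoop-supported scheme such as hdfs://, s3a:// or file:// would work):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class HadoopImportSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("openrefine-import")
                    .setMaster("local[*]");
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                // Spark resolves the URI through Hadoop's file system layer.
                JavaRDD<String> lines =
                        sc.textFile("hdfs://namenode:8020/datasets/example.csv");
                System.out.println("Number of lines: " + lines.count());
            }
        }
    }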

2. Spark-based importers

I have added a new importer, for fixed-width files. This one uses a new
importer architecture which reads datasets natively with Spark, instead
of loading the entire dataset in the JVM and then sending it to Spark.
This new architecture is necessary to get the most out of the Spark import
tab described above.

The CSV/TSV importer is currently still with the previous architecture,
which is much less efficient. Spark provides a CSV importer which we
could use: this would provide the same performance as the fixed-width
importer. However, the devil is in the details: Spark's CSV/TSV importer
does not support the same sort of options as the ones we currently use, so
we might have to keep both. I don't think we want the user to choose
between two CSV importers, so we should pick the appropriate one based
on the import options and input method (always use Spark's importer when
using the Spark import tab, for instance).
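
For reference, this is roughly what reading a TSV file with Spark's built-in reader looks like (a sketch only: the option names shown are Spark's, and they do not map one-to-one onto the options our current CSV/TSV importer exposes):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class SparkCsvSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("openrefine-csv-import")
                    .master("local[*]")
                    .getOrCreate();
            // Spark's CSV options (header, sep, quote, escape, multiLine, ...)
            // are close to, but not the same as, OpenRefine's importer options.
            Dataset<Row> df = spark.read()
                    .option("header", "true")
                    .option("sep", "\t")
                    .option("quote", "\"")
                    .csv("hdfs://namenode:8020/datasets/example.tsv");
            df.show(10);
            spark.stop();
        }
    }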

3. More efficient serialization

The first prototype used plain Java serialization to write project data,
which was very inefficient. I have changed it to use something pretty
similar to what OR 3.x uses: rows serialized in JSON, one per line, in a
gzip-compressed text file. This has the big advantage of being
completely independent from Spark, so project data can be read with
other code bases. It is also more robust to version changes, since we
completely control the JSON format of our model (which has been covered
by tests for a few releases now).

This also makes it easier (or at least, more imaginable) to provide a
migration route for projects saved with OR 3.x, since it would be
totally doable to serialize the projects in a similar format with OR 3.x
(such that they are forward-compatible). It is also imaginable for the
Spark-based OR to read existing OR 3.x projects (those serialized right
now with OR 3.3) without being able to move in the history.
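
As a rough sketch of what this serialization amounts to (hypothetical code, not the classes from the branch; Jackson stands in here for whatever produces the JSON form of a row):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.spark.api.java.JavaRDD;

    public class ProjectSerializationSketch {
        // T stands in for OpenRefine's row class.
        static <T> void saveGrid(JavaRDD<T> rows, String path) {
            ObjectMapper mapper = new ObjectMapper();
            // One JSON document per line, written as gzip-compressed text:
            // the result can be read back without Spark at all.
            JavaRDD<String> jsonLines = rows.map(mapper::writeValueAsString);
            jsonLines.saveAsTextFile(path, GzipCodec.class);
        }
    }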

4. Records mode

My first prototype did not support the records mode, I have added that
back. Supporting the records mode is a bit challenging since there is no
equivalent to it in most database systems as far as I am aware. To
support it, I had to define custom RDDs, to transform an RDD of rows into
an RDD of records in an efficient way (see RecordsRDD in the
spark-prototype branch). The main difficulty is that records can be
spread across partition boundaries, so the transformation cannot be a
pure map of partitions. So a small Spark job is required to stitch back
split records at partition boundaries. This seems suitably efficient so
far. This also required ensuring that the sorting and indexing
information is retained in the process: we want to be able to quickly
fetch records in the relevant partitions based on their indices (by
default you would scan irrelevant partitions as well). Although RDDs are
order-aware, Spark puts surprisingly little emphasis on actually
preserving this order, so it is easy to lose this precious information
(perhaps because a lot of the use cases for Spark are around collections
where order does not matter). This involved working around some known
issues in Spark and submitting a PR to Spark
(https://github.com/apache/spark/pull/28293, which has been accepted but
will sadly not be released any time soon as far as I can tell - but
thankfully the issue can be fixed on our side without patching Spark).

Note that in this prototype, the index of a record is the index of the
first row where it starts, unlike in 3.x and before, where it is simply
the number of records before it plus one. It is totally feasible to
adjust this to match the existing behaviour in an efficient way.
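
To give an idea of the partition-level logic (a simplified, hypothetical sketch, not the actual RecordsRDD code): each partition is grouped into records locally, and the rows appearing before the first record start are set aside so that the small stitching job can append them to the last record of the previous partition.

    import java.util.ArrayList;
    import java.util.List;

    // A row (here just its cells, as String[]) starts a new record when its
    // key cell (index 0 in this sketch) is non-blank.
    class PartitionRecords {
        final List<List<String[]>> records = new ArrayList<>();
        // Rows belonging to a record that started in the previous partition;
        // symmetrically, the last record below may need to absorb the next
        // partition's dangling rows.
        final List<String[]> dangling = new ArrayList<>();

        static PartitionRecords group(List<String[]> partitionRows) {
            PartitionRecords result = new PartitionRecords();
            List<String[]> current = null;
            for (String[] row : partitionRows) {
                boolean startsRecord = row[0] != null && !row[0].isEmpty();
                if (startsRecord) {
                    if (current != null) {
                        result.records.add(current);
                    }
                    current = new ArrayList<>();
                }
                if (current == null) {
                    result.dangling.add(row);
                } else {
                    current.add(row);
                }
            }
            if (current != null) {
                result.records.add(current);
            }
            return result;
        }
    }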

5. Small fixes

I tweaked various things, such as adding Windows native Hadoop binaries
in the distribution, to fix #2313.

6. My conclusions so far

The records mode was one of the main things I was worried about in this
project, and I am quite happy that it fits pretty neatly in Spark in the
end. So it removes a pretty big roadblock.

The fact that we really have to interface with Spark at a pretty low API
level means that there is definitely no hope of refactoring this to go
through Beam, as I am relying for this on data model aspects that are
really specific to Spark. That being said, Tom's suggestion to make the
data backend pluggable is increasingly appealing to me. It would mean
developing our own interface, which operations and changes would
interact with to implement the transformations, without being tied to a
specific platform. We would have a Spark implementation, and potentially
a fully in-memory implementation which could be faster on small datasets
by avoiding the task scheduling and serialization costs. It is
definitely a big chunk of work, but it could potentially be the only
option we have if we realize later on that Spark brings in latency that
is not acceptable for workflows on small datasets.

As always, I am keen to get your feedback on any of this.

Cheers,

Antonin



Thad Guidry

Apr 26, 2020, 4:31:45 PM
to openref...@googlegroups.com
I'm surprised that you said there is no equivalent to a record in any database system?
Many databases have record support.
What I think you mean is the concept of hierarchical fields (also simply known as related fields).
In RDBMS, the use of a row structure as the group holding related fields together, forms a record. (flat records)
In Document systems, the use of a document structure as the group holding related fields together, forms an individual document record, like in MongoDB and CouchDB and Elasticsearch.
In Columnar systems, the use of a column itself forms a continuous series of values together.  The primary key is the data itself, mapped into rowids, like Vertica, etc. Records are formed by [1][2]

The use cases (our Column operations with mass transformations) in OpenRefine are more closely aligned to an OLAP workload where traditionally Columnar systems (or column serialized structures) have been used.

[1] https://en.wikipedia.org/wiki/Column-oriented_DBMS
Fast retrieval of data is always due to Indexes, either by the innate structure itself, like Columnar systems, or by creating tons of them efficiently, like Elasticsearch does.

That all being said above...
I still feel that Apache Spark is not the right approach for us.
I still feel that Apache Flink is a better fit for us, with both its Dataset API and the graph processing it affords with its Gelly API, not to mention Checkpoints and Savepoints. (And later we can have SQL support, since it has the Table API and SQL through Apache Calcite.)

https://flink.apache.org/flink-architecture.html
https://flink.apache.org/flink-applications.html
https://flink.apache.org/flink-operations.html

I truthfully feel you are a rockstar, Antonin, for pushing forward on this PoC, and it is useful in its own right, either to continue, or to evolve and adapt.
(btw, with Apache Flink you can even get nice generic type support built-in via Kryo... or you can have your wonderful user-defined types via TypeInfoFactory.  No waiting around for the Apache Spark team. ;-)



Antonin Delpeuch (lists)

Apr 26, 2020, 5:49:56 PM
to openref...@googlegroups.com
Hi Thad,

Of course the word "record" is used in a lot of database systems - but that
does not mean they have anything in common.

Flink could be interesting in the future to run OpenRefine workflows on
streams (which I would be keen to have), but that is a significant step
away from what OR currently does. Aiming for a data model that is even
further away from what OpenRefine currently is would make a migration
even riskier. I don't think we have the time or resources to venture
into that right now. The leap by going to Spark is big enough already.
If we end up making the data processing backend pluggable then why not
develop a Flink implementation, but I do not think that should be a
priority.

I know you would be keen to "have SQL support", whatever that actually
means - it's easy to ask for some software to "support" some technology,
it's something else to come up with a meaningful integration that
actually helps people in their workflows. Remember Data Packages…

If you mean being able to run a SQL query on an OpenRefine project (or
multiple projects, perhaps, with joins between them), then this is
something we could do with most engines, including Spark (and again, the
availability of user defined types is not going to be a blocker for
that). But I am yet to be convinced that this is something worth
developing in OpenRefine: we need to have a clear vision of which user
workflows we are talking about. *Concrete* workflows :)

I know you don't like when I ask for something "concrete", but that is
what it really comes down to: you cannot plan migrations like this on
the basis of a sales pitch enumerating the technologies supported by
some platform. You need to have a look deep into the architecture to
understand how that platform could fit in and cater for our users' needs.

Antonin


On 26/04/2020 22:31, Thad Guidry wrote:
> I'm surprised that you said there is no equivalent to a record in any
> database system?
> Many databases have record support.
> What I think you mean is the concept of hierarchical fields (also simply
> known as related fields).
> In RDBMS, the use of a *row* structure as the group holding related
> fields together, forms a record. (flat records)
> In Document systems, the use of a *document* structure as the group
> holding related fields together, forms an individual document record,
> like in MongoDB and CouchDB and Elasticsearch.
> In Columnar systems, the use of a *column* itself forms a continuous
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html>
> via TypeInfoFactory.  No waiting around for Apache Spark team. ;-)
>
> Thad
> https://www.linkedin.com/in/thadguidry/
>
>
> --
> You received this message because you are subscribed to the Google
> Groups "OpenRefine Development" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to openrefine-de...@googlegroups.com
> <mailto:openrefine-de...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/openrefine-dev/CAChbWaML_zgmWgX4f%3Dzh%3Dtk5UfDXLjceXmst3vOpgL2aK7n5UQ%40mail.gmail.com
> <https://groups.google.com/d/msgid/openrefine-dev/CAChbWaML_zgmWgX4f%3Dzh%3Dtk5UfDXLjceXmst3vOpgL2aK7n5UQ%40mail.gmail.com?utm_medium=email&utm_source=footer>.

Thad Guidry

Apr 27, 2020, 8:47:02 AM
to openref...@googlegroups.com
On Sun, Apr 26, 2020 at 4:49 PM Antonin Delpeuch (lists) <li...@antonin.delpeuch.eu> wrote:
> Hi Thad,
>
> Of course the word "record" is used in a lot of database systems - but that
> does not mean they have anything in common.
>
> Flink could be interesting in the future to run OpenRefine workflows on
> streams (which I would be keen to have), but that is a significant step
> away from what OR currently does. Aiming for a data model that is even
> further away from what OpenRefine currently is would make a migration
> even riskier. I don't think we have the time or resources to venture
> into that right now. The leap by going to Spark is big enough already.
> If we end up making the data processing backend pluggable then why not
> develop a Flink implementation, but I do not think that should be a
> priority.

Flink supports batch processing, not only streams; you may not be aware of that.  Alibaba's contribution, known as Blink, has now been merged into Flink, and Flink is considered one of the fastest engines, used today by Ericsson, Capital One, etc.
But most of our users won't ever need to process incredibly large datasets, just large ones oftentimes.  There are several advantages, which I'll mention later.
I understand that the leap and the time and effort you are putting forth are already monumental for us, and I truly appreciate and stand in awe of this effort.  It does not go unnoticed.
I agree that making the data processing backend pluggable would afford the most flexibility, as Tom has mentioned.

> I know you would be keen to "have SQL support", whatever that actually
> means - it's easy to ask for some software to "support" some technology,
> it's something else to come up with a meaningful integration that
> actually helps people in their workflows. Remember Data Packages…


Data Packages was a Google push, not mine.
Google was active within the W3C in pushing standards like CSV on the Web, etc., which are semi-successful now; Google and others leverage some of that and offer services back, such as the very cool Google Dataset Search.
Many of our users use it to find additional data, wrangle it, and contribute back to efforts like Wikidata.


> If you mean being able to run a SQL query on an OpenRefine project (or
> multiple projects, perhaps, with joins between them), then this is
> something we could do with most engines, including Spark (and again, the
> availability of user defined types is not going to be a blocker for
> that). But I am yet to be convinced that this is something worth
> developing in OpenRefine: we need to have a clear vision of which user
> workflows we are talking about. *Concrete* workflows :)


Many of our users are already familiar with SQL; this is true in GLAM (where RDBMSs are traditionally used) and newsrooms (ProPublica, etc.).
I am not pushing for SQL usage in OpenRefine, but merely stating, as I have in the past, that it's just a "nice to have, but not required".
> I know you don't like when I ask for something "concrete", but that is
> what it really comes down to: you cannot plan migrations like this on
> the basis of a sales pitch enumerating the technologies supported by
> some platform. You need to have a look deep into the architecture to
> understand how that platform could fit in and cater for our users' needs.


Sorry I offended you somehow.  Perhaps you are hinting that I am wasting your time with indirections.  That was not my intent.
I am very cognizant of your time as well as others.
I have considerable time available now to spend in research, asking others within any community, and gathering feedback on any architectures.
I had looked quite deeply into Apache Flink, since I was already familiar with its capabilities, fitting use cases, and available APIs.

Just a few interesting areas of Flink that you might not be aware of:
Flink supports batch processing on bounded datasets, not only streams.  This is handled by both the DataSet API and the Table API & SQL.
The DataSet API has a plethora of dataset transformations that are useful to OpenRefine users and that correlate to its existing functions, some not found in Spark.
  • Notably, Records mode can be handled via Grouped DataSet transformations in Flink, where grouping is by user-defined functions and expressions.
  • Various keyed operations in OpenRefine, like cross() for instance, have a counterpart in Flink via CoGroup functions, which are even more powerful.
  • Deduplication can be handled by columns, expressions, or key values.
  • Some of OpenRefine's sequenced operations (intermediate steps that first filter large data, then finally apply a transformation) also fit well:
    • These workflows can directly leverage Flink's GroupCombine on a Grouped DataSet, where the advantage over Spark is that memory management is automatic and no data exchange is needed between partitions.
Regards,

Thad Guidry

Apr 27, 2020, 9:24:33 AM
to openref...@googlegroups.com
Let me also share some of our past discussion and research before you joined OpenRefine.
Note the below thread was from 2017, prior to the Blink merge and additional niceties added to the DataSet API and Table API & SQL.



Antonin Delpeuch (lists)

Apr 27, 2020, 9:48:46 AM
to openref...@googlegroups.com
Hi Thad,

Thanks for the link to the previous discussion.

On 27/04/2020 14:46, Thad Guidry wrote:
> Flink supports batch processing, not only streams.

Yes, I am aware that Flink supports batch and stream processing.
Spark and Flink are very similar in many respects, see below.
> Many of our users are already familiar with SQL, this is true in GLAM
> (where traditionally RDBMS's are used) and Newsrooms (Propublica, etc.)
> I am not pushing for SQL usage in OpenRefine, but merely stating as I
> have in the past that its just a "nice to have, but not required".

You are very welcome to push for an idea but please can it be more than
"X support" or "X usage" where X is some technology? Forget about the
feature lists of big data frameworks. As a user, what do you want to do?

>
> Just a few interesting areas of Flink that you might not be aware of:
> Flink supports batch processing, on bounded datasets, not only streams. 
> This is handled both by the DataSet API
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/> as
> well as Table API & SQL
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/>.
> The DataSet API has a plethora of dataset transformations that are
> useful to OpenRefine users and that correlate to its existing functions,
> some not found in Spark.
>
> * Notably, Records mode can be handled via Grouped DataSet
> transformations
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/dataset_transformations.html#transformations-on-grouped-dataset>
> in Flink, where grouping is by user-defined functions and expressions.

Initially, I was also planning to implement the records mode via Spark's
grouping functions (which are very similar - I don't see anything that
cannot be done in Spark in your link).

The problem with this approach is that it is very inefficient, because
going through a generic grouping API will forget about the locality of
records, which is crucial to optimize this computation. This will be
inefficient in Flink too.

Imagine you have a function which computes for each row its record id,
and then group rows together using this function. The algorithm which
will do this grouping is not aware that rows belonging to the same
record are consecutive, so it will work very hard to make sure that if a
row at the beginning of the dataset has the same record id as one at the
end (potentially stored in different partitions), they will be reunited.

The solution I have in Spark could probably be implemented in Flink too,
I suspect, although I haven't checked that.
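
To make the cost concrete, the naive route through the generic grouping API would look roughly like this (a hypothetical sketch, not code from the branch):

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import scala.Tuple2;

    public class NaiveRecordGrouping {
        // Placeholder for the function described above: it assigns each row
        // the id of the record it belongs to.
        static long recordIdOf(String[] row) {
            throw new UnsupportedOperationException("illustrative placeholder");
        }

        static JavaPairRDD<Long, Iterable<String[]>> naiveRecords(JavaRDD<String[]> rows) {
            JavaPairRDD<Long, String[]> keyed =
                    rows.mapToPair(row -> new Tuple2<>(recordIdOf(row), row));
            // groupByKey does not know that rows sharing a record id are
            // consecutive in the same partition, so it shuffles the whole
            // dataset to reunite them.
            return keyed.groupByKey();
        }
    }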

> * Various keyed operations in OpenRefine like cross() for instance,
> have a corollary in Flink via CoGroup
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/dataset_transformations.html#cogroup>
> functions which are even more powerful.

Spark has that too, although it is definitely not how I plan to
implement cross.

> * deduplication
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/dataset_transformations.html#distinct>
> can be handled by columns, expressions, or key values.

Spark has that too.

> * Some of OpenRefine's sequenced operations (intermediate steps that
> need to be performed first that filter on large data, then finally
> apply a transformation).
> o These workflows can directly leverage Flink's GroupCombine on a
> Grouped DataSet where the advantage over Spark is that memory
> management is automatic, and no data exchange needed between
> partitions.

I do not understand what you mean here.

Anyway, the point is: given how close Spark and Flink are, I don't
really see the point of switching. Flink puts streams first, Spark puts
batches first, both can do both streams and batches, are both based on
Hadoop, and so on.

What I would really like is that you try to change perspective and think
more about the user point of view, because this is what you do best.
Leave the choice of technical solutions to the programmers. As a user,
what do you want?

Overall, my goal in terms of UX is to make the change of backend
unnoticeable for the user. OpenRefine is popular to work on small
datasets and we cannot compromise this use case.

So it is critical that the new data processing backend adds very little
overhead in terms of task scheduling and serialization. That is my
number one priority at the moment. Spark's RDD API feels like a good
pick for this since there is basically no optimization going on at this
level, compared to Flink, where there seems to be more indirection going on.

The problem with using a big data framework for small data is that all
the overheads of the framework become a lot more noticeable. So it's not
useful to look at feature lists and "Spark vs Flink benchmarks" or the
like - these comparisons are geared towards very different use cases.
Both of these frameworks will support large datasets, as they are
designed for that, but this is not the area where we should evaluate them.

That's exactly the point made here in the earlier thread:
http://mail-archives.apache.org/mod_mbox/flink-community/201706.mbox/%3cCAAdrtT1kJqqh_p8=-i-ChF4xLjYAdqCLU...@mail.gmail.com%3e

Antonin

Thad Guidry

Apr 27, 2020, 10:30:37 AM
to openref...@googlegroups.com
You have the confidence here that Spark's RDD API is agreeable for us, and I sense you are far enough along that we should continue.  Thanks for listening.

My 2 main usage problems with current architecture:

1. Joins - joining large datasets via custom functions/keys.  (imagine joining Freebase Property values with Wikidata's statements)
2. Records mode - schema-based relational table columns are not ideal for this. David's work on the Records overlay was just a stepping stone; it was where we stopped, but it wasn't our final destination.  For instance, we always envisioned we would have similar ways to produce and work with records, as we kind of had within Freebase.  Freebase's multi-value properties were amazing to work with (document-model-like structures built on graph edges), and our idea, although not realized by him and me but still in the back of my mind, was being able to work with data in that same fashion (schema-less), easily generating records from the data via new GREL syntax or smart menu options, as well as querying records without having to think about joins.  Read more here:  Freebase multi-value properties (https://developers.google.com/freebase/guide/basic_concepts#more_properties)


Thad Guidry

Apr 27, 2020, 10:44:02 AM
to openref...@googlegroups.com
Which is why I thought of Flink, with its Gelly API: it brings the vision of Records (really just relations) which can be pushed into a graph structure, or a document store like Elasticsearch, as opposed to continuing with a tabular structure with IDs across RDDs, as you are thinking.

But perhaps that vision for Records mode, flexible schema (schema-less) can still be done with Spark, or a different data processing backend later.


Antonin Delpeuch (lists)

Apr 27, 2020, 12:37:13 PM
to openref...@googlegroups.com
Hi Thad,

Yes, it would be great to make it easier to join two projects. The cross
function is not ideal for that. But that being said it is not clear to
me what the replacement should look like.

As you know I am really not a fan of the records mode in its current
form. I am also very keen to find a replacement for it, but it is the
same problem here: I don't think we have identified a clear replacement
for it. We need to make progress in that thread:
https://groups.google.com/forum/#!searchin/openrefine/future|sort:date/openrefine/X9O8NBC1UKQ

For now, I am trying to make incremental changes: first, migrate to
Spark while keeping the existing functionality, then, figure out how we
could improve over things like cross and records.

If I attempt to solve all these problems at once this is guaranteed to
fail in a pretty spectacular way.

Antonin

Thad Guidry

Apr 27, 2020, 12:51:57 PM
to openref...@googlegroups.com
Sure, I understand the iterative step-by-step approach for our migration.  And agree it lowers the risk for us.
I was just quickly responding to your concrete user workflow question.
Glad we also agree on the many improvements we want to make in those areas.

In regards to hierarchical data displays... I've been researching a few and should have a paper put together comparing/contrasting a few options for us.
Curious, have you yourself done any research in that area yet that you could throw up on a shareable doc? or not?


Antonin Delpeuch (lists)

Apr 27, 2020, 1:00:16 PM
to openref...@googlegroups.com
I haven't written up anything about this, no. It would be a useful thing
to do.

Antonin
> --
> You received this message because you are subscribed to the Google
> Groups "OpenRefine Development" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to openrefine-de...@googlegroups.com
> <mailto:openrefine-de...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/openrefine-dev/CAChbWaOm%2BagLvVcGQhcaLGo20aRKYe9f6J51d1W1gPmhGWGpoQ%40mail.gmail.com
> <https://groups.google.com/d/msgid/openrefine-dev/CAChbWaOm%2BagLvVcGQhcaLGo20aRKYe9f6J51d1W1gPmhGWGpoQ%40mail.gmail.com?utm_medium=email&utm_source=footer>.

Flavio Pompermaier

Apr 27, 2020, 2:45:50 PM
to openref...@googlegroups.com
Sorry for not being so helpful here, but unfortunately I don't have any time to follow the details of the evolution of this part.
I'm very interested in the evolution of this part because we would like to use OpenRefine in our big data projects as well.
I can just give some insights based on my experience, but beyond that I have to blindly trust the work you're carrying out.

Following the points of this discussion I can just give my point of view about some choices (I can be very helpful with Flink but much less with Spark):
  • Spark vs Flink: I'm a super fan of Flink (beyond OR of course) and I'm probably its oldest user (we have been using it at Okkam since 2012, when it was still Stratosphere). Btw, I have to agree with Antonin that AT THE MOMENT Spark is undoubtedly superior to Flink in the batch area. And this is even worse if you consider Machine Learning and AI. However, I can tell you that since Alibaba entered the project (about a year ago) the development of Flink has improved dramatically, and I think that by the end of this year it will compete with Spark on batch as well. Also for machine learning and AI there was a recent contribution by Alibaba that open sourced Alink [1] (its machine learning platform based on Flink), and I'm pretty sure that they will improve that part a lot as well, but not in the short term. The Table API and SQL support is also moving super fast in Flink at the moment.
  • Flink Gelly: Gelly is just a Flink library that introduces classes and operators to work with graphs. However, it's quite unused and I suggest avoiding it. Flink Stateful Functions[2] could be a better fit for graph processing (but this library is also at an early stage and thus immature/unstable for the moment). But I don't think that Gelly or Statefun are needed in OpenRefine (at least at the moment).
  • Flink and Python: Flink now has very good support for Python as a first-class citizen[3]. However, I haven't tried it yet, so I can't tell how performant their approach is.
  • Record vs Row model: actually I can't understand the problem here. Isn't Records just a group-by/key-by? Could you explain to me why this is a problem in Spark?
  • Flink DataSet vs Stream API: they are in a phase of deeply reworking those APIs. From what I know they are currently unifying the two in order to have a single API (DataStream), but with different optimizations depending on whether the datastream is finite (batch) or infinite (stream). This will come with the finalization of the merging of Blink. The batch API is not optimized at all at the moment (wrt Spark).
  • General approach: ok to go with Spark right now, but make it easy to switch execution engines. I think Dremio and Zeppelin have the best approach to this topic and can be taken as references.
  • Refine history to SQL: I think this would be a VERY interesting third-party project to make it easy to port OR projects to Flink or Spark SQL.


Antonin Delpeuch (lists)

Apr 27, 2020, 7:08:18 PM
to openref...@googlegroups.com
Hi Flavio,

Thanks for chiming in! Replying inline.

On 27/04/2020 20:45, Flavio Pompermaier wrote:
> * *Record vs Row model*: actually I can't understand the problem here.
> Isn't Records just a group-by/key-by? Could you explain me better
> why this is a problem in Spark?

It is not a problem anymore, I have found a nice way to do it, I think:
https://github.com/OpenRefine/OpenRefine/blob/spark-prototype/or-model/src/main/java/org/openrefine/model/rdd/RecordRDD.java

But I'd be curious to hear how you would do it efficiently with groupBy
/ keyBy - in general it seems impossible to avoid shuffles with that API.


> * *General approach*: ok to go with Spark right now but make it easy
> to switch the execution engines. I think Dremio and Zeppelin have
> the best approach to this topic and can be took as reference

Thanks, I'll have a look at those.

> * *Refine history to SQL*: I think this would be a VERY interesting
> third-party project to make it easy to port OR projects to Flink or
> Spark SQL

I do not think exporting a history to SQL is doable in general, and I am
not following this approach.

For instance, how would you implement the reconciliation operations? Or
even things like key-value columnize?
https://docs.openrefine.org/operations/key_value_columnize.html

As far as I can tell SQL is just not the tool for the job.

Antonin

Flavio Pompermaier

Apr 28, 2020, 5:54:03 AM
to openref...@googlegroups.com
If your problem is to avoid shuffling, then you have to use a smart custom partitioning strategy[1], like you did somehow in the RecordRDD (if I'm not mistaken).

Refine probably does a lot more than pure SQL, but maybe it doesn't; this is something I have always asked myself. Indeed, the SQL translation of the history is something that could be a nice experiment if someone is brave enough to take on the challenge: maybe it won't be 100% compatible, but you could read the history first and check whether that history is convertible or not.
I think most of the time it will be. But of course this is the kind of experiment that could be nice for a student or a person who wants to have fun in their spare time.

The key-value columnize is something I don't know, but from what I read maybe you can still translate it to SQL using an outer select? Like SELECT X.x, (some expression) AS Y FROM (SELECT * FROM XXX)?



Antonin Delpeuch (lists)

Apr 28, 2020, 6:12:07 AM
to openref...@googlegroups.com
On 28/04/2020 11:53, Flavio Pompermaier wrote:
> Probably Refine does a lot more than pure SQL but maybe it's not..this
> is something I always asked myself. Indeed, the SQL translation of the
> history is something that could be a nice experiment if someone is brave
> enough to engage this challenge: maybe it's not fully compatible at 100%
> but you could read the history first and check whether that history is
> convertible or not. 
> I think most of the times it will. But of course this is those kind of
> experiments that could be nice for a student or a person who wants to
> have fun in its spare time..

It's something that could be done outside of OR itself, just working on
the JSON history. Especially if you do not intend to have full support
for all operations.

In OpenRefine itself, the backend has to support all operations of
course. We cannot afford to throw away things like reconciliation :)

>
> The key-value columnize is something I don't know but from what I read
> maybe you can still translate to SQL using an external select? Like
> SELECT X.x, (some expression) AS Y FROM (SELECT * FROM XXX)?

It is a pivot-like operation and there are some ways to do things like
this in SQL, but they either rely on vendor-specific SQL statements
(PIVOT in Microsoft SQL Server, or in Spark…) or on statically
hard-coding the resulting column names in the query:
https://postgresql.verite.pro/blog/2018/06/19/crosstab-pivot.html

But even with these approaches, good luck with generating the records
structure when there are multiple values:
https://docs.openrefine.org/operations/key_value_columnize.html#entries-with-multiple-values-in-the-same-column

Perhaps it can be done, but I doubt this would be very efficient.

Antonin

Flavio Pompermaier

Apr 28, 2020, 6:28:30 AM
to openref...@googlegroups.com
Of course this SQL part would be a third-party project, an experimental extension.
It's something that could be interesting from a philosophical perspective, but it could also be useful to better understand what OR does that other SQL tools (like Dremio or Trifacta) can't.


Antoine Beaubien

May 7, 2020, 9:04:32 PM
to OpenRefine Development
Hi Thad and Antonin,

   While reading this interesting discussion on tools and our record mode, I have two questions:
  1. Do we plan at some point in the future to have more than one table in a project?
  2. Do we plan at some point in the future to be able to save « trees » in a project?
Regards,
   Antoine

Antonin Delpeuch (lists)

May 9, 2020, 12:31:40 PM
to openref...@googlegroups.com
Hi Antoine,


On 08/05/2020 03:04, Antoine Beaubien wrote:
> 1. Do we plan at some point in the future to have more than one table
> in a project?

It could be an interesting generalization of OR. It's not in our
milestones for this year, but if you have ideas about how this should
work feel free to expand :)

> 2. Do we plan at some point in the future to be able to save « trees »
> in a project?

Do you mean representing hierarchical information in a more faithful way
than the records mode? I would be very keen to have that. As you know we
have been discussing this before:
https://groups.google.com/forum/#!searchin/openrefine/future|sort:date/openrefine/X9O8NBC1UKQ

If that's not what you mean, just expand :)

Antonin


Antoine Beaubien

May 10, 2020, 12:59:45 AM
to OpenRefine Development
Hi Antonin, 
>>  2. Do we plan at some point in the future to be able to save « trees »
>>     in a project?
> Do you mean representing hierarchical information in a more faithful way
> than the records mode? I would be very keen to have that. As you know we
> have been discussing this before:
> https://groups.google.com/forum/#!searchin/openrefine/future|sort:date/openrefine/X9O8NBC1UKQ
> If that's not what you mean, just expand :)

The « records mode » enables us to have one level of hierarchy from a flat table. For that, I would like to be able, from the same table, to change which column is the « current » sub-hierarchy. That way, as an OR user, I could access all hierarchies from my flat table, just one at a time. That would also be good, but it's not what I meant.

I would like an OR project to be a collection of tables, trees, and all types of schemas, meaning: 1) WD schemas, of course, but also SQL schemas from external import (and maybe, one day, direct external export), and also INTERNAL schemas between the tables and trees inside the OR project. So a movies project could have a table for movies, a table for people, and a table for companies.

So, if an OR project could store trees (JSON, XML, or generic), then they could be used in transformations, either to extract data, or a small tree could serve as a template to generate subtrees used elsewhere (e.g. in templating export). These datasets could just be referenced like variables, e.g.
table["My-Table-1"].rows
or
tree["My-First-Tree"].branches

The idea is really to have a unity of cooperating datasets, with internal and external path declarations. So you could have an informational ecosystem with linked tables and trees inside your project, and outside connections to other datasets. It could be very useful, especially if OR becomes more widely used by the general public.

Regards,
   Antoine

Antonin Delpeuch (lists)

May 23, 2020, 8:19:06 AM
to openref...@googlegroups.com
Hi all,

Here are some updates about this work.

I have updated the spark-prototype branch with a working snapshot of the
prototype. I can create packaged snapshots if there is interest (but
there is not much difference with the previous one in terms of UX).

This month I have implemented the idea (suggested in this thread) of
making the datamodel implementation pluggable, so that we could easily
switch from Spark to something else. I am really happy about this move
because it does make things a lot cleaner:

- I think there is still a chance that we might not want to use Spark by
default. For small datasets, the overhead added by Spark in task
serialization and scheduling is significant compared to the actual data
processing time. Concretely you can experience that in the prototype, by
playing with the UI interactively: some operations are a bit less
reactive than in the 3.x branch. Because I have not spent a huge amount
of time optimizing the implementation so far, there might be ways to
reduce this overhead while keeping everything in Spark. But we need to
spend a lot of effort to do this optimization, and we only find out if
we are happy with the performance at the end of this optimization
effort. So that's obviously a risk for the project. Now the good thing
with this refactoring is that it gives us a simple escape route: we can
implement our own spark-like runner, which keeps the partitioning and
lazy evaluation of datasets but avoids the task serialization and
scheduling (because we can rely on the fact that everything is executed
in the same process). Such a runner would still offer nice scalability
guarantees (we can work on datasets which do not fit in RAM) without
compromising on the responsiveness of the tool on small datasets.

- It is possible to develop implementations of the data model for other
big data backends (Flink, or even Beam itself). These are worth
developing even if they incur significant overheads: they would just be
used to run OR recipes in these environments, rather than being used in
OR's backend when using the tool interactively.

- Testing is a lot easier. I have created a naive implementation of the
data model (which stores everything in RAM), and that is the
implementation that is used to run tests of operations. So we only have
to create a Spark instance to test the Spark implementation of the data
model. This means that tests on operations (or rather, changes) are
quick to run, too. There is also a common test suite that all
implementations of the data model can extend: this makes it even easier
to develop new implementations. If we want to be extra safe we could
also configure our test suite to run operation tests with both the
testing runner and the spark runner: that would for instance make sense
for larger integration tests. This would make sense because some bugs
only appear in a spark-specific context (the most common one being task
serialization).

Here is a quick overview of the architecture. The idea is very similar
to Apache Beam, but by implementing our own interface we can control the
API as we wish, which is important to leave space for optimizations and
cater for the peculiar aspects of our data model (such as the records mode).

The main interface is GridState, which represents a state of the grid at
some point in a recipe.
https://github.com/OpenRefine/OpenRefine/blob/spark-prototype/or-model/src/main/java/org/openrefine/model/GridState.java
For now, not many operations can be run on the grid: basically only maps
on rows or records. As I migrate more change classes, I will add more
methods to this interface (removing rows, reordering rows, performing
stateful maps on rows, and so on). The GridState is a rough analogue to
Spark RDDs or Beam PCollections: it is the basic representation of a
dataset, and we can run operations on them, which gives new collections.

The interface that represents an implementation of the datamodel is
DatamodelRunner, which is basically just a factory for GridState.
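
To give an idea of the shape of these interfaces, here is a heavily simplified sketch (illustrative only; the real code in or-model has richer and somewhat different signatures):

    import java.io.Serializable;
    import java.util.function.Function;

    interface Row extends Serializable { }  // stand-in for OpenRefine's row class

    interface GridState {
        long rowCount();

        // Lazily derive a new grid by mapping a function over the rows.
        GridState mapRows(Function<Row, Row> mapper);
    }

    // Essentially a factory for GridState instances, hiding whether they are
    // backed by Spark RDDs, an in-memory list, or something else.
    interface DatamodelRunner {
        GridState loadTextFile(String path);
    }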

Both of these interfaces are defined in the "or-model" Maven module,
which does not depend on Spark. Then, we have an "or-spark" module which
does depend on Spark and provides an implementation of these interfaces
that relies on Spark's RDDs. The Spark context is held by the
SparkDatamodelRunner.

The "or-testing" module also depends on "or-model" and implements two
things:
- a testing runner which does everything in RAM;
- a testing suite, which holds a collection of tests that all
implementations of the data model should satisfy. This suite is defined
as a main source file, not as a test file, so that other modules (such as
"or-spark") can rely on it in their own tests.

For now, Spark is still used by default in the tool: "or-spark" is added
as a runtime dependency of the "main" module, and the
SparkDatamodelLoader is dynamically loaded at startup. This is not
user-configurable currently, but it will be simple to expose that later:
one could imagine adding command-line options to the `refine` scripts
and executables to select which datamodel implementation to use (and
configure it, for instance to connect to an existing Spark cluster).

I have also migrated a more significant set of operations:
- split columns
- split multi-valued cells (as records)
- join multi-valued cells
- move columns
- reorder columns
- remove columns
- rename columns

These migrations were a bit slow to start with, because I have been
refining the GridState interface as I notice problems in it, and also
because Change classes are basically not tested, so I need to write
tests as I go along.

For the coming weeks, my plan is to migrate all the other changes, which
is going to require designing a mechanism to store change data (for
reconciliation, fetching urls, fetching data from reconciled values).
This must be done via the datamodel interfaces so that runners can load
the data in an appropriate way, making it easier to join it with project
data.

Let me know if anything is unclear or if you feel like this is not going
in the right direction!

Cheers,
Antonin

Thad Guidry

May 23, 2020, 9:21:15 AM
to openref...@googlegroups.com
> This month I have implemented the idea (suggested in this thread) of
> making the datamodel implementation pluggable, so that we could easily
> switch from Spark to something else. I am really happy about this move
> because it does make things a lot cleaner:


YEAHHHHHHHH!  HUGE! Congrats on this!

I'll find the time this weekend to explore the changes and test a bit.


Tom Morris

Jun 1, 2020, 5:45:53 PM
to openref...@googlegroups.com
This sounds like it's evolving in a good direction. I particularly like that it's not tied to Spark alone.

Is there anything that describes how facets interact with operations in the new design? One of the things that I found when I looked into this is that the current interface is awkward and hard to abstract, since it has to resolve to a set of rows that get operated on, whereas being able to express it as the equivalent of a SELECT clause would allow you to push the filtering down into the database engine.

Is there an analysis of the current operations with a breakdown of which ones are easy / hard / impossible to migrate to the new architecture? Now would be the time to drop any particularly awkward operations which inhibit an efficient backend architecture.

It looks like there's a package rename mixed in with the backend rearchitecture work. Was the rename done before everything else? If so, is there a tag or commit that can be used as a base to diff against?

I still haven't been able to look at this in great detail, but I'll keep poking at it as I find time.

Tom


Antonin Delpeuch (lists)

Jun 21, 2020, 8:43:20 AM
to openref...@googlegroups.com
Hi Tom,

I realize I have not replied to this! Very sorry about that.

On 01/06/2020 23:45, Tom Morris wrote:
> Is there anything that describes how facets interact with operations in
> the new design? One of the things that I found when I looked into this
> is that the current interface is awkward and hard to abstract since it
> has to resolve to a set of rows that get operated on when being able to
> express it as the equivalent of a SELECT clause would allow you to push
> the filtering down into the database engine.

Yes, the existing architecture is not suitable for the migration since
it relies on the visitor pattern, which does not fit with the API
exposed by Spark and other frameworks.

Here is a summary of the current architecture:

Facets declare a subclass of FacetState, which represents statistics
aggregated over rows. For instance, for a text facet, it stores the list
of values seen in a set of rows. They provide the following methods
around it:
- a method to update a facet state after processing one row
- a method to merge facet states gathered over disjoint parts of the grid
- an initial facet state (when no row has been seen)
This makes it possible to compute the facet statistics in parallel over
multiple partitions of the grid.

- the GridState interface offers aggregateRows and aggregateRecords
methods which compute the final state of an aggregation using the
methods above. It is at the discretion of the implementation which
strategy is used to do this aggregation. The Spark-based implementation
uses Spark's own aggregation API, which is basically identical.

That lets you compute facet states. This does not let you compute
approximate facet states yet: for now, we are scanning the entire table.
I plan to introduce an API for approximate aggregation once the first
migration is done: for instance, we could ask the backend to aggregate
facet statistics for up to N rows (N being configurable of course) and
report the statistics as percentages to the user.
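
As an illustration, the aggregation contract described above boils down to something like this (a simplified, hypothetical sketch; the real FacetState and aggregation methods have a different exact shape):

    import java.io.Serializable;
    import java.util.HashMap;

    // Placeholder for OpenRefine's row class; cellValue(...) is made up.
    interface Row { String cellValue(int cellIndex); }

    // The three ingredients: an initial state, a way to fold one row into a
    // state, and a way to merge states computed on disjoint chunks of the grid.
    interface RowAggregator<S extends Serializable> {
        S initialState();
        S withRow(S state, long rowIndex, Row row);
        S merge(S stateA, S stateB);
    }

    // Example: the state of a text facet is a map from cell value to count.
    class TextFacetAggregator implements RowAggregator<HashMap<String, Long>> {
        private final int cellIndex;

        TextFacetAggregator(int cellIndex) {
            this.cellIndex = cellIndex;
        }

        public HashMap<String, Long> initialState() {
            return new HashMap<>();
        }

        public HashMap<String, Long> withRow(HashMap<String, Long> state, long rowIndex, Row row) {
            state.merge(row.cellValue(cellIndex), 1L, Long::sum);
            return state;
        }

        public HashMap<String, Long> merge(HashMap<String, Long> a, HashMap<String, Long> b) {
            b.forEach((value, count) -> a.merge(value, count, Long::sum));
            return a;
        }
    }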

>
> Is there an analysis of the current operations with a breakdown of which
> ones are easy / hard / impossible to migrate to the new architecture?
> Now would be the time to drop any particularly awkward operations which
> inhibit an efficient backend architecture.

My goal is to keep all operations. The inefficient ones will still be
around, even if they will remain inefficient: for most users who deal
with small datasets, inefficiency is not a concern and I would not want
to reduce the set of features we offer to them.

For instance, consider the "key-value columnize" transform (or in fact
any transpose operation we offer). The logic of those operations are
quite complex, and there is no clear way how to formulate this with the
Spark API.

This month I have been migrating those, simply by keeping the existing
code (modulo bureaucratic refactorings). That means the operation is run
on a single node (the "executor" in Spark terms), not parallelized at
all, as inefficient as it is in 3.x.

By having a close look at these operations, of course I have ideas of
things I would like to improve there (or have doubts about their general
usefulness). But for now I want to keep the scope of my changes small: I
do not change the behaviour of operations, I only migrate them to the
new architecture.

More broadly this is even true for the records mode: it would have been
nice to get rid of it and replace it by a better way to represent
hierarchical data, before doing this migration. But we do not have a
good replacement solution for it as far as I know, and I would not
attempt to do that in the same go either.

My current goal is to migrate all existing functionality to the new
architecture, to get to a state where we have the same feature set and
comparable performance for small datasets.

>
> It looks like there's a package rename mixed in with the backend
> rearchitecture work. Was the rename done before everything else? If so,
> is there a tag or commit that can be used as a base to diff against?

Yes, the "4.x" branch has the package rename (and maven module
reorganization) without the Spark migration.

https://github.com/OpenRefine/OpenRefine/tree/4.x

Antonin

Tom Morris

Jun 21, 2020, 6:09:57 PM
to openref...@googlegroups.com
On Sun, Jun 21, 2020 at 8:43 AM Antonin Delpeuch (lists) <li...@antonin.delpeuch.eu> wrote:

> I realize I have not replied to this! Very sorry about that.

No worries. When I went to look at the thread, I noticed that I had a half-finished draft from last week that I never sent. It kind of rehashes some of what you covered, but I'll include it here as a starting point anyway:

Rereading some of the earlier discussion from the end of April, the details of which I kind of skimmed over in the flurry of discussion, I wanted to provide feedback on some of the perceived design constraints.
I don't take it as a given that functionality and form need to be preserved 100%. In particular, when there's a "nothing else in the world operates quite this way" moment, I take that as a sign to take a step back and consider if OpenRefine should do it differently too. As an example, Fill Down and Blank Down, which require row state, exist principally as an artifact of how we create row groups aka "records". We could provide the same functionality by designating certain columns to be the grouping key (currently leading columns) and, instead of using blank/null as the flag, say that any rows with the same set of values for those cells are in the same group/record. In the same vein, we should look at records with an eye towards what people use them for rather than as a literal implementation which can't be changed.
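
A rough sketch of that alternative grouping rule, just to make the idea concrete (hypothetical code, not tied to the current implementation): rows sharing the same values in the designated key columns fall into the same group, with no blank/fill-down bookkeeping needed.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    class KeyColumnGrouping {
        // Group rows (here just their cells, as String[]) by the values found
        // in the designated key columns.
        static Map<List<String>, List<String[]>> group(List<String[]> rows, int[] keyColumns) {
            Map<List<String>, List<String[]>> groups = new LinkedHashMap<>();
            for (String[] row : rows) {
                List<String> key = new ArrayList<>();
                for (int c : keyColumns) {
                    key.add(row[c]);
                }
                groups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
            }
            return groups;
        }
    }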
 
From an internal architecture point of view, rather than having facets create a filtered row set to operate on, we could have them generate a filter expression (a la SQL WHERE) that we can use to generate statistics and combine with other facets to generate sets of rows to operate on. It's been years since I looked at it, but as I remember MassEdit was a candidate. It's kind of a generic catch-all that's used to implement a bunch of stuff.

So, picking up from there, someone (Flavio?) in one of the earlier discussions mentioned the idea of using a language as the decoupling mechanism. That really resonates with me, because it's guaranteed to be low bandwidth and really enforces a disciplined separation. It might be harder than using an API directly, but I think it could potentially yield benefits.

Your description below focuses on the statistical / visualization aspect of facets, which requires crunching a lot of data (all currently) to generate a summary to visualize. Going the other direction, though, where the facets are used as a selection mechanism, the bandwidth required is potentially much much less. A numeric facet with hundreds of histogram buckets boils down to just a min and max value on the selection side. Ditto Timeline facets. Text selections are value IN ['choice1', 'choice2'], etc.
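
To make the selection side concrete, here is a minimal sketch in plain Java (all names invented for the example, nothing from the actual codebase) of facet selections expressed as tiny filter expressions that can be shipped to a backend and combined like a WHERE clause:

import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: facet selections as small filter expressions
// (a numeric range, a set of choices) combined into one row selection.
public class FacetSelectionSketch {

    // numeric facet selection: just a min and a max
    static Predicate<Double> numericRange(double min, double max) {
        return v -> v >= min && v <= max;
    }

    // text facet selection: value IN ('choice1', 'choice2', ...)
    static Predicate<String> inChoices(List<String> choices) {
        return choices::contains;
    }

    public static void main(String[] args) {
        Predicate<Double> yearFacet = numericRange(1990, 2000);
        Predicate<String> countryFacet = inChoices(List.of("France", "Italy"));

        // Combining facets is just a conjunction of the individual selections;
        // only this small description needs to travel to the backend.
        System.out.println(yearFacet.test(1995.0) && countryFacet.test("France")); // true
    }
}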

At scale, the workflows are probably going to be very different as well. It won't make sense for a single user to try to "Cluster & Edit" millions of rows by themselves. I think that's one of the things MassEdit is used for, so reimagining that workflow could get rid of one of the needs for it.

I'll continue to try and familiarize myself with the prototype, but wanted to provide that high level feedback. Ideally everything would magically port over with no losses, but I'd rather end up with something useful that's incompatible than something that's 100% compatible, but crippled in how far it can scale, computationally and humanly, requiring more breaking changes later.

Tom

Antonin Delpeuch (lists)

unread,
Jun 21, 2020, 11:47:44 PM6/21/20
to openref...@googlegroups.com
This is likely to require us to drop quite a few operations, the records
mode, and probably many other things in GREL… If you go down that route
I think it would deserve to be a different codebase, so a different
project. This is because both versions of the codebase will need to be
maintained for a very long time (if OpenRefine 3.x remains your go-to
tool for small datasets and you only use OpenRefine 4.x for big datasets).

I think it is really possible to have a 4.x branch which does not
compromise on the usefulness of the tool at small scale, but makes it
possible to scale the workflows which only use scalable operations, and
also reduces the memory footprint of the tool for mid-sized datasets
(say 100k rows).

>
> Your description below focuses on the statistical / visualization aspect
> of facets, which requires crunching a lot of data (all currently) to
> generate a summary to visualize. Going the other direction, though,
> where the facets are used as a selection mechanism, the bandwidth
> required is potentially much much less. A numeric facet with hundreds of
> histogram buckets boils down to just a min and max value on the
> selection side. Ditto Timeline facets. Text selections are value IN
> ['choice1', 'choice2'], etc.

That is absolutely true, and this is exactly how the prototype works to
do the filtering. In fact, if you use it to run a workflow in a Spark
instance (instead of being the underlying backend of the web app), no
aggregations will be run since we do not need to compute facet states in
that context (except when the user explicitly relies on them with the
`facetCount` GREL function for instance).

This is not easy to see with the current prototype, because we compute
facet states and displayed rows at the same time in the UI (so you will
see Spark running aggregation jobs every time you change the grid while
facets are present), but it should become clearer when executing OR
workflows outside of the web app.

>
> At scale, the workflows are probably going to be very different as well.
> It won't make sense for a single user to try to "Cluster & Edit"
> millions of rows by themselves. I think that's one of the things
> MassEdit is used for, so reimagining that workflow could get rid of one
> of the needs for it.

At the moment, our cluster and edit functionality is represented in
workflows as a mass edit operation, which is indeed perfectly scalable.
So if you have a workflow that was worked out on a small dataset using
cluster and edit, it will run fine on a larger version of the same
dataset (but that larger dataset might contain new values that were not
reported by the clustering tool, which will therefore be left untouched
by your operation).
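
To illustrate why that is scalable, here is a minimal sketch (invented names, not the actual MassEditOperation): the clustering UI produces an explicit old-value to new-value mapping, and applying it is an independent per-row lookup, which parallelizes trivially. Values missing from the mapping are left untouched.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of a mass edit: an explicit old-value -> new-value
// mapping produced by Cluster & Edit, applied as a per-cell lookup.
public class MassEditSketch {
    public static void main(String[] args) {
        Map<String, String> edits = Map.of(
                "Köln", "Cologne",
                "Koeln", "Cologne");

        List<String> column = List.of("Köln", "Paris", "Koeln", "Milano");

        // Values not listed in the mapping (e.g. "Milano") are left untouched.
        List<String> edited = column.stream()
                .map(v -> edits.getOrDefault(v, v))
                .collect(Collectors.toList());

        System.out.println(edited); // [Cologne, Paris, Cologne, Milano]
    }
}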

I think people have already expressed the need to do it the other way
around in certain scenarios: the JSON workflow would contain the
clustering configuration, without explicitly listing the values that you
merge, and you would trust that this recipe works well in an open domain
(pretty much like you can do now with reconciliation if you trust the
scoring). It is something we could support too.

>
> I'll continue to try and familiarize myself with the prototype, but
> wanted to provide that high level feedback. Ideally everything would
> magically port over with no losses, but I'd rather end up with something
> useful that's incompatible than something that's 100% compatible, but
> crippled in how far it can scale, computationally and humanly, requiring
> more breaking changes later.

I think sticking with the current feature set is a good guarantee of
remaining useful: this is the tool that is currently used, and it is
reasonably popular, I would say.

I personally do not think I have a mandate to get rid of fundamental
aspects of the tool like the records mode without a tried and tested
alternative. I would not call the tool "OpenRefine" anymore after that.
Not that I like the records mode at all (I would love to be able to do
without it). I am really worried that such a version would not be
adopted by our users and would simply end up gathering dust, hurting
development by keeping two very different versions of the tool in
separate branches over an extended period.

But it is definitely an interesting exercise to list all the operations
we have and try to think how they would be formulated in SQL (not sure
if there are other languages you would consider as potential
interfaces?). For me, this would mean removing:
- the records mode
- blank down, fill down
- all three transpose operations
- split and join multi-valued cells
- the ability to store cells of different types in the same column,
probably?
- reconciliation and all related operations? (you could try encoding
reconciled cells as JSON blobs, but then manipulating that is going to
be quite painful I guess)
- custom expression language support would need to be restricted to
whatever is supported by the SQL backend (which is vendor-specific as
far as I am aware - I remember running some Python in Postgres, but
surely that does not work in the same way everywhere). And it would
probably force us to redesign some aspects of GREL too.
- probably other things I cannot think of at the moment!

All that being said: I am very keen to make progress on the discussion
around the records mode to identify its successor (and have similar
discussions on other aspects of the tool that we want to revamp). It
just requires a lot of design effort that we have not been able to put
in so far.

Antonin

Thad Guidry

unread,
Jun 22, 2020, 9:07:18 AM6/22/20
to openref...@googlegroups.com
Antonin,

SQL
I don't think all of that would necessarily need to be removed actually.

Since version 9.5 (it is now at version 12), Postgres offers a lot for working with data, built in:

JSON (and fast as binary jsonb with caveats),
Records and Recordsets,
Pivoting, Transpose with Crosstab and Crosstabview,
and more.

Regarding reconciliation, it's quite easy in Postgres to manipulate JSON with some now pretty powerful functions and to query it.
Are you thinking that manipulating it would be hard in Java, even when using a JDBC driver?

Fill down and Blank down
are just means to an end (often to help with joining via cross() later).
But different folks have different needs, and the fill down / blank down operations that help with records creation do not always get used for that purpose; instead, users are sometimes concerned with producing some output format for compatibility with another tool once they export.
Anyway, correlated subqueries can be used in SQL to reproduce fill down or blank down, if we want to retain that for users.
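
To make that concrete, here is a rough illustration, assuming a made-up table project(row_id, city): a fill down expressed as a correlated subquery in Postgres, run from Java over JDBC. The connection details and names are placeholders, not a proposed schema.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical illustration: "fill down" on a column expressed as a correlated
// subquery in Postgres, queried from Java through JDBC.
public class FillDownSketch {
    public static void main(String[] args) throws SQLException {
        // For each row, keep the city if present, otherwise take the most
        // recent non-null city from an earlier row.
        String sql =
            "SELECT row_id, " +
            "       COALESCE(city, " +
            "                (SELECT p2.city FROM project p2 " +
            "                 WHERE p2.row_id < p.row_id AND p2.city IS NOT NULL " +
            "                 ORDER BY p2.row_id DESC LIMIT 1)) AS city_filled " +
            "FROM project p " +
            "ORDER BY row_id";

        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost/openrefine_sketch", "user", "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                System.out.println(rs.getLong("row_id") + "\t" + rs.getString("city_filled"));
            }
        }
    }
}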



All that being said: I am very keen to make progress on the discussion
around the records mode to identify its successor (and have similar
discussions on other aspects of the tool that we want to revamp). It
just requires a lot of design effort that we have not been able to put
in so far.

Antonin


Is it the UI itself and how to give users controls on visualizing, transforming, querying records that you struggle with here?
Or is it the internals and data models?

Antonin Delpeuch (lists)

unread,
Jun 22, 2020, 12:11:00 PM6/22/20
to openref...@googlegroups.com
On 22/06/2020 15:07, Thad Guidry wrote:
> Antonin,
>
> *SQL *
> I don't think all of that would necessarily need to be removed actually.

Great! I am easy to convince: just cook up a SQL schema to represent an
OpenRefine project, and then give some sample SQL queries which
represent the operations I listed.

>
> Is it the UI itself and how to give users controls on visualizing,
> transforming, querying records that you struggle with here?
> Or is it the internals and data models?

The records mode is fundamentally a data model problem. The question is:
how should users represent hierarchical data in OpenRefine projects? For
instance, how could workflows like this one be done without the records
mode:
https://www.wikidata.org/wiki/Wikidata:Tools/OpenRefine/Editing/Tutorials/Working_with_APIs

Antonin

Thad Guidry

unread,
Jun 22, 2020, 1:58:49 PM6/22/20
to openref...@googlegroups.com
Great! I am easy to convince: just cook up a SQL schema to represent an
OpenRefine project, and then give some sample SQL queries which
represent the operations I listed.


Sorry, I'm having "an episode", Antonin, and my points are getting missed because I need to slow down and take the time to explain more concretely.
Postgres offers a lot for working with schema-less data now, but SQL operations (or even Postgres JSON functions) might not be the best way to handle data transformations the way OpenRefine can.
But let me explain further.

>
> Is it the UI itself and how to give users controls on visualizing,
> transforming, querying records that you struggle with here?
> Or is it the internals and data models?

The records mode is fundamentally a data model problem. The question is:
how should users represent hierarchical data in OpenRefine projects? For
instance, how could workflows like this one be done without the records
mode:
https://www.wikidata.org/wiki/Wikidata:Tools/OpenRefine/Editing/Tutorials/Working_with_APIs

Antonin


For OpenRefine, we typically do not think in terms of schema; that's too rigid. OpenRefine's operations and data model were actually originally designed with schema-less "ideas", even though we never said the words "schema-less".
The original thought was that schema didn't have to be dealt with much until export time (and maybe a little at import time).
See the original Gridworks code and issues around Freebase schema skeleton alignment (flexible data model --> rigid data model), etc., and some of David's papers on web data extraction and flexible data modeling and presentation, such as the snippet below. (I would also encourage you to read the related-work section of his thesis if you haven't seen it already; a lot of it inspired various things in OpenRefine's data model and records mode.)

Data publishing technologies can be compared along two orthogonal dimensions: flexibility of data modeling and flexibility of presentation. The former is a spectrum ranging from rigid schemas that cannot be changed to general data models that can fit any information. The latter ranges from predetermined layouts and styles to completely customizable presentations. Figure 2.1 lays out this space and frames the discussion that follows.

Some of David's other inspirations came from chats with me: he knew that I had a background in enterprise data architecture (ETL, database tech) and library work that dealt with completely different sets of data problems that David often didn't have exposure to, and he and I often shared experiences around data modeling, use cases, and trying to get weird messy data of various forms into Freebase much more easily.

In my world, in the enterprise, dealing with data the way that OpenRefine can is often handled with schema-less technologies such as document stores rather than table stores (both can deal with relations, however, and the approaches to relations differ, as you are well aware).

Elasticsearch is a popular one for schema-less, and supported with Hadoop and Spark for interaction.
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/float.html

MongoDB has some rough edges still to this day and I don't recommend it.

Postgres can do schema-less with JSONB, or JSON, or very roughly HStore.
My earlier mention on other threads of Apache Flink possibly being a good fit was in the context of Elasticsearch, where I have seen the two used together multiple times to handle nested and out-of-order streaming joins. I always thought that was powerful, but it might not always be needed for users of OpenRefine as its workflows currently exist. I now think, and agree with you, that Apache Flink has a place in ETL but perhaps not within OpenRefine-style processing.

If David and I were chatting about this today, knowing what we know and seeing what is available, I think we would definitely be looking at JSON in some form in order to deal with schema-less, flexible data modeling.
As for which technology, or combination of technologies, lends itself well to flexible data modeling and operations on those data models... I have my preference (Elasticsearch), much for the same reasons that Siren chose it (<-- important to read through, please) (David is an advisor there now), but we would need to rethink a richer UI in OpenRefine for flexible presentation.

Antonin Delpeuch (lists)

unread,
Jul 2, 2020, 8:45:19 AM7/2/20
to openref...@googlegroups.com
Hi all,

Here is a progress update on this. I have just pushed an up-to-date
snapshot of my refactoring in the "spark-prototype" branch (again,
force-pushed, because I keep rebasing the last commit which deletes the
parts that have not been migrated yet).

What is new this month:

1. Support for sorting

I have restored the sorting functionality by migrating it to the new
architecture. This means adding the relevant methods in the GridState
interface to:
- browse the grid using a sorting configuration rather than its natural
ordering
- reorder rows permanently (generating a new grid)

In terms of implementation with Spark, this is relatively
straightforward as it uses the sorting API for RDDs directly.
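
For reference, here is a rough, self-contained sketch of the "reorder rows permanently" case with the Spark Java API. The row ids and string cells are stand-ins for the real GridState types, and the "sorting configuration" here is simply the cell value itself.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

// Stand-in sketch: permanently reordering a grid keyed by row id, using the
// RDD sorting API, then assigning fresh row ids to make the new order permanent.
public class ReorderRowsSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("reorder-sketch").setMaster("local[*]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaPairRDD<Long, String> grid = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<>(0L, "banana"),
                    new Tuple2<>(1L, "apple"),
                    new Tuple2<>(2L, "cherry")));

            // Sort by the cell value, then re-index the rows in the new order.
            JavaPairRDD<Long, String> reordered = grid
                    .values()
                    .sortBy(value -> value, true, grid.getNumPartitions())
                    .zipWithIndex()
                    .mapToPair(t -> new Tuple2<>(t._2(), t._1()));

            reordered.collect().forEach(System.out::println);
        }
    }
}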

2. Migration of all operations and changes which do not fetch external data

That means all operations except:
- fetching URLs
- performing an initial reconciliation
- extending data based on reconciled values
- support for certain GREL functions (cross, facetCount) which fetch
data outside their evaluation context

The new supported operations are:
- fill down
- blank down
- reorder rows
- remove rows
- transform column
- add column based on this column
- judge one cell
- judge similar cells
- clear recon for one cell
- clear recon for similar cells
- mark cells as new
- clear recon data for cells
- copy recon data across columns
- match each cell to its best candidate
- match each cell to a specific candidate
- reconcile by using values as identifiers
- star / flag one row
- star / flag filtered rows

The majority of these operations are simple maps on rows, which did not
require extending the GridState interface. Even though the migration was
conceptually easy, it took some time since most of these operations did
not have any tests associated with them, so I had to write tests as I
migrated the code.

Another side effect of the migration is the conversion of the datamodel
classes to be immutable (mainly Row, Cell, Recon, ReconCandidate), since
this is required by the new functional architecture. This is widely
recognized as good practice in Java in general. It also incurred a
migration effort, to move away from effectful code to functional code.
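
As a small illustration of both points (row-wise operations as pure maps, and an immutable data model), here is a sketch with invented classes; the real Row and Cell classes are richer than this.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical immutable row: "modifying" a cell returns a new row instead of
// mutating in place, which is what makes the row-wise map safe to parallelize.
final class RowSketch {
    private final List<String> cells;

    RowSketch(List<String> cells) {
        this.cells = Collections.unmodifiableList(new ArrayList<>(cells));
    }

    String getCell(int index) {
        return cells.get(index);
    }

    RowSketch withCell(int index, String newValue) {
        List<String> copy = new ArrayList<>(cells);
        copy.set(index, newValue);
        return new RowSketch(copy);
    }
}

public class TransformColumnSketch {
    // A "transform column" operation expressed as a pure map over rows.
    static List<RowSketch> transformColumn(List<RowSketch> grid, int column,
                                           Function<String, String> transform) {
        return grid.stream()
                .map(row -> row.withCell(column, transform.apply(row.getCell(column))))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<RowSketch> grid = List.of(
                new RowSketch(List.of(" Alice ", "42")),
                new RowSketch(List.of(" Bob ", "17")));
        List<RowSketch> trimmed = transformColumn(grid, 0, String::trim);
        System.out.println(trimmed.get(0).getCell(0)); // "Alice"
    }
}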

3. Planning for operations which rely on external data

I have not migrated these yet because I want to get the architecture
right, in a way that allows for multiple use cases. I am not trying to
implement all of these use cases at once, but only to come up with an
architecture which is broadly compatible with all of them (in spirit, at
least):

- as an OpenRefine user, I initially want the same experience as in 3.x:
fetching external data is done only once, when the operation is first
applied. For instance, if I add a column by fetching URLs, then undo
this change, then redo it, I do not want to have to wait for the URLs to
fetch again: those results are already stored locally. Similarly, if I
use the cross function in one of my operations, then I delete the
project which was used by the cross function, I do not want this to
corrupt the data of the project in which I used the expression. This
behaviour is natural with the previous architecture, where all computed
values are persisted in change data, but not in the new architecture
which relies on lazy evaluation, so an explicit mechanism to handle
these scenarios must be introduced.

- when running an OpenRefine workflow in a pipeline (for instance as a
Spark job), I want external data to be fetched in a pipelined way: for
instance, if my workflow consists of reconciliation followed by data
extension, I expect that the corresponding HTTP requests will be done in
near-parallel: the first data extension request will not have to wait
for the last reconciliation request.

- similarly, if we implement runners for stream-based engines (such as
Flink), we could potentially run OpenRefine workflows on streams (such
as Kafka sources). Not all operations make sense in this setting (for
instance, reordering rows using a sorting criterion is not possible
without adding a windowing strategy), but this could still be useful
down the line. In this setting, if I am fetching URLs, I obviously want
these to be fetched as the stream is consumed.

- in the near future, we also want to be able to persist partial URL
fetching results (or reconciliation results) such that we can recover
from a crash during a long-running operation more efficiently, and also
let the user inspect partial results before the operation completes
(typically, for reconciliation: being able to inspect the first few
reconciliation candidates that are returned, and potentially start
judging them before all other candidates are fetched).

Do you agree with these use cases? Are there others I should keep in mind?

The architecture I am planning to go for, at this stage, is to introduce
a class to sit between the Change and the external data source, which
acts as a cache that can be persisted in the HistoryEntry. The main
question is how tightly this should be tied to the datamodel interface
(which is then implemented by Spark or other providers).

If we have stored the result of a URL-fetching operation on disk and we
want to compute the state of the project after that, we essentially need
to do a join between the previous state and the fetched URLs. For this
join to be efficient, it really helps if the fetched URLs are stored as
a Spark RDD that is co-partitioned with the grid (i.e. also ordered by
row id and split into partitions with the same boundaries). This should
ideally avoid any shuffle and match the current performance on small
datasets. And of course this needs to be independent of Spark.
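
To sketch what that join could look like with the Spark Java API (stand-in types, not the real classes): the description above is really about range-style co-partitioning ordered by row id, but the example below just uses a HashPartitioner to demonstrate the shared-partitioner idea that keeps the join narrow.

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

// Stand-in sketch: joining a grid (row id -> row) with fetched change data
// (row id -> fetched value). With a shared partitioner the join avoids a shuffle.
public class ChangeDataJoinSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("join-sketch").setMaster("local[*]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            HashPartitioner partitioner = new HashPartitioner(4);

            JavaPairRDD<Long, String> grid = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<>(0L, "https://example.org/a"),
                    new Tuple2<>(1L, "https://example.org/b"))).partitionBy(partitioner);

            JavaPairRDD<Long, String> fetched = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<>(0L, "<html>A</html>"),
                    new Tuple2<>(1L, "<html>B</html>"))).partitionBy(partitioner);

            // Same partitioner on both sides: matching row ids live in matching
            // partitions, so this join does not need to shuffle data.
            JavaPairRDD<Long, String> joined = grid.join(fetched)
                    .mapValues(pair -> pair._1() + " -> " + pair._2());

            joined.collect().forEach(System.out::println);
        }
    }
}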

While I am still working on this architecture, my plan is to migrate the
remaining classes:
- facets
- exporters
- importers
- clustering
The goal is to reach a stage where I can merge the backlog of commits
that have accumulated on the master branch since I branched off to work
on the new architecture (this is going to be a lot of fun) and then stay
up to date by cherry-picking changes as they are merged in master.
Having a complete feature set soon will help run arbitrary workflows on
this branch, which is necessary for thorough debugging.

I am also seeking feedback from Spark experts about these design
choices, to make sure we have enough eyeballs on this migration.

Thad Guidry

unread,
Jul 2, 2020, 11:19:37 AM7/2/20
to openref...@googlegroups.com

- when running an OpenRefine workflow in a pipeline (for instance as a
Spark job), I want external data to be fetched in a pipelined way: for
instance, if my workflow consists of reconciliation followed by data
extension, I expect that the corresponding HTTP requests will be done in
near-parallel: the first data extension request will not have to wait
for the last reconciliation request.

This sounds similar to Apache NiFi's Process Group (in flow-based programming this is called a subnet, where processing occurs through nodes or stations that have dependencies on each other and are grouped together as a Process Group).
It might make sense to see a Process Group as a Spark Job, where each Process, in Spark terms, is more appropriately a task or an Executor (the node or station on an assembly line)?
From the Spark documentation, on working with parallelized collections:
The behavior of the above code is undefined, and may not work as intended. To execute Jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task’s closure. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor.
How do you envision Spark handling your 2-part process (reconciliation + data extension)? Would that be 2 tasks in a Job?

One thing that is very useful in Apache NiFi is the idea of buffering with back pressure (slowing down earlier Processors in the pipeline so that later Processors can catch up, i.e. slowing down reconciliation because data extension is even slower for whatever reason).
I often need back pressure available, and I am wondering whether you have thoughts about adding something like that and how it might affect the architecture strategy?


- similarly, if we implement runners for stream-based engines (such as
Flink), we could potentially run OpenRefine workflows on streams (such
as Kafka sources). Not all operations make sense in this setting (for
instance, reordering rows using a sorting criterion is not possible
without adding a windowing strategy), but this could still be useful
down the line. In this setting, if I am fetching URLs, I obviously want
these to be fetched as the stream is consumed.

- in the near future, we also want to be able to persist partial URL
fetching results (or reconciliation results) such that we can recover
from a crash during a long-running operation more efficiently, and also
let the user inspect partial results before the operation completes
(typically, for reconciliation: being able to inspect the first few
reconciliation candidates that are returned, and potentially start
judging them before all other candidates are fetched).

Do you agree with these use cases? Are there others I should keep in mind?


Appending rows to an existing OpenRefine project?  I often have this need, as do other users: new data is already aligned to existing data in an OpenRefine project, and we want to analyze and transform those aligned datasets as one large dataset or collection within that project.  I collect data from lots of disparate places around the internet, constantly aligning it and adding to a growing dataset.  In that context, I want to use a single OpenRefine project (not many separate projects): import additional rows of aligned data, reconcile the new rows, do other things for a week, open that project back up the following week, append some new rows into that same project, reconcile only those new rows, and transform some of them (filter, then transform some values).

With Spark, I imagine this can be handled in various ways.
1. UnionRDD? (see the sketch below)
2. Treat the new data as a child RDD of the parent RDD?  Do Spark's sortByKey() and then zip and zipPartitions apply here or not?  If we checkpoint the RDD, then references to its parent RDDs are removed; what then? Maybe we don't want checkpointing at all in this case: https://stackoverflow.com/questions/35127720/what-is-the-difference-between-spark-checkpoint-and-persist-to-a-disk
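
Here is a minimal sketch of option 1 with the Spark Java API (stand-in types): union the old and new rows, then re-index the row ids so the appended rows come after the existing ones.

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Stand-in sketch of option 1: append new rows to an existing grid with
// union(), then assign fresh, contiguous row ids with zipWithIndex().
// Existing rows keep their relative order; new rows are indexed after them.
public class AppendRowsSketch {
    static JavaPairRDD<Long, String> appendRows(JavaPairRDD<Long, String> existing,
                                                JavaRDD<String> newRows) {
        JavaRDD<String> combined = existing.values().union(newRows);
        return combined.zipWithIndex()
                       .mapToPair(t -> new Tuple2<>(t._2(), t._1()));
    }
}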

So, I'm unsure of the best approach in Spark for that use case in issue #715 and hope you have a way to solve that use case for me and them.
Quote from that issue:
I would like to be able to load a new Excel/CSV file into the existing OpenRefine dataset and do cleaning on it. Ideally, this new dataset would come in as new rows that I can then start manipulating.
 
 
The goal is to reach a stage where I can merge the backlog of commits
that have accumulated on the master branch since I branched off to work
on the new architecture (this is going to be a lot of fun) and then stay
up to date by cherry-picking changes as they are merged in master.
Having a complete feature set soon will help run arbitrary workflows on
this branch, which is necessary for thorough debugging.


"fun" - I like your optimism here.  Don't lose that :-)

Thad Guidry

unread,
Jul 2, 2020, 11:31:46 AM7/2/20
to openref...@googlegroups.com
Actually thinking after I replied...
The appending rows problem... hmm, it technically doesn't have to be appended at all.  The need is just "doing stuff with multiple datasets that are aligned...at one time"
Imagine if we COULD keep the OpenRefine Projects separate and still have a way to put some into a Collection to do stuff across all of them as a whole at one time.
We could work with them separately or as a collection.
In Apache NiFi, that's how it works... each OpenRefine Project is essentially a Flowfile.

It would be nice to have OpenRefine 4.0 work similarly.
That way we can keep individual metadata and provenance about each OpenRefine Project, but still be able to treat several as a large Collection to extend and transform as a collective whole.


Antonin Delpeuch (lists)

unread,
Aug 8, 2020, 12:32:39 PM8/8/20
to openref...@googlegroups.com
I had a look at Apache NiFi but did not end up replying here; here are a
few thoughts:

On 02/07/2020 17:19, Thad Guidry wrote:
> One thing to note that is very useful in Apache NiFi is the idea of
> buffering with Back Pressure (slowing down previous Processors in the
> pipeline so that later Processor's can catch up, i.e. slowing down
> reconciliation because data extension is even slower for whatever reason).
> I often need back pressure available and I am wondering if you have
> thoughts to add something like that and how that might affect the
> architecture strategy?

As things stand, there is no need for back pressure in OpenRefine, since
we don't even have support for concurrent operations.

When running a workflow in a stream-based runner, that will become
relevant. With the current implementation I have, the back pressure will
be immediate (no buffer between consecutive operations). But it should
be doable to introduce some buffering (in Spark, this could be
implemented as an RDD which prefetches items from its parent up to a
certain buffer size). But I do not think this is a priority, since a lot
more work is needed to bring concurrent operations to the tool itself.

>
>
> - similarly, if we implement runners for stream-based engines (such as
> Flink), we could potentially run OpenRefine workflows on streams (such
> as Kafka sources). Not all operations make sense in this setting (for
> instance, reordering rows using a sorting criterion is not possible
> without adding a windowing strategy), but this could still be useful
> down the line. In this setting, if I am fetching URLs, I obviously want
> these to be fetched as the stream is consumed.
>
> - in the near future, we also want to be able to persist partial URL
> fetching results (or reconciliation results) such that we can recover
> from a crash during a long-running operation more efficiently, and also
> let the user inspect partial results before the operation completes
> (typically, for reconciliation: being able to inspect the first few
> reconciliation candidates that are returned, and potentially start
> judging them before all other candidates are fetched).
>
> Do you agree with these use cases? Are there others I should keep in
> mind?
>
>
> Appending rows to an existing OpenRefine project?

That is doable, but mostly independent of the change of architecture.
It is something you could implement with both the old and the new architecture.

Antonin

Thad Guidry

unread,
Aug 8, 2020, 12:38:26 PM8/8/20
to openref...@googlegroups.com
Antonin,

Thanks for looking into this and letting me know more.
Appreciate it!




Antonin Delpeuch (lists)

unread,
Aug 8, 2020, 2:52:31 PM8/8/20
to openref...@googlegroups.com
Hi all,

Here is a new update on this front.

I have finally found an architecture for long-running operations (URL
fetching, reconciliation, data extension) which I am reasonably happy with.

The goals were:
- to replicate the current behaviour of the tool despite the new
architecture based on lazy evaluation (data fetched in an operation is
fetched only once)
- to make it possible to run operations efficiently in the context of a
pipeline (for instance with Spark), where we want to break the barriers
created by the fetching and serialization of external data (see previous
message)

The proposed architecture is as follows.

We introduce a new interface "ChangeData", which represents the external
data fetched by an operation. This data is indexed by row ids. Just like
"GridState", the implementation of this interface is provided by the
pluggable data model implementation (which means that it can be
manipulated efficiently without loading everything in memory).

https://github.com/OpenRefine/OpenRefine/blob/spark-prototype/or-model/src/main/java/org/openrefine/model/changes/ChangeData.java

The API looks like this:
- given a GridState object, one can generate a ChangeData object on its
rows by mapping a function which can fetch external data (such as making
HTTP requests). Batching is supported (which is important for
reconciliation and data extension).
- given a GridState object and a ChangeData object, one can join the two
to obtain a new GridState, where the change data has been used to update
the grid (such as reconciling a column)

https://github.com/OpenRefine/OpenRefine/blob/spark-prototype/or-model/src/main/java/org/openrefine/model/GridState.java#L277-L299
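
To give an idea of the shape of this API, here is a hypothetical sketch with invented names; the actual signatures are in the GridState.java file linked above, so treat this only as an outline of the two halves described here.

// Illustrative interfaces only, not the real classes.
interface ChangeDataSketch<T> {
    T get(long rowId);   // the external data fetched for a given row, if any
}

interface RowFetcherSketch<R, T> {
    T fetch(long rowId, R row);          // e.g. perform an HTTP request for this row
}

interface RowJoinerSketch<R, T> {
    R join(R row, T changeDataValue);    // e.g. write the fetched or reconciled cell back
}

interface GridStateSketch<R> {
    // first half of the API: derive change data from the grid rows
    // (batching of the fetcher calls is elided here)
    <T> ChangeDataSketch<T> mapRows(RowFetcherSketch<R, T> fetcher);

    // second half: join the change data back in to obtain a new grid
    <T> GridStateSketch<R> join(ChangeDataSketch<T> changeData,
                                RowJoinerSketch<R, T> joiner);
}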

ChangeData objects can of course be persisted to disk and restored, with
an API similar to that of GridState.

In addition, projects are associated with a ChangeDataStore which holds
all the change data objects associated with operations in the project
history.

https://github.com/OpenRefine/OpenRefine/blob/spark-prototype/or-model/src/main/java/org/openrefine/model/changes/ChangeDataStore.java

This interface has two implementations, and we can choose between the
two to switch between the two behaviours stated in the goals above:
- the FileChangeDataStore persists all the change data it stores into
the project directory. This is the default one, used in the interactive
tool;
- the LazyChangeDataStore does not persist its change data objects.
Instead it holds them in a simple Map. This is used when running
operations in an external pipeline.

The implementation of a long-running operation follows a simple workflow:
1. In the process associated with the operation, create a ChangeData
object from the GridState of the project.
2. Then, register the change data in the store.
3. Finally, return a Change object which applies as follows:
 * load the ChangeData object back via the data store,
 * join it with the current grid,
 * return the modified grid.

https://github.com/OpenRefine/OpenRefine/blob/spark-prototype/main/src/main/java/org/openrefine/operations/column/ColumnAdditionByFetchingURLsOperation.java#L425-L469
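
Here is a condensed, hypothetical illustration of that three-step workflow, using plain Java maps as stand-ins for GridState, ChangeData and the store. Being map-based it is eager by construction; the laziness only comes with the real ChangeData implementations, so this is just to show the shape of the flow, not the actual classes linked above.

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative stand-in for the create / store / load-and-join workflow.
public class LongRunningOperationSketch {

    // the "store": history entry id -> fetched data (row id -> value)
    private final Map<Long, Map<Long, String>> store = new HashMap<>();

    public Map<Long, String> apply(Map<Long, String> grid,         // row id -> URL cell
                                   Function<String, String> fetch, // e.g. an HTTP GET
                                   long historyEntryId) {
        // 1. Create the change data from the grid by mapping the fetching function.
        Map<Long, String> changeData = new LinkedHashMap<>();
        grid.forEach((rowId, url) -> changeData.put(rowId, fetch.apply(url)));

        // 2. Register it in the store. A FileChangeDataStore would persist it to
        //    the project directory at this point; a LazyChangeDataStore would
        //    simply keep the (unevaluated) object in a map.
        store.put(historyEntryId, changeData);

        // 3. Load it back and join it with the grid to obtain the new grid state.
        Map<Long, String> fetched = store.get(historyEntryId);
        Map<Long, String> newGrid = new LinkedHashMap<>();
        grid.forEach((rowId, url) -> newGrid.put(rowId, url + " -> " + fetched.get(rowId)));
        return newGrid;
    }
}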

When using FileChangeDataStore, this workflow will fetch all the
external data at the second step (since it is required to persist the
data to disk).

When using LazyChangeDataStore, none of these steps actually require
fetching the external data: this fetching will only be triggered later
on when the grid state is evaluated. Those calls will be intertwined
with other external calls, in a streaming fashion.

This also leaves the door open to resumption of interrupted long-running
operations. If we add enough metadata to the serialization of change
data, then we should be able to start again the fetching process after
an incomplete attempt.

This architecture should also be compatible with running independent
operations concurrently. (By compatible I mean that it should not be
hard to make it evolve to support that - it is by no means supported as
of today).

The remaining steps are:
- add back support for progress reporting and cancellation. The latter
is a bit tricky to do properly because Spark does not offer an
asynchronous interface to save operations, making it hard to cancel
those tasks.
- migrate the remaining operations