New Spark-based prototype


Antonin Delpeuch (lists)

Apr 26, 2020, 1:58:36 PM
to openref...@googlegroups.com
Hi all,

I have pushed a new version of the prototype of OpenRefine running on
Spark. I unfortunately have to force-push this branch
("spark-prototype"), so you might want to delete the branch on your side
if you have previously checked it out.

Packaged distributions are here:

- http://pintoch.ulminfo.fr/a0c60c72ee/openrefine-win-spark.zip

- http://pintoch.ulminfo.fr/5b42e2f945/openrefine-linux-spark.tar.gz

- http://pintoch.ulminfo.fr/cf0b0e601c/openrefine-spark-mac-spark.dmg

Here is an overview of the main things that changed:

1. Spark import tab

I added a "Spark" import tab, where you can input a Hadoop URI to be
imported by Spark (it currently only accepts a single file at a time but
it could support more, like the other tabs). The idea behind this is to
let the user import a dataset directly via Hadoop's file system. The
existing file-based import tab cannot be used for this since it involves
uploading the dataset via the user's browser to the server: this is not
workable for large datasets even if they are stored locally. The import
tab for URL downloads is also not workable for large datasets, since it
downloads the entire file locally before importing it. That being said,
because the two import tabs are conceptually similar, we could consider
exposing them as a single tab to the user, and figuring out based on the
URL whether it should be downloaded locally or input through Spark.
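For what it's worth, the "single tab" dispatch could be as simple as branching on the URL scheme. A tiny Python sketch of the heuristic (the function name and the scheme whitelist are made up for illustration; the actual set would depend on the Hadoop filesystems configured on the server):

```python
from urllib.parse import urlparse

# Schemes that Spark/Hadoop could read directly (illustrative assumption).
SPARK_SCHEMES = {"hdfs", "s3", "s3a", "file"}

def choose_import_method(url):
    """Return 'spark' for URIs Spark/Hadoop can read natively,
    'download' for plain web URLs fetched by the server first."""
    scheme = urlparse(url).scheme.lower()
    return "spark" if scheme in SPARK_SCHEMES else "download"
```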

I have only tested this tab with locally stored files for now. It still
makes sense to use this tab for large locally stored files since it
avoids one round-trip through the web browser, server, and back to the
file system.

In terms of user experience, the slight downside with this tab is that
we cannot use a standard "Browse" button - since it is not possible to
retrieve the full path of a selected file in Javascript (for obvious
security reasons). So it sadly has to be a plain text input even for
local files. I don't think we can work around that as long as OpenRefine
remains a web app (unless we move to something like Electron?), but then
again this should mostly be used with HDFS paths anyway.

Currently, projects imported through this Spark tab are still replicated
in the workspace (so, on the local file system). This means that
although you could use this tab to work on an S3 dataset, the full
dataset will actually be retrieved locally as soon as the project is
saved. It is possible to lift this restriction: we could provide an
option to work fully remotely on the dataset. This means that the saved
project would only contain a pointer to the original source. If
implemented, this should of course be controllable by the user. If the
dataset is stored far away, this will incur noticeable
performance costs, but I believe this could be useful when connecting
OpenRefine to a local Spark cluster, where both the executors and the
data live.

2. Spark-based importers

I have added a new importer, for fixed-width files. This one uses a new
importer architecture which reads datasets natively with Spark, instead
of loading the entire dataset in the JVM and then sending it to Spark.
This new architecture is necessary to get the most out of the Spark import
tab described above.

The CSV/TSV importer currently still uses the previous architecture,
which is much less efficient. Spark provides a CSV importer which we
could use: this would give the same performance as the fixed-width
importer. However, the devil is in the details: Spark's CSV/TSV importer
does not support the same set of options as the one we currently use, so
we might have to keep both. I don't think we want the user to choose
between two CSV importers, so we should pick the appropriate one based
on the import options and input method (always use Spark's importer when
using the Spark import tab, for instance).
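The "pick the appropriate importer" logic could look roughly like this. A hedged Python sketch (the option names and the supported-options set are invented for illustration, not OpenRefine's actual option keys):

```python
# Options we assume Spark's CSV reader can express (illustrative only).
SPARK_SUPPORTED_OPTIONS = {"separator", "encoding", "header"}

def pick_csv_importer(input_method, options):
    """Choose between a hypothetical Spark-native CSV importer and the
    legacy one, based on input method and requested parsing options."""
    if input_method == "spark":
        return "spark-csv"   # Spark tab: always read natively
    if set(options) <= SPARK_SUPPORTED_OPTIONS:
        return "spark-csv"   # all options are expressible in Spark
    return "legacy-csv"      # fall back to the existing importer
```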

3. More efficient serialization

The first prototype used plain Java serialization to write project data,
which was very inefficient. I have changed it to use something pretty
similar to what OR 3.x uses: rows serialized in JSON, one per line, in a
gzip-compressed text file. This has the big advantage of being
completely independent from Spark, so project data can be read with
other code bases. It is also more robust to version changes, since we
completely control the JSON format of our model (which has been covered
by tests for a few releases now).
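To make the format concrete, it boils down to something like the following (a Python sketch, not the actual Java code; the row fields are invented, and the real JSON schema is the one covered by OpenRefine's tests):

```python
import gzip
import json

def write_rows(path, rows):
    # One JSON document per line, gzip-compressed: readable without Spark.
    with gzip.open(path, "wt", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row) + "\n")

def read_rows(path):
    with gzip.open(path, "rt", encoding="utf-8") as f:
        return [json.loads(line) for line in f]
```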

This also makes it easier (or at least, more imaginable) to provide a
migration route for projects saved with OR 3.x, since it would be
totally doable to serialize the projects in a similar format with OR 3.x
(such that they are forward-compatible). It is also imaginable for the
Spark-based OR to read existing OR 3.x projects (those serialized right
now with OR 3.3) without being able to navigate the history.

4. Records mode

My first prototype did not support the records mode; I have added that
back. Supporting the records mode is a bit challenging since there is no
equivalent to it in most database systems as far as I am aware. To
support it, I had to define custom RDDs to transform an RDD of rows into
an RDD of records in an efficient way (see RecordsRDD in the
spark-prototype branch). The main difficulty is that records can be
spread across partition boundaries, so the transformation cannot be a
pure map of partitions. So a small Spark job is required to stitch back
split records at partition boundaries. This seems suitably efficient so
far. This also required ensuring that the sorting and indexing
information is retained in the process: we want to be able to quickly
fetch records in the relevant partitions based on their indices (by
default you would scan irrelevant partitions as well). Although RDDs are
order-aware, Spark puts surprisingly little emphasis on actually
preserving this order, so it is easy to lose this precious information
(perhaps because a lot of the use cases for Spark are around collections
where order does not matter). This involved working around some known
issues in Spark and submitting a PR to Spark
(https://github.com/apache/spark/pull/28293, which has been accepted but
will sadly not be released any time soon as far as I can tell - but
thankfully the issue can be fixed on our side without patching Spark).
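Here is a plain-Python model of the idea (not the actual Spark code, and simplified: it assumes a record starts wherever the key cell is non-blank, and ignores rows before the first record). Each partition is grouped locally, and the small stitching job amounts to gluing each partition's leading rows onto the last record of the preceding partitions:

```python
def split_records(partition):
    """Group one partition's rows into records. Rows before the first
    record start belong to a record begun in the previous partition."""
    head, records, current = [], [], None
    for row in partition:
        if row["key"] is not None:       # non-blank key: a record starts
            if current is not None:
                records.append(current)
            current = [row]
        elif current is not None:
            current.append(row)
        else:
            head.append(row)             # tail of the previous partition's record
    if current is not None:
        records.append(current)
    return head, records

def records_from_partitions(partitions):
    """Stitch records split at partition boundaries: each partition's
    'head' rows extend the last record seen so far."""
    out = []
    for head, records in map(split_records, partitions):
        if head and out:
            out[-1].extend(head)
        out.extend(records)
    return out
```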

Note that in this prototype, the index of a record is the index of the
first row where it starts, unlike in 3.x and before, where it is simply
the number of records before it plus one. It is totally feasible to
adjust this to match the existing behaviour in an efficient way.
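The two numbering conventions side by side, as a small Python sketch (again assuming a record starts at a non-blank key cell):

```python
def record_indices(rows):
    """For each record start, return (first_row_index, ordinal):
    the prototype uses the first-row index, 3.x uses the ordinal
    (number of records before it plus one)."""
    out, ordinal = [], 0
    for i, row in enumerate(rows):
        if row["key"] is not None:
            ordinal += 1
            out.append((i, ordinal))
    return out
```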

5. Small fixes

I tweaked various things, such as adding Windows native Hadoop binaries
in the distribution, to fix #2313.

6. My conclusions so far

The records mode was one of the main things I was worried about in this
project, and I am quite happy that it fits pretty neatly in Spark in the
end. That removes a pretty big roadblock.

The fact that we really have to interface with Spark at a pretty low
API level means that there is definitely no hope of refactoring this to
go through Beam, since I am relying on data-model aspects that are
really specific to Spark. That being said, Tom's suggestion to make the
data backend pluggable is increasingly appealing to me. It would mean
developing our own interface, which operations and changes would target
to implement their transformations, without being tied to a specific
platform. We would have a Spark implementation, and potentially
a fully in-memory implementation which could be faster on small datasets
by avoiding the task scheduling and serialization costs. It is
definitely a big chunk of work, but it could potentially be the only
option we have if we realize later on that Spark brings in latency that
is not acceptable for workflows on small datasets.

As always, I am keen to get your feedback on any of this.

Cheers,

Antonin



Thad Guidry

Apr 26, 2020, 4:31:45 PM
to openref...@googlegroups.com
I'm surprised that you said there is no equivalent to a record in any database system?
Many databases have record support.
What I think you mean is the concept of hierarchical fields (also simply known as related fields).
In RDBMS, using a row structure as the group that holds related fields together forms a record. (flat records)
In document systems, using a document structure as the group that holds related fields together forms an individual document record, as in MongoDB, CouchDB, and Elasticsearch.
In columnar systems, a column itself forms a continuous series of values; the primary key is the data itself, mapped into rowids, as in Vertica, etc. Records are formed by [1][2]

The use cases (our Column operations with mass transformations) in OpenRefine are more closely aligned to an OLAP workload where traditionally Columnar systems (or column serialized structures) have been used.

[1] https://en.wikipedia.org/wiki/Column-oriented_DBMS
Fast retrieval of data is always due to indexes, achieved either through the innate structure itself, as in columnar systems, or by creating tons of them efficiently, as Elasticsearch does.

That all being said above...
I still feel that Apache Spark is not the right approach for us.
I still feel that Apache Flink is a better fit for us, with both its DataSet API and the graph processing its Gelly API affords, not to mention Checkpoints and Savepoints. (And later we can have SQL support, since it has the Table API and SQL through Apache Calcite.)

https://flink.apache.org/flink-architecture.html
https://flink.apache.org/flink-applications.html
https://flink.apache.org/flink-operations.html

I truthfully feel you are a rockstar, Antonin, for pushing forward on this PoC, and it is useful in its own right, either to continue, or to evolve and adapt.
(btw, with Apache Flink you can even get nice generic type support built-in via Kryo... or you can have your wonderful user-defined types via TypeInfoFactory.  No waiting around for the Apache Spark team. ;-)



Antonin Delpeuch (lists)

Apr 26, 2020, 5:49:56 PM
to openref...@googlegroups.com
Hi Thad,

Of course the word "record" is used in a lot of database systems - but
that does not mean those notions have anything in common.

Flink could be interesting in the future to run OpenRefine workflows on
streams (which I would be keen to have), but that is a significant step
away from what OR currently does. Aiming for a data model that is even
further away from what OpenRefine currently is would make a migration
even riskier. I don't think we have the time or resources to venture
into that right now. The leap by going to Spark is big enough already.
If we end up making the data processing backend pluggable then why not
develop a Flink implementation, but I do not think that should be a
priority.

I know you would be keen to "have SQL support", whatever that actually
means - it's easy to ask for some software to "support" some technology,
it's something else to come up with a meaningful integration that
actually helps people in their workflows. Remember Data Packages…

If you mean being able to run a SQL query on an OpenRefine project (or
multiple projects, perhaps, with joins between them), then this is
something we could do with most engines, including Spark (and again, the
availability of user defined types is not going to be a blocker for
that). But I am yet to be convinced that this is something worth
developing in OpenRefine: we need to have a clear vision of which user
workflows we are talking about. *Concrete* workflows :)

I know you don't like when I ask for something "concrete", but that is
what it really comes down to: you cannot plan migrations like this on
the basis of a sales pitch enumerating the technologies supported by
some platform. You need to have a look deep into the architecture to
understand how that platform could fit in and cater for our users' needs.

Antonin



Thad Guidry

Apr 27, 2020, 8:47:02 AM
to openref...@googlegroups.com
On Sun, Apr 26, 2020 at 4:49 PM Antonin Delpeuch (lists) <li...@antonin.delpeuch.eu> wrote:
Hi Thad,

Of course the word "record" is used a lot of database systems - but that
does not mean they have anything in common.

Flink could be interesting in the future to run OpenRefine workflows on
streams (which I would be keen to have), but that is a significant step
away from what OR currently does. Aiming for a data model that is even
further away from what OpenRefine currently is would make a migration
even riskier. I don't think we have the time or resources to venture
into that right now. The leap by going to Spark is big enough already.
If we end up making the data processing backend pluggable then why not
develop a Flink implementation, but I do not think that should be a
priority.

Flink supports batch processing, not only streams.  You may not be aware of that.  Alibaba contributed to it with a fork known as Blink, now merged into Flink; it is considered one of the fastest engines and is used today by Ericsson, Capital One, etc.
But most of our users won't ever need to process incredibly large datasets - just large ones, oftentimes.  There are several advantages which I'll mention later.
I understand that the leap and time and effort you are putting forth is already monumental for us, and I truly appreciate and stand in awe at this effort.  It does not go unnoticed.
Agree if the data processing backend is pluggable that would afford the most flexibility as Tom has mentioned.

I know you would be keen to "have SQL support", whatever that actually
means - it's easy to ask for some software to "support" some technology,
it's something else to come up with a meaningful integration that
actually helps people in their workflows. Remember Data Packages…


Data Packages was a Google push, not mine.
Google was active within W3C in pushing standards like CSV on the Web, etc., which is semi-successful now; Google and others leverage some of that and offer services back, such as the very cool Google Dataset Search,
which many of our users utilize to find additional data, wrangle it, and contribute back to efforts like Wikidata.


If you mean being able to run a SQL query on an OpenRefine project (or
multiple projects, perhaps, with joins between them), then this is
something we could do with most engines, including Spark (and again, the
availability of user defined types is not going to be a blocker for
that). But I am yet to be convinced that this is something worth
developing in OpenRefine: we need to have a clear vision of which user
workflows we are talking about. *Concrete* workflows :)


Many of our users are already familiar with SQL; this is true in GLAM (where traditionally RDBMSs are used) and in newsrooms (ProPublica, etc.).
I am not pushing for SQL usage in OpenRefine, but merely stating, as I have in the past, that it's just a "nice to have, but not required".
I know you don't like when I ask for something "concrete", but that is
what it really comes down to: you cannot plan migrations like this on
the basis of a sales pitch enumerating the technologies supported by
some platform. You need to have a look deep into the architecture to
understand how that platform could fit in and cater for our users' needs.


Sorry if I offended you somehow.  Perhaps you are hinting that I am wasting your time with indirections.  That was not my intent.
I am very cognizant of your time as well as others.
I have considerable time available now to spend in research, asking others within any community, and gathering feedback on any architectures.
I had looked quite deeply into Apache Flink, since I was already familiar with its capabilities, fitting use cases, and available APIs.

Just a few interesting areas of Flink that you might not be aware of:
Flink supports batch processing, on bounded datasets, not only streams.  This is handled both by the DataSet API as well as Table API & SQL.
The DataSet API has a plethora of dataset transformations that are useful to OpenRefine users and that correlate to its existing functions, some not found in Spark.
  • Notably, Records mode can be handled via Grouped DataSet transformations in Flink, where grouping is by user-defined functions and expressions.
  • Various keyed operations in OpenRefine like cross() for instance, have a corollary in Flink via CoGroup functions which are even more powerful.
  • deduplication can be handled by columns, expressions, or key values.
  • Some of OpenRefine's sequenced operations (intermediate steps that first filter on large data, then finally apply a transformation) fit as well.
    • These workflows can directly leverage Flink's GroupCombine on a Grouped DataSet where the advantage over Spark is that memory management is automatic, and no data exchange needed between partitions.
Regards,

Thad Guidry

Apr 27, 2020, 9:24:33 AM
to openref...@googlegroups.com
Let me also share some of our past discussion and research before you joined OpenRefine.
Note the below thread was from 2017, prior to the Blink merge and additional niceties added to the DataSet API and Table API & SQL.



Antonin Delpeuch (lists)

Apr 27, 2020, 9:48:46 AM
to openref...@googlegroups.com
Hi Thad,

Thanks for the link to the previous discussion.

On 27/04/2020 14:46, Thad Guidry wrote:
> Flink supports batch processing, not only streams.

Yes, I am aware that Flink supports batch and stream processing.
Spark and Flink are very similar in many respects, see below.
> Many of our users are already familiar with SQL, this is true in GLAM
> (where traditionally RDBMS's are used) and Newsrooms (Propublica, etc.)
> I am not pushing for SQL usage in OpenRefine, but merely stating as I
> have in the past that its just a "nice to have, but not required".

You are very welcome to push for an idea but please can it be more than
"X support" or "X usage" where X is some technology? Forget about the
feature lists of big data frameworks. As a user, what do you want to do?

>
> Just a few interesting areas of Flink that you might not be aware of:
> Flink supports batch processing, on bounded datasets, not only streams. 
> This is handled both by the DataSet API
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/> as
> well as Table API & SQL
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/>.
> The DataSet API has a plethora of dataset transformations that are
> useful to OpenRefine users and that correlate to its existing functions,
> some not found in Spark.
>
> * Notably, Records mode can be handled via Grouped DataSet
> transformations
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/dataset_transformations.html#transformations-on-grouped-dataset>
> in Flink, where grouping is by user-defined functions and expressions.

Initially, I was also planning to implement the records mode via Spark's
grouping functions (which are very similar - I don't see anything that
cannot be done in Spark in your link).

The problem with this approach is that it is very inefficient, because
going through a generic grouping API will forget about the locality of
records, which is crucial to optimize this computation. This will be
inefficient in Flink too.

Imagine you have a function which computes for each row its record id,
and then group rows together using this function. The algorithm which
will do this grouping is not aware that rows belonging to the same
record are consecutive, so it will work very hard to make sure that if a
row at the beginning of the dataset has the same record id as one at the
end (potentially stored in different partitions), they will be reunited.

The solution I have in Spark could probably be implemented in Flink too,
I suspect, although I haven't checked that.
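To make the locality argument concrete: under the "non-blank key starts a record" convention, a row's record id is just a running count of record starts, so it can be computed in a single forward scan (plus one small boundary exchange per partition in Spark). A generic group-by does not know that rows with the same id are consecutive, so it would shuffle rows across partitions to reunite them. A toy Python illustration:

```python
from itertools import accumulate

def record_ids(keys):
    """Assign each row the number of record starts seen so far:
    a pure prefix sum over the key column, no shuffling required."""
    return list(accumulate(1 if k is not None else 0 for k in keys))
```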

> * Various keyed operations in OpenRefine like cross() for instance,
> have a corollary in Flink via CoGroup
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/dataset_transformations.html#cogroup>
> functions which are even more powerful.

Spark has that too, although it is definitely not how I plan to
implement cross.

> * deduplication
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/dataset_transformations.html#distinct>
> can be handled by columns, expressions, or key values.

Spark has that too.

> * Some of OpenRefine's sequenced operations (intermediate steps that
> need to be performed first that filter on large data, then finally
> apply a transformation).
> o These workflows can directly leverage Flink's GroupCombine on a
> Grouped DataSet where the advantage over Spark is that memory
> management is automatic, and no data exchange needed between
> partitions.

I do not understand what you mean here.

Anyway, the point is: given how close Spark and Flink are, I don't
really see the point of switching. Flink puts streams first, Spark puts
batches first, both can do both streams and batches, are both based on
Hadoop, and so on.

What I would really like is that you try to change perspective and think
more about the user point of view, because this is what you do best.
Leave the choice of technical solutions to the programmers. As a user,
what do you want?

Overall, my goal in terms of UX is to make the change of backend
unnoticeable for the user. OpenRefine is popular to work on small
datasets and we cannot compromise this use case.

So it is critical that the new data processing backend adds very little
overhead in terms of task scheduling and serialization. That is my
number one priority at the moment. Spark's RDD API feels like a good
pick for this since there is basically no optimization going on at this
level, compared to Flink, where there seem to be more indirections going on.

The problem with using a big data framework for small data is that all
the overheads of the framework become a lot more noticeable. So it's not
useful to look at feature lists and "Spark vs Flink benchmarks" or the
like - these comparisons are geared towards very different use cases.
Both of these frameworks will support large datasets, as they are
designed for that, but this is not the area where we should evaluate them.

That's exactly the point made here in the earlier thread:
http://mail-archives.apache.org/mod_mbox/flink-community/201706.mbox/%3cCAAdrtT1kJqqh_p8=-i-ChF4xLjYAdqCLU...@mail.gmail.com%3e

Antonin

Thad Guidry

Apr 27, 2020, 10:30:37 AM
to openref...@googlegroups.com
You have the confidence here that Spark's RDD API is agreeable for us, and I sense you are far enough along that we should continue.  Thanks for listening.

My 2 main usage problems with current architecture:

1. Joins - joining large datasets via custom functions/keys.  (imagine joining Freebase Property values with Wikidata's statements)
2. Records mode - schema-based relational table columns are not ideal for this. David's work on the Records overlay was just a stepping stone: it was where we stopped, but it wasn't our final destination. For instance, we always envisioned we would have similar ways to produce and work with records as we kinda had within Freebase. Freebase's multi-value properties were amazing to work with (document-model-like structures built on graph edges), and our idea, although not realized by him and me but still in the back of my mind, was being able to work with data in that same fashion (schema-less): easily generating records from the data via new GREL syntax or smart menu options, as well as querying records without worrying about thinking of joins. Read more here: Freebase multi-value properties <https://developers.google.com/freebase/guide/basic_concepts#more_properties>


Thad Guidry

Apr 27, 2020, 10:44:02 AM
to openref...@googlegroups.com
Which is why I thought of Flink... with the Gelly API... as bringing the vision of Records (really just relations, which can be pushed into a graph structure, or a document store like Elasticsearch, or, as you are thinking, a continuing tabular structure with IDs across RDDs).

But perhaps that vision for Records mode, flexible schema (schema-less) can still be done with Spark, or a different data processing backend later.


Antonin Delpeuch (lists)

Apr 27, 2020, 12:37:13 PM
to openref...@googlegroups.com
Hi Thad,

Yes, it would be great to make it easier to join two projects. The cross
function is not ideal for that. But that being said it is not clear to
me what the replacement should look like.

As you know I am really not a fan of the records mode in its current
form. I am also very keen to find a replacement for it, but it is the
same problem here: I don't think we have identified a clear replacement
for it. We need to make progress in that thread:
https://groups.google.com/forum/#!searchin/openrefine/future|sort:date/openrefine/X9O8NBC1UKQ

For now, I am trying to make incremental changes: first, migrate to
Spark while keeping the existing functionality, then, figure out how we
could improve over things like cross and records.

If I attempt to solve all these problems at once this is guaranteed to
fail in a pretty spectacular way.

Antonin

Thad Guidry

Apr 27, 2020, 12:51:57 PM
to openref...@googlegroups.com
Sure, I understand the iterative step-by-step approach for our migration, and I agree it lowers the risk for us.
I was just quickly responding to your concrete-user-workflow question.
Glad we also agree on the many improvements we want to make in those areas.

In regards to hierarchical data displays... I've been researching a few and should have a paper put together comparing/contrasting a few options for us.
Curious: have you yourself done any research in that area yet that you could throw into a shareable doc, or not?


Antonin Delpeuch (lists)

Apr 27, 2020, 1:00:16 PM
to openref...@googlegroups.com
I haven't written up anything about this, no. It would be a useful thing
to do.

Antonin

Flavio Pompermaier

Apr 27, 2020, 2:45:50 PM
to openref...@googlegroups.com
Sorry for not being so helpful here, but unfortunately I don't have any time to follow the details of the evolution of this part.
I'm very interested in its evolution because we would like to use OpenRefine in our big data projects too.
I can just give some insights based on my experience; beyond that I have to blindly trust the work you're carrying out.

Following the points of this discussion, I can give my point of view about some of the choices (I can be very helpful with Flink but much less with Spark):
  • Spark vs Flink: I'm a super fan of Flink (beyond OR, of course) and I'm probably its oldest user (we have been using it at Okkam since 2012, when it was still Stratosphere). Btw, I have to agree with Antonin that AT THE MOMENT Spark is undoubtedly superior to Flink in the batch area, and this is even more true if you consider machine learning and AI. However, I can tell you that since Alibaba entered the project (about a year ago) the development of Flink has improved dramatically, and I think that by the end of this year it will compete with Spark on batch as well. For machine learning and AI there was also a recent contribution by Alibaba, which open-sourced Alink [1] (its machine learning platform based on Flink), and I'm pretty sure they will improve that part a lot too, but not in the short term. The Table API and SQL support are also moving super fast in Flink at the moment.
  • Flink Gelly: Gelly is just a library of Flink that introduces classes and operators to work with graphs. However, it's quite unused and I suggest avoiding it. Flink Stateful Functions [2] could be a better fit for graph processing (but this library is also in an early stage and thus immature/unstable for the moment). In any case, I don't think Gelly or Statefun is needed in OpenRefine (at least at the moment).
  • Flink and Python: Flink now has very good support for Python as a first-class citizen [3]. However, I haven't tried it yet, so I can't tell how performant their approach is.
  • Record vs Row model: actually, I can't understand the problem here. Aren't Records just a group-by/key-by? Could you explain why this is a problem in Spark?
  • Flink DataSet vs Stream API: they are in a phase of deep reworking of those APIs. From what I know, they are currently unifying the two into a single API (DataStream), with different optimizations depending on whether the datastream is finite (batch) or infinite (stream). This will come with the finalization of the Blink merge. The batch APIs are not optimized at all at the moment (wrt Spark).
  • General approach: OK to go with Spark right now, but make it easy to switch execution engines. I think Dremio and Zeppelin have the best approach to this topic and can be taken as references.
  • Refine history to SQL: I think this would be a VERY interesting third-party project, to make it easy to port OR projects to Flink or Spark SQL.


Antonin Delpeuch (lists)

unread,
Apr 27, 2020, 7:08:18 PM
to openref...@googlegroups.com
Hi Flavio,

Thanks for chiming in! Replying inline.

On 27/04/2020 20:45, Flavio Pompermaier wrote:
> * *Record vs Row model*: actually I can't understand the problem here.
> Isn't Records just a group-by/key-by? Could you explain me better
> why this is a problem in Spark?

It is not a problem anymore, I have found a nice way to do it, I think:
https://github.com/OpenRefine/OpenRefine/blob/spark-prototype/or-model/src/main/java/org/openrefine/model/rdd/RecordRDD.java

But I'd be curious to hear how you would do it efficiently with groupBy
/ keyBy - in general it seems impossible to avoid shuffles with that API.
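(Side note for readers following along: the grouping that the records mode needs can be sketched in plain Java as below. This is a purely illustrative in-memory version with made-up names, not the actual RecordRDD code, which avoids shuffles through partition-aware logic. A record is simply a maximal run of rows starting at a row whose key cell is non-blank.)

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative in-memory sketch of records grouping (not OpenRefine code):
// a record is a run of rows starting at a row whose key cell (the first
// column) is non-blank; rows with a blank key attach to the record above.
public class RecordGrouping {
    public static List<List<String[]>> groupIntoRecords(List<String[]> rows) {
        List<List<String[]>> records = new ArrayList<>();
        List<String[]> current = null;
        for (String[] row : rows) {
            boolean keyPresent = row.length > 0 && row[0] != null && !row[0].isEmpty();
            if (keyPresent || current == null) {
                // start a new record at each non-blank key (or at the top of the grid)
                current = new ArrayList<>();
                records.add(current);
            }
            current.add(row);
        }
        return records;
    }
}
```

Doing this distributively is where the care is needed: a naive groupBy shuffles all rows, whereas a partition-aware implementation only needs to exchange the rows that straddle partition boundaries.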


> * *General approach*: ok to go with Spark right now but make it easy
> to switch the execution engines. I think Dremio and Zeppelin have
> the best approach to this topic and can be took as reference

Thanks, I'll have a look at those.

> * *Refine history to SQL*: I think this would be a VERY interesting
> third-party project to make it easy to port OR projects to Flink or
> Spark SQL

I do not think exporting a history to SQL is doable in general, and I am
not following this approach.

For instance, how would you implement the reconciliation operations? Or
even things like key-value columnize?
https://docs.openrefine.org/operations/key_value_columnize.html

As far as I can tell SQL is just not the tool for the job.

Antonin

Flavio Pompermaier

unread,
Apr 28, 2020, 5:54:03 AM
to openref...@googlegroups.com
If your problem was to avoid shuffling then you have to use a smart custom partitioning strategy[1] like you did somehow in the RecordRDD (if I'm not mistaken).

Refine probably does a lot more than pure SQL, but maybe it doesn't — this is something I have always asked myself. Indeed, the SQL translation of the history could be a nice experiment if someone is brave enough to take on the challenge: maybe it won't be 100% compatible, but you could read the history first and check whether it is convertible or not.
I think most of the time it will be. But of course this is the kind of experiment that could be nice for a student or someone who wants to have fun in their spare time.

The key-value columnize is something I don't know, but from what I read, maybe you can still translate it to SQL using an outer select? Like SELECT X.x, (some expression) AS Y FROM (SELECT * FROM XXX)?



Antonin Delpeuch (lists)

unread,
Apr 28, 2020, 6:12:07 AM
to openref...@googlegroups.com
On 28/04/2020 11:53, Flavio Pompermaier wrote:
> Probably Refine does a lot more than pure SQL but maybe it's not..this
> is something I always asked myself. Indeed, the SQL translation of the
> history is something that could be a nice experiment if someone is brave
> enough to engage this challenge: maybe it's not fully compatible at 100%
> but you could read the history first and check whether that history is
> convertible or not. 
> I think most of the times it will. But of course this is those kind of
> experiments that could be nice for a student or a person who wants to
> have fun in its spare time..

It's something that could be done outside of OR itself, just working on
the JSON history. Especially if you do not intend to have full support
for all operations.

In OpenRefine itself, the backend has to support all operations of
course. We cannot afford to throw away things like reconciliation :)

>
> The key-value columnize is something I don't know but from what I read
> maybe you can still translate to SQL using an external select? Like
> SELECT X.x, (some expression) AS Y FROM (SELECT * FROM XXX)?

It is a pivot-like operation and there are some ways to do things like
this in SQL, but they either rely on vendor-specific SQL statements
(PIVOT in Microsoft SQL Server, or in Spark…) or by statically
hard-coding the resulting column names in the query:
https://postgresql.verite.pro/blog/2018/06/19/crosstab-pivot.html

But even with these approaches, good luck with generating the records
structure when there are multiple values:
https://docs.openrefine.org/operations/key_value_columnize.html#entries-with-multiple-values-in-the-same-column

Perhaps it can be done, but I doubt this would be very efficient.
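(To make the difficulty concrete, here is a rough in-memory sketch of what key-value columnize has to compute — illustrative names only, not OpenRefine's implementation. The multi-valued case, where a record carries several values for the same key, is the part a static SQL PIVOT struggles with:)

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Rough in-memory sketch of the key/value columnize transform: rows of
// (recordId, key, value) triples are pivoted into one wide row per record,
// with one column per distinct key. Multiple values for the same key in a
// record are collected into a list — the part a static PIVOT cannot
// easily express. Names are illustrative, not OpenRefine's code.
public class KeyValueColumnize {
    public static Map<String, Map<String, List<String>>> columnize(List<String[]> triples) {
        // LinkedHashMap keeps the first-seen order of records and keys
        Map<String, Map<String, List<String>>> wide = new LinkedHashMap<>();
        for (String[] t : triples) {
            wide.computeIfAbsent(t[0], r -> new LinkedHashMap<>())
                .computeIfAbsent(t[1], k -> new ArrayList<>())
                .add(t[2]);
        }
        return wide;
    }
}
```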

Antonin

Flavio Pompermaier

unread,
Apr 28, 2020, 6:28:30 AM4/28/20
to openref...@googlegroups.com
Of course this SQL part would be a third-party project, an experimental extension.
It's something that could be interesting from a philosophical perspective, but it could also be useful to better understand what OR does that other SQL tools (like Dremio or Trifacta) can't.


Antoine Beaubien

unread,
May 7, 2020, 9:04:32 PM
to OpenRefine Development
Hi Thad and Antonin,

   While reading this interesting discussion on tools and our record mode, I have two questions:
  1. Do we plan at some point in the future to have more than one table in a project?
  2. Do we plan at some point in the future to be able to save « trees » in a project?
Regards,
   Antoine

Antonin Delpeuch (lists)

unread,
May 9, 2020, 12:31:40 PM
to openref...@googlegroups.com
Hi Antoine,


On 08/05/2020 03:04, Antoine Beaubien wrote:
> 1. Do we plan at some point in the future to have more than one table
> in a project?

It could be an interesting generalization of OR. It's not in our
milestones for this year, but if you have ideas about how this should
work feel free to expand :)

> 2. Do we plan at some point in the future to be able to save « trees »
> in a project?

Do you mean representing hierarchical information in a more faithful way
than the records mode? I would be very keen to have that. As you know we
have been discussing this before:
https://groups.google.com/forum/#!searchin/openrefine/future|sort:date/openrefine/X9O8NBC1UKQ

If that's not what you mean, just expand :)

Antonin

>
> Regards,
>    Antoine
>
> Le lundi 27 avril 2020 12:51:57 UTC-4, Thad Guidry a écrit :
>
> Sure, I understand the iterative step-by-step approach for our
> migration.  And agree it lowers the risk for us.
> I was just quickly responding to your concrete user workflow question.
> Glad we also agree on the many improvements we want to make in areas
>
> In regards to hierarchical data displays... I've been researching a
> few and should have a paper put together comparing/contrasting a few
> options for us.
> Curious, have you yourself done any research in that area yet that
> you could throw up on a shareable doc? or not?
>
> Thad
> https://www.linkedin.com/in/thadguidry/
> <https://www.linkedin.com/in/thadguidry/>
>
>
> Le lundi 27 avril 2020 12:37:13 UTC-4, Antonin a écrit :
>
> Hi Thad,
> Yes, it would be great to make it easier to join two projects.  
>

Antoine Beaubien

unread,
May 10, 2020, 12:59:45 AM
to OpenRefine Development
Hi Antonin, 
>>  2. Do we plan at some point in the future to be able to save « trees »
>>     in a project?
> Do you mean representing hierarchical information in a more faithful way
> than the records mode? I would be very keen to have that. As you know we
> have been discussing this before:
> https://groups.google.com/forum/#!searchin/openrefine/future|sort:date/openrefine/X9O8NBC1UKQ
> If that's not what you mean, just expand :)

The « records mode » enables us to have one level of hierarchy from a flat table. For that, I would like to be able, from the same table, to change which column is the « current » sub-hierarchy. That way, as an OR user, I could access all the hierarchies of my flat table, just one at a time. That would also be good, but it's not what I meant.

I would like an OR project to be a collection of tables, trees, and all types of schemas, meaning: 1) WD schemas, of course, but also SQL schemas from external import (and maybe, one day, direct external export), but also INTERNAL schemas between the tables and trees inside the OR project. So a movies project could have a table for movies, a table for people, and a table for companies.

So, if an OR project could store trees (JSON, XML, or generic), then they could be used in transformations, either to extract data, or a small tree could serve as a template to generate a subtree used elsewhere (e.g. in templating export). These datasets could just be referenced like variables, e.g.
table["My-Table-1"].rows
or
tree["My-First-Tree"].branches

The idea is really to have a unity of cooperating datasets, with internal and external path declarations. So you could have an informational ecosystem with linked tables and trees inside your project, and outside connections to other datasets. It could be very useful, especially if OR becomes more widely used by the general public.

Regards,
   Antoine

Antonin Delpeuch (lists)

unread,
May 23, 2020, 8:19:06 AM
to openref...@googlegroups.com
Hi all,

Here are some updates about this work.

I have updated the spark-prototype branch with a working snapshot of the
prototype. I can create packaged snapshots if there is interest (but
there is not much difference with the previous one in terms of UX).

This month I have implemented the idea (suggested in this thread) of
making the datamodel implementation pluggable, so that we could easily
switch from Spark to something else. I am really happy about this move
because it does make things a lot cleaner:

- I think there is still a chance that we might not want to use Spark by
default. For small datasets, the overhead added by Spark in task
serialization and scheduling is significant compared to the actual data
processing time. Concretely you can experience that in the prototype, by
playing with the UI interactively: some operations are a bit less
reactive than in the 3.x branch. Because I have not spent a huge amount
of time optimizing the implementation so far, there might be ways to
reduce this overhead while keeping everything in Spark. But we need to
spend a lot of effort to do this optimization, and we only find out if
we are happy with the performance at the end of this optimization
effort. So that's obviously a risk for the project. Now the good thing
with this refactoring is that it gives us a simple escape route: we can
implement our own spark-like runner, which keeps the partitioning and
lazy evaluation of datasets but avoids the task serialization and
scheduling (because we can rely on the fact that everything is executed
in the same process). Such a runner would still offer nice scalability
guarantees (we can work on datasets which do not fit in RAM) without
compromising on the responsiveness of the tool on small datasets.

- It is possible to develop implementations of the data model for other
big data backends (Flink, or even Beam itself). These are worth
developing even if they incur significant overheads: they would just be
used to run OR recipes in these environments, rather than being used in
OR's backend when using the tool interactively.

- Testing is a lot easier. I have created a naive implementation of the
data model (which stores everything in RAM), and that is the
implementation that is used to run tests of operations. So we only have
to create a Spark instance to test the Spark implementation of the data
model. This means that tests on operations (or rather, changes) are
quick to run, too. There is also a common test suite that all
implementations of the data model can extend: this makes it even easier
to develop new implementations. If we want to be extra safe we could
also configure our test suite to run operation tests with both the
testing runner and the spark runner: that would for instance make sense
for larger integration tests. This would make sense because some bugs
only appear in a spark-specific context (the most common one being task
serialization).

Here is a quick overview of the architecture. The idea is very similar
to Apache Beam, but by implementing our own interface we can control the
API as we wish, which is important to leave space for optimizations and
cater for the peculiar aspects of our data model (such as the records mode).

The main interface is GridState, which represents a state of the grid at
some point in a recipe.
https://github.com/OpenRefine/OpenRefine/blob/spark-prototype/or-model/src/main/java/org/openrefine/model/GridState.java
For now, not many operations can be run on the grid: basically only maps
on rows or records. As I migrate more change classes, I will add more
methods to this interface (removing rows, reordering rows, performing
stateful maps on rows, and so on). The GridState is a rough analogue to
Spark RDDs or Beam PCollections: it is the basic representation of a
dataset, and we can run operations on them, which gives new collections.
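(To make the analogy concrete, here is a heavily simplified, hypothetical sketch of such a backend-neutral interface together with a naive in-memory implementation. The real interface is in the GridState.java file linked above and is considerably richer; all names below are illustrative.)

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Simplified sketch of a pluggable grid abstraction: the interface is
// backend-neutral, and each runner (Spark, in-memory, ...) provides its
// own implementation. Row and method names are illustrative.
interface Grid {
    long rowCount();
    Grid mapRows(Function<List<String>, List<String>> mapper); // lazy in a real runner
    List<List<String>> collectRows();                          // materialize (tests/UI)
}

// Naive in-memory implementation, analogous to the testing runner
class InMemoryGrid implements Grid {
    private final List<List<String>> rows;

    InMemoryGrid(List<List<String>> rows) { this.rows = rows; }

    public long rowCount() { return rows.size(); }

    public Grid mapRows(Function<List<String>, List<String>> mapper) {
        return new InMemoryGrid(rows.stream().map(mapper).collect(Collectors.toList()));
    }

    public List<List<String>> collectRows() { return rows; }
}
```

A Spark-based implementation would instead wrap an RDD and keep mapRows lazy; the interface itself does not care which strategy is used.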

The interface that represents an implementation of the datamodel is
DatamodelRunner, which is basically just a factory for GridState.

Both of these interfaces are defined in the "or-model" Maven module,
which does not depend on Spark. Then, we have an "or-spark" module which
does depend on Spark and provides an implementation of these interfaces
that relies on Spark's RDDs. The Spark context is held by the
SparkDatamodelRunner.

The "or-testing" module also depends on "or-model" and implements two
things:
- a testing runner which does everything in RAM;
- a testing suite, which holds a collection of tests that all
implementations of the data model should satisfy. This suite is defined
as a main source file, not as a test file, so that other modules (such as
"or-spark") can rely on it in their own tests.

For now, Spark is still used by default in the tool: "or-spark" is added
as a runtime dependency of the "main" module, and the
SparkDatamodelLoader is dynamically loaded at startup. This is not
user-configurable currently, but it will be simple to expose that later:
one could imagine adding command-line options to the `refine` scripts
and executables to select which datamodel implementation to use (and
configure it, for instance to connect to an existing Spark cluster).

I have also migrated a more significant set of operations:
- split columns
- split multi-valued cells (as records)
- join multi-valued cells
- move columns
- reorder columns
- remove columns
- rename columns

These migrations were a bit slow to start with, because I have been
refining the GridState interface as I notice problems in it, and also
because Change classes are basically not tested, so I need to write
tests as I go along.

For the coming weeks, my plan is to migrate all the other changes, which
is going to require designing a mechanism to store change data (for
reconciliation, fetching urls, fetching data from reconciled values).
This must be done via the datamodel interfaces so that runners can load
the data in an appropriate way, making it easier to join it with project
data.

Let me know if anything is unclear or if you feel like this is not going
in the right direction!

Cheers,
Antonin

Thad Guidry

unread,
May 23, 2020, 9:21:15 AM
to openref...@googlegroups.com
This month I have implemented the idea (suggested in this thread) of
making the datamodel implementation pluggable, so that we could easily
switch from Spark to something else. I am really happy about this move
because it does make things a lot cleaner:


YEAHHHHHHHH!  HUGE! Congrats on this!

I'll find the time this weekend to explore the changes and test a bit.


Tom Morris

unread,
Jun 1, 2020, 5:45:53 PM
to openref...@googlegroups.com
This sounds like it's evolving in a good direction. I particularly like that it's not tied to Spark alone.

Is there anything that describes how facets interact with operations in the new design? One of the things I found when I looked into this is that the current interface is awkward and hard to abstract, since it has to resolve to a concrete set of rows to operate on; being able to express it as the equivalent of a SQL WHERE clause would instead allow you to push the filtering down into the database engine.

Is there an analysis of the current operations with a breakdown of which ones are easy / hard / impossible to migrate to the new architecture? Now would be the time to drop any particularly awkward operations which inhibit an efficient backend architecture.

It looks like there's a package rename mixed in with the backend rearchitecture work. Was the rename done before everything else? If so, is there a tag or commit that can be used as a base to diff against?

I still haven't been able to look at this in great details, but I'll keep poking at it as I find time.

Tom


Antonin Delpeuch (lists)

unread,
Jun 21, 2020, 8:43:20 AM
to openref...@googlegroups.com
Hi Tom,

I realize I have not replied to this! Very sorry about that.

On 01/06/2020 23:45, Tom Morris wrote:
> Is there anything that describes how facets interact with operations in
> the new design? One of the things that I found when I looked into this
> is that the current interface is awkward and hard to abstract since it
> has to resolve to a set of rows that get operated on when being able to
> express it as the equivalent of a SELECT clause would allow you to push
> the filtering down into the database engine.

Yes, the existing architecture is not suitable for the migration since
it relies on the visitor pattern, which does not fit with the API
exposed by Spark and other frameworks.

Here is a summary of the current architecture:

Facets declare a subclass of FacetState, which represents statistics
aggregated over rows. For instance, for a text facet, it stores the list
of values seen in a set of rows. They provide the following methods
around it:
- a method to update a facet state after processing one row
- a method to merge facet states gathered over disjoint parts of the grid
- an initial facet state (when no row has been seen)
This makes it possible to compute the facet statistics in parallel over
multiple partitions of the grid.

- the GridState interface offers aggregateRows and aggregateRecords
methods which compute the final state of an aggregation using the
methods above. It is to the discretion of the implementation which
strategy is used to do this aggregation. The Spark-based implementation
uses Spark's own aggregation API, which is basically identical.
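(As an illustration, the three ingredients above could look like this for a text facet — a simplified sketch with hypothetical names, not the actual classes:)

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the facet-state pattern described above, for a text facet:
// an initial state, a per-row update, and a merge of states computed on
// disjoint partitions. Because merge is associative and commutative, the
// aggregation can run in parallel over partitions of the grid.
public class TextFacetState {
    final Map<String, Long> counts = new HashMap<>();

    // initial facet state (no row seen yet)
    static TextFacetState initial() { return new TextFacetState(); }

    // update the state after processing one row's cell value
    TextFacetState withRow(String cellValue) {
        counts.merge(cellValue, 1L, Long::sum);
        return this;
    }

    // merge facet states gathered over disjoint parts of the grid
    static TextFacetState merge(TextFacetState a, TextFacetState b) {
        TextFacetState out = initial();
        out.counts.putAll(a.counts);
        b.counts.forEach((k, v) -> out.counts.merge(k, v, Long::sum));
        return out;
    }
}
```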

That lets you compute facet states. This does not let you compute
approximate facet states yet: for now, we are scanning the entire table.
I plan to introduce an API for approximate aggregation once the first
migration is done: for instance, we could ask the backend to aggregate
facet statistics for up to N rows (N being configurable of course) and
report the statistics as percentages to the user.

>
> Is there an analysis of the current operations with a breakdown of which
> ones are easy / hard / impossible to migrate to the new architecture?
> Now would be the time to drop any particularly awkward operations which
> inhibit an efficient backend architecture.

My goal is to keep all operations. The inefficient ones will still be
around, even if they will remain inefficient: for most users who deal
with small datasets, inefficiency is not a concern and I would not want
to reduce the set of features we offer to them.

For instance, consider the "key-value columnize" transform (or in fact
any transpose operation we offer). The logic of those operations are
quite complex, and there is no clear way how to formulate this with the
Spark API.

This month I have been migrating those, simply by keeping the existing
code (modulo bureaucratic refactorings). That means the operation is run
on a single node (the "executor" in Spark terms), not parallelized at
all, as inefficient as it is in 3.x.

By having a close look at these operations, of course I have ideas of
things I would like to improve there (or have doubts about their general
usefulness). But for now I want to keep the scope of my changes small: I
do not change the behaviour of operations, I only migrate them to the
new architecture.

More broadly this is even true for the records mode: it would have been
nice to get rid of it and replace it by a better way to represent
hierarchical data, before doing this migration. But we do not have a
good replacement solution for it as far as I know, and I would not
attempt to do that in the same go either.

My current goal is to migrate all existing functionality to the new
architecture, to get to a state where we have the same feature set and
comparable performance for small datasets.

>
> It looks like there's a package rename mixed in with the backend
> rearchitecture work. Was the rename done before everything else? If so,
> is there a tag or commit that can be used as a base to diff against?

Yes, the "4.x" branch has the package rename (and maven module
reorganization) without the Spark migration.

https://github.com/OpenRefine/OpenRefine/tree/4.x

Antonin

Tom Morris

unread,
Jun 21, 2020, 6:09:57 PM
to openref...@googlegroups.com
On Sun, Jun 21, 2020 at 8:43 AM Antonin Delpeuch (lists) <li...@antonin.delpeuch.eu> wrote:

I realize I have not replied to this! Very sorry about that.

No worries. When I went to look at the thread, I noticed that I had a half-finished draft from last week that I never sent. It kind of rehashes some of what you covered, but I'll include it here as a starting point anyway:

Rereading some of the earlier discussion from the end of April, the details of which I kind of skimmed over in the flurry of discussion, I wanted to provide feedback on some of the perceived design constraints.
I don't take it as a given that functionality and form needs to be preserved 100%. In particular, when there's a "nothing else in the world operates quite this way" moment, I take that as a sign to take a step back and consider if OpenRefine should do it differently too. As an example, Fill Down and Blank Down, which require row state, exist principally as an artifact of how we create row groups aka "records". We could provide the same functionality by designating certain columns to be the grouping key (currently leading columns) and instead of using blank/null as the flag, say that any rows with the same set of values for those cells are in the same group/record. In the same vein, we should look at records with an eye towards what people use them for rather than as a literal implementation which can't be changed.
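(A tiny sketch of the contrast, with illustrative names: fill down needs per-row state — the last non-blank value seen above — while the explicit-key alternative is stateless per row, which matters for a distributed backend.)

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: classic fill down propagates the last non-blank
// value downwards, so each output row depends on all the rows above it.
public class FillDown {
    public static List<String> fillDown(List<String> column) {
        List<String> out = new ArrayList<>();
        String last = "";
        for (String v : column) {
            if (v != null && !v.isEmpty()) last = v; // remember latest value
            out.add(last);
        }
        return out;
    }
}
```

With an explicit grouping key, no such propagation is needed: two rows are in the same group simply because their key cells hold equal values.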
 
From an internal architecture point of view: rather than having facets create a filtered row set to operate on, we could have them generate a filter expression (à la SQL WHERE) that we can use both to generate statistics and to combine into sets of rows to operate on. It's been years since I looked at it, but as I remember MassEdit was a candidate. It's kind of a generic catch-all that's used to implement a bunch of stuff.

So, picking up from there, someone (Flavio?) in one of the earlier discussions mentioned the idea of using a language as the decoupling mechanism. That really resonates with me, because it's guaranteed to be low bandwidth and really enforces a disciplined separation. It might be harder than using an API directly, but I think it could potentially yield benefits.

Your description below focuses on the statistical / visualization aspect of facets, which requires crunching a lot of data (all currently) to generate a summary to visualize. Going the other direction, though, where the facets are used as a selection mechanism, the bandwidth required is potentially much much less. A numeric facet with hundreds of histogram buckets boils down to just a min and max value on the selection side. Ditto Timeline facets. Text selections are value IN ['choice1', 'choice2'], etc.
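(In code, those selections reduce to tiny predicates — a sketch with illustrative names:)

```java
import java.util.List;
import java.util.function.Predicate;

// Sketch of facet selections expressed as row predicates, which is all a
// backend needs on the selection side: a numeric facet reduces to a
// [min, max] check, and a text facet to membership in the chosen values.
public class FacetPredicates {
    static Predicate<Double> numericRange(double min, double max) {
        return v -> v >= min && v <= max;
    }

    static Predicate<String> textChoices(List<String> choices) {
        return choices::contains;
    }
}
```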

At scale, the workflows are probably going to be very different as well. It won't make sense for a single user to try to "Cluster & Edit" millions of rows by themselves. I think that's one of the things MassEdit is used for, so reimagining that workflow could get rid of one of the needs for it.

I'll continue to try and familiarize myself with the prototype, but wanted to provide that high level feedback. Ideally everything would magically port over with no losses, but I'd rather end up with something useful that's incompatible than something that's 100% compatible, but crippled in how far it can scale, computationally and humanly, requiring more breaking changes later.

Tom

Antonin Delpeuch (lists)

unread,
Jun 21, 2020, 11:47:44 PM
to openref...@googlegroups.com
This is likely to require us to drop quite a few operations, the records
mode, and probably many other things in GREL… If you go down that route
I think it would deserve to be a different codebase, so a different
project. This is because both versions of the codebase will need to be
maintained for a very long time (if OpenRefine 3.x remains your go-to
tool for small datasets and you only use OpenRefine 4.x for big datasets).

I think it is really possible to have a 4.x branch which does not
compromise on the usefulness of the tool at small scale, but makes it
possible to scale the workflows which only use scalable operations. And
also reduces the memory footprint of the tool for mid-sized datasets
(say 100k rows).

>
> Your description below focuses on the statistical / visualization aspect
> of facets, which requires crunching a lot of data (all currently) to
> generate a summary to visualize. Going the other direction, though,
> where the facets are used as a selection mechanism, the bandwidth
> required is potentially much much less. A numeric facet with hundreds of
> histogram buckets boils down to just a min and max value on the
> selection side. Ditto Timeline facets. Text selections are value IN
> ['choice1', 'choice2'], etc.

That is absolutely true, and this is exactly how the prototype works to
do the filtering. In fact, if you use it to run a workflow in a Spark
instance (instead of being the underlying backend of the web app), no
aggregations will be run since we do not need to compute facet states in
that context (except when the user explicitly relies on them with the
`facetCount` GREL function for instance).

This is not something that is easy to realize with the current prototype
because we compute facet states and displayed rows at the same time in
the UI (so you will see Spark running aggregation jobs every time you
change the grid and have some facets around), but should become clearer
when executing OR workflows outside of the web app.

>
> At scale, the workflows are probably going to be very different as well.
> It won't make sense for a single user to try to "Cluster & Edit"
> millions of rows by themselves. I think that's one of the things
> MassEdit is used for, so reimagining that workflow could get rid of one
> of the needs for it.

At the moment, our cluster and edit functionality is represented in the
workflows as a mass edit operation, which is indeed perfectly scalable.
So if you have a workflow that was worked out on a small dataset using
cluster and edit, it will run fine on a larger version of the same
dataset (but that dataset might contain new values that were not
reported in the clustering tool, so will be left untouched by your
operation).

I think people have already expressed the need to do it the other way
around in certain scenarios: the JSON workflow would contain the
clustering configuration, without listing explicitly the values that you
merge, and you would trust that this recipe works well in an open domain
(pretty much like you can do now with reconciliation if you trust the
scoring). It is something we could support too.

>
> I'll continue to try and familiarize myself with the prototype, but
> wanted to provide that high level feedback. Ideally everything would
> magically port over with no losses, but I'd rather end up with something
> useful that's incompatible than something that's 100% compatible, but
> crippled in how far it can scale, computationally and humanly, requiring
> more breaking changes later.

I think sticking with the current feature set is a good guarantee to
remain useful: this is the tool that is currently used, and it is
reasonably popular I would say.

I personally do not think I have a mandate to get rid of fundamental
aspects of the tool like the records mode without a tried and tested
alternative. I would not call the tool "OpenRefine" anymore after that.
Not that I like the records mode at all (I would love to be able to do
without it). I am really worried that such a version would not be
adopted by our users and would simply end up gathering dust, hurting the
development by keeping two very different versions of the tool in
separate branches over an extended period.

But it is definitely an interesting exercise to list all the operations
we have and try to think how they would be formulated in SQL (not sure
if there are other languages you would consider as potential
interfaces?). For me, this would mean removing:
- the records mode
- blank down, fill down
- all three transpose operations
- split and join multi-valued cells
- the ability to store cells of different types in the same column,
probably?
- reconciliation and all related operations? (you could try encoding
reconciled cells as JSON blobs, but then manipulating that is going to
be quite painful I guess)
- custom expression language support would need to be restricted to
whatever is supported by the SQL backend (which is vendor-specific as
far as I am aware - I remember running some Python in Postgres, but
surely that does not work in the same way everywhere). And it would
probably force us to redesign some aspects of GREL too.
- probably other things I cannot think of at the moment!

All that being said: I am very keen to make progress on the discussion
around the records mode to identify its successor (and have similar
discussions on other aspects of the tool that we want to revamp). It
just requires a lot of design effort that we have not been able to put
in so far.

Antonin
> --
> You received this message because you are subscribed to the Google
> Groups "OpenRefine Development" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to openrefine-de...@googlegroups.com
> <mailto:openrefine-de...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/openrefine-dev/CAE9vqEEF56grUKYSy0V1gv2X5dK49j3qoTw9joip_6kcTT%2BVuA%40mail.gmail.com
> <https://groups.google.com/d/msgid/openrefine-dev/CAE9vqEEF56grUKYSy0V1gv2X5dK49j3qoTw9joip_6kcTT%2BVuA%40mail.gmail.com?utm_medium=email&utm_source=footer>.

Thad Guidry
Jun 22, 2020, 9:07:18 AM
to openref...@googlegroups.com
Antonin,

SQL
I don't think all of that would necessarily need to be removed actually.

Since Postgres 9.5 (now at version 12), it offers a lot for working with data, built in:

JSON (and fast as binary jsonb with caveats),
Records and Recordsets,
Pivoting, Transpose with Crosstab and Crosstabview,
and more.

Regarding reconciliation, it's quite easy to manipulate JSON with some now pretty powerful functions and query it in Postgres.
Are you thinking that manipulating it would be hard in Java, even when using a JDBC driver?

Fill down, Blank down
These are just a means to an end (often to help with joining via cross() later).
But different folks have different needs: the fill down/blank down operations that help with record creation are not always used for that purpose; sometimes users are simply concerned with producing an output format compatible with another tool once they export.
Anyway, correlated subqueries can be used in SQL to reproduce fill down or blank down, if we want to retain those for users.
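To pin down the semantics any SQL version would have to reproduce, here is a minimal sketch of fill down and blank down as pure functions over a single column, in Java for concreteness. Nulls stand in for blank cells, and the class and method names are hypothetical, not OpenRefine's actual code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of OpenRefine's fill down / blank down semantics as
// pure functions over a single column (null = blank cell).
class ColumnOps {
    // Fill down: copy the last non-blank value into subsequent blank cells.
    static List<String> fillDown(List<String> column) {
        List<String> result = new ArrayList<>();
        String last = null;
        for (String v : column) {
            if (v != null) last = v;
            result.add(last);
        }
        return result;
    }

    // Blank down: blank out any cell that repeats the value directly above it.
    static List<String> blankDown(List<String> column) {
        List<String> result = new ArrayList<>();
        String prev = null;
        for (String v : column) {
            result.add(v != null && v.equals(prev) ? null : v);
            prev = v;
        }
        return result;
    }
}
```

Whether a correlated-subquery formulation matches these semantics exactly (in particular around row ordering, which SQL does not guarantee without an explicit ORDER BY) would need checking per backend.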



All that being said: I am very keen to make progress on the discussion
around the records mode to identify its successor (and have similar
discussions on other aspects of the tool that we want to revamp). It
just requires a lot of design effort that we have not been able to put
in so far.

Antonin


Is it the UI itself and how to give users controls on visualizing, transforming, querying records that you struggle with here?
Or is it the internals and data models?

Antonin Delpeuch (lists)
Jun 22, 2020, 12:11:00 PM
to openref...@googlegroups.com
On 22/06/2020 15:07, Thad Guidry wrote:
> Antonin,
>
> *SQL *
> I don't think all of that would necessarily need to be removed actually.

Great! I am easy to convince: just cook up a SQL schema to represent an
OpenRefine project, and then give some sample SQL queries which
represent the operations I listed.

>
> Is it the UI itself and how to give users controls on visualizing,
> transforming, querying records that you struggle with here?
> Or is it the internals and data models?

The records mode is fundamentally a data model problem. The question is:
how should users represent hierarchical data in OpenRefine projects? For
instance, how could workflows like this one be done without the records
mode:
https://www.wikidata.org/wiki/Wikidata:Tools/OpenRefine/Editing/Tutorials/Working_with_APIs

Antonin

Thad Guidry
Jun 22, 2020, 1:58:49 PM
to openref...@googlegroups.com
Great! I am easy to convince: just cook up a SQL schema to represent an
OpenRefine project, and then give some sample SQL queries which
represent the operations I listed.


Sorry, I'm having "an episode", Antonin, and my points are getting missed because I need to slow down and take time to explain more concretely perhaps.
Postgres offers a lot for working with schema-less data now, but SQL operations (or even Postgres JSON functions) might not be the best way to handle data transformations the way OpenRefine can.
But let me further explain.

>
> Is it the UI itself and how to give users controls on visualizing,
> transforming, querying records that you struggle with here?
> Or is it the internals and data models?

The records mode is fundamentally a data model problem. The question is:
how should users represent hierarchical data in OpenRefine projects? For
instance, how could workflows like this one be done without the records
mode:
https://www.wikidata.org/wiki/Wikidata:Tools/OpenRefine/Editing/Tutorials/Working_with_APIs

Antonin


For OpenRefine, we typically do not think in terms of schema; that's too rigid. OpenRefine's operations and data model were actually originally designed around schema-less "ideas", even though we never said the words "schema-less".
The original thought was that schema didn't have to be dealt with much until export time (and maybe a little at import time).
See the original Gridworks code and issues around Freebase schema skeleton alignment (flexible data model --> rigid data model), etc., and some of David's papers on web data extraction and flexible data modeling and presentation, such as the snippet below. (I would also encourage you to read the related-work section of his thesis if you haven't already; a lot of it inspired various things in OpenRefine's data model and records mode.)

Data publishing technologies can be compared along two orthogonal dimensions: flexibility of data modeling and flexibility of presentation. The former is a spectrum ranging from rigid schemas that cannot be changed to general data models that can fit any information. The latter ranges from predetermined layouts and styles to completely customizable presentations. Figure 2.1 lays out this space and frames the discussion that follows.

Some of the other inspirations David took came from chats with me: he knew I had a background in enterprise Data Architecture (ETL, database tech) and library work, which dealt with completely different sets of data problems that David often didn't have exposure to. He and I often shared experiences around data modeling, use cases, and trying to get weird messy data of various forms into Freebase much more easily.

In my world, in the Enterprise, dealing with data the way that OpenRefine can is often handled with schema-less technologies such as Document stores rather than Table stores (both however can deal with relations and approaches for relations are different as you are well aware).

Elasticsearch is a popular one for schema-less, and supported with Hadoop and Spark for interaction.
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/float.html

MongoDB has some rough edges still to this day and I don't recommend it.

Postgres can do schema-less with JSONB, or JSON, or very roughly HStore.
My mention on other threads of Apache Flink possibly being a good fit was in the context of Elasticsearch: I have seen the two used together multiple times to handle nested and out-of-order streaming joins, and always thought that was powerful, though it might not always be needed for OpenRefine users as its workflows currently exist. But I now think, and agree with you, that Apache Flink has a place in ETL but perhaps not within OpenRefine-style processing.

If I and David were chatting about this today, knowing what we know, and seeing what is available, I think we would definitely be looking at JSON in some form in order to deal with schema-less, flexible data modeling.
Which technology, or combination of technologies, lends itself well to flexible data modeling and operations on those data models... I have my preferences (Elasticsearch), much for the same reasons that Siren chose it (where David is an advisor now), but we'll need to rethink a richer UI in OpenRefine for flexible presentation.

Antonin Delpeuch (lists)
Jul 2, 2020, 8:45:19 AM
to openref...@googlegroups.com
Hi all,

Here is a progress update on this. I have just pushed an up-to-date
snapshot of my refactoring in the "spark-prototype" branch (again,
force-pushed, because I keep rebasing the last commit which deletes the
parts that have not been migrated yet).

What is new this month:

1. Support for sorting

I have restored the sorting functionality by migrating it to the new
architecture. This means adding the relevant methods in the GridState
interface to:
- browse the grid using a sorting configuration rather than its natural
ordering
- reorder rows permanently (generating a new grid)

In terms of implementation with Spark, this is relatively
straightforward as it uses the sorting API for RDDs directly.
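For illustration, here is a minimal sketch of what those two GridState additions boil down to, with plain Java lists standing in for the grid and a comparator standing in for the sorting configuration. All names are illustrative, not the actual GridState signatures; with Spark, the same comparator maps onto RDD sorting directly:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a sorting configuration applied to rows, either to
// browse the grid in sorted order or to reorder rows permanently.
class SortSketch {
    // A minimal "sorting criterion": sort by one column, ascending or descending.
    static Comparator<Map<String, Integer>> byColumn(String column, boolean ascending) {
        Comparator<Map<String, Integer>> cmp =
            Comparator.comparing((Map<String, Integer> row) -> row.get(column));
        return ascending ? cmp : cmp.reversed();
    }

    // "Reorder rows permanently": produce a new sorted grid, leaving the
    // original untouched (matching the functional architecture).
    static List<Map<String, Integer>> reorder(List<Map<String, Integer>> grid,
                                              Comparator<Map<String, Integer>> config) {
        List<Map<String, Integer>> copy = new ArrayList<>(grid);
        copy.sort(config);
        return copy;
    }
}
```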

2. Migration of all operations and changes which do not fetch external data

That means all operations except:
- fetching URLs
- performing an initial reconciliation
- extending data based on reconciled values
- support for certain GREL functions (cross, facetCount) which fetch
data outside their evaluation context

The new supported operations are:
- fill down
- blank down
- reorder rows
- remove rows
- transform column
- add column based on this column
- judge one cell
- judge similar cells
- clear recon for one cell
- clear recon for similar cells
- mark cells as new
- clear recon data for cells
- copy recon data across columns
- match each cell to its best candidate
- match each cell to a specific candidate
- reconcile by using values as identifiers
- star / flag one row
- star / flag filtered rows

The majority of these operations are simple maps on rows, which did not
require extending the GridState interface. Even though the migration was
conceptually easy, it took some time, since most of these operations did
not have any tests associated with them, so I had to write tests as I
migrated the code.

Another side effect of the migration is the conversion of classes in
the datamodel to immutable ones (mainly Row, Cell, Recon and
ReconCandidate), since this is required by the new functional
architecture. This is widely recognized as a good practice in Java in
general. It also required some migration effort, to move away from
effectful code towards functional code.
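To illustrate why immutability fits the new architecture, here is a hypothetical sketch (not the real Row class): mutation returns a fresh copy, so a row-wise operation becomes a pure map over the grid, which a Spark runner can parallelise directly:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch of an immutable row: "mutations" return a fresh copy,
// so a row-wise operation is just a pure function applied to each row.
final class ImmutableRow {
    private final List<String> cells;

    ImmutableRow(List<String> cells) {
        this.cells = Collections.unmodifiableList(new ArrayList<>(cells));
    }

    String cell(int i) { return cells.get(i); }

    // Return a new row with one cell replaced; the receiver is untouched.
    ImmutableRow withCell(int i, String value) {
        List<String> copy = new ArrayList<>(cells);
        copy.set(i, value);
        return new ImmutableRow(copy);
    }

    // A "transform column" operation expressed as a pure map over rows.
    static List<ImmutableRow> transformColumn(List<ImmutableRow> grid, int column,
                                              UnaryOperator<String> f) {
        List<ImmutableRow> out = new ArrayList<>();
        for (ImmutableRow row : grid) {
            out.add(row.withCell(column, f.apply(row.cell(column))));
        }
        return out;
    }
}
```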

3. Planning for operations which rely on external data

I still have not migrated these yet because I want to get the
architecture right, in a way that allows for multiple use cases. I am
not trying to implement all of these use cases at once, but only to come
up with an architecture which is broadly compatible with all of them (in
spirit, at least):

- as an OpenRefine user, I initially want the same experience as in 3.x:
fetching external data is done only once, when the operation is first
applied. For instance, if I add a column by fetching URLs, then undo
this change, then redo it, I do not want to have to wait for the URLs to
fetch again: those results are already stored locally. Similarly, if I
use the cross function in one of my operations, then I delete the
project which was used by the cross function, I do not want this to
corrupt the data of the project in which I used the expression. This
behaviour is natural with the previous architecture, where all computed
values are persisted in change data, but not in the new architecture
which relies on lazy evaluation, so an explicit mechanism to handle
these scenarios must be introduced.

- when running an OpenRefine workflow in a pipeline (for instance as a
Spark job), I want external data to be fetched in a pipelined way: for
instance, if my workflow consists of reconciliation followed by data
extension, I expect that the corresponding HTTP requests will be done in
near-parallel: the first data extension request will not have to wait
for the last reconciliation request.

- similarly, if we implement runners for stream-based engines (such as
Flink), we could potentially run OpenRefine workflows on streams (such
as Kafka sources). Not all operations make sense in this setting (for
instance, reordering rows using a sorting criterion is not possible
without adding a windowing strategy), but this could still be useful
down the line. In this setting, if I am fetching URLs, I obviously want
these to be fetched as the stream is consumed.

- in the near future, we also want to be able to persist partial URL
fetching results (or reconciliation results) such that we can recover
from a crash during a long-running operation more efficiently, and also
let the user inspect partial results before the operation completes
(typically, for reconciliation: being able to inspect the first few
reconciliation candidates that are returned, and potentially start
judging them before all other candidates are fetched).

Do you agree with these use cases? Are there others I should keep in mind?

The architecture I am planning to go for, at this stage, is to introduce
a class to sit between the Change and the external data source, which
acts as a cache that can be persisted in the HistoryEntry. The main
question is how tightly this should be tied to the datamodel interface
(which is then implemented by Spark or other providers).

If we have stored the result of a URL-fetching operation on disk and we
want to compute the state of the project after that, we essentially need
to do a join between the previous state and the fetched URLs. For this
join to be efficient, it really helps if the fetched URLs are stored as
a Spark RDD which should be co-partitioned (i.e. also ordered by row id
and split into partitions with the same boundaries). This should ideally
avoid any shuffle and match the current performance on small datasets.
And of course this needs to be independent from Spark.
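As a sketch of the join in question, here ordered maps keyed by row id stand in for the two co-partitioned RDDs. The ordered lookup below stands in for the shuffle-free merge that co-partitioning enables; all names and the string representation of rows are illustrative only:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of joining a grid with fetched change data by row id.
// Both sides are ordered by row id, so when partition boundaries agree the
// join is a single merge pass, with no shuffle.
class ChangeDataJoin {
    // For each row, append the fetched value for its row id (or null if the
    // fetch produced nothing for that row).
    static List<String> join(TreeMap<Long, String> grid, TreeMap<Long, String> fetched) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, String> e : grid.entrySet()) {
            String extra = fetched.get(e.getKey()); // ordered lookup standing in
            out.add(e.getValue() + "|" + extra);    // for the co-partitioned merge
        }
        return out;
    }
}
```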

While I am still working on this architecture, my goal is to migrate the
remaining classes:
- facets
- exporters
- importers
- clustering
The goal is to reach a stage where I can merge the backlog of commits
that have accumulated on the master branch since I branched off to work
on the new architecture (this is going to be a lot of fun) and then stay
up to date by cherry-picking changes as they are merged in master.
Having a complete feature set soon will help run arbitrary workflows on
this branch, which is necessary for thorough debugging.

I am also seeking feedback from Spark experts about these design
choices, to make sure we have enough eyeballs on this migration.

Thad Guidry
Jul 2, 2020, 11:19:37 AM
to openref...@googlegroups.com

- when running an OpenRefine workflow in a pipeline (for instance as a
Spark job), I want external data to be fetched in a pipelined way: for
instance, if my workflow consists of reconciliation followed by data
extension, I expect that the corresponding HTTP requests will be done in
near-parallel: the first data extension request will not have to wait
for the last reconciliation request.

This sounds similar to Apache NiFi's Process Group (in Flow Based Programming this is called a subnet, where processing occurs through nodes or stations that have dependencies on each other and are grouped together as a Process Group).
It might make sense to see a Process Group as a Spark Job, where each Process, in Spark terms, is a task or, more appropriately, an Executor (the node or station on an assembly line).
From Spark and working with parallelized collections:
The behavior of the above code is undefined, and may not work as intended. To execute Jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task’s closure. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor.
How do you envision Spark handling your two-part process (reconciliation + data extension)? Would that be two tasks in a Job?

One thing to note that is very useful in Apache NiFi is the idea of buffering with Back Pressure (slowing down earlier Processors in the pipeline so that later Processors can catch up, i.e. slowing down reconciliation because data extension is even slower for whatever reason).
I often need back pressure available and I am wondering if you have thoughts to add something like that and how that might affect the architecture strategy?


- similarly, if we implement runners for stream-based engines (such as
Flink), we could potentially run OpenRefine workflows on streams (such
as Kafka sources). Not all operations make sense in this setting (for
instance, reordering rows using a sorting criterion is not possible
without adding a windowing strategy), but this could still be useful
down the line. In this setting, if I am fetching URLs, I obviously want
these to be fetched as the stream is consumed.

- in the near future, we also want to be able to persist partial URL
fetching results (or reconciliation results) such that we can recover
from a crash during a long-running operation more efficiently, and also
let the user inspect partial results before the operation completes
(typically, for reconciliation: being able to inspect the first few
reconciliation candidates that are returned, and potentially start
judging them before all other candidates are fetched).

Do you agree with these use cases? Are there others I should keep in mind?


Appending rows to an existing OpenRefine project? I often have this need, as do other users: new data is already aligned to the existing data in an OpenRefine project, and we want to analyze and transform those aligned datasets as one large dataset in a single project. I collect data from lots of disparate places around the internet, constantly aligning it and adding it to a growing dataset. In that context, I want to use a single OpenRefine project (not many separate ones): import additional rows of aligned data, reconcile the new rows, do other things for a week, open the project back up the following week, append some new rows into that same project, reconcile only those new rows, and transform some of them (filter, then transform some values).

With Spark, I imagine this can be handled in various ways.
1. UnionRDD?
2. Treat new data as a child RDD of the parent RDD? Do Spark's sortByKey() and then zip and zipPartitions apply here or not? If we checkpoint the RDD, then references to its parent RDDs are removed; what then? Maybe we don't want checkpointing at all in this case: https://stackoverflow.com/questions/35127720/what-is-the-difference-between-spark-checkpoint-and-persist-to-a-disk

So, I'm unsure of the best approach in Spark for that use case in issue #715 and hope you have a way to solve that use case for me and them.
Quote from that issue:
I would like to be able to load a new Excel/CSV file into the existing OpenRefine dataset and do cleaning on it. Ideally, this new dataset would come in as new rows that I can then start manipulating.
 
 
The goal is to reach a stage where I can merge the backlog of commits
that have accumulated on the master branch since I branched off to work
on the new architecture (this is going to be a lot of fun) and then stay
up to date by cherry-picking changes as they are merged in master.
Having a complete feature set soon will help run arbitrary workflows on
this branch, which is necessary for thorough debugging.


"fun" - I like your optimism here.  Don't lose that :-)

Thad Guidry
Jul 2, 2020, 11:31:46 AM
to openref...@googlegroups.com
Actually thinking after I replied...
The appending rows problem... hmm, it technically doesn't have to be appended at all.  The need is just "doing stuff with multiple datasets that are aligned...at one time"
Imagine if we COULD keep the OpenRefine Projects separate and still have a way to put some into a Collection to do stuff across all of them as a whole at one time.
We could work with them separately or as a collection.
In Apache NiFi, that's how it works... each OpenRefine Project is essentially a Flowfile.

It would be nice to have OpenRefine 4.0 work similarly.
That way we can keep individual metadata and provenance about each OpenRefine Project, but still be able to treat several as a large Collection to extend and transform as a collective whole.


Antonin Delpeuch (lists)
Aug 8, 2020, 12:32:39 PM
to openref...@googlegroups.com
I had a look at Apache NiFi but did not end up replying here; here are a
few thoughts:

On 02/07/2020 17:19, Thad Guidry wrote:
> One thing to note that is very useful in Apache NiFi is the idea of
> buffering with Back Pressure (slowing down previous Processors in the
> pipeline so that later Processor's can catch up, i.e. slowing down
> reconciliation because data extension is even slower for whatever reason).
> I often need back pressure available and I am wondering if you have
> thoughts to add something like that and how that might affect the
> architecture strategy?

As things stand, there is no need for back pressure in OpenRefine, since
we don't even have support for concurrent operations.

When running a workflow in a stream-based runner, that will become
relevant. With the current implementation I have, the back pressure will
be immediate (no buffer between consecutive operations). But it should
be doable to introduce some buffering (in Spark, this could be
implemented as an RDD which prefetches items from its parent up to a
certain buffer size). But I do not think this is a priority, since a
lot more work is needed to bring concurrent operations to the tool
itself.
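For reference, the back-pressure mechanism being discussed can be sketched with a bounded queue between two pipeline stages: the producer runs ahead of the consumer by at most the buffer capacity, after which it blocks. This is a generic illustration, not anything in the OpenRefine codebase:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of buffered back pressure between two stages (say,
// reconciliation feeding data extension): a bounded queue lets the producer
// run ahead by at most `capacity` items, after which put() blocks and the
// producer is slowed down to the consumer's pace.
class BackPressure {
    static int pipeline(int items, int capacity) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < items; i++) {
                try {
                    buffer.put(i); // blocks once `capacity` items are in flight
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        producer.start();
        int consumed = 0;
        try {
            for (int i = 0; i < items; i++) {
                buffer.take(); // the consumer draining the buffer
                consumed++;
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }
}
```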

>
>
> - similarly, if we implement runners for stream-based engines (such as
> Flink), we could potentially run OpenRefine workflows on streams (such
> as Kafka sources). Not all operations make sense in this setting (for
> instance, reordering rows using a sorting criterion is not possible
> without adding a windowing strategy), but this could still be useful
> down the line. In this setting, if I am fetching URLs, I obviously want
> these to be fetched as the stream is consumed.
>
> - in the near future, we also want to be able to persist partial URL
> fetching results (or reconciliation results) such that we can recover
> from a crash during a long-running operation more efficiently, and also
> let the user inspect partial results before the operation completes
> (typically, for reconciliation: being able to inspect the first few
> reconciliation candidates that are returned, and potentially start
> judging them before all other candidates are fetched).
>
> Do you agree with these use cases? Are there others I should keep in
> mind?
>
>
> Appending rows to an existing OpenRefine project?

That is doable, but mostly independent from the change of architecture.
It is something you can implement both with the old and new architecture.

Antonin

Thad Guidry
Aug 8, 2020, 12:38:26 PM
to openref...@googlegroups.com
Antonin,

Thanks for looking into this and letting me know more.
Appreciate it!




Antonin Delpeuch (lists)
Aug 8, 2020, 2:52:31 PM
to openref...@googlegroups.com
Hi all,

Here is a new update on this front.

I have finally found an architecture for long-running operations (URL
fetching, reconciliation, data extension) which I am reasonably happy with.

The goals were:
- to replicate the current behaviour of the tool despite the new
architecture based on lazy evaluation (data fetched in an operation is
fetched only once)
- to make it possible to run operations efficiently in the context of a
pipeline (for instance with Spark), where we want to break the barriers
created by the fetching and serialization of external data (see previous
message)

The proposed architecture is as follows.

We introduce a new interface "ChangeData", which represents the external
data fetched by an operation. This data is indexed by row ids. Just like
"GridState", the implementation of this interface is provided by the
pluggable data model implementation (which means that it can be
manipulated efficiently without loading everything in memory).

https://github.com/OpenRefine/OpenRefine/blob/spark-prototype/or-model/src/main/java/org/openrefine/model/changes/ChangeData.java

The API looks like this:
- given a GridState object, one can generate a ChangeData object on its
rows by mapping a function which can fetch external data (such as making
HTTP requests). Batching is supported (which is important for
reconciliation and data extension).
- given a GridState object and a ChangeData object, one can join the two
to obtain a new GridState, where the change data has been used to update
the grid (such as reconciling a column)

https://github.com/OpenRefine/OpenRefine/blob/spark-prototype/or-model/src/main/java/org/openrefine/model/GridState.java#L277-L299

ChangeData objects can of course be persisted to disk and restored, with
an API similar to that of GridState.
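As an illustration of the batching half of this API, here is a hypothetical sketch (the method name and types are made up; the real fetcher interface lives behind the GridState and ChangeData classes linked above): the fetcher is invoked with whole batches of rows, which is how reconciliation and data extension would group their HTTP requests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of batched fetching: call `fetcher` on consecutive
// batches of at most `batchSize` rows and concatenate the per-row results,
// preserving row order.
class BatchedFetch {
    static List<String> fetchInBatches(List<String> rows, int batchSize,
                                       Function<List<String>, List<String>> fetcher) {
        List<String> results = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += batchSize) {
            int end = Math.min(start + batchSize, rows.size());
            results.addAll(fetcher.apply(rows.subList(start, end)));
        }
        return results;
    }
}
```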

In addition, projects are associated with a ChangeDataStore which holds
all the change data objects associated with operations in the project
history.

https://github.com/OpenRefine/OpenRefine/blob/spark-prototype/or-model/src/main/java/org/openrefine/model/changes/ChangeDataStore.java

This interface has two implementations, and we can choose between the
two to switch between the two behaviours stated in the goals above:
- the FileChangeDataStore persists all the change data it stores into
the project directory. This is the default one, used in the interactive
tool;
- the LazyChangeDataStore does not persist its change data objects.
Instead it holds them in a simple Map. This is used when running
operations in an external pipeline.
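The behavioural difference between the two stores can be sketched as follows. The names mirror the classes above, but the bodies are illustrative only: the "file" store forces the fetch at registration time (as persisting to disk would), while the lazy store only remembers how to compute the data, deferring the fetch until first use.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of the two ChangeDataStore behaviours.
interface DataStore {
    void register(String changeId, Supplier<String> fetch);
    String retrieve(String changeId);
}

// Eager: fetching happens at registration, and the result is kept
// ("persisted") so undo/redo never refetches.
class EagerStore implements DataStore {
    private final Map<String, String> persisted = new HashMap<>();
    public void register(String changeId, Supplier<String> fetch) {
        persisted.put(changeId, fetch.get()); // fetch now
    }
    public String retrieve(String changeId) { return persisted.get(changeId); }
}

// Lazy: nothing is fetched at registration; the fetch is only triggered when
// the data is first consumed, enabling pipelined evaluation.
class LazyStore implements DataStore {
    private final Map<String, Supplier<String>> pending = new HashMap<>();
    public void register(String changeId, Supplier<String> fetch) {
        pending.put(changeId, fetch); // fetch deferred
    }
    public String retrieve(String changeId) { return pending.get(changeId).get(); }
}
```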

The implementation of a long-running operation follows a simple workflow.
1. In the process associated to the operation, create a ChangeData
object from the GridState of the project.
2. Then, register the change data in the store.
3. Finally, return a Change object which applies as follows:
* Load the ChangeData object back via the data store
* join it with the current grid
* return the modified grid.

https://github.com/OpenRefine/OpenRefine/blob/spark-prototype/main/src/main/java/org/openrefine/operations/column/ColumnAdditionByFetchingURLsOperation.java#L425-L469

When using FileChangeDataStore, this workflow will fetch all the
external data at the second step (since it is required to persist the
data to disk).

When using LazyChangeDataStore, none of these steps actually require
fetching the external data: this fetching will only be triggered later
on when the grid state is evaluated. Those calls will be intertwined
with other external calls, in a streaming fashion.

This also leaves the door open to resumption of interrupted long-running
operations. If we add enough metadata to the serialization of change
data, then we should be able to start again the fetching process after
an incomplete attempt.

This architecture should also be compatible with running independent
operations concurrently. (By compatible I mean that it should not be
hard to make it evolve to support that - it is by no means supported as
of today).

The remaining steps are:
- add back support for progress reporting and cancellation. The latter
is a bit tricky to do properly because Spark does not offer an
asynchronous interface to save operations, making it hard to cancel
those tasks.
- migrate the remaining operations (so far only fetching URLs is supported)

Best,
Antonin

Thad Guidry
Aug 17, 2020, 5:51:41 PM
to openref...@googlegroups.com
I reviewed a bit of what you explained and also scanned through the code on the `spark-prototype` branch.
A question came to mind, both while reading your explanation above and while reading the code.

ChangeData objects can of course be persisted to disk and restored, with
an API similar to that of GridState.

In addition, projects are associated with a ChangeDataStore which holds
all the change data objects associated with operations in the project
history.
 
You said the change data "can" of course be persisted to disk...
1. Where is the detection in the code for when/if the ChangeData cannot fit in memory? Or, to flip the question around: what are the conditions that would invoke RowChangeDataProducer in the code?
If you can, just reply back with an answer to 1., or maybe add some code comments where that might be best explained.

Also, great job on naming things in the code and improving Spark's own documentation for RDD.


Antonin Delpeuch (lists)
Aug 18, 2020, 12:40:12 AM
to openref...@googlegroups.com
On 17/08/2020 23:51, Thad Guidry wrote:
> 1. Where is the detection in the code for when/if the ChangeData cannot
> fit in memory?

ChangeData objects are serialized to disk. So if you do not have enough
disk space to store the contents of the URLs you fetched (say), the
operation will fail.

Antonin

Antonin Delpeuch (lists)
Sep 2, 2020, 12:38:33 PM
to openref...@googlegroups.com
Hello all,

Here are some news from this migration.

There has not been so much progress this month, but I have finally
migrated all operations to the new architecture (not counting
extensions, which I have not touched at all so far). The two remaining
operations were reconciliation and data extension.

For the reconciliation operation, we used to proceed as follows:
- first, prepare all the reconciliation queries generated by the table,
deduplicating them.
- second, send these queries to the service and record the responses.
That has the benefit of minimizing the number of calls made to the
reconciliation service: if the first and last rows of the project happen
to contain the same data, the service will only be queried with their
data once.

The problem with this approach is that it does not fit well in either a
distributed or a stream-based environment:
- in a distributed environment, this would require communication between
the nodes to perform this deduplication (a shuffle in Spark terms)
- in a stream-based environment, this would require reading the entire
stream before starting to perform any reconciliation query.

Therefore I have used another approach, which consists in using a simple
cache like the one used in the URL fetching operation. This cache is
local to each worker, removing the need to communicate between workers.
This means that reconciliation queries generated by rows that are nearby
will still be deduplicated, but this cache will only hold up to a fixed
number of responses at any time. I think that should work well in
practice (the slowdown should not be noticeable).
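The worker-local cache described above can be sketched as a small LRU map: it holds at most a fixed number of responses, so duplicate queries from nearby rows are deduplicated without any communication between workers. This is an illustrative sketch, not the actual implementation in the branch:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of a worker-local reconciliation cache: an LRU map
// bounded by `capacity`, so nearby duplicate queries skip the service call.
class ReconCache {
    private final int capacity;
    private final Map<String, String> cache;
    int misses = 0; // number of actual service calls made

    ReconCache(int capacity) {
        this.capacity = capacity;
        // LinkedHashMap in access order + removeEldestEntry = a simple LRU cache.
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > ReconCache.this.capacity;
            }
        };
    }

    // Return the cached response for this cell value, or call the service and
    // cache the result.
    String query(String cell, Function<String, String> service) {
        String cached = cache.get(cell);
        if (cached != null) return cached;
        misses++;
        String response = service.apply(cell);
        cache.put(cell, response);
        return response;
    }
}
```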

Another small difference, that I am perhaps not as happy with, is the
way queries are grouped in batches. At the moment, when fetching
external data for a change, the fetcher can declare a batch size. If it
does so, it will be called by batches of the desired length. But when
the fetcher uses a cache (as in the case of the reconciliation
operation), this means that some of the rows in a batch might not
generate a new reconciliation query, so the reconciliation service can
be hit by batches smaller than the target size. That is not a big
problem in itself, but it can mean generating more HTTP requests than in
the previous architecture. One way to solve this could be to add support
for caching directly in the datamodel interfaces: the fetcher could
generate keys for each row/record and the datamodel implementation would
be responsible for the caching directly.
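To illustrate the batching issue, here is a Python sketch (with invented
names, not the real interfaces) of a cache-aware batcher that only sends
cache misses to the service, so the service always sees full-size batches
except possibly the last one:

```python
def fetch_with_cache(keys, fetch_batch, cache, batch_size=50):
    """Resolve a value for every key, batching only cache misses so the
    external service is hit with full-size batches."""
    pending = []

    def flush():
        # send one full batch of uncached keys and cache the responses
        for key, value in zip(pending, fetch_batch(list(pending))):
            cache[key] = value
        pending.clear()

    for key in keys:
        if key not in cache and key not in pending:
            pending.append(key)
            if len(pending) == batch_size:
                flush()
    if pending:
        flush()
    return [cache[k] for k in keys]
```

This is roughly what "support for caching directly in the datamodel
interfaces" could buy us: the fetcher supplies keys, and the runtime keeps
the batches dense.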

For the data extension operation, we did not have any caching so far,
but the same problem applies. This would also benefit from a caching
mechanism natively supported by the datamodel interface.

I also (re)discovered one aspect of the data extension operation which
made its migration a bit more difficult. When you run this operation in
rows mode, the operation does not respect facets fully.
For instance, consider the following table:

a       b       d
        c

Say you have facets which only select the first row. Fetch data from the
last column, fetching one property which has two values for "d". This
will give you the following table:

a       b       d       v1
        c               v2

So the second row is modified even if it was not selected by the facet
in the first place. This is useful to maintain a record structure, but
that means that the operation can not use the row-based methods of the
GridState interface (since they enforce that facets are respected
row-wise). I solved this by running the operation record-wise in all
situations, and adding back row-wise filtering inside records if the
rows mode is enabled. So the current behaviour is preserved.
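A Python sketch of that record-wise strategy (simplified: rows are lists of
cells, and all names are invented, not the actual GridState API). Only
matching rows generate queries, but extra values spill onto the following
rows of the same record, which is why unselected rows can still be modified:

```python
def extend_rows_in_records(records, row_matches, values_for):
    """Record-wise pass with row-level filtering inside each record:
    only matching rows are extended, but extra fetched values spill onto
    the following rows, preserving the record structure."""
    out = []
    for record in records:
        new_record = []
        spill = []  # values left over from the previous matching row
        for row in record:
            row = list(row)
            if row_matches(row):
                values = values_for(row)
                row.append(values[0] if values else None)
                spill = values[1:]
            elif spill:
                row.append(spill.pop(0))
            else:
                row.append(None)
            new_record.append(row)
        out.append(new_record)
    return out
```

With the example grid above, filtering on the first row and fetching two
values yields exactly the behaviour described: v1 on the selected row, v2 on
the unselected one below it.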

Next steps:
- add progress reporting (still not supported)
- migrate facets, importers and exporters

Antonin

Thad Guidry

unread,
Sep 2, 2020, 1:04:53 PM9/2/20
to openref...@googlegroups.com
Awesome news, Antonin.
I personally would like to see the importers prioritized next.
But I think realistically we might face more technical challenges within facets, so it might be best to dive in and address those challenges head-on as a priority. Do you agree?


Tom Morris

unread,
Sep 2, 2020, 2:57:29 PM9/2/20
to openref...@googlegroups.com
Thanks for the update. It seems early in the process to be worrying about performance optimizations like caching. External caches in front of the reconciliation service and/or on the local network are always an option.

As for this:
 
I also (re)discovered one aspect of the data extension operation which
made its migration a bit more difficult. When you run this operation in
rows mode, the operation does not respect facets fully.

If that's not a typo where you meant record mode, that just sounds like a bug. Is there a good reason to preserve the buggy behavior?

As for next steps, I agree that facets are much higher priority than importers/exporters (well, except for perhaps basic CSV/TSV support to get data into and out of the system). Facets are a key part of the operation of the system.

Tom

--
You received this message because you are subscribed to the Google Groups "OpenRefine Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to openrefine-de...@googlegroups.com.

Antonin Delpeuch (lists)

unread,
Sep 3, 2020, 2:23:49 AM9/3/20
to openref...@googlegroups.com
On 02/09/2020 20:57, Tom Morris wrote:
> I also (re)discovered one aspect of the data extension operation which
> made its migration a bit more difficult. When you run this operation in
> rows mode, the operation does not respect facets fully.
>
>
> If that's not a typo where you meant record mode, that just sounds like
> a bug. Is there a good reason to preserve the buggy behavior?

I did mean what you quoted. In records mode, facets are respected, but
not in rows mode.

For me, the current behaviour feels right, although it can be a bit
surprising at first. Taking the same example grid, what other behaviour
could we have?

We could only fetch the first value of each property when in rows mode,
which would give us:

a       b       d       v1
        c

But that is silently discarding data, which sounds pretty dangerous.

Or we could add extra values on new empty rows:

a       b       d       v1
                        v2
        c

But that breaks the record structure, since b and c should intuitively
be on contiguous rows.

That being said this last behaviour is perhaps okay for most purposes, I
could be convinced to switch to that. But the bottom line is that when
dealing with multi-valued properties, the user will have to switch to
the records mode anyway.

>
> As for next steps, I agree that facets are much higher priority than
> importers/exporters (well, except for perhaps basic CSV/TSV support to
> get data into and out of the system). Facets are a key part of the
> operation of the system.

Sure, the CSV/TSV importer is already migrated. The facet architecture
is also in place and the text facet is already available, it is "just" a
matter of migrating the numeric, timeline and scatterplot facets,
although I am sure I will encounter unexpected road blocks there too.
There is also the idea of facet computation by sampling rows (instead of
processing all rows) which needs to be exposed to the user.

Antonin

Tom Morris

unread,
Sep 4, 2020, 2:03:00 PM9/4/20
to openref...@googlegroups.com
I didn't fully grasp the issue before, so I went back and took a closer look.

On Thu, Sep 3, 2020 at 2:23 AM Antonin Delpeuch (lists) <li...@antonin.delpeuch.eu> wrote:
On 02/09/2020 20:57, Tom Morris wrote:

> If that's not a typo where you meant record mode, that just sounds like
> a bug. Is there a good reason to preserve the buggy behavior?

I did mean what you quoted. In records mode, facets are respected, but
not in rows mode.

For me, the current behaviour feels right, although it can be a bit
surprising at first. Taking the same example grid, what other behaviour
could we have?

Two other possibilities are: 1) store an error and 2) store a list
 
We could only fetch the first value of each property when in rows mode,
which would give us:

a       b       d       v1
        c

But that is silently discarding data, which sounds pretty dangerous.

Or we could add extra values on new empty rows:

a       b       d       v1
                        v2
        c

But that breaks the record structure, since b and c should intuitively
be on contiguous rows.

That's not clear to me because the semantics have never been clearly defined. Are the additional rows mini-subrecords whose columns form a coherent set, or is every column independent, with the multiple values for the record in that column stored in arbitrary order directly underneath the anchor row for the record? I bet there are use cases for (and users who expect) both.
 
That being said this last behaviour is perhaps okay for most purposes, I
could be convinced to switch to that. But the bottom line is that when
dealing with multi-valued properties, the user will have to switch to
the records mode anyway.

I think the behavior for the three Add Column ... operations should be consistent. Currently if I use Add Column Based on this Column with an expression like "a,b".split(","), I get "String[] value not storable". This is part of the schizophrenic Record Mode / multi-valued cells behavior that needs to be sorted out.

Extending your example,

a       b       d       v1
        c       e       v2


if extending "d" returns v1 & v2, where does the extended value for "e" go? What if "e" doesn't have an extended value? How do I know that v2 is the second extended value for "d", not the extended value for "e"? The safest behavior, until we have a better model, is probably just to make this an error.

Tom

Thad Guidry

unread,
Sep 4, 2020, 3:51:45 PM9/4/20
to openref...@googlegroups.com
I think that drawing lines or visually representing the data within a tree is more suitable than pure rows. Solid horizontal rows don't do much to reflect relationships, parent-child structure, or keys.
Some of my design ideas are to change this and visually represent hierarchies much better than our current UI does.
One of those ideas is actually collapsible subrecords and zoom capability, where keys are used to hold those relationships.

For the modeling, can this be approached with multiple RDDs as overlays, where those relationships are handled with keys? Just like we do in regular RDBMSs with multiple tables?

Note: when using custom objects as the key in key-value pair operations, you must be sure that a custom equals() method is accompanied with a matching hashCode() method. For full details, see the contract outlined in the Object.hashCode() documentation.
 
For me, I think it's easier to approach Records mode and hierarchical data needs by doing the following:
1. Use RDDs to represent the low-level subrecords.
2. Use Spark SQL and DataFrames to construct records from the existing RDDs, where join(), cogroup(), groupWith(), etc. help build relationships between record elements.

Transforms and Filters, etc. seem to become easier in this approach. https://stackoverflow.com/questions/28332494/querying-spark-sql-dataframe-with-complex-types

As far as our UI is concerned, which is where I think both of you are struggling with decisions on data representation... we should not be the ones picking that... let the user do so and simply offer Flexible Data Representation as I have tried to conceptualize and help design further in https://github.com/OpenRefine/OpenRefine/issues/2825

Antonin Delpeuch (lists)

unread,
Sep 5, 2020, 3:20:45 AM9/5/20
to openref...@googlegroups.com
On 04/09/2020 20:02, Tom Morris wrote:
> I didn't fully grasp the issue before, so I went back and took a closer
> look.
>
> On Thu, Sep 3, 2020 at 2:23 AM Antonin Delpeuch (lists)
> <li...@antonin.delpeuch.eu <mailto:li...@antonin.delpeuch.eu>> wrote:
>
> On 02/09/2020 20:57, Tom Morris wrote:
>
> > If that's not a typo where you meant record mode, that just sounds
> like
> > a bug. Is there a good reason to preserve the buggy behavior?
>
> I did mean what you quoted. In records mode, facets are respected, but
> not in rows mode.
>
> For me, the current behaviour feels right, although it can be a bit
> surprising at first. Taking the same example grid, what other behaviour
> could we have?
>
>
> Two other possibilities are: 1) store an error and 2) store a list

I think it sends a pretty bad message if we start storing errors when
properties have multiple values: this is a situation that has been
supported so far, so if we change this behaviour it should be an
improvement over that.

Storing lists in single cells is not supported yet as you note below.

>  
>
> We could only fetch the first value of each property when in rows mode,
> which would give us:
>
> a       b       d       v1
>         c
>
> But that is silently discarding data, which sounds pretty dangerous.
>
> Or we could add extra values on new empty rows:
>
> a       b       d       v1
>                         v2
>         c
>
> But that breaks the record structure, since b and c should intuitively
> be on contiguous rows.
>
>
> That's not clear to me because the semantics have never been clearly
> defined. Are the additional rows mini-subrecords who's columns form a
> coherent set or is every column independent with the multiple values for
> the record in that column stored in arbitrary order directly underneath
> the anchor row for the record? I bet there are use cases for (and users
> who expect) both.

Yes, that is the ambiguity that column groups were supposed to solve,
but this functionality has never been finished and is unusable in its
current state.

>  
>
> That being said this last behaviour is perhaps okay for most purposes, I
> could be convinced to switch to that. But the bottom line is that when
> dealing with multi-valued properties, the user will have to switch to
> the records mode anyway.
>
>
> I think the behavior for three Add Column ... operations should be
> consistent. Currently if I use Add Column Based on this Column with an
> expression like "a,b".split(","), I get "String[] value not storable".
> This is part of the schizophrenic Record Mode / multi-valued cells
> behavior that needs to be sorted out.

Yes I would very much welcome improvements on this front, but I think
trying to single-handedly migrate out of an in-memory model and design
an alternative to the records mode (or fix it) would be a bad idea. So
that is why I am preserving the current behaviour so far - that does not
mean I am against improving over it in the future.

>
> Extending your example,
>
> a       b       d       v1
>         c       e       v2
>
> if extending "d" returns v1 & v2, where does the extended value for "e"
> go? What if "e" doesn't have an extended value? How do I know that v2 is
> the second extended value for "d", not the extended value for "e"? The
> safest behavior, until we have a better model, is probably just to make
> this an error.

So let's say our table is

a       b       d
        c       e

If fetching values for "e" returns two values "w1" and "w2", this will give:

a       b       d       v1
                        v2
        c       e       w1
                        w2

The easiest thing is to try it for yourself with Wikidata, I would say
(for instance, fetching cast members on films).

Antonin

Antonin Delpeuch (lists)

unread,
Oct 5, 2020, 8:23:51 AM10/5/20
to openref...@googlegroups.com
Hi all,

Here is a summary of what happened on this front this month: the
migration of facets.

The architecture for facets was already in place and the list facet was
already migrated. I worked on migrating the remaining facets (text
filter, numeric facet, timeline facet and scatterplot facet) and adding
a sampling strategy for large datasets.

1. Numeric facet

The numeric facet creates a histogram of all numeric values generated by
its expression. The bins of the histogram all have the same size, which
is a power of 10. This power of 10 is determined by the range of the
data (max - min), so that the total number of bins is not bigger than a
fixed constant (100 at the moment). If you have worked a bit with these
facets, you might have found this a bit disconcerting at times - it does
not always give very natural results. For instance if you have some
outliers which are way beyond the typical distribution of your values,
this will force the bin size to be coarser than you might want. I have
created an issue for this:
https://github.com/OpenRefine/OpenRefine/issues/3248. It should be
possible to fix it regardless of the architecture.

Independently of this issue, the new architecture for this facet
exploits the fact that the bin boundaries at various scales are aligned:
for instance, a bin of size 0.1 contains 10 bins of size 0.01. This
means that it is possible to scan the grid and add values in the
corresponding bins incrementally. When a new value falls outside of the
current histogram, new bins are added on the fly, and when there are too
many such bins, the histogram is rescaled (making it coarser) by merging
neighbouring bins together. The nice thing about this approach is that
we never need to compute the list of all numeric values covered by the
facet, which could be arbitrarily big for large projects. Instead, we
only work with histograms, whose size can be capped (by configuring the
maximum number of bins).
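A Python sketch of the incremental rescaling idea (class and parameter names
are mine, not the actual implementation): values are streamed in one pass,
and when the histogram spans too many bins, every 10 neighbouring bins are
merged into one bin 10 times wider.

```python
import math

class Pow10Histogram:
    """Single-pass histogram with power-of-10 bins; merges neighbouring
    bins when the span exceeds max_bins, so memory stays bounded."""
    def __init__(self, max_bins=100):
        self.max_bins = max_bins
        self.log_bin_size = 0   # bin width is 10 ** log_bin_size
        self.counts = {}        # bin index -> count

    def add(self, value):
        idx = self._bin(value)
        self.counts[idx] = self.counts.get(idx, 0) + 1
        while max(self.counts) - min(self.counts) + 1 > self.max_bins:
            self._rescale()

    def _bin(self, value):
        return math.floor(value / 10 ** self.log_bin_size)

    def _rescale(self):
        # merge every 10 neighbouring bins into one bin 10x wider
        merged = {}
        for idx, count in self.counts.items():
            coarse = math.floor(idx / 10)
            merged[coarse] = merged.get(coarse, 0) + count
        self.counts = merged
        self.log_bin_size += 1
```

Because bins at different scales are aligned, rescaling never needs to
revisit the original values: merging counts is enough.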

This approach could easily be adapted to define a desired bin size
(still as a power of 10).

For smaller datasets it is possible that this approach becomes slower
(as the code is a bit more complicated). I have not properly benchmarked
that so far, but in interactive testing the difference seems
unnoticeable compared to the overheads Spark brings, for instance (these
will go away once we migrate to a home-grown local implementation of the
data model).

2. Timeline facet

The timeline facet is pretty similar to the numeric facet, it also
produces a histogram whose bin size is determined by the data. However,
unlike the numeric facet, the possible bin sizes are not powers of 10 in
some unit, but rather "natural" time constants: second, minute, hour,
day, week, month… This means that the boundaries for each histogram bar
are more natural for the user (it would not be very practical to group
together all dates by powers of 10 for their number of seconds since
Epoch, for instance).

However, that makes it impossible to use the same incremental binning
strategy as above, since the various bin scales overlap: some weeks
spread over two different months, for instance. So for this facet, we
use a simpler strategy: we aggregate the list of all dates covered by
the facet, and compute the histogram afterwards. This is essentially
identical to the algorithm that was used before, so it should not
degrade performance. One small optimization I did was to use native
arrays (long[]), which are more memory-efficient than lists of boxed
values (List<Long>), so that should hopefully improve memory consumption
marginally.

If we want to invest more time into speeding up this facet, we could
change the faceting API to let facets make a first pass on the data to
compute the min and max values (which can be done without keeping all
the values in memory), and then do the actual aggregation with the min
and max already computed. That would let us determine the bin size after
the first pass, and use incremental binning for the second pass, which
would be more memory-efficient. But it might be slower because of the
two passes.
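The two-pass idea could look something like this (a Python sketch with
invented names; `make_stream` stands for a source of values that can be
iterated more than once). Pass 1 keeps only the extrema; pass 2 fills
fixed-size bins incrementally, so the full list of values is never held in
memory:

```python
def two_pass_bins(make_stream, choose_bin_size):
    """Two passes over a re-iterable stream: pass 1 finds min/max without
    keeping values in memory; pass 2 fills fixed bins incrementally."""
    lo = hi = None
    for v in make_stream():            # pass 1: extrema only
        lo = v if lo is None or v < lo else lo
        hi = v if hi is None or v > hi else hi
    if lo is None:
        return lo, hi, []
    size = choose_bin_size(lo, hi)
    n = int((hi - lo) // size) + 1
    counts = [0] * n
    for v in make_stream():            # pass 2: incremental binning
        counts[int((v - lo) // size)] += 1
    return lo, hi, counts
```

For the timeline facet, `choose_bin_size` would pick the natural time unit
(second, minute, hour, ...) once the extrema are known.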

3. Scatterplot facet

The scatterplot facet does not do any binning: it just draws a point for
each row where two numerical values are generated by the expressions. I
migrated it to an architecture similar to the timeline facet: first,
compute all the coordinates of the points covered by the facet, then
draw the image.

If facets were allowed to do two passes on the data, we could again
optimize this to avoid storing the list of all pairs of values: first,
compute the min and max values for each coordinate, and then draw the
points on the image incrementally in a second pass on the data. One
slight complication is that aggregation states are required to be
serializable (for the sake of compatibility with Spark) so some care
would be required to make sure the image is serialized properly, but
it could be doable. I did not venture into that because I think the
facet would deserve a big overhaul, where the facet would be generated
client-side instead of server-side, with a zoomable view (like a slippy
map).

One issue encountered while migrating this facet was that its filtering
settings were relative to the extremal values seen by the facet, making
it unreproducible: https://github.com/OpenRefine/OpenRefine/issues/3222

I solved this by including the min and max values on both axes in the
facet configuration, making it possible to reinterpret the user-selected
coordinates on the scatterplot facet.
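The reinterpretation step can be sketched as follows (a Python sketch with
hypothetical field names, not the actual facet configuration format): the
rectangle selected on the plot, expressed in relative 0..1 coordinates, is
mapped back to data-space bounds using the min/max stored in the config, so
the facet no longer depends on the extrema seen at render time.

```python
def selection_to_data_coords(selection, config):
    """Map a plot selection (relative 0..1 coords) to data-space bounds
    using the extrema recorded in the facet configuration."""
    def scale(t, lo, hi):
        return lo + t * (hi - lo)
    return {
        "from_x": scale(selection["from_x"], config["min_x"], config["max_x"]),
        "to_x":   scale(selection["to_x"],   config["min_x"], config["max_x"]),
        "from_y": scale(selection["from_y"], config["min_y"], config["max_y"]),
        "to_y":   scale(selection["to_y"],   config["min_y"], config["max_y"]),
    }
```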

I see this as a small positive sign for the new architecture: the new
faceting API makes it harder to define non-reproducible facets (although
it was not the original aim). I really had to fix this bug to migrate
the scatterplot facet to the new architecture.

4. Sampling strategy

I have posted more details about this in a separate thread:
https://groups.google.com/forum/#!topic/openrefine-dev/Is1Q9abYfQs

5. Next steps

I will work on importers and exporters, where the API differences are
more minor, so they should hopefully be faster to migrate.

Best,

Antonin Delpeuch (lists)

unread,
Nov 5, 2020, 3:55:42 AM11/5/20
to openref...@googlegroups.com
Hi all,

Here is a new update on this project. This month I migrated importers to
the new architecture. As before my aim was to preserve existing
functionality so there isn't much to see on the user-facing side of
things, but I can tell you about what happened behind the scenes.

The importing stage can be a major bottleneck for scalability, so it's
an area where it is important to get things right. Here is an overview
of the current process:
- the user uploads data from their browser to the backend (for instance
by uploading a file or submitting some text in the clipboard area).
- the backend saves this file as is, in a dedicated temporary location.
- the format of the file is guessed based on the extension, MIME type
and the file contents
- the best importer selected for that format gets a chance to read the
file (or parts of it) to propose sensible import options (for instance,
guessing the separator of a CSV/TSV file)
- the importer produces a preview of the import, again by reading a
small part of the file only if possible
- once the user validates the import, the entire file is parsed in memory
- the grid is then saved to disk when the project is saved

In this process, the main bottleneck is the parsing of the project into
memory before it is saved as a parsed project. Ideally we would like to
do this in a streaming fashion: reading rows from the source file and
saving them as parsed project rows directly after, so as to avoid having
to hold the entire grid in memory.

This is something that is unfortunately not so easy to do, for multiple
reasons.

First, for some formats it is just very hard (or plainly impossible) to
parse them as a stream. Often, it is not supported (or not fully
supported) by the libraries we rely on. This is the case for ODS or XLS
files for instance (the value of a cell in a spreadsheet can depend on
cells very far away from them). This means that for those formats, we
will still have to parse the entire project at the import stage. The
other performance improvements will still be applicable after the import
stage (since we do not work on the original file after that).

Second, Spark's data model does not make it easy to leverage streaming
parsers. This has to do with the structure of RDDs (Resilient
Distributed Datasets) on which Spark is based. An RDD consists of
partitions, which represent sequential chunks of the grid. From each of
these partitions, you can read in a streaming fashion (enumerating the
items in it, in order). Assuming you have a stream of elements that you
want to convert to an RDD, there is no API method to do so without
loading all elements in memory in the process. This is because Spark
would need to iterate over the stream multiple times: first to count the
number of elements in it (to determine where partitions should start and
end), and then to iterate from any given partition. Instead of following
this strategy, Spark fetches all items from the stream in an array from
the start.

There are still some cases where we can obtain a performance improvement
by handing the parsing over to Spark's native parsers. This is the case for:
- text files
- fixed width column files
- CSV/TSV files
This relies on the fact that for all these files, lines are separated by
newline characters, which makes it easy for Spark to efficiently read
partitions from their starting point.
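The newline trick can be sketched in a few lines of Python (a toy
illustration, not Spark's actual splitting logic): pick roughly equal byte
offsets, then advance each one to just past the next newline, so every
partition starts at a line boundary and can be parsed independently.

```python
def partition_offsets(data: bytes, num_partitions: int):
    """Split raw text bytes into (start, end) ranges that begin right
    after a newline, so each range can be read independently."""
    size = len(data)
    starts = [0]
    for i in range(1, num_partitions):
        t = size * i // num_partitions
        nl = data.find(b"\n", t)     # advance to the end of the current line
        pos = nl + 1 if nl != -1 else size
        if starts[-1] < pos < size:
            starts.append(pos)
    return list(zip(starts, starts[1:] + [size]))
```

A line straddling a candidate boundary simply stays with the earlier
partition; the ranges still cover the file exactly once.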

To maintain the separation between the application code (where importers
are implemented) and the datamodel itself (which can be implemented by
Spark), I have added a dedicated method on the DatamodelRunner interface
to load text files. This can be used by line-based importers as a first
step, after which they can parse the lines into rows as needed.

For CSV/TSV files, I haven't implemented this yet because there is
unfortunately a slight complication: using quotes to delimit cell values
means that cell values can contain newlines, so a given row can spread
on multiple lines. Spark's own CSV parser falls back on full in-memory
parsing in these cases. I will open a separate thread to bring up
related problems with the CSV importer.
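To see why quoting defeats line-based splitting, here is a minimal Python
sketch of record splitting that tracks quote state (simplified: it treats
every `"` as a toggle and ignores some escaping subtleties). A newline
inside quotes does not end the record, so you cannot decide where a record
starts without having read everything before it:

```python
def split_csv_records(text):
    """Split CSV text into records, honouring quotes: a newline inside a
    quoted value does not terminate the record."""
    records, current, in_quotes = [], [], False
    for ch in text:
        if ch == '"':
            in_quotes = not in_quotes   # simplistic quote handling
        if ch == "\n" and not in_quotes:
            records.append("".join(current))
            current = []
        else:
            current.append(ch)
    if current:
        records.append("".join(current))
    return records
```

This is exactly the state a partition reader starting at an arbitrary
newline cannot know, which is why Spark's CSV parser falls back to
in-memory parsing for multi-line records.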

Another next step I have in mind is adding a method in the datamodel
interface to parse a grid from an iterable stream (a stream that you can
iterate over multiple times). Even if by default Spark does not take
advantage of that, other datamodel runners could (our home-grown one,
Flink, others…)

This should make it possible to improve the efficiency of parsers such
as the JSON/XML parsers, for which we can read the file in a streaming
fashion.

Another improvement I would like to make is the ability to create
an OpenRefine project from a file, without duplicating the file contents
in the workspace: being able to work from the original file directly.
This would be useful for large datasets held in a cluster (for instance
a Hadoop file system), when they are in a format that is quick enough to
parse.

We should also discuss whether (and how) we would like to support
formats such as Parquet. Since the use of Spark in OpenRefine should be
optional, this means we should rely on other libraries to parse these
(when Spark is not available). It is not clear to me that this is an
urgent need of the OpenRefine community either, so I haven't dedicated
effort to that yet (but I have ideas of how to make it work). Perhaps it
would still be worth doing it for the sake of bringing visible changes
to users.

The remaining area of the code base that is still to be migrated is the
clustering methods. For this area again I will try to do a simple and
correct migration, leaving more advanced performance optimizations for
later. In that process I should be able to make the fingerprinting
methods more scalable, but definitely not the distance-based clusterers.
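The reason fingerprinting scales is that it is a plain map plus a
group-by-key, which distributes well, whereas distance-based clustering
needs all-pairs comparisons. A Python sketch of the idea (modelled on
OpenRefine's fingerprint keyer, but simplified: no accent folding, for
instance):

```python
import re
from collections import defaultdict

def fingerprint(value):
    """Simplified fingerprint key: lowercase, strip punctuation,
    deduplicate and sort the tokens."""
    tokens = re.split(r"\s+", re.sub(r"[^\w\s]", "", value.strip().lower()))
    return " ".join(sorted(set(t for t in tokens if t)))

def cluster_by_fingerprint(values):
    """Group values sharing a fingerprint key: one map + group-by-key,
    so it parallelises naturally."""
    clusters = defaultdict(list)
    for v in values:
        clusters[fingerprint(v)].append(v)
    return [vs for vs in clusters.values() if len(vs) > 1]
```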

As always, I am keen to hear your thoughts on all this!