Hello,
I'm building a service on top of Stack Overflow. A small crawler fetches questions from their API and loads them into a PostgreSQL database.
I then want to use Elasticsearch for full-text search on the title, body and tags fields.
For that, I use jprante's JDBC plugin in pull/river mode (I know, it's somewhat deprecated, but it suits me).
Logs:
[2015-04-21 00:02:35,369][DEBUG][action.bulk ] [Ancient One] [grep1][3] failed to execute bulk item (index) index {[grep][stack_questions][29660932], source[{"qid":"29660932","website":"stackoverflow","link":"http://stackoverflow.com/questions/29660932/how-to-make-filter-aggregations-using-pyes","tags":"elasticsearch elasticsearch-plugin elastic pyes pyelasticsearch","title":"How to make filter aggregations using PyES","body":{"aggs":{"in_stock_products":{"aggs":{"avg_price":{"avg":{"field":"price"}}},"filter":{"range":{"stock":{"gt":0}}}}}},"owner_uid":"4793874","owner_name":"Subbu Pendyala","createdAt":"2015-04-15T23:08:08.000+02:00","updatedAt":"2015-04-15T23:08:19.783+02:00"}]}
org.elasticsearch.index.mapper.MapperParsingException: failed to parse [body]
at org.elasticsearch.index.mapper.core.AbstractFieldMapper.parse(AbstractFieldMapper.java:415)
at org.elasticsearch.index.mapper.object.ObjectMapper.serializeObject(ObjectMapper.java:555)
at org.elasticsearch.index.mapper.object.ObjectMapper.parse(ObjectMapper.java:490)
at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:541)
at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:490)
at org.elasticsearch.index.shard.service.InternalIndexShard.prepareIndex(InternalIndexShard.java:413)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardIndexOperation(TransportShardBulkAction.java:435)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:150)
at org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$AsyncShardOperationAction.performOnPrimary(TransportShardReplicationOperationAction.java:511)
at org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$AsyncShardOperationAction$1.run(TransportShardReplicationOperationAction.java:419)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.ElasticsearchIllegalArgumentException: unknown property [aggs]
at org.elasticsearch.index.mapper.core.StringFieldMapper.parseCreateFieldForString(StringFieldMapper.java:331)
at org.elasticsearch.index.mapper.core.StringFieldMapper.parseCreateField(StringFieldMapper.java:277)
at org.elasticsearch.index.mapper.core.AbstractFieldMapper.parse(AbstractFieldMapper.java:405)
... 12 more
The problem:
{"aggs":{"in_stock_products":{"aggs":{"avg_price":{"avg":{"field":"price"}}},"filter":{"range":{"stock":{"gt":0}}}}}}
I think it is trying to parse this value as JSON, because:
- the attribute order is changed compared to the Stack Overflow post
- the rest of the question text is not preserved
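Both symptoms are consistent with a lenient "JSON prefix" parse: the leading object is decoded into a real JSON structure (so keys may be re-serialized in a different order) and the trailing prose is discarded. A minimal sketch of that hypothesis, using Python's `json` module on the exact body stored in PG (this is my reproduction of the suspected behavior, not the plugin's actual code):

```python
import json

# The raw "body" column: a JSON object followed by plain prose,
# as stored for question 29660932.
raw_body = """{
  "aggs" : {
    "in_stock_products" : {
      "filter" : { "range" : { "stock" : { "gt" : 0 } } },
      "aggs" : {
        "avg_price" : { "avg" : { "field" : "price" } }
      }
    }
  }
}

How to generate the above query using PyES?"""

decoder = json.JSONDecoder()
obj, end = decoder.raw_decode(raw_body)  # decodes only the leading JSON object
trailing = raw_body[end:].strip()        # the prose after the object is left over

print(type(obj))   # the body has become an object, not a string
print(trailing)    # the rest of the question, dropped from the indexed doc
```

If the plugin does something like this, the `body` sent in the bulk request is an object, which the string mapping rejects with exactly the `unknown property [aggs]` error above.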
I declared the following mapping:
{
  "stack_questions": {
    "_id": {
      "path": "qid",
      "index": "no"
    },
    "_timestamp": {
      "enabled": true,
      "path": "createdAt",
      "format": "dateTime"
    },
    "properties": {
      "website": {
        "type": "string",
        "index": "not_analyzed"
      },
      "link": {
        "type": "string",
        "index": "no"
      },
      "tags": {
        "type": "string",
        "index": "analyzed"
      },
      "title": {
        "type": "string",
        "index": "analyzed",
        "analyzer": "english"
      },
      "body": {
        "type": "string",
        "index": "analyzed",
        "analyzer": "english"
      },
      "owner_uid": {
        "type": "string",
        "index": "not_analyzed"
      },
      "owner_name": {
        "type": "string",
        "index": "not_analyzed"
      },
      "updatedAt": {
        "type": "date",
        "format": "dateTime",
        "store": false
      }
    }
  }
}
So the body should be analyzed with a language analyzer.
Every time an exception is thrown during the transfer, the whole bulk stops. Unfortunately, there are now so many docs that were never indexed into ES that every bulk contains at least one document that blows up.
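If I end up bypassing the river and writing my own indexer, the `_bulk` API itself reports status per item, so the poison documents can be skipped and logged instead of poisoning every batch. A sketch of splitting a bulk response into successes and failures (the response dict here is a hand-made example shaped like an ES 1.x bulk reply, not real cluster output):

```python
def split_bulk_response(response):
    """Return (ok_ids, failed) from an Elasticsearch _bulk response body."""
    ok_ids, failed = [], []
    for item in response["items"]:
        # Each item is keyed by its operation type ("index" or "create").
        result = item.get("index") or item.get("create") or {}
        if "error" in result:
            failed.append((result["_id"], result["error"]))
        else:
            ok_ids.append(result["_id"])
    return ok_ids, failed

# Example response: one success, one mapping failure.
response = {
    "errors": True,
    "items": [
        {"index": {"_id": "1", "status": 201}},
        {"index": {"_id": "29660932", "status": 400,
                   "error": "MapperParsingException[failed to parse [body]]"}},
    ],
}

ok_ids, failed = split_bulk_response(response)
print(ok_ids)
print(failed)
```

The failed ids could then be retried individually or dumped to a dead-letter table in PG for later inspection.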
The river:
{
  "interval": "30s",
  "timezone": "UTC/GMT",
  "type": "jdbc",
  "jdbc": {
    "url": "jdbc:postgresql://{url}:5432/{db}?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory",
    "user": "{user}",
    "password": "{password}",
    "sql": "SELECT *, qid AS _id FROM questions",
    "index": "stackoverflow",
    "type": "stack_questions",
    "index_settings": {
      "index": {
        "refresh_interval": "30s"
      }
    }
  }
}
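For what it's worth, later versions of jprante's JDBC importer document a `detect_json` option (default `true`) that controls exactly this behavior: column values that look like JSON are parsed into objects. I'm not sure it exists in river 1.4.0.10, but if it does, adding it to the `jdbc` block might keep the body as a plain string:

```json
{
  "jdbc": {
    "detect_json": false
  }
}
```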
The doc, as stored in PG:
=> select body FROM questions WHERE qid = '29660932';
body
------------------------------------------------------------------------
{\r +
"aggs" : {\r +
"in_stock_products" : {\r +
"filter" : { "range" : { "stock" : { "gt" : 0 } } },\r+
"aggs" : {\r +
"avg_price" : { "avg" : { "field" : "price" } }\r +
}\r +
}\r +
}\r +
}\r +
\r +
How to generate the above query using PyES? +
I'm not even sure the error comes from ES itself. It might be the JDBC plugin's fault...
ES version: 1.4.1
JDBC driver version: postgresql-9.4-1201.jdbc41
JDBC river version: 1.4.0.10
Thanks for your help