I'm new to Disco and I'm struggling :(
I have an input file in JSON format which is basically a list of objects:
[
  {"id": 1,
   "value": 121},
  {"id": 2,
   "value": 32},
  {"id": 3,
   "value": 6656}
]
I am trying to write a custom map_reader function that yields single objects from the JSON file.
If I use the file directly as job input, SomeJobClass().run(input=["file://path"]), I am able to write a custom_json_reader() function that yields single objects when I pass it as map_reader=custom_json_reader.
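For reference, my custom_json_reader looks roughly like this (a sketch; I'm assuming the (stream, size, url, params) argument convention that the task_io readers use, and json_reader is just the name of my own module):

```python
import json

def custom_json_reader(stream, size, url, params):
    # Read the whole input, parse it as one JSON list,
    # then yield the objects one at a time.
    data = json.loads(stream.read())
    for obj in data:
        yield obj
```

With a file:// input this yields each {"id": ..., "value": ...} dict as a separate map entry.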
However, if I push chunked data into DDFS and use the tag as input, SomeJobClass().run(input=["tag://data:log"]), I can't get it to work.
As far as I understand the documentation, DDFS data is stored in binary chunks, so I have to use disco.worker.task_io.chain_reader.
So I have the following class:
from disco.core import Job
from disco.worker.task_io import chain_reader
from json_reader import custom_json_reader

class SomeJobClass(Job):
    map_input_stream = [chain_reader, custom_json_reader]

    @staticmethod
    def map(entry, params):
        pass

    @staticmethod
    def reduce(iter, out, params):
        pass
I cannot figure out how to get the data out of DDFS through chain_reader and into custom_json_reader so that it yields single objects from the JSON file. The custom_json_reader itself works fine when I use the file directly as input. How do I make it work when the data is in DDFS?