I have a CSV which is utf-16 encoded (outputted from SSIS). Before I can load it into my database, I need to clean up the formatting of the data a bit. The idea is that I open the CSV into a CSVDict, process it (formats, names, etc), then output it as a gzip. When I run my code, I get this error: " TypeError: a bytes-like object is required, not 'str' "
I had to use with open(file, 'r', encoding='utf16') to properly load the file. When I tried to implement this logic completely in Luigi, I got an error:
class ProcessFile(luigi.Task):
filename = luigi.Parameter()
filedate = luigi.DateParameter(default = (datetime.date.today() - datetime.timedelta(1)))
filetype = luigi.Parameter(default='.csv')
def requires(self):
return GetFileFromFTP(self.filename, self.filedate, self.filetype)
def output(self):
return luigi.LocalTarget('output.csv')
def run(self):
r = self.input().open('r')
csvread = csv.DictReader(r, delimiter='|', quotechar='"')
w = self.output().open('w')
headers = ['Column1', 'Column2', 'Column3', 'Column4',
'Column5', 'Column6', 'Column7', 'Column8',
'Column9', 'Column10', 'Column11', 'Column12', 'Column13']
writer = csv.DictWriter(w, headers, extrasaction='ignore', delimiter='|', quoting=csv.QUOTE_ALL)
writer.writeheader()
#A bunch of for row in r logic
writer.writerow(row)
Error: " UnicodeDecodeError: 'utf-8' codec can't decode byte 0xff in position 0: invalid start byte "
It seems that this is due to the file being in UTF-16 encoding, so I re-wrote the code like so:
class ProcessFile(luigi.ExternalTask):
filename = luigi.Parameter()
filedate = luigi.DateParameter(default = (datetime.date.today() - datetime.timedelta(1)))
filetype = luigi.Parameter(default='.csv')
def requires(self):
return GetFileFromFTP(self.filename, self.filedate, self.filetype)
def output(self):
return luigi.LocalTarget('output.gz', format=luigi.format.Gzip)
def run(self):
file = self.filename + self.filedate.strftime("%Y%m%d") + self.filetype
with open(file, 'r', encoding='utf16') as readfile:
csvread = csv.DictReader(readfile, delimiter='|', quotechar='"')
w = self.output().open('w')
headers = ['Column1', 'Column2', 'Column3', 'Column4',
'Column5', 'Column6', 'Column7', 'Column8',
'Column9', 'Column10', 'Column11', 'Column12', 'Column13']
writer = csv.DictWriter(w, headers, extrasaction='ignore', delimiter='|', quoting=csv.QUOTE_ALL)
writer.writeheader()
for row in csvread:
row['Column14'] = datetime.datetime.strptime(row['Column13'], '%Y-%m-%d %H:%M:%S').strftime("%Y-%m-%d")
if len(row['Column12']) == 29:
row['Column15'] = datetime.datetime.strptime(row['Column12'][:26], '%Y-%m-%d %H:%M:%S.%f').strftime("%Y-%m-%d")
elif len(row['CreatedDate']) == 19:
row['Column15'] = datetime.datetime.strptime(row['Column12'], '%Y-%m-%d %H:%M:%S').strftime("%Y-%m-%d")
else:
row['Column15'] = datetime.datetime.strptime(row['Column12'], '%Y-%m-%d').strftime("%Y-%m-%d")
#Cleanup unformatted decimals
#If the decimal does not have a number before it, add a leading zero
for key, value in row.items():
if value[:1] == '.' and value.replace('.','',1).isdigit() == True:
row[key] = '0' + value
writer.writerow(row)
w.close()
When I run this, I get the error: " TypeError: a bytes-like object is required, not 'str' ". I could add another Task to gzip, but I know Luigi can gzip a file as the output. Anyone have an idea of what I am doing incorrectly?
Thanks