Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

Multi thread reading a file

12 views
Skip to first unread message

Mag Gam

unread,
Jun 30, 2009, 9:52:18 PM6/30/09
to pytho...@python.org
Hello All,

I am very new to python and I am in the process of loading a very
large compressed csv file into another format. I was wondering if I
can do this in a multi thread approach.

Here is the pseudo code I was thinking about:

Let T = Total number of lines in a file, Example 1000000 (1 million files)
Let B = Total number of lines in a buffer, for example 10000 lines


Create a thread to read until buffer
Create another thread to read buffer+buffer ( So we have 2 threads
now. But since the file is zipped I have to wait until the first
thread is completed. Unless someone knows of a clever technique.
Write the content of thread 1 into a numpy array
Write the content of thread 2 into a numpy array

But I don't think we are capable of multiprocessing tasks for this....


Any ideas? Has anyone ever tackled a problem like this before?

Gabriel Genellina

unread,
Jul 1, 2009, 12:54:45 AM7/1/09
to pytho...@python.org
En Tue, 30 Jun 2009 22:52:18 -0300, Mag Gam <maga...@gmail.com> escribi�:

> I am very new to python and I am in the process of loading a very
> large compressed csv file into another format. I was wondering if I
> can do this in a multi thread approach.

Does the format conversion involve a significant processing time? If not,
the total time is dominated by the I/O time (reading and writing the file)
so it's doubtful you gain anything from multiple threads.

> Here is the pseudo code I was thinking about:
>
> Let T = Total number of lines in a file, Example 1000000 (1 million
> files)
> Let B = Total number of lines in a buffer, for example 10000 lines
>
>
> Create a thread to read until buffer
> Create another thread to read buffer+buffer ( So we have 2 threads
> now. But since the file is zipped I have to wait until the first
> thread is completed. Unless someone knows of a clever technique.
> Write the content of thread 1 into a numpy array
> Write the content of thread 2 into a numpy array

Can you process each line independently? Is the record order important? If
not (or at least, some local dis-ordering is acceptable) you may use a few
worker threads (doing the conversion), feed them thru a Queue object, put
the converted lines into another Queue, and let another thread write the
results onto the destination file.

import Queue, threading, csv

def convert(in_queue, out_queue):
while True:
row = in_queue.get()
if row is None: break
# ... convert row
out_queue.put(converted_line)

def write_output(out_queue):
while True:
line = out_queue.get()
if line is None: break
# ... write line to output file

in_queue = Queue.Queue()
out_queue = Queue.Queue()
tlist = []
for i in range(4):
t = threading.Thread(target=convert, args=(in_queue, out_queue))
t.start()
tlist.append(t)
output_thread = threading.Thread(target=write_output, args=(out_queue,))
output_thread.start()

with open("...") as csvfile:
reader = csv.reader(csvfile, ...)
for row in reader:
in_queue.put(row)

for t in tlist: in_queue.put(None) # indicate end-of-work
for t in tlist: t.join() # wait until finished
out_queue.put(None)
output_thread.join() # wait until finished

--
Gabriel Genellina

Lawrence D'Oliveiro

unread,
Jul 1, 2009, 1:07:47 AM7/1/09
to
In message <mailman.2420.1246413...@python.org>, Mag Gam
wrote:

> I am very new to python and I am in the process of loading a very
> large compressed csv file into another format. I was wondering if I
> can do this in a multi thread approach.

Why bother?

Stefan Behnel

unread,
Jul 1, 2009, 1:30:13 AM7/1/09
to
Gabriel Genellina wrote:
> En Tue, 30 Jun 2009 22:52:18 -0300, Mag Gam <maga...@gmail.com> escribi�:
>
>> I am very new to python and I am in the process of loading a very
>> large compressed csv file into another format. I was wondering if I
>> can do this in a multi thread approach.
>
> Does the format conversion involve a significant processing time? If
> not, the total time is dominated by the I/O time (reading and writing
> the file) so it's doubtful you gain anything from multiple threads.

Well, the OP didn't say anything about multiple processors, so multiple
threads may not help wrt. processing time. However, if the file is large
and the OS can schedule the I/O in a way that a seek disaster is avoided
(although that's hard to assure with today's hard disk storage density, but
SSDs may benefit), multiple threads reading multiple partial streams may
still reduce the overall runtime due to increased I/O throughput.

That said, the OP was mentioning that the data was compressed, so I doubt
that the I/O bandwidth is a problem here. As another poster put it: why
bother? Run a few benchmarks first to see where (and if!) things really get
slow, and then check what to do about the real problem.

Stefan

Scott David Daniels

unread,
Jul 1, 2009, 11:49:31 AM7/1/09
to
Gabriel Genellina wrote:
> ...

> def convert(in_queue, out_queue):
> while True:
> row = in_queue.get()
> if row is None: break
> # ... convert row
> out_queue.put(converted_line)

These loops work well with the two-argument version of iter,
which is easy to forget, but quite useful to have in your bag
of tricks:

def convert(in_queue, out_queue):
for row in iter(in_queue.get, None):


# ... convert row
out_queue.put(converted_line)

--Scott David Daniels
Scott....@Acm.Org

Mag Gam

unread,
Jul 1, 2009, 7:41:29 AM7/1/09
to Gabriel Genellina, pytho...@python.org
Thanks for the response Gabriel.

On Wed, Jul 1, 2009 at 12:54 AM, Gabriel
Genellina<gags...@yahoo.com.ar> wrote:


> En Tue, 30 Jun 2009 22:52:18 -0300, Mag Gam <maga...@gmail.com> escribió:
>
>> I am very new to python and I am in the process of loading a very
>> large compressed csv file into another format.  I was wondering if I
>> can do this in a multi thread approach.
>
> Does the format conversion involve a significant processing time? If not,
> the total time is dominated by the I/O time (reading and writing the file)
> so it's doubtful you gain anything from multiple threads.

The format does inolve significant time processing each line.


>
>> Here is the pseudo code I was thinking about:
>>
>> Let T  = Total number of lines in a file, Example 1000000 (1 million
>> files)
>> Let B = Total number of lines in a buffer, for example 10000 lines
>>
>>
>> Create a thread to read until buffer
>> Create another thread to read buffer+buffer  ( So we have 2 threads
>> now. But since the file is zipped I have to wait until the first
>> thread is completed. Unless someone knows of a clever technique.
>> Write the content of thread 1 into a numpy array
>> Write the content of thread 2 into a numpy array
>
> Can you process each line independently? Is the record order important? If
> not (or at least, some local dis-ordering is acceptable) you may use a few
> worker threads (doing the conversion), feed them thru a Queue object, put
> the converted lines into another Queue, and let another thread write the
> results onto the destination file.

Yes, each line can be independent. The original file is a time series
file which I am placing it into a Numpy array therefore I don't think
the record order is important. The writing is actually done when I
place a value into a "dset" object.


Let me show you what I mean.
reader=csv.reader(open("100000.csv"))
for s,row in enumerate(reader):
if s!=0 and s%bufsize==0:
dset[s-bufsize:s] = t #here is where I am writing the data to
the data structure. Using a range or broadcasting.

#15 columns
if len(row) != 15:
break

t[s%bufsize] = tuple(row)

#Do this all the way at the end for flushing.
if (s%bufsize != 0):
dset[(s//bufsize)*bufsize:s]=t[0:s%bufsize]


>
> import Queue, threading, csv


>
> def convert(in_queue, out_queue):
>  while True:
>    row = in_queue.get()
>    if row is None: break
>    # ... convert row
>    out_queue.put(converted_line)
>

> def write_output(out_queue):
>  while True:
>    line = out_queue.get()
>    if line is None: break
>    # ... write line to output file
>
> in_queue = Queue.Queue()
> out_queue = Queue.Queue()
> tlist = []
> for i in range(4):
>  t = threading.Thread(target=convert, args=(in_queue, out_queue))
>  t.start()
>  tlist.append(t)
> output_thread = threading.Thread(target=write_output, args=(out_queue,))
> output_thread.start()
>
> with open("...") as csvfile:
>  reader = csv.reader(csvfile, ...)
>  for row in reader:
>    in_queue.put(row)
>
> for t in tlist: in_queue.put(None) # indicate end-of-work
> for t in tlist: t.join() # wait until finished
> out_queue.put(None)
> output_thread.join() # wait until finished
>
> --
> Gabriel Genellina
>

> --
> http://mail.python.org/mailman/listinfo/python-list
>

Mag Gam

unread,
Jul 1, 2009, 9:28:12 PM7/1/09
to pytho...@python.org
LOL! :-)

Why not. I think I will take just do it single thread for now and if
performance is really that bad I can re investigate.

Either way, thanks everyone for your feedback! I think I like python a
lot because of the great user community and wiliness to help!

> --
> http://mail.python.org/mailman/listinfo/python-list
>

Stefan Behnel

unread,
Jul 2, 2009, 12:50:54 AM7/2/09
to
Mag Gam wrote:

Have you tried if using the csv reader here is actually fast enough for
you? Maybe you can just .split(',') each line or something (no idea what
else the csv reader may do, but anyway...)


> for s,row in enumerate(reader):
> if s!=0 and s%bufsize==0:
> dset[s-bufsize:s] = t #here is where I am writing the data to
> the data structure. Using a range or broadcasting.

Erm, what's "t"? And where do you think the "significant" processing time
comes from in your example? From your code, that's totally non-obvious to me.


> #15 columns
> if len(row) != 15:
> break
>
> t[s%bufsize] = tuple(row)
>
> #Do this all the way at the end for flushing.
> if (s%bufsize != 0):
> dset[(s//bufsize)*bufsize:s]=t[0:s%bufsize]

If you *really* think your code is not I/O bound, but that the code for
writing the data into the NumPy array is the slow part (which I actually
doubt, but anyway), you can try writing your code in Cython, which has
direct support for writing to NumPy arrays through their buffer interface.

http://docs.cython.org/docs/numpy_tutorial.html

Stefan

Gabriel Genellina

unread,
Jul 2, 2009, 6:10:17 AM7/2/09
to pytho...@python.org
En Wed, 01 Jul 2009 12:49:31 -0300, Scott David Daniels
<Scott....@acm.org> escribi�:

Yep, I always forget about that variant of iter() -- very handy!

--
Gabriel Genellina

ryles

unread,
Jul 2, 2009, 10:10:18 PM7/2/09
to
On Jul 2, 6:10 am, "Gabriel Genellina" <gagsl-...@yahoo.com.ar> wrote:
> En Wed, 01 Jul 2009 12:49:31 -0300, Scott David Daniels  
> <Scott.Dani...@acm.org> escribió:

> > These loops work well with the two-argument version of iter,
> > which is easy to forget, but quite useful to have in your bag
> > of tricks:
>
> >      def convert(in_queue, out_queue):
> >          for row in iter(in_queue.get, None):
> >              # ... convert row
> >              out_queue.put(converted_line)
>
> Yep, I always forget about that variant of iter() -- very handy!

Yes, at first glance using iter() here seems quite elegant and clever.
You might even pat yourself on the back, or treat yourself to an ice
cream cone, as I once did. There is one subtle distinction, however.
Please allow me to demonstrate.

>>> import Queue
>>>
>>> queue = Queue.Queue()
>>>
>>> queue.put(1)
>>> queue.put("la la la")
>>> queue.put(None)
>>>
>>> list(iter(queue.get, None))
[1, 'la la la']
>>>
>>> # Cool, it really works! I'm going to change all my old code to use this... new and *improved*
...
>>> # And then one day your user inevitably does something like this.
...
>>> class A(object):
... def __init__(self, value):
... self.value = value
...
... def __eq__(self, other):
... return self.value == other.value
...
>>> queue.put(A(1))
>>> queue.put(None)
>>>
>>> # And then this happens inside your 'generic' code (which probably even passed your unit tests).
...
>>> list(iter(queue.get, None))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 5, in __eq__
AttributeError: 'NoneType' object has no attribute 'value'
>>>
>>> # Oh... yeah. I really *did* want 'is None' and not '== None' which iter() will do. Sorry guys!

Please don't let this happen to you too ;)

Paul Rubin

unread,
Jul 2, 2009, 10:20:03 PM7/2/09
to
ryles <ryl...@gmail.com> writes:
> >>> # Oh... yeah. I really *did* want 'is None' and not '== None'
> >>> which iter() will do. Sorry guys!
>
> Please don't let this happen to you too ;)

None is a perfectly good value to put onto a queue. I prefer
using a unique sentinel to mark the end of the stream:

sentinel = object()

ryles

unread,
Jul 2, 2009, 11:09:25 PM7/2/09
to
On Jul 2, 10:20 pm, Paul Rubin <http://phr...@NOSPAM.invalid> wrote:

I agree, this is cleaner than None. We're still in the same boat,
though, regarding iter(). Either it's 'item == None' or 'item == object
()', and depending on the type, __eq__ can introduce some avoidable
risk.

FWIW, even object() has its disadvantages. Namely, it doesn't work for
multiprocessing.Queue which pickles and unpickles, thus giving you a
new object. One way to deal with this is to define a "Stopper" class
and type check objects taken from the queue. This is not news to
anyone who's worked with multiprocessing.Queue, though.

Paul Rubin

unread,
Jul 2, 2009, 11:15:40 PM7/2/09
to
ryles <ryl...@gmail.com> writes:
> > � �sentinel = object()

>
> I agree, this is cleaner than None. We're still in the same boat,
> though, regarding iter(). Either it's 'item == None' or 'item == object ()'

Use "item is sentinel".

Gabriel Genellina

unread,
Jul 2, 2009, 11:47:54 PM7/2/09
to pytho...@python.org
En Fri, 03 Jul 2009 00:15:40 -0300, <//phr...@nospam.invalid>> escribi�:

We're talking about the iter() builtin behavior, and that uses ==
internally.

It could have used an identity test, and that would be better for this
specific case. But then iter(somefile.read, '') wouldn't work. A
compromise solution is required; since one can customize the equality test
but not an identity test, the former has a small advantage. (I don't know
if this was the actual reason, or even if this really was a concious
decision, but that's why *I* would choose == to test against the sentinel
value).

--
Gabriel Genellina

Paul Rubin

unread,
Jul 2, 2009, 11:55:12 PM7/2/09
to
"Gabriel Genellina" <gags...@yahoo.com.ar> writes:
> We're talking about the iter() builtin behavior, and that uses ==
> internally.

Oh, I see. Drat.

> It could have used an identity test, and that would be better for this
> specific case. But then iter(somefile.read, '') wouldn't work.

Yeah, it should allow supplying a predicate instead of using == on
a value. How about (untested):

from itertools import *
...
for row in takewhile(lambda x: x is sentinel,
starmap(in_queue.get, repeat(()))):
...

ryles

unread,
Jul 3, 2009, 12:13:19 AM7/3/09
to
On Jul 2, 11:55 pm, Paul Rubin <http://phr...@NOSPAM.invalid> wrote:
> Yeah, it should allow supplying a predicate instead of using == on
> a value.  How about (untested):
>
>    from itertools import *
>    ...
>    for row in takewhile(lambda x: x is sentinel,
>                          starmap(in_queue.get, repeat(()))):
>       ...

Yeah, it's a small recipe I'm sure a lot of others have written as
well. My old version:

def iterwhile(callable_, predicate):
""" Like iter() but with a predicate instead of a sentinel. """
return itertools.takewhile(predicate, repeatfunc(callable_))

where repeatfunc is as defined here:

http://docs.python.org/library/itertools.html#recipes

I wish all of these little recipes made their way into itertools or a
like module; itertools seems a bit tightly guarded.

Lawrence D'Oliveiro

unread,
Jul 4, 2009, 8:12:22 PM7/4/09
to
In message <1beffd94-cfe6-4cf6-
bd48-2cc...@j32g2000yqh.googlegroups.com>, ryles wrote:

>>>> # Oh... yeah. I really *did* want 'is None' and not '== None' which

>>>> # iter() will do. Sorry guys!


>
> Please don't let this happen to you too ;)

Strange. others have got told off for using "== None" instead of "is None"
<http://groups.google.co.nz/group/comp.lang.python/msg/a1f3170fa202af57>,
and yet it turns out Python itself does exactly the same thing.

Steven D'Aprano

unread,
Jul 4, 2009, 9:35:31 PM7/4/09
to

That's not "strange", that's a bug. Did you report it to the tracker?

--
Steven

Lawrence D'Oliveiro

unread,
Jul 5, 2009, 5:52:22 AM7/5/09
to
In message <025ff4f1$0$20657$c3e...@news.astraweb.com>, Steven D'Aprano wrote:

> On Sun, 05 Jul 2009 12:12:22 +1200, Lawrence D'Oliveiro wrote:
>

>> In message <1beffd94-cfe6-4cf6...@j32g2000yqh.googlegroups.com>, ryles wrote:
>>
>>>>>> # Oh... yeah. I really *did* want 'is None' and not '== None' which
>>>>>> # iter() will do. Sorry guys!
>>>
>>> Please don't let this happen to you too ;)
>>
>> Strange. others have got told off for using "== None" instead of "is

>> None" <http://groups.google.co.nz/group/comp.lang.python/msg/a1f3170fa202af57>,


>> and yet it turns out Python itself does exactly the same thing.
>
> That's not "strange", that's a bug.

It's not a bug, as Gabriel Genellina has pointed out.

0 new messages