FSDigestInputStream.seek not supported

Peter Voss

Aug 14, 2009, 7:53:59 AM
to cascading-user
Hi,

during my local testing I use Cascading with an http input source (I am
using Cascading 1.0.13 and Hadoop 0.19.1). When running my Cascading
flow on a Hadoop cluster, a map task fails with the following
exception:

java.io.IOException: not supported
	at cascading.tap.hadoop.FSDigestInputStream.seek(FSDigestInputStream.java:125)
	at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
	at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:88)
	at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:50)
	at cascading.tap.hadoop.MultiInputFormat$1.operate(MultiInputFormat.java:270)
	at cascading.tap.hadoop.MultiInputFormat$1.operate(MultiInputFormat.java:265)
	at cascading.util.Util.retry(Util.java:445)
	at cascading.tap.hadoop.MultiInputFormat.getRecordReader(MultiInputFormat.java:264)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:331)
	at org.apache.hadoop.mapred.Child.main(Child.java:158)

Is there anything known about this? Wouldn't it be good to actually
implement FSDigestInputStream.seek instead of just throwing an
exception? For example, with the following implementation everything
works fine again:

public void seek(long pos) throws IOException {
    if (getPos() > pos) {
        throw new IOException("Can't seek to " + pos + ", already at " + getPos());
    }
    // consume bytes one at a time until the target offset is reached
    while (getPos() < pos && read() != -1) {
        // skip one byte per iteration
    }
}

Thanks,
--Peter
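[Editorial note: Peter's workaround implements a forward-only seek: a backward seek throws, a forward seek consumes bytes one at a time. A self-contained sketch of that contract is below; the class name and stream wiring are hypothetical, not Cascading's actual FSDigestInputStream.]

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stand-in for FSDigestInputStream: tracks the current
// offset and supports forward-only seek by consuming bytes.
class ForwardSeekStream extends FilterInputStream {
    private long pos = 0;

    ForwardSeekStream(InputStream in) { super(in); }

    public long getPos() { return pos; }

    @Override
    public int read() throws IOException {
        int b = super.read();
        if (b != -1) pos++;     // only count bytes actually read
        return b;
    }

    // Forward-only seek: throw on backward seeks, otherwise skip ahead.
    public void seek(long target) throws IOException {
        if (getPos() > target)
            throw new IOException("Can't seek to " + target + ", already at " + getPos());
        while (getPos() < target && read() != -1) {
            // consume one byte per iteration until the target offset
        }
    }
}

public class ForwardSeekDemo {
    public static void main(String[] args) throws IOException {
        byte[] data = "hello world".getBytes();
        ForwardSeekStream s = new ForwardSeekStream(new ByteArrayInputStream(data));
        s.seek(6);
        System.out.println((char) s.read()); // prints 'w'
    }
}
```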

Chris K Wensel

Aug 14, 2009, 8:02:10 AM
to cascadi...@googlegroups.com
FSDigestInputStream is used by the Cascading s3tp:// FS for access to
S3. This has been deprecated in favor of s3n://.

Use s3n://; you will be much happier for it.

I'll update Cascading to start issuing a warning in the logs when
using s3tp://.

cheers
chris
--
Chris K Wensel
ch...@concurrentinc.com
http://www.concurrentinc.com

Peter Voss

Aug 14, 2009, 8:50:28 AM
to cascading-user
Hi Chris,

thanks for your reply. I am actually using http://, not any S3
protocol. HttpFileSystem also seems to use FSDigestInputStream.

Thanks,
--Peter

Chris K Wensel

Aug 14, 2009, 11:10:24 AM
to cascadi...@googlegroups.com
ugh.. ok, let me see what I can do.

Chris K Wensel

Aug 14, 2009, 11:44:22 AM
to cascadi...@googlegroups.com
Hey Peter

I just pushed wip-1.0.15 to GitHub with an impl for seek(); can you
test it out for me?

cheers,
chris


Peter Voss

Aug 17, 2009, 2:13:32 AM
to cascading-user
Hi Chris,

that works perfectly.

Just one comment. Are you not worried that "new byte[len]" could
consume quite a lot of memory when seeking in a really large file?
You could also consider using a fixed-size buffer. Something like:

byte[] buffer = new byte[4096];
while (len > 0) {
    int bytesToRead = len < buffer.length ? len : buffer.length;
    read(buffer, 0, bytesToRead);
    len = len - bytesToRead;
}

But for what I am doing it doesn't matter.

Thanks for your help,
--Peter

Chris K Wensel

Aug 17, 2009, 10:51:19 AM
to cascadi...@googlegroups.com
Good catch. Let me clean that up this morning and push something out.

Chris K Wensel

Aug 17, 2009, 4:56:04 PM
to cascadi...@googlegroups.com
OK, pushed up the following change:

int len = (int) ( pos - getPos() );
byte[] bytes = new byte[50 * 1024];

while( len > 0 )
    len -= read( bytes, 0, Math.min( len, bytes.length ) );

Let me know if I missed something obvious again. Heh.

cheers,
ckw
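
[Editorial note, not from the thread: the loop above subtracts read()'s return value, which works as long as progress is made, but read() returns -1 at EOF, which would push len back up and spin forever if the target offset lies past the end of the stream. A hedged, self-contained sketch of the bounded-buffer skip with an EOF guard is below; the class and method names are hypothetical.]

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class SeekSketch {
    // Hypothetical helper: skip forward using a bounded buffer,
    // guarding against EOF and short reads so the loop always terminates.
    static long skipForward(InputStream in, long len) throws IOException {
        byte[] buffer = new byte[4096];
        long remaining = len;
        while (remaining > 0) {
            int bytesToRead = (int) Math.min(remaining, buffer.length);
            int read = in.read(buffer, 0, bytesToRead);
            if (read == -1) {   // EOF before reaching the target position
                throw new IOException("EOF while seeking, " + remaining + " bytes short");
            }
            remaining -= read;  // read may be short; loop until done
        }
        return len;
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[10000]);
        skipForward(in, 9999);
        System.out.println(in.read()); // last byte remains: prints 0
    }
}
```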