Lucene in HDFS via elephant-bird for queries in the range of millions

jaikit savla

Nov 5, 2014, 5:02:19 PM
to elephant...@googlegroups.com
Hello Folks,

I am using the elephant-bird pig-lucene integration to index and query on HDFS. I have a 10 GB index distributed across 200 index part files, and about 10 million query terms to run against it. Because the query set is so large, after running a few queries over the initial index part the job fails with a StackOverflowError (pasted below), and sometimes with an out-of-memory error.

I wrote a UDF that looks up all the index parts for a given query. I was able to split the queries across multiple mappers by setting "mapred.max.split.size". The issue with that solution is that, since the UDF searches every index part in a single call, each query takes a long time (~1 to 20 s per query). I also need to periodically reload all the indexes (usually after about 10 queries), since after a while it throws a BlockNotFoundException, and reloading the indexes takes a long time as well. This solution works for millions of queries against a small index, but for a large index it performs very poorly.
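
Roughly, the UDF looks like the sketch below. This is simplified: the LuceneHdfsDirectory constructor signature, the "part-*" layout under one index root, and the "text" field name are my assumptions here, so check the elephant-bird source before relying on it.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.util.Version;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import com.twitter.elephantbird.mapreduce.input.LuceneHdfsDirectory;

public class SearchAllParts extends EvalFunc<DataBag> {
  private final String indexRoot;        // HDFS dir holding the 200 part indexes
  private List<IndexSearcher> searchers; // one searcher per index part

  public SearchAllParts(String indexRoot) {
    this.indexRoot = indexRoot;
  }

  // Open every part index once per task, not once per query.
  private void openAll() throws IOException {
    searchers = new ArrayList<IndexSearcher>();
    FileSystem fs = FileSystem.get(new Configuration());
    for (FileStatus part : fs.globStatus(new Path(indexRoot, "part-*"))) {
      // Assumed constructor; verify against the elephant-bird source.
      LuceneHdfsDirectory dir = new LuceneHdfsDirectory(part.getPath(), fs);
      searchers.add(new IndexSearcher(DirectoryReader.open(dir)));
    }
  }

  @Override
  public DataBag exec(Tuple input) throws IOException {
    if (searchers == null) {
      openAll();
    }
    String term = (String) input.get(0);
    DataBag out = BagFactory.getInstance().newDefaultBag();
    try {
      // Match the Version constant to your Lucene 4.x release.
      QueryParser parser =
          new QueryParser(Version.LUCENE_44, "text", new StandardAnalyzer(Version.LUCENE_44));
      // One call touches every part, which is why latency grows with part count.
      for (IndexSearcher searcher : searchers) {
        for (ScoreDoc hit : searcher.search(parser.parse(term), 10).scoreDocs) {
          Tuple t = TupleFactory.getInstance().newTuple(2);
          t.set(0, hit.doc);
          t.set(1, (double) hit.score);
          out.add(t);
        }
      }
    } catch (Exception e) {
      throw new IOException(e);
    }
    return out;
  }
}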

Another approach I am trying right now is:

Run one mapper per index part file, with each mapper executing all of the queries against its part. After that, I plan to run a reduce job that emits the highest-scoring result for each query; a sketch of that reduce step is below.
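
A rough sketch of that reduce step, assuming the mappers emit (queryTerm, "score\tdocId") pairs per index part (the key/value layout here is illustrative, not final):

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopHitReducer extends Reducer<Text, Text, Text, Text> {
  @Override
  protected void reduce(Text term, Iterable<Text> hits, Context ctx)
      throws IOException, InterruptedException {
    double bestScore = Double.NEGATIVE_INFINITY;
    String bestHit = null;
    // Each value is "score\tdocId" produced by one index part's mapper.
    for (Text hit : hits) {
      String[] fields = hit.toString().split("\t", 2);
      double score = Double.parseDouble(fields[0]);
      if (score > bestScore) {
        bestScore = score;
        bestHit = fields[1];
      }
    }
    // Keep only the single best-scoring hit across all index parts.
    if (bestHit != null) {
      ctx.write(term, new Text(bestScore + "\t" + bestHit));
    }
  }
}
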
Let me know what you think about this solution.

I would really appreciate it if anyone could provide pointers or reference code that I can look at.

Thanks,
Jaikit

2014-11-05 07:39:03,006 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.StackOverflowError
at sun.nio.ch.Net.localInetAddress(Native Method)
at sun.nio.ch.Net.localAddress(Net.java:389)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:742)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:529)
at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:2884)
at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:747)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:662)
at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:326)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:570)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:793)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:648)
at java.io.DataInputStream.readByte(DataInputStream.java:265)
at com.twitter.elephantbird.mapreduce.input.LuceneHdfsDirectory$HDFSIndexInput.readByte(LuceneHdfsDirectory.java:135)
at org.apache.lucene.store.DataInput.readVInt(DataInput.java:108)
at org.apache.lucene.codecs.BlockTreeTermsReader$FieldReader$SegmentTermsEnum$Frame.loadBlock(BlockTreeTermsReader.java:2342)
at org.apache.lucene.codecs.BlockTreeTermsReader$FieldReader$SegmentTermsEnum.seekExact(BlockTreeTermsReader.java:1678)
at org.apache.lucene.index.TermContext.build(TermContext.java:95)
at org.apache.lucene.search.TermQuery.createWeight(TermQuery.java:167)
at org.apache.lucene.search.BooleanQuery$BooleanWeight.<init>(BooleanQuery.java:186)
at org.apache.lucene.search.BooleanQuery.createWeight(BooleanQuery.java:400)
at org.apache.lucene.search.IndexSearcher.createNormalizedWeight(IndexSearcher.java:647)
at org.apache.lucene.search.IndexSearcher.search(IndexSearcher.java:264)
at org.apache.lucene.search.IndexSearcher.search(IndexSearcher.java:252)
at com.xx.extraction.magma.PGIndexInputFormat$1.search(PGIndexInputFormat.java:46)