XMLEventReader causing deadlock when stop() is called?

Kostas kougios

May 5, 2015, 9:52:21 AM
to scala...@googlegroups.com
Hi, this seems like a bug to me. Has anyone come across this?

I have an XMLEventReader that I use to parse a few elements from the beginning of big files. I then call stop() to terminate its background thread, because it consumes a lot of resources if I leave it running.

The problem seems to be that there is an internal LinkedBlockingQueue with a capacity of 1000 elements, and this queue must be full by the time I call stop(). Since I don't consume any more elements on the thread that calls stop() and the queue is already full, my code deadlocks:

Stack Trace
main [1] (WAITING)
   sun.misc.Unsafe.park line: not available [native method]
   java.util.concurrent.locks.LockSupport.park line: 175
   java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await line: 2039
   java.util.concurrent.LinkedBlockingQueue.put line: 350
   scala.xml.pull.ProducerConsumerIterator$$anonfun$produce$1.apply$mcV$sp line: 144
   scala.xml.pull.ProducerConsumerIterator$$anonfun$produce$1.apply line: 144
   scala.xml.pull.ProducerConsumerIterator$$anonfun$produce$1.apply line: 144
   scala.xml.pull.ProducerConsumerIterator$class.interruptibly line: 125
   scala.xml.pull.XMLEventReader.interruptibly line: 27
   scala.xml.pull.ProducerConsumerIterator$class.produce line: 144
   scala.xml.pull.XMLEventReader.produce line: 27
   scala.xml.pull.XMLEventReader.stop line: 56
   pmc.IdFromNXml.apply line: 67
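
For reference, a minimal sketch of the calling pattern that hits this (the file name and element handling are illustrative, not my actual code):

import scala.io.Source
import scala.xml.pull.{EvElemStart, XMLEventReader}

val source = Source.fromFile("big.xml")   // illustrative file name
val reader = new XMLEventReader(source)

// consume only a few events from the start of a very large document
val firstLabels = reader.collect { case EvElemStart(_, label, _, _) => label }.take(5).toList

// meanwhile the background parser thread keeps filling its 1000-element queue;
// once the queue is full, stop() blocks inside produce(POISON) and never returns
reader.stop()
source.close()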

It looks like a bug to me. Also, I don't understand the need for a separate thread. Since we're required to call stop() to stop processing, why have a background thread at all? The class is instantiated via

new XMLEventReader(source)

and we need to close the source anyway, so it could all be done within the same thread.

Cheers

Kostas kougios

May 5, 2015, 10:12:39 AM
to scala...@googlegroups.com
Also, as a temporary fix I tried:

val xer = new XMLEventReader(fixedSource) {
  // tmp workaround till https://groups.google.com/forum/#!topic/scala-user/OO9t2pUAeZY is resolved
  override val MaxQueueSize = Integer.MAX_VALUE
}

but now I sometimes get:

Caused by: java.lang.IllegalArgumentException
    at java.util.concurrent.LinkedBlockingQueue.<init>(LinkedBlockingQueue.java:261)
    at scala.xml.pull.ProducerConsumerIterator$class.scala$xml$pull$ProducerConsumerIterator$$queue(XMLEventReader.scala:133)
    at scala.xml.pull.XMLEventReader.scala$xml$pull$ProducerConsumerIterator$$queue$lzycompute(XMLEventReader.scala:27)
    at scala.xml.pull.XMLEventReader.scala$xml$pull$ProducerConsumerIterator$$queue(XMLEventReader.scala:27)
    at scala.xml.pull.ProducerConsumerIterator$$anonfun$produce$1.apply$mcV$sp(XMLEventReader.scala:144)
    at scala.xml.pull.ProducerConsumerIterator$$anonfun$produce$1.apply(XMLEventReader.scala:144)
    at scala.xml.pull.ProducerConsumerIterator$$anonfun$produce$1.apply(XMLEventReader.scala:144)
    at scala.xml.pull.ProducerConsumerIterator$class.interruptibly(XMLEventReader.scala:125)
    at scala.xml.pull.XMLEventReader.interruptibly(XMLEventReader.scala:27)
    at scala.xml.pull.ProducerConsumerIterator$class.produce(XMLEventReader.scala:144)
    at scala.xml.pull.XMLEventReader.produce(XMLEventReader.scala:27)
    at scala.xml.pull.XMLEventReader$Parser$$anonfun$setEvent$1.apply(XMLEventReader.scala:68)
    at scala.xml.pull.XMLEventReader$Parser$$anonfun$setEvent$1.apply(XMLEventReader.scala:68)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at scala.xml.pull.XMLEventReader$Parser.setEvent(XMLEventReader.scala:68)
    at scala.xml.pull.XMLEventReader$Parser.text(XMLEventReader.scala:91)
    at scala.xml.parsing.MarkupParser$class.appendText(MarkupParser.scala:407)
    at scala.xml.pull.XMLEventReader$Parser.appendText(XMLEventReader.scala:60)
    at scala.xml.parsing.MarkupParser$class.content(MarkupParser.scala:480)
    at scala.xml.pull.XMLEventReader$Parser.content(XMLEventReader.scala:60)
    at scala.xml.parsing.MarkupParser$class.document(MarkupParser.scala:244)
    at scala.xml.pull.XMLEventReader$Parser.document(XMLEventReader.scala:60)
    at scala.xml.pull.XMLEventReader$Parser$$anonfun$run$1.apply(XMLEventReader.scala:96)
    at scala.xml.pull.XMLEventReader$Parser$$anonfun$run$1.apply(XMLEventReader.scala:96)
    at scala.xml.pull.ProducerConsumerIterator$class.interruptibly(XMLEventReader.scala:125)
    at scala.xml.pull.XMLEventReader.interruptibly(XMLEventReader.scala:27)
    at scala.xml.pull.XMLEventReader$Parser.run(XMLEventReader.scala:96)
    at java.lang.Thread.run(Thread.java:745)

It fails because the LinkedBlockingQueue capacity is 0 for some reason (I think it is a timing issue: the parser thread reads MaxQueueSize before the anonymous subclass has initialized the overridden val):
if (capacity <= 0) throw new IllegalArgumentException();
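
For what it's worth, a minimal self-contained illustration of that behaviour (nothing to do with the library code itself): an overridden val is only assigned when the subclass constructor runs, so anything the superclass constructor (or a thread it starts) reads before then sees the default value, which for an Int is 0.

class Base {
  val maxQueueSize = 1000
  // the superclass constructor reads the val before any subclass override is assigned
  println(s"seen in Base constructor: $maxQueueSize")
}

class Patched extends Base {
  override val maxQueueSize = Integer.MAX_VALUE
}

new Patched() // prints "seen in Base constructor: 0"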

Seth Tisue

May 5, 2015, 10:50:32 AM
to scala-user

Konstantinos Kougios

May 5, 2015, 11:12:28 AM
to scala...@googlegroups.com
Hi, so it seems like a very old issue. I opened a bug in the lib's home on GitHub.

There are actually a couple of issues with the lib. Currently, as a workaround, I do a reader.drop(10000) to flush the queue before stop(), in the hope that the producer thread will not be fast enough to refill it.
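
A rough sketch of that workaround (the reader construction and the drop count are illustrative):

import scala.io.Source
import scala.xml.pull.XMLEventReader

val source = Source.fromFile("big.xml")   // illustrative file name
val reader = new XMLEventReader(source)

// ... read the handful of events needed from the start of the document ...

// drain up to 10000 already-queued events so the internal queue has free space,
// then stop(); with room in the queue, produce(POISON) should not block
reader.drop(10000)
reader.stop()
source.close()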

Kostas kougios

May 6, 2015, 4:53:24 AM
to scala...@googlegroups.com
I did a quick workaround by copying the XMLEventReader class and modifying stop() to use an AtomicBoolean instead of the POISON pill. I am not getting deadlocks anymore.

import java.nio.channels.ClosedChannelException
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

import scala.io.Source
import scala.xml.parsing.{ExternalSources, MarkupHandler, MarkupParser}
import scala.xml.pull._
import scala.xml.{MetaData, NamespaceBinding, NodeSeq}

/**
 * Main entry point into creating an event-based XML parser. Treating this
 * as a [[scala.collection.Iterator]] will provide access to the generated events.
 *
 * @param src A [[scala.io.Source]] for XML data to parse
 * @author Burak Emir
 * @author Paul Phillips
 */
class XMLEventReaderPatched(src: Source)
  extends scala.collection.AbstractIterator[XMLEvent]
  with ProducerConsumerIterator[XMLEvent] {

  // We implement a pull parser as an iterator, but since we may be operating on
  // a stream (e.g. XML over a network) there may be arbitrarily long periods when
  // the queue is empty. Fortunately the ProducerConsumerIterator is ideally
  // suited to this task, possibly because it was written for use by this class.

  // to override as necessary
  val preserveWS = true

  override val MaxQueueSize = 1000

  protected case object POISON extends XMLEvent

  val EndOfStream = POISON

  // thread machinery
  private[this] val parser = new Parser(src)
  private[this] val parserThread = new Thread(parser, "XMLEventReader")
  parserThread.start()

  // Instead of enqueueing the POISON object (which blocks when the queue is
  // full), stop() sets the Stop flag and interrupts the parser thread; the
  // consumer side checks the flag, so the iterator terminates without needing
  // free space in the queue, and produce() becomes a no-op. Calling interrupt()
  // on the parserThread is still the only way to get it to stop parsing, since
  // it's lost deep in document() - if the interrupt is missed, correctness is
  // not impacted, only performance (the parser keeps parsing the rest of the
  // document, discarding events).
  def stop() = {
    Stop.set(true)
    // produce(POISON)
    parserThread.interrupt()
  }

  private class Parser(val input: Source) extends MarkupHandler with MarkupParser with ExternalSources with Runnable {
    val preserveWS = XMLEventReaderPatched.this.preserveWS
    // track level for elem memory usage optimization
    private var level = 0

    // this is Parser's way to add to the queue - the odd return type
    // is to conform to MarkupHandler's interface
    def setEvent(es: XMLEvent*): NodeSeq = {
      es foreach produce
      NodeSeq.Empty
    }

    override def elemStart(pos: Int, pre: String, label: String, attrs: MetaData, scope: NamespaceBinding) {
      level += 1
      setEvent(EvElemStart(pre, label, attrs, scope))
    }

    override def elemEnd(pos: Int, pre: String, label: String) {
      setEvent(EvElemEnd(pre, label))
      level -= 1
    }

    // this is a dummy to satisfy MarkupHandler's API
    // memory usage optimization: return one <ignore/> for the top level to satisfy
    // MarkupParser.document(), otherwise NodeSeq.Empty
    private var ignoreWritten = false

    final def elem(pos: Int, pre: String, label: String, attrs: MetaData, pscope: NamespaceBinding, empty: Boolean, nodes: NodeSeq): NodeSeq =
      if (level == 1 && !ignoreWritten) {
        ignoreWritten = true
        <ignore/>
      } else NodeSeq.Empty

    def procInstr(pos: Int, target: String, txt: String) = setEvent(EvProcInstr(target, txt))

    def comment(pos: Int, txt: String) = setEvent(EvComment(txt))

    def entityRef(pos: Int, n: String) = setEvent(EvEntityRef(n))

    def text(pos: Int, txt: String) = setEvent(EvText(txt))

    override def run() {
      curInput = input
      try {
        interruptibly {
          this.initialize.document()
        }
      } catch {
        case e: Exception => setEvent(ExceptionEvent(e))
      }
      setEvent(POISON)
    }
  }

}

// An internal class used to propagate exceptions from helper threads to API end users.
private case class ExceptionEvent(exception: Exception) extends XMLEvent

// An iterator designed for one or more producers to generate
// elements, and a single consumer to iterate. Iteration will continue
// until the EndOfStream marker is seen or the Stop flag is set, after
// which point producers calling produce() become no-ops.
//
// Since hasNext may block indefinitely if nobody is producing,
// there is also an available() method which will return true if
// the next call to hasNext is guaranteed not to block.
//
// This is not thread-safe for multiple consumers!
trait ProducerConsumerIterator[T >: Null] extends Iterator[T] {
  // abstract - iterator-specific distinguished object for marking eos
  val EndOfStream: T

  // defaults to unbounded - override to positive Int if desired
  val MaxQueueSize = -1

  val Stop = new AtomicBoolean(false)

  def interruptibly[T](body: => T): Option[T] = try Some(body) catch {
    case _: InterruptedException =>
      Thread.currentThread.interrupt(); None
    case _: ClosedChannelException => None
  }

  private[this] lazy val queue =
    if (MaxQueueSize < 0) new LinkedBlockingQueue[T]()
    else new LinkedBlockingQueue[T](MaxQueueSize)
  private[this] var buffer: T = _

  private def fillBuffer() = if (Stop.get) {
    buffer = EndOfStream
    isElement(buffer)
  } else {
    buffer = interruptibly(queue.take) getOrElse EndOfStream
    isElement(buffer)
  }

  private def isElement(x: T) = x != null && x != EndOfStream

  private def eos() = buffer == EndOfStream

  // public producer interface - this is the only method producers call, so
  // LinkedBlockingQueue's synchronization is all we need.
  def produce(x: T): Unit = if (!Stop.get && !eos) interruptibly(queue put x)

  // consumer/iterator interface - we need not synchronize access to buffer
  // because we required there to be only one consumer.
  def hasNext = !Stop.get && (!eos && (buffer != null || fillBuffer))

  def next() = {
    if (eos()) throw new NoSuchElementException("ProducerConsumerIterator")
    if (buffer == null) fillBuffer()
    if (buffer.isInstanceOf[ExceptionEvent]) throw buffer.asInstanceOf[ExceptionEvent].exception

    drainBuffer()
  }

  def available() = isElement(buffer) || isElement(queue.peek)

  private def drainBuffer() = {
    assert(!eos)
    val res = buffer
    buffer = null
    res
  }
}
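
A hypothetical usage sketch of the patched class (the file name is made up):

import scala.io.Source

val src = Source.fromFile("big.xml")        // illustrative file name
val reader = new XMLEventReaderPatched(src)

val headerEvents = reader.take(50).toList   // read a few events from the start

// stop() only sets the Stop flag and interrupts the parser thread, so it
// cannot block on a full queue the way the original produce(POISON) could
reader.stop()
src.close()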

William Narmontas

May 20, 2015, 1:56:28 AM
to scala...@googlegroups.com
Maybe this will interest you. https://github.com/ScalaWilliam/xs4s

A friend and I tried using XMLEventReader before as well. I ended up using StAX + FSMs for streamed data.
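
For the record, a minimal sketch of the plain-StAX style (not xs4s itself, and with the FSM part left out): everything runs on the calling thread, so stopping early is just a matter of closing the reader.

import java.io.FileInputStream
import javax.xml.stream.{XMLInputFactory, XMLStreamConstants}

val in = new FileInputStream("big.xml")   // illustrative file name
val xsr = XMLInputFactory.newInstance().createXMLStreamReader(in)
try {
  var seen = 0
  while (xsr.hasNext && seen < 5) {
    if (xsr.next() == XMLStreamConstants.START_ELEMENT) {
      println(xsr.getLocalName)   // handle the first few start elements
      seen += 1
    }
  }
} finally {
  xsr.close()   // no background thread to worry about
  in.close()
}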

Konstantinos Kougios

May 20, 2015, 6:20:54 AM
to scala...@googlegroups.com
Thanks, looks interesting.

Btw, seems the scala pull-xml lib wants maintainers ;)

Kostas kougios

May 28, 2015, 11:37:17 AM
to scala...@googlegroups.com
Is this something you plan to release or is it just a showcase?

I am trying to find a lib to use in Spark jobs (Scala 2.10).