// Builds an XMLEventReader over `fixedSource` (defined outside this excerpt) whose
// event queue is meant to be effectively unbounded (capacity Integer.MAX_VALUE).
//
// NOTE(review): this override looks racy. Assuming XMLEventReader is structured like
// XMLEventReaderPatched in this file (parser thread started from its constructor, queue
// created lazily from MaxQueueSize), the parser thread can read MaxQueueSize before this
// anonymous subclass has initialized its field, i.e. while it is still 0, and
// `new LinkedBlockingQueue(0)` throws IllegalArgumentException. Confirm before relying on it.
val xer = new XMLEventReader(fixedSource) {
// tmp workaround till https://groups.google.com/forum/#!topic/scala-user/OO9t2pUAeZY is resolved
override val MaxQueueSize = Integer.MAX_VALUE
}
// NOTE(review): the line below appears to be quoted from the JDK's
// LinkedBlockingQueue(int capacity) constructor (the check that rejects a zero capacity,
// i.e. the failure described above). `capacity` is not defined anywhere here, so this
// line will not compile — TODO remove.
if (capacity <= 0) throw new IllegalArgumentException();
import java.nio.channels.ClosedChannelException
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean
import scala.io.Source
import scala.xml.parsing.{ExternalSources, MarkupHandler, MarkupParser}
import scala.xml.pull._
import scala.xml.{MetaData, NamespaceBinding, NodeSeq}
/**
 * Main entry point into creating an event-based XML parser. Treating this
 * as a [[scala.collection.Iterator]] will provide access to the generated events.
 *
 * Parsing happens on a background thread (named "XMLEventReader") that is created and
 * started from the constructor. It feeds events into the queue inherited from
 * [[ProducerConsumerIterator]] (capacity `MaxQueueSize`). An `Exception` thrown while
 * parsing is forwarded to the consumer and rethrown from `next()`.
 *
 * NOTE(review): because the parser thread is created and started during construction,
 * a subclass that overrides a `val` read by the parser (`preserveWS`, `MaxQueueSize`)
 * may have that override observed before it is initialized.
 *
 * @param src A [[scala.io.Source]] for XML data to parse
 *
 * @author Burak Emir
 * @author Paul Phillips
 */
class XMLEventReaderPatched(src: Source)
  extends scala.collection.AbstractIterator[XMLEvent]
  with ProducerConsumerIterator[XMLEvent] {
  // We implement a pull parser as an iterator, but since we may be operating on
  // a stream (e.g. XML over a network) there may be arbitrarily long periods when
  // the queue is empty. Fortunately the ProducerConsumerIterator is ideally
  // suited to this task, possibly because it was written for use by this class.

  // to override as necessary
  val preserveWS = true
  override val MaxQueueSize = 1000

  // Sentinel the parser thread enqueues once it has finished; doubles as EndOfStream.
  protected case object POISON extends XMLEvent
  val EndOfStream = POISON

  // thread machinery
  private[this] val parser = new Parser(src)
  private[this] val parserThread = new Thread(parser, "XMLEventReader")
  parserThread.start()

  /**
   * Stops iteration early.
   *
   * Setting the shared `Stop` flag is what terminates iteration. `hasNext` returns
   * false as soon as it observes the flag, and `produce()` discards every later event,
   * so enqueueing POISON here would be a no-op.
   *
   * Calling interrupt() on the parserThread is the only way we can get it to stop
   * producing tokens, since it's lost deep in document(). We cross our fingers that the
   * interrupt() gets to its target. If it fails for whatever reason, iterator
   * correctness is not impacted, only performance (the parser keeps working through
   * the document, with its events discarded).
   *
   * NOTE(review): if stop() is called from a thread other than the consumer while the
   * consumer is already blocked in `hasNext` on an empty queue, nothing further will be
   * enqueued, so the consumer appears to stay blocked.
   */
  def stop(): Unit = {
    Stop.set(true)
    parserThread.interrupt()
  }

  private class Parser(val input: Source)
    extends MarkupHandler with MarkupParser with ExternalSources with Runnable {

    val preserveWS = XMLEventReaderPatched.this.preserveWS

    // track level for elem memory usage optimization
    private var level = 0

    // this is Parser's way to add to the queue - the odd return type
    // is to conform to MarkupHandler's interface
    def setEvent(es: XMLEvent*): NodeSeq = {
      es foreach produce
      NodeSeq.Empty
    }

    override def elemStart(pos: Int, pre: String, label: String, attrs: MetaData,
                           scope: NamespaceBinding): Unit = {
      level += 1
      setEvent(EvElemStart(pre, label, attrs, scope))
    }

    override def elemEnd(pos: Int, pre: String, label: String): Unit = {
      setEvent(EvElemEnd(pre, label))
      level -= 1
    }

    // this is a dummy to satisfy MarkupHandler's API
    // memory usage optimization: return one <ignore/> for the top level to satisfy
    // MarkupParser.document(), otherwise NodeSeq.Empty
    private var ignoreWritten = false

    final def elem(pos: Int, pre: String, label: String, attrs: MetaData,
                   pscope: NamespaceBinding, empty: Boolean, nodes: NodeSeq): NodeSeq =
      if (level == 1 && !ignoreWritten) {
        ignoreWritten = true
        <ignore/>
      } else NodeSeq.Empty

    def procInstr(pos: Int, target: String, txt: String): NodeSeq = setEvent(EvProcInstr(target, txt))
    def comment(pos: Int, txt: String): NodeSeq = setEvent(EvComment(txt))
    def entityRef(pos: Int, n: String): NodeSeq = setEvent(EvEntityRef(n))
    def text(pos: Int, txt: String): NodeSeq = setEvent(EvText(txt))

    // Parses the whole document, forwarding any Exception to the consumer, and always
    // finishes by enqueueing POISON.
    override def run(): Unit = {
      curInput = input
      try {
        interruptibly {
          this.initialize.document()
        }
      } catch {
        case e: Exception => setEvent(ExceptionEvent(e))
      } finally {
        // Enqueue POISON even if document() died with an Error (e.g. a stack overflow on
        // deeply nested input). The consumer is then released instead of blocking in
        // hasNext forever. The Error itself still propagates out of this thread.
        setEvent(POISON)
      }
    }
  }
}
/**
 * Internal event used to propagate an exception raised on a helper (producer) thread
 * to API end users. `ProducerConsumerIterator.next()` rethrows `exception` when it
 * reaches this event.
 *
 * @param exception the exception caught on the producer thread
 */
private final case class ExceptionEvent(exception: Exception) extends XMLEvent
/**
 * An iterator designed for one or more producers to generate elements, and a single
 * consumer to iterate over them.
 *
 * Iteration continues until the implementation-specific `EndOfStream` sentinel is
 * consumed or `Stop` is set. After either, `hasNext` returns false and `produce()`
 * silently discards further elements.
 *
 * Since `hasNext` may block indefinitely if nobody is producing, there is also an
 * `available()` method. It returns true only when the next call to `hasNext` is
 * guaranteed not to block.
 *
 * If the next element is an `ExceptionEvent`, `next()` rethrows its exception. The
 * event stays buffered, so every later call to `next()` rethrows it again.
 *
 * This is not thread-safe for multiple consumers!
 *
 * @tparam T element type; must admit `null`, which marks an empty buffer internally
 */
trait ProducerConsumerIterator[T >: Null] extends Iterator[T] {
  /** Iterator-specific distinguished object marking end of stream. */
  val EndOfStream: T

  /**
   * Queue capacity. Defaults to unbounded (-1); override with a positive Int to bound
   * the queue. Zero is treated as unbounded as well. A subclass that overrides this
   * `val` may have it read while still uninitialized (0) if a producer starts from a
   * superclass constructor, and `new LinkedBlockingQueue(0)` would throw
   * IllegalArgumentException.
   */
  val MaxQueueSize: Int = -1

  /** Once set, `hasNext` returns false and `produce` discards its input. */
  val Stop: AtomicBoolean = new AtomicBoolean(false)

  /**
   * Evaluates `body`. Returns `None` instead of propagating an `InterruptedException`
   * (after restoring the thread's interrupt flag) or a `ClosedChannelException`.
   */
  def interruptibly[R](body: => R): Option[R] =
    try Some(body)
    catch {
      case _: InterruptedException =>
        // Re-assert the interrupt so code further up this thread's stack can observe it.
        Thread.currentThread.interrupt()
        None
      case _: ClosedChannelException => None
    }

  private[this] lazy val queue: LinkedBlockingQueue[T] =
    if (MaxQueueSize <= 0) new LinkedBlockingQueue[T]()
    else new LinkedBlockingQueue[T](MaxQueueSize)

  // Consumer-side one-element look-ahead; null means "nothing fetched yet".
  private[this] var buffer: T = _

  // Blocks until the next element (or EndOfStream) is available and stores it in
  // `buffer`. Returns whether a real element was obtained. A stop request, or an
  // interrupt while waiting, is treated as end of stream.
  private def fillBuffer(): Boolean = {
    buffer =
      if (Stop.get) EndOfStream
      else interruptibly(queue.take()).getOrElse(EndOfStream)
    isElement(buffer)
  }

  private def isElement(x: T): Boolean = x != null && x != EndOfStream
  private def eos(): Boolean = buffer == EndOfStream

  // public producer interface - this is the only method producers call, so
  // LinkedBlockingQueue's synchronization is all we need.
  def produce(x: T): Unit =
    if (!Stop.get && !eos()) interruptibly(queue.put(x))

  // consumer/iterator interface - we need not synchronize access to buffer
  // because we required there to be only one consumer.
  def hasNext: Boolean = !Stop.get && !eos() && (buffer != null || fillBuffer())

  def next(): T = {
    // fillBuffer() may itself reach EndOfStream (or a stop). Report exhaustion the way
    // the Iterator contract requires, rather than failing drainBuffer's assertion.
    if (eos() || (buffer == null && !fillBuffer()))
      throw new NoSuchElementException("ProducerConsumerIterator")
    buffer match {
      case ExceptionEvent(e) => throw e
      case _                 => drainBuffer()
    }
  }

  def available(): Boolean = isElement(buffer) || isElement(queue.peek())

  // Hands out the buffered element and clears the look-ahead slot.
  private def drainBuffer(): T = {
    assert(!eos())
    val res = buffer
    buffer = null
    res
  }
}
--
You received this message because you are subscribed to a topic in the Google Groups "scala-user" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/scala-user/OO9t2pUAeZY/unsubscribe.
To unsubscribe from this group and all its topics, send an email to scala-user+...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.