I'm using the following code to convert a pipe into a map and use it in a subsequent step:
```scala
val globalIps = TypedPipe.from(...)
  .toIterableExecution
  .waitFor(conf, mode)
  .get
  .toMap
```
Then I use the map in a filter:
```scala
val filtered = somePipe
  .filter('ip) { ip: String =>
    globalIps.contains(ip)
  }
```
When this runs, a preliminary step first runs the Execution and saves its output to a .seq file. Then the actual flow starts, and I see many log lines like:

`writing step state to dist cache, too large for job conf, size: 79401872`

until it finally fails with `java.lang.OutOfMemoryError: Java heap space`.
The map is about 50 MB and I'm running with a 2.4 GB heap, so I find it hard to believe it doesn't fit in memory. Am I doing something wrong?
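A plausible contributor, offered as a hedged sketch rather than a diagnosis: the function passed to `filter` closes over `globalIps`, so serializing that function for the job also serializes the entire map, which would be consistent with a step state of about 79 MB for a ~50 MB map. The standalone Scala sketch below uses plain Java serialization, which is an assumption (the serializer Scalding/Cascading actually uses for step state may differ and produce different sizes), to compare a predicate capturing a one-entry map with one capturing 100,000 entries. `ClosureSizeDemo`, `containsPredicate`, and `sampleLookup` are hypothetical names used only for illustration.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object ClosureSizeDemo {
  // Number of bytes produced when `obj` is written with Java serialization.
  // Assumption: plain Java serialization; the job's real serializer may differ.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.size()
  }

  // Same shape as the `globalIps.contains(ip)` filter: the returned function
  // captures `lookup`, so serializing the function serializes the map too.
  def containsPredicate(lookup: Map[String, String]): String => Boolean =
    (ip: String) => lookup.contains(ip)

  // Hypothetical stand-in for `globalIps`: n distinct IP-like keys.
  def sampleLookup(n: Int): Map[String, String] =
    (0 until n).map { i =>
      s"10.${i / 65536}.${(i / 256) % 256}.${i % 256}" -> s"host$i"
    }.toMap

  def main(args: Array[String]): Unit = {
    val small = serializedSize(containsPredicate(sampleLookup(1)))
    val large = serializedSize(containsPredicate(sampleLookup(100000)))
    println(s"predicate capturing 1 entry:       $small bytes")
    println(s"predicate capturing 100000 entries: $large bytes")
  }
}
```

Running `main` should show the captured map dominating the serialized size of the predicate; exact byte counts depend on the Scala version and the serializer, so none are quoted here.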