Processamento Paralelo em Scala

7 views
Skip to first unread message

Rafael Afonso

unread,
Feb 2, 2010, 8:15:53 PM2/2/10
to scala-br
Olá:

Estou experimentando usar Programação Paralela em Scala. Baseado neste
exemplo
do site StackOverflow (http://stackoverflow.com/questions/1751953/
concurrent-map-foreach-in-scala),
criei um programa baseado no Problema 1 do Project Euler (http://
projecteuler.net/index.php?section=problems&id=1).
Experimentei três modos: O primeiro é uma execução simples, sem
paralelismo. O
segundo usando a API do java.util.concurrency, através de Executors
(http://java.sun.com/javase/6/docs/api/java/util/concurrent/
Executors.html)
e Callables (http://java.sun.com/javase/6/docs/api/java/util/
concurrent/Callable.html).
O Terceiro, baseado na sugestão da página do StackOverflow, usando
scala.Futures
(http://www.scala-lang.org/docu/files/api/scala/actors/Futures
$object.html). O objetivo
é comparar os tempos de se percorrer o range paralelamente e executar
a soma.
Segue-se o código:

package sandbox

import java.util.concurrent._
import scala.actors._

object TestPool {

def eval(n: Int): Boolean = (n % 3 == 0) || (n % 5 == 0)

def runSingle(max: Int): Int = (1 until max).filter(eval(_)).foldLeft
(0)(_ + _)

def runPool(max: Int): Int = {

def getCallable(i: Int): Callable[Boolean] = new Callable[Boolean]
{ def call = eval(i) }

val pool = Executors.newFixedThreadPool(5)
val result = (1 until max).filter(i => pool.submit(getCallable
(i)).get).foldLeft(0)(_ + _)
pool.shutdown
pool.awaitTermination(Math.MAX_LONG, TimeUnit.SECONDS)

result
}

def runFutures(max: Int): Int = (1 until max).filter(i =>
Futures.future(eval(i)).apply).foldLeft(0)(_ + _)

/**
* f é a função a ser executada. O retorno é uma Tuple2 contendo a
soma e o
* tempo de execução.
*/
def test(max: Int, f: Int => Int): (Int, Long) = {
val t0 = System.currentTimeMillis
val result = f(max)
val deltaT = System.currentTimeMillis - t0

(result, deltaT)
}


def main(args : Array[String]) : Unit = {
val max = 10000

println("Single : " + test(max, runSingle))
println("Pool : " + test(max, runPool))
println("Futures: " + test(max, runFutures))
}
}

O resultados que obtive foram os seguntes:
- max = 10:
Single : (23,31)
Pool : (23,16)
Futures: (23,31)

- max = 100:
Single : (2318,33)
Pool : (2318,31)
Futures: (2318,55)

- max = 1000:
Single : (233168,42)
Pool : (233168,111)
Futures: (233168,364)

- max = 10000:
Single : (23331668,144)
Pool : (23331668,544)
Futures: ... demorou muito e cancelei a execução

Claramente usando as APis de concorrencia do Java e de Scala não está
gerando os resultados esperados.
Portanto eu pergunto: Onde estou errando? Qual seria a forma mais
adequada de se
utilizar a Concorrência? E quanto aos atores de Scala? Como eles
poderiam ser utilizados?

Grato,

Rafael U. C. Afonso

Jonhnny Weslley

unread,
Feb 2, 2010, 9:41:33 PM2/2/10
to scal...@googlegroups.com
Ola Rafael,

Vc nao estah errando. O problema que vc escolheu é simples demais para que se possa notar os beneficios do paralelismo. Ao invés de simplesmente filtrar e somar números (operações computacionalmente "rápidas")  fosse melhor realizar operações de ponto flutuante, fractais, ou até mesmo calcular números primos e/ou máximo divisor comum (estes dois ultimos estão disponíveis na classe BigInteger de Java :) Além disso, o paralelismo será notado principalmente se vc utilizar uma máquina multicore, senão não faz muito sentido.

Segue uma simples implementação do seu problema utilizando actors:

 def runActors(max: Int): Int = {
   import scala.actors.Actor._

   case class Add(n: Int)
   case object Result
   val act = actor {
     var sum = 0
     loop {
       react {
         case Add(n) => if (eval(n)) { sum += n }
         case Result => reply(sum); exit
       }
     }
   }
   for (i <- 1 until max)
     act ! Add(i)

   act !? Result match {
    case result => result.toString.toInt
   }
 }

...
println("Actors: " + test(max, runActors))


2010/2/2 Rafael Afonso <rafael...@gmail.com>



--
Jonhnny Weslley Sousa Silva
MSc. Candidate in Computer Science
Distributed Systems Laboratory
Federal University of Campina Grande
------------------------------------------------------------------
BeeFS http://www.lsd.ufcg.edu.br/beefs
------------------------------------------------------------------
http://www.jonhnnyweslley.net

Rafael Afonso

unread,
Feb 4, 2010, 7:19:25 PM2/4/10
to scala-br
Olá:

Desculpe a demora, mas aí vai a minha resposta. Em primeiro lugar
nunca lidei de verdade com multiprocessamento, de forma que parece
ainda terei que ralar mais. Além disso ainda não consegui entender o
esquema dos Actors de Scala. De qualquer forma, experimentei a sua
sugestão de Actors e também usando ExecutorSevice/Callables conforme
me foi sugerido no fórum do GUJ (http://www.guj.com.br/posts/list/
197163.java).
Segue-se o código:

package sandbox

import java.util.concurrent._
import scala.actors._

object TestPool {

def eval(n: Int): Boolean = (n % 3 == 0) || (n % 5 == 0)

def closePool(pool: ExecutorService) {
pool.shutdown
pool.awaitTermination(Math.MAX_LONG, TimeUnit.SECONDS)
}

def runSingle(max: Int): Long = (1 until
max).filter(eval(_)).foldLeft(0L)(_ + _)

def runPool(max: Int): Long = {

def getCallable(i: Int): Callable[Boolean] = new Callable[Boolean]
{ def call = eval(i) }

val pool = Executors.newCachedThreadPool


val result = (1 until max).filter(i =>

pool.submit(getCallable(i)).get).foldLeft(0L)(_ + _)
closePool(pool)

result
}

def runPoolJava(max: Int): Long = {
var sum = 0L

class SumCallable(n: Int) extends Callable[Int] {
def call = {
if(eval(n)) sum += n
n
}
}

val callables: java.util.List[Callable[Int]] = new
java.util.ArrayList[Callable[Int]](max + 1)
(1 until max).foreach(i => callables.add(new SumCallable(i)))

val pool = Executors.newCachedThreadPool
pool.invokeAll(callables)
closePool(pool)

sum
}

def runActors(max: Int): Long = {
import scala.actors.Actor._

case class Add(n: Int)

case object Result

val act = actor {
var sum = 0L


loop {
react {
case Add(n) => if (eval(n)) { sum += n }
case Result => reply(sum); exit
}
}
}

for (i <- 1 until max) act ! Add(i)

act !? Result match {
case result => result.toString.toLong
}
}


/**
* f é a função a ser executada. O retorno é uma Tuple2 contendo a
soma e o
* tempo de execução.
*/

def test(max: Int, f: Int => Long): (Long, Long) = {


val t0 = System.currentTimeMillis
val result = f(max)
val deltaT = System.currentTimeMillis - t0

(result, deltaT)
}

def testMax(max: Int) {
println("* Max = " + max)


println("Single : " + test(max, runSingle))
println("Pool : " + test(max, runPool))

println("PoolJava: " + test(max, runPoolJava))


println("Actors : " + test(max, runActors))

println
}

def main(args : Array[String]) : Unit = {

Array(0, 1, 3, 10, 30, 100, 300, 1000, 3000, 10000, 30000,
100000).foreach(testMax(_))
}
}


Os resultados, infelizmente não foram muito melhores. E Além disso, no
caso de PoolJava (ExecutorSevice/Callables) o resultado tornou-se
errado a para 10000 e 100000.

* Max = 0
Single : (0,14)
Pool : (0,169)
PoolJava: (0,2)
Actors : (0,170)

* Max = 1
Single : (0,0)
Pool : (0,0)
PoolJava: (0,0)
Actors : (0,15)

* Max = 3
Single : (0,1)
Pool : (0,19)
PoolJava: (0,4)
Actors : (0,8)

* Max = 10
Single : (23,0)
Pool : (23,6)
PoolJava: (23,16)
Actors : (23,19)

* Max = 30
Single : (195,2)
Pool : (195,11)
PoolJava: (195,10)
Actors : (195,19)

* Max = 100
Single : (2318,1)
Pool : (2318,3)
PoolJava: (2318,8)
Actors : (2318,8)

* Max = 300
Single : (20850,2)
Pool : (20850,45)
PoolJava: (20850,29)
Actors : (20850,19)

* Max = 1000
Single : (233168,8)
Pool : (233168,71)
PoolJava: (233168,98)
Actors : (233168,122)

* Max = 3000
Single : (2098500,33)
Pool : (2098500,203)
PoolJava: (2098500,147)
Actors : (2098500,216)

* Max = 10000
Single : (23331668,48)
Pool : (23331668,189)
PoolJava: (23323633,355)
Actors : (23331668,417)

* Max = 30000
Single : (209985000,62)
Pool : (209985000,170)
PoolJava: (209985000,425)
Actors : (209985000,138)

* Max = 100000
Single : (2333316668,12)
Pool : (2333316668,518)
PoolJava: (2333253053,1103)
Actors : (2333316668,315)


Pelo visto ainda há muito o que aprender nesta área. :(

Grato,

Rafael U. C. Afonso

On 3 fev, 00:41, Jonhnny Weslley <jonhnnywesl...@jonhnnyweslley.net>
wrote:

> 2010/2/2 Rafael Afonso <rafael.afo...@gmail.com>


>
>
>
> > Olá:
>
> > Estou experimentando usar Programação Paralela em Scala. Baseado neste
> > exemplo
> > do site StackOverflow (http://stackoverflow.com/questions/1751953/

> > concurrent-map-foreach-in-scala<http://stackoverflow.com/questions/1751953/%0Aconcurrent-map-foreach-...>


> > ),
> > criei um programa baseado no Problema 1 do Project Euler (http://
> > projecteuler.net/index.php?section=problems&id=1).
> > Experimentei três modos: O primeiro é uma execução simples, sem
> > paralelismo. O
> > segundo usando a API do java.util.concurrency, através de Executors
> > (http://java.sun.com/javase/6/docs/api/java/util/concurrent/

> > Executors.html<http://java.sun.com/javase/6/docs/api/java/util/concurrent/%0AExecuto...>
> > )
> > e Callables (http://java.sun.com/javase/6/docs/api/java/util/
> > concurrent/Callable.html<http://java.sun.com/javase/6/docs/api/java/util/%0Aconcurrent/Callabl...>


> > ).
> > O Terceiro, baseado na sugestão da página do StackOverflow, usando
> > scala.Futures
> > (http://www.scala-lang.org/docu/files/api/scala/actors/Futures

> > $object.html<http://www.scala-lang.org/docu/files/api/scala/actors/Futures%0A$obje...>).

Reply all
Reply to author
Forward
0 new messages