/**
 * Companion of the [[FileDownloader]] actor: holds its private retry message,
 * the `Props` factory and the actor system used for outbound HTTP I/O.
 */
object FileDownloader {

  /** Self-addressed message that triggers the next download attempt after the back-off delay. */
  private case object RetryDownload

  /**
   * Actor system named "FileDownloader" that the downloader uses for its HTTP
   * extension and as the source of its execution context.
   *
   * NOTE(review): `atPath` nests the loaded configuration *under* the
   * "downloader-system" path instead of selecting that section; confirm
   * `getConfig("downloader-system")` was not what was intended.
   */
  val downloaderSystem: ActorSystem = {
    val systemConfig = ConfigFactory.load().atPath("downloader-system")
    ActorSystem("FileDownloader", systemConfig)
  }

  /**
   * Builds the `Props` used to create a [[FileDownloader]].
   *
   * @param basePath implicit base path handed to the actor's constructor
   */
  def props(implicit basePath: Path): Props =
    Props(classOf[FileDownloader], basePath)
}
/**
 * Actor that fetches a [[ResourceLocation]] over HTTP and stores it at the
 * location's local path.
 *
 * Behaviour:
 *  - idle (`receive`): an `AcquireResource` whose local file already exists is
 *    answered immediately with `ResourceFetched`; otherwise a download starts.
 *  - downloading (`awaitingDownload`): further requests for the same source are
 *    added to the list of requestors, requests for other sources are stashed
 *    until the current download finishes.
 *
 * A failed download is retried after `backoff` until `maxAttempts` attempts
 * have been made in total; the outcome is sent to every requestor and to the
 * parent actor.
 *
 * @param basePath implicit base path, exposed as a member of the actor
 */
class FileDownloader(implicit val basePath: Path) extends Actor with ActorLogging with Stash {
  import FileDownloader._
  import downloaderSystem.dispatcher

  // Not referenced directly in this class; kept because it is a public implicit member.
  implicit val timeout = 30.seconds
  val parent = context.parent
  /** Delay between a failed attempt and the next one. */
  val backoff = 1.second
  /** Upper bound on the total number of download attempts per resource. */
  val maxAttempts = 3

  /** HTTP I/O manager of the dedicated downloader actor system. */
  def io = IO(Http)(downloaderSystem)

  /** Idle behaviour: serve already-present files directly, otherwise start a download. */
  override val receive: Receive = {
    case AcquireResource(source) if source.localPath.toFile.exists =>
      sender() ! ResourceFetched(source)
    // TODO add handling for S3 Resource Downloads
    //case AcquireResource(source: S3ResourceLocation) =>
    case AcquireResource(source) =>
      attemptDownload(source, Seq(sender()), 1)
    case _ => ()
  }

  /**
   * Behaviour while a download of `source` is in flight.
   *
   * @param source     the resource currently being downloaded
   * @param requestors actors that asked for `source` and must be notified
   * @param attempt    1-based number of the attempt currently in flight
   */
  def awaitingDownload(source: ResourceLocation, requestors: Seq[ActorRef], attempt: Int): Receive = {
    case AcquireResource(`source`) if !(requestors contains sender()) =>
      context become awaitingDownload(source, requestors :+ sender(), attempt)
    case AcquireResource(`source`) => ()
    case _: AcquireResource =>
      // A different resource: handle it once the current download is done.
      stash()
    case HttpResponse(StatusCodes.OK, entity, _, _) =>
      try {
        store(source, entity.data.toByteArray)
        success(source, requestors)
      } catch {
        // Only recoverable errors are turned into a retry; fatal JVM errors propagate.
        case scala.util.control.NonFatal(e) =>
          self ! Failure(e)
      }
    case RetryDownload =>
      attemptDownload(source, requestors, attempt + 1)
    // `attempt` is 1-based, so strictly-less-than caps the total at `maxAttempts`.
    case _: Failure | _: HttpResponse if attempt < maxAttempts =>
      context.system.scheduler.scheduleOnce(backoff, self, RetryDownload)
    case HttpResponse(statusCode, entity, _, _) =>
      fail(source, requestors, new Error(s"Failed to download $source (${source.toURL}); reason: $statusCode, ${entity.asString}"))
    case Failure(throwable) =>
      log.error(throwable, s"Failed to download $source (${source.toURL})")
      fail(source, requestors, throwable)
  }

  /**
   * Writes `bytes` to a temporary file and moves it to the local path of `source`.
   * The temporary file is removed (best effort) when writing or moving fails.
   */
  private def store(source: ResourceLocation, bytes: Array[Byte]): Unit = {
    val dest = source.localPath
    // getParent is null for a path without a parent component; nothing to create then.
    Option(dest.getParent).foreach(_.toFile.mkdirs())
    val tempFile = Files.createTempFile("temp-" + source.filename, ".temp")
    try {
      FileUtils.writeByteArrayToFile(tempFile.toFile, bytes)
      Files.move(tempFile, dest)
    } catch {
      case scala.util.control.NonFatal(e) =>
        // File.delete reports failure via its return value, so it cannot mask `e`.
        tempFile.toFile.delete()
        throw e
    }
  }

  /** Notifies all requestors and the parent of the failure, then returns to idle. */
  def fail(source: ResourceLocation, requestors: Seq[ActorRef], reason: Throwable): Unit = {
    val evt = ResourceFetchFailed(source, reason)
    (requestors :+ parent).foreach(_ ! evt)
    complete()
  }

  /** Notifies all requestors and the parent of the successful fetch, then returns to idle. */
  def success(source: ResourceLocation, requestors: Seq[ActorRef]): Unit = {
    val evt = ResourceFetched(source)
    (requestors :+ parent).foreach(_ ! evt)
    complete()
  }

  /** Releases stashed requests and switches back to the idle behaviour. */
  def complete(): Unit = {
    unstashAll()
    context become receive
  }

  /**
   * Sends the HTTP GET for `source` and switches to the downloading behaviour.
   *
   * @param count 1-based number of this attempt
   */
  def attemptDownload(source: ResourceLocation, requestors: Seq[ActorRef], count: Int): Unit = {
    io ! HttpRequest(HttpMethods.GET, Uri(source.toURL.toString))
    context become awaitingDownload(source, requestors, count)
  }
}