IO and Bulk insertions

Damien Gouyette

Jun 27, 2019, 11:54:02 AM
to Scio Users
Hello everyone,

I am writing a connector for Salesforce Marketing Cloud. 🤪

The output will go into a data extension.

To limit the number of API calls, I have to use a method that inserts rows in batches; see https://github.com/salesforce-marketingcloud/FuelSDK-Java/blob/master/src/main/java/com/exacttarget/fuelsdk/ETDataExtension.java#L837

So far I have written the code below.

How can I split an SCollection[T] evenly into an SCollection[Seq[T]]?
Is there a better way?

import com.spotify.scio.ScioContext
import com.spotify.scio.coders.Coder
import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap}
import com.spotify.scio.values.SCollection

import scala.concurrent.Future
import scala.language.reflectiveCalls

final case class DataExtensionIO[T]() extends ScioIO[T] {

  override type ReadP = Nothing
  override type WriteP = WriteParam[T]
  override val tapT = EmptyTapOf[T]

  override def read(sc: ScioContext, params: ReadP): SCollection[T] =
    throw new UnsupportedOperationException("Can't read from Data extension")

  override def tap(params: Nothing): Tap[tapT.T] = EmptyTap

  override protected def write(data: SCollection[T], params: WriteParam[T]): Future[Tap[Nothing]] = {
    data.map(marketingcloud.write)
    Future.successful(EmptyTap)
  }
}

package object marketingcloud {

  def write[T]: Seq[T] => Seq[T] = (values: Seq[T]) => {
    MCClient.insertRows(values.map(_.toString))
    values
  }

  implicit class MarketingcloudSCollection[T](@transient private val self: SCollection[T])
      extends AnyVal {
    def saveAsDataExtension(name: String)(implicit coder: Coder[T]) =
      self.write(DataExtensionIO[T]())(WriteParam())
  }

  case class WriteParam[T](f: T => Seq[T])
}





Neville Li

Jun 27, 2019, 1:32:21 PM
to Damien Gouyette, Scio Users
The easiest way is probably `SCollection#groupBy(fn)`, where `fn` hashes each item into a group. That does not guarantee any size distribution for the grouped `Iterable[T]`, and it triggers a shuffle.
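
A minimal plain-Scala sketch of the hashing idea, using ordinary collections rather than an `SCollection` so it runs without Scio. The names `HashGrouping`, `bucketOf`, `groupByHash` and `numGroups` are hypothetical and exist only for illustration. It shows why the group sizes are not controlled: each size depends entirely on how the hash codes happen to fall.

```scala
object HashGrouping {
  // Map an element to one of `numGroups` buckets from its hash code.
  // floorMod keeps the result non-negative even for negative hash codes.
  def bucketOf[T](value: T, numGroups: Int): Int =
    math.floorMod(value.hashCode, numGroups)

  // Group the elements by bucket. Nothing here bounds or equalises the size
  // of each group; a bucket may also be missing if no element hashes into it.
  def groupByHash[T](values: Seq[T], numGroups: Int): Map[Int, Seq[T]] =
    values.groupBy(v => bucketOf(v, numGroups))

  // Print the size of every group for a small sample input.
  def demo(): Unit = {
    val rows = (1 to 20).map(i => s"row-$i")
    groupByHash(rows, 4).toSeq.sortBy(_._1).foreach { case (bucket, group) =>
      println(s"bucket $bucket -> ${group.size} rows")
    }
  }
}
```

Calling `HashGrouping.demo()` prints one line per non-empty bucket; the sizes add up to 20, but how they are split across buckets is not something the caller chooses.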

An alternative is a custom `DoFn[T, Seq[T]]` that buffers elements. The `DoFn` keeps each `T` it sees in `processElement` and outputs a `Seq[T]` when the buffer hits the size threshold, or at the end in `finishBundle`. A bundle can be smaller than the threshold, though (more common in streaming), so you might not batch efficiently.
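
The buffering logic can be sketched in plain Scala, without any Beam dependency. This is only a stand-in for a real `DoFn`: the class `BufferingBatcher`, its `emit` callback and the helper `runBundle` are hypothetical names, and the methods merely mirror the roles of `processElement` and `finishBundle`. A real `DoFn` would also have to handle windows and timestamps when emitting from `finishBundle`, which this sketch ignores.

```scala
import scala.collection.mutable.ArrayBuffer

// Plain-Scala stand-in for a buffering DoFn. `emit` plays the role of the
// output receiver; it is called once per completed batch.
final class BufferingBatcher[T](threshold: Int, emit: Seq[T] => Unit) {
  require(threshold > 0, "threshold must be positive")

  private val buffer = ArrayBuffer.empty[T]

  // Analogue of processElement: remember the element, flush when full.
  def processElement(element: T): Unit = {
    buffer += element
    if (buffer.size >= threshold) flush()
  }

  // Analogue of finishBundle: emit whatever is left, even if undersized.
  def finishBundle(): Unit =
    if (buffer.nonEmpty) flush()

  private def flush(): Unit = {
    emit(buffer.toList) // immutable copy, so clearing the buffer is safe
    buffer.clear()
  }
}

object BufferingBatcher {
  // Feed one simulated bundle through the batcher and collect the batches.
  def runBundle[T](bundle: Seq[T], threshold: Int): Seq[Seq[T]] = {
    val out = ArrayBuffer.empty[Seq[T]]
    val batcher = new BufferingBatcher[T](threshold, b => { out += b; () })
    bundle.foreach(batcher.processElement)
    batcher.finishBundle()
    out.toList
  }
}
```

With a threshold of 3, a bundle of seven elements comes out as batches of 3, 3 and 1. With a threshold of 5, a bundle of two elements comes out as a single batch of 2, which is the inefficiency described above for small bundles.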

--
You received this message because you are subscribed to the Google Groups "Scio Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scio-users+...@googlegroups.com.
To post to this group, send email to scio-...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/scio-users/20274afa-e093-4602-bfb6-765abf033293%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Damien Gouyette

Jun 27, 2019, 2:29:58 PM
to Scio Users
Thanks Neville, you saved my day.  👍
The first solution is really easy, and it works. 
I will also explore the second one.



Damien

Neville Li

Jun 28, 2019, 11:27:39 AM
to Damien Gouyette, Scio Users
Someone on Gitter pointed out Beam's `GroupIntoBatches`, which seems to do exactly what you want, in a nicer way.

Damien Gouyette

Jul 17, 2019, 9:06:13 AM
to Neville Li, Scio Users
It works nicely with 0.8.0-alpha2, thanks.

override protected def write(data: SCollection[RowT], params: WriteParam): EmptyTap.type = {
  data
    .map(s => s.hashCode() -> s)
    .batchByKey(100)
    .map(s => marketingcloud.write(s._2.toSeq, params.dataExtensionKey, params.dataExtensionName))
  EmptyTap
}
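
One point the thread does not raise, offered as an observation rather than a confirmed problem: per-key batching can only put elements with the same key into one batch, so keying each row by its raw `hashCode` may yield very small batches when hash codes are mostly distinct. The plain-Scala sketch below mimics per-key batching with ordinary collections (it does not model Beam's windowing or state) and compares the raw hash key with a key folded into a fixed number of shards. `KeyedBatching`, `byRawHash`, `byShard` and `numShards` are hypothetical names for illustration only.

```scala
object KeyedBatching {
  // Per-key batching on ordinary collections: group by key, then cut each
  // group into chunks of at most `batchSize` values.
  def batchByKey[K, V](pairs: Seq[(K, V)], batchSize: Int): Seq[Seq[V]] =
    pairs
      .groupBy(_._1)
      .values
      .flatMap(group => group.map(_._2).grouped(batchSize))
      .toList

  // Key choice 1: the raw hash code, as in the snippet above.
  def byRawHash[T](rows: Seq[T], batchSize: Int): Seq[Seq[T]] =
    batchByKey(rows.map(r => r.hashCode -> r), batchSize)

  // Key choice 2: the hash folded into `numShards` keys, so that many rows
  // share a key and batches can actually fill up.
  def byShard[T](rows: Seq[T], numShards: Int, batchSize: Int): Seq[Seq[T]] =
    batchByKey(rows.map(r => math.floorMod(r.hashCode, numShards) -> r), batchSize)
}
```

For the integers 1 to 1000 with a batch size of 100, `byRawHash` produces 1000 batches of one element each, because every integer has its own hash code. `byShard` with 4 shards produces 12 batches, none larger than 100. Fewer shards mean fuller batches but less parallelism, so the shard count is a trade-off to tune.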
--
Regards,

Damien GOUYETTE