Hello everyone
I am writing a connector for SalesForce Cloud Marketing. 🤪
The output will be in a data extension.
and
To write the following code:
How can i split equally a SCollection[T] to a SCollection[Seq[T]] ?
Is there a better way ?
import com.spotify.scio.ScioContext
import com.spotify.scio.coders.Coder
import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap}
import com.spotify.scio.values.SCollection
import scala.concurrent.Future
import scala.language.reflectiveCalls
final case class DataExtensionIO[T]() extends ScioIO[T] {
override type ReadP = Nothing
override type WriteP = WriteParam[T]
override val tapT = EmptyTapOf[T]
override def read(sc: ScioContext, params: ReadP): SCollection[T] = throw new UnsupportedOperationException("Can't read from Data extension")
override def tap(params: Nothing): Tap[tapT.T] = EmptyTap
override protected def write(data: SCollection[T], params: WriteParam[T]): Future[Tap[Nothing]] = {
data.map(marketingcloud.write)
Future.successful(EmptyTap)
}
}
package object marketingcloud {
def write[T]: Seq[T] => Seq[T] = (values: Seq[T]) => {
MCClient.insertRows(values.map(_.toString))
values
}
implicit class MarketingcloudSCollection[T](@transient private val self: SCollection[T])
extends AnyVal {
def saveAsDataExtension(name: String)(implicit coder: Coder[T]) = self.write(DataExtensionIO[T]())(WriteParam())
}
case class WriteParam[T](f: T => Seq[T])
}