I don't quite understand how I can register a schema to use it with the JDBC source or sink connectors and then read the data in Spark.
This is the Avro schema that I would like to use to retrieve records from an MS SQL Server database:
{
  "type": "record",
  "name": "myrecord",
  "fields": [
    { "name": "int1", "type": "int" },
    { "name": "str1", "type": "string" },
    { "name": "str2", "type": "string" }
  ]
}
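From the docs, my understanding is that the AvroConverter registers the schema with the Schema Registry automatically (under the subject "<topic>-value") when the connector produces records. In case I ever need to register it explicitly, this is a minimal sketch using CachedSchemaRegistryClient from the kafka-schema-registry-client artifact; the subject name "mssql-Hello-value" is my assumption based on the topic prefix and table below:

import org.apache.avro.Schema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class RegisterSchema {
    public static void main(String[] args) throws Exception {
        // The same record schema as above, inlined as a JSON string.
        String schemaJson = "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":["
                + "{\"name\":\"int1\",\"type\":\"int\"},"
                + "{\"name\":\"str1\",\"type\":\"string\"},"
                + "{\"name\":\"str2\",\"type\":\"string\"}]}";
        Schema schema = new Schema.Parser().parse(schemaJson);

        // Client for the registry at localhost:8081; the second argument is
        // the capacity of the client's per-subject schema cache.
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);

        // Subject name assumed: Connect registers record values as "<topic>-value".
        int id = client.register("mssql-Hello-value", schema);
        System.out.println("Registered schema id: " + id);
    }
}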
I want to use this schema with this source connector:
{"name": "mssql-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"incrementing.column.name": "int1",
"tasks.max": "1",
"table.whitelist": "Hello",
"mode": "incrementing",
"topic.prefix": "mssql-",
"name": "mssql-source",
"connection.url":
"jdbc:sqlserver://XXX.XXX.X;databaseName=XXX;username=XX;password=XX"
}
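I submit this config to the Kafka Connect REST API. For completeness, a minimal sketch of that POST in Java, assuming Connect's REST interface listens on localhost:8083 and the JSON above is saved as mssql-source.json (both assumptions):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CreateConnector {
    public static void main(String[] args) throws Exception {
        // Read the connector config from disk (file name is my own choice).
        byte[] body = Files.readAllBytes(Paths.get("mssql-source.json"));

        // POST it to the Connect REST endpoint that creates connectors.
        HttpURLConnection conn = (HttpURLConnection)
                new URL("http://localhost:8083/connectors").openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body);
        }
        System.out.println("Connect returned HTTP " + conn.getResponseCode());
    }
}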
And this is the Spark consumer that I'm using:
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class SparkAvroConsumer {

    private static Injection<GenericRecord, byte[]> recordInjection;

    // The same record schema as above, inlined as a JSON string.
    private static final String USER_SCHEMA = "{"
            + "\"type\":\"record\","
            + "\"name\":\"myrecord\","
            + "\"fields\":["
            + " { \"name\":\"int1\", \"type\":\"int\" },"
            + " { \"name\":\"str1\", \"type\":\"string\" },"
            + " { \"name\":\"str2\", \"type\":\"string\" }"
            + "]}";

    // Parse the schema once and build a Bijection between GenericRecord
    // and plain Avro binary.
    static {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        recordInjection = GenericAvroCodecs.toBinary(schema);
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        // Topic created by the source connector: topic.prefix + table name.
        Set<String> topics = Collections.singleton("mssql-Hello");
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        kafkaParams.put("schema.registry.url", "http://localhost:8081");

        // Direct stream with raw byte[] values; DefaultDecoder hands back the
        // message bytes unchanged.
        JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics);

        directKafkaStream
                .map(message -> recordInjection.invert(message._2).get())
                .foreachRDD(rdd -> {
                    rdd.foreach(record -> {
                        System.out.println("int1= " + record.get("int1")
                                + ", str1= " + record.get("str1")
                                + ", str2= " + record.get("str2"));
                    });
                });

        ssc.start();
        ssc.awaitTermination();
    }
}
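From what I've read, the AvroConverter doesn't write plain Avro binary: it prepends a magic byte and a 4-byte schema id (the Confluent wire format), so I'm not sure Bijection's invert() can decode these messages. Examples I've found use Confluent's KafkaAvroDecoder from the kafka-avro-serializer artifact instead; a minimal sketch of that variant, under the same assumptions about the broker and registry addresses:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import io.confluent.kafka.serializers.KafkaAvroDecoder;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class SparkAvroDecoderConsumer {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("kafka-sandbox").setMaster("local[*]");
        JavaStreamingContext ssc = new JavaStreamingContext(new JavaSparkContext(conf), new Duration(2000));

        Set<String> topics = Collections.singleton("mssql-Hello");
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        // KafkaAvroDecoder fetches the writer schema from the registry using
        // the schema id embedded in each message.
        kafkaParams.put("schema.registry.url", "http://localhost:8081");

        JavaPairInputDStream<Object, Object> stream = KafkaUtils.createDirectStream(
                ssc, Object.class, Object.class,
                KafkaAvroDecoder.class, KafkaAvroDecoder.class, kafkaParams, topics);

        stream.foreachRDD(rdd -> rdd.foreach(pair -> {
            // Values decode to GenericRecord, so the fields can be read by name.
            GenericRecord record = (GenericRecord) pair._2;
            System.out.println("int1= " + record.get("int1")
                    + ", str1= " + record.get("str1")
                    + ", str2= " + record.get("str2"));
        }));

        ssc.start();
        ssc.awaitTermination();
    }
}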