java.lang.IllegalArgumentException: Number of groups must be positive. Using Postgres -> Redshift


Charles Boat

Jan 29, 2018, 12:15:19 PM1/29/18
to Confluent Platform
Hi all.  

Working on setting up Kafka running from our RDS Postgres 9.6 to Redshift. Using the guidelines at https://blog.insightdatascience.com/from-postgresql-to-redshift-with-kafka-connect-111c44954a6a, we have all of the infrastructure set up, and I am working on fully setting up Confluent. I'm getting the error java.lang.IllegalArgumentException: Number of groups must be positive. when trying to set things up. Here's my config file:

name=source-postgres
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=16

connection.url= ((correct url and information here))
mode=timestamp+incrementing
timestamp.column.name=updated_at
incrementing.column.name=id
topic.prefix=postgres_

I'm completely new at Confluent/ Kafka so please explain like I'm 5! I can provide anything else needed for a solution. 

Charles Boat

Jan 29, 2018, 12:23:59 PM1/29/18
to Confluent Platform
Full error:


/usr/local/confluent$ /usr/local/confluent/bin/connect-standalone /usr/local/confluent/etc/schema-registry/connect-avro-standalone.properties /usr/local/confluent/etc/kafka-connect-jdbc/source-postgres.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/confluent/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/confluent/share/java/kafka-connect-elasticsearch/slf4j-simple-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/confluent/share/java/kafka-connect-hdfs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/confluent/share/java/kafka/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
[2018-01-29 16:49:49,820] INFO StandaloneConfig values:
        access.control.allow.methods =
        access.control.allow.origin =
        bootstrap.servers = [localhost:9092]
        internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
        internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
        key.converter = class io.confluent.connect.avro.AvroConverter
        offset.flush.interval.ms = 60000
        offset.flush.timeout.ms = 5000
        offset.storage.file.filename = /tmp/connect.offsets
        rest.advertised.host.name = null
        rest.advertised.port = null
        rest.host.name = null
        rest.port = 8083
        task.shutdown.graceful.timeout.ms = 5000
        value.converter = class io.confluent.connect.avro.AvroConverter
 (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:180)
[2018-01-29 16:49:49,942] INFO Logging initialized @549ms (org.eclipse.jetty.util.log:186)
[2018-01-29 16:49:50,301] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:52)
[2018-01-29 16:49:50,302] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:70)
[2018-01-29 16:49:50,302] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:113)
[2018-01-29 16:49:50,302] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
[2018-01-29 16:49:50,304] INFO Worker started (org.apache.kafka.connect.runtime.Worker:118)
[2018-01-29 16:49:50,305] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72)
[2018-01-29 16:49:50,305] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:98)
[2018-01-29 16:49:50,434] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)
Jan 29, 2018 4:49:51 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
[2018-01-29 16:49:51,385] INFO Started o.e.j.s.ServletContextHandler@5aabbb29{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2018-01-29 16:49:51,409] INFO Started ServerConnector@54dab9ac{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2018-01-29 16:49:51,409] INFO Started @2019ms (org.eclipse.jetty.server.Server:379)
[2018-01-29 16:49:51,410] INFO REST server listening at http://127.0.0.1:8083/, advertising URL http://127.0.0.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2018-01-29 16:49:51,410] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:58)
[2018-01-29 16:49:51,412] INFO ConnectorConfig values:
        connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
        key.converter = null
        name = source-postgres
        tasks.max = 16
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:180)
[2018-01-29 16:49:51,413] INFO Creating connector source-postgres of type io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker:159)
[2018-01-29 16:49:51,416] INFO Instantiated connector source-postgres with version 3.1.2 of type class io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker:162)
[2018-01-29 16:49:51,419] INFO JdbcSourceConnectorConfig values:
        batch.max.rows = 100
        connection.url =
        incrementing.column.name = id
        mode = timestamp+incrementing
        poll.interval.ms = 5000
        query =
        schema.pattern = null
        table.blacklist = []
        table.poll.interval.ms = 60000
        table.types = [TABLE]
        table.whitelist = []
        timestamp.column.name = updated_at
        timestamp.delay.interval.ms = 0
        topic.prefix = postgres_
        validate.non.null = true
 (io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig:180)
[2018-01-29 16:49:52,129] INFO Finished creating connector source-postgres (org.apache.kafka.connect.runtime.Worker:173)
[2018-01-29 16:49:52,130] INFO SourceConnectorConfig values:
        connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
        key.converter = null
        name = source-postgres
        tasks.max = 16
        value.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:180)
[2018-01-29 16:49:52,209] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:102)
java.lang.IllegalArgumentException: Number of groups must be positive.
        at org.apache.kafka.connect.util.ConnectorUtils.groupPartitions(ConnectorUtils.java:45)
        at io.confluent.connect.jdbc.JdbcSourceConnector.taskConfigs(JdbcSourceConnector.java:123)
        at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:193)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.recomputeTaskConfigs(StandaloneHerder.java:251)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:281)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:163)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:96)
[2018-01-29 16:49:52,210] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:68)
[2018-01-29 16:49:52,210] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
[2018-01-29 16:49:52,213] INFO Stopped ServerConnector@54dab9ac{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2018-01-29 16:49:52,218] INFO Stopped o.e.j.s.ServletContextHandler@5aabbb29{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
[2018-01-29 16:49:52,224] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:165)
[2018-01-29 16:49:52,224] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:76)
[2018-01-29 16:49:52,224] INFO Stopping connector source-postgres (org.apache.kafka.connect.runtime.Worker:218)
[2018-01-29 16:49:52,225] INFO Stopping table monitoring thread (io.confluent.connect.jdbc.JdbcSourceConnector:137)
[2018-01-29 16:49:52,225] INFO Stopped connector source-postgres (org.apache.kafka.connect.runtime.Worker:229)
[2018-01-29 16:49:52,225] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:122)
[2018-01-29 16:49:52,225] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:68)
[2018-01-29 16:49:52,225] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:142)
[2018-01-29 16:49:57,334] INFO Reflections took 6952 ms to scan 263 urls, producing 12036 keys and 80097 values  (org.reflections.Reflections:229)
[2018-01-29 16:49:57,346] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:86)
[2018-01-29 16:49:57,346] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:73)
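
The stack trace pins the failure to ConnectorUtils.groupPartitions, called from JdbcSourceConnector.taskConfigs. One plausible reading (the min() sizing below is an inference from the trace, not confirmed against the connector source) is that the connector discovered zero tables to assign, so it asked for zero task groups. A sketch of that arithmetic:

```shell
# Hypothetical sketch: the connector appears to size task groups as
# min(number of discovered tables, tasks.max). If no tables match the
# filters, or none are visible to the DB user, the group count is 0 and
# groupPartitions throws "Number of groups must be positive."
num_tables=0     # tables the connector discovered (assumed zero here)
tasks_max=16     # from tasks.max in source-postgres.properties
groups=$(( num_tables < tasks_max ? num_tables : tasks_max ))
echo "groups=$groups"   # prints groups=0
```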

Robin Moffatt

Jan 30, 2018, 4:11:44 AM1/30/18
to confluent...@googlegroups.com
What tables are you wanting to include? From what I can see you're not specifying schema.pattern, table.whitelist, or table.blacklist - which, if the config is otherwise valid, would pull every object from the database.
Even if you want all tables, can you try it with table.whitelist for one specific table, just to check that works? 
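
A minimal sketch of that narrowed config (the table name my_table is a placeholder, not from the thread; tasks.max is dropped to 1 since a single table needs at most one task):

```properties
# source-postgres.properties, narrowed to one known table as a sanity check.
name=source-postgres
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url= ((correct url and information here))
table.whitelist=my_table
mode=timestamp+incrementing
timestamp.column.name=updated_at
incrementing.column.name=id
topic.prefix=postgres_
```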

Could be related? https://github.com/confluentinc/kafka-connect-jdbc/issues/213



--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/b78e5f95-bb71-4396-b3e1-712b28c8c7a8%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Charles Boat

Feb 2, 2018, 11:46:24 AM2/2/18
to Confluent Platform
Hi Robin,

I'm trying to use Kafka for the entire DB from Postgres to Redshift, all schemas and tables as well. Is this feasible within Kafka? Also, I'm struggling to even get this off the ground at all. Are there specific configs I should be looking at? I'm very confused by the whole thing here.

Charles Boat

Feb 2, 2018, 2:27:24 PM2/2/18
to Confluent Platform
So, I am getting this when I try to start up Schema Registry, and it just stops and hangs.
ubuntu@kafka-prod-01:/usr/local/confluent/etc/schema-registry$ /usr/local/confluent/bin/schema-registry-start /usr/local/confluent/etc/schema-registry/schema-registry.properties &
[3] 27612
[2]   Exit 127                usr/bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
ubuntu@kafka-prod-01:/usr/local/confluent/etc/schema-registry$ log4j:ERROR setFile(null,true) call failed.
java.io.FileNotFoundException: /tmp/schema-registry-logs/schema-registry.log (Permission denied)
        at java.io.FileOutputStream.open0(Native Method)
        at java.io.FileOutputStream.open(FileOutputStream.java:270)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:133)
        at org.apache.log4j.FileAppender.setFile(FileAppender.java:294)
        at org.apache.log4j.RollingFileAppender.setFile(RollingFileAppender.java:207)
        at org.apache.log4j.FileAppender.activateOptions(FileAppender.java:165)
        at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
        at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
        at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
        at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
        at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
        at org.apache.log4j.PropertyConfigurator.configureRootCategory(PropertyConfigurator.java:648)
        at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:514)
        at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
        at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
        at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
        at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:66)
        at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:358)
        at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:383)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.<clinit>(SchemaRegistryMain.java:29)
[2018-02-02 16:24:49,809] INFO SchemaRegistryConfig values:
        metric.reporters = []
        kafkastore.sasl.kerberos.kinit.cmd = /usr/bin/kinit
        response.mediatype.default = application/vnd.schemaregistry.v1+json
        kafkastore.ssl.trustmanager.algorithm = PKIX
        authentication.realm =
        ssl.keystore.type = JKS
        kafkastore.topic = _schemas
        metrics.jmx.prefix = kafka.schema.registry
        kafkastore.ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
        kafkastore.topic.replication.factor = 3
        ssl.truststore.password =
        kafkastore.timeout.ms = 500
        host.name = localhost
        kafkastore.bootstrap.servers = []
        schema.registry.zk.namespace = schema_registry
        kafkastore.sasl.kerberos.ticket.renew.window.factor = 0.8
        kafkastore.sasl.kerberos.service.name =
        ssl.endpoint.identification.algorithm =
        compression.enable = false
        kafkastore.ssl.truststore.type = JKS
        avro.compatibility.level = backward
        kafkastore.ssl.protocol = TLS
        kafkastore.ssl.provider =
        kafkastore.ssl.truststore.location =
        response.mediatype.preferred = [application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json]
        kafkastore.ssl.keystore.type = JKS
        ssl.truststore.type = JKS
        kafkastore.ssl.truststore.password =
        access.control.allow.origin =
        ssl.truststore.location =
        ssl.keystore.password =
        port = 8081
        kafkastore.ssl.keystore.location =
        master.eligibility = true
        ssl.client.auth = false
        kafkastore.ssl.keystore.password =
        kafkastore.security.protocol = PLAINTEXT
        ssl.trustmanager.algorithm =
        authentication.method = NONE
        request.logger.name = io.confluent.rest-utils.requests
        ssl.key.password =
        kafkastore.zk.session.timeout.ms = 30000
        kafkastore.sasl.mechanism = GSSAPI
        kafkastore.sasl.kerberos.ticket.renew.jitter = 0.05
        kafkastore.ssl.key.password =
        zookeeper.set.acl = false
        authentication.roles = [*]
        metrics.num.samples = 2
        ssl.protocol = TLS
        kafkastore.ssl.keymanager.algorithm = SunX509
        kafkastore.connection.url = localhost:2181
        debug = false
        listeners = [http://0.0.0.0:8081]
        ssl.provider =
        ssl.enabled.protocols = []
        shutdown.graceful.ms = 1000
        ssl.keystore.location =
        ssl.cipher.suites = []
        kafkastore.ssl.endpoint.identification.algorithm =
        kafkastore.ssl.cipher.suites =
        access.control.allow.methods =
        kafkastore.sasl.kerberos.min.time.before.relogin = 60000
        ssl.keymanager.algorithm =
        metrics.sample.window.ms = 30000
        kafkastore.init.timeout.ms = 60000
 (io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig:169)
[2018-02-02 16:24:50,372] INFO Initializing KafkaStore with broker endpoints: PLAINTEXT://localhost:9092 (io.confluent.kafka.schemaregistry.storage.KafkaStore:127)
[2018-02-02 16:24:50,391] WARN The replication factor of the schema topic _schemas is less than the desired one of 3. If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic. (io.confluent.kafka.schemaregistry.storage.KafkaStore:323)
[2018-02-02 16:24:50,646] INFO Initialized last consumed offset to -1 (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:122)
[2018-02-02 16:24:50,656] INFO [kafka-store-reader-thread-_schemas], Starting  (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:70)
[2018-02-02 16:24:50,726] INFO Wait to catch up until the offset of the last message at 2 (io.confluent.kafka.schemaregistry.storage.KafkaStore:343)
[2018-02-02 16:24:50,799] INFO Created schema registry namespace localhost:2181/schema_registry (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:238)
[2018-02-02 16:24:50,816] INFO Successfully elected the new master: {"host":"localhost","port":8081,"master_eligibility":true,"version":1} (io.confluent.kafka.schemaregistry.zookeeper.ZookeeperMasterElector:83)
[2018-02-02 16:24:50,822] INFO Successfully elected the new master: {"host":"localhost","port":8081,"master_eligibility":true,"version":1} (io.confluent.kafka.schemaregistry.zookeeper.ZookeeperMasterElector:83)
[2018-02-02 16:24:50,850] INFO Logging initialized @1388ms (org.eclipse.jetty.util.log:186)
[2018-02-02 16:24:50,908] INFO Adding listener: http://0.0.0.0:8081 (io.confluent.rest.Application:174)
[2018-02-02 16:24:50,962] INFO jetty-9.2.12.v20150709 (org.eclipse.jetty.server.Server:327)
[2018-02-02 16:24:51,495] INFO HV000001: Hibernate Validator 5.1.2.Final (org.hibernate.validator.internal.util.Version:27)
[2018-02-02 16:24:51,625] INFO Started o.e.j.s.ServletContextHandler@278bb07e{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2018-02-02 16:24:51,633] INFO Started NetworkTrafficServerConnector@366c4480{HTTP/1.1}{0.0.0.0:8081} (org.eclipse.jetty.server.NetworkTrafficServerConnector:266)
[2018-02-02 16:24:51,634] INFO Started @2172ms (org.eclipse.jetty.server.Server:379)
[2018-02-02 16:24:51,635] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:45)
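
The FileNotFoundException above is a permissions problem: log4j cannot create /tmp/schema-registry-logs/schema-registry.log because that directory is owned by another user. One way around it is to give the launch script a log directory the current user owns. The sketch below assumes the Confluent wrapper scripts read the log directory from the LOG_DIR environment variable (worth verifying against your bin/schema-registry-start):

```shell
# Create a log directory the current user owns, then prove the log file
# named in the stack trace can be created there.
LOG_DIR="${HOME}/schema-registry-logs"
mkdir -p "$LOG_DIR"
touch "$LOG_DIR/schema-registry.log"
# Then start the registry pointing at that directory:
#   LOG_DIR="$HOME/schema-registry-logs" /usr/local/confluent/bin/schema-registry-start \
#     /usr/local/confluent/etc/schema-registry/schema-registry.properties
```

Alternatively, `sudo chown -R "$USER" /tmp/schema-registry-logs` would make the default directory writable, if you have sudo access on the box.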


On Tuesday, January 30, 2018 at 4:11:44 AM UTC-5, Robin Moffatt wrote: