Connecting to Cloud SQL from Dataproc

5,178 views
Skip to first unread message

Blesson Mathew Sam

unread,
Apr 17, 2018, 3:29:14 PM4/17/18
to Google Cloud Dataproc Discussions
I'm having trouble connecting to Cloud SQL from my spark job.

val spark = SparkSession
  .builder()
  .appName("Spark Cloud-SQL basic example")
  .getOrCreate()

val connectionProperties = new Properties()
connectionProperties.setProperty("user","root")
connectionProperties.setProperty("password","#####")

val databaseName = "test_database"
val instanceConnectionName = "instance_name"

val jdbcUrl = String.format("jdbc:mysql://google/%s?cloudSqlInstance=%s&socketFactory=com.google.cloud.sql.mysql.SocketFactory",databaseName,instanceConnectionName);

val mktPersonDf = spark.read.jdbc(jdbcUrl,"test_table",connectionProperties)

mktPersonDf.count()

When I submit this job to the cluster, I'm getting the following error.

Exception in thread "main" com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create socket factory 'com.google.cloud.sql.mysql.SocketFactory' due to underlying exception: 
        at sun
.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun
.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun
.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java
.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com
.mysql.jdbc.Util.handleNewInstance(Util.java:418)
        at com
.mysql.jdbc.Util.getInstance(Util.java:401)
        at com
.mysql.jdbc.SQLError.createSQLException(SQLError.java:918)
        at com
.mysql.jdbc.SQLError.createSQLException(SQLError.java:897)
        at com
.mysql.jdbc.SQLError.createSQLException(SQLError.java:886)
        at com
.mysql.jdbc.SQLError.createSQLException(SQLError.java:860)
        at com
.mysql.jdbc.MysqlIO.createSocketFactory(MysqlIO.java:3330)
        at com
.mysql.jdbc.MysqlIO.<init>(MysqlIO.java:296)
        at com
.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2192)
        at com
.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2225)
        at com
.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2024)
        at com
.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:779)
        at com
.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:47)
        at sun
.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun
.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun
.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java
.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com
.mysql.jdbc.Util.handleNewInstance(Util.java:418)
        at com
.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:389)
        at com
.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:330)
        at org
.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:61)
        at org
.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:52)
        at org
.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:58)
        at org
.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:114)
        at org
.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:52)
        at org
.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:307)
        at org
.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
        at org
.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
        at org
.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:193)
        at com
.marketo.analytics.App$.main(App.scala:27)
        at com
.marketo.analytics.App.main(App.scala)
        at sun
.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun
.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun
.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java
.lang.reflect.Method.invoke(Method.java:498)
        at org
.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)
        at org
.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org
.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org
.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org
.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.google.cloud.sql.mysql.SocketFactory
        at java
.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java
.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun
.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
        at java
.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java
.lang.Class.forName0(Native Method)
        at java
.lang.Class.forName(Class.java:264)
        at com
.mysql.jdbc.MysqlIO.createSocketFactory(MysqlIO.java:3328)
       
... 33 more

I've added the mysql-socket-factory dependency to maven. 

<dependency>
  <groupId>com.google.cloud.sql</groupId>
  <artifactId>mysql-socket-factory</artifactId>
  <version>1.0.5</version>
</dependency>

Is there something else I need to configure to read from Cloud SQL? 

Patrick Clay

unread,
Apr 18, 2018, 2:31:52 PM4/18/18
to Google Cloud Dataproc Discussions
Are you using the shade or uber plugins to create an uber jar? Or bringing it along side with --jars or --packages? If not Maven will only compile your code and Spark won't have your dependencies.

Even if you are doing that you probably would hit MethodNotFoundErrors, because that jar conflicts with the GCS connector installed on the cluster. There are possible 3 solutions to that:
  1. Use a Proxy daemon instead the Socket Factory. This initialization action installs and configures proxy daemons so you don't need the socket factory.
  2. Use the Maven Shade plugin to relocate com.google in your jar. You may need to also change your parameters, but shade *should* handle that.
  3. Use this initialization action to upgrade the bundled connectors to versions that shouldn't collide with your jar.
I recommend the connector init action and --packages to be easiest. So
gcloud dataproc clusters create your-cluster --initialization-actions gs://dataproc-initialization-actions/connectors/connectors.sh
gcloud dataproc jobs submit spark --cluster your-cluster --jar your-jar --properties spark.jar.packages=com.google.cloud.sql:mysql-socket-factory:1.0.5

Disclaimer. I have not validated the above, but I think it should work. If I misunderstood your issue, or that doesn't work. i would be happy to help further.

-Patrick

Blesson Mathew Sam

unread,
Apr 19, 2018, 3:44:50 PM4/19/18
to Google Cloud Dataproc Discussions
Thanks for the reply Patrick!

Here is how it went when I tried the 3 options.

1. I had tried the proxy option before. I uploaded the script to my bucket after adding the correct mysql password. 

readonly MYSQL_ROOT_PASSWORD='######'

But when I tried to create the cluster with the initialization action, it stopped with the following error on all nodes.

/etc/google-dataproc/startup-scripts/dataproc-initialization-script-0: 20: set: Illegal option -o pipefail

After I removed the 'o' from

set -euxo pipefail

and ran again, it resulted in another error. 

/etc/google-dataproc/startup-scripts/dataproc-initialization-script-0: 45: /etc/google-dataproc/startup-scripts/dataproc-initialization-script-0: Syntax error: "(" unexpected

on 

echo "[$(date +'%Y-%m-%dT%H:%M:%S%z')]: $@" >&2

I did not proceed further as I thought there might be an issue with the script itself.

2. I did use shade to build an uber jar and made sure the dependency was included.

3. Used this initialization action but still ended with the same error.

Thanks again for the help!

Blesson

Igor Dvorzhak

unread,
Apr 19, 2018, 3:52:54 PM4/19/18
to Google Cloud Dataproc Discussions
Hi Blesson,

May you share you Shade plugin configuration that you have used to create an uber jar?

Best regards,
Igor Dvorzhak

Blesson Mathew Sam

unread,
Apr 19, 2018, 4:38:20 PM4/19/18
to Google Cloud Dataproc Discussions
Hi Igor,

Please find the shade configuration below.

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.0.0</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <artifactSet>
          <excludes>
            <exclude>junit:junit</exclude>
            <exclude>org.scalatest:scalatest_2.10</exclude>
          </excludes>
        </artifactSet>
        <filters>
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
      </configuration>
    </execution>
  </executions>
</plugin>

I see that the jars are included when I do mvn package.

[INFO] Including com.google.cloud.sql:mysql-socket-factory:jar:1.0.5 in the shaded jar.
[INFO] Including mysql:mysql-connector-java:jar:5.1.38 in the shaded jar.
[INFO] Including com.google.cloud.sql:jdbc-socket-factory-core:jar:1.0.5 in the shaded jar.
[INFO] Including com.google.apis:google-api-services-sqladmin:jar:v1beta4-rev25-1.22.0 in the shaded jar.

Thanks!
Blesson

Igor Dvorzhak

unread,
Apr 19, 2018, 5:53:03 PM4/19/18
to Google Cloud Dataproc Discussions
Thanks Blesson, I will try to reproduce this issue.

In the meantime, may you try to create cluster using next command:
gcloud dataproc clusters create <CLUSTER_NAME> \
    --initialization-actions gs://dataproc-initialization-actions/connectors/connectors.sh \
    --metadata 'gcs-connector-version=1.8.1' \
    --metadata 'bigquery-connector-version=0.12.1'

and submit your job to it?

Best,
Igor

Igor Dvorzhak

unread,
Apr 20, 2018, 12:34:22 AM4/20/18
to Google Cloud Dataproc Discussions
As specified in Spark documentation you need to add JDBC driver jar to Spark driver class-path: https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

Using Dataproc CLI it could be done with the command:
$ gcloud dataproc jobs submit spark \
   
--cluster=<CLUSTER_NAME> \
   
--jars=gs://<BUCKET>/<DIRECTORIES>/<UBER_JAR_NAME> \
   
--class=<MAIN_CLASS> \
   
--properties=spark.driver.extraClassPath=<UBER_JAR_NAME>

Because Dataproc distributes all jars to all cluster nodes and uber jar contains JDBC driver, you can just add it to Spark Driver class-path instead of separate JDBC driver jar.

Please, remember to specify Spark dependencies in your Maven project as provided, so Shade plugin will not include them in uber jar.

Igor Dvorzhak

unread,
Apr 20, 2018, 2:13:55 AM4/20/18
to Google Cloud Dataproc Discussions

Blesson Mathew Sam

unread,
Apr 20, 2018, 3:22:00 PM4/20/18
to Google Cloud Dataproc Discussions
Hi Igor,

Adding JDBC driver to the classpath worked! I still had to change the way I created the cluster in order for it to access CloudSQL. 

I had to add --scopes=cloud-platform to the cluster creation command so that the VM instances in the cluster had access to the Cloud APIs.

gcloud dataproc clusters create <CLUSTER_NAME> \
   --initialization-actions gs://dataproc-initialization-actions/connectors/connectors.sh \
   
--scopes=cloud-platform \
   --metadata 'gcs-connector-version=1.8.1' \
   --metadata 'bigquery-connector-version=0.12.1'

Thank you so much for the help!

Blesson
Message has been deleted
Message has been deleted

Karthik Bheemsingh

unread,
Feb 9, 2019, 8:37:09 PM2/9/19
to Google Cloud Dataproc Discussions
HI, I have created a Dataproc cluster with 5 nodes. also i have created a spark job to read content from a text file and update it to CloudSQL. able to write to CloudSQL successfully but when am trying to write 100k rows it is running more than 1 hour. which is running in a minute in my local machine.

Is it because of CloudSQL or any cluster configuration issue ? Any suggestion how to make it faster.

Thanks in Advance..

Regards,

Karthik

Amey Barve

unread,
Apr 1, 2019, 8:15:47 AM4/1/19
to Google Cloud Dataproc Discussions
Hi Blesson,

Please need Help

I want to write to a PostgreSQL instance of Cloud SQL using spark code through dataproc.
I have used following dependencies:
 a. postgresql.jar
 b. postgres-socket-factory.jar
 c. postgres-socket-factory-1.0.11-jar-with-dependencies.jar

I am running a dataproc which will try to connect using step#1
But when get following exception


2019-04-01 11:05:03.998 IST
Something unusual has occurred to cause the driver to fail. Please report this exception. at org.postgresql.Driver.connect(Driver.java:277) at java.sql.DriverManager.getConnection(DriverManager.java:664) at java.sql.DriverManager.getConnection(DriverManager.java:270) at rdsConnector$.getConnection(rdsConnector.scala:33) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)Caused by: java.lang.RuntimeException: Unable to retrieve information about Cloud SQL instance [gold-bruin-220416:us-east1:pii-isolation] at com.google.cloud.sql.core.SslSocketFactory.obtainInstanceMetadata(SslSocketFactory.java:459) at com.google.cloud.sql.core.SslSocketFactory.fetchInstanceSslInfo(SslSocketFactory.java:333) at com.google.cloud.sql.core.SslSocketFactory.getInstanceSslInfo(SslSocketFactory.java:313) at com.google.cloud.sql.core.SslSocketFactory.createAndConfigureSocket(SslSocketFactory.java:194) at com.google.cloud.sql.core.SslSocketFactory.create(SslSocketFactory.java:160) at com.google.cloud.sql.postgres.SocketFactory.createSocket(SocketFactory.java:96) at org.postgresql.core.PGStream.<init>(PGStream.java:62) at org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:91) at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:192) at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49) at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:195) at org.postgresql.Driver.makeConnection(Driver.java:454) at org.postgresql.Driver.connect(Driver.java:256) ... 11 moreCaused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden


2019-04-01 11:05:03.000 IST
User class threw exception: org.postgresql.util.PSQLException: Something unusual has occurred to cause the driver to fail. Please report this exception.


I understand this as a permission issue, but since I am using dataproc to connect to postgres what permission is missing?

I am using python code to launch dataproc cluster and submit a job to it.
How can I give permissions to my python code while starting dataproc cluster.

def create_cluster(self):
       
       
print('Creating cluster...')
       
print('dataproc',self.dataproc)
           
       
#dataproc = get_client()
       
        zone_uri
= \
           
'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
               
self.project_id, self.zone)
        cluster_data
= {
           
'projectId': self.project_id,
           
'clusterName': self.cluster_name,
     
'scopes': 'sql-admin',
           
'config': {
               
'gceClusterConfig': {
                   
'zoneUri': zone_uri
               
},
               
'masterConfig': {
                   
'numInstances': 1,
                   
'machineTypeUri': 'n1-standard-1'
               
},
               
'workerConfig': {
                   
'numInstances': 2,
                   
'machineTypeUri': 'n1-standard-1'
               
}                    
           
}
       
#cluster_data['config']['lifecycleConfig']['idleDeleteTtl'] = '300s'
       
}
        result
= self.dataproc.projects().regions().clusters().create(
            projectId
=self.project_id,
            region
=self.cluster_region,
            body
=cluster_data).execute()
       
print("Cluster Created!!!")
       
return result


Thanks in advance!
Message has been deleted

adam...@quantexa.com

unread,
Apr 1, 2019, 8:54:12 AM4/1/19
to Google Cloud Dataproc Discussions

com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden

 

Can you ensure that you have the Cloud SQL API auth scope enabled on your compute instances for your dataproc cluster?

 

Further to this can you ensure that you have CloudSQL IAM permissions on the dataproc compute instance service account

Message has been deleted

adam...@quantexa.com

unread,
Apr 1, 2019, 8:55:47 AM4/1/19
to Google Cloud Dataproc Discussions
roles/cloudsql.client

Amey Barve

unread,
Apr 1, 2019, 11:27:37 PM4/1/19
to Google Cloud Dataproc Discussions
Thanks Adam,

Can you ensure that you have the Cloud SQL API auth scope enabled on your compute instances for your dataproc cluster?
>>> How to set Cloud SQL API auth scope on compute instances for dataproc cluster, I am using python code to launch this cluster.

Further to this can you ensure that you have CloudSQL IAM permissions on the dataproc compute instance service account
>>> I am using an account who has owner permissions (Role).

Regards,

Reply all
Reply to author
Forward
0 new messages