How to execute a PostgreSQL SELECT query using Cloud SQL in Cloud Composer's Airflow?


aniruddh...@nytimes.com

Apr 9, 2019, 2:28:28 AM
to cloud-composer-discuss
I am new to Cloud Composer and I want to execute a PostgreSQL SELECT query using the gcp_cloud_sql hook in Cloud Composer's Airflow. I tried CloudSqlQueryOperator, but it doesn't work with SELECT queries.

I want to create DAGs based on the results of this SELECT query. However, I am not able to create even a simple connection for it. My Cloud Composer Airflow version is 1.10.0, and I get the error: broken DAG [dynamic_dag.py]: no module named gcp_sql_operator. Is there another Cloud SQL operator or Postgres operator that I can use for my SELECT query?


import os  # needed for os.environ below
from six.moves.urllib.parse import quote_plus
import airflow
from airflow import models
from airflow.contrib.operators.gcp_sql_operator import (
    CloudSqlQueryOperator
)
from datetime import date, datetime, timedelta

GCP_PROJECT_ID = "adtech-dev"
GCP_REGION = "<my cluster zone>"
GCSQL_POSTGRES_INSTANCE_NAME_QUERY = "testpostgres"
GCSQL_POSTGRES_DATABASE_NAME = ""
GCSQL_POSTGRES_USER = "<PostgreSQL User Name>"
GCSQL_POSTGRES_PASSWORD = "**********"
GCSQL_POSTGRES_PUBLIC_IP = "0.0.0.0"
GCSQL_POSTGRES_PUBLIC_PORT = "5432"

rule_query = "select r.id from rules r where r.id = 1"

postgres_kwargs = dict(
    user=quote_plus(GCSQL_POSTGRES_USER),
    password=quote_plus(GCSQL_POSTGRES_PASSWORD),
    public_port=GCSQL_POSTGRES_PUBLIC_PORT,
    public_ip=quote_plus(GCSQL_POSTGRES_PUBLIC_IP),
    project_id=quote_plus(GCP_PROJECT_ID),
    location=quote_plus(GCP_REGION),
    instance=quote_plus(GCSQL_POSTGRES_INSTANCE_NAME_QUERY),
    database=quote_plus(GCSQL_POSTGRES_DATABASE_NAME)
)

default_args = { 
    'owner': 'airflow',
    'start_date': datetime(2018, 5, 31),
    'email': ['aniruddh...@xyz.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'depends_on_past': False,
    'catchup': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
}

os.environ['AIRFLOW_CONN_PROXY_POSTGRES_TCP'] = \
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
    "database_type=postgres&" \
    "project_id={project_id}&" \
    "location={location}&" \
    "instance={instance}&" \
    "use_proxy=True&" \
    "sql_proxy_use_tcp=True".format(**postgres_kwargs)

connection_names = [
    "proxy_postgres_tcp"
]

tasks = []

with models.DAG(
    dag_id='example_gcp_sql_query',
    default_args=default_args,
    schedule_interval=None
) as dag:
    prev_task = None

    for connection_name in connection_names:
        task = CloudSqlQueryOperator(
            gcp_cloudsql_conn_id=connection_name,
            task_id="example_gcp_sql_task_" + connection_name,
            sql=rule_query
        )
        tasks.append(task)
        if prev_task:
            prev_task >> task
        prev_task = task

Raj Shekar

Apr 9, 2019, 2:35:25 AM
to aniruddh...@nytimes.com, cloud-composer-discuss
Hi Aniruddh,

Use the PostgresOperator:

from airflow.operators.postgres_operator import PostgresOperator

PostgresOperator(
    task_id='create_cluster_success_sql_operator',
    # Write your query here.
    sql=getQuery('create_cluster', 'success', r'successfully cluster=%s is created' % cluster_name),
    postgres_conn_id=POSTGRES_CONN_ID,
    dag=dag
)
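
One caveat, offered as a sketch rather than a definitive answer: in Airflow 1.10.x the PostgresOperator runs the statement but does not hand the rows back, so on its own it cannot drive DAG creation from a SELECT. To read the results, a PostgresHook can be queried with get_records(). The helper names fetch_rule_ids and make_postgres_hook and the connection id my_postgres are hypothetical; the query is the one from the original post.

```python
RULE_QUERY = "select r.id from rules r where r.id = 1"


def fetch_rule_ids(hook, sql=RULE_QUERY):
    """Run a SELECT through a DbApiHook-style object and return the first column."""
    # get_records() returns a list of row tuples, e.g. [(1,), (2,)].
    return [row[0] for row in hook.get_records(sql)]


def make_postgres_hook(conn_id):
    """Build a PostgresHook for an existing Airflow connection (hypothetical id)."""
    # Imported lazily so this module also loads where Airflow is not installed.
    # The module path below is the Airflow 1.10.x one.
    from airflow.hooks.postgres_hook import PostgresHook
    return PostgresHook(postgres_conn_id=conn_id)
```

Calling fetch_rule_ids(make_postgres_hook("my_postgres")) at the top level of a DAG file would run the query every time the scheduler parses the file, so the query should stay cheap.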

Thanks,
Rajashekar.

--
You received this message because you are subscribed to the Google Groups "cloud-composer-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cloud-composer-di...@googlegroups.com.
To post to this group, send email to cloud-compo...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cloud-composer-discuss/188e4b02-7987-4abe-9151-6ce29eee7272%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

aniruddh...@nytimes.com

Apr 10, 2019, 1:37:12 AM
to cloud-composer-discuss
Hi Raj Shekar,

If I use the PostgresOperator, what should POSTGRES_CONN_ID be?
With Cloud SQL I create the connection using "gcpcloudsql://", whereas for a Postgres connection I would have to build the connection URL using "postgresql://", which will not work in the Cloud Composer environment.
I am using Cloud Composer's Airflow version 1.10.1.
Please correct me if I am wrong here.
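
For comparison, here is a minimal sketch of how a plain Postgres connection URI could be built and exposed through an AIRFLOW_CONN_<CONN_ID> environment variable, the same mechanism the gcpcloudsql:// example in the first post uses. Whether the instance is actually reachable from the Composer workers is a separate question. The function names, the connection id my_postgres, and the user, password, host, and database values are all placeholders, and Python 3 is assumed.

```python
import os
from urllib.parse import quote


def postgres_conn_uri(user, password, host, port, database):
    """Build a postgres:// URI, percent-encoding the user and password."""
    return "postgres://{user}:{password}@{host}:{port}/{database}".format(
        user=quote(user, safe=""),
        password=quote(password, safe=""),
        host=host,
        port=port,
        database=database,
    )


def register_connection(conn_id, uri):
    """Expose the URI to Airflow as the connection named conn_id."""
    # Airflow looks up AIRFLOW_CONN_ followed by the upper-cased connection id.
    os.environ["AIRFLOW_CONN_" + conn_id.upper()] = uri


# Placeholder values, not taken from this thread.
uri = postgres_conn_uri("etl_user", "p@ss/word", "10.0.0.5", 5432, "rules_db")
register_connection("my_postgres", uri)
```

With this in place, POSTGRES_CONN_ID would be "my_postgres". The same connection could instead be created in the Airflow UI, which keeps the password out of the DAG file.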

Regards,
Aniruddh 

Raj Shekar

Apr 10, 2019, 1:47:14 AM
to aniruddh...@nytimes.com, cloud-composer-discuss
Hi Aniruddh,

Please go through the URL below to configure the Postgres connection under Connections.


Thanks,
Rajashekar.


Iyer Sree

Jan 24, 2022, 4:32:53 PM
to cloud-composer-discuss
Hi Aniruddh,

Were you able to solve this issue?
I used the PostgresOperator and set up the PostgreSQL connection under Airflow > Admin > Connections.
Everything looks correct to me, but I am still getting:

psycopg2.OperationalError: connection to server at "35.222.xxx.xxx", port 5432 failed: Connection timed out
Is the server running on that host and accepting TCP/IP connections?

Any information would be very helpful!
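
A timeout like this generally means the TCP connection never reached the server, which points at the network path rather than the connection string. One possible cause, stated as an assumption about the setup, is that the Composer workers' outbound IPs are not among the Cloud SQL instance's authorized networks. A minimal sketch for checking raw reachability from a worker, using only the standard library; the function name can_reach is hypothetical.

```python
import socket


def can_reach(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port opens within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers timeouts, refused connections, and unreachable hosts.
        return False
```

Running can_reach with the instance's IP and port 5432 from inside a task (for example via a PythonOperator) shows whether the worker can open the socket at all. If it returns False, no change to the connection settings in Airflow will help until the network path is opened.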
