S3 Notification configuration, with SQS?

1,606 views
Skip to first unread message

Chris Shenton

unread,
Jun 30, 2015, 7:44:23 PM6/30/15
to cloudto...@googlegroups.com
I'd like to be able to inject a message into an SQS queue upon S3 POST/PUT/CompleteMultiPartUpload. I'm seeing what looks like early support for TopicNotification (via SNS) but not QueueConfiguration. I can't figure out how to use TopicNotification. Are there any examples that I might cargo-cult to add Queue notification?

I've tried the following:

r_sns_topic = t.add_resource(
    sns.Topic(
        "S3SNSTopic",
        Subscription=[
            sns.Subscription(
                Endpoint="ch...@myRealDomain.example.com",
                Protocol="email")
        ],
    ))

r_s3_private = t.add_resource(
    s3.Bucket(
        "PrivateBucket",
        AccessControl=s3.BucketOwnerFullControl,
        BucketName="cshenton-test-notification",
        NotificationConfiguration=s3.NotificationConfiguration(
            TopicConfigurations=[
                s3.TopicConfiguration(
                    Event="s3:ObjectCreated:Post",
                    Topic=Ref(r_sns_topic),
                ),
            ],
        ),
    ))

But CloudFormation complains:
  CREATE FAILED Unable to validate the following destination configurations
  Physical ID:cshenton-test-notification

Any leads or examples? Thanks for your help.


Mark Peek

unread,
Jun 30, 2015, 8:02:27 PM6/30/15
to Chris Shenton, cloudto...@googlegroups.com
Perhaps it is correct and you're running into this:
https://forums.aws.amazon.com/thread.jspa?threadID=167470

Mark
--
You received this message because you are subscribed to the Google Groups "cloudtools-dev" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cloudtools-de...@googlegroups.com.
To post to this group, send email to cloudto...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cloudtools-dev/9fda787a-4bb1-43b6-8291-90df1fa252d9%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Chris Shenton

unread,
Jul 1, 2015, 7:09:55 PM7/1/15
to cloudto...@googlegroups.com, ch...@v-studios.com
Ug, that's gnarly. I'll try some direct CloudFormation and see if I can get it working, then abstract it back to Tropo.

Sure wish I could get details on what that error message means...

Thanks for the pointer!

Mike Weber

unread,
Jul 7, 2015, 2:28:53 AM7/7/15
to cloudto...@googlegroups.com
Hi Chris,

Did you ever work this one out? I've got the same issue.

Thanks,
Mike

Chris Shenton

unread,
Jul 10, 2015, 11:48:39 AM7/10/15
to cloudto...@googlegroups.com
I *thought* I did. I worked it out in pure CloudFormation: S3 -> SNS -> SQS. Then rewrote it in Tropo, got it working. Replaced hardwired S3 names with a Ref() and saw it fail with the same issue, so replaced the Ref() with an AWACS Arn from the bucket name, and got it working.  But when I folded this standalone work into my big project's Tropo, it failed with the "validate" error. I don't know why, so I'm gonna have to set the notification with some boto code. CF simply doesn't support the S3 -> SQS notification, and the validation error's killing us.

Here's my code that works in isolation:

#!/usr/bin/env python
# Create S3 bucket with Notification policy which
# sends to SNS Topic, delivers to SQS Queue

from awacs.aws import (
    # Action,
    Allow,
    ArnEquals,
    AWSPrincipal,
    Condition,
    Everybody,
    Policy,
    Principal,
    SourceArn,
    Statement,
)
import awacs.dynamodb
import awacs.iam
import awacs.s3
import awacs.sns
import awacs.sqs
import awacs.sts

from troposphere import (
#    Base64,
#    FindInMap,
    GetAtt,
    Join,
    Output,
#    Parameter,
    Ref,
#    Tags,
    Template,
#    autoscaling,
#    cloudwatch as cw,
#    dynamodb,
#    ec2,
#    elasticloadbalancing as elb,
#    iam,
#    route53,
    s3,
    sns,
    sqs,
)

###############################################################################
# TODO: we could read these from JSON/YAML config files

AWS_ACCOUNT = "[elided]"  # TODO: try ${aws:username}
AWS_REGION = "us-east-1"      # can't get from Policy Vars

ARN_SQS_PREFIX = "arn:aws:sqs:{}:{}:".format(AWS_REGION, AWS_ACCOUNT)

NAME = "cshenton-sns-sqs"
S3_NAME = NAME
SNS_NAME = NAME
SQS_NAME = NAME

###############################################################################

t = Template()
t.add_version("2010-09-09")
t.add_description("S3 notifies SNS which creates message in SQS")

###############################################################################

r_queue = t.add_resource(
    sqs.Queue(
        "Queue",
        QueueName=SQS_NAME,
    ))
t.add_output([
    Output("Queue", Value=Ref(r_queue)),  # returns a URL
    Output("QueueARN", Value=GetAtt(r_queue, "Arn"))
])

r_topic = t.add_resource(
    sns.Topic(
        "Topic",
        Subscription=[
            sns.Subscription(
                Endpoint="ch...@example.com",
                Protocol="email"),
            sns.Subscription(
                Endpoint=GetAtt(r_queue, "Arn"),
                Protocol="sqs")
        ],
    ))
t.add_output([
    Output("Topic",
           Value=GetAtt(r_topic, "TopicName")),
    Output("TopicARN",       # Ref() gets ARN here!
           Value=Ref(r_topic))
])

r_bucket = t.add_resource(
    s3.Bucket(
        "Bucket",
        AccessControl=s3.BucketOwnerFullControl,
        BucketName=S3_NAME,
        NotificationConfiguration=s3.NotificationConfiguration(
            TopicConfigurations=[
                s3.TopicConfiguration(
                    Event="s3:ObjectCreated:*",
                    Topic=Ref(r_topic)
                )
            ],
        ),
    ))
t.add_output([
    Output("Bucket", Value=Ref(r_bucket)),
    Output("BucketArn",
           Value=Join("", ["arn:aws:s3:::", Ref(r_bucket)])),
])

r_topic_policy = t.add_resource(
    sns.TopicPolicy(
        "TopicPolicy",
        PolicyDocument=Policy(
            Version="2008-10-17",
            Statement=[
                Statement(
                    Action=[awacs.sns.Publish],
                    Condition=Condition(
                        # CF can't get ARN from S3 so build it from name.
                        # BUG: Join("", ["arn:aws:s3:::", Ref(r_s3_bucket)])
                        # https://forums.aws.amazon.com/\
                        #         thread.jspa?threadID=167470
                        # CF create-stack can't use Ref(r_s3_bucket) as it's
                        # not stablized when SNS Policy needs it ("Unable to
                        # validate the ... destination ... <bucketname>")
                        # so hardcode the name in the ARN.
                        ArnEquals(
                            SourceArn,
                            awacs.s3.ARN(S3_NAME),
                        )
                    ),
                    Effect=Allow,
                    Principal=Principal("Service", "s3.amazonaws.com"),
                    Resource=[Ref(r_topic)],
                    Sid="Allow-S3-to-Send-Message-To-SNS-Topic",
                ),
            ],
        ),
        Topics=[Ref(r_topic)],
    )
)
t.add_output([
    Output("TopicPolicy", Value=Ref(r_topic_policy))
])

r_queue_policy = t.add_resource(
    sqs.QueuePolicy(
        "QueuePolicy",
        PolicyDocument=Policy(
            Version="2012-10-17",
            Statement=[
                Statement(
                    Action=[awacs.sqs.SendMessage],
                    Condition=Condition(
                        ArnEquals(
                            SourceArn, Ref(r_topic)
                        )
                    ),
                    Effect=Allow,
                    Principal=AWSPrincipal(Everybody),
                    Resource=[GetAtt(r_queue, "Arn")],
                    Sid="Allow-SNS-Topic-to-SendMessage-to-SQS-Queue",
                ),
            ],
        ),
        Queues=[Ref(r_queue)],
    )
)
t.add_output([
    Output("QueuePolicy", Value=Ref(r_queue_policy))
])


###############################################################################

def get_template_json():
    """Assemble and return JSON format template."""
    return t.to_json()


def main():
    """Entrypoint to use as command."""
    print(get_template_json())


if __name__ == "__main__":
    main()

Reply all
Reply to author
Forward
0 new messages