[boto] r1335 committed - Incorporating patch from support@gimmesomecandy.com which fixes issue ...

2 views
Skip to first unread message

codesite...@google.com

unread,
Oct 25, 2009, 4:27:41 PM10/25/09
to boto-...@googlegroups.com
Revision: 1335
Author: Mitch.Garnaat
Date: Sun Oct 25 13:26:49 2009
Log: Incorporating patch from sup...@gimmesomecandy.com which fixes issue
285.
http://code.google.com/p/boto/source/detail?r=1335

Added:
/trunk/boto/tests/devpay_s3.py
Modified:
/trunk/boto/s3/bucket.py
/trunk/boto/s3/bucketlistresultset.py
/trunk/boto/s3/connection.py
/trunk/boto/s3/key.py
/trunk/boto/tests/test_s3connection.py

=======================================
--- /dev/null
+++ /trunk/boto/tests/devpay_s3.py Sun Oct 25 13:26:49 2009
@@ -0,0 +1,177 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+
+"""
+Some unit tests for the S3Connection
+"""
+
+import time
+import os
+import urllib
+
+from boto.s3.connection import S3Connection
+from boto.exception import S3PermissionsError
+
+# this test requires a devpay product and user token to run:
+
+AMAZON_USER_TOKEN = '{UserToken}...your token here...'
+DEVPAY_HEADERS = { 'x-amz-security-token': AMAZON_USER_TOKEN }
+
+print '--- running S3Connection tests (DevPay) ---'
+c = S3Connection()
+# create a new, empty bucket
+bucket_name = 'test-%d' % int(time.time())
+bucket = c.create_bucket(bucket_name, headers=DEVPAY_HEADERS)
+# now try a get_bucket call and see if it's really there
+bucket = c.get_bucket(bucket_name, headers=DEVPAY_HEADERS)
+# test logging
+logging_bucket = c.create_bucket(bucket_name + '-log', headers=DEVPAY_HEADERS)
+logging_bucket.set_as_logging_target(headers=DEVPAY_HEADERS)
+bucket.enable_logging(target_bucket=logging_bucket, target_prefix=bucket.name, headers=DEVPAY_HEADERS)
+bucket.disable_logging(headers=DEVPAY_HEADERS)
+c.delete_bucket(logging_bucket, headers=DEVPAY_HEADERS)
+# create a new key and store its content from a string
+k = bucket.new_key()
+k.name = 'foobar'
+s1 = 'This is a test of file upload and download'
+s2 = 'This is a second string to test file upload and download'
+k.set_contents_from_string(s1, headers=DEVPAY_HEADERS)
+fp = open('foobar', 'wb')
+# now get the contents from s3 to a local file
+k.get_contents_to_file(fp, headers=DEVPAY_HEADERS)
+fp.close()
+fp = open('foobar')
+# check to make sure content read from s3 is identical to original
+assert s1 == fp.read(), 'corrupted file'
+fp.close()
+# test generated URLs
+url = k.generate_url(3600, headers=DEVPAY_HEADERS)
+file = urllib.urlopen(url)
+assert s1 == file.read(), 'invalid URL %s' % url
+url = k.generate_url(3600, force_http=True, headers=DEVPAY_HEADERS)
+file = urllib.urlopen(url)
+assert s1 == file.read(), 'invalid URL %s' % url
+bucket.delete_key(k, headers=DEVPAY_HEADERS)
+# test a few variations on get_all_keys - first load some data
+# for the first one, let's override the content type
+phony_mimetype = 'application/x-boto-test'
+headers = {'Content-Type': phony_mimetype}
+headers.update(DEVPAY_HEADERS)
+k.name = 'foo/bar'
+k.set_contents_from_string(s1, headers)
+k.name = 'foo/bas'
+k.set_contents_from_filename('foobar', headers=DEVPAY_HEADERS)
+k.name = 'foo/bat'
+k.set_contents_from_string(s1, headers=DEVPAY_HEADERS)
+k.name = 'fie/bar'
+k.set_contents_from_string(s1, headers=DEVPAY_HEADERS)
+k.name = 'fie/bas'
+k.set_contents_from_string(s1, headers=DEVPAY_HEADERS)
+k.name = 'fie/bat'
+k.set_contents_from_string(s1, headers=DEVPAY_HEADERS)
+# try resetting the contents to another value
+md5 = k.md5
+k.set_contents_from_string(s2, headers=DEVPAY_HEADERS)
+assert k.md5 != md5
+os.unlink('foobar')
+all = bucket.get_all_keys(headers=DEVPAY_HEADERS)
+assert len(all) == 6
+rs = bucket.get_all_keys(prefix='foo', headers=DEVPAY_HEADERS)
+assert len(rs) == 3
+rs = bucket.get_all_keys(prefix='', delimiter='/', headers=DEVPAY_HEADERS)
+assert len(rs) == 2
+rs = bucket.get_all_keys(maxkeys=5, headers=DEVPAY_HEADERS)
+assert len(rs) == 5
+# test the lookup method
+k = bucket.lookup('foo/bar', headers=DEVPAY_HEADERS)
+assert isinstance(k, bucket.key_class)
+assert k.content_type == phony_mimetype
+k = bucket.lookup('notthere', headers=DEVPAY_HEADERS)
+assert k == None
+# try some metadata stuff
+k = bucket.new_key()
+k.name = 'has_metadata'
+mdkey1 = 'meta1'
+mdval1 = 'This is the first metadata value'
+k.set_metadata(mdkey1, mdval1)
+mdkey2 = 'meta2'
+mdval2 = 'This is the second metadata value'
+k.set_metadata(mdkey2, mdval2)
+k.set_contents_from_string(s1, headers=DEVPAY_HEADERS)
+k = bucket.lookup('has_metadata', headers=DEVPAY_HEADERS)
+assert k.get_metadata(mdkey1) == mdval1
+assert k.get_metadata(mdkey2) == mdval2
+k = bucket.new_key()
+k.name = 'has_metadata'
+k.get_contents_as_string(headers=DEVPAY_HEADERS)
+assert k.get_metadata(mdkey1) == mdval1
+assert k.get_metadata(mdkey2) == mdval2
+bucket.delete_key(k, headers=DEVPAY_HEADERS)
+# test list and iterator
+rs1 = bucket.list(headers=DEVPAY_HEADERS)
+num_iter = 0
+for r in rs1:
+ num_iter = num_iter + 1
+rs = bucket.get_all_keys(headers=DEVPAY_HEADERS)
+num_keys = len(rs)
+assert num_iter == num_keys
+# try a key with a funny character
+k = bucket.new_key()
+k.name = 'testnewline\n'
+k.set_contents_from_string('This is a test', headers=DEVPAY_HEADERS)
+rs = bucket.get_all_keys(headers=DEVPAY_HEADERS)
+assert len(rs) == num_keys + 1
+bucket.delete_key(k, headers=DEVPAY_HEADERS)
+rs = bucket.get_all_keys(headers=DEVPAY_HEADERS)
+assert len(rs) == num_keys
+# try some acl stuff
+bucket.set_acl('public-read', headers=DEVPAY_HEADERS)
+policy = bucket.get_acl(headers=DEVPAY_HEADERS)
+assert len(policy.acl.grants) == 2
+bucket.set_acl('private', headers=DEVPAY_HEADERS)
+policy = bucket.get_acl(headers=DEVPAY_HEADERS)
+assert len(policy.acl.grants) == 1
+k = bucket.lookup('foo/bar', headers=DEVPAY_HEADERS)
+k.set_acl('public-read', headers=DEVPAY_HEADERS)
+policy = k.get_acl(headers=DEVPAY_HEADERS)
+assert len(policy.acl.grants) == 2
+k.set_acl('private', headers=DEVPAY_HEADERS)
+policy = k.get_acl(headers=DEVPAY_HEADERS)
+assert len(policy.acl.grants) == 1
+# try the convenience methods for grants
+# this doesn't work with devpay
+#bucket.add_user_grant('FULL_CONTROL',
+# 'c1e724fbfa0979a4448393c59a8c055011f739b6d102fb37a65f26414653cd67',
+# headers=DEVPAY_HEADERS)
+try:
+ bucket.add_email_grant('foobar', 'f...@bar.com', headers=DEVPAY_HEADERS)
+except S3PermissionsError:
+ pass
+# now delete all keys in bucket
+for k in all:
+ bucket.delete_key(k, headers=DEVPAY_HEADERS)
+# now delete bucket
+
+c.delete_bucket(bucket, headers=DEVPAY_HEADERS)
+
+print '--- tests completed ---'
=======================================
--- /trunk/boto/s3/bucket.py Tue Oct 6 05:52:53 2009
+++ /trunk/boto/s3/bucket.py Sun Oct 25 13:26:49 2009
@@ -94,7 +94,7 @@
"""
self.key_class = key_class

- def lookup(self, key_name):
+ def lookup(self, key_name, headers=None):
"""
Deprecated: Please use get_key method.

@@ -104,9 +104,9 @@
:rtype: :class:`boto.s3.key.Key`
:returns: A Key object from this bucket.
"""
- return self.get_key(key_name)
-
- def get_key(self, key_name):
+ return self.get_key(key_name, headers=headers)
+
+ def get_key(self, key_name, headers=None):
"""
Check to see if a particular key exists within the bucket. This
method uses a HEAD request to check for the existence of the key.
@@ -118,7 +118,7 @@
:rtype: :class:`boto.s3.key.Key`
:returns: A Key object from this bucket.
"""
- response = self.connection.make_request('HEAD', self.name, key_name)
+ response = self.connection.make_request('HEAD', self.name, key_name, headers=headers)
if response.status == 200:
body = response.read()
k = self.key_class(self)
@@ -137,7 +137,7 @@
else:
raise S3ResponseError(response.status, response.reason, '')

- def list(self, prefix='', delimiter='', marker=''):
+ def list(self, prefix='', delimiter='', marker='', headers=None):
"""
List key objects within a bucket. This returns an instance of an
BucketListResultSet that automatically handles all of the result
@@ -164,7 +164,7 @@
:rtype: :class:`boto.s3.bucketlistresultset.BucketListResultSet`
:return: an instance of a BucketListResultSet that handles paging,
etc
"""
- return BucketListResultSet(self, prefix, delimiter, marker)
+ return BucketListResultSet(self, prefix, delimiter, marker, headers)

def get_all_keys(self, headers=None, **params):
"""
@@ -229,14 +229,14 @@
return self.connection.generate_url(expires_in, method, self.name,
headers=headers,
force_http=force_http)

- def delete_key(self, key_name):
+ def delete_key(self, key_name, headers=None):
"""
Deletes a key from the bucket.

:type key_name: string
:param key_name: The key name to delete
"""
- response = self.connection.make_request('DELETE', self.name, key_name)
+ response = self.connection.make_request('DELETE', self.name, key_name, headers=headers)
body = response.read()
if response.status != 204:
raise S3ResponseError(response.status, response.reason, body)
@@ -284,38 +284,44 @@
else:
raise S3ResponseError(response.status, response.reason, body)

- def set_canned_acl(self, acl_str, key_name=''):
+ def set_canned_acl(self, acl_str, key_name='', headers=None):
assert acl_str in CannedACLStrings
+
+ if headers:
+ headers['x-amz-acl'] = acl_str
+ else:
+ headers={'x-amz-acl': acl_str}
+
response = self.connection.make_request('PUT', self.name, key_name,
- headers={'x-amz-acl': acl_str}, query_args='acl')
+ headers=headers, query_args='acl')
body = response.read()
if response.status != 200:
raise S3ResponseError(response.status, response.reason, body)

- def get_xml_acl(self, key_name=''):
+ def get_xml_acl(self, key_name='', headers=None):
response = self.connection.make_request('GET', self.name, key_name,
- query_args='acl')
+ query_args='acl', headers=headers)
body = response.read()
if response.status != 200:
raise S3ResponseError(response.status, response.reason, body)
return body

- def set_xml_acl(self, acl_str, key_name=''):
+ def set_xml_acl(self, acl_str, key_name='', headers=None):
response = self.connection.make_request('PUT', self.name, key_name,
- data=acl_str, query_args='acl')
+ data=acl_str, query_args='acl', headers=headers)
body = response.read()
if response.status != 200:
raise S3ResponseError(response.status, response.reason, body)

- def set_acl(self, acl_or_str, key_name=''):
+ def set_acl(self, acl_or_str, key_name='', headers=None):
if isinstance(acl_or_str, Policy):
- self.set_xml_acl(acl_or_str.to_xml(), key_name)
+ self.set_xml_acl(acl_or_str.to_xml(), key_name, headers=headers)
else:
- self.set_canned_acl(acl_or_str, key_name)
-
- def get_acl(self, key_name=''):
+ self.set_canned_acl(acl_or_str, key_name, headers=headers)
+
+ def get_acl(self, key_name='', headers=None):
response = self.connection.make_request('GET', self.name, key_name,
- query_args='acl')
+ query_args='acl', headers=headers)
body = response.read()
if response.status == 200:
policy = Policy(self)
@@ -325,13 +331,13 @@
else:
raise S3ResponseError(response.status, response.reason, body)

- def make_public(self, recursive=False):
- self.set_canned_acl('public-read')
+ def make_public(self, recursive=False, headers=None):
+ self.set_canned_acl('public-read', headers=headers)
if recursive:
for key in self:
- self.set_canned_acl('public-read', key.name)
-
- def add_email_grant(self, permission, email_address, recursive=False):
+ self.set_canned_acl('public-read', key.name, headers=headers)
+
+ def add_email_grant(self, permission, email_address, recursive=False, headers=None):
"""
Convenience method that provides a quick way to add an email grant
to a bucket.
This method retrieves the current ACL, creates a new grant based
on the parameters
@@ -354,14 +360,14 @@
"""
if permission not in S3Permissions:
raise S3PermissionsError('Unknown Permission: %s' % permission)
- policy = self.get_acl()
- policy.acl.add_email_grant(permission, email_address)
- self.set_acl(policy)
+ policy = self.get_acl(headers=headers)
+ policy.acl.add_email_grant(permission, email_address, headers=headers)
+ self.set_acl(policy, headers=headers)
if recursive:
for key in self:
- key.add_email_grant(permission, email_address)
-
- def add_user_grant(self, permission, user_id, recursive=False):
+ key.add_email_grant(permission, email_address, headers=headers)
+
+ def add_user_grant(self, permission, user_id, recursive=False, headers=None):
"""
Convenience method that provides a quick way to add a canonical
user grant to a bucket.
This method retrieves the current ACL, creates a new grant based
on the parameters
@@ -386,15 +392,15 @@
"""
if permission not in S3Permissions:
raise S3PermissionsError('Unknown Permission: %s' % permission)
- policy = self.get_acl()
+ policy = self.get_acl(headers=headers)
policy.acl.add_user_grant(permission, user_id)
- self.set_acl(policy)
+ self.set_acl(policy, headers=headers)
if recursive:
for key in self:
- key.add_user_grant(permission, user_id)
-
- def list_grants(self):
- policy = self.get_acl()
+ key.add_user_grant(permission, user_id, headers=headers)
+
+ def list_grants(self, headers=None):
+ policy = self.get_acl(headers=headers)
return policy.acl.grants

def get_location(self):
@@ -416,73 +422,73 @@
else:
raise S3ResponseError(response.status, response.reason, body)

- def enable_logging(self, target_bucket, target_prefix=''):
+ def enable_logging(self, target_bucket, target_prefix='', headers=None):
if isinstance(target_bucket, Bucket):
target_bucket = target_bucket.name
body = self.BucketLoggingBody % (target_bucket, target_prefix)
response = self.connection.make_request('PUT', self.name,
data=body,
- query_args='logging')
+ query_args='logging', headers=headers)
body = response.read()
if response.status == 200:
return True
else:
raise S3ResponseError(response.status, response.reason, body)

- def disable_logging(self):
+ def disable_logging(self, headers=None):
body = self.EmptyBucketLoggingBody
response = self.connection.make_request('PUT', self.name,
data=body,
- query_args='logging')
+ query_args='logging', headers=headers)
body = response.read()
if response.status == 200:
return True
else:
raise S3ResponseError(response.status, response.reason, body)

- def get_logging_status(self):
+ def get_logging_status(self, headers=None):
response = self.connection.make_request('GET', self.name,
- query_args='logging')
+ query_args='logging', headers=headers)
body = response.read()
if response.status == 200:
return body
else:
raise S3ResponseError(response.status, response.reason, body)

- def set_as_logging_target(self):
- policy = self.get_acl()
+ def set_as_logging_target(self, headers=None):
+ policy = self.get_acl(headers=headers)
g1 = Grant(permission='WRITE', type='Group', uri=self.LoggingGroup)
g2 = Grant(permission='READ_ACP', type='Group', uri=self.LoggingGroup)
policy.acl.add_grant(g1)
policy.acl.add_grant(g2)
- self.set_acl(policy)
-
- def disable_logging(self):
+ self.set_acl(policy, headers=headers)
+
+ def disable_logging(self, headers=None):
body = self.EmptyBucketLoggingBody
response = self.connection.make_request('PUT', self.name,
data=body,
- query_args='logging')
+ query_args='logging', headers=headers)
body = response.read()
if response.status == 200:
return True
else:
raise S3ResponseError(response.status, response.reason, body)

- def get_request_payment(self):
+ def get_request_payment(self, headers=None):
response = self.connection.make_request('GET', self.name,
- query_args='requestPayment')
+ query_args='requestPayment', headers=headers)
body = response.read()
if response.status == 200:
return body
else:
raise S3ResponseError(response.status, response.reason, body)

- def set_request_payment(self, payer='BucketOwner'):
+ def set_request_payment(self, payer='BucketOwner', headers=None):
body = self.BucketPaymentBody % payer
response = self.connection.make_request('PUT', self.name,
data=body,
- query_args='requestPayment')
+ query_args='requestPayment', headers=headers)
body = response.read()
if response.status == 200:
return True
else:
raise S3ResponseError(response.status, response.reason, body)

- def delete(self):
- return self.connection.delete_bucket(self.name)
+ def delete(self, headers=None):
+ return self.connection.delete_bucket(self.name, headers=headers)
=======================================
--- /trunk/boto/s3/bucketlistresultset.py Thu Sep 11 10:38:22 2008
+++ /trunk/boto/s3/bucketlistresultset.py Sun Oct 25 13:26:49 2009
@@ -19,7 +19,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.

-def bucket_lister(bucket, prefix='', delimiter='', marker=''):
+def bucket_lister(bucket, prefix='', delimiter='', marker='', headers=None):
"""
A generator function for listing keys in a bucket.
"""
@@ -27,7 +27,7 @@
k = None
while more_results:
rs = bucket.get_all_keys(prefix=prefix, marker=marker,
- delimiter=delimiter)
+ delimiter=delimiter, headers=headers)
for k in rs:
yield k
if k:
@@ -43,14 +43,15 @@
keys in a reasonably efficient manner.
"""

- def __init__(self, bucket=None, prefix='', delimiter='', marker=''):
+ def __init__(self, bucket=None, prefix='', delimiter='', marker='', headers=None):
self.bucket = bucket
self.prefix = prefix
self.delimiter = delimiter
self.marker = marker
+ self.headers = headers

def __iter__(self):
return bucket_lister(self.bucket, prefix=self.prefix,
- delimiter=self.delimiter, marker=self.marker)
+ delimiter=self.delimiter, marker=self.marker, headers=self.headers)


=======================================
--- /trunk/boto/s3/connection.py Sun Sep 20 18:05:11 2009
+++ /trunk/boto/s3/connection.py Sun Oct 25 13:26:49 2009
@@ -232,6 +232,8 @@
if query_auth:
query_part = '?' + self.QueryString % (encoded_canonical,
expires,
self.aws_access_key_id)
+ if 'x-amz-security-token' in headers:
+ query_part += '&x-amz-security-token=%s' % urllib.quote(headers['x-amz-security-token']);
else:
query_part = ''
if force_http:
@@ -243,17 +245,17 @@
return self.calling_format.build_url_base(protocol,
self.server_name(port),
bucket, key) + query_part

- def get_all_buckets(self):
+ def get_all_buckets(self, headers=None):
response = self.make_request('GET')
body = response.read()
if response.status > 300:
- raise S3ResponseError(response.status, response.reason, body)
- raise S3ResponseError(response.status, response.reason, body)
+ raise S3ResponseError(response.status, response.reason, body, headers=headers)
rs = ResultSet([('Bucket', Bucket)])
h = handler.XmlHandler(rs, self)
xml.sax.parseString(body, h)
return rs

- def get_canonical_user_id(self):
+ def get_canonical_user_id(self, headers=None):
"""
Convenience method that returns the "CanonicalUserID" of the user
whose credentials
are associated with the connection. The only way to get this
value is to do a GET
@@ -264,18 +266,18 @@
:rtype: string
:return: A string containing the canonical user id.
"""
- rs = self.get_all_buckets()
+ rs = self.get_all_buckets(headers=headers)
return rs.ID

- def get_bucket(self, bucket_name, validate=True):
+ def get_bucket(self, bucket_name, validate=True, headers=None):
bucket = Bucket(self, bucket_name)
if validate:
- rs = bucket.get_all_keys(None, maxkeys=0)
+ rs = bucket.get_all_keys(headers, maxkeys=0)
return bucket

- def lookup(self, bucket_name, validate=True):
+ def lookup(self, bucket_name, validate=True, headers=None):
try:
- bucket = self.get_bucket(bucket_name, validate)
- bucket = self.get_bucket(bucket_name, validate)
+ bucket = self.get_bucket(bucket_name, validate, headers=headers)
except:
bucket = None
return bucket
@@ -318,8 +320,8 @@
else:
raise S3ResponseError(response.status, response.reason, body)

- def delete_bucket(self, bucket):
- response = self.make_request('DELETE', bucket)
+ def delete_bucket(self, bucket, headers=None):
+ response = self.make_request('DELETE', bucket, headers=headers)
body = response.read()
if response.status != 204:
raise S3ResponseError(response.status, response.reason, body)
=======================================
--- /trunk/boto/s3/key.py Sun Sep 20 18:05:11 2009
+++ /trunk/boto/s3/key.py Sun Oct 25 13:26:49 2009
@@ -233,25 +233,30 @@
self.metadata.update(d)

# convenience methods for setting/getting ACL
- def set_acl(self, acl_str):
+ def set_acl(self, acl_str, headers=None):
if self.bucket != None:
- self.bucket.set_acl(acl_str, self.name)
-
- def get_acl(self):
+ self.bucket.set_acl(acl_str, self.name, headers=headers)
+
+ def get_acl(self, headers=None):
if self.bucket != None:
- return self.bucket.get_acl(self.name)
-
- def get_xml_acl(self):
+ return self.bucket.get_acl(self.name, headers=headers)
+
+ def get_xml_acl(self, headers=None):
if self.bucket != None:
- return self.bucket.get_xml_acl(self.name)
-
- def set_xml_acl(self, acl_str):
+ return self.bucket.get_xml_acl(self.name, headers=headers)
+
+ def set_xml_acl(self, acl_str, headers=None):
if self.bucket != None:
- return self.bucket.set_xml_acl(acl_str, self.name)
-
- def make_public(self):
+ return self.bucket.set_xml_acl(acl_str, self.name,
headers=headers)
+
+ def make_public(self, headers=None):
+ if headers:
+ headers['x-amz-acl'] = 'public-read'
+ else:
+ headers={'x-amz-acl': 'public-read'}
+
response = self.bucket.connection.make_request('PUT',
self.bucket.name, self.name,
- headers={'x-amz-acl': 'public-read'}, query_args='acl')
+ headers=headers, query_args='acl')
body = response.read()
if response.status != 200:
raise S3ResponseError(response.status, response.reason, body)
=======================================
--- /trunk/boto/tests/test_s3connection.py Sun Dec 9 07:48:41 2007
+++ /trunk/boto/tests/test_s3connection.py Sun Oct 25 13:26:49 2009
@@ -28,6 +28,7 @@
import unittest
import time
import os
+import urllib
from boto.s3.connection import S3Connection
from boto.exception import S3PermissionsError

@@ -41,7 +42,12 @@
bucket = c.create_bucket(bucket_name)
# now try a get_bucket call and see if it's really there
bucket = c.get_bucket(bucket_name)
- # create a new key and store it's content from a string
+ # test logging
+ logging_bucket = c.create_bucket(bucket_name + '-log')
+ logging_bucket.set_as_logging_target()
+ bucket.enable_logging(target_bucket=logging_bucket, target_prefix=bucket.name)
+ bucket.disable_logging()
+ c.delete_bucket(logging_bucket)
k = bucket.new_key()
k.name = 'foobar'
s1 = 'This is a test of file upload and download'
@@ -55,6 +61,13 @@
# check to make sure content read from s3 is identical to original
assert s1 == fp.read(), 'corrupted file'
fp.close()
+ # test generated URLs
+ url = k.generate_url(3600)
+ file = urllib.urlopen(url)
+ assert s1 == file.read(), 'invalid URL %s' % url
+ url = k.generate_url(3600, force_http=True)
+ file = urllib.urlopen(url)
+ assert s1 == file.read(), 'invalid URL %s' % url
bucket.delete_key(k)
# test a few variations on get_all_keys - first load some data
# for the first one, let's override the content type
@@ -110,9 +123,15 @@
assert k.get_metadata(mdkey1) == mdval1
assert k.get_metadata(mdkey2) == mdval2
bucket.delete_key(k)
- # try a key with a funny character
+ # test list and iterator
+ rs1 = bucket.list()
+ num_iter = 0
+ for r in rs1:
+ num_iter = num_iter + 1
rs = bucket.get_all_keys()
num_keys = len(rs)
+ assert num_iter == num_keys
+ # try a key with a funny character
k = bucket.new_key()
k.name = 'testnewline\n'
k.set_contents_from_string('This is a test')

Reply all
Reply to author
Forward
0 new messages