func (storage AWSStorage) List(output chan<- object) error {
prefixChan := make(chan object, cli.Workers*2)
listResultChan := make(chan error)
wg := sync.WaitGroup{}
listObjectsRecursive := func(prefixChan chan object, output chan<- object) {
listObjectsFn := func(p *s3.ListObjectsOutput, lastPage bool) bool {
for _, o := range p.CommonPrefixes {
log.Debugf("Prefix: %s", aws.StringValue(o.Prefix))
prefixChan <- object{Key: aws.StringValue(o.Prefix)}
}
for _, o := range p.Contents {
atomic.AddUint64(&totalObjCnt, 1)
output <- object{Key: aws.StringValue(o.Key), ETag: aws.StringValue(o.ETag), Mtime: aws.TimeValue(o.LastModified)}
}
return true // continue paging
}
for prefix := range prefixChan {
if err := storage.awsSvc.ListObjectsPages(&s3.ListObjectsInput{
Bucket: aws.String(storage.awsBucket),
Prefix: aws.String(prefix.Key),
MaxKeys: aws.Int64(s3keysPerReq),
Delimiter: aws.String("/"),
}, listObjectsFn); err != nil{
listResultChan <- err
}
}
wg.Done()
}
for i := cli.Workers; i != 0; i-- {
wg.Add(1)
go listObjectsRecursive(prefixChan, output)
}
go func() {
wg.Wait()
listResultChan <- nil
}()
// Start listing from storage.prefix
prefixChan <- object{Key: storage.prefix}
select {
case msg := <-listResultChan:
close(output)
return msg
}
}
--
Вы получили это сообщение, поскольку подписаны на группу "Golang Russian".
Чтобы отменить подписку на эту группу и больше не получать от нее сообщения, отправьте письмо на электронный адрес golang-ru+unsubscribe@googlegroups.com.
Чтобы настроить другие параметры, перейдите по ссылке https://groups.google.com/d/optout.
--
Вы получили это сообщение, поскольку подписаны на группу "Golang Russian".
Чтобы отменить подписку на эту группу и больше не получать от нее сообщения, отправьте письмо на электронный адрес golang-ru+...@googlegroups.com.
func (storage AWSStorage) List(output chan<- object) error {
prefixChan := make(chan string, cli.Workers*2)
listResultChan := make(chan error)
wg := sync.WaitGroup{}
listObjectsFn := func(p *s3.ListObjectsOutput, lastPage bool) bool {
for _, o := range p.CommonPrefixes {
wg.Add(1)
prefixChan <- aws.StringValue(o.Prefix)
}
for _, o := range p.Contents {
atomic.AddUint64(&totalObjCnt, 1)
output <- object{Key: aws.StringValue(o.Key), ETag: aws.StringValue(o.ETag), Mtime: aws.TimeValue(o.LastModified)}
}
return true // continue paging
}
listObjectsRecursive := func(prefixChan chan string, output chan<- object) {
for prefix := range prefixChan {
for i := uint(0); i <= cli.Retry; i++ {
err := storage.awsSvc.ListObjectsPages(&s3.ListObjectsInput{
Bucket: aws.String(storage.awsBucket),
Prefix: aws.String(prefix),
MaxKeys: aws.Int64(s3keysPerReq),
Delimiter: aws.String("/"),
}, listObjectsFn)
if (err != nil) && (i == cli.Retry) {
wg.Done()
listResultChan <- err
break
} else if err == nil {
wg.Done()
break
} else {
log.Debugf("S3 listing failed with error: %s", err)
}
}
}
}
for i := cli.Workers; i != 0; i-- {
go listObjectsRecursive(prefixChan, output)
}
go func() {
wg.Wait()
close(prefixChan)
listResultChan <- nil
}()
// Start listing from storage.prefix
wg.Add(1)
prefixChan <- storage.prefix
select {
case msg := <-listResultChan:
close(output)
return msg
}
}