Закрытие канала в рекурсивной функции

33 views
Skip to first unread message

Владимир Буянов

unread,
Apr 2, 2018, 6:14:56 PM4/2/18
to Golang Russian
Здравствуйте. 
Есть небольшая проблема, никак не соображу как ее правильно решить.
Есть вот такой кусок кода. Запускается некоторое кол-во гороутин с воркерами, которые рекурсивно обходят директории в S3. При этом они читают директории из канала prefixChan и пушат в него же найденные директории.
Работает хорошо, за исключением того, что цикл чтения из канала prefixChan никогда не заканчивается. Какой тру-вей для подобных кейсов, как правильно определить момент, когда все воркеры закончили обходить директории?
Спасибо.

func (storage AWSStorage) List(output chan<- object) error {
   prefixChan
:= make(chan object, cli.Workers*2)
   listResultChan
:= make(chan error)
   wg
:= sync.WaitGroup{}

   listObjectsRecursive
:= func(prefixChan chan object, output chan<- object) {
      listObjectsFn
:= func(p *s3.ListObjectsOutput, lastPage bool) bool {
         
for _, o := range p.CommonPrefixes {
            log
.Debugf("Prefix: %s", aws.StringValue(o.Prefix))
            prefixChan
<- object{Key: aws.StringValue(o.Prefix)}
         
}
         
for _, o := range p.Contents {
            atomic
.AddUint64(&totalObjCnt, 1)
            output
<- object{Key: aws.StringValue(o.Key), ETag: aws.StringValue(o.ETag), Mtime: aws.TimeValue(o.LastModified)}


         }
         
return true // continue paging
      }

     
for prefix := range prefixChan {
         
if err := storage.awsSvc.ListObjectsPages(&s3.ListObjectsInput{
           
Bucket:    aws.String(storage.awsBucket),
            Prefix:    aws.String(prefix.Key),
            MaxKeys:   aws.Int64(s3keysPerReq),
            Delimiter: aws.String("/"),
         }, listObjectsFn); err != nil{
            listResultChan
<- err
         
}
     
}
      wg
.Done()
   
}

   
for i := cli.Workers; i != 0; i-- {
      wg
.Add(1)
     
go listObjectsRecursive(prefixChan, output)
   
}

   
go func() {
      wg
.Wait()
      listResultChan
<- nil
   
}()

   
// Start listing from storage.prefix
   prefixChan <- object{Key: storage.prefix}

   
select {
   
case msg := <-listResultChan:
      close
(output)
     
return msg
   
}
}



Aln Kapa

unread,
Apr 3, 2018, 2:12:24 AM4/3/18
to gola...@googlegroups.com
Закройте канал как только он вам больше не нужен.


3 апреля 2018 г., 0:55 пользователь Владимир Буянов <kafa...@gmail.com> написал:

--
Вы получили это сообщение, поскольку подписаны на группу "Golang Russian".
Чтобы отменить подписку на эту группу и больше не получать от нее сообщения, отправьте письмо на электронный адрес golang-ru+unsubscribe@googlegroups.com.
Чтобы настроить другие параметры, перейдите по ссылке https://groups.google.com/d/optout.

Alex Lurye

unread,
Apr 3, 2018, 3:10:09 AM4/3/18
to gola...@googlegroups.com
У вас в коде нет единой точки, где можно было бы принять решение, когда останавливаться. 

Я бы переделал так: есть основная программа и есть пул рабочих горутин. Коммуникация с горутинами идёт через два канала - в один пишутся задания, а в другой горутины выдают ответы - ровно один ответ на один запрос. Ответ может быть или ошибкой, или списком поддиректории.

Основная программа ведёт счётчик, сколько сейчас директорий в работе. Она отдает первую директорию в канал и читает ответы из канала. Если ответ содержит поддиректории, то она создаёт новые задания. Как только счётчик обнулится, значит все - обход закончен и можно выходить.

--
Вы получили это сообщение, поскольку подписаны на группу "Golang Russian".
Чтобы отменить подписку на эту группу и больше не получать от нее сообщения, отправьте письмо на электронный адрес golang-ru+...@googlegroups.com.

Владимир Буянов

unread,
Apr 3, 2018, 3:21:27 AM4/3/18
to Golang Russian
Спасибо. Примерно так я и сделал. Осталось только сделать так, что бы все воркеры останавливались при получении сообщения об ошибке от одного из них.

func (storage AWSStorage) List(output chan<- object) error {

   prefixChan
:= make(chan string, cli.Workers*2)

   listResultChan
:= make(chan error)
   wg
:= sync.WaitGroup{}


   listObjectsFn
:= func(p *s3.ListObjectsOutput, lastPage bool) bool {

     
for _, o := range p.CommonPrefixes {

         wg
.Add(1)
         prefixChan
<- aws.StringValue(o.Prefix)

     
}
     
for _, o := range p.Contents {
         atomic
.AddUint64(&totalObjCnt, 1)
         output
<- object{Key: aws.StringValue(o.Key), ETag: aws.StringValue(o.ETag), Mtime: aws.TimeValue(o.LastModified)}
     
}
     
return true // continue paging
   }


   listObjectsRecursive
:= func(prefixChan chan string, output chan<- object) {

     
for prefix := range prefixChan {

         
for i := uint(0); i <= cli.Retry; i++ {

            err
:= storage.awsSvc.ListObjectsPages(&s3.ListObjectsInput{
               
Bucket:    aws.String(storage.awsBucket),
               Prefix:    aws.String(prefix),
               MaxKeys:   aws.Int64(s3keysPerReq),
               Delimiter: aws.String("/"),
            }, listObjectsFn)

           
if (err != nil) && (i == cli.Retry) {
               wg
.Done()
               listResultChan
<- err
               
break
            } else if err == nil {
               wg
.Done()
               
break
            } else {
               log
.Debugf("S3 listing failed with error: %s", err)

           
}
         
}
     
}
   
}

   
for i := cli.Workers; i != 0; i-- {

     
go listObjectsRecursive(prefixChan, output)
   
}

   
go func() {
      wg
.Wait()

      close
(prefixChan)

      listResultChan
<- nil
   
}()

   
// Start listing from storage.prefix
   wg.Add(1)
   prefixChan
<- storage.prefix

   
select {

   
case msg := <-listResultChan:
      close
(output)
     
return msg
   
}
}



вторник, 3 апреля 2018 г., 9:10:09 UTC+2 пользователь Alex Lurye написал:
Reply all
Reply to author
Forward
0 new messages