Go scheduler tracing

envee

Jun 15, 2020, 9:29:06 AM
to golang-nuts
I am running a program that reads multiple gzipped input files and performs some processing on each line of each file.
It creates 8 goroutines, one per input file to be processed (there are at most 8 such files).
Each goroutine sends to a buffered channel after it finishes processing its file.
After creating the goroutines, the program waits (using a WaitGroup) for all of them to finish and also drains the channel of all the values they sent.
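For reference, the pattern described above (one goroutine per input, a WaitGroup, and a buffered channel that a separate goroutine closes once all workers are done) can be sketched generically as below. This is a minimal illustration, not the poster's program: processAll and the single-letter inputs are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// processAll runs process on every input in its own goroutine and collects
// the results. The channel is buffered to len(inputs), so no worker blocks
// on send; a separate "closer" goroutine closes it once all workers finish.
func processAll(inputs []string, process func(string) []string) []string {
	results := make(chan []string, len(inputs))
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in string) {
			defer wg.Done()
			results <- process(in)
		}(in)
	}

	// Closer goroutine: wait for every worker, then close the channel.
	go func() {
		wg.Wait()
		close(results)
	}()

	var all []string
	for r := range results { // ends when the closer goroutine closes results
		all = append(all, r...)
	}
	sort.Strings(all) // arrival order is nondeterministic, so sort for stable output
	return all
}

func main() {
	files := []string{"a", "b", "c"} // hypothetical stand-ins for file names
	out := processAll(files, func(name string) []string {
		return []string{strings.ToUpper(name)}
	})
	fmt.Println(out) // [A B C]
}
```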

I have a 4-core CPU with 2 threads per core, i.e. 8 logical cores.

But I set GOMAXPROCS=4.

When I run the program with the scheduler trace interval set to 1000ms (GODEBUG=schedtrace=1000), I see the following:

SCHED 1001ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=0 runqueue=0 [0 0 0 1]
SCHED 2008ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=1 runqueue=0 [1 0 5 0]
SCHED 3015ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=1 runqueue=1 [0 0 1 0]
SCHED 4022ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0 idlethreads=2 runqueue=0 [0 0 0 0]
SCHED 5029ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0 idlethreads=2 runqueue=1 [0 0 0 4]
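In each SCHED line, runqueue= is the length of the global run queue and the bracketed list holds the length of each P's local run queue, one entry per P (so four entries with gomaxprocs=4). These count goroutines waiting to run, not the ones currently running. A small helper for pulling those numbers out of a trace line, assuming the exact format shown above (parseSched is a hypothetical name):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseSched extracts the global run queue length ("runqueue=N") and the
// per-P local run queue lengths (the bracketed list) from one schedtrace line.
func parseSched(line string) (global int, local []int, err error) {
	lb := strings.LastIndex(line, "[")
	rb := strings.LastIndex(line, "]")
	if lb < 0 || rb < lb {
		return 0, nil, fmt.Errorf("no local run queue list in %q", line)
	}
	for _, f := range strings.Fields(line[:lb]) {
		if strings.HasPrefix(f, "runqueue=") {
			if global, err = strconv.Atoi(strings.TrimPrefix(f, "runqueue=")); err != nil {
				return 0, nil, err
			}
		}
	}
	for _, f := range strings.Fields(line[lb+1 : rb]) {
		n, err := strconv.Atoi(f)
		if err != nil {
			return 0, nil, err
		}
		local = append(local, n)
	}
	return global, local, nil
}

func main() {
	line := "SCHED 2008ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=1 runqueue=0 [1 0 5 0]"
	global, local, err := parseSched(line)
	if err != nil {
		panic(err)
	}
	queued := global // goroutines waiting in any queue (not running)
	for _, n := range local {
		queued += n
	}
	fmt.Println("global:", global, "local:", local, "queued (not running):", queued)
	// prints: global: 0 local: [1 0 5 0] queued (not running): 6
}
```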


If I create 8 goroutines, shouldn't they all be distributed equally among the 4 processors (GOMAXPROCS=4)?

Why do the local run queues of some processors show values of 4 or 5 while others show 0?

I was hoping to see something like the following, which, as I understand it, would mean that each of the 4 processors has 1 goroutine waiting in its local run queue while another goroutine runs on its assigned OS thread:

SCHED 1001ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=0 runqueue=0 [1 1 1 1]

Thanks.

envee

Jun 17, 2020, 5:06:06 PM
to golang-nuts
Hi, is anyone able to help me here?
Here is a (simplified) snippet of the code, in case it helps answer my query. I create a goroutine for every input file (assume at most 8) and then wait for processing of all files to finish. Each goroutine processes its file line by line, and any records that match certain criteria are appended to a slice. After all lines in a file have been processed, the slice is sent to a channel. Finally, in the closer goroutine, I wait for all goroutines to finish and close the channel once they have:

package main

import (
	"bufio"
	"compress/gzip"
	"flag"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"strings"
	"sync"
)

const (
	inputFilePrefix = "subscriber_db_"
)

var (
	inputDir string
)

type QuarantineObject struct {
	objectType  string
	id          string
	quarantined bool // hypothetical field, set by the placeholder decodeLine below
}

// IsQuarantined reports whether a decoded record matched the criteria.
// Placeholder: the real method was not shown in the post.
func (q QuarantineObject) IsQuarantined() bool {
	return q.quarantined
}

// decodeLine is a hypothetical stand-in for the real decoder, which was not
// shown. It assumes "objectType id status" lines purely for illustration.
func decodeLine(line string) QuarantineObject {
	f := strings.Fields(line)
	if len(f) < 3 {
		return QuarantineObject{}
	}
	return QuarantineObject{objectType: f[0], id: f[1], quarantined: f[2] == "QUARANTINED"}
}

func init() {
	flag.StringVar(&inputDir, "d", "", "Path to the Input folder which is to be analysed")
}

func main() {
	// The original used a logging package named alog (not shown) at TRACE
	// level; the standard "log" package stands in for it here.
	flag.Parse()

	// Validation of input parameters
	if inputDir == "" {
		fmt.Fprintf(os.Stderr, "No Input Directory Specified\n")
		flag.Usage()
		os.Exit(1)
	}

	// Is the input directory valid?
	if _, err := os.Stat(inputDir); os.IsNotExist(err) {
		fmt.Fprintf(os.Stderr, "Input Directory %s is Invalid\n", inputDir)
		flag.Usage()
		os.Exit(1)
	}

	// Determine all subscriber files by matching on the subscriber file prefix
	inputFileNames, err := filepath.Glob(filepath.Join(inputDir, inputFilePrefix+"*.log.gz"))
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error listing files : %v\n", err)
		os.Exit(1)
	}

	// Channel to receive the quarantined objects: one buffered slot per file,
	// so no worker ever blocks on send.
	qObjChannel := make(chan []QuarantineObject, len(inputFileNames))

	// runtime.GOMAXPROCS(len(inputFileNames))
	var wg sync.WaitGroup
	for _, inputFileGz := range inputFileNames {
		wg.Add(1)
		// One goroutine per input file
		go func(inputFileGz string) {
			defer wg.Done()
			nRecords := 0
			var qObjList []QuarantineObject
			defer func() {
				log.Printf("Finished Processing File : %s. Total Records Analysed : %d", inputFileGz, nRecords)
			}()
			log.Printf("Processing Input File : %s", inputFileGz)

			f, err := os.Open(inputFileGz)
			if err != nil {
				fmt.Fprintf(os.Stderr, "Error opening file : %v\n", err)
				return
			}
			defer f.Close()

			// Open the file as a GZIP stream
			fgz, err := gzip.NewReader(f)
			if err != nil {
				fmt.Fprintf(os.Stderr, "Error creating GZIP reader : %v\n", err)
				return
			}
			defer fgz.Close()

			// Iterate over all lines of the file and decode each one
			scanner := bufio.NewScanner(fgz)
			for scanner.Scan() {
				nRecords++
				qObject := decodeLine(scanner.Text())
				if qObject.IsQuarantined() {
					qObjList = append(qObjList, qObject)
				}
			}
			if err := scanner.Err(); err != nil {
				fmt.Fprintf(os.Stderr, "Error reading %s : %v\n", inputFileGz, err)
			}

			// After all lines have been processed, send the list to the channel
			qObjChannel <- qObjList
		}(inputFileGz)
	}

	fmt.Println("Waiting for processing of all files to finish")

	// Closer goroutine: close the channel once every worker has finished,
	// which ends the range loop below.
	go func() {
		wg.Wait()
		close(qObjChannel)
	}()

	// Printed here rather than in the closer goroutine, where it would race
	// with the loop below and could appear after the results.
	fmt.Println("Quarantined Objects List")
	fmt.Println("------------------------")

	qFound := false
	for qObjList := range qObjChannel {
		for _, qObj := range qObjList {
			fmt.Println(qObj.id, "--->", qObj.objectType)
			qFound = true
		}
	}
	// The original post was truncated at this point; this ending is assumed.
	if !qFound {
		fmt.Println("No quarantined objects found")
	}
}

Robert Engels

Jun 17, 2020, 5:24:40 PM
to envee, golang-nuts
What is the question?
--
You received this message because you are subscribed to the Google Groups "golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email to golang-nuts...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/golang-nuts/36efa087-d66c-4d7e-b5b2-de1d4d3ea339o%40googlegroups.com.

envee

Jun 17, 2020, 7:40:19 PM
to golang-nuts
Hi Robert, it is in my first post in this thread. Basically, I want to know why my program is not using all of my logical processors. Thanks.

Ian Lance Taylor

Jun 17, 2020, 8:02:25 PM
to envee, golang-nuts
On Wed, Jun 17, 2020 at 4:40 PM envee <neeraj....@gmail.com> wrote:
>
> Hi Robert, It is in my first post in this thread. Basically, I want to know why all my logical processors are not being used in my program. Thanks.

New goroutines are added to the run queue for the P that creates them.
When a P has nothing to do, it will steal goroutines from the run
queues of other P's. The run queue length doesn't necessarily indicate
whether P's are running them; it just tells you something about
which P's are creating new goroutines.

Ian
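Ian's description can be illustrated with a toy model. This is not the runtime's implementation: proc and stealHalf are hypothetical, victims are chosen in a fixed order for reproducibility (the real scheduler picks them in a randomized order), and only the rule "new goroutines go on the creating P's queue; a P with nothing to do steals half of another P's queue" is modeled.

```go
package main

import "fmt"

// proc is a toy P: just a local queue of runnable goroutine names.
type proc struct {
	queue []string
}

// stealHalf moves half (rounded up) of victim's queued goroutines to thief.
func stealHalf(thief, victim *proc) {
	n := len(victim.queue) - len(victim.queue)/2
	thief.queue = append(thief.queue, victim.queue[:n]...)
	victim.queue = victim.queue[n:]
}

// lengths returns the local queue length of each P, like the trace's [a b c d].
func lengths(ps []*proc) []int {
	out := make([]int, len(ps))
	for i, p := range ps {
		out[i] = len(p.queue)
	}
	return out
}

func main() {
	ps := []*proc{{}, {}, {}, {}} // 4 Ps, as with GOMAXPROCS=4

	// main, running on P0, starts 8 goroutines: all land on P0's queue.
	for i := 0; i < 8; i++ {
		ps[0].queue = append(ps[0].queue, fmt.Sprintf("g%d", i))
	}
	fmt.Println("after creation:", lengths(ps)) // [8 0 0 0]

	// Each idle P steals from the first P that has queued work.
	for _, thief := range ps[1:] {
		for _, victim := range ps {
			if victim != thief && len(victim.queue) > 0 {
				stealHalf(thief, victim)
				break
			}
		}
	}
	fmt.Println("after stealing:", lengths(ps)) // [1 4 2 1]

	// Each P takes one goroutine to run; the bracketed trace numbers would
	// count only what is still waiting.
	for _, p := range ps {
		if len(p.queue) > 0 {
			p.queue = p.queue[1:]
		}
	}
	fmt.Println("waiting while 4 run:", lengths(ps)) // [0 3 1 0]
}
```

Even in this toy, all four Ps end up running a goroutine while the waiting counts stay uneven, which matches Ian's point that the bracketed numbers alone do not show which Ps are busy.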