package main
import (
"bufio"
"compress/gzip"
"flag"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"sync"
)
// inputFilePrefix is the leading part of the file names that main globs for
// inside the input directory (pattern: <dir>/<prefix>*.log.gz).
const inputFilePrefix = "subscriber_db_"
// inputDir holds the path of the folder to analyse. It starts out empty and
// is populated from the -d command-line flag once flag.Parse has run.
var inputDir string
// QuarantineObject identifies a single object by its type and its id.
//
// Both fields are plain strings; main prints each collected entry as
// "<id> ---> <objectType>". The field order (objectType first, then id) is
// kept stable so positional composite literals continue to work.
type QuarantineObject struct {
	objectType, id string
}
// init registers the command-line flags of the program. The only flag is -d,
// a string (default empty) that is stored into the package-level inputDir.
func init() {
	const (
		flagName  = "d"
		flagUsage = "Path to the Input folder which is to be analysed"
	)
	flag.StringVar(&inputDir, flagName, "", flagUsage)
}
func main() {
var err error
alog.SetLogLevel(alog.TRACE)
flag.Parse()
// Validation of input parameters
if inputDir == "" {
fmt.Fprintf(os.Stderr, "No Input Directory Specified\n")
flag.Usage()
os.Exit(1)
}
// Is the input directory valid ?
if _, err := os.Stat(inputDir); os.IsNotExist(err) {
fmt.Fprintf(os.Stderr, "Input Directory %s is Invalid\n", inputDir)
flag.Usage()
os.Exit(1)
}
// Determine all subscriber files by matching on the subscriber files prefix
inputFileNames, err := filepath.Glob(fmt.Sprintf("%s/%s*.log.gz", inputDir, inputFilePrefix))
if err != nil {
fmt.Fprintf(os.Stderr, "Error listing files : %v\n", err)
os.Exit(1)
}
// Loop through all subscriber files
// Make a goroutine for processing each file
// Create a channel to receive the quarantined objects
qObjChannel := make(chan []QuarantineObject, len(inputFileNames))
//runtime.GOMAXPROCS(len(inputFileNames))
var wg sync.WaitGroup
for _, inputFileGz := range inputFileNames {
wg.Add(1)
go func(inputFileGz string) {
nRecords := 0
qObjList := make([]QuarantineObject, 0, 0)
defer wg.Done()
defer func() {
alog.Trace("Finished Processing File : %s. Total Records Analysed : %d\n", inputFileGz, nRecords)
}()
// Open the file as a GZIP stream
alog.Trace("==================================================================================================================================")
alog.Trace("Processing Input File : %s", inputFileGz)
alog.Trace("==================================================================================================================================")
f, err := os.Open(inputFileGz)
if err != nil {
fmt.Fprintf(os.Stderr, "Error opening file : %v\n", err)
return
}
defer f.Close()
fgz, err := gzip.NewReader(f)
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating GZIP reader : %v\n", err)
return
}
defer fgz.Close()
scanner := bufio.NewScanner(fgz)
// Iterate over all lines of the file and decode
for scanner.Scan() {
qObject := decodeLine()
if qObject.IsQuarantined() {
qObjList = append(qObjList, qObject)
}
}
///////////////////////////////////////////////////////
// After all lines have been processed, Send to Channel
///////////////////////////////////////////////////////
qObjChannel <- qObjList
}(inputFileGz)
}
fmt.Println("Waiting for processing of all files to finish")
///////////////////////////////////////////////////////
// Closer GoRoutine
///////////////////////////////////////////////////////
go func() {
wg.Wait()
close(qObjChannel)
fmt.Println("Quarantined Objects List")
fmt.Println("------------------------")
}()
qFound := false
for qObjList := range qObjChannel {
for _, qObj := range qObjList {
fmt.Println(qObj.id, "--->", qObj.objectType)
qFound = true