package main
import (
"flag"
"github.com/Shopify/sarama"
"log"
"os"
"fmt"
"strings"
"github.com/hpcloud/tail"
"github.com/spf13/viper"
//"io/ioutil"
//"reflect"
)
//func produce(producer sarama.SyncProducer, cfg *sarama.Config, brokers *string, topic string, logger *log.Logger, log string, t *tail.Tail){

// produce forwards the lines delivered on t.Lines to the Kafka topic named by
// topic.
//
// It builds a sarama SyncProducer from cfg and the comma-separated broker list
// in *brokers, then blocks, ranging over t.Lines until that channel is closed.
// The Text of each line becomes the value of one message (no key is set). A
// failed send is logged and the loop moves on to the next line.
//
// Lines whose Err field is non-nil are logged and skipped instead of being
// published, so that notices generated by the tail package itself do not end
// up in the topic as if they were file content.
//
// If the producer cannot be created, or cannot be closed when the function
// returns, the error is reported with logger.Fatalln, which exits the process.
//
// The log parameter is not read by this function. Note that it shadows the
// log package inside the body.
func produce(cfg *sarama.Config, brokers *string, topic string, logger *log.Logger, log string, t *tail.Tail) {
	logger.Println("Entering produce")

	logger.Printf("sarama.NewSyncProducer")
	producer, err := sarama.NewSyncProducer(strings.Split(*brokers, ","), cfg)
	if err != nil {
		logger.Fatalln(err)
	}
	defer func() {
		if err := producer.Close(); err != nil {
			logger.Fatalln(err)
		}
	}()

	for line := range t.Lines {
		// A non-nil Err marks a line that reports a tail-side problem; its
		// Text is not log content and must not be published.
		if line.Err != nil {
			logger.Printf("tail error, line skipped: %s\n", line.Err)
			continue
		}

		logger.Printf("ProduceMessage")
		msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(line.Text)}
		// Partition and offset of the stored message are not needed here.
		if _, _, err := producer.SendMessage(msg); err != nil {
			logger.Printf("FAILED to send message: %s\n", err)
		}
	}
}