TD-agent doesn't recognise a custom plugin

588 views
Skip to first unread message

Vahid Junuzovic

unread,
Mar 22, 2021, 2:04:56 PM3/22/21
to Fluentd Google Group
I've installed td-agent-4.1.0 on RedHat 8.2 box and I'm trying to use custom plugin that is working fine with td-agent-2.3.5 on RedHat 7.x.

The custom plugin Ruby file is in /etc/td-agent/plugin directory, it manages java log output.

When I enable it in /etc/td-agent/td-agent.conf file I'm getting the following error and td-agent is not starting.

2021-03-22 18:25:14 +0100 [error]: fluent/log.rb:371:error: config error file="/etc/td-agent/td-agent.conf" error_class=Fluent::ConfigError error="Unknown input plugin 'taileaiesales'. Run 'gem search -rd fluent-plugin' to find plugins"

PluginCustom plugin: -rwxr-xr-x 1 td-agent td-agent 11375 Mar 22 11:40 /etc/td-agent/plugin/tailEaiSales.rb

/etc/td-agent/td-agent.conf:
<source>
   @type taileaiesales
   path /tmp/tescoll.log
   pos_file /var/log/td-agent/test.pos
   time_key time
   delimiter "|"
   tag mongo.test.testcoll
</source>


<match mongo.test.testcoll>
  @type mongo

  database db
  collection collection

  user user
  password password

  capped
  capped_size 1000000000

  host xxx.xxx.xxx.xxx
  port 27017

  <buffer>
    flush_interval 10s
  </buffer>
  # make sure to include the time key
  <inject>
    time_key time
  </inject>
</match>


What is wrong? Someone could help me ?

Thanks a lot

Kentaro Hayashi

unread,
Mar 23, 2021, 12:54:18 AM3/23/21
to Fluentd Google Group

Hi,

As td-agent 4.1.0 bundles fluentd 1.11.x, so you might need to support your custom plugin for fluentd v1.11.x.


Regards,

2021年3月23日火曜日 3:04:56 UTC+9 vjunu...@gmail.com:

Mr. Fiber

unread,
Mar 23, 2021, 1:25:21 AM3/23/21
to Fluentd Google Group
The problem seems to use wrong file name.
Fluentd plugins have the prefix for each plugin type.


Older fluentd doesn't follow this rule and it has unexpected plugin loading trouble.

--
You received this message because you are subscribed to the Google Groups "Fluentd Google Group" group.
To unsubscribe from this group and stop receiving emails from it, send an email to fluentd+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/fluentd/e10b6297-504e-4269-b77d-5f50fb14c8dbn%40googlegroups.com.

Vahid Junuzovic

unread,
Mar 23, 2021, 7:40:50 AM3/23/21
to Fluentd Google Group
Hi all,
I've already tried to rename it to in_ but result didn't changed.
I tried, I'm really new/beginner  on fluentd/ruby :( , to implement suggestions to upgrade the current plugin but it still give me the same error.
Below attached content of the script if somebody could had a look, lines added or modified comparing the original one by me:
1 require 'fluent/plugin/input' (new)
2 module Fluent::Plugin (previous: module Fluent)
3 Fluent::Plugin.register_input('taileaiesales', self) previous(Plugin.register_input('taileaiesales', self))
4 helpers :compat_parameters (new)
5 compat_parameters_convert(conf, :buffer, :inject, default_chunk_key: 'time') (new)
6      #return Time.now.to_i, h (old commented)
    return Fluent::EventTime.now, h (new)

--- cut here ---
require 'fluent/plugin/input'
require 'socket'
module Fluent::Plugin
class TailEaiSales < Input
  Fluent::Plugin.register_input('taileaiesales', self)
  helpers :compat_parameters

  def initialize
    super
    @paths = []
  end

  config_param :path, :string
  config_param :tag, :string
  config_param :rotate_wait, :time, :default => 5
  config_param :pos_file, :string, :default => nil

  attr_reader :paths

  def configure(conf)
    compat_parameters_convert(conf, :buffer, :inject, default_chunk_key: 'time')
    super

    @paths = @path.split(',').map {|path| path.strip }
    if @paths.empty?
      raise ConfigError, "tail: 'path' parameter is required on tail input"
    end

    if @pos_file
      @pf_file = File.open(@pos_file, File::RDWR|File::CREAT)
      @pf_file.sync = true
      @pf = PositionFile.parse(@pf_file)
    else
      $log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
      $log.warn "this parameter is highly recommended to save the position to resume tailing."
    end

    configure_parser(conf)
  end




 # Override 'configure_parser(conf)' method that initializes the parser.
def configure_parser(conf)
@delim = conf['delimiter']
@time_key = conf['time_key']
end



  def start
    @loop = Coolio::Loop.new
    @tails = @paths.map {|path|
      pe = @pf ? @pf[path] : MemoryPositionEntry.new
      TailWatcher.new(path, @rotate_wait, pe, &method(:receive_lines))
    }
    @tails.each {|tail|
      tail.attach(@loop)
    }
    @thread = Thread.new(&method(:run))
  end

  def shutdown
    @tails.each {|tail|
      tail.close
    }
    @loop.stop
    @thread.join
    @pf_file.close if @pf_file
  end

  def run
    @loop.run
  rescue
    $log.error "unexpected error", :error=>$!.to_s
    $log.error_backtrace
  end

  def receive_lines(lines)
    es = MultiEventStream.new
    lines.each {|line|
      begin
        line.chomp!  # remove \n
        time, record = parse_line(line)
        if time && record
          es.add(time, record)
        end
      rescue
        $log.warn line.dump, :error=>$!.to_s
        $log.debug_backtrace
      end
    }

    unless es.empty?
      begin
        Engine.emit_stream(@tag, es)
      rescue
        # ignore errors. Engine shows logs and backtraces.
      end
    end
  end

  def parse_line(line)
    a= line.split("|")
    h={
      "Market"=>a[4],
      "Dealer"=>a[5],
      "Method"=>a[6],
      "Request"=>a[7],
      "Response"=>a[8],
      "Exception"=>a[9],
      "ErrorCode"=>a[10],
      "Time_call"=>a[0]+" -- "+a[1],
      "Host"=>Socket.gethostname
    }



    #return Time.now.to_i, h
    return Fluent::EventTime.now, h

  end

  class TailWatcher
    def initialize(path, rotate_wait, pe, &receive_lines)
      @path = path
      @rotate_wait = rotate_wait
      @pe = pe || MemoryPositionEntry.new
      @receive_lines = receive_lines

      @rotate_queue = []

      @timer_trigger = TimerWatcher.new(1, true, &method(:on_notify))
      @stat_trigger = StatWatcher.new(path, &method(:on_notify))

      @rotate_handler = RotateHandler.new(path, &method(:on_rotate))
      @io_handler = nil
    end

    def attach(loop)
      @timer_trigger.attach(loop)
      @stat_trigger.attach(loop)
      on_notify
    end

    def detach
      @timer_trigger.detach if @timer_trigger.attached?
      @stat_trigger.detach if @stat_trigger.attached?
    end

    def close
      @rotate_queue.reject! {|req|
        req.io.close
        true
      }
      detach
    end

    def on_notify
      @rotate_handler.on_notify
      return unless @io_handler
      @io_handler.on_notify

      # proceeds rotate queue
      return if @rotate_queue.empty?
      @rotate_queue.first.tick

      while @rotate_queue.first.ready?
        if io = @rotate_queue.first.io
          stat = io.stat
          inode = stat.ino
          if inode == @pe.read_inode
            # rotated file has the same inode number with the last file.
            # assuming following situation:
            #   a) file was once renamed and backed, or
            #   b) symlink or hardlink to the same file is recreated
            # in either case, seek to the saved position
            pos = @pe.read_pos
          else
            pos = io.pos
          end
          @pe.update(inode, pos)
          io_handler = IOHandler.new(io, @pe, &@receive_lines)
        else
          io_handler = NullIOHandler.new
        end
        @io_handler.close
        @io_handler = io_handler
        @rotate_queue.shift
        break if @rotate_queue.empty?
      end
    end

    def on_rotate(io)
      if @io_handler == nil
        if io
          # first time
          stat = io.stat
          fsize = stat.size
          inode = stat.ino
          if inode == @pe.read_inode
            # seek to the saved position
            pos = @pe.read_pos
          else
            # seek to the end of the file.
            # logs never duplicate but may be lost if fluentd is down.
            pos = fsize
            @pe.update(inode, pos)
          end
          io.seek(pos)

          @io_handler = IOHandler.new(io, @pe, &@receive_lines)
        else
          @io_handler = NullIOHandler.new
        end

      else
        if io && @rotate_queue.find {|req| req.io == io }
          return
        end
        last_io = @rotate_queue.empty? ? @io_handler.io : @rotate_queue.last.io
        if last_io == nil
          $log.info "detected rotation of #{@path}"
          # rotate imeediately if previous file is nil
          wait = 0
        else
          $log.info "detected rotation of #{@path}; waiting #{@rotate_wait} seconds"
          wait = @rotate_wait
          wait -= @rotate_queue.first.wait unless @rotate_queue.empty?
        end
        @rotate_queue << RotationRequest.new(io, wait)
      end
    end

    class TimerWatcher < Coolio::TimerWatcher
      def initialize(interval, repeat, &callback)
        @callback = callback
        super(interval, repeat)
      end

      def on_timer
        @callback.call
      rescue
        # TODO log?
        $log.error $!.to_s
        $log.error_backtrace
      end
    end

    class StatWatcher < Coolio::StatWatcher
      def initialize(path, &callback)
        @callback = callback
        super(path)
      end

      def on_change(prev, cur)
        @callback.call
      rescue
        # TODO log?
        $log.error $!.to_s
        $log.error_backtrace
      end
    end

    class RotationRequest
      def initialize(io, wait)
        @io = io
        @wait = wait
      end

      attr_reader :io

      def tick
        @wait -= 1
      end

      def ready?
        @wait <= 0
      end
    end

    MAX_LINES_AT_ONCE = 1000

    class IOHandler
      def initialize(io, pe, &receive_lines)
        $log.info "following tail of #{io.path}"
        @io = io
        @pe = pe
        @receive_lines = receive_lines
        @buffer = ''.force_encoding('ASCII-8BIT')
        @iobuf = ''.force_encoding('ASCII-8BIT')
      end

      attr_reader :io

      def on_notify
        begin
          lines = []
          read_more = false

          begin
            while true
              if @buffer.empty?
                @io.read_nonblock(2048, @buffer)
              else
                @buffer << @io.read_nonblock(2048, @iobuf)
              end
#              while line = @buffer.slice!(/.*?^2\n/m)
              #while line = @buffer.slice!(/^2([0-9][0-9][0-9])[-]([0-9][0-9])[-]([0-9][0-9]).*/)
              while line = @buffer.slice!(/(^\d{4}(([^\|]*)?\|){11})/)
                lines << line
              end
              if lines.size >= MAX_LINES_AT_ONCE
                # not to use too much memory in case the file is very large
                read_more = true
                break
              end
            end
          rescue EOFError
          end

          unless lines.empty?
            @receive_lines.call(lines)
            @pe.update_pos(@io.pos - @buffer.bytesize)
          end

        end while read_more

      rescue
        $log.error $!.to_s
        $log.error_backtrace
        close
      end

      def close
        @io.close unless @io.closed?
      end
    end

    class NullIOHandler
      def initialize
      end

      def io
      end

      def on_notify
      end

      def close
      end
    end

    class RotateHandler
      def initialize(path, &on_rotate)
        @path = path
        @inode = nil
        @fsize = -1  # first
        @on_rotate = on_rotate
        @path = path
      end

      def on_notify
        begin
          io = File.open(@path)
          stat = io.stat
          inode = stat.ino
          fsize = stat.size
        rescue Errno::ENOENT
          # moved or deleted
          inode = nil
          fsize = 0
        end

        begin
          if @inode != inode || fsize < @fsize
            # rotated or truncated
            @on_rotate.call(io)
            io = nil
          end

          @inode = inode
          @fsize = fsize
        ensure
          io.close if io
        end

      rescue
        $log.error $!.to_s
        $log.error_backtrace
      end
    end
  end


  class PositionFile
    def initialize(file, map, last_pos)
      @file = file
      @map = map
      @last_pos = last_pos
    end

    def [](path)
      if m = @map[path]
        return m
      end

      @file.pos = @last_pos
      @file.write path
      @file.write "\t"
      seek = @file.pos
      @file.write "0000000000000000\t00000000\n"
      @last_pos = @file.pos

      @map[path] = FilePositionEntry.new(@file, seek)
    end

    def self.parse(file)
      map = {}
      file.pos = 0
      file.each_line {|line|
        m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line)
        next unless m
        path = m[1]
        pos = m[2].to_i(16)
        ino = m[3].to_i(16)
        seek = file.pos - line.bytesize + path.bytesize + 1
        map[path] = FilePositionEntry.new(file, seek)
      }
      new(file, map, file.pos)
    end
  end

  # pos               inode
  # ffffffffffffffff\tffffffff\n
  class FilePositionEntry
    POS_SIZE = 16
    INO_OFFSET = 17
    INO_SIZE = 8
    LN_OFFSET = 25
    SIZE = 26

    def initialize(file, seek)
      @file = file
      @seek = seek
    end

    def update(ino, pos)
      @file.pos = @seek
      @file.write "%016x\t%08x" % [pos, ino]
      @inode = ino
    end

    def update_pos(pos)
      @file.pos = @seek
      @file.write "%016x" % pos
    end

    def read_inode
      @file.pos = @seek + INO_OFFSET
      @file.read(8).to_i(16)
    end

    def read_pos
      @file.pos = @seek
      @file.read(16).to_i(16)
    end
  end

  class MemoryPositionEntry
    def initialize
      @pos = 0
      @inode = 0
    end

    def update(ino, pos)
      @inode = ino
      @pos = pos
    end

    def update_pos(pos)
      @pos = pos
    end

    def read_pos
      @pos
    end

    def read_inode
      @inode
    end
  end
end


end

--- cut here ---

Thanks a lot
Reply all
Reply to author
Forward
Message has been deleted
0 new messages