Write data to HDFS

Sang Đặng

Apr 24, 2023, 12:24:02 AM
to divolte-collector
Dear Exhibitor,

I have a problem when writing data to HDFS. This is the error: 2023-04-24 11:13:48.757+07 [main] ERROR [HdfsFileManager]: Could not initialize HDFS filesystem or failed to check for existence of publish and / or working directories..
I set the config file as below:
//
// Copyright 2018 GoDataDriven B.V.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// This is the default configuration.
divolte {
  mappings {
    my_mapping = {
      schema_file = "../conf/ShopEventRecord.avsc"
      mapping_script_file = "../conf/mapping2.groovy"
      sources = [browser]
      sinks = [a_sink]
    }
  }
  global {
    server {
      // The host to which the server binds.
      // Set to a specific IP address to selectively listen on that interface.
      // If not present, a loopback-only address will be bound.
      host = 0.0.0.0
      // The bind host can be overridden using the DIVOLTE_HOST environment variable.
      //host = ${?DIVOLTE_HOST}

      // The port on which the server listens.
      port = 8290
      // Server port can be overridden using the DIVOLTE_PORT environment variable.
      port = ${?DIVOLTE_PORT}

      // Whether to use the X-Forwarded-For HTTP header
      // for determining the source IP of a request, if present.
      // When an X-Forwarded-For header contains multiple
      // comma-separated IP addresses, the rightmost address
      // is used as the source IP.
      // When the header is present more than once, the last
      // value will be used.
      // E.g.
      // "X-Forwarded-For: 10.200.13.28, 11.45.82.30" ==> 11.45.82.30
      //
      // "X-Forwarded-For: 10.200.13.28"
      // "X-Forwarded-For: 11.45.82.30" ==> 11.45.82.30
      use_x_forwarded_for = false

      // When true Divolte Collector serves a static test page at /.
      serve_static_resources = true

      // Whether requests (and their response) should be logged for debugging.
      // This is for testing purposes only; it should never be enabled in production.
      debug_requests = false

      // When a shutdown is signalled, the delay until the shutdown really starts.
      // During this period the /ping handler will already return 503 but other
      // requests will be handled normally.
      shutdown_delay = 0 ms

      // After a shutdown starts, requests that are already underway will be allowed to
      // complete. If they don't complete within this timeout the server will stop anyway.
      shutdown_timeout = 2 minutes
    }

    mapper {
      // Size of the buffer used by each mapper to hold the incoming
      // events that need to be mapped. This is rounded up to the
      // nearest power of two.
      buffer_size = 1048576

      // The number of threads each configured mapper should use to
      // process the events.
      threads = 1

      // The amount of memory that each mapper thread should use for
      // detecting duplicate events.
      duplicate_memory_size = 1000000

      // This section controls the user agent parsing settings. The user agent
      // parsing is based on this library (https://github.com/before/uadetector),
      // which allows for dynamic reloading of the backing database if an internet
      // connection is available.
      user_agent_parser {
        // The parser type. Possible values are:
        // - non_updating:         Uses a local database, bundled
        //                         with Divolte Collector.
        // - online_updating:      Uses an online database only, never falls back
        //                         to the local database.
        // - caching_and_updating: Uses a cached version of the online database
        //                         and periodically checks for new version at the
        //                         remote location. Updates are downloaded
        //                         automatically and cached locally.
        type = non_updating

        // User agent parsing is a relatively expensive operation that requires
        // many regular expression evaluations. Very often the same user agent
        // will make consecutive requests and many clients will have the exact
        // same user agent as well. It therefore makes sense to cache the
        // parsing results in memory and do a lookup before trying a parse.
        // This setting determines how many unique user agent strings will be
        // cached.
        cache_size = 1000
      }
    }

    hdfs {
      // If true, flushing to HDFS is enabled.
      enabled = true

      // Number of threads to use for flushing events to HDFS.
      // Each thread creates its own files on HDFS. Depending
      // on the flushing strategy, multiple concurrent files
      // could be kept open per thread.
      threads = 2

      // The maximum queue of mapped events to buffer before
      // starting to drop new ones. Note that when this buffer is full,
      // events are dropped and a warning is logged. No errors are reported
      // to the source of the events. A single buffer is shared between all
      // threads, and its size will be rounded up to the nearest power of 2.
      buffer_size = 1048576

      // Arbitrary HDFS client properties.
      // If absent, hdfs-site.xml from the classpath will be used.
      //client {}
    }

    gcs {
      // If true, flushing to Google Cloud Storage is enabled.
      enabled = false

      // Number of threads to use for flushing events to Google Cloud Storage. Each
      // thread creates its own files on Google Cloud Storage. Depending on the
      // flushing strategy, multiple concurrent files could be kept open per thread.
      threads = 1

      // The maximum queue of mapped events to buffer before
      // starting to drop new ones. Note that when this buffer is full,
      // events are dropped and a warning is logged. No errors are reported
      // to the source of the events. A single buffer is shared between all
      // threads, and its size will be rounded up to the nearest power of 2.
      buffer_size = 1048576
    }

    kafka {
      // If true, flushing to Kafka is enabled.
      enabled = true

      // Number of threads to use for flushing events to Kafka
      threads = 1

      // The maximum queue of mapped events to buffer before
      // starting to drop new ones. Note that when this buffer is full,
      // events are dropped and a warning is logged. No errors are reported
      // to the source of the events. A single buffer is shared between all
      // threads, and its size will be rounded up to the nearest power of 2.
      buffer_size = 1048576

      // All settings in here are used as-is to configure
      // the Kafka producer.
      // See: http://kafka.apache.org/082/documentation.html#newproducerconfigs
      producer = {
        bootstrap.servers = ["localhost:9092"]
        //bootstrap.servers = ${?DIVOLTE_KAFKA_BROKER_LIST}
        //client.id = 839b886f9b732b15
        client.id = ${?DIVOLTE_KAFKA_CLIENT_ID}

        acks = 1
        retries = 0
        //compression.type = lz4
        //max.in.flight.requests.per.connection = 1
      }
    }

    gcps {
      // If true, flushing to Google Cloud Pub/Sub is enabled.
      enabled = false

      // Number of threads to use for flushing events to Google Cloud Pub/Sub.
      threads = 2

      // The maximum queue of mapped events to buffer before
      // starting to drop new ones. Note that when this buffer is full,
      // events are dropped and a warning is logged. No errors are reported
      // to the source of the events. A single buffer is shared between all
      // threads, and its size will be rounded up to the nearest power of 2.
      buffer_size = 1048576

      // The project-id in which pub/sub topics to publish to can be found.
      // The default project-id is picked up from the application environment.
      #project-id =
    }

  }
  sinks {
    // The name of the sink. (It's referred to by the mapping.)
    kafka {
      type = kafka

      // This is the name of the topic that data will be produced on
      topic = dathocnetclick
    },
    a_sink {
      type = hdfs

      file_strategy.working_dir = /divolte/inflight
      file_strategy.publish_dir = /divolte/published
    }
  }

  // Sources, sinks and mappings are provided only if the user hasn't
  // specified anything. Due to the merging rules for configuration,
  // defaults are not present here: this is handled in code.
}
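
The error message suggests that the publish and/or working directories could not be checked. I assume they need to exist on HDFS before Divolte starts; creating them would look something like this (a sketch, assuming the hdfs CLI on the same machine is already configured to reach the cluster):

```shell
# Pre-create the sink directories referenced in file_strategy above.
# Assumes fs.defaultFS is set correctly in core-site.xml for the hdfs CLI.
hdfs dfs -mkdir -p /divolte/inflight /divolte/published

# Verify they exist and are writable by the user running Divolte.
hdfs dfs -ls /divolte
```

Is this enough, or does the hdfs sink also need an explicit client section (e.g. fs.defaultFS) when hdfs-site.xml is not on Divolte's classpath?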