I have a problem when writing data to HDFS. This is the error: 2023-04-24 11:13:48.757+07 [main] ERROR [HdfsFileManager]: Could not initialize HDFS filesystem or failed to check for existence of publish and / or working directories..
//
// Copyright 2018 GoDataDriven B.V.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This is the default configuration.
divolte {
mappings {
my_mapping = {
schema_file = "../conf/ShopEventRecord.avsc"
mapping_script_file = "../conf/mapping2.groovy"
sources = [browser]
sinks = [a_sink]
}
}
global {
server {
// The host to which the server binds.
// Set to a specific IP address to selectively listen on that interface.
// If not present, a loopback-only address will be bound.
host = 0.0.0.0
// The bind host can be overridden using the DIVOLTE_HOST environment variable.
//host = ${?DIVOLTE_HOST}
// The port on which the server listens.
port = 8290
// Server port can be overridden using the DIVOLTE_PORT environment variable.
port = ${?DIVOLTE_PORT}
// Whether to use the X-Forwarded-For HTTP header
// for determining the source IP of a request if present.
// When a X-Forwarded-For header is present, the rightmost
// IP address of the value is used as source IP when
// multiple IP addresses are separated by a comma.
// When the header is present more than once, the last
// value will be used.
// E.g.
// "X-Forwarded-For: 10.200.13.28, 11.45.82.30" ==> 11.45.82.30
//
// "X-Forwarded-For: 10.200.13.28"
// "X-Forwarded-For: 11.45.82.30" ==> 11.45.82.30
use_x_forwarded_for = false
// When true Divolte Collector serves a static test page at /.
serve_static_resources = true
// Whether requests (and their response) should be logged for debugging.
// This is for testing purposes only; it should never be enabled in production.
debug_requests = false
// When a shutdown is signalled, the delay until the shutdown really starts.
// During this period the /ping handler will already return 503 but other
// requests will be handled normally.
shutdown_delay = 0 ms
// After a shutdown starts, requests that are already underway will be allowed to
// complete. If they don't complete within this timeout the server will stop anyway.
shutdown_timeout = 2 minutes
}
mapper {
// Size of the buffer used by each mapper to hold the incoming
// events that need to be mapped. This is rounded up to the
// nearest power of two.
buffer_size = 1048576
// The number of threads each configured mapper should use to
// process the events.
threads = 1
// The amount of memory that each mapper thread should use for
// detecting duplicate events.
duplicate_memory_size = 1000000
// This section controls the user agent parsing settings. The user agent
// parsing is based on this library (https://github.com/before/uadetector),
// which allows for dynamic reloading of the backing database if an internet
// connection is available.
user_agent_parser {
// The parser type. Possible values are:
// - non_updating: Uses a local database, bundled
// with Divolte Collector.
// - online_updating: Uses an online database only, never falls back
// to the local database.
// - caching_and_updating: Uses a cached version of the online database
// and periodically checks for new version at the
// remote location. Updates are downloaded
// automatically and cached locally.
type = non_updating
// User agent parsing is a relatively expensive operation that requires
// many regular expression evaluations. Very often the same user agent
// will make consecutive requests and many clients will have the exact
// same user agent as well. It therefore makes sense to cache the
// parsing results in memory and do a lookup before trying a parse.
// This setting determines how many unique user agent strings will be
// cached.
cache_size = 1000
}
}
hdfs {
// If true, flushing to HDFS is enabled.
enabled = true
// Number of threads to use for flushing events to HDFS.
// Each thread creates its own files on HDFS. Depending
// on the flushing strategy, multiple concurrent files
// could be kept open per thread.
threads = 2
// The maximum queue of mapped events to buffer before
// starting to drop new ones. Note that when this buffer is full,
// events are dropped and a warning is logged. No errors are reported
// to the source of the events. A single buffer is shared between all
// threads, and its size will be rounded up to the nearest power of 2.
buffer_size = 1048576
// Arbitrary HDFS client properties.
// If absent, hdfs-site.xml from the classpath will be used.
//client {}
}
gcs {
// If true, flushing to Google Cloud Storage is enabled.
enabled = false
// Number of threads to use for flushing events to Google Cloud Storage. Each
// thread creates its own files on Google Cloud Storage. Depending on the
// flushing strategy, multiple concurrent files could be kept open per thread.
threads = 1
// The maximum queue of mapped events to buffer before
// starting to drop new ones. Note that when this buffer is full,
// events are dropped and a warning is logged. No errors are reported
// to the source of the events. A single buffer is shared between all
// threads, and its size will be rounded up to the nearest power of 2.
buffer_size = 1048576
}
kafka {
// If true, flushing to Kafka is enabled.
enabled = true
// Number of threads to use for flushing events to Kafka
threads = 1
// The maximum queue of mapped events to buffer before
// starting to drop new ones. Note that when this buffer is full,
// events are dropped and a warning is logged. No errors are reported
// to the source of the events. A single buffer is shared between all
// threads, and its size will be rounded up to the nearest power of 2.
buffer_size = 1048576
// All settings in here are used as-is to configure
// the Kafka producer.
// See: http://kafka.apache.org/082/documentation.html#newproducerconfigs
producer = {
bootstrap.servers = ["localhost:9092"]
//bootstrap.servers = ${?DIVOLTE_KAFKA_BROKER_LIST}
//
client.id = 839b886f9b732b15
client.id = ${?DIVOLTE_KAFKA_CLIENT_ID}
acks = 1
retries = 0
//compression.type = lz4
//max.in.flight.requests.per.connection = 1
}
}
gcps {
// If true, flushing to Google Cloud Pub/Sub is enabled.
enabled = false
// Number of threads to use for flushing events to Google Cloud Pub/Sub
threads = 2
// The maximum queue of mapped events to buffer before
// starting to drop new ones. Note that when this buffer is full,
// events are dropped and a warning is logged. No errors are reported
// to the source of the events. A single buffer is shared between all
// threads, and its size will be rounded up to the nearest power of 2.
buffer_size = 1048576
// The project-id in which pub/sub topics to publish to can be found.
// The default project-id is picked up from the application environment.
#project-id =
}
}
sinks {
// The name of the sink. (It's referred to by the mapping.)
kafka {
type = kafka
// This is the name of the topic that data will be produced on
topic = dathocnetclick
},
a_sink {
type = hdfs
file_strategy.working_dir = /divolte/inflight
file_strategy.publish_dir = /divolte/published
}
}
// Sources, sinks and mappings are provided only if the user hasn't
// specified anything. Due to the merging rules for configuration,
// defaults are not present here: this is handled in code.
}