
Kafka Loader

When fbsql is invoked with the --loader-kafka=filename flag, it runs as a Kafka consumer in non-interactive mode. Based on the configuration provided in filename, fbsql reads messages from a Kafka topic and submits them to FeatureBase via BULK INSERT statements. In this mode, fbsql processes messages until it is terminated by the user.
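For example, assuming the configuration described below is saved to a file named kafka.toml (the filename is arbitrary), the loader could be started with:

fbsql --loader-kafka=kafka.toml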

Kafka-Specific Configuration Options

Messages from Kafka must be in JSON format.

General

The table below lists the key/value pairs supported at the top level of the TOML configuration file when connecting to Kafka:

| Key | Description | Example Value | Default |
|-----|-------------|---------------|---------|
| hosts | Specifies one or more Kafka broker hosts. If only one host is needed, this value can be provided without brackets. | ["host1", "host2"] | |
| group | Kafka consumer group. See the Confluent documentation for more information. | "groupname" | |
| topics | Specifies one or more Kafka topics to consume from. If only one topic is needed, this value can be provided without brackets. | ["topic1", "topic2"] | |
| batch-max-staleness | Maximum length of time that the oldest record in a batch can exist before the batch is flushed. Note that this can potentially stack with timeouts waiting for the source. Provide the value as a duration string: a possibly signed sequence of decimal numbers, each with an optional fraction and a unit suffix, such as "300ms", "-1.5h" or "2h45m". Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". Set this value to 0 to disable timeouts. | 2h | 1s |
| timeout | Time to wait for more records from Kafka before flushing a batch. Provide the value as a duration string (see batch-max-staleness for the format). Set this value to 0 to disable timeouts. | 1m | 1s |
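As a minimal sketch (the broker address, group, and topic names below are placeholders), brackets can be omitted when only a single host or topic is given, and the time-based options take duration strings:

hosts = "broker1:9092"
group = "my-group"
topics = "my-topic"
batch-max-staleness = "30s"
timeout = "10s"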

Fields

The table below lists the key/value pairs supported in the TOML fields array when connecting to Kafka:

| Key | Description | Example Value | Default |
|-----|-------------|---------------|---------|
| source-path | If the data for a particular FeatureBase column needs to be extracted from a nested JSON object, that can be specified using source-path. Each additional element in the array represents a nested key. If source-path is not provided, it defaults to the value provided in name. | ["key", "sub-key"] | value of name |
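As a brief, hypothetical illustration (the address/city names are not taken from the example below), a field read from a nested JSON object is declared with source-path, while a field whose JSON key matches its name can omit it:

[[fields]]
name        = "city"
source-type = "string"
source-path = ["address", "city"]

The categories and level fields in the example below show the same pattern against a real message.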

Example

FeatureBase CREATE TABLE statement:

create table events (
    _id id,
    name string,
    qty int,
    categories stringset,
    level decimal(2),
    ts timestamp
)

Sample Kafka Message:

{
    "event_id":30,
    "name":"name-30",
    "qty":4,
    "demo":{
        "categories":["A", "C"],
        "level":24.56
    },
    "ts":"2023-02-27T12:26:25.091284-06:00"
}

fbsql configuration file pointed to by --loader-kafka:

hosts = ["localhost:9092"]
group = "grp"
topics = "events"
table = "events"
batch-size = 300
batch-max-staleness = "5s"
timeout = "5s"

[[fields]]
name        = "event_id"
source-type = "id"
primary-key = true

[[fields]]
name        = "name"
source-type = "string"

[[fields]]
name        = "qty"
source-type = "int"

[[fields]]
name        = "categories"
source-type = "stringset"
source-path = ["demo", "categories"]

[[fields]]
name        = "level"
source-type = "decimal(2)"
source-path = ["demo", "level"]

[[fields]]
name        = "ts"
source-type = "timestamp"
