How do I add data to FeatureBase tables using Kafka?
These instructions apply to Kafka schemas managed by Confluent Schema Management.
FeatureBase recommends using Confluent Schema Management because it makes it easier to set up the following Kafka dependencies in a local environment:
- Schema registry
- Apache Kafka
- Apache Zookeeper
The Kafka Confluent ingest process:
- streams and reads encoded records from a Kafka topic
- uses Confluent Schema Management to determine the message schema and the destination FeatureBase field data types
- ingests the data into FeatureBase tables
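Confluent serializers frame each message value with a short header that points at the schema in the Schema Registry, which is how a consumer can determine the message schema. The following Python sketch splits a message value into its schema ID and Avro payload. It assumes the standard Confluent wire format (a magic byte of 0 followed by a 4-byte big-endian schema ID); `split_confluent_message` is a hypothetical helper, not part of FeatureBase.

```python
import struct


def split_confluent_message(value: bytes) -> tuple[int, bytes]:
    """Split a Confluent-framed message value into (schema_id, avro_payload).

    Assumes the Confluent wire format: byte 0 is the magic byte (0) and
    bytes 1-4 hold the schema ID as a big-endian unsigned integer.
    """
    if len(value) < 5:
        raise ValueError("message too short for Confluent framing")
    magic, schema_id = struct.unpack(">bI", value[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    # Everything after the 5-byte header is the Avro-encoded record.
    return schema_id, value[5:]
```

For example, `split_confluent_message(b"\x00\x00\x00\x00\x07abc")` returns `(7, b"abc")`; the schema ID is then used to look up the schema in the registry.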
Before you begin
Avro schema for Kafka messages
{
  "namespace": "<namespace>",
  "type": "record",
  "name": "<name-of-schema>",
  "fields": [
    <Kafka-Avro-data-types>
  ]
}
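Filled in, the template is ordinary JSON. This Python sketch builds one with a hypothetical namespace (`com.example.events`) and record name (`user_event`), reuses two field definitions from the examples on this page, and prints the JSON text you would register in the Schema Registry.

```python
import json

# Hypothetical namespace and record name; replace them with your own values.
schema = {
    "namespace": "com.example.events",
    "type": "record",
    "name": "user_event",
    "fields": [
        # Field definitions carry the property key-value pairs
        # (fieldType, mutex, quantum, ...) described on this page.
        {"name": "id_long", "type": "long", "mutex": True, "fieldType": "id"},
        {"name": "stringset_string", "type": "string"},
    ],
}

schema_json = json.dumps(schema, indent=2)
print(schema_json)
```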
Kafka JSON parameters
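Parameter | Description | Required | Additional information |
namespace | Qualifies the schema name; together with name it forms the schema's full name. | No | |
"type": "record" | The schema defines records. | Yes | |
name | The name of the record schema. | Yes | |
fields | A JSON array of field definitions. Each field has a name and an Avro type, plus optional properties such as fieldType, mutex, and quantum. | Yes | See Mapping Avro and FeatureBase data types. |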
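The top-level parameters can also be checked mechanically before a schema is registered. This Python sketch follows the Avro specification's rules for a record schema (`type` must be `"record"`, `name` and `fields` are required, `namespace` is optional). `check_record_schema` is a hypothetical helper; it does not validate the FeatureBase-specific field properties.

```python
import json


def check_record_schema(schema_json: str) -> list[str]:
    """Return a list of problems in a top-level Avro record schema (empty if none)."""
    schema = json.loads(schema_json)
    problems = []
    if schema.get("type") != "record":
        problems.append('"type" must be "record"')
    name = schema.get("name")
    if not isinstance(name, str) or not name:
        problems.append('"name" is required')
    if "namespace" in schema and not isinstance(schema["namespace"], str):
        problems.append('"namespace" must be a string if present')
    fields = schema.get("fields")
    if not isinstance(fields, list):
        problems.append('"fields" must be a JSON array')
    else:
        for i, field in enumerate(fields):
            # Every field definition needs at least a name and an Avro type.
            if not isinstance(field, dict) or "name" not in field or "type" not in field:
                problems.append(f'field {i} needs both "name" and "type"')
    return problems
```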
Mapping Avro and FeatureBase data types
The ./molecula-consumer-kafka and ./molecula-consumer-kafka-delete CLI commands:
- read the Avro schema from the Confluent Schema Registry
- infer the FeatureBase data type for each field
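Reading a schema from the registry can be illustrated with the Confluent Schema Registry REST API, which returns a schema by ID from `GET /schemas/ids/{id}` as a JSON object whose `schema` key holds the schema as an escaped JSON string. This Python sketch is an assumption about how such a lookup could look, not the CLI's implementation; the helper names and the registry URL are hypothetical. Response parsing is kept separate so it can be tried without a running registry.

```python
import json
import urllib.request


def schema_url(registry_url: str, schema_id: int) -> str:
    """Build the Schema Registry endpoint for a schema ID."""
    return f"{registry_url.rstrip('/')}/schemas/ids/{schema_id}"


def parse_schema_response(body: bytes) -> dict:
    """The registry wraps the schema in {"schema": "<escaped JSON>"}; decode both layers."""
    return json.loads(json.loads(body)["schema"])


def fetch_schema(registry_url: str, schema_id: int) -> dict:
    """Fetch and decode a schema; needs a reachable registry (URL is an assumption)."""
    with urllib.request.urlopen(schema_url(registry_url, schema_id)) as resp:
        return parse_schema_response(resp.read())
```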
avro.Array
Avro data type | Properties | FeatureBase Data type |
avro.Array : avro.String | | STRINGSET |
avro.Array : avro.Bytes | | STRINGSET |
avro.Array : avro.Fixed | | STRINGSET |
avro.Array : avro.Enum | | STRINGSET |
avro.Array : avro.String | quantum=(YMD) | STRINGSETQ |
avro.Array : avro.Bytes | quantum=(YMD) | STRINGSETQ |
avro.Array : avro.Fixed | quantum=(YMD) | STRINGSETQ |
avro.Array : avro.Enum | quantum=(YMD) | STRINGSETQ |
avro.Array : avro.Long | | IDSET |
avro.Array : avro.Long | quantum=(YMD) | IDSETQ |
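The array rows reduce to two checks: whether the items are string-like or integers, and whether the item schema carries a `quantum` property. The Python sketch below restates the table under those assumptions. It is a hypothetical helper, not the consumer's code; it treats `int` items like `long` because the IDSET and IDSETQ array examples on this page use `int` items, and it looks for `quantum` on the item schema, as those examples do.

```python
# Item types from the avro.Array table; int is included because the
# IDSET and IDSETQ array examples on this page use int items.
STRING_ITEMS = {"string", "bytes", "fixed", "enum"}
INTEGER_ITEMS = {"int", "long"}


def array_field_type(items) -> str:
    """Map an Avro array's item schema to a FeatureBase type (hypothetical helper)."""
    # An item schema is either a bare type name ("string") or an object
    # such as {"type": "string", "quantum": "YMD"}.
    item = items if isinstance(items, dict) else {"type": items}
    has_quantum = "quantum" in item
    if item["type"] in STRING_ITEMS:
        return "STRINGSETQ" if has_quantum else "STRINGSET"
    if item["type"] in INTEGER_ITEMS:
        return "IDSETQ" if has_quantum else "IDSET"
    raise ValueError(f"no array mapping for item type {item['type']!r}")
```

For example, `array_field_type({"type": "int", "quantum": "YMD"})` returns `"IDSETQ"`.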
avro.Bytes
Avro data type | Properties | FeatureBase Data type |
avro.Bytes | logicalType=decimal, scale | DECIMAL |
avro.Bytes | fieldType=decimal, scale | DECIMAL |
avro.Bytes | fieldType=recordTime | Record time for STRINGSETQ and IDSETQ fields |
avro.Bytes | | STRINGSET |
avro.Bytes | mutex=true | STRING |
avro.Bytes | quantum=(YMD) | STRINGSETQ |
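For `logicalType=decimal`, the Avro specification stores the unscaled value in the bytes as a two's-complement, big-endian integer, and `scale` gives the number of digits after the decimal point. The sketch below shows that decoding with Python's `decimal` module. It illustrates the Avro encoding only; `decode_avro_decimal` is a hypothetical helper and makes no claim about how FeatureBase stores decimals internally.

```python
from decimal import Decimal


def decode_avro_decimal(raw: bytes, scale: int) -> Decimal:
    """Decode Avro decimal bytes (two's-complement, big-endian) using the schema's scale."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)


# 0x013A is 314 unscaled; with scale 2 this prints 3.14.
print(decode_avro_decimal(b"\x01\x3a", 2))
```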
avro.Boolean
Avro data type | Properties | FeatureBase Data type |
avro.Boolean | | BOOL |
avro.Double, avro.Float
Avro data type | Properties | FeatureBase Data type |
avro.Double, avro.Float | scale | DECIMAL |
avro.Enum
Avro data type | Properties | FeatureBase Data type |
avro.Enum | | STRING |
avro.Int, avro.Long
Avro data type | Properties | FeatureBase Data type |
avro.Int, avro.Long | fieldType=id | IDSET |
avro.Int, avro.Long | fieldType=id, mutex=true | ID |
avro.Int, avro.Long | fieldType=id, quantum=(YMD) | IDSETQ |
avro.Int, avro.Long | fieldType=int, min, max | INT |
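The integer rows depend on the `fieldType`, `mutex`, and `quantum` properties. This hypothetical Python helper restates them; it treats `"mutex": true` as selecting ID, matching the `id_long` and `id_int` examples later on this page. How the real consumer resolves property combinations the table does not list is not covered here.

```python
def int_field_type(field: dict) -> str:
    """Map an Avro int/long field definition to a FeatureBase type (hypothetical helper)."""
    field_type = field.get("fieldType")
    if field_type == "int":
        return "INT"  # the table also lists min and max properties for INT
    if field_type == "id":
        if field.get("mutex") is True:
            return "ID"  # mutually exclusive: one ID per record
        if "quantum" in field:
            return "IDSETQ"  # also needs a recordTime field in the schema
        return "IDSET"
    raise ValueError(f"no mapping listed for fieldType {field_type!r}")
```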
avro.String
Avro data type | Properties | FeatureBase Data type |
avro.String | | STRINGSET |
avro.String | mutex=true | STRING |
avro.String | quantum=(YMD) | STRINGSETQ |
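String fields follow a similar pattern without needing a `fieldType`. The sketch below is a hypothetical restatement of the avro.String rows; per the avro.Bytes table, plain bytes fields without a `fieldType` map the same way.

```python
def string_field_type(field: dict) -> str:
    """Map an Avro string (or plain bytes) field to a FeatureBase type (hypothetical helper)."""
    if field.get("mutex") is True:
        return "STRING"      # mutually exclusive: one value per record
    if "quantum" in field:
        return "STRINGSETQ"  # also needs a recordTime field in the schema
    return "STRINGSET"       # default: a set of values per record
```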
avro.Union
Avro data type | Properties | FeatureBase Data type |
avro.Union | | Supports one or two members (if two, one must be avro.Null) |
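The union rule is what makes a field optional: a type such as `["string", "null"]` means the value may be absent. This hypothetical Python helper applies the rule as stated in the table, returning the non-null member and whether the field is optional, and rejecting any other union shape.

```python
def unwrap_union(avro_type):
    """Return (member_type, is_optional) for a field's Avro type.

    Hypothetical helper applying the union rule from the table: a union may
    have one member, or two members where one is "null".
    """
    if not isinstance(avro_type, list):
        return avro_type, False  # not a union
    if len(avro_type) == 1:
        return avro_type[0], False
    if len(avro_type) == 2:
        others = [t for t in avro_type if t != "null"]
        if len(others) == 1:
            return others[0], True  # the null member makes the field optional
    raise ValueError(f"unsupported union: {avro_type!r}")
```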
Not supported in FeatureBase
Avro data type | Properties | FeatureBase Data type |
avro.Map | | NOT SUPPORTED |
avro.Null | | NOT SUPPORTED |
avro.Record | | NOT SUPPORTED |
avro.Recursive | | NOT SUPPORTED |
Kafka Avro data type syntax
Combine the Avro field data type with property key-value pairs to determine the FeatureBase field data type, as shown in the following examples.
BOOL
Kafka Avro fields | Description |
{"name": "bool_bool", "type": "boolean"} | FeatureBase Bool from Avro Boolean |
DECIMAL
Kafka Avro fields | Description |
{"name": "decimal_float", "type": "float", "fieldType": "decimal", "scale": 2} | FeatureBase Decimal from Avro Float |
ID
Kafka Avro fields | Description |
{"name": "id_long", "type": "long", "mutex": true, "fieldType": "id"} | FeatureBase ID from Avro Long |
{"name": "id_int", "type": "int", "mutex": true, "fieldType": "id"} | FeatureBase ID from Avro Int |
IDSET
Kafka Avro fields | Description |
{"name": "idset_int", "type": "int", "fieldType": "id"} | FeatureBase IDSET from Avro Int |
{"name": "idset_intarray", "type": {"type": "array", "items": "int"}} | FeatureBase IDSET from Avro Int Array |
IDSETQ
IDSETQ fields require a matching recordTime field in the Avro schema. Examples are provided below.
Kafka Avro string | Description | Required in Avro schema |
{"name": "idsetq_int", "type": "int", "fieldType": "id", "quantum": "YMD"} | FeatureBase IDSETQ from Avro Int | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"} |
{"name": "idsetq_intarray", "type": {"type": "array", "items": {"type": "int", "quantum": "YMD"}}} | FeatureBase IDSETQ from Avro Int Array | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"} |
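`2006-01-02 15:04:05` is a Go reference-time layout; the equivalent Python format string is `%Y-%m-%d %H:%M:%S`. The sketch below shows how a Python producer might fill the required `recordtime_bytes` field. It assumes the bytes carry the UTF-8 text of the timestamp in that layout (the TIMESTAMP examples describe byte-encoded string timestamps the same way); the helper name and the sample record are hypothetical, and serializing the record to Avro is left to your Kafka client.

```python
from datetime import datetime

# Python strftime equivalent of the Go layout "2006-01-02 15:04:05".
RECORD_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def record_time_bytes(ts: datetime) -> bytes:
    """Encode a timestamp as UTF-8 bytes in the recordTime layout (assumed encoding)."""
    return ts.strftime(RECORD_TIME_FORMAT).encode("utf-8")


# Hypothetical record for a schema with the idsetq_int and recordtime_bytes fields.
record = {
    "idsetq_int": 42,
    "recordtime_bytes": record_time_bytes(datetime(2021, 1, 2, 3, 4, 5)),
}
print(record["recordtime_bytes"])
```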
INT
Kafka Avro fields | Description |
{"name": "int_int", "type": "int", "fieldType": "int"} | FeatureBase Int from Avro Int |
Strings
Kafka Avro fields | Description | Additional |
{"name": "string_string", "type": "string", "mutex": true } | FeatureBase String from Avro String | |
{"name": "string_bytes", "type": "bytes" , "mutex": true } | FeatureBase String from Avro Bytes | |
{"name": "string_enum", "type": "enum"} | FeatureBase String from Avro Enum | |
{"name": "string_string", "type": ["string", "null"], "mutex": true } | Optional String | Ignore missing fields |
{"name": "stringset_stringarray", "type": [{"type": "array", "items": "string"}, "null"]} | Optional Array of Strings | Ignore missing fields |
STRINGSET
Kafka Avro string | Description |
{"name": "stringset_string", "type": "string"} | FeatureBase StringSet from Avro String |
{"name": "stringset_bytes", "type": "bytes"} | FeatureBase StringSet from Avro Bytes |
{"name": "stringset_stringarray", "type": {"type": "array", "items": "string"}} | FeatureBase StringSet from Avro String Array |
STRINGSETQ
STRINGSETQ fields require a matching recordTime field in the Avro schema. Examples are provided below.
Kafka Avro string | Description | Required in Avro schema |
{"name": "stringsetq_string", "type": "string", "quantum": "YMD"} | FeatureBase StringSetQ with Day Granularity from Avro String | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"} |
{"name": "stringsetq_stringarray", "type": {"type": "array", "items": {"type": "string", "quantum": "YMD"}}} | FeatureBase StringSetQ with Day Granularity from Avro String Array | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"} |
TIMESTAMP
Kafka Avro string | Description | Additional |
{"name": "timestamp_bytes_ts", "type": "bytes", "fieldType": "timestamp", "layout": "2006-01-02 15:04:05", "epoch": "1970-01-01 00:00:00"} | FeatureBase Timestamp from Avro Bytes | Expects byte representation of string timestamp |
{"name": "timestamp_bytes_int", "type": ["bytes", "null"], "fieldType": "timestamp", "unit": "s", "layout": "2006-01-02 15:04:05", "epoch": "1970-01-01 00:00:00"} | FeatureBase Timestamp from Avro Int | |
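A byte-encoded timestamp in the layout above can be decoded and expressed relative to the `epoch` property. This Python sketch assumes both the value and the epoch are UTC and that `"unit": "s"` means whole seconds; it illustrates the properties and is not FeatureBase's own conversion. The helper name `parse_timestamp_bytes` is hypothetical.

```python
from datetime import datetime, timezone

LAYOUT = "%Y-%m-%d %H:%M:%S"  # Python equivalent of Go layout "2006-01-02 15:04:05"


def parse_timestamp_bytes(raw: bytes, epoch: str = "1970-01-01 00:00:00") -> tuple[datetime, int]:
    """Parse a byte-encoded timestamp string; return it with its offset in seconds from epoch.

    Assumes UTC for both values; the offset illustrates the epoch/unit
    properties and is not FeatureBase's implementation.
    """
    ts = datetime.strptime(raw.decode("utf-8"), LAYOUT).replace(tzinfo=timezone.utc)
    base = datetime.strptime(epoch, LAYOUT).replace(tzinfo=timezone.utc)
    return ts, int((ts - base).total_seconds())
```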
Examples
Next step