
How do I add data to FeatureBase tables using Kafka?

These instructions apply to Kafka schemas managed by Confluent Schema Management.

FeatureBase recommends using Confluent Schema Management because it makes it easier to set up these Kafka dependencies in a local environment:

  • Schema registry
  • Apache Kafka
  • Apache Zookeeper

The Kafka Confluent ingest process:

  • streams and reads encoded records from a Kafka topic
  • uses Confluent Schema Management to determine the message schema and the destination FeatureBase field data types
  • ingests the data into FeatureBase tables
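
The first step of that process can be sketched as follows. This is an illustration only, not the FeatureBase consumer's code: it assumes messages use the standard Confluent wire format (a zero magic byte, then a 4-byte big-endian schema ID, then the Avro payload), and the function name `split_confluent_message` is hypothetical.

```python
import struct

MAGIC_BYTE = 0  # assumed: first byte of a Confluent-framed message value


def split_confluent_message(raw: bytes) -> tuple:
    """Return (schema_id, avro_payload) from a Confluent-framed message value."""
    if len(raw) < 5:
        raise ValueError("message is shorter than the 5-byte header")
    # ">bI" = big-endian, one signed byte (magic) + one unsigned 32-bit int (schema ID)
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, raw[5:]


# Hypothetical message: schema ID 42 followed by an opaque Avro body.
message = b"\x00" + (42).to_bytes(4, "big") + b"avro-body"
schema_id, payload = split_confluent_message(message)
print(schema_id, payload)
```

The schema ID is what a consumer would use to look up the Avro schema in the schema registry before decoding the payload.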

Before you begin

Avro schema for Kafka messages

{
    "namespace": "<namespace>",
    "type": "record",
    "name": "<name-of-schema>",
    "fields": [
        <Kafka-Avro-data-types>
    ]
}
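
As a rough illustration of how the template might be filled in, the sketch below builds a schema as a Python dictionary and prints it as JSON. The namespace `org.example` and the schema name `customer_events` are hypothetical; the field declarations are copied from the syntax tables later on this page.

```python
import json

# Hypothetical namespace and schema name; only the field declarations
# come from this page's data type syntax tables.
schema = {
    "namespace": "org.example",
    "type": "record",
    "name": "customer_events",
    "fields": [
        {"name": "bool_bool", "type": "boolean"},
        {"name": "int_int", "type": "int", "fieldType": "int"},
        {"name": "string_string", "type": "string", "mutex": True},
        {"name": "stringset_string", "type": "string"},
    ],
}

# json.dumps emits valid JSON (for example, True becomes true).
print(json.dumps(schema, indent=4))
```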

Kafka JSON parameters

Parameter | Description | Required | Additional information
namespace | | |
"type": "record" | The schema defines records. | |
name | | |
fields | | |

Additional information

Mapping Avro and FeatureBase data types

The ./molecula-consumer-kafka and ./molecula-consumer-kafka-delete CLI commands:

  • Read the Avro schema from the Confluent Schema Registry
  • Infer the FeatureBase data type for each field

avro.Array

Avro data type | Properties | FeatureBase data type
avro.Array : avro.String | (none) | STRINGSET
avro.Array : avro.Bytes | (none) | STRINGSET
avro.Array : avro.Fixed | (none) | STRINGSET
avro.Array : avro.Enum | (none) | STRINGSET
avro.Array : avro.String | quantum=(YMD) | STRINGSETQ
avro.Array : avro.Bytes | quantum=(YMD) | STRINGSETQ
avro.Array : avro.Fixed | quantum=(YMD) | STRINGSETQ
avro.Array : avro.Enum | quantum=(YMD) | STRINGSETQ
avro.Array : avro.Long | (none) | IDSET
avro.Array : avro.Long | quantum=(YMD) | IDSETQ
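
The array rules above can be expressed as a small lookup. This is a sketch of the table, not the inference code used by the CLI commands; the function name `array_field_type` is hypothetical, and accepting `int` items is an assumption based on the IDSET syntax example later on this page.

```python
from typing import Optional

# Item types that the table maps to string sets.
STRING_LIKE_ITEMS = {"string", "bytes", "fixed", "enum"}
# "long" is listed in the table; "int" is assumed from the IDSET syntax example.
ID_LIKE_ITEMS = {"long", "int"}


def array_field_type(items: str, quantum: Optional[str] = None) -> str:
    """Map an Avro array's item type to a FeatureBase data type."""
    if items in STRING_LIKE_ITEMS:
        return "STRINGSETQ" if quantum else "STRINGSET"
    if items in ID_LIKE_ITEMS:
        return "IDSETQ" if quantum else "IDSET"
    raise ValueError(f"no mapping listed for array items of type {items!r}")


print(array_field_type("string"))
print(array_field_type("long", quantum="YMD"))
```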

avro.Bytes

Avro data type | Properties | FeatureBase data type
avro.Bytes | logicalType=decimal, scale | DECIMAL
avro.Bytes | fieldType=decimal, scale | DECIMAL
avro.Bytes | fieldType=recordTime | STRINGSETQ and IDSETQ
avro.Bytes | (none) | STRINGSET
avro.Bytes | mutex=true | STRING
avro.Bytes | quantum=(YMD) | STRINGSETQ

avro.Boolean

Avro data type | Properties | FeatureBase data type
avro.Boolean | (none) | BOOL

avro.Double, avro.Float

Avro data type | Properties | FeatureBase data type
avro.Double, avro.Float | scale | DECIMAL

avro.Enum

Avro data type | Properties | FeatureBase data type
avro.Enum | (none) | STRING

avro.Int, avro.Long

Avro data type | Properties | FeatureBase data type
avro.Int, avro.Long | fieldType=id | IDSET
avro.Int, avro.Long | fieldType=id, mutex=true | ID
avro.Int, avro.Long | fieldType=id, quantum=(YMD) | IDSETQ
avro.Int, avro.Long | fieldType=int, min, max | INT

avro.String

Avro data type | Properties | FeatureBase data type
avro.String | (none) | STRINGSET
avro.String | mutex=true | STRING
avro.String | quantum=(YMD) | STRINGSETQ
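
A minimal sketch of the three avro.String rows follows. The function name `string_field_type` is hypothetical, and because the table does not say what happens when `mutex` and `quantum` are combined, the sketch rejects that combination rather than guess.

```python
from typing import Optional


def string_field_type(mutex: bool = False, quantum: Optional[str] = None) -> str:
    """Map avro.String properties to a FeatureBase data type per the table."""
    if mutex and quantum:
        # Not listed in the table, so this sketch does not assume a result.
        raise ValueError("mutex combined with quantum is not listed in the table")
    if quantum:
        return "STRINGSETQ"
    if mutex:
        return "STRING"
    return "STRINGSET"


print(string_field_type())
print(string_field_type(mutex=True))
print(string_field_type(quantum="YMD"))
```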

avro.Union

Avro data type | Properties | FeatureBase data type
avro.Union | (none) | Supports one or two members (if two, one must be avro.Null)
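
The union rule can be checked before a schema is registered. The sketch below is one possible reading of that rule; the function name `union_value_type` is hypothetical and the check is not part of any FeatureBase tool.

```python
def union_value_type(members: list):
    """Return the single non-null member of a supported union.

    A supported union has one member, or two members where one is "null".
    """
    non_null = [m for m in members if m != "null"]
    if len(members) in (1, 2) and len(non_null) == 1:
        return non_null[0]
    raise ValueError(f"unsupported union: {members!r}")


print(union_value_type(["string", "null"]))
print(union_value_type([{"type": "array", "items": "string"}, "null"]))
```

A union such as `["string", "int"]` raises `ValueError` in this sketch because neither member is null.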

Not supported in FeatureBase

Avro data type | Properties | FeatureBase data type
avro.Map | (none) | NOT SUPPORTED
avro.Null | (none) | NOT SUPPORTED
avro.Record | (none) | NOT SUPPORTED
avro.Recursive | (none) | NOT SUPPORTED

Kafka Avro data type syntax

Combine Avro field data types with property key-value pairs to determine the FeatureBase data type of each field.

BOOL

Kafka Avro fields | Description
{"name": "bool_bool", "type": "boolean"} | FeatureBase Bool from Avro Boolean

DECIMAL

Kafka Avro fields | Description
{"name": "decimal_float", "type": "float", "fieldType": "decimal", "scale": 2} | FeatureBase Decimal from Avro Float

ID

Kafka Avro fields | Description
{"name": "id_long", "type": "long", "mutex": true, "fieldType": "id"} | FeatureBase ID from Avro Long
{"name": "id_int", "type": "int", "mutex": true, "fieldType": "id"} | FeatureBase ID from Avro Int

IDSET

Kafka Avro fields | Description
{"name": "idset_int", "type": "int", "fieldType": "id"} | FeatureBase IDSET from Avro Int
{"name": "idset_intarray", "type": {"type": "array", "items": "int"}} | FeatureBase IDSET from Avro Int Array

IDSETQ

IDSETQ Avro fields require a matching recordTime field in the Avro schema. Examples are provided below.

Kafka Avro fields | Description | Required in Avro schema
{"name": "idsetq_int", "type": "int", "fieldType": "id", "quantum": "YMD"} | FeatureBase IDSETQ from Avro Int | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"}
{"name": "idsetq_intarray", "type": "array", "items": {"type": "int", "quantum": "YMD"}} | FeatureBase IDSETQ from Avro Int Array | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"}

INT

Kafka Avro fields | Description
{"name": "int_int", "type": "int", "fieldType": "int"} | FeatureBase Int from Avro Int

STRING

Kafka Avro fields | Description | Additional information
{"name": "string_string", "type": "string", "mutex": true} | FeatureBase String from Avro String | (none)
{"name": "string_bytes", "type": "bytes", "mutex": true} | FeatureBase String from Avro Bytes | (none)
{"name": "string_enum", "type": "enum"} | FeatureBase String from Avro Enum | (none)
{"name": "string_string", "type": ["string", "null"], "mutex": true} | Optional String | Ignore missing fields
{"name": "stringset_stringarray", "type": [{"type": "array", "items": "string"}, "null"]} | Optional Array of Strings | Ignore missing fields

STRINGSET

Kafka Avro fields | Description
{"name": "stringset_string", "type": "string"} | FeatureBase StringSet from Avro String
{"name": "stringset_bytes", "type": "bytes"} | FeatureBase StringSet from Avro Bytes
{"name": "stringset_stringarray", "type": {"type": "array", "items": "string"}} | FeatureBase StringSet from Avro String Array

STRINGSETQ

STRINGSETQ Avro fields require a matching recordTime field in the Avro schema. Examples are provided below.

Kafka Avro fields | Description | Required in Avro schema
{"name": "stringsetq_string", "type": "string", "quantum": "YMD"} | FeatureBase StringSetQ with Day Granularity from Avro String | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"}
{"name": "stringsetq_stringarray", "type": "array", "items": {"type": "string", "quantum": "YMD"}} | FeatureBase StringSetQ with Day Granularity from Avro String Array | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"}
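
The recordTime requirement can be checked on a list of field declarations before the schema is published. The sketch below assumes fields are shaped like the examples above, with `quantum` either on the field or on its array `items`; the helper names `uses_quantum` and `missing_record_time` are hypothetical.

```python
def uses_quantum(field: dict) -> bool:
    """True if the field, or its array items, declares a time quantum."""
    if "quantum" in field:
        return True
    items = field.get("items")
    return isinstance(items, dict) and "quantum" in items


def missing_record_time(fields: list) -> bool:
    """True if a time-quantum field exists but no recordTime field does."""
    has_quantum = any(uses_quantum(f) for f in fields)
    has_record_time = any(f.get("fieldType") == "recordTime" for f in fields)
    return has_quantum and not has_record_time


fields = [
    {"name": "stringsetq_string", "type": "string", "quantum": "YMD"},
    {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime",
     "layout": "2006-01-02 15:04:05", "unit": "s"},
]
print(missing_record_time(fields))      # both fields present
print(missing_record_time(fields[:1]))  # recordTime field dropped
```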

TIMESTAMP

Kafka Avro fields | Description | Additional information
{"name": "timestamp_bytes_ts", "type": "bytes", "fieldType": "timestamp", "layout": "2006-01-02 15:04:05", "epoch": "1970-01-01 00:00:00"} | FeatureBase Timestamp from Avro Bytes | Expects byte representation of string timestamp
{"name": "timestamp_bytes_int", "type": ["bytes", "null"], "fieldType": "timestamp", "unit": "s", "layout": "2006-01-02 15:04:05", "epoch": "1970-01-01 00:00:00"} | FeatureBase Timestamp from Avro Int | (none)
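
A producer might build the byte value for a timestamp field roughly as follows. The sketch assumes that the `layout` value follows Go's reference-time convention, so `2006-01-02 15:04:05` means year-month-day hour:minute:second, and that the string is encoded as UTF-8. The names `PY_LAYOUT` and `timestamp_to_bytes` are hypothetical.

```python
from datetime import datetime

# Assumed strftime equivalent of the layout "2006-01-02 15:04:05".
PY_LAYOUT = "%Y-%m-%d %H:%M:%S"


def timestamp_to_bytes(ts: datetime) -> bytes:
    """Format a datetime with the layout and encode it for an Avro bytes field."""
    return ts.strftime(PY_LAYOUT).encode("utf-8")


print(timestamp_to_bytes(datetime(2023, 5, 17, 8, 30, 0)))
# prints b'2023-05-17 08:30:00'
```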

Examples

Next step

Further information