Consumer Usage Examples
Also refer to the ingester configuration reference page for details on configuration flags for all consumers.
FeatureBase consumer binaries
CSV consumer
Example 1: Simple ingest, internal header, multiple-file, ID autogen
Command:
molecula-consumer-csv \
--batch-size=10000 \
--auto-generate \
--index=csv-ingest-test \
--files=sample.csv,sample2.csv \
sample.csv:
asset_tag__String,fan_time__RecordTime_2006-01-02,fan_val__String_F_YMD
ABCD,2019-01-02,70%
ABCD,2019-01-03,20%
BEDF,2019-01-02,70%
BEDF,2019-01-05,90%
ABCD,2019-01-30,40%
BEDF,2019-01-08,10%
BEDF,2019-01-08,20%
ABCD,2019-01-04,30%
Note the Header Specification:
asset_tag__String,fan_time__RecordTime_2006-01-02,fan_val__String_F_YMD
This specifies three fields:
asset_tag
, aString
field, …fan_time
, aRecordTime
field, with timestamp layout format2006-01-02
(according to go time)fan_val
, aString
field, …
Example 2: Simple ingest, external header
Command:
molecula-consumer-csv \
--batch-size=10000 \
--auto-generate \
--header=asset_tag__String,fan_time__RecordTime_2006-01-02,fan_val__String_F_YMD \
--ignore-header
--index=csv-ingest-test \
--files=sample.csv,sample2.csv \
sample.csv:
asset_tag,fan_time,fan_val
ABCD,2019-01-02,70%
ABCD,2019-01-03,20%
BEDF,2019-01-02,70%
BEDF,2019-01-05,90%
ABCD,2019-01-30,40%
BEDF,2019-01-08,10%
BEDF,2019-01-08,20%
ABCD,2019-01-04,30%
Note that a header line is present in the csv file, however, we are ignoring it. The Header specification is passed in the command line with --header
.
Example 3: Simple ingest over tls
Command
molecula-consumer-csv \
--pilosa-hosts=https://localhost:10101
--tls.certificate=featurebase.local.crt \
--tls.key=featurebase.local.key \
--tls.skip-verify \
--batch-size=10000 \
--auto-generate \
--index=csv-ingest-test \
--files=sample.csv \
Or, equivalently, with the --future.rename
configuration flag:
molecula-consumer-csv \
--future.rename \
--featurebase-hosts=https://localhost:10101
--tls.certificate=featurebase.local.crt \
--tls.key=featurebase.local.key \
--tls.skip-verify \
--batch-size=10000 \
--auto-generate \
--index=csv-ingest-test \
--files=sample.csv \
Note that we provide tls configuration (--tls.certificate
and --tls.key
) to the command for securely connecting to FeatureBase. The values for these parameters are the filenames for certificate and key that correspond to the target FeatureBase instances. If verification of certificate is not desired (especially for self-signed certificates), we need to include --tls.skip-verify
. Since the default bind point for FeatureBase hosts are HTTP, we provide the bind points using --pilosa-hosts
(or, --featurebase-hosts
with the --future.rename
configuration flag).
Kafka consumer
The Kafka consumer requires:
- A list of Kafka hosts
- A FeatureBase index name (
--index <indexname>
), - Exactly one primary key method (
--primary-key-field <fieldnames>
,--id-field <fieldname>
or--auto-generate
),
Example 1: minimal
Command: molecula-consumer-kafka
Data:
Kafka delete consumer
Configuration and usage for this consumer is identical to the Kafka consumer, with the exception of the pilosa-grpc-hosts
(or featurebase-grpc-hosts
with the --future.rename
configuration flag). Instead of inserting data into FeatureBase from the received messages, it deletes the corresponding data from FeatureBase. This provides delete capabilities using the same Kafka interface.
Kafka static consumer
Configuration and usage for this consumer is similar to the kafka consumer. It was developed for use in scenarios where Kafka is used without Confluent Schema Registry. In this case, the schema must be provided explicitly; the “static” in the consumer name refers to this “static schema”. For compatibility with complex JSON message formats, the schema is specified with a JSON document rather than the “header spec” used in other consumers.
Example 1: Simple ingest
Command:
molecula-consumer-kafka-static \
--kafka-hosts "localhost:9092" \
--index kafka-test \
--batch-size 10000 \
--topics test-topic \
--max-msgs 10000 \
--auto-generate \
--external-generate \
--header kafka-static-header-1.json
kafka-static-header-1.json:
[
{
"name": "int-featurebase-name",
"path": [
"int-kafka-path"
],
"type": "int"
},
{
"name": "string-featurebase-name",
"path": [
"string-kafka-path"
],
"type": "string"
}
]
Example Kafka message:
{
"int-kafka-path": 12345,
"string-kafka-path": "arbitraryString"
}
The header file specifies two fields:
int-featurebase-name
, a FeatureBase field of typeint
, populated with the value from theint-kafka-path
item in the Kafka message.string-featurebase-name
, a string-keyed FeatureBase field of typeset
, populated with the value from thestring-kafka-path
item in the Kafka message.
Example 2:
Command:
molecula-consumer-kafka-static \
--kafka-hosts "localhost:9092" \
--index kafka-test \
--batch-size=10000 \
--topics test-topic \
--auto-generate \
--allow-missing-fields \
--header kafka-static-header-2.json
kafka-static-header-2.json:
[
{
"name": "from_ip",
"path": [
"from_interface",
"ip"
],
"type": "string"
},
{
"name": "from_port",
"path": [
"from_interface",
"port"
],
"type": "int"
},
{
"name": "to_ip",
"path": [
"to_interface",
"ip"
],
"type": "string"
},
{
"name": "to_port",
"path": [
"to_interface",
"port"
],
"type": "int"
},
{
"name": "event_time",
"path": [
"event_time"
],
"type": "timestamp"
},
{
"name": "severity",
"path": [
"severity"
],
"type": "set"
},
{
"name": "bytes",
"path": [
"bytes"
],
"type": "int"
},
{
"name": "protocol",
"path": [
"protocol"
],
"type": "string"
}
]
Sample Kafka message:
{
"from_interface": {
"ip": "10.203.33.18",
"port": 38935
},
"to_interface": {
"ip": "203.77.221.220",
"port": 5872
},
"event_time": "2021-06-01T16:02:55Z06:00",
"protocol": "UDP",
"severity": 0,
"bytes": 8593
}
SQL consumer
Example 1
The molecula-consumer-sql
examples use sample data represented by the following two tables.
Table: assets
asset_tag | weight | warehouse |
---|---|---|
ABCD | 16 | US-EAST |
EFGH | 9 | US-WEST |
IJKL | 47 | US-WEST |
MNOP | 30 | US-EAST |
Table: events
pk | asset_tag | fan_time | fan_vol |
---|---|---|---|
aus-14 | ABCD | 2021-06-21 | 90% |
aus-15 | EFGH | 2021-06-19 | 10% |
aus-16 | ABCD | 2021-06-20 | 60% |
den-11 | IJKL | 2021-06-19 | 70% |
den-12 | MNOP | 2021-06-20 | 90% |
nyc-78 | MNOP | 2021-06-21 | 80% |
den-13 | MNOP | 2021-06-21 | 80% |
nyc-79 | ABCD | 2021-06-21 | 30% |
based on these CREATE TABLE statements:
CREATE TABLE `assets` (
`asset_tag` char(4) NOT NULL DEFAULT '',
`weight` int(8) DEFAULT 0,
`warehouse` char(10),
PRIMARY KEY (`asset_tag`)
);
CREATE TABLE `events` (
`pk` char(10) NOT NULL DEFAULT '',
`asset_tag` char(4) NOT NULL DEFAULT '',
`fan_time` date NOT NULL,
`fan_vol` char(10),
PRIMARY KEY (`pk`)
);
This example creates an index based directly on the assets
table:
molecula-consumer-sql \
--driver mysql \
--connection-string 'username:password@(127.0.0.1:3306)/dbname' \
--pilosa-hosts localhost:10101 \
--batch-size 10000 \
--index=asset_list \
--primary-key-fields 'asset_tag' \
--row-expr 'SELECT asset_tag as asset_tag__String, weight as weight__Int, warehouse as warehouse__String FROM assets'
Or, equivalently, with the --future.rename
configuration flag:
molecula-consumer-sql \
--future.rename \
--driver mysql \
--connection-string 'username:password@(127.0.0.1:3306)/dbname' \
--featurebase-hosts localhost:10101 \
--batch-size 10000 \
--index=asset_list \
--primary-key-fields 'asset_tag' \
--row-expr 'SELECT asset_tag as asset_tag__String, weight as weight__Int, warehouse as warehouse__String FROM assets'
It’s important to understand that the data to be indexed does not necessarily align to a table in the source database. Rather, it aligns with the results of the SQL query specified in the --row-expr
argument. In other words, you can use any SQL functionality to join or modify data, and the result of that query is what the consumer considers to be the source data.
So for example, you might want to ingest the events
table, but treat the fan_vol
field as an integer instead of a string. In that case, you can simply use SQL functions to convert that string field to an integer-ready value:
molecula-consumer-sql \
--driver mysql \
--connection-string 'username:password@(127.0.0.1:3306)/dbname' \
--pilosa-hosts localhost:10101 \
--batch-size 10000 \
--index=event_list \
--primary-key-fields 'pk' \
--row-expr 'SELECT pk as pk__String, asset_tag as asset_tag__String, fan_time as `fan_time__Timestamp_s_2006-01-02`, SUBSTRING(fan_vol, 1, CHAR_LENGTH(fan_vol)-1) as fan_vol__Int FROM events'
Or, equivalently, with the --future.rename
configuration flag:
molecula-consumer-sql \
--future.rename \
--driver mysql \
--connection-string 'username:password@(127.0.0.1:3306)/dbname' \
--featurebase-hosts localhost:10101 \
--batch-size 10000 \
--index=event_list \
--primary-key-fields 'pk' \
--row-expr 'SELECT pk as pk__String, asset_tag as asset_tag__String, fan_time as `fan_time__Timestamp_s_2006-01-02`, SUBSTRING(fan_vol, 1, CHAR_LENGTH(fan_vol)-1) as fan_vol__Int FROM events'
This example illustrates using a SQL query which joins data from both tables into a single FeatureBase index which contains the event data along with the weight
of the relative asset_tag
. It also creates a new field completely, called locale
, based on the first three characters of the pk
field:
molecula-consumer-sql \
--driver mysql \
--connection-string 'username:password@(127.0.0.1:3306)/dbname' \
--pilosa-hosts localhost:10101 \
--batch-size 10000 \
--index=events_plus_weight \
--primary-key-fields 'pk' \
--row-expr 'SELECT events.pk as pk__String, events.asset_tag as asset_tag__String, assets.weight as weight__Int, SUBSTRING(events.pk, 1, 3) as locale__String FROM events INNER JOIN assets on assets.asset_tag = events.asset_tag'
Or, equivalently, with the --future.rename
configuration flag:
molecula-consumer-sql \
--future.rename \
--driver mysql \
--connection-string 'username:password@(127.0.0.1:3306)/dbname' \
--featurebase-hosts localhost:10101 \
--batch-size 10000 \
--index=events_plus_weight \
--primary-key-fields 'pk' \
--row-expr 'SELECT events.pk as pk__String, events.asset_tag as asset_tag__String, assets.weight as weight__Int, SUBSTRING(events.pk, 1, 3) as locale__String FROM events INNER JOIN assets on assets.asset_tag = events.asset_tag'