Continuously load data from AutoMQ Kafka

AutoMQ for Kafka is a cloud-native redesign of Kafka for cloud environments. It is open source, fully compatible with the Kafka protocol, and built to take full advantage of the cloud. Compared with self-managed Apache Kafka, its cloud-native architecture provides capabilities such as automatic capacity scaling, self-balancing of network traffic, and partition reassignment in seconds, which together translate into a significantly lower Total Cost of Ownership (TCO).

This article walks you through importing data from AutoMQ Kafka into StarRocks using Routine Load. For an understanding of the basic principles of Routine Load, refer to the section on Routine Load Fundamentals.

Prepare Environment

Prepare StarRocks and test data.

Ensure you have a running StarRocks cluster. For demonstration purposes, this article follows the deployment guide to install a StarRocks cluster on a Linux machine via Docker.
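As a point of reference, a minimal single-node setup can be started with the StarRocks all-in-one Docker image; the image name and ports below are assumptions based on the public quick-start image and may differ from the version in the deployment guide:

docker run -p 9030:9030 -p 8030:8030 -p 8040:8040 -itd --name starrocks starrocks/allin1-ubuntu
# Once the container is up, connect with any MySQL-compatible client (FE query port 9030)
mysql -h 127.0.0.1 -P 9030 -u root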

Create a database and a test table using the Primary Key model:

create database automq_db;
create table users (
    id bigint NOT NULL,
    name string NOT NULL,
    timestamp string NULL,
    status string NULL
) PRIMARY KEY (id)
DISTRIBUTED BY HASH(id)
PROPERTIES (
    "replication_num" = "1",
    "enable_persistent_index" = "true"
);

Prepare AutoMQ Kafka and test data

To prepare your AutoMQ Kafka environment and test data, follow the AutoMQ Quick Start guide to deploy your AutoMQ Kafka cluster. Ensure that StarRocks can directly connect to your AutoMQ Kafka server.
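Before creating topics, you may want to confirm that the StarRocks host can actually reach the broker. The commands below are a rough sketch assuming the broker address 10.0.96.4:9092 used in the rest of this article:

# Check that the broker port is reachable from the StarRocks host
nc -zv 10.0.96.4 9092
# Alternatively, ask the broker for its supported API versions using Kafka's tooling
./kafka-broker-api-versions.sh --bootstrap-server 10.0.96.4:9092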

To quickly create a topic named example_topic in AutoMQ Kafka and write test JSON data into it, follow these steps:

Create a topic:

Use Kafka’s command-line tools to create a topic. Ensure you have access to the Kafka environment and the Kafka service is running. Here is the command to create a topic:

./kafka-topics.sh --create --topic example_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1

Note: Replace the topic name and the bootstrap-server address with your own values.

To check the result of the topic creation, use this command:

./kafka-topics.sh --describe --topic example_topic --bootstrap-server 10.0.96.4:9092

Generate test data:

Generate a simple piece of test data in JSON format:

{
    "id": 1,
    "name": "test user",
    "timestamp": "2023-11-10T12:00:00",
    "status": "active"
}

Write test data:

Use Kafka’s command-line tools or programming methods to write test data into example_topic. Here is an example using command-line tools:

echo '{"id": 1, "name": "test user", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | ./kafka-console-producer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic

Note: Replace the topic name and the bootstrap-server address with your own values.
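If a single record is not enough, one option is to generate several JSON lines in a small shell loop and pipe them to the console producer; this is just a sketch that reuses the broker address and topic from above:

# Produce five test records with different ids
for i in 1 2 3 4 5; do
  echo "{\"id\": $i, \"name\": \"test user\", \"timestamp\": \"2023-11-10T12:00:0$i\", \"status\": \"active\"}"
done | ./kafka-console-producer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic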

To view the recently written topic data, use the following command:

./kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning

Creating a Routine Load Task

In the StarRocks command line, create a Routine Load task to continuously import data from the AutoMQ Kafka topic:

CREATE ROUTINE LOAD automq_example_load ON users
COLUMNS(id, name, timestamp, status)
PROPERTIES
(
    "desired_concurrent_number" = "5",
    "format" = "json",
    "jsonpaths" = "[\"$.id\",\"$.name\",\"$.timestamp\",\"$.status\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "10.0.96.4:9092",
    "kafka_topic" = "example_topic",
    "kafka_partitions" = "0",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Note: Replace kafka_broker_list with your Kafka server address.

Explanation of Parameters

Data Format

Specify the data format as JSON by setting "format" = "json" in the PROPERTIES clause.

Data Extraction and Transformation

To specify the mapping and transformation between the source data and the target table, configure the COLUMNS and jsonpaths parameters. The column names in COLUMNS correspond to the columns of the target table, and their order corresponds to the order of the fields in the source data. The jsonpaths parameter extracts the required fields from the JSON data, much as if they were newly generated CSV data, and the COLUMNS parameter then names those extracted fields in order. For more details on data transformation, see Data Transformation during Import.

Note: If the key names and number of keys in each JSON object (order does not matter) match the columns of the target table, there is no need to configure COLUMNS.
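As an illustration of this note, the test JSON keys (id, name, timestamp, status) happen to match the column names of the users table, so the job could in principle be written without COLUMNS or jsonpaths. The following is only a sketch of that simplified form (with a hypothetical job name), not a second job you need to create:

CREATE ROUTINE LOAD automq_example_load_simple ON users
PROPERTIES
(
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "10.0.96.4:9092",
    "kafka_topic" = "example_topic"
);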

Verifying Data Import

First, check the Routine Load job and confirm that its status is RUNNING:

show routine load\G

Then, query the corresponding table in the StarRocks database to confirm that the data has been imported successfully:

StarRocks > select * from users;
+------+-----------+---------------------+--------+
| id   | name      | timestamp           | status |
+------+-----------+---------------------+--------+
|    1 | test user | 2023-11-10T12:00:00 | active |
|    2 | test user | 2023-11-10T12:00:00 | active |
+------+-----------+---------------------+--------+
2 rows in set (0.01 sec)
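Finally, if you later need to pause, resume, or stop the continuous import job created above, StarRocks provides the corresponding Routine Load management statements, shown here briefly for the job name used in this article:

PAUSE ROUTINE LOAD FOR automq_example_load;
RESUME ROUTINE LOAD FOR automq_example_load;
-- STOP permanently removes the job; a stopped job cannot be resumed
STOP ROUTINE LOAD FOR automq_example_load;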