Tutorial: Importing Data from AutoMQ into StarRocks

Background

AutoMQ for Kafka (AutoMQ Kafka for short) is a cloud-native Kafka redesigned for the cloud. The AutoMQ Kafka kernel is open source and 100% compatible with the Kafka protocol, fully capturing the benefits of the cloud. Compared with self-managed Apache Kafka, AutoMQ Kafka can deliver up to 10x cost savings and up to 800x efficiency improvements in the cloud.

StarRocks is a high-performance, open-source analytical database.

This article describes how to sync data from AutoMQ for Kafka to StarRocks.

Prepare StarRocks and test data

Make sure you have a working StarRocks cluster available. For ease of demonstration, this article follows Deploy StarRocks Using Docker to install a demo StarRocks cluster on a Linux machine.

Create a database and a Primary Key table

create database automq_db;

create table users (
    id bigint NOT NULL,
    name string NOT NULL,
    timestamp string NULL,
    status string NULL
) PRIMARY KEY (id)
DISTRIBUTED BY HASH(id)
PROPERTIES (
    "replication_num" = "1",
    "enable_persistent_index" = "true"
);
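With the demo cluster deployed via Docker as above, you can sanity-check the DDL through the MySQL protocol. A minimal sketch, assuming the FE listens on 127.0.0.1 with the default query port 9030, the root user, and an empty password (replace these with your own):

```shell
# Connect to the StarRocks FE over the MySQL protocol and confirm
# the table exists. Host/port/credentials are demo assumptions.
mysql -h 127.0.0.1 -P 9030 -u root -e "SHOW CREATE TABLE automq_db.users\G"
```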

Prepare AutoMQ for Kafka and test data

Refer to the AutoMQ Quick Start to deploy an AutoMQ Kafka cluster.
You can quickly create a topic named example_topic in Kafka and write a test JSON record to it with the following steps:

  1. Create the topic:
    Use Kafka's command-line tools to create the topic. You need access to a Kafka environment and a running Kafka service. Here is the command to create the topic:

./kafka-topics.sh --create --topic example_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1

After creating the topic, you can check the result with:

./kafka-topics.sh --describe --topic example_topic --bootstrap-server 10.0.96.4:9092

Replace topic and bootstrap-server with your own topic name and Kafka server address.

  2. Generate test data:
    Here is a simple JSON-format test record; it must match the table defined earlier:

{
  "id": 1,
  "name": "Test User",
  "timestamp": "2023-11-10T12:00:00",
  "status": "active"
}

  3. Write the test data:
    Use Kafka's command-line tools or a producer client to write the test data to example_topic. Here is a command-line example:

echo '{"id": 1, "name": "Test User", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic

You can then check the data just written with:

sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning

Replace topic and bootstrap-server with your own topic name and Kafka server address.
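To load more than one record, a small loop can fabricate a batch of rows matching the users table. The generation step below is self-contained; only the final (commented) pipe into kafka-console-producer.sh needs a running cluster, and the broker address is the one used in this demo:

```shell
# Generate 3 JSON records matching the users table schema
# (id, name, timestamp, status) into a local file.
for i in 1 2 3; do
  printf '{"id": %d, "name": "user-%d", "timestamp": "2023-11-10T12:00:0%d", "status": "active"}\n' "$i" "$i" "$i"
done > /tmp/users.jsonl

cat /tmp/users.jsonl

# Pipe the batch into the topic (requires a running Kafka cluster):
# sh kafka-console-producer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic < /tmp/users.jsonl
```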

Kafka Connect configuration

Prepare worker.properties

Prepare worker.properties in the $KAFKA_HOME directory to configure the Connect worker. For more options, see Worker Configuration Properties. A default worker configuration template is available at $KAFKA_HOME/kafka/config/connect-standalone.properties. Below is a template example; make sure to set the following properties to your own values:

  1. bootstrap.servers
  2. plugin.path
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=10.0.96.4:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
# The test data above is plain JSON without an embedded schema, so schema
# support must be disabled; otherwise JsonConverter expects a schema/payload envelope
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/ec2-user/kafka/starrocks-kafka-connector-1.0.0
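The plugin.path directory must contain the StarRocks Kafka connector jars so the worker can discover them. A sketch of laying this out, assuming you have already downloaded a starrocks-kafka-connector release archive (archive name and paths here match the version used above, but are assumptions about your environment):

```shell
# Unpack the connector release so that plugin.path points at a directory
# containing the connector jars. Paths and archive name are assumptions.
cd /home/ec2-user/kafka
unzip starrocks-kafka-connector-1.0.0.zip    # produces starrocks-kafka-connector-1.0.0/
ls starrocks-kafka-connector-1.0.0/*.jar     # the worker scans these jars for plugins
```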

Prepare connect-StarRocks-sink.properties

name=starrocks-kafka-connector
connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
topics=example_topic
starrocks.http.url=10.0.3.149:8030
starrocks.username=root
starrocks.password=
starrocks.database.name=automq_db
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
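The sink connector writes to StarRocks through the FE's Stream Load HTTP interface, which is why starrocks.http.url points at the FE HTTP port (8030 by default). You can exercise the same load path by hand to rule out connectivity or permission problems. A hedged sketch using the demo host and credentials from this article:

```shell
# Write one row into automq_db.users via Stream Load, the same mechanism
# the connector uses. Host, credentials, and file path are demo assumptions.
echo '{"id": 2, "name": "Manual User", "timestamp": "2023-11-10T12:01:00", "status": "active"}' > /tmp/one_row.json

curl --location-trusted -u root: \
     -H "Expect:100-continue" \
     -H "format: json" \
     -T /tmp/one_row.json \
     -XPUT http://10.0.3.149:8030/api/automq_db/users/_stream_load
```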

Start the Kafka Connect worker

$KAFKA_HOME/kafka/bin/connect-standalone.sh worker.properties connect-StarRocks-sink.properties
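Once the standalone worker is running, the record written to example_topic should be synced into the users table within a short time. A sketch of verifying the result, assuming the demo FE at 10.0.3.149 with the default MySQL query port 9030 and root user:

```shell
# Query StarRocks over the MySQL protocol to confirm the row arrived.
# Host/port/credentials are demo assumptions -- replace with your own.
mysql -h 10.0.3.149 -P 9030 -u root -e "SELECT * FROM automq_db.users;"
```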