从 AutoMQ Kafka 加载流数据到 Timeplus

AutoMQ for Kafka(简称 AutoMQ Kafka ) 是一款基于云重新设计的云原生 Kafka。AutoMQ Kafka 内核开源并且100% 兼容 Kafka 协议,可以充分兑现云的红利。相比自建 Apache Kafka,AutoMQ Kafka 在其云原生架构基础上实现的自动弹性、流量自平衡、秒级分区移动等特性可以为用户带来更低的总体拥有成本(TCO)。
本文将介绍如何通过 Timeplus 控制台将数据从 AutoMQ Kafka 导入 Timeplus。AutoMQ Kafka 100% 兼容 Apache Kafka 协议,因此你也可以创建 Kafka 的外部流来分析 AutoMQ 中的数据而不移动数据。

准备 AutoMQ Kafka 环境和测试数据

参考 AutoMQ 快速入门部署好 AutoMQ Kafka 集群。请确保 Timeplus 能够与您的 AutoMQ Kafka 服务器直接连接。 您可以使用像 ngrok 这样的工具将你的本地 AutoMQ Kafka 代理安全地暴露在互联网上,这样 Timeplus Cloud 就可以连接到它。 查看博客了解更多详情。

注意
如果您保持 IP 白名单,则需要将我们的静态 IP 列入白名单:
52.83.159.13 对于 cloud.timeplus.com.cn

在AutoMQ Kafka中快速创建一个名为 example_topic 的主题并向其中写入一条测试JSON数据,可以通过以下步骤实现:

创建Topic:

使用Kafka的命令行工具来创建主题。你需要有Kafka环境的访问权限,并且确保Kafka服务正在运行。以下是创建主题的命令:

1
./kafka-topics.sh --create --topic exampleto_topic --bootstrap-server 10.0.96.4:9092  --partitions 1 --replication-factor 1

注意:将 topic 和 bootstarp-server 替换为你的Kafka服务器地址。

创建完topic可以用以下命令检查topic创建的结果

1
./kafka-topics.sh --describe example_topic --bootstrap-server 10.0.96.4:9092

生成测试数据:

生成一条简单的JSON格式的测试数据

1
2
3
4
5
6
{
"id": 1,
"name": "测试用户",
"timestamp": "2023-11-10T12:00:00",
"status": "active"
}

写入测试数据

使用Kafka的命令行工具或者编程方式将测试数据写入到example_topic。以下是使用命令行工具的一个示例:

1
echo '{"id": 1, "name": "测试用户", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic

注意:将 topic 和 bootstarp-server 替换为你的Kafka服务器地址。

使用如下命令可以查看刚写入的topic数据:

1
sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning

AutoMQ Kafka数据源

  1. 在左侧导航菜单中,单击 数据摄取,然后单击右上角的 添加数据 按钮。
  2. 在此弹出窗口中,您将看到您可以连接的数据源以及其他添加数据的方法。由于 AutoMQ Kafka 完全兼容 Apache Kafka,在此处可以直接点击 Apache Kafka。
  3. 输入broker URL ,由于默认创建的 AutoMQ Kafka 没有开启TLS和身份验证,因此这里先关闭TLS和身份验证。
  4. 输入 AutoMQ Kafka 主题的名称,并指定“读取为”的数据格式。 我们目前支持JSON、AVRO和文本格式。
    1. 如果 AutoMQ Kafka 主题中的数据采用 JSON 格式,但架构可能会随着时间的推移而发生变化,我们建议您选择 Text。 这样,整个 JSON 文档将保存为字符串,即使架构发生变化,您也可以应用与 JSON 相关的函数来提取值。
    2. 如果您选择AVRO,则有一个“自动提取”选项。 默认情况下,此选项处于关闭状态,这意味着整条消息将另存为字符串。 如果您将其打开,则 AVRO 消息中的顶级属性将被放入不同的列中。 这对您更方便查询,但不支持模式进化。 当选择AVRO时,您还需要指定schema注册表的地址、API密钥和密钥。
  5. 在接下来的“预览”步骤中,我们将从您指定的 AutoMQ Kafka 数据源中向您展示至少一个事件。
  6. 默认情况下,您的新数据源将在 Timeplus 中创建一个新流。 给这个新流命名并验证列信息(列名和数据类型)。 您也可以将一列设置为事件时间列。 如果您不这样做,我们将使用摄取时间作为事件时间。 或者,您可以从下拉列表中选择一个现有的流。
  7. 在预览您的数据后,您可以给源提供一个名称和一个可选的描述,并审查配置。 单击“完成”后,您的流数据将立即在指定的流中可用。

AutoMQ Kafka源说明

请注意:

  1. 目前,我们支持 AutoMQ Kafka 主题中的消息采用 JSON 和 AVRO 格式
  2. 主题级别 JSON 属性将被转换为流列。 对于嵌套属性, 元素将被保存为 String 列,然后您可以用 JSON functions 之一来查询它们。
  3. JSON消息中的数值或布尔值类型将被转换为流中的对应类型。
  4. 日期时间或时间戳将被保存为字符串列。 您可以通过 to_time function将它们转换回 DateTime。