Entrepreneurship in Data Integration - Technology

Preface

This is the second article in a series on entrepreneurship in the field of data integration. The main focus is on technical considerations and insights gained while implementing data integration software. The content is organized into several topics.

Protocol design for data integration software.

In the data integration software I implemented in the past, I did not think enough about protocol design, which led to a lack of overall unity at the kernel level. A well-designed core protocol is very beneficial for developing a data integration kernel. For example, the protocol can specify which core objects, capabilities, and standards the kernel must implement, which helps to form an extensible, efficient, and reasonable abstraction; it also matters for team collaboration. Protocol design forces us to think backwards about what must be abstracted in advance when implementing data integration software. The Airbyte protocol gave me important inspiration here: by studying its design, we can learn which capabilities must be abstracted and defined up front. Airbyte defines its protocol as a series of standard components and their interfaces that collaborate through inter-process communication to implement an ELT pipeline.
[Figure: components of the Airbyte protocol]
Airbyte covers the following concepts in its protocol, which are a valuable reference when we later design our own data integration protocol (a minimal interface sketch follows the list):

  • data store: The unified abstraction of data sources, including APIs, JDBC databases, files, and so on.
  • state/checkpoint: State management, in which the abstraction of checkpoints is particularly important. Checkpoints go by many names, such as offset, position, and checkpoint.
  • source/destination: Abstractions of the kernel’s source and destination. As the names suggest, the source extracts data, while the destination loads data into the data store.
  • namespace: Used for isolating data. For example, in MySQL it is called a database, while in PostgreSQL it is called a schema.
  • message: Abstraction of the top-level messages exchanged between components.
  • data types
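
To make the idea concrete, here is a minimal sketch in Java of what such protocol-level abstractions could look like in our own kernel. The names (Message, SourceConnector, DestinationConnector) are hypothetical and are not Airbyte’s actual classes; they only illustrate the kind of objects a protocol should pin down.

```java
import java.util.Iterator;
import java.util.Map;

// Hypothetical top-level message kinds, mirroring the idea of protocol messages.
enum MessageType { RECORD, STATE, SCHEMA, LOG }

// A single protocol message flowing between the source and destination processes.
record Message(MessageType type,
               String namespace,          // e.g. database in MySQL, schema in PostgreSQL
               String stream,             // table or collection name
               Map<String, Object> data,  // row payload for RECORD messages
               Map<String, Object> state  // checkpoint/offset payload for STATE messages
) {}

// The source side: reads from a data store and emits messages, resuming from a checkpoint.
interface SourceConnector {
    void check(Map<String, String> config);   // connectivity test
    Iterator<Message> read(Map<String, String> config, Map<String, Object> lastState);
}

// The destination side: consumes messages and loads them into the target data store.
interface DestinationConnector {
    void check(Map<String, String> config);
    void write(Iterator<Message> messages);
}
```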

Whether to support transactions or not.

I think data integration software can offer transaction support, but it should not be the default: even when transaction support exists, it should be an opt-in option rather than the default configuration. The main reasons are:

  • There are almost no scenarios: Only large companies like Alibaba, with multi-site high-availability deployments, need to read and write both source and target databases while keeping the data consistent. Among the real customers we have worked with, we have not found any case with a strong demand for transactions.
  • Impact on performance and resource usage: The most critical cost of supporting transactions is that you must hold an entire transaction until it ends before writing to the other side. With large transactions or high concurrency, holding transactions consumes a lot of resources. To preserve ordering, changes within the same transaction must wait until the transaction ends before they can be written.
  • Inconvenient for batch-processing optimization and memory management: When implementing transaction support, it is tempting to buffer messages in memory at transaction granularity, simply because it is easy. The consequence is that it becomes hard to optimize performance through batching and memory management. Consider the ring buffer pool illustrated below, where a transactional message is encapsulated as an Event object. The 16 slots can hold 16 events, but you have no idea how many “rows” of changes they contain: one event represents one transaction, and a single transaction may contain many changes, so you cannot tell how many row changes sit in the buffer pool or how much memory they occupy. Optimizing throughput via batching becomes very difficult in this situation. If performance is insufficient and you simply increase the slot count, the program may go straight to OOM, because the memory per slot is unknown and changes dynamically with transaction size. Optimizing at row granularity, by contrast, is relatively easy. We can estimate the average row size from the table’s database statistics. If every row is roughly 1 KB and each row occupies one slot, we immediately know the ring buffer holds at most 16 row changes, roughly 16 KB in total. We can let users tune the row batch size themselves or let the program control it; the total memory overhead is predictable, and batch-based optimization becomes straightforward, as the sketch below illustrates.

[Figure: a 16-slot ring buffer pool where each slot holds one transaction event]
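
The memory argument can be made concrete with a rough calculation. The sketch below assumes row-granularity events and an average row size estimated from table statistics; the names (RowEvent, estimatedRowBytes) are purely illustrative.

```java
// Row-granularity buffering: each slot holds one row change, so the memory
// footprint of the whole buffer can be bounded in advance.
final class RowEvent {
    String table;
    Object[] values;   // column values of a single changed row
}

final class BufferSizing {
    public static void main(String[] args) {
        int slots = 16;                  // ring buffer capacity in slots
        long estimatedRowBytes = 1024;   // average row size from table statistics (assumed 1 KB)

        // With one row per slot, the worst-case buffer memory is simply slots * rowSize.
        long worstCaseBytes = slots * estimatedRowBytes;   // 16 KB here
        System.out.printf("ring buffer holds at most %d rows, ~%d KB%n",
                slots, worstCaseBytes / 1024);

        // With transaction-granularity slots no such bound exists:
        // one slot may hold one row or one million rows.
    }
}
```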

Consider using back pressure to improve memory utilization.

I have used the Disruptor in several data integration systems I implemented in the past. The benefits are obvious: it is cache-line friendly, lock-free, and concurrent. However, its drawback is equally apparent: it does not support backpressure, which often becomes the culprit behind memory consumption in data integration software. The reason is simple: most of the time, the performance bottleneck of a data pipeline is writing to the target data source. When writing to the target is the bottleneck, messages that cannot be written in time can only sit in the ring buffer, wasting memory. Most data integration software is not CPU-bound, so there is little reason to use the Disruptor, whose lack of backpressure can waste memory; it is better suited to CPU-bound, non-blocking scenarios.
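
As a contrast, a simple bounded blocking queue already gives natural backpressure: when the writer side falls behind, the reader blocks on put() instead of piling data up in memory. A minimal sketch, with illustrative names and a deliberately slowed-down writer:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackpressureDemo {
    public static void main(String[] args) {
        // The capacity bounds the number of in-flight rows.
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(1024);

        Thread reader = new Thread(() -> {
            try {
                for (int i = 0; i < 1_000_000; i++) {
                    buffer.put("row-" + i);   // blocks when the buffer is full -> backpressure
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread writer = new Thread(() -> {
            try {
                while (true) {
                    buffer.take();      // drain one row
                    Thread.sleep(1);    // simulate a slow write to the target data source
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        reader.start();
        writer.start();
    }
}
```

The memory ceiling is determined by the queue capacity and the per-row size, no matter how slow the destination is.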

Object mapping implementation trade-offs

Data integration software provides end-to-end full refresh and incremental sync capabilities, and users often need the ability to map data objects. For example, if the source database is named A, the mapped database on the target side might be named B. For relational databases, this mapping can be applied at four levels: database, schema, table, and column.

There are actually two approaches here:

  • Based on user-defined configuration: The advantage is strong, flexible functionality that can support mapping configuration at every level. The disadvantages are a more complex implementation and the size of the mapping configuration itself: when migrating and synchronizing thousands of tables, shipping the configuration over the network becomes relatively expensive. With custom configuration, a bottom-up mapping process can be used to hide the complexity caused by heterogeneous data sources having different object hierarchies (such as MySQL’s two-layer db/table structure versus PostgreSQL’s db/schema/table).
  • Based on rules: Rule-based mapping configuration takes far less space and is much simpler to implement.

From the actual user scenarios I have encountered, a rule-based implementation is better. Few users need fully customizable mapping; providing a set of rules is enough to meet practical needs while keeping the kernel simple and efficient. In the past, implementing fully user-customizable mapping at the kernel level was quite complex, and because every data link depended on that kernel, it effectively became a “monolithic kernel implementation” that was hard to change. In hindsight, that approach was not very reasonable.
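
A rule-based mapper can stay very small. The sketch below applies simple rename rules (lowercasing and a target-side prefix) to object names; the rule types shown are hypothetical examples rather than a fixed design.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// A mapping rule is just a pure function over an object name.
interface MappingRule extends UnaryOperator<String> {}

final class RuleBasedMapper {
    private final List<MappingRule> rules;

    RuleBasedMapper(List<MappingRule> rules) {
        this.rules = rules;
    }

    // Apply every rule in order to derive the target-side name.
    String map(String sourceName) {
        String result = sourceName;
        for (MappingRule rule : rules) {
            result = rule.apply(result);
        }
        return result;
    }

    public static void main(String[] args) {
        RuleBasedMapper mapper = new RuleBasedMapper(List.<MappingRule>of(
                name -> name.toLowerCase(),   // normalize case
                name -> "ods_" + name         // add a target-side prefix
        ));
        // A source table "Orders" maps to "ods_orders" on the target.
        System.out.println(mapper.map("Orders"));
    }
}
```

Because the whole configuration is just a short list of rules, it stays cheap to ship over the network even when thousands of tables are involved.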

Testability should be considered from the beginning of the design.

Testability for data integration software has two main dimensions:

  • Kernel Testability: The testability of the data integration kernel needs to be considered at the design stage, which in turn pushes us toward a loosely coupled kernel design. This is easily overlooked in practice because we are accustomed to writing code first, and retrofitting testability later is much harder. Kernel testability matters greatly because there are so many data sources, including various RDBMSs, files, storage systems, and APIs; without it, building robust connectors and data pipelines is difficult (a minimal sketch follows this list).
  • End-to-End Integration Testing: As integration tests, these focus on functionality, data accuracy, and performance from the perspective of the entire product.
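
As an illustration of the first point, the sketch below shows a kernel pipeline that only depends on small reader/writer interfaces, so a test can swap in an in-memory fake instead of a real database. All names here (RowReader, RowWriter, Pipeline) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// The kernel depends only on narrow interfaces, so tests can plug in fakes.
interface RowReader { Iterator<String> read(); }
interface RowWriter { void write(String row); }

final class Pipeline {
    static void run(RowReader reader, RowWriter writer) {
        reader.read().forEachRemaining(writer::write);
    }
}

// A "test" without any real database: an in-memory fake source and sink.
final class PipelineTest {
    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        RowReader fakeSource = () -> List.of("r1", "r2", "r3").iterator();

        Pipeline.run(fakeSource, sink::add);

        // Assert that every row produced by the fake source reached the sink, in order.
        if (!sink.equals(List.of("r1", "r2", "r3"))) {
            throw new AssertionError("pipeline lost or reordered rows: " + sink);
        }
        System.out.println("pipeline copies rows faithfully");
    }
}
```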

Real-time synchronization of DDL changes

In actual usage scenarios, customers far more often want to subscribe to newly added tables than to have the DDL statements of already subscribed tables synced to the target directly. In online real-time incremental sync, users prefer to control the rhythm of DDL execution themselves for better controllability and safety. This is reasonable: DDL is a risky operation, and from a DBA’s perspective it is not appropriate to leave it to data integration software to execute automatically.

Real-time synchronization of DDL is actually a fairly heavy feature, because supporting it requires:

  • DDL parsing
  • SQL rewriting into the target data source’s dialect
  • Updating the data integration software’s internal schema metadata (usually needed to support offset resets and data replay)

There are currently two mainstream implementation options:

  • Druid Parser: A Chinese open-source project with an active community, continuously maintained by its author and used inside Alibaba. Its performance is said to be better than ANTLR’s, but its SQL coverage may not be as complete as ANTLR’s.
  • ANTLR: Its advantage is that complete g4 grammar files exist for the syntax of many databases, so its coverage is very good. The disadvantage is that its performance may need optimization compared with Druid Parser.

Since DDL is a very infrequent operation in data integration, some performance loss is acceptable. From my perspective, ANTLR is the better choice in terms of controllability and syntax coverage.
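
A minimal sketch of the parsing step with ANTLR is shown below. MySqlLexer and MySqlParser stand for the classes ANTLR generates from the community MySQL .g4 grammar; the actual class and entry-rule names depend on the grammar version, so treat them as assumptions.

```java
import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.tree.ParseTree;

public class DdlParseSketch {
    public static void main(String[] args) {
        String ddl = "ALTER TABLE orders ADD COLUMN note VARCHAR(255)";

        // MySqlLexer / MySqlParser are ANTLR-generated from the MySQL grammar (assumed names).
        MySqlLexer lexer = new MySqlLexer(CharStreams.fromString(ddl));
        MySqlParser parser = new MySqlParser(new CommonTokenStream(lexer));

        // The entry rule ("root" here) may differ by grammar version.
        ParseTree tree = parser.root();

        // From this tree we would extract the table, column, and type, rewrite the
        // statement in the target dialect, and update the internal schema metadata.
        System.out.println(tree.toStringTree(parser));
    }
}
```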

Decoupling of source and target

Decoupling the source and target implementations is good practice in data integration software. If source and target are highly coupled, kernel complexity grows multiplicatively as data sources are added: N sources and M destinations require N×M coupled combinations, whereas introducing an intermediate layer reduces this to N+M implementations. The following figure shows the change in the number of source-to-destination relationships before and after decoupling; with an intermediate layer, sources are decoupled from destinations and the whole kernel stays cleaner.
[Figure: source-to-destination relationships before and after decoupling]

Processing of data schema

Schema information is important metadata. In data integration software it is mainly used for:

  • Schema migration between heterogeneous data sources: This requires obtaining the schema information of both the source and the destination, so that SQL can be rewritten into new statements applicable to the target side.
  • Value processing based on type information: Reading and writing values across heterogeneous data sources requires special handling driven by schema type information.
  • Performance optimization based on schema information: Some optimizations can be built on schema metadata such as primary keys and unique keys. For example, a concurrency model that partitions by primary key may run into unique-key conflicts; data can be regrouped according to pk and uk information to avoid conflicts and improve performance, as the sketch below illustrates.
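
The pk/uk point can be illustrated with a small routing sketch: rows are partitioned by primary key for parallel apply, but rows sharing a unique-key value are forced into the same partition so they are applied in order and do not conflict. The types and names here are purely illustrative.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ChangeRow {
    final String pk;   // primary key value
    final String uk;   // unique key value (may be shared by rows with different pks)
    ChangeRow(String pk, String uk) { this.pk = pk; this.uk = uk; }
}

final class ConflictAwareRouter {
    // Remembers which partition each unique-key value was routed to.
    private final Map<String, Integer> ukToPartition = new HashMap<>();
    private final int partitions;

    ConflictAwareRouter(int partitions) { this.partitions = partitions; }

    int route(ChangeRow row) {
        // Rows that share a unique key must go to the same partition, otherwise two
        // parallel workers could apply them out of order and hit a uk conflict.
        return ukToPartition.computeIfAbsent(row.uk,
                key -> Math.floorMod(row.pk.hashCode(), partitions));
    }

    public static void main(String[] args) {
        ConflictAwareRouter router = new ConflictAwareRouter(4);
        List<ChangeRow> rows = List.of(
                new ChangeRow("1", "a@example.com"),
                new ChangeRow("2", "a@example.com"),   // same uk as pk=1 -> same partition
                new ChangeRow("3", "b@example.com"));
        for (ChangeRow r : rows) {
            System.out.println("pk=" + r.pk + " -> partition " + router.route(r));
        }
    }
}
```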

Some insights on managing schema metadata:

  • Decoupling from the kernel and providing an independent service: Schema management is well suited to being an independent service and should not be coupled into the data integration kernel. The kernel, as the worker process responsible for migration and incremental sync, restarts frequently, which is even more common in cloud-native environments. Fetching schemas is itself a heavy operation, especially when many databases and tables are involved, so re-initializing schemas on every kernel restart greatly slows startup. Decoupling schema management into its own service keeps the architecture clearer and makes it easier to share schema data and optimize the schema service specifically.
  • Do not use the JDBC metadata class (DatabaseMetaData) to obtain schema information: Relying on this class is unreliable because it depends on the JDBC driver’s implementation. Different database vendors implement the JDBC standard to different degrees, and some drivers may not implement the metadata interfaces at all. The correct approach is to obtain metadata uniformly from the database’s system tables, such as information_schema in MySQL (see the sketch below).
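
As an example of the second point, the sketch below reads column metadata directly from MySQL’s information_schema over plain JDBC. The connection settings and the database/table names are placeholders.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class SchemaFetchSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings.
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://localhost:3306/information_schema", "user", "password");
             PreparedStatement ps = conn.prepareStatement(
                     "SELECT column_name, data_type, column_key " +
                     "FROM information_schema.columns " +
                     "WHERE table_schema = ? AND table_name = ? " +
                     "ORDER BY ordinal_position")) {
            ps.setString(1, "mydb");
            ps.setString(2, "orders");
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    System.out.printf("%s %s key=%s%n",
                            rs.getString("column_name"),
                            rs.getString("data_type"),
                            rs.getString("column_key"));
                }
            }
        }
    }
}
```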

Designing an intermediate type system.

For data integration software, it is crucial to design a type system that bridges heterogeneous source and destination data sources. The benefits include:

  • Decoupling reads and writes between source and destination.
  • Providing a unified type system at the kernel level enables standardized abstraction, resulting in a more streamlined and efficient code architecture.

With an intermediate type system in place, a unified kernel read-write layer can be implemented on top of it, decoupling the differences between heterogeneous data sources from the kernel.
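
A minimal sketch of what such an intermediate type system might look like is shown below; the type names are illustrative. Each connector maps its native types into the shared set on read and back out of it on write, so the kernel never needs to know about vendor-specific types.

```java
// A small, connector-neutral type set that the kernel understands.
enum CanonicalType { BOOLEAN, INT64, FLOAT64, DECIMAL, STRING, BYTES, DATE, TIMESTAMP }

// A value paired with its canonical type, as it travels between source and destination.
record CanonicalValue(CanonicalType type, Object value) {}

// Each connector implements the mapping between its native types and the canonical set.
interface TypeMapper {
    CanonicalValue toCanonical(String nativeType, Object nativeValue);
    Object fromCanonical(CanonicalValue value, String targetNativeType);
}
```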

Data Integration and Stream Computing

At the beginning of the year, reading an article on “Rethinking Stream Processing and Streaming Databases” prompted me to re-examine what data integration software can learn from stream computing engines and streaming databases. From a certain perspective they are similar: both process streaming data and resemble ETL/ELT pipelines. However, data integration focuses mainly on reading and writing; it is essentially a pipeline for moving data with little or no computation, while a stream computing system is essentially a compute engine.

My personal judgment is that data integration software does not need to introduce a heavy computing layer at the kernel level. Compute engines are designed for compute-intensive workloads, which data integration software, given its responsibilities, clearly is not: its core job is reading and writing heterogeneous data sources and serving as a unified pipeline. This does not mean data integration involves no computation at all, only that the kernel does not need a heavy computing layer. When computation does appear, it is usually light and event-driven, such as trimming and cleaning data. Specialized computation should be left to specialized compute engines.

In the future, stream computing engines will own the scenarios that demand extremely low latency and event-driven processing, while new kinds of databases, with their growing computational power and natural ease of use, will compete for the remaining heavy-computation scenarios. Data integration software itself plays the role of a bridge between heterogeneous data sources. Although some overlap among these systems will remain, they will work together in a coordinated way, each within certain boundaries.

Data Integration and Messaging System

The messaging systems discussed here are mainly storage-based systems such as Kafka and RocketMQ. Messaging systems and data integration software have much in common: both serve as pipelines for data streams. The core difference is that a messaging system is more like a universal bus responsible for high-throughput data distribution, while a data integration pipeline is oriented toward a specific domain. The inspiration that messaging systems offer mainly comes from storage-based designs like Kafka’s, which makes me wonder: what would happen if data integration software introduced a storage layer?

From my experience accumulated in this field in recent years, I think it is worthwhile for data integration software to introduce a loosely coupled storage layer. Data integration software is usually designed as a stateless pipeline and does not persist data itself. For such software, the biggest problem with completely abandoning storage is **losing control of change data**. Typical problems this causes include:

  • Lost change logs: The most common problem in real deployments is that the change log on the user’s side has already been purged, blocking synchronization. Once this happens, recovering the data basically requires a fresh full sync followed by incremental sync.
  • The stability of the data pipeline is uncontrollable: Lag is an unavoidable problem in CDC sync scenarios, and it is often constrained by resources and by the read/write capabilities of the source or destination databases. Because the change data is not under its control, whether the pipeline keeps working stably depends on the database itself; the data integration software cannot intervene in how the database manages and retains its change logs.

Overall, introducing storage is meaningful for data integration software. We can design a storage layer, loosely coupled with the kernel, to store change data, which brings many benefits, including:

  • Control over change data: The entire change data flow is under the control of the data integration software itself. Users can decide the retention strategy, and the CDC capability is no longer affected by database-internal mechanisms, which yields a simpler and more stable pipeline.
  • Shareable change data: Without a storage layer at the data integration level, change data cannot be shared. If a database’s change log has many downstream consumers, the dump-and-parse work is repeated for each of them, wasting performance and network bandwidth.
  • Targeted optimization: A storage layer makes further performance optimizations possible for the data integration software, for example around shared-data caching.
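
A loosely coupled storage layer can hide behind a small interface, so the kernel itself stays stateless while the storage service handles retention and sharing. The sketch below uses illustrative names only.

```java
import java.util.Iterator;
import java.util.Map;

// One captured change event together with the position it was read at.
record ChangeEvent(long offset, String table, Map<String, Object> row) {}

// A change-log store the kernel talks to instead of re-reading the source database's log.
// Several downstream pipelines can replay the same stored stream from different offsets.
interface ChangeLogStore {
    long append(ChangeEvent event);               // returns the offset the event was stored at
    Iterator<ChangeEvent> readFrom(long offset);  // replay for recovery or for a new downstream
    void retainUntil(long offset);                // user-controlled retention policy
}
```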