
Kafka


Connection

Basic Functions

Function | Description
Schema Migration

If the mapped Topic does not exist in the Target, BladePipe creates it automatically and lets you set the number of partitions.

Incremental Data Sync

Subscribe to messages from the source Topic.

Subscription Modification

Add, delete, or modify the subscribed topics. For more information, see Modify Subscription.

Position Resetting

Reset positions by timestamp to consume the data from a past period again.
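
As an illustration of what resetting a position by timestamp means, the sketch below finds the earliest offset whose record timestamp is greater than or equal to a target time, which is the lookup rule Kafka itself uses for timestamp-based seeks. It is not BladePipe's implementation: the in-memory `log` list and the function name `offset_for_timestamp` are hypothetical, and timestamps are assumed to be non-decreasing.

```python
from bisect import bisect_left
from typing import List, Optional


def offset_for_timestamp(timestamps_ms: List[int], target_ms: int) -> Optional[int]:
    """Return the earliest offset whose timestamp is >= target_ms, or None.

    timestamps_ms[i] is the timestamp of the record at offset i.
    Assumption: the list is sorted in non-decreasing order.
    """
    idx = bisect_left(timestamps_ms, target_ms)
    return idx if idx < len(timestamps_ms) else None


# Hypothetical partition log: offset -> record timestamp in milliseconds.
log = [1000, 2000, 2000, 3500, 5000]

print(offset_for_timestamp(log, 2000))  # 1
print(offset_for_timestamp(log, 3000))  # 3
print(offset_for_timestamp(log, 9000))  # None
```

Consumption restarted from the returned offset replays every record from the chosen time onward; a result of `None` means no record is that recent.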

Advanced Functions

Function | Description
Message Format

The following message formats are supported. See Message Format.

  • CloudCanal format
  • AlibabaCanal format

Limits

Limit | Description
Creating Tables in the Target in Advance

Only automatic Topic creation is supported for messages.

Raw Message Format

Raw message replication is supported only from Kafka to Kafka, and Raw Message Format must be selected at both the Source and the Target.


Source

Prerequisites

Prerequisite | Description
Port Preparation

Allow the migration and sync node (Worker) to connect to the Kafka ports.

Parameters

Parameter | Description
schemaFormat

MQ message format. For more information, see Message Format.

consumerGroupId

Kafka consumer group ID.

consumeParallel

Degree of parallelism when consuming Kafka topics.

sessionTimeoutMs

Kafka session timeout in milliseconds.

maxPollRecords

Maximum number of messages fetched in one poll from Kafka.

dbHeartbeatIntervalSec

Interval, in seconds, for initiating heartbeats on the source database.

dbHeartbeatToleranceStep

The threshold for the gap between the latest offset and the current offset. If the actual gap is larger than the threshold, BladePipe won't send heartbeat messages.
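
The dbHeartbeatToleranceStep rule above can be read as a simple comparison. The following is a minimal sketch of that reading, not BladePipe's actual code; the function name `should_send_heartbeat` and its arguments are hypothetical.

```python
def should_send_heartbeat(latest_offset: int, current_offset: int,
                          tolerance_step: int) -> bool:
    """Return True when the consumer is close enough to the head of the topic.

    The gap is how far the current offset lags behind the latest offset.
    A heartbeat is skipped only when the gap is larger than the threshold.
    """
    gap = latest_offset - current_offset
    return gap <= tolerance_step


print(should_send_heartbeat(latest_offset=1050, current_offset=1000, tolerance_step=100))  # True
print(should_send_heartbeat(latest_offset=5000, current_offset=1000, tolerance_step=100))  # False
```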

Tips: To modify the general parameters, see General Parameters and Functions.


Target

Prerequisites

Prerequisite | Description
Port Preparation

Allow the migration and sync node (Worker) to connect to the Kafka ports.

Parameters

Parameter | Description
schemaFormat

Message format. For more information, see Message Format.

batchWriteSize

The maximum data size of a single message. If the size exceeds the limit, the message will be split.

defaultTopic

Messages with no corresponding topic (for example, after a new table is added) are sent to this topic.

ddlTopic

A topic used specifically to receive DDL events. If it is empty, DDL events are sent to partition 0 of the corresponding topic.

compressionType

Kafka compression.type parameter, which sets the compression algorithm. Supported values: GZIP, SNAPPY, LZ4, and ZSTD.

batchSize

Kafka batch.size parameter.

acks

Kafka acks parameter. By default, it is all.

maxRequestBytes

Kafka max.request.size parameter.

lingerMs

Kafka linger.ms parameter. By default, it is 1.

envelopSchemaInclude

Whether the message body contains schema information. Takes effect when schemaFormat is set to DEBEZIUM_ENVELOP_JSON_FOR_MQ.
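
Several Target parameters correspond directly to Kafka producer properties. The sketch below collects that correspondence and the two documented defaults (acks is all, lingerMs is 1) into a plain dictionary. The helper `to_producer_properties` is hypothetical, and lowercasing the compression name is an assumption made here because Kafka's compression.type expects lowercase values such as `gzip`.

```python
from typing import Any, Dict

# Parameter name in this table -> Kafka producer property it maps to.
PARAM_TO_KAFKA_PROPERTY = {
    "compressionType": "compression.type",
    "batchSize": "batch.size",
    "acks": "acks",
    "maxRequestBytes": "max.request.size",
    "lingerMs": "linger.ms",
}

# Defaults stated in the parameter descriptions.
DOCUMENTED_DEFAULTS = {"acks": "all", "lingerMs": 1}


def to_producer_properties(params: Dict[str, Any]) -> Dict[str, Any]:
    """Translate Target parameters into Kafka producer property names."""
    merged = {**DOCUMENTED_DEFAULTS, **params}
    props: Dict[str, Any] = {}
    for name, value in merged.items():
        if name not in PARAM_TO_KAFKA_PROPERTY:
            continue  # e.g. defaultTopic or ddlTopic: not a producer property
        if name == "compressionType":
            value = str(value).lower()  # assumption: GZIP -> gzip for Kafka
        props[PARAM_TO_KAFKA_PROPERTY[name]] = value
    return props


print(to_producer_properties({"compressionType": "ZSTD", "batchSize": 16384}))
```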

Tips: To modify the general parameters, see General Parameters and Functions.
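
The defaultTopic and ddlTopic descriptions above imply a routing rule, which can be sketched as follows. This is an illustration under stated assumptions rather than BladePipe's implementation: the function `route`, the `topic_mapping` dictionary, and the use of `None` to mean "partition not specified" are all hypothetical.

```python
from typing import Dict, Optional, Tuple


def route(table: str, is_ddl: bool, topic_mapping: Dict[str, str],
          default_topic: str, ddl_topic: str = "") -> Tuple[str, Optional[int]]:
    """Return (topic, partition) for an event; partition None means unspecified."""
    # Events with no mapped topic (e.g. a newly added table) fall back to defaultTopic.
    topic = topic_mapping.get(table, default_topic)
    if is_ddl:
        if ddl_topic:
            return ddl_topic, None  # dedicated DDL topic is configured
        return topic, 0             # empty ddlTopic: partition 0 of the corresponding topic
    return topic, None


mapping = {"orders": "db.orders"}
print(route("orders", False, mapping, "fallback"))      # ('db.orders', None)
print(route("new_table", False, mapping, "fallback"))   # ('fallback', None)
print(route("orders", True, mapping, "fallback"))       # ('db.orders', 0)
print(route("orders", True, mapping, "fallback", "ddl.events"))  # ('ddl.events', None)
```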
