A collection of custom Kafka Connect components providing additional connectors, converters, and Single Message Transformations (SMTs) for Apache Kafka.
This repository contains custom add-ons that extend Kafka Connect with specialized functionality for MongoDB integration, Oracle data type conversion, and JSON message transformation.
Components:
- Connectors - Custom sink connectors for external systems
- Converters - Data type converters for specialized formats
- SMTs - Single Message Transformations for record manipulation
Class: org.hifly.kafka.mongo.sink.CustomMongoSinkConnector
A custom MongoDB sink connector with advanced features:
- Upsert Operations: Insert or update documents based on document ID
- Delete Support: Handle tombstone messages (null values) for document deletion
- Array Field Management: Append elements to array fields within documents
- Deduplication: Automatic deduplication of array elements based on configurable key fields
- Field Merging: Merges existing and incoming document fields during updates
Use Cases:
- Maintaining aggregated data in MongoDB with unique array elements
- CDC (Change Data Capture) scenarios requiring upserts and deletes
- Event sourcing with array-based event logs
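As a rough sketch of the upsert-with-dedup behavior (hypothetical documents; field names follow the configuration example further below, where `doc.array.field.name` is `events` and `doc.array.field.dedup.keys` is `eventId,timestamp`):

Existing document in MongoDB:

```json
{ "userId": "u-1", "events": [ { "eventId": "e1", "timestamp": 100 } ] }
```

Incoming record value:

```json
{ "userId": "u-1", "events": [ { "eventId": "e1", "timestamp": 100 }, { "eventId": "e2", "timestamp": 200 } ] }
```

Resulting document (the element matching `e1` on the dedup keys is not appended twice):

```json
{ "userId": "u-1", "events": [ { "eventId": "e1", "timestamp": 100 }, { "eventId": "e2", "timestamp": 200 } ] }
```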
Class: org.hifly.kafka.OracleRawToBsonKeyConverter
Converts byte arrays containing Oracle RAW data to BSON format, enabling seamless integration between Oracle databases and MongoDB through Kafka Connect.
Use Cases:
- Oracle CDC to MongoDB replication
- Converting Oracle RAW keys to BSON ObjectIds
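As an illustration of the key mapping (hypothetical values; a BSON ObjectId is 12 bytes, so this assumes a 12-byte RAW key, and handling of other lengths depends on the implementation):

```json
{ "oracleRawKeyHex": "507F1F77BCF86CD799439011", "bsonObjectId": "507f1f77bcf86cd799439011" }
```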
Class: org.hifly.kafka.ByteArrayAndStringConverter
A pass-through converter supporting both byte array and string schema types, providing flexible data handling for keys and values.
Use Cases:
- Mixed data type handling in heterogeneous systems
- Header conversion scenarios
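For example, the converter can be referenced in a connector configuration; `header.converter` is shown here, but the same class name can be used for `key.converter` or `value.converter` (a sketch, with placeholder names):

```json
{
  "name": "my-connector",
  "config": {
    "connector.class": "...",
    "header.converter": "org.hifly.kafka.ByteArrayAndStringConverter"
  }
}
```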
Class: org.hifly.kafka.smt.JsonKeyToValue
Adds the message key to the message value as a new field, useful for denormalization and data enrichment.
Configuration:
valuename: Field name to add the key to
idkey: Name of the ID field to compute
Use Cases:
- Embedding record keys in message bodies
- Simplifying downstream processing by denormalizing data
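An illustrative before/after (hypothetical records, using the `valuename=recordId` and `idkey=id` settings from the configuration example below; exact handling of the key depends on the implementation):

Before:

```json
{ "key": { "id": "42" }, "value": { "amount": 10 } }
```

After:

```json
{ "key": { "id": "42" }, "value": { "amount": 10, "recordId": "42" } }
```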
Class: org.hifly.kafka.smt.ExplodeJsonString
Extracts JSON content from a string field and promotes nested JSON fields to top-level fields in the record, creating a proper Struct representation.
Configuration:
valuename: Name of the JSON field to extract and explode
Use Cases:
- Flattening nested JSON structures
- Converting JSON strings to structured data for downstream processing
- Preparing data for systems that don't handle nested JSON well
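An illustrative before/after with `valuename=jsonData` (hypothetical record; whether the original string field is retained depends on the implementation):

Before:

```json
{ "jsonData": "{\"orderId\": \"o-1\", \"total\": 99.5}" }
```

After:

```json
{ "orderId": "o-1", "total": 99.5 }
```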
- Java: JDK 11 or later (main project)
- Maven: 3.6.0 or later
- Apache Kafka: 3.5.0+ (for connectors and SMTs)
- MongoDB: 4.x or later (for CustomMongoSinkConnector)
- Oracle JDBC Driver: 19.3 (for OracleRawToBsonKeyConverter)
The main project contains the converters and SMTs. Note that it requires the Oracle JDBC driver to be installed locally.
Download the Oracle JDBC driver (ojdbc10.jar) and install it to your local Maven repository:
```bash
mvn install:install-file \
  -Dfile=ojdbc10.jar \
  -DgroupId=com.oracle \
  -DartifactId=ojdbc10 \
  -Dversion=19.3 \
  -Dpackaging=jar
```

```bash
# Compile the project
mvn clean compile

# Create the package (shaded JAR with dependencies)
mvn clean package
```

The output JAR will be located at: `target/kafka-connect-extensions-1.9.1.jar`
```bash
mvn clean test
```

The MongoDB custom connector is a separate module with its own build configuration.
```bash
cd mongo-custom-connector

# Compile the project
mvn clean compile

# Create distribution package
mvn clean compile assembly:single
```

The output will be located at: `mongo-custom-connector/target/mongo-custom-sink-0.1.0.zip`
```bash
cd mongo-custom-connector
mvn clean test
```

- Build the main project JAR as described above
- Copy the JAR to your Kafka Connect plugin path:

```bash
cp target/kafka-connect-extensions-1.9.1.jar $KAFKA_CONNECT_PLUGIN_PATH
```

- Restart Kafka Connect workers
- Build the connector package as described above
- Extract the distribution:

```bash
unzip mongo-custom-connector/target/mongo-custom-sink-0.1.0.zip -d $KAFKA_CONNECT_PLUGINS/
```

- Restart Kafka Connect workers
JsonKeyToValue SMT:

```json
{
  "name": "my-connector",
  "config": {
    "connector.class": "...",
    "transforms": "addKey",
    "transforms.addKey.type": "org.hifly.kafka.smt.JsonKeyToValue",
    "transforms.addKey.valuename": "recordId",
    "transforms.addKey.idkey": "id"
  }
}
```

ExplodeJsonString SMT:

```json
{
  "name": "my-connector",
  "config": {
    "connector.class": "...",
    "transforms": "explode",
    "transforms.explode.type": "org.hifly.kafka.smt.ExplodeJsonString",
    "transforms.explode.valuename": "jsonData"
  }
}
```

CustomMongoSinkConnector:

```json
{
  "name": "mongo-sink-custom",
  "config": {
    "connector.class": "org.hifly.kafka.mongo.sink.CustomMongoSinkConnector",
    "tasks.max": "1",
    "topics": "your-topic",
    "connection.uri": "mongodb://localhost:27017",
    "database": "your-database",
    "collection": "your-collection",
    "doc.array.field.name": "events",
    "doc.id.name": "userId",
    "doc.array.field.dedup.keys": "eventId,timestamp",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
```

OracleRawToBsonKeyConverter:

```json
{
  "name": "oracle-to-mongo",
  "config": {
    "connector.class": "...",
    "key.converter": "org.hifly.kafka.OracleRawToBsonKeyConverter"
  }
}
```

The converters and SMTs are designed to run within Kafka Connect. To test them:
- Build the project
- Configure a Kafka Connect connector with the desired component
- Deploy to Kafka Connect (Standalone or Distributed mode)
Standalone Mode:
```bash
$KAFKA_HOME/bin/connect-standalone.sh \
  config/connect-standalone.properties \
  config/your-connector.properties
```

Distributed Mode:
```bash
# Start Kafka Connect
$KAFKA_HOME/bin/connect-distributed.sh config/connect-distributed.properties

# Deploy connector via REST API
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @your-connector-config.json
```

The MongoDB connector can be tested using the unit tests included in the project, or deployed to a running Kafka Connect cluster as shown above.
Contributions are welcome! Please feel free to submit issues or pull requests.
This project is available under the MIT License.
For issues, questions, or contributions, please visit the GitHub repository.