
Quick Start

Create a topic

kafka-topics --create --topic file-chunk-topic --partitions 1 --bootstrap-server localhost:9092
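
Optionally, confirm that the topic exists (same broker address as above):

kafka-topics --describe --topic file-chunk-topic --bootstrap-server localhost:9092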

Create directories to upload and download files

cd /tmp
rm -rf upload download
mkdir upload download

Start Confluent Platform

confluent local services start
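
Optionally, check that the services (including Kafka and Connect) are up before continuing:

confluent local services status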

Install the plugins

confluent hub install streamsend-io/file-chunk-sink:latest
confluent hub install streamsend-io/file-chunk-source:latest
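
If the Connect worker was already running when the plugins were installed, restart it so the new plugins are picked up. You can then check that both connector classes are visible (this assumes the Connect REST interface on its default port, 8083):

curl -s http://localhost:8083/connector-plugins | grep -i chunk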

Create uploader.json

{
  "name": "uploader",
  "config": {
    "topic": "file-chunk-topic",
    "connector.class": "com.github.streamsend.file.chunk.source.ChunkSourceConnector",
    "files.dir": "/tmp/upload",
    "input.file.pattern": ".*",
    "tasks.max": "1",
    "file.minimum.age.ms": "2000",
    "binary.chunk.size.bytes": "300000",
    "cleanup.policy.maintain.relative.path": "true",
    "input.path.walk.recursively": "true",
    "finished.file.retention.mins": "10",
    "topic.partitions": "1"
  }
}

Create downloader.json

{
  "name": "downloader",
  "config": {
    "topics": "file-chunk-topic",
    "connector.class": "com.github.streamsend.file.chunk.sink.ChunkSinkConnector",
    "tasks.max": "1",
    "files.dir": "/tmp/download",
    "binary.chunk.size.bytes": "300000",
    "auto.register.schemas": "false",
    "schema.ignore": "true",
    "schema.generation.enabled": "false",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "schema.compatibility": "NONE",
    "topic.partitions": "1"
  }
}


Start the connectors
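
One way to start them is through the Kafka Connect REST interface, posting the two JSON files created above (this assumes the Connect worker is listening on its default port, 8083):

curl -s -X POST -H "Content-Type: application/json" --data @uploader.json http://localhost:8083/connectors

curl -s -X POST -H "Content-Type: application/json" --data @downloader.json http://localhost:8083/connectors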

If restarting, clear contents of the /tmp/upload directory first.
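
For example:

rm -rf /tmp/upload/*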

Test a Pipeline

The uploader will chunk and stream any file in /tmp/upload that matches "input.file.pattern": ".*", using chunks of 300 KB.

Start with a small file:

cp /var/log/install.log /tmp/upload

Wait a few seconds for the file to complete processing.

Examine the contents of /tmp/download/merged to confirm that "install.log" has been consumed and merged.

Diff the file to confirm that it is identical to the uploaded file.
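
For example (this assumes the merged copy keeps its original file name directly under /tmp/download/merged; the exact path can vary with the relative-path settings):

ls -l /tmp/download/merged
diff /var/log/install.log /tmp/download/merged/install.log && echo "identical"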

To test a pipeline, set the source connector property "generate.test.files": "true". This generates a test file of between 0 and 2500 bytes every five seconds; each test file is detected, chunked, and streamed to the Downloader. When generating test files, binary.chunk.size.bytes is automatically reset to "500".
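
For example, add this property to the config block of uploader.json and re-create the uploader connector:

"generate.test.files": "true"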