Kafka JDBC Source Connector for Oracle – Quick Start Guide
The operating system used in this example is CentOS 7 with an Oracle 12c database.
Introduction
This is a beginner's hands-on tutorial with detailed instructions on how to implement a data stream from an Oracle database to Apache Kafka. You will also find links to useful resources and some tips that will help you avoid the pitfalls I experienced. For simplicity, Kafka is installed on a single node with no data replication or distribution. A Kafka Connect worker can run in distributed or standalone mode; again for simplicity, standalone mode is used here.
Installation
Follow these steps to install Confluent Platform. You can choose between the open source and the enterprise edition; this example uses the open source version. Confluent Platform provides not only Kafka, but also tools for connecting data sources and sinks. The platform includes:
- Apache Kafka (Kafka brokers and Java client APIs)
- Clients for C, C++, Python and Go
- Connectors for JDBC, Elasticsearch and HDFS
- Schema Registry
- REST Proxy
Figure 1: Confluent Platform 3.2 Components[1]
[1]http://docs.confluent.io/current/platform.html
Step 1: Check Java version
$ java -version
Install at least Oracle Java version 1.7. You can follow these instructions from Oracle, depending on your OS.
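If you want to automate this check, here is a rough sketch. The helper name java_version_ok is my own invention, and the parsing assumes the version string appears in double quotes on the first line of the java -version output, which is how Oracle and OpenJDK builds usually print it.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeeds when a Java version string such as
# "1.8.0_131" or "11.0.2" is at least 1.7.
java_version_ok() {
  local version="$1" major minor
  major="${version%%.*}"          # text before the first dot
  major="${major%%[!0-9]*}"       # drop suffixes such as "-ea"
  minor="${version#*.}"           # text after the first dot
  minor="${minor%%.*}"            # keep the second component only
  minor="${minor%%[!0-9]*}"
  [ -n "$major" ] || return 1     # not a version string at all
  if [ "$major" -gt 1 ]; then     # newer numbering scheme: 9, 11, 17, ...
    return 0
  fi
  [ "$major" -eq 1 ] && [ "${minor:-0}" -ge 7 ]
}

# Only run the check when java is actually on the PATH.
if command -v java >/dev/null 2>&1; then
  installed="$(java -version 2>&1 | head -n 1 | awk -F'"' '{print $2}')"
  if java_version_ok "$installed"; then
    echo "Java $installed is recent enough"
  else
    echo "Java $installed is too old, install 1.7 or later"
  fi
fi
```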
Step 2: Check ports
Make sure these ports are open and available.
Component | Default Port |
Zookeeper | 2181 |
Apache Kafka brokers (plain text) | 9092 |
Schema Registry REST API | 8081 |
REST Proxy | 8082 |
Kafka Connect REST API | 8083 |
Confluent Control Center | 9021 |
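A quick way to see whether any of these ports is already taken on the local machine is sketched below. It assumes your shell is bash with the /dev/tcp pseudo-device enabled (the case on CentOS 7 as far as I know), and the function names are hypothetical. Note that it only tells you whether something is already listening locally; it says nothing about firewall rules.

```shell
#!/usr/bin/env bash
# Default ports from the table above.
PORTS="2181 9092 8081 8082 8083 9021"

# Succeeds when something already accepts TCP connections on 127.0.0.1:<port>.
# Relies on bash's /dev/tcp pseudo-device (an assumption about your shell).
port_in_use() {
  (exec 3<>"/dev/tcp/127.0.0.1/$1") 2>/dev/null
}

# Print one line per port: "<port> in use" or "<port> free".
check_ports() {
  local port
  for port in $PORTS; do
    if port_in_use "$port"; then
      echo "$port in use"
    else
      echo "$port free"
    fi
  done
}

check_ports
```

Before the installation every port should be reported as free; once all services are running, the same script should report the corresponding ports as in use.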
Step 3: Prepare repositories
For RHEL, CentOS and Fedora-based distributions execute these commands. For other operating systems you can find instructions here.
Install Confluent’s public key (for checking package signatures)
$ sudo rpm --import http://packages.confluent.io/rpm/3.2/archive.key
Copy the following lines to your /etc/yum.repos.d/ directory in a file named confluent.repo:
[Confluent.dist]
name=Confluent repository (dist)
baseurl=http://packages.confluent.io/rpm/3.2/7
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.2/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=http://packages.confluent.io/rpm/3.2
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.2/archive.key
enabled=1
Clear Yum cache
$ sudo yum clean all
Step 4: Install Confluent Open Source
The number at the end of the package name indicates the Scala version (2.10 and 2.11 are currently supported).
$ sudo yum install confluent-platform-oss-2.11
Start Kafka
After you have successfully completed the steps above, you can start Kafka along with Zookeeper and Schema Registry.
Note: Depending on the Kafka version, these files may be in different folders:
/usr/bin/ # Confluent CLI and individual driver scripts for starting/stopping services, prefixed with <package> names
/etc/<package>/ # Configuration files
/usr/share/java/<package>/ # Jars
1. Navigate to the Confluent installation directory:
$ cd /usr/confluent/confluent-3.2.1
2. Run commands in the specified order, each in its own terminal window. I recommend you use sudo privileges.
a) $ bin/zookeeper-server-start etc/kafka/zookeeper.properties
b) $ bin/kafka-server-start etc/kafka/server.properties
c) $ bin/schema-registry-start etc/schema-registry/schema-registry.properties
Now that we have the Zookeeper, Kafka and Schema Registry services running, we can test the new Confluent Platform environment. It is important to note that we started these services with their corresponding configuration files: zookeeper.properties, server.properties and schema-registry.properties.
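If you would rather not keep three terminal windows open, the same start order can be scripted. The sketch below is only one way to do it: the function names and the logs directory are hypothetical, the ports are the defaults from the table above, and the readiness probe again assumes bash with /dev/tcp support.

```shell
#!/usr/bin/env bash
# Hypothetical helper: wait until 127.0.0.1:<port> accepts connections,
# giving up after <timeout> seconds (default 30). Uses bash's /dev/tcp.
wait_for_port() {
  local port="$1" timeout="${2:-30}" waited=0
  until (exec 3<>"/dev/tcp/127.0.0.1/$port") 2>/dev/null; do
    if [ "$waited" -ge "$timeout" ]; then
      return 1
    fi
    sleep 1
    waited=$((waited + 1))
  done
  return 0
}

# Start Zookeeper, Kafka and Schema Registry in that order, in the background.
# Call this from the Confluent installation directory.
start_stack() {
  mkdir -p logs   # assumed location for the captured console output
  nohup bin/zookeeper-server-start etc/kafka/zookeeper.properties > logs/zookeeper.out 2>&1 &
  wait_for_port 2181 || { echo "Zookeeper did not come up" >&2; return 1; }
  nohup bin/kafka-server-start etc/kafka/server.properties > logs/kafka.out 2>&1 &
  wait_for_port 9092 || { echo "Kafka did not come up" >&2; return 1; }
  nohup bin/schema-registry-start etc/schema-registry/schema-registry.properties > logs/schema-registry.out 2>&1 &
  wait_for_port 8081 || { echo "Schema Registry did not come up" >&2; return 1; }
  echo "all services are listening"
}
```

Source the file and call start_stack from the installation directory. Waiting for each port before starting the next service keeps the order intact, since Kafka needs Zookeeper and Schema Registry needs Kafka.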
Verify installation
You can verify that everything is up and running correctly by creating a topic and then using the built-in console producer and consumer to store and retrieve test data.
1. Create a new topic with 1 replica and 1 partition (multicom_test_topic)
$ bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic multicom_test_topic
2. Check that it was created successfully by listing all topics in Kafka
$ bin/kafka-topics --list --zookeeper localhost:2181
3. Start console producer (in a new terminal window)
$ bin/kafka-console-producer --broker-list localhost:9092 --topic multicom_test_topic
4. Input some messages, e.g.:
„Hi from Multicom!“
5. Display messages (in a new terminal window)
$ bin/kafka-console-consumer --zookeeper localhost:2181 --topic multicom_test_topic --from-beginning
If you see your messages from the specified topic, then you have successfully installed Confluent Platform.
Delete the test topic
To be able to delete a topic together with all the data it contains, you must first enable deletion by setting a flag in the Kafka configuration file:
$ sudo nano etc/kafka/server.properties
Set parameter:
delete.topic.enable=true
Restart the Kafka broker so the change takes effect, then run:
$ bin/kafka-topics --zookeeper localhost:2181 --delete --topic multicom_test_topic
Note: To purge a topic without deleting it, you can temporarily lower its data retention, e.g. to 10 ms (the default is 604800000 ms, i.e. 7 days). The topic won’t be purged immediately: Kafka needs some time to pick up the new configuration and clean up the data, or you’ll need to restart the service.
You can alter the retention parameter with the following command:
$ bin/kafka-topics --zookeeper localhost:2181 --alter --topic multicom_test_topic --config retention.ms=10
Don’t forget to restore the old parameter value once the topic has been purged.
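The whole purge cycle can be wrapped in a small script, sketched below under a few assumptions: the helper names are hypothetical, Zookeeper listens on the default localhost:2181, and a wait of 300 seconds is long enough for the broker to run a retention check (I believe the check interval, log.retention.check.interval.ms, defaults to 5 minutes, but verify this for your version).

```shell
#!/usr/bin/env bash
# Convert whole days to milliseconds, the unit retention.ms expects.
days_to_ms() {
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

# Hypothetical helper: shrink retention, wait for the broker to clean up,
# then restore the 7-day default. Call from the Confluent installation directory.
purge_topic() {
  local topic="$1" wait_seconds="${2:-300}"
  bin/kafka-topics --zookeeper localhost:2181 --alter --topic "$topic" \
    --config retention.ms=10 || return 1
  sleep "$wait_seconds"   # assumed to be long enough for a retention check to run
  bin/kafka-topics --zookeeper localhost:2181 --alter --topic "$topic" \
    --config retention.ms="$(days_to_ms 7)"
}

days_to_ms 7   # prints 604800000
```

Calling purge_topic multicom_test_topic would then run both alter commands with the wait in between. The days_to_ms helper is there mainly to show where the 604800000 figure comes from.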
Connecting to Kafka broker from external systems
To connect to Kafka from external systems, you need to modify Kafka’s listener and set the correct IP address of the host machine. Open the server properties file:
$ sudo nano etc/kafka/server.properties
Set the following parameter:
listeners=PLAINTEXT://host.ip.address:9092
Now you need to use host.ip.address instead of localhost in all the commands mentioned above as well. Make sure you restart all services.
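If you prefer not to edit server.properties by hand, the change can be scripted. The set_property helper below is a hypothetical sketch, not part of Kafka: it replaces an existing uncommented line for the key or appends a new one, and it leaves a .bak copy of the file behind. It assumes the key and value contain no characters that are special to sed, such as | or &.

```shell
#!/usr/bin/env bash
# Hypothetical helper: set <key>=<value> in a Java properties file.
# An existing uncommented line for the key is replaced; otherwise the
# setting is appended. Commented-out examples in the file are left alone.
set_property() {
  local file="$1" key="$2" value="$3"
  if grep -Eq "^${key}[[:space:]]*=" "$file"; then
    sed -i.bak -E "s|^${key}[[:space:]]*=.*|${key}=${value}|" "$file"
  else
    printf '%s=%s\n' "$key" "$value" >> "$file"
  fi
}

# Example (run with sufficient privileges, 192.0.2.10 is a placeholder address):
#   set_property etc/kafka/server.properties listeners "PLAINTEXT://192.0.2.10:9092"
```

The same helper works for the delete.topic.enable flag from the previous section. Either way, the broker has to be restarted afterwards.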
Configuring Kafka Connect JDBC Connector and streaming data from Oracle table into a topic
The main advantage of using Confluent Connector instead of writing a connector using APIs (e.g. in Java) is that it takes significantly less time to set up a stream.
First, you need a connector configuration file and a worker configuration file.
In directory
/usr/confluent/confluent-3.2.1/etc/kafka-connect-jdbc/
Create a new connector configuration file
$ sudo nano source-quickstart-oracle.properties
Enter the following lines
name=test-source-oracle-jdbc
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:oracle:thin:username/password@db_IP:port:SID
mode=incrementing
incrementing.column.name=enter_your_incrementing_col_name
topic.prefix=enter_desired_topic_prefix_
table.whitelist=enter_table_for_kafka_name
Save and close the new file.
In this example an incrementing column is used to detect new database entries. The incrementing column must be defined as NOT NULL; otherwise you’ll get an exception when the producer starts. For more information on the available modes, check the official Confluent documentation.
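One pitfall is starting the worker with placeholder values still in the file. The sketch below is a hypothetical pre-flight check (both function names are mine, not part of Confluent Platform): it verifies that the keys used in this example are present and that none of them still holds an enter_... placeholder or the db_IP placeholder from the connection URL.

```shell
#!/usr/bin/env bash
# Print the value of <key> from properties file <file> (empty if absent).
get_property() {
  sed -n "s/^$2[[:space:]]*=[[:space:]]*//p" "$1" | tail -n 1
}

# Hypothetical pre-flight check for the connector file from this guide.
# Returns non-zero if a key is missing or still contains a placeholder.
check_connector_config() {
  local file="$1" key value rc=0
  for key in name connector.class connection.url mode \
             incrementing.column.name topic.prefix table.whitelist; do
    value="$(get_property "$file" "$key")"
    if [ -z "$value" ]; then
      echo "missing: $key" >&2
      rc=1
    fi
    case "$value" in
      enter_*|*db_IP*) echo "placeholder left in: $key" >&2; rc=1 ;;
    esac
  done
  return "$rc"
}

# Example:
#   check_connector_config etc/kafka-connect-jdbc/source-quickstart-oracle.properties
```

The list of keys simply mirrors the example file above; it is not a statement about which settings the connector itself treats as mandatory.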
Before starting the worker you must install the Oracle JDBC driver. Download the ojdbc7.jar file from here (check that the driver version corresponds to your Oracle DB version) and copy it into a directory on the connector’s classpath. In this case (relative to the installation directory), the path is
./share/java/kafka-connect-jdbc
Modify the worker configuration file
$ sudo nano etc/schema-registry/connect-avro-standalone.properties
Specify the key and value converters you want to use, i.e. the data format in which records will be stored in Kafka. Avro is the default format, but you can also use JSON (JsonConverter), plain strings (StringConverter) etc.
- io.confluent.connect.avro.AvroConverter – default, Avro
- org.apache.kafka.connect.json.JsonConverter – JSON
- org.apache.kafka.connect.storage.StringConverter – plain string
Note:
You can configure the offset storage file path (offset.storage.file.filename property). The offset storage file contains the latest increment or timestamp read from the database table (depending on the mode used). Every time the worker starts, it reads the offset value first so it can continue with fresh entries from the database.
Start the worker (producer)
$ sudo ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/source-quickstart-oracle.properties
If everything is set up correctly, a new Kafka topic should be automatically created and data will be copied into Kafka from the beginning.
Verify the producer is working
Start the built-in consumer (if the data is stored in Avro format). Replace test with the name of your topic, which is the configured topic prefix followed by the table name.
$ ./bin/kafka-avro-console-consumer --topic test --zookeeper localhost:2181 --from-beginning
Or, if the data is stored in JSON or plain string format:
$ ./bin/kafka-console-consumer --topic test --zookeeper localhost:2181 --from-beginning
If you see Oracle table entries in the console window, everything is working well! If you omit the --from-beginning option, only new rows (created after the consumer is started) will be consumed.
Note:
To start the producer from the beginning, you will need to delete your new topic (as mentioned above) and delete the offset file as well. If you leave the old offset file, the worker will only copy new rows (i.e. rows with greater offset).
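Removing the offset file can be scripted so that you always delete the file the worker actually uses. The reset_offsets helper below is a hypothetical sketch: it reads offset.storage.file.filename from the worker configuration file and removes that file. Stop the worker before running it.

```shell
#!/usr/bin/env bash
# Hypothetical helper: delete the offset file named in a standalone
# worker configuration so the next run starts from the beginning.
reset_offsets() {
  local worker_config="$1" offsets
  offsets="$(sed -n 's/^offset\.storage\.file\.filename[[:space:]]*=[[:space:]]*//p' \
    "$worker_config" | tail -n 1)"
  if [ -z "$offsets" ]; then
    echo "offset.storage.file.filename is not set in $worker_config" >&2
    return 1
  fi
  rm -f -- "$offsets"
  echo "removed $offsets"
}

# Example (from the installation directory):
#   reset_offsets etc/schema-registry/connect-avro-standalone.properties
```

Deleting the topic itself is still a separate step, as described in the section on deleting the test topic.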
Thanks for reading!
Written by:
Aleksandar Tunjić
aleksandar.tunjic@multicom.hr