
The operating system used in this example is CentOS 7 with an Oracle 12c database.

 

Introduction

This is a beginner's hands-on tutorial with detailed instructions on how to implement a data stream from an Oracle database to Apache Kafka. You will also find links to useful resources and some tips that will help you avoid pitfalls I experienced. For simplicity, Kafka is installed on a single node with no data replication or distribution. The Kafka Connect worker can be installed in distributed or standalone mode; again for simplicity, standalone mode was the obvious choice.

Installation

Follow these steps to install Confluent Platform. You can choose between the open source and enterprise editions; we will use the open source version in this example. Confluent Platform provides not only Kafka but also tools for connecting data sources and sinks. The platform includes:

  • Apache Kafka (Kafka brokers and Java client APIs)
  • Clients for C, C++, Python and Go
  • Connectors for JDBC, Elasticsearch and HDFS
  • Schema Registry
  • REST Proxy


Figure 1: Confluent Platform 3.2 Components[1]



[1]http://docs.confluent.io/current/platform.html

Step 1: Check Java version

$ java -version

 

Install at least Oracle Java version 1.7. You can follow these instructions from Oracle, depending on your OS.
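If you want to script this check, a small sketch like the following can compare the parsed version string against the 1.7 minimum (the java_ok helper and the version strings are illustrative, not part of the platform):

```shell
#!/usr/bin/env bash
# Sketch: check whether a Java version string satisfies the 1.7 minimum.
# Pass the version as reported by `java -version`, e.g. "1.8.0_131".
java_ok() {
  ver="$1"
  major="${ver%%.*}"          # "1" for Java 8 and older, "11" for Java 11
  rest="${ver#*.}"
  minor="${rest%%.*}"         # "8" in "1.8.0_131"
  if [ "$major" -gt 1 ]; then
    return 0                  # Java 9+ dropped the leading "1."
  fi
  [ "$major" -eq 1 ] && [ "$minor" -ge 7 ]
}

java_ok "1.8.0_131" && echo "Java version OK"
```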

 

Step 2: Check ports

Make sure these ports are open and available.

 

Component                          Default Port
Zookeeper                          2181
Apache Kafka brokers (plain text)  9092
Schema Registry REST API           8081
REST Proxy                         8082
Kafka Connect REST API             8083
Confluent Control Center           9021
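One quick way to see whether any of these ports is already taken is bash's /dev/tcp pseudo-device. The sketch below just probes each port on localhost (it assumes bash, since /dev/tcp is a bash-only feature, and only distinguishes "in use" from "free"):

```shell
#!/usr/bin/env bash
# Probe each default Confluent port on localhost and report its state.
check_ports() {
  for p in 2181 9092 8081 8082 8083 9021; do
    if (exec 3<>"/dev/tcp/localhost/$p") 2>/dev/null; then
      echo "port $p: in use"
    else
      echo "port $p: free"
    fi
  done
}

check_ports
```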

 

 

Step 3: Prepare repositories

For RHEL, CentOS and Fedora-based distributions execute these commands. For other operating systems you can find instructions here.

Install Confluent’s public key (for checking package signatures)

$ sudo rpm --import http://packages.confluent.io/rpm/3.2/archive.key

 

Copy the following lines to your /etc/yum.repos.d/ directory in a file named confluent.repo:

 

[Confluent.dist]
name=Confluent repository (dist)
baseurl=http://packages.confluent.io/rpm/3.2/7
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.2/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=http://packages.confluent.io/rpm/3.2
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.2/archive.key
enabled=1

Clear Yum cache

$ sudo yum clean all

Step 4: Install Confluent Open Source

 

The number at the end of the package name indicates the Scala version (2.10 and 2.11 are currently supported).

$ sudo yum install confluent-platform-oss-2.11

 

Start Kafka

After you have successfully completed the steps above, you can start Kafka along with Zookeeper and Schema Registry.

Note: Depending on the Kafka version, these files may be in different folders:

/usr/bin/                  # Confluent CLI and individual driver scripts for starting/stopping services, prefixed with <package> names

/etc/<package>/            # Configuration files

/usr/share/java/<package>/ # Jars

 

1.      Navigate to the Confluent installation directory:

          $ cd /usr/confluent/confluent-3.2.1

2.      Run the commands in the specified order, each in its own terminal window. I recommend running them with sudo privileges.

a)  $ bin/zookeeper-server-start etc/kafka/zookeeper.properties
b)  $ bin/kafka-server-start etc/kafka/server.properties
c)  $ bin/schema-registry-start etc/schema-registry/schema-registry.properties

Now that the Zookeeper, Kafka and Schema Registry services are running, we can test the new Confluent Platform environment. Note that we started each service with its corresponding configuration file: zookeeper.properties, server.properties and schema-registry.properties.

 

Verify installation

You can verify everything is up and running correctly by creating a topic and then using the built-in producer and consumer to store and retrieve test data.

 

1.      Create a new topic (multicom_test_topic) with 1 replica and 1 partition

$ bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic multicom_test_topic

2.      Check that it was created successfully by listing all topics in Kafka

$ bin/kafka-topics --list --zookeeper localhost:2181

3.      Start console producer (in a new terminal window)

$ bin/kafka-console-producer --broker-list localhost:9092 --topic multicom_test_topic

4.      Input some messages, e.g.:

 

"Hi from Multicom!"

 

5.      Display messages (in a new terminal window)

$ bin/kafka-console-consumer --zookeeper localhost:2181 --topic multicom_test_topic

 

If you see your messages from the specified topic, then you have successfully installed Confluent Platform!

 

Delete the test topic

To delete a topic together with all the data it contains, you must first enable deletion by setting a flag in the Kafka configuration file

$ sudo nano etc/kafka/server.properties

Set parameter:

delete.topic.enable=true

 

Then run command:

$ bin/kafka-topics --zookeeper localhost:2181 --delete --topic multicom_test_topic

 

Note: To purge a topic, you can lower the data retention parameter, e.g. to 10 ms (the default is 604800000 ms, i.e. 7 days). The topic won't be purged immediately, though, as Kafka needs additional time to pick up the new configuration, or you may need to restart the service.

You can alter the retention parameter with the following command

$ bin/kafka-topics --zookeeper localhost:2181 --alter --topic multicom_test_topic --config retention.ms=10

Don't forget to restore the old parameter value once the topic has been purged.

 

Connecting to Kafka broker from external systems

To be able to connect to Kafka externally, you need to modify Kafka's listener configuration and set the correct IP address of the host machine. In the server properties file

$ sudo nano etc/kafka/server.properties

Set the following parameter:

listeners=PLAINTEXT://host.ip.address:9092

Now use host.ip.address instead of localhost in all of the commands mentioned above as well, and make sure you restart all services.
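For illustration, the relevant server.properties lines might look like this, assuming a placeholder host IP of 192.168.0.10 (substitute your own; advertised.listeners is the address the broker hands out to clients):

```
# etc/kafka/server.properties (placeholder IP, adjust to your host)
listeners=PLAINTEXT://192.168.0.10:9092
advertised.listeners=PLAINTEXT://192.168.0.10:9092
```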

 

Configuring Kafka Connect JDBC Connector and streaming data from Oracle table into a topic

The main advantage of using the Confluent connector, instead of writing your own connector against the APIs (e.g. in Java), is that it takes significantly less time to set up a stream.

First, you need a connector configuration file and a worker configuration file.

In directory

/usr/confluent/confluent-3.2.1/etc/kafka-connect-jdbc/

Create a new connector configuration file

$ sudo nano source-quickstart-oracle.properties

 

Enter the following lines

name=test-source-oracle-jdbc
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:oracle:thin:username/password@db_IP:port:SID
mode=incrementing
incrementing.column.name=enter_your_incrementing_col_name
topic.prefix=enter_desired_topic_prefix_
table.whitelist=enter_table_for_kafka_name

 

Save and close the new file.
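For illustration, here is the same file with hypothetical values filled in (the database host, SID, credentials, table and column names below are made up; substitute your own):

```
# etc/kafka-connect-jdbc/source-quickstart-oracle.properties (hypothetical values)
name=test-source-oracle-jdbc
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:oracle:thin:kafka_user/kafka_pass@192.168.0.20:1521:ORCL
mode=incrementing
incrementing.column.name=ID
topic.prefix=oracle_
table.whitelist=CUSTOMERS
```

With these values the connector would stream the CUSTOMERS table into a topic named oracle_CUSTOMERS (topic.prefix plus the table name).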

 

In this example an incrementing column is used to detect new database entries. The column must be defined as NOT NULL, otherwise you'll get an exception when the connector starts. For more information on connector modes, check the official Confluent documentation.

Before starting the worker, you must install the JDBC driver from Oracle. Download the ojdbc7.jar file from here (check that the driver version corresponds to your Oracle DB version) and copy it into the CLASSPATH directory. In this case (relative to the installation directory), the path is

./share/java/kafka-connect-jdbc

Modify the worker configuration file

$ sudo nano etc/schema-registry/connect-avro-standalone.properties

Specify the converters you want to use, i.e. the data format in which records will be stored in Kafka. Avro is the default format, but you can also use JSON (JsonConverter), plain strings (StringConverter), etc.

  • io.confluent.connect.avro.AvroConverter – default, Avro converter
  • org.apache.kafka.connect.json.JsonConverter – JSON
  • org.apache.kafka.connect.storage.StringConverter – plain strings
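For example, to store plain JSON without the Schema Registry, the worker's converter section might look like this (a sketch, assuming you don't need Avro schemas):

```
# etc/schema-registry/connect-avro-standalone.properties (JSON variant)
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
```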

 

Note:

You can configure the offset storage file path (the offset.storage.file.filename property). The offset storage file contains the latest increment or timestamp read from the database table (depending on the mode used). Every time the worker starts, it first reads the offset value so it can continue with fresh entries from the database.
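A minimal sketch of the relevant worker properties (the file path is just an example):

```
# Where the standalone worker persists source offsets between restarts
offset.storage.file.filename=/tmp/connect.offsets
# How often offsets are flushed to the file (milliseconds)
offset.flush.interval.ms=10000
```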

Start the worker (producer)

$ sudo ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/source-quickstart-oracle.properties

 

If everything is set up correctly, a new Kafka topic will be created automatically and data will be copied into Kafka from the beginning of the table.

 

Verify the producer is working

Start the built-in consumer (if the data is stored in Avro format)

$ ./bin/kafka-avro-console-consumer --topic test --zookeeper localhost:2181 --from-beginning

 

Or if the data is stored in JSON or comma delimited format

$ ./bin/kafka-console-consumer --topic test --zookeeper localhost:2181 --from-beginning

 

If you see Oracle table entries in the console window, everything is working well! If you omit the --from-beginning option, only new rows (created after the consumer starts) will be consumed.

 

Note:

To re-run the producer from the beginning, you will need to delete your new topic (as described above) and delete the offset file as well. If you keep the old offset file, the worker will only copy new rows (i.e. rows with a greater offset).

 

Thanks for reading!

Written by:

Aleksandar Tunjić

aleksandar.tunjic@multicom.hr

 

