Summary

This writeup explains how to copy the contents of one database table to another database table using Apache NiFi. Database vendor is irrelevant; only the JDBC driver is needed. In this writeup, the data is moved from one MySQL table to another MySQL table in the same schema. Only records that do not exist in the destination table are copied.

Disclaimer

There must be one hundred ways of doing this. This is a note to self so that I record how I do this and hoping it helps someone else.

Prerequisites

  1. MySQL Server.
  2. MySQL JDBC Driver.
  3. Apache NiFi.
  4. The sample data used comes from MySQL’s Employees Sample Database. Its freely available on their github page. The file I used is called load_employees.dump.

Configure NiFi to use MySQL JDBC Driver

Download the MySQL JDBC driver and place in NiFi’s lib directory.
For my setup, I placed JAR in /nifi-1.9.2/lib.

Setup the database schema

-- Create the database schema.
DROP SCHEMA IF EXISTS nifi;
CREATE SCHEMA nifi;

-- Select newly created schema to put our tables in
USE nifi;

-- Create the source table.
CREATE TABLE employees_source (
	emp_no		int(11)		NOT NULL,
	birth_date	date		NOT NULL,
	first_name	varchar(14) 	NOT NULL,
	last_name	varchar(16) 	NOT NULL,
	gender		enum('M','F') 	NOT NULL,
	hire_date	date		NOT NULL,
	PRIMARY KEY (emp_no)
);

-- create the destination table.
CREATE TABLE employees_destination (
	emp_no		int(11)		NOT NULL,
	birth_date	date 		NOT NULL,
	first_name	varchar(14) 	NOT NULL,
	last_name	varchar(16) 	NOT NULL,
	gender		enum('M','F') 	NOT NULL,
	hire_date	date		NOT NULL,
	PRIMARY KEY (emp_no)
);

NiFi Configuration

1. QueryDatabaseTable Processor Configuration

From the top menu in NiFi, click the processor icon processor icon and drag it to the main canvas.
On the Add Processor Screen, search and add a QueryDatabaseTable processor.

Add Processor

On the processor window, rigth-click/configure then select Properties to configure this processor.

QueryDatabaseTable Processor

Property  
Database Connection Pooling Service Choose Create new service and press create. Now back on the Configure Processor Window, click the arrow next to your newly created pooling service to configure 1.
Database Type Select MySQL.
Table Name This is the source table where data will be copied from.
Columns to Return Column list for the source table.
Maximun-value Columns This column will be used to keep track of the maximun values as they are copied over. Multiple columns can be defined. Using the numeric primary key sufices for this writeup.

2. SplitAvro Processor Configuration

Go back to the top menu and drap another processor right under our QueryDatabaseTable processor just configured. Search for SplitAvro to add this processor. All the settings for this processor can be left to their default values for this writeup.

3. PutDatabaseRecord Processor Configuration

One last time, add a processor to the canvas. This time, search and add a PutDatabaseRecord processor to the canvas. Just like with our QueryDatabaseTable processor, go to porperties to configure.

PutDatabaseRecord Processor

Property  
Record Reader Choose create new service from dropdown and press create. Now back on the Configure Processor Window, click the arrow next to your newly created avro reader service to configure 2.
Statement Type INSERT
Database Connection Pooling Service Since we are moving data from tables in the same schema, we select the pooling service previously created.
Schema Name The name of your database schema.
Table Name The name of our destination table, employees_destination.

4. Connecting the Processors

At this point, the only thing left is to connect our processors in order of execution.

  1. Mouse over our QueryDatabaseTable processor, then click and drag arrow to our SplitAvro processor.
  2. Mouse over our SplitAvro processor, then click and drag arrow to our PutDatabaseRecord processor.
    For this connection, make sure to select all relationships on the details tab as shown.

avro to put record connection

This is the last step in the NiFi configuration.

Nifi Data Flow

By now, your NiFi data flow should look like this:

nifi data flow

Running NiFi Example

  1. Right-clik and select Play on each of our three processors.
  2. Check the recordcount of each of our tables. It is zero for both.
  3. Edit script load_employees.dump replacing all instances of employees with employees_source to match our source table name.
  4. From the command prompt, load the employee records
    mysql -h172.17.0.3 -uroot -ppassword nifi < load_employees.dump
    
  5. Check the recordcount of each of our tables. It should be zero for employees destination and 300,024 for employees source.
  6. Back on Nifi, note how the GUI represents the data flow from beginning to end. It may take a bit to refresh.
  7. Check the recordcount of each of our tables. It is 300,024 for both.
  8. You can press stop in each processor.

Other things you can do in this workflow

  1. Resetting query state Right-click on the Query Table Processor and select state. Note how Nifi saves the highest value of the primary key. This is how NiFi knows what records to bring (or not). Note that if your primary key is not numeric you would have to make different arragements to keep state in your data flow. You could similarly use an int that is not your primary key. Lastly, note that you can also clear the state if you wanted to move all the data again.
  2. Between processors, you have configurations. For each of these, you can right-click and empty the queue of records in-motion.

There are many other features for each processor and there is a mutitude of processors for all sorts of tasks. Think of these a bit like ANT Tasks to get an idea of the toolset at hand.

Resetting Exercise Fast

  1. Clear NiFi queues and states.
  2. Shut down NiFi.
  3. Rerun SQL snippet from beginning of post to drop and recreate schema with tables.
  4. Start NiFi

Conclusion

This is the quickest Get-Up-And-Going for NiFi I could manage. It provides a succinct guide to creating a simple workflow for moving records from one table to another.

Without much trouble, a second data source could be added to the configuration and we could as easily move data between databases.

NiFi workflows can be scheduled as well. Double-click a processor and note the Scheduling Tab.

NiFi provides a very efficient way of vendor independent data flows. I hope development continues and popularity increases. Thanks for reading.


DbConnectionPool Controller Configuration

From the NiFi Flow Configuration popup, select the gear icon for the pool service and revise the settings to match the following screenshot.

dbcpservice properties

Property  
Database Connection URL DBC connection string to your schema.
Database Driver Class Name db driver name.
Database User Database username.
Database Password Database user’s password.
Valudation Query select/check sql statement.

Back

Avro Reader Service Configuration

From the NiFi Flow Configuration popup, select the gear icon for the avro reader service. Ensure the properties tab looks like this.

split avro controller properties

Back