Apache Flink is a framework and distributed processing engine for stateful computations over batch and streaming data. It is designed to run in all common cluster environments, performing computations at in-memory speed and at any scale. One common use case for Apache Flink is data pipeline applications, where data is transformed, enriched, and moved from one storage system to another. Flink provides connectors to many systems, such as JDBC, Kafka, Elasticsearch, and Kinesis.
A common source or destination is a storage system with a JDBC interface, such as SQL Server, Oracle, Salesforce, Hive, Eloqua, or Google BigQuery. With Progress DataDirect JDBC connectors you can connect to a wide range of relational, big data, and SaaS sources, including those listed above. In this tutorial we walk through how to read data from SQL Server, transform it, and write it to a Hive destination.
Install the DataDirect SQL Server and Hive JDBC drivers by running their installers:
java -jar PROGRESS_DATADIRECT_JDBC_SQLSERVER_ALL.jar
java -jar PROGRESS_DATADIRECT_JDBC_HIVE_ALL.jar
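The installers unpack the driver JARs into the DataDirect install directory. One common way to use those JARs from a Maven-based Flink project is to install them into your local Maven repository. A sketch of that step is below; the file paths, group/artifact IDs, and version are assumptions, so adjust them to your actual install location and driver version:

```shell
# Hypothetical paths and Maven coordinates -- adjust to your DataDirect install.
mvn install:install-file \
  -Dfile=/opt/Progress/DataDirect/JDBC_60/lib/sqlserver.jar \
  -DgroupId=com.ddtek -DartifactId=sqlserver -Dversion=6.0 -Dpackaging=jar

mvn install:install-file \
  -Dfile=/opt/Progress/DataDirect/JDBC_60/lib/hive.jar \
  -DgroupId=com.ddtek -DartifactId=hive -Dversion=6.0 -Dpackaging=jar
```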
// Set up the batch execution environment and describe the schema of the
// Album table: AlbumId (INT), Title (STRING), ArtistId (INT).
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
// Read the Album table from SQL Server through the DataDirect driver.
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername("com.ddtek.jdbc.sqlserver.SQLServerDriver")
    .setDBUrl("jdbc:datadirect:sqlserver://localhost:1433;databaseName=Chinook;")
    .setUsername("username")
    .setPassword("password")
    .setQuery("SELECT [AlbumId], [Title], [ArtistId] FROM [Chinook].[dbo].[Album]")
    .setRowTypeInfo(rowTypeInfo)
    .finish();
// Schema of the Artist table: ArtistId (INT), Name (STRING).
TypeInformation<?>[] fieldTypes2 = new TypeInformation<?>[] {
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
};
RowTypeInfo rowTypeInfo2 = new RowTypeInfo(fieldTypes2);
// Read the Artist table from the same SQL Server database.
JDBCInputFormat jdbcInputFormat2 = JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername("com.ddtek.jdbc.sqlserver.SQLServerDriver")
    .setDBUrl("jdbc:datadirect:sqlserver://localhost:1433;databaseName=Chinook;")
    .setUsername("username")
    .setPassword("password")
    .setQuery("SELECT [ArtistId], [Name] FROM [Chinook].[dbo].[Artist]")
    .setRowTypeInfo(rowTypeInfo2)
    .finish();
// Create DataSets from both input formats and join albums to artists on
// ArtistId: field f2 of the album row equals field f0 of the artist row.
DataSet<Row> albumData = env.createInput(jdbcInputFormat);
DataSet<Row> artistData = env.createInput(jdbcInputFormat2);

DataSet<Tuple2<Row, Row>> joinedData = albumData.join(artistData)
    .where("f2")
    .equalTo("f0");
// Flatten each joined (album, artist) pair into a single three-field row:
// AlbumId, Title, Artist Name.
public static class RowFlat implements FlatMapFunction<Tuple2<Row, Row>, Row> {
    @Override
    public void flatMap(Tuple2<Row, Row> value, Collector<Row> out) {
        Row row = new Row(3);
        row.setField(0, value.f0.getField(0)); // AlbumId
        row.setField(1, value.f0.getField(1)); // Title
        row.setField(2, value.f1.getField(1)); // Artist Name
        out.collect(row);
    }
}
DataSet<Row> rowDataSet = joinedData.flatMap(new RowFlat());
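The join plus the flatMap together implement a simple lookup-and-merge: for each album row, find the artist row whose ArtistId matches, and emit the album's id and title alongside the artist's name. That logic can be sketched in plain Java, with no Flink on the classpath, to make the expected row layout concrete. The helper name and the sample rows below are illustrative, not part of the Flink program:

```java
import java.util.*;

// Plain-Java sketch of the join + RowFlat merge: for each album row
// (AlbumId, Title, ArtistId), look up the artist row (ArtistId, Name)
// and emit (AlbumId, Title, ArtistName). Sample data is hypothetical.
public class JoinSketch {
    public static List<Object[]> joinAlbumsWithArtists(
            List<Object[]> albums, List<Object[]> artists) {
        // Index artists by ArtistId (field 0), mirroring equalTo("f0").
        Map<Object, Object[]> byArtistId = new HashMap<>();
        for (Object[] artist : artists) {
            byArtistId.put(artist[0], artist);
        }
        List<Object[]> out = new ArrayList<>();
        for (Object[] album : albums) {
            // where("f2"): the album's third field is the join key.
            Object[] artist = byArtistId.get(album[2]);
            if (artist != null) {
                // Same field layout RowFlat builds: id, title, artist name.
                out.add(new Object[] { album[0], album[1], artist[1] });
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object[]> albums = Arrays.asList(
            new Object[] { 1, "For Those About To Rock", 1 },
            new Object[] { 2, "Balls to the Wall", 2 });
        List<Object[]> artists = Arrays.asList(
            new Object[] { 1, "AC/DC" },
            new Object[] { 2, "Accept" });
        for (Object[] row : joinAlbumsWithArtists(albums, artists)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```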
// Write the merged rows to Hive through the DataDirect Hive JDBC driver,
// then wire the output into the pipeline and run the job.
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
    .setDrivername("com.ddtek.jdbc.hive.HiveDriver")
    .setDBUrl("jdbc:datadirect:hive://hiveserver:10000;")
    .setUsername("username")
    .setPassword("password")
    .setQuery("INSERT INTO albums (albumid, name, artistname) VALUES (?, ?, ?)")
    .finish();

rowDataSet.output(jdbcOutput);
env.execute();
With Progress DataDirect JDBC connectors for SQL Server, DB2, Oracle, Salesforce, Hive, Google BigQuery, and more, you are free to build a data pipeline between any source and sink of your choice without having to worry about connectivity, because we do the heavy lifting for you. Feel free to try any of our JDBC connectors and let us know if you have any questions.