Importing data from REST APIs into Apache® Kafka® topics generally involves writing a custom Kafka producer to read the data from the REST API and write it into topics. If you are dealing with multiple REST endpoints, response formats, and authentication schemes, this can get complex quickly. In this article, we’ll look at how you can accelerate your data integration from REST APIs using Apache Kafka.
Kafka offers several different types of connectors out of the box, including the very popular JDBC connector. On their own, JDBC connectors can't connect to REST APIs, but with Progress DataDirect Autonomous REST Connector, you can connect to and query any REST API using SQL, without writing a single line of code.
In this tutorial, we will show you how you can use Autonomous REST Connector to import data from a stock market research API called Alpha Vantage. The data from the API is in a time series format and gets updated every 60 seconds. We will use Autonomous REST Connector to import the data for the Progress (PRGS) stock from this API into Kafka topics every 60 seconds. Feel free, however, to use any stock symbol you’d prefer.
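For context, this is the kind of work a hand-rolled producer would otherwise have to do before it could write anything to a topic: poll the endpoint, then flatten the nested JSON time series into flat records. A minimal sketch in Python (the function name and output fields are illustrative; the response keys match Alpha Vantage's intraday format):

```python
def flatten_time_series(payload):
    """Flatten an Alpha Vantage intraday response into one record
    per timestamp -- the normalization a custom Kafka producer
    would need to do before publishing each row to a topic."""
    symbol = payload["Meta Data"]["2. Symbol"]
    rows = []
    for ts, bar in payload["Time Series (1min)"].items():
        rows.append({
            "symbol": symbol,
            "timestamp": ts,
            "open": float(bar["1. open"]),
            "high": float(bar["2. high"]),
            "low": float(bar["3. low"]),
            "close": float(bar["4. close"]),
            "volume": int(bar["5. volume"]),
        })
    return rows

# A real producer would run this in a loop every 60 seconds and
# publish each row (e.g. with kafka-python's producer.send), plus
# handle retries, authentication, and schema changes.
```

Autonomous REST Connector does this normalization for you, which is what the rest of this tutorial takes advantage of.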
To get started, download the Confluent Platform (Community edition) and extract the archive:
tar -xvf confluent-community-5.1.1-2.11.tar.gz
Install the connector by running the setup executable file on your machine and following the instructions on the installer.
After installation, copy the autorest.jar file from the install location:
/home/<user>/Progress/DataDirect/JDBC_60/lib/autorest.jar
to the kafka-connect-jdbc folder of your Confluent installation:
/<path-to>/confluent-5.1.1/share/java/kafka-connect-jdbc/autorest.jar
We will be using the following Alpha Vantage endpoint, which returns intraday time series data for a given stock symbol:
https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=PRGS&interval=1min&apikey=<apikey>
To have Autonomous REST Connector sample the endpoint and infer a schema, point the sample property of the JDBC connection URL at it:
jdbc:datadirect:autorest:sample=https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=PRGS&interval=1min&apikey=<apikey>
The connector maps the JSON response to a relational model and captures that mapping in a REST configuration file. For this endpoint, the configuration looks like this:
{
  "query": {
    "#path": [],
    "Meta Data": {
      "1. Information": "VarChar(84)",
      "2. Symbol": "VarChar(64),#key",
      "3. Last Refreshed": "Timestamp(0)",
      "4. Interval": "VarChar(64)",
      "5. Output Size": "VarChar(64)",
      "6. Time Zone": "VarChar(64)"
    },
    "Time Series (1min){Timestamp(0)}": {
      "1. open": "Double",
      "2. high": "Double",
      "3. low": "Double",
      "4. close": "Double",
      "5. volume": "Integer"
    },
    "function": {
      "#type": "VarChar(64),#key",
      "#virtual": true,
      "#default": "TIME_SERIES_INTRADAY",
      "#eq": "function"
    },
    "symbol": {
      "#type": "VarChar(64),#key",
      "#virtual": true,
      "#default": "MSFT",
      "#eq": "symbol"
    },
    "interval": {
      "#type": "VarChar(64),#key",
      "#virtual": true,
      "#default": "1min",
      "#eq": "interval"
    },
    "apikey": {
      "#type": "VarChar(64),#key",
      "#virtual": true,
      "#default": "<api-key>",
      "#eq": "apikey"
    }
  }
}
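Each #virtual entry maps a SQL column to a request parameter: the #eq value names the query-string key, and #default supplies the value used when a query doesn't filter on that column. Conceptually, the request URL is assembled like this (an illustrative sketch, not the driver's actual code; the function name is ours):

```python
def build_request_url(base, where_params, defaults):
    """Illustrate how the virtual #key columns translate into the
    REST call's query string: values from the SQL WHERE clause
    override the #default values from the .rest file."""
    merged = {**defaults, **where_params}
    query = "&".join(f"{k}={v}" for k, v in merged.items())
    return f"{base}?{query}"

defaults = {"function": "TIME_SERIES_INTRADAY", "symbol": "MSFT",
            "interval": "1min", "apikey": "<api-key>"}
# e.g. SELECT ... WHERE QUERY_SYMBOL='AAPL' overrides the symbol default
url = build_request_url("https://www.alphavantage.co/query",
                        {"symbol": "AAPL"}, defaults)
```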
You can now query the time series data using standard SQL:
SELECT * FROM TIMESERIES1MIN
The virtual parameter columns are also queryable, so you can request a different stock symbol in the WHERE clause:
SELECT * FROM TIMESERIES1MIN WHERE QUERY_SYMBOL='AAPL'
Save the REST configuration file shown above as alphavantage.rest, for example at:
/<path-to>/Confluent-5.1.1/alphavantage.rest
Next, create a Kafka Connect source configuration file:
/<path-to>/Confluent-5.1.1/source_autorest.json
{
  "name": "source_autorest",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:datadirect:autorest:config=/home/progress/confluent-5.1.1/alphavantage.rest",
    "query": "SELECT * FROM TIMESERIES1MIN",
    "mode": "timestamp",
    "timestamp.column.name": "KEY",
    "topic.prefix": "PRGS1MIN"
  }
}
We set mode to timestamp and timestamp.column.name to KEY. Kafka Connect uses this column to keep track of new data coming in from the REST API. By default, the poll interval is 5 seconds, but you can lower it (for example, to 1 second) using the poll.interval.ms configuration option.
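Conceptually, timestamp mode is an incremental filter: on each poll the connector only picks up rows whose timestamp column is greater than the highest value it has already committed. A simplified Python illustration of that behavior (not the connector's actual implementation):

```python
def new_rows_since(rows, last_committed):
    """Mimic the JDBC source connector's timestamp mode: return only
    rows whose KEY (the configured timestamp column) is newer than
    the last committed value."""
    return [r for r in rows if r["KEY"] > last_committed]

rows = [
    {"KEY": "2019-03-01 15:59:00", "CLOSE": 35.1},
    {"KEY": "2019-03-01 16:00:00", "CLOSE": 35.2},
]
# After 15:59:00 has been committed, only the 16:00:00 row is new.
fresh = new_rows_since(rows, "2019-03-01 15:59:00")
```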
In the configuration above, change the path to alphavantage.rest in connection.url to its location on your machine.
Start the Confluent platform and load the connector:
confluent start
confluent load source_autorest -d source_autorest.json
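If you are not using the confluent CLI, the same JSON file can be posted to Kafka Connect's standard REST API instead. A small Python helper using only the standard library (assuming Connect is listening on its default port, 8083; the function name is ours):

```python
import urllib.request

def build_register_request(config_path, connect_url="http://localhost:8083"):
    """Build the HTTP request that registers a connector with Kafka
    Connect's REST API -- equivalent to `confluent load`."""
    with open(config_path, "rb") as f:
        body = f.read()
    return urllib.request.Request(
        connect_url + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# To actually register the connector once Connect is running:
#   urllib.request.urlopen(build_register_request("source_autorest.json"))
```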
Check the status of the connector by running the following command. You should see source_autorest listed if it was loaded properly.
confluent status connectors
Next, check the list of topics. You should find a new topic named PRGS1MIN.
kafka-topics --list --zookeeper localhost:2181
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic PRGS1MIN --from-beginning --property schema.registry.url=http://localhost:8081
With Progress DataDirect Autonomous REST Connector, you don’t need to worry about interfacing with each REST endpoint individually. All you have to do is point the connector at the endpoint, and it exposes the data through a standard JDBC interface. You can use this connectivity not only with Kafka but with any application that has ODBC/JDBC support. Feel free to contact us if you have any questions about the Progress DataDirect Autonomous REST Connector.