Apache Kafka is used to stream messages and events between microservices and applications, making it fast and consistent to communicate changes. To collect and process this data for analytics and machine learning, it's common practice to store these events in the data lake.
But storing raw events in the data lake can lead to some challenges. In particular, the rate at which events flow in requires a fair amount of processing power to write them. Also, to keep the data fresh and up to the minute, services that write events to the data lake usually do not batch. The result is many small files, each containing only a few events. Querying this data is slow and costly because the raw data is not optimized by default. In this tutorial, you'll learn how to use Upsolver SQLake to easily read data from a Kafka topic and stage it in a data lake table. SQLake automatically scales to match the volume of events and optimizes the data in the data lake so queries are fast and cost-effective.
Creating a Kafka connection in SQLake is straightforward. Here is the syntax:
CREATE KAFKA CONNECTION <connection_identifier>
    HOSTS = ('<bootstrap_server>:<port_number>'[, ...])
    CONSUMER_PROPERTIES = '<consumer_properties>'
    [ VERSION = { CURRENT | LEGACY } ]
    [ REQUIRE_STATIC_IP = { TRUE | FALSE } ]
    [ SSL = { TRUE | FALSE } ]
    [ TOPIC_DISPLAY_FILTER[S] = { '<topic_name>' | ('<topic_name>'[, ...]) } ]
    [ COMMENT = '<comment>' ];
The following is an example showing how to create a Kafka connection using a username and password with SASL/PLAIN authentication.
CREATE KAFKA CONNECTION <CONNECTION_IDENTIFIER>
    HOSTS = ('<BOOTSTRAP_SERVER1>:<PORT_NUMBER>')
    CONSUMER_PROPERTIES = '
        bootstrap.servers=<BOOTSTRAP_SERVER1>:<PORT_NUMBER>
        security.protocol=SASL_SSL
        sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<KAFKA_USERNAME>" password="<KAFKA_PASSWORD>";
        ssl.endpoint.identification.algorithm=https
        sasl.mechanism=PLAIN
    '
    VERSION = CURRENT
    REQUIRE_STATIC_IP = TRUE
    SSL = FALSE
    TOPIC_DISPLAY_FILTERS = ('topic1', 'topic2')
    COMMENT = 'My new Kafka connection';
Typically you would need to configure SSL for your Kafka connection. Please refer to the documentation for more detail on setting this up.
The following is an example showing how to create a Kafka connection using SSL.
CREATE KAFKA CONNECTION my_kafka_connection
    HOSTS = (
        '<bootstrap_server_1>:<port_number>',
        '<bootstrap_server_2>:<port_number>'
    )
    CONSUMER_PROPERTIES = '
        security.protocol=SSL
        ssl.truststore.location=/opt/kafka.client.truststore.jks
        ssl.keystore.location=/opt/kafka.client.keystore.jks
        ssl.keystore.password=<PASSWORD>
        ssl.key.password=<PASSWORD>
    ';
Three quick steps are all you need:
1. Create a connection to your Kafka cluster.
2. Create a staging table in the data lake.
3. Create an ingestion job that copies events from the Kafka topic into the staging table.
Let's walk through the steps.
To get started quickly, we recommend working with Confluent Cloud because it's faster and easier to set up. The following SQL creates a Kafka connection in SQLake; substitute your Confluent configuration as needed.
CREATE KAFKA CONNECTION upsolver_kafka_samples
    HOSTS = ('<hostname>.us-east-1.aws.confluent.cloud:9092')
    CONSUMER_PROPERTIES = '
        bootstrap.servers=<hostname>.us-east-1.aws.confluent.cloud:9092
        security.protocol=SASL_SSL
        sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<USERNAME>" password="<PASSWORD>";
        ssl.endpoint.identification.algorithm=https
        sasl.mechanism=PLAIN
    ';
With a connection to Kafka created, you can now create the staging data lake table. Since we don't know the schema of the events, we'll let SQLake automatically detect it and populate the metadata in the AWS Glue Data Catalog for us.
CREATE TABLE default_glue_catalog.database_0297c0.staged_kafka_orders () PARTITIONED BY $event_date;
Now create a job to copy events from a Kafka topic and load them into the data lake table. SQLake automatically writes data in Apache Parquet format and compacts small files into larger ones. This improves query performance.
CREATE SYNC JOB load_raw_data_from_kafka_topic_orders
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
    READER_SHARDS = 2
    COMPRESSION = SNAPPY
    COMMENT = 'Load raw orders data from Kafka topic to a staging table'
AS COPY FROM KAFKA upsolver_kafka_samples
    TOPIC = 'orders'
INTO default_glue_catalog.database_0297c0.staged_kafka_orders;
Execute the job and SQLake automatically starts pulling events from Kafka. Note that because we set START_FROM to BEGINNING, SQLake seeks to the earliest event available in the topic and reads forward from there. If you prefer SQLake to read only new events from the time the job starts, set this property to NOW. The READER_SHARDS property controls how many readers SQLake uses to consume events; a good rule of thumb is 1 reader for every 70 MB/second of traffic ingested per topic.
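For example, a variation of the previous job that ingests only new events could look like the following sketch. The job name here is hypothetical, and the remaining properties simply mirror the earlier example.
CREATE SYNC JOB load_new_orders_from_kafka   -- hypothetical job name
    START_FROM = NOW          -- ingest only events produced after the job starts
    CONTENT_TYPE = JSON
    READER_SHARDS = 2         -- roughly 1 reader per 70 MB/second of traffic per topic
    COMPRESSION = SNAPPY
    COMMENT = 'Load only new orders events from Kafka'
AS COPY FROM KAFKA upsolver_kafka_samples
    TOPIC = 'orders'
INTO default_glue_catalog.database_0297c0.staged_kafka_orders;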
In some cases, you may want to connect to an external Kafka schema registry that SQLake will use to learn the schema of the events. You can configure the schema registry when creating the ingestion job as follows:
CREATE SYNC JOB load_raw_data_from_kafka_topic_orders
    START_FROM = BEGINNING
    CONTENT_TYPE = (
        TYPE = AVRO_SCHEMA_REGISTRY
        SCHEMA_REGISTRY_URL = 'https://<hostname>.confluent.cloud/schema/111'
    )
    READER_SHARDS = 2
    COMPRESSION = SNAPPY
    COMMENT = 'Load raw orders data from Kafka topic to a staging table'
AS COPY FROM KAFKA upsolver_kafka_samples
    TOPIC = 'orders'
INTO default_glue_catalog.database_0297c0.staged_kafka_orders;
For more details about the different properties you can configure and what they mean, visit the ingestion job documentation.
After the job is created you can query the staging table directly from SQLake by executing a SELECT query. You can also use your favorite data lake query engine, such as Amazon Athena.
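For example, a minimal query to inspect the most recently staged events could look like the sketch below; the result columns depend on the schema SQLake detects from your events, so this is illustrative rather than prescriptive.
SELECT *
FROM default_glue_catalog.database_0297c0.staged_kafka_orders
LIMIT 10;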
Kafka is a core platform for many companies as a popular way to move data in real-time between applications and services. It’s used to stream events like user visits and clicks, ad impressions, online game activities, and ecommerce purchases. This data is extremely useful for analytics and machine learning, and SQLake makes it easy to read this data from Kafka and store it in the data lake or data warehouse for business analysts, application developers, and data scientists to query.
Get started today for free with sample data or bring your own.