Big data comprises datasets that are massive, varied, and complex, and that can't be handled with traditional tools. Big data can include both structured and unstructured data, and it is often stored in data lakes or data warehouses. As organizations grow, big data becomes increasingly crucial for gathering business insights and analytics. The Big Data Zone contains the resources you need for understanding data storage, data modeling, ELT, ETL, and more.
Elasticsearch is a highly scalable, open-source search engine and analytics platform designed to handle large amounts of data. It is built on top of Apache Lucene, a high-performance text search engine, and provides a distributed and easy-to-use solution for storing, searching, and analyzing large volumes of data. In this article, we will explore the use of Elasticsearch and its key features, including indexing, searching, and aggregations.

Indexing

One of the most important features of Elasticsearch is its ability to index data. The indexing API is simple to use and accepts JSON documents, which are then stored in an index. An index is a collection of documents that share similar characteristics and can be thought of as a table in a relational database. For example, you can create an index for customer information, another for product information, and so on.

Example: To index a document into Elasticsearch, you can use the following command:

```json
PUT /customer/doc/1
{
  "first_name": "John",
  "last_name": "Doe",
  "age": 35,
  "email": "john.doe@example.com"
}
```

Searching

Another important feature of Elasticsearch is its ability to search data. The search API is rich and flexible, allowing you to search for documents based on full-text, keyword, and numeric fields. You can also apply filters, facets, and aggregations to your search queries to get more advanced results.

Example: To search for all documents that contain the word "John" in the first_name field, you can use the following command:

```json
GET /customer/_search
{
  "query": {
    "match": {
      "first_name": "John"
    }
  }
}
```

Aggregations

In addition to searching, Elasticsearch provides a powerful aggregation framework that enables you to perform complex data analysis. Aggregations can be used to calculate statistics, such as the average, sum, or count of values, or perform complex operations, such as finding the most frequently used words in a set of documents.

Example: To find the average age of all customers, you can use the following command:

```json
GET /customer/_search
{
  "size": 0,
  "aggs": {
    "avg_age": {
      "avg": {
        "field": "age"
      }
    }
  }
}
```

Complex Search Use Cases

Geo-Spatial Search

Elasticsearch provides support for geo-spatial search, enabling you to search for documents based on their geographic location.

Example: You can search for all customers located within a certain distance from a given location:

```json
GET /customer/_search
{
  "query": {
    "bool": {
      "must": {
        "match_all": {}
      },
      "filter": {
        "geo_distance": {
          "distance": "10km",
          "location": {
            "lat": 40.748817,
            "lon": -73.985428
          }
        }
      }
    }
  }
}
```

Faceted Search

Faceted search is a popular search paradigm that enables users to filter search results based on specific criteria. In Elasticsearch, you can use the aggregation framework to perform a faceted search, which allows you to group your data into categories and then calculate statistics for each category.

Example: Suppose you have an e-commerce website that sells books, and you want to allow users to filter books by category and price range. You can use the following command to perform a faceted search that returns the number of books in each category and price range:

```json
GET /books/_search
{
  "size": 0,
  "aggs": {
    "categories": {
      "terms": {
        "field": "category"
      }
    },
    "price_ranges": {
      "range": {
        "field": "price",
        "ranges": [
          { "to": 50 },
          { "from": 50, "to": 100 },
          { "from": 100 }
        ]
      }
    }
  }
}
```
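These request bodies can be sent from any HTTP client, not just the Dev Tools console or curl. As a minimal sketch (assuming an Elasticsearch node reachable at http://localhost:9200 and a books index whose category field is mapped as a keyword and whose price field is numeric), the faceted search above could be issued from Python with the requests library:

```python
# Minimal sketch: send the faceted-search aggregation to Elasticsearch and
# print the bucket counts. Host, index, and field names are assumptions.
import requests

query = {
    "size": 0,
    "aggs": {
        "categories": {"terms": {"field": "category"}},
        "price_ranges": {
            "range": {
                "field": "price",
                "ranges": [{"to": 50}, {"from": 50, "to": 100}, {"from": 100}],
            }
        },
    },
}

resp = requests.post("http://localhost:9200/books/_search", json=query, timeout=10)
resp.raise_for_status()
aggs = resp.json()["aggregations"]

# Number of books per category bucket.
for bucket in aggs["categories"]["buckets"]:
    print(bucket["key"], bucket["doc_count"])

# Number of books per price range bucket.
for bucket in aggs["price_ranges"]["buckets"]:
    print(bucket.get("from"), "-", bucket.get("to"), bucket["doc_count"])
```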
Multifield Search

In some cases, you may want to search multiple fields at once.

Example: You may want to search for books that match the author's name or the title. In Elasticsearch, you can use the multi-match query to search multiple fields at once:

```json
GET /books/_search
{
  "query": {
    "multi_match": {
      "query": "The Great Gatsby",
      "fields": [ "title", "author" ]
    }
  }
}
```

Nested Objects Search

In Elasticsearch, you can store nested objects within a document.

Example: You can store multiple authors for a book or multiple addresses for a customer. To search for documents that contain specific nested objects, you can use the nested query:

```json
GET /books/_search
{
  "query": {
    "nested": {
      "path": "authors",
      "query": {
        "match": {
          "authors.name": "F. Scott Fitzgerald"
        }
      }
    }
  }
}
```

Conclusion

Elasticsearch is a powerful tool for managing, storing, and analyzing large volumes of data. Its rich API and aggregation framework make it possible to perform complex data analysis, including full-text search, faceted search, and geo-spatial search. Whether you are building a search engine, an e-commerce platform, or a data analytics application, Elasticsearch provides a scalable and flexible solution for your needs.
Data streaming emerged as a new software category. It complements traditional middleware, data warehouses, and data lakes. Apache Kafka became the de facto standard. New players enter the market because of Kafka's success. One of those is Redpanda, a lightweight Kafka-compatible C++ implementation. This article explores the differences between Apache Kafka and Redpanda, when to choose which framework, and how the Kafka ecosystem, licensing, and community adoption impact a proper evaluation. Data Streaming: A New Software Category Data-driven applications are the new black. As part of this, data streaming is a new software category. If you don't understand yet how it differs from other data management platforms like a data warehouse or data lake, check out the following article series:
Data Warehouse vs. Data Lake vs. Data Streaming – Friends, Enemies, Frenemies?
Data Streaming for Data Ingestion into the Data Warehouse and Data Lake
Data Warehouse Modernization: From Legacy On-Premise to Cloud-Native Infrastructure
Case Studies: Cloud-native Data Streaming for Data Warehouse Modernization
Apache Kafka: The De Facto Standard for Data Streaming Apache Kafka became the de facto standard for data streaming, similar to how Amazon S3 is the de facto standard for object storage. Kafka is used across industries for many use cases. The Adoption Curve of Apache Kafka The growth of the Apache Kafka community in recent years is impressive:
>100,000 organizations using Apache Kafka
>41,000 Kafka meetup attendees
>32,000 Stack Overflow questions
>12,000 Jiras for Apache Kafka
>31,000 open job listings requesting Kafka skills
The number of active monthly unique users downloading the Kafka Java client library with Maven has also kept increasing (source: Sonatype). The numbers grow exponentially. That's no surprise to me, as the adoption pattern and maturity curve for Kafka are similar in most companies: Start with one or a few use cases (that prove the business value quickly). Deploy the first applications to production and operate them 24/7. Tap into the data streams from many domains, business units, and technologies. Move to a strategic central nervous system with a decentralized data hub. Kafka Use Cases by Business Value Across Industries The main reason for the incredible growth of Kafka's adoption curve is the variety of potential use cases for data streaming. The potential is almost endless. Kafka's characteristics of combining low latency, scalability, reliability, and true decoupling establish benefits across all industries and use cases. The Emergence of Many Kafka Vendors The market for data streaming is enormous. With so many potential use cases, it is no surprise that more and more software vendors add Kafka support to their products. Most vendors use Kafka or implement its protocol because Kafka has become the de facto standard for data streaming. To be clear: an increasing number of Kafka vendors is a great thing! It proves the creation of a new software category. Competition pushes innovation. The market share is big enough for many vendors. And I am 100% convinced that we are still in a very early stage of the data streaming hype cycle... After a lengthy introduction to set the context, let's review a new entrant into the Kafka market: Redpanda. Introducing Redpanda: Kafka-Compatible Data Streaming Redpanda is a data streaming platform.
Its website explains its positioning in the market and product strategy as follows (to differentiate it from Apache Kafka): No Java: A JVM-free and ZooKeeper-free infrastructure. Designed in C++: Designed for a better performance than Apache Kafka. A single-binary architecture: No dependencies to other libraries or nodes. Self-managing and self-healing: A simple but scalable architecture for on-premise and cloud deployments. Kafka-compatible: Out-of-the-box support for the Kafka protocol with existing applications, tools, and integrations. This sounds great. You need to evaluate whether Redpanda is the right choice for your next project or if you should stick with “real Apache Kafka.” How To Choose the Proper “Kafka” Implementation for Your Project A recommendation that some people find surprising: Qualify out first! That’s much easier. Similarly, like I explained when NOT to use Apache Kafka. As part of the evaluation, the question is if Kafka is the proper protocol for you. And for Kafka, pick different offerings and begin with the comparison. Start your evaluation with the business case requirements and define your most critical needs like uptime SLAs, disaster recovery strategy, enterprise support, operations tooling, self-managed vs. fully-managed cloud service, capabilities like messaging vs. data ingestion vs. data integration vs. applications, and so on. Based on your use cases and requirements, you can start qualifying out vendors like Confluent, Redpanda, Cloudera, Red Hat / IBM, Amazon MSK, Amazon Kinesis, Google Pub Sub, and others to create a shortlist. The following sections compare the open-source project Apache Kafka versus the re-implementation of the Kafka protocol of Redpanda. You can use these criteria (and information from other blogs, articles, videos, and so on) to evaluate your options. Similarities Between Redpanda and Apache Kafka The high-level value propositions are the same in Redpanda and Apache Kafka: Data streaming to process data in real-time at scale continuously. Decouple applications and domains with a distributed storage layer. Integrate with various data sources and data sinks. Leverage stream processing to correlate data and take action in real-time. Self-managed operations or consuming a fully-managed cloud offering. However, the devil is in the details and facts. Don’t trust marketing, but look deeper into the various products and cloud services. Deployment Options: Self-Managed vs. Cloud Service Data streaming is required everywhere. While most companies across industries have a cloud-first strategy, some workloads must stay at the edge for different reasons: Cost, latency, or security requirements. Besides operating Redpanda by yourself, you can buy Redpanda as a product and deploy it in your environment. Instead of self-hosting Redpanda, you can deploy it as a data plane in your environment using Kubernetes (supported by the vendor’s external control plane) or leverage a cloud service (fully managed by the vendor). The different deployment options for Redpanda are great. Pick what you need. This is very similar to Confluent’s deployment options for Apache Kafka. Some other Kafka vendors only provide either self-managed (e.g., Cloudera) or fully managed (e.g., Amazon MSK Serverless) deployment options. What I miss from Redpanda: No official documentation about SLAs of the cloud service and enterprise support. I hope they do better than Amazon MSK (excluding Kafka support from their cloud offerings). 
I am sure you will get that information if you reach out to the Redpanda team, who will probably soon incorporate some information into their website. Bring Your Own Cluster (BYOC) There is a third option besides self-managing a data streaming cluster and leveraging a fully managed cloud service: Bring your own Cluster (BYOC). This alternative allows end users to deploy a solution partially managed by the vendor in your own infrastructure (like your data center or your cloud VPC). Here is Redpanda’s marketing slogan: “Redpanda clusters hosted on your cloud, fully managed by Redpanda, so that your data never leaves your environment.” This sounds very appealing in theory. Unfortunately, it creates more questions and problems than it solves: How does the vendor access your data center or VPC? Who decides how and when to scale a cluster? When to act on issues? How and when do you roll a cluster to incorporate bug fixes or version upgrades? What about cost management? What is the total cost of ownership? How much value does the vendor solution bring? How do you guarantee SLAs? Who has to guarantee them, you or the vendor? For regulated industries, how are security controls and compliance supported? How are you sure about what the vendor does in an environment you ostensibly control? How much harder will a bespoke third-party risk assessment be if you aren’t using pure SaaS? For these reasons, cloud vendors only host managed services in the cloud vendor's environment. Look at Amazon MSK, Azure Event Hubs, Google Pub Sub, Confluent Cloud, etc. All fully managed cloud services are only in the VPC of the vendor for the above reasons. There are only two options: Either you hand over the responsibility to a SaaS offering or control it yourself. Everything in the middle is still your responsibility in the end. Community vs. Commercial Offerings The sales approach of Redpanda looks almost identical to how Confluent sells data streaming. A free community edition is available, even for production usage. The enterprise edition adds enterprise features like tiered storage, automatic data balancing, or 24/7 enterprise support. No surprise here. And a good strategy, as data streaming is required everywhere for different users and buyers. Technical Differences Between Apache Kafka and Redpanda There are plenty of technical and non-functional differences between Apache Kafka products and Redpanda. Keep in mind that Redpanda is NOT Kafka. Redpanda uses the Kafka protocol. This is a small but critical difference. Let’s explore these details in the following sections. Apache Kafka vs. Kafka Protocol Compatibility Redpanda is NOT an Apache Kafka distribution like Confluent Platform, Cloudera, or Red Hat. Instead, Redpanda re-implements the Kafka protocol to provide API compatibility. Being Kafka-compatible is not the same as using Apache Kafka under the hood, even if it sounds great in theory. Two other examples of Kafka-compatible offerings: Azure Event Hubs: A Kafka-compatible SaaS cloud service offering from Microsoft Azure. The service itself works and performs well. However, its Kafka compatibility has many limitations. Microsoft lists a lot of them on its website. Some limitations of the cloud service are the consequence of a different implementation under the hood, like limited retention time and message sizes. Apache Pulsar: An open-source framework competing with Kafka. The feature set overlaps a lot. 
Unfortunately, Pulsar often only has good marketing for advanced features to compete with Kafka or to differentiate. One example is its Kafka mapper to be compatible with the Kafka protocol. Contrary to Azure Event Hubs, as a serious implementation (with some limitations), Pulsar’s compatibility wrapper provides a basic implementation that is compatible with only minor parts of the Kafka protocol. So, while the alleged “Kafka compatibility” sounds nice on paper, one shouldn’t seriously consider this for migrating your running Kafka infrastructure to Pulsar. We have seen compatible products for open-source frameworks in the past. Re-implementations are usually far away from being complete and perfect. For instance, MongoDB compared the official open source protocol to its competitor Amazon DocumentDB to pinpoint the fact that DocumentDB only passes ~33% of the MongoDB integration test chain. In summary, it is totally fine to use these non-Kafka solutions like Azure Event Hubs, Apache Pulsar, or Redpanda for a new project if they fulfill your requirements better than Apache Kafka. But, keep in mind that it is not Kafka. There is no guarantee that additional components from the Kafka ecosystem (like Kafka Connect, Kafka Streams, REST Proxy, and Schema Registry) behave the same when integrated with a non-Kafka solution that only uses the Kafka protocol with its own implementation. How Good Is Redpanda’s Kafka Protocol Compatibility? Frankly, I don’t know. Probably and hopefully, Redpanda has better Kafka compatibility than Pulsar. The whole product is based on this value proposition. Hence, we can assume the Redpanda team spends plenty of time on compatibility. Redpanda has NOT achieved 100% API compatibility yet. Time will tell when we see more case studies from enterprises across industries that migrated some Apache Kafka projects to Redpanda and successfully operated the infrastructure for a few years. Why wait a few years to see? Well, I compare it to what I see from people starting with Amazon MSK. It is pretty easy to get started. However, after a few months, the first issues happen. Users find out that Amazon MSK is not a fully-managed product and does not provide serious Kafka SLAs. Hence, I see too many teams starting with Amazon MSK and then migrating to Confluent Cloud after some months. Let’s be clear: If you run an application against Apache Kafka and migrate to a re-implementation supporting the Kafka protocol, you should NOT expect 100% the same behavior as with Kafka. Some underlying behavior will differ even if the API is 100% compatible. This is sometimes a benefit. For instance, Redpanda focuses on performance optimization with C++. This is only possible in some workloads because of the re-implementation. C++ is superior compared to Java and the JVM for some performance and memory scenarios. Redpanda = Apache Kafka—Kafka Connect—Kafka Streams Apache Kafka includes Kafka Connect for data integration and Kafka Streams for stream processing. Like most Kafka-compatible projects, Redpanda does exclude these critical pieces from its offering. Hence, even 100 percent protocol compatibility would not mean a product re-implements everything in the Apache Kafka project. Lower Latency vs. Benchmarketing Always think about your performance requirements before starting a project. If necessary, do a proof of concept (POC) with Apache Kafka, Apache Pulsar, and Redpanda. I bet, that in 99% of scenarios, all three of them will show a good enough performance for your use case. 
Don’t trust opinionated benchmarks from others. Your use case will have different requirements and characteristics. Performance is typically just one of many evaluation dimensions. I am not a fan of most “benchmarks” of performance and throughput. Benchmarks are almost always opinionated and configured for a specific problem (whether a vendor, independent consultant, or researcher conducts them). My colleague Jack Vanlightly explained this concept of benchmarketing with excellent diagrams: Source: Jack Vanlightly Here is one concrete example you will find in one of Redpanda’s benchmarks: Kafka was not built for very high throughput producers, and this is what Redpanda is exploiting when they claim that Kafka’s throughput is inferior to Redpanda. Ask yourself this question: Of 1GB/s use cases, who would create that throughput with just 4 producers? Benchmarketing at its finest. Hence, once again, start with your business requirements. Then choose the right tool for the job. Benchmarks are always built for winning against others. Nobody will publish a benchmark where the competition wins. Soft Real-Time vs. Hard Real-Time When we speak about real-time in the IT world, we mean end-to-end data processing pipelines that need at least a few milliseconds. This is called soft real-time. And this is where Apache Kafka, Apache Pulsar, Redpanda, Azure Event Hubs, Apache Flink, Amazon Kinesis, and similar platforms fit into. None of these can do hard real time. Hard real-time requires a deterministic network with zero latency and no spikes. Typical scenarios include embedded systems, field buses, and PLCs in manufacturing, cars, robots, securities trading, etc. Time-Sensitive Networking (TSN) is the right keyword if you want more research. I wrote a dedicated blog post about why data streaming is NOT hard real-time. Hence, don’t try to use Kafka or Redpanda for these use cases. That’s OT (operational technology), not IT (information technology). OT is plain C or Rust on embedded software. No ZooKeeper With Redpanda vs. No ZooKeeper With Kafka Besides being implemented in C++ instead of using the JVM, the second big differentiator of Redpanda is no need for ZooKeeper and two complex distributed systems. Well, with Apache Kafka 3.3, this differentiator is gone. Kafka is now production-ready without ZooKeeper. KIP-500 was a multi-year journey and an operation at Kafka’s heart: To be fair, it will still take some time until the new ZooKeeper-less architecture goes into production. Also, today, it is only supported by new Kafka clusters. However, migration scenarios with zero downtime and without data loss will be supported in 2023, too. But that’s how a severe release cycle works for a mature software product: Step-by-step implementation and battle-testing instead of starting with marketing and selling of alpha and beta features. ZooKeeper-less data streaming with Kafka is not just a massive benefit for the scalability and reliability of Kafka but also makes operations much more straightforward, similar to ZooKeeper-less Redpanda. By the way, this was one of the major arguments why I did not see the value of Apache Pulsar. The latter requires not just two but three distributed systems: Pulsar broker, ZooKeeper, and BookKeeper. That’s nonsense and unnecessary complexity for virtually all projects and use cases. Lightweight Redpanda + Heavyweight Ecosystem = Middleweight Data Streaming? Redpanda is very lightweight and efficient because of its C++ implementation. 
This can help in limited compute environments like edge hardware. As an additional consequence, Redpanda has fewer latency spikes than Apache Kafka. That are significant arguments for Redpanda for some use cases. However, you need to look at the complete end-to-end data pipeline. If you use Redpanda as a message queue, you get these benefits compared to the JVM-based Kafka engine. You might then pick a message queue like RabbitMQ or NATs instead. I don’t start this discussion here as I focus on the much more powerful and advanced data streaming use cases. Even in edge use cases where you deploy a single Kafka broker, the hardware, like an industrial computer (IPC), usually provides at least 4GB or 8GB of memory. That is sufficient for deploying the whole data streaming platform around Kafka and other technologies. Data Streaming Is More Than Messaging or Data Ingestion My fundamental question is: what is the benefit of a C++ implementation of the data hub if all the surrounding systems are built with JVM technology or even worse and slow technologies like Python? Kafka-compatible tools, like Redpanda, integrate well with the Kafka ecosystem, as they use the same protocol. Hence, tools like Kafka Connect, Kafka Streams, KSQL, Apache Flink, Faust, and all other components from the Kafka ecosystem work with Redpanda. You will find such an example for almost every existing Kafka tool on the Redpanda blog. However, these combinations kill almost all the benefits of having a C++ layer in the middle. All integration and processing components would also need to be as efficient as Redpanda and use C++ (or Go or Rust) under the hood. These tools do not exist today (likely, as they are not needed by many people). And here is an additional drawback: The debugging, testing, and monitoring infrastructure must combine C++, Python, and JVM platforms if you combine tools like Java-based Kafka Connect and Python-based Faust with C++-based Redpanda. So, I don’t get the value proposition here. Data Replication Across Clusters Having more than one Kafka cluster is the norm, not an exception. Use cases like disaster recovery, aggregation, data sovereignty in different countries, or migration from on-premise to the cloud require multiple data streaming clusters. Replication across clusters is part of open-source Apache Kafka. MirrorMaker 2 (based on Kafka Connect) supports these use cases. More advanced (proprietary) tools from vendors like Confluent Replicator or Cluster Linking make these use cases more effortless and reliable. Data streaming with the Kafka ecosystem is perfect as the foundation of a decentralized data mesh: How do you build these use cases with Redpanda? It is the same story as for data integration and stream processing: How much does it help to have a very lightweight and performant core if all other components rely on “3rd party” code bases and infrastructure? In the case of data replication, Redpanda uses Kafka’s Mirrormaker. Make sure to compare MirrorMaker to Confluent Cluster Linking—the latter uses the Kafka protocol for replications and does not need additional infrastructure, operations, offset sync, etc. Non-Functional Differences Between Apache Kafka and Redpanda Technical evaluations are dominant when talking about Redpanda vs. Apache Kafka. However, the non-functional differences are as crucial before making the strategic decision to choose the data streaming platform for your next project. 
Licensing, adoption curve, and the total cost of ownership (TCO) are critical for the success of establishing a data streaming platform. Open Source (Kafka) vs. Source Available (Redpanda) As the name says, Apache Kafka is under the very permissive Apache license 2.0. Everyone, including cloud providers, can use the framework for building internal applications, commercial products, and cloud services. Committers and contributions are spread across various companies and individuals. Redpanda is released under the more restrictive Source Available License (BSL). The intention is to deter cloud providers from offering Redpanda’s work as a service. For most companies, this is fine, but it limits broader adoption across different communities and vendors. The likelihood of external contributors, committers, or even other vendors picking the technology is much smaller than in Apache projects like Kafka. This has a significant impact on the (future) adoption curve. Maturity, Community, and Ecosystem The introduction of this article showed the impressive adoption of Kafka. Just keep in mind: Redpanda is NOT Apache Kafka. It just supports the Kafka protocol. Redpanda is a brand-new product and implementation. Operations are different. The behavior of the engine is different. Experts are not available. Job offerings do not exist. And so on. Kafka is significantly better documented, has a tremendously larger community of experts, and has a vast array of supporting tooling that makes operations more straightforward. There are many local and online Kafka training options, including online courses, books, meetups, and conferences. You won’t find much for Redpanda beyond the content of the vendor behind it. And don’t trust marketing. That’s true for every vendor, of course. If you read a great feature list on the Redpanda website, double-check if the feature truly exists and in what shape it is. Example: RBAC (role-based access control) is available for Redpanda. The devil lies in the details. Quote from the Redpanda RBAC documentation: “This page describes RBAC in Redpanda Console and, therefore, manages access only for console users but not clients that interact via the Kafka API. To restrict Kafka API access, you need to use Kafka ACLs.” There are plenty of similar examples today. Just try to use the Redpanda cloud service. You will find many things that are more alpha than beta today. Make sure not to fall into the same myths around the marketing of product features as some users did with Apache Pulsar a few years ago. The Total Cost of Ownership and Business Value When you define your project’s business requirements and SLAs, ask yourself how much downtime or data loss is acceptable. The RTO (recovery time objective) and RPO (recovery point objective) impact a data streaming platform’s architecture and overall process to ensure business continuity, even in the case of a disaster. The TCO is not just about the cost of a product or cloud service. Full-time engineers need to operate and integrate the data streaming platform. Expensive project leads, architects, and developers build applications. Project risk includes the maturity of the product and the expertise you can bring in for consulting and 24/7 support. Similar to benchmarketing regarding latency, vendors use the same strategy for TCO calculations. Here is one concrete example you always hear from Redpanda: “C++ does enable more efficient use of CPU resources.” This statement is correct. 
However, the problem with that statement is that Kafka is rarely CPU-bound and much more IO-bound. Redpanda has the same network and disk requirements as Kafka, which means Redpanda has limited differences from Kafka in terms of TCO regarding infrastructure. When to Choose Redpanda Instead of Apache Kafka? You need to evaluate whether Redpanda is the right choice for your next project or if you should stick with the “real Apache Kafka” and related products or cloud offerings. Read articles and blogs, watch videos, search for case studies in your industry, talk to different competitive vendors, and build your proof of concept or pilot project. Qualifying out products is much easier than evaluating plenty of offerings. When to Seriously Consider Redpanda? You need C++ infrastructure because your ops team cannot handle and analyze JVM logs—but be aware that this is only the messaging core, not the data integration, data processing, or other capabilities of the Kafka ecosystem. The slight performance differences matter to you—and you still don’t need hard real-time. Simple, lightweight development on your laptop and in automated test environments—but you should then also run Redpanda in production (using different implementations of an API for TEST and PROD is a risky anti-pattern). You should evaluate Redpanda against Apache Kafka distributions and cloud services in these cases. This article explored the trade-offs Redpanda has from a technical and non-functional perspective. If you need an enterprise-grade solution or fully-managed cloud service, a broad ecosystem (connectors, data processing capabilities, etc.), and if 10ms latency is good enough and a few p99 spikes are okay, then I don’t see many reasons why you would take the risk of adopting Redpanda instead of an actual Apache Kafka product or cloud service. The Future Will Tell Us if Redpanda Is a Severe Competitor I didn’t even cover the fact that a startup always has challenges finding great case studies, especially with big enterprises like fortune 500 companies. The first great logos are always the hardest to find. Sometimes, startups never get there. In other cases, a truly competitive technology and product are created. Such a journey takes years. Let’s revisit this article in one, two, and five years to see the evolution of Redpanda (and Apache Kafka). What are your thoughts? When do you consider using Redpanda instead of Apache Kafka? Are you using Redpanda already? Why and for what use cases?
Amazon Simple Storage Service (S3) is a highly scalable, durable, and secure object storage service offered by Amazon Web Services (AWS). S3 allows businesses to store and retrieve any amount of data from anywhere on the web by making use of its enterprise-level services. S3 is designed to be highly interoperable and integrates seamlessly with other AWS and third-party tools and technologies that process data stored in Amazon S3. One of these is Amazon EMR (Elastic MapReduce), which allows you to process large amounts of data using open-source tools such as Spark. Apache Spark is an open-source distributed computing system used for large-scale data processing. Spark is built for speed and supports various data sources, including Amazon S3. It provides an efficient way to process large amounts of data and perform complex computations in minimal time.

Memphis.dev is a next-generation alternative to traditional message brokers: a simple, robust, and durable cloud-native message broker wrapped with an entire ecosystem that enables cost-effective, fast, and reliable development of modern queue-based use cases. The common pattern among message brokers is to delete messages once they pass the defined retention policy, such as time, size, or number of messages. Memphis offers a second storage tier for longer, possibly infinite retention of stored messages. Each message that is expelled from the station automatically migrates to the second storage tier, which, in this case, is AWS S3.

In this tutorial, you will be guided through the process of setting up a Memphis station with a second storage class connected to AWS S3 and an environment on AWS, followed by creating an S3 bucket, setting up an EMR cluster, installing and configuring Apache Spark on the cluster, preparing data in S3 for processing, processing data with Apache Spark, best practices, and performance tuning.

Setting Up the Environment

Memphis

1. To get started, first install Memphis.
2. Enable AWS S3 integration via the "Memphis integration center."
3. Create a station (topic), and choose a retention policy. Each message passing the configured retention policy will be offloaded to an S3 bucket.
4. Check the newly configured AWS S3 integration as a second storage class by clicking "Connect."
5. Start producing events into your newly created Memphis station.

Create an AWS S3 Bucket

If you haven't done so already, you need to create an AWS account on AWS' official page. Next, create an S3 bucket where you can store your data. You can use the AWS Management Console, the AWS CLI, or an SDK to create a bucket. For this tutorial, you will use the AWS Management Console. Click on "Create bucket," then choose a bucket name that complies with the naming conventions and the region where you want the bucket to be located. Configure "Object ownership" and "Block all public access" according to your use case, and make sure to configure the other bucket permissions to allow your Spark application to access the data. Finally, click on the "Create bucket" button to create the bucket.
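If you prefer scripting this step rather than clicking through the console, the following is a minimal boto3 sketch, assuming your AWS credentials are already configured (for example, via aws configure); the bucket name and region are placeholders:

```python
# Hypothetical sketch: create the S3 bucket from Python instead of the console.
import boto3

BUCKET_NAME = "my-spark-tutorial-bucket"   # placeholder; must be globally unique
REGION = "us-east-1"                       # placeholder region

s3 = boto3.client("s3", region_name=REGION)

# us-east-1 is the default location and must not be passed as a
# LocationConstraint; every other region requires it.
if REGION == "us-east-1":
    s3.create_bucket(Bucket=BUCKET_NAME)
else:
    s3.create_bucket(
        Bucket=BUCKET_NAME,
        CreateBucketConfiguration={"LocationConstraint": REGION},
    )

# Block public access, matching the console setting described above.
s3.put_public_access_block(
    Bucket=BUCKET_NAME,
    PublicAccessBlockConfiguration={
        "BlockPublicAcls": True,
        "IgnorePublicAcls": True,
        "BlockPublicPolicy": True,
        "RestrictPublicBuckets": True,
    },
)
```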
Setting Up an EMR Cluster With Spark Installed

Amazon Elastic MapReduce (EMR) is a web service based on Apache Hadoop that allows users to cost-effectively process vast amounts of data using big data technologies, including Apache Spark. To create an EMR cluster with Spark installed, open the EMR console here and select "Clusters" under "EMR on EC2" on the left side of the page. Click on "Create cluster" and give the cluster a descriptive name. Under "Application bundle," select "Spark" to install it on your cluster. Scroll down to the "Cluster logs" section and select the checkbox "Publish cluster-specific logs to Amazon S3." This prompts you to enter an Amazon S3 location; use the S3 bucket name you created in the previous step followed by /logs, i.e., s3://myawsbucket/logs. The /logs suffix is required so that Amazon EMR can create a new folder in your bucket where it copies your cluster's log files. Go to the "Security configuration and permissions" section and input your EC2 key pair, or go with the option to create one. Then click on the dropdown for "Service role for Amazon EMR" and choose "AWSServiceRoleForSupport." Choose the same dropdown option for "IAM role for instance profile." Refresh the icon if need be to get these dropdown options. Finally, click the "Create cluster" button to launch the cluster and monitor the cluster status to validate that it has been created.

Installing and Configuring Apache Spark on EMR Cluster

After successfully creating an EMR cluster, the next step is to configure Apache Spark on it. EMR clusters provide a managed environment for running Spark applications on AWS infrastructure, making it easy to launch and manage Spark clusters in the cloud. You configure Spark to work with your data and processing needs and then submit Spark jobs to the cluster to process your data. You can configure Apache Spark on the cluster over the Secure Shell (SSH) protocol. First, you need to authorize SSH connections to your cluster, which was set by default when you created the EMR cluster. A guide on how to authorize SSH connections can be found here. To create an SSH connection, you need to specify the EC2 key pair that you selected when creating the cluster. Next, connect to the EMR cluster using the Spark shell by connecting to the primary node. You first need to fetch the master public DNS of the primary node:

1. Navigate to the left of the AWS console.
2. Under "EMR on EC2," choose "Clusters."
3. Select the cluster whose public DNS name you want to get.

In your OS terminal, input the following command:

```
ssh hadoop@ec2-###-##-##-###.compute-1.amazonaws.com -i ~/mykeypair.pem
```

Replace ec2-###-##-##-###.compute-1.amazonaws.com with your master public DNS name and ~/mykeypair.pem with the file and path name of your .pem file. A prompt message will pop up, and your response should be "yes." Type in "exit" to close the SSH session.

Preparing Data for Processing With Spark and Uploading to S3 Bucket

Data processing requires preparation before uploading, so that the data is in a format that Spark can easily process. The format used is influenced by the type of data you have and the analysis you plan to perform. Some formats used include CSV, JSON, and Parquet. Create a new Spark session and load your data into Spark using the relevant API. For instance, use the spark.read.csv() method to read CSV files into a Spark DataFrame. Amazon EMR, a managed service for Hadoop ecosystem clusters, can be used to process the data. It reduces the need to set up, tune, and maintain clusters. It also features other integrations, with Amazon SageMaker, for example, to start a SageMaker model training job from a Spark pipeline in Amazon EMR. Once your data is ready, you can write it back to the Amazon S3 bucket using the DataFrame writer API, as sketched below.
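For readers working in PySpark rather than Scala, here is a rough sketch of the read-prepare-write flow just described; the bucket name and paths are placeholders, and the s3a:// scheme assumes the S3A connector configured later in this tutorial (on EMR, the s3:// EMRFS scheme also works):

```python
# Hypothetical PySpark sketch: read a CSV from S3, lightly prepare it, and
# write it back as Parquet. Bucket and key names are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PrepareData").getOrCreate()

# Read raw CSV data from the bucket created earlier.
df = (
    spark.read
    .option("header", "true")       # first line contains column names
    .option("inferSchema", "true")  # let Spark guess column types
    .csv("s3a://my-spark-tutorial-bucket/raw/customers.csv")
)

# Light preparation: drop exact duplicates and rows with missing values.
clean_df = df.dropDuplicates().na.drop()

# Write the prepared data back to S3 in a columnar format for later processing.
(
    clean_df.write
    .mode("overwrite")
    .parquet("s3a://my-spark-tutorial-bucket/prepared/customers/")
)

spark.stop()
```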
You should have configured your AWS credentials and have write permissions to access the S3 bucket. Indicate the S3 bucket and path where you want to save the data. For example, you can use df.write.format("csv").save("s3://my-bucket/path/to/data") to save the data to the specified S3 bucket. Once the data is saved to the S3 bucket, you can access it from other Spark applications or tools, or you can download it for further analysis or processing. To upload data through the console, first create a folder: choose the bucket you initially created, click the "Actions" button, and select "Create Folder" from the drop-down items. You can now name the new folder. To upload the data files to the bucket, select the name of the data folder, choose "Add Files" in the "Upload" wizard, proceed with the Amazon S3 console directions to upload the files, and select "Start Upload." It's important to consider and ensure best practices for securing your data before uploading it to the S3 bucket.

Understanding Data Formats and Schemas

Data formats and schemas are two related, but distinct and important, concepts in data management. Data format refers to the organization and structure of data within the database. There are various formats to store data, i.e., CSV, JSON, XML, YAML, etc. These formats define how data should be structured alongside the different types of data and applications applicable to it. A data schema, on the other hand, describes the structure of the database itself: it defines the layout of the database and ensures data is stored appropriately, specifying the views, tables, indexes, types, and other elements. Both concepts are important in analytics and in visualizing the data.

Cleaning and Preprocessing Data in S3

It is essential to double-check for errors in your data before processing it. To get started, access the data folder where you saved the data file in your S3 bucket and download it to your local machine. Next, load the data into a data processing tool, which will be used to clean and preprocess the data. For this tutorial, the preprocessing tool used is Amazon Athena, which helps analyze unstructured and structured data stored in Amazon S3.

1. Go to Amazon Athena in the AWS Console.
2. Click on "Create" to create a new table and then "CREATE TABLE."
3. Type in the path of your data file in the part highlighted as "LOCATION."
4. Follow the prompts to define the schema for the data and save the table.

Now, you can run a query to validate that the data is loaded correctly and then clean and preprocess the data. For example, this query identifies the duplicates present in the data:

```sql
SELECT row1, row2, COUNT(*)
FROM table
GROUP BY row1, row2
HAVING COUNT(*) > 1;
```

This example creates a new table without the duplicates:

```sql
CREATE TABLE new_table AS
SELECT DISTINCT * FROM table;
```

Finally, export the cleaned data back to S3 by navigating to the S3 bucket and the folder to upload the file.

Understanding the Spark Framework

The Spark framework is an open-source, simple, and expressive cluster computing system that was built for rapid development. It runs on the Java Virtual Machine and offers APIs in Scala, Java, Python, and R. The core feature of Spark is its in-memory data computing ability, which speeds up the processing of large datasets.

Configuring Spark to Work With S3

To configure Spark to work with S3, begin by adding the Hadoop AWS dependency to your Spark application.
Do this by adding the following line to your build file (e.g., build.sbt for Scala or pom.xml for Java):

```scala
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "3.3.1"
```

Provide the AWS access key ID and secret access key to your Spark application by setting the following configuration properties:

```
spark.hadoop.fs.s3a.access.key <ACCESS_KEY_ID>
spark.hadoop.fs.s3a.secret.key <SECRET_ACCESS_KEY>
```

You can also set these properties using the SparkConf object in your code:

```scala
val conf = new SparkConf()
  .set("spark.hadoop.fs.s3a.access.key", "<ACCESS_KEY_ID>")
  .set("spark.hadoop.fs.s3a.secret.key", "<SECRET_ACCESS_KEY>")
```

Set the S3 endpoint URL in your Spark application by setting the following configuration property:

```
spark.hadoop.fs.s3a.endpoint s3.<REGION>.amazonaws.com
```

Replace <REGION> with the AWS region where your S3 bucket is located (e.g., us-east-1). A DNS-compatible bucket name is required to grant the S3 client in Hadoop access for S3 requests. If your bucket name contains dots or underscores, you may need to enable path-style access, because the S3 client in Hadoop uses virtual-host-style addressing by default. Set the following configuration property to enable path-style access:

```
spark.hadoop.fs.s3a.path.style.access true
```

Lastly, create a Spark session with the S3 configuration by setting the spark.hadoop-prefixed properties in the Spark configuration:

```scala
val spark = SparkSession.builder()
  .appName("MyApp")
  .config("spark.hadoop.fs.s3a.access.key", "<ACCESS_KEY_ID>")
  .config("spark.hadoop.fs.s3a.secret.key", "<SECRET_ACCESS_KEY>")
  .config("spark.hadoop.fs.s3a.endpoint", "s3.<REGION>.amazonaws.com")
  .getOrCreate()
```

To read data from S3 in Spark, use the spark.read method and specify the S3 path to your data as the input source. Here is an example demonstrating how to read a CSV file from S3 into a DataFrame in Spark:

```scala
val spark = SparkSession.builder()
  .appName("ReadDataFromS3")
  .getOrCreate()

val df = spark.read
  .option("header", "true")      // Specify whether the first line is the header or not
  .option("inferSchema", "true") // Infer the schema automatically
  .csv("s3a://<BUCKET_NAME>/<FILE_PATH>")
```

In this example, replace <BUCKET_NAME> with the name of your S3 bucket and <FILE_PATH> with the path to your CSV file within the bucket.

Transforming Data With Spark

Transforming data with Spark typically refers to operations on data to clean, filter, aggregate, and join it. Spark provides a rich set of APIs for data transformation, including the DataFrame, Dataset, and RDD APIs. Some of the common data transformation operations in Spark are filtering, selecting columns, aggregating data, joining data, and sorting data. Here is one example of a data transformation operation, sorting, which orders data based on one or more columns using the orderBy or sort method on a DataFrame or Dataset:

```scala
val sortedData = df.orderBy(col("age").desc)
```

Finally, you may need to write the results back to S3 to store them. Spark provides various APIs to write data to S3, such as DataFrameWriter, DatasetWriter, and RDD.saveAsTextFile. The following code example demonstrates how to write a DataFrame to S3 in Parquet format:

```scala
val outputS3Path = "s3a://<BUCKET_NAME>/<OUTPUT_DIRECTORY>"

df.write
  .mode(SaveMode.Overwrite)
  .option("compression", "snappy")
  .parquet(outputS3Path)
```

Replace <BUCKET_NAME> with the name of your S3 bucket and <OUTPUT_DIRECTORY> with the path to the output directory in the bucket. The mode method specifies the write mode, which can be Overwrite, Append, Ignore, or ErrorIfExists. The option method can be used to specify various options for the output format, such as the compression codec. You can also write data to S3 in other formats, such as CSV, JSON, and Avro, by changing the output format and specifying the appropriate options.
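The Scala example above only demonstrates sorting. To make the other transformations listed (filtering, selecting columns, aggregating, and joining) concrete, here is a hedged PySpark sketch; the column, table, and bucket names are invented for illustration:

```python
# Hypothetical PySpark sketch of the other common transformations mentioned
# above: filter, select, aggregate, and join. Column and path names are invented.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("TransformExamples").getOrCreate()

customers = spark.read.parquet("s3a://<BUCKET_NAME>/prepared/customers/")
orders = spark.read.parquet("s3a://<BUCKET_NAME>/prepared/orders/")

# Filtering: keep only adult customers.
adults = customers.filter(F.col("age") >= 18)

# Selecting columns: project just the fields we need.
slim = adults.select("customer_id", "age", "email")

# Aggregating: average age per signup year.
avg_age = customers.groupBy("signup_year").agg(F.avg("age").alias("avg_age"))

# Joining: attach order totals to each customer.
order_totals = orders.groupBy("customer_id").agg(F.sum("amount").alias("total_spent"))
enriched = slim.join(order_totals, on="customer_id", how="left")

# Sorting, as in the Scala example, then writing the result back to S3.
enriched.orderBy(F.col("total_spent").desc()).write.mode("overwrite").parquet(
    "s3a://<BUCKET_NAME>/output/customer_totals/"
)
```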
Understanding Data Partitioning in Spark

In simple terms, data partitioning in Spark refers to splitting the dataset into smaller, more manageable portions across the cluster. The purpose of this is to optimize performance, improve scalability, and, ultimately, make the data easier to manage. In Spark, data is processed in parallel across the nodes of a cluster. This is made possible by Resilient Distributed Datasets (RDDs), Spark's abstraction for large, distributed collections of data. By default, an RDD is partitioned across various nodes due to its size. To perform optimally, there are ways to configure Spark to make sure jobs are executed promptly and resources are managed effectively. Some of these include caching, memory management, data serialization, and the use of mapPartitions() over map(). Spark UI is a web-based graphical user interface that provides comprehensive information about a Spark application's performance and resource usage. It includes several pages, such as Overview, Executors, Stages, and Tasks, that provide information about various aspects of a Spark job. Spark UI is an essential tool for monitoring and debugging Spark applications, as it helps identify performance bottlenecks and resource constraints and troubleshoot errors. By examining metrics such as the number of completed tasks, the duration of the job, CPU and memory usage, and shuffle data written and read, users can optimize their Spark jobs and ensure they run efficiently.

Conclusion

Processing your data on AWS S3 using Apache Spark is an effective and scalable way to analyze huge datasets. By utilizing the cloud-based storage and computing resources of AWS S3 and Apache Spark, users can process their data quickly and effectively without having to worry about infrastructure management. In this tutorial, we went through setting up an S3 bucket and Apache Spark cluster on AWS EMR, configuring Spark to work with AWS S3, and writing and running Spark applications to process data. We also covered data partitioning in Spark, Spark UI, and optimizing performance in Spark.
What We Use ClickHouse For The music library of Tencent Music contains data of all forms and types: recorded music, live music, audio, videos, etc. As data platform engineers, our job is to distill information from the data, based on which our teammates can make better decisions to support our users and musical partners. Specifically, we do an all-round analysis of the songs, lyrics, melodies, albums, and artists, turn all this information into data assets, and pass them to our internal data users for inventory counting, user profiling, metrics analysis, and group targeting. We stored and processed most of our data in Tencent Data Warehouse (TDW), an offline data platform where we put the data into various tag and metric systems and then created flat tables centering each object (songs, artists, etc.). Then we imported the flat tables into ClickHouse for analysis and Elasticsearch for data searching and group targeting. After that, our data analysts used the data under the tags and metrics they needed to form datasets for different usage scenarios, during which they could create their own tags and metrics. The data processing pipeline looked like this: Why ClickHouse Is Not a Good Fit When working with the above pipeline, we encountered a few difficulties: Partial Update: Partial update of columns was not supported. Therefore, any latency from any one of the data sources could delay the creation of flat tables and thus undermine data timeliness. High storage cost: Data under different tags and metrics was updated at different frequencies. As much as ClickHouse excelled in dealing with flat tables, it was a huge waste of storage resources to just pour all data into a flat table and partition it by day, not to mention the maintenance cost coming with it. High maintenance cost: Architecturally speaking, ClickHouse was characterized by the strong coupling of storage nodes and compute nodes. Its components were heavily interdependent, adding to the risks of cluster instability. Plus, for federated queries across ClickHouse and Elasticsearch, we had to take care of a huge amount of connection issues. That was just tedious. Transition to Apache Doris Apache Doris, a real-time analytical database, boasts a few features that are exactly what we needed to solve our problems: Partial update: Doris supports a wide variety of data models, among which the Aggregate Model supports the real-time partial update of columns. Building on this, we can directly ingest raw data into Doris and create flat tables there. The ingestion goes like this: Firstly, we use Spark to load data into Kafka; then, any incremental data will be updated to Doris and Elasticsearch via Flink. Meanwhile, Flink will pre-aggregate the data so as to release the burden on Doris and Elasticsearch. Storage cost: Doris supports multi-table join queries and federated queries across Hive, Iceberg, Hudi, MySQL, and Elasticsearch. This allows us to split the large flat tables into smaller ones and partition them by update frequency. The benefits of doing so include relief of storage burden and an increase in query throughput. Maintenance cost: Doris is of simple architecture and is compatible with MySQL protocol. Deploying Doris only involves two processes (FE and BE) with no dependency on other systems, making it easy to operate and maintain. Also, Doris supports querying external ES data tables. 
It can easily interface with the metadata in ES and automatically map the table schema from ES so we can conduct queries on Elasticsearch data via Doris without grappling with complex connections. What's more, Doris supports multiple data ingestion methods, including batch import from remote storage such as HDFS and S3, data reads from MySQL binlog and Kafka, and real-time data synchronization or batch import from MySQL, Oracle, and PostgreSQL. It ensures service availability and data reliability through a consistency protocol and is capable of auto-debugging. This is great news for our operators and maintainers. Statistically speaking, these features have cut our storage cost by 42% and development cost by 40%. During our usage of Doris, we have received lots of support from the open-source Apache Doris community and timely help from the SelectDB team, which is now running a commercial version of Apache Doris. Further Improvements To Serve Our Needs Introduce a Semantic Layer Speaking of the datasets, on the bright side, our data analysts are given the liberty of redefining and combining the tags and metrics at their convenience. But on the dark side, high heterogeneity of the tag and metric systems leads to more difficulty in their usage and management. Our solution is to introduce a semantic layer in our data processing pipeline. The semantic layer is where all the technical terms are translated into more comprehensible concepts for our internal data users. In other words, we are turning the tags and metrics into first-class citizens for data definement and management. Why Would This Help? For data analysts, all tags and metrics will be created and shared at the semantic layer so there will be less confusion and higher efficiency. For data users, they no longer need to create their own datasets or figure out which one is applicable for each scenario but can simply conduct queries on their specified tagset and metricset. Upgrade the Semantic Layer Explicitly defining the tags and metrics at the semantic layer was not enough. In order to build a standardized data processing system, our next goal was to ensure consistent definition of tags and metrics throughout the whole data processing pipeline. For this sake, we made the semantic layer the heart of our data management system: How Does It Work? All computing logics in TDW will be defined at the semantic layer in the form of a single tag or metric. The semantic layer receives logic queries from the application side, selects an engine accordingly, and generates SQL. Then it sends the SQL command to TDW for execution. Meanwhile, it might also send configuration and data ingestion tasks to Doris and decide which metrics and tags should be accelerated. In this way, we have made the tags and metrics more manageable. A fly in the ointment is that since each tag and metric is individually defined, we are struggling with automating the generation of a valid SQL statement for the queries. If you have any idea about this, you are more than welcome to talk to us. Give Full Play to Apache Doris As you can see, Apache Doris has played a pivotal role in our solution. Optimizing the usage of Doris can largely improve our overall data processing efficiency. So, in this part, we are going to share with you what we do with Doris to accelerate data ingestion and queries and reduce costs. What We Want Currently, we have 800+ tags and 1300+ metrics derived from the 80+ source tables in TDW. 
When importing data from TDW to Doris, we hope to achieve: Real-time availability: In addition to the traditional T+1 offline data ingestion, we require real-time tagging. Partial update: Each source table generates data through its own ETL task at various paces and involves only part of the tags and metrics, so we require support for partial update of columns. High performance: We need a response time of only a few seconds in group targeting, analysis, and reporting scenarios. Low costs: We hope to reduce costs as much as possible. What We Do Generate Flat Tables in Flink Instead of TDW Generating flat tables in TDW has a few downsides: High storage cost: TDW has to maintain an extra flat table apart from the discrete 80+ source tables. That's huge redundancy. Low real-timeliness: Any delay in the source tables will be amplified and slow down the whole data link. High development cost: Achieving real-timeliness would require extra development efforts and resources. On the contrary, generating flat tables in Doris is much easier and less expensive. The process is as follows: Use Spark to import new data into Kafka in an offline manner. Use Flink to consume Kafka data. Create a flat table via the primary key ID. Import the flat table into Doris. For example, Flink aggregates five rows of data that share "ID"=1 into one row in Doris, reducing the data-writing pressure on Doris. This can largely reduce storage costs since TDW no longer has to maintain two copies of data, and Kafka only needs to store the new data pending ingestion. What's more, we can add whatever ETL logic we want into Flink and reuse lots of development logic for offline and real-time data ingestion. Name the Columns Smartly As we mentioned, the Aggregate Model of Doris allows for a partial update of columns. Here we provide a simple introduction to other data models in Doris for your reference: Unique Model: This is applicable for scenarios requiring primary key uniqueness. It only keeps the latest data of the same primary key ID. (As far as we know, the Apache Doris community is planning to include partial update of columns in the Unique Model, too.) Duplicate Model: This model stores all original data exactly as it is without any pre-aggregation or deduplication. After determining the data model, we had to think about how to name the columns. Using the tags or metrics as column names was not an option because: Our internal data users might need to rename the metrics or tags, but Doris 1.1.3 does not support the modification of column names. Tags might be taken online and offline frequently. If that involves the adding and dropping of columns, it will be not only time-consuming but also detrimental to query performance. Instead, we do the following: For flexible renaming of tags and metrics, we use MySQL tables to store the metadata (name, globally unique ID, status, etc.). Any change to the names will only happen in the metadata but will not affect the table schema in Doris. For example, if song_name is given an ID of 4, it will be stored with the column name of a4 in Doris. Then if song_name is involved in a query, it will be converted to a4 in SQL (a small sketch of this rewriting follows below). For the onlining and offlining of tags, we sort out the tags based on how frequently they are being used. The least used ones will be given an offline mark in their metadata. No new data will be put under the offline tags, but the existing data under those tags will still be available. For real-time availability of newly added tags and metrics, we prebuild a few ID columns in Doris tables based on the mapping of name IDs. These reserved ID columns are allocated to newly added tags and metrics. Thus, we can avoid table schema changes and the consequent overheads. Our experience shows that only 10 minutes after the tags and metrics are added, the data under them can be available.
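To make the naming scheme above concrete, here is a small, purely illustrative Python sketch of the name-to-column rewriting; the mapping values and the query are invented, and the real system would load the mapping from the MySQL metadata tables rather than hard-code it:

```python
# Hypothetical sketch of the tag/metric-to-column mapping described above.
# In practice the mapping (name, globally unique ID, status) comes from MySQL;
# here it is hard-coded for illustration.
import re

def rewrite_query(sql: str, mapping: dict) -> str:
    """Replace human-readable tag/metric names with their Doris column names (a<ID>)."""
    def substitute(match: re.Match) -> str:
        word = match.group(0)
        return mapping.get(word, word)  # leave keywords and table names untouched
    return re.sub(r"[A-Za-z_][A-Za-z0-9_]*", substitute, sql)

# song_name has ID 4, so it is stored as column a4 in Doris.
mapping = {"song_name": "a4", "play_count": "a17"}

sql = "SELECT song_name, SUM(play_count) FROM flat_table GROUP BY song_name"
print(rewrite_query(sql, mapping))
# -> SELECT a4, SUM(a17) FROM flat_table GROUP BY a4
```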
For real-time availability of newly added tags and metrics, we prebuild a few ID columns in Doris tables based on the mapping of name IDs. These reserved ID columns will be allocated to the newly added tags and metrics. Thus, we can avoid table schema changes and the consequent overheads. Our experience shows that only 10 minutes after the tags and metrics are added, the data under them can be available. Notably, the recently released Doris 1.2.0 supports Light Schema Change, which means that to add or remove columns, you only need to modify the metadata in FE. Also, you can rename the columns in data tables as long as you have enabled Light Schema Change for the tables. This is a big trouble saver for us. Optimize Data Writing Here are a few practices that have reduced our daily offline data ingestion time by 75% and our CUMU compaction score from 600+ to 100. Flink pre-aggregation: as is mentioned above. Auto-sizing of writing batch: To reduce Flink resource usage, we enable the data in one Kafka topic to be written into various Doris tables and automatically adjust the batch size based on the data amount. Optimization of Doris data writing: fine-tune the sizes of tablets and buckets as well as the compaction parameters for each scenario: max_XXXX_compaction_thread max_cumulative_compaction_num_singleton_deltas Optimization of the BE commit logic: conduct regular caching of BE lists, commit them to the BE nodes batch by batch, and use finer load balancing granularity. Use Doris-on-ES in Queries About 60% of our data queries involve group targeting. Group targeting means finding our target data by using a set of tags as filters. It poses a few requirements for our data processing architecture: Group targeting related to APP users can involve very complicated logic. That means the system must support hundreds of tags as filters simultaneously. Most group targeting scenarios only require the latest tag data. However, metric queries need to support historical data. Data users might need to perform further aggregated analysis of metric data after group targeting. Data users might also need to perform detailed queries on tags and metrics after group targeting. After consideration, we decided to adopt Doris-on-ES. Doris is where we store the metric data for each scenario as a partition table, while Elasticsearch stores all tag data. The Doris-on-ES solution combines the distributed query planning capability of Doris and the full-text search capability of Elasticsearch. The query pattern is as follows: SELECT tag, agg(metric) FROM Doris WHERE id in (select id from Es where tagFilter) GROUP BY tag As is shown, the ID data located in Elasticsearch will be used in the sub-query in Doris for metric analysis. In practice, we find that the query response time is related to the size of the target group. If the target group contains over one million objects, the query will take up to 60 seconds. If it is even larger, a timeout error might occur. After investigation, we identified our two biggest time wasters: When Doris BE pulls data from Elasticsearch (1024 lines at a time by default) for a target group of over one million objects, the network I/O overhead can be huge. After the data pulling, Doris BE needs to conduct Join operations with local metric tables via SHUFFLE/BROADCAST, which can be costly. Thus, we make the following optimizations: Add a query session variable es_optimize that specifies whether to enable optimization.
In data writing into ES, add a BK column to store the bucket number after the primary key ID is hashed. The algorithm is the same as the bucketing algorithm in Doris (CRC32). Use Doris BE to generate a Bucket Join execution plan, dispatch the bucket number to BE ScanNode, and push it down to ES. Use ES to compress the queried data, turn multiple data fetch into one and reduce network I/O overhead. Make sure that Doris BE only pulls the data of buckets related to the local metric tables and conducts local Join operations directly to avoid data shuffling between Doris BEs. As a result, we reduce the query response time for large group targeting from 60 seconds to a surprising 3.7 seconds. Community information shows that Doris is going to support inverted indexing since version 2.0.0, which is soon to be released. With this new version, we will be able to conduct a full-text search on text types, equivalence or range filtering of texts, numbers, and datetime, and conveniently combine AND, OR, NOT logic in filtering since the inverted indexing supports array types. This new feature of Doris is expected to deliver 3~5 times better performance than Elasticsearch on the same task. Refine the Management of Data Doris' capability of cold and hot data separation provides the foundation of our cost reduction strategies in data processing. Based on the TTL mechanism of Doris, we only store data of the current year in Doris and put the historical data before that in TDW for lower storage cost. We vary the number of copies for different data partitions. For example, we set three copies for data from the recent three months, which is used frequently, one copy for data older than six months, and two copies for data in between. Doris supports turning hot data into cold data, so we only store data of the past seven days in SSD and transfer data older than that to HDD for less expensive storage. Conclusion Thank you for scrolling all the way down here and finishing this long read. We've shared our cheers and tears, lessons learned, and a few practices that might be of some value to you during our transition from ClickHouse to Doris. We really appreciate the help from the Apache Doris community and the SelectDB team, but we might still be chasing them around for a while since we attempt to realize auto-identification of cold and hot data, pre-computation of frequently used tags/metrics, simplification of code logic using Materialized Views, and so on and so forth. (This article is co-written by me and my colleague Kai Dai. We are both data platform engineers at Tencent Music (NYSE: TME), a music streaming service provider with a whopping 800 million monthly active users. To drop the number here is not to brag but to give a hint of the sea of data that my poor coworkers and I have to deal with every day.)
With constant digital evolution, the sources of streaming data are rising, such as IoT, networked devices, online activities of various kinds, server log files, and so on. And with every industry becoming reliant on this data to unlock data-driven business insights, streaming processing systems power everything from real-time fraud detection, stock trading platforms, and sentiment analysis from social media feeds to multiplayer games and GPS tracking. However, streaming data is generated at very high velocities by many data sources. Thus, it can be challenging to build robust stream processing solutions. This article describes how stream processing works and the fundamental building blocks of its architecture. What Is Stream Processing? Stream processing is a data management technology that processes data on the fly. It involves ingesting a continuous flow of incoming data and processing/transforming it as it arrives. Once processed, the results are delivered to a destination for immediate action and/or stored for later use. How Does It Work? Before we dive deeper into how stream processing works, let's look at some standard stream processing terms, which include: The continuously generated data is referred to as streaming data, which typically arrives at high velocities, in high volumes, and unbounded (a dataset that is theoretically infinite in size). Events refer to any number of things in a digital system, such as application metrics, user activities on a website, financial transactions, or IoT sensor data. In a typical stream processing application, events and data are generated by one or multiple publishers/sources (also called producers). The data is then enhanced, tested, and transformed if necessary. Finally, the system sends the data to a subscriber/sink (also called a consumer). The publishers and subscribers are also commonly referred to as pub/sub. Common sources and sinks include Apache Kafka and big data repositories like Hadoop. Note: It is essential to note that stream processing signifies the notion of real-time analytics but in relative terms. "Real time" could mean millionths of a second for an algorithmic stock trading app, billionths of a second for a physics researcher, or five minutes for a weather analytics app. This notion points to how a stream processing engine packages bunches of data for different applications. It organizes data events arriving in short batches to present them to other applications as a continuous feed. Thus, it also simplifies the logic for developers who combine and recombine data from different sources from different time scales. Components of a Stream Processing Architecture A stream processing system is a framework of software components that provides a solution to handle most use cases, if not all. So architecturally, it can be a complex process to build such as system. So what are the building blocks of a streaming architecture? Below we will discuss and review where and how each building block or component type fits in the overall architecture. 1. Stream Processor or Message Broker to Collect Data and Redistribute It Stream processors or message brokers use API to fetch data from producers/sources. The processor converts the data into a standard messaging format and streams the output continuously to consumers. It collects data streams from various sources, such as social media networks, clickstreams, in-game player activities, e-commerce purchases, and more. 
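To make the producer-to-broker-to-consumer flow described above concrete, here is a minimal, library-free Python sketch of the pattern. In a real deployment the broker would be a stream processor such as Kafka or Kinesis; the in-process queue and the event fields below are purely illustrative assumptions.
Python
import json
import queue

# A stand-in for the message broker / stream processor.
broker = queue.Queue()

def produce(events):
    """Publisher/source: convert raw events to a standard message format and stream them."""
    for event in events:
        broker.put(json.dumps(event))

def consume():
    """Subscriber/sink: read messages as they arrive and act on them immediately."""
    while not broker.empty():
        message = json.loads(broker.get())
        print(f"user={message['user']} action={message['action']}")

# Continuously generated events (clickstream, IoT readings, and so on).
produce([
    {"user": "u1", "action": "click"},
    {"user": "u2", "action": "purchase"},
])
consume()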
Moreover, data can arrive in different formats (unstructured or semi-structured formats, such as JSON). The first generation of message brokers (such as Apache ActiveMQ and RabbitMQ) relied on the MOM (Message Oriented Middleware) paradigm. But later, hyper-performant messaging platforms called stream processors emerged and proved more suitable for a streaming paradigm. Popular stream processing tools today are Apache Kafka, Azure Event Hub, Amazon Kinesis Data Streams, and Google Cloud PubSub. 2. Stream Processing and Data Transformation Tools (ETL, ELT, Etc.) to Ready Data for Querying After the message broker deposits data, stream processing or data transformation tools, transform, aggregate, and structure the data to ensure it is ready for analysis. The transformations can include- normalization, mapping relevant fields to columns, compacting, enrichment (combining data points with other data sources to create more context and meaning), partitioning, and more. The result may be an action, an alert, an API call, a visualization, or (in some cases) a new data stream. 3. Analytics and Query Engines to Extract Business Value As soon as the data is prepared for consumption, it is analyzed to unlock value. There are various approaches to streaming data analytics, depending on the use case. Some examples of tools and techniques include query engines (Amazon Athena, Amazon Redshift), text search engines (Elasticsearch), and so on. Furthermore, the processed data is often written to analytical data stores, where it is optimized for visualization and analytics. Or it is ingested directly into the analytics and reporting layer for analysis, BI (business intelligence), and real-time dashboard visualization. 4. Data Storage It can include cost-effective storage (file and object storage) for high volumes and the multi-structured nature of streaming data. Or data stores are also used to store output data after processing for further use later. For example, if you are storing your streaming data on Snowflake, it also lets you perform real-time analytics with dashboards and BI tools. These data stores can also act as flexible integration points as tools outside the streaming ecosystem can access the data. Moreover, with the advent of low-cost storage technologies such as Amazon S3, most organizations today store their streaming event data or archive it. Stateless vs. Stateful Stream Processing Stream processing can be stateful or stateless. In stateless stream processing, the current data/events are processed independently of previous ones. The data is evaluated as it arrives without consideration for the prior state or knowledge. On the contrary, stateful stream processing is concerned with the overall state of data. It means that past and current events share a state. So the context of preceding events helps shape the processing of current events. For instance, stateless processing applies when you need a real-time feed of the temperature of an industrial machine without concern for how it changes. But a stateful stream processing system is ideal if you want to forecast future temperature based on how it has changed over time. Conclusion It can be challenging to build a stream processing system that is fast, scalable, secure, and reliable. Moreover, numerous modern and flexible tools and managed services are available on the market today. 
We hope this article helped you understand the vital components and considerations for choosing the right technologies to develop an efficient stream processing engine.
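As a final illustration of the stateless vs. stateful distinction discussed above, here is a minimal Python sketch. The sensor readings, threshold, and per-machine state are illustrative assumptions and are not tied to any particular stream processing framework.
Python
# Stateless: each reading is handled on its own, with no memory of earlier events.
def stateless_alert(reading, threshold=90.0):
    return f"ALERT {reading['machine']}" if reading["temp"] > threshold else None

# Stateful: past events shape how the current one is processed (running average per machine).
state = {}  # machine -> (count, running sum)

def stateful_average(reading):
    count, total = state.get(reading["machine"], (0, 0.0))
    count, total = count + 1, total + reading["temp"]
    state[reading["machine"]] = (count, total)
    return total / count

for r in [{"machine": "m1", "temp": 85.0}, {"machine": "m1", "temp": 95.0}]:
    print(stateless_alert(r), stateful_average(r))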
Open-source technology is becoming increasingly popular in the data integration industry, and for good reasons. Open source creates the right incentives, allowing users to own their data entirely, unlike closed source, where you build knowledge in a proprietary tool with a price tag. Open source also creates communities around common problems, allowing for the exchange of valuable knowledge and collaborative problem-solving. In this article, we will start by investigating the reasons behind the adoption success of open source before delving deeper into the data integration industry, more specifically focusing on open-source vs. closed-source ELT (Extract, Load, Transform) solutions. We will discuss how open-source ELT allows for greater control over the data integration process, more efficient data processing, and cost savings for organizations. Additionally, we will explore the growing trend of open-source ELT adoption in the industry and examine the future of open-source data integration. If you're ready to consider open source, Airbyte is a great place to start. Its platform solves the long tail of connectors that closed-source solutions often neglect. We'll explore its easy-to-use Connector Development Kit and more. Why Open Source: From Visibility To Open Standards and Flexible Deployment Options Open source means you have visibility and flexibility. Given that a single organization can't solve every data problem in the ever-growing data ecosystem, open source is the approach to tackle the challenge collaboratively and sustainably, as data tools/frameworks get created once for everyone, following DRY. Open source allows fast iterations as different companies use the same tools, report back in case of errors, or even fix them for everyone else. The best example is security patches, which must be resolved quickly. With open source, you are in full control: you process the data through a fully open system whose code is saved and version controlled for full transparency. You know the alternatives: a custom-built tool at your employer whose original author left a couple of years ago, or a closed-source solution missing a critical feature or connector that you cannot add yourself, even though you'd have the skills. Open source also creates communities around a common problem. You can exchange valuable knowledge and find solutions collaboratively. Now you are not alone in fighting all these problems; suddenly, you have peers at the same stage, just in a different company. Besides the community, open source creates open standards that are crucial for cross-company integration efforts. With many closed-source vendors, it's hard to agree on standards: code is hidden, and everyone wants to be the standard. Lastly, flexible deployment options. As it's open, you can deploy it on-premise in your own infrastructure if you have sensitive data or work in highly regulated sectors such as health care or banking. Open source also helps tremendously with security and GDPR, since open-source ELT lets you use approaches like EtLT (we will get into it in a minute). Why NOT Open-Source? Although open source is an appreciated buzzword, if your audience is not engineers, open source can be overwhelming at first. The community is one key argument for open source; if there is no overlap between your developers and that community, the benefits are smaller.
If you have little need for customization and have simple use cases, it is better to use a standardized closed-source solution and pay for that. Open source requires a lot of education. If that piece of software is outside the core of your value proposition, it might be better not to use open source. But with the above considerations, keep in mind that with closed source, you are building knowledge in a proprietary tool rather than something generic and easily transferable (e.g., coding in Python). It's powerful for a simple pipeline, but it isn't easy to extend and maintain when it grows. It takes work to follow the best software engineering practices like testing or versioning. Licensing is usually rather expensive. What About Open-Source ELT? Let's briefly recap what ELT (Extract, Load, and Transform) stands for. ELT is in contrast to the more traditional ETL data integration approach, in which data is transformed before it arrives at the destination. Read more about the differences between ETL and ELT: ETL and ELT are two paradigms for moving data from one system to another. We detail the comparison, including images, in our Data Glossary entry on ETL vs. ELT. The ETL approach was once necessary because of the high costs of on-premises computation and storage. With the rapid growth of cloud-based data warehouses such as Snowflake and the plummeting price of cloud-based computation and storage, there is less reason to continue doing transformation before loading at the final destination. Indeed, flipping the two enables analysts to do a better job autonomously and supports agile decision-making. You are letting them develop insights based on existing data instead of coming up with ideas beforehand, defining schemas, and transforming. ETL has several disadvantages compared to ELT. Generally, only transformed data is stored in the destination system, so analysts must know beforehand every way they will use the data and every report they will produce, which creates slower development cycles. Changes to requirements can be costly, often resulting in re-ingesting data from source systems. Every transformation performed on the data may obscure some underlying information, and analysts only see what was kept during the transformation phase. Building an ETL-based data pipeline is often beyond the technical capabilities of analysts. On the contrary, ELT solutions tend to be simpler to understand. ELT promotes data literacy across a data-driven company, as with cloud-based business intelligence tools, everyone in the company can explore and create analytics on all data. Dashboards become accessible even for non-technical users. ELT/ETL tool comparison: Need to find the best data integration tool for your business? Which platform integrates with your data sources and destinations? Which one provides the features you're looking for? We made it simple for you and collected them in a spreadsheet comparing all those actors, along with an extensive, detailed comparison of the tools in Top ETL Tools Compared in Detail. Why Airbyte? Airbyte is the open-source platform that unifies data integration with 300+ connectors (and growing fast) to tackle the long tail of connectors, which gives it the most connectors in the industry. And more than 35,000 companies have used Airbyte over the past year and a half to sync data from sources such as PostgreSQL, MySQL, Facebook Ads, Salesforce, and Stripe and connect to destinations that include Redshift, Snowflake, Databricks, and BigQuery.
Most closed-source companies stagnate at 150 connectors as the most challenging part is not building the connectors; it is maintaining them. That is costly, and any closed-source solution is constrained by ROI (return on investment) considerations. As a result, ETL suppliers focus on the most popular integrations, yet companies use more and more tools every month, and the long tail of connectors needs to be addressed. When it comes to the cost of ownership, Airbyte shines in the long run. Closed-source solutions grow more and more expensive over time as more edge cases emerge that aren't supported. Besides paying for the connectors, you also need to maintain an in-house team to create non-supported but essential connectors. Airbyte and open-source ELT make data integration future-proof as you get both in one with a wide variety of out-of-the-box connectors, plus an easy way to extend or create custom connectors. Furthermore, in the event that you can't find an ELT connector that suits your requirements, Airbyte makes it easy to build a connector with the Airbyte CDK (Connector Developer Kit), which generates 75% of the code required. Here is the complete list of connectors currently available for Airbyte. Included are templates for building new connectors in Java or Python. Airbyte offers robust pre-built features that otherwise need to be added by your engineers. You can configure replications to meet your needs: Schedule full-refresh, incremental, and log-based CDC replications across all your configured destinations. What’s Next for Open-Source ELT? As we've seen, open-source ELT is rapidly gaining popularity in the data ecosystem and the data integration industry precisely due to its numerous benefits. The increased transparency, openness, and customizability allow for faster interactions and more efficient problem-solving, making open source an ideal solution for businesses of all sizes. As the industry continues to evolve and data becomes an even more integral part of business operations, it is no surprise that open-source ELT is the future of data integration. Companies that take advantage of these solutions will be better equipped to handle the demands of a data-driven world in the long term. Collaboration and knowledge-sharing within communities also allow for more efficient problem-solving and innovation.
In this article, we are going to analyze a new data orchestrator: Dagster. In our opinion, it belongs to the first generation of data orchestrators that bring data pipelines closer to critical business processes, turning them into true business data processes for mission-critical solutions. To describe Dagster's capabilities and use cases, we are going to provide some context about patterns and some historical information that is necessary to understand what business value it brings. In the last decade, many trends have revolved around orchestration and choreography patterns. We are going to provide a simple description of these patterns: Orchestration: It is a well-defined workflow orchestrated and centralized by an orchestration system. An orchestration system is like a musical orchestra where there is a conductor that provides direction to musicians to set the tempo and ensure correct entries. There are three main characteristics of orchestration: It provides a centralized workflow that makes it easy to visualize the business or data process. The workflow is managed by a single layer, which is the most critical component. If the orchestration system is down, there is no business service; without a conductor, there is no concert. It is very flexible and can be integrated into different architecture patterns such as APIs, event-driven systems, RPC, or data processes. Choreography: It is based on an event-driven or streaming architecture; the goal is that every component in the architecture works decoupled and has its own responsibility to make decisions about the actions it has to take. There are three main characteristics of choreography: It has to be based on an event-driven or streaming pattern. There is no single, centralized layer, so there is no single point of failure unless you have a single message broker. It provides more scalability and flexibility, but also more complexity in understanding the process. Orchestrators and business process management software have always been close to the business layer, which increased their popularity in companies' strategic tech roadmaps. The first generation of BPMs started around the year 2000 and were technologies for software engineers. A few years later, between 2014 and 2018, event-driven and choreography patterns started to increase in popularity, first with Netflix, and then with the appearance of streaming platforms such as Apache Kafka. The data world always runs a bit behind the software world. In my opinion, though, we are moving towards a scenario where the operational and analytical worlds will no longer be isolated. Architectures and team topologies where the analytical and operational layers work as two separate minds do not work when companies need to apply a data-driven approach, with data at the core of the decision-making process. What Happened to the World of Data When the concept of big data started to become popular, the first data orchestrators appeared, such as Apache Oozie (2010, by Yahoo), based on DAG XML configurations and scheduled workflows, and very focused on the Hadoop ecosystem. A little later, Apache Airflow (2015, by Airbnb) appeared, based on Python. It provided more capabilities, such as moving from DAG XML configuration to programmatic configuration and more integrations outside the Hadoop ecosystem, but it is also a scheduled-workflow system. In between appeared Luigi (2012, by Spotify): also based on Python, pipeline-oriented instead of DAG-oriented, and including interesting software best practices such as A/B testing.
<workflow-app name="useooziewf" xmlns="uri:oozie:workflow:0.1">
    ...
    <decision name="mydecision">
        <switch>
            <case to="reconsolidatejob">
                ${fs:fileSize(secondjobOutputDir) gt 10 * GB}
            </case>
            <case to="rexpandjob">
                ${fs:fileSize(secondjobOutputDir) lt 100 * MB}
            </case>
            <case to="recomputejob">
                ${ hadoop:counters('secondjob')[RECORDS][REDUCE_OUT] lt 1000000 }
            </case>
            <default to="end"/>
        </switch>
    </decision>
    ...
</workflow-app>
Apache Airflow was the first real evolution in data orchestrators, but in our opinion it has some points for improvement that make it a product very focused on the traditional world of data rather than on the new reality in which data is becoming the center of decision-making in companies. It has a poor UI, totally oriented toward data engineers. It is mainly oriented to executing tasks, without knowing what those tasks do. Everything is a task, increasing the complexity in terms of maintainability and comprehension. At the end of 2021, Airflow introduced the concept of sensors, a special type of operator designed to wait for an external event such as a Kafka event, a JMS message, or a time-based condition.
import datetime

import pendulum
from airflow.decorators import dag, task

@dag(
    dag_id="example_complex",
    schedule_interval="0 0 * * *",
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
)
def example_complex():
    ...

    @task
    def get_data():
        ...

    @task
    def merge_data():
        ...

    # Task dependencies would be wired here, e.g. merge_data() after get_data().

dag = example_complex()
All these developments in orchestration solutions have something in common: they grew out of the challenges these companies (Yahoo, Airbnb, and Spotify) faced in managing the complexity of their data pipelines. Data-Driven: The New Era of Data This first generation of orchestrator tools was very focused on the data engineer's experience and based on traditional analytical and operational platform architectures such as data lakes, data hubs, or experimental data science workspaces. David and I (Miguel) began our journey into the world of data around the year 2017. Before that, we had been working on operational mission-critical solutions close to business processes and based on event-driven solutions. At that moment, we found tools such as Oozie or Airflow: ETL tools focused on scheduled tasks/workflows, with no cloud offering, complex enterprise implementation and maintenance, poor scalability, and a poor user experience. Our first thought was that these were the tools we would have to be content with for now, but not the ones we would use in the years to come. Nowadays, the data-driven approach has changed everything; every day, the border between analytical and operational workloads becomes more diffuse. There are many business-critical processes based on analytical and operational workloads. Many of these critical processes would probably have looked like non-critical data pipelines a few years ago. In 2020, Zhamak Dehghani published an article about the principles of data mesh, some of them quite well-known. She wrote one sentence that is particularly significant for us regarding operational and analytical planes: "I do believe that at some point in the future, our technologies will evolve to bring these two planes even closer together, but for now, I suggest we keep their concerns separate." Our opinion is that these planes are closer than we think and, in terms of business value, they are more achievable on a small scale than the logical architecture of data mesh itself.
For instance, consider the fashion retail sector and critical processes such as allocation, e-commerce, or logistics solutions. All these processes were operational years ago, and many of them used traditional applications like SAP or Oracle; today, they need sophisticated data processes that include big data ingestion, transformation, analytics, and machine learning models to provide real-time recommendations and demand forecasts that enable data-driven decision-making. Of course, traditional data pipelines/workflows are still needed, and traditional solutions based on isolated operational and analytics platforms will continue to provide value for some reports, batch analytical processes, experiments, and other analytical innovations. But today there are other kinds of data processes: business data processes that have different needs and provide more business value. We need data solutions that provide the following capabilities: A better software development experience and management of data as code, applying best practices such as isolated environments, unit testing, A/B testing, data contracts, etc. A friendly and rich integration ecosystem, not only with the big data world but with the data world in general. Cloud-based solutions that provide easy scalability, low maintenance, and easy integration with IAM solutions. A much better user experience, not only for developers but also for data scientists, data analysts, business analysts, and operational teams. Introducing Dagster Dagster is a platform for data flow orchestration that goes beyond what we understand as a traditional data orchestrator. The project was started in 2018 by Nick Schrock and was conceived as a result of a need he identified while working at Facebook. One of the goals of Dagster has been to provide a tool that removes the barrier between pipeline development and pipeline operation, but during this journey, he came to link the world of data processing with business processes. Dagster provides significant improvements over previous solutions. It is oriented to data engineers, developers, and data/business operations engineers: Its versatility and abstraction allow us to design pipelines in a more developer-oriented way, applying software best practices and managing data and data pipelines as code. It complies with a first-principles approach to data engineering, covering the full development lifecycle: development, deployment, monitoring, and observability. It introduces a new and differentiating concept, the software-defined asset. An asset is a data object or machine learning model defined in Dagster and persisted in a data repository. The Dagit UI is a web-based interface for viewing and interacting with Dagster objects. It is a very intuitive, user-friendly interface that makes operation very simple. It is an open-source solution that also offers a SaaS cloud option, which accelerates implementation. It has a really fast learning curve that enables development teams to deliver value very early on. Let's Talk a Little About the Dagster Concepts We will explain the basic concepts with simple definitions and examples that will allow us to build a simple business data pipeline in the next article of this series.
Common Components of an Orchestrator At a high level, these are the basic components of any orchestrator: Job: Main unit of execution and monitoring; instantiation of a Graph with configurations and parameters Ops: Basically, they are the tasks we want to execute, they contain the operations and should perform simple tasks such as executing a database query (to ingest or retrieve data), initiating a remote job (Spark, Python, etc.), or sending an event. Graph: Set of interconnected ops of sub-graphs, ops can be assembled into a graph to accomplish complex tasks Software-Defined Assets An asset is an object in persistent storage, such as a table, file, or persisted machine learning model. A software-defined asset is a Dagster object that couples an asset to the function and upstream assets used to produce its contents. This is a declarative data definition, in Python, that allows us to: Define data contracts as code and how those will be used in our pipelines. Define the composition of our entity, regardless of its physical structure through a projection of the data. Decouple the business logic to compute assets from the I/O logic to read to and write from the persistent storage. Apply testing best practices including the ability to use mocks when we are writing a unit test. The ability to define partitions opens up a world of possibilities for running our processes and also at the time of re-launching. Sometimes we only have to reprocess a partition that for some reason has been processed incorrectly or incompletely. An amazing capability is being able to use external tools such as DBT, Snowflake, Airbyte, Fivetran, and many others tools to define our assets. It is amazing because it allows us to be integrated into our global platform and not only in the big data ecosystem. Options for Launching Jobs In this case, the great differential is the capabilities that these sensors provide us with for: Schedules: It is used to execute a job at a fixed interval. Sensors: Allow us to run based on some external event such as Kafka event, new files in S3, specific asset materialization, or a data partition update. Partitions: Allow us to run based on changes in a subset of the data of an asset, for example, records in a data set that fall within a particular time window. Backfills: It provides the ability to re-execute the data pipeline on only the set of partitions we are interested in. For example, relaunch the pipeline that calculates the aggregation of store sales per country but only for the partition with the data of the USA stores. The combination of the capabilities offered by sensors, partitions, backfills, IO managers, and assets represents a very significant paradigm shift in the world of data pipeline orchestration. IO Managers It provides integration components with the persistent repositories that allow us to persist and load assets and Op outputs to/from S3, Snowflake, or other data repositories. These components contain all the integration logic with each of the external channels we use in our architecture. We can use existing integrations, extend them to include specific logic, or develop new ones. Lineage The use of assets provides a base layer of lineage, data observability, and data quality monitoring. In our opinion, data lineage is a very complex and key aspect that nowadays goes beyond traditional tables and includes APIs, topics, and other data sources. 
This is why we believe that although Dagster provides great capabilities, it should be one more source in the overall lineage of our platform and not the source of truth. Debugging and Observability Another differentiating capability of Dagster is that it provides data observability when we are using software-definition assets. Data operators or data engineers have several features to analyze a data pipeline: Pipeline status, ops status, and timing Logs with error and info traces Assets can include metadata that displays information with the link to access the data materialization. It even provides us with the capability to send relevant information to different channels such as slack, events, or persistence in a report in S3. These capabilities allow engineers to have self-autonomy and not feel the orchestrator as a black box. Metadata Dagster allows meta-information to be added practically at all levels and most importantly it is very accessible by data ops or any other operational user. Having metadata is very important but it must also be usable and that is where Dagster makes a difference. Operation teams have less process context and more cognitive changes because they do not participate in developments but at the same time manage multiple production flows. As soon as our data workflow becomes part of the business critical mission providing this meta-information is a must. Dagster for Critical Business Process Dagster allows us to have a data processing-oriented tool that we can integrate into our business processes in the critical path to provide the greatest business value. Let's think about stock replenishment retail processes from warehouses to the stores or other channels such as partners, or e-commerce. This is the process of replenishment of items from the distribution warehouses to the stores, the goal is that the right products are available in an optimal quantity and in the right place to meet the demand from customers. Improves the customer experience by ensuring that products are available across all channels and avoiding “out of stock." Increase profitability by avoiding replenishment of products with low sales probability. It is a business process where analytics and machine learning models have a lot of impacts. We can define this as a data-driven business process. Stock replenishment is a complex operational business-critical process that has demand forecasting and stock inventory as its main pillars: Demand forecast requires an advanced machine-learning process based on historical data to project future demand. The stock inventory provides the quantity of stock available at each location such as stores, warehouses, distributors, etc. Sales provide information on how demand for the products is behaving based on indicators such as pricing, markdowns, etc. These processes can be launched weekly, daily, or several times a day depending on whether the replenishment is done from a central warehouse or, for example, a warehouse close to the physical stores. This process requires some information updated in near real-time, but it is a data process that runs in batch or micro-batch mode. Dagster is the first data orchestrator that truly enables delivering business value from a purely operational point of view at the operational layer. 
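To tie the concepts above together before looking at the more traditional use cases, here is a minimal sketch of two software-defined assets and a scheduled job. It assumes a recent Dagster 1.x release; the asset names and data are illustrative and are not a replenishment implementation.
Python
from dagster import AssetSelection, Definitions, ScheduleDefinition, asset, define_asset_job

@asset
def raw_sales():
    # In a real pipeline this would read from a source system or an IO-manager-backed store.
    return [{"store": "s1", "units": 10}, {"store": "s2", "units": 4}]

@asset
def sales_by_store(raw_sales):
    # Depends on raw_sales by parameter name; Dagster wires the upstream asset automatically.
    totals = {}
    for row in raw_sales:
        totals[row["store"]] = totals.get(row["store"], 0) + row["units"]
    return totals

daily_refresh = define_asset_job("daily_refresh", selection=AssetSelection.all())

defs = Definitions(
    assets=[raw_sales, sales_by_store],
    jobs=[daily_refresh],
    schedules=[ScheduleDefinition(job=daily_refresh, cron_schedule="0 6 * * *")],
)
A sensor or partition definition could be attached to the same Definitions object to launch the job on external events or to backfill a subset of the data, as described above.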
Dagster Common Use Cases There are other, more traditional use cases for Dagster, such as: Data pipelines with different sources and destinations for analytical systems such as data warehouses or data lakes. Machine learning model training. Analytical processes that integrate machine learning models. Of course, Dagster represents an evolution for this type of process for the same reasons mentioned above. Conclusions As the next few years are expected to be really exciting in the world of data, we are evolving, especially in the construction of tools that allow us to generate a more real and closer impact on the business. Dagster is another step in that direction. The challenges are not new data architectures or complex analytical systems; the challenges are in providing real business value as soon as possible. The risk is thinking that tools like Dagster and architectures like data mesh will bring value by themselves. These tools provide us with capabilities that we did not have years ago and allow us to design features that meet the needs of our customers. We need to learn from the mistakes we have made, applying continuous improvement and a critical thinking approach to be better. Though there is no "one ring to rule them all," Dagster is a fantastic tool and a great innovation, like other tools such as dbt, Apache Kafka, DataHub Data Catalog, and many more, but if we believe that one tool can solve all our needs, we will build a new generation of monoliths. Dagster, although a great product, is just another piece that complements our solutions to add value.
Are you planning to use Kafka Streams to build a distributed application? This article shows some advanced tips and techniques for Kafka Streams developers. Over the last two years, we discovered these techniques to handle advanced Kafka Streams capabilities. We built Kestra, an open-source data orchestration and scheduling platform, and we decided to use Kafka as the central datastore to build a scalable architecture. We rely heavily on Kafka Streams for most of our services (the executor and the scheduler) and have made some assumptions on how it handles the workload. However, Kafka has some restrictions since it is not a database, so we need to deal with the constraints and adapt the code to make it work with Kafka. We will cover topics, such as using the same Kafka topic for source and destination, and creating a custom joiner for Kafka Streams, to ensure high throughput and low latency while adapting to the constraints of Kafka and making it work with Kestra. Why Apache Kafka? Apache Kafka is an open-source distributed event store and stream-processing platform that handles high volumes of data at high velocity. The Kafka ecosystem also brings a robust streaming framework called Kafka Streams designed to simplify the creation of streaming data pipelines and perform high-level operations like joining and aggregation. One of its key benefits is the ability to embed the streaming application directly within your Java application, eliminating the need to manage a separate platform. While building Kestra, we wanted to rely only on the queue as a database for our application (persistent queue) without additional dependencies. We analyzed many candidates (RabbitMQ, Apache Pulsar, Redis, etc.) and found that Apache Kafka was the only one that covered everything for our use case. Same Kafka Topic for Source and Destination In Kestra, we have a Kafka topic for the current flow execution. That topic is both the source and destination. We update the current execution to add some information and send it back to Kafka for further processing. Initially, we were unsure if this design was possible with Kafka. We asked Matthias J. Sax, one of the primary maintainers of Kafka Streams, who responded on Stack Overflow. Yes, it’s possible if you are certain that, for the same key (the execution ID, in this case), you have only one process that can write it. If you see this warning in the console Detected out-of-order KTable update for execution at offset 10, partition 7, you likely have more than one process for the same key, which can lead to unexpected behavior (like overwriting previous values). Struggling to understand what this means? Imagine a topology with the topic as the source, some branching logic, and two different processes writing to the same destination: Java KStream<String, Execution> executions = builder .stream("executions", Consumed.with(Serdes.String(), JsonSerde.of(Execution.class))); executions .mapValues((readOnlyKey, value) -> value) .to("executions", Produced.with(Serdes.String(), JsonSerde.of(Execution.class))); executions .leftJoin( builder.table("results", Consumed.with(Serdes.String(), JsonSerde.of(WorkerTaskResult.class))), (readOnlyKey, value1, value2) -> value1 ) .to("executions", Produced.with(Serdes.String(), JsonSerde.of(Execution.class))); In this case, a concurrent process can write this topic on the same key, overwriting the previous value, effectively losing its data. In this context, you must define a single writer for a key at a given time. 
This leads us to our next section, a custom joiner. Custom Joiner for Kafka Streams We wrote a microservice to process the executions and split the microservice into multiple topics: A topic with the executions (with multiple tasks). A topic with task results. To allow the next task of a flow to start, we need to create a state with all task results merged into the current execution. Our first thought was to use join() from Kafka Streams. In hindsight, this was not a very clever decision. All joins provided by Kafka Streams were designed with aggregation in mind, like sum, avg, etc. They process all the incoming data from both topics one to one, so we will see all the changes on the streams on both sides, as illustrated below:
Markdown
# timeline
--A-------- > Execution
-----B--C-- > Task Result

# join result timeline
- (A,null)
- (A, B) => emit (A+B)
- (A, C) => emit (A+C) <=== you have overwritten the result of A+B
- (A+B, null)
- (A+C, null) <== we will never have (A+B+C)
However, we are building a state machine and want to keep the last state of the execution, meaning we do not want to see the intermediate states. In this case, we have no choice but to build a custom joiner since Kafka Streams doesn't have a built-in one. Our custom joiner needs to: Manually create a store that will save the last state of an execution. Create a custom merge function that will merge the execution stream with the task results stream. Get the last value from the state, add the task result, and emit the new state that will finally be saved to the store and the final topic. With all this, we make sure the execution state will always be the last version, whatever the number of task results coming in parallel might be. Distributed Workload Between Multiple Backends In Kestra, a scheduler will look up all flows, either with scheduled execution or a long-polling mechanism (detecting files on S3 or SFTP). To avoid a single point of failure on this service, we needed to split the flows between all instances of schedulers. We rely on Kafka's consumer groups, which handle the complexity of a distributed system for us. Here's how we do it: Create a Kafka stream that will read a KTable and transmit all the results to a consumer. Listen to state changes (mostly REBALANCED streams) and empty all the flows for the consumer. In the READY state, read the whole KTable again. With this, all flows will be dispatched to all listeners. That means if you have a thousand flows, every consumer will have ~500 flows (depending on the repartition of keys). Kafka will handle all the heavy parts of the distributed system, such as: Heartbeats to detect failure of a consumer. Notifications for rebalancing. Exactly-once semantics for a topic, ensuring only one consumer will handle the data. This way, you will have a fully distributed system thanks to Kafka, without the pain of going through a Jepsen analysis. Partitions To Detect Dead Kafka Consumers In Kestra, workers are Kafka consumers that process tasks submitted to them and handle all the computing (connect and query a database, fetch data from external services, etc.); they are long-running processes. We need to detect when a worker was processing a task and died. The reasons for the process "dying" could range from an outage to a simple restart during processing. Thanks to the Kafka consumer mechanism, we can know the specific partitions affected by a dead consumer. We use these features to detect dead workers: We create a UUID on startup for the worker.
When a consumer connects to Kafka, we listen to the partitions assigned to it using a ConsumerRebalanceListener. We publish to Kafka a WorkerInstance with the UUID and assigned partitions. For each task run, we publish a TaskRunning message with the worker UUID. Now, let's handle the data stored in Kafka. The main logic is a Kafka Stream, which will: Create a global KTable with all the WorkerInstances. On every change, it will listen to the changed WorkerInstance. If there is a new WorkerInstance, we look at the partitions assigned to it. If there is an overlap between this instance's partitions and the previous one's, we know that the previous WorkerInstance is dead. In Kafka, you can't have two consumers on the same partition. We only need to look at the affected tasks in this WorkerInstance and resend them for processing. Et voilà! We have detection of dead consumers using just the Kafka API. Beware of State Store all() We use a GlobalKTable to detect flow triggers. For all the flows on the cluster, we test all the flow's conditions to find matching flows. For this, we are using an API to fetch all flows from a GlobalKTable using store.all(), which returns all the flows from RocksDB (the internal database of Kafka Streams). Our first assumption was that all() simply returns the objects (Flow in our case) declared as the API return type, but we discovered that the all() method will: Fetch all the data from RocksDB. Deserialize the data from RocksDB, which is stored as bytes, and map it to concrete Java POJOs. So, each time we call the all() method, all values are deserialized, which can lead to high CPU usage and latency on your stream. We are talking about all flow revisions on our cluster. The last revision had 2.5K flows, but we don't see people creating a lot of revisions. Imagine 100K byte[] to deserialize to POJOs for every call. Since we only need the last revision in our use case, we create an in-memory map with all the flows using the following:
Java
builder.addGlobalStore(
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(FLOW_STATE_STORE_NAME),
        Serdes.String(),
        JsonSerde.of(Flow.class)
    ),
    kafkaAdminService.getTopicName(Flow.class),
    Consumed.with(Serdes.String(), JsonSerde.of(Flow.class)).withName("GlobalStore.Flow"),
    () -> new GlobalInMemoryStateProcessor<>(
        FLOW_STATE_STORE_NAME,
        flows -> kafkaFlowExecutor.setFlows(flows),
        store -> kafkaFlowExecutor.setStore(store)
    )
);
GlobalInMemoryStateProcessor is a simple wrapper that saves the state store and sends a complete list on every change (not so frequent). Using this, we decided to gather all the flows in memory. This works well for our use cases because we know that an instance of Kestra will not have millions of flows. Remember that all store operations (like GET) will lead to deserialization that costs you some CPU. Many Source Topics Within a Kafka Stream At first, we designed Kestra to have only one huge stream handling all the processing of the executor. That seemed cool at first, but it led to some drawbacks. Here is the last version of our main and only Kafka Stream with many topics: Yes, this is a huge Kafka Stream. It was working well despite its complexity. But the major drawbacks were: Monitoring: All the metrics are under the same consumer group. Debugging: Each topic is consumed independently during a crash. When a message fails, the whole process crashes. Lag: This is the most important one.
Since Kafka Streams optimizes the consumption of messages by itself, a topic with large outputs could lead to lag on unrelated topics. In that case, it is impossible to understand the lag on our consumers. So we decided to split it into multiple streams to be able to monitor and properly understand the lag on our Kafka Streams. To split our giant stream, we dealt with only one topic at a time (to avoid large network transit) and grouped all streams by source topic. Do More With Kafka Streams We have covered some tips that took us a long time to figure out while dealing with our issues. Even though there were some challenges, we could adapt our code so that Kafka worked well for our use case. We learned how to use the same Kafka topic for source and destination, write a custom joiner for Kafka Streams, distribute workloads between multiple backends, use partitions to detect dead Kafka consumers, weigh the tradeoffs of state store all(), and use many source topics within a Kafka Stream. We hope you enjoyed our story.
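As a language-agnostic footnote, the merge step of the custom joiner described earlier can be pictured with a small Python sketch. This only illustrates the state-machine semantics (keep the last state per execution ID and never lose earlier task results); it is not the Kestra Java implementation, and the dictionary stands in for a Kafka Streams state store.
Python
# Last known state per execution ID; in Kafka Streams this would be a state store.
executions = {}

def merge(execution_id, task_result):
    """Merge an incoming task result into the latest execution state and emit it."""
    state = executions.get(execution_id, {"id": execution_id, "results": []})
    state["results"].append(task_result)
    executions[execution_id] = state   # save to the store
    return state                       # emit to the final topic

merge("exec-1", "A")
print(merge("exec-1", "B"))  # {'id': 'exec-1', 'results': ['A', 'B']} -- A is never overwritten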
In today’s M2M (machine-to-machine) communications landscape, there is a huge requirement for streaming digital data from heterogeneous IoT devices to various RDBMSs for further analysis via dashboards and for triggering different events to perform numerous actions. To support the above scenarios, Apache Kafka acts like a central nervous system where data can be ingested from various IoT devices and persisted into various types of repositories: RDBMSs, cloud storage, etc. Besides, various types of data pipelines can be executed before or after data arrives at a Kafka topic. By using the Kafka JDBC sink connector, we can stream data continuously from Kafka topics into the respective RDBMS. The Biggest JDBC Sink Connector Difficulty The biggest difficulty with the JDBC sink connector is that it requires knowledge of the schema of data that has already landed on the Kafka topic. Schema Registry must, therefore, be integrated as a separate component with the existing Kafka cluster in order to transfer the data into the RDBMS. Therefore, to sink data from the Kafka topic to the RDBMS, the producers must publish messages/data containing the schema. The schema defines the structure of the data format. If the schema is not provided, the JDBC sink connector would not be able to map the messages to the database table’s columns after consuming messages from the topic. By leveraging Schema Registry, we can avoid sending the schema every time with messages/payloads from the producers, because Schema Registry stores (or registers) schemas in the _schemas topic and binds them to the topic name configured in the JDBC sink connector’s properties file. The licensing cost might be a hurdle for small or medium-sized companies that wish to use Oracle's or Confluent’s Schema Registry with open-source Apache Kafka to gather IoT device data for their business needs. In this article, we are going to use a Java code snippet to see how data can be streamed continuously into a MySQL database from an Apache Kafka topic by using the JDBC sink connector without Schema Registry. Apache Kafka and JDBC Connectors Apache Kafka does not bundle JDBC connectors for vendor-specific RDBMSs the way it does file source and sink connectors. It’s our responsibility to implement or develop the code for a specific RDBMS by implementing Apache Kafka’s Connect API. But Confluent has developed, tested, and supported the JDBC sink connector and eventually open-sourced it under the Confluent Community License, so we can integrate the JDBC sink connector with Apache Kafka. There won’t be any exception thrown from the topic even if we send an incorrect schema or no schema at all, because the Kafka topic accepts all messages or records as byte arrays in key-value pairs. Before transmitting the entire message to the topic, the producer has to convert the message into a byte array by using serializers. The sample schema that is bundled with the payload (the actual data) to be published by the Apache Kafka message producer can be seen embedded in the Java snippet below.
Also, here is the Java code snippet for the message producer:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
// Assuming the Connect JSON serializer from connect-json, which serializes JsonNode values.
import org.apache.kafka.connect.json.JsonSerializer;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ProducerWithSchema {

    private String status = "Failed";
    // Schema and payload bundled together so the JDBC sink connector can map columns without Schema Registry.
    private String payloadWithSchema = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"type\": \"int32\", \"optional\": false, \"field\": \"deviceId\" }, { \"type\": \"string\", \"optional\": false, \"field\": \"deviceData\" }, { \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"field\": \"generatedTime\" } ] }, \"payload\": { \"deviceId\": 3000, \"deviceData\": \"PPPPPwfgjijk\", \"generatedTime\": 1401984166000} }";
    private String key = "first";

    public Producer<String, JsonNode> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // The value serializer keeps the embedded schema intact in the message value.
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
        //props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
        System.setProperty("org.apache.logging.log4j.level", "INFO");
        return new KafkaProducer<>(props);
    }

    public String sendMsgToTopic() {
        Producer<String, JsonNode> producer = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            JsonNode jsonNode = objectMapper.readTree(payloadWithSchema);
            ProducerRecord<String, JsonNode> record = new ProducerRecord<>(IKafkaConstants.TOPIC_NAME, jsonNode);
            producer = this.createProducer();
            producer.send(record);
            producer.flush();
            producer.close();
        } catch (Exception e) {
            System.out.println("Error in sending record");
            System.out.println(e.getMessage());
        }
        return status;
    }

    public static void main(String[] args) {
        new ProducerWithSchema().sendMsgToTopic();
    }
}
Of course, the above approach comes with a few bottlenecks, such as: Tight coupling between messages and schema. The schema has to be bundled with the actual data every time. Issues with schema evolution. Code maintainability, etc. To mitigate or resolve the above issues, Schema Registry has been introduced as a separate component where all the schemas are deployed/maintained. Compatibility checks are necessary during schema evolution to make sure the producer-consumer contract is upheld, and Schema Registry can be utilized to achieve this. You can watch the video below to see how data streams continuously from the topic to a specific MySQL table using the JDBC sink connector on a single-node Apache Kafka cluster. Conclusion By now, you should have a better understanding of the biggest difficulty with the JDBC connector and of bundling Apache Kafka with JDBC connectors. I hope you have enjoyed this read. Please like and share if you feel this composition is valuable.
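As a companion to the producer above, the sink connector still has to be registered on the Kafka Connect worker. Here is a hedged Python sketch that posts a JDBC sink configuration to the Kafka Connect REST API; the Connect URL, topic, MySQL coordinates, and credentials are placeholders, and the property names assume Confluent's JDBC sink connector with the plain JSON converter (schemas.enable=true) rather than Schema Registry.
Python
import json
import urllib.request

# Placeholder values: adjust the Connect URL, topic, and MySQL coordinates to your setup.
connector = {
    "name": "mysql-jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "iot-device-data",
        "connection.url": "jdbc:mysql://localhost:3306/iot",
        "connection.user": "kafka",
        "connection.password": "secret",
        "auto.create": "true",
        "insert.mode": "insert",
        "pk.mode": "none",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        # The JSON converter reads the schema embedded in each message, so no Schema Registry is needed.
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
    },
}

request = urllib.request.Request(
    "http://localhost:8083/connectors",
    data=json.dumps(connector).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(request) as response:
    print(response.status, response.read().decode())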
In this hands-on lab from ScyllaDB University, you will learn how to use the ScyllaDB CDC source connector to push row-level change events from the tables of a ScyllaDB cluster to a Kafka server.

What Is ScyllaDB CDC?

To recap, Change Data Capture (CDC) is a feature that allows you to not only query the current state of a database’s table but also query the history of all changes made to the table. CDC is production-ready (GA) starting from ScyllaDB Enterprise 2021.1.1 and ScyllaDB Open Source 4.3.

In ScyllaDB, CDC is optional and enabled on a per-table basis. The history of changes made to a CDC-enabled table is stored in a separate, associated table. You can enable CDC when creating or altering a table using the cdc option, for example:

CREATE TABLE ks.t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};

ScyllaDB CDC Source Connector

ScyllaDB CDC Source Connector is a source connector that captures row-level changes in the tables of a ScyllaDB cluster. It is a Debezium connector, compatible with Kafka Connect (with Kafka 2.6.0+). The connector reads the CDC log for the specified tables and produces Kafka messages for each row-level INSERT, UPDATE, or DELETE operation. The connector is fault-tolerant, retrying reads from ScyllaDB in case of failure. It periodically saves the current position in the ScyllaDB CDC log using Kafka Connect offset tracking. Each generated Kafka message contains information about the source, such as the timestamp and the table name.

Note: at the time of writing, there is no support for collection types (LIST, SET, MAP) or UDTs; columns with those types are omitted from generated messages. Stay up to date on this enhancement request and other developments in the GitHub project.

Confluent and Kafka Connect

Confluent is a full-scale data streaming platform that enables you to easily access, store, and manage data as continuous, real-time streams. It expands the benefits of Apache Kafka with enterprise-grade features. Confluent makes it easy to build modern, event-driven applications and gain a universal data pipeline, supporting scalability, performance, and reliability.

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other data systems. It makes it simple to define connectors that move large data sets in and out of Kafka. It can ingest entire databases or collect metrics from application servers into Kafka topics, making the data available for stream processing with low latency.

Kafka Connect includes two types of connectors:
- Source connector: Source connectors ingest entire databases and stream table updates to Kafka topics. They can also collect metrics from application servers and store the data in Kafka topics, making the data available for stream processing with low latency.
- Sink connector: Sink connectors deliver data from Kafka topics to secondary indexes, such as Elasticsearch, or batch systems, such as Hadoop, for offline analysis.

Service Setup With Docker

In this lab, you’ll use Docker. Please ensure that your environment meets the following prerequisites:
- Docker for Linux, Mac, or Windows. Note: running ScyllaDB in Docker is only recommended to evaluate and try ScyllaDB open source; for the best performance, a regular install is recommended.
- 8 GB of RAM or greater for the Kafka and ScyllaDB services.
- docker-compose
- Git

ScyllaDB Install and Init Table

First, you’ll launch a three-node ScyllaDB cluster and create a table with CDC enabled.
If you haven’t done so yet, download the example from git:

git clone https://github.com/scylladb/scylla-code-samples.git
cd scylla-code-samples/CDC_Kafka_Lab

This is the docker-compose file you’ll use. It starts a three-node ScyllaDB cluster:

version: "3"
services:
  scylla-node1:
    container_name: scylla-node1
    image: scylladb/scylla:5.0.0
    ports:
      - 9042:9042
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0
  scylla-node2:
    container_name: scylla-node2
    image: scylladb/scylla:5.0.0
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0
  scylla-node3:
    container_name: scylla-node3
    image: scylladb/scylla:5.0.0
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

Launch the ScyllaDB cluster:

docker-compose -f docker-compose-scylladb.yml up -d

Wait for a minute or so, and check that the ScyllaDB cluster is up and in normal status:

docker exec scylla-node1 nodetool status

Next, you’ll use cqlsh to interact with ScyllaDB. Create a keyspace and a table with CDC enabled, and insert a row into the table:

docker exec -ti scylla-node1 cqlsh
CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
exit

[guy@fedora cdc_test]$ docker-compose -f docker-compose-scylladb.yml up -d
Creating scylla-node1 ... done
Creating scylla-node2 ... done
Creating scylla-node3 ... done
[guy@fedora cdc_test]$ docker exec scylla-node1 nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load    Tokens  Owns  Host ID                               Rack
UN  172.19.0.3  ?       256     ?     4d4eaad4-62a4-485b-9a05-61432516a737  rack1
UN  172.19.0.2  496 KB  256     ?     bec834b5-b0de-4d55-b13d-a8aa6800f0b9  rack1
UN  172.19.0.4  ?       256     ?     2788324e-548a-49e2-8337-976897c61238  rack1

Note: Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless
[guy@fedora cdc_test]$ docker exec -ti scylla-node1 cqlsh
Connected to  at 172.19.0.2:9042.
[cqlsh 5.0.1 | Cassandra 3.0.8 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
cqlsh> CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
cqlsh> INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
cqlsh> exit
[guy@fedora cdc_test]$

Confluent Setup and Connector Configuration

To launch a Kafka server, you’ll use the Confluent platform, which provides a user-friendly web GUI to track topics and messages. The Confluent platform provides a docker-compose.yml file to set up the services. Note: this is not how you would use Apache Kafka in production. The example is useful for training and development purposes only.
Get the file:

wget -O docker-compose-confluent.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.3.0-post/cp-all-in-one/docker-compose.yml

Next, download the ScyllaDB CDC connector:

wget -O scylla-cdc-plugin.jar https://github.com/scylladb/scylla-cdc-source-connector/releases/download/scylla-cdc-source-connector-1.0.1/scylla-cdc-source-connector-1.0.1-jar-with-dependencies.jar

Add the ScyllaDB CDC connector to the Confluent connect service plugin directory using a Docker volume by editing docker-compose-confluent.yml to add the two lines below, replacing <directory> with the directory of your scylla-cdc-plugin.jar file:

    image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
    hostname: connect
    container_name: connect
+   volumes:
+     - <directory>/scylla-cdc-plugin.jar:/usr/share/java/kafka/plugins/scylla-cdc-plugin.jar
    depends_on:
      - broker
      - schema-registry

Launch the Confluent services:

docker-compose -f docker-compose-confluent.yml up -d

Wait a minute or so, then access http://localhost:9021 for the Confluent web GUI.

Add the ScyllaConnector using the Confluent dashboard by clicking the Scylla Connector plugin. Fill in “Hosts” with the IP address of one of the ScyllaDB nodes (you can see it in the output of the nodetool status command) and port 9042, which the ScyllaDB service listens on. The “Namespace” is the keyspace you created earlier in ScyllaDB. Notice that it might take a minute or so for ks.my_table to appear.

Test Kafka Messages

You can see that MyScyllaCluster.ks.my_table is the topic created by the ScyllaDB CDC connector. Now, check for Kafka messages from the Topics panel. Select the topic with the same name as the keyspace and table you created in ScyllaDB.

From the “Overview” tab, you can see the topic info. At the bottom, it shows that this topic is on partition 0. A partition is the smallest storage unit that holds a subset of the records owned by a topic. Each partition is a single log file where records are written in an append-only fashion. Each record in a partition is assigned a sequential identifier called the offset, which is unique for each record within the partition. The offset is an incremental and immutable number maintained by Kafka.

As you already know, the ScyllaDB CDC messages are sent to the ks.my_table topic, and the partition ID of the topic is 0. Next, go to the “Messages” tab and enter partition ID 0 into the “offset” field. You can see from the output of the Kafka topic messages that the ScyllaDB table INSERT event and its data were transferred into Kafka messages by the Scylla CDC Source Connector. Click on a message to view the full message info: it contains the ScyllaDB table name and keyspace name with the time, as well as the state of the data before and after the action. Since this is an insert operation, the data before the insert is null.

Next, insert another row into the ScyllaDB table:

docker exec -ti scylla-node1 cqlsh
INSERT INTO ks.my_table(pk, ck, v) VALUES (200, 50, 70);

Now, in Kafka, wait for a few seconds and you can see the details of the new message.

Cleanup

Once you are done working on this lab, you can stop and remove the Docker containers and images. To view a list of all container IDs:

docker container ls -aq

Then you can stop and remove the containers you are no longer using:

docker stop <ID_or_Name>
docker rm <ID_or_Name>

Later, if you want to rerun the lab, you can follow the steps and use docker-compose as before.
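For reference, the connector created through the GUI above corresponds roughly to a configuration like the following, which could alternatively be submitted to the Kafka Connect REST API (by default at http://localhost:8083/connectors in this Confluent setup). This is a sketch: the property names follow the scylla-cdc-source-connector documentation, and the host address is the node IP taken from the nodetool status output above.

JSON
{
  "name": "ScyllaCDCSourceConnector",
  "config": {
    "connector.class": "com.scylladb.cdc.debezium.connector.ScyllaConnector",
    "tasks.max": "1",
    "scylla.name": "MyScyllaCluster",
    "scylla.cluster.ip.addresses": "172.19.0.2:9042",
    "scylla.table.names": "ks.my_table"
  }
}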
Summary

With the CDC source connector, a Kafka plugin compatible with Kafka Connect, you can capture all row-level changes (INSERT, UPDATE, or DELETE) in a ScyllaDB table and convert those events into Kafka messages. You can then consume the data from other applications or perform any other operation with Kafka.
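As a quick way to consume the data outside the Confluent GUI, you could read the topic from the command line. This is a sketch that assumes the broker container name (broker) and its internal listener (broker:29092) from the cp-all-in-one compose file used in this lab:

Bash
docker exec -it broker kafka-console-consumer \
  --bootstrap-server broker:29092 \
  --topic MyScyllaCluster.ks.my_table \
  --from-beginning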
Miguel Garcia
VP of Engineering,
Nextail Labs
Gautam Goswami
Founder,
DataView
Alexander Eleseev
Full Stack Developer,
First Line Software
Ben Herzberg
Chief Scientist,
Satori