IoT fleet management

  • Overview
  • Scenario
  • Application architecture
    • Confluent Kafka, KSQL, and YugabyteDB (CKY Stack)
    • Spark, Kafka, and YugabyteDB (SKY Stack)
  • Data store
  • Data producer
  • Data processor
    • KSQL
    • Apache Spark streaming
  • Data dashboard
  • Summary

Overview

This is an end-to-end functional application, with source code and installation instructions available on GitHub. It is a blueprint for an IoT application built on top of YugabyteDB (using the Cassandra-compatible YCQL API) as the database, Confluent Kafka as the message broker, KSQL or Apache Spark Streaming for real-time analytics, and Spring Boot as the application framework.

Scenario

Assume that a fleet management company wants to track its fleet of vehicles, which are delivering shipments. The vehicles are of different types (18-wheelers, buses, large trucks, and so on), and the shipments are delivered over three routes (Route-37, Route-82, and Route-43). The company wants to track:

  • the breakdown of their vehicle types per shipment delivery route
  • which vehicles are near road closures so that they can predict delays in deliveries

This app renders a dashboard showing both of the above. Below is a view of the real-time, auto-refreshing dashboard.

YB IoT Fleet Management Dashboard

Application architecture

This application has the following subcomponents:

  • Data Store - YugabyteDB for storing raw events from Kafka as well as the aggregates from the Data Processor
  • Data Producer - Test program writing into Kafka
  • Data Processor - KSQL or Apache Spark Streaming reading from Kafka, computing the aggregates, and storing the results in the Data Store
  • Data Dashboard - Spring Boot app using WebSockets, jQuery, and Bootstrap

The following sections look at each of these components in detail. The architecture diagrams below show how the components fit together.

Confluent Kafka, KSQL, and YugabyteDB (CKY Stack)

The app architecture with the CKY stack is shown below. The same Kafka Connect Sink Connector for YugabyteDB is used to store both the raw events and the aggregate data generated using KSQL.

YB IoT Fleet Management Architecture with KSQL

Spark, Kafka, and YugabyteDB (SKY Stack)

The app architecture with the SKY stack is shown below. The Kafka Connect Sink Connector for YugabyteDB stores the raw events from Kafka in YugabyteDB. The aggregate data generated via Apache Spark Streaming is persisted in YugabyteDB using the Spark Cassandra Connector.

YB IoT Fleet Management Architecture with Apache Spark

Data store

The data store holds all the user-facing data. YugabyteDB is used here, accessed through the Cassandra-compatible YCQL API.

All the data is stored in the keyspace TrafficKeySpace:

CREATE KEYSPACE IF NOT EXISTS TrafficKeySpace;

There is one table for storing the raw events. Note the default_time_to_live value below, which ensures that raw events are automatically expired after the specified period of time. This keeps raw events from consuming all the storage in the database: they are efficiently deleted a short while after their aggregates have been computed.

CREATE TABLE TrafficKeySpace.Origin_Table (
  vehicleId text,
  routeId text,
  vehicleType text,
  longitude text,
  latitude text,
  timeStamp timestamp,
  speed double,
  fuelLevel double,
PRIMARY KEY ((vehicleId), timeStamp))
WITH default_time_to_live = 3600;

There are three tables that hold the user-facing data: Total_Traffic for the lifetime traffic information, Window_Traffic for the last 30 seconds of traffic, and poi_traffic for the traffic near a point of interest (road closures). The data processor constantly updates these tables, and the dashboard reads from them. Below are the schemas for these tables.

CREATE TABLE TrafficKeySpace.Total_Traffic (
    routeId text,
    vehicleType text,
    totalCount bigint,
    timeStamp timestamp,
    recordDate text,
    PRIMARY KEY (routeId, recordDate, vehicleType)
);

CREATE TABLE TrafficKeySpace.Window_Traffic (
    routeId text,
    vehicleType text,
    totalCount bigint,
    timeStamp timestamp,
    recordDate text,
    PRIMARY KEY (routeId, recordDate, vehicleType)
);

CREATE TABLE TrafficKeySpace.poi_traffic(
    vehicleid text,
    vehicletype text,
    distance bigint,
    timeStamp timestamp,
    PRIMARY KEY (vehicleid)
);

Data producer

A program that generates random test data and publishes it to the Kafka topic iot-data-event. This emulates the data that, in a real-world deployment, would be received from connected vehicles through a message broker.

A single data point is a JSON payload and looks as follows:

{
  "vehicleId":"0bf45cac-d1b8-4364-a906-980e1c2bdbcb",
  "vehicleType":"Taxi",
  "routeId":"Route-37",
  "longitude":"-95.255615",
  "latitude":"33.49808",
  "timestamp":"2017-10-16 12:31:03",
  "speed":49.0,
  "fuelLevel":38.0
}
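
Such a producer can be implemented as a thin loop around the standard Apache Kafka Java client. The following is a minimal sketch of that idea, not the project's actual producer; the broker address, the hard-coded sample payload, and the IoTDataProducer class name are assumptions made for illustration.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IoTDataProducer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");              // assumed local Kafka broker
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // One event, matching the JSON payload shown above; a real producer would generate these randomly in a loop.
      String event = "{\"vehicleId\":\"0bf45cac-d1b8-4364-a906-980e1c2bdbcb\",\"vehicleType\":\"Taxi\","
          + "\"routeId\":\"Route-37\",\"longitude\":\"-95.255615\",\"latitude\":\"33.49808\","
          + "\"timestamp\":\"2017-10-16 12:31:03\",\"speed\":49.0,\"fuelLevel\":38.0}";

      // Publish to the iot-data-event topic, keyed by vehicle ID.
      producer.send(new ProducerRecord<>("iot-data-event", "0bf45cac-d1b8-4364-a906-980e1c2bdbcb", event));
      producer.flush();
    }
  }
}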

The Kafka Connect Sink Connector for YugabyteDB reads the iot-data-event topic, transforms each event into a YCQL INSERT statement, and then calls YugabyteDB to persist the event in the TrafficKeySpace.Origin_Table table.
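
Conceptually, persisting one such event amounts to executing a YCQL INSERT against the raw-events table. The sketch below illustrates that mapping using the DataStax Java driver directly; it is not the connector's implementation, and the contact point and literal values are assumptions.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class RawEventWriter {
  public static void main(String[] args) {
    // Connect to the cluster's YCQL endpoint (port 9042 by default); the host is assumed.
    try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
         Session session = cluster.connect()) {
      // The connector maps the JSON fields of the event onto the Origin_Table columns.
      session.execute(
          "INSERT INTO TrafficKeySpace.Origin_Table " +
          "(vehicleId, routeId, vehicleType, longitude, latitude, timeStamp, speed, fuelLevel) " +
          "VALUES ('0bf45cac-d1b8-4364-a906-980e1c2bdbcb', 'Route-37', 'Taxi', " +
          "'-95.255615', '33.49808', '2017-10-16 12:31:03', 49.0, 38.0)");
    }
  }
}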

Data processor

KSQL

KSQL is the open source streaming SQL engine for Apache Kafka. It provides an easy-to-use yet powerful interactive SQL interface for stream processing on Kafka, without the need to write code in a programming language such as Java or Python. It supports a wide range of streaming operations, including data filtering, transformations, aggregations, joins, windowing, and sessionization.

The first step in using KSQL is to create a stream from the raw events as shown below.

CREATE STREAM traffic_stream (
           vehicleId varchar,
           vehicleType varchar,
           routeId varchar,
           timeStamp varchar,
           latitude varchar,
           longitude varchar)
    WITH (
           KAFKA_TOPIC='iot-data-event',
           VALUE_FORMAT='json',
           TIMESTAMP='timeStamp',
           TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss');

Various aggregations and queries can now be run on the above stream, with the results of each query stored in a topic of its own. This application uses three such queries and topics. The Kafka Connect Sink Connector for YugabyteDB then reads these three topics and persists the results into the three corresponding tables in YugabyteDB.

CREATE TABLE total_traffic
     WITH ( PARTITIONS=1,
            KAFKA_TOPIC='total_traffic',
            TIMESTAMP='timeStamp',
            TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss') AS
     SELECT routeId,
            vehicleType,
            count(vehicleId) AS totalCount,
            max(rowtime) AS timeStamp,
            TIMESTAMPTOSTRING(max(rowtime), 'yyyy-MM-dd') AS recordDate
     FROM traffic_stream
     GROUP BY routeId, vehicleType;

CREATE TABLE window_traffic
     WITH ( TIMESTAMP='timeStamp',
            KAFKA_TOPIC='window_traffic',
            TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss',
            PARTITIONS=1) AS
     SELECT routeId,
            vehicleType,
            count(vehicleId) AS totalCount,
            max(rowtime) AS timeStamp,
            TIMESTAMPTOSTRING(max(rowtime), 'yyyy-MM-dd') AS recordDate
     FROM traffic_stream
     WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS)
     GROUP BY routeId, vehicleType;

CREATE STREAM poi_traffic
      WITH ( PARTITIONS=1,
             KAFKA_TOPIC='poi_traffic',
             TIMESTAMP='timeStamp',
             TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss') AS
      SELECT vehicleId,
             vehicleType,
             cast(GEO_DISTANCE(cast(latitude AS double),cast(longitude AS double),33.877495,-95.50238,'KM') AS bigint) AS distance,
             timeStamp
      FROM traffic_stream
      WHERE GEO_DISTANCE(cast(latitude AS double),cast(longitude AS double),33.877495,-95.50238,'KM') < 30;

Apache Spark streaming

As an alternative to KSQL, you can use Apache Spark - a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.

Below is a sample Apache Spark Streaming application (in Java) that consumes the data stream from a Kafka topic, converts it into meaningful insights, and writes the resulting aggregate data to YugabyteDB using the DataStax Spark Cassandra Connector.

Set up the Spark Cassandra Connector:

SparkConf conf = new SparkConf()
                   .setAppName(prop.getProperty("com.iot.app.spark.app.name"))
                   .setMaster(prop.getProperty("com.iot.app.spark.master"))
                   .set("spark.cassandra.connection.host", cassandraHost)
                   .set("spark.cassandra.connection.port", cassandraPort)
                   .set("spark.cassandra.connection.keep_alive_ms", prop.getProperty("com.iot.app.cassandra.keep_alive"));

The data is consumed from a Kafka stream and collected in 5-second batches:

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

JavaPairInputDStream<String, IoTData> directKafkaStream =
  KafkaUtils.createDirectStream(jssc,
                                String.class,
                                IoTData.class,
                                StringDecoder.class,
                                IoTDataDecoder.class,
                                kafkaParams,
                                topicsSet
                               );

Create non-filtered and filtered streams, to be used later in actual processing:

// Non-filtered stream for Point-of-interest (POI) traffic data calculation
JavaDStream<IoTData> nonFilteredIotDataStream = directKafkaStream.map(tuple -> tuple._2());

// Filtered stream for total and traffic data calculation
JavaPairDStream<String,IoTData> iotDataPairStream =
  nonFilteredIotDataStream.mapToPair(iot -> new Tuple2<String,IoTData>(iot.getVehicleId(),iot)).reduceByKey((a, b) -> a );

// Check whether each vehicle ID has already been processed
JavaMapWithStateDStream<String, IoTData, Boolean, Tuple2<IoTData,Boolean>> iotDStreamWithStatePairs =
  iotDataPairStream.mapWithState(
    StateSpec.function(processedVehicleFunc).timeout(Durations.seconds(3600)) // Maintain state for one hour
  );

// Filter out already-processed vehicle IDs and keep the unprocessed ones
JavaDStream<Tuple2<IoTData,Boolean>> filteredIotDStreams =
  iotDStreamWithStatePairs.map(tuple2 -> tuple2).filter(tuple -> tuple._2.equals(Boolean.FALSE));

// Get stream of IoTData
JavaDStream<IoTData> filteredIotDataStream = filteredIotDStreams.map(tuple -> tuple._1);

The above code uses the following function to check for processed vehicles:

Function3<String, Optional<IoTData>, State<Boolean>, Tuple2<IoTData, Boolean>> processedVehicleFunc = (vehicleId, iot, state) -> {
  Tuple2<IoTData,Boolean> vehicle = new Tuple2<>(iot.get(), false);
  if (state.exists()) {
    vehicle = new Tuple2<>(iot.get(), true);
  }
  else {
    state.update(Boolean.TRUE);
  }
  return vehicle;
};

Compute a breakdown by vehicle type and shipment route across all the vehicles and shipments seen so far:

// Get count of vehicle group by routeId and vehicleType
JavaPairDStream<AggregateKey, Long> countDStreamPair =
  filteredIotDataStream
    .mapToPair(iot -> new Tuple2<>(new AggregateKey(iot.getRouteId(), iot.getVehicleType()), 1L))
    .reduceByKey((a, b) -> a + b);

// Keep state for total count
JavaMapWithStateDStream<AggregateKey, Long, Long, Tuple2<AggregateKey, Long>> countDStreamWithStatePair =
  countDStreamPair.mapWithState(
    StateSpec.function(totalSumFunc).timeout(Durations.seconds(3600)) // Maintain state for one hour
  );

// Transform to DStream of TrafficData
JavaDStream<Tuple2<AggregateKey, Long>> countDStream = countDStreamWithStatePair.map(tuple2 -> tuple2);
JavaDStream<TotalTrafficData> trafficDStream = countDStream.map(totalTrafficDataFunc);

// Map Cassandra table column
Map<String, String> columnNameMappings = new HashMap<String, String>();
columnNameMappings.put("routeId", "routeid");
columnNameMappings.put("vehicleType", "vehicletype");
columnNameMappings.put("totalCount", "totalcount");
columnNameMappings.put("timeStamp", "timestamp");
columnNameMappings.put("recordDate", "recorddate");

// Call CassandraStreamingJavaUtil function to save in database
javaFunctions(trafficDStream)
  .writerBuilder("traffickeyspace", "total_traffic", CassandraJavaUtil.mapToRow(TotalTrafficData.class, columnNameMappings))
  .saveToCassandra();

Compute the same breakdown for active shipments, that is, the breakdown by vehicle type and shipment route over the last 30 seconds:

// Reduce by key and window (30 sec window and 10 sec slide)
JavaPairDStream<AggregateKey, Long> countDStreamPair =
  filteredIotDataStream
    .mapToPair(iot -> new Tuple2<>(new AggregateKey(iot.getRouteId(), iot.getVehicleType()), 1L))
    .reduceByKeyAndWindow((a, b) -> a + b, Durations.seconds(30), Durations.seconds(10));

// Transform to DStream of TrafficData
JavaDStream<WindowTrafficData> trafficDStream = countDStreamPair.map(windowTrafficDataFunc);

// Map Cassandra table column
Map<String, String> columnNameMappings = new HashMap<String, String>();
columnNameMappings.put("routeId", "routeid");
columnNameMappings.put("vehicleType", "vehicletype");
columnNameMappings.put("totalCount", "totalcount");
columnNameMappings.put("timeStamp", "timestamp");
columnNameMappings.put("recordDate", "recorddate");

// Call CassandraStreamingJavaUtil function to save in database
javaFunctions(trafficDStream)
  .writerBuilder("traffickeyspace", "window_traffic", CassandraJavaUtil.mapToRow(WindowTrafficData.class, columnNameMappings))
  .saveToCassandra();

Detect the vehicles that are within a 20-mile radius of a given Point of Interest (POI), which represents a road closure:

// Filter by routeId, vehicleType and in POI range
JavaDStream<IoTData> iotDataStreamFiltered =
  nonFilteredIotDataStream
    .filter(iot -> (iot.getRouteId().equals(broadcastPOIValues.value()._2())
                    && iot.getVehicleType().contains(broadcastPOIValues.value()._3())
                    && GeoDistanceCalculator.isInPOIRadius(
                         Double.valueOf(iot.getLatitude()),
                         Double.valueOf(iot.getLongitude()),
                         broadcastPOIValues.value()._1().getLatitude(),
                         broadcastPOIValues.value()._1().getLongitude(),
                         broadcastPOIValues.value()._1().getRadius()
                       )
                   )
    );

// Pair with POI
JavaPairDStream<IoTData, POIData> poiDStreamPair =
  iotDataStreamFiltered.mapToPair(iot -> new Tuple2<>(iot, broadcastPOIValues.value()._1()));

// Transform to DStream of POITrafficData
JavaDStream<POITrafficData> trafficDStream = poiDStreamPair.map(poiTrafficDataFunc);

// Map Cassandra table column
Map<String, String> columnNameMappings = new HashMap<String, String>();
columnNameMappings.put("vehicleId", "vehicleid");
columnNameMappings.put("distance", "distance");
columnNameMappings.put("vehicleType", "vehicletype");
columnNameMappings.put("timeStamp", "timestamp");

// Call CassandraStreamingJavaUtil function to save in database
javaFunctions(trafficDStream)
  .writerBuilder("traffickeyspace", "poi_traffic", CassandraJavaUtil.mapToRow(POITrafficData.class, columnNameMappings))
  .withConstantTTL(120) // Keeping data for 2 minutes
  .saveToCassandra();

The above "detection within radius" code uses the following helper functions:

// Function to get running sum by maintaining the state
Function3<AggregateKey, Optional<Long>, State<Long>, Tuple2<AggregateKey, Long>> totalSumFunc = (key, currentSum, state) -> {
  long totalSum = currentSum.or(0L) + (state.exists() ? state.get() : 0);
  Tuple2<AggregateKey, Long> total = new Tuple2<>(key, totalSum);
  state.update(totalSum);
  return total;
};

// Function to create TotalTrafficData object from IoT data
Function<Tuple2<AggregateKey, Long>, TotalTrafficData> totalTrafficDataFunc = (tuple -> {
  TotalTrafficData trafficData = new TotalTrafficData();
  trafficData.setRouteId(tuple._1().getRouteId());
  trafficData.setVehicleType(tuple._1().getVehicleType());
  trafficData.setTotalCount(tuple._2());
  trafficData.setTimeStamp(new Date());
  trafficData.setRecordDate(new SimpleDateFormat("yyyy-MM-dd").format(new Date()));
  return trafficData;
});

// Function to create WindowTrafficData object from IoT data
Function<Tuple2<AggregateKey, Long>, WindowTrafficData> windowTrafficDataFunc = (tuple -> {
  WindowTrafficData trafficData = new WindowTrafficData();
  trafficData.setRouteId(tuple._1().getRouteId());
  trafficData.setVehicleType(tuple._1().getVehicleType());
  trafficData.setTotalCount(tuple._2());
  trafficData.setTimeStamp(new Date());
  trafficData.setRecordDate(new SimpleDateFormat("yyyy-MM-dd").format(new Date()));
  return trafficData;
});

// Function to create POITrafficData object from IoT data
Function<Tuple2<IoTData, POIData>, POITrafficData> poiTrafficDataFunc = (tuple -> {
  POITrafficData poiTraffic = new POITrafficData();
  poiTraffic.setVehicleId(tuple._1.getVehicleId());
  poiTraffic.setVehicleType(tuple._1.getVehicleType());
  poiTraffic.setTimeStamp(new Date());
  double distance = GeoDistanceCalculator.getDistance(
    Double.valueOf(tuple._1.getLatitude()).doubleValue(),
    Double.valueOf(tuple._1.getLongitude()).doubleValue(),
    tuple._2.getLatitude(),
    tuple._2.getLongitude()
  );
  poiTraffic.setDistance(distance);
  return poiTraffic;
});

Data dashboard

This is a Spring Boot application that queries the data from YugabyteDB and pushes it to the web page using WebSockets and jQuery. The data is pushed to the web page at fixed intervals, so it refreshes automatically (a sketch of this push loop follows the repository example below). The dashboard displays the data in charts and tables, rendered using Bootstrap.

Entity classes are created for the three tables Total_Traffic, Window_Traffic, and Poi_Traffic, along with DAO interfaces for all the entities extending CassandraRepository. For example, the DAO interface for the TotalTrafficData entity is defined as follows.

@Repository
public interface TotalTrafficDataRepository extends CassandraRepository<TotalTrafficData> {
  @Query("SELECT * FROM traffickeyspace.total_traffic WHERE recorddate = ? ALLOW FILTERING")
  Iterable<TotalTrafficData> findTrafficDataByDate(String date);
}
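
The dashboard can then push these query results to the browser at fixed intervals. Below is a minimal sketch assuming Spring's scheduling support and STOMP WebSocket messaging via SimpMessagingTemplate; the TrafficDataPusher class, the /topic/totaltraffic destination, and the 5-second rate are illustrative assumptions rather than the application's exact code.

import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// Assumes @EnableScheduling and WebSocket/STOMP configuration elsewhere, and that
// TotalTrafficData and TotalTrafficDataRepository (defined above) are on the classpath.
@Component
public class TrafficDataPusher {

  @Autowired
  private TotalTrafficDataRepository totalTrafficDataRepository;

  @Autowired
  private SimpMessagingTemplate messagingTemplate;

  // Every 5 seconds, read today's aggregates and push them over the WebSocket topic.
  @Scheduled(fixedRate = 5000)
  public void pushTotalTraffic() {
    String today = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
    Iterable<TotalTrafficData> data = totalTrafficDataRepository.findTrafficDataByDate(today);
    messagingTemplate.convertAndSend("/topic/totaltraffic", data);
  }
}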

To connect to the YugabyteDB cluster and obtain a connection for database operations, you write the CassandraConfig class, as follows:

public class CassandraConfig extends AbstractCassandraConfiguration {
  @Bean
  public CassandraClusterFactoryBean cluster() {
    // Create a Cassandra cluster to access YugabyteDB using CQL.
    CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
    // Set the database host.
    cluster.setContactPoints(environment.getProperty("com.iot.app.cassandra.host"));
    // Set the database port.
    cluster.setPort(Integer.parseInt(environment.getProperty("com.iot.app.cassandra.port")));
    return cluster;
  }
}

Note that the dashboard currently does not use the raw events table; it relies only on the data stored in the aggregate tables.

Summary

This application is a blueprint for building IoT applications. The instructions to build and run the application, as well as the source code, can be found in the IoT Fleet Management GitHub repository.
