Introduction

Welcome to Data Accelerator’s first part in the data analytics architecture series. In this blog we will dive into analytics data sources! In today’s fast-paced business environment, data is more important than ever for making informed decisions. Analytics data sources provide the raw materials that organizations need to gain insights and make data-driven decisions. In this post, we will explore the different types of data sources that are commonly used for analytics and discuss best practices for using them as the foundation of an analytics platform.

Data Sources Origination Types

Every type of data source adds a different angle and perspective to the current landscape. The table below summarizes the main types and how to use each of them.

Transactional
  • What is it: Day-to-day transactions of an organization, such as sales, customer interactions, and financial transactions.
  • How to use: Used to track performance, customer behavior, and other key metrics.

Social Media
  • What is it: Social media platforms such as Facebook, Twitter, and LinkedIn generate a vast amount of data that can be used to track brand sentiment, customer engagement, and other important metrics.
  • How to use: Social media data can be analyzed to gain insights into customer behavior, preferences, and sentiment.

Machine-Generated
  • What is it: With the rise of the Internet of Things (IoT) and other connected devices, organizations are now able to collect large amounts of data from machines, such as sensor data and log files.
  • How to use: This type of data can be used to track performance and optimize operations in industries such as manufacturing, transportation, and healthcare.

Public Data
  • What is it: There are many public data sources available, such as government data, open data, and data from non-profit organizations.
  • How to use: These sources can be used to gain valuable insights into various industries, markets, and population trends.

Third-Party Data
  • What is it: Organizations can also purchase data from third-party providers, such as demographic data or market research reports.
  • How to use: These sources can supplement internal data and give a more complete picture of customers and the market.

User Data
  • What is it: This is the business logic of the organization; it could be manual entries, classifications and grouping lists, catalogues, and special dates.
  • How to use: User data enriches other data sources with the specific domain or business logic of the organization. It is usually used to join, filter, group, and map other data sources.

Data Sources Technical Types

In this section we will present the different source types from a technical point of view. Understanding how to load data efficiently is a key decision on the way to building a sustainable data platform.

Relational databases
  • What is it: These databases, such as MySQL, SQL Server and Oracle, allow for the storage and retrieval of data in a structured format. The data is stored in tables, with each table consisting of rows and columns, which allows for easy querying and analysis of the data.
  • How to load (best practices): If the source is an OLTP system, use SQL with minimal impact on the source: identify the changes in the data and load by dates, timestamps, and DB partitions. Use built-in connectors such as ODBC and JDBC. Understand the tuning parameters for memory and networking so you move the right volume of data.

Non-relational databases
  • What is it: Non-relational databases, also known as NoSQL databases, are designed to handle large volumes of unstructured or semi-structured data. These databases, such as MongoDB and Cassandra, are often used in big data and real-time analytics applications.
  • How to load (best practices): Replicate data by taking full records, use the projection and search options available on the source, and load each table (collection) separately.

Files and Logs
  • What is it: File data sources, such as CSV and Excel files, can also be used as a source of data. These types of files can be stored on local systems, network drives or cloud storage services like AWS S3 or Google Cloud Storage.
  • How to load (best practices): Files are likely to change and their structure is hard to manage; file size and compression are critical to understanding the data source. For big files, divide them into smaller files and compress them; load data in the source format and transform it in the analytics platform, avoiding on-the-fly changes to complex formats that force reading row by row. Avoid scripts that hard-code column names and create dynamic loads instead. Prefer built-in connectors that can automatically parse CSV, XML and JSON files.

API
  • What is it: APIs (Application Programming Interfaces) allow organizations to access and retrieve data from external sources and systems. This could be a third-party service or an internal system. Data can be pulled and integrated into an organization’s data pipeline for analysis and reporting.
  • How to load (best practices): Use batch calls that return many records per request. Batch multiple calls into streams or group them together and then flush them into the platform.

Analytics Databases
  • What is it: While a relational database is optimized for storing rows of data, typically for transactional applications, a columnar database is optimized for fast retrieval of columns of data, typically in analytical applications. Column-oriented storage for database tables is an important factor in analytic query performance because it drastically reduces the overall disk I/O requirements and the amount of data you need to load from disk.
  • How to load (best practices): Use the compute power of the source and create joins and aggregations in the data source to do some of the heavy pre-processing required. Use the right size of compute for the task. If partitions are available, make sure to filter using them. Explore the tuning and optimization options of the modern DB.

Data Sources Examples Using Best Practices

In this section we will demonstrate examples of how to use each data source. The focus is on performance and on utilizing the data source in the most efficient way.

I chose to use Python, but any language or integration tool can be used. What matters is the right way of loading data.

Assumptions: a large volume of data, loaded as a batch or mini-batch (a small number of records loaded frequently).

Relational databases

For this example we will partition a large order history table based on the order date, with one partition for each day of data:

SQL
CREATE PARTITION FUNCTION OrderDateRangePF (datetime)
AS RANGE LEFT FOR VALUES ('2022-01-01', '2022-01-02', '2022-01-03', ...);

CREATE PARTITION SCHEME OrderDateRangePS
AS PARTITION OrderDateRangePF
ALL TO ([PRIMARY]);

CREATE TABLE OrderHistory
(
    OrderID int NOT NULL,
    OrderDate datetime NOT NULL,
    CustomerID int NOT NULL
    ...
)
ON OrderDateRangePS (OrderDate);

Step 1: check whether the table has partitions, sub-partitions, or indexes. Use the results to determine which columns to include in the WHERE clause in order to improve query performance by reducing the amount of data that needs to be scanned and processed.

SQL
SELECT
    t.name AS TableName,
    p.partition_number AS PartitionNumber,
    prv.value AS BoundaryValue,
    p.rows AS [RowCount]
FROM
    sys.tables t
    JOIN sys.indexes i
      ON t.object_id = i.object_id AND i.index_id IN (0, 1)
    JOIN sys.partitions p
      ON p.object_id = t.object_id AND p.index_id = i.index_id
    LEFT JOIN sys.partition_schemes ps
      ON i.data_space_id = ps.data_space_id
    LEFT JOIN sys.partition_functions pf
      ON ps.function_id = pf.function_id
    LEFT JOIN sys.partition_range_values prv
      ON pf.function_id = prv.function_id AND prv.boundary_id = p.partition_number
WHERE
    t.name = 'OrderHistory' --replace with table name
ORDER BY
    p.partition_number;

Step 2: load the data and filter by OrderDate.

Additional important comments:

  • For resilience, it’s always best to load data starting from the last time data was loaded successfully (a sketch of maintaining this watermark follows the code below).
  • SELECT * is not recommended on wide tables, but specifying column names requires maintenance whenever new columns are added.
  • Avoid joins at this step unless the join reduces or filters the data.
Python
import pyodbc
import pandas as pd

# Connection string
conn = pyodbc.connect("Driver={SQL Server}; Server=<server_name>; Database=<database_name>; UID=<user_name>; PWD=<password>")
# this parameter can be maintained in load history table
last_success_load_date ='2022-01-01'
# SQL query using partition column OrderDate
query = f"SELECT * FROM OrderHistory WHERE OrderDate >= '{last_success_load_date}'"

# Load data into a DataFrame
df = pd.read_sql(query, conn)

# Close the connection
conn.close()
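
As noted in the comment above, last_success_load_date can be maintained in a load-history table. Below is a minimal sketch of that idea, assuming a hypothetical LoadHistory bookkeeping table (TableName, LoadDate, Status) that is not part of the original example:

Python
import pyodbc

conn = pyodbc.connect("Driver={SQL Server}; Server=<server_name>; Database=<database_name>; UID=<user_name>; PWD=<password>")
cursor = conn.cursor()

# Read the watermark: the last date that was loaded successfully
# (LoadHistory is a hypothetical bookkeeping table, not part of the source schema)
cursor.execute(
    "SELECT MAX(LoadDate) FROM LoadHistory WHERE TableName = ? AND Status = 'SUCCESS'",
    "OrderHistory",
)
last_success_load_date = cursor.fetchone()[0] or '2022-01-01'  # fall back to an initial date

# ... run the incremental load filtered by last_success_load_date ...

# Record the new watermark after a successful load so the next run starts from here
new_watermark = '2022-01-02'  # in practice, the max OrderDate of the batch just loaded
cursor.execute(
    "INSERT INTO LoadHistory (TableName, LoadDate, Status) VALUES (?, ?, 'SUCCESS')",
    "OrderHistory", new_watermark,
)
conn.commit()
conn.close()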

Now let’s suppose we need to add the customer details. It’s better to join once the data frame only includes one day of orders.

Python
import pandas as pd
import pyodbc

conn = pyodbc.connect('DRIVER={SQL Server};'
                      'SERVER=<server_name>;'
                      'DATABASE=<database_name>;'
                      'UID=<user_name>;'
                      'PWD=<password>')

last_success_load_date ='2022-01-01'
customer_query = "SELECT * FROM Customer"
order_query = f"SELECT * FROM OrderHistory WHERE OrderDate >= '{last_success_load_date}'"

customer_df = pd.read_sql(customer_query, conn)
order_df = pd.read_sql(order_query, conn)

merged_df = pd.merge(order_df, customer_df, on='CustomerID')

conn.close()

Non-relational databases

A sharding strategy can be used when the data is frequently queried based on a specific field, such as date or geographic location. In the example below, a MongoDB collection (a table, as we know it in other DBs) is sharded by date.

Step 1: identify the shards on the collection. The following command returns information about the distribution of data across the shards for the specified collection.

ShellScript
db.collection_name.getShardDistribution()

Step 2:

  • In this example, a filter is added to the query. This filter specifies that only documents with a date field greater than or equal to “2022-01-01” and less than or equal to “2022-12-31” should be returned.
  • Batch loading: Use the batch_size method when loading the data from the MongoDB collection. This loads the data in chunks, which reduces the memory usage and improves performance.
  • The collection.count_documents method is used to get the total number of documents in the filtered collection, and a for loop is used to retrieve the filtered documents in batches.
Python
import pymongo
import pandas as pd

# Connect to MongoDB
client = pymongo.MongoClient("mongodb://localhost:27017/")
db = client["mydatabase"]
collection = db["mycollection"]

# Define the batch size and the filter
batch_size = 1000
filter = {"date": {"$gte": "2022-01-01", "$lte": "2022-12-31"}}

# Get the total number of documents in the filtered collection
total_docs = collection.count_documents(filter)

# Loop through the filtered collection in batches
results = []
for i in range(0, total_docs, batch_size):
    docs = collection.find(filter).skip(i).limit(batch_size)
    batch = [doc for doc in docs]
    results += batch

# Convert the results to a Pandas dataframe
df = pd.DataFrame(results)
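
The loop above pages through the results with skip/limit; on very large collections each skip re-reads the earlier documents, so an alternative is a single cursor that streams the filtered documents while still fetching them from the server in batches. A minimal sketch, reusing the same hypothetical database and collection names:

Python
import pymongo
import pandas as pd

# Connect to MongoDB
client = pymongo.MongoClient("mongodb://localhost:27017/")
collection = client["mydatabase"]["mycollection"]

query_filter = {"date": {"$gte": "2022-01-01", "$lte": "2022-12-31"}}

# One cursor over the whole filtered range; batch_size controls how many
# documents the driver fetches from the server per round trip
cursor = collection.find(query_filter).batch_size(1000)

# Stream the documents into a list and convert to a dataframe
results = [doc for doc in cursor]
df = pd.DataFrame(results)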

Files and Logs

Files and logs loading techniques vary by file type and by cloud provider or local file system.

The example below demonstrates some of the best practices for loading from S3: filtering, reading data in chunks, and parallel processing.

  • Use S3 Select: S3 Select is a feature of Amazon S3 that allows you to filter, transform, and reduce the data stored in your S3 objects. You can use S3 Select to extract only the data you need from your large S3 objects, reducing the amount of data you need to load into memory.
  • Add parallel processing: Parallel processing can greatly improve the performance of reading large files from S3. You can use the concurrent.futures library in Python to process the data in parallel.
  • Implement chunked reading: When reading large files from S3, it’s a good idea to read the data in chunks rather than all at once. This can help reduce the memory usage and improve the performance of the read operation.
Python
import boto3
import concurrent.futures

# Connect to S3 using boto3
s3 = boto3.client('s3')

# Define the S3 bucket and object
bucket = 'my-bucket'
object_key = 'large-file.csv'

# Use S3 Select to extract only the necessary data
response = s3.select_object_content(
    Bucket=bucket,
    Key=object_key,
    Expression="SELECT * FROM S3Object WHERE <condition>",
    ExpressionType='SQL',
    InputSerialization={'CSV': {'FileHeaderInfo': 'Use'}},
    OutputSerialization={'CSV': {}},
)

# Use parallel processing to read the data in chunks
def process_data(record):
    # Process each chunk of data
    return True

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    for event in response['Payload']:
        if 'Records' in event:
            records = event['Records']['Payload'].decode('utf-8')
            executor.submit(process_data, records)
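
The chunked-reading practice above also applies when you load a plain CSV object without S3 Select. A minimal sketch, assuming the same hypothetical bucket and key and an uncompressed CSV object:

Python
import boto3
import pandas as pd

s3 = boto3.client('s3')

# Stream the object body instead of downloading the whole file into memory at once
obj = s3.get_object(Bucket='my-bucket', Key='large-file.csv')

# Parse the stream in fixed-size chunks to keep memory usage bounded
chunks = []
for chunk in pd.read_csv(obj['Body'], chunksize=100_000):
    # filter/transform each chunk here before keeping it
    chunks.append(chunk)

df = pd.concat(chunks, ignore_index=True)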

API

Below are some of the best practices when loading a large dataset from an API:

  • Pagination: APIs often have a limit on the number of records that can be returned in a single request. To retrieve large datasets, you’ll need to implement pagination to make multiple requests, each returning a smaller batch of data.
  • Caching: To minimize the number of API requests and reduce the load on the API server, you can cache the data locally after each request. This can be achieved using disk-based caching, in-memory caching, or a combination of both.
  • Error handling: When making API requests, it’s important to handle errors that may occur due to network connectivity issues, rate limiting, or other factors. You should implement error handling to ensure that the data retrieval process is robust and can continue even if some requests fail.
  • Parallelization: To speed up the data retrieval process, you can parallelize the API requests by making multiple requests simultaneously. This can be achieved using multithreading or multiprocessing techniques (a sketch appears after the code and its summary below).
Python
import requests
import pandas as pd
import pickle

# Define the API endpoint
API_URL = "https://api.example.com/data"

# Define the batch size
BATCH_SIZE = 100

# Define the cache file name
CACHE_FILE = "data.pkl"

# Function to make a single API request
def make_request(offset, limit):
    params = {
        "offset": offset,
        "limit": limit
    }
    response = requests.get(API_URL, params=params)
    # Fail fast on HTTP errors instead of silently parsing an error payload
    response.raise_for_status()
    return response.json()

# Function to retrieve data from the API
def get_data(batch_size=BATCH_SIZE):
    try:
        # Try to load data from cache
        with open(CACHE_FILE, "rb") as file:
            data = pickle.load(file)
            return data
    except (FileNotFoundError, EOFError):
        # If the cache file doesn't exist, retrieve data from the API
        data = []
        offset = 0
        while True:
            batch = make_request(offset, batch_size)
            if not batch:
                break
            data.extend(batch)
            offset += batch_size
        # Cache the data
        with open(CACHE_FILE, "wb") as file:
            pickle.dump(data, file)
        return data

# Function to preprocess the data
def preprocess_data(data):
    # Perform data preprocessing here
    return data

# Function to load data into a pandas dataframe
def load_data_into_dataframe(data):
    data = preprocess_data(data)
    df = pd.DataFrame(data)
    return df

# Get the data
data = get_data()

# Load the data into a dataframe
df = load_data_into_dataframe(data)

This code implements the following best practices:

  • Pagination: The offset and limit parameters are used to paginate the API requests.
  • Caching: The data is cached to a local file using the pickle module.
  • Error handling: The try/except block handles cache misses, and make_request raises on HTTP errors so that failed requests are not silently ignored.
  • Data preprocessing: The data is preprocessed before loading into the dataframe using the preprocess_data function.
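
The parallelization practice listed earlier is not shown in the code above. Below is a minimal sketch of fetching several pages concurrently with a thread pool; it assumes the hypothetical API exposes the total record count (here via an invented /count endpoint), so adjust it to whatever the real API provides:

Python
import concurrent.futures
import requests

API_URL = "https://api.example.com/data"
BATCH_SIZE = 100

def make_request(offset, limit=BATCH_SIZE):
    # One paginated request; raise on HTTP errors so failures surface
    response = requests.get(API_URL, params={"offset": offset, "limit": limit})
    response.raise_for_status()
    return response.json()

# Hypothetical endpoint returning the total number of records
total_records = requests.get(f"{API_URL}/count").json()["count"]
offsets = range(0, total_records, BATCH_SIZE)

# Fetch pages concurrently; each worker issues one paginated request
data = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    for batch in executor.map(make_request, offsets):
        data.extend(batch)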

Analytics Databases

Before we explain best practices, we will first show an example of the type of queries we should run on an analytics DB as our source.

In the example below we have a business requirement to show the top 100 customers by revenue for a given date range and product. This query joins multiple large tables (customers, orders, order_lines, and products) and aggregates data based on several columns. The query includes a GROUP BY clause, a SUM aggregate function, and a LIMIT clause. The results are encapsulated in a subquery, which allows further manipulation, for example sorting or filtering, before retrieving the final results.

All of these elements make it a good candidate for an analytics DB, which is optimized for handling large, complex queries like this one.

SQL
SELECT *
FROM (
  SELECT
    c.customer_id,
    c.customer_name,
    o.order_date,
    p.product_name,
    sum(ol.quantity) as total_quantity,
    sum(ol.price * ol.quantity) as total_revenue
  FROM
    customers c
    JOIN orders o
      ON c.customer_id = o.customer_id
    JOIN order_lines ol
      ON o.order_id = ol.order_id
    JOIN products p
      ON ol.product_id = p.product_id
  WHERE
    o.order_date BETWEEN '2021-01-01' AND '2021-12-31'
  GROUP BY
    1, 2, 3, 4
) sub
ORDER BY
  total_revenue DESC
LIMIT 100;

The most common analytics DBs are cloud based and are sometimes known as cloud DWHs; among them:

Snowflake, Redshift, BigQuery, Synapse Analytics, Vertica (can be local as well), Firebolt.

Below is a code example that loads from Redshift into a data frame. The following Redshift features allow us to execute complex queries efficiently:

  • Columnar storage: Redshift uses a columnar storage format that is optimized for fast data compression and retrieval, making it well-suited for analytical queries.
  • Massively parallel processing (MPP): Redshift distributes data and processing across multiple nodes, allowing it to scale processing power to handle complex queries.
  • Distributed sort: Redshift uses a distributed sort algorithm to speed up the sorting of large datasets, making it well-suited for queries that require sorting.
  • Query optimization: Redshift uses advanced query optimization techniques to minimize the number of disk I/O operations required to execute a query, reducing query execution time.
  • Materialized views: Materialized views in Redshift can be used to pre-compute and store results of complex queries, allowing you to retrieve the results quickly.
  • Partitioning of external tables: Redshift itself relies on distribution and sort keys rather than declarative table partitioning, but Redshift Spectrum supports partitioned external tables, which lets you split large datasets into smaller, more manageable pieces and speeds up queries by pruning partitions.
  • Compression encoding: Redshift provides several compression encoding options, including run-length encoding (RLE) and delta encoding, which can help to reduce the size of your data and improve query performance.
  • Workload management (WLM): Redshift provides a flexible workload management system that enables you to prioritize and manage complex queries to ensure optimal performance.
  • Zone maps: Redshift uses zone maps to quickly locate the data blocks needed to execute a query, reducing disk I/O operations and improving query performance.
Python
import psycopg2
import pandas as pd

conn = psycopg2.connect(
    host="redshift_host",
    port="5439",
    database="database_name",
    user="user_name",
    password="password"
)

query = """
SELECT *
FROM (
  SELECT
    c.customer_id,
    c.customer_name,
    o.order_date,
    p.product_name,
    sum(ol.quantity) as total_quantity,
    sum(ol.price * ol.quantity) as total_revenue
  FROM
    customers c
    JOIN orders o
      ON c.customer_id = o.customer_id
    JOIN order_lines ol
      ON o.order_id = ol.order_id
    JOIN products p
      ON ol.product_id = p.product_id
  WHERE
    o.order_date BETWEEN '2021-01-01' AND '2021-12-31'
  GROUP BY
    1, 2, 3, 4
) sub
ORDER BY
  total_revenue DESC
LIMIT 100;
"""

df = pd.read_sql_query(query, conn)

conn.close()

Most cloud DWHs offer features similar to Redshift’s and provide a way to load this type of query efficiently.
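
If the same aggregation is loaded repeatedly, the materialized-view feature mentioned above can pre-compute it on the Redshift side. A minimal sketch (the view name customer_revenue_mv is invented, and it assumes the tables from the example query plus the privileges to create and refresh views):

Python
import psycopg2

conn = psycopg2.connect(
    host="redshift_host",
    port="5439",
    database="database_name",
    user="user_name",
    password="password"
)
cur = conn.cursor()

# Pre-compute the heavy join and aggregation once on the Redshift side
# (drop or skip creation if the view already exists)
cur.execute("""
CREATE MATERIALIZED VIEW customer_revenue_mv AS
SELECT
  c.customer_id,
  c.customer_name,
  o.order_date,
  p.product_name,
  sum(ol.quantity) AS total_quantity,
  sum(ol.price * ol.quantity) AS total_revenue
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
JOIN order_lines ol ON o.order_id = ol.order_id
JOIN products p ON ol.product_id = p.product_id
GROUP BY c.customer_id, c.customer_name, o.order_date, p.product_name;
""")

# Refresh before each load so the view reflects the latest data,
# then query the small pre-aggregated view instead of the large base tables
cur.execute("REFRESH MATERIALIZED VIEW customer_revenue_mv;")
conn.commit()
conn.close()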

Summary

In this blog we explained the different data types and data sources and outlined best practices for using each one. We demonstrated code examples and explained how to load the data.

In the next part of our series we will discuss Data Acquisition: the process of collecting data and the different methods and techniques, based on your requirements and SLAs, that are crucial for ensuring accurate analysis and decision-making based on the data.
