Introduction
Welcome to the first part of Data Accelerator's data analytics architecture series. In this post we will dive into analytics data sources. In today's fast-paced business environment, data is more important than ever for making informed decisions. Analytics data sources provide the raw material organizations need to gain insights and make data-driven decisions. We will explore the different types of data sources commonly used for analytics, and discuss best practices for using them as the foundation of an analytics platform.
Data Sources Origination Types
Each type of data source adds a different angle and perspective to the current landscape. The table below summarizes the main types and how to use them.
Data Type | What is It | How to Use |
---|---|---|
Transactional | Day-to-day transactions of an organization, such as sales, customer interactions, and financial transactions. | Used to track performance, customer behavior, and other key metrics. |
Social Media | Social media platforms such as Facebook, Twitter, and LinkedIn generate a vast amount of data that can be used to track brand sentiment, customer engagement, and other important metrics. | Social media data can be analyzed to gain insights into customer behavior, preferences, and sentiment. |
Machine-Generated | With the rise of the Internet of Things (IoT) and other connected devices, organizations are now able to collect large amounts of data from machines, such as sensor data and log files. | This type of data can be used to track performance and optimize operations in industries such as manufacturing, transportation, and healthcare |
Public Data | There are a lot of public data sources available like government data, open data, and data from non-profit organizations | Sources can be used to gain valuable insights into various industries, markets, and population trends. |
Third-Party Data | Organizations can also purchase data from third-party providers, such as demographic data or market research reports. | The sources can supplement their internal data and gain a more complete picture of their customers and the market. |
User Data | This is the business logic of any organization: manual entries, classification and grouping lists, catalogues, and special dates. | User data is used to enrich other data sources with domain- or organization-specific business logic. It is typically used to join, filter, group, and map other data sources. |
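For example, user data such as a manually maintained classification list is typically joined onto other sources to apply the organization's business logic. A minimal pandas sketch (the table contents and column names are illustrative):

```python
import pandas as pd

# Source data (e.g. from a transactional system)
orders = pd.DataFrame({
    "product_id": [1, 2, 3],
    "amount": [100, 250, 75],
})

# User data: a manually maintained classification list
product_groups = pd.DataFrame({
    "product_id": [1, 2, 3],
    "product_group": ["Hardware", "Software", "Hardware"],
})

# Enrich the source with the business-logic grouping, then aggregate by it
enriched = orders.merge(product_groups, on="product_id", how="left")
revenue_by_group = enriched.groupby("product_group")["amount"].sum()
```

A left join is used so that source rows without a matching classification are kept rather than silently dropped.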
Data Sources Technical Types
In this section we present the different source types from a technical point of view. Understanding how to load data efficiently is a key decision on the way to building a sustainable data platform.
Data Source Type | What is this Data Type | Best Practices -> How to Load Data |
---|---|---|
Relational databases | These databases, such as MySQL, SQL Server and Oracle, allow for the storage and retrieval of data in a structured format. The data is stored in tables, with each table consisting of rows and columns. This allows for easy querying and analysis of the data. | If the source is OLTP, use SQL with minimal impact on it: identify changes in the data and load by dates, timestamps, and DB partitions. Use built-in connectors such as ODBC and JDBC. Understand the tuning parameters for memory and networking so you move the right volume of data. |
Non-relational databases | Non-relational databases, also known as NoSQL databases, are designed to handle large volumes of unstructured or semi-structured data. These databases, such as MongoDB and Cassandra, are often used in big data and real-time analytics applications. | Replicate data by taking full records, using the projection and search options available on the source, and load each table (collection) separately. |
Files and Logs | File data sources, such as CSV and Excel files, can also be used as a source of data. These files can be stored on local systems, network drives, or cloud storage services like AWS S3 or Google Cloud Storage. Files are likely to change, and their structure is hard to manage. File size and compression are critical to understanding this data source. | For big files, divide into small files and compress them. Load data in the source format and transform it in the analytics platform; avoid converting complex formats on the fly, which forces row-by-row reading. Avoid scripts that hard-code column names; create dynamic loads instead. Prefer built-in connectors that can automatically parse the file as CSV, XML, or JSON. |
API | APIs (Application Programming Interfaces) allow organizations to access and retrieve data from external sources and systems, either a third-party service or an internal system. Data can be pulled and integrated into an organization's data pipeline for analysis and reporting. | Use batch calls that return many records per request. Batch multiple calls into streams or group them together, then flush them into the platform. |
Analytics Databases | While a relational database is optimized for storing rows of data, typically for transactional applications, a columnar database is optimized for fast retrieval of columns of data, typically in analytical applications. Column-oriented storage for database tables is an important factor in analytic query performance because it drastically reduces the overall disk I/O requirements and the amount of data that must be loaded from disk. | Use the compute power of the source: create joins and aggregations in the data source to do some of the heavy pre-processing. Use the right size of compute for the task. If partitions are available, make sure to filter on them. Explore the tuning and optimization options of the modern DB. |
Data Sources Examples Using Best Practices
In this section we demonstrate how to use each data source, focusing on performance and on utilizing the data source in the most efficient way.
I chose to use Python, but any language or integration tool can be used; what matters is loading the data the right way.
Assumptions: large volume of data, loaded as batch or mini-batch (a small number of records, frequently).
Relational databases
For this example, we create a partition scheme for a large order-history table based on the order date, with one partition for each day of data:
CREATE PARTITION FUNCTION OrderDateRangePF (datetime)
AS RANGE LEFT FOR VALUES ('2022-01-01', '2022-01-02', '2022-01-03', ...);
CREATE PARTITION SCHEME OrderDateRangePS
AS PARTITION OrderDateRangePF
ALL TO ([PRIMARY]);
CREATE TABLE OrderHistory
(
OrderID int NOT NULL,
OrderDate datetime NOT NULL,
CustomerID int NOT NULL
...
)
ON OrderDateRangePS (OrderDate);
Step 1: determine whether the table has partitions, sub-partitions, or any indexes.
Use the results to decide which columns to include in the WHERE clause, improving
query performance by reducing the amount of data that needs to be scanned and processed.
SELECT
    OBJECT_NAME(p.object_id) AS TableName,
    p.partition_number AS PartitionNumber,
    prv.value AS BoundaryValue
FROM
    sys.partitions p
    JOIN sys.indexes i
        ON p.object_id = i.object_id AND p.index_id = i.index_id
    JOIN sys.partition_schemes ps
        ON i.data_space_id = ps.data_space_id
    LEFT JOIN sys.partition_range_values prv
        ON ps.function_id = prv.function_id
        AND p.partition_number = prv.boundary_id
WHERE
    OBJECT_NAME(p.object_id) = 'OrderHistory' -- replace with your table name
    AND p.index_id IN (0, 1)
ORDER BY
    TableName,
    PartitionNumber;
Step 2: load the data and filter by OrderDate.
Additional important comments:
- For resilience, it's always best to load data starting from the last time a load completed successfully.
- SELECT * is not recommended on wide tables, but specifying column names requires maintenance whenever new columns are added.
- Avoid joins at this step unless the join reduces or filters the data.
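The first comment, resuming from the last successful load, is usually implemented with a small load-history table that stores a watermark per source table. A minimal sketch using an in-memory SQLite database for illustration (in practice this table would live in your metadata or target database, and the table and column names here are assumptions):

```python
import sqlite3

# Illustrative metadata store; replace with your metadata DB connection
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE IF NOT EXISTS load_history "
    "(table_name TEXT, last_success_load_date TEXT)"
)

def get_watermark(conn, table_name, default="1900-01-01"):
    # Read the most recent successful load date for a table
    row = conn.execute(
        "SELECT MAX(last_success_load_date) FROM load_history WHERE table_name = ?",
        (table_name,),
    ).fetchone()
    return row[0] if row and row[0] else default

def set_watermark(conn, table_name, load_date):
    # Record a successful load so the next run resumes from here
    conn.execute(
        "INSERT INTO load_history (table_name, last_success_load_date) VALUES (?, ?)",
        (table_name, load_date),
    )
    conn.commit()

# First run falls back to the default; later runs resume from the watermark
start = get_watermark(conn, "OrderHistory")
set_watermark(conn, "OrderHistory", "2022-01-01")
```

The watermark read before each run feeds directly into the date filter shown in the loading code below it, and is only advanced after the load succeeds.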
import pyodbc
import pandas as pd
# Connection string
conn = pyodbc.connect("Driver={SQL Server}; Server=<server_name>; Database=<database_name>; UID=<user_name>; PWD=<password>")
# This parameter can be maintained in a load-history table
last_success_load_date = '2022-01-01'
# SQL query using the partition column OrderDate; a parameterized query
# avoids SQL injection and lets the driver handle type conversion
query = "SELECT * FROM OrderHistory WHERE OrderDate >= ?"
# Load data into a DataFrame
df = pd.read_sql(query, conn, params=[last_success_load_date])
# Close the connection
conn.close()
Now let's suppose we need to add the customer details. It's better to do the join once the data frame only includes one day of orders.
import pandas as pd
import pyodbc
conn = pyodbc.connect('DRIVER={SQL Server};'
'SERVER=<server_name>;'
'DATABASE=<database_name>;'
'UID=<user_name>;'
'PWD=<password>')
last_success_load_date = '2022-01-01'
customer_query = "SELECT * FROM Customer"
order_query = "SELECT * FROM OrderHistory WHERE OrderDate >= ?"
customer_df = pd.read_sql(customer_query, conn)
order_df = pd.read_sql(order_query, conn, params=[last_success_load_date])
merged_df = pd.merge(order_df, customer_df, on='CustomerID')
conn.close()
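When even one day of orders is too large to hold in memory, pandas can stream the result set with the `chunksize` parameter instead of materializing it all at once. A sketch using an in-memory SQLite table as a stand-in for the source database (the table contents are illustrative):

```python
import sqlite3
import pandas as pd

# In-memory SQLite stands in for the source database
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE OrderHistory (OrderID INT, OrderDate TEXT, CustomerID INT)")
conn.executemany(
    "INSERT INTO OrderHistory VALUES (?, ?, ?)",
    [(i, "2022-01-01", i % 3) for i in range(10)],
)

# chunksize makes read_sql return an iterator of DataFrames instead of one
# large frame, so each chunk can be processed and released independently
chunks = pd.read_sql(
    "SELECT * FROM OrderHistory WHERE OrderDate >= ?",
    conn,
    params=("2022-01-01",),
    chunksize=4,
)
frames = [chunk for chunk in chunks]
df = pd.concat(frames, ignore_index=True)
conn.close()
```

In a real pipeline each chunk would be transformed and written onward inside the loop rather than collected into one final frame.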
Non-relational databases
A sharding strategy can be used when the data is frequently queried on a specific field, such as date or geographic location. In the example below, a MongoDB collection (the equivalent of a table in other DBs) is sharded by date.
Step 1: identify the shards on the collection. This command returns information about the distribution of data across the shards for the specified collection.
db.collection_name.getShardDistribution()
Step 2:
- Filtering: a filter is added to the query so that only documents with a `date` field greater than or equal to "2022-01-01" and less than or equal to "2022-12-31" are returned.
- Batch loading: load the data from the MongoDB collection in chunks rather than all at once; this reduces memory usage and improves performance. The cursor's `batch_size` method can also control how many documents the driver fetches per round trip.
- The `collection.count_documents` method returns the total number of documents matching the filter, and a for loop retrieves the filtered documents in batches.
import pymongo
import pandas as pd
# Connect to MongoDB
client = pymongo.MongoClient("mongodb://localhost:27017/")
db = client["mydatabase"]
collection = db["mycollection"]
# Define the batch size and the date filter
# (named date_filter to avoid shadowing the built-in filter())
batch_size = 1000
date_filter = {"date": {"$gte": "2022-01-01", "$lte": "2022-12-31"}}
# Get the total number of documents in the filtered collection
total_docs = collection.count_documents(date_filter)
# Loop through the filtered collection in batches
results = []
for i in range(0, total_docs, batch_size):
    docs = collection.find(date_filter).skip(i).limit(batch_size)
    results.extend(docs)
# Convert the results to a pandas DataFrame
df = pd.DataFrame(results)
Files and Logs
Files and logs loading techniques vary by file type and by cloud provider or local file system.
The example below demonstrates some best practices for loading from S3: filtering, reading data in chunks, and parallel processing.
- Use S3 Select: S3 Select is a feature of Amazon S3 that allows you to filter, transform, and reduce the data stored in your S3 objects. You can use S3 Select to extract only the data you need from your large S3 objects, reducing the amount of data you need to load into memory.
- Add parallel processing: parallel processing can greatly improve the performance of reading large files from S3. You can use the `concurrent.futures` library in Python to process the data in parallel.
- Implement chunked reading: when reading large files from S3, it's a good idea to read the data in chunks rather than all at once. This can help reduce memory usage and improve the performance of the read operation.
import boto3
import concurrent.futures
# Connect to S3 using boto3
s3 = boto3.client('s3')
# Define the S3 bucket and object
bucket = 'my-bucket'
object_key = 'large-file.csv'
# Use S3 Select to extract only the necessary data
response = s3.select_object_content(
    Bucket=bucket,
    Key=object_key,
    Expression="SELECT * FROM S3Object WHERE <condition>",
    ExpressionType='SQL',
    InputSerialization={'CSV': {'FileHeaderInfo': 'Use'}},
    OutputSerialization={'CSV': {}},
)
# Use parallel processing to read the data in chunks
def process_data(record):
    # Process each chunk of data
    return True
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    for event in response['Payload']:
        if 'Records' in event:
            records = event['Records']['Payload'].decode('utf-8')
            executor.submit(process_data, records)
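The chunked-reading practice from the bullets above applies equally to local files or objects downloaded from S3. A sketch using pandas' `read_csv` with `chunksize` (an in-memory CSV stands in for the large file, and the column names are illustrative; the same pattern works on a stream returned by S3):

```python
import io
import pandas as pd

# A small CSV stands in for a large file downloaded or streamed from S3
csv_data = io.StringIO(
    "order_id,amount\n" + "\n".join(f"{i},{i * 10}" for i in range(10))
)

# Reading in chunks bounds memory use: each chunk is parsed, processed,
# and released before the next one is read
total = 0
n_chunks = 0
for chunk in pd.read_csv(csv_data, chunksize=4):
    total += chunk["amount"].sum()
    n_chunks += 1
```

Here each chunk is reduced to a running total, so only one small DataFrame exists in memory at any time regardless of the file size.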
API
Below are some best practices for loading a large data set from an API:
- Pagination: APIs often have a limit on the number of records that can be returned in a single request. To retrieve large datasets, you’ll need to implement pagination to make multiple requests, each returning a smaller batch of data.
- Caching: To minimize the number of API requests and reduce the load on the API server, you can cache the data locally after each request. This can be achieved using disk-based caching, in-memory caching, or a combination of both.
- Error handling: When making API requests, it’s important to handle errors that may occur due to network connectivity issues, rate limiting, or other factors. You should implement error handling to ensure that the data retrieval process is robust and can continue even if some requests fail.
- Parallelization: To speed up the data retrieval process, you can parallelize the API requests by making multiple requests simultaneously. This can be achieved using multithreading or multiprocessing techniques
import requests
import pandas as pd
import pickle
# Define the API endpoint
API_URL = "https://api.example.com/data"
# Define the batch size
BATCH_SIZE = 100
# Define the cache file name
CACHE_FILE = "data.pkl"
# Function to make a single API request
def make_request(offset, limit):
    params = {
        "offset": offset,
        "limit": limit
    }
    response = requests.get(API_URL, params=params)
    response.raise_for_status()  # surface HTTP errors instead of parsing bad payloads
    return response.json()
# Function to retrieve data from the API
def get_data(batch_size=BATCH_SIZE):
    try:
        # Try to load data from cache
        with open(CACHE_FILE, "rb") as file:
            return pickle.load(file)
    except (FileNotFoundError, EOFError):
        # If the cache file doesn't exist, retrieve data from the API
        data = []
        offset = 0
        while True:
            batch = make_request(offset, batch_size)
            if not batch:
                break
            data.extend(batch)
            offset += batch_size
        # Cache the data
        with open(CACHE_FILE, "wb") as file:
            pickle.dump(data, file)
        return data
# Function to preprocess the data
def preprocess_data(data):
    # Perform data preprocessing here
    return data
# Function to load data into a pandas DataFrame
def load_data_into_dataframe(data):
    data = preprocess_data(data)
    return pd.DataFrame(data)
# Get the data
data = get_data()
# Load the data into a dataframe
df = load_data_into_dataframe(data)
This code implements the following best practices:
- Pagination: the `offset` and `limit` parameters are used to paginate the API requests.
- Caching: the data is cached to a local file using the `pickle` module.
- Error handling: error handling is implemented using a `try`/`except` block.
- Data preprocessing: the data is preprocessed before loading into the dataframe using the `preprocess_data` function.
Analytics Databases
Before we explain best practices, we will first show an example of the type of query we should run against an analytics DB as our source.
In the example below we have a business requirement to show the top 100 customer revenues for a given date range, broken down by product. This query joins multiple large tables (`customers`, `orders`, `order_lines`, and `products`) and aggregates data based on several columns. The query includes a `GROUP BY` clause, a `SUM` aggregate function, and a `LIMIT` clause. The results are encapsulated in a subquery, allowing further manipulation, for example sorting or filtering, before retrieving the final output.
All of these elements make this a good candidate for an analytics DB, which is optimized for handling large, complex queries like this one.
SELECT *
FROM (
    SELECT
        c.customer_id,
        c.customer_name,
        o.order_date,
        p.product_name,
        SUM(ol.quantity) AS total_quantity,
        SUM(ol.price * ol.quantity) AS total_revenue
    FROM
        customers c
        JOIN orders o ON c.customer_id = o.customer_id
        JOIN order_lines ol ON o.order_id = ol.order_id
        JOIN products p ON ol.product_id = p.product_id
    WHERE
        o.order_date BETWEEN '2021-01-01' AND '2021-12-31'
    GROUP BY
        1, 2, 3, 4
) sub
ORDER BY
    total_revenue DESC
LIMIT 100;
The most common analytics DBs are cloud based, sometimes known as cloud DWHs. Among them:
Snowflake, Redshift, BigQuery, Synapse Analytics, Vertica (can be local as well), Firebolt.
Below is a code example that loads from Redshift into a data frame. The following Redshift features allow us to execute complex queries efficiently:
- Columnar storage: Redshift uses a columnar storage format that is optimized for fast data compression and retrieval, making it well-suited for analytical queries.
- Massively parallel processing (MPP): Redshift distributes data and processing across multiple nodes, allowing it to scale processing power to handle complex queries.
- Distributed sort: Redshift uses a distributed sort algorithm to speed up the sorting of large datasets, making it well-suited for queries that require sorting.
- Query optimization: Redshift uses advanced query optimization techniques to minimize the number of disk I/O operations required to execute a query, reducing query execution time.
- Materialized views: Materialized views in Redshift can be used to pre-compute and store results of complex queries, allowing you to retrieve the results quickly.
- Table partitioning: Redshift provides support for table partitioning, which allows you to partition large tables into smaller, more manageable pieces, making it easier to manage large datasets and speeding up query performance.
- Compression encoding: Redshift provides several compression encoding options, including run-length encoding (RLE) and delta encoding, which can help to reduce the size of your data and improve query performance.
- Workload management (WLM): Redshift provides a flexible workload management system that enables you to prioritize and manage complex queries to ensure optimal performance.
- Zone maps: Redshift uses zone maps to quickly locate the data blocks needed to execute a query, reducing disk I/O operations and improving query performance.
import psycopg2
import pandas as pd
conn = psycopg2.connect(
    host="redshift_host",
    port="5439",
    database="database_name",
    user="user_name",
    password="password"
)
query = """
SELECT *
FROM (
    SELECT
        c.customer_id,
        c.customer_name,
        o.order_date,
        p.product_name,
        SUM(ol.quantity) AS total_quantity,
        SUM(ol.price * ol.quantity) AS total_revenue
    FROM
        customers c
        JOIN orders o ON c.customer_id = o.customer_id
        JOIN order_lines ol ON o.order_id = ol.order_id
        JOIN products p ON ol.product_id = p.product_id
    WHERE
        o.order_date BETWEEN '2021-01-01' AND '2021-12-31'
    GROUP BY
        1, 2, 3, 4
) sub
ORDER BY
    total_revenue DESC
LIMIT 100;
"""
df = pd.read_sql_query(query, conn)
conn.close()
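For result sets much larger than this top-100 example, it is usually better to stream rows with `fetchmany` than to materialize everything in one call. A sketch using SQLite as a stand-in for the warehouse connection (psycopg2 additionally offers server-side named cursors for the same purpose; the table and data here are illustrative):

```python
import sqlite3

# SQLite stands in for the warehouse connection
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE order_lines (order_id INT, revenue REAL)")
conn.executemany(
    "INSERT INTO order_lines VALUES (?, ?)",
    [(i, float(i)) for i in range(10)],
)

# fetchmany pulls a bounded number of rows per call, keeping client
# memory flat regardless of the result-set size
cur = conn.execute("SELECT order_id, revenue FROM order_lines")
rows_seen = 0
while True:
    batch = cur.fetchmany(4)
    if not batch:
        break
    rows_seen += len(batch)  # process the batch here
conn.close()
```

This complements the pattern above: the warehouse does the heavy aggregation, and the client drains the result in bounded batches.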
Most cloud DWHs offer features like Redshift's and provide an efficient way to run this type of query.
Summary
In this blog we explained the different data types and data sources and outlined best practices for using each one. We demonstrated code examples and explained how to load the data.
In the next part of our series we will discuss Data Acquisition: the process of collecting data, and the different methods and techniques, based on your requirements and SLA, that are crucial for ensuring accurate analysis and decision-making.