What is BigQuery?
Google BigQuery is a fully managed, serverless data warehouse that enables scalable analysis over petabytes of data. It separates storage and compute, allowing you to pay only for what you use.
BigQuery uses a columnar storage format (Capacitor) and a distributed query engine (Dremel) to execute SQL queries across massive datasets in seconds.
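To make the serverless model concrete, here is a minimal sketch that runs a query against a BigQuery public dataset from Python. It assumes a Google Cloud project and application-default credentials are already configured, and you are billed only for the bytes the query scans.
from google.cloud import bigquery

client = bigquery.Client()  # uses your default project and credentials

# Aggregate a public dataset; no clusters or servers to provision
query = """
    SELECT word, SUM(word_count) AS occurrences
    FROM `bigquery-public-data.samples.shakespeare`
    GROUP BY word
    ORDER BY occurrences DESC
    LIMIT 5
"""
for row in client.query(query):  # iterating waits for and pages the results
    print(row.word, row.occurrences)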
BigQuery Architecture
┌────────────────────────────────────────────────────────────────┐
│                     BigQuery Architecture                      │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                   Dremel Query Engine                    │  │
│  │  - Distributed SQL execution                             │  │
│  │  - Automatic parallelization                             │  │
│  │  - In-memory shuffle                                     │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                │                               │
│  ┌─────────────────────────────▼────────────────────────────┐  │
│  │                     Colossus Storage                     │  │
│  │  - Columnar format (Capacitor)                           │  │
│  │  - Automatic compression                                 │  │
│  │  - Replication & durability                              │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                │
│  Key Concepts:                                                 │
│  ├── Project (billing & access boundary)                       │
│  ├── Dataset (container for tables, like a database)           │
│  ├── Table (structured data storage)                           │
│  ├── View (saved query, virtual table)                         │
│  ├── Materialized View (cached query results)                  │
│  └── External Table (query data in GCS/Drive)                  │
│                                                                │
│  Pricing Model:                                                │
│  ├── Storage: ~$0.02/GB/month (active), $0.01/GB (long-term)   │
│  ├── Queries: $5/TB processed (on-demand)                      │
│  └── Slots: Flat-rate pricing for dedicated capacity           │
│                                                                │
└────────────────────────────────────────────────────────────────┘
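The project → dataset → table hierarchy above maps directly onto the Python client's object model. A small sketch (the project ID is a placeholder) that walks it:
from google.cloud import bigquery

client = bigquery.Client(project='your-project-id')

# Datasets belong to the project; tables, views and external tables belong to a dataset
for dataset in client.list_datasets():
    print(dataset.dataset_id)
    for table in client.list_tables(dataset.reference):
        # table_type is TABLE, VIEW, MATERIALIZED_VIEW or EXTERNAL
        print(f'  {table.table_id} ({table.table_type})')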
Getting Started
# Install Python client
pip install google-cloud-bigquery
# Authentication
# Option 1: Service account (recommended for production)
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"
# Option 2: User credentials (development)
gcloud auth application-default login
# Python: Initialize client
from google.cloud import bigquery
client = bigquery.Client(project='your-project-id')
# Create dataset
dataset_id = 'your-project-id.analytics'
dataset = bigquery.Dataset(dataset_id)
dataset.location = 'US'
dataset = client.create_dataset(dataset, exists_ok=True)
print(f'Created dataset {dataset.dataset_id}')
# Create table with schema
table_id = 'your-project-id.analytics.events'
schema = [
    bigquery.SchemaField('event_id', 'STRING', mode='REQUIRED'),
    bigquery.SchemaField('user_id', 'STRING', mode='REQUIRED'),
    bigquery.SchemaField('event_type', 'STRING'),
    bigquery.SchemaField('event_data', 'JSON'),
    bigquery.SchemaField('event_time', 'TIMESTAMP', mode='REQUIRED'),
    bigquery.SchemaField('amount', 'NUMERIC'),
]
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table, exists_ok=True)
print(f'Created table {table.table_id}')
Loading Data
# Load from local file
from google.cloud import bigquery
client = bigquery.Client()
table_id = 'project.dataset.events'
# CSV
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True,
)
with open('events.csv', 'rb') as f:
    job = client.load_table_from_file(f, table_id, job_config=job_config)
job.result()  # Wait for completion
# JSON (newline-delimited)
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    autodetect=True,
)
job = client.load_table_from_uri(
    'gs://bucket/events/*.json',
    table_id,
    job_config=job_config
)
# Parquet (recommended for large datasets)
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
)
job = client.load_table_from_uri(
    'gs://bucket/events/*.parquet',
    table_id,
    job_config=job_config
)
# Load from DataFrame
import pandas as pd
df = pd.DataFrame({
    'event_id': ['e1', 'e2'],
    'user_id': ['u1', 'u2'],
    'event_type': ['click', 'purchase'],
    'event_time': pd.to_datetime(['2024-01-01', '2024-01-02'])
})
job = client.load_table_from_dataframe(df, table_id)  # requires pyarrow to be installed
job.result()
print(f'Loaded {job.output_rows} rows')
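Schema autodetection is convenient for exploration, but production loads are usually safer with an explicit schema and write disposition. A hedged sketch along the lines of the examples above (bucket and column names are placeholders):
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    schema=[
        bigquery.SchemaField('event_id', 'STRING', mode='REQUIRED'),
        bigquery.SchemaField('user_id', 'STRING', mode='REQUIRED'),
        bigquery.SchemaField('event_time', 'TIMESTAMP', mode='REQUIRED'),
    ],
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,  # append to existing rows
)
job = client.load_table_from_uri(
    'gs://bucket/events/2024-01-*.csv',
    table_id,
    job_config=job_config,
)
job.result()
print(f'Loaded {job.output_rows} rows into {table_id}')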
SQL Queries
-- Basic query
SELECT
user_id,
event_type,
COUNT(*) as event_count,
SUM(amount) as total_amount
FROM `project.dataset.events`
WHERE event_time >= '2024-01-01'
GROUP BY user_id, event_type
ORDER BY total_amount DESC
LIMIT 100;
-- Window functions
SELECT
user_id,
event_time,
amount,
SUM(amount) OVER (PARTITION BY user_id ORDER BY event_time) as running_total,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY event_time DESC) as event_rank
FROM `project.dataset.events`;
-- CTEs for complex queries
WITH daily_stats AS (
SELECT
DATE(event_time) as event_date,
user_id,
COUNT(*) as events,
SUM(amount) as revenue
FROM `project.dataset.events`
WHERE event_type = 'purchase'
GROUP BY event_date, user_id
),
user_segments AS (
SELECT
user_id,
CASE
WHEN SUM(revenue) > 1000 THEN 'high_value'
WHEN SUM(revenue) > 100 THEN 'medium_value'
ELSE 'low_value'
END as segment
FROM daily_stats
GROUP BY user_id
)
SELECT
s.segment,
COUNT(DISTINCT d.user_id) as users,
SUM(d.revenue) as total_revenue
FROM daily_stats d
JOIN user_segments s ON d.user_id = s.user_id
GROUP BY s.segment;
-- UNNEST arrays
SELECT
event_id,
tag
FROM `project.dataset.events`,
UNNEST(tags) as tag
WHERE tag LIKE 'promo%';
-- JSON functions
SELECT
event_id,
JSON_VALUE(event_data, '$.product_id') as product_id,
CAST(JSON_VALUE(event_data, '$.quantity') AS INT64) as quantity
FROM `project.dataset.events`;
Python Query Execution
from google.cloud import bigquery
import pandas as pd
client = bigquery.Client()
# Simple query
query = """
SELECT user_id, COUNT(*) as event_count
FROM `project.dataset.events`
GROUP BY user_id
LIMIT 1000
"""
results = client.query(query).to_dataframe()
print(results.head())
# Parameterized query (prevent SQL injection)
query = """
SELECT *
FROM `project.dataset.events`
WHERE event_type = @event_type
AND event_time >= @start_time
"""
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter('event_type', 'STRING', 'purchase'),
        bigquery.ScalarQueryParameter('start_time', 'TIMESTAMP', '2024-01-01'),
    ]
)
results = client.query(query, job_config=job_config).to_dataframe()
# Destination table (save results)
job_config = bigquery.QueryJobConfig(
    destination='project.dataset.aggregated_events',
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
query_job = client.query(
    "SELECT * FROM `project.dataset.events` WHERE amount > 100",
    job_config=job_config
)
query_job.result()
# Dry run (estimate cost without running the query)
# Note: a dry run still validates the SQL, so a parameterized query needs its parameters here
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
query_job = client.query(
    'SELECT user_id, event_type, amount FROM `project.dataset.events`',
    job_config=job_config
)
print(f'Query will process {query_job.total_bytes_processed / 1e9:.2f} GB')
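Once a job has run, the same job object exposes execution statistics, which is an easy way to spot expensive or cache-missing queries (table name is a placeholder):
query_job = client.query(
    'SELECT user_id, SUM(amount) AS total FROM `project.dataset.events` GROUP BY user_id'
)
query_job.result()  # wait for completion

print(f'Cache hit: {query_job.cache_hit}')
print(f'Bytes processed: {query_job.total_bytes_processed}')
print(f'Bytes billed: {query_job.total_bytes_billed}')
print(f'Slot milliseconds: {query_job.slot_millis}')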
Partitioning & Clustering
-- Partitioning reduces query cost by scanning less data
-- Time-partitioned table (most common)
CREATE TABLE `project.dataset.events_partitioned`
PARTITION BY DATE(event_time)
CLUSTER BY user_id, event_type
AS SELECT * FROM `project.dataset.events`;
-- Integer-range partitioning
CREATE TABLE `project.dataset.events_by_id`
PARTITION BY RANGE_BUCKET(user_shard, GENERATE_ARRAY(0, 100, 1))
AS SELECT *, MOD(ABS(FARM_FINGERPRINT(user_id)), 100) as user_shard
FROM `project.dataset.events`;
-- Query with partition filter (mandatory when require_partition_filter is set on the table)
SELECT *
FROM `project.dataset.events_partitioned`
WHERE DATE(event_time) = '2024-01-15' -- Scans only one partition
AND user_id = 'u123'; -- Uses clustering for efficiency
-- Python: Create partitioned table
from google.cloud import bigquery
client = bigquery.Client()
table_id = 'project.dataset.sales_partitioned'
schema = [
    bigquery.SchemaField('sale_id', 'STRING'),
    bigquery.SchemaField('product_id', 'STRING'),
    bigquery.SchemaField('amount', 'NUMERIC'),
    bigquery.SchemaField('sale_date', 'DATE'),
]
table = bigquery.Table(table_id, schema=schema)
# Time partitioning
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field='sale_date',
    expiration_ms=90 * 24 * 60 * 60 * 1000,  # 90 days retention
)
# Clustering
table.clustering_fields = ['product_id']
# Require partition filter
table.require_partition_filter = True
table = client.create_table(table)
print(f'Created partitioned table {table.table_id}')
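You can read the partitioning and clustering configuration back from the table metadata to confirm the layout; a short sketch using the table created above:
table = client.get_table('project.dataset.sales_partitioned')
print(f'Partitioned by: {table.time_partitioning}')
print(f'Clustered on: {table.clustering_fields}')
print(f'Partition filter required: {table.require_partition_filter}')
print(f'Rows: {table.num_rows}, bytes: {table.num_bytes}')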
Streaming Inserts
from google.cloud import bigquery
from datetime import datetime
client = bigquery.Client()
table_id = 'project.dataset.events'
# Single row insert
rows_to_insert = [
    {
        'event_id': 'e001',
        'user_id': 'u123',
        'event_type': 'purchase',
        'event_time': datetime.utcnow().isoformat(),
        'amount': 99.99
    }
]
errors = client.insert_rows_json(table_id, rows_to_insert)
if errors:
    print(f'Errors: {errors}')
else:
    print('Rows inserted successfully')
# Batch streaming (more efficient than row-by-row inserts)
def stream_events(events, batch_size=500):
    """Stream events to BigQuery in batches."""
    table_id = 'project.dataset.events'
    for i in range(0, len(events), batch_size):
        batch = events[i:i + batch_size]
        errors = client.insert_rows_json(table_id, batch)
        if errors:
            print(f'Batch {i // batch_size} errors: {errors}')
        else:
            print(f'Batch {i // batch_size} inserted: {len(batch)} rows')
# Note: streamed rows are queryable within seconds, but can remain in the
# streaming buffer for up to ~90 minutes before being committed to columnar
# storage (until then they are unavailable to copy/export jobs).
# On ingestion-time partitioned tables, buffered rows have a NULL
# _PARTITIONTIME, so include them explicitly alongside committed data:
# SELECT * FROM `project.dataset.events`
# WHERE _PARTITIONTIME IS NULL -- Streaming buffer
# UNION ALL
# SELECT * FROM `project.dataset.events`
# WHERE _PARTITIONTIME IS NOT NULL -- Committed data
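insert_rows_json reports per-row failures rather than raising, so a common pattern is to retry only the failed rows. A minimal sketch (the helper name and backoff policy are illustrative, not part of the client library):
import time

def insert_with_retry(client, table_id, rows, max_attempts=3):
    """Retry only the rows that failed in a streaming insert (illustrative helper)."""
    pending = rows
    for attempt in range(max_attempts):
        errors = client.insert_rows_json(table_id, pending)
        if not errors:
            return []
        # Each error entry carries the index of the failed row within this batch
        failed = {e['index'] for e in errors}
        pending = [row for i, row in enumerate(pending) if i in failed]
        time.sleep(2 ** attempt)  # simple exponential backoff
    return pending  # rows that still failed after all attempts

# Usage: still_failed = insert_with_retry(client, 'project.dataset.events', rows_to_insert)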
Materialized Views
-- Materialized views cache query results
-- BigQuery automatically refreshes them
-- Create materialized view
CREATE MATERIALIZED VIEW `project.dataset.daily_sales_mv`
OPTIONS (
enable_refresh = true,
refresh_interval_minutes = 30
)
AS
SELECT
DATE(sale_time) as sale_date,
product_id,
COUNT(*) as transaction_count,
SUM(amount) as total_revenue,
AVG(amount) as avg_order_value
FROM `project.dataset.sales`
GROUP BY sale_date, product_id;
-- Query automatically uses MV when beneficial
SELECT *
FROM `project.dataset.daily_sales_mv`
WHERE sale_date = '2024-01-15';
-- Force refresh
CALL BQ.REFRESH_MATERIALIZED_VIEW('project.dataset.daily_sales_mv');
-- Python: Create materialized view
import datetime

from google.cloud import bigquery

client = bigquery.Client()
mv_query = """
SELECT
DATE(event_time) as event_date,
event_type,
COUNT(*) as count
FROM `project.dataset.events`
GROUP BY event_date, event_type
"""
mv_id = 'project.dataset.events_summary_mv'
mv = bigquery.Table(mv_id)
mv.mview_query = mv_query
mv.mview_enable_refresh = True
mv.mview_refresh_interval = datetime.timedelta(minutes=60)
mv = client.create_table(mv)
print(f'Created materialized view {mv.table_id}')
GCP Data Integration
# BigQuery integrates with other GCP services
# 1. Cloud Storage (External Tables)
-- Query data directly from GCS without loading
CREATE EXTERNAL TABLE `project.dataset.external_logs`
OPTIONS (
format = 'PARQUET',
uris = ['gs://bucket/logs/*.parquet']
);
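The same external table can also be defined from Python with ExternalConfig; a brief sketch (bucket and table IDs are placeholders):
from google.cloud import bigquery

client = bigquery.Client()

external_config = bigquery.ExternalConfig('PARQUET')
external_config.source_uris = ['gs://bucket/logs/*.parquet']

table = bigquery.Table('project.dataset.external_logs')
table.external_data_configuration = external_config
table = client.create_table(table, exists_ok=True)
print(f'Created external table {table.table_id}')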
# 2. Dataflow (Apache Beam)
# Write streaming data to BigQuery
import json

import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery

pipeline = beam.Pipeline()  # for Pub/Sub sources, enable streaming in the pipeline options
(
    pipeline
    | 'Read' >> beam.io.ReadFromPubSub(topic='projects/proj/topics/events')
    | 'Parse' >> beam.Map(json.loads)
    | 'Write' >> WriteToBigQuery(
        'project:dataset.events',
        schema='event_id:STRING,user_id:STRING,event_time:TIMESTAMP',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
    )
)
pipeline.run()
# 3. Pub/Sub (BigQuery Subscription)
# Direct streaming from Pub/Sub to BigQuery
# gcloud pubsub subscriptions create bq-sub \
# --topic=events \
# --bigquery-table=project:dataset.events
# 4. Cloud Composer (Airflow)
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
)

run_query = BigQueryInsertJobOperator(
    task_id='run_aggregation',
    configuration={
        'query': {
            'query': 'SELECT * FROM dataset.events',
            'useLegacySql': False,
            'destinationTable': {
                'projectId': 'project',
                'datasetId': 'dataset',
                'tableId': 'aggregated'
            },
            'writeDisposition': 'WRITE_TRUNCATE'
        }
    }
)
BigQuery ML
-- Train ML models using SQL
-- Create a linear regression model
CREATE OR REPLACE MODEL `project.dataset.revenue_model`
OPTIONS (
model_type='LINEAR_REG',
input_label_cols=['revenue']
) AS
SELECT
user_tenure_days,
total_purchases,
avg_order_value,
revenue
FROM `project.dataset.user_features`;
-- Evaluate model
SELECT *
FROM ML.EVALUATE(MODEL `project.dataset.revenue_model`);
-- Make predictions
SELECT
user_id,
predicted_revenue
FROM ML.PREDICT(
MODEL `project.dataset.revenue_model`,
(SELECT * FROM `project.dataset.new_users`)
);
-- Classification model
CREATE OR REPLACE MODEL `project.dataset.churn_model`
OPTIONS (
model_type='LOGISTIC_REG',
input_label_cols=['churned'],
auto_class_weights=TRUE
) AS
SELECT
days_since_last_purchase,
total_orders,
support_tickets,
churned
FROM `project.dataset.user_churn_data`;
-- Inspect model coefficients
-- (ML.FEATURE_IMPORTANCE applies to tree-based models; for linear and
-- logistic regression, use ML.WEIGHTS)
SELECT *
FROM ML.WEIGHTS(MODEL `project.dataset.churn_model`)
ORDER BY ABS(weight) DESC;
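BigQuery ML models are driven entirely through SQL, so the Python client can orchestrate them as well. A hedged sketch that pulls the churn model's evaluation metrics into a DataFrame (model path as above):
from google.cloud import bigquery

client = bigquery.Client()

eval_df = client.query(
    'SELECT * FROM ML.EVALUATE(MODEL `project.dataset.churn_model`)'
).to_dataframe()
print(eval_df.T)  # precision, recall, log_loss, roc_auc, ...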
Cost Optimization
-- BigQuery cost optimization strategies
-- 1. Use partitioning and clustering
-- Always filter on partition column
-- 2. Select only needed columns
-- Bad: SELECT * FROM table
-- Good: SELECT col1, col2 FROM table
-- 3. LIMIT does not reduce the data scanned or the query cost
-- Use WHERE clauses (ideally on partition/cluster columns) to reduce bytes scanned
-- 4. Preview queries (dry run)
-- Shows bytes processed before running
-- 5. Use materialized views for repeated aggregations
-- 6. Set up cost controls
-- Project-level quota: bytes processed per day/user
-- Python: Monitor costs
from google.cloud import bigquery
client = bigquery.Client()
# Query to analyze costs
query = """
SELECT
user_email,
DATE(creation_time) as query_date,
COUNT(*) as query_count,
SUM(total_bytes_processed) / POW(10, 12) as tb_processed,
SUM(total_bytes_processed) / POW(10, 12) * 5 as estimated_cost_usd  -- assumes $5/TB on-demand rate
FROM `region-us`.INFORMATION_SCHEMA.JOBS
WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
AND job_type = 'QUERY'
GROUP BY user_email, query_date
ORDER BY estimated_cost_usd DESC
"""
costs = client.query(query).to_dataframe()
print(costs.head(20))
# Set custom cost limit per query
job_config = bigquery.QueryJobConfig(
    maximum_bytes_billed=10 * 1024 * 1024 * 1024  # 10 GB limit
)
results = client.query(query, job_config=job_config)
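When a query would exceed maximum_bytes_billed, BigQuery rejects the job instead of running it. A sketch of surfacing that failure; the error typically arrives as a 400 BadRequest, but treat the exact exception class as an assumption to verify for your client version:
from google.api_core.exceptions import BadRequest

try:
    client.query(query, job_config=job_config).result()
except BadRequest as exc:
    # The job fails with a bytesBilledLimitExceeded error before any bytes are billed
    print(f'Query blocked by maximum_bytes_billed: {exc}')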
Best Practices
- Partition all large tables: Reduces query costs significantly
- Cluster frequently filtered columns: After partitioning
- Use column selection: Avoid SELECT * in production
- Set cost controls: Prevent runaway query costs
- Use materialized views: For repeated aggregations
- Schedule queries: Use BigQuery scheduled queries for recurring ETL (see the sketch below)
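Scheduled queries are managed by the BigQuery Data Transfer Service. A sketch of creating one programmatically, assuming the google-cloud-bigquery-datatransfer package is installed and using placeholder project, dataset, and table names:
from google.cloud import bigquery_datatransfer

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

transfer_config = bigquery_datatransfer.TransferConfig(
    destination_dataset_id='analytics',
    display_name='Daily event rollup',
    data_source_id='scheduled_query',
    params={
        'query': 'SELECT DATE(event_time) AS event_date, COUNT(*) AS events '
                 'FROM `project.dataset.events` GROUP BY event_date',
        'destination_table_name_template': 'daily_events_{run_date}',
        'write_disposition': 'WRITE_TRUNCATE',
    },
    schedule='every 24 hours',
)

transfer_config = transfer_client.create_transfer_config(
    parent=transfer_client.common_project_path('your-project-id'),
    transfer_config=transfer_config,
)
print(f'Created scheduled query: {transfer_config.name}')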
Master BigQuery & GCP
Our Data Engineering program covers cloud data warehousing with BigQuery and GCP services.