Master Google Cloud Logging for unified observability across BigQuery, Dataflow, and Composer. Real-world strategies for production data pipelines.
Data pipelines are the arteries of modern analytics. They move terabytes of information through BigQuery, transform data with Dataflow, and orchestrate workflows with Cloud Composer—but only if you can see what's happening inside them. That's where Google Cloud Logging comes in.
Google Cloud Logging is GCP's centralized log management service that ingests, stores, and analyzes logs from virtually every service in your data stack. Unlike scattered log files on individual servers or application-level logging that only captures business events, Cloud Logging gives you a unified view of system events, application errors, audit trails, and custom metrics across your entire data infrastructure.
For data engineering teams managing production pipelines, this matters enormously. A Dataflow job that silently fails at 3 AM, a BigQuery query that runs 10x slower than expected, or a Composer DAG that skips tasks without alerting—these aren't theoretical problems. They're the kinds of issues that cascade into stale dashboards, missed SLAs, and data quality disasters. Implementing observability in GCP requires more than just turning on logging; it requires understanding how to instrument each component of your pipeline and correlate signals across services.
Cloud Logging sits at the center of that strategy. It's not a replacement for application-specific monitoring or custom dashboards—it's the foundation that makes everything else possible. When you're running analytics at scale, you need to know the difference between a transient network blip and a structural problem in your pipeline. You need to trace a data quality issue back to its source. You need alerts that actually mean something, not noise.
This article walks you through how to instrument Google Cloud Logging across BigQuery, Dataflow, and Composer, then how to actually use those logs to understand what's happening in your pipelines. We'll cover the mechanics of log ingestion, query patterns that reveal real problems, and how to build observability into your architecture from the start.
Google Cloud's observability suite consists of three interconnected services: Cloud Logging, Cloud Monitoring, and Cloud Trace. Each serves a different purpose, but they work together.
Cloud Logging captures raw events—what happened, when, and where. It's the source of truth. A Dataflow worker crashed. A BigQuery query timed out. A Composer task failed. These are log events.
Cloud Monitoring aggregates metrics—numerical measurements over time. CPU usage, query latency, job completion rate. These are typically derived from logs or emitted directly by applications.
Cloud Trace tracks distributed requests across services. When a data request flows through multiple systems, Trace shows you the path and identifies bottlenecks.
For data pipeline observability, Cloud Logging and Monitoring form the core of your observability strategy. Trace is useful for latency-sensitive workloads but less critical for batch pipelines. The real power comes from understanding how to structure your logs so you can query them effectively and surface patterns that matter.
Think of it this way: if your data pipeline is a production system (and it should be treated as one), then Cloud Logging is your system's nervous system. It tells you what's happening. The question is whether you're listening.
BigQuery is often treated as a black box. You submit a query, it returns results, and you move on. But BigQuery generates extensive logs that reveal exactly what's happening inside that box: which tables were scanned, how much data was processed, whether slot reservations were used, and where query execution got stuck.
BigQuery writes audit logs to Cloud Logging automatically. Two types matter here:
Admin Activity logs capture metadata operations: dataset creation, table schema changes, permission modifications. These are always enabled, cannot be turned off, and are kept in the _Required log bucket for 400 days at no charge.
Data Access logs capture read operations: queries executed, tables scanned, data exported. For most Google Cloud services these are disabled by default because they generate high volume and incur storage costs. BigQuery is the exception: its Data Access logs are enabled by default. For production pipelines, they're essential.
To enable Data Access logs for the other services your pipeline touches (Cloud Storage, for example), navigate to your GCP project's IAM & Admin > Audit Logs, find the service's API, and check the Data Access log types you need. Logging for that service starts shortly after you save.
The cost is real—a high-volume query workload can generate gigabytes of logs daily. But the alternative is flying blind. Building an audit log analysis pipeline with BigQuery on GCP is a standard pattern: ingest Cloud Logging data into a separate BigQuery dataset, then query it to understand your own usage.
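When a query executes, BigQuery writes a log entry that includes the job ID, the query text, the bytes billed, the job's start and end times, and the identity of the caller.
These fields let you answer real questions about cost, latency, and ownership.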
For data pipeline observability, this means you can detect when a scheduled query suddenly becomes expensive (indicator of data quality issues or upstream changes), when a reporting dashboard query starts timing out (signal that the dataset has grown), or when an analytics team is running unoptimized queries at scale.
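As a rough illustration of the "suddenly expensive" check described above, the sketch below compares the latest run's billed bytes with the median of earlier runs. The function name, the 3x factor, and the minimum-history rule are assumptions chosen for the example, not BigQuery features; the input is presumed to come from your exported audit logs.

```python
from statistics import median


def is_cost_spike(history_bytes, latest_bytes, factor=3.0, min_runs=5):
    """Return True when the latest run billed more than `factor` times the
    median of previous runs. All thresholds here are illustrative."""
    if len(history_bytes) < min_runs:
        # Too little history to define a baseline, so stay quiet.
        return False
    baseline = median(history_bytes)
    return baseline > 0 and latest_bytes > factor * baseline


if __name__ == "__main__":
    gb = 10**9
    previous_runs = [10 * gb, 12 * gb, 11 * gb, 9 * gb, 10 * gb]
    print(is_cost_spike(previous_runs, 40 * gb))
```

A median baseline is used instead of a mean so that one earlier outlier does not mask the next one.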
Dataflow is where the heavy lifting happens in most data pipelines. It processes terabytes of data, applies transformations, and writes results to BigQuery or Cloud Storage. But Dataflow jobs run across distributed workers, and distributed systems are inherently harder to observe.
Dataflow automatically writes logs to Cloud Logging from every worker. These logs include:
The key is that Dataflow logs are tied to a job ID, which lets you correlate all logs from a single pipeline run. If a job fails, you can query Cloud Logging for that job ID and see the complete execution history across all workers.
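To make the job-ID correlation concrete, here is a minimal sketch that groups already-fetched log entries by job. It assumes each entry is a dict shaped like a JSON log entry with the job ID under resource.labels.job_id, and that timestamps are uniformly formatted UTC strings so they sort lexicographically. The function name is hypothetical.

```python
from collections import defaultdict


def group_by_job(entries):
    """Group log-entry dicts by resource.labels.job_id, oldest first."""
    grouped = defaultdict(list)
    for entry in entries:
        job_id = entry.get("resource", {}).get("labels", {}).get("job_id")
        if job_id is not None:
            grouped[job_id].append(entry)
    for job_entries in grouped.values():
        # Assumes uniformly formatted UTC timestamps (sortable as strings).
        job_entries.sort(key=lambda e: e.get("timestamp", ""))
    return dict(grouped)


if __name__ == "__main__":
    sample = [
        {"timestamp": "2024-01-15T03:00:02Z", "severity": "ERROR",
         "resource": {"labels": {"job_id": "job-a"}}},
        {"timestamp": "2024-01-15T03:00:01Z", "severity": "INFO",
         "resource": {"labels": {"job_id": "job-a"}}},
    ]
    for job_id, job_entries in group_by_job(sample).items():
        print(job_id, [e["severity"] for e in job_entries])
```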
Out of the box, Dataflow logs are basic. To make them useful for observability, you need to instrument your pipeline code.
In Apache Beam (the framework underlying Dataflow), use the logging library to emit structured logs:
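import logging
from datetime import datetime, timezone

import apache_beam as beam

logger = logging.getLogger(__name__)

class MyDoFn(beam.DoFn):
    def process(self, element):
        try:
            # transform() stands in for your own per-element logic
            result = transform(element)
            logger.info("Processed element", extra={
                "element_id": element["id"],
                "result_size": len(result),
                "timestamp": datetime.now(timezone.utc).isoformat()
            })
            yield result
        except Exception as e:
            # Log the failure and drop the element; re-raise if the bundle should fail instead
            logger.error("Failed to process element", extra={
                "element_id": element["id"],
                "error": str(e),
                "error_type": type(e).__name__
            })
Structured logging (here via the extra parameter) is meant to make fields queryable in Cloud Logging. Whether extra fields actually arrive as payload fields depends on the logging handler your runtime uses, so verify this in your environment. Avoid unstructured log messages like "Something went wrong"; they're useless for observability.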
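One way to make the structured fields explicit, sketched here with the standard library only, is a formatter that serializes each record (including anything passed through extra) as a single JSON line. This assumes a runtime whose logging agent parses JSON lines into structured payloads, which is not guaranteed everywhere. JsonFormatter and make_logger are hypothetical names, not part of Beam or any Google library.

```python
import json
import logging
import sys

# Attribute names present on every LogRecord; anything else came from `extra`.
_STANDARD_ATTRS = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__)
_STANDARD_ATTRS |= {"message", "asctime"}


class JsonFormatter(logging.Formatter):
    """Render a record as one JSON object, copying `extra` fields to the top level."""

    def format(self, record):
        payload = {
            "severity": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
        }
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        # default=str keeps the formatter from failing on non-JSON types.
        return json.dumps(payload, default=str)


def make_logger(name, stream=sys.stdout):
    """Return a logger that writes JSON lines to `stream`."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger(name)
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


if __name__ == "__main__":
    log = make_logger("pipeline")
    log.info("Processed element", extra={"element_id": "a1", "result_size": 3})
```

Deriving the standard attribute set from a real LogRecord avoids hard-coding a list that changes between Python versions.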
With proper instrumentation, Cloud Logging reveals:
Data pipeline observability architecture requires logging every pipeline component, not just high-level job status. This means instrumenting custom transforms, external API calls, and data quality checks.
Cloud Composer is GCP's managed Airflow service. It orchestrates your data pipelines—triggering Dataflow jobs, running BigQuery queries, and coordinating dependencies. Unlike Dataflow, which processes data, Composer manages the workflow. But Composer still needs observability.
Cloud Composer automatically sends Airflow logs to Cloud Logging. These include:
Unlike BigQuery and Dataflow, which have native GCP logging integration, Airflow logs require a bit more setup. By default, Composer sends logs to Cloud Logging, but you need to ensure your DAG code is writing logs that are actually useful.
In Airflow, use the built-in logger to emit structured information:
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator

logger = logging.getLogger(__name__)

def extract_data(**context):
    # Newer Airflow releases call this value logical_date
    execution_date = context["execution_date"]
    logger.info("Starting extraction", extra={
        "execution_date": execution_date.isoformat(),
        "task_id": context["task"].task_id,
        "dag_id": context["dag"].dag_id
    })
    # Your extraction logic
    rows_extracted = 1000000
    logger.info("Extraction complete", extra={
        "rows_extracted": rows_extracted,
        "duration_seconds": 45
    })

with DAG("data_pipeline", ...) as dag:
    extract = PythonOperator(
        task_id="extract",
        python_callable=extract_data
    )
Key fields to include in Composer logs: the DAG ID, task ID, and execution date for context, plus outcome fields such as rows extracted and duration.
With proper logging, Cloud Logging reveals:
Now you have logs from BigQuery, Dataflow, and Composer all flowing into Cloud Logging. The next step is actually querying them to understand what's happening.
The Logs Explorer is Cloud Logging's query interface. It uses a filter syntax that looks like:
resource.type="bigquery_resource"
severity="ERROR"
timestamp>="2024-01-15T00:00:00Z"
You can filter by resource type, severity, timestamp, and any field in the log payload (jsonPayload or protoPayload).
Using Logs Explorer and Observability Analytics allows you to query logs and generate insights directly in Cloud Logging without exporting to BigQuery first.
Here are practical queries for data pipeline observability:
Find all failed BigQuery queries in the last hour:
resource.type="bigquery_resource"
severity="ERROR"
timestamp>="2024-01-15T23:00:00Z"
Find Dataflow jobs that processed more than 1TB:
resource.type="dataflow_step"
jsonPayload.bytes_processed>1000000000000
Find Cloud Composer tasks that took longer than 30 minutes:
resource.type="cloud_composer_environment"
jsonPayload.duration_seconds>1800
Find all queries from a specific service account:
resource.type="bigquery_resource"
protoPayload.authenticationInfo.principalEmail="SERVICE_ACCOUNT_EMAIL"
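The examples above hard-code a timestamp, which goes stale quickly. A small helper can compute the lookback window instead. This is a sketch only: build_log_filter is a hypothetical function, and it uses severity>=ERROR (ERROR and above) where the examples use an exact match.

```python
from datetime import datetime, timedelta, timezone


def build_log_filter(resource_type, *, severity=None, lookback=None,
                     now=None, extra_clauses=()):
    """Assemble a Cloud Logging filter string with a relative time window."""
    clauses = [f'resource.type="{resource_type}"']
    if severity:
        clauses.append(f"severity>={severity}")
    if lookback is not None:
        now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
        start = (now - lookback).strftime("%Y-%m-%dT%H:%M:%SZ")
        clauses.append(f'timestamp>="{start}"')
    clauses.extend(extra_clauses)
    # Clauses are joined with an explicit AND for readability.
    return " AND ".join(clauses)


if __name__ == "__main__":
    print(build_log_filter("bigquery_resource", severity="ERROR",
                           lookback=timedelta(hours=1)))
```

The resulting string can be pasted into Logs Explorer or passed to whichever client you use to list entries.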
For serious observability, export Cloud Logging data to BigQuery. This lets you run SQL queries on your logs and build dashboards.
Create a log sink in Cloud Logging:
Cloud Logging will now write all matching logs to a BigQuery table. The table schema is automatically created, with columns for timestamp, severity, resource, jsonPayload, and more.
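For teams that prefer to script the sink, the sketch below shows one possible shape using the google-cloud-logging client library. The project, dataset, and sink names are placeholders, and the function names are hypothetical. The sink's writer identity also needs write access to the target dataset, which this sketch does not grant.

```python
def bigquery_sink_destination(project_id, dataset_id):
    """Build the destination string a BigQuery log sink expects."""
    return f"bigquery.googleapis.com/projects/{project_id}/datasets/{dataset_id}"


def create_bigquery_sink(sink_name, log_filter, project_id, dataset_id):
    """Create the sink if it does not already exist (requires google-cloud-logging)."""
    # Imported lazily so the helper above works without the client library.
    from google.cloud import logging as cloud_logging

    client = cloud_logging.Client(project=project_id)
    sink = client.sink(
        sink_name,
        filter_=log_filter,
        destination=bigquery_sink_destination(project_id, dataset_id),
    )
    if not sink.exists():
        sink.create()
    return sink


if __name__ == "__main__":
    # Placeholder names; printing the destination needs no credentials.
    print(bigquery_sink_destination("myproject", "cloud_logging"))
```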
Once in BigQuery, you can run SQL to analyze your pipeline:
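-- Assumes a sink with partitioned tables and the legacy (v1) BigQuery audit format.
-- Completed query jobs land in the data_access table under protopayload_auditlog.
SELECT
  protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobName.jobId AS job_id,
  protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobConfiguration.query.query AS query,
  protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobStatistics.totalBilledBytes AS bytes_billed,
  TIMESTAMP_DIFF(
    protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobStatistics.endTime,
    protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobStatistics.startTime,
    SECOND) AS query_duration_seconds,
  protopayload_auditlog.authenticationInfo.principalEmail AS user_identity
FROM `myproject.cloud_logging.cloudaudit_googleapis_com_data_access`
WHERE resource.type = "bigquery_resource"
  AND protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobStatistics.totalBilledBytes > 1000000000000
  AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
ORDER BY bytes_billed DESC
This query finds your most expensive queries from the last week, which is essential for cost optimization.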
Logs are raw data. Observability comes from turning those logs into actionable insights. This means building dashboards, setting up alerts, and establishing runbooks.
Once logs are in BigQuery, you can create dashboards using D23 (embedded analytics and self-serve BI on Apache Superset), Looker Studio, or any other BI tool. The key is tracking metrics that matter for your pipelines:
Data Freshness: How often are your datasets updated? Are pipelines running on schedule?
SELECT
  DATE(timestamp) AS date,
  COUNT(*) AS pipeline_runs,
  COUNTIF(status = "SUCCESS") AS successful_runs,
  COUNTIF(status = "FAILED") AS failed_runs,
  ROUND(100 * COUNTIF(status = "SUCCESS") / COUNT(*), 2) AS success_rate
FROM `myproject.pipeline_logs.execution_log`
GROUP BY date
ORDER BY date DESC
Query Performance: Are BigQuery queries getting slower?
-- Assumes query_duration_seconds is a column you derive from the audit export (for example in a view)
SELECT
  DATE(timestamp) AS date,
  APPROX_QUANTILES(query_duration_seconds, 100)[OFFSET(50)] AS p50_duration,
  APPROX_QUANTILES(query_duration_seconds, 100)[OFFSET(95)] AS p95_duration,
  APPROX_QUANTILES(query_duration_seconds, 100)[OFFSET(99)] AS p99_duration,
  MAX(query_duration_seconds) AS max_duration
FROM `myproject.cloud_logging.cloudaudit_googleapis_com_activity`
WHERE resource.type = "bigquery_resource"
GROUP BY date
ORDER BY date DESC
Data Quality: Are transformations dropping records?
SELECT
  jsonPayload.transform_name,
  SUM(jsonPayload.records_input) AS total_input,
  SUM(jsonPayload.records_output) AS total_output,
  -- SAFE_DIVIDE returns NULL instead of failing when a transform saw no input
  ROUND(100 * (1 - SAFE_DIVIDE(SUM(jsonPayload.records_output), SUM(jsonPayload.records_input))), 2) AS drop_rate
FROM `myproject.cloud_logging.dataflow_logs`
WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
GROUP BY jsonPayload.transform_name
HAVING drop_rate > 5
Dashboards are passive: they show you what happened. Alerts are active: they tell you when something is wrong.
In Cloud Monitoring, create alert policies based on your logs:
Alert: Pipeline failure rate exceeds 10%
resource.type="cloud_composer_environment"
jsonPayload.status="FAILED"
Set the condition: if the rate of failed tasks exceeds 10% of all tasks in a 5-minute window, trigger an alert.
Alert: BigQuery query latency p95 exceeds 30 seconds
resource.type="bigquery_resource"
Set the condition: if the 95th percentile query duration exceeds 30 seconds, trigger an alert.
Alert: Dataflow job processing rate drops below expected
resource.type="dataflow_job"
Set the condition: if records per second drops below your baseline, trigger an alert.
The key to effective alerting is avoiding noise. Every alert should require action. If you're getting paged for alerts you ignore, you've lost observability.
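The failure-rate condition is a ratio, so a filter that matches only failed tasks is not enough by itself; you also need the count of all tasks in the same window. In Cloud Monitoring this is commonly built from two log-based metrics. The sketch below only illustrates the arithmetic on events already pulled from logs; the function names and the (timestamp, status) tuple shape are assumptions.

```python
from datetime import datetime, timedelta, timezone


def failure_rate(events, now, window=timedelta(minutes=5)):
    """Fraction of task events in the trailing window whose status is FAILED."""
    recent = [status for ts, status in events if now - window <= ts <= now]
    if not recent:
        return 0.0  # no tasks ran, so there is nothing to alert on
    return sum(1 for status in recent if status == "FAILED") / len(recent)


def should_alert(events, now, threshold=0.10, window=timedelta(minutes=5)):
    """True when the failure rate in the window exceeds the threshold."""
    return failure_rate(events, now, window) > threshold


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    events = [(now - timedelta(minutes=1), "SUCCESS"),
              (now - timedelta(minutes=2), "FAILED")]
    print(should_alert(events, now))
```

Returning 0.0 for an empty window keeps quiet periods from paging anyone. A separate "no tasks ran" alert is usually the better way to catch a stalled scheduler.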
When something goes wrong in your data pipeline, it rarely fails in isolation. A Dataflow job fails because a BigQuery table isn't ready. A Composer DAG times out because a query is slow. Understanding these correlations is where observability becomes powerful.
The most useful pattern is adding a trace ID to logs across your entire pipeline. When a unit of work enters your pipeline (a single record or, as in the example below, a whole run), assign it a unique ID and include that ID in every log from every service.
In your Composer DAG, generate a trace ID:
import logging
import uuid

logger = logging.getLogger(__name__)

def extract_data(**context):
    trace_id = str(uuid.uuid4())
    # Share the ID with downstream tasks through XCom
    context["task_instance"].xcom_push(key="trace_id", value=trace_id)
    logger.info("Starting extraction", extra={"trace_id": trace_id})
Pass the trace ID to Dataflow:
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplateOperator

transform = DataflowTemplateOperator(
    task_id="transform",
    template="gs://my-templates/transform",
    parameters={
        "trace_id": "{{ ti.xcom_pull(task_ids='extract', key='trace_id') }}"
    }
)
In your Dataflow code, include the trace ID in all logs:
class MyDoFn(beam.DoFn):
    # trace_id arrives as an extra argument, e.g. beam.ParDo(MyDoFn(), trace_id)
    def process(self, element, trace_id):
        logger.info("Processing element", extra={"trace_id": trace_id, "element_id": element["id"]})
Now, when something goes wrong, you can query Cloud Logging for a specific trace ID and see the complete path of a record through your entire pipeline:
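jsonPayload.trace_id="abc123def456"
This query returns every log entry that carries that trace ID in its JSON payload, which covers the Composer and Dataflow logs instrumented above. BigQuery audit entries are written by the service itself, so they match only on fields you control, such as job labels. Instead of looking at millions of logs, you're looking at the specific execution path that matters.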
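Passing trace_id by hand to every logging call is easy to forget. One alternative, sketched here with the standard logging module, is a filter that stamps the ID onto every record a logger emits. TraceIdFilter and attach_trace_id are hypothetical names. A logger-level filter applies only to records logged directly on that logger, not to its child loggers.

```python
import logging


class TraceIdFilter(logging.Filter):
    """Attach a fixed trace_id attribute to every record that passes through."""

    def __init__(self, trace_id):
        super().__init__()
        self.trace_id = trace_id

    def filter(self, record):
        record.trace_id = self.trace_id
        return True  # never drop the record, only annotate it


def attach_trace_id(logger, trace_id):
    """Register the filter on `logger` and return the logger for chaining."""
    logger.addFilter(TraceIdFilter(trace_id))
    return logger


if __name__ == "__main__":
    logging.basicConfig(format="%(levelname)s %(trace_id)s %(message)s",
                        level=logging.INFO)
    log = attach_trace_id(logging.getLogger("pipeline"), "abc123def456")
    log.info("Starting extraction")
```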
Beyond individual trace IDs, you can correlate patterns across logs:
When BigQuery queries get slow, do Dataflow jobs also get slow?
-- Assumes query_duration_seconds is a column you derive from the audit export (for example in a view)
WITH bigquery_latency AS (
  SELECT
    TIMESTAMP_TRUNC(timestamp, HOUR) AS hour,
    APPROX_QUANTILES(query_duration_seconds, 100)[OFFSET(95)] AS bq_p95
  FROM `myproject.cloud_logging.cloudaudit_googleapis_com_activity`
  WHERE resource.type = "bigquery_resource"
  GROUP BY hour
),
dataflow_latency AS (
  SELECT
    TIMESTAMP_TRUNC(timestamp, HOUR) AS hour,
    APPROX_QUANTILES(jsonPayload.processing_time_seconds, 100)[OFFSET(95)] AS df_p95
  FROM `myproject.cloud_logging.dataflow_logs`
  GROUP BY hour
)
SELECT
  bq.hour,
  bq.bq_p95,
  df.df_p95,
  -- Rolling correlation over the current hour and the seven before it
  CORR(bq.bq_p95, df.df_p95) OVER (ORDER BY bq.hour ROWS BETWEEN 7 PRECEDING AND CURRENT ROW) AS correlation
FROM bigquery_latency bq
JOIN dataflow_latency df ON bq.hour = df.hour
ORDER BY bq.hour DESC
If BigQuery and Dataflow latency are correlated, the issue is likely shared infrastructure (network, storage). If they're independent, the issue is more likely specific to one service.
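For readers who want to see what the correlation column measures, here is a plain-Python sketch of the Pearson coefficient over two equally long series of hourly p95 values. It illustrates the statistic only; it is not a claim about how BigQuery computes CORR internally, and the sample numbers are made up.

```python
import math


def pearson(xs, ys):
    """Pearson correlation of two equal-length series, or None if undefined."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two series of equal length with at least 2 points")
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return None  # a flat series has no defined correlation
    return cov / math.sqrt(var_x * var_y)


if __name__ == "__main__":
    bq_p95 = [12.0, 15.0, 30.0, 14.0]      # made-up hourly values
    df_p95 = [40.0, 48.0, 95.0, 45.0]
    print(pearson(bq_p95, df_p95))
```

Values near 1 mean the two latencies rise and fall together, and values near 0 mean they move independently. With only eight points per window, treat the result as a hint, not proof.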
Here's a concrete example of implementing observability for a real data pipeline:
The Pipeline: A Composer DAG that runs daily, extracts data from a source system using Dataflow, loads it into BigQuery, and serves it via dashboards.
The Problem: The pipeline takes 2 hours on Monday but 4 hours on Friday. You need to understand why.
The Solution:
First, compare run durations by day of week:
SELECT
  FORMAT_TIMESTAMP("%A", timestamp) AS day_of_week,
  COUNT(DISTINCT DATE(timestamp)) AS num_runs,
  APPROX_QUANTILES(jsonPayload.duration_seconds, 100)[OFFSET(50)] AS p50_duration,
  APPROX_QUANTILES(jsonPayload.duration_seconds, 100)[OFFSET(95)] AS p95_duration,
  MAX(jsonPayload.duration_seconds) AS max_duration
FROM `myproject.cloud_logging.pipeline_logs`
WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 DAY)
GROUP BY day_of_week
ORDER BY day_of_week
Then break the Friday runs down by stage:
SELECT
  jsonPayload.stage,
  COUNT(*) AS num_runs,
  APPROX_QUANTILES(jsonPayload.duration_seconds, 100)[OFFSET(95)] AS p95_duration
FROM `myproject.cloud_logging.pipeline_logs`
WHERE FORMAT_TIMESTAMP("%A", timestamp) = "Friday"
  AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 DAY)
GROUP BY jsonPayload.stage
ORDER BY p95_duration DESC
This reveals which stage is slow on Fridays. Maybe the Dataflow job takes longer because the source system is busier. Maybe the BigQuery load is slower because the dataset is larger. Once you know which stage, you can investigate further.
While Cloud Logging and Cloud Monitoring are powerful, many organizations want to visualize pipeline observability alongside their business metrics. D23 provides embedded analytics and self-serve BI capabilities that can integrate with your observability data.
You can export your pipeline logs to BigQuery, then use D23's API-first approach and self-serve BI features to create dashboards that show both operational metrics (pipeline latency, failure rates) and business metrics (data freshness, query costs) in one place.
This is particularly valuable for data leaders who need to communicate pipeline health to stakeholders. Instead of maintaining separate observability dashboards and business dashboards, you have a unified view of how your data infrastructure supports your analytics.
Implementing observability isn't a one-time project—it's a practice. Here are the core principles:
1. Instrument from the start: Don't add logging as an afterthought. When you build a new pipeline, include structured logging from day one.
2. Use structured logging: Avoid unstructured messages. Use JSON payloads with queryable fields.
3. Include context: Every log should include enough information to understand what was happening (task ID, execution date, user, resource).
4. Log at the right level: DEBUG for development, INFO for normal operation, WARNING for unexpected conditions, ERROR for failures.
5. Set up alerts early: Don't wait for a production incident to realize you need monitoring. Define what "normal" looks like and alert when you deviate from it.
6. Correlate signals: Use trace IDs and timestamps to connect logs across services.
7. Review and iterate: Observability is never complete. As your pipelines grow, your observability needs to grow with them.
Pitfall 1: Logging too much
Cloud Logging charges by ingestion volume. If you log every record processed by Dataflow, your logs will cost more than your compute. Log at the task level ("processed 1M records"), not the record level.
Pitfall 2: Logs with no context
A log that says "Error: Connection refused" is useless without context. Which service? Which resource? When? Include enough information to act on the log.
Pitfall 3: Alerts that don't alert
If you have 50 alerts and only 5 of them require action, you'll ignore all of them. Be selective. Every alert should be actionable.
Pitfall 4: No runbooks
When an alert fires, your team needs to know what to do. Document runbooks for common issues: "If pipeline failure rate exceeds 10%, check if the source system is down."
Pitfall 5: Forgetting about retention
Cloud Logging retains logs in the _Default bucket for 30 days by default. If you need longer retention, configure a longer retention period on the bucket or export to BigQuery. But be aware of storage costs.
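Returning to Pitfall 1, one way to log at the task level instead of the record level is to count records and emit a summary every N of them. The sketch below assumes a standard-library-style logger. BatchSummaryLogger and its field names are hypothetical, and the default interval is arbitrary.

```python
class BatchSummaryLogger:
    """Emit one summary log line per `every` records instead of one per record."""

    def __init__(self, logger, every=100_000):
        if every <= 0:
            raise ValueError("every must be positive")
        self.logger = logger
        self.every = every
        self.seen = 0
        self.failed = 0

    def record(self, ok=True):
        """Count one processed record; log only when a full batch has been seen."""
        self.seen += 1
        if not ok:
            self.failed += 1
        if self.seen % self.every == 0:
            self.flush()

    def flush(self):
        """Write the running totals; call once more at the end of the task."""
        self.logger.info(
            "Processed records",
            extra={"records_seen": self.seen, "records_failed": self.failed},
        )


if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.INFO)
    summary = BatchSummaryLogger(logging.getLogger("pipeline"), every=100)
    for i in range(250):
        summary.record(ok=(i % 100 != 0))
    summary.flush()
```

With every=100 and 250 records, this writes three log lines in total instead of 250.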
Data pipelines are critical infrastructure. When they fail, your analytics fail. When they're slow, your decisions are delayed. When they're inefficient, your costs skyrocket.
Google Cloud Logging provides the foundation for observability across your entire data stack, but only if you implement it thoughtfully. This means instrumenting BigQuery, Dataflow, and Composer with structured logging. It means exporting logs to BigQuery so you can analyze them with SQL. It means building dashboards and alerts that actually tell you what's happening.
The investment pays dividends. When you have visibility into your pipelines, you can optimize them. You can catch problems before they affect users. You can understand the true cost of your analytics infrastructure.
Start small: pick one critical pipeline, add structured logging, export logs to BigQuery, and build a dashboard. Once you see the value, expand to your entire data platform. The payoff shows up as fewer silent failures and a clearer picture of where pipeline time and money go.
Your data pipelines deserve to be as observable as your applications. Cloud Logging is the tool. The question is whether you're ready to use it.