
Chapter 19: Working with Big Data - Dask, Polars, and Distributed Computing
Overview
In the previous chapters, you mastered the “Standard Stack”: pandas, NumPy, and scikit-learn. These tools are incredible, but they share a common limitation: they are eager and effectively single-threaded, and they load your entire dataset into RAM before performing any operation.
As a PHP developer, you’ve likely faced “Out of Memory” errors when trying to json_decode a massive file or process millions of DB rows at once. In the data science world, we call this the “RAM Wall.” If your dataset is 20GB and your machine has 16GB of RAM, pandas will crash.
In this chapter, we scale up. You will learn to use Polars (the modern, lightning-fast, multi-threaded alternative to pandas) and Dask (the framework for parallel and distributed computing). We’ll move from “In-Memory” processing to “Out-of-Core” processing—where data is streamed from disk and processed in parallel across all your CPU cores, or even multiple servers.
By the end of this chapter, you’ll know how to build data pipelines that handle 100GB+ datasets as easily as a small CSV, and you’ll learn how to orchestrate these massive jobs from your PHP application.
Prerequisites
Before starting this chapter, you should have:
- Completed Chapter 18: Data Visualization
- Python 3.11+ (Polars benefits significantly from newer versions)
- `polars`, `dask`, and `distributed` installed
- Estimated Time: ~2 hours
Verify your setup:
```bash
# Install big data libraries
pip install polars dask distributed pyarrow

# Verify installation
python3 -c "import polars; import dask; print('Big Data stack ready!')"
```

What You’ll Build
By the end of this chapter, you will have created:
- Polars Performance Suite: A multi-threaded pipeline that processes 10M+ rows in a fraction of pandas’ time.
- Dask Out-of-Core Analyzer: A script that analyzes a 50GB dataset (simulated) on a standard laptop.
- Lazy Data Pipeline: A “set it and forget it” workflow that only executes when the result is needed.
- PHP Data Orchestrator: A PHP service that dispatches big data jobs to a distributed Python cluster.
Objectives
- Understand the difference between Eager and Lazy evaluation.
- Master the Polars Expressions API for multi-threaded performance.
- Use Dask DataFrames to parallelize pandas operations across all CPU cores.
- Implement Out-of-Core processing for datasets larger than RAM.
- Scale from a local machine to a Distributed Cluster.
- Orchestrate heavy Python processing from PHP 8.4 applications.
Step 1: Blazing Fast Data with Polars (~30 min)
Learn the Polars API and understand why multi-threading and lazy evaluation make it the new industry favorite.
Why It Matters
Pandas is written mostly in Python and C, but most of its operations run on a single thread: if you have an 8-core CPU, pandas typically uses only one core. Polars is written in Rust and is multi-threaded by default. It also uses “Lazy Evaluation”: it looks at your entire query and optimizes it (like a SQL query planner) before running a single calculation.
Actions
1. Create a Polars experimentation script (`examples/polars_lab.py`):
```python
import polars as pl
import time

# 1. Generate 1 million rows of synthetic data
num_rows = 1_000_000
df = pl.DataFrame({
    "id": range(num_rows),
    "category": ["A", "B", "C", "D"] * (num_rows // 4),
    "value": [i * 0.1 for i in range(num_rows)]
})

# 2. Eager execution (like pandas)
start = time.time()
result_eager = (
    df.filter(pl.col("category") == "A")
    .group_by("category")
    .agg(pl.col("value").sum())
)
print(f"Eager execution time: {time.time() - start:.4f}s")

# 3. Lazy execution (the Polars secret sauce)
# .lazy() starts a recording session. No data is processed yet.
start = time.time()
lazy_query = (
    df.lazy()
    .filter(pl.col("category") == "A")
    .with_columns((pl.col("value") * 2).alias("value_doubled"))
    .group_by("category")
    .agg([
        pl.col("value").mean().alias("avg_value"),
        pl.col("value_doubled").sum().alias("total_doubled")
    ])
)

# .collect() triggers the optimized execution
result_lazy = lazy_query.collect()
print(f"Lazy execution time (with optimization): {time.time() - start:.4f}s")
print(result_lazy)
```

2. Run the script:
```bash
python3 examples/polars_lab.py
```

Expected Result

```
Eager execution time: 0.0450s
Lazy execution time (with optimization): 0.0120s
shape: (1, 3)
┌──────────┬───────────┬───────────────┐
│ category ┆ avg_value ┆ total_doubled │
│ ---      ┆ ---       ┆ ---           │
│ str      ┆ f64       ┆ f64           │
╞══════════╪═══════════╪═══════════════╡
│ A        ┆ 49999.8   ┆ 2.4999e10     │
└──────────┴───────────┴───────────────┘
```

Why It Works
- Expressions: `pl.col("value").sum()` isn’t just a function call; it’s an expression that Polars can optimize.
- Lazy Evaluation: When you call `.lazy()`, Polars builds a Query Plan. If you filter for `category == "A"`, Polars will try to apply that filter before loading other columns or doing expensive math, saving memory and CPU cycles (see the plan-inspection sketch below).
- Multi-threading: Polars splits the work across all available CPU cores automatically.
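If you want to see what the optimizer actually does, `LazyFrame.explain()` prints the query plan. Here is a minimal, self-contained sketch (a scaled-down version of the query above, so you can paste it anywhere):

```python
import polars as pl

# A small sketch mirroring the lazy query from the script above.
lf = (
    pl.DataFrame({
        "category": ["A", "B", "C", "D"] * 1000,
        "value": [i * 0.1 for i in range(4000)],
    })
    .lazy()
    .filter(pl.col("category") == "A")
    .group_by("category")
    .agg(pl.col("value").sum())
)

# explain() returns the plan that .collect() will actually run; typically you
# will see the filter pushed down into the scan/selection stage.
print(lf.explain())

# With optimized=False you see the query exactly as written, for comparison.
print(lf.explain(optimized=False))
```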
Troubleshooting
Problem: `ImportError: Missing optional dependency 'pyarrow'`
Cause: Polars builds on the Apache Arrow memory format, and some operations (such as converting to and from pandas) require the `pyarrow` package.
Solution: `pip install pyarrow`.
Step 2: Scaling Beyond RAM with Dask (~30 min)
Use Dask to process datasets that are significantly larger than your computer’s RAM.
Why It Matters
If you have a 100GB CSV and 16GB of RAM, Polars (in eager mode) will still crash. Dask solves this by breaking the 100GB file into small “chunks” (e.g., 100MB each). It only loads a few chunks into RAM at a time, processes them, and moves on. This is called Out-of-Core computing.
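If your data lives in one big file rather than many small ones, `dd.read_csv` can still chunk it for you via the `blocksize` argument. A minimal sketch, assuming a hypothetical `data/huge_events.csv` (swap in your own file):

```python
import dask.dataframe as dd

# Hypothetical single large file -- replace with your own path.
# blocksize controls how big each in-memory pandas partition should be.
ddf = dd.read_csv("data/huge_events.csv", blocksize="100MB")

print(ddf.npartitions)  # how many ~100 MB chunks Dask will stream through
print(ddf.head())       # reads only the first partition, not the whole file
```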
Actions
1. Create a Dask out-of-core script:
```python
import dask.dataframe as dd
import pandas as pd
import numpy as np
import os

# 1. Create a "large" simulated dataset (many small CSVs)
os.makedirs("data/big_dataset", exist_ok=True)
for i in range(5):
    temp_df = pd.DataFrame({
        "timestamp": pd.date_range("2025-01-01", periods=100000, freq="S"),
        "sensor_id": np.random.randint(1, 100, 100000),
        "reading": np.random.randn(100000)
    })
    temp_df.to_csv(f"data/big_dataset/part_{i}.csv", index=False)

# 2. Load with Dask (this is instantaneous because it's lazy)
# Notice the wildcard '*' - Dask loads all files as one logical DataFrame
ddf = dd.read_csv("data/big_dataset/part_*.csv", parse_dates=["timestamp"])

# 3. Define the computation
# Find the average reading per sensor
result = ddf.groupby("sensor_id").reading.mean()

# 4. Trigger the computation
# .compute() is like Polars' .collect() - it starts the parallel work
print("Computing average readings across all files...")
final_stats = result.compute()
print(final_stats.head())
```

Why It Works
- Logical Partitioning: Dask treats the `part_*.csv` files as partitions of a single massive table (the sketch below shows how to inspect them).
- Parallel Task Graph: Dask builds a graph of “tasks” (read file 1, calculate mean, merge with file 2, and so on), then executes those tasks across your CPU cores.
- Pandas API: Dask DataFrames mimic the pandas API almost perfectly, so you don’t have to relearn everything.
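To make the partitioning concrete, here is a short sketch that inspects the DataFrame built by the script above (it assumes the `part_*.csv` files already exist):

```python
import dask.dataframe as dd

# Reuses the files generated by the script above.
ddf = dd.read_csv("data/big_dataset/part_*.csv", parse_dates=["timestamp"])

print(ddf.npartitions)                        # one partition per CSV file
print(type(ddf.get_partition(0).compute()))   # each partition is a plain pandas DataFrame

# Nothing below runs yet -- it only grows the task graph.
result = ddf.groupby("sensor_id").reading.mean()
print(len(dict(result.__dask_graph__())))     # roughly how many tasks .compute() will schedule
```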
Troubleshooting
Problem: `MemoryError` even with Dask
Cause: You might have a single task that is too large (e.g., a massive join or sort that requires all of the data to be in memory at once).
Solution: Increase the number of partitions, for example with `ddf.repartition(npartitions=...)` (see the sketch below).
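A hedged sketch of the repartitioning fix, reusing the files from the script above (the output directory is hypothetical):

```python
import dask.dataframe as dd

ddf = dd.read_csv("data/big_dataset/part_*.csv")

# Ask Dask to aim for ~100 MB pandas chunks before a memory-hungry step.
ddf_small = ddf.repartition(partition_size="100MB")

# Repartitioning involves a shuffle, so do it once and persist the result.
ddf_small.to_parquet("data/big_dataset_parquet/")
```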
Step 3: Distributed Computing & The Dask Cluster (~30 min)
Move from local parallel processing to a distributed cluster.
Why It Matters
When one machine isn’t enough, you need a Cluster. A Dask cluster consists of:
- The Scheduler: Manages the task graph.
- Workers: Perform the actual computations.
- The Client: Your script, which sends tasks to the scheduler (a connection sketch follows below).
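On a real multi-machine cluster you start the scheduler and workers yourself (for example with the `dask scheduler` and `dask worker` CLI commands) and point a `Client` at the scheduler’s address. A sketch with a hypothetical address:

```python
from dask.distributed import Client

# Hypothetical address -- use whatever your scheduler prints on startup.
# Workers join it with: dask worker tcp://192.168.1.50:8786
client = Client("tcp://192.168.1.50:8786")

# scheduler_info() reports the workers currently attached to the scheduler.
info = client.scheduler_info()
print(f"Connected workers: {len(info['workers'])}")
```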
Actions
1. Set up a Local Cluster:
```python
from dask.distributed import Client, LocalCluster
import dask.array as da

if __name__ == "__main__":
    # 1. Start a local cluster
    # This automatically detects your CPU cores and RAM
    cluster = LocalCluster()
    client = Client(cluster)

    # 2. View the Dashboard URL (very important for monitoring!)
    print(f"Dask Dashboard is running at: {client.dashboard_link}")

    # 3. Create a massive random array (distributed across workers)
    # 10,000 x 10,000 array = 100 million elements
    x = da.random.random((10000, 10000), chunks=(1000, 1000))

    # 4. Perform a complex math operation
    y = x + x.T - x.mean(axis=0)

    # 5. Compute the result
    print("Calculating on cluster...")
    result = y.sum().compute()
    print(f"Final Sum: {result}")

    # Keep running to allow viewing the dashboard
    # input("Press Enter to close the cluster...")
```

Why It Works
- Dask Dashboard: This is a beautiful web interface (usually on port 8787) that shows you exactly which worker is busy, how much RAM is being used, and which tasks are running.
- Data Locality: Dask tries to move the computation to where the data is, rather than moving the data to the computation, reducing network overhead (the futures sketch below makes this concrete).
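The data-locality idea is easiest to see with the futures API. A minimal sketch on a `LocalCluster` (nothing here is specific to this chapter’s dataset):

```python
from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    client = Client(LocalCluster())

    # scatter() ships the data to a worker once and returns a lightweight handle.
    data = list(range(1_000_000))
    remote_data = client.scatter(data)

    # submit() prefers the worker that already holds `remote_data`,
    # so the computation moves to the data instead of the data moving again.
    future = client.submit(sum, remote_data)
    print(future.result())

    client.close()
```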
Step 4: PHP Job Orchestration (~30 min)
Trigger and monitor big data Python jobs from your PHP 8.4 application.
Why It Matters
You shouldn’t run a 2-hour Dask job inside a synchronous PHP request. Instead, use PHP to dispatch the job and poll for completion.
Actions
1. Create a Python Worker Script (`services/big_data_worker.py`):
```python
import sys
import json
import polars as pl

def process_logs(input_file, output_file):
    # Use Polars for fast processing
    # .scan_csv() is the lazy version of .read_csv()
    df = pl.scan_csv(input_file)

    summary = (
        df.group_by("level")
        .agg(pl.col("id").count().alias("count"))
        .collect()
    )

    summary.write_csv(output_file)
    return {"status": "success", "output": output_file}

if __name__ == "__main__":
    # Receive arguments from PHP via stdin
    args = json.loads(sys.stdin.read())
    result = process_logs(args['input'], args['output'])
    print(json.dumps(result))
```

2. Create a PHP Orchestrator:
```php
<?php

declare(strict_types=1);

namespace App\DataScience;

class BigDataJob
{
    public function __construct(
        private string $pythonPath = 'python3',
        private string $workerScript = 'services/big_data_worker.py'
    ) {}

    public function run(string $inputFile, string $outputFile): array
    {
        $payload = json_encode([
            'input' => $inputFile,
            'output' => $outputFile
        ]);

        $descriptorspec = [
            0 => ["pipe", "r"], // stdin
            1 => ["pipe", "w"], // stdout
            2 => ["pipe", "w"]  // stderr
        ];

        $process = proc_open("{$this->pythonPath} {$this->workerScript}", $descriptorspec, $pipes);

        if (is_resource($process)) {
            fwrite($pipes[0], $payload);
            fclose($pipes[0]);

            $stdout = stream_get_contents($pipes[1]);
            $stderr = stream_get_contents($pipes[2]);

            fclose($pipes[1]);
            fclose($pipes[2]);
            $returnValue = proc_close($process);

            if ($returnValue !== 0) {
                throw new \RuntimeException("Python error: " . $stderr);
            }

            return json_decode($stdout, true);
        }

        throw new \RuntimeException("Could not start Python process");
    }
}

// Usage
try {
    $job = new BigDataJob();
    echo "Starting Big Data Job via PHP...\n";
    $result = $job->run('data/logs.csv', 'data/summary.csv');
    echo "Job Complete! Summary saved to: " . $result['output'] . "\n";
} catch (\Exception $e) {
    echo "Error: " . $e->getMessage() . "\n";
}
```

Why It Works
- JSON Payload: We use a structured JSON payload to pass complex arguments (file paths, parameters) between PHP and Python.
- Error Handling: We capture `stderr` so that if Python crashes (e.g., a missing file or a Rust panic in Polars), PHP can log the exact error.
- Scalability: In a real production environment, you would use a queue (like Laravel Queues or RabbitMQ) to handle these long-running `proc_open` calls.
Exercises
Exercise 1: The Polars Speed Demon
Goal: Compare pandas and Polars on a medium dataset.
- Create a CSV with 5 million rows of random data.
- Write a script that reads the CSV and calculates the group-by mean of one column.
- Benchmark both pandas and Polars (lazy mode).
- Validation: Polars should be at least 3-5x faster.
Exercise 2: Out-of-Core Filter
Goal: Use Dask to process data that never fully “exists” in memory.
- Write a Dask script that scans a directory of CSVs.
- Apply a filter (e.g., `value > 100`).
- Save the result to a new set of Parquet files using `df.to_parquet('data/output/')`.
- Validation: Check that the output directory contains multiple `.parquet` files.
Exercise 3: PHP Cluster Monitor
Goal: Create a PHP script that checks the status of a Dask cluster.
- Start a Dask `LocalCluster` in a background Python process.
- Write a PHP script that pings the Dask Dashboard API (usually `http://localhost:8787/api/v1/status`) using `curl`.
- Display the number of active workers in your terminal.
Wrap-up
What You’ve Learned
In this chapter, you broke through the “RAM Wall” and learned to handle data at scale:
- The Polars Revolution: Why Rust-backed multi-threading is the future of DataFrames.
- Lazy Evaluation: How to build efficient query plans that only execute when needed.
- Out-of-Core Processing: Using Dask to handle datasets larger than your physical memory.
- Parallelism: Utilizing 100% of your CPU cores instead of just one.
- Distributed Thinking: Moving from a single machine to a cluster of workers.
- PHP Integration: Scaling your web application to handle massive analytical workloads.
What You’ve Built
- Multi-threaded Data Pipelines: Blazing fast analytical code.
- Out-of-Core Analyzers: Tools that handle 100GB+ files on standard laptops.
- Lazy Execution Graphs: Optimized data workflows.
- PHP Orchestration Layer: A robust bridge between your web app and your big data workers.
Key Big Data Principles
1. Don’t Collect until You Must
In both Polars and Dask, keep your data “Lazy” for as long as possible. Only call `.collect()` or `.compute()` at the very end.
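A quick sketch of what that looks like in practice (the column names here are hypothetical; adapt them to your own data):

```python
import polars as pl

pipeline = (
    pl.scan_csv("data/logs.csv")                  # lazy: nothing is read yet
    .filter(pl.col("level") == "ERROR")
    .with_columns((pl.col("duration_ms") / 1000).alias("duration_s"))
    .group_by("level")
    .agg(pl.col("duration_s").mean().alias("avg_duration_s"))
)

result = pipeline.collect()                       # the only eager step
print(result)
```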
2. Parquet over CSV
For big data, CSVs are slow. Parquet is a columnar format that is typically 10-50x faster to read and much smaller on disk. Use `df.to_parquet()` in Dask (or `write_parquet()` in Polars).
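A short Polars sketch of the one-off conversion (the paths are hypothetical):

```python
import polars as pl

# Convert once: read the CSV, write a Parquet copy next to it.
pl.read_csv("data/logs.csv").write_parquet("data/logs.parquet")

# From now on, scan the Parquet file lazily -- only the columns and row
# groups your query touches are actually read from disk.
summary = (
    pl.scan_parquet("data/logs.parquet")
    .group_by("level")
    .agg(pl.len().alias("count"))
    .collect()
)
print(summary)
```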
3. Small Chunks, Big Throughput
Ensure your partitions are small enough to fit in RAM (e.g., 100MB-500MB) but large enough that the overhead of the scheduler doesn’t dominate the computation.
4. Monitor the Dashboard
The Dask Dashboard is your best friend. If your job is slow, the dashboard will show you the bottleneck (network, CPU, or disk).
Connection to Data Science Workflow
You are now a full-stack data engineer:
- ✅ Chapters 1-12: Built data systems in PHP.
- ✅ Chapters 13-18: Mastered Python, Stats, ML, and Viz.
- ✅ Chapter 19: Mastered Big Data & Scale ← You are here
- ➡️ Chapter 20 (final chapter): Production ML Systems (MLOps).
Next Steps
Immediate Practice:
- Try converting one of your pandas scripts from Chapter 14 into a Polars lazy script.
- Install the `dask-labextension` in JupyterLab to see your cluster status directly in your notebook.
- Read the Polars User Guide; it’s incredibly well-written and covers advanced expressions.
Chapter 20 Preview:
In the final chapter, we’ll cover Production ML Systems (MLOps). You’ll learn:
- Model versioning and experiment tracking.
- Deploying models as Dockerized microservices.
- Monitoring for “Model Drift” (when your AI starts getting dumber).
- CI/CD for data science.
You’ll move from “experimenting” to “shipping production AI”!
Further Reading
- Polars Official Documentation — Modern, fast, and Rust-powered.
- Dask Documentation — The industry standard for distributed Python.
- Dask Tutorial (GitHub) — Hands-on examples.
- High Performance Python — Excellent deep dive into scaling Python code.
::: tip Next Chapter
Continue to Chapter 20: Production ML Systems - MLOps for PHP Developers to learn production ML deployment!
:::