Stream Large Data in PowerShell with Begin/Process/End: Fast, Low-Memory Pipelines

When you process large files or long streams of objects, the difference between buffering and streaming is the difference between finishing in seconds and running out of memory. PowerShell’s advanced function lifecycle—Begin, Process, End—lets you initialize once, handle each input as it arrives, and finalize cleanly. The result is predictable output, lower memory, and high throughput that plays well with other tools in the pipeline.

Why Begin/Process/End keeps memory low and throughput high

Advanced functions in PowerShell use three blocks that map perfectly to streaming work:

  • Begin: Runs once before any pipeline input. Initialize resources (regex, HTTP clients, database connections, caches), precompute constants, and set counters.
  • Process: Runs once per input object. Work on a single item, emit results immediately, and avoid holding onto the entire dataset.
  • End: Runs once after all input is processed. Dispose resources, emit summaries, write totals, or finalize reports.

Compared to buffering, this pattern:

  • Reduces peak memory by not loading entire files or object sets at once.
  • Improves latency because downstream commands start receiving results immediately.
  • Stays testable since Begin and End are easy to assert, and Process is naturally unit-scoped.
  • Composes with other tools by emitting objects, not strings, enabling Sort-Object, Group-Object, Export-Csv, and more.
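
A quick, self-contained illustration of the latency point (not tied to any function below): because each object leaves Process as soon as it is produced, a downstream Select-Object -First can stop the producer early instead of waiting for a complete buffer.

# Emits objects one at a time; -First 3 stops the pipeline after three items,
# so the remaining iterations never run.
1..1000000 |
  ForEach-Object { [pscustomobject]@{ N = $_; Square = $_ * $_ } } |
  Select-Object -First 3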

Anatomy of a streaming advanced function

The minimal skeleton

This is the basic template to keep in your toolbox:

function Invoke-StreamingThing {
  [CmdletBinding(SupportsShouldProcess)]
  param(
    [Parameter(Mandatory, ValueFromPipeline, ValueFromPipelineByPropertyName)]
    [string]$Path
  )

  Begin {
    # Initialize resources once (e.g., precompile regex)
    $wordRegex = [regex]::new('\S+', [System.Text.RegularExpressions.RegexOptions]::Compiled)
    $total = 0
  }

  Process {
    try {
      $resolved = Resolve-Path -LiteralPath $Path -ErrorAction Stop
      if ($PSCmdlet.ShouldProcess($resolved.Path, 'Process item')) {
        # Do unit-of-work per input and emit an object
        # ...
      }
    } catch {
      Write-Warning ("Skip {0}: {1}" -f $Path, $_.Exception.Message)
    }
  }

  End {
    # Optionally emit a summary object
    # [pscustomobject]@{ Type = 'Summary'; Total = $total }
  }
}

From buffered to streaming word count

A buffered approach often reads whole files into memory. That’s simple, but for large files it’s expensive. Here’s a non-streaming example that loads an entire file with -Raw:

# Buffered (loads entire file into memory)
function Get-WordCountBuffered {
  [CmdletBinding()]
  param(
    [Parameter(Mandatory, ValueFromPipeline)]
    [string]$Path
  )
  Begin { $total = 0 }
  Process {
    try {
      $text = Get-Content -LiteralPath $Path -Raw -ErrorAction Stop
      $count = ($text -split '\s+') | Where-Object { $_ } | Measure-Object | Select-Object -ExpandProperty Count
      $total += $count
      [pscustomobject]@{ Path = (Resolve-Path -LiteralPath $Path).Path; Words = $count }
    } catch {
      Write-Warning ("Skip {0}: {1}" -f $Path, $_.Exception.Message)
    }
  }
  End { Write-Verbose ("Total words: {0}" -f $total) }
}

Here’s a streaming version that keeps memory flat by reading lines in batches and counting with a precompiled regex. It also emits a summary object in End, which is pipeline-friendly compared to Write-Host.

# Streaming (line batches; steady memory; fast)
function Get-WordCount {
  [CmdletBinding()]
  param(
    [Parameter(Mandatory, ValueFromPipeline, ValueFromPipelineByPropertyName)]
    [string]$Path,

    # Emit a total summary object at the end
    [switch]$IncludeSummary
  )

  Begin {
    $total = 0
    # Count consecutive non-whitespace runs as words
    $wordRegex = [regex]::new('\S+', [System.Text.RegularExpressions.RegexOptions]::Compiled)
  }

  Process {
    try {
      $resolved = Resolve-Path -LiteralPath $Path -ErrorAction Stop
      $fileCount = 0

      # Read in manageable batches; adjust ReadCount to tune throughput
      Get-Content -LiteralPath $resolved.Path -ReadCount 4096 -ErrorAction Stop |
        ForEach-Object {
          foreach ($line in $_) {
            $fileCount += $wordRegex.Matches($line).Count
          }
        }

      $total += $fileCount
      [pscustomobject]@{ Path = $resolved.Path; Words = $fileCount }
    } catch {
      Write-Warning ("Skip {0}: {1}" -f $Path, $_.Exception.Message)
    }
  }

  End {
    if ($IncludeSummary) {
      [pscustomobject]@{ Path = 'TOTAL'; Words = $total }
    }
  }
}

# Usage
'file1.txt','file2.txt' | Get-WordCount -IncludeSummary | Sort-Object Path

Why this is better:

  • Streaming I/O with -ReadCount avoids giant strings in memory.
  • Faster matching: the regex is built once in Begin with the Compiled option instead of being re-created for every file or line.
  • Pipeline-friendly output (objects), including an optional summary object instead of host text.
  • Composable: Pipe into Sort-Object, Export-Csv, or further transforms.

Patterns and best practices for high-throughput pipelines

1) Emit objects, not host text

  • Prefer emitting [pscustomobject] from Process and End.
  • Use Write-Verbose, Write-Information, and Write-Warning for diagnostics.
  • Avoid Write-Host in library functions—it bypasses the pipeline and breaks composition.
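
A minimal contrast (the values are made up): host text can only be read by a human, while an emitted object can be sorted, filtered, or exported, and diagnostics belong on their own streams.

# Host text: visible in the console, invisible to the pipeline
Write-Host 'file1.txt has 42 words'

# Objects plus stream-based diagnostics: the same information, but composable
[pscustomobject]@{ Path = 'file1.txt'; Words = 42 }
Write-Verbose 'Counted file1.txt' -Verbose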

2) Batch reads with -ReadCount

  • Get-Content -ReadCount N yields arrays of N lines, which reduces pipeline overhead.
  • Tune N empirically (e.g., 512–8192) for your data and machine. Larger batches reduce overhead but slightly increase memory.
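
One way to pick N is to time a representative file at a few batch sizes, roughly like this (the path is a placeholder for one of your own large files):

# Compare a few batch sizes; pick the fastest one that keeps memory acceptable
$file = '.\logs\big.log'
foreach ($n in 512, 2048, 4096, 8192) {
  $t = Measure-Command { Get-Content -LiteralPath $file -ReadCount $n | Out-Null }
  [pscustomobject]@{ ReadCount = $n; Seconds = [math]::Round($t.TotalSeconds, 2) }
}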

3) Precompute in Begin

  • Precompile regular expressions: [regex]::new('pattern', [System.Text.RegularExpressions.RegexOptions]::Compiled).
  • Open reusable resources once: HTTP client, database connection, StreamWriter, or caches.
  • Initialize counters and structures (e.g., [System.Collections.Generic.Dictionary[string,int]]).
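
For example, a small tally function (the name and pattern are illustrative) builds its compiled regex and dictionary exactly once in Begin, no matter how many files flow through:

function Get-DateTally {
  [CmdletBinding()]
  param(
    [Parameter(Mandatory, ValueFromPipeline)]
    [string]$Path
  )
  Begin {
    # Built once for the whole pipeline, not once per file
    $dateRegex = [regex]::new('\d{4}-\d{2}-\d{2}', [System.Text.RegularExpressions.RegexOptions]::Compiled)
    $tally     = [System.Collections.Generic.Dictionary[string,int]]::new()
  }
  Process {
    Get-Content -LiteralPath $Path -ReadCount 4096 | ForEach-Object {
      foreach ($line in $_) {
        foreach ($m in $dateRegex.Matches($line)) {
          if ($tally.ContainsKey($m.Value)) { $tally[$m.Value]++ } else { $tally[$m.Value] = 1 }
        }
      }
    }
  }
  End {
    foreach ($kv in $tally.GetEnumerator()) {
      [pscustomobject]@{ Date = $kv.Key; Hits = $kv.Value }
    }
  }
}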

4) Be robust in Process

  • Resolve paths in Process (Resolve-Path -LiteralPath) to support relative input and predictable errors.
  • Wrap per-item work in try/catch and continue on failure to keep the stream alive.
  • Respect -WhatIf and -Confirm with [CmdletBinding(SupportsShouldProcess)] and $PSCmdlet.ShouldProcess() when mutating state.
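
A compact sketch that puts these together (the function name and the 30-day threshold are illustrative): Get-Item validates each path in Process, failures warn and move on, and the destructive step is guarded by ShouldProcess.

function Remove-StaleLog {
  [CmdletBinding(SupportsShouldProcess)]
  param(
    [Parameter(Mandatory, ValueFromPipeline, ValueFromPipelineByPropertyName)]
    [Alias('FullName')]
    [string]$Path,

    [int]$OlderThanDays = 30
  )
  Process {
    try {
      $item = Get-Item -LiteralPath $Path -ErrorAction Stop
      if ($item.LastWriteTime -lt (Get-Date).AddDays(-$OlderThanDays) -and
          $PSCmdlet.ShouldProcess($item.FullName, 'Remove stale log')) {
        Remove-Item -LiteralPath $item.FullName -ErrorAction Stop
        [pscustomobject]@{ Path = $item.FullName; Removed = $true }
      }
    } catch {
      # Warn and continue; one bad item should not stop the stream
      Write-Warning ("Skip {0}: {1}" -f $Path, $_.Exception.Message)
    }
  }
}

# Rehearse with -WhatIf before running for real
Get-ChildItem logs/*.log | Remove-StaleLog -WhatIf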

5) Summarize in End

  • Emit a final object with totals or metrics; make it optional with a switch (-IncludeSummary).
  • Dispose anything you opened in Begin (files, DB connections, timers).
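
For instance, a function that writes matching lines to a single output file can open the writer in Begin, use it per item in Process, then dispose it and emit an opt-in summary in End (a sketch; parameter names are illustrative):

function Export-MatchedLine {
  [CmdletBinding()]
  param(
    [Parameter(Mandatory, ValueFromPipeline)]
    [string]$Path,
    [Parameter(Mandatory)]
    [string]$Pattern,
    [Parameter(Mandatory)]
    [string]$Destination,
    [switch]$IncludeSummary
  )
  Begin {
    $rx = [regex]::new($Pattern, [System.Text.RegularExpressions.RegexOptions]::Compiled)
    # Resolve against the PowerShell location, then open the writer exactly once
    $destFull = $PSCmdlet.GetUnresolvedProviderPathFromPSPath($Destination)
    $writer   = [System.IO.StreamWriter]::new($destFull)
    $hitCount = 0
  }
  Process {
    try {
      Get-Content -LiteralPath $Path -ReadCount 4096 -ErrorAction Stop | ForEach-Object {
        foreach ($line in $_) {
          if ($rx.IsMatch($line)) { $writer.WriteLine($line); $hitCount++ }
        }
      }
    } catch {
      Write-Warning ("Skip {0}: {1}" -f $Path, $_.Exception.Message)
    }
  }
  End {
    $writer.Dispose()   # always release what Begin opened
    if ($IncludeSummary) {
      [pscustomobject]@{ Destination = $destFull; MatchedLines = $hitCount }
    }
  }
}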

6) Keep it testable

  • Test Begin/Process/End behavior with small fixtures and Pester. Feed the function via pipeline in tests.
  • Assert both per-item outputs and summary output if enabled.
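
A minimal Pester sketch (assuming Pester 5+ and that Get-WordCount from above is saved as Get-WordCount.ps1 next to the test file):

# Get-WordCount.Tests.ps1
Describe 'Get-WordCount' {
  BeforeAll {
    . $PSScriptRoot/Get-WordCount.ps1                    # assumed location of the function
    'one two three' | Set-Content -LiteralPath "$TestDrive/a.txt"
    'four five'     | Set-Content -LiteralPath "$TestDrive/b.txt"
  }

  It 'emits one object per file with the expected count' {
    $result = "$TestDrive/a.txt", "$TestDrive/b.txt" | Get-WordCount
    $result.Count | Should -Be 2
    ($result | Where-Object Path -Like '*a.txt').Words | Should -Be 3
  }

  It 'emits a TOTAL row only when -IncludeSummary is set' {
    $result = "$TestDrive/a.txt", "$TestDrive/b.txt" | Get-WordCount -IncludeSummary
    ($result | Where-Object Path -eq 'TOTAL').Words | Should -Be 5
  }
}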

7) Real-world streaming examples

Log scanning with a compiled regex

function Find-LogPattern {
  [CmdletBinding()]
  param(
    [Parameter(Mandatory, ValueFromPipeline, ValueFromPipelineByPropertyName)]
    [string]$Path,
    [Parameter(Mandatory)]
    [string]$Pattern
  )
  Begin {
    $rx = [regex]::new($Pattern, [System.Text.RegularExpressions.RegexOptions]::IgnoreCase -bor [System.Text.RegularExpressions.RegexOptions]::Compiled)
  }
  Process {
    try {
      $resolved = Resolve-Path -LiteralPath $Path -ErrorAction Stop
      Get-Content -LiteralPath $resolved.Path -ReadCount 4096 -ErrorAction Stop | ForEach-Object {
        foreach ($line in $_) {
          if ($rx.IsMatch($line)) {
            [pscustomobject]@{ Path = $resolved.Path; Line = $line }
          }
        }
      }
    } catch {
      Write-Warning ("Skip {0}: {1}" -f $Path, $_.Exception.Message)
    }
  }
}

# Example
Get-ChildItem logs/*.log | Select-Object -ExpandProperty FullName | Find-LogPattern -Pattern 'error|fail|timeout' |
  Export-Csv matches.csv -NoTypeInformation

Scan gzip-compressed logs without extracting

Use GZipStream and StreamReader to stream lines directly from compressed files:

function Find-ErrorInGzip {
  [CmdletBinding()]
  param(
    [Parameter(Mandatory, ValueFromPipeline, ValueFromPipelineByPropertyName)]
    [string]$Path,
    [Parameter(Mandatory)]
    [string]$Pattern
  )

  Begin {
    $rx = [regex]::new($Pattern, [System.Text.RegularExpressions.RegexOptions]::IgnoreCase -bor [System.Text.RegularExpressions.RegexOptions]::Compiled)
  }

  Process {
    try {
      $resolved = Resolve-Path -LiteralPath $Path -ErrorAction Stop
      $fs = [System.IO.File]::OpenRead($resolved.Path)
      $gz = [System.IO.Compression.GZipStream]::new($fs, [System.IO.Compression.CompressionMode]::Decompress)
      $sr = [System.IO.StreamReader]::new($gz)
      try {
        while ($null -ne ($line = $sr.ReadLine())) {
          if ($rx.IsMatch($line)) {
            [pscustomobject]@{ Path = $resolved.Path; Line = $line }
          }
        }
      } finally {
        $sr.Dispose(); $gz.Dispose(); $fs.Dispose()
      }
    } catch {
      Write-Warning ("Skip {0}: {1}" -f $Path, $_.Exception.Message)
    }
  }
}

# Usage
Get-ChildItem logs/*.gz | Select-Object -ExpandProperty FullName | Find-ErrorInGzip -Pattern 'exception|500|timeout'

8) Performance and safety tips

  • Prefer typed parameters when possible for early validation and better tab-completion ([string], [uri], custom classes).
  • Use ValueFromPipelineByPropertyName, plus an [Alias('FullName')] on the Path parameter, so Get-ChildItem output binds directly without a Select-Object -ExpandProperty step.
  • Guard memory by avoiding -Raw on huge files unless you truly need the entire string.
  • Throttle parallelism when using ForEach-Object -Parallel in PowerShell 7; streaming + uncontrolled parallelism can still exhaust memory (see the sketch after this list).
  • Keep outputs predictable. Emit the same object type from Process and End (if any). Make summaries opt-in.
  • Measure with Measure-Command and monitor process memory (e.g., Get-Process pwsh) during test runs.
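
A sketch that combines the last two tips in PowerShell 7+ (the paths and throttle value are illustrative):

# Bounded parallel line counting; -ThrottleLimit caps concurrent runspaces
$files = Get-ChildItem logs/*.log | Select-Object -ExpandProperty FullName
$elapsed = Measure-Command {
  $files | ForEach-Object -Parallel {
    # $_ is one file path; count lines in batches instead of loading whole files
    (Get-Content -LiteralPath $_ -ReadCount 4096 | ForEach-Object { $_.Count } |
      Measure-Object -Sum).Sum
  } -ThrottleLimit 4 | Out-Null
}
Write-Verbose ("Scanned {0} files in {1:n2}s" -f $files.Count, $elapsed.TotalSeconds) -Verbose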

By leaning on Begin/Process/End, you prepare once, process each input as it arrives, and finalize cleanly. Your functions remain fast, testable, and pipeline-friendly—ideal for CI logs, data migrations, and any place you need reliable throughput under tight memory budgets.
