Background

I'm using Flow to process a large, bounded data stream (on the order of 20 million records in a file).

I want to accumulate rows up to a threshold, do a batch insertion, and then reset the accumulator. Repeat until the entire stream has been processed. All the while, I need to accumulate the total number of rows inserted.
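
For comparison, here's roughly what that idea looks like as a single-process sketch without Flow, using Stream.chunk_every/2 (a sketch only; schema, insert_length, and to_row_fn are placeholders for values that show up later in the real code):

  txt_file_path
  |> File.stream!(read_ahead: 100_000)
  |> Stream.map(to_row_fn)
  |> Stream.chunk_every(insert_length)
  |> Enum.reduce(0, fn rows, total ->
    # Insert each chunk as it arrives and keep a running total of inserted rows.
    {num_inserted, nil} = Repo.insert_all(schema, rows)
    total + num_inserted
  end)

The reason to reach for Flow is to spread the row-building work across stages, which is also what makes the bookkeeping trickier: each stage ends up with its own accumulator.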

I've found (through a very helpful error message from Flow) that I need to use Flow.on_trigger/2 both to do my final batch inserts and to calculate the total number of rows inserted.

I've developed a pipeline that works, but it feels clumsy. I want to see if I can improve it.

Flow Pipeline

Here is my initial pipeline, with the accumulator setup:

  defmodule Accum do
    defstruct [
      total_records: 0,
      rows: [],
      row_count: 0,
      insert_length: @insert_length,
      schema: nil
    ]
  end

  def initial_acc_fn(schema), do: fn -> %Accum{schema: schema} end

  def ingest(txt_file_path, schema, to_row_fn) do
    File.stream!(txt_file_path, read_ahead: 100_000)
    |> Flow.from_enumerable(stages: @stage_count)
    |> Flow.map(to_row_fn)
    |> Flow.reduce(initial_acc_fn(schema), &handle_reduce/2)
    |> Flow.on_trigger(&handle_trigger/1)
    |> Enum.to_list()
    |> Enum.sum()
  end

What's going on here: Flow.map/2 creates row params from each line of text in the file, and Flow.reduce/3 accumulates those params and performs the batch insertions.
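
As an aside, to_row_fn is just whatever function maps one line of text onto the params for one row. A hypothetical example for a tab-delimited file (the field names here are made up, not from my actual schema) might look like:

  to_row_fn = fn line ->
    [name, age] = line |> String.trim() |> String.split("\t")
    %{name: name, age: String.to_integer(age)}
  end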

Flow.on_trigger/2 then performs final cleanup on the :done trigger, which fires at the end of the stream (once for each stage), and Enum.sum adds up the totals emitted by the stages to give the total number of inserted rows.
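
One small simplification I could make here: since Flow implements the Enumerable protocol, the Enum.to_list/1 call isn't strictly necessary, and I believe the tail of the pipeline could be collapsed to:

    |> Flow.on_trigger(&handle_trigger/1)
    |> Enum.sum()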

The reduce callback

Here are the reducing function heads. The first clause pattern matches when row_count has reached insert_length and does the insert work; otherwise we just accumulate rows:

  defp handle_reduce(row, %{row_count: insert_length, insert_length: insert_length} = acc) do
    # The batch is full: insert it, add to the running total, and reset the
    # accumulator with the current row as the start of the next batch.
    {num_inserted, nil} = Repo.insert_all(acc.schema, acc.rows)

    total_records = acc.total_records + num_inserted

    acc
    |> Map.put(:rows, [row])
    |> Map.put(:row_count, 1)
    |> Map.put(:total_records, total_records)
  end

  defp handle_reduce(row, %{rows: rows, row_count: row_count} = acc) do
    # Not at the threshold yet: just prepend the row and bump the count.
    acc
    |> Map.put(:rows, [row | rows])
    |> Map.put(:row_count, row_count + 1)
  end

This works, and the code isn't too bad, but it could perhaps be tidied up a bit. The reduce function is doing several different things: accumulation, insertion, summation, and accumulator "resets". One possible extraction is sketched below.
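
One cleanup I'm considering (an untested sketch) is pulling the insert-and-reset step into a flush/1 helper that both handle_reduce/2 and handle_trigger/1 could share:

  # Insert the buffered rows (if any), add to the running total, and reset
  # the buffer. Both the "batch is full" reduce clause and the trigger
  # callback could delegate to this.
  defp flush(%Accum{rows: []} = acc), do: acc

  defp flush(%Accum{} = acc) do
    {num_inserted, nil} = Repo.insert_all(acc.schema, acc.rows)
    %{acc | rows: [], row_count: 0, total_records: acc.total_records + num_inserted}
  end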

The on_trigger callback

We also have an on_trigger callback, which runs on the :done trigger; this trigger fires once for each Flow stage as it reaches the end of the stream.

This function also has to do a batch insertion of any leftover rows. Then it emits the total row count.

  defp handle_trigger(%Accum{rows: rows, total_records: total} = acc) do
    {num_inserted, nil} = Repo.insert_all(acc.schema, rows)
    {[total + num_inserted], nil}
  end
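
One defensive tweak I may add (a sketch, not something the current code strictly needs) is a clause that skips the insert entirely when a stage finishes with an empty buffer; it would have to sit above the existing clause so it matches first:

  # If this stage has nothing buffered, just emit its running total.
  defp handle_trigger(%Accum{rows: [], total_records: total}), do: {[total], nil}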

Overall I think this code is OK. It works, and there are no obvious optimizations or refactors to make, at least not without knowing more about Flow.

I do want to know a little more about Flow windows and triggers, though. I'm relying on the :done trigger here, and I know that windows can be used to generate other kinds of triggers, so some more knowledge will probably be useful.
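
As a teaser of where that might lead (a rough, untested sketch based on my reading of the docs): Flow.Window.trigger_every/2 on a global window should make Flow fire a trigger every N events, which would let on_trigger do the batch inserts and drop most of my hand-rolled accumulator:

  window = Flow.Window.global() |> Flow.Window.trigger_every(@insert_length)

  File.stream!(txt_file_path, read_ahead: 100_000)
  |> Flow.from_enumerable(stages: @stage_count)
  |> Flow.map(to_row_fn)
  |> Flow.partition(window: window, stages: @stage_count)
  |> Flow.reduce(fn -> [] end, &[&1 | &2])
  |> Flow.on_trigger(fn rows ->
    {num_inserted, nil} = Repo.insert_all(schema, rows)
    # Emit this batch's count and reset the accumulator for the next batch.
    {[num_inserted], []}
  end)
  |> Enum.sum()

If that pans out, the Accum struct and most of handle_reduce/2 could go away, since the window trigger would take over the "insert every N rows" bookkeeping.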

In my next post, we're going to learn a little bit about windows and triggers, and we'll illustrate how to use them to perform batch inserts on a bounded stream.