Tuesday 4 August 2020

SQL Server 2019 Aggregate Splitting

The SQL Server 2019 query optimizer has a new trick available to improve the performance of large aggregations. The new exploration abilities are encoded in two new closely-related optimizer rules:

  • GbAggSplitToRanges
  • SelOnGbAggSplitToRanges

The extended event query_optimizer_batch_mode_agg_split is provided to track when this new optimization is considered. The description of this event is:

Occurs when the query optimizer detects batch mode aggregation is likely to spill and tries to split it into multiple smaller aggregations.

Other than that, this new feature hasn’t been documented yet. This article is intended to help fill that gap.
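To check whether a particular build exposes this event, the extended events metadata views can be queried. This is a minimal sketch that assumes only the documented views sys.dm_xe_objects and sys.dm_xe_packages, and that you have permission to view server state:

```sql
-- Look up the event and the package that owns it
SELECT
    PackageName = P.[name],
    EventName = O.[name],
    O.[description]
FROM sys.dm_xe_objects AS O
JOIN sys.dm_xe_packages AS P
    ON P.[guid] = O.package_guid
WHERE
    O.object_type = N'event'
    AND O.[name] = N'query_optimizer_batch_mode_agg_split';
```

An empty result would suggest the event (and probably the feature) is not present on that instance.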

Overview

Given a single large aggregate like:

SELECT column_name, COUNT_BIG(*)
FROM column_store_table
GROUP BY column_name;

If the optimizer detects that the aggregate is likely to spill:

Spilling large single aggregate

…it splits the input into multiple disjoint ranges.

The optimizer doesn’t write T-SQL, but if it did, the alternative generated might look as if the query had been written like this:

-- Range 1
SELECT column_name, COUNT_BIG(*)
FROM column_store_table
WHERE column_name < @boundary1
GROUP BY column_name

UNION ALL

-- Range 2
SELECT column_name, COUNT_BIG(*)
FROM column_store_table
WHERE column_name >= @boundary1
AND column_name < @boundary2
GROUP BY column_name

UNION ALL

-- Range 3
SELECT column_name, COUNT_BIG(*)
FROM column_store_table
WHERE column_name >= @boundary2
AND column_name < @boundary3
GROUP BY column_name

UNION ALL

-- Range 4
SELECT column_name, COUNT_BIG(*)
FROM column_store_table
WHERE column_name >= @boundary3
GROUP BY column_name;

An execution plan using split aggregates may look like this:

With aggregate splitting

Splitting the aggregate using disjoint ranges guarantees the same results as the original, but requires less memory.

Say the single aggregate needs 64GB of memory to avoid spilling. The four split aggregates might only need 16GB each.

The Concatenation operator reads all rows from its first input before moving on to the next one. This means the memory grant used by one Hash Match Aggregate can be reused by the next one.

The split plan would therefore use only 16GB memory in total. The 16GB memory grant is used four times, once for each aggregate.

The trade-off

The lower memory requirement means split aggregates can complete without spilling (or at least with reduced spilling). This should improve performance, perhaps significantly if the original spill was large.

On the other side of the ledger, the split aggregate execution plan needs to read from the source table multiple times and apply the boundary predicate(s) each time.

The optimizer estimates the cost of each alternative (single versus split) and chooses the one that appears cheapest. Batch mode spills are pretty expensive, and column store scans are relatively cheap, so aggregate splitting is often worth considering.

Choices and implementation

The optimizer needs to consider a few things:

  • Would a single aggregate spill?
  • Can the aggregate be split?
  • How many splits to use?

The optimizer knows how much memory grant is potentially available to the query, and can estimate the size of the data to be aggregated. If it looks like the aggregate would need more memory than the current hardware and instance configuration allows, the optimizer concludes a spill would likely be needed.

In the current implementation, only simple aggregates on a clustered column store source can be considered for splitting. There should only be one grouping column of a numeric type (e.g. integer, decimal, real), with no expressions or conversions.

The number of splits used depends on how far the estimated memory requirement of the single aggregate exceeds the maximum memory grant available. The minimum number of splits is two, and the current maximum is eight.

The optimizer uses the statistics histogram on the grouping column to split the range of input values into equal-sized chunks. If the grouping column allows NULL, this is included in the first split.

Trace flags

There are two main trace flags affecting this feature, both undocumented:

  • Trace flag 9424 sets the number of splits to four.
  • Trace flag 9426 disables the aggregate splitting feature.
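As a sketch of how one of these flags might be applied to a single statement, the QUERYTRACEON hint can be used with the placeholder names from the Overview. This assumes the flag is honoured at statement level, as trace flag 9424 is in the demo. QUERYTRACEON normally requires sysadmin permission, and because both flags are undocumented, this is only suitable for test systems:

```sql
-- Compile this statement without considering aggregate splitting
-- (column_store_table and column_name are placeholders)
SELECT column_name, COUNT_BIG(*)
FROM column_store_table
GROUP BY column_name
OPTION (QUERYTRACEON 9426);
```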

Demo

The following code uses the Stack Overflow 2013 database, set to compatibility level 150, on SQL Server 2019 CU5 with max server memory set to 16GB.

The sample database doesn’t come with any column store objects, so our first step is to create one. To keep things simple, our test table will have a single integer column and full scan statistics:

SELECT B.UserId 
INTO dbo.Badges2 
FROM dbo.Badges AS B;

CREATE CLUSTERED COLUMNSTORE INDEX CCSI 
ON dbo.Badges2 
WITH (MAXDOP = 1);

CREATE STATISTICS UserId 
ON dbo.Badges2 (UserId) 
WITH FULLSCAN;

The Badges table contains 8,042,005 rows with 1,318,413 distinct values, and so does our copy.
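These figures can be checked against your own copy with a simple query. The histogram script later in this article refers to the statistics object by ID, so it is also worth confirming which ID the UserId statistics received on your system (the second query below is a sketch for that purpose):

```sql
-- Row count and number of distinct grouping values
-- (COUNT DISTINCT ignores NULLs, if any are present)
SELECT
    NumRows = COUNT_BIG(*),
    DistinctUserIds = COUNT_BIG(DISTINCT B.UserId)
FROM dbo.Badges2 AS B;

-- Statistics objects on the test table, with their IDs
SELECT S.stats_id, S.[name]
FROM sys.stats AS S
WHERE S.[object_id] = OBJECT_ID(N'dbo.Badges2');
```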

The basic test query will be:

SELECT B.UserId, NumRows = COUNT_BIG(*)
FROM dbo.Badges2 AS B
GROUP BY B.UserId;

Tweaks and hints

We’ll need to add a few hints and other features to the basic query for a variety of reasons:

  • To keep memory calculations simple, we will disable parallelism.
  • The test table isn’t big enough to risk an aggregate spill, so we will artificially limit the maximum memory grant available using a hint.
  • The query produces a large output so we will suppress that by assigning to variables.
  • Serial batch mode hash aggregates don’t always report detailed spilling information. We will work around that annoying bug by introducing an additional row of data.

If you run the demo with a different table or amount of memory, you will need to adjust the maximum memory grant hint accordingly.
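One way to guide that adjustment is to compare the memory the query wanted with what it was actually granted. The following is a rough sketch using sys.dm_exec_query_stats. It assumes the grant and spill columns are available on your build (they should be on SQL Server 2019), and the text filter on the table name is just a convenient way to find the test statements:

```sql
-- Most recent cached statements that touched the test table
SELECT TOP (5)
    QS.last_execution_time,
    QS.last_ideal_grant_kb,  -- memory the query would have liked
    QS.last_grant_kb,        -- memory actually granted
    QS.last_used_grant_kb,   -- memory actually used
    QS.last_spills,          -- pages spilled on the last execution
    ST.[text]
FROM sys.dm_exec_query_stats AS QS
CROSS APPLY sys.dm_exec_sql_text(QS.[sql_handle]) AS ST
WHERE
    ST.[text] LIKE N'%Badges2%'
    -- Exclude this monitoring query itself
    AND ST.[text] NOT LIKE N'%dm_exec_query_stats%'
ORDER BY
    QS.last_execution_time DESC;
```

If the granted memory is not clearly smaller than the ideal figure, the hint probably needs lowering before a spill will occur.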

Test 1 - No splitting

In addition to the listed tweaks, this first test sets the optimizer compatibility level to 140 to disable aggregate splitting:

DECLARE 
    @UserId integer, 
    @NumRows bigint;

SELECT 
    -- Assign to variables to suppress output
    @UserId = B.UserId, 
    @NumRows = B.NumRows
FROM
(
    -- Test query
    SELECT B.UserId, NumRows = COUNT_BIG(*) 
    FROM dbo.Badges2 AS B
    GROUP BY B.UserId

    -- Workaround for incomplete spill details bug
    UNION ALL
    SELECT NULL, NULL
) AS B
OPTION 
(
    -- No parallelism
    MAXDOP 1, 
    -- Limit memory
    MAX_GRANT_PERCENT = 1.1, 
    -- No aggregate splitting
    USE HINT ('QUERY_OPTIMIZER_COMPATIBILITY_LEVEL_140')
);

The execution plan shows the single aggregate spilling (as expected):

Single aggregate spills

The spill details show 5,857 pages written to tempdb:

Single aggregate spill details

The query executes in 2363ms using 2151ms of CPU.

Test 2 - Default aggregate splitting

Removing the optimizer compatibility level 140 hint allows aggregate splitting (assuming the database is at compatibility level 150):

DECLARE 
    @UserId integer, 
    @NumRows bigint;

SELECT 
    -- Assign to variables to suppress output
    @UserId = B.UserId, 
    @NumRows = B.NumRows
FROM
(
    -- Test query
    SELECT B.UserId, NumRows = COUNT_BIG(*) 
    FROM dbo.Badges2 AS B
    GROUP BY B.UserId

    -- Workaround for spill details
    UNION ALL

    SELECT NULL, NULL
) AS B
OPTION 
(
    -- No parallelism
    MAXDOP 1, 
    -- Limit memory
    MAX_GRANT_PERCENT = 1.1
);

The optimizer decides to split the aggregate into two parts:

Aggregate split in two

The predicate on the top Columnstore Index Scan is:

[B].[UserId]<(645561)

The lower Columnstore Index Scan has:

[B].[UserId]>=(645561)

The optimizer split the data set into two equal parts based on the statistics histogram (more about this later).

The first of the split aggregates does not spill, but the second one does:

Split aggregate spill

The 2,969 pages used in tempdb are fewer than the 5,857 used by the single aggregate spill. The query executes in 1799ms using 1574ms of CPU on my laptop.

Test 3 - Four splits forced

The optimizer chose to split the aggregate into two parts, based on data size and memory grant estimations.

We can override that choice logic using trace flag 9424 to force four splits:

DECLARE 
    @UserId integer, 
    @NumRows bigint;

SELECT 
    -- Assign to variables to suppress output
    @UserId = B.UserId, 
    @NumRows = B.NumRows
FROM
(
    -- Test query
    SELECT B.UserId, NumRows = COUNT_BIG(*) 
    FROM dbo.Badges2 AS B
    GROUP BY B.UserId

    -- Workaround for spill details
    UNION ALL

    SELECT NULL, NULL
) AS B
OPTION 
(
    -- No parallelism
    MAXDOP 1, 
    -- Limit memory
    MAX_GRANT_PERCENT = 1.1,
    -- Force four splits
    QUERYTRACEON 9424
);

The execution plan now has four split aggregates:

Four split aggregates

The split ranges chosen by the optimizer (top to bottom) are:

[B].[UserId]<(214851)
[B].[UserId]>=(214851) AND [B].[UserId]<(645561)
[B].[UserId]>=(645561) AND [B].[UserId]<(1362736)
[B].[UserId]>=(1362736)

Once again, one of the split aggregates spills:

Four split aggregate spill

Only 825 pages were used in tempdb this time. The query executes in 1388ms using 1179ms of CPU.

Results for tests 1 - 3

Summarizing the results so far:

Test  Splits  Spill Pages  Memory KB  Elapsed ms  CPU ms
1     n/a     5,857        33,224     2363        2151
2     2       2,969        33,224     1799        1574
3     4       825          33,224     1388        1179

Each test received the same total memory grant. Of the 33,224KB granted to the query as a whole, 31,712KB is available to each batch mode hash aggregate. (The rest of the query grant is largely accounted for by batch packets and the column store scan.)

A Flaw in the Optimizer’s Reasoning?

Sharp-eyed readers may have noticed that the optimizer chose the split boundaries such that each scan of the base table was estimated to produce about the same number of rows.

As a reminder, from the four-split example:

Split aggregate estimates

Each split is estimated to produce about 2,010,500 rows.

The actual row counts match the estimates quite closely (and none exceeds its estimate), so why did only one of the aggregates spill?

Boundary point estimation

The optimizer splits the range into equal-sized sections by:

  • Dividing the table cardinality by the number of splits to get the target number of rows per partition.
  • Walking the histogram, adding equal rows and range rows until it gets close to the target number of rows.
  • Using linear interpolation if necessary in the last step to estimate the column key value needed to produce the required row count estimate.
  • Continuing to walk the histogram until all boundary values are found.
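The steps above can be roughly approximated in T-SQL. The sketch below is not the optimizer's code. It works at whole histogram steps and omits the linear interpolation, so the values it returns should only be close to the boundaries seen in the plans. It assumes the UserId statistics have ID 2 (as in the script later in this article) and that the grouping column contains no NULLs:

```sql
-- Number of splits to emulate (four, to match the forced split test)
DECLARE @Splits integer = 4;

WITH Steps AS
(
    SELECT
        RangeHiKey = CONVERT(integer, H.range_high_key),
        -- Rows accumulated up to and including this histogram step
        RunningRows = SUM(H.range_rows + H.equal_rows) OVER (
            ORDER BY H.step_number
            ROWS UNBOUNDED PRECEDING),
        -- Table cardinality according to the histogram
        TotalRows = SUM(H.range_rows + H.equal_rows) OVER ()
    FROM sys.dm_db_stats_histogram(OBJECT_ID(N'dbo.Badges2'), 2) AS H
)
SELECT
    SplitNumber = S.n,
    -- First step whose running total reaches this split's target
    ApproxBoundary = MIN(T.RangeHiKey)
FROM (VALUES (1), (2), (3), (4), (5), (6), (7)) AS S (n)
JOIN Steps AS T
    ON T.RunningRows >= S.n * T.TotalRows / @Splits
WHERE
    S.n < @Splits
GROUP BY
    S.n
ORDER BY
    S.n;
```

The list of seven candidate split numbers simply covers the current maximum of eight splits.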

The optimizer’s boundary-setting approach might seem sensible, but it doesn’t match the way hash aggregates work (it would be valid for a hash join).

A hash aggregate maintains an aggregate in its hash table for each distinct value encountered. Subsequent rows with the same grouping key add to the existing accumulated value rather than making a new entry in the hash table.

So, the number of entries in an aggregate’s hash table isn’t proportional to the number of input rows — it is proportional to the number of distinct values encountered.

It appears that the optimizer ought to split the column store scan based on the expected number of distinct values, not row count, assuming the goal is to split the size of the work evenly among the split aggregates.
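One way to test this reasoning is to count both rows and distinct values in each of the ranges the optimizer chose for the four-way split. The query below is a sketch that hard-codes the boundary values reported in Test 3, so those values would need changing for different data or statistics:

```sql
-- Rows and distinct values in each optimizer-chosen range (Test 3)
SELECT
    R.RangeId,
    NumRows = COUNT_BIG(*),
    DistinctUserIds = COUNT_BIG(DISTINCT B.UserId)
FROM dbo.Badges2 AS B
CROSS APPLY
(
    -- Assign each row to one of the four disjoint ranges
    VALUES
    (
        CASE
            WHEN B.UserId < 214851 THEN 1
            WHEN B.UserId < 645561 THEN 2
            WHEN B.UserId < 1362736 THEN 3
            ELSE 4
        END
    )
) AS R (RangeId)
GROUP BY
    R.RangeId
ORDER BY
    R.RangeId;
```

If the explanation is right, the row counts should be similar across ranges while the distinct value counts differ noticeably, with the spilling aggregate corresponding to the range with the most distinct values.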

Improving on the optimizer’s logic

Instead of using equal rows and range rows from the histogram, the optimizer could calculate a target number of distinct values per split partition, then walk the histogram accumulating distinct values.

The following code implements that logic for our test table (it doesn’t account for sampled stats to keep the code short, so full scan stats were deliberately created earlier):

SET NOCOUNT ON;
SET STATISTICS XML OFF;

DECLARE 
    @DistinctValues float =
    (
        SELECT 
            -- One distinct value per RANGE_HI_KEY
            COUNT_BIG(*) + 
            -- Distinct values between steps
            SUM(H.distinct_range_rows)
        FROM sys.dm_db_stats_histogram
            (OBJECT_ID(N'dbo.Badges2'), 2) AS H
    ),
    -- Target number of splits
    @SplitsTarget integer = 4,
    @SplitsDone integer = 1;

DECLARE
    @TargetValues float = @DistinctValues / @SplitsTarget,
    @CurrentValues float = 0;

-- Collects results as we find them
DECLARE @Boundaries table
(
    BoundaryValue float PRIMARY KEY
);

DECLARE 
    @Histogram CURSOR,
    @Key sql_variant,
    @iKey integer,
    @PreviousKey integer,
    @DistinctRangeRows float,
    @EqualRows float,
    @Fraction float;
    
SET @Histogram =
    CURSOR STATIC FORWARD_ONLY READ_ONLY
FOR
    SELECT H.range_high_key, H.distinct_range_rows, 1 AS equal_row
    FROM sys.dm_db_stats_histogram(OBJECT_ID(N'dbo.Badges2'), 2) AS H
    ORDER BY H.range_high_key;

OPEN @Histogram;

FETCH @Histogram INTO @Key, @DistinctRangeRows, @EqualRows;

WHILE @@FETCH_STATUS = 0 AND @SplitsDone < @SplitsTarget
BEGIN
    SET @iKey = CONVERT(integer, @Key);

    -- Allocate from distinct range rows
    WHILE @DistinctRangeRows > 0
    BEGIN
        -- Used to interpolate if necessary
        SET @Fraction = (@TargetValues - @CurrentValues) / @DistinctRangeRows;
        -- Cap
        IF @Fraction > 1 SET @Fraction = 1;

        -- Allocate
        SET @CurrentValues += @Fraction * @DistinctRangeRows;
        SET @DistinctRangeRows -= @Fraction * @DistinctRangeRows;

        -- Current range complete?
        IF @CurrentValues >= @TargetValues
        BEGIN
            -- Compute key value
            INSERT @Boundaries (BoundaryValue) 
            VALUES (@PreviousKey + (@Fraction * (@iKey - @PreviousKey)));

            -- Adjust current
            SET @CurrentValues -= @TargetValues;

            -- Done a range
            SET @SplitsDone += 1;
        END;
    END;

     -- Allocate equal row
    SET @CurrentValues += 1;

    -- Range complete?
    IF @CurrentValues >= @TargetValues
    BEGIN
        INSERT @Boundaries (BoundaryValue) 
        VALUES (@iKey);

        -- Adjust current
        SET @CurrentValues -= @TargetValues;

        -- Done a range
        SET @SplitsDone += 1;
    END;
    
    SET @PreviousKey = @iKey;

    FETCH @Histogram INTO @Key, @DistinctRangeRows, @EqualRows;
END;

SELECT
    BoundaryValue = CEILING(B.BoundaryValue)
FROM @Boundaries AS B;

The output of that script for the test table is:

Boundary Value
693598
1354956
2101933

We can use these values to split the aggregates manually.

Test 4 - Manual splitting using histogram distinct values

The test query with manually-split aggregates is:

DECLARE 
    @UserId integer, 
    @NumRows bigint;

SELECT 
    -- Assign to variables to suppress output
    @UserId = B.UserId, 
    @NumRows = B.NumRows
FROM
(
    -- Test query

    -- Manual split #1
    SELECT B.UserId, COUNT_BIG(*) 
    FROM dbo.Badges2 AS B 
    WHERE B.UserId < 693598 
    GROUP BY B.UserId
    
    UNION ALL
    
    -- Manual split #2
    SELECT B.UserId, COUNT_BIG(*) 
    FROM dbo.Badges2 AS B 
    WHERE B.UserId >= 693598 
    AND B.UserId < 1354956 
    GROUP BY B.UserId

    UNION ALL

    -- Manual split #3
    SELECT B.UserId, COUNT_BIG(*) 
    FROM dbo.Badges2 AS B 
    WHERE B.UserId >= 1354956 
    AND B.UserId < 2101933 
    GROUP BY B.UserId
    
    UNION ALL
    
    -- Manual split #4
    SELECT B.UserId, COUNT_BIG(*) 
    FROM dbo.Badges2 AS B 
    WHERE B.UserId >= 2101933 
    GROUP BY B.UserId

    -- Workaround for spill details
    UNION ALL

    SELECT NULL, NULL

) AS B (UserId, NumRows)
OPTION 
(
    -- No parallelism
    MAXDOP 1, 
    -- Limit memory
    MAX_GRANT_PERCENT = 1.1,
    -- Disable aggregate splitting
    USE HINT ('QUERY_OPTIMIZER_COMPATIBILITY_LEVEL_140')
);

The execution plan is:

Manually-split aggregates

Notice the row count estimates on the Columnstore Index Scan operators now differ substantially from each other, because we are targeting an equal number of distinct values, not an equal number of rows.

The estimates on the Hash Match Aggregates output are all 329,603 or 329,604. This shows the histogram script was successful in achieving an equal split of distinct values.

None of the aggregates spill, and the query completes in 928ms using 927ms of CPU.

Test performance summary

Updating the results table to include the manual split results (test 4):

Test  Splits  Spill Pages  Memory KB  Elapsed ms  CPU ms
1     n/a     5,857        33,224     2363        2151
2     2       2,969        33,224     1799        1574
3     4       825          33,224     1388        1179
4     4       0            33,224     928         927

Clearly the manual split gave a better result here, on all metrics.

Note that manual splitting isn’t magic. It cannot prevent spills if there simply isn’t enough memory, but it can help ensure the work is evenly split among the aggregates.

Not Just Batch Mode Hash Aggregates

Although this feature is aimed at batch mode hash aggregates, the logical operator tree generated by the new optimizer rule is still explored and tested with various implementation options as is usual for a cost-based optimizer.

If lower-cost alternatives exist, the logical split aggregates may still be present, but not implemented using physical hash aggregates. The optimizer may also choose not to use batch mode execution.

To demonstrate, let’s add a nonclustered b-tree index to our test column store table:

CREATE NONCLUSTERED INDEX i 
ON dbo.Badges2 (UserId);

…and add an ORDER BY clause to our test query:

DECLARE 
    @UserId integer, 
    @NumRows bigint;

SELECT 
    -- Assign to variables to suppress output
    @UserId = B.UserId, 
    @NumRows = B.NumRows
FROM
(
    -- Test query
    SELECT B.UserId, NumRows = COUNT_BIG(*) 
    FROM dbo.Badges2 AS B
    GROUP BY B.UserId

    -- Workaround for spill details
    UNION ALL

    SELECT NULL, NULL
) AS B
-- This is new
ORDER BY 
    B.UserId ASC
OPTION 
(
    -- No parallelism
    MAXDOP 1, 
    -- Limit memory
    MAX_GRANT_PERCENT = 1.1
);

The execution plan is:

Row mode stream aggregate splitting

The aggregates have been split, but there are no hash aggregates in this plan, and no batch-mode operators at all.

Running the query with trace flag 9424 gives four splits:

Row mode stream aggregate 4-way split

Final Thoughts

This is a potentially very useful addition to the query optimizer in SQL Server 2019 when dealing with aggregates larger than the available memory grant.

If you see multiple similar aggregates in a plan, with their results gathered together using one or more concatenation operators, and disjoint range predicates below, you can be pretty sure the plan was generated using this mechanism.

If you want to be sure, use the extended event query_optimizer_batch_mode_agg_split as noted earlier. Example output is shown below:

Extended event output
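To capture the event yourself, a session along the following lines might be used. This is a sketch only: the session name AggSplit is made up for illustration, and the event is assumed to live in the sqlserver package (the views sys.dm_xe_objects and sys.dm_xe_packages can confirm that on your build):

```sql
-- Hypothetical session name; the sqlserver package is an assumption
CREATE EVENT SESSION AggSplit ON SERVER
ADD EVENT sqlserver.query_optimizer_batch_mode_agg_split
ADD TARGET package0.ring_buffer;

ALTER EVENT SESSION AggSplit ON SERVER STATE = START;

-- Run the test queries, then read the collected events as XML
SELECT EventData = CONVERT(xml, T.target_data)
FROM sys.dm_xe_sessions AS S
JOIN sys.dm_xe_session_targets AS T
    ON T.event_session_address = S.[address]
WHERE
    S.[name] = N'AggSplit'
    AND T.target_name = N'ring_buffer';

-- Clean up when finished
ALTER EVENT SESSION AggSplit ON SERVER STATE = STOP;
DROP EVENT SESSION AggSplit ON SERVER;
```

The event fires at compilation time, so a cached plan may need to be recompiled before anything is recorded.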

It is a shame the optimizer estimates are based on splitting the data in equal-sized row count ranges rather than using distinct values.

I suppose it is just about possible this decision was taken deliberately after testing on a range of workloads, but really I doubt it. It does seem much more like an implementation oversight. Perhaps it will be corrected in future builds.

The range of aggregates that qualify for these exploration rules might also be expanded in future.

This new feature is in addition to other documented and undocumented hash aggregate performance optimizations, including grouped aggregate pushdown.

Thanks to Forrest McDaniel who prompted me to write about this.

2 comments:

  1. Hi Paul
    Thank you for this post, really useful info.
    Just one thought - I think there is a small typo in the first code snippet in the "Test 1 - No splitting" section. It says the ('QUERY_OPTIMIZER_COMPATIBILITY_LEVEL_140') hint controls "No aggregate spilling", but I believe this was meant to be "No aggregate splitting".

    Also, one question - how did you come up with the MAX_GRANT_PERCENT of 1.1% ? Just fine-tuning the threshold based on the minimum memory this query requires ?

    Regards
    Karch (Eugene Karpovich)

    Replies
    1. Code comment typo! Thank you, I have fixed that.

      Yes the MAX_GRANT_PERCENT setting was adjusted by hand. Not the *minimum* memory though, the *desired* memory. In any case, it is to simulate a hashing operation too large to perform entirely in memory, given the current hardware and configuration.
