Sorting almost sorted array with a minimum heap

Submitted by: @import:stackexchange-codereview

Problem

I'm currently looking into the quickest way to sort an almost sorted array:


Given an array of \$n\$ elements, where each element is at most \$k\$ away
from its target position, devise an algorithm that sorts in \$O(n \log k)\$
time.

I've implemented a sorting function that "slides" through the input list, pushes the elements of each window slice onto a min heap (using the built-in heapq module), and pops the minimum element into the result list:

from typing import List
from heapq import heappush, heappop

def sort_almost_sorted(a: List[int], k: int) -> List[int]:
    if not a:
        return []

    length = len(a)
    if k >= length:
        return sorted(a)  # apply built-in "timsort", designed to be quick on almost sorted lists

    result = []
    min_heap = []  # maintain a min heap for a sliding window
    for slice_start in range(0, length, k + 1):  # sliding window
        # push the next slice to min heap
        for index in range(slice_start, min(slice_start + k + 1, length)):
            heappush(min_heap, a[index])

        result.append(heappop(min_heap))

    # put the rest of the heap into the result
    for _ in range(len(min_heap)):
        result.append(heappop(min_heap))

    return result


It works on my sample inputs.
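
For testing, inputs that satisfy the "at most \$k\$ away" condition can be generated rather than written by hand. The following is a minimal sketch, not part of the original post: the helper names `make_almost_sorted` and `max_displacement` are hypothetical, and the approach assumes distinct values so that each element has a single well-defined target position.

```python
import random
from typing import List


def make_almost_sorted(n: int, k: int, seed: int = 0) -> List[int]:
    """Return a permutation of range(n) in which no element is more than
    k positions away from its sorted position (hypothetical helper)."""
    rng = random.Random(seed)
    values = list(range(n))
    # Shuffle inside disjoint blocks of k + 1 elements. A value never
    # leaves its block, so it moves by at most k positions.
    for start in range(0, n, k + 1):
        block = values[start:start + k + 1]
        rng.shuffle(block)
        values[start:start + k + 1] = block
    return values


def max_displacement(a: List[int]) -> int:
    """Largest distance between an element's index and its sorted index.
    Assumes distinct values, so the sorted index is unambiguous."""
    target = {value: i for i, value in enumerate(sorted(a))}
    return max((abs(i - target[value]) for i, value in enumerate(a)), default=0)
```

Block-wise shuffling only covers a subset of all valid inputs (elements never cross block boundaries), so it is a convenient smoke test rather than an exhaustive one.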

Do you think I'm using the min heap appropriately, and is this implementation \$O(n \log k)\$? What would you improve in terms of code quality or code organization?

Solution

Algorithm complexity

I am not 100% sure, but this looks like a fancy heapsort that runs in \$O(n \log n)\$ after all. To see why, consider

for slice_start in range(0, length, k + 1):  # sliding window
    # push the next slice to min heap
    for index in range(slice_start, min(slice_start + k + 1, length)):
        heappush(min_heap, a[index])

    result.append(heappop(min_heap))


You process the array in chunks of k + 1 elements (except for the very last chunk, which may be shorter). For each chunk, you push all of it onto the heap and then pop only one element into the result list. Finally, in

for _ in range(len(min_heap)):
    result.append(heappop(min_heap))


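you handle the majority of the elements via plain heapsort: the main loop pops only about \$n / (k + 1)\$ elements, so for \$k \ge 1\$ the heap grows to \$\Theta(n)\$ entries and each remaining pop costs \$O(\log n)\$. That gives linearithmic worst-case running time.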
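
The heap growth claimed above can be checked empirically. The sketch below reuses the loop structure of the question's code and only adds a counter for the largest heap size observed; the function name `chunked_sort_with_peak` is a hypothetical label for this illustration, not something from the original post.

```python
from heapq import heappush, heappop
from typing import List, Tuple


def chunked_sort_with_peak(a: List[int], k: int) -> Tuple[List[int], int]:
    """Same chunked loop as in the question, instrumented to report the
    largest number of elements the heap ever held."""
    result: List[int] = []
    min_heap: List[int] = []
    peak = 0
    for slice_start in range(0, len(a), k + 1):
        # Push a whole chunk of up to k + 1 elements ...
        for index in range(slice_start, min(slice_start + k + 1, len(a))):
            heappush(min_heap, a[index])
        peak = max(peak, len(min_heap))
        # ... but pop only one, so the heap keeps growing.
        result.append(heappop(min_heap))
    # Drain whatever is left in the heap.
    while min_heap:
        result.append(heappop(min_heap))
    return result, peak
```

For a list of 1000 elements and k = 9, the reported peak is 901 entries, far above the k + 1 = 10 that a true sliding window would hold, which is why the individual heap operations cost \$O(\log n)\$ rather than \$O(\log k)\$.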

Perhaps something like this instead?

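from heapq import heappush, heappop

# This sort runs in O(n log k + k log k), which is
# O(n log k) because k < n past the guard below.
def window_heapsort(a, k):
    # Fall back to the built-in sort when the window would cover the
    # whole list. The comparison must be '>=': with k == len(a) (or an
    # empty list) the initialization loop below would read past the end.
    if k >= len(a):
        return sorted(a)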

    window_width = k + 1
    window_heap = []
    result_array = []
    index = 0

    # Initialize the window heap:
    # Runs in O(k log k)
    for i in range(window_width):
        heappush(window_heap, a[i])

    # Keep sliding the window over the array:
    # Here, the size of the heap alternates between
    # 'window_width' and 'window_width - 1', which is
    # O(k), and thus pops and pushes run in O(log k).
    # Since we march over (at most) n elements, the
    # complexity of this loop is O(n log k).
    while index + window_width < len(a):
        result_array.append(heappop(window_heap))
        heappush(window_heap, a[window_width + index])
        index += 1

    # Dump the leftovers to the end of the result array:
    # Runs in O(k log k)
    while window_heap:
        result_array.append(heappop(window_heap))

    return result_array
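
As a variation on the sliding-window idea above, the heapq module also provides `heapify` (builds a heap from a list in linear time) and `heapreplace` (pops the smallest item and pushes a new one in a single step). The following is a minimal sketch under those assumptions, not the answer author's code; the name `window_heapsort_heapify` is hypothetical.

```python
from heapq import heapify, heappop, heapreplace
from itertools import islice
from typing import List


def window_heapsort_heapify(a: List[int], k: int) -> List[int]:
    """Sort a list whose elements are at most k positions away from
    their sorted positions, keeping at most k + 1 items in the heap."""
    if k < 0:
        raise ValueError("k must be non-negative")

    # Slicing clamps at len(a), so this also covers k >= len(a) and
    # the empty list without a separate guard.
    window = a[:k + 1]
    heapify(window)  # O(k) instead of k + 1 individual pushes

    result = []
    # For every element past the first window: emit the current minimum
    # and insert the new element, with the heap size fixed at k + 1.
    for value in islice(a, k + 1, None):
        result.append(heapreplace(window, value))

    # Drain the final window in sorted order.
    while window:
        result.append(heappop(window))
    return result
```

Because `heapreplace` pops before it pushes, the heap never exceeds k + 1 entries. When k is at least the length of the list, this sketch degrades to an ordinary heapsort of the whole input instead of calling `sorted`.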


You can find the entire test snippet here. Hope that helps.


Context

StackExchange Code Review Q#158642, answer score: 2
