From ee6970eff139c58a45998a87c02b661f32be5cbe Mon Sep 17 00:00:00 2001 From: Matthias Loibl Date: Fri, 15 Dec 2023 16:51:49 +0100 Subject: [PATCH] pqarrow/arrowutils: Add SortRecord and ReorderRecord (#628) * pqarrow/arrowutils: Add SortRecord and ReorderRecord This is extract from a previous PR #461. * pqarrow/arrowutils: Update SortRecord to allow for multiple sort columns This isn't implemented yet, just the function signature is future proof. * pqarrow/arrowutils: Use compute.Take for ReorderRecord * pqarrow/arrowutils: Add support for sorting NULL NULL always gets sorted to the back. This seems to be the default for other language implementations. It can be made configurable in the future. * Update pqarrow/arrowutils/sort.go Co-authored-by: Geofrey Ernest * Update pqarrow/arrowutils/sort.go Co-authored-by: Geofrey Ernest * Update pqarrow/arrowutils/sort.go Co-authored-by: Geofrey Ernest * Update pqarrow/arrowutils/sort.go Co-authored-by: Geofrey Ernest * Update pqarrow/arrowutils/sort.go Co-authored-by: Geofrey Ernest * pqarrow/arrowutils: Remove sorting *array.Binary This isn't properly unit tested and was more of an experiment. * pqarrow/arrowutils: Add context and reserve indices length --------- Co-authored-by: Geofrey Ernest --- go.mod | 1 + go.sum | 4 +- pqarrow/arrowutils/sort.go | 94 +++++++++++++++++++++++++++++++++ pqarrow/arrowutils/sort_test.go | 83 +++++++++++++++++++++++++++++ 4 files changed, 180 insertions(+), 2 deletions(-) create mode 100644 pqarrow/arrowutils/sort.go create mode 100644 pqarrow/arrowutils/sort_test.go diff --git a/go.mod b/go.mod index 1f4c7b016..fdc58a5c0 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( ) require ( + github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/benbjohnson/immutable v0.4.0 // indirect diff --git a/go.sum b/go.sum index 2f84ddda6..1282c14b5 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= github.com/RoaringBitmap/roaring v0.9.4 h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXlp4riwo= github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= @@ -79,8 +81,6 @@ github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= -github.com/parquet-go/parquet-go v0.19.1-0.20231129084429-9010539a4f7a h1:NxS5GxNgZa5nJeLjJFidbzhwn+YuhdV5pXHtOw7VKB8= -github.com/parquet-go/parquet-go v0.19.1-0.20231129084429-9010539a4f7a/go.mod h1:4YfUo8TkoGoqwzhA/joZKZ8f77wSMShOLHESY4Ys0bY= github.com/parquet-go/parquet-go v0.20.0 h1:a6tV5XudF893P1FMuyp01zSReXbBelquKQgRxBgJ29w= github.com/parquet-go/parquet-go v0.20.0/go.mod h1:4YfUo8TkoGoqwzhA/joZKZ8f77wSMShOLHESY4Ys0bY= github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= diff --git a/pqarrow/arrowutils/sort.go b/pqarrow/arrowutils/sort.go new file mode 100644 index 000000000..e4e3f7fab --- /dev/null +++ b/pqarrow/arrowutils/sort.go @@ -0,0 +1,94 @@ +package arrowutils + +import ( + "context" + "fmt" + "sort" + + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/compute" + "github.com/apache/arrow/go/v14/arrow/memory" +) + +// SortRecord sorts the given record's rows by the given column. Currently only supports int64, string and binary columns. +func SortRecord(mem memory.Allocator, r arrow.Record, cols []int) (*array.Int64, error) { + if len(cols) > 1 { + return nil, fmt.Errorf("sorting by multiple columns isn't implemented yet") + } + indicesBuilder := array.NewInt64Builder(mem) + + if r.NumRows() == 0 { + return indicesBuilder.NewInt64Array(), nil + } + if r.NumRows() == 1 { + indicesBuilder.Append(0) + return indicesBuilder.NewInt64Array(), nil + } + + indices := make([]int64, r.NumRows()) + // populate indices + for i := range indices { + indices[i] = int64(i) + } + + switch c := r.Column(cols[0]).(type) { + case *array.Int64: + sort.Sort(orderedSorter[int64]{array: c, indices: indices}) + case *array.String: + sort.Sort(orderedSorter[string]{array: c, indices: indices}) + default: + return nil, fmt.Errorf("unsupported column type for sorting %T", c) + } + + indicesBuilder.Reserve(len(indices)) + for _, i := range indices { + indicesBuilder.Append(i) + } + + return indicesBuilder.NewInt64Array(), nil +} + +// ReorderRecord reorders the given record's rows by the given indices. +// This is a wrapper around compute.Take which handles the type castings. +func ReorderRecord(ctx context.Context, r arrow.Record, indices arrow.Array) (arrow.Record, error) { + res, err := compute.Take( + ctx, + *compute.DefaultTakeOptions(), + compute.NewDatum(r), + compute.NewDatum(indices), + ) + if err != nil { + return nil, err + } + return res.(*compute.RecordDatum).Value, nil +} + +type orderedArray[T int64 | float64 | string] interface { + Value(int) T + IsNull(int) bool + Len() int +} + +type orderedSorter[T int64 | float64 | string] struct { + array orderedArray[T] + indices []int64 +} + +func (s orderedSorter[T]) Len() int { + return s.array.Len() +} + +func (s orderedSorter[T]) Less(i, j int) bool { + if s.array.IsNull(int(s.indices[i])) { + return false + } + if s.array.IsNull(int(s.indices[j])) { + return true + } + return s.array.Value(int(s.indices[i])) < s.array.Value(int(s.indices[j])) +} + +func (s orderedSorter[T]) Swap(i, j int) { + s.indices[i], s.indices[j] = s.indices[j], s.indices[i] +} diff --git a/pqarrow/arrowutils/sort_test.go b/pqarrow/arrowutils/sort_test.go new file mode 100644 index 000000000..8f94586cb --- /dev/null +++ b/pqarrow/arrowutils/sort_test.go @@ -0,0 +1,83 @@ +package arrowutils + +import ( + "context" + "testing" + + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/memory" + "github.com/stretchr/testify/require" +) + +func TestSortRecord(t *testing.T) { + ctx := context.Background() + schema := arrow.NewSchema( + []arrow.Field{ + {Name: "int", Type: arrow.PrimitiveTypes.Int64}, + {Name: "string", Type: arrow.BinaryTypes.String}, + }, + nil, + ) + + mem := memory.DefaultAllocator + ib := array.NewInt64Builder(mem) + ib.Append(0) + ib.AppendNull() + ib.Append(3) + ib.Append(5) + ib.Append(1) + + sb := array.NewStringBuilder(mem) + sb.Append("d") + sb.Append("c") + sb.Append("b") + sb.AppendNull() + sb.Append("a") + + record := array.NewRecord(schema, []arrow.Array{ib.NewArray(), sb.NewArray()}, int64(5)) + + // Sort the record by the first column - int64 + { + sortedIndices, err := SortRecord(mem, record, []int{record.Schema().FieldIndices("int")[0]}) + require.NoError(t, err) + require.Equal(t, []int64{0, 4, 2, 3, 1}, sortedIndices.Int64Values()) + + sortedByInts, err := ReorderRecord(ctx, record, sortedIndices) + require.NoError(t, err) + + // check that the column got sortedIndices + intCol := sortedByInts.Column(0).(*array.Int64) + require.Equal(t, []int64{0, 1, 3, 5, 0}, intCol.Int64Values()) + require.True(t, intCol.IsNull(intCol.Len()-1)) // last is NULL + // make sure the other column got updated too + strings := make([]string, sortedByInts.NumRows()) + stringCol := sortedByInts.Column(1).(*array.String) + for i := 0; i < int(sortedByInts.NumRows()); i++ { + strings[i] = stringCol.Value(i) + } + require.Equal(t, []string{"d", "a", "b", "", "c"}, strings) + } + + // Sort the record by the second column - string + { + sortedIndices, err := SortRecord(mem, record, []int{record.Schema().FieldIndices("string")[0]}) + require.NoError(t, err) + require.Equal(t, []int64{4, 2, 1, 0, 3}, sortedIndices.Int64Values()) + + sortedByStrings, err := ReorderRecord(ctx, record, sortedIndices) + require.NoError(t, err) + + // check that the column got sortedByInts + intCol := sortedByStrings.Column(0).(*array.Int64) + require.Equal(t, []int64{1, 3, 0, 0, 5}, intCol.Int64Values()) + // make sure the other column got updated too + strings := make([]string, sortedByStrings.NumRows()) + stringCol := sortedByStrings.Column(1).(*array.String) + for i := 0; i < int(sortedByStrings.NumRows()); i++ { + strings[i] = stringCol.Value(i) + } + require.Equal(t, []string{"a", "b", "c", "d", ""}, strings) + require.True(t, stringCol.IsNull(stringCol.Len()-1)) // last is NULL + } +}