Skip to content

Commit a886a57

Browse files
authored
fix(parquet/pqarrow): selective column reading of complex map column (#668)
### Rationale for this change Upstream fix for the issue identified in apache/iceberg-go#737. When reading maps with nested values using column indices for selective column reading, if the child fields of the map weren't in the list of indices there was a problem: - Maps are represented in Parquet as a list of key-value structs (`list<struct<key, value>>`) - The struct *MUST* have exactly 2 fields (key and value) to be converted into a proper Arrow typed Map column - When applying the column filtering, if only the key *OR* value field (but not both) were in the list of columns, the resulting child struct would only have 1 field - As a result, the `Map.validateData()` method would fail with a panic of `arrow/array: map array child array should have two fields`. ### What changes are included in this PR? In pqarrow/file_reader.go leaf filtering is disabled when reading a map's key-value struct. This will ensure both the key and value columns are always read together, maintaining the required 2-field structure for map arrays. ### Are these changes tested? Yes, a test case is added for the change. ### Are there any user-facing changes? This only affects map type reading with column filter selection, ensuring correctness. The only change is that a failure mode has been eliminated.
1 parent dcb85bf commit a886a57

2 files changed

Lines changed: 140 additions & 1 deletion

File tree

parquet/pqarrow/file_reader.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,22 @@ func (fr *FileReader) getReader(ctx context.Context, field *SchemaField, arrowFi
606606
out = newStructReader(&rctx, &filtered, field.LevelInfo, childReaders, fr.Props)
607607
case arrow.LIST, arrow.FIXED_SIZE_LIST, arrow.MAP:
608608
child := field.Children[0]
609-
childReader, err := fr.getReader(ctx, &child, *child.Field)
609+
610+
// For maps, we must read BOTH key and value columns, regardless of column selection.
611+
// Map arrays require a complete key-value struct with exactly 2 fields.
612+
// Disable leaf filtering when reading a map's key-value struct.
613+
childCtx := ctx
614+
if _, isMap := arrowField.Type.(*arrow.MapType); isMap {
615+
childCtx = context.WithValue(ctx, rdrCtxKey{}, readerCtx{
616+
rdr: rctx.rdr,
617+
mem: rctx.mem,
618+
colFactory: rctx.colFactory,
619+
filterLeaves: false, // Don't filter leaves for map key-value struct
620+
includedLeaves: rctx.includedLeaves,
621+
})
622+
}
623+
624+
childReader, err := fr.getReader(childCtx, &child, *child.Field)
610625
if err != nil {
611626
return nil, err
612627
}

parquet/pqarrow/file_reader_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,3 +652,127 @@ func TestPartialStructColumnRead(t *testing.T) {
652652
cArr := arr.Field(1).(*array.Float64)
653653
require.Equal(t, 3.0, cArr.Value(0))
654654
}
655+
656+
// TestMapColumnWithFilters tests that map columns can be read correctly when
657+
// using column filtering. This is a regression test for a bug where reading
658+
// a map column with filters would fail because the code tried to filter the
659+
// individual key and value columns of the map's internal key-value struct.
660+
// Maps require both key and value columns to be read together, so leaf filtering
661+
// must be disabled for map types.
662+
func TestMapColumnWithFilters(t *testing.T) {
663+
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
664+
defer mem.AssertSize(t, 0)
665+
666+
// Create schema with a map column and other columns
667+
schema := arrow.NewSchema([]arrow.Field{
668+
{Name: "id", Type: arrow.PrimitiveTypes.Int64},
669+
{Name: "properties", Type: arrow.MapOf(arrow.BinaryTypes.String, arrow.PrimitiveTypes.Int32)},
670+
{Name: "value", Type: arrow.PrimitiveTypes.Float64},
671+
}, nil)
672+
673+
// Build test data
674+
b := array.NewRecordBuilder(mem, schema)
675+
defer b.Release()
676+
677+
// Build ID column
678+
idBuilder := b.Field(0).(*array.Int64Builder)
679+
idBuilder.AppendValues([]int64{1, 2}, nil)
680+
681+
// Build map column
682+
mapBuilder := b.Field(1).(*array.MapBuilder)
683+
kb := mapBuilder.KeyBuilder().(*array.StringBuilder)
684+
vb := mapBuilder.ItemBuilder().(*array.Int32Builder)
685+
686+
// First map: {"key1": 100, "key2": 200}
687+
mapBuilder.Append(true)
688+
kb.AppendValues([]string{"key1", "key2"}, nil)
689+
vb.AppendValues([]int32{100, 200}, nil)
690+
691+
// Second map: {"key3": 300}
692+
mapBuilder.Append(true)
693+
kb.AppendValues([]string{"key3"}, nil)
694+
vb.AppendValues([]int32{300}, nil)
695+
696+
// Build value column
697+
valueBuilder := b.Field(2).(*array.Float64Builder)
698+
valueBuilder.AppendValues([]float64{1.5, 2.5}, nil)
699+
700+
rec := b.NewRecordBatch()
701+
defer rec.Release()
702+
703+
// Write to parquet
704+
buf := new(bytes.Buffer)
705+
writer, err := pqarrow.NewFileWriter(schema, buf, nil, pqarrow.DefaultWriterProps())
706+
require.NoError(t, err)
707+
require.NoError(t, writer.Write(rec))
708+
require.NoError(t, writer.Close())
709+
710+
// Read back with column filtering (only read id and properties, skip value)
711+
pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
712+
require.NoError(t, err)
713+
defer pf.Close()
714+
715+
fr, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, mem)
716+
require.NoError(t, err)
717+
718+
// Read only columns for the first two fields (id and map)
719+
// Column 0 = id
720+
// Column 1 = map.key
721+
// Column 2 = map.value
722+
// Column 3 = value (skipped)
723+
// This exercises the code path where maps need both key and value columns
724+
// even when column filtering is active
725+
ctx := context.Background()
726+
colIndices := []int{0, 1, 2} // id, map.key, map.value
727+
rr, err := fr.GetRecordReader(ctx, colIndices, nil)
728+
require.NoError(t, err)
729+
require.NotNil(t, rr)
730+
defer rr.Release()
731+
732+
// Read the record batch
733+
require.True(t, rr.Next())
734+
result := rr.RecordBatch()
735+
// Note: Don't release result manually - the record reader owns it
736+
737+
// Verify schema - should have only 2 fields (id and properties)
738+
require.Equal(t, 2, int(result.NumCols()))
739+
require.Equal(t, "id", result.Schema().Field(0).Name)
740+
require.Equal(t, "properties", result.Schema().Field(1).Name)
741+
742+
// Verify ID column
743+
idCol := result.Column(0).(*array.Int64)
744+
require.Equal(t, int64(1), idCol.Value(0))
745+
require.Equal(t, int64(2), idCol.Value(1))
746+
747+
// Verify map column - this is the critical test for the fix
748+
// The key test is that reading succeeds without panic
749+
mapCol := result.Column(1).(*array.Map)
750+
require.Equal(t, 2, mapCol.Len())
751+
752+
// Verify the map has the correct structure (keys and items arrays exist)
753+
keys := mapCol.Keys().(*array.String)
754+
vals := mapCol.Items().(*array.Int32)
755+
require.NotNil(t, keys)
756+
require.NotNil(t, vals)
757+
758+
// Verify total number of key-value pairs across all maps
759+
require.Equal(t, 3, keys.Len()) // Total: 2 from first map + 1 from second map
760+
require.Equal(t, 3, vals.Len())
761+
762+
// Verify the map offsets are correct
763+
start0, end0 := mapCol.ValueOffsets(0)
764+
require.Equal(t, int64(0), start0)
765+
require.Equal(t, int64(2), end0) // First map has entries from 0 to 2 (2 entries)
766+
767+
start1, end1 := mapCol.ValueOffsets(1)
768+
require.Equal(t, int64(2), start1)
769+
require.Equal(t, int64(3), end1) // Second map has entries from 2 to 3 (1 entry)
770+
771+
// Verify key-value pairs
772+
require.Equal(t, "key1", keys.Value(0))
773+
require.Equal(t, int32(100), vals.Value(0))
774+
require.Equal(t, "key2", keys.Value(1))
775+
require.Equal(t, int32(200), vals.Value(1))
776+
require.Equal(t, "key3", keys.Value(2))
777+
require.Equal(t, int32(300), vals.Value(2))
778+
}

0 commit comments

Comments
 (0)