Skip to content

Commit a1d883e

Browse files
authored
feat(cli): add --infer-schema flag to infer table schema from parquet files (#927)
Added a new --infer-schema flag to `iceberg create table` that infers the table schema from a local data file. Currently supports Parquet (.parquet, .parq), with the dispatch designed to support additional formats (e.g. Avro, ORC) in the future. When a Parquet file contains Iceberg field IDs, they are preserved. Files without field IDs are assigned fresh sequential IDs. The flag is mutually exclusive with --schema, and the inferred schema is printed before table creation for user verification.
1 parent 41f5c54 commit a1d883e

1 file changed

Lines changed: 83 additions & 5 deletions

File tree

cmd/iceberg/main.go

Lines changed: 83 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,12 @@ import (
2323
"fmt"
2424
"log"
2525
"os"
26+
"path/filepath"
2627
"strings"
2728

29+
"github.com/apache/arrow-go/v18/arrow/memory"
30+
"github.com/apache/arrow-go/v18/parquet/file"
31+
"github.com/apache/arrow-go/v18/parquet/pqarrow"
2832
"github.com/apache/iceberg-go"
2933
"github.com/apache/iceberg-go/catalog"
3034
"github.com/apache/iceberg-go/catalog/glue"
@@ -90,6 +94,8 @@ Options:
9094
--location-uri TEXT specify a location URI for the namespace
9195
--schema JSON specify table schema in json (for create table use only)
9296
Ex: [{"name":"id","type":"int","required":false,"doc":"unique id"}]
97+
--infer-schema FILE infer table schema from a local data file (for create table use only)
98+
Supported formats: parquet
9399
--properties TEXT specify table properties in key=value format (for create table use only)
94100
Ex:"format-version=2,write.format.default=parquet"
95101
--partition-spec TEXT specify partition spec as comma-separated field names(for create table use only)
@@ -144,6 +150,7 @@ type Config struct {
144150
Description string `docopt:"--description"`
145151
LocationURI string `docopt:"--location-uri"`
146152
SchemaStr string `docopt:"--schema"`
153+
InferSchema string `docopt:"--infer-schema"`
147154
TableProps string `docopt:"--properties"`
148155
PartitionSpec string `docopt:"--partition-spec"`
149156
SortOrder string `docopt:"--sort-order"`
@@ -322,14 +329,35 @@ func main() {
322329
}
323330
output.Text("Namespace " + cfg.Ident + " created successfully")
324331
case cfg.Table:
325-
if cfg.SchemaStr == "" {
326-
output.Error(errors.New("missing --schema for table creation"))
332+
if cfg.SchemaStr != "" && cfg.InferSchema != "" {
333+
output.Error(errors.New("--schema and --infer-schema are mutually exclusive"))
327334
os.Exit(1)
328335
}
329336

330-
schema, err := iceberg.NewSchemaFromJsonFields(0, cfg.SchemaStr)
331-
if err != nil {
332-
output.Error(err)
337+
var schema *iceberg.Schema
338+
339+
switch {
340+
case cfg.SchemaStr != "":
341+
var err error
342+
343+
schema, err = iceberg.NewSchemaFromJsonFields(0, cfg.SchemaStr)
344+
if err != nil {
345+
output.Error(err)
346+
os.Exit(1)
347+
}
348+
case cfg.InferSchema != "":
349+
var err error
350+
351+
schema, err = schemaFromFile(cfg.InferSchema)
352+
if err != nil {
353+
output.Error(err)
354+
os.Exit(1)
355+
}
356+
357+
output.Text("Inferred schema from " + cfg.InferSchema + ":")
358+
output.Schema(schema)
359+
default:
360+
output.Error(errors.New("missing --schema or --infer-schema for table creation"))
333361
os.Exit(1)
334362
}
335363

@@ -569,3 +597,53 @@ func mergeConf(fileConf *config.CatalogConfig, resConfig *Config, explicitFlags
569597
resConfig.RestOptions = fileConf.RestOptions
570598
}
571599
}
600+
601+
func schemaFromFile(path string) (*iceberg.Schema, error) {
602+
ext := strings.ToLower(filepath.Ext(path))
603+
604+
switch ext {
605+
case ".parquet", ".parq":
606+
return schemaFromParquetFile(path)
607+
default:
608+
return nil, fmt.Errorf("unsupported file format %s for %s: only .parquet and .parq files are supported", ext, path)
609+
}
610+
}
611+
612+
func schemaFromParquetFile(path string) (*iceberg.Schema, error) {
613+
f, err := os.Open(path)
614+
if err != nil {
615+
return nil, fmt.Errorf("failed to open file: %w", err)
616+
}
617+
618+
pqReader, err := file.NewParquetReader(f)
619+
if err != nil {
620+
f.Close()
621+
622+
return nil, fmt.Errorf("failed to read parquet file: %w", err)
623+
}
624+
defer pqReader.Close() // also closes underlying file
625+
626+
arrowReader, err := pqarrow.NewFileReader(pqReader, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
627+
if err != nil {
628+
return nil, fmt.Errorf("failed to read parquet schema: %w", err)
629+
}
630+
631+
arrowSchema, err := arrowReader.Schema()
632+
if err != nil {
633+
return nil, fmt.Errorf("failed to get arrow schema: %w", err)
634+
}
635+
636+
// Prefer existing field IDs from the Parquet file (written by Iceberg-aware
637+
// tools like Spark or PyIceberg). Fall back to fresh sequential IDs only
638+
// when the error is specifically about missing field IDs.
639+
schema, err := table.ArrowSchemaToIceberg(arrowSchema, true, nil)
640+
if err != nil {
641+
if errors.Is(err, iceberg.ErrInvalidSchema) && strings.Contains(err.Error(), "field-id") {
642+
return table.ArrowSchemaToIcebergWithFreshIDs(arrowSchema, true)
643+
}
644+
645+
return nil, fmt.Errorf("failed to convert parquet schema to iceberg: %w", err)
646+
}
647+
648+
return schema, nil
649+
}

0 commit comments

Comments
 (0)