Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions arrow/ipc/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@ func (p *Payload) SerializeBody(w io.Writer) error {
return nil
}

// WritePayload serializes the payload in IPC format
// into the provided writer.
func (p *Payload) WritePayload(w io.Writer) (int, error) {
return writeIPCPayload(w, *p)
}

func (p *Payload) Release() {
if p.meta != nil {
p.meta.Release()
Expand Down
1 change: 1 addition & 0 deletions arrow/ipc/ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type config struct {
ensureNativeEndian bool
noAutoSchema bool
emitDictDeltas bool
skipEmittingSchema bool
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove this?

minSpaceSavings *float64
}

Expand Down
27 changes: 17 additions & 10 deletions arrow/ipc/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ type Writer struct {

// map of the last written dictionaries by id
// so we can avoid writing the same dictionary over and over
lastWrittenDicts map[int64]arrow.Array
emitDictDeltas bool
lastWrittenDicts map[int64]arrow.Array
emitDictDeltas bool
skipEmittingSchema bool
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, can we revert the changes for skipEmittingSchema since we are just adding the WritePayload method?

}

// NewWriterWithPayloadWriter constructs a writer with the provided payload writer
Expand All @@ -116,14 +117,15 @@ func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer {
func NewWriter(w io.Writer, opts ...Option) *Writer {
cfg := newConfig(opts...)
return &Writer{
w: w,
mem: cfg.alloc,
pw: &streamWriter{w: w},
schema: cfg.schema,
codec: cfg.codec,
emitDictDeltas: cfg.emitDictDeltas,
compressNP: cfg.compressNP,
compressors: make([]compressor, cfg.compressNP),
w: w,
mem: cfg.alloc,
pw: &streamWriter{w: w},
schema: cfg.schema,
codec: cfg.codec,
emitDictDeltas: cfg.emitDictDeltas,
skipEmittingSchema: cfg.skipEmittingSchema,
compressNP: cfg.compressNP,
compressors: make([]compressor, cfg.compressNP),
}
}

Expand Down Expand Up @@ -276,6 +278,11 @@ func (w *Writer) start() error {
w.mapper.ImportSchema(w.schema)
w.lastWrittenDicts = make(map[int64]arrow.Array)

// skip writing schema payloads
if w.skipEmittingSchema {
return nil
}

// write out schema payloads
ps := payloadFromSchema(w.schema, w.mem, &w.mapper)
defer ps.Release()
Expand Down
25 changes: 25 additions & 0 deletions arrow/ipc/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,3 +332,28 @@ func TestGetPayloads(t *testing.T) {

assert.Truef(t, array.RecordEqual(rec, got), "expected: %s\ngot: %s", rec, got)
}

func TestWritePayload(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)

bldr := array.NewRecordBuilder(mem, arrow.NewSchema([]arrow.Field{{Name: "col", Type: arrow.PrimitiveTypes.Int8}}, nil))
bldr.Field(0).(*array.Int8Builder).AppendValues([]int8{1, 2, 3, 4, 5}, nil)
rec := bldr.NewRecord()
defer rec.Release()

var buf bytes.Buffer
p, err := GetRecordBatchPayload(rec, WithAllocator(mem))
defer p.Release()
require.NoError(t, err)

_, err = p.WritePayload(&buf)
require.NoError(t, err)

r := NewMessageReader(&buf, WithAllocator(mem))
defer r.Release()

msg, err := r.Message()
require.NoError(t, err)
require.True(t, msg.Type() == MessageRecordBatch)
}
Loading