Skip to content

Commit c542dd6

Browse files
authored
feat: expose Payload.WritePayload to allow serializing into IPC format (#421)
### Rationale for this change The BigQuery Storage Write API now accepts Arrow data. But schema data and record batches needs to send separately. Right now the `ipc.NewWriter` writes the schema data to the output buffer, which makes it fail on the BigQuery side, as the first message is the schema. So we need a way to write just RecordBatches. See related discussion on googleapis/google-cloud-go#12478 ### What changes are included in this PR? Add `WritePayload ` method to `Payload`, which exposes the underlying `writeIPCPayload` method, allowing to write IPC messages more easily. ### Are these changes tested? Yes, tested with BigQuery Storage Write API. But also added local tests to it too. ### Are there any user-facing changes? Yes, `WritePayload` will be a public method available option to users.
1 parent c88dd3e commit c542dd6

2 files changed

Lines changed: 31 additions & 0 deletions

File tree

arrow/ipc/file_writer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,12 @@ func (p *Payload) SerializeBody(w io.Writer) error {
203203
return nil
204204
}
205205

206+
// WritePayload serializes the payload in IPC format
207+
// into the provided writer.
208+
func (p *Payload) WritePayload(w io.Writer) (int, error) {
209+
return writeIPCPayload(w, *p)
210+
}
211+
206212
func (p *Payload) Release() {
207213
if p.meta != nil {
208214
p.meta.Release()

arrow/ipc/writer_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,3 +332,28 @@ func TestGetPayloads(t *testing.T) {
332332

333333
assert.Truef(t, array.RecordEqual(rec, got), "expected: %s\ngot: %s", rec, got)
334334
}
335+
336+
func TestWritePayload(t *testing.T) {
337+
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
338+
defer mem.AssertSize(t, 0)
339+
340+
bldr := array.NewRecordBuilder(mem, arrow.NewSchema([]arrow.Field{{Name: "col", Type: arrow.PrimitiveTypes.Int8}}, nil))
341+
bldr.Field(0).(*array.Int8Builder).AppendValues([]int8{1, 2, 3, 4, 5}, nil)
342+
rec := bldr.NewRecord()
343+
defer rec.Release()
344+
345+
var buf bytes.Buffer
346+
p, err := GetRecordBatchPayload(rec, WithAllocator(mem))
347+
defer p.Release()
348+
require.NoError(t, err)
349+
350+
_, err = p.WritePayload(&buf)
351+
require.NoError(t, err)
352+
353+
r := NewMessageReader(&buf, WithAllocator(mem))
354+
defer r.Release()
355+
356+
msg, err := r.Message()
357+
require.NoError(t, err)
358+
require.True(t, msg.Type() == MessageRecordBatch)
359+
}

0 commit comments

Comments
 (0)