feat: expose Payload.WritePayload to allow serializing into IPC format #421

Merged: zeroshade merged 7 commits into apache:main from alvarowolfx:feat-schemaless-ipc-writer on Jun 25, 2025

Conversation

@alvarowolfx
Contributor

@alvarowolfx alvarowolfx commented Jun 23, 2025

Rationale for this change

The BigQuery Storage Write API now accepts Arrow data, but the schema and the record batches need to be sent separately. Right now ipc.NewWriter writes the schema to the output buffer, which makes the request fail on the BigQuery side because the first message is the schema. So we need a way to write just RecordBatches.

See related discussion on googleapis/google-cloud-go#12478

What changes are included in this PR?

Add a WritePayload method to Payload, which exposes the underlying writeIPCPayload function and makes it easier to write individual IPC messages.

Are these changes tested?

Yes, tested against the BigQuery Storage Write API, and local tests were added as well.

Are there any user-facing changes?

Yes, WritePayload becomes a public method available to users.
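
For reference, a minimal sketch of the intended usage, assuming the WritePayload(io.Writer) signature this thread converges on (the helper name serializeRecordBatch is illustrative):

import (
	"bytes"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/ipc"
)

// serializeRecordBatch produces a single IPC RecordBatch message, without
// the leading Schema message, suitable for a field like
// ArrowRecordBatch.serialized_record_batch in the BigQuery Storage Write API.
func serializeRecordBatch(rec arrow.Record) ([]byte, error) {
	payload, err := ipc.GetRecordBatchPayload(rec)
	if err != nil {
		return nil, err
	}
	defer payload.Release()

	var buf bytes.Buffer
	if _, err := payload.WritePayload(&buf); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}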

@alvarowolfx
Contributor Author

Naming is hard, so I'm open to suggestions on the WithSchemalessOutput option.

@alvarowolfx alvarowolfx marked this pull request as ready for review June 24, 2025 14:10
@alvarowolfx alvarowolfx requested a review from zeroshade as a code owner June 24, 2025 14:10
@zeroshade
Member

Is there any way we can convince the BigQuery side to instead allow the schema message?

The Arrow IPC spec states that there should be a schema message as the first message in the stream. By rejecting the schema message, BigQuery is requiring an invalid IPC stream.

Also, instead of adding a hardcoded option like this, could you just utilize the PayloadWriter interface to implement a payload writer which skips the schema payload while writing out the rest?

@alvarowolfx
Contributor Author

Is there any way we can convince the BigQuery side to instead allow the schema message?

I'm also discussing that with the team, but those things take a bit more time. That was my first thought when I found this issue.

The Arrow IPC spec states that there should be a schema message as the first message in the stream. By rejecting the schema message, BigQuery is requiring an invalid IPC stream.

Good to know; I'll use that as more evidence that we might need to fix this on the service side.

Also, instead of adding a hardcoded option like this, could you just utilize the PayloadWriter interface to implement a payload writer which skips the schema payload while writing out the rest?

I tried to go this route, but how do I reuse the internal functions needed for that, like the writeIPCPayload function?

@alvarowolfx
Contributor Author

alvarowolfx commented Jun 24, 2025

I tried a silly implementation like this, but could not get it to work; it seems to be missing some alignment of the data, which is handled by the writeIPCPayload method. Also, the Payload type doesn't expose MessageType as an exported field, so I can't detect whether a payload is a Schema or a RecordBatch. Of course I could just skip the first message, but that sounds hacky too.

import (
	"bytes"

	"github.com/apache/arrow-go/v18/arrow/ipc"
)

// Attempted PayloadWriter; it writes every payload as meta+body, but the
// output is invalid because SerializeBody doesn't apply the
// padding/alignment that writeIPCPayload handles.
type schemalessPayloadWriter struct {
	buf *bytes.Buffer
}

func (f *schemalessPayloadWriter) Start() error { return nil }
func (f *schemalessPayloadWriter) Close() error { return nil } // required by the interface

func (f *schemalessPayloadWriter) WritePayload(payload ipc.Payload) error {
	m := payload.Meta()
	defer m.Release()
	f.buf.Write(m.Bytes())
	return payload.SerializeBody(f.buf)
}

var _ ipc.PayloadWriter = (*schemalessPayloadWriter)(nil)
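
For context, a custom PayloadWriter like this would be wired up through ipc.NewWriterWithPayloadWriter, which the ipc package exposes for custom payload writers; a minimal sketch, with writeSchemaless as an illustrative name:

func writeSchemaless(rec arrow.Record) ([]byte, error) {
	var buf bytes.Buffer
	w := ipc.NewWriterWithPayloadWriter(
		&schemalessPayloadWriter{buf: &buf},
		ipc.WithSchema(rec.Schema()),
	)
	if err := w.Write(rec); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	// buf now holds meta+body bytes for every payload (including the schema,
	// since Payload doesn't expose its message type), but without the padding
	// that writeIPCPayload would add, so this is not a valid IPC stream.
	return buf.Bytes(), nil
}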

@alvarowolfx
Contributor Author

@zeroshade I hit a wall here: I could not make any PayloadWriter, or any use of ipc.GetRecordBatchPayload, generate the same output as the ipc.Writer. I think we would need to export some parts of the writeIPCPayload and/or writeMessage functions for reuse to get this to work, as I think what's missing is the alignment step.

@alvarowolfx
Contributor Author

I pushed another commit hiding the WithSchemalessOutput option and instead adding a NewRecordBatchWriter, which is the end goal: a writer that emits only RecordBatch messages. Let me know if you have any other thoughts on that.

On the BigQuery side, the argument is that indeed we are only supposed to send RecordBatches, as receiving a schema triggers logic that checks whether the table schema has changed, a mechanism that also exists when using the protobuf format. This mechanism is expensive and should be avoided when possible.

@alvarowolfx alvarowolfx changed the title feat: add option for ipc.writer to skip writing schema feat: add ipc.NewRecordBatchWriter Jun 25, 2025
@zeroshade
Member

I pushed another commit hiding the WithSchemalessOutput option and instead adding a NewRecordBatchWriter, which is the end goal: a writer that emits only RecordBatch messages. Let me know if you have any other thoughts on that.

I'll take a look in a bit and get back with my thoughts.

On the BigQuery side, the argument is that indeed we are only supposed to send RecordBatches, as receiving a schema triggers logic that checks whether the table schema has changed, a mechanism that also exists when using the protobuf format. This mechanism is expensive and should be avoided when possible.

Why is the schema verification expensive? Also, wouldn't you still need to run the same logic to check for changes to the table schema upon receiving the first RecordBatch? How does avoiding the Schema message allow you to avoid validating the schema?

@alvarowolfx
Contributor Author

Why is the schema verification expensive?

It basically adds an extra database call/metadata check on the backend to compare the BigQuery table schema with the Arrow schema and see if there are any changes.

Also, wouldn't you still need to run the same logic to check for changes to the table schema upon receiving the first RecordBatch?

Yes, but only on the first RecordBatch and/or when opening the stream to start writing data. Also, when the schema does change, there is a separate field where the new schema can be supplied. The schema and the serialized record batches go to the backend in separate fields.

How does avoiding the Schema message allow you to avoid validating the schema?

We only check for schema changes when the specific schema field is filled in; this works the same for Arrow and protobuf.

@zeroshade
Member

Yes, but only on the first RecordBatch and/or when opening the stream to start writing data. Also, when the schema does change, there is a separate field where the new schema can be supplied. The schema and the serialized record batches go to the backend in separate fields.

Can't this logic just shift to only do the check on the Schema message then? A Schema message is semantically the same as a RecordBatch message with no rows. The same semantics that guarantee the record batches all have the same schema hold for the schema message as well (along with dictionary messages). It seems like you have to validate the schema regardless (whether on the first record batch or on the schema message, which is always the first message), so wouldn't it be preferable to accept a valid IPC stream (which contains the schema message) instead of an invalid IPC stream that is missing it?
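
To illustrate the stream layout, a small sketch that writes one record with the standard writer and then walks the stream message by message; the Schema message always comes first (assumes a record rec built elsewhere):

import (
	"bytes"
	"fmt"
	"io"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/ipc"
)

func dumpMessageTypes(rec arrow.Record) error {
	var buf bytes.Buffer
	w := ipc.NewWriter(&buf, ipc.WithSchema(rec.Schema()))
	if err := w.Write(rec); err != nil {
		return err
	}
	if err := w.Close(); err != nil {
		return err
	}

	mr := ipc.NewMessageReader(&buf)
	defer mr.Release()
	for {
		msg, err := mr.Message()
		if err == io.EOF {
			return nil // end of stream
		}
		if err != nil {
			return err
		}
		fmt.Println(msg.Type()) // Schema first, then the RecordBatch message(s)
	}
}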

@zeroshade
Member

It's still pretty easy to populate the protobuf by calling GetSchemaPayload, GetRecordBatchPayload, etc. Though I'm curious whether BigQuery supports dictionary-encoded columns, which require supporting DictionaryBatch messages in the IPC stream.

@alvarowolfx
Contributor Author

Can't this logic just shift to only do the check on the Schema message then?

But the schema message would go on every request; I'm not sure I follow the logic here. The idea is to avoid sending the schema on every request.

To add another point, PyArrow allows the schema and record batches to be serialized separately in the IPC format:
https://cloud.google.com/bigquery/docs/write-api-streaming#arrow-format

import pyarrow

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types as gapic_types
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream

def append_rows_with_pyarrow(
  pyarrow_table: pyarrow.Table,
  project_id: str,
  dataset_id: str,
  table_id: str,
):
  bqstorage_write_client = bigquery_storage_v1.BigQueryWriteClient()

  # Create request_template.
  request_template = gapic_types.AppendRowsRequest()
  request_template.write_stream = (
      f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/_default"
  )
  arrow_data = gapic_types.AppendRowsRequest.ArrowData()
  arrow_data.writer_schema.serialized_schema = (
      pyarrow_table.schema.serialize().to_pybytes()
  )
  request_template.arrow_rows = arrow_data

  # Create AppendRowsStream.
  append_rows_stream = AppendRowsStream(
      bqstorage_write_client,
      request_template,
  )

  # Create request with table data.
  request = gapic_types.AppendRowsRequest()
  request.arrow_rows.rows.serialized_record_batch = (
      pyarrow_table.to_batches()[0].serialize().to_pybytes()
  )

  # Send request.
  future = append_rows_stream.send(request)

  # Wait for result.
  future.result()

@alvarowolfx
Contributor Author

It's still pretty easy to populate the protobuf by calling GetSchemaPayload, GetRecordBatchPayload, etc. Though I'm curious whether BigQuery supports dictionary-encoded columns, which require supporting DictionaryBatch messages in the IPC stream.

I tried this approach, but the messages got rejected. I'll get more information on the error, but it seems like the backend doesn't expect DictionaryBatch messages.

@zeroshade
Member

But the schema message would go on every request; I'm not sure I follow the logic here. The idea is to avoid sending the schema on every request.

My suggestion was that instead of avoiding sending the schema on every request, the backend would just apply whatever logic it uses to validate the schema of the first record batch to the Schema message instead, and thus it would be able to skip validating the record batch schema (since it was already validated via the Schema message). Just shifting the logic from the first record batch message to the schema message (nothing else about the logic would change). As I said, according to the spec, leaving off the schema message is technically an invalid IPC stream.

To add another point, PyArrow allows the schema and record batches to be serialized separately in the IPC format:
https://cloud.google.com/bigquery/docs/write-api-streaming#arrow-format

You can already do the equivalent of that Python code in Go, though I guess the issue you run into is the lack of padding handling. If we simply add a new method to the Payload struct, we can achieve the exact same logic. This PR could instead just be the following:

// a drawback to this is having to use bytes.Buffer to get the raw bytes
// if you aren't already using an io.Writer.
func (p *Payload) WritePayload(w io.Writer) (int, error) {
	return writeIPCPayload(w, *p)
}

// alternatively, if we just want the raw bytes, we can do
func (p *Payload) SerializedBytes() ([]byte, error) {
	var b bytes.Buffer
	if _, err := writeIPCPayload(&b, *p); err != nil {
		return nil, err
	}
	return b.Bytes(), nil
}

Then you can write the Go equivalent of the pyarrow example you provided, without needing an entire new writer.

func appendRows(tbl arrow.Table, projectID, datasetID, tableID string) error {
	// create request etc....

	schemaPayload := ipc.GetSchemaPayload(tbl.Schema(), memory.DefaultAllocator)
	serializedSchemaBytes, err := schemaPayload.SerializedBytes()
	if err != nil {
		return err
	}
	// do whatever you want with the byte slice for the schema

	rdr := array.NewTableReader(tbl, tbl.NumRows())
	defer rdr.Release()

	// the pyarrow example only uses the first record batch; you would probably
	// use "for rdr.Next()" to loop over all the batches instead, but I'll
	// mirror the pyarrow example for now
	rdr.Next()
	payload, err := ipc.GetRecordBatchPayload(rdr.Record())
	if err != nil {
		return err
	}

	serializedRecordBytes, err := payload.SerializedBytes()
	if err != nil {
		return err
	}
	// do whatever you like with the serializedRecordBytes

	// ....
	return nil
}

@alvarowolfx
Contributor Author

This is what I tried with GetSchemaPayload and GetRecordBatchPayload (missing some error handling code). I get `Arrow Error: IOError: Invalid flatbuffers message` from the BigQuery side.

func writeToBQClient(schema *arrow.Schema, records []arrow.Record, writeStreamName string, appendStream storagepb.BigQueryWrite_AppendRowsClient) error {
	// Serialize schema using IPC format (just the schema, no data)
	var err error
	var schemaBuf bytes.Buffer
	schemaPayload := ipc.GetSchemaPayload(schema, memory.DefaultAllocator)
	schemaMd := schemaPayload.Meta()
	defer schemaMd.Release()
	schemaBuf.Write(schemaMd.Bytes())
	schemaPayload.SerializeBody(&schemaBuf)

	schemaData := schemaBuf.Bytes()

	// Serialize record batch using IPC format
	var recordBuf bytes.Buffer
	for _, record := range records {
		p, err := ipc.GetRecordBatchPayload(record, ipc.WithSchema(schema))
		if err != nil {
			slog.Error("failed to write Arrow record", "error", err)
			return err
		}
		m := p.Meta()
		defer m.Release()
		recordBuf.Write(m.Bytes())
		p.SerializeBody(&recordBuf)
	}

	recordData := recordBuf.Bytes()

	slog.Info("Schema IPC data size", "bytes", len(schemaData))
	slog.Info("Record IPC data size", "bytes", len(recordData))

	request := &storagepb.AppendRowsRequest{
		WriteStream: writeStreamName,
		Rows: &storagepb.AppendRowsRequest_ArrowRows{
			ArrowRows: &storagepb.AppendRowsRequest_ArrowData{
				WriterSchema: &storagepb.ArrowSchema{
					SerializedSchema: schemaData,
				},
				Rows: &storagepb.ArrowRecordBatch{
					SerializedRecordBatch: recordData,
				},
			},
		},
	}

	// Send the request
	err = appendStream.Send(request)
	if err != nil {
		slog.Error("failed to send AppendRows request", "error", err)
		return err
	}

	err = appendStream.CloseSend()
	if err != nil {
		slog.Error("failed to close AppendRows request: %v")
		return err
	}

	return nil
}

@zeroshade
Member

This is what I tried with GetSchemaPayload and GetRecordBatchPayload (missing some error handling code). I get `Arrow Error: IOError: Invalid flatbuffers message` from the BigQuery side.

Yea, that's because SerializeBody() doesn't perform the padding and alignment, hence my suggestion in #421 (comment) to just add methods to the Payload object.
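
For readers following the padding discussion: the encapsulated IPC message format is <0xFFFFFFFF continuation> <int32 metadata size> <metadata flatbuffer> <padding to 8 bytes> <body>, and it is that framing and 8-byte alignment that writeIPCPayload adds on top of what SerializeBody emits. A simplified sketch of the framing, not the library's actual implementation:

import (
	"encoding/binary"
	"io"
)

// frameIPCMessage illustrates IPC message framing: a continuation marker,
// a little-endian metadata length (which includes the padding), the
// flatbuffer metadata padded to an 8-byte boundary, then the body.
func frameIPCMessage(w io.Writer, meta, body []byte) error {
	paddedLen := (len(meta) + 7) &^ 7 // round the metadata length up to a multiple of 8

	var prefix [8]byte
	binary.LittleEndian.PutUint32(prefix[0:4], 0xFFFFFFFF) // continuation marker
	binary.LittleEndian.PutUint32(prefix[4:8], uint32(paddedLen))
	if _, err := w.Write(prefix[:]); err != nil {
		return err
	}
	if _, err := w.Write(meta); err != nil {
		return err
	}
	if pad := paddedLen - len(meta); pad > 0 {
		if _, err := w.Write(make([]byte, pad)); err != nil { // zero padding
			return err
		}
	}
	_, err := w.Write(body) // body buffers carry their own internal padding
	return err
}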

@alvarowolfx
Contributor Author

You can already do the equivalent of that Python code in Go, though I guess the issue you run into is the lack of padding handling. If we simply add a new method to the Payload struct, we can achieve the exact same logic. [...]

This approach works for me too. I figured that exposing writeIPCPayload was going to work as well, but wasn't sure which was the best approach.

@zeroshade
Member

zeroshade commented Jun 25, 2025

Yea, the preferred approach would be to simply expose writeIPCPayload through the Payload object instead of adding the WithSchemalessOutput option and so on.

Well, the preferred approach would be that BigQuery accept a valid IPC stream with the Schema message 😄 but this works too

@alvarowolfx
Contributor Author

Yea, the preferred approach would be to simply expose writeIPCPayload through the Payload object instead of adding the WithSchemalessOutput option and so on.

Agreed, doing this right now: exposing WritePayload, which accepts an io.Writer and makes it more flexible.

Well, the preferred approach would be that BigQuery accept a valid IPC stream with the Schema message 😄 but this works too

Still trying to win this fight here, but I brought up the reasons why it currently works this way.

@zeroshade
Member

Still trying to win this fight here, but I brought up the reasons why it currently works this way.

Absolutely, I appreciate the context and feel free to have anyone internal with more questions reach out to me if they want more info. I'm gonna hope you can win that fight 😄

@alvarowolfx alvarowolfx changed the title feat: add ipc.NewRecordBatchWriter feat: expose Payload.WritePayload to allow serializing into IPC format Jun 25, 2025
@alvarowolfx
Contributor Author

alvarowolfx commented Jun 25, 2025

Pushed updates to the PR and updated the description.

@zeroshade
Member

zeroshade left a comment

just a nitpick

Comment thread: arrow/ipc/ipc.go (outdated)
ensureNativeEndian bool
noAutoSchema bool
emitDictDeltas bool
skipEmittingSchema bool
@zeroshade
Member

can we remove this?

Comment thread: arrow/ipc/writer.go (outdated), on lines +94 to +96
lastWrittenDicts map[int64]arrow.Array
emitDictDeltas bool
skipEmittingSchema bool
@zeroshade
Member

Same as above, can we revert the changes for skipEmittingSchema since we are just adding the WritePayload method?

@alvarowolfx
Contributor Author

just a nitpick

Ah sorry, I'm stupid, I forgot those changes. Let me remove them here.

@alvarowolfx alvarowolfx requested a review from zeroshade June 25, 2025 20:06
@zeroshade
Member

zeroshade left a comment

Thanks for this and the discussion. Looking forward to more discussion with BigQuery in the future!

@zeroshade zeroshade merged commit c542dd6 into apache:main Jun 25, 2025
23 checks passed