Skip to content

Commit d5f0c34

Browse files
fix(parquet/pqarrow): return an error on pqarrow write calls if the writer is already closed (#728)
### Rationale for this change Currently, the pqarrow FileWriter ignores the `closed` status of the FileWriter and write calls are attempted, which can lead to surprising and hard-to-understand errors as noted in issue #727. Returning a clear error stating the `FileWriter` is already closed should help nudge users about their misuse of the API (or provide an indicator of a potential race condition between invocations of Write calls and Close). ### What changes are included in this PR? Adds a check to all `FileWriter` write methods to validate whether the `FileWriter` is already closed, returning a clear error and short-circuiting the write execution. ### Are these changes tested? Yes, a unit test was added to validate the behavior. ### Are there any user-facing changes? resolves #727
1 parent f831c1a commit d5f0c34

2 files changed

Lines changed: 34 additions & 0 deletions

File tree

parquet/pqarrow/file_writer.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,9 @@ func (fw *FileWriter) NumRows() int {
187187
// More memory is utilized compared to Write as the whole row group data is kept in memory before it's written
188188
// since Parquet files must have an entire column written before writing the next column.
189189
func (fw *FileWriter) WriteBuffered(rec arrow.RecordBatch) error {
190+
if fw.closed {
191+
return fmt.Errorf("WriteBuffered called on already closed FileWriter")
192+
}
190193
if !rec.Schema().Equal(fw.schema) {
191194
return fmt.Errorf("record schema does not match writer's. \nrecord: %s\nwriter: %s", rec.Schema(), fw.schema)
192195
}
@@ -241,6 +244,9 @@ func (fw *FileWriter) WriteBuffered(rec arrow.RecordBatch) error {
241244
// * a highly-restricted memory environment
242245
// * very large records with lots of rows (potentially close to the max row group length)
243246
func (fw *FileWriter) Write(rec arrow.RecordBatch) error {
247+
if fw.closed {
248+
return fmt.Errorf("invalid write call: FileWriter is already closed")
249+
}
244250
if !rec.Schema().Equal(fw.schema) {
245251
return fmt.Errorf("record schema does not match writer's. \nrecord: %s\nwriter: %s", rec.Schema(), fw.schema)
246252
}
@@ -276,6 +282,9 @@ func (fw *FileWriter) Write(rec arrow.RecordBatch) error {
276282
// row group for each chunk of chunkSize rows in the table. Calling this with 0 rows will
277283
// still write a 0 length Row Group to the file.
278284
func (fw *FileWriter) WriteTable(tbl arrow.Table, chunkSize int64) error {
285+
if fw.closed {
286+
return fmt.Errorf("invalid write call: FileWriter is already closed")
287+
}
279288
if chunkSize <= 0 && tbl.NumRows() > 0 {
280289
return xerrors.New("chunk size per row group must be greater than 0")
281290
} else if !tbl.Schema().Equal(fw.schema) {
@@ -344,6 +353,9 @@ func (fw *FileWriter) Close() error {
344353
// building of writing columns to a file via arrow data without needing to already have
345354
// a record or table.
346355
func (fw *FileWriter) WriteColumnChunked(data *arrow.Chunked, offset, size int64) error {
356+
if fw.closed {
357+
return fmt.Errorf("invalid write call: FileWriter is already closed")
358+
}
347359
acw, err := newArrowColumnWriter(data, offset, size, fw.manifest, fw.rgw, fw.colIdx)
348360
if err != nil {
349361
return err
@@ -356,6 +368,9 @@ func (fw *FileWriter) WriteColumnChunked(data *arrow.Chunked, offset, size int64
356368
// it is based on the current column of the row group writer allowing progressive building
357369
// of the file by columns without needing a full record or table to write.
358370
func (fw *FileWriter) WriteColumnData(data arrow.Array) error {
371+
if fw.closed {
372+
return fmt.Errorf("invalid write call: FileWriter is already closed")
373+
}
359374
chunked := arrow.NewChunked(data.DataType(), []arrow.Array{data})
360375
defer chunked.Release()
361376
return fw.WriteColumnChunked(chunked, 0, int64(data.Len()))

parquet/pqarrow/file_writer_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,3 +208,22 @@ func TestFileWriterTotalBytesBuffered(t *testing.T) {
208208
assert.Equal(t, int64(596), writer.TotalCompressedBytes())
209209
assert.Equal(t, int64(1306), writer.TotalBytesWritten())
210210
}
211+
212+
func TestWriteOnClosedFileWriter(t *testing.T) {
213+
schema := arrow.NewSchema([]arrow.Field{
214+
{Name: "one", Nullable: true, Type: arrow.PrimitiveTypes.Float64},
215+
}, nil)
216+
217+
output := &bytes.Buffer{}
218+
writer, err := pqarrow.NewFileWriter(schema, output, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
219+
require.NoError(t, err)
220+
221+
// Close the writer
222+
require.NoError(t, writer.Close())
223+
224+
// Call each write method and ensure they all return an error stating the writer is already closed
225+
require.ErrorContains(t, writer.WriteBuffered(nil), "already closed")
226+
require.ErrorContains(t, writer.Write(nil), "already closed")
227+
require.ErrorContains(t, writer.WriteColumnChunked(nil, 0, 0), "already closed")
228+
require.ErrorContains(t, writer.WriteColumnData(nil), "already closed")
229+
}

0 commit comments

Comments
 (0)