@@ -15,6 +15,9 @@
  */
 package org.openrewrite;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
 import com.univocity.parsers.csv.CsvParser;
 import com.univocity.parsers.csv.CsvParserSettings;
 import com.univocity.parsers.csv.CsvWriter;
@@ -28,6 +31,7 @@
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
@@ -44,20 +48,34 @@
  * Supports configurable output stream creation (e.g., for GZIP compression)
  * and static prefix/suffix columns (e.g., repository metadata).
  *
+ * <p>
+ * When {@link #getRows} is called, open writers for the requested table are
+ * <em>closed</em> (not merely flushed) so that compression trailers such as
+ * the GZIP footer are written, producing a fully valid file on disk. If
+ * {@link #insertRow} is called again for the same table, a new writer is
+ * created in append mode. For this reason the {@code outputStreamFactory}
+ * <strong>must</strong> open the stream with {@link java.nio.file.StandardOpenOption#CREATE
+ * CREATE} and {@link java.nio.file.StandardOpenOption#APPEND APPEND} semantics
+ * so that data written before the close is preserved. For compressed streams
+ * this produces a multi-member archive (e.g., concatenated GZIP members),
+ * which {@link java.util.zip.GZIPInputStream} handles transparently.
+ *
  * <pre>{@code
  * // Plain CSV
  * new CsvDataTableStore(outputDir)
 *
 * // GZIP compressed with repository columns (write-only)
 * new CsvDataTableStore(outputDir,
- *     path -> new GZIPOutputStream(Files.newOutputStream(path)),
+ *     path -> new GZIPOutputStream(Files.newOutputStream(path,
+ *             StandardOpenOption.CREATE, StandardOpenOption.APPEND)),
 *     ".csv.gz",
 *     Map.of("repositoryOrigin", origin, "repositoryPath", path),
 *     Map.of("org1", orgValue))
 *
 * // GZIP compressed with read-back support
 * new CsvDataTableStore(outputDir,
- *     path -> new GZIPOutputStream(Files.newOutputStream(path)),
+ *     path -> new GZIPOutputStream(Files.newOutputStream(path,
+ *             StandardOpenOption.CREATE, StandardOpenOption.APPEND)),
 *     path -> new GZIPInputStream(Files.newInputStream(path)),
 *     ".csv.gz",
 *     Map.of("repositoryOrigin", origin, "repositoryPath", path),
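The multi-member behavior the Javadoc above relies on is plain JDK behavior and can be checked independently of this change. A minimal sketch (not part of this commit; the file name and row contents are illustrative): each writer close emits a complete GZIP trailer, and a single `GZIPInputStream` then reads the concatenated members back as one logical stream.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class MultiMemberGzipDemo {
    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("rows", ".csv.gz");

        // Two writer lifetimes against the same file. Each close() writes a
        // complete GZIP trailer, so the file ends up holding two concatenated
        // GZIP members rather than one truncated stream.
        for (String chunk : new String[]{"row1\n", "row2\n"}) {
            try (GZIPOutputStream out = new GZIPOutputStream(Files.newOutputStream(
                    file, StandardOpenOption.CREATE, StandardOpenOption.APPEND))) {
                out.write(chunk.getBytes(StandardCharsets.UTF_8));
            }
        }

        // GZIPInputStream decompresses concatenated members as one stream.
        try (InputStream in = new GZIPInputStream(Files.newInputStream(file))) {
            System.out.print(new String(in.readAllBytes(), StandardCharsets.UTF_8));
            // prints "row1" and "row2" on separate lines
        }
    }
}
```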
@@ -73,6 +91,8 @@ public class CsvDataTableStore implements DataTableStore, AutoCloseable {
     private final Map<String, String> prefixColumns;
     private final Map<String, String> suffixColumns;
     private final ConcurrentHashMap<String, BucketWriter> writers = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, RowMetadata> rowMetadata = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, DataTable<?>> knownTables = new ConcurrentHashMap<>();
 
     /**
      * Create a store that writes plain CSV files.
@@ -112,7 +132,11 @@ public CsvDataTableStore(Path outputDir,
      * and additional static columns prepended/appended to each row.
      *
      * @param outputDir           directory to write files into
-     * @param outputStreamFactory creates an output stream for each file path (e.g., wrapping with GZIPOutputStream)
+     * @param outputStreamFactory creates an output stream for each file path. <strong>Must</strong> use
+     *                            {@link java.nio.file.StandardOpenOption#CREATE CREATE} and
+     *                            {@link java.nio.file.StandardOpenOption#APPEND APPEND} so that
+     *                            rows written before a mid-run {@link #getRows} call are preserved
+     *                            when the writer is re-opened for subsequent inserts.
      * @param inputStreamFactory  creates an input stream for each file path (e.g., wrapping with GZIPInputStream)
      * @param fileExtension       file extension including dot (e.g., ".csv" or ".csv.gz")
      * @param prefixColumns       static columns prepended to each row, in insertion order
@@ -139,7 +163,7 @@ public CsvDataTableStore(Path outputDir,
 
     private static OutputStream defaultOutputStream(Path path) {
         try {
-            return Files.newOutputStream(path);
+            return Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
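For context on why this factory change matters: with no options, `Files.newOutputStream` defaults to CREATE + TRUNCATE_EXISTING + WRITE, so re-opening a bucket file after a mid-run `getRows` would have silently discarded everything written earlier. A standalone illustration (temp file and row strings are hypothetical):

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class AppendVsTruncateDemo {
    public static void main(String[] args) throws IOException {
        Path f = Files.createTempFile("bucket", ".csv");

        Files.write(f, "first\n".getBytes(StandardCharsets.UTF_8));
        // No options means CREATE + TRUNCATE_EXISTING + WRITE:
        try (OutputStream out = Files.newOutputStream(f)) {
            out.write("second\n".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println(Files.readAllLines(f)); // [second] -- "first" was lost

        Files.write(f, "first\n".getBytes(StandardCharsets.UTF_8));
        // CREATE + APPEND preserves rows written by an earlier writer:
        try (OutputStream out = Files.newOutputStream(f,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            out.write("second\n".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println(Files.readAllLines(f)); // [first, second]
    }
}
```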
@@ -155,32 +179,53 @@ private static InputStream defaultInputStream(Path path) {
 
     @Override
     public <Row> void insertRow(DataTable<Row> dataTable, ExecutionContext ctx, Row row) {
+        String metaKey = metaKey(dataTable.getName(), dataTable.getGroup());
+        rowMetadata.computeIfAbsent(metaKey, k -> RowMetadata.from(dataTable));
+        knownTables.putIfAbsent(fileKey(dataTable), dataTable);
         String fileKey = fileKey(dataTable);
         BucketWriter writer = writers.computeIfAbsent(fileKey, k -> createBucketWriter(dataTable));
         writer.writeRow(row);
     }
 
     @Override
     public Stream<?> getRows(String dataTableName, @Nullable String group) {
-        // Flush any open writers for this data table so all rows are on disk
-        for (BucketWriter writer : writers.values()) {
+        // Close (not just flush) matching writers so that compression trailers
+        // (e.g., GZIP footer) are written, making the files fully readable.
+        // Removed writers will be lazily re-created in append mode on the next insertRow().
+        Iterator<Map.Entry<String, BucketWriter>> it = writers.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<String, BucketWriter> entry = it.next();
+            BucketWriter writer = entry.getValue();
             if (writer.dataTable.getName().equals(dataTableName) &&
                 Objects.equals(writer.dataTable.getGroup(), group)) {
-                writer.flush();
+                writer.close();
+                it.remove();
             }
         }
 
-        List<String[]> allRows = new ArrayList<>();
+        RowMetadata meta = rowMetadata.get(metaKey(dataTableName, group));
+
+        List<Object> allRows = new ArrayList<>();
         //noinspection DataFlowIssue
         File[] files = outputDir.toFile().listFiles((dir, name) -> name.endsWith(fileExtension));
         if (files == null) {
             return Stream.empty();
         }
 
+        // Build set of file paths with still-open writers (other tables).
+        // These files have incomplete compression trailers and cannot be read.
+        Set<Path> activeWriterPaths = new HashSet<>();
+        for (Map.Entry<String, BucketWriter> entry : writers.entrySet()) {
+            activeWriterPaths.add(outputDir.resolve(entry.getKey() + fileExtension));
+        }
+
         int prefixCount = prefixColumns.size();
         int suffixCount = suffixColumns.size();
 
         for (File file : files) {
+            if (activeWriterPaths.contains(file.toPath())) {
+                continue;
+            }
             try (InputStream is = inputStreamFactory.apply(file.toPath())) {
                 DataTableDescriptor descriptor = readDescriptor(is);
                 if (descriptor == null ||
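The switch from a for-each loop to an explicit entry-set iterator is what makes the removal above safe: `ConcurrentHashMap` iterators are weakly consistent, support `remove()`, and never throw `ConcurrentModificationException` even when other threads insert concurrently. A minimal single-threaded sketch of the pattern (table names are illustrative):

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SelectiveRemovalDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, String> writers = new ConcurrentHashMap<>();
        writers.put("tableA", "writerA");
        writers.put("tableB", "writerB");

        // Remove only the entries matching a predicate while iterating.
        Iterator<Map.Entry<String, String>> it = writers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            if (entry.getKey().equals("tableA")) {
                // close the writer here, then drop it from the map
                it.remove();
            }
        }
        System.out.println(writers); // {tableB=writerB}
    }
}
```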
@@ -191,7 +236,7 @@ public Stream<?> getRows(String dataTableName, @Nullable String group) {
                 // readDescriptor consumed comment lines; now parse the remaining CSV
                 // (header + data rows). Re-read the full file with CsvParser.
             } catch (IOException e) {
-                continue;
+                throw new UncheckedIOException(e);
             }
 
             try (InputStream is = inputStreamFactory.apply(file.toPath())) {
@@ -205,13 +250,14 @@ public Stream<?> getRows(String dataTableName, @Nullable String group) {
                 while ((row = parser.parseNext()) != null) {
                     // Strip prefix and suffix columns, returning only data table columns
                     int dataCount = row.length - prefixCount - suffixCount;
+                    String[] dataRow;
                     if (dataCount <= 0) {
-                        allRows.add(row);
+                        dataRow = row;
                     } else {
-                        String[] dataRow = new String[dataCount];
+                        dataRow = new String[dataCount];
                         System.arraycopy(row, prefixCount, dataRow, 0, dataCount);
-                        allRows.add(dataRow);
                     }
+                    allRows.add(meta != null ? meta.toRow(dataRow) : dataRow);
                 }
                 parser.stopParsing();
             } catch (IOException e) {
@@ -224,11 +270,7 @@ public Stream<?> getRows(String dataTableName, @Nullable String group) {
 
     @Override
     public Collection<DataTable<?>> getDataTables() {
-        List<DataTable<?>> result = new ArrayList<>(writers.size());
-        for (BucketWriter writer : writers.values()) {
-            result.add(writer.dataTable);
-        }
-        return Collections.unmodifiableCollection(result);
+        return Collections.unmodifiableCollection(knownTables.values());
     }
 
     @Override
@@ -242,6 +284,7 @@ public void close() {
     private BucketWriter createBucketWriter(DataTable<?> dataTable) {
         String fileKey = fileKey(dataTable);
         Path path = outputDir.resolve(fileKey + fileExtension);
+        boolean append = Files.exists(path);
 
         DataTableDescriptor descriptor = dataTableDescriptorFromDataTable(dataTable);
         List<String> fieldNames = new ArrayList<>();
@@ -259,15 +302,17 @@ private BucketWriter createBucketWriter(DataTable<?> dataTable) {
         OutputStream os = outputStreamFactory.apply(path);
         try {
             CsvWriterSettings settings = new CsvWriterSettings();
-            settings.setHeaderWritingEnabled(true);
+            settings.setHeaderWritingEnabled(!append);
             settings.getFormat().setComment('#');
             CsvWriter csvWriter = new CsvWriter(os, settings);
 
-            // Write metadata as comments
-            csvWriter.commentRow("@name " + dataTable.getName());
-            csvWriter.commentRow("@instanceName " + dataTable.getInstanceName());
-            csvWriter.commentRow("@group " + (dataTable.getGroup() != null ? dataTable.getGroup() : ""));
-            csvWriter.writeHeaders(headers);
+            if (!append) {
+                // Write metadata as comments only for new files
+                csvWriter.commentRow("@name " + dataTable.getName());
+                csvWriter.commentRow("@instanceName " + dataTable.getInstanceName());
+                csvWriter.commentRow("@group " + (dataTable.getGroup() != null ? dataTable.getGroup() : ""));
+                csvWriter.writeHeaders(headers);
+            }
 
             return new BucketWriter(dataTable, csvWriter, os, fieldNames, headers.size());
         } catch (Exception e) {
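The append branch above can be exercised in isolation with the same univocity-parsers calls this class uses. A sketch under assumed metadata (the table name "my.table" and columns col1/col2 are placeholders): the first pass writes comment rows and headers into a fresh file, the second pass re-opens it in append mode and writes data rows only.

```java
import com.univocity.parsers.csv.CsvWriter;
import com.univocity.parsers.csv.CsvWriterSettings;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class AppendModeCsvDemo {
    public static void main(String[] args) throws IOException {
        Path path = Files.createTempFile("bucket", ".csv");
        boolean append = false;

        for (int run = 0; run < 2; run++) {
            CsvWriterSettings settings = new CsvWriterSettings();
            settings.setHeaderWritingEnabled(!append); // suppress header on re-open
            settings.getFormat().setComment('#');

            OutputStream os = Files.newOutputStream(path,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            CsvWriter writer = new CsvWriter(os, settings);
            if (!append) {
                // metadata comments only go into a brand-new file
                writer.commentRow("@name my.table");
                writer.writeHeaders("col1", "col2");
            }
            writer.writeRow("a" + run, "b" + run);
            writer.close(); // also closes the underlying stream
            append = true;
        }
        // Expect one '#'-prefixed comment row, one header row, then two data
        // rows -- no duplicated header from the second (append) pass.
        Files.readAllLines(path).forEach(System.out::println);
    }
}
```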
@@ -326,19 +371,57 @@ synchronized void writeRow(Object row) {
             csvWriter.writeRow((Object[]) values);
         }
 
-        synchronized void flush() {
-            csvWriter.flush();
+        void close() {
+            csvWriter.close();
             try {
-                os.flush();
+                os.close();
             } catch (IOException ignored) {
             }
         }
+    }
 
-        void close() {
-            csvWriter.close();
+    private static String metaKey(String dataTableName, @Nullable String group) {
+        return dataTableName + "\0" + (group != null ? group : "");
+    }
+
+    /**
+     * Holds the row class and its @Column field names so that
+     * String[] rows read from CSV can be deserialized back to typed objects
+     * via Jackson's {@link ObjectMapper#convertValue}.
+     */
+    private static class RowMetadata {
+        private static final ObjectMapper MAPPER = new ObjectMapper()
+                .registerModule(new ParameterNamesModule())
+                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+        final String rowClassName;
+        final List<String> fieldNames;
+
+        RowMetadata(String rowClassName, List<String> fieldNames) {
+            this.rowClassName = rowClassName;
+            this.fieldNames = fieldNames;
+        }
+
+        static RowMetadata from(DataTable<?> dataTable) {
+            Class<?> rowClass = dataTable.getType();
+            List<String> names = new ArrayList<>();
+            for (Field f : rowClass.getDeclaredFields()) {
+                if (f.isAnnotationPresent(Column.class)) {
+                    names.add(f.getName());
+                }
+            }
+            return new RowMetadata(rowClass.getName(), names);
+        }
+
+        Object toRow(String[] values) {
+            Map<String, String> map = new LinkedHashMap<>();
+            for (int i = 0; i < fieldNames.size(); i++) {
+                map.put(fieldNames.get(i), i < values.length ? values[i] : "");
+            }
             try {
-                os.close();
-            } catch (IOException ignored) {
+                return MAPPER.convertValue(map, Class.forName(rowClassName));
+            } catch (ClassNotFoundException e) {
+                throw new IllegalStateException("Row class not found: " + rowClassName, e);
             }
         }
     }
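The conversion in `toRow` leans on Jackson's `ObjectMapper.convertValue` to bind a map of CSV cells onto the row type. A self-contained sketch of that technique (`MyRow` is a stand-in with public fields; real row types bound through an immutable constructor need the registered `ParameterNamesModule` plus the `-parameters` compiler flag for name-based binding):

```java
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;

import java.util.LinkedHashMap;
import java.util.Map;

public class ConvertValueDemo {
    // Stand-in row type; real row classes carry @Column annotations.
    static class MyRow {
        public String sourcePath;
        public int matchCount;

        @Override
        public String toString() {
            return sourcePath + ": " + matchCount;
        }
    }

    public static void main(String[] args) {
        ObjectMapper mapper = new ObjectMapper()
                .registerModule(new ParameterNamesModule())
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        // CSV cells arrive as strings; Jackson coerces "42" to int during binding.
        Map<String, String> cells = new LinkedHashMap<>();
        cells.put("sourcePath", "src/Main.java");
        cells.put("matchCount", "42");

        MyRow row = mapper.convertValue(cells, MyRow.class);
        System.out.println(row); // src/Main.java: 42
    }
}
```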