1212import lombok .AllArgsConstructor ;
1313import lombok .Data ;
1414import lombok .RequiredArgsConstructor ;
15- import org .apache .catalina .connector .ClientAbortException ;
1615import org .slf4j .Logger ;
1716import org .slf4j .LoggerFactory ;
1817import org .springframework .beans .factory .annotation .Value ;
1918import org .springframework .stereotype .Service ;
2019
2120import java .io .BufferedInputStream ;
22- import java .io .EOFException ;
2321import java .io .InputStream ;
2422import java .util .ArrayList ;
2523import java .util .List ;
2624import java .util .concurrent .CompletableFuture ;
27- import java .util .concurrent .atomic .AtomicInteger ;
25+ import java .util .concurrent .atomic .AtomicLong ;
2826
2927@ Service
3028@ RequiredArgsConstructor
@@ -47,12 +45,12 @@ public JsonStreamingLoadResponse loadFromJsonStream(
4745 Class <T > type ,
4846 InvalidDataHandlerService <T > invalidDataHandlerService ,
4947 UpdateStrategy updateStrategy ,
50- PreWriteTriggerService <T > pretrigger ,
51- PostWriteTriggerService <T > posttrigger ) {
48+ PreWriteTriggerService <T > preTrigger ,
49+ PostWriteTriggerService <T > postTrigger ) throws DataLoadException {
5250
53- AtomicInteger updates = new AtomicInteger (0 );
54- AtomicInteger deletes = new AtomicInteger (0 );
55- AtomicInteger inserts = new AtomicInteger (0 );
51+ AtomicLong updates = new AtomicLong (0 );
52+ AtomicLong deletes = new AtomicLong (0 );
53+ AtomicLong inserts = new AtomicLong (0 );
5654 List <T > toSave = new ArrayList <>();
5755 List <CompletableFuture <BulkWriteResult >> futures = new ArrayList <>();
5856
@@ -70,9 +68,9 @@ public JsonStreamingLoadResponse loadFromJsonStream(
7068
7169 T document = objectMapper .treeToValue (node , type );
7270
73- if (pretrigger != null ) {
71+ if (preTrigger != null ) {
7472 // For a mutable model
75- pretrigger .modifyMutableDataPreWrite (document );
73+ preTrigger .modifyMutableDataPreWrite (document );
7674 // for an immutable model
7775 // document = pretrigger.newImmutableDataPreWritedocument);
7876 }
@@ -86,7 +84,7 @@ public JsonStreamingLoadResponse loadFromJsonStream(
8684 futures .add (
8785 repository
8886 .asyncWriteMany (
89- copyOfToSave , type , invalidDataHandlerService , updateStrategy , posttrigger )
87+ copyOfToSave , type , invalidDataHandlerService , updateStrategy , postTrigger )
9088 .thenApply (
9189 bulkWriteResult -> {
9290 updates .addAndGet (bulkWriteResult .getModifiedCount ());
@@ -103,7 +101,7 @@ public JsonStreamingLoadResponse loadFromJsonStream(
103101 futures .add (
104102 repository
105103 .asyncWriteMany (
106- toSave , type , invalidDataHandlerService , updateStrategy , posttrigger )
104+ toSave , type , invalidDataHandlerService , updateStrategy , postTrigger )
107105 .thenApply (
108106 bulkWriteResult -> {
109107 updates .addAndGet (bulkWriteResult .getModifiedCount ());
@@ -122,22 +120,23 @@ public JsonStreamingLoadResponse loadFromJsonStream(
122120 LOG .info ("Processed {} docs. Time taken: {}ms." , count , endTime - startTime );
123121 LOG .info ("Modified: {} Added: {} Removed: {}" , updates , inserts , deletes );
124122 return new JsonStreamingLoadResponse (updates .get (), deletes .get (), inserts .get (), true , "" );
125- } catch (EOFException | ClientAbortException eofe ) {
126- LOG .error ("Load Terminated as sender disconnected: {}" , eofe .getMessage ());
127- return null ;
128123 } catch (Exception e ) {
129124 LOG .error ("Error during data load process: {}" , e .getMessage ());
130- return new JsonStreamingLoadResponse (
131- updates .get (), deletes .get (), inserts .get (), false , e .getMessage ());
125+ throw new DataLoadException (
126+ updates .get (),
127+ deletes .get (),
128+ inserts .get (),
129+ "Error during data load process: " + e .getMessage (),
130+ e );
132131 }
133132 }
134133
135134 @ Data
136135 @ AllArgsConstructor
137136 public static class JsonStreamingLoadResponse {
138- int updates ;
139- int deletes ;
140- int inserts ;
137+ long updates ;
138+ long deletes ;
139+ long inserts ;
141140 boolean success ;
142141 String message ;
143142 }
0 commit comments