1515 */
1616package io .gravitee .policy .callout ;
1717
18+ import static io .gravitee .common .util .VertxProxyOptionsUtils .setSystemProxy ;
19+ import static java .util .stream .Collectors .toList ;
20+
21+ import io .gravitee .gateway .reactive .api .ExecutionFailure ;
1822import io .gravitee .gateway .reactive .api .context .HttpExecutionContext ;
19- import io .gravitee .gateway .reactive .api .context .MessageExecutionContext ;
2023import io .gravitee .gateway .reactive .api .policy .Policy ;
24+ import io .gravitee .node .api .configuration .Configuration ;
2125import io .gravitee .policy .callout .configuration .CalloutHttpPolicyConfiguration ;
26+ import io .gravitee .policy .callout .configuration .HttpHeader ;
2227import io .gravitee .policy .v3 .callout .CalloutHttpPolicyV3 ;
2328import io .reactivex .rxjava3 .core .Completable ;
24- import org .slf4j .Logger ;
25- import org .slf4j .LoggerFactory ;
29+ import io .reactivex .rxjava3 .core .Flowable ;
30+ import io .reactivex .rxjava3 .core .Maybe ;
31+ import io .reactivex .rxjava3 .core .Single ;
32+ import io .vertx .core .http .HttpClientOptions ;
33+ import io .vertx .core .http .HttpHeaders ;
34+ import io .vertx .core .http .RequestOptions ;
35+ import io .vertx .rxjava3 .core .Vertx ;
36+ import io .vertx .rxjava3 .core .buffer .Buffer ;
37+ import java .net .URI ;
38+ import java .util .List ;
39+ import java .util .Optional ;
40+ import lombok .extern .slf4j .Slf4j ;
2641
2742/**
2843 * @author David BRASSELY (david.brassely at graviteesource.com)
2944 * @author GraviteeSource Team
3045 */
46+ @ Slf4j
3147public class CalloutHttpPolicy extends CalloutHttpPolicyV3 implements Policy {
3248
3349 public CalloutHttpPolicy (CalloutHttpPolicyConfiguration configuration ) {
@@ -41,21 +57,151 @@ public String id() {
4157
4258 @ Override
4359 public Completable onRequest (HttpExecutionContext ctx ) {
44- return Policy . super . onRequest ( ctx );
60+ return Completable . defer (() -> doCallOut ( ctx ) );
4561 }
4662
4763 @ Override
4864 public Completable onResponse (HttpExecutionContext ctx ) {
49- return Policy . super . onResponse ( ctx );
65+ return Completable . defer (() -> doCallOut ( ctx ) );
5066 }
5167
52- @ Override
53- public Completable onMessageRequest (MessageExecutionContext ctx ) {
54- return Policy .super .onMessageRequest (ctx );
68+ private Completable doCallOut (HttpExecutionContext ctx ) {
69+ var templateEngine = ctx .getTemplateEngine ();
70+ var vertx = ctx .getComponent (Vertx .class );
71+
72+ var url = templateEngine .eval (configuration .getUrl (), String .class ).switchIfEmpty (Single .just (configuration .getUrl ()));
73+ var body = configuration .getBody () != null
74+ ? templateEngine
75+ .eval (configuration .getBody (), String .class )
76+ .map (Optional ::of )
77+ .switchIfEmpty (Single .just (Optional .ofNullable (configuration .getBody ())))
78+ : Single .just (Optional .<String >empty ());
79+ var headers = Flowable
80+ .fromIterable (configuration .getHeaders ())
81+ .flatMap (header -> {
82+ if (header .getValue () != null ) {
83+ return templateEngine
84+ .eval (header .getValue (), String .class )
85+ .map (value -> new HttpHeader (header .getName (), value ))
86+ .switchIfEmpty (Single .just (header ))
87+ .toFlowable ();
88+ }
89+ return Flowable .empty ();
90+ })
91+ .collect (toList ());
92+
93+ return Single
94+ .zip (url , body , headers , Req ::new )
95+ .flatMap (reqConfig -> {
96+ var target = URI .create (reqConfig .url );
97+ var httpClient = vertx .createHttpClient (buildHttpClientOptions (ctx , target ));
98+ var requestOpts = new RequestOptions ().setAbsoluteURI (reqConfig .url ).setMethod (convert (configuration .getMethod ()));
99+
100+ return httpClient
101+ .rxRequest (requestOpts )
102+ .flatMap (req -> {
103+ if (reqConfig .headerList () != null ) {
104+ reqConfig
105+ .headerList ()
106+ .stream ()
107+ .filter (header -> header .getValue () != null )
108+ .forEach (header -> req .putHeader (header .getName (), header .getValue ()));
109+ }
110+
111+ if (reqConfig .body ().isPresent () && !reqConfig .body ().get ().isEmpty ()) {
112+ req .headers ().remove (HttpHeaders .TRANSFER_ENCODING );
113+ // Removing Content-Length header to let VertX automatically set it correctly
114+ req .headers ().remove (HttpHeaders .CONTENT_LENGTH );
115+ return req .rxSend (Buffer .buffer (reqConfig .body ().get ()));
116+ }
117+
118+ return req .send ();
119+ });
120+ })
121+ .onErrorResumeNext (throwable -> Single .error (new CalloutException (throwable )))
122+ .flatMap (httpClientResponse ->
123+ httpClientResponse
124+ .body ()
125+ .map (responseBody -> new CalloutResponse (httpClientResponse .getDelegate (), responseBody .toString ()))
126+ )
127+ .flatMapCompletable (calloutResponse -> processCalloutResponse (ctx , calloutResponse ))
128+ .onErrorResumeNext (th -> {
129+ if (th instanceof CalloutException && configuration .isExitOnError ()) {
130+ return ctx .interruptWith (
131+ new ExecutionFailure (configuration .getErrorStatusCode ()).key (CALLOUT_HTTP_ERROR ).message (th .getCause ().getMessage ())
132+ );
133+ }
134+ return Completable .error (th );
135+ });
55136 }
56137
57- @ Override
58- public Completable onMessageResponse (MessageExecutionContext ctx ) {
59- return Policy .super .onMessageResponse (ctx );
138+ private HttpClientOptions buildHttpClientOptions (HttpExecutionContext ctx , URI target ) {
139+ var options = new HttpClientOptions ();
140+ if (HTTPS_SCHEME .equalsIgnoreCase (target .getScheme ())) {
141+ options .setSsl (true ).setTrustAll (true ).setVerifyHost (false );
142+ }
143+
144+ if (configuration .isUseSystemProxy ()) {
145+ Configuration configuration = ctx .getComponent (Configuration .class );
146+ try {
147+ setSystemProxy (options , configuration );
148+ } catch (IllegalStateException e ) {
149+ log .warn (
150+ "CalloutHttp requires a system proxy to be defined but some configurations are missing or not well defined: {}. Ignoring proxy" ,
151+ e .getMessage ()
152+ );
153+ }
154+ }
155+ return options ;
156+ }
157+
158+ private Completable processCalloutResponse (HttpExecutionContext ctx , CalloutResponse calloutResponse ) {
159+ if (configuration .isFireAndForget ()) {
160+ return Completable .complete ();
161+ }
162+
163+ // Variables and exit on error are only managed if the fire & forget is disabled.
164+ ctx .getTemplateEngine ().getTemplateContext ().setVariable (TEMPLATE_VARIABLE , calloutResponse );
165+
166+ if (configuration .isExitOnError ()) {
167+ return ctx
168+ .getTemplateEngine ()
169+ .eval (configuration .getErrorCondition (), Boolean .class )
170+ .flatMapCompletable (exit -> {
171+ if (!exit ) {
172+ return processSuccess (ctx );
173+ }
174+
175+ return processError (ctx );
176+ });
177+ }
178+
179+ return processSuccess (ctx );
60180 }
181+
182+ private Completable processSuccess (HttpExecutionContext ctx ) {
183+ return Flowable
184+ .fromIterable (configuration .getVariables ())
185+ .flatMapCompletable (variable -> {
186+ ctx .setAttribute (variable .getName (), null );
187+ return Maybe
188+ .just (variable .getValue ())
189+ .flatMap (value -> ctx .getTemplateEngine ().eval (value , String .class ))
190+ .doOnSuccess (value -> ctx .setAttribute (variable .getName (), value ))
191+ .ignoreElement ();
192+ })
193+ .doOnComplete (() -> ctx .getTemplateEngine ().getTemplateContext ().setVariable (TEMPLATE_VARIABLE , null ));
194+ }
195+
196+ private Completable processError (HttpExecutionContext ctx ) {
197+ return Maybe
198+ .fromSupplier (configuration ::getErrorContent )
199+ .flatMap (content -> ctx .getTemplateEngine ().eval (content , String .class ))
200+ .switchIfEmpty (Single .just ("Request is terminated." ))
201+ .flatMapCompletable (errorContent ->
202+ ctx .interruptWith (new ExecutionFailure (configuration .getErrorStatusCode ()).key (CALLOUT_EXIT_ON_ERROR ).message (errorContent ))
203+ );
204+ }
205+
206+ record Req (String url , Optional <String > body , List <HttpHeader > headerList ) {}
61207}
0 commit comments