2020import org .apache .dubbo .common .extension .Activate ;
2121import org .apache .dubbo .common .logger .Logger ;
2222import org .apache .dubbo .common .logger .LoggerFactory ;
23- import org .apache .dubbo .common .utils .ArrayUtils ;
2423import org .apache .dubbo .common .utils .ConcurrentHashSet ;
2524import org .apache .dubbo .common .utils .ConfigUtils ;
2625import org .apache .dubbo .common .utils .NamedThreadFactory ;
3029import org .apache .dubbo .rpc .Result ;
3130import org .apache .dubbo .rpc .RpcContext ;
3231import org .apache .dubbo .rpc .RpcException ;
33-
34- import com .alibaba .fastjson .JSON ;
32+ import org .apache .dubbo .rpc .support .AccessLogData ;
3533
3634import java .io .File ;
3735import java .io .FileWriter ;
36+ import java .io .IOException ;
37+ import java .text .DateFormat ;
3838import java .text .SimpleDateFormat ;
3939import java .util .Date ;
4040import java .util .Iterator ;
4141import java .util .Map ;
4242import java .util .Set ;
4343import java .util .concurrent .ConcurrentHashMap ;
44- import java .util .concurrent .ConcurrentMap ;
4544import java .util .concurrent .Executors ;
4645import java .util .concurrent .ScheduledExecutorService ;
47- import java .util .concurrent .ScheduledFuture ;
4846import java .util .concurrent .TimeUnit ;
4947
5048/**
@@ -68,141 +66,134 @@ public class AccessLogFilter implements Filter {
6866
6967 private static final String ACCESS_LOG_KEY = "dubbo.accesslog" ;
7068
71- private static final String FILE_DATE_FORMAT = "yyyyMMdd" ;
72-
73- private static final String MESSAGE_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss" ;
74-
7569 private static final int LOG_MAX_BUFFER = 5000 ;
7670
7771 private static final long LOG_OUTPUT_INTERVAL = 5000 ;
7872
79- private final ConcurrentMap < String , Set < String >> logQueue = new ConcurrentHashMap < String , Set < String >>() ;
73+ private static final String FILE_DATE_FORMAT = "yyyyMMdd" ;
8074
81- private final ScheduledExecutorService logScheduled = Executors .newScheduledThreadPool (2 , new NamedThreadFactory ("Dubbo-Access-Log" , true ));
75+ // It's safe to declare it as singleton since it runs on single thread only
76+ private static final DateFormat FILE_NAME_FORMATTER = new SimpleDateFormat (FILE_DATE_FORMAT );
8277
83- private volatile ScheduledFuture <?> logFuture = null ;
78+ private static final Map < String , Set < AccessLogData >> logEntries = new ConcurrentHashMap < String , Set < AccessLogData >>() ;
8479
85- private void init () {
86- if (logFuture == null ) {
87- synchronized (logScheduled ) {
88- if (logFuture == null ) {
89- logFuture = logScheduled .scheduleWithFixedDelay (new LogTask (), LOG_OUTPUT_INTERVAL , LOG_OUTPUT_INTERVAL , TimeUnit .MILLISECONDS );
90- }
91- }
92- }
93- }
80+ private static final ScheduledExecutorService logScheduled = Executors .newSingleThreadScheduledExecutor (new NamedThreadFactory ("Dubbo-Access-Log" , true ));
9481
95- private void log (String accesslog , String logmessage ) {
96- init ();
97- Set <String > logSet = logQueue .get (accesslog );
98- if (logSet == null ) {
99- logQueue .putIfAbsent (accesslog , new ConcurrentHashSet <String >());
100- logSet = logQueue .get (accesslog );
101- }
102- if (logSet .size () < LOG_MAX_BUFFER ) {
103- logSet .add (logmessage );
104- }
82+ /**
83+ * Default constructor initialize demon thread for writing into access log file with names with access log key
84+ * defined in url <b>accesslog</b>
85+ */
86+ public AccessLogFilter () {
87+ logScheduled .scheduleWithFixedDelay (this ::writeLogToFile , LOG_OUTPUT_INTERVAL , LOG_OUTPUT_INTERVAL , TimeUnit .MILLISECONDS );
10588 }
10689
90+ /**
91+ * This method logs the access log for service method invocation call.
92+ *
93+ * @param invoker service
94+ * @param inv Invocation service method.
95+ * @return Result from service method.
96+ * @throws RpcException
97+ */
10798 @ Override
10899 public Result invoke (Invoker <?> invoker , Invocation inv ) throws RpcException {
109100 try {
110- String accesslog = invoker .getUrl ().getParameter (Constants .ACCESS_LOG_KEY );
111- if (ConfigUtils .isNotEmpty (accesslog )) {
112- RpcContext context = RpcContext .getContext ();
113- String serviceName = invoker .getInterface ().getName ();
114- String version = invoker .getUrl ().getParameter (Constants .VERSION_KEY );
115- String group = invoker .getUrl ().getParameter (Constants .GROUP_KEY );
116- StringBuilder sn = new StringBuilder ();
117- sn .append ("[" ).append (new SimpleDateFormat (MESSAGE_DATE_FORMAT ).format (new Date ())).append ("] " ).append (context .getRemoteHost ()).append (":" ).append (context .getRemotePort ())
118- .append (" -> " ).append (context .getLocalHost ()).append (":" ).append (context .getLocalPort ())
119- .append (" - " );
120- if (null != group && group .length () > 0 ) {
121- sn .append (group ).append ("/" );
122- }
123- sn .append (serviceName );
124- if (null != version && version .length () > 0 ) {
125- sn .append (":" ).append (version );
126- }
127- sn .append (" " );
128- sn .append (inv .getMethodName ());
129- sn .append ("(" );
130- Class <?>[] types = inv .getParameterTypes ();
131- if (types != null && types .length > 0 ) {
132- boolean first = true ;
133- for (Class <?> type : types ) {
134- if (first ) {
135- first = false ;
136- } else {
137- sn .append ("," );
138- }
139- sn .append (type .getName ());
140- }
141- }
142- sn .append (") " );
143- Object [] args = inv .getArguments ();
144- if (ArrayUtils .isNotEmpty (args )) {
145- sn .append (JSON .toJSONString (args ));
146- }
147- String msg = sn .toString ();
148- if (ConfigUtils .isDefault (accesslog )) {
149- LoggerFactory .getLogger (ACCESS_LOG_KEY + "." + invoker .getInterface ().getName ()).info (msg );
150- } else {
151- log (accesslog , msg );
152- }
101+ String accessLogKey = invoker .getUrl ().getParameter (Constants .ACCESS_LOG_KEY );
102+ if (ConfigUtils .isNotEmpty (accessLogKey )) {
103+ AccessLogData logData = buildAccessLogData (invoker , inv );
104+ log (accessLogKey , logData );
153105 }
154106 } catch (Throwable t ) {
155107 logger .warn ("Exception in AcessLogFilter of service(" + invoker + " -> " + inv + ")" , t );
156108 }
157109 return invoker .invoke (inv );
158110 }
159111
160- private class LogTask implements Runnable {
161- @ Override
162- public void run () {
163- try {
164- if (logQueue != null && logQueue .size () > 0 ) {
165- for (Map .Entry <String , Set <String >> entry : logQueue .entrySet ()) {
166- try {
167- String accesslog = entry .getKey ();
168- Set <String > logSet = entry .getValue ();
169- File file = new File (accesslog );
170- File dir = file .getParentFile ();
171- if (null != dir && !dir .exists ()) {
172- dir .mkdirs ();
173- }
174- if (logger .isDebugEnabled ()) {
175- logger .debug ("Append log to " + accesslog );
176- }
177- if (file .exists ()) {
178- String now = new SimpleDateFormat (FILE_DATE_FORMAT ).format (new Date ());
179- String last = new SimpleDateFormat (FILE_DATE_FORMAT ).format (new Date (file .lastModified ()));
180- if (!now .equals (last )) {
181- File archive = new File (file .getAbsolutePath () + "." + last );
182- file .renameTo (archive );
183- }
184- }
185- FileWriter writer = new FileWriter (file , true );
186- try {
187- for (Iterator <String > iterator = logSet .iterator ();
188- iterator .hasNext ();
189- iterator .remove ()) {
190- writer .write (iterator .next ());
191- writer .write ("\r \n " );
192- }
193- writer .flush ();
194- } finally {
195- writer .close ();
196- }
197- } catch (Exception e ) {
198- logger .error (e .getMessage (), e );
112+ private void log (String accessLog , AccessLogData accessLogData ) {
113+ Set <AccessLogData > logSet = logEntries .computeIfAbsent (accessLog , k -> new ConcurrentHashSet <>());
114+
115+ if (logSet .size () < LOG_MAX_BUFFER ) {
116+ logSet .add (accessLogData );
117+ } else {
118+ //TODO we needs use force writing to file so that buffer gets clear and new log can be written.
119+ logger .warn ("AccessLog buffer is full skipping buffer " );
120+ }
121+ }
122+
123+ private void writeLogToFile () {
124+ if (!logEntries .isEmpty ()) {
125+ for (Map .Entry <String , Set <AccessLogData >> entry : logEntries .entrySet ()) {
126+ try {
127+ String accessLog = entry .getKey ();
128+ Set <AccessLogData > logSet = entry .getValue ();
129+ if (ConfigUtils .isDefault (accessLog )) {
130+ processWithServiceLogger (logSet );
131+ } else {
132+ File file = new File (accessLog );
133+ createIfLogDirAbsent (file );
134+ if (logger .isDebugEnabled ()) {
135+ logger .debug ("Append log to " + accessLog );
199136 }
137+ renameFile (file );
138+ processWithAccessKeyLogger (logSet , file );
200139 }
140+
141+ } catch (Exception e ) {
142+ logger .error (e .getMessage (), e );
201143 }
202- } catch (Exception e ) {
203- logger .error (e .getMessage (), e );
204144 }
205145 }
206146 }
207147
148+ private void processWithAccessKeyLogger (Set <AccessLogData > logSet , File file ) throws IOException {
149+ try (FileWriter writer = new FileWriter (file , true )) {
150+ for (Iterator <AccessLogData > iterator = logSet .iterator ();
151+ iterator .hasNext ();
152+ iterator .remove ()) {
153+ writer .write (iterator .next ().getLogMessage ());
154+ writer .write ("\r \n " );
155+ }
156+ writer .flush ();
157+ }
158+ }
159+
160+ private AccessLogData buildAccessLogData (Invoker <?> invoker , Invocation inv ) {
161+ RpcContext context = RpcContext .getContext ();
162+ AccessLogData logData = AccessLogData .newLogData ();
163+ logData .setServiceName (invoker .getInterface ().getName ());
164+ logData .setMethodName (inv .getMethodName ());
165+ logData .setVersion (invoker .getUrl ().getParameter (Constants .VERSION_KEY ));
166+ logData .setGroup (invoker .getUrl ().getParameter (Constants .GROUP_KEY ));
167+ logData .setInvocationTime (new Date ());
168+ logData .setTypes (inv .getParameterTypes ());
169+ logData .setArguments (inv .getArguments ());
170+ return logData ;
171+ }
172+
173+ private void processWithServiceLogger (Set <AccessLogData > logSet ) {
174+ for (Iterator <AccessLogData > iterator = logSet .iterator ();
175+ iterator .hasNext ();
176+ iterator .remove ()) {
177+ AccessLogData logData = iterator .next ();
178+ LoggerFactory .getLogger (ACCESS_LOG_KEY + "." + logData .getServiceName ()).info (logData .getLogMessage ());
179+ }
180+ }
181+
182+ private void createIfLogDirAbsent (File file ) {
183+ File dir = file .getParentFile ();
184+ if (null != dir && !dir .exists ()) {
185+ dir .mkdirs ();
186+ }
187+ }
188+
189+ private void renameFile (File file ) {
190+ if (file .exists ()) {
191+ String now = FILE_NAME_FORMATTER .format (new Date ());
192+ String last = FILE_NAME_FORMATTER .format (new Date (file .lastModified ()));
193+ if (!now .equals (last )) {
194+ File archive = new File (file .getAbsolutePath () + "." + last );
195+ file .renameTo (archive );
196+ }
197+ }
198+ }
208199}
0 commit comments