Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dubbo-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
<dependency>
Comment thread
khanimteyaz marked this conversation as resolved.
Outdated
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.utils;

import org.apache.commons.lang3.time.FastDateFormat;

import java.util.Date;

/**
* This class is utility to provide dubbo date formatting and parsing.
*/
public final class DateUtil {

private DateUtil() {

};

/**
* This method used to return a formatted string of a given date object.
* @param date Input data object
* @param format format of data.
* @return
*/
public static String format(Date date,String format) {
Assert.notNull(date,"Given date can't be null");
Assert.notEmptyString(format,"Given date format can't be null or empty");
Comment thread
khanimteyaz marked this conversation as resolved.
Outdated
return FastDateFormat.getInstance(format).format(date);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
*/
package org.apache.dubbo.rpc.filter;

import static org.apache.dubbo.common.utils.DateUtil.format;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ArrayUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
Expand All @@ -30,12 +31,11 @@
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;

import com.alibaba.fastjson.JSON;
import org.apache.dubbo.rpc.support.AccessLogData;

import java.io.File;
import java.io.FileWriter;
import java.text.SimpleDateFormat;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
Expand All @@ -44,7 +44,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -68,141 +67,134 @@ public class AccessLogFilter implements Filter {

private static final String ACCESS_LOG_KEY = "dubbo.accesslog";

private static final String FILE_DATE_FORMAT = "yyyyMMdd";

private static final String MESSAGE_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

private static final int LOG_MAX_BUFFER = 5000;

private static final long LOG_OUTPUT_INTERVAL = 5000;

private final ConcurrentMap<String, Set<String>> logQueue = new ConcurrentHashMap<String, Set<String>>();

private final ScheduledExecutorService logScheduled = Executors.newScheduledThreadPool(2, new NamedThreadFactory("Dubbo-Access-Log", true));
private static final String FILE_DATE_FORMAT = "yyyyMMdd";

private volatile ScheduledFuture<?> logFuture = null;
private static final ConcurrentMap<String, Set<AccessLogData>> logQueue = new ConcurrentHashMap<String, Set<AccessLogData>>();
Comment thread
khanimteyaz marked this conversation as resolved.
Outdated

private void init() {
if (logFuture == null) {
synchronized (logScheduled) {
if (logFuture == null) {
logFuture = logScheduled.scheduleWithFixedDelay(new LogTask(), LOG_OUTPUT_INTERVAL, LOG_OUTPUT_INTERVAL, TimeUnit.MILLISECONDS);
}
}
}
}
private static final ScheduledExecutorService logScheduled = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-Access-Log", true));

private void log(String accesslog, String logmessage) {
init();
Set<String> logSet = logQueue.get(accesslog);
if (logSet == null) {
logQueue.putIfAbsent(accesslog, new ConcurrentHashSet<String>());
logSet = logQueue.get(accesslog);
}
if (logSet.size() < LOG_MAX_BUFFER) {
logSet.add(logmessage);
}
/**
* Default constructor initialize demon thread for writing into access log file with names with access log key
* defined in url <b>accesslog</b>
*/
public AccessLogFilter() {
logScheduled.scheduleWithFixedDelay(this::writeLogToFile, LOG_OUTPUT_INTERVAL, LOG_OUTPUT_INTERVAL, TimeUnit.MILLISECONDS);
}

/**
* This method logs the access log for service method invocation call.
*
* @param invoker service
* @param inv Invocation service method.
* @return Result from service method.
* @throws RpcException
*/
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
try {
String accesslog = invoker.getUrl().getParameter(Constants.ACCESS_LOG_KEY);
if (ConfigUtils.isNotEmpty(accesslog)) {
RpcContext context = RpcContext.getContext();
String serviceName = invoker.getInterface().getName();
String version = invoker.getUrl().getParameter(Constants.VERSION_KEY);
String group = invoker.getUrl().getParameter(Constants.GROUP_KEY);
StringBuilder sn = new StringBuilder();
sn.append("[").append(new SimpleDateFormat(MESSAGE_DATE_FORMAT).format(new Date())).append("] ").append(context.getRemoteHost()).append(":").append(context.getRemotePort())
.append(" -> ").append(context.getLocalHost()).append(":").append(context.getLocalPort())
.append(" - ");
if (null != group && group.length() > 0) {
sn.append(group).append("/");
}
sn.append(serviceName);
if (null != version && version.length() > 0) {
sn.append(":").append(version);
}
sn.append(" ");
sn.append(inv.getMethodName());
sn.append("(");
Class<?>[] types = inv.getParameterTypes();
if (types != null && types.length > 0) {
boolean first = true;
for (Class<?> type : types) {
if (first) {
first = false;
} else {
sn.append(",");
}
sn.append(type.getName());
}
}
sn.append(") ");
Object[] args = inv.getArguments();
if (ArrayUtils.isNotEmpty(args)) {
sn.append(JSON.toJSONString(args));
}
String msg = sn.toString();
if (ConfigUtils.isDefault(accesslog)) {
LoggerFactory.getLogger(ACCESS_LOG_KEY + "." + invoker.getInterface().getName()).info(msg);
} else {
log(accesslog, msg);
}
String accessLogKey = invoker.getUrl().getParameter(Constants.ACCESS_LOG_KEY);
if (ConfigUtils.isNotEmpty(accessLogKey)) {
AccessLogData logData = buildAccessLogData(invoker, inv);
log(accessLogKey, logData);
}
} catch (Throwable t) {
logger.warn("Exception in AcessLogFilter of service(" + invoker + " -> " + inv + ")", t);
}
return invoker.invoke(inv);
}

private class LogTask implements Runnable {
@Override
public void run() {
try {
if (logQueue != null && logQueue.size() > 0) {
for (Map.Entry<String, Set<String>> entry : logQueue.entrySet()) {
try {
String accesslog = entry.getKey();
Set<String> logSet = entry.getValue();
File file = new File(accesslog);
File dir = file.getParentFile();
if (null != dir && !dir.exists()) {
dir.mkdirs();
}
if (logger.isDebugEnabled()) {
logger.debug("Append log to " + accesslog);
}
if (file.exists()) {
String now = new SimpleDateFormat(FILE_DATE_FORMAT).format(new Date());
String last = new SimpleDateFormat(FILE_DATE_FORMAT).format(new Date(file.lastModified()));
if (!now.equals(last)) {
File archive = new File(file.getAbsolutePath() + "." + last);
file.renameTo(archive);
}
}
FileWriter writer = new FileWriter(file, true);
try {
for (Iterator<String> iterator = logSet.iterator();
iterator.hasNext();
iterator.remove()) {
writer.write(iterator.next());
writer.write("\r\n");
}
writer.flush();
} finally {
writer.close();
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
private void log(String accessLog, AccessLogData accessLogData) {
Set<AccessLogData> logSet = logQueue.computeIfAbsent(accessLog, k -> new ConcurrentHashSet<>());

if (logSet.size() < LOG_MAX_BUFFER) {
logSet.add(accessLogData);
} else {
//TODO we needs use force writing to file so that buffer gets clear and new log can be written.
logger.warn("AccessLog buffer is full skipping buffer ");
}
}

private void writeLogToFile() {
if (!logQueue.isEmpty()) {
for (Map.Entry<String, Set<AccessLogData>> entry : logQueue.entrySet()) {
try {
String accessLog = entry.getKey();
Set<AccessLogData> logSet = entry.getValue();
if (ConfigUtils.isDefault(accessLog)) {
processWithServiceLogger(logSet);
} else {
File file = new File(accessLog);
createIfLogDirAbsent(file);
if (logger.isDebugEnabled()) {
logger.debug("Append log to " + accessLog);
}
renameFile(file);
processWithAccessKeyLogger(logSet, file);
}

} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}

}
private void processWithAccessKeyLogger(Set<AccessLogData> logSet, File file) throws IOException {
FileWriter writer = new FileWriter(file, true);
try {
for (Iterator<AccessLogData> iterator = logSet.iterator();
iterator.hasNext();
iterator.remove()) {
Comment thread
khanimteyaz marked this conversation as resolved.
writer.write(iterator.next().getLogMessage());
writer.write("\r\n");
}
writer.flush();
} finally {
writer.close();
Comment thread
khanimteyaz marked this conversation as resolved.
Outdated
}
}

private AccessLogData buildAccessLogData(Invoker<?> invoker, Invocation inv) {
RpcContext context = RpcContext.getContext();
AccessLogData logData = AccessLogData.newLogData();
logData.setServiceName(invoker.getInterface().getName());
logData.setMethodName(inv.getMethodName());
logData.setVersion(invoker.getUrl().getParameter(Constants.VERSION_KEY));
logData.setGroup(invoker.getUrl().getParameter(Constants.GROUP_KEY));
logData.setInvocationTime(new Date());
logData.setTypes(inv.getParameterTypes());
logData.setArguments(inv.getArguments());
return logData;
}

private void processWithServiceLogger(Set<AccessLogData> logSet) {
for (Iterator<AccessLogData> iterator = logSet.iterator();
iterator.hasNext();
iterator.remove()) {
AccessLogData logData = iterator.next();
LoggerFactory.getLogger(ACCESS_LOG_KEY + "." + logData.getServiceName()).info(logData.getLogMessage());
}
}

private void createIfLogDirAbsent(File file) {
File dir = file.getParentFile();
if (null != dir && !dir.exists()) {
dir.mkdirs();
}
}

private void renameFile(File file) {
if (file.exists()) {
String now = format(new Date(), FILE_DATE_FORMAT);
String last = format(new Date(file.lastModified()), FILE_DATE_FORMAT);
if (!now.equals(last)) {
File archive = new File(file.getAbsolutePath() + "." + last);
file.renameTo(archive);
}
}
}
}
Loading