Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ public void record(LogBufferRecord record) {
}
// since the subscribe() method auto-replays all existing logs, filter to just once this consumer probably
// hasn't seen
if (record.getTimestampMicros() < request.getLastSeenLogTimestamp()) {
if (record.getTimestampMicros() <= request.getLastSeenLogTimestamp()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a note, not really feedback or a request other than to consider whether or not we should change the API.

Interesting. The listener could miss log messages that occurred at the same timestamp, but I see how the alternative guarantees at least one duplicate message. Maybe a better API would count the log messages and you just provide an offset. (This is probably why Kafka uses an offset instead of timestamp for new subscriptions.)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah that's fair. Dup messages were confusing me, and the comment made it appear that this was "filtering to [ones] this consumer probably hadn't seen" meant something other than "will definitely always dup the last message seen, and maybe more". @devinrsmith any thoughts? I think you touched this last?

I know we will sometimes have gaps since this is implemented as a ring buffer, and maybe we tacitly accept that?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "guarantees" around the log buffer API are sketchy at best - the consumer never knows if they actually missed log messages or not. I'm happy to "break" the current API w/ the change as Colin has here... all else equal, if the client wants to they can request timestamp-1, and potentially handle dubious de-duping logic if necessary themselves.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, so I'll revert this and fix the comment instead?

return;
}
// Note: we can't send record off-thread without doing a deepCopy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ public abstract class QueryConnectable<Self extends QueryConnectable<Self>> exte
@JsProperty(namespace = "dh.QueryInfo")
public static final String EVENT_CONNECT = "connect";

/**
* Removed in favor of a proper disconnect/reconnect. Event listeners should switch to the "disconnect" and
* "reconnect" events instead.
*/
@JsProperty(namespace = "dh.IdeConnection")
@Deprecated
public static final String HACK_CONNECTION_FAILURE = "hack-connection-failure";

private final List<IdeSession> sessions = new ArrayList<>();
Expand All @@ -66,8 +71,9 @@ public QueryConnectable() {

public abstract Promise<ConnectOptions> getConnectOptions();

@Deprecated
public void notifyConnectionError(ResponseStreamWrapper.Status status) {
if (notifiedConnectionError) {
if (notifiedConnectionError || !hasListeners(HACK_CONNECTION_FAILURE)) {
return;
}
notifiedConnectionError = true;
Expand All @@ -78,6 +84,8 @@ public void notifyConnectionError(ResponseStreamWrapper.Status status) {
"details", status.getDetails(),
"metadata", status.getMetadata()));
fireEvent(HACK_CONNECTION_FAILURE, event);
JsLog.warn(
"The event dh.IdeConnection.HACK_CONNECTION_FAILURE is deprecated and will be removed in a later release");
}

@Override
Expand All @@ -104,6 +112,20 @@ public String getServerUrl() {
throw new UnsupportedOperationException();
}

/**
* Internal method to permit delegating to some orchestration tool to see if this worker can be connected to yet.
*/
@JsIgnore
public Promise<Self> onReady() {
// noinspection unchecked
return Promise.resolve((Self) this);
}

/**
* Promise that resolves when this worker instance can be connected to, or rejects if it can't be used.
*
* @return A promise that resolves with this instance.
*/
public abstract Promise<Self> running();

@JsMethod
Expand Down
Loading