Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
<resheader name="writer">
<value>System.Resources.ResXResourceWriter, System.Windows.Forms, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089</value>
</resheader>
<data name="CallToolHandler_ToolInvalid" xml:space="preserve">
<value>The tool '{0}' is currently unavailable.</value>
</data>
<data name="CallToolHandler_ToolNotFound" xml:space="preserve">
<value>The tool '{0}' was not found.</value>
</data>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,21 @@ public static async ValueTask<CallToolResult> HandleAsync(
};
}

if (!tool.HasValidDocument)
{
return new CallToolResult
{
Content =
[
new TextContentBlock
{
Text = string.Format(CallToolHandler_ToolInvalid, context.Params.Name)
}
],
IsError = true
};
}

var requestExecutor = services.GetRequiredService<IRequestExecutor>();
var rootServiceProvider = services.GetRequiredService<IRootServiceProviderAccessor>().ServiceProvider;
var httpContext = rootServiceProvider.GetRequiredService<IHttpContextAccessor>().HttpContext!;
Expand Down
233 changes: 153 additions & 80 deletions src/HotChocolate/Adapters/src/Adapters.Mcp.Core/McpStorageObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ public async Task StartAsync(CancellationToken cancellationToken)
_promptsSubscription = _storage
.Buffer<PromptStorageEventArgs>(TimeSpan.FromMilliseconds(500), 10)
.Where(batch => batch.Count > 0)
.Subscribe(onNext: ProcessBatch);
.Subscribe(_ => HandlePromptsChangedAsync().FireAndForget());

_toolsSubscription = _storage
.Buffer<OperationToolStorageEventArgs>(TimeSpan.FromMilliseconds(500), 10)
.Where(batch => batch.Count > 0)
.Subscribe(onNext: ProcessBatch);
.Subscribe(_ => HandleToolsChangedAsync().FireAndForget());
Comment thread
glen-84 marked this conversation as resolved.
Outdated
Comment thread
glen-84 marked this conversation as resolved.
Outdated

try
{
Expand All @@ -83,134 +83,207 @@ await Task.WhenAll(
}
finally
{
_semaphore.Release();
try
{
_semaphore.Release();
}
catch (ObjectDisposedException)
{
// Dispose() may have raced ahead and disposed the semaphore.
}
}
}

private async Task InitializePromptsAsync(CancellationToken cancellationToken)
{
var prompts = ImmutableDictionary.CreateBuilder<string, (Prompt, ImmutableArray<PromptMessage>)>();
using var scope = _diagnosticEvents.InitializePrompts();

foreach (var promptDefinition in await _storage.GetPromptDefinitionsAsync(cancellationToken))
{
var prompt = PromptFactory.CreatePrompt(promptDefinition);
prompts.Add(promptDefinition.Name, prompt);
}

_prompts = prompts.ToImmutable();
_registry.UpdatePrompts(_prompts);
await RebuildPromptsAsync(cancellationToken);
}

private async Task InitializeToolsAsync(CancellationToken cancellationToken)
{
var tools = ImmutableDictionary.CreateBuilder<string, OperationTool>();
using var scope = _diagnosticEvents.InitializeTools();
await RebuildToolsAsync(cancellationToken);
}

foreach (var toolDefinition in await _storage.GetOperationToolDefinitionsAsync(cancellationToken))
private async Task HandlePromptsChangedAsync()
{
if (_disposed)
{
var validationResult = s_documentValidator.Validate(_schema, toolDefinition.Document);
return;
}

if (validationResult.HasErrors)
try
{
await _semaphore.WaitAsync(_ct);
}
catch (OperationCanceledException)
{
return;
}
catch (ObjectDisposedException)
{
return;
}

var rebuildSucceeded = false;
try
{
if (_disposed)
{
_diagnosticEvents.ValidationErrors(validationResult.Errors);
continue;
return;
}

tools.Add(toolDefinition.Name, _toolFactory.CreateTool(toolDefinition));
using var scope = _diagnosticEvents.UpdatePrompts();
await RebuildPromptsAsync(_ct);
rebuildSucceeded = true;
}
catch
{
// Ignore unexpected exceptions while processing updates.
Comment thread
glen-84 marked this conversation as resolved.
}
finally
{
try
{
_semaphore.Release();
}
catch (ObjectDisposedException)
{
}
}

_tools = tools.ToImmutable();
_registry.UpdateTools(_tools);
if (!rebuildSucceeded)
{
return;
}

foreach (var mcpServer in _mcpServers.Values)
{
mcpServer.SendNotificationAsync(PromptListChangedNotification, cancellationToken: _ct).FireAndForget();
}
}

private void ProcessBatch(IList<PromptStorageEventArgs> eventArgs)
private async Task HandleToolsChangedAsync()
{
_semaphore.Wait(_ct);
if (_disposed)
{
return;
}

try
{
foreach (var eventArg in eventArgs)
await _semaphore.WaitAsync(_ct);
}
catch (OperationCanceledException)
{
return;
}
catch (ObjectDisposedException)
{
return;
}

var rebuildSucceeded = false;
try
{
if (_disposed)
{
switch (eventArg.Type)
{
case PromptStorageEventType.Updated:
using (_diagnosticEvents.UpdatePrompts())
{
var prompt = PromptFactory.CreatePrompt(eventArg.PromptDefinition!);
_prompts = _prompts.SetItem(eventArg.Name, prompt);
break;
}

case PromptStorageEventType.Removed:
_prompts = _prompts.Remove(eventArg.Name);
break;

default:
throw new ArgumentOutOfRangeException();
}
return;
}

_registry.UpdatePrompts(_prompts);
using var scope = _diagnosticEvents.UpdateTools();
await RebuildToolsAsync(_ct);
rebuildSucceeded = true;
}
catch
{
// Ignore unexpected exceptions while processing updates.
}
finally
{
_semaphore.Release();
try
{
_semaphore.Release();
}
catch (ObjectDisposedException)
{
}
}

if (!rebuildSucceeded)
{
return;
}

foreach (var mcpServer in _mcpServers.Values)
{
mcpServer.SendNotificationAsync(PromptListChangedNotification, cancellationToken: _ct).FireAndForget();
mcpServer.SendNotificationAsync(ToolListChangedNotification, cancellationToken: _ct).FireAndForget();
}
}

private void ProcessBatch(IList<OperationToolStorageEventArgs> eventArgs)
private async Task RebuildPromptsAsync(CancellationToken cancellationToken)
{
_semaphore.Wait(_ct);
var prompts = ImmutableDictionary.CreateBuilder<string, (Prompt, ImmutableArray<PromptMessage>)>();

try
foreach (var promptDefinition in await _storage.GetPromptDefinitionsAsync(cancellationToken))
{
foreach (var eventArg in eventArgs)
// When multiple definitions share a name (e.g. across collections published to the
// same stage), the first one wins in storage iteration order.
if (prompts.ContainsKey(promptDefinition.Name))
{
switch (eventArg.Type)
{
case OperationToolStorageEventType.Updated:
using (_diagnosticEvents.UpdateTools())
{
var validationResult =
s_documentValidator.Validate(_schema, eventArg.ToolDefinition!.Document);

if (validationResult.HasErrors)
{
_diagnosticEvents.ValidationErrors(validationResult.Errors);
continue;
}

var tool = _toolFactory.CreateTool(eventArg.ToolDefinition!);
_tools = _tools.SetItem(eventArg.Name, tool);
break;
}

case OperationToolStorageEventType.Removed:
_tools = _tools.Remove(eventArg.Name);
break;

default:
throw new ArgumentOutOfRangeException();
}
continue;
}

_registry.UpdateTools(_tools);
var prompt = PromptFactory.CreatePrompt(promptDefinition);
prompts.Add(promptDefinition.Name, prompt);
}
finally

_prompts = prompts.ToImmutable();
_registry.UpdatePrompts(_prompts);
}

private async Task RebuildToolsAsync(CancellationToken cancellationToken)
{
var tools = ImmutableDictionary.CreateBuilder<string, OperationTool>();
var invalidFallbacks = new Dictionary<string, OperationToolDefinition>(StringComparer.Ordinal);

foreach (var toolDefinition in await _storage.GetOperationToolDefinitionsAsync(cancellationToken))
{
_semaphore.Release();
// When multiple definitions share a name (e.g. across collections published to
// the same stage), the first valid one wins. A later valid duplicate is skipped
// here; an earlier invalid duplicate is overridden once the valid one arrives.
if (tools.ContainsKey(toolDefinition.Name))
{
continue;
}

var validationResult = s_documentValidator.Validate(_schema, toolDefinition.Document);

if (validationResult.HasErrors)
{
_diagnosticEvents.ValidationErrors(validationResult.Errors);
// Remember the first invalid definition per name as a fallback. It is only
// surfaced if no valid duplicate exists.
invalidFallbacks.TryAdd(toolDefinition.Name, toolDefinition);
continue;
}

tools.Add(toolDefinition.Name, _toolFactory.CreateTool(toolDefinition));
}

foreach (var mcpServer in _mcpServers.Values)
// For names with no valid definition at all, surface the first invalid one so calls
// to it return the "unavailable" error rather than tool-not-found.
foreach (var (name, definition) in invalidFallbacks)
{
mcpServer.SendNotificationAsync(ToolListChangedNotification, cancellationToken: _ct).FireAndForget();
if (!tools.ContainsKey(name))
{
tools.Add(name, OperationToolFactory.CreateInvalidTool(definition));
}
}

_tools = tools.ToImmutable();
_registry.UpdateTools(_tools);
}

public void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,10 @@ internal sealed class OperationTool(DocumentNode documentNode, Tool tool)
public Resource? ViewResource { get; init; }

public string? ViewHtml { get; init; }

/// <summary>
/// True when the tool's document validates against the current schema. Invalid tools are
/// still listed (so consumers see they exist) but calls to them must be rejected.
/// </summary>
public bool HasValidDocument { get; init; } = true;
}
Loading
Loading