Skip to content

Commit 419d762

Browse files
Copilotchkr1011
andcommitted
Implement enhanced PowerShell cmdlets with async handler, WebSocket, TLS, and more features
Co-authored-by: chkr1011 <6939810+chkr1011@users.noreply.github.com>
1 parent 71140d1 commit 419d762

7 files changed

Lines changed: 305 additions & 20 deletions

Source/MQTTnet.PowerShell/Cmdlets/ConnectMqttSessionCmdlet.cs

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@ public class ConnectMqttSessionCmdlet : PSCmdlet
1212
[Parameter]
1313
public string? ClientId { get; set; } = Guid.NewGuid().ToString();
1414

15-
[Parameter(Mandatory = true)]
16-
public required new string Host { get; set; }
15+
[Parameter(Mandatory = true, ParameterSetName = "Tcp")]
16+
[Parameter(Mandatory = true, ParameterSetName = "TcpWithTls")]
17+
public new string? Host { get; set; }
1718

1819
[Parameter]
1920
public string? Password { get; set; }
2021

21-
[Parameter]
22+
[Parameter(ParameterSetName = "Tcp")]
23+
[Parameter(ParameterSetName = "TcpWithTls")]
2224
public int Port { get; set; } = 1883;
2325

2426
[Parameter(Mandatory = true, ValueFromPipeline = true)]
@@ -27,19 +29,90 @@ public class ConnectMqttSessionCmdlet : PSCmdlet
2729
[Parameter]
2830
public string? Username { get; set; }
2931

30-
[Parameter]
32+
[Parameter(ParameterSetName = "TcpWithTls")]
3133
public SwitchParameter UseTls { get; set; }
3234

35+
[Parameter(ParameterSetName = "TcpWithTls")]
36+
public SwitchParameter AllowUntrustedCertificates { get; set; }
37+
38+
[Parameter(Mandatory = true, ParameterSetName = "WebSocket")]
39+
public string? WebSocketUri { get; set; }
40+
41+
[Parameter]
42+
public int KeepAlivePeriod { get; set; } = 15;
43+
44+
[Parameter]
45+
public int Timeout { get; set; } = 10;
46+
47+
[Parameter]
48+
public string? WillTopic { get; set; }
49+
50+
[Parameter]
51+
public string? WillPayload { get; set; }
52+
53+
[Parameter]
54+
public int WillQoS { get; set; } = 0;
55+
56+
[Parameter]
57+
public SwitchParameter WillRetain { get; set; }
58+
3359
protected override void ProcessRecord()
3460
{
3561
var clientOptionsBuilder = new MqttClientOptionsBuilder();
36-
clientOptionsBuilder.WithTcpServer(Host, Port);
62+
63+
// Configure transport
64+
if (ParameterSetName == "WebSocket")
65+
{
66+
clientOptionsBuilder.WithWebSocketServer(o => o.WithUri(WebSocketUri!));
67+
}
68+
else
69+
{
70+
clientOptionsBuilder.WithTcpServer(Host!, Port);
71+
72+
if (UseTls)
73+
{
74+
clientOptionsBuilder.WithTlsOptions(o =>
75+
{
76+
if (AllowUntrustedCertificates)
77+
{
78+
o.WithCertificateValidationHandler(_ => true);
79+
}
80+
});
81+
}
82+
}
3783

84+
// Configure authentication
3885
if (Username != null)
3986
{
4087
clientOptionsBuilder.WithCredentials(Username, Password);
4188
}
4289

90+
// Configure session
91+
clientOptionsBuilder.WithCleanSession(CleanSession);
92+
93+
if (!string.IsNullOrEmpty(ClientId))
94+
{
95+
clientOptionsBuilder.WithClientId(ClientId);
96+
}
97+
98+
// Configure keep alive
99+
clientOptionsBuilder.WithKeepAlivePeriod(TimeSpan.FromSeconds(KeepAlivePeriod));
100+
clientOptionsBuilder.WithTimeout(TimeSpan.FromSeconds(Timeout));
101+
102+
// Configure Will message
103+
if (!string.IsNullOrEmpty(WillTopic))
104+
{
105+
clientOptionsBuilder.WithWillTopic(WillTopic);
106+
107+
if (!string.IsNullOrEmpty(WillPayload))
108+
{
109+
clientOptionsBuilder.WithWillPayload(System.Text.Encoding.UTF8.GetBytes(WillPayload));
110+
}
111+
112+
clientOptionsBuilder.WithWillQualityOfServiceLevel((Protocol.MqttQualityOfServiceLevel)WillQoS);
113+
clientOptionsBuilder.WithWillRetain(WillRetain);
114+
}
115+
43116
var response = Session.GetClient().ConnectAsync(clientOptionsBuilder.Build()).GetAwaiter().GetResult();
44117

45118
WriteObject(response);
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System.Management.Automation;
6+
7+
namespace MQTTnet.PowerShell.Cmdlets;
8+
9+
[Cmdlet(VerbsCommon.Get, "MqttSessionStatus")]
10+
[OutputType(typeof(MqttClientConnectionStatus))]
11+
public class GetMqttSessionStatusCmdlet : PSCmdlet
12+
{
13+
[Parameter(Mandatory = true, ValueFromPipeline = true)]
14+
public required PsMqttSession Session { get; set; }
15+
16+
protected override void ProcessRecord()
17+
{
18+
var client = Session.GetClient();
19+
var status = new
20+
{
21+
IsConnected = client.IsConnected,
22+
Options = client.Options
23+
};
24+
25+
WriteObject(status);
26+
}
27+
}

Source/MQTTnet.PowerShell/Cmdlets/PublishMqttMessageCmdlet.cs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
using System.Collections;
12
using System.Management.Automation;
23
using System.Text;
34
using MQTTnet.Protocol;
5+
using MQTTnet.Packets;
46

57
namespace MQTTnet.PowerShell.Cmdlets;
68

@@ -22,29 +24,44 @@ public class PublishMqttMessageCmdlet : PSCmdlet
2224
[Parameter(Mandatory = true)]
2325
public required string Topic { get; set; }
2426

27+
[Parameter]
2528
public string? ContentType { get; set; }
2629

30+
[Parameter]
2731
public string? ResponseTopic { get; set; }
2832

33+
[Parameter]
2934
public ushort TopicAlias { get; set; }
3035

36+
[Parameter]
3137
public uint MessageExpiryInterval { get; set; }
3238

39+
[Parameter]
40+
public Hashtable? UserProperties { get; set; }
41+
3342
protected override void ProcessRecord()
3443
{
35-
// if (Session == null || !Session.IsConnected)
36-
// throw new InvalidOperationException("Session not connected.");
37-
38-
var msg = new MqttApplicationMessageBuilder().WithTopic(Topic)
44+
var msgBuilder = new MqttApplicationMessageBuilder()
45+
.WithTopic(Topic)
3946
.WithPayload(Encoding.UTF8.GetBytes(Payload ?? string.Empty))
4047
.WithQualityOfServiceLevel((MqttQualityOfServiceLevel)QoS)
4148
.WithRetainFlag(Retain)
4249
.WithContentType(ContentType)
4350
.WithResponseTopic(ResponseTopic)
4451
.WithTopicAlias(TopicAlias)
45-
.WithMessageExpiryInterval(MessageExpiryInterval)
46-
.Build();
52+
.WithMessageExpiryInterval(MessageExpiryInterval);
53+
54+
// Add user properties if provided
55+
if (UserProperties != null && UserProperties.Count > 0)
56+
{
57+
foreach (var key in UserProperties.Keys)
58+
{
59+
var value = UserProperties[key]?.ToString() ?? string.Empty;
60+
msgBuilder.WithUserProperty(key.ToString()!, value);
61+
}
62+
}
4763

64+
var msg = msgBuilder.Build();
4865
var response = Session.GetClient().PublishAsync(msg).GetAwaiter().GetResult();
4966

5067
WriteObject(response);

Source/MQTTnet.PowerShell/Cmdlets/RegisterMqttMessageHandlerCmdlet.cs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,43 @@ public class RegisterMqttMessageHandlerCmdlet : PSCmdlet
1414

1515
protected override void ProcessRecord()
1616
{
17-
EventHandler<PsMqttMessage> handler = (s, e) =>
17+
// Capture the ScriptBlock and current default runspace (available during cmdlet execution)
18+
var scriptBlock = Action ?? throw new ArgumentNullException(nameof(Action));
19+
var capturedRunspace = Runspace.DefaultRunspace ?? throw new InvalidOperationException("No default runspace available during cmdlet execution.");
20+
var capturedHost = Host;
21+
22+
EventHandler<PsMqttMessage> handler = (sender, e) =>
1823
{
19-
//throw new NotImplementedException();
20-
//InvokeCommand.InvokeScript(Action, false, PipelineResultTypes.Output, null, new object[] { e.Topic, e.Payload });
24+
// Run asynchronously to avoid blocking the event thread
25+
Task.Run(() =>
26+
{
27+
// Temporarily set the default runspace for this thread
28+
var originalRunspace = Runspace.DefaultRunspace;
29+
Runspace.DefaultRunspace = capturedRunspace;
30+
31+
try
32+
{
33+
// Invoke the ScriptBlock directly with arguments (returns Collection<PSObject>)
34+
var results = scriptBlock.Invoke(e.Topic, e.Payload, e); // Pass topic, payload, and full message
35+
36+
// Handle output manually (e.g., write to host/console)
37+
foreach (var result in results)
38+
{
39+
capturedHost.UI.WriteLine(result?.ToString() ?? string.Empty);
40+
}
41+
}
42+
catch (Exception ex)
43+
{
44+
// Handle errors by writing to host error stream
45+
capturedHost.UI.WriteErrorLine($"MQTT handler error: {ex.Message}");
46+
// Optionally: capturedHost.UI.WriteVerboseLine(ex.StackTrace); for more details
47+
}
48+
finally
49+
{
50+
// Restore the original default runspace for this thread
51+
Runspace.DefaultRunspace = originalRunspace;
52+
}
53+
});
2154
};
2255

2356
Session.MessageReceived += handler;
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System.Management.Automation;
6+
7+
namespace MQTTnet.PowerShell.Cmdlets;
8+
9+
[Cmdlet(VerbsLifecycle.Unregister, "MqttMessageHandler")]
10+
public class UnregisterMqttMessageHandlerCmdlet : PSCmdlet
11+
{
12+
[Parameter(Mandatory = true, ValueFromPipeline = true)]
13+
public required PsMqttSession Session { get; set; }
14+
15+
protected override void ProcessRecord()
16+
{
17+
// Clear all event handlers
18+
Session.ClearMessageHandlers();
19+
WriteObject($"All message handlers cleared for MQTT session.");
20+
}
21+
}

Source/MQTTnet.PowerShell/PsMqttSession.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,18 @@ public PsMqttSession()
3838

3939
public event EventHandler<PsMqttMessage>? MessageReceived;
4040

41+
public void ClearMessageHandlers()
42+
{
43+
// Remove all event subscribers
44+
if (MessageReceived != null)
45+
{
46+
foreach (var handler in MessageReceived.GetInvocationList())
47+
{
48+
MessageReceived -= (EventHandler<PsMqttMessage>)handler;
49+
}
50+
}
51+
}
52+
4153
public void Dispose()
4254
{
4355
_client.Dispose();

0 commit comments

Comments
 (0)