The Kafka API implementation bypasses Portask's native protocol and writes directly to Dragonfly storage, creating an architectural inconsistency.
Location: pkg/network/protocol.go
type PortaskProtocolHandler struct {
	codecManager *serialization.CodecManager
	storage      storage.MessageStore
	// ... other fields
}

// Portask Protocol Format (16-byte header + payload):
// [4 bytes] - Magic Number (0x504F5254 = "PORT")
// [1 byte]  - Protocol Version
// [1 byte]  - Message Type
// [2 bytes] - Flags
// [4 bytes] - Payload Length
// [4 bytes] - CRC32 Checksum
// [N bytes] - Payload
Message Flow:
Client (Portask Protocol)
↓
PortaskProtocolHandler
↓
CodecManager (Serialization)
↓
MessageStore (Dragonfly)
↓
Storage (Disk)
Location: pkg/kafka/handlers.go:212
func (h *KafkaProtocolHandler) handleProduce(request *KafkaRequest) []byte {
	// ...
	// ❌ PROBLEM: Direct storage write, bypassing the Portask protocol
	offset, err := h.messageStore.ProduceMessage(topic, partition, nil, messageSet)
	// ...
}
Message Flow:
Client (Kafka Protocol)
↓
KafkaProtocolHandler
↓
messageStore.ProduceMessage() ❌ BYPASSES PORTASK PROTOCOL
↓
Dragonfly (Direct Write)
↓
Storage (Disk)
Components bypassed by the Kafka path:
- Portask Protocol Layer
  - Binary protocol framing
  - Magic number validation
  - Version control
  - Message type handling
  - CRC32 checksum verification
- CodecManager
  - Serialization abstraction
  - Codec selection
  - Message encoding/decoding
- PortaskProtocolHandler Logic
  - Connection handling
  - Message processing pipeline
  - Event hooks (OnConnect, OnDisconnect, OnMessage)
  - Metrics and statistics
┌─────────────────────────────────────────────────────┐
│ INCONSISTENT ARCHITECTURE │
├─────────────────────────────────────────────────────┤
│ │
│ Native Portask API: │
│ ┌───────────┐ ┌──────────────┐ ┌──────────┐ │
│ │ Client │──▶│ Portask │──▶│ Dragonfly│ │
│ │ (Portask) │ │ Protocol │ │ │ │
│ └───────────┘ └──────────────┘ └──────────┘ │
│ ✅ Uses full protocol stack │
│ │
│ Kafka API: │
│ ┌───────────┐ ┌──────────┐ │
│ │ Client │─────────────────────▶│ Dragonfly│ │
│ │ (Kafka) │ ❌ BYPASSES │ │ │
│ └───────────┘ PROTOCOL └──────────┘ │
│ ❌ Direct storage access │
│ │
└─────────────────────────────────────────────────────┘
Messages written through the Kafka API skip:
- ❌ Protocol validation
- ❌ Version management
- ❌ Message type classification
- ❌ Checksum verification
- ❌ Serialization abstraction
- ❌ Event hooks
- ❌ Unified metrics
Maintenance burden:
- Two different code paths for the same functionality
- Bug fixes must be applied in two places
- Features added to the Portask protocol won't work for Kafka
- Every behavior must be tested on both paths, roughly doubling test effort
PortaskProtocolHandler tracks:
• totalMessages
• totalErrors
• avgProcessTime
The Kafka path never updates these counters.
└─ Incomplete observability
Portask Protocol: [Magic][Version][Type][Flags][Length][CRC32][Payload]
└─ CRC32 validates data integrity
Kafka API: [Payload]
└─ No Portask CRC32 is added or validated ❌
Original Design Goal: Quick Kafka compatibility
Implementation Approach:
// Quick and dirty: Just implement Kafka wire protocol
type KafkaServer struct {
	addr    string
	handler *KafkaProtocolHandler
	store   MessageStore // ❌ Direct storage access
}
What Should Have Been:
// Proper integration: use the Portask protocol internally
type KafkaServer struct {
	addr    string
	handler *KafkaProtocolHandler
	portask *PortaskProtocolHandler // ✅ Use Portask protocol
}
Option 1 (adapter pattern): convert Kafka messages to the Portask protocol internally:
type KafkaToPortaskAdapter struct {
	portaskHandler *PortaskProtocolHandler
	codecManager   *serialization.CodecManager
}

// generateID and buildProtocolFrame are assumed helpers (not shown).
func (a *KafkaToPortaskAdapter) ProduceMessage(topic string, partition int32, key, value []byte) (int64, error) {
	// 1. Convert the Kafka record to a Portask message
	portaskMsg := &types.PortaskMessage{
		ID:        types.MessageID(generateID()),
		Topic:     types.TopicName(topic),
		Partition: partition,
		Key:       string(key),
		Payload:   value,
		Timestamp: time.Now().UnixNano(),
	}
	// 2. Encode using CodecManager
	encoded, err := a.codecManager.Encode(portaskMsg)
	if err != nil {
		return 0, err
	}
	// 3. Wrap the encoded bytes in a Portask protocol frame (header + CRC32)
	frame := buildProtocolFrame(encoded)
	// 4. Process through PortaskProtocolHandler (validation, hooks, metrics, storage)
	return a.portaskHandler.ProcessMessage(frame)
}
Benefits:
- ✅ Single code path
- ✅ All Portask features work
- ✅ Unified metrics
- ✅ Data integrity
- ✅ Easy maintenance
Changes Required:
1. Create pkg/kafka/portask_adapter.go
2. Modify pkg/kafka/server.go to use adapter
3. Update handlers.go to use adapter
4. Add tests for adapter
Option 2 (protocol-aware storage): make MessageStore aware of the Portask protocol:
type MessageStore interface {
	// New method: process using the Portask protocol; returns the assigned offset
	ProcessPortaskMessage(msg *PortaskMessage) (int64, error)
	// Kafka uses this internally
	ProduceMessage(topic string, partition int32, key, value []byte) (int64, error)
}

// Implementation in DragonflyStore
func (d *DragonflyStore) ProduceMessage(topic string, partition int32, key, value []byte) (int64, error) {
	// Convert to a Portask message (toPortaskMessage is an assumed helper)
	portaskMsg := toPortaskMessage(topic, partition, key, value)
	// Delegate to ProcessPortaskMessage; it must return the offset
	// as well as the error for this return statement to compile
	return d.ProcessPortaskMessage(portaskMsg)
}
Benefits:
- ✅ Minimal API changes
- ✅ Storage layer handles protocol
- ✅ Backward compatible
Drawbacks:
⚠️ Protocol logic mixed with storage
⚠️ Harder to test
⚠️ Less separation of concerns
Option 3 (protocol bridge): create a bridge layer that handles protocol conversion:
type ProtocolBridge struct {
	portaskHandler *PortaskProtocolHandler
	kafkaHandler   *KafkaProtocolHandler
}

func (b *ProtocolBridge) HandleKafkaMessage(msg *KafkaMessage) error {
	// Convert Kafka → Portask
	portaskMsg := b.convertKafkaToPortask(msg)
	// Process through the Portask pipeline
	return b.portaskHandler.ProcessMessage(portaskMsg)
}
Benefits:
- ✅ Clean separation
- ✅ Easy to add more protocols (AMQP, etc.)
- ✅ Testable
- ✅ Flexible
Drawbacks:
⚠️ More code
⚠️ Extra abstraction layer
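The extensibility claimed for the bridge option (adding AMQP and other protocols) can be sketched as a registry of per-protocol converters feeding one processing function. Everything in this sketch is hypothetical: `Converter`, `Bridge`, and `Pipeline` are illustrative names, `Pipeline` stands in for `PortaskProtocolHandler` processing, and the toy `rawConverter` stands in for real Kafka record-batch decoding.

```go
package main

import "fmt"

// Message is a hypothetical protocol-neutral form of one record.
type Message struct {
	Topic     string
	Partition int32
	Key       []byte
	Payload   []byte
}

// Converter turns one external protocol's request into Portask messages.
type Converter interface {
	ToPortask(raw []byte) ([]Message, error)
}

// Pipeline stands in for the shared PortaskProtocolHandler path; it returns the assigned offset.
type Pipeline func(Message) (int64, error)

// Bridge routes every registered protocol through the same pipeline.
type Bridge struct {
	converters map[string]Converter
	process    Pipeline
}

func NewBridge(p Pipeline) *Bridge {
	return &Bridge{converters: map[string]Converter{}, process: p}
}

// Register adds a converter for a protocol name such as "kafka" or "amqp".
func (b *Bridge) Register(protocol string, c Converter) { b.converters[protocol] = c }

// Handle converts a raw request and processes each message, returning the offsets.
func (b *Bridge) Handle(protocol string, raw []byte) ([]int64, error) {
	c, ok := b.converters[protocol]
	if !ok {
		return nil, fmt.Errorf("no converter registered for %q", protocol)
	}
	msgs, err := c.ToPortask(raw)
	if err != nil {
		return nil, fmt.Errorf("%s conversion: %w", protocol, err)
	}
	offsets := make([]int64, 0, len(msgs))
	for _, m := range msgs {
		off, err := b.process(m)
		if err != nil {
			return offsets, err
		}
		offsets = append(offsets, off)
	}
	return offsets, nil
}

// rawConverter is a toy converter: the whole request becomes one message.
type rawConverter struct{ topic string }

func (r rawConverter) ToPortask(raw []byte) ([]Message, error) {
	return []Message{{Topic: r.topic, Payload: raw}}, nil
}
```

Adding a protocol then means registering one more converter; the pipeline, and with it validation, hooks, and metrics, stays shared. This is the extra abstraction layer listed under drawbacks.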
Immediate Implementation:
- Create adapter (pkg/kafka/portask_adapter.go)
type PortaskAdapter struct {
	codecManager *serialization.CodecManager
	storage      storage.MessageStore
}

func (a *PortaskAdapter) ProduceMessage(...) {
	// Convert to Portask format
	// Use CodecManager
	// Store with protocol metadata
}
- Update Kafka handlers
func (h *KafkaProtocolHandler) handleProduce(...) {
	// OLD: h.messageStore.ProduceMessage(...)
	// NEW: h.portaskAdapter.ProduceMessage(...)
}
- Add tests
  - Test Kafka → Portask conversion
  - Verify protocol metadata
  - Check integrity
Timeline: 1-2 days
After the adapter is working:
- Extract protocol bridge pattern
- Make extensible for future protocols
- Add comprehensive metrics
- Document architecture
Timeline: 1 week
func TestKafkaUsesPortaskProtocol(t *testing.T) {
	// Send a message through the Kafka API
	kafkaClient.Produce("test-topic", []byte("test"))
	// Verify it was stored with the Portask protocol
	stored := dragonfly.Get("test-topic:...")
	// Check for Portask protocol markers (testify assertions take t first)
	assert.Contains(t, stored, PortaskMagicNumber)
	assert.Contains(t, stored, ProtocolVersion)
	assert.Contains(t, stored, CRC32Checksum)
}

func TestKafkaMetricsTracked(t *testing.T) {
	before := portaskHandler.totalMessages
	// Send via Kafka
	kafkaClient.Produce("test-topic", []byte("test"))
	// Verify the Portask metrics were incremented
	after := portaskHandler.totalMessages
	assert.Equal(t, before+1, after)
}
Storage Layout (Inconsistent):
Native Portask Messages:
[Magic][Version][Type][Flags][Length][CRC32][SerializedData]
Kafka Messages:
[RawKafkaBytes] ❌ No protocol metadata
Storage Layout (Consistent):
All Messages (Native + Kafka):
[Magic][Version][Type][Flags][Length][CRC32][SerializedData]
✅ Unified format
✅ Data integrity
✅ Feature parity
Severity: 🔴 High
Reasons:
- Architectural inconsistency
- Data integrity concerns
- Maintenance burden
- Feature parity issues
- Observability gaps
Recommendation: Fix in next sprint
Current Implementation:
- pkg/kafka/handlers.go:212 - Direct storage write
- pkg/kafka/server.go - Server setup
- pkg/network/protocol.go - Portask protocol
Files to Create:
- pkg/kafka/portask_adapter.go - New adapter
- pkg/kafka/adapter_test.go - Adapter tests
Files to Modify:
- pkg/kafka/handlers.go - Use adapter
- pkg/kafka/server.go - Initialize adapter
- pkg/kafka/protocol.go - Add helper methods
Current State: ❌ Kafka API bypasses Portask protocol
Desired State: ✅ Kafka API uses Portask protocol internally
Action Required: Implement adapter pattern (Option 1)
Expected Benefit:
- Unified architecture
- Data integrity
- Feature parity
- Easier maintenance
- Better observability
Discovered By: User observation
Date: October 7, 2025
Status: 🔴 Open - Needs Fix
Priority: High