Skip to content

Commit 86ea861

Browse files
fix(serverv2/cometbft): properly decode and route gRPC transactions (#20808)
Co-authored-by: marbar3778 <marbar3778@yahoo.com> Co-authored-by: Marko <marko@baricevic.me>
1 parent 511fb07 commit 86ea861

7 files changed

Lines changed: 84 additions & 44 deletions

File tree

runtime/v2/app.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"encoding/json"
55
"errors"
66

7+
gogoproto "github.com/cosmos/gogoproto/proto"
78
"golang.org/x/exp/slices"
89

910
runtimev2 "cosmossdk.io/api/cosmos/app/runtime/v2"
@@ -44,6 +45,10 @@ type App[T transaction.Tx] struct {
4445
interfaceRegistrar registry.InterfaceRegistrar
4546
amino legacy.Amino
4647
moduleManager *MM[T]
48+
49+
// GRPCQueryDecoders maps gRPC method name to a function that decodes the request
50+
// bytes into a gogoproto.Message, which then can be passed to appmanager.
51+
GRPCQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error)
4752
}
4853

4954
// Logger returns the app logger.
@@ -109,3 +114,7 @@ func (a *App[T]) ExecuteGenesisTx(_ []byte) error {
109114
func (a *App[T]) GetAppManager() *appmanager.AppManager[T] {
110115
return a.AppManager
111116
}
117+
118+
func (a *App[T]) GetGRPCQueryDecoders() map[string]func(requestBytes []byte) (gogoproto.Message, error) {
119+
return a.GRPCQueryDecoders
120+
}

runtime/v2/manager.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"reflect"
89
"sort"
910

1011
gogoproto "github.com/cosmos/gogoproto/proto"
@@ -555,17 +556,28 @@ func (m *MM[T]) assertNoForgottenModules(
555556

556557
func registerServices[T transaction.Tx](s appmodule.HasServices, app *App[T], registry *protoregistry.Files) error {
557558
c := &configurator{
558-
stfQueryRouter: app.queryRouterBuilder,
559-
stfMsgRouter: app.msgRouterBuilder,
560-
registry: registry,
561-
err: nil,
559+
grpcQueryDecoders: map[string]func([]byte) (gogoproto.Message, error){},
560+
stfQueryRouter: app.queryRouterBuilder,
561+
stfMsgRouter: app.msgRouterBuilder,
562+
registry: registry,
563+
err: nil,
562564
}
563-
return s.RegisterServices(c)
565+
566+
err := s.RegisterServices(c)
567+
if err != nil {
568+
return fmt.Errorf("unable to register services: %w", err)
569+
}
570+
app.GRPCQueryDecoders = c.grpcQueryDecoders
571+
return nil
564572
}
565573

566574
var _ grpc.ServiceRegistrar = (*configurator)(nil)
567575

568576
type configurator struct {
577+
// grpcQueryDecoders is required because module expose queries through gRPC
578+
// this provides a way to route to modules using gRPC.
579+
grpcQueryDecoders map[string]func([]byte) (gogoproto.Message, error)
580+
569581
stfQueryRouter *stf.MsgRouterBuilder
570582
stfMsgRouter *stf.MsgRouterBuilder
571583
registry *protoregistry.Files
@@ -596,17 +608,28 @@ func (c *configurator) RegisterService(sd *grpc.ServiceDesc, ss interface{}) {
596608
func (c *configurator) registerQueryHandlers(sd *grpc.ServiceDesc, ss interface{}) error {
597609
for _, md := range sd.Methods {
598610
// TODO(tip): what if a query is not deterministic?
599-
err := registerMethod(c.stfQueryRouter, sd, md, ss)
611+
requestFullName, err := registerMethod(c.stfQueryRouter, sd, md, ss)
600612
if err != nil {
601613
return fmt.Errorf("unable to register query handler %s: %w", md.MethodName, err)
602614
}
615+
616+
// register gRPC query method.
617+
typ := gogoproto.MessageType(requestFullName)
618+
if typ == nil {
619+
return fmt.Errorf("unable to find message in gogotype registry: %w", err)
620+
}
621+
decoderFunc := func(bytes []byte) (gogoproto.Message, error) {
622+
msg := reflect.New(typ.Elem()).Interface().(gogoproto.Message)
623+
return msg, gogoproto.Unmarshal(bytes, msg)
624+
}
625+
c.grpcQueryDecoders[md.MethodName] = decoderFunc
603626
}
604627
return nil
605628
}
606629

607630
func (c *configurator) registerMsgHandlers(sd *grpc.ServiceDesc, ss interface{}) error {
608631
for _, md := range sd.Methods {
609-
err := registerMethod(c.stfMsgRouter, sd, md, ss)
632+
_, err := registerMethod(c.stfMsgRouter, sd, md, ss)
610633
if err != nil {
611634
return fmt.Errorf("unable to register msg handler %s: %w", md.MethodName, err)
612635
}
@@ -633,13 +656,13 @@ func registerMethod(
633656
sd *grpc.ServiceDesc,
634657
md grpc.MethodDesc,
635658
ss interface{},
636-
) error {
659+
) (string, error) {
637660
requestName, err := requestFullNameFromMethodDesc(sd, md)
638661
if err != nil {
639-
return err
662+
return "", err
640663
}
641664

642-
return stfRouter.RegisterHandler(string(requestName), func(
665+
return string(requestName), stfRouter.RegisterHandler(string(requestName), func(
643666
ctx context.Context,
644667
msg appmodulev2.Message,
645668
) (resp appmodulev2.Message, err error) {

server/v2/cometbft/abci.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
abci "github.com/cometbft/cometbft/abci/types"
1111
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
12+
gogoproto "github.com/cosmos/gogoproto/proto"
1213

1314
coreappmgr "cosmossdk.io/core/app"
1415
"cosmossdk.io/core/comet"
@@ -31,6 +32,9 @@ import (
3132
var _ abci.Application = (*Consensus[transaction.Tx])(nil)
3233

3334
type Consensus[T transaction.Tx] struct {
35+
// legacy support for gRPC
36+
grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error)
37+
3438
app *appmanager.AppManager[T]
3539
cfg Config
3640
store types.Store
@@ -56,18 +60,28 @@ type Consensus[T transaction.Tx] struct {
5660
func NewConsensus[T transaction.Tx](
5761
app *appmanager.AppManager[T],
5862
mp mempool.Mempool[T],
63+
grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error),
5964
store types.Store,
6065
cfg Config,
6166
txCodec transaction.Codec[T],
6267
logger log.Logger,
6368
) *Consensus[T] {
6469
return &Consensus[T]{
65-
mempool: mp,
66-
store: store,
67-
app: app,
68-
cfg: cfg,
69-
txCodec: txCodec,
70-
logger: logger,
70+
grpcQueryDecoders: grpcQueryDecoders,
71+
app: app,
72+
cfg: cfg,
73+
store: store,
74+
logger: logger,
75+
txCodec: txCodec,
76+
streaming: streaming.Manager{},
77+
snapshotManager: nil,
78+
mempool: mp,
79+
lastCommittedHeight: atomic.Int64{},
80+
prepareProposalHandler: nil,
81+
processProposalHandler: nil,
82+
verifyVoteExt: nil,
83+
extendVote: nil,
84+
chainID: "",
7185
}
7286
}
7387

@@ -150,18 +164,16 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc
150164

151165
// Query implements types.Application.
152166
// It is called by cometbft to query application state.
153-
func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (*abciproto.QueryResponse, error) {
154-
// follow the query path from here
155-
decodedMsg, err := c.txCodec.Decode(req.Data)
156-
protoMsg, ok := any(decodedMsg).(transaction.Msg)
157-
if !ok {
158-
return nil, fmt.Errorf("decoded type T %T must implement core/transaction.Msg", decodedMsg)
159-
}
160-
161-
// if no error is returned then we can handle the query with the appmanager
162-
// otherwise it is a KV store query
163-
if err == nil {
164-
res, err := c.app.Query(ctx, uint64(req.Height), protoMsg)
167+
func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
168+
// check if it's a gRPC method
169+
grpcQueryDecoder, isGRPC := c.grpcQueryDecoders[req.Path]
170+
if isGRPC {
171+
protoRequest, err := grpcQueryDecoder(req.Data)
172+
if err != nil {
173+
return nil, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err)
174+
}
175+
res, err := c.app.Query(ctx, uint64(req.Height), protoRequest)
176+
165177
if err != nil {
166178
resp := queryResult(err)
167179
resp.Height = req.Height
@@ -179,8 +191,6 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
179191
return QueryResult(errorsmod.Wrap(cometerrors.ErrUnknownRequest, "no query path provided"), c.cfg.Trace), nil
180192
}
181193

182-
var resp *abciproto.QueryResponse
183-
184194
switch path[0] {
185195
case cmtservice.QueryPathApp:
186196
resp, err = c.handlerQueryApp(ctx, path, req)
@@ -391,7 +401,7 @@ func (c *Consensus[T]) FinalizeBlock(
391401
// ProposerAddress: req.ProposerAddress,
392402
// LastCommit: req.DecidedLastCommit,
393403
// },
394-
//}
404+
// }
395405
//
396406
// ctx = context.WithValue(ctx, corecontext.CometInfoKey, &comet.Info{
397407
// Evidence: sdktypes.ToSDKEvidence(req.Misbehavior),

server/v2/cometbft/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger l
6060

6161
// create consensus
6262
store := appI.GetStore().(types.Store)
63-
consensus := NewConsensus[T](appI.GetAppManager(), s.options.Mempool, store, s.config, s.initTxCodec, s.logger)
63+
consensus := NewConsensus[T](appI.GetAppManager(), s.options.Mempool, appI.GetGRPCQueryDecoders(), store, s.config, s.initTxCodec, s.logger)
6464

6565
consensus.prepareProposalHandler = s.options.PrepareProposalHandler
6666
consensus.processProposalHandler = s.options.ProcessProposalHandler

server/v2/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package serverv2
22

33
import (
4+
gogoproto "github.com/cosmos/gogoproto/proto"
45
"github.com/spf13/viper"
56

67
coreapp "cosmossdk.io/core/app"
@@ -15,5 +16,6 @@ type AppI[T transaction.Tx] interface {
1516
GetAppManager() *appmanager.AppManager[T]
1617
GetConsensusAuthority() string
1718
InterfaceRegistry() coreapp.InterfaceRegistry
19+
GetGRPCQueryDecoders() map[string]func(requestBytes []byte) (gogoproto.Message, error)
1820
GetStore() any
1921
}

simapp/v2/app_di.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,6 @@ func NewSimApp[T transaction.Tx](
224224
if err := app.LoadLatest(); err != nil {
225225
panic(err)
226226
}
227-
228227
return app
229228
}
230229

simapp/v2/go.mod

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.22.2
44

55
require (
66
cosmossdk.io/api v0.7.5
7-
cosmossdk.io/collections v0.4.0 // indirect
7+
cosmossdk.io/client/v2 v2.0.0-00010101000000-000000000000
88
cosmossdk.io/core v0.12.1-0.20231114100755-569e3ff6a0d7
99
cosmossdk.io/depinject v1.0.0-alpha.4
1010
cosmossdk.io/log v1.3.1
@@ -13,6 +13,7 @@ require (
1313
cosmossdk.io/server/v2 v2.0.0-00010101000000-000000000000
1414
cosmossdk.io/server/v2/cometbft v0.0.0-00010101000000-000000000000
1515
cosmossdk.io/store/v2 v2.0.0
16+
cosmossdk.io/tools/confix v0.0.0-00010101000000-000000000000
1617
cosmossdk.io/x/accounts v0.0.0-20240226161501-23359a0b6d91
1718
cosmossdk.io/x/auth v0.0.0-00010101000000-000000000000
1819
cosmossdk.io/x/authz v0.0.0-00010101000000-000000000000
@@ -29,28 +30,18 @@ require (
2930
cosmossdk.io/x/protocolpool v0.0.0-20230925135524-a1bc045b3190
3031
cosmossdk.io/x/slashing v0.0.0-00010101000000-000000000000
3132
cosmossdk.io/x/staking v0.0.0-00010101000000-000000000000
32-
cosmossdk.io/x/tx v0.13.3 // indirect
3333
cosmossdk.io/x/upgrade v0.0.0-20230613133644-0a778132a60f
3434
github.com/cometbft/cometbft v1.0.0-rc1
3535
github.com/cosmos/cosmos-db v1.0.2
3636
// this version is not used as it is always replaced by the latest Cosmos SDK version
3737
github.com/cosmos/cosmos-sdk v0.51.0
38-
github.com/cosmos/gogoproto v1.5.0 // indirect
39-
github.com/golang/mock v1.6.0 // indirect
40-
github.com/spf13/cast v1.6.0 // indirect
4138
github.com/spf13/cobra v1.8.1
4239
github.com/spf13/pflag v1.0.5
4340
github.com/spf13/viper v1.19.0
4441
github.com/stretchr/testify v1.9.0
45-
golang.org/x/sync v0.7.0 // indirect
4642
google.golang.org/protobuf v1.34.2
4743
)
4844

49-
require (
50-
cosmossdk.io/client/v2 v2.0.0-00010101000000-000000000000
51-
cosmossdk.io/tools/confix v0.0.0-00010101000000-000000000000
52-
)
53-
5445
require (
5546
buf.build/gen/go/cometbft/cometbft/protocolbuffers/go v1.34.2-20240701160653-fedbb9acfd2f.2 // indirect
5647
buf.build/gen/go/cosmos/gogo-proto/protocolbuffers/go v1.34.2-20240130113600-88ef6483f90f.2 // indirect
@@ -60,6 +51,7 @@ require (
6051
cloud.google.com/go/compute/metadata v0.3.0 // indirect
6152
cloud.google.com/go/iam v1.1.8 // indirect
6253
cloud.google.com/go/storage v1.42.0 // indirect
54+
cosmossdk.io/collections v0.4.0 // indirect
6355
cosmossdk.io/core/testing v0.0.0-00010101000000-000000000000 // indirect
6456
cosmossdk.io/errors v1.0.1 // indirect
6557
cosmossdk.io/schema v0.1.1 // indirect
@@ -69,6 +61,7 @@ require (
6961
cosmossdk.io/x/accounts/defaults/lockup v0.0.0-20240417181816-5e7aae0db1f5 // indirect
7062
cosmossdk.io/x/accounts/defaults/multisig v0.0.0-00010101000000-000000000000 // indirect
7163
cosmossdk.io/x/epochs v0.0.0-20240522060652-a1ae4c3e0337 // indirect
64+
cosmossdk.io/x/tx v0.13.3 // indirect
7265
filippo.io/edwards25519 v1.1.0 // indirect
7366
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
7467
github.com/99designs/keyring v1.2.2 // indirect
@@ -97,6 +90,7 @@ require (
9790
github.com/cosmos/crypto v0.1.1 // indirect
9891
github.com/cosmos/go-bip39 v1.0.0 // indirect
9992
github.com/cosmos/gogogateway v1.2.0 // indirect
93+
github.com/cosmos/gogoproto v1.5.0 // indirect
10094
github.com/cosmos/iavl v1.2.0 // indirect
10195
github.com/cosmos/ics23/go v0.10.0 // indirect
10296
github.com/cosmos/ledger-cosmos-go v0.13.3 // indirect
@@ -125,6 +119,7 @@ require (
125119
github.com/gogo/protobuf v1.3.2 // indirect
126120
github.com/golang/glog v1.2.0 // indirect
127121
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
122+
github.com/golang/mock v1.6.0 // indirect
128123
github.com/golang/protobuf v1.5.4 // indirect
129124
github.com/golang/snappy v0.0.4 // indirect
130125
github.com/google/btree v1.1.2 // indirect
@@ -200,6 +195,7 @@ require (
200195
github.com/sasha-s/go-deadlock v0.3.1 // indirect
201196
github.com/sourcegraph/conc v0.3.0 // indirect
202197
github.com/spf13/afero v1.11.0 // indirect
198+
github.com/spf13/cast v1.6.0 // indirect
203199
github.com/subosito/gotenv v1.6.0 // indirect
204200
github.com/supranational/blst v0.3.12 // indirect
205201
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect
@@ -223,6 +219,7 @@ require (
223219
golang.org/x/mod v0.17.0 // indirect
224220
golang.org/x/net v0.27.0 // indirect
225221
golang.org/x/oauth2 v0.21.0 // indirect
222+
golang.org/x/sync v0.7.0 // indirect
226223
golang.org/x/sys v0.22.0 // indirect
227224
golang.org/x/term v0.22.0 // indirect
228225
golang.org/x/text v0.16.0 // indirect

0 commit comments

Comments
 (0)