lsat: extract runtime info into interceptContext

To make the code easier to be reused for the upcoming stream
interceptor, all generic information about the interceptor's context
is extracted into a struct and the helper methods are adapted to
work with that struct.
pull/145/head
Oliver Gugger 4 years ago
parent 7b4eb6eac4
commit bcd92cead4
No known key found for this signature in database
GPG Key ID: 8E4256593F177720

@ -89,6 +89,15 @@ func NewInterceptor(lnd *lndclient.LndServices, store Store,
}
}
// interceptContext is a struct that contains all information about a call that
// is intercepted by the interceptor.
type interceptContext struct {
mainCtx context.Context
opts []grpc.CallOption
metadata *metadata.MD
token *Token
}
// UnaryInterceptor is an interceptor method that can be used directly by gRPC
// for unary calls. If the store contains a token, it is attached as credentials
// to every call before patching it through. The response error is also
@ -105,21 +114,54 @@ func (i *Interceptor) UnaryInterceptor(ctx context.Context, method string,
i.lock.Lock()
defer i.lock.Unlock()
addLsatCredentials := func(token *Token) error {
macaroon, err := token.PaidMacaroon()
if err != nil {
return err
}
opts = append(opts, grpc.PerRPCCredentials(
macaroons.NewMacaroonCredential(macaroon),
))
return nil
// Create the context that we'll use to initiate the real request. This
// contains the means to extract response headers and possibly also an
// auth token, if we already have paid for one.
iCtx, err := i.newInterceptContext(ctx, opts)
if err != nil {
return err
}
// Try executing the call now. If anything goes wrong, we only handle
// the LSAT error message that comes in the form of a gRPC status error.
rpcCtx, cancel := context.WithTimeout(ctx, i.callTimeout)
defer cancel()
err = invoker(rpcCtx, method, req, reply, cc, iCtx.opts...)
if !isPaymentRequired(err) {
return err
}
// Find out if we need to pay for a new token or perhaps resume
// a previously aborted payment.
err = i.handlePayment(iCtx)
if err != nil {
return err
}
// Execute the same request again, now with the LSAT
// token added as an RPC credential.
rpcCtx2, cancel2 := context.WithTimeout(ctx, i.callTimeout)
defer cancel2()
return invoker(rpcCtx2, method, req, reply, cc, iCtx.opts...)
}
// newInterceptContext creates the initial intercept context that can capture
// metadata from the server and sends the local token to the server if one
// already exists.
func (i *Interceptor) newInterceptContext(ctx context.Context,
opts []grpc.CallOption) (*interceptContext, error) {
iCtx := &interceptContext{
mainCtx: ctx,
opts: opts,
metadata: &metadata.MD{},
}
// Let's see if the store already contains a token and what state it
// might be in. If a previous call was aborted, we might have a pending
// token that needs to be handled separately.
token, err := i.store.CurrentToken()
var err error
iCtx.token, err = i.store.CurrentToken()
switch {
// If there is no token yet, nothing to do at this point.
case err == ErrNoToken:
@ -127,16 +169,18 @@ func (i *Interceptor) UnaryInterceptor(ctx context.Context, method string,
// Some other error happened that we have to surface.
case err != nil:
log.Errorf("Failed to get token from store: %v", err)
return fmt.Errorf("getting token from store failed: %v", err)
return nil, fmt.Errorf("getting token from store failed: %v",
err)
// Only if we have a paid token append it. We don't resume a pending
// payment just yet, since we don't even know if a token is required for
// this call. We also never send a pending payment to the server since
// we know it's not valid.
case !token.isPending():
if err = addLsatCredentials(token); err != nil {
case !iCtx.token.isPending():
if err = i.addLsatCredentials(iCtx); err != nil {
log.Errorf("Adding macaroon to request failed: %v", err)
return fmt.Errorf("adding macaroon failed: %v", err)
return nil, fmt.Errorf("adding macaroon failed: %v",
err)
}
}
@ -145,60 +189,59 @@ func (i *Interceptor) UnaryInterceptor(ctx context.Context, method string,
// option. We execute the request and inspect the error. If it's the
// LSAT specific payment required error, we might execute the same
// method again later with the paid LSAT token.
trailerMetadata := &metadata.MD{}
opts = append(opts, grpc.Trailer(trailerMetadata))
rpcCtx, cancel := context.WithTimeout(ctx, i.callTimeout)
defer cancel()
err = invoker(rpcCtx, method, req, reply, cc, opts...)
// Only handle the LSAT error message that comes in the form of
// a gRPC status error.
if isPaymentRequired(err) {
paidToken, err := i.handlePayment(ctx, token, trailerMetadata)
if err != nil {
return err
}
if err = addLsatCredentials(paidToken); err != nil {
log.Errorf("Adding macaroon to request failed: %v", err)
return fmt.Errorf("adding macaroon failed: %v", err)
}
// Execute the same request again, now with the LSAT
// token added as an RPC credential.
rpcCtx2, cancel2 := context.WithTimeout(ctx, i.callTimeout)
defer cancel2()
return invoker(rpcCtx2, method, req, reply, cc, opts...)
}
return err
iCtx.opts = append(iCtx.opts, grpc.Trailer(iCtx.metadata))
return iCtx, nil
}
// handlePayment tries to obtain a valid token by either tracking the payment
// status of a pending token or paying for a new one.
func (i *Interceptor) handlePayment(ctx context.Context, token *Token,
md *metadata.MD) (*Token, error) {
func (i *Interceptor) handlePayment(iCtx *interceptContext) error {
switch {
// Resume/track a pending payment if it was interrupted for some reason.
case token != nil && token.isPending():
case iCtx.token != nil && iCtx.token.isPending():
log.Infof("Payment of LSAT token is required, resuming/" +
"tracking previous payment from pending LSAT token")
err := i.trackPayment(ctx, token)
err := i.trackPayment(iCtx.mainCtx, iCtx.token)
if err != nil {
return nil, err
return err
}
return token, nil
// We don't have a token yet, try to get a new one.
case token == nil:
case iCtx.token == nil:
// We don't have a token yet, get a new one.
log.Infof("Payment of LSAT token is required, paying invoice")
return i.payLsatToken(ctx, md)
var err error
iCtx.token, err = i.payLsatToken(iCtx.mainCtx, iCtx.metadata)
if err != nil {
return err
}
// We have a token and it's valid, nothing more to do here.
default:
log.Debugf("Found valid LSAT token to add to request")
return token, nil
}
if err := i.addLsatCredentials(iCtx); err != nil {
log.Errorf("Adding macaroon to request failed: %v", err)
return fmt.Errorf("adding macaroon failed: %v", err)
}
return nil
}
// addLsatCredentials adds an LSAT token to the given intercept context.
func (i *Interceptor) addLsatCredentials(iCtx *interceptContext) error {
if iCtx.token == nil {
return fmt.Errorf("cannot add nil token to context")
}
macaroon, err := iCtx.token.PaidMacaroon()
if err != nil {
return err
}
iCtx.opts = append(iCtx.opts, grpc.PerRPCCredentials(
macaroons.NewMacaroonCredential(macaroon),
))
return nil
}
// payLsatToken reads the payment challenge from the response metadata and tries

Loading…
Cancel
Save