GoPLS Viewer

Home|gopls/internal/jsonrpc2/conn.go
1// Copyright 2018 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package jsonrpc2
6
7import (
8    "context"
9    "encoding/json"
10    "fmt"
11    "sync"
12    "sync/atomic"
13
14    "golang.org/x/tools/internal/event"
15    "golang.org/x/tools/internal/event/label"
16    "golang.org/x/tools/internal/event/tag"
17)
18
19// Conn is the common interface to jsonrpc clients and servers.
20// Conn is bidirectional; it does not have a designated server or client end.
21// It manages the jsonrpc2 protocol, connecting responses back to their calls.
22type Conn interface {
23    // Call invokes the target method and waits for a response.
24    // The params will be marshaled to JSON before sending over the wire, and will
25    // be handed to the method invoked.
26    // The response will be unmarshaled from JSON into the result.
27    // The id returned will be unique from this connection, and can be used for
28    // logging or tracking.
29    Call(ctx context.Contextmethod stringparamsresult interface{}) (IDerror)
30
31    // Notify invokes the target method but does not wait for a response.
32    // The params will be marshaled to JSON before sending over the wire, and will
33    // be handed to the method invoked.
34    Notify(ctx context.Contextmethod stringparams interface{}) error
35
36    // Go starts a goroutine to handle the connection.
37    // It must be called exactly once for each Conn.
38    // It returns immediately.
39    // You must block on Done() to wait for the connection to shut down.
40    // This is a temporary measure, this should be started automatically in the
41    // future.
42    Go(ctx context.Contexthandler Handler)
43
44    // Close closes the connection and it's underlying stream.
45    // It does not wait for the close to complete, use the Done() channel for
46    // that.
47    Close() error
48
49    // Done returns a channel that will be closed when the processing goroutine
50    // has terminated, which will happen if Close() is called or an underlying
51    // stream is closed.
52    Done() <-chan struct{}
53
54    // Err returns an error if there was one from within the processing goroutine.
55    // If err returns non nil, the connection will be already closed or closing.
56    Err() error
57}
58
59type conn struct {
60    seq       int64      // must only be accessed using atomic operations
61    writeMu   sync.Mutex // protects writes to the stream
62    stream    Stream
63    pendingMu sync.Mutex // protects the pending map
64    pending   map[ID]chan *Response
65
66    done chan struct{}
67    err  atomic.Value
68}
69
70// NewConn creates a new connection object around the supplied stream.
71func NewConn(s StreamConn {
72    conn := &conn{
73        stream:  s,
74        pendingmake(map[ID]chan *Response),
75        done:    make(chan struct{}),
76    }
77    return conn
78}
79
80func (c *connNotify(ctx context.Contextmethod stringparams interface{}) (err error) {
81    notifyerr := NewNotification(methodparams)
82    if err != nil {
83        return fmt.Errorf("marshaling notify parameters: %v"err)
84    }
85    ctxdone := event.Start(ctxmethod,
86        tag.Method.Of(method),
87        tag.RPCDirection.Of(tag.Outbound),
88    )
89    defer func() {
90        recordStatus(ctxerr)
91        done()
92    }()
93
94    event.Metric(ctxtag.Started.Of(1))
95    nerr := c.write(ctxnotify)
96    event.Metric(ctxtag.SentBytes.Of(n))
97    return err
98}
99
100func (c *connCall(ctx context.Contextmethod stringparamsresult interface{}) (_ IDerr error) {
101    // generate a new request identifier
102    id := ID{numberatomic.AddInt64(&c.seq1)}
103    callerr := NewCall(idmethodparams)
104    if err != nil {
105        return idfmt.Errorf("marshaling call parameters: %v"err)
106    }
107    ctxdone := event.Start(ctxmethod,
108        tag.Method.Of(method),
109        tag.RPCDirection.Of(tag.Outbound),
110        tag.RPCID.Of(fmt.Sprintf("%q"id)),
111    )
112    defer func() {
113        recordStatus(ctxerr)
114        done()
115    }()
116    event.Metric(ctxtag.Started.Of(1))
117    // We have to add ourselves to the pending map before we send, otherwise we
118    // are racing the response. Also add a buffer to rchan, so that if we get a
119    // wire response between the time this call is cancelled and id is deleted
120    // from c.pending, the send to rchan will not block.
121    rchan := make(chan *Response1)
122    c.pendingMu.Lock()
123    c.pending[id] = rchan
124    c.pendingMu.Unlock()
125    defer func() {
126        c.pendingMu.Lock()
127        delete(c.pendingid)
128        c.pendingMu.Unlock()
129    }()
130    // now we are ready to send
131    nerr := c.write(ctxcall)
132    event.Metric(ctxtag.SentBytes.Of(n))
133    if err != nil {
134        // sending failed, we will never get a response, so don't leave it pending
135        return iderr
136    }
137    // now wait for the response
138    select {
139    case response := <-rchan:
140        // is it an error response?
141        if response.err != nil {
142            return idresponse.err
143        }
144        if result == nil || len(response.result) == 0 {
145            return idnil
146        }
147        if err := json.Unmarshal(response.resultresult); err != nil {
148            return idfmt.Errorf("unmarshaling result: %v"err)
149        }
150        return idnil
151    case <-ctx.Done():
152        return idctx.Err()
153    }
154}
155
156func (c *connreplier(req RequestspanDone func()) Replier {
157    return func(ctx context.Contextresult interface{}, err errorerror {
158        defer func() {
159            recordStatus(ctxerr)
160            spanDone()
161        }()
162        callok := req.(*Call)
163        if !ok {
164            // request was a notify, no need to respond
165            return nil
166        }
167        responseerr := NewResponse(call.idresulterr)
168        if err != nil {
169            return err
170        }
171        nerr := c.write(ctxresponse)
172        event.Metric(ctxtag.SentBytes.Of(n))
173        if err != nil {
174            // TODO(iancottrell): if a stream write fails, we really need to shut down
175            // the whole stream
176            return err
177        }
178        return nil
179    }
180}
181
182func (c *connwrite(ctx context.Contextmsg Message) (int64error) {
183    c.writeMu.Lock()
184    defer c.writeMu.Unlock()
185    return c.stream.Write(ctxmsg)
186}
187
188func (c *connGo(ctx context.Contexthandler Handler) {
189    go c.run(ctxhandler)
190}
191
192func (c *connrun(ctx context.Contexthandler Handler) {
193    defer close(c.done)
194    for {
195        // get the next message
196        msgnerr := c.stream.Read(ctx)
197        if err != nil {
198            // The stream failed, we cannot continue.
199            c.fail(err)
200            return
201        }
202        switch msg := msg.(type) {
203        case Request:
204            labels := []label.Label{
205                tag.Method.Of(msg.Method()),
206                tag.RPCDirection.Of(tag.Inbound),
207                {}, // reserved for ID if present
208            }
209            if callok := msg.(*Call); ok {
210                labels[len(labels)-1] = tag.RPCID.Of(fmt.Sprintf("%q"call.ID()))
211            } else {
212                labels = labels[:len(labels)-1]
213            }
214            reqCtxspanDone := event.Start(ctxmsg.Method(), labels...)
215            event.Metric(reqCtx,
216                tag.Started.Of(1),
217                tag.ReceivedBytes.Of(n))
218            if err := handler(reqCtxc.replier(msgspanDone), msg); err != nil {
219                // delivery failed, not much we can do
220                event.Error(reqCtx"jsonrpc2 message delivery failed"err)
221            }
222        case *Response:
223            // If method is not set, this should be a response, in which case we must
224            // have an id to send the response back to the caller.
225            c.pendingMu.Lock()
226            rchanok := c.pending[msg.id]
227            c.pendingMu.Unlock()
228            if ok {
229                rchan <- msg
230            }
231        }
232    }
233}
234
235func (c *connClose() error {
236    return c.stream.Close()
237}
238
239func (c *connDone() <-chan struct{} {
240    return c.done
241}
242
243func (c *connErr() error {
244    if err := c.err.Load(); err != nil {
245        return err.(error)
246    }
247    return nil
248}
249
250// fail sets a failure condition on the stream and closes it.
251func (c *connfail(err error) {
252    c.err.Store(err)
253    c.stream.Close()
254}
255
256func recordStatus(ctx context.Contexterr error) {
257    if err != nil {
258        event.Label(ctxtag.StatusCode.Of("ERROR"))
259    } else {
260        event.Label(ctxtag.StatusCode.Of("OK"))
261    }
262}
263
MembersX
conn.stream
conn.Call.BlockStmt.response
conn.Done.c
conn.Call.done
conn.fail.c
atomic
label
conn.Notify.c
conn.Notify
conn.Call.n
conn.Go.c
conn.run.BlockStmt.BlockStmt.labels
conn.fail.err
json
conn.Notify.ctx
conn.Notify.method
conn.Call.ctx
conn.Notify.params
conn.Call
conn.write.ctx
conn.Call.id
conn.replier.BlockStmt.n
conn.run.BlockStmt.n
context
event
conn.Notify.n
conn.Call.method
conn.Go.handler
conn.run.BlockStmt.BlockStmt.spanDone
conn.write.c
conn.Go.ctx
conn.run.BlockStmt.err
recordStatus.err
fmt
conn.pendingMu
NewConn
conn.Notify.notify
conn.Close
conn.Call.rchan
conn.replier.c
conn.replier.spanDone
conn.replier.BlockStmt.err
conn.run.BlockStmt.BlockStmt.err
conn.Close.c
conn.seq
conn.Call.BlockStmt.err
conn.write
conn.run.BlockStmt.msg
conn.Call.result
conn.replier.BlockStmt.response
conn.Err.c
tag
conn.writeMu
conn.pending
conn.Call.params
conn.run.c
recordStatus.ctx
conn.Call.err
conn.write.msg
conn.run.BlockStmt.BlockStmt.reqCtx
conn.run.ctx
conn.Err.err
conn.fail
conn.done
conn.err
conn.Notify.done
conn.run
conn.Call.c
conn.Call.call
recordStatus
conn.Notify.err
conn.replier.req
conn.Done
sync
Conn
NewConn.s
NewConn.conn
conn.Err
conn
conn.replier
conn.Go
conn.run.handler
Members
X