api.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653
  1. // Copyright 2011 Google Inc. All rights reserved.
  2. // Use of this source code is governed by the Apache 2.0
  3. // license that can be found in the LICENSE file.
  4. //go:build !appengine
  5. // +build !appengine
  6. package internal
  7. import (
  8. "bytes"
  9. "context"
  10. "errors"
  11. "fmt"
  12. "io/ioutil"
  13. "log"
  14. "net"
  15. "net/http"
  16. "net/url"
  17. "os"
  18. "runtime"
  19. "strconv"
  20. "strings"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/golang/protobuf/proto"
  25. basepb "google.golang.org/appengine/internal/base"
  26. logpb "google.golang.org/appengine/internal/log"
  27. remotepb "google.golang.org/appengine/internal/remote_api"
  28. )
  // apiPath is the URL path of the API server endpoint that apiURL targets.
  const apiPath = "/rpc_http"
  32. var (
  33. // Incoming headers.
  34. ticketHeader = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
  35. dapperHeader = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
  36. traceHeader = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
  37. curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
  38. userIPHeader = http.CanonicalHeaderKey("X-AppEngine-User-IP")
  39. remoteAddrHeader = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")
  40. devRequestIdHeader = http.CanonicalHeaderKey("X-Appengine-Dev-Request-Id")
  41. // Outgoing headers.
  42. apiEndpointHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
  43. apiEndpointHeaderValue = []string{"app-engine-apis"}
  44. apiMethodHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
  45. apiMethodHeaderValue = []string{"/VMRemoteAPI.CallRemoteAPI"}
  46. apiDeadlineHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
  47. apiContentType = http.CanonicalHeaderKey("Content-Type")
  48. apiContentTypeValue = []string{"application/octet-stream"}
  49. logFlushHeader = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")
  50. apiHTTPClient = &http.Client{
  51. Transport: &http.Transport{
  52. Proxy: http.ProxyFromEnvironment,
  53. Dial: limitDial,
  54. MaxIdleConns: 1000,
  55. MaxIdleConnsPerHost: 10000,
  56. IdleConnTimeout: 90 * time.Second,
  57. },
  58. }
  59. )
  60. func apiURL(ctx context.Context) *url.URL {
  61. host, port := "appengine.googleapis.internal", "10001"
  62. if h := os.Getenv("API_HOST"); h != "" {
  63. host = h
  64. }
  65. if hostOverride := ctx.Value(apiHostOverrideKey); hostOverride != nil {
  66. host = hostOverride.(string)
  67. }
  68. if p := os.Getenv("API_PORT"); p != "" {
  69. port = p
  70. }
  71. if portOverride := ctx.Value(apiPortOverrideKey); portOverride != nil {
  72. port = portOverride.(string)
  73. }
  74. return &url.URL{
  75. Scheme: "http",
  76. Host: host + ":" + port,
  77. Path: apiPath,
  78. }
  79. }
  80. // Middleware wraps an http handler so that it can make GAE API calls
  81. func Middleware(next http.Handler) http.Handler {
  82. return handleHTTPMiddleware(executeRequestSafelyMiddleware(next))
  83. }
  84. func handleHTTPMiddleware(next http.Handler) http.Handler {
  85. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  86. c := &aeContext{
  87. req: r,
  88. outHeader: w.Header(),
  89. }
  90. r = r.WithContext(withContext(r.Context(), c))
  91. c.req = r
  92. stopFlushing := make(chan int)
  93. // Patch up RemoteAddr so it looks reasonable.
  94. if addr := r.Header.Get(userIPHeader); addr != "" {
  95. r.RemoteAddr = addr
  96. } else if addr = r.Header.Get(remoteAddrHeader); addr != "" {
  97. r.RemoteAddr = addr
  98. } else {
  99. // Should not normally reach here, but pick a sensible default anyway.
  100. r.RemoteAddr = "127.0.0.1"
  101. }
  102. // The address in the headers will most likely be of these forms:
  103. // 123.123.123.123
  104. // 2001:db8::1
  105. // net/http.Request.RemoteAddr is specified to be in "IP:port" form.
  106. if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
  107. // Assume the remote address is only a host; add a default port.
  108. r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
  109. }
  110. if logToLogservice() {
  111. // Start goroutine responsible for flushing app logs.
  112. // This is done after adding c to ctx.m (and stopped before removing it)
  113. // because flushing logs requires making an API call.
  114. go c.logFlusher(stopFlushing)
  115. }
  116. next.ServeHTTP(c, r)
  117. c.outHeader = nil // make sure header changes aren't respected any more
  118. flushed := make(chan struct{})
  119. if logToLogservice() {
  120. stopFlushing <- 1 // any logging beyond this point will be dropped
  121. // Flush any pending logs asynchronously.
  122. c.pendingLogs.Lock()
  123. flushes := c.pendingLogs.flushes
  124. if len(c.pendingLogs.lines) > 0 {
  125. flushes++
  126. }
  127. c.pendingLogs.Unlock()
  128. go func() {
  129. defer close(flushed)
  130. // Force a log flush, because with very short requests we
  131. // may not ever flush logs.
  132. c.flushLog(true)
  133. }()
  134. w.Header().Set(logFlushHeader, strconv.Itoa(flushes))
  135. }
  136. // Avoid nil Write call if c.Write is never called.
  137. if c.outCode != 0 {
  138. w.WriteHeader(c.outCode)
  139. }
  140. if c.outBody != nil {
  141. w.Write(c.outBody)
  142. }
  143. if logToLogservice() {
  144. // Wait for the last flush to complete before returning,
  145. // otherwise the security ticket will not be valid.
  146. <-flushed
  147. }
  148. })
  149. }
  150. func executeRequestSafelyMiddleware(next http.Handler) http.Handler {
  151. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  152. defer func() {
  153. if x := recover(); x != nil {
  154. c := w.(*aeContext)
  155. logf(c, 4, "%s", renderPanic(x)) // 4 == critical
  156. c.outCode = 500
  157. }
  158. }()
  159. next.ServeHTTP(w, r)
  160. })
  161. }
  // renderPanic formats the recovered panic value x together with the current
  // goroutine's stack trace, as "panic: <x>\n\n<stack>".
  //
  // It is meant to be called from the deferred recover closure. When the
  // trace contains a frame for internal.renderPanic, that frame and the one
  // after it (the recover closure) are removed so the trace is rooted at the
  // site of the panic. When no such frame is found the trace is returned
  // unmodified instead of indexing the buffer with -1.
  func renderPanic(x interface{}) string {
  	buf := make([]byte, 16<<10) // 16 KB should be plenty
  	buf = buf[:runtime.Stack(buf, false)]

  	// Remove the first few stack frames:
  	//   this func
  	//   the recover closure in the caller
  	// Each frame occupies two lines in the trace.
  	const (
  		skipStart  = "internal.renderPanic"
  		skipFrames = 2
  	)
  	if start := bytes.Index(buf, []byte(skipStart)); start >= 0 {
  		// The marker matches in the middle of a line (after the package
  		// path); back up so whole lines are removed.
  		start = bytes.LastIndexByte(buf[:start], '\n') + 1

  		end := start
  		for i := 0; i < skipFrames*2; i++ {
  			nl := bytes.IndexByte(buf[end:], '\n')
  			if nl < 0 {
  				// Trace ends inside the frames being skipped.
  				end = len(buf)
  				break
  			}
  			end += nl + 1
  		}
  		// buf[start:end] is the block to remove.
  		buf = append(buf[:start], buf[end:]...)
  	}

  	// Prepend the panic heading without overwriting the tail of the trace.
  	return fmt.Sprintf("panic: %v\n\n", x) + string(buf)
  }
  197. // aeContext represents the aeContext of an in-flight HTTP request.
  198. // It implements the appengine.Context and http.ResponseWriter interfaces.
  199. type aeContext struct {
  200. req *http.Request
  201. outCode int
  202. outHeader http.Header
  203. outBody []byte
  204. pendingLogs struct {
  205. sync.Mutex
  206. lines []*logpb.UserAppLogLine
  207. flushes int
  208. }
  209. }
  // contextKey's address (&contextKey) is the context key under which
  // withContext stores the request's *aeContext; fromContext looks it up
  // with the same pointer.
  var contextKey = "holds a *context"
  // jointContext is a context.Context assembled from two others.
  //
  // Deadline, cancellation and error state come solely from base. Value
  // lookups consult base first and fall back to valuesOnly when base has no
  // (non-nil) value for the key.
  type jointContext struct {
  	base       context.Context
  	valuesOnly context.Context
  }

  // Deadline reports the deadline of the base context.
  func (jc jointContext) Deadline() (time.Time, bool) {
  	deadline, ok := jc.base.Deadline()
  	return deadline, ok
  }

  // Done returns the base context's Done channel.
  func (jc jointContext) Done() <-chan struct{} {
  	done := jc.base.Done()
  	return done
  }

  // Err returns the base context's error.
  func (jc jointContext) Err() error {
  	err := jc.base.Err()
  	return err
  }

  // Value returns the value for key from the base context, or from the
  // values-only context when the base context yields nil.
  func (jc jointContext) Value(key interface{}) interface{} {
  	v := jc.base.Value(key)
  	if v == nil {
  		v = jc.valuesOnly.Value(key)
  	}
  	return v
  }
  232. // fromContext returns the App Engine context or nil if ctx is not
  233. // derived from an App Engine context.
  234. func fromContext(ctx context.Context) *aeContext {
  235. c, _ := ctx.Value(&contextKey).(*aeContext)
  236. return c
  237. }
  238. func withContext(parent context.Context, c *aeContext) context.Context {
  239. ctx := context.WithValue(parent, &contextKey, c)
  240. if ns := c.req.Header.Get(curNamespaceHeader); ns != "" {
  241. ctx = withNamespace(ctx, ns)
  242. }
  243. return ctx
  244. }
  245. func toContext(c *aeContext) context.Context {
  246. return withContext(context.Background(), c)
  247. }
  248. func IncomingHeaders(ctx context.Context) http.Header {
  249. if c := fromContext(ctx); c != nil {
  250. return c.req.Header
  251. }
  252. return nil
  253. }
  // ReqContext returns the context carried by req.
  func ReqContext(req *http.Request) context.Context {
  	ctx := req.Context()
  	return ctx
  }
  257. func WithContext(parent context.Context, req *http.Request) context.Context {
  258. return jointContext{
  259. base: parent,
  260. valuesOnly: req.Context(),
  261. }
  262. }
  263. // RegisterTestRequest registers the HTTP request req for testing, such that
  264. // any API calls are sent to the provided URL.
  265. // It should only be used by aetest package.
  266. func RegisterTestRequest(req *http.Request, apiURL *url.URL, appID string) *http.Request {
  267. ctx := req.Context()
  268. ctx = withAPIHostOverride(ctx, apiURL.Hostname())
  269. ctx = withAPIPortOverride(ctx, apiURL.Port())
  270. ctx = WithAppIDOverride(ctx, appID)
  271. // use the unregistered request as a placeholder so that withContext can read the headers
  272. c := &aeContext{req: req}
  273. c.req = req.WithContext(withContext(ctx, c))
  274. return c.req
  275. }
  276. var errTimeout = &CallError{
  277. Detail: "Deadline exceeded",
  278. Code: int32(remotepb.RpcError_CANCELLED),
  279. Timeout: true,
  280. }
  281. func (c *aeContext) Header() http.Header { return c.outHeader }
  // bodyAllowedForStatus reports whether a response with the given status
  // code may carry a body. It returns false for 1xx, 204 and 304, which
  // permit neither a body nor entity headers such as Content-Length or
  // Content-Type, and true for everything else.
  //
  // Copied from $GOROOT/src/pkg/net/http/transfer.go.
  func bodyAllowedForStatus(status int) bool {
  	informational := status >= 100 && status <= 199
  	if informational || status == 204 || status == 304 {
  		return false
  	}
  	return true
  }
  296. func (c *aeContext) Write(b []byte) (int, error) {
  297. if c.outCode == 0 {
  298. c.WriteHeader(http.StatusOK)
  299. }
  300. if len(b) > 0 && !bodyAllowedForStatus(c.outCode) {
  301. return 0, http.ErrBodyNotAllowed
  302. }
  303. c.outBody = append(c.outBody, b...)
  304. return len(b), nil
  305. }
  306. func (c *aeContext) WriteHeader(code int) {
  307. if c.outCode != 0 {
  308. logf(c, 3, "WriteHeader called multiple times on request.") // error level
  309. return
  310. }
  311. c.outCode = code
  312. }
  313. func post(ctx context.Context, body []byte, timeout time.Duration) (b []byte, err error) {
  314. apiURL := apiURL(ctx)
  315. hreq := &http.Request{
  316. Method: "POST",
  317. URL: apiURL,
  318. Header: http.Header{
  319. apiEndpointHeader: apiEndpointHeaderValue,
  320. apiMethodHeader: apiMethodHeaderValue,
  321. apiContentType: apiContentTypeValue,
  322. apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)},
  323. },
  324. Body: ioutil.NopCloser(bytes.NewReader(body)),
  325. ContentLength: int64(len(body)),
  326. Host: apiURL.Host,
  327. }
  328. c := fromContext(ctx)
  329. if c != nil {
  330. if info := c.req.Header.Get(dapperHeader); info != "" {
  331. hreq.Header.Set(dapperHeader, info)
  332. }
  333. if info := c.req.Header.Get(traceHeader); info != "" {
  334. hreq.Header.Set(traceHeader, info)
  335. }
  336. }
  337. tr := apiHTTPClient.Transport.(*http.Transport)
  338. var timedOut int32 // atomic; set to 1 if timed out
  339. t := time.AfterFunc(timeout, func() {
  340. atomic.StoreInt32(&timedOut, 1)
  341. tr.CancelRequest(hreq)
  342. })
  343. defer t.Stop()
  344. defer func() {
  345. // Check if timeout was exceeded.
  346. if atomic.LoadInt32(&timedOut) != 0 {
  347. err = errTimeout
  348. }
  349. }()
  350. hresp, err := apiHTTPClient.Do(hreq)
  351. if err != nil {
  352. return nil, &CallError{
  353. Detail: fmt.Sprintf("service bridge HTTP failed: %v", err),
  354. Code: int32(remotepb.RpcError_UNKNOWN),
  355. }
  356. }
  357. defer hresp.Body.Close()
  358. hrespBody, err := ioutil.ReadAll(hresp.Body)
  359. if hresp.StatusCode != 200 {
  360. return nil, &CallError{
  361. Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody),
  362. Code: int32(remotepb.RpcError_UNKNOWN),
  363. }
  364. }
  365. if err != nil {
  366. return nil, &CallError{
  367. Detail: fmt.Sprintf("service bridge response bad: %v", err),
  368. Code: int32(remotepb.RpcError_UNKNOWN),
  369. }
  370. }
  371. return hrespBody, nil
  372. }
  373. func Call(ctx context.Context, service, method string, in, out proto.Message) error {
  374. if ns := NamespaceFromContext(ctx); ns != "" {
  375. if fn, ok := NamespaceMods[service]; ok {
  376. fn(in, ns)
  377. }
  378. }
  379. if f, ctx, ok := callOverrideFromContext(ctx); ok {
  380. return f(ctx, service, method, in, out)
  381. }
  382. // Handle already-done contexts quickly.
  383. select {
  384. case <-ctx.Done():
  385. return ctx.Err()
  386. default:
  387. }
  388. c := fromContext(ctx)
  389. // Apply transaction modifications if we're in a transaction.
  390. if t := transactionFromContext(ctx); t != nil {
  391. if t.finished {
  392. return errors.New("transaction aeContext has expired")
  393. }
  394. applyTransaction(in, &t.transaction)
  395. }
  396. // Default RPC timeout is 60s.
  397. timeout := 60 * time.Second
  398. if deadline, ok := ctx.Deadline(); ok {
  399. timeout = deadline.Sub(time.Now())
  400. }
  401. data, err := proto.Marshal(in)
  402. if err != nil {
  403. return err
  404. }
  405. ticket := ""
  406. if c != nil {
  407. ticket = c.req.Header.Get(ticketHeader)
  408. if dri := c.req.Header.Get(devRequestIdHeader); IsDevAppServer() && dri != "" {
  409. ticket = dri
  410. }
  411. }
  412. req := &remotepb.Request{
  413. ServiceName: &service,
  414. Method: &method,
  415. Request: data,
  416. RequestId: &ticket,
  417. }
  418. hreqBody, err := proto.Marshal(req)
  419. if err != nil {
  420. return err
  421. }
  422. hrespBody, err := post(ctx, hreqBody, timeout)
  423. if err != nil {
  424. return err
  425. }
  426. res := &remotepb.Response{}
  427. if err := proto.Unmarshal(hrespBody, res); err != nil {
  428. return err
  429. }
  430. if res.RpcError != nil {
  431. ce := &CallError{
  432. Detail: res.RpcError.GetDetail(),
  433. Code: *res.RpcError.Code,
  434. }
  435. switch remotepb.RpcError_ErrorCode(ce.Code) {
  436. case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED:
  437. ce.Timeout = true
  438. }
  439. return ce
  440. }
  441. if res.ApplicationError != nil {
  442. return &APIError{
  443. Service: *req.ServiceName,
  444. Detail: res.ApplicationError.GetDetail(),
  445. Code: *res.ApplicationError.Code,
  446. }
  447. }
  448. if res.Exception != nil || res.JavaException != nil {
  449. // This shouldn't happen, but let's be defensive.
  450. return &CallError{
  451. Detail: "service bridge returned exception",
  452. Code: int32(remotepb.RpcError_UNKNOWN),
  453. }
  454. }
  455. return proto.Unmarshal(res.Response, out)
  456. }
  457. func (c *aeContext) Request() *http.Request {
  458. return c.req
  459. }
  460. func (c *aeContext) addLogLine(ll *logpb.UserAppLogLine) {
  461. // Truncate long log lines.
  462. // TODO(dsymonds): Check if this is still necessary.
  463. const lim = 8 << 10
  464. if len(*ll.Message) > lim {
  465. suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
  466. ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
  467. }
  468. c.pendingLogs.Lock()
  469. c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
  470. c.pendingLogs.Unlock()
  471. }
  // logLevelName maps the numeric log levels accepted by logf to the label
  // that prefixes the message when it is written with the standard logger.
  var logLevelName = map[int64]string{
  	0: "DEBUG",
  	1: "INFO",
  	2: "WARNING",
  	3: "ERROR",
  	4: "CRITICAL",
  }
  479. func logf(c *aeContext, level int64, format string, args ...interface{}) {
  480. if c == nil {
  481. panic("not an App Engine aeContext")
  482. }
  483. s := fmt.Sprintf(format, args...)
  484. s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
  485. if logToLogservice() {
  486. c.addLogLine(&logpb.UserAppLogLine{
  487. TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
  488. Level: &level,
  489. Message: &s,
  490. })
  491. }
  492. // Log to stdout if not deployed
  493. if !IsSecondGen() {
  494. log.Print(logLevelName[level] + ": " + s)
  495. }
  496. }
  497. // flushLog attempts to flush any pending logs to the appserver.
  498. // It should not be called concurrently.
  499. func (c *aeContext) flushLog(force bool) (flushed bool) {
  500. c.pendingLogs.Lock()
  501. // Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
  502. n, rem := 0, 30<<20
  503. for ; n < len(c.pendingLogs.lines); n++ {
  504. ll := c.pendingLogs.lines[n]
  505. // Each log line will require about 3 bytes of overhead.
  506. nb := proto.Size(ll) + 3
  507. if nb > rem {
  508. break
  509. }
  510. rem -= nb
  511. }
  512. lines := c.pendingLogs.lines[:n]
  513. c.pendingLogs.lines = c.pendingLogs.lines[n:]
  514. c.pendingLogs.Unlock()
  515. if len(lines) == 0 && !force {
  516. // Nothing to flush.
  517. return false
  518. }
  519. rescueLogs := false
  520. defer func() {
  521. if rescueLogs {
  522. c.pendingLogs.Lock()
  523. c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
  524. c.pendingLogs.Unlock()
  525. }
  526. }()
  527. buf, err := proto.Marshal(&logpb.UserAppLogGroup{
  528. LogLine: lines,
  529. })
  530. if err != nil {
  531. log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
  532. rescueLogs = true
  533. return false
  534. }
  535. req := &logpb.FlushRequest{
  536. Logs: buf,
  537. }
  538. res := &basepb.VoidProto{}
  539. c.pendingLogs.Lock()
  540. c.pendingLogs.flushes++
  541. c.pendingLogs.Unlock()
  542. if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
  543. log.Printf("internal.flushLog: Flush RPC: %v", err)
  544. rescueLogs = true
  545. return false
  546. }
  547. return true
  548. }
  // Log flushing parameters used by logFlusher.
  const (
  	// flushInterval is the period at which pending logs are flushed.
  	flushInterval = time.Second
  	// forceFlushInterval is how long may pass since the last successful
  	// flush before a flush is forced even with no pending logs.
  	forceFlushInterval = time.Minute
  )
  554. func (c *aeContext) logFlusher(stop <-chan int) {
  555. lastFlush := time.Now()
  556. tick := time.NewTicker(flushInterval)
  557. for {
  558. select {
  559. case <-stop:
  560. // Request finished.
  561. tick.Stop()
  562. return
  563. case <-tick.C:
  564. force := time.Now().Sub(lastFlush) > forceFlushInterval
  565. if c.flushLog(force) {
  566. lastFlush = time.Now()
  567. }
  568. }
  569. }
  570. }
  571. func ContextForTesting(req *http.Request) context.Context {
  572. return toContext(&aeContext{req: req})
  573. }
  // logToLogservice reports whether logs should be sent to the log service.
  // That is the case unless the LOG_TO_LOGSERVICE environment variable is
  // set to exactly "0".
  func logToLogservice() bool {
  	// TODO: replace logservice with json structured logs to $LOG_DIR/app.log.json
  	// where $LOG_DIR is /var/log in prod and some tmpdir in dev
  	if os.Getenv("LOG_TO_LOGSERVICE") == "0" {
  		return false
  	}
  	return true
  }