streaming.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package streaming implements encoder and decoder for streams
  14. // of runtime.Objects over io.Writer/Readers.
  15. package streaming
  16. import (
  17. "bytes"
  18. "fmt"
  19. "io"
  20. "k8s.io/apimachinery/pkg/runtime"
  21. "k8s.io/apimachinery/pkg/runtime/schema"
  22. )
  23. // Encoder is a runtime.Encoder on a stream.
  24. type Encoder interface {
  25. // Encode will write the provided object to the stream or return an error. It obeys the same
  26. // contract as runtime.VersionedEncoder.
  27. Encode(obj runtime.Object) error
  28. }
  29. // Decoder is a runtime.Decoder from a stream.
  30. type Decoder interface {
  31. // Decode will return io.EOF when no more objects are available.
  32. Decode(defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error)
  33. // Close closes the underlying stream.
  34. Close() error
  35. }
  36. // Serializer is a factory for creating encoders and decoders that work over streams.
  37. type Serializer interface {
  38. NewEncoder(w io.Writer) Encoder
  39. NewDecoder(r io.ReadCloser) Decoder
  40. }
  41. type decoder struct {
  42. reader io.ReadCloser
  43. decoder runtime.Decoder
  44. buf []byte
  45. maxBytes int
  46. resetRead bool
  47. }
  48. // NewDecoder creates a streaming decoder that reads object chunks from r and decodes them with d.
  49. // The reader is expected to return ErrShortRead if the provided buffer is not large enough to read
  50. // an entire object.
  51. func NewDecoder(r io.ReadCloser, d runtime.Decoder) Decoder {
  52. return &decoder{
  53. reader: r,
  54. decoder: d,
  55. buf: make([]byte, 1024),
  56. maxBytes: 16 * 1024 * 1024,
  57. }
  58. }
  // ErrObjectTooLarge is returned by the streaming decoder's Decode when an
  // object does not fit in the read buffer even after the buffer has reached
  // its maximum size.
  var ErrObjectTooLarge = fmt.Errorf("object to decode was longer than maximum allowed size")
  60. // Decode reads the next object from the stream and decodes it.
  61. func (d *decoder) Decode(defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
  62. base := 0
  63. for {
  64. n, err := d.reader.Read(d.buf[base:])
  65. if err == io.ErrShortBuffer {
  66. if n == 0 {
  67. return nil, nil, fmt.Errorf("got short buffer with n=0, base=%d, cap=%d", base, cap(d.buf))
  68. }
  69. if d.resetRead {
  70. continue
  71. }
  72. // double the buffer size up to maxBytes
  73. if len(d.buf) < d.maxBytes {
  74. base += n
  75. d.buf = append(d.buf, make([]byte, len(d.buf))...)
  76. continue
  77. }
  78. // must read the rest of the frame (until we stop getting ErrShortBuffer)
  79. d.resetRead = true
  80. return nil, nil, ErrObjectTooLarge
  81. }
  82. if err != nil {
  83. return nil, nil, err
  84. }
  85. if d.resetRead {
  86. // now that we have drained the large read, continue
  87. d.resetRead = false
  88. continue
  89. }
  90. base += n
  91. break
  92. }
  93. return d.decoder.Decode(d.buf[:base], defaults, into)
  94. }
  95. func (d *decoder) Close() error {
  96. return d.reader.Close()
  97. }
  98. type encoder struct {
  99. writer io.Writer
  100. encoder runtime.Encoder
  101. buf *bytes.Buffer
  102. }
  103. // NewEncoder returns a new streaming encoder.
  104. func NewEncoder(w io.Writer, e runtime.Encoder) Encoder {
  105. return &encoder{
  106. writer: w,
  107. encoder: e,
  108. buf: &bytes.Buffer{},
  109. }
  110. }
  111. // Encode writes the provided object to the nested writer.
  112. func (e *encoder) Encode(obj runtime.Object) error {
  113. if err := e.encoder.Encode(obj, e.buf); err != nil {
  114. return err
  115. }
  116. _, err := e.writer.Write(e.buf.Bytes())
  117. e.buf.Reset()
  118. return err
  119. }
  120. type encoderWithAllocator struct {
  121. writer io.Writer
  122. encoder runtime.EncoderWithAllocator
  123. memAllocator runtime.MemoryAllocator
  124. }
  125. // NewEncoderWithAllocator returns a new streaming encoder
  126. func NewEncoderWithAllocator(w io.Writer, e runtime.EncoderWithAllocator, a runtime.MemoryAllocator) Encoder {
  127. return &encoderWithAllocator{
  128. writer: w,
  129. encoder: e,
  130. memAllocator: a,
  131. }
  132. }
  133. // Encode writes the provided object to the nested writer
  134. func (e *encoderWithAllocator) Encode(obj runtime.Object) error {
  135. return e.encoder.EncodeWithAllocator(obj, e.writer, e.memAllocator)
  136. }