protobuf.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package protobuf
  14. import (
  15. "bytes"
  16. "fmt"
  17. "io"
  18. "net/http"
  19. "reflect"
  20. "github.com/gogo/protobuf/proto"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/runtime"
  23. "k8s.io/apimachinery/pkg/runtime/schema"
  24. "k8s.io/apimachinery/pkg/runtime/serializer/recognizer"
  25. "k8s.io/apimachinery/pkg/util/framer"
  26. "k8s.io/klog/v2"
  27. )
  28. var (
  29. // protoEncodingPrefix serves as a magic number for an encoded protobuf message on this serializer. All
  30. // proto messages serialized by this schema will be preceded by the bytes 0x6b 0x38 0x73, with the fourth
  31. // byte being reserved for the encoding style. The only encoding style defined is 0x00, which means that
  32. // the rest of the byte stream is a message of type k8s.io.kubernetes.pkg.runtime.Unknown (proto2).
  33. //
  34. // See k8s.io/apimachinery/pkg/runtime/generated.proto for details of the runtime.Unknown message.
  35. //
  36. // This encoding scheme is experimental, and is subject to change at any time.
  37. protoEncodingPrefix = []byte{0x6b, 0x38, 0x73, 0x00}
  38. )
  39. type errNotMarshalable struct {
  40. t reflect.Type
  41. }
  42. func (e errNotMarshalable) Error() string {
  43. return fmt.Sprintf("object %v does not implement the protobuf marshalling interface and cannot be encoded to a protobuf message", e.t)
  44. }
  45. func (e errNotMarshalable) Status() metav1.Status {
  46. return metav1.Status{
  47. Status: metav1.StatusFailure,
  48. Code: http.StatusNotAcceptable,
  49. Reason: metav1.StatusReason("NotAcceptable"),
  50. Message: e.Error(),
  51. }
  52. }
  53. // IsNotMarshalable checks the type of error, returns a boolean true if error is not nil and not marshalable false otherwise
  54. func IsNotMarshalable(err error) bool {
  55. _, ok := err.(errNotMarshalable)
  56. return err != nil && ok
  57. }
  58. // NewSerializer creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If a typer
  59. // is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written
  60. // as-is (any type info passed with the object will be used).
  61. func NewSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper) *Serializer {
  62. return &Serializer{
  63. prefix: protoEncodingPrefix,
  64. creater: creater,
  65. typer: typer,
  66. }
  67. }
  68. // Serializer handles encoding versioned objects into the proper wire form
  69. type Serializer struct {
  70. prefix []byte
  71. creater runtime.ObjectCreater
  72. typer runtime.ObjectTyper
  73. }
  74. var _ runtime.Serializer = &Serializer{}
  75. var _ runtime.EncoderWithAllocator = &Serializer{}
  76. var _ recognizer.RecognizingDecoder = &Serializer{}
  77. const serializerIdentifier runtime.Identifier = "protobuf"
  78. // Decode attempts to convert the provided data into a protobuf message, extract the stored schema kind, apply the provided default
  79. // gvk, and then load that data into an object matching the desired schema kind or the provided into. If into is *runtime.Unknown,
  80. // the raw data will be extracted and no decoding will be performed. If into is not registered with the typer, then the object will
  81. // be straight decoded using normal protobuf unmarshalling (the MarshalTo interface). If into is provided and the original data is
  82. // not fully qualified with kind/version/group, the type of the into will be used to alter the returned gvk. On success or most
  83. // errors, the method will return the calculated schema kind.
  84. func (s *Serializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
  85. prefixLen := len(s.prefix)
  86. switch {
  87. case len(originalData) == 0:
  88. // TODO: treat like decoding {} from JSON with defaulting
  89. return nil, nil, fmt.Errorf("empty data")
  90. case len(originalData) < prefixLen || !bytes.Equal(s.prefix, originalData[:prefixLen]):
  91. return nil, nil, fmt.Errorf("provided data does not appear to be a protobuf message, expected prefix %v", s.prefix)
  92. case len(originalData) == prefixLen:
  93. // TODO: treat like decoding {} from JSON with defaulting
  94. return nil, nil, fmt.Errorf("empty body")
  95. }
  96. data := originalData[prefixLen:]
  97. unk := runtime.Unknown{}
  98. if err := unk.Unmarshal(data); err != nil {
  99. return nil, nil, err
  100. }
  101. actual := unk.GroupVersionKind()
  102. copyKindDefaults(&actual, gvk)
  103. if intoUnknown, ok := into.(*runtime.Unknown); ok && intoUnknown != nil {
  104. *intoUnknown = unk
  105. if ok, _, _ := s.RecognizesData(unk.Raw); ok {
  106. intoUnknown.ContentType = runtime.ContentTypeProtobuf
  107. }
  108. return intoUnknown, &actual, nil
  109. }
  110. if into != nil {
  111. types, _, err := s.typer.ObjectKinds(into)
  112. switch {
  113. case runtime.IsNotRegisteredError(err):
  114. pb, ok := into.(proto.Message)
  115. if !ok {
  116. return nil, &actual, errNotMarshalable{reflect.TypeOf(into)}
  117. }
  118. if err := proto.Unmarshal(unk.Raw, pb); err != nil {
  119. return nil, &actual, err
  120. }
  121. return into, &actual, nil
  122. case err != nil:
  123. return nil, &actual, err
  124. default:
  125. copyKindDefaults(&actual, &types[0])
  126. // if the result of defaulting did not set a version or group, ensure that at least group is set
  127. // (copyKindDefaults will not assign Group if version is already set). This guarantees that the group
  128. // of into is set if there is no better information from the caller or object.
  129. if len(actual.Version) == 0 && len(actual.Group) == 0 {
  130. actual.Group = types[0].Group
  131. }
  132. }
  133. }
  134. if len(actual.Kind) == 0 {
  135. return nil, &actual, runtime.NewMissingKindErr(fmt.Sprintf("%#v", unk.TypeMeta))
  136. }
  137. if len(actual.Version) == 0 {
  138. return nil, &actual, runtime.NewMissingVersionErr(fmt.Sprintf("%#v", unk.TypeMeta))
  139. }
  140. return unmarshalToObject(s.typer, s.creater, &actual, into, unk.Raw)
  141. }
  142. // EncodeWithAllocator writes an object to the provided writer.
  143. // In addition, it allows for providing a memory allocator for efficient memory usage during object serialization.
  144. func (s *Serializer) EncodeWithAllocator(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
  145. return s.encode(obj, w, memAlloc)
  146. }
  147. // Encode serializes the provided object to the given writer.
  148. func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error {
  149. return s.encode(obj, w, &runtime.SimpleAllocator{})
  150. }
  151. func (s *Serializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
  152. if co, ok := obj.(runtime.CacheableObject); ok {
  153. return co.CacheEncode(s.Identifier(), func(obj runtime.Object, w io.Writer) error { return s.doEncode(obj, w, memAlloc) }, w)
  154. }
  155. return s.doEncode(obj, w, memAlloc)
  156. }
  157. func (s *Serializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
  158. if memAlloc == nil {
  159. klog.Error("a mandatory memory allocator wasn't provided, this might have a negative impact on performance, check invocations of EncodeWithAllocator method, falling back on runtime.SimpleAllocator")
  160. memAlloc = &runtime.SimpleAllocator{}
  161. }
  162. prefixSize := uint64(len(s.prefix))
  163. var unk runtime.Unknown
  164. switch t := obj.(type) {
  165. case *runtime.Unknown:
  166. estimatedSize := prefixSize + uint64(t.Size())
  167. data := memAlloc.Allocate(estimatedSize)
  168. i, err := t.MarshalTo(data[prefixSize:])
  169. if err != nil {
  170. return err
  171. }
  172. copy(data, s.prefix)
  173. _, err = w.Write(data[:prefixSize+uint64(i)])
  174. return err
  175. default:
  176. kind := obj.GetObjectKind().GroupVersionKind()
  177. unk = runtime.Unknown{
  178. TypeMeta: runtime.TypeMeta{
  179. Kind: kind.Kind,
  180. APIVersion: kind.GroupVersion().String(),
  181. },
  182. }
  183. }
  184. switch t := obj.(type) {
  185. case bufferedMarshaller:
  186. // this path performs a single allocation during write only when the Allocator wasn't provided
  187. // it also requires the caller to implement the more efficient Size and MarshalToSizedBuffer methods
  188. encodedSize := uint64(t.Size())
  189. estimatedSize := prefixSize + estimateUnknownSize(&unk, encodedSize)
  190. data := memAlloc.Allocate(estimatedSize)
  191. i, err := unk.NestedMarshalTo(data[prefixSize:], t, encodedSize)
  192. if err != nil {
  193. return err
  194. }
  195. copy(data, s.prefix)
  196. _, err = w.Write(data[:prefixSize+uint64(i)])
  197. return err
  198. case proto.Marshaler:
  199. // this path performs extra allocations
  200. data, err := t.Marshal()
  201. if err != nil {
  202. return err
  203. }
  204. unk.Raw = data
  205. estimatedSize := prefixSize + uint64(unk.Size())
  206. data = memAlloc.Allocate(estimatedSize)
  207. i, err := unk.MarshalTo(data[prefixSize:])
  208. if err != nil {
  209. return err
  210. }
  211. copy(data, s.prefix)
  212. _, err = w.Write(data[:prefixSize+uint64(i)])
  213. return err
  214. default:
  215. // TODO: marshal with a different content type and serializer (JSON for third party objects)
  216. return errNotMarshalable{reflect.TypeOf(obj)}
  217. }
  218. }
  219. // Identifier implements runtime.Encoder interface.
  220. func (s *Serializer) Identifier() runtime.Identifier {
  221. return serializerIdentifier
  222. }
  223. // RecognizesData implements the RecognizingDecoder interface.
  224. func (s *Serializer) RecognizesData(data []byte) (bool, bool, error) {
  225. return bytes.HasPrefix(data, s.prefix), false, nil
  226. }
  227. // copyKindDefaults defaults dst to the value in src if dst does not have a value set.
  228. func copyKindDefaults(dst, src *schema.GroupVersionKind) {
  229. if src == nil {
  230. return
  231. }
  232. // apply kind and version defaulting from provided default
  233. if len(dst.Kind) == 0 {
  234. dst.Kind = src.Kind
  235. }
  236. if len(dst.Version) == 0 && len(src.Version) > 0 {
  237. dst.Group = src.Group
  238. dst.Version = src.Version
  239. }
  240. }
  241. // bufferedMarshaller describes a more efficient marshalling interface that can avoid allocating multiple
  242. // byte buffers by pre-calculating the size of the final buffer needed.
  243. type bufferedMarshaller interface {
  244. proto.Sizer
  245. runtime.ProtobufMarshaller
  246. }
  247. // Like bufferedMarshaller, but is able to marshal backwards, which is more efficient since it doesn't call Size() as frequently.
  248. type bufferedReverseMarshaller interface {
  249. proto.Sizer
  250. runtime.ProtobufReverseMarshaller
  251. }
  252. // estimateUnknownSize returns the expected bytes consumed by a given runtime.Unknown
  253. // object with a nil RawJSON struct and the expected size of the provided buffer. The
  254. // returned size will not be correct if RawJSOn is set on unk.
  255. func estimateUnknownSize(unk *runtime.Unknown, byteSize uint64) uint64 {
  256. size := uint64(unk.Size())
  257. // protobuf uses 1 byte for the tag, a varint for the length of the array (at most 8 bytes - uint64 - here),
  258. // and the size of the array.
  259. size += 1 + 8 + byteSize
  260. return size
  261. }
  262. // NewRawSerializer creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If typer
  263. // is not nil, the object has the group, version, and kind fields set. This serializer does not provide type information for the
  264. // encoded object, and thus is not self describing (callers must know what type is being described in order to decode).
  265. //
  266. // This encoding scheme is experimental, and is subject to change at any time.
  267. func NewRawSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper) *RawSerializer {
  268. return &RawSerializer{
  269. creater: creater,
  270. typer: typer,
  271. }
  272. }
  273. // RawSerializer encodes and decodes objects without adding a runtime.Unknown wrapper (objects are encoded without identifying
  274. // type).
  275. type RawSerializer struct {
  276. creater runtime.ObjectCreater
  277. typer runtime.ObjectTyper
  278. }
  279. var _ runtime.Serializer = &RawSerializer{}
  280. const rawSerializerIdentifier runtime.Identifier = "raw-protobuf"
  281. // Decode attempts to convert the provided data into a protobuf message, extract the stored schema kind, apply the provided default
  282. // gvk, and then load that data into an object matching the desired schema kind or the provided into. If into is *runtime.Unknown,
  283. // the raw data will be extracted and no decoding will be performed. If into is not registered with the typer, then the object will
  284. // be straight decoded using normal protobuf unmarshalling (the MarshalTo interface). If into is provided and the original data is
  285. // not fully qualified with kind/version/group, the type of the into will be used to alter the returned gvk. On success or most
  286. // errors, the method will return the calculated schema kind.
  287. func (s *RawSerializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
  288. if into == nil {
  289. return nil, nil, fmt.Errorf("this serializer requires an object to decode into: %#v", s)
  290. }
  291. if len(originalData) == 0 {
  292. // TODO: treat like decoding {} from JSON with defaulting
  293. return nil, nil, fmt.Errorf("empty data")
  294. }
  295. data := originalData
  296. actual := &schema.GroupVersionKind{}
  297. copyKindDefaults(actual, gvk)
  298. if intoUnknown, ok := into.(*runtime.Unknown); ok && intoUnknown != nil {
  299. intoUnknown.Raw = data
  300. intoUnknown.ContentEncoding = ""
  301. intoUnknown.ContentType = runtime.ContentTypeProtobuf
  302. intoUnknown.SetGroupVersionKind(*actual)
  303. return intoUnknown, actual, nil
  304. }
  305. types, _, err := s.typer.ObjectKinds(into)
  306. switch {
  307. case runtime.IsNotRegisteredError(err):
  308. pb, ok := into.(proto.Message)
  309. if !ok {
  310. return nil, actual, errNotMarshalable{reflect.TypeOf(into)}
  311. }
  312. if err := proto.Unmarshal(data, pb); err != nil {
  313. return nil, actual, err
  314. }
  315. return into, actual, nil
  316. case err != nil:
  317. return nil, actual, err
  318. default:
  319. copyKindDefaults(actual, &types[0])
  320. // if the result of defaulting did not set a version or group, ensure that at least group is set
  321. // (copyKindDefaults will not assign Group if version is already set). This guarantees that the group
  322. // of into is set if there is no better information from the caller or object.
  323. if len(actual.Version) == 0 && len(actual.Group) == 0 {
  324. actual.Group = types[0].Group
  325. }
  326. }
  327. if len(actual.Kind) == 0 {
  328. return nil, actual, runtime.NewMissingKindErr("<protobuf encoded body - must provide default type>")
  329. }
  330. if len(actual.Version) == 0 {
  331. return nil, actual, runtime.NewMissingVersionErr("<protobuf encoded body - must provide default type>")
  332. }
  333. return unmarshalToObject(s.typer, s.creater, actual, into, data)
  334. }
  335. // unmarshalToObject is the common code between decode in the raw and normal serializer.
  336. func unmarshalToObject(typer runtime.ObjectTyper, creater runtime.ObjectCreater, actual *schema.GroupVersionKind, into runtime.Object, data []byte) (runtime.Object, *schema.GroupVersionKind, error) {
  337. // use the target if necessary
  338. obj, err := runtime.UseOrCreateObject(typer, creater, *actual, into)
  339. if err != nil {
  340. return nil, actual, err
  341. }
  342. pb, ok := obj.(proto.Message)
  343. if !ok {
  344. return nil, actual, errNotMarshalable{reflect.TypeOf(obj)}
  345. }
  346. if err := proto.Unmarshal(data, pb); err != nil {
  347. return nil, actual, err
  348. }
  349. if actual != nil {
  350. obj.GetObjectKind().SetGroupVersionKind(*actual)
  351. }
  352. return obj, actual, nil
  353. }
  354. // Encode serializes the provided object to the given writer. Overrides is ignored.
  355. func (s *RawSerializer) Encode(obj runtime.Object, w io.Writer) error {
  356. return s.encode(obj, w, &runtime.SimpleAllocator{})
  357. }
  358. // EncodeWithAllocator writes an object to the provided writer.
  359. // In addition, it allows for providing a memory allocator for efficient memory usage during object serialization.
  360. func (s *RawSerializer) EncodeWithAllocator(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
  361. return s.encode(obj, w, memAlloc)
  362. }
  363. func (s *RawSerializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
  364. if co, ok := obj.(runtime.CacheableObject); ok {
  365. return co.CacheEncode(s.Identifier(), func(obj runtime.Object, w io.Writer) error { return s.doEncode(obj, w, memAlloc) }, w)
  366. }
  367. return s.doEncode(obj, w, memAlloc)
  368. }
  369. func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
  370. if memAlloc == nil {
  371. klog.Error("a mandatory memory allocator wasn't provided, this might have a negative impact on performance, check invocations of EncodeWithAllocator method, falling back on runtime.SimpleAllocator")
  372. memAlloc = &runtime.SimpleAllocator{}
  373. }
  374. switch t := obj.(type) {
  375. case bufferedReverseMarshaller:
  376. // this path performs a single allocation during write only when the Allocator wasn't provided
  377. // it also requires the caller to implement the more efficient Size and MarshalToSizedBuffer methods
  378. encodedSize := uint64(t.Size())
  379. data := memAlloc.Allocate(encodedSize)
  380. n, err := t.MarshalToSizedBuffer(data)
  381. if err != nil {
  382. return err
  383. }
  384. _, err = w.Write(data[:n])
  385. return err
  386. case bufferedMarshaller:
  387. // this path performs a single allocation during write only when the Allocator wasn't provided
  388. // it also requires the caller to implement the more efficient Size and MarshalTo methods
  389. encodedSize := uint64(t.Size())
  390. data := memAlloc.Allocate(encodedSize)
  391. n, err := t.MarshalTo(data)
  392. if err != nil {
  393. return err
  394. }
  395. _, err = w.Write(data[:n])
  396. return err
  397. case proto.Marshaler:
  398. // this path performs extra allocations
  399. data, err := t.Marshal()
  400. if err != nil {
  401. return err
  402. }
  403. _, err = w.Write(data)
  404. return err
  405. default:
  406. return errNotMarshalable{reflect.TypeOf(obj)}
  407. }
  408. }
  409. // Identifier implements runtime.Encoder interface.
  410. func (s *RawSerializer) Identifier() runtime.Identifier {
  411. return rawSerializerIdentifier
  412. }
  413. // LengthDelimitedFramer is exported variable of type lengthDelimitedFramer
  414. var LengthDelimitedFramer = lengthDelimitedFramer{}
  415. // Provides length delimited frame reader and writer methods
  416. type lengthDelimitedFramer struct{}
  417. // NewFrameWriter implements stream framing for this serializer
  418. func (lengthDelimitedFramer) NewFrameWriter(w io.Writer) io.Writer {
  419. return framer.NewLengthDelimitedFrameWriter(w)
  420. }
  421. // NewFrameReader implements stream framing for this serializer
  422. func (lengthDelimitedFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser {
  423. return framer.NewLengthDelimitedFrameReader(r)
  424. }