commit 35956d981fb3bfb1eb75a215af300fee9f4056bd Author: fg-admin Date: Wed Jan 25 14:03:37 2023 +0300 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..53a80b7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/cmd/proxy/proxy +/proxy diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..7a4a3ea --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..acda386 --- /dev/null +++ b/README.md @@ -0,0 +1,29 @@ +# Proxy + +## Info + +Proxy listen on URL: `http://${address}/proxy` + +Flink's `module.yaml` example: + +```yaml +kind: io.statefun.endpoints.v2/http +spec: + functions: proxy/* + urlPathTemplate: http://${address}/proxy + maxNumBatchRequests: 1 + transport: + type: io.statefun.transports.v1/async + connect: 5s + call: 600s + payload_max_bytes: 52428800 + +``` + +## Run + +### Environment variables + +|Variable|Default|Description| +|:------:|:-----:|:---------:| +|STATEFUN_PROXY_DEBUG|""|Debug log level| diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go new file mode 100644 index 0000000..5412460 --- /dev/null +++ b/cmd/proxy/main.go @@ -0,0 +1,17 @@ +// Copyright 2022 Listware + +package main + +import ( + "fmt" + "os" + + "git.fg-tech.ru/listware/proxy/pkg/agent" +) + +func main() { + if err := agent.Proxy.Run(os.Args); err != nil { + fmt.Println(err) + os.Exit(1) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..beeb27e --- /dev/null +++ b/go.mod @@ -0,0 +1,31 @@ +module git.fg-tech.ru/listware/proxy + +go 1.19 + +require ( + git.fg-tech.ru/listware/cmdb v0.1.0 + git.fg-tech.ru/listware/go-core v0.1.0 + git.fg-tech.ru/listware/proto v0.1.1 + github.com/gorilla/mux v1.8.0 + github.com/urfave/cli/v2 v2.24.1 + go.uber.org/zap v1.24.0 + google.golang.org/protobuf v1.28.1 +) + +require ( + github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.2.0 // indirect + github.com/arangodb/go-driver v1.4.1 // indirect + github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect + golang.org/x/net v0.5.0 // indirect + golang.org/x/sys v0.4.0 // indirect + golang.org/x/text v0.6.0 // indirect + google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 // indirect + google.golang.org/grpc v1.52.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..26ac875 --- /dev/null +++ b/go.sum @@ -0,0 +1,66 @@ +git.fg-tech.ru/listware/cmdb v0.1.0 h1:K4A8XI8X5nh975+m+LVea/kIa2hSQSXCnH3qVjDjOwk= +git.fg-tech.ru/listware/cmdb v0.1.0/go.mod h1:kke3p3xl6lPlqwSfmA+grlKhGX6zKXctjcbE1Pcs9Ik= +git.fg-tech.ru/listware/go-core v0.1.0 h1:ftwr4VjmFSHfCqPQFTi2rcpELVznYBUPmwRbKMm/OGQ= +git.fg-tech.ru/listware/go-core v0.1.0/go.mod h1:jKjqdeZUtlNoSFSSHSmiw0fAy4FWeYhLqqRvLU5K4t4= +git.fg-tech.ru/listware/proto v0.1.1 h1:CSqteAtgysiJe7+KtLOEXSIvxypmlJCKwQtla1d2v+A= +git.fg-tech.ru/listware/proto v0.1.1/go.mod h1:t5lyMTuX/if05HI/na9tJAlHCHHMdhdPLBTkhvscedQ= +github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.2.0 h1:OfLhhWnnOfBUvzbQuhE7hCKJdlBW41nV3CfCF/q7UJs= +github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.2.0/go.mod h1:uHiPJsi71a161NMH/ISkkSPIXenkcG9A2m+uhT8UlJ4= +github.com/arangodb/go-driver v1.4.1 h1:Jg0N7XKxiKwjswmAcMCnefWmt81KJEqybqRAGJDRWlo= +github.com/arangodb/go-driver v1.4.1/go.mod h1:UTtaxTUMmyPWzKc2dsWWOZzZ3yM6aHWxn/eubGa3YmQ= +github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e h1:Xg+hGrY2LcQBbxd0ZFdbGSyRKTYMZCfBbw/pMJFOk1g= +github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e/go.mod h1:mq7Shfa/CaixoDxiyAAc5jZ6CVBAyPaNQCGS7mkj4Ho= +github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= +github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= +github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/urfave/cli/v2 v2.24.1 h1:/QYYr7g0EhwXEML8jO+8OYt5trPnLHS0p3mrgExJ5NU= +github.com/urfave/cli/v2 v2.24.1/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc= +github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= +github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= +go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= +golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= +golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= +golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 h1:a2S6M0+660BgMNl++4JPlcAO/CjkqYItDEZwkoDQK7c= +google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6/go.mod h1:rZS5c/ZVYMaOGBfO68GWtjOw/eLaZM1X6iVtgjZ+EWg= +google.golang.org/grpc v1.52.1 h1:2NpOPk5g5Xtb0qebIEs7hNIa++PdtZLo2AQUpc1YnSU= +google.golang.org/grpc v1.52.1/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/pkg/agent/proxy.go b/pkg/agent/proxy.go new file mode 100644 index 0000000..ebf20de --- /dev/null +++ b/pkg/agent/proxy.go @@ -0,0 +1,48 @@ +// Copyright 2022 Listware + +package agent + +import ( + "git.fg-tech.ru/listware/proxy/pkg/proxy" + "github.com/urfave/cli/v2" +) + +var Proxy = &cli.App{ + Name: "statefun-proxy", + Usage: "Flink's Stateful Functions Proxy", + Version: "v0.0.1", + Flags: []cli.Flag{ + &cli.StringSliceFlag{ + Name: "broker", + Usage: "Kafka broker URLs", + Value: cli.NewStringSlice("0.0.0.0:19092"), + EnvVars: []string{"KAFKA_BROKER"}, + }, + &cli.StringFlag{ + Name: "listen", + Aliases: []string{"l"}, + Usage: "Address to listen on", + Value: ":8801", + EnvVars: []string{"STATEFUN_PROXY_ADDR"}, + }, + &cli.BoolFlag{ + Name: "debug", + Aliases: []string{"d"}, + Usage: "Debug log level", + Value: false, + EnvVars: []string{"STATEFUN_PROXY_DEBUG"}, + }, + }, + Action: func(ctx *cli.Context) (err error) { + p, err := proxy.New(proxy.WithAddr(ctx.String("listen"))) + if err != nil { + return + } + if ctx.Bool("debug") { + if err = p.Configure(proxy.WithDebug()); err != nil { + return + } + } + return p.Run(ctx.Context) + }, +} diff --git a/pkg/module/module.go b/pkg/module/module.go new file mode 100644 index 0000000..5fc4d41 --- /dev/null +++ b/pkg/module/module.go @@ -0,0 +1,11 @@ +// Copyright 2022 Listware + +package module + +import ( + "git.fg-tech.ru/listware/go-core/pkg/module" +) + +func New(namespace string, opts ...module.Opt) module.Module { + return module.New(namespace, opts...) +} diff --git a/pkg/proxy/opts.go b/pkg/proxy/opts.go new file mode 100644 index 0000000..67b252b --- /dev/null +++ b/pkg/proxy/opts.go @@ -0,0 +1,27 @@ +// Copyright 2022 Listware + +package proxy + +import ( + "go.uber.org/zap" +) + +type Opt func(*Proxy) error + +func WithAddr(addr string) Opt { + return func(p *Proxy) (err error) { + p.addr = addr + return + } +} + +func WithDebug() Opt { + return func(p *Proxy) (err error) { + l, err := zap.NewDevelopment() + if err != nil { + return + } + p.log = l.Sugar() + return + } +} diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go new file mode 100644 index 0000000..c818df4 --- /dev/null +++ b/pkg/proxy/proxy.go @@ -0,0 +1,212 @@ +// Copyright 2022 Listware + +package proxy + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httputil" + "net/url" + + "git.fg-tech.ru/listware/cmdb/pkg/cmdb/qdsl" + "git.fg-tech.ru/listware/proto/sdk/pbflink" + "git.fg-tech.ru/listware/proto/sdk/pbtypes" + "github.com/gorilla/mux" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" +) + +const ( + forwardedHeader = "X-Forwarded-Host" +) + +var ( + noopFromFunction = &pbflink.FromFunction{ + Response: &pbflink.FromFunction_InvocationResult{ + InvocationResult: &pbflink.FromFunction_InvocationResponse{}, + }, + } +) + +type Proxy struct { + addr string + reverseProxy *httputil.ReverseProxy + + log *zap.SugaredLogger + + ctx context.Context + cancel context.CancelFunc + + noopData []byte +} + +func New(opts ...Opt) (p *Proxy, err error) { + l, err := zap.NewProduction() + if err != nil { + return + } + p = &Proxy{ + log: l.Sugar(), + } + p.reverseProxy = &httputil.ReverseProxy{ + Director: p.directorFn, + ModifyResponse: p.modifyResponse, + ErrorHandler: p.errorHandler, + } + + if p.noopData, err = proto.Marshal(noopFromFunction); err != nil { + return + } + + return p, p.Configure(opts...) +} + +func (p *Proxy) Configure(opts ...Opt) (err error) { + for _, opt := range opts { + if err = opt(p); err != nil { + return + } + } + return +} + +func (p *Proxy) Run(ctx context.Context) (err error) { + r := mux.NewRouter() + r.HandleFunc("/{type}", handler(p.reverseProxy)) + + srv := &http.Server{ + Addr: p.addr, + Handler: r, + } + + p.ctx, p.cancel = context.WithCancel(ctx) + + p.log.Debugf("listen & serve on: %s", p.addr) + + go func() { + if err = srv.ListenAndServe(); err != nil { + p.log.Errorf("failed: %s", err.Error()) + p.cancel() + } + }() + <-p.ctx.Done() + if err != nil { + return + } + return srv.Shutdown(p.ctx) +} + +func (p *Proxy) directorFn(r *http.Request) { + if r.Method != http.MethodPost { + p.log.Errorf("only '%s' method allowed: %s", http.MethodPost, r.Method) + return + } + + functionType := mux.Vars(r)["type"] + + body, err := io.ReadAll(r.Body) + if err != nil { + p.log.Error(err.Error()) + return + } + + var toFunction pbflink.ToFunction + if err = proto.Unmarshal(body, &toFunction); err != nil { + p.log.Error(err.Error()) + return + } + + batch := toFunction.GetInvocation() + tgt := batch.GetTarget() + + if r.URL, err = p.queryURL(functionType, tgt.GetId()); err != nil { + p.log.Error(err.Error()) + return + } + + if body, err = proto.Marshal(&toFunction); err != nil { + p.log.Error(err.Error()) + return + } + + r.Body = io.NopCloser(bytes.NewBuffer(body)) + r.ContentLength = int64(len(body)) + + r.Header.Add(forwardedHeader, r.Host) + r.Host = r.URL.Host +} + +func (p *Proxy) modifyResponse(r *http.Response) (err error) { + if r.StatusCode != http.StatusOK { + return p.noopResponse(r) + } + + body, err := io.ReadAll(r.Body) + if err != nil { + p.log.Error(err.Error()) + return p.noopResponse(r) + } + + var fromFunction pbflink.FromFunction + if err = proto.Unmarshal(body, &fromFunction); err != nil { + return p.noopResponse(r) + } + + r.Body = io.NopCloser(bytes.NewBuffer(body)) + r.ContentLength = int64(len(body)) + r.Header.Set("Content-Length", fmt.Sprint(r.ContentLength)) + return +} + +func (p *Proxy) errorHandler(w http.ResponseWriter, r *http.Request, err error) { + w.WriteHeader(http.StatusOK) + w.Write(p.noopData) +} + +func (p *Proxy) noopResponse(r *http.Response) (err error) { + r.Body = io.NopCloser(bytes.NewBuffer(p.noopData)) + r.ContentLength = int64(len(p.noopData)) + r.StatusCode = http.StatusOK + + r.Header.Del("X-Content-Type-Options") + + r.Header.Set("Content-Length", fmt.Sprint(r.ContentLength)) + r.Header.Set("Content-Type", "application/octet-stream") + return +} + +// FIXME add r.URL to DNS instead cmdb qdsl +// r.Host = functionType +// functionType = from dns +func (p *Proxy) queryURL(functionType, id string) (u *url.URL, err error) { + query := fmt.Sprintf("*[?@._id == '%s'?].%s", id, functionType) + + elems, err := qdsl.Qdsl(p.ctx, query, qdsl.WithLink()) + if err != nil { + return + } + elemslen := len(elems) + if elemslen == 0 { + err = fmt.Errorf("unknown function implementation: %s", query) + return + } + if elemslen > 1 { + err = fmt.Errorf("multiple implementation of function: %s", query) + return + } + var link pbtypes.FunctionRoute + if err = json.Unmarshal(elems[0].Link, &link); err != nil { + return + } + return url.Parse(link.Url) +} + +func handler(p *httputil.ReverseProxy) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + p.ServeHTTP(w, r) + } +} diff --git a/pkg/utils/reader.go b/pkg/utils/reader.go new file mode 100644 index 0000000..3fa8018 --- /dev/null +++ b/pkg/utils/reader.go @@ -0,0 +1,38 @@ +// Copyright 2022 Listware + +package utils + +import ( + "bytes" + "io" +) + +type reusableReader struct { + io.Reader + readBuf *bytes.Buffer + backBuf *bytes.Buffer +} + +func ReusableReader(r io.Reader) io.Reader { + readBuf := bytes.Buffer{} + readBuf.ReadFrom(r) // error handling ignored for brevity + backBuf := bytes.Buffer{} + + return reusableReader{ + io.TeeReader(&readBuf, &backBuf), + &readBuf, + &backBuf, + } +} + +func (r reusableReader) Read(p []byte) (int, error) { + n, err := r.Reader.Read(p) + if err == io.EOF { + r.reset() + } + return n, err +} + +func (r reusableReader) reset() { + io.Copy(r.readBuf, r.backBuf) // nolint: errcheck +}