From b23d635763317d15fcf36e90ca21efb6d9a7281a Mon Sep 17 00:00:00 2001 From: hzliangbin Date: Sun, 5 Jun 2022 23:29:09 +0800 Subject: [PATCH] feat: add status api --- cmd/cmd.go | 3 +- pkg/dt/distributed_transaction_manger.go | 2 + pkg/http/routes.go | 3 + pkg/http/status.go | 95 ++++++++++++++++++++++++ 4 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 pkg/http/status.go diff --git a/cmd/cmd.go b/cmd/cmd.go index 4b31c027..a87ea3fb 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -19,7 +19,6 @@ package main import ( "context" "fmt" - "net" "net/http" "os" @@ -121,10 +120,12 @@ var ( } if conf.DistributedTransaction != nil { + dbpackHttp.DistributedTransactionEnabled = true driver := etcd.NewEtcdStore(conf.DistributedTransaction.EtcdConfig) dt.InitDistributedTransactionManager(conf.DistributedTransaction, driver) } + dbpackHttp.Listeners = conf.Listeners dbpack := server.NewServer() for _, listenerConf := range conf.Listeners { switch listenerConf.ProtocolType { diff --git a/pkg/dt/distributed_transaction_manger.go b/pkg/dt/distributed_transaction_manger.go index 71979830..06b3e565 100644 --- a/pkg/dt/distributed_transaction_manger.go +++ b/pkg/dt/distributed_transaction_manger.go @@ -32,6 +32,7 @@ import ( "github.com/cectc/dbpack/pkg/dt/api" "github.com/cectc/dbpack/pkg/dt/metrics" "github.com/cectc/dbpack/pkg/dt/storage" + dbpackHttp "github.com/cectc/dbpack/pkg/http" "github.com/cectc/dbpack/pkg/log" "github.com/cectc/dbpack/pkg/misc" "github.com/cectc/dbpack/pkg/misc/uuid" @@ -71,6 +72,7 @@ func InitDistributedTransactionManager(conf *config.DistributedTransaction, stor } go func() { if storageDriver.LeaderElection(manager.applicationID) { + dbpackHttp.IsMaster = true if err := manager.processGlobalSessions(); err != nil { log.Fatal(err) } diff --git a/pkg/http/routes.go b/pkg/http/routes.go index 5f272011..3cfa44d5 100644 --- a/pkg/http/routes.go +++ b/pkg/http/routes.go @@ -30,5 +30,8 @@ func RegisterRoutes() (http.Handler, error) { // Add server metrics router registerMetricsRouter(router) + // Add status router + registerStatusRouter(router) + return router, nil } diff --git a/pkg/http/status.go b/pkg/http/status.go new file mode 100644 index 00000000..f7738254 --- /dev/null +++ b/pkg/http/status.go @@ -0,0 +1,95 @@ +/* + * Copyright 2022 CECTC, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package http + +import ( + "encoding/json" + "fmt" + "net" + "net/http" + "time" + + "github.com/gorilla/mux" + + "github.com/cectc/dbpack/pkg/config" +) + +const ( + statusPath = "/status" +) + +var ( + Listeners []*config.Listener + DistributedTransactionEnabled bool + IsMaster bool +) + +type ListenerStatus struct { + ProtocolType string `json:"protocol_type"` + SocketAddress config.SocketAddress `json:"socket_address"` + Active bool `json:"active"` +} + +type Result struct { + ListenersStatus []ListenerStatus `json:"listeners"` + DTEnabled bool `json:"distributed_transaction_enabled"` + IsMaster bool `json:"is_master"` +} + +func registerStatusRouter(router *mux.Router) { + router.Methods(http.MethodGet).Path(statusPath).HandlerFunc(statusHandler) +} + +func statusHandler(w http.ResponseWriter, r *http.Request) { + listenersStatus := make([]ListenerStatus, len(Listeners)) + + for i, listener := range Listeners { + active := false + lisAddr := fmt.Sprintf("%s:%d", listener.SocketAddress.Address, listener.SocketAddress.Port) + conn, err := net.DialTimeout("tcp", lisAddr, 5*time.Second) + if err == nil && conn != nil { + active = true + conn.Close() + } + + protocolType := "http" + if listener.ProtocolType == config.Mysql { + protocolType = "mysql" + } + + status := ListenerStatus{ + ProtocolType: protocolType, + SocketAddress: listener.SocketAddress, + Active: active, + } + listenersStatus[i] = status + } + + result := Result{ListenersStatus: listenersStatus} + if DistributedTransactionEnabled { + result.DTEnabled = true + result.IsMaster = IsMaster + } + + b, err := json.Marshal(result) + if err != nil { + w.Write([]byte(err.Error())) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Write(b) + w.WriteHeader(http.StatusOK) +}