Skip to content

support collecting FsUsageMetrics for containerd #2872

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 82 additions & 42 deletions container/common/fsHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

"github.com/google/cadvisor/fs"

"k8s.io/klog/v2"
)

Expand All @@ -37,15 +36,21 @@ type FsUsage struct {
InodeUsage uint64
}

type FsUsageProvider interface {
// Usage returns the fs usage
Usage() (*FsUsage, error)
// Targets returns where the fs usage metric is collected,it maybe a directory ,a file or some
// information about the snapshotter(for containerd)
Targets() []string
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add godoc on what "Targets" is referring to

Copy link
Author

@yyrdl yyrdl May 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added .

this method is used to get where the fs usage is collected.

}

type realFsHandler struct {
sync.RWMutex
lastUpdate time.Time
usage FsUsage
period time.Duration
minPeriod time.Duration
rootfs string
extraDir string
fsInfo fs.FsInfo
lastUpdate time.Time
usage FsUsage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is thisusage the cache?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

period time.Duration
minPeriod time.Duration
usageProvider FsUsageProvider
// Tells the container to stop.
stopChan chan struct{}
}
Expand All @@ -58,51 +63,33 @@ const DefaultPeriod = time.Minute

var _ FsHandler = &realFsHandler{}

func NewFsHandler(period time.Duration, rootfs, extraDir string, fsInfo fs.FsInfo) FsHandler {
func NewFsHandler(period time.Duration, provider FsUsageProvider) FsHandler {
return &realFsHandler{
lastUpdate: time.Time{},
usage: FsUsage{},
period: period,
minPeriod: period,
rootfs: rootfs,
extraDir: extraDir,
fsInfo: fsInfo,
stopChan: make(chan struct{}, 1),
lastUpdate: time.Time{},
usage: FsUsage{},
period: period,
minPeriod: period,
usageProvider: provider,
stopChan: make(chan struct{}, 1),
}
}

func (fh *realFsHandler) update() error {
var (
rootUsage, extraUsage fs.UsageInfo
rootErr, extraErr error
)
// TODO(vishh): Add support for external mounts.
if fh.rootfs != "" {
rootUsage, rootErr = fh.fsInfo.GetDirUsage(fh.rootfs)
}

if fh.extraDir != "" {
extraUsage, extraErr = fh.fsInfo.GetDirUsage(fh.extraDir)
usage, err := fh.usageProvider.Usage()

if err != nil {
return err
}

// Wait to handle errors until after all operartions are run.
// An error in one will not cause an early return, skipping others
fh.Lock()
defer fh.Unlock()
fh.lastUpdate = time.Now()
if fh.rootfs != "" && rootErr == nil {
fh.usage.InodeUsage = rootUsage.Inodes
fh.usage.BaseUsageBytes = rootUsage.Bytes
fh.usage.TotalUsageBytes = rootUsage.Bytes
}
if fh.extraDir != "" && extraErr == nil {
fh.usage.TotalUsageBytes += extraUsage.Bytes
}

// Combine errors into a single error to return
if rootErr != nil || extraErr != nil {
return fmt.Errorf("rootDiskErr: %v, extraDiskErr: %v", rootErr, extraErr)
}
fh.usage.InodeUsage = usage.InodeUsage
fh.usage.BaseUsageBytes = usage.BaseUsageBytes
fh.usage.TotalUsageBytes = usage.TotalUsageBytes

return nil
}

Expand All @@ -125,7 +112,8 @@ func (fh *realFsHandler) trackUsage() {
// if the long duration is persistent either because of slow
// disk or lots of containers.
longOp = longOp + time.Second
klog.V(2).Infof("fs: disk usage and inodes count on following dirs took %v: %v; will not log again for this container unless duration exceeds %v", duration, []string{fh.rootfs, fh.extraDir}, longOp)
klog.V(2).Infof(`fs: disk usage and inodes count on targets took %v: %v; `+
`will not log again for this container unless duration exceeds %v`, duration, fh.usageProvider.Targets(), longOp)
}
select {
case <-fh.stopChan:
Expand All @@ -148,3 +136,55 @@ func (fh *realFsHandler) Usage() FsUsage {
defer fh.RUnlock()
return fh.usage
}

type fsUsageProvider struct {
fsInfo fs.FsInfo
rootFs string
extraDir string
}

func NewGeneralFsUsageProvider(fsInfo fs.FsInfo, rootFs, extraDir string) FsUsageProvider {
return &fsUsageProvider{
fsInfo: fsInfo,
rootFs: rootFs,
extraDir: extraDir,
}
}

func (f *fsUsageProvider) Targets() []string {
return []string{f.rootFs, f.extraDir}
}

func (f *fsUsageProvider) Usage() (*FsUsage, error) {
var (
rootUsage, extraUsage fs.UsageInfo
rootErr, extraErr error
)

if f.rootFs != "" {
rootUsage, rootErr = f.fsInfo.GetDirUsage(f.rootFs)
}

if f.extraDir != "" {
extraUsage, extraErr = f.fsInfo.GetDirUsage(f.extraDir)
}

usage := &FsUsage{}

if f.rootFs != "" && rootErr == nil {
usage.InodeUsage = rootUsage.Inodes
usage.BaseUsageBytes = rootUsage.Bytes
usage.TotalUsageBytes = rootUsage.Bytes
}

if f.extraDir != "" && extraErr == nil {
usage.TotalUsageBytes += extraUsage.Bytes
}

// Combine errors into a single error to return
if rootErr != nil || extraErr != nil {
return nil, fmt.Errorf("rootDiskErr: %v, extraDiskErr: %v", rootErr, extraErr)
}

return usage, nil
}
20 changes: 20 additions & 0 deletions container/containerd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"time"

containersapi "github.com/containerd/containerd/api/services/containers/v1"
snaptshotapi "github.com/containerd/containerd/api/services/snapshots/v1"
tasksapi "github.com/containerd/containerd/api/services/tasks/v1"
versionapi "github.com/containerd/containerd/api/services/version/v1"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/pkg/dialer"
ptypes "github.com/gogo/protobuf/types"
"github.com/google/cadvisor/container/common"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
)
Expand All @@ -36,12 +38,14 @@ type client struct {
containerService containersapi.ContainersClient
taskService tasksapi.TasksClient
versionService versionapi.VersionClient
snapshotsService snaptshotapi.SnapshotsClient
}

type ContainerdClient interface {
LoadContainer(ctx context.Context, id string) (*containers.Container, error)
TaskPid(ctx context.Context, id string) (uint32, error)
Version(ctx context.Context) (string, error)
ContainerFsUsage(ctx context.Context, snapshotter, snapshotkey string) (*common.FsUsage, error)
}

var once sync.Once
Expand Down Expand Up @@ -92,6 +96,7 @@ func Client(address, namespace string) (ContainerdClient, error) {
containerService: containersapi.NewContainersClient(conn),
taskService: tasksapi.NewTasksClient(conn),
versionService: versionapi.NewVersionClient(conn),
snapshotsService: snaptshotapi.NewSnapshotsClient(conn),
}
})
return ctrdClient, retErr
Expand Down Expand Up @@ -125,6 +130,21 @@ func (c *client) Version(ctx context.Context) (string, error) {
return response.Version, nil
}

func (c *client) ContainerFsUsage(ctx context.Context, snapshotter, snapshotkey string) (*common.FsUsage, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does ContainerFsUsage need to be exported?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes , this client is delivered as interface, if this method is not exported, we can't get the fs usage.

func newContainerdContainerHandler(
	client ContainerdClient,//  Here
	name string,
	machineInfoFactory info.MachineInfoFactory,
	fsInfo fs.FsInfo,
	cgroupSubsystems *containerlibcontainer.CgroupSubsystems,
	inHostNamespace bool,
	metadataEnvs []string,
	includedMetrics container.MetricSet,
) (container.ContainerHandler, error)

usage, err := c.snapshotsService.Usage(ctx, &snaptshotapi.UsageRequest{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how expensive is this?

will containerd snapshotter have to recaculate the usage every time this is called or does it cache the usage internally?

Copy link
Author

@yyrdl yyrdl Jun 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there is a cache in it's CRI implemention . And Usage method will recaculate the usage every time, I have change it to ContainerStats,use CRI instead.

Snapshotter: snapshotter,
Key: snapshotkey,
})
if err != nil {
return nil, err
}
return &common.FsUsage{
BaseUsageBytes: uint64(usage.Size_),
TotalUsageBytes: uint64(usage.Size_),
Copy link
Collaborator

@bobbypage bobbypage May 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it correct that these are both set to usage.Size?

BaseUsageBytes is "Number of bytes consumed by a container through its root filesystem." vs TotalusageBytes is "Total Number of bytes consumed by container." ?

Copy link
Author

@yyrdl yyrdl May 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes ,for containerd ,there is no log file(used to store logs from container's stdout or stderr) ( At least ,I didn't find it (T_T) ) . But for docker and crio , there are log files, so when collect fs usage, the usage of log file is counted.

By the way, kubelet storge container's logs in another place (/var/log/pods), and it is counted by kubelet when it caculate the usage. see makeContainerStats in kubelet/stats/cri_stats_provider.go

// NOTE: This doesn't support the old pod log path, `/var/log/pods/UID`. For containers
	// using old log path, empty log stats are returned. This is fine, because we don't
	// officially support in-place upgrade anyway.
	var (
		containerLogPath = kuberuntime.BuildContainerLogsDirectory(meta.GetNamespace(),
			meta.GetName(), types.UID(meta.GetUid()), container.GetMetadata().GetName())
		err error
	)
	result.Logs, err = p.getPathFsStats(containerLogPath, rootFsInfo)
	if err != nil {
		klog.Errorf("Unable to fetch container log stats for path %s: %v ", containerLogPath, err)
	}
	return result

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't cadvisor take the log size of /var/log/pods in account when counting TotalusageBytes?

InodeUsage: uint64(usage.Inodes),
}, nil
}

func containerFromProto(containerpb containersapi.Container) *containers.Container {
var runtime containers.RuntimeInfo
if containerpb.Runtime != nil {
Expand Down
6 changes: 6 additions & 0 deletions container/containerd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"github.com/containerd/containerd/containers"
"github.com/google/cadvisor/container/common"
)

type containerdClientMock struct {
Expand All @@ -45,6 +46,11 @@ func (c *containerdClientMock) TaskPid(ctx context.Context, id string) (uint32,
return 2389, nil
}

func (c *containerdClientMock) ContainerFsUsage(ctx context.Context,
snapshotter, snapshotkey string) (*common.FsUsage, error) {
return &common.FsUsage{}, nil
}

func mockcontainerdClient(cntrs map[string]*containers.Container, returnErr error) ContainerdClient {
return &containerdClientMock{
cntrs: cntrs,
Expand Down
57 changes: 54 additions & 3 deletions container/containerd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

// Handler for containerd containers.

package containerd

import (
Expand All @@ -38,6 +39,7 @@ type containerdContainerHandler struct {
// (e.g.: "cpu" -> "/sys/fs/cgroup/cpu/test")
cgroupPaths map[string]string
fsInfo fs.FsInfo
fsHandler common.FsHandler
// Metadata associated with the container.
reference info.ContainerReference
envs map[string]string
Expand Down Expand Up @@ -131,6 +133,16 @@ func newContainerdContainerHandler(
reference: containerReference,
libcontainerHandler: libcontainerHandler,
}

if includedMetrics.Has(container.DiskUsageMetrics) {
handler.fsHandler = common.NewFsHandler(common.DefaultPeriod, &fsUsageProvider{
ctx: ctx,
client: client,
snapshotter: cntr.Snapshotter,
snapshotkey: cntr.SnapshotKey,
})
}

// Add the name and bare ID as aliases of the container.
handler.image = cntr.Image

Expand Down Expand Up @@ -169,9 +181,7 @@ func (h *containerdContainerHandler) needNet() bool {
}

func (h *containerdContainerHandler) GetSpec() (info.ContainerSpec, error) {
// TODO: Since we dont collect disk usage stats for containerd, we set hasFilesystem
// to false. Revisit when we support disk usage stats for containerd
hasFilesystem := false
hasFilesystem := true
spec, err := common.GetSpec(h.cgroupPaths, h.machineInfoFactory, h.needNet(), hasFilesystem)
spec.Labels = h.labels
spec.Envs = h.envs
Expand All @@ -189,6 +199,26 @@ func (h *containerdContainerHandler) getFsStats(stats *info.ContainerStats) erro
if h.includedMetrics.Has(container.DiskIOMetrics) {
common.AssignDeviceNamesToDiskStats((*common.MachineInfoNamer)(mi), &stats.DiskIo)
}
if !h.includedMetrics.Has(container.DiskUsageMetrics) {
return nil
}

// TODO(yyrdl):for overlay ,the 'upperPath' is:
// `${containerd.Config.Root}/io.containerd.snapshotter.v1.overlayfs/snapshots/${snapshots.ID}/fs`,
// and for other snapshots plugins, we can also find the law from containerd's source code.

// Device 、fsType and fsLimits and other information are not supported yet, unless there is a way to
// know the id of the snapshot , or the `Stat`(snapshotsClient.Stat) method returns these information directly.
// And containerd has cached the disk usage stats in memory,so the best way is enhancing containerd's API
// (avoid collecting disk usage metrics twice)
fsStat := info.FsStats{}
usage := h.fsHandler.Usage()
fsStat.BaseUsage = usage.BaseUsageBytes
fsStat.Usage = usage.TotalUsageBytes
fsStat.Inodes = usage.InodeUsage

stats.Filesystem = append(stats.Filesystem, fsStat)

return nil
}

Expand Down Expand Up @@ -239,12 +269,33 @@ func (h *containerdContainerHandler) Type() container.ContainerType {
}

func (h *containerdContainerHandler) Start() {
if h.fsHandler != nil {
h.fsHandler.Start()
}
}

func (h *containerdContainerHandler) Cleanup() {
if h.fsHandler != nil {
h.fsHandler.Stop()
}
}

func (h *containerdContainerHandler) GetContainerIPAddress() string {
// containerd doesnt take care of networking.So it doesnt maintain networking states
return ""
}

type fsUsageProvider struct {
ctx context.Context
snapshotter string
snapshotkey string
client ContainerdClient
}

func (f *fsUsageProvider) Usage() (*common.FsUsage, error) {
return f.client.ContainerFsUsage(f.ctx, f.snapshotter, f.snapshotkey)
}

func (f *fsUsageProvider) Targets() []string {
return []string{fmt.Sprintf("snapshotter(%s) with key (%s)", f.snapshotter, f.snapshotkey)}
}
3 changes: 2 additions & 1 deletion container/crio/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ func newCrioContainerHandler(

// we optionally collect disk usage metrics
if includedMetrics.Has(container.DiskUsageMetrics) {
handler.fsHandler = common.NewFsHandler(common.DefaultPeriod, rootfsStorageDir, storageLogDir, fsInfo)
handler.fsHandler = common.NewFsHandler(common.DefaultPeriod, common.NewGeneralFsUsageProvider(
fsInfo, rootfsStorageDir, storageLogDir))
}
// TODO for env vars we wanted to show from container.Config.Env from whitelist
//for _, exposedEnv := range metadataEnvs {
Expand Down
3 changes: 2 additions & 1 deletion container/docker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ func newDockerContainerHandler(

if includedMetrics.Has(container.DiskUsageMetrics) {
handler.fsHandler = &dockerFsHandler{
fsHandler: common.NewFsHandler(common.DefaultPeriod, rootfsStorageDir, otherStorageDir, fsInfo),
fsHandler: common.NewFsHandler(common.DefaultPeriod, common.NewGeneralFsUsageProvider(
fsInfo, rootfsStorageDir, otherStorageDir)),
thinPoolWatcher: thinPoolWatcher,
zfsWatcher: zfsWatcher,
deviceID: ctnr.GraphDriver.Data["DeviceId"],
Expand Down
2 changes: 1 addition & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -988,4 +988,4 @@ sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.15/go.mod h1:LEScyz
sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
sigs.k8s.io/structured-merge-diff/v4 v4.0.3/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=