Skip to content

Commit af2cf6b

Browse files
authored
Merge pull request #487 from imageworks/import-configuration
Import Configuration and Efficiency
2 parents de0665a + bf81b87 commit af2cf6b

File tree

11 files changed

+194
-103
lines changed

11 files changed

+194
-103
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/spfs-cli/common/src/args.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,22 @@ pub struct Sync {
1717
/// already exists
1818
#[clap(long)]
1919
pub resync: bool,
20+
21+
/// The total number of manifests that can be synced concurrently
22+
#[clap(
23+
long,
24+
env = "SPFS_SYNC_MAX_CONCURRENT_MANIFESTS",
25+
default_value_t = spfs::sync::DEFAULT_MAX_CONCURRENT_MANIFESTS
26+
)]
27+
pub max_concurrent_manifests: usize,
28+
29+
/// The total number of file payloads that can be synced concurrently
30+
#[clap(
31+
long,
32+
env = "SPFS_SYNC_MAX_CONCURRENT_PAYLOADS",
33+
default_value_t = spfs::sync::DEFAULT_MAX_CONCURRENT_PAYLOADS
34+
)]
35+
pub max_concurrent_payloads: usize,
2036
}
2137

2238
impl Sync {
@@ -36,6 +52,8 @@ impl Sync {
3652
};
3753
spfs::Syncer::new(src, dest)
3854
.with_policy(policy)
55+
.with_max_concurrent_manifests(self.max_concurrent_manifests)
56+
.with_max_concurrent_payloads(self.max_concurrent_payloads)
3957
.with_reporter(spfs::sync::ConsoleSyncReporter::default())
4058
}
4159
}

crates/spfs/src/sync.rs

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// https://github.com/imageworks/spk
44

55
use std::collections::HashSet;
6+
use std::sync::Arc;
67

78
use futures::stream::{FuturesUnordered, TryStreamExt};
89
use once_cell::sync::OnceCell;
@@ -11,11 +12,22 @@ use tokio::sync::{RwLock, Semaphore};
1112
use crate::{encoding, prelude::*};
1213
use crate::{graph, storage, tracking, Error, Result};
1314

15+
/// The default limit for concurrent manifest sync operations
16+
/// per-syncer if not otherwise specified using
17+
/// [`Syncer::with_max_concurrent_manifests`]
18+
pub const DEFAULT_MAX_CONCURRENT_MANIFESTS: usize = 100;
19+
20+
/// The default limit for concurrent payload sync operations
21+
/// per-syncer if not otherwise specified using
22+
/// [`Syncer::with_max_concurrent_payloads`]
23+
pub const DEFAULT_MAX_CONCURRENT_PAYLOADS: usize = 100;
24+
1425
#[cfg(test)]
1526
#[path = "./sync_test.rs"]
1627
mod sync_test;
1728

1829
/// Methods for syncing data between repositories
30+
#[derive(Copy, Clone, Debug)]
1931
pub enum SyncPolicy {
2032
/// Starting at the top-most requested item, sync and
2133
/// descend only into objects that are missing in the
@@ -57,14 +69,16 @@ impl SyncPolicy {
5769
}
5870

5971
/// Handles the syncing of data between repositories
72+
///
73+
/// The syncer can be cloned efficiently
6074
pub struct Syncer<'src, 'dst, Reporter: SyncReporter = SilentSyncReporter> {
6175
src: &'src storage::RepositoryHandle,
6276
dest: &'dst storage::RepositoryHandle,
63-
reporter: Reporter,
77+
reporter: Arc<Reporter>,
6478
policy: SyncPolicy,
65-
manifest_semaphore: Semaphore,
66-
payload_semaphore: Semaphore,
67-
processed_digests: RwLock<HashSet<encoding::Digest>>,
79+
manifest_semaphore: Arc<Semaphore>,
80+
payload_semaphore: Arc<Semaphore>,
81+
processed_digests: Arc<RwLock<HashSet<encoding::Digest>>>,
6882
}
6983

7084
impl<'src, 'dst> Syncer<'src, 'dst> {
@@ -75,11 +89,11 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
7589
Self {
7690
src,
7791
dest,
78-
reporter: SilentSyncReporter::default(),
92+
reporter: Arc::new(SilentSyncReporter::default()),
7993
policy: SyncPolicy::default(),
80-
manifest_semaphore: Semaphore::new(100),
81-
payload_semaphore: Semaphore::new(100),
82-
processed_digests: RwLock::new(HashSet::new()),
94+
manifest_semaphore: Arc::new(Semaphore::new(DEFAULT_MAX_CONCURRENT_MANIFESTS)),
95+
payload_semaphore: Arc::new(Semaphore::new(DEFAULT_MAX_CONCURRENT_PAYLOADS)),
96+
processed_digests: Arc::new(RwLock::new(HashSet::new())),
8397
}
8498
}
8599
}
@@ -88,6 +102,27 @@ impl<'src, 'dst, Reporter> Syncer<'src, 'dst, Reporter>
88102
where
89103
Reporter: SyncReporter,
90104
{
105+
/// Creates a new syncer pulling from the provided source
106+
///
107+
/// This new instance shares the same resource pool and cache
108+
/// as the one that it was cloned from. This allows them to more
109+
/// safely run concurrently, and to sync more efficiently
110+
/// by avoiding re-syncing the same objects.
111+
pub fn clone_with_source<'src2>(
112+
&self,
113+
source: &'src2 storage::RepositoryHandle,
114+
) -> Syncer<'src2, 'dst, Reporter> {
115+
Syncer {
116+
src: source,
117+
dest: self.dest,
118+
reporter: Arc::clone(&self.reporter),
119+
policy: self.policy,
120+
manifest_semaphore: Arc::clone(&self.manifest_semaphore),
121+
payload_semaphore: Arc::clone(&self.payload_semaphore),
122+
processed_digests: Arc::clone(&self.processed_digests),
123+
}
124+
}
125+
91126
/// Specifies how the Syncer should deal with different types of data
92127
/// during the sync process, replacing any existing one.
93128
/// See [`SyncPolicy`].
@@ -101,28 +136,29 @@ where
101136
/// The possible total concurrent sync tasks will be the
102137
/// manifest concurrency plus the payload concurrency.
103138
pub fn with_max_concurrent_manifests(mut self, concurrency: usize) -> Self {
104-
self.manifest_semaphore = Semaphore::new(concurrency);
139+
self.manifest_semaphore = Arc::new(Semaphore::new(concurrency));
105140
self
106141
}
107142

108143
/// Set how many payloads/files can be processed at once.
109144
///
110145
/// The possible total concurrent sync tasks will be the
111146
/// manifest concurrency plus the payload concurrency.
112-
pub fn with_max_payload_concurrency(mut self, concurrency: usize) -> Self {
113-
self.payload_semaphore = Semaphore::new(concurrency);
147+
pub fn with_max_concurrent_payloads(mut self, concurrency: usize) -> Self {
148+
self.payload_semaphore = Arc::new(Semaphore::new(concurrency));
114149
self
115150
}
116151

117152
/// Report progress to the given instance, replacing any existing one
118-
pub fn with_reporter<R>(self, reporter: R) -> Syncer<'src, 'dst, R>
153+
pub fn with_reporter<T, R>(self, reporter: T) -> Syncer<'src, 'dst, R>
119154
where
155+
T: Into<Arc<R>>,
120156
R: SyncReporter,
121157
{
122158
Syncer {
123159
src: self.src,
124160
dest: self.dest,
125-
reporter,
161+
reporter: reporter.into(),
126162
policy: self.policy,
127163
manifest_semaphore: self.manifest_semaphore,
128164
payload_semaphore: self.payload_semaphore,

crates/spk-build/src/archive_test.rs

Lines changed: 1 addition & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -5,70 +5,10 @@
55
use rstest::rstest;
66
use spk_schema::foundation::spec_ops::PackageOps;
77
use spk_schema::recipe;
8-
use spk_storage::{export_package, fixtures::*, import_package};
8+
use spk_storage::{export_package, fixtures::*};
99

1010
use crate::{BinaryPackageBuilder, BuildSource};
1111

12-
#[rstest]
13-
#[tokio::test]
14-
async fn test_archive_io() {
15-
let rt = spfs_runtime().await;
16-
let spec = recipe!(
17-
{
18-
"pkg": "spk-archive-test/0.0.1",
19-
"build": {"script": "touch /spfs/file.txt"},
20-
}
21-
);
22-
rt.tmprepo.publish_recipe(&spec).await.unwrap();
23-
let (spec, _) = BinaryPackageBuilder::from_recipe(spec)
24-
.with_source(BuildSource::LocalPath(".".into()))
25-
.build_and_publish(&*rt.tmprepo)
26-
.await
27-
.unwrap();
28-
let filename = rt.tmpdir.path().join("archive.spk");
29-
filename.ensure();
30-
export_package(spec.ident(), &filename)
31-
.await
32-
.expect("failed to export");
33-
let mut actual = Vec::new();
34-
let mut tarfile = tar::Archive::new(std::fs::File::open(&filename).unwrap());
35-
for entry in tarfile.entries().unwrap() {
36-
let filename = entry.unwrap().path().unwrap().to_string_lossy().to_string();
37-
if filename.contains('/') && !filename.contains("tags") {
38-
// ignore specific object data for this test
39-
continue;
40-
}
41-
actual.push(filename);
42-
}
43-
actual.sort();
44-
assert_eq!(
45-
actual,
46-
vec![
47-
"VERSION".to_string(),
48-
"objects".to_string(),
49-
"payloads".to_string(),
50-
"renders".to_string(),
51-
"tags".to_string(),
52-
"tags/spk".to_string(),
53-
"tags/spk/pkg".to_string(),
54-
"tags/spk/pkg/spk-archive-test".to_string(),
55-
"tags/spk/pkg/spk-archive-test/0.0.1".to_string(),
56-
"tags/spk/pkg/spk-archive-test/0.0.1/3I42H3S6".to_string(),
57-
"tags/spk/pkg/spk-archive-test/0.0.1/3I42H3S6.tag".to_string(),
58-
"tags/spk/pkg/spk-archive-test/0.0.1/3I42H3S6/build.tag".to_string(),
59-
"tags/spk/pkg/spk-archive-test/0.0.1/3I42H3S6/run.tag".to_string(),
60-
"tags/spk/spec".to_string(),
61-
"tags/spk/spec/spk-archive-test".to_string(),
62-
"tags/spk/spec/spk-archive-test/0.0.1".to_string(),
63-
"tags/spk/spec/spk-archive-test/0.0.1.tag".to_string(),
64-
"tags/spk/spec/spk-archive-test/0.0.1/3I42H3S6.tag".to_string(),
65-
]
66-
);
67-
import_package(&filename)
68-
.await
69-
.expect("failed to import package");
70-
}
71-
7212
#[rstest]
7313
#[tokio::test]
7414
async fn test_archive_create_parents() {

crates/spk-cli/group3/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,16 @@ anyhow = "1.0"
99
async-trait = "0.1"
1010
clap = { version = "3.2", features = ["derive", "env"] }
1111
colored = "2.0.0"
12+
futures = "0.3.9"
13+
spfs = { path = "../../spfs" }
1214
spk-cli-common = { path = '../common' }
1315
spk-storage = { path = '../../spk-storage' }
1416
spk-schema = { path = '../../spk-schema' }
17+
spfs-cli-common = { path = '../../spfs-cli/common' }
1518
tracing = "0.1.35"
19+
tokio = "1.20"
20+
21+
[dev-dependencies]
22+
tar = "0.4.3"
23+
spk-build = { path = '../../spk-build' }
24+
rstest = "0.15.0"

crates/spk-cli/group3/src/cmd_import.rs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,51 @@
44

55
use anyhow::{Context, Result};
66
use clap::Args;
7+
use futures::TryStreamExt;
8+
79
use spk_cli_common::{CommandArgs, Run};
8-
use spk_storage::{self as storage};
10+
11+
#[cfg(test)]
12+
#[path = "./cmd_import_test.rs"]
13+
mod cmd_import_test;
914

1015
/// Import a previously exported package/archive
1116
#[derive(Args)]
1217
pub struct Import {
18+
#[clap(flatten)]
19+
sync: spfs_cli_common::Sync,
20+
1321
/// The archive to import from
14-
#[clap(name = "FILE")]
22+
#[clap(name = "FILE", required = true)]
1523
pub files: Vec<std::path::PathBuf>,
1624
}
1725

1826
#[async_trait::async_trait]
1927
impl Run for Import {
2028
async fn run(&mut self) -> Result<i32> {
29+
let mut summary = spfs::sync::SyncSummary::default();
30+
let local_repo = spk_storage::local_repository().await?;
31+
// src and dst are the same here which is useless, but we will
32+
// be using this syncer to create more useful ones for each archive
33+
let syncer = self.sync.get_syncer(&local_repo, &local_repo);
2134
for filename in self.files.iter() {
22-
storage::import_package(filename)
35+
let tar_repo = spfs::storage::tar::TarRepository::open(&filename).await?;
36+
let tar_repo: spfs::storage::RepositoryHandle = tar_repo.into();
37+
let env_spec = tar_repo
38+
.iter_tags()
39+
.map_ok(|(spec, _)| spec)
40+
.try_collect()
41+
.await
42+
.context("Failed to collect tags from archive")?;
43+
tracing::info!(archive = ?filename, "importing");
44+
summary += syncer
45+
.clone_with_source(&tar_repo)
46+
.sync_env(env_spec)
2347
.await
24-
.context("Import failed")?;
48+
.context("Failed to sync archived data")?
49+
.summary();
2550
}
51+
tracing::info!("{:#?}", summary);
2652
Ok(0)
2753
}
2854
}

0 commit comments

Comments
 (0)