Skip to content

Commit 50ce9f7

Browse files
committed
Add lots of plumbing to make rename atomic
1 parent 0cbe68f commit 50ce9f7

File tree

6 files changed

+77
-1
lines changed

6 files changed

+77
-1
lines changed

src/rpc.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use proto::{
3030
},
3131
tags::{
3232
CreateRequest as TagsCreateRequest, DeleteRequest as TagDeleteRequest,
33-
ListRequest as TagListRequest, SetRequest as TagsSetRequest, SyncMode,
33+
ListRequest as TagListRequest, RenameRequest, SetRequest as TagsSetRequest, SyncMode,
3434
},
3535
Request, RpcError, RpcResult, RpcService,
3636
};
@@ -158,6 +158,7 @@ impl<D: crate::store::Store> Handler<D> {
158158
Set(msg) => chan.rpc(msg, self, Self::tags_set).await,
159159
DeleteTag(msg) => chan.rpc(msg, self, Self::blob_delete_tag).await,
160160
ListTags(msg) => chan.server_streaming(msg, self, Self::blob_list_tags).await,
161+
Rename(msg) => chan.rpc(msg, self, Self::tags_rename).await,
161162
}
162163
}
163164

@@ -382,6 +383,16 @@ impl<D: crate::store::Store> Handler<D> {
382383
rx.map(AddPathResponse)
383384
}
384385

386+
async fn tags_rename(self, msg: RenameRequest) -> RpcResult<()> {
387+
let blobs = self;
388+
blobs
389+
.store()
390+
.rename_tag(msg.from, msg.to)
391+
.await
392+
.map_err(|e| RpcError::new(&e))?;
393+
Ok(())
394+
}
395+
385396
async fn tags_set(self, msg: TagsSetRequest) -> RpcResult<()> {
386397
let blobs = self;
387398
blobs

src/rpc/proto/tags.rs

+11
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ pub enum Request {
2020
#[rpc(response = RpcResult<()>)]
2121
Set(SetRequest),
2222
#[rpc(response = RpcResult<()>)]
23+
Rename(RenameRequest),
24+
#[rpc(response = RpcResult<()>)]
2325
DeleteTag(DeleteRequest),
2426
#[server_streaming(response = TagInfo)]
2527
ListTags(ListRequest),
@@ -111,3 +113,12 @@ impl From<DeleteOptions> for DeleteRequest {
111113
}
112114
}
113115
}
116+
117+
/// Rename a tag atomically
118+
#[derive(Debug, Serialize, Deserialize)]
119+
pub struct RenameRequest {
120+
/// Old tag name
121+
pub from: Tag,
122+
/// New tag name
123+
pub to: Tag,
124+
}

src/store/fs.rs

+35
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,12 @@ pub(crate) enum ActorMessage {
625625
hash: HashAndFormat,
626626
tx: oneshot::Sender<ActorResult<Tag>>,
627627
},
628+
/// Modification method: rename a tag atomically.
629+
RenameTag {
630+
from: Tag,
631+
to: Tag,
632+
tx: oneshot::Sender<ActorResult<()>>,
633+
},
628634
/// Modification method: unconditional delete the data for a number of hashes
629635
Delete {
630636
hashes: Vec<Hash>,
@@ -680,6 +686,7 @@ impl ActorMessage {
680686
| Self::SetFullEntryState { .. }
681687
| Self::Delete { .. }
682688
| Self::DeleteTags { .. }
689+
| Self::RenameTag { .. }
683690
| Self::GcDelete { .. } => MessageCategory::ReadWrite,
684691
Self::UpdateInlineOptions { .. }
685692
| Self::Sync { .. }
@@ -901,6 +908,14 @@ impl StoreInner {
901908
Ok(rx.await??)
902909
}
903910

911+
async fn rename_tag(&self, from: Tag, to: Tag) -> OuterResult<()> {
912+
let (tx, rx) = oneshot::channel();
913+
self.tx
914+
.send(ActorMessage::RenameTag { from, to, tx })
915+
.await?;
916+
Ok(rx.await??)
917+
}
918+
904919
async fn delete(&self, hashes: Vec<Hash>) -> OuterResult<()> {
905920
let (tx, rx) = oneshot::channel();
906921
self.tx.send(ActorMessage::Delete { hashes, tx }).await?;
@@ -1404,6 +1419,10 @@ impl super::Store for Store {
14041419
Ok(self.0.create_tag(hash).await?)
14051420
}
14061421

1422+
async fn rename_tag(&self, from: Tag, to: Tag) -> io::Result<()> {
1423+
Ok(self.0.rename_tag(from, to).await?)
1424+
}
1425+
14071426
async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
14081427
Ok(self.0.delete(hashes).await?)
14091428
}
@@ -2021,6 +2040,18 @@ impl ActorState {
20212040
Ok(tag)
20222041
}
20232042

2043+
fn rename_tag(&mut self, tables: &mut Tables, from: Tag, to: Tag) -> ActorResult<()> {
2044+
let value = tables
2045+
.tags
2046+
.get(from)?
2047+
.ok_or_else(|| {
2048+
ActorError::Io(io::Error::new(io::ErrorKind::NotFound, "tag not found"))
2049+
})?
2050+
.value();
2051+
tables.tags.insert(to, value)?;
2052+
Ok(())
2053+
}
2054+
20242055
fn set_tag(&self, tables: &mut Tables, tag: Tag, value: HashAndFormat) -> ActorResult<()> {
20252056
tables.tags.insert(tag, value)?;
20262057
Ok(())
@@ -2393,6 +2424,10 @@ impl ActorState {
23932424
let res = self.create_tag(tables, hash);
23942425
tx.send(res).ok();
23952426
}
2427+
ActorMessage::RenameTag { from, to, tx } => {
2428+
let res = self.rename_tag(tables, from, to);
2429+
tx.send(res).ok();
2430+
}
23962431
ActorMessage::Delete { hashes, tx } => {
23972432
let res = self.delete(tables, hashes, true);
23982433
tx.send(res).ok();

src/store/mem.rs

+12
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,18 @@ impl super::Store for Store {
208208
.await?
209209
}
210210

211+
async fn rename_tag(&self, from: Tag, to: Tag) -> io::Result<()> {
212+
let mut state = self.write_lock();
213+
let value = state.tags.remove(&from).ok_or_else(|| {
214+
io::Error::new(
215+
io::ErrorKind::NotFound,
216+
format!("tag not found: {:?}", from),
217+
)
218+
})?;
219+
state.tags.insert(to, value);
220+
Ok(())
221+
}
222+
211223
async fn set_tag(&self, name: Tag, value: HashAndFormat) -> io::Result<()> {
212224
let mut state = self.write_lock();
213225
state.tags.insert(name, value);

src/store/readonly_mem.rs

+4
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,10 @@ impl super::Store for Store {
307307
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
308308
}
309309

310+
async fn rename_tag(&self, _from: Tag, _to: Tag) -> io::Result<()> {
311+
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
312+
}
313+
310314
async fn import_stream(
311315
&self,
312316
data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send,

src/store/traits.rs

+3
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,9 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
352352
hash: HashAndFormat,
353353
) -> impl Future<Output = io::Result<()>> + Send;
354354

355+
/// Rename a tag
356+
fn rename_tag(&self, from: Tag, to: Tag) -> impl Future<Output = io::Result<()>> + Send;
357+
355358
/// Delete a single tag
356359
fn delete_tag(&self, name: Tag) -> impl Future<Output = io::Result<()>> + Send {
357360
self.delete_tags(Some(name.clone()), Some(name.successor()))

0 commit comments

Comments
 (0)