
Commit 77ff112

benpeart authored and gitster committed
read-cache: load cache entries on worker threads
This patch helps address the CPU cost of loading the index by utilizing the Index Entry Offset Table (IEOT) to divide loading and conversion of the cache entries across multiple threads in parallel.

I used p0002-read-cache.sh to generate some performance data:

Test w/100,000 files reduced the time by 32.24%
Test w/1,000,000 files reduced the time by -4.77%

Note that on the 1,000,000 files case, multi-threading the cache entry parsing does not yield a performance win. This is because the cost to parse the index extensions in this repo far outweighs the cost of loading the cache entries.

The high cost of parsing the index extensions is driven by the cache tree and the untracked cache extensions. As this is currently the longest pole, any reduction in this time will reduce the overall index load times, so it is worth further investigation in another patch series.

Signed-off-by: Ben Peart <[email protected]>
Signed-off-by: Junio C Hamano <[email protected]>
1 parent 3255089 commit 77ff112
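
The approach in the commit message boils down to this: the IEOT records, for each block of cache entries, a starting byte offset and an entry count, so each worker thread can begin parsing at its own offset without coordinating with the others. Below is a minimal, self-contained sketch of that blocking scheme using plain pthreads. It is only an illustration: the types, NBLOCKS/NTHREADS constants, and parse_block() are stand-ins, not Git's index_entry_offset_table or create_from_disk().

#include <pthread.h>
#include <stdio.h>

#define NBLOCKS   8
#define NTHREADS  4

struct entry_block {
	size_t offset;	/* byte offset of the first entry in this block */
	int nr;		/* number of cache entries in this block */
};

struct worker {
	pthread_t tid;
	const struct entry_block *blocks;
	int start, count;	/* range of blocks owned by this worker */
	size_t consumed;	/* bytes "parsed" by this worker */
};

/* Stand-in for parsing one block of on-disk entries. */
static size_t parse_block(const struct entry_block *b)
{
	return (size_t)b->nr * 62;	/* pretend each entry is ~62 bytes */
}

static void *worker_fn(void *arg)
{
	struct worker *w = arg;
	int i;

	for (i = w->start; i < w->start + w->count; i++)
		w->consumed += parse_block(&w->blocks[i]);
	return NULL;
}

int main(void)
{
	struct entry_block blocks[NBLOCKS];
	struct worker w[NTHREADS];
	int per_thread = (NBLOCKS + NTHREADS - 1) / NTHREADS;	/* DIV_ROUND_UP */
	size_t total = 0;
	int i;

	for (i = 0; i < NBLOCKS; i++) {
		blocks[i].offset = (size_t)i * 6200;
		blocks[i].nr = 100;
	}

	for (i = 0; i < NTHREADS; i++) {
		int start = i * per_thread;

		w[i].blocks = blocks;
		w[i].start = start;
		w[i].count = start + per_thread > NBLOCKS ? NBLOCKS - start : per_thread;
		w[i].consumed = 0;
		pthread_create(&w[i].tid, NULL, worker_fn, &w[i]);
	}
	for (i = 0; i < NTHREADS; i++) {
		pthread_join(w[i].tid, NULL);
		total += w[i].consumed;
	}
	printf("parsed %zu bytes across %d threads\n", total, NTHREADS);
	return 0;
}

The per-worker consumed totals are summed only after pthread_join(), which mirrors how the patch's load_cache_entries_threaded() accumulates the number of bytes processed.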

File tree

1 file changed (+193, -37 lines)


read-cache.c

Lines changed: 193 additions & 37 deletions
@@ -1720,7 +1720,8 @@ int read_index(struct index_state *istate)
 	return read_index_from(istate, get_index_file(), get_git_dir());
 }
 
-static struct cache_entry *create_from_disk(struct index_state *istate,
+static struct cache_entry *create_from_disk(struct mem_pool *ce_mem_pool,
+					    unsigned int version,
 					    struct ondisk_cache_entry *ondisk,
 					    unsigned long *ent_size,
 					    const struct cache_entry *previous_ce)
@@ -1737,7 +1738,7 @@ static struct cache_entry *create_from_disk(struct index_state *istate,
 	 * number of bytes to be stripped from the end of the previous name,
 	 * and the bytes to append to the result, to come up with its name.
 	 */
-	int expand_name_field = istate->version == 4;
+	int expand_name_field = version == 4;
 
 	/* On-disk flags are just 16 bits */
 	flags = get_be16(&ondisk->flags);
@@ -1761,16 +1762,17 @@ static struct cache_entry *create_from_disk(struct index_state *istate,
 		const unsigned char *cp = (const unsigned char *)name;
 		size_t strip_len, previous_len;
 
-		previous_len = previous_ce ? previous_ce->ce_namelen : 0;
+		/* If we're at the begining of a block, ignore the previous name */
 		strip_len = decode_varint(&cp);
-		if (previous_len < strip_len) {
-			if (previous_ce)
+		if (previous_ce) {
+			previous_len = previous_ce->ce_namelen;
+			if (previous_len < strip_len)
 				die(_("malformed name field in the index, near path '%s'"),
-				    previous_ce->name);
-			else
-				die(_("malformed name field in the index in the first path"));
+					previous_ce->name);
+			copy_len = previous_len - strip_len;
+		} else {
+			copy_len = 0;
 		}
-		copy_len = previous_len - strip_len;
 		name = (const char *)cp;
 	}
 
@@ -1780,7 +1782,7 @@ static struct cache_entry *create_from_disk(struct index_state *istate,
 		len += copy_len;
 	}
 
-	ce = mem_pool__ce_alloc(istate->ce_mem_pool, len);
+	ce = mem_pool__ce_alloc(ce_mem_pool, len);
 
 	ce->ce_stat_data.sd_ctime.sec = get_be32(&ondisk->ctime.sec);
 	ce->ce_stat_data.sd_mtime.sec = get_be32(&ondisk->mtime.sec);
@@ -1948,6 +1950,52 @@ static void *load_index_extensions(void *_data)
 	return NULL;
 }
 
+/*
+ * A helper function that will load the specified range of cache entries
+ * from the memory mapped file and add them to the given index.
+ */
+static unsigned long load_cache_entry_block(struct index_state *istate,
+			struct mem_pool *ce_mem_pool, int offset, int nr, const char *mmap,
+			unsigned long start_offset, const struct cache_entry *previous_ce)
+{
+	int i;
+	unsigned long src_offset = start_offset;
+
+	for (i = offset; i < offset + nr; i++) {
+		struct ondisk_cache_entry *disk_ce;
+		struct cache_entry *ce;
+		unsigned long consumed;
+
+		disk_ce = (struct ondisk_cache_entry *)(mmap + src_offset);
+		ce = create_from_disk(ce_mem_pool, istate->version, disk_ce, &consumed, previous_ce);
+		set_index_entry(istate, i, ce);
+
+		src_offset += consumed;
+		previous_ce = ce;
+	}
+	return src_offset - start_offset;
+}
+
+static unsigned long load_all_cache_entries(struct index_state *istate,
+			const char *mmap, size_t mmap_size, unsigned long src_offset)
+{
+	unsigned long consumed;
+
+	if (istate->version == 4) {
+		mem_pool_init(&istate->ce_mem_pool,
+			      estimate_cache_size_from_compressed(istate->cache_nr));
+	} else {
+		mem_pool_init(&istate->ce_mem_pool,
+			      estimate_cache_size(mmap_size, istate->cache_nr));
+	}
+
+	consumed = load_cache_entry_block(istate, istate->ce_mem_pool,
+					0, istate->cache_nr, mmap, src_offset, NULL);
+	return consumed;
+}
+
+#ifndef NO_PTHREADS
+
 /*
  * Mostly randomly chosen maximum thread counts: we
  * cap the parallelism to online_cpus() threads, and we want
@@ -1957,20 +2005,123 @@ static void *load_index_extensions(void *_data)
 
 #define THREAD_COST		(10000)
 
+struct load_cache_entries_thread_data
+{
+	pthread_t pthread;
+	struct index_state *istate;
+	struct mem_pool *ce_mem_pool;
+	int offset;
+	const char *mmap;
+	struct index_entry_offset_table *ieot;
+	int ieot_start;		/* starting index into the ieot array */
+	int ieot_blocks;	/* count of ieot entries to process */
+	unsigned long consumed;	/* return # of bytes in index file processed */
+};
+
+/*
+ * A thread proc to run the load_cache_entries() computation
+ * across multiple background threads.
+ */
+static void *load_cache_entries_thread(void *_data)
+{
+	struct load_cache_entries_thread_data *p = _data;
+	int i;
+
+	/* iterate across all ieot blocks assigned to this thread */
+	for (i = p->ieot_start; i < p->ieot_start + p->ieot_blocks; i++) {
+		p->consumed += load_cache_entry_block(p->istate, p->ce_mem_pool,
+			p->offset, p->ieot->entries[i].nr, p->mmap, p->ieot->entries[i].offset, NULL);
+		p->offset += p->ieot->entries[i].nr;
+	}
+	return NULL;
+}
+
+static unsigned long load_cache_entries_threaded(struct index_state *istate, const char *mmap, size_t mmap_size,
+			unsigned long src_offset, int nr_threads, struct index_entry_offset_table *ieot)
+{
+	int i, offset, ieot_blocks, ieot_start, err;
+	struct load_cache_entries_thread_data *data;
+	unsigned long consumed = 0;
+
+	/* a little sanity checking */
+	if (istate->name_hash_initialized)
+		BUG("the name hash isn't thread safe");
+
+	mem_pool_init(&istate->ce_mem_pool, 0);
+
+	/* ensure we have no more threads than we have blocks to process */
+	if (nr_threads > ieot->nr)
+		nr_threads = ieot->nr;
+	data = xcalloc(nr_threads, sizeof(*data));
+
+	offset = ieot_start = 0;
+	ieot_blocks = DIV_ROUND_UP(ieot->nr, nr_threads);
+	for (i = 0; i < nr_threads; i++) {
+		struct load_cache_entries_thread_data *p = &data[i];
+		int nr, j;
+
+		if (ieot_start + ieot_blocks > ieot->nr)
+			ieot_blocks = ieot->nr - ieot_start;
+
+		p->istate = istate;
+		p->offset = offset;
+		p->mmap = mmap;
+		p->ieot = ieot;
+		p->ieot_start = ieot_start;
+		p->ieot_blocks = ieot_blocks;
+
+		/* create a mem_pool for each thread */
+		nr = 0;
+		for (j = p->ieot_start; j < p->ieot_start + p->ieot_blocks; j++)
+			nr += p->ieot->entries[j].nr;
+		if (istate->version == 4) {
+			mem_pool_init(&p->ce_mem_pool,
+				estimate_cache_size_from_compressed(nr));
+		} else {
+			mem_pool_init(&p->ce_mem_pool,
+				estimate_cache_size(mmap_size, nr));
+		}
+
+		err = pthread_create(&p->pthread, NULL, load_cache_entries_thread, p);
+		if (err)
+			die(_("unable to create load_cache_entries thread: %s"), strerror(err));
+
+		/* increment by the number of cache entries in the ieot block being processed */
+		for (j = 0; j < ieot_blocks; j++)
+			offset += ieot->entries[ieot_start + j].nr;
+		ieot_start += ieot_blocks;
+	}
+
+	for (i = 0; i < nr_threads; i++) {
+		struct load_cache_entries_thread_data *p = &data[i];
+
+		err = pthread_join(p->pthread, NULL);
+		if (err)
+			die(_("unable to join load_cache_entries thread: %s"), strerror(err));
+		mem_pool_combine(istate->ce_mem_pool, p->ce_mem_pool);
+		consumed += p->consumed;
+	}
+
+	free(data);
+
+	return consumed;
+}
+#endif
+
 /* remember to discard_cache() before reading a different cache! */
 int do_read_index(struct index_state *istate, const char *path, int must_exist)
 {
-	int fd, i;
+	int fd;
 	struct stat st;
 	unsigned long src_offset;
 	const struct cache_header *hdr;
 	const char *mmap;
 	size_t mmap_size;
-	const struct cache_entry *previous_ce = NULL;
 	struct load_index_extensions p;
 	size_t extension_offset = 0;
 #ifndef NO_PTHREADS
-	int nr_threads;
+	int nr_threads, cpus;
+	struct index_entry_offset_table *ieot = NULL;
 #endif
 
 	if (istate->initialized)
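
As a worked example of the block distribution done in load_cache_entries_threaded() above: with ieot->nr = 10 blocks and nr_threads = 4, DIV_ROUND_UP hands each thread 3 blocks and the last thread is clamped to the single block that remains. A small self-contained sketch of just that arithmetic follows; DIV_ROUND_UP is defined locally here so the snippet stands alone, and the early clamp of nr_threads to ieot->nr is assumed to have already happened.

#include <stdio.h>

#define DIV_ROUND_UP(n, d) (((n) + (d) - 1) / (d))

int main(void)
{
	int ieot_nr = 10, nr_threads = 4;
	int ieot_start = 0;
	int ieot_blocks = DIV_ROUND_UP(ieot_nr, nr_threads);	/* 3 blocks per thread */
	int i;

	for (i = 0; i < nr_threads && ieot_start < ieot_nr; i++) {
		/* the last thread gets whatever is left over */
		if (ieot_start + ieot_blocks > ieot_nr)
			ieot_blocks = ieot_nr - ieot_start;
		printf("thread %d: ieot blocks [%d, %d)\n",
		       i, ieot_start, ieot_start + ieot_blocks);
		ieot_start += ieot_blocks;
	}
	return 0;
}

Running this prints ranges [0,3), [3,6), [6,9), [9,10), i.e. 3+3+3+1 blocks across the four threads.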
@@ -2012,10 +2163,18 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist)
 	p.mmap = mmap;
 	p.mmap_size = mmap_size;
 
+	src_offset = sizeof(*hdr);
+
 #ifndef NO_PTHREADS
 	nr_threads = git_config_get_index_threads();
-	if (!nr_threads)
-		nr_threads = online_cpus();
+
+	/* TODO: does creating more threads than cores help? */
+	if (!nr_threads) {
+		nr_threads = istate->cache_nr / THREAD_COST;
+		cpus = online_cpus();
+		if (nr_threads > cpus)
+			nr_threads = cpus;
+	}
 
 	if (nr_threads > 1) {
 		extension_offset = read_eoie_extension(mmap, mmap_size);
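
The heuristic above aims for roughly one thread per THREAD_COST (10,000) cache entries, capped at online_cpus(); a result of 0 or 1 leaves the single-threaded path in effect. A tiny hedged illustration of the resulting counts (pick_threads is a stand-in for this inline logic, not a Git function):

#include <stdio.h>

#define THREAD_COST 10000

static int pick_threads(int cache_nr, int cpus)
{
	int nr_threads = cache_nr / THREAD_COST;

	if (nr_threads > cpus)
		nr_threads = cpus;
	return nr_threads;
}

int main(void)
{
	/* e.g. on an 8-core machine */
	printf("%d\n", pick_threads(5000, 8));	/* 0: small index stays single threaded */
	printf("%d\n", pick_threads(100000, 8));	/* 8: 10 candidate threads capped to cpus */
	printf("%d\n", pick_threads(1000000, 8));	/* 8 */
	return 0;
}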
@@ -2030,29 +2189,24 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist)
 			nr_threads--;
 		}
 	}
-#endif
 
-	if (istate->version == 4) {
-		mem_pool_init(&istate->ce_mem_pool,
-			      estimate_cache_size_from_compressed(istate->cache_nr));
+	/*
+	 * Locate and read the index entry offset table so that we can use it
+	 * to multi-thread the reading of the cache entries.
+	 */
+	if (extension_offset && nr_threads > 1)
+		ieot = read_ieot_extension(mmap, mmap_size, extension_offset);
+
+	if (ieot) {
+		src_offset += load_cache_entries_threaded(istate, mmap, mmap_size, src_offset, nr_threads, ieot);
+		free(ieot);
 	} else {
-		mem_pool_init(&istate->ce_mem_pool,
-			      estimate_cache_size(mmap_size, istate->cache_nr));
+		src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset);
 	}
+#else
+	src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset);
+#endif
 
-	src_offset = sizeof(*hdr);
-	for (i = 0; i < istate->cache_nr; i++) {
-		struct ondisk_cache_entry *disk_ce;
-		struct cache_entry *ce;
-		unsigned long consumed;
-
-		disk_ce = (struct ondisk_cache_entry *)(mmap + src_offset);
-		ce = create_from_disk(istate, disk_ce, &consumed, previous_ce);
-		set_index_entry(istate, i, ce);
-
-		src_offset += consumed;
-		previous_ce = ce;
-	}
 	istate->timestamp.sec = st.st_mtime;
 	istate->timestamp.nsec = ST_MTIME_NSEC(st);
 
@@ -2549,7 +2703,7 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
 	struct strbuf previous_name_buf = STRBUF_INIT, *previous_name;
 	int drop_cache_tree = istate->drop_cache_tree;
 	off_t offset;
-	int ieot_blocks = 1;
+	int ieot_entries = 1;
 	struct index_entry_offset_table *ieot = NULL;
 	int nr, nr_threads;
 
@@ -2602,6 +2756,8 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
 			ieot_blocks = cpus - 1;
 		} else {
 			ieot_blocks = nr_threads;
+			if (ieot_blocks > istate->cache_nr)
+				ieot_blocks = istate->cache_nr;
 		}
 
 		/*
@@ -2611,7 +2767,7 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
 		if (ieot_blocks > 1) {
 			ieot = xcalloc(1, sizeof(struct index_entry_offset_table)
 				+ (ieot_blocks * sizeof(struct index_entry_offset)));
-			ieot_blocks = DIV_ROUND_UP(entries, ieot_blocks);
+			ieot_entries = DIV_ROUND_UP(entries, ieot_blocks);
 		}
 	}
 #endif
@@ -2644,7 +2800,7 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
 
 			drop_cache_tree = 1;
 		}
-		if (ieot && i && (i % ieot_blocks == 0)) {
+		if (ieot && i && (i % ieot_entries == 0)) {
 			ieot->entries[ieot->nr].nr = nr;
 			ieot->entries[ieot->nr].offset = offset;
 			ieot->nr++;
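
On the write side, the hunks above separate two ideas that previously shared the name ieot_blocks: the number of IEOT blocks to create, and the number of cache entries per block (now ieot_entries). A small hedged sketch of the resulting cadence is below; the ieot_record type, the fixed pretend entry size, and the final-block handling are illustrative stand-ins, not Git's on-disk format or do_write_index() logic.

#include <stdio.h>

#define DIV_ROUND_UP(n, d) (((n) + (d) - 1) / (d))

/* Stand-in for one IEOT record: entry count and starting file offset. */
struct ieot_record { int nr; long offset; };

int main(void)
{
	int entries = 10;
	int ieot_blocks = 3;					/* desired number of blocks */
	int ieot_entries = DIV_ROUND_UP(entries, ieot_blocks);	/* entries per block: 4 */
	struct ieot_record ieot[8];
	int ieot_nr = 0, nr = 0, i;
	long offset = 12;	/* pretend index header size */
	long entry_size = 62;	/* pretend fixed on-disk entry size */

	for (i = 0; i < entries; i++) {
		/* close out the previous block every ieot_entries entries */
		if (i && (i % ieot_entries == 0)) {
			ieot[ieot_nr].nr = nr;
			ieot[ieot_nr].offset = offset;
			ieot_nr++;
			offset += nr * entry_size;	/* next block starts here */
			nr = 0;
		}
		nr++;
	}
	/* record the final (possibly short) block */
	if (nr) {
		ieot[ieot_nr].nr = nr;
		ieot[ieot_nr].offset = offset;
		ieot_nr++;
	}

	for (i = 0; i < ieot_nr; i++)
		printf("block %d: %d entries at offset %ld\n",
		       i, ieot[i].nr, ieot[i].offset);
	return 0;
}

With these numbers the table comes out as blocks of 4, 4, and 2 entries, which is the shape the reader-side threading then exploits.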
