diff --git a/Database Systems/Concurrency Control/Concurrency Control.md b/Database Systems/Concurrency Control/Concurrency Control.md index 936ae3e..d2986fa 100644 --- a/Database Systems/Concurrency Control/Concurrency Control.md +++ b/Database Systems/Concurrency Control/Concurrency Control.md @@ -1,5 +1,5 @@ # Explanation -- *Transaction*: execution of one or more operations (e.g, SQL queries) on a shared database to perform some higher level function. +- *Transaction*: execution of one or more operations (e.g, SQL queries) on a shared database to perform some higher level function. ^dae2be - Basic unit of change in DBMS. - Must be #atomic. - Transaction starts with the BEGIN command and ends with either COMMIT or ABORT. diff --git a/Database Systems/Database Systems.md b/Database Systems/Database Systems.md index 18618d2..79f9ed1 100644 --- a/Database Systems/Database Systems.md +++ b/Database Systems/Database Systems.md @@ -1,6 +1,6 @@ # Explanation - *Database*: organized collection of inter-related data that models some aspect of the real world. -- *Database Management System*: software that allows applications to store and analyze information in a database. +- *Database Management System*: software that allows applications to store and analyze information in a database. ^1ec6f9 - Designed to allow definition, creation, querying, updating, and administration, of databases in accordance with some data model. - *Data Model*: collection of concepts for describing the data in a database. Examples: - [[Relational Model]] (most common), @@ -22,6 +22,7 @@ - [[Query Planning & Optimization]] - [[Concurrency Control]] - [[Crash Recovery]] +- [[Distributed Databases]] # FAQ - Why do we need a database management system? Why not just store data in a flat CSV file? (AKA Strawman system)? Because there are many issues with this approach: diff --git a/Distributed Systems/CAP Theorem.md b/Distributed Systems/CAP Theorem.md new file mode 100644 index 0000000..b023756 --- /dev/null +++ b/Distributed Systems/CAP Theorem.md @@ -0,0 +1,26 @@ +# Explanation + +> *The CAP Theorem*: it is impossible for a distributed system to always be **Consistent**, **Available**, and **Partition-Tolerant**. Only two of these three properties can be chosen. + +--- + +### Consistency +- [[Linearizability#^7f4541|Consistency]] is synonymous with [[Linearizability|linearizability]] for operations on all nodes. +- Once a write completes, all future reads should return the value of that write applied or a later write applied. +- Additionally, once a read has been returned, future reads should return that value or the value of a later applied write. +- NoSQL systems compromise this property in favor of the latter two. + - Other systems will favor this property and one of the latter two. + +### Availability +- *Availability*: the concept that all nodes that are up can satisfy all requests. + +### Partition-Tolerance +- *Partition tolerance*: the system can still operate correctly despite some message loss between nodes that are trying to reach consensus on values. + +## Insights +- If consistency and partition-tolerance is chosen for a system, updates will not be allowed until a majority of nodes are reconnected, typically done in traditional or NewSQL DBMSs. +- There is a modern version that considers consistency vs. latency trade-offs: *PACELC Theorem*. 
+ - In case of **network partitioning** (P) in a distributed system, one has to choose between **availability** (A) and **consistency** (C), else (E), even when the system runs normally without network partitions, one has to choose between **latency** (L) and **consistency** (C). + +# Sources +- CMU 15-445 Lecture 22 - "Distributed Transactional Database Systems" \ No newline at end of file diff --git a/Distributed Systems/Chain Replication.md b/Distributed Systems/Chain Replication.md new file mode 100644 index 0000000..000c168 --- /dev/null +++ b/Distributed Systems/Chain Replication.md @@ -0,0 +1,52 @@ +# Explanation +- *Chain Replication (CR)*: a form of [[Replication#Replicated State Machine|replicated state machine]] for replicating data at the [[Replication#^518c94|application level]] across multiple nodes to provide a strongly consistent storage interface. + +## Mechanism +- The basic approach organizes all nodes storing an object in a chain. + - Nodes form a chain of a defined length. + - The chain tail handles all read requests, and the chain head handles all write requests. +- **Writes** propagate down the chain in order and are considered **committed** once they reach the tail. + - When a write operation is received by a node, it is propagated to the next node in the chain. + - Once the write reaches the tail node, it has been applied to all replicas in the chain, and it is considered committed. +- **Reads** are only handled by the tail node, so only committed values can be returned by a read. +- CR achieves **strong consistency**/[[Linearizability]] because all writes are committed at the tail and all reads come from the tail. + +## Types +- **Basic Chain Replication**: This is the original chain replication method. + - All reads must be handled by the tail node and writes propagate down the chain from the head. + - This method has good throughput and easy recovery, but suffers from read throughput limitations and potential hotspots. +- **Chain Replication with Apportioned Queries (CRAQ)**: This method improves upon basic Chain Replication by allowing any node in the chain to handle read operations while still providing strong consistency guarantees. + - This is achieved by allowing nodes to store multiple versions of an object and using version queries to ensure consistency. + +## Advantages +- **Strong Consistency**: CR guarantees that all reads will see the latest written value, ensuring data consistency. +- **Simplicity**: CR is relatively simple to implement compared to other strong consistency protocols. +- **Good Throughput**: Write operations are cheaper than in other protocols offering strong consistency, and can be pipelined down the chain, distributing transmission costs evenly. + - The head sends each write just once, while in Raft, the leader sends writes to all nodes. +- **Quick and Easy Recovery**: The chain structure simplifies failure recovery, making it faster and easier compared to other methods. + - Every replica knows of every committed write, and if the head or tail fails, their successor takes over. + +## Disadvantages +- **Limited Read Throughput**: All reads must go to the tail node, limiting read throughput to the capacity of a single node. + - In Raft, reads involve all servers, not just one as in CR. +- **Hotspots**: The single point of read access at the tail can create performance bottlenecks. +- **Increased Latency**: Wide-area deployments can suffer from increased write latency as writes must propagate through the entire chain. 
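+  - As a rough illustration (my own model, not from the lecture): with $k$ replicas and a one-way inter-node delay of $d$, a write has to traverse roughly $(k-1)\cdot d$ of sequential chain hops before the tail can commit it, whereas a leader in a quorum protocol such as Raft contacts its followers in parallel and needs only about one round trip to reach a majority.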
+- **Not Immediately Fault-Tolerant**: All CRAQ replicas must participate for any write to commit. If a node is not reachable, CRAQ must wait. This is different from protocols like ZooKeeper and Raft, which are immediately fault-tolerant. +- **Susceptible to Partitions**: Because CRAQ is not immediately fault-tolerant, it is not able to handle partitions. If a partition happens, the system may experience split brain, and must wait until the network is restored. + +## Handling Partitions +- To safely use a replication system that can't handle partitions, a single configuration manager must be used. + - The configuration manager is responsible for choosing the head, chain, and tail of the system. + - Servers and clients must obey the configuration manager, or stop operating, regardless of which nodes they think are alive or dead. +- A **configuration manager** is a common pattern for distributed systems. + - It is a core part of systems such as [[The Google File System]] and VMware-FT. + - Typically, Paxos, Raft, or ZooKeeper are used as the configuration service. + +## Addressing Limitations +- One way to improve read throughput in CR is to split objects over multiple chains, where each server participates in multiple chains. +- This only works if the load is evenly distributed among the chains. +- However, load is often not evenly distributed, meaning that splitting objects over multiple chains may not be a viable solution. +- CRAQ offers another way to address the read throughput limitation of CR. + +# Sources +- [MIT 6.824 - Lecture 9](https://www.youtube.com/watch?v=IXHzbCuADt0) diff --git a/Distributed Systems/Consistent Hashing.md b/Distributed Systems/Consistent Hashing.md new file mode 100644 index 0000000..80e6e56 --- /dev/null +++ b/Distributed Systems/Consistent Hashing.md @@ -0,0 +1,12 @@ +# Explanation +- *Consistent Hashing*: a hashing scheme that + 1. assigns every node to a location on some logical ring, + 2. then the hash of every partition key maps to some location on the ring. + 3. The node that is closest to the key in the clockwise direction is responsible for that key. +- When a node is added or removed, keys are only moved between nodes adjacent to the new/removed node. +- A replication factor of $n$ means that each key is replicated at the $n$ closest nodes in the clockwise direction. 
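+- A minimal sketch of such a ring (illustrative only; the class, names, and hash choice below are assumptions, not from the lecture):
+```python
+import bisect
+import hashlib
+
+def ring_hash(key: str) -> int:
+    # Map a node name or partition key to a position on the logical ring.
+    return int(hashlib.md5(key.encode()).hexdigest(), 16)
+
+class ConsistentHashRing:
+    def __init__(self, nodes):
+        # Every node is assigned one location on the ring, kept sorted.
+        self.ring = sorted((ring_hash(n), n) for n in nodes)
+
+    def nodes_for(self, key: str, replication_factor: int = 1):
+        # Walk clockwise from the key's position and collect the next
+        # `replication_factor` nodes, wrapping around the ring.
+        positions = [pos for pos, _ in self.ring]
+        start = bisect.bisect(positions, ring_hash(key)) % len(self.ring)
+        count = min(replication_factor, len(self.ring))
+        return [self.ring[(start + i) % len(self.ring)][1] for i in range(count)]
+
+ring = ConsistentHashRing(["node-A", "node-B", "node-C"])
+print(ring.nodes_for("user:42", replication_factor=2))  # e.g. ['node-B', 'node-C']
+```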
+ +- Illustration: ![[Consistent Hashing.png]] + +# Sources +- CMU 15-445 Lecture 21 - "Introduction to Distributed Databases" diff --git a/Distributed Systems/Consistent Hashing.png b/Distributed Systems/Consistent Hashing.png new file mode 100644 index 0000000..8f1f27d Binary files /dev/null and b/Distributed Systems/Consistent Hashing.png differ diff --git a/Distributed Systems/Distributed Databases/Centralized Coordinator.png b/Distributed Systems/Distributed Databases/Centralized Coordinator.png new file mode 100644 index 0000000..455a2a4 Binary files /dev/null and b/Distributed Systems/Distributed Databases/Centralized Coordinator.png differ diff --git a/Distributed Systems/Distributed Databases/Database System Architectures.png b/Distributed Systems/Distributed Databases/Database System Architectures.png new file mode 100644 index 0000000..ca4f79b Binary files /dev/null and b/Distributed Systems/Distributed Databases/Database System Architectures.png differ diff --git a/Distributed Systems/Distributed Databases/Distributed Concurrency Control.md b/Distributed Systems/Distributed Databases/Distributed Concurrency Control.md new file mode 100644 index 0000000..1687a37 --- /dev/null +++ b/Distributed Systems/Distributed Databases/Distributed Concurrency Control.md @@ -0,0 +1,39 @@ +# Explanation +- A distributed transaction accesses data at one or more partitions, which requires expensive coordination. + +## Coordination Approaches +### Centralized coordinator +- The centralized coordinator acts as a global “traffic cop” that coordinates all the behavior. +- Illustration: ![[Centralized Coordinator.png]] + +### Middleware +- Centralized coordinators can be used as middleware, which accepts query requests and routes queries to the correct partitions. + +### Decentralized coordinator +- In a decentralized approach, nodes organize themselves. +- The client directly sends queries to one of the partitions. +- This home partition will send results back to the client. + - The home partition is in charge of communicating with other partitions and committing accordingly. + +## Distributed Transactions +- *Distributed [[Concurrency Control#^dae2be|Transaction]]*: a transaction that accesses data on multiple nodes. +- Executing distributed transactions is more challenging than single-node transactions because when the transaction commits, the DBMS has to make sure that all the nodes agree to commit the transaction. +- The DBMS ensures that the database provides the same [[Concurrency Control#ACID|ACID]] guarantees as a single-node DBMS even in the case of node failures, message delays, or message loss. +- One can assume that all nodes in a distributed DBMS are well-behaved and under the same administrative domain. + - In other words, given that there is no node failure, a node that is told to commit a transaction will commit the transaction. + - If the other nodes in a distributed DBMS cannot be trusted, then the DBMS needs to use a Byzantine fault-tolerant protocol (e.g., blockchain) for transactions. + +## Atomic Commit Protocols +- When a multi-node transaction finishes, the DBMS needs to ask all of the nodes involved whether it is safe to commit. +- Depending on the protocol, a majority of the nodes or all of the nodes may be needed to commit.
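+- A rough sketch of that voting step (illustrative only; `prepare`/`finish` are assumed stand-ins for the real RPCs, not an actual DBMS API):
+```python
+class Participant:
+    # Stand-in for a partition's commit interface (assumed names, not a real API).
+    def prepare(self, txn_id):           # vote: is it safe to commit txn_id?
+        return True
+    def finish(self, txn_id, decision):  # apply the coordinator's decision
+        pass
+
+def atomic_commit(participants, txn_id, require_all=True):
+    # Round 1: collect a vote from every node touched by the transaction.
+    votes = [p.prepare(txn_id) for p in participants]
+    # 2PC-style protocols need every vote; consensus-based ones need a majority.
+    needed = len(participants) if require_all else len(participants) // 2 + 1
+    decision = "commit" if sum(votes) >= needed else "abort"
+    # Round 2: announce the outcome to every participant.
+    for p in participants:
+        p.finish(txn_id, decision)
+    return decision
+
+print(atomic_commit([Participant(), Participant(), Participant()], txn_id=42))
+```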
+- Examples: + - [[Two-Phase Commit]] (Common) + - Three-Phase Commit (Uncommon) + - Paxos (Common) + - Raft (Common) + - ZAB (Zookeeper Atomic Broadcast protocol, Apache Zookeeper) + - Viewstamped Replication (first provably correct protocol) + +# Sources +- CMU 15-445 Lecture 21 - "Introduction to Distributed Databases" +- CMU 15-445 Lecture 22 - "Distributed Transactional Database Systems" \ No newline at end of file diff --git a/Distributed Systems/Distributed Databases/Distributed Database Architecture.md b/Distributed Systems/Distributed Databases/Distributed Database Architecture.md new file mode 100644 index 0000000..46d2729 --- /dev/null +++ b/Distributed Systems/Distributed Databases/Distributed Database Architecture.md @@ -0,0 +1,40 @@ +# Explanation +- A DBMS’s system architecture specifies what shared resources are directly accessible to CPUs. + - It affects how CPUs coordinate with each other and where they retrieve and store objects in the database. +- Types: ![[Database System Architectures.png]] + +## Shared everything +- A single-node DBMS uses what is called a **shared everything** architecture. +- This single node executes workers on a local CPU(s) with its own local memory address space and disk. + +## Shared Memory +- An alternative to shared everything architecture in distributed systems is shared memory. + - CPUs have access to common memory address space via a fast interconnect. CPUs also share the same disk. +- In practice, most DBMSs do not use this architecture, since + - it is provided at the [[Operating Systems|OS]]/kernel level, and + - it also causes problems, since each process’s scope of memory is the same memory address space, which can be modified by multiple processes. +- Each processor has a global view of all the in-memory data structures. +- Each DBMS instance on a processor has to “know” about the other instances. + +## Shared Disk +- In a shared [[Disks|disk]] architecture, all CPUs can read and write to a single logical disk directly via an interconnect, but each have their own private memories. + - The local storage on each compute node can act as caches. + - This approach is more common in cloud-based DBMSs. +- The DBMS’s execution layer can scale independently from the storage layer. + - Adding new storage nodes or execution nodes does not affect the layout or location of data in the other layer. +- Nodes must send messages between them to learn about other node’s current state. + - That is, since memory is local, if data is modified, changes must be communicated to other CPUs in the case that piece of data is in main memory for the other CPUs. +- Nodes have their own buffer pool and are considered stateless. + - A node crash does not affect the state of the database since that is stored separately on the shared disk. + - The storage layer persists the state in the case of crashes. + +## Shared Nothing +- In a shared nothing environment, each node has its own CPU, memory, and disk. + - Nodes only communicate with each other via network. + - Before the rise of cloud storage platforms, the shared nothing architecture used to be considered the correct way to build distributed DBMSs. +- It is more difficult to increase capacity in this architecture because the DBMS has to physically move data to new nodes. +- It is also difficult to ensure consistency across all nodes in the DBMS, since the nodes must coordinate with each other on the state of transactions. 
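+  - For example, a transaction that updates tuples stored on two different shared-nothing nodes has to run an atomic commit protocol such as [[Two-Phase Commit]] across those nodes before it can commit.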
+- The advantage is that shared nothing DBMSs can potentially achieve better performance and are more efficient than other types of distributed DBMS architectures. + +# Sources +- CMU 15-445 Lecture 21 - "Introduction to Distributed Databases" \ No newline at end of file diff --git a/Distributed Systems/Distributed Databases/Distributed Database Partitioning.md b/Distributed Systems/Distributed Databases/Distributed Database Partitioning.md new file mode 100644 index 0000000..50cd28e --- /dev/null +++ b/Distributed Systems/Distributed Databases/Distributed Database Partitioning.md @@ -0,0 +1,35 @@ +# Explanation +- Distributed database systems must partition the database across multiple resources, including disks, nodes, and processors. + - This process is sometimes called sharding in NoSQL systems. +- When the DBMS receives a query, + 1. it first analyzes the data that the query plan needs to access. + 2. The DBMS may potentially send fragments of the query plan to different nodes, + 3. then combines the results to produce a single answer. +- The goal of a partitioning scheme is to maximize single-node transactions, or transactions that only access data contained on one partition. + - This allows the DBMS to not need to coordinate the behavior of concurrent transactions running on other nodes. +- A distributed transaction accessing data at one or more partitions will require expensive, difficult coordination. + +- Types: + - *Logical Partitioning*: a type of partitioning where particular nodes are in charge of accessing specific tuples from a shared disk. + - *Physical Partitioning*: a type of partitioning where each shared-nothing node reads and updates tuples it contains on its own local disk. + +## Naive Table Partitioning +- The simplest way to partition tables is naive data partitioning, where each node stores one table, assuming enough storage space for a given node. +- This is easy to implement because a query is just routed to a specific partition. +- This can be bad, since it is not scalable: one partition’s resources can be exhausted if that one table is queried often, while the other nodes go unused. +- Illustration: ![[Naive Table Partitioning.png]] + +## Vertical Partitioning +- *Vertical Partitioning*: a partitioning scheme which splits a table’s attributes into separate partitions. +- Each partition must also store tuple information for reconstructing the original record. + +## Horizontal Partitioning +- *Horizontal Partitioning*: a partitioning scheme which splits a table’s tuples into disjoint subsets that are equal in terms of size, load, or usage, based on some partitioning key(s). + +## Hash Partitioning +- The DBMS can partition a database physically (shared nothing) or logically (shared disk) via hash partitioning or range partitioning. +- The problem of hash partitioning is that when a new node is added or removed, a lot of data needs to be shuffled around. + - The solution for this is [[Consistent Hashing]]. + +# Sources +- CMU 15-445 Lecture 21 - "Introduction to Distributed Databases" diff --git a/Distributed Systems/Distributed Databases/Distributed Databases.md b/Distributed Systems/Distributed Databases/Distributed Databases.md new file mode 100644 index 0000000..a8c0e84 --- /dev/null +++ b/Distributed Systems/Distributed Databases/Distributed Databases.md @@ -0,0 +1,23 @@ +# Explanation +- *Distributed DBMS*: a [[Database Systems#^1ec6f9|DBMS]] that divides a single logical database across multiple physical resources.
+ - The application is (usually) unaware that data is split across separate hardware. + - The system relies on the techniques and algorithms from single-node DBMSs to support transaction processing and query execution in a distributed environment. +- An important goal in designing a distributed DBMS is fault tolerance (i.e., avoiding a single node failure taking down the entire system). + +## Parallel vs Distributed Databases +- **Parallel Database**: + - Nodes are physically close to each other. + - Nodes are connected via high-speed LAN (fast, reliable communication fabric). + - The communication cost between nodes is assumed to be small. As such, one does not need to worry about nodes crashing or packets getting dropped when designing internal protocols. +- **Distributed Database**: + - Nodes can be far from each other. + - Nodes are potentially connected via a public network, which can be slow and unreliable. + - The communication cost and connection problems cannot be ignored (i.e., nodes can crash, and packets can get dropped). + +# Topics +- [[Distributed Database Architecture]] +- [[Distributed Database Partitioning]] +- [[Distributed Concurrency Control]] + +# Sources +- CMU 15-445 Lecture 21 - "Introduction to Distributed Databases" \ No newline at end of file diff --git a/Distributed Systems/Distributed Databases/Naive Table Partitioning.png b/Distributed Systems/Distributed Databases/Naive Table Partitioning.png new file mode 100644 index 0000000..0fe4901 Binary files /dev/null and b/Distributed Systems/Distributed Databases/Naive Table Partitioning.png differ diff --git a/Distributed Systems/Distributed Databases/Two-Phase Commit.md b/Distributed Systems/Distributed Databases/Two-Phase Commit.md new file mode 100644 index 0000000..1933777 --- /dev/null +++ b/Distributed Systems/Distributed Databases/Two-Phase Commit.md @@ -0,0 +1,34 @@ +# Explanation + +## Mechanism + +### Phase 1: Prepare +1. The client sends a Commit Request to the [[Distributed Concurrency Control#Coordination Approaches|coordinator]]. +2. In the first phase of this protocol, the coordinator sends a Prepare message, essentially asking the participant nodes if the current transaction is allowed to commit. +3. If a given participant verifies that the given transaction is valid, it sends an OK to the coordinator. +4. If the coordinator receives an OK from all the participants, the system can now go into the second phase of the protocol. +5. If anyone sends an Abort to the coordinator, the coordinator sends an Abort to the client. + +### Phase 2: Commit +1. The coordinator sends a Commit to all the participants, telling those nodes to commit the transaction, if all the participants sent an OK. +2. Once the participants respond with an OK, the coordinator can tell the client that the transaction is committed. +3. If the transaction was aborted in the first phase, the participants receive an Abort from the coordinator, to which they should respond with an OK. + +- **Notes**: + - Either everyone commits or no one does. + - The coordinator can also be a participant in the system. + +### Crash Recovery +- In the case of a crash, all nodes keep track of a non-volatile log of the outcome of each phase. +- Nodes block until they can figure out the next course of action. +- If the coordinator crashes, the participants must decide what to do. + - A safe option is just to abort.
+ - Alternatively, the nodes can communicate with each other to see if they can commit without the explicit permission of the coordinator. +- If a participant crashes, the coordinator assumes that it responded with an abort if it has not sent an acknowledgement yet. + +## Optimizations +- **Early Prepare Voting**: If the DBMS sends a query to a remote node that it knows will be the last one executed there, then that node will also return their vote for the prepare phase with the query result. +- **Early Acknowledgement after Prepare**: If all nodes vote to commit a transaction, the coordinator can send the client an acknowledgement that their transaction was successful before the commit phase finishes. + +# Sources +- CMU 15-445 Lecture 22 - "Distributed Transactional Database Systems" diff --git a/Distributed Systems/Distributed Systems.md b/Distributed Systems/Distributed Systems.md new file mode 100644 index 0000000..cde07d5 --- /dev/null +++ b/Distributed Systems/Distributed Systems.md @@ -0,0 +1,36 @@ +# Explanation +- *Distributed System*: A collection of independent computers that appear to the users of the system as a single computer. + +## Why do people build distributed systems? +- to increase capacity via parallelism, +- to tolerate faults via replication, +- to place computing physically close to external entities, and +- to achieve security via isolation. + +## Challenges in distributed systems +- They have many concurrent parts. +- They must cope with partial failures. +- It's tricky to realize performance potential. + +## Topics +- [[Replication]] + - Primary/Backup Replication + - [[Chain Replication]] +- [[Linearizability]] +- [[CAP Theorem]] +- [[Consistent Hashing]] +- [[Distributed Databases]] + +## Papers +- [[MapReduce]] +- [[The Google File System]] +- VMware Fault-Tolerant Virtual Machines +- Raft +- Zookeeper +- Kafka +- CRAQ +- Amazon Aurora +- Frangipani + +# Sources +- [MIT 6.824 - Lecture 1](https://www.youtube.com/watch?v=cQP8WApzIQQ&list=PLrw6a1wE39_tb2fErI4-WkMbsvGQk9_UB&index=1&pp=iAQB) diff --git a/Distributed Systems/GFS Architecture.png b/Distributed Systems/GFS Architecture.png new file mode 100644 index 0000000..d410079 Binary files /dev/null and b/Distributed Systems/GFS Architecture.png differ diff --git a/Distributed Systems/Linearizability.md b/Distributed Systems/Linearizability.md new file mode 100644 index 0000000..431b620 --- /dev/null +++ b/Distributed Systems/Linearizability.md @@ -0,0 +1,20 @@ +# Explanation +- *Linearizability*: an execution history is linearizable if one can find a total order of all operations, + - that matches real-time (for non-overlapping ops), and + - in which each read sees the value from the write preceding it in the order. +- *Execution History*: a record of client operations, each with arguments, return value, time of start, time completed. +- *Strong Consistency*: when a distributed system behaves the same way that a system with only one node would behave. ^7f4541 + - The term is often used interchangeably with Linearizability. + +## Notes +- The definition is based on external behavior so we can apply it without having to know how service works. +- Systems can choose any order for concurrent writes but all clients must see the writes in the same order. + - This is important when we have replicas or caches because they have to all agree on the order in which operations occur. +- According to the definition, reads must return fresh data: stale values aren't linearizable. 
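+  - For example (an illustrative history, not from the lecture): suppose a write `Wx2` completes, and only afterwards a read of `x` starts and returns the older value `1`. Because the read and `Wx2` do not overlap in real time, every candidate total order must place the read after `Wx2`, yet the read did not observe `Wx2`'s value, so no valid order exists and the history is not linearizable.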
+- Linearizability forbids many situations: + - split brain (two active leaders), + - forgetting committed writes after a reboot, and + - reading from lagging replicas. + +# Sources +- [MIT 6.824 - Lecture 7](https://www.youtube.com/watch?v=4r8Mz3MMivY) diff --git a/Distributed Systems/MapReduce Execution overview.png b/Distributed Systems/MapReduce Execution overview.png new file mode 100644 index 0000000..561a933 Binary files /dev/null and b/Distributed Systems/MapReduce Execution overview.png differ diff --git a/Distributed Systems/MapReduce.md b/Distributed Systems/MapReduce.md new file mode 100644 index 0000000..9007e1c --- /dev/null +++ b/Distributed Systems/MapReduce.md @@ -0,0 +1,118 @@ +# Explanation + +## Abstract +- MapReduce is a programming model and an associated implementation for processing and generating large data sets. +- Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key. +- Many real world tasks are expressible in this model, as shown in the paper. +- Programs written in this functional style are automatically parallelized and executed on a large cluster of commodity machines. +- The run-time system takes care of the details of partitioning the input data, scheduling the program’s execution across a set of machines, handling machine failures, and managing the required inter-machine communication. +- This allows programmers without any experience with parallel and distributed systems to easily utilize the resources of a large distributed system. + +## Programming Model +- The computation takes a set of input key/value pairs, and produces a set of output key/value pairs. +- The user of the MapReduce library expresses the computation as two functions: + - *Map*: takes an input pair and produces a set of intermediate key/value pairs. + - The MapReduce library groups together all intermediate values associated with the same intermediate key I and passes them to the Reduce function. + - *Reduce*: accepts an intermediate key I and a set of values for that key. It merges together these values to form a possibly smaller set of values. + - Typically just zero or one output value is produced per Reduce invocation. + - The intermediate values are supplied to the user’s reduce function via an iterator, which allows us to handle lists of values that are too large to fit in memory. + +### Word Count Example +- Consider the problem of counting the number of occurrences of each word in a large collection of documents. The user would write code similar to the following pseudo-code: +``` +map(String key, String value): + // key: document name + // value: document contents + for each word w in value: + EmitIntermediate(w, "1"); + +reduce(String key, Iterator values): + // key: a word + // values: a list of counts + int result = 0; + for each v in values: + result += ParseInt(v); + Emit(AsString(result)); +``` + +### Types +- Even though the previous pseudo-code is written in terms of string inputs and outputs, conceptually the map and reduce functions supplied by the user have associated types: +``` +map (k1,v1) -> list(k2,v2) +reduce (k2,list(v2)) -> list(v2) +``` + +### More Examples +- Distributed Grep. +- Count of URL Access Frequency. +- Reverse Web-Link Graph. +- Inverted Index. +- Distributed Sort. + +## Implementation +- Many different implementations of the MapReduce interface are possible. 
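+- For instance, a toy single-process sketch of the same interface (illustrative only, nothing like Google's distributed runtime):
+```python
+from collections import defaultdict
+
+def map_reduce(inputs, mapf, reducef):
+    # Map phase: turn every input key/value pair into intermediate pairs.
+    intermediate = defaultdict(list)
+    for key, value in inputs.items():
+        for k2, v2 in mapf(key, value):
+            intermediate[k2].append(v2)
+    # Reduce phase: merge all values that share an intermediate key.
+    return {k2: reducef(k2, values) for k2, values in intermediate.items()}
+
+# The word-count example from above, expressed against this interface:
+def wc_map(doc_name, contents):
+    return [(word, 1) for word in contents.split()]
+
+def wc_reduce(word, counts):
+    return sum(counts)
+
+docs = {"doc1": "the quick brown fox", "doc2": "the lazy dog and the fox"}
+print(map_reduce(docs, wc_map, wc_reduce))  # {'the': 3, 'quick': 1, 'fox': 2, ...}
+```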
+- The right choice depends on the environment. + - For example, one implementation may be suitable for a small shared-memory machine, another for a large NUMA multi-processor, and yet another for an even larger collection of networked machines. + +### Execution +- The Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of $M$ splits. + - The input splits can be processed in parallel by different machines. +- Reduce invocations are distributed by partitioning the intermediate key space into $R$ pieces using a partitioning function (e.g., $hash(key)$ mod $R$). + - The number of partitions (R) and the partitioning function are specified by the user. +- Overall flow of a MapReduce operation: + 1. The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines. + 2. One of the copies of the program is special – the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task. + 3. 3. A worker who is assigned a map task reads the contents of the corresponding input split. It parses key/value pairs out of the input data and passes each pair to the user-defined Map function. The intermediate key/value pairs produced by the Map function are buffered in memory. + 4. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers. + 5. When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. The sorting is needed because typically many different keys map to the same reduce task. If the amount of intermediate data is too large to fit in memory, an external sort is used. + 6. The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user’s Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition. + 7. When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns back to the user code. + 8. After successful completion, the output of the mapreduce execution is available in the R output files (one per reduce task, with file names as specified by the user) + +#### Illustration +![[MapReduce Execution overview.png]] + +### Master Data Structures +- For each map task and reduce task, the master stores the state (idle, in-progress, or completed), and the identity of the worker machine (for non-idle tasks). + +### Fault Tolerance + +#### Worker Failure +- MapReduce is resilient to large-scale worker failures. +- The master pings every worker periodically. +- If no response is received from a worker in a certain amount of time, the master marks the worker as failed. 
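+- A minimal sketch of that detection plus the task resetting the next bullets describe (illustrative; the paper gives no code, and the structures below are assumptions):
+```python
+import time
+from dataclasses import dataclass
+from typing import Optional
+
+PING_TIMEOUT = 10.0   # assumed: seconds of silence before a worker counts as failed
+
+@dataclass
+class Task:
+    kind: str                     # "map" or "reduce"
+    state: str = "idle"           # idle / in-progress / completed
+    worker: Optional[str] = None  # worker the task was assigned to
+
+def check_workers(last_pong, tasks, now=None):
+    now = time.time() if now is None else now
+    failed = {w for w, t in last_pong.items() if now - t > PING_TIMEOUT}
+    for task in tasks:
+        if task.worker in failed and (
+            task.state == "in-progress"
+            or (task.state == "completed" and task.kind == "map")
+        ):
+            # Completed map output lives on the failed worker's local disk and is
+            # lost; completed reduce output is already safe in the global file system.
+            task.state, task.worker = "idle", None
+    return failed
+
+tasks = [Task("map", "completed", "w1"), Task("reduce", "in-progress", "w1")]
+print(check_workers({"w1": 0.0, "w2": time.time()}, tasks), tasks)
+```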
+- Any map tasks completed by the worker are reset back to their initial idle state, and therefore become eligible for scheduling on other workers. + - Similarly, any map task or reduce task in progress on a failed worker is also reset to idle and becomes eligible for rescheduling. + - Completed map tasks are re-executed on a failure because their output is stored on the local disk(s) of the failed machine and is therefore inaccessible. + - Completed reduce tasks do not need to be re-executed since their output is stored in a global file system. +- When a map task is executed first by worker A and then later executed by worker B (because A failed), all workers executing reduce tasks are notified of the re-execution. +- Any reduce task that has not already read the data from worker A will read the data from worker B. + +#### Master Failure +- It is easy to make the master write periodic checkpoints of the master data structures described above. + - If the master task dies, a new copy can be started from the last checkpointed state. +- However, given that there is only a single master, its failure is unlikely; therefore our current implementation aborts the MapReduce computation if the master fails. + - Clients can check for this condition and retry the MapReduce operation if they desire. + +### Locality +- We conserve network bandwidth by taking advantage of the fact that the input data (managed by GFS) is stored on the local disks of the machines that make up our cluster. +- The MapReduce master takes the location information of the input files into account and attempts to schedule a map task on a machine that contains a replica of the corresponding input data. +- Failing that, it attempts to schedule a map task near a replica of that task’s input data (e.g., on a worker machine that is on the same network switch as the machine containing the data). + +### Task Granularity +- We subdivide the map phase into $M$ pieces and the reduce phase into $R$ pieces, as described above. +- Ideally, $M$ and $R$ should be much larger than the number of worker machines. + - Having each worker perform many different tasks improves dynamic load balancing, and also speeds up recovery when a worker fails: the many map tasks it has completed can be spread out across all the other worker machines. +- There are practical bounds on how large $M$ and $R$ can be in our implementation, since the master must make $O(M + R)$ scheduling decisions and keeps $O(M \cdot R)$ state in memory as described above. + +### Backup Tasks +- One of the common causes that lengthens the total time taken for a MapReduce operation is a “straggler”: a machine that takes an unusually long time to complete one of the last few map or reduce tasks in the computation. +- Reasons why stragglers can arise: + - A machine with a bad disk may experience frequent correctable errors that slow its read performance from $30$ MB/s to $1$ MB/s. + - The cluster scheduling system may have scheduled other tasks on the machine, causing it to execute the MapReduce code more slowly due to competition for CPU, memory, local disk, or network bandwidth. +- When a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks. + - The task is marked as completed whenever either the primary or the backup execution completes. 
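+- A sketch of that backup-task selection (illustrative; the threshold and data layout are my own, not the paper's):
+```python
+def pick_backup_tasks(tasks, fraction_done=0.95):
+    # Once the job is nearly finished, return the in-progress tasks that should
+    # get a duplicate ("backup") execution; whichever copy finishes first wins.
+    done = sum(1 for t in tasks if t["state"] == "completed")
+    if done < fraction_done * len(tasks):
+        return []
+    return [t for t in tasks if t["state"] == "in-progress"]
+
+tasks = [{"id": i, "state": "completed"} for i in range(19)]
+tasks.append({"id": 19, "state": "in-progress"})          # the straggler
+print(pick_backup_tasks(tasks))  # [{'id': 19, 'state': 'in-progress'}]
+```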
+ +# Sources +- [MapReduce: Simplified Data Processing on Large Clusters](https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf) +- [MIT 6.824 - Lecture 1](https://www.youtube.com/watch?v=cQP8WApzIQQ&list=PLrw6a1wE39_tb2fErI4-WkMbsvGQk9_UB&index=1&pp=iAQB) diff --git a/Distributed Systems/Replication.md b/Distributed Systems/Replication.md new file mode 100644 index 0000000..0f511ad --- /dev/null +++ b/Distributed Systems/Replication.md @@ -0,0 +1,35 @@ +# Explanation +- *Replication*: creating duplicate copies of data or services across multiple nodes. +- Replication can tolerate **fail-stop** failures of a single replica. Examples: + - Fan stops working, CPU overheats and shuts itself down, + - Someone trips over replica's power cord or network cable, or + - Software notices it is out of disk space and stops. +- It can't tolerate defects in hardware or bugs in software or human configuration errors because they're often not fail-stop and may be correlated (i.e. cause all replicas to crash at the same time). +- It can tolerate catastrophes (earthquake or city-wide power failure) only if the replicas are physically separated. + +## Approaches + +### State Transfer +- *State Transfer*: primary replica executes the service and sends *new* state to backups. +- It's simpler but the state may be large, which means it's slow to transfer over the network. + +### Replicated State Machine +- *Replicated State Machine*: primary receives operations from clients and sends them to all other replicas. + - All replicas execute all operations. + - If all the replicas start at the same state, execute the same operations in the same order, and they're deterministic, then all replicas will arrive at the same end state. +- Often generates less network traffic because operations are often small compared to state but it's complex to get right. + +## Replication Levels +- **Application State**: e.g., database tables. ^518c94 + - Can be efficient; primary only sends high-level operations to backup. + - The downside is application code (server) must understand fault tolerance, to e.g. forward operations stream. + - [[The Google File System|GFS]] works this way. +- **Machine level**: e.g. registers and RAM content + - Might allow us to replicate any existing server without modifications. + - Downsides: + - requires forwarding of machine events (interrupts, DMA, &c), and + - requires "machine" modifications to send/receive event stream. + - [[Fault-tolerant Virtual Machines|VM-FT]] works this way. + +# Sources +- [MIT 6.824 - Lecture 4](https://www.youtube.com/watch?v=M_teob23ZzY) diff --git a/Distributed Systems/The Google File System.md b/Distributed Systems/The Google File System.md new file mode 100644 index 0000000..7e061d4 --- /dev/null +++ b/Distributed Systems/The Google File System.md @@ -0,0 +1,105 @@ +# Explanation + +## Abstract +- The Google File System, a scalable distributed file system for large distributed data-intensive applications. +- It provides fault tolerance while running on inexpensive commodity hardware, and it delivers high aggregate performance to a large number of clients. + +## Introduction +- GFS shares many of the same goals as previous distributed file systems such as performance, scalability, reliability, and availability. 
+- However, its design has been driven by key observations of Google's application workloads and technological environment, both current and anticipated, that reflect a marked departure from some earlier file system design assumptions. + +### Assumptions +- **Component failures are the norm rather than the exception** + - Problems can be caused by application bugs, operating system bugs, human errors, and the failures of disks, memory, connectors, networking, and power supplies. + - Constant monitoring, error detection, fault tolerance, and automatic recovery must be integral to the system. +- **Files are huge by traditional standards. Multi-GB files are common** + - As a result, design assumptions and parameters such as I/O operation and block sizes have to be revisited. +- **Most files are mutated by appending new data rather than overwriting existing data** + - Random writes within a file are practically non-existent. + - Once written, the files are only read, and often only sequentially. +- **Co-designing the applications and the file system API benefits the overall system by increasing our flexibility** + - For example, GFS has a relaxed consistency model to vastly simplify the file system without imposing an onerous burden on the applications. + - Also, atomic append operation was introduced so that multiple clients can append concurrently to a file without extra synchronization between them. + +## Design Overview + +### Goals +- **The system is built from many inexpensive commodity components that often fail.** + - It must constantly monitor itself and detect, tolerate, and recover promptly from component failures on a routine basis. +- **The system stores a modest number of large files.** + - We expect a few million files, each typically 100 MB or larger in size. Multi-GB files are the common case and should be managed efficiently. + - Small files must be supported, but we need not optimize for them. +- **The workloads primarily consist of two kinds of reads: large streaming reads and small random reads**. +- **The workloads also have many large, sequential writes that append data to files**. + - Once written, files are seldom modified again. Small writes at arbitrary positions in a file are supported but do not have to be efficient. +- **The system must efficiently implement well-defined semantics for multiple clients that concurrently append to the same file**. + - Our files are often used as producer-consumer queues or for many-way merging. + - Hundreds of producers, running one per machine, will concurrently append to a file. Atomicity with minimal synchronization overhead is essential. + - The file may be read later, or a consumer may be reading through the file simultaneously. +- **High sustained bandwidth is more important than low latency**. + - Most of our target applications place a premium on processing data in bulk at a high rate, while few have stringent response time requirements for an individual read or write. + +### Interface +- GFS provides a familiar file system interface, though it does not implement a standard API such as POSIX. +- Files are organized hierarchically in directories and identified by pathnames. +- We support the usual operations to create, delete, open, close, read, and write files. +- Moreover, has two additional operations: + - **Snapshot**: creates a copy of a file or a directory tree at low cost. 
+ - **Record append**: allows multiple clients to append data to the same file concurrently while guaranteeing the atomicity of each individual client’s append. + +### Architecture +- Illustration: ![[GFS Architecture.png]] +- A GFS cluster consists of a single master and multiple chunkservers and is accessed by multiple clients. + - Each of these is typically a commodity Linux machine running a user-level server process. +- Files are divided into fixed-size chunks. + - Each chunk is identified by an immutable and globally unique 64 bit chunk handle assigned by the master at the time of chunk creation. +- Chunkservers store chunks on local disks as Linux files and read or write chunk data specified by a chunk handle and byte range. + - For reliability, each chunk is replicated on multiple chunkservers. + - By default, we store three replicas, though users can designate different replication levels for different regions of the file namespace. +- The master maintains all file system metadata including: the namespace, access control information, the mapping from files to chunks, and the current locations of chunks. + - It also controls system-wide activities such as chunk lease management, garbage collection of orphaned chunks, and chunk migration between chunkservers. +- The master periodically communicates with each chunkserver in HeartBeat messages to give it instructions and collect its state. +- GFS client code linked into each application implements the file system API and communicates with the master and chunkservers to read or write data on behalf of the application. + - Clients interact with the master for metadata operations, but all data-bearing communication goes directly to the chunkservers. +- Neither the client nor the chunkserver caches file data. + - Client caches offer little benefit because most applications stream through huge files or have working sets too large to be cached. + - Not having client caches simplifies the client and the overall system by eliminating cache coherence issues. + - Clients do cache metadata, however. + - Chunkservers need not cache file data because chunks are stored as local files and so Linux’s buffer cache already keeps frequently accessed data in memory. + +### Single Master +- Having a single master vastly simplifies our design and enables the master to make sophisticated chunk placement and replication decisions using global knowledge. +- Clients never read and write file data through the master. Instead, a client asks the master which chunkservers it should contact. + - It caches this information for a limited time and interacts with the chunkservers directly for many subsequent operations. + +### Metadata +- The master stores three major types of metadata: the file and chunk namespaces, the mapping from files to chunks, and the locations of each chunk’s replicas. +- All metadata is kept in the master’s memory. +- Namespaces and file-to-chunk mapping are also kept persistent by logging mutations to an operation log stored on the master’s local disk and replicated on remote machines. + - Using a log allows us to update the master state simply, reliably, and without risking inconsistencies in the event of a master crash. +- The master does not store chunk location information persistently. Instead, it asks each chunkserver about its chunks at master startup and whenever a chunkserver joins the cluster. + +#### In-Memory Data Structures +- Since metadata is stored in memory, master operations are fast. 
+ - Therefore, it is easy and efficient for the master to periodically scan through its entire state in the background. + - This periodic scanning is used to implement chunk garbage collection, re-replication in the presence of chunkserver failures, and chunk migration to balance load and disk space usage across chunkservers. +- One potential concern for this memory-only approach is that the number of chunks and hence the capacity of the whole system is limited by how much memory the master has. + - This is not a serious limitation in practice because the master maintains less than 64 bytes of metadata for each 64 MB chunk. + +#### Chunk Locations +- The master does not keep a persistent record of which chunkservers have a replica of a given chunk. + - It simply polls chunkservers for that information at startup. + - The master can keep itself up-to-date thereafter because it controls all chunk placement and monitors chunkserver status with regular HeartBeat messages. +- This eliminates the problem of keeping the master and chunkservers in sync as chunkservers join and leave the cluster, change names, fail, restart, and so on. + +#### Operation Log +- The operation log contains a historical record of critical metadata changes. +- It is central to GFS. Not only is it the only persistent record of metadata, but it also serves as a logical time line that defines the order of concurrent operations. +- Files and chunks, as well as their versions, are all uniquely and eternally identified by the logical times at which they were created. + +## System Interactions +// TODO + +# Sources +- [The Google File System](http://nil.csail.mit.edu/6.824/2020/papers/gfs.pdf) +- [MIT 6.824 - Lecture 3](https://www.youtube.com/watch?v=EpIgvowZr00&list=PLrw6a1wE39_tb2fErI4-WkMbsvGQk9_UB&index=3)