Skip to content

Commit 02f535d

Browse files
authored
AMQP 1.0 (#91)
Completely rewrote message broker comm layer, removed support for RabbitMQ AMQP 0.9.1, added support for AMQP 1.0 - tested with RabbitMQ 4.2.0, AciveMQ Classic 6.1.8, ActiveMQ Artemis 2.44.0. Fixed a few bugs in/around message handler. Improvements in TF deployment. Removed k8s POC and Amazon Keyspaces deployment, they are now in https://github.com/capillariesio/capillaries-experimental. Added experimental CapiMQ message broker.
1 parent 3d6861a commit 02f535d

File tree

171 files changed

+6483
-4547
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

171 files changed

+6483
-4547
lines changed

.github/workflows/go.yml

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,19 @@ jobs:
5353
exit 1
5454
fi
5555
56+
- name: pkg/capimq_message_broker test coverage threshold check
57+
env:
58+
TESTCOVERAGE_THRESHOLD: 82.4
59+
run: |
60+
go test -v ./pkg/capimq_message_broker/... -coverprofile coverage.out -covermode count
61+
totalCoverage=`go tool cover -func=coverage.out | grep total | grep -Eo '[0-9]+\.[0-9]+'`
62+
if (( $(echo "$totalCoverage $TESTCOVERAGE_THRESHOLD" | awk '{print ($1 >= $2)}') )); then
63+
echo -e "\033[32mOK: $totalCoverage >= $TESTCOVERAGE_THRESHOLD\033[0m"
64+
else
65+
echo -e "\033[31mFAILED: $totalCoverage < $TESTCOVERAGE_THRESHOLD. Cover more with unit tests or adjust threshold to a lower value.\033[0m"
66+
exit 1
67+
fi
68+
5669
- name: pkg/cql test coverage threshold check
5770
env:
5871
TESTCOVERAGE_THRESHOLD: 88.1
@@ -81,7 +94,7 @@ jobs:
8194
8295
- name: pkg/custom/tag_and_denormalize test coverage threshold check
8396
env:
84-
TESTCOVERAGE_THRESHOLD: 76.1
97+
TESTCOVERAGE_THRESHOLD: 78.5
8598
run: |
8699
go test -v ./pkg/custom/tag_and_denormalize/... -coverprofile coverage.out -covermode count
87100
totalCoverage=`go tool cover -func=coverage.out | grep total | grep -Eo '[0-9]+\.[0-9]+'`
@@ -94,7 +107,7 @@ jobs:
94107
95108
- name: pkg/dpc test coverage threshold check
96109
env:
97-
TESTCOVERAGE_THRESHOLD: 85.7
110+
TESTCOVERAGE_THRESHOLD: 80.0
98111
run: |
99112
go test -v ./pkg/dpc/... -coverprofile coverage.out -covermode count
100113
totalCoverage=`go tool cover -func=coverage.out | grep total | grep -Eo '[0-9]+\.[0-9]+'`
@@ -120,7 +133,7 @@ jobs:
120133
121134
- name: pkg/sc test coverage threshold check
122135
env:
123-
TESTCOVERAGE_THRESHOLD: 89.6
136+
TESTCOVERAGE_THRESHOLD: 89.5
124137
run: |
125138
go test -v ./pkg/sc/... -coverprofile coverage.out -covermode count
126139
totalCoverage=`go tool cover -func=coverage.out | grep total | grep -Eo '[0-9]+\.[0-9]+'`

.golangci.yml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,4 +133,12 @@ linters:
133133
linters: [ revive ]
134134
- path: pkg\/proc\/proc_file_creator\.go
135135
text: "unchecked-type-assertion: type cast result is unchecked in [^ ]+FileRecordHeapItem"
136-
linters: [ revive ]
136+
linters: [ revive ]
137+
- path: pkg\/api\/message_handler\.go
138+
text: function ProcessDataBatchMsg has cyclomatic complexity
139+
linters: [ revive ]
140+
# - path: .+\.go
141+
# text: "unused-parameter: parameter '_"
142+
# linters: [ revive ]
143+
144+

.vscode/launch.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,16 @@
107107
"program": "${workspaceFolder}/pkg/exe/webapi/capiwebapi.go",
108108
"args": []
109109
},
110+
{
111+
"name": "CapiMqBroker",
112+
"type": "go",
113+
"request": "launch",
114+
"mode": "debug",
115+
"envFile": "${env:HOME}/vscode-daemon.env",
116+
"cwd":"${workspaceFolder}/pkg/exe/mq",
117+
"program": "${workspaceFolder}/pkg/exe/mq/capimq.go",
118+
"args": []
119+
},
110120
{
111121
"name": "capiparquet diff",
112122
"type": "go",

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ This is a GitHub readme. More details and a blog at https://capillaries.io.
1818
| Data filtering | SQL queries, custom code | [Go expressions](doc/glossary.md#go-expressions) (scalability, maintainability) |
1919
| Data transform | SQL expressions, custom code | [Go expressions](doc/glossary.md#go-expressions), Python [formulas](doc/glossary.md#py_calc-processor) (parallel execution, maintainability) |
2020
| Intermediate data storage | Files, relational databases | on-the-fly-created Cassandra [keyspaces](doc/glossary.md#keyspace) and [tables](doc/glossary.md#table) (scalability, maintainability) |
21-
| Workflow execution | Shell scripts, custom code, workflow frameworks | RabbitMQ as scheduler, workflow status stored in Cassandra (parallel execution, fault tolerance, incremental computing) |
21+
| Workflow execution | Shell scripts, custom code, workflow frameworks | Message queue as scheduler, workflow status stored in Cassandra (parallel execution, fault tolerance, incremental computing) |
2222
| Workflow monitoring and interaction | Custom solutions | Capillaries [UI](ui/README.md), [Toolbelt](doc/glossary.md#toolbelt) utility, [API](doc/api.md), [Web API](doc/glossary.md#webapi) (transparency, operator validation support) |
2323
| Workflow management | Shell scripts, custom code | Capillaries configuration: [script file](doc/glossary.md#script) with [DAG](doc/glossary.md#dag), Python [formulas](doc/glossary.md#py_calc-processor) |
2424

@@ -65,7 +65,7 @@ Log messages generated by:
6565
- Capillaries [Daemon](./doc/glossary.md#daemon)
6666
- Capillaries [WebAPI](./doc/glossary.md#webapi)
6767
- Capillaries [UI](./doc/glossary.md#capillaries-ui)
68-
- RabbitMQ
68+
- Message queue
6969
- Cassandra with Prometheus jmx-exporter
7070
- Prometheus
7171
are collected by fluentd and saved in /tmp/capi_log.
@@ -91,11 +91,11 @@ For more details about getting started, see [Getting started](doc/started.md).
9191

9292
#### Container-based deployments
9393

94-
Capillaries binaries are intended to be container-friendly. Check out the `docker-compose.yml` and [Kubernetes deployment POC](./deploy/k8s/README.md), these test projects may be a good starting point for creating your full-scale container-based deployment.
94+
Capillaries binaries are intended to be container-friendly. Check out the `docker-compose.yml`, it may be a good starting point for creating your full-scale container-based deployment.
9595

9696
#### VM-based deployment
9797

98-
See [Terraform script](./deploy/tf/cassandra_cluster/README.md) that creates Capillaries deployment in AWS.
98+
See [Terraform script](./deploy/README.md) that creates Capillaries deployment in AWS.
9999

100100
## Capillaries in depth
101101

binaries_build.sh

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
#!/bin/bash
22

3-
DIR_BUILD_LINUX_AMD64=./build/linux/amd64
4-
DIR_BUILD_LINUX_ARM64=./build/linux/arm64
3+
DIR_BUILD_LINUX=./build/linux
4+
DIR_BUILD_LINUX_AMD64=$DIR_BUILD_LINUX/amd64
5+
DIR_BUILD_LINUX_ARM64=$DIR_BUILD_LINUX/arm64
56
DIR_BUILD_CA=./build/ca
67
DIR_BUILD_WEBUI=./build/webui
78

8-
rm -fR ./build
9+
rm -fR $DIR_BUILD_CA
10+
rm -fR $DIR_BUILD_LINUX
11+
rm -fR $DIR_BUILD_WEBUI
12+
913
mkdir -p $DIR_BUILD_LINUX_AMD64
1014
mkdir -p $DIR_BUILD_LINUX_ARM64
1115
mkdir -p $DIR_BUILD_CA
@@ -45,6 +49,12 @@ gzip -f -k $DIR_BUILD_LINUX_AMD64/capitoolbelt
4549
GOOS=linux GOARCH=amd64 go build -o $DIR_BUILD_LINUX_AMD64/capiparquet -ldflags="-s -w -X main.version=$(git describe --tags --always)" ./test/code/parquet/capiparquet.go
4650
gzip -f -k $DIR_BUILD_LINUX_AMD64/capiparquet
4751

52+
GOOS=linux GOARCH=arm64 go build -o $DIR_BUILD_LINUX_AMD64/capimq -ldflags="-s -w -X main.version=$(git describe --tags --always)" ./pkg/exe/mq/capimq.go
53+
cp ./pkg/exe/mq/capimq.json $DIR_BUILD_LINUX_AMD64
54+
gzip -f -k $DIR_BUILD_LINUX_AMD64/capimq
55+
56+
57+
4858
echo "Building "$DIR_BUILD_LINUX_ARM64
4959

5060
GOOS=linux GOARCH=arm64 go build -o $DIR_BUILD_LINUX_ARM64/capidaemon -ldflags="-s -w -X main.version=$(git describe --tags --always)" ./pkg/exe/daemon/capidaemon.go
@@ -61,3 +71,7 @@ gzip -f -k $DIR_BUILD_LINUX_ARM64/capitoolbelt
6171

6272
GOOS=linux GOARCH=arm64 go build -o $DIR_BUILD_LINUX_ARM64/capiparquet -ldflags="-s -w -X main.version=$(git describe --tags --always)" ./test/code/parquet/capiparquet.go
6373
gzip -f -k $DIR_BUILD_LINUX_ARM64/capiparquet
74+
75+
GOOS=linux GOARCH=arm64 go build -o $DIR_BUILD_LINUX_ARM64/capimq -ldflags="-s -w -X main.version=$(git describe --tags --always)" ./pkg/exe/mq/capimq.go
76+
cp ./pkg/exe/mq/capimq.json $DIR_BUILD_LINUX_ARM64
77+
gzip -f -k $DIR_BUILD_LINUX_ARM64/capimq

binaries_upload.sh

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,6 @@ if [ "$1" = "" ]; then
4343
fi
4444

4545
echo "Copying in files to "$1
46-
aws s3 cp ./build/ $1/ --recursive --include "*"
46+
aws s3 cp ./build/ca $1/ca/ --recursive --include "*"
47+
aws s3 cp ./build/linux $1/linux/ --recursive --include "*"
48+
aws s3 cp ./build/webui $1/webui/ --recursive --include "*"
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ TF variables: `ssh_keypair_name`
2020

2121
## S3 bucket
2222

23-
Make sure you have an S3 bucket as explained [Capillaries S3 doc](../../../doc/s3.md). Most likely, you will need to set up a user [UserAccessCapillariesTestbucket](../../../doc/s3.md#iam-user-useraccesscapillariestestbucket), otherwise you will not be able to upload source data and configuration files to the bucket from your devops machine. Make sure this user has full access to the bucket as the doc says.
23+
Make sure you have an S3 bucket as explained [Capillaries S3 doc](../doc/s3.md). Most likely, you will need to set up a user [UserAccessCapillariesTestbucket](../doc/s3.md#iam-user-useraccesscapillariestestbucket), otherwise you will not be able to upload source data and configuration files to the bucket from your devops machine. Make sure this user has full access to the bucket as the doc says.
2424

2525
EC2 instances of this deployment do NOT need UserAccessCapillariesTestbucket credentials, the instances use AssumeRole mechanism to access S3 bucket.
2626

27-
This example writes [Webapi](../../../doc/glossary.md#webapi) and [Daemon](../../../doc/glossary.md#daemon) log files to the `log` directory in your bucket.
27+
This example writes [Webapi](../doc/glossary.md#webapi) and [Daemon](../doc/glossary.md#daemon) log files to the `log` directory in your bucket.
2828

2929
TF variables: `s3_log_url`
3030

@@ -94,7 +94,7 @@ export CAPILLARIES_AWS_TESTBUCKET=capillaries-testbucket
9494
export BASTION_IP=your_bastion_ip_address
9595
export EXTERNAL_WEBAPI_PORT=6544
9696
97-
# Remember UserAccessCapillariesTestbucket introduced in ../../../doc/s3.md ?
97+
# Remember UserAccessCapillariesTestbucket introduced in ../doc/s3.md ?
9898
source ~/UserAccessCapillariesTestbucket.rc
9999
100100
# Full test cycle

deploy/tf/cassandra_cluster/bastion.sh.tpl renamed to deploy/bastion.sh.tpl

File renamed without changes.

deploy/bastion.tf

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
resource "aws_network_interface" "bastion_internal_ip" {
2+
subnet_id = aws_subnet.public_subnet.id
3+
private_ips = [var.internal_bastion_ip]
4+
security_groups = [aws_security_group.capillaries_securitygroup_bastion.id]
5+
6+
tags = {
7+
Name = "capillaries_bastion_internal_ip"
8+
}
9+
}
10+
11+
resource "aws_eip_association" "bastion_public_ip_association" {
12+
instance_id = aws_instance.bastion.id
13+
allocation_id = aws_eip.bastion_public_ip.id
14+
}
15+
16+
# Make sure it matched the list of expected variables in bastion.sh (bastion_provisioner_vars add a bit too)
17+
locals {
18+
bastion_provisioner_static_vars = "WEBAPI_GOMEMLIMIT_GB=${local.webapi_gomemlimit_gb} WEBAPI_GOGC=${var.webapi_gogc} AWSREGION=${var.awsregion} SSH_USER=${var.ssh_user} OS_ARCH=${local.os_arch} CAPILLARIES_RELEASE_URL=${var.capillaries_release_url} EXTERNAL_WEBAPI_PORT=${var.external_webapi_port} INTERNAL_WEBAPI_PORT=${var.internal_webapi_port} EXTERNAL_RABBITMQ_CONSOLE_PORT=${var.external_rabbitmq_console_port} EXTERNAL_ACTIVEMQ_CONSOLE_PORT=${var.external_activemq_console_port} EXTERNAL_PROMETHEUS_CONSOLE_PORT=${var.external_prometheus_console_port} BASTION_ALLOWED_IPS=${var.BASTION_ALLOWED_IPS} S3_LOG_URL=${var.s3_log_url} CASSANDRA_HOSTS=${local.cassandra_hosts} CASSANDRA_PORT=${var.cassandra_port} CASSANDRA_USERNAME=${var.cassandra_username} CASSANDRA_PASSWORD=${var.cassandra_password} MQ_TYPE=${var.mq_type} CAPIMQ_CLIENT_URL=${local.capimq_url} AMQP10_URL=${local.activemq_url} AMQP10_ADDRESS=${var.amqp10_flavor_address_map[var.amqp10_server_flavor]} AMQP10_USER_NAME=${var.amqp10_user_name} AMQP10_USER_PASS=${var.amqp10_user_pass} AMQP10_ADMIN_NAME=${var.amqp10_admin_name} AMQP10_ADMIN_PASS=${var.amqp10_admin_pass} AMQP10_SERVER_FLAVOR=${var.amqp10_server_flavor} PROMETHEUS_NODE_EXPORTER_FILENAME=${local.prometheus_node_exporter_filename} PROMETHEUS_SERVER_FILENAME=${local.prometheus_server_filename} PROMETHEUS_NODE_TARGETS=${local.prometheus_node_targets} PROMETHEUS_JMX_TARGETS=${local.prometheus_jmx_targets} PROMETHEUS_GO_TARGETS=${local.prometheus_go_targets} RABBITMQ_ERLANG_FILENAME=${local.rabbitmq_erlang_filename} RABBITMQ_SERVER_FILENAME=${local.rabbitmq_server_filename} ACTIVEMQ_CLASSIC_SERVER_FILENAME=${local.activemq_classic_server_filename} ACTIVEMQ_ARTEMIS_SERVER_FILENAME=${local.activemq_artemis_server_filename} INTERNAL_CAPIMQ_BROKER_PORT=${var.internal_capimq_broker_port} EXTERNAL_CAPIMQ_BROKER_PORT=${var.external_capimq_broker_port} CAPIMQ_BROKER_MAX_MESSAGES=${var.capimq_broker_max_messages} CAPIMQ_BROKER_RETURNED_DELIVERY_DELAY=${var.capimq_broker_returned_delivery_delay} CAPIMQ_BROKER_DEAD_AFTER_NO_HEARTBEAT_TIMEOUT=${var.capimq_broker_dead_after_no_heartbeat_timeout} WEBAPI_PROMETHEUS_EXPORTER_PORT=${var.webapi_prometheus_exporter_port} CAPIMQ_PROMETHEUS_EXPORTER_PORT=${var.capimq_prometheus_exporter_port}"
19+
}
20+
21+
resource "aws_instance" "bastion" {
22+
instance_type = var.bastion_instance_type
23+
ami = var.bastion_ami_name
24+
key_name = var.ssh_keypair_name
25+
26+
network_interface {
27+
network_interface_id = aws_network_interface.bastion_internal_ip.id
28+
device_index = 0
29+
}
30+
31+
# Bastion needs to assume this role to access S3 bucket to get cloud-init bastion.sh
32+
iam_instance_profile = aws_iam_instance_profile.capillaries_instance_profile.name
33+
34+
user_data = templatefile("./bastion.sh.tpl", {
35+
os_arch = local.os_arch
36+
ssh_user = var.ssh_user
37+
capillaries_instance_profile = aws_iam_instance_profile.capillaries_instance_profile.name
38+
# Used in bastion.sh.tpl
39+
bastion_provisioner_vars = join(" ", [local.bastion_provisioner_static_vars], ["BASTION_EXTERNAL_IP_ADDRESS=${aws_eip.bastion_public_ip.public_ip}"])
40+
capillaries_tf_deploy_temp_bucket_name = var.capillaries_tf_deploy_temp_bucket_name
41+
})
42+
43+
tags = {
44+
Name = "capillaries_bastion"
45+
}
46+
}

deploy/tf/cassandra_cluster/cassandra.sh.tpl renamed to deploy/cassandra.sh.tpl

File renamed without changes.

0 commit comments

Comments
 (0)