Skip to content

Commit 89c51cc

Browse files
authored
Expose cassandra consistency and serial consistency settings in configuration (#533)
This change enables an end-user to configure consistency and serial consistency for their Cassandra Datastores. The behavior is as follows: If the user chooses not to set this at all, we default to local_quorum and local_serial respectively. Otherwise, the user can choose to set a "default" that applies to ALL stores. We expect the string values as understood by the gocql library and will validate this on server startup.
1 parent 7ffa04c commit 89c51cc

15 files changed

+381
-35
lines changed

common/payloads/payloads.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func Decode(ps *commonpb.Payloads, valuePtr ...interface{}) error {
6565
func ToString(ps *commonpb.Payloads) string {
6666
str, err := toString(ps)
6767
if err != nil {
68-
return fmt.Sprintf("Unable to decode heartbeat details. %+v", err)
68+
return fmt.Sprintf("Unable to decode payloads. %+v", err)
6969
}
7070
return str
7171
}

common/persistence/cassandra/cassandraClusterMetadata.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"strings"
3030
"time"
3131

32-
"github.com/gocql/gocql"
3332
"github.com/pborman/uuid"
3433
"go.temporal.io/api/serviceerror"
3534

@@ -85,12 +84,12 @@ type (
8584

8685
var _ p.ClusterMetadataStore = (*cassandraClusterMetadata)(nil)
8786

88-
// newMetadataPersistenceV2 is used to create an instance of HistoryManager implementation
87+
// newClusterMetadataInstance is used to create an instance of ClusterMetadataStore implementation
8988
func newClusterMetadataInstance(cfg config.Cassandra, logger log.Logger) (p.ClusterMetadataStore, error) {
9089
cluster := cassandra.NewCassandraCluster(cfg)
9190
cluster.ProtoVersion = cassandraProtoVersion
92-
cluster.Consistency = gocql.LocalQuorum
93-
cluster.SerialConsistency = gocql.LocalSerial
91+
cluster.Consistency = cfg.Consistency.GetConsistency()
92+
cluster.SerialConsistency = cfg.Consistency.GetSerialConsistency()
9493
cluster.Timeout = defaultSessionTimeout
9594

9695
session, err := cluster.CreateSession()

common/persistence/cassandra/cassandraHistoryPersistence.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,16 +79,16 @@ func NewHistoryV2PersistenceFromSession(
7979
return &cassandraHistoryV2Persistence{cassandraStore: cassandraStore{session: session, logger: logger}}
8080
}
8181

82-
// newHistoryPersistence is used to create an instance of HistoryManager implementation
82+
// newHistoryV2Persistence is used to create an instance of HistoryManager implementation
8383
func newHistoryV2Persistence(
8484
cfg config.Cassandra,
8585
logger log.Logger,
8686
) (p.HistoryStore, error) {
8787

8888
cluster := cassandra.NewCassandraCluster(cfg)
8989
cluster.ProtoVersion = cassandraProtoVersion
90-
cluster.Consistency = gocql.LocalQuorum
91-
cluster.SerialConsistency = gocql.LocalSerial
90+
cluster.Consistency = cfg.Consistency.GetConsistency()
91+
cluster.SerialConsistency = cfg.Consistency.GetSerialConsistency()
9292
cluster.Timeout = defaultSessionTimeout
9393
session, err := cluster.CreateSession()
9494
if err != nil {

common/persistence/cassandra/cassandraMetadataPersistenceV2.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,12 @@ type (
9999
}
100100
)
101101

102-
// newMetadataPersistenceV2 is used to create an instance of HistoryManager implementation
102+
// newMetadataPersistenceV2 is used to create an instance of the Namespace MetadataStore implementation
103103
func newMetadataPersistenceV2(cfg config.Cassandra, currentClusterName string, logger log.Logger) (p.MetadataStore, error) {
104104
cluster := cassandra.NewCassandraCluster(cfg)
105105
cluster.ProtoVersion = cassandraProtoVersion
106-
cluster.Consistency = gocql.LocalQuorum
107-
cluster.SerialConsistency = gocql.LocalSerial
106+
cluster.Consistency = cfg.Consistency.GetConsistency()
107+
cluster.SerialConsistency = cfg.Consistency.GetSerialConsistency()
108108
cluster.Timeout = defaultSessionTimeout
109109

110110
session, err := cluster.CreateSession()

common/persistence/cassandra/cassandraPersistence.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -726,8 +726,8 @@ var _ p.ExecutionStore = (*cassandraPersistence)(nil)
726726
func newShardPersistence(cfg config.Cassandra, clusterName string, logger log.Logger) (p.ShardStore, error) {
727727
cluster := cassandra.NewCassandraCluster(cfg)
728728
cluster.ProtoVersion = cassandraProtoVersion
729-
cluster.Consistency = gocql.LocalQuorum
730-
cluster.SerialConsistency = gocql.LocalSerial
729+
cluster.Consistency = cfg.Consistency.GetConsistency()
730+
cluster.SerialConsistency = cfg.Consistency.GetSerialConsistency()
731731
cluster.Timeout = defaultSessionTimeout
732732

733733
session, err := cluster.CreateSession()
@@ -755,8 +755,8 @@ func NewWorkflowExecutionPersistence(
755755
func newTaskPersistence(cfg config.Cassandra, logger log.Logger) (p.TaskStore, error) {
756756
cluster := cassandra.NewCassandraCluster(cfg)
757757
cluster.ProtoVersion = cassandraProtoVersion
758-
cluster.Consistency = gocql.LocalQuorum
759-
cluster.SerialConsistency = gocql.LocalSerial
758+
cluster.Consistency = cfg.Consistency.GetConsistency()
759+
cluster.SerialConsistency = cfg.Consistency.GetSerialConsistency()
760760
cluster.Timeout = defaultSessionTimeout
761761
session, err := cluster.CreateSession()
762762
if err != nil {
@@ -1970,7 +1970,7 @@ func (d *cassandraPersistence) GetTransferTask(request *p.GetTransferTaskRequest
19701970

19711971
func (d *cassandraPersistence) GetTransferTasks(request *p.GetTransferTasksRequest) (*p.GetTransferTasksResponse, error) {
19721972

1973-
// Reading transfer tasks need to be quorum level consistent, otherwise we could loose task
1973+
// Reading transfer tasks need to be quorum level consistent, otherwise we could lose task
19741974
query := d.session.Query(templateGetTransferTasksQuery,
19751975
d.shardID,
19761976
rowTypeTransferTask,
@@ -2041,7 +2041,7 @@ func (d *cassandraPersistence) GetReplicationTasks(
20412041
request *p.GetReplicationTasksRequest,
20422042
) (*p.GetReplicationTasksResponse, error) {
20432043

2044-
// Reading replication tasks need to be quorum level consistent, otherwise we could loose task
2044+
// Reading replication tasks need to be quorum level consistent, otherwise we could lose task
20452045
query := d.session.Query(templateGetReplicationTasksQuery,
20462046
d.shardID,
20472047
rowTypeReplicationTask,
@@ -2540,7 +2540,7 @@ func (d *cassandraPersistence) GetTasks(request *p.GetTasksRequest) (*p.GetTasks
25402540
return &p.GetTasksResponse{}, nil
25412541
}
25422542

2543-
// Reading taskqueue tasks need to be quorum level consistent, otherwise we could loose task
2543+
// Reading taskqueue tasks need to be quorum level consistent, otherwise we could lose tasks
25442544
query := d.session.Query(templateGetTasksQuery,
25452545
request.NamespaceID,
25462546
request.TaskQueue,
@@ -2671,7 +2671,7 @@ func (d *cassandraPersistence) GetTimerTask(request *p.GetTimerTaskRequest) (*p.
26712671

26722672
func (d *cassandraPersistence) GetTimerIndexTasks(request *p.GetTimerIndexTasksRequest) (*p.GetTimerIndexTasksResponse,
26732673
error) {
2674-
// Reading timer tasks need to be quorum level consistent, otherwise we could loose task
2674+
// Reading timer tasks need to be quorum level consistent, otherwise we could lose tasks
26752675
minTimestamp := p.UnixNanoToDBTimestamp(request.MinTimestamp.UnixNano())
26762676
maxTimestamp := p.UnixNanoToDBTimestamp(request.MaxTimestamp.UnixNano())
26772677
query := d.session.Query(templateGetTimerTasksQuery,
@@ -2743,7 +2743,7 @@ func (d *cassandraPersistence) PutReplicationTaskToDLQ(request *p.PutReplication
27432743
func (d *cassandraPersistence) GetReplicationTasksFromDLQ(
27442744
request *p.GetReplicationTasksFromDLQRequest,
27452745
) (*p.GetReplicationTasksFromDLQResponse, error) {
2746-
// Reading replication tasks need to be quorum level consistent, otherwise we could loose task
2746+
// Reading replication tasks need to be quorum level consistent, otherwise we could lose tasks
27472747
query := d.session.Query(templateGetReplicationTasksQuery,
27482748
d.shardID,
27492749
rowTypeDLQ,

common/persistence/cassandra/cassandraQueue.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ func newQueue(
8080
) (persistence.Queue, error) {
8181
cluster := cassandra.NewCassandraCluster(cfg)
8282
cluster.ProtoVersion = cassandraProtoVersion
83-
cluster.Consistency = gocql.LocalQuorum
84-
cluster.SerialConsistency = gocql.LocalSerial
83+
cluster.Consistency = cfg.Consistency.GetConsistency()
84+
cluster.SerialConsistency = cfg.Consistency.GetSerialConsistency()
8585
cluster.Timeout = defaultSessionTimeout
8686

8787
session, err := cluster.CreateSession()
@@ -200,7 +200,7 @@ func (q *cassandraQueue) ReadMessages(
200200
lastMessageID int64,
201201
maxCount int,
202202
) ([]*persistence.QueueMessage, error) {
203-
// Reading replication tasks need to be quorum level consistent, otherwise we could loose task
203+
// Reading replication tasks need to be quorum level consistent, otherwise we could lose tasks
204204
query := q.session.Query(templateGetMessagesQuery,
205205
q.queueType,
206206
lastMessageID,
@@ -234,7 +234,7 @@ func (q *cassandraQueue) ReadMessagesFromDLQ(
234234
pageSize int,
235235
pageToken []byte,
236236
) ([]*persistence.QueueMessage, []byte, error) {
237-
// Reading replication tasks need to be quorum level consistent, otherwise we could loose task
237+
// Reading replication tasks need to be quorum level consistent, otherwise we could lose tasks
238238
// Use negative queue type as the dlq type
239239
query := q.session.Query(templateGetMessagesFromDLQQuery,
240240
q.getDLQTypeFromQueueType(),

common/persistence/cassandra/cassandraVisibilityPersistence.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,8 @@ type (
152152
func newVisibilityPersistence(cfg config.Cassandra, logger log.Logger) (p.VisibilityStore, error) {
153153
cluster := cassandra.NewCassandraCluster(cfg)
154154
cluster.ProtoVersion = cassandraProtoVersion
155-
cluster.Consistency = gocql.LocalQuorum
156-
cluster.SerialConsistency = gocql.LocalSerial
155+
cluster.Consistency = cfg.Consistency.GetConsistency()
156+
cluster.SerialConsistency = cfg.Consistency.GetSerialConsistency()
157157
cluster.Timeout = defaultSessionTimeout
158158

159159
session, err := cluster.CreateSession()

common/persistence/cassandra/cassandraVisibilityPersistenceV2.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ type (
8181
func NewVisibilityPersistenceV2(persistence p.VisibilityStore, cfg *config.Cassandra, logger log.Logger) (p.VisibilityStore, error) {
8282
cluster := cassandra.NewCassandraCluster(*cfg)
8383
cluster.ProtoVersion = cassandraProtoVersion
84-
cluster.Consistency = gocql.LocalQuorum
85-
cluster.SerialConsistency = gocql.LocalSerial
84+
cluster.Consistency = cfg.Consistency.GetConsistency()
85+
cluster.SerialConsistency = cfg.Consistency.GetSerialConsistency()
8686
cluster.Timeout = defaultSessionTimeout
8787

8888
session, err := cluster.CreateSession()

common/persistence/cassandra/factory.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,8 @@ func (f *Factory) executionStoreFactory() (*executionStoreFactory, error) {
138138
func newExecutionStoreFactory(cfg config.Cassandra, logger log.Logger) (*executionStoreFactory, error) {
139139
cluster := cassandra.NewCassandraCluster(cfg)
140140
cluster.ProtoVersion = cassandraProtoVersion
141-
cluster.Consistency = gocql.LocalQuorum
142-
cluster.SerialConsistency = gocql.LocalSerial
141+
cluster.Consistency = cfg.Consistency.GetConsistency()
142+
cluster.SerialConsistency = cfg.Consistency.GetSerialConsistency()
143143
cluster.Timeout = defaultSessionTimeout
144144
session, err := cluster.CreateSession()
145145
if err != nil {

common/service/config/config.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,23 @@ type (
238238
MaxConns int `yaml:"maxConns"`
239239
// TLS configuration
240240
TLS *auth.TLS `yaml:"tls"`
241+
// Consistency configuration (defaults to LOCAL_QUORUM / LOCAL_SERIAL for all stores if this field not set)
242+
Consistency *CassandraStoreConsistency `yaml:"consistency"`
243+
}
244+
245+
// CassandraStoreConsistency enables you to set the consistency settings for each Cassandra Persistence Store for Temporal
246+
CassandraStoreConsistency struct {
247+
// Default defines the consistency level for ALL stores.
248+
// Defaults to LOCAL_QUORUM and LOCAL_SERIAL if not set
249+
Default *CassandraConsistencySettings `yaml:"default"`
250+
}
251+
252+
// CassandraConsistencySettings sets the default consistency level for regular & serial queries to Cassandra.
253+
CassandraConsistencySettings struct {
254+
// Consistency sets the default consistency level. Values identical to gocql Consistency values. (defaults to LOCAL_QUORUM if not set).
255+
Consistency string `yaml:"consistency"`
256+
// SerialConsistency sets the consistency for the serial prtion of queries. Values identical to gocql SerialConsistency values. (defaults to LOCAL_SERIAL if not set)
257+
SerialConsistency string `yaml:"serialConsistency"`
241258
}
242259

243260
// SQL is the configuration for connecting to a SQL backed datastore

0 commit comments

Comments
 (0)