Skip to content

Commit c76c70a

Browse files
zeripathsapk
authored andcommitted
Move mailer to use a queue (#9789)
* Move mailer to use a queue * Make sectionMap map[string]bool * Ensure that Message is json encodable
1 parent 06cd3e0 commit c76c70a

File tree

4 files changed

+92
-53
lines changed

4 files changed

+92
-53
lines changed

modules/setting/queue.go

+17-7
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,11 @@ func NewQueueService() {
103103

104104
// Now handle the old issue_indexer configuration
105105
section := Cfg.Section("queue.issue_indexer")
106-
issueIndexerSectionMap := map[string]string{}
106+
sectionMap := map[string]bool{}
107107
for _, key := range section.Keys() {
108-
issueIndexerSectionMap[key.Name()] = key.Value()
108+
sectionMap[key.Name()] = true
109109
}
110-
if _, ok := issueIndexerSectionMap["TYPE"]; !ok {
110+
if _, ok := sectionMap["TYPE"]; !ok {
111111
switch Indexer.IssueQueueType {
112112
case LevelQueueType:
113113
section.Key("TYPE").SetValue("level")
@@ -120,18 +120,28 @@ func NewQueueService() {
120120
Indexer.IssueQueueType)
121121
}
122122
}
123-
if _, ok := issueIndexerSectionMap["LENGTH"]; !ok {
123+
if _, ok := sectionMap["LENGTH"]; !ok {
124124
section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Indexer.UpdateQueueLength))
125125
}
126-
if _, ok := issueIndexerSectionMap["BATCH_LENGTH"]; !ok {
126+
if _, ok := sectionMap["BATCH_LENGTH"]; !ok {
127127
section.Key("BATCH_LENGTH").SetValue(fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
128128
}
129-
if _, ok := issueIndexerSectionMap["DATADIR"]; !ok {
129+
if _, ok := sectionMap["DATADIR"]; !ok {
130130
section.Key("DATADIR").SetValue(Indexer.IssueQueueDir)
131131
}
132-
if _, ok := issueIndexerSectionMap["CONN_STR"]; !ok {
132+
if _, ok := sectionMap["CONN_STR"]; !ok {
133133
section.Key("CONN_STR").SetValue(Indexer.IssueQueueConnStr)
134134
}
135+
136+
// Handle the old mailer configuration
137+
section = Cfg.Section("queue.mailer")
138+
sectionMap = map[string]bool{}
139+
for _, key := range section.Keys() {
140+
sectionMap[key.Name()] = true
141+
}
142+
if _, ok := sectionMap["LENGTH"]; !ok {
143+
section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)))
144+
}
135145
}
136146

137147
// ParseQueueConnStr parses a queue connection string

services/mailer/mail.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func InitMailRender(subjectTpl *texttmpl.Template, bodyTpl *template.Template) {
5151

5252
// SendTestMail sends a test mail
5353
func SendTestMail(email string) error {
54-
return gomail.Send(Sender, NewMessage([]string{email}, "Gitea Test Email!", "Gitea Test Email!").Message)
54+
return gomail.Send(Sender, NewMessage([]string{email}, "Gitea Test Email!", "Gitea Test Email!").ToMessage())
5555
}
5656

5757
// SendUserMail sends a mail to the user

services/mailer/mail_test.go

+15-14
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,11 @@ func TestComposeIssueCommentMessage(t *testing.T) {
6161
msgs := composeIssueCommentMessages(&mailCommentContext{Issue: issue, Doer: doer, ActionType: models.ActionCommentIssue,
6262
Content: "test body", Comment: comment}, tos, false, "issue comment")
6363
assert.Len(t, msgs, 2)
64-
65-
mailto := msgs[0].GetHeader("To")
66-
subject := msgs[0].GetHeader("Subject")
67-
inreplyTo := msgs[0].GetHeader("In-Reply-To")
68-
references := msgs[0].GetHeader("References")
64+
gomailMsg := msgs[0].ToMessage()
65+
mailto := gomailMsg.GetHeader("To")
66+
subject := gomailMsg.GetHeader("Subject")
67+
inreplyTo := gomailMsg.GetHeader("In-Reply-To")
68+
references := gomailMsg.GetHeader("References")
6969

7070
assert.Len(t, mailto, 1, "exactly one recipient is expected in the To field")
7171
assert.Equal(t, "Re: ", subject[0][:4], "Comment reply subject should contain Re:")
@@ -96,14 +96,15 @@ func TestComposeIssueMessage(t *testing.T) {
9696
Content: "test body"}, tos, false, "issue create")
9797
assert.Len(t, msgs, 2)
9898

99-
mailto := msgs[0].GetHeader("To")
100-
subject := msgs[0].GetHeader("Subject")
101-
messageID := msgs[0].GetHeader("Message-ID")
99+
gomailMsg := msgs[0].ToMessage()
100+
mailto := gomailMsg.GetHeader("To")
101+
subject := gomailMsg.GetHeader("Subject")
102+
messageID := gomailMsg.GetHeader("Message-ID")
102103

103104
assert.Len(t, mailto, 1, "exactly one recipient is expected in the To field")
104105
assert.Equal(t, "[user2/repo1] @user2 #1 - issue1", subject[0])
105-
assert.Nil(t, msgs[0].GetHeader("In-Reply-To"))
106-
assert.Nil(t, msgs[0].GetHeader("References"))
106+
assert.Nil(t, gomailMsg.GetHeader("In-Reply-To"))
107+
assert.Nil(t, gomailMsg.GetHeader("References"))
107108
assert.Equal(t, messageID[0], "<user2/repo1/issues/1@localhost>", "Message-ID header doesn't match")
108109
}
109110

@@ -134,9 +135,9 @@ func TestTemplateSelection(t *testing.T) {
134135
InitMailRender(stpl, btpl)
135136

136137
expect := func(t *testing.T, msg *Message, expSubject, expBody string) {
137-
subject := msg.GetHeader("Subject")
138+
subject := msg.ToMessage().GetHeader("Subject")
138139
msgbuf := new(bytes.Buffer)
139-
_, _ = msg.WriteTo(msgbuf)
140+
_, _ = msg.ToMessage().WriteTo(msgbuf)
140141
wholemsg := msgbuf.String()
141142
assert.Equal(t, []string{expSubject}, subject)
142143
assert.Contains(t, wholemsg, expBody)
@@ -188,9 +189,9 @@ func TestTemplateServices(t *testing.T) {
188189
msg := testComposeIssueCommentMessage(t, &mailCommentContext{Issue: issue, Doer: doer, ActionType: actionType,
189190
Content: "test body", Comment: comment}, tos, fromMention, "TestTemplateServices")
190191

191-
subject := msg.GetHeader("Subject")
192+
subject := msg.ToMessage().GetHeader("Subject")
192193
msgbuf := new(bytes.Buffer)
193-
_, _ = msg.WriteTo(msgbuf)
194+
_, _ = msg.ToMessage().WriteTo(msgbuf)
194195
wholemsg := msgbuf.String()
195196

196197
assert.Equal(t, []string{expSubject}, subject)

services/mailer/mailer.go

+59-31
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ import (
1818
"time"
1919

2020
"code.gitea.io/gitea/modules/base"
21+
"code.gitea.io/gitea/modules/graceful"
2122
"code.gitea.io/gitea/modules/log"
23+
"code.gitea.io/gitea/modules/queue"
2224
"code.gitea.io/gitea/modules/setting"
2325

2426
"github.com/jaytaylor/html2text"
@@ -27,38 +29,63 @@ import (
2729

2830
// Message mail body and log info
2931
type Message struct {
30-
Info string // Message information for log purpose.
31-
*gomail.Message
32+
Info string // Message information for log purpose.
33+
FromAddress string
34+
FromDisplayName string
35+
To []string
36+
Subject string
37+
Date time.Time
38+
Body string
39+
Headers map[string][]string
3240
}
3341

34-
// NewMessageFrom creates new mail message object with custom From header.
35-
func NewMessageFrom(to []string, fromDisplayName, fromAddress, subject, body string) *Message {
36-
log.Trace("NewMessageFrom (body):\n%s", body)
37-
42+
// ToMessage converts a Message to gomail.Message
43+
func (m *Message) ToMessage() *gomail.Message {
3844
msg := gomail.NewMessage()
39-
msg.SetAddressHeader("From", fromAddress, fromDisplayName)
40-
msg.SetHeader("To", to...)
45+
msg.SetAddressHeader("From", m.FromAddress, m.FromDisplayName)
46+
msg.SetHeader("To", m.To...)
47+
for header := range m.Headers {
48+
msg.SetHeader(header, m.Headers[header]...)
49+
}
50+
4151
if len(setting.MailService.SubjectPrefix) > 0 {
42-
msg.SetHeader("Subject", setting.MailService.SubjectPrefix+" "+subject)
52+
msg.SetHeader("Subject", setting.MailService.SubjectPrefix+" "+m.Subject)
4353
} else {
44-
msg.SetHeader("Subject", subject)
54+
msg.SetHeader("Subject", m.Subject)
4555
}
46-
msg.SetDateHeader("Date", time.Now())
56+
msg.SetDateHeader("Date", m.Date)
4757
msg.SetHeader("X-Auto-Response-Suppress", "All")
4858

49-
plainBody, err := html2text.FromString(body)
59+
plainBody, err := html2text.FromString(m.Body)
5060
if err != nil || setting.MailService.SendAsPlainText {
51-
if strings.Contains(base.TruncateString(body, 100), "<html>") {
61+
if strings.Contains(base.TruncateString(m.Body, 100), "<html>") {
5262
log.Warn("Mail contains HTML but configured to send as plain text.")
5363
}
5464
msg.SetBody("text/plain", plainBody)
5565
} else {
5666
msg.SetBody("text/plain", plainBody)
57-
msg.AddAlternative("text/html", body)
67+
msg.AddAlternative("text/html", m.Body)
5868
}
69+
return msg
70+
}
71+
72+
// SetHeader adds additional headers to a message
73+
func (m *Message) SetHeader(field string, value ...string) {
74+
m.Headers[field] = value
75+
}
76+
77+
// NewMessageFrom creates new mail message object with custom From header.
78+
func NewMessageFrom(to []string, fromDisplayName, fromAddress, subject, body string) *Message {
79+
log.Trace("NewMessageFrom (body):\n%s", body)
5980

6081
return &Message{
61-
Message: msg,
82+
FromAddress: fromAddress,
83+
FromDisplayName: fromDisplayName,
84+
To: to,
85+
Subject: subject,
86+
Date: time.Now(),
87+
Body: body,
88+
Headers: map[string][]string{},
6289
}
6390
}
6491

@@ -257,18 +284,7 @@ func (s *dummySender) Send(from string, to []string, msg io.WriterTo) error {
257284
return nil
258285
}
259286

260-
func processMailQueue() {
261-
for msg := range mailQueue {
262-
log.Trace("New e-mail sending request %s: %s", msg.GetHeader("To"), msg.Info)
263-
if err := gomail.Send(Sender, msg.Message); err != nil {
264-
log.Error("Failed to send emails %s: %s - %v", msg.GetHeader("To"), msg.Info, err)
265-
} else {
266-
log.Trace("E-mails sent %s: %s", msg.GetHeader("To"), msg.Info)
267-
}
268-
}
269-
}
270-
271-
var mailQueue chan *Message
287+
var mailQueue queue.Queue
272288

273289
// Sender sender for sending mail synchronously
274290
var Sender gomail.Sender
@@ -291,22 +307,34 @@ func NewContext() {
291307
Sender = &dummySender{}
292308
}
293309

294-
mailQueue = make(chan *Message, setting.MailService.QueueLength)
295-
go processMailQueue()
310+
mailQueue = queue.CreateQueue("mail", func(data ...queue.Data) {
311+
for _, datum := range data {
312+
msg := datum.(*Message)
313+
gomailMsg := msg.ToMessage()
314+
log.Trace("New e-mail sending request %s: %s", gomailMsg.GetHeader("To"), msg.Info)
315+
if err := gomail.Send(Sender, gomailMsg); err != nil {
316+
log.Error("Failed to send emails %s: %s - %v", gomailMsg.GetHeader("To"), msg.Info, err)
317+
} else {
318+
log.Trace("E-mails sent %s: %s", gomailMsg.GetHeader("To"), msg.Info)
319+
}
320+
}
321+
}, &Message{})
322+
323+
go graceful.GetManager().RunWithShutdownFns(mailQueue.Run)
296324
}
297325

298326
// SendAsync send mail asynchronously
299327
func SendAsync(msg *Message) {
300328
go func() {
301-
mailQueue <- msg
329+
_ = mailQueue.Push(msg)
302330
}()
303331
}
304332

305333
// SendAsyncs send mails asynchronously
306334
func SendAsyncs(msgs []*Message) {
307335
go func() {
308336
for _, msg := range msgs {
309-
mailQueue <- msg
337+
_ = mailQueue.Push(msg)
310338
}
311339
}()
312340
}

0 commit comments

Comments
 (0)