1+ // Copyright (c) Microsoft Corporation. All rights reserved.
2+ // Licensed under the MIT License.
3+
4+ using System ;
5+ using System . Collections . Concurrent ;
6+ using Azure . Core ;
7+ using Azure . Messaging . EventHubs . Consumer ;
8+ using Azure . Messaging . EventHubs . Producer ;
9+ using Azure . Storage . Blobs ;
10+ using Microsoft . Azure . WebJobs . EventHubs . Processor ;
11+ using Microsoft . Azure . WebJobs . Extensions . Clients . Shared ;
12+ using Microsoft . Azure . WebJobs . Host ;
13+ using Microsoft . Extensions . Azure ;
14+ using Microsoft . Extensions . Configuration ;
15+ using Microsoft . Extensions . Options ;
16+
17+ namespace Microsoft . Azure . WebJobs . EventHubs
18+ {
19+ internal class EventHubClientFactory
20+ {
21+ private readonly IConfiguration _configuration ;
22+ private readonly AzureComponentFactory _componentFactory ;
23+ private readonly EventHubOptions _options ;
24+ private readonly INameResolver _nameResolver ;
25+ private readonly ConcurrentDictionary < string , EventHubProducerClient > _producerCache ;
26+ private readonly ConcurrentDictionary < string , IEventHubConsumerClient > _consumerCache = new ( ) ;
27+
28+ public EventHubClientFactory (
29+ IConfiguration configuration ,
30+ AzureComponentFactory componentFactory ,
31+ IOptions < EventHubOptions > options ,
32+ INameResolver nameResolver )
33+ {
34+ _configuration = configuration ;
35+ _componentFactory = componentFactory ;
36+ _options = options . Value ;
37+ _nameResolver = nameResolver ;
38+ _producerCache = new ConcurrentDictionary < string , EventHubProducerClient > ( _options . RegisteredProducers ) ;
39+ }
40+
41+ internal EventHubProducerClient GetEventHubProducerClient ( string eventHubName , string connection )
42+ {
43+ eventHubName = _nameResolver . ResolveWholeString ( eventHubName ) ;
44+
45+ return _producerCache . GetOrAdd ( eventHubName , key =>
46+ {
47+ if ( ! string . IsNullOrWhiteSpace ( connection ) )
48+ {
49+ var info = ResolveConnectionInformation ( connection ) ;
50+
51+ if ( info . FullyQualifiedEndpoint != null &&
52+ info . TokenCredential != null )
53+ {
54+ return new EventHubProducerClient ( info . FullyQualifiedEndpoint , eventHubName , info . TokenCredential ) ;
55+ }
56+
57+ return new EventHubProducerClient ( NormalizeConnectionString ( info . ConnectionString , eventHubName ) ) ;
58+ }
59+
60+ throw new InvalidOperationException ( "No event hub sender named " + eventHubName ) ;
61+ } ) ;
62+ }
63+
64+ internal EventProcessorHost GetEventProcessorHost ( string eventHubName , string connection , string consumerGroup )
65+ {
66+ eventHubName = _nameResolver . ResolveWholeString ( eventHubName ) ;
67+ consumerGroup ??= EventHubConsumerClient . DefaultConsumerGroupName ;
68+
69+ if ( _options . RegisteredConsumerCredentials . TryGetValue ( eventHubName , out var creds ) )
70+ {
71+ return new EventProcessorHost ( consumerGroup : consumerGroup ,
72+ connectionString : creds . EventHubConnectionString ,
73+ eventHubName : eventHubName ,
74+ options : _options . EventProcessorOptions ,
75+ eventBatchMaximumCount : _options . MaxBatchSize ,
76+ invokeProcessorAfterReceiveTimeout : _options . InvokeProcessorAfterReceiveTimeout ,
77+ exceptionHandler : _options . ExceptionHandler ) ;
78+ }
79+ else if ( ! string . IsNullOrEmpty ( connection ) )
80+ {
81+ var info = ResolveConnectionInformation ( connection ) ;
82+
83+ if ( info . FullyQualifiedEndpoint != null &&
84+ info . TokenCredential != null )
85+ {
86+ return new EventProcessorHost ( consumerGroup : consumerGroup ,
87+ fullyQualifiedNamespace : info . FullyQualifiedEndpoint ,
88+ eventHubName : eventHubName ,
89+ credential : info . TokenCredential ,
90+ options : _options . EventProcessorOptions ,
91+ eventBatchMaximumCount : _options . MaxBatchSize ,
92+ invokeProcessorAfterReceiveTimeout : _options . InvokeProcessorAfterReceiveTimeout ,
93+ exceptionHandler : _options . ExceptionHandler ) ;
94+ }
95+
96+ return new EventProcessorHost ( consumerGroup : consumerGroup ,
97+ connectionString : NormalizeConnectionString ( info . ConnectionString , eventHubName ) ,
98+ eventHubName : eventHubName ,
99+ options : _options . EventProcessorOptions ,
100+ eventBatchMaximumCount : _options . MaxBatchSize ,
101+ invokeProcessorAfterReceiveTimeout : _options . InvokeProcessorAfterReceiveTimeout ,
102+ exceptionHandler : _options . ExceptionHandler ) ;
103+ }
104+
105+ throw new InvalidOperationException ( "No event hub receiver named " + eventHubName ) ;
106+ }
107+
108+ internal IEventHubConsumerClient GetEventHubConsumerClient ( string eventHubName , string connection , string consumerGroup )
109+ {
110+ eventHubName = _nameResolver . ResolveWholeString ( eventHubName ) ;
111+ consumerGroup ??= EventHubConsumerClient . DefaultConsumerGroupName ;
112+
113+ return _consumerCache . GetOrAdd ( eventHubName , name =>
114+ {
115+ EventHubConsumerClient client = null ;
116+ if ( _options . RegisteredConsumerCredentials . TryGetValue ( eventHubName , out var creds ) )
117+ {
118+ client = new EventHubConsumerClient ( consumerGroup , creds . EventHubConnectionString , eventHubName ) ;
119+ }
120+ else if ( ! string . IsNullOrEmpty ( connection ) )
121+ {
122+ var info = ResolveConnectionInformation ( connection ) ;
123+
124+ if ( info . FullyQualifiedEndpoint != null &&
125+ info . TokenCredential != null )
126+ {
127+ client = new EventHubConsumerClient ( consumerGroup , info . FullyQualifiedEndpoint , eventHubName , info . TokenCredential ) ;
128+ }
129+ else
130+ {
131+ client = new EventHubConsumerClient ( consumerGroup , NormalizeConnectionString ( info . ConnectionString , eventHubName ) ) ;
132+ }
133+ }
134+
135+ if ( client != null )
136+ {
137+ return new EventHubConsumerClientImpl ( client ) ;
138+ }
139+
140+ throw new InvalidOperationException ( "No event hub receiver named " + eventHubName ) ;
141+ } ) ;
142+ }
143+
144+ internal BlobContainerClient GetCheckpointStoreClient ( string eventHubName )
145+ {
146+ string storageConnectionString = null ;
147+ if ( _options . RegisteredConsumerCredentials . TryGetValue ( eventHubName , out var creds ) )
148+ {
149+ storageConnectionString = creds . StorageConnectionString ;
150+ }
151+
152+ // Fall back to default if not explicitly registered
153+ return new BlobContainerClient ( storageConnectionString ?? _configuration . GetWebJobsConnectionString ( ConnectionStringNames . Storage ) , _options . LeaseContainerName ) ;
154+ }
155+
156+ internal static string NormalizeConnectionString ( string originalConnectionString , string eventHubName )
157+ {
158+ var connectionString = ConnectionString . Parse ( originalConnectionString ) ;
159+
160+ if ( ! connectionString . ContainsSegmentKey ( "EntityPath" ) )
161+ {
162+ connectionString . Add ( "EntityPath" , eventHubName ) ;
163+ }
164+
165+ return connectionString . ToString ( ) ;
166+ }
167+
168+ private EventHubsConnectionInformation ResolveConnectionInformation ( string connection )
169+ {
170+ IConfigurationSection connectionSection = _configuration . GetWebJobsConnectionStringSection ( connection ) ;
171+ if ( ! connectionSection . Exists ( ) )
172+ {
173+ // Not found
174+ throw new InvalidOperationException ( $ "EventHub account connection string '{ connection } ' does not exist." +
175+ $ "Make sure that it is a defined App Setting.") ;
176+ }
177+
178+ if ( ! string . IsNullOrWhiteSpace ( connectionSection . Value ) )
179+ {
180+ return new EventHubsConnectionInformation ( connectionSection . Value ) ;
181+ }
182+
183+ var fullyQualifiedNamespace = connectionSection [ "fullyQualifiedNamespace" ] ;
184+ if ( string . IsNullOrWhiteSpace ( fullyQualifiedNamespace ) )
185+ {
186+ // Not found
187+ throw new InvalidOperationException ( $ "Connection should have an 'fullyQualifiedNamespace' property or be a string representing a connection string.") ;
188+ }
189+
190+ var credential = _componentFactory . CreateTokenCredential ( connectionSection ) ;
191+
192+ return new EventHubsConnectionInformation ( fullyQualifiedNamespace , credential ) ;
193+ }
194+
195+ private record EventHubsConnectionInformation
196+ {
197+ public EventHubsConnectionInformation ( string connectionString )
198+ {
199+ ConnectionString = connectionString ;
200+ }
201+
202+ public EventHubsConnectionInformation ( string fullyQualifiedEndpoint , TokenCredential tokenCredential )
203+ {
204+ FullyQualifiedEndpoint = fullyQualifiedEndpoint ;
205+ TokenCredential = tokenCredential ;
206+ }
207+
208+ public string ConnectionString { get ; }
209+ public string FullyQualifiedEndpoint { get ; }
210+ public TokenCredential TokenCredential { get ; }
211+ }
212+ }
213+ }
0 commit comments