diff --git a/CHANGELOG.md b/CHANGELOG.md index 86ae72006d..de880ef300 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,8 @@ the release. ([2214](https://github.com/open-telemetry/opentelemetry-demo/pull/2214)) * [quote] replace debian image with latest alpine image ([2216](https://github.com/open-telemetry/opentelemetry-demo/pull/2216)) +* [load-generator] Update locustfile for logging with TraceContext + ([2265](https://github.com/open-telemetry/opentelemetry-demo/pull/2265)) ## 2.0.2 diff --git a/src/load-generator/locustfile.py b/src/load-generator/locustfile.py index 4f86fe4b79..7f0a737dff 100644 --- a/src/load-generator/locustfile.py +++ b/src/load-generator/locustfile.py @@ -25,6 +25,7 @@ from opentelemetry.instrumentation.requests import RequestsInstrumentor from opentelemetry.instrumentation.system_metrics import SystemMetricsInstrumentor from opentelemetry.instrumentation.urllib3 import URLLib3Instrumentor +from opentelemetry.instrumentation.logging import LoggingInstrumentor from opentelemetry._logs import set_logger_provider from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( OTLPLogExporter, @@ -39,34 +40,41 @@ from playwright.async_api import Route, Request -logger_provider = LoggerProvider(resource=Resource.create( - { - "service.name": "load-generator", - } - ),) +# Configure tracer provider first (needed for trace context in logs) +tracer_provider = TracerProvider() +trace.set_tracer_provider(tracer_provider) +tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(insecure=True))) + +# Configure logger provider with the same resource +logger_provider = LoggerProvider() set_logger_provider(logger_provider) -exporter = OTLPLogExporter(insecure=True) -logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter)) +# Set up log exporter and processor +log_exporter = OTLPLogExporter(insecure=True) +logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter)) + +# Create logging handler that will include trace context handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider) -# Attach OTLP handler to locust logger -logging.getLogger().addHandler(handler) -logging.getLogger().setLevel(logging.INFO) +# Configure root logger +root_logger = logging.getLogger() +root_logger.addHandler(handler) +root_logger.setLevel(logging.INFO) -exporter = OTLPMetricExporter(insecure=True) -set_meter_provider(MeterProvider([PeriodicExportingMetricReader(exporter)])) +# Configure metrics +metric_exporter = OTLPMetricExporter(insecure=True) +set_meter_provider(MeterProvider([PeriodicExportingMetricReader(metric_exporter)])) -tracer_provider = TracerProvider() -trace.set_tracer_provider(tracer_provider) -tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) +# Instrument logging to automatically inject trace context +LoggingInstrumentor().instrument(set_logging_format=True) # Instrumenting manually to avoid error with locust gevent monkey Jinja2Instrumentor().instrument() RequestsInstrumentor().instrument() SystemMetricsInstrumentor().instrument() URLLib3Instrumentor().instrument() -logging.info("Instrumentation complete") + +logging.info("Instrumentation complete - logs will now include trace context") # Initialize Flagd provider base_url = f"http://{os.environ.get('FLAGD_HOST', 'localhost')}:{os.environ.get('FLAGD_OFREP_PORT', 8016)}" @@ -107,76 +115,110 @@ def get_flagd_value(FlagName): class WebsiteUser(HttpUser): wait_time = between(1, 10) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.tracer = trace.get_tracer(__name__) + @task(1) def index(self): - self.client.get("/") + with self.tracer.start_as_current_span("user_index"): + logging.info("User accessing index page") + self.client.get("/") @task(10) def browse_product(self): - self.client.get("/api/products/" + random.choice(products)) + product = random.choice(products) + with self.tracer.start_as_current_span("user_browse_product", attributes={"product.id": product}): + logging.info(f"User browsing product: {product}") + self.client.get("/api/products/" + product) @task(3) def get_recommendations(self): - params = { - "productIds": [random.choice(products)], - } - self.client.get("/api/recommendations", params=params) + product = random.choice(products) + with self.tracer.start_as_current_span("user_get_recommendations", attributes={"product.id": product}): + logging.info(f"User getting recommendations for product: {product}") + params = { + "productIds": [product], + } + self.client.get("/api/recommendations", params=params) @task(3) def get_ads(self): - params = { - "contextKeys": [random.choice(categories)], - } - self.client.get("/api/data/", params=params) + category = random.choice(categories) + with self.tracer.start_as_current_span("user_get_ads", attributes={"category": str(category)}): + logging.info(f"User getting ads for category: {category}") + params = { + "contextKeys": [category], + } + self.client.get("/api/data/", params=params) @task(3) def view_cart(self): - self.client.get("/api/cart") + with self.tracer.start_as_current_span("user_view_cart"): + logging.info("User viewing cart") + self.client.get("/api/cart") @task(2) def add_to_cart(self, user=""): if user == "": user = str(uuid.uuid1()) product = random.choice(products) - self.client.get("/api/products/" + product) - cart_item = { - "item": { - "productId": product, - "quantity": random.choice([1, 2, 3, 4, 5, 10]), - }, - "userId": user, - } - self.client.post("/api/cart", json=cart_item) + quantity = random.choice([1, 2, 3, 4, 5, 10]) + with self.tracer.start_as_current_span("user_add_to_cart", + attributes={"user.id": user, "product.id": product, "quantity": quantity}): + logging.info(f"User {user} adding {quantity} of product {product} to cart") + self.client.get("/api/products/" + product) + cart_item = { + "item": { + "productId": product, + "quantity": quantity, + }, + "userId": user, + } + self.client.post("/api/cart", json=cart_item) @task(1) def checkout(self): # checkout call with an item added to cart user = str(uuid.uuid1()) - self.add_to_cart(user=user) - checkout_person = random.choice(people) - checkout_person["userId"] = user - self.client.post("/api/checkout", json=checkout_person) + with self.tracer.start_as_current_span("user_checkout_single", attributes={"user.id": user}): + self.add_to_cart(user=user) + checkout_person = random.choice(people) + checkout_person["userId"] = user + self.client.post("/api/checkout", json=checkout_person) + logging.info(f"Checkout completed for user {user}") @task(1) def checkout_multi(self): # checkout call which adds 2-4 different items to cart before checkout user = str(uuid.uuid1()) - for i in range(random.choice([2, 3, 4])): - self.add_to_cart(user=user) - checkout_person = random.choice(people) - checkout_person["userId"] = user - self.client.post("/api/checkout", json=checkout_person) + item_count = random.choice([2, 3, 4]) + with self.tracer.start_as_current_span("user_checkout_multi", + attributes={"user.id": user, "item.count": item_count}): + for i in range(item_count): + self.add_to_cart(user=user) + checkout_person = random.choice(people) + checkout_person["userId"] = user + self.client.post("/api/checkout", json=checkout_person) + logging.info(f"Multi-item checkout completed for user {user}") @task(5) def flood_home(self): - for _ in range(0, get_flagd_value("loadGeneratorFloodHomepage")): - self.client.get("/") + flood_count = get_flagd_value("loadGeneratorFloodHomepage") + if flood_count > 0: + with self.tracer.start_as_current_span("user_flood_home", attributes={"flood.count": flood_count}): + logging.info(f"User flooding homepage {flood_count} times") + for _ in range(0, flood_count): + self.client.get("/") def on_start(self): - ctx = baggage.set_baggage("session.id", str(uuid.uuid4())) - ctx = baggage.set_baggage("synthetic_request", "true", context=ctx) - context.attach(ctx) - self.index() + with self.tracer.start_as_current_span("user_session_start"): + session_id = str(uuid.uuid4()) + logging.info(f"Starting user session: {session_id}") + ctx = baggage.set_baggage("session.id", session_id) + ctx = baggage.set_baggage("synthetic_request", "true", context=ctx) + context.attach(ctx) + self.index() browser_traffic_enabled = os.environ.get("LOCUST_BROWSER_TRAFFIC_ENABLED", "").lower() in ("true", "yes", "on") @@ -185,30 +227,38 @@ def on_start(self): class WebsiteBrowserUser(PlaywrightUser): headless = True # to use a headless browser, without a GUI + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.tracer = trace.get_tracer(__name__) + @task @pw async def open_cart_page_and_change_currency(self, page: PageWithRetry): - try: - page.on("console", lambda msg: print(msg.text)) - await page.route('**/*', add_baggage_header) - await page.goto("/cart", wait_until="domcontentloaded") - await page.select_option('[name="currency_code"]', 'CHF') - await page.wait_for_timeout(2000) # giving the browser time to export the traces - except: - pass + with self.tracer.start_as_current_span("browser_change_currency"): + try: + page.on("console", lambda msg: print(msg.text)) + await page.route('**/*', add_baggage_header) + await page.goto("/cart", wait_until="domcontentloaded") + await page.select_option('[name="currency_code"]', 'CHF') + await page.wait_for_timeout(2000) # giving the browser time to export the traces + logging.info("Currency changed to CHF") + except Exception as e: + logging.error(f"Error in change currency task: {str(e)}") @task @pw async def add_product_to_cart(self, page: PageWithRetry): - try: - page.on("console", lambda msg: print(msg.text)) - await page.route('**/*', add_baggage_header) - await page.goto("/", wait_until="domcontentloaded") - await page.click('p:has-text("Roof Binoculars")', wait_until="domcontentloaded") - await page.click('button:has-text("Add To Cart")', wait_until="domcontentloaded") - await page.wait_for_timeout(2000) # giving the browser time to export the traces - except: - pass + with self.tracer.start_as_current_span("browser_add_to_cart"): + try: + page.on("console", lambda msg: print(msg.text)) + await page.route('**/*', add_baggage_header) + await page.goto("/", wait_until="domcontentloaded") + await page.click('p:has-text("Roof Binoculars")', wait_until="domcontentloaded") + await page.click('button:has-text("Add To Cart")', wait_until="domcontentloaded") + await page.wait_for_timeout(2000) # giving the browser time to export the traces + logging.info("Product added to cart successfully") + except Exception as e: + logging.error(f"Error in add to cart task: {str(e)}") async def add_baggage_header(route: Route, request: Request): diff --git a/src/load-generator/requirements.txt b/src/load-generator/requirements.txt index 8dff8b3267..d478e77c66 100644 --- a/src/load-generator/requirements.txt +++ b/src/load-generator/requirements.txt @@ -11,3 +11,4 @@ opentelemetry-instrumentation-system-metrics==0.55b1 opentelemetry-instrumentation-urllib3==0.55b1 opentelemetry-sdk==1.34.1 opentelemetry-semantic-conventions==0.55b1 +opentelemetry-instrumentation-logging==0.55b1