Skip to content

Commit a2220ed

Browse files
author
Xinyu Liu
committed
SAMZA-1245: Make stream samza.physical.name config name string public
For certain system such as hdfs, the physical stream name might need to be finalized during the config generation. In order to do that, we will need to expose the stream samza.physical.name config string. Author: Xinyu Liu <[email protected]> Reviewers: Jake Maes <[email protected]> Closes apache#145 from xinyuiscool/SAMZA-1245
1 parent 9f44867 commit a2220ed

File tree

2 files changed

+6
-1
lines changed

2 files changed

+6
-1
lines changed

samza-core/src/main/java/org/apache/samza/execution/StreamManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.List;
2626
import java.util.Map;
2727
import java.util.Set;
28+
import org.apache.samza.SamzaException;
2829
import org.apache.samza.system.StreamSpec;
2930
import org.apache.samza.system.SystemAdmin;
3031
import org.apache.samza.system.SystemStreamMetadata;
@@ -62,6 +63,10 @@ Map<String, Integer> getStreamPartitionCounts(String systemName, Set<String> str
6263
Map<String, Integer> streamToPartitionCount = new HashMap<>();
6364

6465
SystemAdmin systemAdmin = sysAdmins.get(systemName);
66+
if (systemAdmin == null) {
67+
throw new SamzaException(String.format("System %s does not exist.", systemName));
68+
}
69+
6570
// retrieve the metadata for the streams in this system
6671
Map<String, SystemStreamMetadata> streamToMetadata = systemAdmin.getSystemStreamMetadata(streamNames);
6772
// set the partitions of a stream to its StreamEdge

samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ object StreamConfig {
4343

4444
protected val STREAM_ID_PREFIX = STREAMS_PREFIX + "%s."
4545
protected val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM
46-
protected val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME
46+
val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME
4747

4848
implicit def Config2Stream(config: Config) = new StreamConfig(config)
4949
}

0 commit comments

Comments
 (0)