Skip to content

Add option to save off .pose files for each segment #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
66 changes: 49 additions & 17 deletions sign_language_segmentation/bin.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env python
from pathlib import Path
import argparse
import os

Expand All @@ -11,7 +12,7 @@
from sign_language_segmentation.src.utils.probs_to_segments import probs_to_segments


def add_optical_flow(pose: Pose):
def add_optical_flow(pose: Pose)->None:
from pose_format.numpy.representation.distance import DistanceRepresentation
from pose_format.utils.optical_flow import OpticalFlowCalculator

Expand All @@ -25,7 +26,7 @@ def add_optical_flow(pose: Pose):
pose.body.data = np.concatenate([pose.body.data, flow], axis=-1).astype(np.float32)


def process_pose(pose: Pose, optical_flow=False, hand_normalization=False):
def process_pose(pose: Pose, optical_flow=False, hand_normalization=False) -> Pose:
pose = pose.get_components(["POSE_LANDMARKS", "LEFT_HAND_LANDMARKS", "RIGHT_HAND_LANDMARKS"])

normalization_info = pose_normalization_info(pose.header)
Expand Down Expand Up @@ -57,40 +58,59 @@ def predict(model, pose: Pose):
return model(pose_data)


def save_pose_segments(tiers: dict, tier_id: str, input_file_path: Path) -> None:
    """Write one cropped ``.pose`` file per segment of the selected tier.

    The pose is read again from ``input_file_path`` instead of reusing an
    already processed pose, so every crop is taken from the file's original
    content. Each output is written next to the input file and named
    ``<input stem>_<tier_id>_<segment index>.pose``.

    Args:
        tiers: Mapping from tier id to its segments. Every segment is indexed
            with ``"start"`` and ``"end"``; both are converted to ``int`` and
            used as the bounds of a slice over the pose body.
        tier_id: Key in ``tiers`` whose segments are exported.
        input_file_path: Path of the pose file to crop.
    """
    # Re-read the untouched file so the crops keep all the original points.
    source_pose = Pose.read(input_file_path.read_bytes())
    output_dir = input_file_path.parent
    stem = input_file_path.stem

    for index, segment in enumerate(tiers[tier_id]):
        start_idx, stop_idx = int(segment["start"]), int(segment["end"])
        destination = output_dir / f"{stem}_{tier_id}_{index}.pose"
        cropped_pose = Pose(header=source_pose.header, body=source_pose.body[start_idx:stop_idx])

        print(f"Saving cropped pose with start {start_idx} and end {stop_idx} to {destination}")
        with destination.open("wb") as sink:
            cropped_pose.write(sink)


def get_args() -> argparse.Namespace:
    """Parse the command-line options of the segmentation script.

    Each option is registered exactly once; registering the same option
    string twice on one parser makes argparse raise a conflict error.

    Returns:
        The parsed namespace with ``pose`` (a ``Path``), ``elan``,
        ``save_segments`` (``"SENTENCE"``, ``"SIGN"`` or ``None`` when the
        flag is omitted), ``video``, ``subtitles``, ``model`` and
        ``no_pose_link``.
    """
    parser = argparse.ArgumentParser()
    # The pose path is parsed as a Path because save_pose_segments derives the
    # output directory and file stem from it.
    parser.add_argument("--pose", required=True, type=Path, help="path to input pose file")
    parser.add_argument("--elan", required=True, type=str, help="path to output elan file")
    parser.add_argument(
        "--save-segments", type=str, choices=["SENTENCE", "SIGN"], help="whether to save cropped .pose files"
    )
    parser.add_argument("--video", default=None, required=False, type=str, help="path to video file")
    parser.add_argument("--subtitles", default=None, required=False, type=str, help="path to subtitle file")
    parser.add_argument("--model", default="model_E1s-1.pth", required=False, type=str, help="path to model file")
    parser.add_argument("--no-pose-link", action="store_true", help="whether to link the pose file")

    return parser.parse_args()


def main():
args = get_args()

print('Loading pose ...')
print("Loading pose ...")
with open(args.pose, "rb") as f:
pose = Pose.read(f.read())
if 'E4' in args.model:
if "E4" in args.model:
pose = process_pose(pose, optical_flow=True, hand_normalization=True)
else:
pose = process_pose(pose)

print('Loading model ...')
print("Loading model ...")
install_dir = str(os.path.dirname(os.path.abspath(__file__)))
model = load_model(os.path.join(install_dir, "dist", args.model))

print('Estimating segments ...')
print("Estimating segments ...")
probs = predict(model, pose)

sign_segments = probs_to_segments(probs["sign"], 60, 50)
sentence_segments = probs_to_segments(probs["sentence"], 90, 90)

print('Building ELAN file ...')
print("Building ELAN file ...")
tiers = {
"SIGN": sign_segments,
"SENTENCE": sentence_segments,
Expand All @@ -110,21 +130,33 @@ def main():

for tier_id, segments in tiers.items():
eaf.add_tier(tier_id)
frames_per_millisecond = fps*1000
for segment in segments:
eaf.add_annotation(tier_id, int(segment["start"] / fps * 1000), int(segment["end"] / fps * 1000))
# convert frame numbers to millisecond timestamps, for Elan
start_frame_time = int(segment["start"] / fps * frames_per_millisecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

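This implementation is now wrong: `frames_per_millisecond` is defined as `fps*1000`, so multiplying by it cancels the division by `fps`.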
Was:

segment["start"] / fps * 1000

Now:

segment["start"] / fps * fps * 1000

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest

start_time = int(segment["start"] / fps * 1000)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, it was wrong indeed! Fixed now

end_frame_time = int(segment["end"] / fps * frames_per_millisecond)
eaf.add_annotation(tier_id, start_frame_time, end_frame_time)

if args.save_segments:
print(f"Saving {args.save_segments} cropped .pose files")
save_pose_segments(tiers, tier_id=args.save_segments, input_file_path=args.pose)

if args.subtitles and os.path.exists(args.subtitles):
import srt

eaf.add_tier("SUBTITLE")
with open(args.subtitles, "r") as infile:
# open with explicit encoding,
# as directed in https://github.com/cdown/srt/blob/master/srt_tools/utils.py#L155-L160
# see also https://github.com/cdown/srt/issues/67, https://github.com/cdown/srt/issues/36
with open(args.subtitles, "r", encoding="utf-8-sig") as infile:
for subtitle in srt.parse(infile):
start = subtitle.start.total_seconds()
end = subtitle.end.total_seconds()
eaf.add_annotation("SUBTITLE", int(start * 1000), int(end * 1000), subtitle.content)

print('Saving to disk ...')
print("Saving .eaf to disk ...")
eaf.to_file(args.elan)


if __name__ == '__main__':
if __name__ == "__main__":
main()
Loading