from typing import Union
from datetime import datetime, timedelta
from PyObjCTools import AppHelper
import AppKit
import CoreBluetooth
import libdispatch
import threading
import os
from ..XATypes import XARectangle
from PyXA import XABase
from time import sleep
import objc
# Resolve the ScreenCaptureKit "SCStreamOutput" Objective-C protocol once at
# import time; XAScreen.record declares a delegate class conforming to it.
SCStreamOutput = objc.protocolNamed("SCStreamOutput")
class XAScreen:
    """A reusable controller for screen-related functionality.

    .. versionadded:: 0.3.0
    """

    def __init__(self) -> None:
        pass

    def capture(self) -> XABase.XAImage:
        """Captures the screen and returns it as a :class:`PyXA.XABase.XAImage` object.

        :return: The resulting image.
        :rtype: XAImage

        .. versionadded:: 0.3.0
        """
        import Quartz

        # Snapshot all on-screen windows across the unbounded (full) display rect.
        img = Quartz.CGWindowListCreateImage(
            Quartz.CGRectInfinite,
            Quartz.kCGWindowListOptionOnScreenOnly,
            Quartz.kCGNullWindowID,
            Quartz.kCGWindowImageDefault,
        )
        # Wrap the CGImage in an NSImage sized to the main screen's frame.
        nsimage = AppKit.NSImage.alloc().initWithCGImage_size_(
            img, AppKit.NSScreen.mainScreen().frame().size
        )
        return XABase.XAImage(nsimage)

    def capture_rect(self, x: int, y: int, width: int, height: int) -> XABase.XAImage:
        """Captures the screen within the specified rectangle and returns it as a :class:`PyXA.XABase.XAImage` object.

        :param x: The x origin of the capture rectangle, in screen coordinates.
        :type x: int
        :param y: The y origin of the capture rectangle, in screen coordinates.
        :type y: int
        :param width: The width of the capture rectangle.
        :type width: int
        :param height: The height of the capture rectangle.
        :type height: int
        :return: The resulting image.
        :rtype: XAImage

        .. versionadded:: 0.3.0
        """
        import Quartz

        # Same as capture(), but restricted to the caller-supplied rectangle.
        img = Quartz.CGWindowListCreateImage(
            Quartz.CGRectMake(x, y, width, height),
            Quartz.kCGWindowListOptionOnScreenOnly,
            Quartz.kCGNullWindowID,
            Quartz.kCGWindowImageDefault,
        )
        # NOTE(review): the NSImage size is the full main-screen frame even
        # though only a sub-rect was captured — confirm this is intentional.
        nsimage = AppKit.NSImage.alloc().initWithCGImage_size_(
            img, AppKit.NSScreen.mainScreen().frame().size
        )
        return XABase.XAImage(nsimage)

    def capture_window(
        self, app: XABase.XAApplication, window_index: int
    ) -> XABase.XAImage:
        """Captures the specified window and returns it as a :class:`PyXA.XABase.XAImage` object.

        :param app: The application to capture the window from.
        :type app: XAApplication
        :param window_index: The index of the window to capture (1-based: the
            counter below is incremented before comparison, so the first
            matching window is index 1).
        :type window_index: int
        :return: The resulting image.
        :rtype: XAImage

        .. versionadded:: 0.3.0
        """
        import Quartz

        # Enumerate all on-screen windows, then narrow to the target app's
        # windows by owner name.
        windows = Quartz.CGWindowListCopyWindowInfo(
            Quartz.kCGWindowListOptionOnScreenOnly, Quartz.kCGNullWindowID
        )
        app_windows = XABase.XAPredicate.evaluate_with_format(
            windows, "kCGWindowOwnerName == %@", app.localized_name
        )

        # Walk the (already filtered) windows until the window_index-th match.
        # NOTE(review): if no window matches, window_id stays None and is
        # passed to CGWindowListCreateImage below — confirm desired behavior.
        window_id = None
        i = 0
        for window in app_windows:
            if window["kCGWindowOwnerName"] == app.localized_name:
                i += 1
                if i == window_index:
                    window_id = window["kCGWindowNumber"]
                    break

        # Capture only the selected window.
        img = Quartz.CGWindowListCreateImage(
            Quartz.CGRectInfinite,
            Quartz.kCGWindowListOptionIncludingWindow,
            window_id,
            Quartz.kCGWindowImageDefault,
        )
        nsimage = AppKit.NSImage.alloc().initWithCGImage_size_(
            img, AppKit.NSScreen.mainScreen().frame().size
        )
        return XABase.XAImage(nsimage)

    def record(
        self, file_path: Union[str, XABase.XAPath], duration: Union[float, None] = 10
    ) -> XABase.XAVideo:
        """Records the screen via ScreenCaptureKit for the given duration,
        writing an HEVC-encoded MPEG-4 video to the provided file path.

        :param file_path: The file path to save the video at.
        :type file_path: Union[str, XAPath]
        :param duration: The duration of the recording, in seconds, defaults to 10.
        :type duration: Union[float, None], optional
        :return: The resulting video as a PyXA object.
        :rtype: XAVideo

        .. versionadded:: 0.3.0
        """
        # NOTE(review): duration is typed as Optional, but None would fail in
        # the queue-depth arithmetic and the timing comparison below — confirm
        # whether continuous recording was ever intended here.
        import ScreenCaptureKit
        import CoreMedia
        import Quartz
        import AVFoundation

        if isinstance(file_path, XABase.XAPath):
            file_path = file_path.path

        global SCStreamOutput
        # Frame sample buffers accumulated by the stream delegate, and a flag
        # flipped when the asset writer has finished; both are closed over by
        # the nested callbacks below.
        samples = []
        done = False

        # Delegate conforming to SCStreamOutput; collects every sample buffer
        # the stream emits.
        class SCStreamOutputDelegate(AppKit.NSObject, protocols=[SCStreamOutput]):
            def stream_didOutputSampleBuffer_ofType_(
                self, stream, sample_buffer, output_type
            ):
                nonlocal samples
                samples.append(sample_buffer)

        # Invoked with the shareable content (displays/windows) once macOS
        # resolves it; configures and starts the capture stream.
        def completion_handler(content, error):
            if error is not None:
                print(error)
                return

            # Capture the first display, excluding no windows.
            display = content.displays().objectAtIndex_(0)
            filter = ScreenCaptureKit.SCContentFilter.alloc().initWithDisplay_excludingWindows_(
                display, AppKit.NSArray.alloc().init()
            )

            config = ScreenCaptureKit.SCStreamConfiguration.alloc().init()
            config.setWidth_(display.frame().size.width)
            config.setHeight_(display.frame().size.height)
            # Target 60 fps (minimum interval of 1/60 s between frames).
            interval = CoreMedia.CMTimeMake(1, 60)
            config.setMinimumFrameInterval_(interval)
            # Queue depth sized for 60 fps over the whole duration, doubled
            # for headroom.
            config.setQueueDepth_(60 * duration * 2)

            stream = ScreenCaptureKit.SCStream.alloc().initWithFilter_configuration_delegate_(
                filter, config, None
            )

            # Retain the delegate explicitly so the ObjC side keeps it alive
            # for the lifetime of the stream.
            output = SCStreamOutputDelegate.alloc().init().retain()
            queue = libdispatch.dispatch_get_global_queue(0, 0)
            stream.addStreamOutput_type_sampleHandlerQueue_error_(
                output, ScreenCaptureKit.SCStreamOutputTypeScreen, queue, None
            )

            # Invoked once capture has started; pumps the run loop for the
            # requested duration, then stops the stream and writes the
            # accumulated sample buffers out with AVAssetWriter.
            # NOTE(review): this inner function shadows the enclosing
            # completion_handler name — confusing but intentional here.
            def completion_handler(error):
                nonlocal samples, done
                output_url = AppKit.NSURL.fileURLWithPath_(file_path)

                # Let the capture run for `duration` seconds, servicing the
                # run loop in 1-second slices so callbacks keep firing.
                start_date = AppKit.NSDate.date()
                while (
                    AppKit.NSDate.date().timeIntervalSinceDate_(start_date) < duration
                ):
                    AppKit.NSRunLoop.currentRunLoop().runUntilDate_(
                        datetime.now() + timedelta(seconds=1)
                    )

                stream.stopCaptureWithCompletionHandler_(lambda error: None)

                # initWithURL_fileType_error_ returns (writer, error); keep
                # only the writer.
                writer = AVFoundation.AVAssetWriter.alloc().initWithURL_fileType_error_(
                    output_url, AVFoundation.AVFileTypeMPEG4, None
                )[0]

                # HEVC at the display's native resolution.
                writer_settings = {
                    AVFoundation.AVVideoCodecKey: AVFoundation.AVVideoCodecTypeHEVC,
                    AVFoundation.AVVideoWidthKey: display.frame().size.width,
                    AVFoundation.AVVideoHeightKey: display.frame().size.height,
                }
                writer_input = AVFoundation.AVAssetWriterInput.alloc().initWithMediaType_outputSettings_sourceFormatHint_(
                    AVFoundation.AVMediaTypeVideo, writer_settings, None
                )
                # Offline (non-realtime) writing: we already hold all samples.
                writer_input.setExpectsMediaDataInRealTime_(False)

                import CoreMedia

                pixel_buffer_adaptor = AVFoundation.AVAssetWriterInputPixelBufferAdaptor.alloc().initWithAssetWriterInput_sourcePixelBufferAttributes_(
                    writer_input, None
                )
                writer.addInput_(writer_input)
                writer.startWriting()
                # Anchor the session timeline at the first captured frame's
                # presentation timestamp.
                writer.startSessionAtSourceTime_(
                    CoreMedia.CMSampleBufferGetPresentationTimeStamp(samples[0])
                )

                # Append each frame's pixel buffer at its original timestamp,
                # throttling when the writer input's buffer is full.
                for index, sample in enumerate(samples):
                    presentation_time = (
                        CoreMedia.CMSampleBufferGetPresentationTimeStamp(sample)
                    )
                    image_buffer_ref = CoreMedia.CMSampleBufferGetImageBuffer(sample)
                    if image_buffer_ref is not None:
                        pixel_buffer_adaptor.appendPixelBuffer_withPresentationTime_(
                            image_buffer_ref, presentation_time
                        )
                        while not writer_input.isReadyForMoreMediaData():
                            sleep(0.1)

                writer.finishWriting()
                done = True

            stream.startCaptureWithCompletionHandler_(completion_handler)

        # Kick off the async content lookup; everything above runs from its
        # callback chain.
        ScreenCaptureKit.SCShareableContent.getShareableContentWithCompletionHandler_(
            completion_handler
        )

        # Block (while servicing the run loop) until the writer reports done.
        while not done:
            AppKit.NSRunLoop.currentRunLoop().runUntilDate_(
                datetime.now() + timedelta(seconds=0.1)
            )
        return XABase.XAVideo(file_path)
class XACamera:
    """A reusable controller for camera-related functionality.

    .. versionadded:: 0.3.0
    """

    def __init__(self) -> None:
        pass

    def capture(self) -> XABase.XAImage:
        """Captures a single frame from the default camera and returns it as a
        :class:`PyXA.XABase.XAImage` object.

        :return: The captured frame.
        :rtype: XAImage

        .. versionadded:: 0.3.0
        """
        import CoreMedia
        import Quartz
        import AVFoundation

        # Filled in by the delegate callback below once a frame arrives.
        img = None
        session = AVFoundation.AVCaptureSession.alloc().init()

        # Receives video sample buffers from the capture output; converts the
        # first frame to an NSImage and stops the session.
        class CaptureDelegate(AppKit.NSObject):
            def captureOutput_didOutputSampleBuffer_fromConnection_(
                self, output, sample_buffer, connection
            ):
                nonlocal img
                AppHelper.stopEventLoop()
                session.stopRunning()

                # Only the first frame is kept; ignore any further callbacks.
                if img is not None:
                    return

                # CVPixelBuffer -> CIImage -> CGImage -> NSImage.
                image_buffer_ref = CoreMedia.CMSampleBufferGetImageBuffer(sample_buffer)
                ci_image = Quartz.CIImage.imageWithCVPixelBuffer_(image_buffer_ref)
                image = AppKit.NSImage.alloc().initWithCGImage_size_(
                    Quartz.CIContext.context().createCGImage_fromRect_(
                        ci_image, ci_image.extent()
                    ),
                    # NOTE(review): hard-coded 1920x1080 output size — confirm
                    # this matches the intended camera resolution.
                    AppKit.NSMakeSize(1920, 1080),
                )
                img = XABase.XAImage(image)

        device = AVFoundation.AVCaptureDevice.defaultDeviceWithMediaType_(
            AVFoundation.AVMediaTypeVideo
        )
        # NOTE(review): access is requested with a None completion handler and
        # not awaited — the first run may proceed before permission is granted.
        AVFoundation.AVCaptureDevice.requestAccessForMediaType_completionHandler_(
            AVFoundation.AVMediaTypeVideo, None
        )

        # deviceInputWithDevice_error_ returns (input, error); keep the input.
        deviceInput = AVFoundation.AVCaptureDeviceInput.deviceInputWithDevice_error_(
            device, None
        )[0]

        # Explicit retain keeps the ObjC output alive alongside the session.
        output = AVFoundation.AVCaptureVideoDataOutput.alloc().init().retain()
        video_settings = (
            output.recommendedVideoSettingsForAssetWriterWithOutputFileType_(
                AVFoundation.AVFileTypeAppleM4V
            )
        )
        output.setVideoSettings_(video_settings)
        # Drop late frames — only one frame is needed.
        output.setAlwaysDiscardsLateVideoFrames_(True)

        # Add input and output to the session (enable the video output)
        session.beginConfiguration()
        session.addInput_(deviceInput)
        session.addOutput_(output)
        session.commitConfiguration()

        delegate = CaptureDelegate.alloc().init().retain()
        queue = libdispatch.dispatch_get_main_queue()

        session.startRunning()
        # Let the camera warm up before attaching the delegate, so the kept
        # frame is not from a still-initializing sensor.
        sleep(1)
        output.setSampleBufferDelegate_queue_(delegate, queue)

        # Pump the run loop until the delegate has produced an image.
        while img is None:
            AppKit.NSRunLoop.currentRunLoop().runUntilDate_(
                datetime.now() + timedelta(seconds=0.1)
            )
        return img

    def record(
        self, file_path: Union[str, XABase.XAPath], duration: Union[float, None] = 10
    ) -> XABase.XAVideo:
        """Records a video for the specified duration, saving into the provided file path.

        :param file_path: The file path to save the video at.
        :type file_path: Union[str, XAPath]
        :param duration: The duration of the video, in seconds, or None to record continuously until the script is canceled, defaults to 10.
        :type duration: Union[float, None], optional
        :return: The resulting video as a PyXA object.
        :rtype: XAVideo

        .. versionadded:: 0.3.0
        """
        import AVFoundation

        session = AVFoundation.AVCaptureSession.alloc().init()
        device = AVFoundation.AVCaptureDevice.defaultDeviceWithMediaType_(
            AVFoundation.AVMediaTypeVideo
        )
        # NOTE(review): permission request uses a None completion handler and
        # is not awaited before recording starts.
        AVFoundation.AVCaptureDevice.requestAccessForMediaType_completionHandler_(
            AVFoundation.AVMediaTypeVideo, None
        )

        # deviceInputWithDevice_error_ returns (input, error); keep the input.
        deviceInput = AVFoundation.AVCaptureDeviceInput.deviceInputWithDevice_error_(
            device, None
        )[0]
        output = AVFoundation.AVCaptureMovieFileOutput.alloc().init().retain()

        # Add input and output to the session (enable the video output)
        session.beginConfiguration()
        session.addInput_(deviceInput)
        session.addOutput_(output)
        session.commitConfiguration()
        session.startRunning()

        if isinstance(file_path, XABase.XAPath):
            file_path = file_path.path
        url = XABase.XAURL(file_path)

        # NOTE(review): `self` is passed as the recording delegate but this
        # class defines no delegate methods; recording is also never stopped
        # explicitly before returning — confirm both are intentional.
        output.startRecordingToOutputFileURL_recordingDelegate_(url.xa_elem, self)

        if duration is None:
            # Record until the script is interrupted.
            AppHelper.runConsoleEventLoop()
        else:
            # Service the run loop for the requested duration.
            AppKit.NSRunLoop.currentRunLoop().runUntilDate_(
                datetime.now() + timedelta(seconds=duration)
            )
        return XABase.XAVideo(file_path)
class XAMicrophone:
    """A reusable controller for microphone-related functionality.

    .. versionadded:: 0.3.0
    """

    def __init__(self) -> None:
        pass

    def record(
        self, file_path: Union[str, XABase.XAPath], duration: Union[float, None] = 10
    ) -> XABase.XASound:
        """Records audio for the specified duration, saving into the provided file path.

        :param file_path: The file path to save the audio at. A ``.wav``
            extension selects WAVE output; anything else is written as AIFF.
        :type file_path: Union[str, XAPath]
        :param duration: The duration of the audio, in seconds, or None to record continuously until the script is canceled, defaults to 10.
        :type duration: Union[float, None], optional
        :return: The resulting audio as a PyXA object.
        :rtype: XASound

        .. versionadded:: 0.3.0
        """
        import AVFoundation

        session = AVFoundation.AVCaptureSession.alloc().init()
        # NOTE(review): a photo session preset on an audio-only session —
        # confirm this is deliberate.
        session.setSessionPreset_(AVFoundation.AVCaptureSessionPresetPhoto)

        device = AVFoundation.AVCaptureDevice.defaultDeviceWithMediaType_(
            AVFoundation.AVMediaTypeAudio
        )
        # NOTE(review): permission request uses a None completion handler and
        # is not awaited before recording starts.
        AVFoundation.AVCaptureDevice.requestAccessForMediaType_completionHandler_(
            AVFoundation.AVMediaTypeAudio, None
        )

        # deviceInputWithDevice_error_ returns (input, error); keep the input.
        deviceInput = AVFoundation.AVCaptureDeviceInput.deviceInputWithDevice_error_(
            device, None
        )[0]
        output = AVFoundation.AVCaptureAudioFileOutput.alloc().init().retain()

        # Add input and output to the session (enable the video output)
        session.beginConfiguration()
        session.addInput_(deviceInput)
        session.addOutput_(output)
        session.commitConfiguration()
        session.startRunning()

        if isinstance(file_path, XABase.XAPath):
            file_path = file_path.path
        url = XABase.XAURL(file_path)

        # Pick the container format from the file extension (default AIFF).
        file_type = AVFoundation.AVFileTypeAIFF
        if file_path.lower().endswith("aiff"):
            file_type = AVFoundation.AVFileTypeAIFF
        elif file_path.lower().endswith("wav"):
            file_type = AVFoundation.AVFileTypeWAVE

        output.startRecordingToOutputFileURL_outputFileType_recordingDelegate_(
            url.xa_elem, file_type, self
        )

        if duration is None:
            # Record until the script is interrupted.
            AppHelper.runConsoleEventLoop()
        else:
            # Service the run loop for the requested duration.
            AppKit.NSRunLoop.currentRunLoop().runUntilDate_(
                datetime.now() + timedelta(seconds=duration)
            )

        output.stopRecording()
        # Give the output a moment to flush, then wait for recording to end.
        AppKit.NSRunLoop.currentRunLoop().runUntilDate_(
            datetime.now() + timedelta(seconds=1)
        )
        while output.isRecording():
            sleep(0.1)
        return XABase.XASound(file_path)