summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGabe Kangas <gabek@real-ity.com>2023-03-27 16:00:22 -0700
committerGabe Kangas <gabek@real-ity.com>2023-03-27 16:07:01 -0700
commitf10e5a23b55609be41ddacc9f81578330e57af7f (patch)
treed7fca232e2f6dd9b259af1a2cce2b2775aa3459a
parent7b6d71f8f62b58a01239cd9708b9efa225fd9296 (diff)
Add support for S3 cleanup + standardize firing cleanup. Closes #2646gek/hls-cleanup-refactor
-rw-r--r--core/storageproviders/local.go85
-rw-r--r--core/storageproviders/s3Storage.go94
-rw-r--r--core/streamState.go4
-rw-r--r--core/transcoder/hlsFilesystemCleanup.go78
-rw-r--r--models/storageProvider.go2
5 files changed, 168 insertions, 95 deletions
diff --git a/core/storageproviders/local.go b/core/storageproviders/local.go
index 05203cfea..f71306ca6 100644
--- a/core/storageproviders/local.go
+++ b/core/storageproviders/local.go
@@ -1,19 +1,19 @@
package storageproviders
import (
- "time"
+ "os"
+ "path/filepath"
+ "sort"
+ "github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/owncast/owncast/config"
- "github.com/owncast/owncast/core/transcoder"
+ "github.com/owncast/owncast/core/data"
)
// LocalStorage represents an instance of the local storage provider for HLS video.
-type LocalStorage struct {
- // Cleanup old public HLS content every N min from the webroot.
- onlineCleanupTicker *time.Ticker
-}
+type LocalStorage struct{}
// NewLocalStorage returns a new LocalStorage instance.
func NewLocalStorage() *LocalStorage {
@@ -22,14 +22,6 @@ func NewLocalStorage() *LocalStorage {
// Setup configures this storage provider.
func (s *LocalStorage) Setup() error {
- // NOTE: This cleanup timer will have to be disabled to support recordings in the future
- // as all HLS segments have to be publicly available on disk to keep a recording of them.
- s.onlineCleanupTicker = time.NewTicker(1 * time.Minute)
- go func() {
- for range s.onlineCleanupTicker.C {
- transcoder.CleanupOldContent(config.HLSStoragePath)
- }
- }()
return nil
}
@@ -59,3 +51,68 @@ func (s *LocalStorage) MasterPlaylistWritten(localFilePath string) {
func (s *LocalStorage) Save(filePath string, retryCount int) (string, error) {
return filePath, nil
}
+
+func (s *LocalStorage) Cleanup() error {
+ // Determine how many files we should keep on disk
+ maxNumber := data.GetStreamLatencyLevel().SegmentCount
+ buffer := 10
+ baseDirectory := config.HLSStoragePath
+
+ files, err := getAllFilesRecursive(baseDirectory)
+ if err != nil {
+ return errors.Wrap(err, "unable find old video files for cleanup")
+ }
+
+ // Delete old private HLS files on disk
+ for directory := range files {
+ files := files[directory]
+ if len(files) < maxNumber+buffer {
+ continue
+ }
+
+ filesToDelete := files[maxNumber+buffer:]
+ log.Traceln("Deleting", len(filesToDelete), "old files from", baseDirectory, "for video variant", directory)
+
+ for _, file := range filesToDelete {
+ fileToDelete := filepath.Join(baseDirectory, directory, file.Name())
+ err := os.Remove(fileToDelete)
+ if err != nil {
+ return errors.Wrap(err, "unable to delete old video files")
+ }
+ }
+ }
+ return nil
+}
+
+func getAllFilesRecursive(baseDirectory string) (map[string][]os.FileInfo, error) {
+ files := make(map[string][]os.FileInfo)
+
+ var directory string
+ err := filepath.Walk(baseDirectory, func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+
+ if info.IsDir() {
+ directory = info.Name()
+ }
+
+ if filepath.Ext(info.Name()) == ".ts" {
+ files[directory] = append(files[directory], info)
+ }
+
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ // Sort by date so we can delete old files
+ for directory := range files {
+ sort.Slice(files[directory], func(i, j int) bool {
+ return files[directory][i].ModTime().UnixNano() > files[directory][j].ModTime().UnixNano()
+ })
+ }
+
+ return files, nil
+}
diff --git a/core/storageproviders/s3Storage.go b/core/storageproviders/s3Storage.go
index 1495d50d2..922df3bd3 100644
--- a/core/storageproviders/s3Storage.go
+++ b/core/storageproviders/s3Storage.go
@@ -7,17 +7,20 @@ import (
"os"
"path"
"path/filepath"
+ "sort"
"strings"
"time"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/playlist"
"github.com/owncast/owncast/utils"
+ "github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/owncast/owncast/config"
@@ -27,8 +30,9 @@ import (
// S3Storage is the s3 implementation of a storage provider.
type S3Storage struct {
- sess *session.Session
- host string
+ sess *session.Session
+ s3Client *s3.S3
+ host string
s3Endpoint string
s3ServingEndpoint string
@@ -74,6 +78,7 @@ func (s *S3Storage) Setup() error {
s.s3ForcePathStyle = s3Config.ForcePathStyle
s.sess = s.connectAWS()
+ s.s3Client = s3.New(s.sess)
s.uploader = s3manager.NewUploader(s.sess)
@@ -187,6 +192,21 @@ func (s *S3Storage) Save(filePath string, retryCount int) (string, error) {
return response.Location, nil
}
+func (s *S3Storage) Cleanup() error {
+ // Determine how many files we should keep on S3 storage
+ maxNumber := data.GetStreamLatencyLevel().SegmentCount
+ buffer := 20
+
+ keys, err := s.getDeletableVideoSegmentsWithOffset(maxNumber + buffer)
+ if err != nil {
+ return err
+ }
+
+ s.deleteObjects(keys)
+
+ return nil
+}
+
func (s *S3Storage) connectAWS() *session.Session {
t := http.DefaultTransport.(*http.Transport).Clone()
t.MaxIdleConnsPerHost = 100
@@ -239,3 +259,73 @@ func (s *S3Storage) rewriteRemotePlaylist(filePath string) error {
return playlist.WritePlaylist(newPlaylist, publicPath)
}
+
+func (s *S3Storage) getDeletableVideoSegmentsWithOffset(offset int) ([]s3object, error) {
+ objectsToDelete, err := s.retrieveAllVideoSegments()
+ if err != nil {
+ return nil, err
+ }
+
+ objectsToDelete = objectsToDelete[offset : len(objectsToDelete)-1]
+
+ return objectsToDelete, nil
+}
+
+func (s *S3Storage) deleteObjects(objects []s3object) {
+ keys := make([]*s3.ObjectIdentifier, len(objects))
+ for i, object := range objects {
+ keys[i] = &s3.ObjectIdentifier{Key: aws.String(object.key)}
+ }
+
+ log.Debugln("Deleting", len(keys), "objects from S3 bucket:", s.s3Bucket)
+
+ deleteObjectsRequest := &s3.DeleteObjectsInput{
+ Bucket: aws.String(s.s3Bucket),
+ Delete: &s3.Delete{
+ Objects: keys,
+ Quiet: aws.Bool(true),
+ },
+ }
+
+ _, err := s.s3Client.DeleteObjects(deleteObjectsRequest)
+ if err != nil {
+ log.Errorf("Unable to delete objects from bucket %q, %v\n", s.s3Bucket, err)
+ }
+}
+
+func (s *S3Storage) retrieveAllVideoSegments() ([]s3object, error) {
+ allObjectsListRequest := &s3.ListObjectsInput{
+ Bucket: aws.String(s.s3Bucket),
+ }
+
+ // Fetch all objects in the bucket
+ allObjectsListResponse, err := s.s3Client.ListObjects(allObjectsListRequest)
+ if err != nil {
+ return nil, errors.Wrap(err, "Unable to fetch list of items in bucket for cleanup")
+ }
+
+ // Filter out non-video segments
+ allObjects := []s3object{}
+ for _, item := range allObjectsListResponse.Contents {
+ if !strings.HasSuffix(*item.Key, ".ts") {
+ continue
+ }
+
+ allObjects = append(allObjects, s3object{
+ key: *item.Key,
+ lastModified: *item.LastModified,
+ })
+ }
+
+ // Sort the results by timestamp
+ sort.Slice(allObjects, func(i, j int) bool {
+ return allObjects[i].lastModified.After(allObjects[j].lastModified)
+ })
+
+ return allObjects, nil
+}
+
+type s3object struct {
+ key string
+ lastModified time.Time
+}
diff --git a/core/streamState.go b/core/streamState.go
index 8013c18da..99db86f76 100644
--- a/core/streamState.go
+++ b/core/streamState.go
@@ -151,7 +151,9 @@ func startOnlineCleanupTimer() {
_onlineCleanupTicker = time.NewTicker(1 * time.Minute)
go func() {
for range _onlineCleanupTicker.C {
- transcoder.CleanupOldContent(config.HLSStoragePath)
+ if err := _storage.Cleanup(); err != nil {
+ log.Errorln(err)
+ }
}
}()
}
diff --git a/core/transcoder/hlsFilesystemCleanup.go b/core/transcoder/hlsFilesystemCleanup.go
deleted file mode 100644
index c9071f716..000000000
--- a/core/transcoder/hlsFilesystemCleanup.go
+++ /dev/null
@@ -1,78 +0,0 @@
-package transcoder
-
-import (
- log "github.com/sirupsen/logrus"
-
- "os"
- "path/filepath"
- "sort"
-
- "github.com/owncast/owncast/core/data"
-)
-
-// CleanupOldContent will delete old files from the private dir that are no longer being referenced
-// in the stream.
-func CleanupOldContent(baseDirectory string) {
- // Determine how many files we should keep on disk
- maxNumber := data.GetStreamLatencyLevel().SegmentCount
- buffer := 10
-
- files, err := getAllFilesRecursive(baseDirectory)
- if err != nil {
- log.Debugln("Unable to cleanup old video files", err)
- return
- }
-
- // Delete old private HLS files on disk
- for directory := range files {
- files := files[directory]
- if len(files) < maxNumber+buffer {
- continue
- }
-
- filesToDelete := files[maxNumber+buffer:]
- log.Traceln("Deleting", len(filesToDelete), "old files from", baseDirectory, "for video variant", directory)
-
- for _, file := range filesToDelete {
- fileToDelete := filepath.Join(baseDirectory, directory, file.Name())
- err := os.Remove(fileToDelete)
- if err != nil {
- log.Debugln(err)
- }
- }
- }
-}
-
-func getAllFilesRecursive(baseDirectory string) (map[string][]os.FileInfo, error) {
- var files = make(map[string][]os.FileInfo)
-
- var directory string
- err := filepath.Walk(baseDirectory, func(path string, info os.FileInfo, err error) error {
- if err != nil {
- return err
- }
-
- if info.IsDir() {
- directory = info.Name()
- }
-
- if filepath.Ext(info.Name()) == ".ts" {
- files[directory] = append(files[directory], info)
- }
-
- return nil
- })
-
- if err != nil {
- return nil, err
- }
-
- // Sort by date so we can delete old files
- for directory := range files {
- sort.Slice(files[directory], func(i, j int) bool {
- return files[directory][i].ModTime().UnixNano() > files[directory][j].ModTime().UnixNano()
- })
- }
-
- return files, nil
-}
diff --git a/models/storageProvider.go b/models/storageProvider.go
index ee874b9ba..44961d5c2 100644
--- a/models/storageProvider.go
+++ b/models/storageProvider.go
@@ -8,4 +8,6 @@ type StorageProvider interface {
SegmentWritten(localFilePath string)
VariantPlaylistWritten(localFilePath string)
MasterPlaylistWritten(localFilePath string)
+
+ Cleanup() error
}