Skip to content

Commit 7850c82

Browse files
committed
lz4: start read from offset
1 parent b724b32 commit 7850c82

File tree

2 files changed

+50
-24
lines changed

2 files changed

+50
-24
lines changed

plugin/input/file/provider.go

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,15 @@ type jobProvider struct {
6060
}
6161

6262
type Job struct {
63-
file *os.File
64-
inode inodeID
65-
sourceID pipeline.SourceID // some value to distinguish jobs with same inode
66-
filename string
67-
symlink string
68-
curOffset int64 // offset to not call Seek() everytime
69-
tail []byte // some data of a new line read by worker, to not seek backwards to read from line start
63+
file *os.File
64+
mimeType string
65+
isCompressed bool
66+
inode inodeID
67+
sourceID pipeline.SourceID // some value to distinguish jobs with same inode
68+
filename string
69+
symlink string
70+
curOffset int64 // offset to not call Seek() everytime
71+
tail []byte // some data of a new line read by worker, to not seek backwards to read from line start
7072

7173
ignoreEventsLE uint64 // events with seq id less or equal than this should be ignored in terms offset commitment
7274
lastEventSeq uint64
@@ -83,10 +85,15 @@ type Job struct {
8385
mu *sync.Mutex
8486
}
8587

86-
func (j *Job) seek(offset int64, whence int, hint string) int64 {
87-
n, err := j.file.Seek(offset, whence)
88-
if err != nil {
89-
logger.Infof("file seek error hint=%s, name=%s, err=%s", hint, j.filename, err.Error())
88+
func (j *Job) seek(offset int64, whence int, hint string) (n int64) {
89+
var err error
90+
if !j.isCompressed {
91+
n, err = j.file.Seek(offset, whence)
92+
if err != nil {
93+
logger.Infof("file seek error hint=%s, name=%s, err=%s", hint, j.filename, err.Error())
94+
}
95+
} else {
96+
n = 0
9097
}
9198
j.curOffset = n
9299

@@ -354,6 +361,10 @@ func (jp *jobProvider) checkFileWasTruncated(job *Job, size int64) {
354361
}
355362
}
356363

364+
func isCompressed(mimeType string) bool {
365+
return mimeType == "application/x-lz4"
366+
}
367+
357368
func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string, symlink string) {
358369
sourceID := sourceIDByStat(stat, symlink)
359370

@@ -370,12 +381,16 @@ func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string,
370381
}
371382

372383
inode := getInode(stat)
384+
mimeType := getMimeType(filename)
385+
373386
job := &Job{
374-
file: file,
375-
inode: inode,
376-
filename: filename,
377-
symlink: symlink,
378-
sourceID: sourceID,
387+
file: file,
388+
isCompressed: isCompressed(mimeType),
389+
mimeType: mimeType,
390+
inode: inode,
391+
filename: filename,
392+
symlink: symlink,
393+
sourceID: sourceID,
379394

380395
isVirgin: true,
381396
isDone: true,

plugin/input/file/worker.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,20 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
8989
}
9090
}
9191

92-
mimeType := getMimeType(file.Name())
9392
var reader io.Reader
94-
95-
if mimeType == "application/x-lz4" {
93+
if job.mimeType == "application/x-lz4" {
9694
lz4Reader := lz4.NewReader(file)
95+
if len(offsets) > 0 {
96+
for lastOffset+int64(readBufferSize) < offsets[0].Offset {
97+
n, err := lz4Reader.Read(readBuf)
98+
if err != nil {
99+
if err == io.EOF {
100+
break // End of file reached
101+
}
102+
}
103+
lastOffset += int64(n)
104+
}
105+
}
97106
reader = lz4Reader
98107
} else {
99108
reader = file
@@ -202,11 +211,13 @@ func (w *worker) processEOF(file *os.File, job *Job, jobProvider *jobProvider, t
202211
return err
203212
}
204213

205-
// files truncated from time to time, after logs from file was processed.
206-
// Position > stat.Size() means that data was truncated and
207-
// caret pointer must be moved to start of file.
208-
if totalOffset > stat.Size() {
209-
jobProvider.truncateJob(job)
214+
if !job.isCompressed {
215+
// files truncated from time to time, after logs from file was processed.
216+
// Position > stat.Size() means that data was truncated and
217+
// caret pointer must be moved to start of file.
218+
if totalOffset > stat.Size() {
219+
jobProvider.truncateJob(job)
220+
}
210221
}
211222

212223
// Mark job as done till new lines has appeared.

0 commit comments

Comments
 (0)