Skip to content
Snippets Groups Projects
Unverified Commit 24cafd73 authored by Eugen Rochko's avatar Eugen Rochko Committed by GitHub
Browse files

Lists (#5703)

* Add structure for lists

* Add list timeline streaming API

* Add list APIs, bind list-account relation to follow relation

* Add API for adding/removing accounts from lists

* Add pagination to lists API

* Add pagination to list accounts API

* Adjust scopes for new APIs

- Creating and modifying lists merely requires "write" scope
- Fetching information about lists merely requires "read" scope

* Add test for wrong user context on list timeline

* Clean up tests
parent 4a2fc2d4
No related branches found
No related tags found
No related merge requests found
......@@ -18,8 +18,8 @@ RSpec.describe AfterBlockService do
end
it "clears account's statuses" do
FeedManager.instance.push(:home, account, status)
FeedManager.instance.push(:home, account, other_account_status)
FeedManager.instance.push_to_home(account, status)
FeedManager.instance.push_to_home(account, other_account_status)
is_expected.to change {
Redis.current.zrange(home_timeline_key, 0, -1)
......
......@@ -30,11 +30,11 @@ RSpec.describe BatchedRemoveStatusService do
end
it 'removes statuses from author\'s home feed' do
expect(Feed.new(:home, alice).get(10)).to_not include([status1.id, status2.id])
expect(HomeFeed.new(alice).get(10)).to_not include([status1.id, status2.id])
end
it 'removes statuses from local follower\'s home feed' do
expect(Feed.new(:home, jeff).get(10)).to_not include([status1.id, status2.id])
expect(HomeFeed.new(jeff).get(10)).to_not include([status1.id, status2.id])
end
it 'notifies streaming API of followers' do
......
......@@ -19,12 +19,12 @@ RSpec.describe FanOutOnWriteService do
end
it 'delivers status to home timeline' do
expect(Feed.new(:home, author).get(10).map(&:id)).to include status.id
expect(HomeFeed.new(author).get(10).map(&:id)).to include status.id
end
it 'delivers status to local followers' do
pending 'some sort of problem in test environment causes this to sometimes fail'
expect(Feed.new(:home, follower).get(10).map(&:id)).to include status.id
expect(HomeFeed.new(follower).get(10).map(&:id)).to include status.id
end
it 'delivers status to hashtag' do
......
......@@ -18,8 +18,8 @@ RSpec.describe MuteService do
end
it "clears account's statuses" do
FeedManager.instance.push(:home, account, status)
FeedManager.instance.push(:home, account, other_account_status)
FeedManager.instance.push_to_home(account, status)
FeedManager.instance.push_to_home(account, other_account_status)
is_expected.to change {
Redis.current.zrange(home_timeline_key, 0, -1)
......
......@@ -25,11 +25,11 @@ RSpec.describe RemoveStatusService do
end
it 'removes status from author\'s home feed' do
expect(Feed.new(:home, alice).get(10)).to_not include(@status.id)
expect(HomeFeed.new(alice).get(10)).to_not include(@status.id)
end
it 'removes status from local follower\'s home feed' do
expect(Feed.new(:home, jeff).get(10)).to_not include(@status.id)
expect(HomeFeed.new(jeff).get(10)).to_not include(@status.id)
end
it 'sends PuSH update to PuSH subscribers' do
......
......@@ -11,41 +11,41 @@ describe FeedInsertWorker do
context 'when there are no records' do
it 'skips push with missing status' do
instance = double(push: nil)
instance = double(push_to_home: nil)
allow(FeedManager).to receive(:instance).and_return(instance)
result = subject.perform(nil, follower.id)
expect(result).to eq true
expect(instance).not_to have_received(:push)
expect(instance).not_to have_received(:push_to_home)
end
it 'skips push with missing account' do
instance = double(push: nil)
instance = double(push_to_home: nil)
allow(FeedManager).to receive(:instance).and_return(instance)
result = subject.perform(status.id, nil)
expect(result).to eq true
expect(instance).not_to have_received(:push)
expect(instance).not_to have_received(:push_to_home)
end
end
context 'when there are real records' do
it 'skips the push when there is a filter' do
instance = double(push: nil, filter?: true)
instance = double(push_to_home: nil, filter?: true)
allow(FeedManager).to receive(:instance).and_return(instance)
result = subject.perform(status.id, follower.id)
expect(result).to be_nil
expect(instance).not_to have_received(:push)
expect(instance).not_to have_received(:push_to_home)
end
it 'pushes the status onto the home timeline without filter' do
instance = double(push: nil, filter?: false)
instance = double(push_to_home: nil, filter?: false)
allow(FeedManager).to receive(:instance).and_return(instance)
result = subject.perform(status.id, follower.id)
expect(result).to be_nil
expect(instance).to have_received(:push).with(:home, follower, status)
expect(instance).to have_received(:push_to_home).with(follower, status)
end
end
end
......
......@@ -254,6 +254,26 @@ const startWorker = (workerId) => {
const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
const authorizeListAccess = (id, req, next) => {
pgPool.connect((err, client, done) => {
if (err) {
next(false);
return;
}
client.query('SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1', [id], (err, result) => {
done();
if (err || result.rows.length === 0 || result.rows[0].account_id !== req.accountId) {
next(false);
return;
}
next(true);
});
});
};
const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
const streamType = notificationOnly ? ' (notification)' : '';
log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}${streamType}`);
......@@ -410,7 +430,22 @@ const startWorker = (workerId) => {
streamFrom(`timeline:hashtag:${req.query.tag.toLowerCase()}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true);
});
const wss = new WebSocket.Server({ server, verifyClient: wsVerifyClient });
app.get('/api/v1/streaming/list', (req, res) => {
const listId = req.query.list;
authorizeListAccess(listId, req, authorized => {
if (!authorized) {
res.writeHead(404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Not found' }));
return;
}
const channel = `timeline:list:${listId}`;
streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)));
});
});
const wss = new WebSocket.Server({ server, verifyClient: wsVerifyClient });
wss.on('connection', ws => {
const req = ws.upgradeReq;
......@@ -443,6 +478,19 @@ const startWorker = (workerId) => {
case 'hashtag:local':
streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}:local`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'list':
const listId = location.query.list;
authorizeListAccess(listId, req, authorized => {
if (!authorized) {
ws.close();
return;
}
const channel = `timeline:list:${listId}`;
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
});
break;
default:
ws.close();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment